提交 3c698b24 编写于 作者: B Benguang Zhao

enh: add mndAddPrepareNewVgAction

上级 873ad883
...@@ -70,7 +70,7 @@ int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw); ...@@ -70,7 +70,7 @@ int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendNullLog(STrans *pTrans); int32_t mndTransAppendNullLog(STrans *pTrans);
int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pRaw); int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
......
...@@ -36,6 +36,7 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup); ...@@ -36,6 +36,7 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup);
SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId); SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId);
int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup); int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups); int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups);
int32_t mndAddPrepareNewVgAction(SMnode *, STrans *pTrans, SVgObj *pVg);
int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid); int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid);
int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup); int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup);
int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType); int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType);
......
...@@ -414,6 +414,13 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { ...@@ -414,6 +414,13 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->tsdbPageSize <= 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE; if (pCfg->tsdbPageSize <= 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE;
} }
static int32_t mndSetPrepareNewVgActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
if (mndAddPrepareNewVgAction(pMnode, pTrans, (pVgroups + v)) != 0) return -1;
}
return 0;
}
static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
SSdbRaw *pDbRaw = mndDbActionEncode(pDb); SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
if (pDbRaw == NULL) return -1; if (pDbRaw == NULL) return -1;
...@@ -424,7 +431,7 @@ static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD ...@@ -424,7 +431,7 @@ static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v); SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
if (pVgRaw == NULL) return -1; if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1; if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1;
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_CREATING) != 0) return -1; if (sdbSetRawStatus(pVgRaw, SDB_STATUS_UPDATE) != 0) return -1;
} }
return 0; return 0;
...@@ -575,6 +582,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, ...@@ -575,6 +582,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
mndTransSetOper(pTrans, MND_OPER_CREATE_DB); mndTransSetOper(pTrans, MND_OPER_CREATE_DB);
if (mndSetPrepareNewVgActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
......
...@@ -388,7 +388,7 @@ static int32_t mndSetCreateSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVg ...@@ -388,7 +388,7 @@ static int32_t mndSetCreateSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVg
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup); SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
if (pVgRaw == NULL) return -1; if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1; if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1;
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_CREATING) != 0) return -1; if (sdbSetRawStatus(pVgRaw, SDB_STATUS_UPDATE) != 0) return -1;
return 0; return 0;
} }
...@@ -626,7 +626,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea ...@@ -626,7 +626,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
mndTransSetSerial(pTrans); mndTransSetSerial(pTrans);
mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name); mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name);
if (mndAddPrepareNewVgAction(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
......
...@@ -1333,6 +1333,19 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) ...@@ -1333,6 +1333,19 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true; bool continueExec = true;
int32_t code = 0;
int32_t numOfActions = taosArrayGetSize(pTrans->prepareActions);
if (numOfActions == 0) goto _OVER;
mInfo("trans:%d, execute %d prepare actions.", pTrans->id, numOfActions);
for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pTrans->prepareActions, action);
mndTransExecSingleAction(pMnode, pTrans, pAction);
}
_OVER:
pTrans->stage = TRN_STAGE_REDO_ACTION; pTrans->stage = TRN_STAGE_REDO_ACTION;
mInfo("trans:%d, stage from prepare to redoAction", pTrans->id); mInfo("trans:%d, stage from prepare to redoAction", pTrans->id);
return continueExec; return continueExec;
......
...@@ -483,15 +483,15 @@ static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t v ...@@ -483,15 +483,15 @@ static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t v
return pReq; return pReq;
} }
static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dstVgId, int32_t *pContLen) { static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, int32_t srcVgId, SVgObj *pVgroup, int32_t *pContLen) {
SAlterVnodeHashRangeReq alterReq = { SAlterVnodeHashRangeReq alterReq = {
.srcVgId = pVgroup->vgId, .srcVgId = srcVgId,
.dstVgId = dstVgId, .dstVgId = pVgroup->vgId,
.hashBegin = pVgroup->hashBegin, .hashBegin = pVgroup->hashBegin,
.hashEnd = pVgroup->hashEnd, .hashEnd = pVgroup->hashEnd,
}; };
mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", pVgroup->vgId, dstVgId, mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", srcVgId, pVgroup->vgId,
pVgroup->hashBegin, pVgroup->hashEnd); pVgroup->hashBegin, pVgroup->hashEnd);
int32_t contLen = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &alterReq); int32_t contLen = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &alterReq);
if (contLen < 0) { if (contLen < 0) {
...@@ -1207,12 +1207,12 @@ int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD ...@@ -1207,12 +1207,12 @@ int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD
return 0; return 0;
} }
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dstVgId) { static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, int32_t srcVgId, SVgObj *pVgroup) {
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildAlterVnodeHashRangeReq(pMnode, pVgroup, dstVgId, &contLen); void *pReq = mndBuildAlterVnodeHashRangeReq(pMnode, srcVgId, pVgroup, &contLen);
if (pReq == NULL) return -1; if (pReq == NULL) return -1;
action.pCont = pReq; action.pCont = pReq;
...@@ -1247,6 +1247,21 @@ int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb ...@@ -1247,6 +1247,21 @@ int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb
return 0; return 0;
} }
int32_t mndAddPrepareNewVgAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
if (pRaw == NULL) goto _err;
STransAction action = {.pRaw = pRaw};
if (mndTransAppendPrepareAction(pTrans, &action) != 0) goto _err;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
pRaw = NULL;
return 0;
_err:
sdbFreeRaw(pRaw);
return -1;
}
int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) { int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
if (pDnode == NULL) return -1; if (pDnode == NULL) return -1;
...@@ -2313,12 +2328,16 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro ...@@ -2313,12 +2328,16 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
// alter vgId and hash range // alter vgId and hash range
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg1, maxVgId) != 0) goto _OVER; int32_t srcVgId = newVg1.vgId;
newVg1.vgId = maxVgId; newVg1.vgId = maxVgId;
if (mndAddPrepareNewVgAction(pMnode, pTrans, &newVg1) != 0) goto _OVER;
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1) != 0) goto _OVER;
maxVgId++; maxVgId++;
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg2, maxVgId) != 0) goto _OVER; srcVgId = newVg2.vgId;
newVg2.vgId = maxVgId; newVg2.vgId = maxVgId;
if (mndAddPrepareNewVgAction(pMnode, pTrans, &newVg2) != 0) goto _OVER;
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2) != 0) goto _OVER;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
......
...@@ -122,6 +122,7 @@ typedef enum { ...@@ -122,6 +122,7 @@ typedef enum {
SDB_STATUS_DROPPING = 2, SDB_STATUS_DROPPING = 2,
SDB_STATUS_DROPPED = 3, SDB_STATUS_DROPPED = 3,
SDB_STATUS_READY = 4, SDB_STATUS_READY = 4,
SDB_STATUS_UPDATE = 5,
} ESdbStatus; } ESdbStatus;
typedef enum { typedef enum {
......
...@@ -256,6 +256,7 @@ int32_t sdbWriteWithoutFree(SSdb *pSdb, SSdbRaw *pRaw) { ...@@ -256,6 +256,7 @@ int32_t sdbWriteWithoutFree(SSdb *pSdb, SSdbRaw *pRaw) {
code = sdbInsertRow(pSdb, hash, pRaw, pRow, keySize); code = sdbInsertRow(pSdb, hash, pRaw, pRow, keySize);
break; break;
case SDB_STATUS_READY: case SDB_STATUS_READY:
case SDB_STATUS_UPDATE:
case SDB_STATUS_DROPPING: case SDB_STATUS_DROPPING:
code = sdbUpdateRow(pSdb, hash, pRaw, pRow, keySize); code = sdbUpdateRow(pSdb, hash, pRaw, pRow, keySize);
break; break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册