提交 d8d6b6dd 编写于 作者: wmmhello's avatar wmmhello

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/TD-24749

...@@ -150,7 +150,7 @@ enum { ...@@ -150,7 +150,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_HB, "consumer-hb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_HB, "consumer-hb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DO_REBALANCE, "do-rebalance", SMqDoRebalanceMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_DROP_CGROUP, "drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
TD_DEF_MSG_TYPE(TDMT_MND_UNUSED2, "unused2", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_VG, "create-vg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_TIMER, "tmq-tmr", SMTimerReq, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_TIMER, "tmq-tmr", SMTimerReq, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
......
...@@ -154,14 +154,14 @@ typedef struct SSnapshotMeta { ...@@ -154,14 +154,14 @@ typedef struct SSnapshotMeta {
typedef struct SSyncFSM { typedef struct SSyncFSM {
void* data; void* data;
int32_t (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); int32_t (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta);
SyncIndex (*FpAppliedIndexCb)(const struct SSyncFSM* pFsm); SyncIndex (*FpAppliedIndexCb)(const struct SSyncFSM* pFsm);
int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta);
void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta);
void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx); void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx);
void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SReConfigCbMeta* pMeta); void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SReConfigCbMeta* pMeta);
void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, SFsmCbMeta* pMeta);
bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm); bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm);
int32_t (*FpApplyQueueItems)(const struct SSyncFSM* pFsm); int32_t (*FpApplyQueueItems)(const struct SSyncFSM* pFsm);
......
...@@ -345,7 +345,7 @@ int32_t* taosGetErrno(); ...@@ -345,7 +345,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TRANS_CLOG_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x03D4) #define TSDB_CODE_MND_TRANS_CLOG_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x03D4)
#define TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL TAOS_DEF_ERROR_CODE(0, 0x03D5) #define TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL TAOS_DEF_ERROR_CODE(0, 0x03D5)
#define TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED TAOS_DEF_ERROR_CODE(0, 0x03D6) //internal #define TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED TAOS_DEF_ERROR_CODE(0, 0x03D6) //internal
#define TSDB_CODE_MND_TRNAS_SYNC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x03D7) #define TSDB_CODE_MND_TRANS_SYNC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x03D7)
#define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03DF) #define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03DF)
// mnode-mq // mnode-mq
......
...@@ -108,7 +108,7 @@ typedef enum { ...@@ -108,7 +108,7 @@ typedef enum {
TRN_STAGE_UNDO_ACTION = 3, TRN_STAGE_UNDO_ACTION = 3,
TRN_STAGE_COMMIT = 4, TRN_STAGE_COMMIT = 4,
TRN_STAGE_COMMIT_ACTION = 5, TRN_STAGE_COMMIT_ACTION = 5,
TRN_STAGE_FINISHED = 6, TRN_STAGE_FINISH = 6,
TRN_STAGE_PRE_FINISH = 7 TRN_STAGE_PRE_FINISH = 7
} ETrnStage; } ETrnStage;
...@@ -157,6 +157,7 @@ typedef struct { ...@@ -157,6 +157,7 @@ typedef struct {
void* rpcRsp; void* rpcRsp;
int32_t rpcRspLen; int32_t rpcRspLen;
int32_t redoActionPos; int32_t redoActionPos;
SArray* prepareActions;
SArray* redoActions; SArray* redoActions;
SArray* undoActions; SArray* undoActions;
SArray* commitActions; SArray* commitActions;
......
...@@ -70,6 +70,7 @@ int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw); ...@@ -70,6 +70,7 @@ int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendNullLog(STrans *pTrans); int32_t mndTransAppendNullLog(STrans *pTrans);
int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
...@@ -78,15 +79,23 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbnam ...@@ -78,15 +79,23 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbnam
void mndTransSetSerial(STrans *pTrans); void mndTransSetSerial(STrans *pTrans);
void mndTransSetParallel(STrans *pTrans); void mndTransSetParallel(STrans *pTrans);
void mndTransSetOper(STrans *pTrans, EOperType oper); void mndTransSetOper(STrans *pTrans, EOperType oper);
int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans); int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans);
static int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans) {
return mndTransCheckConflict(pMnode, pTrans);
}
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
int32_t mndTransProcessRsp(SRpcMsg *pRsp); int32_t mndTransProcessRsp(SRpcMsg *pRsp);
void mndTransPullup(SMnode *pMnode); void mndTransPullup(SMnode *pMnode);
int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans); int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans);
void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader); void mndTransExecute(SMnode *pMnode, STrans *pTrans);
void mndTransRefresh(SMnode *pMnode, STrans *pTrans);
int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, const char *dbname); int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, const char *dbname);
SSdbRaw *mndTransEncode(STrans *pTrans);
SSdbRow *mndTransDecode(SSdbRaw *pRaw);
void mndTransDropData(STrans *pTrans);
bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -27,6 +27,7 @@ void mndCleanupVgroup(SMnode *pMnode); ...@@ -27,6 +27,7 @@ void mndCleanupVgroup(SMnode *pMnode);
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId); SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId);
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup); void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup);
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup); SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup);
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw);
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup); SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup);
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId); int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
void mndSortVnodeGid(SVgObj *pVgroup); void mndSortVnodeGid(SVgObj *pVgroup);
...@@ -36,6 +37,7 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup); ...@@ -36,6 +37,7 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup);
SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId); SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId);
int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup); int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups); int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups);
int32_t mndAddPrepareNewVgAction(SMnode *, STrans *pTrans, SVgObj *pVg);
int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid); int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid);
int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup); int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup);
int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType); int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType);
......
...@@ -414,6 +414,13 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { ...@@ -414,6 +414,13 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->tsdbPageSize <= 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE; if (pCfg->tsdbPageSize <= 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE;
} }
static int32_t mndSetPrepareNewVgActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
if (mndAddPrepareNewVgAction(pMnode, pTrans, (pVgroups + v)) != 0) return -1;
}
return 0;
}
static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
SSdbRaw *pDbRaw = mndDbActionEncode(pDb); SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
if (pDbRaw == NULL) return -1; if (pDbRaw == NULL) return -1;
...@@ -424,7 +431,7 @@ static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD ...@@ -424,7 +431,7 @@ static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v); SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
if (pVgRaw == NULL) return -1; if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1; if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1;
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_CREATING) != 0) return -1; if (sdbSetRawStatus(pVgRaw, SDB_STATUS_UPDATE) != 0) return -1;
} }
return 0; return 0;
...@@ -589,9 +596,10 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, ...@@ -589,9 +596,10 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
mInfo("trans:%d, used to create db:%s", pTrans->id, pCreate->db); mInfo("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
mndTransSetDbName(pTrans, dbObj.name, NULL); mndTransSetDbName(pTrans, dbObj.name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
mndTransSetOper(pTrans, MND_OPER_CREATE_DB); mndTransSetOper(pTrans, MND_OPER_CREATE_DB);
if (mndSetPrepareNewVgActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups, pNewUserDuped) != 0) goto _OVER; if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups, pNewUserDuped) != 0) goto _OVER;
...@@ -832,7 +840,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p ...@@ -832,7 +840,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p
int32_t code = -1; int32_t code = -1;
mndTransSetDbName(pTrans, pOld->name, NULL); mndTransSetDbName(pTrans, pOld->name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
...@@ -1129,7 +1137,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) { ...@@ -1129,7 +1137,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
mInfo("trans:%d start to drop db:%s", pTrans->id, pDb->name); mInfo("trans:%d start to drop db:%s", pTrans->id, pDb->name);
mndTransSetDbName(pTrans, pDb->name, NULL); mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) { if (mndTransCheckConflict(pMnode, pTrans) != 0) {
goto _OVER; goto _OVER;
} }
......
...@@ -632,7 +632,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC ...@@ -632,7 +632,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode"); pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep); mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
pRaw = mndDnodeActionEncode(&dnodeObj); pRaw = mndDnodeActionEncode(&dnodeObj);
if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER; if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
...@@ -889,7 +889,7 @@ static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SM ...@@ -889,7 +889,7 @@ static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SM
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans); mndTransSetSerial(pTrans);
mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force); mInfo("trans:%d, used to drop dnode:%d, force:%d", pTrans->id, pDnode->id, force);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
pRaw = mndDnodeActionEncode(pDnode); pRaw = mndDnodeActionEncode(pDnode);
if (pRaw == NULL) goto _OVER; if (pRaw == NULL) goto _OVER;
......
...@@ -645,7 +645,7 @@ int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pSt ...@@ -645,7 +645,7 @@ int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pSt
// mInfo("trans:%d, used to add index to stb:%s", pTrans->id, pStb->name); // mInfo("trans:%d, used to add index to stb:%s", pTrans->id, pStb->name);
mndTransSetDbName(pTrans, pDb->name, pStb->name); mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
mndTransSetSerial(pTrans); mndTransSetSerial(pTrans);
...@@ -721,7 +721,7 @@ static int32_t mndDropIdx(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SIdxObj *p ...@@ -721,7 +721,7 @@ static int32_t mndDropIdx(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SIdxObj *p
mInfo("trans:%d, used to drop idx:%s", pTrans->id, pIdx->name); mInfo("trans:%d, used to drop idx:%s", pTrans->id, pIdx->name);
mndTransSetDbName(pTrans, pDb->name, NULL); mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
mndTransSetSerial(pTrans); mndTransSetSerial(pTrans);
if (mndSetDropIdxRedoLogs(pMnode, pTrans, pIdx) != 0) goto _OVER; if (mndSetDropIdxRedoLogs(pMnode, pTrans, pIdx) != 0) goto _OVER;
...@@ -860,4 +860,4 @@ int32_t mndDropIdxsByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { ...@@ -860,4 +860,4 @@ int32_t mndDropIdxsByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
} }
return 0; return 0;
} }
\ No newline at end of file
...@@ -578,7 +578,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, ...@@ -578,7 +578,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans); mndTransSetSerial(pTrans);
mInfo("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId); mInfo("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
SMnodeObj mnodeObj = {0}; SMnodeObj mnodeObj = {0};
mnodeObj.id = pDnode->id; mnodeObj.id = pDnode->id;
...@@ -732,7 +732,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) { ...@@ -732,7 +732,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans); mndTransSetSerial(pTrans);
mInfo("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); mInfo("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj, false) != 0) goto _OVER; if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj, false) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
......
...@@ -388,7 +388,7 @@ static int32_t mndSetCreateSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVg ...@@ -388,7 +388,7 @@ static int32_t mndSetCreateSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVg
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup); SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
if (pVgRaw == NULL) return -1; if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1; if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1;
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_CREATING) != 0) return -1; if (sdbSetRawStatus(pVgRaw, SDB_STATUS_UPDATE) != 0) return -1;
return 0; return 0;
} }
...@@ -622,11 +622,11 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea ...@@ -622,11 +622,11 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "create-sma"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "create-sma");
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
mndTransSetDbName(pTrans, pDb->name, NULL); mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
mndTransSetSerial(pTrans); mndTransSetSerial(pTrans);
mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name); mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name);
if (mndAddPrepareNewVgAction(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
...@@ -845,7 +845,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p ...@@ -845,7 +845,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
mInfo("trans:%d, used to drop sma:%s", pTrans->id, pSma->name); mInfo("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
mndTransSetDbName(pTrans, pDb->name, NULL); mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
mndTransSetSerial(pTrans); mndTransSetSerial(pTrans);
......
...@@ -874,7 +874,7 @@ _OVER: ...@@ -874,7 +874,7 @@ _OVER:
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
mndTransSetDbName(pTrans, pDb->name, pStb->name); mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) return -1; if (mndTransCheckConflict(pMnode, pTrans) != 0) return -1;
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
...@@ -1968,7 +1968,7 @@ static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbOb ...@@ -1968,7 +1968,7 @@ static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbOb
mInfo("trans:%d, used to alter stb:%s", pTrans->id, pStb->name); mInfo("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
mndTransSetDbName(pTrans, pDb->name, pStb->name); mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (needRsp) { if (needRsp) {
void *pCont = NULL; void *pCont = NULL;
...@@ -1998,7 +1998,7 @@ static int32_t mndAlterStbAndUpdateTagIdxImp(SMnode *pMnode, SRpcMsg *pReq, SDbO ...@@ -1998,7 +1998,7 @@ static int32_t mndAlterStbAndUpdateTagIdxImp(SMnode *pMnode, SRpcMsg *pReq, SDbO
mInfo("trans:%d, used to alter stb:%s", pTrans->id, pStb->name); mInfo("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
mndTransSetDbName(pTrans, pDb->name, pStb->name); mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (needRsp) { if (needRsp) {
void *pCont = NULL; void *pCont = NULL;
...@@ -2242,7 +2242,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p ...@@ -2242,7 +2242,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p
mInfo("trans:%d, used to drop stb:%s", pTrans->id, pStb->name); mInfo("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
mndTransSetDbName(pTrans, pDb->name, pStb->name); mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
...@@ -3298,7 +3298,7 @@ static int32_t mndCheckIndexReq(SCreateTagIndexReq *pReq) { ...@@ -3298,7 +3298,7 @@ static int32_t mndCheckIndexReq(SCreateTagIndexReq *pReq) {
mInfo("trans:%d, used to add index to stb:%s", pTrans->id, pStb->name); mInfo("trans:%d, used to add index to stb:%s", pTrans->id, pStb->name);
mndTransSetDbName(pTrans, pDb->name, pStb->name); mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER;
......
...@@ -735,7 +735,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { ...@@ -735,7 +735,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name); mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) { if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mndTransDrop(pTrans); mndTransDrop(pTrans);
goto _OVER; goto _OVER;
} }
...@@ -890,7 +890,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { ...@@ -890,7 +890,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "stream-checkpoint"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "stream-checkpoint");
if (pTrans == NULL) return -1; if (pTrans == NULL) return -1;
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) { if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mndReleaseStream(pMnode, pStream); mndReleaseStream(pMnode, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
...@@ -1001,7 +1001,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { ...@@ -1001,7 +1001,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
mInfo("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name); mInfo("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) { if (mndTransCheckConflict(pMnode, pTrans) != 0) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
...@@ -1369,7 +1369,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { ...@@ -1369,7 +1369,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name); mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) { if (mndTransCheckConflict(pMnode, pTrans) != 0) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
...@@ -1477,7 +1477,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { ...@@ -1477,7 +1477,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name); mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) { if (mndTransCheckConflict(pMnode, pTrans) != 0) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
......
...@@ -549,7 +549,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -549,7 +549,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
} }
mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL); mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) { if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mndTransDrop(pTrans); mndTransDrop(pTrans);
nodesDestroyNode((SNode*)pPlan); nodesDestroyNode((SNode*)pPlan);
return -1; return -1;
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "mndSync.h" #include "mndSync.h"
#include "mndCluster.h" #include "mndCluster.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndVgroup.h"
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (pMsg == NULL || pMsg->pCont == NULL) { if (pMsg == NULL || pMsg->pCont == NULL) {
...@@ -73,76 +74,200 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { ...@@ -73,76 +74,200 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return code; return code;
} }
int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
SMnode *pMnode = pFsm->data; SSdbRow *pRow = NULL;
int32_t code = -1;
if (pAction->msgType == TDMT_MND_CREATE_VG) {
pRow = mndVgroupActionDecode(pAction->pRaw);
if (pRow == NULL) goto _OUT;
SVgObj *pVgroup = sdbGetRowObj(pRow);
if (pVgroup == NULL) goto _OUT;
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
if (maxVgId > pVgroup->vgId) {
mError("trans:%d, failed to satisfy vgroup id %d of prepare action. maxVgId:%d", pTrans->id, pVgroup->vgId,
maxVgId);
goto _OUT;
}
}
code = 0;
_OUT:
taosMemoryFreeClear(pRow);
return code;
}
static int32_t mndTransValidatePrepareStage(SMnode *pMnode, STrans *pTrans) {
int32_t code = -1;
int32_t action = 0;
int32_t numOfActions = taosArrayGetSize(pTrans->prepareActions);
if (numOfActions == 0) {
code = 0;
goto _OUT;
}
mInfo("trans:%d, validate %d prepare actions.", pTrans->id, numOfActions);
for (action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pTrans->prepareActions, action);
if (pAction->actionType != TRANS_ACTION_RAW) {
mError("trans:%d, prepare action:%d of unexpected type:%d", pTrans->id, action, pAction->actionType);
goto _OUT;
}
code = mndTransValidatePrepareAction(pMnode, pTrans, pAction);
if (code != 0) {
mError("trans:%d, failed to validate prepare action: %d, numOfActions:%d", pTrans->id, action, numOfActions);
goto _OUT;
}
}
code = 0;
_OUT:
return code;
}
static int32_t mndTransValidateImp(SMnode *pMnode, STrans *pTrans) {
if (pTrans->stage == TRN_STAGE_PREPARE) {
if (mndTransCheckConflict(pMnode, pTrans) < 0) {
mError("trans:%d, failed to validate trans conflicts.", pTrans->id);
return -1;
}
return mndTransValidatePrepareStage(pMnode, pTrans);
}
return 0;
}
static int32_t mndTransValidate(SMnode *pMnode, SSdbRaw *pRaw) {
STrans *pTrans = NULL;
int32_t code = -1;
SSdbRow *pRow = mndTransDecode(pRaw);
if (pRow == NULL) goto _OUT;
pTrans = sdbGetRowObj(pRow);
if (pTrans == NULL) goto _OUT;
code = mndTransValidateImp(pMnode, pTrans);
_OUT:
if (pTrans) mndTransDropData(pTrans);
if (pRow) taosMemoryFreeClear(pRow);
if (code) terrno = (terrno ? terrno : TSDB_CODE_MND_TRANS_CONFLICT);
return code;
}
int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
terrno = TSDB_CODE_SUCCESS;
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SSdbRaw *pRaw = pMsg->pCont; SSdbRaw *pRaw = pMsg->pCont;
STrans *pTrans = NULL;
int32_t code = -1;
int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw); int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
if (transId <= 0) {
mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq);
terrno = TSDB_CODE_INVALID_MSG;
goto _OUT;
}
mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
" role:%s raw:%p sec:%d seq:%" PRId64, " role:%s raw:%p sec:%d seq:%" PRId64,
transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state), transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state),
pRaw, pMgmt->transSec, pMgmt->transSeq); pRaw, pMgmt->transSec, pMgmt->transSeq);
if (pMeta->code == 0) { code = mndTransValidate(pMnode, pRaw);
int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pRaw); if (code != 0) {
if (code != 0) { mError("trans:%d, failed to validate requested trans since %s", transId, terrstr());
mError("trans:%d, failed to write to sdb since %s", transId, terrstr()); code = 0;
return 0; pMeta->code = terrno;
} goto _OUT;
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); }
code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
if (code != 0) {
mError("trans:%d, failed to write to sdb since %s", transId, terrstr());
code = 0;
pMeta->code = terrno;
goto _OUT;
}
pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans == NULL) {
mError("trans:%d, not found while execute in mnode since %s", transId, terrstr());
goto _OUT;
} }
if (pTrans->stage == TRN_STAGE_PREPARE) {
bool continueExec = mndTransPerformPrepareStage(pMnode, pTrans);
if (!continueExec) goto _OUT;
}
if (pTrans->id != pMgmt->transId) {
mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d",
pTrans->id, pTrans->createdTime, pMgmt->transId);
mndTransRefresh(pMnode, pTrans);
}
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta);
code = 0;
_OUT:
if (pTrans) mndReleaseTrans(pMnode, pTrans);
return code;
}
static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
taosThreadMutexLock(&pMgmt->lock); taosThreadMutexLock(&pMgmt->lock);
pMgmt->errCode = pMeta->code; if (pMgmt->transId == 0) {
goto _OUT;
}
if (transId <= 0) { pMgmt->transId = 0;
taosThreadMutexUnlock(&pMgmt->lock); pMgmt->transSec = 0;
mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq); pMgmt->transSeq = 0;
} else if (transId == pMgmt->transId) { pMgmt->errCode = code;
if (pMgmt->errCode != 0) { tsem_post(&pMgmt->syncSem);
mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
} else { if (pMgmt->errCode != 0) {
mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, transId, pMgmt->transSeq); mError("trans:%d, failed to propose since %s, post sem", pMgmt->transId, tstrerror(pMgmt->errCode));
}
pMgmt->transId = 0;
pMgmt->transSec = 0;
pMgmt->transSeq = 0;
tsem_post(&pMgmt->syncSem);
taosThreadMutexUnlock(&pMgmt->lock);
} else { } else {
taosThreadMutexUnlock(&pMgmt->lock); mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, pMgmt->transId, pMgmt->transSeq);
STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) {
mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d",
transId, pTrans->createdTime, pMgmt->transId);
mndTransExecute(pMnode, pTrans, false);
mndReleaseTrans(pMnode, pTrans);
} else {
mError("trans:%d, not found while execute in mnode since %s", transId, terrstr());
}
} }
sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta); _OUT:
taosThreadMutexUnlock(&pMgmt->lock);
return 0; return 0;
} }
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
int32_t code = 0; SMnode *pMnode = pFsm->data;
int32_t code = pMsg->code;
if (code != 0) {
goto _OUT;
}
pMsg->info.conn.applyIndex = pMeta->index; pMsg->info.conn.applyIndex = pMeta->index;
pMsg->info.conn.applyTerm = pMeta->term; pMsg->info.conn.applyTerm = pMeta->term;
pMeta->code = 0;
if (pMsg->code == 0) { atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex);
SMnode *pMnode = pFsm->data;
atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex);
}
if (!syncUtilUserCommit(pMsg->msgType)) { if (!syncUtilUserCommit(pMsg->msgType)) {
goto _out; goto _OUT;
} }
code = mndProcessWriteMsg(pFsm, pMsg, pMeta);
_out: code = mndProcessWriteMsg(pMnode, pMsg, pMeta);
_OUT:
mndPostMgmtCode(pMnode, code ? code : pMeta->code);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL; pMsg->pCont = NULL;
return code; return code;
......
...@@ -753,7 +753,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -753,7 +753,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
} }
mndTransSetDbName(pTrans, pTopic->db, NULL); mndTransSetDbName(pTrans, pTopic->db, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) { if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
......
...@@ -28,7 +28,6 @@ ...@@ -28,7 +28,6 @@
#define VGROUP_VER_NUMBER 1 #define VGROUP_VER_NUMBER 1
#define VGROUP_RESERVE_SIZE 64 #define VGROUP_RESERVE_SIZE 64
static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw);
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew); static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
...@@ -483,15 +482,15 @@ static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t v ...@@ -483,15 +482,15 @@ static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t v
return pReq; return pReq;
} }
static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dstVgId, int32_t *pContLen) { static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, int32_t srcVgId, SVgObj *pVgroup, int32_t *pContLen) {
SAlterVnodeHashRangeReq alterReq = { SAlterVnodeHashRangeReq alterReq = {
.srcVgId = pVgroup->vgId, .srcVgId = srcVgId,
.dstVgId = dstVgId, .dstVgId = pVgroup->vgId,
.hashBegin = pVgroup->hashBegin, .hashBegin = pVgroup->hashBegin,
.hashEnd = pVgroup->hashEnd, .hashEnd = pVgroup->hashEnd,
}; };
mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", pVgroup->vgId, dstVgId, mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", srcVgId, pVgroup->vgId,
pVgroup->hashBegin, pVgroup->hashEnd); pVgroup->hashBegin, pVgroup->hashEnd);
int32_t contLen = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &alterReq); int32_t contLen = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &alterReq);
if (contLen < 0) { if (contLen < 0) {
...@@ -1207,12 +1206,12 @@ int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD ...@@ -1207,12 +1206,12 @@ int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD
return 0; return 0;
} }
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dstVgId) { static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, int32_t srcVgId, SVgObj *pVgroup) {
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildAlterVnodeHashRangeReq(pMnode, pVgroup, dstVgId, &contLen); void *pReq = mndBuildAlterVnodeHashRangeReq(pMnode, srcVgId, pVgroup, &contLen);
if (pReq == NULL) return -1; if (pReq == NULL) return -1;
action.pCont = pReq; action.pCont = pReq;
...@@ -1247,6 +1246,21 @@ int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb ...@@ -1247,6 +1246,21 @@ int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb
return 0; return 0;
} }
int32_t mndAddPrepareNewVgAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
if (pRaw == NULL) goto _err;
STransAction action = {.pRaw = pRaw, .msgType = TDMT_MND_CREATE_VG};
if (mndTransAppendPrepareAction(pTrans, &action) != 0) goto _err;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
pRaw = NULL;
return 0;
_err:
sdbFreeRaw(pRaw);
return -1;
}
int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) { int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
if (pDnode == NULL) return -1; if (pDnode == NULL) return -1;
...@@ -2241,10 +2255,13 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, ...@@ -2241,10 +2255,13 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans,
return 0; return 0;
} }
static int32_t mndTransCommitVgStatus(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus) { typedef int32_t (*FpTransActionCb)(STrans *pTrans, SSdbRaw *pRaw);
static int32_t mndAddVgStatusAction(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus, ETrnStage stage) {
FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
SSdbRaw *pRaw = mndVgroupActionEncode(pVg); SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
if (pRaw == NULL) goto _err; if (pRaw == NULL) goto _err;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _err; if (appendActionCb(pTrans, pRaw) != 0) goto _err;
(void)sdbSetRawStatus(pRaw, vgStatus); (void)sdbSetRawStatus(pRaw, vgStatus);
pRaw = NULL; pRaw = NULL;
return 0; return 0;
...@@ -2253,18 +2270,32 @@ _err: ...@@ -2253,18 +2270,32 @@ _err:
return -1; return -1;
} }
static int32_t mndAddDbStatusAction(STrans *pTrans, SDbObj *pDb, ESdbStatus dbStatus, ETrnStage stage) {
FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
SSdbRaw *pRaw = mndDbActionEncode(pDb);
if (pRaw == NULL) goto _err;
if (appendActionCb(pTrans, pRaw) != 0) goto _err;
(void)sdbSetRawStatus(pRaw, dbStatus);
pRaw = NULL;
return 0;
_err:
sdbFreeRaw(pRaw);
return -1;
}
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) { int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = NULL; STrans *pTrans = NULL;
SSdbRaw *pRaw = NULL;
SDbObj dbObj = {0}; SDbObj dbObj = {0};
SArray *pArray = mndBuildDnodesArray(pMnode, 0); SArray *pArray = mndBuildDnodesArray(pMnode, 0);
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "split-vgroup"); pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "split-vgroup");
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans); mndTransSetSerial(pTrans);
mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId); mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId);
mndTransSetDbName(pTrans, pDb->name, NULL);
SVgObj newVg1 = {0}; SVgObj newVg1 = {0};
memcpy(&newVg1, pVgroup, sizeof(SVgObj)); memcpy(&newVg1, pVgroup, sizeof(SVgObj));
mInfo("vgId:%d, vgroup info before split, replica:%d hashBegin:%u hashEnd:%u", newVg1.vgId, newVg1.replica, mInfo("vgId:%d, vgroup info before split, replica:%d hashBegin:%u hashEnd:%u", newVg1.vgId, newVg1.replica,
...@@ -2316,45 +2347,54 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro ...@@ -2316,45 +2347,54 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
// alter vgId and hash range // alter vgId and hash range
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg1, maxVgId) != 0) goto _OVER; int32_t srcVgId = newVg1.vgId;
newVg1.vgId = maxVgId; newVg1.vgId = maxVgId;
if (mndAddPrepareNewVgAction(pMnode, pTrans, &newVg1) != 0) goto _OVER;
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1) != 0) goto _OVER;
maxVgId++; maxVgId++;
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg2, maxVgId) != 0) goto _OVER; srcVgId = newVg2.vgId;
newVg2.vgId = maxVgId; newVg2.vgId = maxVgId;
if (mndAddPrepareNewVgAction(pMnode, pTrans, &newVg2) != 0) goto _OVER;
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2) != 0) goto _OVER;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2) != 0) goto _OVER; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2) != 0) goto _OVER;
if (mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION) < 0) goto _OVER;
if (mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION) < 0) goto _OVER;
if (mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_REDO_ACTION) < 0) goto _OVER;
// update db status
memcpy(&dbObj, pDb, sizeof(SDbObj));
if (dbObj.cfg.pRetensions != NULL) {
dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions, NULL);
if (dbObj.cfg.pRetensions == NULL) goto _OVER;
}
dbObj.vgVersion++;
dbObj.updateTime = taosGetTimestampMs();
dbObj.cfg.numOfVgroups++;
if (mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION) < 0) goto _OVER;
// adjust vgroup replica // adjust vgroup replica
if (pDb->cfg.replications != newVg1.replica) { if (pDb->cfg.replications != newVg1.replica) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER; if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER;
} else { } else {
if (mndTransCommitVgStatus(pTrans, &newVg1, SDB_STATUS_READY) < 0) goto _OVER; if (mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION) < 0) goto _OVER;
} }
if (pDb->cfg.replications != newVg2.replica) { if (pDb->cfg.replications != newVg2.replica) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER; if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER;
} else { } else {
if (mndTransCommitVgStatus(pTrans, &newVg2, SDB_STATUS_READY) < 0) goto _OVER; if (mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION) < 0) goto _OVER;
} }
if (mndTransCommitVgStatus(pTrans, pVgroup, SDB_STATUS_DROPPED) < 0) goto _OVER; if (mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_COMMIT_ACTION) < 0) goto _OVER;
memcpy(&dbObj, pDb, sizeof(SDbObj)); // commit db status
if (dbObj.cfg.pRetensions != NULL) {
dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions, NULL);
if (dbObj.cfg.pRetensions == NULL) goto _OVER;
}
dbObj.vgVersion++; dbObj.vgVersion++;
dbObj.updateTime = taosGetTimestampMs(); dbObj.updateTime = taosGetTimestampMs();
dbObj.cfg.numOfVgroups++; if (mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION) < 0) goto _OVER;
pRaw = mndDbActionEncode(&dbObj);
if (pRaw == NULL) goto _OVER;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
pRaw = NULL;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
...@@ -2362,7 +2402,6 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro ...@@ -2362,7 +2402,6 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
_OVER: _OVER:
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
mndTransDrop(pTrans); mndTransDrop(pTrans);
sdbFreeRaw(pRaw);
taosArrayDestroy(dbObj.cfg.pRetensions); taosArrayDestroy(dbObj.cfg.pRetensions);
return code; return code;
} }
......
...@@ -122,6 +122,7 @@ typedef enum { ...@@ -122,6 +122,7 @@ typedef enum {
SDB_STATUS_DROPPING = 2, SDB_STATUS_DROPPING = 2,
SDB_STATUS_DROPPED = 3, SDB_STATUS_DROPPED = 3,
SDB_STATUS_READY = 4, SDB_STATUS_READY = 4,
SDB_STATUS_UPDATE = 5,
} ESdbStatus; } ESdbStatus;
typedef enum { typedef enum {
......
...@@ -256,6 +256,7 @@ int32_t sdbWriteWithoutFree(SSdb *pSdb, SSdbRaw *pRaw) { ...@@ -256,6 +256,7 @@ int32_t sdbWriteWithoutFree(SSdb *pSdb, SSdbRaw *pRaw) {
code = sdbInsertRow(pSdb, hash, pRaw, pRow, keySize); code = sdbInsertRow(pSdb, hash, pRaw, pRow, keySize);
break; break;
case SDB_STATUS_READY: case SDB_STATUS_READY:
case SDB_STATUS_UPDATE:
case SDB_STATUS_DROPPING: case SDB_STATUS_DROPPING:
code = sdbUpdateRow(pSdb, hash, pRaw, pRow, keySize); code = sdbUpdateRow(pSdb, hash, pRaw, pRow, keySize);
break; break;
......
...@@ -879,9 +879,13 @@ static int32_t metaFilterTableByHash(SMeta *pMeta, SArray *uidList) { ...@@ -879,9 +879,13 @@ static int32_t metaFilterTableByHash(SMeta *pMeta, SArray *uidList) {
SDecoder dc = {0}; SDecoder dc = {0};
tDecoderInit(&dc, pData, nData); tDecoderInit(&dc, pData, nData);
metaDecodeEntry(&dc, &me); metaDecodeEntry(&dc, &me);
if (me.type != TSDB_SUPER_TABLE) { if (me.type != TSDB_SUPER_TABLE) {
int32_t ret = vnodeValidateTableHash(pMeta->pVnode, me.name); char tbFName[TSDB_TABLE_FNAME_LEN + 1];
if (TSDB_CODE_VND_HASH_MISMATCH == ret) { snprintf(tbFName, sizeof(tbFName), "%s.%s", pMeta->pVnode->config.dbname, me.name);
tbFName[TSDB_TABLE_FNAME_LEN] = '\0';
int32_t ret = vnodeValidateTableHash(pMeta->pVnode, tbFName);
if (ret < 0 && terrno == TSDB_CODE_VND_HASH_MISMATCH) {
taosArrayPush(uidList, &me.uid); taosArrayPush(uidList, &me.uid);
} }
} }
...@@ -910,6 +914,7 @@ int32_t metaTrimTables(SMeta *pMeta) { ...@@ -910,6 +914,7 @@ int32_t metaTrimTables(SMeta *pMeta) {
goto end; goto end;
} }
metaInfo("vgId:%d, trim %ld tables", TD_VID(pMeta->pVnode), taosArrayGetSize(tbUids));
metaDropTables(pMeta, tbUids); metaDropTables(pMeta, tbUids);
end: end:
......
...@@ -325,7 +325,7 @@ int vnodeValidateTableHash(SVnode *pVnode, char *tableFName) { ...@@ -325,7 +325,7 @@ int vnodeValidateTableHash(SVnode *pVnode, char *tableFName) {
if (hashValue < pVnode->config.hashBegin || hashValue > pVnode->config.hashEnd) { if (hashValue < pVnode->config.hashBegin || hashValue > pVnode->config.hashEnd) {
terrno = TSDB_CODE_VND_HASH_MISMATCH; terrno = TSDB_CODE_VND_HASH_MISMATCH;
return TSDB_CODE_VND_HASH_MISMATCH; return -1;
} }
return 0; return 0;
......
...@@ -431,7 +431,7 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm ...@@ -431,7 +431,7 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm
return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg); return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
} }
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
if (pMsg->code == 0) { if (pMsg->code == 0) {
return vnodeSyncApplyMsg(pFsm, pMsg, pMeta); return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
} }
...@@ -451,7 +451,7 @@ static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFs ...@@ -451,7 +451,7 @@ static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFs
return 0; return 0;
} }
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
if (pMeta->isWeak == 1) { if (pMeta->isWeak == 1) {
return vnodeSyncApplyMsg(pFsm, pMsg, pMeta); return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
} }
...@@ -463,7 +463,7 @@ static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) { ...@@ -463,7 +463,7 @@ static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) {
return atomic_load_64(&pVnode->state.applied); return atomic_load_64(&pVnode->state.applied);
} }
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state), pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),
......
...@@ -618,8 +618,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_ ...@@ -618,8 +618,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_
return -1; return -1;
} }
// not restored, vnode enable if (!pSyncNode->restoreFinish) {
if (!pSyncNode->restoreFinish && pSyncNode->vgId != 1) {
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY; terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64, sNError(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64,
TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex); TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
......
...@@ -275,7 +275,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CONFLICT, "Conflict transaction ...@@ -275,7 +275,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CONFLICT, "Conflict transaction
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CLOG_IS_NULL, "Transaction commitlog is null") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CLOG_IS_NULL, "Transaction commitlog is null")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL, "Unable to establish connection While execute transaction and will continue in the background") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL, "Unable to establish connection While execute transaction and will continue in the background")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED, "Last Transaction not finished") TAOS_DEFINE_ERROR(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED, "Last Transaction not finished")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRNAS_SYNC_TIMEOUT, "Sync timeout While execute transaction and will continue in the background") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_SYNC_TIMEOUT, "Sync timeout While execute transaction and will continue in the background")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error")
// mnode-mq // mnode-mq
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册