提交 cb5ec76c 编写于 作者: S Shengliang Guan

enh: check conflict before create trans

上级 e6c6de96
...@@ -557,6 +557,8 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, ...@@ -557,6 +557,8 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
mInfo("trans:%d, used to create db:%s", pTrans->id, pCreate->db); mInfo("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
mndTransSetDbName(pTrans, dbObj.name, NULL); mndTransSetDbName(pTrans, dbObj.name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
mndTransSetOper(pTrans, MND_OPER_CREATE_DB); mndTransSetOper(pTrans, MND_OPER_CREATE_DB);
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
...@@ -776,7 +778,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p ...@@ -776,7 +778,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p
int32_t code = -1; int32_t code = -1;
mndTransSetDbName(pTrans, pOld->name, NULL); mndTransSetDbName(pTrans, pOld->name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) return -1; if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
...@@ -1038,7 +1040,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) { ...@@ -1038,7 +1040,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
mInfo("trans:%d, used to drop db:%s", pTrans->id, pDb->name); mInfo("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
mndTransSetDbName(pTrans, pDb->name, NULL); mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndCheckTopicExist(pMnode, pDb) < 0) goto _OVER; if (mndCheckTopicExist(pMnode, pDb) < 0) goto _OVER;
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
......
...@@ -595,6 +595,8 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea ...@@ -595,6 +595,8 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "create-sma"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "create-sma");
if (pTrans == NULL) goto _OVER; if (pTrans == NULL) goto _OVER;
mndTransSetDbName(pTrans, pDb->name, NULL); mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
mndTransSetSerial(pTrans); mndTransSetSerial(pTrans);
mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name); mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name);
...@@ -809,6 +811,8 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p ...@@ -809,6 +811,8 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
mInfo("trans:%d, used to drop sma:%s", pTrans->id, pSma->name); mInfo("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
mndTransSetDbName(pTrans, pDb->name, NULL); mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
mndTransSetSerial(pTrans); mndTransSetSerial(pTrans);
char streamName[TSDB_TABLE_FNAME_LEN] = {0}; char streamName[TSDB_TABLE_FNAME_LEN] = {0};
......
...@@ -823,6 +823,7 @@ _OVER: ...@@ -823,6 +823,7 @@ _OVER:
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
mndTransSetDbName(pTrans, pDb->name, pStb->name); mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) return -1;
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
...@@ -1856,6 +1857,7 @@ static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbOb ...@@ -1856,6 +1857,7 @@ static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbOb
mInfo("trans:%d, used to alter stb:%s", pTrans->id, pStb->name); mInfo("trans:%d, used to alter stb:%s", pTrans->id, pStb->name);
mndTransSetDbName(pTrans, pDb->name, pStb->name); mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (needRsp) { if (needRsp) {
void *pCont = NULL; void *pCont = NULL;
...@@ -2055,6 +2057,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p ...@@ -2055,6 +2057,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p
mInfo("trans:%d, used to drop stb:%s", pTrans->id, pStb->name); mInfo("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
mndTransSetDbName(pTrans, pDb->name, pStb->name); mndTransSetDbName(pTrans, pDb->name, pStb->name);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
......
...@@ -640,9 +640,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { ...@@ -640,9 +640,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
goto _OVER; goto _OVER;
} }
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb);
mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name); mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
// create stb for stream // create stb for stream
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) { if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr()); mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr());
...@@ -790,6 +792,12 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { ...@@ -790,6 +792,12 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "stream-checkpoint"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "stream-checkpoint");
if (pTrans == NULL) return -1; if (pTrans == NULL) return -1;
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
mndReleaseStream(pMnode, pStream);
mndTransDrop(pTrans);
return -1;
}
taosRLockLatch(&pStream->lock); taosRLockLatch(&pStream->lock);
// 1. redo action: broadcast checkpoint source msg for all source vg // 1. redo action: broadcast checkpoint source msg for all source vg
int32_t totLevel = taosArrayGetSize(pStream->tasks); int32_t totLevel = taosArrayGetSize(pStream->tasks);
...@@ -882,11 +890,11 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { ...@@ -882,11 +890,11 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
} }
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
sdbRelease(pMnode->pSdb, pStream);
return -1; return -1;
} }
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stream"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stream");
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
...@@ -894,6 +902,13 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { ...@@ -894,6 +902,13 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
} }
mInfo("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name); mInfo("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
// drop all tasks // drop all tasks
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr()); mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
......
...@@ -442,7 +442,12 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -442,7 +442,12 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) { static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
if (pTrans == NULL) return -1; if (pTrans == NULL) return -1;
mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL); mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
mndTransDrop(pTrans);
return -1;
}
// make txn: // make txn:
// 1. redo action: action to all vg // 1. redo action: action to all vg
......
...@@ -706,13 +706,19 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -706,13 +706,19 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
#endif #endif
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "drop-topic"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "drop-topic");
mndTransSetDbName(pTrans, pTopic->db, NULL);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
return -1; return -1;
} }
mndTransSetDbName(pTrans, pTopic->db, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
mndReleaseTopic(pMnode, pTopic);
mndTransDrop(pTrans);
return -1;
}
mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
// TODO check if rebalancing // TODO check if rebalancing
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册