From 1013b06107fa060fe86eb2b6e0348042f767a3e4 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 31 May 2022 19:24:26 +0800 Subject: [PATCH] refactor: make trans support global conflict --- source/dnode/mnode/impl/inc/mndDef.h | 2 +- source/dnode/mnode/impl/src/mndConsumer.c | 6 +++--- source/dnode/mnode/impl/src/mndOffset.c | 2 +- source/dnode/mnode/impl/src/mndStream.c | 2 +- source/dnode/mnode/impl/src/mndSubscribe.c | 13 ++++++------- source/dnode/mnode/impl/src/mndTopic.c | 4 ++-- 6 files changed, 14 insertions(+), 15 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index d1fd98137d..9608a84a10 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -122,7 +122,7 @@ typedef struct { int64_t dbUid; char dbname[TSDB_DB_FNAME_LEN]; char lastError[TSDB_TRANS_ERROR_LEN]; - char desc[TSDB_TRANS_DESC_LEN]; + char o[TSDB_TRANS_DESC_LEN]; int32_t startFunc; int32_t stopFunc; int32_t paramLen; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 470b7ac1bf..0314891d59 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -97,7 +97,7 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) { mndReleaseConsumer(pMnode, pConsumer); - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg); if (pTrans == NULL) goto FAIL; if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; @@ -121,7 +121,7 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { mndReleaseConsumer(pMnode, pConsumer); - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg); if (pTrans == NULL) goto FAIL; if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; @@ -403,7 +403,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { int32_t newTopicNum = taosArrayGetSize(newSub); // check topic existance - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg); if (pTrans == NULL) goto SUBSCRIBE_OVER; for (int32_t i = 0; i < newTopicNum; i++) { diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index 8dfcbfbf3c..00c8bb30d0 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -179,7 +179,7 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) { tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq); - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg); for (int32_t i = 0; i < commitOffsetReq.num; i++) { SMqOffset *pOffset = &commitOffsetReq.offsets[i]; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 246d5dc951..5ee5b06a57 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -402,7 +402,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq tstrncpy(streamObj.targetDb, pDb->name, TSDB_DB_FNAME_LEN); } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq); if (pTrans == NULL) { mError("stream:%s, failed to create since %s", pCreate->name, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 7f422e2919..e7ce1cbe78 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -393,8 +393,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR mInfo("rebalance calculation completed, rebalanced vg:"); for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) { SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i); - mInfo("vg: %d moved from consumer %ld to consumer %ld", pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, - pOutputRebVg->newConsumerId); + mInfo("vgId:%d moved from consumer %" PRId64 " to consumer %" PRId64, pOutputRebVg->pVgEp->vgId, + pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId); } // 9. clear @@ -404,10 +404,9 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR } static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pMsg); - if (pTrans == NULL) { - return -1; - } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg); + if (pTrans == NULL) return -1; + // make txn: // 1. redo action: action to all vg const SArray *rebVgs = pOutput->rebVgs; @@ -623,7 +622,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq); if (pTrans == NULL) { mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 28f7ae2607..74d8f9c2b6 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -377,7 +377,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * topicObj.withSchema = 1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq); if (pTrans == NULL) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); taosMemoryFreeClear(topicObj.ast); @@ -545,7 +545,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } #endif - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq); if (pTrans == NULL) { mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); return -1; -- GitLab