From 44a9695460a9485edc95d27ec4878c7cb67c708a Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 17 May 2022 04:09:25 +0800 Subject: [PATCH] enh(tmq): cascade drop --- include/util/taoserror.h | 1 + source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/inc/mndOffset.h | 3 ++ source/dnode/mnode/impl/inc/mndSubscribe.h | 1 + source/dnode/mnode/impl/inc/mndTopic.h | 2 + source/dnode/mnode/impl/src/mndConsumer.c | 18 +++++-- source/dnode/mnode/impl/src/mndOffset.c | 52 +++++++++++++++++- source/dnode/mnode/impl/src/mndSubscribe.c | 56 ++++++++++++++++++- source/dnode/mnode/impl/src/mndTopic.c | 63 +++++++++++++++++----- source/util/src/terror.c | 1 + 10 files changed, 176 insertions(+), 22 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 96fb210e26..d3fe936e88 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -281,6 +281,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E8) #define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9) #define TSDB_CODE_MND_CONSUMER_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x03EA) +#define TSDB_CODE_MND_TOPIC_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x03EB) // mnode-stream #define TSDB_CODE_MND_STREAM_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F0) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 5eb6866387..0ce67dd3c2 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -459,6 +459,7 @@ typedef struct { char* ast; char* physicalPlan; SSchemaWrapper schema; + int32_t refConsumerCnt; } SMqTopicObj; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndOffset.h b/source/dnode/mnode/impl/inc/mndOffset.h index b468496165..900181858b 100644 --- a/source/dnode/mnode/impl/inc/mndOffset.h +++ b/source/dnode/mnode/impl/inc/mndOffset.h @@ -38,6 +38,9 @@ static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, c } int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); +int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic); + +bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndSubscribe.h b/source/dnode/mnode/impl/inc/mndSubscribe.h index 5ca672e8dd..50cede62ce 100644 --- a/source/dnode/mnode/impl/inc/mndSubscribe.h +++ b/source/dnode/mnode/impl/inc/mndSubscribe.h @@ -32,6 +32,7 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub); int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName); int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); +int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndTopic.h b/source/dnode/mnode/impl/inc/mndTopic.h index be3f9c3283..d7e6f9c87b 100644 --- a/source/dnode/mnode/impl/inc/mndTopic.h +++ b/source/dnode/mnode/impl/inc/mndTopic.h @@ -35,6 +35,8 @@ int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]); +int32_t mndSetTopicRedoLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 9c8c6d32eb..76b8081849 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -399,6 +399,9 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { int32_t newTopicNum = taosArrayGetSize(newSub); // check topic existance + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg); + if (pTrans == NULL) goto SUBSCRIBE_OVER; + for (int32_t i = 0; i < newTopicNum; i++) { char *topic = taosArrayGetP(newSub, i); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); @@ -406,7 +409,14 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; goto SUBSCRIBE_OVER; } - // TODO lock topic to prevent drop + + // ref topic to prevent drop + // TODO make topic complete + SMqTopicObj topicObj = {0}; + memcpy(&topicObj, pTopic, sizeof(SMqTopicObj)); + topicObj.refConsumerCnt = pTopic->refConsumerCnt + 1; + if (mndSetTopicRedoLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER; + mndReleaseTopic(pMnode, pTopic); } @@ -422,8 +432,6 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy); } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg); - if (pTrans == NULL) goto SUBSCRIBE_OVER; if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER; @@ -494,8 +502,6 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { goto SUBSCRIBE_OVER; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg); - if (pTrans == NULL) goto SUBSCRIBE_OVER; if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER; } @@ -503,6 +509,8 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { code = TSDB_CODE_MND_ACTION_IN_PROGRESS; SUBSCRIBE_OVER: + mndTransDrop(pTrans); + if (pConsumerOld) { /*taosRUnLockLatch(&pConsumerOld->lock);*/ mndReleaseConsumer(pMnode, pConsumerOld); diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index 24a7c0d389..a7d174c9c5 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -50,6 +50,14 @@ int32_t mndInitOffset(SMnode *pMnode) { void mndCleanupOffset(SMnode *pMnode) {} +bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic) { + int32_t i = 0; + while (pOffset->key[i] != ':') i++; + while (pOffset->key[i] != ':') i++; + if (strcmp(&pOffset->key[i + 1], topic) == 0) return true; + return false; +} + SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset) { terrno = TSDB_CODE_OUT_OF_MEMORY; void *buf = NULL; @@ -134,10 +142,11 @@ int32_t mndCreateOffsets(STrans *pTrans, const char *cgroup, const char *topicNa int32_t sz = taosArrayGetSize(vgs); for (int32_t i = 0; i < sz; i++) { int32_t vgId = *(int32_t *)taosArrayGet(vgs, i); - SMqOffsetObj offsetObj; + SMqOffsetObj offsetObj = {0}; if (mndMakePartitionKey(offsetObj.key, cgroup, topicName, vgId) < 0) { return -1; } + // TODO assign db offsetObj.offset = -1; SSdbRaw *pOffsetRaw = mndOffsetActionEncode(&offsetObj); if (pOffsetRaw == NULL) { @@ -240,6 +249,14 @@ static int32_t mndSetDropOffsetCommitLogs(SMnode *pMnode, STrans *pTrans, SMqOff return 0; } +static int32_t mndSetDropOffsetRedoLogs(SMnode *pMnode, STrans *pTrans, SMqOffsetObj *pOffset) { + SSdbRaw *pRedoRaw = mndOffsetActionEncode(pOffset); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED) != 0) return -1; + return 0; +} + int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { int32_t code = -1; SSdb *pSdb = pMnode->pSdb; @@ -247,7 +264,7 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { void *pIter = NULL; SMqOffsetObj *pOffset = NULL; while (1) { - pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pOffset); + pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pOffset); if (pIter == NULL) break; if (pOffset->dbUid != pDb->uid) { @@ -256,8 +273,39 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { } if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) { + sdbRelease(pSdb, pOffset); + goto END; + } + + sdbRelease(pSdb, pOffset); + } + + code = 0; +END: + return code; +} + +int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) { + int32_t code = -1; + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + SMqOffsetObj *pOffset = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pOffset); + if (pIter == NULL) break; + + if (!mndOffsetFromTopic(pOffset, topic)) { + sdbRelease(pSdb, pOffset); + continue; + } + + if (mndSetDropOffsetRedoLogs(pMnode, pTrans, pOffset) < 0) { + sdbRelease(pSdb, pOffset); goto END; } + + sdbRelease(pSdb, pOffset); } code = 0; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 5ad4863322..4a0882f3fe 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -389,7 +389,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR } static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebOutputObj *pOutput) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_REBALANCE, &pMsg->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_REBALANCE, &pMsg->rpcMsg); if (pTrans == NULL) { return -1; } @@ -458,6 +458,20 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO goto REB_FAIL; } } + if (consumerNum) { + char topic[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CGROUP_LEN]; + mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); + if (pTopic) { + // TODO make topic complete + SMqTopicObj topicObj = {0}; + memcpy(&topicObj, pTopic, sizeof(SMqTopicObj)); + topicObj.refConsumerCnt = pTopic->refConsumerCnt - consumerNum; + if (mndSetTopicRedoLogs(pMnode, pTrans, &topicObj) != 0) goto REB_FAIL; + } + } + // 4. TODO commit log: modification log // 5. set cb @@ -688,6 +702,14 @@ static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pRsp) { return 0; } +static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { + SSdbRaw *pRedoRaw = mndSubActionEncode(pSub); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED) != 0) return -1; + return 0; +} + static int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { SSdbRaw *pCommitRaw = mndSubActionEncode(pSub); if (pCommitRaw == NULL) return -1; @@ -712,6 +734,38 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { } if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { + sdbRelease(pSdb, pSub); + goto END; + } + } + + code = 0; +END: + return code; +} + +int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) { + int32_t code = -1; + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + SMqSubscribeObj *pSub = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub); + if (pIter == NULL) break; + + char topic[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CGROUP_LEN]; + mndSplitSubscribeKey(pSub->key, topic, cgroup, true); + if (strcmp(topic, topicName) != 0) { + sdbRelease(pSdb, pSub); + continue; + } + + // iter all vnode to delete handle + + if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) { + sdbRelease(pSdb, pSub); goto END; } } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index bd2923ac1a..bc650ec95b 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -18,8 +18,10 @@ #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" +#include "mndOffset.h" #include "mndShow.h" #include "mndStb.h" +#include "mndSubscribe.h" #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" @@ -106,6 +108,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema); SDB_SET_INT32(pRaw, dataPos, schemaLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER); + SDB_SET_INT32(pRaw, dataPos, pTopic->refConsumerCnt, TOPIC_ENCODE_OVER); SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER); @@ -190,6 +193,8 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { goto TOPIC_DECODE_OVER; } + SDB_GET_INT32(pRaw, dataPos, &pTopic->refConsumerCnt, TOPIC_DECODE_OVER); + SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER); terrno = TSDB_CODE_SUCCESS; @@ -220,11 +225,13 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime); atomic_exchange_32(&pOldTopic->version, pNewTopic->version); - taosWLockLatch(&pOldTopic->lock); + atomic_store_32(&pOldTopic->refConsumerCnt, pNewTopic->refConsumerCnt); + + /*taosWLockLatch(&pOldTopic->lock);*/ // TODO handle update - taosWUnLockLatch(&pOldTopic->lock); + /*taosWUnLockLatch(&pOldTopic->lock);*/ return 0; } @@ -292,6 +299,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq topicObj.version = 1; topicObj.sql = strdup(pCreate->sql); topicObj.sqlLen = strlen(pCreate->sql) + 1; + topicObj.refConsumerCnt = 0; if (pCreate->ast && pCreate->ast[0]) { topicObj.ast = strdup(pCreate->ast); @@ -436,15 +444,7 @@ CREATE_TOPIC_OVER: return code; } -static int32_t mndDropTopic(SMnode *pMnode, SNodeMsg *pReq, SMqTopicObj *pTopic) { - // TODO: cannot drop when subscribed - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg); - if (pTrans == NULL) { - mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); - return -1; - } - mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); - +static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SNodeMsg *pReq, SMqTopicObj *pTopic) { SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic); if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); @@ -465,6 +465,7 @@ static int32_t mndDropTopic(SMnode *pMnode, SNodeMsg *pReq, SMqTopicObj *pTopic) static int32_t mndProcessDropTopicReq(SNodeMsg *pReq) { SMnode *pMnode = pReq->pNode; + SSdb *pSdb = pMnode->pSdb; SMDropTopicReq dropReq = {0}; if (tDeserializeSMDropTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) { @@ -485,10 +486,35 @@ static int32_t mndProcessDropTopicReq(SNodeMsg *pReq) { return -1; } } - // TODO: check ref - int32_t code = mndDropTopic(pMnode, pReq, pTopic); - // TODO: iterate and drop related subscriptions and offsets + // check ref + if (pTopic->refConsumerCnt != 0) { + terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; + mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); + return -1; + } + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg); + if (pTrans == NULL) { + mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); + return -1; + } + + mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); + +#if 1 + if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) { + ASSERT(0); + return -1; + } +#endif + + if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) { + ASSERT(0); + return -1; + } + + int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic); mndReleaseTopic(pMnode, pTopic); if (code != 0) { @@ -577,6 +603,15 @@ static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB return numOfRows; } +int32_t mndSetTopicRedoLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) { + SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1; + + return 0; +} + static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) { SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic); if (pCommitRaw == NULL) return -1; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8a2a60d3e2..1aa0064ab3 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -285,6 +285,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_QUERY, "Topic with invalid qu TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_OPTION, "Topic with invalid option") TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_EXIST, "Consumer not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer waiting for rebalance") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_SUBSCRIBED, "Topic subscribed cannot be dropped") // mnode-sma TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists") -- GitLab