diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 647da79e563a9d697c556b73c6aad2b701516fb2..cc11254dcdbb6ff511a487e44405c578c1501ace 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1664,6 +1664,10 @@ typedef struct { int32_t tSerializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq); int32_t tDeserializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq); +typedef struct { + int8_t reserved; +} SMDropCgroupRsp; + typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t alterType; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 4687062b180b719cbfa0aea18948d52face10de6..dc67c8904284ff44a33471cebfdc2a71f7fa91d3 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -151,6 +151,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "mnode-mq-consumer-lost", SMqConsumerLostMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "mnode-mq-consumer-recover", SMqConsumerRecoverMsg, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "mnode-mq-drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp) TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-mq-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "mnode-create-stream", SCMCreateStreamReq, SCMCreateStreamRsp) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "mnode-alter-stream", NULL, NULL) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index a924719cf9d1355ce745267b39481b9dbd349faf..2b2ce31673c5fc261187ea4a3dfc120228aa8400 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -268,6 +268,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9) #define TSDB_CODE_MND_CONSUMER_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x03EA) #define TSDB_CODE_MND_TOPIC_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x03EB) +#define TSDB_CODE_MND_CGROUP_USED TAOS_DEF_ERROR_CODE(0, 0x03EC) // mnode-stream #define TSDB_CODE_MND_STREAM_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F0) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 6318f2e3f2b790b5aff25e774c762a8bb5f4c4da..9c62214b0eec6e0736dc20fb7653ca91188bfe84 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -94,6 +94,7 @@ typedef enum { TRN_TYPE_ALTER_STREAM = 1027, TRN_TYPE_CONSUMER_LOST = 1028, TRN_TYPE_CONSUMER_RECOVER = 1029, + TRN_TYPE_DROP_CGROUP = 1030, TRN_TYPE_BASIC_SCOPE_END, TRN_TYPE_GLOBAL_SCOPE = 2000, diff --git a/source/dnode/mnode/impl/inc/mndOffset.h b/source/dnode/mnode/impl/inc/mndOffset.h index 900181858bd724873ea948d450e830cc83643463..f7569b964875bbffe90c8fc5525fda8f68b688b8 100644 --- a/source/dnode/mnode/impl/inc/mndOffset.h +++ b/source/dnode/mnode/impl/inc/mndOffset.h @@ -39,6 +39,7 @@ static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, c int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic); +int32_t mndDropOffsetBySubKey(SMnode *pMnode, STrans *pTrans, const char *subKey); bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic); diff --git a/source/dnode/mnode/impl/inc/mndSubscribe.h b/source/dnode/mnode/impl/inc/mndSubscribe.h index 50cede62ce424ae855f46ba0f359b5088058e4d1..d91c2bd4c3f69063420f3a775f6183e3eaa3824d 100644 --- a/source/dnode/mnode/impl/inc/mndSubscribe.h +++ b/source/dnode/mnode/impl/inc/mndSubscribe.h @@ -33,6 +33,7 @@ int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic); +int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index dca07f6a6d2910630a939d119b6d21e287112866..01516d03f28f168b71ea5272bf983c181a059bcd 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -58,6 +58,12 @@ bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic) { return false; } +bool mndOffsetFromSubKey(SMqOffsetObj *pOffset, const char *subKey) { + int32_t i = 0; + while (pOffset->key[i] != ':') i++; + if (strcmp(&pOffset->key[i + 1], subKey) == 0) return true; + return false; +} SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset) { terrno = TSDB_CODE_OUT_OF_MEMORY; void *buf = NULL; @@ -303,7 +309,35 @@ int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) continue; } - if (mndSetDropOffsetRedoLogs(pMnode, pTrans, pOffset) < 0) { + if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) { + sdbRelease(pSdb, pOffset); + goto END; + } + + sdbRelease(pSdb, pOffset); + } + + code = 0; +END: + return code; +} + +int32_t mndDropOffsetBySubKey(SMnode *pMnode, STrans *pTrans, const char *subKey) { + int32_t code = -1; + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + SMqOffsetObj *pOffset = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pOffset); + if (pIter == NULL) break; + + if (!mndOffsetFromSubKey(pOffset, subKey)) { + sdbRelease(pSdb, pOffset); + continue; + } + + if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) { sdbRelease(pSdb, pOffset); goto END; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 0ece5d29e525a649e8a02b0890eb7ae951008f7b..17cf5d43b575dbe2840bec24a29e97dc399ccd7d 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -42,6 +42,7 @@ static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *); static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub); static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg); +static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg); static int32_t mndProcessSubscribeInternalRsp(SRpcMsg *pMsg); static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); @@ -75,6 +76,8 @@ int32_t mndInitSubscribe(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_DELETE_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq); + mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq); + mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP, mndProcessDropCgroupReq); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe); @@ -581,6 +584,57 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { return 0; } +static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + /*SSdb *pSdb = pMnode->pSdb;*/ + SMDropCgroupReq dropReq = {0}; + + if (tDeserializeSMDropCgroupReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, dropReq.cgroup, dropReq.topic); + if (pSub == NULL) { + if (dropReq.igNotExists) { + mDebug("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic); + return 0; + } else { + terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST; + mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, terrstr()); + return -1; + } + } + + if (taosHashGetSize(pSub->consumerHash) == 0) { + terrno = TSDB_CODE_MND_CGROUP_USED; + mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); + return -1; + } + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_CGROUP, pReq); + if (pTrans == NULL) { + mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); + return -1; + } + + mDebug("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic); + + if (mndDropOffsetBySubKey(pMnode, pTrans, pSub->key) < 0) { + ASSERT(0); + return -1; + } + + if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { + mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); + return -1; + } + + mndReleaseSubscribe(pMnode, pSub); + + return TSDB_CODE_ACTION_IN_PROGRESS; +} + void mndCleanupSubscribe(SMnode *pMnode) {} static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { @@ -735,7 +789,7 @@ static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscrib return 0; } -static int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { +int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { SSdbRaw *pCommitRaw = mndSubActionEncode(pSub); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;