From 984b83156384888f78e70fc613d2627c71b06b9f Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 24 May 2022 15:56:10 +0800 Subject: [PATCH] fix(tmq): unref topic --- source/dnode/mnode/impl/src/mndConsumer.c | 2 ++ source/dnode/mnode/impl/src/mndSubscribe.c | 6 +++++- tests/system-test/7-tmq/subscribeDb.py | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 1bb003bab9..7cebeb35f5 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -419,6 +419,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SMqTopicObj topicObj = {0}; memcpy(&topicObj, pTopic, sizeof(SMqTopicObj)); topicObj.refConsumerCnt = pTopic->refConsumerCnt + 1; + mInfo("subscribe topic %s by consumer %ld cgroup %s, refcnt %d", pTopic->name, consumerId, cgroup, + topicObj.refConsumerCnt); if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER; mndReleaseTopic(pMnode, pTopic); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 3713bd501a..0ece5d29e5 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -417,7 +417,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu // 2. redo log: subscribe and vg assignment // subscribe - if (mndSetSubRedoLogs(pMnode, pTrans, pOutput->pSub) != 0) { + if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) { goto REB_FAIL; } @@ -479,6 +479,10 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu SMqTopicObj topicObj = {0}; memcpy(&topicObj, pTopic, sizeof(SMqTopicObj)); topicObj.refConsumerCnt = pTopic->refConsumerCnt - consumerNum; + // TODO is that correct? + pTopic->refConsumerCnt = topicObj.refConsumerCnt; + mInfo("subscribe topic %s unref %d consumer cgroup %s, refcnt %d", pTopic->name, consumerNum, cgroup, + topicObj.refConsumerCnt); if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto REB_FAIL; } } diff --git a/tests/system-test/7-tmq/subscribeDb.py b/tests/system-test/7-tmq/subscribeDb.py index 65a6f5fad6..66c79fd292 100644 --- a/tests/system-test/7-tmq/subscribeDb.py +++ b/tests/system-test/7-tmq/subscribeDb.py @@ -813,7 +813,7 @@ class TDTestCase: self.tmqCase1(cfgPath, buildPath) self.tmqCase2(cfgPath, buildPath) - # self.tmqCase2a(cfgPath, buildPath) + self.tmqCase2a(cfgPath, buildPath) self.tmqCase3(cfgPath, buildPath) self.tmqCase4(cfgPath, buildPath) self.tmqCase5(cfgPath, buildPath) -- GitLab