diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 1bb003bab9f526da988945c1025d09aebcffd39d..7cebeb35f5bb9e3f2b363c438a1ce70ad3296717 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -419,6 +419,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SMqTopicObj topicObj = {0}; memcpy(&topicObj, pTopic, sizeof(SMqTopicObj)); topicObj.refConsumerCnt = pTopic->refConsumerCnt + 1; + mInfo("subscribe topic %s by consumer %ld cgroup %s, refcnt %d", pTopic->name, consumerId, cgroup, + topicObj.refConsumerCnt); if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER; mndReleaseTopic(pMnode, pTopic); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 3713bd501a42e5e450a0cb2dc80e0ebe14c32417..0ece5d29e525a649e8a02b0890eb7ae951008f7b 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -417,7 +417,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu // 2. redo log: subscribe and vg assignment // subscribe - if (mndSetSubRedoLogs(pMnode, pTrans, pOutput->pSub) != 0) { + if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) { goto REB_FAIL; } @@ -479,6 +479,10 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu SMqTopicObj topicObj = {0}; memcpy(&topicObj, pTopic, sizeof(SMqTopicObj)); topicObj.refConsumerCnt = pTopic->refConsumerCnt - consumerNum; + // TODO is that correct? + pTopic->refConsumerCnt = topicObj.refConsumerCnt; + mInfo("subscribe topic %s unref %d consumer cgroup %s, refcnt %d", pTopic->name, consumerNum, cgroup, + topicObj.refConsumerCnt); if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto REB_FAIL; } } diff --git a/tests/system-test/7-tmq/subscribeDb.py b/tests/system-test/7-tmq/subscribeDb.py index 65a6f5fad63c9351f46a45f2727d92c501b06e85..66c79fd29222714e24f4a83ac417502758c02eff 100644 --- a/tests/system-test/7-tmq/subscribeDb.py +++ b/tests/system-test/7-tmq/subscribeDb.py @@ -813,7 +813,7 @@ class TDTestCase: self.tmqCase1(cfgPath, buildPath) self.tmqCase2(cfgPath, buildPath) - # self.tmqCase2a(cfgPath, buildPath) + self.tmqCase2a(cfgPath, buildPath) self.tmqCase3(cfgPath, buildPath) self.tmqCase4(cfgPath, buildPath) self.tmqCase5(cfgPath, buildPath)