提交 d1259b3c 编写于 作者: L Liu Jicong

fix(tmq): remove topic ref cnt

上级 1aa97bc1
......@@ -468,7 +468,7 @@ typedef struct {
char* ast;
char* physicalPlan;
SSchemaWrapper schema;
int32_t refConsumerCnt;
// int32_t refConsumerCnt;
} SMqTopicObj;
typedef struct {
......
......@@ -414,6 +414,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
goto SUBSCRIBE_OVER;
}
#if 0
// ref topic to prevent drop
// TODO make topic complete
SMqTopicObj topicObj = {0};
......@@ -422,6 +423,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
mInfo("subscribe topic %s by consumer %ld cgroup %s, refcnt %d", pTopic->name, consumerId, cgroup,
topicObj.refConsumerCnt);
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER;
#endif
mndReleaseTopic(pMnode, pTopic);
}
......
......@@ -1044,9 +1044,9 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
/*if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
/*if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
/*if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER;
SUserObj *pUser = mndAcquireUser(pMnode, pDb->createUser);
......
......@@ -21,6 +21,7 @@
#include "mndMnode.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
......@@ -188,7 +189,15 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
bool create = false;
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key);
if (pOffsetObj == NULL) {
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOffset->topicName);
if (pTopic == NULL) {
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
mError("submit offset to topic %s failed since %s", pOffset->topicName, terrstr());
continue;
}
pOffsetObj = taosMemoryMalloc(sizeof(SMqOffsetObj));
pOffsetObj->dbUid = pTopic->dbUid;
mndReleaseTopic(pMnode, pTopic);
memcpy(pOffsetObj->key, key, TSDB_PARTITION_KEY_LEN);
create = true;
}
......
......@@ -286,7 +286,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
bool hasExtraSink = false;
if (totLevel == 2) {
if (totLevel == 2 || strcmp(pStream->sourceDb, pStream->targetDb) != 0) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel);
// add extra sink
......@@ -407,7 +407,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
/*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH;
SDbObj* pDb = mndAcquireDb(pMnode, pStream->sourceDb);
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
ASSERT(pDb);
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
sdbRelease(pSdb, pDb);
......
......@@ -393,6 +393,15 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
streamObj.trigger = pCreate->triggerType;
streamObj.waterMark = pCreate->watermark;
if (streamObj.targetSTbName[0]) {
pDb = mndAcquireDbByStb(pMnode, streamObj.targetSTbName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
return -1;
}
tstrncpy(streamObj.targetDb, pDb->name, TSDB_DB_FNAME_LEN);
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, pReq);
if (pTrans == NULL) {
mError("stream:%s, failed to create since %s", pCreate->name, terrstr());
......
......@@ -157,6 +157,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM
int32_t vgId = pRebVg->pVgEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
if (pVgObj == NULL) {
ASSERT(0);
taosMemoryFree(buf);
return -1;
}
......@@ -451,6 +452,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
taosArrayPush(pConsumerNew->rebNewTopics, &topic);
mndReleaseConsumer(pMnode, pConsumerOld);
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
ASSERT(0);
goto REB_FAIL;
}
}
......@@ -469,9 +471,11 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
taosArrayPush(pConsumerNew->rebRemovedTopics, &topic);
mndReleaseConsumer(pMnode, pConsumerOld);
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
ASSERT(0);
goto REB_FAIL;
}
}
#if 0
if (consumerNum) {
char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
......@@ -486,9 +490,13 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
pTopic->refConsumerCnt = topicObj.refConsumerCnt;
mInfo("subscribe topic %s unref %d consumer cgroup %s, refcnt %d", pTopic->name, consumerNum, cgroup,
topicObj.refConsumerCnt);
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto REB_FAIL;
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) {
ASSERT(0);
goto REB_FAIL;
}
}
}
#endif
// 4. TODO commit log: modification log
......@@ -496,7 +504,10 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
mndTransSetCb(pTrans, MQ_REB_TRANS_START_FUNC, MQ_REB_TRANS_STOP_FUNC, NULL, 0);
// 6. execution
if (mndTransPrepare(pMnode, pTrans) != 0) goto REB_FAIL;
if (mndTransPrepare(pMnode, pTrans) != 0) {
ASSERT(0);
goto REB_FAIL;
}
mndTransDrop(pTrans);
return 0;
......
......@@ -15,6 +15,7 @@
#include "mndTopic.h"
#include "mndAuth.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
......@@ -121,7 +122,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
}
SDB_SET_INT32(pRaw, dataPos, pTopic->refConsumerCnt, TOPIC_ENCODE_OVER);
/*SDB_SET_INT32(pRaw, dataPos, pTopic->refConsumerCnt, TOPIC_ENCODE_OVER);*/
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
......@@ -221,7 +222,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
pTopic->schema.pSchema = NULL;
}
SDB_GET_INT32(pRaw, dataPos, &pTopic->refConsumerCnt, TOPIC_DECODE_OVER);
/*SDB_GET_INT32(pRaw, dataPos, &pTopic->refConsumerCnt, TOPIC_DECODE_OVER);*/
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
......@@ -253,7 +254,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic
atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
atomic_exchange_32(&pOldTopic->version, pNewTopic->version);
atomic_store_32(&pOldTopic->refConsumerCnt, pNewTopic->refConsumerCnt);
/*atomic_store_32(&pOldTopic->refConsumerCnt, pNewTopic->refConsumerCnt);*/
/*taosWLockLatch(&pOldTopic->lock);*/
......@@ -327,7 +328,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj.version = 1;
topicObj.sql = strdup(pCreate->sql);
topicObj.sqlLen = strlen(pCreate->sql) + 1;
topicObj.refConsumerCnt = 0;
/*topicObj.refConsumerCnt = 0;*/
if (pCreate->ast && pCreate->ast[0]) {
topicObj.ast = strdup(pCreate->ast);
......@@ -492,8 +493,8 @@ static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTo
}
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
/*SSdb *pSdb = pMnode->pSdb;*/
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SMDropTopicReq dropReq = {0};
if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
......@@ -513,12 +514,36 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
}
}
void *pIter = NULL;
SMqConsumerObj *pConsumer;
while (1) {
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) break;
if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue;
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) {
char *name = taosArrayGetP(pConsumer->assignedTopics, i);
if (strcmp(name, pTopic->name) == 0) {
mndReleaseConsumer(pMnode, pConsumer);
mndReleaseTopic(pMnode, pTopic);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer %ld from cgroup %s", dropReq.name,
pConsumer->consumerId, pConsumer->cgroup);
return -1;
}
}
sdbRelease(pSdb, pConsumer);
}
#if 0
if (pTopic->refConsumerCnt != 0) {
mndReleaseTopic(pMnode, pTopic);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
return -1;
}
#endif
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, pReq);
if (pTrans == NULL) {
......
......@@ -382,6 +382,7 @@ class TDTestCase:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
time.sleep(15)
tdSql.query("drop topic %s"%topicName1)
tdLog.printNoPrefix("======== test case 10 end ...... ")
......@@ -453,6 +454,7 @@ class TDTestCase:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
time.sleep(15)
tdSql.query("drop topic %s"%topicName1)
tdLog.printNoPrefix("======== test case 11 end ...... ")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册