未验证 提交 0e3f6be6 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #13254 from taosdata/feature/tq

fix(tmq): remove topic ref cnt
...@@ -473,7 +473,7 @@ typedef struct { ...@@ -473,7 +473,7 @@ typedef struct {
char* ast; char* ast;
char* physicalPlan; char* physicalPlan;
SSchemaWrapper schema; SSchemaWrapper schema;
int32_t refConsumerCnt; // int32_t refConsumerCnt;
} SMqTopicObj; } SMqTopicObj;
typedef struct { typedef struct {
......
...@@ -414,6 +414,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -414,6 +414,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
goto SUBSCRIBE_OVER; goto SUBSCRIBE_OVER;
} }
#if 0
// ref topic to prevent drop // ref topic to prevent drop
// TODO make topic complete // TODO make topic complete
SMqTopicObj topicObj = {0}; SMqTopicObj topicObj = {0};
...@@ -422,6 +423,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -422,6 +423,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
mInfo("subscribe topic %s by consumer %ld cgroup %s, refcnt %d", pTopic->name, consumerId, cgroup, mInfo("subscribe topic %s by consumer %ld cgroup %s, refcnt %d", pTopic->name, consumerId, cgroup,
topicObj.refConsumerCnt); topicObj.refConsumerCnt);
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER; if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER;
#endif
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
} }
......
...@@ -1044,9 +1044,9 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) { ...@@ -1044,9 +1044,9 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
/*if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
/*if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
/*if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER;
SUserObj *pUser = mndAcquireUser(pMnode, pDb->createUser); SUserObj *pUser = mndAcquireUser(pMnode, pDb->createUser);
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "mndMnode.h" #include "mndMnode.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h" #include "mndStb.h"
#include "mndTopic.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
...@@ -188,7 +189,15 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) { ...@@ -188,7 +189,15 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
bool create = false; bool create = false;
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key); SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key);
if (pOffsetObj == NULL) { if (pOffsetObj == NULL) {
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOffset->topicName);
if (pTopic == NULL) {
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
mError("submit offset to topic %s failed since %s", pOffset->topicName, terrstr());
continue;
}
pOffsetObj = taosMemoryMalloc(sizeof(SMqOffsetObj)); pOffsetObj = taosMemoryMalloc(sizeof(SMqOffsetObj));
pOffsetObj->dbUid = pTopic->dbUid;
mndReleaseTopic(pMnode, pTopic);
memcpy(pOffsetObj->key, key, TSDB_PARTITION_KEY_LEN); memcpy(pOffsetObj->key, key, TSDB_PARTITION_KEY_LEN);
create = true; create = true;
} }
......
...@@ -286,7 +286,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -286,7 +286,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pStream->tasks = taosArrayInit(totLevel, sizeof(void*)); pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
bool hasExtraSink = false; bool hasExtraSink = false;
if (totLevel == 2) { if (totLevel == 2 || strcmp(pStream->sourceDb, pStream->targetDb) != 0) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel); taosArrayPush(pStream->tasks, &taskOneLevel);
// add extra sink // add extra sink
...@@ -407,7 +407,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -407,7 +407,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
/*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/ /*pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;*/
pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH; pTask->dispatchMsgType = TDMT_VND_TASK_DISPATCH;
SDbObj* pDb = mndAcquireDb(pMnode, pStream->sourceDb); SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
ASSERT(pDb); ASSERT(pDb);
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
sdbRelease(pSdb, pDb); sdbRelease(pSdb, pDb);
......
...@@ -393,6 +393,15 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq ...@@ -393,6 +393,15 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
streamObj.trigger = pCreate->triggerType; streamObj.trigger = pCreate->triggerType;
streamObj.waterMark = pCreate->watermark; streamObj.waterMark = pCreate->watermark;
if (streamObj.targetSTbName[0]) {
pDb = mndAcquireDbByStb(pMnode, streamObj.targetSTbName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
return -1;
}
tstrncpy(streamObj.targetDb, pDb->name, TSDB_DB_FNAME_LEN);
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, pReq); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, pReq);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("stream:%s, failed to create since %s", pCreate->name, terrstr()); mError("stream:%s, failed to create since %s", pCreate->name, terrstr());
......
...@@ -157,6 +157,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM ...@@ -157,6 +157,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM
int32_t vgId = pRebVg->pVgEp->vgId; int32_t vgId = pRebVg->pVgEp->vgId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
if (pVgObj == NULL) { if (pVgObj == NULL) {
ASSERT(0);
taosMemoryFree(buf); taosMemoryFree(buf);
return -1; return -1;
} }
...@@ -451,6 +452,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -451,6 +452,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
taosArrayPush(pConsumerNew->rebNewTopics, &topic); taosArrayPush(pConsumerNew->rebNewTopics, &topic);
mndReleaseConsumer(pMnode, pConsumerOld); mndReleaseConsumer(pMnode, pConsumerOld);
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
ASSERT(0);
goto REB_FAIL; goto REB_FAIL;
} }
} }
...@@ -469,9 +471,11 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -469,9 +471,11 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
taosArrayPush(pConsumerNew->rebRemovedTopics, &topic); taosArrayPush(pConsumerNew->rebRemovedTopics, &topic);
mndReleaseConsumer(pMnode, pConsumerOld); mndReleaseConsumer(pMnode, pConsumerOld);
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
ASSERT(0);
goto REB_FAIL; goto REB_FAIL;
} }
} }
#if 0
if (consumerNum) { if (consumerNum) {
char topic[TSDB_TOPIC_FNAME_LEN]; char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
...@@ -486,9 +490,13 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -486,9 +490,13 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
pTopic->refConsumerCnt = topicObj.refConsumerCnt; pTopic->refConsumerCnt = topicObj.refConsumerCnt;
mInfo("subscribe topic %s unref %d consumer cgroup %s, refcnt %d", pTopic->name, consumerNum, cgroup, mInfo("subscribe topic %s unref %d consumer cgroup %s, refcnt %d", pTopic->name, consumerNum, cgroup,
topicObj.refConsumerCnt); topicObj.refConsumerCnt);
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto REB_FAIL; if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) {
ASSERT(0);
goto REB_FAIL;
}
} }
} }
#endif
// 4. TODO commit log: modification log // 4. TODO commit log: modification log
...@@ -496,7 +504,10 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -496,7 +504,10 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_TEST_MQ_REB, NULL, 0); mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_TEST_MQ_REB, NULL, 0);
// 6. execution // 6. execution
if (mndTransPrepare(pMnode, pTrans) != 0) goto REB_FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) {
ASSERT(0);
goto REB_FAIL;
}
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; return 0;
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "mndTopic.h" #include "mndTopic.h"
#include "mndAuth.h" #include "mndAuth.h"
#include "mndConsumer.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndMnode.h" #include "mndMnode.h"
...@@ -121,7 +122,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { ...@@ -121,7 +122,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
} }
SDB_SET_INT32(pRaw, dataPos, pTopic->refConsumerCnt, TOPIC_ENCODE_OVER); /*SDB_SET_INT32(pRaw, dataPos, pTopic->refConsumerCnt, TOPIC_ENCODE_OVER);*/
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER); SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
...@@ -221,7 +222,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { ...@@ -221,7 +222,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
pTopic->schema.pSchema = NULL; pTopic->schema.pSchema = NULL;
} }
SDB_GET_INT32(pRaw, dataPos, &pTopic->refConsumerCnt, TOPIC_DECODE_OVER); /*SDB_GET_INT32(pRaw, dataPos, &pTopic->refConsumerCnt, TOPIC_DECODE_OVER);*/
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
...@@ -253,7 +254,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic ...@@ -253,7 +254,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic
atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime); atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
atomic_exchange_32(&pOldTopic->version, pNewTopic->version); atomic_exchange_32(&pOldTopic->version, pNewTopic->version);
atomic_store_32(&pOldTopic->refConsumerCnt, pNewTopic->refConsumerCnt); /*atomic_store_32(&pOldTopic->refConsumerCnt, pNewTopic->refConsumerCnt);*/
/*taosWLockLatch(&pOldTopic->lock);*/ /*taosWLockLatch(&pOldTopic->lock);*/
...@@ -327,7 +328,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * ...@@ -327,7 +328,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj.version = 1; topicObj.version = 1;
topicObj.sql = strdup(pCreate->sql); topicObj.sql = strdup(pCreate->sql);
topicObj.sqlLen = strlen(pCreate->sql) + 1; topicObj.sqlLen = strlen(pCreate->sql) + 1;
topicObj.refConsumerCnt = 0; /*topicObj.refConsumerCnt = 0;*/
if (pCreate->ast && pCreate->ast[0]) { if (pCreate->ast && pCreate->ast[0]) {
topicObj.ast = strdup(pCreate->ast); topicObj.ast = strdup(pCreate->ast);
...@@ -492,8 +493,8 @@ static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTo ...@@ -492,8 +493,8 @@ static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTo
} }
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
/*SSdb *pSdb = pMnode->pSdb;*/ SSdb *pSdb = pMnode->pSdb;
SMDropTopicReq dropReq = {0}; SMDropTopicReq dropReq = {0};
if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
...@@ -513,12 +514,36 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -513,12 +514,36 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
} }
} }
void *pIter = NULL;
SMqConsumerObj *pConsumer;
while (1) {
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) break;
if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue;
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) {
char *name = taosArrayGetP(pConsumer->assignedTopics, i);
if (strcmp(name, pTopic->name) == 0) {
mndReleaseConsumer(pMnode, pConsumer);
mndReleaseTopic(pMnode, pTopic);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since subscribed by consumer %ld from cgroup %s", dropReq.name,
pConsumer->consumerId, pConsumer->cgroup);
return -1;
}
}
sdbRelease(pSdb, pConsumer);
}
#if 0
if (pTopic->refConsumerCnt != 0) { if (pTopic->refConsumerCnt != 0) {
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
return -1; return -1;
} }
#endif
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, pReq); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, pReq);
if (pTrans == NULL) { if (pTrans == NULL) {
......
...@@ -382,6 +382,7 @@ class TDTestCase: ...@@ -382,6 +382,7 @@ class TDTestCase:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
time.sleep(15)
tdSql.query("drop topic %s"%topicName1) tdSql.query("drop topic %s"%topicName1)
tdLog.printNoPrefix("======== test case 10 end ...... ") tdLog.printNoPrefix("======== test case 10 end ...... ")
...@@ -453,6 +454,7 @@ class TDTestCase: ...@@ -453,6 +454,7 @@ class TDTestCase:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
time.sleep(15)
tdSql.query("drop topic %s"%topicName1) tdSql.query("drop topic %s"%topicName1)
tdLog.printNoPrefix("======== test case 11 end ...... ") tdLog.printNoPrefix("======== test case 11 end ...... ")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册