From 05a0774b6d706833356031c498c5cef8779547a7 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 27 May 2022 13:58:58 +0800 Subject: [PATCH] feat(tmq): add sub type --- include/common/tmsg.h | 25 +++++++++++++++---------- source/common/src/tmsg.c | 22 ++++++++++++++-------- source/dnode/mnode/impl/src/mndTopic.c | 8 ++++---- source/dnode/vnode/src/tq/tq.c | 4 ---- source/libs/parser/src/parTranslater.c | 8 ++++---- 5 files changed, 37 insertions(+), 30 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index cc11254dcd..04c3a338b9 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1473,15 +1473,25 @@ typedef struct { int64_t streamId; } SMVCreateStreamRsp, SMSCreateStreamRsp; +enum { + TOPIC_SUB_TYPE__DB = 1, + TOPIC_SUB_TYPE__TABLE, + TOPIC_SUB_TYPE__COLUMN, +}; + typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic int8_t igExists; - int8_t withTbName; - int8_t withSchema; - int8_t withTag; + // int8_t withTbName; + // int8_t withSchema; + // int8_t withTag; + int8_t subType; char* sql; - char* ast; - char subscribeDbName[TSDB_DB_NAME_LEN]; + union { + char* ast; + char subDbName[TSDB_DB_NAME_LEN]; + char subStbName[TSDB_TABLE_FNAME_LEN]; + }; } SCMCreateTopicReq; int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq); @@ -2145,11 +2155,6 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { return buf; } -enum { - TOPIC_SUB_TYPE__DB = 1, - TOPIC_SUB_TYPE__TABLE, -}; - typedef struct { SMsgHead head; int64_t leftForVer; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 22ac412434..32f764022d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2677,10 +2677,12 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo if (tStartEncode(&encoder) < 0) return -1; if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; - if (tEncodeI8(&encoder, pReq->withTbName) < 0) return -1; - if (tEncodeI8(&encoder, pReq->withSchema) < 0) return -1; - if (tEncodeI8(&encoder, pReq->withTag) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->subscribeDbName) < 0) return -1; + /*if (tEncodeI8(&encoder, pReq->withTbName) < 0) return -1;*/ + /*if (tEncodeI8(&encoder, pReq->withSchema) < 0) return -1;*/ + /*if (tEncodeI8(&encoder, pReq->withTag) < 0) return -1;*/ + if (tEncodeI8(&encoder, pReq->subType) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->subDbName) < 0) return -1; + /*if (tEncodeCStr(&encoder, pReq->subStbName) < 0) return -1;*/ if (tEncodeI32(&encoder, sqlLen) < 0) return -1; if (tEncodeI32(&encoder, astLen) < 0) return -1; if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1; @@ -2703,10 +2705,14 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR if (tStartDecode(&decoder) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; - if (tDecodeI8(&decoder, &pReq->withTbName) < 0) return -1; - if (tDecodeI8(&decoder, &pReq->withSchema) < 0) return -1; - if (tDecodeI8(&decoder, &pReq->withTag) < 0) return -1; - if (tDecodeCStrTo(&decoder, pReq->subscribeDbName) < 0) return -1; + /*if (tDecodeI8(&decoder, &pReq->withTbName) < 0) return -1;*/ + /*if (tDecodeI8(&decoder, &pReq->withSchema) < 0) return -1;*/ + /*if (tDecodeI8(&decoder, &pReq->withTag) < 0) return -1;*/ + if (tDecodeI8(&decoder, &pReq->subType) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->subDbName) < 0) return -1; + /*if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {*/ + /*if (tDecodeCStrTo(&decoder, pReq->subStbName) < 0) return -1;*/ + /*}*/ if (tDecodeI32(&decoder, &sqlLen) < 0) return -1; if (tDecodeI32(&decoder, &astLen) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 2048c79847..89f4bfc779 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -307,7 +307,7 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq } static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) { - if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->subscribeDbName[0] == 0) { + if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0 || pCreate->subDbName[0] == 0) { terrno = TSDB_CODE_MND_INVALID_TOPIC; return -1; } @@ -333,8 +333,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * topicObj.ast = strdup(pCreate->ast); topicObj.astLen = strlen(pCreate->ast) + 1; topicObj.subType = TOPIC_SUB_TYPE__TABLE; - topicObj.withTbName = pCreate->withTbName; - topicObj.withSchema = pCreate->withSchema; + /*topicObj.withTbName = pCreate->withTbName;*/ + /*topicObj.withSchema = pCreate->withSchema;*/ SNode *pAst = NULL; if (nodesStringToNode(pCreate->ast, &pAst) != 0) { @@ -441,7 +441,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { goto CREATE_TOPIC_OVER; } - pDb = mndAcquireDb(pMnode, createTopicReq.subscribeDbName); + pDb = mndAcquireDb(pMnode, createTopicReq.subDbName); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; goto CREATE_TOPIC_OVER; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9941b00ff7..93ff049b3e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -122,10 +122,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { ASSERT(0); } - /*if (tdbBegin(pTq->pMetaStore, &txn) < 0) {*/ - /*ASSERT(0);*/ - /*}*/ - TBC* pCur; if (tdbTbcOpen(pTq->pExecStore, &pCur, &txn) < 0) { ASSERT(0); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 73abe6cd9d..467df44b7c 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3239,9 +3239,9 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName)); tNameGetFullDbName(&name, pReq->name); pReq->igExists = pStmt->ignoreExists; - pReq->withTbName = pStmt->pOptions->withTable; - pReq->withSchema = pStmt->pOptions->withSchema; - pReq->withTag = pStmt->pOptions->withTag; + /*pReq->withTbName = pStmt->pOptions->withTable;*/ + /*pReq->withSchema = pStmt->pOptions->withSchema;*/ + /*pReq->withTag = pStmt->pOptions->withTag;*/ pReq->sql = strdup(pCxt->pParseCxt->pSql); if (NULL == pReq->sql) { @@ -3262,7 +3262,7 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS dbName = pStmt->subscribeDbName; } tNameSetDbName(&name, pCxt->pParseCxt->acctId, dbName, strlen(dbName)); - tNameGetFullDbName(&name, pReq->subscribeDbName); + tNameGetFullDbName(&name, pReq->subDbName); return code; } -- GitLab