From 7ab2e76c3535b12f7f7e75b83f3fe8e4c2ed45dd Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 12 May 2022 11:31:56 +0800 Subject: [PATCH] feat(tmq): support get tb name --- example/src/tmq.c | 9 +++++++-- include/client/taos.h | 5 +---- include/common/tmsg.h | 10 ++++++++++ source/client/src/tmq.c | 13 +++++++++++++ source/dnode/mnode/impl/src/mndTopic.c | 4 ++-- source/dnode/vnode/src/inc/meta.h | 10 +++++----- source/dnode/vnode/src/tq/tq.c | 26 ++++++++++++++++++++++++++ 7 files changed, 64 insertions(+), 13 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index 976d658fa6..2ee91c254c 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -32,6 +32,11 @@ static void msg_process(TAOS_RES* msg) { int32_t numOfFields = taos_field_count(msg); taos_print_row(buf, row, fields, numOfFields); printf("%s\n", buf); + + const char* tbName = tmq_get_table_name(msg); + if (tbName) { + printf("from tb: %s\n", tbName); + } } } @@ -101,8 +106,8 @@ int32_t create_topic() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create topic topic_ctb_column as abc1"); - /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/ + /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ + pRes = taos_query(pConn, "create topic topic_ctb_column with table as select ts, c1, c2, c3 from st1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/client/taos.h b/include/client/taos.h index 26d4d18234..486d5f5fef 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -257,10 +257,7 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_co DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); -// TODO -#if 0 -DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res); -#endif +DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res); #if 0 DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b50367af03..d34892a278 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2480,6 +2480,10 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, i); tlen += taosEncodeSSchemaWrapper(buf, pSW); } + if (pRsp->withTbName) { + char* tbName = (char*)taosArrayGetP(pRsp->blockTbName, i); + tlen += taosEncodeString(buf, tbName); + } } } return tlen; @@ -2492,6 +2496,7 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p buf = taosDecodeFixedI32(buf, &pRsp->blockNum); pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*)); + pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*)); if (pRsp->blockNum != 0) { buf = taosDecodeFixedI8(buf, &pRsp->withTbName); @@ -2510,6 +2515,11 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p buf = taosDecodeSSchemaWrapper(buf, pSW); taosArrayPush(pRsp->blockSchema, &pSW); } + if (pRsp->withTbName) { + char* name = NULL; + buf = taosDecodeString(buf, &name); + taosArrayPush(pRsp->blockTbName, &name); + } } } return (void*)buf; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 0ce689f19c..b42f072e54 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1346,3 +1346,16 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { return -1; } } + +const char* tmq_get_table_name(TAOS_RES* res) { + if (TD_RES_TMQ(res)) { + SMqRspObj* pRspObj = (SMqRspObj*)res; + if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 || + pRspObj->resIter >= pRspObj->rsp.blockNum) { + return NULL; + } + const char* name = taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); + return name; + } + return NULL; +} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 41d4d5f406..00379ecda1 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -297,8 +297,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq topicObj.ast = strdup(pCreate->ast); topicObj.astLen = strlen(pCreate->ast) + 1; topicObj.subType = TOPIC_SUB_TYPE__TABLE; - topicObj.withTbName = 0; - topicObj.withSchema = 0; + topicObj.withTbName = pCreate->withTbName; + topicObj.withSchema = pCreate->withSchema; SNode *pAst = NULL; if (nodesStringToNode(pCreate->ast, &pAst) != 0) { diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index 96feee3d7d..6dd9ac0fed 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -111,10 +111,10 @@ int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur); // SMetaDB int metaOpenDB(SMeta* pMeta); void metaCloseDB(SMeta* pMeta); -int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle); -int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid); -int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg); -int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); +// int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle); +int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid); +int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg); +int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); #endif #endif @@ -123,4 +123,4 @@ int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); } #endif -#endif /*_TD_VNODE_META_H_*/ \ No newline at end of file +#endif /*_TD_VNODE_META_H_*/ diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4b9551b250..29fabc0f9f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -427,9 +427,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SMqDataBlkRsp rsp = {0}; rsp.reqOffset = pReq->currentOffset; rsp.withSchema = pExec->withSchema; + rsp.withTbName = pExec->withTbName; + rsp.blockData = taosArrayInit(0, sizeof(void*)); rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); rsp.blockSchema = taosArrayInit(0, sizeof(void*)); + rsp.blockTbName = taosArrayInit(0, sizeof(void*)); while (1) { consumerEpoch = atomic_load_32(&pExec->epoch); @@ -535,6 +538,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { taosArrayPush(rsp.blockSchema, &pSW); } + if (pExec->withTbName) { + SMetaReader mr = {0}; + metaReaderInit(&mr, pTq->pVnode->pMeta, 0); + int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; + if (metaGetTableEntryByUid(&mr, uid) < 0) { + ASSERT(0); + } + char* tbName = strdup(mr.me.name); + taosArrayPush(rsp.blockTbName, &tbName); + metaReaderClear(&mr); + } + rsp.blockNum++; } // db subscribe @@ -563,6 +578,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ASSERT(actualLen <= dataStrLen); taosArrayPush(rsp.blockDataLen, &actualLen); taosArrayPush(rsp.blockData, &buf); + if (pExec->withTbName) { + SMetaReader mr = {0}; + metaReaderInit(&mr, pTq->pVnode->pMeta, 0); + if (metaGetTableEntryByUid(&mr, block.info.uid) < 0) { + ASSERT(0); + } + char* tbName = strdup(mr.me.name); + taosArrayPush(rsp.blockTbName, &tbName); + metaReaderClear(&mr); + } SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper); taosArrayPush(rsp.blockSchema, &pSW); @@ -614,6 +639,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { taosArrayDestroy(rsp.blockData); taosArrayDestroy(rsp.blockDataLen); taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); + taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree); return 0; } -- GitLab