From 3035ecd58e3dac047dc52fb59ae4938c62fb288b Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 22 Apr 2022 14:26:36 +0800 Subject: [PATCH] add show topic --- example/src/tmq.c | 1 + source/dnode/mnode/impl/src/mndPerfSchema.c | 4 +- source/dnode/mnode/impl/src/mndStream.c | 5 +- source/dnode/mnode/impl/src/mndTopic.c | 52 +++++++-------------- 4 files changed, 24 insertions(+), 38 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index 88fce7f4be..46f57799e7 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -103,6 +103,7 @@ int32_t create_topic() { /*const char* sql = "select * from tu1";*/ /*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/ + /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); diff --git a/source/dnode/mnode/impl/src/mndPerfSchema.c b/source/dnode/mnode/impl/src/mndPerfSchema.c index 38e9e79228..fe96bcb709 100644 --- a/source/dnode/mnode/impl/src/mndPerfSchema.c +++ b/source/dnode/mnode/impl/src/mndPerfSchema.c @@ -41,10 +41,10 @@ static const SPerfsTableSchema queriesSchema[] = { static const SPerfsTableSchema topicSchema[] = { {.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + /*{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},*/ {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + /*{.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT},*/ }; static const SPerfsTableSchema consumerSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 58e5b6c65b..1062154e02 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -59,7 +59,7 @@ int32_t mndInitStream(SMnode *pMnode) { /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/ // mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveStream); - mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextStream); + /*mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextStream);*/ return sdbSetTable(pMnode->pSdb, table); } @@ -247,7 +247,8 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64 return code; } -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, STrans *pTrans) { +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, + STrans *pTrans) { SNode *pAst = NULL; if (nodesStringToNode(ast, &pAst) < 0) { diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 22b1b404bb..f17b8a4d88 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -35,7 +35,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq); static int32_t mndProcessDropTopicReq(SNodeMsg *pReq); static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp); -static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); int32_t mndInitTopic(SMnode *pMnode) { @@ -51,7 +51,7 @@ int32_t mndInitTopic(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp); - // mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic); return sdbSetTable(pMnode->pSdb, table); @@ -511,56 +511,40 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo return 0; } -static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SMqTopicObj *pTopic = NULL; - int32_t cols = 0; - char *pWrite; - char prefix[TSDB_DB_FNAME_LEN] = {0}; - SDbObj *pDb = mndAcquireDb(pMnode, pShow->db); - if (pDb == NULL) return 0; - - tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN); - strcat(prefix, TS_PATH_DELIMITER); - int32_t prefixLen = (int32_t)strlen(prefix); - - while (numOfRows < rows) { + while (numOfRows < rowsCapacity) { pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic); if (pShow->pIter == NULL) break; - if (pTopic->dbUid != pDb->uid) { - if (strncmp(pTopic->name, prefix, prefixLen) != 0) { - mError("Inconsistent topic data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid); - } + int32_t cols = 0; - sdbRelease(pSdb, pTopic); - continue; - } + char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + tstrncpy(&topicName[VARSTR_HEADER_SIZE], pTopic->name, TSDB_TOPIC_NAME_LEN); + varDataSetLen(topicName, strlen(&topicName[VARSTR_HEADER_SIZE])); - cols = 0; + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)topicName, false); - char topicName[TSDB_TOPIC_NAME_LEN] = {0}; - tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TOPIC_NAME_LEN); - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_TO_VARSTR(pWrite, topicName); - cols++; + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false); - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pTopic->createTime; - cols++; + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + char *sql = taosMemoryCalloc(1, strlen(pTopic->sql) + 1 + VARSTR_HEADER_SIZE); + strcpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql); + varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE])); + colDataAppend(pColInfo, numOfRows, (const char *)sql, false); - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pTopic->sql, pShow->bytes[cols]); - cols++; + taosMemoryFree(sql); numOfRows++; sdbRelease(pSdb, pTopic); } - mndReleaseDb(pMnode, pDb); pShow->numOfRows += numOfRows; return numOfRows; } -- GitLab