未验证 提交 d8b0bf31 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #11744 from taosdata/feature/tq

feat: add show topic
...@@ -103,6 +103,7 @@ int32_t create_topic() { ...@@ -103,6 +103,7 @@ int32_t create_topic() {
/*const char* sql = "select * from tu1";*/ /*const char* sql = "select * from tu1";*/
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/ /*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1"); pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
......
...@@ -41,10 +41,10 @@ static const SPerfsTableSchema queriesSchema[] = { ...@@ -41,10 +41,10 @@ static const SPerfsTableSchema queriesSchema[] = {
static const SPerfsTableSchema topicSchema[] = { static const SPerfsTableSchema topicSchema[] = {
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, /*{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},*/
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, /*{.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT},*/
}; };
static const SPerfsTableSchema consumerSchema[] = { static const SPerfsTableSchema consumerSchema[] = {
......
...@@ -59,7 +59,7 @@ int32_t mndInitStream(SMnode *pMnode) { ...@@ -59,7 +59,7 @@ int32_t mndInitStream(SMnode *pMnode) {
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/ /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveStream); // mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveStream);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextStream); /*mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextStream);*/
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
...@@ -247,7 +247,8 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64 ...@@ -247,7 +247,8 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
return code; return code;
} }
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, STrans *pTrans) { int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark,
STrans *pTrans) {
SNode *pAst = NULL; SNode *pAst = NULL;
if (nodesStringToNode(ast, &pAst) < 0) { if (nodesStringToNode(ast, &pAst) < 0) {
......
...@@ -35,7 +35,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj ...@@ -35,7 +35,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq); static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq); static int32_t mndProcessDropTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp); static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp);
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
int32_t mndInitTopic(SMnode *pMnode) { int32_t mndInitTopic(SMnode *pMnode) {
...@@ -51,7 +51,7 @@ int32_t mndInitTopic(SMnode *pMnode) { ...@@ -51,7 +51,7 @@ int32_t mndInitTopic(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
...@@ -511,56 +511,40 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo ...@@ -511,56 +511,40 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
return 0; return 0;
} }
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
SMnode *pMnode = pReq->pNode; SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0; int32_t numOfRows = 0;
SMqTopicObj *pTopic = NULL; SMqTopicObj *pTopic = NULL;
int32_t cols = 0;
char *pWrite;
char prefix[TSDB_DB_FNAME_LEN] = {0};
SDbObj *pDb = mndAcquireDb(pMnode, pShow->db); while (numOfRows < rowsCapacity) {
if (pDb == NULL) return 0;
tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN);
strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = (int32_t)strlen(prefix);
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic); pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
if (pShow->pIter == NULL) break; if (pShow->pIter == NULL) break;
if (pTopic->dbUid != pDb->uid) { int32_t cols = 0;
if (strncmp(pTopic->name, prefix, prefixLen) != 0) {
mError("Inconsistent topic data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid);
}
sdbRelease(pSdb, pTopic); char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
continue; tstrncpy(&topicName[VARSTR_HEADER_SIZE], pTopic->name, TSDB_TOPIC_NAME_LEN);
} varDataSetLen(topicName, strlen(&topicName[VARSTR_HEADER_SIZE]));
cols = 0; SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)topicName, false);
char topicName[TSDB_TOPIC_NAME_LEN] = {0}; pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TOPIC_NAME_LEN); colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, topicName);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
*(int64_t *)pWrite = pTopic->createTime; char *sql = taosMemoryCalloc(1, strlen(pTopic->sql) + 1 + VARSTR_HEADER_SIZE);
cols++; strcpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql);
varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; taosMemoryFree(sql);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pTopic->sql, pShow->bytes[cols]);
cols++;
numOfRows++; numOfRows++;
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
} }
mndReleaseDb(pMnode, pDb);
pShow->numOfRows += numOfRows; pShow->numOfRows += numOfRows;
return numOfRows; return numOfRows;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册