From 52e1fcbff74b18bc45ceee417f1e85b54d198717 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 9 May 2022 18:09:55 +0800 Subject: [PATCH] feat(tmq): show subcription --- include/common/systable.h | 27 +++-- include/common/tmsg.h | 2 +- source/common/src/systable.c | 4 +- source/dnode/mnode/impl/src/mndShow.c | 4 +- source/dnode/mnode/impl/src/mndSubscribe.c | 128 +++++++++++++++++++++ source/dnode/mnode/impl/src/mndTopic.c | 1 + 6 files changed, 147 insertions(+), 19 deletions(-) diff --git a/include/common/systable.h b/include/common/systable.h index bd8aae998f..78e6e355e2 100644 --- a/include/common/systable.h +++ b/include/common/systable.h @@ -44,27 +44,26 @@ extern "C" { #define TSDB_INS_TABLE_VNODES "vnodes" #define TSDB_INS_TABLE_CONFIGS "configs" -#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" -#define TSDB_PERFS_TABLE_SMAS "smas" -#define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes" -#define TSDB_PERFS_TABLE_CONNECTIONS "connections" -#define TSDB_PERFS_TABLE_QUERIES "queries" -#define TSDB_PERFS_TABLE_TOPICS "topics" -#define TSDB_PERFS_TABLE_CONSUMERS "consumers" -#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions" -#define TSDB_PERFS_TABLE_OFFSETS "offsets" -#define TSDB_PERFS_TABLE_TRANS "trans" -#define TSDB_PERFS_TABLE_STREAMS "streams" +#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" +#define TSDB_PERFS_TABLE_SMAS "smas" +#define TSDB_PERFS_TABLE_CONNECTIONS "connections" +#define TSDB_PERFS_TABLE_QUERIES "queries" +#define TSDB_PERFS_TABLE_TOPICS "topics" +#define TSDB_PERFS_TABLE_CONSUMERS "consumers" +#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions" +#define TSDB_PERFS_TABLE_OFFSETS "offsets" +#define TSDB_PERFS_TABLE_TRANS "trans" +#define TSDB_PERFS_TABLE_STREAMS "streams" typedef struct SSysDbTableSchema { - const char *name; + const char* name; const int32_t type; const int32_t bytes; } SSysDbTableSchema; typedef struct SSysTableMeta { - const char *name; - const SSysDbTableSchema *schema; + const char* name; + const SSysDbTableSchema* schema; const int32_t colNum; } SSysTableMeta; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7636d9b9d0..171506484a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -99,7 +99,7 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_VGROUP, TSDB_MGMT_TABLE_TOPICS, TSDB_MGMT_TABLE_CONSUMERS, - TSDB_MGMT_TABLE_SUBSCRIBES, + TSDB_MGMT_TABLE_SUBSCRIPTIONS, TSDB_MGMT_TABLE_TRANS, TSDB_MGMT_TABLE_SMAS, TSDB_MGMT_TABLE_CONFIGS, diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 4b88b5b384..11b0636584 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -273,8 +273,8 @@ static const SSysDbTableSchema consumerSchema[] = { }; static const SSysDbTableSchema subscriptionSchema[] = { - {.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "group_id", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, }; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index be333d154a..b44c8c932b 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -85,8 +85,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) { type = TSDB_MGMT_TABLE_VGROUP; } else if (strncasecmp(name, TSDB_PERFS_TABLE_CONSUMERS, len) == 0) { type = TSDB_MGMT_TABLE_CONSUMERS; - } else if (strncasecmp(name, TSDB_PERFS_TABLE_SUBSCRIBES, len) == 0) { - type = TSDB_MGMT_TABLE_SUBSCRIBES; + } else if (strncasecmp(name, TSDB_PERFS_TABLE_SUBSCRIPTIONS, len) == 0) { + type = TSDB_MGMT_TABLE_SUBSCRIPTIONS; } else if (strncasecmp(name, TSDB_PERFS_TABLE_TRANS, len) == 0) { type = TSDB_MGMT_TABLE_TRANS; } else if (strncasecmp(name, TSDB_PERFS_TABLE_SMAS, len) == 0) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 545caea03d..c947a1913e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -44,6 +44,9 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubs static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg); static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pMsg); +static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); +static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter); + static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { SSdbRaw *pRedoRaw = mndSubActionEncode(pSub); if (pRedoRaw == NULL) return -1; @@ -71,6 +74,10 @@ int32_t mndInitSubscribe(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq); + + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe); + return sdbSetTable(pMnode->pSdb, table); } @@ -706,3 +713,124 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { END: return code; } + +static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { + SMnode *pMnode = pReq->pNode; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SMqSubscribeObj *pSub = NULL; + + while (numOfRows < rowsCapacity) { + pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub); + if (pShow->pIter == NULL) break; + + taosRLockLatch(&pSub->lock); + + if (numOfRows + pSub->vgNum > rowsCapacity) { + blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum); + } + + SMqConsumerEp *pConsumerEp = NULL; + void *pIter = NULL; + while (1) { + pIter = taosHashIterate(pSub->consumerHash, pIter); + if (pIter == NULL) break; + pConsumerEp = (SMqConsumerEp *)pIter; + + int32_t sz = taosArrayGetSize(pConsumerEp->vgs); + for (int32_t j = 0; j < sz; j++) { + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); + + SColumnInfoData *pColInfo; + int32_t cols = 0; + + // topic and cgroup + char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; + mndSplitSubscribeKey(pSub->key, topic, cgroup); + varDataSetLen(topic, strlen(varDataVal(topic))); + varDataSetLen(cgroup, strlen(varDataVal(cgroup))); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)topic, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false); + + // vg id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false); + + // consumer id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false); + + // offset +#if 0 + // subscribe time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false); + + // rebalance time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0); +#endif + + numOfRows++; + } + } + + int32_t sz = taosArrayGetSize(pSub->unassignedVgs); + for (int32_t i = 0; i < sz; i++) { + SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); + + SColumnInfoData *pColInfo; + int32_t cols = 0; + + // topic and cgroup + char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; + mndSplitSubscribeKey(pSub->key, topic, cgroup); + varDataSetLen(topic, strlen(varDataVal(topic))); + varDataSetLen(cgroup, strlen(varDataVal(cgroup))); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)topic, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false); + + // vg id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false); + + // consumer id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, NULL, true); + + // offset +#if 0 + // subscribe time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false); + + // rebalance time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0); +#endif + + numOfRows++; + } + + taosRUnLockLatch(&pSub->lock); + sdbRelease(pSdb, pSub); + } + + pShow->numOfRows += numOfRows; + return numOfRows; +} + +static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 63d429df9e..01149f793f 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -35,6 +35,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq); static int32_t mndProcessDropTopicReq(SNodeMsg *pReq); static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp); + static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); -- GitLab