diff --git a/include/common/systable.h b/include/common/systable.h index 506bdcfa8a78e00981d22da51f4116d5c12de64f..e36beb13f2eb2cae1ec93495c3b84550fce617ce 100644 --- a/include/common/systable.h +++ b/include/common/systable.h @@ -45,7 +45,6 @@ extern "C" { #define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" #define TSDB_PERFS_TABLE_SMAS "smas" -#define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes" #define TSDB_PERFS_TABLE_CONNECTIONS "connections" #define TSDB_PERFS_TABLE_QUERIES "queries" #define TSDB_PERFS_TABLE_TOPICS "topics" diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 8e6e73d649eab32b51a46f681920321ca001d6c4..9ec80293a838f9ee825dc77e97f31b12d621caf6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -99,7 +99,7 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_VGROUP, TSDB_MGMT_TABLE_TOPICS, TSDB_MGMT_TABLE_CONSUMERS, - TSDB_MGMT_TABLE_SUBSCRIBES, + TSDB_MGMT_TABLE_SUBSCRIPTIONS, TSDB_MGMT_TABLE_TRANS, TSDB_MGMT_TABLE_SMAS, TSDB_MGMT_TABLE_CONFIGS, diff --git a/source/common/src/systable.c b/source/common/src/systable.c index e89551bb7d5ae8e04428285a3890fae1a9bb3590..5ff0282c878ba428b7d16a2fdad2c9ba8416ee50 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -274,8 +274,8 @@ static const SSysDbTableSchema consumerSchema[] = { }; static const SSysDbTableSchema subscriptionSchema[] = { - {.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "group_id", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, }; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index be333d154a4b39d2cf7fd9f65878663a03dda184..b44c8c932bf2316d88050a263516c32ab8a5a5e0 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -85,8 +85,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) { type = TSDB_MGMT_TABLE_VGROUP; } else if (strncasecmp(name, TSDB_PERFS_TABLE_CONSUMERS, len) == 0) { type = TSDB_MGMT_TABLE_CONSUMERS; - } else if (strncasecmp(name, TSDB_PERFS_TABLE_SUBSCRIBES, len) == 0) { - type = TSDB_MGMT_TABLE_SUBSCRIBES; + } else if (strncasecmp(name, TSDB_PERFS_TABLE_SUBSCRIPTIONS, len) == 0) { + type = TSDB_MGMT_TABLE_SUBSCRIPTIONS; } else if (strncasecmp(name, TSDB_PERFS_TABLE_TRANS, len) == 0) { type = TSDB_MGMT_TABLE_TRANS; } else if (strncasecmp(name, TSDB_PERFS_TABLE_SMAS, len) == 0) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 545caea03d350366f6849cc1807519c3849951fe..c947a1913eb761133b5f27c9d683d9fc85aac377 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -44,6 +44,9 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubs static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg); static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pMsg); +static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); +static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter); + static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { SSdbRaw *pRedoRaw = mndSubActionEncode(pSub); if (pRedoRaw == NULL) return -1; @@ -71,6 +74,10 @@ int32_t mndInitSubscribe(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq); + + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe); + return sdbSetTable(pMnode->pSdb, table); } @@ -706,3 +713,124 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { END: return code; } + +static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { + SMnode *pMnode = pReq->pNode; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SMqSubscribeObj *pSub = NULL; + + while (numOfRows < rowsCapacity) { + pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub); + if (pShow->pIter == NULL) break; + + taosRLockLatch(&pSub->lock); + + if (numOfRows + pSub->vgNum > rowsCapacity) { + blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum); + } + + SMqConsumerEp *pConsumerEp = NULL; + void *pIter = NULL; + while (1) { + pIter = taosHashIterate(pSub->consumerHash, pIter); + if (pIter == NULL) break; + pConsumerEp = (SMqConsumerEp *)pIter; + + int32_t sz = taosArrayGetSize(pConsumerEp->vgs); + for (int32_t j = 0; j < sz; j++) { + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); + + SColumnInfoData *pColInfo; + int32_t cols = 0; + + // topic and cgroup + char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; + mndSplitSubscribeKey(pSub->key, topic, cgroup); + varDataSetLen(topic, strlen(varDataVal(topic))); + varDataSetLen(cgroup, strlen(varDataVal(cgroup))); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)topic, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false); + + // vg id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false); + + // consumer id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false); + + // offset +#if 0 + // subscribe time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false); + + // rebalance time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0); +#endif + + numOfRows++; + } + } + + int32_t sz = taosArrayGetSize(pSub->unassignedVgs); + for (int32_t i = 0; i < sz; i++) { + SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); + + SColumnInfoData *pColInfo; + int32_t cols = 0; + + // topic and cgroup + char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; + mndSplitSubscribeKey(pSub->key, topic, cgroup); + varDataSetLen(topic, strlen(varDataVal(topic))); + varDataSetLen(cgroup, strlen(varDataVal(cgroup))); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)topic, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false); + + // vg id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false); + + // consumer id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, NULL, true); + + // offset +#if 0 + // subscribe time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false); + + // rebalance time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0); +#endif + + numOfRows++; + } + + taosRUnLockLatch(&pSub->lock); + sdbRelease(pSdb, pSub); + } + + pShow->numOfRows += numOfRows; + return numOfRows; +} + +static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 63d429df9e41f40167bdcfd2fbaea80e9146b37c..01149f793f3d8666c4409d46f064fdeac620f45f 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -35,6 +35,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq); static int32_t mndProcessDropTopicReq(SNodeMsg *pReq); static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp); + static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); diff --git a/tests/script/sh/copy_udf.sh b/tests/script/sh/copy_udf.sh index 97f8b163633d019f7bae6d0c0ea66ef9f1366eee..7b5b5f47206065a8ebb00186bed72cd5ac9afe40 100755 --- a/tests/script/sh/copy_udf.sh +++ b/tests/script/sh/copy_udf.sh @@ -23,11 +23,12 @@ echo $UDF1_DIR echo $UDF2_DIR UDF_TMP=/tmp/udf +rm -rf $UDF_TMP mkdir $UDF_TMP -rm $UDF_TMP/libudf1.so -rm $UDF_TMP/libudf2.so echo "Copy udf shared library files to $UDF_TMP" -cp $UDF1_DIR $UDF_TMP +cp $UDF1_DIR $UDF_TMP +echo "copy udf1 result: $?" cp $UDF2_DIR $UDF_TMP +echo "copy udf2 result: $?"