未验证 提交 05755a28 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #12276 from taosdata/feature/tq

feat(tmq): show subcription
......@@ -45,7 +45,6 @@ extern "C" {
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
#define TSDB_PERFS_TABLE_SMAS "smas"
#define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes"
#define TSDB_PERFS_TABLE_CONNECTIONS "connections"
#define TSDB_PERFS_TABLE_QUERIES "queries"
#define TSDB_PERFS_TABLE_TOPICS "topics"
......
......@@ -99,7 +99,7 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_VGROUP,
TSDB_MGMT_TABLE_TOPICS,
TSDB_MGMT_TABLE_CONSUMERS,
TSDB_MGMT_TABLE_SUBSCRIBES,
TSDB_MGMT_TABLE_SUBSCRIPTIONS,
TSDB_MGMT_TABLE_TRANS,
TSDB_MGMT_TABLE_SMAS,
TSDB_MGMT_TABLE_CONFIGS,
......
......@@ -274,8 +274,8 @@ static const SSysDbTableSchema consumerSchema[] = {
};
static const SSysDbTableSchema subscriptionSchema[] = {
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "group_id", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
};
......
......@@ -85,8 +85,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
type = TSDB_MGMT_TABLE_VGROUP;
} else if (strncasecmp(name, TSDB_PERFS_TABLE_CONSUMERS, len) == 0) {
type = TSDB_MGMT_TABLE_CONSUMERS;
} else if (strncasecmp(name, TSDB_PERFS_TABLE_SUBSCRIBES, len) == 0) {
type = TSDB_MGMT_TABLE_SUBSCRIBES;
} else if (strncasecmp(name, TSDB_PERFS_TABLE_SUBSCRIPTIONS, len) == 0) {
type = TSDB_MGMT_TABLE_SUBSCRIPTIONS;
} else if (strncasecmp(name, TSDB_PERFS_TABLE_TRANS, len) == 0) {
type = TSDB_MGMT_TABLE_TRANS;
} else if (strncasecmp(name, TSDB_PERFS_TABLE_SMAS, len) == 0) {
......
......@@ -44,6 +44,9 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubs
static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pMsg);
static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
if (pRedoRaw == NULL) return -1;
......@@ -71,6 +74,10 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe);
return sdbSetTable(pMnode->pSdb, table);
}
......@@ -706,3 +713,124 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
END:
return code;
}
static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SMqSubscribeObj *pSub = NULL;
while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub);
if (pShow->pIter == NULL) break;
taosRLockLatch(&pSub->lock);
if (numOfRows + pSub->vgNum > rowsCapacity) {
blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum);
}
SMqConsumerEp *pConsumerEp = NULL;
void *pIter = NULL;
while (1) {
pIter = taosHashIterate(pSub->consumerHash, pIter);
if (pIter == NULL) break;
pConsumerEp = (SMqConsumerEp *)pIter;
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
for (int32_t j = 0; j < sz; j++) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
SColumnInfoData *pColInfo;
int32_t cols = 0;
// topic and cgroup
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
mndSplitSubscribeKey(pSub->key, topic, cgroup);
varDataSetLen(topic, strlen(varDataVal(topic)));
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)topic, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
// vg id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);
// consumer id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false);
// offset
#if 0
// subscribe time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);
// rebalance time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
#endif
numOfRows++;
}
}
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
SColumnInfoData *pColInfo;
int32_t cols = 0;
// topic and cgroup
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
mndSplitSubscribeKey(pSub->key, topic, cgroup);
varDataSetLen(topic, strlen(varDataVal(topic)));
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)topic, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
// vg id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);
// consumer id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, NULL, true);
// offset
#if 0
// subscribe time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);
// rebalance time
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
#endif
numOfRows++;
}
taosRUnLockLatch(&pSub->lock);
sdbRelease(pSdb, pSub);
}
pShow->numOfRows += numOfRows;
return numOfRows;
}
static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
......@@ -35,6 +35,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp);
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
......
......@@ -23,11 +23,12 @@ echo $UDF1_DIR
echo $UDF2_DIR
UDF_TMP=/tmp/udf
rm -rf $UDF_TMP
mkdir $UDF_TMP
rm $UDF_TMP/libudf1.so
rm $UDF_TMP/libudf2.so
echo "Copy udf shared library files to $UDF_TMP"
cp $UDF1_DIR $UDF_TMP
cp $UDF1_DIR $UDF_TMP
echo "copy udf1 result: $?"
cp $UDF2_DIR $UDF_TMP
echo "copy udf2 result: $?"
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册