From 82b2ae419481238b617cb659621b5134d9c55bd9 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 19 May 2022 11:20:47 +0800 Subject: [PATCH] enh(tmq): set and show client id --- include/common/tmsg.h | 3 +++ source/common/src/systable.c | 2 +- source/dnode/mnode/impl/inc/mndDef.h | 2 +- source/dnode/mnode/impl/src/mndConsumer.c | 9 ++++---- source/dnode/mnode/impl/src/mndTopic.c | 26 +++++++++++------------ tests/script/tsim/tstream/basic1.sim | 20 ++++++++--------- 6 files changed, 32 insertions(+), 30 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7a60542313..f7562a4b9b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1480,6 +1480,7 @@ typedef struct { typedef struct { int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; + char clientId[256]; SArray* topicNames; // SArray } SCMSubscribeReq; @@ -1487,6 +1488,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeString(buf, pReq->cgroup); + tlen += taosEncodeString(buf, pReq->clientId); int32_t topicNum = taosArrayGetSize(pReq->topicNames); tlen += taosEncodeFixedI32(buf, topicNum); @@ -1500,6 +1502,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) { buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeStringTo(buf, pReq->cgroup); + buf = taosDecodeStringTo(buf, pReq->clientId); int32_t topicNum; buf = taosDecodeFixedI32(buf, &topicNum); diff --git a/source/common/src/systable.c b/source/common/src/systable.c index e4e5abe148..9fe7645e2b 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -263,7 +263,7 @@ static const SSysDbTableSchema topicSchema[] = { static const SSysDbTableSchema consumerSchema[] = { {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index ec9139836a..a04417736a 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -463,7 +463,7 @@ typedef struct { typedef struct { int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; - char appId[TSDB_CGROUP_LEN]; + char clientId[256]; int8_t updateType; // used only for update int32_t epoch; int32_t status; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 274876567e..4fe7d8a399 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -427,6 +427,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { pConsumerOld = mndAcquireConsumer(pMnode, consumerId); if (pConsumerOld == NULL) { pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); + tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256); pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; pConsumerNew->rebNewTopics = newSub; subscribe.topicNames = NULL; @@ -848,11 +849,11 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false); // app id - char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; - tstrncpy(varDataVal(appId), pConsumer->appId, TSDB_CGROUP_LEN); - varDataSetLen(appId, strlen(varDataVal(appId))); + char clientId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; + tstrncpy(varDataVal(clientId), pConsumer->clientId, TSDB_CGROUP_LEN); + varDataSetLen(clientId, strlen(varDataVal(clientId))); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)appId, false); + colDataAppend(pColInfo, numOfRows, (const char *)clientId, false); // status char status[20 + VARSTR_HEADER_SIZE] = {0}; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 3f559cb6c0..c6eebb5c5d 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -492,8 +492,8 @@ static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTo } static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; + SMnode *pMnode = pReq->info.node; + /*SSdb *pSdb = pMnode->pSdb;*/ SMDropTopicReq dropReq = {0}; if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { @@ -502,16 +502,16 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name); - // if (pTopic == NULL) { - // if (dropReq.igNotExists) { - // mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name); - // return 0; - // } else { - // terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; - // mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); - // return -1; - // } - // } + if (pTopic == NULL) { + if (dropReq.igNotExists) { + mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name); + return 0; + } else { + terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; + mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); + return -1; + } + } if (pTopic->refConsumerCnt != 0) { mndReleaseTopic(pMnode, pTopic); @@ -528,12 +528,10 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); -#if 1 if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) { ASSERT(0); return -1; } -#endif if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) { ASSERT(0); diff --git a/tests/script/tsim/tstream/basic1.sim b/tests/script/tsim/tstream/basic1.sim index 9965772c75..0997e313c8 100644 --- a/tests/script/tsim/tstream/basic1.sim +++ b/tests/script/tsim/tstream/basic1.sim @@ -409,53 +409,53 @@ sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; # row 1 if $data11 != 4 then print ======$data11 - # return -1 + return -1 endi if $data12 != 4 then print ======$data12 - # return -1 + return -1 endi if $data13 != 10 then print ======$data13 - # return -1 + return -1 endi if $data14 != 3 then print ======$data14 - # return -1 + return -1 endi if $data15 != 1 then print ======$data15 - # return -1 + return -1 endi # row 2 if $data21 != 4 then print ======$data21 - # return -1 + return -1 endi if $data22 != 4 then print ======$data22 - # return -1 + return -1 endi if $data23 != 15 then print ======$data23 - # return -1 + return -1 endi if $data24 != 4 then print ======$data24 - # return -1 + return -1 endi if $data25 != 3 then print ======$data25 - # return -1 + return -1 endi -- GitLab