提交 82b2ae41 编写于 作者: L Liu Jicong

enh(tmq): set and show client id

上级 7d261da4
...@@ -1480,6 +1480,7 @@ typedef struct { ...@@ -1480,6 +1480,7 @@ typedef struct {
typedef struct { typedef struct {
int64_t consumerId; int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
char clientId[256];
SArray* topicNames; // SArray<char**> SArray* topicNames; // SArray<char**>
} SCMSubscribeReq; } SCMSubscribeReq;
...@@ -1487,6 +1488,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc ...@@ -1487,6 +1488,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeFixedI64(buf, pReq->consumerId);
tlen += taosEncodeString(buf, pReq->cgroup); tlen += taosEncodeString(buf, pReq->cgroup);
tlen += taosEncodeString(buf, pReq->clientId);
int32_t topicNum = taosArrayGetSize(pReq->topicNames); int32_t topicNum = taosArrayGetSize(pReq->topicNames);
tlen += taosEncodeFixedI32(buf, topicNum); tlen += taosEncodeFixedI32(buf, topicNum);
...@@ -1500,6 +1502,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc ...@@ -1500,6 +1502,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) { static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) {
buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeFixedI64(buf, &pReq->consumerId);
buf = taosDecodeStringTo(buf, pReq->cgroup); buf = taosDecodeStringTo(buf, pReq->cgroup);
buf = taosDecodeStringTo(buf, pReq->clientId);
int32_t topicNum; int32_t topicNum;
buf = taosDecodeFixedI32(buf, &topicNum); buf = taosDecodeFixedI32(buf, &topicNum);
......
...@@ -263,7 +263,7 @@ static const SSysDbTableSchema topicSchema[] = { ...@@ -263,7 +263,7 @@ static const SSysDbTableSchema topicSchema[] = {
static const SSysDbTableSchema consumerSchema[] = { static const SSysDbTableSchema consumerSchema[] = {
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
......
...@@ -463,7 +463,7 @@ typedef struct { ...@@ -463,7 +463,7 @@ typedef struct {
typedef struct { typedef struct {
int64_t consumerId; int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
char appId[TSDB_CGROUP_LEN]; char clientId[256];
int8_t updateType; // used only for update int8_t updateType; // used only for update
int32_t epoch; int32_t epoch;
int32_t status; int32_t status;
......
...@@ -427,6 +427,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -427,6 +427,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
pConsumerOld = mndAcquireConsumer(pMnode, consumerId); pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
if (pConsumerOld == NULL) { if (pConsumerOld == NULL) {
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
pConsumerNew->rebNewTopics = newSub; pConsumerNew->rebNewTopics = newSub;
subscribe.topicNames = NULL; subscribe.topicNames = NULL;
...@@ -848,11 +849,11 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * ...@@ -848,11 +849,11 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false); colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
// app id // app id
char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; char clientId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(appId), pConsumer->appId, TSDB_CGROUP_LEN); tstrncpy(varDataVal(clientId), pConsumer->clientId, TSDB_CGROUP_LEN);
varDataSetLen(appId, strlen(varDataVal(appId))); varDataSetLen(clientId, strlen(varDataVal(clientId)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)appId, false); colDataAppend(pColInfo, numOfRows, (const char *)clientId, false);
// status // status
char status[20 + VARSTR_HEADER_SIZE] = {0}; char status[20 + VARSTR_HEADER_SIZE] = {0};
......
...@@ -492,8 +492,8 @@ static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTo ...@@ -492,8 +492,8 @@ static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTo
} }
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; /*SSdb *pSdb = pMnode->pSdb;*/
SMDropTopicReq dropReq = {0}; SMDropTopicReq dropReq = {0};
if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { if (tDeserializeSMDropTopicReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
...@@ -502,16 +502,16 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -502,16 +502,16 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
} }
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
// if (pTopic == NULL) { if (pTopic == NULL) {
// if (dropReq.igNotExists) { if (dropReq.igNotExists) {
// mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name); mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
// return 0; return 0;
// } else { } else {
// terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
// mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
// return -1; return -1;
// } }
// } }
if (pTopic->refConsumerCnt != 0) { if (pTopic->refConsumerCnt != 0) {
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
...@@ -528,12 +528,10 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { ...@@ -528,12 +528,10 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
#if 1
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) { if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
#endif
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) { if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
ASSERT(0); ASSERT(0);
......
...@@ -409,53 +409,53 @@ sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt; ...@@ -409,53 +409,53 @@ sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1 # row 1
if $data11 != 4 then if $data11 != 4 then
print ======$data11 print ======$data11
# return -1 return -1
endi endi
if $data12 != 4 then if $data12 != 4 then
print ======$data12 print ======$data12
# return -1 return -1
endi endi
if $data13 != 10 then if $data13 != 10 then
print ======$data13 print ======$data13
# return -1 return -1
endi endi
if $data14 != 3 then if $data14 != 3 then
print ======$data14 print ======$data14
# return -1 return -1
endi endi
if $data15 != 1 then if $data15 != 1 then
print ======$data15 print ======$data15
# return -1 return -1
endi endi
# row 2 # row 2
if $data21 != 4 then if $data21 != 4 then
print ======$data21 print ======$data21
# return -1 return -1
endi endi
if $data22 != 4 then if $data22 != 4 then
print ======$data22 print ======$data22
# return -1 return -1
endi endi
if $data23 != 15 then if $data23 != 15 then
print ======$data23 print ======$data23
# return -1 return -1
endi endi
if $data24 != 4 then if $data24 != 4 then
print ======$data24 print ======$data24
# return -1 return -1
endi endi
if $data25 != 3 then if $data25 != 3 then
print ======$data25 print ======$data25
# return -1 return -1
endi endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册