diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 909b7d0877e0a0bac316986fb9aff5356a5182a4..ff2e419c7528c18be20e6677cd46b7e4695486fc 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1493,7 +1493,7 @@ typedef struct { } SMVSubscribeRsp; typedef struct { - char name[TSDB_TABLE_FNAME_LEN]; + char name[TSDB_TOPIC_FNAME_LEN]; int8_t igNotExists; } SMDropTopicReq; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index c768e001c5479da84b5ffbaa2a516eeb4a9b620d..0ce689f19cd8f2579047f9e49eca0ca69609d1f5 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1307,7 +1307,18 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) { } tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { - // TODO + if (tmq->status == TMQ_CONSUMER_STATUS__READY) { + tmq_list_t* lst = tmq_list_new(); + tmq_resp_err_t rsp = tmq_subscribe(tmq, lst); + tmq_list_destroy(lst); + if (rsp == TMQ_RESP_ERR__SUCCESS) { + // TODO: free resources + return TMQ_RESP_ERR__SUCCESS; + } else { + return TMQ_RESP_ERR__FAIL; + } + } + // TODO: free resources return TMQ_RESP_ERR__SUCCESS; } diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 5ff0282c878ba428b7d16a2fdad2c9ba8416ee50..81682bb734252d85ff8af19eda3edb506d17f8a2 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -262,7 +262,7 @@ static const SSysDbTableSchema topicSchema[] = { static const SSysDbTableSchema consumerSchema[] = { {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, - {.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, @@ -275,7 +275,7 @@ static const SSysDbTableSchema consumerSchema[] = { static const SSysDbTableSchema subscriptionSchema[] = { {.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "group_id", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, }; diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index a8bfe91cbf8edfb225510253b9536115b21cea30..210e336ac251165cfefb0c8f8b60ddbb9d1df843 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -29,6 +29,7 @@ enum { MQ_CONSUMER_STATUS__LOST, MQ_CONSUMER_STATUS__LOST_IN_REB, MQ_CONSUMER_STATUS__LOST_REBD, + MQ_CONSUMER_STATUS__REMOVED, }; int32_t mndInitConsumer(SMnode *pMnode); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 6c77c379e056b09d00f43c04ab5133418fcd722e..9c8c6d32eb8547028a92b8b1e604f1539c724aa8 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -486,6 +486,14 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { } } + if (pConsumerOld && taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && + taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { + /*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/ + /*pConsumerNew->updateType = */ + /*}*/ + goto SUBSCRIBE_OVER; + } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg); if (pTrans == NULL) goto SUBSCRIBE_OVER; if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER; @@ -789,6 +797,10 @@ static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock while (numOfRows < rowsCapacity) { pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer); if (pShow->pIter == NULL) break; + if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { + sdbRelease(pSdb, pConsumer); + continue; + } taosRLockLatch(&pConsumer->lock); @@ -810,12 +822,12 @@ static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false); - // group id - char groupId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; - tstrncpy(varDataVal(groupId), pConsumer->cgroup, TSDB_CGROUP_LEN); - varDataSetLen(groupId, strlen(varDataVal(groupId))); + // consumer group + char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; + tstrncpy(varDataVal(cgroup), pConsumer->cgroup, TSDB_CGROUP_LEN); + varDataSetLen(cgroup, strlen(varDataVal(cgroup))); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)groupId, false); + colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false); // app id char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index c947a1913eb761133b5f27c9d683d9fc85aac377..2a81f28eddef99b673a061780ed8a7fcbcf7fac6 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -171,14 +171,21 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM return 0; } -static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup) { +static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) { int32_t i = 0; while (key[i] != TMQ_SEPARATOR) { i++; } memcpy(cgroup, key, i); cgroup[i] = 0; - strcpy(topic, &key[i + 1]); + if (fullName) { + strcpy(topic, &key[i + 1]); + } else { + while (key[i] != '.') { + i++; + } + strcpy(topic, &key[i + 1]); + } return 0; } @@ -426,7 +433,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO pConsumerNew->updateType = CONSUMER_UPDATE__ADD; char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup); + mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); taosArrayPush(pConsumerNew->rebNewTopics, &topic); mndReleaseConsumer(pMnode, pConsumerOld); if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { @@ -444,7 +451,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO pConsumerNew->updateType = CONSUMER_UPDATE__REMOVE; char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup); + mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); taosArrayPush(pConsumerNew->rebRemovedTopics, &topic); mndReleaseConsumer(pMnode, pConsumerOld); if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { @@ -494,7 +501,7 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { // split sub key and extract topic char topic[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pRebInfo->key, topic, cgroup); + mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); ASSERT(pTopic); taosRLockLatch(&pTopic->lock); @@ -747,7 +754,7 @@ static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock // topic and cgroup char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; - mndSplitSubscribeKey(pSub->key, topic, cgroup); + mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false); varDataSetLen(topic, strlen(varDataVal(topic))); varDataSetLen(cgroup, strlen(varDataVal(cgroup))); @@ -790,7 +797,7 @@ static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock // topic and cgroup char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; - mndSplitSubscribeKey(pSub->key, topic, cgroup); + mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false); varDataSetLen(topic, strlen(varDataVal(topic))); varDataSetLen(cgroup, strlen(varDataVal(cgroup))); diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 73cdf7f59ce9333f41e9dd2cfd1b7347949790de..4a59d18d8798222cd425c2bde45138f262372e57 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -331,12 +331,6 @@ void loop_consume(SThreadInfo* pInfo) { } } - err = tmq_consumer_close(pInfo->tmq); - if (err) { - printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); - exit(-1); - } - pInfo->consumeMsgCnt = totalMsgs; pInfo->consumeRowCnt = totalRows; @@ -372,6 +366,13 @@ void* consumeThreadFunc(void* param) { return NULL; } + err = tmq_consumer_close(pInfo->tmq); + if (err) { + printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); + exit(-1); + } + pInfo->tmq = NULL; + // save consume result into consumeresult table saveConsumeResult(pInfo);