diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 5aef065ce0d6a8f5a26f7eb3f458ed7083cf0c55..b9b66e1222e8698a77aac804314af2f2bb20a7ca 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -509,7 +509,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT .handle = NULL, }; - tscDebug("consumer:0x%" PRIx64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey, + tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64, tmq->consumerId, pOffset->subKey, pVg->vgId, pOffset->val.version); // TODO: put into cb @@ -643,16 +643,15 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); for (int32_t j = 0; j < numOfVgroups; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - - tscDebug("consumer:0x%" PRIx64 " begin commit for topic %s, vgId:%d, ordinal:%d/%d", tmq->consumerId, pTopic->topicName, - pVg->vgId, j + 1, numOfVgroups); - if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) { - tscDebug("consumer:0x%" PRId64 " vg:%d, current %" PRId64 ", committed %" PRId64 "", tmq->consumerId, pVg->vgId, - pVg->currentOffset.version, pVg->committedOffset.version); + tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, current %" PRId64 ", committed %" PRId64, tmq->consumerId, + pTopic->topicName, pVg->vgId, pVg->currentOffset.version, pVg->committedOffset.version); if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) { continue; } + } else { + tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, not commit, current:%" PRId64 ", ordinal:%d/%d", + tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset.version, j + 1, numOfVgroups); } } } @@ -808,7 +807,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); *pRefId = pTmq->refId; - tscDebug("consumer:0x%"PRIx64" will retrieve ep from mnode in 1s", pTmq->consumerId); + tscDebug("consumer:0x%"PRIx64" next retrieve ep from mnode in 1s", pTmq->consumerId); taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam); @@ -816,7 +815,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); *pRefId = pTmq->refId; - tscDebug("consumer:0x%"PRIx64" will commit to mnode in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval/1000.0); + tscDebug("consumer:0x%"PRIx64" next commit to mnode in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval/1000.0); taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { // do nothing @@ -1309,7 +1308,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { int32_t topicNumGet = taosArrayGetSize(pRsp->topics); char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; - tscDebug("consumer:0x%" PRIx64", update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d", + tscDebug("consumer:0x%" PRIx64" update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d", tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur); SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic)); @@ -1634,14 +1633,18 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) { // broadcast the poll request to all related vnodes int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { - for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { + int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); + tscDebug("consumer:0x%" PRIx64" start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics); + + for (int i = 0; i < numOfTopics; i++) { + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); - if (vgStatus != TMQ_VG_STATUS__IDLE) { + if (vgStatus == TMQ_VG_STATUS__WAIT) { int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); - tscTrace("consumer:0x%" PRIx64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, + tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt); continue; /*if (vgSkipCnt < 10000) continue;*/ @@ -1653,6 +1656,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { } #endif } + atomic_store_32(&pVg->vgSkipCnt, 0); SMqPollReq req = {0}; @@ -1663,6 +1667,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { tsem_post(&tmq->rspSem); return -1; } + char* msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); @@ -1684,6 +1689,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { tsem_post(&tmq->rspSem); return -1; } + pParam->refId = tmq->refId; pParam->epoch = tmq->epoch; @@ -1790,7 +1796,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosFreeQitem(pollRspWrapper); return pRsp; } else { - tscDebug("consumer:0x%"PRIx64", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", + tscDebug("consumer:0x%"PRIx64" msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->dataRsp.head.epoch, consumerEpoch); tmqFreeRspWrapper(rspWrapper); taosFreeQitem(pollRspWrapper); @@ -1809,7 +1815,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosFreeQitem(pollRspWrapper); return pRsp; } else { - tscDebug("consumer:0x%"PRIx64", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", + tscDebug("consumer:0x%"PRIx64" msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); tmqFreeRspWrapper(rspWrapper); taosFreeQitem(pollRspWrapper); @@ -1840,7 +1846,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosFreeQitem(pollRspWrapper); return pRsp; } else { - tscDebug("consumer:0x%"PRIx64", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", + tscDebug("consumer:0x%"PRIx64" msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); tmqFreeRspWrapper(rspWrapper); taosFreeQitem(pollRspWrapper); @@ -1885,7 +1891,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { if (retryCnt++ > 10) { return NULL; } - tscDebug("consumer not ready, retry"); + + tscDebug("consumer:0x%"PRIx64" not ready, retry:%d/10 in 500ms", tmq->consumerId, retryCnt); taosMsleep(500); } } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 59c931d9aa8b30bce9933c322db2bf50c8611093..cb3c2f8c6836b345630730943ee1702d0cb6bab1 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -912,7 +912,7 @@ TEST(clientCase, subscription_test) { tmq_conf_t* conf = tmq_conf_new(); tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); - tmq_conf_set(conf, "group.id", "cgrpName"); + tmq_conf_set(conf, "group.id", "newabcdefgjhijlm__"); tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "auto.offset.reset", "earliest"); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 658fa75d9cf6eb20fcebc1f0d315bb86e874a74f..c033c3eea9cc1e992216f633912425efad7b616a 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -413,19 +413,21 @@ void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp) { return (void *)buf; } -SMqSubscribeObj *tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]) { - SMqSubscribeObj *pSubNew = taosMemoryCalloc(1, sizeof(SMqSubscribeObj)); - if (pSubNew == NULL) return NULL; - memcpy(pSubNew->key, key, TSDB_SUBSCRIBE_KEY_LEN); - taosInitRWLatch(&pSubNew->lock); - pSubNew->vgNum = 0; - pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - // TODO set hash free fp - /*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/ +SMqSubscribeObj *tNewSubscribeObj(const char* key) { + SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj)); + if (pSubObj == NULL) { + return NULL; + } - pSubNew->unassignedVgs = taosArrayInit(0, sizeof(void *)); + memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN); + taosInitRWLatch(&pSubObj->lock); + pSubObj->vgNum = 0; + pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); - return pSubNew; + // TODO set hash free fp + /*taosHashSetFreeFp(pSubObj->consumerHash, tDeleteSMqConsumerEp);*/ + pSubObj->unassignedVgs = taosArrayInit(0, POINTER_BYTES); + return pSubObj; } SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {