diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 1e98f62f36de555796d6b5581e64273eb4998530..d358f4eecfbc488d11984ef386041da4e8c20c38 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1080,7 +1080,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { SCMSubscribeReq req = {0}; int32_t code = 0; - tscDebug("consumer:0x%"PRIx64" tmq subscribe start, numOfTopic %d", tmq->consumerId, sz); + tscDebug("consumer:0x%"PRIx64" subscribe %d topics", tmq->consumerId, sz); req.consumerId = tmq->consumerId; tstrncpy(req.clientId, tmq->clientId, 256); @@ -1103,7 +1103,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } tNameExtractFullName(&name, topicFName); - tscDebug("consumer:0x%"PRIx64" subscribe topic: %s", tmq->consumerId, topicFName); + tscDebug("consumer:0x%"PRIx64" subscribe topic:%s", tmq->consumerId, topicFName); taosArrayPush(req.topicNames, &topicFName); } @@ -1774,6 +1774,8 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) } void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { + tscDebug("consumer:0x%"PRIx64" start to handle the rsp", tmq->consumerId); + while (1) { SMqRspWrapper* rspWrapper = NULL; taosGetQitem(tmq->qall, (void**)&rspWrapper); @@ -1783,20 +1785,17 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosGetQitem(tmq->qall, (void**)&rspWrapper); if (rspWrapper == NULL) { - /*tscDebug("consumer %" PRId64 " mqueue empty", tmq->consumerId);*/ return NULL; } } - tscDebug("consumer:0x%" PRIx64 " handle rsp %p", tmq->consumerId, rspWrapper); - if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { taosFreeQitem(rspWrapper); terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; return NULL; } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; - tscDebug("consumer:0x%" PRIx64 " actual process poll rsp", tmq->consumerId); + tscDebug("consumer:0x%" PRIx64 " process poll rsp", tmq->consumerId); /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/ int32_t consumerEpoch = atomic_load_32(&tmq->epoch); if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) { @@ -1823,6 +1822,9 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); + + tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId); + if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset, @@ -1887,7 +1889,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { void* rspObj; int64_t startTime = taosGetTimestampMs(); - tscDebug("consumer:0x%" PRIx64 ", start poll at %" PRId64, tmq->consumerId, startTime); + tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64, tmq->consumerId, startTime); #if 0 tmqHandleAllDelayedTask(tmq); @@ -1900,7 +1902,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { // in no topic status, delayed task also need to be processed if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) { - tscDebug("consumer:0x%" PRIx64 ", poll return since consumer is init", tmq->consumerId); + tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId); return NULL; } @@ -1926,25 +1928,25 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { rspObj = tmqHandleAllRsp(tmq, timeout, false); if (rspObj) { - tscDebug("consumer:0x%" PRIx64 ", return rsp %p", tmq->consumerId, rspObj); + tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj); return (TAOS_RES*)rspObj; } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { - tscDebug("consumer:0x%" PRIx64 ", return null since no committed offset", tmq->consumerId); + tscDebug("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId); return NULL; } if (timeout != -1) { int64_t currentTime = taosGetTimestampMs(); - int64_t passedTime = currentTime - startTime; - if (passedTime > timeout) { - tscDebug("consumer:0x%" PRIx64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, + int64_t elapsedTime = currentTime - startTime; + if (elapsedTime > timeout) { + tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, tmq->consumerId, tmq->epoch, startTime, currentTime); return NULL; } /*tscInfo("consumer:0x%" PRIx64 ", (epoch %d) wait, start time %" PRId64 ", current time %" PRId64*/ /*", left time %" PRId64,*/ - /*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - passedTime));*/ - tsem_timewait(&tmq->rspSem, (timeout - passedTime)); + /*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - elapsedTime));*/ + tsem_timewait(&tmq->rspSem, (timeout - elapsedTime)); } else { // use tsem_timewait instead of tsem_wait to avoid unexpected stuck tsem_timewait(&tmq->rspSem, 1000);