diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 826bbbd8a363349a1e0b7e2370ef51cf951a0649..de338a464281ff2fae0b09579f6f7e9acd805463 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1799,8 +1799,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); if (pollRspWrapper->dataRsp.blockNum == 0) { - tscDebug("consumer:0x%" PRIx64 " empty block received in poll rsp", tmq->consumerId); - + tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d", tmq->consumerId, pVg->vgId); taosFreeQitem(pollRspWrapper); rspWrapper = NULL; continue; @@ -1913,11 +1912,11 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) { int32_t retryCnt = 0; while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { - if (retryCnt++ > 10) { + if (retryCnt++ > 40) { return NULL; } - tscDebug("consumer:0x%" PRIx64 " not ready, retry:%d/10 in 500ms", tmq->consumerId, retryCnt); + tscDebug("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, retryCnt); taosMsleep(500); } } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 803c4f81e7417493cc8e2eaa472a9dccfe5657f0..50aa1bb8c777b8ea9d58d0610feae944b09edf06 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -621,7 +621,8 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { pExistedConsumer = mndAcquireConsumer(pMnode, consumerId); if (pExistedConsumer == NULL) { - mInfo("receive subscribe request from new consumer:0x%" PRIx64" cgroup:%s", consumerId, subscribe.cgroup); + mInfo("receive subscribe request from new consumer:0x%" PRIx64" cgroup:%s, numOfTopics:%d", consumerId, + subscribe.cgroup, (int32_t) taosArrayGetSize(pTopicList)); pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);