diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index d08cabd27e9ec1c66c13be2fa6b81f0998b54370..f6a2c5fdc162c179a480f09116f59ddb00d307db 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1228,8 +1228,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { // in case of consumer mismatch, wait for 500ms and retry if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { -// taosMsleep(500); + taosMsleep(500); atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); + tscDebug("consumer:0x%" PRIx64" wait for the re-balance, wait for 500ms and set status to be RECOVER", tmq->consumerId); } else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0); if (pRspWrapper == NULL) { @@ -1918,7 +1919,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { // in no topic status, delayed task also need to be processed if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) { tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId); - // sleep for a while + taosMsleep(500); // sleep for a while return NULL; } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index e52b04605359f43981c63a9056e20adde9b0a6d7..24974a1973f02c3df83cbc2befaacb4d695fbd8d 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -637,7 +637,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { subscribe.cgroup, (int32_t) taosArrayGetSize(pTopicList)); pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); - tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256); + tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId)); // set the update type pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;