From a2a7dffb5d726b21fde8c3a171f787a7f4e2a6e7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 18 Feb 2023 13:52:07 +0800 Subject: [PATCH] refactor: do some internal refactor and add some logs. --- source/client/src/clientTmq.c | 25 +++++++++++++------------ source/client/test/clientTests.cpp | 2 +- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index a17ad97756..5aef065ce0 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -641,17 +641,14 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); - tscDebug("consumer:0x%" PRIx64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName, - numOfVgroups); - for (int32_t j = 0; j < numOfVgroups; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - tscDebug("consumer:0x%" PRIx64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName, - pVg->vgId); + tscDebug("consumer:0x%" PRIx64 " begin commit for topic %s, vgId:%d, ordinal:%d/%d", tmq->consumerId, pTopic->topicName, + pVg->vgId, j + 1, numOfVgroups); if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) { - tscDebug("consumer: %" PRId64 ", vg:%d, current %" PRId64 ", committed %" PRId64 "", tmq->consumerId, pVg->vgId, + tscDebug("consumer:0x%" PRId64 " vg:%d, current %" PRId64 ", committed %" PRId64 "", tmq->consumerId, pVg->vgId, pVg->currentOffset.version, pVg->committedOffset.version); if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) { continue; @@ -811,7 +808,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); *pRefId = pTmq->refId; - tscDebug("consumer:0x%"PRIx64" retrieve ep from mnode in 1s", pTmq->consumerId); + tscDebug("consumer:0x%"PRIx64" will retrieve ep from mnode in 1s", pTmq->consumerId); taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam); @@ -819,7 +816,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); *pRefId = pTmq->refId; - tscDebug("consumer:0x%"PRIx64" commit to mnode in %.2f s", pTmq->consumerId, pTmq->autoCommitInterval/1000.0); + tscDebug("consumer:0x%"PRIx64" will commit to mnode in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval/1000.0); taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { // do nothing @@ -1066,7 +1063,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { SCMSubscribeReq req = {0}; int32_t code = 0; - tscDebug("consumer:0x%"PRIx64", tmq subscribe start, numOfTopic %d", tmq->consumerId, sz); + tscDebug("consumer:0x%"PRIx64" tmq subscribe start, numOfTopic %d", tmq->consumerId, sz); req.consumerId = tmq->consumerId; tstrncpy(req.clientId, tmq->clientId, 256); @@ -1156,7 +1153,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { goto FAIL; } - tscDebug("consumer:0x%"PRIx64", mnd not ready for subscribe, retry count:%d in 500ms", tmq->consumerId, retryCnt); + tscDebug("consumer:0x%"PRIx64", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt); taosMsleep(500); } @@ -1435,11 +1432,15 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { // Epoch will only increase when received newer epoch ep msg SMqRspHead* head = pMsg->pData; int32_t epoch = atomic_load_32(&tmq->epoch); - tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch); if (head->epoch <= epoch) { + tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep", + tmq->consumerId, head->epoch, epoch); goto END; } + tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId, + head->epoch, epoch); + if (!async) { SMqAskEpRsp rsp; tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); @@ -1548,7 +1549,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); - tscDebug("consumer:0x%" PRIx64 ", ask ep from mnode", tmq->consumerId); + tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d", tmq->consumerId, async); int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 9b777f05c0..59c931d9aa 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -671,7 +671,7 @@ TEST(clientCase, generated_request_id_test) { uint64_t v = generateRequestId(); void* result = taosHashGet(phash, &v, sizeof(v)); if (result != nullptr) { - printf("0x%lx, index:%d\n", v, i); +// printf("0x%llx, index:%d\n", v, i); } assert(result == nullptr); taosHashPut(phash, &v, sizeof(v), NULL, 0); -- GitLab