提交 50ae5e74 编写于 作者: H Haojun Liao

refactor: do some internal refactor and add some logs.

上级 a2a7dffb
......@@ -509,7 +509,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
.handle = NULL,
};
tscDebug("consumer:0x%" PRIx64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64, tmq->consumerId, pOffset->subKey,
pVg->vgId, pOffset->val.version);
// TODO: put into cb
......@@ -643,16 +643,15 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
for (int32_t j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
tscDebug("consumer:0x%" PRIx64 " begin commit for topic %s, vgId:%d, ordinal:%d/%d", tmq->consumerId, pTopic->topicName,
pVg->vgId, j + 1, numOfVgroups);
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
tscDebug("consumer:0x%" PRId64 " vg:%d, current %" PRId64 ", committed %" PRId64 "", tmq->consumerId, pVg->vgId,
pVg->currentOffset.version, pVg->committedOffset.version);
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, current %" PRId64 ", committed %" PRId64, tmq->consumerId,
pTopic->topicName, pVg->vgId, pVg->currentOffset.version, pVg->committedOffset.version);
if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
continue;
}
} else {
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, not commit, current:%" PRId64 ", ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset.version, j + 1, numOfVgroups);
}
}
}
......@@ -808,7 +807,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId;
tscDebug("consumer:0x%"PRIx64" will retrieve ep from mnode in 1s", pTmq->consumerId);
tscDebug("consumer:0x%"PRIx64" next retrieve ep from mnode in 1s", pTmq->consumerId);
taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam);
......@@ -816,7 +815,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId;
tscDebug("consumer:0x%"PRIx64" will commit to mnode in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval/1000.0);
tscDebug("consumer:0x%"PRIx64" next commit to mnode in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval/1000.0);
taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
// do nothing
......@@ -1309,7 +1308,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
tscDebug("consumer:0x%" PRIx64", update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
tscDebug("consumer:0x%" PRIx64" update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur);
SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
......@@ -1634,14 +1633,18 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
// broadcast the poll request to all related vnodes
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64" start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
for (int i = 0; i < numOfTopics; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus != TMQ_VG_STATUS__IDLE) {
if (vgStatus == TMQ_VG_STATUS__WAIT) {
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
tscTrace("consumer:0x%" PRIx64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
vgSkipCnt);
continue;
/*if (vgSkipCnt < 10000) continue;*/
......@@ -1653,6 +1656,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
}
#endif
}
atomic_store_32(&pVg->vgSkipCnt, 0);
SMqPollReq req = {0};
......@@ -1663,6 +1667,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
tsem_post(&tmq->rspSem);
return -1;
}
char* msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
......@@ -1684,6 +1689,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
tsem_post(&tmq->rspSem);
return -1;
}
pParam->refId = tmq->refId;
pParam->epoch = tmq->epoch;
......@@ -1790,7 +1796,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper);
return pRsp;
} else {
tscDebug("consumer:0x%"PRIx64", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tscDebug("consumer:0x%"PRIx64" msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(pollRspWrapper);
......@@ -1809,7 +1815,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper);
return pRsp;
} else {
tscDebug("consumer:0x%"PRIx64", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tscDebug("consumer:0x%"PRIx64" msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(pollRspWrapper);
......@@ -1840,7 +1846,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper);
return pRsp;
} else {
tscDebug("consumer:0x%"PRIx64", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tscDebug("consumer:0x%"PRIx64" msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(pollRspWrapper);
......@@ -1885,7 +1891,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
if (retryCnt++ > 10) {
return NULL;
}
tscDebug("consumer not ready, retry");
tscDebug("consumer:0x%"PRIx64" not ready, retry:%d/10 in 500ms", tmq->consumerId, retryCnt);
taosMsleep(500);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册