提交 4b7c88bc 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 37fea8cf
......@@ -1080,7 +1080,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
SCMSubscribeReq req = {0};
int32_t code = 0;
tscDebug("consumer:0x%"PRIx64" tmq subscribe start, numOfTopic %d", tmq->consumerId, sz);
tscDebug("consumer:0x%"PRIx64" subscribe %d topics", tmq->consumerId, sz);
req.consumerId = tmq->consumerId;
tstrncpy(req.clientId, tmq->clientId, 256);
......@@ -1103,7 +1103,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
}
tNameExtractFullName(&name, topicFName);
tscDebug("consumer:0x%"PRIx64" subscribe topic: %s", tmq->consumerId, topicFName);
tscDebug("consumer:0x%"PRIx64" subscribe topic:%s", tmq->consumerId, topicFName);
taosArrayPush(req.topicNames, &topicFName);
}
......@@ -1774,6 +1774,8 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
}
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tscDebug("consumer:0x%"PRIx64" start to handle the rsp", tmq->consumerId);
while (1) {
SMqRspWrapper* rspWrapper = NULL;
taosGetQitem(tmq->qall, (void**)&rspWrapper);
......@@ -1783,20 +1785,17 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosGetQitem(tmq->qall, (void**)&rspWrapper);
if (rspWrapper == NULL) {
/*tscDebug("consumer %" PRId64 " mqueue empty", tmq->consumerId);*/
return NULL;
}
}
tscDebug("consumer:0x%" PRIx64 " handle rsp %p", tmq->consumerId, rspWrapper);
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
taosFreeQitem(rspWrapper);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
return NULL;
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
tscDebug("consumer:0x%" PRIx64 " actual process poll rsp", tmq->consumerId);
tscDebug("consumer:0x%" PRIx64 " process poll rsp", tmq->consumerId);
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
......@@ -1823,6 +1822,9 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
......@@ -1887,7 +1889,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
void* rspObj;
int64_t startTime = taosGetTimestampMs();
tscDebug("consumer:0x%" PRIx64 ", start poll at %" PRId64, tmq->consumerId, startTime);
tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64, tmq->consumerId, startTime);
#if 0
tmqHandleAllDelayedTask(tmq);
......@@ -1900,7 +1902,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
// in no topic status, delayed task also need to be processed
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
tscDebug("consumer:0x%" PRIx64 ", poll return since consumer is init", tmq->consumerId);
tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
return NULL;
}
......@@ -1926,25 +1928,25 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
rspObj = tmqHandleAllRsp(tmq, timeout, false);
if (rspObj) {
tscDebug("consumer:0x%" PRIx64 ", return rsp %p", tmq->consumerId, rspObj);
tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
return (TAOS_RES*)rspObj;
} else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
tscDebug("consumer:0x%" PRIx64 ", return null since no committed offset", tmq->consumerId);
tscDebug("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId);
return NULL;
}
if (timeout != -1) {
int64_t currentTime = taosGetTimestampMs();
int64_t passedTime = currentTime - startTime;
if (passedTime > timeout) {
tscDebug("consumer:0x%" PRIx64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
int64_t elapsedTime = currentTime - startTime;
if (elapsedTime > timeout) {
tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
tmq->consumerId, tmq->epoch, startTime, currentTime);
return NULL;
}
/*tscInfo("consumer:0x%" PRIx64 ", (epoch %d) wait, start time %" PRId64 ", current time %" PRId64*/
/*", left time %" PRId64,*/
/*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - passedTime));*/
tsem_timewait(&tmq->rspSem, (timeout - passedTime));
/*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - elapsedTime));*/
tsem_timewait(&tmq->rspSem, (timeout - elapsedTime));
} else {
// use tsem_timewait instead of tsem_wait to avoid unexpected stuck
tsem_timewait(&tmq->rspSem, 1000);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册