提交 6836dc1c 编写于 作者: H Haojun Liao

refactor: do some internal refactor and add some logs.

上级 7d915626
...@@ -153,11 +153,9 @@ typedef struct { ...@@ -153,11 +153,9 @@ typedef struct {
typedef struct { typedef struct {
// subscribe info // subscribe info
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
SArray* vgs; // SArray<SMqClientVg>
SArray* vgs; // SArray<SMqClientVg>
SSchemaWrapper schema; SSchemaWrapper schema;
} SMqClientTopic; } SMqClientTopic;
...@@ -511,7 +509,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT ...@@ -511,7 +509,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
.handle = NULL, .handle = NULL,
}; };
tscDebug("consumer:%" PRId64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey, tscDebug("consumer:0x%" PRIx64 ", commit offset of %s on vgId:%d, offset is %" PRId64, tmq->consumerId, pOffset->subKey,
pVg->vgId, pOffset->val.version); pVg->vgId, pOffset->val.version);
// TODO: put into cb // TODO: put into cb
...@@ -642,13 +640,14 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async, ...@@ -642,13 +640,14 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName, int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
(int32_t)taosArrayGetSize(pTopic->vgs)); tscDebug("consumer:0x%" PRIx64 ", begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
numOfVgroups);
for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { for (int32_t j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName, tscDebug("consumer:0x%" PRIx64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName,
pVg->vgId); pVg->vgId);
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) { if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
...@@ -792,34 +791,38 @@ OVER: ...@@ -792,34 +791,38 @@ OVER:
taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer); taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
} }
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) { int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
STaosQall* qall = taosAllocateQall(); STaosQall* qall = taosAllocateQall();
taosReadAllQitems(tmq->delayedTask, qall); taosReadAllQitems(pTmq->delayedTask, qall);
tscDebug("consumer:0x%"PRIx64" handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems);
while (1) { while (1) {
int8_t* pTaskType = NULL; int8_t* pTaskType = NULL;
taosGetQitem(qall, (void**)&pTaskType); taosGetQitem(qall, (void**)&pTaskType);
if (pTaskType == NULL) break; if (pTaskType == NULL) break;
if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
tmqAskEp(tmq, true); tmqAskEp(pTmq, true);
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = tmq->refId; *pRefId = pTmq->refId;
taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &tmq->epTimer); taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam); tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam);
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = tmq->refId; *pRefId = pTmq->refId;
taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId, tmqMgmt.timer, &tmq->commitTimer); taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
} else { } else {
ASSERT(0); ASSERT(0);
} }
taosFreeQitem(pTaskType); taosFreeQitem(pTaskType);
} }
taosFreeQall(qall); taosFreeQall(qall);
return 0; return 0;
} }
...@@ -947,7 +950,7 @@ static void tmqMgmtInit(void) { ...@@ -947,7 +950,7 @@ static void tmqMgmtInit(void) {
} }
tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl); tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
if (tmqMgmt.rsetId != 0) { if (tmqMgmt.rsetId < 0) {
tmqInitRes = terrno; tmqInitRes = terrno;
} }
} }
...@@ -1257,7 +1260,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1257,7 +1260,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
tscDebug("consumer:%" PRId64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d", tscDebug("consumer:0x%" PRIx64 ", recv poll: vgId:%d, req offset %" PRId64 ", rsp offset %" PRId64 " type %d",
tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version, tmq->consumerId, pVg->vgId, pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version,
rspType); rspType);
...@@ -1280,7 +1283,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1280,7 +1283,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pEpSet);
tscDebug("consumer:%" PRId64 ", put poll res into mqueue %p", tmq->consumerId, pRspWrapper); tscDebug("consumer:0x%" PRIx64 ", put poll res into mqueue %p", tmq->consumerId, pRspWrapper);
taosWriteQitem(tmq->mqueue, pRspWrapper); taosWriteQitem(tmq->mqueue, pRspWrapper);
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
...@@ -1297,10 +1300,12 @@ CREATE_MSG_FAIL: ...@@ -1297,10 +1300,12 @@ CREATE_MSG_FAIL:
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
bool set = false; bool set = false;
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
int32_t topicNumGet = taosArrayGetSize(pRsp->topics); int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num:%d", tmq->consumerId, tmq->epoch, epoch, tscDebug("consumer:0x%" PRIx64", update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
topicNumGet); tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur);
SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic)); SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
if (newTopics == NULL) { if (newTopics == NULL) {
...@@ -1312,19 +1317,19 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { ...@@ -1312,19 +1317,19 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
taosArrayDestroy(newTopics); taosArrayDestroy(newTopics);
return false; return false;
} }
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
for (int32_t i = 0; i < topicNumCur; i++) { for (int32_t i = 0; i < topicNumCur; i++) {
// find old topic // find old topic
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
if (pTopicCur->vgs) { if (pTopicCur->vgs) {
int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs); int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur); tscDebug("consumer:0x%" PRIx64 ", new vg num: %d", tmq->consumerId, vgNumCur);
for (int32_t j = 0; j < vgNumCur; j++) { for (int32_t j = 0; j < vgNumCur; j++) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId); sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
char buf[80]; char buf[80];
tFormatOffset(buf, 80, &pVgCur->currentOffset); tFormatOffset(buf, 80, &pVgCur->currentOffset);
tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch, tscDebug("consumer:0x%" PRIx64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
pVgCur->vgId, vgKey, buf); pVgCur->vgId, vgKey, buf);
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal)); taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal));
} }
...@@ -1340,7 +1345,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { ...@@ -1340,7 +1345,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN); tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN); tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);
tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName); tscDebug("consumer:0x%" PRIx64 ", update topic: %s", tmq->consumerId, topic.topicName);
int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs); int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg)); topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
...@@ -1366,6 +1371,8 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { ...@@ -1366,6 +1371,8 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
} }
taosArrayPush(newTopics, &topic); taosArrayPush(newTopics, &topic);
} }
// destroy current buffered existed topics info
if (tmq->clientTopics) { if (tmq->clientTopics) {
int32_t sz = taosArrayGetSize(tmq->clientTopics); int32_t sz = taosArrayGetSize(tmq->clientTopics);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
...@@ -1373,17 +1380,21 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { ...@@ -1373,17 +1380,21 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema); if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
taosArrayDestroy(pTopic->vgs); taosArrayDestroy(pTopic->vgs);
} }
taosArrayDestroy(tmq->clientTopics); taosArrayDestroy(tmq->clientTopics);
} }
taosHashCleanup(pHash); taosHashCleanup(pHash);
tmq->clientTopics = newTopics; tmq->clientTopics = newTopics;
if (taosArrayGetSize(tmq->clientTopics) == 0) if (taosArrayGetSize(tmq->clientTopics) == 0) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC); atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
else } else {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
}
atomic_store_32(&tmq->epoch, epoch); atomic_store_32(&tmq->epoch, epoch);
tscDebug("consumer:0x%" PRIx64 ", update topic info completed", tmq->consumerId);
return set; return set;
} }
...@@ -1406,8 +1417,8 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1406,8 +1417,8 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
pParam->code = code; pParam->code = code;
if (code != 0) { if (code != 0) {
tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d, code %x", tmq->consumerId, tscError("consumer:0x%" PRIx64 ", get topic endpoint error, async:%d, code:%s", tmq->consumerId,
pParam->async, code); pParam->async, tstrerror(code));
goto END; goto END;
} }
...@@ -1416,7 +1427,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1416,7 +1427,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
// Epoch will only increase when received newer epoch ep msg // Epoch will only increase when received newer epoch ep msg
SMqRspHead* head = pMsg->pData; SMqRspHead* head = pMsg->pData;
int32_t epoch = atomic_load_32(&tmq->epoch); int32_t epoch = atomic_load_32(&tmq->epoch);
tscDebug("consumer:%" PRId64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch); tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
if (head->epoch <= epoch) { if (head->epoch <= epoch) {
goto END; goto END;
} }
...@@ -1435,6 +1446,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1435,6 +1446,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
code = -1; code = -1;
goto END; goto END;
} }
pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP; pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
pWrapper->epoch = head->epoch; pWrapper->epoch = head->epoch;
memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead)); memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
...@@ -1463,7 +1475,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { ...@@ -1463,7 +1475,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1); int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
if (epStatus == 1) { if (epStatus == 1) {
int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1); int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
tscTrace("consumer:%" PRId64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt); tscTrace("consumer:0x%" PRIx64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
if (epSkipCnt < 5000) return 0; if (epSkipCnt < 5000) return 0;
} }
atomic_store_32(&tmq->epSkipCnt, 0); atomic_store_32(&tmq->epSkipCnt, 0);
...@@ -1521,7 +1533,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { ...@@ -1521,7 +1533,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
.handle = NULL, .handle = NULL,
}; };
sendInfo->requestId = generateRequestId(); sendInfo->requestId = tmq->consumerId;
sendInfo->requestObjRefId = 0; sendInfo->requestObjRefId = 0;
sendInfo->param = pParam; sendInfo->param = pParam;
sendInfo->fp = tmqAskEpCb; sendInfo->fp = tmqAskEpCb;
...@@ -1611,6 +1623,7 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) { ...@@ -1611,6 +1623,7 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
return pRspObj; return pRspObj;
} }
// broadcast the poll request to all related vnodes
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
...@@ -1619,7 +1632,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { ...@@ -1619,7 +1632,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus != TMQ_VG_STATUS__IDLE) { if (vgStatus != TMQ_VG_STATUS__IDLE) {
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, tscTrace("consumer:0x%" PRIx64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
vgSkipCnt); vgSkipCnt);
continue; continue;
/*if (vgSkipCnt < 10000) continue;*/ /*if (vgSkipCnt < 10000) continue;*/
...@@ -1627,7 +1640,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { ...@@ -1627,7 +1640,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
if (skipCnt < 30000) { if (skipCnt < 30000) {
continue; continue;
} else { } else {
tscDebug("consumer:%" PRId64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId); tscDebug("consumer:0x%" PRIx64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId);
} }
#endif #endif
} }
...@@ -1683,6 +1696,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { ...@@ -1683,6 +1696,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
.len = msgSize, .len = msgSize,
.handle = NULL, .handle = NULL,
}; };
sendInfo->requestId = req.reqId; sendInfo->requestId = req.reqId;
sendInfo->requestObjRefId = 0; sendInfo->requestObjRefId = 0;
sendInfo->param = pParam; sendInfo->param = pParam;
...@@ -1690,18 +1704,19 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { ...@@ -1690,18 +1704,19 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
sendInfo->msgType = TDMT_VND_TMQ_CONSUME; sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
int64_t transporterId = 0; int64_t transporterId = 0;
/*printf("send poll\n");*/
char offsetFormatBuf[80]; char offsetFormatBuf[80];
tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset); tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset);
tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
tscDebug("consumer:0x%" PRIx64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:0x%" PRIx64,
tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
/*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++; pVg->pollCnt++;
tmq->pollCnt++; tmq->pollCnt++;
} }
} }
return 0; return 0;
} }
...@@ -1739,7 +1754,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1739,7 +1754,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} }
} }
tscDebug("consumer:%" PRId64 " handle rsp %p", tmq->consumerId, rspWrapper); tscDebug("consumer:0x%" PRIx64 " handle rsp %p", tmq->consumerId, rspWrapper);
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
taosFreeQitem(rspWrapper); taosFreeQitem(rspWrapper);
...@@ -1747,7 +1762,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1747,7 +1762,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL; return NULL;
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
tscDebug("consumer %" PRId64 " actual process poll rsp", tmq->consumerId); tscDebug("consumer:0x%" PRIx64 " actual process poll rsp", tmq->consumerId);
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/ /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
int32_t consumerEpoch = atomic_load_32(&tmq->epoch); int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) { if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
...@@ -1766,8 +1781,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1766,8 +1781,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
return pRsp; return pRsp;
} else { } else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("consumer:0x%"PRIx64", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
pollRspWrapper->dataRsp.head.epoch, consumerEpoch); tmq->consumerId, pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper); tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
...@@ -1785,8 +1800,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1785,8 +1800,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
return pRsp; return pRsp;
} else { } else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("consumer:0x%"PRIx64", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
pollRspWrapper->metaRsp.head.epoch, consumerEpoch); tmq->consumerId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper); tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
...@@ -1816,8 +1831,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1816,8 +1831,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
return pRsp; return pRsp;
} else { } else {
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tscDebug("consumer:0x%"PRIx64", msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
tmqFreeRspWrapper(rspWrapper); tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
...@@ -1827,7 +1842,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1827,7 +1842,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tmqHandleNoPollRsp(tmq, rspWrapper, &reset); tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
taosFreeQitem(rspWrapper); taosFreeQitem(rspWrapper);
if (pollIfReset && reset) { if (pollIfReset && reset) {
tscDebug("consumer:%" PRId64 ", reset and repoll", tmq->consumerId); tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
tmqPollImpl(tmq, timeout); tmqPollImpl(tmq, timeout);
} }
} }
...@@ -1838,7 +1853,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { ...@@ -1838,7 +1853,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
void* rspObj; void* rspObj;
int64_t startTime = taosGetTimestampMs(); int64_t startTime = taosGetTimestampMs();
tscDebug("consumer:%" PRId64 ", start poll at %" PRId64, tmq->consumerId, startTime); tscDebug("consumer:0x%" PRIx64 ", start poll at %" PRId64, tmq->consumerId, startTime);
#if 0 #if 0
tmqHandleAllDelayedTask(tmq); tmqHandleAllDelayedTask(tmq);
...@@ -1851,7 +1866,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { ...@@ -1851,7 +1866,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
// in no topic status, delayed task also need to be processed // in no topic status, delayed task also need to be processed
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) { if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
tscDebug("consumer:%" PRId64 ", poll return since consumer status is init", tmq->consumerId); tscDebug("consumer:0x%" PRIx64 ", poll return since consumer status is init", tmq->consumerId);
return NULL; return NULL;
} }
...@@ -1868,28 +1883,30 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { ...@@ -1868,28 +1883,30 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
while (1) { while (1) {
tmqHandleAllDelayedTask(tmq); tmqHandleAllDelayedTask(tmq);
if (tmqPollImpl(tmq, timeout) < 0) { if (tmqPollImpl(tmq, timeout) < 0) {
tscDebug("consumer:%" PRId64 " return since poll err", tmq->consumerId); tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
/*return NULL;*/ /*return NULL;*/
} }
rspObj = tmqHandleAllRsp(tmq, timeout, false); rspObj = tmqHandleAllRsp(tmq, timeout, false);
if (rspObj) { if (rspObj) {
tscDebug("consumer:%" PRId64 ", return rsp %p", tmq->consumerId, rspObj); tscDebug("consumer:0x%" PRIx64 ", return rsp %p", tmq->consumerId, rspObj);
return (TAOS_RES*)rspObj; return (TAOS_RES*)rspObj;
} else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
tscDebug("consumer:%" PRId64 ", return null since no committed offset", tmq->consumerId); tscDebug("consumer:0x%" PRIx64 ", return null since no committed offset", tmq->consumerId);
return NULL; return NULL;
} }
if (timeout != -1) { if (timeout != -1) {
int64_t currentTime = taosGetTimestampMs(); int64_t currentTime = taosGetTimestampMs();
int64_t passedTime = currentTime - startTime; int64_t passedTime = currentTime - startTime;
if (passedTime > timeout) { if (passedTime > timeout) {
tscDebug("consumer:%" PRId64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, tscDebug("consumer:0x%" PRIx64 ", (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
tmq->consumerId, tmq->epoch, startTime, currentTime); tmq->consumerId, tmq->epoch, startTime, currentTime);
return NULL; return NULL;
} }
/*tscInfo("consumer:%" PRId64 ", (epoch %d) wait, start time %" PRId64 ", current time %" PRId64*/ /*tscInfo("consumer:0x%" PRIx64 ", (epoch %d) wait, start time %" PRId64 ", current time %" PRId64*/
/*", left time %" PRId64,*/ /*", left time %" PRId64,*/
/*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - passedTime));*/ /*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - passedTime));*/
tsem_timewait(&tmq->rspSem, (timeout - passedTime)); tsem_timewait(&tmq->rspSem, (timeout - passedTime));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册