diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index ade16b7f1a15cc67b6b60b2ae8966a0bc931b007..59a407656d47d2016fe00afb350851b89ae4e3fb 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -198,7 +198,7 @@ typedef struct { int32_t waitingRspNum; int32_t totalRspNum; int32_t code; - tmq_commit_cb* userCb; + tmq_commit_cb* callbackFn; /*SArray* successfulOffsets;*/ /*SArray* failedOffsets;*/ void* userParam; @@ -590,7 +590,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* p pParamSet->refId = tmq->refId; pParamSet->epoch = tmq->epoch; - pParamSet->userCb = pCommitFp; + pParamSet->callbackFn = pCommitFp; pParamSet->userParam = userParam; int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); @@ -656,7 +656,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us pParamSet->refId = tmq->refId; pParamSet->epoch = tmq->epoch; - pParamSet->userCb = pCommitFp; + pParamSet->callbackFn = pCommitFp; pParamSet->userParam = userParam; // init as 1 to prevent concurrency issue @@ -810,6 +810,12 @@ OVER: taosReleaseRef(tmqMgmt.rsetId, refId); } +static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) { + if (code != 0) { + tscDebug("consumer:0x%"PRIx64", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code)); + } +} + int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { STaosQall* qall = taosAllocateQall(); taosReadAllQitems(pTmq->delayedTask, qall); @@ -833,7 +839,9 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId); taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { - asyncCommitAllOffsets(pTmq, pTmq->commitCb, pTmq->commitCbUserParam); + tmq_commit_cb* pCallbackFn = pTmq->commitCb? pTmq->commitCb:defaultCommitCbFn; + + asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam); int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); *pRefId = pTmq->refId; @@ -1029,8 +1037,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->status = TMQ_CONSUMER_STATUS__INIT; pTmq->pollCnt = 0; pTmq->epoch = 0; - /*pTmq->epStatus = 0;*/ - /*pTmq->epSkipCnt = 0;*/ // set conf strcpy(pTmq->clientId, conf->clientId); @@ -2279,7 +2285,7 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { } // if no more waiting rsp - pParamSet->userCb(tmq, pParamSet->code, pParamSet->userParam); + pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam); taosMemoryFree(pParamSet); taosReleaseRef(tmqMgmt.rsetId, refId);