提交 23e8edd1 编写于 作者: H Haojun Liao

fix(tmq): set the default commit callback fn.

上级 f1384206
...@@ -198,7 +198,7 @@ typedef struct { ...@@ -198,7 +198,7 @@ typedef struct {
int32_t waitingRspNum; int32_t waitingRspNum;
int32_t totalRspNum; int32_t totalRspNum;
int32_t code; int32_t code;
tmq_commit_cb* userCb; tmq_commit_cb* callbackFn;
/*SArray* successfulOffsets;*/ /*SArray* successfulOffsets;*/
/*SArray* failedOffsets;*/ /*SArray* failedOffsets;*/
void* userParam; void* userParam;
...@@ -590,7 +590,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* p ...@@ -590,7 +590,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* p
pParamSet->refId = tmq->refId; pParamSet->refId = tmq->refId;
pParamSet->epoch = tmq->epoch; pParamSet->epoch = tmq->epoch;
pParamSet->userCb = pCommitFp; pParamSet->callbackFn = pCommitFp;
pParamSet->userParam = userParam; pParamSet->userParam = userParam;
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
...@@ -656,7 +656,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us ...@@ -656,7 +656,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
pParamSet->refId = tmq->refId; pParamSet->refId = tmq->refId;
pParamSet->epoch = tmq->epoch; pParamSet->epoch = tmq->epoch;
pParamSet->userCb = pCommitFp; pParamSet->callbackFn = pCommitFp;
pParamSet->userParam = userParam; pParamSet->userParam = userParam;
// init as 1 to prevent concurrency issue // init as 1 to prevent concurrency issue
...@@ -810,6 +810,12 @@ OVER: ...@@ -810,6 +810,12 @@ OVER:
taosReleaseRef(tmqMgmt.rsetId, refId); taosReleaseRef(tmqMgmt.rsetId, refId);
} }
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
if (code != 0) {
tscDebug("consumer:0x%"PRIx64", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
}
}
int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
STaosQall* qall = taosAllocateQall(); STaosQall* qall = taosAllocateQall();
taosReadAllQitems(pTmq->delayedTask, qall); taosReadAllQitems(pTmq->delayedTask, qall);
...@@ -833,7 +839,9 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { ...@@ -833,7 +839,9 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId); tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer); taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
asyncCommitAllOffsets(pTmq, pTmq->commitCb, pTmq->commitCbUserParam); tmq_commit_cb* pCallbackFn = pTmq->commitCb? pTmq->commitCb:defaultCommitCbFn;
asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId; *pRefId = pTmq->refId;
...@@ -1029,8 +1037,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { ...@@ -1029,8 +1037,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->status = TMQ_CONSUMER_STATUS__INIT; pTmq->status = TMQ_CONSUMER_STATUS__INIT;
pTmq->pollCnt = 0; pTmq->pollCnt = 0;
pTmq->epoch = 0; pTmq->epoch = 0;
/*pTmq->epStatus = 0;*/
/*pTmq->epSkipCnt = 0;*/
// set conf // set conf
strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->clientId, conf->clientId);
...@@ -2279,7 +2285,7 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { ...@@ -2279,7 +2285,7 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
} }
// if no more waiting rsp // if no more waiting rsp
pParamSet->userCb(tmq, pParamSet->code, pParamSet->userParam); pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam);
taosMemoryFree(pParamSet); taosMemoryFree(pParamSet);
taosReleaseRef(tmqMgmt.rsetId, refId); taosReleaseRef(tmqMgmt.rsetId, refId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册