提交 ed21ef04 编写于 作者: H Haojun Liao

fix(tmq): fix sync commit error.

上级 32109e73
...@@ -1315,7 +1315,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1315,7 +1315,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
char buf[80]; char buf[80];
tFormatOffset(buf, 80, &pRspWrapper->dataRsp.rspOffset); tFormatOffset(buf, 80, &pRspWrapper->dataRsp.rspOffset);
tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64, tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId); tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId);
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) { } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SDecoder decoder; SDecoder decoder;
...@@ -2132,9 +2132,9 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { ...@@ -2132,9 +2132,9 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
pInfo->code = 0; pInfo->code = 0;
if (pRes == NULL) { if (pRes == NULL) {
asyncCommitOffset(tmq, pRes, commitCallBackFn, pInfo);
} else {
asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo); asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo);
} else {
asyncCommitOffset(tmq, pRes, commitCallBackFn, pInfo);
} }
tsem_wait(&pInfo->sem); tsem_wait(&pInfo->sem);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册