提交 31a8af9e 编写于 作者: wmmhello's avatar wmmhello

fix:do not commit offset if seek offset

上级 8e011c46
...@@ -2799,35 +2799,35 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ ...@@ -2799,35 +2799,35 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
// update the offset, and then commit to vnode // update the offset, and then commit to vnode
pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0; pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0;
pOffsetInfo->committedOffset.version = INT64_MIN; // pOffsetInfo->committedOffset.version = INT64_MIN;
pVg->seekUpdated = true; pVg->seekUpdated = true;
SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId};
tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));
tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId); tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId);
taosWUnLockLatch(&tmq->lock); taosWUnLockLatch(&tmq->lock);
SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); // SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId};
if (pInfo == NULL) { // tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));
tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId); //
return TSDB_CODE_OUT_OF_MEMORY; // SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
} // if (pInfo == NULL) {
// tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId);
tsem_init(&pInfo->sem, 0, 0); // return TSDB_CODE_OUT_OF_MEMORY;
pInfo->code = 0; // }
//
asyncCommitOffset(tmq, &rspObj, TDMT_VND_TMQ_SEEK_TO_OFFSET, commitCallBackFn, pInfo); // tsem_init(&pInfo->sem, 0, 0);
// pInfo->code = 0;
tsem_wait(&pInfo->sem); //
int32_t code = pInfo->code; // asyncCommitOffset(tmq, &rspObj, TDMT_VND_TMQ_SEEK_TO_OFFSET, commitCallBackFn, pInfo);
//
tsem_destroy(&pInfo->sem); // tsem_wait(&pInfo->sem);
taosMemoryFree(pInfo); // int32_t code = pInfo->code;
//
if (code != TSDB_CODE_SUCCESS) { // tsem_destroy(&pInfo->sem);
tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId, tstrerror(code)); // taosMemoryFree(pInfo);
} //
// if (code != TSDB_CODE_SUCCESS) {
// tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId, tstrerror(code));
// }
return code; return 0;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册