From a3e214b9e8eb24b842ad22b96f09e83bcf9e2632 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 26 May 2023 19:17:04 +0800 Subject: [PATCH] fix:put poll to push manager if wal not exist when offset is latest --- include/libs/wal/wal.h | 3 ++- source/dnode/vnode/src/tq/tqUtil.c | 12 +++--------- source/libs/executor/src/executor.c | 3 --- source/libs/wal/src/walRef.c | 17 +++++++++-------- 4 files changed, 14 insertions(+), 21 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 7e106eefde..7af218b78e 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -205,7 +205,8 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead); int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead); int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead); -SWalRef *walRefFirstVer(SWal *, SWalRef *); +void walRefFirstVer(SWal *, SWalRef *); +void walRefLastVer(SWal *, SWalRef *); SWalRef *walRefCommittedVer(SWal *); SWalRef *walOpenRef(SWal *); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 33d2a238c5..5eaf7b240b 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -115,18 +115,12 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand tqOffsetResetToData(pOffsetVal, 0, 0); } } else { - pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef); - if (pHandle->pRef == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - // offset set to previous version when init + walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef); tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer - 1); } } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { - // offset set to previous version when init - tqOffsetResetToLog(pOffsetVal, walGetLastVer(pTq->pVnode->pWal)); + walRefLastVer(pTq->pVnode->pWal, pHandle->pRef); + tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer); } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 " in vg %d, subkey %s, reset none failed", pHandle->subKey, consumerId, vgId, pRequest->subKey); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fb0b9957f8..a73deffa52 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1080,9 +1080,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT const char* id = GET_TASKID(pTaskInfo); // if pOffset equal to current offset, means continue consume - char buf[80] = {0}; - tFormatOffset(buf, 80, &pTaskInfo->streamInfo.currentOffset); - qDebug("currentOffset:%s", buf); if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) { return 0; } diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index 6aba661926..eb36389f1d 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -63,21 +63,22 @@ int32_t walSetRefVer(SWalRef *pRef, int64_t ver) { return 0; } -SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) { - if (pRef == NULL) { - pRef = walOpenRef(pWal); - if (pRef == NULL) { - return NULL; - } - } +void walRefFirstVer(SWal *pWal, SWalRef *pRef) { taosThreadMutexLock(&pWal->mutex); int64_t ver = walGetFirstVer(pWal); pRef->refVer = ver; taosThreadMutexUnlock(&pWal->mutex); wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver); +} - return pRef; +void walRefLastVer(SWal *pWal, SWalRef *pRef) { + taosThreadMutexLock(&pWal->mutex); + int64_t ver = walGetLastVer(pWal); + pRef->refVer = ver; + + taosThreadMutexUnlock(&pWal->mutex); + wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, ver); } SWalRef *walRefCommittedVer(SWal *pWal) { -- GitLab