From 222e925644eeb77a977301e7b342f4bbdf6d17a4 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 25 Jul 2022 15:31:36 +0800 Subject: [PATCH] feat(wal): ref --- source/dnode/vnode/src/tq/tq.c | 4 ++-- source/dnode/vnode/src/tq/tqMeta.c | 11 +++++++++-- source/libs/wal/src/walRead.c | 2 +- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 740cfba9e3..118e3a5d43 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -385,8 +385,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) { - int64_t fetchVer = fetchOffsetNew.version + 1; - SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); + int64_t fetchVer = fetchOffsetNew.version + 1; + pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { code = -1; goto OVER; diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 620417016f..290ffe5c8d 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -52,7 +52,7 @@ int32_t tqMetaOpen(STQ* pTq) { ASSERT(0); } - TXN txn; + TXN txn = {0}; if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { ASSERT(0); @@ -75,7 +75,13 @@ int32_t tqMetaOpen(STQ* pTq) { STqHandle handle; tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecodeSTqHandle(&decoder, &handle); - handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); + + handle.pRef = walOpenRef(pTq->pVnode->pWal); + if (handle.pRef == NULL) { + ASSERT(0); + } + walRefVer(handle.pRef, handle.snapshotVer); + if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { SReadHandle reader = { .meta = pTq->pVnode->pMeta, @@ -94,6 +100,7 @@ int32_t tqMetaOpen(STQ* pTq) { handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); ASSERT(handle.execHandle.pExecReader); } else { + handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); handle.execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 37d7a8c3a9..ac62b7d98d 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -54,7 +54,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { } /*if (pReader->cond.enableRef) {*/ - /*taosHashPut(pWal->pRefHash, &pReader->readerId, sizeof(int64_t), &pReader, sizeof(void *));*/ + /* taosHashPut(pWal->pRefHash, &pReader->readerId, sizeof(int64_t), &pReader, sizeof(void *));*/ /*}*/ return pReader; -- GitLab