From 095f6aa4e08667f799fca651b3f11872984928b7 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Sat, 23 Jul 2022 20:07:16 +0800 Subject: [PATCH] fix(tmq): correctly set reader status --- include/libs/wal/wal.h | 1 + source/dnode/vnode/src/tq/tq.c | 8 ++++++++ source/dnode/vnode/src/tq/tqExec.c | 11 ++++++++++- source/dnode/vnode/src/tq/tqRead.c | 5 ++++- source/libs/executor/src/scanoperator.c | 19 +++++++++++-------- source/libs/wal/src/walRead.c | 10 ++++++++-- 6 files changed, 42 insertions(+), 12 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index ad89e51a24..907b3be560 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -135,6 +135,7 @@ typedef struct { int64_t curVersion; int64_t capacity; int8_t curInvalid; + int8_t curStopped; TdThreadMutex mutex; SWalFilterCond cond; SWalCkHead *pHead; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9f80bc50a4..daad7c8bae 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -138,6 +138,14 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0); } + if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { + if (pRsp->blockNum > 0) { + ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version); + } else { + ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version); + } + } + int32_t len; int32_t code; tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index d8851c3775..40dbbda603 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -65,12 +65,14 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa qTaskInfo_t task = pExec->execCol.task; if (qStreamPrepareScan(task, pOffset) < 0) { + tqDebug("prepare scan failed, return"); if (pOffset->type == TMQ_OFFSET__LOG) { pRsp->rspOffset = *pOffset; return 0; } else { tqOffsetResetToLog(pOffset, pHandle->snapshotVer); if (qStreamPrepareScan(task, pOffset) < 0) { + tqDebug("prepare scan failed, return"); pRsp->rspOffset = *pOffset; return 0; } @@ -126,9 +128,16 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa ASSERT(pRsp->rspOffset.type != 0); +#if 0 if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { - ASSERT(pRsp->rspOffset.version + 1 >= pRsp->reqOffset.version); + if (pRsp->blockNum > 0) { + ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version); + } else { + ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version); + } } +#endif + tqDebug("task exec exited"); break; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 821b48275a..6f0b5af4f6 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -132,10 +132,12 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { while (1) { if (!fromProcessedMsg) { if (walNextValidMsg(pReader->pWalReader) < 0) { - pReader->ver = pReader->pWalReader->curVersion - pReader->pWalReader->curInvalid; + pReader->ver = + pReader->pWalReader->curVersion - (pReader->pWalReader->curInvalid | pReader->pWalReader->curStopped); ret->offset.type = TMQ_OFFSET__LOG; ret->offset.version = pReader->ver; ret->fetchType = FETCH_TYPE__NONE; + tqDebug("return offset %ld, no more valid", ret->offset.version); ASSERT(ret->offset.version >= 0); return -1; } @@ -167,6 +169,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ret->offset.version = pReader->ver; ASSERT(pReader->ver >= 0); ret->fetchType = FETCH_TYPE__NONE; + tqDebug("return offset %ld, processed finish", ret->offset.version); return 0; } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d493adbea3..8985346c6e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -946,7 +946,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr } blockDataCleanup(pDestBlock); int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS) { return code; } ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3); @@ -994,16 +994,16 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); - uint64_t* uidCol = (uint64_t*)pUidCol->pData; + uint64_t* uidCol = (uint64_t*)pUidCol->pData; ASSERT(pTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); - TSKEY* tsCol = (TSKEY*)pTsCol->pData; + TSKEY* tsCol = (TSKEY*)pTsCol->pData; SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); - uint64_t groupId = getGroupId(pInfo->pTableScanOp, uidCol[0]); - for (int32_t i = 0; i < rows; ) { + uint64_t groupId = getGroupId(pInfo->pTableScanOp, uidCol[0]); + for (int32_t i = 0; i < rows;) { colDataAppend(pCalStartTsCol, pDestBlock->info.rows, (const char*)(tsCol + i), false); STimeWindow win = getSlidingWindow(tsCol, &pInfo->interval, &pSrcBlock->info, &i); colDataAppend(pCalEndTsCol, pDestBlock->info.rows, (const char*)(tsCol + i - 1), false); @@ -1167,8 +1167,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { return NULL; } else if (ret.fetchType == FETCH_TYPE__NONE) { pTaskInfo->streamInfo.lastStatus = ret.offset; - ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 >= pTaskInfo->streamInfo.prepareStatus.version); - qDebug("stream scan log return null"); + ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version); + ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion); + char formatBuf[80]; + tFormatOffset(formatBuf, 80, &ret.offset); + qDebug("stream scan log return null, offset %s", formatBuf); return NULL; } else { ASSERT(0); @@ -1272,7 +1275,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { default: break; } - + SStreamAggSupporter* pSup = pInfo->sessionSup.pStreamAggSup; if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 5bc9cdafa2..6d0e844e8e 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -21,7 +21,7 @@ static int32_t walFetchBodyNew(SWalReader *pRead); static int32_t walSkipFetchBodyNew(SWalReader *pRead); SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { - SWalReader *pRead = taosMemoryMalloc(sizeof(SWalReader)); + SWalReader *pRead = taosMemoryCalloc(1, sizeof(SWalReader)); if (pRead == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -75,6 +75,7 @@ int32_t walNextValidMsg(SWalReader *pRead) { wDebug("vgId:%d wal start to fetch, ver %ld, last ver %ld commit ver %ld, applied ver %ld, end ver %ld", pRead->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); + pRead->curStopped = 0; while (fetchVer <= endVer) { if (walFetchHeadNew(pRead, fetchVer) < 0) { return -1; @@ -93,6 +94,7 @@ int32_t walNextValidMsg(SWalReader *pRead) { ASSERT(fetchVer == pRead->curVersion); } } + pRead->curStopped = 1; return -1; } @@ -221,6 +223,8 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { int64_t contLen; bool seeked = false; + wDebug("vgId:%d, wal starts to fetch head %d", pRead->pWal->cfg.vgId, fetchVer); + if (pRead->curInvalid || pRead->curVersion != fetchVer) { if (walReadSeekVer(pRead, fetchVer) < 0) { ASSERT(0); @@ -257,6 +261,8 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { SWalCont *pReadHead = &pRead->pHead->head; int64_t ver = pReadHead->version; + wDebug("vgId:%d, wal starts to fetch body %ld", pRead->pWal->cfg.vgId, ver); + if (pRead->capacity < pReadHead->bodyLen) { void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen); if (ptr == NULL) { @@ -300,8 +306,8 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { return -1; } + wDebug("version %ld is fetched, cursor advance", ver); pRead->curVersion = ver + 1; - wDebug("version advance to %ld, fetch body", pRead->curVersion); return 0; } -- GitLab