From 143f39b6f79b6c4105f7b2eaf44dc5fb5db7190a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 31 Jul 2023 14:22:46 +0800 Subject: [PATCH] fix(stream): set the correct end key of delete block. --- source/libs/executor/src/scanoperator.c | 90 ++++++++++++++++++++++++- 1 file changed, 89 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3d5e4a7d5f..6e6231a7e7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1550,7 +1550,95 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock } } -static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) { +static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) { + if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) { + bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); + bool hasUnqualified = false; + + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex); + + if (pWindow->skey != INT64_MIN) { + qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey); + + ASSERT(pCol->pData != NULL); + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* ts = (int64_t*)colDataGetData(pCol, i); + p[i] = (*ts >= pWindow->skey); + + if (!p[i]) { + hasUnqualified = true; + } + } + } else if (pWindow->ekey != INT64_MAX) { + qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey); + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* ts = (int64_t*)colDataGetData(pCol, i); + p[i] = (*ts <= pWindow->ekey); + + if (!p[i]) { + hasUnqualified = true; + } + } + } + + if (hasUnqualified) { + trimDataBlock(pBlock, pBlock->info.rows, p); + } + + taosMemoryFree(p); + } +} + +// re-build the delete block, ONLY according to the split timestamp +static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, const char* id) { + int32_t numOfRows = pBlock->info.rows; + bool* p = taosMemoryCalloc(numOfRows, sizeof(bool)); + bool hasUnqualified = false; + int64_t skey = pWindow->skey; + int64_t ekey = pWindow->ekey; + + SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData; + SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData; + + if (pWindow->skey != INT64_MIN) { + for (int32_t i = 0; i < numOfRows; i++) { + if (tsStartCol[i] < skey) { + tsStartCol[i] = skey; + } + + if (tsEndCol[i] >= skey) { + p[i] = true; + } else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX] + hasUnqualified = true; + } + } + } else if (pWindow->ekey != INT64_MAX) { + for(int32_t i = 0; i < numOfRows; ++i) { + if (tsEndCol[i] > ekey) { + tsEndCol[i] = ekey; + } + + if (tsStartCol[i] <= ekey) { + p[i] = true; + } else { + hasUnqualified = true; + } + } + } + + if (hasUnqualified) { + trimDataBlock(pBlock, pBlock->info.rows, p); + qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows); + } else { + qDebug("%s not update the delete block", id); + } + + taosMemoryFree(p); +} + +static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STimeWindow* pTimeWindow, bool filter) { SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SOperatorInfo* pOperator = pInfo->pStreamScanOp; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; -- GitLab