提交 143f39b6 编写于 作者: H Haojun Liao

fix(stream): set the correct end key of delete block.

上级 82ab8181
......@@ -1550,7 +1550,95 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
}
}
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) {
if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) {
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
bool hasUnqualified = false;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
if (pWindow->skey != INT64_MIN) {
qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey);
ASSERT(pCol->pData != NULL);
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t* ts = (int64_t*)colDataGetData(pCol, i);
p[i] = (*ts >= pWindow->skey);
if (!p[i]) {
hasUnqualified = true;
}
}
} else if (pWindow->ekey != INT64_MAX) {
qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey);
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t* ts = (int64_t*)colDataGetData(pCol, i);
p[i] = (*ts <= pWindow->ekey);
if (!p[i]) {
hasUnqualified = true;
}
}
}
if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
}
taosMemoryFree(p);
}
}
// re-build the delete block, ONLY according to the split timestamp
static void rebuildDeleteBlockData(SSDataBlock* pBlock, STimeWindow* pWindow, const char* id) {
int32_t numOfRows = pBlock->info.rows;
bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
bool hasUnqualified = false;
int64_t skey = pWindow->skey;
int64_t ekey = pWindow->ekey;
SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData;
SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData;
if (pWindow->skey != INT64_MIN) {
for (int32_t i = 0; i < numOfRows; i++) {
if (tsStartCol[i] < skey) {
tsStartCol[i] = skey;
}
if (tsEndCol[i] >= skey) {
p[i] = true;
} else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX]
hasUnqualified = true;
}
}
} else if (pWindow->ekey != INT64_MAX) {
for(int32_t i = 0; i < numOfRows; ++i) {
if (tsEndCol[i] > ekey) {
tsEndCol[i] = ekey;
}
if (tsStartCol[i] <= ekey) {
p[i] = true;
} else {
hasUnqualified = true;
}
}
}
if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows);
} else {
qDebug("%s not update the delete block", id);
}
taosMemoryFree(p);
}
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STimeWindow* pTimeWindow, bool filter) {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
SOperatorInfo* pOperator = pInfo->pStreamScanOp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册