提交 4d6b9d4a 编写于 作者: H Haojun Liao

fix(stream): split delete msg for real-time scan from wal.

上级 06367372
......@@ -1774,6 +1774,67 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
}
}
static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) {
if (pWindow->skey != INT64_MIN) {
qDebug("%s filter for additional history window, skey:%"PRId64, id, pWindow->skey);
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
bool hasUnqualified = false;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t* ts = (int64_t*) colDataGetData(pCol, i);
p[i] = (*ts >= pWindow->skey);
if (!p[i]) {
hasUnqualified = true;
}
}
if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
}
taosMemoryFree(p);
}
}
// re-build the delete block, ONLY according to the split timestamp
static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) {
if (skey == INT64_MIN) {
return;
}
int32_t numOfRows = pBlock->info.rows;
bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
bool hasUnqualified = false;
SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData;
SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData;
for (int32_t i = 0; i < numOfRows; i++) {
if (tsStartCol[i] < skey) {
tsStartCol[i] = skey;
}
if (tsEndCol[i] >= skey) {
p[i] = true;
} else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX]
hasUnqualified = true;
}
}
if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
}
qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows);
taosMemoryFree(p);
}
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -1922,6 +1983,7 @@ FETCH_NEXT_BLOCK:
if (pInfo->pUpdateInfo) {
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
}
blockDataUpdateTsWindow(pBlock, 0);
switch (pBlock->info.type) {
case STREAM_NORMAL:
......@@ -1944,7 +2006,9 @@ FETCH_NEXT_BLOCK:
} else {
pDelBlock = pBlock;
}
setBlockGroupIdByUid(pInfo, pDelBlock);
rebuildDeleteBlockData(pDelBlock, pStreamInfo->fillHistoryWindow.skey, id);
printDataBlock(pDelBlock, "stream scan delete recv filtered");
if (pDelBlock->info.rows == 0) {
if (pInfo->tqReader) {
......@@ -1952,6 +2016,7 @@ FETCH_NEXT_BLOCK:
}
goto FETCH_NEXT_BLOCK;
}
if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
......@@ -2093,39 +2158,15 @@ FETCH_NEXT_BLOCK:
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock);
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
{ // do additional time window filter
STimeWindow* pWindow = &pStreamInfo->fillHistoryWindow;
if (pWindow->skey != INT64_MIN) {
qDebug("%s filter for additional history window, skey:%"PRId64, id, pWindow->skey);
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
bool hasUnqualified = false;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
for(int32_t i = 0; i < pBlock->info.rows; ++i) {
int64_t* ts = (int64_t*) colDataGetData(pCol, i);
p[i] = (*ts >= pWindow->skey);
if (!p[i]) {
hasUnqualified = true;
}
}
if (hasUnqualified) {
trimDataBlock(pBlock, pBlock->info.rows, p);
}
taosMemoryFree(p);
}
}
// apply additional time window filter
doBlockDataWindowFilter(pBlock, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id);
pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows,
pInfo->pUpdateDataRes->info.rows);
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows;
qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes);
if (pBlockInfo->rows > 0 || numOfUpdateRes > 0) {
break;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册