提交 e3e42cdf 编写于 作者: H Haojun Liao

fix(query): set update ts flag for stream.

上级 d7edcfd2
......@@ -358,6 +358,10 @@ size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { return taosArrayGetSiz
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; }
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) {
if (pDataBlock->info.rows > 0) {
ASSERT(pDataBlock->info.dataLoad == 1);
}
if (pDataBlock == NULL || pDataBlock->info.rows <= 0 || pDataBlock->info.dataLoad == 0) {
return 0;
}
......
......@@ -533,6 +533,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
pBlock->info.id.uid = pReader->msgIter.uid;
pBlock->info.rows = pReader->msgIter.numOfRows;
pBlock->info.version = pReader->pMsg->version;
pBlock->info.dataLoad = 1;
while ((row = tGetSubmitBlkNext(&pReader->blkIter)) != NULL) {
tdSTSRowIterReset(&iter, row);
......
......@@ -510,6 +510,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo
blockDataEnsureCapacity(pRes, pBlock->info.rows);
// data from mnode
pRes->info.dataLoad = 1;
pRes->info.rows = pBlock->info.rows;
relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
blockDataDestroy(pBlock);
......
......@@ -2546,6 +2546,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
pBlock->info.rows += pRow->numOfRows;
releaseOutputBuf(pState, &key, pRow);
}
pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, 0);
return TSDB_CODE_SUCCESS;
}
......@@ -2635,6 +2636,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta
}
}
pBlock->info.dataLoad = 1;
pBlock->info.rows += pRow->numOfRows;
// saveSessionDiscBuf(pState, pKey, pVal, size);
releaseOutputBuf(pState, NULL, pRow);
......
......@@ -1313,6 +1313,7 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
}
pDestBlock->info.type = STREAM_CLEAR;
pDestBlock->info.version = pSrcBlock->info.version;
pDestBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pDestBlock, 0);
return code;
}
......@@ -1421,6 +1422,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
}
if (out && pInfo->pUpdateDataRes->info.rows > 0) {
pInfo->pUpdateDataRes->info.version = pBlock->info.version;
pInfo->pUpdateDataRes->info.dataLoad = 1;
blockDataUpdateTsWindow(pInfo->pUpdateDataRes, 0);
pInfo->pUpdateDataRes->info.type = pInfo->partitionSup.needCalc ? STREAM_DELETE_DATA : STREAM_CLEAR;
}
......@@ -1483,6 +1485,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
}
pInfo->pRes->info.dataLoad = 1;
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
blockDataFreeRes((SSDataBlock*)pBlock);
......@@ -1771,6 +1774,7 @@ FETCH_NEXT_BLOCK:
// TODO move into scan
pBlock->info.calWin.skey = INT64_MIN;
pBlock->info.calWin.ekey = INT64_MAX;
pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, 0);
switch (pBlock->info.type) {
case STREAM_NORMAL:
......@@ -1948,6 +1952,7 @@ FETCH_NEXT_BLOCK:
}
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
pInfo->pRes->info.dataLoad = 1;
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册