未验证 提交 087d54a8 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #22198 from taosdata/fix/3_liaohj

fix(stream): do filter before the update check.
...@@ -1322,13 +1322,14 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1322,13 +1322,14 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
return -1; return -1;
} }
tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x", pTask->id.idStr, req.downstreamId);
int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1); int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
if (remain > 0) { if (remain > 0) {
tqDebug("s-task:%s remain:%d not send finish rsp", pTask->id.idStr, remain); tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, remain:%d not send finish rsp",
pTask->id.idStr, req.downstreamId, remain);
} else { } else {
tqDebug("s-task:%s all downstream tasks rsp scan-history completed msg", pTask->id.idStr); tqDebug(
"s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history "
"completed msg", pTask->id.idStr, req.downstreamId);
streamProcessScanHistoryFinishRsp(pTask); streamProcessScanHistoryFinishRsp(pTask);
} }
......
...@@ -2569,9 +2569,18 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2569,9 +2569,18 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
// load the last data block of current table // load the last data block of current table
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
if (pScanInfo == NULL) {
tsdbError("table Iter is null, invalid pScanInfo, try next table %s", pReader->idStr);
bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
}
continue;
}
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) { if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) {
// reset the index in last block when handing a new file // reset the index in last block when handing a new file
// doCleanupTableScanInfo(pScanInfo);
bool hasNexTable = moveToNextTable(pUidList, pStatus); bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) { if (!hasNexTable) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2580,9 +2589,6 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2580,9 +2589,6 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
continue; continue;
} }
// reset the index in last block when handing a new file
// doCleanupTableScanInfo(pScanInfo);
bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
if (!hasDataInLastFile) { if (!hasDataInLastFile) {
bool hasNexTable = moveToNextTable(pUidList, pStatus); bool hasNexTable = moveToNextTable(pUidList, pStatus);
......
...@@ -2155,15 +2155,14 @@ FETCH_NEXT_BLOCK: ...@@ -2155,15 +2155,14 @@ FETCH_NEXT_BLOCK:
return pInfo->pCreateTbRes; return pInfo->pCreateTbRes;
} }
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock);
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
// apply additional time window filter // apply additional time window filter
doBlockDataWindowFilter(pBlock, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id); doBlockDataWindowFilter(pBlock, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id);
pBlock->info.dataLoad = 1; pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock);
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows; int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows;
qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes); qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes);
if (pBlockInfo->rows > 0 || numOfUpdateRes > 0) { if (pBlockInfo->rows > 0 || numOfUpdateRes > 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册