提交 5161eca7 编写于 作者: L Liu Jicong

fix(stream): stop scan when no output

上级 e2005cfa
......@@ -47,7 +47,7 @@ typedef struct SUpdateInfo {
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark);
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol);
TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol);
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid);
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version);
......
......@@ -1981,9 +1981,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pBlock != NULL) {
calBlockTbName(pInfo, pBlock);
if (pInfo->pUpdateInfo) {
updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
}
qDebug("stream recover scan get block, rows %d", pBlock->info.rows);
printDataBlock(pBlock, "scan recover");
return pBlock;
}
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
......
......@@ -91,6 +91,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
void* exec = pTask->exec.executor;
qSetStreamOpOpen(exec);
bool finished = false;
while (1) {
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
......@@ -106,7 +107,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(0);
}
if (output == NULL) break;
if (output == NULL) {
finished = true;
break;
}
SSDataBlock block = {0};
assignOneDataBlock(&block, output);
......@@ -133,6 +137,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamDispatch(pTask);
}
if (finished) break;
}
return 0;
}
......
......@@ -163,9 +163,9 @@ bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) {
return false;
}
void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol) {
if (pBlock == NULL || pBlock->info.rows == 0) return;
TSKEY maxTs = -1;
TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol) {
if (pBlock == NULL || pBlock->info.rows == 0) return INT64_MIN;
TSKEY maxTs = INT64_MIN;
int64_t tbUid = pBlock->info.uid;
SColumnInfoData *pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsCol);
......@@ -186,6 +186,7 @@ void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t pr
if (pMaxTs == NULL || *pMaxTs > maxTs) {
taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), &maxTs, sizeof(TSKEY));
}
return maxTs;
}
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册