未验证 提交 9b29fa0c 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18223 from taosdata/feature/stream

fix(stream): stop scan when no output
...@@ -47,7 +47,7 @@ typedef struct SUpdateInfo { ...@@ -47,7 +47,7 @@ typedef struct SUpdateInfo {
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark); SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark);
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark); SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol); TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol);
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid); bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid);
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version); void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version);
......
...@@ -1981,9 +1981,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1981,9 +1981,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pBlock != NULL) { if (pBlock != NULL) {
calBlockTbName(pInfo, pBlock); calBlockTbName(pInfo, pBlock);
if (pInfo->pUpdateInfo) { if (pInfo->pUpdateInfo) {
updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex); TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
} }
qDebug("stream recover scan get block, rows %d", pBlock->info.rows); qDebug("stream recover scan get block, rows %d", pBlock->info.rows);
printDataBlock(pBlock, "scan recover");
return pBlock; return pBlock;
} }
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE; pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
......
...@@ -91,6 +91,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -91,6 +91,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
void* exec = pTask->exec.executor; void* exec = pTask->exec.executor;
qSetStreamOpOpen(exec); qSetStreamOpOpen(exec);
bool finished = false;
while (1) { while (1) {
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
...@@ -106,7 +107,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -106,7 +107,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
if (qExecTask(exec, &output, &ts) < 0) { if (qExecTask(exec, &output, &ts) < 0) {
ASSERT(0); ASSERT(0);
} }
if (output == NULL) break; if (output == NULL) {
finished = true;
break;
}
SSDataBlock block = {0}; SSDataBlock block = {0};
assignOneDataBlock(&block, output); assignOneDataBlock(&block, output);
...@@ -133,6 +137,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -133,6 +137,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamDispatch(pTask); streamDispatch(pTask);
} }
if (finished) break;
} }
return 0; return 0;
} }
......
...@@ -163,9 +163,9 @@ bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) { ...@@ -163,9 +163,9 @@ bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) {
return false; return false;
} }
void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol) { TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol) {
if (pBlock == NULL || pBlock->info.rows == 0) return; if (pBlock == NULL || pBlock->info.rows == 0) return INT64_MIN;
TSKEY maxTs = -1; TSKEY maxTs = INT64_MIN;
int64_t tbUid = pBlock->info.uid; int64_t tbUid = pBlock->info.uid;
SColumnInfoData *pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsCol); SColumnInfoData *pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsCol);
...@@ -186,6 +186,7 @@ void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t pr ...@@ -186,6 +186,7 @@ void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t pr
if (pMaxTs == NULL || *pMaxTs > maxTs) { if (pMaxTs == NULL || *pMaxTs > maxTs) {
taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), &maxTs, sizeof(TSKEY)); taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), &maxTs, sizeof(TSKEY));
} }
return maxTs;
} }
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册