From 5200a4810704cd47b741c335fbc3b4343ea54986 Mon Sep 17 00:00:00 2001 From: jiacy-jcy <714897623@qq.com> Date: Tue, 19 Jul 2022 16:54:19 +0800 Subject: [PATCH] fix: update --- source/libs/executor/src/scanoperator.c | 4 +++- source/libs/stream/inc/streamInc.h | 2 +- source/libs/stream/src/streamData.c | 11 ++++++----- source/libs/stream/src/streamExec.c | 4 +++- 4 files changed, 13 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7ecd9645e1..698bdab71a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1431,9 +1431,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } if (pBlockInfo->rows > 0) { break; + } else { + pInfo->tqReader->pMsg = NULL; + return NULL; } /*blockDataCleanup(pInfo->pRes);*/ - pInfo->tqReader->pMsg = NULL; } // record the scan action. diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index d10ea76c83..093242c610 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -44,7 +44,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); -int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem); +SStreamQueueItem* streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem); void streamFreeQitem(SStreamQueueItem* data); #ifdef __cplusplus diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 6989c36332..b28dba3472 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -83,6 +83,7 @@ SStreamMergedSubmit* streamMergedSubmitNew() { pMerged->reqs = taosArrayInit(0, sizeof(void*)); pMerged->dataRefs = taosArrayInit(0, sizeof(int32_t*)); if (pMerged->dataRefs == NULL || pMerged->reqs == NULL) goto FAIL; + pMerged->type = STREAM_INPUT__MERGED_SUBMIT; return pMerged; FAIL: if (pMerged->reqs) taosArrayDestroy(pMerged->reqs); @@ -121,7 +122,7 @@ void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) { } } -int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) { +SStreamQueueItem* streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) { ASSERT(elem); if (dst->type == STREAM_INPUT__DATA_BLOCK && elem->type == STREAM_INPUT__DATA_BLOCK) { SStreamDataBlock* pBlock = (SStreamDataBlock*)dst; @@ -129,13 +130,13 @@ int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) { taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks); taosArrayDestroy(pBlockSrc->blocks); taosFreeQitem(elem); - return 0; + return dst; } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst; SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)elem; streamMergeSubmit(pMerged, pBlockSrc); taosFreeQitem(elem); - return 0; + return dst; } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamMergedSubmit* pMerged = streamMergedSubmitNew(); ASSERT(pMerged); @@ -143,9 +144,9 @@ int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) { streamMergeSubmit(pMerged, (SStreamDataSubmit*)elem); taosFreeQitem(dst); taosFreeQitem(elem); - return 0; + return (SStreamQueueItem*)pMerged; } else { - return -1; + return NULL; } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c49e6c9b6c..33d6762646 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -162,11 +162,13 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ /*}*/ } else { - if (streamAppendQueueItem(data, qItem) < 0) { + void* newRet; + if ((newRet = streamAppendQueueItem(data, qItem)) == NULL) { streamQueueProcessFail(pTask->inputQueue); break; } else { cnt++; + data = newRet; /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ streamQueueProcessSuccess(pTask->inputQueue); } -- GitLab