diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 29217e29a4f9cf5b5a2f70db77a931b88eded27b..76f3c1b12d7e8e4ae27b38d3a3907ae933d7d904 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -479,6 +479,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew); +#if 1 + +#endif if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a36307de93aac793fc0168fe1fbbdb9800f0f506..8d2c66ff903006ef62d11d1df93173c5599be7f4 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -49,14 +49,19 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu SStreamScanInfo* pInfo = pOperator->info; +#if 0 // TODO: if a block was set but not consumed, // prevent setting a different type of block pInfo->validBlockIndex = 0; - /*if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) {*/ - /*taosArrayClearP(pInfo->pBlockLists, taosMemoryFree);*/ - /*} else {*/ - taosArrayClear(pInfo->pBlockLists); - /*}*/ + if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { + taosArrayClearP(pInfo->pBlockLists, taosMemoryFree); + } else { + taosArrayClear(pInfo->pBlockLists); + } +#endif + + ASSERT(pInfo->validBlockIndex == 0); + ASSERT(taosArrayGetSize(pInfo->pBlockLists) == 0); if (type == STREAM_INPUT__MERGED_SUBMIT) { // ASSERT(numOfBlocks > 1); @@ -79,17 +84,15 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } else if (type == STREAM_INPUT__DATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; - taosArrayPush(pInfo->pBlockLists, &pDataBlock); -#if 0 // TODO optimize SSDataBlock* p = createOneDataBlock(pDataBlock, false); + /*qError("alloc p i, %d %p", i, p);*/ p->info = pDataBlock->info; taosArrayClear(p->pDataBlock); taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayPush(pInfo->pBlockLists, &p); -#endif } pInfo->blockType = STREAM_INPUT__DATA_BLOCK; } else { @@ -103,6 +106,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu static FORCE_INLINE void streamInputBlockDataDestory(void* pBlock) { blockDataDestroy((SSDataBlock*)pBlock); } void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) { +#if 0 SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; if (!pTaskInfo || !pTaskInfo->pRoot || pTaskInfo->pRoot->numOfDownstream <= 0) { return; @@ -119,6 +123,7 @@ void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) { } else { ASSERT(0); } +#endif } int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8dcc2cd39be48e6d7c3d2cc6e14ed9239286cbb5..dc9e0e8d451e6bc5539d49f78a0135399979990f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -920,6 +920,17 @@ _error: } static void doClearBufferedBlocks(SStreamScanInfo* pInfo) { + if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { + size_t total = taosArrayGetSize(pInfo->pBlockLists); + for (int32_t i = 0; i < total; i++) { + SSDataBlock* p = taosArrayGetP(pInfo->pBlockLists, i); + taosArrayDestroy(p->pDataBlock); + taosMemoryFree(p); + } + } + taosArrayClear(pInfo->pBlockLists); + pInfo->validBlockIndex = 0; +#if 0 size_t total = taosArrayGetSize(pInfo->pBlockLists); pInfo->validBlockIndex = 0; @@ -928,6 +939,7 @@ static void doClearBufferedBlocks(SStreamScanInfo* pInfo) { blockDataDestroy(p); } taosArrayClear(pInfo->pBlockLists); +#endif } static bool isSessionWindow(SStreamScanInfo* pInfo) { @@ -1576,9 +1588,10 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { size_t total = taosArrayGetSize(pInfo->pBlockLists); // TODO: refactor +FETCH_NEXT_BLOCK: if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { if (pInfo->validBlockIndex >= total) { - /*doClearBufferedBlocks(pInfo);*/ + doClearBufferedBlocks(pInfo); /*pOperator->status = OP_EXEC_DONE;*/ return NULL; } @@ -1613,7 +1626,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes); pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT; printDataBlock(pDelBlock, "stream scan delete result"); - return pInfo->pDeleteDataRes; + if (pInfo->pDeleteDataRes->info.rows > 0) { + return pInfo->pDeleteDataRes; + } else { + goto FETCH_NEXT_BLOCK; + } } else { pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->updateResIndex = 0; @@ -1626,7 +1643,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pInfo->tqReader) { blockDataDestroy(pDelBlock); } - return pInfo->pDeleteDataRes; + if (pInfo->pDeleteDataRes->info.rows > 0) { + return pInfo->pDeleteDataRes; + } else { + goto FETCH_NEXT_BLOCK; + } } } break; default: @@ -1691,6 +1712,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { while (1) { if (pInfo->tqReader->pMsg == NULL) { if (pInfo->validBlockIndex >= totBlockNum) { + doClearBufferedBlocks(pInfo); return NULL; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index aed4ae9b2668e76d36f759696441dd42996b5a93..f10e5e33f1efc2013b36c3dc4ff2382a51503c9c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -5655,7 +5655,6 @@ static void doStreamIntervalAggImpl2(SOperatorInfo* pOperatorInfo, SSDataBlock* TSKEY* tsCols = NULL; SResultRow* pResult = NULL; int32_t forwardRows = 0; - int32_t aa = 4; ASSERT(pSDataBlock->pDataBlock != NULL); SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);