diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 814c7c060cbd463dd456c65ee358d30de29b8926..7d730e31e181ab0a1b7a61b1f4149958ebe987aa 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -402,8 +402,6 @@ typedef struct SStreamScanInfo { uint64_t numOfExec; // execution times STqReader* tqReader; - int32_t tsArrayIndex; - SArray* tsArray; uint64_t groupId; SUpdateInfo* pUpdateInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f3917dfeb1170f314f23633ed764a5e8b486def6..d493adbea31d8f265fb0317043dc2467fca2fa33 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1247,30 +1247,38 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { return pBlock; } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) { qDebug("scan mode %d", pInfo->scanMode); - if (pInfo->scanMode == STREAM_SCAN_FROM_RES) { - blockDataDestroy(pInfo->pUpdateRes); - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - return pInfo->pRes; - } else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) { - generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); - return pInfo->pUpdateRes; - } else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE || pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RETRIEVE) { - SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); - if (pSDB) { - pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; - checkUpdateData(pInfo, true, pSDB, false); - return pSDB; - } - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + switch (pInfo->scanMode) { + case STREAM_SCAN_FROM_RES: { + blockDataDestroy(pInfo->pUpdateRes); + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + return pInfo->pRes; + } break; + case STREAM_SCAN_FROM_UPDATERES: { + generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); + prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + return pInfo->pUpdateRes; + } break; + case STREAM_SCAN_FROM_DATAREADER_RANGE: + case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: { + SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); + if (pSDB) { + pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; + checkUpdateData(pInfo, true, pSDB, false); + return pSDB; + } + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + } break; + default: + break; } - if (isStateWindow(pInfo) && pInfo->sessionSup.pStreamAggSup->pScanBlock->info.rows > 0) { + SStreamAggSupporter* pSup = pInfo->sessionSup.pStreamAggSup; + if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; pInfo->updateResIndex = 0; - copyDataBlock(pInfo->pUpdateRes, pInfo->sessionSup.pStreamAggSup->pScanBlock); - blockDataCleanup(pInfo->sessionSup.pStreamAggSup->pScanBlock); + copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock); + blockDataCleanup(pSup->pScanBlock); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); return pInfo->pUpdateRes; } @@ -1329,7 +1337,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); /*pOperator->status = OP_EXEC_DONE;*/ } else if (pInfo->pUpdateInfo) { - pInfo->tsArrayIndex = 0; checkUpdateData(pInfo, true, pInfo->pRes, true); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey); if (pInfo->pUpdateDataRes->info.rows > 0) { @@ -1387,7 +1394,7 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) { #if 1 if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) { STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info; - destroyTableScanOperatorInfo(pTableScanInfo, 1); + destroyTableScanOperatorInfo(pTableScanInfo, numOfOutput); } #endif if (pStreamScan->tqReader) { @@ -1401,8 +1408,8 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) { blockDataDestroy(pStreamScan->pUpdateRes); blockDataDestroy(pStreamScan->pPullDataRes); blockDataDestroy(pStreamScan->pDeleteDataRes); + blockDataDestroy(pStreamScan->pUpdateDataRes); taosArrayDestroy(pStreamScan->pBlockLists); - taosArrayDestroy(pStreamScan->tsArray); taosMemoryFree(pStreamScan); } @@ -1444,11 +1451,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys goto _error; } - pInfo->tsArray = taosArrayInit(4, sizeof(int32_t)); - if (pInfo->tsArray == NULL) { - goto _error; - } - if (pHandle->vnode) { SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 19abb88df7ff5405044b3c77818109ad1eecaebf..025806e11730c5226575c628a9f6c6695a26fa79 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1648,6 +1648,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { } } nodesDestroyNode((SNode*)pInfo->pPhyNode); + colDataDestroy(&pInfo->twAggSup.timeWindowData); taosMemoryFreeClear(param); } @@ -2934,9 +2935,10 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) { pBlock->info.groupId = 0; pBlock->info.rows = 0; pBlock->info.type = type; - pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t); + pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + + sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY); - pBlock->pDataBlock = taosArrayInit(3, sizeof(SColumnInfoData)); + pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData)); SColumnInfoData infoData = {0}; infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP; infoData.info.bytes = sizeof(TSKEY); diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index b7b635e28f8a30f336ddda86a5b96c2e0be138c2..f2a5ba0ab53ce26bbf39db0426830de253e0732b 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -207,6 +207,7 @@ void updateInfoDestroy(SUpdateInfo *pInfo) { } taosArrayDestroy(pInfo->pTsSBFs); + taosHashCleanup(pInfo->pMap); taosMemoryFree(pInfo); }