diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index ab60acab53af3b7457f503637aa6096ecee64ba1..e8dab421292f6d50aada7bbaf1cc27a042a44f69 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -907,7 +907,8 @@ int32_t compareTimeWindow(const void* p1, const void* p2, const void* param); int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); - +int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, + SNode* pTagCond); #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 99a9dcb6afeba42b712f8f92b925c9d0ed6719a0..f5d813f2bf03ce1b6dd317e3ea2823bd0ca1da97 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4556,8 +4556,6 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond); -static int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, - SNode* pTagCond); static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* createSortInfo(SNodeList* pNodeList); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d30e4ef6db5c6beabcd1251efea8d36b7b7f267b..3ad5465d674dc40624de64616b851d248cd07f18 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -630,7 +630,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo); - char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); + char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo); varDataSetLen(p, len); @@ -642,7 +642,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { } static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) { - SBlockDistInfo* pDistInfo = (SBlockDistInfo*) param; + SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param; blockDataDestroy(pDistInfo->pResBlock); } @@ -654,24 +654,25 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* goto _error; } - pInfo->pHandle = dataReader; + pInfo->pHandle = dataReader; pInfo->pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); SColumnInfoData infoData = {0}; - infoData.info.type = TSDB_DATA_TYPE_VARCHAR; - infoData.info.bytes = 1024; + infoData.info.type = TSDB_DATA_TYPE_VARCHAR; + infoData.info.bytes = 1024; taosArrayPush(pInfo->pResBlock->pDataBlock, &infoData); - pOperator->name = "DataBlockInfoScanOperator"; + pOperator->name = "DataBlockInfoScanOperator"; // pOperator->operatorType = OP_TableBlockInfoScan; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; - - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, destroyBlockDistScanOperatorInfo, NULL, NULL, NULL); + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, + destroyBlockDistScanOperatorInfo, NULL, NULL, NULL); return pOperator; _error: @@ -823,7 +824,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { return NULL; } - int32_t current = pInfo->validBlockIndex++; + int32_t current = pInfo->validBlockIndex++; SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); blockDataUpdateTsWindow(pBlock, 0); return pBlock; @@ -984,7 +985,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan; SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; - SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); + SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info; @@ -1436,7 +1437,6 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { pRsp->numOfRows, pInfo->loadInfo.totalRows); if (pRsp->numOfRows == 0) { - taosMemoryFree(pRsp); return NULL; } @@ -1773,3 +1773,498 @@ _error: terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + +typedef struct STableMergeScanInfo { + SArray* dataReaders; // array of tsdbReaderT* + SReadHandle readHandle; + + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + + SArray* pSortInfo; + SSortHandle* pSortHandle; + + SSDataBlock* pSortInputBlock; + int64_t startTs; // sort start time + + bool hasGroupId; + uint64_t groupId; + STupleHandle* prefetchedTuple; + + SArray* sortSourceParams; + + SFileBlockLoadRecorder readRecorder; + int64_t numOfRows; + // int32_t prevGroupId; // previous table group id + SScanInfo scanInfo; + int32_t scanTimes; + SNode* pFilterNode; // filter info, which is push down by optimizer + SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context + SResultRowInfo* pResultRowInfo; + int32_t* rowCellInfoOffset; + SExprInfo* pExpr; + SSDataBlock* pResBlock; + SArray* pColMatchInfo; + int32_t numOfOutput; + + SExprInfo* pPseudoExpr; + int32_t numOfPseudoExpr; + SqlFunctionCtx* pPseudoCtx; + // int32_t* rowCellInfoOffset; + + SQueryTableDataCond cond; + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + int32_t dataBlockLoadFlag; + SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time + // window to check if current data block needs to be loaded. + + SSampleExecInfo sample; // sample execution info + int32_t curTWinIdx; + +} STableMergeScanInfo; + +int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, + STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId, + uint64_t taskId, SNode* pTagCond) { + int32_t code = + getTableList(pHandle->meta, pTableScanNode->scan.tableType, pTableScanNode->scan.uid, pTableListInfo, pTagCond); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + if (taosArrayGetSize(pTableListInfo->pTableList) == 0) { + qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId); + goto _error; + } + + SQueryTableDataCond cond = {0}; + code = initQueryTableDataCond(&cond, pTableScanNode); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); ++i) { + STableListInfo* subListInfo = taosMemoryCalloc(1, sizeof(subListInfo)); + subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo)); + taosArrayPush(subListInfo->pTableList, taosArrayGet(pTableListInfo->pTableList, i)); + tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); + taosArrayPush(arrayReader, &pReader); + } + clearupQueryTableDataCond(&cond); + + return 0; + +_error: + return code; +} + +static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, + int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + STableScanInfo* pInfo = pOperator->info; + + SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; + + pCost->totalBlocks += 1; + pCost->totalRows += pBlock->info.rows; + + *status = pInfo->dataBlockLoadFlag; + if (pTableScanInfo->pFilterNode != NULL || + overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) { + (*status) = FUNC_DATA_REQUIRED_DATA_LOAD; + } + + SDataBlockInfo* pBlockInfo = &pBlock->info; + taosMemoryFreeClear(pBlock->pBlockAgg); + + if (*status == FUNC_DATA_REQUIRED_FILTEROUT) { + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + pCost->filterOutBlocks += 1; + return TSDB_CODE_SUCCESS; + } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { + qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + pCost->skipBlocks += 1; + + // clear all data in pBlock that are set when handing the previous block + for (int32_t i = 0; i < pBlockInfo->numOfCols; ++i) { + SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i); + pcol->pData = NULL; + } + + return TSDB_CODE_SUCCESS; + } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { + pCost->loadBlockStatis += 1; + + bool allColumnsHaveAgg = true; + SColumnDataAgg** pColAgg = NULL; + tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); + tsdbRetrieveDataBlockStatisInfo(reader, &pColAgg, &allColumnsHaveAgg); + + if (allColumnsHaveAgg == true) { + int32_t numOfCols = pBlock->info.numOfCols; + + // todo create this buffer during creating operator + if (pBlock->pBlockAgg == NULL) { + pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES); + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i); + if (!pColMatchInfo->output) { + continue; + } + pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i]; + } + + return TSDB_CODE_SUCCESS; + } else { // failed to load the block sma data, data block statistics does not exist, load data block instead + *status = FUNC_DATA_REQUIRED_DATA_LOAD; + } + } + + ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD); + + // todo filter data block according to the block sma data firstly +#if 0 + if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { + pCost->filterOutBlocks += 1; + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, + pBlockInfo->window.ekey, pBlockInfo->rows); + (*status) = FUNC_DATA_REQUIRED_FILTEROUT; + return TSDB_CODE_SUCCESS; + } +#endif + + pCost->totalCheckedRows += pBlock->info.rows; + pCost->loadBlocks += 1; + + tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); + SArray* pCols = tsdbRetrieveDataBlock(reader, NULL); + if (pCols == NULL) { + return terrno; + } + + relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols); + + // currently only the tbname pseudo column + if (pTableScanInfo->numOfPseudoExpr > 0) { + addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr, + pBlock); + } + + int64_t st = taosGetTimestampMs(); + doFilter(pTableScanInfo->pFilterNode, pBlock); + + int64_t et = taosGetTimestampMs(); + pTableScanInfo->readRecorder.filterTime += (et - st); + + if (pBlock->info.rows == 0) { + pCost->filterOutBlocks += 1; + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + } + + return TSDB_CODE_SUCCESS; +} + +typedef struct STableMergeScanSortSourceParam { + SOperatorInfo* pOperator; + int32_t readerIdx; +} STableMergeScanSortSourceParam; + +static SSDataBlock* getTableDataBlock(void* param) { + STableMergeScanSortSourceParam* source = param; + SOperatorInfo* pOperator = source->pOperator; + int32_t readerIdx = source->readerIdx; + STableMergeScanInfo* pTableScanInfo = pOperator->info; + SSDataBlock* pBlock = pTableScanInfo->pResBlock; + + int64_t st = taosGetTimestampUs(); + + tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); + while (tsdbNextDataBlock(reader)) { + if (isTaskKilled(pOperator->pTaskInfo)) { + longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } + + // process this data block based on the probabilities + bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample); + if (!processThisBlock) { + continue; + } + + tsdbRetrieveDataBlockInfo(reader, &pBlock->info); + + uint32_t status = 0; + int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status); + // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pOperator->pTaskInfo->env, code); + } + + // current block is filter out according to filter condition, continue load the next block + if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { + continue; + } + + uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); + if (groupId) { + pBlock->info.groupId = *groupId; + } + + pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; + pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + + pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime; + return pBlock; + } + return NULL; +} + +SArray* generateSortByTsInfo(int32_t order) { + SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo)); + SBlockOrderInfo bi = {0}; + bi.order = order; + bi.slotId = 0; + bi.nullFirst = NULL_ORDER_FIRST; + + taosArrayPush(pList, &bi); + + return pList; +} + +int32_t doOpenTableMergeScanOperator(SOperatorInfo* pOperator) { + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + if (OPTR_IS_OPENED(pOperator)) { + return TSDB_CODE_SUCCESS; + } + + pInfo->startTs = taosGetTimestampUs(); + + int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; + + pInfo->pSortHandle = + tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, + numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str); + + tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL); + + size_t numReaders = taosArrayGetSize(pInfo->dataReaders); + for (int32_t i = 0; i < numReaders; ++i) { + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + STableMergeScanSortSourceParam* param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam)); + param->readerIdx = i; + param->pOperator = pOperator; + ps->param = param; + tsortAddSource(pInfo->pSortHandle, ps); + } + + int32_t code = tsortOpen(pInfo->pSortHandle); + + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, terrno); + } + + pOperator->status = OP_RES_TO_RETURN; + + OPTR_SET_OPENED(pOperator); + return TSDB_CODE_SUCCESS; +} + +SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SOperatorInfo* pOperator) { + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + blockDataCleanup(pDataBlock); + + SSDataBlock* p = tsortGetSortedDataBlock(pHandle); + if (p == NULL) { + return NULL; + } + + blockDataEnsureCapacity(p, capacity); + + while (1) { + STupleHandle* pTupleHandle = NULL; + if (pInfo->prefetchedTuple == NULL) { + pTupleHandle = tsortNextTuple(pHandle); + } else { + pTupleHandle = pInfo->prefetchedTuple; + pInfo->groupId = tsortGetGroupId(pTupleHandle); + pInfo->prefetchedTuple = NULL; + } + + if (pTupleHandle == NULL) { + break; + } + + uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); + if (!pInfo->hasGroupId) { + pInfo->groupId = tupleGroupId; + pInfo->hasGroupId = true; + appendOneRowToDataBlock(p, pTupleHandle); + } else if (pInfo->groupId == tupleGroupId) { + appendOneRowToDataBlock(p, pTupleHandle); + } else { + pInfo->prefetchedTuple = pTupleHandle; + break; + } + + if (p->info.rows >= capacity) { + break; + } + } + + blockDataDestroy(p); + + qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.rows); + return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; +} + +SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + STableMergeScanInfo* pInfo = pOperator->info; + + int32_t code = pOperator->fpSet._openFn(pOperator); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + + SSDataBlock* pBlock = getSortedTableMergeScanBlockData( + pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pOperator); + + if (pBlock != NULL) { + pOperator->resultInfo.totalRows += pBlock->info.rows; + } else { + doSetOperatorCompleted(pOperator); + } + return pBlock; +} + +void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { + STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; + blockDataDestroy(pTableScanInfo->pResBlock); + clearupQueryTableDataCond(&pTableScanInfo->cond); + + for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) { + tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, i); + tsdbCleanupReadHandle(reader); + } + taosArrayDestroy(pTableScanInfo->dataReaders); + + if (pTableScanInfo->pColMatchInfo != NULL) { + taosArrayDestroy(pTableScanInfo->pColMatchInfo); + } + + taosArrayDestroy(pTableScanInfo->sortSourceParams); + pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock); + pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock); + + taosArrayDestroy(pTableScanInfo->pSortInfo); +} + +int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { + ASSERT(pOptr != NULL); + //TODO: merge these two info into one struct + SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder)); + STableScanInfo* pTableScanInfo = pOptr->info; + *pRecorder = pTableScanInfo->readRecorder; + *pOptrExplain = pRecorder; + *len = sizeof(SFileBlockLoadRecorder); + + SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo)); + + STableMergeScanInfo* pOperatorInfo = (STableMergeScanInfo*)pOptr->info; + + *pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle); + *pOptrExplain = pInfo; + *len = sizeof(SSortExecInfo); + return TSDB_CODE_SUCCESS; +} + +SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SArray* dataReaders, + SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { + STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; + + int32_t numOfCols = 0; + SArray* pColList = + extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID); + + int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + if (pTableScanNode->scan.pScanPseudoCols != NULL) { + pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); + pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset); + } + + pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; + + pInfo->readHandle = *readHandle; + pInfo->interval = extractIntervalInfo(pTableScanNode); + pInfo->sample.sampleRatio = pTableScanNode->ratio; + pInfo->sample.seed = taosGetTimestampSec(); + + pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; + pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; + pInfo->dataReaders = dataReaders; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColList; + pInfo->curTWinIdx = 0; + + pInfo->pResBlock = createResDataBlock(pDescNode); + + pInfo->sortSourceParams = taosArrayInit(taosArrayGetSize(dataReaders), sizeof(STableMergeScanSortSourceParam)); + for (int32_t i = 0; i < taosArrayGetSize(dataReaders); ++i) { + STableMergeScanSortSourceParam* param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam)); + param->readerIdx = i; + param->pOperator = pOperator; + taosArrayPush(pInfo->sortSourceParams, param); + taosMemoryFree(param); + } + pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order); + pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); + int32_t rowSize = pInfo->pResBlock->info.rowSize; + pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2; + pInfo->sortBufSize = pInfo->bufPageSize * 16; + pInfo->hasGroupId = false; + pInfo->prefetchedTuple = NULL; + + pOperator->name = "TableMergeScanOperator"; + // TODO : change it + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; + initResultSizeInfo(pOperator, 1024); + + pOperator->fpSet = + createOperatorFpSet(doOpenTableMergeScanOperator, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, + NULL, NULL, getTableMergeScanExplainExecInfo); + pOperator->cost.openCost = 0; + return pOperator; + +_error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + return NULL; +} diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index ff093741fabedb124aa7b68eb09b21018f99d101..7b66b04ed3befe81d5c1283bcd982aad8c6e5e99 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -427,7 +427,7 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, pInfo->pInputBlock = pInputBlock; pOperator->name = "MultiwaySortMerge"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; - pOperator->blocking = true; + pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo;