diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 18d35ecc56198a2e5162772b93f6c92aa53c74ff..caba10834bd815db3247206929b477a3445d9b1f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -912,6 +912,15 @@ int32_t compareTimeWindow(const void* p1, const void* p2, const void* param); int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); + +int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, + SNode* pTagCond); +int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, + STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId, + uint64_t taskId, SNode* pTagCond); +SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SArray* dataReaders, + SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); + void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); #ifdef __cplusplus diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 53477eb56f1101edad03073bb815e8275ef24c7b..7d1dbdbcf3689e17767bda2fbfb28c0a6b350e25 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4556,8 +4556,6 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond); -static int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, - SNode* pTagCond); static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* createSortInfo(SNodeList* pNodeList); @@ -4691,19 +4689,30 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys); - code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json + code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json taosArrayDestroy(groupKeys); - if (code){ + if (code) { tsdbCleanupReadHandle(pDataReader); return NULL; } - SOperatorInfo* pOperator = - createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); - + SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; + return pOperator; + } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { + STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; + SArray* dataReaders = taosArrayInit(8, POINTER_BYTES); + createMultipleDataReaders(pTableScanNode, pHandle, pTableListInfo, dataReaders, queryId, taskId, pTagCond); + extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); + SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionKeys); + generateGroupIdMap(pTableListInfo, pHandle, groupKeys); //todo for json + taosArrayDestroy(groupKeys); + SOperatorInfo* pOperator = + createTableMergeScanOperatorInfo(pTableScanNode, dataReaders, pHandle, pTaskInfo); + STableScanInfo* pScanInfo = pOperator->info; + pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 571e9ae1203980130d8ac058e28b2dcc1cf8b5b6..0823594858f49721db4b4f1e2958ae9ea4ea4e6c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -630,7 +630,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo); - char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); + char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo); varDataSetLen(p, len); @@ -642,7 +642,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { } static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) { - SBlockDistInfo* pDistInfo = (SBlockDistInfo*) param; + SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param; blockDataDestroy(pDistInfo->pResBlock); } @@ -654,24 +654,25 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* goto _error; } - pInfo->pHandle = dataReader; + pInfo->pHandle = dataReader; pInfo->pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); SColumnInfoData infoData = {0}; - infoData.info.type = TSDB_DATA_TYPE_VARCHAR; - infoData.info.bytes = 1024; + infoData.info.type = TSDB_DATA_TYPE_VARCHAR; + infoData.info.bytes = 1024; taosArrayPush(pInfo->pResBlock->pDataBlock, &infoData); - pOperator->name = "DataBlockInfoScanOperator"; + pOperator->name = "DataBlockInfoScanOperator"; // pOperator->operatorType = OP_TableBlockInfoScan; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; - - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, destroyBlockDistScanOperatorInfo, NULL, NULL, NULL); + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, + destroyBlockDistScanOperatorInfo, NULL, NULL, NULL); return pOperator; _error: @@ -890,7 +891,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { return NULL; } - int32_t current = pInfo->validBlockIndex++; + int32_t current = pInfo->validBlockIndex++; SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); blockDataUpdateTsWindow(pBlock, 0); return pBlock; @@ -1058,7 +1059,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan; SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; - SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); + SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info; @@ -1512,7 +1513,6 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { pRsp->numOfRows, pInfo->loadInfo.totalRows); if (pRsp->numOfRows == 0) { - taosMemoryFree(pRsp); return NULL; } @@ -1849,3 +1849,498 @@ _error: terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + +typedef struct STableMergeScanInfo { + SArray* dataReaders; // array of tsdbReaderT* + SReadHandle readHandle; + + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + + SArray* pSortInfo; + SSortHandle* pSortHandle; + + SSDataBlock* pSortInputBlock; + int64_t startTs; // sort start time + + bool hasGroupId; + uint64_t groupId; + STupleHandle* prefetchedTuple; + + SArray* sortSourceParams; + + SFileBlockLoadRecorder readRecorder; + int64_t numOfRows; + // int32_t prevGroupId; // previous table group id + SScanInfo scanInfo; + int32_t scanTimes; + SNode* pFilterNode; // filter info, which is push down by optimizer + SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context + SResultRowInfo* pResultRowInfo; + int32_t* rowCellInfoOffset; + SExprInfo* pExpr; + SSDataBlock* pResBlock; + SArray* pColMatchInfo; + int32_t numOfOutput; + + SExprInfo* pPseudoExpr; + int32_t numOfPseudoExpr; + SqlFunctionCtx* pPseudoCtx; + // int32_t* rowCellInfoOffset; + + SQueryTableDataCond cond; + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + int32_t dataBlockLoadFlag; + SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time + // window to check if current data block needs to be loaded. + + SSampleExecInfo sample; // sample execution info + int32_t curTWinIdx; + +} STableMergeScanInfo; + +int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, + STableListInfo* pTableListInfo, SArray* arrayReader, uint64_t queryId, + uint64_t taskId, SNode* pTagCond) { + int32_t code = + getTableList(pHandle->meta, pTableScanNode->scan.tableType, pTableScanNode->scan.uid, pTableListInfo, pTagCond); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + if (taosArrayGetSize(pTableListInfo->pTableList) == 0) { + qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId); + goto _error; + } + + SQueryTableDataCond cond = {0}; + code = initQueryTableDataCond(&cond, pTableScanNode); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + // TODO: free the sublist info and the table list in it + for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); ++i) { + STableListInfo* subListInfo = taosMemoryCalloc(1, sizeof(subListInfo)); + subListInfo->pTableList = taosArrayInit(1, sizeof(STableKeyInfo)); + taosArrayPush(subListInfo->pTableList, taosArrayGet(pTableListInfo->pTableList, i)); + + tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, subListInfo, queryId, taskId); + taosArrayPush(arrayReader, &pReader); + + taosArrayDestroy(subListInfo->pTableList); + taosMemoryFree(subListInfo); + } + clearupQueryTableDataCond(&cond); + + return 0; + +_error: + return code; +} + +static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, + int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + STableMergeScanInfo* pInfo = pOperator->info; + + SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; + + pCost->totalBlocks += 1; + pCost->totalRows += pBlock->info.rows; + + *status = pInfo->dataBlockLoadFlag; + if (pTableScanInfo->pFilterNode != NULL || + overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) { + (*status) = FUNC_DATA_REQUIRED_DATA_LOAD; + } + + SDataBlockInfo* pBlockInfo = &pBlock->info; + taosMemoryFreeClear(pBlock->pBlockAgg); + + if (*status == FUNC_DATA_REQUIRED_FILTEROUT) { + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + pCost->filterOutBlocks += 1; + return TSDB_CODE_SUCCESS; + } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { + qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + pCost->skipBlocks += 1; + + // clear all data in pBlock that are set when handing the previous block + for (int32_t i = 0; i < pBlockInfo->numOfCols; ++i) { + SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i); + pcol->pData = NULL; + } + + return TSDB_CODE_SUCCESS; + } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { + pCost->loadBlockStatis += 1; + + bool allColumnsHaveAgg = true; + SColumnDataAgg** pColAgg = NULL; + tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); + tsdbRetrieveDataBlockStatisInfo(reader, &pColAgg, &allColumnsHaveAgg); + + if (allColumnsHaveAgg == true) { + int32_t numOfCols = pBlock->info.numOfCols; + + // todo create this buffer during creating operator + if (pBlock->pBlockAgg == NULL) { + pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES); + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i); + if (!pColMatchInfo->output) { + continue; + } + pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i]; + } + + return TSDB_CODE_SUCCESS; + } else { // failed to load the block sma data, data block statistics does not exist, load data block instead + *status = FUNC_DATA_REQUIRED_DATA_LOAD; + } + } + + ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD); + + // todo filter data block according to the block sma data firstly +#if 0 + if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { + pCost->filterOutBlocks += 1; + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, + pBlockInfo->window.ekey, pBlockInfo->rows); + (*status) = FUNC_DATA_REQUIRED_FILTEROUT; + return TSDB_CODE_SUCCESS; + } +#endif + + pCost->totalCheckedRows += pBlock->info.rows; + pCost->loadBlocks += 1; + + tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); + SArray* pCols = tsdbRetrieveDataBlock(reader, NULL); + if (pCols == NULL) { + return terrno; + } + + relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols); + + // currently only the tbname pseudo column + if (pTableScanInfo->numOfPseudoExpr > 0) { + addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr, + pBlock); + } + + int64_t st = taosGetTimestampMs(); + doFilter(pTableScanInfo->pFilterNode, pBlock); + + int64_t et = taosGetTimestampMs(); + pTableScanInfo->readRecorder.filterTime += (et - st); + + if (pBlock->info.rows == 0) { + pCost->filterOutBlocks += 1; + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + } + + return TSDB_CODE_SUCCESS; +} + +typedef struct STableMergeScanSortSourceParam { + SOperatorInfo* pOperator; + int32_t readerIdx; + SSDataBlock* inputBlock; +} STableMergeScanSortSourceParam; + +static SSDataBlock* getTableDataBlock(void* param) { + STableMergeScanSortSourceParam* source = param; + SOperatorInfo* pOperator = source->pOperator; + int32_t readerIdx = source->readerIdx; + SSDataBlock* pBlock = source->inputBlock; + STableMergeScanInfo* pTableScanInfo = pOperator->info; + + int64_t st = taosGetTimestampUs(); + + blockDataCleanup(pBlock); + + tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); + while (tsdbNextDataBlock(reader)) { + if (isTaskKilled(pOperator->pTaskInfo)) { + longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } + + // process this data block based on the probabilities + bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample); + if (!processThisBlock) { + continue; + } + + tsdbRetrieveDataBlockInfo(reader, &pBlock->info); + + uint32_t status = 0; + int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status); + // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pOperator->pTaskInfo->env, code); + } + + // current block is filter out according to filter condition, continue load the next block + if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { + continue; + } + + uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); + if (groupId) { + pBlock->info.groupId = *groupId; + } + + pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; + pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + + return pBlock; + } + return NULL; +} + +SArray* generateSortByTsInfo(int32_t order) { + SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo)); + SBlockOrderInfo bi = {0}; + bi.order = order; + bi.slotId = 0; + bi.nullFirst = NULL_ORDER_FIRST; + + taosArrayPush(pList, &bi); + + return pList; +} + +int32_t doOpenTableMergeScanOperator(SOperatorInfo* pOperator) { + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + if (OPTR_IS_OPENED(pOperator)) { + return TSDB_CODE_SUCCESS; + } + + int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; + + pInfo->pSortHandle = + tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, + numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str); + + tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL); + + size_t numReaders = taosArrayGetSize(pInfo->dataReaders); + for (int32_t i = 0; i < numReaders; ++i) { + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); + ps->param = param; + tsortAddSource(pInfo->pSortHandle, ps); + } + + int32_t code = tsortOpen(pInfo->pSortHandle); + + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, terrno); + } + + pOperator->status = OP_RES_TO_RETURN; + + OPTR_SET_OPENED(pOperator); + return TSDB_CODE_SUCCESS; +} + +SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capacity, SOperatorInfo* pOperator) { + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + SSDataBlock* p = tsortGetSortedDataBlock(pHandle); + if (p == NULL) { + return NULL; + } + + blockDataEnsureCapacity(p, capacity); + + while (1) { + STupleHandle* pTupleHandle = NULL; + if (pInfo->prefetchedTuple == NULL) { + pTupleHandle = tsortNextTuple(pHandle); + } else { + pTupleHandle = pInfo->prefetchedTuple; + pInfo->groupId = tsortGetGroupId(pTupleHandle); + pInfo->prefetchedTuple = NULL; + } + + if (pTupleHandle == NULL) { + break; + } + + uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); + if (!pInfo->hasGroupId) { + pInfo->groupId = tupleGroupId; + pInfo->hasGroupId = true; + appendOneRowToDataBlock(p, pTupleHandle); + } else if (pInfo->groupId == tupleGroupId) { + appendOneRowToDataBlock(p, pTupleHandle); + } else { + pInfo->prefetchedTuple = pTupleHandle; + break; + } + + if (p->info.rows >= capacity) { + break; + } + } + + + qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows); + return (p->info.rows > 0) ? p : NULL; +} + +SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + STableMergeScanInfo* pInfo = pOperator->info; + + int32_t code = pOperator->fpSet._openFn(pOperator); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + + SSDataBlock* pBlock = + getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); + + if (pBlock != NULL) { + pOperator->resultInfo.totalRows += pBlock->info.rows; + } else { + doSetOperatorCompleted(pOperator); + } + return pBlock; +} + +void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { + STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; + clearupQueryTableDataCond(&pTableScanInfo->cond); + + for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) { + tsdbReaderT* reader = taosArrayGetP(pTableScanInfo->dataReaders, i); + tsdbCleanupReadHandle(reader); + } + taosArrayDestroy(pTableScanInfo->dataReaders); + + if (pTableScanInfo->pColMatchInfo != NULL) { + taosArrayDestroy(pTableScanInfo->pColMatchInfo); + } + + taosArrayDestroy(pTableScanInfo->sortSourceParams); + pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock); + pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock); + + taosArrayDestroy(pTableScanInfo->pSortInfo); +} + +typedef struct STableMergeScanExecInfo { + SFileBlockLoadRecorder blockRecorder; + SSortExecInfo sortExecInfo; +} STableMergeScanExecInfo; + +int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { + ASSERT(pOptr != NULL); + // TODO: merge these two info into one struct + STableMergeScanExecInfo* execInfo = taosMemoryCalloc(1, sizeof(STableMergeScanExecInfo)); + STableMergeScanInfo* pInfo = pOptr->info; + execInfo->blockRecorder = pInfo->readRecorder; + execInfo->sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle); + + *pOptrExplain = execInfo; + *len = sizeof(STableMergeScanExecInfo); + + return TSDB_CODE_SUCCESS; +} + +SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SArray* dataReaders, + SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { + STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; + + int32_t numOfCols = 0; + SArray* pColList = + extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID); + + int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + if (pTableScanNode->scan.pScanPseudoCols != NULL) { + pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); + pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset); + } + + pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; + + pInfo->readHandle = *readHandle; + pInfo->interval = extractIntervalInfo(pTableScanNode); + pInfo->sample.sampleRatio = pTableScanNode->ratio; + pInfo->sample.seed = taosGetTimestampSec(); + + pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; + pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; + pInfo->dataReaders = dataReaders; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColList; + pInfo->curTWinIdx = 0; + + pInfo->pResBlock = createResDataBlock(pDescNode); + + pInfo->sortSourceParams = taosArrayInit(taosArrayGetSize(dataReaders), sizeof(STableMergeScanSortSourceParam)); + for (int32_t i = 0; i < taosArrayGetSize(dataReaders); ++i) { + STableMergeScanSortSourceParam* param = taosMemoryCalloc(1, sizeof(STableMergeScanSortSourceParam)); + param->readerIdx = i; + param->pOperator = pOperator; + param->inputBlock = createOneDataBlock(pInfo->pResBlock, false); + taosArrayPush(pInfo->sortSourceParams, param); + taosMemoryFree(param); + } + pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order); + pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); + int32_t rowSize = pInfo->pResBlock->info.rowSize; + pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2; + pInfo->sortBufSize = pInfo->bufPageSize * 16; + pInfo->hasGroupId = false; + pInfo->prefetchedTuple = NULL; + + pOperator->name = "TableMergeScanOperator"; + // TODO : change it + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = numOfCols; + pOperator->pTaskInfo = pTaskInfo; + initResultSizeInfo(pOperator, 1024); + + pOperator->fpSet = + createOperatorFpSet(doOpenTableMergeScanOperator, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, + NULL, NULL, getTableMergeScanExplainExecInfo); + pOperator->cost.openCost = 0; + return pOperator; + +_error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + return NULL; +} diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index ff093741fabedb124aa7b68eb09b21018f99d101..1008a5263cac638f35e5f8f80f368f16d30e1164 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -76,7 +76,9 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { colDataAppendNULL(pColInfo, pBlock->info.rows); } else { char* pData = tsortGetValue(pTupleHandle, i); - colDataAppend(pColInfo, pBlock->info.rows, pData, false); + if (pData != NULL) { + colDataAppend(pColInfo, pBlock->info.rows, pData, false); + } } } @@ -427,7 +429,7 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, pInfo->pInputBlock = pInputBlock; pOperator->name = "MultiwaySortMerge"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; - pOperator->blocking = true; + pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index f63a9351c6e24f95a0f47a779af18638d7bfbcdc..1473bd81bb23d1384fe60356573d6a69cb4e9f70 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -738,7 +738,11 @@ bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) { void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) { SColumnInfoData* pColInfo = TARRAY_GET_ELEM(pVHandle->pBlock->pDataBlock, colIndex); - return colDataGetData(pColInfo, pVHandle->rowIndex); + if (pColInfo->pData == NULL) { + return NULL; + } else { + return colDataGetData(pColInfo, pVHandle->rowIndex); + } } uint64_t tsortGetGroupId(STupleHandle* pVHandle) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 4a0348151b8cbaf64a66886ef7a7d5f5855584e1..08e693da71abb864bb0a27e68d8e33291a4fd9e1 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -470,8 +470,8 @@ static ENodeType getScanOperatorType(EScanType scanType) { case SCAN_TYPE_STREAM: return QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; case SCAN_TYPE_TABLE_MERGE: - return QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; - // return QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN; + // return QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; + return QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN; default: break; }