From e654f3f92c50dd8ef8b446c9f556c143c36506df Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 24 Oct 2022 16:44:44 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/executor/inc/executil.h | 17 ++- source/libs/executor/inc/executorimpl.h | 124 ++++++++----------- source/libs/executor/src/cachescanoperator.c | 44 +++---- source/libs/executor/src/dataDeleter.c | 16 --- source/libs/executor/src/dataDispatcher.c | 18 +-- source/libs/executor/src/executil.c | 36 +++--- source/libs/executor/src/executorimpl.c | 18 +-- source/libs/executor/src/scanoperator.c | 98 +++++++-------- source/libs/executor/src/sortoperator.c | 95 +++++++------- source/libs/executor/src/tfill.c | 11 +- 10 files changed, 215 insertions(+), 262 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 4e960afdb1..0cfef7dc24 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -74,7 +74,6 @@ typedef struct SResultRowPosition { typedef struct SResKeyPos { SResultRowPosition pos; uint64_t groupId; - // char parTbName[TSDB_TABLE_NAME_LEN]; char key[]; } SResKeyPos; @@ -84,6 +83,18 @@ typedef struct SResultRowInfo { SList* openWindow; } SResultRowInfo; +typedef struct SColMatchItem { + int32_t colId; + int32_t srcSlotId; + int32_t dstSlotId; + bool needOutput; +} SColMatchItem; + +typedef struct SColMatchInfo { + SArray* pList; // SArray + int32_t matchType; // determinate the source according to col id or slot id +} SColMatchInfo; + struct SqlFunctionCtx; size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); @@ -121,8 +132,8 @@ size_t getTableTagsBufLen(const SNodeList* pGroups); SArray* createSortInfo(SNodeList* pNodeList); SArray* extractPartitionColInfo(SNodeList* pNodeList); -SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, - int32_t type); +int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, + int32_t type, SColMatchInfo* pMatchInfo); void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId); void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5e387d2bd3..7755bd88db 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -47,12 +47,7 @@ extern "C" { typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); -#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) #define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u) -#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP) - -#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0) - #define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0) #define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN) #define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0) @@ -240,7 +235,7 @@ typedef struct SOperatorInfo { typedef enum { EX_SOURCE_DATA_NOT_READY = 0x1, - EX_SOURCE_DATA_READY = 0x2, + EX_SOURCE_DATA_READY = 0x2, EX_SOURCE_DATA_EXHAUSTED = 0x3, } EX_SOURCE_STATUS; @@ -289,15 +284,6 @@ typedef struct SExchangeInfo { SLimitInfo limitInfo; } SExchangeInfo; -typedef struct SColMatchInfo { - int32_t srcSlotId; // source slot id - int32_t colId; - int32_t targetSlotId; - bool output; // todo remove this? - bool reserved; - int32_t matchType; // determinate the source according to col id or slot id -} SColMatchInfo; - typedef struct SScanInfo { int32_t numOfAsc; int32_t numOfDesc; @@ -339,7 +325,7 @@ typedef struct STableScanInfo { SNode* pFilterNode; // filter info, which is push down by optimizer SSDataBlock* pResBlock; - SArray* pColMatchInfo; + SColMatchInfo matchInfo; SExprSupp pseudoSup; SQueryTableDataCond cond; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan @@ -365,10 +351,9 @@ typedef struct STableMergeScanInfo { uint32_t sortBufSize; // max buffer size for in-memory sort SArray* pSortInfo; SSortHandle* pSortHandle; - - SSDataBlock* pSortInputBlock; - int64_t startTs; // sort start time - SArray* sortSourceParams; + SSDataBlock* pSortInputBlock; + int64_t startTs; // sort start time + SArray* sortSourceParams; SFileBlockLoadRecorder readRecorder; int64_t numOfRows; @@ -380,43 +365,40 @@ typedef struct STableMergeScanInfo { int32_t* rowEntryInfoOffset; SExprInfo* pExpr; SSDataBlock* pResBlock; - SArray* pColMatchInfo; + SColMatchInfo matchInfo; int32_t numOfOutput; - - SExprSupp pseudoSup; - - SQueryTableDataCond cond; - int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan - int32_t dataBlockLoadFlag; + SExprSupp pseudoSup; + SQueryTableDataCond cond; + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + int32_t dataBlockLoadFlag; // if the upstream is an interval operator, the interval info is also kept here to get the time // window to check if current data block needs to be loaded. - SInterval interval; - SSampleExecInfo sample; // sample execution info - - SSortExecInfo sortExecInfo; + SInterval interval; + SSampleExecInfo sample; // sample execution info + SSortExecInfo sortExecInfo; } STableMergeScanInfo; typedef struct STagScanInfo { SColumnInfo* pCols; SSDataBlock* pRes; - SArray* pColMatchInfo; + SColMatchInfo matchInfo; int32_t curPos; SReadHandle readHandle; STableListInfo* pTableList; } STagScanInfo; typedef struct SLastrowScanInfo { - SSDataBlock* pRes; - SReadHandle readHandle; - void* pLastrowReader; - SArray* pColMatchInfo; - int32_t* pSlotIds; - SExprSupp pseudoExprSup; - int32_t retrieveType; - int32_t currentGroupIndex; - SSDataBlock* pBufferredRes; - SArray* pUidList; - int32_t indexOfBufferedRes; + SSDataBlock* pRes; + SReadHandle readHandle; + void* pLastrowReader; + SColMatchInfo matchInfo; + int32_t* pSlotIds; + SExprSupp pseudoExprSup; + int32_t retrieveType; + int32_t currentGroupIndex; + SSDataBlock* pBufferredRes; + SArray* pUidList; + int32_t indexOfBufferedRes; } SLastrowScanInfo; typedef enum EStreamScanMode { @@ -483,28 +465,28 @@ typedef struct STimeWindowAggSupp { } STimeWindowAggSupp; typedef struct SStreamScanInfo { - uint64_t tableUid; // queried super table uid - SExprInfo* pPseudoExpr; - int32_t numOfPseudoExpr; - SExprSupp tbnameCalSup; - SExprSupp tagCalSup; - int32_t primaryTsIndex; // primary time stamp slot id - SReadHandle readHandle; - SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. - SArray* pColMatchInfo; // - SNode* pCondition; - - SArray* pBlockLists; // multiple SSDatablock. - SSDataBlock* pRes; // result SSDataBlock - SSDataBlock* pUpdateRes; // update SSDataBlock - int32_t updateResIndex; - int32_t blockType; // current block type - int32_t validBlockIndex; // Is current data has returned? - uint64_t numOfExec; // execution times - STqReader* tqReader; - - uint64_t groupId; - SUpdateInfo* pUpdateInfo; + uint64_t tableUid; // queried super table uid + SExprInfo* pPseudoExpr; + int32_t numOfPseudoExpr; + SExprSupp tbnameCalSup; + SExprSupp tagCalSup; + int32_t primaryTsIndex; // primary time stamp slot id + SReadHandle readHandle; + SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. + SColMatchInfo matchInfo; + SNode* pCondition; + + SArray* pBlockLists; // multiple SSDatablock. + SSDataBlock* pRes; // result SSDataBlock + SSDataBlock* pUpdateRes; // update SSDataBlock + int32_t updateResIndex; + int32_t blockType; // current block type + int32_t validBlockIndex; // Is current data has returned? + uint64_t numOfExec; // execution times + STqReader* tqReader; + + uint64_t groupId; + SUpdateInfo* pUpdateInfo; EStreamScanMode scanMode; SOperatorInfo* pStreamScanOp; @@ -559,8 +541,8 @@ typedef struct SSysTableScanInfo { bool showRewrite; SNode* pCondition; // db_name filter condition, to discard data that are not in current database SMTbCursor* pCur; // cursor for iterate the local table meta store. - SSysTableIndex* pIdx; // idx for local table meta - SArray* scanCols; // SArray scan column id list + SSysTableIndex* pIdx; // idx for local table meta + SColMatchInfo matchInfo; SName name; SSDataBlock* pRes; int64_t numOfBlocks; // extract basic running information. @@ -680,7 +662,7 @@ typedef struct SFillOperatorInfo { SSDataBlock* existNewGroupBlock; STimeWindow win; SNode* pCondition; - SArray* pColMatchColInfo; + SColMatchInfo matchInfo; int32_t primaryTsCol; int32_t primarySrcSlotId; uint64_t curGroupId; // current handled group id @@ -819,7 +801,7 @@ typedef struct SStreamFillOperatorInfo { int32_t srcDelRowIndex; SSDataBlock* pDelRes; SNode* pCondition; - SArray* pColMatchColInfo; + SColMatchInfo matchInfo; int32_t primaryTsCol; int32_t primarySrcSlotId; SStreamFillInfo* pFillInfo; @@ -863,7 +845,7 @@ typedef struct SSortOperatorInfo { uint32_t sortBufSize; // max buffer size for in-memory sort SArray* pSortInfo; SSortHandle* pSortHandle; - SArray* pColMatchInfo; // for index map from table scan output + SColMatchInfo matchInfo; int32_t bufPageSize; int64_t startTs; // sort start time uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. @@ -932,7 +914,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); void doSetOperatorCompleted(SOperatorInfo* pOperator); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo, SFilterInfo* pFilterInfo); +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo); int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock, const char* idStr); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 00a338cc41..22a685230a 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -28,7 +28,7 @@ static SSDataBlock* doScanCache(SOperatorInfo* pOperator); static void destroyLastrowScanOperator(void* param); static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); -static SArray* removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SArray* pColMatchInfo); +static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo); SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { @@ -46,10 +46,10 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe pInfo->pRes = createResDataBlock(pDescNode); int32_t numOfCols = 0; - SArray* pColMatchInfo = extractColMatchInfo(pScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); - pInfo->pColMatchInfo = removeRedundantTsCol(pScanNode, pColMatchInfo); + code = extractColMatchInfo(pScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); + removeRedundantTsCol(pScanNode, &pInfo->matchInfo); - code = extractCacheScanSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds); + code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -64,7 +64,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) { pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL|(pScanNode->ignoreNull? CACHESCAN_RETRIEVE_LAST:CACHESCAN_RETRIEVE_LAST_ROW); code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList, - taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); + taosArrayGetSize(pInfo->matchInfo.pList), &pInfo->pLastrowReader); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -139,9 +139,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) { - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) { - SColMatchInfo* pMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); - int32_t slotId = pMatchInfo->targetSlotId; + for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { + SColMatchItem* pMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i); + int32_t slotId = pMatchInfo->dstSlotId; SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId); SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, slotId); @@ -188,7 +188,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex); tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList, - taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); + taosArrayGetSize(pInfo->matchInfo.pList), &pInfo->pLastrowReader); taosArrayClear(pInfo->pUidList); int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); @@ -235,7 +235,7 @@ void destroyLastrowScanOperator(void* param) { blockDataDestroy(pInfo->pBufferredRes); taosMemoryFree(pInfo->pSlotIds); taosArrayDestroy(pInfo->pUidList); - taosArrayDestroy(pInfo->pColMatchInfo); + taosArrayDestroy(pInfo->matchInfo.pList); if (pInfo->pLastrowReader != NULL) { pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader); @@ -255,16 +255,16 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask SSchemaWrapper* pWrapper = pTaskInfo->schemaInfo.sw; for (int32_t i = 0; i < numOfCols; ++i) { - SColMatchInfo* pColMatch = taosArrayGet(pColMatchInfo, i); + SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i); for (int32_t j = 0; j < pWrapper->nCols; ++j) { if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - (*pSlotIds)[pColMatch->targetSlotId] = -1; + (*pSlotIds)[pColMatch->dstSlotId] = -1; break; } if (pColMatch->colId == pWrapper->pSchema[j].colId) { - (*pSlotIds)[pColMatch->targetSlotId] = j; + (*pSlotIds)[pColMatch->dstSlotId] = j; break; } } @@ -273,17 +273,18 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask return TSDB_CODE_SUCCESS; } -SArray* removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SArray* pColMatchInfo) { +int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo) { if (!pScanNode->ignoreNull) { // retrieve cached last value - return pColMatchInfo; + return TSDB_CODE_SUCCESS; } - SArray* pMatchInfo = taosArrayInit(taosArrayGetSize(pColMatchInfo), sizeof(SColMatchInfo)); + size_t size = taosArrayGetSize(pColMatchInfo->pList); + SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchInfo)); - for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) { - SColMatchInfo* pColInfo = taosArrayGet(pColMatchInfo, i); + for (int32_t i = 0; i < size; ++i) { + SColMatchItem* pColInfo = taosArrayGet(pColMatchInfo->pList, i); - int32_t slotId = pColInfo->targetSlotId; + int32_t slotId = pColInfo->dstSlotId; SNodeList* pList = pScanNode->scan.node.pOutputDataBlockDesc->pSlots; SSlotDescNode* pDesc = (SSlotDescNode*)nodesListGetNode(pList, slotId); @@ -292,6 +293,7 @@ SArray* removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SArray* pColMatch } } - taosArrayDestroy(pColMatchInfo); - return pMatchInfo; + taosArrayDestroy(pColMatchInfo->pList); + pColMatchInfo->pList = pMatchInfo; + return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 2ed83a6469..c7a2480204 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -53,22 +53,6 @@ typedef struct SDataDeleterHandle { TdThreadMutex mutex; } SDataDeleterHandle; -static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) { - if (tsCompressColData < 0 || 0 == pData->info.rows) { - return false; - } - - for (int32_t col = 0; col < numOfCols; ++col) { - SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col); - int32_t colSize = pColRes->info.bytes * pData->info.rows; - if (NEEDTO_COMPRESS_QUERY(colSize)) { - return true; - } - } - - return false; -} - static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInput, SDataDeleterBuf* pBuf) { int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index b758e4b1dd..bc4ab9c468 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -51,22 +51,6 @@ typedef struct SDataDispatchHandle { TdThreadMutex mutex; } SDataDispatchHandle; -static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) { - if (tsCompressColData < 0 || 0 == pData->info.rows) { - return false; - } - - for (int32_t col = 0; col < numOfCols; ++col) { - SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col); - int32_t colSize = pColRes->info.bytes * pData->info.rows; - if (NEEDTO_COMPRESS_QUERY(colSize)) { - return true; - } - } - - return false; -} - // clang-format off // data format: // +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+ @@ -86,7 +70,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn } } SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; - pEntry->compressed = (int8_t)needCompress(pInput->pData, numOfCols); + pEntry->compressed = 0; pEntry->numOfRows = pInput->pData->info.rows; pEntry->numOfCols = numOfCols; pEntry->dataLen = 0; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 11d6665614..506033b3b6 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1066,13 +1066,17 @@ SArray* extractPartitionColInfo(SNodeList* pNodeList) { return pList; } -SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, - int32_t type) { - size_t numOfCols = LIST_LENGTH(pNodeList); +int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, + int32_t type, SColMatchInfo* pMatchInfo) { + size_t numOfCols = LIST_LENGTH(pNodeList); + int32_t code = 0; + + pMatchInfo->matchType = type; + SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo)); if (pList == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + code = TSDB_CODE_OUT_OF_MEMORY; + return code; } for (int32_t i = 0; i < numOfCols; ++i) { @@ -1080,12 +1084,10 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) { SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; - SColMatchInfo c = {0}; - c.output = true; + SColMatchItem c = {.needOutput = true}; c.colId = pColNode->colId; c.srcSlotId = pColNode->slotId; - c.matchType = type; - c.targetSlotId = pNode->slotId; + c.dstSlotId = pNode->slotId; taosArrayPush(pList, &c); } } @@ -1102,10 +1104,10 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod continue; } - SColMatchInfo* info = NULL; + SColMatchItem* info = NULL; for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) { info = taosArrayGet(pList, j); - if (info->targetSlotId == pNode->slotId) { + if (info->dstSlotId == pNode->slotId) { break; } } @@ -1114,11 +1116,11 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod (*numOfOutputCols) += 1; } else if (info != NULL) { // select distinct tbname from stb where tbname='abc'; - info->output = false; + info->needOutput = false; } } - return pList; + return code; } static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision, @@ -1407,14 +1409,14 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray int32_t i = 0, j = 0; while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) { SColumnInfoData* p = taosArrayGet(pCols, i); - SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, j); - if (!outputEveryColumn && pmInfo->reserved) { + SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, j); +/* if (!outputEveryColumn && pmInfo->reserved) { j++; continue; - } + }*/ if (p->info.colId == pmInfo->colId) { - SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->targetSlotId); + SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->dstSlotId); colDataAssign(pDst, p, pBlock->info.rows, &pBlock->info); i++; j++; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c484a852b9..2432944928 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1086,7 +1086,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo, SFilterInfo* pFilterInfo) { +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo) { if (pFilterNode == NULL || pBlock->info.rows == 0) { return; } @@ -1120,12 +1120,12 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM extractQualifiedTupleByFilterResult(pBlock, p, keep, status); if (pColMatchInfo != NULL) { - for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) { - SColMatchInfo* pInfo = taosArrayGet(pColMatchInfo, i); + for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo->pList); ++i) { + SColMatchItem* pInfo = taosArrayGet(pColMatchInfo->pList, i); if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->targetSlotId); + SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->dstSlotId); if (pColData->info.type == TSDB_DATA_TYPE_TIMESTAMP) { - blockDataUpdateTsWindow(pBlock, pInfo->targetSlotId); + blockDataUpdateTsWindow(pBlock, pInfo->dstSlotId); break; } } @@ -2799,7 +2799,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { break; } - doFilter(pInfo->pCondition, fillResult, pInfo->pColMatchColInfo, NULL); + doFilter(pInfo->pCondition, fillResult, &pInfo->matchInfo, NULL); if (fillResult->info.rows > 0) { break; } @@ -3087,7 +3087,7 @@ void destroyFillOperatorInfo(void* param) { cleanupExprSupp(&pInfo->noFillExprSupp); taosMemoryFreeClear(pInfo->p); - taosArrayDestroy(pInfo->pColMatchColInfo); + taosArrayDestroy(pInfo->matchInfo.pList); taosMemoryFreeClear(param); } @@ -3231,8 +3231,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId; int32_t numOfOutputCols = 0; - pInfo->pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, - &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); + code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols, + COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity, diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f24c782a9e..75abd22e46 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -331,12 +331,13 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, } } - for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->pColMatchInfo); ++i) { - SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i); - if (!pColMatchInfo->output) { + for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->matchInfo.pList); ++i) { + SColMatchItem* pColMatchInfo = taosArrayGet(pTableScanInfo->matchInfo.pList, i); + if (!pColMatchInfo->needOutput) { continue; } - pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i]; + + pBlock->pBlockAgg[pColMatchInfo->dstSlotId] = pColAgg[i]; } return true; @@ -442,12 +443,12 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca return terrno; } - relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true); + relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); if (pTableScanInfo->pFilterNode != NULL) { int64_t st = taosGetTimestampUs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo, pOperator->exprSupp.pFilterInfo); + doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, pOperator->exprSupp.pFilterInfo); double el = (taosGetTimestampUs() - st) / 1000.0; pTableScanInfo->readRecorder.filterTime += el; @@ -780,8 +781,8 @@ static void destroyTableScanOperatorInfo(void* param) { tsdbReaderClose(pTableScanInfo->dataReader); pTableScanInfo->dataReader = NULL; - if (pTableScanInfo->pColMatchInfo != NULL) { - taosArrayDestroy(pTableScanInfo->pColMatchInfo); + if (pTableScanInfo->matchInfo.pList != NULL) { + taosArrayDestroy(pTableScanInfo->matchInfo.pList); } cleanupExprSupp(&pTableScanInfo->pseudoSup); @@ -798,10 +799,10 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; int32_t numOfCols = 0; - pInfo->pColMatchInfo = - extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); + int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, + &pInfo->matchInfo); - int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); + code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -1512,9 +1513,9 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock } // todo extract method - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) { - SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); - if (!pColMatchInfo->output) { + for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { + SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i); + if (!pColMatchInfo->needOutput) { continue; } @@ -1522,7 +1523,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) { SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j); if (pResCol->info.colId == pColMatchInfo->colId) { - SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId); + SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId); colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info); colExists = true; break; @@ -1531,7 +1532,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock // the required column does not exists in submit block, let's set it to be all null value if (!colExists) { - SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId); + SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId); colDataAppendNNULL(pDst, 0, pBlockInfo->rows); } } @@ -2193,8 +2194,8 @@ static void destroyStreamScanOperatorInfo(void* param) { if (pStreamScan->tqReader) { tqCloseReader(pStreamScan->tqReader); } - if (pStreamScan->pColMatchInfo) { - taosArrayDestroy(pStreamScan->pColMatchInfo); + if (pStreamScan->matchInfo.pList) { + taosArrayDestroy(pStreamScan->matchInfo.pList); } if (pStreamScan->pPseudoExpr) { destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr); @@ -2228,17 +2229,17 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pGroupTags = pTableScanNode->pGroupTags; int32_t numOfCols = 0; - pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); + int32_t code = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); - int32_t numOfOutput = taosArrayGetSize(pInfo->pColMatchInfo); + int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList); SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t)); for (int32_t i = 0; i < numOfOutput; ++i) { - SColMatchInfo* id = taosArrayGet(pInfo->pColMatchInfo, i); + SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i); int16_t colId = id->colId; taosArrayPush(pColIds, &colId); if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - pInfo->primaryTsIndex = id->targetSlotId; + pInfo->primaryTsIndex = id->dstSlotId; } } @@ -2387,7 +2388,7 @@ static void destroySysScanOperator(void* param) { pInfo->pIdx = NULL; } - taosArrayDestroy(pInfo->scanCols); + taosArrayDestroy(pInfo->matchInfo.pList); taosMemoryFreeClear(pInfo->pUser); taosMemoryFreeClear(param); @@ -2752,7 +2753,7 @@ static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t dataBlock->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows; - relocateColumnData(pInfo->pRes, pInfo->scanCols, dataBlock->pDataBlock, false); + relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, dataBlock->pDataBlock, false); doFilterResult(pInfo); blockDataCleanup(dataBlock); @@ -3441,7 +3442,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { p->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows; - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); doFilterResult(pInfo); blockDataCleanup(p); @@ -3457,7 +3458,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { p->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows; - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); doFilterResult(pInfo); blockDataCleanup(p); @@ -3618,7 +3619,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { p->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows; - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); doFilterResult(pInfo); blockDataCleanup(p); @@ -3634,7 +3635,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { p->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows; - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); doFilterResult(pInfo); blockDataCleanup(p); @@ -3798,7 +3799,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { } char* pStart = pRsp->data; - extractDataBlockFromFetchRsp(pInfo->pRes, pRsp->data, pInfo->scanCols, &pStart); + extractDataBlockFromFetchRsp(pInfo->pRes, pRsp->data, pInfo->matchInfo.pList, &pStart); updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator); // todo log the filter info @@ -3827,7 +3828,7 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) { p->info.rows = buildDbTableInfoBlock(pInfo->sysInfo, p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB); pInfo->pRes->info.rows = p->info.rows; - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); blockDataDestroy(p); return pInfo->pRes->info.rows; @@ -3892,7 +3893,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan SSDataBlock* pResBlock = createResDataBlock(pDescNode); int32_t num = 0; - SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID); + int32_t code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); pInfo->accountId = pScanPhyNode->accountId; pInfo->pUser = taosMemoryStrDup((void*)pUser); @@ -3900,7 +3901,6 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan pInfo->showRewrite = pScanPhyNode->showRewrite; pInfo->pRes = pResBlock; pInfo->pCondition = pScanNode->node.pConditions; - pInfo->scanCols = colList; initResultSizeInfo(&pOperator->resultInfo, 4096); @@ -4021,7 +4021,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { static void destroyTagScanOperatorInfo(void* param) { STagScanInfo* pInfo = (STagScanInfo*)param; pInfo->pRes = blockDataDestroy(pInfo->pRes); - taosArrayDestroy(pInfo->pColMatchInfo); + taosArrayDestroy(pInfo->matchInfo.pList); taosMemoryFreeClear(param); } @@ -4038,15 +4038,14 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi int32_t num = 0; int32_t numOfExprs = 0; SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); - SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID); + int32_t code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); - int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); + code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); if (code != TSDB_CODE_SUCCESS) { goto _error; } pInfo->pTableList = pTableListInfo; - pInfo->pColMatchInfo = colList; pInfo->pRes = createResDataBlock(pDescNode); pInfo->readHandle = *pReadHandle; pInfo->curPos = 0; @@ -4180,11 +4179,11 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc } for (int32_t i = 0; i < numOfCols; ++i) { - SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i); - if (!pColMatchInfo->output) { + SColMatchItem* pColMatchInfo = taosArrayGet(pTableScanInfo->matchInfo.pList, i); + if (!pColMatchInfo->needOutput) { continue; } - pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i]; + pBlock->pBlockAgg[pColMatchInfo->dstSlotId] = pColAgg[i]; } return TSDB_CODE_SUCCESS; @@ -4215,7 +4214,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc return terrno; } - relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true); + relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); // currently only the tbname pseudo column if (pTableScanInfo->pseudoSup.numOfExprs > 0) { @@ -4228,7 +4227,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc if (pTableScanInfo->pFilterNode != NULL) { int64_t st = taosGetTimestampMs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo, NULL); + doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, NULL); double el = (taosGetTimestampUs() - st) / 1000.0; pTableScanInfo->readRecorder.filterTime += el; @@ -4312,9 +4311,9 @@ static SSDataBlock* getTableDataBlock(void* param) { SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) { int32_t tsTargetSlotId = 0; for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) { - SColMatchInfo* colInfo = taosArrayGet(colMatchInfo, i); + SColMatchItem* colInfo = taosArrayGet(colMatchInfo, i); if (colInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - tsTargetSlotId = colInfo->targetSlotId; + tsTargetSlotId = colInfo->dstSlotId; } } @@ -4500,8 +4499,8 @@ void destroyTableMergeScanOperatorInfo(void* param) { } taosArrayDestroy(pTableScanInfo->dataReaders); - if (pTableScanInfo->pColMatchInfo != NULL) { - taosArrayDestroy(pTableScanInfo->pColMatchInfo); + if (pTableScanInfo->matchInfo.pList != NULL) { + taosArrayDestroy(pTableScanInfo->matchInfo.pList); } pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock); @@ -4559,11 +4558,11 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; int32_t numOfCols = 0; - SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); + int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); - int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); + code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pColList); + taosArrayDestroy(pInfo->matchInfo.pList); goto _error; } @@ -4583,12 +4582,11 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; pInfo->tableListInfo = pTableListInfo; pInfo->scanFlag = MAIN_SCAN; - pInfo->pColMatchInfo = pColList; pInfo->pResBlock = createResDataBlock(pDescNode); pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam)); - pInfo->pSortInfo = generateSortByTsInfo(pInfo->pColMatchInfo, pInfo->cond.order); + pInfo->pSortInfo = generateSortByTsInfo(pInfo->matchInfo.pList, pInfo->cond.order); pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); int32_t rowSize = pInfo->pResBlock->info.rowSize; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 39987d4501..04f86d90d5 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -38,8 +38,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* SExprInfo* pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols); int32_t numOfOutputCols = 0; - SArray* pColMatchColInfo = - extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); + int32_t code = extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); @@ -48,7 +47,6 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pInfo->binfo.pRes = pResBlock; pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); pInfo->pCondition = pSortNode->node.pConditions; - pInfo->pColMatchInfo = pColMatchColInfo; initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo); pOperator->name = "SortOperator"; @@ -67,7 +65,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pOperator->fpSet = createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, getExplainExecInfo); - int32_t code = appendDownstream(pOperator, &downstream, 1); + code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -127,11 +125,11 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i // todo extract function to handle this int32_t numOfCols = taosArrayGetSize(pColMatchInfo); for (int32_t i = 0; i < numOfCols; ++i) { - SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i); - ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); + SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i); +// ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); - SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId); + SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId); colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info); } @@ -210,13 +208,13 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { SSDataBlock* pBlock = NULL; while (1) { pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, - pInfo->pColMatchInfo, pInfo); + pInfo->matchInfo.pList, pInfo); if (pBlock == NULL) { doSetOperatorCompleted(pOperator); return NULL; } - doFilter(pInfo->pCondition, pBlock, pInfo->pColMatchInfo, NULL); + doFilter(pInfo->pCondition, pBlock, &pInfo->matchInfo, NULL); if (blockDataGetNumOfRows(pBlock) == 0) { continue; } @@ -256,7 +254,7 @@ void destroyOrderOperatorInfo(void* param) { tsortDestroySortHandle(pInfo->pSortHandle); taosArrayDestroy(pInfo->pSortInfo); - taosArrayDestroy(pInfo->pColMatchInfo); + taosArrayDestroy(pInfo->matchInfo.pList); taosMemoryFreeClear(param); } @@ -277,20 +275,17 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus; typedef struct SGroupSortOperatorInfo { - SOptrBasicInfo binfo; - SArray* pSortInfo; - SArray* pColMatchInfo; - - int64_t startTs; - uint64_t sortElapsed; - bool hasGroupId; - uint64_t currGroupId; - + SOptrBasicInfo binfo; + SArray* pSortInfo; + SColMatchInfo matchInfo; + int64_t startTs; + uint64_t sortElapsed; + bool hasGroupId; + uint64_t currGroupId; SSDataBlock* prefetchedSortInput; SSortHandle* pCurrSortHandle; EChildOperatorStatus childOpStatus; - - SSortExecInfo sortExecInfo; + SSortExecInfo sortExecInfo; } SGroupSortOperatorInfo; SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, @@ -320,11 +315,11 @@ SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo if (p->info.rows > 0) { int32_t numOfCols = taosArrayGetSize(pColMatchInfo); for (int32_t i = 0; i < numOfCols; ++i) { - SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i); - ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); + SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i); +// ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); - SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId); + SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId); colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info); } @@ -442,7 +437,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { // beginSortGroup would fetch all child blocks of pInfo->currGroupId; ASSERT(pInfo->childOpStatus != CHILD_OP_SAME_GROUP); pBlock = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, - pInfo->pColMatchInfo, pInfo); + pInfo->matchInfo.pList, pInfo); if (pBlock != NULL) { pBlock->info.groupId = pInfo->currGroupId; pOperator->resultInfo.totalRows += pBlock->info.rows; @@ -474,7 +469,7 @@ void destroyGroupSortOperatorInfo(void* param) { pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); taosArrayDestroy(pInfo->pSortInfo); - taosArrayDestroy(pInfo->pColMatchInfo); + taosArrayDestroy(pInfo->matchInfo.pList); taosMemoryFreeClear(param); } @@ -494,8 +489,8 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols); int32_t numOfOutputCols = 0; - SArray* pColMatchColInfo = - extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); + int32_t code = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, + &pInfo->matchInfo); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); pInfo->binfo.pRes = pResBlock; @@ -503,8 +498,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); - ; - pInfo->pColMatchInfo = pColMatchColInfo; + pOperator->name = "GroupSortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT; pOperator->blocking = false; @@ -517,7 +511,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyGroupSortOperatorInfo, getGroupSortExplainExecInfo); - int32_t code = appendDownstream(pOperator, &downstream, 1); + code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -535,20 +529,18 @@ _error: // Multiway Sort Merge operator typedef struct SMultiwayMergeOperatorInfo { SOptrBasicInfo binfo; + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort - - SArray* pSortInfo; - SSortHandle* pSortHandle; - SArray* pColMatchInfo; // for index map from table scan output - - SSDataBlock* pInputBlock; - int64_t startTs; // sort start time - bool groupSort; - bool hasGroupId; - uint64_t groupId; - STupleHandle* prefetchedTuple; + SArray* pSortInfo; + SSortHandle* pSortHandle; + SColMatchInfo matchInfo; + SSDataBlock* pInputBlock; + int64_t startTs; // sort start time + bool groupSort; + bool hasGroupId; + uint64_t groupId; + STupleHandle* prefetchedTuple; } SMultiwayMergeOperatorInfo; int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) { @@ -645,11 +637,11 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData blockDataEnsureCapacity(pDataBlock, p->info.rows); int32_t numOfCols = taosArrayGetSize(pColMatchInfo); for (int32_t i = 0; i < numOfCols; ++i) { - SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i); - ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); + SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i); +// ASSERT(pColMatchInfo-> == COL_MATCH_FROM_SLOT_ID); SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); - SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId); + SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId); colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info); } @@ -678,7 +670,7 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { } SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, - pOperator->resultInfo.capacity, pInfo->pColMatchInfo, pOperator); + pOperator->resultInfo.capacity, pInfo->matchInfo.pList, pOperator); if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; } else { @@ -694,7 +686,7 @@ void destroyMultiwayMergeOperatorInfo(void* param) { tsortDestroySortHandle(pInfo->pSortHandle); taosArrayDestroy(pInfo->pSortInfo); - taosArrayDestroy(pInfo->pColMatchInfo); + taosArrayDestroy(pInfo->matchInfo.pList); taosMemoryFreeClear(param); } @@ -731,15 +723,14 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size SArray* pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys); int32_t numOfOutputCols = 0; - SArray* pColMatchColInfo = - extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); + + code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->groupSort = pMergePhyNode->groupSort; pInfo->pSortInfo = pSortInfo; - pInfo->pColMatchInfo = pColMatchColInfo; pInfo->pInputBlock = pInputBlock; pInfo->bufPageSize = getProperSortPageSize(rowSize); pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result. diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 5d3f5e07d3..23847928da 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -722,7 +722,7 @@ void destroyStreamFillOperatorInfo(void* param) { pInfo->pSrcBlock = blockDataDestroy(pInfo->pSrcBlock); pInfo->pPrevSrcBlock = blockDataDestroy(pInfo->pPrevSrcBlock); pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes); - pInfo->pColMatchColInfo = taosArrayDestroy(pInfo->pColMatchColInfo); + pInfo->matchInfo.pList = taosArrayDestroy(pInfo->matchInfo.pList); taosMemoryFree(pInfo); } @@ -1499,7 +1499,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { } doStreamFillImpl(pOperator); - doFilter(pInfo->pCondition, pInfo->pRes, pInfo->pColMatchColInfo, NULL); + doFilter(pInfo->pCondition, pInfo->pRes, &pInfo->matchInfo, NULL); memcpy(pInfo->pRes->info.parTbName, pInfo->pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN); pOperator->resultInfo.totalRows += pInfo->pRes->info.rows; if (pInfo->pRes->info.rows > 0) { @@ -1675,11 +1675,10 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId; int32_t numOfOutputCols = 0; - SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, - &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); + int32_t code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, + &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); pInfo->pCondition = pPhyFillNode->node.pConditions; - pInfo->pColMatchColInfo = pColMatchColInfo; - int32_t code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols); + code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols); if (code != TSDB_CODE_SUCCESS) { goto _error; } -- GitLab