diff --git a/source/dnode/mnode/impl/src/mndGrant.c b/source/dnode/mnode/impl/src/mndGrant.c index 7262f23657b57eeab7ce115a570b5cc3bff8bb20..396de7eed33102d24979653cdd05d27507641c96 100644 --- a/source/dnode/mnode/impl/src/mndGrant.c +++ b/source/dnode/mnode/impl/src/mndGrant.c @@ -21,10 +21,8 @@ static int32_t mndRetrieveGrant(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { int32_t numOfRows = 0; - char *pWrite; int32_t cols = 0; char tmp[32]; - char tmp1[32]; if (pShow->numOfRows < 1) { cols = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 257a7d0baa4dc71f5b2b05fe672c5f7f3ead6f7b..ca877f599f7e052a842a186964a0b6961ca150f0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1910,6 +1910,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == k.ts) { if (init) { + if (merge.pTSchema == NULL) { + return code; + } + tRowMerge(&merge, pRow); } else { STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); @@ -1964,7 +1968,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* tRowMerge(&merge, &fRow1); } else { init = true; - int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); + code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1975,17 +1979,24 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == key) { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); if (!init) { - int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); + code = tRowMergerInit(&merge, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } } else { + if (merge.pTSchema == NULL) { + return code; + } tRowMerge(&merge, &fRow); } doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); } } + if (merge.pTSchema == NULL) { + return code; + } + code = tRowMergerGetRow(&merge, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; @@ -3232,7 +3243,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); int32_t code = tRowMergerInit(&merge, pRow, pSchema); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) { return code; } diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 4e960afdb17a37940198f8bcc70e70bd8d846c1c..0cfef7dc24f2f56c82a408e85789d658ceae576a 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -74,7 +74,6 @@ typedef struct SResultRowPosition { typedef struct SResKeyPos { SResultRowPosition pos; uint64_t groupId; - // char parTbName[TSDB_TABLE_NAME_LEN]; char key[]; } SResKeyPos; @@ -84,6 +83,18 @@ typedef struct SResultRowInfo { SList* openWindow; } SResultRowInfo; +typedef struct SColMatchItem { + int32_t colId; + int32_t srcSlotId; + int32_t dstSlotId; + bool needOutput; +} SColMatchItem; + +typedef struct SColMatchInfo { + SArray* pList; // SArray + int32_t matchType; // determinate the source according to col id or slot id +} SColMatchInfo; + struct SqlFunctionCtx; size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); @@ -121,8 +132,8 @@ size_t getTableTagsBufLen(const SNodeList* pGroups); SArray* createSortInfo(SNodeList* pNodeList); SArray* extractPartitionColInfo(SNodeList* pNodeList); -SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, - int32_t type); +int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, + int32_t type, SColMatchInfo* pMatchInfo); void createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId); void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5e387d2bd3e66d7aa341e50c6166d82fa58889a7..7755bd88db8c9be412092fc24f62360f4cfca5bb 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -47,12 +47,7 @@ extern "C" { typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); -#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) #define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u) -#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP) - -#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0) - #define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0) #define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN) #define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0) @@ -240,7 +235,7 @@ typedef struct SOperatorInfo { typedef enum { EX_SOURCE_DATA_NOT_READY = 0x1, - EX_SOURCE_DATA_READY = 0x2, + EX_SOURCE_DATA_READY = 0x2, EX_SOURCE_DATA_EXHAUSTED = 0x3, } EX_SOURCE_STATUS; @@ -289,15 +284,6 @@ typedef struct SExchangeInfo { SLimitInfo limitInfo; } SExchangeInfo; -typedef struct SColMatchInfo { - int32_t srcSlotId; // source slot id - int32_t colId; - int32_t targetSlotId; - bool output; // todo remove this? - bool reserved; - int32_t matchType; // determinate the source according to col id or slot id -} SColMatchInfo; - typedef struct SScanInfo { int32_t numOfAsc; int32_t numOfDesc; @@ -339,7 +325,7 @@ typedef struct STableScanInfo { SNode* pFilterNode; // filter info, which is push down by optimizer SSDataBlock* pResBlock; - SArray* pColMatchInfo; + SColMatchInfo matchInfo; SExprSupp pseudoSup; SQueryTableDataCond cond; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan @@ -365,10 +351,9 @@ typedef struct STableMergeScanInfo { uint32_t sortBufSize; // max buffer size for in-memory sort SArray* pSortInfo; SSortHandle* pSortHandle; - - SSDataBlock* pSortInputBlock; - int64_t startTs; // sort start time - SArray* sortSourceParams; + SSDataBlock* pSortInputBlock; + int64_t startTs; // sort start time + SArray* sortSourceParams; SFileBlockLoadRecorder readRecorder; int64_t numOfRows; @@ -380,43 +365,40 @@ typedef struct STableMergeScanInfo { int32_t* rowEntryInfoOffset; SExprInfo* pExpr; SSDataBlock* pResBlock; - SArray* pColMatchInfo; + SColMatchInfo matchInfo; int32_t numOfOutput; - - SExprSupp pseudoSup; - - SQueryTableDataCond cond; - int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan - int32_t dataBlockLoadFlag; + SExprSupp pseudoSup; + SQueryTableDataCond cond; + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + int32_t dataBlockLoadFlag; // if the upstream is an interval operator, the interval info is also kept here to get the time // window to check if current data block needs to be loaded. - SInterval interval; - SSampleExecInfo sample; // sample execution info - - SSortExecInfo sortExecInfo; + SInterval interval; + SSampleExecInfo sample; // sample execution info + SSortExecInfo sortExecInfo; } STableMergeScanInfo; typedef struct STagScanInfo { SColumnInfo* pCols; SSDataBlock* pRes; - SArray* pColMatchInfo; + SColMatchInfo matchInfo; int32_t curPos; SReadHandle readHandle; STableListInfo* pTableList; } STagScanInfo; typedef struct SLastrowScanInfo { - SSDataBlock* pRes; - SReadHandle readHandle; - void* pLastrowReader; - SArray* pColMatchInfo; - int32_t* pSlotIds; - SExprSupp pseudoExprSup; - int32_t retrieveType; - int32_t currentGroupIndex; - SSDataBlock* pBufferredRes; - SArray* pUidList; - int32_t indexOfBufferedRes; + SSDataBlock* pRes; + SReadHandle readHandle; + void* pLastrowReader; + SColMatchInfo matchInfo; + int32_t* pSlotIds; + SExprSupp pseudoExprSup; + int32_t retrieveType; + int32_t currentGroupIndex; + SSDataBlock* pBufferredRes; + SArray* pUidList; + int32_t indexOfBufferedRes; } SLastrowScanInfo; typedef enum EStreamScanMode { @@ -483,28 +465,28 @@ typedef struct STimeWindowAggSupp { } STimeWindowAggSupp; typedef struct SStreamScanInfo { - uint64_t tableUid; // queried super table uid - SExprInfo* pPseudoExpr; - int32_t numOfPseudoExpr; - SExprSupp tbnameCalSup; - SExprSupp tagCalSup; - int32_t primaryTsIndex; // primary time stamp slot id - SReadHandle readHandle; - SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. - SArray* pColMatchInfo; // - SNode* pCondition; - - SArray* pBlockLists; // multiple SSDatablock. - SSDataBlock* pRes; // result SSDataBlock - SSDataBlock* pUpdateRes; // update SSDataBlock - int32_t updateResIndex; - int32_t blockType; // current block type - int32_t validBlockIndex; // Is current data has returned? - uint64_t numOfExec; // execution times - STqReader* tqReader; - - uint64_t groupId; - SUpdateInfo* pUpdateInfo; + uint64_t tableUid; // queried super table uid + SExprInfo* pPseudoExpr; + int32_t numOfPseudoExpr; + SExprSupp tbnameCalSup; + SExprSupp tagCalSup; + int32_t primaryTsIndex; // primary time stamp slot id + SReadHandle readHandle; + SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. + SColMatchInfo matchInfo; + SNode* pCondition; + + SArray* pBlockLists; // multiple SSDatablock. + SSDataBlock* pRes; // result SSDataBlock + SSDataBlock* pUpdateRes; // update SSDataBlock + int32_t updateResIndex; + int32_t blockType; // current block type + int32_t validBlockIndex; // Is current data has returned? + uint64_t numOfExec; // execution times + STqReader* tqReader; + + uint64_t groupId; + SUpdateInfo* pUpdateInfo; EStreamScanMode scanMode; SOperatorInfo* pStreamScanOp; @@ -559,8 +541,8 @@ typedef struct SSysTableScanInfo { bool showRewrite; SNode* pCondition; // db_name filter condition, to discard data that are not in current database SMTbCursor* pCur; // cursor for iterate the local table meta store. - SSysTableIndex* pIdx; // idx for local table meta - SArray* scanCols; // SArray scan column id list + SSysTableIndex* pIdx; // idx for local table meta + SColMatchInfo matchInfo; SName name; SSDataBlock* pRes; int64_t numOfBlocks; // extract basic running information. @@ -680,7 +662,7 @@ typedef struct SFillOperatorInfo { SSDataBlock* existNewGroupBlock; STimeWindow win; SNode* pCondition; - SArray* pColMatchColInfo; + SColMatchInfo matchInfo; int32_t primaryTsCol; int32_t primarySrcSlotId; uint64_t curGroupId; // current handled group id @@ -819,7 +801,7 @@ typedef struct SStreamFillOperatorInfo { int32_t srcDelRowIndex; SSDataBlock* pDelRes; SNode* pCondition; - SArray* pColMatchColInfo; + SColMatchInfo matchInfo; int32_t primaryTsCol; int32_t primarySrcSlotId; SStreamFillInfo* pFillInfo; @@ -863,7 +845,7 @@ typedef struct SSortOperatorInfo { uint32_t sortBufSize; // max buffer size for in-memory sort SArray* pSortInfo; SSortHandle* pSortHandle; - SArray* pColMatchInfo; // for index map from table scan output + SColMatchInfo matchInfo; int32_t bufPageSize; int64_t startTs; // sort start time uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. @@ -932,7 +914,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); void doSetOperatorCompleted(SOperatorInfo* pOperator); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo, SFilterInfo* pFilterInfo); +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo); int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock, const char* idStr); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 00a338cc419a4f7b0a2918afbf35a06992a2750f..22a685230a12d4981a3b0729ef1f11b6504688d4 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -28,7 +28,7 @@ static SSDataBlock* doScanCache(SOperatorInfo* pOperator); static void destroyLastrowScanOperator(void* param); static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); -static SArray* removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SArray* pColMatchInfo); +static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo); SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { @@ -46,10 +46,10 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe pInfo->pRes = createResDataBlock(pDescNode); int32_t numOfCols = 0; - SArray* pColMatchInfo = extractColMatchInfo(pScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); - pInfo->pColMatchInfo = removeRedundantTsCol(pScanNode, pColMatchInfo); + code = extractColMatchInfo(pScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); + removeRedundantTsCol(pScanNode, &pInfo->matchInfo); - code = extractCacheScanSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds); + code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -64,7 +64,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) { pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL|(pScanNode->ignoreNull? CACHESCAN_RETRIEVE_LAST:CACHESCAN_RETRIEVE_LAST_ROW); code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList, - taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); + taosArrayGetSize(pInfo->matchInfo.pList), &pInfo->pLastrowReader); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -139,9 +139,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) { - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) { - SColMatchInfo* pMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); - int32_t slotId = pMatchInfo->targetSlotId; + for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { + SColMatchItem* pMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i); + int32_t slotId = pMatchInfo->dstSlotId; SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId); SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, slotId); @@ -188,7 +188,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex); tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList, - taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); + taosArrayGetSize(pInfo->matchInfo.pList), &pInfo->pLastrowReader); taosArrayClear(pInfo->pUidList); int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); @@ -235,7 +235,7 @@ void destroyLastrowScanOperator(void* param) { blockDataDestroy(pInfo->pBufferredRes); taosMemoryFree(pInfo->pSlotIds); taosArrayDestroy(pInfo->pUidList); - taosArrayDestroy(pInfo->pColMatchInfo); + taosArrayDestroy(pInfo->matchInfo.pList); if (pInfo->pLastrowReader != NULL) { pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader); @@ -255,16 +255,16 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask SSchemaWrapper* pWrapper = pTaskInfo->schemaInfo.sw; for (int32_t i = 0; i < numOfCols; ++i) { - SColMatchInfo* pColMatch = taosArrayGet(pColMatchInfo, i); + SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i); for (int32_t j = 0; j < pWrapper->nCols; ++j) { if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - (*pSlotIds)[pColMatch->targetSlotId] = -1; + (*pSlotIds)[pColMatch->dstSlotId] = -1; break; } if (pColMatch->colId == pWrapper->pSchema[j].colId) { - (*pSlotIds)[pColMatch->targetSlotId] = j; + (*pSlotIds)[pColMatch->dstSlotId] = j; break; } } @@ -273,17 +273,18 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask return TSDB_CODE_SUCCESS; } -SArray* removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SArray* pColMatchInfo) { +int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo) { if (!pScanNode->ignoreNull) { // retrieve cached last value - return pColMatchInfo; + return TSDB_CODE_SUCCESS; } - SArray* pMatchInfo = taosArrayInit(taosArrayGetSize(pColMatchInfo), sizeof(SColMatchInfo)); + size_t size = taosArrayGetSize(pColMatchInfo->pList); + SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchInfo)); - for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) { - SColMatchInfo* pColInfo = taosArrayGet(pColMatchInfo, i); + for (int32_t i = 0; i < size; ++i) { + SColMatchItem* pColInfo = taosArrayGet(pColMatchInfo->pList, i); - int32_t slotId = pColInfo->targetSlotId; + int32_t slotId = pColInfo->dstSlotId; SNodeList* pList = pScanNode->scan.node.pOutputDataBlockDesc->pSlots; SSlotDescNode* pDesc = (SSlotDescNode*)nodesListGetNode(pList, slotId); @@ -292,6 +293,7 @@ SArray* removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SArray* pColMatch } } - taosArrayDestroy(pColMatchInfo); - return pMatchInfo; + taosArrayDestroy(pColMatchInfo->pList); + pColMatchInfo->pList = pMatchInfo; + return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 2ed83a6469645f685bd1be50b5896e14331e15bb..c7a248020472e229a2f786f8918e71fc8a36d765 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -53,22 +53,6 @@ typedef struct SDataDeleterHandle { TdThreadMutex mutex; } SDataDeleterHandle; -static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) { - if (tsCompressColData < 0 || 0 == pData->info.rows) { - return false; - } - - for (int32_t col = 0; col < numOfCols; ++col) { - SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col); - int32_t colSize = pColRes->info.bytes * pData->info.rows; - if (NEEDTO_COMPRESS_QUERY(colSize)) { - return true; - } - } - - return false; -} - static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInput, SDataDeleterBuf* pBuf) { int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index b758e4b1dd9dd36ac0f668729b9aa74e9b095c4d..bc4ab9c468ccbcacce3eff00ebd4f4d37435d87c 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -51,22 +51,6 @@ typedef struct SDataDispatchHandle { TdThreadMutex mutex; } SDataDispatchHandle; -static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) { - if (tsCompressColData < 0 || 0 == pData->info.rows) { - return false; - } - - for (int32_t col = 0; col < numOfCols; ++col) { - SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col); - int32_t colSize = pColRes->info.bytes * pData->info.rows; - if (NEEDTO_COMPRESS_QUERY(colSize)) { - return true; - } - } - - return false; -} - // clang-format off // data format: // +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+ @@ -86,7 +70,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn } } SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; - pEntry->compressed = (int8_t)needCompress(pInput->pData, numOfCols); + pEntry->compressed = 0; pEntry->numOfRows = pInput->pData->info.rows; pEntry->numOfCols = numOfCols; pEntry->dataLen = 0; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 11d6665614c4a4075fbd7cc056cce2fcc107a946..f58c2a0e344c56b120c4585fb1af23308a58cbcb 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1066,13 +1066,17 @@ SArray* extractPartitionColInfo(SNodeList* pNodeList) { return pList; } -SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, - int32_t type) { - size_t numOfCols = LIST_LENGTH(pNodeList); +int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, + int32_t type, SColMatchInfo* pMatchInfo) { + size_t numOfCols = LIST_LENGTH(pNodeList); + int32_t code = 0; + + pMatchInfo->matchType = type; + SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo)); if (pList == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + code = TSDB_CODE_OUT_OF_MEMORY; + return code; } for (int32_t i = 0; i < numOfCols; ++i) { @@ -1080,12 +1084,10 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) { SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; - SColMatchInfo c = {0}; - c.output = true; + SColMatchItem c = {.needOutput = true}; c.colId = pColNode->colId; c.srcSlotId = pColNode->slotId; - c.matchType = type; - c.targetSlotId = pNode->slotId; + c.dstSlotId = pNode->slotId; taosArrayPush(pList, &c); } } @@ -1102,10 +1104,10 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod continue; } - SColMatchInfo* info = NULL; + SColMatchItem* info = NULL; for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) { info = taosArrayGet(pList, j); - if (info->targetSlotId == pNode->slotId) { + if (info->dstSlotId == pNode->slotId) { break; } } @@ -1114,11 +1116,12 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod (*numOfOutputCols) += 1; } else if (info != NULL) { // select distinct tbname from stb where tbname='abc'; - info->output = false; + info->needOutput = false; } } - return pList; + pMatchInfo->pList = pList; + return code; } static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision, @@ -1407,14 +1410,14 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray int32_t i = 0, j = 0; while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) { SColumnInfoData* p = taosArrayGet(pCols, i); - SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, j); - if (!outputEveryColumn && pmInfo->reserved) { + SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, j); +/* if (!outputEveryColumn && pmInfo->reserved) { j++; continue; - } + }*/ if (p->info.colId == pmInfo->colId) { - SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->targetSlotId); + SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->dstSlotId); colDataAssign(pDst, p, pBlock->info.rows, &pBlock->info); i++; j++; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fb4248e886fdb3a1428f5aadd9966690fa3f0282..0312105ca8f8305284cb4d1dcdfda914dfd432c4 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -355,7 +355,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, goto _error; } - SDataSinkMgtCfg cfg = {.maxDataBlockNum = 10000, .maxDataBlockNumPerQuery = 5000}; + SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50}; code = dsDataSinkMgtInit(&cfg); if (code != TSDB_CODE_SUCCESS) { qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a1f7b700a4a6c86049d05b43ec0e241347c9f0e7..2432944928e9843dc1c52c93fb49d3c1e61e7ba7 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1086,7 +1086,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo, SFilterInfo* pFilterInfo) { +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo) { if (pFilterNode == NULL || pBlock->info.rows == 0) { return; } @@ -1120,12 +1120,12 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM extractQualifiedTupleByFilterResult(pBlock, p, keep, status); if (pColMatchInfo != NULL) { - for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) { - SColMatchInfo* pInfo = taosArrayGet(pColMatchInfo, i); + for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo->pList); ++i) { + SColMatchItem* pInfo = taosArrayGet(pColMatchInfo->pList, i); if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->targetSlotId); + SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->dstSlotId); if (pColData->info.type == TSDB_DATA_TYPE_TIMESTAMP) { - blockDataUpdateTsWindow(pBlock, pInfo->targetSlotId); + blockDataUpdateTsWindow(pBlock, pInfo->dstSlotId); break; } } @@ -2560,57 +2560,6 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len return TDB_CODE_SUCCESS; } -int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) { - if (result == NULL) { - return TSDB_CODE_TSC_INVALID_INPUT; - } - SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info); - SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo)); - - // int32_t size = taosHashGetSize(pSup->pResultRowHashTable); - int32_t length = *(int32_t*)(result); - int32_t offset = sizeof(int32_t); - - int32_t count = *(int32_t*)(result + offset); - offset += sizeof(int32_t); - - while (count-- > 0 && length > offset) { - int32_t keyLen = *(int32_t*)(result + offset); - offset += sizeof(int32_t); - - uint64_t tableGroupId = *(uint64_t*)(result + offset); - SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize); - if (!resultRow) { - return TSDB_CODE_TSC_INVALID_INPUT; - } - - // add a new result set for a new group - SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset}; - tSimpleHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition)); - - offset += keyLen; - int32_t valueLen = *(int32_t*)(result + offset); - if (valueLen != pSup->resultRowSize) { - return TSDB_CODE_TSC_INVALID_INPUT; - } - offset += sizeof(int32_t); - int32_t pageId = resultRow->pageId; - int32_t pOffset = resultRow->offset; - memcpy(resultRow, result + offset, valueLen); - resultRow->pageId = pageId; - resultRow->offset = pOffset; - offset += valueLen; - - pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset}; - // releaseBufPage(pSup->pResultBuf, getBufPage(pSup->pResultBuf, pageId)); - } - - if (offset != length) { - return TSDB_CODE_TSC_INVALID_INPUT; - } - return TDB_CODE_SUCCESS; -} - int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf) { if (pLimitInfo->remainGroupOffset > 0) { if (pLimitInfo->currentGroupId == 0) { // it is the first group @@ -2850,7 +2799,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { break; } - doFilter(pInfo->pCondition, fillResult, pInfo->pColMatchColInfo, NULL); + doFilter(pInfo->pCondition, fillResult, &pInfo->matchInfo, NULL); if (fillResult->info.rows > 0) { break; } @@ -3098,9 +3047,11 @@ _error: destroyAggOperatorInfo(pInfo); } - cleanupExprSupp(&pOperator->exprSupp); - taosMemoryFreeClear(pOperator); + if (pOperator != NULL) { + cleanupExprSupp(&pOperator->exprSupp); + } + taosMemoryFreeClear(pOperator); pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -3136,7 +3087,7 @@ void destroyFillOperatorInfo(void* param) { cleanupExprSupp(&pInfo->noFillExprSupp); taosMemoryFreeClear(pInfo->p); - taosArrayDestroy(pInfo->pColMatchColInfo); + taosArrayDestroy(pInfo->matchInfo.pList); taosMemoryFreeClear(param); } @@ -3280,8 +3231,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId; int32_t numOfOutputCols = 0; - pInfo->pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, - &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); + code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols, + COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity, diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f24c782a9ee2771ff5420a3481f4b956f4a1dc64..9773a0c8e78c9d822341a75360b382fa7830fa43 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -331,12 +331,13 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, } } - for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->pColMatchInfo); ++i) { - SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i); - if (!pColMatchInfo->output) { + for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->matchInfo.pList); ++i) { + SColMatchItem* pColMatchInfo = taosArrayGet(pTableScanInfo->matchInfo.pList, i); + if (!pColMatchInfo->needOutput) { continue; } - pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i]; + + pBlock->pBlockAgg[pColMatchInfo->dstSlotId] = pColAgg[i]; } return true; @@ -442,12 +443,12 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca return terrno; } - relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true); + relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); if (pTableScanInfo->pFilterNode != NULL) { int64_t st = taosGetTimestampUs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo, pOperator->exprSupp.pFilterInfo); + doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, pOperator->exprSupp.pFilterInfo); double el = (taosGetTimestampUs() - st) / 1000.0; pTableScanInfo->readRecorder.filterTime += el; @@ -780,8 +781,8 @@ static void destroyTableScanOperatorInfo(void* param) { tsdbReaderClose(pTableScanInfo->dataReader); pTableScanInfo->dataReader = NULL; - if (pTableScanInfo->pColMatchInfo != NULL) { - taosArrayDestroy(pTableScanInfo->pColMatchInfo); + if (pTableScanInfo->matchInfo.pList != NULL) { + taosArrayDestroy(pTableScanInfo->matchInfo.pList); } cleanupExprSupp(&pTableScanInfo->pseudoSup); @@ -798,10 +799,10 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; int32_t numOfCols = 0; - pInfo->pColMatchInfo = - extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); + int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, + &pInfo->matchInfo); - int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); + code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -1512,9 +1513,9 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock } // todo extract method - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) { - SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); - if (!pColMatchInfo->output) { + for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { + SColMatchItem* pColMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i); + if (!pColMatchInfo->needOutput) { continue; } @@ -1522,7 +1523,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) { SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j); if (pResCol->info.colId == pColMatchInfo->colId) { - SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId); + SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId); colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info); colExists = true; break; @@ -1531,7 +1532,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock // the required column does not exists in submit block, let's set it to be all null value if (!colExists) { - SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId); + SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->dstSlotId); colDataAppendNNULL(pDst, 0, pBlockInfo->rows); } } @@ -2193,8 +2194,8 @@ static void destroyStreamScanOperatorInfo(void* param) { if (pStreamScan->tqReader) { tqCloseReader(pStreamScan->tqReader); } - if (pStreamScan->pColMatchInfo) { - taosArrayDestroy(pStreamScan->pColMatchInfo); + if (pStreamScan->matchInfo.pList) { + taosArrayDestroy(pStreamScan->matchInfo.pList); } if (pStreamScan->pPseudoExpr) { destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr); @@ -2228,17 +2229,17 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pGroupTags = pTableScanNode->pGroupTags; int32_t numOfCols = 0; - pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); + int32_t code = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); - int32_t numOfOutput = taosArrayGetSize(pInfo->pColMatchInfo); + int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList); SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t)); for (int32_t i = 0; i < numOfOutput; ++i) { - SColMatchInfo* id = taosArrayGet(pInfo->pColMatchInfo, i); + SColMatchItem* id = taosArrayGet(pInfo->matchInfo.pList, i); int16_t colId = id->colId; taosArrayPush(pColIds, &colId); if (id->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - pInfo->primaryTsIndex = id->targetSlotId; + pInfo->primaryTsIndex = id->dstSlotId; } } @@ -2387,7 +2388,7 @@ static void destroySysScanOperator(void* param) { pInfo->pIdx = NULL; } - taosArrayDestroy(pInfo->scanCols); + taosArrayDestroy(pInfo->matchInfo.pList); taosMemoryFreeClear(pInfo->pUser); taosMemoryFreeClear(param); @@ -2752,7 +2753,7 @@ static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t dataBlock->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows; - relocateColumnData(pInfo->pRes, pInfo->scanCols, dataBlock->pDataBlock, false); + relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, dataBlock->pDataBlock, false); doFilterResult(pInfo); blockDataCleanup(dataBlock); @@ -3441,7 +3442,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { p->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows; - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); doFilterResult(pInfo); blockDataCleanup(p); @@ -3457,7 +3458,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { p->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows; - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); doFilterResult(pInfo); blockDataCleanup(p); @@ -3618,7 +3619,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { p->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows; - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); doFilterResult(pInfo); blockDataCleanup(p); @@ -3634,7 +3635,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { p->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows; - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); doFilterResult(pInfo); blockDataCleanup(p); @@ -3798,7 +3799,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { } char* pStart = pRsp->data; - extractDataBlockFromFetchRsp(pInfo->pRes, pRsp->data, pInfo->scanCols, &pStart); + extractDataBlockFromFetchRsp(pInfo->pRes, pRsp->data, pInfo->matchInfo.pList, &pStart); updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator); // todo log the filter info @@ -3827,7 +3828,7 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) { p->info.rows = buildDbTableInfoBlock(pInfo->sysInfo, p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB); pInfo->pRes->info.rows = p->info.rows; - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); + relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); blockDataDestroy(p); return pInfo->pRes->info.rows; @@ -3889,18 +3890,16 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan SScanPhysiNode* pScanNode = &pScanPhyNode->scan; SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc; - SSDataBlock* pResBlock = createResDataBlock(pDescNode); int32_t num = 0; - SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID); + int32_t code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); pInfo->accountId = pScanPhyNode->accountId; pInfo->pUser = taosMemoryStrDup((void*)pUser); pInfo->sysInfo = pScanPhyNode->sysInfo; pInfo->showRewrite = pScanPhyNode->showRewrite; - pInfo->pRes = pResBlock; + pInfo->pRes = createResDataBlock(pDescNode); pInfo->pCondition = pScanNode->node.pConditions; - pInfo->scanCols = colList; initResultSizeInfo(&pOperator->resultInfo, 4096); @@ -3922,7 +3921,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->exprSupp.numOfExprs = taosArrayGetSize(pResBlock->pDataBlock); + pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = @@ -4021,7 +4020,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { static void destroyTagScanOperatorInfo(void* param) { STagScanInfo* pInfo = (STagScanInfo*)param; pInfo->pRes = blockDataDestroy(pInfo->pRes); - taosArrayDestroy(pInfo->pColMatchInfo); + taosArrayDestroy(pInfo->matchInfo.pList); taosMemoryFreeClear(param); } @@ -4038,15 +4037,14 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi int32_t num = 0; int32_t numOfExprs = 0; SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); - SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID); + int32_t code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); - int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); + code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); if (code != TSDB_CODE_SUCCESS) { goto _error; } pInfo->pTableList = pTableListInfo; - pInfo->pColMatchInfo = colList; pInfo->pRes = createResDataBlock(pDescNode); pInfo->readHandle = *pReadHandle; pInfo->curPos = 0; @@ -4180,11 +4178,11 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc } for (int32_t i = 0; i < numOfCols; ++i) { - SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i); - if (!pColMatchInfo->output) { + SColMatchItem* pColMatchInfo = taosArrayGet(pTableScanInfo->matchInfo.pList, i); + if (!pColMatchInfo->needOutput) { continue; } - pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i]; + pBlock->pBlockAgg[pColMatchInfo->dstSlotId] = pColAgg[i]; } return TSDB_CODE_SUCCESS; @@ -4215,7 +4213,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc return terrno; } - relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true); + relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); // currently only the tbname pseudo column if (pTableScanInfo->pseudoSup.numOfExprs > 0) { @@ -4228,7 +4226,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc if (pTableScanInfo->pFilterNode != NULL) { int64_t st = taosGetTimestampMs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo, NULL); + doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, NULL); double el = (taosGetTimestampUs() - st) / 1000.0; pTableScanInfo->readRecorder.filterTime += el; @@ -4312,9 +4310,9 @@ static SSDataBlock* getTableDataBlock(void* param) { SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) { int32_t tsTargetSlotId = 0; for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) { - SColMatchInfo* colInfo = taosArrayGet(colMatchInfo, i); + SColMatchItem* colInfo = taosArrayGet(colMatchInfo, i); if (colInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - tsTargetSlotId = colInfo->targetSlotId; + tsTargetSlotId = colInfo->dstSlotId; } } @@ -4500,8 +4498,8 @@ void destroyTableMergeScanOperatorInfo(void* param) { } taosArrayDestroy(pTableScanInfo->dataReaders); - if (pTableScanInfo->pColMatchInfo != NULL) { - taosArrayDestroy(pTableScanInfo->pColMatchInfo); + if (pTableScanInfo->matchInfo.pList != NULL) { + taosArrayDestroy(pTableScanInfo->matchInfo.pList); } pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock); @@ -4559,11 +4557,11 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; int32_t numOfCols = 0; - SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); + int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); - int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); + code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pColList); + taosArrayDestroy(pInfo->matchInfo.pList); goto _error; } @@ -4583,12 +4581,11 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; pInfo->tableListInfo = pTableListInfo; pInfo->scanFlag = MAIN_SCAN; - pInfo->pColMatchInfo = pColList; pInfo->pResBlock = createResDataBlock(pDescNode); pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam)); - pInfo->pSortInfo = generateSortByTsInfo(pInfo->pColMatchInfo, pInfo->cond.order); + pInfo->pSortInfo = generateSortByTsInfo(pInfo->matchInfo.pList, pInfo->cond.order); pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); int32_t rowSize = pInfo->pResBlock->info.rowSize; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 39987d4501a241fb69a5a46e800ca1f18d19cace..04f86d90d5eeb07d19feb15adbf65b04d9eb23cd 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -38,8 +38,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* SExprInfo* pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols); int32_t numOfOutputCols = 0; - SArray* pColMatchColInfo = - extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); + int32_t code = extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); @@ -48,7 +47,6 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pInfo->binfo.pRes = pResBlock; pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); pInfo->pCondition = pSortNode->node.pConditions; - pInfo->pColMatchInfo = pColMatchColInfo; initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo); pOperator->name = "SortOperator"; @@ -67,7 +65,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pOperator->fpSet = createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, getExplainExecInfo); - int32_t code = appendDownstream(pOperator, &downstream, 1); + code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -127,11 +125,11 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i // todo extract function to handle this int32_t numOfCols = taosArrayGetSize(pColMatchInfo); for (int32_t i = 0; i < numOfCols; ++i) { - SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i); - ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); + SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i); +// ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); - SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId); + SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId); colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info); } @@ -210,13 +208,13 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { SSDataBlock* pBlock = NULL; while (1) { pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, - pInfo->pColMatchInfo, pInfo); + pInfo->matchInfo.pList, pInfo); if (pBlock == NULL) { doSetOperatorCompleted(pOperator); return NULL; } - doFilter(pInfo->pCondition, pBlock, pInfo->pColMatchInfo, NULL); + doFilter(pInfo->pCondition, pBlock, &pInfo->matchInfo, NULL); if (blockDataGetNumOfRows(pBlock) == 0) { continue; } @@ -256,7 +254,7 @@ void destroyOrderOperatorInfo(void* param) { tsortDestroySortHandle(pInfo->pSortHandle); taosArrayDestroy(pInfo->pSortInfo); - taosArrayDestroy(pInfo->pColMatchInfo); + taosArrayDestroy(pInfo->matchInfo.pList); taosMemoryFreeClear(param); } @@ -277,20 +275,17 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus; typedef struct SGroupSortOperatorInfo { - SOptrBasicInfo binfo; - SArray* pSortInfo; - SArray* pColMatchInfo; - - int64_t startTs; - uint64_t sortElapsed; - bool hasGroupId; - uint64_t currGroupId; - + SOptrBasicInfo binfo; + SArray* pSortInfo; + SColMatchInfo matchInfo; + int64_t startTs; + uint64_t sortElapsed; + bool hasGroupId; + uint64_t currGroupId; SSDataBlock* prefetchedSortInput; SSortHandle* pCurrSortHandle; EChildOperatorStatus childOpStatus; - - SSortExecInfo sortExecInfo; + SSortExecInfo sortExecInfo; } SGroupSortOperatorInfo; SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, @@ -320,11 +315,11 @@ SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo if (p->info.rows > 0) { int32_t numOfCols = taosArrayGetSize(pColMatchInfo); for (int32_t i = 0; i < numOfCols; ++i) { - SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i); - ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); + SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i); +// ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); - SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId); + SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId); colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info); } @@ -442,7 +437,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { // beginSortGroup would fetch all child blocks of pInfo->currGroupId; ASSERT(pInfo->childOpStatus != CHILD_OP_SAME_GROUP); pBlock = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, - pInfo->pColMatchInfo, pInfo); + pInfo->matchInfo.pList, pInfo); if (pBlock != NULL) { pBlock->info.groupId = pInfo->currGroupId; pOperator->resultInfo.totalRows += pBlock->info.rows; @@ -474,7 +469,7 @@ void destroyGroupSortOperatorInfo(void* param) { pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); taosArrayDestroy(pInfo->pSortInfo); - taosArrayDestroy(pInfo->pColMatchInfo); + taosArrayDestroy(pInfo->matchInfo.pList); taosMemoryFreeClear(param); } @@ -494,8 +489,8 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols); int32_t numOfOutputCols = 0; - SArray* pColMatchColInfo = - extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); + int32_t code = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, + &pInfo->matchInfo); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); pInfo->binfo.pRes = pResBlock; @@ -503,8 +498,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); - ; - pInfo->pColMatchInfo = pColMatchColInfo; + pOperator->name = "GroupSortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT; pOperator->blocking = false; @@ -517,7 +511,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyGroupSortOperatorInfo, getGroupSortExplainExecInfo); - int32_t code = appendDownstream(pOperator, &downstream, 1); + code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -535,20 +529,18 @@ _error: // Multiway Sort Merge operator typedef struct SMultiwayMergeOperatorInfo { SOptrBasicInfo binfo; + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort - - SArray* pSortInfo; - SSortHandle* pSortHandle; - SArray* pColMatchInfo; // for index map from table scan output - - SSDataBlock* pInputBlock; - int64_t startTs; // sort start time - bool groupSort; - bool hasGroupId; - uint64_t groupId; - STupleHandle* prefetchedTuple; + SArray* pSortInfo; + SSortHandle* pSortHandle; + SColMatchInfo matchInfo; + SSDataBlock* pInputBlock; + int64_t startTs; // sort start time + bool groupSort; + bool hasGroupId; + uint64_t groupId; + STupleHandle* prefetchedTuple; } SMultiwayMergeOperatorInfo; int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) { @@ -645,11 +637,11 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData blockDataEnsureCapacity(pDataBlock, p->info.rows); int32_t numOfCols = taosArrayGetSize(pColMatchInfo); for (int32_t i = 0; i < numOfCols; ++i) { - SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i); - ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); + SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i); +// ASSERT(pColMatchInfo-> == COL_MATCH_FROM_SLOT_ID); SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); - SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId); + SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId); colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info); } @@ -678,7 +670,7 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { } SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, - pOperator->resultInfo.capacity, pInfo->pColMatchInfo, pOperator); + pOperator->resultInfo.capacity, pInfo->matchInfo.pList, pOperator); if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; } else { @@ -694,7 +686,7 @@ void destroyMultiwayMergeOperatorInfo(void* param) { tsortDestroySortHandle(pInfo->pSortHandle); taosArrayDestroy(pInfo->pSortInfo); - taosArrayDestroy(pInfo->pColMatchInfo); + taosArrayDestroy(pInfo->matchInfo.pList); taosMemoryFreeClear(param); } @@ -731,15 +723,14 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size SArray* pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys); int32_t numOfOutputCols = 0; - SArray* pColMatchColInfo = - extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); + + code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->groupSort = pMergePhyNode->groupSort; pInfo->pSortInfo = pSortInfo; - pInfo->pColMatchInfo = pColMatchColInfo; pInfo->pInputBlock = pInputBlock; pInfo->bufPageSize = getProperSortPageSize(rowSize); pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result. diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 5d3f5e07d3f8b35e7c05e2a05973b36987fb1402..23847928da7eab945418ee636eeb56e25e06483b 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -722,7 +722,7 @@ void destroyStreamFillOperatorInfo(void* param) { pInfo->pSrcBlock = blockDataDestroy(pInfo->pSrcBlock); pInfo->pPrevSrcBlock = blockDataDestroy(pInfo->pPrevSrcBlock); pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes); - pInfo->pColMatchColInfo = taosArrayDestroy(pInfo->pColMatchColInfo); + pInfo->matchInfo.pList = taosArrayDestroy(pInfo->matchInfo.pList); taosMemoryFree(pInfo); } @@ -1499,7 +1499,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { } doStreamFillImpl(pOperator); - doFilter(pInfo->pCondition, pInfo->pRes, pInfo->pColMatchColInfo, NULL); + doFilter(pInfo->pCondition, pInfo->pRes, &pInfo->matchInfo, NULL); memcpy(pInfo->pRes->info.parTbName, pInfo->pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN); pOperator->resultInfo.totalRows += pInfo->pRes->info.rows; if (pInfo->pRes->info.rows > 0) { @@ -1675,11 +1675,10 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId; int32_t numOfOutputCols = 0; - SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, - &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); + int32_t code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, + &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); pInfo->pCondition = pPhyFillNode->node.pConditions; - pInfo->pColMatchColInfo = pColMatchColInfo; - int32_t code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols); + code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols); if (code != TSDB_CODE_SUCCESS) { goto _error; }