diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 24dfa5958dcef1656787f9add4bd8bbfd44d0dbf..fdbb2c5123952b0637e8408cfc457d23e644b5ab 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -137,6 +137,9 @@ static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, ui for (int32_t i = start; i < start + nRows; ++i) { colDataSetNull_f(pColumnInfoData->nullbitmap, i); } + + int32_t bytes = pColumnInfoData->info.bytes; + memset(pColumnInfoData->pData + start * bytes, 0, nRows * bytes); } pColumnInfoData->hasNull = true; @@ -215,7 +218,7 @@ size_t blockDataGetSerialMetaSize(uint32_t numOfCols); int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo); int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); -int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); +int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows); diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index c0c3fc7fbc931246597a2d7c3fed8bc0febdf96f..60e8f63c22c1492ab9757f7a833f4c710f1420d8 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -241,13 +241,6 @@ struct STag { memcpy(varDataVal(x), (str), __len); \ } while (0); -#define STR_TO_NET_VARSTR(x, str) \ - do { \ - VarDataLenT __len = (VarDataLenT)strlen(str); \ - *(VarDataLenT *)(x) = htons(__len); \ - memcpy(varDataVal(x), (str), __len); \ - } while (0); - #define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \ do { \ char *_e = stpncpy(varDataVal(x), (str), (_maxs)-VARSTR_HEADER_SIZE); \ diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 72248336f713386807b3b62360226dd485bb519d..6f2a67546658228615ebe20eade9e626e9a13ed7 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -182,7 +182,7 @@ struct SScalarParam { }; void cleanupResultRowEntry(struct SResultRowEntryInfo *pCell); -int32_t getNumOfResult(SqlFunctionCtx *pCtx, int32_t num, SSDataBlock *pResBlock); +//int32_t getNumOfResult(SqlFunctionCtx *pCtx, int32_t num, SSDataBlock *pResBlock); bool isRowEntryCompleted(struct SResultRowEntryInfo *pEntry); bool isRowEntryInitialized(struct SResultRowEntryInfo *pEntry); diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index a50af18b442711d5a531fb00ab9b182a39f70c91..81f63537e59ee7f51e01a0bc5c9527601be83300 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -224,7 +224,7 @@ int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc typedef enum EFuncDataRequired { FUNC_DATA_REQUIRED_DATA_LOAD = 1, - FUNC_DATA_REQUIRED_STATIS_LOAD, + FUNC_DATA_REQUIRED_SMA_LOAD, FUNC_DATA_REQUIRED_NOT_LOAD, FUNC_DATA_REQUIRED_FILTEROUT, } EFuncDataRequired; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 0b27d9b52775b123b6a67acb8733cf566881b334..ebb7b5069264e334f93c38937cae32c83fb5456b 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -246,7 +246,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int uint32_t finalNumOfRows = numOfRow1 + numOfRow2; if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { // Handle the bitmap - if (finalNumOfRows > *capacity || (numOfRow1 == 0 && pColumnInfoData->info.bytes != 0)) { + if (finalNumOfRows > (*capacity)) { char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2)); if (p == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -280,16 +280,14 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len); pColumnInfoData->varmeta.length = len + oldLen; } else { - if (finalNumOfRows > *capacity || (numOfRow1 == 0 && pColumnInfoData->info.bytes != 0)) { + if (finalNumOfRows > (*capacity)) { // all data may be null, when the pColumnInfoData->info.type == 0, bytes == 0; - // ASSERT(finalNumOfRows * pColumnInfoData->info.bytes); char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows * pColumnInfoData->info.bytes); if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } pColumnInfoData->pData = tmp; - if (BitmapLen(numOfRow1) < BitmapLen(finalNumOfRows)) { char* btmp = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(finalNumOfRows)); uint32_t extend = BitmapLen(finalNumOfRows) - BitmapLen(numOfRow1); @@ -823,7 +821,7 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB } static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) { - int32_t rows = pDataBlock->info.rows; + int32_t rows = pDataBlock->info.capacity; size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); SColumnInfoData* pCols = taosMemoryCalloc(numOfCols, sizeof(SColumnInfoData)); @@ -1126,26 +1124,28 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF } void blockDataCleanup(SSDataBlock* pDataBlock) { - pDataBlock->info.rows = 0; - pDataBlock->info.groupId = 0; + SDataBlockInfo* pInfo = &pDataBlock->info; - pDataBlock->info.window.ekey = 0; - pDataBlock->info.window.skey = 0; + pInfo->rows = 0; + pInfo->groupId = 0; + pInfo->window.ekey = 0; + pInfo->window.skey = 0; - if (pDataBlock->info.capacity == 0) { + if (pInfo->capacity == 0) { return; } size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); - colInfoDataCleanup(p, pDataBlock->info.capacity); + colInfoDataCleanup(p, pInfo->capacity); } } -static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows) { - ASSERT(numOfRows > 0 && pBlockInfo->capacity >= pBlockInfo->rows); - if (numOfRows < pBlockInfo->capacity) { +// todo temporarily disable it +static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, bool clearPayload) { + ASSERT(numOfRows > 0 /*&& pBlockInfo->capacity >= pBlockInfo->rows*/); + if (numOfRows <= pBlockInfo->capacity) { return TSDB_CODE_SUCCESS; } @@ -1182,8 +1182,10 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* return TSDB_CODE_OUT_OF_MEMORY; } - memset(tmp + pColumn->info.bytes * existedRows, 0, pColumn->info.bytes * (numOfRows - existedRows)); pColumn->pData = tmp; + if (clearPayload) { + memset(tmp + pColumn->info.bytes * existedRows, 0, pColumn->info.bytes * (numOfRows - existedRows)); + } } return TSDB_CODE_SUCCESS; @@ -1191,6 +1193,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) { pColumn->hasNull = false; + if (IS_VAR_DATA_TYPE(pColumn->info.type)) { pColumn->varmeta.length = 0; if (pColumn->varmeta.offset != NULL) { @@ -1203,30 +1206,27 @@ void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) { } } -int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows) { +int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload) { SDataBlockInfo info = {0}; - return doEnsureCapacity(pColumn, &info, numOfRows); + return doEnsureCapacity(pColumn, &info, numOfRows, clearPayload); } int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { int32_t code = 0; - if (numOfRows == 0) { + if (numOfRows == 0 || numOfRows <= pDataBlock->info.capacity) { return TSDB_CODE_SUCCESS; } - if (pDataBlock->info.capacity < numOfRows) { - pDataBlock->info.capacity = numOfRows; - } - size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); - code = doEnsureCapacity(p, &pDataBlock->info, numOfRows); + code = doEnsureCapacity(p, &pDataBlock->info, numOfRows, true); if (code) { return code; } } + pDataBlock->info.capacity = numOfRows; return TSDB_CODE_SUCCESS; } diff --git a/source/common/src/trow.c b/source/common/src/trow.c index e4818aaa871eb6b7db22101a1c5c7cd38e242a4e..ac4b5f86de7c4bb9b909b7098ba6da3d39801a52 100644 --- a/source/common/src/trow.c +++ b/source/common/src/trow.c @@ -1065,8 +1065,8 @@ void tdSTSRowIterInit(STSRowIter *pIter, STSchema *pSchema) { void tTSRowGetVal(STSRow *pRow, STSchema *pTSchema, int16_t iCol, SColVal *pColVal) { STColumn *pTColumn = &pTSchema->columns[iCol]; - SCellVal cv; - SValue value; + SCellVal cv = {0}; + SValue value = {0}; ASSERT((pTColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) || (iCol > 0)); diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 8b4cacf8d609ecf24b699f4cccf6306c6ada15c8..637a3818e658e5845bdc871b7054508585073604 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -240,25 +240,22 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { return -1; } - int32_t numOfCols = pShow->pMeta->numOfColumns; - SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + int32_t numOfCols = pShow->pMeta->numOfColumns; + SSDataBlock *pBlock = createDataBlock(); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData idata = {0}; - SSchema *p = &pShow->pMeta->pSchemas[i]; + + SSchema *p = &pShow->pMeta->pSchemas[i]; idata.info.bytes = p->bytes; idata.info.type = p->type; idata.info.colId = p->colId; - - taosArrayPush(pBlock->pDataBlock, &idata); - if (IS_VAR_DATA_TYPE(p->type)) { - pBlock->info.hasVarCol = true; - } + blockDataAppendColInfo(pBlock, &idata); } blockDataEnsureCapacity(pBlock, rowsToRead); + if (mndCheckRetrieveFinished(pShow)) { mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows); rowsRead = 0; diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index afd53b8dda18359bd5a35701c910494ea5768a63..6ae52f1fd7f86f5e1c060712bb5650ff258e7707 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -644,6 +644,13 @@ typedef struct SSttBlockLoadInfo { int16_t *colIds; int32_t numOfCols; bool sttBlockLoaded; + + // keep the last access position, this position may be used to reduce the binary times for + // starting last block data for a new table + struct { + int32_t blockIndex; + int32_t rowIndex; + } prevEndPos; } SSttBlockLoadInfo; typedef struct SMergeTree { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 4099dafa26950fca71a7e521159948ed0a04c9d7..bfde5b30765b06b61ddba0c05f07a0143c8a53db 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -544,7 +544,6 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) taosMemoryFree(pResBlock); return NULL; } - return pResBlock; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 14159c2da28ad3fcd98be0fb1516a74a8e49080d..e20c8ee955dd09a9bc78b6f495e58280f75cf679 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -536,10 +536,10 @@ typedef struct SSysTableScanInfo { } SSysTableScanInfo; typedef struct SBlockDistInfo { - SSDataBlock* pResBlock; - STsdbReader* pHandle; - SReadHandle readHandle; - uint64_t uid; // table uid + SSDataBlock* pResBlock; + STsdbReader* pHandle; + SReadHandle readHandle; + uint64_t uid; // table uid } SBlockDistInfo; // todo remove this @@ -550,7 +550,6 @@ typedef struct SOptrBasicInfo { } SOptrBasicInfo; typedef struct SIntervalAggOperatorInfo { - // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; // basic info SAggSupporter aggSup; // aggregate supporter SExprSupp scalarSupp; // supporter for perform scalar function @@ -571,7 +570,6 @@ typedef struct SIntervalAggOperatorInfo { typedef struct SMergeAlignedIntervalAggOperatorInfo { SIntervalAggOperatorInfo* intervalAggOperatorInfo; - // bool hasGroupId; uint64_t groupId; // current groupId int64_t curTs; // current ts SSDataBlock* prefetchedBlock; @@ -839,10 +837,6 @@ typedef struct SSortOperatorInfo { SNode* pCondition; } SSortOperatorInfo; -typedef struct STagFilterOperatorInfo { - SOptrBasicInfo binfo; -} STagFilterOperatorInfo; - typedef struct SJoinOperatorInfo { SSDataBlock* pRes; int32_t joinType; @@ -907,7 +901,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int void cleanupAggSup(SAggSupporter* pAggSup); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); -void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId); +void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name); int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts); int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index bcd59e83f00f513682fdefbcbb20704efeca031d..95d3c5cf239eb1220e3f583d2396f0353b8868e9 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -170,14 +170,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes); pRes->info.rows = 1; - if (pInfo->pseudoExprSup.numOfExprs > 0) { - SExprSupp* pSup = &pInfo->pseudoExprSup; - int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, pRes->info.rows, - GET_TASKID(pTaskInfo)); - if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; - return NULL; - } + SExprSupp* pSup = &pInfo->pseudoExprSup; + int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, + pRes->info.rows, GET_TASKID(pTaskInfo)); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = code; + return NULL; } pRes->info.groupId = getTableGroupId(pTableList, pRes->info.uid); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 6f8d60f538360dc34eb2c47d113f07eba6116f0d..d1046ff02cbbabcf31442e1c4b151a9006aaa836 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -379,7 +379,7 @@ static int32_t createResultData(SDataType* pType, int32_t numOfRows, SScalarPara pColumnData->info.scale = pType->scale; pColumnData->info.precision = pType->precision; - int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows); + int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true); if (code != TSDB_CODE_SUCCESS) { terrno = code; taosMemoryFree(pColumnData); @@ -1824,8 +1824,6 @@ static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) { } static int32_t sortTableGroup(STableListInfo* pTableListInfo) { - int32_t code = TSDB_CODE_SUCCESS; - taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); int32_t size = taosArrayGetSize(pTableListInfo->pTableList); @@ -1847,6 +1845,11 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) { pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList); pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups); + if (pTableListInfo->groupOffset == NULL) { + taosArrayDestroy(pList); + return TSDB_CODE_OUT_OF_MEMORY; + } + memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups); taosArrayDestroy(pList); return TDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 26abc2b90bc563918dad9522d7fe20c6cb6782f6..deb628f30ed4e1a06f515ebb431d39535c05d52d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -87,7 +87,7 @@ static void releaseQueryBuf(size_t numOfTables); static void destroyFillOperatorInfo(void* param); static void destroyProjectOperatorInfo(void* param); -static void destroyOrderOperatorInfo(void* param); +static void destroySortOperatorInfo(void* param); static void destroyAggOperatorInfo(void* param); static void destroyIntervalOperatorInfo(void* param); @@ -322,7 +322,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP; pColData->info.bytes = sizeof(int64_t); - colInfoDataEnsureCapacity(pColData, 5); + colInfoDataEnsureCapacity(pColData, 5, false); colDataAppendInt64(pColData, 0, &pQueryWindow->skey); colDataAppendInt64(pColData, 1, &pQueryWindow->ekey); @@ -439,7 +439,7 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc pColInfo = pInput->pData[paramIndex]; } - colInfoDataEnsureCapacity(pColInfo, numOfRows); + colInfoDataEnsureCapacity(pColInfo, numOfRows, false); int8_t type = pFuncParam->param.nType; if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) { @@ -1047,8 +1047,6 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pCol SFilterInfo* filter = pFilterInfo; int64_t st = taosGetTimestampUs(); - // pError("start filter"); - // todo move to the initialization function int32_t code = 0; bool needFree = false; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index e07a3475e0fd6a2051d13a8fbe31b13c3f07897e..ecc9c7e7ddd8aa5ce77bec462b43c0c5864611a2 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -336,8 +336,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SGroupbyOperatorInfo* pInfo = pOperator->info; - SSDataBlock* pRes = pInfo->binfo.pRes; - if (pOperator->status == OP_RES_TO_RETURN) { return buildGroupResultDataBlock(pOperator); } @@ -390,7 +388,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { } } #endif - blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0); pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; @@ -422,6 +419,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* } initResultSizeInfo(&pOperator->resultInfo, 4096); + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -1082,14 +1081,16 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr } pInfo->partitionSup.needCalc = true; - SSDataBlock* pResBlock = createResDataBlock(pPartNode->part.node.pOutputDataBlockDesc); - if (!pResBlock) { + pInfo->binfo.pRes = createResDataBlock(pPartNode->part.node.pOutputDataBlockDesc); + if (pInfo->binfo.pRes == NULL) { goto _error; } - blockDataEnsureCapacity(pResBlock, 4096); - pInfo->binfo.pRes = pResBlock; + + blockDataEnsureCapacity(pInfo->binfo.pRes, 4096); + pInfo->parIte = NULL; pInfo->pInputDataBlock = NULL; + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); pInfo->tsColIndex = 0; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 4e4c33d4c312baeb64963bef789f9822395b022a..ba076233aaa20619a729fe5aac5b794d41d57ed3 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -384,6 +384,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy initBasicInfo(&pInfo->binfo, pResBlock); initResultSizeInfo(&pOperator->resultInfo, numOfRows); + blockDataEnsureCapacity(pResBlock, numOfRows); int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { @@ -391,8 +392,6 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy } setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr); - - pInfo->binfo.pRes = pResBlock; pInfo->pCondition = pPhyNode->node.pConditions; pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3352b2685a17964d516b37cf9cbf167ad45d0345..937069b5cc013110369605096d57c49c76e9986b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -58,6 +58,18 @@ typedef struct { void* pVnode; } SSTabFltArg; +typedef struct STableMergeScanExecInfo { + SFileBlockLoadRecorder blockRecorder; + SSortExecInfo sortExecInfo; +} STableMergeScanExecInfo; + +typedef struct STableMergeScanSortSourceParam { + SOperatorInfo* pOperator; + int32_t readerIdx; + uint64_t uid; + SSDataBlock* inputBlock; +} STableMergeScanSortSourceParam; + static int32_t sysChkFilter__Comm(SNode* pNode); static int32_t sysChkFilter__DBName(SNode* pNode); static int32_t sysChkFilter__VgroupId(SNode* pNode); @@ -69,15 +81,15 @@ static int32_t sysChkFilter__STableName(SNode* pNode); static int32_t sysChkFilter__Uid(SNode* pNode); static int32_t sysChkFilter__Type(SNode* pNode); -static int32_t sysFilte__DbName(void* pMeta, SNode* pNode, SArray* result); -static int32_t sysFilte__VgroupId(void* pMeta, SNode* pNode, SArray* result); -static int32_t sysFilte__TableName(void* pMeta, SNode* pNode, SArray* result); -static int32_t sysFilte__CreateTime(void* pMeta, SNode* pNode, SArray* result); -static int32_t sysFilte__Ncolumn(void* pMeta, SNode* pNode, SArray* result); -static int32_t sysFilte__Ttl(void* pMeta, SNode* pNode, SArray* result); -static int32_t sysFilte__STableName(void* pMeta, SNode* pNode, SArray* result); -static int32_t sysFilte__Uid(void* pMeta, SNode* pNode, SArray* result); -static int32_t sysFilte__Type(void* pMeta, SNode* pNode, SArray* result); +static int32_t sysFilte__DbName(void* arg, SNode* pNode, SArray* result); +static int32_t sysFilte__VgroupId(void* arg, SNode* pNode, SArray* result); +static int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result); +static int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result); +static int32_t sysFilte__Ncolumn(void* arg, SNode* pNode, SArray* result); +static int32_t sysFilte__Ttl(void* arg, SNode* pNode, SArray* result); +static int32_t sysFilte__STableName(void* arg, SNode* pNode, SArray* result); +static int32_t sysFilte__Uid(void* arg, SNode* pNode, SArray* result); +static int32_t sysFilte__Type(void* arg, SNode* pNode, SArray* result); const SSTabFltFuncDef filterDict[] = { {.name = "table_name", .chkFunc = sysChkFilter__TableName, .fltFunc = sysFilte__TableName}, @@ -95,7 +107,7 @@ const SSTabFltFuncDef filterDict[] = { static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result); static int32_t optSysTabFilteImpl(void* arg, SNode* cond, SArray* result); static int32_t optSysCheckOper(SNode* pOpear); -static int32_t optSysMergeRslt(SArray* multiRslt, SArray* reslt); +static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt); static bool processBlockWithProbability(const SSampleExecInfo* pInfo); @@ -123,29 +135,6 @@ static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) { } } -static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) { -#if 0 - int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv)); - for(int32_t i = 0; i < numOfGroups; ++i) { - SArray *group = GET_TABLEGROUP(pRuntimeEnv, i); - SArray *tableKeyGroup = taosArrayGetP(pQueryAttr->tableGroupInfo.pGroupList, i); - - size_t t = taosArrayGetSize(group); - for (int32_t j = 0; j < t; ++j) { - STableQueryInfo *pCheckInfo = taosArrayGetP(group, j); - updateTableQueryInfoForReverseScan(pCheckInfo); - - // update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide - // the start check timestamp of tsdbQueryHandle -// STableKeyInfo *pTableKeyInfo = taosArrayGet(tableKeyGroup, j); -// pTableKeyInfo->lastKey = pCheckInfo->lastKey; -// -// assert(pCheckInfo->pTable == pTableKeyInfo->pTable); - } - } -#endif -} - static void getNextTimeWindow(SInterval* pInterval, STimeWindow* tw, int32_t order) { int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order); if (pInterval->intervalUnit != 'n' && pInterval->intervalUnit != 'y') { @@ -291,7 +280,7 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* return TSDB_CODE_SUCCESS; } -static FORCE_INLINE bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDataAgg** pColsAgg, int32_t numOfCols, +static bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDataAgg** pColsAgg, int32_t numOfCols, int32_t numOfRows) { if (pColsAgg == NULL || pFilterNode == NULL) { return true; @@ -360,13 +349,13 @@ static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlo // todo handle the slimit info void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) { SLimit* pLimit = &pLimitInfo->limit; + const char* id = GET_TASKID(pTaskInfo); if (pLimit->offset > 0 && pLimitInfo->remainOffset > 0) { if (pLimitInfo->remainOffset >= pBlock->info.rows) { pLimitInfo->remainOffset -= pBlock->info.rows; pBlock->info.rows = 0; - qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, - GET_TASKID(pTaskInfo)); + qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id); } else { blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset); pLimitInfo->remainOffset = 0; @@ -379,22 +368,11 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo int32_t keep = pBlock->info.rows - overflowRows; blockDataKeepFirstNRows(pBlock, keep); - qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo)); + qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id); pOperator->status = OP_EXEC_DONE; } } -static void ensureBlockCapacity(SSDataBlock* pBlock, int32_t capacity) { - // keep the value of rows temporarily - int32_t rows = pBlock->info.rows; - - pBlock->info.rows = 0; - blockDataEnsureCapacity(pBlock, capacity); - - // restore the rows number - pBlock->info.rows = rows; -} - static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -425,24 +403,16 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - - if (pTableScanInfo->pseudoSup.numOfExprs > 0) { - ensureBlockCapacity(pBlock, pBlock->info.rows); - } doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1); pCost->skipBlocks += 1; return TSDB_CODE_SUCCESS; - } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { + } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) { pCost->loadBlockStatis += 1; loadSMA = true; // mark the operation of load sma; bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo); if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - if (pTableScanInfo->pseudoSup.numOfExprs > 0) { - ensureBlockCapacity(pBlock, pBlock->info.rows); - } - doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1); return TSDB_CODE_SUCCESS; } else { @@ -492,7 +462,6 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca return terrno; } - ensureBlockCapacity(pBlock, pBlock->info.rows); relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows); @@ -536,7 +505,7 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock, int32_t rows, const char* idStr) { // currently only the tbname pseudo column - if (numOfPseudoExpr == 0) { + if (numOfPseudoExpr <= 0) { return TSDB_CODE_SUCCESS; } @@ -566,7 +535,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int // this is to handle the tbname if (fmIsScanPseudoColumnFunc(functionId)) { - setTbNameColData(pHandle->meta, pBlock, pColInfoData, functionId); + setTbNameColData(pBlock, pColInfoData, functionId, mr.me.name); } else { // these are tags STagVal tagVal = {0}; tagVal.cid = pExpr->base.pParam[0].pCol->colId; @@ -602,16 +571,20 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int return TSDB_CODE_SUCCESS; } -void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId) { +void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name) { struct SScalarFuncExecFuncs fpSet = {0}; fmGetScalarFuncExecFuncs(functionId, &fpSet); - SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_BIGINT, sizeof(uint64_t), 1); - colInfoDataEnsureCapacity(&infoData, 1); + size_t len = TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE; + char buf[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(buf, name) - colDataAppendInt64(&infoData, 0, (int64_t*)&pBlock->info.uid); - SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData}; + SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, len, 1); + colInfoDataEnsureCapacity(&infoData, 1, false); + colDataAppend(&infoData, 0, buf, false); + + SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .columnData = &infoData}; SScalarParam param = {.columnData = pColInfoData}; if (fpSet.process != NULL) { @@ -642,9 +615,13 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { } blockDataCleanup(pBlock); - SDataBlockInfo* pBInfo = &pBlock->info; - tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBInfo->rows, &pBInfo->uid, &pBInfo->window); + + int32_t rows = 0; + tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &rows, &pBInfo->uid, &pBInfo->window); + + blockDataEnsureCapacity(pBlock, rows); // todo remove it latter + pBInfo->rows = rows; ASSERT(pBInfo->uid != 0); pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid); @@ -679,7 +656,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { return NULL; } -static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { +static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { STableScanInfo* pTableScanInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -692,7 +669,6 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { SSDataBlock* p = doTableScanImpl(pOperator); if (p != NULL) { - ASSERT(p->info.uid != 0); return p; } @@ -746,7 +722,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); while (1) { - SSDataBlock* result = doTableScanGroup(pOperator); + SSDataBlock* result = doGroupedTableScan(pOperator); if (result) { return result; } @@ -784,7 +760,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } } - SSDataBlock* result = doTableScanGroup(pOperator); + SSDataBlock* result = doGroupedTableScan(pOperator); if (result != NULL) { ASSERT(result->info.uid != 0); return result; @@ -808,7 +784,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { tsdbReaderReset(pInfo->dataReader, &pInfo->cond); pInfo->scanTimes = 0; - result = doTableScanGroup(pOperator); + result = doGroupedTableScan(pOperator); if (result != NULL) { return result; } @@ -851,24 +827,25 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, goto _error; } - SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; + SScanPhysiNode* pScanNode = &pTableScanNode->scan; + SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc; int32_t numOfCols = 0; - int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, + int32_t code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; } - initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo); + initLimitInfo(pScanNode->node.pLimit, pScanNode->node.pSlimit, &pInfo->limitInfo); code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { goto _error; } - if (pTableScanNode->scan.pScanPseudoCols != NULL) { + if (pScanNode->pScanPseudoCols != NULL) { SExprSupp* pSup = &pInfo->pseudoSup; - pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs); + pSup->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pSup->numOfExprs); pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset); } @@ -879,9 +856,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->sample.seed = taosGetTimestampSec(); pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; + + initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pResBlock = createResDataBlock(pDescNode); - pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; + blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); + pInfo->pFilterNode = pScanNode->node.pConditions; if (pInfo->pFilterNode != NULL) { code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0); if (code != TSDB_CODE_SUCCESS) { @@ -988,7 +968,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN}; - int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, &blockDistInfo.rowSize, + int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, (int32_t*)&blockDistInfo.rowSize, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); @@ -1007,12 +987,10 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo); varDataSetLen(p, len); - blockDataEnsureCapacity(pBlock, 1); colDataAppend(pColInfo, 0, p, false); taosMemoryFree(p); pBlock->info.rows = 1; - pOperator->status = OP_EXEC_DONE; return pBlock; } @@ -1078,7 +1056,9 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi pInfo->readHandle = *readHandle; pInfo->uid = pBlockScanNode->suid; + pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc); + blockDataEnsureCapacity(pInfo->pResBlock, 1); int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols); @@ -2430,7 +2410,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys if (pHandle->initTableReader) { pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pTSInfo->dataReader = NULL; - int32_t code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, pList, num, &pTSInfo->dataReader, NULL); + code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, pList, num, &pTSInfo->dataReader, NULL); if (code != 0) { terrno = code; destroyTableScanOperatorInfo(pTableScanOp); @@ -4098,12 +4078,14 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan goto _error; } - SScanPhysiNode* pScanNode = &pScanPhyNode->scan; - + SScanPhysiNode* pScanNode = &pScanPhyNode->scan; SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc; int32_t num = 0; int32_t code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } pInfo->accountId = pScanPhyNode->accountId; pInfo->pUser = taosMemoryStrDup((void*)pUser); @@ -4140,7 +4122,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan return pOperator; _error: - taosMemoryFreeClear(pInfo); + if (pInfo != NULL) { + destroySysScanOperator(pInfo); + } taosMemoryFreeClear(pOperator); terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; return NULL; @@ -4244,16 +4228,15 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc; - int32_t num = 0; int32_t numOfExprs = 0; SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); - int32_t code = - extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); + int32_t code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); if (code != TSDB_CODE_SUCCESS) { goto _error; } - code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); + int32_t num = 0; + code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4285,21 +4268,9 @@ _error: return NULL; } -int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo, - int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, const char* idstr) { - for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) { - STableKeyInfo* pList = tableListGetInfo(pTableListInfo, i); - STsdbReader* pReader = NULL; - tsdbReaderOpen(pHandle->vnode, pQueryCond, pList, 1, &pReader, idstr); - taosArrayPush(arrayReader, &pReader); - } - - return TSDB_CODE_SUCCESS; -} - // todo refactor static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, - int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) { + SSDataBlock* pBlock, uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STableMergeScanInfo* pInfo = pOperator->info; @@ -4334,12 +4305,11 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc } return TSDB_CODE_SUCCESS; - } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { + } else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) { pCost->loadBlockStatis += 1; bool allColumnsHaveAgg = true; SColumnDataAgg** pColAgg = NULL; - // STsdbReader* reader = pTableScanInfo->pReader; // taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); if (allColumnsHaveAgg == true) { int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); @@ -4388,13 +4358,12 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); // currently only the tbname pseudo column - if (pTableScanInfo->pseudoSup.numOfExprs > 0) { - int32_t code = - addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, - pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo)); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } + SExprSupp* pSup = &pTableScanInfo->pseudoSup; + + int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, + pBlock->info.rows, GET_TASKID(pTaskInfo)); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); } if (pTableScanInfo->pFilterNode != NULL) { @@ -4416,13 +4385,6 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc return TSDB_CODE_SUCCESS; } -typedef struct STableMergeScanSortSourceParam { - SOperatorInfo* pOperator; - int32_t readerIdx; - uint64_t uid; - SSDataBlock* inputBlock; -} STableMergeScanSortSourceParam; - static SSDataBlock* getTableDataBlockImpl(void* param) { STableMergeScanSortSourceParam* source = param; SOperatorInfo* pOperator = source->pOperator; @@ -4433,7 +4395,6 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { STableMergeScanInfo* pTableScanInfo = pOperator->info; SQueryTableDataCond* pQueryCond = taosArrayGet(pTableScanInfo->queryConds, readIdx); - blockDataCleanup(pBlock); int64_t st = taosGetTimestampUs(); @@ -4443,13 +4404,13 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->pReader, GET_TASKID(pTaskInfo)); if (code != 0) { - T_LONG_JMP(pOperator->pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } STsdbReader* reader = pInfo->pReader; while (tsdbNextDataBlock(reader)) { - if (isTaskKilled(pOperator->pTaskInfo)) { - T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + if (isTaskKilled(pTaskInfo)) { + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } // process this data block based on the probabilities @@ -4472,9 +4433,9 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { } uint32_t status = 0; - int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readIdx, pBlock, &status); + code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pOperator->pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } // current block is filter out according to filter condition, continue load the next block @@ -4482,71 +4443,21 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { continue; } - pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid); + pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid); - pOperator->resultInfo.totalRows += pBlock->info.rows; // pTableScanInfo->readRecorder.totalRows; + pOperator->resultInfo.totalRows += pBlock->info.rows; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; tsdbReaderClose(pInfo->pReader); pInfo->pReader = NULL; return pBlock; } + tsdbReaderClose(pInfo->pReader); pInfo->pReader = NULL; return NULL; } -static SSDataBlock* getTableDataBlock(void* param) { - STableMergeScanSortSourceParam* source = param; - SOperatorInfo* pOperator = source->pOperator; - int32_t readerIdx = source->readerIdx; - SSDataBlock* pBlock = source->inputBlock; - STableMergeScanInfo* pTableScanInfo = pOperator->info; - - int64_t st = taosGetTimestampUs(); - - blockDataCleanup(pBlock); - - STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); - while (tsdbNextDataBlock(reader)) { - if (isTaskKilled(pOperator->pTaskInfo)) { - T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); - } - - // process this data block based on the probabilities - bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample); - if (!processThisBlock) { - continue; - } - - blockDataCleanup(pBlock); - - int32_t rows = 0; - tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.uid, &pBlock->info.window); - blockDataEnsureCapacity(pBlock, rows); - pBlock->info.rows = rows; - - uint32_t status = 0; - int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status); - // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pOperator->pTaskInfo->env, code); - } - - // current block is filter out according to filter condition, continue load the next block - if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { - continue; - } - - pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid); - pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; - pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - - return pBlock; - } - return NULL; -} - SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) { int32_t tsTargetSlotId = 0; for (int32_t i = 0; i < taosArrayGetSize(colMatchInfo); ++i) { @@ -4675,7 +4586,6 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; blockDataCleanup(pResBlock); - blockDataEnsureCapacity(pResBlock, capacity); while (1) { STupleHandle* pTupleHandle = tsortNextTuple(pHandle); @@ -4781,11 +4691,6 @@ void destroyTableMergeScanOperatorInfo(void* param) { taosMemoryFreeClear(param); } -typedef struct STableMergeScanExecInfo { - SFileBlockLoadRecorder blockRecorder; - SSortExecInfo sortExecInfo; -} STableMergeScanExecInfo; - int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { ASSERT(pOptr != NULL); // TODO: merge these two info into one struct @@ -4800,18 +4705,6 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla return TSDB_CODE_SUCCESS; } -int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) { - const STableKeyInfo* info1 = p1; - const STableKeyInfo* info2 = p2; - if (info1->groupId < info2->groupId) { - return -1; - } else if (info1->groupId > info2->groupId) { - return 1; - } else { - return 0; - } -} - SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); @@ -4825,6 +4718,9 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN int32_t numOfCols = 0; int32_t code = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { @@ -4849,7 +4745,10 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->tableListInfo = pTableListInfo; pInfo->scanFlag = MAIN_SCAN; + initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->pResBlock = createResDataBlock(pDescNode); + blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); + pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam)); pInfo->pSortInfo = generateSortByTsInfo(pInfo->matchInfo.pList, pInfo->cond.order); @@ -4866,7 +4765,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pOperator->info = pInfo; pOperator->exprSupp.numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; - initResultSizeInfo(&pOperator->resultInfo, 1024); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, getTableMergeScanExplainExecInfo); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 5b05b3b2ed3bf2dc3ea1911a6bd57d690ef77c37..7abf05e7d65993f41f4157545ae52adcda55da22 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -20,7 +20,7 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator); static int32_t doOpenSortOperator(SOperatorInfo* pOperator); static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); -static void destroyOrderOperatorInfo(void* param); +static void destroySortOperatorInfo(void* param); // todo add limit/offset impl SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) { @@ -64,7 +64,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* // TODO dynamic set the available sort buffer pOperator->fpSet = - createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, getExplainExecInfo); + createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroySortOperatorInfo, getExplainExecInfo); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -249,7 +249,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { return blockDataGetNumOfRows(pBlock) > 0 ? pBlock : NULL; } -void destroyOrderOperatorInfo(void* param) { +void destroySortOperatorInfo(void* param) { SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param; pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); @@ -627,10 +627,12 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* } -SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, - SArray* pColMatchInfo, SOperatorInfo* pOperator) { +SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, SArray* pColMatchInfo, + SOperatorInfo* pOperator) { SMultiwayMergeOperatorInfo* pInfo = pOperator->info; + int32_t capacity = pOperator->resultInfo.capacity; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; blockDataCleanup(pDataBlock); @@ -640,7 +642,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData } blockDataEnsureCapacity(p, capacity); - while (1) { doGetSortedBlockData(pInfo, pHandle, capacity, p); if (p->info.rows == 0) { @@ -656,8 +657,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData } if (p->info.rows > 0) { - blockDataEnsureCapacity(pDataBlock, p->info.rows); - int32_t numOfCols = taosArrayGetSize(pColMatchInfo); for (int32_t i = 0; i < numOfCols; ++i) { SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i); @@ -692,13 +691,13 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } - SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, - pOperator->resultInfo.capacity, pInfo->matchInfo.pList, pOperator); + SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->matchInfo.pList, pOperator); if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; } else { doSetOperatorCompleted(pOperator); } + return pBlock; } @@ -742,12 +741,11 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo); pInfo->binfo.pRes = createResDataBlock(pDescNode); + int32_t rowSize = pInfo->binfo.pRes->info.rowSize; ASSERT(rowSize < 100 * 1024 * 1024); - SArray* pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys); int32_t numOfOutputCols = 0; - code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); if (code != TSDB_CODE_SUCCESS) { @@ -756,10 +754,12 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); + initResultSizeInfo(&pOperator->resultInfo, 1024); + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); pInfo->groupSort = pMergePhyNode->groupSort; - pInfo->pSortInfo = pSortInfo; + pInfo->pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys); pInfo->pInputBlock = pInputBlock; pInfo->bufPageSize = getProperSortPageSize(rowSize); pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result. @@ -781,11 +781,11 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size return pOperator; _error: - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; if (pInfo != NULL) { destroyMultiwayMergeOperatorInfo(pInfo); } + pTaskInfo->code = code; taosMemoryFree(pOperator); return NULL; } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 72093793a1b4baba3f62d6144e3031aecd1ce8ac..2a1b1783534c59ba537ff2aa49e55197923ccb86 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -489,7 +489,7 @@ EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWind if (QUERY_NODE_COLUMN == nodeType(pParam) && PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pParam)->colId) { return FUNC_DATA_REQUIRED_NOT_LOAD; } - return FUNC_DATA_REQUIRED_STATIS_LOAD; + return FUNC_DATA_REQUIRED_SMA_LOAD; } bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { @@ -1103,7 +1103,7 @@ int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { } EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) { - return FUNC_DATA_REQUIRED_STATIS_LOAD; + return FUNC_DATA_REQUIRED_SMA_LOAD; } bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { diff --git a/source/libs/index/test/index_executor_tests.cpp b/source/libs/index/test/index_executor_tests.cpp index 8b03f3b251da4ae8698bf976753f6f7e3a86ab96..bcc474dc8b9e93f564d53b9b82000edf55110371 100644 --- a/source/libs/index/test/index_executor_tests.cpp +++ b/source/libs/index/test/index_executor_tests.cpp @@ -86,8 +86,7 @@ void sifAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *sl SColumnInfoData idata = {0}; idata.info = *colInfo; - colInfoDataEnsureCapacity(&idata, rows); - + colInfoDataEnsureCapacity(&idata, rows, true); taosArrayPush(res->pDataBlock, &idata); *dataBlockId = taosArrayGetSize(pBlockList) - 1; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 0d65e8d8f52d0cf570abc188cabebadd82abe0fa..79f33f3ac33fb1586780578d63d5a09d1b878a25 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -279,7 +279,7 @@ static EFuncDataRequired scanPathOptPromoteDataRequired(EFuncDataRequired l, EFu switch (l) { case FUNC_DATA_REQUIRED_DATA_LOAD: return l; - case FUNC_DATA_REQUIRED_STATIS_LOAD: + case FUNC_DATA_REQUIRED_SMA_LOAD: return FUNC_DATA_REQUIRED_DATA_LOAD == r ? r : l; case FUNC_DATA_REQUIRED_NOT_LOAD: return FUNC_DATA_REQUIRED_FILTEROUT == r ? l : r; diff --git a/source/libs/scalar/CMakeLists.txt b/source/libs/scalar/CMakeLists.txt index c34c5e2877951cd6ba4d3b6ae72b321bd9dc6231..193a6971e54b52b1c59749c0b41eefe6b9bbb2fb 100644 --- a/source/libs/scalar/CMakeLists.txt +++ b/source/libs/scalar/CMakeLists.txt @@ -14,7 +14,6 @@ target_link_libraries(scalar PRIVATE nodes PRIVATE function PRIVATE qcom - PRIVATE vnode ) if(${BUILD_TEST}) diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index ea1ce175e36dfe675b8e8b2e9cdb811f693914b3..fa71009365019014e8cacf70c66010bb5a5fa0c5 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -49,7 +49,7 @@ int32_t sclCreateColumnInfoData(SDataType *pType, int32_t numOfRows, SScalarPara pColumnData->info.scale = pType->scale; pColumnData->info.precision = pType->precision; - int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows); + int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true); if (code != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pColumnData); @@ -70,7 +70,7 @@ int32_t sclConvertValueToSclParam(SValueNode* pValueNode, SScalarParam* out, int colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false); - colInfoDataEnsureCapacity(out->columnData, 1); + colInfoDataEnsureCapacity(out->columnData, 1, true); code = vectorConvertSingleColImpl(&in, out, overflow, -1, -1); sclFreeParam(&in); @@ -88,7 +88,7 @@ int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockL pLeft->numOfRows = pb->info.rows; if (pDst->numOfRows < pb->info.rows) { - colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows); + colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows, true); } _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(OP_TYPE_ASSIGN); @@ -1604,7 +1604,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { if (1 == res->numOfRows) { SCL_ERR_JRET(sclExtendResRows(pDst, res, pBlockList)); } else { - colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows); + colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows, true); colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL); pDst->numOfRows = res->numOfRows; pDst->numOfQualified = res->numOfQualified; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 7e93c9173c0d21edca583c40caab11a30f5c146f..c89072233fba013235dee71e6a22d86321fbe116 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -6,7 +6,6 @@ #include "tdatablock.h" #include "tjson.h" #include "ttime.h" -#include "vnode.h" typedef float (*_float_fn)(float); typedef double (*_double_fn)(double); @@ -1718,12 +1717,9 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { ASSERT(inputNum == 1); + char* p = colDataGetVarData(pInput->columnData, 0); - uint64_t uid = *(uint64_t *)colDataGetData(pInput->columnData, 0); - - char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; - metaGetTableNameByUid(pInput->param, uid, str); - colDataAppendNItems(pOutput->columnData, pOutput->numOfRows, str, pInput->numOfRows); + colDataAppendNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows); pOutput->numOfRows += pInput->numOfRows; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp index 39055b534d937ca74ecde5d017710534112fc91a..0fd0c98c1ab23b6457683dec02147c5637523af1 100644 --- a/source/libs/scalar/test/scalar/scalarTests.cpp +++ b/source/libs/scalar/test/scalar/scalarTests.cpp @@ -87,7 +87,7 @@ void scltAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *s SColumnInfoData idata = {0}; idata.info = *colInfo; - colInfoDataEnsureCapacity(&idata, rows); + colInfoDataEnsureCapacity(&idata, rows, true); blockDataAppendColInfo(res, &idata); @@ -104,7 +104,7 @@ void scltAppendReservedSlot(SArray *pBlockList, int16_t *dataBlockId, int16_t *s SSDataBlock *res = *(SSDataBlock **)taosArrayGetLast(pBlockList); SColumnInfoData idata = {0}; idata.info = *colInfo; - colInfoDataEnsureCapacity(&idata, rows); + colInfoDataEnsureCapacity(&idata, rows, true); blockDataAppendColInfo(res, &idata); *dataBlockId = taosArrayGetSize(pBlockList) - 1; @@ -146,12 +146,12 @@ void scltMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in SSDataBlock *res = createDataBlock(); for (int32_t i = 0; i < 2; ++i) { SColumnInfoData idata = createColumnInfoData(TSDB_DATA_TYPE_INT, 10, i + 1); - colInfoDataEnsureCapacity(&idata, rowNum); + colInfoDataEnsureCapacity(&idata, rowNum, true); blockDataAppendColInfo(res, &idata); } SColumnInfoData idata = createColumnInfoData(dataType, dataBytes, 3); - colInfoDataEnsureCapacity(&idata, rowNum); + colInfoDataEnsureCapacity(&idata, rowNum, true); blockDataAppendColInfo(res, &idata); res->info.capacity = rowNum; @@ -175,7 +175,7 @@ void scltMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in int32_t idx = taosArrayGetSize(res->pDataBlock); SColumnInfoData idata = createColumnInfoData(dataType, dataBytes, 1 + idx); - colInfoDataEnsureCapacity(&idata, rowNum); + colInfoDataEnsureCapacity(&idata, rowNum, true); res->info.capacity = rowNum; blockDataAppendColInfo(res, &idata); @@ -2022,7 +2022,7 @@ void scltMakeDataBlock(SScalarParam **pInput, int32_t type, void *pVal, int32_t input->numOfRows = num; input->columnData->info = createColumnInfo(0, type, bytes); - colInfoDataEnsureCapacity(input->columnData, num); + colInfoDataEnsureCapacity(input->columnData, num, true); if (setVal) { for (int32_t i = 0; i < num; ++i) { diff --git a/tests/system-test/2-query/cast.py b/tests/system-test/2-query/cast.py index b3969a9c45321e4496ed2d03fd5ab00ed91ab619..e6542c7c2bc6b91e501098baf45562cadb07b599 100644 --- a/tests/system-test/2-query/cast.py +++ b/tests/system-test/2-query/cast.py @@ -15,7 +15,7 @@ class TDTestCase: def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") - tdSql.init(conn.cursor()) + tdSql.init(conn.cursor(), True) self.dbname = "db" def __cast_to_bigint(self, col_name, tbname):