From b95e95dc0ab56534072f165fe7f011db486ca8d5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 22 Feb 2022 13:12:03 +0800 Subject: [PATCH] [td-13039] fix bug in sorted merge operator. --- include/common/tep.h | 12 +- include/libs/function/function.h | 4 +- source/common/src/tep.c | 49 +- source/common/test/commonTests.cpp | 2 +- source/libs/executor/inc/executil.h | 8 +- source/libs/executor/inc/executorimpl.h | 33 +- source/libs/executor/src/executil.c | 8 +- source/libs/executor/src/executorimpl.c | 568 +++++++++++------- source/libs/executor/src/tsort.c | 60 +- source/libs/executor/test/executorTests.cpp | 51 +- .../libs/executor/test/executorUtilTests.cpp | 168 +++--- source/libs/function/src/taggfunction.c | 5 +- source/libs/parser/src/queryInfoUtil.c | 2 +- 13 files changed, 572 insertions(+), 398 deletions(-) diff --git a/include/common/tep.h b/include/common/tep.h index a4e28dbb7c..2be86aa095 100644 --- a/include/common/tep.h +++ b/include/common/tep.h @@ -64,16 +64,16 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u } } -#define colDataGet(p1_, r_) \ +#define colDataGetData(p1_, r_) \ ((IS_VAR_DATA_TYPE((p1_)->info.type)) ? (p1_)->pData + (p1_)->varmeta.offset[(r_)] \ - : (p1_)->pData + ((r_) * (p1_)->info.bytes)); + : (p1_)->pData + ((r_) * (p1_)->info.bytes)) int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock); -int32_t colDataGetSize(const SColumnInfoData* pColumnInfoData, int32_t numOfRows); -void colDataTrim(SColumnInfoData* pColumnInfoData); +int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows); +void colDataTrim(SColumnInfoData* pColumnInfoData); size_t colDataGetNumOfCols(const SSDataBlock* pBlock); size_t colDataGetNumOfRows(const SSDataBlock* pBlock); @@ -92,13 +92,13 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock); SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols); -size_t blockDataNumOfRowsForSerialize(const SSDataBlock* pBlock, int32_t blockSize); - int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol); +SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock); +size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); void *blockDataDestroy(SSDataBlock *pBlock); #ifdef __cplusplus diff --git a/include/libs/function/function.h b/include/libs/function/function.h index aef5f7fec4..a5d45c120b 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -138,8 +138,10 @@ extern SFunctionFpSet fpSet[1]; // sql function runtime context typedef struct SqlFunctionCtx { + int32_t startRow; int32_t size; // number of rows - void * pInput; // input data buffer + SColumnInfoData* pInput; + uint32_t order; // asc|desc int16_t inputType; int16_t inputBytes; diff --git a/source/common/src/tep.c b/source/common/src/tep.c index b7f7043d26..89d8127a63 100644 --- a/source/common/src/tep.c +++ b/source/common/src/tep.c @@ -63,7 +63,7 @@ SEpSet getEpSet_s(SCorEpSet *pEpSet) { #define BitmapLen(_n) (((_n) + ((1<> NBIT) -int32_t colDataGetSize(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { +int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { ASSERT(pColumnInfoData != NULL); if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { return pColumnInfoData->varmeta.length; @@ -249,8 +249,8 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) { } ASSERT(pColInfoData->nullbitmap == NULL); - pDataBlock->info.window.skey = *(TSKEY*) colDataGet(pColInfoData, 0); - pDataBlock->info.window.ekey = *(TSKEY*) colDataGet(pColInfoData, (pDataBlock->info.rows - 1)); + pDataBlock->info.window.skey = *(TSKEY*) colDataGetData(pColInfoData, 0); + pDataBlock->info.window.ekey = *(TSKEY*) colDataGetData(pColInfoData, (pDataBlock->info.rows - 1)); return 0; } @@ -262,8 +262,8 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) { SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i); - uint32_t oldLen = colDataGetSize(pCol2, pDest->info.rows); - uint32_t newLen = colDataGetSize(pCol1, pSrc->info.rows); + uint32_t oldLen = colDataGetLength(pCol2, pDest->info.rows); + uint32_t newLen = colDataGetLength(pCol1, pSrc->info.rows); int32_t newSize = oldLen + newLen; char* tmp = realloc(pCol2->pData, newSize); @@ -287,7 +287,7 @@ size_t blockDataGetSize(const SSDataBlock* pBlock) { for(int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - total += colDataGetSize(pColInfoData, pBlock->info.rows); + total += colDataGetLength(pColInfoData, pBlock->info.rows); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { total += sizeof(int32_t) * pBlock->info.rows; @@ -336,7 +336,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd if (isNull) { // do nothing } else { - char* p = colDataGet(pColInfoData, j); + char* p = colDataGetData(pColInfoData, j); size += varDataTLen(p); } @@ -401,7 +401,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) { bool isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg); - char* p = colDataGet(pColData, j); + char* p = colDataGetData(pColData, j); colDataAppend(pDstCol, j - startIndex, p, isNull); } @@ -443,7 +443,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { pStart += BitmapLen(pBlock->info.rows); } - uint32_t dataSize = colDataGetSize(pCol, numOfRows); + uint32_t dataSize = colDataGetLength(pCol, numOfRows); *(int32_t*) pStart = dataSize; pStart += sizeof(int32_t); @@ -592,8 +592,8 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { } } - void* left1 = colDataGet(pColInfoData, left); - void* right1 = colDataGet(pColInfoData, right); + void* left1 = colDataGetData(pColInfoData, left); + void* right1 = colDataGetData(pColInfoData, right); switch(pColInfoData->info.type) { case TSDB_DATA_TYPE_INT: { @@ -632,7 +632,7 @@ static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, co return code; } } else { - char* p = colDataGet(pSrc, tupleIndex); + char* p = colDataGetData(pSrc, tupleIndex); code = colDataAppend(pDst, numOfRows, p, false); if (code != TSDB_CODE_SUCCESS) { return code; @@ -971,8 +971,8 @@ int32_t dataBlockCompar_rv(const void* p1, const void* p2, const void* param) { // } // } -// void* left1 = colDataGet(pColInfoData, left); -// void* right1 = colDataGet(pColInfoData, right); +// void* left1 = colDataGetData(pColInfoData, left); +// void* right1 = colDataGetData(pColInfoData, right); // switch(pColInfoData->info.type) { // case TSDB_DATA_TYPE_INT: { @@ -1113,4 +1113,25 @@ void* blockDataDestroy(SSDataBlock* pBlock) { tfree(pBlock->pBlockAgg); tfree(pBlock); return NULL; +} + +SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) { + int32_t numOfCols = pDataBlock->info.numOfCols; + + SSDataBlock* pBlock = calloc(1, sizeof(SSDataBlock)); + pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + pBlock->info.numOfCols = numOfCols; + + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData colInfo = {0}; + SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); + colInfo.info = p->info; + taosArrayPush(pBlock->pDataBlock, &colInfo); + } + + return pBlock; +} + +size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { + return pageSize / (blockDataGetSerialRowSize(pBlock) + blockDataGetSerialMetaSize(pBlock)); } \ No newline at end of file diff --git a/source/common/test/commonTests.cpp b/source/common/test/commonTests.cpp index e9e8d086b3..9b05b5a780 100644 --- a/source/common/test/commonTests.cpp +++ b/source/common/test/commonTests.cpp @@ -162,7 +162,7 @@ TEST(testCase, Datablock_test) { ASSERT_EQ(colDataGetNumOfCols(b), 2); ASSERT_EQ(colDataGetNumOfRows(b), 40); - char* pData = colDataGet(p1, 3); + char* pData = colDataGetData(p1, 3); printf("the second row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData)); SArray* pOrderInfo = taosArrayInit(3, sizeof(SBlockOrderInfo)); diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 10d884cb3f..e45e02cdd0 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -69,8 +69,8 @@ typedef struct SResultRow { typedef struct SResultRowInfo { SResultRow** pResult; // result list - int16_t type:8; // data type for hash key - int32_t size:24; // number of result set +// int16_t type:8; // data type for hash key + int32_t size; // number of result set int32_t capacity; // max capacity int32_t curPos; // current active result row index of pResult list } SResultRowInfo; @@ -95,7 +95,7 @@ struct SUdfInfo; int32_t getOutputInterResultBufSize(struct STaskAttr* pQueryAttr); size_t getResultRowSize(SArray* pExprInfo); -int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t type); +int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size); void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); void resetResultRowInfo(struct STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo); @@ -105,7 +105,7 @@ void closeAllResultRows(SResultRowInfo* pResultRowInfo); int32_t initResultRow(SResultRow *pResultRow); void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot); bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot); -void clearResultRow(struct STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type); +void clearResultRow(struct STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow); struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 362f40696c..1289de004d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -445,16 +445,20 @@ typedef struct SOptrBasicInfo { int32_t capacity; } SOptrBasicInfo; -typedef struct SOptrBasicInfo STableIntervalOperatorInfo; - -typedef struct SAggOperatorInfo { - SOptrBasicInfo binfo; - SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file +typedef struct SAggSupporter { SHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not SArray* pResultRowArrayList; // The array list that contains the Result rows char* keyBuf; // window key buffer SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. +} SAggSupporter; + +typedef struct SOptrBasicInfo STableIntervalOperatorInfo; + +typedef struct SAggOperatorInfo { + SOptrBasicInfo binfo; + SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file + SAggSupporter aggSup; STableQueryInfo *current; uint32_t groupId; SGroupResInfo groupResInfo; @@ -552,8 +556,6 @@ typedef struct SDistinctOperatorInfo { typedef struct SSortedMergeOperatorInfo { SOptrBasicInfo binfo; - -// SSDataBlock *pDataBlock; bool hasVarCol; SArray *orderInfo; // SArray @@ -564,12 +566,16 @@ typedef struct SSortedMergeOperatorInfo { int32_t bufPageSize; uint32_t sortBufSize; // max buffer size for in-memory sort - int32_t numOfRowsInRes; - char** prevRow; int32_t resultRowFactor; - bool multiGroupResults; - bool hasGroupColData; + bool hasGroupVal; + + SDiskbasedBuf *pTupleStore; // keep the final results + int32_t numOfResPerPage; + + char** groupVal; + SArray *groupInfo; + SAggSupporter aggSup; } SSortedMergeOperatorInfo; typedef struct SOrderOperatorInfo { @@ -634,7 +640,7 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); // SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); // SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); @@ -682,9 +688,6 @@ int32_t checkForQueryBuf(size_t numOfTables); bool checkNeedToCompressQueryCol(SQInfo* pQInfo); void setQueryStatus(STaskRuntimeEnv* pRuntimeEnv, int8_t status); -bool onlyQueryTags(STaskAttr* pQueryAttr); -// void destroyUdfInfo(struct SUdfInfo* pUdfInfo); - int32_t doDumpQueryResult(SQInfo* pQInfo, char* data, int8_t compressed, int32_t* compLen); size_t getResultSize(SQInfo* pQInfo, int64_t* numOfRows); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 52ab8493f1..e2675115e0 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -53,8 +53,8 @@ int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) { return size; } -int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) { - pResultRowInfo->type = type; +int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size) { +// pResultRowInfo->type = type; pResultRowInfo->size = 0; pResultRowInfo->curPos = -1; pResultRowInfo->capacity = size; @@ -93,7 +93,7 @@ void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow for (int32_t i = 0; i < pResultRowInfo->size; ++i) { SResultRow *pWindowRes = pResultRowInfo->pResult[i]; - clearResultRow(pRuntimeEnv, pWindowRes, pResultRowInfo->type); + clearResultRow(pRuntimeEnv, pWindowRes); int32_t groupIndex = 0; int64_t uid = 0; @@ -136,7 +136,7 @@ void closeResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { getResultRow(pResultRowInfo, slot)->closed = true; } -void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16_t type) { +void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow) { if (pResultRow == NULL) { return; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 5286bd7ba1..efa4afb422 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -12,14 +12,14 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include -#include -#include "exception.h" #include "os.h" + +#include "tep.h" +#include "tsort.h" +#include "exception.h" #include "parser.h" #include "tglobal.h" #include "tmsg.h" -#include "tq.h" #include "ttime.h" #include "executorimpl.h" @@ -381,12 +381,13 @@ static bool hasNull(SColumn* pColumn, SColumnDataAgg *pStatis) { } static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, jmp_buf env) { + int64_t newCapacity = 0; + // more than the capacity, reallocate the resources if (pResultRowInfo->size < pResultRowInfo->capacity) { return; } - int64_t newCapacity = 0; if (pResultRowInfo->capacity > 10000) { newCapacity = (int64_t)(pResultRowInfo->capacity * 1.25); } else { @@ -519,12 +520,12 @@ static SResultRow* doSetResultOutBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultR } static SResultRow* doSetResultOutBufByKey_rv(SResultRowInfo* pResultRowInfo, int64_t tid, char* pData, int16_t bytes, - bool masterscan, uint64_t tableGroupId, SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggOperatorInfo* pAggInfo) { + bool masterscan, uint64_t tableGroupId, SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup) { bool existed = false; - SET_RES_WINDOW_KEY(pAggInfo->keyBuf, pData, bytes, tableGroupId); + SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, tableGroupId); SResultRow **p1 = - (SResultRow **)taosHashGet(pAggInfo->pResultRowHashTable, pAggInfo->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + (SResultRow **)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); // in case of repeat scan/reverse scan, no new time window added. if (isIntervalQuery) { @@ -540,8 +541,8 @@ static SResultRow* doSetResultOutBufByKey_rv(SResultRowInfo* pResultRowInfo, int existed = (pResultRowInfo->pResult[0] == (*p1)); pResultRowInfo->curPos = 0; } else { // check if current pResultRowInfo contains the existed pResultRow - SET_RES_EXT_WINDOW_KEY(pAggInfo->keyBuf, pData, bytes, tid, pResultRowInfo); - int64_t* index = taosHashGet(pAggInfo->pResultRowListSet, pAggInfo->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes)); + SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, tid, pResultRowInfo); + int64_t* index = taosHashGet(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes)); if (index != NULL) { pResultRowInfo->curPos = (int32_t) *index; existed = true; @@ -562,16 +563,16 @@ static SResultRow* doSetResultOutBufByKey_rv(SResultRowInfo* pResultRowInfo, int SResultRow *pResult = NULL; if (p1 == NULL) { - pResult = getNewResultRow(pAggInfo->pool); + pResult = getNewResultRow(pSup->pool); int32_t ret = initResultRow(pResult); if (ret != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } // add a new result set for a new group - taosHashPut(pAggInfo->pResultRowHashTable, pAggInfo->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pResult, POINTER_BYTES); + taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pResult, POINTER_BYTES); SResultRowCell cell = {.groupId = tableGroupId, .pRow = pResult}; - taosArrayPush(pAggInfo->pResultRowArrayList, &cell); + taosArrayPush(pSup->pResultRowArrayList, &cell); } else { pResult = *p1; } @@ -580,8 +581,8 @@ static SResultRow* doSetResultOutBufByKey_rv(SResultRowInfo* pResultRowInfo, int pResultRowInfo->pResult[pResultRowInfo->size++] = pResult; int64_t index = pResultRowInfo->curPos; - SET_RES_EXT_WINDOW_KEY(pAggInfo->keyBuf, pData, bytes, tid, pResultRowInfo); - taosHashPut(pAggInfo->pResultRowListSet, pAggInfo->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes), &index, POINTER_BYTES); + SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, tid, pResultRowInfo); + taosHashPut(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes), &index, POINTER_BYTES); } // too many time window in query @@ -933,11 +934,11 @@ static void doApplyFunctions(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, pCtx[k].startTs = pWin->skey; // keep it temporarialy - char* start = pCtx[k].pInput; + char* start = NULL;//pCtx[k].pInput; int32_t pos = (QUERY_IS_ASC_QUERY(pQueryAttr)) ? offset : offset - (forwardStep - 1); if (pCtx[k].pInput != NULL) { - pCtx[k].pInput = (char *)pCtx[k].pInput + pos * pCtx[k].inputBytes; +// pCtx[k].pInput = (char *)pCtx[k].pInput + pos * pCtx[k].inputBytes; } if (tsCol != NULL) { @@ -956,7 +957,7 @@ static void doApplyFunctions(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, // restore it pCtx[k].isAggSet = hasAggregates; - pCtx[k].pInput = start; +// pCtx[k].pInput = start; } } @@ -1152,7 +1153,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pCtx[i].columnIndex); // in case of the block distribution query, the inputBytes is not a constant value. - pCtx[i].pInput = p->pData; + pCtx[i].pInput = p; assert(p->info.colId == pCol->info.colId); if (pCtx[i].functionId < 0) { @@ -1164,14 +1165,14 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, // uint32_t status = aAggs[pCtx[i].functionId].status; // if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) { - SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); +// SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); // In case of the top/bottom query again the nest query result, which has no timestamp column // don't set the ptsList attribute. - if (tsInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) { - pCtx[i].ptsList = (int64_t*) tsInfo->pData; - } else { - pCtx[i].ptsList = NULL; - } +// if (tsInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) { +// pCtx[i].ptsList = (int64_t*) tsInfo->pData; +// } else { +// pCtx[i].ptsList = NULL; +// } // } // } else if (TSDB_COL_IS_UD_COL(pCol->flag) && (pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) { // SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo; @@ -2341,29 +2342,6 @@ static bool isCachedLastQuery(STaskAttr *pQueryAttr) { return true; } - - -/** - * The following 4 kinds of query are treated as the tags query - * tagprj, tid_tag query, count(tbname), 'abc' (user defined constant value column) query - */ -bool onlyQueryTags(STaskAttr* pQueryAttr) { - for(int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { - SExprInfo* pExprInfo = &pQueryAttr->pExpr1[i]; - - int32_t functionId = getExprFunctionId(pExprInfo); - - if (functionId != FUNCTION_TAGPRJ && - functionId != FUNCTION_TID_TAG && - (!(functionId == FUNCTION_COUNT && pExprInfo->base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX)) && - (!(functionId == FUNCTION_PRJ && TSDB_COL_IS_UD_COL(pExprInfo->base.pColumns->flag)))) { - return false; - } - } - - return true; -} - ///////////////////////////////////////////////////////////////////////////////////////////// void getAlignQueryTimeWindow(STaskAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win) { @@ -2864,8 +2842,6 @@ void filterColRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock, tfree(p); } - - static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId); static void doSetTagValueInParam(void* pTable, int32_t tagColId, SVariant *tag, int16_t type, int16_t bytes); @@ -3380,10 +3356,8 @@ void setDefaultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, in initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols); } - -void setDefaultOutputBuf_rv(SAggOperatorInfo* pAggInfo, int32_t stage, SExecTaskInfo* pTaskInfo) { - SOptrBasicInfo *pInfo = &pAggInfo->binfo; - +// TODO refactor: some function move away +void setDefaultOutputBuf_rv(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, SExecTaskInfo* pTaskInfo) { SqlFunctionCtx* pCtx = pInfo->pCtx; SSDataBlock* pDataBlock = pInfo->pRes; int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset; @@ -3391,9 +3365,7 @@ void setDefaultOutputBuf_rv(SAggOperatorInfo* pAggInfo, int32_t stage, SExecTask int64_t tid = 0; int64_t groupId = 0; - - pAggInfo->keyBuf = realloc(pAggInfo->keyBuf, sizeof(tid) + sizeof(int64_t) + POINTER_BYTES); - SResultRow* pRow = doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pAggInfo); + SResultRow* pRow = doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pSup); for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i); @@ -3606,7 +3578,7 @@ STableQueryInfo *createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow // set more initial size of interval/groupby query // if (/*QUERY_IS_INTERVAL_QUERY(pQueryAttr) || */groupbyColumn) { int32_t initialSize = 128; - int32_t code = initResultRowInfo(&pTableQueryInfo->resInfo, initialSize, TSDB_DATA_TYPE_INT); + int32_t code = initResultRowInfo(&pTableQueryInfo->resInfo, initialSize); if (code != TSDB_CODE_SUCCESS) { return NULL; } @@ -3624,7 +3596,7 @@ STableQueryInfo* createTmpTableQueryInfo(STimeWindow win) { // set more initial size of interval/groupby query int32_t initialSize = 16; - int32_t code = initResultRowInfo(&pTableQueryInfo->resInfo, initialSize, TSDB_DATA_TYPE_INT); + int32_t code = initResultRowInfo(&pTableQueryInfo->resInfo, initialSize); if (code != TSDB_CODE_SUCCESS) { tfree(pTableQueryInfo); return NULL; @@ -3717,7 +3689,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, i int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset; SResultRow* pResultRow = - doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), true, uid, pTaskInfo, false, pAggInfo); + doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), true, uid, pTaskInfo, false, &pAggInfo->aggSup); assert (pResultRow != NULL); /* @@ -4521,13 +4493,19 @@ void queryCostStatis(SExecTaskInfo *pTaskInfo) { // return true; //} -void appendDownstream(SOperatorInfo* p, SOperatorInfo* pDownstream) { +int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) { if (p->pDownstream == NULL) { assert(p->numOfDownstream == 0); } - p->pDownstream = realloc(p->pDownstream, POINTER_BYTES * (p->numOfDownstream + 1)); - p->pDownstream[p->numOfDownstream++] = pDownstream; + p->pDownstream = calloc(1, num * POINTER_BYTES); + if (p->pDownstream == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + memcpy(p->pDownstream, pDownstream, num * POINTER_BYTES); + p->numOfDownstream = num; + return TSDB_CODE_SUCCESS; } static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo); @@ -5599,11 +5577,20 @@ SArray* getResultGroupCheckColumns(STaskAttr* pQuery) { return pOrderColumns; } +static int32_t initAggSup(SAggSupporter* pAggSup, SArray* pExprInfo); +static void clearupAggSup(SAggSupporter* pAggSup); + static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*) param; taosArrayDestroy(pInfo->orderInfo); - destroySortHandle(pInfo->pSortHandle); + taosArrayDestroy(pInfo->groupInfo); + + if (pInfo->pSortHandle != NULL) { + destroySortHandle(pInfo->pSortHandle); + } blockDataDestroy(pInfo->binfo.pRes); + + clearupAggSup(&pInfo->aggSup); } static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { @@ -5613,11 +5600,12 @@ static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { tfree(pInfo->prevRow); } -static SExprInfo* exprArrayDup(SArray* pExprInfo) { - size_t numOfOutput = taosArrayGetSize(pExprInfo); +static SExprInfo* exprArrayDup(SArray* pExprList) { + size_t numOfOutput = taosArrayGetSize(pExprList); + SExprInfo* p = calloc(numOfOutput, sizeof(SExprInfo)); - for (int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) { - SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); + for (int32_t i = 0; i < numOfOutput; ++i) { + SExprInfo* pExpr = taosArrayGetP(pExprList, i); assignExprInfo(&p[i], pExpr); } @@ -5666,6 +5654,173 @@ SSDataBlock* loadNextDataBlock(void* param) { return pOperator->exec(pOperator, &newgroup); } +static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char **buf, int32_t rowIndex) { + size_t size = taosArrayGetSize(groupInfo); + if (size == 0) { + return true; + } + + for (int32_t i = 0; i < size; ++i) { + int32_t* index = taosArrayGet(groupInfo, i); + + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index); + bool isNull = colDataIsNull(pColInfo, rowIndex, pBlock->info.rows, NULL); + + if ((isNull && buf[i] != NULL) || (!isNull && buf[i] == NULL)) { + return false; + } + + char* pCell = colDataGetData(pColInfo, rowIndex); + if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { + if (varDataLen(pCell) != varDataLen(buf[i])) { + return false; + } else { + if (memcmp(varDataVal(pCell), varDataVal(buf[i]), varDataLen(pCell)) != 0) { + return false; + } + } + } else { + if (memcmp(pCell, buf[i], pColInfo->info.bytes) != 0) { + return false; + } + } + } + + return 0; +} + +static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx *pCtx, int32_t numOfExpr, int32_t rowIndex) { + for (int32_t j = 0; j < numOfExpr; ++j) { // TODO set row index + pCtx[j].startRow = rowIndex; + } + + for (int32_t j = 0; j < numOfExpr; ++j) { + int32_t functionId = pCtx[j].functionId; + pCtx[j].fpSet->addInput(&pCtx[j]); + +// if (functionId < 0) { +// SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); +// doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); +// } else { +// assert(!TSDB_FUNC_IS_SCALAR(functionId)); +// aAggs[functionId].mergeFunc(&pCtx[j]); +// } + } +} + +static void doFinalizeResultImpl(SqlFunctionCtx *pCtx, int32_t numOfExpr) { + for(int32_t j = 0; j < numOfExpr; ++j) { + int32_t functionId = pCtx[j].functionId; + // if (functionId == FUNC_TAG_DUMMY || functionId == FUNC_TS_DUMMY) { + // continue; + // } + + // if (functionId < 0) { + // SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); + // doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); + // } else { + pCtx[j].fpSet->addInput(&pCtx[j]); + } +} + +static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex) { + int32_t size = (int32_t) taosArrayGetSize(pColumnList); + + for(int32_t i = 0; i < size; ++i) { + int32_t* index = taosArrayGet(pColumnList, i); + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, *index); + + char* data = colDataGetData(pColInfo, rowIndex); + memcpy(rowColData[i], data, colDataGetLength(pColInfo, rowIndex)); + } + + return true; +} + +static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) { + SSortedMergeOperatorInfo* pInfo = pOperator->info; + + SqlFunctionCtx* pCtx = pInfo->binfo.pCtx; + for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + pCtx[i].size = 1; + } + + for(int32_t i = 0; i < pBlock->info.rows; ++i) { + if (!pInfo->hasGroupVal) { + ASSERT(i == 0); + doMergeResultImpl(pInfo, pCtx, numOfExpr, i); + pInfo->hasGroupVal = saveCurrentTuple(pInfo->groupVal, pInfo->groupInfo, pBlock, i); + } else { + if (needToMerge(pBlock, pInfo->groupInfo, pInfo->groupVal, i)) { + doMergeResultImpl(pInfo, pCtx, numOfExpr, i); + } else { + doFinalizeResultImpl(pCtx, numOfExpr); + int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfOutput); + // setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); + + // TODO check for available buffer; + + // next group info data + pInfo->binfo.pRes->info.rows += numOfRows; + for (int32_t j = 0; j < numOfExpr; ++j) { + if (pCtx[j].functionId < 0) { + continue; + } + + pCtx[j].fpSet->addInput(&pCtx[j]); + } + + doMergeResultImpl(pInfo, pCtx, numOfExpr, i); + pInfo->hasGroupVal = saveCurrentTuple(pInfo->groupVal, pInfo->groupInfo, pBlock, i); + } + } + } +} + +static SSDataBlock* doMerge(SOperatorInfo* pOperator) { + SSortedMergeOperatorInfo* pInfo = pOperator->info; + SSortHandle* pHandle = pInfo->pSortHandle; + + SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes); + blockDataEnsureCapacity(pDataBlock, pInfo->binfo.capacity); + + while(1) { + + blockDataClearup(pDataBlock, pInfo->hasVarCol); + while (1) { + STupleHandle* pTupleHandle = sortNextTuple(pHandle); + if (pTupleHandle == NULL) { + break; + } + + // build datablock for merge for one group + appendOneRowToDataBlock(pDataBlock, pTupleHandle); + if (pDataBlock->info.rows >= pInfo->binfo.capacity) { + break; + } + } + + if (pDataBlock->info.rows == 0) { + break; + } + + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC); + // updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, pOperator->pRuntimeEnv, true); + doMergeImpl(pOperator, pOperator->numOfOutput, pDataBlock); + // flush to tuple store, and after all data have been handled, return to upstream node or sink node + } + + doFinalizeResultImpl(pInfo->binfo.pCtx, pOperator->numOfOutput); + int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfOutput); + // setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); + + // TODO check for available buffer; + + // next group info data + pInfo->binfo.pRes->info.rows += numOfRows; + return (pInfo->binfo.pRes->info.rows > 0)? pInfo->binfo.pRes:NULL; +} + static SSDataBlock* doSortedMerge(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { @@ -5675,7 +5830,7 @@ static SSDataBlock* doSortedMerge(void* param, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSortedMergeOperatorInfo* pInfo = pOperator->info; if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->hasVarCol, pInfo->numOfRowsInRes); + return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->hasVarCol, pInfo->binfo.capacity); } SSchema* p = blockDataExtractSchema(pInfo->binfo.pRes, NULL); @@ -5698,7 +5853,7 @@ static SSDataBlock* doSortedMerge(void* param, bool* newgroup) { } pOperator->status = OP_RES_TO_RETURN; - return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->hasVarCol, pInfo->numOfRowsInRes); + return doMerge(pOperator); } static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) { @@ -5724,29 +5879,88 @@ static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) { return pOrderInfo; } -SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SExecTaskInfo* pTaskInfo) { +static int32_t initGroupCol(SArray* pExprInfo, SArray* pGroupInfo, SSortedMergeOperatorInfo* pInfo) { + if (pGroupInfo == NULL || taosArrayGetSize(pGroupInfo) == 0) { + return 0; + } + + int32_t len = 0; + SArray* plist = taosArrayInit(3, sizeof(SColumn)); + pInfo->groupInfo = taosArrayInit(3, sizeof(int32_t)); + + if (plist == NULL || pInfo->groupInfo == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + size_t numOfGroupCol = taosArrayGetSize(pInfo->groupInfo); + for(int32_t i = 0; i < numOfGroupCol; ++i) { + SColumn* pCol = taosArrayGet(pGroupInfo, i); + for(int32_t j = 0; j < taosArrayGetSize(pExprInfo); ++j) { + SExprInfo* pe = taosArrayGet(pExprInfo, j); + if (pe->base.resSchema.colId == pCol->info.colId) { + taosArrayPush(plist, pCol); + taosArrayPush(pInfo->groupInfo, &j); + len += pCol->info.bytes; + break; + } + } + } + + ASSERT(taosArrayGetSize(pGroupInfo) == taosArrayGetSize(plist)); + + pInfo->groupVal = calloc(1, (POINTER_BYTES * numOfGroupCol + len)); + if (pInfo->groupVal == NULL) { + taosArrayDestroy(plist); + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t offset = 0; + char *start = (char*)(pInfo->groupVal + (POINTER_BYTES * numOfGroupCol)); + for(int32_t i = 0; i < numOfGroupCol; ++i) { + pInfo->groupVal[i] = start + offset; + SColumn* pCol = taosArrayGet(plist, i); + offset += pCol->info.bytes; + } + + taosArrayDestroy(plist); + + return TSDB_CODE_SUCCESS; +} + +SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo) { SSortedMergeOperatorInfo* pInfo = calloc(1, sizeof(SSortedMergeOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { - tfree(pInfo); - tfree(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; + goto _error; } int32_t numOfOutput = taosArrayGetSize(pExprInfo); - pInfo->binfo.capacity = 4096; - pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize); + pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize); + pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity); + initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); + + if (pInfo->binfo.pCtx == NULL || pInfo->binfo.pRes == NULL) { + goto _error; + } -// pInfo->resultRowFactor = -// (int32_t)(getRowNumForMultioutput(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery, false)); + int32_t code = initAggSup(&pInfo->aggSup, pExprInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + setDefaultOutputBuf_rv(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); + code = initGroupCol(pExprInfo, pGroupInfo, pInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + +// pInfo->resultRowFactor = (int32_t)(getRowNumForMultioutput(pRuntimeEnv->pQueryAttr, +// pRuntimeEnv->pQueryAttr->topBotQuery, false)); pInfo->sortBufSize = 1024 * 16; // 1MB pInfo->bufPageSize = 1024; - pInfo->numOfRowsInRes = 1024; - pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->numOfRowsInRes); pInfo->orderInfo = createBlockOrder(pExprInfo, pOrderVal); - int32_t numOfRows = 1; + pInfo->binfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize); pOperator->name = "SortedMerge"; pOperator->operatorType = OP_SortedMerge; @@ -5754,16 +5968,28 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = numOfOutput; + pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->pTaskInfo = pTaskInfo; pOperator->exec = doSortedMerge; pOperator->cleanupFn = destroySortedMergeOperatorInfo; - for(int32_t i = 0; i < numOfDownstream; ++i) { - appendDownstream(pOperator, downstream[i]); + code = appendDownstream(pOperator, downstream, numOfDownstream); + if (code != TSDB_CODE_SUCCESS) { + goto _error; } return pOperator; + + _error: + if (pInfo != NULL) { + destroySortedMergeOperatorInfo(pInfo, numOfOutput); + } + + tfree(pInfo); + tfree(pOperator); + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; } static SSDataBlock* doSort(void* param, bool* newgroup) { @@ -5844,7 +6070,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI pOperator->exec = doSort; pOperator->cleanupFn = destroyOrderOperatorInfo; - appendDownstream(pOperator, downstream); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -6761,18 +6987,37 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { tfree(pOperator); } +static int32_t initAggSup(SAggSupporter* pAggSup, SArray* pExprInfo) { + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + + pAggSup->keyBuf = calloc(1, sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES); + pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); + pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); + pAggSup->pool = initResultRowPool(getResultRowSize(pExprInfo)); + pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell)); + + if (pAggSup->keyBuf == NULL || pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL || + pAggSup->pResultRowHashTable == NULL || pAggSup->pool == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + return TSDB_CODE_SUCCESS; +} + +static void clearupAggSup(SAggSupporter* pAggSup) { + tfree(pAggSup->keyBuf); + taosHashCleanup(pAggSup->pResultRowHashTable); + taosHashCleanup(pAggSup->pResultRowListSet); + taosArrayDestroy(pAggSup->pResultRowArrayList); + destroyResultRowPool(pAggSup->pool); +} + static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t numOfRows, const STableGroupInfo* pTableGroupInfo) { pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, numOfRows); pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize); pInfo->binfo.capacity = 4096; - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - - pInfo->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); - pInfo->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); - pInfo->pool = initResultRowPool(getResultRowSize(pExprInfo)); - pInfo->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell)); - + initAggSup(&pInfo->aggSup, pExprInfo); pInfo->pTableQueryInfo = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); int32_t index = 0; @@ -6801,7 +7046,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE //(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); initAggInfo(pInfo, pExprInfo, numOfRows, pTableGroupInfo); - setDefaultOutputBuf_rv(pInfo, MAIN_SCAN, pTaskInfo); + setDefaultOutputBuf_rv(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableAggregate"; @@ -6815,7 +7060,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE pOperator->pTaskInfo = pTaskInfo; pOperator->exec = doAggregate; pOperator->cleanupFn = destroyAggOperatorInfo; - appendDownstream(pOperator, downstream); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -6899,7 +7144,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray initAggInfo(pInfo, pExprInfo, numOfRows, pTableGroupInfo); size_t tableGroup = taosArrayGetSize(pTableGroupInfo->pGroupList); - initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT); + initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "MultiTableAggregate"; @@ -6912,7 +7157,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray pOperator->exec = doMultiTableAggregate; pOperator->cleanupFn = destroyAggOperatorInfo; - appendDownstream(pOperator, downstream); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -6927,7 +7172,7 @@ SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pBInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity); pBInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset); - initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + initResultRowInfo(&pBInfo->resultRowInfo, 8); setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MAIN_SCAN); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -6942,7 +7187,7 @@ SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pOperator->exec = doProjectOperation; pOperator->cleanupFn = destroyProjectOperatorInfo; - appendDownstream(pOperator, downstream); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -7000,7 +7245,7 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->cleanupFn = destroyConditionOperatorInfo; - appendDownstream(pOperator, downstream); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -7018,7 +7263,7 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn pOperator->exec = doLimit; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - appendDownstream(pOperator, downstream); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -7028,7 +7273,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOpe pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); - initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + initResultRowInfo(&pInfo->resultRowInfo, 8); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -7043,7 +7288,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOpe pOperator->exec = doIntervalAgg; pOperator->cleanupFn = destroyBasicOperatorInfo; - appendDownstream(pOperator, downstream); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -7053,7 +7298,7 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); - initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + initResultRowInfo(&pInfo->resultRowInfo, 8); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -7068,7 +7313,7 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S pOperator->exec = doAllIntervalAgg; pOperator->cleanupFn = destroyBasicOperatorInfo; - appendDownstream(pOperator, downstream); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -7078,7 +7323,7 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper pInfo->reptScan = false; pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); - initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "StateWindowOperator"; @@ -7092,7 +7337,7 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper pOperator->exec = doStateWindowAgg; pOperator->cleanupFn = destroyStateWindowOperatorInfo; - appendDownstream(pOperator, downstream); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { @@ -7100,7 +7345,7 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); - initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); pInfo->prevTs = INT64_MIN; pInfo->reptScan = false; @@ -7117,7 +7362,7 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pOperator->exec = doSessionWindowAgg; pOperator->cleanupFn = destroySWindowOperatorInfo; - appendDownstream(pOperator, downstream); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -7126,7 +7371,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); - initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + initResultRowInfo(&pInfo->resultRowInfo, 8); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "MultiTableTimeIntervalOperator"; @@ -7141,7 +7386,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim pOperator->exec = doSTableIntervalAgg; pOperator->cleanupFn = destroyBasicOperatorInfo; - appendDownstream(pOperator, downstream); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -7150,7 +7395,7 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun pInfo->pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); - initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + initResultRowInfo(&pInfo->resultRowInfo, 8); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "AllMultiTableTimeIntervalOperator"; @@ -7165,12 +7410,11 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun pOperator->exec = doAllSTableIntervalAgg; pOperator->cleanupFn = destroyBasicOperatorInfo; - appendDownstream(pOperator, downstream); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } - SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo)); pInfo->colIndex = -1; // group by column index @@ -7184,7 +7428,7 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery))); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); - initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "GroupbyAggOperator"; @@ -7198,7 +7442,7 @@ SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator pOperator->exec = hashGroupbyAggregate; pOperator->cleanupFn = destroyGroupbyOperatorInfo; - appendDownstream(pOperator, downstream); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -7237,7 +7481,7 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf pOperator->exec = doFill; pOperator->cleanupFn = destroySFillOperatorInfo; - appendDownstream(pOperator, downstream); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -7285,7 +7529,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->cleanupFn = destroySlimitOperatorInfo; - appendDownstream(pOperator, downstream); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -7583,7 +7827,7 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato pOperator->pExpr = pExpr; pOperator->cleanupFn = destroyDistinctOperatorInfo; - appendDownstream(pOperator, downstream); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } @@ -7963,7 +8207,6 @@ int32_t buildArithmeticExprFromMsg(SExprInfo *pExprInfo, void *pQueryMsg) { return TSDB_CODE_SUCCESS; } - static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SColumnInfo* pTagCols, SExprInfo* pExprs, int32_t numOfOutput, int32_t tagLen, bool superTable) { for (int32_t i = 0; i < numOfOutput; ++i) { int16_t functId = getExprFunctionId(&pExprs[i]); @@ -8127,66 +8370,6 @@ int32_t createQueryFilter(char *data, uint16_t len, SFilterInfo** pFilters) { // return ret; } - -// todo refactor -int32_t createIndirectQueryFuncExprFromMsg(SQueryTableReq* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo, - SSqlExpr** pExpr, SExprInfo* prevExpr, struct SUdfInfo *pUdfInfo) { -// *pExprInfo = NULL; -// int32_t code = TSDB_CODE_SUCCESS; -// -// SExprInfo *pExprs = (SExprInfo *)calloc(numOfOutput, sizeof(SExprInfo)); -// if (pExprs == NULL) { -// return TSDB_CODE_QRY_OUT_OF_MEMORY; -// } -// -// bool isSuperTable = QUERY_IS_STABLE_QUERY(pQueryMsg->queryType); -// -// for (int32_t i = 0; i < numOfOutput; ++i) { -// pExprs[i].base = *pExpr[i]; -// memset(pExprs[i].base.param, 0, sizeof(SVariant) * tListLen(pExprs[i].base.param)); -// -// for (int32_t j = 0; j < pExpr[i]->numOfParams; ++j) { -// taosVariantAssign(&pExprs[i].base.param[j], &pExpr[i]->param[j]); -// } -// -// pExprs[i].base.resSchema.type = 0; -// -// int16_t type = 0; -// int16_t bytes = 0; -// -// // parse the arithmetic expression -// if (pExprs[i].base.functionId == FUNCTION_ARITHM) { -// code = buildArithmeticExprFromMsg(&pExprs[i], pQueryMsg); -// -// if (code != TSDB_CODE_SUCCESS) { -// tfree(pExprs); -// return code; -// } -// -// type = TSDB_DATA_TYPE_DOUBLE; -// bytes = tDataTypes[type].bytes; -// } else { -// int32_t index = pExprs[i].base.colInfo.colIndex; -// assert(prevExpr[index].base.resSchema.colId == pExprs[i].base.pColumns->info.colId); -// -// type = prevExpr[index].base.resSchema.type; -// bytes = prevExpr[index].base.resSchema.bytes; -// } -// -// int32_t param = (int32_t)pExprs[i].base.param[0].i; -// if (getResultDataInfo(type, bytes, functionId, param, &pExprs[i].base.resSchema.type, &pExprs[i].base.resSchema.bytes, -// &pExprs[i].base.interBytes, 0, isSuperTable, pUdfInfo) != TSDB_CODE_SUCCESS) { -// tfree(pExprs); -// return TSDB_CODE_QRY_INVALID_MSG; -// } -// -// assert(isValidDataType(pExprs[i].base.resSchema.type)); -// } -// -// *pExprInfo = pExprs; - return TSDB_CODE_SUCCESS; -} - SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableReq *pQueryMsg, SColIndex *pColIndex, int32_t *code) { if (pQueryMsg->numOfGroupCols == 0) { return NULL; @@ -8595,30 +8778,3 @@ void releaseQueryBuf(size_t numOfTables) { // restore value is not enough buffer available atomic_add_fetch_64(&tsQueryBufferSizeBytes, t); } - -void freeQueryAttr(STaskAttr* pQueryAttr) { - if (pQueryAttr != NULL) { - if (pQueryAttr->fillVal != NULL) { - tfree(pQueryAttr->fillVal); - } - - pQueryAttr->pFilterInfo = doDestroyFilterInfo(pQueryAttr->pFilterInfo, pQueryAttr->numOfFilterCols); - - pQueryAttr->pExpr1 = destroyQueryFuncExpr(pQueryAttr->pExpr1, pQueryAttr->numOfOutput); - pQueryAttr->pExpr2 = destroyQueryFuncExpr(pQueryAttr->pExpr2, pQueryAttr->numOfExpr2); - pQueryAttr->pExpr3 = destroyQueryFuncExpr(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3); - - tfree(pQueryAttr->tagColList); - tfree(pQueryAttr->pFilterInfo); - - pQueryAttr->tableCols = freeColumnInfo(pQueryAttr->tableCols, pQueryAttr->numOfCols); - - if (pQueryAttr->pGroupbyExpr != NULL) { - taosArrayDestroy(pQueryAttr->pGroupbyExpr->columnInfo); - tfree(pQueryAttr->pGroupbyExpr); - } - -// filterFreeInfo(pQueryAttr->pFilters); - } -} - diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 4854f0cbed..89f555517e 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -123,23 +123,6 @@ int32_t sortAddSource(SSortHandle* pSortHandle, void* pSource) { taosArrayPush(pSortHandle->pOrderedSource, &pSource); } -static SSDataBlock* createDataBlock(const SSDataBlock* pDataBlock) { - int32_t numOfCols = pDataBlock->info.numOfCols; - - SSDataBlock* pBlock = calloc(1, sizeof(SSDataBlock)); - pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); - pBlock->info.numOfCols = numOfCols; - - for(int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData colInfo = {0}; - SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); - colInfo.info = p->info; - taosArrayPush(pBlock->pDataBlock, &colInfo); - } - - return pBlock; -} - static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSources, SSDataBlock* pBlock, int32_t* sourceId) { SExternalMemSource* pSource = calloc(1, sizeof(SExternalMemSource)); if (pSource == NULL) { @@ -198,7 +181,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { blockDataClearup(pDataBlock, pHandle->hasVarCol); - SSDataBlock* pBlock = createDataBlock(pDataBlock); + SSDataBlock* pBlock = createOneDataBlock(pDataBlock); int32_t code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId); if (code != TSDB_CODE_SUCCESS) { return code; @@ -263,7 +246,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSou if (isNull) { colDataAppend(pColInfo, pBlock->info.rows, NULL, true); } else { - char* pData = colDataGet(pSrcColInfo, *rowIndex); + char* pData = colDataGetData(pSrcColInfo, *rowIndex); colDataAppend(pColInfo, pBlock->info.rows, pData, false); } } @@ -279,15 +262,14 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa */ if (pSource->src.rowIndex >= pSource->src.pBlock->info.rows) { pSource->src.rowIndex = 0; - pSource->pageIndex += 1; - if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) { - (*numOfCompleted) += 1; - pSource->src.rowIndex = -1; - pSource->pageIndex = -1; - pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock); - } else { - if (pHandle->type == SORT_SINGLESOURCE_SORT) { + if (pHandle->type == SORT_SINGLESOURCE_SORT) { + if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) { + (*numOfCompleted) += 1; + pSource->src.rowIndex = -1; + pSource->pageIndex = -1; + pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock); + } else { SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); SFilePage* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo)); @@ -297,12 +279,12 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa } releaseBufPage(pHandle->pBuf, pPage); - } else { - pSource->src.pBlock = pHandle->fetchfp(((SGenericSource*)pSource)->param); - if (pSource->src.pBlock == NULL) { - (*numOfCompleted) += 1; - pSource->src.rowIndex = -1; - } + } + } else { + pSource->src.pBlock = pHandle->fetchfp(((SGenericSource*)pSource)->param); + if (pSource->src.pBlock == NULL) { + (*numOfCompleted) += 1; + pSource->src.rowIndex = -1; } } } @@ -404,8 +386,8 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { return pParam->nullFirst? -1:1; } - void* left1 = colDataGet(pLeftColInfoData, pLeftSource->src.rowIndex); - void* right1 = colDataGet(pRightColInfoData, pRightSource->src.rowIndex); + void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex); + void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex); switch(pLeftColInfoData->info.type) { case TSDB_DATA_TYPE_INT: { @@ -499,7 +481,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { tMergeTreeDestroy(pHandle->pMergeTree); pHandle->numOfCompletedSources = 0; - SSDataBlock* pBlock = createDataBlock(pHandle->pDataBlock); + SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock); code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId); if (code != 0) { return code; @@ -545,7 +527,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { } if (pHandle->pDataBlock == NULL) { - pHandle->pDataBlock = createDataBlock(pBlock); + pHandle->pDataBlock = createOneDataBlock(pBlock); } int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock); @@ -646,6 +628,7 @@ STupleHandle* sortNextTuple(SSortHandle* pHandle) { return NULL; } + // All the data are hold in the buffer, no external sort is invoked. if (pHandle->inMemSort) { pHandle->tupleHandle.rowIndex += 1; if (pHandle->tupleHandle.rowIndex == pHandle->pDataBlock->info.rows) { @@ -671,6 +654,7 @@ STupleHandle* sortNextTuple(SSortHandle* pHandle) { return NULL; } + // Get the adjusted value after the loser tree is updated. index = tMergeTreeGetChosenIndex(pHandle->pMergeTree); pSource = pHandle->cmpParam.pSources[index]; @@ -691,5 +675,5 @@ bool sortIsValueNull(STupleHandle* pVHandle, int32_t colIndex) { void* sortGetValue(STupleHandle* pVHandle, int32_t colIndex) { SColumnInfoData* pColInfo = TARRAY_GET_ELEM(pVHandle->pBlock->pDataBlock, colIndex); - return colDataGet(pColInfo, pVHandle->rowIndex); + return colDataGetData(pColInfo, pVHandle->rowIndex); } diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 455e731c05..bf3f76b94e 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -42,22 +42,21 @@ enum { }; typedef struct SDummyInputInfo { - int32_t max; - int32_t current; - int32_t startVal; - int32_t type; + int32_t totalPages; // numOfPages + int32_t current; + int32_t startVal; + int32_t type; + int32_t numOfRowsPerPage; SSDataBlock* pBlock; } SDummyInputInfo; SSDataBlock* getDummyBlock(void* param, bool* newgroup) { SOperatorInfo* pOperator = static_cast(param); SDummyInputInfo* pInfo = static_cast(pOperator->info); - if (pInfo->current >= pInfo->max) { + if (pInfo->current >= pInfo->totalPages) { return NULL; } - int32_t numOfRows = 1000; - if (pInfo->pBlock == NULL) { pInfo->pBlock = static_cast(calloc(1, sizeof(SSDataBlock))); @@ -67,8 +66,8 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) { colInfo.info.type = TSDB_DATA_TYPE_INT; colInfo.info.bytes = sizeof(int32_t); colInfo.info.colId = 1; - colInfo.pData = static_cast(calloc(numOfRows, sizeof(int32_t))); - colInfo.nullbitmap = static_cast(calloc(1, (numOfRows + 7) / 8)); + colInfo.pData = static_cast(calloc(pInfo->numOfRowsPerPage, sizeof(int32_t))); + colInfo.nullbitmap = static_cast(calloc(1, (pInfo->numOfRowsPerPage + 7) / 8)); taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo); @@ -91,7 +90,7 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) { char buf[128] = {0}; char b1[128] = {0}; int32_t v = 0; - for(int32_t i = 0; i < numOfRows; ++i) { + for(int32_t i = 0; i < pInfo->numOfRowsPerPage; ++i) { SColumnInfoData* pColInfo = static_cast(TARRAY_GET_ELEM(pBlock->pDataBlock, 0)); if (pInfo->type == data_desc) { @@ -111,22 +110,23 @@ SSDataBlock* getDummyBlock(void* param, bool* newgroup) { // colDataAppend(pColInfo2, i, b1, false); } - pBlock->info.rows = numOfRows; + pBlock->info.rows = pInfo->numOfRowsPerPage; pBlock->info.numOfCols = 1; pInfo->current += 1; return pBlock; } -SOperatorInfo* createDummyOperator(int32_t numOfBlocks, int32_t type) { +SOperatorInfo* createDummyOperator(int32_t startVal, int32_t numOfBlocks, int32_t rowsPerPage, int32_t type) { SOperatorInfo* pOperator = static_cast(calloc(1, sizeof(SOperatorInfo))); pOperator->name = "dummyInputOpertor4Test"; pOperator->exec = getDummyBlock; SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo)); - pInfo->max = numOfBlocks; - pInfo->startVal = 1500000; - pInfo->type = type; + pInfo->totalPages = numOfBlocks; + pInfo->startVal = startVal; + pInfo->numOfRowsPerPage = rowsPerPage; + pInfo->type = type; pOperator->info = pInfo; return pOperator; @@ -257,7 +257,7 @@ TEST(testCase, inMem_sort_Test) { SColumnInfoData* pCol1 = static_cast(taosArrayGet(pRes->pDataBlock, 0)); SColumnInfoData* pCol2 = static_cast(taosArrayGet(pRes->pDataBlock, 1)); for(int32_t i = 0; i < pRes->info.rows; ++i) { - char* p = colDataGet(pCol2, i); + char* p = colDataGetData(pCol2, i); printf("%d: %d, %s\n", i, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p)); } } @@ -341,7 +341,7 @@ TEST(testCase, external_sort_Test) { SColumnInfoData* pCol1 = static_cast(taosArrayGet(pRes->pDataBlock, 0)); // SColumnInfoData* pCol2 = static_cast(taosArrayGet(pRes->pDataBlock, 1)); for (int32_t i = 0; i < pRes->info.rows; ++i) { -// char* p = colDataGet(pCol2, i); +// char* p = colDataGetData(pCol2, i); printf("%d: %d\n", total++, ((int32_t*)pCol1->pData)[i]); // printf("%d: %d, %s\n", total++, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p)); } @@ -357,6 +357,7 @@ TEST(testCase, external_sort_Test) { taosArrayDestroy(pOrderVal); } +#endif TEST(testCase, sorted_merge_Test) { srand(time(NULL)); @@ -370,7 +371,12 @@ TEST(testCase, sorted_merge_Test) { SArray* pExprInfo = taosArrayInit(4, sizeof(SExprInfo)); SExprInfo *exp = static_cast(calloc(1, sizeof(SExprInfo))); - exp->base.resSchema = createSchema(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1, "res"); + exp->base.resSchema = createSchema(TSDB_DATA_TYPE_BIGINT, sizeof(int64_t), 1, "count_result"); + exp->base.pColumns = static_cast(calloc(1, sizeof(SColumn))); + exp->base.pColumns->flag = TSDB_COL_NORMAL; + exp->base.pColumns->info = (SColumnInfo) {.colId = 1, .type = TSDB_DATA_TYPE_INT, .bytes = 4}; + exp->base.numOfCols = 1; + taosArrayPush(pExprInfo, &exp); SExprInfo *exp1 = static_cast(calloc(1, sizeof(SExprInfo))); @@ -380,10 +386,10 @@ TEST(testCase, sorted_merge_Test) { int32_t numOfSources = 10; SOperatorInfo** plist = (SOperatorInfo**) calloc(numOfSources, sizeof(void*)); for(int32_t i = 0; i < numOfSources; ++i) { - plist[i] = createDummyOperator(1, data_asc); + plist[i] = createDummyOperator(1, 1, 1, data_asc); } - SOperatorInfo* pOperator = createSortedMergeOperatorInfo(plist, numOfSources, pExprInfo, pOrderVal, NULL); + SOperatorInfo* pOperator = createSortedMergeOperatorInfo(plist, numOfSources, pExprInfo, pOrderVal, NULL, NULL); bool newgroup = false; SSDataBlock* pRes = NULL; @@ -409,8 +415,8 @@ TEST(testCase, sorted_merge_Test) { SColumnInfoData* pCol1 = static_cast(taosArrayGet(pRes->pDataBlock, 0)); // SColumnInfoData* pCol2 = static_cast(taosArrayGet(pRes->pDataBlock, 1)); for (int32_t i = 0; i < pRes->info.rows; ++i) { -// char* p = colDataGet(pCol2, i); - printf("%d: %d\n", total++, ((int32_t*)pCol1->pData)[i]); +// char* p = colDataGetData(pCol2, i); + printf("%d: %ld\n", total++, ((int64_t*)pCol1->pData)[i]); // printf("%d: %d, %s\n", total++, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p)); } } @@ -424,5 +430,4 @@ TEST(testCase, sorted_merge_Test) { taosArrayDestroy(pExprInfo); taosArrayDestroy(pOrderVal); } -#endif #pragma GCC diagnostic pop diff --git a/source/libs/executor/test/executorUtilTests.cpp b/source/libs/executor/test/executorUtilTests.cpp index 754d4b6b80..6a023170b2 100644 --- a/source/libs/executor/test/executorUtilTests.cpp +++ b/source/libs/executor/test/executorUtilTests.cpp @@ -123,8 +123,8 @@ int32_t docomp(const void* p1, const void* p2, void* param) { return pParam->nullFirst? -1:1; } - void* left1 = colDataGet(pLeftColInfoData, pLeftSource->src.rowIndex); - void* right1 = colDataGet(pRightColInfoData, pRightSource->src.rowIndex); + void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex); + void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex); switch(pLeftColInfoData->info.type) { case TSDB_DATA_TYPE_INT: { @@ -148,48 +148,15 @@ int32_t docomp(const void* p1, const void* p2, void* param) { } } // namespace -//TEST(testCase, inMem_sort_Test) { -// SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); -// SOrder o = {.order = TSDB_ORDER_ASC}; -// o.col.info.colId = 1; -// o.col.info.type = TSDB_DATA_TYPE_INT; -// taosArrayPush(pOrderVal, &o); -// -// int32_t numOfRows = 1000; -// SBlockOrderInfo oi = {0}; -// oi.order = TSDB_ORDER_ASC; -// oi.colIndex = 0; -// SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); -// taosArrayPush(orderInfo, &oi); -// -// SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE_SORT, 1024, 5, "test_abc"); -// setFetchRawDataFp(phandle, getSingleColDummyBlock); -// sortAddSource(phandle, &numOfRows); -// -// int32_t code = sortOpen(phandle); -// int32_t row = 1; -// -// while(1) { -// STupleHandle* pTupleHandle = sortNextTuple(phandle); -// if (pTupleHandle == NULL) { -// break; -// } -// -// void* v = sortGetValue(pTupleHandle, 0); -// printf("%d: %d\n", row++, *(int32_t*) v); -// -// } -// destroySortHandle(phandle); -//} -// -TEST(testCase, external_mem_sort_Test) { +#if 0 +TEST(testCase, inMem_sort_Test) { SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); SOrder o = {.order = TSDB_ORDER_ASC}; o.col.info.colId = 1; o.col.info.type = TSDB_DATA_TYPE_INT; taosArrayPush(pOrderVal, &o); -// int32_t numOfRows = 1000; + int32_t numOfRows = 1000; SBlockOrderInfo oi = {0}; oi.order = TSDB_ORDER_ASC; oi.colIndex = 0; @@ -197,7 +164,40 @@ TEST(testCase, external_mem_sort_Test) { taosArrayPush(orderInfo, &oi); SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4, }; + SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE_SORT, 1024, 5, &s, 1, "test_abc"); + setFetchRawDataFp(phandle, getSingleColDummyBlock); + sortAddSource(phandle, &numOfRows); + int32_t code = sortOpen(phandle); + int32_t row = 1; + + while(1) { + STupleHandle* pTupleHandle = sortNextTuple(phandle); + if (pTupleHandle == NULL) { + break; + } + + void* v = sortGetValue(pTupleHandle, 0); + printf("%d: %d\n", row++, *(int32_t*) v); + + } + destroySortHandle(phandle); +} + +TEST(testCase, external_mem_sort_Test) { + SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); + SOrder o = {.order = TSDB_ORDER_ASC}; + o.col.info.colId = 1; + o.col.info.type = TSDB_DATA_TYPE_INT; + taosArrayPush(pOrderVal, &o); + + SBlockOrderInfo oi = {0}; + oi.order = TSDB_ORDER_ASC; + oi.colIndex = 0; + SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); + taosArrayPush(orderInfo, &oi); + + SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4, }; SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE_SORT, 1024, 5, &s, 1, "test_abc"); setFetchRawDataFp(phandle, getSingleColDummyBlock); @@ -227,50 +227,52 @@ TEST(testCase, external_mem_sort_Test) { destroySortHandle(phandle); } -//TEST(testCase, ordered_merge_sort_Test) { -// SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); -// SOrder o = {.order = TSDB_ORDER_ASC}; -// o.col.info.colId = 1; -// o.col.info.type = TSDB_DATA_TYPE_INT; -// taosArrayPush(pOrderVal, &o); -// -// int32_t numOfRows = 1000; -// SBlockOrderInfo oi = {0}; -// oi.order = TSDB_ORDER_ASC; -// oi.colIndex = 0; -// SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); -// taosArrayPush(orderInfo, &oi); -// -// SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4}; -// SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_MULTISOURCE_MERGE, 1024, 5, &s, 1,"test_abc"); -// setFetchRawDataFp(phandle, getSingleColDummyBlock); -// setComparFn(phandle, docomp); -// -// for(int32_t i = 0; i < 10; ++i) { -// SOperatorSource* p = static_cast(calloc(1, sizeof(SOperatorSource))); -// _info* c = static_cast<_info*>(calloc(1, sizeof(_info))); -// c->count = 1; -// c->pageRows = 1000; -// c->startVal = 0; -// -// p->param = c; -// sortAddSource(phandle, p); -// } -// -// int32_t code = sortOpen(phandle); -// int32_t row = 1; -// -// while(1) { -// STupleHandle* pTupleHandle = sortNextTuple(phandle); -// if (pTupleHandle == NULL) { -// break; -// } -// -// void* v = sortGetValue(pTupleHandle, 0); -// printf("%d: %d\n", row++, *(int32_t*) v); -// -// } -// destroySortHandle(phandle); -//} +TEST(testCase, ordered_merge_sort_Test) { + SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); + SOrder o = {.order = TSDB_ORDER_ASC}; + o.col.info.colId = 1; + o.col.info.type = TSDB_DATA_TYPE_INT; + taosArrayPush(pOrderVal, &o); + + int32_t numOfRows = 1000; + SBlockOrderInfo oi = {0}; + oi.order = TSDB_ORDER_ASC; + oi.colIndex = 0; + SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); + taosArrayPush(orderInfo, &oi); + + SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4}; + SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_MULTISOURCE_MERGE, 1024, 5, &s, 1,"test_abc"); + setFetchRawDataFp(phandle, getSingleColDummyBlock); + setComparFn(phandle, docomp); + + for(int32_t i = 0; i < 10; ++i) { + SGenericSource* p = static_cast(calloc(1, sizeof(SGenericSource))); + _info* c = static_cast<_info*>(calloc(1, sizeof(_info))); + c->count = 1; + c->pageRows = 1000; + c->startVal = 0; + + p->param = c; + sortAddSource(phandle, p); + } + + int32_t code = sortOpen(phandle); + int32_t row = 1; + + while(1) { + STupleHandle* pTupleHandle = sortNextTuple(phandle); + if (pTupleHandle == NULL) { + break; + } + + void* v = sortGetValue(pTupleHandle, 0); + printf("%d: %d\n", row++, *(int32_t*) v); + + } + destroySortHandle(phandle); +} + +#endif #pragma GCC diagnostic pop diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index d0d89611c4..0615ff9627 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -30,9 +30,10 @@ #include "tcompression.h" //#include "queryLog.h" #include "tudf.h" +#include "tep.h" #define GET_INPUT_DATA_LIST(x) ((char *)((x)->pInput)) -#define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes) +#define GET_INPUT_DATA(x, y) ((char*) colDataGetData((x)->pInput, (y))) #define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList)) #define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)]) @@ -3817,7 +3818,7 @@ static void interp_function_impl(SqlFunctionCtx *pCtx) { skey = ekey; } } - assignVal(pCtx->pOutput, pCtx->pInput, pCtx->resDataInfo.bytes, pCtx->inputType); +// assignVal(pCtx->pOutput, pCtx->pInput, pCtx->resDataInfo.bytes, pCtx->inputType); } else if (type == TSDB_FILL_NEXT) { TSKEY ekey = skey; char* val = NULL; diff --git a/source/libs/parser/src/queryInfoUtil.c b/source/libs/parser/src/queryInfoUtil.c index 9a2ca2da98..9b355b0775 100644 --- a/source/libs/parser/src/queryInfoUtil.c +++ b/source/libs/parser/src/queryInfoUtil.c @@ -230,7 +230,7 @@ int32_t getExprFunctionId(SExprInfo *pExprInfo) { } void assignExprInfo(SExprInfo* dst, const SExprInfo* src) { - assert(dst != NULL && src != NULL); + assert(dst != NULL && src != NULL && src->base.numOfCols > 0); *dst = *src; #if 0 -- GitLab