diff --git a/include/common/tcommon.h b/include/common/tcommon.h index d0ec5c92960b3b2515c44a41b91372e05914af22..ea09c62819c7a99f2e13eb591e3290b8e5bc9332 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -56,8 +56,9 @@ typedef struct SColumnDataAgg { typedef struct SDataBlockInfo { STimeWindow window; int32_t rows; + int32_t tupleSize; int32_t numOfCols; - int64_t uid; + union {int64_t uid; int64_t blockId;}; } SDataBlockInfo; typedef struct SConstantItem { @@ -108,7 +109,7 @@ static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlo SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); tlen += taosEncodeFixedI16(buf, pColData->info.colId); tlen += taosEncodeFixedI16(buf, pColData->info.type); - tlen += taosEncodeFixedI16(buf, pColData->info.bytes); + tlen += taosEncodeFixedI32(buf, pColData->info.bytes); int32_t colSz = rows * pColData->info.bytes; tlen += taosEncodeBinary(buf, pColData->pData, colSz); } @@ -127,7 +128,7 @@ static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) SColumnInfoData data = {0}; buf = taosDecodeFixedI16(buf, &data.info.colId); buf = taosDecodeFixedI16(buf, &data.info.type); - buf = taosDecodeFixedI16(buf, &data.info.bytes); + buf = taosDecodeFixedI32(buf, &data.info.bytes); int32_t colSz = pBlock->info.rows * data.info.bytes; buf = taosDecodeBinary(buf, (void**)&data.pData, colSz); taosArrayPush(pBlock->pDataBlock, &data); @@ -212,10 +213,23 @@ static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqConsumeRsp* pRsp) { //====================================================================================================================== // the following structure shared by parser and executor typedef struct SColumn { - uint64_t uid; - char name[TSDB_COL_NAME_LEN]; - int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string) - SColumnInfo info; + union { + uint64_t uid; + int64_t dataBlockId; + }; + + char name[TSDB_COL_NAME_LEN]; + int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string) + union { + int16_t colId; + int16_t slotId; + }; + + int16_t type; + int32_t bytes; + uint8_t precision; + uint8_t scale; + // SColumnInfo info; } SColumn; typedef struct SLimit { @@ -233,21 +247,24 @@ typedef struct SGroupbyExpr { bool groupbyTag; // group by tag or column } SGroupbyExpr; -// the structure for sql function in select clause -typedef struct SSqlExpr { - char token[TSDB_COL_NAME_LEN]; // original token - SSchema resSchema; +typedef struct SFunctParam { + int32_t type; + SColumn *pCol; + SVariant param; +} SFunctParam; - int32_t numOfCols; - SColumn* pColumns; // data columns that are required by query - int32_t interBytes; // inter result buffer size - int16_t numOfParams; // argument value of each function - SVariant param[3]; // parameters are not more than 3 -} SSqlExpr; +// the structure for sql function in select clause +typedef struct SExprBasicInfo { + SSchema resSchema; // TODO refactor + int32_t interBytes; // inter result buffer size, TODO remove it + int16_t numOfParams; // argument value of each function + SFunctParam *pParam; +// SVariant param[3]; // parameters are not more than 3 +} SExprBasicInfo; typedef struct SExprInfo { - struct SSqlExpr base; - struct tExprNode* pExpr; + struct SExprBasicInfo base; + struct tExprNode *pExpr; } SExprInfo; typedef struct SStateWindow { diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 602df8ca53c7eab4e7881598286b27a934a675dc..c2249f408aeb9eadb2b6ecac577e4d8b9a0a1508 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -94,8 +94,8 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock); int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows); void colDataTrim(SColumnInfoData* pColumnInfoData); -size_t colDataGetNumOfCols(const SSDataBlock* pBlock); -size_t colDataGetNumOfRows(const SSDataBlock* pBlock); +size_t blockDataGetNumOfCols(const SSDataBlock* pBlock); +size_t blockDataGetNumOfRows(const SSDataBlock* pBlock); int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc); int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f9794218be2ff1d861823859c13ef2116513ce98..ef36285ddc53506c5e65d181212443e17b096f36 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -424,9 +424,10 @@ typedef struct { int16_t slotId; }; - int16_t type; - int16_t bytes; - SColumnFilterList flist; + int16_t type; + int32_t bytes; + uint8_t precision; + uint8_t scale; } SColumnInfo; typedef struct { @@ -456,57 +457,6 @@ typedef struct { int64_t offset; } SInterval; -typedef struct { - SMsgHead head; - char version[TSDB_VERSION_LEN]; - - bool stableQuery; // super table query or not - bool topBotQuery; // TODO used bitwise flag - bool interpQuery; // interp query or not - bool groupbyColumn; // denote if this is a groupby normal column query - bool hasTagResults; // if there are tag values in final result or not - bool timeWindowInterpo; // if the time window start/end required interpolation - bool queryBlockDist; // if query data block distribution - bool stabledev; // super table stddev query - bool tsCompQuery; // is tscomp query - bool simpleAgg; - bool pointInterpQuery; // point interpolation query - bool needReverseScan; // need reverse scan - bool stateWindow; // state window flag - - STimeWindow window; - int32_t numOfTables; - int16_t order; - int16_t orderColId; - int16_t numOfCols; // the number of columns will be load from vnode - SInterval interval; - // SSessionWindow sw; // session window - int16_t tagCondLen; // tag length in current query - int16_t colCondLen; // column length in current query - int16_t numOfGroupCols; // num of group by columns - int16_t orderByIdx; - int16_t orderType; // used in group by xx order by xxx - int64_t vgroupLimit; // limit the number of rows for each table, used in order by + limit in stable projection query. - int16_t prjOrder; // global order in super table projection query. - int64_t limit; - int64_t offset; - int32_t queryType; // denote another query process - int16_t numOfOutput; // final output columns numbers - int16_t fillType; // interpolate type - int64_t fillVal; // default value array list - int32_t secondStageOutput; - STsBufInfo tsBuf; // tsBuf info - int32_t numOfTags; // number of tags columns involved - int32_t sqlstrLen; // sql query string - int32_t prevResultLen; // previous result length - int32_t numOfOperator; - int32_t tableScanOperator; // table scan operator. -1 means no scan operator - int32_t udfNum; // number of udf function - int32_t udfContentOffset; - int32_t udfContentLen; - SColumnInfo tableCols[]; -} SQueryTableReq; - typedef struct { int32_t code; } SQueryTableRsp; diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 62063ef99a67b961b852a3870600d4d4b850ed7b..21013ef906e092fddd350e3e0a23ed06d27c6a91 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -20,9 +20,30 @@ extern "C" { #endif +#include "tbuffer.h" #include "tcommon.h" #include "tvariant.h" -#include "tbuffer.h" + +struct SqlFunctionCtx; +struct SResultRowEntryInfo; + +typedef struct SFunctionNode SFunctionNode; + +typedef struct SFuncExecEnv { + int32_t calcMemSize; +} SFuncExecEnv; + +typedef bool (*FExecGetEnv)(SFunctionNode* pFunc, SFuncExecEnv* pEnv); +typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); +typedef void (*FExecProcess)(struct SqlFunctionCtx *pCtx); +typedef void (*FExecFinalize)(struct SqlFunctionCtx *pCtx); + +typedef struct SFuncExecFuncs { + FExecGetEnv getEnv; + FExecInit init; + FExecProcess process; + FExecFinalize finalize; +} SFuncExecFuncs; #define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results @@ -118,57 +139,58 @@ typedef struct SExtTagsInfo { } SExtTagsInfo; typedef struct SResultDataInfo { + int16_t precision; + int16_t scale; int16_t type; int16_t bytes; - int32_t intermediateBytes; + int32_t interBufSize; } SResultDataInfo; #define GET_RES_INFO(ctx) ((ctx)->resultInfo) -typedef struct SFunctionFpSet { - bool (*init)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment - void (*addInput)(struct SqlFunctionCtx *pCtx); - - // finalizer must be called after all exec has been executed to generated final result. - void (*finalize)(struct SqlFunctionCtx *pCtx); - void (*combine)(struct SqlFunctionCtx *pCtx); -} SFunctionFpSet; - -extern SFunctionFpSet fpSet[1]; +typedef struct SInputColumnInfoData { + int32_t totalRows; // total rows in current columnar data + int32_t startRowIndex; // handle started row index + int32_t numOfRows; // the number of rows needs to be handled + int32_t numOfInputCols; // PTS is not included + bool colDataAggIsSet;// if agg is set or not + SColumnInfoData *pPTS; // primary timestamp column + SColumnInfoData **pData; + SColumnDataAgg **pColumnDataAgg; +} SInputColumnInfoData; // sql function runtime context typedef struct SqlFunctionCtx { - int32_t startRow; - int32_t size; // number of rows + SInputColumnInfoData input; + SResultDataInfo resDataInfo; + uint32_t order; // asc|desc + //////////////////////////////////////////////////////////////// + int32_t startRow; // start row index + int32_t size; // handled processed row number SColumnInfoData* pInput; - - uint32_t order; // asc|desc - int16_t inputType; - int16_t inputBytes; - - SResultDataInfo resDataInfo; - bool hasNull; // null value exist in current block - bool requireNull; // require null in some function - bool stableQuery; - int16_t functionId; // function id - char * pOutput; // final result output buffer, point to sdata->data - uint8_t currentStage; // record current running step, default: 0 - int64_t startTs; // timestamp range of current query when function is executed on a specific data block - int32_t numOfParams; - SVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param - int64_t *ptsList; // corresponding timestamp array list - void *ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ - SVariant tag; - - bool isAggSet; - SColumnDataAgg agg; + SColumnDataAgg agg; + int16_t inputType; // TODO remove it + int16_t inputBytes; // TODO remove it + bool hasNull; // null value exist in current block, TODO remove it + bool requireNull; // require null in some function, TODO remove it + int32_t columnIndex; // TODO remove it + uint8_t currentStage; // record current running step, default: 0 + bool isAggSet; + ///////////////////////////////////////////////////////////////// + bool stableQuery; + int16_t functionId; // function id + char * pOutput; // final result output buffer, point to sdata->data + int64_t startTs; // timestamp range of current query when function is executed on a specific data block + int32_t numOfParams; + SVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param + int64_t *ptsList; // corresponding timestamp array list + void *ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ + SVariant tag; struct SResultRowEntryInfo *resultInfo; - SExtTagsInfo tagInfo; - SPoint1 start; - SPoint1 end; - - int32_t columnIndex; - SFunctionFpSet* fpSet; + SExtTagsInfo tagInfo; + SPoint1 start; + SPoint1 end; + SFuncExecFuncs fpSet; } SqlFunctionCtx; enum { @@ -194,9 +216,10 @@ typedef struct tExprNode { struct SVariant *pVal; // value node struct {// function node - char functionName[FUNCTIONS_NAME_MAX_LENGTH]; + char functionName[FUNCTIONS_NAME_MAX_LENGTH]; // todo refactor + int32_t functionId; int32_t num; - + SFunctionNode *pFunctNode; // Note that the attribute of pChild is not the parameter of function, it is the columns that involved in the // calculation instead. // E.g., Cov(col1, col2), the column information, w.r.t. the col1 and col2, is kept in pChild nodes. @@ -207,7 +230,6 @@ typedef struct tExprNode { }; } tExprNode; -//TODO create? void exprTreeToBinary(SBufferWriter* bw, tExprNode* pExprTree); void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)); @@ -294,7 +316,7 @@ tExprNode* exprdup(tExprNode* pTree); void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num); void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell); -int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num); +int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock); bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry); bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry); diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index e77ccaa225e16195e4ce4758deedee2ddf3eead2..500b1491d7cca813a86ab5b9ce006313dbefbc5f 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -102,22 +102,6 @@ struct SqlFunctionCtx; struct SResultRowEntryInfo; struct STimeWindow; -typedef struct SFuncExecEnv { - int32_t calcMemSize; -} SFuncExecEnv; - -typedef bool (*FExecGetEnv)(SFunctionNode* pFunc, SFuncExecEnv* pEnv); -typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); -typedef void (*FExecProcess)(struct SqlFunctionCtx *pCtx); -typedef void (*FExecFinalize)(struct SqlFunctionCtx *pCtx); - -typedef struct SFuncExecFuncs { - FExecGetEnv getEnv; - FExecInit init; - FExecProcess process; - FExecFinalize finalize; -} SFuncExecFuncs; - typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); typedef struct SScalarFuncExecFuncs { diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 1128a38840941f244751739b4fa1cc71960f9026..2cd51e94431d3cae503e422dadb821695379ef49 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -435,11 +435,11 @@ static int32_t hbCreateThread() { pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - pthread_attr_destroy(&thAttr); +// if (pthread_create(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) { +// terrno = TAOS_SYSTEM_ERROR(errno); +// return -1; +// } +// pthread_attr_destroy(&thAttr); return 0; } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 7bba0935882bcac5b784dee40a6b4b8f24bc6ae9..286cb99cb9d3cc4381bc3a7bdd7b65e6c314ca99 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -53,7 +53,7 @@ TEST(testCase, driverInit_Test) { // taos_init(); } -#if 1 +#if 0 TEST(testCase, connect_Test) { // taos_options(TSDB_OPTION_CONFIGDIR, "/home/ubuntu/first/cfg"); @@ -571,15 +571,15 @@ TEST(testCase, projection_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); - TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); - if (taos_errno(pRes) != 0) { - printf("error in create db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); +// TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); +// if (taos_errno(pRes) != 0) { +// printf("error in create db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); - pRes = taos_query(pConn, "use abc1"); + TAOS_RES* pRes = taos_query(pConn, "use abc1"); taos_free_result(pRes); - +#if 0 pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); if (taos_errno(pRes) != 0) { printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); @@ -592,7 +592,7 @@ TEST(testCase, projection_query_tables) { } taos_free_result(pRes); - for(int32_t i = 0; i < 10000000; ++i) { + for(int32_t i = 0; i < 10000; ++i) { char sql[512] = {0}; sprintf(sql, "insert into tu values(now+%da, %d)", i, i); TAOS_RES* p = taos_query(pConn, sql); @@ -602,8 +602,9 @@ TEST(testCase, projection_query_tables) { taos_free_result(p); } +#endif - pRes = taos_query(pConn, "select * from tu"); + pRes = taos_query(pConn, "select count(ts) from tu"); if (taos_errno(pRes) != 0) { printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); @@ -623,8 +624,8 @@ TEST(testCase, projection_query_tables) { taos_free_result(pRes); taos_close(pConn); } -#if 0 +#if 0 TEST(testCase, projection_query_stables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 5b7557b749cc1b7f1565fd5577758812f0642058..4070224ab83313f03c3821edf4945b090352f1cc 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -239,7 +239,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co return numOfRow1 + numOfRow2; } -size_t colDataGetNumOfCols(const SSDataBlock* pBlock) { +size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { ASSERT(pBlock); size_t constantCols = (pBlock->pConstantList != NULL)? taosArrayGetSize(pBlock->pConstantList):0; @@ -247,7 +247,7 @@ size_t colDataGetNumOfCols(const SSDataBlock* pBlock) { return pBlock->info.numOfCols; } -size_t colDataGetNumOfRows(const SSDataBlock* pBlock) { +size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; } diff --git a/source/common/test/commonTests.cpp b/source/common/test/commonTests.cpp index 7d54e5150c6c86b0bb909e7e8962ca36700997d5..d5fb61929a94109331e955e492cb3333b09c961a 100644 --- a/source/common/test/commonTests.cpp +++ b/source/common/test/commonTests.cpp @@ -160,8 +160,8 @@ TEST(testCase, Datablock_test) { printf("binary column length:%d\n", *(int32_t*) p1->pData); - ASSERT_EQ(colDataGetNumOfCols(b), 2); - ASSERT_EQ(colDataGetNumOfRows(b), 40); + ASSERT_EQ(blockDataGetNumOfCols(b), 2); + ASSERT_EQ(blockDataGetNumOfRows(b), 40); char* pData = colDataGetData(p1, 3); printf("the second row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData)); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 4bd0ee959e0db8e9b84e7e02566e497e15dcb066..97c52f44ebbbb56181717ab4a308d0efda9ed8f9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -13,18 +13,19 @@ * along with this program. If not, see . */ +#include +#include "os.h" +#include "talgo.h" +#include "tcompare.h" +#include "tdataformat.h" +#include "texception.h" #include "tsdb.h" #include "tsdbDef.h" #include "tsdbFS.h" #include "tsdbLog.h" #include "tsdbReadImpl.h" -#include "ttime.h" -#include "texception.h" -#include "os.h" -#include "talgo.h" -#include "tcompare.h" -#include "tdataformat.h" #include "tskiplist.h" +#include "ttime.h" #include "taosdef.h" #include "tlosertree.h" @@ -1472,6 +1473,8 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t return numOfRows + num; } +// TODO fix bug for reverse copy data +// TODO handle the null data // Note: row1 always has high priority static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, STSRow* row1, STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema* pSchema2, @@ -1515,7 +1518,6 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit } } - int32_t i = 0, j = 0, k = 0; while(i < numOfCols && (j < numOfColsOfRow1 || k < numOfColsOfRow2)) { SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); @@ -1586,7 +1588,6 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit tdSKvRowGetVal(row, colId, offset, chosen_itr, &sVal); } - if (colId == pColInfo->info.colId) { if (tdValTypeIsNorm(sVal.valType)) { switch (pColInfo->info.type) { @@ -1594,7 +1595,6 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit case TSDB_DATA_TYPE_NCHAR: memcpy(pData, sVal.val, varDataTLen(sVal.val)); break; - case TSDB_DATA_TYPE_NULL: case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_UTINYINT: @@ -1625,11 +1625,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit memcpy(pData, sVal.val, pColInfo->info.bytes); } } else if (forceSetNull) { - if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { - setVardataNull(pData, pColInfo->info.type); - } else { - setNull(pData, pColInfo->info.type, pColInfo->info.bytes); - } + colDataAppend(pColInfo, numOfRows, NULL, true); } i++; @@ -1640,11 +1636,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit } } else { if(forceSetNull) { - if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { - setVardataNull(pData, pColInfo->info.type); - } else { - setNull(pData, pColInfo->info.type, pColInfo->info.bytes); - } + colDataAppend(pColInfo, numOfRows, NULL, true); } i++; } @@ -1653,18 +1645,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit if(forceSetNull) { while (i < numOfCols) { // the remain columns are all null data SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); - if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { - pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; - } else { - pData = (char*)pColInfo->pData + (capacity - numOfRows - 1) * pColInfo->info.bytes; - } - - if (pColInfo->info.type == TSDB_DATA_TYPE_BINARY || pColInfo->info.type == TSDB_DATA_TYPE_NCHAR) { - setVardataNull(pData, pColInfo->info.type); - } else { - setNull(pData, pColInfo->info.type, pColInfo->info.bytes); - } - + colDataAppend(pColInfo, numOfRows, NULL, true); i++; } } diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 17f457e9918bee0fe50bd252072d78a8c3c172c9..f35cfaee70fbb63c7174d42a4ca41e696c1d02f6 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -111,7 +111,6 @@ void clearResultRow(struct STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResultR struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset); void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr); -void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols); int32_t getRowNumForMultioutput(struct STaskAttr* pQueryAttr, bool topBottomQuery, bool stable); static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 991cd372c3f4dcd286fb0ab9931cadd3cdd7eca5..1cc25938deb979a4e5971be971304c6193c9968a 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -362,8 +362,8 @@ typedef struct STaskParam { char* tbnameCond; char* prevResult; SArray* pTableIdList; - SSqlExpr** pExpr; - SSqlExpr** pSecExpr; + SExprBasicInfo** pExpr; + SExprBasicInfo** pSecExpr; SExprInfo* pExprs; SExprInfo* pSecExprs; @@ -448,7 +448,6 @@ typedef struct SOptrBasicInfo { int32_t* rowCellInfoOffset; // offset value for each row result cell info SqlFunctionCtx* pCtx; SSDataBlock* pRes; - uint32_t resRowSize; int32_t capacity; } SOptrBasicInfo; @@ -617,8 +616,8 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); -SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); -SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); +SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); +SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); @@ -650,8 +649,6 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper int32_t numOfOutput); SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult); -SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, - int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); @@ -676,16 +673,8 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity, int32_t numOf void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity); void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); -int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo, SSqlExpr** pExprMsg, - SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo); - -int32_t createIndirectQueryFuncExprFromMsg(SQueryTableReq* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo, - SSqlExpr** pExpr, SExprInfo* prevExpr, struct SUdfInfo* pUdfInfo); - int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters); -SGroupbyExpr* createGroupbyExprFromMsg(SQueryTableReq* pQueryMsg, SColIndex* pColIndex, int32_t* code); - int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start, int32_t prevResultLen, void* merger); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index ea97f10dd5cdbb5aa1c14ac8871c5d11df1a70fa..fcb98ea167a0c23f890fe9c7c203fea45c0ea5dd 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -13,6 +13,9 @@ * along with this program. If not, see . */ +#include +#include +#include #include "os.h" #include "parser.h" @@ -265,7 +268,7 @@ static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, STaskRuntimeEn return; } - int32_t orderId = pRuntimeEnv->pQueryAttr->order.col.info.colId; + int32_t orderId = pRuntimeEnv->pQueryAttr->order.col.colId; if (orderId <= 0) { return; } @@ -337,17 +340,26 @@ SSDataBlock* createOutputBuf_rv(SArray* pExprInfo, int32_t numOfRows) { SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) { int32_t numOfCols = LIST_LENGTH(pNode->pSlots); + SSDataBlock* pBlock = calloc(1, sizeof(SSDataBlock)); pBlock->info.numOfCols = numOfCols; pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + pBlock->info.blockId = pNode->dataBlockId; + for(int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData idata = {{0}}; SSlotDescNode* pDescNode = nodesListGetNode(pNode->pSlots, i); - idata.info.type = pDescNode->dataType.type; - idata.info.bytes = pDescNode->dataType.bytes; + idata.info.type = pDescNode->dataType.type; + idata.info.bytes = pDescNode->dataType.bytes; + idata.info.scale = pDescNode->dataType.scale; idata.info.slotId = pDescNode->slotId; + idata.info.precision = pDescNode->dataType.precision; + + taosArrayPush(pBlock->pDataBlock, &idata); } + + return pBlock; } static bool isSelectivityWithTagsQuery(SqlFunctionCtx *pCtx, int32_t numOfOutput) { @@ -382,7 +394,7 @@ static bool isProjQuery(STaskAttr *pQueryAttr) { } static bool hasNull(SColumn* pColumn, SColumnDataAgg *pStatis) { - if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) || pColumn->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) || pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { return false; } @@ -989,7 +1001,7 @@ static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t of } if (functionNeedToExecute(&pCtx[k])) { - pCtx[k].fpSet->addInput(&pCtx[k]); +// pCtx[k].fpSet.process(&pCtx[k]); } // restore it @@ -1134,91 +1146,82 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC } void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { -// if (pCtx[0].functionId == FUNCTION_ARITHM) { -// SScalar* pSupport = (SScalarFunctionSupport*) pCtx[0].param[1].pz; -// if (pSupport->colList == NULL) { -// doSetInputDataBlock(pOperator, pCtx, pBlock, order); -// } else { -// doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order); -// } -// } else { - if (pBlock->pDataBlock != NULL) { - doSetInputDataBlock(pOperator, pCtx, pBlock, order); - } else { - doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order); - } -// } + if (pBlock->pDataBlock != NULL) { + doSetInputDataBlock(pOperator, pCtx, pBlock, order); + } else { + doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order); + } } static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { pCtx[i].order = order; pCtx[i].size = pBlock->info.rows; - pCtx[i].currentStage = MAIN_SCAN/*(uint8_t)pOperator->pRuntimeEnv->scanFlag*/; - - setBlockStatisInfo(&pCtx[i], pBlock, pOperator->pExpr[i].base.pColumns); - - if (pCtx[i].functionId == FUNCTION_ARITHM) { -// setArithParams((SScalarFunctionSupport*)pCtx[i].param[1].pz, &pOperator->pExpr[i], pBlock); - } else { - uint32_t flag = pOperator->pExpr[i].base.pColumns->flag; - if (TSDB_COL_IS_NORMAL_COL(flag) /*|| (pCtx[i].functionId == FUNCTION_BLKINFO) || - (TSDB_COL_IS_TAG(flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)*/) { - - SColumn* pCol = pOperator->pExpr[i].base.pColumns; - if (pCtx[i].columnIndex == -1) { - for(int32_t j = 0; j < pBlock->info.numOfCols; ++j) { - SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j); - if (pColData->info.colId == pCol->info.colId) { - pCtx[i].columnIndex = j; - break; - } - } - } - - SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pCtx[i].columnIndex); - // in case of the block distribution query, the inputBytes is not a constant value. - pCtx[i].pInput = p; - assert(p->info.colId == pCol->info.colId); - - if (pCtx[i].functionId < 0) { - SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); - pCtx[i].ptsList = (int64_t*) tsInfo->pData; - - continue; - } - -// uint32_t status = aAggs[pCtx[i].functionId].status; -// if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) { -// SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); - // In case of the top/bottom query again the nest query result, which has no timestamp column - // don't set the ptsList attribute. -// if (tsInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) { -// pCtx[i].ptsList = (int64_t*) tsInfo->pData; -// } else { -// pCtx[i].ptsList = NULL; -// } -// } -// } else if (TSDB_COL_IS_UD_COL(pCol->flag) && (pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) { -// SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo; -// SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex); -// -// pCtx[i].pInput = p->pData; -// assert(p->info.colId == pColIndex->info.colId && pCtx[i].inputType == p->info.type); -// for(int32_t j = 0; j < pBlock->info.rows; ++j) { -// char* dst = p->pData + j * p->info.bytes; -// taosVariantDump(&pOperator->pExpr[i].base.param[1], dst, p->info.type, true); -// } - } - } - } -} - -static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { + pCtx[i].currentStage = MAIN_SCAN; + + // setBlockStatisInfo(&pCtx[i], pBlock, pOperator->pExpr[i].base.pColumns); + int32_t slotId = pOperator->pExpr[i].base.pParam[0].pCol->slotId; + + // uint32_t flag = pOperator->pExpr[i].base.pParam[0].pCol->flag; + // if (TSDB_COL_IS_NORMAL_COL(flag) /*|| (pCtx[i].functionId == FUNCTION_BLKINFO) || + // (TSDB_COL_IS_TAG(flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)*/) { + + // SColumn* pCol = pOperator->pExpr[i].base.pParam[0].pCol; + // if (pCtx[i].columnIndex == -1) { + // for(int32_t j = 0; j < pBlock->info.numOfCols; ++j) { + // SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j); + // if (pColData->info.colId == pCol->colId) { + // pCtx[i].columnIndex = j; + // break; + // } + // } + // } + + // in case of the block distribution query, the inputBytes is not a constant value. + pCtx[i].input.pData[0] = taosArrayGet(pBlock->pDataBlock, slotId); + pCtx[i].input.totalRows = pBlock->info.rows; + pCtx[i].input.numOfRows = pBlock->info.rows; + pCtx[i].input.startRowIndex = 0; + + ASSERT(pCtx[i].input.pData[0] != NULL); + + // if (pCtx[i].functionId < 0) { + // SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); + // pCtx[i].ptsList = (int64_t*) tsInfo->pData; + + // continue; + // } + + // uint32_t status = aAggs[pCtx[i].functionId].status; + // if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) { + // SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); + // In case of the top/bottom query again the nest query result, which has no timestamp column + // don't set the ptsList attribute. + // if (tsInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) { + // pCtx[i].ptsList = (int64_t*) tsInfo->pData; + // } else { + // pCtx[i].ptsList = NULL; + // } + // } + // } else if (TSDB_COL_IS_UD_COL(pCol->flag) && (pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) { + // SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo; + // SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex); + // + // pCtx[i].pInput = p->pData; + // assert(p->info.colId == pColIndex->info.colId && pCtx[i].inputType == p->info.type); + // for(int32_t j = 0; j < pBlock->info.rows; ++j) { + // char* dst = p->pData + j * p->info.bytes; + // taosVariantDump(&pOperator->pExpr[i].base.param[1], dst, p->info.type, true); + // } + // } + } +} + +static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) { for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { if (functionNeedToExecute(&pCtx[k])) { pCtx[k].startTs = startTs;// this can be set during create the struct - pCtx[k].fpSet->addInput(&pCtx[k]); + pCtx[k].fpSet.process(&pCtx[k]); } } } @@ -1878,7 +1881,7 @@ void setBlockStatisInfo(SqlFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColumn* pCtx->hasNull = hasNull(pColumn, pAgg); // set the statistics data for primary time stamp column - if (pCtx->functionId == FUNCTION_SPREAD && pColumn->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + if (pCtx->functionId == FUNCTION_SPREAD && pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { pCtx->isAggSet = true; pCtx->agg.min = pSDataBlock->info.window.skey; pCtx->agg.max = pSDataBlock->info.window.ekey; @@ -1943,10 +1946,10 @@ static SqlFunctionCtx* createSqlFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprI } for (int32_t i = 0; i < numOfOutput; ++i) { - SSqlExpr *pSqlExpr = &pExpr[i].base; + SExprBasicInfo *pFunct = &pExpr[i].base; SqlFunctionCtx* pCtx = &pFuncCtx[i]; #if 0 - SColIndex *pIndex = &pSqlExpr->colInfo; + SColIndex *pIndex = &pFunct->colInfo; if (TSDB_COL_REQ_NULL(pIndex->flag)) { pCtx->requireNull = true; @@ -1955,33 +1958,30 @@ static SqlFunctionCtx* createSqlFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprI pCtx->requireNull = false; } #endif -// pCtx->inputBytes = pSqlExpr->colBytes; -// pCtx->inputType = pSqlExpr->colType; +// pCtx->inputBytes = pFunct->colBytes; +// pCtx->inputType = pFunct->colType; pCtx->ptsOutputBuf = NULL; - pCtx->resDataInfo.bytes = pSqlExpr->resSchema.bytes; - pCtx->resDataInfo.type = pSqlExpr->resSchema.type; + pCtx->resDataInfo.bytes = pFunct->resSchema.bytes; + pCtx->resDataInfo.type = pFunct->resSchema.type; pCtx->order = pQueryAttr->order.order; -// pCtx->functionId = pSqlExpr->functionId; +// pCtx->functionId = pFunct->functionId; pCtx->stableQuery = pQueryAttr->stableQuery; - pCtx->resDataInfo.intermediateBytes = pSqlExpr->interBytes; + pCtx->resDataInfo.interBufSize = pFunct->interBytes; pCtx->start.key = INT64_MIN; pCtx->end.key = INT64_MIN; - pCtx->numOfParams = pSqlExpr->numOfParams; + pCtx->numOfParams = pFunct->numOfParams; for (int32_t j = 0; j < pCtx->numOfParams; ++j) { - int16_t type = pSqlExpr->param[j].nType; - int16_t bytes = pSqlExpr->param[j].nLen; -// if (pSqlExpr->functionId == FUNCTION_STDDEV_DST) { -// continue; -// } + int16_t type = pFunct->pParam[j].param.nType; + int16_t bytes = pFunct->pParam[j].param.nType; if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - taosVariantCreateFromBinary(&pCtx->param[j], pSqlExpr->param[j].pz, bytes, type); +// taosVariantCreateFromBinary(&pCtx->param[j], pFunct->param[j].pz, bytes, type); } else { - taosVariantCreateFromBinary(&pCtx->param[j], (char *)&pSqlExpr->param[j].i, bytes, type); +// taosVariantCreateFromBinary(&pCtx->param[j], (char *)&pFunct->param[j].i, bytes, type); } } @@ -1997,7 +1997,7 @@ static SqlFunctionCtx* createSqlFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprI pCtx->param[3].i = functionId; pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT; - pCtx->param[1].i = pQueryAttr->order.col.info.colId; + pCtx->param[1].i = pQueryAttr->order.col.colId; } else if (functionId == FUNCTION_INTERP) { pCtx->param[2].i = (int8_t)pQueryAttr->fillType; if (pQueryAttr->fillVal != NULL) { @@ -2031,7 +2031,7 @@ static SqlFunctionCtx* createSqlFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprI return pFuncCtx; } -static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowCellInfoOffset, uint32_t* pRowSize) { +static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowCellInfoOffset) { size_t numOfOutput = taosArrayGetSize(pExprInfo); SqlFunctionCtx * pFuncCtx = (SqlFunctionCtx *)calloc(numOfOutput, sizeof(SqlFunctionCtx)); @@ -2048,54 +2048,41 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC for (int32_t i = 0; i < numOfOutput; ++i) { SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); - SSqlExpr *pSqlExpr = &pExpr->base; + SExprBasicInfo *pFunct = &pExpr->base; SqlFunctionCtx* pCtx = &pFuncCtx[i]; -#if 0 - SColIndex *pIndex = &pSqlExpr->colInfo; + fmGetFuncExecFuncs(pExpr->pExpr->_function.functionId, &pCtx->fpSet); + pCtx->input.numOfInputCols = pFunct->numOfParams; - if (TSDB_COL_REQ_NULL(pIndex->flag)) { - pCtx->requireNull = true; - pIndex->flag &= ~(TSDB_COL_NULL); - } else { - pCtx->requireNull = false; - } -#endif -// pCtx->inputBytes = pSqlExpr->; -// pCtx->inputType = pSqlExpr->colType; + pCtx->input.pData = calloc(pFunct->numOfParams, POINTER_BYTES); + pCtx->input.pColumnDataAgg = calloc(pFunct->numOfParams, POINTER_BYTES); - pCtx->ptsOutputBuf = NULL; - pCtx->fpSet = fpSet; - pCtx->columnIndex = -1; - pCtx->resDataInfo.bytes = pSqlExpr->resSchema.bytes; - pCtx->resDataInfo.type = pSqlExpr->resSchema.type; + pCtx->ptsOutputBuf = NULL; + pCtx->resDataInfo.bytes = pFunct->resSchema.bytes; + pCtx->resDataInfo.type = pFunct->resSchema.type; + pCtx->order = TSDB_ORDER_ASC; +// pCtx->functionId = pExpr->pExpr->_function.pFunctNode->;//TODO remove it + pCtx->stableQuery = false; // TODO + pCtx->start.key = INT64_MIN; + pCtx->end.key = INT64_MIN; - pCtx->order = TSDB_ORDER_ASC; - if (i == 0) { - pCtx->functionId = FUNCTION_TS; - } - -// pCtx->functionId = pSqlExpr->functionId; -// pCtx->stableQuery = pQueryAttr->stableQuery; - pCtx->resDataInfo.intermediateBytes = pSqlExpr->interBytes; - pCtx->start.key = INT64_MIN; - pCtx->end.key = INT64_MIN; - - pCtx->numOfParams = pSqlExpr->numOfParams; + SFuncExecEnv env = {0}; + pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env); + pCtx->resDataInfo.interBufSize = env.calcMemSize; +#if 0 for (int32_t j = 0; j < pCtx->numOfParams; ++j) { - int16_t type = pSqlExpr->param[j].nType; - int16_t bytes = pSqlExpr->param[j].nLen; +// int16_t type = pFunct->param[j].nType; +// int16_t bytes = pFunct->param[j].nLen; - if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - taosVariantCreateFromBinary(&pCtx->param[j], pSqlExpr->param[j].pz, bytes, type); - } else { - taosVariantCreateFromBinary(&pCtx->param[j], (char *)&pSqlExpr->param[j].i, bytes, type); - } +// if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { +// taosVariantCreateFromBinary(&pCtx->param[j], pFunct->param[j].pz, bytes, type); +// } else { +// taosVariantCreateFromBinary(&pCtx->param[j], (char *)&pFunct->param[j].i, bytes, type); +// } } // set the order information for top/bottom query int32_t functionId = pCtx->functionId; - if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) { int32_t f = getExprFunctionId(&pExpr[0]); assert(f == FUNCTION_TS || f == FUNCTION_TS_DUMMY); @@ -2128,12 +2115,12 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC } else if (functionId == FUNCTION_ARITHM) { // pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i); } +#endif } for(int32_t i = 1; i < numOfOutput; ++i) { SExprInfo* pExpr = taosArrayGetP(pExprInfo, i - 1); (*rowCellInfoOffset)[i] = (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pExpr->base.interBytes); - *pRowSize += pExpr->base.resSchema.bytes; } setCtxTagColumnInfo(pFuncCtx, numOfOutput); @@ -2303,7 +2290,7 @@ void setTaskKilled(SExecTaskInfo *pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_Q // } // // for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { -// SSqlExpr *pExpr = &pQueryAttr->pExpr1[i].base; +// SExprBasicInfo *pExpr = &pQueryAttr->pExpr1[i].base; // // if (pExpr->functionId == FUNCTION_TS || pExpr->functionId == FUNCTION_TS_DUMMY) { // continue; @@ -2482,101 +2469,101 @@ static void doUpdateLastKey(STaskAttr* pQueryAttr) { } } -static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) { - STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; - - // in case of point-interpolation query, use asc order scan - char msg[] = "QInfo:0x%"PRIx64" scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%" PRId64 - "-%" PRId64 ", new qrange:%" PRId64 "-%" PRId64; - - // todo handle the case the the order irrelevant query type mixed up with order critical query type - // descending order query for last_row query - if (isFirstLastRowQuery(pQueryAttr)) { - //qDebug("QInfo:0x%"PRIx64" scan order changed for last_row query, old:%d, new:%d", pQInfo->qId, pQueryAttr->order.order, TSDB_ORDER_ASC); - - pQueryAttr->order.order = TSDB_ORDER_ASC; - if (pQueryAttr->window.skey > pQueryAttr->window.ekey) { - TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); - } - - pQueryAttr->needReverseScan = false; - return; - } - - if (pQueryAttr->groupbyColumn && pQueryAttr->order.order == TSDB_ORDER_DESC) { - pQueryAttr->order.order = TSDB_ORDER_ASC; - if (pQueryAttr->window.skey > pQueryAttr->window.ekey) { - TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); - } - - pQueryAttr->needReverseScan = false; - doUpdateLastKey(pQueryAttr); - return; - } - - if (pQueryAttr->pointInterpQuery && pQueryAttr->interval.interval == 0) { - if (!QUERY_IS_ASC_QUERY(pQueryAttr)) { - //qDebug(msg, pQInfo->qId, "interp", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); - TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); - } - - pQueryAttr->order.order = TSDB_ORDER_ASC; - return; - } - - if (pQueryAttr->interval.interval == 0) { - if (onlyFirstQuery(pQueryAttr)) { - if (!QUERY_IS_ASC_QUERY(pQueryAttr)) { - //qDebug(msg, pQInfo->qId, "only-first", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey, -// pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); - - TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); - doUpdateLastKey(pQueryAttr); - } - - pQueryAttr->order.order = TSDB_ORDER_ASC; - pQueryAttr->needReverseScan = false; - } else if (onlyLastQuery(pQueryAttr) && notContainSessionOrStateWindow(pQueryAttr)) { - if (QUERY_IS_ASC_QUERY(pQueryAttr)) { - //qDebug(msg, pQInfo->qId, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey, -// pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); - - TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); - doUpdateLastKey(pQueryAttr); - } - - pQueryAttr->order.order = TSDB_ORDER_DESC; - pQueryAttr->needReverseScan = false; - } - - } else { // interval query - if (stableQuery) { - if (onlyFirstQuery(pQueryAttr)) { - if (!QUERY_IS_ASC_QUERY(pQueryAttr)) { - //qDebug(msg, pQInfo->qId, "only-first stable", pQueryAttr->order.order, TSDB_ORDER_ASC, -// pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); - - TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); - doUpdateLastKey(pQueryAttr); - } - - pQueryAttr->order.order = TSDB_ORDER_ASC; - pQueryAttr->needReverseScan = false; - } else if (onlyLastQuery(pQueryAttr)) { - if (QUERY_IS_ASC_QUERY(pQueryAttr)) { - //qDebug(msg, pQInfo->qId, "only-last stable", pQueryAttr->order.order, TSDB_ORDER_DESC, -// pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); - - TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); - doUpdateLastKey(pQueryAttr); - } - - pQueryAttr->order.order = TSDB_ORDER_DESC; - pQueryAttr->needReverseScan = false; - } - } - } -} +//static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) { +// STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; +// +// // in case of point-interpolation query, use asc order scan +// char msg[] = "QInfo:0x%"PRIx64" scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%" PRId64 +// "-%" PRId64 ", new qrange:%" PRId64 "-%" PRId64; +// +// // todo handle the case the the order irrelevant query type mixed up with order critical query type +// // descending order query for last_row query +// if (isFirstLastRowQuery(pQueryAttr)) { +// //qDebug("QInfo:0x%"PRIx64" scan order changed for last_row query, old:%d, new:%d", pQInfo->qId, pQueryAttr->order.order, TSDB_ORDER_ASC); +// +// pQueryAttr->order.order = TSDB_ORDER_ASC; +// if (pQueryAttr->window.skey > pQueryAttr->window.ekey) { +// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); +// } +// +// pQueryAttr->needReverseScan = false; +// return; +// } +// +// if (pQueryAttr->groupbyColumn && pQueryAttr->order.order == TSDB_ORDER_DESC) { +// pQueryAttr->order.order = TSDB_ORDER_ASC; +// if (pQueryAttr->window.skey > pQueryAttr->window.ekey) { +// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); +// } +// +// pQueryAttr->needReverseScan = false; +// doUpdateLastKey(pQueryAttr); +// return; +// } +// +// if (pQueryAttr->pointInterpQuery && pQueryAttr->interval.interval == 0) { +// if (!QUERY_IS_ASC_QUERY(pQueryAttr)) { +// //qDebug(msg, pQInfo->qId, "interp", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); +// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); +// } +// +// pQueryAttr->order.order = TSDB_ORDER_ASC; +// return; +// } +// +// if (pQueryAttr->interval.interval == 0) { +// if (onlyFirstQuery(pQueryAttr)) { +// if (!QUERY_IS_ASC_QUERY(pQueryAttr)) { +// //qDebug(msg, pQInfo->qId, "only-first", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey, +//// pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); +// +// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); +// doUpdateLastKey(pQueryAttr); +// } +// +// pQueryAttr->order.order = TSDB_ORDER_ASC; +// pQueryAttr->needReverseScan = false; +// } else if (onlyLastQuery(pQueryAttr) && notContainSessionOrStateWindow(pQueryAttr)) { +// if (QUERY_IS_ASC_QUERY(pQueryAttr)) { +// //qDebug(msg, pQInfo->qId, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey, +//// pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); +// +// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); +// doUpdateLastKey(pQueryAttr); +// } +// +// pQueryAttr->order.order = TSDB_ORDER_DESC; +// pQueryAttr->needReverseScan = false; +// } +// +// } else { // interval query +// if (stableQuery) { +// if (onlyFirstQuery(pQueryAttr)) { +// if (!QUERY_IS_ASC_QUERY(pQueryAttr)) { +// //qDebug(msg, pQInfo->qId, "only-first stable", pQueryAttr->order.order, TSDB_ORDER_ASC, +//// pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); +// +// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); +// doUpdateLastKey(pQueryAttr); +// } +// +// pQueryAttr->order.order = TSDB_ORDER_ASC; +// pQueryAttr->needReverseScan = false; +// } else if (onlyLastQuery(pQueryAttr)) { +// if (QUERY_IS_ASC_QUERY(pQueryAttr)) { +// //qDebug(msg, pQInfo->qId, "only-last stable", pQueryAttr->order.order, TSDB_ORDER_DESC, +//// pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); +// +// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey, TSKEY); +// doUpdateLastKey(pQueryAttr); +// } +// +// pQueryAttr->order.order = TSDB_ORDER_DESC; +// pQueryAttr->needReverseScan = false; +// } +// } +// } +//} static void getIntermediateBufInfo(STaskRuntimeEnv* pRuntimeEnv, int32_t* ps, int32_t* rowsize) { STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -2879,7 +2866,7 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData int32_t numOfOutput = pTableScanInfo->numOfOutput; for (int32_t i = 0; i < numOfOutput; ++i) { int32_t functionId = pCtx[i].functionId; - int32_t colId = pTableScanInfo->pExpr[i].base.pColumns->info.colId; + int32_t colId = pTableScanInfo->pExpr[i].base.pParam[0].pCol->colId; // group by + first/last should not apply the first/last block filter if (functionId < 0) { @@ -3207,11 +3194,12 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SqlFunctionCtx* pCt if (pQueryAttr->numOfOutput == 1 && functionId == FUNCTION_TS_COMP && pQueryAttr->stableQuery) { assert(pExprInfo->base.numOfParams == 1); - int16_t tagColId = (int16_t)pExprInfo->base.param[0].i; +// int16_t tagColId = (int16_t)pExprInfo->base.param[0].i; + int16_t tagColId = -1; SColumnInfo* pColInfo = doGetTagColumnInfoById(pQueryAttr->tagColList, pQueryAttr->numOfTags, tagColId); doSetTagValueInParam(pTable, tagColId, &pCtx[0].tag, pColInfo->type, pColInfo->bytes); - return; + } else { // set tag value, by which the results are aggregated. int32_t offset = 0; @@ -3221,12 +3209,12 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SqlFunctionCtx* pCt SExprInfo* pLocalExprInfo = &pExpr[idx]; // ts_comp column required the tag value for join filter - if (!TSDB_COL_IS_TAG(pLocalExprInfo->base.pColumns->flag)) { + if (!TSDB_COL_IS_TAG(pLocalExprInfo->base.pParam[0].pCol->flag)) { continue; } // todo use tag column index to optimize performance - doSetTagValueInParam(pTable, pLocalExprInfo->base.pColumns->info.colId, &pCtx[idx].tag, pLocalExprInfo->base.resSchema.type, + doSetTagValueInParam(pTable, pLocalExprInfo->base.pParam[0].pCol->colId, &pCtx[idx].tag, pLocalExprInfo->base.resSchema.type, pLocalExprInfo->base.resSchema.bytes); if (IS_NUMERIC_TYPE(pLocalExprInfo->base.resSchema.type) @@ -3344,9 +3332,9 @@ int32_t initResultRow(SResultRow *pResultRow) { /* * The start of each column SResultRowEntryInfo is denote by RowCellInfoOffset. * Note that in case of top/bottom query, the whole multiple rows of result is treated as only one row of results. - * +------------+-----------------result column 1-----------+-----------------result column 2-----------+ - * + SResultRow | SResultRowEntryInfo | intermediate buffer1 | SResultRowEntryInfo | intermediate buffer 2| - * +------------+-------------------------------------------+-------------------------------------------+ + * +------------+-----------------result column 1------------+------------------result column 2-----------+ + * | SResultRow | SResultRowEntryInfo | intermediate buffer1 | SResultRowEntryInfo | intermediate buffer 2| + * +------------+--------------------------------------------+--------------------------------------------+ * offset[0] offset[1] offset[2] */ void setDefaultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, int64_t uid, int32_t stage) { @@ -3383,8 +3371,9 @@ void setDefaultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, in initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols); } + // TODO refactor: some function move away -void setDefaultOutputBuf_rv(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, SExecTaskInfo* pTaskInfo) { +void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, SExecTaskInfo* pTaskInfo) { SqlFunctionCtx* pCtx = pInfo->pCtx; SSDataBlock* pDataBlock = pInfo->pRes; int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset; @@ -3407,7 +3396,6 @@ void setDefaultOutputBuf_rv(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t pCtx[i].resultInfo = pEntry; pCtx[i].pOutput = pData->pData; pCtx[i].currentStage = stage; - assert(pCtx[i].pOutput != NULL); // set the timestamp output buffer for top/bottom/diff query int32_t fid = pCtx[i].functionId; @@ -3504,7 +3492,7 @@ void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) { continue; } - pCtx[j].fpSet->init(&pCtx[j], pCtx[j].resultInfo); + pCtx[j].fpSet.init(&pCtx[j], pCtx[j].resultInfo); } } @@ -3577,7 +3565,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResult // if (pCtx[j].functionId < 0) { // doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); // } else { - pCtx[j].fpSet->finalize(&pCtx[j]); + pCtx[j].fpSet.finalize(&pCtx[j]); // } } // } @@ -3721,7 +3709,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, i * all group belong to one result set, and each group result has different group id so set the id to be one */ if (pResultRow->pageId == -1) { - int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->pResultBuf, tableGroupId, pAggInfo->binfo.resRowSize); + int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->pResultBuf, tableGroupId, pAggInfo->binfo.pRes->info.tupleSize); if (ret != TSDB_CODE_SUCCESS) { return; } @@ -3769,7 +3757,7 @@ void setResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SqlFu void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) { STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - SSqlExpr* pExpr = &pExprInfo->base; + SExprBasicInfo* pExpr = &pExprInfo->base; // if (pQueryAttr->stableQuery && (pRuntimeEnv->pTsBuf != NULL) && // (pExpr->functionId == FUNCTION_TS || pExpr->functionId == FUNCTION_PRJ) && // (pExpr->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_ID)) { @@ -3845,7 +3833,7 @@ void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, continue; } - SSqlExpr* pExpr = &pExprInfo1->base; + SExprBasicInfo* pExpr = &pExprInfo1->base; pCtx[i].param[0].arr = NULL; pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int @@ -3874,7 +3862,7 @@ void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionC #if 0 int32_t numOfExprs = pQueryAttr->numOfOutput; for(int32_t i = 0; i < numOfExprs; ++i) { - SSqlExpr* pExpr1 = &pExpr[i].base; + SExprBasicInfo* pExpr1 = &pExpr[i].base; if (pExpr1->functionId != FUNCTION_STDDEV_DST) { continue; } @@ -5550,10 +5538,10 @@ SArray* getOrderCheckColumns(STaskAttr* pQuery) { for(int32_t i = 0; i < numOfCols; ++i) { SColIndex* index = taosArrayGet(pOrderColumns, i); for(int32_t j = 0; j < pQuery->numOfOutput; ++j) { - SSqlExpr* pExpr = &pQuery->pExpr1[j].base; + SExprBasicInfo* pExpr = &pQuery->pExpr1[j].base; int32_t functionId = getExprFunctionId(&pQuery->pExpr1[j]); - if (index->colId == pExpr->pColumns->info.colId && + if (index->colId == pExpr->pParam[0].pCol->colId && (functionId == FUNCTION_PRJ || functionId == FUNCTION_TAG || functionId == FUNCTION_TS)) { index->colIndex = j; index->colId = pExpr->resSchema.colId; @@ -5580,18 +5568,18 @@ SArray* getResultGroupCheckColumns(STaskAttr* pQuery) { bool found = false; for(int32_t j = 0; j < pQuery->numOfOutput; ++j) { - SSqlExpr* pExpr = &pQuery->pExpr1[j].base; + SExprBasicInfo* pExpr = &pQuery->pExpr1[j].base; int32_t functionId = getExprFunctionId(&pQuery->pExpr1[j]); // FUNCTION_TAG_DUMMY function needs to be ignored - if (index->colId == pExpr->pColumns->info.colId && - ((TSDB_COL_IS_TAG(pExpr->pColumns->flag) && functionId == FUNCTION_TAG) || - (TSDB_COL_IS_NORMAL_COL(pExpr->pColumns->flag) && functionId == FUNCTION_PRJ))) { - index->colIndex = j; - index->colId = pExpr->resSchema.colId; - found = true; - break; - } +// if (index->colId == pExpr->pColumns->info.colId && +// ((TSDB_COL_IS_TAG(pExpr->pColumns->flag) && functionId == FUNCTION_TAG) || +// (TSDB_COL_IS_NORMAL_COL(pExpr->pColumns->flag) && functionId == FUNCTION_PRJ))) { +// index->colIndex = j; +// index->colId = pExpr->resSchema.colId; +// found = true; +// break; +// } } assert(found && index->colIndex >= 0 && index->colIndex < pQuery->numOfOutput); @@ -5600,7 +5588,7 @@ SArray* getResultGroupCheckColumns(STaskAttr* pQuery) { return pOrderColumns; } -static int32_t initAggSup(SAggSupporter* pAggSup, SArray* pExprInfo); +static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SArray* pExprInfo); static void clearupAggSup(SAggSupporter* pAggSup); static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { @@ -5629,13 +5617,13 @@ static void assignExprInfo(SExprInfo* dst, const SExprInfo* src) { *dst = *src; dst->pExpr = exprdup(src->pExpr); - dst->base.pColumns = calloc(src->base.numOfCols, sizeof(SColumn)); - memcpy(dst->base.pColumns, src->base.pColumns, sizeof(SColumn) * src->base.numOfCols); + dst->base.pParam = calloc(src->base.numOfParams, sizeof(SColumn)); + memcpy(dst->base.pParam, src->base.pParam, sizeof(SColumn) * src->base.numOfParams); - memset(dst->base.param, 0, sizeof(SVariant) * tListLen(dst->base.param)); - for (int32_t j = 0; j < src->base.numOfParams; ++j) { - taosVariantAssign(&dst->base.param[j], &src->base.param[j]); - } +// memset(dst->base.param, 0, sizeof(SVariant) * tListLen(dst->base.param)); +// for (int32_t j = 0; j < src->base.numOfParams; ++j) { +// taosVariantAssign(&dst->base.param[j], &src->base.param[j]); +// } } static SExprInfo* exprArrayDup(SArray* pExprList) { @@ -5734,7 +5722,7 @@ static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx *p for (int32_t j = 0; j < numOfExpr; ++j) { int32_t functionId = pCtx[j].functionId; - pCtx[j].fpSet->addInput(&pCtx[j]); +// pCtx[j].fpSet->addInput(&pCtx[j]); // if (functionId < 0) { // SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); @@ -5757,7 +5745,7 @@ static void doFinalizeResultImpl(SqlFunctionCtx *pCtx, int32_t numOfExpr) { // SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); // doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); // } else { - pCtx[j].fpSet->addInput(&pCtx[j]); + pCtx[j].fpSet.finalize(&pCtx[j]); } } @@ -5793,7 +5781,7 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock doMergeResultImpl(pInfo, pCtx, numOfExpr, i); } else { doFinalizeResultImpl(pCtx, numOfExpr); - int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfOutput); + int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfOutput, NULL); // setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); // TODO check for available buffer; @@ -5805,7 +5793,7 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock continue; } - pCtx[j].fpSet->addInput(&pCtx[j]); + pCtx[j].fpSet.process(&pCtx[j]); } doMergeResultImpl(pInfo, pCtx, numOfExpr, i); @@ -5849,7 +5837,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { } doFinalizeResultImpl(pInfo->binfo.pCtx, pOperator->numOfOutput); - int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfOutput); + int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfOutput, NULL); // setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); // TODO check for available buffer; @@ -5905,7 +5893,7 @@ static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) { for (int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) { SExprInfo* pExpr = taosArrayGet(pExprInfo, i); - if (pExpr->base.resSchema.colId == pOrder->col.info.colId) { + if (pExpr->base.resSchema.colId == pOrder->col.colId) { orderInfo.colIndex = i; break; } @@ -5935,10 +5923,10 @@ static int32_t initGroupCol(SArray* pExprInfo, SArray* pGroupInfo, SSortedMergeO SColumn* pCol = taosArrayGet(pGroupInfo, i); for(int32_t j = 0; j < taosArrayGetSize(pExprInfo); ++j) { SExprInfo* pe = taosArrayGet(pExprInfo, j); - if (pe->base.resSchema.colId == pCol->info.colId) { + if (pe->base.resSchema.colId == pCol->colId) { taosArrayPush(plist, pCol); taosArrayPush(pInfo->groupInfo, &j); - len += pCol->info.bytes; + len += pCol->bytes; break; } } @@ -5957,7 +5945,7 @@ static int32_t initGroupCol(SArray* pExprInfo, SArray* pGroupInfo, SSortedMergeO for(int32_t i = 0; i < numOfGroupCol; ++i) { pInfo->groupVal[i] = start + offset; SColumn* pCol = taosArrayGet(plist, i); - offset += pCol->info.bytes; + offset += pCol->bytes; } taosArrayDestroy(plist); @@ -5973,7 +5961,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t } int32_t numOfOutput = taosArrayGetSize(pExprInfo); - pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize); + pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); @@ -5981,12 +5969,12 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t goto _error; } - int32_t code = initAggSup(&pInfo->aggSup, pExprInfo); + int32_t code = doInitAggInfoSup(&pInfo->aggSup, pExprInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; } - setDefaultOutputBuf_rv(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); + setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); code = initGroupCol(pExprInfo, pGroupInfo, pInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -6144,15 +6132,15 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - doAggregateImpl(pOperator, 0, pInfo->pCtx, pBlock); + doAggregateImpl(pOperator, 0, pInfo->pCtx); } doSetOperatorCompleted(pOperator); finalizeQueryResult(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); - pInfo->pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput); + getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, pInfo->pRes); - return (pInfo->pRes->info.rows != 0)? pInfo->pRes:NULL; + return (blockDataGetNumOfRows(pInfo->pRes) != 0)? pInfo->pRes:NULL; } static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { @@ -6207,7 +6195,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { } setExecutionContext(pOperator->numOfOutput, pAggInfo->current->groupIndex, key, pTaskInfo, pAggInfo->current, pAggInfo); - doAggregateImpl(pOperator, 0, pInfo->pCtx, pBlock); + doAggregateImpl(pOperator, 0, pInfo->pCtx); } pOperator->status = OP_RES_TO_RETURN; @@ -6254,7 +6242,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); - pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput); + pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, NULL); if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput); @@ -6303,7 +6291,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { updateOutputBuf(pInfo, &pInfo->capacity, pBlock->info.rows); projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); - pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput); + pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, NULL); if (pRes->info.rows >= 1000/*pRuntimeEnv->resultInfo.threshold*/) { break; } @@ -7022,7 +7010,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { tfree(pOperator); } -static int32_t initAggSup(SAggSupporter* pAggSup, SArray* pExprInfo) { +static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SArray* pExprInfo) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pAggSup->keyBuf = calloc(1, sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES); @@ -7047,12 +7035,12 @@ static void clearupAggSup(SAggSupporter* pAggSup) { destroyResultRowPool(pAggSup->pool); } -static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t numOfRows, const STableGroupInfo* pTableGroupInfo) { - pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, numOfRows); - pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize); - pInfo->binfo.capacity = 4096; +static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t numOfRows, SSDataBlock* pResultBlock, const STableGroupInfo* pTableGroupInfo) { + pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pRes = pResultBlock; + pInfo->binfo.capacity = numOfRows; - initAggSup(&pInfo->aggSup, pExprInfo); + doInitAggInfoSup(&pInfo->aggSup, pExprInfo); pInfo->pTableQueryInfo = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); int32_t index = 0; @@ -7074,14 +7062,15 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t n return TSDB_CODE_SUCCESS; } -SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { +SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, + SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); int32_t numOfRows = 1; //(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); - initAggInfo(pInfo, pExprInfo, numOfRows, pTableGroupInfo); - setDefaultOutputBuf_rv(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); + initAggInfo(pInfo, pExprInfo, numOfRows, pResultBlock, pTableGroupInfo); + setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableAggregate"; @@ -7093,8 +7082,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->pTaskInfo = pTaskInfo; - pOperator->nextDataFn = doAggregate; - pOperator->closeFn = destroyAggOperatorInfo; + pOperator->nextDataFn = doAggregate; + pOperator->closeFn = destroyAggOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -7171,12 +7160,12 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { pInfo->pRes = blockDataDestroy(pInfo->pRes); } -SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { +SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); int32_t numOfRows = 1; size_t numOfOutput = taosArrayGetSize(pExprInfo); - initAggInfo(pInfo, pExprInfo, numOfRows, pTableGroupInfo); + initAggInfo(pInfo, pExprInfo, numOfRows, pResBlock, pTableGroupInfo); size_t tableGroup = taosArrayGetSize(pTableGroupInfo->pGroupList); initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup); @@ -7189,6 +7178,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray pOperator->info = pInfo; pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->numOfOutput = numOfOutput; + pOperator->pTaskInfo = pTaskInfo; pOperator->nextDataFn = doMultiTableAggregate; pOperator->closeFn = destroyAggOperatorInfo; @@ -7202,10 +7192,10 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExp int32_t numOfRows = 4096; pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, numOfRows); - pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize); + pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset); // initResultRowInfo(&pBInfo->resultRowInfo, 8); -// setDefaultOutputBuf_rv(pBInfo, MAIN_SCAN); +// setFunctionResultOutput(pBInfo, MAIN_SCAN); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "ProjectOperator"; @@ -7256,31 +7246,6 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3 return 0; } -SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, - int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter) { - SFilterOperatorInfo* pInfo = calloc(1, sizeof(SFilterOperatorInfo)); - - assert(numOfFilter > 0 && pCols != NULL); -// doCreateFilterInfo(pCols, numOfOutput, numOfFilter, &pInfo->pFilterInfo, 0); - pInfo->numOfFilterCols = numOfFilter; - - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - - pOperator->name = "FilterOperator"; -// pOperator->operatorType = OP_Filter; - pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; - pOperator->numOfOutput = numOfOutput; - pOperator->pExpr = pExpr; - pOperator->nextDataFn = doFilter; - pOperator->info = pInfo; - pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->closeFn = destroyConditionOperatorInfo; - int32_t code = appendDownstream(pOperator, &downstream, 1); - - return pOperator; -} - SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream) { SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); pInfo->limit = pRuntimeEnv->pQueryAttr->limit.limit; @@ -7302,7 +7267,7 @@ SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorIn SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); - initAggSup(&pInfo->aggSup, pExprInfo); + doInitAggInfoSup(&pInfo->aggSup, pExprInfo); pInfo->order = TSDB_ORDER_ASC; pInfo->precision = TSDB_TIME_PRECISION_MICRO; @@ -7311,7 +7276,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pEx int32_t code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/"); - pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize); + pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); @@ -7872,16 +7837,16 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato return pOperator; } -static int32_t getColumnIndexInSource(SQueriedTableInfo *pTableInfo, SSqlExpr *pExpr, SColumnInfo* pTagCols) { +static int32_t getColumnIndexInSource(SQueriedTableInfo *pTableInfo, SExprBasicInfo *pExpr, SColumnInfo* pTagCols) { int32_t j = 0; - if (TSDB_COL_IS_TAG(pExpr->pColumns->flag)) { - if (pExpr->pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) { + if (TSDB_COL_IS_TAG(pExpr->pParam[0].pCol->type)) { + if (pExpr->pParam[0].pCol->colId == TSDB_TBNAME_COLUMN_INDEX) { return TSDB_TBNAME_COLUMN_INDEX; } while(j < pTableInfo->numOfTags) { - if (pExpr->pColumns->info.colId == pTagCols[j].colId) { + if (pExpr->pParam[0].pCol->colId == pTagCols[j].colId) { return j; } @@ -7903,47 +7868,12 @@ static int32_t getColumnIndexInSource(SQueriedTableInfo *pTableInfo, SSqlExpr *p return INT32_MIN; // return a less than TSDB_TBNAME_COLUMN_INDEX value } -bool validateExprColumnInfo(SQueriedTableInfo *pTableInfo, SSqlExpr *pExpr, SColumnInfo* pTagCols) { +bool validateExprColumnInfo(SQueriedTableInfo *pTableInfo, SExprBasicInfo *pExpr, SColumnInfo* pTagCols) { int32_t j = getColumnIndexInSource(pTableInfo, pExpr, pTagCols); return j != INT32_MIN; } -static bool validateQueryMsg(SQueryTableReq *pQueryMsg) { - if (pQueryMsg->interval.interval < 0) { - //qError("qmsg:%p illegal value of interval time %" PRId64, pQueryMsg, pQueryMsg->interval.interval); - return false; - } - -// if (pQueryMsg->sw.gap < 0 || pQueryMsg->sw.primaryColId != PRIMARYKEY_TIMESTAMP_COL_ID) { - //qError("qmsg:%p illegal value of session window time %" PRId64, pQueryMsg, pQueryMsg->sw.gap); -// return false; -// } - -// if (pQueryMsg->sw.gap > 0 && pQueryMsg->interval.interval > 0) { - //qError("qmsg:%p illegal value of session window time %" PRId64" and interval value %"PRId64, pQueryMsg, -// pQueryMsg->sw.gap, pQueryMsg->interval.interval); -// return false; -// } - - if (pQueryMsg->numOfTables <= 0) { - //qError("qmsg:%p illegal value of numOfTables %d", pQueryMsg, pQueryMsg->numOfTables); - return false; - } - - if (pQueryMsg->numOfGroupCols < 0) { - //qError("qmsg:%p illegal value of numOfGroupbyCols %d", pQueryMsg, pQueryMsg->numOfGroupCols); - return false; - } - - if (pQueryMsg->numOfOutput > TSDB_MAX_COLUMNS || pQueryMsg->numOfOutput <= 0) { - //qError("qmsg:%p illegal value of output columns %d", pQueryMsg, pQueryMsg->numOfOutput); - return false; - } - - return true; -} - -static bool validateQueryTableCols(SQueriedTableInfo* pTableInfo, SSqlExpr** pExpr, int32_t numOfOutput, +static bool validateQueryTableCols(SQueriedTableInfo* pTableInfo, SExprBasicInfo** pExpr, int32_t numOfOutput, SColumnInfo* pTagCols, void* pMsg) { int32_t numOfTotal = pTableInfo->numOfCols + pTableInfo->numOfTags; if (pTableInfo->numOfCols < 0 || pTableInfo->numOfTags < 0 || numOfTotal > TSDB_MAX_COLUMNS) { @@ -7953,7 +7883,7 @@ static bool validateQueryTableCols(SQueriedTableInfo* pTableInfo, SSqlExpr** pEx if (numOfTotal == 0) { // table total columns are not required. // for(int32_t i = 0; i < numOfOutput; ++i) { -// SSqlExpr* p = pExpr[i]; +// SExprBasicInfo* p = pExpr[i]; // if ((p->functionId == FUNCTION_TAGPRJ) || // (p->functionId == FUNCTION_TID_TAG && p->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) || // (p->functionId == FUNCTION_COUNT && p->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) || @@ -7974,23 +7904,6 @@ static bool validateQueryTableCols(SQueriedTableInfo* pTableInfo, SSqlExpr** pEx return true; } -static char *createTableIdList(SQueryTableReq *pQueryMsg, char *pMsg, SArray **pTableIdList) { - assert(pQueryMsg->numOfTables > 0); - - *pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableIdInfo)); - - for (int32_t j = 0; j < pQueryMsg->numOfTables; ++j) { - STableIdInfo* pTableIdInfo = (STableIdInfo *)pMsg; - pTableIdInfo->uid = htobe64(pTableIdInfo->uid); - pTableIdInfo->key = htobe64(pTableIdInfo->key); - - taosArrayPush(*pTableIdList, pTableIdInfo); - pMsg += sizeof(STableIdInfo); - } - - return pMsg; -} - static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t numOfFilters, char** pMsg) { for (int32_t f = 0; f < numOfFilters; ++f) { SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)(*pMsg); @@ -8022,6 +7935,52 @@ static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t return TSDB_CODE_SUCCESS; } +SArray* createExprInfo(SAggPhysiNode* pPhyNode, int32_t* resultRowSize) { + *resultRowSize = pPhyNode->node.pOutputDataBlockDesc->resultRowSize; + + int32_t numOfAggFuncs = LIST_LENGTH(pPhyNode->pAggFuncs); + + SArray* pArray = taosArrayInit(numOfAggFuncs, POINTER_BYTES); + for(int32_t i = 0; i < numOfAggFuncs; ++i) { + SExprInfo* pExp = calloc(1, sizeof(SExprInfo)); + + pExp->pExpr = calloc(1, sizeof(tExprNode)); + pExp->pExpr->_function.num = 1; + + pExp->base.pParam = calloc(1, sizeof(SFunctParam)); + pExp->base.numOfParams = 1; + + pExp->base.pParam[0].pCol = calloc(1, sizeof(SColumn)); + SColumn* pCol = pExp->base.pParam[0].pCol; + + ASSERT(LIST_LENGTH(pPhyNode->pAggFuncs) == 1); + STargetNode* pTargetNode = (STargetNode*) nodesListGetNode(pPhyNode->pAggFuncs, 0); + + SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; + pExp->base.resSchema = createSchema(pFuncNode->node.resType.type, pFuncNode->node.resType.bytes, pTargetNode->slotId, pFuncNode->node.aliasName); + pExp->pExpr->_function.pFunctNode = pFuncNode; + + strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName)); + + // TODO: value parameter needs to be handled + int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList); + for(int32_t j = 0; j < numOfParam; ++j) { + SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j); + SColumnNode* pcn = (SColumnNode*)p1; + + pCol->slotId = pcn->slotId; + pCol->bytes = pcn->node.resType.bytes; + pCol->type = pcn->node.resType.type; + pCol->scale = pcn->node.resType.scale; + pCol->precision = pcn->node.resType.precision; + pCol->dataBlockId = pcn->dataBlockId; + } + taosArrayPush(pArray, &pExp); + } + + return pArray; +} + static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) { SExecTaskInfo* pTaskInfo = calloc(1, sizeof(SExecTaskInfo)); setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); @@ -8092,11 +8051,14 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa size_t size = LIST_LENGTH(pPhyNode->pChildren); assert(size == 1); - // TODO single table agg for (int32_t i = 0; i < size; ++i) { - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); - // return createAggregateOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo); + + int32_t resultRowSize = 0; + SArray* pExprInfo = createExprInfo((SAggPhysiNode*)pPhyNode, &resultRowSize); + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + return createAggregateOperatorInfo(op, pExprInfo, pResBlock, pTaskInfo, pTableGroupInfo); } } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { size_t size = taosArrayGetSize(pPhyNode->pChildren); @@ -8231,27 +8193,6 @@ int32_t cloneExprFilterInfo(SColumnFilterInfo **dst, SColumnFilterInfo* src, int return TSDB_CODE_SUCCESS; } -int32_t buildArithmeticExprFromMsg(SExprInfo *pExprInfo, void *pQueryMsg) { - //qDebug("qmsg:%p create arithmetic expr from binary", pQueryMsg); - - tExprNode* pExprNode = NULL; - TRY(TSDB_MAX_TAG_CONDITIONS) { - pExprNode = exprTreeFromBinary(pExprInfo->base.param[0].pz, pExprInfo->base.param[0].nLen); - } CATCH( code ) { - CLEANUP_EXECUTE(); - //qError("qmsg:%p failed to create arithmetic expression string from:%s, reason: %s", pQueryMsg, pExprInfo->base.param[0].pz, tstrerror(code)); - return code; - } END_TRY - - if (pExprNode == NULL) { - //qError("qmsg:%p failed to create arithmetic expression string from:%s", pQueryMsg, pExprInfo->base.param[0].pz); - return TSDB_CODE_QRY_APP_ERROR; - } - - pExprInfo->pExpr = pExprNode; - return TSDB_CODE_SUCCESS; -} - static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SColumnInfo* pTagCols, SExprInfo* pExprs, int32_t numOfOutput, int32_t tagLen, bool superTable) { for (int32_t i = 0; i < numOfOutput; ++i) { int16_t functId = getExprFunctionId(&pExprs[i]); @@ -8273,127 +8214,6 @@ static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SCol } // TODO tag length should be passed from client, refactor -int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo, - SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo) { - *pExprInfo = NULL; - int32_t code = TSDB_CODE_SUCCESS; - -// code = initUdfInfo(pUdfInfo); - if (code) { - return code; - } - - SExprInfo *pExprs = (SExprInfo *)calloc(numOfOutput, sizeof(SExprInfo)); - if (pExprs == NULL) { - return TSDB_CODE_QRY_OUT_OF_MEMORY; - } - - bool isSuperTable = /*QUERY_IS_STABLE_QUERY(queryType);*/ true; - int16_t tagLen = 0; - - for (int32_t i = 0; i < numOfOutput; ++i) { - pExprs[i].base = *pExprMsg[i]; - - memset(pExprs[i].base.param, 0, sizeof(SVariant) * tListLen(pExprs[i].base.param)); - for (int32_t j = 0; j < pExprMsg[i]->numOfParams; ++j) { - taosVariantAssign(&pExprs[i].base.param[j], &pExprMsg[i]->param[j]); - } - - int16_t type = 0; - int16_t bytes = 0; - - // parse the arithmetic expression - int32_t functionId = getExprFunctionId(&pExprs[i]); - if (functionId == FUNCTION_ARITHM) { - code = buildArithmeticExprFromMsg(&pExprs[i], pMsg); - - if (code != TSDB_CODE_SUCCESS) { - tfree(pExprs); - return code; - } - - type = TSDB_DATA_TYPE_DOUBLE; - bytes = tDataTypes[type].bytes; - } else if (functionId == FUNCTION_BLKINFO) { - SSchema s = {.type=TSDB_DATA_TYPE_BINARY, .bytes=TSDB_MAX_BINARY_LEN}; - type = s.type; - bytes = s.bytes; - } else if (pExprs[i].base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX && functionId == FUNCTION_TAGPRJ) { // parse the normal column - const SSchema* s = tGetTbnameColumnSchema(); - type = s->type; - bytes = s->bytes; - } else if (pExprs[i].base.pColumns->info.colId <= TSDB_UD_COLUMN_INDEX && pExprs[i].base.pColumns->info.colId > TSDB_RES_COL_ID) { - // it is a user-defined constant value column - assert(functionId == FUNCTION_PRJ); - - type = pExprs[i].base.param[1].nType; - bytes = pExprs[i].base.param[1].nLen; - if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - bytes += VARSTR_HEADER_SIZE; - } - } else { - int32_t j = getColumnIndexInSource(pTableInfo, &pExprs[i].base, pTagCols); - if (TSDB_COL_IS_TAG(pExprs[i].base.pColumns->flag)) { - if (j < TSDB_TBNAME_COLUMN_INDEX || j >= pTableInfo->numOfTags) { - tfree(pExprs); - return TSDB_CODE_QRY_INVALID_MSG; - } - } else { - if (j < PRIMARYKEY_TIMESTAMP_COL_ID || j >= pTableInfo->numOfCols) { - tfree(pExprs); - return TSDB_CODE_QRY_INVALID_MSG; - } - } - - if (pExprs[i].base.pColumns->info.colId != TSDB_TBNAME_COLUMN_INDEX && j >= 0) { - SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].base.pColumns->flag))? &pTagCols[j]:&pTableInfo->colList[j]; - type = pCol->type; - bytes = pCol->bytes; - } else { - const SSchema* s = tGetTbnameColumnSchema(); - - type = s->type; - bytes = s->bytes; - } - -// if (pExprs[i].base.flist.numOfFilters > 0) { -// int32_t ret = cloneExprFilterInfo(&pExprs[i].base.flist.filterInfo, pExprMsg[i]->flist.filterInfo, -// pExprMsg[i]->flist.numOfFilters); -// if (ret) { -// tfree(pExprs); -// return ret; -// } -// } - } - - int32_t param = (int32_t)pExprs[i].base.param[0].i; -// if (functionId != FUNCTION_ARITHM && -// (type != pExprs[i].base.colType || bytes != pExprs[i].base.colBytes)) { -// tfree(pExprs); -// return TSDB_CODE_QRY_INVALID_MSG; -// } - - // todo remove it - SResultDataInfo info; - if (getResultDataInfo(type, bytes, functionId, param, &info, 0, isSuperTable/*, pUdfInfo*/) != TSDB_CODE_SUCCESS) { - tfree(pExprs); - return TSDB_CODE_QRY_INVALID_MSG; - } - - if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) { - tagLen += pExprs[i].base.resSchema.bytes; - } - - assert(isValidDataType(pExprs[i].base.resSchema.type)); - } - - // the tag length is affected by other tag columns, so this should be update. - updateOutputBufForTopBotQuery(pTableInfo, pTagCols, pExprs, numOfOutput, tagLen, isSuperTable); - - *pExprInfo = pExprs; - return TSDB_CODE_SUCCESS; -} - int32_t createQueryFilter(char *data, uint16_t len, SFilterInfo** pFilters) { tExprNode* expr = NULL; @@ -8415,26 +8235,6 @@ int32_t createQueryFilter(char *data, uint16_t len, SFilterInfo** pFilters) { // return ret; } -SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableReq *pQueryMsg, SColIndex *pColIndex, int32_t *code) { - if (pQueryMsg->numOfGroupCols == 0) { - return NULL; - } - - // using group by tag columns - SGroupbyExpr *pGroupbyExpr = (SGroupbyExpr *)calloc(1, sizeof(SGroupbyExpr)); - if (pGroupbyExpr == NULL) { - *code = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; - } - - pGroupbyExpr->columnInfo = taosArrayInit(pQueryMsg->numOfGroupCols, sizeof(SColIndex)); - for(int32_t i = 0; i < pQueryMsg->numOfGroupCols; ++i) { - taosArrayPush(pGroupbyExpr->columnInfo, &pColIndex[i]); - } - - return pGroupbyExpr; -} - //int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId) { // *pFilterInfo = calloc(1, sizeof(SSingleColumnFilterInfo) * numOfFilterCols); // if (*pFilterInfo == NULL) { @@ -8522,7 +8322,7 @@ static void doUpdateExprColumnIndex(STaskAttr *pQueryAttr) { assert(pQueryAttr->pExpr1 != NULL && pQueryAttr != NULL); for (int32_t k = 0; k < pQueryAttr->numOfOutput; ++k) { - SSqlExpr *pSqlExprMsg = &pQueryAttr->pExpr1[k].base; + SExprBasicInfo *pSqlExprMsg = &pQueryAttr->pExpr1[k].base; // if (pSqlExprMsg->functionId == FUNCTION_ARITHM) { // continue; // } @@ -8687,44 +8487,6 @@ static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo) { pTableqinfoGroupInfo->numOfTables = 0; } -void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr) { - if (pExprInfo == NULL) { - assert(numOfExpr == 0); - return NULL; - } - - for (int32_t i = 0; i < numOfExpr; ++i) { - if (pExprInfo[i].pExpr != NULL) { - tExprTreeDestroy(pExprInfo[i].pExpr, NULL); - } - -// if (pExprInfo[i].base.flist.filterInfo) { -// freeColumnFilterInfo(pExprInfo[i].base.flist.filterInfo, pExprInfo[i].base.flist.numOfFilters); -// } - - for(int32_t j = 0; j < pExprInfo[i].base.numOfParams; ++j) { - taosVariantDestroy(&pExprInfo[i].base.param[j]); - } - } - - tfree(pExprInfo); - return NULL; -} - -void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols) { - if (pColumnInfo != NULL) { - assert(numOfCols >= 0); - - for (int32_t i = 0; i < numOfCols; i++) { - freeColumnFilterInfo(pColumnInfo[i].flist.filterInfo, pColumnInfo[i].flist.numOfFilters); - } - - tfree(pColumnInfo); - } - - return NULL; -} - void doDestroyTask(SExecTaskInfo *pTaskInfo) { qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h new file mode 100644 index 0000000000000000000000000000000000000000..18d1ff41e26883257f3dc62b3d15aea9a617e7fd --- /dev/null +++ b/source/libs/function/inc/builtinsimpl.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_BUILTINSIMPL_H +#define TDENGINE_BUILTINSIMPL_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "function.h" + +bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +void functionFinalizer(SqlFunctionCtx *pCtx); + +bool getCountFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); +void countFunction(SqlFunctionCtx *pCtx); + +bool getSumFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); +void sumFunction(SqlFunctionCtx *pCtx); + +#ifdef __cplusplus +} +#endif +#endif // TDENGINE_BUILTINSIMPL_H diff --git a/source/libs/function/inc/taggfunction.h b/source/libs/function/inc/taggfunction.h index 65a100efed0d894ef0f64f32341cb79be217a0cf..d71ff789ba8342ed231a20e1ba8b686d93e959c0 100644 --- a/source/libs/function/inc/taggfunction.h +++ b/source/libs/function/inc/taggfunction.h @@ -83,9 +83,7 @@ static FORCE_INLINE void initResultRowEntry(SResultRowEntryInfo *pResInfo, int32 pResInfo->initialized = true; // the this struct has been initialized flag pResInfo->complete = false; - pResInfo->hasResult = false; pResInfo->numOfRes = 0; - memset(GET_ROWCELL_INTERBUF(pResInfo), 0, bufLen); } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index d9eeb6eeeb324ea7b9c03d432a5eee7552c43ce7..b95fba9596e0c92336268c715075b8b47f7466f8 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -14,7 +14,9 @@ */ #include "builtins.h" +#include "builtinsimpl.h" #include "taoserror.h" +#include "tdatablock.h" int32_t stubCheckAndGetResultType(SFunctionNode* pFunc); @@ -23,29 +25,29 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "count", .type = FUNCTION_TYPE_COUNT, .classification = FUNC_MGT_AGG_FUNC, - .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = NULL, - .initFunc = NULL, - .processFunc = NULL, - .finalizeFunc = NULL + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getCountFuncEnv, + .initFunc = functionSetup, + .processFunc = countFunction, + .finalizeFunc = functionFinalizer }, { .name = "sum", .type = FUNCTION_TYPE_SUM, .classification = FUNC_MGT_AGG_FUNC, - .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = NULL, - .initFunc = NULL, - .processFunc = NULL, - .finalizeFunc = NULL + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getSumFuncEnv, + .initFunc = functionSetup, + .processFunc = sumFunction, + .finalizeFunc = functionFinalizer }, { .name = "concat", .type = FUNCTION_TYPE_CONCAT, .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC, - .checkFunc = stubCheckAndGetResultType, - .getEnvFunc = NULL, - .initFunc = NULL, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = NULL, + .initFunc = NULL, .sprocessFunc = NULL, .finalizeFunc = NULL } @@ -54,5 +56,11 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { const int funcMgtBuiltinsNum = (sizeof(funcMgtBuiltins) / sizeof(SBuiltinFuncDefinition)); int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { + switch(pFunc->funcType) { + case FUNCTION_TYPE_COUNT: pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};break; + default: + break; + } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c new file mode 100644 index 0000000000000000000000000000000000000000..fb30cce6a9fa31c9f054c7419dcc89d9dfe0ec6e --- /dev/null +++ b/source/libs/function/src/builtinsimpl.c @@ -0,0 +1,192 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "builtinsimpl.h" +#include "taggfunction.h" +#include "tdatablock.h" + +#define SET_VAL(_info, numOfElem, res) \ + do { \ + if ((numOfElem) <= 0) { \ + break; \ + } \ + (_info)->numOfRes = (res); \ + (_info)->hasResult = DATA_SET_FLAG; \ + } while (0) + +typedef struct SSumRes { +// int8_t hasResult; + union { + int64_t isum; + uint64_t usum; + double dsum; + }; +} SSumRes; + +bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { + if (pResultInfo->initialized) { + return false; + } + + if (pCtx->pOutput != NULL) { + memset(pCtx->pOutput, 0, (size_t)pCtx->resDataInfo.bytes); + } + + initResultRowEntry(pResultInfo, pCtx->resDataInfo.interBufSize); + return true; +} + +static void doFinalizer(SResultRowEntryInfo* pResInfo) { cleanupResultRowEntry(pResInfo); } + +void functionFinalizer(SqlFunctionCtx *pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + if (pResInfo->hasResult != DATA_SET_FLAG) { +// setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); + } + + doFinalizer(pResInfo); +} + +bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(int64_t); + return true; +} + +/* + * count function does need the finalize, if data is missing, the default value, which is 0, is used + * count function does not use the pCtx->interResBuf to keep the intermediate buffer + */ +void countFunction(SqlFunctionCtx *pCtx) { + int32_t numOfElem = 0; + + /* + * 1. column data missing (schema modified) causes pCtx->hasNull == true. pCtx->isAggSet == true; + * 2. for general non-primary key columns, pCtx->hasNull may be true or false, pCtx->isAggSet == true; + * 3. for primary key column, pCtx->hasNull always be false, pCtx->isAggSet == false; + */ + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pInputCol = pInput->pData[0]; + + if (pInput->colDataAggIsSet && pInput->totalRows == pInput->numOfRows) { + numOfElem = pInput->numOfRows - pInput->pColumnDataAgg[0]->numOfNull; + ASSERT(numOfElem >= 0); + } else { + if (pInputCol->hasNull) { + for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) { + if (colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) { + continue; + } + numOfElem += 1; + } + } else { + //when counting on the primary time stamp column and no statistics data is presented, use the size value directly. + numOfElem = pInput->numOfRows; + } + } + + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + char* buf = GET_ROWCELL_INTERBUF(pResInfo); + *((int64_t *)buf) += numOfElem; + + SET_VAL(pResInfo, numOfElem, 1); +} + +#define LIST_ADD_N(_res, _col, _start, _rows, _t, numOfElem) \ + do { \ + _t *d = (_t *)(_col->pData); \ + for (int32_t i = (_start); i < (_rows) + (_start); ++i) { \ + if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \ + continue; \ + }; \ + (_res) += (d)[i]; \ + (numOfElem)++; \ + } \ + } while (0) + +static void do_sum(SqlFunctionCtx *pCtx) { + int32_t numOfElem = 0; + + // Only the pre-computing information loaded and actual data does not loaded + SInputColumnInfoData* pInput = &pCtx->input; + SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0]; + int32_t type = pInput->pData[0]->info.type; + + if (pInput->colDataAggIsSet) { + numOfElem = pInput->numOfRows - pAgg->numOfNull; + ASSERT(numOfElem >= 0); + + SSumRes* pSumInfo = (SSumRes*) pCtx->pOutput; + if (IS_SIGNED_NUMERIC_TYPE(type)) { + pSumInfo->isum += pAgg->sum; + } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { + pSumInfo->usum += pAgg->sum; + } else if (IS_FLOAT_TYPE(type)) { + pSumInfo->dsum += GET_DOUBLE_VAL((const char*)&(pAgg->sum)); + } + } else { // computing based on the true data block + SColumnInfoData* pCol = pInput->pData[0]; + + int32_t start = pInput->startRowIndex; + int32_t numOfRows = pInput->numOfRows; + + SSumRes* pSum = (SSumRes*) pCtx->pOutput; + + if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) { + if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { + LIST_ADD_N(pSum->isum, pCol, start, numOfRows, int8_t, numOfElem); + } else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) { + LIST_ADD_N(pSum->isum, pCol, start, numOfRows, int16_t, numOfElem); + } else if (pCtx->inputType == TSDB_DATA_TYPE_INT) { + LIST_ADD_N(pSum->isum, pCol, start, numOfRows, int32_t, numOfElem); + } else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) { + LIST_ADD_N(pSum->isum, pCol, start, numOfRows, int64_t, numOfElem); + } + } else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) { + if (pCtx->inputType == TSDB_DATA_TYPE_UTINYINT) { + LIST_ADD_N(pSum->usum, pCol, start, numOfRows, uint8_t, numOfElem); + } else if (pCtx->inputType == TSDB_DATA_TYPE_USMALLINT) { + LIST_ADD_N(pSum->usum, pCol, start, numOfRows, uint16_t, numOfElem); + } else if (pCtx->inputType == TSDB_DATA_TYPE_UINT) { + LIST_ADD_N(pSum->usum, pCol, start, numOfRows, uint32_t, numOfElem); + } else if (pCtx->inputType == TSDB_DATA_TYPE_UBIGINT) { + LIST_ADD_N(pSum->usum, pCol, start, numOfRows, uint64_t, numOfElem); + } + } else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) { + LIST_ADD_N(pSum->dsum, pCol, start, numOfRows, double, numOfElem); + } else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) { + LIST_ADD_N(pSum->dsum, pCol, start, numOfRows, float, numOfElem); + } + } + + // data in the check operation are all null, not output + SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); +} + +bool getSumFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(SSumRes); + return true; +} + +void sumFunction(SqlFunctionCtx *pCtx) { + do_sum(pCtx); + + // keep the result data in output buffer, not in the intermediate buffer +// SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); +// if (pResInfo->hasResult == DATA_SET_FLAG) { + // set the flag for super table query +// SSumRes *pSum = (SSumRes *)pCtx->pOutput; +// pSum->hasResult = DATA_SET_FLAG; +// } +} diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 862354c1382872f63daec601924c623bd86a2877..e087f8f3e949c43d37795cadbcff3afe80c2263d 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -26,19 +26,27 @@ typedef struct SFuncMgtService { } SFuncMgtService; static SFuncMgtService gFunMgtService; +static pthread_once_t functionHashTableInit = PTHREAD_ONCE_INIT; +static int32_t initFunctionCode = 0; -// todo refactor -int32_t fmFuncMgtInit() { +static void doInitFunctionHashTable() { gFunMgtService.pFuncNameHashTable = taosHashInit(funcMgtBuiltinsNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); if (NULL == gFunMgtService.pFuncNameHashTable) { - return TSDB_CODE_FAILED; + initFunctionCode = TSDB_CODE_FAILED; + return; } + for (int32_t i = 0; i < funcMgtBuiltinsNum; ++i) { if (TSDB_CODE_SUCCESS != taosHashPut(gFunMgtService.pFuncNameHashTable, funcMgtBuiltins[i].name, strlen(funcMgtBuiltins[i].name), &i, sizeof(int32_t))) { - return TSDB_CODE_FAILED; + initFunctionCode = TSDB_CODE_FAILED; + return; } } - return TSDB_CODE_SUCCESS; +} + +int32_t fmFuncMgtInit() { + pthread_once(&functionHashTableInit, doInitFunctionHashTable); + return initFunctionCode; } int32_t fmGetFuncInfo(const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType) { diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index 7fb63f591089fb19c693bae5eb205c72f3f5aace..47d09ec2dc48e893851a7823396cccc07ae6c86b 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -16,10 +16,10 @@ #include "os.h" #include "taosdef.h" #include "tmsg.h" -#include "tglobal.h" #include "thash.h" #include "ttypes.h" +#include "function.h" #include "taggfunction.h" #include "tfill.h" #include "thistogram.h" @@ -200,27 +200,47 @@ void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell) { pCell->initialized = false; } -int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num) { - int32_t maxOutput = 0; +int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock) { + int32_t maxRows = 0; + for (int32_t j = 0; j < num; ++j) { +#if 0 int32_t id = pCtx[j].functionId; /* * ts, tag, tagprj function can not decide the output number of current query * the number of output result is decided by main output */ - if (/*hasMainFunction && */(id == FUNCTION_TS || id == FUNCTION_TAG || id == FUNCTION_TAGPRJ)) { + if (id == FUNCTION_TS || id == FUNCTION_TAG || id == FUNCTION_TAGPRJ) { continue; } - +#endif SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[j]); - if (pResInfo != NULL && maxOutput < pResInfo->numOfRes) { - maxOutput = pResInfo->numOfRes; + if (pResInfo != NULL && maxRows < pResInfo->numOfRes) { + maxRows = pResInfo->numOfRes; } } - assert(maxOutput >= 0); - return maxOutput; + assert(maxRows >= 0); + + blockDataEnsureCapacity(pResBlock, maxRows); + for(int32_t i = 0; i < num; ++i) { + SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i); + + SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[i]); + if (!pResInfo->hasResult) { + for(int32_t j = 0; j < pResInfo->numOfRes; ++j) { + colDataAppend(pCol, j, NULL, true); // TODO add set null data api + } + } else { + for (int32_t j = 0; j < pResInfo->numOfRes; ++j) { + colDataAppend(pCol, j, GET_ROWCELL_INTERBUF(pResInfo), false); + } + } + } + + pResBlock->info.rows = maxRows; + return maxRows; } void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num) { @@ -254,9 +274,9 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI pInfo->bytes = (int16_t)dataBytes; if (functionId == FUNCTION_INTERP) { - pInfo->intermediateBytes = sizeof(SInterpInfoDetail); + pInfo->interBufSize = sizeof(SInterpInfoDetail); } else { - pInfo->intermediateBytes = 0; + pInfo->interBufSize = 0; } return TSDB_CODE_SUCCESS; @@ -266,42 +286,42 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI if (functionId == FUNCTION_TID_TAG) { // todo use struct pInfo->type = TSDB_DATA_TYPE_BINARY; pInfo->bytes = (int16_t)(dataBytes + sizeof(int16_t) + sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t) + VARSTR_HEADER_SIZE); - pInfo->intermediateBytes = 0; + pInfo->interBufSize = 0; return TSDB_CODE_SUCCESS; } if (functionId == FUNCTION_BLKINFO) { pInfo->type = TSDB_DATA_TYPE_BINARY; pInfo->bytes = 16384; - pInfo->intermediateBytes = 0; + pInfo->interBufSize = 0; return TSDB_CODE_SUCCESS; } if (functionId == FUNCTION_COUNT) { pInfo->type = TSDB_DATA_TYPE_BIGINT; pInfo->bytes = sizeof(int64_t); - pInfo->intermediateBytes = 0; + pInfo->interBufSize = 0; return TSDB_CODE_SUCCESS; } if (functionId == FUNCTION_ARITHM) { pInfo->type = TSDB_DATA_TYPE_DOUBLE; pInfo->bytes = sizeof(double); - pInfo->intermediateBytes = 0; + pInfo->interBufSize = 0; return TSDB_CODE_SUCCESS; } if (functionId == FUNCTION_TS_COMP) { pInfo->type = TSDB_DATA_TYPE_BINARY; pInfo->bytes = 1; // this results is compressed ts data, only one byte - pInfo->intermediateBytes = POINTER_BYTES; + pInfo->interBufSize = POINTER_BYTES; return TSDB_CODE_SUCCESS; } if (functionId == FUNCTION_DERIVATIVE) { pInfo->type = TSDB_DATA_TYPE_DOUBLE; pInfo->bytes = sizeof(double); // this results is compressed ts data, only one byte - pInfo->intermediateBytes = sizeof(SDerivInfo); + pInfo->interBufSize = sizeof(SDerivInfo); return TSDB_CODE_SUCCESS; } @@ -310,11 +330,11 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI // if (pUdfInfo->bufSize > 0) { // pInfo->type = TSDB_DATA_TYPE_BINARY; // pInfo->bytes = pUdfInfo->bufSize; -// pInfo->intermediateBytes = pInfo->bytes; +// pInfo->interBufSize = pInfo->bytes; // } else { // pInfo->type = pUdfInfo->resType; // pInfo->bytes = pUdfInfo->resBytes; -// pInfo->intermediateBytes = pInfo->bytes; +// pInfo->interBufSize = pInfo->bytes; // } // // return TSDB_CODE_SUCCESS; @@ -323,54 +343,54 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI if (functionId == FUNCTION_MIN || functionId == FUNCTION_MAX) { pInfo->type = TSDB_DATA_TYPE_BINARY; pInfo->bytes = (int16_t)(dataBytes + DATA_SET_FLAG_SIZE); - pInfo->intermediateBytes = pInfo->bytes; + pInfo->interBufSize = pInfo->bytes; return TSDB_CODE_SUCCESS; } else if (functionId == FUNCTION_SUM) { pInfo->type = TSDB_DATA_TYPE_BINARY; pInfo->bytes = sizeof(SSumInfo); - pInfo->intermediateBytes = pInfo->bytes; + pInfo->interBufSize = pInfo->bytes; return TSDB_CODE_SUCCESS; } else if (functionId == FUNCTION_AVG) { pInfo->type = TSDB_DATA_TYPE_BINARY; pInfo->bytes = sizeof(SAvgInfo); - pInfo->intermediateBytes = pInfo->bytes; + pInfo->interBufSize = pInfo->bytes; return TSDB_CODE_SUCCESS; } else if (functionId >= FUNCTION_RATE && functionId <= FUNCTION_IRATE) { pInfo->type = TSDB_DATA_TYPE_DOUBLE; pInfo->bytes = sizeof(SRateInfo); - pInfo->intermediateBytes = sizeof(SRateInfo); + pInfo->interBufSize = sizeof(SRateInfo); return TSDB_CODE_SUCCESS; } else if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM) { pInfo->type = TSDB_DATA_TYPE_BINARY; pInfo->bytes = (int16_t)(sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param); - pInfo->intermediateBytes = pInfo->bytes; + pInfo->interBufSize = pInfo->bytes; return TSDB_CODE_SUCCESS; } else if (functionId == FUNCTION_SPREAD) { pInfo->type = TSDB_DATA_TYPE_BINARY; pInfo->bytes = sizeof(SSpreadInfo); - pInfo->intermediateBytes = pInfo->bytes; + pInfo->interBufSize = pInfo->bytes; return TSDB_CODE_SUCCESS; } else if (functionId == FUNCTION_APERCT) { pInfo->type = TSDB_DATA_TYPE_BINARY; pInfo->bytes = sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1) + sizeof(SHistogramInfo) + sizeof(SAPercentileInfo); - pInfo->intermediateBytes = pInfo->bytes; + pInfo->interBufSize = pInfo->bytes; return TSDB_CODE_SUCCESS; } else if (functionId == FUNCTION_LAST_ROW) { pInfo->type = TSDB_DATA_TYPE_BINARY; pInfo->bytes = (int16_t)(sizeof(SLastrowInfo) + dataBytes); - pInfo->intermediateBytes = pInfo->bytes; + pInfo->interBufSize = pInfo->bytes; return TSDB_CODE_SUCCESS; } else if (functionId == FUNCTION_TWA) { pInfo->type = TSDB_DATA_TYPE_DOUBLE; pInfo->bytes = sizeof(STwaInfo); - pInfo->intermediateBytes = pInfo->bytes; + pInfo->interBufSize = pInfo->bytes; return TSDB_CODE_SUCCESS; } } @@ -385,18 +405,18 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI } pInfo->bytes = sizeof(int64_t); - pInfo->intermediateBytes = sizeof(SSumInfo); + pInfo->interBufSize = sizeof(SSumInfo); return TSDB_CODE_SUCCESS; } else if (functionId == FUNCTION_APERCT) { pInfo->type = TSDB_DATA_TYPE_DOUBLE; pInfo->bytes = sizeof(double); - pInfo->intermediateBytes = + pInfo->interBufSize = sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1); return TSDB_CODE_SUCCESS; } else if (functionId == FUNCTION_TWA) { pInfo->type = TSDB_DATA_TYPE_DOUBLE; pInfo->bytes = sizeof(double); - pInfo->intermediateBytes = sizeof(STwaInfo); + pInfo->interBufSize = sizeof(STwaInfo); return TSDB_CODE_SUCCESS; } @@ -405,9 +425,9 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI // pInfo->bytes = pUdfInfo->resBytes; // // if (pUdfInfo->bufSize > 0) { -// pInfo->intermediateBytes = pUdfInfo->bufSize; +// pInfo->interBufSize = pUdfInfo->bufSize; // } else { -// pInfo->intermediateBytes = pInfo->bytes; +// pInfo->interBufSize = pInfo->bytes; // } // // return TSDB_CODE_SUCCESS; @@ -416,39 +436,39 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI if (functionId == FUNCTION_AVG) { pInfo->type = TSDB_DATA_TYPE_DOUBLE; pInfo->bytes = sizeof(double); - pInfo->intermediateBytes = sizeof(SAvgInfo); + pInfo->interBufSize = sizeof(SAvgInfo); } else if (functionId >= FUNCTION_RATE && functionId <= FUNCTION_IRATE) { pInfo->type = TSDB_DATA_TYPE_DOUBLE; pInfo->bytes = sizeof(double); - pInfo->intermediateBytes = sizeof(SRateInfo); + pInfo->interBufSize = sizeof(SRateInfo); } else if (functionId == FUNCTION_STDDEV) { pInfo->type = TSDB_DATA_TYPE_DOUBLE; pInfo->bytes = sizeof(double); - pInfo->intermediateBytes = sizeof(SStddevInfo); + pInfo->interBufSize = sizeof(SStddevInfo); } else if (functionId == FUNCTION_MIN || functionId == FUNCTION_MAX) { pInfo->type = (int16_t)dataType; pInfo->bytes = (int16_t)dataBytes; - pInfo->intermediateBytes = dataBytes + DATA_SET_FLAG_SIZE; + pInfo->interBufSize = dataBytes + DATA_SET_FLAG_SIZE; } else if (functionId == FUNCTION_FIRST || functionId == FUNCTION_LAST) { pInfo->type = (int16_t)dataType; pInfo->bytes = (int16_t)dataBytes; - pInfo->intermediateBytes = (int16_t)(dataBytes + sizeof(SFirstLastInfo)); + pInfo->interBufSize = (int16_t)(dataBytes + sizeof(SFirstLastInfo)); } else if (functionId == FUNCTION_SPREAD) { pInfo->type = (int16_t)TSDB_DATA_TYPE_DOUBLE; pInfo->bytes = sizeof(double); - pInfo->intermediateBytes = sizeof(SSpreadInfo); + pInfo->interBufSize = sizeof(SSpreadInfo); } else if (functionId == FUNCTION_PERCT) { pInfo->type = (int16_t)TSDB_DATA_TYPE_DOUBLE; pInfo->bytes = (int16_t)sizeof(double); - pInfo->intermediateBytes = (int16_t)sizeof(SPercentileInfo); + pInfo->interBufSize = (int16_t)sizeof(SPercentileInfo); } else if (functionId == FUNCTION_LEASTSQR) { pInfo->type = TSDB_DATA_TYPE_BINARY; pInfo->bytes = TMAX(AVG_FUNCTION_INTER_BUFFER_SIZE, sizeof(SLeastsquaresInfo)); // string - pInfo->intermediateBytes = pInfo->bytes; + pInfo->interBufSize = pInfo->bytes; } else if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_LAST_DST) { pInfo->type = TSDB_DATA_TYPE_BINARY; pInfo->bytes = (int16_t)(dataBytes + sizeof(SFirstLastInfo)); - pInfo->intermediateBytes = pInfo->bytes; + pInfo->interBufSize = pInfo->bytes; } else if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM) { pInfo->type = (int16_t)dataType; pInfo->bytes = (int16_t)dataBytes; @@ -456,15 +476,15 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI size_t size = sizeof(STopBotInfo) + (sizeof(tValuePair) + POINTER_BYTES + extLength) * param; // the output column may be larger than sizeof(STopBotInfo) - pInfo->intermediateBytes = (int32_t)size; + pInfo->interBufSize = (int32_t)size; } else if (functionId == FUNCTION_LAST_ROW) { pInfo->type = (int16_t)dataType; pInfo->bytes = (int16_t)dataBytes; - pInfo->intermediateBytes = dataBytes; + pInfo->interBufSize = dataBytes; } else if (functionId == FUNCTION_STDDEV_DST) { pInfo->type = TSDB_DATA_TYPE_BINARY; pInfo->bytes = sizeof(SStddevdstInfo); - pInfo->intermediateBytes = (pInfo->bytes); + pInfo->interBufSize = (pInfo->bytes); } else { return TSDB_CODE_TSC_INVALID_OPERATION; @@ -479,7 +499,7 @@ static bool function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInf } memset(pCtx->pOutput, 0, (size_t)pCtx->resDataInfo.bytes); - initResultRowEntry(pResultInfo, pCtx->resDataInfo.intermediateBytes); + initResultRowEntry(pResultInfo, pCtx->resDataInfo.interBufSize); return true; } @@ -492,9 +512,9 @@ static bool function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInf */ static void function_finalizer(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - if (pResInfo->hasResult != DATA_SET_FLAG) { - setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); - } +// if (pResInfo->hasResult != DATA_SET_FLAG) { // TODO set the correct null value +// setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); +// } doFinalizer(pCtx); } @@ -530,7 +550,7 @@ static void count_function(SqlFunctionCtx *pCtx) { } if (numOfElem > 0) { - GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; +// GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; } *((int64_t *)pCtx->pOutput) += numOfElem; @@ -694,7 +714,7 @@ static void do_sum(SqlFunctionCtx *pCtx) { SET_VAL(pCtx, notNullElems, 1); if (notNullElems > 0) { - GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; +// GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; } } @@ -703,11 +723,11 @@ static void sum_function(SqlFunctionCtx *pCtx) { // keep the result data in output buffer, not in the intermediate buffer SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - if (pResInfo->hasResult == DATA_SET_FLAG && pCtx->stableQuery) { +// if (pResInfo->hasResult == DATA_SET_FLAG && pCtx->stableQuery) { // set the flag for super table query SSumInfo *pSum = (SSumInfo *)pCtx->pOutput; pSum->hasResult = DATA_SET_FLAG; - } +// } } static void sum_func_merge(SqlFunctionCtx *pCtx) { @@ -738,7 +758,7 @@ static void sum_func_merge(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); if (notNullElems > 0) { - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; } } @@ -876,7 +896,7 @@ static void avg_function(SqlFunctionCtx *pCtx) { pAvgInfo->num += notNullElems; if (notNullElems > 0) { - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; } // keep the data into the final output buffer for super table query since this execution may be the last one @@ -1192,7 +1212,7 @@ static void min_function(SqlFunctionCtx *pCtx) { if (notNullElems > 0) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; // set the flag for super table query if (pCtx->stableQuery) { @@ -1209,7 +1229,7 @@ static void max_function(SqlFunctionCtx *pCtx) { if (notNullElems > 0) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; // set the flag for super table query if (pCtx->stableQuery) { @@ -1310,7 +1330,7 @@ static void min_func_merge(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); if (notNullElems > 0) { - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; } } @@ -1321,7 +1341,7 @@ static void max_func_merge(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); if (numOfElem > 0) { - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; } } @@ -1602,7 +1622,7 @@ static void first_function(SqlFunctionCtx *pCtx) { } SResultRowEntryInfo *pInfo = GET_RES_INFO(pCtx); - pInfo->hasResult = DATA_SET_FLAG; +// pInfo->hasResult = DATA_SET_FLAG; pInfo->complete = true; notNullElems++; @@ -1652,7 +1672,7 @@ static void first_dist_function(SqlFunctionCtx *pCtx) { first_data_assign_impl(pCtx, data, i); SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; notNullElems++; break; @@ -1680,7 +1700,7 @@ static void first_dist_func_merge(SqlFunctionCtx *pCtx) { } SET_VAL(pCtx, 1, 1); - GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; +// GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; } ////////////////////////////////////////////////////////////////////////////////////////// @@ -1712,7 +1732,7 @@ static void last_function(SqlFunctionCtx *pCtx) { TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0; DO_UPDATE_TAG_COLUMNS(pCtx, ts); - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; pResInfo->complete = true; // set query completed on this column notNullElems++; break; @@ -1727,13 +1747,13 @@ static void last_function(SqlFunctionCtx *pCtx) { TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0; char* buf = GET_ROWCELL_INTERBUF(pResInfo); - if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) < ts) { - pResInfo->hasResult = DATA_SET_FLAG; - memcpy(pCtx->pOutput, data, pCtx->inputBytes); - - *(TSKEY*)buf = ts; - DO_UPDATE_TAG_COLUMNS(pCtx, ts); - } +// if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) < ts) { +// //pResInfo->hasResult = DATA_SET_FLAG; +// memcpy(pCtx->pOutput, data, pCtx->inputBytes); +// +// *(TSKEY*)buf = ts; +// DO_UPDATE_TAG_COLUMNS(pCtx, ts); +// } notNullElems++; break; @@ -1782,7 +1802,7 @@ static void last_dist_function(SqlFunctionCtx *pCtx) { last_data_assign_impl(pCtx, data, i); SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; notNullElems++; break; @@ -1817,7 +1837,7 @@ static void last_dist_func_merge(SqlFunctionCtx *pCtx) { } SET_VAL(pCtx, 1, 1); - GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; +// GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; } ////////////////////////////////////////////////////////////////////////////////// @@ -1832,7 +1852,7 @@ static void last_row_function(SqlFunctionCtx *pCtx) { assignVal(pCtx->pOutput, pData + (pCtx->size - 1) * pCtx->inputBytes, pCtx->inputBytes, pCtx->inputType); SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; // set the result to final result buffer in case of super table query if (pCtx->stableQuery) { @@ -1852,10 +1872,10 @@ static void last_row_function(SqlFunctionCtx *pCtx) { static void last_row_finalizer(SqlFunctionCtx *pCtx) { // do nothing at the first stage SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - if (pResInfo->hasResult != DATA_SET_FLAG) { - setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); - return; - } +// if (pResInfo->hasResult != DATA_SET_FLAG) { +// setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); +// return; +// } GET_RES_INFO(pCtx)->numOfRes = 1; doFinalizer(pCtx); @@ -2248,7 +2268,7 @@ static void top_function(SqlFunctionCtx *pCtx) { if (notNullElems > 0) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; } } @@ -2271,7 +2291,7 @@ static void top_func_merge(SqlFunctionCtx *pCtx) { if (pOutput->num > 0) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; } } @@ -2305,7 +2325,7 @@ static void bottom_function(SqlFunctionCtx *pCtx) { if (notNullElems > 0) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; } } @@ -2328,7 +2348,7 @@ static void bottom_func_merge(SqlFunctionCtx *pCtx) { if (pOutput->num > 0) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; } } @@ -2338,7 +2358,7 @@ static void top_bottom_func_finalizer(SqlFunctionCtx *pCtx) { // data in temporary list is less than the required number of results, not enough qualified number of results STopBotInfo *pRes = GET_ROWCELL_INTERBUF(pResInfo); if (pRes->num == 0) { // no result - assert(pResInfo->hasResult != DATA_SET_FLAG); +// assert(pResInfo->hasResult != DATA_SET_FLAG); // TODO: } @@ -2457,7 +2477,7 @@ static void percentile_function(SqlFunctionCtx *pCtx) { } SET_VAL(pCtx, notNullElems, 1); - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; } static void percentile_finalizer(SqlFunctionCtx *pCtx) { @@ -2538,7 +2558,7 @@ static void apercentile_function(SqlFunctionCtx *pCtx) { SET_VAL(pCtx, notNullElems, 1); if (notNullElems > 0) { - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; } } @@ -2568,7 +2588,7 @@ static void apercentile_func_merge(SqlFunctionCtx *pCtx) { } SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; SET_VAL(pCtx, 1, 1); } @@ -2579,18 +2599,18 @@ static void apercentile_finalizer(SqlFunctionCtx *pCtx) { SAPercentileInfo *pOutput = GET_ROWCELL_INTERBUF(pResInfo); if (pCtx->currentStage == MERGE_STAGE) { - if (pResInfo->hasResult == DATA_SET_FLAG) { // check for null - assert(pOutput->pHisto->numOfElems > 0); - - double ratio[] = {v}; - double *res = tHistogramUniform(pOutput->pHisto, ratio, 1); - - memcpy(pCtx->pOutput, res, sizeof(double)); - free(res); - } else { - setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); - return; - } +// if (pResInfo->hasResult == DATA_SET_FLAG) { // check for null +// assert(pOutput->pHisto->numOfElems > 0); +// +// double ratio[] = {v}; +// double *res = tHistogramUniform(pOutput->pHisto, ratio, 1); +// +// memcpy(pCtx->pOutput, res, sizeof(double)); +// free(res); +// } else { +// setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); +// return; +// } } else { if (pOutput->pHisto->numOfElems > 0) { double ratio[] = {v}; @@ -2718,7 +2738,7 @@ static void leastsquares_function(SqlFunctionCtx *pCtx) { pInfo->num += numOfElem; if (pInfo->num > 0) { - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; } SET_VAL(pCtx, numOfElem, 1); @@ -3356,7 +3376,7 @@ static void spread_function(SqlFunctionCtx *pCtx) { SET_VAL(pCtx, numOfElems, 1); if (numOfElems > 0) { - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; pInfo->hasResult = DATA_SET_FLAG; } @@ -3384,7 +3404,7 @@ void spread_func_merge(SqlFunctionCtx *pCtx) { pCtx->param[3].d = pData->max; } - GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; +// GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; } void spread_function_finalizer(SqlFunctionCtx *pCtx) { @@ -3397,10 +3417,10 @@ void spread_function_finalizer(SqlFunctionCtx *pCtx) { if (pCtx->currentStage == MERGE_STAGE) { assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); - if (pResInfo->hasResult != DATA_SET_FLAG) { - setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); - return; - } +// if (pResInfo->hasResult != DATA_SET_FLAG) { +// setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); +// return; +// } SET_DOUBLE_VAL((double *)pCtx->pOutput, pCtx->param[3].d - pCtx->param[0].d); } else { @@ -3708,7 +3728,7 @@ static void twa_function(SqlFunctionCtx *pCtx) { SET_VAL(pCtx, notNullElems, 1); if (notNullElems > 0) { - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; } if (pCtx->stableQuery) { @@ -3726,7 +3746,7 @@ void twa_function_copy(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); memcpy(GET_ROWCELL_INTERBUF(pResInfo), pCtx->pInput, (size_t)pCtx->inputBytes); - pResInfo->hasResult = ((STwaInfo *)pCtx->pInput)->hasResult; +// pResInfo->hasResult = ((STwaInfo *)pCtx->pInput)->hasResult; } void twa_function_finalizer(SqlFunctionCtx *pCtx) { @@ -3738,7 +3758,7 @@ void twa_function_finalizer(SqlFunctionCtx *pCtx) { return; } - assert(pInfo->win.ekey == pInfo->p.key && pInfo->hasResult == pResInfo->hasResult); +// assert(pInfo->win.ekey == pInfo->p.key && pInfo->hasResult == pResInfo->hasResult); if (pInfo->win.ekey == pInfo->win.skey) { SET_DOUBLE_VAL((double *)pCtx->pOutput, pInfo->p.val); } else { @@ -3948,7 +3968,7 @@ static void ts_comp_function(SqlFunctionCtx *pCtx) { } SET_VAL(pCtx, pCtx->size, 1); - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; } static void ts_comp_finalize(SqlFunctionCtx *pCtx) { @@ -4069,7 +4089,7 @@ static void rate_function(SqlFunctionCtx *pCtx) { if (notNullElems > 0) { pRateInfo->hasResult = DATA_SET_FLAG; - pResInfo->hasResult = DATA_SET_FLAG; +// pResInfo->hasResult = DATA_SET_FLAG; } // keep the data into the final output buffer for super table query since this execution may be the last one @@ -4083,7 +4103,7 @@ static void rate_func_copy(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); memcpy(GET_ROWCELL_INTERBUF(pResInfo), pCtx->pInput, (size_t)pCtx->inputBytes); - pResInfo->hasResult = ((SRateInfo*)pCtx->pInput)->hasResult; +// pResInfo->hasResult = ((SRateInfo*)pCtx->pInput)->hasResult; } static void rate_finalizer(SqlFunctionCtx *pCtx) { @@ -4099,7 +4119,7 @@ static void rate_finalizer(SqlFunctionCtx *pCtx) { // cannot set the numOfIteratedElems again since it is set during previous iteration pResInfo->numOfRes = 1; - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; doFinalizer(pCtx); } @@ -4139,7 +4159,7 @@ static void irate_function(SqlFunctionCtx *pCtx) { if (notNullElems > 0) { pRateInfo->hasResult = DATA_SET_FLAG; - pResInfo->hasResult = DATA_SET_FLAG; +// pResInfo->hasResult = DATA_SET_FLAG; } // keep the data into the final output buffer for super table query since this execution may be the last one @@ -4197,7 +4217,7 @@ static void blockInfo_func(SqlFunctionCtx* pCtx) { memcpy(pCtx->pOutput, pCtx->pInput, sizeof(int32_t) + len); pResInfo->numOfRes = 1; - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; } static void mergeTableBlockDist(SResultRowEntryInfo* pResInfo, const STableBlockDist* pSrc) { @@ -4211,10 +4231,10 @@ static void mergeTableBlockDist(SResultRowEntryInfo* pResInfo, const STableBlock pDist->totalSize += pSrc->totalSize; pDist->totalRows += pSrc->totalRows; - if (pResInfo->hasResult == DATA_SET_FLAG) { - pDist->maxRows = TMAX(pDist->maxRows, pSrc->maxRows); - pDist->minRows = TMIN(pDist->minRows, pSrc->minRows); - } else { +// if (pResInfo->hasResult == DATA_SET_FLAG) { +// pDist->maxRows = TMAX(pDist->maxRows, pSrc->maxRows); +// pDist->minRows = TMIN(pDist->minRows, pSrc->minRows); +// } else { pDist->maxRows = pSrc->maxRows; pDist->minRows = pSrc->minRows; @@ -4224,7 +4244,7 @@ static void mergeTableBlockDist(SResultRowEntryInfo* pResInfo, const STableBlock } pDist->dataBlockInfos = taosArrayInit(maxSteps, sizeof(SFileBlockInfo)); taosArraySetSize(pDist->dataBlockInfos, maxSteps); - } +// } size_t steps = taosArrayGetSize(pSrc->dataBlockInfos); for (int32_t i = 0; i < steps; ++i) { @@ -4243,7 +4263,7 @@ void block_func_merge(SqlFunctionCtx* pCtx) { taosArrayDestroy(info.dataBlockInfos); pResInfo->numOfRes = 1; - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; } void getPercentiles(STableBlockDist *pTableBlockDist, int64_t totalBlocks, int32_t numOfPercents, @@ -4354,7 +4374,7 @@ void blockinfo_func_finalizer(SqlFunctionCtx* pCtx) { // cannot set the numOfIteratedElems again since it is set during previous iteration pResInfo->numOfRes = 1; - pResInfo->hasResult = DATA_SET_FLAG; + //pResInfo->hasResult = DATA_SET_FLAG; doFinalizer(pCtx); } @@ -4381,24 +4401,6 @@ int32_t functionCompatList[] = { 6, 8, 7, }; -//typedef struct SFunctionFpSet { -// bool (*init)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment -// void (*addInput)(struct SqlFunctionCtx *pCtx); -// -// // finalizer must be called after all exec has been executed to generated final result. -// void (*finalize)(struct SqlFunctionCtx *pCtx); -// void (*combine)(struct SqlFunctionCtx *pCtx); -//} SFunctionFpSet; - -SFunctionFpSet fpSet[1] = { - { - .init = function_setup, - .addInput = count_function, - .finalize = doFinalizer, - .combine = count_func_merge, - }, -}; - SAggFunctionInfo aggFunc[35] = {{ // 0, count function does not invoke the finalize function "count", diff --git a/source/libs/function/src/tfill.c b/source/libs/function/src/tfill.c index 653e30cec883b1bb82a635e945cf1ec3077fcfba..46d82aa6fbbf1854eeb8415fe21340fd5d16dad5 100644 --- a/source/libs/function/src/tfill.c +++ b/source/libs/function/src/tfill.c @@ -543,7 +543,7 @@ struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, co pFillCol[i].col.offset = offset; pFillCol[i].col.colId = pExprInfo->base.resSchema.colId; pFillCol[i].tagIndex = -2; - pFillCol[i].flag = pExprInfo->base.pColumns->flag; // always be the normal column for table query + pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query // pFillCol[i].functionId = pExprInfo->pExpr->_function.functionId; pFillCol[i].fillVal.i = fillVal[i];