未验证 提交 6561851b 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #11216 from taosdata/feature/3.0_liaohj

[td-13039] support stddev.
...@@ -193,20 +193,19 @@ typedef struct SColumn { ...@@ -193,20 +193,19 @@ typedef struct SColumn {
uint8_t scale; uint8_t scale;
} SColumn; } SColumn;
typedef struct SLimit { typedef struct STableBlockDistInfo {
int64_t limit; uint16_t rowSize;
int64_t offset; uint16_t numOfFiles;
} SLimit; uint32_t numOfTables;
uint64_t totalSize;
typedef struct SOrder { uint64_t totalRows;
uint32_t order; int32_t maxRows;
SColumn col; int32_t minRows;
} SOrder; int32_t firstSeekTimeUs;
uint32_t numOfRowsInMemTable;
typedef struct SGroupbyExpr { uint32_t numOfSmallBlocks;
SArray* columnInfo; // SArray<SColIndex>, group by columns information SArray *dataBlockInfos;
bool groupbyTag; // group by tag or column } STableBlockDistInfo;
} SGroupbyExpr;
enum { enum {
FUNC_PARAM_TYPE_VALUE = 0x1, FUNC_PARAM_TYPE_VALUE = 0x1,
...@@ -241,15 +240,6 @@ typedef struct SExprInfo { ...@@ -241,15 +240,6 @@ typedef struct SExprInfo {
struct tExprNode* pExpr; struct tExprNode* pExpr;
} SExprInfo; } SExprInfo;
typedef struct SStateWindow {
SColumn col;
} SStateWindow;
typedef struct SSessionWindow {
int64_t gap; // gap between two session window(in microseconds)
SColumn col;
} SSessionWindow;
#define QUERY_ASC_FORWARD_STEP 1 #define QUERY_ASC_FORWARD_STEP 1
#define QUERY_DESC_FORWARD_STEP -1 #define QUERY_DESC_FORWARD_STEP -1
......
...@@ -52,7 +52,12 @@ typedef struct SFuncExecFuncs { ...@@ -52,7 +52,12 @@ typedef struct SFuncExecFuncs {
FExecFinalize finalize; FExecFinalize finalize;
} SFuncExecFuncs; } SFuncExecFuncs;
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results typedef struct SFileBlockInfo {
int32_t numBlocksOfStep;
} SFileBlockInfo;
#define TSDB_BLOCK_DIST_STEP_ROWS 8
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
#define FUNCTION_TYPE_SCALAR 1 #define FUNCTION_TYPE_SCALAR 1
#define FUNCTION_TYPE_AGG 2 #define FUNCTION_TYPE_AGG 2
...@@ -101,10 +106,6 @@ typedef struct SFuncExecFuncs { ...@@ -101,10 +106,6 @@ typedef struct SFuncExecFuncs {
#define FUNCTION_DERIVATIVE 32 #define FUNCTION_DERIVATIVE 32
#define FUNCTION_BLKINFO 33 #define FUNCTION_BLKINFO 33
#define FUNCTION_HISTOGRAM 34
#define FUNCTION_HLL 35
#define FUNCTION_MODE 36
#define FUNCTION_SAMPLE 37
#define FUNCTION_COV 38 #define FUNCTION_COV 38
......
...@@ -171,6 +171,8 @@ tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo ...@@ -171,6 +171,8 @@ tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, void* pMemRef); tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, void* pMemRef);
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo);
bool isTsdbCacheLastRow(tsdbReaderT* pTsdbReadHandle); bool isTsdbCacheLastRow(tsdbReaderT* pTsdbReadHandle);
/** /**
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
*/ */
#include "tsdbDef.h" #include "tsdbDef.h"
#include <tdatablock.h> #include "tdatablock.h"
#include "os.h" #include "os.h"
#include "talgo.h" #include "talgo.h"
#include "tcompare.h" #include "tcompare.h"
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include "tlosertree.h" #include "tlosertree.h"
#include "tsdbDef.h" #include "tsdbDef.h"
#include "tmsg.h" #include "tmsg.h"
#include "tsdbCommit.h"
#define EXTRA_BYTES 2 #define EXTRA_BYTES 2
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
...@@ -209,34 +210,34 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load ...@@ -209,34 +210,34 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load
return pLocalIdList; return pLocalIdList;
} }
//int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) { int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
// STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle; STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
//
// int64_t rows = 0; int64_t rows = 0;
// STsdbMemTable* pMemTable = pTsdbReadHandle->pMemTable; STsdbMemTable* pMemTable = NULL;//pTsdbReadHandle->pMemTable;
// if (pMemTable == NULL) { return rows; } if (pMemTable == NULL) { return rows; }
//
//// STableData* pMem = NULL; // STableData* pMem = NULL;
//// STableData* pIMem = NULL; // STableData* pIMem = NULL;
//
//// SMemTable* pMemT = pMemRef->snapshot.mem; // SMemTable* pMemT = pMemRef->snapshot.mem;
//// SMemTable* pIMemT = pMemRef->snapshot.imem; // SMemTable* pIMemT = pMemRef->snapshot.imem;
//
// size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
// for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
// STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i); STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
//
//// if (pMemT && pCheckInfo->tableId < pMemT->maxTables) { // if (pMemT && pCheckInfo->tableId < pMemT->maxTables) {
//// pMem = pMemT->tData[pCheckInfo->tableId]; // pMem = pMemT->tData[pCheckInfo->tableId];
//// rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0; // rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0;
//// } // }
//// if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) { // if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) {
//// pIMem = pIMemT->tData[pCheckInfo->tableId]; // pIMem = pIMemT->tData[pCheckInfo->tableId];
//// rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0; // rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0;
//// } // }
// } }
// return rows; return rows;
//} }
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* pGroupList) { static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* pGroupList) {
size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList); size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);
...@@ -2261,12 +2262,13 @@ static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) { ...@@ -2261,12 +2262,13 @@ static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) {
cur->mixBlock = false; cur->mixBlock = false;
cur->blockCompleted = false; cur->blockCompleted = false;
} }
#if 0
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDist* pTableBlockInfo) { int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo) {
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) queryHandle; STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) queryHandle;
pTableBlockInfo->totalSize = 0; pTableBlockInfo->totalSize = 0;
pTableBlockInfo->totalRows = 0; pTableBlockInfo->totalRows = 0;
STsdbFS* pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb); STsdbFS* pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb);
// find the start data block in file // find the start data block in file
...@@ -2284,9 +2286,11 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDist* pTa ...@@ -2284,9 +2286,11 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDist* pTa
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock); int defaultRows = 4096;//TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
STimeWindow win = TSWINDOW_INITIALIZER; STimeWindow win = TSWINDOW_INITIALIZER;
bool ascTraverse = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
while (true) { while (true) {
numOfBlocks = 0; numOfBlocks = 0;
tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb)); tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
...@@ -2299,8 +2303,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDist* pTa ...@@ -2299,8 +2303,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDist* pTa
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey); tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
// current file are not overlapped with query time window, ignore remain files // current file are not overlapped with query time window, ignore remain files
if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.skey > pTsdbReadHandle->window.ekey) || if ((ascTraverse && win.skey > pTsdbReadHandle->window.ekey) || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)) {
(!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.ekey < pTsdbReadHandle->window.ekey)) {
tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb)); tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle, tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr); pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
...@@ -2342,19 +2345,26 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDist* pTa ...@@ -2342,19 +2345,26 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDist* pTa
int32_t numOfRows = pBlock[j].numOfRows; int32_t numOfRows = pBlock[j].numOfRows;
pTableBlockInfo->totalRows += numOfRows; pTableBlockInfo->totalRows += numOfRows;
if (numOfRows > pTableBlockInfo->maxRows) pTableBlockInfo->maxRows = numOfRows; if (numOfRows > pTableBlockInfo->maxRows) {
if (numOfRows < pTableBlockInfo->minRows) pTableBlockInfo->minRows = numOfRows; pTableBlockInfo->maxRows = numOfRows;
if (numOfRows < defaultRows) pTableBlockInfo->numOfSmallBlocks+=1; }
int32_t stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS;
SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex); if (numOfRows < pTableBlockInfo->minRows) {
blockInfo->numBlocksOfStep++; pTableBlockInfo->minRows = numOfRows;
}
if (numOfRows < defaultRows) {
pTableBlockInfo->numOfSmallBlocks += 1;
}
// int32_t stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS;
// SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex);
// blockInfo->numBlocksOfStep++;
} }
} }
} }
return code; return code;
} }
#endif
static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exists) { static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exists) {
STsdbFS* pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb); STsdbFS* pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb);
......
...@@ -128,6 +128,11 @@ typedef struct { ...@@ -128,6 +128,11 @@ typedef struct {
int64_t sumRunTimes; int64_t sumRunTimes;
} SOperatorProfResult; } SOperatorProfResult;
typedef struct SLimit {
int64_t limit;
int64_t offset;
} SLimit;
typedef struct STaskCostInfo { typedef struct STaskCostInfo {
int64_t created; int64_t created;
int64_t start; int64_t start;
...@@ -163,6 +168,11 @@ typedef struct SOperatorCostInfo { ...@@ -163,6 +168,11 @@ typedef struct SOperatorCostInfo {
uint64_t execCost; uint64_t execCost;
} SOperatorCostInfo; } SOperatorCostInfo;
typedef struct SOrder {
uint32_t order;
SColumn col;
} SOrder;
// The basic query information extracted from the SQueryInfo tree to support the // The basic query information extracted from the SQueryInfo tree to support the
// execution of query in a data node. // execution of query in a data node.
typedef struct STaskAttr { typedef struct STaskAttr {
...@@ -196,7 +206,6 @@ typedef struct STaskAttr { ...@@ -196,7 +206,6 @@ typedef struct STaskAttr {
STimeWindow window; STimeWindow window;
SInterval interval; SInterval interval;
SSessionWindow sw;
int16_t precision; int16_t precision;
int16_t numOfOutput; int16_t numOfOutput;
int16_t fillType; int16_t fillType;
...@@ -206,13 +215,8 @@ typedef struct STaskAttr { ...@@ -206,13 +215,8 @@ typedef struct STaskAttr {
int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query. int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query.
int32_t maxTableColumnWidth; int32_t maxTableColumnWidth;
int32_t tagLen; // tag value length of current query int32_t tagLen; // tag value length of current query
SGroupbyExpr* pGroupbyExpr;
SExprInfo* pExpr1; SExprInfo* pExpr1;
SExprInfo* pExpr2;
int32_t numOfExpr2;
SExprInfo* pExpr3;
int32_t numOfExpr3;
SColumnInfo* tableCols; SColumnInfo* tableCols;
SColumnInfo* tagColList; SColumnInfo* tagColList;
...@@ -220,8 +224,6 @@ typedef struct STaskAttr { ...@@ -220,8 +224,6 @@ typedef struct STaskAttr {
int64_t* fillVal; int64_t* fillVal;
SSingleColumnFilterInfo* pFilterInfo; SSingleColumnFilterInfo* pFilterInfo;
// SFilterInfo *pFilters;
void* tsdb; void* tsdb;
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo> STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
int32_t vgId; int32_t vgId;
...@@ -384,7 +386,7 @@ typedef struct SExchangeInfo { ...@@ -384,7 +386,7 @@ typedef struct SExchangeInfo {
} SExchangeInfo; } SExchangeInfo;
typedef struct STableScanInfo { typedef struct STableScanInfo {
void* pTsdbReadHandle; void* dataReader;
int32_t numOfBlocks; // extract basic running information. int32_t numOfBlocks; // extract basic running information.
int32_t numOfSkipped; int32_t numOfSkipped;
int32_t numOfBlockStatis; int32_t numOfBlockStatis;
...@@ -644,12 +646,13 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -644,12 +646,13 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock,
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput); int32_t numOfOutput);
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,
......
...@@ -38,9 +38,14 @@ void minFunction(SqlFunctionCtx* pCtx); ...@@ -38,9 +38,14 @@ void minFunction(SqlFunctionCtx* pCtx);
void maxFunction(SqlFunctionCtx *pCtx); void maxFunction(SqlFunctionCtx *pCtx);
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
void stddevFunction(SqlFunctionCtx* pCtx); void stddevFunction(SqlFunctionCtx* pCtx);
void stddevFinalize(SqlFunctionCtx* pCtx); void stddevFinalize(SqlFunctionCtx* pCtx);
bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
void percentileFunction(SqlFunctionCtx *pCtx);
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void firstFunction(SqlFunctionCtx *pCtx); void firstFunction(SqlFunctionCtx *pCtx);
void lastFunction(SqlFunctionCtx *pCtx); void lastFunction(SqlFunctionCtx *pCtx);
......
...@@ -68,9 +68,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -68,9 +68,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC,
.checkFunc = stubCheckAndGetResultType, .checkFunc = stubCheckAndGetResultType,
.getEnvFunc = getStddevFuncEnv, .getEnvFunc = getStddevFuncEnv,
.initFunc = maxFunctionSetup, .initFunc = stddevFunctionSetup,
.processFunc = maxFunction, .processFunc = stddevFunction,
.finalizeFunc = functionFinalize .finalizeFunc = stddevFinalize
}, },
{ {
.name = "percentile", .name = "percentile",
...@@ -434,6 +434,7 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { ...@@ -434,6 +434,7 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
break; break;
} }
case FUNCTION_TYPE_STDDEV:
case FUNCTION_TYPE_SIN: case FUNCTION_TYPE_SIN:
case FUNCTION_TYPE_COS: case FUNCTION_TYPE_COS:
case FUNCTION_TYPE_TAN: case FUNCTION_TYPE_TAN:
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "builtinsimpl.h" #include "builtinsimpl.h"
#include "tpercentile.h"
#include "querynodes.h" #include "querynodes.h"
#include "taggfunction.h" #include "taggfunction.h"
#include "tdatablock.h" #include "tdatablock.h"
...@@ -453,6 +454,7 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { ...@@ -453,6 +454,7 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
} }
typedef struct SStddevRes { typedef struct SStddevRes {
double result;
int64_t count; int64_t count;
union {double quadraticDSum; int64_t quadraticISum;}; union {double quadraticDSum; int64_t quadraticISum;};
union {double dsum; int64_t isum;}; union {double dsum; int64_t isum;};
...@@ -463,38 +465,129 @@ bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { ...@@ -463,38 +465,129 @@ bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
return true; return true;
} }
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) {
return false;
}
SStddevRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo);
memset(pRes, 0, sizeof(SStddevRes));
return true;
}
void stddevFunction(SqlFunctionCtx* pCtx) { void stddevFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0; int32_t numOfElem = 0;
// Only the pre-computing information loaded and actual data does not loaded // Only the pre-computing information loaded and actual data does not loaded
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0]; SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
int32_t type = pInput->pData[0]->info.type; int32_t type = pInput->pData[0]->info.type;
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
// } else { // computing based on the true data block // computing based on the true data block
SColumnInfoData* pCol = pInput->pData[0]; SColumnInfoData* pCol = pInput->pData[0];
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows; int32_t numOfRows = pInput->numOfRows;
switch(type) { switch (type) {
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_TINYINT: {
int32_t* plist = (int32_t*)pCol->pData; int8_t* plist = (int8_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue; continue;
} }
numOfElem += 1;
pStddevRes->count += 1; pStddevRes->count += 1;
pStddevRes->isum += plist[i]; pStddevRes->isum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i]; pStddevRes->quadraticISum += plist[i] * plist[i];
} }
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t* plist = (int16_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pStddevRes->count += 1;
pStddevRes->isum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i];
}
break;
}
case TSDB_DATA_TYPE_INT: {
int32_t* plist = (int32_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pStddevRes->count += 1;
pStddevRes->isum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i];
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t* plist = (int64_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pStddevRes->count += 1;
pStddevRes->isum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i];
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float* plist = (float*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pStddevRes->count += 1;
pStddevRes->isum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i];
}
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double* plist = (double*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElem += 1;
pStddevRes->count += 1;
pStddevRes->isum += plist[i];
pStddevRes->quadraticISum += plist[i] * plist[i];
} }
break; break;
} }
default:
break;
}
// data in the check operation are all null, not output // data in the check operation are all null, not output
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
} }
...@@ -503,11 +596,122 @@ void stddevFinalize(SqlFunctionCtx* pCtx) { ...@@ -503,11 +596,122 @@ void stddevFinalize(SqlFunctionCtx* pCtx) {
functionFinalize(pCtx); functionFinalize(pCtx);
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
double res = pStddevRes->quadraticISum/pStddevRes->count - (pStddevRes->isum / pStddevRes->count) * (pStddevRes->isum / pStddevRes->count); double avg = pStddevRes->isum / ((double) pStddevRes->count);
pStddevRes->result = sqrt(pStddevRes->quadraticISum/((double)pStddevRes->count) - avg*avg);
}
typedef struct SPercentileInfo {
tMemBucket *pMemBucket;
int32_t stage;
double minval;
double maxval;
int64_t numOfElems;
} SPercentileInfo;
bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SPercentileInfo);
return true;
}
bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) {
return false;
}
// in the first round, get the min-max value of all involved data
SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
SET_DOUBLE_VAL(&pInfo->minval, DBL_MAX);
SET_DOUBLE_VAL(&pInfo->maxval, -DBL_MAX);
pInfo->numOfElems = 0;
return true;
} }
void percentileFunction(SqlFunctionCtx *pCtx) {
int32_t notNullElems = 0;
#if 0
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
pInfo->stage += 1;
// all data are null, set it completed
if (pInfo->numOfElems == 0) {
pResInfo->complete = true;
return;
} else {
pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval);
}
}
// the first stage, only acquire the min/max value
if (pInfo->stage == 0) {
if (pCtx->preAggVals.isSet) {
double tmin = 0.0, tmax = 0.0;
if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) {
tmin = (double)GET_INT64_VAL(&pCtx->preAggVals.statis.min);
tmax = (double)GET_INT64_VAL(&pCtx->preAggVals.statis.max);
} else if (IS_FLOAT_TYPE(pCtx->inputType)) {
tmin = GET_DOUBLE_VAL(&pCtx->preAggVals.statis.min);
tmax = GET_DOUBLE_VAL(&pCtx->preAggVals.statis.max);
} else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) {
tmin = (double)GET_UINT64_VAL(&pCtx->preAggVals.statis.min);
tmax = (double)GET_UINT64_VAL(&pCtx->preAggVals.statis.max);
} else {
assert(true);
}
if (GET_DOUBLE_VAL(&pInfo->minval) > tmin) {
SET_DOUBLE_VAL(&pInfo->minval, tmin);
}
if (GET_DOUBLE_VAL(&pInfo->maxval) < tmax) {
SET_DOUBLE_VAL(&pInfo->maxval, tmax);
}
pInfo->numOfElems += (pCtx->size - pCtx->preAggVals.statis.numOfNull);
} else {
for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
continue;
}
double v = 0;
GET_TYPED_DATA(v, double, pCtx->inputType, data);
if (v < GET_DOUBLE_VAL(&pInfo->minval)) {
SET_DOUBLE_VAL(&pInfo->minval, v);
}
if (v > GET_DOUBLE_VAL(&pInfo->maxval)) {
SET_DOUBLE_VAL(&pInfo->maxval, v);
}
pInfo->numOfElems += 1;
}
}
return;
}
// the second stage, calculate the true percentile value
for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
continue;
}
notNullElems += 1;
tMemBucketPut(pInfo->pMemBucket, data, 1);
}
SET_VAL(pCtx, notNullElems, 1);
pResInfo->hasResult = DATA_SET_FLAG;
#endif
}
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
......
...@@ -176,26 +176,6 @@ typedef struct SResPair { ...@@ -176,26 +176,6 @@ typedef struct SResPair {
double avg; double avg;
} SResPair; } SResPair;
#define TSDB_BLOCK_DIST_STEP_ROWS 16
typedef struct STableBlockDist {
uint16_t rowSize;
uint16_t numOfFiles;
uint32_t numOfTables;
uint64_t totalSize;
uint64_t totalRows;
int32_t maxRows;
int32_t minRows;
int32_t firstSeekTimeUs;
uint32_t numOfRowsInMemTable;
uint32_t numOfSmallBlocks;
SArray *dataBlockInfos;
} STableBlockDist;
typedef struct SFileBlockInfo {
int32_t numBlocksOfStep;
} SFileBlockInfo;
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell) { void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell) {
pCell->initialized = false; pCell->initialized = false;
} }
...@@ -3984,7 +3964,7 @@ static void irate_function(SqlFunctionCtx *pCtx) { ...@@ -3984,7 +3964,7 @@ static void irate_function(SqlFunctionCtx *pCtx) {
} }
} }
static void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist) { static void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDistInfo* pDist) {
SBufferReader br = tbufInitReader(data, len, false); SBufferReader br = tbufInitReader(data, len, false);
pDist->numOfTables = tbufReadUint32(&br); pDist->numOfTables = tbufReadUint32(&br);
...@@ -4024,7 +4004,7 @@ static void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDi ...@@ -4024,7 +4004,7 @@ static void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDi
static void blockInfo_func(SqlFunctionCtx* pCtx) { static void blockInfo_func(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo); STableBlockDistInfo* pDist = (STableBlockDistInfo*) GET_ROWCELL_INTERBUF(pResInfo);
int32_t len = *(int32_t*) pCtx->pInput; int32_t len = *(int32_t*) pCtx->pInput;
blockDistInfoFromBinary((char*)pCtx->pInput + sizeof(int32_t), len, pDist); blockDistInfoFromBinary((char*)pCtx->pInput + sizeof(int32_t), len, pDist);
...@@ -4036,8 +4016,8 @@ static void blockInfo_func(SqlFunctionCtx* pCtx) { ...@@ -4036,8 +4016,8 @@ static void blockInfo_func(SqlFunctionCtx* pCtx) {
//pResInfo->hasResult = DATA_SET_FLAG; //pResInfo->hasResult = DATA_SET_FLAG;
} }
static void mergeTableBlockDist(SResultRowEntryInfo* pResInfo, const STableBlockDist* pSrc) { static void mergeTableBlockDist(SResultRowEntryInfo* pResInfo, const STableBlockDistInfo* pSrc) {
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo); STableBlockDistInfo* pDist = (STableBlockDistInfo*) GET_ROWCELL_INTERBUF(pResInfo);
assert(pDist != NULL && pSrc != NULL); assert(pDist != NULL && pSrc != NULL);
pDist->numOfTables += pSrc->numOfTables; pDist->numOfTables += pSrc->numOfTables;
...@@ -4071,7 +4051,7 @@ static void mergeTableBlockDist(SResultRowEntryInfo* pResInfo, const STableBlock ...@@ -4071,7 +4051,7 @@ static void mergeTableBlockDist(SResultRowEntryInfo* pResInfo, const STableBlock
} }
void block_func_merge(SqlFunctionCtx* pCtx) { void block_func_merge(SqlFunctionCtx* pCtx) {
STableBlockDist info = {0}; STableBlockDistInfo info = {0};
int32_t len = *(int32_t*) pCtx->pInput; int32_t len = *(int32_t*) pCtx->pInput;
blockDistInfoFromBinary(((char*)pCtx->pInput) + sizeof(int32_t), len, &info); blockDistInfoFromBinary(((char*)pCtx->pInput) + sizeof(int32_t), len, &info);
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
...@@ -4082,7 +4062,7 @@ void block_func_merge(SqlFunctionCtx* pCtx) { ...@@ -4082,7 +4062,7 @@ void block_func_merge(SqlFunctionCtx* pCtx) {
//pResInfo->hasResult = DATA_SET_FLAG; //pResInfo->hasResult = DATA_SET_FLAG;
} }
void getPercentiles(STableBlockDist *pTableBlockDist, int64_t totalBlocks, int32_t numOfPercents, void getPercentiles(STableBlockDistInfo *pTableBlockDist, int64_t totalBlocks, int32_t numOfPercents,
double* percents, int32_t* percentiles) { double* percents, int32_t* percentiles) {
if (totalBlocks == 0) { if (totalBlocks == 0) {
for (int32_t i = 0; i < numOfPercents; ++i) { for (int32_t i = 0; i < numOfPercents; ++i) {
...@@ -4117,7 +4097,7 @@ void getPercentiles(STableBlockDist *pTableBlockDist, int64_t totalBlocks, int32 ...@@ -4117,7 +4097,7 @@ void getPercentiles(STableBlockDist *pTableBlockDist, int64_t totalBlocks, int32
} }
} }
void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) { void generateBlockDistResult(STableBlockDistInfo *pTableBlockDist, char* result) {
if (pTableBlockDist == NULL) { if (pTableBlockDist == NULL) {
return; return;
} }
...@@ -4178,7 +4158,7 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) { ...@@ -4178,7 +4158,7 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
void blockinfo_func_finalizer(SqlFunctionCtx* pCtx) { void blockinfo_func_finalizer(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo); STableBlockDistInfo* pDist = (STableBlockDistInfo*) GET_ROWCELL_INTERBUF(pResInfo);
pDist->rowSize = (uint16_t)pCtx->param[0].i; pDist->rowSize = (uint16_t)pCtx->param[0].i;
generateBlockDistResult(pDist, pCtx->pOutput); generateBlockDistResult(pDist, pCtx->pOutput);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册