“8baf3ae0c9a9473e4860cd205e6c24214de78be5”上不存在“docs/zh/12-data-subscription/share/share-topic-users.webp”
提交 1652cd0e 编写于 作者: H Haojun Liao

enh(query): enable twa function in select clause.

上级 041cb3f0
......@@ -571,13 +571,6 @@ int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pR
int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
void tFreeSGetUserAuthRsp(SGetUserAuthRsp* pRsp);
typedef struct {
int16_t colId; // column id
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
int16_t flag; // denote if it is a tag or a normal column
char name[TSDB_DB_FNAME_LEN];
} SColIndex;
typedef struct {
int16_t lowerRelOptr;
int16_t upperRelOptr;
......
......@@ -156,18 +156,6 @@ int64_t qGetQueriedTableUid(qTaskInfo_t tinfo);
*/
int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList);
/**
* Create the table group according to the group by tags info
* @param pTableIdList
* @param skey
* @param groupInfo
* @param groupByIndex
* @param numOfIndex
* @return
*/
// int32_t qCreateTableGroupByGroupExpr(SArray* pTableIdList, TSKEY skey, STableGroupInfo groupInfo, SColIndex*
// groupByIndex, int32_t numOfIndex);
/**
* Update the table id list of a given query.
* @param uid child table uid
......
......@@ -67,50 +67,6 @@ typedef struct SFileBlockInfo {
#define TOP_BOTTOM_QUERY_LIMIT 100
#define FUNCTIONS_NAME_MAX_LENGTH 16
#define FUNCTION_INVALID_ID -1
#define FUNCTION_COUNT 0
#define FUNCTION_SUM 1
#define FUNCTION_AVG 2
#define FUNCTION_MIN 3
#define FUNCTION_MAX 4
#define FUNCTION_STDDEV 5
#define FUNCTION_PERCT 6
#define FUNCTION_APERCT 7
#define FUNCTION_FIRST 8
#define FUNCTION_LAST 9
#define FUNCTION_LAST_ROW 10
#define FUNCTION_TOP 11
#define FUNCTION_BOTTOM 12
#define FUNCTION_SPREAD 13
#define FUNCTION_TWA 14
#define FUNCTION_LEASTSQR 15
#define FUNCTION_TS 16
#define FUNCTION_TS_DUMMY 17
#define FUNCTION_TAG_DUMMY 18
#define FUNCTION_TS_COMP 19
#define FUNCTION_TAG 20
#define FUNCTION_PRJ 21
#define FUNCTION_TAGPRJ 22
#define FUNCTION_ARITHM 23
#define FUNCTION_DIFF 24
#define FUNCTION_FIRST_DST 25
#define FUNCTION_LAST_DST 26
#define FUNCTION_STDDEV_DST 27
#define FUNCTION_INTERP 28
#define FUNCTION_RATE 29
#define FUNCTION_IRATE 30
#define FUNCTION_TID_TAG 31
#define FUNCTION_DERIVATIVE 32
#define FUNCTION_BLKINFO 33
#define FUNCTION_COV 38
typedef struct SResultRowEntryInfo {
bool initialized:1; // output buffer has been initialized
bool complete:1; // query has completed
......@@ -180,10 +136,9 @@ typedef struct SqlFunctionCtx {
char *pOutput; // final result output buffer, point to sdata->data
int32_t numOfParams;
SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t *ptsList; // corresponding timestamp array list
int64_t *ptsList; // corresponding timestamp array list, todo remove it
SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int32_t offset;
SVariant tag;
struct SResultRowEntryInfo *resultInfo;
SSubsidiaryResInfo subsidiaries;
SPoint1 start;
......
......@@ -229,7 +229,7 @@ int32_t tdListAppend(SList *list, void *data);
SListNode *tdListPopHead(SList *list);
SListNode *tdListPopTail(SList *list);
SListNode *tdListGetHead(SList *list);
SListNode *tsListGetTail(SList *list);
SListNode *tdListGetTail(SList *list);
SListNode *tdListPopNode(SList *list, SListNode *node);
void tdListMove(SList *src, SList *dst);
void tdListDiscard(SList *list);
......
......@@ -75,6 +75,7 @@ typedef struct SResultRowInfo {
int32_t size; // number of result set
int32_t capacity; // max capacity
SResultRowPosition cur;
SList* openWindow;
} SResultRowInfo;
struct SqlFunctionCtx;
......
......@@ -451,15 +451,15 @@ typedef struct SIntervalAggOperatorInfo {
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
STimeWindow win; // query time range
bool timeWindowInterpo; // interpolation needed or not
char** pRow; // previous row/tuple of already processed datablock
SArray* pInterpCols; // interpolation columns
SAggSupporter aggSup; // aggregate supporter
STableQueryInfo* pCurrent; // current tableQueryInfo struct
int32_t order; // current SSDataBlock scan order
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
STimeWindowAggSupp twAggSup;
struct SFillInfo* pFillInfo; // fill info
bool invertible;
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
} SIntervalAggOperatorInfo;
typedef struct SStreamFinalIntervalOperatorInfo {
......@@ -802,7 +802,7 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initCacheSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey,
const char* pDir);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey);
SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap,
int32_t* pIndex);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows,
......
......@@ -258,32 +258,6 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
return (int32_t) taosArrayGetSize(pGroupResInfo->pRows);
}
static int64_t getNumOfResultWindowRes(STaskRuntimeEnv* pRuntimeEnv, SResultRowPosition *pos, int32_t* rowCellInfoOffset) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
ASSERT(0);
for (int32_t j = 0; j < pQueryAttr->numOfOutput; ++j) {
int32_t functionId = 0;//pQueryAttr->pExpr1[j].base.functionId;
/*
* ts, tag, tagprj function can not decide the output number of current query
* the number of output result is decided by main output
*/
if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG || functionId == FUNCTION_TAGPRJ) {
continue;
}
// SResultRowEntryInfo *pResultInfo = getResultCell(pResultRow, j, rowCellInfoOffset);
// assert(pResultInfo != NULL);
//
// if (pResultInfo->numOfRes > 0) {
// return pResultInfo->numOfRes;
// }
}
return 0;
}
static int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param) {
int32_t left = *(int32_t *)pLeft;
int32_t right = *(int32_t *)pRight;
......@@ -381,7 +355,7 @@ static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupRe
}
int64_t num = getNumOfResultWindowRes(pRuntimeEnv, &pResultRowCell->pos, rowCellInfoOffset);
int64_t num = 0;//getNumOfResultWindowRes(pRuntimeEnv, &pResultRowCell->pos, rowCellInfoOffset);
if (num <= 0) {
continue;
}
......
......@@ -239,36 +239,6 @@ static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
return true;
}
static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, jmp_buf env) {
int64_t newCapacity = 0;
// more than the capacity, reallocate the resources
if (pResultRowInfo->size < pResultRowInfo->capacity) {
return;
}
if (pResultRowInfo->capacity > 10000) {
newCapacity = (int64_t)(pResultRowInfo->capacity * 1.25);
} else {
newCapacity = (int64_t)(pResultRowInfo->capacity * 1.5);
}
if (newCapacity <= pResultRowInfo->capacity) {
newCapacity += 4;
}
char* p = taosMemoryRealloc(pResultRowInfo->pPosition, newCapacity * sizeof(SResultRowPosition));
if (p == NULL) {
longjmp(env, TSDB_CODE_OUT_OF_MEMORY);
}
pResultRowInfo->pPosition = (SResultRowPosition*)p;
int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity;
memset(&pResultRowInfo->pPosition[pResultRowInfo->capacity], 0, sizeof(SResultRowPosition) * inc);
pResultRowInfo->capacity = (int32_t)newCapacity;
}
static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData,
int16_t bytes, bool masterscan, uint64_t uid) {
bool existed = false;
......@@ -306,7 +276,7 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR
return p1 != NULL;
}
SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) {
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) {
SFilePage* pData = NULL;
// in the first scan, new space needed for results
......@@ -392,25 +362,22 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
}
// allocate a new buffer page
prepareResultListBuffer(pResultRowInfo, pTaskInfo->env);
if (pResult == NULL) {
ASSERT(pSup->resultRowSize > 0);
pResult = getNewResultRow_rv(pResultBuf, groupId, pSup->resultRowSize);
pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize);
initResultRow(pResult);
// add a new result set for a new group
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
sizeof(SResultRowPosition));
taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, sizeof(SResultRowPosition));
}
// 2. set the new time window to be the new active time window
pResultRowInfo->pPosition[pResultRowInfo->size++] =
(SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
// too many time window in query
if (pResultRowInfo->size > MAX_INTERVAL_TIME_WINDOW) {
if (taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
}
......@@ -1218,7 +1185,6 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
taosVariantDestroy(&pCtx[i].param[j].param);
}
taosVariantDestroy(&pCtx[i].tag);
taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx);
taosMemoryFree(pCtx[i].input.pData);
taosMemoryFree(pCtx[i].input.pColumnDataAgg);
......@@ -1248,9 +1214,9 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_Q
static bool isCachedLastQuery(STaskAttr* pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]);
if (functionId == FUNCTION_LAST || functionId == FUNCTION_LAST_DST) {
continue;
}
// if (functionId == FUNCTION_LAST || functionId == FUNCTION_LAST_DST) {
// continue;
// }
return false;
}
......@@ -1300,7 +1266,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = getExprFunctionId(&pQuery->pExpr1[i]);
#if 0
if (functionId == FUNCTION_TS || functionId == FUNCTION_TS_DUMMY || functionId == FUNCTION_TAG ||
functionId == FUNCTION_TAG_DUMMY) {
continue;
......@@ -1311,6 +1277,8 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
} else {
hasOtherFunc = true;
}
#endif
}
if (hasFirstLastFunc && status == BLK_DATA_NOT_LOAD) {
......@@ -1786,41 +1754,13 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity, int32_t numOf
// set the correct pointer after the memory buffer reallocated.
int32_t functionId = pBInfo->pCtx[i].functionId;
#if 0
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF ||
functionId == FUNCTION_DERIVATIVE) {
// if (i > 0) pBInfo->pCtx[i].pTsOutput = pBInfo->pCtx[i - 1].pOutput;
}
}
}
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput) {
bool needCopyTs = false;
int32_t tsNum = 0;
char* src = NULL;
for (int32_t i = 0; i < numOfOutput; i++) {
int32_t functionId = pCtx[i].functionId;
if (functionId == FUNCTION_DIFF || functionId == FUNCTION_DERIVATIVE) {
needCopyTs = true;
if (i > 0 && pCtx[i - 1].functionId == FUNCTION_TS_DUMMY) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, i - 1); // find ts data
src = pColRes->pData;
}
} else if (functionId == FUNCTION_TS_DUMMY) {
tsNum++;
}
}
if (!needCopyTs) return;
if (tsNum < 2) return;
if (src == NULL) return;
#endif
for (int32_t i = 0; i < numOfOutput; i++) {
int32_t functionId = pCtx[i].functionId;
if (functionId == FUNCTION_TS_DUMMY) {
SColumnInfoData* pColRes = taosArrayGet(pRes->pDataBlock, i);
memcpy(pColRes->pData, src, pColRes->info.bytes * pRes->info.rows);
}
}
}
......@@ -3569,7 +3509,7 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
offset += sizeof(int32_t);
uint64_t tableGroupId = *(uint64_t*)(result + offset);
SResultRow* resultRow = getNewResultRow_rv(pSup->pResultBuf, tableGroupId, pSup->resultRowSize);
SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, tableGroupId, pSup->resultRowSize);
if (!resultRow) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT);
}
......@@ -3592,10 +3532,6 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
offset += valueLen;
initResultRow(resultRow);
prepareResultListBuffer(&pInfo->resultRowInfo, pOperator->pTaskInfo->env);
// pInfo->resultRowInfo.cur = pInfo->resultRowInfo.size;
// pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] =
// (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
}
......@@ -3887,18 +3823,6 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
}
}
// todo set the attribute of query scan count
static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]);
if (functionId == FUNCTION_STDDEV || functionId == FUNCTION_PERCT) {
return 2;
}
}
return 1;
}
static void destroyOperatorInfo(SOperatorInfo* pOperator) {
if (pOperator == NULL) {
return;
......
......@@ -110,9 +110,11 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo
return true;
}
static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) {
SColumnDataAgg* pColAgg = NULL;
size_t numOfGroupCols = taosArrayGetSize(pGroupCols);
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SColumn* pCol = taosArrayGet(pGroupCols, i);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
......@@ -208,7 +210,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
if (!pInfo->isInit) {
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
pInfo->isInit = true;
num++;
continue;
......@@ -223,7 +225,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
// The first row of a new block does not belongs to the previous existed group
if (j == 0) {
num++;
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
continue;
}
......@@ -238,7 +240,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
// assign the group keys or user input constant values if required
doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
num = 1;
}
......@@ -396,7 +398,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols);
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
SDataGroupInfo* pGInfo = NULL;
......
......@@ -534,8 +534,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset);
}
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
// pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose
// pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose
pInfo->readHandle = *readHandle;
pInfo->interval = extractIntervalInfo(pTableScanNode);
......
......@@ -140,6 +140,10 @@ bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo)
int32_t uniqueFunction(SqlFunctionCtx *pCtx);
//int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool twaFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t twaFunction(SqlFunctionCtx *pCtx);
int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
......
......@@ -52,13 +52,6 @@ typedef struct SInterpInfoDetail {
int8_t primaryCol;
} SInterpInfoDetail;
typedef struct STwaInfo {
int8_t hasResult; // flag to denote has value
double dOutput;
SPoint1 p;
STimeWindow win;
} STwaInfo;
bool topbot_datablock_filter(SqlFunctionCtx *pCtx, const char *minval, const char *maxval);
/**
......
......@@ -18,7 +18,6 @@
#include "querynodes.h"
#include "scalar.h"
#include "taoserror.h"
#include "tdatablock.h"
static int32_t buildFuncErrMsg(char* pErrBuf, int32_t len, int32_t errCode, const char* pFormat, ...) {
va_list vArgList;
......@@ -314,7 +313,7 @@ static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len
pValue->notReserved = true;
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_INTEGER_TYPE(paraType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
......@@ -1119,6 +1118,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.finalizeFunc = firstLastFinalize,
.combineFunc = lastCombine,
},
{
.name = "twa",
.type = FUNCTION_TYPE_TWA,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateInNumOutDou,
.getEnvFunc = getTwaFuncEnv,
.initFunc = twaFunctionSetup,
.processFunc = twaFunction,
.finalizeFunc = twaFinalize
},
{
.name = "histogram",
.type = FUNCTION_TYPE_HISTOGRAM,
......
......@@ -225,7 +225,6 @@ typedef struct SUniqueInfo {
int32_t numOfPoints;
uint8_t colType;
int16_t colBytes;
bool hasNull; //null is not hashable, handle separately
SHashObj *pHash;
char pItems[];
} SUniqueInfo;
......@@ -300,7 +299,7 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
//pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
char* in = GET_ROWCELL_INTERBUF(pResInfo);
colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes);
......@@ -829,8 +828,10 @@ int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SInputColumnInfoData* pInput = &pCtx->input;
int32_t type = pInput->pData[0]->info.type;
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
if (IS_INTEGER_TYPE(type)) {
pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count);
} else {
......@@ -1827,7 +1828,7 @@ bool percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultI
}
int32_t percentileFunction(SqlFunctionCtx* pCtx) {
int32_t notNullElems = 0;
int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SInputColumnInfoData* pInput = &pCtx->input;
......@@ -1905,11 +1906,11 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
}
char* data = colDataGetData(pCol, i);
notNullElems += 1;
numOfElems += 1;
tMemBucketPut(pInfo->pMemBucket, data, 1);
}
SET_VAL(pResInfo, notNullElems, 1);
SET_VAL(pResInfo, numOfElems, 1);
}
return TSDB_CODE_SUCCESS;
......@@ -1983,7 +1984,7 @@ bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResult
}
int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
int32_t notNullElems = 0;
int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SInputColumnInfoData* pInput = &pCtx->input;
......@@ -2000,7 +2001,7 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
notNullElems += 1;
numOfElems += 1;
char* data = colDataGetData(pCol, i);
double v = 0; // value
......@@ -2013,7 +2014,7 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
if (colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
notNullElems += 1;
numOfElems += 1;
char* data = colDataGetData(pCol, i);
double v = 0;
......@@ -2022,7 +2023,7 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
}
}
SET_VAL(pResInfo, notNullElems, 1);
SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS;
}
......@@ -3690,7 +3691,6 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
int32_t startOffset = pCtx->offset;
......@@ -3713,24 +3713,6 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
return pInfo->numSampled;
}
//int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
// SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
// SSampleInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
// int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
// SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
//
// //int32_t currentRow = pBlock->info.rows;
// pResInfo->numOfRes = pInfo->numSampled;
//
// for (int32_t i = 0; i < pInfo->numSampled; ++i) {
// colDataAppend(pCol, i, pInfo->data + i * pInfo->colBytes, false);
// //TODO: handle ts output
// }
//
// return pResInfo->numOfRes;
//}
bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
......@@ -3806,7 +3788,6 @@ int32_t tailFunction(SqlFunctionCtx* pCtx) {
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
int32_t startOffset = pCtx->offset;
......@@ -3873,7 +3854,6 @@ bool uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
pInfo->numOfPoints = 0;
pInfo->colType = pCtx->resDataInfo.type;
pInfo->colBytes = pCtx->resDataInfo.bytes;
pInfo->hasNull = false;
if (pInfo->pHash != NULL) {
taosHashClear(pInfo->pHash);
} else {
......@@ -3887,11 +3867,10 @@ static void doUniqueAdd(SUniqueInfo* pInfo, char *data, TSKEY ts, bool isNull) {
if (isNull == true) {
int32_t size = sizeof(SUniqueItem) + pInfo->colBytes;
SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + pInfo->numOfPoints * size);
if (pInfo->hasNull == false && pItem->isNull == false) {
if (pItem->isNull == false) {
pItem->timestamp = ts;
pItem->isNull = true;
pInfo->numOfPoints++;
pInfo->hasNull = true;
} else if (pItem->timestamp > ts && pItem->isNull == true) {
pItem->timestamp = ts;
}
......@@ -3911,8 +3890,6 @@ static void doUniqueAdd(SUniqueInfo* pInfo, char *data, TSKEY ts, bool isNull) {
} else if (pHashItem->timestamp > ts) {
pHashItem->timestamp = ts;
}
return;
}
int32_t uniqueFunction(SqlFunctionCtx* pCtx) {
......@@ -3954,7 +3931,7 @@ int32_t uniqueFunction(SqlFunctionCtx* pCtx) {
int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
......@@ -3967,3 +3944,325 @@ int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes;
}
typedef struct STwaInfo {
double dOutput;
int8_t hasResult; // flag to denote has value
SPoint1 p;
STimeWindow win;
} STwaInfo;
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(STwaInfo);
return true;
}
bool twaFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) {
return false;
}
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
pInfo->p.key = INT64_MIN;
pInfo->win = TSWINDOW_INITIALIZER;
return true;
}
static double twa_get_area(SPoint1 s, SPoint1 e) {
if ((s.val >= 0 && e.val >= 0)|| (s.val <=0 && e.val <= 0)) {
return (s.val + e.val) * (e.key - s.key) / 2;
}
double x = (s.key * e.val - e.key * s.val)/(e.val - s.val);
double val = (s.val * (x - s.key) + e.val * (e.key - x)) / 2;
return val;
}
int32_t twaFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
int32_t numOfElems = 0;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SPoint1* last = &pInfo->p;
int32_t step = 1;//GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t i = pInput->startRowIndex;
for (i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
if (!colDataIsNull_f(pInputCol->nullbitmap, i)) {
break;
}
}
if (pCtx->start.key != INT64_MIN) {
ASSERT((pCtx->start.key < tsList[i] && pCtx->order == TSDB_ORDER_ASC) ||
(pCtx->start.key > tsList[i] && pCtx->order == TSDB_ORDER_DESC));
ASSERT(last->key == INT64_MIN);
last->key = tsList[i];
GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i));
pInfo->dOutput += twa_get_area(pCtx->start, *last);
pInfo->hasResult = DATA_SET_FLAG;
pInfo->win.skey = pCtx->start.key;
numOfElems++;
i += step;
} else if (pInfo->p.key == INT64_MIN) {
last->key = tsList[i];
GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i));
pInfo->hasResult = DATA_SET_FLAG;
pInfo->win.skey = last->key;
numOfElems++;
i += step;
}
// calculate the value of
switch(pInputCol->info.type) {
case TSDB_DATA_TYPE_TINYINT: {
int8_t *val = (int8_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
#ifndef _TD_NINGSI_60
SPoint1 st = {.key = tsList[i], .val = val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t *val = (int16_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
#ifndef _TD_NINGSI_60
SPoint1 st = {.key = tsList[i], .val = val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
case TSDB_DATA_TYPE_INT: {
int32_t *val = (int32_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
#ifndef _TD_NINGSI_60
SPoint1 st = {.key = tsList[i], .val = val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t *val = (int64_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
#ifndef _TD_NINGSI_60
SPoint1 st = {.key = tsList[i], .val = (double) val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = (double)val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float *val = (float*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
#ifndef _TD_NINGSI_60
SPoint1 st = {.key = tsList[i], .val = val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = (double)val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double *val = (double*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
#ifndef _TD_NINGSI_60
SPoint1 st = {.key = tsList[i], .val = val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
uint8_t *val = (uint8_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
#ifndef _TD_NINGSI_60
SPoint1 st = {.key = tsList[i], .val = val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
uint16_t *val = (uint16_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
#ifndef _TD_NINGSI_60
SPoint1 st = {.key = tsList[i], .val = val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
case TSDB_DATA_TYPE_UINT: {
uint32_t *val = (uint32_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
#ifndef _TD_NINGSI_60
SPoint1 st = {.key = tsList[i], .val = val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
uint64_t *val = (uint64_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
#ifndef _TD_NINGSI_60
SPoint1 st = {.key = tsList[i], .val = (double) val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = (double) val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st;
}
break;
}
default: assert(0);
}
// the last interpolated time window value
if (pCtx->end.key != INT64_MIN) {
pInfo->dOutput += twa_get_area(pInfo->p, pCtx->end);
pInfo->p = pCtx->end;
}
pInfo->win.ekey = pInfo->p.key;
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
return TSDB_CODE_SUCCESS;
}
/*
* To copy the input to interResBuf to avoid the input buffer space be over writen
* by next input data. The TWA function only applies to each table, so no merge procedure
* is required, we simply copy to the resut ot interResBuffer.
*/
//void twa_function_copy(SQLFunctionCtx *pCtx) {
// assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
// SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
//
// memcpy(GET_ROWCELL_INTERBUF(pResInfo), pCtx->pInput, (size_t)pCtx->inputBytes);
// pResInfo->hasResult = ((STwaInfo *)pCtx->pInput)->hasResult;
//}
int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
STwaInfo *pInfo = (STwaInfo *)GET_ROWCELL_INTERBUF(pResInfo);
if (pInfo->hasResult != DATA_SET_FLAG) {
pResInfo->isNullRes = 1;
} else {
// assert(pInfo->win.ekey == pInfo->p.key && pInfo->hasResult == pResInfo->hasResult);
if (pInfo->win.ekey == pInfo->win.skey) {
pInfo->dOutput = pInfo->p.val;
} else {
pInfo->dOutput = pInfo->dOutput / (pInfo->win.ekey - pInfo->win.skey);
}
pResInfo->numOfRes = 1;
}
return functionFinalize(pCtx, pBlock);
}
......@@ -236,7 +236,7 @@ bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry) {
bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry) {
return pEntry->initialized;
}
#if 0
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,
bool isSuperTable/*, SUdfInfo* pUdfInfo*/) {
if (!isValidDataType(dataType)) {
......@@ -470,6 +470,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
return TSDB_CODE_SUCCESS;
}
#endif
static bool function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
if (pResultInfo->initialized) {
......
......@@ -95,7 +95,7 @@ SListNode *tdListPopTail(SList *list) {
SListNode *tdListGetHead(SList *list) { return TD_DLIST_HEAD(list); }
SListNode *tsListGetTail(SList *list) { return TD_DLIST_TAIL(list); }
SListNode *tdListGetTail(SList *list) { return TD_DLIST_TAIL(list); }
SListNode *tdListPopNode(SList *list, SListNode *node) {
TD_DLIST_POP(list, node);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册