未验证 提交 e7596b88 编写于 作者: G Ganlin Zhao 提交者: GitHub

Merge pull request #14112 from taosdata/feat/irate_function

feat(query): add irate function
...@@ -101,6 +101,11 @@ bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); ...@@ -101,6 +101,11 @@ bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool derivativeFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo); bool derivativeFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
int32_t derivativeFunction(SqlFunctionCtx *pCtx); int32_t derivativeFunction(SqlFunctionCtx *pCtx);
bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool irateFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
int32_t irateFunction(SqlFunctionCtx *pCtx);
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t firstFunction(SqlFunctionCtx *pCtx); int32_t firstFunction(SqlFunctionCtx *pCtx);
int32_t firstFunctionMerge(SqlFunctionCtx *pCtx); int32_t firstFunctionMerge(SqlFunctionCtx *pCtx);
......
...@@ -978,6 +978,21 @@ static int32_t translateDerivative(SFunctionNode* pFunc, char* pErrBuf, int32_t ...@@ -978,6 +978,21 @@ static int32_t translateDerivative(SFunctionNode* pFunc, char* pErrBuf, int32_t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (!IS_NUMERIC_TYPE(colType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
return TSDB_CODE_SUCCESS;
}
static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// first(col_list) will be rewritten as first(col) // first(col_list) will be rewritten as first(col)
if (1 != LIST_LENGTH(pFunc->pParameterList)) { if (1 != LIST_LENGTH(pFunc->pParameterList)) {
...@@ -1796,6 +1811,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1796,6 +1811,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = derivativeFunction, .processFunc = derivativeFunction,
.finalizeFunc = functionFinalize .finalizeFunc = functionFinalize
}, },
{
.name = "irate",
.type = FUNCTION_TYPE_IRATE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateIrate,
.getEnvFunc = getIrateFuncEnv,
.initFunc = irateFuncSetup,
.processFunc = irateFunction,
.finalizeFunc = irateFinalize
},
{ {
.name = "last_row", .name = "last_row",
.type = FUNCTION_TYPE_LAST_ROW, .type = FUNCTION_TYPE_LAST_ROW,
......
...@@ -59,6 +59,12 @@ typedef struct STuplePos { ...@@ -59,6 +59,12 @@ typedef struct STuplePos {
int32_t offset; int32_t offset;
} STuplePos; } STuplePos;
typedef struct SMinmaxResInfo {
bool assign; // assign the first value or not
int64_t v;
STuplePos tuplePos;
} SMinmaxResInfo;
typedef struct STopBotResItem { typedef struct STopBotResItem {
SVariant v; SVariant v;
uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data
...@@ -148,6 +154,12 @@ typedef struct SElapsedInfo { ...@@ -148,6 +154,12 @@ typedef struct SElapsedInfo {
int64_t timeUnit; int64_t timeUnit;
} SElapsedInfo; } SElapsedInfo;
typedef struct STwaInfo {
double dOutput;
SPoint1 p;
STimeWindow win;
} STwaInfo;
typedef struct SHistoFuncBin { typedef struct SHistoFuncBin {
double lower; double lower;
double upper; double upper;
...@@ -234,6 +246,22 @@ typedef struct SUniqueInfo { ...@@ -234,6 +246,22 @@ typedef struct SUniqueInfo {
char pItems[]; char pItems[];
} SUniqueInfo; } SUniqueInfo;
typedef struct SDerivInfo {
double prevValue; // previous value
TSKEY prevTs; // previous timestamp
bool ignoreNegative; // ignore the negative value
int64_t tsWindow; // time window for derivative
bool valueSet; // the value has been set already
} SDerivInfo;
typedef struct SRateInfo {
double firstValue;
TSKEY firstKey;
double lastValue;
TSKEY lastKey;
int8_t hasResult; // flag to denote has value
} SRateInfo;
#define SET_VAL(_info, numOfElem, res) \ #define SET_VAL(_info, numOfElem, res) \
do { \ do { \
if ((numOfElem) <= 0) { \ if ((numOfElem) <= 0) { \
...@@ -927,12 +955,6 @@ EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin ...@@ -927,12 +955,6 @@ EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
return FUNC_DATA_REQUIRED_STATIS_LOAD; return FUNC_DATA_REQUIRED_STATIS_LOAD;
} }
typedef struct SMinmaxResInfo {
bool assign; // assign the first value or not
int64_t v;
STuplePos tuplePos;
} SMinmaxResInfo;
bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) { if (!functionSetup(pCtx, pResultInfo)) {
return false; // not initialized since it has been initialized return false; // not initialized since it has been initialized
...@@ -4665,12 +4687,6 @@ int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -4665,12 +4687,6 @@ int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes; return pResInfo->numOfRes;
} }
typedef struct STwaInfo {
double dOutput;
SPoint1 p;
STimeWindow win;
} STwaInfo;
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(STwaInfo); pEnv->calcMemSize = sizeof(STwaInfo);
return true; return true;
...@@ -5119,14 +5135,6 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -5119,14 +5135,6 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
typedef struct SDerivInfo {
double prevValue; // previous value
TSKEY prevTs; // previous timestamp
bool ignoreNegative; // ignore the negative value
int64_t tsWindow; // time window for derivative
bool valueSet; // the value has been set already
} SDerivInfo;
bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SDerivInfo); pEnv->calcMemSize = sizeof(SDerivInfo);
return true; return true;
...@@ -5221,6 +5229,117 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) { ...@@ -5221,6 +5229,117 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) {
return numOfElems; return numOfElems;
} }
bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SRateInfo);
return true;
}
bool irateFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
if (!functionSetup(pCtx, pResInfo)) {
return false; // not initialized since it has been initialized
}
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
pInfo->firstKey = INT64_MIN;
pInfo->lastKey = INT64_MIN;
pInfo->firstValue = (double)INT64_MIN;
pInfo->lastValue = (double)INT64_MIN;
pInfo->hasResult = 0;
return true;
}
int32_t irateFunction(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SRateInfo* pRateInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
int32_t numOfElems = 0;
int32_t type = pInputCol->info.type;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
numOfElems++;
char* data = colDataGetData(pInputCol, i);
double v = 0;
GET_TYPED_DATA(v, double, type, data);
if (INT64_MIN == pRateInfo->lastKey) {
pRateInfo->lastValue = v;
pRateInfo->lastKey = tsList[i];
continue;
}
if (tsList[i] > pRateInfo->lastKey) {
if ((INT64_MIN == pRateInfo->firstKey) || pRateInfo->lastKey > pRateInfo->firstKey) {
pRateInfo->firstValue = pRateInfo->lastValue;
pRateInfo->firstKey = pRateInfo->lastKey;
}
pRateInfo->lastValue = v;
pRateInfo->lastKey = tsList[i];
continue;
}
if ((INT64_MIN == pRateInfo->firstKey) || tsList[i] > pRateInfo->firstKey) {
pRateInfo->firstValue = v;
pRateInfo->firstKey = tsList[i];
}
}
SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS;
}
static double doCalcRate(const SRateInfo* pRateInfo, double tickPerSec) {
if ((INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->firstKey) ||
(pRateInfo->firstKey >= pRateInfo->lastKey)) {
return 0.0;
}
double diff = 0;
// If the previous value of the last is greater than the last value, only keep the last point instead of the delta
// value between two values.
diff = pRateInfo->lastValue;
if (diff >= pRateInfo->firstValue) {
diff -= pRateInfo->firstValue;
}
int64_t duration = pRateInfo->lastKey - pRateInfo->firstKey;
if (duration == 0) {
return 0;
}
return (duration > 0)? ((double)diff) / (duration/tickPerSec):0.0;
}
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
double result = doCalcRate(pInfo, 1000);
colDataAppend(pCol, pBlock->info.rows, (const char*)&result, pResInfo->isNullRes);
return pResInfo->numOfRes;
}
int32_t interpFunction(SqlFunctionCtx* pCtx) { int32_t interpFunction(SqlFunctionCtx* pCtx) {
#if 0 #if 0
int32_t fillType = (int32_t) pCtx->param[2].i64; int32_t fillType = (int32_t) pCtx->param[2].i64;
......
...@@ -518,7 +518,7 @@ class TDTestCase: ...@@ -518,7 +518,7 @@ class TDTestCase:
tdSql.query("select avg(dataint) from jsons1 where jtag is not null") tdSql.query("select avg(dataint) from jsons1 where jtag is not null")
tdSql.checkData(0, 0, 5.3) tdSql.checkData(0, 0, 5.3)
#tdSql.error("select twa(dataint) from jsons1 where jtag is not null") #tdSql.error("select twa(dataint) from jsons1 where jtag is not null")
tdSql.error("select irate(dataint) from jsons1 where jtag is not null") tdSql.query("select irate(dataint) from jsons1 where jtag is not null")
#tdSql.query("select sum(dataint) from jsons1 where jtag->'tag1' is not null") #tdSql.query("select sum(dataint) from jsons1 where jtag->'tag1' is not null")
#tdSql.checkData(0, 0, 49) #tdSql.checkData(0, 0, 49)
tdSql.query("select stddev(dataint) from jsons1 where jtag->'tag1'>1") tdSql.query("select stddev(dataint) from jsons1 where jtag->'tag1'>1")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册