提交 bed77020 编写于 作者: wmmhello's avatar wmmhello

Merge branch 'feature/3.0_wxy' of github.com:taosdata/TDengine into feature/3.0_wxy

......@@ -101,6 +101,11 @@ bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool derivativeFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
int32_t derivativeFunction(SqlFunctionCtx *pCtx);
bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool irateFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
int32_t irateFunction(SqlFunctionCtx *pCtx);
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t firstFunction(SqlFunctionCtx *pCtx);
int32_t firstFunctionMerge(SqlFunctionCtx *pCtx);
......
......@@ -978,6 +978,21 @@ static int32_t translateDerivative(SFunctionNode* pFunc, char* pErrBuf, int32_t
return TSDB_CODE_SUCCESS;
}
static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (!IS_NUMERIC_TYPE(colType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
return TSDB_CODE_SUCCESS;
}
static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// first(col_list) will be rewritten as first(col)
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
......@@ -1796,6 +1811,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = derivativeFunction,
.finalizeFunc = functionFinalize
},
{
.name = "irate",
.type = FUNCTION_TYPE_IRATE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateIrate,
.getEnvFunc = getIrateFuncEnv,
.initFunc = irateFuncSetup,
.processFunc = irateFunction,
.finalizeFunc = irateFinalize
},
{
.name = "last_row",
.type = FUNCTION_TYPE_LAST_ROW,
......
......@@ -59,6 +59,12 @@ typedef struct STuplePos {
int32_t offset;
} STuplePos;
typedef struct SMinmaxResInfo {
bool assign; // assign the first value or not
int64_t v;
STuplePos tuplePos;
} SMinmaxResInfo;
typedef struct STopBotResItem {
SVariant v;
uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data
......@@ -148,6 +154,12 @@ typedef struct SElapsedInfo {
int64_t timeUnit;
} SElapsedInfo;
typedef struct STwaInfo {
double dOutput;
SPoint1 p;
STimeWindow win;
} STwaInfo;
typedef struct SHistoFuncBin {
double lower;
double upper;
......@@ -234,6 +246,22 @@ typedef struct SUniqueInfo {
char pItems[];
} SUniqueInfo;
typedef struct SDerivInfo {
double prevValue; // previous value
TSKEY prevTs; // previous timestamp
bool ignoreNegative; // ignore the negative value
int64_t tsWindow; // time window for derivative
bool valueSet; // the value has been set already
} SDerivInfo;
typedef struct SRateInfo {
double firstValue;
TSKEY firstKey;
double lastValue;
TSKEY lastKey;
int8_t hasResult; // flag to denote has value
} SRateInfo;
#define SET_VAL(_info, numOfElem, res) \
do { \
if ((numOfElem) <= 0) { \
......@@ -927,12 +955,6 @@ EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
return FUNC_DATA_REQUIRED_STATIS_LOAD;
}
typedef struct SMinmaxResInfo {
bool assign; // assign the first value or not
int64_t v;
STuplePos tuplePos;
} SMinmaxResInfo;
bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) {
return false; // not initialized since it has been initialized
......@@ -4665,12 +4687,6 @@ int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes;
}
typedef struct STwaInfo {
double dOutput;
SPoint1 p;
STimeWindow win;
} STwaInfo;
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(STwaInfo);
return true;
......@@ -5119,14 +5135,6 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return TSDB_CODE_SUCCESS;
}
typedef struct SDerivInfo {
double prevValue; // previous value
TSKEY prevTs; // previous timestamp
bool ignoreNegative; // ignore the negative value
int64_t tsWindow; // time window for derivative
bool valueSet; // the value has been set already
} SDerivInfo;
bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SDerivInfo);
return true;
......@@ -5221,6 +5229,117 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) {
return numOfElems;
}
bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SRateInfo);
return true;
}
bool irateFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
if (!functionSetup(pCtx, pResInfo)) {
return false; // not initialized since it has been initialized
}
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
pInfo->firstKey = INT64_MIN;
pInfo->lastKey = INT64_MIN;
pInfo->firstValue = (double)INT64_MIN;
pInfo->lastValue = (double)INT64_MIN;
pInfo->hasResult = 0;
return true;
}
int32_t irateFunction(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SRateInfo* pRateInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
int32_t numOfElems = 0;
int32_t type = pInputCol->info.type;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
numOfElems++;
char* data = colDataGetData(pInputCol, i);
double v = 0;
GET_TYPED_DATA(v, double, type, data);
if (INT64_MIN == pRateInfo->lastKey) {
pRateInfo->lastValue = v;
pRateInfo->lastKey = tsList[i];
continue;
}
if (tsList[i] > pRateInfo->lastKey) {
if ((INT64_MIN == pRateInfo->firstKey) || pRateInfo->lastKey > pRateInfo->firstKey) {
pRateInfo->firstValue = pRateInfo->lastValue;
pRateInfo->firstKey = pRateInfo->lastKey;
}
pRateInfo->lastValue = v;
pRateInfo->lastKey = tsList[i];
continue;
}
if ((INT64_MIN == pRateInfo->firstKey) || tsList[i] > pRateInfo->firstKey) {
pRateInfo->firstValue = v;
pRateInfo->firstKey = tsList[i];
}
}
SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS;
}
static double doCalcRate(const SRateInfo* pRateInfo, double tickPerSec) {
if ((INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->firstKey) ||
(pRateInfo->firstKey >= pRateInfo->lastKey)) {
return 0.0;
}
double diff = 0;
// If the previous value of the last is greater than the last value, only keep the last point instead of the delta
// value between two values.
diff = pRateInfo->lastValue;
if (diff >= pRateInfo->firstValue) {
diff -= pRateInfo->firstValue;
}
int64_t duration = pRateInfo->lastKey - pRateInfo->firstKey;
if (duration == 0) {
return 0;
}
return (duration > 0)? ((double)diff) / (duration/tickPerSec):0.0;
}
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
double result = doCalcRate(pInfo, 1000);
colDataAppend(pCol, pBlock->info.rows, (const char*)&result, pResInfo->isNullRes);
return pResInfo->numOfRes;
}
int32_t interpFunction(SqlFunctionCtx* pCtx) {
#if 0
int32_t fillType = (int32_t) pCtx->param[2].i64;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册