From 85adb8611c661808d26b1f83cda4e906e24ae7d9 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 10 May 2022 20:32:03 +0800 Subject: [PATCH] feat(query): add state_count function --- include/libs/function/functionMgt.h | 2 + source/libs/function/inc/builtinsimpl.h | 4 + source/libs/function/inc/taggfunction.h | 2 +- source/libs/function/src/builtins.c | 30 +++++ source/libs/function/src/builtinsimpl.c | 164 ++++++++++++++++++++++++ 5 files changed, 201 insertions(+), 1 deletion(-) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 26bf566808..1a06a90f89 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -55,6 +55,8 @@ typedef enum EFunctionType { FUNCTION_TYPE_TAIL, FUNCTION_TYPE_TOP, FUNCTION_TYPE_UNIQUE, + FUNCTION_TYPE_STATE_COUNT, + FUNCTION_TYPE_STATE_DURATION, // math function FUNCTION_TYPE_ABS = 1000, diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index ae989f3280..9e42a49785 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -88,6 +88,10 @@ bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultIn int32_t histogramFunction(SqlFunctionCtx* pCtx); int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +bool getStateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +int32_t stateCountFunction(SqlFunctionCtx* pCtx); + #ifdef __cplusplus } #endif diff --git a/source/libs/function/inc/taggfunction.h b/source/libs/function/inc/taggfunction.h index 0697d309ac..d779cf50f4 100644 --- a/source/libs/function/inc/taggfunction.h +++ b/source/libs/function/inc/taggfunction.h @@ -67,7 +67,7 @@ bool topbot_datablock_filter(SqlFunctionCtx *pCtx, const char *minval, const cha */ static FORCE_INLINE void initResultRowEntry(SResultRowEntryInfo *pResInfo, int32_t bufLen) { pResInfo->initialized = true; // the this struct has been initialized flag - + pResInfo->complete = false; pResInfo->numOfRes = 0; memset(GET_ROWCELL_INTERBUF(pResInfo), 0, bufLen); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 0eb014646b..d7e16cca81 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -262,6 +262,26 @@ static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t l return TSDB_CODE_SUCCESS; } +static int32_t translateState(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (3 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + if (!IS_NUMERIC_TYPE(colType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + if (((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type != TSDB_DATA_TYPE_BINARY || + (((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type != TSDB_DATA_TYPE_BIGINT && + ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type != TSDB_DATA_TYPE_DOUBLE)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT }; + return TSDB_CODE_SUCCESS; +} + static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // todo return TSDB_CODE_SUCCESS; @@ -675,6 +695,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = histogramFunction, .finalizeFunc = histogramFinalize }, + { + .name = "state_count", + .type = FUNCTION_TYPE_STATE_COUNT, + .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC, + .translateFunc = translateState, + .getEnvFunc = getStateFuncEnv, + .initFunc = functionSetup, + .processFunc = stateCountFunction, + .finalizeFunc = NULL + }, { .name = "abs", .type = FUNCTION_TYPE_ABS, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 009b964eb9..755b6d9b21 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -122,6 +122,19 @@ typedef enum { LOG_BIN } EHistoBinType; +typedef struct SStateInfo { + int64_t count; +} SStateInfo; + +typedef enum { + STATE_OPER_INVALID = 0, + STATE_OPER_LT, + STATE_OPER_GT, + STATE_OPER_LE, + STATE_OPER_GE, + STATE_OPER_NE, + STATE_OPER_EQ, +} EStateOperType; #define SET_VAL(_info, numOfElem, res) \ do { \ @@ -2382,3 +2395,154 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pResInfo->numOfRes; } + +bool getStateFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(SStateInfo); + return true; +} + +static int8_t getStateOpType(char *opStr) { + int8_t opType; + if (strcasecmp(opStr, "LT") == 0) { + opType = STATE_OPER_LT; + } else if (strcasecmp(opStr, "GT") == 0) { + opType = STATE_OPER_GT; + } else if (strcasecmp(opStr, "LE") == 0) { + opType = STATE_OPER_LE; + } else if (strcasecmp(opStr, "GE") == 0) { + opType = STATE_OPER_GE; + } else if (strcasecmp(opStr, "NE") == 0) { + opType = STATE_OPER_NE; + } else if (strcasecmp(opStr, "EQ") == 0) { + opType = STATE_OPER_EQ; + } else { + opType = STATE_OPER_INVALID; + } + + return opType; +} + +#define GET_STATE_VAL(param) \ + ((param.nType == TSDB_DATA_TYPE_BIGINT) ? (param.i) : (param.d)) \ + +#define STATE_COMP(_op, _lval, _param) \ + STATE_COMP_IMPL(_op, _lval, GET_STATE_VAL(_param)) \ + +#define STATE_COMP_IMPL(_op, _lval, _rval) \ + do { \ + switch(_op) { \ + case STATE_OPER_LT: \ + return ((_lval) < (_rval)); \ + break; \ + case STATE_OPER_GT: \ + return ((_lval) > (_rval)); \ + break; \ + case STATE_OPER_LE: \ + return ((_lval) <= (_rval)); \ + break; \ + case STATE_OPER_GE: \ + return ((_lval) >= (_rval)); \ + break; \ + case STATE_OPER_NE: \ + return ((_lval) != (_rval)); \ + break; \ + case STATE_OPER_EQ: \ + return ((_lval) == (_rval)); \ + break; \ + default: \ + break; \ + } \ + } while (0) \ + +static bool checkStateOp(int8_t op, SColumnInfoData* pCol, int32_t index, SVariant param) { + char* data = colDataGetData(pCol, index); + switch(pCol->info.type) { + case TSDB_DATA_TYPE_TINYINT: { + int8_t v = *(int8_t *)data; + STATE_COMP(op, v, param); + break; + } + case TSDB_DATA_TYPE_UTINYINT: { + uint8_t v = *(uint8_t *)data; + STATE_COMP(op, v, param); + break; + } + case TSDB_DATA_TYPE_SMALLINT: { + int16_t v = *(int16_t *)data; + STATE_COMP(op, v, param); + break; + } + case TSDB_DATA_TYPE_USMALLINT: { + uint16_t v = *(uint16_t *)data; + STATE_COMP(op, v, param); + break; + } + case TSDB_DATA_TYPE_INT: { + int32_t v = *(int32_t *)data; + STATE_COMP(op, v, param); + break; + } + case TSDB_DATA_TYPE_UINT: { + uint32_t v = *(uint32_t *)data; + STATE_COMP(op, v, param); + break; + } + case TSDB_DATA_TYPE_BIGINT: { + int64_t v = *(int64_t *)data; + STATE_COMP(op, v, param); + break; + } + case TSDB_DATA_TYPE_UBIGINT: { + uint64_t v = *(uint64_t *)data; + STATE_COMP(op, v, param); + break; + } + case TSDB_DATA_TYPE_FLOAT: { + float v = *(float *)data; + STATE_COMP(op, v, param); + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + double v = *(double *)data; + STATE_COMP(op, v, param); + break; + } + default: { + ASSERT(0); + } + } + return false; +} + +int32_t stateCountFunction(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SStateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + + SColumnInfoData* pInputCol = pInput->pData[0]; + SColumnInfoData* pTsOutput = pCtx->pTsOutput; + + int32_t numOfElems = 0; + SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; + + int8_t op = getStateOpType(varDataVal(pCtx->param[1].param.pz)); + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { + numOfElems++; + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + colDataAppendNULL(pOutput, i); + continue; + } + + bool ret = checkStateOp(op, pInputCol, i, pCtx->param[2].param); + int64_t output = -1; + if (ret) { + output = ++pInfo->count; + } else { + pInfo->count = 0; + } + colDataAppend(pOutput, i, (char *)&output, false); + } + + return numOfElems; +} -- GitLab