diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index b22a9866c3fc612a603595f7da8812cfa5c37ed0..900fd749f5d598b4f10dfa3acf8638f62e05b6bd 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -266,6 +266,20 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l return TSDB_CODE_SUCCESS; } +static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + int32_t paraLen = LIST_LENGTH(pFunc->pParameterList); + if (paraLen == 0 || paraLen > 2) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + SExprNode* p1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0); + if (!IS_NUMERIC_TYPE(p1->resType.type)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + pFunc->node.resType = p1->resType; + return TSDB_CODE_SUCCESS; +} + static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (1 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); @@ -613,7 +627,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "diff", .type = FUNCTION_TYPE_DIFF, .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, - .translateFunc = translateInOutNum, + .translateFunc = translateDiff, .getEnvFunc = getDiffFuncEnv, .initFunc = diffFunctionSetup, .processFunc = diffFunction, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 896c1c9164f464efe078caf12854e7770927936a..f5d6521ef4070c4e81e9987b8148923f4f3e8c60 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -75,7 +75,7 @@ typedef struct SPercentileInfo { typedef struct SDiffInfo { bool hasPrev; bool includeNull; - bool ignoreNegative; + bool ignoreNegative; // replace the ignore with case when bool firstOutput; union { int64_t i64; @@ -1208,248 +1208,192 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); pDiffInfo->hasPrev = false; pDiffInfo->prev.i64 = 0; - pDiffInfo->ignoreNegative = false; // TODO set correct param + pDiffInfo->ignoreNegative = pCtx->param[1].param.i; // TODO set correct param pDiffInfo->includeNull = false; pDiffInfo->firstOutput = false; return true; } +static void doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv) { + switch(type) { + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: + pDiffInfo->prev.i64 = *(int8_t*) pv; break; + case TSDB_DATA_TYPE_INT: + pDiffInfo->prev.i64 = *(int32_t*) pv; break; + case TSDB_DATA_TYPE_SMALLINT: + pDiffInfo->prev.i64 = *(int16_t*) pv; break; + case TSDB_DATA_TYPE_BIGINT: + pDiffInfo->prev.i64 = *(int64_t*) pv; break; + case TSDB_DATA_TYPE_FLOAT: + pDiffInfo->prev.d64 = *(float *) pv; break; + case TSDB_DATA_TYPE_DOUBLE: + pDiffInfo->prev.d64 = *(double*) pv; break; + default: + ASSERT(0); + } +} + +static void doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos, int32_t order) { + int32_t factor = (order == TSDB_ORDER_ASC)? 1:-1; + switch (type) { + case TSDB_DATA_TYPE_INT: { + int32_t v = *(int32_t*)pv; + int32_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null + if (delta < 0 && pDiffInfo->ignoreNegative) { + colDataSetNull_f(pOutput->nullbitmap, pos); + } else { + colDataAppendInt32(pOutput, pos, &delta); + } + pDiffInfo->prev.i64 = v; + break; + } + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: { + int8_t v = *(int8_t*)pv; + int8_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null + if (delta < 0 && pDiffInfo->ignoreNegative) { + colDataSetNull_f(pOutput->nullbitmap, pos); + } else { + colDataAppendInt8(pOutput, pos, &delta); + } + pDiffInfo->prev.i64 = v; + break; + } + case TSDB_DATA_TYPE_SMALLINT: { + int16_t v = *(int16_t*)pv; + int16_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null + if (delta < 0 && pDiffInfo->ignoreNegative) { + colDataSetNull_f(pOutput->nullbitmap, pos); + } else { + colDataAppendInt16(pOutput, pos, &delta); + } + pDiffInfo->prev.i64 = v; + break; + } + case TSDB_DATA_TYPE_BIGINT: { + int64_t v = *(int64_t*)pv; + int64_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null + if (delta < 0 && pDiffInfo->ignoreNegative) { + colDataSetNull_f(pOutput->nullbitmap, pos); + } else { + colDataAppendInt64(pOutput, pos, &delta); + } + pDiffInfo->prev.i64 = v; + break; + } + case TSDB_DATA_TYPE_FLOAT: { + float v = *(float*)pv; + float delta = factor*(v - pDiffInfo->prev.d64); // direct previous may be null + if (delta < 0 && pDiffInfo->ignoreNegative) { + colDataSetNull_f(pOutput->nullbitmap, pos); + } else { + colDataAppendFloat(pOutput, pos, &delta); + } + pDiffInfo->prev.d64 = v; + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + double v = *(double*)pv; + double delta = factor*(v - pDiffInfo->prev.d64); // direct previous may be null + if (delta < 0 && pDiffInfo->ignoreNegative) { + colDataSetNull_f(pOutput->nullbitmap, pos); + } else { + colDataAppendDouble(pOutput, pos, &delta); + } + pDiffInfo->prev.d64 = v; + break; + } + default: + ASSERT(0); + } + } + int32_t diffFunction(SqlFunctionCtx* pCtx) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pInputCol = pInput->pData[0]; - - bool isFirstBlock = (pDiffInfo->hasPrev == false); - int32_t numOfElems = 0; + SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pTsOutput = pCtx->pTsOutput; - TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + int32_t numOfElems = 0; + TSKEY* tsList = (int64_t*)pInput->pPTS->pData; int32_t startOffset = pCtx->offset; - switch (pInputCol->info.type) { - case TSDB_DATA_TYPE_INT: { - SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; - if (pCtx->order == TSDB_ORDER_ASC) { - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - int32_t pos = startOffset + (isFirstBlock ? (numOfElems - 1) : numOfElems); - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { - if (pDiffInfo->includeNull) { - colDataSetNull_f(pOutput->nullbitmap, pos); - if (tsList != NULL) { - colDataAppendInt64(pTsOutput, pos, &tsList[i]); - } - - numOfElems += 1; - } - continue; - } - int32_t v = *(int32_t*)colDataGetData(pInputCol, i); - if (pDiffInfo->hasPrev) { - int32_t delta = (int32_t)(v - pDiffInfo->prev.i64); // direct previous may be null - if (delta < 0 && pDiffInfo->ignoreNegative) { - colDataSetNull_f(pOutput->nullbitmap, pos); - } else { - colDataAppendInt32(pOutput, pos, &delta); - } - - if (pTsOutput != NULL) { - colDataAppendInt64(pTsOutput, pos, &tsList[i]); - } - } + SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; - pDiffInfo->prev.i64 = v; - pDiffInfo->hasPrev = true; - numOfElems++; - } - } else { - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - int32_t v = *(int32_t*)colDataGetData(pInputCol, i); - int32_t pos = startOffset + numOfElems; - - // there is a row of previous data block to be handled in the first place. - if (pDiffInfo->hasPrev) { - int32_t delta = (int32_t)(pDiffInfo->prev.i64 - v); // direct previous may be null - if (delta < 0 && pDiffInfo->ignoreNegative) { - colDataSetNull_f(pOutput->nullbitmap, pos); - } else { - colDataAppendInt32(pOutput, pos, &delta); - } + if (pCtx->order == TSDB_ORDER_ASC) { + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { + int32_t pos = startOffset + numOfElems; - if (pTsOutput != NULL) { - colDataAppendInt64(pTsOutput, pos, &pDiffInfo->prevTs); - } - pDiffInfo->hasPrev = false; + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + if (pDiffInfo->includeNull) { + colDataSetNull_f(pOutput->nullbitmap, pos); + if (tsList != NULL) { + colDataAppendInt64(pTsOutput, pos, &tsList[i]); } - // it is not the last row of current block - if (i < pInput->numOfRows + pInput->startRowIndex - 1) { - int32_t next = *(int32_t*)colDataGetData(pInputCol, i + 1); - - int32_t delta = v - next; // direct previous may be null - colDataAppendInt32(pOutput, pos, &delta); - - if (pTsOutput != NULL) { - colDataAppendInt64(pTsOutput, pos, &tsList[i]); - } - } else { - pDiffInfo->prev.i64 = v; - if (pTsOutput != NULL) { - pDiffInfo->prevTs = tsList[i]; - } - pDiffInfo->hasPrev = true; - } - numOfElems++; + numOfElems += 1; } - + continue; } - break; - } - case TSDB_DATA_TYPE_BIGINT: { - SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { - continue; - } - - int32_t v = 0; - if (pDiffInfo->hasPrev) { - v = *(int64_t*)colDataGetData(pInputCol, i); - int64_t delta = (int64_t)(v - pDiffInfo->prev.i64); // direct previous may be null - if (pDiffInfo->ignoreNegative) { - continue; - } + char* pv = colDataGetData(pInputCol, i); - // *(pOutput++) = delta; - // *pTimestamp = (tsList != NULL)? tsList[i]:0; - // - // pOutput += 1; - // pTimestamp += 1; + if (pDiffInfo->hasPrev) { + doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order); + if (pTsOutput != NULL) { + colDataAppendInt64(pTsOutput, pos, &tsList[i]); } - pDiffInfo->prev.i64 = v; - pDiffInfo->hasPrev = true; numOfElems++; + } else { + doSetPrevVal(pDiffInfo, pInputCol->info.type, pv); } - break; - } -#if 0 - case TSDB_DATA_TYPE_DOUBLE: { - double *pData = (double *)data; - double *pOutput = (double *)pCtx->pOutput; - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { - continue; - } - if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) { - continue; - } - - if (pDiffInfo->hasPrev) { // initial value is not set yet - SET_DOUBLE_VAL(pOutput, pData[i] - pDiffInfo->d64Prev); // direct previous may be null - *pTimestamp = (tsList != NULL)? tsList[i]:0; - pOutput += 1; - pTimestamp += 1; - } - - pDiffInfo->d64Prev = pData[i]; - pDiffInfo->hasPrev = true; - numOfElems++; - } - break; + pDiffInfo->hasPrev = true; } - case TSDB_DATA_TYPE_FLOAT: { - float *pData = (float *)data; - float *pOutput = (float *)pCtx->pOutput; - - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { - continue; - } - if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) { - continue; - } + } else { + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { + int32_t pos = startOffset + numOfElems; + + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + if (pDiffInfo->includeNull) { + colDataSetNull_f(pOutput->nullbitmap, pos); + if (tsList != NULL) { + colDataAppendInt64(pTsOutput, pos, &tsList[i]); + } - if (pDiffInfo->hasPrev) { // initial value is not set yet - *pOutput = (float)(pData[i] - pDiffInfo->d64Prev); // direct previous may be null - *pTimestamp = (tsList != NULL)? tsList[i]:0; - pOutput += 1; - pTimestamp += 1; + numOfElems += 1; } - - pDiffInfo->d64Prev = pData[i]; - pDiffInfo->hasPrev = true; - numOfElems++; + continue; } - break; - } - case TSDB_DATA_TYPE_SMALLINT: { - int16_t *pData = (int16_t *)data; - int16_t *pOutput = (int16_t *)pCtx->pOutput; - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) { - continue; - } - if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) { - continue; - } + char* pv = colDataGetData(pInputCol, i); - if (pDiffInfo->hasPrev) { // initial value is not set yet - *pOutput = (int16_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null - *pTimestamp = (tsList != NULL)? tsList[i]:0; - pOutput += 1; - pTimestamp += 1; + // there is a row of previous data block to be handled in the first place. + if (pDiffInfo->hasPrev) { + doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order); + if (pTsOutput != NULL) { + colDataAppendInt64(pTsOutput, pos, &pDiffInfo->prevTs); } - pDiffInfo->i64Prev = pData[i]; - pDiffInfo->hasPrev = true; numOfElems++; + } else { + doSetPrevVal(pDiffInfo, pInputCol->info.type, pv); } - break; - } - - case TSDB_DATA_TYPE_TINYINT: { - int8_t *pData = (int8_t *)data; - int8_t *pOutput = (int8_t *)pCtx->pOutput; - for (; i < pCtx->size && i >= 0; i += step) { - if (pCtx->hasNull && isNull((char *)&pData[i], pCtx->inputType)) { - continue; - } - if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) { - continue; - } - - if (pDiffInfo->hasPrev) { // initial value is not set yet - *pOutput = (int8_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null - *pTimestamp = (tsList != NULL)? tsList[i]:0; - pOutput += 1; - pTimestamp += 1; - } - - pDiffInfo->i64Prev = pData[i]; - pDiffInfo->hasPrev = true; - numOfElems++; + pDiffInfo->hasPrev = true; + if (pTsOutput != NULL) { + pDiffInfo->prevTs = tsList[i]; } - break; } -#endif - default: - break; - // qError("error input type"); } // initial value is not set yet - if (numOfElems <= 0) { - return 0; - } else { - return (isFirstBlock) ? numOfElems - 1 : numOfElems; - } + return numOfElems; } bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {