diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index caea52a5ef9831bba2132ab522b0e80c979f4ac5..10b4866a965a47aafc03232c9f73168c30d6f597 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -31,8 +31,8 @@ pNode will be freed in API; */ int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes); -/* -pDst need to freed in caller +/* +pDst need to freed in caller */ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst); @@ -77,6 +77,7 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t toUnixtimestampFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); bool getTimePseudoFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 36de0d11494945aa8ee8cb09d6872a0395b3021a..27067fc966288943aaf074f38ecf27a71b1a5be3 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -423,6 +423,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .sprocessFunc = timeTruncateFunction, .finalizeFunc = NULL }, + { + .name = "timediff", + .type = FUNCTION_TYPE_TIMEDIFF, + .classification = FUNC_MGT_SCALAR_FUNC, + .checkFunc = checkAndGetResultType, + .getEnvFunc = NULL, + .initFunc = NULL, + .sprocessFunc = timeDiffFunction, + .finalizeFunc = NULL + }, { .name = "_rowts", .type = FUNCTION_TYPE_ROWTS, @@ -651,6 +661,10 @@ int32_t checkAndGetResultType(SFunctionNode* pFunc) { pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP}; break; } + case FUNCTION_TYPE_TIMEDIFF: { + pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; + break; + } case FUNCTION_TYPE_TBNAME: { // todo diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index b53dc955de2436f93ab6498b8386f607aa03f4db..76059cf2e17b102592f0cf03d6ad020f6c27be6a 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1104,6 +1104,142 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara return TSDB_CODE_SUCCESS; } +int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + if (inputNum != 2 && inputNum != 3) { + return TSDB_CODE_FAILED; + } + + int32_t timePrec = GET_PARAM_PRECISON(&pInput[0]); + int64_t timeUnit = -1, timeVal[2] = {0}; + if (inputNum == 3) { + if (GET_PARAM_TYPE(&pInput[2]) != TSDB_DATA_TYPE_BIGINT) { + return TSDB_CODE_FAILED; + } + GET_TYPED_DATA(timeUnit, int64_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData); + } + + char *input[2]; + for (int32_t k = 0; k < 2; ++k) { + int32_t type = GET_PARAM_TYPE(&pInput[k]); + if (type != TSDB_DATA_TYPE_BIGINT && type != TSDB_DATA_TYPE_TIMESTAMP && + type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR) { + return TSDB_CODE_FAILED; + } + + if (IS_VAR_DATA_TYPE(type)) { + input[k] = pInput[k].columnData->pData + pInput[k].columnData->varmeta.offset[0]; + } else { + input[k] = pInput[k].columnData->pData; + } + } + + for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { + for (int32_t k = 0; k < 2; ++k) { + if (colDataIsNull_s(pInput[0].columnData, i)) { + colDataAppendNULL(pOutput->columnData, i); + continue; + } + + int32_t type = GET_PARAM_TYPE(&pInput[k]); + if (IS_VAR_DATA_TYPE(type)) { /* datetime format strings */ + convertStringToTimestamp(type, input[k], TSDB_TIME_PRECISION_NANO, &timeVal[k]); + } else if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_TIMESTAMP) { /* unix timestamp or ts column*/ + GET_TYPED_DATA(timeVal[k], int64_t, type, input[k]); + if (type == TSDB_DATA_TYPE_TIMESTAMP) { + int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : + (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); + int64_t timeValSec = timeVal[k] / factor; + if (timeValSec < 1000000000) { + timeVal[k] = timeValSec; + } + } + + char buf[20] = {0}; + NUM_TO_STRING(TSDB_DATA_TYPE_BIGINT, &timeVal[k], sizeof(buf), buf); + int32_t tsDigits = (int32_t)strlen(buf); + if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { + timeVal[k] = timeVal[k] * 1000000000; + } else if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) { + timeVal[k] = timeVal[k] * 1000000; + } else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) { + timeVal[k] = timeVal[k] * 1000; + } else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { + timeVal[k] = timeVal[k]; + } + } + + if (IS_VAR_DATA_TYPE(type)) { + input[k] += varDataTLen(input[k]); + } else { + input[k] += tDataTypes[type].bytes; + } + } + + int64_t result = (timeVal[0] >= timeVal[1]) ? (timeVal[0] - timeVal[1]) : + (timeVal[1] - timeVal[0]); + + if (timeUnit < 0) { // if no time unit given use db precision + switch(timePrec) { + case TSDB_TIME_PRECISION_MILLI: { + result = result / 1000000; + break; + } + case TSDB_TIME_PRECISION_MICRO: { + result = result / 1000; + break; + } + case TSDB_TIME_PRECISION_NANO: { + result = result / 1; + break; + } + } + } else { + int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : + (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); + timeUnit = timeUnit * 1000 / factor; + switch(timeUnit) { + case 0: { /* 1u */ + result = result / 1000; + break; + } + case 1: { /* 1a */ + result = result / 1000000; + break; + } + case 1000: { /* 1s */ + result = result / 1000000000; + break; + } + case 60000: { /* 1m */ + result = result / 1000000000 / 60; + break; + } + case 3600000: { /* 1h */ + result = result / 1000000000 / 3600; + break; + } + case 86400000: { /* 1d */ + result = result / 1000000000 / 86400; + break; + } + case 604800000: { /* 1w */ + result = result / 1000000000 / 604800; + break; + } + default: { + break; + } + } + } + + colDataAppend(pOutput->columnData, i, (char *)&result, false); + } + + pOutput->numOfRows = pInput->numOfRows; + + return TSDB_CODE_SUCCESS; +} + int32_t atanFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { return doScalarFunctionUnique(pInput, inputNum, pOutput, atan); }