提交 3b471831 编写于 作者: X Xiaoyu Wang

[TD-10986]<feature>: Add elapsed function.

上级 79be15f5
......@@ -2449,7 +2449,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
const char* msg7 = "normal table can not apply this function";
const char* msg8 = "multi-columns selection does not support alias column name";
const char* msg9 = "diff/derivative can no be applied to unsigned numeric type";
const char* msg10 = "derivative duration should be greater than 1 Second";
const char* msg10 = "derivative/elapsed duration should be greater than 1 Second";
const char* msg11 = "third parameter in derivative should be 0 or 1";
const char* msg12 = "parameter is out of range [1, 100]";
const char* msg13 = "parameter list required";
......@@ -2549,14 +2549,16 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
case TSDB_FUNC_FLOOR:
case TSDB_FUNC_ROUND:
case TSDB_FUNC_STDDEV:
case TSDB_FUNC_LEASTSQR: {
case TSDB_FUNC_LEASTSQR:
case TSDB_FUNC_ELAPSED: {
// 1. valid the number of parameters
int32_t numOfParams = (pItem->pNode->Expr.paramList == NULL)? 0: (int32_t) taosArrayGetSize(pItem->pNode->Expr.paramList);
// no parameters or more than one parameter for function
if (pItem->pNode->Expr.paramList == NULL ||
(functionId != TSDB_FUNC_LEASTSQR && functionId != TSDB_FUNC_DERIVATIVE && numOfParams != 1) ||
((functionId == TSDB_FUNC_LEASTSQR || functionId == TSDB_FUNC_DERIVATIVE) && numOfParams != 3)) {
(functionId != TSDB_FUNC_LEASTSQR && functionId != TSDB_FUNC_DERIVATIVE && functionId != TSDB_FUNC_ELAPSED && numOfParams != 1) ||
((functionId == TSDB_FUNC_LEASTSQR || functionId == TSDB_FUNC_DERIVATIVE) && numOfParams != 3) ||
(functionId == TSDB_FUNC_ELAPSED && numOfParams > 2)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -2570,6 +2572,11 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
// elapsed only can be applied to primary key
if (functionId == TSDB_FUNC_ELAPSED && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), "elapsed only can be applied to primary key");
}
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta);
......@@ -2581,7 +2588,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
// 2. check if sql function can be applied on this column data type
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
if (!IS_NUMERIC_TYPE(pSchema->type)) {
if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) && (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9);
......@@ -2626,7 +2633,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
} else if (functionId == TSDB_FUNC_IRATE) {
int64_t prec = info.precision;
tscExprAddParams(&pExpr->base, (char*)&prec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
} else if (functionId == TSDB_FUNC_DERIVATIVE) {
} else if (functionId == TSDB_FUNC_DERIVATIVE || (functionId == TSDB_FUNC_ELAPSED && 2 == numOfParams)) {
char val[8] = {0};
int64_t tickPerSec = 0;
......@@ -2645,18 +2652,20 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
tscExprAddParams(&pExpr->base, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
memset(val, 0, tListLen(val));
if (functionId == TSDB_FUNC_DERIVATIVE) {
memset(val, 0, tListLen(val));
if (tVariantDump(&pParamElem[2].pNode->value, val, TSDB_DATA_TYPE_BIGINT, true) < 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (tVariantDump(&pParamElem[2].pNode->value, val, TSDB_DATA_TYPE_BIGINT, true) < 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
int64_t v = *(int64_t*) val;
if (v != 0 && v != 1) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg11);
}
int64_t v = *(int64_t*) val;
if (v != 0 && v != 1) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg11);
}
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
}
}
SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex);
......@@ -3075,7 +3084,6 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return TSDB_CODE_SUCCESS;
}
default: {
pUdfInfo = isValidUdf(pQueryInfo->pUdfInfo, pItem->pNode->Expr.operand.z, pItem->pNode->Expr.operand.n);
if (pUdfInfo == NULL) {
......@@ -3443,7 +3451,7 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) {
if ((functionId >= TSDB_FUNC_SUM && functionId <= TSDB_FUNC_TWA) ||
(functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_STDDEV_DST) ||
(functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_IRATE) ||
(functionId == TSDB_FUNC_SAMPLE)) {
(functionId == TSDB_FUNC_SAMPLE) || (functionId == TSDB_FUNC_ELAPSED)) {
if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes,
&interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
......@@ -3498,8 +3506,8 @@ void tscRestoreFuncForSTableQuery(SQueryInfo* pQueryInfo) {
}
bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
const char* msg1 = "TWA/Diff/Derivative/Irate/CSUM/MAVG/SAMPLE are not allowed to apply to super table directly";
const char* msg2 = "TWA/Diff/Derivative/Irate/CSUM/MAVG/SAMPLE only support group by tbname for super table query";
const char* msg1 = "TWA/Diff/Derivative/Irate/CSUM/MAVG/SAMPLE/Elapsed are not allowed to apply to super table directly";
const char* msg2 = "TWA/Diff/Derivative/Irate/CSUM/MAVG/SAMPLE/Elapsed only support group by tbname for super table query";
const char* msg3 = "functions not support for super table query";
// filter sql function not supported by metric query yet.
......@@ -3517,7 +3525,7 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo)
}
if (tscIsTWAQuery(pQueryInfo) || tscIsDiffDerivLikeQuery(pQueryInfo) || tscIsIrateQuery(pQueryInfo) ||
tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_SAMPLE)) {
tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_SAMPLE) || tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_ELAPSED)) {
if (pQueryInfo->groupbyExpr.numOfGroupCols == 0) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return true;
......@@ -7327,7 +7335,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char*
const char* msg3 = "group by/session/state_window not allowed on projection query";
const char* msg4 = "retrieve tags not compatible with group by or interval query";
const char* msg5 = "functions can not be mixed up";
const char* msg6 = "TWA/Diff/Derivative/Irate/CSum/MAvg only support group by tbname";
const char* msg6 = "TWA/Diff/Derivative/Irate/CSum/MAvg/Elapsed only support group by tbname";
// only retrieve tags, group by is not supportted
if (tscQueryTags(pQueryInfo)) {
......@@ -7389,7 +7397,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char*
}
if ((!pQueryInfo->stateWindow) && (f == TSDB_FUNC_DIFF || f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA ||
f == TSDB_FUNC_IRATE || f == TSDB_FUNC_CSUM || f == TSDB_FUNC_MAVG)) {
f == TSDB_FUNC_IRATE || f == TSDB_FUNC_CSUM || f == TSDB_FUNC_MAVG || f == TSDB_FUNC_ELAPSED)) {
for (int32_t j = 0; j < pQueryInfo->groupbyExpr.numOfGroupCols; ++j) {
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, j);
if (j == 0) {
......
......@@ -941,6 +941,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tsCompQuery = query.tsCompQuery;
pQueryMsg->simpleAgg = query.simpleAgg;
pQueryMsg->pointInterpQuery = query.pointInterpQuery;
pQueryMsg->needTableSeqScan = query.needTableSeqScan;
pQueryMsg->needReverseScan = query.needReverseScan;
pQueryMsg->stateWindow = query.stateWindow;
pQueryMsg->numOfTags = htonl(numOfTags);
......
......@@ -379,6 +379,10 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) {
return true;
}
bool tscNeedTableSeqScan(SQueryInfo* pQueryInfo) {
return tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_TWA) || tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_ELAPSED);
}
bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo) {
if (tscIsProjectionQuery(pQueryInfo)) {
return false;
......@@ -511,7 +515,7 @@ bool timeWindowInterpoRequired(SQueryInfo *pQueryInfo) {
}
int32_t functionId = pExpr->base.functionId;
if (functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_INTERP) {
if (functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_INTERP || functionId == TSDB_FUNC_ELAPSED) {
return true;
}
}
......@@ -5015,6 +5019,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && tscGroupbyColumn(pQueryInfo);
pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo);
pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
pQueryAttr->needTableSeqScan = tscNeedTableSeqScan(pQueryInfo);
pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
pQueryAttr->distinct = pQueryInfo->distinct;
pQueryAttr->sw = pQueryInfo->sessionWindow;
......
......@@ -475,6 +475,7 @@ typedef struct {
bool tsCompQuery; // is tscomp query
bool simpleAgg;
bool pointInterpQuery; // point interpolation query
bool needTableSeqScan; // need scan table by table
bool needReverseScan; // need reverse scan
bool stateWindow; // state window flag
......
......@@ -79,6 +79,8 @@ extern "C" {
#define TSDB_FUNC_BLKINFO 39
#define TSDB_FUNC_ELAPSED 40
///////////////////////////////////////////
// the following functions is not implemented.
// after implementation, move them before TSDB_FUNC_BLKINFO. also make TSDB_FUNC_BLKINFO the maxium function index
......
......@@ -226,6 +226,7 @@ typedef struct SQueryAttr {
bool diffQuery; // is diff query
bool simpleAgg;
bool pointInterpQuery; // point interpolation query
bool needTableSeqScan; // need scan table by table
bool needReverseScan; // need reverse scan
bool distinct; // distinct query or not
bool stateWindow; // window State on sub/normal table
......
......@@ -195,6 +195,12 @@ typedef struct {
char *taglists;
} SSampleFuncInfo;
typedef struct SElapsedInfo {
int8_t hasResult;
TSKEY min;
TSKEY max;
} SElapsedInfo;
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int16_t *bytes, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo) {
if (!isValidDataType(dataType)) {
......@@ -359,6 +365,11 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*bytes = sizeof(STwaInfo);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_ELAPSED) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SElapsedInfo);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
}
}
......@@ -459,6 +470,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*bytes = sizeof(SStddevdstInfo);
*interBytes = (*bytes);
} else if (functionId == TSDB_FUNC_ELAPSED) {
*type = TSDB_DATA_TYPE_BIGINT;
*bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
*interBytes = sizeof(SElapsedInfo);
} else {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
......@@ -468,7 +483,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
// TODO use hash table
int32_t isValidFunction(const char* name, int32_t len) {
for(int32_t i = 0; i <= TSDB_FUNC_BLKINFO; ++i) {
for(int32_t i = 0; i <= TSDB_FUNC_ELAPSED; ++i) {
int32_t nameLen = (int32_t) strlen(aAggs[i].name);
if (len != nameLen) {
continue;
......@@ -3436,7 +3451,7 @@ static void spread_function(SQLFunctionCtx *pCtx) {
SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
int32_t numOfElems = 0;
// todo : opt with pre-calculated result
// column missing cause the hasNull to be true
if (pCtx->preAggVals.isSet) {
......@@ -3539,7 +3554,7 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) {
* the type of intermediate data is binary
*/
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
if (pCtx->currentStage == MERGE_STAGE) {
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
......@@ -5043,6 +5058,89 @@ static void sample_func_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx);
}
static SElapsedInfo * getSElapsedInfo(SQLFunctionCtx *pCtx) {
if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
return (SElapsedInfo *)pCtx->pOutput;
} else {
return GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
}
}
static bool elapsedSetup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
if (!function_setup(pCtx, pResInfo)) {
return false;
}
SElapsedInfo *pInfo = getSElapsedInfo(pCtx);
pInfo->min = MAX_TS_KEY;
pInfo->max = 0;
pInfo->hasResult = 0;
return true;
}
static int32_t elapsedRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
return BLK_DATA_NO_NEEDED;
}
static void elapsedFunction(SQLFunctionCtx *pCtx) {
SElapsedInfo *pInfo = getSElapsedInfo(pCtx);
if (pCtx->preAggVals.isSet) {
if (pInfo->min == MAX_TS_KEY) {
pInfo->min = pCtx->preAggVals.statis.min;
pInfo->max = pCtx->preAggVals.statis.max;
} else {
pInfo->max = pCtx->preAggVals.statis.max;
}
} else {
if (0 == pCtx->size) {
goto elapsedOver;
}
if (pCtx->start.key == INT64_MIN) {
pInfo->min = pCtx->ptsList[0];
} else {
pInfo->min = pCtx->start.key;
}
if (pCtx->end.key != INT64_MIN) {
pInfo->max = pCtx->end.key + 1;
} else {
pInfo->max = pCtx->ptsList[pCtx->size - 1];
}
}
elapsedOver:
SET_VAL(pCtx, pCtx->size, 1);
if (pCtx->size > 0) {
GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG;
pInfo->hasResult = DATA_SET_FLAG;
}
}
static void elapsedMerge(SQLFunctionCtx *pCtx) {
SElapsedInfo *pInfo = getSElapsedInfo(pCtx);
memcpy(pInfo, pCtx->pInput, (size_t)pCtx->inputBytes);
GET_RES_INFO(pCtx)->hasResult = pInfo->hasResult;
}
static void elapsedFinalizer(SQLFunctionCtx *pCtx) {
if (GET_RES_INFO(pCtx)->hasResult != DATA_SET_FLAG) {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
return;
}
SElapsedInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
*(int64_t *)pCtx->pOutput = pInfo->max - pInfo->min;
if (pCtx->numOfParams > 0 && pCtx->param[0].i64 > 0) {
*(int64_t *)pCtx->pOutput = *(int64_t *)pCtx->pOutput / pCtx->param[0].i64;
}
GET_RES_INFO(pCtx)->numOfRes = 1;
doFinalizer(pCtx);
}
/////////////////////////////////////////////////////////////////////////////////////////////
/*
* function compatible list.
......@@ -5063,8 +5161,8 @@ int32_t functionCompatList[] = {
1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1,
// tid_tag, deriv, ceil, floor, round, csum, mavg, sample,
6, 8, 1, 1, 1, -1, -1, -1,
// block_info
7
// block_info, elapsed
7, 1
};
SAggFunctionInfo aAggs[] = {{
......@@ -5547,4 +5645,16 @@ SAggFunctionInfo aAggs[] = {{
block_func_merge,
dataBlockRequired,
},
{
// 40
"elapsed",
TSDB_FUNC_ELAPSED,
TSDB_FUNC_ELAPSED,
TSDB_BASE_FUNC_SO,
elapsedSetup,
elapsedFunction,
elapsedFinalizer,
elapsedMerge,
elapsedRequired,
}
};
......@@ -1332,7 +1332,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
int32_t functionId = pCtx[k].functionId;
if (functionId != TSDB_FUNC_TWA && functionId != TSDB_FUNC_INTERP) {
if (functionId != TSDB_FUNC_TWA && functionId != TSDB_FUNC_INTERP && functionId != TSDB_FUNC_ELAPSED) {
pCtx[k].start.key = INT64_MIN;
continue;
}
......@@ -1370,7 +1370,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
pCtx[k].end.ptr = (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes;
}
}
} else if (functionId == TSDB_FUNC_TWA) {
} else if (functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_ELAPSED) {
SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
SPoint point2 = (SPoint){.key = curTs, .val = &v2};
SPoint point = (SPoint){.key = windowKey, .val = &v };
......@@ -1974,7 +1974,7 @@ void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColInde
pCtx->hasNull = hasNull(pColIndex, pStatis);
// set the statistics data for primary time stamp column
if (pCtx->functionId == TSDB_FUNC_SPREAD && pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
if ((pCtx->functionId == TSDB_FUNC_SPREAD || pCtx->functionId == TSDB_FUNC_ELAPSED) && pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
pCtx->preAggVals.isSet = true;
pCtx->preAggVals.statis.min = pSDataBlock->info.window.skey;
pCtx->preAggVals.statis.max = pSDataBlock->info.window.ekey;
......@@ -8549,6 +8549,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
pQueryAttr->tsCompQuery = pQueryMsg->tsCompQuery;
pQueryAttr->simpleAgg = pQueryMsg->simpleAgg;
pQueryAttr->pointInterpQuery = pQueryMsg->pointInterpQuery;
pQueryAttr->needTableSeqScan = pQueryMsg->needTableSeqScan;
pQueryAttr->needReverseScan = pQueryMsg->needReverseScan;
pQueryAttr->stateWindow = pQueryMsg->stateWindow;
pQueryAttr->vgId = vgId;
......
......@@ -538,7 +538,7 @@ SArray* createTableScanPlan(SQueryAttr* pQueryAttr) {
} else {
if (pQueryAttr->queryBlockDist) {
op = OP_TableBlockInfoScan;
} else if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery || pQueryAttr->diffQuery) {
} else if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery || pQueryAttr->diffQuery || pQueryAttr->needTableSeqScan) {
op = OP_TableSeqScan;
} else if (pQueryAttr->needReverseScan) {
op = OP_DataBlocksOptScan;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册