未验证 提交 fc2d75d6 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #8727 from taosdata/feature/TD-10986

[TD-10986]<feature>: Add elapsed function.
......@@ -2505,6 +2505,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
const char* msg13 = "parameter list required";
const char* msg14 = "third parameter algorithm must be 'default' or 't-digest'";
const char* msg15 = "parameter is out of range [1, 1000]";
const char* msg16 = "elapsed duration should be greater than or equal to database precision";
switch (functionId) {
case TSDB_FUNC_COUNT: {
......@@ -2599,19 +2600,21 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
case TSDB_FUNC_FLOOR:
case TSDB_FUNC_ROUND:
case TSDB_FUNC_STDDEV:
case TSDB_FUNC_LEASTSQR: {
case TSDB_FUNC_LEASTSQR:
case TSDB_FUNC_ELAPSED: {
// 1. valid the number of parameters
int32_t numOfParams = (pItem->pNode->Expr.paramList == NULL)? 0: (int32_t) taosArrayGetSize(pItem->pNode->Expr.paramList);
// no parameters or more than one parameter for function
if (pItem->pNode->Expr.paramList == NULL ||
(functionId != TSDB_FUNC_LEASTSQR && functionId != TSDB_FUNC_DERIVATIVE && numOfParams != 1) ||
((functionId == TSDB_FUNC_LEASTSQR || functionId == TSDB_FUNC_DERIVATIVE) && numOfParams != 3)) {
(functionId != TSDB_FUNC_LEASTSQR && functionId != TSDB_FUNC_DERIVATIVE && functionId != TSDB_FUNC_ELAPSED && numOfParams != 1) ||
((functionId == TSDB_FUNC_LEASTSQR || functionId == TSDB_FUNC_DERIVATIVE) && numOfParams != 3) ||
(functionId == TSDB_FUNC_ELAPSED && numOfParams > 2)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
tSqlExprItem* pParamElem = taosArrayGet(pItem->pNode->Expr.paramList, 0);
if (pParamElem->pNode->tokenId != TK_ALL && pParamElem->pNode->tokenId != TK_ID) {
if ((pParamElem->pNode->tokenId != TK_ALL && pParamElem->pNode->tokenId != TK_ID) || 0 == pParamElem->pNode->columnName.n) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -2620,6 +2623,11 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
// elapsed only can be applied to primary key
if (functionId == TSDB_FUNC_ELAPSED && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), "elapsed only can be applied to primary key");
}
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta);
......@@ -2631,7 +2639,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
// 2. check if sql function can be applied on this column data type
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
if (!IS_NUMERIC_TYPE(pSchema->type)) {
if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) && (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9);
......@@ -2676,11 +2684,11 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
} else if (functionId == TSDB_FUNC_IRATE) {
int64_t prec = info.precision;
tscExprAddParams(&pExpr->base, (char*)&prec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
} else if (functionId == TSDB_FUNC_DERIVATIVE) {
} else if (functionId == TSDB_FUNC_DERIVATIVE || (functionId == TSDB_FUNC_ELAPSED && 2 == numOfParams)) {
char val[8] = {0};
int64_t tickPerSec = 0;
if (tVariantDump(&pParamElem[1].pNode->value, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, true) < 0) {
if ((TSDB_DATA_TYPE_NULL == pParamElem[1].pNode->value.nType) || tVariantDump(&pParamElem[1].pNode->value, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, true) < 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
......@@ -2690,23 +2698,27 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tickPerSec /= TSDB_TICK_PER_SECOND(TSDB_TIME_PRECISION_MILLI);
}
if (tickPerSec <= 0 || tickPerSec < TSDB_TICK_PER_SECOND(info.precision)) {
if ((tickPerSec < TSDB_TICK_PER_SECOND(info.precision)) && (functionId == TSDB_FUNC_DERIVATIVE)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg10);
}
} else if (tickPerSec <= 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg16);
}
tscExprAddParams(&pExpr->base, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
memset(val, 0, tListLen(val));
if (functionId == TSDB_FUNC_DERIVATIVE) {
memset(val, 0, tListLen(val));
if (tVariantDump(&pParamElem[2].pNode->value, val, TSDB_DATA_TYPE_BIGINT, true) < 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (tVariantDump(&pParamElem[2].pNode->value, val, TSDB_DATA_TYPE_BIGINT, true) < 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
int64_t v = *(int64_t*) val;
if (v != 0 && v != 1) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg11);
}
int64_t v = *(int64_t*) val;
if (v != 0 && v != 1) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg11);
}
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
}
}
SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex);
......@@ -3125,7 +3137,6 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return TSDB_CODE_SUCCESS;
}
default: {
pUdfInfo = isValidUdf(pQueryInfo->pUdfInfo, pItem->pNode->Expr.operand.z, pItem->pNode->Expr.operand.n);
if (pUdfInfo == NULL) {
......@@ -3496,7 +3507,7 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) {
if ((functionId >= TSDB_FUNC_SUM && functionId <= TSDB_FUNC_TWA) ||
(functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_STDDEV_DST) ||
(functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_IRATE) ||
(functionId == TSDB_FUNC_SAMPLE)) {
(functionId == TSDB_FUNC_SAMPLE) || (functionId == TSDB_FUNC_ELAPSED)) {
if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes,
&interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
......@@ -3551,8 +3562,8 @@ void tscRestoreFuncForSTableQuery(SQueryInfo* pQueryInfo) {
}
bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
const char* msg1 = "TWA/Diff/Derivative/Irate/CSUM/MAVG/SAMPLE/INTERP are not allowed to apply to super table directly";
const char* msg2 = "TWA/Diff/Derivative/Irate/CSUM/MAVG/SAMPLE/INTERP only support group by tbname for super table query";
const char* msg1 = "TWA/Diff/Derivative/Irate/CSUM/MAVG/SAMPLE/INTERP/Elapsed are not allowed to apply to super table directly";
const char* msg2 = "TWA/Diff/Derivative/Irate/CSUM/MAVG/SAMPLE/INTERP/Elapsed only support group by tbname for super table query";
const char* msg3 = "functions not support for super table query";
// filter sql function not supported by metric query yet.
......@@ -3570,7 +3581,7 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo)
}
if (tscIsTWAQuery(pQueryInfo) || tscIsDiffDerivLikeQuery(pQueryInfo) || tscIsIrateQuery(pQueryInfo) ||
tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_SAMPLE) || tscGetPointInterpQuery(pQueryInfo)) {
tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_SAMPLE) || tscGetPointInterpQuery(pQueryInfo) || tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_ELAPSED)) {
if (pQueryInfo->groupbyExpr.numOfGroupCols == 0) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return true;
......@@ -7474,7 +7485,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char*
const char* msg3 = "group by/session/state_window not allowed on projection query";
const char* msg4 = "retrieve tags not compatible with group by or interval query";
const char* msg5 = "functions can not be mixed up";
const char* msg6 = "TWA/Diff/Derivative/Irate/CSum/MAvg only support group by tbname";
const char* msg6 = "TWA/Diff/Derivative/Irate/CSum/MAvg/Elapsed only support group by tbname";
// only retrieve tags, group by is not supportted
if (tscQueryTags(pQueryInfo)) {
......@@ -7536,7 +7547,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char*
}
if ((!pQueryInfo->stateWindow) && (f == TSDB_FUNC_DIFF || f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA ||
f == TSDB_FUNC_IRATE || f == TSDB_FUNC_CSUM || f == TSDB_FUNC_MAVG)) {
f == TSDB_FUNC_IRATE || f == TSDB_FUNC_CSUM || f == TSDB_FUNC_MAVG || f == TSDB_FUNC_ELAPSED)) {
for (int32_t j = 0; j < pQueryInfo->groupbyExpr.numOfGroupCols; ++j) {
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, j);
if (j == 0) {
......@@ -7585,7 +7596,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char*
int32_t validateFunctionFromUpstream(SQueryInfo* pQueryInfo, char* msg) {
const char* msg1 = "TWA/Diff/Derivative/Irate are not allowed to apply to super table without group by tbname";
const char* msg1 = "TWA/Diff/Derivative/Irate/elapsed are not allowed to apply to super table without group by tbname";
const char* msg2 = "group by not supported in nested interp query";
const char* msg3 = "order by not supported in nested interp query";
const char* msg4 = "first column should be timestamp for interp query";
......@@ -7598,7 +7609,7 @@ int32_t validateFunctionFromUpstream(SQueryInfo* pQueryInfo, char* msg) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
int32_t f = pExpr->base.functionId;
if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE || f == TSDB_FUNC_DIFF) {
if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE || f == TSDB_FUNC_DIFF || f == TSDB_FUNC_ELAPSED) {
for (int32_t j = 0; j < upNum; ++j) {
SQueryInfo* pUp = taosArrayGetP(pQueryInfo->pUpstream, j);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pUp, 0);
......
......@@ -943,6 +943,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->tsCompQuery = query.tsCompQuery;
pQueryMsg->simpleAgg = query.simpleAgg;
pQueryMsg->pointInterpQuery = query.pointInterpQuery;
pQueryMsg->needTableSeqScan = query.needTableSeqScan;
pQueryMsg->needReverseScan = query.needReverseScan;
pQueryMsg->stateWindow = query.stateWindow;
pQueryMsg->numOfTags = htonl(numOfTags);
......
......@@ -375,6 +375,10 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) {
return true;
}
bool tscNeedTableSeqScan(SQueryInfo* pQueryInfo) {
return pQueryInfo->stableQuery && (tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_TWA) || tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_ELAPSED));
}
bool tscGetPointInterpQuery(SQueryInfo* pQueryInfo) {
size_t size = tscNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
......@@ -391,7 +395,6 @@ bool tscGetPointInterpQuery(SQueryInfo* pQueryInfo) {
return false;
}
bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo) {
if (tscIsProjectionQuery(pQueryInfo)) {
return false;
......@@ -524,7 +527,7 @@ bool timeWindowInterpoRequired(SQueryInfo *pQueryInfo) {
}
int32_t functionId = pExpr->base.functionId;
if (functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_INTERP) {
if (functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_INTERP || functionId == TSDB_FUNC_ELAPSED) {
return true;
}
}
......@@ -5054,6 +5057,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && tscGroupbyColumn(pQueryInfo);
pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo);
pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
pQueryAttr->needTableSeqScan = tscNeedTableSeqScan(pQueryInfo);
pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
pQueryAttr->distinct = pQueryInfo->distinct;
pQueryAttr->sw = pQueryInfo->sessionWindow;
......
......@@ -475,6 +475,7 @@ typedef struct {
bool tsCompQuery; // is tscomp query
bool simpleAgg;
bool pointInterpQuery; // point interpolation query
bool needTableSeqScan; // need scan table by table
bool needReverseScan; // need reverse scan
bool stateWindow; // state window flag
......
......@@ -79,6 +79,8 @@ extern "C" {
#define TSDB_FUNC_BLKINFO 39
#define TSDB_FUNC_ELAPSED 40
///////////////////////////////////////////
// the following functions is not implemented.
// after implementation, move them before TSDB_FUNC_BLKINFO. also make TSDB_FUNC_BLKINFO the maxium function index
......
......@@ -230,6 +230,7 @@ typedef struct SQueryAttr {
bool diffQuery; // is diff query
bool simpleAgg;
bool pointInterpQuery; // point interpolation query
bool needTableSeqScan; // need scan table by table
bool needReverseScan; // need reverse scan
bool distinct; // distinct query or not
bool stateWindow; // window State on sub/normal table
......
......@@ -196,6 +196,12 @@ typedef struct {
char *taglists;
} SSampleFuncInfo;
typedef struct SElapsedInfo {
int8_t hasResult;
TSKEY min;
TSKEY max;
} SElapsedInfo;
typedef struct {
bool valueAssigned;
union {
......@@ -371,6 +377,11 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*bytes = sizeof(STwaInfo);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_ELAPSED) {
*type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SElapsedInfo);
*interBytes = *bytes;
return TSDB_CODE_SUCCESS;
}
}
......@@ -471,6 +482,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*bytes = sizeof(SStddevdstInfo);
*interBytes = (*bytes);
} else if (functionId == TSDB_FUNC_ELAPSED) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = tDataTypes[*type].bytes;
*interBytes = sizeof(SElapsedInfo);
} else {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
......@@ -480,7 +495,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
// TODO use hash table
int32_t isValidFunction(const char* name, int32_t len) {
for(int32_t i = 0; i <= TSDB_FUNC_BLKINFO; ++i) {
for(int32_t i = 0; i <= TSDB_FUNC_ELAPSED; ++i) {
int32_t nameLen = (int32_t) strlen(aAggs[i].name);
if (len != nameLen) {
continue;
......@@ -3449,7 +3464,7 @@ static void spread_function(SQLFunctionCtx *pCtx) {
SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
int32_t numOfElems = 0;
// todo : opt with pre-calculated result
// column missing cause the hasNull to be true
if (pCtx->preAggVals.isSet) {
......@@ -3552,7 +3567,7 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) {
* the type of intermediate data is binary
*/
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
if (pCtx->currentStage == MERGE_STAGE) {
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
......@@ -4922,6 +4937,120 @@ static void sample_func_finalizer(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx);
}
static SElapsedInfo * getSElapsedInfo(SQLFunctionCtx *pCtx) {
if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
return (SElapsedInfo *)pCtx->pOutput;
} else {
return GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
}
}
static bool elapsedSetup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) {
if (!function_setup(pCtx, pResInfo)) {
return false;
}
SElapsedInfo *pInfo = getSElapsedInfo(pCtx);
pInfo->min = MAX_TS_KEY;
pInfo->max = 0;
pInfo->hasResult = 0;
return true;
}
static int32_t elapsedRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
return BLK_DATA_NO_NEEDED;
}
static void elapsedFunction(SQLFunctionCtx *pCtx) {
SElapsedInfo *pInfo = getSElapsedInfo(pCtx);
if (pCtx->preAggVals.isSet) {
if (pInfo->min == MAX_TS_KEY) {
pInfo->min = pCtx->preAggVals.statis.min;
pInfo->max = pCtx->preAggVals.statis.max;
} else {
if (pCtx->order == TSDB_ORDER_ASC) {
pInfo->max = pCtx->preAggVals.statis.max;
} else {
pInfo->min = pCtx->preAggVals.statis.min;
}
}
} else {
// 0 == pCtx->size mean this is end interpolation.
if (0 == pCtx->size) {
if (pCtx->order == TSDB_ORDER_DESC) {
if (pCtx->end.key != INT64_MIN) {
pInfo->min = pCtx->end.key;
}
} else {
if (pCtx->end.key != INT64_MIN) {
pInfo->max = pCtx->end.key + 1;
}
}
goto elapsedOver;
}
int64_t *ptsList = (int64_t *)GET_INPUT_DATA_LIST(pCtx);
// pCtx->start.key == INT64_MIN mean this is first window or there is actual start point of current window.
// pCtx->end.key == INT64_MIN mean current window does not end in current data block or there is actual end point of current window.
if (pCtx->order == TSDB_ORDER_DESC) {
if (pCtx->start.key == INT64_MIN) {
pInfo->max = (pInfo->max < ptsList[pCtx->size - 1]) ? ptsList[pCtx->size - 1] : pInfo->max;
} else {
pInfo->max = pCtx->start.key + 1;
}
if (pCtx->end.key != INT64_MIN) {
pInfo->min = pCtx->end.key;
} else {
pInfo->min = ptsList[0];
}
} else {
if (pCtx->start.key == INT64_MIN) {
pInfo->min = (pInfo->min > ptsList[0]) ? ptsList[0] : pInfo->min;
} else {
pInfo->min = pCtx->start.key;
}
if (pCtx->end.key != INT64_MIN) {
pInfo->max = pCtx->end.key + 1;
} else {
pInfo->max = ptsList[pCtx->size - 1];
}
}
}
elapsedOver:
SET_VAL(pCtx, pCtx->size, 1);
if (pCtx->size > 0) {
GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG;
pInfo->hasResult = DATA_SET_FLAG;
}
}
static void elapsedMerge(SQLFunctionCtx *pCtx) {
SElapsedInfo *pInfo = getSElapsedInfo(pCtx);
memcpy(pInfo, pCtx->pInput, (size_t)pCtx->inputBytes);
GET_RES_INFO(pCtx)->hasResult = pInfo->hasResult;
}
static void elapsedFinalizer(SQLFunctionCtx *pCtx) {
if (GET_RES_INFO(pCtx)->hasResult != DATA_SET_FLAG) {
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
return;
}
SElapsedInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
*(double *)pCtx->pOutput = (double)pInfo->max - (double)pInfo->min;
if (pCtx->numOfParams > 0 && pCtx->param[0].i64 > 0) {
*(double *)pCtx->pOutput = *(double *)pCtx->pOutput / pCtx->param[0].i64;
}
GET_RES_INFO(pCtx)->numOfRes = 1;
doFinalizer(pCtx);
}
/////////////////////////////////////////////////////////////////////////////////////////////
/*
* function compatible list.
......@@ -4942,8 +5071,8 @@ int32_t functionCompatList[] = {
1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1,
// tid_tag, deriv, ceil, floor, round, csum, mavg, sample,
6, 8, 1, 1, 1, -1, -1, -1,
// block_info
7
// block_info, elapsed
7, 1
};
SAggFunctionInfo aAggs[] = {{
......@@ -5426,4 +5555,16 @@ SAggFunctionInfo aAggs[] = {{
block_func_merge,
dataBlockRequired,
},
{
// 40
"elapsed",
TSDB_FUNC_ELAPSED,
TSDB_FUNC_ELAPSED,
TSDB_BASE_FUNC_SO,
elapsedSetup,
elapsedFunction,
elapsedFinalizer,
elapsedMerge,
elapsedRequired,
}
};
......@@ -933,9 +933,10 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t
static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput) {
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
bool hasAggregates = pCtx[0].preAggVals.isSet;
for (int32_t k = 0; k < numOfOutput; ++k) {
bool hasAggregates = pCtx[k].preAggVals.isSet;
pCtx[k].size = forwardStep;
pCtx[k].startTs = pWin->skey;
......@@ -1258,7 +1259,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
int32_t functionId = pCtx[k].functionId;
if (functionId != TSDB_FUNC_TWA && functionId != TSDB_FUNC_INTERP) {
if (functionId != TSDB_FUNC_TWA && functionId != TSDB_FUNC_INTERP && functionId != TSDB_FUNC_ELAPSED) {
pCtx[k].start.key = INT64_MIN;
continue;
}
......@@ -1301,7 +1302,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
pCtx[k].end.ptr = (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes;
}
}
} else if (functionId == TSDB_FUNC_TWA) {
} else if (functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_ELAPSED) {
assert(curTs != windowKey);
if (prevRowIndex == -1) {
......@@ -1468,7 +1469,6 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQueryAttr);
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SResultRow* pResult = NULL;
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx,
numOfOutput, pInfo->rowCellInfoOffset);
......@@ -1491,23 +1491,22 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
continue;
}
STimeWindow w = pRes->win;
ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &w, masterScan, &pResult,
tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
STimeWindow w = pRes->win;
ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &w, masterScan, &pResult,
tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
doTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY*)pRuntimeEnv->prevRow[0], -1,
tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP);
assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pInfo->pCtx, pQueryAttr->numOfOutput, RESULT_ROW_START_INTERP);
doTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY*)pRuntimeEnv->prevRow[0], -1,
tsCols[startPos], startPos, QUERY_IS_ASC_QUERY(pQueryAttr) ? w.ekey : w.skey, RESULT_ROW_END_INTERP);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput);
}
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pInfo->pCtx, pQueryAttr->numOfOutput, RESULT_ROW_START_INTERP);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput);
}
// restore current time window
ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx,
......@@ -1821,7 +1820,7 @@ void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColInde
pCtx->hasNull = hasNull(pColIndex, pStatis);
// set the statistics data for primary time stamp column
if (pCtx->functionId == TSDB_FUNC_SPREAD && pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
if ((pCtx->functionId == TSDB_FUNC_SPREAD || pCtx->functionId == TSDB_FUNC_ELAPSED) && pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
pCtx->preAggVals.isSet = true;
pCtx->preAggVals.statis.min = pSDataBlock->info.window.skey;
pCtx->preAggVals.statis.max = pSDataBlock->info.window.ekey;
......@@ -6203,7 +6202,17 @@ group_finished_exit:
return true;
}
static void resetInterpolation(SQLFunctionCtx *pCtx, SQueryRuntimeEnv* pRuntimeEnv, int32_t numOfOutput) {
if (!pRuntimeEnv->pQueryAttr->timeWindowInterpo) {
return;
}
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].start.key = INT64_MIN;
pCtx[i].end.key = INT64_MIN;
}
*(TSKEY *)pRuntimeEnv->prevRow[0] = INT64_MIN;
}
static void doTimeEveryImpl(SOperatorInfo* pOperator, SQLFunctionCtx *pCtx, SSDataBlock* pBlock, bool newgroup) {
STimeEveryOperatorInfo* pEveryInfo = (STimeEveryOperatorInfo*) pOperator->info;
......@@ -6431,6 +6440,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
STableId prevId = {0, 0};
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
......@@ -6440,6 +6450,12 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
break;
}
if (prevId.tid != pBlock->info.tid || prevId.uid != pBlock->info.uid) {
resetInterpolation(pIntervalInfo->pCtx, pRuntimeEnv, pOperator->numOfOutput);
prevId.uid = pBlock->info.uid;
prevId.tid = pBlock->info.tid;
}
// the pDataBlock are always the same one, no need to call this again
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
......@@ -8785,6 +8801,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
pQueryAttr->tsCompQuery = pQueryMsg->tsCompQuery;
pQueryAttr->simpleAgg = pQueryMsg->simpleAgg;
pQueryAttr->pointInterpQuery = pQueryMsg->pointInterpQuery;
pQueryAttr->needTableSeqScan = pQueryMsg->needTableSeqScan;
pQueryAttr->needReverseScan = pQueryMsg->needReverseScan;
pQueryAttr->stateWindow = pQueryMsg->stateWindow;
pQueryAttr->vgId = vgId;
......
......@@ -538,7 +538,7 @@ SArray* createTableScanPlan(SQueryAttr* pQueryAttr) {
} else {
if (pQueryAttr->queryBlockDist) {
op = OP_TableBlockInfoScan;
} else if (pQueryAttr->tsCompQuery || pQueryAttr->diffQuery) {
} else if (pQueryAttr->tsCompQuery || pQueryAttr->diffQuery || pQueryAttr->needTableSeqScan) {
op = OP_TableSeqScan;
} else if (pQueryAttr->needReverseScan || pQueryAttr->pointInterpQuery) {
op = OP_DataBlocksOptScan;
......
......@@ -371,6 +371,7 @@ python3 ./test.py -f functions/function_irate.py
python3 ./test.py -f functions/function_ceil.py
python3 ./test.py -f functions/function_floor.py
python3 ./test.py -f functions/function_round.py
python3 ./test.py -f functions/function_elapsed.py
python3 ./test.py -f functions/function_mavg.py
python3 ./test.py -f functions/function_csum.py
......
###################################################################
# Copyright (c) 2020 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import *
from util.cases import *
from util.sql import *
from functions.function_elapsed_case import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
def genTime(self, no):
h = int(no / 60)
hs = "%d" %h
if h < 10:
hs = "0%d" %h
m = int(no % 60)
ms = "%d" %m
if m < 10:
ms = "0%d" %m
return hs, ms
def general(self):
# normal table
tdSql.execute("create database wxy_db minrows 10 maxrows 200")
tdSql.execute("use wxy_db")
tdSql.execute("create table t1(ts timestamp, i int, b bigint, f float, d double, bin binary(10), s smallint, t tinyint, bl bool, n nchar(10), ts1 timestamp)")
for i in range(1, 1001):
hs, ms = self.genTime(i)
if i < 500:
ret = tdSql.execute("insert into t1(ts, i, b) values (\"2021-11-22 %s:%s:00\", %d, 1)" % (hs, ms, i))
else:
ret = tdSql.execute("insert into t1(ts, i, b) values (\"2021-11-22 %s:%s:00\", %d, 0)" % (hs, ms, i))
tdSql.query("select count(*) from t1")
tdSql.checkEqual(int(tdSql.getData(0, 0)), 1000)
# empty normal table
tdSql.execute("create table t2(ts timestamp, i int, b bigint, f float, d double, bin binary(10), s smallint, t tinyint, bl bool, n nchar(10), ts1 timestamp)")
tdSql.execute("create database wxy_db_ns precision \"ns\"")
tdSql.execute("use wxy_db_ns")
tdSql.execute("create table t1 (ts timestamp, f float)")
tdSql.execute("insert into t1 values('2021-11-18 00:00:00.000000100', 1)"
"('2021-11-18 00:00:00.000000200', 2)"
"('2021-11-18 00:00:00.000000300', 3)"
"('2021-11-18 00:00:00.000000500', 4)")
# super table
tdSql.execute("use wxy_db")
tdSql.execute("create stable st1(ts timestamp, i int, b bigint, f float, d double, bin binary(10), s smallint, t tinyint, bl bool, n nchar(10), ts1 timestamp) tags(id int)")
tdSql.execute("create table st1s1 using st1 tags(1)")
tdSql.execute("create table st1s2 using st1 tags(2)")
for i in range(1, 1001):
hs, ms = self.genTime(i)
if 0 == i % 2:
ret = tdSql.execute("insert into st1s1(ts, i) values (\"2021-11-22 %s:%s:00\", %d)" % (hs, ms, i))
else:
ret = tdSql.execute("insert into st1s2(ts, i) values (\"2021-11-22 %s:%s:00\", %d)" % (hs, ms, i))
tdSql.query("select count(*) from st1s1")
tdSql.checkEqual(int(tdSql.getData(0, 0)), 500)
tdSql.query("select count(*) from st1s2")
tdSql.checkEqual(int(tdSql.getData(0, 0)), 500)
# empty super table
tdSql.execute("create stable st2(ts timestamp, i int, b bigint, f float, d double, bin binary(10), s smallint, t tinyint, bl bool, n nchar(10), ts1 timestamp) tags(id int)")
tdSql.execute("create table st2s1 using st1 tags(1)")
tdSql.execute("create table st2s2 using st1 tags(2)")
tdSql.execute("create stable st3(ts timestamp, i int, b bigint, f float, d double, bin binary(10), s smallint, t tinyint, bl bool, n nchar(10), ts1 timestamp) tags(id int)")
def run(self):
tdSql.prepare()
self.general()
ElapsedCase().run()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
此差异已折叠。
###################################################################
# Copyright (c) 2020 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import *
from util.cases import *
from util.sql import *
from functions.function_elapsed_case import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
def run(self):
tdSql.prepare()
ElapsedCase(True).run()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册