提交 43c94bd3 编写于 作者: X Xiaoyu Wang

[TD-10986]<feature>: Add elapsed function.

上级 3b471831
......@@ -2449,12 +2449,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
const char* msg7 = "normal table can not apply this function";
const char* msg8 = "multi-columns selection does not support alias column name";
const char* msg9 = "diff/derivative can no be applied to unsigned numeric type";
const char* msg10 = "derivative/elapsed duration should be greater than 1 Second";
const char* msg10 = "derivative duration should be greater than 1 Second";
const char* msg11 = "third parameter in derivative should be 0 or 1";
const char* msg12 = "parameter is out of range [1, 100]";
const char* msg13 = "parameter list required";
const char* msg14 = "third parameter algorithm must be 'default' or 't-digest'";
const char* msg15 = "parameter is out of range [1, 1000]";
const char* msg16 = "elapsed duration should be greater than database precision";
switch (functionId) {
case TSDB_FUNC_COUNT: {
......@@ -2563,7 +2564,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
tSqlExprItem* pParamElem = taosArrayGet(pItem->pNode->Expr.paramList, 0);
if (pParamElem->pNode->tokenId != TK_ALL && pParamElem->pNode->tokenId != TK_ID) {
if ((pParamElem->pNode->tokenId != TK_ALL && pParamElem->pNode->tokenId != TK_ID) || 0 == pParamElem->pNode->columnName.n) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -2637,7 +2638,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
char val[8] = {0};
int64_t tickPerSec = 0;
if (tVariantDump(&pParamElem[1].pNode->value, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, true) < 0) {
if ((TSDB_DATA_TYPE_NULL == pParamElem[1].pNode->value.nType) || tVariantDump(&pParamElem[1].pNode->value, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, true) < 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
......@@ -2647,9 +2648,11 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tickPerSec /= TSDB_TICK_PER_SECOND(TSDB_TIME_PRECISION_MILLI);
}
if (tickPerSec <= 0 || tickPerSec < TSDB_TICK_PER_SECOND(info.precision)) {
if ((tickPerSec < TSDB_TICK_PER_SECOND(info.precision)) && (functionId == TSDB_FUNC_DERIVATIVE)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg10);
}
} else if (tickPerSec <= 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg16);
}
tscExprAddParams(&pExpr->base, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
if (functionId == TSDB_FUNC_DERIVATIVE) {
......
......@@ -471,8 +471,8 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*interBytes = (*bytes);
} else if (functionId == TSDB_FUNC_ELAPSED) {
*type = TSDB_DATA_TYPE_BIGINT;
*bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = tDataTypes[*type].bytes;
*interBytes = sizeof(SElapsedInfo);
} else {
return TSDB_CODE_TSC_INVALID_OPERATION;
......@@ -5132,9 +5132,9 @@ static void elapsedFinalizer(SQLFunctionCtx *pCtx) {
}
SElapsedInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
*(int64_t *)pCtx->pOutput = pInfo->max - pInfo->min;
*(double *)pCtx->pOutput = (double)pInfo->max - (double)pInfo->min;
if (pCtx->numOfParams > 0 && pCtx->param[0].i64 > 0) {
*(int64_t *)pCtx->pOutput = *(int64_t *)pCtx->pOutput / pCtx->param[0].i64;
*(double *)pCtx->pOutput = *(double *)pCtx->pOutput / pCtx->param[0].i64;
}
GET_RES_INFO(pCtx)->numOfRes = 1;
......
......@@ -6107,6 +6107,18 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes;
}
static void resetInterpolation(SQLFunctionCtx *pCtx, SQueryRuntimeEnv* pRuntimeEnv, int32_t numOfOutput) {
if (!pRuntimeEnv->pQueryAttr->timeWindowInterpo) {
return;
}
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].start.key = INT64_MIN;
pCtx[i].end.key = INT64_MIN;
}
*(TSKEY *)pRuntimeEnv->prevRow[0] = INT64_MIN;
}
static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
......@@ -6135,6 +6147,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
STableId prevId = {0, 0};
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
......@@ -6144,6 +6157,12 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
break;
}
if (prevId.tid != pBlock->info.tid || prevId.uid != pBlock->info.uid) {
resetInterpolation(pIntervalInfo->pCtx, pRuntimeEnv, pOperator->numOfOutput);
prevId.uid = pBlock->info.uid;
prevId.tid = pBlock->info.tid;
}
// the pDataBlock are always the same one, no need to call this again
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
......
###################################################################
# Copyright (c) 2020 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
def general(self):
# normal table
tdSql.execute("create database ms_test")
tdSql.execute("use ms_test")
tdSql.execute("create table t1 (ts timestamp, f float)")
tdSql.execute("insert into t1 values('2021-11-18 00:00:10', 1)"
"('2021-11-18 00:00:30', 2)"
"('2021-11-18 00:00:40', 3)"
"('2021-11-18 00:01:00', 4)")
tdSql.execute("create database ns_test precision \"ns\"")
tdSql.execute("use ns_test")
tdSql.execute("create table t1 (ts timestamp, f float)")
tdSql.execute("insert into t1 values('2021-11-18 00:00:00.000000100', 1)"
"('2021-11-18 00:00:00' + 200b, 2)"
"('2021-11-18 00:00:00' + 300b, 3)"
"('2021-11-18 00:00:00' + 500b, 4)")
# super table
tdSql.execute("create stable st1(ts timestamp, f float) tags(id int)")
tdSql.execute("create table subt1 using st1 tags(1)")
tdSql.execute("create table subt2 using st1 tags(2)")
tdSql.execute("insert into subt1 values('2021-11-18 00:00:00', 1)('2021-11-18 00:06:00', 2)('2021-11-18 00:12:00', 3)('2021-11-18 00:24:00', 4)")
def normalTable(self):
tdSql.query("select elapsed(ts) from t1")
tdSql.checkData(0, 0, 50000)
tdSql.query("select elapsed(ts, 20s) from t1")
tdSql.checkData(0, 0, 2.5)
tdSql.query("select elapsed(ts) from t1 interval(1s)")
def superTable(self):
tdSql.query("select elapsed(ts) from st1 group by tbname")
tdSql.query("select elapsed(ts) from st1 interval(1s) group by tbname")
tdSql.query("select elapsed(ts, 1s), twa(f), elapsed(ts, 1s) * twa(f) as integral from st1 interval(1s) group by tbname")
def run(self):
tdSql.prepare()
self.general()
self.normalTable()
#self.superTable()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册