未验证 提交 f690127a 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #6549 from taosdata/feature/TD-4734

[TD-4734]<feature> session_window and state window support main query
...@@ -7822,17 +7822,29 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -7822,17 +7822,29 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
return code; return code;
} }
} }
int32_t timeWindowQuery =
(TPARSER_HAS_TOKEN(pSqlNode->interval.interval) || TPARSER_HAS_TOKEN(pSqlNode->sessionVal.gap));
if (validateSelectNodeList(pCmd, pQueryInfo, pSqlNode->pSelNodeList, false, false, false) != TSDB_CODE_SUCCESS) { if (validateSelectNodeList(pCmd, pQueryInfo, pSqlNode->pSelNodeList, false, false, timeWindowQuery) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
// parse the window_state
if (validateStateWindowNode(pCmd, pQueryInfo, pSqlNode, false) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
// todo NOT support yet // todo NOT support yet
for(int32_t i = 0; i < tscNumOfExprs(pQueryInfo); ++i) { for(int32_t i = 0; i < tscNumOfExprs(pQueryInfo); ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i); SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
int32_t f = pExpr->base.functionId; int32_t f = pExpr->base.functionId;
if (f == TSDB_FUNC_STDDEV || f == TSDB_FUNC_PERCT) { if (f == TSDB_FUNC_STDDEV || f == TSDB_FUNC_PERCT) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
if ((timeWindowQuery || pQueryInfo->stateWindow) && f == TSDB_FUNC_LAST) {
pExpr->base.numOfParams = 1;
pExpr->base.param[0].i64 = TSDB_ORDER_ASC;
pExpr->base.param[0].nType = TSDB_DATA_TYPE_INT;
} }
} }
...@@ -7863,7 +7875,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -7863,7 +7875,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
if (validateIntervalNode(pSql, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) { if (validateIntervalNode(pSql, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} else { } else {
if (isTimeWindowQuery(pQueryInfo) || pQueryInfo->sessionWindow.gap > 0) { if (validateSessionNode(pCmd, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (isTimeWindowQuery(pQueryInfo)) {
// check if the first column of the nest query result is timestamp column // check if the first column of the nest query result is timestamp column
SColumn* pCol = taosArrayGetP(pQueryInfo->colList, 0); SColumn* pCol = taosArrayGetP(pQueryInfo->colList, 0);
if (pCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) { if (pCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
...@@ -7972,8 +7987,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -7972,8 +7987,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
if ((isTimeWindowQuery(pQueryInfo) || pQueryInfo->sessionWindow.gap > 0) && if (isTimeWindowQuery(pQueryInfo) && (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
(validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
......
...@@ -1751,7 +1751,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1751,7 +1751,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_SessionWindow: { case OP_SessionWindow: {
pRuntimeEnv->proot = pRuntimeEnv->proot =
createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
}
break; break;
} }
case OP_MultiTableAggregate: { case OP_MultiTableAggregate: {
...@@ -1787,7 +1790,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1787,7 +1790,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
} }
case OP_StateWindow: { case OP_StateWindow: {
pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot); int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
if (opType != OP_DummyInput) {
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
}
break; break;
} }
......
...@@ -51,38 +51,73 @@ class TDTestCase: ...@@ -51,38 +51,73 @@ class TDTestCase:
tdSql.checkRows(15) tdSql.checkRows(15)
tdSql.checkData(0, 1, 2) tdSql.checkData(0, 1, 2)
# session(ts,5a) main query
tdSql.query("select count(*) from (select * from dev_001) session(ts,5a)")
tdSql.checkRows(15)
tdSql.checkData(0, 1, 2)
# session(ts,1s) # session(ts,1s)
tdSql.query("select count(*) from dev_001 session(ts,1s)") tdSql.query("select count(*) from dev_001 session(ts,1s)")
tdSql.checkRows(12) tdSql.checkRows(12)
tdSql.checkData(0, 1, 5) tdSql.checkData(0, 1, 5)
# session(ts,1s) main query
tdSql.query("select count(*) from (select * from dev_001) session(ts,1s)")
tdSql.checkRows(12)
tdSql.checkData(0, 1, 5)
tdSql.query("select count(*) from dev_001 session(ts,1000a)") tdSql.query("select count(*) from dev_001 session(ts,1000a)")
tdSql.checkRows(12) tdSql.checkRows(12)
tdSql.checkData(0, 1, 5) tdSql.checkData(0, 1, 5)
tdSql.query("select count(*) from (select * from dev_001) session(ts,1000a)")
tdSql.checkRows(12)
tdSql.checkData(0, 1, 5)
# session(ts,1m) # session(ts,1m)
tdSql.query("select count(*) from dev_001 session(ts,1m)") tdSql.query("select count(*) from dev_001 session(ts,1m)")
tdSql.checkRows(9) tdSql.checkRows(9)
tdSql.checkData(0, 1, 8) tdSql.checkData(0, 1, 8)
# session(ts,1m)
tdSql.query("select count(*) from (select * from dev_001) session(ts,1m)")
tdSql.checkRows(9)
tdSql.checkData(0, 1, 8)
# session(ts,1h) # session(ts,1h)
tdSql.query("select count(*) from dev_001 session(ts,1h)") tdSql.query("select count(*) from dev_001 session(ts,1h)")
tdSql.checkRows(6) tdSql.checkRows(6)
tdSql.checkData(0, 1, 11) tdSql.checkData(0, 1, 11)
# session(ts,1h)
tdSql.query("select count(*) from (select * from dev_001) session(ts,1h)")
tdSql.checkRows(6)
tdSql.checkData(0, 1, 11)
# session(ts,1d) # session(ts,1d)
tdSql.query("select count(*) from dev_001 session(ts,1d)") tdSql.query("select count(*) from dev_001 session(ts,1d)")
tdSql.checkRows(4) tdSql.checkRows(4)
tdSql.checkData(0, 1, 13) tdSql.checkData(0, 1, 13)
# session(ts,1d)
tdSql.query("select count(*) from (select * from dev_001) session(ts,1d)")
tdSql.checkRows(4)
tdSql.checkData(0, 1, 13)
# session(ts,1w) # session(ts,1w)
tdSql.query("select count(*) from dev_001 session(ts,1w)") tdSql.query("select count(*) from dev_001 session(ts,1w)")
tdSql.checkRows(2) tdSql.checkRows(2)
tdSql.checkData(0, 1, 15) tdSql.checkData(0, 1, 15)
# session(ts,1w)
tdSql.query("select count(*) from (select * from dev_001) session(ts,1w)")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 15)
# session with where # session with where
tdSql.query("select count(*),first(tagtype),last(tagtype),avg(tagtype),sum(tagtype),min(tagtype),max(tagtype),leastsquares(tagtype, 1, 1),spread(tagtype),stddev(tagtype),percentile(tagtype,0) from dev_001 where ts <'2020-05-20 0:0:0' session(ts,1d)") tdSql.query("select count(*),first(tagtype),last(tagtype),avg(tagtype),sum(tagtype),min(tagtype),max(tagtype),leastsquares(tagtype, 1, 1),spread(tagtype),stddev(tagtype),percentile(tagtype,0) from dev_001 where ts <'2020-05-20 0:0:0' session(ts,1d)")
tdSql.checkRows(2) tdSql.checkRows(2)
tdSql.checkData(0, 1, 13) tdSql.checkData(0, 1, 13)
tdSql.checkData(0, 2, 1) tdSql.checkData(0, 2, 1)
...@@ -97,6 +132,20 @@ class TDTestCase: ...@@ -97,6 +132,20 @@ class TDTestCase:
tdSql.checkData(0, 11, 1) tdSql.checkData(0, 11, 1)
tdSql.checkData(1, 11, 14) tdSql.checkData(1, 11, 14)
# session with where main
tdSql.query("select count(*),first(tagtype),last(tagtype),avg(tagtype),sum(tagtype),min(tagtype),max(tagtype),leastsquares(tagtype, 1, 1) from (select * from dev_001 where ts <'2020-05-20 0:0:0') session(ts,1d)")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 13)
tdSql.checkData(0, 2, 1)
tdSql.checkData(0, 3, 13)
tdSql.checkData(0, 4, 7)
tdSql.checkData(0, 5, 91)
tdSql.checkData(0, 6, 1)
tdSql.checkData(0, 7, 13)
tdSql.checkData(0, 8, '{slop:1.000000, intercept:0.000000}')
# tdsql err # tdsql err
tdSql.error("select * from dev_001 session(ts,1w)") tdSql.error("select * from dev_001 session(ts,1w)")
tdSql.error("select count(*) from st session(ts,1w)") tdSql.error("select count(*) from st session(ts,1w)")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册