未验证 提交 de0e8d88 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #7631 from taosdata/feature/query

[td-6395]<enhance>: use the keyword "every" instead of keyword of "in…
......@@ -1081,12 +1081,13 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pS
const char* msg1 = "sliding cannot be used without interval";
const char* msg2 = "interval cannot be less than 1 us";
const char* msg3 = "interval value is too small";
const char* msg4 = "only point interpolation query requires keyword EVERY";
SSqlCmd* pCmd = &pSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
if (!TPARSER_HAS_TOKEN(pSqlNode->interval.interval)) {
if (TPARSER_HAS_TOKEN(pSqlNode->sliding)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
......@@ -1112,7 +1113,6 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pS
}
if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit != 'y') {
// interval cannot be less than 10 milliseconds
if (convertTimePrecision(pQueryInfo->interval.interval, tinfo.precision, TSDB_TIME_PRECISION_MICRO) < tsMinIntervalTime) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
......@@ -1127,9 +1127,15 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pS
return TSDB_CODE_TSC_INVALID_OPERATION;
}
bool interpQuery = tscIsPointInterpQuery(pQueryInfo);
if ((pSqlNode->interval.token == TK_EVERY && (!interpQuery)) || (pSqlNode->interval.token == TK_INTERVAL && interpQuery)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
// The following part is used to check for the invalid query expression.
return checkInvalidExprForTimeWindow(pCmd, pQueryInfo);
}
static int32_t validateStateWindowNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable) {
const char* msg1 = "invalid column name";
......
......@@ -141,75 +141,76 @@
#define TK_FROM 123
#define TK_VARIABLE 124
#define TK_INTERVAL 125
#define TK_SESSION 126
#define TK_STATE_WINDOW 127
#define TK_FILL 128
#define TK_SLIDING 129
#define TK_ORDER 130
#define TK_BY 131
#define TK_ASC 132
#define TK_GROUP 133
#define TK_HAVING 134
#define TK_LIMIT 135
#define TK_OFFSET 136
#define TK_SLIMIT 137
#define TK_SOFFSET 138
#define TK_WHERE 139
#define TK_RESET 140
#define TK_QUERY 141
#define TK_SYNCDB 142
#define TK_ADD 143
#define TK_COLUMN 144
#define TK_MODIFY 145
#define TK_TAG 146
#define TK_CHANGE 147
#define TK_SET 148
#define TK_KILL 149
#define TK_CONNECTION 150
#define TK_STREAM 151
#define TK_COLON 152
#define TK_ABORT 153
#define TK_AFTER 154
#define TK_ATTACH 155
#define TK_BEFORE 156
#define TK_BEGIN 157
#define TK_CASCADE 158
#define TK_CLUSTER 159
#define TK_CONFLICT 160
#define TK_COPY 161
#define TK_DEFERRED 162
#define TK_DELIMITERS 163
#define TK_DETACH 164
#define TK_EACH 165
#define TK_END 166
#define TK_EXPLAIN 167
#define TK_FAIL 168
#define TK_FOR 169
#define TK_IGNORE 170
#define TK_IMMEDIATE 171
#define TK_INITIALLY 172
#define TK_INSTEAD 173
#define TK_MATCH 174
#define TK_KEY 175
#define TK_OF 176
#define TK_RAISE 177
#define TK_REPLACE 178
#define TK_RESTRICT 179
#define TK_ROW 180
#define TK_STATEMENT 181
#define TK_TRIGGER 182
#define TK_VIEW 183
#define TK_IPTOKEN 184
#define TK_SEMI 185
#define TK_NONE 186
#define TK_PREV 187
#define TK_LINEAR 188
#define TK_IMPORT 189
#define TK_TBNAME 190
#define TK_JOIN 191
#define TK_INSERT 192
#define TK_INTO 193
#define TK_VALUES 194
#define TK_EVERY 126
#define TK_SESSION 127
#define TK_STATE_WINDOW 128
#define TK_FILL 129
#define TK_SLIDING 130
#define TK_ORDER 131
#define TK_BY 132
#define TK_ASC 133
#define TK_GROUP 134
#define TK_HAVING 135
#define TK_LIMIT 136
#define TK_OFFSET 137
#define TK_SLIMIT 138
#define TK_SOFFSET 139
#define TK_WHERE 140
#define TK_RESET 141
#define TK_QUERY 142
#define TK_SYNCDB 143
#define TK_ADD 144
#define TK_COLUMN 145
#define TK_MODIFY 146
#define TK_TAG 147
#define TK_CHANGE 148
#define TK_SET 149
#define TK_KILL 150
#define TK_CONNECTION 151
#define TK_STREAM 152
#define TK_COLON 153
#define TK_ABORT 154
#define TK_AFTER 155
#define TK_ATTACH 156
#define TK_BEFORE 157
#define TK_BEGIN 158
#define TK_CASCADE 159
#define TK_CLUSTER 160
#define TK_CONFLICT 161
#define TK_COPY 162
#define TK_DEFERRED 163
#define TK_DELIMITERS 164
#define TK_DETACH 165
#define TK_EACH 166
#define TK_END 167
#define TK_EXPLAIN 168
#define TK_FAIL 169
#define TK_FOR 170
#define TK_IGNORE 171
#define TK_IMMEDIATE 172
#define TK_INITIALLY 173
#define TK_INSTEAD 174
#define TK_MATCH 175
#define TK_KEY 176
#define TK_OF 177
#define TK_RAISE 178
#define TK_REPLACE 179
#define TK_RESTRICT 180
#define TK_ROW 181
#define TK_STATEMENT 182
#define TK_TRIGGER 183
#define TK_VIEW 184
#define TK_IPTOKEN 185
#define TK_SEMI 186
#define TK_NONE 187
#define TK_PREV 188
#define TK_LINEAR 189
#define TK_IMPORT 190
#define TK_TBNAME 191
#define TK_JOIN 192
#define TK_INSERT 193
#define TK_INTO 194
#define TK_VALUES 195
#define TK_SPACE 300
......
......@@ -80,6 +80,7 @@ typedef struct tVariantListItem {
} tVariantListItem;
typedef struct SIntervalVal {
int32_t token;
SStrToken interval;
SStrToken offset;
} SIntervalVal;
......
......@@ -482,7 +482,7 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). {
//////////////////////// The SELECT statement /////////////////////////////////
%type select {SSqlNode*}
%destructor select {destroySqlNode($$);}
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) sliding_opt(S) session_option(H) windowstate_option(D) fill_opt(F)groupby_opt(P) having_opt(N) orderby_opt(Z) slimit_opt(G) limit_opt(L). {
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_option(K) sliding_opt(S) session_option(H) windowstate_option(D) fill_opt(F)groupby_opt(P) having_opt(N) orderby_opt(Z) slimit_opt(G) limit_opt(L). {
A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &D, &S, F, &L, &G, N);
}
......@@ -572,10 +572,14 @@ tablelist(A) ::= tablelist(Y) COMMA ids(X) cpxName(Z) ids(F). {
%type tmvar {SStrToken}
tmvar(A) ::= VARIABLE(X). {A = X;}
%type interval_opt {SIntervalVal}
interval_opt(N) ::= INTERVAL LP tmvar(E) RP. {N.interval = E; N.offset.n = 0;}
interval_opt(N) ::= INTERVAL LP tmvar(E) COMMA tmvar(X) RP. {N.interval = E; N.offset = X;}
interval_opt(N) ::= . {memset(&N, 0, sizeof(N));}
%type interval_option {SIntervalVal}
interval_option(N) ::= intervalKey(A) LP tmvar(E) RP. {N.interval = E; N.offset.n = 0; N.token = A;}
interval_option(N) ::= intervalKey(A) LP tmvar(E) COMMA tmvar(X) RP. {N.interval = E; N.offset = X; N.token = A;}
interval_option(N) ::= . {memset(&N, 0, sizeof(N));}
%type intervalKey {int32_t}
intervalKey(A) ::= INTERVAL. {A = TK_INTERVAL;}
intervalKey(A) ::= EVERY. {A = TK_EVERY; }
%type session_option {SSessionWindowVal}
session_option(X) ::= . {X.col.n = 0; X.gap.n = 0;}
......@@ -584,6 +588,7 @@ session_option(X) ::= SESSION LP ids(V) cpxName(Z) COMMA tmvar(Y) RP. {
X.col = V;
X.gap = Y;
}
%type windowstate_option {SWindowStateVal}
windowstate_option(X) ::= . { X.col.n = 0; X.col.z = NULL;}
windowstate_option(X) ::= STATE_WINDOW LP ids(V) RP. { X.col = V; }
......
......@@ -230,6 +230,12 @@ static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOperatorInfo(SOperatorInfo* pOperator);
static void doSetOperatorCompleted(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE;
if (pOperator->pRuntimeEnv != NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
}
}
static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock);
......@@ -5517,8 +5523,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
// start to flush data into disk and try do multiway merge sort
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
doSetOperatorCompleted(pOperator);
break;
}
......@@ -5629,8 +5634,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock);
}
pOperator->status = OP_EXEC_DONE;
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
doSetOperatorCompleted(pOperator);
finalizeQueryResult(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
pInfo->pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
......@@ -5706,7 +5710,7 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
doSetOperatorCompleted(pOperator);
}
return pInfo->pRes;
......@@ -5824,8 +5828,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
doSetOperatorCompleted(pOperator);
return NULL;
}
......@@ -5853,8 +5856,7 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
pBlock->info.rows = (int32_t)(pInfo->limit - pInfo->total);
pInfo->total = pInfo->limit;
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
doSetOperatorCompleted(pOperator);
} else {
pInfo->total += pBlock->info.rows;
}
......@@ -5889,8 +5891,7 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) {
}
}
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
doSetOperatorCompleted(pOperator);
return NULL;
}
......@@ -5905,9 +5906,8 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
doSetOperatorCompleted(pOperator);
}
return pIntervalInfo->pRes;
......@@ -5948,7 +5948,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
doSetOperatorCompleted(pOperator);
}
return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes;
......@@ -5967,7 +5967,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
doSetOperatorCompleted(pOperator);
}
return pIntervalInfo->pRes;
......@@ -6025,9 +6025,10 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
if (pOperator->status == OP_RES_TO_RETURN) {
int64_t st = taosGetTimestampUs();
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
doSetOperatorCompleted(pOperator);
}
SQInfo* pQInfo = pRuntimeEnv->qinfo;
......@@ -7286,13 +7287,11 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
doSetOperatorCompleted(pOperator);
break;
}
if (!initMultiDistinctInfo(pInfo, pOperator, pBlock)) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
doSetOperatorCompleted(pOperator);
break;
}
// ensure result output buf
......
......@@ -766,7 +766,7 @@ SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelat
pSqlNode->pSortOrder = pSortOrder;
pSqlNode->pWhere = pWhere;
pSqlNode->fillType = pFill;
pSqlNode->pHaving = pHaving;
pSqlNode->pHaving = pHaving;
if (pLimit != NULL) {
pSqlNode->limit = *pLimit;
......
此差异已折叠。
......@@ -137,6 +137,7 @@ static SKeyword keywordTable[] = {
{"COMMA", TK_COMMA},
{"NULL", TK_NULL},
{"SELECT", TK_SELECT},
{"EVERY", TK_EVERY},
{"FROM", TK_FROM},
{"VARIABLE", TK_VARIABLE},
{"INTERVAL", TK_INTERVAL},
......
......@@ -57,15 +57,15 @@ class TDTestCase:
tdSql.checkRows(0)
tdSql.query("select interp(pav) from ap1 where ts = '2021-07-25 02:19:54' FILL (LINEAR)")
tdSql.checkRows(0)
tdSql.query("select interp(pav) from ap1 where ts> '2021-07-25 02:19:54' and ts<'2021-07-25 02:20:00' INTERVAL(1000a) FILL (LINEAR)")
tdSql.query("select interp(pav) from ap1 where ts> '2021-07-25 02:19:54' and ts<'2021-07-25 02:20:00' every(1000a) FILL (LINEAR)")
tdSql.checkRows(6)
tdSql.query("select interp(pav) from ap1 where ts>= '2021-07-25 02:19:54' and ts<'2021-07-25 02:20:00' INTERVAL(1000a) FILL (NEXT)")
tdSql.query("select interp(pav) from ap1 where ts>= '2021-07-25 02:19:54' and ts<'2021-07-25 02:20:00' every(1000a) FILL (NEXT)")
tdSql.checkRows(6)
tdSql.checkData(0,1,2.90799)
tdSql.query("select interp(pav) from ap1 where ts> '2021-07-25 02:19:54' and ts <= '2021-07-25 02:20:00' INTERVAL(1000a) FILL (PREV)")
tdSql.query("select interp(pav) from ap1 where ts> '2021-07-25 02:19:54' and ts <= '2021-07-25 02:20:00' every(1000a) FILL (PREV)")
tdSql.checkRows(7)
tdSql.checkData(1,1,1.47885)
tdSql.query("select interp(pav) from ap1 where ts>= '2021-07-25 02:19:54' and ts <= '2021-07-25 02:20:00' INTERVAL(1000a) FILL (LINEAR)")
tdSql.query("select interp(pav) from ap1 where ts>= '2021-07-25 02:19:54' and ts <= '2021-07-25 02:20:00' every(1000a) FILL (LINEAR)")
tdSql.checkRows(7)
# check desc order
......@@ -74,13 +74,13 @@ class TDTestCase:
tdSql.checkRows(0)
tdSql.query("select interp(pav) from ap1 where ts = '2021-07-25 02:19:54' FILL (LINEAR) order by ts desc")
tdSql.checkRows(0)
tdSql.query("select interp(pav) from ap1 where ts> '2021-07-25 02:19:54' and ts<'2021-07-25 02:20:00' INTERVAL(1000a) FILL (LINEAR) order by ts desc")
tdSql.query("select interp(pav) from ap1 where ts> '2021-07-25 02:19:54' and ts<'2021-07-25 02:20:00' every(1000a) FILL (LINEAR) order by ts desc")
tdSql.checkRows(6)
tdSql.query("select interp(pav) from ap1 where ts>= '2021-07-25 02:19:54' and ts<'2021-07-25 02:20:00' INTERVAL(1000a) FILL (NEXT) order by ts desc")
tdSql.query("select interp(pav) from ap1 where ts>= '2021-07-25 02:19:54' and ts<'2021-07-25 02:20:00' every(1000a) FILL (NEXT) order by ts desc")
tdSql.checkRows(6)
tdSql.checkData(0,1,4.60900)
tdSql.error("select interp(pav) from ap1 where ts> '2021-07-25 02:19:54' and ts <= '2021-07-25 02:20:00' INTERVAL(1000a) FILL (PREV) order by ts desc")
tdSql.query("select interp(pav) from ap1 where ts>= '2021-07-25 02:19:54' and ts <= '2021-07-25 02:20:00' INTERVAL(1000a) FILL (LINEAR) order by ts desc")
tdSql.error("select interp(pav) from ap1 where ts> '2021-07-25 02:19:54' and ts <= '2021-07-25 02:20:00' every(1000a) FILL (PREV) order by ts desc")
tdSql.query("select interp(pav) from ap1 where ts>= '2021-07-25 02:19:54' and ts <= '2021-07-25 02:20:00' every(1000a) FILL (LINEAR) order by ts desc")
tdSql.checkRows(7)
# check exception
......@@ -88,7 +88,7 @@ class TDTestCase:
tdSql.error("select interp(*) from ap1 FILL(NEXT)")
tdSql.error("select interp(*) from ap1 ts >= '2021-07-25 02:19:54' FILL(NEXT)")
tdSql.error("select interp(*) from ap1 ts <= '2021-07-25 02:19:54' FILL(NEXT)")
tdSql.error("select interp(*) from ap1 where ts >'2021-07-25 02:19:59.938' and ts < now interval(1s) fill(next)")
tdSql.error("select interp(*) from ap1 where ts >'2021-07-25 02:19:59.938' and ts < now every(1s) fill(next)")
def stop(self):
tdSql.close()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册