diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index ec3b0c44213f728fcb56236fde282ed9fce50cbf..b2fdafb7f31f8bad08708b3fd01a57cb2a6d1991 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -226,6 +226,7 @@ typedef struct SQueryInfo { int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX int16_t resColumnId; // result column id bool distinctTag; // distinct tag or not + bool windowState; // window state or not int32_t round; // 0/1/.... int32_t bufLen; char* buf; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 63c08fe25b279416778d5cc6b44a983b72268871..9b60cca0edaf8db9cda5212c9d380ce5850b1cfd 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -89,6 +89,7 @@ static int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCm static int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode); static int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* offsetToken); static int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSliding); +static int32_t validateWindowStateNode(SSqlCmd* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable); static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem); @@ -828,6 +829,35 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pS // The following part is used to check for the invalid query expression. return checkInvalidExprForTimeWindow(pCmd, pQueryInfo); } +static int32_t validateWindowStateNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable) { + + const char* msg1 = "invalid column name"; + const char* msg2 = "invalid window state"; + const char* msg3 = "not support window_state on super table"; + + SStrToken *col = &(pSqlNode->windowstateVal.col) ; + if (col->z == NULL || col->n <= 0) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); + } + //TODO(dengyihao): check tag column + if (isStable) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); + } + + SColumnIndex index = COLUMN_INDEX_INITIALIZER; + if (getColumnIndexByName(pCmd, col, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); + } + + if (tscSqlExprNumOfExprs(pQueryInfo) == 1) { + SSqlExpr* pExpr = &(tscSqlExprGet(pQueryInfo, 0)->base); + if (index.columnIndex == pExpr->colInfo.colIndex && pExpr->colType == TSDB_DATA_TYPE_INT) { + pQueryInfo->windowState = true; + return TSDB_CODE_SUCCESS; + } + } + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); +} int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pSqlNode) { const char* msg1 = "gap should be fixed time window"; @@ -6721,6 +6751,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { return TSDB_CODE_TSC_INVALID_SQL; } + if (isTimeWindowQuery(pQueryInfo) && (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); } @@ -7288,7 +7319,9 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) { TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_SQL; } - + if (validateWindowStateNode(pCmd, pQueryInfo, pSqlNode, isSTable) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_TSC_INVALID_SQL; + } // set order by info if (validateOrderbyNode(pCmd, pQueryInfo, pSqlNode, tscGetTableSchema(pTableMetaInfo->pTableMeta)) != TSDB_CODE_SUCCESS) { @@ -7311,6 +7344,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) { return TSDB_CODE_TSC_INVALID_SQL; } + // parse the window_state /* * transfer sql functions that need secondary merge into another format * in dealing with super table queries such as: count/first/last diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 420b78f64dccfa6bac9302dc2c04c2ccd9ce44e5..a40501d1cf14c49ae959c2d6b7004b9739609e9c 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3550,6 +3550,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo); pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo); pQueryAttr->distinctTag = pQueryInfo->distinctTag; + pQueryAttr->windowState = pQueryInfo->windowState; pQueryAttr->numOfCols = numOfCols; pQueryAttr->numOfOutput = numOfOutput; diff --git a/src/inc/ttokendef.h b/src/inc/ttokendef.h index ef3f8ed1fbcc754ad54dd7e9c232f610b37b98e8..8f5f1d7590f80f9a76b21e5460bc009da6cabdc9 100644 --- a/src/inc/ttokendef.h +++ b/src/inc/ttokendef.h @@ -17,105 +17,105 @@ #define TDENGINE_TTOKENDEF_H -#define TK_ID 1 -#define TK_BOOL 2 -#define TK_TINYINT 3 -#define TK_SMALLINT 4 -#define TK_INTEGER 5 -#define TK_BIGINT 6 -#define TK_FLOAT 7 -#define TK_DOUBLE 8 -#define TK_STRING 9 -#define TK_TIMESTAMP 10 -#define TK_BINARY 11 -#define TK_NCHAR 12 -#define TK_OR 13 -#define TK_AND 14 -#define TK_NOT 15 -#define TK_EQ 16 -#define TK_NE 17 -#define TK_ISNULL 18 -#define TK_NOTNULL 19 -#define TK_IS 20 -#define TK_LIKE 21 -#define TK_GLOB 22 -#define TK_BETWEEN 23 -#define TK_IN 24 -#define TK_GT 25 -#define TK_GE 26 -#define TK_LT 27 -#define TK_LE 28 -#define TK_BITAND 29 -#define TK_BITOR 30 -#define TK_LSHIFT 31 -#define TK_RSHIFT 32 -#define TK_PLUS 33 -#define TK_MINUS 34 -#define TK_DIVIDE 35 -#define TK_TIMES 36 -#define TK_STAR 37 -#define TK_SLASH 38 -#define TK_REM 39 -#define TK_CONCAT 40 -#define TK_UMINUS 41 -#define TK_UPLUS 42 -#define TK_BITNOT 43 -#define TK_SHOW 44 -#define TK_DATABASES 45 -#define TK_TOPICS 46 -#define TK_MNODES 47 -#define TK_DNODES 48 -#define TK_ACCOUNTS 49 -#define TK_USERS 50 -#define TK_MODULES 51 -#define TK_QUERIES 52 -#define TK_CONNECTIONS 53 -#define TK_STREAMS 54 -#define TK_VARIABLES 55 -#define TK_SCORES 56 -#define TK_GRANTS 57 -#define TK_VNODES 58 -#define TK_IPTOKEN 59 -#define TK_DOT 60 -#define TK_CREATE 61 -#define TK_TABLE 62 -#define TK_STABLE 63 -#define TK_DATABASE 64 -#define TK_TABLES 65 -#define TK_STABLES 66 -#define TK_VGROUPS 67 -#define TK_DROP 68 -#define TK_TOPIC 69 -#define TK_DNODE 70 -#define TK_USER 71 -#define TK_ACCOUNT 72 -#define TK_USE 73 -#define TK_DESCRIBE 74 -#define TK_ALTER 75 -#define TK_PASS 76 -#define TK_PRIVILEGE 77 -#define TK_LOCAL 78 -#define TK_IF 79 -#define TK_EXISTS 80 -#define TK_PPS 81 -#define TK_TSERIES 82 -#define TK_DBS 83 -#define TK_STORAGE 84 -#define TK_QTIME 85 -#define TK_CONNS 86 -#define TK_STATE 87 -#define TK_KEEP 88 -#define TK_CACHE 89 -#define TK_REPLICA 90 -#define TK_QUORUM 91 -#define TK_DAYS 92 -#define TK_MINROWS 93 -#define TK_MAXROWS 94 -#define TK_BLOCKS 95 -#define TK_CTIME 96 -#define TK_WAL 97 -#define TK_FSYNC 98 -#define TK_COMP 99 +#define TK_ID 1 +#define TK_BOOL 2 +#define TK_TINYINT 3 +#define TK_SMALLINT 4 +#define TK_INTEGER 5 +#define TK_BIGINT 6 +#define TK_FLOAT 7 +#define TK_DOUBLE 8 +#define TK_STRING 9 +#define TK_TIMESTAMP 10 +#define TK_BINARY 11 +#define TK_NCHAR 12 +#define TK_OR 13 +#define TK_AND 14 +#define TK_NOT 15 +#define TK_EQ 16 +#define TK_NE 17 +#define TK_ISNULL 18 +#define TK_NOTNULL 19 +#define TK_IS 20 +#define TK_LIKE 21 +#define TK_GLOB 22 +#define TK_BETWEEN 23 +#define TK_IN 24 +#define TK_GT 25 +#define TK_GE 26 +#define TK_LT 27 +#define TK_LE 28 +#define TK_BITAND 29 +#define TK_BITOR 30 +#define TK_LSHIFT 31 +#define TK_RSHIFT 32 +#define TK_PLUS 33 +#define TK_MINUS 34 +#define TK_DIVIDE 35 +#define TK_TIMES 36 +#define TK_STAR 37 +#define TK_SLASH 38 +#define TK_REM 39 +#define TK_CONCAT 40 +#define TK_UMINUS 41 +#define TK_UPLUS 42 +#define TK_BITNOT 43 +#define TK_SHOW 44 +#define TK_DATABASES 45 +#define TK_TOPICS 46 +#define TK_MNODES 47 +#define TK_DNODES 48 +#define TK_ACCOUNTS 49 +#define TK_USERS 50 +#define TK_MODULES 51 +#define TK_QUERIES 52 +#define TK_CONNECTIONS 53 +#define TK_STREAMS 54 +#define TK_VARIABLES 55 +#define TK_SCORES 56 +#define TK_GRANTS 57 +#define TK_VNODES 58 +#define TK_IPTOKEN 59 +#define TK_DOT 60 +#define TK_CREATE 61 +#define TK_TABLE 62 +#define TK_STABLE 63 +#define TK_DATABASE 64 +#define TK_TABLES 65 +#define TK_STABLES 66 +#define TK_VGROUPS 67 +#define TK_DROP 68 +#define TK_TOPIC 69 +#define TK_DNODE 70 +#define TK_USER 71 +#define TK_ACCOUNT 72 +#define TK_USE 73 +#define TK_DESCRIBE 74 +#define TK_ALTER 75 +#define TK_PASS 76 +#define TK_PRIVILEGE 77 +#define TK_LOCAL 78 +#define TK_IF 79 +#define TK_EXISTS 80 +#define TK_PPS 81 +#define TK_TSERIES 82 +#define TK_DBS 83 +#define TK_STORAGE 84 +#define TK_QTIME 85 +#define TK_CONNS 86 +#define TK_STATE 87 +#define TK_KEEP 88 +#define TK_CACHE 89 +#define TK_REPLICA 90 +#define TK_QUORUM 91 +#define TK_DAYS 92 +#define TK_MINROWS 93 +#define TK_MAXROWS 94 +#define TK_BLOCKS 95 +#define TK_CTIME 96 +#define TK_WAL 97 +#define TK_FSYNC 98 +#define TK_COMP 99 #define TK_PRECISION 100 #define TK_UPDATE 101 #define TK_CACHELAST 102 @@ -136,74 +136,74 @@ #define TK_VARIABLE 117 #define TK_INTERVAL 118 #define TK_SESSION 119 -#define TK_FILL 120 -#define TK_SLIDING 121 -#define TK_ORDER 122 -#define TK_BY 123 -#define TK_ASC 124 -#define TK_DESC 125 -#define TK_GROUP 126 -#define TK_HAVING 127 -#define TK_LIMIT 128 -#define TK_OFFSET 129 -#define TK_SLIMIT 130 -#define TK_SOFFSET 131 -#define TK_WHERE 132 -#define TK_NOW 133 -#define TK_RESET 134 -#define TK_QUERY 135 -#define TK_SYNCDB 136 -#define TK_ADD 137 -#define TK_COLUMN 138 -#define TK_TAG 139 -#define TK_CHANGE 140 -#define TK_SET 141 -#define TK_KILL 142 -#define TK_CONNECTION 143 -#define TK_STREAM 144 -#define TK_COLON 145 -#define TK_ABORT 146 -#define TK_AFTER 147 -#define TK_ATTACH 148 -#define TK_BEFORE 149 -#define TK_BEGIN 150 -#define TK_CASCADE 151 -#define TK_CLUSTER 152 -#define TK_CONFLICT 153 -#define TK_COPY 154 -#define TK_DEFERRED 155 -#define TK_DELIMITERS 156 -#define TK_DETACH 157 -#define TK_EACH 158 -#define TK_END 159 -#define TK_EXPLAIN 160 -#define TK_FAIL 161 -#define TK_FOR 162 -#define TK_IGNORE 163 -#define TK_IMMEDIATE 164 -#define TK_INITIALLY 165 -#define TK_INSTEAD 166 -#define TK_MATCH 167 -#define TK_KEY 168 -#define TK_OF 169 -#define TK_RAISE 170 -#define TK_REPLACE 171 -#define TK_RESTRICT 172 -#define TK_ROW 173 -#define TK_STATEMENT 174 -#define TK_TRIGGER 175 -#define TK_VIEW 176 -#define TK_SEMI 177 -#define TK_NONE 178 -#define TK_PREV 179 -#define TK_LINEAR 180 -#define TK_IMPORT 181 -#define TK_TBNAME 182 -#define TK_JOIN 183 -#define TK_INSERT 184 -#define TK_INTO 185 -#define TK_VALUES 186 - +#define TK_WINDOW_STATE 120 +#define TK_FILL 121 +#define TK_SLIDING 122 +#define TK_ORDER 123 +#define TK_BY 124 +#define TK_ASC 125 +#define TK_DESC 126 +#define TK_GROUP 127 +#define TK_HAVING 128 +#define TK_LIMIT 129 +#define TK_OFFSET 130 +#define TK_SLIMIT 131 +#define TK_SOFFSET 132 +#define TK_WHERE 133 +#define TK_NOW 134 +#define TK_RESET 135 +#define TK_QUERY 136 +#define TK_SYNCDB 137 +#define TK_ADD 138 +#define TK_COLUMN 139 +#define TK_TAG 140 +#define TK_CHANGE 141 +#define TK_SET 142 +#define TK_KILL 143 +#define TK_CONNECTION 144 +#define TK_STREAM 145 +#define TK_COLON 146 +#define TK_ABORT 147 +#define TK_AFTER 148 +#define TK_ATTACH 149 +#define TK_BEFORE 150 +#define TK_BEGIN 151 +#define TK_CASCADE 152 +#define TK_CLUSTER 153 +#define TK_CONFLICT 154 +#define TK_COPY 155 +#define TK_DEFERRED 156 +#define TK_DELIMITERS 157 +#define TK_DETACH 158 +#define TK_EACH 159 +#define TK_END 160 +#define TK_EXPLAIN 161 +#define TK_FAIL 162 +#define TK_FOR 163 +#define TK_IGNORE 164 +#define TK_IMMEDIATE 165 +#define TK_INITIALLY 166 +#define TK_INSTEAD 167 +#define TK_MATCH 168 +#define TK_KEY 169 +#define TK_OF 170 +#define TK_RAISE 171 +#define TK_REPLACE 172 +#define TK_RESTRICT 173 +#define TK_ROW 174 +#define TK_STATEMENT 175 +#define TK_TRIGGER 176 +#define TK_VIEW 177 +#define TK_SEMI 178 +#define TK_NONE 179 +#define TK_PREV 180 +#define TK_LINEAR 181 +#define TK_IMPORT 182 +#define TK_TBNAME 183 +#define TK_JOIN 184 +#define TK_INSERT 185 +#define TK_INTO 186 +#define TK_VALUES 187 #define TK_SPACE 300 #define TK_COMMENT 301 diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index b9361650e99a0531033cf4f680860730266f68f3..6640061c637a6ab7e146cb08cc3ab66cf319818b 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -196,6 +196,7 @@ typedef struct SQueryAttr { bool pointInterpQuery; // point interpolation query bool needReverseScan; // need reverse scan bool distinctTag; // distinct tag query + bool windowState; // window State on sub/normal table int32_t interBufSize; // intermediate buffer sizse int32_t havingNum; // having expr number @@ -302,6 +303,7 @@ enum OPERATOR_TYPE_E { OP_GlobalAggregate = 18, // global merge for the multi-way data sources. OP_Filter = 19, OP_Distinct = 20, + OP_StateWindow = 21, }; typedef struct SOperatorInfo { @@ -465,6 +467,16 @@ typedef struct SSWindowOperatorInfo { int32_t start; // start row index } SSWindowOperatorInfo; +typedef struct SStateWindowOperatorInfo { + SOptrBasicInfo binfo; + STimeWindow curWindow; // current time window + int32_t numOfRows; // number of rows + int32_t colIndex; // start row index + int32_t start; + char* prevData; // previous data + +} SStateWindowOperatorInfo ; + typedef struct SDistinctOperatorInfo { SHashObj *pSet; SSDataBlock *pRes; @@ -512,6 +524,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows, void* merger, bool groupMix); +SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param); SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger); SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); diff --git a/src/query/inc/qSqlparser.h b/src/query/inc/qSqlparser.h index 85cba06b3e82e72cbe4e4b58f6b4aa3e3c58a05f..026ae1d976957883da0c6c4658cda2412d35b00b 100644 --- a/src/query/inc/qSqlparser.h +++ b/src/query/inc/qSqlparser.h @@ -89,6 +89,10 @@ typedef struct SSessionWindowVal { SStrToken gap; } SSessionWindowVal; +typedef struct SWindowStateVal { + SStrToken col; +} SWindowStateVal; + struct SRelationInfo; typedef struct SSqlNode { @@ -100,6 +104,7 @@ typedef struct SSqlNode { SArray *fillType; // fill type[optional], SArray SIntervalVal interval; // (interval, interval_offset) [optional] SSessionWindowVal sessionVal; // session window [optional] + SWindowStateVal windowstateVal; // window_state(col) [optional] SStrToken sliding; // sliding window [optional] SLimitVal limit; // limit offset [optional] SLimitVal slimit; // group limit offset [optional] @@ -271,7 +276,7 @@ SArray *tSqlExprListAppend(SArray *pList, tSqlExpr *pNode, SStrToken *pDistinc void tSqlExprListDestroy(SArray *pList); SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere, - SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps, + SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps, SWindowStateVal *pw, SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pgLimit, tSqlExpr *pHaving); int32_t tSqlExprCompare(tSqlExpr *left, tSqlExpr *right); diff --git a/src/query/inc/sql.y b/src/query/inc/sql.y index 8ef8ef0e2b3080ce334686f7fdbb7105596356e5..fa1292e2d76769b79ba1c6477aaf57e013b2939b 100644 --- a/src/query/inc/sql.y +++ b/src/query/inc/sql.y @@ -456,8 +456,8 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). { //////////////////////// The SELECT statement ///////////////////////////////// %type select {SSqlNode*} %destructor select {destroySqlNode($$);} -select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). { - A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &S, F, &L, &G, N); +select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) windowstate_option(D) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). { + A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &D, &S, F, &L, &G, N); } select(A) ::= LP select(B) RP. {A = B;} @@ -475,7 +475,7 @@ cmd ::= union(X). { setSqlInfo(pInfo, X, NULL, TSDB_SQL_SELECT); } // select client_version() // select server_state() select(A) ::= SELECT(T) selcollist(W). { - A = tSetQuerySqlNode(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); + A = tSetQuerySqlNode(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); } // selcollist is a list of expressions that are to become the return @@ -552,6 +552,11 @@ session_option(X) ::= SESSION LP ids(V) cpxName(Z) COMMA tmvar(Y) RP. { X.col = V; X.gap = Y; } +%type windowstate_option {SWindowStateVal} +windowstate_option(X) ::= . {X.col.n = 0;} +windowstate_option(X) ::= WINDOW_STATE LP ids(V) RP. { + X.col = V; +} %type fill_opt {SArray*} %destructor fill_opt {taosArrayDestroy($$);} diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index feaa205c3ef49d988049d7b0253a8ad62facb970..9bec84e28eb8288674d1ecaa8276da250466f6e4 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1758,6 +1758,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } break; } + case OP_StateWindow: { + pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream->info, pRuntimeEnv->proot); + break; + } case OP_Limit: { pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); @@ -4413,6 +4418,12 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf } else if (pDownstream->operatorType == OP_SessionWindow) { SSWindowOperatorInfo* pInfo = pDownstream->info; + pTableScanInfo->pCtx = pInfo->binfo.pCtx; + pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo; + pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset; + } else if (pDownstream->operatorType == OP_StateWindow) { + SStateWindowOperatorInfo* pInfo = pDownstream->info; + pTableScanInfo->pCtx = pInfo->binfo.pCtx; pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo; pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset; @@ -5050,6 +5061,121 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { return pIntervalInfo->pRes; } + +static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { + SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex); + SOptrBasicInfo* pBInfo = &pInfo->binfo; + int16_t bytes = pColInfoData->info.bytes; + int16_t type = pColInfoData->info.type; + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STableQueryInfo* item = pRuntimeEnv->current; + bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); + + SColumnInfoData* pTsColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0); + TSKEY* tsList = (TSKEY*)pTsColInfoData->pData; + + if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_TIMESTAMP) { + qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_QID(pRuntimeEnv)); + return; + } + for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) { + char* val = ((char*)pColInfoData->pData) + bytes * j; + if (isNull(val, type)) { + continue; + } + if (pInfo->prevData == NULL) { + pInfo->prevData = malloc(bytes); + memcpy(pInfo->prevData, val, bytes); + pInfo->curWindow.skey = tsList[j]; + pInfo->curWindow.ekey = tsList[j]; + pInfo->numOfRows = 1; + pInfo->start = j; + } else if (0 == memcmp(pInfo->prevData, val, bytes)) { + pInfo->curWindow.ekey = tsList[j]; + pInfo->numOfRows += 1; + pInfo->start = j; + } else { + SResultRow* pResult = NULL; + int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan, + &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, + pBInfo->rowCellInfoOffset); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); + } + + doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, + pSDataBlock->info.rows, pOperator->numOfOutput); + + pInfo->curWindow.skey = tsList[j]; + pInfo->curWindow.ekey = tsList[j]; + memcpy(pInfo->prevData, val, bytes); + pInfo->numOfRows = 1; + pInfo->start = j; + } + } + SResultRow* pResult = NULL; + + int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan, + &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, + pBInfo->rowCellInfoOffset); + if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); + } + + doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList, + pSDataBlock->info.rows, pOperator->numOfOutput); + +} +static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + SStateWindowOperatorInfo* pWindowInfo = pOperator->info; + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + SOptrBasicInfo* pBInfo = &pWindowInfo->binfo; + if (pOperator->status == OP_RES_TO_RETURN) { + toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); + + if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + pOperator->status = OP_EXEC_DONE; + } + return pBInfo->pRes; + } + int32_t order = pQueryAttr->order.order; + STimeWindow win = pQueryAttr->window; + SOperatorInfo* upstream = pOperator->upstream; + while (1) { + SSDataBlock* pBlock = upstream->exec(upstream, newgroup); + if (pBlock == NULL) { + break; + } + setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, pQueryAttr->order.order); + if (pWindowInfo->colIndex == -1) { + pWindowInfo->colIndex = pOperator->pExpr->base.colInfo.colIndex; + } + doStateWindowAggImpl(pOperator, pWindowInfo, pBlock); + } + + // restore the value + pQueryAttr->order.order = order; + pQueryAttr->window = win; + + pOperator->status = OP_RES_TO_RETURN; + closeAllResultRows(&pBInfo->resultRowInfo); + setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); + finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); + + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo); + toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); + + if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + pOperator->status = OP_EXEC_DONE; + } + + return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes; +} static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { @@ -5500,7 +5626,28 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp return pOperator; } +SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { + SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo)); + pInfo->colIndex = -1; + pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + pOperator->name = "StateWindowOperator"; + pOperator->operatorType = OP_StateWindow; + pOperator->blockingOptr = true; + pOperator->status = OP_IN_EXECUTING; + pOperator->upstream = upstream; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfOutput; + pOperator->info = pInfo; + pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->exec = doStateWindowAgg; + pOperator->cleanup = destroyBasicOperatorInfo; + return pOperator; + +} SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo)); diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index 0554a887ec68ca3353b5e45c1067ea734ea98cfd..52b0329293e537785f1d484a8e6e535295aaff7d 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -121,6 +121,10 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { if (pQueryAttr->stableQuery && !pQueryAttr->tsCompQuery) { op = OP_MultiTableAggregate; } else { + if (pQueryAttr->windowState) { + op = OP_StateWindow; + taosArrayPush(plan, &op); + } op = OP_Aggregate; } diff --git a/src/query/src/qSqlParser.c b/src/query/src/qSqlParser.c index fb8f164ed3f2920767c61dd08969a7435002f9d4..8432edc6ee12061692588b7bcc1de1e759fd9bb4 100644 --- a/src/query/src/qSqlParser.c +++ b/src/query/src/qSqlParser.c @@ -725,7 +725,7 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) { */ SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere, SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, - SSessionWindowVal *pSession, SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, + SSessionWindowVal *pSession, SWindowStateVal *pWindowStateVal, SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *psLimit, tSqlExpr *pHaving) { assert(pSelNodeList != NULL); @@ -777,6 +777,12 @@ SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelat TPARSER_SET_NONE_TOKEN(pSqlNode->sessionVal.col); } + if (pWindowStateVal != NULL) { + pSqlNode->windowstateVal = *pWindowStateVal; + } else { + TPARSER_SET_NONE_TOKEN(pSqlNode->windowstateVal.col); + } + return pSqlNode; } diff --git a/src/query/src/qTokenizer.c b/src/query/src/qTokenizer.c index 7869e27707dc00c6856e70bb3e6ed903c5616b4d..7f3c269e999cfd251133ff66eae9d43dc232531b 100644 --- a/src/query/src/qTokenizer.c +++ b/src/query/src/qTokenizer.c @@ -141,6 +141,7 @@ static SKeyword keywordTable[] = { {"VARIABLE", TK_VARIABLE}, {"INTERVAL", TK_INTERVAL}, {"SESSION", TK_SESSION}, + {"WINDOW_STATE", TK_WINDOW_STATE}, {"FILL", TK_FILL}, {"SLIDING", TK_SLIDING}, {"ORDER", TK_ORDER},