提交 3c0617bc 编写于 作者: Y yihaoDeng

TD-2570

上级 4fa64be7
......@@ -226,6 +226,7 @@ typedef struct SQueryInfo {
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int16_t resColumnId; // result column id
bool distinctTag; // distinct tag or not
bool windowState; // window state or not
int32_t round; // 0/1/....
int32_t bufLen;
char* buf;
......
......@@ -89,6 +89,7 @@ static int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCm
static int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode);
static int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* offsetToken);
static int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSliding);
static int32_t validateWindowStateNode(SSqlCmd* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable);
static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem);
......@@ -828,6 +829,35 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pS
// The following part is used to check for the invalid query expression.
return checkInvalidExprForTimeWindow(pCmd, pQueryInfo);
}
static int32_t validateWindowStateNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable) {
const char* msg1 = "invalid column name";
const char* msg2 = "invalid window state";
const char* msg3 = "not support window_state on super table";
SStrToken *col = &(pSqlNode->windowstateVal.col) ;
if (col->z == NULL || col->n <= 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
//TODO(dengyihao): check tag column
if (isStable) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(pCmd, col, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (tscSqlExprNumOfExprs(pQueryInfo) == 1) {
SSqlExpr* pExpr = &(tscSqlExprGet(pQueryInfo, 0)->base);
if (index.columnIndex == pExpr->colInfo.colIndex && pExpr->colType == TSDB_DATA_TYPE_INT) {
pQueryInfo->windowState = true;
return TSDB_CODE_SUCCESS;
}
}
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pSqlNode) {
const char* msg1 = "gap should be fixed time window";
......@@ -6721,6 +6751,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return TSDB_CODE_TSC_INVALID_SQL;
}
if (isTimeWindowQuery(pQueryInfo) && (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -7288,7 +7319,9 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) {
TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
if (validateWindowStateNode(pCmd, pQueryInfo, pSqlNode, isSTable) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
// set order by info
if (validateOrderbyNode(pCmd, pQueryInfo, pSqlNode, tscGetTableSchema(pTableMetaInfo->pTableMeta)) !=
TSDB_CODE_SUCCESS) {
......@@ -7311,6 +7344,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, int32_t index) {
return TSDB_CODE_TSC_INVALID_SQL;
}
// parse the window_state
/*
* transfer sql functions that need secondary merge into another format
* in dealing with super table queries such as: count/first/last
......
......@@ -3550,6 +3550,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
pQueryAttr->distinctTag = pQueryInfo->distinctTag;
pQueryAttr->windowState = pQueryInfo->windowState;
pQueryAttr->numOfCols = numOfCols;
pQueryAttr->numOfOutput = numOfOutput;
......
......@@ -17,105 +17,105 @@
#define TDENGINE_TTOKENDEF_H
#define TK_ID 1
#define TK_BOOL 2
#define TK_TINYINT 3
#define TK_SMALLINT 4
#define TK_INTEGER 5
#define TK_BIGINT 6
#define TK_FLOAT 7
#define TK_DOUBLE 8
#define TK_STRING 9
#define TK_TIMESTAMP 10
#define TK_BINARY 11
#define TK_NCHAR 12
#define TK_OR 13
#define TK_AND 14
#define TK_NOT 15
#define TK_EQ 16
#define TK_NE 17
#define TK_ISNULL 18
#define TK_NOTNULL 19
#define TK_IS 20
#define TK_LIKE 21
#define TK_GLOB 22
#define TK_BETWEEN 23
#define TK_IN 24
#define TK_GT 25
#define TK_GE 26
#define TK_LT 27
#define TK_LE 28
#define TK_BITAND 29
#define TK_BITOR 30
#define TK_LSHIFT 31
#define TK_RSHIFT 32
#define TK_PLUS 33
#define TK_MINUS 34
#define TK_DIVIDE 35
#define TK_TIMES 36
#define TK_STAR 37
#define TK_SLASH 38
#define TK_REM 39
#define TK_CONCAT 40
#define TK_UMINUS 41
#define TK_UPLUS 42
#define TK_BITNOT 43
#define TK_SHOW 44
#define TK_DATABASES 45
#define TK_TOPICS 46
#define TK_MNODES 47
#define TK_DNODES 48
#define TK_ACCOUNTS 49
#define TK_USERS 50
#define TK_MODULES 51
#define TK_QUERIES 52
#define TK_CONNECTIONS 53
#define TK_STREAMS 54
#define TK_VARIABLES 55
#define TK_SCORES 56
#define TK_GRANTS 57
#define TK_VNODES 58
#define TK_IPTOKEN 59
#define TK_DOT 60
#define TK_CREATE 61
#define TK_TABLE 62
#define TK_STABLE 63
#define TK_DATABASE 64
#define TK_TABLES 65
#define TK_STABLES 66
#define TK_VGROUPS 67
#define TK_DROP 68
#define TK_TOPIC 69
#define TK_DNODE 70
#define TK_USER 71
#define TK_ACCOUNT 72
#define TK_USE 73
#define TK_DESCRIBE 74
#define TK_ALTER 75
#define TK_PASS 76
#define TK_PRIVILEGE 77
#define TK_LOCAL 78
#define TK_IF 79
#define TK_EXISTS 80
#define TK_PPS 81
#define TK_TSERIES 82
#define TK_DBS 83
#define TK_STORAGE 84
#define TK_QTIME 85
#define TK_CONNS 86
#define TK_STATE 87
#define TK_KEEP 88
#define TK_CACHE 89
#define TK_REPLICA 90
#define TK_QUORUM 91
#define TK_DAYS 92
#define TK_MINROWS 93
#define TK_MAXROWS 94
#define TK_BLOCKS 95
#define TK_CTIME 96
#define TK_WAL 97
#define TK_FSYNC 98
#define TK_COMP 99
#define TK_ID 1
#define TK_BOOL 2
#define TK_TINYINT 3
#define TK_SMALLINT 4
#define TK_INTEGER 5
#define TK_BIGINT 6
#define TK_FLOAT 7
#define TK_DOUBLE 8
#define TK_STRING 9
#define TK_TIMESTAMP 10
#define TK_BINARY 11
#define TK_NCHAR 12
#define TK_OR 13
#define TK_AND 14
#define TK_NOT 15
#define TK_EQ 16
#define TK_NE 17
#define TK_ISNULL 18
#define TK_NOTNULL 19
#define TK_IS 20
#define TK_LIKE 21
#define TK_GLOB 22
#define TK_BETWEEN 23
#define TK_IN 24
#define TK_GT 25
#define TK_GE 26
#define TK_LT 27
#define TK_LE 28
#define TK_BITAND 29
#define TK_BITOR 30
#define TK_LSHIFT 31
#define TK_RSHIFT 32
#define TK_PLUS 33
#define TK_MINUS 34
#define TK_DIVIDE 35
#define TK_TIMES 36
#define TK_STAR 37
#define TK_SLASH 38
#define TK_REM 39
#define TK_CONCAT 40
#define TK_UMINUS 41
#define TK_UPLUS 42
#define TK_BITNOT 43
#define TK_SHOW 44
#define TK_DATABASES 45
#define TK_TOPICS 46
#define TK_MNODES 47
#define TK_DNODES 48
#define TK_ACCOUNTS 49
#define TK_USERS 50
#define TK_MODULES 51
#define TK_QUERIES 52
#define TK_CONNECTIONS 53
#define TK_STREAMS 54
#define TK_VARIABLES 55
#define TK_SCORES 56
#define TK_GRANTS 57
#define TK_VNODES 58
#define TK_IPTOKEN 59
#define TK_DOT 60
#define TK_CREATE 61
#define TK_TABLE 62
#define TK_STABLE 63
#define TK_DATABASE 64
#define TK_TABLES 65
#define TK_STABLES 66
#define TK_VGROUPS 67
#define TK_DROP 68
#define TK_TOPIC 69
#define TK_DNODE 70
#define TK_USER 71
#define TK_ACCOUNT 72
#define TK_USE 73
#define TK_DESCRIBE 74
#define TK_ALTER 75
#define TK_PASS 76
#define TK_PRIVILEGE 77
#define TK_LOCAL 78
#define TK_IF 79
#define TK_EXISTS 80
#define TK_PPS 81
#define TK_TSERIES 82
#define TK_DBS 83
#define TK_STORAGE 84
#define TK_QTIME 85
#define TK_CONNS 86
#define TK_STATE 87
#define TK_KEEP 88
#define TK_CACHE 89
#define TK_REPLICA 90
#define TK_QUORUM 91
#define TK_DAYS 92
#define TK_MINROWS 93
#define TK_MAXROWS 94
#define TK_BLOCKS 95
#define TK_CTIME 96
#define TK_WAL 97
#define TK_FSYNC 98
#define TK_COMP 99
#define TK_PRECISION 100
#define TK_UPDATE 101
#define TK_CACHELAST 102
......@@ -136,74 +136,74 @@
#define TK_VARIABLE 117
#define TK_INTERVAL 118
#define TK_SESSION 119
#define TK_FILL 120
#define TK_SLIDING 121
#define TK_ORDER 122
#define TK_BY 123
#define TK_ASC 124
#define TK_DESC 125
#define TK_GROUP 126
#define TK_HAVING 127
#define TK_LIMIT 128
#define TK_OFFSET 129
#define TK_SLIMIT 130
#define TK_SOFFSET 131
#define TK_WHERE 132
#define TK_NOW 133
#define TK_RESET 134
#define TK_QUERY 135
#define TK_SYNCDB 136
#define TK_ADD 137
#define TK_COLUMN 138
#define TK_TAG 139
#define TK_CHANGE 140
#define TK_SET 141
#define TK_KILL 142
#define TK_CONNECTION 143
#define TK_STREAM 144
#define TK_COLON 145
#define TK_ABORT 146
#define TK_AFTER 147
#define TK_ATTACH 148
#define TK_BEFORE 149
#define TK_BEGIN 150
#define TK_CASCADE 151
#define TK_CLUSTER 152
#define TK_CONFLICT 153
#define TK_COPY 154
#define TK_DEFERRED 155
#define TK_DELIMITERS 156
#define TK_DETACH 157
#define TK_EACH 158
#define TK_END 159
#define TK_EXPLAIN 160
#define TK_FAIL 161
#define TK_FOR 162
#define TK_IGNORE 163
#define TK_IMMEDIATE 164
#define TK_INITIALLY 165
#define TK_INSTEAD 166
#define TK_MATCH 167
#define TK_KEY 168
#define TK_OF 169
#define TK_RAISE 170
#define TK_REPLACE 171
#define TK_RESTRICT 172
#define TK_ROW 173
#define TK_STATEMENT 174
#define TK_TRIGGER 175
#define TK_VIEW 176
#define TK_SEMI 177
#define TK_NONE 178
#define TK_PREV 179
#define TK_LINEAR 180
#define TK_IMPORT 181
#define TK_TBNAME 182
#define TK_JOIN 183
#define TK_INSERT 184
#define TK_INTO 185
#define TK_VALUES 186
#define TK_WINDOW_STATE 120
#define TK_FILL 121
#define TK_SLIDING 122
#define TK_ORDER 123
#define TK_BY 124
#define TK_ASC 125
#define TK_DESC 126
#define TK_GROUP 127
#define TK_HAVING 128
#define TK_LIMIT 129
#define TK_OFFSET 130
#define TK_SLIMIT 131
#define TK_SOFFSET 132
#define TK_WHERE 133
#define TK_NOW 134
#define TK_RESET 135
#define TK_QUERY 136
#define TK_SYNCDB 137
#define TK_ADD 138
#define TK_COLUMN 139
#define TK_TAG 140
#define TK_CHANGE 141
#define TK_SET 142
#define TK_KILL 143
#define TK_CONNECTION 144
#define TK_STREAM 145
#define TK_COLON 146
#define TK_ABORT 147
#define TK_AFTER 148
#define TK_ATTACH 149
#define TK_BEFORE 150
#define TK_BEGIN 151
#define TK_CASCADE 152
#define TK_CLUSTER 153
#define TK_CONFLICT 154
#define TK_COPY 155
#define TK_DEFERRED 156
#define TK_DELIMITERS 157
#define TK_DETACH 158
#define TK_EACH 159
#define TK_END 160
#define TK_EXPLAIN 161
#define TK_FAIL 162
#define TK_FOR 163
#define TK_IGNORE 164
#define TK_IMMEDIATE 165
#define TK_INITIALLY 166
#define TK_INSTEAD 167
#define TK_MATCH 168
#define TK_KEY 169
#define TK_OF 170
#define TK_RAISE 171
#define TK_REPLACE 172
#define TK_RESTRICT 173
#define TK_ROW 174
#define TK_STATEMENT 175
#define TK_TRIGGER 176
#define TK_VIEW 177
#define TK_SEMI 178
#define TK_NONE 179
#define TK_PREV 180
#define TK_LINEAR 181
#define TK_IMPORT 182
#define TK_TBNAME 183
#define TK_JOIN 184
#define TK_INSERT 185
#define TK_INTO 186
#define TK_VALUES 187
#define TK_SPACE 300
#define TK_COMMENT 301
......
......@@ -196,6 +196,7 @@ typedef struct SQueryAttr {
bool pointInterpQuery; // point interpolation query
bool needReverseScan; // need reverse scan
bool distinctTag; // distinct tag query
bool windowState; // window State on sub/normal table
int32_t interBufSize; // intermediate buffer sizse
int32_t havingNum; // having expr number
......@@ -302,6 +303,7 @@ enum OPERATOR_TYPE_E {
OP_GlobalAggregate = 18, // global merge for the multi-way data sources.
OP_Filter = 19,
OP_Distinct = 20,
OP_StateWindow = 21,
};
typedef struct SOperatorInfo {
......@@ -465,6 +467,16 @@ typedef struct SSWindowOperatorInfo {
int32_t start; // start row index
} SSWindowOperatorInfo;
typedef struct SStateWindowOperatorInfo {
SOptrBasicInfo binfo;
STimeWindow curWindow; // current time window
int32_t numOfRows; // number of rows
int32_t colIndex; // start row index
int32_t start;
char* prevData; // previous data
} SStateWindowOperatorInfo ;
typedef struct SDistinctOperatorInfo {
SHashObj *pSet;
SSDataBlock *pRes;
......@@ -512,6 +524,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t numOfRows, void* merger, bool groupMix);
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param);
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger);
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
......
......@@ -89,6 +89,10 @@ typedef struct SSessionWindowVal {
SStrToken gap;
} SSessionWindowVal;
typedef struct SWindowStateVal {
SStrToken col;
} SWindowStateVal;
struct SRelationInfo;
typedef struct SSqlNode {
......@@ -100,6 +104,7 @@ typedef struct SSqlNode {
SArray *fillType; // fill type[optional], SArray<tVariantListItem>
SIntervalVal interval; // (interval, interval_offset) [optional]
SSessionWindowVal sessionVal; // session window [optional]
SWindowStateVal windowstateVal; // window_state(col) [optional]
SStrToken sliding; // sliding window [optional]
SLimitVal limit; // limit offset [optional]
SLimitVal slimit; // group limit offset [optional]
......@@ -271,7 +276,7 @@ SArray *tSqlExprListAppend(SArray *pList, tSqlExpr *pNode, SStrToken *pDistinc
void tSqlExprListDestroy(SArray *pList);
SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps, SWindowStateVal *pw,
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pgLimit, tSqlExpr *pHaving);
int32_t tSqlExprCompare(tSqlExpr *left, tSqlExpr *right);
......
......@@ -456,8 +456,8 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). {
//////////////////////// The SELECT statement /////////////////////////////////
%type select {SSqlNode*}
%destructor select {destroySqlNode($$);}
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). {
A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &S, F, &L, &G, N);
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) windowstate_option(D) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). {
A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &D, &S, F, &L, &G, N);
}
select(A) ::= LP select(B) RP. {A = B;}
......@@ -475,7 +475,7 @@ cmd ::= union(X). { setSqlInfo(pInfo, X, NULL, TSDB_SQL_SELECT); }
// select client_version()
// select server_state()
select(A) ::= SELECT(T) selcollist(W). {
A = tSetQuerySqlNode(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
A = tSetQuerySqlNode(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
}
// selcollist is a list of expressions that are to become the return
......@@ -552,6 +552,11 @@ session_option(X) ::= SESSION LP ids(V) cpxName(Z) COMMA tmvar(Y) RP. {
X.col = V;
X.gap = Y;
}
%type windowstate_option {SWindowStateVal}
windowstate_option(X) ::= . {X.col.n = 0;}
windowstate_option(X) ::= WINDOW_STATE LP ids(V) RP. {
X.col = V;
}
%type fill_opt {SArray*}
%destructor fill_opt {taosArrayDestroy($$);}
......
......@@ -1758,6 +1758,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
}
break;
}
case OP_StateWindow: {
pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream->info, pRuntimeEnv->proot);
break;
}
case OP_Limit: {
pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
......@@ -4413,6 +4418,12 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
} else if (pDownstream->operatorType == OP_SessionWindow) {
SSWindowOperatorInfo* pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
} else if (pDownstream->operatorType == OP_StateWindow) {
SStateWindowOperatorInfo* pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
......@@ -5050,6 +5061,121 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
return pIntervalInfo->pRes;
}
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
SOptrBasicInfo* pBInfo = &pInfo->binfo;
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SColumnInfoData* pTsColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0);
TSKEY* tsList = (TSKEY*)pTsColInfoData->pData;
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_TIMESTAMP) {
qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_QID(pRuntimeEnv));
return;
}
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
char* val = ((char*)pColInfoData->pData) + bytes * j;
if (isNull(val, type)) {
continue;
}
if (pInfo->prevData == NULL) {
pInfo->prevData = malloc(bytes);
memcpy(pInfo->prevData, val, bytes);
pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j];
pInfo->numOfRows = 1;
pInfo->start = j;
} else if (0 == memcmp(pInfo->prevData, val, bytes)) {
pInfo->curWindow.ekey = tsList[j];
pInfo->numOfRows += 1;
pInfo->start = j;
} else {
SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
pSDataBlock->info.rows, pOperator->numOfOutput);
pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j];
memcpy(pInfo->prevData, val, bytes);
pInfo->numOfRows = 1;
pInfo->start = j;
}
}
SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
pSDataBlock->info.rows, pOperator->numOfOutput);
}
static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SStateWindowOperatorInfo* pWindowInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
SOptrBasicInfo* pBInfo = &pWindowInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pBInfo->pRes;
}
int32_t order = pQueryAttr->order.order;
STimeWindow win = pQueryAttr->window;
SOperatorInfo* upstream = pOperator->upstream;
while (1) {
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
if (pBlock == NULL) {
break;
}
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, pQueryAttr->order.order);
if (pWindowInfo->colIndex == -1) {
pWindowInfo->colIndex = pOperator->pExpr->base.colInfo.colIndex;
}
doStateWindowAggImpl(pOperator, pWindowInfo, pBlock);
}
// restore the value
pQueryAttr->order.order = order;
pQueryAttr->window = win;
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes;
}
static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
......@@ -5500,7 +5626,28 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
return pOperator;
}
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo));
pInfo->colIndex = -1;
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "StateWindowOperator";
pOperator->operatorType = OP_StateWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doStateWindowAgg;
pOperator->cleanup = destroyBasicOperatorInfo;
return pOperator;
}
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo));
......
......@@ -121,6 +121,10 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
if (pQueryAttr->stableQuery && !pQueryAttr->tsCompQuery) {
op = OP_MultiTableAggregate;
} else {
if (pQueryAttr->windowState) {
op = OP_StateWindow;
taosArrayPush(plan, &op);
}
op = OP_Aggregate;
}
......
......@@ -725,7 +725,7 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) {
*/
SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval,
SSessionWindowVal *pSession, SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit,
SSessionWindowVal *pSession, SWindowStateVal *pWindowStateVal, SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit,
SLimitVal *psLimit, tSqlExpr *pHaving) {
assert(pSelNodeList != NULL);
......@@ -777,6 +777,12 @@ SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelat
TPARSER_SET_NONE_TOKEN(pSqlNode->sessionVal.col);
}
if (pWindowStateVal != NULL) {
pSqlNode->windowstateVal = *pWindowStateVal;
} else {
TPARSER_SET_NONE_TOKEN(pSqlNode->windowstateVal.col);
}
return pSqlNode;
}
......
......@@ -141,6 +141,7 @@ static SKeyword keywordTable[] = {
{"VARIABLE", TK_VARIABLE},
{"INTERVAL", TK_INTERVAL},
{"SESSION", TK_SESSION},
{"WINDOW_STATE", TK_WINDOW_STATE},
{"FILL", TK_FILL},
{"SLIDING", TK_SLIDING},
{"ORDER", TK_ORDER},
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册