未验证 提交 b289b28a 编写于 作者: H haojun Liao 提交者: GitHub

Merge pull request #6305 from taosdata/feature/TD-2570

Feature/td 2570
...@@ -123,6 +123,8 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i ...@@ -123,6 +123,8 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
*/ */
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo); bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo); bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo); bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo);
bool tscGroupbyColumn(SQueryInfo* pQueryInfo); bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
bool tscIsTopBotQuery(SQueryInfo* pQueryInfo); bool tscIsTopBotQuery(SQueryInfo* pQueryInfo);
......
...@@ -90,6 +90,7 @@ static int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCm ...@@ -90,6 +90,7 @@ static int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCm
static int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode); static int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode);
static int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* offsetToken); static int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* offsetToken);
static int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSliding); static int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSliding);
static int32_t validateStateWindowNode(SSqlCmd* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable);
static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem); static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem);
...@@ -851,6 +852,59 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pS ...@@ -851,6 +852,59 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pS
// The following part is used to check for the invalid query expression. // The following part is used to check for the invalid query expression.
return checkInvalidExprForTimeWindow(pCmd, pQueryInfo); return checkInvalidExprForTimeWindow(pCmd, pQueryInfo);
} }
static int32_t validateStateWindowNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable) {
const char* msg1 = "invalid column name";
const char* msg3 = "not support state_window with group by ";
const char* msg4 = "function not support for super table query";
SStrToken *col = &(pSqlNode->windowstateVal.col) ;
if (col->z == NULL || col->n <= 0) {
return TSDB_CODE_SUCCESS;
}
if (pQueryInfo->colList == NULL) {
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
}
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
pQueryInfo->groupbyExpr.numOfGroupCols = 1;
//TODO(dengyihao): check tag column
if (isStable) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(pCmd, col, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
int32_t numOfCols = tscGetNumOfColumns(pTableMeta);
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX || index.columnIndex >= numOfCols) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
SGroupbyExpr* pGroupExpr = &pQueryInfo->groupbyExpr;
if (pGroupExpr->columnInfo == NULL) {
pGroupExpr->columnInfo = taosArrayInit(4, sizeof(SColIndex));
}
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, index.columnIndex);
if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP || pSchema->type == TSDB_DATA_TYPE_FLOAT || pSchema->type == TSDB_DATA_TYPE_DOUBLE) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
tscColumnListInsert(pQueryInfo->colList, index.columnIndex, pTableMeta->id.uid, pSchema);
SColIndex colIndex = { .colIndex = index.columnIndex, .flag = TSDB_COL_NORMAL, .colId = pSchema->colId };
taosArrayPush(pGroupExpr->columnInfo, &colIndex);
pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC;
pQueryInfo->stateWindow = true;
return TSDB_CODE_SUCCESS;
}
int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pSqlNode) { int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pSqlNode) {
const char* msg1 = "gap should be fixed time window"; const char* msg1 = "gap should be fixed time window";
...@@ -885,11 +939,17 @@ int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pS ...@@ -885,11 +939,17 @@ int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pS
if (pQueryInfo->sessionWindow.gap != 0 && pQueryInfo->interval.interval != 0) { if (pQueryInfo->sessionWindow.gap != 0 && pQueryInfo->interval.interval != 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
if (pQueryInfo->sessionWindow.gap == 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
SColumnIndex index = COLUMN_INDEX_INITIALIZER; SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(pCmd, col, pQueryInfo, &index) != TSDB_CODE_SUCCESS) { if (getColumnIndexByName(pCmd, col, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
} }
if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
pQueryInfo->sessionWindow.primaryColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; pQueryInfo->sessionWindow.primaryColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
...@@ -2896,6 +2956,9 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) ...@@ -2896,6 +2956,9 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo)
return true; return true;
} }
} }
} else if (tscIsSessionWindowQuery(pQueryInfo)) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
return true;
} }
return false; return false;
...@@ -6156,7 +6219,7 @@ static int32_t doTagFunctionCheck(SQueryInfo* pQueryInfo) { ...@@ -6156,7 +6219,7 @@ static int32_t doTagFunctionCheck(SQueryInfo* pQueryInfo) {
int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
const char* msg1 = "functions/columns not allowed in group by query"; const char* msg1 = "functions/columns not allowed in group by query";
const char* msg2 = "projection query on columns not allowed"; const char* msg2 = "projection query on columns not allowed";
const char* msg3 = "group by not allowed on projection query"; const char* msg3 = "group by/session/state_window not allowed on projection query";
const char* msg4 = "retrieve tags not compatible with group by or interval query"; const char* msg4 = "retrieve tags not compatible with group by or interval query";
const char* msg5 = "functions can not be mixed up"; const char* msg5 = "functions can not be mixed up";
...@@ -6172,6 +6235,9 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { ...@@ -6172,6 +6235,9 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
if (tscIsProjectionQuery(pQueryInfo) && tscIsSessionWindowQuery(pQueryInfo)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) { if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
// check if all the tags prj columns belongs to the group by columns // check if all the tags prj columns belongs to the group by columns
...@@ -6741,6 +6807,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) { ...@@ -6741,6 +6807,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
if (isTimeWindowQuery(pQueryInfo) && (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) { if (isTimeWindowQuery(pQueryInfo) && (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
...@@ -7529,7 +7596,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -7529,7 +7596,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
TSDB_CODE_SUCCESS) { TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
// parse the window_state
if (validateStateWindowNode(pCmd, pQueryInfo, pSqlNode, isSTable) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
// set order by info // set order by info
if (validateOrderbyNode(pCmd, pQueryInfo, pSqlNode, tscGetTableSchema(pTableMetaInfo->pTableMeta)) != if (validateOrderbyNode(pCmd, pQueryInfo, pSqlNode, tscGetTableSchema(pTableMetaInfo->pTableMeta)) !=
TSDB_CODE_SUCCESS) { TSDB_CODE_SUCCESS) {
...@@ -7570,6 +7640,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -7570,6 +7640,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
* transfer sql functions that need secondary merge into another format * transfer sql functions that need secondary merge into another format
* in dealing with super table queries such as: count/first/last * in dealing with super table queries such as: count/first/last
*/ */
if (validateSessionNode(pCmd, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (isSTable) { if (isSTable) {
tscTansformFuncForSTableQuery(pQueryInfo); tscTansformFuncForSTableQuery(pQueryInfo);
if (hasUnsupportFunctionsForSTableQuery(pCmd, pQueryInfo)) { if (hasUnsupportFunctionsForSTableQuery(pCmd, pQueryInfo)) {
...@@ -7577,10 +7651,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -7577,10 +7651,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
} }
} }
if (validateSessionNode(pCmd, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
// no result due to invalid query time range // no result due to invalid query time range
if (pQueryInfo->window.skey > pQueryInfo->window.ekey) { if (pQueryInfo->window.skey > pQueryInfo->window.ekey) {
pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
......
...@@ -857,6 +857,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -857,6 +857,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->simpleAgg = query.simpleAgg; pQueryMsg->simpleAgg = query.simpleAgg;
pQueryMsg->pointInterpQuery = query.pointInterpQuery; pQueryMsg->pointInterpQuery = query.pointInterpQuery;
pQueryMsg->needReverseScan = query.needReverseScan; pQueryMsg->needReverseScan = query.needReverseScan;
pQueryMsg->stateWindow = query.stateWindow;
pQueryMsg->numOfTags = htonl(numOfTags); pQueryMsg->numOfTags = htonl(numOfTags);
pQueryMsg->sqlstrLen = htonl(sqlLen); pQueryMsg->sqlstrLen = htonl(sqlLen);
......
...@@ -434,6 +434,9 @@ bool tscIsTWAQuery(SQueryInfo* pQueryInfo) { ...@@ -434,6 +434,9 @@ bool tscIsTWAQuery(SQueryInfo* pQueryInfo) {
return false; return false;
} }
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo) {
return pQueryInfo->sessionWindow.gap > 0;
}
bool tscNeedReverseScan(SQueryInfo* pQueryInfo) { bool tscNeedReverseScan(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscNumOfExprs(pQueryInfo); size_t numOfExprs = tscNumOfExprs(pQueryInfo);
...@@ -4202,11 +4205,13 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt ...@@ -4202,11 +4205,13 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->simpleAgg = isSimpleAggregate(pQueryInfo); pQueryAttr->simpleAgg = isSimpleAggregate(pQueryInfo);
pQueryAttr->needReverseScan = tscNeedReverseScan(pQueryInfo); pQueryAttr->needReverseScan = tscNeedReverseScan(pQueryInfo);
pQueryAttr->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type); pQueryAttr->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type);
pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo); pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && tscGroupbyColumn(pQueryInfo);
pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo); pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo);
pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo); pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo); pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
pQueryAttr->distinctTag = pQueryInfo->distinctTag; pQueryAttr->distinctTag = pQueryInfo->distinctTag;
pQueryAttr->sw = pQueryInfo->sessionWindow;
pQueryAttr->stateWindow = pQueryInfo->stateWindow;
pQueryAttr->numOfCols = numOfCols; pQueryAttr->numOfCols = numOfCols;
pQueryAttr->numOfOutput = numOfOutput; pQueryAttr->numOfOutput = numOfOutput;
...@@ -4214,9 +4219,9 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt ...@@ -4214,9 +4219,9 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->slimit = pQueryInfo->slimit; pQueryAttr->slimit = pQueryInfo->slimit;
pQueryAttr->order = pQueryInfo->order; pQueryAttr->order = pQueryInfo->order;
pQueryAttr->fillType = pQueryInfo->fillType; pQueryAttr->fillType = pQueryInfo->fillType;
pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQueryAttr->havingNum = pQueryInfo->havingFieldNum; pQueryAttr->havingNum = pQueryInfo->havingFieldNum;
if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor
pQueryAttr->window = pQueryInfo->window; pQueryAttr->window = pQueryInfo->window;
} else { } else {
......
Subproject commit 050667e5b4d0eafa5387e4283e713559b421203f Subproject commit 8ce6d86558afc8c0b50c10f990fd2b4270cf06fc
Subproject commit 32e2c97a4cf7bedaa99f5d6dd8cb036e7f4470df Subproject commit 3530c6df097134a410bacec6b3cd013ef38a61aa
...@@ -473,6 +473,7 @@ typedef struct { ...@@ -473,6 +473,7 @@ typedef struct {
bool simpleAgg; bool simpleAgg;
bool pointInterpQuery; // point interpolation query bool pointInterpQuery; // point interpolation query
bool needReverseScan; // need reverse scan bool needReverseScan; // need reverse scan
bool stateWindow; // state window flag
STimeWindow window; STimeWindow window;
int32_t numOfTables; int32_t numOfTables;
......
...@@ -136,74 +136,74 @@ ...@@ -136,74 +136,74 @@
#define TK_VARIABLE 117 #define TK_VARIABLE 117
#define TK_INTERVAL 118 #define TK_INTERVAL 118
#define TK_SESSION 119 #define TK_SESSION 119
#define TK_FILL 120 #define TK_STATE_WINDOW 120
#define TK_SLIDING 121 #define TK_FILL 121
#define TK_ORDER 122 #define TK_SLIDING 122
#define TK_BY 123 #define TK_ORDER 123
#define TK_ASC 124 #define TK_BY 124
#define TK_DESC 125 #define TK_ASC 125
#define TK_GROUP 126 #define TK_DESC 126
#define TK_HAVING 127 #define TK_GROUP 127
#define TK_LIMIT 128 #define TK_HAVING 128
#define TK_OFFSET 129 #define TK_LIMIT 129
#define TK_SLIMIT 130 #define TK_OFFSET 130
#define TK_SOFFSET 131 #define TK_SLIMIT 131
#define TK_WHERE 132 #define TK_SOFFSET 132
#define TK_NOW 133 #define TK_WHERE 133
#define TK_RESET 134 #define TK_NOW 134
#define TK_QUERY 135 #define TK_RESET 135
#define TK_SYNCDB 136 #define TK_QUERY 136
#define TK_ADD 137 #define TK_SYNCDB 137
#define TK_COLUMN 138 #define TK_ADD 138
#define TK_TAG 139 #define TK_COLUMN 139
#define TK_CHANGE 140 #define TK_TAG 140
#define TK_SET 141 #define TK_CHANGE 141
#define TK_KILL 142 #define TK_SET 142
#define TK_CONNECTION 143 #define TK_KILL 143
#define TK_STREAM 144 #define TK_CONNECTION 144
#define TK_COLON 145 #define TK_STREAM 145
#define TK_ABORT 146 #define TK_COLON 146
#define TK_AFTER 147 #define TK_ABORT 147
#define TK_ATTACH 148 #define TK_AFTER 148
#define TK_BEFORE 149 #define TK_ATTACH 149
#define TK_BEGIN 150 #define TK_BEFORE 150
#define TK_CASCADE 151 #define TK_BEGIN 151
#define TK_CLUSTER 152 #define TK_CASCADE 152
#define TK_CONFLICT 153 #define TK_CLUSTER 153
#define TK_COPY 154 #define TK_CONFLICT 154
#define TK_DEFERRED 155 #define TK_COPY 155
#define TK_DELIMITERS 156 #define TK_DEFERRED 156
#define TK_DETACH 157 #define TK_DELIMITERS 157
#define TK_EACH 158 #define TK_DETACH 158
#define TK_END 159 #define TK_EACH 159
#define TK_EXPLAIN 160 #define TK_END 160
#define TK_FAIL 161 #define TK_EXPLAIN 161
#define TK_FOR 162 #define TK_FAIL 162
#define TK_IGNORE 163 #define TK_FOR 163
#define TK_IMMEDIATE 164 #define TK_IGNORE 164
#define TK_INITIALLY 165 #define TK_IMMEDIATE 165
#define TK_INSTEAD 166 #define TK_INITIALLY 166
#define TK_MATCH 167 #define TK_INSTEAD 167
#define TK_KEY 168 #define TK_MATCH 168
#define TK_OF 169 #define TK_KEY 169
#define TK_RAISE 170 #define TK_OF 170
#define TK_REPLACE 171 #define TK_RAISE 171
#define TK_RESTRICT 172 #define TK_REPLACE 172
#define TK_ROW 173 #define TK_RESTRICT 173
#define TK_STATEMENT 174 #define TK_ROW 174
#define TK_TRIGGER 175 #define TK_STATEMENT 175
#define TK_VIEW 176 #define TK_TRIGGER 176
#define TK_SEMI 177 #define TK_VIEW 177
#define TK_NONE 178 #define TK_SEMI 178
#define TK_PREV 179 #define TK_NONE 179
#define TK_LINEAR 180 #define TK_PREV 180
#define TK_IMPORT 181 #define TK_LINEAR 181
#define TK_TBNAME 182 #define TK_IMPORT 182
#define TK_JOIN 183 #define TK_TBNAME 183
#define TK_INSERT 184 #define TK_JOIN 184
#define TK_INTO 185 #define TK_INSERT 185
#define TK_VALUES 186 #define TK_INTO 186
#define TK_VALUES 187
#define TK_SPACE 300 #define TK_SPACE 300
#define TK_COMMENT 301 #define TK_COMMENT 301
......
...@@ -189,6 +189,7 @@ typedef struct SQueryAttr { ...@@ -189,6 +189,7 @@ typedef struct SQueryAttr {
bool pointInterpQuery; // point interpolation query bool pointInterpQuery; // point interpolation query
bool needReverseScan; // need reverse scan bool needReverseScan; // need reverse scan
bool distinctTag; // distinct tag query bool distinctTag; // distinct tag query
bool stateWindow; // window State on sub/normal table
int32_t interBufSize; // intermediate buffer sizse int32_t interBufSize; // intermediate buffer sizse
int32_t havingNum; // having expr number int32_t havingNum; // having expr number
...@@ -296,6 +297,7 @@ enum OPERATOR_TYPE_E { ...@@ -296,6 +297,7 @@ enum OPERATOR_TYPE_E {
OP_Filter = 19, OP_Filter = 19,
OP_Distinct = 20, OP_Distinct = 20,
OP_Join = 21, OP_Join = 21,
OP_StateWindow = 22,
}; };
typedef struct SOperatorInfo { typedef struct SOperatorInfo {
...@@ -460,6 +462,16 @@ typedef struct SSWindowOperatorInfo { ...@@ -460,6 +462,16 @@ typedef struct SSWindowOperatorInfo {
int32_t start; // start row index int32_t start; // start row index
} SSWindowOperatorInfo; } SSWindowOperatorInfo;
typedef struct SStateWindowOperatorInfo {
SOptrBasicInfo binfo;
STimeWindow curWindow; // current time window
int32_t numOfRows; // number of rows
int32_t colIndex; // start row index
int32_t start;
char* prevData; // previous data
} SStateWindowOperatorInfo ;
typedef struct SDistinctOperatorInfo { typedef struct SDistinctOperatorInfo {
SHashObj *pSet; SHashObj *pSet;
SSDataBlock *pRes; SSDataBlock *pRes;
...@@ -509,6 +521,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat ...@@ -509,6 +521,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t numOfRows, void* merger, bool groupMix); int32_t numOfRows, void* merger, bool groupMix);
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param); SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param);
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger); SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger);
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
......
...@@ -89,6 +89,10 @@ typedef struct SSessionWindowVal { ...@@ -89,6 +89,10 @@ typedef struct SSessionWindowVal {
SStrToken gap; SStrToken gap;
} SSessionWindowVal; } SSessionWindowVal;
typedef struct SWindowStateVal {
SStrToken col;
} SWindowStateVal;
struct SRelationInfo; struct SRelationInfo;
typedef struct SSqlNode { typedef struct SSqlNode {
...@@ -100,6 +104,7 @@ typedef struct SSqlNode { ...@@ -100,6 +104,7 @@ typedef struct SSqlNode {
SArray *fillType; // fill type[optional], SArray<tVariantListItem> SArray *fillType; // fill type[optional], SArray<tVariantListItem>
SIntervalVal interval; // (interval, interval_offset) [optional] SIntervalVal interval; // (interval, interval_offset) [optional]
SSessionWindowVal sessionVal; // session window [optional] SSessionWindowVal sessionVal; // session window [optional]
SWindowStateVal windowstateVal; // window_state(col) [optional]
SStrToken sliding; // sliding window [optional] SStrToken sliding; // sliding window [optional]
SLimitVal limit; // limit offset [optional] SLimitVal limit; // limit offset [optional]
SLimitVal slimit; // group limit offset [optional] SLimitVal slimit; // group limit offset [optional]
...@@ -275,7 +280,7 @@ SArray *tSqlExprListAppend(SArray *pList, tSqlExpr *pNode, SStrToken *pDistinc ...@@ -275,7 +280,7 @@ SArray *tSqlExprListAppend(SArray *pList, tSqlExpr *pNode, SStrToken *pDistinc
void tSqlExprListDestroy(SArray *pList); void tSqlExprListDestroy(SArray *pList);
SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere, SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps, SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps, SWindowStateVal *pw,
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pgLimit, tSqlExpr *pHaving); SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pgLimit, tSqlExpr *pHaving);
int32_t tSqlExprCompare(tSqlExpr *left, tSqlExpr *right); int32_t tSqlExprCompare(tSqlExpr *left, tSqlExpr *right);
......
...@@ -138,6 +138,7 @@ typedef struct SQueryInfo { ...@@ -138,6 +138,7 @@ typedef struct SQueryInfo {
bool hasFilter; bool hasFilter;
bool onlyTagQuery; bool onlyTagQuery;
bool orderProjectQuery; bool orderProjectQuery;
bool stateWindow;
} SQueryInfo; } SQueryInfo;
/** /**
......
...@@ -456,8 +456,8 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). { ...@@ -456,8 +456,8 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). {
//////////////////////// The SELECT statement ///////////////////////////////// //////////////////////// The SELECT statement /////////////////////////////////
%type select {SSqlNode*} %type select {SSqlNode*}
%destructor select {destroySqlNode($$);} %destructor select {destroySqlNode($$);}
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). { select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) windowstate_option(D) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). {
A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &S, F, &L, &G, N); A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &D, &S, F, &L, &G, N);
} }
select(A) ::= LP select(B) RP. {A = B;} select(A) ::= LP select(B) RP. {A = B;}
...@@ -475,7 +475,7 @@ cmd ::= union(X). { setSqlInfo(pInfo, X, NULL, TSDB_SQL_SELECT); } ...@@ -475,7 +475,7 @@ cmd ::= union(X). { setSqlInfo(pInfo, X, NULL, TSDB_SQL_SELECT); }
// select client_version() // select client_version()
// select server_state() // select server_state()
select(A) ::= SELECT(T) selcollist(W). { select(A) ::= SELECT(T) selcollist(W). {
A = tSetQuerySqlNode(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); A = tSetQuerySqlNode(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
} }
// selcollist is a list of expressions that are to become the return // selcollist is a list of expressions that are to become the return
...@@ -558,6 +558,11 @@ session_option(X) ::= SESSION LP ids(V) cpxName(Z) COMMA tmvar(Y) RP. { ...@@ -558,6 +558,11 @@ session_option(X) ::= SESSION LP ids(V) cpxName(Z) COMMA tmvar(Y) RP. {
X.col = V; X.col = V;
X.gap = Y; X.gap = Y;
} }
%type windowstate_option {SWindowStateVal}
windowstate_option(X) ::= . {X.col.n = 0;}
windowstate_option(X) ::= STATE_WINDOW LP ids(V) RP. {
X.col = V;
}
%type fill_opt {SArray*} %type fill_opt {SArray*}
%destructor fill_opt {taosArrayDestroy($$);} %destructor fill_opt {taosArrayDestroy($$);}
......
...@@ -3299,8 +3299,12 @@ static void col_project_function(SQLFunctionCtx *pCtx) { ...@@ -3299,8 +3299,12 @@ static void col_project_function(SQLFunctionCtx *pCtx) {
if (pCtx->numOfParams == 2) { if (pCtx->numOfParams == 2) {
return; return;
} }
if (pCtx->param[0].i64 == 1) {
SET_VAL(pCtx, pCtx->size, 1);
} else {
INC_INIT_VAL(pCtx, pCtx->size); INC_INIT_VAL(pCtx, pCtx->size);
}
char *pData = GET_INPUT_DATA_LIST(pCtx); char *pData = GET_INPUT_DATA_LIST(pCtx);
if (pCtx->order == TSDB_ORDER_ASC) { if (pCtx->order == TSDB_ORDER_ASC) {
......
...@@ -189,12 +189,16 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput); ...@@ -189,12 +189,16 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput); static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput);
static void destroyArithOperatorInfo(void* param, int32_t numOfOutput); static void destroyArithOperatorInfo(void* param, int32_t numOfOutput);
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput); static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput);
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
static void destroyOperatorInfo(SOperatorInfo* pOperator); static void destroyOperatorInfo(SOperatorInfo* pOperator);
static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock); static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock);
static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock);
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOperatorInfo *pInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binf, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size); static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size);
static void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); static void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
...@@ -731,7 +735,6 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx ...@@ -731,7 +735,6 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) { if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) {
pCtx[k].preAggVals.isSet = false; pCtx[k].preAggVals.isSet = false;
} }
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]); aAggs[functionId].xFunction(&pCtx[k]);
} }
...@@ -1299,7 +1302,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn ...@@ -1299,7 +1302,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
} }
int32_t ret = int32_t ret =
setGroupResultOutputBuf(pRuntimeEnv, pInfo, pOperator->numOfOutput, val, type, bytes, item->groupIndex); setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, val, type, bytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -1338,12 +1341,16 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf ...@@ -1338,12 +1341,16 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
pInfo->start = j; pInfo->start = j;
} else if (tsList[j] - pInfo->prevTs <= gap) { } else if (tsList[j] - pInfo->prevTs <= gap) {
pInfo->curWindow.ekey = tsList[j]; pInfo->curWindow.ekey = tsList[j];
pInfo->prevTs = tsList[j]; //pInfo->prevTs = tsList[j];
pInfo->numOfRows += 1; pInfo->numOfRows += 1;
pInfo->start = j; if (j == 0 && pInfo->start != 0) {
pInfo->numOfRows = 1;
pInfo->start = 0;
}
} else { // start a new session window } else { // start a new session window
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan, int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset); pBInfo->rowCellInfoOffset);
...@@ -1364,6 +1371,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf ...@@ -1364,6 +1371,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan, int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset); pBInfo->rowCellInfoOffset);
...@@ -1391,12 +1399,12 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { ...@@ -1391,12 +1399,12 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
} }
} }
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOperatorInfo *pInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) { static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) {
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
int32_t *rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset; int32_t *rowCellInfoOffset = binfo->rowCellInfoOffset;
SResultRowInfo *pResultRowInfo = &pInfo->binfo.resultRowInfo; SResultRowInfo *pResultRowInfo = &binfo->resultRowInfo;
SQLFunctionCtx *pCtx = pInfo->binfo.pCtx; SQLFunctionCtx *pCtx = binfo->pCtx;
// not assign result buffer yet, add new result buffer, TODO remove it // not assign result buffer yet, add new result buffer, TODO remove it
char* d = pData; char* d = pData;
...@@ -1767,6 +1775,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1767,6 +1775,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
} }
break; break;
} }
case OP_StateWindow: {
pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
break;
}
case OP_Limit: { case OP_Limit: {
pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
...@@ -2109,6 +2122,8 @@ static bool onlyFirstQuery(SQueryAttr *pQueryAttr) { return onlyOneQueryType(pQu ...@@ -2109,6 +2122,8 @@ static bool onlyFirstQuery(SQueryAttr *pQueryAttr) { return onlyOneQueryType(pQu
static bool onlyLastQuery(SQueryAttr *pQueryAttr) { return onlyOneQueryType(pQueryAttr, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST); } static bool onlyLastQuery(SQueryAttr *pQueryAttr) { return onlyOneQueryType(pQueryAttr, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST); }
static bool notContainSessionOrStateWindow(SQueryAttr *pQueryAttr) { return !(pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow); }
static int32_t updateBlockLoadStatus(SQueryAttr *pQuery, int32_t status) { static int32_t updateBlockLoadStatus(SQueryAttr *pQuery, int32_t status) {
bool hasFirstLastFunc = false; bool hasFirstLastFunc = false;
bool hasOtherFunc = false; bool hasOtherFunc = false;
...@@ -2212,7 +2227,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo ...@@ -2212,7 +2227,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
} }
pQueryAttr->order.order = TSDB_ORDER_ASC; pQueryAttr->order.order = TSDB_ORDER_ASC;
} else if (onlyLastQuery(pQueryAttr)) { } else if (onlyLastQuery(pQueryAttr) && notContainSessionOrStateWindow(pQueryAttr)) {
if (QUERY_IS_ASC_QUERY(pQueryAttr)) { if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
qDebug(msg, pQInfo, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey, qDebug(msg, pQInfo, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey,
pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
...@@ -3204,7 +3219,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult ...@@ -3204,7 +3219,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfOutput = pOperator->numOfOutput; int32_t numOfOutput = pOperator->numOfOutput;
if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0) { if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow) {
// for each group result, call the finalize function for each column // for each group result, call the finalize function for each column
if (pQueryAttr->groupbyColumn) { if (pQueryAttr->groupbyColumn) {
closeAllResultRows(pResultRowInfo); closeAllResultRows(pResultRowInfo);
...@@ -4514,6 +4529,12 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf ...@@ -4514,6 +4529,12 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
} else if (pDownstream->operatorType == OP_SessionWindow) { } else if (pDownstream->operatorType == OP_SessionWindow) {
SSWindowOperatorInfo* pInfo = pDownstream->info; SSWindowOperatorInfo* pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
} else if (pDownstream->operatorType == OP_StateWindow) {
SStateWindowOperatorInfo* pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->binfo.pCtx; pTableScanInfo->pCtx = pInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo; pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset; pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
...@@ -4625,7 +4646,6 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -4625,7 +4646,6 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
tfree(pInfo->prevRow); tfree(pInfo->prevRow);
tfree(pInfo->currentGroupColData); tfree(pInfo->currentGroupColData);
} }
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param; SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param;
taosArrayDestroy(pInfo->orderColumnList); taosArrayDestroy(pInfo->orderColumnList);
...@@ -5131,6 +5151,130 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { ...@@ -5131,6 +5151,130 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
return pIntervalInfo->pRes; return pIntervalInfo->pRes;
} }
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current;
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
SOptrBasicInfo* pBInfo = &pInfo->binfo;
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
SColumnInfoData* pTsColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0);
TSKEY* tsList = (TSKEY*)pTsColInfoData->pData;
pInfo->numOfRows = 0;
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
char* val = ((char*)pColInfoData->pData) + bytes * j;
if (isNull(val, type)) {
continue;
}
if (pInfo->prevData == NULL) {
pInfo->prevData = malloc(bytes);
memcpy(pInfo->prevData, val, bytes);
pInfo->numOfRows = 1;
pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j];
pInfo->start = j;
} else if (memcmp(pInfo->prevData, val, bytes) == 0) {
pInfo->curWindow.ekey = tsList[j];
pInfo->numOfRows += 1;
//pInfo->start = j;
if (j == 0 && pInfo->start != 0) {
pInfo->numOfRows = 1;
pInfo->start = 0;
}
} else {
SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
pSDataBlock->info.rows, pOperator->numOfOutput);
pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j];
memcpy(pInfo->prevData, val, bytes);
pInfo->numOfRows = 1;
pInfo->start = j;
}
}
SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
pSDataBlock->info.rows, pOperator->numOfOutput);
}
static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SStateWindowOperatorInfo* pWindowInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pWindowInfo->binfo;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pBInfo->pRes;
}
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order;
STimeWindow win = pQueryAttr->window;
SOperatorInfo* upstream = pOperator->upstream[0];
while (1) {
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
if (pBlock == NULL) {
break;
}
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, pQueryAttr->order.order);
if (pWindowInfo->colIndex == -1) {
pWindowInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQueryAttr->pGroupbyExpr, pBlock);
}
doStateWindowAggImpl(pOperator, pWindowInfo, pBlock);
}
// restore the value
pQueryAttr->order.order = order;
pQueryAttr->window = win;
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes;
}
static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
...@@ -5140,6 +5284,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { ...@@ -5140,6 +5284,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
SSWindowOperatorInfo* pWindowInfo = pOperator->info; SSWindowOperatorInfo* pWindowInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pWindowInfo->binfo; SOptrBasicInfo* pBInfo = &pWindowInfo->binfo;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
...@@ -5152,6 +5297,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { ...@@ -5152,6 +5297,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
} }
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
//pQueryAttr->order.order = TSDB_ORDER_ASC;
int32_t order = pQueryAttr->order.order; int32_t order = pQueryAttr->order.order;
STimeWindow win = pQueryAttr->window; STimeWindow win = pQueryAttr->window;
...@@ -5389,7 +5535,7 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera ...@@ -5389,7 +5535,7 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doAggregate; pOperator->exec = doAggregate;
pOperator->cleanup = destroyBasicOperatorInfo; pOperator->cleanup = destroyAggOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
...@@ -5409,6 +5555,19 @@ static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -5409,6 +5555,19 @@ static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
SOptrBasicInfo* pInfo = (SOptrBasicInfo*) param; SOptrBasicInfo* pInfo = (SOptrBasicInfo*) param;
doDestroyBasicInfo(pInfo, numOfOutput); doDestroyBasicInfo(pInfo, numOfOutput);
} }
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) {
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
tfree(pInfo->prevData);
}
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
}
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
SSWindowOperatorInfo* pInfo = (SSWindowOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
}
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param; SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
...@@ -5463,7 +5622,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO ...@@ -5463,7 +5622,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doSTableAggregate; pOperator->exec = doSTableAggregate;
pOperator->cleanup = destroyBasicOperatorInfo; pOperator->cleanup = destroyAggOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
...@@ -5589,7 +5748,29 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp ...@@ -5589,7 +5748,29 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
} }
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo));
pInfo->colIndex = -1;
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "StateWindowOperator";
pOperator->operatorType = OP_StateWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doStateWindowAgg;
pOperator->cleanup = destroyStateWindowOperatorInfo;
appendUpstream(pOperator, upstream);
return pOperator;
}
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo)); SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo));
...@@ -5609,7 +5790,7 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -5609,7 +5790,7 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doSessionWindowAgg; pOperator->exec = doSessionWindowAgg;
pOperator->cleanup = destroyBasicOperatorInfo; pOperator->cleanup = destroySWindowOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
...@@ -6901,6 +7082,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S ...@@ -6901,6 +7082,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
pQueryAttr->simpleAgg = pQueryMsg->simpleAgg; pQueryAttr->simpleAgg = pQueryMsg->simpleAgg;
pQueryAttr->pointInterpQuery = pQueryMsg->pointInterpQuery; pQueryAttr->pointInterpQuery = pQueryMsg->pointInterpQuery;
pQueryAttr->needReverseScan = pQueryMsg->needReverseScan; pQueryAttr->needReverseScan = pQueryMsg->needReverseScan;
pQueryAttr->stateWindow = pQueryMsg->stateWindow;
pQueryAttr->vgId = vgId; pQueryAttr->vgId = vgId;
pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
......
...@@ -592,6 +592,14 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { ...@@ -592,6 +592,14 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
op = OP_SessionWindow; op = OP_SessionWindow;
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
taosArrayPush(plan, &op);
}
} else if (pQueryAttr->stateWindow) {
op = OP_StateWindow;
taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) { if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic; op = OP_Arithmetic;
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
......
...@@ -726,9 +726,9 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) { ...@@ -726,9 +726,9 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) {
* extract the select info out of sql string * extract the select info out of sql string
*/ */
SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere, SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *pSession, SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval,
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *psLimit, SSessionWindowVal *pSession, SWindowStateVal *pWindowStateVal, SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit,
tSqlExpr *pHaving) { SLimitVal *psLimit, tSqlExpr *pHaving) {
assert(pSelNodeList != NULL); assert(pSelNodeList != NULL);
SSqlNode *pSqlNode = calloc(1, sizeof(SSqlNode)); SSqlNode *pSqlNode = calloc(1, sizeof(SSqlNode));
...@@ -779,6 +779,12 @@ SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelat ...@@ -779,6 +779,12 @@ SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelat
TPARSER_SET_NONE_TOKEN(pSqlNode->sessionVal.col); TPARSER_SET_NONE_TOKEN(pSqlNode->sessionVal.col);
} }
if (pWindowStateVal != NULL) {
pSqlNode->windowstateVal = *pWindowStateVal;
} else {
TPARSER_SET_NONE_TOKEN(pSqlNode->windowstateVal.col);
}
return pSqlNode; return pSqlNode;
} }
......
此差异已折叠。
...@@ -141,6 +141,7 @@ static SKeyword keywordTable[] = { ...@@ -141,6 +141,7 @@ static SKeyword keywordTable[] = {
{"VARIABLE", TK_VARIABLE}, {"VARIABLE", TK_VARIABLE},
{"INTERVAL", TK_INTERVAL}, {"INTERVAL", TK_INTERVAL},
{"SESSION", TK_SESSION}, {"SESSION", TK_SESSION},
{"STATE_WINDOW", TK_STATE_WINDOW},
{"FILL", TK_FILL}, {"FILL", TK_FILL},
{"SLIDING", TK_SLIDING}, {"SLIDING", TK_SLIDING},
{"ORDER", TK_ORDER}, {"ORDER", TK_ORDER},
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册