提交 28d1459c 编写于 作者: H Haojun Liao

[td-2569]<feature>: support session window query.

上级 eb681d46
...@@ -123,6 +123,7 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i ...@@ -123,6 +123,7 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo); bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo); bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo); bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex); bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex); bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
...@@ -153,7 +154,6 @@ SInternalField* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_F ...@@ -153,7 +154,6 @@ SInternalField* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_F
SInternalField* tscFieldInfoGetInternalField(SFieldInfo* pFieldInfo, int32_t index); SInternalField* tscFieldInfoGetInternalField(SFieldInfo* pFieldInfo, int32_t index);
TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index); TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index);
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo); void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index); int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
......
...@@ -198,9 +198,10 @@ typedef struct STableDataBlocks { ...@@ -198,9 +198,10 @@ typedef struct STableDataBlocks {
typedef struct SQueryInfo { typedef struct SQueryInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately. int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert type uint32_t type; // query/insert type
STimeWindow window; // the whole query time window
STimeWindow window; // query time window SInterval interval; // tumble time window
SInterval interval; SSessionWindow sessionWindow; // session time window
SSqlGroupbyExpr groupbyExpr; // group by tags info SSqlGroupbyExpr groupbyExpr; // group by tags info
SArray * colList; // SArray<SColumn*> SArray * colList; // SArray<SColumn*>
...@@ -397,7 +398,6 @@ typedef struct SSqlStream { ...@@ -397,7 +398,6 @@ typedef struct SSqlStream {
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable); void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable);
int tscAcquireRpc(const char *key, const char *user, const char *secret,void **pRpcObj); int tscAcquireRpc(const char *key, const char *user, const char *secret,void **pRpcObj);
void tscReleaseRpc(void *param); void tscReleaseRpc(void *param);
void tscInitMsgsFp(); void tscInitMsgsFp();
......
此差异已折叠。
...@@ -643,7 +643,6 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -643,7 +643,6 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
} }
pSql->epSet.inUse = rand()%pSql->epSet.numOfEps; pSql->epSet.inUse = rand()%pSql->epSet.numOfEps;
pQueryMsg->head.vgId = htonl(vgId); pQueryMsg->head.vgId = htonl(vgId);
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg; STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
...@@ -658,8 +657,6 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -658,8 +657,6 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
assert(index >= 0 && index < numOfVgroups); assert(index >= 0 && index < numOfVgroups);
tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index); SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
// set the vgroup info // set the vgroup info
...@@ -668,7 +665,10 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char ...@@ -668,7 +665,10 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList); int32_t numOfTables = (int32_t)taosArrayGetSize(pTableIdList->itemList);
pQueryMsg->numOfTables = htonl(numOfTables); // set the number of tables pQueryMsg->numOfTables = htonl(numOfTables); // set the number of tables
tscDebug("%p query on stable, vgId:%d, numOfTables:%d, vgIndex:%d, numOfVgroups:%d", pSql,
pTableIdList->vgInfo.vgId, numOfTables, index, numOfVgroups);
// serialize each table id info // serialize each table id info
for(int32_t i = 0; i < numOfTables; ++i) { for(int32_t i = 0; i < numOfTables; ++i) {
STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i); STableIdInfo* pItem = taosArrayGet(pTableIdList->itemList, i);
...@@ -754,6 +754,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -754,6 +754,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit); pQueryMsg->vgroupLimit = htobe64(pQueryInfo->vgroupLimit);
pQueryMsg->sqlstrLen = htonl(sqlLen); pQueryMsg->sqlstrLen = htonl(sqlLen);
pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen); pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen);
pQueryMsg->sw.gap = htobe64(pQueryInfo->sessionWindow.gap);
pQueryMsg->sw.primaryColId = htonl(PRIMARYKEY_TIMESTAMP_COL_INDEX);
size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfOutput = tscSqlExprNumOfExprs(pQueryInfo);
pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number pQueryMsg->numOfOutput = htons((int16_t)numOfOutput); // this is the stage one output column number
......
...@@ -1062,7 +1062,6 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -1062,7 +1062,6 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows); tscError("%p invalid ts comp file from vnode, abort subquery, file size:%d", pSql, numOfRows);
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno); pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
if (quitAllSubquery(pSql, pParentSql, pSupporter)){ if (quitAllSubquery(pSql, pParentSql, pSupporter)){
return; return;
} }
......
...@@ -239,6 +239,21 @@ bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo) { ...@@ -239,6 +239,21 @@ bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo) {
return false; return false;
} }
bool tscGroupbyColumn(SQueryInfo* pQueryInfo) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
SSqlGroupbyExpr* pGroupbyExpr = &pQueryInfo->groupbyExpr;
for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) {
SColIndex* pIndex = taosArrayGet(pGroupbyExpr->columnInfo, k);
if (!TSDB_COL_IS_TAG(pIndex->flag) && pIndex->colIndex < numOfCols) { // group by normal columns
return true;
}
}
return false;
}
bool tscIsTWAQuery(SQueryInfo* pQueryInfo) { bool tscIsTWAQuery(SQueryInfo* pQueryInfo) {
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) { for (int32_t i = 0; i < numOfExprs; ++i) {
......
...@@ -485,12 +485,13 @@ typedef struct { ...@@ -485,12 +485,13 @@ typedef struct {
int16_t orderColId; int16_t orderColId;
int16_t numOfCols; // the number of columns will be load from vnode int16_t numOfCols; // the number of columns will be load from vnode
SInterval interval; SInterval interval;
SSessionWindow sw; // session window
uint16_t tagCondLen; // tag length in current query uint16_t tagCondLen; // tag length in current query
uint32_t tbnameCondLen; // table name filter condition string length uint32_t tbnameCondLen; // table name filter condition string length
int16_t numOfGroupCols; // num of group by columns int16_t numOfGroupCols; // num of group by columns
int16_t orderByIdx; int16_t orderByIdx;
int16_t orderType; // used in group by xx order by xxx int16_t orderType; // used in group by xx order by xxx
int64_t vgroupLimit; // limit the number of rows for each table, used in order by + limit in stable projection query. int64_t vgroupLimit; // limit the number of rows for each table, used in order by + limit in stable projection query.
int16_t prjOrder; // global order in super table projection query. int16_t prjOrder; // global order in super table projection query.
int64_t limit; int64_t limit;
int64_t offset; int64_t offset;
......
...@@ -135,74 +135,75 @@ ...@@ -135,74 +135,75 @@
#define TK_FROM 116 #define TK_FROM 116
#define TK_VARIABLE 117 #define TK_VARIABLE 117
#define TK_INTERVAL 118 #define TK_INTERVAL 118
#define TK_FILL 119 #define TK_SESSION 119
#define TK_SLIDING 120 #define TK_FILL 120
#define TK_ORDER 121 #define TK_SLIDING 121
#define TK_BY 122 #define TK_ORDER 122
#define TK_ASC 123 #define TK_BY 123
#define TK_DESC 124 #define TK_ASC 124
#define TK_GROUP 125 #define TK_DESC 125
#define TK_HAVING 126 #define TK_GROUP 126
#define TK_LIMIT 127 #define TK_HAVING 127
#define TK_OFFSET 128 #define TK_LIMIT 128
#define TK_SLIMIT 129 #define TK_OFFSET 129
#define TK_SOFFSET 130 #define TK_SLIMIT 130
#define TK_WHERE 131 #define TK_SOFFSET 131
#define TK_NOW 132 #define TK_WHERE 132
#define TK_RESET 133 #define TK_NOW 133
#define TK_QUERY 134 #define TK_RESET 134
#define TK_ADD 135 #define TK_QUERY 135
#define TK_COLUMN 136 #define TK_ADD 136
#define TK_TAG 137 #define TK_COLUMN 137
#define TK_CHANGE 138 #define TK_TAG 138
#define TK_SET 139 #define TK_CHANGE 139
#define TK_KILL 140 #define TK_SET 140
#define TK_CONNECTION 141 #define TK_KILL 141
#define TK_STREAM 142 #define TK_CONNECTION 142
#define TK_COLON 143 #define TK_STREAM 143
#define TK_ABORT 144 #define TK_COLON 144
#define TK_AFTER 145 #define TK_ABORT 145
#define TK_ATTACH 146 #define TK_AFTER 146
#define TK_BEFORE 147 #define TK_ATTACH 147
#define TK_BEGIN 148 #define TK_BEFORE 148
#define TK_CASCADE 149 #define TK_BEGIN 149
#define TK_CLUSTER 150 #define TK_CASCADE 150
#define TK_CONFLICT 151 #define TK_CLUSTER 151
#define TK_COPY 152 #define TK_CONFLICT 152
#define TK_DEFERRED 153 #define TK_COPY 153
#define TK_DELIMITERS 154 #define TK_DEFERRED 154
#define TK_DETACH 155 #define TK_DELIMITERS 155
#define TK_EACH 156 #define TK_DETACH 156
#define TK_END 157 #define TK_EACH 157
#define TK_EXPLAIN 158 #define TK_END 158
#define TK_FAIL 159 #define TK_EXPLAIN 159
#define TK_FOR 160 #define TK_FAIL 160
#define TK_IGNORE 161 #define TK_FOR 161
#define TK_IMMEDIATE 162 #define TK_IGNORE 162
#define TK_INITIALLY 163 #define TK_IMMEDIATE 163
#define TK_INSTEAD 164 #define TK_INITIALLY 164
#define TK_MATCH 165 #define TK_INSTEAD 165
#define TK_KEY 166 #define TK_MATCH 166
#define TK_OF 167 #define TK_KEY 167
#define TK_RAISE 168 #define TK_OF 168
#define TK_REPLACE 169 #define TK_RAISE 169
#define TK_RESTRICT 170 #define TK_REPLACE 170
#define TK_ROW 171 #define TK_RESTRICT 171
#define TK_STATEMENT 172 #define TK_ROW 172
#define TK_TRIGGER 173 #define TK_STATEMENT 173
#define TK_VIEW 174 #define TK_TRIGGER 174
#define TK_SEMI 175 #define TK_VIEW 175
#define TK_NONE 176 #define TK_SEMI 176
#define TK_PREV 177 #define TK_NONE 177
#define TK_LINEAR 178 #define TK_PREV 178
#define TK_IMPORT 179 #define TK_LINEAR 179
#define TK_METRIC 180 #define TK_IMPORT 180
#define TK_TBNAME 181 #define TK_METRIC 181
#define TK_JOIN 182 #define TK_TBNAME 182
#define TK_METRICS 183 #define TK_JOIN 183
#define TK_INSERT 184 #define TK_METRICS 184
#define TK_INTO 185 #define TK_INSERT 185
#define TK_VALUES 186 #define TK_INTO 186
#define TK_VALUES 187
#define TK_SPACE 300 #define TK_SPACE 300
......
...@@ -72,6 +72,11 @@ typedef struct SInterval { ...@@ -72,6 +72,11 @@ typedef struct SInterval {
int64_t offset; int64_t offset;
} SInterval; } SInterval;
typedef struct SSessionWindow {
int64_t gap; // gap between two session window(in microseconds)
int32_t primaryColId; // primary timestamp column
} SSessionWindow;
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision); int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision);
int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision); int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precision);
int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision); int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char unit, int32_t precision);
......
...@@ -185,7 +185,6 @@ typedef struct SQuery { ...@@ -185,7 +185,6 @@ typedef struct SQuery {
bool groupbyColumn; // denote if this is a groupby normal column query bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation bool timeWindowInterpo;// if the time window start/end required interpolation
bool queryWindowIdentical; // all query time windows are identical for all tables in one group
bool queryBlockDist; // if query data block distribution bool queryBlockDist; // if query data block distribution
bool stabledev; // super table stddev query bool stabledev; // super table stddev query
int32_t interBufSize; // intermediate buffer sizse int32_t interBufSize; // intermediate buffer sizse
...@@ -196,6 +195,7 @@ typedef struct SQuery { ...@@ -196,6 +195,7 @@ typedef struct SQuery {
STimeWindow window; STimeWindow window;
SInterval interval; SInterval interval;
SSessionWindow sw;
int16_t precision; int16_t precision;
int16_t numOfOutput; int16_t numOfOutput;
int16_t fillType; int16_t fillType;
...@@ -279,10 +279,11 @@ enum OPERATOR_TYPE_E { ...@@ -279,10 +279,11 @@ enum OPERATOR_TYPE_E {
OP_Groupby = 8, OP_Groupby = 8,
OP_Limit = 9, OP_Limit = 9,
OP_Offset = 10, OP_Offset = 10,
OP_TimeInterval = 11, OP_TimeWindow = 11,
OP_Fill = 12, OP_SessionWindow = 12,
OP_MultiTableAggregate = 13, OP_Fill = 13,
OP_MultiTableTimeInterval = 14, OP_MultiTableAggregate = 14,
OP_MultiTableTimeInterval = 15,
}; };
typedef struct SOperatorInfo { typedef struct SOperatorInfo {
...@@ -411,6 +412,14 @@ typedef struct SGroupbyOperatorInfo { ...@@ -411,6 +412,14 @@ typedef struct SGroupbyOperatorInfo {
char *prevData; // previous group by value char *prevData; // previous group by value
} SGroupbyOperatorInfo; } SGroupbyOperatorInfo;
typedef struct SSWindowOperatorInfo {
SOptrBasicInfo binfo;
STimeWindow curWindow; // current time window
TSKEY prevTs; // previous timestamp
int32_t numOfRows; // number of rows
int32_t start; // start row index
} SSWindowOperatorInfo;
void freeParam(SQueryParam *param); void freeParam(SQueryParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param);
int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg, int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlFuncMsg **pExprMsg,
......
...@@ -58,14 +58,19 @@ typedef struct SIntervalVal { ...@@ -58,14 +58,19 @@ typedef struct SIntervalVal {
SStrToken offset; SStrToken offset;
} SIntervalVal; } SIntervalVal;
typedef struct SSessionWindowVal {
SStrToken col;
SStrToken gap;
} SSessionWindowVal;
typedef struct SQuerySQL { typedef struct SQuerySQL {
struct tSQLExprList *pSelection; // select clause struct tSQLExprList *pSelection; // select clause
SArray * from; // from clause SArray<tVariantListItem> SArray * from; // from clause SArray<tVariantListItem>
struct tSQLExpr * pWhere; // where clause [optional] struct tSQLExpr * pWhere; // where clause [optional]
SArray * pGroupby; // groupby clause, only for tags[optional], SArray<tVariantListItem> SArray * pGroupby; // groupby clause, only for tags[optional], SArray<tVariantListItem>
SArray * pSortOrder; // orderby [optional], SArray<tVariantListItem> SArray * pSortOrder; // orderby [optional], SArray<tVariantListItem>
SStrToken interval; // interval [optional] SIntervalVal interval; // (interval, interval_offset) [optional]
SStrToken offset; // offset window [optional] SSessionWindowVal sessionVal; // session window [optional]
SStrToken sliding; // sliding window [optional] SStrToken sliding; // sliding window [optional]
SLimitVal limit; // limit offset [optional] SLimitVal limit; // limit offset [optional]
SLimitVal slimit; // group limit offset [optional] SLimitVal slimit; // group limit offset [optional]
...@@ -256,8 +261,8 @@ tSQLExprList *tSqlExprListAppend(tSQLExprList *pList, tSQLExpr *pNode, SStrToken ...@@ -256,8 +261,8 @@ tSQLExprList *tSqlExprListAppend(tSQLExprList *pList, tSQLExpr *pNode, SStrToken
void tSqlExprListDestroy(tSQLExprList *pList); void tSqlExprListDestroy(tSQLExprList *pList);
SQuerySQL *tSetQuerySqlElems(SStrToken *pSelectToken, tSQLExprList *pSelection, SArray *pFrom, tSQLExpr *pWhere, SQuerySQL *tSetQuerySqlNode(SStrToken *pSelectToken, tSQLExprList *pSelection, SArray *pFrom, tSQLExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *pSession,
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pGLimit); SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pGLimit);
SCreateTableSQL *tSetCreateSqlElems(SArray *pCols, SArray *pTags, SQuerySQL *pSelect, int32_t type); SCreateTableSQL *tSetCreateSqlElems(SArray *pCols, SArray *pTags, SQuerySQL *pSelect, int32_t type);
......
...@@ -171,6 +171,7 @@ static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp ...@@ -171,6 +171,7 @@ static SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); static SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); static SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
static SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
...@@ -1254,8 +1255,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1254,8 +1255,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pInfo->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); setNotInterpoWindowKey(pInfo->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput);
numOfOutput);
} }
// restore current time window // restore current time window
...@@ -1268,8 +1268,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1268,8 +1268,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
// window start key interpolation // window start key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep); doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
numOfOutput);
STimeWindow nextWin = win; STimeWindow nextWin = win;
while (1) { while (1) {
...@@ -1287,13 +1286,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1287,13 +1286,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
} }
ekey = reviseWindowEkey(pQuery, &nextWin); ekey = reviseWindowEkey(pQuery, &nextWin);
forwardStep = forwardStep = getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
// window start(end) key interpolation // window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &nextWin, startPos, forwardStep); doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &nextWin, startPos, forwardStep);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols, doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
pSDataBlock->info.rows, numOfOutput);
} }
if (pQuery->timeWindowInterpo) { if (pQuery->timeWindowInterpo) {
...@@ -1323,6 +1320,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn ...@@ -1323,6 +1320,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
continue; continue;
} }
// Compare with the previous row of this column, and do not set the output buffer again if they are identical.
if (pInfo->prevData == NULL || (memcmp(pInfo->prevData, val, bytes) != 0)) { if (pInfo->prevData == NULL || (memcmp(pInfo->prevData, val, bytes) != 0)) {
if (pInfo->prevData == NULL) { if (pInfo->prevData == NULL) {
pInfo->prevData = malloc(bytes); pInfo->prevData = malloc(bytes);
...@@ -1347,6 +1345,66 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn ...@@ -1347,6 +1345,66 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
} }
} }
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->pQuery->current;
// primary timestamp column
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0);
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SOptrBasicInfo* pBInfo = &pInfo->binfo;
int64_t gap = pOperator->pRuntimeEnv->pQuery->sw.gap;
pInfo->numOfRows = 0;
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
if (pInfo->prevTs == INT64_MIN) {
pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j];
pInfo->prevTs = tsList[j];
pInfo->numOfRows = 1;
pInfo->start = j;
} else if (tsList[j] - pInfo->prevTs <= gap) {
pInfo->curWindow.ekey = tsList[j];
pInfo->prevTs = tsList[j];
pInfo->numOfRows += 1;
pInfo->start = j;
} else { // start a new session window
SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
pSDataBlock->info.rows, pOperator->numOfOutput);
pInfo->curWindow.skey = tsList[j];
pInfo->curWindow.ekey = tsList[j];
pInfo->prevTs = tsList[j];
pInfo->numOfRows = 1;
pInfo->start = j;
}
}
SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
pSDataBlock->info.rows, pOperator->numOfOutput);
}
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
int64_t v = -1; int64_t v = -1;
GET_TYPED_DATA(v, int64_t, type, pData); GET_TYPED_DATA(v, int64_t, type, pData);
...@@ -1700,6 +1758,13 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1700,6 +1758,13 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2);
}
} else if (pQuery->sw.gap > 0) {
pRuntimeEnv->proot = createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput);
setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot);
if (pQuery->pExpr2 != NULL) { if (pQuery->pExpr2 != NULL) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2); pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr2, pQuery->numOfExpr2);
} }
...@@ -2482,7 +2547,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa ...@@ -2482,7 +2547,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
// Calculate all time windows that are overlapping or contain current data block. // Calculate all time windows that are overlapping or contain current data block.
// If current data block is contained by all possible time window, do not load current data block. // If current data block is contained by all possible time window, do not load current data block.
if (pQuery->numOfFilterCols > 0 || pQuery->groupbyColumn || if (pQuery->numOfFilterCols > 0 || pQuery->groupbyColumn || pQuery->sw.gap > 0 ||
(QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, &pBlock->info))) { (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, &pBlock->info))) {
(*status) = BLK_DATA_ALL_NEEDED; (*status) = BLK_DATA_ALL_NEEDED;
} }
...@@ -3039,7 +3104,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult ...@@ -3039,7 +3104,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t numOfOutput = pOperator->numOfOutput; int32_t numOfOutput = pOperator->numOfOutput;
if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery)) { if (pQuery->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQuery) || pQuery->sw.gap > 0) {
// for each group result, call the finalize function for each column // for each group result, call the finalize function for each column
if (pQuery->groupbyColumn) { if (pQuery->groupbyColumn) {
closeAllResultRows(pResultRowInfo); closeAllResultRows(pResultRowInfo);
...@@ -4240,7 +4305,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf ...@@ -4240,7 +4305,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
pTableScanInfo->pCtx = pAggInfo->binfo.pCtx; pTableScanInfo->pCtx = pAggInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pAggInfo->binfo.resultRowInfo; pTableScanInfo->pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset; pTableScanInfo->rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
} else if (pDownstream->operatorType == OP_TimeInterval) { } else if (pDownstream->operatorType == OP_TimeWindow) {
STableIntervalOperatorInfo *pIntervalInfo = pDownstream->info; STableIntervalOperatorInfo *pIntervalInfo = pDownstream->info;
pTableScanInfo->pCtx = pIntervalInfo->pCtx; pTableScanInfo->pCtx = pIntervalInfo->pCtx;
...@@ -4264,6 +4329,12 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf ...@@ -4264,6 +4329,12 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
} else if (pDownstream->operatorType == OP_Arithmetic) { } else if (pDownstream->operatorType == OP_Arithmetic) {
SArithOperatorInfo *pInfo = pDownstream->info; SArithOperatorInfo *pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
} else if (pDownstream->operatorType == OP_SessionWindow) {
SSWindowOperatorInfo* pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->binfo.pCtx; pTableScanInfo->pCtx = pInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo; pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset; pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
...@@ -4619,6 +4690,62 @@ static SSDataBlock* doSTableIntervalAgg(void* param) { ...@@ -4619,6 +4690,62 @@ static SSDataBlock* doSTableIntervalAgg(void* param) {
return pIntervalInfo->pRes; return pIntervalInfo->pRes;
} }
static SSDataBlock* doSessionWindowAgg(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SSWindowOperatorInfo* pWindowInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pWindowInfo->binfo;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pBInfo->pRes;
}
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t order = pQuery->order.order;
STimeWindow win = pQuery->window;
SOperatorInfo* upstream = pOperator->upstream;
while(1) {
SSDataBlock* pBlock = upstream->exec(upstream);
if (pBlock == NULL) {
break;
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, pQuery->order.order);
doSessionWindowAggImpl(pOperator, pWindowInfo, pBlock);
}
// restore the value
pQuery->order.order = order;
pQuery->window = win;
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes;
}
static SSDataBlock* hashGroupbyAggregate(void* param) { static SSDataBlock* hashGroupbyAggregate(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
...@@ -4908,7 +5035,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp ...@@ -4908,7 +5035,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "TimeIntervalAggOperator"; pOperator->name = "TimeIntervalAggOperator";
pOperator->operatorType = OP_TimeInterval; pOperator->operatorType = OP_TimeWindow;
pOperator->blockingOptr = true; pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream; pOperator->upstream = upstream;
...@@ -4922,6 +5049,31 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp ...@@ -4922,6 +5049,31 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
return pOperator; return pOperator;
} }
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo));
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->prevTs = INT64_MIN;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = OP_SessionWindow;
pOperator->blockingOptr = true;
pOperator->status = OP_IN_EXECUTING;
pOperator->upstream = upstream;
pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doSessionWindowAgg;
pOperator->cleanup = destroyBasicOperatorInfo;
return pOperator;
}
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
...@@ -4971,7 +5123,7 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -4971,7 +5123,7 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
} }
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput) { int32_t numOfOutput) {
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
...@@ -5203,6 +5355,17 @@ static bool validateQueryMsg(SQueryTableMsg *pQueryMsg) { ...@@ -5203,6 +5355,17 @@ static bool validateQueryMsg(SQueryTableMsg *pQueryMsg) {
return false; return false;
} }
if (pQueryMsg->sw.gap < 0 || pQueryMsg->sw.primaryColId != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
qError("qmsg:%p illegal value of session window time %" PRId64, pQueryMsg, pQueryMsg->sw.gap);
return false;
}
if (pQueryMsg->sw.gap > 0 && pQueryMsg->interval.interval > 0) {
qError("qmsg:%p illegal value of session window time %" PRId64" and interval value %"PRId64, pQueryMsg,
pQueryMsg->sw.gap, pQueryMsg->interval.interval);
return false;
}
if (pQueryMsg->numOfTables <= 0) { if (pQueryMsg->numOfTables <= 0) {
qError("qmsg:%p illegal value of numOfTables %d", pQueryMsg, pQueryMsg->numOfTables); qError("qmsg:%p illegal value of numOfTables %d", pQueryMsg, pQueryMsg->numOfTables);
return false; return false;
...@@ -5315,6 +5478,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { ...@@ -5315,6 +5478,8 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
pQueryMsg->secondStageOutput = htonl(pQueryMsg->secondStageOutput); pQueryMsg->secondStageOutput = htonl(pQueryMsg->secondStageOutput);
pQueryMsg->sqlstrLen = htonl(pQueryMsg->sqlstrLen); pQueryMsg->sqlstrLen = htonl(pQueryMsg->sqlstrLen);
pQueryMsg->prevResultLen = htonl(pQueryMsg->prevResultLen); pQueryMsg->prevResultLen = htonl(pQueryMsg->prevResultLen);
pQueryMsg->sw.gap = htobe64(pQueryMsg->sw.gap);
pQueryMsg->sw.primaryColId = htonl(pQueryMsg->sw.primaryColId);
// query msg safety check // query msg safety check
if (!validateQueryMsg(pQueryMsg)) { if (!validateQueryMsg(pQueryMsg)) {
...@@ -5953,7 +6118,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr ...@@ -5953,7 +6118,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
pQuery->tagColList = pTagCols; pQuery->tagColList = pTagCols;
pQuery->prjInfo.vgroupLimit = pQueryMsg->vgroupLimit; pQuery->prjInfo.vgroupLimit = pQueryMsg->vgroupLimit;
pQuery->prjInfo.ts = (pQueryMsg->order == TSDB_ORDER_ASC)? INT64_MIN:INT64_MAX; pQuery->prjInfo.ts = (pQueryMsg->order == TSDB_ORDER_ASC)? INT64_MIN:INT64_MAX;
pQuery->sw = pQueryMsg->sw;
pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo)); pQuery->colList = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
if (pQuery->colList == NULL) { if (pQuery->colList == NULL) {
goto _cleanup; goto _cleanup;
...@@ -6023,7 +6188,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr ...@@ -6023,7 +6188,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery); changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery);
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
pQuery->queryWindowIdentical = true;
bool groupByCol = isGroupbyColumn(pQuery->pGroupbyExpr); bool groupByCol = isGroupbyColumn(pQuery->pGroupbyExpr);
STimeWindow window = pQuery->window; STimeWindow window = pQuery->window;
...@@ -6042,11 +6206,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr ...@@ -6042,11 +6206,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
for(int32_t j = 0; j < s; ++j) { for(int32_t j = 0; j < s; ++j) {
STableKeyInfo* info = taosArrayGet(pa, j); STableKeyInfo* info = taosArrayGet(pa, j);
window.skey = info->lastKey; window.skey = info->lastKey;
if (info->lastKey != pQuery->window.skey) {
pQInfo->query.queryWindowIdentical = false;
}
void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo); void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo);
STableQueryInfo* item = createTableQueryInfo(pQuery, info->pTable, groupByCol, window, buf); STableQueryInfo* item = createTableQueryInfo(pQuery, info->pTable, groupByCol, window, buf);
......
...@@ -550,8 +550,8 @@ void tSqlSetColumnType(TAOS_FIELD *pField, SStrToken *type) { ...@@ -550,8 +550,8 @@ void tSqlSetColumnType(TAOS_FIELD *pField, SStrToken *type) {
/* /*
* extract the select info out of sql string * extract the select info out of sql string
*/ */
SQuerySQL *tSetQuerySqlElems(SStrToken *pSelectToken, tSQLExprList *pSelection, SArray *pFrom, tSQLExpr *pWhere, SQuerySQL *tSetQuerySqlNode(SStrToken *pSelectToken, tSQLExprList *pSelection, SArray *pFrom, tSQLExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *pSession,
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pGLimit) { SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pGLimit) {
assert(pSelection != NULL); assert(pSelection != NULL);
...@@ -574,14 +574,17 @@ SQuerySQL *tSetQuerySqlElems(SStrToken *pSelectToken, tSQLExprList *pSelection, ...@@ -574,14 +574,17 @@ SQuerySQL *tSetQuerySqlElems(SStrToken *pSelectToken, tSQLExprList *pSelection,
} }
if (pInterval != NULL) { if (pInterval != NULL) {
pQuery->interval = pInterval->interval; pQuery->interval = *pInterval;
pQuery->offset = pInterval->offset;
} }
if (pSliding != NULL) { if (pSliding != NULL) {
pQuery->sliding = *pSliding; pQuery->sliding = *pSliding;
} }
if (pSession != NULL) {
pQuery->sessionVal = *pSession;
}
pQuery->fillType = pFill; pQuery->fillType = pFill;
return pQuery; return pQuery;
} }
......
...@@ -139,6 +139,7 @@ static SKeyword keywordTable[] = { ...@@ -139,6 +139,7 @@ static SKeyword keywordTable[] = {
{"FROM", TK_FROM}, {"FROM", TK_FROM},
{"VARIABLE", TK_VARIABLE}, {"VARIABLE", TK_VARIABLE},
{"INTERVAL", TK_INTERVAL}, {"INTERVAL", TK_INTERVAL},
{"SESSION", TK_SESSION},
{"FILL", TK_FILL}, {"FILL", TK_FILL},
{"SLIDING", TK_SLIDING}, {"SLIDING", TK_SLIDING},
{"ORDER", TK_ORDER}, {"ORDER", TK_ORDER},
......
...@@ -42,12 +42,11 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) { ...@@ -42,12 +42,11 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) {
} }
int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) { int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) {
pResultRowInfo->capacity = size; pResultRowInfo->type = type;
pResultRowInfo->type = type;
pResultRowInfo->curIndex = -1;
pResultRowInfo->size = 0; pResultRowInfo->size = 0;
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL; pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
pResultRowInfo->curIndex = -1;
pResultRowInfo->capacity = size;
pResultRowInfo->pResult = calloc(pResultRowInfo->capacity, POINTER_BYTES); pResultRowInfo->pResult = calloc(pResultRowInfo->capacity, POINTER_BYTES);
if (pResultRowInfo->pResult == NULL) { if (pResultRowInfo->pResult == NULL) {
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册