Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8972ebc1
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8972ebc1
编写于
8月 18, 2020
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-1103] refactor the structure for reducing memory consumption during interval query.
上级
18926450
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
177 addition
and
171 deletion
+177
-171
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+7
-9
src/query/inc/qUtil.h
src/query/inc/qUtil.h
+2
-1
src/query/inc/tsqlfunction.h
src/query/inc/tsqlfunction.h
+7
-7
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+137
-134
src/query/src/qUtil.c
src/query/src/qUtil.c
+24
-20
未找到文件。
src/query/inc/qExecutor.h
浏览文件 @
8972ebc1
...
@@ -42,20 +42,16 @@ typedef struct SSqlGroupbyExpr {
...
@@ -42,20 +42,16 @@ typedef struct SSqlGroupbyExpr {
}
SSqlGroupbyExpr
;
}
SSqlGroupbyExpr
;
typedef
struct
SPosInfo
{
typedef
struct
SPosInfo
{
int32_t
pageId
;
int32_t
pageId
:
20
;
int32_t
rowId
;
int32_t
rowId
:
12
;
}
SPosInfo
;
}
SPosInfo
;
typedef
struct
SWindowStatus
{
bool
closed
;
}
SWindowStatus
;
typedef
struct
SWindowResult
{
typedef
struct
SWindowResult
{
uint16_t
numOfRows
;
// number of rows of current time window
SWindowStatus
status
;
// this result status: closed or opened
SPosInfo
pos
;
// Position of current result in disk-based output buffer
SPosInfo
pos
;
// Position of current result in disk-based output buffer
uint16_t
numOfRows
;
// number of rows of current time window
bool
closed
;
// this result status: closed or opened
SResultInfo
*
resultInfo
;
// For each result column, there is a resultInfo
SResultInfo
*
resultInfo
;
// For each result column, there is a resultInfo
STimeWindow
window
;
// The time window that current result covers.
TSKEY
skey
;
// start key of current time window
}
SWindowResult
;
}
SWindowResult
;
/**
/**
...
@@ -79,6 +75,7 @@ typedef struct SWindowResInfo {
...
@@ -79,6 +75,7 @@ typedef struct SWindowResInfo {
int64_t
startTime
;
// start time of the first time window for sliding query
int64_t
startTime
;
// start time of the first time window for sliding query
int64_t
prevSKey
;
// previous (not completed) sliding window start key
int64_t
prevSKey
;
// previous (not completed) sliding window start key
int64_t
threshold
;
// threshold to halt query and return the generated results.
int64_t
threshold
;
// threshold to halt query and return the generated results.
int64_t
interval
;
// time window interval
}
SWindowResInfo
;
}
SWindowResInfo
;
typedef
struct
SColumnFilterElem
{
typedef
struct
SColumnFilterElem
{
...
@@ -123,6 +120,7 @@ typedef struct SQueryCostInfo {
...
@@ -123,6 +120,7 @@ typedef struct SQueryCostInfo {
uint64_t
elapsedTime
;
uint64_t
elapsedTime
;
uint64_t
computTime
;
uint64_t
computTime
;
uint64_t
internalSupSize
;
uint64_t
internalSupSize
;
uint64_t
numOfTimeWindows
;
}
SQueryCostInfo
;
}
SQueryCostInfo
;
typedef
struct
SQuery
{
typedef
struct
SQuery
{
...
...
src/query/inc/qUtil.h
浏览文件 @
8972ebc1
...
@@ -38,7 +38,8 @@ static FORCE_INLINE SWindowResult *getWindowResult(SWindowResInfo *pWindowResInf
...
@@ -38,7 +38,8 @@ static FORCE_INLINE SWindowResult *getWindowResult(SWindowResInfo *pWindowResInf
return
&
pWindowResInfo
->
pResult
[
slot
];
return
&
pWindowResInfo
->
pResult
[
slot
];
}
}
#define curTimeWindow(_winres) ((_winres)->curIndex)
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
#define GET_TIMEWINDOW(_winresInfo, _win) (STimeWindow) {(_win)->skey, ((_win)->skey + (_winresInfo)->interval)}
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pSelectExpr[1].base.arg->argValue.i64:1)
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pSelectExpr[1].base.arg->argValue.i64:1)
bool
isWindowResClosed
(
SWindowResInfo
*
pWindowResInfo
,
int32_t
slot
);
bool
isWindowResClosed
(
SWindowResInfo
*
pWindowResInfo
,
int32_t
slot
);
...
...
src/query/inc/tsqlfunction.h
浏览文件 @
8972ebc1
...
@@ -137,13 +137,13 @@ typedef struct SInterpInfoDetail {
...
@@ -137,13 +137,13 @@ typedef struct SInterpInfoDetail {
}
SInterpInfoDetail
;
}
SInterpInfoDetail
;
typedef
struct
SResultInfo
{
typedef
struct
SResultInfo
{
int8_t
hasResult
;
// result generated, not NULL value
int8_t
hasResult
;
// result generated, not NULL value
bool
initialized
;
// output buffer has been initialized
bool
initialized
:
1
;
// output buffer has been initialized
bool
complete
;
// query has completed
bool
complete
:
1
;
// query has completed
bool
superTableQ
;
// is super table query
bool
superTableQ
:
1
;
// is super table query
int
32_t
numOfRes
;
// num of output result in current buffer
int
16_t
numOfRes
;
// num of output result in current buffer
int32_t
bufLen
;
// buffer size
u
int32_t
bufLen
;
// buffer size
void
*
interResultBuf
;
// output result buffer
void
*
interResultBuf
;
// output result buffer
}
SResultInfo
;
}
SResultInfo
;
struct
SQLFunctionCtx
;
struct
SQLFunctionCtx
;
...
...
src/query/src/qExecutor.c
浏览文件 @
8972ebc1
...
@@ -27,7 +27,8 @@
...
@@ -27,7 +27,8 @@
#include "query.h"
#include "query.h"
#include "queryLog.h"
#include "queryLog.h"
#include "tlosertree.h"
#include "tlosertree.h"
#include "tscompression.h"
#define MAX_ROWS_PER_RESBUF_PAGE ((1u<<12) - 1);
/**
/**
* check if the primary column is load by default, otherwise, the program will
* check if the primary column is load by default, otherwise, the program will
...
@@ -215,16 +216,16 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) {
...
@@ -215,16 +216,16 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) {
*/
*/
void
updateNumOfResult
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
numOfRes
)
{
void
updateNumOfResult
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int32_t
numOfRes
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
SResultInfo
*
pResInfo
=
GET_RES_INFO
(
&
pRuntimeEnv
->
pCtx
[
j
]);
SResultInfo
*
pResInfo
=
GET_RES_INFO
(
&
pRuntimeEnv
->
pCtx
[
j
]);
int16_t
functionId
=
pRuntimeEnv
->
pCtx
[
j
].
functionId
;
int16_t
functionId
=
pRuntimeEnv
->
pCtx
[
j
].
functionId
;
if
(
functionId
==
TSDB_FUNC_TS
||
functionId
==
TSDB_FUNC_TAG
||
functionId
==
TSDB_FUNC_TAGPRJ
||
if
(
functionId
==
TSDB_FUNC_TS
||
functionId
==
TSDB_FUNC_TAG
||
functionId
==
TSDB_FUNC_TAGPRJ
||
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
continue
;
continue
;
}
}
assert
(
pResInfo
->
numOfRes
>
numOfRes
);
assert
(
pResInfo
->
numOfRes
>
numOfRes
);
pResInfo
->
numOfRes
=
numOfRes
;
pResInfo
->
numOfRes
=
numOfRes
;
}
}
...
@@ -318,10 +319,10 @@ bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].base.function
...
@@ -318,10 +319,10 @@ bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].base.function
static
bool
limitResults
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
static
bool
limitResults
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SQInfo
*
pQInfo
=
GET_QINFO_ADDR
(
pRuntimeEnv
);
SQInfo
*
pQInfo
=
GET_QINFO_ADDR
(
pRuntimeEnv
);
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
((
pQuery
->
limit
.
limit
>
0
)
&&
(
pQuery
->
rec
.
total
+
pQuery
->
rec
.
rows
>
pQuery
->
limit
.
limit
))
{
if
((
pQuery
->
limit
.
limit
>
0
)
&&
(
pQuery
->
rec
.
total
+
pQuery
->
rec
.
rows
>
pQuery
->
limit
.
limit
))
{
pQuery
->
rec
.
rows
=
pQuery
->
limit
.
limit
-
pQuery
->
rec
.
total
;
pQuery
->
rec
.
rows
=
pQuery
->
limit
.
limit
-
pQuery
->
rec
.
total
;
qDebug
(
"QInfo:%p discard remain data due to result limitation, limit:%"
PRId64
", current return:%"
PRId64
", total:%"
PRId64
,
qDebug
(
"QInfo:%p discard remain data due to result limitation, limit:%"
PRId64
", current return:%"
PRId64
", total:%"
PRId64
,
pQInfo
,
pQuery
->
limit
.
limit
,
pQuery
->
rec
.
rows
,
pQuery
->
rec
.
total
+
pQuery
->
rec
.
rows
);
pQInfo
,
pQuery
->
limit
.
limit
,
pQuery
->
rec
.
rows
,
pQuery
->
rec
.
total
+
pQuery
->
rec
.
rows
);
assert
(
pQuery
->
rec
.
rows
>=
0
);
assert
(
pQuery
->
rec
.
rows
>=
0
);
...
@@ -415,6 +416,7 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
...
@@ -415,6 +416,7 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
char
*
t
=
realloc
(
pWindowResInfo
->
pResult
,
newCap
*
sizeof
(
SWindowResult
));
char
*
t
=
realloc
(
pWindowResInfo
->
pResult
,
newCap
*
sizeof
(
SWindowResult
));
pRuntimeEnv
->
summary
.
internalSupSize
+=
(
newCap
-
pWindowResInfo
->
capacity
)
*
sizeof
(
SWindowResult
);
pRuntimeEnv
->
summary
.
internalSupSize
+=
(
newCap
-
pWindowResInfo
->
capacity
)
*
sizeof
(
SWindowResult
);
pRuntimeEnv
->
summary
.
numOfTimeWindows
+=
(
newCap
-
pWindowResInfo
->
capacity
);
if
(
t
==
NULL
)
{
if
(
t
==
NULL
)
{
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
@@ -450,8 +452,9 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
...
@@ -450,8 +452,9 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
w
.
skey
=
pWindowResInfo
->
prevSKey
;
w
.
skey
=
pWindowResInfo
->
prevSKey
;
w
.
ekey
=
w
.
skey
+
pQuery
->
intervalTime
-
1
;
w
.
ekey
=
w
.
skey
+
pQuery
->
intervalTime
-
1
;
}
else
{
}
else
{
int32_t
slot
=
curTimeWindow
(
pWindowResInfo
);
int32_t
slot
=
curTimeWindowIndex
(
pWindowResInfo
);
w
=
getWindowResult
(
pWindowResInfo
,
slot
)
->
window
;
SWindowResult
*
pWindowRes
=
getWindowResult
(
pWindowResInfo
,
slot
);
w
=
GET_TIMEWINDOW
(
pWindowResInfo
,
pWindowRes
);
}
}
if
(
w
.
skey
>
ts
||
w
.
ekey
<
ts
)
{
if
(
w
.
skey
>
ts
||
w
.
ekey
<
ts
)
{
...
@@ -552,15 +555,15 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes
...
@@ -552,15 +555,15 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes
}
}
// set time window for current result
// set time window for current result
pWindowRes
->
window
=
*
win
;
pWindowRes
->
skey
=
win
->
skey
;
setWindowResOutputBufInitCtx
(
pRuntimeEnv
,
pWindowRes
);
setWindowResOutputBufInitCtx
(
pRuntimeEnv
,
pWindowRes
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
SWindowStatus
*
getTimeWindowResStatus
(
SWindowResInfo
*
pWindowResInfo
,
int32_t
slot
)
{
static
bool
getTimeWindowResStatus
(
SWindowResInfo
*
pWindowResInfo
,
int32_t
slot
)
{
assert
(
slot
>=
0
&&
slot
<
pWindowResInfo
->
size
);
assert
(
slot
>=
0
&&
slot
<
pWindowResInfo
->
size
);
return
&
pWindowResInfo
->
pResult
[
slot
].
status
;
return
pWindowResInfo
->
pResult
[
slot
].
closed
;
}
}
static
FORCE_INLINE
int32_t
getForwardStepsInBlock
(
int32_t
numOfRows
,
__block_search_fn_t
searchFn
,
TSKEY
ekey
,
int16_t
pos
,
static
FORCE_INLINE
int32_t
getForwardStepsInBlock
(
int32_t
numOfRows
,
__block_search_fn_t
searchFn
,
TSKEY
ekey
,
int16_t
pos
,
...
@@ -602,7 +605,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe
...
@@ -602,7 +605,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe
// no qualified results exist, abort check
// no qualified results exist, abort check
int32_t
numOfClosed
=
0
;
int32_t
numOfClosed
=
0
;
if
(
pWindowResInfo
->
size
==
0
)
{
if
(
pWindowResInfo
->
size
==
0
)
{
return
pWindowResInfo
->
size
;
return
pWindowResInfo
->
size
;
}
}
...
@@ -620,16 +623,17 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe
...
@@ -620,16 +623,17 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe
for
(
i
=
0
;
i
<
pWindowResInfo
->
size
;
++
i
)
{
for
(
i
=
0
;
i
<
pWindowResInfo
->
size
;
++
i
)
{
SWindowResult
*
pResult
=
&
pWindowResInfo
->
pResult
[
i
];
SWindowResult
*
pResult
=
&
pWindowResInfo
->
pResult
[
i
];
if
(
pResult
->
status
.
closed
)
{
if
(
pResult
->
closed
)
{
numOfClosed
+=
1
;
numOfClosed
+=
1
;
continue
;
continue
;
}
}
if
((
pResult
->
window
.
ekey
<=
lastKey
&&
QUERY_IS_ASC_QUERY
(
pQuery
))
||
TSKEY
ekey
=
pResult
->
skey
+
pWindowResInfo
->
interval
;
(
pResult
->
window
.
skey
>=
lastKey
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)))
{
if
((
ekey
<=
lastKey
&&
QUERY_IS_ASC_QUERY
(
pQuery
))
||
(
pResult
->
skey
>=
lastKey
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)))
{
closeTimeWindow
(
pWindowResInfo
,
i
);
closeTimeWindow
(
pWindowResInfo
,
i
);
}
else
{
}
else
{
skey
=
pResult
->
window
.
skey
;
skey
=
pResult
->
skey
;
break
;
break
;
}
}
}
}
...
@@ -642,26 +646,26 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe
...
@@ -642,26 +646,26 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe
pWindowResInfo
->
curIndex
=
i
;
pWindowResInfo
->
curIndex
=
i
;
}
}
pWindowResInfo
->
prevSKey
=
pWindowResInfo
->
pResult
[
pWindowResInfo
->
curIndex
].
window
.
skey
;
pWindowResInfo
->
prevSKey
=
pWindowResInfo
->
pResult
[
pWindowResInfo
->
curIndex
].
skey
;
// the number of completed slots are larger than the threshold, return current generated results to client.
// the number of completed slots are larger than the threshold, return current generated results to client.
if
(
numOfClosed
>
pWindowResInfo
->
threshold
)
{
if
(
numOfClosed
>
pWindowResInfo
->
threshold
)
{
qDebug
(
"QInfo:%p total result window:%d closed:%d, reached the output threshold %d, return"
,
qDebug
(
"QInfo:%p total result window:%d closed:%d, reached the output threshold %d, return"
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
pWindowResInfo
->
size
,
numOfClosed
,
pQuery
->
rec
.
threshold
);
GET_QINFO_ADDR
(
pRuntimeEnv
),
pWindowResInfo
->
size
,
numOfClosed
,
pQuery
->
rec
.
threshold
);
setQueryStatus
(
pQuery
,
QUERY_RESBUF_FULL
);
setQueryStatus
(
pQuery
,
QUERY_RESBUF_FULL
);
}
else
{
}
else
{
qDebug
(
"QInfo:%p total result window:%d already closed:%d"
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
pWindowResInfo
->
size
,
qDebug
(
"QInfo:%p total result window:%d already closed:%d"
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
pWindowResInfo
->
size
,
numOfClosed
);
numOfClosed
);
}
}
}
}
// output has reached the limitation, set query completed
// output has reached the limitation, set query completed
if
(
pQuery
->
limit
.
limit
>
0
&&
(
pQuery
->
limit
.
limit
+
pQuery
->
limit
.
offset
)
<=
numOfClosed
&&
if
(
pQuery
->
limit
.
limit
>
0
&&
(
pQuery
->
limit
.
limit
+
pQuery
->
limit
.
offset
)
<=
numOfClosed
&&
pRuntimeEnv
->
scanFlag
==
MASTER_SCAN
)
{
pRuntimeEnv
->
scanFlag
==
MASTER_SCAN
)
{
setQueryStatus
(
pQuery
,
QUERY_COMPLETED
);
setQueryStatus
(
pQuery
,
QUERY_COMPLETED
);
}
}
assert
(
pWindowResInfo
->
prevSKey
!=
TSKEY_INITIAL_VAL
);
assert
(
pWindowResInfo
->
prevSKey
!=
TSKEY_INITIAL_VAL
);
return
numOfClosed
;
return
numOfClosed
;
}
}
...
@@ -675,7 +679,7 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
...
@@ -675,7 +679,7 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
order
);
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
order
);
STableQueryInfo
*
item
=
pQuery
->
current
;
STableQueryInfo
*
item
=
pQuery
->
current
;
if
(
QUERY_IS_ASC_QUERY
(
pQuery
))
{
if
(
QUERY_IS_ASC_QUERY
(
pQuery
))
{
if
(
ekey
<
pDataBlockInfo
->
window
.
ekey
)
{
if
(
ekey
<
pDataBlockInfo
->
window
.
ekey
)
{
num
=
getForwardStepsInBlock
(
pDataBlockInfo
->
rows
,
searchFn
,
ekey
,
startPos
,
order
,
pPrimaryColumn
);
num
=
getForwardStepsInBlock
(
pDataBlockInfo
->
rows
,
searchFn
,
ekey
,
startPos
,
order
,
pPrimaryColumn
);
...
@@ -706,12 +710,12 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
...
@@ -706,12 +710,12 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
return
num
;
return
num
;
}
}
static
void
doBlockwiseApplyFunctions
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowStatus
*
pStatus
,
STimeWindow
*
pWin
,
static
void
doBlockwiseApplyFunctions
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
bool
closed
,
STimeWindow
*
pWin
,
int32_t
offset
,
int32_t
forwardStep
,
TSKEY
*
tsBuf
,
int32_t
numOfTotal
)
{
int32_t
offset
,
int32_t
forwardStep
,
TSKEY
*
tsBuf
,
int32_t
numOfTotal
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
if
(
IS_MASTER_SCAN
(
pRuntimeEnv
)
||
pStatus
->
closed
)
{
if
(
IS_MASTER_SCAN
(
pRuntimeEnv
)
||
closed
)
{
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
base
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
base
.
functionId
;
...
@@ -735,12 +739,11 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat
...
@@ -735,12 +739,11 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat
}
}
}
}
static
void
doRowwiseApplyFunctions
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowStatus
*
pStatus
,
STimeWindow
*
pWin
,
static
void
doRowwiseApplyFunctions
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
bool
closed
,
STimeWindow
*
pWin
,
int32_t
offset
)
{
int32_t
offset
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
if
(
IS_MASTER_SCAN
(
pRuntimeEnv
)
||
pStatus
->
closed
)
{
if
(
IS_MASTER_SCAN
(
pRuntimeEnv
)
||
closed
)
{
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutput
;
++
k
)
{
pCtx
[
k
].
nStartQueryTimestamp
=
pWin
->
skey
;
pCtx
[
k
].
nStartQueryTimestamp
=
pWin
->
skey
;
...
@@ -825,14 +828,14 @@ static FORCE_INLINE TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow)
...
@@ -825,14 +828,14 @@ static FORCE_INLINE TSKEY reviseWindowEkey(SQuery *pQuery, STimeWindow *pWindow)
//todo binary search
//todo binary search
static
void
*
getDataBlockImpl
(
SArray
*
pDataBlock
,
int32_t
colId
)
{
static
void
*
getDataBlockImpl
(
SArray
*
pDataBlock
,
int32_t
colId
)
{
int32_t
numOfCols
=
(
int32_t
)
taosArrayGetSize
(
pDataBlock
);
int32_t
numOfCols
=
(
int32_t
)
taosArrayGetSize
(
pDataBlock
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColumnInfoData
*
p
=
taosArrayGet
(
pDataBlock
,
i
);
SColumnInfoData
*
p
=
taosArrayGet
(
pDataBlock
,
i
);
if
(
colId
==
p
->
info
.
colId
)
{
if
(
colId
==
p
->
info
.
colId
)
{
return
p
->
pData
;
return
p
->
pData
;
}
}
}
}
return
NULL
;
return
NULL
;
}
}
...
@@ -961,7 +964,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
...
@@ -961,7 +964,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
TSKEY
ekey
=
reviseWindowEkey
(
pQuery
,
&
win
);
TSKEY
ekey
=
reviseWindowEkey
(
pQuery
,
&
win
);
forwardStep
=
getNumOfRowsInTimeWindow
(
pQuery
,
pDataBlockInfo
,
tsCols
,
pQuery
->
pos
,
ekey
,
searchFn
,
true
);
forwardStep
=
getNumOfRowsInTimeWindow
(
pQuery
,
pDataBlockInfo
,
tsCols
,
pQuery
->
pos
,
ekey
,
searchFn
,
true
);
SWindowStatus
*
pStatus
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindow
(
pWindowResInfo
));
bool
pStatus
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindowIndex
(
pWindowResInfo
));
doBlockwiseApplyFunctions
(
pRuntimeEnv
,
pStatus
,
&
win
,
startPos
,
forwardStep
,
tsCols
,
pDataBlockInfo
->
rows
);
doBlockwiseApplyFunctions
(
pRuntimeEnv
,
pStatus
,
&
win
,
startPos
,
forwardStep
,
tsCols
,
pDataBlockInfo
->
rows
);
}
}
...
@@ -990,8 +993,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
...
@@ -990,8 +993,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
TSKEY
ekey
=
reviseWindowEkey
(
pQuery
,
&
nextWin
);
TSKEY
ekey
=
reviseWindowEkey
(
pQuery
,
&
nextWin
);
forwardStep
=
getNumOfRowsInTimeWindow
(
pQuery
,
pDataBlockInfo
,
tsCols
,
startPos
,
ekey
,
searchFn
,
true
);
forwardStep
=
getNumOfRowsInTimeWindow
(
pQuery
,
pDataBlockInfo
,
tsCols
,
startPos
,
ekey
,
searchFn
,
true
);
SWindowStatus
*
pStatus
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindow
(
pWindowResInfo
));
bool
closed
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindowIndex
(
pWindowResInfo
));
doBlockwiseApplyFunctions
(
pRuntimeEnv
,
pStatus
,
&
nextWin
,
startPos
,
forwardStep
,
tsCols
,
pDataBlockInfo
->
rows
);
doBlockwiseApplyFunctions
(
pRuntimeEnv
,
closed
,
&
nextWin
,
startPos
,
forwardStep
,
tsCols
,
pDataBlockInfo
->
rows
);
}
}
pWindowResInfo
->
curIndex
=
index
;
pWindowResInfo
->
curIndex
=
index
;
...
@@ -1044,8 +1047,8 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
...
@@ -1044,8 +1047,8 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
return
-
1
;
return
-
1
;
}
}
pWindowRes
->
window
.
skey
=
v
;
pWindowRes
->
skey
=
v
;
pWindowRes
->
window
.
ekey
=
v
;
assert
(
pRuntimeEnv
->
windowResInfo
.
interval
==
0
)
;
if
(
pWindowRes
->
pos
.
pageId
==
-
1
)
{
if
(
pWindowRes
->
pos
.
pageId
==
-
1
)
{
int32_t
ret
=
addNewWindowResultBuf
(
pWindowRes
,
pResultBuf
,
GROUPRESULTID
,
pRuntimeEnv
->
numOfRowsPerPage
);
int32_t
ret
=
addNewWindowResultBuf
(
pWindowRes
,
pResultBuf
,
GROUPRESULTID
,
pRuntimeEnv
->
numOfRowsPerPage
);
...
@@ -1144,7 +1147,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
...
@@ -1144,7 +1147,7 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
if
(
functionId
==
TSDB_FUNC_TS
)
{
if
(
functionId
==
TSDB_FUNC_TS
)
{
return
true
;
return
true
;
}
}
if
(
pResInfo
->
complete
||
functionId
==
TSDB_FUNC_TAG_DUMMY
||
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
if
(
pResInfo
->
complete
||
functionId
==
TSDB_FUNC_TAG_DUMMY
||
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
return
false
;
return
false
;
}
}
...
@@ -1251,8 +1254,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
...
@@ -1251,8 +1254,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
continue
;
continue
;
}
}
SWindowStatus
*
pStatus
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindow
(
pWindowResInfo
));
bool
closed
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindowIndex
(
pWindowResInfo
));
doRowwiseApplyFunctions
(
pRuntimeEnv
,
pStatus
,
&
win
,
offset
);
doRowwiseApplyFunctions
(
pRuntimeEnv
,
closed
,
&
win
,
offset
);
STimeWindow
nextWin
=
win
;
STimeWindow
nextWin
=
win
;
int32_t
index
=
pWindowResInfo
->
curIndex
;
int32_t
index
=
pWindowResInfo
->
curIndex
;
...
@@ -1275,8 +1278,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
...
@@ -1275,8 +1278,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
}
if
(
hasTimeWindow
)
{
if
(
hasTimeWindow
)
{
pStatus
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindow
(
pWindowResInfo
));
closed
=
getTimeWindowResStatus
(
pWindowResInfo
,
curTimeWindowIndex
(
pWindowResInfo
));
doRowwiseApplyFunctions
(
pRuntimeEnv
,
pStatus
,
&
nextWin
,
offset
);
doRowwiseApplyFunctions
(
pRuntimeEnv
,
closed
,
&
nextWin
,
offset
);
}
}
}
}
...
@@ -1331,10 +1334,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
...
@@ -1331,10 +1334,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
static
int32_t
tableApplyFunctionsOnBlock
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SDataBlockInfo
*
pDataBlockInfo
,
static
int32_t
tableApplyFunctionsOnBlock
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SDataBlockInfo
*
pDataBlockInfo
,
SDataStatis
*
pStatis
,
__block_search_fn_t
searchFn
,
SArray
*
pDataBlock
)
{
SDataStatis
*
pStatis
,
__block_search_fn_t
searchFn
,
SArray
*
pDataBlock
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
STableQueryInfo
*
pTableQInfo
=
pQuery
->
current
;
STableQueryInfo
*
pTableQInfo
=
pQuery
->
current
;
SWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
SWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
if
(
pQuery
->
numOfFilterCols
>
0
||
pRuntimeEnv
->
pTSBuf
!=
NULL
||
pRuntimeEnv
->
groupbyNormalCol
)
{
if
(
pQuery
->
numOfFilterCols
>
0
||
pRuntimeEnv
->
pTSBuf
!=
NULL
||
pRuntimeEnv
->
groupbyNormalCol
)
{
rowwiseApplyFunctions
(
pRuntimeEnv
,
pStatis
,
pDataBlockInfo
,
pWindowResInfo
,
pDataBlock
);
rowwiseApplyFunctions
(
pRuntimeEnv
,
pStatis
,
pDataBlockInfo
,
pWindowResInfo
,
pDataBlock
);
}
else
{
}
else
{
...
@@ -1377,10 +1380,10 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
...
@@ -1377,10 +1380,10 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
void
setExecParams
(
SQuery
*
pQuery
,
SQLFunctionCtx
*
pCtx
,
void
*
inputData
,
TSKEY
*
tsCol
,
SDataBlockInfo
*
pBlockInfo
,
void
setExecParams
(
SQuery
*
pQuery
,
SQLFunctionCtx
*
pCtx
,
void
*
inputData
,
TSKEY
*
tsCol
,
SDataBlockInfo
*
pBlockInfo
,
SDataStatis
*
pStatis
,
void
*
param
,
int32_t
colIndex
)
{
SDataStatis
*
pStatis
,
void
*
param
,
int32_t
colIndex
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
colIndex
].
base
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
colIndex
].
base
.
functionId
;
int32_t
colId
=
pQuery
->
pSelectExpr
[
colIndex
].
base
.
colInfo
.
colId
;
int32_t
colId
=
pQuery
->
pSelectExpr
[
colIndex
].
base
.
colInfo
.
colId
;
SDataStatis
*
tpField
=
NULL
;
SDataStatis
*
tpField
=
NULL
;
pCtx
->
hasNull
=
hasNullValue
(
&
pQuery
->
pSelectExpr
[
colIndex
].
base
.
colInfo
,
pStatis
,
&
tpField
);
pCtx
->
hasNull
=
hasNullValue
(
&
pQuery
->
pSelectExpr
[
colIndex
].
base
.
colInfo
,
pStatis
,
&
tpField
);
pCtx
->
aInputElemBuf
=
inputData
;
pCtx
->
aInputElemBuf
=
inputData
;
...
@@ -1436,7 +1439,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
...
@@ -1436,7 +1439,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
pInterpInfo
->
type
=
(
int8_t
)
pQuery
->
fillType
;
pInterpInfo
->
type
=
(
int8_t
)
pQuery
->
fillType
;
pInterpInfo
->
ts
=
pQuery
->
window
.
skey
;
pInterpInfo
->
ts
=
pQuery
->
window
.
skey
;
pInterpInfo
->
primaryCol
=
(
colId
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
);
pInterpInfo
->
primaryCol
=
(
colId
==
PRIMARYKEY_TIMESTAMP_COL_INDEX
);
if
(
pQuery
->
fillVal
!=
NULL
)
{
if
(
pQuery
->
fillVal
!=
NULL
)
{
if
(
isNull
((
const
char
*
)
&
pQuery
->
fillVal
[
colIndex
],
pCtx
->
inputType
))
{
if
(
isNull
((
const
char
*
)
&
pQuery
->
fillVal
[
colIndex
],
pCtx
->
inputType
))
{
pCtx
->
param
[
1
].
nType
=
TSDB_DATA_TYPE_NULL
;
pCtx
->
param
[
1
].
nType
=
TSDB_DATA_TYPE_NULL
;
...
@@ -1469,13 +1472,13 @@ static void setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *p
...
@@ -1469,13 +1472,13 @@ static void setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *p
if
(
isSelectivityWithTagsQuery
(
pQuery
))
{
if
(
isSelectivityWithTagsQuery
(
pQuery
))
{
int32_t
num
=
0
;
int32_t
num
=
0
;
int16_t
tagLen
=
0
;
int16_t
tagLen
=
0
;
SQLFunctionCtx
*
p
=
NULL
;
SQLFunctionCtx
*
p
=
NULL
;
SQLFunctionCtx
**
pTagCtx
=
calloc
(
pQuery
->
numOfOutput
,
POINTER_BYTES
);
SQLFunctionCtx
**
pTagCtx
=
calloc
(
pQuery
->
numOfOutput
,
POINTER_BYTES
);
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
SSqlFuncMsg
*
pSqlFuncMsg
=
&
pQuery
->
pSelectExpr
[
i
].
base
;
SSqlFuncMsg
*
pSqlFuncMsg
=
&
pQuery
->
pSelectExpr
[
i
].
base
;
if
(
pSqlFuncMsg
->
functionId
==
TSDB_FUNC_TAG_DUMMY
||
pSqlFuncMsg
->
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
if
(
pSqlFuncMsg
->
functionId
==
TSDB_FUNC_TAG_DUMMY
||
pSqlFuncMsg
->
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
tagLen
+=
pCtx
[
i
].
outputBytes
;
tagLen
+=
pCtx
[
i
].
outputBytes
;
pTagCtx
[
num
++
]
=
&
pCtx
[
i
];
pTagCtx
[
num
++
]
=
&
pCtx
[
i
];
...
@@ -1494,7 +1497,7 @@ static void setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *p
...
@@ -1494,7 +1497,7 @@ static void setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *p
p
->
tagInfo
.
numOfTagCols
=
num
;
p
->
tagInfo
.
numOfTagCols
=
num
;
p
->
tagInfo
.
tagsLen
=
tagLen
;
p
->
tagInfo
.
tagsLen
=
tagLen
;
}
else
{
}
else
{
taosTFree
(
pTagCtx
);
taosTFree
(
pTagCtx
);
}
}
}
}
}
}
...
@@ -1547,7 +1550,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
...
@@ -1547,7 +1550,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
pCtx
->
inputBytes
=
pQuery
->
colList
[
index
].
bytes
;
pCtx
->
inputBytes
=
pQuery
->
colList
[
index
].
bytes
;
pCtx
->
inputType
=
pQuery
->
colList
[
index
].
type
;
pCtx
->
inputType
=
pQuery
->
colList
[
index
].
type
;
}
}
assert
(
isValidDataType
(
pCtx
->
inputType
));
assert
(
isValidDataType
(
pCtx
->
inputType
));
pCtx
->
ptsOutputBuf
=
NULL
;
pCtx
->
ptsOutputBuf
=
NULL
;
...
@@ -1809,7 +1812,7 @@ bool colIdCheck(SQuery *pQuery) {
...
@@ -1809,7 +1812,7 @@ bool colIdCheck(SQuery *pQuery) {
return
false
;
return
false
;
}
}
}
}
return
true
;
return
true
;
}
}
...
@@ -1969,7 +1972,7 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i
...
@@ -1969,7 +1972,7 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i
}
}
pRuntimeEnv
->
numOfRowsPerPage
=
((
*
ps
)
-
sizeof
(
tFilePage
))
/
(
*
rowsize
);
pRuntimeEnv
->
numOfRowsPerPage
=
((
*
ps
)
-
sizeof
(
tFilePage
))
/
(
*
rowsize
);
assert
(
pRuntimeEnv
->
numOfRowsPerPage
<=
MAX_ROWS_PER_RESBUF_PAGE
);
}
}
#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR)
#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR)
...
@@ -2123,21 +2126,21 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
...
@@ -2123,21 +2126,21 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
if
(
tsdbRetrieveDataBlockStatisInfo
(
pQueryHandle
,
pStatis
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
tsdbRetrieveDataBlockStatisInfo
(
pQueryHandle
,
pStatis
)
!=
TSDB_CODE_SUCCESS
)
{
// return DISK_DATA_LOAD_FAILED;
// return DISK_DATA_LOAD_FAILED;
}
}
pRuntimeEnv
->
summary
.
loadBlockStatis
+=
1
;
pRuntimeEnv
->
summary
.
loadBlockStatis
+=
1
;
if
(
*
pStatis
==
NULL
)
{
// data block statistics does not exist, load data block
if
(
*
pStatis
==
NULL
)
{
// data block statistics does not exist, load data block
*
pDataBlock
=
tsdbRetrieveDataBlock
(
pQueryHandle
,
NULL
);
*
pDataBlock
=
tsdbRetrieveDataBlock
(
pQueryHandle
,
NULL
);
pRuntimeEnv
->
summary
.
totalCheckedRows
+=
pBlockInfo
->
rows
;
pRuntimeEnv
->
summary
.
totalCheckedRows
+=
pBlockInfo
->
rows
;
}
}
}
else
{
}
else
{
assert
(
status
==
BLK_DATA_ALL_NEEDED
);
assert
(
status
==
BLK_DATA_ALL_NEEDED
);
// load the data block statistics to perform further filter
// load the data block statistics to perform further filter
pRuntimeEnv
->
summary
.
loadBlockStatis
+=
1
;
pRuntimeEnv
->
summary
.
loadBlockStatis
+=
1
;
if
(
tsdbRetrieveDataBlockStatisInfo
(
pQueryHandle
,
pStatis
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
tsdbRetrieveDataBlockStatisInfo
(
pQueryHandle
,
pStatis
)
!=
TSDB_CODE_SUCCESS
)
{
}
}
if
(
!
needToLoadDataBlock
(
pRuntimeEnv
,
*
pStatis
,
pRuntimeEnv
->
pCtx
,
pBlockInfo
->
rows
))
{
if
(
!
needToLoadDataBlock
(
pRuntimeEnv
,
*
pStatis
,
pRuntimeEnv
->
pCtx
,
pBlockInfo
->
rows
))
{
// current block has been discard due to filter applied
// current block has been discard due to filter applied
pRuntimeEnv
->
summary
.
discardBlocks
+=
1
;
pRuntimeEnv
->
summary
.
discardBlocks
+=
1
;
...
@@ -2145,7 +2148,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
...
@@ -2145,7 +2148,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
return
BLK_DATA_DISCARD
;
return
BLK_DATA_DISCARD
;
}
}
pRuntimeEnv
->
summary
.
totalCheckedRows
+=
pBlockInfo
->
rows
;
pRuntimeEnv
->
summary
.
totalCheckedRows
+=
pBlockInfo
->
rows
;
pRuntimeEnv
->
summary
.
loadBlocks
+=
1
;
pRuntimeEnv
->
summary
.
loadBlocks
+=
1
;
*
pDataBlock
=
tsdbRetrieveDataBlock
(
pQueryHandle
,
NULL
);
*
pDataBlock
=
tsdbRetrieveDataBlock
(
pQueryHandle
,
NULL
);
...
@@ -2250,11 +2253,11 @@ static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pB
...
@@ -2250,11 +2253,11 @@ static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pB
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
!
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
&&
!
pRuntimeEnv
->
groupbyNormalCol
&&
!
isFixedOutputQuery
(
pRuntimeEnv
)
&&
!
isTSCompQuery
(
pQuery
))
{
if
(
!
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
&&
!
pRuntimeEnv
->
groupbyNormalCol
&&
!
isFixedOutputQuery
(
pRuntimeEnv
)
&&
!
isTSCompQuery
(
pQuery
))
{
SResultRec
*
pRec
=
&
pQuery
->
rec
;
SResultRec
*
pRec
=
&
pQuery
->
rec
;
if
(
pQuery
->
rec
.
capacity
-
pQuery
->
rec
.
rows
<
pBlockInfo
->
rows
)
{
if
(
pQuery
->
rec
.
capacity
-
pQuery
->
rec
.
rows
<
pBlockInfo
->
rows
)
{
int32_t
remain
=
(
int32_t
)(
pRec
->
capacity
-
pRec
->
rows
);
int32_t
remain
=
(
int32_t
)(
pRec
->
capacity
-
pRec
->
rows
);
int32_t
newSize
=
(
int32_t
)(
pRec
->
capacity
+
(
pBlockInfo
->
rows
-
remain
));
int32_t
newSize
=
(
int32_t
)(
pRec
->
capacity
+
(
pBlockInfo
->
rows
-
remain
));
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
bytes
=
pQuery
->
pSelectExpr
[
i
].
bytes
;
int32_t
bytes
=
pQuery
->
pSelectExpr
[
i
].
bytes
;
assert
(
bytes
>
0
&&
newSize
>
0
);
assert
(
bytes
>
0
&&
newSize
>
0
);
...
@@ -2278,7 +2281,7 @@ static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pB
...
@@ -2278,7 +2281,7 @@ static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pB
qDebug
(
"QInfo:%p realloc output buffer, new size: %d rows, old:%"
PRId64
", remain:%"
PRId64
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
qDebug
(
"QInfo:%p realloc output buffer, new size: %d rows, old:%"
PRId64
", remain:%"
PRId64
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
newSize
,
pRec
->
capacity
,
newSize
-
pRec
->
rows
);
newSize
,
pRec
->
capacity
,
newSize
-
pRec
->
rows
);
pRec
->
capacity
=
newSize
;
pRec
->
capacity
=
newSize
;
}
}
}
}
...
@@ -2388,7 +2391,7 @@ static void doSetTagValueInParam(void *tsdb, void* pTable, int32_t tagColId, tVa
...
@@ -2388,7 +2391,7 @@ static void doSetTagValueInParam(void *tsdb, void* pTable, int32_t tagColId, tVa
if
(
tagColId
==
TSDB_TBNAME_COLUMN_INDEX
)
{
if
(
tagColId
==
TSDB_TBNAME_COLUMN_INDEX
)
{
char
*
val
=
tsdbGetTableName
(
pTable
);
char
*
val
=
tsdbGetTableName
(
pTable
);
assert
(
val
!=
NULL
);
assert
(
val
!=
NULL
);
tVariantCreateFromBinary
(
tag
,
varDataVal
(
val
),
varDataLen
(
val
),
TSDB_DATA_TYPE_BINARY
);
tVariantCreateFromBinary
(
tag
,
varDataVal
(
val
),
varDataLen
(
val
),
TSDB_DATA_TYPE_BINARY
);
}
else
{
}
else
{
char
*
val
=
tsdbGetTableTagVal
(
pTable
,
tagColId
,
type
,
bytes
);
char
*
val
=
tsdbGetTableTagVal
(
pTable
,
tagColId
,
type
,
bytes
);
...
@@ -2396,7 +2399,7 @@ static void doSetTagValueInParam(void *tsdb, void* pTable, int32_t tagColId, tVa
...
@@ -2396,7 +2399,7 @@ static void doSetTagValueInParam(void *tsdb, void* pTable, int32_t tagColId, tVa
tag
->
nType
=
TSDB_DATA_TYPE_NULL
;
tag
->
nType
=
TSDB_DATA_TYPE_NULL
;
return
;
return
;
}
}
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
)
{
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
)
{
if
(
isNull
(
val
,
type
))
{
if
(
isNull
(
val
,
type
))
{
tag
->
nType
=
TSDB_DATA_TYPE_NULL
;
tag
->
nType
=
TSDB_DATA_TYPE_NULL
;
...
@@ -2443,7 +2446,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) {
...
@@ -2443,7 +2446,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) {
// set tag value, by which the results are aggregated.
// set tag value, by which the results are aggregated.
for
(
int32_t
idx
=
0
;
idx
<
pQuery
->
numOfOutput
;
++
idx
)
{
for
(
int32_t
idx
=
0
;
idx
<
pQuery
->
numOfOutput
;
++
idx
)
{
SExprInfo
*
pLocalExprInfo
=
&
pQuery
->
pSelectExpr
[
idx
];
SExprInfo
*
pLocalExprInfo
=
&
pQuery
->
pSelectExpr
[
idx
];
// ts_comp column required the tag value for join filter
// ts_comp column required the tag value for join filter
if
(
!
TSDB_COL_IS_TAG
(
pLocalExprInfo
->
base
.
colInfo
.
flag
))
{
if
(
!
TSDB_COL_IS_TAG
(
pLocalExprInfo
->
base
.
colInfo
.
flag
))
{
continue
;
continue
;
...
@@ -2493,14 +2496,14 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
...
@@ -2493,14 +2496,14 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
// in case of tag column, the tag information should be extracted from input buffer
// in case of tag column, the tag information should be extracted from input buffer
if
(
functionId
==
TSDB_FUNC_TAG_DUMMY
||
functionId
==
TSDB_FUNC_TAG
)
{
if
(
functionId
==
TSDB_FUNC_TAG_DUMMY
||
functionId
==
TSDB_FUNC_TAG
)
{
tVariantDestroy
(
&
pCtx
[
i
].
tag
);
tVariantDestroy
(
&
pCtx
[
i
].
tag
);
int32_t
type
=
pCtx
[
i
].
outputType
;
int32_t
type
=
pCtx
[
i
].
outputType
;
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
)
{
if
(
type
==
TSDB_DATA_TYPE_BINARY
||
type
==
TSDB_DATA_TYPE_NCHAR
)
{
tVariantCreateFromBinary
(
&
pCtx
[
i
].
tag
,
varDataVal
(
pCtx
[
i
].
aInputElemBuf
),
varDataLen
(
pCtx
[
i
].
aInputElemBuf
),
type
);
tVariantCreateFromBinary
(
&
pCtx
[
i
].
tag
,
varDataVal
(
pCtx
[
i
].
aInputElemBuf
),
varDataLen
(
pCtx
[
i
].
aInputElemBuf
),
type
);
}
else
{
}
else
{
tVariantCreateFromBinary
(
&
pCtx
[
i
].
tag
,
pCtx
[
i
].
aInputElemBuf
,
pCtx
[
i
].
inputBytes
,
pCtx
[
i
].
inputType
);
tVariantCreateFromBinary
(
&
pCtx
[
i
].
tag
,
pCtx
[
i
].
aInputElemBuf
,
pCtx
[
i
].
inputBytes
,
pCtx
[
i
].
inputType
);
}
}
}
}
}
}
...
@@ -2590,7 +2593,7 @@ void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntim
...
@@ -2590,7 +2593,7 @@ void UNUSED_FUNC displayInterResult(tFilePage **pdata, SQueryRuntimeEnv* pRuntim
for
(
int32_t
j
=
0
;
j
<
numOfRows
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
numOfRows
;
++
j
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
switch
(
pQuery
->
pSelectExpr
[
i
].
type
)
{
switch
(
pQuery
->
pSelectExpr
[
i
].
type
)
{
case
TSDB_DATA_TYPE_BINARY
:
{
case
TSDB_DATA_TYPE_BINARY
:
{
int32_t
type
=
pQuery
->
pSelectExpr
[
i
].
type
;
int32_t
type
=
pQuery
->
pSelectExpr
[
i
].
type
;
...
@@ -2839,7 +2842,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
...
@@ -2839,7 +2842,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
char
*
b
=
getPosInResultPage
(
pRuntimeEnv
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
,
pWindowRes
,
page
);
char
*
b
=
getPosInResultPage
(
pRuntimeEnv
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
,
pWindowRes
,
page
);
TSKEY
ts
=
GET_INT64_VAL
(
b
);
TSKEY
ts
=
GET_INT64_VAL
(
b
);
assert
(
ts
==
pWindowRes
->
window
.
skey
);
assert
(
ts
==
pWindowRes
->
skey
);
int64_t
num
=
getNumOfResultWindowRes
(
pQuery
,
pWindowRes
);
int64_t
num
=
getNumOfResultWindowRes
(
pQuery
,
pWindowRes
);
if
(
num
<=
0
)
{
if
(
num
<=
0
)
{
cs
.
position
[
pos
]
+=
1
;
cs
.
position
[
pos
]
+=
1
;
...
@@ -2979,10 +2982,10 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *
...
@@ -2979,10 +2982,10 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *
if
(
pTableQueryInfo
==
NULL
)
{
if
(
pTableQueryInfo
==
NULL
)
{
return
;
return
;
}
}
// order has changed already
// order has changed already
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
// TODO validate the assertion
// TODO validate the assertion
// if (!QUERY_IS_ASC_QUERY(pQuery)) {
// if (!QUERY_IS_ASC_QUERY(pQuery)) {
// assert(pTableQueryInfo->win.ekey >= pTableQueryInfo->lastKey + step);
// assert(pTableQueryInfo->win.ekey >= pTableQueryInfo->lastKey + step);
...
@@ -2998,7 +3001,7 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *
...
@@ -2998,7 +3001,7 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *
SWAP
(
pTableQueryInfo
->
win
.
skey
,
pTableQueryInfo
->
win
.
ekey
,
TSKEY
);
SWAP
(
pTableQueryInfo
->
win
.
skey
,
pTableQueryInfo
->
win
.
ekey
,
TSKEY
);
pTableQueryInfo
->
lastKey
=
pTableQueryInfo
->
win
.
skey
;
pTableQueryInfo
->
lastKey
=
pTableQueryInfo
->
win
.
skey
;
SWITCH_ORDER
(
pTableQueryInfo
->
cur
.
order
);
SWITCH_ORDER
(
pTableQueryInfo
->
cur
.
order
);
pTableQueryInfo
->
cur
.
vgroupIndex
=
-
1
;
pTableQueryInfo
->
cur
.
vgroupIndex
=
-
1
;
...
@@ -3008,10 +3011,10 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *
...
@@ -3008,10 +3011,10 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *
static
void
disableFuncInReverseScanImpl
(
SQInfo
*
pQInfo
,
SWindowResInfo
*
pWindowResInfo
,
int32_t
order
)
{
static
void
disableFuncInReverseScanImpl
(
SQInfo
*
pQInfo
,
SWindowResInfo
*
pWindowResInfo
,
int32_t
order
)
{
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
size
;
++
i
)
{
SWindowStatus
*
pStatus
=
getTimeWindowResStatus
(
pWindowResInfo
,
i
);
bool
closed
=
getTimeWindowResStatus
(
pWindowResInfo
,
i
);
if
(
!
pStatus
->
closed
)
{
if
(
!
closed
)
{
continue
;
continue
;
}
}
...
@@ -3185,7 +3188,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
...
@@ -3185,7 +3188,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
if
(
pQuery
->
rec
.
rows
<=
pQuery
->
limit
.
offset
)
{
if
(
pQuery
->
rec
.
rows
<=
pQuery
->
limit
.
offset
)
{
qDebug
(
"QInfo:%p skip rows:%"
PRId64
", new offset:%"
PRIu64
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
pQuery
->
rec
.
rows
,
qDebug
(
"QInfo:%p skip rows:%"
PRId64
", new offset:%"
PRIu64
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
pQuery
->
rec
.
rows
,
pQuery
->
limit
.
offset
-
pQuery
->
rec
.
rows
);
pQuery
->
limit
.
offset
-
pQuery
->
rec
.
rows
);
pQuery
->
limit
.
offset
-=
pQuery
->
rec
.
rows
;
pQuery
->
limit
.
offset
-=
pQuery
->
rec
.
rows
;
pQuery
->
rec
.
rows
=
0
;
pQuery
->
rec
.
rows
=
0
;
...
@@ -3197,14 +3200,14 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
...
@@ -3197,14 +3200,14 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
int64_t
numOfSkip
=
pQuery
->
limit
.
offset
;
int64_t
numOfSkip
=
pQuery
->
limit
.
offset
;
pQuery
->
rec
.
rows
-=
numOfSkip
;
pQuery
->
rec
.
rows
-=
numOfSkip
;
pQuery
->
limit
.
offset
=
0
;
pQuery
->
limit
.
offset
=
0
;
qDebug
(
"QInfo:%p skip row:%"
PRId64
", new offset:%d, numOfRows remain:%"
PRIu64
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
numOfSkip
,
qDebug
(
"QInfo:%p skip row:%"
PRId64
", new offset:%d, numOfRows remain:%"
PRIu64
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
numOfSkip
,
0
,
pQuery
->
rec
.
rows
);
0
,
pQuery
->
rec
.
rows
);
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
base
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
base
.
functionId
;
int32_t
bytes
=
pRuntimeEnv
->
pCtx
[
i
].
outputBytes
;
int32_t
bytes
=
pRuntimeEnv
->
pCtx
[
i
].
outputBytes
;
memmove
(
pQuery
->
sdata
[
i
]
->
data
,
(
char
*
)
pQuery
->
sdata
[
i
]
->
data
+
bytes
*
numOfSkip
,
pQuery
->
rec
.
rows
*
bytes
);
memmove
(
pQuery
->
sdata
[
i
]
->
data
,
(
char
*
)
pQuery
->
sdata
[
i
]
->
data
+
bytes
*
numOfSkip
,
pQuery
->
rec
.
rows
*
bytes
);
pRuntimeEnv
->
pCtx
[
i
].
aOutputBuf
=
((
char
*
)
pQuery
->
sdata
[
i
]
->
data
)
+
pQuery
->
rec
.
rows
*
bytes
;
pRuntimeEnv
->
pCtx
[
i
].
aOutputBuf
=
((
char
*
)
pQuery
->
sdata
[
i
]
->
data
)
+
pQuery
->
rec
.
rows
*
bytes
;
...
@@ -3237,7 +3240,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
...
@@ -3237,7 +3240,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
size
;
++
i
)
{
SWindowResult
*
pResult
=
getWindowResult
(
pWindowResInfo
,
i
);
SWindowResult
*
pResult
=
getWindowResult
(
pWindowResInfo
,
i
);
if
(
!
pResult
->
status
.
closed
)
{
if
(
!
pResult
->
closed
)
{
continue
;
continue
;
}
}
...
@@ -3275,10 +3278,10 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
...
@@ -3275,10 +3278,10 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
static
SQueryStatusInfo
getQueryStatusInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
TSKEY
start
)
{
static
SQueryStatusInfo
getQueryStatusInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
TSKEY
start
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
assert
((
start
<=
pTableQueryInfo
->
lastKey
&&
QUERY_IS_ASC_QUERY
(
pQuery
))
||
assert
((
start
<=
pTableQueryInfo
->
lastKey
&&
QUERY_IS_ASC_QUERY
(
pQuery
))
||
(
start
>=
pTableQueryInfo
->
lastKey
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)));
(
start
>=
pTableQueryInfo
->
lastKey
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)));
SQueryStatusInfo
info
=
{
SQueryStatusInfo
info
=
{
.
status
=
pQuery
->
status
,
.
status
=
pQuery
->
status
,
.
windowIndex
=
pRuntimeEnv
->
windowResInfo
.
curIndex
,
.
windowIndex
=
pRuntimeEnv
->
windowResInfo
.
curIndex
,
...
@@ -3359,7 +3362,7 @@ static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus
...
@@ -3359,7 +3362,7 @@ static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during reverse scan
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during reverse scan
pTableQueryInfo
->
lastKey
=
pStatus
->
lastKey
;
pTableQueryInfo
->
lastKey
=
pStatus
->
lastKey
;
pQuery
->
status
=
pStatus
->
status
;
pQuery
->
status
=
pStatus
->
status
;
pTableQueryInfo
->
win
=
pStatus
->
w
;
pTableQueryInfo
->
win
=
pStatus
->
w
;
pQuery
->
window
=
pTableQueryInfo
->
win
;
pQuery
->
window
=
pTableQueryInfo
->
win
;
}
}
...
@@ -3375,7 +3378,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
...
@@ -3375,7 +3378,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
SQInfo
*
pQInfo
=
(
SQInfo
*
)
GET_QINFO_ADDR
(
pRuntimeEnv
);
SQInfo
*
pQInfo
=
(
SQInfo
*
)
GET_QINFO_ADDR
(
pRuntimeEnv
);
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
setQueryStatus
(
pQuery
,
QUERY_NOT_COMPLETED
);
setQueryStatus
(
pQuery
,
QUERY_NOT_COMPLETED
);
// store the start query position
// store the start query position
...
@@ -3428,7 +3431,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
...
@@ -3428,7 +3431,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
pRuntimeEnv
->
windowResInfo
.
curIndex
=
qstatus
.
windowIndex
;
pRuntimeEnv
->
windowResInfo
.
curIndex
=
qstatus
.
windowIndex
;
setQueryStatus
(
pQuery
,
QUERY_NOT_COMPLETED
);
setQueryStatus
(
pQuery
,
QUERY_NOT_COMPLETED
);
pRuntimeEnv
->
scanFlag
=
REPEAT_SCAN
;
pRuntimeEnv
->
scanFlag
=
REPEAT_SCAN
;
qDebug
(
"QInfo:%p start to repeat scan data blocks due to query func required, qrange:%"
PRId64
"-%"
PRId64
,
pQInfo
,
qDebug
(
"QInfo:%p start to repeat scan data blocks due to query func required, qrange:%"
PRId64
"-%"
PRId64
,
pQInfo
,
cond
.
twindow
.
skey
,
cond
.
twindow
.
ekey
);
cond
.
twindow
.
skey
,
cond
.
twindow
.
ekey
);
...
@@ -3678,7 +3681,7 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
...
@@ -3678,7 +3681,7 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
if
(
pTableQueryInfo
->
queryRangeSet
)
{
if
(
pTableQueryInfo
->
queryRangeSet
)
{
pTableQueryInfo
->
lastKey
=
key
;
pTableQueryInfo
->
lastKey
=
key
;
}
else
{
}
else
{
...
@@ -3709,7 +3712,7 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
...
@@ -3709,7 +3712,7 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
if
(
!
QUERY_IS_ASC_QUERY
(
pQuery
))
{
if
(
!
QUERY_IS_ASC_QUERY
(
pQuery
))
{
assert
(
win
.
ekey
==
pQuery
->
window
.
ekey
);
assert
(
win
.
ekey
==
pQuery
->
window
.
ekey
);
}
}
pWindowResInfo
->
prevSKey
=
w
.
skey
;
pWindowResInfo
->
prevSKey
=
w
.
skey
;
}
}
...
@@ -3735,7 +3738,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) {
...
@@ -3735,7 +3738,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) {
*/
*/
STimeWindow
*
w
=
&
pDataBlockInfo
->
window
;
STimeWindow
*
w
=
&
pDataBlockInfo
->
window
;
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
bool
loadPrimaryTS
=
(
pTableQueryInfo
->
lastKey
>=
w
->
skey
&&
pTableQueryInfo
->
lastKey
<=
w
->
ekey
)
||
bool
loadPrimaryTS
=
(
pTableQueryInfo
->
lastKey
>=
w
->
skey
&&
pTableQueryInfo
->
lastKey
<=
w
->
ekey
)
||
(
pQuery
->
window
.
ekey
>=
w
->
skey
&&
pQuery
->
window
.
ekey
<=
w
->
ekey
)
||
requireTimestamp
(
pQuery
);
(
pQuery
->
window
.
ekey
>=
w
->
skey
&&
pQuery
->
window
.
ekey
<=
w
->
ekey
)
||
requireTimestamp
(
pQuery
);
...
@@ -3856,7 +3859,7 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc
...
@@ -3856,7 +3859,7 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc
SArray
*
pDataBlock
,
__block_search_fn_t
searchFn
)
{
SArray
*
pDataBlock
,
__block_search_fn_t
searchFn
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
SWindowResInfo
*
pWindowResInfo
=
&
pTableQueryInfo
->
windowResInfo
;
SWindowResInfo
*
pWindowResInfo
=
&
pTableQueryInfo
->
windowResInfo
;
pQuery
->
pos
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
0
:
pDataBlockInfo
->
rows
-
1
;
pQuery
->
pos
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
0
:
pDataBlockInfo
->
rows
-
1
;
...
@@ -3948,10 +3951,10 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int
...
@@ -3948,10 +3951,10 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int
SQInfo
*
pQInfo
=
GET_QINFO_ADDR
(
pRuntimeEnv
);
SQInfo
*
pQInfo
=
GET_QINFO_ADDR
(
pRuntimeEnv
);
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SFillInfo
*
pFillInfo
=
pRuntimeEnv
->
pFillInfo
;
SFillInfo
*
pFillInfo
=
pRuntimeEnv
->
pFillInfo
;
while
(
1
)
{
while
(
1
)
{
int32_t
ret
=
(
int32_t
)
taosGenerateDataBlock
(
pFillInfo
,
(
tFilePage
**
)
pQuery
->
sdata
,
(
int32_t
)
pQuery
->
rec
.
capacity
);
int32_t
ret
=
(
int32_t
)
taosGenerateDataBlock
(
pFillInfo
,
(
tFilePage
**
)
pQuery
->
sdata
,
(
int32_t
)
pQuery
->
rec
.
capacity
);
// todo apply limit output function
// todo apply limit output function
/* reached the start position of according to offset value, return immediately */
/* reached the start position of according to offset value, return immediately */
if
(
pQuery
->
limit
.
offset
==
0
)
{
if
(
pQuery
->
limit
.
offset
==
0
)
{
...
@@ -3962,7 +3965,7 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int
...
@@ -3962,7 +3965,7 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int
if
(
pQuery
->
limit
.
offset
<
ret
)
{
if
(
pQuery
->
limit
.
offset
<
ret
)
{
qDebug
(
"QInfo:%p initial numOfRows:%d, generate filled result:%d rows, offset:%"
PRId64
". Discard due to offset, remain:%"
PRId64
", new offset:%d"
,
qDebug
(
"QInfo:%p initial numOfRows:%d, generate filled result:%d rows, offset:%"
PRId64
". Discard due to offset, remain:%"
PRId64
", new offset:%d"
,
pQInfo
,
pFillInfo
->
numOfRows
,
ret
,
pQuery
->
limit
.
offset
,
ret
-
pQuery
->
limit
.
offset
,
0
);
pQInfo
,
pFillInfo
->
numOfRows
,
ret
,
pQuery
->
limit
.
offset
,
ret
-
pQuery
->
limit
.
offset
,
0
);
ret
-=
(
int32_t
)
pQuery
->
limit
.
offset
;
ret
-=
(
int32_t
)
pQuery
->
limit
.
offset
;
// todo !!!!there exactly number of interpo is not valid.
// todo !!!!there exactly number of interpo is not valid.
// todo refactor move to the beginning of buffer
// todo refactor move to the beginning of buffer
...
@@ -3970,14 +3973,14 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int
...
@@ -3970,14 +3973,14 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int
memmove
(
pDst
[
i
]
->
data
,
pDst
[
i
]
->
data
+
pQuery
->
pSelectExpr
[
i
].
bytes
*
pQuery
->
limit
.
offset
,
memmove
(
pDst
[
i
]
->
data
,
pDst
[
i
]
->
data
+
pQuery
->
pSelectExpr
[
i
].
bytes
*
pQuery
->
limit
.
offset
,
ret
*
pQuery
->
pSelectExpr
[
i
].
bytes
);
ret
*
pQuery
->
pSelectExpr
[
i
].
bytes
);
}
}
pQuery
->
limit
.
offset
=
0
;
pQuery
->
limit
.
offset
=
0
;
return
ret
;
return
ret
;
}
else
{
}
else
{
qDebug
(
"QInfo:%p initial numOfRows:%d, generate filled result:%d rows, offset:%"
PRId64
". Discard due to offset, "
qDebug
(
"QInfo:%p initial numOfRows:%d, generate filled result:%d rows, offset:%"
PRId64
". Discard due to offset, "
"remain:%d, new offset:%"
PRId64
,
pQInfo
,
pFillInfo
->
numOfRows
,
ret
,
pQuery
->
limit
.
offset
,
0
,
"remain:%d, new offset:%"
PRId64
,
pQInfo
,
pFillInfo
->
numOfRows
,
ret
,
pQuery
->
limit
.
offset
,
0
,
pQuery
->
limit
.
offset
-
ret
);
pQuery
->
limit
.
offset
-
ret
);
pQuery
->
limit
.
offset
-=
ret
;
pQuery
->
limit
.
offset
-=
ret
;
pQuery
->
rec
.
rows
=
0
;
pQuery
->
rec
.
rows
=
0
;
ret
=
0
;
ret
=
0
;
...
@@ -3998,13 +4001,14 @@ static void queryCostStatis(SQInfo *pQInfo) {
...
@@ -3998,13 +4001,14 @@ static void queryCostStatis(SQInfo *pQInfo) {
pQInfo
,
pSummary
->
elapsedTime
,
pSummary
->
totalBlocks
,
pSummary
->
loadBlockStatis
,
pQInfo
,
pSummary
->
elapsedTime
,
pSummary
->
totalBlocks
,
pSummary
->
loadBlockStatis
,
pSummary
->
loadBlocks
,
pSummary
->
totalRows
,
pSummary
->
totalCheckedRows
);
pSummary
->
loadBlocks
,
pSummary
->
totalRows
,
pSummary
->
totalCheckedRows
);
qDebug
(
"QInfo:%p :cost summary: internal size:%"
PRId64
,
pQInfo
,
pSummary
->
internalSupSize
);
qDebug
(
"QInfo:%p :cost summary: internal size:%"
PRId64
", numOfWin:%"
PRId64
,
pQInfo
,
pSummary
->
internalSupSize
,
pSummary
->
numOfTimeWindows
);
}
}
static
void
updateOffsetVal
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SDataBlockInfo
*
pBlockInfo
)
{
static
void
updateOffsetVal
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SDataBlockInfo
*
pBlockInfo
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
if
(
pQuery
->
limit
.
offset
==
pBlockInfo
->
rows
)
{
// current block will ignore completed
if
(
pQuery
->
limit
.
offset
==
pBlockInfo
->
rows
)
{
// current block will ignore completed
...
@@ -4094,7 +4098,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
...
@@ -4094,7 +4098,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
assert
(
pRuntimeEnv
->
windowResInfo
.
prevSKey
==
TSKEY_INITIAL_VAL
);
assert
(
pRuntimeEnv
->
windowResInfo
.
prevSKey
==
TSKEY_INITIAL_VAL
);
STimeWindow
w
=
TSWINDOW_INITIALIZER
;
STimeWindow
w
=
TSWINDOW_INITIALIZER
;
SWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
SWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
STableQueryInfo
*
pTableQueryInfo
=
pQuery
->
current
;
...
@@ -4143,21 +4147,21 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
...
@@ -4143,21 +4147,21 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
// set the abort info
// set the abort info
pQuery
->
pos
=
startPos
;
pQuery
->
pos
=
startPos
;
// reset the query start timestamp
// reset the query start timestamp
pTableQueryInfo
->
win
.
skey
=
((
TSKEY
*
)
pColInfoData
->
pData
)[
startPos
];
pTableQueryInfo
->
win
.
skey
=
((
TSKEY
*
)
pColInfoData
->
pData
)[
startPos
];
pQuery
->
window
.
skey
=
pTableQueryInfo
->
win
.
skey
;
pQuery
->
window
.
skey
=
pTableQueryInfo
->
win
.
skey
;
*
start
=
pTableQueryInfo
->
win
.
skey
;
*
start
=
pTableQueryInfo
->
win
.
skey
;
pWindowResInfo
->
prevSKey
=
tw
.
skey
;
pWindowResInfo
->
prevSKey
=
tw
.
skey
;
int32_t
index
=
pRuntimeEnv
->
windowResInfo
.
curIndex
;
int32_t
index
=
pRuntimeEnv
->
windowResInfo
.
curIndex
;
int32_t
numOfRes
=
tableApplyFunctionsOnBlock
(
pRuntimeEnv
,
&
blockInfo
,
NULL
,
binarySearchForKey
,
pDataBlock
);
int32_t
numOfRes
=
tableApplyFunctionsOnBlock
(
pRuntimeEnv
,
&
blockInfo
,
NULL
,
binarySearchForKey
,
pDataBlock
);
pRuntimeEnv
->
windowResInfo
.
curIndex
=
index
;
// restore the window index
pRuntimeEnv
->
windowResInfo
.
curIndex
=
index
;
// restore the window index
qDebug
(
"QInfo:%p check data block, brange:%"
PRId64
"-%"
PRId64
", numOfRows:%d, numOfRes:%d, lastKey:%"
PRId64
,
qDebug
(
"QInfo:%p check data block, brange:%"
PRId64
"-%"
PRId64
", numOfRows:%d, numOfRes:%d, lastKey:%"
PRId64
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
blockInfo
.
window
.
skey
,
blockInfo
.
window
.
ekey
,
blockInfo
.
rows
,
numOfRes
,
pQuery
->
current
->
lastKey
);
GET_QINFO_ADDR
(
pRuntimeEnv
),
blockInfo
.
window
.
skey
,
blockInfo
.
window
.
ekey
,
blockInfo
.
rows
,
numOfRes
,
pQuery
->
current
->
lastKey
);
return
true
;
return
true
;
}
else
{
// do nothing
}
else
{
// do nothing
*
start
=
tw
.
skey
;
*
start
=
tw
.
skey
;
...
@@ -4225,7 +4229,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
...
@@ -4225,7 +4229,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
if
(
!
isSTableQuery
if
(
!
isSTableQuery
&&
(
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
==
1
)
&&
(
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
==
1
)
&&
(
cond
.
order
==
TSDB_ORDER_ASC
)
&&
(
cond
.
order
==
TSDB_ORDER_ASC
)
&&
(
!
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
&&
(
!
QUERY_IS_INTERVAL_QUERY
(
pQuery
))
&&
(
!
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
&&
(
!
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
&&
(
!
isFixedOutputQuery
(
pRuntimeEnv
))
&&
(
!
isFixedOutputQuery
(
pRuntimeEnv
))
...
@@ -4250,21 +4254,21 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
...
@@ -4250,21 +4254,21 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
static
SFillColInfo
*
taosCreateFillColInfo
(
SQuery
*
pQuery
)
{
static
SFillColInfo
*
taosCreateFillColInfo
(
SQuery
*
pQuery
)
{
int32_t
numOfCols
=
pQuery
->
numOfOutput
;
int32_t
numOfCols
=
pQuery
->
numOfOutput
;
int32_t
offset
=
0
;
int32_t
offset
=
0
;
SFillColInfo
*
pFillCol
=
calloc
(
numOfCols
,
sizeof
(
SFillColInfo
));
SFillColInfo
*
pFillCol
=
calloc
(
numOfCols
,
sizeof
(
SFillColInfo
));
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SExprInfo
*
pExprInfo
=
&
pQuery
->
pSelectExpr
[
i
];
SExprInfo
*
pExprInfo
=
&
pQuery
->
pSelectExpr
[
i
];
pFillCol
[
i
].
col
.
bytes
=
pExprInfo
->
bytes
;
pFillCol
[
i
].
col
.
bytes
=
pExprInfo
->
bytes
;
pFillCol
[
i
].
col
.
type
=
(
int8_t
)
pExprInfo
->
type
;
pFillCol
[
i
].
col
.
type
=
(
int8_t
)
pExprInfo
->
type
;
pFillCol
[
i
].
col
.
offset
=
offset
;
pFillCol
[
i
].
col
.
offset
=
offset
;
pFillCol
[
i
].
flag
=
TSDB_COL_NORMAL
;
// always be ta normal column for table query
pFillCol
[
i
].
flag
=
TSDB_COL_NORMAL
;
// always be ta normal column for table query
pFillCol
[
i
].
functionId
=
pExprInfo
->
base
.
functionId
;
pFillCol
[
i
].
functionId
=
pExprInfo
->
base
.
functionId
;
pFillCol
[
i
].
fillVal
.
i
=
pQuery
->
fillVal
[
i
];
pFillCol
[
i
].
fillVal
.
i
=
pQuery
->
fillVal
[
i
];
offset
+=
pExprInfo
->
bytes
;
offset
+=
pExprInfo
->
bytes
;
}
}
return
pFillCol
;
return
pFillCol
;
}
}
...
@@ -4285,7 +4289,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
...
@@ -4285,7 +4289,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
return
code
;
}
}
pQInfo
->
tsdb
=
tsdb
;
pQInfo
->
tsdb
=
tsdb
;
pQInfo
->
vgId
=
vgId
;
pQInfo
->
vgId
=
vgId
;
...
@@ -4558,7 +4562,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
...
@@ -4558,7 +4562,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
SArray
*
g1
=
taosArrayInit
(
1
,
POINTER_BYTES
);
SArray
*
g1
=
taosArrayInit
(
1
,
POINTER_BYTES
);
SArray
*
tx
=
taosArrayClone
(
group
);
SArray
*
tx
=
taosArrayClone
(
group
);
taosArrayPush
(
g1
,
&
tx
);
taosArrayPush
(
g1
,
&
tx
);
STableGroupInfo
gp
=
{.
numOfTables
=
taosArrayGetSize
(
tx
),
.
pGroupList
=
g1
};
STableGroupInfo
gp
=
{.
numOfTables
=
taosArrayGetSize
(
tx
),
.
pGroupList
=
g1
};
// include only current table
// include only current table
...
@@ -4566,7 +4570,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
...
@@ -4566,7 +4570,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
tsdbCleanupQueryHandle
(
pRuntimeEnv
->
pQueryHandle
);
tsdbCleanupQueryHandle
(
pRuntimeEnv
->
pQueryHandle
);
pRuntimeEnv
->
pQueryHandle
=
NULL
;
pRuntimeEnv
->
pQueryHandle
=
NULL
;
}
}
if
(
isFirstLastRowQuery
(
pQuery
))
{
if
(
isFirstLastRowQuery
(
pQuery
))
{
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryLastRow
(
pQInfo
->
tsdb
,
&
cond
,
&
gp
,
pQInfo
);
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryLastRow
(
pQInfo
->
tsdb
,
&
cond
,
&
gp
,
pQInfo
);
}
else
{
}
else
{
...
@@ -4580,10 +4584,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
...
@@ -4580,10 +4584,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
}
initCtxOutputBuf
(
pRuntimeEnv
);
initCtxOutputBuf
(
pRuntimeEnv
);
SArray
*
s
=
tsdbGetQueriedTableList
(
pRuntimeEnv
->
pQueryHandle
);
SArray
*
s
=
tsdbGetQueriedTableList
(
pRuntimeEnv
->
pQueryHandle
);
assert
(
taosArrayGetSize
(
s
)
>=
1
);
assert
(
taosArrayGetSize
(
s
)
>=
1
);
setTagVal
(
pRuntimeEnv
,
taosArrayGetP
(
s
,
0
),
pQInfo
->
tsdb
);
setTagVal
(
pRuntimeEnv
,
taosArrayGetP
(
s
,
0
),
pQInfo
->
tsdb
);
if
(
isFirstLastRowQuery
(
pQuery
))
{
if
(
isFirstLastRowQuery
(
pQuery
))
{
assert
(
taosArrayGetSize
(
s
)
==
1
);
assert
(
taosArrayGetSize
(
s
)
==
1
);
...
@@ -4596,13 +4600,13 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
...
@@ -4596,13 +4600,13 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
pQuery
->
current
=
taosArrayGetP
(
first
,
0
);
pQuery
->
current
=
taosArrayGetP
(
first
,
0
);
scanOneTableDataBlocks
(
pRuntimeEnv
,
pQuery
->
current
->
lastKey
);
scanOneTableDataBlocks
(
pRuntimeEnv
,
pQuery
->
current
->
lastKey
);
int64_t
numOfRes
=
getNumOfResult
(
pRuntimeEnv
);
int64_t
numOfRes
=
getNumOfResult
(
pRuntimeEnv
);
if
(
numOfRes
>
0
)
{
if
(
numOfRes
>
0
)
{
pQuery
->
rec
.
rows
+=
numOfRes
;
pQuery
->
rec
.
rows
+=
numOfRes
;
forwardCtxOutputBuf
(
pRuntimeEnv
,
numOfRes
);
forwardCtxOutputBuf
(
pRuntimeEnv
,
numOfRes
);
}
}
skipResults
(
pRuntimeEnv
);
skipResults
(
pRuntimeEnv
);
pQInfo
->
groupIndex
+=
1
;
pQInfo
->
groupIndex
+=
1
;
...
@@ -4661,14 +4665,13 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
...
@@ -4661,14 +4665,13 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
SWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
SWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
windowResInfo
;
// no results generated for current group, continue to try the next group
// no results generated for current group, continue to try the next group
taosArrayDestroy
(
s
);
taosArrayDestroy
(
s
);
if
(
pWindowResInfo
->
size
<=
0
)
{
if
(
pWindowResInfo
->
size
<=
0
)
{
continue
;
continue
;
}
}
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
size
;
++
i
)
{
SWindowStatus
*
pStatus
=
&
pWindowResInfo
->
pResult
[
i
].
status
;
pWindowResInfo
->
pResult
[
i
].
closed
=
true
;
// enable return all results for group by normal columns
pStatus
->
closed
=
true
;
// enable return all results for group by normal columns
SWindowResult
*
pResult
=
&
pWindowResInfo
->
pResult
[
i
];
SWindowResult
*
pResult
=
&
pWindowResInfo
->
pResult
[
i
];
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
...
@@ -4822,11 +4825,11 @@ static void doSaveContext(SQInfo *pQInfo) {
...
@@ -4822,11 +4825,11 @@ static void doSaveContext(SQInfo *pQInfo) {
SET_REVERSE_SCAN_FLAG
(
pRuntimeEnv
);
SET_REVERSE_SCAN_FLAG
(
pRuntimeEnv
);
SWAP
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
TSKEY
);
SWAP
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
TSKEY
);
SWITCH_ORDER
(
pQuery
->
order
.
order
);
SWITCH_ORDER
(
pQuery
->
order
.
order
);
if
(
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
if
(
pRuntimeEnv
->
pTSBuf
!=
NULL
)
{
pRuntimeEnv
->
pTSBuf
->
cur
.
order
=
pQuery
->
order
.
order
;
pRuntimeEnv
->
pTSBuf
->
cur
.
order
=
pQuery
->
order
.
order
;
}
}
STsdbQueryCond
cond
=
{
STsdbQueryCond
cond
=
{
.
order
=
pQuery
->
order
.
order
,
.
order
=
pQuery
->
order
.
order
,
.
colList
=
pQuery
->
colList
,
.
colList
=
pQuery
->
colList
,
...
@@ -4968,14 +4971,14 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
...
@@ -4968,14 +4971,14 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
*/
*/
static
void
tableFixedOutputProcess
(
SQInfo
*
pQInfo
,
STableQueryInfo
*
pTableInfo
)
{
static
void
tableFixedOutputProcess
(
SQInfo
*
pQInfo
,
STableQueryInfo
*
pTableInfo
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
if
(
!
pRuntimeEnv
->
topBotQuery
&&
pQuery
->
limit
.
offset
>
0
)
{
// no need to execute, since the output will be ignore.
if
(
!
pRuntimeEnv
->
topBotQuery
&&
pQuery
->
limit
.
offset
>
0
)
{
// no need to execute, since the output will be ignore.
return
;
return
;
}
}
pQuery
->
current
=
pTableInfo
;
// set current query table info
pQuery
->
current
=
pTableInfo
;
// set current query table info
scanOneTableDataBlocks
(
pRuntimeEnv
,
pTableInfo
->
lastKey
);
scanOneTableDataBlocks
(
pRuntimeEnv
,
pTableInfo
->
lastKey
);
finalizeQueryResult
(
pRuntimeEnv
);
finalizeQueryResult
(
pRuntimeEnv
);
...
@@ -4993,10 +4996,10 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
...
@@ -4993,10 +4996,10 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
static
void
tableMultiOutputProcess
(
SQInfo
*
pQInfo
,
STableQueryInfo
*
pTableInfo
)
{
static
void
tableMultiOutputProcess
(
SQInfo
*
pQInfo
,
STableQueryInfo
*
pTableInfo
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
pQuery
->
current
=
pTableInfo
;
pQuery
->
current
=
pTableInfo
;
// for ts_comp query, re-initialized is not allowed
// for ts_comp query, re-initialized is not allowed
if
(
!
isTSCompQuery
(
pQuery
))
{
if
(
!
isTSCompQuery
(
pQuery
))
{
resetCtxOutputBuf
(
pRuntimeEnv
);
resetCtxOutputBuf
(
pRuntimeEnv
);
...
@@ -5087,7 +5090,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
...
@@ -5087,7 +5090,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
int32_t
numOfFilled
=
0
;
int32_t
numOfFilled
=
0
;
TSKEY
newStartKey
=
TSKEY_INITIAL_VAL
;
TSKEY
newStartKey
=
TSKEY_INITIAL_VAL
;
// skip blocks without load the actual data block from file if no filter condition present
// skip blocks without load the actual data block from file if no filter condition present
skipTimeInterval
(
pRuntimeEnv
,
&
newStartKey
);
skipTimeInterval
(
pRuntimeEnv
,
&
newStartKey
);
if
(
pQuery
->
limit
.
offset
>
0
&&
pQuery
->
numOfFilterCols
==
0
&&
pRuntimeEnv
->
pFillInfo
==
NULL
)
{
if
(
pQuery
->
limit
.
offset
>
0
&&
pQuery
->
numOfFilterCols
==
0
&&
pRuntimeEnv
->
pFillInfo
==
NULL
)
{
...
@@ -5114,7 +5117,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
...
@@ -5114,7 +5117,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
taosFillSetStartInfo
(
pRuntimeEnv
->
pFillInfo
,
(
int32_t
)
pQuery
->
rec
.
rows
,
pQuery
->
window
.
ekey
);
taosFillSetStartInfo
(
pRuntimeEnv
->
pFillInfo
,
(
int32_t
)
pQuery
->
rec
.
rows
,
pQuery
->
window
.
ekey
);
taosFillCopyInputDataFromFilePage
(
pRuntimeEnv
->
pFillInfo
,
(
tFilePage
**
)
pQuery
->
sdata
);
taosFillCopyInputDataFromFilePage
(
pRuntimeEnv
->
pFillInfo
,
(
tFilePage
**
)
pQuery
->
sdata
);
numOfFilled
=
0
;
numOfFilled
=
0
;
pQuery
->
rec
.
rows
=
doFillGapsInResults
(
pRuntimeEnv
,
(
tFilePage
**
)
pQuery
->
sdata
,
&
numOfFilled
);
pQuery
->
rec
.
rows
=
doFillGapsInResults
(
pRuntimeEnv
,
(
tFilePage
**
)
pQuery
->
sdata
,
&
numOfFilled
);
if
(
pQuery
->
rec
.
rows
>
0
||
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_COMPLETED
))
{
if
(
pQuery
->
rec
.
rows
>
0
||
Q_STATUS_EQUAL
(
pQuery
->
status
,
QUERY_COMPLETED
))
{
limitResults
(
pRuntimeEnv
);
limitResults
(
pRuntimeEnv
);
...
@@ -5180,11 +5183,11 @@ static void tableQueryImpl(SQInfo *pQInfo) {
...
@@ -5180,11 +5183,11 @@ static void tableQueryImpl(SQInfo *pQInfo) {
// number of points returned during this query
// number of points returned during this query
pQuery
->
rec
.
rows
=
0
;
pQuery
->
rec
.
rows
=
0
;
int64_t
st
=
taosGetTimestampUs
();
int64_t
st
=
taosGetTimestampUs
();
assert
(
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
==
1
);
assert
(
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
==
1
);
SArray
*
g
=
GET_TABLEGROUP
(
pQInfo
,
0
);
SArray
*
g
=
GET_TABLEGROUP
(
pQInfo
,
0
);
STableQueryInfo
*
item
=
taosArrayGetP
(
g
,
0
);
STableQueryInfo
*
item
=
taosArrayGetP
(
g
,
0
);
// group by normal column, sliding window query, interval query are handled by interval query processor
// group by normal column, sliding window query, interval query are handled by interval query processor
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
||
pRuntimeEnv
->
groupbyNormalCol
)
{
// interval (down sampling operation)
if
(
QUERY_IS_INTERVAL_QUERY
(
pQuery
)
||
pRuntimeEnv
->
groupbyNormalCol
)
{
// interval (down sampling operation)
tableIntervalProcess
(
pQInfo
,
item
);
tableIntervalProcess
(
pQInfo
,
item
);
...
@@ -5384,7 +5387,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
...
@@ -5384,7 +5387,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
for
(
int32_t
f
=
0
;
f
<
numOfFilters
;
++
f
)
{
for
(
int32_t
f
=
0
;
f
<
numOfFilters
;
++
f
)
{
SColumnFilterInfo
*
pFilterMsg
=
(
SColumnFilterInfo
*
)
pMsg
;
SColumnFilterInfo
*
pFilterMsg
=
(
SColumnFilterInfo
*
)
pMsg
;
SColumnFilterInfo
*
pColFilter
=
&
pColInfo
->
filters
[
f
];
SColumnFilterInfo
*
pColFilter
=
&
pColInfo
->
filters
[
f
];
pColFilter
->
filterstr
=
htons
(
pFilterMsg
->
filterstr
);
pColFilter
->
filterstr
=
htons
(
pFilterMsg
->
filterstr
);
...
@@ -5805,7 +5808,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
...
@@ -5805,7 +5808,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
break
;
break
;
}
}
}
}
assert
(
f
<
pQuery
->
numOfTags
||
pColIndex
->
colId
==
TSDB_TBNAME_COLUMN_INDEX
);
assert
(
f
<
pQuery
->
numOfTags
||
pColIndex
->
colId
==
TSDB_TBNAME_COLUMN_INDEX
);
}
}
}
}
...
@@ -6062,7 +6065,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
...
@@ -6062,7 +6065,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
if
((
code
=
doInitQInfo
(
pQInfo
,
pTSBuf
,
tsdb
,
vgId
,
isSTable
))
!=
TSDB_CODE_SUCCESS
)
{
if
((
code
=
doInitQInfo
(
pQInfo
,
pTSBuf
,
tsdb
,
vgId
,
isSTable
))
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
goto
_error
;
}
}
return
code
;
return
code
;
_error:
_error:
...
@@ -6140,7 +6143,7 @@ static void freeQInfo(SQInfo *pQInfo) {
...
@@ -6140,7 +6143,7 @@ static void freeQInfo(SQInfo *pQInfo) {
taosHashCleanup
(
pQInfo
->
tableqinfoGroupInfo
.
map
);
taosHashCleanup
(
pQInfo
->
tableqinfoGroupInfo
.
map
);
tsdbDestroyTableGroup
(
&
pQInfo
->
tableGroupInfo
);
tsdbDestroyTableGroup
(
&
pQInfo
->
tableGroupInfo
);
taosArrayDestroy
(
pQInfo
->
arrTableIdInfo
);
taosArrayDestroy
(
pQInfo
->
arrTableIdInfo
);
if
(
pQuery
->
pGroupbyExpr
!=
NULL
)
{
if
(
pQuery
->
pGroupbyExpr
!=
NULL
)
{
taosArrayDestroy
(
pQuery
->
pGroupbyExpr
->
columnInfo
);
taosArrayDestroy
(
pQuery
->
pGroupbyExpr
->
columnInfo
);
taosTFree
(
pQuery
->
pGroupbyExpr
);
taosTFree
(
pQuery
->
pGroupbyExpr
);
...
@@ -6217,7 +6220,7 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
...
@@ -6217,7 +6220,7 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
qError
(
"QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s"
,
pQInfo
,
qError
(
"QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s"
,
pQInfo
,
pQuery
->
sdata
[
0
]
->
data
,
strerror
(
errno
));
pQuery
->
sdata
[
0
]
->
data
,
strerror
(
errno
));
if
(
fd
!=
-
1
)
{
if
(
fd
!=
-
1
)
{
close
(
fd
);
close
(
fd
);
}
}
}
}
...
@@ -6236,7 +6239,7 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
...
@@ -6236,7 +6239,7 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
qDebug
(
"QInfo:%p results limitation reached, limitation:%"
PRId64
,
pQInfo
,
pQuery
->
limit
.
limit
);
qDebug
(
"QInfo:%p results limitation reached, limitation:%"
PRId64
,
pQInfo
,
pQuery
->
limit
.
limit
);
setQueryStatus
(
pQuery
,
QUERY_OVER
);
setQueryStatus
(
pQuery
,
QUERY_OVER
);
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -6338,7 +6341,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
...
@@ -6338,7 +6341,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
pExprs
=
NULL
;
pExprs
=
NULL
;
pGroupbyExpr
=
NULL
;
pGroupbyExpr
=
NULL
;
pTagColumnInfo
=
NULL
;
pTagColumnInfo
=
NULL
;
if
((
*
pQInfo
)
==
NULL
)
{
if
((
*
pQInfo
)
==
NULL
)
{
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
goto
_over
;
goto
_over
;
...
@@ -6353,7 +6356,7 @@ _over:
...
@@ -6353,7 +6356,7 @@ _over:
if
(
pGroupbyExpr
!=
NULL
)
{
if
(
pGroupbyExpr
!=
NULL
)
{
taosArrayDestroy
(
pGroupbyExpr
->
columnInfo
);
taosArrayDestroy
(
pGroupbyExpr
->
columnInfo
);
free
(
pGroupbyExpr
);
free
(
pGroupbyExpr
);
}
}
free
(
pTagColumnInfo
);
free
(
pTagColumnInfo
);
free
(
pExprs
);
free
(
pExprs
);
free
(
pExprMsg
);
free
(
pExprMsg
);
...
@@ -6520,7 +6523,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
...
@@ -6520,7 +6523,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
(
*
pRsp
)
->
offset
=
0
;
(
*
pRsp
)
->
offset
=
0
;
(
*
pRsp
)
->
useconds
=
htobe64
(
pRuntimeEnv
->
summary
.
elapsedTime
);
(
*
pRsp
)
->
useconds
=
htobe64
(
pRuntimeEnv
->
summary
.
elapsedTime
);
}
}
(
*
pRsp
)
->
precision
=
htons
(
pQuery
->
precision
);
(
*
pRsp
)
->
precision
=
htons
(
pQuery
->
precision
);
if
(
pQuery
->
rec
.
rows
>
0
&&
pQInfo
->
code
==
TSDB_CODE_SUCCESS
)
{
if
(
pQuery
->
rec
.
rows
>
0
&&
pQInfo
->
code
==
TSDB_CODE_SUCCESS
)
{
doDumpQueryResult
(
pQInfo
,
(
*
pRsp
)
->
data
);
doDumpQueryResult
(
pQInfo
,
(
*
pRsp
)
->
data
);
...
@@ -6597,7 +6600,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
...
@@ -6597,7 +6600,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
if
(
numOfGroup
==
0
)
{
if
(
numOfGroup
==
0
)
{
return
;
return
;
}
}
SArray
*
pa
=
GET_TABLEGROUP
(
pQInfo
,
0
);
SArray
*
pa
=
GET_TABLEGROUP
(
pQInfo
,
0
);
size_t
num
=
taosArrayGetSize
(
pa
);
size_t
num
=
taosArrayGetSize
(
pa
);
...
@@ -6698,7 +6701,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
...
@@ -6698,7 +6701,7 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
}
else
{
}
else
{
type
=
pExprInfo
[
j
].
type
;
type
=
pExprInfo
[
j
].
type
;
bytes
=
pExprInfo
[
j
].
bytes
;
bytes
=
pExprInfo
[
j
].
bytes
;
data
=
tsdbGetTableTagVal
(
item
->
pTable
,
pExprInfo
[
j
].
base
.
colInfo
.
colId
,
type
,
bytes
);
data
=
tsdbGetTableTagVal
(
item
->
pTable
,
pExprInfo
[
j
].
base
.
colInfo
.
colId
,
type
,
bytes
);
dst
=
pQuery
->
sdata
[
j
]
->
data
+
count
*
pExprInfo
[
j
].
bytes
;
dst
=
pQuery
->
sdata
[
j
]
->
data
+
count
*
pExprInfo
[
j
].
bytes
;
...
...
src/query/src/qUtil.c
浏览文件 @
8972ebc1
...
@@ -45,8 +45,8 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
...
@@ -45,8 +45,8 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
pWindowResInfo
->
curIndex
=
-
1
;
pWindowResInfo
->
curIndex
=
-
1
;
pWindowResInfo
->
size
=
0
;
pWindowResInfo
->
size
=
0
;
pWindowResInfo
->
prevSKey
=
TSKEY_INITIAL_VAL
;
pWindowResInfo
->
prevSKey
=
TSKEY_INITIAL_VAL
;
pRuntimeEnv
->
summary
.
internalSupSize
+=
sizeof
(
SWindowResult
)
*
threshold
;
SQueryCostInfo
*
pSummary
=
&
pRuntimeEnv
->
summary
;
// use the pointer arraylist
// use the pointer arraylist
pWindowResInfo
->
pResult
=
calloc
(
threshold
,
sizeof
(
SWindowResult
));
pWindowResInfo
->
pResult
=
calloc
(
threshold
,
sizeof
(
SWindowResult
));
...
@@ -54,8 +54,11 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
...
@@ -54,8 +54,11 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
}
pRuntimeEnv
->
summary
.
internalSupSize
+=
sizeof
(
SWindowResult
)
*
threshold
;
pWindowResInfo
->
interval
=
pRuntimeEnv
->
pQuery
->
intervalTime
;
pRuntimeEnv
->
summary
.
internalSupSize
+=
(
pRuntimeEnv
->
pQuery
->
numOfOutput
*
sizeof
(
SResultInfo
)
+
pRuntimeEnv
->
interBufSize
)
*
pWindowResInfo
->
capacity
;
pSummary
->
internalSupSize
+=
sizeof
(
SWindowResult
)
*
threshold
;
pSummary
->
internalSupSize
+=
(
pRuntimeEnv
->
pQuery
->
numOfOutput
*
sizeof
(
SResultInfo
)
+
pRuntimeEnv
->
interBufSize
)
*
pWindowResInfo
->
capacity
;
pSummary
->
numOfTimeWindows
=
threshold
;
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
capacity
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
capacity
;
++
i
)
{
int32_t
code
=
createQueryResultInfo
(
pRuntimeEnv
->
pQuery
,
&
pWindowResInfo
->
pResult
[
i
],
pRuntimeEnv
->
stableQuery
,
pRuntimeEnv
->
interBufSize
);
int32_t
code
=
createQueryResultInfo
(
pRuntimeEnv
->
pQuery
,
&
pWindowResInfo
->
pResult
[
i
],
pRuntimeEnv
->
stableQuery
,
pRuntimeEnv
->
interBufSize
);
...
@@ -126,8 +129,8 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
...
@@ -126,8 +129,8 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SWindowResult
*
pResult
=
&
pWindowResInfo
->
pResult
[
i
];
SWindowResult
*
pResult
=
&
pWindowResInfo
->
pResult
[
i
];
if
(
pResult
->
status
.
closed
)
{
// remove the window slot from hash table
if
(
pResult
->
closed
)
{
// remove the window slot from hash table
taosHashRemove
(
pWindowResInfo
->
hashList
,
(
const
char
*
)
&
pResult
->
window
.
skey
,
pWindowResInfo
->
type
);
taosHashRemove
(
pWindowResInfo
->
hashList
,
(
const
char
*
)
&
pResult
->
skey
,
pWindowResInfo
->
type
);
}
else
{
}
else
{
break
;
break
;
}
}
...
@@ -149,12 +152,12 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
...
@@ -149,12 +152,12 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
pWindowResInfo
->
size
=
remain
;
pWindowResInfo
->
size
=
remain
;
for
(
int32_t
k
=
0
;
k
<
pWindowResInfo
->
size
;
++
k
)
{
for
(
int32_t
k
=
0
;
k
<
pWindowResInfo
->
size
;
++
k
)
{
SWindowResult
*
pResult
=
&
pWindowResInfo
->
pResult
[
k
];
SWindowResult
*
pResult
=
&
pWindowResInfo
->
pResult
[
k
];
int32_t
*
p
=
(
int32_t
*
)
taosHashGet
(
pWindowResInfo
->
hashList
,
(
const
char
*
)
&
pResult
->
window
.
skey
,
int32_t
*
p
=
(
int32_t
*
)
taosHashGet
(
pWindowResInfo
->
hashList
,
(
const
char
*
)
&
pResult
->
skey
,
tDataTypeDesc
[
pWindowResInfo
->
type
].
nSize
);
tDataTypeDesc
[
pWindowResInfo
->
type
].
nSize
);
assert
(
p
!=
NULL
);
assert
(
p
!=
NULL
);
int32_t
v
=
(
*
p
-
num
);
int32_t
v
=
(
*
p
-
num
);
assert
(
v
>=
0
&&
v
<=
pWindowResInfo
->
size
);
assert
(
v
>=
0
&&
v
<=
pWindowResInfo
->
size
);
taosHashPut
(
pWindowResInfo
->
hashList
,
(
char
*
)
&
pResult
->
window
.
skey
,
tDataTypeDesc
[
pWindowResInfo
->
type
].
nSize
,
taosHashPut
(
pWindowResInfo
->
hashList
,
(
char
*
)
&
pResult
->
skey
,
tDataTypeDesc
[
pWindowResInfo
->
type
].
nSize
,
(
char
*
)
&
v
,
sizeof
(
int32_t
));
(
char
*
)
&
v
,
sizeof
(
int32_t
));
}
}
...
@@ -173,7 +176,7 @@ void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) {
...
@@ -173,7 +176,7 @@ void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) {
int32_t
numOfClosedTimeWindow
(
SWindowResInfo
*
pWindowResInfo
)
{
int32_t
numOfClosedTimeWindow
(
SWindowResInfo
*
pWindowResInfo
)
{
int32_t
i
=
0
;
int32_t
i
=
0
;
while
(
i
<
pWindowResInfo
->
size
&&
pWindowResInfo
->
pResult
[
i
].
status
.
closed
)
{
while
(
i
<
pWindowResInfo
->
size
&&
pWindowResInfo
->
pResult
[
i
].
closed
)
{
++
i
;
++
i
;
}
}
...
@@ -184,11 +187,11 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) {
...
@@ -184,11 +187,11 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) {
assert
(
pWindowResInfo
->
size
>=
0
&&
pWindowResInfo
->
capacity
>=
pWindowResInfo
->
size
);
assert
(
pWindowResInfo
->
size
>=
0
&&
pWindowResInfo
->
capacity
>=
pWindowResInfo
->
size
);
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
size
;
++
i
)
{
if
(
pWindowResInfo
->
pResult
[
i
].
status
.
closed
)
{
if
(
pWindowResInfo
->
pResult
[
i
].
closed
)
{
continue
;
continue
;
}
}
pWindowResInfo
->
pResult
[
i
].
status
.
closed
=
true
;
pWindowResInfo
->
pResult
[
i
].
closed
=
true
;
}
}
}
}
...
@@ -204,7 +207,7 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_
...
@@ -204,7 +207,7 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_
}
}
// get the result order
// get the result order
int32_t
resultOrder
=
(
pWindowResInfo
->
pResult
[
0
].
window
.
skey
<
pWindowResInfo
->
pResult
[
1
].
window
.
skey
)
?
1
:-
1
;
int32_t
resultOrder
=
(
pWindowResInfo
->
pResult
[
0
].
skey
<
pWindowResInfo
->
pResult
[
1
]
.
skey
)
?
1
:-
1
;
if
(
order
!=
resultOrder
)
{
if
(
order
!=
resultOrder
)
{
return
;
return
;
...
@@ -212,11 +215,12 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_
...
@@ -212,11 +215,12 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_
int32_t
i
=
0
;
int32_t
i
=
0
;
if
(
order
==
QUERY_ASC_FORWARD_STEP
)
{
if
(
order
==
QUERY_ASC_FORWARD_STEP
)
{
while
(
i
<
pWindowResInfo
->
size
&&
(
pWindowResInfo
->
pResult
[
i
].
window
.
ekey
<
lastKey
))
{
TSKEY
ekey
=
pWindowResInfo
->
pResult
[
i
].
skey
+
pWindowResInfo
->
interval
;
while
(
i
<
pWindowResInfo
->
size
&&
(
ekey
<
lastKey
))
{
++
i
;
++
i
;
}
}
}
else
if
(
order
==
QUERY_DESC_FORWARD_STEP
)
{
}
else
if
(
order
==
QUERY_DESC_FORWARD_STEP
)
{
while
(
i
<
pWindowResInfo
->
size
&&
(
pWindowResInfo
->
pResult
[
i
].
window
.
skey
>
lastKey
))
{
while
(
i
<
pWindowResInfo
->
size
&&
(
pWindowResInfo
->
pResult
[
i
].
skey
>
lastKey
))
{
++
i
;
++
i
;
}
}
}
}
...
@@ -227,11 +231,11 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_
...
@@ -227,11 +231,11 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_
}
}
bool
isWindowResClosed
(
SWindowResInfo
*
pWindowResInfo
,
int32_t
slot
)
{
bool
isWindowResClosed
(
SWindowResInfo
*
pWindowResInfo
,
int32_t
slot
)
{
return
(
getWindowResult
(
pWindowResInfo
,
slot
)
->
status
.
closed
==
true
);
return
(
getWindowResult
(
pWindowResInfo
,
slot
)
->
closed
==
true
);
}
}
void
closeTimeWindow
(
SWindowResInfo
*
pWindowResInfo
,
int32_t
slot
)
{
void
closeTimeWindow
(
SWindowResInfo
*
pWindowResInfo
,
int32_t
slot
)
{
getWindowResult
(
pWindowResInfo
,
slot
)
->
status
.
closed
=
true
;
getWindowResult
(
pWindowResInfo
,
slot
)
->
closed
=
true
;
}
}
void
clearTimeWindowResBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowResult
*
pWindowRes
)
{
void
clearTimeWindowResBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowResult
*
pWindowRes
)
{
...
@@ -253,8 +257,8 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
...
@@ -253,8 +257,8 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
pWindowRes
->
numOfRows
=
0
;
pWindowRes
->
numOfRows
=
0
;
pWindowRes
->
pos
=
(
SPosInfo
){
-
1
,
-
1
};
pWindowRes
->
pos
=
(
SPosInfo
){
-
1
,
-
1
};
pWindowRes
->
status
.
closed
=
false
;
pWindowRes
->
closed
=
false
;
pWindowRes
->
window
=
TSWINDOW_INITIALIZER
;
pWindowRes
->
skey
=
TSKEY_INITIAL_VAL
;
}
}
/**
/**
...
@@ -264,8 +268,8 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
...
@@ -264,8 +268,8 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
*/
*/
void
copyTimeWindowResBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowResult
*
dst
,
const
SWindowResult
*
src
)
{
void
copyTimeWindowResBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SWindowResult
*
dst
,
const
SWindowResult
*
src
)
{
dst
->
numOfRows
=
src
->
numOfRows
;
dst
->
numOfRows
=
src
->
numOfRows
;
dst
->
window
=
src
->
window
;
dst
->
skey
=
src
->
skey
;
dst
->
status
=
src
->
status
;
dst
->
closed
=
src
->
closed
;
int32_t
nOutputCols
=
pRuntimeEnv
->
pQuery
->
numOfOutput
;
int32_t
nOutputCols
=
pRuntimeEnv
->
pQuery
->
numOfOutput
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录