Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
c303ff2a
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c303ff2a
编写于
1月 31, 2020
作者:
H
hjxilinx
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor some codes [tbase-266]
上级
8c2766e3
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
290 addition
and
268 deletion
+290
-268
src/client/src/tscFunctionImpl.c
src/client/src/tscFunctionImpl.c
+29
-27
src/system/detail/inc/vnodeQueryImpl.h
src/system/detail/inc/vnodeQueryImpl.h
+5
-3
src/system/detail/inc/vnodeRead.h
src/system/detail/inc/vnodeRead.h
+5
-6
src/system/detail/src/vnodeQueryImpl.c
src/system/detail/src/vnodeQueryImpl.c
+195
-174
src/system/detail/src/vnodeQueryProcess.c
src/system/detail/src/vnodeQueryProcess.c
+54
-56
src/system/detail/src/vnodeRead.c
src/system/detail/src/vnodeRead.c
+2
-2
未找到文件。
src/client/src/tscFunctionImpl.c
浏览文件 @
c303ff2a
...
@@ -72,6 +72,8 @@ for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \
...
@@ -72,6 +72,8 @@ for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \
void
noop1
(
SQLFunctionCtx
*
UNUSED_PARAM
(
pCtx
))
{}
void
noop1
(
SQLFunctionCtx
*
UNUSED_PARAM
(
pCtx
))
{}
void
noop2
(
SQLFunctionCtx
*
UNUSED_PARAM
(
pCtx
),
int32_t
UNUSED_PARAM
(
index
))
{}
void
noop2
(
SQLFunctionCtx
*
UNUSED_PARAM
(
pCtx
),
int32_t
UNUSED_PARAM
(
index
))
{}
void
doFinalizer
(
SQLFunctionCtx
*
pCtx
)
{
resetResultInfo
(
GET_RES_INFO
(
pCtx
));
}
typedef
struct
tValuePair
{
typedef
struct
tValuePair
{
tVariant
v
;
tVariant
v
;
int64_t
timestamp
;
int64_t
timestamp
;
...
@@ -355,8 +357,8 @@ static void function_finalizer(SQLFunctionCtx *pCtx) {
...
@@ -355,8 +357,8 @@ static void function_finalizer(SQLFunctionCtx *pCtx) {
pTrace
(
"no result generated, result is set to NULL"
);
pTrace
(
"no result generated, result is set to NULL"
);
setNull
(
pCtx
->
aOutputBuf
,
pCtx
->
outputType
,
pCtx
->
outputBytes
);
setNull
(
pCtx
->
aOutputBuf
,
pCtx
->
outputType
,
pCtx
->
outputBytes
);
}
}
resetResultInfo
(
GET_RES_INFO
(
pCtx
)
);
doFinalizer
(
pCtx
);
}
}
/*
/*
...
@@ -889,6 +891,7 @@ static void avg_finalizer(SQLFunctionCtx *pCtx) {
...
@@ -889,6 +891,7 @@ static void avg_finalizer(SQLFunctionCtx *pCtx) {
// cannot set the numOfIteratedElems again since it is set during previous iteration
// cannot set the numOfIteratedElems again since it is set during previous iteration
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
1
;
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
1
;
doFinalizer
(
pCtx
);
}
}
/////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////////////////
...
@@ -1433,8 +1436,8 @@ static void stddev_finalizer(SQLFunctionCtx *pCtx) {
...
@@ -1433,8 +1436,8 @@ static void stddev_finalizer(SQLFunctionCtx *pCtx) {
*
retValue
=
sqrt
(
pStd
->
res
/
pStd
->
num
);
*
retValue
=
sqrt
(
pStd
->
res
/
pStd
->
num
);
SET_VAL
(
pCtx
,
1
,
1
);
SET_VAL
(
pCtx
,
1
,
1
);
}
}
resetResultInfo
(
GET_RES_INFO
(
pCtx
)
);
doFinalizer
(
pCtx
);
}
}
//////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////////
...
@@ -1836,7 +1839,7 @@ static void last_row_finalizer(SQLFunctionCtx *pCtx) {
...
@@ -1836,7 +1839,7 @@ static void last_row_finalizer(SQLFunctionCtx *pCtx) {
}
}
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
1
;
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
1
;
resetResultInfo
(
GET_RES_INFO
(
pCtx
)
);
doFinalizer
(
pCtx
);
}
}
//////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////
...
@@ -2404,8 +2407,8 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) {
...
@@ -2404,8 +2407,8 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) {
GET_TRUE_DATA_TYPE
();
GET_TRUE_DATA_TYPE
();
copyTopBotRes
(
pCtx
,
type
);
copyTopBotRes
(
pCtx
,
type
);
resetResultInfo
(
pResInfo
);
doFinalizer
(
pCtx
);
}
}
///////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////////////////////
...
@@ -2481,8 +2484,8 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) {
...
@@ -2481,8 +2484,8 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) {
tOrderDescDestroy
(
pMemBucket
->
pOrderDesc
);
tOrderDescDestroy
(
pMemBucket
->
pOrderDesc
);
tMemBucketDestroy
(
pMemBucket
);
tMemBucketDestroy
(
pMemBucket
);
resetResultInfo
(
GET_RES_INFO
(
pCtx
)
);
doFinalizer
(
pCtx
);
}
}
//////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////
...
@@ -2690,8 +2693,8 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
...
@@ -2690,8 +2693,8 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) {
return
;
return
;
}
}
}
}
resetResultInfo
(
pResInfo
);
doFinalizer
(
pCtx
);
}
}
/////////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////////////////
...
@@ -2871,7 +2874,7 @@ static void leastsquares_finalizer(SQLFunctionCtx *pCtx) {
...
@@ -2871,7 +2874,7 @@ static void leastsquares_finalizer(SQLFunctionCtx *pCtx) {
param
[
1
][
2
]
/=
param
[
1
][
1
];
param
[
1
][
2
]
/=
param
[
1
][
1
];
sprintf
(
pCtx
->
aOutputBuf
,
"(%lf, %lf)"
,
param
[
0
][
2
],
param
[
1
][
2
]);
sprintf
(
pCtx
->
aOutputBuf
,
"(%lf, %lf)"
,
param
[
0
][
2
],
param
[
1
][
2
]);
resetResultInfo
(
GET_RES_INFO
(
pCtx
)
);
doFinalizer
(
pCtx
);
}
}
static
void
date_col_output_function
(
SQLFunctionCtx
*
pCtx
)
{
static
void
date_col_output_function
(
SQLFunctionCtx
*
pCtx
)
{
...
@@ -2927,18 +2930,17 @@ static void tag_project_function(SQLFunctionCtx *pCtx) {
...
@@ -2927,18 +2930,17 @@ static void tag_project_function(SQLFunctionCtx *pCtx) {
INC_INIT_VAL
(
pCtx
,
pCtx
->
size
);
INC_INIT_VAL
(
pCtx
,
pCtx
->
size
);
assert
(
pCtx
->
inputBytes
==
pCtx
->
outputBytes
);
assert
(
pCtx
->
inputBytes
==
pCtx
->
outputBytes
);
// int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pCtx
->
size
;
++
i
)
{
tVariantDump
(
&
pCtx
->
tag
,
pCtx
->
aOutputBuf
,
pCtx
->
outputType
);
tVariantDump
(
&
pCtx
->
tag
,
pCtx
->
aOutputBuf
,
pCtx
->
outputType
);
pCtx
->
aOutputBuf
+=
pCtx
->
outputBytes
/* * factor*/
;
pCtx
->
aOutputBuf
+=
pCtx
->
outputBytes
;
}
}
}
}
static
void
tag_project_function_f
(
SQLFunctionCtx
*
pCtx
,
int32_t
index
)
{
static
void
tag_project_function_f
(
SQLFunctionCtx
*
pCtx
,
int32_t
index
)
{
INC_INIT_VAL
(
pCtx
,
1
);
INC_INIT_VAL
(
pCtx
,
1
);
tVariantDump
(
&
pCtx
->
tag
,
pCtx
->
aOutputBuf
,
pCtx
->
tag
.
nType
);
tVariantDump
(
&
pCtx
->
tag
,
pCtx
->
aOutputBuf
,
pCtx
->
tag
.
nType
);
pCtx
->
aOutputBuf
+=
pCtx
->
outputBytes
/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/
;
pCtx
->
aOutputBuf
+=
pCtx
->
outputBytes
;
}
}
/**
/**
...
@@ -4183,7 +4185,7 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) {
...
@@ -4183,7 +4185,7 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) {
}
}
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
1
;
GET_RES_INFO
(
pCtx
)
->
numOfRes
=
1
;
resetResultInfo
(
GET_RES_INFO
(
pCtx
)
);
doFinalizer
(
pCtx
);
}
}
/**
/**
...
@@ -4345,7 +4347,7 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) {
...
@@ -4345,7 +4347,7 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) {
strcpy
(
pCtx
->
aOutputBuf
,
pTSbuf
->
path
);
strcpy
(
pCtx
->
aOutputBuf
,
pTSbuf
->
path
);
tsBufDestory
(
pTSbuf
);
tsBufDestory
(
pTSbuf
);
resetResultInfo
(
GET_RES_INFO
(
pCtx
)
);
doFinalizer
(
pCtx
);
}
}
/*
/*
...
@@ -4385,7 +4387,7 @@ SQLAggFuncElem aAggs[28] = {{
...
@@ -4385,7 +4387,7 @@ SQLAggFuncElem aAggs[28] = {{
count_function
,
count_function
,
count_function_f
,
count_function_f
,
no_next_step
,
no_next_step
,
noop1
,
doFinalizer
,
count_func_merge
,
count_func_merge
,
count_func_merge
,
count_func_merge
,
count_load_data_info
,
count_load_data_info
,
...
@@ -4628,7 +4630,7 @@ SQLAggFuncElem aAggs[28] = {{
...
@@ -4628,7 +4630,7 @@ SQLAggFuncElem aAggs[28] = {{
date_col_output_function
,
date_col_output_function
,
date_col_output_function_f
,
date_col_output_function_f
,
no_next_step
,
no_next_step
,
noop1
,
doFinalizer
,
copy_function
,
copy_function
,
copy_function
,
copy_function
,
no_data_info
,
no_data_info
,
...
@@ -4643,7 +4645,7 @@ SQLAggFuncElem aAggs[28] = {{
...
@@ -4643,7 +4645,7 @@ SQLAggFuncElem aAggs[28] = {{
noop1
,
noop1
,
noop2
,
noop2
,
no_next_step
,
no_next_step
,
noop1
,
doFinalizer
,
copy_function
,
copy_function
,
copy_function
,
copy_function
,
data_req_load_info
,
data_req_load_info
,
...
@@ -4658,7 +4660,7 @@ SQLAggFuncElem aAggs[28] = {{
...
@@ -4658,7 +4660,7 @@ SQLAggFuncElem aAggs[28] = {{
tag_function
,
tag_function
,
noop2
,
noop2
,
no_next_step
,
no_next_step
,
noop1
,
doFinalizer
,
copy_function
,
copy_function
,
copy_function
,
copy_function
,
no_data_info
,
no_data_info
,
...
@@ -4688,7 +4690,7 @@ SQLAggFuncElem aAggs[28] = {{
...
@@ -4688,7 +4690,7 @@ SQLAggFuncElem aAggs[28] = {{
tag_function
,
tag_function
,
tag_function_f
,
tag_function_f
,
no_next_step
,
no_next_step
,
noop1
,
doFinalizer
,
copy_function
,
copy_function
,
copy_function
,
copy_function
,
no_data_info
,
no_data_info
,
...
@@ -4703,7 +4705,7 @@ SQLAggFuncElem aAggs[28] = {{
...
@@ -4703,7 +4705,7 @@ SQLAggFuncElem aAggs[28] = {{
col_project_function
,
col_project_function
,
col_project_function_f
,
col_project_function_f
,
no_next_step
,
no_next_step
,
noop1
,
doFinalizer
,
copy_function
,
copy_function
,
copy_function
,
copy_function
,
data_req_load_info
,
data_req_load_info
,
...
@@ -4718,7 +4720,7 @@ SQLAggFuncElem aAggs[28] = {{
...
@@ -4718,7 +4720,7 @@ SQLAggFuncElem aAggs[28] = {{
tag_project_function
,
tag_project_function
,
tag_project_function_f
,
tag_project_function_f
,
no_next_step
,
no_next_step
,
noop1
,
doFinalizer
,
copy_function
,
copy_function
,
copy_function
,
copy_function
,
no_data_info
,
no_data_info
,
...
@@ -4733,7 +4735,7 @@ SQLAggFuncElem aAggs[28] = {{
...
@@ -4733,7 +4735,7 @@ SQLAggFuncElem aAggs[28] = {{
arithmetic_function
,
arithmetic_function
,
arithmetic_function_f
,
arithmetic_function_f
,
no_next_step
,
no_next_step
,
noop1
,
doFinalizer
,
copy_function
,
copy_function
,
copy_function
,
copy_function
,
data_req_load_info
,
data_req_load_info
,
...
@@ -4748,7 +4750,7 @@ SQLAggFuncElem aAggs[28] = {{
...
@@ -4748,7 +4750,7 @@ SQLAggFuncElem aAggs[28] = {{
diff_function
,
diff_function
,
diff_function_f
,
diff_function_f
,
no_next_step
,
no_next_step
,
noop1
,
doFinalizer
,
noop1
,
noop1
,
noop1
,
noop1
,
data_req_load_info
,
data_req_load_info
,
...
@@ -4794,7 +4796,7 @@ SQLAggFuncElem aAggs[28] = {{
...
@@ -4794,7 +4796,7 @@ SQLAggFuncElem aAggs[28] = {{
interp_function
,
interp_function
,
do_sum_f
,
// todo filter handle
do_sum_f
,
// todo filter handle
no_next_step
,
no_next_step
,
noop1
,
doFinalizer
,
noop1
,
noop1
,
copy_function
,
copy_function
,
no_data_info
,
no_data_info
,
...
...
src/system/detail/inc/vnodeQueryImpl.h
浏览文件 @
c303ff2a
...
@@ -279,9 +279,11 @@ void vnodePrintQueryStatistics(SMeterQuerySupportObj* pSupporter);
...
@@ -279,9 +279,11 @@ void vnodePrintQueryStatistics(SMeterQuerySupportObj* pSupporter);
void
clearGroupResultBuf
(
SOutputRes
*
pOneOutputRes
,
int32_t
nOutputCols
);
void
clearGroupResultBuf
(
SOutputRes
*
pOneOutputRes
,
int32_t
nOutputCols
);
void
copyGroupResultBuf
(
SOutputRes
*
dst
,
const
SOutputRes
*
src
,
int32_t
nOutputCols
);
void
copyGroupResultBuf
(
SOutputRes
*
dst
,
const
SOutputRes
*
src
,
int32_t
nOutputCols
);
void
resetResWindowInfo
(
SSlidingWindowResInfo
*
pWindowResInfo
,
int32_t
numOfCols
);
void
resetSlidingWindowInfo
(
SSlidingWindowInfo
*
pSlidingWindowInfo
,
int32_t
numOfCols
);
void
clearCompletedResWindows
(
SSlidingWindowResInfo
*
pWindowResInfo
,
int32_t
numOfCols
);
void
clearCompletedSlidingWindows
(
SSlidingWindowInfo
*
pSlidingWindowInfo
,
int32_t
numOfCols
);
int32_t
numOfResFromResWindowInfo
(
SSlidingWindowResInfo
*
pWindowResInfo
);
int32_t
numOfClosedSlidingWindow
(
SSlidingWindowInfo
*
pSlidingWindowInfo
);
void
closeSlidingWindow
(
SSlidingWindowInfo
*
pSlidingWindowInfo
,
int32_t
slot
);
void
closeAllSlidingWindow
(
SSlidingWindowInfo
*
pSlidingWindowInfo
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
src/system/detail/inc/vnodeRead.h
浏览文件 @
c303ff2a
...
@@ -122,7 +122,7 @@ typedef struct SWindowStatus {
...
@@ -122,7 +122,7 @@ typedef struct SWindowStatus {
bool
closed
;
bool
closed
;
}
SWindowStatus
;
}
SWindowStatus
;
typedef
struct
SSlidingWindow
Res
Info
{
typedef
struct
SSlidingWindowInfo
{
SOutputRes
*
pResult
;
// reference to SQuerySupporter->pResult
SOutputRes
*
pResult
;
// reference to SQuerySupporter->pResult
SWindowStatus
*
pStatus
;
// current query window closed or not?
SWindowStatus
*
pStatus
;
// current query window closed or not?
void
*
hashList
;
// hash list for quick access
void
*
hashList
;
// hash list for quick access
...
@@ -134,7 +134,7 @@ typedef struct SSlidingWindowResInfo {
...
@@ -134,7 +134,7 @@ typedef struct SSlidingWindowResInfo {
int64_t
startTime
;
// start time of the first time window for sliding query
int64_t
startTime
;
// start time of the first time window for sliding query
int64_t
prevSKey
;
// previous (not completed) sliding window start key
int64_t
prevSKey
;
// previous (not completed) sliding window start key
int64_t
threshold
;
// threshold for return completed results.
int64_t
threshold
;
// threshold for return completed results.
}
SSlidingWindow
Res
Info
;
}
SSlidingWindowInfo
;
typedef
struct
SQueryRuntimeEnv
{
typedef
struct
SQueryRuntimeEnv
{
SPositionInfo
startPos
;
/* the start position, used for secondary/third iteration */
SPositionInfo
startPos
;
/* the start position, used for secondary/third iteration */
...
@@ -159,14 +159,13 @@ typedef struct SQueryRuntimeEnv {
...
@@ -159,14 +159,13 @@ typedef struct SQueryRuntimeEnv {
SInterpolationInfo
interpoInfo
;
SInterpolationInfo
interpoInfo
;
SData
**
pInterpoBuf
;
SData
**
pInterpoBuf
;
SSlidingWindow
Res
Info
swindowResInfo
;
SSlidingWindowInfo
swindowResInfo
;
STSBuf
*
pTSBuf
;
STSBuf
*
pTSBuf
;
STSCursor
cur
;
STSCursor
cur
;
SQueryCostSummary
summary
;
SQueryCostSummary
summary
;
TSKEY
intervalSKey
;
// skey of the complete time window, not affected by the actual data distribution
STimeWindow
intervalWindow
;
// the complete time window, not affected by the actual data distribution
TSKEY
intervalEKey
;
// ekey of the complete time window
/*
/*
* Temporarily hold the in-memory cache block info during scan cache blocks
* Temporarily hold the in-memory cache block info during scan cache blocks
...
@@ -296,7 +295,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo* pQInfo, SQuery* pQuery, void* param)
...
@@ -296,7 +295,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo* pQInfo, SQuery* pQuery, void* param)
void
vnodeDecMeterRefcnt
(
SQInfo
*
pQInfo
);
void
vnodeDecMeterRefcnt
(
SQInfo
*
pQInfo
);
/* sql query handle in dnode */
/* sql query handle in dnode */
void
vnodeSingle
Meter
Query
(
SSchedMsg
*
pMsg
);
void
vnodeSingle
Table
Query
(
SSchedMsg
*
pMsg
);
/*
/*
* handle multi-meter query process
* handle multi-meter query process
...
...
src/system/detail/src/vnodeQueryImpl.c
浏览文件 @
c303ff2a
...
@@ -88,7 +88,7 @@ static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __
...
@@ -88,7 +88,7 @@ static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __
static
bool
functionNeedToExecute
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQLFunctionCtx
*
pCtx
,
int32_t
functionId
);
static
bool
functionNeedToExecute
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQLFunctionCtx
*
pCtx
,
int32_t
functionId
);
static
void
doGetAlignedIntervalQueryRangeImpl
(
SQuery
*
pQuery
,
int64_t
pKey
,
int64_t
keyFirst
,
int64_t
keyLast
,
static
void
doGetAlignedIntervalQueryRangeImpl
(
SQuery
*
pQuery
,
int64_t
pKey
,
int64_t
keyFirst
,
int64_t
keyLast
,
int64_t
*
actualSkey
,
int64_t
*
actualEkey
,
int64_t
*
skey
,
int64_t
*
ekey
);
int64_t
*
actualSkey
,
int64_t
*
actualEkey
,
int64_t
*
skey
,
int64_t
*
ekey
);
static
void
getNextLogicalQueryRange
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int64_t
*
skey
,
int64_t
*
ekey
);
static
void
getNextLogicalQueryRange
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
STimeWindow
*
pTimeWindow
);
// check the offset value integrity
// check the offset value integrity
static
FORCE_INLINE
int32_t
validateHeaderOffsetSegment
(
SQInfo
*
pQInfo
,
char
*
filePath
,
int32_t
vid
,
char
*
data
,
static
FORCE_INLINE
int32_t
validateHeaderOffsetSegment
(
SQInfo
*
pQInfo
,
char
*
filePath
,
int32_t
vid
,
char
*
data
,
...
@@ -1484,7 +1484,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
...
@@ -1484,7 +1484,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
}
}
}
}
TSKEY
ts
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
pRuntimeEnv
->
interval
SKey
:
pRuntimeEnv
->
intervalEK
ey
;
TSKEY
ts
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
pRuntimeEnv
->
interval
Window
.
skey
:
pRuntimeEnv
->
intervalWindow
.
ek
ey
;
setExecParams
(
pQuery
,
&
pCtx
[
k
],
ts
,
dataBlock
,
(
char
*
)
primaryKeyCol
,
forwardStep
,
functionId
,
tpField
,
hasNull
,
setExecParams
(
pQuery
,
&
pCtx
[
k
],
ts
,
dataBlock
,
(
char
*
)
primaryKeyCol
,
forwardStep
,
functionId
,
tpField
,
hasNull
,
pRuntimeEnv
->
blockStatus
,
&
sasArray
[
k
],
pRuntimeEnv
->
scanFlag
);
pRuntimeEnv
->
blockStatus
,
&
sasArray
[
k
],
pRuntimeEnv
->
scanFlag
);
}
}
...
@@ -1574,113 +1574,113 @@ static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx *
...
@@ -1574,113 +1574,113 @@ static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx *
return
true
;
return
true
;
}
}
static
SOutputRes
*
getResWindow
(
SSlidingWindowResInfo
*
pWindowRes
Info
,
char
*
pData
,
int16_t
bytes
,
static
SOutputRes
*
doSetSlidingWindowFromKey
(
SSlidingWindowInfo
*
pSlidingWindow
Info
,
char
*
pData
,
int16_t
bytes
,
SWindowStatus
**
pStatus
)
{
SWindowStatus
**
pStatus
)
{
int32_t
p
=
-
1
;
int32_t
p
=
-
1
;
int32_t
*
p1
=
(
int32_t
*
)
taosGetDataFromHash
(
p
WindowRes
Info
->
hashList
,
pData
,
bytes
);
int32_t
*
p1
=
(
int32_t
*
)
taosGetDataFromHash
(
p
SlidingWindow
Info
->
hashList
,
pData
,
bytes
);
if
(
p1
!=
NULL
)
{
if
(
p1
!=
NULL
)
{
p
=
*
p1
;
p
=
*
p1
;
p
WindowRes
Info
->
curIndex
=
p
;
p
SlidingWindow
Info
->
curIndex
=
p
;
if
(
pStatus
!=
NULL
)
{
if
(
pStatus
!=
NULL
)
{
*
pStatus
=
&
p
WindowRes
Info
->
pStatus
[
p
];
*
pStatus
=
&
p
SlidingWindow
Info
->
pStatus
[
p
];
}
}
}
else
{
// more than the capacity, reallocate the resources
}
else
{
// more than the capacity, reallocate the resources
if
(
p
WindowResInfo
->
size
>=
pWindowRes
Info
->
capacity
)
{
if
(
p
SlidingWindowInfo
->
size
>=
pSlidingWindow
Info
->
capacity
)
{
int64_t
newCap
=
p
WindowRes
Info
->
capacity
*
2
;
int64_t
newCap
=
p
SlidingWindow
Info
->
capacity
*
2
;
char
*
t
=
realloc
(
p
WindowRes
Info
->
pStatus
,
newCap
*
sizeof
(
SWindowStatus
));
char
*
t
=
realloc
(
p
SlidingWindow
Info
->
pStatus
,
newCap
*
sizeof
(
SWindowStatus
));
if
(
t
!=
NULL
)
{
if
(
t
!=
NULL
)
{
p
WindowRes
Info
->
pStatus
=
(
SWindowStatus
*
)
t
;
p
SlidingWindow
Info
->
pStatus
=
(
SWindowStatus
*
)
t
;
memset
(
&
p
WindowResInfo
->
pStatus
[
pWindowResInfo
->
capacity
],
0
,
sizeof
(
SWindowStatus
)
*
pWindowRes
Info
->
capacity
);
memset
(
&
p
SlidingWindowInfo
->
pStatus
[
pSlidingWindowInfo
->
capacity
],
0
,
sizeof
(
SWindowStatus
)
*
pSlidingWindow
Info
->
capacity
);
}
else
{
}
else
{
// todo
// todo
}
}
p
WindowRes
Info
->
capacity
=
newCap
;
p
SlidingWindow
Info
->
capacity
=
newCap
;
}
}
// add a new result set for a new group
// add a new result set for a new group
if
(
pStatus
!=
NULL
)
{
if
(
pStatus
!=
NULL
)
{
*
pStatus
=
&
p
WindowResInfo
->
pStatus
[
pWindowRes
Info
->
size
];
*
pStatus
=
&
p
SlidingWindowInfo
->
pStatus
[
pSlidingWindow
Info
->
size
];
}
}
p
=
p
WindowRes
Info
->
size
;
p
=
p
SlidingWindow
Info
->
size
;
p
WindowResInfo
->
curIndex
=
pWindowRes
Info
->
size
;
p
SlidingWindowInfo
->
curIndex
=
pSlidingWindow
Info
->
size
;
p
WindowRes
Info
->
size
+=
1
;
p
SlidingWindow
Info
->
size
+=
1
;
taosAddToHashTable
(
p
WindowResInfo
->
hashList
,
pData
,
bytes
,
(
char
*
)
&
pWindowRes
Info
->
curIndex
,
sizeof
(
int32_t
));
taosAddToHashTable
(
p
SlidingWindowInfo
->
hashList
,
pData
,
bytes
,
(
char
*
)
&
pSlidingWindow
Info
->
curIndex
,
sizeof
(
int32_t
));
}
}
return
&
p
WindowRes
Info
->
pResult
[
p
];
return
&
p
SlidingWindow
Info
->
pResult
[
p
];
}
}
static
int32_t
initResWindowInfo
(
SSlidingWindow
ResInfo
*
pWindowRes
Info
,
int32_t
threshold
,
int16_t
type
,
static
int32_t
initResWindowInfo
(
SSlidingWindow
Info
*
pSlidingWindow
Info
,
int32_t
threshold
,
int16_t
type
,
SOutputRes
*
pRes
)
{
SOutputRes
*
pRes
)
{
p
WindowRes
Info
->
capacity
=
threshold
;
p
SlidingWindow
Info
->
capacity
=
threshold
;
p
WindowRes
Info
->
threshold
=
threshold
;
p
SlidingWindow
Info
->
threshold
=
threshold
;
p
WindowRes
Info
->
type
=
type
;
p
SlidingWindow
Info
->
type
=
type
;
_hash_fn_t
fn
=
taosGetDefaultHashFunction
(
type
);
_hash_fn_t
fn
=
taosGetDefaultHashFunction
(
type
);
p
WindowRes
Info
->
hashList
=
taosInitHashTable
(
threshold
,
fn
,
false
);
p
SlidingWindow
Info
->
hashList
=
taosInitHashTable
(
threshold
,
fn
,
false
);
p
WindowRes
Info
->
curIndex
=
-
1
;
p
SlidingWindow
Info
->
curIndex
=
-
1
;
p
WindowRes
Info
->
size
=
0
;
p
SlidingWindow
Info
->
size
=
0
;
p
WindowRes
Info
->
pResult
=
pRes
;
p
SlidingWindow
Info
->
pResult
=
pRes
;
p
WindowRes
Info
->
pStatus
=
calloc
(
threshold
,
sizeof
(
SWindowStatus
));
p
SlidingWindow
Info
->
pStatus
=
calloc
(
threshold
,
sizeof
(
SWindowStatus
));
if
(
p
WindowResInfo
->
pStatus
==
NULL
||
pWindowRes
Info
->
hashList
==
NULL
)
{
if
(
p
SlidingWindowInfo
->
pStatus
==
NULL
||
pSlidingWindow
Info
->
hashList
==
NULL
)
{
return
-
1
;
return
-
1
;
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
void
destroyResWindowInfo
(
SSlidingWindow
ResInfo
*
pWindowRes
Info
)
{
static
void
destroyResWindowInfo
(
SSlidingWindow
Info
*
pSlidingWindow
Info
)
{
if
(
p
WindowResInfo
==
NULL
||
pWindowRes
Info
->
capacity
==
0
)
{
if
(
p
SlidingWindowInfo
==
NULL
||
pSlidingWindow
Info
->
capacity
==
0
)
{
assert
(
p
WindowResInfo
->
hashList
==
NULL
&&
pWindowRes
Info
->
pResult
==
NULL
);
assert
(
p
SlidingWindowInfo
->
hashList
==
NULL
&&
pSlidingWindow
Info
->
pResult
==
NULL
);
return
;
return
;
}
}
taosCleanUpHashTable
(
p
WindowRes
Info
->
hashList
);
taosCleanUpHashTable
(
p
SlidingWindow
Info
->
hashList
);
tfree
(
p
WindowRes
Info
->
pStatus
);
tfree
(
p
SlidingWindow
Info
->
pStatus
);
}
}
void
reset
ResWindowInfo
(
SSlidingWindowResInfo
*
pWindowRes
Info
,
int32_t
numOfCols
)
{
void
reset
SlidingWindowInfo
(
SSlidingWindowInfo
*
pSlidingWindow
Info
,
int32_t
numOfCols
)
{
if
(
p
WindowResInfo
==
NULL
||
pWindowRes
Info
->
capacity
==
0
)
{
if
(
p
SlidingWindowInfo
==
NULL
||
pSlidingWindow
Info
->
capacity
==
0
)
{
return
;
return
;
}
}
for
(
int32_t
i
=
0
;
i
<
p
WindowRes
Info
->
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
p
SlidingWindow
Info
->
size
;
++
i
)
{
SOutputRes
*
pOneRes
=
&
p
WindowRes
Info
->
pResult
[
i
];
SOutputRes
*
pOneRes
=
&
p
SlidingWindow
Info
->
pResult
[
i
];
clearGroupResultBuf
(
pOneRes
,
numOfCols
);
clearGroupResultBuf
(
pOneRes
,
numOfCols
);
}
}
memset
(
p
WindowResInfo
->
pStatus
,
0
,
sizeof
(
SWindowStatus
)
*
pWindowRes
Info
->
capacity
);
memset
(
p
SlidingWindowInfo
->
pStatus
,
0
,
sizeof
(
SWindowStatus
)
*
pSlidingWindow
Info
->
capacity
);
p
WindowRes
Info
->
curIndex
=
-
1
;
p
SlidingWindow
Info
->
curIndex
=
-
1
;
taosCleanUpHashTable
(
p
WindowRes
Info
->
hashList
);
taosCleanUpHashTable
(
p
SlidingWindow
Info
->
hashList
);
p
WindowRes
Info
->
size
=
0
;
p
SlidingWindow
Info
->
size
=
0
;
_hash_fn_t
fn
=
taosGetDefaultHashFunction
(
p
WindowRes
Info
->
type
);
_hash_fn_t
fn
=
taosGetDefaultHashFunction
(
p
SlidingWindow
Info
->
type
);
p
WindowResInfo
->
hashList
=
taosInitHashTable
(
pWindowRes
Info
->
capacity
,
fn
,
false
);
p
SlidingWindowInfo
->
hashList
=
taosInitHashTable
(
pSlidingWindow
Info
->
capacity
,
fn
,
false
);
p
WindowRes
Info
->
startTime
=
0
;
p
SlidingWindow
Info
->
startTime
=
0
;
p
WindowRes
Info
->
prevSKey
=
0
;
p
SlidingWindow
Info
->
prevSKey
=
0
;
}
}
void
clearCompleted
ResWindows
(
SSlidingWindowResInfo
*
pWindowRes
Info
,
int32_t
numOfCols
)
{
void
clearCompleted
SlidingWindows
(
SSlidingWindowInfo
*
pSlidingWindow
Info
,
int32_t
numOfCols
)
{
if
(
p
WindowResInfo
==
NULL
||
pWindowResInfo
->
capacity
==
0
||
pWindowRes
Info
->
size
==
0
)
{
if
(
p
SlidingWindowInfo
==
NULL
||
pSlidingWindowInfo
->
capacity
==
0
||
pSlidingWindow
Info
->
size
==
0
)
{
return
;
return
;
}
}
int32_t
i
=
0
;
int32_t
i
=
0
;
for
(
i
=
0
;
i
<
p
WindowRes
Info
->
size
;
++
i
)
{
for
(
i
=
0
;
i
<
p
SlidingWindow
Info
->
size
;
++
i
)
{
SWindowStatus
*
pStatus
=
&
p
WindowRes
Info
->
pStatus
[
i
];
SWindowStatus
*
pStatus
=
&
p
SlidingWindow
Info
->
pStatus
[
i
];
if
(
pStatus
->
closed
)
{
// remove the window slot from hash table
if
(
pStatus
->
closed
)
{
// remove the window slot from hash table
taosDeleteFromHashTable
(
p
WindowRes
Info
->
hashList
,
(
const
char
*
)
&
pStatus
->
window
.
skey
,
TSDB_KEYSIZE
);
taosDeleteFromHashTable
(
p
SlidingWindow
Info
->
hashList
,
(
const
char
*
)
&
pStatus
->
window
.
skey
,
TSDB_KEYSIZE
);
}
else
{
}
else
{
break
;
break
;
}
}
...
@@ -1690,48 +1690,106 @@ void clearCompletedResWindows(SSlidingWindowResInfo *pWindowResInfo, int32_t num
...
@@ -1690,48 +1690,106 @@ void clearCompletedResWindows(SSlidingWindowResInfo *pWindowResInfo, int32_t num
return
;
return
;
}
}
int32_t
remain
=
p
WindowRes
Info
->
size
-
i
;
int32_t
remain
=
p
SlidingWindow
Info
->
size
-
i
;
//clear remain list
//clear remain list
memmove
(
p
WindowResInfo
->
pStatus
,
&
pWindowRes
Info
->
pStatus
[
i
],
remain
*
sizeof
(
SWindowStatus
));
memmove
(
p
SlidingWindowInfo
->
pStatus
,
&
pSlidingWindow
Info
->
pStatus
[
i
],
remain
*
sizeof
(
SWindowStatus
));
memset
(
&
p
WindowResInfo
->
pStatus
[
remain
],
0
,
(
pWindowRes
Info
->
capacity
-
remain
)
*
sizeof
(
SWindowStatus
));
memset
(
&
p
SlidingWindowInfo
->
pStatus
[
remain
],
0
,
(
pSlidingWindow
Info
->
capacity
-
remain
)
*
sizeof
(
SWindowStatus
));
for
(
int32_t
k
=
0
;
k
<
remain
;
++
k
)
{
for
(
int32_t
k
=
0
;
k
<
remain
;
++
k
)
{
copyGroupResultBuf
(
&
p
WindowResInfo
->
pResult
[
k
],
&
pWindowRes
Info
->
pResult
[
i
+
k
],
numOfCols
);
copyGroupResultBuf
(
&
p
SlidingWindowInfo
->
pResult
[
k
],
&
pSlidingWindow
Info
->
pResult
[
i
+
k
],
numOfCols
);
}
}
for
(
int32_t
k
=
remain
;
k
<
p
WindowRes
Info
->
size
;
++
k
)
{
for
(
int32_t
k
=
remain
;
k
<
p
SlidingWindow
Info
->
size
;
++
k
)
{
SOutputRes
*
pOneRes
=
&
p
WindowRes
Info
->
pResult
[
k
];
SOutputRes
*
pOneRes
=
&
p
SlidingWindow
Info
->
pResult
[
k
];
clearGroupResultBuf
(
pOneRes
,
numOfCols
);
clearGroupResultBuf
(
pOneRes
,
numOfCols
);
}
}
p
WindowRes
Info
->
size
=
remain
;
p
SlidingWindow
Info
->
size
=
remain
;
for
(
int32_t
k
=
0
;
k
<
p
WindowRes
Info
->
size
;
++
k
)
{
for
(
int32_t
k
=
0
;
k
<
p
SlidingWindow
Info
->
size
;
++
k
)
{
SWindowStatus
*
pStatus
=
&
p
WindowRes
Info
->
pStatus
[
k
];
SWindowStatus
*
pStatus
=
&
p
SlidingWindow
Info
->
pStatus
[
k
];
int32_t
*
p
=
(
int32_t
*
)
taosGetDataFromHash
(
p
WindowRes
Info
->
hashList
,
(
const
char
*
)
&
pStatus
->
window
.
skey
,
TSDB_KEYSIZE
);
int32_t
*
p
=
(
int32_t
*
)
taosGetDataFromHash
(
p
SlidingWindow
Info
->
hashList
,
(
const
char
*
)
&
pStatus
->
window
.
skey
,
TSDB_KEYSIZE
);
int32_t
v
=
*
p
;
int32_t
v
=
*
p
;
v
=
(
v
-
i
);
v
=
(
v
-
i
);
taosDeleteFromHashTable
(
p
WindowRes
Info
->
hashList
,
(
const
char
*
)
&
pStatus
->
window
.
skey
,
TSDB_KEYSIZE
);
taosDeleteFromHashTable
(
p
SlidingWindow
Info
->
hashList
,
(
const
char
*
)
&
pStatus
->
window
.
skey
,
TSDB_KEYSIZE
);
taosAddToHashTable
(
p
WindowRes
Info
->
hashList
,
(
const
char
*
)
&
pStatus
->
window
.
skey
,
TSDB_KEYSIZE
,
taosAddToHashTable
(
p
SlidingWindow
Info
->
hashList
,
(
const
char
*
)
&
pStatus
->
window
.
skey
,
TSDB_KEYSIZE
,
(
char
*
)
&
v
,
sizeof
(
int32_t
));
(
char
*
)
&
v
,
sizeof
(
int32_t
));
}
}
p
WindowRes
Info
->
curIndex
=
-
1
;
p
SlidingWindow
Info
->
curIndex
=
-
1
;
}
}
int32_t
numOf
ResFromResWindowInfo
(
SSlidingWindowResInfo
*
pWindowRes
Info
)
{
int32_t
numOf
ClosedSlidingWindow
(
SSlidingWindowInfo
*
pSlidingWindow
Info
)
{
for
(
int32_t
i
=
0
;
i
<
p
WindowRes
Info
->
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
p
SlidingWindow
Info
->
size
;
++
i
)
{
SWindowStatus
*
pStatus
=
&
p
WindowRes
Info
->
pStatus
[
i
];
SWindowStatus
*
pStatus
=
&
p
SlidingWindow
Info
->
pStatus
[
i
];
if
(
pStatus
->
closed
==
false
)
{
if
(
pStatus
->
closed
==
false
)
{
return
i
;
return
i
;
}
}
}
}
}
}
static
SWindowStatus
*
getCurrentSWindow
(
SSlidingWindowResInfo
*
pWindowResInfo
)
{
void
closeSlidingWindow
(
SSlidingWindowInfo
*
pSlidingWindowInfo
,
int32_t
slot
)
{
return
&
pWindowResInfo
->
pStatus
[
pWindowResInfo
->
curIndex
];
assert
(
slot
>=
0
&&
slot
<
pSlidingWindowInfo
->
size
);
SWindowStatus
*
pStatus
=
&
pSlidingWindowInfo
->
pStatus
[
slot
];
pStatus
->
closed
=
true
;
}
void
closeAllSlidingWindow
(
SSlidingWindowInfo
*
pSlidingWindowInfo
)
{
assert
(
pSlidingWindowInfo
->
size
>=
0
&&
pSlidingWindowInfo
->
capacity
>=
pSlidingWindowInfo
->
size
);
for
(
int32_t
i
=
0
;
i
<
pSlidingWindowInfo
->
size
;
++
i
)
{
SWindowStatus
*
pStatus
=
&
pSlidingWindowInfo
->
pStatus
[
i
];
pStatus
->
closed
=
true
;
}
}
static
SWindowStatus
*
getSlidingWindowStatus
(
SSlidingWindowInfo
*
pSlidingWindowInfo
,
int32_t
slot
)
{
return
&
pSlidingWindowInfo
->
pStatus
[
slot
];
}
static
bool
slidingWindowClosed
(
SSlidingWindowInfo
*
pSlidingWindowInfo
,
int32_t
slot
)
{
return
(
pSlidingWindowInfo
->
pStatus
[
slot
].
closed
==
true
);
}
static
int32_t
curSlidingWindow
(
SSlidingWindowInfo
*
pSlidingWindowInfo
)
{
assert
(
pSlidingWindowInfo
->
curIndex
>=
0
&&
pSlidingWindowInfo
->
curIndex
<
pSlidingWindowInfo
->
size
);
return
pSlidingWindowInfo
->
curIndex
;
}
// get the correct sliding window according to the handled timestamp
static
STimeWindow
getActiveSlidingWindow
(
SSlidingWindowInfo
*
pSlidingWindowInfo
,
int64_t
ts
,
SQuery
*
pQuery
)
{
STimeWindow
w
=
{
0
};
if
(
pSlidingWindowInfo
->
curIndex
==
-
1
)
{
// the first window, from the prevous stored value
w
.
skey
=
pSlidingWindowInfo
->
prevSKey
;
w
.
ekey
=
w
.
skey
+
pQuery
->
nAggTimeInterval
-
1
;
}
else
{
SWindowStatus
*
pStatus
=
getSlidingWindowStatus
(
pSlidingWindowInfo
,
curSlidingWindow
(
pSlidingWindowInfo
));
if
(
pStatus
->
window
.
skey
<=
ts
&&
pStatus
->
window
.
ekey
>=
ts
)
{
w
=
pStatus
->
window
;
}
else
{
int64_t
st
=
pStatus
->
window
.
skey
;
while
(
st
>
ts
)
{
st
-=
pQuery
->
slidingTime
;
}
while
((
st
+
pQuery
->
nAggTimeInterval
-
1
)
<
ts
)
{
st
+=
pQuery
->
slidingTime
;
}
w
.
skey
=
st
;
w
.
ekey
=
w
.
skey
+
pQuery
->
nAggTimeInterval
-
1
;
}
}
assert
(
ts
>=
w
.
skey
&&
ts
<=
w
.
ekey
);
return
w
;
}
}
static
int32_t
setGroupResultFromKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
char
*
pData
,
int16_t
type
,
int16_t
bytes
)
{
static
int32_t
setGroupResultFromKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
char
*
pData
,
int16_t
type
,
int16_t
bytes
)
{
...
@@ -1739,7 +1797,7 @@ static int32_t setGroupResultFromKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData,
...
@@ -1739,7 +1797,7 @@ static int32_t setGroupResultFromKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData,
return
-
1
;
return
-
1
;
}
}
SOutputRes
*
pOutputRes
=
getResWindow
(
&
pRuntimeEnv
->
swindowResInfo
,
pData
,
bytes
,
NULL
);
SOutputRes
*
pOutputRes
=
doSetSlidingWindowFromKey
(
&
pRuntimeEnv
->
swindowResInfo
,
pData
,
bytes
,
NULL
);
if
(
pOutputRes
==
NULL
)
{
if
(
pOutputRes
==
NULL
)
{
return
-
1
;
return
-
1
;
}
}
...
@@ -1750,17 +1808,20 @@ static int32_t setGroupResultFromKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData,
...
@@ -1750,17 +1808,20 @@ static int32_t setGroupResultFromKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData,
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
setSlidingWindowFromKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int64_t
skey
,
int64_t
ekey
)
{
static
int32_t
setSlidingWindowFromKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
STimeWindow
*
pTimeWindow
)
{
int64_t
st
=
skey
;
assert
(
pTimeWindow
->
skey
<
pTimeWindow
->
ekey
);
int64_t
st
=
pTimeWindow
->
skey
;
SWindowStatus
*
pStatus
=
NULL
;
SWindowStatus
*
pStatus
=
NULL
;
SOutputRes
*
pOutputRes
=
getResWindow
(
&
pRuntimeEnv
->
swindowResInfo
,
(
char
*
)
&
st
,
TSDB_KEYSIZE
,
&
pStatus
);
SOutputRes
*
pOutputRes
=
doSetSlidingWindowFromKey
(
&
pRuntimeEnv
->
swindowResInfo
,
(
char
*
)
&
st
,
TSDB_KEYSIZE
,
&
pStatus
);
if
(
pOutputRes
==
NULL
)
{
if
(
pOutputRes
==
NULL
)
{
return
-
1
;
return
-
1
;
}
}
pStatus
->
window
=
(
STimeWindow
){.
skey
=
skey
,
.
ekey
=
ekey
};
pStatus
->
window
=
*
pTimeWindow
;
setGroupOutputBuffer
(
pRuntimeEnv
,
pOutputRes
);
setGroupOutputBuffer
(
pRuntimeEnv
,
pOutputRes
);
initCtxOutputBuf
(
pRuntimeEnv
);
initCtxOutputBuf
(
pRuntimeEnv
);
...
@@ -1884,7 +1945,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
...
@@ -1884,7 +1945,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
bool
hasNull
=
hasNullVal
(
pQuery
,
k
,
pBlockInfo
,
pFields
,
isDiskFileBlock
);
bool
hasNull
=
hasNullVal
(
pQuery
,
k
,
pBlockInfo
,
pFields
,
isDiskFileBlock
);
char
*
dataBlock
=
getDataBlocks
(
pRuntimeEnv
,
&
sasArray
[
k
],
k
,
*
forwardStep
);
char
*
dataBlock
=
getDataBlocks
(
pRuntimeEnv
,
&
sasArray
[
k
],
k
,
*
forwardStep
);
TSKEY
ts
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
pRuntimeEnv
->
interval
SKey
:
pRuntimeEnv
->
intervalEK
ey
;
TSKEY
ts
=
QUERY_IS_ASC_QUERY
(
pQuery
)
?
pRuntimeEnv
->
interval
Window
.
skey
:
pRuntimeEnv
->
intervalWindow
.
ek
ey
;
setExecParams
(
pQuery
,
&
pCtx
[
k
],
ts
,
dataBlock
,
(
char
*
)
primaryKeyCol
,
(
*
forwardStep
),
functionId
,
pFields
,
hasNull
,
setExecParams
(
pQuery
,
&
pCtx
[
k
],
ts
,
dataBlock
,
(
char
*
)
primaryKeyCol
,
(
*
forwardStep
),
functionId
,
pFields
,
hasNull
,
pRuntimeEnv
->
blockStatus
,
&
sasArray
[
k
],
pRuntimeEnv
->
scanFlag
);
pRuntimeEnv
->
blockStatus
,
&
sasArray
[
k
],
pRuntimeEnv
->
scanFlag
);
}
}
...
@@ -1936,34 +1997,11 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
...
@@ -1936,34 +1997,11 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
if
(
pQuery
->
slidingTime
>
0
&&
pQuery
->
nAggTimeInterval
>
0
)
{
if
(
pQuery
->
slidingTime
>
0
&&
pQuery
->
nAggTimeInterval
>
0
)
{
// decide the time window according to the primary timestamp
// decide the time window according to the primary timestamp
int64_t
ts
=
primaryKeyCol
[
offset
];
int64_t
ts
=
primaryKeyCol
[
offset
];
int64_t
wskey
=
0
;
int64_t
wekey
=
0
;
SSlidingWindowInfo
*
pSlidingWindowInfo
=
&
pRuntimeEnv
->
swindowResInfo
;
STimeWindow
win
=
getActiveSlidingWindow
(
pSlidingWindowInfo
,
ts
,
pQuery
);
if
(
pRuntimeEnv
->
swindowResInfo
.
curIndex
==
-
1
)
{
wskey
=
pRuntimeEnv
->
swindowResInfo
.
prevSKey
;
wekey
=
wskey
+
pQuery
->
nAggTimeInterval
-
1
;
}
else
{
SWindowStatus
*
pStatus
=
&
pRuntimeEnv
->
swindowResInfo
.
pStatus
[
pRuntimeEnv
->
swindowResInfo
.
curIndex
];
if
(
pStatus
->
window
.
skey
<=
ts
&&
pStatus
->
window
.
ekey
>=
ts
)
{
wskey
=
pStatus
->
window
.
skey
;
wekey
=
pStatus
->
window
.
ekey
;
}
else
{
int64_t
st
=
pStatus
->
window
.
skey
;
while
(
st
>
ts
)
{
st
-=
pQuery
->
slidingTime
;
}
while
((
st
+
pQuery
->
nAggTimeInterval
-
1
)
<
ts
)
{
st
+=
pQuery
->
slidingTime
;
}
wskey
=
st
;
wekey
=
wskey
+
pQuery
->
nAggTimeInterval
-
1
;
}
}
int32_t
ret
=
setSlidingWindowFromKey
(
pRuntimeEnv
,
wskey
,
wekey
);
int32_t
ret
=
setSlidingWindowFromKey
(
pRuntimeEnv
,
&
win
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
// null data, too many state code
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
// null data, too many state code
continue
;
continue
;
}
}
...
@@ -1973,9 +2011,10 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
...
@@ -1973,9 +2011,10 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutputCols
;
++
k
)
{
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutputCols
;
++
k
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
pBase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
pBase
.
functionId
;
pCtx
[
k
].
nStartQueryTimestamp
=
wskey
;
pCtx
[
k
].
nStartQueryTimestamp
=
win
.
skey
;
SWindowStatus
*
pStatus
=
getSlidingWindowStatus
(
pSlidingWindowInfo
,
curSlidingWindow
(
pSlidingWindowInfo
));
SWindowStatus
*
pStatus
=
getCurrentSWindow
(
&
pRuntimeEnv
->
swindowResInfo
);
if
(
!
IS_MASTER_SCAN
(
pRuntimeEnv
)
&&
!
pStatus
->
closed
)
{
if
(
!
IS_MASTER_SCAN
(
pRuntimeEnv
)
&&
!
pStatus
->
closed
)
{
// qTrace("QInfo:%p not completed in supplementary scan, ignore funcId:%d, window:%lld-%lld",
// qTrace("QInfo:%p not completed in supplementary scan, ignore funcId:%d, window:%lld-%lld",
// GET_QINFO_ADDR(pQuery), functionId, pStatus->window.skey, pStatus->window.ekey);
// GET_QINFO_ADDR(pQuery), functionId, pStatus->window.skey, pStatus->window.ekey);
...
@@ -1990,25 +2029,27 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
...
@@ -1990,25 +2029,27 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
lastKey
=
ts
;
lastKey
=
ts
;
int32_t
index
=
pRuntimeEnv
->
swindowResInfo
.
curIndex
;
int32_t
index
=
pRuntimeEnv
->
swindowResInfo
.
curIndex
;
while
(
1
)
{
while
(
1
)
{
getNextLogicalQueryRange
(
pRuntimeEnv
,
&
wskey
,
&
wekey
);
STimeWindow
nextWin
=
{
0
};
if
(
pRuntimeEnv
->
swindowResInfo
.
startTime
>
wskey
||
(
wskey
>
pQuery
->
ekey
&&
QUERY_IS_ASC_QUERY
(
pQuery
))
||
(
wskey
>
pQuery
->
skey
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)))
{
getNextLogicalQueryRange
(
pRuntimeEnv
,
&
nextWin
);
if
(
pSlidingWindowInfo
->
startTime
>
nextWin
.
skey
||
(
nextWin
.
skey
>
pQuery
->
ekey
&&
QUERY_IS_ASC_QUERY
(
pQuery
))
||
(
nextWin
.
skey
>
pQuery
->
skey
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)))
{
pRuntimeEnv
->
swindowResInfo
.
curIndex
=
index
;
pRuntimeEnv
->
swindowResInfo
.
curIndex
=
index
;
break
;
break
;
}
}
if
(
ts
>=
wskey
&&
ts
<=
w
ekey
)
{
if
(
ts
>=
nextWin
.
skey
&&
ts
<=
nextWin
.
ekey
)
{
// null data, failed to allocate more memory buffer
// null data, failed to allocate more memory buffer
if
(
setSlidingWindowFromKey
(
pRuntimeEnv
,
wskey
,
wekey
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
setSlidingWindowFromKey
(
pRuntimeEnv
,
&
nextWin
)
!=
TSDB_CODE_SUCCESS
)
{
pRuntimeEnv
->
swindowResInfo
.
curIndex
=
index
;
pRuntimeEnv
->
swindowResInfo
.
curIndex
=
index
;
break
;
break
;
}
}
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutputCols
;
++
k
)
{
for
(
int32_t
k
=
0
;
k
<
pQuery
->
numOfOutputCols
;
++
k
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
pBase
.
functionId
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
k
].
pBase
.
functionId
;
pCtx
[
k
].
nStartQueryTimestamp
=
w
skey
;
pCtx
[
k
].
nStartQueryTimestamp
=
nextWin
.
skey
;
SWindowStatus
*
pStatus
=
get
CurrentSWindow
(
&
pRuntimeEnv
->
swindowResInfo
);
SWindowStatus
*
pStatus
=
get
SlidingWindowStatus
(
pSlidingWindowInfo
,
curSlidingWindow
(
pSlidingWindowInfo
)
);
if
(
!
IS_MASTER_SCAN
(
pRuntimeEnv
)
&&
!
pStatus
->
closed
)
{
if
(
!
IS_MASTER_SCAN
(
pRuntimeEnv
)
&&
!
pStatus
->
closed
)
{
// qTrace("QInfo:%p not completed in supplementary scan, ignore funcId:%d, window:%lld-%lld",
// qTrace("QInfo:%p not completed in supplementary scan, ignore funcId:%d, window:%lld-%lld",
// GET_QINFO_ADDR(pQuery), functionId, pStatus->window.skey, pStatus->window.ekey);
// GET_QINFO_ADDR(pQuery), functionId, pStatus->window.skey, pStatus->window.ekey);
...
@@ -2068,42 +2109,39 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
...
@@ -2068,42 +2109,39 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
free
(
sasArray
);
free
(
sasArray
);
if
(
pQuery
->
slidingTime
>
0
&&
pQuery
->
nAggTimeInterval
>
0
&&
IS_MASTER_SCAN
(
pRuntimeEnv
))
{
if
(
pQuery
->
slidingTime
>
0
&&
pQuery
->
nAggTimeInterval
>
0
&&
IS_MASTER_SCAN
(
pRuntimeEnv
))
{
SSlidingWindow
ResInfo
*
pWindowRes
Info
=
&
pRuntimeEnv
->
swindowResInfo
;
SSlidingWindow
Info
*
pSlidingWindow
Info
=
&
pRuntimeEnv
->
swindowResInfo
;
// query completed
// query completed
if
(
lastKey
>=
pQuery
->
ekey
&&
QUERY_IS_ASC_QUERY
(
pQuery
)
||
if
(
lastKey
>=
pQuery
->
ekey
&&
QUERY_IS_ASC_QUERY
(
pQuery
)
||
(
lastKey
<=
pQuery
->
ekey
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)))
{
(
lastKey
<=
pQuery
->
ekey
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)))
{
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
size
;
++
i
)
{
closeAllSlidingWindow
(
pSlidingWindowInfo
);
SWindowStatus
*
pStatus
=
&
pWindowResInfo
->
pStatus
[
i
];
pStatus
->
closed
=
true
;
pSlidingWindowInfo
->
curIndex
=
pSlidingWindowInfo
->
size
-
1
;
}
pWindowResInfo
->
curIndex
=
pWindowResInfo
->
size
-
1
;
setQueryStatus
(
pQuery
,
QUERY_COMPLETED
|
QUERY_RESBUF_FULL
);
setQueryStatus
(
pQuery
,
QUERY_COMPLETED
|
QUERY_RESBUF_FULL
);
}
else
{
}
else
{
int32_t
i
=
0
;
int32_t
i
=
0
;
int64_t
skey
=
0
;
int64_t
skey
=
0
;
for
(
i
=
0
;
i
<
p
WindowRes
Info
->
size
;
++
i
)
{
for
(
i
=
0
;
i
<
p
SlidingWindow
Info
->
size
;
++
i
)
{
SWindowStatus
*
pStatus
=
&
p
WindowRes
Info
->
pStatus
[
i
];
SWindowStatus
*
pStatus
=
&
p
SlidingWindow
Info
->
pStatus
[
i
];
if
((
pStatus
->
window
.
ekey
<=
lastKey
&&
QUERY_IS_ASC_QUERY
(
pQuery
))
||
if
((
pStatus
->
window
.
ekey
<=
lastKey
&&
QUERY_IS_ASC_QUERY
(
pQuery
))
||
(
pStatus
->
window
.
skey
>=
lastKey
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)))
{
(
pStatus
->
window
.
skey
>=
lastKey
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)))
{
pStatus
->
closed
=
true
;
closeSlidingWindow
(
pSlidingWindowInfo
,
i
)
;
}
else
{
}
else
{
skey
=
pStatus
->
window
.
skey
;
skey
=
pStatus
->
window
.
skey
;
break
;
break
;
}
}
}
}
p
WindowRes
Info
->
prevSKey
=
skey
;
p
SlidingWindow
Info
->
prevSKey
=
skey
;
// the number of completed slots are larger than the threshold, dump to client immediately.
// the number of completed slots are larger than the threshold, dump to client immediately.
int32_t
v
=
numOf
ResFromResWindowInfo
(
pWindowRes
Info
);
int32_t
v
=
numOf
ClosedSlidingWindow
(
pSlidingWindow
Info
);
if
(
v
>
p
WindowRes
Info
->
threshold
)
{
if
(
v
>
p
SlidingWindow
Info
->
threshold
)
{
setQueryStatus
(
pQuery
,
QUERY_RESBUF_FULL
);
setQueryStatus
(
pQuery
,
QUERY_RESBUF_FULL
);
}
}
dTrace
(
"QInfo:%p total window:%d, closed:%d"
,
GET_QINFO_ADDR
(
pQuery
),
p
WindowRes
Info
->
size
,
v
);
dTrace
(
"QInfo:%p total window:%d, closed:%d"
,
GET_QINFO_ADDR
(
pQuery
),
p
SlidingWindow
Info
->
size
,
v
);
}
}
}
}
...
@@ -2651,11 +2689,6 @@ bool isFixedOutputQuery(SQuery *pQuery) {
...
@@ -2651,11 +2689,6 @@ bool isFixedOutputQuery(SQuery *pQuery) {
continue
;
continue
;
}
}
// // ignore the group by + projection combination
// if (pExprMsg->functionId == TSDB_FUNC_PRJ && isGroupbyNormalCol(pQuery)) {
// continue;
// }
if
(
!
IS_MULTIOUTPUT
(
aAggs
[
pExprMsg
->
functionId
].
nStatus
))
{
if
(
!
IS_MULTIOUTPUT
(
aAggs
[
pExprMsg
->
functionId
].
nStatus
))
{
return
true
;
return
true
;
}
}
...
@@ -3049,20 +3082,18 @@ static void getAlignedIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY ke
...
@@ -3049,20 +3082,18 @@ static void getAlignedIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY ke
pQuery
->
skey
=
skey1
;
pQuery
->
skey
=
skey1
;
pQuery
->
ekey
=
ekey1
;
pQuery
->
ekey
=
ekey1
;
pRuntimeEnv
->
intervalSKey
=
windowSKey
;
pRuntimeEnv
->
intervalWindow
=
(
STimeWindow
)
{.
skey
=
windowSKey
,
.
ekey
=
windowEKey
};
pRuntimeEnv
->
intervalEKey
=
windowEKey
;
assert
(
pQuery
->
skey
<=
pQuery
->
ekey
&&
assert
(
pQuery
->
skey
<=
pQuery
->
ekey
&&
pRuntimeEnv
->
interval
SKey
+
(
pQuery
->
nAggTimeInterval
-
1
)
==
pRuntimeEnv
->
intervalEK
ey
);
pRuntimeEnv
->
interval
Window
.
skey
+
(
pQuery
->
nAggTimeInterval
-
1
)
==
pRuntimeEnv
->
intervalWindow
.
ek
ey
);
}
else
{
}
else
{
pQuery
->
skey
=
ekey1
;
pQuery
->
skey
=
ekey1
;
pQuery
->
ekey
=
skey1
;
pQuery
->
ekey
=
skey1
;
pRuntimeEnv
->
intervalSKey
=
windowEKey
;
pRuntimeEnv
->
intervalWindow
=
(
STimeWindow
)
{.
skey
=
windowEKey
,
.
ekey
=
windowSKey
};
pRuntimeEnv
->
intervalEKey
=
windowSKey
;
assert
(
pQuery
->
skey
>=
pQuery
->
ekey
&&
assert
(
pQuery
->
skey
>=
pQuery
->
ekey
&&
pRuntimeEnv
->
interval
SKey
-
(
pQuery
->
nAggTimeInterval
-
1
)
==
pRuntimeEnv
->
intervalEK
ey
);
pRuntimeEnv
->
interval
Window
.
skey
-
(
pQuery
->
nAggTimeInterval
-
1
)
==
pRuntimeEnv
->
intervalWindow
.
ek
ey
);
}
}
pQuery
->
lastKey
=
pQuery
->
skey
;
pQuery
->
lastKey
=
pQuery
->
skey
;
...
@@ -4885,13 +4916,13 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl
...
@@ -4885,13 +4916,13 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl
}
}
}
}
static
void
getNextLogicalQueryRange
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int64_t
*
skey
,
int64_t
*
ekey
)
{
static
void
getNextLogicalQueryRange
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
STimeWindow
*
pTimeWindow
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int32_t
factor
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
int32_t
factor
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
*
skey
+=
(
pQuery
->
slidingTime
*
factor
);
pTimeWindow
->
skey
+=
(
pQuery
->
slidingTime
*
factor
);
*
ekey
+=
(
pQuery
->
slidingTime
*
factor
);
pTimeWindow
->
ekey
+=
(
pQuery
->
slidingTime
*
factor
);
}
}
static
int64_t
doScanAllDataBlocks
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
static
int64_t
doScanAllDataBlocks
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
...
@@ -5606,15 +5637,15 @@ void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) {
...
@@ -5606,15 +5637,15 @@ void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) {
pRuntimeEnv
->
pCtx
[
i
].
order
=
(
pRuntimeEnv
->
pCtx
[
i
].
order
)
^
1
;
pRuntimeEnv
->
pCtx
[
i
].
order
=
(
pRuntimeEnv
->
pCtx
[
i
].
order
)
^
1
;
}
}
SSlidingWindow
ResInfo
*
pWindowRes
Info
=
&
pRuntimeEnv
->
swindowResInfo
;
SSlidingWindow
Info
*
pSlidingWindow
Info
=
&
pRuntimeEnv
->
swindowResInfo
;
for
(
int32_t
i
=
0
;
i
<
p
WindowRes
Info
->
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
p
SlidingWindow
Info
->
size
;
++
i
)
{
SWindowStatus
*
pStatus
=
&
p
WindowRes
Info
->
pStatus
[
i
];
SWindowStatus
*
pStatus
=
&
p
SlidingWindow
Info
->
pStatus
[
i
];
if
(
!
pStatus
->
closed
)
{
if
(
!
pStatus
->
closed
)
{
continue
;
continue
;
}
}
SOutputRes
*
buf
=
&
p
WindowRes
Info
->
pResult
[
i
];
SOutputRes
*
buf
=
&
p
SlidingWindow
Info
->
pResult
[
i
];
// open/close the specified query for each group result
// open/close the specified query for each group result
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutputCols
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutputCols
;
++
j
)
{
...
@@ -5748,7 +5779,6 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
...
@@ -5748,7 +5779,6 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
void
forwardCtxOutputBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int64_t
output
)
{
void
forwardCtxOutputBuf
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
int64_t
output
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
// int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
// reset the execution contexts
// reset the execution contexts
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutputCols
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutputCols
;
++
j
)
{
...
@@ -5860,13 +5890,13 @@ static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus
...
@@ -5860,13 +5890,13 @@ static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus
pQuery
->
lastKey
=
pQuery
->
skey
;
pQuery
->
lastKey
=
pQuery
->
skey
;
pRuntimeEnv
->
startPos
=
pRuntimeEnv
->
endPos
;
pRuntimeEnv
->
startPos
=
pRuntimeEnv
->
endPos
;
SWAP
(
pRuntimeEnv
->
interval
SKey
,
pRuntimeEnv
->
intervalEK
ey
,
TSKEY
);
SWAP
(
pRuntimeEnv
->
interval
Window
.
skey
,
pRuntimeEnv
->
intervalWindow
.
ek
ey
,
TSKEY
);
}
}
static
void
queryStatusRestore
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQueryStatus
*
pStatus
)
{
static
void
queryStatusRestore
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SQueryStatus
*
pStatus
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SWAP
(
pQuery
->
skey
,
pQuery
->
ekey
,
TSKEY
);
SWAP
(
pQuery
->
skey
,
pQuery
->
ekey
,
TSKEY
);
SWAP
(
pRuntimeEnv
->
interval
SKey
,
pRuntimeEnv
->
intervalEK
ey
,
TSKEY
);
SWAP
(
pRuntimeEnv
->
interval
Window
.
skey
,
pRuntimeEnv
->
intervalWindow
.
ek
ey
,
TSKEY
);
pQuery
->
lastKey
=
pStatus
->
lastKey
;
pQuery
->
lastKey
=
pStatus
->
lastKey
;
pQuery
->
skey
=
pStatus
->
skey
;
pQuery
->
skey
=
pStatus
->
skey
;
...
@@ -5940,12 +5970,12 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
...
@@ -5940,12 +5970,12 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
)
||
(
pQuery
->
nAggTimeInterval
>
0
&&
pQuery
->
slidingTime
>
0
))
{
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
)
||
(
pQuery
->
nAggTimeInterval
>
0
&&
pQuery
->
slidingTime
>
0
))
{
// for each group result, call the finalize function for each column
// for each group result, call the finalize function for each column
SSlidingWindow
ResInfo
*
pWindowRes
Info
=
&
pRuntimeEnv
->
swindowResInfo
;
SSlidingWindow
Info
*
pSlidingWindow
Info
=
&
pRuntimeEnv
->
swindowResInfo
;
for
(
int32_t
i
=
0
;
i
<
p
WindowRes
Info
->
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
p
SlidingWindow
Info
->
size
;
++
i
)
{
SOutputRes
*
buf
=
&
p
WindowRes
Info
->
pResult
[
i
];
SOutputRes
*
buf
=
&
p
SlidingWindow
Info
->
pResult
[
i
];
SWindowStatus
*
pStatus
=
&
p
WindowRes
Info
->
pStatus
[
i
];
SWindowStatus
*
pStatus
=
&
p
SlidingWindow
Info
->
pStatus
[
i
];
if
(
!
pStatus
->
closed
)
{
if
(
!
pStatus
->
closed
)
{
continue
;
continue
;
}
}
...
@@ -6002,17 +6032,14 @@ void doFinalizeResult(SQueryRuntimeEnv *pRuntimeEnv) {
...
@@ -6002,17 +6032,14 @@ void doFinalizeResult(SQueryRuntimeEnv *pRuntimeEnv) {
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
)
||
(
pQuery
->
nAggTimeInterval
>
0
&&
pQuery
->
slidingTime
>
0
))
{
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
)
||
(
pQuery
->
nAggTimeInterval
>
0
&&
pQuery
->
slidingTime
>
0
))
{
// for each group result, call the finalize function for each column
// for each group result, call the finalize function for each column
SSlidingWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
swindowResInfo
;
SSlidingWindowInfo
*
pSlidingWindowInfo
=
&
pRuntimeEnv
->
swindowResInfo
;
bool
groupbyCol
=
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
);
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
{
closeAllSlidingWindow
(
pSlidingWindowInfo
);
}
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pSlidingWindowInfo
->
size
;
++
i
)
{
SOutputRes
*
buf
=
&
pWindowResInfo
->
pResult
[
i
];
SOutputRes
*
buf
=
&
pSlidingWindowInfo
->
pResult
[
i
];
SWindowStatus
*
pStatus
=
&
pWindowResInfo
->
pStatus
[
i
];
if
(
!
slidingWindowClosed
(
pSlidingWindowInfo
,
i
))
{
if
(
groupbyCol
)
{
pStatus
->
closed
=
true
;
}
if
(
!
pStatus
->
closed
)
{
continue
;
continue
;
}
}
...
@@ -6079,12 +6106,9 @@ static int32_t getNextIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQue
...
@@ -6079,12 +6106,9 @@ static int32_t getNextIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQue
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int32_t
factor
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
int32_t
factor
=
GET_FORWARD_DIRECTION_FACTOR
(
pQuery
->
order
.
order
);
printf
(
"------------------------%lld, %lld
\n
"
,
pQuery
->
skey
,
pQuery
->
ekey
);
*
skey
=
pRuntimeEnv
->
intervalSKey
+
(
pQuery
->
slidingTime
*
factor
);
*
ekey
=
pRuntimeEnv
->
intervalEKey
+
(
pQuery
->
slidingTime
*
factor
);
printf
(
"new window:%lld, %lld
\n
"
,
*
skey
,
*
ekey
);
*
skey
=
pRuntimeEnv
->
intervalWindow
.
skey
+
(
pQuery
->
slidingTime
*
factor
);
*
ekey
=
pRuntimeEnv
->
intervalWindow
.
ekey
+
(
pQuery
->
slidingTime
*
factor
);
if
(
pQuery
->
slidingTime
>
0
)
{
if
(
pQuery
->
slidingTime
>
0
)
{
if
(
QUERY_IS_ASC_QUERY
(
pQuery
))
{
if
(
QUERY_IS_ASC_QUERY
(
pQuery
))
{
...
@@ -6095,8 +6119,6 @@ static int32_t getNextIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQue
...
@@ -6095,8 +6119,6 @@ static int32_t getNextIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQue
if
(
*
skey
>
pSupporter
->
rawEKey
)
{
if
(
*
skey
>
pSupporter
->
rawEKey
)
{
return
QUERY_COMPLETED
;
return
QUERY_COMPLETED
;
// setQueryStatus(pQuery, QUERY_COMPLETED);
// return;
}
}
if
(
*
ekey
>
pSupporter
->
rawEKey
)
{
if
(
*
ekey
>
pSupporter
->
rawEKey
)
{
...
@@ -6108,7 +6130,6 @@ static int32_t getNextIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQue
...
@@ -6108,7 +6130,6 @@ static int32_t getNextIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQue
}
}
if
(
*
skey
<
pSupporter
->
rawEKey
)
{
if
(
*
skey
<
pSupporter
->
rawEKey
)
{
// setQueryStatus(pQuery, QUERY_COMPLETED);
return
QUERY_COMPLETED
;
return
QUERY_COMPLETED
;
}
}
...
@@ -6143,7 +6164,7 @@ void forwardIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQueryRuntimeE
...
@@ -6143,7 +6164,7 @@ void forwardIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQueryRuntimeE
return
;
return
;
}
}
getNextLogicalQueryRange
(
pRuntimeEnv
,
&
pRuntimeEnv
->
interval
SKey
,
&
pRuntimeEnv
->
intervalEKey
);
getNextLogicalQueryRange
(
pRuntimeEnv
,
&
pRuntimeEnv
->
interval
Window
);
/* ensure the search in cache will return right position */
/* ensure the search in cache will return right position */
pQuery
->
lastKey
=
pQuery
->
skey
;
pQuery
->
lastKey
=
pQuery
->
skey
;
...
@@ -7472,7 +7493,7 @@ static int32_t getSubsetNumber(SMeterQuerySupportObj *pSupporter) {
...
@@ -7472,7 +7493,7 @@ static int32_t getSubsetNumber(SMeterQuerySupportObj *pSupporter) {
int32_t
totalSubset
=
0
;
int32_t
totalSubset
=
0
;
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
)
||
(
pQuery
->
nAggTimeInterval
>
0
&&
pQuery
->
slidingTime
>
0
))
{
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
)
||
(
pQuery
->
nAggTimeInterval
>
0
&&
pQuery
->
slidingTime
>
0
))
{
totalSubset
=
numOf
ResFromResWindowInfo
(
&
pSupporter
->
runtimeEnv
.
swindowResInfo
);
totalSubset
=
numOf
ClosedSlidingWindow
(
&
pSupporter
->
runtimeEnv
.
swindowResInfo
);
}
else
{
}
else
{
totalSubset
=
pSupporter
->
pSidSet
->
numOfSubSet
;
totalSubset
=
pSupporter
->
pSidSet
->
numOfSubSet
;
}
}
...
...
src/system/detail/src/vnodeQueryProcess.c
浏览文件 @
c303ff2a
...
@@ -88,22 +88,10 @@ static void setStartPositionForCacheBlock(SQuery *pQuery, SCacheBlock *pBlock, b
...
@@ -88,22 +88,10 @@ static void setStartPositionForCacheBlock(SQuery *pQuery, SCacheBlock *pBlock, b
static
void
enableExecutionForNextTable
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
static
void
enableExecutionForNextTable
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
// enable execution for next table
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutputCols
;
++
i
)
{
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
)
||
(
pQuery
->
nAggTimeInterval
>
0
&&
pQuery
->
slidingTime
>
0
))
{
SResultInfo
*
pResInfo
=
GET_RES_INFO
(
&
pRuntimeEnv
->
pCtx
[
i
]);
SSlidingWindowResInfo
*
pWindowResInfo
=
&
pRuntimeEnv
->
swindowResInfo
;
if
(
pResInfo
!=
NULL
)
{
pResInfo
->
complete
=
false
;
for
(
int32_t
i
=
0
;
i
<
pWindowResInfo
->
size
;
++
i
)
{
SOutputRes
*
buf
=
&
pWindowResInfo
->
pResult
[
i
];
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutputCols
;
++
j
)
{
buf
->
resultInfo
[
j
].
complete
=
false
;
}
}
}
else
{
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutputCols
;
++
i
)
{
SResultInfo
*
pResInfo
=
GET_RES_INFO
(
&
pRuntimeEnv
->
pCtx
[
i
]);
if
(
pResInfo
!=
NULL
)
{
pResInfo
->
complete
=
false
;
}
}
}
}
}
}
}
...
@@ -535,6 +523,7 @@ static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool *
...
@@ -535,6 +523,7 @@ static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool *
}
}
}
}
initCtxOutputBuf
(
pRuntimeEnv
);
return
true
;
return
true
;
}
}
...
@@ -572,13 +561,8 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start
...
@@ -572,13 +561,8 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start
vnodeScanAllData
(
pRuntimeEnv
);
vnodeScanAllData
(
pRuntimeEnv
);
// enable execution for next table
enableExecutionForNextTable
(
pRuntimeEnv
);
// first/last_row query, do not invoke the finalize for super table query
// first/last_row query, do not invoke the finalize for super table query
if
(
!
isFirstLastRowQuery
(
pQuery
))
{
doFinalizeResult
(
pRuntimeEnv
);
doFinalizeResult
(
pRuntimeEnv
);
}
int64_t
numOfRes
=
getNumOfResult
(
pRuntimeEnv
);
int64_t
numOfRes
=
getNumOfResult
(
pRuntimeEnv
);
assert
(
numOfRes
==
1
||
numOfRes
==
0
);
assert
(
numOfRes
==
1
||
numOfRes
==
0
);
...
@@ -592,7 +576,14 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start
...
@@ -592,7 +576,14 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start
return
numOfRes
;
return
numOfRes
;
}
}
static
void
vnodeMultiMeterMultiOutputProcessor
(
SQInfo
*
pQInfo
)
{
/**
* super table query handler
* 1. super table projection query, group-by on normal columns query, ts-comp query
* 2. point interpolation query, last row query
*
* @param pQInfo
*/
static
void
vnodeSTableSeqProcessor
(
SQInfo
*
pQInfo
)
{
SMeterQuerySupportObj
*
pSupporter
=
pQInfo
->
pMeterQuerySupporter
;
SMeterQuerySupportObj
*
pSupporter
=
pQInfo
->
pMeterQuerySupporter
;
SMeterSidExtInfo
**
pMeterSidExtInfo
=
pSupporter
->
pMeterSidExtInfo
;
SMeterSidExtInfo
**
pMeterSidExtInfo
=
pSupporter
->
pMeterSidExtInfo
;
...
@@ -601,8 +592,8 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
...
@@ -601,8 +592,8 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
SQuery
*
pQuery
=
&
pQInfo
->
query
;
SQuery
*
pQuery
=
&
pQInfo
->
query
;
tSidSet
*
pSids
=
pSupporter
->
pSidSet
;
tSidSet
*
pSids
=
pSupporter
->
pSidSet
;
SMeterObj
*
pOneMeter
=
getMeterObj
(
pSupporter
->
pMetersHashTable
,
pMeterSidExtInfo
[
0
]
->
sid
)
;
int32_t
vid
=
getMeterObj
(
pSupporter
->
pMetersHashTable
,
pMeterSidExtInfo
[
0
]
->
sid
)
->
vnode
;
resetCtxOutputBuf
(
pRuntimeEnv
);
resetCtxOutputBuf
(
pRuntimeEnv
);
if
(
isPointInterpoQuery
(
pQuery
))
{
if
(
isPointInterpoQuery
(
pQuery
))
{
...
@@ -613,7 +604,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
...
@@ -613,7 +604,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
int32_t
end
=
pSids
->
starterPos
[
pSupporter
->
subgroupIdx
+
1
]
-
1
;
int32_t
end
=
pSids
->
starterPos
[
pSupporter
->
subgroupIdx
+
1
]
-
1
;
if
(
isFirstLastRowQuery
(
pQuery
))
{
if
(
isFirstLastRowQuery
(
pQuery
))
{
dTrace
(
"QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d"
,
pQInfo
,
pOneMeter
->
vnode
,
dTrace
(
"QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d"
,
pQInfo
,
vid
,
pSids
->
numOfSubSet
,
pSupporter
->
subgroupIdx
);
pSids
->
numOfSubSet
,
pSupporter
->
subgroupIdx
);
TSKEY
key
=
-
1
;
TSKEY
key
=
-
1
;
...
@@ -646,7 +637,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
...
@@ -646,7 +637,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
int64_t
num
=
doCheckMetersInGroup
(
pQInfo
,
index
,
start
);
int64_t
num
=
doCheckMetersInGroup
(
pQInfo
,
index
,
start
);
assert
(
num
>=
0
);
assert
(
num
>=
0
);
}
else
{
}
else
{
dTrace
(
"QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d"
,
pQInfo
,
pOneMeter
->
vnode
,
dTrace
(
"QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d"
,
pQInfo
,
vid
,
pSids
->
numOfSubSet
,
pSupporter
->
subgroupIdx
);
pSids
->
numOfSubSet
,
pSupporter
->
subgroupIdx
);
for
(
int32_t
k
=
start
;
k
<=
end
;
++
k
)
{
for
(
int32_t
k
=
start
;
k
<=
end
;
++
k
)
{
...
@@ -673,7 +664,9 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
...
@@ -673,7 +664,9 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
}
}
}
}
}
else
{
}
else
{
// this procedure treats all tables as single group
/*
* 1. super table projection query, 2. group-by on normal columns query, 3. ts-comp query
*/
assert
(
pSupporter
->
meterIdx
>=
0
);
assert
(
pSupporter
->
meterIdx
>=
0
);
/*
/*
...
@@ -693,9 +686,9 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
...
@@ -693,9 +686,9 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
return
;
return
;
}
}
reset
Res
WindowInfo
(
&
pRuntimeEnv
->
swindowResInfo
,
pQuery
->
numOfOutputCols
);
reset
Sliding
WindowInfo
(
&
pRuntimeEnv
->
swindowResInfo
,
pQuery
->
numOfOutputCols
);
while
(
pSupporter
->
meterIdx
<
pSupporter
->
numOfMeters
)
{
while
(
pSupporter
->
meterIdx
<
pSupporter
->
numOfMeters
)
{
int32_t
k
=
pSupporter
->
meterIdx
;
int32_t
k
=
pSupporter
->
meterIdx
;
if
(
isQueryKilled
(
pQuery
))
{
if
(
isQueryKilled
(
pQuery
))
{
...
@@ -752,7 +745,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
...
@@ -752,7 +745,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
break
;
break
;
}
}
// enable execution for next table
// enable execution for next table
, when handling the projection query
enableExecutionForNextTable
(
pRuntimeEnv
);
enableExecutionForNextTable
(
pRuntimeEnv
);
if
(
Q_STATUS_EQUAL
(
pQuery
->
over
,
QUERY_NO_DATA_TO_CHECK
|
QUERY_COMPLETED
))
{
if
(
Q_STATUS_EQUAL
(
pQuery
->
over
,
QUERY_NO_DATA_TO_CHECK
|
QUERY_COMPLETED
))
{
...
@@ -772,8 +765,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
...
@@ -772,8 +765,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
break
;
break
;
}
}
}
else
{
}
else
{
// forward query range
// forward query range
pQuery
->
skey
=
pQuery
->
lastKey
;
pQuery
->
skey
=
pQuery
->
lastKey
;
// all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter
// all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter
...
@@ -789,7 +781,18 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
...
@@ -789,7 +781,18 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
}
}
}
}
if
(
!
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
)
&&
!
isFirstLastRowQuery
(
pQuery
))
{
/*
* 1. super table projection query, group-by on normal columns query, ts-comp query
* 2. point interpolation query, last row query
*
* group-by on normal columns query and last_row query do NOT invoke the finalizer here,
* since the finalize stage will be done at the client side.
*
* projection query, point interpolation query do not need the finalizer.
*
* Only the ts-comp query requires the finalizer function to be executed here.
*/
if
(
isTSCompQuery
(
pQuery
))
{
doFinalizeResult
(
pRuntimeEnv
);
doFinalizeResult
(
pRuntimeEnv
);
}
}
...
@@ -799,11 +802,11 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
...
@@ -799,11 +802,11 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
// todo refactor
// todo refactor
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
{
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
{
SSlidingWindow
ResInfo
*
pWindowRes
Info
=
&
pRuntimeEnv
->
swindowResInfo
;
SSlidingWindow
Info
*
pSlidingWindow
Info
=
&
pRuntimeEnv
->
swindowResInfo
;
for
(
int32_t
i
=
0
;
i
<
p
WindowRes
Info
->
size
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
p
SlidingWindow
Info
->
size
;
++
i
)
{
SOutputRes
*
buf
=
&
p
WindowRes
Info
->
pResult
[
i
];
SOutputRes
*
buf
=
&
p
SlidingWindow
Info
->
pResult
[
i
];
p
WindowRes
Info
->
pStatus
[
i
].
closed
=
true
;
// enable return all results for group by normal columns
p
SlidingWindow
Info
->
pStatus
[
i
].
closed
=
true
;
// enable return all results for group by normal columns
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutputCols
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutputCols
;
++
j
)
{
buf
->
numOfRows
=
MAX
(
buf
->
numOfRows
,
buf
->
resultInfo
[
j
].
numOfRes
);
buf
->
numOfRows
=
MAX
(
buf
->
numOfRows
,
buf
->
resultInfo
[
j
].
numOfRes
);
...
@@ -812,7 +815,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
...
@@ -812,7 +815,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
pQInfo
->
pMeterQuerySupporter
->
subgroupIdx
=
0
;
pQInfo
->
pMeterQuerySupporter
->
subgroupIdx
=
0
;
pQuery
->
pointsRead
=
0
;
pQuery
->
pointsRead
=
0
;
copyFromGroupBuf
(
pQInfo
,
p
WindowRes
Info
->
pResult
);
copyFromGroupBuf
(
pQInfo
,
p
SlidingWindow
Info
->
pResult
);
}
}
pQInfo
->
pointsRead
+=
pQuery
->
pointsRead
;
pQInfo
->
pointsRead
+=
pQuery
->
pointsRead
;
...
@@ -821,7 +824,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
...
@@ -821,7 +824,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
dTrace
(
dTrace
(
"QInfo %p vid:%d, numOfMeters:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d,"
"QInfo %p vid:%d, numOfMeters:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d,"
"next skey:%"
PRId64
", offset:%"
PRId64
,
"next skey:%"
PRId64
", offset:%"
PRId64
,
pQInfo
,
pOneMeter
->
vnode
,
pSids
->
numOfSids
,
pSupporter
->
meterIdx
,
pSids
->
numOfSubSet
,
pQuery
->
pointsRead
,
pQInfo
,
vid
,
pSids
->
numOfSids
,
pSupporter
->
meterIdx
,
pSids
->
numOfSubSet
,
pQuery
->
pointsRead
,
pQInfo
->
pointsRead
,
pQInfo
->
pointsReturned
,
pQuery
->
skey
,
pQuery
->
limit
.
offset
);
pQInfo
->
pointsRead
,
pQInfo
->
pointsReturned
,
pQuery
->
skey
,
pQuery
->
limit
.
offset
);
}
}
...
@@ -979,7 +982,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
...
@@ -979,7 +982,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
* select count(*)/top(field,k)/avg(field name) from table_name [where ts>now-1a];
* select count(*)/top(field,k)/avg(field name) from table_name [where ts>now-1a];
* select count(*) from table_name group by status_column;
* select count(*) from table_name group by status_column;
*/
*/
static
void
vnodeSingle
Meter
FixedOutputProcessor
(
SQInfo
*
pQInfo
)
{
static
void
vnodeSingle
Table
FixedOutputProcessor
(
SQInfo
*
pQInfo
)
{
SQuery
*
pQuery
=
&
pQInfo
->
query
;
SQuery
*
pQuery
=
&
pQInfo
->
query
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
pMeterQuerySupporter
->
runtimeEnv
;
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
pMeterQuerySupporter
->
runtimeEnv
;
...
@@ -1002,19 +1005,13 @@ static void vnodeSingleMeterFixedOutputProcessor(SQInfo *pQInfo) {
...
@@ -1002,19 +1005,13 @@ static void vnodeSingleMeterFixedOutputProcessor(SQInfo *pQInfo) {
assert
(
isTopBottomQuery
(
pQuery
));
assert
(
isTopBottomQuery
(
pQuery
));
}
}
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
{
pQInfo
->
pMeterQuerySupporter
->
subgroupIdx
=
0
;
pQuery
->
pointsRead
=
0
;
copyFromGroupBuf
(
pQInfo
,
pRuntimeEnv
->
swindowResInfo
.
pResult
);
}
doSkipResults
(
pRuntimeEnv
);
doSkipResults
(
pRuntimeEnv
);
doRevisedResultsByLimit
(
pQInfo
);
doRevisedResultsByLimit
(
pQInfo
);
pQInfo
->
pointsRead
=
pQuery
->
pointsRead
;
pQInfo
->
pointsRead
=
pQuery
->
pointsRead
;
}
}
static
void
vnodeSingle
Meter
MultiOutputProcessor
(
SQInfo
*
pQInfo
)
{
static
void
vnodeSingle
Table
MultiOutputProcessor
(
SQInfo
*
pQInfo
)
{
SQuery
*
pQuery
=
&
pQInfo
->
query
;
SQuery
*
pQuery
=
&
pQInfo
->
query
;
SMeterObj
*
pMeterObj
=
pQInfo
->
pObj
;
SMeterObj
*
pMeterObj
=
pQInfo
->
pObj
;
...
@@ -1083,7 +1080,7 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter
...
@@ -1083,7 +1080,7 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter
(
pQuery
->
skey
>=
pQuery
->
ekey
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)));
(
pQuery
->
skey
>=
pQuery
->
ekey
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)));
initCtxOutputBuf
(
pRuntimeEnv
);
initCtxOutputBuf
(
pRuntimeEnv
);
clearCompleted
Res
Windows
(
&
pRuntimeEnv
->
swindowResInfo
,
pQuery
->
numOfOutputCols
);
clearCompleted
Sliding
Windows
(
&
pRuntimeEnv
->
swindowResInfo
,
pQuery
->
numOfOutputCols
);
vnodeScanAllData
(
pRuntimeEnv
);
vnodeScanAllData
(
pRuntimeEnv
);
if
(
isQueryKilled
(
pQuery
))
{
if
(
isQueryKilled
(
pQuery
))
{
...
@@ -1133,7 +1130,7 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter
...
@@ -1133,7 +1130,7 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter
}
}
/* handle time interval query on single table */
/* handle time interval query on single table */
static
void
vnodeSingle
Meter
IntervalProcessor
(
SQInfo
*
pQInfo
)
{
static
void
vnodeSingle
Table
IntervalProcessor
(
SQInfo
*
pQInfo
)
{
SQuery
*
pQuery
=
&
(
pQInfo
->
query
);
SQuery
*
pQuery
=
&
(
pQInfo
->
query
);
SMeterObj
*
pMeterObj
=
pQInfo
->
pObj
;
SMeterObj
*
pMeterObj
=
pQInfo
->
pObj
;
...
@@ -1187,7 +1184,7 @@ static void vnodeSingleMeterIntervalProcessor(SQInfo *pQInfo) {
...
@@ -1187,7 +1184,7 @@ static void vnodeSingleMeterIntervalProcessor(SQInfo *pQInfo) {
pQInfo
->
pointsRead
-
pQInfo
->
pointsInterpo
,
pQInfo
->
pointsInterpo
,
pQInfo
->
pointsReturned
);
pQInfo
->
pointsRead
-
pQInfo
->
pointsInterpo
,
pQInfo
->
pointsInterpo
,
pQInfo
->
pointsReturned
);
}
}
void
vnodeSingle
Meter
Query
(
SSchedMsg
*
pMsg
)
{
void
vnodeSingle
Table
Query
(
SSchedMsg
*
pMsg
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
pMsg
->
ahandle
;
SQInfo
*
pQInfo
=
(
SQInfo
*
)
pMsg
->
ahandle
;
if
(
pQInfo
==
NULL
||
pQInfo
->
pMeterQuerySupporter
==
NULL
)
{
if
(
pQInfo
==
NULL
||
pQInfo
->
pMeterQuerySupporter
==
NULL
)
{
...
@@ -1280,16 +1277,17 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
...
@@ -1280,16 +1277,17 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
int64_t
st
=
taosGetTimestampUs
();
int64_t
st
=
taosGetTimestampUs
();
if
(
pQuery
->
nAggTimeInterval
!=
0
)
{
// interval (down sampling operation)
// group by normal column, sliding window query, interval query are handled by interval query processor
if
(
pQuery
->
nAggTimeInterval
!=
0
||
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
))
{
// interval (down sampling operation)
assert
(
pQuery
->
checkBufferInLoop
==
0
&&
pQuery
->
pointsOffset
==
pQuery
->
pointsToRead
);
assert
(
pQuery
->
checkBufferInLoop
==
0
&&
pQuery
->
pointsOffset
==
pQuery
->
pointsToRead
);
vnodeSingle
Meter
IntervalProcessor
(
pQInfo
);
vnodeSingle
Table
IntervalProcessor
(
pQInfo
);
}
else
{
}
else
{
if
(
isFixedOutputQuery
(
pQuery
))
{
if
(
isFixedOutputQuery
(
pQuery
))
{
assert
(
pQuery
->
checkBufferInLoop
==
0
);
assert
(
pQuery
->
checkBufferInLoop
==
0
);
vnodeSingle
Meter
FixedOutputProcessor
(
pQInfo
);
vnodeSingle
Table
FixedOutputProcessor
(
pQInfo
);
}
else
{
// diff/add/multiply/subtract/division
}
else
{
// diff/add/multiply/subtract/division
assert
(
pQuery
->
checkBufferInLoop
==
1
);
assert
(
pQuery
->
checkBufferInLoop
==
1
);
vnodeSingle
Meter
MultiOutputProcessor
(
pQInfo
);
vnodeSingle
Table
MultiOutputProcessor
(
pQInfo
);
}
}
}
}
...
@@ -1336,7 +1334,7 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
...
@@ -1336,7 +1334,7 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
assert
((
pQuery
->
checkBufferInLoop
==
1
&&
pQuery
->
nAggTimeInterval
==
0
)
||
isPointInterpoQuery
(
pQuery
)
||
assert
((
pQuery
->
checkBufferInLoop
==
1
&&
pQuery
->
nAggTimeInterval
==
0
)
||
isPointInterpoQuery
(
pQuery
)
||
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
));
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
));
vnode
MultiMeterMultiOutput
Processor
(
pQInfo
);
vnode
STableSeq
Processor
(
pQInfo
);
}
}
/* record the total elapsed time */
/* record the total elapsed time */
...
...
src/system/detail/src/vnodeRead.c
浏览文件 @
c303ff2a
...
@@ -673,7 +673,7 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
...
@@ -673,7 +673,7 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
return
pQInfo
;
return
pQInfo
;
}
}
schedMsg
.
fp
=
vnodeSingle
Meter
Query
;
schedMsg
.
fp
=
vnodeSingle
Table
Query
;
}
}
/*
/*
...
@@ -891,7 +891,7 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) {
...
@@ -891,7 +891,7 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) {
if
(
pQInfo
->
pMeterQuerySupporter
!=
NULL
)
{
if
(
pQInfo
->
pMeterQuerySupporter
!=
NULL
)
{
if
(
pQInfo
->
pMeterQuerySupporter
->
pSidSet
==
NULL
)
{
if
(
pQInfo
->
pMeterQuerySupporter
->
pSidSet
==
NULL
)
{
schedMsg
.
fp
=
vnodeSingle
Meter
Query
;
schedMsg
.
fp
=
vnodeSingle
Table
Query
;
}
else
{
// group by tag
}
else
{
// group by tag
schedMsg
.
fp
=
vnodeMultiMeterQuery
;
schedMsg
.
fp
=
vnodeMultiMeterQuery
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录