Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7f818670
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
7f818670
编写于
10月 28, 2020
作者:
B
Bomin Zhang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-1807]<fix>: interval window can have more rows
上级
bbfafc6b
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
47 addition
and
48 deletion
+47
-48
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+5
-8
src/query/inc/qUtil.h
src/query/inc/qUtil.h
+1
-1
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+36
-35
src/query/src/qUtil.c
src/query/src/qUtil.c
+5
-4
未找到文件。
src/query/inc/qExecutor.h
浏览文件 @
7f818670
...
...
@@ -33,15 +33,11 @@ struct SColumnFilterElem;
typedef
bool
(
*
__filter_func_t
)(
struct
SColumnFilterElem
*
pFilter
,
char
*
val1
,
char
*
val2
);
typedef
int32_t
(
*
__block_search_fn_t
)(
char
*
data
,
int32_t
num
,
int64_t
key
,
int32_t
order
);
typedef
struct
SPosInfo
{
int32_t
pageId
:
20
;
int32_t
rowId
:
12
;
}
SPosInfo
;
typedef
struct
SGroupResInfo
{
int32_t
groupId
;
int32_t
numOfDataPages
;
SPosInfo
pos
;
int32_t
pageId
;
int32_t
rowId
;
}
SGroupResInfo
;
typedef
struct
SSqlGroupbyExpr
{
...
...
@@ -53,9 +49,10 @@ typedef struct SSqlGroupbyExpr {
}
SSqlGroupbyExpr
;
typedef
struct
SWindowResult
{
SPosInfo
pos
;
// Position of current result in disk-based output buffer
int32_t
pageId
;
// pageId & rowId is the position of current result in disk-based output buffer
int32_t
rowId
:
15
;
bool
closed
:
1
;
// this result status: closed or opened
uint16_t
numOfRows
;
// number of rows of current time window
bool
closed
;
// this result status: closed or opened
SResultInfo
*
resultInfo
;
// For each result column, there is a resultInfo
union
{
STimeWindow
win
;
char
*
key
;};
// start key of current time window
}
SWindowResult
;
...
...
src/query/inc/qUtil.h
浏览文件 @
7f818670
...
...
@@ -51,7 +51,7 @@ static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int3
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
int32_t
realRowId
=
(
int32_t
)(
pResult
->
pos
.
rowId
*
GET_ROW_PARAM_FOR_MULTIOUTPUT
(
pQuery
,
pRuntimeEnv
->
topBotQuery
,
pRuntimeEnv
->
stableQuery
));
int32_t
realRowId
=
(
int32_t
)(
pResult
->
rowId
*
GET_ROW_PARAM_FOR_MULTIOUTPUT
(
pQuery
,
pRuntimeEnv
->
topBotQuery
,
pRuntimeEnv
->
stableQuery
));
return
((
char
*
)
page
->
data
)
+
pRuntimeEnv
->
offset
[
columnIndex
]
*
pRuntimeEnv
->
numOfRowsPerPage
+
pQuery
->
pSelectExpr
[
columnIndex
].
bytes
*
realRowId
;
}
...
...
src/query/src/qExecutor.c
浏览文件 @
7f818670
...
...
@@ -557,7 +557,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
static
int32_t
addNewWindowResultBuf
(
SWindowResult
*
pWindowRes
,
SDiskbasedResultBuf
*
pResultBuf
,
int32_t
sid
,
int32_t
numOfRowsPerPage
)
{
if
(
pWindowRes
->
p
os
.
p
ageId
!=
-
1
)
{
if
(
pWindowRes
->
pageId
!=
-
1
)
{
return
0
;
}
...
...
@@ -590,11 +590,11 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult
}
// set the number of rows in current disk page
if
(
pWindowRes
->
p
os
.
p
ageId
==
-
1
)
{
// not allocated yet, allocate new buffer
pWindowRes
->
p
os
.
p
ageId
=
pageId
;
pWindowRes
->
pos
.
rowId
=
(
int32_t
)(
pData
->
num
++
);
if
(
pWindowRes
->
pageId
==
-
1
)
{
// not allocated yet, allocate new buffer
pWindowRes
->
pageId
=
pageId
;
pWindowRes
->
rowId
=
(
int32_t
)(
pData
->
num
++
);
assert
(
pWindowRes
->
p
os
.
p
ageId
>=
0
);
assert
(
pWindowRes
->
pageId
>=
0
);
}
return
0
;
...
...
@@ -616,7 +616,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes
*
newWind
=
true
;
// not assign result buffer yet, add new result buffer
if
(
pWindowRes
->
p
os
.
p
ageId
==
-
1
)
{
if
(
pWindowRes
->
pageId
==
-
1
)
{
int32_t
ret
=
addNewWindowResultBuf
(
pWindowRes
,
pResultBuf
,
sid
,
pRuntimeEnv
->
numOfRowsPerPage
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
-
1
;
...
...
@@ -1143,7 +1143,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
assert
(
pRuntimeEnv
->
windowResInfo
.
interval
==
0
);
if
(
pWindowRes
->
p
os
.
p
ageId
==
-
1
)
{
if
(
pWindowRes
->
pageId
==
-
1
)
{
int32_t
ret
=
addNewWindowResultBuf
(
pWindowRes
,
pResultBuf
,
GROUPRESULTID
,
pRuntimeEnv
->
numOfRowsPerPage
);
if
(
ret
!=
0
)
{
return
-
1
;
...
...
@@ -2652,7 +2652,7 @@ static void doMerge(SQueryRuntimeEnv *pRuntimeEnv, int64_t timestamp, SWindowRes
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQLFunctionCtx
*
pCtx
=
pRuntimeEnv
->
pCtx
;
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes
->
p
os
.
p
ageId
);
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes
->
pageId
);
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
base
.
functionId
;
...
...
@@ -2823,14 +2823,14 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param)
SWindowResInfo
*
pWindowResInfo1
=
&
supporter
->
pTableQueryInfo
[
left
]
->
windowResInfo
;
SWindowResult
*
pWindowRes1
=
getWindowResult
(
pWindowResInfo1
,
leftPos
);
tFilePage
*
page1
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes1
->
p
os
.
p
ageId
);
tFilePage
*
page1
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes1
->
pageId
);
char
*
b1
=
getPosInResultPage
(
pRuntimeEnv
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
,
pWindowRes1
,
page1
);
TSKEY
leftTimestamp
=
GET_INT64_VAL
(
b1
);
SWindowResInfo
*
pWindowResInfo2
=
&
supporter
->
pTableQueryInfo
[
right
]
->
windowResInfo
;
SWindowResult
*
pWindowRes2
=
getWindowResult
(
pWindowResInfo2
,
rightPos
);
tFilePage
*
page2
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes2
->
p
os
.
p
ageId
);
tFilePage
*
page2
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes2
->
pageId
);
char
*
b2
=
getPosInResultPage
(
pRuntimeEnv
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
,
pWindowRes2
,
page2
);
TSKEY
rightTimestamp
=
GET_INT64_VAL
(
b2
);
...
...
@@ -2867,7 +2867,7 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
}
SGroupResInfo
*
info
=
&
pQInfo
->
groupResInfo
;
if
(
pQInfo
->
groupIndex
==
numOfGroups
&&
info
->
p
os
.
p
ageId
==
info
->
numOfDataPages
)
{
if
(
pQInfo
->
groupIndex
==
numOfGroups
&&
info
->
pageId
==
info
->
numOfDataPages
)
{
SET_STABLE_QUERY_OVER
(
pQInfo
);
}
...
...
@@ -2883,10 +2883,10 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
SGroupResInfo
*
pGroupResInfo
=
&
pQInfo
->
groupResInfo
;
// all results have been return to client, try next group
if
(
pGroupResInfo
->
p
os
.
p
ageId
==
pGroupResInfo
->
numOfDataPages
)
{
if
(
pGroupResInfo
->
pageId
==
pGroupResInfo
->
numOfDataPages
)
{
pGroupResInfo
->
numOfDataPages
=
0
;
pGroupResInfo
->
p
os
.
p
ageId
=
0
;
pGroupResInfo
->
pos
.
rowId
=
0
;
pGroupResInfo
->
pageId
=
0
;
pGroupResInfo
->
rowId
=
0
;
// current results of group has been sent to client, try next group
if
(
mergeIntoGroupResult
(
pQInfo
)
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -2914,22 +2914,22 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
assert
(
size
==
pGroupResInfo
->
numOfDataPages
);
bool
done
=
false
;
for
(
int32_t
j
=
pGroupResInfo
->
p
os
.
p
ageId
;
j
<
size
;
++
j
)
{
for
(
int32_t
j
=
pGroupResInfo
->
pageId
;
j
<
size
;
++
j
)
{
SPageInfo
*
pi
=
*
(
SPageInfo
**
)
taosArrayGet
(
list
,
j
);
tFilePage
*
pData
=
getResBufPage
(
pResultBuf
,
pi
->
pageId
);
assert
(
pData
->
num
>
0
&&
pData
->
num
<=
pRuntimeEnv
->
numOfRowsPerPage
&&
pGroupResInfo
->
pos
.
rowId
<
pData
->
num
);
int32_t
numOfRes
=
(
int32_t
)(
pData
->
num
-
pGroupResInfo
->
pos
.
rowId
);
assert
(
pData
->
num
>
0
&&
pData
->
num
<=
pRuntimeEnv
->
numOfRowsPerPage
&&
pGroupResInfo
->
rowId
<
pData
->
num
);
int32_t
numOfRes
=
(
int32_t
)(
pData
->
num
-
pGroupResInfo
->
rowId
);
if
(
numOfRes
>
pQuery
->
rec
.
capacity
-
offset
)
{
numOfCopiedRows
=
(
int32_t
)(
pQuery
->
rec
.
capacity
-
offset
);
pGroupResInfo
->
pos
.
rowId
+=
numOfCopiedRows
;
pGroupResInfo
->
rowId
+=
numOfCopiedRows
;
done
=
true
;
}
else
{
numOfCopiedRows
=
(
int32_t
)
pData
->
num
;
pGroupResInfo
->
p
os
.
p
ageId
+=
1
;
pGroupResInfo
->
pos
.
rowId
=
0
;
pGroupResInfo
->
pageId
+=
1
;
pGroupResInfo
->
rowId
=
0
;
}
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
...
...
@@ -3020,8 +3020,8 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
pGroupResInfo
->
numOfDataPages
=
(
int32_t
)
taosArrayGetSize
(
pageList
);
pGroupResInfo
->
groupId
=
tid
;
pGroupResInfo
->
p
os
.
p
ageId
=
0
;
pGroupResInfo
->
pos
.
rowId
=
0
;
pGroupResInfo
->
pageId
=
0
;
pGroupResInfo
->
rowId
=
0
;
return
pGroupResInfo
->
numOfDataPages
;
}
...
...
@@ -3067,7 +3067,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
SWindowResInfo
*
pWindowResInfo
=
&
pTableList
[
pos
]
->
windowResInfo
;
SWindowResult
*
pWindowRes
=
getWindowResult
(
pWindowResInfo
,
cs
.
position
[
pos
]);
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes
->
p
os
.
p
ageId
);
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes
->
pageId
);
char
*
b
=
getPosInResultPage
(
pRuntimeEnv
,
PRIMARYKEY_TIMESTAMP_COL_INDEX
,
pWindowRes
,
page
);
TSKEY
ts
=
GET_INT64_VAL
(
b
);
...
...
@@ -3104,7 +3104,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
lastTimestamp
=
ts
;
// move to the next element of current entry
int32_t
currentPageId
=
pWindowRes
->
p
os
.
p
ageId
;
int32_t
currentPageId
=
pWindowRes
->
pageId
;
cs
.
position
[
pos
]
+=
1
;
if
(
cs
.
position
[
pos
]
>=
pWindowResInfo
->
size
)
{
...
...
@@ -3117,7 +3117,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
}
else
{
// current page is not needed anymore
SWindowResult
*
pNextWindowRes
=
getWindowResult
(
pWindowResInfo
,
cs
.
position
[
pos
]);
if
(
pNextWindowRes
->
p
os
.
p
ageId
!=
currentPageId
)
{
if
(
pNextWindowRes
->
pageId
!=
currentPageId
)
{
releaseResBufPage
(
pRuntimeEnv
->
pResultBuf
,
page
);
}
}
...
...
@@ -3329,7 +3329,8 @@ int32_t createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool is
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
pResultRow
->
pos
=
(
SPosInfo
)
{
-
1
,
-
1
};
pResultRow
->
pageId
=
-
1
;
pResultRow
->
rowId
=
-
1
;
char
*
buf
=
(
char
*
)
pResultRow
->
resultInfo
+
numOfCols
*
sizeof
(
SResultInfo
);
...
...
@@ -3796,7 +3797,7 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
* not assign result buffer yet, add new result buffer
* all group belong to one result set, and each group result has different group id so set the id to be one
*/
if
(
pWindowRes
->
p
os
.
p
ageId
==
-
1
)
{
if
(
pWindowRes
->
pageId
==
-
1
)
{
if
(
addNewWindowResultBuf
(
pWindowRes
,
pRuntimeEnv
->
pResultBuf
,
groupIndex
,
pRuntimeEnv
->
numOfRowsPerPage
)
!=
TSDB_CODE_SUCCESS
)
{
return
;
...
...
@@ -3813,7 +3814,7 @@ void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pResult
->
p
os
.
p
ageId
);
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pResult
->
pageId
);
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
SQLFunctionCtx
*
pCtx
=
&
pRuntimeEnv
->
pCtx
[
i
];
...
...
@@ -3840,7 +3841,7 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
tFilePage
*
bufPage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pResult
->
p
os
.
p
ageId
);
tFilePage
*
bufPage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pResult
->
pageId
);
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
SQLFunctionCtx
*
pCtx
=
&
pRuntimeEnv
->
pCtx
[
i
];
...
...
@@ -4019,12 +4020,12 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_
for
(
int32_t
i
=
startIdx
;
(
i
<
totalSet
)
&&
(
i
>=
0
);
i
+=
step
)
{
if
(
result
[
i
].
numOfRows
==
0
)
{
pQInfo
->
groupIndex
+=
1
;
pGroupResInfo
->
pos
.
rowId
=
0
;
pGroupResInfo
->
rowId
=
0
;
continue
;
}
int32_t
numOfRowsToCopy
=
result
[
i
].
numOfRows
-
pGroupResInfo
->
pos
.
rowId
;
int32_t
oldOffset
=
pGroupResInfo
->
pos
.
rowId
;
int32_t
numOfRowsToCopy
=
result
[
i
].
numOfRows
-
pGroupResInfo
->
rowId
;
int32_t
oldOffset
=
pGroupResInfo
->
rowId
;
/*
* current output space is not enough to accommodate all data of this page, only partial results
...
...
@@ -4032,13 +4033,13 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_
*/
if
(
numOfRowsToCopy
>
pQuery
->
rec
.
capacity
-
numOfResult
)
{
numOfRowsToCopy
=
(
int32_t
)
pQuery
->
rec
.
capacity
-
numOfResult
;
pGroupResInfo
->
pos
.
rowId
+=
numOfRowsToCopy
;
pGroupResInfo
->
rowId
+=
numOfRowsToCopy
;
}
else
{
pGroupResInfo
->
pos
.
rowId
=
0
;
pGroupResInfo
->
rowId
=
0
;
pQInfo
->
groupIndex
+=
1
;
}
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
result
[
i
].
p
os
.
p
ageId
);
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
result
[
i
].
pageId
);
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
int32_t
size
=
pRuntimeEnv
->
pCtx
[
j
].
outputBytes
;
...
...
src/query/src/qUtil.c
浏览文件 @
7f818670
...
...
@@ -266,7 +266,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
return
;
}
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes
->
p
os
.
p
ageId
);
tFilePage
*
page
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
pWindowRes
->
pageId
);
for
(
int32_t
i
=
0
;
i
<
pRuntimeEnv
->
pQuery
->
numOfOutput
;
++
i
)
{
SResultInfo
*
pResultInfo
=
&
pWindowRes
->
resultInfo
[
i
];
...
...
@@ -279,7 +279,8 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
}
pWindowRes
->
numOfRows
=
0
;
pWindowRes
->
pos
=
(
SPosInfo
){
-
1
,
-
1
};
pWindowRes
->
pageId
=
-
1
;
pWindowRes
->
rowId
=
-
1
;
pWindowRes
->
closed
=
false
;
pWindowRes
->
win
=
TSWINDOW_INITIALIZER
;
}
...
...
@@ -308,10 +309,10 @@ void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, con
memcpy
(
pDst
->
interResultBuf
,
pSrc
->
interResultBuf
,
pDst
->
bufLen
);
// copy the output buffer data from src to dst, the position info keep unchanged
tFilePage
*
dstpage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
dst
->
p
os
.
p
ageId
);
tFilePage
*
dstpage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
dst
->
pageId
);
char
*
dstBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
dst
,
dstpage
);
tFilePage
*
srcpage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
src
->
p
os
.
p
ageId
);
tFilePage
*
srcpage
=
getResBufPage
(
pRuntimeEnv
->
pResultBuf
,
src
->
pageId
);
char
*
srcBuf
=
getPosInResultPage
(
pRuntimeEnv
,
i
,
(
SWindowResult
*
)
src
,
srcpage
);
size_t
s
=
pRuntimeEnv
->
pQuery
->
pSelectExpr
[
i
].
bytes
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录