Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d1b4170b
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
d1b4170b
编写于
7月 08, 2021
作者:
H
Haojun Liao
提交者:
GitHub
7月 08, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #6808 from taosdata/feature/query
Feature/query
上级
1da35884
3d7b4e43
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
74 addition
and
35 deletion
+74
-35
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+4
-4
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+66
-27
src/query/src/qUtil.c
src/query/src/qUtil.c
+4
-4
未找到文件。
src/query/inc/qExecutor.h
浏览文件 @
d1b4170b
...
...
@@ -73,14 +73,14 @@ typedef struct SResultRowPool {
typedef
struct
SResultRow
{
int32_t
pageId
;
// pageId & rowId is the position of current result in disk-based output buffer
int32_t
offset
:
29
;
// row index in buffer page
int32_t
offset
:
29
;
// row index in buffer page
bool
startInterp
;
// the time window start timestamp has done the interpolation already.
bool
endInterp
;
// the time window end timestamp has done the interpolation already.
bool
closed
;
// this result status: closed or opened
uint32_t
numOfRows
;
// number of rows of current time window
SResultRowCellInfo
*
pCellInfo
;
// For each result column, there is a resultInfo
STimeWindow
win
;
char
*
key
;
// start key of current result row
STimeWindow
win
;
char
*
key
;
// start key of current result row
}
SResultRow
;
typedef
struct
SGroupResInfo
{
...
...
@@ -105,7 +105,7 @@ typedef struct SResultRowInfo {
int16_t
type
:
8
;
// data type for hash key
int32_t
size
:
24
;
// number of result set
int32_t
capacity
;
// max capacity
int32_t
curIndex
;
// current start active index
SResultRow
*
current
;
// current start active index
int64_t
prevSKey
;
// previous (not completed) sliding window start key
}
SResultRowInfo
;
...
...
src/query/src/qExecutor.c
浏览文件 @
d1b4170b
...
...
@@ -410,6 +410,21 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, SQueryRuntim
pResultRowInfo
->
capacity
=
(
int32_t
)
newCapacity
;
}
static
int32_t
ascResultRowCompareFn
(
const
void
*
p1
,
const
void
*
p2
)
{
SResultRow
*
pRow1
=
*
(
SResultRow
**
)
p1
;
SResultRow
*
pRow2
=
*
(
SResultRow
**
)
p2
;
if
(
pRow1
==
pRow2
)
{
return
0
;
}
else
{
return
pRow1
->
win
.
skey
<
pRow2
->
win
.
skey
?
-
1
:
1
;
}
}
static
int32_t
descResultRowCompareFn
(
const
void
*
p1
,
const
void
*
p2
)
{
return
-
ascResultRowCompareFn
(
p1
,
p2
);
}
static
SResultRow
*
doPrepareResultRowFromKey
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SResultRowInfo
*
pResultRowInfo
,
char
*
pData
,
int16_t
bytes
,
bool
masterscan
,
uint64_t
uid
)
{
bool
existed
=
false
;
...
...
@@ -425,11 +440,17 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
}
if
(
p1
!=
NULL
)
{
for
(
int32_t
i
=
pResultRowInfo
->
size
-
1
;
i
>=
0
;
--
i
)
{
if
(
pResultRowInfo
->
pResult
[
i
]
==
(
*
p1
))
{
pResultRowInfo
->
curIndex
=
i
;
pResultRowInfo
->
current
=
(
*
p1
);
if
(
pResultRowInfo
->
size
==
0
)
{
existed
=
false
;
}
else
if
(
pResultRowInfo
->
size
==
1
)
{
existed
=
(
pResultRowInfo
->
pResult
[
0
]
==
(
*
p1
));
}
else
{
__compar_fn_t
fn
=
QUERY_IS_ASC_QUERY
(
pRuntimeEnv
->
pQueryAttr
)
?
ascResultRowCompareFn
:
descResultRowCompareFn
;
void
*
ptr
=
taosbsearch
(
p1
,
pResultRowInfo
->
pResult
,
pResultRowInfo
->
size
,
POINTER_BYTES
,
fn
,
TD_EQ
);
if
(
ptr
!=
NULL
)
{
existed
=
true
;
break
;
}
}
}
...
...
@@ -456,8 +477,8 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
pResult
=
*
p1
;
}
pResultRowInfo
->
pResult
[
pResultRowInfo
->
size
]
=
pResult
;
pResultRowInfo
->
cur
Index
=
pResultRowInfo
->
size
++
;
pResultRowInfo
->
pResult
[
pResultRowInfo
->
size
++
]
=
pResult
;
pResultRowInfo
->
cur
rent
=
pResult
;
}
// too many time window in query
...
...
@@ -465,7 +486,7 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW
);
}
return
getResultRow
(
pResultRowInfo
,
pResultRowInfo
->
curIndex
)
;
return
pResultRowInfo
->
current
;
}
static
void
getInitialStartTimeWindow
(
SQueryAttr
*
pQueryAttr
,
TSKEY
ts
,
STimeWindow
*
w
)
{
...
...
@@ -496,7 +517,7 @@ static void getInitialStartTimeWindow(SQueryAttr* pQueryAttr, TSKEY ts, STimeWin
static
STimeWindow
getActiveTimeWindow
(
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SQueryAttr
*
pQueryAttr
)
{
STimeWindow
w
=
{
0
};
if
(
pResultRowInfo
->
cur
Index
==
-
1
)
{
// the first window, from the previous stored value
if
(
pResultRowInfo
->
cur
rent
==
NULL
)
{
// the first window, from the previous stored value
if
(
pResultRowInfo
->
prevSKey
==
TSKEY_INITIAL_VAL
)
{
getInitialStartTimeWindow
(
pQueryAttr
,
ts
,
&
w
);
pResultRowInfo
->
prevSKey
=
w
.
skey
;
...
...
@@ -510,8 +531,9 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t
w
.
ekey
=
w
.
skey
+
pQueryAttr
->
interval
.
interval
-
1
;
}
}
else
{
int32_t
slot
=
curTimeWindowIndex
(
pResultRowInfo
);
SResultRow
*
pWindowRes
=
getResultRow
(
pResultRowInfo
,
slot
);
// int32_t slot = curTimeWindowIndex(pResultRowInfo);
// SResultRow* pWindowRes = getResultRow(pResultRowInfo, slot);
SResultRow
*
pWindowRes
=
pResultRowInfo
->
current
;
w
=
pWindowRes
->
win
;
}
...
...
@@ -697,7 +719,12 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
// all result rows are closed, set the last one to be the skey
if
(
skey
==
TSKEY_INITIAL_VAL
)
{
pResultRowInfo
->
curIndex
=
pResultRowInfo
->
size
-
1
;
if
(
pResultRowInfo
->
size
==
0
)
{
// assert(pResultRowInfo->current == NULL);
pResultRowInfo
->
current
=
NULL
;
}
else
{
pResultRowInfo
->
current
=
pResultRowInfo
->
pResult
[
pResultRowInfo
->
size
-
1
];
}
}
else
{
for
(
i
=
pResultRowInfo
->
size
-
1
;
i
>=
0
;
--
i
)
{
...
...
@@ -708,12 +735,12 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
}
if
(
i
==
pResultRowInfo
->
size
-
1
)
{
pResultRowInfo
->
cur
Index
=
i
;
pResultRowInfo
->
cur
rent
=
pResultRowInfo
->
pResult
[
i
]
;
}
else
{
pResultRowInfo
->
cur
Index
=
i
+
1
;
// current not closed result object
pResultRowInfo
->
cur
rent
=
pResultRowInfo
->
pResult
[
i
+
1
]
;
// current not closed result object
}
pResultRowInfo
->
prevSKey
=
pResultRowInfo
->
pResult
[
pResultRowInfo
->
curIndex
]
->
win
.
skey
;
pResultRowInfo
->
prevSKey
=
pResultRowInfo
->
current
->
win
.
skey
;
}
}
...
...
@@ -721,7 +748,7 @@ static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQuer
bool
ascQuery
=
QUERY_IS_ASC_QUERY
(
pQueryAttr
);
if
((
lastKey
>
pQueryAttr
->
window
.
ekey
&&
ascQuery
)
||
(
lastKey
<
pQueryAttr
->
window
.
ekey
&&
(
!
ascQuery
)))
{
closeAllResultRows
(
pResultRowInfo
);
pResultRowInfo
->
cur
Index
=
pResultRowInfo
->
size
-
1
;
pResultRowInfo
->
cur
rent
=
pResultRowInfo
->
pResult
[
pResultRowInfo
->
size
-
1
]
;
}
else
{
int32_t
step
=
ascQuery
?
1
:
-
1
;
doUpdateResultRowIndex
(
pResultRowInfo
,
lastKey
-
step
,
ascQuery
,
pQueryAttr
->
timeWindowInterpo
);
...
...
@@ -1230,7 +1257,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQueryAttr
->
order
.
order
);
bool
ascQuery
=
QUERY_IS_ASC_QUERY
(
pQueryAttr
);
int32_t
prevIndex
=
curTimeWindowIndex
(
pResultRowInfo
);
SResultRow
*
prevRow
=
pResultRowInfo
->
current
;
// int32_t prevIndex = curTimeWindowIndex(pResultRowInfo);
TSKEY
*
tsCols
=
NULL
;
if
(
pSDataBlock
->
pDataBlock
!=
NULL
)
{
...
...
@@ -1259,13 +1287,19 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
getNumOfRowsInTimeWindow
(
pRuntimeEnv
,
&
pSDataBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
true
);
// prev time window not interpolation yet.
int32_t
curIndex
=
curTimeWindowIndex
(
pResultRowInfo
);
if
(
prevIndex
!=
-
1
&&
prevIndex
<
curIndex
&&
pQueryAttr
->
timeWindowInterpo
)
{
for
(
int32_t
j
=
prevIndex
;
j
<
curIndex
;
++
j
)
{
// previous time window may be all closed already.
// int32_t curIndex = curTimeWindowIndex(pResultRowInfo);
// if (prevIndex != -1 && prevIndex < curIndex && pQueryAttr->timeWindowInterpo) {
// for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
if
(
prevRow
!=
NULL
&&
prevRow
!=
pResultRowInfo
->
current
&&
pQueryAttr
->
timeWindowInterpo
)
{
int32_t
j
=
0
;
while
(
pResultRowInfo
->
pResult
[
j
]
!=
prevRow
)
{
j
++
;
}
for
(;
pResultRowInfo
->
pResult
[
j
]
!=
pResultRowInfo
->
current
;
++
j
)
{
SResultRow
*
pRes
=
pResultRowInfo
->
pResult
[
j
];
if
(
pRes
->
closed
)
{
assert
(
resultRowInterpolated
(
pRes
,
RESULT_ROW_START_INTERP
)
&&
resultRowInterpolated
(
pRes
,
RESULT_ROW_END_INTERP
));
assert
(
resultRowInterpolated
(
pRes
,
RESULT_ROW_START_INTERP
)
&&
resultRowInterpolated
(
pRes
,
RESULT_ROW_END_INTERP
));
continue
;
}
...
...
@@ -3146,7 +3180,7 @@ void copyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBl
}
static
void
updateTableQueryInfoForReverseScan
(
S
QueryAttr
*
pQueryAttr
,
S
TableQueryInfo
*
pTableQueryInfo
)
{
static
void
updateTableQueryInfoForReverseScan
(
STableQueryInfo
*
pTableQueryInfo
)
{
if
(
pTableQueryInfo
==
NULL
)
{
return
;
}
...
...
@@ -3158,7 +3192,12 @@ static void updateTableQueryInfoForReverseScan(SQueryAttr *pQueryAttr, STableQue
pTableQueryInfo
->
cur
.
vgroupIndex
=
-
1
;
// set the index to be the end slot of result rows array
pTableQueryInfo
->
resInfo
.
curIndex
=
pTableQueryInfo
->
resInfo
.
size
-
1
;
SResultRowInfo
*
pResRowInfo
=
&
pTableQueryInfo
->
resInfo
;
if
(
pResRowInfo
->
size
>
0
)
{
pResRowInfo
->
current
=
pResRowInfo
->
pResult
[
pResRowInfo
->
size
-
1
];
}
else
{
pResRowInfo
->
current
=
NULL
;
}
}
static
void
setupQueryRangeForReverseScan
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
...
...
@@ -3172,7 +3211,7 @@ static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) {
size_t
t
=
taosArrayGetSize
(
group
);
for
(
int32_t
j
=
0
;
j
<
t
;
++
j
)
{
STableQueryInfo
*
pCheckInfo
=
taosArrayGetP
(
group
,
j
);
updateTableQueryInfoForReverseScan
(
p
QueryAttr
,
p
CheckInfo
);
updateTableQueryInfoForReverseScan
(
pCheckInfo
);
// update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide
// the start check timestamp of tsdbQueryHandle
...
...
@@ -4574,7 +4613,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
}
if
(
pResultRowInfo
->
size
>
0
)
{
pResultRowInfo
->
cur
Index
=
0
;
pResultRowInfo
->
cur
rent
=
pResultRowInfo
->
pResult
[
0
]
;
pResultRowInfo
->
prevSKey
=
pResultRowInfo
->
pResult
[
0
]
->
win
.
skey
;
}
...
...
@@ -4600,8 +4639,8 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
pTableScanInfo
->
order
=
cond
.
order
;
if
(
pResultRowInfo
->
size
>
0
)
{
pResultRowInfo
->
cur
Index
=
pResultRowInfo
->
size
-
1
;
pResultRowInfo
->
prevSKey
=
pResultRowInfo
->
pResult
[
pResultRowInfo
->
size
-
1
]
->
win
.
skey
;
pResultRowInfo
->
cur
rent
=
pResultRowInfo
->
pResult
[
pResultRowInfo
->
size
-
1
]
;
pResultRowInfo
->
prevSKey
=
pResultRowInfo
->
current
->
win
.
skey
;
}
p
=
doTableScanImpl
(
pOperator
,
newgroup
);
...
...
src/query/src/qUtil.c
浏览文件 @
d1b4170b
...
...
@@ -45,7 +45,7 @@ int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t
pResultRowInfo
->
type
=
type
;
pResultRowInfo
->
size
=
0
;
pResultRowInfo
->
prevSKey
=
TSKEY_INITIAL_VAL
;
pResultRowInfo
->
cur
Index
=
-
1
;
pResultRowInfo
->
cur
rent
=
NULL
;
pResultRowInfo
->
capacity
=
size
;
pResultRowInfo
->
pResult
=
calloc
(
pResultRowInfo
->
capacity
,
POINTER_BYTES
);
...
...
@@ -90,9 +90,9 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo
SET_RES_WINDOW_KEY
(
pRuntimeEnv
->
keyBuf
,
&
groupIndex
,
sizeof
(
groupIndex
),
uid
);
taosHashRemove
(
pRuntimeEnv
->
pResultRowHashTable
,
(
const
char
*
)
pRuntimeEnv
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
sizeof
(
groupIndex
)));
}
pResultRowInfo
->
curIndex
=
-
1
;
pResultRowInfo
->
size
=
0
;
pResultRowInfo
->
size
=
0
;
pResultRowInfo
->
current
=
NULL
;
pResultRowInfo
->
prevSKey
=
TSKEY_INITIAL_VAL
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录