Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5e2b3f91
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1185
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
5e2b3f91
编写于
9月 15, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
9月 15, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #3529 from taosdata/hotfix/groupby
[td-1290]
上级
2a7b9134
bffd30ef
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
51 addition
and
70 deletion
+51
-70
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+51
-70
未找到文件。
src/query/src/qExecutor.c
浏览文件 @
5e2b3f91
...
...
@@ -1951,36 +1951,36 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
// todo handle the case the the order irrelevant query type mixed up with order critical query type
// descending order query for last_row query
if
(
isFirstLastRowQuery
(
pQuery
)
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)
)
{
if
(
isFirstLastRowQuery
(
pQuery
))
{
qDebug
(
"QInfo:%p scan order changed for last_row query, old:%d, new:%d"
,
GET_QINFO_ADDR
(
pQuery
),
pQuery
->
order
.
order
,
TSDB_ORDER_ASC
);
SWAP
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
TSKEY
);
pQuery
->
order
.
order
=
TSDB_ORDER_ASC
;
assert
(
pQuery
->
window
.
skey
<=
pQuery
->
window
.
ekey
);
if
(
pQuery
->
window
.
skey
>
pQuery
->
window
.
ekey
)
{
SWAP
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
TSKEY
);
}
doExchangeTimeWindow
(
pQInfo
,
&
pQuery
->
window
);
return
;
}
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
)
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
)
)
{
if
(
isGroupbyNormalCol
(
pQuery
->
pGroupbyExpr
)
&&
pQuery
->
order
.
order
==
TSDB_ORDER_DESC
)
{
pQuery
->
order
.
order
=
TSDB_ORDER_ASC
;
SWAP
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
TSKEY
);
assert
(
pQuery
->
window
.
skey
<=
pQuery
->
window
.
ekey
);
if
(
pQuery
->
window
.
skey
>
pQuery
->
window
.
ekey
)
{
SWAP
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
TSKEY
);
}
doExchangeTimeWindow
(
pQInfo
,
&
pQuery
->
window
);
return
;
}
if
(
isPointInterpoQuery
(
pQuery
)
&&
(
pQuery
->
intervalTime
==
0
)
&&
!
QUERY_IS_ASC_QUERY
(
pQuery
))
{
qDebug
(
msg
,
GET_QINFO_ADDR
(
pQuery
),
"interp"
,
pQuery
->
order
.
order
,
TSDB_ORDER_ASC
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
skey
);
SWAP
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
TSKEY
);
if
(
isPointInterpoQuery
(
pQuery
)
&&
pQuery
->
intervalTime
==
0
)
{
if
(
!
QUERY_IS_ASC_QUERY
(
pQuery
))
{
qDebug
(
msg
,
GET_QINFO_ADDR
(
pQuery
),
"interp"
,
pQuery
->
order
.
order
,
TSDB_ORDER_ASC
,
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
ekey
,
pQuery
->
window
.
skey
);
SWAP
(
pQuery
->
window
.
skey
,
pQuery
->
window
.
ekey
,
TSKEY
);
}
pQuery
->
order
.
order
=
TSDB_ORDER_ASC
;
assert
(
pQuery
->
window
.
skey
<=
pQuery
->
window
.
ekey
);
doExchangeTimeWindow
(
pQInfo
,
&
pQuery
->
window
);
return
;
}
...
...
@@ -2365,16 +2365,16 @@ static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pB
memset
(
tmp
+
sizeof
(
tFilePage
)
+
bytes
*
pRec
->
rows
,
0
,
(
size_t
)((
newSize
-
pRec
->
rows
)
*
bytes
));
pQuery
->
sdata
[
i
]
=
(
tFilePage
*
)
tmp
;
}
// set the pCtx output buffer position
pRuntimeEnv
->
pCtx
[
i
].
aOutputBuf
=
pQuery
->
sdata
[
i
]
->
data
+
pRec
->
rows
*
bytes
;
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
base
.
functionId
;
if
(
functionId
==
TSDB_FUNC_TOP
||
functionId
==
TSDB_FUNC_BOTTOM
||
functionId
==
TSDB_FUNC_DIFF
)
{
pRuntimeEnv
->
pCtx
[
i
].
ptsOutputBuf
=
pRuntimeEnv
->
pCtx
[
0
].
aOutputBuf
;
}
}
qDebug
(
"QInfo:%p realloc output buffer, new size: %d rows, old:%"
PRId64
", remain:%"
PRId64
,
GET_QINFO_ADDR
(
pRuntimeEnv
),
newSize
,
pRec
->
capacity
,
newSize
-
pRec
->
rows
);
...
...
@@ -2920,11 +2920,11 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
STableQueryInfo
*
item
=
taosArrayGetP
(
pGroup
,
i
);
SIDList
list
=
getDataBufPagesIdList
(
pRuntimeEnv
->
pResultBuf
,
TSDB_TABLEID
(
item
->
pTable
)
->
tid
);
pageList
=
list
;
tid
=
TSDB_TABLEID
(
item
->
pTable
)
->
tid
;
if
(
taosArrayGetSize
(
list
)
>
0
&&
item
->
windowResInfo
.
size
>
0
)
{
pTableList
[
numOfTables
++
]
=
item
;
tid
=
TSDB_TABLEID
(
item
->
pTable
)
->
tid
;
pageList
=
list
;
}
}
...
...
@@ -3357,7 +3357,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
for
(
int32_t
i
=
0
;
i
<
pQuery
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
pQuery
->
pSelectExpr
[
i
].
base
.
functionId
;
int32_t
bytes
=
pRuntimeEnv
->
pCtx
[
i
].
outputBytes
;
memmove
(
pQuery
->
sdata
[
i
]
->
data
,
(
char
*
)
pQuery
->
sdata
[
i
]
->
data
+
bytes
*
numOfSkip
,
(
size_t
)(
pQuery
->
rec
.
rows
*
bytes
));
pRuntimeEnv
->
pCtx
[
i
].
aOutputBuf
=
((
char
*
)
pQuery
->
sdata
[
i
]
->
data
)
+
pQuery
->
rec
.
rows
*
bytes
;
...
...
@@ -4354,32 +4354,6 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
return
true
;
}
static
void
freeTableQueryInfo
(
STableGroupInfo
*
pTableGroupInfo
)
{
if
(
pTableGroupInfo
->
pGroupList
==
NULL
)
{
assert
(
pTableGroupInfo
->
numOfTables
==
0
);
}
else
{
size_t
numOfGroups
=
taosArrayGetSize
(
pTableGroupInfo
->
pGroupList
);
for
(
int32_t
i
=
0
;
i
<
numOfGroups
;
++
i
)
{
SArray
*
p
=
taosArrayGetP
(
pTableGroupInfo
->
pGroupList
,
i
);
size_t
num
=
taosArrayGetSize
(
p
);
for
(
int32_t
j
=
0
;
j
<
num
;
++
j
)
{
STableQueryInfo
*
item
=
taosArrayGetP
(
p
,
j
);
destroyTableQueryInfo
(
item
);
}
taosArrayDestroy
(
p
);
}
taosArrayDestroy
(
pTableGroupInfo
->
pGroupList
);
pTableGroupInfo
->
pGroupList
=
NULL
;
pTableGroupInfo
->
numOfTables
=
0
;
}
taosHashCleanup
(
pTableGroupInfo
->
map
);
pTableGroupInfo
->
map
=
NULL
;
}
static
int32_t
setupQueryHandle
(
void
*
tsdb
,
SQInfo
*
pQInfo
,
bool
isSTableQuery
)
{
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pQInfo
->
runtimeEnv
.
pQuery
;
...
...
@@ -4415,22 +4389,20 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
terrno
=
TSDB_CODE_SUCCESS
;
if
(
isFirstLastRowQuery
(
pQuery
))
{
pRuntimeEnv
->
pQueryHandle
=
tsdbQueryLastRow
(
tsdb
,
&
cond
,
&
pQInfo
->
tableGroupInfo
,
pQInfo
);
if
(
pRuntimeEnv
->
pQueryHandle
==
NULL
)
{
// no data in current stable, clear all
freeTableQueryInfo
(
&
pQInfo
->
tableqinfoGroupInfo
);
}
else
{
// update the query time window
pQuery
->
window
=
cond
.
twindow
;
size_t
numOfGroups
=
GET_NUM_OF_TABLEGROUP
(
pQInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfGroups
;
++
i
)
{
SArray
*
group
=
GET_TABLEGROUP
(
pQInfo
,
i
);
// update the query time window
pQuery
->
window
=
cond
.
twindow
;
size_t
numOfGroups
=
GET_NUM_OF_TABLEGROUP
(
pQInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfGroups
;
++
i
)
{
SArray
*
group
=
GET_TABLEGROUP
(
pQInfo
,
i
);
size_t
t
=
taosArrayGetSize
(
group
);
for
(
int32_t
j
=
0
;
j
<
t
;
++
j
)
{
STableQueryInfo
*
pCheckInfo
=
taosArrayGetP
(
group
,
j
);
size_t
t
=
taosArrayGetSize
(
group
);
for
(
int32_t
j
=
0
;
j
<
t
;
++
j
)
{
STableQueryInfo
*
pCheckInfo
=
taosArrayGetP
(
group
,
j
);
pCheckInfo
->
win
=
pQuery
->
window
;
pCheckInfo
->
lastKey
=
pCheckInfo
->
win
.
skey
;
}
pCheckInfo
->
win
=
pQuery
->
window
;
pCheckInfo
->
lastKey
=
pCheckInfo
->
win
.
skey
;
}
}
}
else
if
(
isPointInterpoQuery
(
pQuery
))
{
...
...
@@ -4484,12 +4456,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
return
code
;
}
if
(
pQInfo
->
tableqinfoGroupInfo
.
numOfTables
==
0
)
{
qDebug
(
"QInfo:%p no table qualified for tag filter, abort query"
,
pQInfo
);
setQueryStatus
(
pQuery
,
QUERY_COMPLETED
);
return
TSDB_CODE_SUCCESS
;
}
pQInfo
->
tsdb
=
tsdb
;
pQInfo
->
vgId
=
vgId
;
...
...
@@ -4612,7 +4578,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
SQueryRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
SQuery
*
pQuery
=
pRuntimeEnv
->
pQuery
;
SQueryCostInfo
*
summary
=
&
pRuntimeEnv
->
summary
;
int64_t
st
=
taosGetTimestampMs
();
TsdbQueryHandleT
pQueryHandle
=
IS_MASTER_SCAN
(
pRuntimeEnv
)
?
pRuntimeEnv
->
pQueryHandle
:
pRuntimeEnv
->
pSecQueryHandle
;
...
...
@@ -4622,7 +4588,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
while
(
tsdbNextDataBlock
(
pQueryHandle
))
{
summary
->
totalBlocks
+=
1
;
if
(
IS_QUERY_KILLED
(
pQInfo
))
{
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_TSC_QUERY_CANCELLED
);
}
...
...
@@ -4659,7 +4625,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
summary
->
totalRows
+=
blockInfo
.
rows
;
stableApplyFunctionsOnBlock
(
pRuntimeEnv
,
&
blockInfo
,
pStatis
,
pDataBlock
,
binarySearchForKey
);
qDebug
(
"QInfo:%p check data block completed, uid:%"
PRId64
", tid:%d, brange:%"
PRId64
"-%"
PRId64
", numOfRows:%d, "
"lastKey:%"
PRId64
,
pQInfo
,
blockInfo
.
uid
,
blockInfo
.
tid
,
blockInfo
.
window
.
skey
,
blockInfo
.
window
.
ekey
,
blockInfo
.
rows
,
...
...
@@ -6383,10 +6349,25 @@ static void freeQInfo(SQInfo *pQInfo) {
taosTFree
(
pQuery
);
}
freeTableQueryInfo
(
&
pQInfo
->
tableqinfoGroupInfo
);
// todo refactor, extract method to destroytableDataInfo
if
(
pQInfo
->
tableqinfoGroupInfo
.
pGroupList
!=
NULL
)
{
int32_t
numOfGroups
=
(
int32_t
)(
GET_NUM_OF_TABLEGROUP
(
pQInfo
));
for
(
int32_t
i
=
0
;
i
<
numOfGroups
;
++
i
)
{
SArray
*
p
=
GET_TABLEGROUP
(
pQInfo
,
i
);
taosTFree
(
pQInfo
->
pBuf
);
size_t
num
=
taosArrayGetSize
(
p
);
for
(
int32_t
j
=
0
;
j
<
num
;
++
j
)
{
STableQueryInfo
*
item
=
taosArrayGetP
(
p
,
j
);
destroyTableQueryInfo
(
item
);
}
taosArrayDestroy
(
p
);
}
}
taosTFree
(
pQInfo
->
pBuf
);
taosArrayDestroy
(
pQInfo
->
tableqinfoGroupInfo
.
pGroupList
);
taosHashCleanup
(
pQInfo
->
tableqinfoGroupInfo
.
map
);
tsdbDestroyTableGroup
(
&
pQInfo
->
tableGroupInfo
);
taosArrayDestroy
(
pQInfo
->
arrTableIdInfo
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录