Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f5eefb0f
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f5eefb0f
编写于
8月 23, 2021
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[td-6260]<enhance>: Optimize the client-side query performance when multiple group result exists.
上级
48043bd4
变更
6
展开全部
隐藏空白更改
内联
并排
Showing
6 changed file
with
311 addition
and
287 deletion
+311
-287
src/client/inc/tscUtil.h
src/client/inc/tscUtil.h
+1
-1
src/client/src/tscGlobalmerge.c
src/client/src/tscGlobalmerge.c
+165
-203
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+0
-1
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+10
-5
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+134
-76
src/query/src/qFill.c
src/query/src/qFill.c
+1
-1
未找到文件。
src/client/inc/tscUtil.h
浏览文件 @
f5eefb0f
...
...
@@ -36,7 +36,7 @@ extern "C" {
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)
|| UTIL_TABLE_IS_TMP_TABLE(metaInfo)
))
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
...
...
src/client/src/tscGlobalmerge.c
浏览文件 @
f5eefb0f
...
...
@@ -35,6 +35,7 @@ typedef struct SCompareParam {
static
bool
needToMerge
(
SSDataBlock
*
pBlock
,
SArray
*
columnIndexList
,
int32_t
index
,
char
**
buf
)
{
int32_t
ret
=
0
;
size_t
size
=
taosArrayGetSize
(
columnIndexList
);
if
(
size
>
0
)
{
ret
=
compare_aRv
(
pBlock
,
columnIndexList
,
(
int32_t
)
size
,
index
,
buf
,
TSDB_ORDER_ASC
);
...
...
@@ -564,9 +565,11 @@ static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBloc
(
*
hasPrev
)
=
true
;
}
// tsdb_func_tag function only produce one row of result. Therefore, we need to copy the
// output value to multiple rows
static
void
setTagValueForMultipleRows
(
SQLFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
numOfRows
)
{
if
(
numOfRows
<=
1
)
{
return
;
return
;
}
for
(
int32_t
k
=
0
;
k
<
numOfOutput
;
++
k
)
{
...
...
@@ -574,12 +577,49 @@ static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput
continue
;
}
int32_t
inc
=
numOfRows
-
1
;
// tsdb_func_tag function only produce one row of result
char
*
src
=
pCtx
[
k
].
pOutput
;
char
*
src
=
pCtx
[
k
].
pOutput
;
char
*
dst
=
pCtx
[
k
].
pOutput
+
pCtx
[
k
].
outputBytes
;
for
(
int32_t
i
=
0
;
i
<
inc
;
++
i
)
{
pCtx
[
k
].
pOutput
+=
pCtx
[
k
].
outputBytes
;
memcpy
(
pCtx
[
k
].
pOutput
,
src
,
(
size_t
)
pCtx
[
k
].
outputBytes
);
// Let's start from the second row, as the first row has result value already.
for
(
int32_t
i
=
1
;
i
<
numOfRows
;
++
i
)
{
memcpy
(
dst
,
src
,
(
size_t
)
pCtx
[
k
].
outputBytes
);
dst
+=
pCtx
[
k
].
outputBytes
;
}
}
}
static
void
doMergeResultImpl
(
SMultiwayMergeInfo
*
pInfo
,
SQLFunctionCtx
*
pCtx
,
int32_t
numOfExpr
,
int32_t
rowIndex
,
char
**
pDataPtr
)
{
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
pCtx
[
j
].
pInput
=
pDataPtr
[
j
]
+
pCtx
[
j
].
inputBytes
*
rowIndex
;
}
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
int32_t
functionId
=
pCtx
[
j
].
functionId
;
if
(
functionId
==
TSDB_FUNC_TAG_DUMMY
||
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
continue
;
}
if
(
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
pInfo
->
udfInfo
,
-
1
*
functionId
-
1
);
doInvokeUdf
(
pUdfInfo
,
&
pCtx
[
j
],
0
,
TSDB_UDF_FUNC_MERGE
);
}
else
{
aAggs
[
functionId
].
mergeFunc
(
&
pCtx
[
j
]);
}
}
}
static
void
doFinalizeResultImpl
(
SMultiwayMergeInfo
*
pInfo
,
SQLFunctionCtx
*
pCtx
,
int32_t
numOfExpr
)
{
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
int32_t
functionId
=
pCtx
[
j
].
functionId
;
if
(
functionId
==
TSDB_FUNC_TAG_DUMMY
||
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
continue
;
}
if
(
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
pInfo
->
udfInfo
,
-
1
*
functionId
-
1
);
doInvokeUdf
(
pUdfInfo
,
&
pCtx
[
j
],
0
,
TSDB_UDF_FUNC_FINALIZE
);
}
else
{
aAggs
[
functionId
].
xFinalize
(
&
pCtx
[
j
]);
}
}
}
...
...
@@ -588,52 +628,18 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
SMultiwayMergeInfo
*
pInfo
=
pOperator
->
info
;
SQLFunctionCtx
*
pCtx
=
pInfo
->
binfo
.
pCtx
;
char
**
add
=
calloc
(
pBlock
->
info
.
numOfCols
,
POINTER_BYTES
);
char
**
add
rPtr
=
calloc
(
pBlock
->
info
.
numOfCols
,
POINTER_BYTES
);
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
numOfCols
;
++
i
)
{
add
[
i
]
=
pCtx
[
i
].
pInput
;
add
rPtr
[
i
]
=
pCtx
[
i
].
pInput
;
pCtx
[
i
].
size
=
1
;
}
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
if
(
pInfo
->
hasPrev
)
{
if
(
needToMerge
(
pBlock
,
pInfo
->
orderColumnList
,
i
,
pInfo
->
prevRow
))
{
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
pCtx
[
j
].
pInput
=
add
[
j
]
+
pCtx
[
j
].
inputBytes
*
i
;
}
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
int32_t
functionId
=
pCtx
[
j
].
functionId
;
if
(
functionId
==
TSDB_FUNC_TAG_DUMMY
||
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
continue
;
}
if
(
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
pInfo
->
udfInfo
,
-
1
*
functionId
-
1
);
doInvokeUdf
(
pUdfInfo
,
&
pCtx
[
j
],
0
,
TSDB_UDF_FUNC_MERGE
);
continue
;
}
aAggs
[
functionId
].
mergeFunc
(
&
pCtx
[
j
]);
}
doMergeResultImpl
(
pInfo
,
pCtx
,
numOfExpr
,
i
,
addrPtr
);
}
else
{
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
// TODO refactor
int32_t
functionId
=
pCtx
[
j
].
functionId
;
if
(
functionId
==
TSDB_FUNC_TAG_DUMMY
||
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
continue
;
}
if
(
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
pInfo
->
udfInfo
,
-
1
*
functionId
-
1
);
doInvokeUdf
(
pUdfInfo
,
&
pCtx
[
j
],
0
,
TSDB_UDF_FUNC_FINALIZE
);
continue
;
}
aAggs
[
functionId
].
xFinalize
(
&
pCtx
[
j
]);
}
doFinalizeResultImpl
(
pInfo
,
pCtx
,
numOfExpr
);
int32_t
numOfRows
=
getNumOfResult
(
pOperator
->
pRuntimeEnv
,
pInfo
->
binfo
.
pCtx
,
pOperator
->
numOfOutput
);
setTagValueForMultipleRows
(
pCtx
,
pOperator
->
numOfOutput
,
numOfRows
);
...
...
@@ -655,48 +661,10 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
aAggs
[
pCtx
[
j
].
functionId
].
init
(
&
pCtx
[
j
],
pCtx
[
j
].
resultInfo
);
}
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
pCtx
[
j
].
pInput
=
add
[
j
]
+
pCtx
[
j
].
inputBytes
*
i
;
}
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
int32_t
functionId
=
pCtx
[
j
].
functionId
;
if
(
functionId
==
TSDB_FUNC_TAG_DUMMY
||
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
continue
;
}
if
(
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
pInfo
->
udfInfo
,
-
1
*
functionId
-
1
);
doInvokeUdf
(
pUdfInfo
,
&
pCtx
[
j
],
0
,
TSDB_UDF_FUNC_MERGE
);
continue
;
}
aAggs
[
functionId
].
mergeFunc
(
&
pCtx
[
j
]);
}
doMergeResultImpl
(
pInfo
,
pCtx
,
numOfExpr
,
i
,
addrPtr
);
}
}
else
{
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
pCtx
[
j
].
pInput
=
add
[
j
]
+
pCtx
[
j
].
inputBytes
*
i
;
}
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
int32_t
functionId
=
pCtx
[
j
].
functionId
;
if
(
functionId
==
TSDB_FUNC_TAG_DUMMY
||
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
continue
;
}
if
(
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
pInfo
->
udfInfo
,
-
1
*
functionId
-
1
);
doInvokeUdf
(
pUdfInfo
,
&
pCtx
[
j
],
0
,
TSDB_UDF_FUNC_MERGE
);
continue
;
}
aAggs
[
functionId
].
mergeFunc
(
&
pCtx
[
j
]);
}
doMergeResultImpl
(
pInfo
,
pCtx
,
numOfExpr
,
i
,
addrPtr
);
}
savePrevOrderColumns
(
pInfo
->
prevRow
,
pInfo
->
orderColumnList
,
pBlock
,
i
,
&
pInfo
->
hasPrev
);
...
...
@@ -704,11 +672,11 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
{
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
numOfCols
;
++
i
)
{
pCtx
[
i
].
pInput
=
add
[
i
];
pCtx
[
i
].
pInput
=
add
rPtr
[
i
];
}
}
tfree
(
add
);
tfree
(
add
rPtr
);
}
static
bool
isAllSourcesCompleted
(
SGlobalMerger
*
pMerger
)
{
...
...
@@ -816,6 +784,8 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
SLocalDataSource
*
pOneDataSrc
=
pMerger
->
pLocalDataSrc
[
pTree
->
pNode
[
0
].
index
];
bool
sameGroup
=
true
;
if
(
pInfo
->
hasPrev
)
{
// todo refactor extract method
int32_t
numOfCols
=
(
int32_t
)
taosArrayGetSize
(
pInfo
->
orderColumnList
);
// if this row belongs to current result set group
...
...
@@ -955,9 +925,10 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
break
;
}
bool
sameGroup
=
true
;
if
(
pAggInfo
->
hasGroupColData
)
{
bool
sameGroup
=
isSameGroup
(
pAggInfo
->
groupColumnList
,
pBlock
,
pAggInfo
->
currentGroupColData
);
if
(
!
sameGroup
)
{
sameGroup
=
isSameGroup
(
pAggInfo
->
groupColumnList
,
pBlock
,
pAggInfo
->
currentGroupColData
);
if
(
!
sameGroup
&&
!
pAggInfo
->
multiGroupResults
)
{
*
newgroup
=
true
;
pAggInfo
->
hasDataBlockForNewGroup
=
true
;
pAggInfo
->
pExistBlock
=
pBlock
;
...
...
@@ -976,26 +947,11 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
}
if
(
handleData
)
{
// data in current group is all handled
for
(
int32_t
j
=
0
;
j
<
pOperator
->
numOfOutput
;
++
j
)
{
int32_t
functionId
=
pAggInfo
->
binfo
.
pCtx
[
j
].
functionId
;
if
(
functionId
==
TSDB_FUNC_TAG_DUMMY
||
functionId
==
TSDB_FUNC_TS_DUMMY
)
{
continue
;
}
if
(
functionId
<
0
)
{
SUdfInfo
*
pUdfInfo
=
taosArrayGet
(
pAggInfo
->
udfInfo
,
-
1
*
functionId
-
1
);
doInvokeUdf
(
pUdfInfo
,
&
pAggInfo
->
binfo
.
pCtx
[
j
],
0
,
TSDB_UDF_FUNC_FINALIZE
);
continue
;
}
aAggs
[
functionId
].
xFinalize
(
&
pAggInfo
->
binfo
.
pCtx
[
j
]);
}
doFinalizeResultImpl
(
pAggInfo
,
pAggInfo
->
binfo
.
pCtx
,
pOperator
->
numOfOutput
);
int32_t
numOfRows
=
getNumOfResult
(
pOperator
->
pRuntimeEnv
,
pAggInfo
->
binfo
.
pCtx
,
pOperator
->
numOfOutput
);
pAggInfo
->
binfo
.
pRes
->
info
.
rows
+=
numOfRows
;
pAggInfo
->
binfo
.
pRes
->
info
.
rows
+=
numOfRows
;
setTagValueForMultipleRows
(
pAggInfo
->
binfo
.
pCtx
,
pOperator
->
numOfOutput
,
numOfRows
);
}
...
...
@@ -1019,137 +975,143 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
return
(
pRes
->
info
.
rows
!=
0
)
?
pRes
:
NULL
;
}
static
SSDataBlock
*
skipGroupBlock
(
SOperatorInfo
*
pOperator
,
bool
*
newgroup
)
{
SSLimitOperatorInfo
*
pInfo
=
pOperator
->
info
;
assert
(
pInfo
->
currentGroupOffset
>=
0
);
static
void
doHandleDataInCurrentGroup
(
SSLimitOperatorInfo
*
pInfo
,
SSDataBlock
*
pBlock
,
int32_t
rowIndex
)
{
if
(
pInfo
->
currentOffset
>
0
)
{
pInfo
->
currentOffset
-=
1
;
}
else
{
// discard the data rows in current group
if
(
pInfo
->
limit
.
limit
<
0
||
(
pInfo
->
limit
.
limit
>=
0
&&
pInfo
->
rowsTotal
<
pInfo
->
limit
.
limit
))
{
int32_t
num1
=
taosArrayGetSize
(
pInfo
->
pRes
->
pDataBlock
);
for
(
int32_t
i
=
0
;
i
<
num1
;
++
i
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
SColumnInfoData
*
pDstInfoData
=
taosArrayGet
(
pInfo
->
pRes
->
pDataBlock
,
i
);
SSDataBlock
*
pBlock
=
NULL
;
if
(
pInfo
->
currentGroupOffset
==
0
)
{
publishOperatorProfEvent
(
pOperator
->
upstream
[
0
],
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
pBlock
=
pOperator
->
upstream
[
0
]
->
exec
(
pOperator
->
upstream
[
0
],
newgroup
);
publishOperatorProfEvent
(
pOperator
->
upstream
[
0
],
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pBlock
==
NULL
)
{
setQueryStatus
(
pOperator
->
pRuntimeEnv
,
QUERY_COMPLETED
);
pOperator
->
status
=
OP_EXEC_DONE
;
}
SColumnInfo
*
pColInfo
=
&
pColInfoData
->
info
;
char
*
pSrc
=
rowIndex
*
pColInfo
->
bytes
+
(
char
*
)
pColInfoData
->
pData
;
char
*
pDst
=
(
char
*
)
pDstInfoData
->
pData
+
(
pInfo
->
pRes
->
info
.
rows
*
pColInfo
->
bytes
);
if
(
*
newgroup
==
false
&&
pInfo
->
limit
.
limit
>
0
&&
pInfo
->
rowsTotal
>=
pInfo
->
limit
.
limit
)
{
while
((
*
newgroup
)
==
false
)
{
// ignore the remain blocks
publishOperatorProfEvent
(
pOperator
->
upstream
[
0
],
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
pBlock
=
pOperator
->
upstream
[
0
]
->
exec
(
pOperator
->
upstream
[
0
],
newgroup
);
publishOperatorProfEvent
(
pOperator
->
upstream
[
0
],
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pBlock
==
NULL
)
{
setQueryStatus
(
pOperator
->
pRuntimeEnv
,
QUERY_COMPLETED
);
pOperator
->
status
=
OP_EXEC_DONE
;
return
NULL
;
}
memcpy
(
pDst
,
pSrc
,
pColInfo
->
bytes
);
}
}
return
pBlock
;
}
publishOperatorProfEvent
(
pOperator
->
upstream
[
0
],
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
pBlock
=
pOperator
->
upstream
[
0
]
->
exec
(
pOperator
->
upstream
[
0
],
newgroup
);
publishOperatorProfEvent
(
pOperator
->
upstream
[
0
],
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pBlock
==
NULL
)
{
setQueryStatus
(
pOperator
->
pRuntimeEnv
,
QUERY_COMPLETED
);
pOperator
->
status
=
OP_EXEC_DONE
;
return
NULL
;
pInfo
->
rowsTotal
+=
1
;
pInfo
->
pRes
->
info
.
rows
+=
1
;
}
}
}
while
(
1
)
{
if
(
*
newgroup
)
{
pInfo
->
currentGroupOffset
-=
1
;
*
newgroup
=
false
;
}
static
void
ensureOutputBuf
(
SSLimitOperatorInfo
*
pInfo
,
SSDataBlock
*
pResultBlock
,
int32_t
numOfRows
)
{
if
(
pInfo
->
capacity
<
pResultBlock
->
info
.
rows
+
numOfRows
)
{
int32_t
total
=
pResultBlock
->
info
.
rows
+
numOfRows
;
while
((
*
newgroup
)
==
false
)
{
publishOperatorProfEvent
(
pOperator
->
upstream
[
0
],
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
pBlock
=
pOperator
->
upstream
[
0
]
->
exec
(
pOperator
->
upstream
[
0
],
newgroup
);
publishOperatorProfEvent
(
pOperator
->
upstream
[
0
],
QUERY_PROF_AFTER_OPERATOR_EXEC
);
int32_t
num
=
taosArrayGetSize
(
pResultBlock
->
pDataBlock
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
SColumnInfoData
*
pInfoData
=
taosArrayGet
(
pResultBlock
->
pDataBlock
,
i
);
if
(
pBlock
==
NULL
)
{
setQueryStatus
(
pOperator
->
pRuntimeEnv
,
QUERY_COMPLETED
);
pOperator
->
status
=
OP_EXEC_DONE
;
return
NULL
;
}
char
*
tmp
=
realloc
(
pInfoData
->
pData
,
total
*
pInfoData
->
info
.
bytes
);
if
(
tmp
!=
NULL
)
{
pInfoData
->
pData
=
tmp
;
}
else
{
// todo handle the malloc failure
}
// now we have got the first data block of the next group.
if
(
pInfo
->
currentGroupOffset
==
0
)
{
return
pBlock
;
}
pInfo
->
capacity
=
total
;
pInfo
->
threshold
=
total
*
0
.
8
;
}
return
NULL
;
}
}
SSDataBlock
*
doSLimit
(
void
*
param
,
bool
*
newgroup
)
{
SOperatorInfo
*
pOperator
=
(
SOperatorInfo
*
)
param
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
static
void
doSlimitImpl
(
SOperatorInfo
*
pOperator
,
SSLimitOperatorInfo
*
pInfo
,
SSDataBlock
*
pBlock
)
{
int32_t
rowIndex
=
0
;
SSLimitOperatorInfo
*
pInfo
=
pOperator
->
info
;
while
(
rowIndex
<
pBlock
->
info
.
rows
)
{
int32_t
numOfCols
=
(
int32_t
)
taosArrayGetSize
(
pInfo
->
orderColumnList
);
SSDataBlock
*
pBlock
=
NULL
;
while
(
1
)
{
pBlock
=
skipGroupBlock
(
pOperator
,
newgroup
);
if
(
pBlock
==
NULL
)
{
setQueryStatus
(
pOperator
->
pRuntimeEnv
,
QUERY_COMPLETED
);
pOperator
->
status
=
OP_EXEC_DONE
;
return
NULL
;
}
bool
samegroup
=
true
;
if
(
pInfo
->
hasPrev
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColIndex
*
pIndex
=
taosArrayGet
(
pInfo
->
orderColumnList
,
i
);
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pIndex
->
colIndex
);
if
(
*
newgroup
)
{
// a new group arrives
pInfo
->
groupTotal
+=
1
;
pInfo
->
rowsTotal
=
0
;
pInfo
->
currentOffset
=
pInfo
->
limit
.
offset
;
SColumnInfo
*
pColInfo
=
&
pColInfoData
->
info
;
char
*
d
=
rowIndex
*
pColInfo
->
bytes
+
(
char
*
)
pColInfoData
->
pData
;
int32_t
ret
=
columnValueAscendingComparator
(
pInfo
->
prevRow
[
i
],
d
,
pColInfo
->
type
,
pColInfo
->
bytes
);
if
(
ret
!=
0
)
{
// it is a new group
samegroup
=
false
;
break
;
}
}
}
assert
(
pInfo
->
currentGroupOffset
==
0
);
if
(
!
samegroup
||
!
pInfo
->
hasPrev
)
{
pInfo
->
ignoreCurrentGroup
=
false
;
savePrevOrderColumns
(
pInfo
->
prevRow
,
pInfo
->
orderColumnList
,
pBlock
,
rowIndex
,
&
pInfo
->
hasPrev
);
if
(
pInfo
->
currentOffset
>=
pBlock
->
info
.
rows
)
{
pInfo
->
currentOffset
-=
pBlock
->
info
.
rows
;
}
else
{
if
(
pInfo
->
currentOffset
==
0
)
{
break
;
pInfo
->
currentOffset
=
pInfo
->
limit
.
offset
;
// reset the offset value for a new group
pInfo
->
rowsTotal
=
0
;
if
(
pInfo
->
currentGroupOffset
>
0
)
{
pInfo
->
ignoreCurrentGroup
=
true
;
pInfo
->
currentGroupOffset
-=
1
;
// now we are in the next group data
rowIndex
+=
1
;
continue
;
}
int32_t
remain
=
(
int32_t
)(
pBlock
->
info
.
rows
-
pInfo
->
currentOffset
);
pBlock
->
info
.
rows
=
remain
;
// A new group has arrived according to the result rows, and the group limitation has already reached.
// Let's jump out of current loop and return immediately.
if
(
pInfo
->
slimit
.
limit
>=
0
&&
pInfo
->
groupTotal
>=
pInfo
->
slimit
.
limit
)
{
setQueryStatus
(
pOperator
->
pRuntimeEnv
,
QUERY_COMPLETED
);
pOperator
->
status
=
OP_EXEC_DONE
;
return
;
}
// move the remain rows of this data block to the front.
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
pInfo
->
groupTotal
+=
1
;
doHandleDataInCurrentGroup
(
pInfo
,
pBlock
,
rowIndex
);
int16_t
bytes
=
pColInfoData
->
info
.
bytes
;
memmove
(
pColInfoData
->
pData
,
pColInfoData
->
pData
+
bytes
*
pInfo
->
currentOffset
,
remain
*
bytes
);
}
else
{
// handle the offset in the same group
// All the data in current group needs to be discarded, due to the limit parameter in the SQL statement
if
(
pInfo
->
ignoreCurrentGroup
)
{
rowIndex
+=
1
;
continue
;
}
pInfo
->
currentOffset
=
0
;
break
;
doHandleDataInCurrentGroup
(
pInfo
,
pBlock
,
rowIndex
);
}
rowIndex
+=
1
;
}
}
if
(
pInfo
->
slimit
.
limit
>
0
&&
pInfo
->
groupTotal
>
pInfo
->
slimit
.
limit
)
{
// reach the group limit, abort
SSDataBlock
*
doSLimit
(
void
*
param
,
bool
*
newgroup
)
{
SOperatorInfo
*
pOperator
=
(
SOperatorInfo
*
)
param
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
if
(
pInfo
->
limit
.
limit
>
0
&&
(
pInfo
->
rowsTotal
+
pBlock
->
info
.
rows
>=
pInfo
->
limit
.
limit
))
{
pBlock
->
info
.
rows
=
(
int32_t
)(
pInfo
->
limit
.
limit
-
pInfo
->
rowsTotal
);
pInfo
->
rowsTotal
=
pInfo
->
limit
.
limit
;
SSLimitOperatorInfo
*
pInfo
=
pOperator
->
info
;
pInfo
->
pRes
->
info
.
rows
=
0
;
assert
(
pInfo
->
currentGroupOffset
>=
0
);
while
(
1
)
{
publishOperatorProfEvent
(
pOperator
->
upstream
[
0
],
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
SSDataBlock
*
pBlock
=
pOperator
->
upstream
[
0
]
->
exec
(
pOperator
->
upstream
[
0
],
newgroup
);
publishOperatorProfEvent
(
pOperator
->
upstream
[
0
],
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
p
Info
->
slimit
.
limit
>
0
&&
pInfo
->
groupTotal
>=
pInfo
->
slimit
.
limit
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
if
(
p
Block
==
NULL
)
{
return
pInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
pRes
;
}
// setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
}
else
{
pInfo
->
rowsTotal
+=
pBlock
->
info
.
rows
;
}
ensureOutputBuf
(
pInfo
,
pInfo
->
pRes
,
pBlock
->
info
.
rows
);
doSlimitImpl
(
pOperator
,
pInfo
,
pBlock
);
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
pInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pInfo
->
pRes
;
}
return
pBlock
;
// now the number of rows in current group is enough, let's return to the invoke function
if
(
pInfo
->
pRes
->
info
.
rows
>
pInfo
->
threshold
)
{
return
pInfo
->
pRes
;
}
}
}
src/client/src/tscSQLParser.c
浏览文件 @
f5eefb0f
...
...
@@ -6917,7 +6917,6 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
const
char
*
msg1
=
"interval not allowed in group by normal column"
;
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
SSchema
*
pSchema
=
tscGetTableSchema
(
pTableMetaInfo
->
pTableMeta
);
SSchema
*
tagSchema
=
NULL
;
...
...
src/query/inc/qExecutor.h
浏览文件 @
f5eefb0f
...
...
@@ -470,6 +470,11 @@ typedef struct SSLimitOperatorInfo {
char
**
prevRow
;
SArray
*
orderColumnList
;
bool
hasPrev
;
bool
ignoreCurrentGroup
;
SSDataBlock
*
pRes
;
// result buffer
int64_t
capacity
;
int64_t
threshold
;
}
SSLimitOperatorInfo
;
typedef
struct
SFilterOperatorInfo
{
...
...
@@ -481,7 +486,7 @@ typedef struct SFillOperatorInfo {
SFillInfo
*
pFillInfo
;
SSDataBlock
*
pRes
;
int64_t
totalInputRows
;
void
**
p
;
SSDataBlock
*
existNewGroupBlock
;
}
SFillOperatorInfo
;
...
...
@@ -544,9 +549,9 @@ typedef struct SMultiwayMergeInfo {
bool
hasDataBlockForNewGroup
;
SSDataBlock
*
pExistBlock
;
bool
hasPrev
;
bool
groupMix
;
SArray
*
udfInfo
;
bool
hasPrev
;
bool
multiGroupResults
;
}
SMultiwayMergeInfo
;
// todo support the disk-based sort
...
...
@@ -577,8 +582,8 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
SOperatorInfo
*
createDistinctOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createTableBlockInfoScanOperator
(
void
*
pTsdbQueryHandle
,
SQueryRuntimeEnv
*
pRuntimeEnv
);
SOperatorInfo
*
createMultiwaySortOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
int32_t
numOfRows
,
void
*
merger
,
bool
groupMix
);
SOperatorInfo
*
createGlobalAggregateOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
void
*
param
,
SArray
*
pUdfInfo
);
int32_t
numOfRows
,
void
*
merger
);
SOperatorInfo
*
createGlobalAggregateOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
void
*
param
,
SArray
*
pUdfInfo
,
bool
groupResultMixedUp
);
SOperatorInfo
*
createStatewindowOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createSLimitOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
void
*
merger
);
SOperatorInfo
*
createFilterOperatorInfo
(
SQueryRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
upstream
,
SExprInfo
*
pExpr
,
...
...
src/query/src/qExecutor.c
浏览文件 @
f5eefb0f
此差异已折叠。
点击以展开。
src/query/src/qFill.c
浏览文件 @
f5eefb0f
...
...
@@ -430,7 +430,7 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput)
SColumnInfoData
*
pColData
=
taosArrayGet
(
pInput
->
pDataBlock
,
i
);
pFillInfo
->
pData
[
i
]
=
pColData
->
pData
;
if
(
TSDB_COL_IS_TAG
(
pCol
->
flag
)
/* || IS_VAR_DATA_TYPE(pCol->col.type)*/
)
{
// copy the tag value to tag value buffer
if
(
TSDB_COL_IS_TAG
(
pCol
->
flag
))
{
// copy the tag value to tag value buffer
SFillTagColInfo
*
pTag
=
&
pFillInfo
->
pTags
[
pCol
->
tagIndex
];
assert
(
pTag
->
col
.
colId
==
pCol
->
col
.
colId
);
memcpy
(
pTag
->
tagVal
,
pColData
->
pData
,
pCol
->
col
.
bytes
);
// TODO not memcpy??
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录