taosdata / TDengine
Commit dd0ea407
Authored Jan 18, 2023 by Xiaoyu Wang

Merge remote-tracking branch 'origin/fix/liaohj' into enh/3.0_planner_optimize

Parents: 554b8c64, c9a1b3ba

Showing 7 changed files with 57 additions and 23 deletions (+57 -23)
source/libs/executor/inc/executorimpl.h      +2  -1
source/libs/executor/src/exchangeoperator.c  +14 -5
source/libs/executor/src/executil.c          +6  -1
source/libs/executor/src/projectoperator.c   +9  -3
source/libs/executor/src/scanoperator.c      +19 -8
source/libs/executor/src/sortoperator.c      +6  -5
tests/parallel_test/cases.task               +1  -0
source/libs/executor/inc/executorimpl.h

@@ -718,7 +718,8 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
 bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
 void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
-void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator);
+void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo);
+bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator);
 void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
                                      int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
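The header change is the core of this merge: applyLimitOffset now returns bool (true once the row limit has been reached) instead of void, and the new helper resetLimitInfoForNextGroup replaces the hand-written two-line reset of numOfOutputRows/remainOffset that several operators carried. Below is a minimal, self-contained sketch of that contract; LimitInfo, Block, applyLimitOffsetSketch and resetForNextGroup are simplified stand-ins for illustration, not the actual TDengine types or functions.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

// Simplified stand-ins for SLimitInfo / SSDataBlock; not the TDengine definitions.
typedef struct {
  int64_t limit;            // -1 means "no LIMIT"
  int64_t offset;           // OFFSET clause
  int64_t remainOffset;     // rows still to be skipped in the current group
  int64_t numOfOutputRows;  // rows already emitted for the current group
} LimitInfo;

typedef struct {
  int32_t rows;
} Block;

// Mirrors the new contract: trim the block in place and return true once the limit is reached,
// so the caller can decide to complete the operator (setOperatorCompleted in the real code).
static bool applyLimitOffsetSketch(LimitInfo* pLimitInfo, Block* pBlock) {
  if (pLimitInfo->remainOffset >= pBlock->rows) {  // the whole block is consumed by OFFSET
    pLimitInfo->remainOffset -= pBlock->rows;
    pBlock->rows = 0;
    return false;
  }

  pBlock->rows -= (int32_t)pLimitInfo->remainOffset;  // drop the first remainOffset rows
  pLimitInfo->remainOffset = 0;

  if (pLimitInfo->limit != -1 && pLimitInfo->limit <= pLimitInfo->numOfOutputRows + pBlock->rows) {
    pBlock->rows = (int32_t)(pLimitInfo->limit - pLimitInfo->numOfOutputRows);  // keep first N rows
    return true;
  }
  return false;
}

// Counterpart of resetLimitInfoForNextGroup: the next group starts with fresh counters.
static void resetForNextGroup(LimitInfo* pLimitInfo) {
  pLimitInfo->numOfOutputRows = 0;
  pLimitInfo->remainOffset = pLimitInfo->offset;
}

int main(void) {
  LimitInfo li = {.limit = 5, .offset = 2, .remainOffset = 2, .numOfOutputRows = 0};
  Block     blocks[] = {{3}, {4}, {4}};

  for (int i = 0; i < 3; i++) {
    bool limitReached = applyLimitOffsetSketch(&li, &blocks[i]);
    li.numOfOutputRows += blocks[i].rows;
    printf("block %d -> emit %d rows, limitReached=%d\n", i, blocks[i].rows, (int)limitReached);
    if (limitReached) {
      resetForNextGroup(&li);  // or stop the operator, depending on the call site
      break;
    }
  }
  return 0;
}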
source/libs/executor/src/exchangeoperator.c

@@ -584,7 +584,13 @@ int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDa
   int32_t index = 0;
   int32_t code = 0;
   while (index++ < pRetrieveRsp->numOfBlocks) {
-    SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
+    SSDataBlock* pb = NULL;
+    if (taosArrayGetSize(pExchangeInfo->pRecycledBlocks) > 0) {
+      pb = *(SSDataBlock**)taosArrayPop(pExchangeInfo->pRecycledBlocks);
+      blockDataCleanup(pb);
+    } else {
+      pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
+    }
+
     code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
     if (code != 0) {

@@ -732,9 +738,7 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
   }
   // reset the value for a new group data
-  pLimitInfo->numOfOutputRows = 0;
-  pLimitInfo->remainOffset = pLimitInfo->limit.offset;
+  resetLimitInfoForNextGroup(pLimitInfo);
 
   // existing rows that belongs to previous group.
   if (pBlock->info.rows > 0) {
     return PROJECT_RETRIEVE_DONE;

@@ -760,7 +764,12 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa
     int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
     blockDataKeepFirstNRows(pBlock, keepRows);
     if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) {
-      pOperator->status = OP_EXEC_DONE;
+      setOperatorCompleted(pOperator);
+    } else {
+      // current group limitation is reached, and future blocks of this group need to be discarded.
+      if (pBlock->info.rows == 0) {
+        return PROJECT_RETRIEVE_CONTINUE;
+      }
     }
 
     return PROJECT_RETRIEVE_DONE;
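The first hunk above avoids allocating a fresh data block on every loop iteration: a block is popped from pExchangeInfo->pRecycledBlocks and cleaned up when one is available, and createOneDataBlock is only the fallback. Below is a minimal sketch of that reuse pattern with a hypothetical BlockPool standing in for the taosArray of recycled SSDataBlock pointers (names and types are illustrative, not the TDengine API).

#include <stdlib.h>

// Hypothetical stand-ins; the real code keeps SSDataBlock* entries in pExchangeInfo->pRecycledBlocks.
typedef struct {
  int rows;
} DataBlock;

typedef struct {
  DataBlock** items;
  int         count;
  int         cap;
} BlockPool;

// Reuse a recycled block when possible (pop + reset), otherwise allocate a new one.
static DataBlock* blockPoolAcquire(BlockPool* pool) {
  if (pool->count > 0) {
    DataBlock* pb = pool->items[--pool->count];
    pb->rows = 0;  // stands in for blockDataCleanup()
    return pb;
  }
  return (DataBlock*)calloc(1, sizeof(DataBlock));  // stands in for createOneDataBlock()
}

// Hand a finished block back to the pool so the next fetch round can reuse it.
static void blockPoolRelease(BlockPool* pool, DataBlock* pb) {
  if (pool->count == pool->cap) {
    pool->cap = (pool->cap == 0) ? 4 : pool->cap * 2;
    pool->items = (DataBlock**)realloc(pool->items, (size_t)pool->cap * sizeof(DataBlock*));
  }
  pool->items[pool->count++] = pb;
}

int main(void) {
  BlockPool pool = {0};

  DataBlock* first = blockPoolAcquire(&pool);   // pool is empty: allocated
  blockPoolRelease(&pool, first);
  DataBlock* second = blockPoolAcquire(&pool);  // reused: second == first, no new allocation

  free(second);
  free(pool.items);
  return 0;
}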
source/libs/executor/src/executil.c

@@ -954,7 +954,7 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray*
         return -1;
       }
     } else {
-      qWarn("failed to get tableIds from by table name: %s, reason: %s", name, tstrerror(terrno));
+      // qWarn("failed to get tableIds from by table name: %s, reason: %s", name, tstrerror(terrno));
       terrno = 0;
     }
   }

@@ -1771,6 +1771,11 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit
   pLimitInfo->remainGroupOffset = slimit.offset;
 }
 
+void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo) {
+  pLimitInfo->numOfOutputRows = 0;
+  pLimitInfo->remainOffset = pLimitInfo->limit.offset;
+}
+
 uint64_t tableListGetSize(const STableListInfo* pTableList) {
   ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map));
   return taosArrayGetSize(pTableList->pTableList);
source/libs/executor/src/projectoperator.c

@@ -175,8 +175,7 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S
     // reset the value for a new group data
     // existing rows that belongs to previous group.
-    pLimitInfo->numOfOutputRows = 0;
-    pLimitInfo->remainOffset = pLimitInfo->limit.offset;
+    resetLimitInfoForNextGroup(pLimitInfo);
   }
 
   return PROJECT_RETRIEVE_DONE;

@@ -200,10 +199,18 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
   if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
     int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
     blockDataKeepFirstNRows(pBlock, keepRows);
+    // TODO: optimize it later when partition by + limit
+    // all retrieved requirement has been fulfilled, let's finish this
     if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) ||
         (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
       setOperatorCompleted(pOperator);
+    } else {
+      // Even current group is done, there may be many vgroups remain existed, and we need to continue to retrieve data
+      // from next group. So let's continue this retrieve process
+      if (keepRows == 0) {
+        return PROJECT_RETRIEVE_CONTINUE;
+      }
     }
   }

@@ -358,7 +365,6 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
     pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
   }
 
-  // printDataBlock1(p, "project");
   return (p->info.rows > 0) ? p : NULL;
 }
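The added branch in doIngroupLimitOffset decides what happens after the LIMIT inside a group is hit: complete the whole operator when there is no SLIMIT and only one group (slimit.limit == -1 && currentGroupId == 0) or when enough groups have already been produced, otherwise keep pulling from downstream when the trimmed block ended up empty (keepRows == 0). A condensed, hedged restatement of that decision follows; afterLimitReached and the enum values are illustrative, not the actual TDengine definitions.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

// Illustrative values; the real PROJECT_RETRIEVE_* constants are defined by the executor.
enum { PROJECT_RETRIEVE_CONTINUE = 0x1, PROJECT_RETRIEVE_DONE = 0x2 };

// Condensed form of the branch added in doIngroupLimitOffset, once the in-group LIMIT is reached.
static int32_t afterLimitReached(int64_t slimit, uint64_t currentGroupId, int64_t numOfOutputGroups,
                                 int32_t keepRows, bool* completeOperator) {
  if ((slimit == -1 && currentGroupId == 0) ||        // no SLIMIT and a single group
      (slimit > 0 && slimit <= numOfOutputGroups)) {  // group quota already met
    *completeOperator = true;                         // setOperatorCompleted(pOperator) in the real code
  } else if (keepRows == 0) {
    // This group is exhausted but more groups may follow: keep retrieving from downstream.
    return PROJECT_RETRIEVE_CONTINUE;
  }
  return PROJECT_RETRIEVE_DONE;
}

int main(void) {
  bool    done = false;
  int32_t r = afterLimitReached(/*slimit*/ -1, /*currentGroupId*/ 0, /*numOfOutputGroups*/ 1,
                                /*keepRows*/ 3, &done);
  printf("result=%d, completeOperator=%d\n", r, (int)done);
  return 0;
}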
source/libs/executor/src/scanoperator.c

@@ -257,7 +257,7 @@ static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlo
 }
 
 // todo handle the slimit info
-void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
+bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator) {
   SLimit*     pLimit = &pLimitInfo->limit;
   const char* id = GET_TASKID(pTaskInfo);

@@ -266,6 +266,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
       pLimitInfo->remainOffset -= pBlock->info.rows;
       blockDataEmpty(pBlock);
       qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, id);
+      return false;
     } else {
       blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset);
       pLimitInfo->remainOffset = 0;

@@ -274,13 +275,14 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo
   if (pLimit->limit != -1 && pLimit->limit <= (pLimitInfo->numOfOutputRows + pBlock->info.rows)) {
     // limit the output rows
-    int32_t overflowRows = pLimitInfo->numOfOutputRows + pBlock->info.rows - pLimit->limit;
-    int32_t keep = pBlock->info.rows - overflowRows;
+    int32_t keep = (int32_t)(pLimit->limit - pLimitInfo->numOfOutputRows);
     blockDataKeepFirstNRows(pBlock, keep);
 
     qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, id);
     pOperator->status = OP_EXEC_DONE;
+    return true;
   }
+
+  return false;
 }
 
 static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableScanInfo, SSDataBlock* pBlock,

@@ -395,7 +397,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
     }
   }
 
-  applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator);
+  bool limitReached = applyLimitOffset(&pTableScanInfo->limitInfo, pBlock, pTaskInfo, pOperator);
+  if (limitReached) {  // set operator flag is done
+    setOperatorCompleted(pOperator);
+  }
+
   pCost->totalRows += pBlock->info.rows;
   pTableScanInfo->limitInfo.numOfOutputRows = pCost->totalRows;

@@ -772,8 +777,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
     // reset value for the next group data output
     pOperator->status = OP_OPENED;
-    pInfo->base.limitInfo.numOfOutputRows = 0;
-    pInfo->base.limitInfo.remainOffset = pInfo->base.limitInfo.limit.offset;
+    resetLimitInfoForNextGroup(&pInfo->base.limitInfo);
 
     int32_t        num = 0;
     STableKeyInfo* pList = NULL;

@@ -2685,9 +2689,12 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
   taosArrayDestroy(pInfo->queryConds);
   pInfo->queryConds = NULL;
 
+  resetLimitInfoForNextGroup(&pInfo->limitInfo);
   return TSDB_CODE_SUCCESS;
 }
 
+// all data produced by this function only belongs to one group
+// slimit/soffset does not need to be concerned here, since this function only deal with data within one group.
 SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity,
                                               SOperatorInfo* pOperator) {
   STableMergeScanInfo* pInfo = pOperator->info;

@@ -2707,10 +2714,12 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
     }
   }
 
-  qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows);
+  applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo, pOperator);
+  pInfo->limitInfo.numOfOutputRows += pResBlock->info.rows;
+
+  qDebug("%s get sorted row block, rows:%d, limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
+         pInfo->limitInfo.numOfOutputRows);
   return (pResBlock->info.rows > 0) ? pResBlock : NULL;
 }

@@ -2749,11 +2758,13 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
       pOperator->resultInfo.totalRows += pBlock->info.rows;
       return pBlock;
     } else {
+      // Data of this group are all dumped, let's try the next group
       stopGroupTableMergeScan(pOperator);
       if (pInfo->tableEndIndex >= tableListSize - 1) {
         setOperatorCompleted(pOperator);
         break;
       }
 
       pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
       pInfo->groupId = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex)->groupId;
       startGroupTableMergeScan(pOperator);
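One detail of the scan-side rewrite is the simpler limit arithmetic: the old code computed the overflow first (numOfOutputRows + rows - limit) and subtracted it from the block size, while the new code keeps limit - numOfOutputRows rows directly. The two expressions are algebraically identical; the quick check below uses made-up numbers to show it.

#include <assert.h>
#include <stdint.h>

int main(void) {
  // Hypothetical values: 95 rows already emitted, a 20-row block arrives, LIMIT is 100.
  int64_t numOfOutputRows = 95;
  int64_t limit = 100;
  int32_t blockRows = 20;

  // Old formulation in scanoperator.c: drop the rows that overflow the limit.
  int32_t overflowRows = (int32_t)(numOfOutputRows + blockRows - limit);
  int32_t keepOld = blockRows - overflowRows;

  // New formulation: keep exactly the rows still allowed by the limit.
  int32_t keepNew = (int32_t)(limit - numOfOutputRows);

  assert(keepOld == keepNew && keepNew == 5);  // both keep the first 5 rows of the block
  return 0;
}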
source/libs/executor/src/sortoperator.c

@@ -680,11 +680,13 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
       break;
     }
 
-    applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo, pOperator);
-    if (p->info.rows > 0) {
-      break;
-    }
+    bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo, pOperator);
+    if (limitReached) {
+      resetLimitInfoForNextGroup(&pInfo->limitInfo);
+    }
+
+    if (p->info.rows > 0) {
+      break;
+    }
   }

@@ -698,7 +700,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
       colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
     }
 
-    pInfo->limitInfo.numOfOutputRows += p->info.rows;
     pDataBlock->info.rows = p->info.rows;
     pDataBlock->info.id.groupId = pInfo->groupId;
     pDataBlock->info.dataLoad = 1;
tests/parallel_test/cases.task

@@ -146,6 +146,7 @@
 ,,y,script,./test.sh -f tsim/parser/precision_ns.sim
 ,,y,script,./test.sh -f tsim/parser/projection_limit_offset.sim
 ,,y,script,./test.sh -f tsim/parser/regex.sim
+,,y,script,./test.sh -f tsim/parser/regressiontest.sim
 ,,y,script,./test.sh -f tsim/parser/select_across_vnodes.sim
 ,,y,script,./test.sh -f tsim/parser/select_distinct_tag.sim
 ,,y,script,./test.sh -f tsim/parser/select_from_cache_disk.sim