Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8a8d3f16
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8a8d3f16
编写于
6月 22, 2022
作者:
S
shenglian zhou
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat: add multiple group tsdb reads to table merge scan
上级
acb3d121
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
84 addition
and
58 deletion
+84
-58
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+2
-2
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+1
-6
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+81
-50
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
8a8d3f16
...
...
@@ -840,8 +840,8 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle
int32_t
doCreateMultipleDataReaders
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
SArray
*
arrayReader
,
uint64_t
queryId
,
uint64_t
taskId
);
SOperatorInfo
*
createTableMergeScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
S
Array
*
dataReaders
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableMergeScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
S
TableListInfo
*
pTableListInfo
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
,
uint64_t
queryId
,
uint64_t
taskId
);
void
copyUpdateDataBlock
(
SSDataBlock
*
pDest
,
SSDataBlock
*
pSource
,
int32_t
tsColIndex
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
8a8d3f16
...
...
@@ -4023,14 +4023,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return
pOperator
;
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
==
type
)
{
STableMergeScanPhysiNode
*
pTableScanNode
=
(
STableMergeScanPhysiNode
*
)
pPhyNode
;
SArray
*
dataReaders
=
taosArrayInit
(
8
,
POINTER_BYTES
);
createScanTableListInfo
(
pTableScanNode
,
pHandle
,
pTableListInfo
,
queryId
,
taskId
,
pTagCond
);
doCreateMultipleDataReaders
(
pTableScanNode
,
pHandle
,
pTableListInfo
,
dataReaders
,
queryId
,
taskId
);
extractTableSchemaVersion
(
pHandle
,
pTableScanNode
->
scan
.
uid
,
pTaskInfo
);
SOperatorInfo
*
pOperator
=
createTableMergeScanOperatorInfo
(
pTableScanNode
,
dataReaders
,
pHandle
,
pTaskInfo
);
SOperatorInfo
*
pOperator
=
createTableMergeScanOperatorInfo
(
pTableScanNode
,
pTableListInfo
,
pHandle
,
pTaskInfo
,
queryId
,
taskId
);
STableScanInfo
*
pScanInfo
=
pOperator
->
info
;
pTaskInfo
->
cost
.
pRecoder
=
&
pScanInfo
->
readRecorder
;
return
pOperator
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
8a8d3f16
...
...
@@ -1908,8 +1908,13 @@ _error:
}
typedef
struct
STableMergeScanInfo
{
SArray
*
dataReaders
;
// array of tsdbReaderT*
SReadHandle
readHandle
;
STableListInfo
*
tableListInfo
;
int32_t
tableStartIndex
;
int32_t
tableEndIndex
;
bool
hasGroupId
;
SArray
*
dataReaders
;
// array of tsdbReaderT*
SReadHandle
readHandle
;
int32_t
bufPageSize
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
...
...
@@ -1920,11 +1925,9 @@ typedef struct STableMergeScanInfo {
SSDataBlock
*
pSortInputBlock
;
int64_t
startTs
;
// sort start time
bool
hasGroupId
;
uint64_t
groupId
;
STupleHandle
*
prefetchedTuple
;
SArray
*
sortSourceParams
;
SArray
*
sortSourceParams
;
uint64_t
queryId
;
uint64_t
taskId
;
SFileBlockLoadRecorder
readRecorder
;
int64_t
numOfRows
;
...
...
@@ -1952,8 +1955,6 @@ typedef struct STableMergeScanInfo {
// window to check if current data block needs to be loaded.
SSampleExecInfo
sample
;
// sample execution info
int32_t
curTWinIdx
;
}
STableMergeScanInfo
;
int32_t
compareTableKeyInfoByGid
(
const
void
*
p1
,
const
void
*
p2
)
{
...
...
@@ -1963,8 +1964,7 @@ int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
}
int32_t
createScanTableListInfo
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
uint64_t
queryId
,
uint64_t
taskId
,
SNode
*
pTagCond
)
{
STableListInfo
*
pTableListInfo
,
uint64_t
queryId
,
uint64_t
taskId
,
SNode
*
pTagCond
)
{
int32_t
code
=
getTableList
(
pHandle
->
meta
,
&
pTableScanNode
->
scan
,
pTableListInfo
,
pTagCond
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -1984,11 +1984,10 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle
}
int32_t
doCreateMultipleDataReaders
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
SArray
*
arrayReader
,
uint64_t
queryId
,
uint64_t
taskId
)
{
STableListInfo
*
pTableListInfo
,
SArray
*
arrayReader
,
uint64_t
queryId
,
uint64_t
taskId
)
{
SQueryTableDataCond
cond
=
{
0
};
int32_t
code
=
initQueryTableDataCond
(
&
cond
,
pTableScanNode
);
int32_t
code
=
initQueryTableDataCond
(
&
cond
,
pTableScanNode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -2011,6 +2010,24 @@ _error:
return
code
;
}
int32_t
createMultipleDataReaders
(
SQueryTableDataCond
*
pQueryCond
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
int32_t
tableStartIdx
,
int32_t
tableEndIdx
,
SArray
*
arrayReader
,
uint64_t
queryId
,
uint64_t
taskId
)
{
for
(
int32_t
i
=
tableStartIdx
;
i
<=
tableEndIdx
;
++
i
)
{
STableListInfo
*
subListInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
subListInfo
));
subListInfo
->
pTableList
=
taosArrayInit
(
1
,
sizeof
(
STableKeyInfo
));
taosArrayPush
(
subListInfo
->
pTableList
,
taosArrayGet
(
pTableListInfo
->
pTableList
,
i
));
tsdbReaderT
*
pReader
=
tsdbReaderOpen
(
pHandle
->
vnode
,
pQueryCond
,
subListInfo
,
queryId
,
taskId
);
taosArrayPush
(
arrayReader
,
&
pReader
);
taosArrayDestroy
(
subListInfo
->
pTableList
);
taosMemoryFree
(
subListInfo
);
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
loadDataBlockFromOneTable
(
SOperatorInfo
*
pOperator
,
STableMergeScanInfo
*
pTableScanInfo
,
int32_t
readerIdx
,
SSDataBlock
*
pBlock
,
uint32_t
*
status
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
...
...
@@ -2190,22 +2207,35 @@ SArray* generateSortByTsInfo(int32_t order) {
return
pList
;
}
int32_t
doOpenTableMergeScanOperator
(
SOperatorInfo
*
pOperator
)
{
int32_t
startGroupTableMergeScan
(
SOperatorInfo
*
pOperator
)
{
STableMergeScanInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
if
(
OPTR_IS_OPENED
(
pOperator
))
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
tableStartIdx
=
pInfo
->
tableStartIndex
;
int32_t
tableEndIdx
=
pInfo
->
tableEndIndex
;
int32_t
numOfBufPage
=
pInfo
->
sortBufSize
/
pInfo
->
bufPageSize
;
STableListInfo
*
tableListInfo
=
pInfo
->
tableListInfo
;
createMultipleDataReaders
(
&
pInfo
->
cond
,
&
pInfo
->
readHandle
,
tableListInfo
,
tableStartIdx
,
tableEndIdx
,
pInfo
->
dataReaders
,
pInfo
->
queryId
,
pInfo
->
taskId
);
// todo the total available buffer should be determined by total capacity of buffer of this task.
// the additional one is reserved for merge result
pInfo
->
sortBufSize
=
pInfo
->
bufPageSize
*
(
tableEndIdx
-
tableStartIdx
+
1
+
1
);
int32_t
numOfBufPage
=
pInfo
->
sortBufSize
/
pInfo
->
bufPageSize
;
pInfo
->
pSortHandle
=
tsortCreateSortHandle
(
pInfo
->
pSortInfo
,
SORT_MULTISOURCE_MERGE
,
pInfo
->
bufPageSize
,
numOfBufPage
,
pInfo
->
pSortInputBlock
,
pTaskInfo
->
id
.
str
);
tsortSetFetchRawDataFp
(
pInfo
->
pSortHandle
,
getTableDataBlock
,
NULL
,
NULL
);
size_t
numReaders
=
taosArrayGetSize
(
pInfo
->
dataReaders
);
for
(
int32_t
i
=
0
;
i
<
numReaders
;
++
i
)
{
STableMergeScanSortSourceParam
param
=
{
0
};
param
.
readerIdx
=
i
;
param
.
pOperator
=
pOperator
;
param
.
inputBlock
=
createOneDataBlock
(
pInfo
->
pResBlock
,
false
);
taosArrayPush
(
pInfo
->
sortSourceParams
,
&
param
);
}
for
(
int32_t
i
=
0
;
i
<
numReaders
;
++
i
)
{
SSortSource
*
ps
=
taosMemoryCalloc
(
1
,
sizeof
(
SSortSource
));
STableMergeScanSortSourceParam
*
param
=
taosArrayGet
(
pInfo
->
sortSourceParams
,
i
);
...
...
@@ -2219,9 +2249,22 @@ int32_t doOpenTableMergeScanOperator(SOperatorInfo* pOperator) {
longjmp
(
pTaskInfo
->
env
,
terrno
);
}
pOperator
->
status
=
OP_RES_TO_RETURN
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
stopGroupTableMergeScan
(
SOperatorInfo
*
pOperator
)
{
STableMergeScanInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
tsortDestroySortHandle
(
pInfo
->
pSortHandle
);
taosArrayClear
(
pInfo
->
sortSourceParams
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
dataReaders
);
++
i
)
{
tsdbReaderT
*
reader
=
taosArrayGetP
(
pInfo
->
dataReaders
,
i
);
tsdbCleanupReadHandle
(
reader
);
}
taosArrayDestroy
(
pInfo
->
dataReaders
);
OPTR_SET_OPENED
(
pOperator
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -2264,12 +2307,18 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
if
(
!
pInfo
->
hasGroupId
)
{
pInfo
->
hasGroupId
=
true
;
pInfo
->
tableStartIndex
=
0
;
pInfo
->
tableEndIndex
=
taosArrayGetSize
(
pInfo
->
tableListInfo
->
pTableList
)
-
1
;
startGroupTableMergeScan
(
pOperator
);
}
SSDataBlock
*
pBlock
=
getSortedTableMergeScanBlockData
(
pInfo
->
pSortHandle
,
pOperator
->
resultInfo
.
capacity
,
pOperator
);
if
(
pBlock
!=
NULL
)
{
pOperator
->
resultInfo
.
totalRows
+=
pBlock
->
info
.
rows
;
}
else
{
stopGroupTableMergeScan
(
pOperator
);
doSetOperatorCompleted
(
pOperator
);
}
return
pBlock
;
...
...
@@ -2279,17 +2328,11 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
STableMergeScanInfo
*
pTableScanInfo
=
(
STableMergeScanInfo
*
)
param
;
cleanupQueryTableDataCond
(
&
pTableScanInfo
->
cond
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pTableScanInfo
->
dataReaders
);
++
i
)
{
tsdbReaderT
*
reader
=
taosArrayGetP
(
pTableScanInfo
->
dataReaders
,
i
);
tsdbCleanupReadHandle
(
reader
);
}
taosArrayDestroy
(
pTableScanInfo
->
dataReaders
);
if
(
pTableScanInfo
->
pColMatchInfo
!=
NULL
)
{
taosArrayDestroy
(
pTableScanInfo
->
pColMatchInfo
);
}
taosArrayDestroy
(
pTableScanInfo
->
sortSourceParams
);
pTableScanInfo
->
pResBlock
=
blockDataDestroy
(
pTableScanInfo
->
pResBlock
);
pTableScanInfo
->
pSortInputBlock
=
blockDataDestroy
(
pTableScanInfo
->
pSortInputBlock
);
...
...
@@ -2315,8 +2358,9 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
return
TSDB_CODE_SUCCESS
;
}
SOperatorInfo
*
createTableMergeScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
SArray
*
dataReaders
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
)
{
SOperatorInfo
*
createTableMergeScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
STableListInfo
*
pTableListInfo
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
,
uint64_t
queryId
,
uint64_t
taskId
)
{
STableMergeScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
STableMergeScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
@@ -2346,22 +2390,16 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo
->
sample
.
seed
=
taosGetTimestampSec
();
pInfo
->
dataBlockLoadFlag
=
pTableScanNode
->
dataRequired
;
pInfo
->
pFilterNode
=
pTableScanNode
->
scan
.
node
.
pConditions
;
pInfo
->
dataReaders
=
dataReaders
;
pInfo
->
tableListInfo
=
pTableListInfo
;
pInfo
->
scanFlag
=
MAIN_SCAN
;
pInfo
->
pColMatchInfo
=
pColList
;
pInfo
->
curTWinIdx
=
0
;
pInfo
->
pResBlock
=
createResDataBlock
(
pDescNode
);
pInfo
->
dataReaders
=
taosArrayInit
(
64
,
POINTER_BYTES
);
pInfo
->
queryId
=
queryId
;
pInfo
->
taskId
=
taskId
;
pInfo
->
sortSourceParams
=
taosArrayInit
(
taosArrayGetSize
(
dataReaders
),
sizeof
(
STableMergeScanSortSourceParam
));
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
dataReaders
);
++
i
)
{
STableMergeScanSortSourceParam
*
param
=
taosMemoryCalloc
(
1
,
sizeof
(
STableMergeScanSortSourceParam
));
param
->
readerIdx
=
i
;
param
->
pOperator
=
pOperator
;
param
->
inputBlock
=
createOneDataBlock
(
pInfo
->
pResBlock
,
false
);
taosArrayPush
(
pInfo
->
sortSourceParams
,
param
);
taosMemoryFree
(
param
);
}
pInfo
->
sortSourceParams
=
taosArrayInit
(
64
,
sizeof
(
STableMergeScanSortSourceParam
));
pInfo
->
pSortInfo
=
generateSortByTsInfo
(
pInfo
->
cond
.
order
);
pInfo
->
pSortInputBlock
=
createOneDataBlock
(
pInfo
->
pResBlock
,
false
);
...
...
@@ -2369,14 +2407,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
int32_t
rowSize
=
pInfo
->
pResBlock
->
info
.
rowSize
;
pInfo
->
bufPageSize
=
getProperSortPageSize
(
rowSize
);
// todo the total available buffer should be determined by total capacity of buffer of this task.
// the additional one is reserved for merge result
pInfo
->
sortBufSize
=
pInfo
->
bufPageSize
*
(
taosArrayGetSize
(
dataReaders
)
+
1
);
pInfo
->
hasGroupId
=
false
;
pInfo
->
prefetchedTuple
=
NULL
;
pOperator
->
name
=
"TableMergeScanOperator"
;
// TODO : change it
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
...
...
@@ -2386,7 +2417,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
initResultSizeInfo
(
pOperator
,
1024
);
pOperator
->
fpSet
=
createOperatorFpSet
(
doOpenTableMergeScanOperator
,
doTableMergeScan
,
NULL
,
NULL
,
destroyTableMergeScanOperatorInfo
,
createOperatorFpSet
(
operatorDummyOpenFn
,
doTableMergeScan
,
NULL
,
NULL
,
destroyTableMergeScanOperatorInfo
,
NULL
,
NULL
,
getTableMergeScanExplainExecInfo
);
pOperator
->
cost
.
openCost
=
0
;
return
pOperator
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录