Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c1f5f825
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c1f5f825
编写于
6月 22, 2022
作者:
S
shenglian zhou
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat: prepare for multiple group tsdb reads
上级
fadc11b6
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
75 addition
and
51 deletion
+75
-51
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+1
-0
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+1
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+7
-2
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+2
-2
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+11
-9
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+53
-37
未找到文件。
source/dnode/vnode/inc/vnode.h
浏览文件 @
c1f5f825
...
...
@@ -195,6 +195,7 @@ struct SVnodeCfg {
typedef
struct
{
TSKEY
lastKey
;
uint64_t
uid
;
uint64_t
groupId
;
}
STableKeyInfo
;
struct
SMetaEntry
{
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
c1f5f825
...
...
@@ -2845,7 +2845,7 @@ int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
break
;
}
STableKeyInfo
info
=
{.
lastKey
=
TSKEY_INITIAL_VAL
,
uid
=
id
};
STableKeyInfo
info
=
{.
lastKey
=
TSKEY_INITIAL_VAL
,
uid
=
id
,
.
groupId
=
0
};
taosArrayPush
(
list
,
&
info
);
}
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
c1f5f825
...
...
@@ -834,14 +834,19 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
SqlFunctionCtx
*
pCtx
,
SExprInfo
*
pExprInfo
,
int32_t
numOfExprs
,
const
int32_t
*
rowCellOffset
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
int32_t
createMultipleDataReaders
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
int32_t
createScanTableListInfo
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
uint64_t
queryId
,
uint64_t
taskId
,
SNode
*
pTagCond
);
int32_t
doCreateMultipleDataReaders
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
SArray
*
arrayReader
,
uint64_t
queryId
,
uint64_t
taskId
,
SNode
*
pTagCond
);
uint64_t
taskId
);
SOperatorInfo
*
createTableMergeScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
SArray
*
dataReaders
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
);
void
copyUpdateDataBlock
(
SSDataBlock
*
pDest
,
SSDataBlock
*
pSource
,
int32_t
tsColIndex
);
int32_t
generateGroupIdMap
(
STableListInfo
*
pTableListInfo
,
SReadHandle
*
pHandle
,
SArray
*
groupKey
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/executor/src/executil.c
浏览文件 @
c1f5f825
...
...
@@ -239,7 +239,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
}
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
res
);
i
++
)
{
STableKeyInfo
info
=
{.
lastKey
=
TSKEY_INITIAL_VAL
,
.
uid
=
*
(
uint64_t
*
)
taosArrayGet
(
res
,
i
)};
STableKeyInfo
info
=
{.
lastKey
=
TSKEY_INITIAL_VAL
,
.
uid
=
*
(
uint64_t
*
)
taosArrayGet
(
res
,
i
)
,
.
groupId
=
0
};
taosArrayPush
(
pListInfo
->
pTableList
,
&
info
);
}
taosArrayDestroy
(
res
);
...
...
@@ -247,7 +247,7 @@ int32_t getTableList(void* metaHandle, SScanPhysiNode* pScanNode, STableListInfo
code
=
tsdbGetAllTableList
(
metaHandle
,
tableUid
,
pListInfo
->
pTableList
);
}
}
else
{
// Create one table group.
STableKeyInfo
info
=
{.
lastKey
=
0
,
.
uid
=
tableUid
};
STableKeyInfo
info
=
{.
lastKey
=
0
,
.
uid
=
tableUid
,
.
groupId
=
0
};
taosArrayPush
(
pListInfo
->
pTableList
,
&
info
);
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
c1f5f825
...
...
@@ -3968,14 +3968,16 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
}
}
}
int32_t
len
=
(
int32_t
)(
pStart
-
(
char
*
)
keyBuf
);
uint64_t
*
groupId
=
taosHashGet
(
pTableListInfo
->
map
,
keyBuf
,
len
);
if
(
groupId
)
{
taosHashPut
(
pTableListInfo
->
map
,
&
(
info
->
uid
),
sizeof
(
uint64_t
),
groupId
,
sizeof
(
uint64_t
));
}
else
{
uint64_t
*
pGroupId
=
taosHashGet
(
pTableListInfo
->
map
,
keyBuf
,
len
);
if
(
!
pGroupId
)
{
uint64_t
tmpId
=
calcGroupId
(
keyBuf
,
len
);
info
->
groupId
=
tmpId
;
taosHashPut
(
pTableListInfo
->
map
,
&
(
info
->
uid
),
sizeof
(
uint64_t
),
&
tmpId
,
sizeof
(
uint64_t
));
}
else
{
info
->
groupId
=
*
pGroupId
;
}
metaReaderClear
(
&
mr
);
...
...
@@ -4023,11 +4025,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STableMergeScanPhysiNode
*
pTableScanNode
=
(
STableMergeScanPhysiNode
*
)
pPhyNode
;
SArray
*
dataReaders
=
taosArrayInit
(
8
,
POINTER_BYTES
);
createMultipleDataReaders
(
pTableScanNode
,
pHandle
,
pTableListInfo
,
dataReaders
,
queryId
,
taskId
,
pTagCond
);
createScanTableListInfo
(
pTableScanNode
,
pHandle
,
pTableListInfo
,
queryId
,
taskId
,
pTagCond
);
doCreateMultipleDataReaders
(
pTableScanNode
,
pHandle
,
pTableListInfo
,
dataReaders
,
queryId
,
taskId
);
extractTableSchemaVersion
(
pHandle
,
pTableScanNode
->
scan
.
uid
,
pTaskInfo
);
SArray
*
groupKeys
=
extractPartitionColInfo
(
pTableScanNode
->
pPartitionTags
);
generateGroupIdMap
(
pTableListInfo
,
pHandle
,
groupKeys
);
// todo for json
taosArrayDestroy
(
groupKeys
);
SOperatorInfo
*
pOperator
=
createTableMergeScanOperatorInfo
(
pTableScanNode
,
dataReaders
,
pHandle
,
pTaskInfo
);
STableScanInfo
*
pScanInfo
=
pOperator
->
info
;
pTaskInfo
->
cost
.
pRecoder
=
&
pScanInfo
->
readRecorder
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
c1f5f825
...
...
@@ -591,12 +591,12 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
pInfo
->
dataReader
=
pReadHandle
;
// pInfo->prevGroupId = -1;
pOperator
->
name
=
"TableSeqScanOperator"
;
pOperator
->
name
=
"TableSeqScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTableScanImpl
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
);
return
pOperator
;
...
...
@@ -610,7 +610,7 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid) {
metaGetTableEntryByUid
(
&
mr
,
uid
);
if
(
mr
.
me
.
type
==
TSDB_SUPER_TABLE
)
{
int32_t
numOfCols
=
mr
.
me
.
stbEntry
.
schemaRow
.
nCols
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
rowLen
+=
mr
.
me
.
stbEntry
.
schemaRow
.
pSchema
[
i
].
bytes
;
}
}
else
if
(
mr
.
me
.
type
==
TSDB_CHILD_TABLE
)
{
...
...
@@ -618,12 +618,12 @@ static int32_t doGetTableRowSize(void* pMeta, uint64_t uid) {
metaGetTableEntryByUid
(
&
mr
,
suid
);
int32_t
numOfCols
=
mr
.
me
.
stbEntry
.
schemaRow
.
nCols
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
rowLen
+=
mr
.
me
.
stbEntry
.
schemaRow
.
pSchema
[
i
].
bytes
;
}
}
else
if
(
mr
.
me
.
type
==
TSDB_NORMAL_TABLE
)
{
int32_t
numOfCols
=
mr
.
me
.
ntbEntry
.
schemaRow
.
nCols
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
rowLen
+=
mr
.
me
.
ntbEntry
.
schemaRow
.
pSchema
[
i
].
bytes
;
}
}
...
...
@@ -647,7 +647,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
SSDataBlock
*
pBlock
=
pBlockScanInfo
->
pResBlock
;
int32_t
slotId
=
pOperator
->
exprSupp
.
pExprInfo
->
base
.
resSchema
.
slotId
;
int32_t
slotId
=
pOperator
->
exprSupp
.
pExprInfo
->
base
.
resSchema
.
slotId
;
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
int32_t
len
=
tSerializeBlockDistInfo
(
NULL
,
0
,
&
blockDistInfo
);
...
...
@@ -679,23 +679,23 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re
goto
_error
;
}
pInfo
->
pHandle
=
dataReader
;
pInfo
->
pHandle
=
dataReader
;
pInfo
->
readHandle
=
*
readHandle
;
pInfo
->
uid
=
uid
;
pInfo
->
pResBlock
=
createResDataBlock
(
pBlockScanNode
->
node
.
pOutputDataBlockDesc
);
pInfo
->
uid
=
uid
;
pInfo
->
pResBlock
=
createResDataBlock
(
pBlockScanNode
->
node
.
pOutputDataBlockDesc
);
int32_t
numOfCols
=
0
;
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pBlockScanNode
->
pScanPseudoCols
,
NULL
,
&
numOfCols
);
int32_t
code
=
initExprSupp
(
&
pOperator
->
exprSupp
,
pExprInfo
,
numOfCols
);
int32_t
code
=
initExprSupp
(
&
pOperator
->
exprSupp
,
pExprInfo
,
numOfCols
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
pOperator
->
name
=
"DataBlockDistScanOperator"
;
pOperator
->
name
=
"DataBlockDistScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doBlockInfoScan
,
NULL
,
NULL
,
...
...
@@ -1872,26 +1872,25 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
SExprInfo
*
pExprInfo
=
createExprInfo
(
pPhyNode
->
pScanPseudoCols
,
NULL
,
&
numOfExprs
);
SArray
*
colList
=
extractColMatchInfo
(
pPhyNode
->
pScanPseudoCols
,
pDescNode
,
&
num
,
COL_MATCH_FROM_COL_ID
);
int32_t
code
=
initExprSupp
(
&
pOperator
->
exprSupp
,
pExprInfo
,
numOfExprs
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
pInfo
->
pTableList
=
pTableListInfo
;
pInfo
->
pColMatchInfo
=
colList
;
pInfo
->
pRes
=
createResDataBlock
(
pDescNode
);
pInfo
->
readHandle
=
*
pReadHandle
;
pInfo
->
curPos
=
0
;
pInfo
->
pFilterNode
=
pPhyNode
->
node
.
pConditions
;
pInfo
->
pTableList
=
pTableListInfo
;
pInfo
->
pColMatchInfo
=
colList
;
pInfo
->
pRes
=
createResDataBlock
(
pDescNode
);
pInfo
->
readHandle
=
*
pReadHandle
;
pInfo
->
curPos
=
0
;
pInfo
->
pFilterNode
=
pPhyNode
->
node
.
pConditions
;
pOperator
->
name
=
"TagScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
initResultSizeInfo
(
pOperator
,
4096
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
...
...
@@ -1957,25 +1956,42 @@ typedef struct STableMergeScanInfo {
}
STableMergeScanInfo
;
int32_t
createMultipleDataReaders
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
SArray
*
arrayReader
,
uint64_t
queryId
,
uint64_t
taskId
,
SNode
*
pTagCond
)
{
int32_t
compareTableKeyInfoByGid
(
const
void
*
p1
,
const
void
*
p2
)
{
const
STableKeyInfo
*
info1
=
p1
;
const
STableKeyInfo
*
info2
=
p2
;
return
info1
->
groupId
-
info2
->
groupId
;
}
int32_t
createScanTableListInfo
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
uint64_t
queryId
,
uint64_t
taskId
,
SNode
*
pTagCond
)
{
int32_t
code
=
getTableList
(
pHandle
->
meta
,
&
pTableScanNode
->
scan
,
pTableListInfo
,
pTagCond
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
return
code
;
}
if
(
taosArrayGetSize
(
pTableListInfo
->
pTableList
)
==
0
)
{
qDebug
(
"no table qualified for query, TID:0x%"
PRIx64
", QID:0x%"
PRIx64
,
taskId
,
queryId
);
goto
_error
;
return
TSDB_CODE_SUCCESS
;
}
SArray
*
groupKeys
=
extractPartitionColInfo
(
pTableScanNode
->
pPartitionTags
);
generateGroupIdMap
(
pTableListInfo
,
pHandle
,
groupKeys
);
// todo for json
if
(
groupKeys
)
{
taosArraySort
(
pTableListInfo
->
pTableList
,
compareTableKeyInfoByGid
);
}
taosArrayDestroy
(
groupKeys
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
doCreateMultipleDataReaders
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
SArray
*
arrayReader
,
uint64_t
queryId
,
uint64_t
taskId
)
{
SQueryTableDataCond
cond
=
{
0
};
code
=
initQueryTableDataCond
(
&
cond
,
pTableScanNode
);
int32_t
code
=
initQueryTableDataCond
(
&
cond
,
pTableScanNode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
// TODO: free the sublist info and the table list in it
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pTableListInfo
->
pTableList
);
++
i
)
{
STableListInfo
*
subListInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
subListInfo
));
subListInfo
->
pTableList
=
taosArrayInit
(
1
,
sizeof
(
STableKeyInfo
));
...
...
@@ -1989,7 +2005,7 @@ int32_t createMultipleDataReaders(STableScanPhysiNode* pTableScanNode, SReadHand
}
cleanupQueryTableDataCond
(
&
cond
);
return
0
;
return
TSDB_CODE_SUCCESS
;
_error:
return
code
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录