Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
61634070
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
61634070
编写于
10月 28, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
other: merge 3.0
上级
307b7e0a
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
380 addition
and
30 deletion
+380
-30
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+0
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+380
-29
未找到文件。
source/dnode/vnode/inc/vnode.h
浏览文件 @
61634070
...
...
@@ -165,7 +165,6 @@ SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdLi
int32_t
tsdbReaderReset
(
STsdbReader
*
pReader
,
SQueryTableDataCond
*
pCond
);
int32_t
tsdbGetFileBlocksDistInfo
(
STsdbReader
*
pReader
,
STableBlockDistInfo
*
pTableBlockInfo
);
int64_t
tsdbGetNumOfRowsInMemTable
(
STsdbReader
*
pHandle
);
bool
tsdbIsAscendingOrder
(
STsdbReader
*
pReader
);
void
*
tsdbGetIdx
(
SMeta
*
pMeta
);
void
*
tsdbGetIvtIdx
(
SMeta
*
pMeta
);
uint64_t
getReaderMaxVersion
(
STsdbReader
*
pReader
);
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
61634070
...
...
@@ -42,7 +42,7 @@ static int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const S
static
char
*
SYSTABLE_IDX_COLUMN
[]
=
{
"table_name"
,
"db_name"
,
"create_time"
,
"columns"
,
"ttl"
,
"stable_name"
,
"vgroup_id', 'uid"
,
"type"
};
static
char
*
SYSTABLE_
IDX_EXCEPT
[]
=
{
"db_name"
,
"vgroup_id"
};
static
char
*
SYSTABLE_
SPECIAL_COL
[]
=
{
"db_name"
,
"vgroup_id"
};
typedef
int32_t
(
*
__sys_filte
)(
void
*
pMeta
,
SNode
*
cond
,
SArray
*
result
);
typedef
int32_t
(
*
__sys_check
)(
SNode
*
cond
);
...
...
@@ -3267,15 +3267,79 @@ static int tableUidCompare(const void* a, const void* b) {
}
return
u1
<
u2
?
-
1
:
1
;
}
typedef
struct
MergeIndex
{
int
idx
;
int
len
;
}
MergeIndex
;
static
FORCE_INLINE
int
optSysBinarySearch
(
SArray
*
arr
,
int
s
,
int
e
,
uint64_t
k
)
{
uint64_t
v
;
int32_t
m
;
while
(
s
<=
e
)
{
m
=
s
+
(
e
-
s
)
/
2
;
v
=
*
(
uint64_t
*
)
taosArrayGet
(
arr
,
m
);
if
(
v
>=
k
)
{
e
=
m
-
1
;
}
else
{
s
=
m
+
1
;
}
}
return
s
;
}
void
optSysIntersection
(
SArray
*
in
,
SArray
*
out
)
{
int32_t
sz
=
(
int32_t
)
taosArrayGetSize
(
in
);
if
(
sz
<=
0
)
{
return
;
}
MergeIndex
*
mi
=
taosMemoryCalloc
(
sz
,
sizeof
(
MergeIndex
));
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
SArray
*
t
=
taosArrayGetP
(
in
,
i
);
mi
[
i
].
len
=
(
int32_t
)
taosArrayGetSize
(
t
);
mi
[
i
].
idx
=
0
;
}
SArray
*
base
=
taosArrayGetP
(
in
,
0
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
base
);
i
++
)
{
uint64_t
tgt
=
*
(
uint64_t
*
)
taosArrayGet
(
base
,
i
);
bool
has
=
true
;
for
(
int
j
=
1
;
j
<
taosArrayGetSize
(
in
);
j
++
)
{
SArray
*
oth
=
taosArrayGetP
(
in
,
j
);
int
mid
=
optSysBinarySearch
(
oth
,
mi
[
j
].
idx
,
mi
[
j
].
len
-
1
,
tgt
);
if
(
mid
>=
0
&&
mid
<
mi
[
j
].
len
)
{
uint64_t
val
=
*
(
uint64_t
*
)
taosArrayGet
(
oth
,
mid
);
has
=
(
val
==
tgt
?
true
:
false
);
mi
[
j
].
idx
=
mid
;
}
else
{
has
=
false
;
}
}
if
(
has
==
true
)
{
taosArrayPush
(
out
,
&
tgt
);
}
}
taosMemoryFreeClear
(
mi
);
}
static
int32_t
optSysMergeRslt
(
SArray
*
mRslt
,
SArray
*
rslt
)
{
// TODO, find comm mem from mRslt
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
mRslt
);
i
++
)
{
SArray
*
a
R
slt
=
taosArrayGetP
(
mRslt
,
i
);
taosArray
AddAll
(
rslt
,
aRslt
);
SArray
*
a
r
slt
=
taosArrayGetP
(
mRslt
,
i
);
taosArray
Sort
(
arslt
,
tableUidCompare
);
}
taosArraySort
(
rslt
,
tableUidCompare
);
taosArrayRemoveDuplicate
(
rslt
,
tableUidCompare
,
NULL
);
optSysIntersection
(
mRslt
,
rslt
);
return
0
;
}
static
int32_t
optSysSpecialColumn
(
SNode
*
cond
)
{
SOperatorNode
*
pOper
=
(
SOperatorNode
*
)
cond
;
SColumnNode
*
pCol
=
(
SColumnNode
*
)
pOper
->
pLeft
;
for
(
int
i
=
0
;
i
<
sizeof
(
SYSTABLE_SPECIAL_COL
)
/
sizeof
(
SYSTABLE_SPECIAL_COL
[
0
]);
i
++
)
{
if
(
0
==
strcmp
(
pCol
->
colName
,
SYSTABLE_SPECIAL_COL
[
i
]))
{
return
1
;
}
}
return
0
;
}
...
...
@@ -3302,7 +3366,6 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) {
SNodeList
*
pList
=
(
SNodeList
*
)
pNode
->
pParameterList
;
int32_t
len
=
LIST_LENGTH
(
pList
);
if
(
len
<=
0
)
return
ret
;
bool
hasIdx
=
false
;
bool
hasRslt
=
true
;
...
...
@@ -3318,7 +3381,12 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) {
if
(
ret
==
0
)
{
// has index
hasIdx
=
true
;
taosArrayPush
(
mRslt
,
&
aRslt
);
if
(
optSysSpecialColumn
(
cell
->
pNode
)
==
0
)
{
taosArrayPush
(
mRslt
,
&
aRslt
);
}
else
{
// db_name/vgroup not result
taosArrayDestroy
(
aRslt
);
}
}
else
if
(
ret
==
-
2
)
{
// current vg
hasIdx
=
true
;
...
...
@@ -3389,7 +3457,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
pInfo
->
readHandle
.
meta
,
0
);
int32_t
ret
=
metaGetTableEntryByUid
(
&
mr
,
*
uid
);
ret
=
metaGetTableEntryByUid
(
&
mr
,
*
uid
);
if
(
ret
<
0
)
{
metaReaderClear
(
&
mr
);
continue
;
...
...
@@ -4187,6 +4255,130 @@ int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle*
return
TSDB_CODE_SUCCESS
;
}
int32_t
createMultipleDataReaders2
(
SQueryTableDataCond
*
pQueryCond
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
int32_t
tableStartIdx
,
int32_t
tableEndIdx
,
STsdbReader
**
ppReader
,
const
char
*
idstr
)
{
STsdbReader
*
pReader
=
NULL
;
void
*
pStart
=
taosArrayGet
(
pTableListInfo
->
pTableList
,
tableStartIdx
);
int32_t
num
=
tableEndIdx
-
tableStartIdx
+
1
;
int32_t
code
=
tsdbReaderOpen
(
pHandle
->
vnode
,
pQueryCond
,
pStart
,
num
,
&
pReader
,
idstr
);
if
(
code
!=
0
)
{
return
code
;
}
*
ppReader
=
pReader
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
loadDataBlockFromOneTable2
(
SOperatorInfo
*
pOperator
,
STableMergeScanInfo
*
pTableScanInfo
,
SSDataBlock
*
pBlock
,
uint32_t
*
status
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
STableMergeScanInfo
*
pInfo
=
pOperator
->
info
;
uint64_t
uid
=
pBlock
->
info
.
uid
;
SFileBlockLoadRecorder
*
pCost
=
&
pTableScanInfo
->
readRecorder
;
pCost
->
totalBlocks
+=
1
;
pCost
->
totalRows
+=
pBlock
->
info
.
rows
;
*
status
=
pInfo
->
dataBlockLoadFlag
;
if
(
pTableScanInfo
->
pFilterNode
!=
NULL
||
overlapWithTimeWindow
(
&
pTableScanInfo
->
interval
,
&
pBlock
->
info
,
pTableScanInfo
->
cond
.
order
))
{
(
*
status
)
=
FUNC_DATA_REQUIRED_DATA_LOAD
;
}
SDataBlockInfo
*
pBlockInfo
=
&
pBlock
->
info
;
taosMemoryFreeClear
(
pBlock
->
pBlockAgg
);
if
(
*
status
==
FUNC_DATA_REQUIRED_FILTEROUT
)
{
qDebug
(
"%s data block filter out, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
pCost
->
filterOutBlocks
+=
1
;
return
TSDB_CODE_SUCCESS
;
}
else
if
(
*
status
==
FUNC_DATA_REQUIRED_NOT_LOAD
)
{
qDebug
(
"%s data block skipped, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
pCost
->
skipBlocks
+=
1
;
// clear all data in pBlock that are set when handing the previous block
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pBlock
->
pDataBlock
);
++
i
)
{
SColumnInfoData
*
pcol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
pcol
->
pData
=
NULL
;
}
return
TSDB_CODE_SUCCESS
;
}
else
if
(
*
status
==
FUNC_DATA_REQUIRED_STATIS_LOAD
)
{
pCost
->
loadBlockStatis
+=
1
;
bool
allColumnsHaveAgg
=
true
;
SColumnDataAgg
**
pColAgg
=
NULL
;
STsdbReader
*
reader
=
pTableScanInfo
->
pReader
;
tsdbRetrieveDatablockSMA
(
reader
,
&
pColAgg
,
&
allColumnsHaveAgg
);
if
(
allColumnsHaveAgg
==
true
)
{
int32_t
numOfCols
=
taosArrayGetSize
(
pBlock
->
pDataBlock
);
// todo create this buffer during creating operator
if
(
pBlock
->
pBlockAgg
==
NULL
)
{
pBlock
->
pBlockAgg
=
taosMemoryCalloc
(
numOfCols
,
POINTER_BYTES
);
}
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColMatchItem
*
pColMatchInfo
=
taosArrayGet
(
pTableScanInfo
->
matchInfo
.
pList
,
i
);
if
(
!
pColMatchInfo
->
needOutput
)
{
continue
;
}
pBlock
->
pBlockAgg
[
pColMatchInfo
->
dstSlotId
]
=
pColAgg
[
i
];
}
return
TSDB_CODE_SUCCESS
;
}
else
{
// failed to load the block sma data, data block statistics does not exist, load data block instead
*
status
=
FUNC_DATA_REQUIRED_DATA_LOAD
;
}
}
ASSERT
(
*
status
==
FUNC_DATA_REQUIRED_DATA_LOAD
);
pCost
->
totalCheckedRows
+=
pBlock
->
info
.
rows
;
pCost
->
loadBlocks
+=
1
;
STsdbReader
*
reader
=
pTableScanInfo
->
pReader
;
SArray
*
pCols
=
tsdbRetrieveDataBlock
(
reader
,
NULL
);
if
(
pCols
==
NULL
)
{
return
terrno
;
}
relocateColumnData
(
pBlock
,
pTableScanInfo
->
matchInfo
.
pList
,
pCols
,
true
);
// currently only the tbname pseudo column
if
(
pTableScanInfo
->
pseudoSup
.
numOfExprs
>
0
)
{
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pTableScanInfo
->
pseudoSup
.
pExprInfo
,
pTableScanInfo
->
pseudoSup
.
numOfExprs
,
pBlock
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
}
if
(
pTableScanInfo
->
pFilterNode
!=
NULL
)
{
int64_t
st
=
taosGetTimestampMs
();
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
,
&
pTableScanInfo
->
matchInfo
,
NULL
);
double
el
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
pTableScanInfo
->
readRecorder
.
filterTime
+=
el
;
if
(
pBlock
->
info
.
rows
==
0
)
{
pCost
->
filterOutBlocks
+=
1
;
qDebug
(
"%s data block filter out, brange:%"
PRId64
"-%"
PRId64
", rows:%d, elapsed time:%.2f ms"
,
GET_TASKID
(
pTaskInfo
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
,
el
);
}
else
{
qDebug
(
"%s data block filter applied, elapsed time:%.2f ms"
,
GET_TASKID
(
pTaskInfo
),
el
);
}
}
return
TSDB_CODE_SUCCESS
;
}
// todo refactor
static
int32_t
loadDataBlockFromOneTable
(
SOperatorInfo
*
pOperator
,
STableMergeScanInfo
*
pTableScanInfo
,
int32_t
readerIdx
,
SSDataBlock
*
pBlock
,
uint32_t
*
status
)
{
...
...
@@ -4229,8 +4421,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
bool
allColumnsHaveAgg
=
true
;
SColumnDataAgg
**
pColAgg
=
NULL
;
STsdbReader
*
reader
=
taosArrayGetP
(
pTableScanInfo
->
dataReaders
,
readerIdx
);
tsdbRetrieveDatablockSMA
(
reader
,
&
pColAgg
,
&
allColumnsHaveAgg
);
// STsdbReader* reader = pTableScanInfo->pReader; // taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
if
(
allColumnsHaveAgg
==
true
)
{
int32_t
numOfCols
=
taosArrayGetSize
(
pBlock
->
pDataBlock
);
...
...
@@ -4309,9 +4500,141 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
typedef
struct
STableMergeScanSortSourceParam
{
SOperatorInfo
*
pOperator
;
int32_t
readerIdx
;
uint64_t
uid
;
SSDataBlock
*
inputBlock
;
}
STableMergeScanSortSourceParam
;
static
SSDataBlock
*
getTableDataBlockTemp
(
void
*
param
)
{
STableMergeScanSortSourceParam
*
source
=
param
;
SOperatorInfo
*
pOperator
=
source
->
pOperator
;
STableMergeScanInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
int32_t
readIdx
=
source
->
readerIdx
;
SSDataBlock
*
pBlock
=
source
->
inputBlock
;
STableMergeScanInfo
*
pTableScanInfo
=
pOperator
->
info
;
SQueryTableDataCond
*
pQueryCond
=
taosArrayGet
(
pTableScanInfo
->
queryConds
,
readIdx
);
blockDataCleanup
(
pBlock
);
int64_t
st
=
taosGetTimestampUs
();
void
*
p
=
taosArrayGet
(
pInfo
->
tableListInfo
->
pTableList
,
readIdx
+
pInfo
->
tableStartIndex
);
SReadHandle
*
pHandle
=
&
pInfo
->
readHandle
;
tsdbReaderOpen
(
pHandle
->
vnode
,
pQueryCond
,
p
,
1
,
&
pInfo
->
pReader
,
GET_TASKID
(
pTaskInfo
));
STsdbReader
*
reader
=
pInfo
->
pReader
;
while
(
tsdbNextDataBlock
(
reader
))
{
if
(
isTaskKilled
(
pOperator
->
pTaskInfo
))
{
T_LONG_JMP
(
pOperator
->
pTaskInfo
->
env
,
TSDB_CODE_TSC_QUERY_CANCELLED
);
}
// process this data block based on the probabilities
bool
processThisBlock
=
processBlockWithProbability
(
&
pTableScanInfo
->
sample
);
if
(
!
processThisBlock
)
{
continue
;
}
blockDataCleanup
(
pBlock
);
SDataBlockInfo
binfo
=
pBlock
->
info
;
tsdbRetrieveDataBlockInfo
(
reader
,
&
binfo
);
blockDataEnsureCapacity
(
pBlock
,
binfo
.
rows
);
pBlock
->
info
.
type
=
binfo
.
type
;
pBlock
->
info
.
uid
=
binfo
.
uid
;
pBlock
->
info
.
window
=
binfo
.
window
;
pBlock
->
info
.
rows
=
binfo
.
rows
;
if
(
pQueryCond
->
order
==
TSDB_ORDER_ASC
)
{
pQueryCond
->
twindows
.
skey
=
pBlock
->
info
.
window
.
ekey
+
1
;
}
else
{
pQueryCond
->
twindows
.
ekey
=
pBlock
->
info
.
window
.
skey
-
1
;
}
uint32_t
status
=
0
;
int32_t
code
=
loadDataBlockFromOneTable
(
pOperator
,
pTableScanInfo
,
readIdx
,
pBlock
,
&
status
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pOperator
->
pTaskInfo
->
env
,
code
);
}
// current block is filter out according to filter condition, continue load the next block
if
(
status
==
FUNC_DATA_REQUIRED_FILTEROUT
||
pBlock
->
info
.
rows
==
0
)
{
continue
;
}
uint64_t
*
groupId
=
taosHashGet
(
pOperator
->
pTaskInfo
->
tableqinfoList
.
map
,
&
pBlock
->
info
.
uid
,
sizeof
(
int64_t
));
if
(
groupId
)
{
pBlock
->
info
.
groupId
=
*
groupId
;
}
pOperator
->
resultInfo
.
totalRows
+=
pBlock
->
info
.
rows
;
// pTableScanInfo->readRecorder.totalRows;
pTableScanInfo
->
readRecorder
.
elapsedTime
+=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
tsdbReaderClose
(
pInfo
->
pReader
);
pInfo
->
pReader
=
NULL
;
return
pBlock
;
}
tsdbReaderClose
(
pInfo
->
pReader
);
pInfo
->
pReader
=
NULL
;
return
NULL
;
}
static
SSDataBlock
*
getTableDataBlock2
(
void
*
param
)
{
STableMergeScanSortSourceParam
*
source
=
param
;
SOperatorInfo
*
pOperator
=
source
->
pOperator
;
int64_t
uid
=
source
->
uid
;
SSDataBlock
*
pBlock
=
source
->
inputBlock
;
STableMergeScanInfo
*
pTableScanInfo
=
pOperator
->
info
;
int64_t
st
=
taosGetTimestampUs
();
blockDataCleanup
(
pBlock
);
STsdbReader
*
reader
=
pTableScanInfo
->
pReader
;
while
(
tsdbTableNextDataBlock
(
reader
,
uid
))
{
if
(
isTaskKilled
(
pOperator
->
pTaskInfo
))
{
T_LONG_JMP
(
pOperator
->
pTaskInfo
->
env
,
TSDB_CODE_TSC_QUERY_CANCELLED
);
}
// process this data block based on the probabilities
bool
processThisBlock
=
processBlockWithProbability
(
&
pTableScanInfo
->
sample
);
if
(
!
processThisBlock
)
{
continue
;
}
blockDataCleanup
(
pBlock
);
SDataBlockInfo
binfo
=
pBlock
->
info
;
tsdbRetrieveDataBlockInfo
(
reader
,
&
binfo
);
blockDataEnsureCapacity
(
pBlock
,
binfo
.
rows
);
pBlock
->
info
.
type
=
binfo
.
type
;
pBlock
->
info
.
uid
=
binfo
.
uid
;
pBlock
->
info
.
window
=
binfo
.
window
;
pBlock
->
info
.
rows
=
binfo
.
rows
;
uint32_t
status
=
0
;
int32_t
code
=
loadDataBlockFromOneTable2
(
pOperator
,
pTableScanInfo
,
pBlock
,
&
status
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pOperator
->
pTaskInfo
->
env
,
code
);
}
// current block is filter out according to filter condition, continue load the next block
if
(
status
==
FUNC_DATA_REQUIRED_FILTEROUT
||
pBlock
->
info
.
rows
==
0
)
{
continue
;
}
uint64_t
*
groupId
=
taosHashGet
(
pOperator
->
pTaskInfo
->
tableqinfoList
.
map
,
&
pBlock
->
info
.
uid
,
sizeof
(
int64_t
));
if
(
groupId
)
{
pBlock
->
info
.
groupId
=
*
groupId
;
}
pOperator
->
resultInfo
.
totalRows
=
pTableScanInfo
->
readRecorder
.
totalRows
;
pTableScanInfo
->
readRecorder
.
elapsedTime
+=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
return
pBlock
;
}
return
NULL
;
}
static
SSDataBlock
*
getTableDataBlock
(
void
*
param
)
{
STableMergeScanSortSourceParam
*
source
=
param
;
SOperatorInfo
*
pOperator
=
source
->
pOperator
;
...
...
@@ -4390,6 +4713,15 @@ SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
return
pList
;
}
int32_t
dumpSQueryTableCond
(
const
SQueryTableDataCond
*
src
,
SQueryTableDataCond
*
dst
)
{
memcpy
((
void
*
)
dst
,
(
void
*
)
src
,
sizeof
(
SQueryTableDataCond
));
dst
->
colList
=
taosMemoryCalloc
(
src
->
numOfCols
,
sizeof
(
SColumnInfo
));
for
(
int
i
=
0
;
i
<
src
->
numOfCols
;
i
++
)
{
dst
->
colList
[
i
]
=
src
->
colList
[
i
];
}
return
0
;
}
int32_t
startGroupTableMergeScan
(
SOperatorInfo
*
pOperator
)
{
STableMergeScanInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
...
...
@@ -4409,10 +4741,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
int32_t
tableStartIdx
=
pInfo
->
tableStartIndex
;
int32_t
tableEndIdx
=
pInfo
->
tableEndIndex
;
STableListInfo
*
tableListInfo
=
pInfo
->
tableListInfo
;
pInfo
->
dataReaders
=
taosArrayInit
(
64
,
POINTER_BYTES
);
createMultipleDataReaders
(
&
pInfo
->
cond
,
&
pInfo
->
readHandle
,
tableListInfo
,
tableStartIdx
,
tableEndIdx
,
pInfo
->
dataReaders
,
GET_TASKID
(
pTaskInfo
));
pInfo
->
pReader
=
NULL
;
// todo the total available buffer should be determined by total capacity of buffer of this task.
// the additional one is reserved for merge result
...
...
@@ -4421,18 +4750,28 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo
->
pSortHandle
=
tsortCreateSortHandle
(
pInfo
->
pSortInfo
,
SORT_MULTISOURCE_MERGE
,
pInfo
->
bufPageSize
,
numOfBufPage
,
pInfo
->
pSortInputBlock
,
pTaskInfo
->
id
.
str
);
tsortSetFetchRawDataFp
(
pInfo
->
pSortHandle
,
getTableDataBlock
,
NULL
,
NULL
);
tsortSetFetchRawDataFp
(
pInfo
->
pSortHandle
,
getTableDataBlockTemp
,
NULL
,
NULL
);
// one table has one data block
int32_t
numOfTable
=
tableEndIdx
-
tableStartIdx
+
1
;
pInfo
->
queryConds
=
taosArrayInit
(
numOfTable
,
sizeof
(
SQueryTableDataCond
));
for
(
int32_t
i
=
0
;
i
<
numOfTable
;
++
i
)
{
STableKeyInfo
*
tableKeyInfo
=
taosArrayGet
(
pInfo
->
tableListInfo
->
pTableList
,
i
+
tableStartIdx
);
size_t
numReaders
=
taosArrayGetSize
(
pInfo
->
dataReaders
);
size_t
numReaders
=
taosArrayGetSize
(
pInfo
->
dataReaders
);
for
(
int32_t
i
=
0
;
i
<
numReaders
;
++
i
)
{
STableMergeScanSortSourceParam
param
=
{
0
};
param
.
readerIdx
=
i
;
param
.
pOperator
=
pOperator
;
param
.
inputBlock
=
createOneDataBlock
(
pInfo
->
pResBlock
,
false
);
taosArrayPush
(
pInfo
->
sortSourceParams
,
&
param
);
SQueryTableDataCond
cond
;
dumpSQueryTableCond
(
&
pInfo
->
cond
,
&
cond
);
taosArrayPush
(
pInfo
->
queryConds
,
&
cond
);
}
for
(
int32_t
i
=
0
;
i
<
num
Readers
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
num
OfTable
;
++
i
)
{
SSortSource
*
ps
=
taosMemoryCalloc
(
1
,
sizeof
(
SSortSource
));
STableMergeScanSortSourceParam
*
param
=
taosArrayGet
(
pInfo
->
sortSourceParams
,
i
);
ps
->
param
=
param
;
...
...
@@ -4452,7 +4791,7 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
size_t
numReaders
=
taosArrayGetSize
(
pInfo
->
dataReader
s
);
int32_t
numOfTable
=
taosArrayGetSize
(
pInfo
->
queryCond
s
);
SSortExecInfo
sortExecInfo
=
tsortGetSortExecInfo
(
pInfo
->
pSortHandle
);
pInfo
->
sortExecInfo
.
sortMethod
=
sortExecInfo
.
sortMethod
;
...
...
@@ -4461,7 +4800,7 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo
->
sortExecInfo
.
readBytes
+=
sortExecInfo
.
readBytes
;
pInfo
->
sortExecInfo
.
writeBytes
+=
sortExecInfo
.
writeBytes
;
for
(
int32_t
i
=
0
;
i
<
num
Readers
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
num
OfTable
;
++
i
)
{
STableMergeScanSortSourceParam
*
param
=
taosArrayGet
(
pInfo
->
sortSourceParams
,
i
);
blockDataDestroy
(
param
->
inputBlock
);
}
...
...
@@ -4469,12 +4808,13 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
tsortDestroySortHandle
(
pInfo
->
pSortHandle
);
for
(
int32_t
i
=
0
;
i
<
numReaders
;
++
i
)
{
S
TsdbReader
*
reader
=
taosArrayGetP
(
pInfo
->
dataReader
s
,
i
);
t
sdbReaderClose
(
reader
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
queryConds
);
i
++
)
{
S
QueryTableDataCond
*
cond
=
taosArrayGet
(
pInfo
->
queryCond
s
,
i
);
t
aosMemoryFree
(
cond
->
colList
);
}
taosArrayDestroy
(
pInfo
->
dataReaders
);
pInfo
->
dataReaders
=
NULL
;
taosArrayDestroy
(
pInfo
->
queryConds
);
pInfo
->
queryConds
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -4558,13 +4898,24 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
void
destroyTableMergeScanOperatorInfo
(
void
*
param
)
{
STableMergeScanInfo
*
pTableScanInfo
=
(
STableMergeScanInfo
*
)
param
;
cleanupQueryTableDataCond
(
&
pTableScanInfo
->
cond
);
int32_t
numOfTable
=
taosArrayGetSize
(
pTableScanInfo
->
queryConds
);
for
(
int32_t
i
=
0
;
i
<
numOfTable
;
i
++
)
{
STableMergeScanSortSourceParam
*
p
=
taosArrayGet
(
pTableScanInfo
->
sortSourceParams
,
i
);
blockDataDestroy
(
p
->
inputBlock
);
}
taosArrayDestroy
(
pTableScanInfo
->
sortSourceParams
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pTableScanInfo
->
dataReaders
);
++
i
)
{
STsdbReader
*
reader
=
taosArrayGetP
(
pTableScanInfo
->
dataReaders
,
i
);
tsdbReaderClose
(
reader
);
tsdbReaderClose
(
pTableScanInfo
->
pReader
);
pTableScanInfo
->
pReader
=
NULL
;
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pTableScanInfo
->
queryConds
);
i
++
)
{
SQueryTableDataCond
*
pCond
=
taosArrayGet
(
pTableScanInfo
->
queryConds
,
i
);
taosMemoryFree
(
pCond
->
colList
);
}
taosArrayDestroy
(
pTableScanInfo
->
dataReader
s
);
taosArrayDestroy
(
pTableScanInfo
->
queryCond
s
);
if
(
pTableScanInfo
->
matchInfo
.
pList
!=
NULL
)
{
taosArrayDestroy
(
pTableScanInfo
->
matchInfo
.
pList
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录