Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
39ac85b1
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
39ac85b1
编写于
6月 15, 2022
作者:
S
shenglian-zhou
提交者:
GitHub
6月 15, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13842 from taosdata/szhou/feature/multi-tb-merge-scan
feat: add table merge scan operator
上级
8ac6495d
8efaa65a
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
546 addition
and
27 deletion
+546
-27
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+9
-0
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+16
-7
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+510
-15
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+4
-2
source/libs/executor/src/tsort.c
source/libs/executor/src/tsort.c
+5
-1
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+2
-2
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
39ac85b1
...
...
@@ -912,6 +912,15 @@ int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
int32_t
finalizeResultRowIntoResultDataBlock
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
resultRowPosition
,
SqlFunctionCtx
*
pCtx
,
SExprInfo
*
pExprInfo
,
int32_t
numOfExprs
,
const
int32_t
*
rowCellOffset
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
int32_t
getTableList
(
void
*
metaHandle
,
int32_t
tableType
,
uint64_t
tableUid
,
STableListInfo
*
pListInfo
,
SNode
*
pTagCond
);
int32_t
createMultipleDataReaders
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
SArray
*
arrayReader
,
uint64_t
queryId
,
uint64_t
taskId
,
SNode
*
pTagCond
);
SOperatorInfo
*
createTableMergeScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
SArray
*
dataReaders
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
);
void
copyUpdateDataBlock
(
SSDataBlock
*
pDest
,
SSDataBlock
*
pSource
,
int32_t
tsColIndex
);
#ifdef __cplusplus
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
39ac85b1
...
...
@@ -4556,8 +4556,6 @@ static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead
STableListInfo
*
pTableListInfo
,
uint64_t
queryId
,
uint64_t
taskId
,
SNode
*
pTagCond
);
static
int32_t
getTableList
(
void
*
metaHandle
,
int32_t
tableType
,
uint64_t
tableUid
,
STableListInfo
*
pListInfo
,
SNode
*
pTagCond
);
static
SArray
*
extractColumnInfo
(
SNodeList
*
pNodeList
);
static
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
);
...
...
@@ -4691,19 +4689,30 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
SArray
*
groupKeys
=
extractPartitionColInfo
(
pTableScanNode
->
pPartitionKeys
);
code
=
generateGroupIdMap
(
pTableListInfo
,
pHandle
,
groupKeys
);
//
todo for json
code
=
generateGroupIdMap
(
pTableListInfo
,
pHandle
,
groupKeys
);
//
todo for json
taosArrayDestroy
(
groupKeys
);
if
(
code
){
if
(
code
)
{
tsdbCleanupReadHandle
(
pDataReader
);
return
NULL
;
}
SOperatorInfo
*
pOperator
=
createTableScanOperatorInfo
(
pTableScanNode
,
pDataReader
,
pHandle
,
pTaskInfo
);
SOperatorInfo
*
pOperator
=
createTableScanOperatorInfo
(
pTableScanNode
,
pDataReader
,
pHandle
,
pTaskInfo
);
STableScanInfo
*
pScanInfo
=
pOperator
->
info
;
pTaskInfo
->
cost
.
pRecoder
=
&
pScanInfo
->
readRecorder
;
return
pOperator
;
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
==
type
)
{
STableMergeScanPhysiNode
*
pTableScanNode
=
(
STableMergeScanPhysiNode
*
)
pPhyNode
;
SArray
*
dataReaders
=
taosArrayInit
(
8
,
POINTER_BYTES
);
createMultipleDataReaders
(
pTableScanNode
,
pHandle
,
pTableListInfo
,
dataReaders
,
queryId
,
taskId
,
pTagCond
);
extractTableSchemaVersion
(
pHandle
,
pTableScanNode
->
scan
.
uid
,
pTaskInfo
);
SArray
*
groupKeys
=
extractPartitionColInfo
(
pTableScanNode
->
pPartitionKeys
);
generateGroupIdMap
(
pTableListInfo
,
pHandle
,
groupKeys
);
//todo for json
taosArrayDestroy
(
groupKeys
);
SOperatorInfo
*
pOperator
=
createTableMergeScanOperatorInfo
(
pTableScanNode
,
dataReaders
,
pHandle
,
pTaskInfo
);
STableScanInfo
*
pScanInfo
=
pOperator
->
info
;
pTaskInfo
->
cost
.
pRecoder
=
&
pScanInfo
->
readRecorder
;
return
pOperator
;
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
==
type
)
{
return
createExchangeOperatorInfo
(
pHandle
->
pMsgCb
->
clientRpc
,
(
SExchangePhysiNode
*
)
pPhyNode
,
pTaskInfo
);
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
39ac85b1
...
...
@@ -630,7 +630,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
0
);
int32_t
len
=
tSerializeBlockDistInfo
(
NULL
,
0
,
&
blockDistInfo
);
char
*
p
=
taosMemoryCalloc
(
1
,
len
+
VARSTR_HEADER_SIZE
);
char
*
p
=
taosMemoryCalloc
(
1
,
len
+
VARSTR_HEADER_SIZE
);
tSerializeBlockDistInfo
(
varDataVal
(
p
),
len
,
&
blockDistInfo
);
varDataSetLen
(
p
,
len
);
...
...
@@ -642,7 +642,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
}
static
void
destroyBlockDistScanOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
SBlockDistInfo
*
pDistInfo
=
(
SBlockDistInfo
*
)
param
;
SBlockDistInfo
*
pDistInfo
=
(
SBlockDistInfo
*
)
param
;
blockDataDestroy
(
pDistInfo
->
pResBlock
);
}
...
...
@@ -654,24 +654,25 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo*
goto
_error
;
}
pInfo
->
pHandle
=
dataReader
;
pInfo
->
pHandle
=
dataReader
;
pInfo
->
pResBlock
=
taosMemoryCalloc
(
1
,
sizeof
(
SSDataBlock
));
SColumnInfoData
infoData
=
{
0
};
infoData
.
info
.
type
=
TSDB_DATA_TYPE_VARCHAR
;
infoData
.
info
.
bytes
=
1024
;
infoData
.
info
.
type
=
TSDB_DATA_TYPE_VARCHAR
;
infoData
.
info
.
bytes
=
1024
;
taosArrayPush
(
pInfo
->
pResBlock
->
pDataBlock
,
&
infoData
);
pOperator
->
name
=
"DataBlockInfoScanOperator"
;
pOperator
->
name
=
"DataBlockInfoScanOperator"
;
// pOperator->operatorType = OP_TableBlockInfoScan;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doBlockInfoScan
,
NULL
,
NULL
,
destroyBlockDistScanOperatorInfo
,
NULL
,
NULL
,
NULL
);
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doBlockInfoScan
,
NULL
,
NULL
,
destroyBlockDistScanOperatorInfo
,
NULL
,
NULL
,
NULL
);
return
pOperator
;
_error:
...
...
@@ -890,7 +891,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
return
NULL
;
}
int32_t
current
=
pInfo
->
validBlockIndex
++
;
int32_t
current
=
pInfo
->
validBlockIndex
++
;
SSDataBlock
*
pBlock
=
taosArrayGetP
(
pInfo
->
pBlockLists
,
current
);
blockDataUpdateTsWindow
(
pBlock
,
0
);
return
pBlock
;
...
...
@@ -1058,7 +1059,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
SScanPhysiNode
*
pScanPhyNode
=
&
pTableScanNode
->
scan
;
SDataBlockDescNode
*
pDescNode
=
pScanPhyNode
->
node
.
pOutputDataBlockDesc
;
SOperatorInfo
*
pTableScanDummy
=
createTableScanOperatorInfo
(
pTableScanNode
,
pDataReader
,
pHandle
,
pTaskInfo
);
SOperatorInfo
*
pTableScanDummy
=
createTableScanOperatorInfo
(
pTableScanNode
,
pDataReader
,
pHandle
,
pTaskInfo
);
STableScanInfo
*
pSTInfo
=
(
STableScanInfo
*
)
pTableScanDummy
->
info
;
...
...
@@ -1512,7 +1513,6 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
pRsp
->
numOfRows
,
pInfo
->
loadInfo
.
totalRows
);
if
(
pRsp
->
numOfRows
==
0
)
{
taosMemoryFree
(
pRsp
);
return
NULL
;
}
...
...
@@ -1849,3 +1849,498 @@ _error:
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
typedef
struct
STableMergeScanInfo
{
SArray
*
dataReaders
;
// array of tsdbReaderT*
SReadHandle
readHandle
;
int32_t
bufPageSize
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
SSDataBlock
*
pSortInputBlock
;
int64_t
startTs
;
// sort start time
bool
hasGroupId
;
uint64_t
groupId
;
STupleHandle
*
prefetchedTuple
;
SArray
*
sortSourceParams
;
SFileBlockLoadRecorder
readRecorder
;
int64_t
numOfRows
;
// int32_t prevGroupId; // previous table group id
SScanInfo
scanInfo
;
int32_t
scanTimes
;
SNode
*
pFilterNode
;
// filter info, which is push down by optimizer
SqlFunctionCtx
*
pCtx
;
// which belongs to the direct upstream operator operator query context
SResultRowInfo
*
pResultRowInfo
;
int32_t
*
rowCellInfoOffset
;
SExprInfo
*
pExpr
;
SSDataBlock
*
pResBlock
;
SArray
*
pColMatchInfo
;
int32_t
numOfOutput
;
SExprInfo
*
pPseudoExpr
;
int32_t
numOfPseudoExpr
;
SqlFunctionCtx
*
pPseudoCtx
;
// int32_t* rowCellInfoOffset;
SQueryTableDataCond
cond
;
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
int32_t
dataBlockLoadFlag
;
SInterval
interval
;
// if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded.
SSampleExecInfo
sample
;
// sample execution info
int32_t
curTWinIdx
;
}
STableMergeScanInfo
;
int32_t
createMultipleDataReaders
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
SArray
*
arrayReader
,
uint64_t
queryId
,
uint64_t
taskId
,
SNode
*
pTagCond
)
{
int32_t
code
=
getTableList
(
pHandle
->
meta
,
pTableScanNode
->
scan
.
tableType
,
pTableScanNode
->
scan
.
uid
,
pTableListInfo
,
pTagCond
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
if
(
taosArrayGetSize
(
pTableListInfo
->
pTableList
)
==
0
)
{
qDebug
(
"no table qualified for query, TID:0x%"
PRIx64
", QID:0x%"
PRIx64
,
taskId
,
queryId
);
goto
_error
;
}
SQueryTableDataCond
cond
=
{
0
};
code
=
initQueryTableDataCond
(
&
cond
,
pTableScanNode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
// TODO: free the sublist info and the table list in it
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pTableListInfo
->
pTableList
);
++
i
)
{
STableListInfo
*
subListInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
subListInfo
));
subListInfo
->
pTableList
=
taosArrayInit
(
1
,
sizeof
(
STableKeyInfo
));
taosArrayPush
(
subListInfo
->
pTableList
,
taosArrayGet
(
pTableListInfo
->
pTableList
,
i
));
tsdbReaderT
*
pReader
=
tsdbReaderOpen
(
pHandle
->
vnode
,
&
cond
,
subListInfo
,
queryId
,
taskId
);
taosArrayPush
(
arrayReader
,
&
pReader
);
taosArrayDestroy
(
subListInfo
->
pTableList
);
taosMemoryFree
(
subListInfo
);
}
clearupQueryTableDataCond
(
&
cond
);
return
0
;
_error:
return
code
;
}
static
int32_t
loadDataBlockFromOneTable
(
SOperatorInfo
*
pOperator
,
STableMergeScanInfo
*
pTableScanInfo
,
int32_t
readerIdx
,
SSDataBlock
*
pBlock
,
uint32_t
*
status
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
STableMergeScanInfo
*
pInfo
=
pOperator
->
info
;
SFileBlockLoadRecorder
*
pCost
=
&
pTableScanInfo
->
readRecorder
;
pCost
->
totalBlocks
+=
1
;
pCost
->
totalRows
+=
pBlock
->
info
.
rows
;
*
status
=
pInfo
->
dataBlockLoadFlag
;
if
(
pTableScanInfo
->
pFilterNode
!=
NULL
||
overlapWithTimeWindow
(
&
pTableScanInfo
->
interval
,
&
pBlock
->
info
,
pTableScanInfo
->
cond
.
order
))
{
(
*
status
)
=
FUNC_DATA_REQUIRED_DATA_LOAD
;
}
SDataBlockInfo
*
pBlockInfo
=
&
pBlock
->
info
;
taosMemoryFreeClear
(
pBlock
->
pBlockAgg
);
if
(
*
status
==
FUNC_DATA_REQUIRED_FILTEROUT
)
{
qDebug
(
"%s data block filter out, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
pCost
->
filterOutBlocks
+=
1
;
return
TSDB_CODE_SUCCESS
;
}
else
if
(
*
status
==
FUNC_DATA_REQUIRED_NOT_LOAD
)
{
qDebug
(
"%s data block skipped, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
pCost
->
skipBlocks
+=
1
;
// clear all data in pBlock that are set when handing the previous block
for
(
int32_t
i
=
0
;
i
<
pBlockInfo
->
numOfCols
;
++
i
)
{
SColumnInfoData
*
pcol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
pcol
->
pData
=
NULL
;
}
return
TSDB_CODE_SUCCESS
;
}
else
if
(
*
status
==
FUNC_DATA_REQUIRED_STATIS_LOAD
)
{
pCost
->
loadBlockStatis
+=
1
;
bool
allColumnsHaveAgg
=
true
;
SColumnDataAgg
**
pColAgg
=
NULL
;
tsdbReaderT
*
reader
=
taosArrayGetP
(
pTableScanInfo
->
dataReaders
,
readerIdx
);
tsdbRetrieveDataBlockStatisInfo
(
reader
,
&
pColAgg
,
&
allColumnsHaveAgg
);
if
(
allColumnsHaveAgg
==
true
)
{
int32_t
numOfCols
=
pBlock
->
info
.
numOfCols
;
// todo create this buffer during creating operator
if
(
pBlock
->
pBlockAgg
==
NULL
)
{
pBlock
->
pBlockAgg
=
taosMemoryCalloc
(
numOfCols
,
POINTER_BYTES
);
}
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColMatchInfo
*
pColMatchInfo
=
taosArrayGet
(
pTableScanInfo
->
pColMatchInfo
,
i
);
if
(
!
pColMatchInfo
->
output
)
{
continue
;
}
pBlock
->
pBlockAgg
[
pColMatchInfo
->
targetSlotId
]
=
pColAgg
[
i
];
}
return
TSDB_CODE_SUCCESS
;
}
else
{
// failed to load the block sma data, data block statistics does not exist, load data block instead
*
status
=
FUNC_DATA_REQUIRED_DATA_LOAD
;
}
}
ASSERT
(
*
status
==
FUNC_DATA_REQUIRED_DATA_LOAD
);
// todo filter data block according to the block sma data firstly
#if 0
if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
pCost->filterOutBlocks += 1;
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
pBlockInfo->window.ekey, pBlockInfo->rows);
(*status) = FUNC_DATA_REQUIRED_FILTEROUT;
return TSDB_CODE_SUCCESS;
}
#endif
pCost
->
totalCheckedRows
+=
pBlock
->
info
.
rows
;
pCost
->
loadBlocks
+=
1
;
tsdbReaderT
*
reader
=
taosArrayGetP
(
pTableScanInfo
->
dataReaders
,
readerIdx
);
SArray
*
pCols
=
tsdbRetrieveDataBlock
(
reader
,
NULL
);
if
(
pCols
==
NULL
)
{
return
terrno
;
}
relocateColumnData
(
pBlock
,
pTableScanInfo
->
pColMatchInfo
,
pCols
);
// currently only the tbname pseudo column
if
(
pTableScanInfo
->
numOfPseudoExpr
>
0
)
{
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pTableScanInfo
->
pPseudoExpr
,
pTableScanInfo
->
numOfPseudoExpr
,
pBlock
);
}
int64_t
st
=
taosGetTimestampMs
();
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
);
int64_t
et
=
taosGetTimestampMs
();
pTableScanInfo
->
readRecorder
.
filterTime
+=
(
et
-
st
);
if
(
pBlock
->
info
.
rows
==
0
)
{
pCost
->
filterOutBlocks
+=
1
;
qDebug
(
"%s data block filter out, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
}
return
TSDB_CODE_SUCCESS
;
}
typedef
struct
STableMergeScanSortSourceParam
{
SOperatorInfo
*
pOperator
;
int32_t
readerIdx
;
SSDataBlock
*
inputBlock
;
}
STableMergeScanSortSourceParam
;
static
SSDataBlock
*
getTableDataBlock
(
void
*
param
)
{
STableMergeScanSortSourceParam
*
source
=
param
;
SOperatorInfo
*
pOperator
=
source
->
pOperator
;
int32_t
readerIdx
=
source
->
readerIdx
;
SSDataBlock
*
pBlock
=
source
->
inputBlock
;
STableMergeScanInfo
*
pTableScanInfo
=
pOperator
->
info
;
int64_t
st
=
taosGetTimestampUs
();
blockDataCleanup
(
pBlock
);
tsdbReaderT
*
reader
=
taosArrayGetP
(
pTableScanInfo
->
dataReaders
,
readerIdx
);
while
(
tsdbNextDataBlock
(
reader
))
{
if
(
isTaskKilled
(
pOperator
->
pTaskInfo
))
{
longjmp
(
pOperator
->
pTaskInfo
->
env
,
TSDB_CODE_TSC_QUERY_CANCELLED
);
}
// process this data block based on the probabilities
bool
processThisBlock
=
processBlockWithProbability
(
&
pTableScanInfo
->
sample
);
if
(
!
processThisBlock
)
{
continue
;
}
tsdbRetrieveDataBlockInfo
(
reader
,
&
pBlock
->
info
);
uint32_t
status
=
0
;
int32_t
code
=
loadDataBlockFromOneTable
(
pOperator
,
pTableScanInfo
,
readerIdx
,
pBlock
,
&
status
);
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pOperator
->
pTaskInfo
->
env
,
code
);
}
// current block is filter out according to filter condition, continue load the next block
if
(
status
==
FUNC_DATA_REQUIRED_FILTEROUT
||
pBlock
->
info
.
rows
==
0
)
{
continue
;
}
uint64_t
*
groupId
=
taosHashGet
(
pOperator
->
pTaskInfo
->
tableqinfoList
.
map
,
&
pBlock
->
info
.
uid
,
sizeof
(
int64_t
));
if
(
groupId
)
{
pBlock
->
info
.
groupId
=
*
groupId
;
}
pOperator
->
resultInfo
.
totalRows
=
pTableScanInfo
->
readRecorder
.
totalRows
;
pTableScanInfo
->
readRecorder
.
elapsedTime
+=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
return
pBlock
;
}
return
NULL
;
}
SArray
*
generateSortByTsInfo
(
int32_t
order
)
{
SArray
*
pList
=
taosArrayInit
(
1
,
sizeof
(
SBlockOrderInfo
));
SBlockOrderInfo
bi
=
{
0
};
bi
.
order
=
order
;
bi
.
slotId
=
0
;
bi
.
nullFirst
=
NULL_ORDER_FIRST
;
taosArrayPush
(
pList
,
&
bi
);
return
pList
;
}
int32_t
doOpenTableMergeScanOperator
(
SOperatorInfo
*
pOperator
)
{
STableMergeScanInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
if
(
OPTR_IS_OPENED
(
pOperator
))
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
numOfBufPage
=
pInfo
->
sortBufSize
/
pInfo
->
bufPageSize
;
pInfo
->
pSortHandle
=
tsortCreateSortHandle
(
pInfo
->
pSortInfo
,
pInfo
->
pColMatchInfo
,
SORT_MULTISOURCE_MERGE
,
pInfo
->
bufPageSize
,
numOfBufPage
,
pInfo
->
pSortInputBlock
,
pTaskInfo
->
id
.
str
);
tsortSetFetchRawDataFp
(
pInfo
->
pSortHandle
,
getTableDataBlock
,
NULL
,
NULL
);
size_t
numReaders
=
taosArrayGetSize
(
pInfo
->
dataReaders
);
for
(
int32_t
i
=
0
;
i
<
numReaders
;
++
i
)
{
SSortSource
*
ps
=
taosMemoryCalloc
(
1
,
sizeof
(
SSortSource
));
STableMergeScanSortSourceParam
*
param
=
taosArrayGet
(
pInfo
->
sortSourceParams
,
i
);
ps
->
param
=
param
;
tsortAddSource
(
pInfo
->
pSortHandle
,
ps
);
}
int32_t
code
=
tsortOpen
(
pInfo
->
pSortHandle
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
terrno
);
}
pOperator
->
status
=
OP_RES_TO_RETURN
;
OPTR_SET_OPENED
(
pOperator
);
return
TSDB_CODE_SUCCESS
;
}
SSDataBlock
*
getSortedTableMergeScanBlockData
(
SSortHandle
*
pHandle
,
int32_t
capacity
,
SOperatorInfo
*
pOperator
)
{
STableMergeScanInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SSDataBlock
*
p
=
tsortGetSortedDataBlock
(
pHandle
);
if
(
p
==
NULL
)
{
return
NULL
;
}
blockDataEnsureCapacity
(
p
,
capacity
);
while
(
1
)
{
STupleHandle
*
pTupleHandle
=
NULL
;
if
(
pInfo
->
prefetchedTuple
==
NULL
)
{
pTupleHandle
=
tsortNextTuple
(
pHandle
);
}
else
{
pTupleHandle
=
pInfo
->
prefetchedTuple
;
pInfo
->
groupId
=
tsortGetGroupId
(
pTupleHandle
);
pInfo
->
prefetchedTuple
=
NULL
;
}
if
(
pTupleHandle
==
NULL
)
{
break
;
}
uint64_t
tupleGroupId
=
tsortGetGroupId
(
pTupleHandle
);
if
(
!
pInfo
->
hasGroupId
)
{
pInfo
->
groupId
=
tupleGroupId
;
pInfo
->
hasGroupId
=
true
;
appendOneRowToDataBlock
(
p
,
pTupleHandle
);
}
else
if
(
pInfo
->
groupId
==
tupleGroupId
)
{
appendOneRowToDataBlock
(
p
,
pTupleHandle
);
}
else
{
pInfo
->
prefetchedTuple
=
pTupleHandle
;
break
;
}
if
(
p
->
info
.
rows
>=
capacity
)
{
break
;
}
}
qDebug
(
"%s get sorted row blocks, rows:%d"
,
GET_TASKID
(
pTaskInfo
),
p
->
info
.
rows
);
return
(
p
->
info
.
rows
>
0
)
?
p
:
NULL
;
}
SSDataBlock
*
doTableMergeScan
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
STableMergeScanInfo
*
pInfo
=
pOperator
->
info
;
int32_t
code
=
pOperator
->
fpSet
.
_openFn
(
pOperator
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
SSDataBlock
*
pBlock
=
getSortedTableMergeScanBlockData
(
pInfo
->
pSortHandle
,
pOperator
->
resultInfo
.
capacity
,
pOperator
);
if
(
pBlock
!=
NULL
)
{
pOperator
->
resultInfo
.
totalRows
+=
pBlock
->
info
.
rows
;
}
else
{
doSetOperatorCompleted
(
pOperator
);
}
return
pBlock
;
}
void
destroyTableMergeScanOperatorInfo
(
void
*
param
,
int32_t
numOfOutput
)
{
STableMergeScanInfo
*
pTableScanInfo
=
(
STableMergeScanInfo
*
)
param
;
clearupQueryTableDataCond
(
&
pTableScanInfo
->
cond
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pTableScanInfo
->
dataReaders
);
++
i
)
{
tsdbReaderT
*
reader
=
taosArrayGetP
(
pTableScanInfo
->
dataReaders
,
i
);
tsdbCleanupReadHandle
(
reader
);
}
taosArrayDestroy
(
pTableScanInfo
->
dataReaders
);
if
(
pTableScanInfo
->
pColMatchInfo
!=
NULL
)
{
taosArrayDestroy
(
pTableScanInfo
->
pColMatchInfo
);
}
taosArrayDestroy
(
pTableScanInfo
->
sortSourceParams
);
pTableScanInfo
->
pResBlock
=
blockDataDestroy
(
pTableScanInfo
->
pResBlock
);
pTableScanInfo
->
pSortInputBlock
=
blockDataDestroy
(
pTableScanInfo
->
pSortInputBlock
);
taosArrayDestroy
(
pTableScanInfo
->
pSortInfo
);
}
typedef
struct
STableMergeScanExecInfo
{
SFileBlockLoadRecorder
blockRecorder
;
SSortExecInfo
sortExecInfo
;
}
STableMergeScanExecInfo
;
int32_t
getTableMergeScanExplainExecInfo
(
SOperatorInfo
*
pOptr
,
void
**
pOptrExplain
,
uint32_t
*
len
)
{
ASSERT
(
pOptr
!=
NULL
);
// TODO: merge these two info into one struct
STableMergeScanExecInfo
*
execInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
STableMergeScanExecInfo
));
STableMergeScanInfo
*
pInfo
=
pOptr
->
info
;
execInfo
->
blockRecorder
=
pInfo
->
readRecorder
;
execInfo
->
sortExecInfo
=
tsortGetSortExecInfo
(
pInfo
->
pSortHandle
);
*
pOptrExplain
=
execInfo
;
*
len
=
sizeof
(
STableMergeScanExecInfo
);
return
TSDB_CODE_SUCCESS
;
}
SOperatorInfo
*
createTableMergeScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
SArray
*
dataReaders
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
)
{
STableMergeScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
STableMergeScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
SDataBlockDescNode
*
pDescNode
=
pTableScanNode
->
scan
.
node
.
pOutputDataBlockDesc
;
int32_t
numOfCols
=
0
;
SArray
*
pColList
=
extractColMatchInfo
(
pTableScanNode
->
scan
.
pScanCols
,
pDescNode
,
&
numOfCols
,
pTaskInfo
,
COL_MATCH_FROM_COL_ID
);
int32_t
code
=
initQueryTableDataCond
(
&
pInfo
->
cond
,
pTableScanNode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
if
(
pTableScanNode
->
scan
.
pScanPseudoCols
!=
NULL
)
{
pInfo
->
pPseudoExpr
=
createExprInfo
(
pTableScanNode
->
scan
.
pScanPseudoCols
,
NULL
,
&
pInfo
->
numOfPseudoExpr
);
pInfo
->
pPseudoCtx
=
createSqlFunctionCtx
(
pInfo
->
pPseudoExpr
,
pInfo
->
numOfPseudoExpr
,
&
pInfo
->
rowCellInfoOffset
);
}
pInfo
->
scanInfo
=
(
SScanInfo
){.
numOfAsc
=
pTableScanNode
->
scanSeq
[
0
],
.
numOfDesc
=
pTableScanNode
->
scanSeq
[
1
]};
pInfo
->
readHandle
=
*
readHandle
;
pInfo
->
interval
=
extractIntervalInfo
(
pTableScanNode
);
pInfo
->
sample
.
sampleRatio
=
pTableScanNode
->
ratio
;
pInfo
->
sample
.
seed
=
taosGetTimestampSec
();
pInfo
->
dataBlockLoadFlag
=
pTableScanNode
->
dataRequired
;
pInfo
->
pFilterNode
=
pTableScanNode
->
scan
.
node
.
pConditions
;
pInfo
->
dataReaders
=
dataReaders
;
pInfo
->
scanFlag
=
MAIN_SCAN
;
pInfo
->
pColMatchInfo
=
pColList
;
pInfo
->
curTWinIdx
=
0
;
pInfo
->
pResBlock
=
createResDataBlock
(
pDescNode
);
pInfo
->
sortSourceParams
=
taosArrayInit
(
taosArrayGetSize
(
dataReaders
),
sizeof
(
STableMergeScanSortSourceParam
));
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
dataReaders
);
++
i
)
{
STableMergeScanSortSourceParam
*
param
=
taosMemoryCalloc
(
1
,
sizeof
(
STableMergeScanSortSourceParam
));
param
->
readerIdx
=
i
;
param
->
pOperator
=
pOperator
;
param
->
inputBlock
=
createOneDataBlock
(
pInfo
->
pResBlock
,
false
);
taosArrayPush
(
pInfo
->
sortSourceParams
,
param
);
taosMemoryFree
(
param
);
}
pInfo
->
pSortInfo
=
generateSortByTsInfo
(
pInfo
->
cond
.
order
);
pInfo
->
pSortInputBlock
=
createOneDataBlock
(
pInfo
->
pResBlock
,
false
);
int32_t
rowSize
=
pInfo
->
pResBlock
->
info
.
rowSize
;
pInfo
->
bufPageSize
=
rowSize
<
1024
?
1024
:
rowSize
*
2
;
pInfo
->
sortBufSize
=
pInfo
->
bufPageSize
*
16
;
pInfo
->
hasGroupId
=
false
;
pInfo
->
prefetchedTuple
=
NULL
;
pOperator
->
name
=
"TableMergeScanOperator"
;
// TODO : change it
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfExprs
=
numOfCols
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
initResultSizeInfo
(
pOperator
,
1024
);
pOperator
->
fpSet
=
createOperatorFpSet
(
doOpenTableMergeScanOperator
,
doTableMergeScan
,
NULL
,
NULL
,
destroyTableMergeScanOperatorInfo
,
NULL
,
NULL
,
getTableMergeScanExplainExecInfo
);
pOperator
->
cost
.
openCost
=
0
;
return
pOperator
;
_error:
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFree
(
pInfo
);
taosMemoryFree
(
pOperator
);
return
NULL
;
}
source/libs/executor/src/sortoperator.c
浏览文件 @
39ac85b1
...
...
@@ -76,7 +76,9 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
colDataAppendNULL
(
pColInfo
,
pBlock
->
info
.
rows
);
}
else
{
char
*
pData
=
tsortGetValue
(
pTupleHandle
,
i
);
colDataAppend
(
pColInfo
,
pBlock
->
info
.
rows
,
pData
,
false
);
if
(
pData
!=
NULL
)
{
colDataAppend
(
pColInfo
,
pBlock
->
info
.
rows
,
pData
,
false
);
}
}
}
...
...
@@ -427,7 +429,7 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
pInfo
->
pInputBlock
=
pInputBlock
;
pOperator
->
name
=
"MultiwaySortMerge"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_MERGE
;
pOperator
->
blocking
=
tru
e
;
pOperator
->
blocking
=
fals
e
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
...
...
source/libs/executor/src/tsort.c
浏览文件 @
39ac85b1
...
...
@@ -738,7 +738,11 @@ bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) {
void
*
tsortGetValue
(
STupleHandle
*
pVHandle
,
int32_t
colIndex
)
{
SColumnInfoData
*
pColInfo
=
TARRAY_GET_ELEM
(
pVHandle
->
pBlock
->
pDataBlock
,
colIndex
);
return
colDataGetData
(
pColInfo
,
pVHandle
->
rowIndex
);
if
(
pColInfo
->
pData
==
NULL
)
{
return
NULL
;
}
else
{
return
colDataGetData
(
pColInfo
,
pVHandle
->
rowIndex
);
}
}
uint64_t
tsortGetGroupId
(
STupleHandle
*
pVHandle
)
{
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
39ac85b1
...
...
@@ -470,8 +470,8 @@ static ENodeType getScanOperatorType(EScanType scanType) {
case
SCAN_TYPE_STREAM
:
return
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
;
case
SCAN_TYPE_TABLE_MERGE
:
return
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
;
//
return QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
//
return QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
return
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
;
default:
break
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录