Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ed234f22
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
ed234f22
编写于
10月 24, 2022
作者:
H
Haojun Liao
提交者:
GitHub
10月 24, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #17599 from taosdata/fix/liao_cov
fix(query): refactor and fix some coverity issues.
上级
b2512f6b
e0d72e47
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
237 addition
and
324 deletion
+237
-324
source/dnode/mnode/impl/src/mndGrant.c
source/dnode/mnode/impl/src/mndGrant.c
+0
-2
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+14
-3
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+14
-3
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+53
-71
source/libs/executor/src/cachescanoperator.c
source/libs/executor/src/cachescanoperator.c
+23
-21
source/libs/executor/src/dataDeleter.c
source/libs/executor/src/dataDeleter.c
+0
-16
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+1
-17
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+20
-17
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+1
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+13
-62
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+50
-53
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+43
-52
source/libs/executor/src/tfill.c
source/libs/executor/src/tfill.c
+5
-6
未找到文件。
source/dnode/mnode/impl/src/mndGrant.c
浏览文件 @
ed234f22
...
...
@@ -21,10 +21,8 @@
static
int32_t
mndRetrieveGrant
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
)
{
int32_t
numOfRows
=
0
;
char
*
pWrite
;
int32_t
cols
=
0
;
char
tmp
[
32
];
char
tmp1
[
32
];
if
(
pShow
->
numOfRows
<
1
)
{
cols
=
0
;
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
ed234f22
...
...
@@ -1910,6 +1910,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if
(
minKey
==
k
.
ts
)
{
if
(
init
)
{
if
(
merge
.
pTSchema
==
NULL
)
{
return
code
;
}
tRowMerge
(
&
merge
,
pRow
);
}
else
{
STSchema
*
pSchema
=
doGetSchemaForTSRow
(
TSDBROW_SVERSION
(
pRow
),
pReader
,
pBlockScanInfo
->
uid
);
...
...
@@ -1964,7 +1968,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tRowMerge
(
&
merge
,
&
fRow1
);
}
else
{
init
=
true
;
int32_t
code
=
tRowMergerInit
(
&
merge
,
&
fRow1
,
pReader
->
pSchema
);
code
=
tRowMergerInit
(
&
merge
,
&
fRow1
,
pReader
->
pSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -1975,17 +1979,24 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if
(
minKey
==
key
)
{
TSDBROW
fRow
=
tsdbRowFromBlockData
(
pBlockData
,
pDumpInfo
->
rowIndex
);
if
(
!
init
)
{
int32_t
code
=
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
code
=
tRowMergerInit
(
&
merge
,
&
fRow
,
pReader
->
pSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
else
{
if
(
merge
.
pTSchema
==
NULL
)
{
return
code
;
}
tRowMerge
(
&
merge
,
&
fRow
);
}
doMergeRowsInFileBlocks
(
pBlockData
,
pBlockScanInfo
,
pReader
,
&
merge
);
}
}
if
(
merge
.
pTSchema
==
NULL
)
{
return
code
;
}
code
=
tRowMergerGetRow
(
&
merge
,
&
pTSRow
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -3232,7 +3243,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
STSchema
*
pSchema
=
doGetSchemaForTSRow
(
TSDBROW_SVERSION
(
pRow
),
pReader
,
pBlockScanInfo
->
uid
);
int32_t
code
=
tRowMergerInit
(
&
merge
,
pRow
,
pSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
||
merge
.
pTSchema
==
NULL
)
{
return
code
;
}
...
...
source/libs/executor/inc/executil.h
浏览文件 @
ed234f22
...
...
@@ -74,7 +74,6 @@ typedef struct SResultRowPosition {
typedef
struct
SResKeyPos
{
SResultRowPosition
pos
;
uint64_t
groupId
;
// char parTbName[TSDB_TABLE_NAME_LEN];
char
key
[];
}
SResKeyPos
;
...
...
@@ -84,6 +83,18 @@ typedef struct SResultRowInfo {
SList
*
openWindow
;
}
SResultRowInfo
;
typedef
struct
SColMatchItem
{
int32_t
colId
;
int32_t
srcSlotId
;
int32_t
dstSlotId
;
bool
needOutput
;
}
SColMatchItem
;
typedef
struct
SColMatchInfo
{
SArray
*
pList
;
// SArray<SColMatchItem>
int32_t
matchType
;
// determinate the source according to col id or slot id
}
SColMatchInfo
;
struct
SqlFunctionCtx
;
size_t
getResultRowSize
(
struct
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
...
...
@@ -121,8 +132,8 @@ size_t getTableTagsBufLen(const SNodeList* pGroups);
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
);
SArray
*
extractPartitionColInfo
(
SNodeList
*
pNodeList
);
SArray
*
extractColMatchInfo
(
SNodeList
*
pNodeList
,
SDataBlockDescNode
*
pOutputNodeList
,
int32_t
*
numOfOutputCols
,
int32_t
type
);
int32_t
extractColMatchInfo
(
SNodeList
*
pNodeList
,
SDataBlockDescNode
*
pOutputNodeList
,
int32_t
*
numOfOutputCols
,
int32_t
type
,
SColMatchInfo
*
pMatchInfo
);
void
createExprFromOneNode
(
SExprInfo
*
pExp
,
SNode
*
pNode
,
int16_t
slotId
);
void
createExprFromTargetNode
(
SExprInfo
*
pExp
,
STargetNode
*
pTargetNode
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
ed234f22
...
...
@@ -47,12 +47,7 @@ extern "C" {
typedef
int32_t
(
*
__block_search_fn_t
)(
char
*
data
,
int32_t
num
,
int64_t
key
,
int32_t
order
);
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u)
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
#define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0)
#define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN)
#define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0)
...
...
@@ -240,7 +235,7 @@ typedef struct SOperatorInfo {
typedef
enum
{
EX_SOURCE_DATA_NOT_READY
=
0x1
,
EX_SOURCE_DATA_READY
=
0x2
,
EX_SOURCE_DATA_READY
=
0x2
,
EX_SOURCE_DATA_EXHAUSTED
=
0x3
,
}
EX_SOURCE_STATUS
;
...
...
@@ -289,15 +284,6 @@ typedef struct SExchangeInfo {
SLimitInfo
limitInfo
;
}
SExchangeInfo
;
typedef
struct
SColMatchInfo
{
int32_t
srcSlotId
;
// source slot id
int32_t
colId
;
int32_t
targetSlotId
;
bool
output
;
// todo remove this?
bool
reserved
;
int32_t
matchType
;
// determinate the source according to col id or slot id
}
SColMatchInfo
;
typedef
struct
SScanInfo
{
int32_t
numOfAsc
;
int32_t
numOfDesc
;
...
...
@@ -339,7 +325,7 @@ typedef struct STableScanInfo {
SNode
*
pFilterNode
;
// filter info, which is push down by optimizer
SSDataBlock
*
pResBlock
;
S
Array
*
pColM
atchInfo
;
S
ColMatchInfo
m
atchInfo
;
SExprSupp
pseudoSup
;
SQueryTableDataCond
cond
;
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
...
...
@@ -365,10 +351,9 @@ typedef struct STableMergeScanInfo {
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
SSDataBlock
*
pSortInputBlock
;
int64_t
startTs
;
// sort start time
SArray
*
sortSourceParams
;
SSDataBlock
*
pSortInputBlock
;
int64_t
startTs
;
// sort start time
SArray
*
sortSourceParams
;
SFileBlockLoadRecorder
readRecorder
;
int64_t
numOfRows
;
...
...
@@ -380,43 +365,40 @@ typedef struct STableMergeScanInfo {
int32_t
*
rowEntryInfoOffset
;
SExprInfo
*
pExpr
;
SSDataBlock
*
pResBlock
;
S
Array
*
pColM
atchInfo
;
S
ColMatchInfo
m
atchInfo
;
int32_t
numOfOutput
;
SExprSupp
pseudoSup
;
SQueryTableDataCond
cond
;
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
int32_t
dataBlockLoadFlag
;
SExprSupp
pseudoSup
;
SQueryTableDataCond
cond
;
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
int32_t
dataBlockLoadFlag
;
// if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded.
SInterval
interval
;
SSampleExecInfo
sample
;
// sample execution info
SSortExecInfo
sortExecInfo
;
SInterval
interval
;
SSampleExecInfo
sample
;
// sample execution info
SSortExecInfo
sortExecInfo
;
}
STableMergeScanInfo
;
typedef
struct
STagScanInfo
{
SColumnInfo
*
pCols
;
SSDataBlock
*
pRes
;
S
Array
*
pColM
atchInfo
;
S
ColMatchInfo
m
atchInfo
;
int32_t
curPos
;
SReadHandle
readHandle
;
STableListInfo
*
pTableList
;
}
STagScanInfo
;
typedef
struct
SLastrowScanInfo
{
SSDataBlock
*
pRes
;
SReadHandle
readHandle
;
void
*
pLastrowReader
;
S
Array
*
pColM
atchInfo
;
int32_t
*
pSlotIds
;
SExprSupp
pseudoExprSup
;
int32_t
retrieveType
;
int32_t
currentGroupIndex
;
SSDataBlock
*
pBufferredRes
;
SArray
*
pUidList
;
int32_t
indexOfBufferedRes
;
SSDataBlock
*
pRes
;
SReadHandle
readHandle
;
void
*
pLastrowReader
;
S
ColMatchInfo
m
atchInfo
;
int32_t
*
pSlotIds
;
SExprSupp
pseudoExprSup
;
int32_t
retrieveType
;
int32_t
currentGroupIndex
;
SSDataBlock
*
pBufferredRes
;
SArray
*
pUidList
;
int32_t
indexOfBufferedRes
;
}
SLastrowScanInfo
;
typedef
enum
EStreamScanMode
{
...
...
@@ -483,28 +465,28 @@ typedef struct STimeWindowAggSupp {
}
STimeWindowAggSupp
;
typedef
struct
SStreamScanInfo
{
uint64_t
tableUid
;
// queried super table uid
SExprInfo
*
pPseudoExpr
;
int32_t
numOfPseudoExpr
;
SExprSupp
tbnameCalSup
;
SExprSupp
tagCalSup
;
int32_t
primaryTsIndex
;
// primary time stamp slot id
SReadHandle
readHandle
;
SInterval
interval
;
// if the upstream is an interval operator, the interval info is also kept here.
S
Array
*
pColMatchInfo
;
//
SNode
*
pCondition
;
SArray
*
pBlockLists
;
// multiple SSDatablock.
SSDataBlock
*
pRes
;
// result SSDataBlock
SSDataBlock
*
pUpdateRes
;
// update SSDataBlock
int32_t
updateResIndex
;
int32_t
blockType
;
// current block type
int32_t
validBlockIndex
;
// Is current data has returned?
uint64_t
numOfExec
;
// execution times
STqReader
*
tqReader
;
uint64_t
groupId
;
SUpdateInfo
*
pUpdateInfo
;
uint64_t
tableUid
;
// queried super table uid
SExprInfo
*
pPseudoExpr
;
int32_t
numOfPseudoExpr
;
SExprSupp
tbnameCalSup
;
SExprSupp
tagCalSup
;
int32_t
primaryTsIndex
;
// primary time stamp slot id
SReadHandle
readHandle
;
SInterval
interval
;
// if the upstream is an interval operator, the interval info is also kept here.
S
ColMatchInfo
matchInfo
;
SNode
*
pCondition
;
SArray
*
pBlockLists
;
// multiple SSDatablock.
SSDataBlock
*
pRes
;
// result SSDataBlock
SSDataBlock
*
pUpdateRes
;
// update SSDataBlock
int32_t
updateResIndex
;
int32_t
blockType
;
// current block type
int32_t
validBlockIndex
;
// Is current data has returned?
uint64_t
numOfExec
;
// execution times
STqReader
*
tqReader
;
uint64_t
groupId
;
SUpdateInfo
*
pUpdateInfo
;
EStreamScanMode
scanMode
;
SOperatorInfo
*
pStreamScanOp
;
...
...
@@ -559,8 +541,8 @@ typedef struct SSysTableScanInfo {
bool
showRewrite
;
SNode
*
pCondition
;
// db_name filter condition, to discard data that are not in current database
SMTbCursor
*
pCur
;
// cursor for iterate the local table meta store.
SSysTableIndex
*
pIdx
;
// idx for local table meta
S
Array
*
scanCols
;
// SArray<int16_t> scan column id list
SSysTableIndex
*
pIdx
;
// idx for local table meta
S
ColMatchInfo
matchInfo
;
SName
name
;
SSDataBlock
*
pRes
;
int64_t
numOfBlocks
;
// extract basic running information.
...
...
@@ -680,7 +662,7 @@ typedef struct SFillOperatorInfo {
SSDataBlock
*
existNewGroupBlock
;
STimeWindow
win
;
SNode
*
pCondition
;
S
Array
*
pColMatchCol
Info
;
S
ColMatchInfo
match
Info
;
int32_t
primaryTsCol
;
int32_t
primarySrcSlotId
;
uint64_t
curGroupId
;
// current handled group id
...
...
@@ -819,7 +801,7 @@ typedef struct SStreamFillOperatorInfo {
int32_t
srcDelRowIndex
;
SSDataBlock
*
pDelRes
;
SNode
*
pCondition
;
S
Array
*
pColMatchCol
Info
;
S
ColMatchInfo
match
Info
;
int32_t
primaryTsCol
;
int32_t
primarySrcSlotId
;
SStreamFillInfo
*
pFillInfo
;
...
...
@@ -863,7 +845,7 @@ typedef struct SSortOperatorInfo {
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
S
Array
*
pColMatchInfo
;
// for index map from table scan output
S
ColMatchInfo
matchInfo
;
int32_t
bufPageSize
;
int64_t
startTs
;
// sort start time
uint64_t
sortElapsed
;
// sort elapsed time, time to flush to disk not included.
...
...
@@ -932,7 +914,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
int32_t
getBufferPgSize
(
int32_t
rowSize
,
uint32_t
*
defaultPgsz
,
uint32_t
*
defaultBufsz
);
void
doSetOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
,
const
SArray
*
pColMatchInfo
,
SFilterInfo
*
pFilterInfo
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
,
SColMatchInfo
*
pColMatchInfo
,
SFilterInfo
*
pFilterInfo
);
int32_t
addTagPseudoColumnData
(
SReadHandle
*
pHandle
,
SExprInfo
*
pPseudoExpr
,
int32_t
numOfPseudoExpr
,
SSDataBlock
*
pBlock
,
const
char
*
idStr
);
...
...
source/libs/executor/src/cachescanoperator.c
浏览文件 @
ed234f22
...
...
@@ -28,7 +28,7 @@
static
SSDataBlock
*
doScanCache
(
SOperatorInfo
*
pOperator
);
static
void
destroyLastrowScanOperator
(
void
*
param
);
static
int32_t
extractCacheScanSlotId
(
const
SArray
*
pColMatchInfo
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
**
pSlotIds
);
static
SArray
*
removeRedundantTsCol
(
SLastRowScanPhysiNode
*
pScanNode
,
SArray
*
pColMatchInfo
);
static
int32_t
removeRedundantTsCol
(
SLastRowScanPhysiNode
*
pScanNode
,
SColMatchInfo
*
pColMatchInfo
);
SOperatorInfo
*
createCacherowsScanOperator
(
SLastRowScanPhysiNode
*
pScanNode
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
)
{
...
...
@@ -46,10 +46,10 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pInfo
->
pRes
=
createResDataBlock
(
pDescNode
);
int32_t
numOfCols
=
0
;
SArray
*
pColMatchInfo
=
extractColMatchInfo
(
pScanNode
->
scan
.
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
);
pInfo
->
pColMatchInfo
=
removeRedundantTsCol
(
pScanNode
,
pColM
atchInfo
);
code
=
extractColMatchInfo
(
pScanNode
->
scan
.
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
,
&
pInfo
->
matchInfo
);
removeRedundantTsCol
(
pScanNode
,
&
pInfo
->
m
atchInfo
);
code
=
extractCacheScanSlotId
(
pInfo
->
pColMatchInfo
,
pTaskInfo
,
&
pInfo
->
pSlotIds
);
code
=
extractCacheScanSlotId
(
pInfo
->
matchInfo
.
pList
,
pTaskInfo
,
&
pInfo
->
pSlotIds
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -64,7 +64,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
if
(
taosArrayGetSize
(
pTableList
->
pGroupList
)
==
taosArrayGetSize
(
pTableList
->
pTableList
))
{
pInfo
->
retrieveType
=
CACHESCAN_RETRIEVE_TYPE_ALL
|
(
pScanNode
->
ignoreNull
?
CACHESCAN_RETRIEVE_LAST
:
CACHESCAN_RETRIEVE_LAST_ROW
);
code
=
tsdbCacherowsReaderOpen
(
pInfo
->
readHandle
.
vnode
,
pInfo
->
retrieveType
,
pTableList
->
pTableList
,
taosArrayGetSize
(
pInfo
->
pColMatchInfo
),
&
pInfo
->
pLastrowReader
);
taosArrayGetSize
(
pInfo
->
matchInfo
.
pList
),
&
pInfo
->
pLastrowReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -139,9 +139,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
}
if
(
pInfo
->
indexOfBufferedRes
<
pInfo
->
pBufferredRes
->
info
.
rows
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
pColMatchInfo
);
++
i
)
{
SColMatchI
nfo
*
pMatchInfo
=
taosArrayGet
(
pInfo
->
pColMatchInfo
,
i
);
int32_t
slotId
=
pMatchInfo
->
targe
tSlotId
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
matchInfo
.
pList
);
++
i
)
{
SColMatchI
tem
*
pMatchInfo
=
taosArrayGet
(
pInfo
->
matchInfo
.
pList
,
i
);
int32_t
slotId
=
pMatchInfo
->
ds
tSlotId
;
SColumnInfoData
*
pSrc
=
taosArrayGet
(
pInfo
->
pBufferredRes
->
pDataBlock
,
slotId
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pInfo
->
pRes
->
pDataBlock
,
slotId
);
...
...
@@ -188,7 +188,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SArray
*
pGroupTableList
=
taosArrayGetP
(
pTableList
->
pGroupList
,
pInfo
->
currentGroupIndex
);
tsdbCacherowsReaderOpen
(
pInfo
->
readHandle
.
vnode
,
pInfo
->
retrieveType
,
pGroupTableList
,
taosArrayGetSize
(
pInfo
->
pColMatchInfo
),
&
pInfo
->
pLastrowReader
);
taosArrayGetSize
(
pInfo
->
matchInfo
.
pList
),
&
pInfo
->
pLastrowReader
);
taosArrayClear
(
pInfo
->
pUidList
);
int32_t
code
=
tsdbRetrieveCacheRows
(
pInfo
->
pLastrowReader
,
pInfo
->
pRes
,
pInfo
->
pSlotIds
,
pInfo
->
pUidList
);
...
...
@@ -235,7 +235,7 @@ void destroyLastrowScanOperator(void* param) {
blockDataDestroy
(
pInfo
->
pBufferredRes
);
taosMemoryFree
(
pInfo
->
pSlotIds
);
taosArrayDestroy
(
pInfo
->
pUidList
);
taosArrayDestroy
(
pInfo
->
pColMatchInfo
);
taosArrayDestroy
(
pInfo
->
matchInfo
.
pList
);
if
(
pInfo
->
pLastrowReader
!=
NULL
)
{
pInfo
->
pLastrowReader
=
tsdbCacherowsReaderClose
(
pInfo
->
pLastrowReader
);
...
...
@@ -255,16 +255,16 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask
SSchemaWrapper
*
pWrapper
=
pTaskInfo
->
schemaInfo
.
sw
;
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColMatchI
nfo
*
pColMatch
=
taosArrayGet
(
pColMatchInfo
,
i
);
SColMatchI
tem
*
pColMatch
=
taosArrayGet
(
pColMatchInfo
,
i
);
for
(
int32_t
j
=
0
;
j
<
pWrapper
->
nCols
;
++
j
)
{
if
(
pColMatch
->
colId
==
pWrapper
->
pSchema
[
j
].
colId
&&
pColMatch
->
colId
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
(
*
pSlotIds
)[
pColMatch
->
targe
tSlotId
]
=
-
1
;
(
*
pSlotIds
)[
pColMatch
->
ds
tSlotId
]
=
-
1
;
break
;
}
if
(
pColMatch
->
colId
==
pWrapper
->
pSchema
[
j
].
colId
)
{
(
*
pSlotIds
)[
pColMatch
->
targe
tSlotId
]
=
j
;
(
*
pSlotIds
)[
pColMatch
->
ds
tSlotId
]
=
j
;
break
;
}
}
...
...
@@ -273,17 +273,18 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask
return
TSDB_CODE_SUCCESS
;
}
SArray
*
removeRedundantTsCol
(
SLastRowScanPhysiNode
*
pScanNode
,
SArray
*
pColMatchInfo
)
{
int32_t
removeRedundantTsCol
(
SLastRowScanPhysiNode
*
pScanNode
,
SColMatchInfo
*
pColMatchInfo
)
{
if
(
!
pScanNode
->
ignoreNull
)
{
// retrieve cached last value
return
pColMatchInfo
;
return
TSDB_CODE_SUCCESS
;
}
SArray
*
pMatchInfo
=
taosArrayInit
(
taosArrayGetSize
(
pColMatchInfo
),
sizeof
(
SColMatchInfo
));
size_t
size
=
taosArrayGetSize
(
pColMatchInfo
->
pList
);
SArray
*
pMatchInfo
=
taosArrayInit
(
size
,
sizeof
(
SColMatchInfo
));
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pColMatchInfo
)
;
++
i
)
{
SColMatchI
nfo
*
pColInfo
=
taosArrayGet
(
pColMatchInfo
,
i
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SColMatchI
tem
*
pColInfo
=
taosArrayGet
(
pColMatchInfo
->
pList
,
i
);
int32_t
slotId
=
pColInfo
->
targe
tSlotId
;
int32_t
slotId
=
pColInfo
->
ds
tSlotId
;
SNodeList
*
pList
=
pScanNode
->
scan
.
node
.
pOutputDataBlockDesc
->
pSlots
;
SSlotDescNode
*
pDesc
=
(
SSlotDescNode
*
)
nodesListGetNode
(
pList
,
slotId
);
...
...
@@ -292,6 +293,7 @@ SArray* removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SArray* pColMatch
}
}
taosArrayDestroy
(
pColMatchInfo
);
return
pMatchInfo
;
taosArrayDestroy
(
pColMatchInfo
->
pList
);
pColMatchInfo
->
pList
=
pMatchInfo
;
return
TSDB_CODE_SUCCESS
;
}
\ No newline at end of file
source/libs/executor/src/dataDeleter.c
浏览文件 @
ed234f22
...
...
@@ -53,22 +53,6 @@ typedef struct SDataDeleterHandle {
TdThreadMutex
mutex
;
}
SDataDeleterHandle
;
static
bool
needCompress
(
const
SSDataBlock
*
pData
,
int32_t
numOfCols
)
{
if
(
tsCompressColData
<
0
||
0
==
pData
->
info
.
rows
)
{
return
false
;
}
for
(
int32_t
col
=
0
;
col
<
numOfCols
;
++
col
)
{
SColumnInfoData
*
pColRes
=
taosArrayGet
(
pData
->
pDataBlock
,
col
);
int32_t
colSize
=
pColRes
->
info
.
bytes
*
pData
->
info
.
rows
;
if
(
NEEDTO_COMPRESS_QUERY
(
colSize
))
{
return
true
;
}
}
return
false
;
}
static
void
toDataCacheEntry
(
SDataDeleterHandle
*
pHandle
,
const
SInputData
*
pInput
,
SDataDeleterBuf
*
pBuf
)
{
int32_t
numOfCols
=
LIST_LENGTH
(
pHandle
->
pSchema
->
pSlots
);
...
...
source/libs/executor/src/dataDispatcher.c
浏览文件 @
ed234f22
...
...
@@ -51,22 +51,6 @@ typedef struct SDataDispatchHandle {
TdThreadMutex
mutex
;
}
SDataDispatchHandle
;
static
bool
needCompress
(
const
SSDataBlock
*
pData
,
int32_t
numOfCols
)
{
if
(
tsCompressColData
<
0
||
0
==
pData
->
info
.
rows
)
{
return
false
;
}
for
(
int32_t
col
=
0
;
col
<
numOfCols
;
++
col
)
{
SColumnInfoData
*
pColRes
=
taosArrayGet
(
pData
->
pDataBlock
,
col
);
int32_t
colSize
=
pColRes
->
info
.
bytes
*
pData
->
info
.
rows
;
if
(
NEEDTO_COMPRESS_QUERY
(
colSize
))
{
return
true
;
}
}
return
false
;
}
// clang-format off
// data format:
// +----------------+------------------+--------------+--------------+------------------+--------------------------------------------+------------------------------------+-------------+-----------+-------------+-----------+
...
...
@@ -86,7 +70,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
}
}
SDataCacheEntry
*
pEntry
=
(
SDataCacheEntry
*
)
pBuf
->
pData
;
pEntry
->
compressed
=
(
int8_t
)
needCompress
(
pInput
->
pData
,
numOfCols
)
;
pEntry
->
compressed
=
0
;
pEntry
->
numOfRows
=
pInput
->
pData
->
info
.
rows
;
pEntry
->
numOfCols
=
numOfCols
;
pEntry
->
dataLen
=
0
;
...
...
source/libs/executor/src/executil.c
浏览文件 @
ed234f22
...
...
@@ -1066,13 +1066,17 @@ SArray* extractPartitionColInfo(SNodeList* pNodeList) {
return
pList
;
}
SArray
*
extractColMatchInfo
(
SNodeList
*
pNodeList
,
SDataBlockDescNode
*
pOutputNodeList
,
int32_t
*
numOfOutputCols
,
int32_t
type
)
{
size_t
numOfCols
=
LIST_LENGTH
(
pNodeList
);
int32_t
extractColMatchInfo
(
SNodeList
*
pNodeList
,
SDataBlockDescNode
*
pOutputNodeList
,
int32_t
*
numOfOutputCols
,
int32_t
type
,
SColMatchInfo
*
pMatchInfo
)
{
size_t
numOfCols
=
LIST_LENGTH
(
pNodeList
);
int32_t
code
=
0
;
pMatchInfo
->
matchType
=
type
;
SArray
*
pList
=
taosArrayInit
(
numOfCols
,
sizeof
(
SColMatchInfo
));
if
(
pList
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
return
code
;
}
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
...
...
@@ -1080,12 +1084,10 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
if
(
nodeType
(
pNode
->
pExpr
)
==
QUERY_NODE_COLUMN
)
{
SColumnNode
*
pColNode
=
(
SColumnNode
*
)
pNode
->
pExpr
;
SColMatchInfo
c
=
{
0
};
c
.
output
=
true
;
SColMatchItem
c
=
{.
needOutput
=
true
};
c
.
colId
=
pColNode
->
colId
;
c
.
srcSlotId
=
pColNode
->
slotId
;
c
.
matchType
=
type
;
c
.
targetSlotId
=
pNode
->
slotId
;
c
.
dstSlotId
=
pNode
->
slotId
;
taosArrayPush
(
pList
,
&
c
);
}
}
...
...
@@ -1102,10 +1104,10 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
continue
;
}
SColMatchI
nfo
*
info
=
NULL
;
SColMatchI
tem
*
info
=
NULL
;
for
(
int32_t
j
=
0
;
j
<
taosArrayGetSize
(
pList
);
++
j
)
{
info
=
taosArrayGet
(
pList
,
j
);
if
(
info
->
targe
tSlotId
==
pNode
->
slotId
)
{
if
(
info
->
ds
tSlotId
==
pNode
->
slotId
)
{
break
;
}
}
...
...
@@ -1114,11 +1116,12 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
(
*
numOfOutputCols
)
+=
1
;
}
else
if
(
info
!=
NULL
)
{
// select distinct tbname from stb where tbname='abc';
info
->
o
utput
=
false
;
info
->
needO
utput
=
false
;
}
}
return
pList
;
pMatchInfo
->
pList
=
pList
;
return
code
;
}
static
SResSchema
createResSchema
(
int32_t
type
,
int32_t
bytes
,
int32_t
slotId
,
int32_t
scale
,
int32_t
precision
,
...
...
@@ -1407,14 +1410,14 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray
int32_t
i
=
0
,
j
=
0
;
while
(
i
<
numOfSrcCols
&&
j
<
taosArrayGetSize
(
pColMatchInfo
))
{
SColumnInfoData
*
p
=
taosArrayGet
(
pCols
,
i
);
SColMatchI
nfo
*
pmInfo
=
taosArrayGet
(
pColMatchInfo
,
j
);
if
(
!
outputEveryColumn
&&
pmInfo
->
reserved
)
{
SColMatchI
tem
*
pmInfo
=
taosArrayGet
(
pColMatchInfo
,
j
);
/*
if (!outputEveryColumn && pmInfo->reserved) {
j++;
continue;
}
}
*/
if
(
p
->
info
.
colId
==
pmInfo
->
colId
)
{
SColumnInfoData
*
pDst
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pmInfo
->
targe
tSlotId
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pmInfo
->
ds
tSlotId
);
colDataAssign
(
pDst
,
p
,
pBlock
->
info
.
rows
,
&
pBlock
->
info
);
i
++
;
j
++
;
...
...
source/libs/executor/src/executor.c
浏览文件 @
ed234f22
...
...
@@ -355,7 +355,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
goto
_error
;
}
SDataSinkMgtCfg
cfg
=
{.
maxDataBlockNum
=
10000
,
.
maxDataBlockNumPerQuery
=
500
0
};
SDataSinkMgtCfg
cfg
=
{.
maxDataBlockNum
=
500
,
.
maxDataBlockNumPerQuery
=
5
0
};
code
=
dsDataSinkMgtInit
(
&
cfg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to dsDataSinkMgtInit, code:%s, %s"
,
tstrerror
(
code
),
(
*
pTask
)
->
id
.
str
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
ed234f22
...
...
@@ -1086,7 +1086,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
static
void
extractQualifiedTupleByFilterResult
(
SSDataBlock
*
pBlock
,
const
SColumnInfoData
*
p
,
bool
keep
,
int32_t
status
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
,
const
SArray
*
pColMatchInfo
,
SFilterInfo
*
pFilterInfo
)
{
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
,
SColMatchInfo
*
pColMatchInfo
,
SFilterInfo
*
pFilterInfo
)
{
if
(
pFilterNode
==
NULL
||
pBlock
->
info
.
rows
==
0
)
{
return
;
}
...
...
@@ -1120,12 +1120,12 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM
extractQualifiedTupleByFilterResult
(
pBlock
,
p
,
keep
,
status
);
if
(
pColMatchInfo
!=
NULL
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pColMatchInfo
);
++
i
)
{
SColMatchI
nfo
*
pInfo
=
taosArrayGet
(
pColMatchInfo
,
i
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pColMatchInfo
->
pList
);
++
i
)
{
SColMatchI
tem
*
pInfo
=
taosArrayGet
(
pColMatchInfo
->
pList
,
i
);
if
(
pInfo
->
colId
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
SColumnInfoData
*
pColData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
targe
tSlotId
);
SColumnInfoData
*
pColData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
ds
tSlotId
);
if
(
pColData
->
info
.
type
==
TSDB_DATA_TYPE_TIMESTAMP
)
{
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
targe
tSlotId
);
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
ds
tSlotId
);
break
;
}
}
...
...
@@ -2560,57 +2560,6 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
return
TDB_CODE_SUCCESS
;
}
int32_t
aggDecodeResultRow
(
SOperatorInfo
*
pOperator
,
char
*
result
)
{
if
(
result
==
NULL
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
SOptrBasicInfo
*
pInfo
=
(
SOptrBasicInfo
*
)(
pOperator
->
info
);
SAggSupporter
*
pSup
=
(
SAggSupporter
*
)
POINTER_SHIFT
(
pOperator
->
info
,
sizeof
(
SOptrBasicInfo
));
// int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
int32_t
length
=
*
(
int32_t
*
)(
result
);
int32_t
offset
=
sizeof
(
int32_t
);
int32_t
count
=
*
(
int32_t
*
)(
result
+
offset
);
offset
+=
sizeof
(
int32_t
);
while
(
count
--
>
0
&&
length
>
offset
)
{
int32_t
keyLen
=
*
(
int32_t
*
)(
result
+
offset
);
offset
+=
sizeof
(
int32_t
);
uint64_t
tableGroupId
=
*
(
uint64_t
*
)(
result
+
offset
);
SResultRow
*
resultRow
=
getNewResultRow
(
pSup
->
pResultBuf
,
&
pSup
->
currentPageId
,
pSup
->
resultRowSize
);
if
(
!
resultRow
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
// add a new result set for a new group
SResultRowPosition
pos
=
{.
pageId
=
resultRow
->
pageId
,
.
offset
=
resultRow
->
offset
};
tSimpleHashPut
(
pSup
->
pResultRowHashTable
,
result
+
offset
,
keyLen
,
&
pos
,
sizeof
(
SResultRowPosition
));
offset
+=
keyLen
;
int32_t
valueLen
=
*
(
int32_t
*
)(
result
+
offset
);
if
(
valueLen
!=
pSup
->
resultRowSize
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
offset
+=
sizeof
(
int32_t
);
int32_t
pageId
=
resultRow
->
pageId
;
int32_t
pOffset
=
resultRow
->
offset
;
memcpy
(
resultRow
,
result
+
offset
,
valueLen
);
resultRow
->
pageId
=
pageId
;
resultRow
->
offset
=
pOffset
;
offset
+=
valueLen
;
pInfo
->
resultRowInfo
.
cur
=
(
SResultRowPosition
){.
pageId
=
resultRow
->
pageId
,
.
offset
=
resultRow
->
offset
};
// releaseBufPage(pSup->pResultBuf, getBufPage(pSup->pResultBuf, pageId));
}
if
(
offset
!=
length
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
return
TDB_CODE_SUCCESS
;
}
int32_t
handleLimitOffset
(
SOperatorInfo
*
pOperator
,
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
bool
holdDataInBuf
)
{
if
(
pLimitInfo
->
remainGroupOffset
>
0
)
{
if
(
pLimitInfo
->
currentGroupId
==
0
)
{
// it is the first group
...
...
@@ -2850,7 +2799,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
break
;
}
doFilter
(
pInfo
->
pCondition
,
fillResult
,
pInfo
->
pColMatchCol
Info
,
NULL
);
doFilter
(
pInfo
->
pCondition
,
fillResult
,
&
pInfo
->
match
Info
,
NULL
);
if
(
fillResult
->
info
.
rows
>
0
)
{
break
;
}
...
...
@@ -3098,9 +3047,11 @@ _error:
destroyAggOperatorInfo
(
pInfo
);
}
cleanupExprSupp
(
&
pOperator
->
exprSupp
);
taosMemoryFreeClear
(
pOperator
);
if
(
pOperator
!=
NULL
)
{
cleanupExprSupp
(
&
pOperator
->
exprSupp
);
}
taosMemoryFreeClear
(
pOperator
);
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
...
...
@@ -3136,7 +3087,7 @@ void destroyFillOperatorInfo(void* param) {
cleanupExprSupp
(
&
pInfo
->
noFillExprSupp
);
taosMemoryFreeClear
(
pInfo
->
p
);
taosArrayDestroy
(
pInfo
->
pColMatchColInfo
);
taosArrayDestroy
(
pInfo
->
matchInfo
.
pList
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -3280,8 +3231,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
pInfo
->
primarySrcSlotId
=
((
SColumnNode
*
)((
STargetNode
*
)
pPhyFillNode
->
pWStartTs
)
->
pExpr
)
->
slotId
;
int32_t
numOfOutputCols
=
0
;
pInfo
->
pColMatchColInfo
=
extractColMatchInfo
(
pPhyFillNode
->
pFillExprs
,
pPhyFillNode
->
node
.
pOutputDataBlockDesc
,
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
);
code
=
extractColMatchInfo
(
pPhyFillNode
->
pFillExprs
,
pPhyFillNode
->
node
.
pOutputDataBlockDesc
,
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
,
&
pInfo
->
matchInfo
);
code
=
initFillInfo
(
pInfo
,
pExprInfo
,
pInfo
->
numOfExpr
,
pNoFillSupp
->
pExprInfo
,
pNoFillSupp
->
numOfExprs
,
(
SNodeListNode
*
)
pPhyFillNode
->
pValues
,
pPhyFillNode
->
timeRange
,
pResultInfo
->
capacity
,
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
ed234f22
...
...
@@ -331,12 +331,13 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
}
}
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pTableScanInfo
->
pColMatchInfo
);
++
i
)
{
SColMatchI
nfo
*
pColMatchInfo
=
taosArrayGet
(
pTableScanInfo
->
pColMatchInfo
,
i
);
if
(
!
pColMatchInfo
->
o
utput
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pTableScanInfo
->
matchInfo
.
pList
);
++
i
)
{
SColMatchI
tem
*
pColMatchInfo
=
taosArrayGet
(
pTableScanInfo
->
matchInfo
.
pList
,
i
);
if
(
!
pColMatchInfo
->
needO
utput
)
{
continue
;
}
pBlock
->
pBlockAgg
[
pColMatchInfo
->
targetSlotId
]
=
pColAgg
[
i
];
pBlock
->
pBlockAgg
[
pColMatchInfo
->
dstSlotId
]
=
pColAgg
[
i
];
}
return
true
;
...
...
@@ -442,12 +443,12 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
return
terrno
;
}
relocateColumnData
(
pBlock
,
pTableScanInfo
->
pColMatchInfo
,
pCols
,
true
);
relocateColumnData
(
pBlock
,
pTableScanInfo
->
matchInfo
.
pList
,
pCols
,
true
);
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
);
if
(
pTableScanInfo
->
pFilterNode
!=
NULL
)
{
int64_t
st
=
taosGetTimestampUs
();
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
,
pTableScanInfo
->
pColM
atchInfo
,
pOperator
->
exprSupp
.
pFilterInfo
);
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
,
&
pTableScanInfo
->
m
atchInfo
,
pOperator
->
exprSupp
.
pFilterInfo
);
double
el
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
pTableScanInfo
->
readRecorder
.
filterTime
+=
el
;
...
...
@@ -780,8 +781,8 @@ static void destroyTableScanOperatorInfo(void* param) {
tsdbReaderClose
(
pTableScanInfo
->
dataReader
);
pTableScanInfo
->
dataReader
=
NULL
;
if
(
pTableScanInfo
->
pColMatchInfo
!=
NULL
)
{
taosArrayDestroy
(
pTableScanInfo
->
pColMatchInfo
);
if
(
pTableScanInfo
->
matchInfo
.
pList
!=
NULL
)
{
taosArrayDestroy
(
pTableScanInfo
->
matchInfo
.
pList
);
}
cleanupExprSupp
(
&
pTableScanInfo
->
pseudoSup
);
...
...
@@ -798,10 +799,10 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
SDataBlockDescNode
*
pDescNode
=
pTableScanNode
->
scan
.
node
.
pOutputDataBlockDesc
;
int32_t
numOfCols
=
0
;
pInfo
->
pColMatchInfo
=
extractColMatchInfo
(
pTableScanNode
->
scan
.
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
);
int32_t
code
=
extractColMatchInfo
(
pTableScanNode
->
scan
.
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
,
&
pInfo
->
matchInfo
);
int32_t
code
=
initQueryTableDataCond
(
&
pInfo
->
cond
,
pTableScanNode
);
code
=
initQueryTableDataCond
(
&
pInfo
->
cond
,
pTableScanNode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -1512,9 +1513,9 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
}
// todo extract method
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
pColMatchInfo
);
++
i
)
{
SColMatchI
nfo
*
pColMatchInfo
=
taosArrayGet
(
pInfo
->
pColMatchInfo
,
i
);
if
(
!
pColMatchInfo
->
o
utput
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
matchInfo
.
pList
);
++
i
)
{
SColMatchI
tem
*
pColMatchInfo
=
taosArrayGet
(
pInfo
->
matchInfo
.
pList
,
i
);
if
(
!
pColMatchInfo
->
needO
utput
)
{
continue
;
}
...
...
@@ -1522,7 +1523,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
for
(
int32_t
j
=
0
;
j
<
blockDataGetNumOfCols
(
pBlock
);
++
j
)
{
SColumnInfoData
*
pResCol
=
bdGetColumnInfoData
(
pBlock
,
j
);
if
(
pResCol
->
info
.
colId
==
pColMatchInfo
->
colId
)
{
SColumnInfoData
*
pDst
=
taosArrayGet
(
pInfo
->
pRes
->
pDataBlock
,
pColMatchInfo
->
targe
tSlotId
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pInfo
->
pRes
->
pDataBlock
,
pColMatchInfo
->
ds
tSlotId
);
colDataAssign
(
pDst
,
pResCol
,
pBlock
->
info
.
rows
,
&
pInfo
->
pRes
->
info
);
colExists
=
true
;
break
;
...
...
@@ -1531,7 +1532,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
// the required column does not exists in submit block, let's set it to be all null value
if
(
!
colExists
)
{
SColumnInfoData
*
pDst
=
taosArrayGet
(
pInfo
->
pRes
->
pDataBlock
,
pColMatchInfo
->
targe
tSlotId
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pInfo
->
pRes
->
pDataBlock
,
pColMatchInfo
->
ds
tSlotId
);
colDataAppendNNULL
(
pDst
,
0
,
pBlockInfo
->
rows
);
}
}
...
...
@@ -2193,8 +2194,8 @@ static void destroyStreamScanOperatorInfo(void* param) {
if
(
pStreamScan
->
tqReader
)
{
tqCloseReader
(
pStreamScan
->
tqReader
);
}
if
(
pStreamScan
->
pColMatchInfo
)
{
taosArrayDestroy
(
pStreamScan
->
pColMatchInfo
);
if
(
pStreamScan
->
matchInfo
.
pList
)
{
taosArrayDestroy
(
pStreamScan
->
matchInfo
.
pList
);
}
if
(
pStreamScan
->
pPseudoExpr
)
{
destroyExprInfo
(
pStreamScan
->
pPseudoExpr
,
pStreamScan
->
numOfPseudoExpr
);
...
...
@@ -2228,17 +2229,17 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo
->
pGroupTags
=
pTableScanNode
->
pGroupTags
;
int32_t
numOfCols
=
0
;
pInfo
->
pColMatchInfo
=
extractColMatchInfo
(
pScanPhyNode
->
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
);
int32_t
code
=
extractColMatchInfo
(
pScanPhyNode
->
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
,
&
pInfo
->
matchInfo
);
int32_t
numOfOutput
=
taosArrayGetSize
(
pInfo
->
pColMatchInfo
);
int32_t
numOfOutput
=
taosArrayGetSize
(
pInfo
->
matchInfo
.
pList
);
SArray
*
pColIds
=
taosArrayInit
(
numOfOutput
,
sizeof
(
int16_t
));
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
SColMatchI
nfo
*
id
=
taosArrayGet
(
pInfo
->
pColMatchInfo
,
i
);
SColMatchI
tem
*
id
=
taosArrayGet
(
pInfo
->
matchInfo
.
pList
,
i
);
int16_t
colId
=
id
->
colId
;
taosArrayPush
(
pColIds
,
&
colId
);
if
(
id
->
colId
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
pInfo
->
primaryTsIndex
=
id
->
targe
tSlotId
;
pInfo
->
primaryTsIndex
=
id
->
ds
tSlotId
;
}
}
...
...
@@ -2387,7 +2388,7 @@ static void destroySysScanOperator(void* param) {
pInfo
->
pIdx
=
NULL
;
}
taosArrayDestroy
(
pInfo
->
scanCols
);
taosArrayDestroy
(
pInfo
->
matchInfo
.
pList
);
taosMemoryFreeClear
(
pInfo
->
pUser
);
taosMemoryFreeClear
(
param
);
...
...
@@ -2752,7 +2753,7 @@ static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t
dataBlock
->
info
.
rows
=
numOfRows
;
pInfo
->
pRes
->
info
.
rows
=
numOfRows
;
relocateColumnData
(
pInfo
->
pRes
,
pInfo
->
scanCols
,
dataBlock
->
pDataBlock
,
false
);
relocateColumnData
(
pInfo
->
pRes
,
pInfo
->
matchInfo
.
pList
,
dataBlock
->
pDataBlock
,
false
);
doFilterResult
(
pInfo
);
blockDataCleanup
(
dataBlock
);
...
...
@@ -3441,7 +3442,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
p
->
info
.
rows
=
numOfRows
;
pInfo
->
pRes
->
info
.
rows
=
numOfRows
;
relocateColumnData
(
pInfo
->
pRes
,
pInfo
->
scanCols
,
p
->
pDataBlock
,
false
);
relocateColumnData
(
pInfo
->
pRes
,
pInfo
->
matchInfo
.
pList
,
p
->
pDataBlock
,
false
);
doFilterResult
(
pInfo
);
blockDataCleanup
(
p
);
...
...
@@ -3457,7 +3458,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
p
->
info
.
rows
=
numOfRows
;
pInfo
->
pRes
->
info
.
rows
=
numOfRows
;
relocateColumnData
(
pInfo
->
pRes
,
pInfo
->
scanCols
,
p
->
pDataBlock
,
false
);
relocateColumnData
(
pInfo
->
pRes
,
pInfo
->
matchInfo
.
pList
,
p
->
pDataBlock
,
false
);
doFilterResult
(
pInfo
);
blockDataCleanup
(
p
);
...
...
@@ -3618,7 +3619,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
p
->
info
.
rows
=
numOfRows
;
pInfo
->
pRes
->
info
.
rows
=
numOfRows
;
relocateColumnData
(
pInfo
->
pRes
,
pInfo
->
scanCols
,
p
->
pDataBlock
,
false
);
relocateColumnData
(
pInfo
->
pRes
,
pInfo
->
matchInfo
.
pList
,
p
->
pDataBlock
,
false
);
doFilterResult
(
pInfo
);
blockDataCleanup
(
p
);
...
...
@@ -3634,7 +3635,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
p
->
info
.
rows
=
numOfRows
;
pInfo
->
pRes
->
info
.
rows
=
numOfRows
;
relocateColumnData
(
pInfo
->
pRes
,
pInfo
->
scanCols
,
p
->
pDataBlock
,
false
);
relocateColumnData
(
pInfo
->
pRes
,
pInfo
->
matchInfo
.
pList
,
p
->
pDataBlock
,
false
);
doFilterResult
(
pInfo
);
blockDataCleanup
(
p
);
...
...
@@ -3798,7 +3799,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
}
char
*
pStart
=
pRsp
->
data
;
extractDataBlockFromFetchRsp
(
pInfo
->
pRes
,
pRsp
->
data
,
pInfo
->
scanCols
,
&
pStart
);
extractDataBlockFromFetchRsp
(
pInfo
->
pRes
,
pRsp
->
data
,
pInfo
->
matchInfo
.
pList
,
&
pStart
);
updateLoadRemoteInfo
(
&
pInfo
->
loadInfo
,
pRsp
->
numOfRows
,
pRsp
->
compLen
,
startTs
,
pOperator
);
// todo log the filter info
...
...
@@ -3827,7 +3828,7 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
p
->
info
.
rows
=
buildDbTableInfoBlock
(
pInfo
->
sysInfo
,
p
,
pSysDbTableMeta
,
size
,
TSDB_PERFORMANCE_SCHEMA_DB
);
pInfo
->
pRes
->
info
.
rows
=
p
->
info
.
rows
;
relocateColumnData
(
pInfo
->
pRes
,
pInfo
->
scanCols
,
p
->
pDataBlock
,
false
);
relocateColumnData
(
pInfo
->
pRes
,
pInfo
->
matchInfo
.
pList
,
p
->
pDataBlock
,
false
);
blockDataDestroy
(
p
);
return
pInfo
->
pRes
->
info
.
rows
;
...
...
@@ -3889,18 +3890,16 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
SScanPhysiNode
*
pScanNode
=
&
pScanPhyNode
->
scan
;
SDataBlockDescNode
*
pDescNode
=
pScanNode
->
node
.
pOutputDataBlockDesc
;
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pDescNode
);
int32_t
num
=
0
;
SArray
*
colList
=
extractColMatchInfo
(
pScanNode
->
pScanCols
,
pDescNode
,
&
num
,
COL_MATCH_FROM_COL_ID
);
int32_t
code
=
extractColMatchInfo
(
pScanNode
->
pScanCols
,
pDescNode
,
&
num
,
COL_MATCH_FROM_COL_ID
,
&
pInfo
->
matchInfo
);
pInfo
->
accountId
=
pScanPhyNode
->
accountId
;
pInfo
->
pUser
=
taosMemoryStrDup
((
void
*
)
pUser
);
pInfo
->
sysInfo
=
pScanPhyNode
->
sysInfo
;
pInfo
->
showRewrite
=
pScanPhyNode
->
showRewrite
;
pInfo
->
pRes
=
pResBlock
;
pInfo
->
pRes
=
createResDataBlock
(
pDescNode
)
;
pInfo
->
pCondition
=
pScanNode
->
node
.
pConditions
;
pInfo
->
scanCols
=
colList
;
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
...
...
@@ -3922,7 +3921,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
exprSupp
.
numOfExprs
=
taosArrayGetSize
(
p
ResBlock
->
pDataBlock
);
pOperator
->
exprSupp
.
numOfExprs
=
taosArrayGetSize
(
p
Info
->
pRes
->
pDataBlock
);
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
...
...
@@ -4021,7 +4020,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
static
void
destroyTagScanOperatorInfo
(
void
*
param
)
{
STagScanInfo
*
pInfo
=
(
STagScanInfo
*
)
param
;
pInfo
->
pRes
=
blockDataDestroy
(
pInfo
->
pRes
);
taosArrayDestroy
(
pInfo
->
pColMatchInfo
);
taosArrayDestroy
(
pInfo
->
matchInfo
.
pList
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -4038,15 +4037,14 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
int32_t
num
=
0
;
int32_t
numOfExprs
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pPhyNode
->
pScanPseudoCols
,
NULL
,
&
numOfExprs
);
SArray
*
colList
=
extractColMatchInfo
(
pPhyNode
->
pScanPseudoCols
,
pDescNode
,
&
num
,
COL_MATCH_FROM_COL_ID
);
int32_t
code
=
extractColMatchInfo
(
pPhyNode
->
pScanPseudoCols
,
pDescNode
,
&
num
,
COL_MATCH_FROM_COL_ID
,
&
pInfo
->
matchInfo
);
int32_t
code
=
initExprSupp
(
&
pOperator
->
exprSupp
,
pExprInfo
,
numOfExprs
);
code
=
initExprSupp
(
&
pOperator
->
exprSupp
,
pExprInfo
,
numOfExprs
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
pInfo
->
pTableList
=
pTableListInfo
;
pInfo
->
pColMatchInfo
=
colList
;
pInfo
->
pRes
=
createResDataBlock
(
pDescNode
);
pInfo
->
readHandle
=
*
pReadHandle
;
pInfo
->
curPos
=
0
;
...
...
@@ -4180,11 +4178,11 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
}
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColMatchI
nfo
*
pColMatchInfo
=
taosArrayGet
(
pTableScanInfo
->
pColMatchInfo
,
i
);
if
(
!
pColMatchInfo
->
o
utput
)
{
SColMatchI
tem
*
pColMatchInfo
=
taosArrayGet
(
pTableScanInfo
->
matchInfo
.
pList
,
i
);
if
(
!
pColMatchInfo
->
needO
utput
)
{
continue
;
}
pBlock
->
pBlockAgg
[
pColMatchInfo
->
targe
tSlotId
]
=
pColAgg
[
i
];
pBlock
->
pBlockAgg
[
pColMatchInfo
->
ds
tSlotId
]
=
pColAgg
[
i
];
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -4215,7 +4213,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
return
terrno
;
}
relocateColumnData
(
pBlock
,
pTableScanInfo
->
pColMatchInfo
,
pCols
,
true
);
relocateColumnData
(
pBlock
,
pTableScanInfo
->
matchInfo
.
pList
,
pCols
,
true
);
// currently only the tbname pseudo column
if
(
pTableScanInfo
->
pseudoSup
.
numOfExprs
>
0
)
{
...
...
@@ -4228,7 +4226,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
if
(
pTableScanInfo
->
pFilterNode
!=
NULL
)
{
int64_t
st
=
taosGetTimestampMs
();
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
,
pTableScanInfo
->
pColM
atchInfo
,
NULL
);
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
,
&
pTableScanInfo
->
m
atchInfo
,
NULL
);
double
el
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
pTableScanInfo
->
readRecorder
.
filterTime
+=
el
;
...
...
@@ -4312,9 +4310,9 @@ static SSDataBlock* getTableDataBlock(void* param) {
SArray
*
generateSortByTsInfo
(
SArray
*
colMatchInfo
,
int32_t
order
)
{
int32_t
tsTargetSlotId
=
0
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
colMatchInfo
);
++
i
)
{
SColMatchI
nfo
*
colInfo
=
taosArrayGet
(
colMatchInfo
,
i
);
SColMatchI
tem
*
colInfo
=
taosArrayGet
(
colMatchInfo
,
i
);
if
(
colInfo
->
colId
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
tsTargetSlotId
=
colInfo
->
targe
tSlotId
;
tsTargetSlotId
=
colInfo
->
ds
tSlotId
;
}
}
...
...
@@ -4500,8 +4498,8 @@ void destroyTableMergeScanOperatorInfo(void* param) {
}
taosArrayDestroy
(
pTableScanInfo
->
dataReaders
);
if
(
pTableScanInfo
->
pColMatchInfo
!=
NULL
)
{
taosArrayDestroy
(
pTableScanInfo
->
pColMatchInfo
);
if
(
pTableScanInfo
->
matchInfo
.
pList
!=
NULL
)
{
taosArrayDestroy
(
pTableScanInfo
->
matchInfo
.
pList
);
}
pTableScanInfo
->
pResBlock
=
blockDataDestroy
(
pTableScanInfo
->
pResBlock
);
...
...
@@ -4559,11 +4557,11 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
SDataBlockDescNode
*
pDescNode
=
pTableScanNode
->
scan
.
node
.
pOutputDataBlockDesc
;
int32_t
numOfCols
=
0
;
SArray
*
pColList
=
extractColMatchInfo
(
pTableScanNode
->
scan
.
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
);
int32_t
code
=
extractColMatchInfo
(
pTableScanNode
->
scan
.
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
,
&
pInfo
->
matchInfo
);
int32_t
code
=
initQueryTableDataCond
(
&
pInfo
->
cond
,
pTableScanNode
);
code
=
initQueryTableDataCond
(
&
pInfo
->
cond
,
pTableScanNode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
taosArrayDestroy
(
p
Col
List
);
taosArrayDestroy
(
p
Info
->
matchInfo
.
p
List
);
goto
_error
;
}
...
...
@@ -4583,12 +4581,11 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo
->
pFilterNode
=
pTableScanNode
->
scan
.
node
.
pConditions
;
pInfo
->
tableListInfo
=
pTableListInfo
;
pInfo
->
scanFlag
=
MAIN_SCAN
;
pInfo
->
pColMatchInfo
=
pColList
;
pInfo
->
pResBlock
=
createResDataBlock
(
pDescNode
);
pInfo
->
sortSourceParams
=
taosArrayInit
(
64
,
sizeof
(
STableMergeScanSortSourceParam
));
pInfo
->
pSortInfo
=
generateSortByTsInfo
(
pInfo
->
pColMatchInfo
,
pInfo
->
cond
.
order
);
pInfo
->
pSortInfo
=
generateSortByTsInfo
(
pInfo
->
matchInfo
.
pList
,
pInfo
->
cond
.
order
);
pInfo
->
pSortInputBlock
=
createOneDataBlock
(
pInfo
->
pResBlock
,
false
);
int32_t
rowSize
=
pInfo
->
pResBlock
->
info
.
rowSize
;
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
ed234f22
...
...
@@ -38,8 +38,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
SExprInfo
*
pExprInfo
=
createExprInfo
(
pSortNode
->
pExprs
,
NULL
,
&
numOfCols
);
int32_t
numOfOutputCols
=
0
;
SArray
*
pColMatchColInfo
=
extractColMatchInfo
(
pSortNode
->
pTargets
,
pDescNode
,
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
);
int32_t
code
=
extractColMatchInfo
(
pSortNode
->
pTargets
,
pDescNode
,
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
,
&
pInfo
->
matchInfo
);
pOperator
->
exprSupp
.
pCtx
=
createSqlFunctionCtx
(
pExprInfo
,
numOfCols
,
&
pOperator
->
exprSupp
.
rowEntryInfoOffset
);
...
...
@@ -48,7 +47,6 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
pSortInfo
=
createSortInfo
(
pSortNode
->
pSortKeys
);
pInfo
->
pCondition
=
pSortNode
->
node
.
pConditions
;
pInfo
->
pColMatchInfo
=
pColMatchColInfo
;
initLimitInfo
(
pSortNode
->
node
.
pLimit
,
pSortNode
->
node
.
pSlimit
,
&
pInfo
->
limitInfo
);
pOperator
->
name
=
"SortOperator"
;
...
...
@@ -67,7 +65,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
pOperator
->
fpSet
=
createOperatorFpSet
(
doOpenSortOperator
,
doSort
,
NULL
,
NULL
,
destroyOrderOperatorInfo
,
getExplainExecInfo
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -127,11 +125,11 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
// todo extract function to handle this
int32_t
numOfCols
=
taosArrayGetSize
(
pColMatchInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColMatchI
nfo
*
pmInfo
=
taosArrayGet
(
pColMatchInfo
,
i
);
ASSERT
(
pmInfo
->
matchType
==
COL_MATCH_FROM_SLOT_ID
);
SColMatchI
tem
*
pmInfo
=
taosArrayGet
(
pColMatchInfo
,
i
);
//
ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
SColumnInfoData
*
pSrc
=
taosArrayGet
(
p
->
pDataBlock
,
pmInfo
->
srcSlotId
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
pmInfo
->
targe
tSlotId
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
pmInfo
->
ds
tSlotId
);
colDataAssign
(
pDst
,
pSrc
,
p
->
info
.
rows
,
&
pDataBlock
->
info
);
}
...
...
@@ -210,13 +208,13 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
SSDataBlock
*
pBlock
=
NULL
;
while
(
1
)
{
pBlock
=
getSortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
,
pInfo
->
pColMatchInfo
,
pInfo
);
pInfo
->
matchInfo
.
pList
,
pInfo
);
if
(
pBlock
==
NULL
)
{
doSetOperatorCompleted
(
pOperator
);
return
NULL
;
}
doFilter
(
pInfo
->
pCondition
,
pBlock
,
pInfo
->
pColM
atchInfo
,
NULL
);
doFilter
(
pInfo
->
pCondition
,
pBlock
,
&
pInfo
->
m
atchInfo
,
NULL
);
if
(
blockDataGetNumOfRows
(
pBlock
)
==
0
)
{
continue
;
}
...
...
@@ -256,7 +254,7 @@ void destroyOrderOperatorInfo(void* param) {
tsortDestroySortHandle
(
pInfo
->
pSortHandle
);
taosArrayDestroy
(
pInfo
->
pSortInfo
);
taosArrayDestroy
(
pInfo
->
pColMatchInfo
);
taosArrayDestroy
(
pInfo
->
matchInfo
.
pList
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -277,20 +275,17 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t*
typedef
enum
EChildOperatorStatus
{
CHILD_OP_NEW_GROUP
,
CHILD_OP_SAME_GROUP
,
CHILD_OP_FINISHED
}
EChildOperatorStatus
;
typedef
struct
SGroupSortOperatorInfo
{
SOptrBasicInfo
binfo
;
SArray
*
pSortInfo
;
SArray
*
pColMatchInfo
;
int64_t
startTs
;
uint64_t
sortElapsed
;
bool
hasGroupId
;
uint64_t
currGroupId
;
SOptrBasicInfo
binfo
;
SArray
*
pSortInfo
;
SColMatchInfo
matchInfo
;
int64_t
startTs
;
uint64_t
sortElapsed
;
bool
hasGroupId
;
uint64_t
currGroupId
;
SSDataBlock
*
prefetchedSortInput
;
SSortHandle
*
pCurrSortHandle
;
EChildOperatorStatus
childOpStatus
;
SSortExecInfo
sortExecInfo
;
SSortExecInfo
sortExecInfo
;
}
SGroupSortOperatorInfo
;
SSDataBlock
*
getGroupSortedBlockData
(
SSortHandle
*
pHandle
,
SSDataBlock
*
pDataBlock
,
int32_t
capacity
,
...
...
@@ -320,11 +315,11 @@ SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo
if
(
p
->
info
.
rows
>
0
)
{
int32_t
numOfCols
=
taosArrayGetSize
(
pColMatchInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColMatchI
nfo
*
pmInfo
=
taosArrayGet
(
pColMatchInfo
,
i
);
ASSERT
(
pmInfo
->
matchType
==
COL_MATCH_FROM_SLOT_ID
);
SColMatchI
tem
*
pmInfo
=
taosArrayGet
(
pColMatchInfo
,
i
);
//
ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
SColumnInfoData
*
pSrc
=
taosArrayGet
(
p
->
pDataBlock
,
pmInfo
->
srcSlotId
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
pmInfo
->
targe
tSlotId
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
pmInfo
->
ds
tSlotId
);
colDataAssign
(
pDst
,
pSrc
,
p
->
info
.
rows
,
&
pDataBlock
->
info
);
}
...
...
@@ -442,7 +437,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) {
// beginSortGroup would fetch all child blocks of pInfo->currGroupId;
ASSERT
(
pInfo
->
childOpStatus
!=
CHILD_OP_SAME_GROUP
);
pBlock
=
getGroupSortedBlockData
(
pInfo
->
pCurrSortHandle
,
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
,
pInfo
->
pColMatchInfo
,
pInfo
);
pInfo
->
matchInfo
.
pList
,
pInfo
);
if
(
pBlock
!=
NULL
)
{
pBlock
->
info
.
groupId
=
pInfo
->
currGroupId
;
pOperator
->
resultInfo
.
totalRows
+=
pBlock
->
info
.
rows
;
...
...
@@ -474,7 +469,7 @@ void destroyGroupSortOperatorInfo(void* param) {
pInfo
->
binfo
.
pRes
=
blockDataDestroy
(
pInfo
->
binfo
.
pRes
);
taosArrayDestroy
(
pInfo
->
pSortInfo
);
taosArrayDestroy
(
pInfo
->
pColMatchInfo
);
taosArrayDestroy
(
pInfo
->
matchInfo
.
pList
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -494,8 +489,8 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
SExprInfo
*
pExprInfo
=
createExprInfo
(
pSortPhyNode
->
pExprs
,
NULL
,
&
numOfCols
);
int32_t
numOfOutputCols
=
0
;
SArray
*
pColMatchColInfo
=
extractColMatchInfo
(
pSortPhyNode
->
pTargets
,
pDescNode
,
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
);
int32_t
code
=
extractColMatchInfo
(
pSortPhyNode
->
pTargets
,
pDescNode
,
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
,
&
pInfo
->
matchInfo
);
pOperator
->
exprSupp
.
pCtx
=
createSqlFunctionCtx
(
pExprInfo
,
numOfCols
,
&
pOperator
->
exprSupp
.
rowEntryInfoOffset
);
pInfo
->
binfo
.
pRes
=
pResBlock
;
...
...
@@ -503,8 +498,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
1024
);
pInfo
->
pSortInfo
=
createSortInfo
(
pSortPhyNode
->
pSortKeys
);
;
pInfo
->
pColMatchInfo
=
pColMatchColInfo
;
pOperator
->
name
=
"GroupSortOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT
;
pOperator
->
blocking
=
false
;
...
...
@@ -517,7 +511,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doGroupSort
,
NULL
,
NULL
,
destroyGroupSortOperatorInfo
,
getGroupSortExplainExecInfo
);
int32_t
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -535,20 +529,18 @@ _error:
// Multiway Sort Merge operator
typedef
struct
SMultiwayMergeOperatorInfo
{
SOptrBasicInfo
binfo
;
int32_t
bufPageSize
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
int32_t
bufPageSize
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
SArray
*
pColMatchInfo
;
// for index map from table scan output
SSDataBlock
*
pInputBlock
;
int64_t
startTs
;
// sort start time
bool
groupSort
;
bool
hasGroupId
;
uint64_t
groupId
;
STupleHandle
*
prefetchedTuple
;
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
SColMatchInfo
matchInfo
;
SSDataBlock
*
pInputBlock
;
int64_t
startTs
;
// sort start time
bool
groupSort
;
bool
hasGroupId
;
uint64_t
groupId
;
STupleHandle
*
prefetchedTuple
;
}
SMultiwayMergeOperatorInfo
;
int32_t
doOpenMultiwayMergeOperator
(
SOperatorInfo
*
pOperator
)
{
...
...
@@ -645,11 +637,11 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
blockDataEnsureCapacity
(
pDataBlock
,
p
->
info
.
rows
);
int32_t
numOfCols
=
taosArrayGetSize
(
pColMatchInfo
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColMatchI
nfo
*
pmInfo
=
taosArrayGet
(
pColMatchInfo
,
i
);
ASSERT
(
pmInfo
->
matchType
==
COL_MATCH_FROM_SLOT_ID
);
SColMatchI
tem
*
pmInfo
=
taosArrayGet
(
pColMatchInfo
,
i
);
// ASSERT(pColMatchInfo->
== COL_MATCH_FROM_SLOT_ID);
SColumnInfoData
*
pSrc
=
taosArrayGet
(
p
->
pDataBlock
,
pmInfo
->
srcSlotId
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
pmInfo
->
targe
tSlotId
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
pmInfo
->
ds
tSlotId
);
colDataAssign
(
pDst
,
pSrc
,
p
->
info
.
rows
,
&
pDataBlock
->
info
);
}
...
...
@@ -678,7 +670,7 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
}
SSDataBlock
*
pBlock
=
getMultiwaySortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
,
pInfo
->
pColMatchInfo
,
pOperator
);
pOperator
->
resultInfo
.
capacity
,
pInfo
->
matchInfo
.
pList
,
pOperator
);
if
(
pBlock
!=
NULL
)
{
pOperator
->
resultInfo
.
totalRows
+=
pBlock
->
info
.
rows
;
}
else
{
...
...
@@ -694,7 +686,7 @@ void destroyMultiwayMergeOperatorInfo(void* param) {
tsortDestroySortHandle
(
pInfo
->
pSortHandle
);
taosArrayDestroy
(
pInfo
->
pSortInfo
);
taosArrayDestroy
(
pInfo
->
pColMatchInfo
);
taosArrayDestroy
(
pInfo
->
matchInfo
.
pList
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -731,15 +723,14 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
SArray
*
pSortInfo
=
createSortInfo
(
pMergePhyNode
->
pMergeKeys
);
int32_t
numOfOutputCols
=
0
;
SArray
*
pColMatchColInfo
=
extractColMatchInfo
(
pMergePhyNode
->
pTargets
,
pDescNode
,
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
);
code
=
extractColMatchInfo
(
pMergePhyNode
->
pTargets
,
pDescNode
,
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
,
&
pInfo
->
matchInfo
);
SPhysiNode
*
pChildNode
=
(
SPhysiNode
*
)
nodesListGetNode
(
pPhyNode
->
pChildren
,
0
);
SSDataBlock
*
pInputBlock
=
createResDataBlock
(
pChildNode
->
pOutputDataBlockDesc
);
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
1024
);
pInfo
->
groupSort
=
pMergePhyNode
->
groupSort
;
pInfo
->
pSortInfo
=
pSortInfo
;
pInfo
->
pColMatchInfo
=
pColMatchColInfo
;
pInfo
->
pInputBlock
=
pInputBlock
;
pInfo
->
bufPageSize
=
getProperSortPageSize
(
rowSize
);
pInfo
->
sortBufSize
=
pInfo
->
bufPageSize
*
(
numStreams
+
1
);
// one additional is reserved for merged result.
...
...
source/libs/executor/src/tfill.c
浏览文件 @
ed234f22
...
...
@@ -722,7 +722,7 @@ void destroyStreamFillOperatorInfo(void* param) {
pInfo
->
pSrcBlock
=
blockDataDestroy
(
pInfo
->
pSrcBlock
);
pInfo
->
pPrevSrcBlock
=
blockDataDestroy
(
pInfo
->
pPrevSrcBlock
);
pInfo
->
pDelRes
=
blockDataDestroy
(
pInfo
->
pDelRes
);
pInfo
->
pColMatchColInfo
=
taosArrayDestroy
(
pInfo
->
pColMatchColInfo
);
pInfo
->
matchInfo
.
pList
=
taosArrayDestroy
(
pInfo
->
matchInfo
.
pList
);
taosMemoryFree
(
pInfo
);
}
...
...
@@ -1499,7 +1499,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
}
doStreamFillImpl
(
pOperator
);
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
,
pInfo
->
pColMatchCol
Info
,
NULL
);
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
,
&
pInfo
->
match
Info
,
NULL
);
memcpy
(
pInfo
->
pRes
->
info
.
parTbName
,
pInfo
->
pSrcBlock
->
info
.
parTbName
,
TSDB_TABLE_NAME_LEN
);
pOperator
->
resultInfo
.
totalRows
+=
pInfo
->
pRes
->
info
.
rows
;
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
...
...
@@ -1675,11 +1675,10 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
pInfo
->
primarySrcSlotId
=
((
SColumnNode
*
)((
STargetNode
*
)
pPhyFillNode
->
pWStartTs
)
->
pExpr
)
->
slotId
;
int32_t
numOfOutputCols
=
0
;
SArray
*
pColMatchColInfo
=
extractColMatchInfo
(
pPhyFillNode
->
pFillExprs
,
pPhyFillNode
->
node
.
pOutputDataBlockDesc
,
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
);
int32_t
code
=
extractColMatchInfo
(
pPhyFillNode
->
pFillExprs
,
pPhyFillNode
->
node
.
pOutputDataBlockDesc
,
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
,
&
pInfo
->
matchInfo
);
pInfo
->
pCondition
=
pPhyFillNode
->
node
.
pConditions
;
pInfo
->
pColMatchColInfo
=
pColMatchColInfo
;
int32_t
code
=
initExprSupp
(
&
pOperator
->
exprSupp
,
pFillExprInfo
,
numOfFillCols
);
code
=
initExprSupp
(
&
pOperator
->
exprSupp
,
pFillExprInfo
,
numOfFillCols
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录