Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
04b12ee7
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
04b12ee7
编写于
7月 18, 2022
作者:
H
Haojun Liao
提交者:
GitHub
7月 18, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15063 from taosdata/feature/3_liaohj
fix(query): filter new added child table for stream scan operator.
上级
71bc44c0
eebbbce4
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
84 addition
and
85 deletion
+84
-85
include/common/tcommon.h
include/common/tcommon.h
+0
-2
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+4
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+9
-17
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+16
-11
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+18
-3
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+14
-17
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+23
-34
未找到文件。
include/common/tcommon.h
浏览文件 @
04b12ee7
...
...
@@ -80,8 +80,6 @@ typedef struct {
SArray
*
pTableList
;
SHashObj
*
map
;
// speedup acquire the tableQueryInfo by table uid
bool
needSortTableByGroupId
;
void
*
pTagCond
;
void
*
pTagIndexCond
;
uint64_t
suid
;
}
STableListInfo
;
...
...
source/libs/executor/inc/executil.h
浏览文件 @
04b12ee7
...
...
@@ -15,6 +15,7 @@
#ifndef TDENGINE_QUERYUTIL_H
#define TDENGINE_QUERYUTIL_H
#include "vnode.h"
#include "function.h"
#include "nodes.h"
#include "plannodes.h"
...
...
@@ -106,7 +107,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
SSDataBlock
*
createResDataBlock
(
SDataBlockDescNode
*
pNode
);
EDealRes
doTranslateTagExpr
(
SNode
**
pNode
,
void
*
pContext
);
int32_t
getTableList
(
void
*
metaHandle
,
void
*
vnode
,
SScanPhysiNode
*
pScanNode
,
STableListInfo
*
pListInfo
);
int32_t
getTableList
(
void
*
metaHandle
,
void
*
pVnode
,
SScanPhysiNode
*
pScanNode
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
STableListInfo
*
pListInfo
);
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
);
SArray
*
extractPartitionColInfo
(
SNodeList
*
pNodeList
);
SArray
*
extractColMatchInfo
(
SNodeList
*
pNodeList
,
SDataBlockDescNode
*
pOutputNodeList
,
int32_t
*
numOfOutputCols
,
...
...
@@ -128,4 +129,6 @@ int32_t convertFillType(int32_t mode);
int32_t
resultrowComparAsc
(
const
void
*
p1
,
const
void
*
p2
);
int32_t
isTableOk
(
STableKeyInfo
*
info
,
SNode
*
pTagCond
,
void
*
metaHandle
,
bool
*
pQualified
);
#endif // TDENGINE_QUERYUTIL_H
source/libs/executor/inc/executorimpl.h
浏览文件 @
04b12ee7
...
...
@@ -311,19 +311,10 @@ typedef struct STableScanInfo {
int32_t
dataBlockLoadFlag
;
SInterval
interval
;
// if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
SSampleExecInfo
sample
;
// sample execution info
int32_t
currentGroupId
;
int32_t
currentTable
;
#if 0
struct {
uint64_t uid;
int64_t ts;
} lastStatus;
#endif
int8_t
scanMode
;
int8_t
noTable
;
int8_t
scanMode
;
int8_t
noTable
;
}
STableScanInfo
;
typedef
struct
STagScanInfo
{
...
...
@@ -429,8 +420,9 @@ typedef struct SStreamScanInfo {
// status for tmq
// SSchemaWrapper schema;
STqOffset
offset
;
STqOffset
offset
;
SNode
*
pTagCond
;
SNode
*
pTagIndexCond
;
}
SStreamScanInfo
;
typedef
struct
SSysTableScanInfo
{
...
...
@@ -874,8 +866,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SOperatorInfo
*
createDataBlockInfoScanOperator
(
void
*
dataReader
,
SReadHandle
*
readHandle
,
uint64_t
uid
,
SBlockDistScanPhysiNode
*
pBlockScanNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamScanOperatorInfo
(
SReadHandle
*
pHandle
,
STableScanPhysiNode
*
pTableScanNode
,
SExecTaskInfo
*
pTaskInfo
,
STimeWindowAggSupp
*
pTwSup
,
uint64_t
queryId
,
uint64_t
taskId
);
SOperatorInfo
*
createStreamScanOperatorInfo
(
SReadHandle
*
pHandle
,
STableScanPhysiNode
*
pTableScanNode
,
SNode
*
pTagCond
,
SExecTaskInfo
*
pTaskInfo
,
STimeWindowAggSupp
*
pTwSup
);
SOperatorInfo
*
createFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SFillPhysiNode
*
pPhyFillNode
,
SExecTaskInfo
*
pTaskInfo
);
...
...
@@ -966,12 +958,12 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
int32_t
createScanTableListInfo
(
SScanPhysiNode
*
pScanNode
,
SNodeList
*
pGroupTags
,
bool
groupSort
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
uint64_t
queryId
,
uint64_t
taskId
);
STableListInfo
*
pTableListInfo
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
const
char
*
idstr
);
SOperatorInfo
*
createGroupSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SGroupSortPhysiNode
*
pSortPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableMergeScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
STableListInfo
*
pTableListInfo
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
,
uint64_t
queryId
,
uint64_t
taskId
);
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
);
void
copyUpdateDataBlock
(
SSDataBlock
*
pDest
,
SSDataBlock
*
pSource
,
int32_t
tsColIndex
);
...
...
source/libs/executor/src/executil.c
浏览文件 @
04b12ee7
...
...
@@ -265,7 +265,7 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
return
DEAL_RES_CONTINUE
;
}
static
bool
isTableOk
(
STableKeyInfo
*
info
,
SNode
*
pTagCond
,
SMeta
*
metaHandle
)
{
int32_t
isTableOk
(
STableKeyInfo
*
info
,
SNode
*
pTagCond
,
void
*
metaHandle
,
bool
*
pQualified
)
{
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
metaHandle
,
0
);
metaGetTableEntryByUid
(
&
mr
,
info
->
uid
);
...
...
@@ -280,19 +280,22 @@ static bool isTableOk(STableKeyInfo* info, SNode* pTagCond, SMeta* metaHandle) {
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
terrno
=
code
;
nodesDestroyNode
(
pTagCondTmp
);
return
false
;
*
pQualified
=
false
;
return
code
;
}
ASSERT
(
nodeType
(
pNew
)
==
QUERY_NODE_VALUE
);
SValueNode
*
pValue
=
(
SValueNode
*
)
pNew
;
ASSERT
(
pValue
->
node
.
resType
.
type
==
TSDB_DATA_TYPE_BOOL
);
bool
result
=
pValue
->
datum
.
b
;
*
pQualified
=
pValue
->
datum
.
b
;
nodesDestroyNode
(
pNew
);
return
result
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
getTableList
(
void
*
metaHandle
,
void
*
pVnode
,
SScanPhysiNode
*
pScanNode
,
STableListInfo
*
pListInfo
)
{
int32_t
getTableList
(
void
*
metaHandle
,
void
*
pVnode
,
SScanPhysiNode
*
pScanNode
,
S
Node
*
pTagCond
,
SNode
*
pTagIndexCond
,
S
TableListInfo
*
pListInfo
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
pListInfo
->
pTableList
=
taosArrayInit
(
8
,
sizeof
(
STableKeyInfo
));
...
...
@@ -304,8 +307,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
pListInfo
->
suid
=
pScanNode
->
suid
;
SNode
*
pTagCond
=
(
SNode
*
)
pListInfo
->
pTagCond
;
SNode
*
pTagIndexCond
=
(
SNode
*
)
pListInfo
->
pTagIndexCond
;
if
(
pScanNode
->
tableType
==
TSDB_SUPER_TABLE
)
{
if
(
pTagIndexCond
)
{
SIndexMetaArg
metaArg
=
{
...
...
@@ -345,9 +346,14 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
int32_t
i
=
0
;
while
(
i
<
taosArrayGetSize
(
pListInfo
->
pTableList
))
{
STableKeyInfo
*
info
=
taosArrayGet
(
pListInfo
->
pTableList
,
i
);
bool
isOk
=
isTableOk
(
info
,
pTagCond
,
metaHandle
);
if
(
terrno
)
return
terrno
;
if
(
!
isOk
)
{
bool
qualified
=
true
;
code
=
isTableOk
(
info
,
pTagCond
,
metaHandle
,
&
qualified
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
if
(
!
qualified
)
{
taosArrayRemove
(
pListInfo
->
pTableList
,
i
);
continue
;
}
...
...
@@ -362,7 +368,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
// put into list as default group, remove it if grouping sorting is required later
taosArrayPush
(
pListInfo
->
pGroupList
,
&
pListInfo
->
pTableList
);
return
code
;
}
...
...
source/libs/executor/src/executor.c
浏览文件 @
04b12ee7
...
...
@@ -153,7 +153,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
return
pTaskInfo
;
}
static
SArray
*
filterQualifiedChildTables
(
const
SStreamScanInfo
*
pScanInfo
,
const
SArray
*
tableIdList
)
{
static
SArray
*
filterQualifiedChildTables
(
const
SStreamScanInfo
*
pScanInfo
,
const
SArray
*
tableIdList
,
const
char
*
idstr
)
{
SArray
*
qa
=
taosArrayInit
(
4
,
sizeof
(
tb_uid_t
));
// let's discard the tables those are not created according to the queried super table.
...
...
@@ -164,7 +164,7 @@ static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, cons
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
*
id
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get table meta, uid:%"
PRIu64
" code:%s
"
,
*
id
,
tstrerror
(
terrno
)
);
qError
(
"failed to get table meta, uid:%"
PRIu64
" code:%s
, %s"
,
*
id
,
tstrerror
(
terrno
),
idstr
);
continue
;
}
...
...
@@ -172,6 +172,21 @@ static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, cons
if
(
mr
.
me
.
type
!=
TSDB_CHILD_TABLE
||
mr
.
me
.
ctbEntry
.
suid
!=
pScanInfo
->
tableUid
)
{
continue
;
}
if
(
pScanInfo
->
pTagCond
!=
NULL
)
{
bool
qualified
=
false
;
STableKeyInfo
info
=
{.
groupId
=
0
,
.
uid
=
mr
.
me
.
uid
,
.
lastKey
=
0
};
code
=
isTableOk
(
&
info
,
pScanInfo
->
pTagCond
,
pScanInfo
->
readHandle
.
meta
,
&
qualified
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to filter new table, uid:0x%"
PRIx64
", %s"
,
info
.
uid
,
idstr
);
continue
;
}
if
(
!
qualified
)
{
continue
;
}
}
/*pScanInfo->pStreamScanOp->pTaskInfo->tableqinfoList.*/
// handle multiple partition
...
...
@@ -194,7 +209,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
int32_t
code
=
0
;
SStreamScanInfo
*
pScanInfo
=
pInfo
->
info
;
if
(
isAdd
)
{
// add new table id
SArray
*
qa
=
filterQualifiedChildTables
(
pScanInfo
,
tableIdList
);
SArray
*
qa
=
filterQualifiedChildTables
(
pScanInfo
,
tableIdList
,
GET_TASKID
(
pTaskInfo
)
);
qDebug
(
" %d qualified child tables added into stream scanner"
,
(
int32_t
)
taosArrayGetSize
(
qa
));
code
=
tqReaderAddTbUidList
(
pScanInfo
->
tqReader
,
qa
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
04b12ee7
...
...
@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <executorimpl.h>
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
...
...
@@ -4369,8 +4368,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
}
SOperatorInfo
*
createOperatorTree
(
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
queryId
,
uint64_t
taskId
,
STableListInfo
*
pTableListInfo
,
const
char
*
pUser
)
{
STableListInfo
*
pTableListInfo
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
const
char
*
pUser
)
{
int32_t
type
=
nodeType
(
pPhyNode
);
if
(
pPhyNode
->
pChildren
==
NULL
||
LIST_LENGTH
(
pPhyNode
->
pChildren
)
==
0
)
{
...
...
@@ -4378,7 +4376,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STableScanPhysiNode
*
pTableScanNode
=
(
STableScanPhysiNode
*
)
pPhyNode
;
int32_t
code
=
createScanTableListInfo
(
&
pTableScanNode
->
scan
,
pTableScanNode
->
pGroupTags
,
pTableScanNode
->
groupSort
,
pHandle
,
pTableListInfo
,
queryId
,
taskId
);
pTableScanNode
->
groupSort
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
GET_TASKID
(
pTaskInfo
)
);
if
(
code
)
{
pTaskInfo
->
code
=
code
;
return
NULL
;
...
...
@@ -4398,7 +4396,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
==
type
)
{
STableMergeScanPhysiNode
*
pTableScanNode
=
(
STableMergeScanPhysiNode
*
)
pPhyNode
;
int32_t
code
=
createScanTableListInfo
(
&
pTableScanNode
->
scan
,
pTableScanNode
->
pGroupTags
,
pTableScanNode
->
groupSort
,
pHandle
,
pTableListInfo
,
queryId
,
taskId
);
pTableScanNode
->
groupSort
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
GET_TASKID
(
pTaskInfo
)
);
if
(
code
)
{
pTaskInfo
->
code
=
code
;
return
NULL
;
...
...
@@ -4411,7 +4409,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
SOperatorInfo
*
pOperator
=
createTableMergeScanOperatorInfo
(
pTableScanNode
,
pTableListInfo
,
pHandle
,
pTaskInfo
,
queryId
,
taskId
);
createTableMergeScanOperatorInfo
(
pTableScanNode
,
pTableListInfo
,
pHandle
,
pTaskInfo
);
STableScanInfo
*
pScanInfo
=
pOperator
->
info
;
pTaskInfo
->
cost
.
pRecoder
=
&
pScanInfo
->
readRecorder
;
...
...
@@ -4428,15 +4426,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
};
if
(
pHandle
)
{
int32_t
code
=
createScanTableListInfo
(
&
pTableScanNode
->
scan
,
pTableScanNode
->
pGroupTags
,
pTableScanNode
->
groupSort
,
pHandle
,
pTableListInfo
,
queryId
,
taskId
);
pTableScanNode
->
groupSort
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
GET_TASKID
(
pTaskInfo
)
);
if
(
code
)
{
pTaskInfo
->
code
=
code
;
return
NULL
;
}
}
SOperatorInfo
*
pOperator
=
createStreamScanOperatorInfo
(
pHandle
,
pTableScanNode
,
pTaskInfo
,
&
twSup
,
queryId
,
taskId
);
SOperatorInfo
*
pOperator
=
createStreamScanOperatorInfo
(
pHandle
,
pTableScanNode
,
pTagCond
,
pTaskInfo
,
&
twSup
);
return
pOperator
;
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
==
type
)
{
...
...
@@ -4445,7 +4442,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
==
type
)
{
STagScanPhysiNode
*
pScanPhyNode
=
(
STagScanPhysiNode
*
)
pPhyNode
;
int32_t
code
=
getTableList
(
pHandle
->
meta
,
pHandle
->
vnode
,
pScanPhyNode
,
pTableListInfo
);
int32_t
code
=
getTableList
(
pHandle
->
meta
,
pHandle
->
vnode
,
pScanPhyNode
,
pTa
gCond
,
pTagIndexCond
,
pTa
bleListInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
terrno
;
return
NULL
;
...
...
@@ -4481,8 +4478,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN
==
type
)
{
SLastRowScanPhysiNode
*
pScanNode
=
(
SLastRowScanPhysiNode
*
)
pPhyNode
;
int32_t
code
=
createScanTableListInfo
(
&
pScanNode
->
scan
,
pScanNode
->
pGroupTags
,
true
,
pHandle
,
pTableListInfo
,
queryId
,
taskId
);
int32_t
code
=
createScanTableListInfo
(
&
pScanNode
->
scan
,
pScanNode
->
pGroupTags
,
true
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
code
;
return
NULL
;
...
...
@@ -4506,7 +4502,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SOperatorInfo
**
ops
=
taosMemoryCalloc
(
size
,
POINTER_BYTES
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SPhysiNode
*
pChildNode
=
(
SPhysiNode
*
)
nodesListGetNode
(
pPhyNode
->
pChildren
,
i
);
ops
[
i
]
=
createOperatorTree
(
pChildNode
,
pTaskInfo
,
pHandle
,
queryId
,
taskId
,
pTableListInfo
,
pUser
);
ops
[
i
]
=
createOperatorTree
(
pChildNode
,
pTaskInfo
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
pUser
);
if
(
ops
[
i
]
==
NULL
)
{
return
NULL
;
}
...
...
@@ -4689,6 +4685,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
return
pList
;
}
#if 0
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, const char* idstr) {
int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
...
...
@@ -4722,6 +4719,7 @@ _error:
terrno = code;
return NULL;
}
#endif
static
int32_t
extractTbscanInStreamOpTree
(
SOperatorInfo
*
pOperator
,
STableScanInfo
**
ppInfo
)
{
if
(
pOperator
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
...
...
@@ -4765,6 +4763,7 @@ int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
return
-
1
;
}
#if 0
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
STableScanInfo* pTableScanInfo = NULL;
if (extractTbscanInStreamOpTree(pOperator, &pTableScanInfo) < 0) {
...
...
@@ -4788,6 +4787,7 @@ int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHa
// TODO: set uid and ts to data reader
return 0;
}
#endif
int32_t
encodeOperator
(
SOperatorInfo
*
ops
,
char
**
result
,
int32_t
*
length
,
int32_t
*
nOptrWithVal
)
{
int32_t
code
=
TDB_CODE_SUCCESS
;
...
...
@@ -4939,10 +4939,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
}
(
*
pTaskInfo
)
->
sql
=
sql
;
(
*
pTaskInfo
)
->
tableqinfoList
.
pTagCond
=
pPlan
->
pTagCond
;
(
*
pTaskInfo
)
->
tableqinfoList
.
pTagIndexCond
=
pPlan
->
pTagIndexCond
;
(
*
pTaskInfo
)
->
pRoot
=
createOperatorTree
(
pPlan
->
pNode
,
*
pTaskInfo
,
pHandle
,
queryId
,
taskId
,
&
(
*
pTaskInfo
)
->
tableqinfoList
,
pPlan
->
user
);
(
*
pTaskInfo
)
->
pRoot
=
createOperatorTree
(
pPlan
->
pNode
,
*
pTaskInfo
,
pHandle
,
&
(
*
pTaskInfo
)
->
tableqinfoList
,
pPlan
->
pTagCond
,
pPlan
->
pTagIndexCond
,
pPlan
->
user
);
if
(
NULL
==
(
*
pTaskInfo
)
->
pRoot
)
{
code
=
(
*
pTaskInfo
)
->
code
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
04b12ee7
...
...
@@ -1497,9 +1497,8 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFree
(
pStreamScan
);
}
SOperatorInfo
*
createStreamScanOperatorInfo
(
SReadHandle
*
pHandle
,
STableScanPhysiNode
*
pTableScanNode
,
SExecTaskInfo
*
pTaskInfo
,
STimeWindowAggSupp
*
pTwSup
,
uint64_t
queryId
,
uint64_t
taskId
)
{
SOperatorInfo
*
createStreamScanOperatorInfo
(
SReadHandle
*
pHandle
,
STableScanPhysiNode
*
pTableScanNode
,
SNode
*
pTagCond
,
SExecTaskInfo
*
pTaskInfo
,
STimeWindowAggSupp
*
pTwSup
)
{
SStreamScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
...
...
@@ -1512,6 +1511,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
SDataBlockDescNode
*
pDescNode
=
pScanPhyNode
->
node
.
pOutputDataBlockDesc
;
pInfo
->
pTagCond
=
pTagCond
;
int32_t
numOfCols
=
0
;
pInfo
->
pColMatchInfo
=
extractColMatchInfo
(
pScanPhyNode
->
pScanCols
,
pDescNode
,
&
numOfCols
,
COL_MATCH_FROM_COL_ID
);
...
...
@@ -2550,26 +2551,19 @@ typedef struct STableMergeScanInfo {
int32_t
tableEndIndex
;
bool
hasGroupId
;
uint64_t
groupId
;
SArray
*
dataReaders
;
// array of tsdbReaderT*
SReadHandle
readHandle
;
int32_t
bufPageSize
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
SArray
*
dataReaders
;
// array of tsdbReaderT*
SReadHandle
readHandle
;
int32_t
bufPageSize
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
SSDataBlock
*
pSortInputBlock
;
int64_t
startTs
;
// sort start time
SArray
*
sortSourceParams
;
uint64_t
queryId
;
uint64_t
taskId
;
SSDataBlock
*
pSortInputBlock
;
int64_t
startTs
;
// sort start time
SArray
*
sortSourceParams
;
SFileBlockLoadRecorder
readRecorder
;
int64_t
numOfRows
;
// int32_t prevGroupId; // previous table group id
int64_t
numOfRows
;
SScanInfo
scanInfo
;
int32_t
scanTimes
;
SNode
*
pFilterNode
;
// filter info, which is push down by optimizer
...
...
@@ -2584,26 +2578,25 @@ typedef struct STableMergeScanInfo {
SExprInfo
*
pPseudoExpr
;
int32_t
numOfPseudoExpr
;
SqlFunctionCtx
*
pPseudoCtx
;
// int32_t* rowEntryInfoOffset;
SQueryTableDataCond
cond
;
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
int32_t
dataBlockLoadFlag
;
SInterval
interval
;
// if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded.
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
int32_t
dataBlockLoadFlag
;
// if the upstream is an interval operator, the interval info is also kept here to get the time
// window to check if current data block needs to be loaded.
SInterval
interval
;
SSampleExecInfo
sample
;
// sample execution info
}
STableMergeScanInfo
;
int32_t
createScanTableListInfo
(
SScanPhysiNode
*
pScanNode
,
SNodeList
*
pGroupTags
,
bool
groupSort
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
uint64_t
queryId
,
uint64_t
taskId
)
{
int32_t
code
=
getTableList
(
pHandle
->
meta
,
pHandle
->
vnode
,
pScanNode
,
pTableListInfo
);
STableListInfo
*
pTableListInfo
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
const
char
*
idStr
)
{
int32_t
code
=
getTableList
(
pHandle
->
meta
,
pHandle
->
vnode
,
pScanNode
,
pTa
gCond
,
pTagIndexCond
,
pTa
bleListInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
if
(
taosArrayGetSize
(
pTableListInfo
->
pTableList
)
==
0
)
{
qDebug
(
"no table qualified for query,
TID:0x%"
PRIx64
", QID:0x%"
PRIx64
,
taskId
,
queryId
);
qDebug
(
"no table qualified for query,
%s"
PRIx64
,
idStr
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -3027,8 +3020,7 @@ int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
}
SOperatorInfo
*
createTableMergeScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
STableListInfo
*
pTableListInfo
,
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
,
uint64_t
queryId
,
uint64_t
taskId
)
{
SReadHandle
*
readHandle
,
SExecTaskInfo
*
pTaskInfo
)
{
STableMergeScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
STableMergeScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
@@ -3067,9 +3059,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo
->
pResBlock
=
createResDataBlock
(
pDescNode
);
pInfo
->
dataReaders
=
taosArrayInit
(
64
,
POINTER_BYTES
);
pInfo
->
queryId
=
queryId
;
pInfo
->
taskId
=
taskId
;
pInfo
->
sortSourceParams
=
taosArrayInit
(
64
,
sizeof
(
STableMergeScanSortSourceParam
));
pInfo
->
pSortInfo
=
generateSortByTsInfo
(
pInfo
->
pColMatchInfo
,
pInfo
->
cond
.
order
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录