Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f990ceab
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f990ceab
编写于
10月 30, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: do some internal refactor.
上级
365e6eec
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
368 addition
and
320 deletion
+368
-320
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+14
-22
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-2
source/libs/executor/src/cachescanoperator.c
source/libs/executor/src/cachescanoperator.c
+13
-11
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+273
-23
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+20
-20
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+16
-174
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+31
-68
未找到文件。
source/libs/executor/inc/executil.h
浏览文件 @
f990ceab
...
...
@@ -95,27 +95,21 @@ typedef struct SColMatchInfo {
int32_t
matchType
;
// determinate the source according to col id or slot id
}
SColMatchInfo
;
// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly
// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups
typedef
struct
STableListInfo
{
bool
oneTableForEachGroup
;
int32_t
numOfOuputGroups
;
// the data block will be generated one by one
int32_t
*
groupOffset
;
// keep the offset value for each group in the tableList
SArray
*
pTableList
;
SHashObj
*
map
;
// speedup acquire the tableQueryInfo by table uid
uint64_t
suid
;
}
STableListInfo
;
void
destroyTableList
(
STableListInfo
*
pTableList
);
int32_t
getNumOfOutputGroups
(
const
STableListInfo
*
pTableList
);
bool
oneTableForEachGroup
(
const
STableListInfo
*
pTableList
);
uint64_t
getTableGroupId
(
const
STableListInfo
*
pTableList
,
uint64_t
tableUid
);
int32_t
addTableIntoTableList
(
STableListInfo
*
pTableList
,
uint64_t
uid
,
uint64_t
gid
);
int32_t
getTablesOfGroup
(
const
STableListInfo
*
pTableList
,
int32_t
ordinalIndex
,
STableKeyInfo
**
pKeyInfo
,
int32_t
*
num
);
uint64_t
getTotalTables
(
const
STableListInfo
*
pTableList
);
typedef
struct
STableListInfo
STableListInfo
;
struct
SqlFunctionCtx
;
STableListInfo
*
tableListCreate
();
void
*
tableListDestroy
(
STableListInfo
*
pTableListInfo
);
void
tableListClear
(
STableListInfo
*
pTableListInfo
);
int32_t
tableListGetOutputGroups
(
const
STableListInfo
*
pTableList
);
bool
oneTableForEachGroup
(
const
STableListInfo
*
pTableList
);
uint64_t
getTableGroupId
(
const
STableListInfo
*
pTableList
,
uint64_t
tableUid
);
int32_t
tableListAddTableInfo
(
STableListInfo
*
pTableList
,
uint64_t
uid
,
uint64_t
gid
);
int32_t
tableListGetGroupList
(
const
STableListInfo
*
pTableList
,
int32_t
ordinalIndex
,
STableKeyInfo
**
pKeyInfo
,
int32_t
*
num
);
uint64_t
tableListGetSize
(
const
STableListInfo
*
pTableList
);
uint64_t
tableListGetSuid
(
const
STableListInfo
*
pTableList
);
STableKeyInfo
*
tableListGetInfo
(
const
STableListInfo
*
pTableList
,
int32_t
index
);
size_t
getResultRowSize
(
struct
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
void
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
closeResultRow
(
SResultRow
*
pResultRow
);
...
...
@@ -128,6 +122,7 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
if
(
forUpdate
)
{
setBufPageDirty
(
bufPage
,
true
);
}
SResultRow
*
pRow
=
(
SResultRow
*
)((
char
*
)
bufPage
+
pos
->
offset
);
return
pRow
;
}
...
...
@@ -146,7 +141,6 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
int32_t
getTableList
(
void
*
metaHandle
,
void
*
pVnode
,
SScanPhysiNode
*
pScanNode
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
STableListInfo
*
pListInfo
);
int32_t
getGroupIdFromTagsVal
(
void
*
pMeta
,
uint64_t
uid
,
SNodeList
*
pGroupNode
,
char
*
keyBuf
,
uint64_t
*
pGroupId
);
int32_t
getColInfoResultForGroupby
(
void
*
metaHandle
,
SNodeList
*
group
,
STableListInfo
*
pTableListInfo
);
size_t
getTableTagsBufLen
(
const
SNodeList
*
pGroups
);
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
);
...
...
@@ -169,9 +163,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
void
cleanupQueryTableDataCond
(
SQueryTableDataCond
*
pCond
);
int32_t
convertFillType
(
int32_t
mode
);
int32_t
resultrowComparAsc
(
const
void
*
p1
,
const
void
*
p2
);
int32_t
isQualifiedTable
(
STableKeyInfo
*
info
,
SNode
*
pTagCond
,
void
*
metaHandle
,
bool
*
pQualified
);
#endif // TDENGINE_QUERYUTIL_H
source/libs/executor/inc/executorimpl.h
浏览文件 @
f990ceab
...
...
@@ -184,7 +184,7 @@ typedef struct SExecTaskInfo {
int64_t
version
;
// used for stream to record wal version
SStreamTaskInfo
streamInfo
;
SSchemaInfo
schemaInfo
;
STableListInfo
tableqi
nfoList
;
// this is a table list
STableListInfo
*
pTableI
nfoList
;
// this is a table list
const
char
*
sql
;
// query sql string
jmp_buf
env
;
// jump to this position when error happens.
EOPTR_EXEC_MODEL
execModel
;
// operator execution model [batch model|stream model]
...
...
@@ -1077,7 +1077,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
void
copyUpdateDataBlock
(
SSDataBlock
*
pDest
,
SSDataBlock
*
pSource
,
int32_t
tsColIndex
);
bool
groupbyTbname
(
SNodeList
*
pGroupList
);
int32_t
setGroupIdMapForAllTables
(
STableListInfo
*
pTableListInfo
,
SReadHandle
*
pHandle
,
SNodeList
*
group
,
bool
groupSort
);
void
*
destroySqlFunctionCtx
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
int32_t
buildDataBlockFromGroupRes
(
SOperatorInfo
*
pOperator
,
SStreamState
*
pState
,
SSDataBlock
*
pBlock
,
SExprSupp
*
pSup
,
SGroupResInfo
*
pGroupResInfo
);
...
...
source/libs/executor/src/cachescanoperator.c
浏览文件 @
f990ceab
...
...
@@ -59,22 +59,23 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
goto
_error
;
}
STableListInfo
*
pTableList
=
&
pTaskInfo
->
tableqi
nfoList
;
STableListInfo
*
pTableList
=
pTaskInfo
->
pTableI
nfoList
;
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
pInfo
->
pUidList
=
taosArrayInit
(
4
,
sizeof
(
int64_t
));
// partition by tbname, todo opt perf
if
(
oneTableForEachGroup
(
pTableList
)
||
(
getTotalTables
(
pTableList
)
==
1
))
{
if
(
oneTableForEachGroup
(
pTableList
)
||
(
tableListGetSize
(
pTableList
)
==
1
))
{
pInfo
->
retrieveType
=
CACHESCAN_RETRIEVE_TYPE_ALL
|
(
pScanNode
->
ignoreNull
?
CACHESCAN_RETRIEVE_LAST
:
CACHESCAN_RETRIEVE_LAST_ROW
);
STableKeyInfo
*
pList
=
taosArrayGet
(
pTableList
->
pTableList
,
0
);
size_t
num
=
taosArrayGetSize
(
pTableList
->
pTableList
);
STableKeyInfo
*
pList
=
tableListGetInfo
(
pTableList
,
0
);
size_t
num
=
tableListGetSize
(
pTableList
);
uint64_t
suid
=
tableListGetSuid
(
pTableList
);
code
=
tsdbCacherowsReaderOpen
(
pInfo
->
readHandle
.
vnode
,
pInfo
->
retrieveType
,
pList
,
num
,
taosArrayGetSize
(
pInfo
->
matchInfo
.
pList
),
pTableList
->
suid
,
&
pInfo
->
pLastrowReader
);
taosArrayGetSize
(
pInfo
->
matchInfo
.
pList
),
suid
,
&
pInfo
->
pLastrowReader
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -120,8 +121,10 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SLastrowScanInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
STableListInfo
*
pTableList
=
&
pTaskInfo
->
tableqinfoList
;
int32_t
size
=
taosArrayGetSize
(
pTableList
->
pTableList
);
STableListInfo
*
pTableList
=
pTaskInfo
->
pTableInfoList
;
uint64_t
suid
=
tableListGetSuid
(
pTableList
);
int32_t
size
=
tableListGetSize
(
pTableList
);
if
(
size
==
0
)
{
doSetOperatorCompleted
(
pOperator
);
return
NULL
;
...
...
@@ -184,20 +187,19 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
return
NULL
;
}
}
else
{
size_t
totalGroups
=
getNumOf
OutputGroups
(
pTableList
);
size_t
totalGroups
=
tableListGet
OutputGroups
(
pTableList
);
while
(
pInfo
->
currentGroupIndex
<
totalGroups
)
{
STableKeyInfo
*
pList
=
NULL
;
int32_t
num
=
0
;
int32_t
code
=
getTablesOfGroup
(
pTableList
,
pInfo
->
currentGroupIndex
,
&
pList
,
&
num
);
int32_t
code
=
tableListGetGroupList
(
pTableList
,
pInfo
->
currentGroupIndex
,
&
pList
,
&
num
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
tsdbCacherowsReaderOpen
(
pInfo
->
readHandle
.
vnode
,
pInfo
->
retrieveType
,
pList
,
num
,
taosArrayGetSize
(
pInfo
->
matchInfo
.
pList
),
pTableList
->
suid
,
&
pInfo
->
pLastrowReader
);
taosArrayGetSize
(
pInfo
->
matchInfo
.
pList
),
suid
,
&
pInfo
->
pLastrowReader
);
taosArrayClear
(
pInfo
->
pUidList
);
code
=
tsdbRetrieveCacheRows
(
pInfo
->
pLastrowReader
,
pInfo
->
pRes
,
pInfo
->
pSlotIds
,
pInfo
->
pUidList
);
...
...
source/libs/executor/src/executil.c
浏览文件 @
f990ceab
...
...
@@ -26,10 +26,30 @@
#include "executorimpl.h"
#include "tcompression.h"
static
int32_t
removeInvalidTable
(
SArray
*
list
,
SHashObj
*
tags
);
// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly
// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups
struct
STableListInfo
{
bool
oneTableForEachGroup
;
int32_t
numOfOuputGroups
;
// the data block will be generated one by one
int32_t
*
groupOffset
;
// keep the offset value for each group in the tableList
SArray
*
pTableList
;
SHashObj
*
map
;
// speedup acquire the tableQueryInfo by table uid
uint64_t
suid
;
};
typedef
struct
tagFilterAssist
{
SHashObj
*
colHash
;
int32_t
index
;
SArray
*
cInfoList
;
}
tagFilterAssist
;
static
int32_t
removeInvalidTable
(
SArray
*
uids
,
SHashObj
*
tags
);
static
int32_t
optimizeTbnameInCond
(
void
*
metaHandle
,
int64_t
suid
,
SArray
*
list
,
SNode
*
pTagCond
,
SHashObj
*
tags
);
static
int32_t
optimizeTbnameInCondImpl
(
void
*
metaHandle
,
int64_t
suid
,
SArray
*
list
,
SNode
*
pTagCond
);
static
int64_t
getLimit
(
const
SNode
*
pLimit
)
{
return
NULL
==
pLimit
?
-
1
:
((
SLimitNode
*
)
pLimit
)
->
limit
;
}
static
int64_t
getOffset
(
const
SNode
*
pLimit
)
{
return
NULL
==
pLimit
?
-
1
:
((
SLimitNode
*
)
pLimit
)
->
offset
;
}
void
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
)
{
pResultRowInfo
->
size
=
0
;
pResultRowInfo
->
cur
.
pageId
=
-
1
;
...
...
@@ -301,12 +321,6 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle,
return
TSDB_CODE_SUCCESS
;
}
typedef
struct
tagFilterAssist
{
SHashObj
*
colHash
;
int32_t
index
;
SArray
*
cInfoList
;
}
tagFilterAssist
;
static
EDealRes
getColumn
(
SNode
**
pNode
,
void
*
pContext
)
{
SColumnNode
*
pSColumnNode
=
NULL
;
if
(
QUERY_NODE_COLUMN
==
nodeType
((
*
pNode
)))
{
...
...
@@ -766,6 +780,7 @@ static int tableUidCompare(const void* a, const void* b) {
}
return
u1
<
u2
?
-
1
:
1
;
}
static
int32_t
optimizeTbnameInCond
(
void
*
metaHandle
,
int64_t
suid
,
SArray
*
list
,
SNode
*
cond
,
SHashObj
*
tags
)
{
int32_t
ret
=
-
1
;
if
(
nodeType
(
cond
)
==
QUERY_NODE_OPERATOR
)
{
...
...
@@ -820,6 +835,7 @@ static int32_t removeInvalidTable(SArray* uids, SHashObj* tags) {
taosArrayPush
(
validUid
,
uid
);
}
}
taosArraySwap
(
uids
,
validUid
);
taosArrayDestroy
(
validUid
);
return
0
;
...
...
@@ -1642,9 +1658,6 @@ bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) {
pLimitInfo
->
slimit
.
offset
!=
-
1
);
}
static
int64_t
getLimit
(
const
SNode
*
pLimit
)
{
return
NULL
==
pLimit
?
-
1
:
((
SLimitNode
*
)
pLimit
)
->
limit
;
}
static
int64_t
getOffset
(
const
SNode
*
pLimit
)
{
return
NULL
==
pLimit
?
-
1
:
((
SLimitNode
*
)
pLimit
)
->
offset
;
}
void
initLimitInfo
(
const
SNode
*
pLimit
,
const
SNode
*
pSLimit
,
SLimitInfo
*
pLimitInfo
)
{
SLimit
limit
=
{.
limit
=
getLimit
(
pLimit
),
.
offset
=
getOffset
(
pLimit
)};
SLimit
slimit
=
{.
limit
=
getLimit
(
pSLimit
),
.
offset
=
getOffset
(
pSLimit
)};
...
...
@@ -1655,11 +1668,23 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit
pLimitInfo
->
remainGroupOffset
=
slimit
.
offset
;
}
uint64_t
getTotalTables
(
const
STableListInfo
*
pTableList
)
{
uint64_t
tableListGetSize
(
const
STableListInfo
*
pTableList
)
{
ASSERT
(
taosArrayGetSize
(
pTableList
->
pTableList
)
==
taosHashGetSize
(
pTableList
->
map
));
return
taosArrayGetSize
(
pTableList
->
pTableList
);
}
uint64_t
tableListGetSuid
(
const
STableListInfo
*
pTableList
)
{
return
pTableList
->
suid
;
}
STableKeyInfo
*
tableListGetInfo
(
const
STableListInfo
*
pTableList
,
int32_t
index
)
{
if
(
taosArrayGetSize
(
pTableList
->
pTableList
)
==
0
)
{
return
NULL
;
}
return
taosArrayGet
(
pTableList
->
pTableList
,
index
);
}
uint64_t
getTableGroupId
(
const
STableListInfo
*
pTableList
,
uint64_t
tableUid
)
{
int32_t
*
slot
=
taosHashGet
(
pTableList
->
map
,
&
tableUid
,
sizeof
(
tableUid
));
ASSERT
(
pTableList
->
map
!=
NULL
&&
slot
!=
NULL
);
...
...
@@ -1670,7 +1695,7 @@ uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
return
pKeyInfo
->
groupId
;
}
int32_t
addTableIntoTableList
(
STableListInfo
*
pTableList
,
uint64_t
uid
,
uint64_t
gid
)
{
int32_t
tableListAddTableInfo
(
STableListInfo
*
pTableList
,
uint64_t
uid
,
uint64_t
gid
)
{
if
(
pTableList
->
map
==
NULL
)
{
ASSERT
(
taosArrayGetSize
(
pTableList
->
pTableList
)
==
0
);
pTableList
->
map
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_ENTRY_LOCK
);
...
...
@@ -1686,9 +1711,9 @@ int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t
return
TSDB_CODE_SUCCESS
;
}
int32_t
getTablesOfGroup
(
const
STableListInfo
*
pTableList
,
int32_t
ordinalGroupIndex
,
STableKeyInfo
**
pKeyInfo
,
int32_t
tableListGetGroupList
(
const
STableListInfo
*
pTableList
,
int32_t
ordinalGroupIndex
,
STableKeyInfo
**
pKeyInfo
,
int32_t
*
size
)
{
int32_t
total
=
getNumOf
OutputGroups
(
pTableList
);
int32_t
total
=
tableListGet
OutputGroups
(
pTableList
);
if
(
ordinalGroupIndex
<
0
||
ordinalGroupIndex
>=
total
)
{
return
TSDB_CODE_INVALID_PARA
;
}
...
...
@@ -1696,10 +1721,10 @@ int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupI
// here handle two special cases:
// 1. only one group exists, and 2. one table exists for each group.
if
(
total
==
1
)
{
*
size
=
getTotalTables
(
pTableList
);
*
size
=
tableListGetSize
(
pTableList
);
*
pKeyInfo
=
(
*
size
==
0
)
?
NULL
:
taosArrayGet
(
pTableList
->
pTableList
,
0
);
return
TSDB_CODE_SUCCESS
;
}
else
if
(
total
==
getTotalTables
(
pTableList
))
{
}
else
if
(
total
==
tableListGetSize
(
pTableList
))
{
*
size
=
1
;
*
pKeyInfo
=
taosArrayGet
(
pTableList
->
pTableList
,
ordinalGroupIndex
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1716,16 +1741,241 @@ int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupI
return
TSDB_CODE_SUCCESS
;
}
int32_t
getNumOf
OutputGroups
(
const
STableListInfo
*
pTableList
)
{
return
pTableList
->
numOfOuputGroups
;
}
int32_t
tableListGet
OutputGroups
(
const
STableListInfo
*
pTableList
)
{
return
pTableList
->
numOfOuputGroups
;
}
bool
oneTableForEachGroup
(
const
STableListInfo
*
pTableList
)
{
return
pTableList
->
oneTableForEachGroup
;
}
void
destroyTableList
(
STableListInfo
*
pTableqinfoList
)
{
pTableqinfoList
->
pTableList
=
taosArrayDestroy
(
pTableqinfoList
->
pTableList
);
taosMemoryFreeClear
(
pTableqinfoList
->
groupOffset
);
STableListInfo
*
tableListCreate
()
{
STableListInfo
*
pListInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
STableListInfo
));
if
(
pListInfo
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pListInfo
->
pTableList
=
taosArrayInit
(
4
,
sizeof
(
STableKeyInfo
));
if
(
pListInfo
->
pTableList
==
NULL
)
{
goto
_error
;
}
pListInfo
->
map
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_ENTRY_LOCK
);
if
(
pListInfo
->
map
==
NULL
)
{
goto
_error
;
}
pListInfo
->
numOfOuputGroups
=
1
;
return
pListInfo
;
_error:
tableListDestroy
(
pListInfo
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
void
*
tableListDestroy
(
STableListInfo
*
pTableListInfo
)
{
pTableListInfo
->
pTableList
=
taosArrayDestroy
(
pTableListInfo
->
pTableList
);
taosMemoryFreeClear
(
pTableListInfo
->
groupOffset
);
taosHashCleanup
(
pTableListInfo
->
map
);
pTableListInfo
->
pTableList
=
NULL
;
pTableListInfo
->
map
=
NULL
;
taosMemoryFree
(
pTableListInfo
);
return
NULL
;
}
void
tableListClear
(
STableListInfo
*
pTableListInfo
)
{
taosArrayClear
(
pTableListInfo
->
pTableList
);
taosHashClear
(
pTableListInfo
->
map
);
taosMemoryFree
(
pTableListInfo
->
groupOffset
);
pTableListInfo
->
numOfOuputGroups
=
1
;
pTableListInfo
->
oneTableForEachGroup
=
false
;
}
static
int32_t
orderbyGroupIdComparFn
(
const
void
*
p1
,
const
void
*
p2
)
{
STableKeyInfo
*
pInfo1
=
(
STableKeyInfo
*
)
p1
;
STableKeyInfo
*
pInfo2
=
(
STableKeyInfo
*
)
p2
;
if
(
pInfo1
->
groupId
==
pInfo2
->
groupId
)
{
return
0
;
}
else
{
return
pInfo1
->
groupId
<
pInfo2
->
groupId
?
-
1
:
1
;
}
}
static
int32_t
sortTableGroup
(
STableListInfo
*
pTableListInfo
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
taosArraySort
(
pTableListInfo
->
pTableList
,
orderbyGroupIdComparFn
);
int32_t
size
=
taosArrayGetSize
(
pTableListInfo
->
pTableList
);
SArray
*
pList
=
taosArrayInit
(
4
,
sizeof
(
int32_t
));
STableKeyInfo
*
pInfo
=
taosArrayGet
(
pTableListInfo
->
pTableList
,
0
);
uint64_t
gid
=
pInfo
->
groupId
;
taosHashCleanup
(
pTableqinfoList
->
map
);
int32_t
start
=
0
;
taosArrayPush
(
pList
,
&
start
);
pTableqinfoList
->
pTableList
=
NULL
;
pTableqinfoList
->
map
=
NULL
;
for
(
int32_t
i
=
1
;
i
<
size
;
++
i
)
{
pInfo
=
taosArrayGet
(
pTableListInfo
->
pTableList
,
i
);
if
(
pInfo
->
groupId
!=
gid
)
{
taosArrayPush
(
pList
,
&
i
);
gid
=
pInfo
->
groupId
;
}
}
pTableListInfo
->
numOfOuputGroups
=
taosArrayGetSize
(
pList
);
pTableListInfo
->
groupOffset
=
taosMemoryMalloc
(
sizeof
(
int32_t
)
*
pTableListInfo
->
numOfOuputGroups
);
memcpy
(
pTableListInfo
->
groupOffset
,
taosArrayGet
(
pList
,
0
),
sizeof
(
int32_t
)
*
pTableListInfo
->
numOfOuputGroups
);
taosArrayDestroy
(
pList
);
# if 0
SArray
*
sortSupport
=
taosArrayInit
(
16
,
sizeof
(
uint64_t
));
if
(
sortSupport
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
size_t
num
=
taosArrayGetSize
(
pTableListInfo
->
pTableList
);
for
(
int32_t
i
=
0
;
i
<
num
;
i
++
)
{
STableKeyInfo
*
info
=
taosArrayGet
(
pTableListInfo
->
pTableList
,
i
);
uint64_t
*
groupId
=
taosHashGet
(
pTableListInfo
->
map
,
&
info
->
uid
,
sizeof
(
uint64_t
));
int32_t
index
=
taosArraySearchIdx
(
sortSupport
,
groupId
,
compareUint64Val
,
TD_EQ
);
if
(
index
==
-
1
)
{
void
*
p
=
taosArraySearch
(
sortSupport
,
groupId
,
compareUint64Val
,
TD_GT
);
SArray
*
tGroup
=
taosArrayInit
(
8
,
sizeof
(
STableKeyInfo
));
if
(
tGroup
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
if
(
taosArrayPush
(
tGroup
,
info
)
==
NULL
)
{
qError
(
"taos push info array error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
if
(
p
==
NULL
)
{
if
(
taosArrayPush
(
sortSupport
,
groupId
)
==
NULL
)
{
qError
(
"taos push support array error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
if
(
taosArrayPush
(
pTableListInfo
->
pGroupList
,
&
tGroup
)
==
NULL
)
{
qError
(
"taos push group array error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
}
else
{
int32_t
pos
=
TARRAY_ELEM_IDX
(
sortSupport
,
p
);
if
(
taosArrayInsert
(
sortSupport
,
pos
,
groupId
)
==
NULL
)
{
qError
(
"taos insert support array error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
if
(
taosArrayInsert
(
pTableListInfo
->
pGroupList
,
pos
,
&
tGroup
)
==
NULL
)
{
qError
(
"taos insert group array error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
}
}
else
{
SArray
*
tGroup
=
(
SArray
*
)
taosArrayGetP
(
pTableListInfo
->
pGroupList
,
index
);
if
(
taosArrayPush
(
tGroup
,
info
)
==
NULL
)
{
qError
(
"taos push uid array error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
}
}
taosArrayDestroy
(
sortSupport
);
#endif
return
TDB_CODE_SUCCESS
;
_error:
// taosArrayDestroy(sortSupport);
return
code
;
}
int32_t
setGroupIdMapForAllTables
(
STableListInfo
*
pTableListInfo
,
SReadHandle
*
pHandle
,
SNodeList
*
group
,
bool
groupSort
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
ASSERT
(
pTableListInfo
->
map
!=
NULL
);
bool
groupByTbname
=
groupbyTbname
(
group
);
size_t
numOfTables
=
taosArrayGetSize
(
pTableListInfo
->
pTableList
);
if
(
group
==
NULL
||
groupByTbname
)
{
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
i
++
)
{
STableKeyInfo
*
info
=
taosArrayGet
(
pTableListInfo
->
pTableList
,
i
);
info
->
groupId
=
groupByTbname
?
info
->
uid
:
0
;
}
pTableListInfo
->
oneTableForEachGroup
=
groupByTbname
;
if
(
groupSort
&&
groupByTbname
)
{
taosArraySort
(
pTableListInfo
->
pTableList
,
orderbyGroupIdComparFn
);
pTableListInfo
->
numOfOuputGroups
=
numOfTables
;
}
else
{
pTableListInfo
->
numOfOuputGroups
=
1
;
}
}
else
{
code
=
getColInfoResultForGroupby
(
pHandle
->
meta
,
group
,
pTableListInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
if
(
groupSort
)
{
code
=
sortTableGroup
(
pTableListInfo
);
}
}
// add all table entry in the hash map
size_t
size
=
taosArrayGetSize
(
pTableListInfo
->
pTableList
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
STableKeyInfo
*
p
=
taosArrayGet
(
pTableListInfo
->
pTableList
,
i
);
taosHashPut
(
pTableListInfo
->
map
,
&
p
->
uid
,
sizeof
(
uint64_t
),
&
i
,
sizeof
(
int32_t
));
}
return
code
;
}
int32_t
createScanTableListInfo
(
SScanPhysiNode
*
pScanNode
,
SNodeList
*
pGroupTags
,
bool
groupSort
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
const
char
*
idStr
)
{
int64_t
st
=
taosGetTimestampUs
();
if
(
pHandle
==
NULL
)
{
qError
(
"invalid handle, in creating operator tree, %s"
,
idStr
);
return
TSDB_CODE_INVALID_PARA
;
}
int32_t
code
=
getTableList
(
pHandle
->
meta
,
pHandle
->
vnode
,
pScanNode
,
pTagCond
,
pTagIndexCond
,
pTableListInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to getTableList, code: %s"
,
tstrerror
(
code
));
return
code
;
}
pTableListInfo
->
numOfOuputGroups
=
1
;
int64_t
st1
=
taosGetTimestampUs
();
qDebug
(
"generate queried table list completed, elapsed time:%.2f ms %s"
,
(
st1
-
st
)
/
1000
.
0
,
idStr
);
if
(
taosArrayGetSize
(
pTableListInfo
->
pTableList
)
==
0
)
{
qDebug
(
"no table qualified for query, %s"
PRIx64
,
idStr
);
return
TSDB_CODE_SUCCESS
;
}
code
=
setGroupIdMapForAllTables
(
pTableListInfo
,
pHandle
,
pGroupTags
,
groupSort
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
int64_t
st2
=
taosGetTimestampUs
();
qDebug
(
"generate group id map completed, elapsed time:%.2f ms %s"
,
(
st2
-
st1
)
/
1000
.
0
,
idStr
);
return
TSDB_CODE_SUCCESS
;
}
\ No newline at end of file
source/libs/executor/src/executor.c
浏览文件 @
f990ceab
...
...
@@ -287,14 +287,11 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
int32_t
qUpdateQualifiedTableId
(
qTaskInfo_t
tinfo
,
const
SArray
*
tableIdList
,
bool
isAdd
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
STableListInfo
*
pListInfo
=
&
pTaskInfo
->
tableqinfoList
;
if
(
isAdd
)
{
qDebug
(
"add %d tables id into query list, %s"
,
(
int32_t
)
taosArrayGetSize
(
tableIdList
),
pTaskInfo
->
id
.
str
);
}
// traverse to the stream scanner node to add this table id
SOperatorInfo
*
pInfo
=
pTaskInfo
->
pRoot
;
while
(
pInfo
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
...
...
@@ -328,7 +325,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
}
}
STableListInfo
*
pTableListInfo
=
&
pTaskInfo
->
tableqi
nfoList
;
STableListInfo
*
pTableListInfo
=
pTaskInfo
->
pTableI
nfoList
;
for
(
int32_t
i
=
0
;
i
<
numOfQualifiedTables
;
++
i
)
{
uint64_t
*
uid
=
taosArrayGet
(
qa
,
i
);
...
...
@@ -361,7 +358,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
if (!exists) {
#endif
addTableIntoTableList
(
pTableListInfo
,
keyInfo
.
uid
,
keyInfo
.
groupId
);
tableListAddTableInfo
(
pTableListInfo
,
keyInfo
.
uid
,
keyInfo
.
groupId
);
}
if
(
keyBuf
!=
NULL
)
{
...
...
@@ -925,8 +922,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
int64_t
ts
=
pOffset
->
ts
;
if
(
uid
==
0
)
{
if
(
ta
osArrayGetSize
(
pTaskInfo
->
tableqinfoList
.
pTable
List
)
!=
0
)
{
STableKeyInfo
*
pTableInfo
=
ta
osArrayGet
(
pTaskInfo
->
tableqinfoList
.
pTable
List
,
0
);
if
(
ta
bleListGetSize
(
pTaskInfo
->
pTableInfo
List
)
!=
0
)
{
STableKeyInfo
*
pTableInfo
=
ta
bleListGetInfo
(
pTaskInfo
->
pTableInfo
List
,
0
);
uid
=
pTableInfo
->
uid
;
ts
=
INT64_MIN
;
}
else
{
...
...
@@ -937,7 +934,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pTableScanOp
->
info
;
int32_t
numOfTables
=
getTotalTables
(
&
pTaskInfo
->
tableqi
nfoList
);
int32_t
numOfTables
=
tableListGetSize
(
pTaskInfo
->
pTableI
nfoList
);
#ifndef NDEBUG
qDebug
(
"switch to next table %"
PRId64
" (cursor %d), %"
PRId64
" rows returned"
,
uid
,
...
...
@@ -947,7 +944,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
bool
found
=
false
;
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
i
++
)
{
STableKeyInfo
*
pTableInfo
=
ta
osArrayGet
(
pTaskInfo
->
tableqinfoList
.
pTable
List
,
i
);
STableKeyInfo
*
pTableInfo
=
ta
bleListGetInfo
(
pTaskInfo
->
pTableInfo
List
,
i
);
if
(
pTableInfo
->
uid
==
uid
)
{
found
=
true
;
pTableScanInfo
->
currentTable
=
i
;
...
...
@@ -959,8 +956,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
ASSERT
(
found
);
if
(
pTableScanInfo
->
dataReader
==
NULL
)
{
STableKeyInfo
*
pList
=
ta
osArrayGet
(
pTaskInfo
->
tableqinfoList
.
pTable
List
,
0
);
int32_t
num
=
getTotalTables
(
&
pTaskInfo
->
tableqi
nfoList
);
STableKeyInfo
*
pList
=
ta
bleListGetInfo
(
pTaskInfo
->
pTableInfo
List
,
0
);
int32_t
num
=
tableListGetSize
(
pTaskInfo
->
pTableI
nfoList
);
if
(
tsdbReaderOpen
(
pTableScanInfo
->
readHandle
.
vnode
,
&
pTableScanInfo
->
cond
,
pList
,
num
,
&
pTableScanInfo
->
dataReader
,
NULL
)
<
0
||
pTableScanInfo
->
dataReader
==
NULL
)
{
...
...
@@ -993,22 +990,25 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
SMetaTableInfo
mtInfo
=
getUidfromSnapShot
(
sContext
);
tsdbReaderClose
(
pInfo
->
dataReader
);
pInfo
->
dataReader
=
NULL
;
cleanupQueryTableDataCond
(
&
pTaskInfo
->
streamInfo
.
tableCond
);
taosArrayDestroy
(
pTaskInfo
->
tableqinfoList
.
pTableList
);
if
(
mtInfo
.
uid
==
0
)
return
0
;
// no data
tableListClear
(
pTaskInfo
->
pTableInfoList
);
if
(
mtInfo
.
uid
==
0
)
{
return
0
;
// no data
}
initQueryTableDataCondForTmq
(
&
pTaskInfo
->
streamInfo
.
tableCond
,
sContext
,
&
mtInfo
);
pTaskInfo
->
streamInfo
.
tableCond
.
twindows
.
skey
=
pOffset
->
ts
;
STableListInfo
*
pListInfo
=
&
pTaskInfo
->
tableqinfoList
;
pListInfo
->
pTableList
=
taosArrayInit
(
1
,
sizeof
(
STableKeyInfo
));
taosArrayPush
(
pListInfo
->
pTableList
,
&
(
STableKeyInfo
){.
uid
=
mtInfo
.
uid
,
.
groupId
=
0
});
STableListInfo
*
pListInfo
=
pTaskInfo
->
pTableInfoList
;
tableListAddTableInfo
(
pListInfo
,
mtInfo
.
uid
,
0
);
STableKeyInfo
*
pList
=
taosArrayGet
(
pListInfo
->
pTableList
,
0
);
STableKeyInfo
*
pList
=
tableListGetInfo
(
pListInfo
,
0
);
int32_t
size
=
tableListGetSize
(
pListInfo
);
ASSERT
(
size
==
1
);
tsdbReaderOpen
(
pInfo
->
vnode
,
&
pTaskInfo
->
streamInfo
.
tableCond
,
pList
,
taosArrayGetSize
(
pListInfo
->
pTableList
),
&
pInfo
->
dataReader
,
NULL
);
tsdbReaderOpen
(
pInfo
->
vnode
,
&
pTaskInfo
->
streamInfo
.
tableCond
,
pList
,
size
,
&
pInfo
->
dataReader
,
NULL
);
cleanupQueryTableDataCond
(
&
pTaskInfo
->
streamInfo
.
tableCond
);
strcpy
(
pTaskInfo
->
streamInfo
.
tbName
,
mtInfo
.
tbName
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
f990ceab
...
...
@@ -3273,6 +3273,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
pTaskInfo
->
cost
.
created
=
taosGetTimestampMs
();
pTaskInfo
->
id
.
queryId
=
queryId
;
pTaskInfo
->
execModel
=
model
;
pTaskInfo
->
pTableInfoList
=
tableListCreate
();
char
*
p
=
taosMemoryCalloc
(
1
,
128
);
snprintf
(
p
,
128
,
"TID:0x%"
PRIx64
" QID:0x%"
PRIx64
,
taskId
,
queryId
);
...
...
@@ -3364,117 +3365,6 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) {
static
void
cleanupStreamInfo
(
SStreamTaskInfo
*
pStreamInfo
)
{
tDeleteSSchemaWrapper
(
pStreamInfo
->
schema
);
}
static
int32_t
orderbyGroupIdComparFn
(
const
void
*
p1
,
const
void
*
p2
)
{
STableKeyInfo
*
pInfo1
=
(
STableKeyInfo
*
)
p1
;
STableKeyInfo
*
pInfo2
=
(
STableKeyInfo
*
)
p2
;
if
(
pInfo1
->
groupId
==
pInfo2
->
groupId
)
{
return
0
;
}
else
{
return
pInfo1
->
groupId
<
pInfo2
->
groupId
?
-
1
:
1
;
}
}
static
int32_t
sortTableGroup
(
STableListInfo
*
pTableListInfo
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
taosArraySort
(
pTableListInfo
->
pTableList
,
orderbyGroupIdComparFn
);
int32_t
size
=
taosArrayGetSize
(
pTableListInfo
->
pTableList
);
SArray
*
pList
=
taosArrayInit
(
4
,
sizeof
(
int32_t
));
STableKeyInfo
*
pInfo
=
taosArrayGet
(
pTableListInfo
->
pTableList
,
0
);
uint64_t
gid
=
pInfo
->
groupId
;
int32_t
start
=
0
;
taosArrayPush
(
pList
,
&
start
);
for
(
int32_t
i
=
1
;
i
<
size
;
++
i
)
{
pInfo
=
taosArrayGet
(
pTableListInfo
->
pTableList
,
i
);
if
(
pInfo
->
groupId
!=
gid
)
{
taosArrayPush
(
pList
,
&
i
);
gid
=
pInfo
->
groupId
;
}
}
pTableListInfo
->
numOfOuputGroups
=
taosArrayGetSize
(
pList
);
pTableListInfo
->
groupOffset
=
taosMemoryMalloc
(
sizeof
(
int32_t
)
*
pTableListInfo
->
numOfOuputGroups
);
memcpy
(
pTableListInfo
->
groupOffset
,
taosArrayGet
(
pList
,
0
),
sizeof
(
int32_t
)
*
pTableListInfo
->
numOfOuputGroups
);
taosArrayDestroy
(
pList
);
# if 0
SArray
*
sortSupport
=
taosArrayInit
(
16
,
sizeof
(
uint64_t
));
if
(
sortSupport
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
size_t
num
=
taosArrayGetSize
(
pTableListInfo
->
pTableList
);
for
(
int32_t
i
=
0
;
i
<
num
;
i
++
)
{
STableKeyInfo
*
info
=
taosArrayGet
(
pTableListInfo
->
pTableList
,
i
);
uint64_t
*
groupId
=
taosHashGet
(
pTableListInfo
->
map
,
&
info
->
uid
,
sizeof
(
uint64_t
));
int32_t
index
=
taosArraySearchIdx
(
sortSupport
,
groupId
,
compareUint64Val
,
TD_EQ
);
if
(
index
==
-
1
)
{
void
*
p
=
taosArraySearch
(
sortSupport
,
groupId
,
compareUint64Val
,
TD_GT
);
SArray
*
tGroup
=
taosArrayInit
(
8
,
sizeof
(
STableKeyInfo
));
if
(
tGroup
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
if
(
taosArrayPush
(
tGroup
,
info
)
==
NULL
)
{
qError
(
"taos push info array error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
if
(
p
==
NULL
)
{
if
(
taosArrayPush
(
sortSupport
,
groupId
)
==
NULL
)
{
qError
(
"taos push support array error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
if
(
taosArrayPush
(
pTableListInfo
->
pGroupList
,
&
tGroup
)
==
NULL
)
{
qError
(
"taos push group array error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
}
else
{
int32_t
pos
=
TARRAY_ELEM_IDX
(
sortSupport
,
p
);
if
(
taosArrayInsert
(
sortSupport
,
pos
,
groupId
)
==
NULL
)
{
qError
(
"taos insert support array error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
if
(
taosArrayInsert
(
pTableListInfo
->
pGroupList
,
pos
,
&
tGroup
)
==
NULL
)
{
qError
(
"taos insert group array error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
}
}
else
{
SArray
*
tGroup
=
(
SArray
*
)
taosArrayGetP
(
pTableListInfo
->
pGroupList
,
index
);
if
(
taosArrayPush
(
tGroup
,
info
)
==
NULL
)
{
qError
(
"taos push uid array error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
}
}
taosArrayDestroy
(
sortSupport
);
#endif
return
TDB_CODE_SUCCESS
;
_error:
// taosArrayDestroy(sortSupport);
return
code
;
}
bool
groupbyTbname
(
SNodeList
*
pGroupList
)
{
bool
bytbname
=
false
;
if
(
LIST_LENGTH
(
pGroupList
)
==
1
)
{
...
...
@@ -3488,52 +3378,6 @@ bool groupbyTbname(SNodeList* pGroupList) {
return
bytbname
;
}
int32_t
setGroupIdMapForAllTables
(
STableListInfo
*
pTableListInfo
,
SReadHandle
*
pHandle
,
SNodeList
*
group
,
bool
groupSort
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
pTableListInfo
->
map
=
taosHashInit
(
32
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_ENTRY_LOCK
);
if
(
pTableListInfo
->
map
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
return
code
;
}
bool
groupByTbname
=
groupbyTbname
(
group
);
size_t
numOfTables
=
taosArrayGetSize
(
pTableListInfo
->
pTableList
);
if
(
group
==
NULL
||
groupByTbname
)
{
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
i
++
)
{
STableKeyInfo
*
info
=
taosArrayGet
(
pTableListInfo
->
pTableList
,
i
);
info
->
groupId
=
groupByTbname
?
info
->
uid
:
0
;
}
pTableListInfo
->
oneTableForEachGroup
=
groupByTbname
;
if
(
groupSort
&&
groupByTbname
)
{
taosArraySort
(
pTableListInfo
->
pTableList
,
orderbyGroupIdComparFn
);
pTableListInfo
->
numOfOuputGroups
=
numOfTables
;
}
else
{
pTableListInfo
->
numOfOuputGroups
=
1
;
}
}
else
{
code
=
getColInfoResultForGroupby
(
pHandle
->
meta
,
group
,
pTableListInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
if
(
groupSort
)
{
code
=
sortTableGroup
(
pTableListInfo
);
}
}
// add all table entry in the hash map
size_t
size
=
taosArrayGetSize
(
pTableListInfo
->
pTableList
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
STableKeyInfo
*
p
=
taosArrayGet
(
pTableListInfo
->
pTableList
,
i
);
taosHashPut
(
pTableListInfo
->
map
,
&
p
->
uid
,
sizeof
(
uint64_t
),
&
i
,
sizeof
(
int32_t
));
}
return
code
;
}
static
int32_t
initTableblockDistQueryCond
(
uint64_t
uid
,
SQueryTableDataCond
*
pCond
)
{
memset
(
pCond
,
0
,
sizeof
(
SQueryTableDataCond
));
...
...
@@ -3629,11 +3473,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
#ifndef NDEBUG
int32_t
sz
=
ta
osArrayGetSize
(
pTableListInfo
->
pTableList
);
int32_t
sz
=
ta
bleListGetSize
(
pTableListInfo
);
qDebug
(
"create stream task, total:%d"
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
STableKeyInfo
*
pKeyInfo
=
ta
osArrayGet
(
pTableListInfo
->
pTableList
,
i
);
STableKeyInfo
*
pKeyInfo
=
ta
bleListGetInfo
(
pTableListInfo
,
i
);
qDebug
(
"add table uid:%"
PRIu64
", gid:%"
PRIu64
,
pKeyInfo
->
uid
,
pKeyInfo
->
groupId
);
}
#endif
...
...
@@ -3656,8 +3500,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pOperator
=
createTagScanOperatorInfo
(
pHandle
,
pScanPhyNode
,
pTableListInfo
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN
==
type
)
{
SBlockDistScanPhysiNode
*
pBlockNode
=
(
SBlockDistScanPhysiNode
*
)
pPhyNode
;
pTableListInfo
->
pTableList
=
taosArrayInit
(
4
,
sizeof
(
STableKeyInfo
));
pTableListInfo
->
numOfOuputGroups
=
1
;
if
(
pBlockNode
->
tableType
==
TSDB_SUPER_TABLE
)
{
SArray
*
pList
=
taosArrayInit
(
4
,
sizeof
(
STableKeyInfo
));
...
...
@@ -3667,13 +3509,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
ta
osArrayGetSize
(
pTableListInfo
->
pTableList
);
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
ta
bleListGetSize
(
pTableListInfo
);
++
i
)
{
STableKeyInfo
*
p
=
taosArrayGet
(
pList
,
i
);
addTableIntoTableList
(
pTableListInfo
,
p
->
uid
,
0
);
tableListAddTableInfo
(
pTableListInfo
,
p
->
uid
,
0
);
}
taosArrayDestroy
(
pList
);
}
else
{
// Create one table group.
addTableIntoTableList
(
pTableListInfo
,
pBlockNode
->
uid
,
0
);
tableListAddTableInfo
(
pTableListInfo
,
pBlockNode
->
uid
,
0
);
}
SQueryTableDataCond
cond
=
{
0
};
...
...
@@ -3683,11 +3525,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return
NULL
;
}
size_t
num
=
getTotalTables
(
pTableListInfo
);
void
*
pList
=
NULL
;
if
(
num
>
0
)
{
pList
=
taosArrayGet
(
pTableListInfo
->
pTableList
,
0
);
}
size_t
num
=
tableListGetSize
(
pTableListInfo
);
void
*
pList
=
tableListGetInfo
(
pTableListInfo
,
0
);
STsdbReader
*
pReader
=
NULL
;
tsdbReaderOpen
(
pHandle
->
vnode
,
&
cond
,
pList
,
num
,
&
pReader
,
""
);
...
...
@@ -4000,15 +3839,18 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
if
(
NULL
==
pDeleterParam
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
tbNum
=
taosArrayGetSize
(
pTask
->
tableqinfoList
.
pTableList
);
pDeleterParam
->
suid
=
pTask
->
tableqinfoList
.
suid
;
int32_t
tbNum
=
tableListGetSize
(
pTask
->
pTableInfoList
);
pDeleterParam
->
suid
=
tableListGetSuid
(
pTask
->
pTableInfoList
);
// TODO extract uid list
pDeleterParam
->
pUidList
=
taosArrayInit
(
tbNum
,
sizeof
(
uint64_t
));
if
(
NULL
==
pDeleterParam
->
pUidList
)
{
taosMemoryFree
(
pDeleterParam
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
for
(
int32_t
i
=
0
;
i
<
tbNum
;
++
i
)
{
STableKeyInfo
*
pTable
=
ta
osArrayGet
(
pTask
->
tableqinfoList
.
pTable
List
,
i
);
STableKeyInfo
*
pTable
=
ta
bleListGetInfo
(
pTask
->
pTableInfo
List
,
i
);
taosArrayPush
(
pDeleterParam
->
pUidList
,
&
pTable
->
uid
);
}
...
...
@@ -4044,7 +3886,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
sql
=
NULL
;
(
*
pTaskInfo
)
->
pSubplan
=
pPlan
;
(
*
pTaskInfo
)
->
pRoot
=
createOperatorTree
(
pPlan
->
pNode
,
*
pTaskInfo
,
pHandle
,
&
(
*
pTaskInfo
)
->
tableqi
nfoList
,
(
*
pTaskInfo
)
->
pRoot
=
createOperatorTree
(
pPlan
->
pNode
,
*
pTaskInfo
,
pHandle
,
(
*
pTaskInfo
)
->
pTableI
nfoList
,
pPlan
->
pTagCond
,
pPlan
->
pTagIndexCond
,
pPlan
->
user
);
if
(
NULL
==
(
*
pTaskInfo
)
->
pRoot
)
{
...
...
@@ -4064,7 +3906,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
)
{
qDebug
(
"%s execTask is freed"
,
GET_TASKID
(
pTaskInfo
));
destroyTableList
(
&
pTaskInfo
->
tableqi
nfoList
);
pTaskInfo
->
pTableInfoList
=
tableListDestroy
(
pTaskInfo
->
pTableI
nfoList
);
destroyOperatorInfo
(
pTaskInfo
->
pRoot
);
cleanupTableSchemaInfo
(
&
pTaskInfo
->
schemaInfo
);
cleanupStreamInfo
(
&
pTaskInfo
->
streamInfo
);
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
f990ceab
...
...
@@ -623,7 +623,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
pBlock
->
info
=
binfo
;
ASSERT
(
binfo
.
uid
!=
0
);
pBlock
->
info
.
groupId
=
getTableGroupId
(
&
pTaskInfo
->
tableqi
nfoList
,
pBlock
->
info
.
uid
);
pBlock
->
info
.
groupId
=
getTableGroupId
(
pTaskInfo
->
pTableI
nfoList
,
pBlock
->
info
.
uid
);
uint32_t
status
=
0
;
int32_t
code
=
loadDataBlock
(
pOperator
,
pTableScanInfo
,
pBlock
,
&
status
);
...
...
@@ -719,7 +719,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// scan table one by one sequentially
if
(
pInfo
->
scanMode
==
TABLE_SCAN__TABLE_ORDER
)
{
int32_t
numOfTables
=
ta
osArrayGetSize
(
pTaskInfo
->
tableqinfoList
.
pTable
List
);
int32_t
numOfTables
=
ta
bleListGetSize
(
pTaskInfo
->
pTableInfo
List
);
while
(
1
)
{
SSDataBlock
*
result
=
doTableScanGroup
(
pOperator
);
...
...
@@ -733,7 +733,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return
NULL
;
}
STableKeyInfo
*
pTableInfo
=
ta
osArrayGet
(
pTaskInfo
->
tableqinfoList
.
pTable
List
,
pInfo
->
currentTable
);
STableKeyInfo
*
pTableInfo
=
ta
bleListGetInfo
(
pTaskInfo
->
pTableInfo
List
,
pInfo
->
currentTable
);
tsdbSetTableList
(
pInfo
->
dataReader
,
pTableInfo
,
1
);
qDebug
(
"set uid:%"
PRIu64
" into scanner, total tables:%d, index:%d %s"
,
pTableInfo
->
uid
,
numOfTables
,
pInfo
->
currentTable
,
pTaskInfo
->
id
.
str
);
...
...
@@ -743,14 +743,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
}
}
else
{
// scan table group by group sequentially
if
(
pInfo
->
currentGroupId
==
-
1
)
{
if
((
++
pInfo
->
currentGroupId
)
>=
getNumOfOutputGroups
(
&
pTaskInfo
->
tableqi
nfoList
))
{
if
((
++
pInfo
->
currentGroupId
)
>=
tableListGetOutputGroups
(
pTaskInfo
->
pTableI
nfoList
))
{
doSetOperatorCompleted
(
pOperator
);
return
NULL
;
}
int32_t
num
=
0
;
STableKeyInfo
*
pList
=
NULL
;
getTablesOfGroup
(
&
pTaskInfo
->
tableqi
nfoList
,
pInfo
->
currentGroupId
,
&
pList
,
&
num
);
tableListGetGroupList
(
pTaskInfo
->
pTableI
nfoList
,
pInfo
->
currentGroupId
,
&
pList
,
&
num
);
ASSERT
(
pInfo
->
dataReader
==
NULL
);
int32_t
code
=
tsdbReaderOpen
(
pInfo
->
readHandle
.
vnode
,
&
pInfo
->
cond
,
pList
,
num
,
(
STsdbReader
**
)
&
pInfo
->
dataReader
,
...
...
@@ -766,7 +766,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return
result
;
}
if
((
++
pInfo
->
currentGroupId
)
>=
getNumOfOutputGroups
(
&
pTaskInfo
->
tableqi
nfoList
))
{
if
((
++
pInfo
->
currentGroupId
)
>=
tableListGetOutputGroups
(
pTaskInfo
->
pTableI
nfoList
))
{
doSetOperatorCompleted
(
pOperator
);
return
NULL
;
}
...
...
@@ -778,7 +778,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
int32_t
num
=
0
;
STableKeyInfo
*
pList
=
NULL
;
getTablesOfGroup
(
&
pTaskInfo
->
tableqi
nfoList
,
pInfo
->
currentGroupId
,
&
pList
,
&
num
);
tableListGetGroupList
(
pTaskInfo
->
pTableI
nfoList
,
pInfo
->
currentGroupId
,
&
pList
,
&
num
);
tsdbSetTableList
(
pInfo
->
dataReader
,
pList
,
num
);
tsdbReaderReset
(
pInfo
->
dataReader
,
&
pInfo
->
cond
);
...
...
@@ -1119,7 +1119,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
relocateColumnData
(
pBlock
,
pTableScanInfo
->
matchInfo
.
pList
,
pCols
,
true
);
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
);
pBlock
->
info
.
groupId
=
getTableGroupId
(
&
pTaskInfo
->
tableqi
nfoList
,
binfo
.
uid
);
pBlock
->
info
.
groupId
=
getTableGroupId
(
pTaskInfo
->
pTableI
nfoList
,
binfo
.
uid
);
}
tsdbReaderClose
(
pReader
);
...
...
@@ -1139,8 +1139,8 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
}
static
uint64_t
getGroupIdByUid
(
SStreamScanInfo
*
pInfo
,
uint64_t
uid
)
{
return
getTableGroupId
(
&
pInfo
->
pTableScanOp
->
pTaskInfo
->
tableqi
nfoList
,
uid
);
// SHashObj* map = pInfo->pTableScanOp->pTaskInfo->
tableqi
nfoList.map;
return
getTableGroupId
(
pInfo
->
pTableScanOp
->
pTaskInfo
->
pTableI
nfoList
,
uid
);
// SHashObj* map = pInfo->pTableScanOp->pTaskInfo->
pTableI
nfoList.map;
// uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t));
// if (groupId) {
// return *groupId;
...
...
@@ -1564,7 +1564,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
pInfo
->
pRes
->
info
.
type
=
STREAM_NORMAL
;
pInfo
->
pRes
->
info
.
version
=
pBlock
->
info
.
version
;
pInfo
->
pRes
->
info
.
groupId
=
getTableGroupId
(
&
pTaskInfo
->
tableqi
nfoList
,
pBlock
->
info
.
uid
);
pInfo
->
pRes
->
info
.
groupId
=
getTableGroupId
(
pTaskInfo
->
pTableI
nfoList
,
pBlock
->
info
.
uid
);
// todo extract method
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
matchInfo
.
pList
);
++
i
)
{
...
...
@@ -2076,12 +2076,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
}
static
SArray
*
extractTableIdList
(
const
STableListInfo
*
pTable
Group
Info
)
{
static
SArray
*
extractTableIdList
(
const
STableListInfo
*
pTable
List
Info
)
{
SArray
*
tableIdList
=
taosArrayInit
(
4
,
sizeof
(
uint64_t
));
// Transfer the Array of STableKeyInfo into uid list.
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pTableGroupInfo
->
pTableList
);
++
i
)
{
STableKeyInfo
*
pkeyInfo
=
taosArrayGet
(
pTableGroupInfo
->
pTableList
,
i
);
size_t
size
=
tableListGetSize
(
pTableListInfo
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
STableKeyInfo
*
pkeyInfo
=
tableListGetInfo
(
pTableListInfo
,
i
);
taosArrayPush
(
tableIdList
,
&
pkeyInfo
->
uid
);
}
...
...
@@ -2350,7 +2351,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
STableKeyInfo
*
pList
=
NULL
;
int32_t
num
=
0
;
getTablesOfGroup
(
&
pTaskInfo
->
tableqi
nfoList
,
0
,
&
pList
,
&
num
);
tableListGetGroupList
(
pTaskInfo
->
pTableI
nfoList
,
0
,
&
pList
,
&
num
);
if
(
pHandle
->
initTableReader
)
{
pTSInfo
->
scanMode
=
TABLE_SCAN__TABLE_ORDER
;
...
...
@@ -2383,7 +2384,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
// set the extract column id to streamHandle
tqReaderSetColIdList
(
pInfo
->
tqReader
,
pColIds
);
SArray
*
tableIdList
=
extractTableIdList
(
&
pTaskInfo
->
tableqi
nfoList
);
SArray
*
tableIdList
=
extractTableIdList
(
pTaskInfo
->
pTableI
nfoList
);
code
=
tqReaderSetTbUidList
(
pInfo
->
tqReader
,
tableIdList
);
if
(
code
!=
0
)
{
taosArrayDestroy
(
tableIdList
);
...
...
@@ -4082,7 +4083,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
SSDataBlock
*
pRes
=
pInfo
->
pRes
;
blockDataCleanup
(
pRes
);
int32_t
size
=
ta
osArrayGetSize
(
pInfo
->
pTableList
->
pTable
List
);
int32_t
size
=
ta
bleListGetSize
(
pTaskInfo
->
pTableInfo
List
);
if
(
size
==
0
)
{
setTaskStatus
(
pTaskInfo
,
TASK_COMPLETED
);
return
NULL
;
...
...
@@ -4094,7 +4095,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
metaReaderInit
(
&
mr
,
pInfo
->
readHandle
.
meta
,
0
);
while
(
pInfo
->
curPos
<
size
&&
count
<
pOperator
->
resultInfo
.
capacity
)
{
STableKeyInfo
*
item
=
ta
osArrayGet
(
pInfo
->
pTableList
->
pTableList
,
pInfo
->
curPos
);
STableKeyInfo
*
item
=
ta
bleListGetInfo
(
pInfo
->
pTableList
,
pInfo
->
curPos
);
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
item
->
uid
);
tDecoderClear
(
&
mr
.
coder
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -4209,47 +4210,10 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
return
NULL
;
}
int32_t
createScanTableListInfo
(
SScanPhysiNode
*
pScanNode
,
SNodeList
*
pGroupTags
,
bool
groupSort
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
const
char
*
idStr
)
{
int64_t
st
=
taosGetTimestampUs
();
if
(
pHandle
==
NULL
)
{
qError
(
"invalid handle, in creating operator tree, %s"
,
idStr
);
return
TSDB_CODE_INVALID_PARA
;
}
int32_t
code
=
getTableList
(
pHandle
->
meta
,
pHandle
->
vnode
,
pScanNode
,
pTagCond
,
pTagIndexCond
,
pTableListInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to getTableList, code: %s"
,
tstrerror
(
code
));
return
code
;
}
pTableListInfo
->
numOfOuputGroups
=
1
;
int64_t
st1
=
taosGetTimestampUs
();
qDebug
(
"generate queried table list completed, elapsed time:%.2f ms %s"
,
(
st1
-
st
)
/
1000
.
0
,
idStr
);
if
(
taosArrayGetSize
(
pTableListInfo
->
pTableList
)
==
0
)
{
qDebug
(
"no table qualified for query, %s"
PRIx64
,
idStr
);
return
TSDB_CODE_SUCCESS
;
}
code
=
setGroupIdMapForAllTables
(
pTableListInfo
,
pHandle
,
pGroupTags
,
groupSort
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
int64_t
st2
=
taosGetTimestampUs
();
qDebug
(
"generate group id map completed, elapsed time:%.2f ms %s"
,
(
st2
-
st1
)
/
1000
.
0
,
idStr
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
createMultipleDataReaders
(
SQueryTableDataCond
*
pQueryCond
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableListInfo
,
int32_t
tableStartIdx
,
int32_t
tableEndIdx
,
SArray
*
arrayReader
,
const
char
*
idstr
)
{
for
(
int32_t
i
=
tableStartIdx
;
i
<=
tableEndIdx
;
++
i
)
{
STableKeyInfo
*
pList
=
ta
osArrayGet
(
pTableListInfo
->
pTableList
,
i
);
STableKeyInfo
*
pList
=
ta
bleListGetInfo
(
pTableListInfo
,
i
);
STsdbReader
*
pReader
=
NULL
;
tsdbReaderOpen
(
pHandle
->
vnode
,
pQueryCond
,
pList
,
1
,
&
pReader
,
idstr
);
taosArrayPush
(
arrayReader
,
&
pReader
);
...
...
@@ -4262,7 +4226,7 @@ int32_t createMultipleDataReaders2(SQueryTableDataCond* pQueryCond, SReadHandle*
STableListInfo
*
pTableListInfo
,
int32_t
tableStartIdx
,
int32_t
tableEndIdx
,
STsdbReader
**
ppReader
,
const
char
*
idstr
)
{
STsdbReader
*
pReader
=
NULL
;
void
*
pStart
=
ta
osArrayGet
(
pTableListInfo
->
pTableList
,
tableStartIdx
);
void
*
pStart
=
ta
bleListGetInfo
(
pTableListInfo
,
tableStartIdx
);
int32_t
num
=
tableEndIdx
-
tableStartIdx
+
1
;
int32_t
code
=
tsdbReaderOpen
(
pHandle
->
vnode
,
pQueryCond
,
pStart
,
num
,
&
pReader
,
idstr
);
...
...
@@ -4522,7 +4486,7 @@ static SSDataBlock* getTableDataBlockTemp(void* param) {
int64_t
st
=
taosGetTimestampUs
();
void
*
p
=
taosArrayGet
(
pInfo
->
tableListInfo
->
pTableList
,
readIdx
+
pInfo
->
tableStartIndex
);
void
*
p
=
tableListGetInfo
(
pInfo
->
tableListInfo
,
readIdx
+
pInfo
->
tableStartIndex
);
SReadHandle
*
pHandle
=
&
pInfo
->
readHandle
;
tsdbReaderOpen
(
pHandle
->
vnode
,
pQueryCond
,
p
,
1
,
&
pInfo
->
pReader
,
GET_TASKID
(
pTaskInfo
));
...
...
@@ -4565,7 +4529,7 @@ static SSDataBlock* getTableDataBlockTemp(void* param) {
continue
;
}
pBlock
->
info
.
groupId
=
getTableGroupId
(
&
pOperator
->
pTaskInfo
->
tableqi
nfoList
,
pBlock
->
info
.
uid
);
pBlock
->
info
.
groupId
=
getTableGroupId
(
pOperator
->
pTaskInfo
->
pTableI
nfoList
,
pBlock
->
info
.
uid
);
pOperator
->
resultInfo
.
totalRows
+=
pBlock
->
info
.
rows
;
// pTableScanInfo->readRecorder.totalRows;
pTableScanInfo
->
readRecorder
.
elapsedTime
+=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
...
...
@@ -4622,7 +4586,7 @@ static SSDataBlock* getTableDataBlock2(void* param) {
continue
;
}
pBlock
->
info
.
groupId
=
getTableGroupId
(
&
pOperator
->
pTaskInfo
->
tableqi
nfoList
,
pBlock
->
info
.
uid
);
pBlock
->
info
.
groupId
=
getTableGroupId
(
pOperator
->
pTaskInfo
->
pTableI
nfoList
,
pBlock
->
info
.
uid
);
pOperator
->
resultInfo
.
totalRows
=
pTableScanInfo
->
readRecorder
.
totalRows
;
pTableScanInfo
->
readRecorder
.
elapsedTime
+=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
...
...
@@ -4676,7 +4640,7 @@ static SSDataBlock* getTableDataBlock(void* param) {
continue
;
}
pBlock
->
info
.
groupId
=
getTableGroupId
(
&
pOperator
->
pTaskInfo
->
tableqi
nfoList
,
pBlock
->
info
.
uid
);
pBlock
->
info
.
groupId
=
getTableGroupId
(
pOperator
->
pTaskInfo
->
pTableI
nfoList
,
pBlock
->
info
.
uid
);
pOperator
->
resultInfo
.
totalRows
=
pTableScanInfo
->
readRecorder
.
totalRows
;
pTableScanInfo
->
readRecorder
.
elapsedTime
+=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
...
...
@@ -4719,10 +4683,10 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
{
size_t
tableListSize
=
taosArrayGetSize
(
pInfo
->
tableListInfo
->
pTableList
);
size_t
numOfTables
=
tableListGetSize
(
pInfo
->
tableListInfo
);
int32_t
i
=
pInfo
->
tableStartIndex
+
1
;
for
(;
i
<
tableListSize
;
++
i
)
{
STableKeyInfo
*
tableKeyInfo
=
ta
osArrayGet
(
pInfo
->
tableListInfo
->
pTableList
,
i
);
for
(;
i
<
numOfTables
;
++
i
)
{
STableKeyInfo
*
tableKeyInfo
=
ta
bleListGetInfo
(
pInfo
->
tableListInfo
,
i
);
if
(
tableKeyInfo
->
groupId
!=
pInfo
->
groupId
)
{
break
;
}
...
...
@@ -4847,7 +4811,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
size_t
tableListSize
=
ta
osArrayGetSize
(
pInfo
->
tableListInfo
->
pTableList
);
size_t
tableListSize
=
ta
bleListGetSize
(
pInfo
->
tableListInfo
);
if
(
!
pInfo
->
hasGroupId
)
{
pInfo
->
hasGroupId
=
true
;
...
...
@@ -4856,7 +4820,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
return
NULL
;
}
pInfo
->
tableStartIndex
=
0
;
pInfo
->
groupId
=
((
STableKeyInfo
*
)
ta
osArrayGet
(
pInfo
->
tableListInfo
->
pTableList
,
pInfo
->
tableStartIndex
))
->
groupId
;
pInfo
->
groupId
=
((
STableKeyInfo
*
)
ta
bleListGetInfo
(
pInfo
->
tableListInfo
,
pInfo
->
tableStartIndex
))
->
groupId
;
startGroupTableMergeScan
(
pOperator
);
}
...
...
@@ -4875,8 +4839,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
break
;
}
pInfo
->
tableStartIndex
=
pInfo
->
tableEndIndex
+
1
;
pInfo
->
groupId
=
((
STableKeyInfo
*
)
taosArrayGet
(
pInfo
->
tableListInfo
->
pTableList
,
pInfo
->
tableStartIndex
))
->
groupId
;
pInfo
->
groupId
=
tableListGetInfo
(
pInfo
->
tableListInfo
,
pInfo
->
tableStartIndex
)
->
groupId
;
startGroupTableMergeScan
(
pOperator
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录