Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e6afcbf0
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e6afcbf0
编写于
6月 15, 2023
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat: hash join init version
上级
48f52d59
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
456 addition
and
24 deletion
+456
-24
include/libs/nodes/nodes.h
include/libs/nodes/nodes.h
+1
-0
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+11
-0
include/libs/qcom/query.h
include/libs/qcom/query.h
+5
-0
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+1
-1
source/libs/executor/inc/hashjoin.h
source/libs/executor/inc/hashjoin.h
+95
-0
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+1
-1
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+2
-2
source/libs/executor/src/hashjoinoperator.c
source/libs/executor/src/hashjoinoperator.c
+320
-0
source/libs/executor/src/mergejoinoperator.c
source/libs/executor/src/mergejoinoperator.c
+20
-20
未找到文件。
include/libs/nodes/nodes.h
浏览文件 @
e6afcbf0
...
...
@@ -246,6 +246,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN
,
QUERY_NODE_PHYSICAL_PLAN_PROJECT
,
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
,
QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN
,
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG
,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
,
QUERY_NODE_PHYSICAL_PLAN_MERGE
,
...
...
include/libs/nodes/plannodes.h
浏览文件 @
e6afcbf0
...
...
@@ -410,6 +410,17 @@ typedef struct SSortMergeJoinPhysiNode {
SNode
*
pColEqualOnConditions
;
}
SSortMergeJoinPhysiNode
;
typedef
struct
SHashJoinPhysiNode
{
SPhysiNode
node
;
EJoinType
joinType
;
SNodeList
*
pOnLeft
;
SNodeList
*
pOnRight
;
SNode
*
pOnConditions
;
SNode
*
pFilterConditions
;
SNodeList
*
pTargets
;
SQueryStat
inputStat
[
2
];
}
SHashJoinPhysiNode
;
typedef
struct
SAggPhysiNode
{
SPhysiNode
node
;
SNodeList
*
pExprs
;
// these are expression list of group_by_clause and parameter expression of aggregate function
...
...
include/libs/qcom/query.h
浏览文件 @
e6afcbf0
...
...
@@ -212,6 +212,11 @@ typedef struct SQueryNodeStat {
int32_t
tableNum
;
// vg table number, unit is TSDB_TABLE_NUM_UNIT
}
SQueryNodeStat
;
typedef
struct
SQueryStat
{
int64_t
inputRowNum
;
int32_t
inputRowSize
;
}
SQueryStat
;
int32_t
initTaskQueue
();
int32_t
cleanupTaskQueue
();
...
...
source/libs/executor/inc/executil.h
浏览文件 @
e6afcbf0
...
...
@@ -162,7 +162,7 @@ int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode
size_t
getTableTagsBufLen
(
const
SNodeList
*
pGroups
);
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
);
SArray
*
extractPartitionColInfo
(
SNodeList
*
pNodeList
);
SArray
*
makeColumnArrayFromList
(
SNodeList
*
pNodeList
);
int32_t
extractColMatchInfo
(
SNodeList
*
pNodeList
,
SDataBlockDescNode
*
pOutputNodeList
,
int32_t
*
numOfOutputCols
,
int32_t
type
,
SColMatchInfo
*
pMatchInfo
);
...
...
source/libs/executor/inc/hashjoin.h
0 → 100755
浏览文件 @
e6afcbf0
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_HASHJOIN_H
#define TDENGINE_HASHJOIN_H
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
struct
SHJoinRowCtx
{
bool
rowRemains
;
int64_t
ts
;
SArray
*
leftRowLocations
;
SArray
*
leftCreatedBlocks
;
SArray
*
rightCreatedBlocks
;
int32_t
leftRowIdx
;
int32_t
rightRowIdx
;
bool
rightUseBuildTable
;
SArray
*
rightRowLocations
;
}
SHJoinRowCtx
;
typedef
struct
SRowLocation
{
SSDataBlock
*
pDataBlock
;
int32_t
pos
;
}
SRowLocation
;
typedef
struct
SColBufInfo
{
int32_t
slotId
;
bool
vardata
;
int32_t
*
offset
;
int32_t
bytes
;
char
*
data
;
}
SColBufInfo
;
typedef
struct
SJoinTableInfo
{
int32_t
keyNum
;
SColBufInfo
*
keyCols
;
char
*
keyBuf
;
int32_t
valNum
;
SColBufInfo
*
valCols
;
char
*
valBuf
;
}
SJoinTableInfo
;
typedef
struct
SHJoinOperatorInfo
{
SSDataBlock
*
pRes
;
int32_t
joinType
;
SJoinTableInfo
tbs
[
2
];
SJoinTableInfo
*
pBuild
;
SJoinTableInfo
*
pProbe
;
int32_t
pLeftKeyNum
;
SColBufInfo
*
pLeftKeyInfo
;
char
*
pLeftKeyBuf
;
int32_t
pLeftValNum
;
SColBufInfo
*
pLeftValInfo
;
char
*
pLeftValBuf
;
int32_t
pRightKeyNum
;
SColBufInfo
*
pRightKeyInfo
;
char
*
pRightKeyBuf
;
int32_t
pRightValNum
;
SColBufInfo
*
pRightValInfo
;
char
*
pRightValBuf
;
SNode
*
pCondAfterJoin
;
SSHashObj
*
pKeyHash
;
SQueryStat
inputStat
[
2
];
SHJoinRowCtx
rowCtx
;
}
SHJoinOperatorInfo
;
static
SSDataBlock
*
doHashJoin
(
struct
SOperatorInfo
*
pOperator
);
static
void
destroyHashJoinOperator
(
void
*
param
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_HASHJOIN_H
source/libs/executor/src/executil.c
浏览文件 @
e6afcbf0
...
...
@@ -1246,7 +1246,7 @@ int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode,
return
TSDB_CODE_SUCCESS
;
}
SArray
*
extractPartitionColInfo
(
SNodeList
*
pNodeList
)
{
SArray
*
makeColumnArrayFromList
(
SNodeList
*
pNodeList
)
{
if
(
!
pNodeList
)
{
return
NULL
;
}
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
e6afcbf0
...
...
@@ -846,7 +846,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pPartNode
->
pTargets
,
NULL
,
&
numOfCols
);
pInfo
->
pGroupCols
=
extractPartitionColInfo
(
pPartNode
->
pPartitionKeys
);
pInfo
->
pGroupCols
=
makeColumnArrayFromList
(
pPartNode
->
pPartitionKeys
);
if
(
pPartNode
->
pExprs
!=
NULL
)
{
int32_t
num
=
0
;
...
...
@@ -1240,7 +1240,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
goto
_error
;
}
pInfo
->
partitionSup
.
pGroupCols
=
extractPartitionColInfo
(
pPartNode
->
part
.
pPartitionKeys
);
pInfo
->
partitionSup
.
pGroupCols
=
makeColumnArrayFromList
(
pPartNode
->
part
.
pPartitionKeys
);
if
(
pPartNode
->
part
.
pExprs
!=
NULL
)
{
int32_t
num
=
0
;
...
...
source/libs/executor/src/hashjoinoperator.c
0 → 100755
浏览文件 @
e6afcbf0
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "operator.h"
#include "os.h"
#include "querynodes.h"
#include "querytask.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "thash.h"
#include "tmsg.h"
#include "ttypes.h"
#include "hashjoin.h"
int32_t
initJoinKeyBufInfo
(
SColBufInfo
**
ppInfo
,
int32_t
*
colNum
,
SNodeList
*
pList
,
char
**
ppBuf
)
{
*
colNum
=
LIST_LENGTH
(
pList
);
(
*
ppInfo
)
=
taosMemoryMalloc
((
*
colNum
)
*
sizeof
(
SColBufInfo
));
if
(
NULL
==
(
*
ppInfo
))
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int64_t
bufSize
=
0
;
for
(
int32_t
i
=
0
;
i
<
*
colNum
;
++
i
)
{
SColumnNode
*
pColNode
=
(
SColumnNode
*
)
nodesListGetNode
(
pList
,
i
);
(
*
ppInfo
)
->
slotId
=
pColNode
->
slotId
;
(
*
ppInfo
)
->
vardata
=
IS_VAR_DATA_TYPE
(
pColNode
->
node
.
resType
.
type
);
(
*
ppInfo
)
->
bytes
=
pColNode
->
node
.
resType
.
bytes
;
bufSize
+=
pColNode
->
node
.
resType
.
bytes
;
}
*
ppBuf
=
taosMemoryMalloc
(
bufSize
);
if
(
NULL
==
*
ppBuf
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
initJoinTableInfo
(
SHJoinOperatorInfo
*
pJoin
,
SNodeList
*
pKeyList
,
int32_t
idx
)
{
SJoinTableInfo
*
pTable
=
&
pJoin
->
tbs
[
idx
];
int32_t
code
=
initJoinKeyBufInfo
(
&
pTable
->
keyCols
,
&
pTable
->
keyNum
,
pKeyList
,
&
pTable
->
keyBuf
);
if
(
code
)
{
return
code
;
}
}
SOperatorInfo
*
createHashJoinOperatorInfo
(
SOperatorInfo
**
pDownstream
,
int32_t
numOfDownstream
,
SHashJoinPhysiNode
*
pJoinNode
,
SExecTaskInfo
*
pTaskInfo
)
{
SHJoinOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SHJoinOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
pOperator
==
NULL
||
pInfo
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
int32_t
numOfCols
=
0
;
pInfo
->
pRes
=
createDataBlockFromDescNode
(
pJoinNode
->
node
.
pOutputDataBlockDesc
);
SExprInfo
*
pExprInfo
=
createExprInfo
(
pJoinNode
->
pTargets
,
NULL
,
&
numOfCols
);
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
setOperatorInfo
(
pOperator
,
"HashJoinOperator"
,
QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
initJoinTableInfo
(
pJoinNode
,
pJoinNode
->
pOnLeft
,
0
);
initJoinTableInfo
(
pJoinNode
,
pJoinNode
->
pOnRight
,
1
);
code
=
initJoinColBufInfo
(
&
pInfo
->
pRightKeyInfo
,
&
pInfo
->
pRightKeyNum
,
pJoinNode
->
pOnRight
,
&
pInfo
->
pRightKeyBuf
);
if
(
code
)
{
goto
_error
;
}
memcpy
(
pInfo
->
inputStat
,
pJoinNode
->
inputStat
,
sizeof
(
pJoinNode
->
inputStat
));
size_t
hashCap
=
pInfo
->
inputStat
[
1
].
inputRowNum
>
0
?
(
pInfo
->
inputStat
[
1
].
inputRowNum
*
1
.
5
)
:
1024
;
pInfo
->
pKeyHash
=
tSimpleHashInit
(
hashCap
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
));
if
(
pInfo
->
pKeyHash
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
if
(
pJoinNode
->
pFilterConditions
!=
NULL
&&
pJoinNode
->
node
.
pConditions
!=
NULL
)
{
pInfo
->
pCondAfterJoin
=
nodesMakeNode
(
QUERY_NODE_LOGIC_CONDITION
);
if
(
pInfo
->
pCondAfterJoin
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
SLogicConditionNode
*
pLogicCond
=
(
SLogicConditionNode
*
)(
pInfo
->
pCondAfterJoin
);
pLogicCond
->
pParameterList
=
nodesMakeList
();
if
(
pLogicCond
->
pParameterList
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
nodesListMakeAppend
(
&
pLogicCond
->
pParameterList
,
nodesCloneNode
(
pJoinNode
->
pFilterConditions
));
nodesListMakeAppend
(
&
pLogicCond
->
pParameterList
,
nodesCloneNode
(
pJoinNode
->
node
.
pConditions
));
pLogicCond
->
condType
=
LOGIC_COND_TYPE_AND
;
}
else
if
(
pJoinNode
->
pFilterConditions
!=
NULL
)
{
pInfo
->
pCondAfterJoin
=
nodesCloneNode
(
pJoinNode
->
pFilterConditions
);
}
else
if
(
pJoinNode
->
node
.
pConditions
!=
NULL
)
{
pInfo
->
pCondAfterJoin
=
nodesCloneNode
(
pJoinNode
->
node
.
pConditions
);
}
else
{
pInfo
->
pCondAfterJoin
=
NULL
;
}
code
=
filterInitFromNode
(
pInfo
->
pCondAfterJoin
,
&
pOperator
->
exprSupp
.
pFilterInfo
,
0
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
doHashJoin
,
NULL
,
destroHashJoinOperator
,
optrDefaultBufFn
,
NULL
);
code
=
appendDownstream
(
pOperator
,
pDownstream
,
numOfDownstream
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
return
pOperator
;
_error:
if
(
pInfo
!=
NULL
)
{
destroyMergeJoinOperator
(
pInfo
);
}
taosMemoryFree
(
pOperator
);
pTaskInfo
->
code
=
code
;
return
NULL
;
}
void
destroHashJoinOperator
(
void
*
param
)
{
SHJoinOperatorInfo
*
pJoinOperator
=
(
SHJoinOperatorInfo
*
)
param
;
if
(
pJoinOperator
->
pColEqualOnConditions
!=
NULL
)
{
mergeJoinDestoryBuildTable
(
pJoinOperator
->
rightBuildTable
);
taosMemoryFreeClear
(
pJoinOperator
->
rightEqOnCondKeyBuf
);
taosArrayDestroy
(
pJoinOperator
->
rightEqOnCondCols
);
taosMemoryFreeClear
(
pJoinOperator
->
leftEqOnCondKeyBuf
);
taosArrayDestroy
(
pJoinOperator
->
leftEqOnCondCols
);
}
nodesDestroyNode
(
pJoinOperator
->
pCondAfterMerge
);
taosArrayDestroy
(
pJoinOperator
->
rowCtx
.
leftCreatedBlocks
);
taosArrayDestroy
(
pJoinOperator
->
rowCtx
.
rightCreatedBlocks
);
taosArrayDestroy
(
pJoinOperator
->
rowCtx
.
leftRowLocations
);
taosArrayDestroy
(
pJoinOperator
->
rowCtx
.
rightRowLocations
);
pJoinOperator
->
pRes
=
blockDataDestroy
(
pJoinOperator
->
pRes
);
taosMemoryFreeClear
(
param
);
}
static
void
doHashJoinImpl
(
struct
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pRes
)
{
SHJoinOperatorInfo
*
pJoinInfo
=
pOperator
->
info
;
int32_t
nrows
=
pRes
->
info
.
rows
;
bool
asc
=
(
pJoinInfo
->
inputOrder
==
TSDB_ORDER_ASC
)
?
true
:
false
;
while
(
1
)
{
int64_t
leftTs
=
0
;
int64_t
rightTs
=
0
;
if
(
pJoinInfo
->
rowCtx
.
rowRemains
)
{
leftTs
=
pJoinInfo
->
rowCtx
.
ts
;
rightTs
=
pJoinInfo
->
rowCtx
.
ts
;
}
else
{
bool
hasNextTs
=
mergeJoinGetNextTimestamp
(
pOperator
,
&
leftTs
,
&
rightTs
);
if
(
!
hasNextTs
)
{
break
;
}
}
if
(
leftTs
==
rightTs
)
{
mergeJoinJoinDownstreamTsRanges
(
pOperator
,
leftTs
,
pRes
,
&
nrows
);
}
else
if
((
asc
&&
leftTs
<
rightTs
)
||
(
!
asc
&&
leftTs
>
rightTs
))
{
pJoinInfo
->
leftPos
+=
1
;
if
(
pJoinInfo
->
leftPos
>=
pJoinInfo
->
pLeft
->
info
.
rows
&&
pRes
->
info
.
rows
<
pOperator
->
resultInfo
.
threshold
)
{
continue
;
}
}
else
if
((
asc
&&
leftTs
>
rightTs
)
||
(
!
asc
&&
leftTs
<
rightTs
))
{
pJoinInfo
->
rightPos
+=
1
;
if
(
pJoinInfo
->
rightPos
>=
pJoinInfo
->
pRight
->
info
.
rows
&&
pRes
->
info
.
rows
<
pOperator
->
resultInfo
.
threshold
)
{
continue
;
}
}
// the pDataBlock are always the same one, no need to call this again
pRes
->
info
.
rows
=
nrows
;
pRes
->
info
.
dataLoad
=
1
;
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
{
break
;
}
}
}
int32_t
setColBufInfo
(
SSDataBlock
*
pBlock
,
int32_t
colNum
,
SColBufInfo
*
pColList
)
{
for
(
int32_t
i
=
0
;
i
<
colNum
;
++
i
)
{
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pColList
[
i
].
slotId
);
if
(
pColList
[
i
].
vardata
!=
IS_VAR_DATA_TYPE
(
pCol
->
info
.
type
))
{
qError
(
"column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d"
,
i
,
pColList
[
i
].
slotId
,
pCol
->
info
.
type
,
pColList
[
i
].
vardata
);
return
TSDB_CODE_INVALID_PARA
;
}
if
(
pColList
[
i
].
bytes
!=
IS_VAR_DATA_TYPE
(
pCol
->
info
.
bytes
))
{
qError
(
"column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d"
,
i
,
pColList
[
i
].
slotId
,
pCol
->
info
.
bytes
,
pColList
[
i
].
bytes
);
return
TSDB_CODE_INVALID_PARA
;
}
pColList
[
i
].
data
=
pCol
->
pData
;
if
(
pColList
[
i
].
vardata
)
{
pColList
[
i
].
offset
=
pCol
->
varmeta
.
offset
;
}
}
return
TSDB_CODE_SUCCESS
;
}
FORCE_INLINE
void
copyColDataToBuf
(
int32_t
colNum
,
int32_t
rowIdx
,
SColBufInfo
*
pColList
,
char
*
pBuf
,
size_t
*
bufLen
)
{
char
*
pData
=
NULL
;
*
bufLen
=
0
;
for
(
int32_t
i
=
0
;
i
<
colNum
;
++
i
)
{
if
(
pColList
[
i
].
vardata
)
{
pData
=
pColList
[
i
].
data
+
pColList
[
i
].
offset
[
rowIdx
];
memcpy
(
pBuf
+
*
bufLen
,
pData
,
varDataTLen
(
pData
));
*
bufLen
+=
varDataTLen
(
pData
);
}
}
}
int32_t
addBlockRowsKeyToHash
(
SSDataBlock
*
pBlock
,
SHJoinOperatorInfo
*
pJoin
)
{
int32_t
code
=
setColBufInfo
(
pBlock
,
pJoin
->
pRightKeyNum
,
pJoin
->
pRightKeyInfo
);
if
(
code
)
{
return
code
;
}
size_t
bufLen
=
0
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
copyColDataToBuf
(
pJoin
->
pRightKeyNum
,
i
,
pJoin
->
pRightKeyInfo
,
pJoin
->
pRightKeyBuf
,
&
bufLen
);
}
tSimpleHashPut
(
pJoin
->
pKeyHash
,
pJoin
->
pRightKeyBuf
,
bufLen
,
NULL
,
0
);
}
int32_t
buildJoinKeyHash
(
struct
SOperatorInfo
*
pOperator
)
{
SHJoinOperatorInfo
*
pJoin
=
pOperator
->
info
;
SSDataBlock
*
pBlock
=
NULL
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
while
(
true
)
{
pBlock
=
pOperator
->
pDownstream
[
1
]
->
fpSet
.
getNextFn
(
pOperator
->
pDownstream
[
1
]);
if
(
NULL
==
pBlock
)
{
break
;
}
code
=
addBlockRowsKeyToHash
(
pBlock
,
pJoin
);
if
(
code
)
{
return
code
;
}
}
return
TSDB_CODE_SUCCESS
;
}
SSDataBlock
*
doHashJoin
(
struct
SOperatorInfo
*
pOperator
)
{
SHJoinOperatorInfo
*
pJoinInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SSDataBlock
*
pRes
=
pJoinInfo
->
pRes
;
blockDataCleanup
(
pRes
);
if
(
NULL
==
pJoinInfo
->
pKeyHash
)
{
code
=
buildJoinKeyHash
(
pJoinInfo
);
if
(
code
)
{
pTaskInfo
->
code
=
code
;
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
if
(
tSimpleHashGetSize
(
pJoinInfo
->
pKeyHash
)
<=
0
)
{
setTaskStatus
(
pOperator
->
pTaskInfo
,
TASK_COMPLETED
);
return
NULL
;
}
}
while
(
true
)
{
int32_t
numOfRowsBefore
=
pRes
->
info
.
rows
;
doHashJoinImpl
(
pOperator
,
pRes
);
int32_t
numOfNewRows
=
pRes
->
info
.
rows
-
numOfRowsBefore
;
if
(
numOfNewRows
==
0
)
{
break
;
}
if
(
pOperator
->
exprSupp
.
pFilterInfo
!=
NULL
)
{
doFilter
(
pRes
,
pOperator
->
exprSupp
.
pFilterInfo
,
NULL
);
}
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
{
break
;
}
}
return
(
pRes
->
info
.
rows
>
0
)
?
pRes
:
NULL
;
}
source/libs/executor/src/joinoperator.c
→
source/libs/executor/src/
merge
joinoperator.c
浏览文件 @
e6afcbf0
...
...
@@ -26,7 +26,7 @@
#include "tmsg.h"
#include "ttypes.h"
typedef
struct
SJoinRowCtx
{
typedef
struct
S
M
JoinRowCtx
{
bool
rowRemains
;
int64_t
ts
;
SArray
*
leftRowLocations
;
...
...
@@ -37,9 +37,9 @@ typedef struct SJoinRowCtx {
bool
rightUseBuildTable
;
SArray
*
rightRowLocations
;
}
SJoinRowCtx
;
}
S
M
JoinRowCtx
;
typedef
struct
SJoinOperatorInfo
{
typedef
struct
S
M
JoinOperatorInfo
{
SSDataBlock
*
pRes
;
int32_t
joinType
;
int32_t
inputOrder
;
...
...
@@ -63,16 +63,16 @@ typedef struct SJoinOperatorInfo {
int32_t
rightEqOnCondKeyLen
;
SSHashObj
*
rightBuildTable
;
SJoinRowCtx
rowCtx
;
}
SJoinOperatorInfo
;
S
M
JoinRowCtx
rowCtx
;
}
S
M
JoinOperatorInfo
;
static
void
setJoinColumnInfo
(
SColumnInfo
*
pColumn
,
const
SColumnNode
*
pColumnNode
);
static
SSDataBlock
*
doMergeJoin
(
struct
SOperatorInfo
*
pOperator
);
static
void
destroyMergeJoinOperator
(
void
*
param
);
static
void
extractTimeCondition
(
SJoinOperatorInfo
*
pInfo
,
SOperatorInfo
**
pDownstream
,
int32_t
num
,
static
void
extractTimeCondition
(
S
M
JoinOperatorInfo
*
pInfo
,
SOperatorInfo
**
pDownstream
,
int32_t
num
,
SSortMergeJoinPhysiNode
*
pJoinNode
,
const
char
*
idStr
);
static
void
extractTimeCondition
(
SJoinOperatorInfo
*
pInfo
,
SOperatorInfo
**
pDownstream
,
int32_t
num
,
static
void
extractTimeCondition
(
S
M
JoinOperatorInfo
*
pInfo
,
SOperatorInfo
**
pDownstream
,
int32_t
num
,
SSortMergeJoinPhysiNode
*
pJoinNode
,
const
char
*
idStr
)
{
SNode
*
pMergeCondition
=
pJoinNode
->
pMergeCondition
;
if
(
nodeType
(
pMergeCondition
)
!=
QUERY_NODE_OPERATOR
)
{
...
...
@@ -104,7 +104,7 @@ static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDown
setJoinColumnInfo
(
&
pInfo
->
rightCol
,
rightTsCol
);
}
static
void
extractEqualOnCondColsFromOper
(
SJoinOperatorInfo
*
pInfo
,
SOperatorInfo
**
pDownstreams
,
SOperatorNode
*
pOperNode
,
static
void
extractEqualOnCondColsFromOper
(
S
M
JoinOperatorInfo
*
pInfo
,
SOperatorInfo
**
pDownstreams
,
SOperatorNode
*
pOperNode
,
SColumn
*
pLeft
,
SColumn
*
pRight
)
{
SColumnNode
*
pLeftNode
=
(
SColumnNode
*
)
pOperNode
->
pLeft
;
SColumnNode
*
pRightNode
=
(
SColumnNode
*
)
pOperNode
->
pRight
;
...
...
@@ -117,7 +117,7 @@ static void extractEqualOnCondColsFromOper(SJoinOperatorInfo* pInfo, SOperatorIn
}
}
static
void
extractEqualOnCondCols
(
SJoinOperatorInfo
*
pInfo
,
SOperatorInfo
**
pDownStream
,
SNode
*
pEqualOnCondNode
,
static
void
extractEqualOnCondCols
(
S
M
JoinOperatorInfo
*
pInfo
,
SOperatorInfo
**
pDownStream
,
SNode
*
pEqualOnCondNode
,
SArray
*
leftTagEqCols
,
SArray
*
rightTagEqCols
)
{
SColumn
left
=
{
0
};
SColumn
right
=
{
0
};
...
...
@@ -200,7 +200,7 @@ static int32_t fillKeyBufFromTagCols(SArray* pCols, SSDataBlock* pBlock, int32_t
SOperatorInfo
*
createMergeJoinOperatorInfo
(
SOperatorInfo
**
pDownstream
,
int32_t
numOfDownstream
,
SSortMergeJoinPhysiNode
*
pJoinNode
,
SExecTaskInfo
*
pTaskInfo
)
{
S
JoinOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
S
JoinOperatorInfo
));
S
MJoinOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SM
JoinOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -308,7 +308,7 @@ static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) {
}
void
destroyMergeJoinOperator
(
void
*
param
)
{
S
JoinOperatorInfo
*
pJoinOperator
=
(
S
JoinOperatorInfo
*
)
param
;
S
MJoinOperatorInfo
*
pJoinOperator
=
(
SM
JoinOperatorInfo
*
)
param
;
if
(
pJoinOperator
->
pColEqualOnConditions
!=
NULL
)
{
mergeJoinDestoryBuildTable
(
pJoinOperator
->
rightBuildTable
);
taosMemoryFreeClear
(
pJoinOperator
->
rightEqOnCondKeyBuf
);
...
...
@@ -331,7 +331,7 @@ void destroyMergeJoinOperator(void* param) {
static
void
mergeJoinJoinLeftRight
(
struct
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pRes
,
int32_t
currRow
,
SSDataBlock
*
pLeftBlock
,
int32_t
leftPos
,
SSDataBlock
*
pRightBlock
,
int32_t
rightPos
)
{
SJoinOperatorInfo
*
pJoinInfo
=
pOperator
->
info
;
S
M
JoinOperatorInfo
*
pJoinInfo
=
pOperator
->
info
;
for
(
int32_t
i
=
0
;
i
<
pOperator
->
exprSupp
.
numOfExprs
;
++
i
)
{
SColumnInfoData
*
pDst
=
taosArrayGet
(
pRes
->
pDataBlock
,
i
);
...
...
@@ -408,7 +408,7 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator
SArray
*
createdBlocks
)
{
ASSERT
(
whichChild
==
0
||
whichChild
==
1
);
SJoinOperatorInfo
*
pJoinInfo
=
pOperator
->
info
;
S
M
JoinOperatorInfo
*
pJoinInfo
=
pOperator
->
info
;
int32_t
endPos
=
-
1
;
SSDataBlock
*
dataBlock
=
startDataBlock
;
mergeJoinGetBlockRowsEqualTs
(
dataBlock
,
tsSlotId
,
startPos
,
timestamp
,
&
endPos
,
rowLocations
,
createdBlocks
);
...
...
@@ -441,7 +441,7 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator
return
0
;
}
static
int32_t
mergeJoinFillBuildTable
(
SJoinOperatorInfo
*
pInfo
,
SArray
*
rightRowLocations
)
{
static
int32_t
mergeJoinFillBuildTable
(
S
M
JoinOperatorInfo
*
pInfo
,
SArray
*
rightRowLocations
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
rightRowLocations
);
++
i
)
{
SRowLocation
*
rightRow
=
taosArrayGet
(
rightRowLocations
,
i
);
int32_t
keyLen
=
fillKeyBufFromTagCols
(
pInfo
->
rightEqOnCondCols
,
rightRow
->
pDataBlock
,
rightRow
->
pos
,
pInfo
->
rightEqOnCondKeyBuf
);
...
...
@@ -462,7 +462,7 @@ static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock*
int32_t
rightRowIdx
,
bool
useBuildTableTSRange
,
SArray
*
rightRowLocations
,
bool
*
pReachThreshold
)
{
*
pReachThreshold
=
false
;
uint32_t
limitRowNum
=
pOperator
->
resultInfo
.
threshold
;
SJoinOperatorInfo
*
pJoinInfo
=
pOperator
->
info
;
S
M
JoinOperatorInfo
*
pJoinInfo
=
pOperator
->
info
;
size_t
leftNumJoin
=
taosArrayGetSize
(
leftRowLocations
);
int32_t
i
,
j
;
...
...
@@ -505,7 +505,7 @@ static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock*
return
TSDB_CODE_SUCCESS
;
}
static
void
mergeJoinDestroyTSRangeCtx
(
SJoinOperatorInfo
*
pJoinInfo
,
SArray
*
leftRowLocations
,
SArray
*
leftCreatedBlocks
,
static
void
mergeJoinDestroyTSRangeCtx
(
S
M
JoinOperatorInfo
*
pJoinInfo
,
SArray
*
leftRowLocations
,
SArray
*
leftCreatedBlocks
,
SArray
*
rightCreatedBlocks
,
bool
rightUseBuildTable
,
SArray
*
rightRowLocations
)
{
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
rightCreatedBlocks
);
++
i
)
{
SSDataBlock
*
pBlock
=
taosArrayGetP
(
rightCreatedBlocks
,
i
);
...
...
@@ -543,7 +543,7 @@ static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* lef
static
int32_t
mergeJoinJoinDownstreamTsRanges
(
SOperatorInfo
*
pOperator
,
int64_t
timestamp
,
SSDataBlock
*
pRes
,
int32_t
*
nRows
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SJoinOperatorInfo
*
pJoinInfo
=
pOperator
->
info
;
S
M
JoinOperatorInfo
*
pJoinInfo
=
pOperator
->
info
;
SArray
*
leftRowLocations
=
NULL
;
SArray
*
rightRowLocations
=
NULL
;
SArray
*
leftCreatedBlocks
=
NULL
;
...
...
@@ -611,7 +611,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
}
static
bool
mergeJoinGetNextTimestamp
(
SOperatorInfo
*
pOperator
,
int64_t
*
pLeftTs
,
int64_t
*
pRightTs
)
{
SJoinOperatorInfo
*
pJoinInfo
=
pOperator
->
info
;
S
M
JoinOperatorInfo
*
pJoinInfo
=
pOperator
->
info
;
if
(
pJoinInfo
->
pLeft
==
NULL
||
pJoinInfo
->
leftPos
>=
pJoinInfo
->
pLeft
->
info
.
rows
)
{
SOperatorInfo
*
ds1
=
pOperator
->
pDownstream
[
0
];
...
...
@@ -647,7 +647,7 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs
}
static
void
doMergeJoinImpl
(
struct
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pRes
)
{
SJoinOperatorInfo
*
pJoinInfo
=
pOperator
->
info
;
S
M
JoinOperatorInfo
*
pJoinInfo
=
pOperator
->
info
;
int32_t
nrows
=
pRes
->
info
.
rows
;
...
...
@@ -691,7 +691,7 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
}
SSDataBlock
*
doMergeJoin
(
struct
SOperatorInfo
*
pOperator
)
{
SJoinOperatorInfo
*
pJoinInfo
=
pOperator
->
info
;
S
M
JoinOperatorInfo
*
pJoinInfo
=
pOperator
->
info
;
SSDataBlock
*
pRes
=
pJoinInfo
->
pRes
;
blockDataCleanup
(
pRes
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录