Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2b55a75e
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2b55a75e
编写于
6月 21, 2023
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: refact code
上级
7e555e43
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
309 addition
and
143 deletion
+309
-143
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+0
-1
include/libs/nodes/querynodes.h
include/libs/nodes/querynodes.h
+5
-1
source/libs/executor/inc/hashjoin.h
source/libs/executor/inc/hashjoin.h
+28
-20
source/libs/executor/src/hashjoinoperator.c
source/libs/executor/src/hashjoinoperator.c
+276
-121
未找到文件。
include/libs/nodes/plannodes.h
浏览文件 @
2b55a75e
...
@@ -415,7 +415,6 @@ typedef struct SHashJoinPhysiNode {
...
@@ -415,7 +415,6 @@ typedef struct SHashJoinPhysiNode {
EJoinType
joinType
;
EJoinType
joinType
;
SNodeList
*
pOnLeft
;
SNodeList
*
pOnLeft
;
SNodeList
*
pOnRight
;
SNodeList
*
pOnRight
;
SNode
*
pOnConditions
;
SNode
*
pFilterConditions
;
SNode
*
pFilterConditions
;
SNodeList
*
pTargets
;
SNodeList
*
pTargets
;
SQueryStat
inputStat
[
2
];
SQueryStat
inputStat
[
2
];
...
...
include/libs/nodes/querynodes.h
浏览文件 @
2b55a75e
...
@@ -167,7 +167,11 @@ typedef struct STempTableNode {
...
@@ -167,7 +167,11 @@ typedef struct STempTableNode {
SNode
*
pSubquery
;
SNode
*
pSubquery
;
}
STempTableNode
;
}
STempTableNode
;
typedef
enum
EJoinType
{
JOIN_TYPE_INNER
=
1
}
EJoinType
;
typedef
enum
EJoinType
{
JOIN_TYPE_INNER
=
1
,
JOIN_TYPE_LEFT
,
JOIN_TYPE_RIGHT
,
}
EJoinType
;
typedef
struct
SJoinTableNode
{
typedef
struct
SJoinTableNode
{
STableNode
table
;
// QUERY_NODE_JOIN_TABLE
STableNode
table
;
// QUERY_NODE_JOIN_TABLE
...
...
source/libs/executor/inc/hashjoin.h
浏览文件 @
2b55a75e
...
@@ -19,6 +19,8 @@
...
@@ -19,6 +19,8 @@
extern
"C"
{
extern
"C"
{
#endif
#endif
#define HASH_JOIN_DEFAULT_PAGE_SIZE 10485760
typedef
struct
SHJoinCtx
{
typedef
struct
SHJoinCtx
{
bool
rowRemains
;
bool
rowRemains
;
SBufRowInfo
*
pBuildRow
;
SBufRowInfo
*
pBuildRow
;
...
@@ -31,14 +33,17 @@ typedef struct SRowLocation {
...
@@ -31,14 +33,17 @@ typedef struct SRowLocation {
int32_t
pos
;
int32_t
pos
;
}
SRowLocation
;
}
SRowLocation
;
typedef
struct
S
ColBuf
Info
{
typedef
struct
S
HJoinCol
Info
{
int32_t
srcSlot
;
int32_t
srcSlot
;
int32_t
dstSlot
;
int32_t
dstSlot
;
bool
keyCol
;
bool
vardata
;
bool
vardata
;
int32_t
*
offset
;
int32_t
*
offset
;
int32_t
bytes
;
int32_t
bytes
;
char
*
data
;
char
*
data
;
}
SColBufInfo
;
char
*
bitMap
;
char
*
dataInBuf
;
}
SHJoinColInfo
;
typedef
struct
SBufPageInfo
{
typedef
struct
SBufPageInfo
{
int32_t
pageSize
;
int32_t
pageSize
;
...
@@ -50,8 +55,7 @@ typedef struct SBufPageInfo {
...
@@ -50,8 +55,7 @@ typedef struct SBufPageInfo {
typedef
struct
SBufRowInfo
{
typedef
struct
SBufRowInfo
{
void
*
next
;
void
*
next
;
uint16_t
pageId
;
uint16_t
pageId
;
int32_t
offset
:
31
;
int32_t
offset
;
int32_t
isNull
:
1
;
}
SBufRowInfo
;
}
SBufRowInfo
;
#pragma pack(pop)
#pragma pack(pop)
...
@@ -59,33 +63,37 @@ typedef struct SGroupData {
...
@@ -59,33 +63,37 @@ typedef struct SGroupData {
SBufRowInfo
*
rows
;
SBufRowInfo
*
rows
;
}
SGroupData
;
}
SGroupData
;
typedef
struct
SJoinTableInfo
{
typedef
struct
S
H
JoinTableInfo
{
SOperatorInfo
*
downStream
;
SOperatorInfo
*
downStream
;
int32_t
blkId
;
int32_t
blkId
;
SQueryStat
inputStat
;
SQueryStat
inputStat
;
int32_t
keyNum
;
int32_t
keyNum
;
S
ColBufInfo
*
keyCols
;
S
HJoinColInfo
*
keyCols
;
char
*
keyBuf
;
char
*
keyBuf
;
char
*
keyData
;
int32_t
valNum
;
int32_t
valNum
;
SColBufInfo
*
valCols
;
SHJoinColInfo
*
valCols
;
char
*
valData
;
int32_t
valBitMapSize
;
int32_t
valBufSize
;
int32_t
valBufSize
;
bool
valVarData
;
SArray
*
valVarCols
;
}
SJoinTableInfo
;
bool
valColExist
;
}
SHJoinTableInfo
;
typedef
struct
SHJoinOperatorInfo
{
typedef
struct
SHJoinOperatorInfo
{
SSDataBlock
*
pRes
;
int32_t
joinType
;
int32_t
joinType
;
SHJoinTableInfo
tbs
[
2
]
;
S
JoinTableInfo
tbs
[
2
]
;
S
HJoinTableInfo
*
pBuild
;
S
JoinTableInfo
*
pBuild
;
S
HJoinTableInfo
*
pProbe
;
S
JoinTableInfo
*
pProbe
;
S
SDataBlock
*
pRes
;
int32_t
pResColNum
;
int32_t
pResColNum
;
int8_t
*
pResColMap
;
int8_t
*
pResColMap
;
SArray
*
pRowBufs
;
SArray
*
pRowBufs
;
SNode
*
pCondAfterJoin
;
SNode
*
pCond
;
SSHashObj
*
pKeyHash
;
SSHashObj
*
pKeyHash
;
SHJoinCtx
ctx
;
SHJoinCtx
ctx
;
}
SHJoinOperatorInfo
;
}
SHJoinOperatorInfo
;
static
SSDataBlock
*
doHashJoin
(
struct
SOperatorInfo
*
pOperator
);
static
SSDataBlock
*
doHashJoin
(
struct
SOperatorInfo
*
pOperator
);
...
...
source/libs/executor/src/hashjoinoperator.c
浏览文件 @
2b55a75e
...
@@ -27,27 +27,31 @@
...
@@ -27,27 +27,31 @@
#include "ttypes.h"
#include "ttypes.h"
#include "hashjoin.h"
#include "hashjoin.h"
int32_t
initJoinKey
BufInfo
(
SColBufInfo
**
ppInfo
,
int32_t
*
colNum
,
SNodeList
*
pList
,
char
**
ppBuf
)
{
int32_t
initJoinKey
ColsInfo
(
SHJoinTableInfo
*
pTable
,
SNodeList
*
pList
)
{
*
col
Num
=
LIST_LENGTH
(
pList
);
pTable
->
key
Num
=
LIST_LENGTH
(
pList
);
(
*
ppInfo
)
=
taosMemoryMalloc
((
*
colNum
)
*
sizeof
(
SColBuf
Info
));
pTable
->
keyCols
=
taosMemoryMalloc
(
pTable
->
keyNum
*
sizeof
(
SHJoinCol
Info
));
if
(
NULL
==
(
*
ppInfo
)
)
{
if
(
NULL
==
pTable
->
keyCols
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
int64_t
bufSize
=
0
;
int64_t
bufSize
=
0
;
int32_t
i
=
0
;
SNode
*
pNode
=
NULL
;
SNode
*
pNode
=
NULL
;
FOREACH
(
pNode
,
pList
)
{
FOREACH
(
pNode
,
pList
)
{
SColumnNode
*
pColNode
=
(
SColumnNode
*
)
pNode
;
SColumnNode
*
pColNode
=
(
SColumnNode
*
)
pNode
;
(
*
ppInfo
)
->
srcSlot
=
pColNode
->
slotId
;
pTable
->
keyCols
[
i
]
->
srcSlot
=
pColNode
->
slotId
;
(
*
ppInfo
)
->
vardata
=
IS_VAR_DATA_TYPE
(
pColNode
->
node
.
resType
.
type
);
pTable
->
keyCols
[
i
]
->
vardata
=
IS_VAR_DATA_TYPE
(
pColNode
->
node
.
resType
.
type
);
(
*
ppInfo
)
->
bytes
=
pColNode
->
node
.
resType
.
bytes
;
pTable
->
keyCols
[
i
]
->
bytes
=
pColNode
->
node
.
resType
.
bytes
;
bufSize
+=
pColNode
->
node
.
resType
.
bytes
;
bufSize
+=
pColNode
->
node
.
resType
.
bytes
;
++
i
;
}
}
*
ppBuf
=
taosMemoryMalloc
(
bufSize
);
if
(
pTable
->
keyNum
>
1
)
{
if
(
NULL
==
*
ppBuf
)
{
pTable
->
keyBuf
=
taosMemoryMalloc
(
bufSize
);
return
TSDB_CODE_OUT_OF_MEMORY
;
if
(
NULL
==
pTable
->
keyBuf
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -58,46 +62,80 @@ void getJoinValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) {
...
@@ -58,46 +62,80 @@ void getJoinValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) {
SNode
*
pNode
=
NULL
;
SNode
*
pNode
=
NULL
;
FOREACH
(
pNode
,
pList
)
{
FOREACH
(
pNode
,
pList
)
{
SColumnNode
*
pCol
=
(
SColumnNode
*
)
pNode
;
STargetNode
*
pTarget
=
(
STargetNode
*
)
pNode
;
SColumnNode
*
pCol
=
(
SColumnNode
*
)
pTarget
->
pExpr
;
if
(
pCol
->
dataBlockId
==
blkId
)
{
if
(
pCol
->
dataBlockId
==
blkId
)
{
(
*
colNum
)
++
;
(
*
colNum
)
++
;
}
}
}
}
}
}
int32_t
initJoinValBufInfo
(
SJoinTableInfo
*
pTable
,
SNodeList
*
pList
)
{
bool
valColInKeyCols
(
int16_t
slotId
,
int32_t
keyNum
,
SHJoinColInfo
*
pKeys
,
int32_t
*
pKeyIdx
)
{
for
(
int32_t
i
=
0
;
i
<
keyNum
;
++
i
)
{
if
(
pKeys
[
i
].
srcSlot
==
slotId
)
{
*
pKeyIdx
=
i
;
return
true
;
}
}
return
false
;
}
int32_t
initJoinValColsInfo
(
SHJoinTableInfo
*
pTable
,
SNodeList
*
pList
)
{
getJoinValColNum
(
pList
,
pTable
->
blkId
,
&
pTable
->
valNum
);
getJoinValColNum
(
pList
,
pTable
->
blkId
,
&
pTable
->
valNum
);
if
(
pTable
->
valNum
<=
0
)
{
if
(
pTable
->
valNum
==
0
)
{
qError
(
"fail to get join value column, num:%d"
,
pTable
->
valNum
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_INVALID_MSG
;
}
}
pTable
->
valCols
=
taosMemoryMalloc
(
pTable
->
valNum
*
sizeof
(
S
ColBuf
Info
));
pTable
->
valCols
=
taosMemoryMalloc
(
pTable
->
valNum
*
sizeof
(
S
HJoinCol
Info
));
if
(
NULL
==
pTable
->
valCols
)
{
if
(
NULL
==
pTable
->
valCols
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
int32_t
i
=
0
;
int32_t
colNum
=
0
;
SNode
*
pNode
=
NULL
;
SNode
*
pNode
=
NULL
;
FOREACH
(
pNode
,
pList
)
{
FOREACH
(
pNode
,
pList
)
{
SColumnNode
*
pColNode
=
(
SColumnNode
*
)
pNode
;
STargetNode
*
pTarget
=
(
STargetNode
*
)
pNode
;
SColumnNode
*
pColNode
=
(
SColumnNode
*
)
pTarget
->
pExpr
;
if
(
pColNode
->
dataBlockId
==
pTable
->
blkId
)
{
if
(
pColNode
->
dataBlockId
==
pTable
->
blkId
)
{
pTable
->
valCols
->
srcSlot
=
pColNode
->
slotId
;
if
(
valColInKeyCols
(
pColNode
->
slotId
,
pTable
->
keyNum
,
pTable
->
keyCols
,
&
pTable
->
valCols
[
i
].
srcSlot
))
{
pTable
->
valCols
->
vardata
=
IS_VAR_DATA_TYPE
(
pColNode
->
node
.
resType
.
type
);
pTable
->
valCols
[
i
].
keyCol
=
true
;
if
(
pTable
->
valCols
->
vardata
)
{
}
else
{
pTable
->
valVarData
=
true
;
pTable
->
valCols
[
i
].
keyCol
=
false
;
pTable
->
valCols
[
i
].
srcSlot
=
pColNode
->
slotId
;
pTable
->
valColExist
=
true
;
colNum
++
;
}
}
pTable
->
valCols
->
bytes
=
pColNode
->
node
.
resType
.
bytes
;
pTable
->
valCols
[
i
].
dstSlot
=
pTarget
->
slotId
;
pTable
->
valBufSize
+=
pColNode
->
node
.
resType
.
bytes
;
pTable
->
valCols
[
i
].
vardata
=
IS_VAR_DATA_TYPE
(
pColNode
->
node
.
resType
.
type
);
if
(
pTable
->
valCols
[
i
].
vardata
)
{
if
(
NULL
==
pTable
->
valVarCols
)
{
pTable
->
valVarCols
=
taosArrayInit
(
pTable
->
valNum
,
sizeof
(
int32_t
));
if
(
NULL
==
pTable
->
valVarCols
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
taosArrayPush
(
pTable
->
valVarCols
,
&
i
);
}
pTable
->
valCols
[
i
].
bytes
=
pColNode
->
node
.
resType
.
bytes
;
if
(
!
pTable
->
valCols
[
i
].
keyCol
&&
!
pTable
->
valCols
[
i
].
vardata
)
{
pTable
->
valBufSize
+=
pColNode
->
node
.
resType
.
bytes
;
}
i
++
;
}
}
}
}
pTable
->
valBitMapSize
=
colNum
/
sizeof
(
int8_t
)
+
((
colNum
%
sizeof
(
int8_t
))
?
1
:
0
);
pTable
->
valBufSize
+=
pTable
->
valBitMapSize
;
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
initJoinTableInfo
(
SHJoinOperatorInfo
*
pJoin
,
SHashJoinPhysiNode
*
pJoinNode
,
SOperatorInfo
**
pDownstream
,
int32_t
idx
,
SQueryStat
*
pStat
)
{
int32_t
initJoinTableInfo
(
SHJoinOperatorInfo
*
pJoin
,
SHashJoinPhysiNode
*
pJoinNode
,
SOperatorInfo
**
pDownstream
,
int32_t
idx
,
SQueryStat
*
pStat
)
{
SNodeList
*
pKeyList
=
NULL
;
SNodeList
*
pKeyList
=
NULL
;
SJoinTableInfo
*
pTable
=
&
pJoin
->
tbs
[
idx
];
S
H
JoinTableInfo
*
pTable
=
&
pJoin
->
tbs
[
idx
];
pTable
->
downStream
=
pDownstream
[
idx
];
pTable
->
downStream
=
pDownstream
[
idx
];
pTable
->
blkId
=
pDownstream
[
idx
]
->
resultDataBlockId
;
pTable
->
blkId
=
pDownstream
[
idx
]
->
resultDataBlockId
;
if
(
0
==
idx
)
{
if
(
0
==
idx
)
{
...
@@ -106,11 +144,11 @@ int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNo
...
@@ -106,11 +144,11 @@ int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNo
pKeyList
=
pJoinNode
->
pOnRight
;
pKeyList
=
pJoinNode
->
pOnRight
;
}
}
int32_t
code
=
initJoinKey
BufInfo
(
&
pTable
->
keyCols
,
&
pTable
->
keyNum
,
pKeyList
,
&
pTable
->
keyBuf
);
int32_t
code
=
initJoinKey
ColsInfo
(
pTable
,
pKeyList
);
if
(
code
)
{
if
(
code
)
{
return
code
;
return
code
;
}
}
int32_t
code
=
initJoinVal
BufInfo
(
&
pTable
->
keyCols
,
&
pTable
->
keyNum
,
pJoinNode
->
pTargets
,
&
pTable
->
keyBuf
,
pTable
->
blkId
,
pTable
);
int32_t
code
=
initJoinVal
ColsInfo
(
pTable
,
pJoinNode
->
pTargets
);
if
(
code
)
{
if
(
code
)
{
return
code
;
return
code
;
}
}
...
@@ -121,20 +159,50 @@ int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNo
...
@@ -121,20 +159,50 @@ int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNo
}
}
void
setJoinBuildAndProbeTable
(
SHJoinOperatorInfo
*
pInfo
,
SHashJoinPhysiNode
*
pJoinNode
)
{
void
setJoinBuildAndProbeTable
(
SHJoinOperatorInfo
*
pInfo
,
SHashJoinPhysiNode
*
pJoinNode
)
{
int32_t
buildIdx
=
0
;
int32_t
probeIdx
=
1
;
pInfo
->
joinType
=
pJoinNode
->
joinType
;
switch
(
pInfo
->
joinType
)
{
case
JOIN_TYPE_INNER
:
if
(
pInfo
->
tbs
[
0
].
inputStat
.
inputRowNum
<=
pInfo
->
tbs
[
1
].
inputStat
.
inputRowNum
)
{
buildIdx
=
0
;
probeIdx
=
1
;
}
else
{
buildIdx
=
1
;
probeIdx
=
0
;
}
break
;
case
JOIN_TYPE_LEFT
:
buildIdx
=
1
;
probeIdx
=
0
;
break
;
case
JOIN_TYPE_RIGHT
:
buildIdx
=
0
;
probeIdx
=
1
;
break
;
default:
break
;
}
pInfo
->
pBuild
=
&
pInfo
->
tbs
[
buildIdx
];
pInfo
->
pProbe
=
&
pInfo
->
tbs
[
probeIdx
];
}
void
buildJoinResColMap
(
SHJoinOperatorInfo
*
pInfo
,
SHashJoinPhysiNode
*
pJoinNode
)
{
pInfo
->
pResColNum
=
pJoinNode
->
pTargets
->
length
;
pInfo
->
pResColNum
=
pJoinNode
->
pTargets
->
length
;
pInfo
->
pResColMap
=
taosMemoryCalloc
(
pJoinNode
->
pTargets
->
length
,
sizeof
(
int8_t
));
pInfo
->
pResColMap
=
taosMemoryCalloc
(
pJoinNode
->
pTargets
->
length
,
sizeof
(
int8_t
));
if
(
NULL
==
pInfo
->
pResColMap
)
{
if
(
NULL
==
pInfo
->
pResColMap
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
pInfo
->
pBuild
=
&
pInfo
->
tbs
[
1
];
pInfo
->
pProbe
=
&
pInfo
->
tbs
[
0
];
SNode
*
pNode
=
NULL
;
SNode
*
pNode
=
NULL
;
int32_t
i
=
0
;
int32_t
i
=
0
;
FOREACH
(
pNode
,
pJoinNode
->
pTargets
)
{
FOREACH
(
pNode
,
pJoinNode
->
pTargets
)
{
SColumnNode
*
pColNode
=
(
SColumnNode
*
)
pNode
;
STargetNode
*
pTarget
=
(
STargetNode
*
)
pNode
;
if
(
pColNode
->
dataBlockId
==
pInfo
->
pBuild
->
blkId
)
{
SColumnNode
*
pCol
=
(
SColumnNode
*
)
pTarget
->
pExpr
;
if
(
pCol
->
dataBlockId
==
pInfo
->
pBuild
->
blkId
)
{
pInfo
->
pResColMap
[
i
]
=
1
;
pInfo
->
pResColMap
[
i
]
=
1
;
}
}
...
@@ -144,6 +212,7 @@ void setJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJ
...
@@ -144,6 +212,7 @@ void setJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJ
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
FORCE_INLINE
int32_t
addPageToJoinBuf
(
SArray
*
pRowBufs
)
{
FORCE_INLINE
int32_t
addPageToJoinBuf
(
SArray
*
pRowBufs
)
{
SBufPageInfo
page
;
SBufPageInfo
page
;
page
.
pageSize
=
HASH_JOIN_DEFAULT_PAGE_SIZE
;
page
.
pageSize
=
HASH_JOIN_DEFAULT_PAGE_SIZE
;
...
@@ -180,19 +249,16 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n
...
@@ -180,19 +249,16 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n
int32_t
numOfCols
=
0
;
int32_t
numOfCols
=
0
;
pInfo
->
pRes
=
createDataBlockFromDescNode
(
pJoinNode
->
node
.
pOutputDataBlockDesc
);
pInfo
->
pRes
=
createDataBlockFromDescNode
(
pJoinNode
->
node
.
pOutputDataBlockDesc
);
SExprInfo
*
pExprInfo
=
createExprInfo
(
pJoinNode
->
pTargets
,
NULL
,
&
numOfCols
);
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
setOperatorInfo
(
pOperator
,
"HashJoinOperator"
,
QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
setOperatorInfo
(
pOperator
,
"HashJoinOperator"
,
QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
pExprInfo
=
pExprInfo
;
pOperator
->
exprSupp
.
numOfExprs
=
numOfCols
;
initJoinTableInfo
(
pInfo
,
pJoinNode
,
pDownstream
,
0
,
&
pJoinNode
->
inputStat
[
0
]);
initJoinTableInfo
(
pInfo
,
pJoinNode
,
pDownstream
,
0
,
&
pJoinNode
->
inputStat
[
0
]);
initJoinTableInfo
(
pInfo
,
pJoinNode
,
pDownstream
,
1
,
&
pJoinNode
->
inputStat
[
1
]);
initJoinTableInfo
(
pInfo
,
pJoinNode
,
pDownstream
,
1
,
&
pJoinNode
->
inputStat
[
1
]);
code
=
setJoinBuildAndProbeTable
(
pInfo
,
pJoinNode
);
setJoinBuildAndProbeTable
(
pInfo
,
pJoinNode
);
code
=
buildJoinResColMap
(
pInfo
,
pJoinNode
);
if
(
code
)
{
if
(
code
)
{
goto
_error
;
goto
_error
;
}
}
...
@@ -210,13 +276,13 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n
...
@@ -210,13 +276,13 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n
}
}
if
(
pJoinNode
->
pFilterConditions
!=
NULL
&&
pJoinNode
->
node
.
pConditions
!=
NULL
)
{
if
(
pJoinNode
->
pFilterConditions
!=
NULL
&&
pJoinNode
->
node
.
pConditions
!=
NULL
)
{
pInfo
->
pCond
AfterJoin
=
nodesMakeNode
(
QUERY_NODE_LOGIC_CONDITION
);
pInfo
->
pCond
=
nodesMakeNode
(
QUERY_NODE_LOGIC_CONDITION
);
if
(
pInfo
->
pCond
AfterJoin
==
NULL
)
{
if
(
pInfo
->
pCond
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
goto
_error
;
}
}
SLogicConditionNode
*
pLogicCond
=
(
SLogicConditionNode
*
)(
pInfo
->
pCond
AfterJoin
);
SLogicConditionNode
*
pLogicCond
=
(
SLogicConditionNode
*
)(
pInfo
->
pCond
);
pLogicCond
->
pParameterList
=
nodesMakeList
();
pLogicCond
->
pParameterList
=
nodesMakeList
();
if
(
pLogicCond
->
pParameterList
==
NULL
)
{
if
(
pLogicCond
->
pParameterList
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
TSDB_CODE_OUT_OF_MEMORY
;
...
@@ -227,29 +293,25 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n
...
@@ -227,29 +293,25 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n
nodesListMakeAppend
(
&
pLogicCond
->
pParameterList
,
nodesCloneNode
(
pJoinNode
->
node
.
pConditions
));
nodesListMakeAppend
(
&
pLogicCond
->
pParameterList
,
nodesCloneNode
(
pJoinNode
->
node
.
pConditions
));
pLogicCond
->
condType
=
LOGIC_COND_TYPE_AND
;
pLogicCond
->
condType
=
LOGIC_COND_TYPE_AND
;
}
else
if
(
pJoinNode
->
pFilterConditions
!=
NULL
)
{
}
else
if
(
pJoinNode
->
pFilterConditions
!=
NULL
)
{
pInfo
->
pCond
AfterJoin
=
nodesCloneNode
(
pJoinNode
->
pFilterConditions
);
pInfo
->
pCond
=
nodesCloneNode
(
pJoinNode
->
pFilterConditions
);
}
else
if
(
pJoinNode
->
node
.
pConditions
!=
NULL
)
{
}
else
if
(
pJoinNode
->
node
.
pConditions
!=
NULL
)
{
pInfo
->
pCond
AfterJoin
=
nodesCloneNode
(
pJoinNode
->
node
.
pConditions
);
pInfo
->
pCond
=
nodesCloneNode
(
pJoinNode
->
node
.
pConditions
);
}
else
{
}
else
{
pInfo
->
pCond
AfterJoin
=
NULL
;
pInfo
->
pCond
=
NULL
;
}
}
code
=
filterInitFromNode
(
pInfo
->
pCond
AfterJoin
,
&
pOperator
->
exprSupp
.
pFilterInfo
,
0
);
code
=
filterInitFromNode
(
pInfo
->
pCond
,
&
pOperator
->
exprSupp
.
pFilterInfo
,
0
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
goto
_error
;
}
}
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
doHashJoin
,
NULL
,
destroHashJoinOperator
,
optrDefaultBufFn
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
doHashJoin
,
NULL
,
destroHashJoinOperator
,
optrDefaultBufFn
,
NULL
);
code
=
appendDownstream
(
pOperator
,
pDownstream
,
numOfDownstream
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
return
pOperator
;
return
pOperator
;
_error:
_error:
if
(
pInfo
!=
NULL
)
{
if
(
pInfo
!=
NULL
)
{
destroy
Merge
JoinOperator
(
pInfo
);
destroy
Hash
JoinOperator
(
pInfo
);
}
}
taosMemoryFree
(
pOperator
);
taosMemoryFree
(
pOperator
);
...
@@ -278,47 +340,56 @@ void destroHashJoinOperator(void* param) {
...
@@ -278,47 +340,56 @@ void destroHashJoinOperator(void* param) {
taosMemoryFreeClear
(
param
);
taosMemoryFreeClear
(
param
);
}
}
FORCE_INLINE
char
*
get
ColDataFromRowBufs
(
SArray
*
pRowBufs
,
SBufRowInfo
*
pRow
)
{
FORCE_INLINE
char
*
retrieve
ColDataFromRowBufs
(
SArray
*
pRowBufs
,
SBufRowInfo
*
pRow
)
{
SBufPageInfo
*
pPage
=
taosArrayGet
(
pRowBufs
,
pRow
->
pageId
);
SBufPageInfo
*
pPage
=
taosArrayGet
(
pRowBufs
,
pRow
->
pageId
);
return
pPage
->
data
+
pRow
->
offset
;
return
pPage
->
data
+
pRow
->
offset
;
}
}
FORCE_INLINE
int32_t
copyJoinResRowsToBlock
(
SHJoinOperatorInfo
*
pJoin
,
int32_t
rowNum
,
SBufRowInfo
*
pStart
,
SSDataBlock
*
pRes
)
{
FORCE_INLINE
int32_t
copyJoinResRowsToBlock
(
SHJoinOperatorInfo
*
pJoin
,
int32_t
rowNum
,
SBufRowInfo
*
pStart
,
SSDataBlock
*
pRes
)
{
SJoinTableInfo
*
pBuild
=
pJoin
->
pBuild
;
S
H
JoinTableInfo
*
pBuild
=
pJoin
->
pBuild
;
SJoinTableInfo
*
pProbe
=
pJoin
->
pProbe
;
S
H
JoinTableInfo
*
pProbe
=
pJoin
->
pProbe
;
int32_t
buildIdx
=
0
;
int32_t
buildIdx
=
0
;
int32_t
probeIdx
=
0
;
int32_t
probeIdx
=
0
;
SBufRowInfo
*
pRow
=
pStart
;
SBufRowInfo
*
pRow
=
pStart
;
int32_t
code
=
0
;
int32_t
code
=
0
;
for
(
int32_t
r
=
0
;
r
<
rowNum
;
++
r
)
{
char
*
pData
=
retrieveColDataFromRowBufs
(
pJoin
->
pRowBufs
,
pRow
);
for
(
int32_t
i
=
0
;
i
<
pJoin
->
pResColNum
;
++
i
)
{
if
(
pJoin
->
pResColMap
[
i
])
{
SColumnInfoData
*
pDst
=
taosArrayGet
(
pRes
->
pDataBlock
,
pBuild
->
valCols
[
buildIdx
].
dstSlot
);
if
(
pBuild
->
valCols
[
buildIdx
].
keyCol
)
{
}
else
if
(
colDataIsNull_f
(
pData
,
r
))
{
for
(
int32_t
i
=
0
;
i
<
pJoin
->
pResColNum
;
++
i
)
{
}
else
{
if
(
pJoin
->
pResColMap
[
i
])
{
code
=
colDataSetVal
(
pDst
,
pRes
->
info
.
rows
+
r
,
,
pRow
->
isNull
);
SColumnInfoData
*
pCol
=
taosArrayGet
(
pRes
->
pDataBlock
,
pBuild
->
valCols
[
buildIdx
].
dstSlot
);
if
(
code
)
{
for
(
int32_t
r
=
0
;
r
<
rowNum
;
++
r
)
{
return
code
;
code
=
colDataSetVal
(
pCol
,
pRes
->
info
.
rows
+
r
,
pRow
->
isNull
?
NULL
:
getColDataFromRowBufs
(
pJoin
->
pRowBufs
,
pRow
),
pRow
->
isNull
);
}
}
buildIdx
++
;
}
else
{
SColumnInfoData
*
pSrc
=
taosArrayGet
(
pJoin
->
ctx
.
pProbeData
,
pProbe
->
valCols
[
probeIdx
].
srcSlot
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pRes
->
pDataBlock
,
pProbe
->
valCols
[
probeIdx
].
dstSlot
);
code
=
colDataCopyNItems
(
pDst
,
pRes
->
info
.
rows
,
colDataGetData
(
pSrc
,
pJoin
->
ctx
.
probeIdx
),
rowNum
,
colDataIsNull_s
(
pSrc
,
pJoin
->
ctx
.
probeIdx
));
if
(
code
)
{
if
(
code
)
{
return
code
;
return
code
;
}
}
pRow
=
pRow
->
next
;
probeIdx
++
;
}
buildIdx
++
;
}
else
{
SColumnInfoData
*
pSrc
=
taosArrayGet
(
pJoin
->
ctx
.
pProbeData
,
pProbe
->
valCols
[
probeIdx
].
srcSlot
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pJoin
->
ctx
.
pProbeData
,
pProbe
->
valCols
[
probeIdx
].
dstSlot
);
code
=
colDataCopyNItems
(
pDst
,
pRes
->
info
.
rows
,
colDataGetData
(
pSrc
,
pJoin
->
ctx
.
probeIdx
),
rowNum
,
colDataIsNull_s
(
pSrc
,
pJoin
->
ctx
.
probeIdx
));
if
(
code
)
{
return
code
;
}
}
probeIdx
++
;
}
}
pRow
=
pRow
->
next
;
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
void
appendJoinResToBlock
(
struct
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pRes
)
{
FORCE_INLINE
void
appendJoinResToBlock
(
struct
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pRes
)
{
SHJoinOperatorInfo
*
pJoin
=
pOperator
->
info
;
SHJoinOperatorInfo
*
pJoin
=
pOperator
->
info
;
SHJoinCtx
*
pCtx
=
&
pJoin
->
ctx
;
SHJoinCtx
*
pCtx
=
&
pJoin
->
ctx
;
SBufRowInfo
*
pStart
=
pCtx
->
pBuildRow
;
SBufRowInfo
*
pStart
=
pCtx
->
pBuildRow
;
...
@@ -343,7 +414,7 @@ void appendJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
...
@@ -343,7 +414,7 @@ void appendJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
void
doHashJoinImpl
(
struct
SOperatorInfo
*
pOperator
)
{
void
doHashJoinImpl
(
struct
SOperatorInfo
*
pOperator
)
{
SHJoinOperatorInfo
*
pJoin
=
pOperator
->
info
;
SHJoinOperatorInfo
*
pJoin
=
pOperator
->
info
;
SJoinTableInfo
*
pProbe
=
pJoin
->
pProbe
;
S
H
JoinTableInfo
*
pProbe
=
pJoin
->
pProbe
;
SHJoinCtx
*
pCtx
=
&
pJoin
->
ctx
;
SHJoinCtx
*
pCtx
=
&
pJoin
->
ctx
;
SSDataBlock
*
pRes
=
pJoin
->
pRes
;
SSDataBlock
*
pRes
=
pJoin
->
pRes
;
size_t
bufLen
=
0
;
size_t
bufLen
=
0
;
...
@@ -354,8 +425,8 @@ void doHashJoinImpl(struct SOperatorInfo* pOperator) {
...
@@ -354,8 +425,8 @@ void doHashJoinImpl(struct SOperatorInfo* pOperator) {
}
}
for
(
int32_t
i
=
pCtx
->
probeIdx
;
i
<
pCtx
->
pProbeData
->
info
.
rows
;
++
i
)
{
for
(
int32_t
i
=
pCtx
->
probeIdx
;
i
<
pCtx
->
pProbeData
->
info
.
rows
;
++
i
)
{
copy
ColDataToBuf
(
pProbe
->
keyNum
,
i
,
pProbe
->
keyCols
,
pProbe
->
keyBuf
,
&
bufLen
);
copy
KeyColsDataToBuf
(
pProbe
,
i
,
&
bufLen
);
SGroupData
*
pGroup
=
tSimpleHashGet
(
pJoin
->
pKeyHash
,
pProbe
->
key
Buf
,
bufLen
);
SGroupData
*
pGroup
=
tSimpleHashGet
(
pJoin
->
pKeyHash
,
pProbe
->
key
Data
,
bufLen
);
if
(
pGroup
)
{
if
(
pGroup
)
{
pCtx
->
pBuildRow
=
pGroup
->
rows
;
pCtx
->
pBuildRow
=
pGroup
->
rows
;
appendJoinResToBlock
(
pOperator
,
pRes
);
appendJoinResToBlock
(
pOperator
,
pRes
);
...
@@ -366,40 +437,82 @@ void doHashJoinImpl(struct SOperatorInfo* pOperator) {
...
@@ -366,40 +437,82 @@ void doHashJoinImpl(struct SOperatorInfo* pOperator) {
}
}
}
}
int32_t
set
ColBufInfo
(
SSDataBlock
*
pBlock
,
int32_t
colNum
,
SColBufInfo
*
pColList
)
{
int32_t
set
KeyColsData
(
SSDataBlock
*
pBlock
,
SHJoinTableInfo
*
pTable
)
{
for
(
int32_t
i
=
0
;
i
<
col
Num
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pTable
->
key
Num
;
++
i
)
{
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
p
ColList
[
i
].
srcSlot
);
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
p
Table
->
keyCols
[
i
].
srcSlot
);
if
(
p
ColList
[
i
].
vardata
!=
IS_VAR_DATA_TYPE
(
pCol
->
info
.
type
))
{
if
(
p
Table
->
keyCols
[
i
].
vardata
!=
IS_VAR_DATA_TYPE
(
pCol
->
info
.
type
))
{
qError
(
"column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d"
,
i
,
p
ColList
[
i
].
srcSlot
,
pCol
->
info
.
type
,
pColList
[
i
].
vardata
);
qError
(
"column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d"
,
i
,
p
Table
->
keyCols
[
i
].
srcSlot
,
pCol
->
info
.
type
,
pTable
->
keyCols
[
i
].
vardata
);
return
TSDB_CODE_INVALID_PARA
;
return
TSDB_CODE_INVALID_PARA
;
}
}
if
(
p
ColList
[
i
].
bytes
!=
IS_VAR_DATA_TYPE
(
pCol
->
info
.
bytes
)
)
{
if
(
p
Table
->
keyCols
[
i
].
bytes
!=
pCol
->
info
.
bytes
)
{
qError
(
"column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d"
,
i
,
p
ColList
[
i
].
srcSlot
,
pCol
->
info
.
bytes
,
pColList
[
i
].
bytes
);
qError
(
"column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d"
,
i
,
p
Table
->
keyCols
[
i
].
srcSlot
,
pCol
->
info
.
bytes
,
pTable
->
keyCols
[
i
].
bytes
);
return
TSDB_CODE_INVALID_PARA
;
return
TSDB_CODE_INVALID_PARA
;
}
}
pColList
[
i
].
data
=
pCol
->
pData
;
pTable
->
keyCols
[
i
].
data
=
pCol
->
pData
;
if
(
pColList
[
i
].
vardata
)
{
if
(
pTable
->
keyCols
[
i
].
vardata
)
{
pColList
[
i
].
offset
=
pCol
->
varmeta
.
offset
;
pTable
->
keyCols
[
i
].
offset
=
pCol
->
varmeta
.
offset
;
}
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
setValColsData
(
SSDataBlock
*
pBlock
,
SHJoinTableInfo
*
pTable
)
{
if
(
!
pTable
->
valColExist
)
{
return
TSDB_CODE_SUCCESS
;
}
for
(
int32_t
i
=
0
;
i
<
pTable
->
valNum
;
++
i
)
{
if
(
pTable
->
valCols
[
i
].
keyCol
)
{
continue
;
}
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pTable
->
valCols
[
i
].
srcSlot
);
if
(
pTable
->
valCols
[
i
].
vardata
!=
IS_VAR_DATA_TYPE
(
pCol
->
info
.
type
))
{
qError
(
"column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d"
,
i
,
pTable
->
valCols
[
i
].
srcSlot
,
pCol
->
info
.
type
,
pTable
->
valCols
[
i
].
vardata
);
return
TSDB_CODE_INVALID_PARA
;
}
if
(
pTable
->
valCols
[
i
].
bytes
!=
pCol
->
info
.
bytes
)
{
qError
(
"column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d"
,
i
,
pTable
->
valCols
[
i
].
srcSlot
,
pCol
->
info
.
bytes
,
pTable
->
valCols
[
i
].
bytes
);
return
TSDB_CODE_INVALID_PARA
;
}
if
(
!
pTable
->
valCols
[
i
].
vardata
))
{
pTable
->
valCols
[
i
].
bitMap
=
pCol
->
nullbitmap
;
}
pTable
->
valCols
[
i
].
data
=
pCol
->
pData
;
if
(
pTable
->
valCols
[
i
].
vardata
)
{
pTable
->
valCols
[
i
].
offset
=
pCol
->
varmeta
.
offset
;
}
}
}
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
FORCE_INLINE
void
copyColDataToBuf
(
int32_t
colNum
,
int32_t
rowIdx
,
SColBufInfo
*
pColList
,
char
*
pBuf
,
size_t
*
pBufLen
)
{
char
*
pData
=
NULL
;
FORCE_INLINE
void
copyKeyColsDataToBuf
(
SHJoinTableInfo
*
pTable
,
int32_t
rowIdx
,
size_t
*
pBufLen
)
{
char
*
pData
=
NULL
;
size_t
bufLen
=
0
;
size_t
bufLen
=
0
;
for
(
int32_t
i
=
0
;
i
<
colNum
;
++
i
)
{
if
(
pColList
[
i
].
vardata
)
{
if
(
1
==
pTable
->
keyNum
)
{
pData
=
pColList
[
i
].
data
+
pColList
[
i
].
offset
[
rowIdx
];
if
(
pTable
->
keyCols
[
0
].
vardata
)
{
memcpy
(
pBuf
+
bufLen
,
pData
,
varDataTLen
(
pData
))
;
pData
=
pTable
->
keyCols
[
0
].
data
+
pTable
->
keyCols
[
0
].
offset
[
rowIdx
]
;
bufLen
+
=
varDataTLen
(
pData
);
bufLen
=
varDataTLen
(
pData
);
}
else
{
}
else
{
pData
=
pColList
[
i
].
data
+
pColList
[
i
].
bytes
*
rowIdx
;
pData
=
pTable
->
keyCols
[
0
].
data
+
pTable
->
keyCols
[
0
].
bytes
*
rowIdx
;
memcpy
(
pBuf
+
bufLen
,
pColList
[
i
].
data
,
pColList
[
i
].
bytes
);
bufLen
=
pTable
->
keyCols
[
0
].
bytes
;
bufLen
+=
pColList
[
i
].
bytes
;
}
}
pTable
->
keyData
=
pData
;
}
else
{
for
(
int32_t
i
=
0
;
i
<
pTable
->
keyNum
;
++
i
)
{
if
(
pTable
->
keyCols
[
i
].
vardata
)
{
pData
=
pTable
->
keyCols
[
i
].
data
+
pTable
->
keyCols
[
i
].
offset
[
rowIdx
];
memcpy
(
pTable
->
keyBuf
+
bufLen
,
pData
,
varDataTLen
(
pData
));
bufLen
+=
varDataTLen
(
pData
);
}
else
{
pData
=
pTable
->
keyCols
[
i
].
data
+
pTable
->
keyCols
[
i
].
bytes
*
rowIdx
;
memcpy
(
pTable
->
keyBuf
+
bufLen
,
pData
,
pTable
->
keyCols
[
i
].
bytes
);
bufLen
+=
pTable
->
keyCols
[
i
].
bytes
;
}
}
pTable
->
keyData
=
pTable
->
keyBuf
;
}
}
if
(
pBufLen
)
{
if
(
pBufLen
)
{
...
@@ -407,7 +520,50 @@ FORCE_INLINE void copyColDataToBuf(int32_t colNum, int32_t rowIdx, SColBufInfo*
...
@@ -407,7 +520,50 @@ FORCE_INLINE void copyColDataToBuf(int32_t colNum, int32_t rowIdx, SColBufInfo*
}
}
}
}
FORCE_INLINE
void
copyValColsDataToBuf
(
SHJoinTableInfo
*
pTable
,
int32_t
rowIdx
)
{
if
(
!
pTable
->
valColExist
)
{
return
;
}
char
*
pData
=
NULL
;
size_t
bufLen
=
pTable
->
valBitMapSize
;
for
(
int32_t
i
=
0
,
m
=
0
;
i
<
pTable
->
valNum
;
++
i
)
{
if
(
pTable
->
valCols
[
i
].
keyCol
)
{
continue
;
}
if
(
pTable
->
valCols
[
i
].
vardata
)
{
if
(
-
1
==
pTable
->
valCols
[
i
].
offset
[
rowIdx
])
{
colDataSetNull_f
(
pTable
->
valData
,
m
);
}
else
{
pData
=
pTable
->
valCols
[
i
].
data
+
pTable
->
valCols
[
i
].
offset
[
rowIdx
];
memcpy
(
pTable
->
valData
+
bufLen
,
pData
,
varDataTLen
(
pData
));
bufLen
+=
varDataTLen
(
pData
);
}
}
else
{
if
(
colDataIsNull_f
(
pTable
->
valCols
[
i
].
bitMap
,
rowIdx
))
{
colDataSetNull_f
(
pTable
->
valData
,
m
);
}
else
{
pData
=
pTable
->
valCols
[
i
].
data
+
pTable
->
valCols
[
i
].
bytes
*
rowIdx
;
memcpy
(
pTable
->
valData
+
bufLen
,
pData
,
pTable
->
valCols
[
i
].
bytes
);
bufLen
+=
pTable
->
valCols
[
i
].
bytes
;
}
}
m
++
;
}
}
FORCE_INLINE
int32_t
getValBufFromPages
(
SArray
*
pPages
,
int32_t
bufSize
,
char
**
pBuf
,
SBufRowInfo
*
pRow
)
{
FORCE_INLINE
int32_t
getValBufFromPages
(
SArray
*
pPages
,
int32_t
bufSize
,
char
**
pBuf
,
SBufRowInfo
*
pRow
)
{
if
(
0
==
bufSize
)
{
pRow
->
pageId
=
-
1
;
return
TSDB_CODE_SUCCESS
;
}
if
(
bufSize
>
HASH_JOIN_DEFAULT_PAGE_SIZE
)
{
qError
(
"invalid join value buf size:%d"
,
bufSize
);
return
TSDB_CODE_INVALID_PARA
;
}
do
{
do
{
SBufPageInfo
*
page
=
taosArrayGetLast
(
pPages
);
SBufPageInfo
*
page
=
taosArrayGetLast
(
pPages
);
if
((
page
->
pageSize
-
page
->
offset
)
>=
bufSize
)
{
if
((
page
->
pageSize
-
page
->
offset
)
>=
bufSize
)
{
...
@@ -425,26 +581,25 @@ FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char**
...
@@ -425,26 +581,25 @@ FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char**
}
while
(
true
);
}
while
(
true
);
}
}
FORCE_INLINE
int32_t
getJoinValBufSize
(
SJoinTableInfo
*
pTable
,
int32_t
rowIdx
)
{
FORCE_INLINE
int32_t
getJoinValBufSize
(
S
H
JoinTableInfo
*
pTable
,
int32_t
rowIdx
)
{
if
(
!
pTable
->
valVarData
)
{
if
(
NULL
==
pTable
->
valVarCols
)
{
return
pTable
->
valBufSize
;
return
pTable
->
valBufSize
;
}
}
int32_t
bufLen
=
0
;
int32_t
*
varColIdx
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
pTable
->
valNum
;
++
i
)
{
int32_t
bufLen
=
pTable
->
valBufSize
;
if
(
pTable
->
valCols
[
i
].
vardata
)
{
int32_t
varColNum
=
taosArrayGetSize
(
pTable
->
valVarCols
);
char
*
pData
=
pTable
->
valCols
[
i
].
data
+
pTable
->
valCols
[
i
].
offset
[
rowIdx
];
for
(
int32_t
i
=
0
;
i
<
varColNum
;
++
i
)
{
bufLen
+=
varDataTLen
(
pData
);
varColIdx
=
taosArrayGet
(
pTable
->
valVarCols
,
i
);
}
else
{
char
*
pData
=
pTable
->
valCols
[
*
varColIdx
].
data
+
pTable
->
valCols
[
*
varColIdx
].
offset
[
rowIdx
];
bufLen
+=
pTable
->
valCols
[
i
].
bytes
;
bufLen
+=
varDataTLen
(
pData
);
}
}
}
return
bufLen
;
return
bufLen
;
}
}
int32_t
getJoinValBuf
(
SHJoinOperatorInfo
*
pJoin
,
SSHashObj
*
pHash
,
SGroupData
*
pGroup
,
SJoinTableInfo
*
pTable
,
char
**
pBuf
,
size_t
keyLen
,
int32_t
rowIdx
)
{
int32_t
addRowToHashImpl
(
SHJoinOperatorInfo
*
pJoin
,
SGroupData
*
pGroup
,
SHJoinTableInfo
*
pTable
,
size_t
keyLen
,
int32_t
rowIdx
)
{
SGroupData
group
=
{
0
};
SGroupData
group
=
{
0
};
SBufRowInfo
*
pRow
=
NULL
;
SBufRowInfo
*
pRow
=
NULL
;
...
@@ -461,14 +616,14 @@ int32_t getJoinValBuf(SHJoinOperatorInfo* pJoin, SSHashObj* pHash, SGroupData* p
...
@@ -461,14 +616,14 @@ int32_t getJoinValBuf(SHJoinOperatorInfo* pJoin, SSHashObj* pHash, SGroupData* p
}
}
}
}
int32_t
code
=
getValBufFromPages
(
pJoin
->
pRowBufs
,
getJoinValBufSize
(
pTable
,
rowIdx
),
pBuf
,
pRow
);
int32_t
code
=
getValBufFromPages
(
pJoin
->
pRowBufs
,
getJoinValBufSize
(
pTable
,
rowIdx
),
&
pTable
->
valData
,
pRow
);
if
(
code
)
{
if
(
code
)
{
return
code
;
return
code
;
}
}
if
(
NULL
==
pGroup
)
{
if
(
NULL
==
pGroup
)
{
pRow
->
next
=
NULL
;
pRow
->
next
=
NULL
;
if
(
tSimpleHashPut
(
p
Hash
,
pTable
->
keyBuf
,
keyLen
,
&
group
,
sizeof
(
group
)))
{
if
(
tSimpleHashPut
(
p
Join
->
pKeyHash
,
pTable
->
keyData
,
keyLen
,
&
group
,
sizeof
(
group
)))
{
return
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
}
else
{
}
else
{
...
@@ -479,36 +634,35 @@ int32_t getJoinValBuf(SHJoinOperatorInfo* pJoin, SSHashObj* pHash, SGroupData* p
...
@@ -479,36 +634,35 @@ int32_t getJoinValBuf(SHJoinOperatorInfo* pJoin, SSHashObj* pHash, SGroupData* p
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
addRowToHash
(
SHJoinOperatorInfo
*
pJoin
,
SSDataBlock
*
pBlock
,
char
*
pKey
,
size_t
keyLen
,
int32_t
rowIdx
)
{
int32_t
addRowToHash
(
SHJoinOperatorInfo
*
pJoin
,
SSDataBlock
*
pBlock
,
size_t
keyLen
,
int32_t
rowIdx
)
{
SJoinTableInfo
*
pBuild
=
pJoin
->
pBuild
;
S
H
JoinTableInfo
*
pBuild
=
pJoin
->
pBuild
;
int32_t
code
=
set
ColBufInfo
(
pBlock
,
pBuild
->
valNum
,
pBuild
->
valCols
);
int32_t
code
=
set
ValColsData
(
pBlock
,
pBuild
);
if
(
code
)
{
if
(
code
)
{
return
code
;
return
code
;
}
}
char
*
valBuf
=
NULL
;
SGroupData
*
pGroup
=
tSimpleHashGet
(
pJoin
->
pKeyHash
,
pBuild
->
keyData
,
keyLen
);
SGroupData
*
pRes
=
tSimpleHashGet
(
pJoin
->
pKeyHash
,
pBuild
->
keyBuf
,
keyLen
);
code
=
addRowToHashImpl
(
pJoin
,
pGroup
,
pBuild
,
keyLen
,
rowIdx
);
code
=
getJoinValBuf
(
pJoin
->
pKeyHash
,
pRes
,
pBuild
,
&
valBuf
,
keyLen
,
rowIdx
);
if
(
code
)
{
if
(
code
)
{
return
code
;
return
code
;
}
}
copy
ColDataToBuf
(
pBuild
->
valNum
,
rowIdx
,
pBuild
->
valCols
,
valBuf
,
NULL
);
copy
ValColsDataToBuf
(
pBuild
,
rowIdx
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
addBlockRowsToHash
(
SSDataBlock
*
pBlock
,
SHJoinOperatorInfo
*
pJoin
)
{
int32_t
addBlockRowsToHash
(
SSDataBlock
*
pBlock
,
SHJoinOperatorInfo
*
pJoin
)
{
SJoinTableInfo
*
pBuild
=
pJoin
->
pBuild
;
S
H
JoinTableInfo
*
pBuild
=
pJoin
->
pBuild
;
int32_t
code
=
set
ColBufInfo
(
pBlock
,
pBuild
->
keyNum
,
pBuild
->
keyCols
);
int32_t
code
=
set
KeyColsData
(
pBlock
,
pBuild
);
if
(
code
)
{
if
(
code
)
{
return
code
;
return
code
;
}
}
size_t
bufLen
=
0
;
size_t
bufLen
=
0
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
copy
ColDataToBuf
(
pBuild
->
keyNum
,
i
,
pBuild
->
keyCols
,
pBuild
->
keyBuf
,
&
bufLen
);
copy
KeyColsDataToBuf
(
pBuild
,
i
,
&
bufLen
);
code
=
addRowToHash
(
pJoin
,
pBlock
,
pBuild
->
keyBuf
,
bufLen
,
i
);
code
=
addRowToHash
(
pJoin
,
pBlock
,
bufLen
,
i
);
if
(
code
)
{
if
(
code
)
{
return
code
;
return
code
;
}
}
...
@@ -528,7 +682,7 @@ int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) {
...
@@ -528,7 +682,7 @@ int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) {
break
;
break
;
}
}
code
=
addBlockRowsToHash
(
pBlock
,
pJoin
->
pKeyHash
,
pJoin
->
pBuild
);
code
=
addBlockRowsToHash
(
pBlock
,
pJoin
);
if
(
code
)
{
if
(
code
)
{
return
code
;
return
code
;
}
}
...
@@ -539,17 +693,16 @@ int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) {
...
@@ -539,17 +693,16 @@ int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) {
void
launchBlockHashJoin
(
struct
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
)
{
void
launchBlockHashJoin
(
struct
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
)
{
SHJoinOperatorInfo
*
pJoin
=
pOperator
->
info
;
SHJoinOperatorInfo
*
pJoin
=
pOperator
->
info
;
SJoinTableInfo
*
pProbe
=
pJoin
->
pProbe
;
S
H
JoinTableInfo
*
pProbe
=
pJoin
->
pProbe
;
int32_t
code
=
set
ColBufInfo
(
pBlock
,
pProbe
->
keyNum
,
pProbe
->
keyCols
);
int32_t
code
=
set
KeyColsData
(
pBlock
,
pProbe
);
if
(
code
)
{
if
(
code
)
{
return
code
;
return
code
;
}
}
code
=
set
ColBufInfo
(
pBlock
,
pProbe
->
valNum
,
pProbe
->
valCols
);
code
=
set
ValColsData
(
pBlock
,
pProbe
);
if
(
code
)
{
if
(
code
)
{
return
code
;
return
code
;
}
}
pJoin
->
ctx
.
probeIdx
=
0
;
pJoin
->
ctx
.
probeIdx
=
0
;
pJoin
->
ctx
.
pBuildRow
=
NULL
;
pJoin
->
ctx
.
pBuildRow
=
NULL
;
pJoin
->
ctx
.
pProbeData
=
pBlock
;
pJoin
->
ctx
.
pProbeData
=
pBlock
;
...
@@ -562,7 +715,7 @@ SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) {
...
@@ -562,7 +715,7 @@ SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) {
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SSDataBlock
*
pRes
=
pJoin
->
pRes
;
SSDataBlock
*
pRes
=
pJoin
->
pRes
;
blockDataCleanup
(
pRes
)
;
pRes
->
info
.
rows
=
0
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
return
NULL
;
...
@@ -596,6 +749,8 @@ SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) {
...
@@ -596,6 +749,8 @@ SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) {
while
(
true
)
{
while
(
true
)
{
SSDataBlock
*
pBlock
=
pJoin
->
pProbe
->
downStream
->
fpSet
.
getNextFn
(
pJoin
->
pProbe
->
downStream
);
SSDataBlock
*
pBlock
=
pJoin
->
pProbe
->
downStream
->
fpSet
.
getNextFn
(
pJoin
->
pProbe
->
downStream
);
if
(
NULL
==
pBlock
)
{
if
(
NULL
==
pBlock
)
{
setTaskStatus
(
pOperator
->
pTaskInfo
,
TASK_COMPLETED
);
pOperator
->
status
=
OP_EXEC_DONE
;
break
;
break
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录