Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
2dot5
ClickHouse
提交
8cea4531
C
ClickHouse
项目概览
2dot5
/
ClickHouse
通知
3
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
ClickHouse
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
8cea4531
编写于
4月 18, 2020
作者:
A
alexey-milovidov
提交者:
GitHub
4月 18, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #10133 from 4ertus2/joins
JOIN with Dictionary
上级
d2462183
47cb6702
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
724 addition
and
124 deletion
+724
-124
src/Interpreters/DictionaryReader.cpp
src/Interpreters/DictionaryReader.cpp
+167
-0
src/Interpreters/DictionaryReader.h
src/Interpreters/DictionaryReader.h
+46
-0
src/Interpreters/ExpressionAnalyzer.cpp
src/Interpreters/ExpressionAnalyzer.cpp
+67
-43
src/Interpreters/ExpressionAnalyzer.h
src/Interpreters/ExpressionAnalyzer.h
+0
-2
src/Interpreters/HashJoin.cpp
src/Interpreters/HashJoin.cpp
+102
-10
src/Interpreters/HashJoin.h
src/Interpreters/HashJoin.h
+8
-3
src/Interpreters/InterpreterSelectQuery.cpp
src/Interpreters/InterpreterSelectQuery.cpp
+2
-1
src/Interpreters/JoinedTables.cpp
src/Interpreters/JoinedTables.cpp
+67
-0
src/Interpreters/JoinedTables.h
src/Interpreters/JoinedTables.h
+2
-0
src/Interpreters/SyntaxAnalyzer.cpp
src/Interpreters/SyntaxAnalyzer.cpp
+5
-35
src/Interpreters/SyntaxAnalyzer.h
src/Interpreters/SyntaxAnalyzer.h
+2
-1
src/Interpreters/TableJoin.cpp
src/Interpreters/TableJoin.cpp
+41
-14
src/Interpreters/TableJoin.h
src/Interpreters/TableJoin.h
+12
-3
src/Storages/StorageDictionary.h
src/Storages/StorageDictionary.h
+2
-0
tests/queries/0_stateless/00561_storage_join.sql
tests/queries/0_stateless/00561_storage_join.sql
+2
-4
tests/queries/0_stateless/01115_join_with_dictionary.reference
.../queries/0_stateless/01115_join_with_dictionary.reference
+103
-0
tests/queries/0_stateless/01115_join_with_dictionary.sql
tests/queries/0_stateless/01115_join_with_dictionary.sql
+90
-0
tests/queries/1_stateful/00065_loyalty_with_storage_join.sql
tests/queries/1_stateful/00065_loyalty_with_storage_join.sql
+6
-8
未找到文件。
src/Interpreters/DictionaryReader.cpp
0 → 100644
浏览文件 @
8cea4531
#include <Interpreters/DictionaryReader.h>
#include <Interpreters/Context.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnConst.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
namespace
DB
{
namespace
ErrorCodes
{
extern
const
int
NUMBER_OF_COLUMNS_DOESNT_MATCH
;
extern
const
int
TYPE_MISMATCH
;
}
DictionaryReader
::
FunctionWrapper
::
FunctionWrapper
(
FunctionOverloadResolverPtr
resolver
,
const
ColumnsWithTypeAndName
&
arguments
,
Block
&
block
,
const
ColumnNumbers
&
arg_positions_
,
const
String
&
column_name
,
TypeIndex
expected_type
)
:
arg_positions
(
arg_positions_
)
,
result_pos
(
block
.
columns
())
{
FunctionBasePtr
prepared_function
=
resolver
->
build
(
arguments
);
ColumnWithTypeAndName
result
;
result
.
name
=
"get_"
+
column_name
;
result
.
type
=
prepared_function
->
getReturnType
();
if
(
result
.
type
->
getTypeId
()
!=
expected_type
)
throw
Exception
(
"Type mismatch in dictionary reader for: "
+
column_name
,
ErrorCodes
::
TYPE_MISMATCH
);
block
.
insert
(
result
);
function
=
prepared_function
->
prepare
(
block
,
arg_positions
,
result_pos
);
}
static
constexpr
const
size_t
key_size
=
1
;
DictionaryReader
::
DictionaryReader
(
const
String
&
dictionary_name
,
const
Names
&
src_column_names
,
const
NamesAndTypesList
&
result_columns
,
const
Context
&
context
)
:
result_header
(
makeResultBlock
(
result_columns
))
,
key_position
(
key_size
+
result_header
.
columns
())
{
if
(
src_column_names
.
size
()
!=
result_columns
.
size
())
throw
Exception
(
"Columns number mismatch in dictionary reader"
,
ErrorCodes
::
NUMBER_OF_COLUMNS_DOESNT_MATCH
);
ColumnWithTypeAndName
dict_name
;
ColumnWithTypeAndName
key
;
ColumnWithTypeAndName
column_name
;
{
dict_name
.
name
=
"dict"
;
dict_name
.
type
=
std
::
make_shared
<
DataTypeString
>
();
dict_name
.
column
=
dict_name
.
type
->
createColumnConst
(
1
,
dictionary_name
);
/// TODO: composite key (key_size > 1)
key
.
name
=
"key"
;
key
.
type
=
std
::
make_shared
<
DataTypeUInt64
>
();
column_name
.
name
=
"column"
;
column_name
.
type
=
std
::
make_shared
<
DataTypeString
>
();
}
/// dictHas('dict_name', id)
ColumnsWithTypeAndName
arguments_has
;
arguments_has
.
push_back
(
dict_name
);
arguments_has
.
push_back
(
key
);
/// dictGet('dict_name', 'attr_name', id)
ColumnsWithTypeAndName
arguments_get
;
arguments_get
.
push_back
(
dict_name
);
arguments_get
.
push_back
(
column_name
);
arguments_get
.
push_back
(
key
);
sample_block
.
insert
(
dict_name
);
for
(
auto
&
columns_name
:
src_column_names
)
{
ColumnWithTypeAndName
name
;
name
.
name
=
"col_"
+
columns_name
;
name
.
type
=
std
::
make_shared
<
DataTypeString
>
();
name
.
column
=
name
.
type
->
createColumnConst
(
1
,
columns_name
);
sample_block
.
insert
(
name
);
}
sample_block
.
insert
(
key
);
ColumnNumbers
positions_has
{
0
,
key_position
};
function_has
=
std
::
make_unique
<
FunctionWrapper
>
(
FunctionFactory
::
instance
().
get
(
"dictHas"
,
context
),
arguments_has
,
sample_block
,
positions_has
,
"has"
,
DataTypeUInt8
().
getTypeId
());
functions_get
.
reserve
(
result_header
.
columns
());
for
(
size_t
i
=
0
;
i
<
result_header
.
columns
();
++
i
)
{
size_t
column_name_pos
=
key_size
+
i
;
auto
&
column
=
result_header
.
getByPosition
(
i
);
arguments_get
[
1
].
column
=
DataTypeString
().
createColumnConst
(
1
,
src_column_names
[
i
]);
ColumnNumbers
positions_get
{
0
,
column_name_pos
,
key_position
};
functions_get
.
emplace_back
(
FunctionWrapper
(
FunctionFactory
::
instance
().
get
(
"dictGet"
,
context
),
arguments_get
,
sample_block
,
positions_get
,
column
.
name
,
column
.
type
->
getTypeId
()));
}
}
void
DictionaryReader
::
readKeys
(
const
IColumn
&
keys
,
Block
&
out_block
,
ColumnVector
<
UInt8
>::
Container
&
found
,
std
::
vector
<
size_t
>
&
positions
)
const
{
Block
working_block
=
sample_block
;
size_t
has_position
=
key_position
+
1
;
size_t
size
=
keys
.
size
();
/// set keys for dictHas()
ColumnWithTypeAndName
&
key_column
=
working_block
.
getByPosition
(
key_position
);
key_column
.
column
=
keys
.
cloneResized
(
size
);
/// just a copy we cannot avoid
/// calculate and extract dictHas()
function_has
->
execute
(
working_block
,
size
);
ColumnWithTypeAndName
&
has_column
=
working_block
.
getByPosition
(
has_position
);
auto
mutable_has
=
(
*
std
::
move
(
has_column
.
column
)).
mutate
();
found
.
swap
(
typeid_cast
<
ColumnVector
<
UInt8
>
&>
(
*
mutable_has
).
getData
());
has_column
.
column
=
nullptr
;
/// set mapping form source keys to resulting rows in output block
positions
.
clear
();
positions
.
resize
(
size
,
0
);
size_t
pos
=
0
;
for
(
size_t
i
=
0
;
i
<
size
;
++
i
)
if
(
found
[
i
])
positions
[
i
]
=
pos
++
;
/// set keys for dictGet(): remove not found keys
key_column
.
column
=
key_column
.
column
->
filter
(
found
,
-
1
);
size_t
rows
=
key_column
.
column
->
size
();
/// calculate dictGet()
for
(
auto
&
func
:
functions_get
)
func
.
execute
(
working_block
,
rows
);
/// make result: copy header block with correct names and move data columns
out_block
=
result_header
.
cloneEmpty
();
size_t
first_get_position
=
has_position
+
1
;
for
(
size_t
i
=
0
;
i
<
out_block
.
columns
();
++
i
)
{
auto
&
src_column
=
working_block
.
getByPosition
(
first_get_position
+
i
);
auto
&
dst_column
=
out_block
.
getByPosition
(
i
);
dst_column
.
column
=
src_column
.
column
;
src_column
.
column
=
nullptr
;
}
}
Block
DictionaryReader
::
makeResultBlock
(
const
NamesAndTypesList
&
names
)
{
Block
block
;
for
(
auto
&
nm
:
names
)
{
ColumnWithTypeAndName
column
{
nullptr
,
nm
.
type
,
nm
.
name
};
if
(
column
.
type
->
isNullable
())
column
.
type
=
typeid_cast
<
const
DataTypeNullable
&>
(
*
column
.
type
).
getNestedType
();
block
.
insert
(
std
::
move
(
column
));
}
return
block
;
}
}
src/Interpreters/DictionaryReader.h
0 → 100644
浏览文件 @
8cea4531
#pragma once
#include <Core/Block.h>
#include <Columns/ColumnVector.h>
#include <Functions/IFunctionAdaptors.h>
namespace
DB
{
class
Context
;
/// Read block of required columns from Dictionary by UInt64 key column. Rename columns if needed.
/// Current implementation uses dictHas() + N * dictGet() functions.
class
DictionaryReader
{
public:
struct
FunctionWrapper
{
ExecutableFunctionPtr
function
;
ColumnNumbers
arg_positions
;
size_t
result_pos
=
0
;
FunctionWrapper
(
FunctionOverloadResolverPtr
resolver
,
const
ColumnsWithTypeAndName
&
arguments
,
Block
&
block
,
const
ColumnNumbers
&
arg_positions_
,
const
String
&
column_name
,
TypeIndex
expected_type
);
void
execute
(
Block
&
block
,
size_t
rows
)
const
{
function
->
execute
(
block
,
arg_positions
,
result_pos
,
rows
,
false
);
}
};
DictionaryReader
(
const
String
&
dictionary_name
,
const
Names
&
src_column_names
,
const
NamesAndTypesList
&
result_columns
,
const
Context
&
context
);
void
readKeys
(
const
IColumn
&
keys
,
Block
&
out_block
,
ColumnVector
<
UInt8
>::
Container
&
found
,
std
::
vector
<
size_t
>
&
positions
)
const
;
private:
Block
result_header
;
Block
sample_block
;
/// dictionary name, column names, key, dictHas() result, dictGet() results
size_t
key_position
;
std
::
unique_ptr
<
FunctionWrapper
>
function_has
;
std
::
vector
<
FunctionWrapper
>
functions_get
;
static
Block
makeResultBlock
(
const
NamesAndTypesList
&
names
);
};
}
src/Interpreters/ExpressionAnalyzer.cpp
浏览文件 @
8cea4531
...
...
@@ -31,17 +31,20 @@
#include <Interpreters/JoinSwitcher.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/MergeJoin.h>
#include <Interpreters/DictionaryReader.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/parseAggregateFunctionParameters.h>
#include <Storages/StorageDistributed.h>
#include <Storages/StorageDictionary.h>
#include <Storages/StorageJoin.h>
#include <DataStreams/copyData.h>
#include <DataStreams/IBlockInputStream.h>
#include <Dictionaries/IDictionary.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Common/typeid_cast.h>
#include <Common/StringUtils/StringUtils.h>
...
...
@@ -502,25 +505,11 @@ bool SelectQueryExpressionAnalyzer::appendJoin(ExpressionActionsChain & chain, b
return
true
;
}
static
JoinPtr
tryGetStorageJoin
(
const
ASTTablesInSelectQueryElement
&
join_element
,
std
::
shared_ptr
<
TableJoin
>
analyzed_join
,
const
Context
&
context
)
static
JoinPtr
tryGetStorageJoin
(
std
::
shared_ptr
<
TableJoin
>
analyzed_join
)
{
const
auto
&
table_to_join
=
join_element
.
table_expression
->
as
<
ASTTableExpression
&>
();
/// TODO This syntax does not support specifying a database name.
if
(
table_to_join
.
database_and_table_name
)
{
auto
table_id
=
context
.
resolveStorageID
(
table_to_join
.
database_and_table_name
);
StoragePtr
table
=
DatabaseCatalog
::
instance
().
tryGetTable
(
table_id
);
if
(
table
)
{
auto
*
storage_join
=
dynamic_cast
<
StorageJoin
*>
(
table
.
get
());
if
(
storage_join
)
return
storage_join
->
getJoin
(
analyzed_join
);
}
}
if
(
auto
*
table
=
analyzed_join
->
joined_storage
.
get
())
if
(
auto
*
storage_join
=
dynamic_cast
<
StorageJoin
*>
(
table
))
return
storage_join
->
getJoin
(
analyzed_join
);
return
{};
}
...
...
@@ -531,10 +520,44 @@ static ExpressionActionsPtr createJoinedBlockActions(const Context & context, co
return
ExpressionAnalyzer
(
expression_list
,
syntax_result
,
context
).
getActions
(
true
,
false
);
}
static
std
::
shared_ptr
<
IJoin
>
makeJoin
(
std
::
shared_ptr
<
TableJoin
>
analyzed_join
,
const
Block
&
sample_block
)
static
bool
allowDictJoin
(
StoragePtr
joined_storage
,
const
Context
&
context
,
String
&
dict_name
,
String
&
key_name
)
{
auto
*
dict
=
dynamic_cast
<
const
StorageDictionary
*>
(
joined_storage
.
get
());
if
(
!
dict
)
return
false
;
dict_name
=
dict
->
dictionaryName
();
auto
dictionary
=
context
.
getExternalDictionariesLoader
().
getDictionary
(
dict_name
);
if
(
!
dictionary
)
return
false
;
const
DictionaryStructure
&
structure
=
dictionary
->
getStructure
();
if
(
structure
.
id
)
{
key_name
=
structure
.
id
->
name
;
return
true
;
}
return
false
;
}
static
std
::
shared_ptr
<
IJoin
>
makeJoin
(
std
::
shared_ptr
<
TableJoin
>
analyzed_join
,
const
Block
&
sample_block
,
const
Context
&
context
)
{
bool
allow_merge_join
=
analyzed_join
->
allowMergeJoin
();
/// HashJoin with Dictionary optimisation
String
dict_name
;
String
key_name
;
if
(
analyzed_join
->
joined_storage
&&
allowDictJoin
(
analyzed_join
->
joined_storage
,
context
,
dict_name
,
key_name
))
{
Names
original_names
;
NamesAndTypesList
result_columns
;
if
(
analyzed_join
->
allowDictJoin
(
key_name
,
sample_block
,
original_names
,
result_columns
))
{
analyzed_join
->
dictionary_reader
=
std
::
make_shared
<
DictionaryReader
>
(
dict_name
,
original_names
,
result_columns
,
context
);
return
std
::
make_shared
<
HashJoin
>
(
analyzed_join
,
sample_block
);
}
}
if
(
analyzed_join
->
forceHashJoin
()
||
(
analyzed_join
->
preferMergeJoin
()
&&
!
allow_merge_join
))
return
std
::
make_shared
<
HashJoin
>
(
analyzed_join
,
sample_block
);
else
if
(
analyzed_join
->
forceMergeJoin
()
||
(
analyzed_join
->
preferMergeJoin
()
&&
allow_merge_join
))
...
...
@@ -550,48 +573,49 @@ JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(const ASTTablesInSelectQuer
SubqueryForSet
&
subquery_for_join
=
subqueries_for_sets
[
join_subquery_id
];
///
Special case - if table name is specified on the right of JOIN, then the table has the type Join (the previously prepared mapping)
.
///
Use StorageJoin if any
.
if
(
!
subquery_for_join
.
join
)
subquery_for_join
.
join
=
tryGetStorageJoin
(
join_element
,
syntax
->
analyzed_join
,
context
);
subquery_for_join
.
join
=
tryGetStorageJoin
(
syntax
->
analyzed_join
);
if
(
!
subquery_for_join
.
join
)
{
/// Actions which need to be calculated on joined block.
ExpressionActionsPtr
joined_block_actions
=
createJoinedBlockActions
(
context
,
analyzedJoin
());
Names
original_right_columns
;
if
(
!
subquery_for_join
.
source
)
{
NamesWithAliases
required_columns_with_aliases
=
analyzedJoin
().
getRequiredColumns
(
joined_block_actions
->
getSampleBlock
(),
joined_block_actions
->
getRequiredColumns
());
makeSubqueryForJoin
(
join_element
,
std
::
move
(
required_columns_with_aliases
),
subquery_for_join
);
NamesWithAliases
required_columns_with_aliases
=
analyzedJoin
().
getRequiredColumns
(
joined_block_actions
->
getSampleBlock
(),
joined_block_actions
->
getRequiredColumns
());
for
(
auto
&
pr
:
required_columns_with_aliases
)
original_right_columns
.
push_back
(
pr
.
first
);
/** For GLOBAL JOINs (in the case, for example, of the push method for executing GLOBAL subqueries), the following occurs
* - in the addExternalStorage function, the JOIN (SELECT ...) subquery is replaced with JOIN _data1,
* in the subquery_for_set object this subquery is exposed as source and the temporary table _data1 as the `table`.
* - this function shows the expression JOIN _data1.
*/
auto
interpreter
=
interpretSubquery
(
join_element
.
table_expression
,
context
,
original_right_columns
,
query_options
);
subquery_for_join
.
makeSource
(
interpreter
,
std
::
move
(
required_columns_with_aliases
));
}
/// TODO You do not need to set this up when JOIN is only needed on remote servers.
subquery_for_join
.
setJoinActions
(
joined_block_actions
);
/// changes subquery_for_join.sample_block inside
subquery_for_join
.
join
=
makeJoin
(
syntax
->
analyzed_join
,
subquery_for_join
.
sample_block
);
subquery_for_join
.
join
=
makeJoin
(
syntax
->
analyzed_join
,
subquery_for_join
.
sample_block
,
context
);
/// Do not make subquery for join over dictionary.
if
(
syntax
->
analyzed_join
->
dictionary_reader
)
{
JoinPtr
join
=
subquery_for_join
.
join
;
subqueries_for_sets
.
erase
(
join_subquery_id
);
return
join
;
}
}
return
subquery_for_join
.
join
;
}
void
SelectQueryExpressionAnalyzer
::
makeSubqueryForJoin
(
const
ASTTablesInSelectQueryElement
&
join_element
,
NamesWithAliases
&&
required_columns_with_aliases
,
SubqueryForSet
&
subquery_for_set
)
const
{
/** For GLOBAL JOINs (in the case, for example, of the push method for executing GLOBAL subqueries), the following occurs
* - in the addExternalStorage function, the JOIN (SELECT ...) subquery is replaced with JOIN _data1,
* in the subquery_for_set object this subquery is exposed as source and the temporary table _data1 as the `table`.
* - this function shows the expression JOIN _data1.
*/
Names
original_columns
;
for
(
auto
&
pr
:
required_columns_with_aliases
)
original_columns
.
push_back
(
pr
.
first
);
auto
interpreter
=
interpretSubquery
(
join_element
.
table_expression
,
context
,
original_columns
,
query_options
);
subquery_for_set
.
makeSource
(
interpreter
,
std
::
move
(
required_columns_with_aliases
));
}
bool
SelectQueryExpressionAnalyzer
::
appendPrewhere
(
ExpressionActionsChain
&
chain
,
bool
only_types
,
const
Names
&
additional_required_columns
)
{
...
...
src/Interpreters/ExpressionAnalyzer.h
浏览文件 @
8cea4531
...
...
@@ -276,8 +276,6 @@ private:
SetPtr
isPlainStorageSetInSubquery
(
const
ASTPtr
&
subquery_or_table_name
);
JoinPtr
makeTableJoin
(
const
ASTTablesInSelectQueryElement
&
join_element
);
void
makeSubqueryForJoin
(
const
ASTTablesInSelectQueryElement
&
join_element
,
NamesWithAliases
&&
required_columns_with_aliases
,
SubqueryForSet
&
subquery_for_set
)
const
;
const
ASTSelectQuery
*
getAggregatingQuery
()
const
;
...
...
src/Interpreters/HashJoin.cpp
浏览文件 @
8cea4531
...
...
@@ -4,16 +4,21 @@
#include <Columns/ColumnConst.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnNullable.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/join_common.h>
#include <Interpreters/TableJoin.h>
#include <Interpreters/joinDispatch.h>
#include <Interpreters/NullableUtils.h>
#include <Interpreters/DictionaryReader.h>
#include <Storages/StorageDictionary.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/materializeBlock.h>
...
...
@@ -21,8 +26,6 @@
#include <Core/ColumnNumbers.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <DataTypes/DataTypeLowCardinality.h>
namespace
DB
{
...
...
@@ -282,6 +285,42 @@ static KeyGetter createKeyGetter(const ColumnRawPtrs & key_columns, const Sizes
return
KeyGetter
(
key_columns
,
key_sizes
,
nullptr
);
}
class
KeyGetterForDict
{
public:
using
Mapped
=
JoinStuff
::
MappedOne
;
using
FindResult
=
ColumnsHashing
::
columns_hashing_impl
::
FindResultImpl
<
Mapped
>
;
KeyGetterForDict
(
const
ColumnRawPtrs
&
key_columns_
,
const
Sizes
&
,
void
*
)
:
key_columns
(
key_columns_
)
{}
FindResult
findKey
(
const
TableJoin
&
table_join
,
size_t
row
,
const
Arena
&
)
{
const
DictionaryReader
&
reader
=
*
table_join
.
dictionary_reader
;
if
(
!
read_result
)
{
reader
.
readKeys
(
*
key_columns
[
0
],
read_result
,
found
,
positions
);
result
.
block
=
&
read_result
;
if
(
table_join
.
forceNullableRight
())
for
(
auto
&
column
:
read_result
)
if
(
table_join
.
rightBecomeNullable
(
column
.
type
))
JoinCommon
::
convertColumnToNullable
(
column
);
}
result
.
row_num
=
positions
[
row
];
return
FindResult
(
&
result
,
found
[
row
]);
}
private:
const
ColumnRawPtrs
&
key_columns
;
Block
read_result
;
Mapped
result
;
ColumnVector
<
UInt8
>::
Container
found
;
std
::
vector
<
size_t
>
positions
;
};
template
<
HashJoin
::
Type
type
,
typename
Value
,
typename
Mapped
>
struct
KeyGetterForTypeImpl
;
...
...
@@ -351,7 +390,7 @@ size_t HashJoin::getTotalRowCount() const
for
(
const
auto
&
block
:
data
->
blocks
)
res
+=
block
.
rows
();
}
else
else
if
(
data
->
type
!=
Type
::
DICT
)
{
joinDispatch
(
kind
,
strictness
,
data
->
maps
,
[
&
](
auto
,
auto
,
auto
&
map
)
{
res
+=
map
.
getTotalRowCount
(
data
->
type
);
});
}
...
...
@@ -368,7 +407,7 @@ size_t HashJoin::getTotalByteCount() const
for
(
const
auto
&
block
:
data
->
blocks
)
res
+=
block
.
bytes
();
}
else
else
if
(
data
->
type
!=
Type
::
DICT
)
{
joinDispatch
(
kind
,
strictness
,
data
->
maps
,
[
&
](
auto
,
auto
,
auto
&
map
)
{
res
+=
map
.
getTotalByteCountImpl
(
data
->
type
);
});
res
+=
data
->
pool
.
size
();
...
...
@@ -400,7 +439,13 @@ void HashJoin::setSampleBlock(const Block & block)
if
(
nullable_right_side
)
JoinCommon
::
convertColumnsToNullable
(
sample_block_with_columns_to_add
);
if
(
strictness
==
ASTTableJoin
::
Strictness
::
Asof
)
if
(
table_join
->
dictionary_reader
)
{
data
->
type
=
Type
::
DICT
;
std
::
get
<
MapsOne
>
(
data
->
maps
).
create
(
Type
::
DICT
);
chooseMethod
(
key_columns
,
key_sizes
);
/// init key_sizes
}
else
if
(
strictness
==
ASTTableJoin
::
Strictness
::
Asof
)
{
if
(
kind
!=
ASTTableJoin
::
Kind
::
Left
and
kind
!=
ASTTableJoin
::
Kind
::
Inner
)
throw
Exception
(
"ASOF only supports LEFT and INNER as base joins"
,
ErrorCodes
::
NOT_IMPLEMENTED
);
...
...
@@ -526,7 +571,8 @@ namespace
switch
(
type
)
{
case
HashJoin
::
Type
::
EMPTY
:
break
;
case
HashJoin
::
Type
::
CROSS
:
break
;
/// Do nothing. We have already saved block, and it is enough.
case
HashJoin
::
Type
::
CROSS
:
break
;
/// Do nothing. We have already saved block, and it is enough.
case
HashJoin
::
Type
::
DICT
:
break
;
/// Noone should call it with Type::DICT.
#define M(TYPE) \
case HashJoin::Type::TYPE: \
...
...
@@ -598,6 +644,8 @@ bool HashJoin::addJoinedBlock(const Block & source_block, bool check_limits)
{
if
(
empty
())
throw
Exception
(
"Logical error: HashJoin was not initialized"
,
ErrorCodes
::
LOGICAL_ERROR
);
if
(
overDictionary
())
throw
Exception
(
"Logical error: insert into hash-map in HashJoin over dictionary"
,
ErrorCodes
::
LOGICAL_ERROR
);
/// There's no optimization for right side const columns. Remove constness if any.
Block
block
=
materializeBlock
(
source_block
);
...
...
@@ -930,8 +978,7 @@ IColumn::Filter switchJoinRightColumns(const Maps & maps_, AddedColumns & added_
case HashJoin::Type::TYPE: \
return joinRightColumnsSwitchNullability<KIND, STRICTNESS,\
typename KeyGetterForType<HashJoin::Type::TYPE, const std::remove_reference_t<decltype(*maps_.TYPE)>>::Type>(\
*maps_.TYPE, added_columns, null_map);\
break;
*maps_.TYPE, added_columns, null_map);
APPLY_FOR_JOIN_VARIANTS
(
M
)
#undef M
...
...
@@ -940,6 +987,20 @@ IColumn::Filter switchJoinRightColumns(const Maps & maps_, AddedColumns & added_
}
}
template
<
ASTTableJoin
::
Kind
KIND
,
ASTTableJoin
::
Strictness
STRICTNESS
>
IColumn
::
Filter
dictionaryJoinRightColumns
(
const
TableJoin
&
table_join
,
AddedColumns
&
added_columns
,
const
ConstNullMapPtr
&
null_map
)
{
if
constexpr
(
KIND
==
ASTTableJoin
::
Kind
::
Left
&&
(
STRICTNESS
==
ASTTableJoin
::
Strictness
::
Any
||
STRICTNESS
==
ASTTableJoin
::
Strictness
::
Semi
||
STRICTNESS
==
ASTTableJoin
::
Strictness
::
Anti
))
{
return
joinRightColumnsSwitchNullability
<
KIND
,
STRICTNESS
,
KeyGetterForDict
>
(
table_join
,
added_columns
,
null_map
);
}
throw
Exception
(
"Logical error: wrong JOIN combination"
,
ErrorCodes
::
LOGICAL_ERROR
);
}
}
/// nameless
...
...
@@ -1000,7 +1061,9 @@ void HashJoin::joinBlockImpl(
bool
has_required_right_keys
=
(
required_right_keys
.
columns
()
!=
0
);
added_columns
.
need_filter
=
need_filter
||
has_required_right_keys
;
IColumn
::
Filter
row_filter
=
switchJoinRightColumns
<
KIND
,
STRICTNESS
>
(
maps_
,
added_columns
,
data
->
type
,
null_map
);
IColumn
::
Filter
row_filter
=
overDictionary
()
?
dictionaryJoinRightColumns
<
KIND
,
STRICTNESS
>
(
*
table_join
,
added_columns
,
null_map
)
:
switchJoinRightColumns
<
KIND
,
STRICTNESS
>
(
maps_
,
added_columns
,
data
->
type
,
null_map
);
for
(
size_t
i
=
0
;
i
<
added_columns
.
size
();
++
i
)
block
.
insert
(
added_columns
.
moveColumn
(
i
));
...
...
@@ -1211,7 +1274,36 @@ void HashJoin::joinBlock(Block & block, ExtraBlockPtr & not_processed)
const
Names
&
key_names_left
=
table_join
->
keyNamesLeft
();
JoinCommon
::
checkTypesOfKeys
(
block
,
key_names_left
,
right_table_keys
,
key_names_right
);
if
(
joinDispatch
(
kind
,
strictness
,
data
->
maps
,
[
&
](
auto
kind_
,
auto
strictness_
,
auto
&
map
)
if
(
overDictionary
())
{
using
Kind
=
ASTTableJoin
::
Kind
;
using
Strictness
=
ASTTableJoin
::
Strictness
;
auto
&
map
=
std
::
get
<
MapsOne
>
(
data
->
maps
);
if
(
kind
==
Kind
::
Left
)
{
switch
(
strictness
)
{
case
Strictness
::
Any
:
case
Strictness
::
All
:
joinBlockImpl
<
Kind
::
Left
,
Strictness
::
Any
>
(
block
,
key_names_left
,
sample_block_with_columns_to_add
,
map
);
break
;
case
Strictness
::
Semi
:
joinBlockImpl
<
Kind
::
Left
,
Strictness
::
Semi
>
(
block
,
key_names_left
,
sample_block_with_columns_to_add
,
map
);
break
;
case
Strictness
::
Anti
:
joinBlockImpl
<
Kind
::
Left
,
Strictness
::
Anti
>
(
block
,
key_names_left
,
sample_block_with_columns_to_add
,
map
);
break
;
default:
throw
Exception
(
"Logical error: wrong JOIN combination"
,
ErrorCodes
::
LOGICAL_ERROR
);
}
}
else
if
(
kind
==
Kind
::
Inner
&&
strictness
==
Strictness
::
All
)
joinBlockImpl
<
Kind
::
Left
,
Strictness
::
Semi
>
(
block
,
key_names_left
,
sample_block_with_columns_to_add
,
map
);
else
throw
Exception
(
"Logical error: wrong JOIN combination"
,
ErrorCodes
::
LOGICAL_ERROR
);
}
else
if
(
joinDispatch
(
kind
,
strictness
,
data
->
maps
,
[
&
](
auto
kind_
,
auto
strictness_
,
auto
&
map
)
{
joinBlockImpl
<
kind_
,
strictness_
>
(
block
,
key_names_left
,
sample_block_with_columns_to_add
,
map
);
}))
...
...
src/Interpreters/HashJoin.h
浏览文件 @
8cea4531
...
...
@@ -27,6 +27,7 @@ namespace DB
{
class
TableJoin
;
class
DictionaryReader
;
namespace
JoinStuff
{
...
...
@@ -148,7 +149,8 @@ class HashJoin : public IJoin
public:
HashJoin
(
std
::
shared_ptr
<
TableJoin
>
table_join_
,
const
Block
&
right_sample_block
,
bool
any_take_last_row_
=
false
);
bool
empty
()
{
return
data
->
type
==
Type
::
EMPTY
;
}
bool
empty
()
const
{
return
data
->
type
==
Type
::
EMPTY
;
}
bool
overDictionary
()
const
{
return
data
->
type
==
Type
::
DICT
;
}
/** Add block of data from right hand of JOIN to the map.
* Returns false, if some limit was exceeded and you should not insert more data.
...
...
@@ -186,7 +188,7 @@ public:
/// Sum size in bytes of all buffers, used for JOIN maps and for all memory pools.
size_t
getTotalByteCount
()
const
final
;
bool
alwaysReturnsEmptySet
()
const
final
{
return
isInnerOrRight
(
getKind
())
&&
data
->
empty
;
}
bool
alwaysReturnsEmptySet
()
const
final
{
return
isInnerOrRight
(
getKind
())
&&
data
->
empty
&&
!
overDictionary
()
;
}
ASTTableJoin
::
Kind
getKind
()
const
{
return
kind
;
}
ASTTableJoin
::
Strictness
getStrictness
()
const
{
return
strictness
;
}
...
...
@@ -220,12 +222,12 @@ public:
{
EMPTY
,
CROSS
,
DICT
,
#define M(NAME) NAME,
APPLY_FOR_JOIN_VARIANTS
(
M
)
#undef M
};
/** Different data structures, that are used to perform JOIN.
*/
template
<
typename
Mapped
>
...
...
@@ -247,6 +249,7 @@ public:
{
case
Type
::
EMPTY
:
break
;
case
Type
::
CROSS
:
break
;
case
Type
::
DICT
:
break
;
#define M(NAME) \
case Type::NAME: NAME = std::make_unique<typename decltype(NAME)::element_type>(); break;
...
...
@@ -261,6 +264,7 @@ public:
{
case
Type
::
EMPTY
:
return
0
;
case
Type
::
CROSS
:
return
0
;
case
Type
::
DICT
:
return
0
;
#define M(NAME) \
case Type::NAME: return NAME ? NAME->size() : 0;
...
...
@@ -277,6 +281,7 @@ public:
{
case
Type
::
EMPTY
:
return
0
;
case
Type
::
CROSS
:
return
0
;
case
Type
::
DICT
:
return
0
;
#define M(NAME) \
case Type::NAME: return NAME ? NAME->getBufferSizeInBytes() : 0;
...
...
src/Interpreters/InterpreterSelectQuery.cpp
浏览文件 @
8cea4531
...
...
@@ -305,12 +305,13 @@ InterpreterSelectQuery::InterpreterSelectQuery(
max_streams
=
settings
.
max_threads
;
ASTSelectQuery
&
query
=
getSelectQuery
();
std
::
shared_ptr
<
TableJoin
>
table_join
=
joined_tables
.
makeTableJoin
(
query
);
auto
analyze
=
[
&
]
(
bool
try_move_to_prewhere
=
true
)
{
syntax_analyzer_result
=
SyntaxAnalyzer
(
*
context
).
analyzeSelect
(
query_ptr
,
SyntaxAnalyzerResult
(
source_header
.
getNamesAndTypesList
(),
storage
),
options
,
joined_tables
.
tablesWithColumns
(),
required_result_column_names
);
options
,
joined_tables
.
tablesWithColumns
(),
required_result_column_names
,
table_join
);
/// Save scalar sub queries's results in the query context
if
(
context
->
hasQueryContext
())
...
...
src/Interpreters/JoinedTables.cpp
浏览文件 @
8cea4531
#include <Interpreters/JoinedTables.h>
#include <Interpreters/TableJoin.h>
#include <Interpreters/Context.h>
#include <Interpreters/getTableExpressions.h>
#include <Interpreters/InJoinSubqueriesPreprocessor.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Storages/IStorage.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/StorageValues.h>
#include <Storages/StorageJoin.h>
#include <Storages/StorageDictionary.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTQualifiedAsterisk.h>
#include <Parsers/ParserTablesInSelectQuery.h>
#include <Parsers/parseQuery.h>
namespace
DB
{
...
...
@@ -26,6 +34,34 @@ namespace ErrorCodes
namespace
{
void
replaceJoinedTable
(
const
ASTSelectQuery
&
select_query
)
{
const
ASTTablesInSelectQueryElement
*
join
=
select_query
.
join
();
if
(
!
join
||
!
join
->
table_expression
)
return
;
/// TODO: Push down for CROSS JOIN is not OK [disabled]
const
auto
&
table_join
=
join
->
table_join
->
as
<
ASTTableJoin
&>
();
if
(
table_join
.
kind
==
ASTTableJoin
::
Kind
::
Cross
)
return
;
auto
&
table_expr
=
join
->
table_expression
->
as
<
ASTTableExpression
&>
();
if
(
table_expr
.
database_and_table_name
)
{
const
auto
&
table_id
=
table_expr
.
database_and_table_name
->
as
<
ASTIdentifier
&>
();
String
expr
=
"(select * from "
+
table_id
.
name
+
") as "
+
table_id
.
shortName
();
// FIXME: since the expression "a as b" exposes both "a" and "b" names, which is not equivalent to "(select * from a) as b",
// we can't replace aliased tables.
// FIXME: long table names include database name, which we can't save within alias.
if
(
table_id
.
alias
.
empty
()
&&
table_id
.
isShort
())
{
ParserTableExpression
parser
;
table_expr
=
parseQuery
(
parser
,
expr
,
0
,
DBMS_DEFAULT_MAX_PARSER_DEPTH
)
->
as
<
ASTTableExpression
&>
();
}
}
}
template
<
typename
T
>
void
checkTablesWithColumns
(
const
std
::
vector
<
T
>
&
tables_with_columns
,
const
Context
&
context
)
{
...
...
@@ -209,4 +245,35 @@ void JoinedTables::rewriteDistributedInAndJoins(ASTPtr & query)
}
}
std
::
shared_ptr
<
TableJoin
>
JoinedTables
::
makeTableJoin
(
const
ASTSelectQuery
&
select_query
)
{
if
(
tables_with_columns
.
size
()
<
2
)
return
{};
auto
settings
=
context
.
getSettingsRef
();
auto
table_join
=
std
::
make_shared
<
TableJoin
>
(
settings
,
context
.
getTemporaryVolume
());
const
ASTTablesInSelectQueryElement
*
ast_join
=
select_query
.
join
();
const
auto
&
table_to_join
=
ast_join
->
table_expression
->
as
<
ASTTableExpression
&>
();
/// TODO This syntax does not support specifying a database name.
if
(
table_to_join
.
database_and_table_name
)
{
auto
joined_table_id
=
context
.
resolveStorageID
(
table_to_join
.
database_and_table_name
);
StoragePtr
table
=
DatabaseCatalog
::
instance
().
tryGetTable
(
joined_table_id
);
if
(
table
)
{
if
(
dynamic_cast
<
StorageJoin
*>
(
table
.
get
())
||
dynamic_cast
<
StorageDictionary
*>
(
table
.
get
()))
table_join
->
joined_storage
=
table
;
}
}
if
(
!
table_join
->
joined_storage
&&
settings
.
enable_optimize_predicate_expression
)
replaceJoinedTable
(
select_query
);
return
table_join
;
}
}
src/Interpreters/JoinedTables.h
浏览文件 @
8cea4531
...
...
@@ -10,6 +10,7 @@ namespace DB
class
ASTSelectQuery
;
class
Context
;
class
TableJoin
;
struct
SelectQueryOptions
;
/// Joined tables' columns resolver.
...
...
@@ -30,6 +31,7 @@ public:
/// Make fake tables_with_columns[0] in case we have predefined input in InterpreterSelectQuery
void
makeFakeTable
(
StoragePtr
storage
,
const
Block
&
source_header
);
std
::
shared_ptr
<
TableJoin
>
makeTableJoin
(
const
ASTSelectQuery
&
select_query
);
const
std
::
vector
<
TableWithColumnNamesAndTypes
>
&
tablesWithColumns
()
const
{
return
tables_with_columns
;
}
...
...
src/Interpreters/SyntaxAnalyzer.cpp
浏览文件 @
8cea4531
...
...
@@ -29,8 +29,6 @@
#include <Parsers/ASTOrderByElement.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ParserTablesInSelectQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Functions/FunctionFactory.h>
...
...
@@ -547,34 +545,6 @@ void collectJoinedColumns(TableJoin & analyzed_join, const ASTSelectQuery & sele
}
}
void
replaceJoinedTable
(
const
ASTSelectQuery
&
select_query
)
{
const
ASTTablesInSelectQueryElement
*
join
=
select_query
.
join
();
if
(
!
join
||
!
join
->
table_expression
)
return
;
/// TODO: Push down for CROSS JOIN is not OK [disabled]
const
auto
&
table_join
=
join
->
table_join
->
as
<
ASTTableJoin
&>
();
if
(
table_join
.
kind
==
ASTTableJoin
::
Kind
::
Cross
)
return
;
auto
&
table_expr
=
join
->
table_expression
->
as
<
ASTTableExpression
&>
();
if
(
table_expr
.
database_and_table_name
)
{
const
auto
&
table_id
=
table_expr
.
database_and_table_name
->
as
<
ASTIdentifier
&>
();
String
expr
=
"(select * from "
+
table_id
.
name
+
") as "
+
table_id
.
shortName
();
// FIXME: since the expression "a as b" exposes both "a" and "b" names, which is not equivalent to "(select * from a) as b",
// we can't replace aliased tables.
// FIXME: long table names include database name, which we can't save within alias.
if
(
table_id
.
alias
.
empty
()
&&
table_id
.
isShort
())
{
ParserTableExpression
parser
;
table_expr
=
parseQuery
(
parser
,
expr
,
0
,
DBMS_DEFAULT_MAX_PARSER_DEPTH
)
->
as
<
ASTTableExpression
&>
();
}
}
}
std
::
vector
<
const
ASTFunction
*>
getAggregates
(
ASTPtr
&
query
,
const
ASTSelectQuery
&
select_query
)
{
/// There can not be aggregate functions inside the WHERE and PREWHERE.
...
...
@@ -781,7 +751,8 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyzeSelect(
SyntaxAnalyzerResult
&&
result
,
const
SelectQueryOptions
&
select_options
,
const
std
::
vector
<
TableWithColumnNamesAndTypes
>
&
tables_with_columns
,
const
Names
&
required_result_columns
)
const
const
Names
&
required_result_columns
,
std
::
shared_ptr
<
TableJoin
>
table_join
)
const
{
auto
*
select_query
=
query
->
as
<
ASTSelectQuery
>
();
if
(
!
select_query
)
...
...
@@ -793,14 +764,13 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyzeSelect(
const
auto
&
settings
=
context
.
getSettingsRef
();
const
NameSet
&
source_columns_set
=
result
.
source_columns_set
;
result
.
analyzed_join
=
std
::
make_shared
<
TableJoin
>
(
settings
,
context
.
getTemporaryVolume
());
result
.
analyzed_join
=
table_join
;
if
(
!
result
.
analyzed_join
)
/// ExpressionAnalyzer expects some not empty object here
result
.
analyzed_join
=
std
::
make_shared
<
TableJoin
>
();
if
(
remove_duplicates
)
renameDuplicatedColumns
(
select_query
);
if
(
settings
.
enable_optimize_predicate_expression
)
replaceJoinedTable
(
*
select_query
);
/// TODO: Remove unneeded conversion
std
::
vector
<
TableWithColumnNames
>
tables_with_column_names
;
tables_with_column_names
.
reserve
(
tables_with_columns
.
size
());
...
...
src/Interpreters/SyntaxAnalyzer.h
浏览文件 @
8cea4531
...
...
@@ -94,7 +94,8 @@ public:
SyntaxAnalyzerResult
&&
result
,
const
SelectQueryOptions
&
select_options
=
{},
const
std
::
vector
<
TableWithColumnNamesAndTypes
>
&
tables_with_columns
=
{},
const
Names
&
required_result_columns
=
{})
const
;
const
Names
&
required_result_columns
=
{},
std
::
shared_ptr
<
TableJoin
>
table_join
=
{})
const
;
private:
const
Context
&
context
;
...
...
src/Interpreters/TableJoin.cpp
浏览文件 @
8cea4531
...
...
@@ -159,22 +159,26 @@ NamesWithAliases TableJoin::getRequiredColumns(const Block & sample, const Names
return
getNamesWithAliases
(
required_columns
);
}
bool
TableJoin
::
leftBecomeNullable
(
const
DataTypePtr
&
column_type
)
const
{
return
forceNullableLeft
()
&&
column_type
->
canBeInsideNullable
();
}
bool
TableJoin
::
rightBecomeNullable
(
const
DataTypePtr
&
column_type
)
const
{
return
forceNullableRight
()
&&
column_type
->
canBeInsideNullable
();
}
void
TableJoin
::
addJoinedColumn
(
const
NameAndTypePair
&
joined_column
)
{
if
(
join_use_nulls
&&
isLeftOrFull
(
table_join
.
kind
))
{
auto
type
=
joined_column
.
type
->
canBeInsideNullable
()
?
makeNullable
(
joined_column
.
type
)
:
joined_column
.
type
;
columns_added_by_join
.
emplace_back
(
NameAndTypePair
(
joined_column
.
name
,
std
::
move
(
type
)));
}
if
(
rightBecomeNullable
(
joined_column
.
type
))
columns_added_by_join
.
emplace_back
(
NameAndTypePair
(
joined_column
.
name
,
makeNullable
(
joined_column
.
type
)));
else
columns_added_by_join
.
push_back
(
joined_column
);
}
void
TableJoin
::
addJoinedColumnsAndCorrectNullability
(
Block
&
sample_block
)
const
{
bool
right_or_full_join
=
isRightOrFull
(
table_join
.
kind
);
bool
left_or_full_join
=
isLeftOrFull
(
table_join
.
kind
);
for
(
auto
&
col
:
sample_block
)
{
/// Materialize column.
...
...
@@ -183,9 +187,7 @@ void TableJoin::addJoinedColumnsAndCorrectNullability(Block & sample_block) cons
if
(
col
.
column
)
col
.
column
=
nullptr
;
bool
make_nullable
=
join_use_nulls
&&
right_or_full_join
;
if
(
make_nullable
&&
col
.
type
->
canBeInsideNullable
())
if
(
leftBecomeNullable
(
col
.
type
))
col
.
type
=
makeNullable
(
col
.
type
);
}
...
...
@@ -193,9 +195,7 @@ void TableJoin::addJoinedColumnsAndCorrectNullability(Block & sample_block) cons
{
auto
res_type
=
col
.
type
;
bool
make_nullable
=
join_use_nulls
&&
left_or_full_join
;
if
(
make_nullable
&&
res_type
->
canBeInsideNullable
())
if
(
rightBecomeNullable
(
res_type
))
res_type
=
makeNullable
(
res_type
);
sample_block
.
insert
(
ColumnWithTypeAndName
(
nullptr
,
res_type
,
col
.
name
));
...
...
@@ -242,4 +242,31 @@ bool TableJoin::allowMergeJoin() const
return
allow_merge_join
;
}
bool
TableJoin
::
allowDictJoin
(
const
String
&
dict_key
,
const
Block
&
sample_block
,
Names
&
names
,
NamesAndTypesList
&
result_columns
)
const
{
/// Support ALL INNER, [ANY | ALL | SEMI | ANTI] LEFT
if
(
!
isLeft
(
kind
())
&&
!
(
isInner
(
kind
())
&&
strictness
()
==
ASTTableJoin
::
Strictness
::
All
))
return
false
;
const
Names
&
right_keys
=
keyNamesRight
();
if
(
right_keys
.
size
()
!=
1
)
return
false
;
for
(
auto
&
col
:
sample_block
)
{
String
original
=
original_names
.
find
(
col
.
name
)
->
second
;
if
(
col
.
name
==
right_keys
[
0
])
{
if
(
original
!=
dict_key
)
return
false
;
/// JOIN key != Dictionary key
continue
;
/// do not extract key column
}
names
.
push_back
(
original
);
result_columns
.
push_back
({
col
.
name
,
col
.
type
});
}
return
true
;
}
}
src/Interpreters/TableJoin.h
浏览文件 @
8cea4531
...
...
@@ -8,6 +8,7 @@
#include <Interpreters/asof.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/SizeLimits.h>
#include <Storages/IStorage_fwd.h>
#include <utility>
#include <memory>
...
...
@@ -19,6 +20,7 @@ class Context;
class
ASTSelectQuery
;
struct
DatabaseAndTableWithAlias
;
class
Block
;
class
DictionaryReader
;
struct
Settings
;
...
...
@@ -42,10 +44,10 @@ class TableJoin
friend
class
SyntaxAnalyzer
;
const
SizeLimits
size_limits
;
const
size_t
default_max_bytes
;
const
bool
join_use_nulls
;
const
size_t
default_max_bytes
=
0
;
const
bool
join_use_nulls
=
false
;
const
size_t
max_joined_block_rows
=
0
;
JoinAlgorithm
join_algorithm
;
JoinAlgorithm
join_algorithm
=
JoinAlgorithm
::
AUTO
;
const
bool
partial_merge_join_optimizations
=
false
;
const
size_t
partial_merge_join_rows_in_right_blocks
=
0
;
...
...
@@ -69,6 +71,7 @@ class TableJoin
VolumePtr
tmp_volume
;
public:
TableJoin
()
=
default
;
TableJoin
(
const
Settings
&
,
VolumePtr
tmp_volume
);
/// for StorageJoin
...
...
@@ -84,12 +87,16 @@ public:
table_join
.
strictness
=
strictness
;
}
StoragePtr
joined_storage
;
std
::
shared_ptr
<
DictionaryReader
>
dictionary_reader
;
ASTTableJoin
::
Kind
kind
()
const
{
return
table_join
.
kind
;
}
ASTTableJoin
::
Strictness
strictness
()
const
{
return
table_join
.
strictness
;
}
bool
sameStrictnessAndKind
(
ASTTableJoin
::
Strictness
,
ASTTableJoin
::
Kind
)
const
;
const
SizeLimits
&
sizeLimits
()
const
{
return
size_limits
;
}
VolumePtr
getTemporaryVolume
()
{
return
tmp_volume
;
}
bool
allowMergeJoin
()
const
;
bool
allowDictJoin
(
const
String
&
dict_key
,
const
Block
&
sample_block
,
Names
&
,
NamesAndTypesList
&
)
const
;
bool
preferMergeJoin
()
const
{
return
join_algorithm
==
JoinAlgorithm
::
PREFER_PARTIAL_MERGE
;
}
bool
forceMergeJoin
()
const
{
return
join_algorithm
==
JoinAlgorithm
::
PARTIAL_MERGE
;
}
bool
forceHashJoin
()
const
{
return
join_algorithm
==
JoinAlgorithm
::
HASH
;
}
...
...
@@ -115,6 +122,8 @@ public:
size_t
rightKeyInclusion
(
const
String
&
name
)
const
;
NameSet
requiredRightKeys
()
const
;
bool
leftBecomeNullable
(
const
DataTypePtr
&
column_type
)
const
;
bool
rightBecomeNullable
(
const
DataTypePtr
&
column_type
)
const
;
void
addJoinedColumn
(
const
NameAndTypePair
&
joined_column
);
void
addJoinedColumnsAndCorrectNullability
(
Block
&
sample_block
)
const
;
...
...
src/Storages/StorageDictionary.h
浏览文件 @
8cea4531
...
...
@@ -26,6 +26,8 @@ public:
static
NamesAndTypesList
getNamesAndTypes
(
const
DictionaryStructure
&
dictionary_structure
);
static
String
generateNamesAndTypesDescription
(
const
NamesAndTypesList
&
list
);
const
String
&
dictionaryName
()
const
{
return
dictionary_name
;
}
private:
String
dictionary_name
;
...
...
tests/queries/0_stateless/00561_storage_join.sql
浏览文件 @
8cea4531
SET
any_join_distinct_right_table_keys
=
1
;
drop
table
IF
EXISTS
joinbug
;
CREATE
TABLE
joinbug
(
...
...
@@ -21,7 +19,7 @@ CREATE TABLE joinbug_join (
val
UInt64
,
val2
Int32
,
created
UInt64
)
ENGINE
=
Join
(
ANY
,
INNER
,
id2
);
)
ENGINE
=
Join
(
SEMI
,
LEFT
,
id2
);
insert
into
joinbug_join
(
id
,
id2
,
val
,
val2
,
created
)
select
id
,
id2
,
val
,
val2
,
created
...
...
@@ -36,7 +34,7 @@ select id, id2, val, val2, created
from
(
SELECT
toUInt64
(
arrayJoin
(
range
(
50
)))
AS
id2
)
js1
ANY
INNER
JOIN
joinbug_join
using
id2
;
SEMI
LEFT
JOIN
joinbug_join
using
id2
;
DROP
TABLE
joinbug
;
DROP
TABLE
joinbug_join
;
tests/queries/0_stateless/01115_join_with_dictionary.reference
0 → 100644
浏览文件 @
8cea4531
flat: left on
0 0 0 0 0
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3
4 0 0 0
flat: left
0 0 0 0
1 1 1 1
2 2 2 2
3 3 3 3
4 0 0
flat: any left
0 0 0 0
1 1 1 1
2 2 2 2
3 3 3 3
4 0 0
flat: semi left
0 0 0 0
1 1 1 1
2 2 2 2
3 3 3 3
flat: anti left
4 0 0
flat: inner
0 0 0 0
1 1 1 1
flat: inner on
0 0 0 0 0
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3
hashed: left on
0 0 0 0 0
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3
4 \N \N \N \N
hashed: left
0 0 0 0
1 1 1 1
2 2 2 2
3 3 3 3
4 \N \N \N
hashed: any left
0 0 0 0
1 1 1 1
2 2 2 2
3 3 3 3
4 \N \N \N
hashed: semi left
0 0 0 0
1 1 1 1
2 2 2 2
3 3 3 3
hashed: anti left
4 \N \N \N
hashed: inner
0 0 0 0
1 1 1 1
hashed: inner on
0 0 0 0 0
1 1 1 1 1
2 2 2 2 2
3 3 3 3 3
complex_cache (smoke)
0 \N \N \N \N
1 \N \N \N \N
2 \N \N \N \N
3 \N \N \N \N
4 \N \N \N \N
not optimized (smoke)
0 0 0 0
1 1 1 1
2 2 2 2
3 3 3 3
-
0 0 0 0 0
1 1 1 1 1
\N 2 2 2 2
\N 3 3 3 3
-
2 2 2 2
3 3 3 3
4 \N \N \N
5 \N \N \N
\N 0 0 0
\N 1 1 1
-
0 0 0 0
1 1 1 1
-
0 0 0 0
1 1 1 1
3 3 3 3
2 2 2 2
-
0 0 0 0
1 1 1 1
-
3 3 3 3
2 2 2 2
tests/queries/0_stateless/01115_join_with_dictionary.sql
0 → 100644
浏览文件 @
8cea4531
SET
send_logs_level
=
'none'
;
DROP
DATABASE
IF
EXISTS
db_01115
;
CREATE
DATABASE
db_01115
Engine
=
Ordinary
;
USE
db_01115
;
DROP
DICTIONARY
IF
EXISTS
dict_flat
;
DROP
DICTIONARY
IF
EXISTS
dict_hashed
;
DROP
DICTIONARY
IF
EXISTS
dict_complex_cache
;
CREATE
TABLE
t1
(
key
UInt64
,
a
UInt8
,
b
String
,
c
Float64
)
ENGINE
=
MergeTree
()
ORDER
BY
key
;
INSERT
INTO
t1
SELECT
number
,
number
,
toString
(
number
),
number
from
numbers
(
4
);
CREATE
DICTIONARY
dict_flat
(
key
UInt64
DEFAULT
0
,
a
UInt8
DEFAULT
42
,
b
String
DEFAULT
'x'
,
c
Float64
DEFAULT
42
.
0
)
PRIMARY
KEY
key
SOURCE
(
CLICKHOUSE
(
HOST
'localhost'
PORT
9000
USER
'default'
TABLE
't1'
PASSWORD
''
DB
'db_01115'
))
LIFETIME
(
MIN
1
MAX
10
)
LAYOUT
(
FLAT
());
CREATE
DICTIONARY
db_01115
.
dict_hashed
(
key
UInt64
DEFAULT
0
,
a
UInt8
DEFAULT
42
,
b
String
DEFAULT
'x'
,
c
Float64
DEFAULT
42
.
0
)
PRIMARY
KEY
key
SOURCE
(
CLICKHOUSE
(
HOST
'localhost'
PORT
9000
USER
'default'
TABLE
't1'
DB
'db_01115'
))
LIFETIME
(
MIN
1
MAX
10
)
LAYOUT
(
HASHED
());
CREATE
DICTIONARY
dict_complex_cache
(
key
UInt64
DEFAULT
0
,
a
UInt8
DEFAULT
42
,
b
String
DEFAULT
'x'
,
c
Float64
DEFAULT
42
.
0
)
PRIMARY
KEY
key
,
b
SOURCE
(
CLICKHOUSE
(
HOST
'localhost'
PORT
9000
USER
'default'
TABLE
't1'
DB
'db_01115'
))
LIFETIME
(
MIN
1
MAX
10
)
LAYOUT
(
COMPLEX_KEY_CACHE
(
SIZE_IN_CELLS
1
));
SET
join_use_nulls
=
0
;
SELECT
'flat: left on'
;
SELECT
*
FROM
(
SELECT
number
AS
key
FROM
numbers
(
5
))
s1
LEFT
JOIN
dict_flat
d
ON
s1
.
key
=
d
.
key
ORDER
BY
s1
.
key
;
SELECT
'flat: left'
;
SELECT
*
FROM
(
SELECT
number
AS
key
FROM
numbers
(
5
))
s1
LEFT
JOIN
dict_flat
d
USING
(
key
)
ORDER
BY
key
;
SELECT
'flat: any left'
;
SELECT
*
FROM
(
SELECT
number
AS
key
FROM
numbers
(
5
))
s1
ANY
LEFT
JOIN
dict_flat
d
USING
(
key
)
ORDER
BY
key
;
SELECT
'flat: semi left'
;
SELECT
*
FROM
(
SELECT
number
AS
key
FROM
numbers
(
5
))
s1
SEMI
JOIN
dict_flat
d
USING
(
key
)
ORDER
BY
key
;
SELECT
'flat: anti left'
;
SELECT
*
FROM
(
SELECT
number
AS
key
FROM
numbers
(
5
))
s1
ANTI
JOIN
dict_flat
d
USING
(
key
)
ORDER
BY
key
;
SELECT
'flat: inner'
;
SELECT
*
FROM
(
SELECT
number
AS
key
FROM
numbers
(
2
))
s1
JOIN
dict_flat
d
USING
(
key
);
SELECT
'flat: inner on'
;
SELECT
*
FROM
(
SELECT
number
AS
k
FROM
numbers
(
100
))
s1
JOIN
dict_flat
d
ON
k
=
key
ORDER
BY
k
;
SET
join_use_nulls
=
1
;
SELECT
'hashed: left on'
;
SELECT
*
FROM
(
SELECT
number
AS
key
FROM
numbers
(
5
))
s1
LEFT
JOIN
dict_hashed
d
ON
s1
.
key
=
d
.
key
ORDER
BY
s1
.
key
;
SELECT
'hashed: left'
;
SELECT
*
FROM
(
SELECT
number
AS
key
FROM
numbers
(
5
))
s1
LEFT
JOIN
dict_hashed
d
USING
(
key
)
ORDER
BY
key
;
SELECT
'hashed: any left'
;
SELECT
*
FROM
(
SELECT
number
AS
key
FROM
numbers
(
5
))
s1
ANY
LEFT
JOIN
dict_hashed
d
USING
(
key
)
ORDER
BY
key
;
SELECT
'hashed: semi left'
;
SELECT
*
FROM
(
SELECT
number
AS
key
FROM
numbers
(
5
))
s1
SEMI
JOIN
dict_hashed
d
USING
(
key
)
ORDER
BY
key
;
SELECT
'hashed: anti left'
;
SELECT
*
FROM
(
SELECT
number
AS
key
FROM
numbers
(
5
))
s1
ANTI
JOIN
dict_hashed
d
USING
(
key
)
ORDER
BY
key
;
SELECT
'hashed: inner'
;
SELECT
*
FROM
(
SELECT
number
AS
key
FROM
numbers
(
2
))
s1
JOIN
dict_hashed
d
USING
(
key
);
SELECT
'hashed: inner on'
;
SELECT
*
FROM
(
SELECT
number
AS
k
FROM
numbers
(
100
))
s1
JOIN
dict_hashed
d
ON
k
=
key
ORDER
BY
k
;
SELECT
'complex_cache (smoke)'
;
SELECT
*
FROM
(
SELECT
number
AS
key
FROM
numbers
(
5
))
s1
LEFT
JOIN
dict_complex_cache
d
ON
s1
.
key
=
d
.
key
ORDER
BY
s1
.
key
;
SELECT
'not optimized (smoke)'
;
SELECT
*
FROM
(
SELECT
number
AS
key
FROM
numbers
(
2
))
s1
RIGHT
JOIN
dict_flat
d
USING
(
key
)
ORDER
BY
key
;
SELECT
'-'
;
SELECT
*
FROM
(
SELECT
number
AS
key
FROM
numbers
(
2
))
s1
RIGHT
JOIN
dict_flat
d
ON
s1
.
key
=
d
.
key
ORDER
BY
d
.
key
;
SELECT
'-'
;
SELECT
*
FROM
(
SELECT
number
+
2
AS
key
FROM
numbers
(
4
))
s1
FULL
JOIN
dict_flat
d
USING
(
key
)
ORDER
BY
s1
.
key
,
d
.
key
;
SELECT
'-'
;
SELECT
*
FROM
(
SELECT
number
AS
key
FROM
numbers
(
2
))
s1
ANY
INNER
JOIN
dict_flat
d
USING
(
key
);
SELECT
'-'
;
SELECT
*
FROM
(
SELECT
number
AS
key
FROM
numbers
(
2
))
s1
ANY
RIGHT
JOIN
dict_flat
d
USING
(
key
);
SELECT
'-'
;
SELECT
*
FROM
(
SELECT
number
AS
key
FROM
numbers
(
2
))
s1
SEMI
RIGHT
JOIN
dict_flat
d
USING
(
key
);
SELECT
'-'
;
SELECT
*
FROM
(
SELECT
number
AS
key
FROM
numbers
(
2
))
s1
ANTI
RIGHT
JOIN
dict_flat
d
USING
(
key
);
DROP
DICTIONARY
dict_flat
;
DROP
DICTIONARY
dict_hashed
;
DROP
DICTIONARY
dict_complex_cache
;
DROP
TABLE
t1
;
DROP
DATABASE
IF
EXISTS
db_01115
;
tests/queries/1_stateful/00065_loyalty_with_storage_join.sql
浏览文件 @
8cea4531
SET
any_join_distinct_right_table_keys
=
1
;
USE
test
;
DROP
TABLE
IF
EXISTS
join
;
CREATE
TABLE
join
(
UserID
UInt64
,
loyalty
Int8
)
ENGINE
=
Join
(
ANY
,
INNER
,
UserID
);
CREATE
TABLE
join
(
UserID
UInt64
,
loyalty
Int8
)
ENGINE
=
Join
(
SEMI
,
LEFT
,
UserID
);
INSERT
INTO
join
SELECT
UserID
,
toInt8
(
if
((
sum
(
SearchEngineID
=
2
)
AS
yandex
)
>
(
sum
(
SearchEngineID
=
3
)
AS
google
),
yandex
/
(
yandex
+
google
),
-
google
/
(
yandex
+
google
))
*
10
)
AS
loyalty
yandex
/
(
yandex
+
google
),
-
google
/
(
yandex
+
google
))
*
10
)
AS
loyalty
FROM
hits
WHERE
(
SearchEngineID
=
2
)
OR
(
SearchEngineID
=
3
)
GROUP
BY
UserID
...
...
@@ -19,17 +17,17 @@ HAVING (yandex + google) > 10;
SELECT
loyalty
,
count
()
FROM
hits
ANY
INNER
JOIN
join
USING
UserID
FROM
hits
SEMI
LEFT
JOIN
join
USING
UserID
GROUP
BY
loyalty
ORDER
BY
loyalty
ASC
;
DETACH
TABLE
join
;
ATTACH
TABLE
join
(
UserID
UInt64
,
loyalty
Int8
)
ENGINE
=
Join
(
ANY
,
INNER
,
UserID
);
ATTACH
TABLE
join
(
UserID
UInt64
,
loyalty
Int8
)
ENGINE
=
Join
(
SEMI
,
LEFT
,
UserID
);
SELECT
loyalty
,
count
()
FROM
hits
ANY
INNER
JOIN
join
USING
UserID
FROM
hits
SEMI
LEFT
JOIN
join
USING
UserID
GROUP
BY
loyalty
ORDER
BY
loyalty
ASC
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录