提交 dd08f06c 编写于 作者: C chertus

squash left-hand blocks in partial merge join

上级 31f9eeb2
......@@ -294,6 +294,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, partial_merge_join, false, "Use partial merge join instead of hash join for LEFT and INNER JOINs.") \
M(SettingBool, partial_merge_join_optimisations, false, "Enable optimisations in partial merge join") \
M(SettingUInt64, partial_merge_join_rows_in_right_blocks, 10000, "Split right-hand joining data in blocks of specified size.") \
M(SettingFloat, partial_merge_join_memory_coefficient, 0.25, "") \
\
M(SettingUInt64, max_rows_to_transfer, 0, "Maximum size (in rows) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \
M(SettingUInt64, max_bytes_to_transfer, 0, "Maximum size (in uncompressed bytes) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \
......
......@@ -24,7 +24,11 @@ namespace ErrorCodes
}
AnalyzedJoin::AnalyzedJoin(const Settings & settings)
: size_limits(SizeLimits{settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode})
: size_limits(SizeLimits{settings.max_rows_in_join,
(settings.partial_merge_join ?
UInt64(settings.max_bytes_in_join * settings.partial_merge_join_memory_coefficient) :
UInt64(settings.max_bytes_in_join)),
settings.join_overflow_mode})
, join_use_nulls(settings.join_use_nulls)
, partial_merge_join(settings.partial_merge_join)
, partial_merge_join_optimisations(settings.partial_merge_join_optimisations)
......@@ -272,4 +276,11 @@ JoinPtr makeJoin(std::shared_ptr<AnalyzedJoin> table_join, const Block & right_s
return std::make_shared<Join>(table_join, right_sample_block);
}
/// Reports whether the join algorithm held by `join` is a MergeJoin.
/// An empty (null) `join` yields false; otherwise the answer is whether
/// typeid_cast to `const MergeJoin *` produces a non-null pointer.
bool isMergeJoin(const JoinPtr & join)
{
    /// Nothing to inspect when no join algorithm is set.
    if (!join)
        return false;

    const auto * merge_join = typeid_cast<const MergeJoin *>(join.get());
    return merge_join != nullptr;
}
}
......@@ -119,4 +119,6 @@ public:
struct ASTTableExpression;
NamesAndTypesList getNamesAndTypeListFromTableExpression(const ASTTableExpression & table_expression, const Context & context);
/// Returns true when the given join pointer is non-null and typeid_cast of its pointee
/// to `const MergeJoin *` yields a non-null pointer; returns false otherwise.
bool isMergeJoin(const JoinPtr &);
}
......@@ -25,6 +25,7 @@
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/ReverseBlockInputStream.h>
#include <DataStreams/FillingBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
......@@ -1116,6 +1117,15 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
/// Applies to all sources except stream_with_non_joined_data.
for (auto & stream : pipeline.streams)
stream = std::make_shared<ExpressionBlockInputStream>(stream, expressions.before_join);
if (isMergeJoin(expressions.before_join->getTableJoinAlgo()) && settings.partial_merge_join_optimisations)
{
/// TODO: * min(query_memory_limit, max_bytes_in_join)
size_t bytes_in_block = settings.partial_merge_join_memory_coefficient * settings.max_bytes_in_join;
if (bytes_in_block)
for (auto & stream : pipeline.streams)
stream = std::make_shared<SquashingBlockInputStream>(stream, 0, bytes_in_block);
}
}
if (JoinPtr join = expressions.before_join->getTableJoinAlgo())
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册