diff --git a/dbms/src/Core/Settings.h b/dbms/src/Core/Settings.h index 7bb7c9e5dbacec76d6f7c2395ff8f11106f1dfab..b2a027cb5e2ec34a5a664be9baca47e3550f9f90 100644 --- a/dbms/src/Core/Settings.h +++ b/dbms/src/Core/Settings.h @@ -294,6 +294,7 @@ struct Settings : public SettingsCollection M(SettingBool, partial_merge_join, false, "Use partial merge join instead of hash join for LEFT and INNER JOINs.") \ M(SettingBool, partial_merge_join_optimisations, false, "Enable optimisations in partial merge join") \ M(SettingUInt64, partial_merge_join_rows_in_right_blocks, 10000, "Split right-hand joining data in blocks of specified size.") \ + M(SettingFloat, partial_merge_join_memory_coefficient, 0.25, "") \ \ M(SettingUInt64, max_rows_to_transfer, 0, "Maximum size (in rows) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \ M(SettingUInt64, max_bytes_to_transfer, 0, "Maximum size (in uncompressed bytes) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \ diff --git a/dbms/src/Interpreters/AnalyzedJoin.cpp b/dbms/src/Interpreters/AnalyzedJoin.cpp index cdf047fc5e5046673395dd678a5f2348b0aa209c..25b0f7ae286967689b234cc6fb11ed56d0a6cc17 100644 --- a/dbms/src/Interpreters/AnalyzedJoin.cpp +++ b/dbms/src/Interpreters/AnalyzedJoin.cpp @@ -24,7 +24,11 @@ namespace ErrorCodes } AnalyzedJoin::AnalyzedJoin(const Settings & settings) - : size_limits(SizeLimits{settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode}) + : size_limits(SizeLimits{settings.max_rows_in_join, + (settings.partial_merge_join ? + UInt64(settings.max_bytes_in_join * settings.partial_merge_join_memory_coefficient) : + UInt64(settings.max_bytes_in_join)), + settings.join_overflow_mode}) , join_use_nulls(settings.join_use_nulls) , partial_merge_join(settings.partial_merge_join) , partial_merge_join_optimisations(settings.partial_merge_join_optimisations) @@ -272,4 +276,11 @@ JoinPtr makeJoin(std::shared_ptr table_join, const Block & right_s return std::make_shared(table_join, right_sample_block); } +bool isMergeJoin(const JoinPtr & join) +{ + if (join) + return typeid_cast(join.get()); + return false; +} + } diff --git a/dbms/src/Interpreters/AnalyzedJoin.h b/dbms/src/Interpreters/AnalyzedJoin.h index f9d0d9d0f79ce7251b180d5a19711eeeb479cce0..a1e03f559f6d3598231810d53246a2fca7e362b2 100644 --- a/dbms/src/Interpreters/AnalyzedJoin.h +++ b/dbms/src/Interpreters/AnalyzedJoin.h @@ -119,4 +119,6 @@ public: struct ASTTableExpression; NamesAndTypesList getNamesAndTypeListFromTableExpression(const ASTTableExpression & table_expression, const Context & context); +bool isMergeJoin(const JoinPtr &); + } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 8c49cf34ca52357be0d3071af7189d33539c4fc9..49c0583622c50569b19fdbf58f456fa9c82da3be 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -1116,6 +1117,15 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS /// Applies to all sources except stream_with_non_joined_data. for (auto & stream : pipeline.streams) stream = std::make_shared(stream, expressions.before_join); + + if (isMergeJoin(expressions.before_join->getTableJoinAlgo()) && settings.partial_merge_join_optimisations) + { + /// TODO: * min(query_memoty_limit, max_bytes_in_join) + size_t bytes_in_block = settings.partial_merge_join_memory_coefficient * settings.max_bytes_in_join; + if (bytes_in_block) + for (auto & stream : pipeline.streams) + stream = std::make_shared(stream, 0, bytes_in_block); + } } if (JoinPtr join = expressions.before_join->getTableJoinAlgo())