未验证 提交 4782c117 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #6691 from yandex/weiqxu-master

Merging "check free space when use external sort/aggerator"
......@@ -347,6 +347,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingSeconds, live_view_heartbeat_interval, DEFAULT_LIVE_VIEW_HEARTBEAT_INTERVAL_SEC, "The heartbeat interval in seconds to indicate live query is alive.") \
M(SettingSeconds, temporary_live_view_timeout, DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC, "Timeout after which temporary live view is deleted.") \
M(SettingUInt64, max_live_view_insert_blocks_before_refresh, 64, "Limit maximum number of inserted blocks after which mergeable blocks are dropped and query is re-executed.") \
M(SettingUInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.") \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\
......
......@@ -4,6 +4,7 @@
#include <DataStreams/copyData.h>
#include <DataStreams/processConstants.h>
#include <Common/formatReadable.h>
#include <common/config_common.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Interpreters/sortBlock.h>
......@@ -21,10 +22,11 @@ namespace DB
MergeSortingBlockInputStream::MergeSortingBlockInputStream(
const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_, size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
size_t max_bytes_before_external_sort_, const std::string & tmp_path_, size_t min_free_disk_space_)
: description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
max_bytes_before_remerge(max_bytes_before_remerge_),
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_),
min_free_disk_space(min_free_disk_space_)
{
children.push_back(input);
header = children.at(0)->getHeader();
......@@ -77,6 +79,12 @@ Block MergeSortingBlockInputStream::readImpl()
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
#if !UNBUNDLED
auto free_space = Poco::File(tmp_path).freeSpace();
if (sum_bytes_in_blocks + min_free_disk_space > free_space)
throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
#endif
Poco::File(tmp_path).createDirectories();
temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
const std::string & path = temporary_files.back()->path();
......
......@@ -18,6 +18,10 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_ENOUGH_SPACE;
}
/** Merges stream of sorted each-separately blocks to sorted as-a-whole stream of blocks.
* If data to sort is too much, could use external sorting, with temporary files.
*/
......@@ -73,7 +77,8 @@ public:
MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_);
size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
size_t min_free_disk_space_);
String getName() const override { return "MergeSorting"; }
......@@ -93,6 +98,7 @@ private:
size_t max_bytes_before_remerge;
size_t max_bytes_before_external_sort;
const std::string tmp_path;
size_t min_free_disk_space;
Logger * log = &Logger::get("MergeSortingBlockInputStream");
......
......@@ -24,6 +24,7 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <common/demangle.h>
#include <common/config_common.h>
namespace ProfileEvents
......@@ -639,6 +640,12 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
&& current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
&& worth_convert_to_two_level)
{
#if !UNBUNDLED
auto free_space = Poco::File(params.tmp_path).freeSpace();
if (current_memory_usage + params.min_free_disk_space > free_space)
throw Exception("Not enough space for external aggregation in " + params.tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
#endif
writeToTemporaryFile(result);
}
......
......@@ -39,6 +39,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
extern const int NOT_ENOUGH_SPACE;
}
class IBlockOutputStream;
......@@ -796,6 +797,7 @@ public:
/// Settings is used to determine cache size. No threads are created.
size_t max_threads;
const size_t min_free_disk_space;
Params(
const Block & src_header_,
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_,
......@@ -803,21 +805,23 @@ public:
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_,
bool empty_result_for_aggregation_by_empty_set_,
const std::string & tmp_path_, size_t max_threads_)
const std::string & tmp_path_, size_t max_threads_,
size_t min_free_disk_space_)
: src_header(src_header_),
keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
max_bytes_before_external_group_by(max_bytes_before_external_group_by_),
empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_),
tmp_path(tmp_path_), max_threads(max_threads_)
tmp_path(tmp_path_), max_threads(max_threads_),
min_free_disk_space(min_free_disk_space_)
{
}
/// Only parameters that matter during merge.
Params(const Block & intermediate_header_,
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, "", max_threads_)
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, "", max_threads_, 0)
{
intermediate_header = intermediate_header_;
}
......
......@@ -1657,7 +1657,7 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context.getTemporaryPath(), settings.max_threads);
context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
/// If there are several sources, then we perform parallel aggregation
if (pipeline.streams.size() > 1)
......@@ -1723,7 +1723,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context.getTemporaryPath(), settings.max_threads);
context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);
......@@ -1943,7 +1943,7 @@ void InterpreterSelectQuery::executeRollupOrCube(Pipeline & pipeline, Modificato
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
SettingUInt64(0), SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context.getTemporaryPath(), settings.max_threads);
context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
if (modificator == Modificator::ROLLUP)
pipeline.firstStream() = std::make_shared<RollupBlockInputStream>(pipeline.firstStream(), params);
......@@ -1972,7 +1972,7 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPipeline & pipeline, Modif
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
SettingUInt64(0), SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context.getTemporaryPath(), settings.max_threads);
context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, true);
......@@ -2073,7 +2073,7 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, SortingInfoPtr so
pipeline.firstStream() = std::make_shared<MergeSortingBlockInputStream>(
pipeline.firstStream(), order_descr, settings.max_block_size, limit,
settings.max_bytes_before_remerge_sort,
settings.max_bytes_before_external_sort, context.getTemporaryPath());
settings.max_bytes_before_external_sort, context.getTemporaryPath(), settings.min_free_disk_space_for_temporary_data);
}
}
......@@ -2111,7 +2111,7 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, SortingInfoP
return std::make_shared<MergeSortingTransform>(
header, order_descr, settings.max_block_size, limit,
settings.max_bytes_before_remerge_sort,
settings.max_bytes_before_external_sort, context.getTemporaryPath());
settings.max_bytes_before_external_sort, context.getTemporaryPath(), settings.min_free_disk_space_for_temporary_data);
});
}
......
......@@ -79,7 +79,7 @@ int main(int argc, char ** argv)
Aggregator::Params params(
stream->getHeader(), {0, 1}, aggregate_descriptions,
false, 0, OverflowMode::THROW, 0, 0, 0, false, "", 1);
false, 0, OverflowMode::THROW, 0, 0, 0, false, "", 1, 0);
Aggregator aggregator(params);
......
......@@ -5,6 +5,7 @@
#include <Common/formatReadable.h>
#include <Common/ProfileEvents.h>
#include <common/config_common.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
......@@ -236,11 +237,13 @@ MergeSortingTransform::MergeSortingTransform(
SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
size_t min_free_disk_space_)
: IProcessor({header}, {header})
, description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
, max_bytes_before_remerge(max_bytes_before_remerge_)
, max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
, min_free_disk_space(min_free_disk_space_)
{
auto & sample = inputs.front().getHeader();
......@@ -504,6 +507,12 @@ void MergeSortingTransform::consume(Chunk chunk)
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
#if !UNBUNDLED
auto free_space = Poco::File(tmp_path).freeSpace();
if (sum_bytes_in_blocks + min_free_disk_space > free_space)
throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
#endif
Poco::File(tmp_path).createDirectories();
temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
const std::string & path = temporary_files.back()->path();
......
......@@ -14,6 +14,10 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_ENOUGH_SPACE;
}
class MergeSorter;
class MergeSortingTransform : public IProcessor
......@@ -24,7 +28,8 @@ public:
SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_);
size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
size_t min_free_disk_space_);
~MergeSortingTransform() override;
......@@ -44,6 +49,7 @@ private:
size_t max_bytes_before_remerge;
size_t max_bytes_before_external_sort;
const std::string tmp_path;
size_t min_free_disk_space;
Logger * log = &Logger::get("MergeSortingBlockInputStream");
......
......@@ -229,7 +229,8 @@ try
max_bytes_before_external_group_by,
false, /// empty_result_for_aggregation_by_empty_set
cur_path, /// tmp_path
1 /// max_threads
1, /// max_threads
0
);
auto agg_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ false);
......@@ -301,7 +302,8 @@ try
max_bytes_before_external_group_by,
false, /// empty_result_for_aggregation_by_empty_set
cur_path, /// tmp_path
1 /// max_threads
1, /// max_threads
0
);
auto agg_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ false);
......
......@@ -133,7 +133,7 @@ try
SortDescription description = {{0, 1, 1}};
auto transform = std::make_shared<MergeSortingTransform>(
source->getPort().getHeader(), description,
max_merged_block_size, limit, max_bytes_before_remerge, max_bytes_before_external_sort, ".");
max_merged_block_size, limit, max_bytes_before_remerge, max_bytes_before_external_sort, ".", 0);
auto sink = std::make_shared<CheckSortedSink>();
connect(source->getPort(), transform->getInputs().front());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册