Unverified commit f995ef97, authored by Amos Bird

Balanced reading from JBOD

Parent 30325689
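In short, this commit makes MergeTree reads disk-aware. Previously the read pool handed parts to threads off the back of one flat list, so on a JBOD setup several threads could cluster on the same disk while other disks sat idle. Parts are now grouped by the disk they are stored on, and each reader thread starts on a different disk. When a thread runs out of work, it first adopts the whole task queue of a thread that was parked by read backoff, and only then steals from the next live thread. A new setting, read_backoff_min_concurrency, puts a floor under how far backoff may shrink the pool; its default of 1 matches the previous hard-coded limit.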
@@ -169,6 +169,8 @@ class IColumn;
     M(Milliseconds, read_backoff_min_interval_between_events_ms, 1000, "Settings to reduce the number of threads in case of slow reads. Do not pay attention to the event, if the previous one has passed less than a certain amount of time.", 0) \
     M(UInt64, read_backoff_min_events, 2, "Settings to reduce the number of threads in case of slow reads. The number of events after which the number of threads will be reduced.", 0) \
     \
+    M(UInt64, read_backoff_min_concurrency, 1, "Settings to try keeping the minimal number of threads in case of slow reads.", 0) \
+    \
     M(Float, memory_tracker_fault_probability, 0., "For testing of `exception safety` - throw an exception every time you allocate memory with the specified probability.", 0) \
     \
     M(Bool, enable_http_compression, 0, "Compress the result if the client over HTTP said that it understands data compressed by gzip or deflate.", 0) \
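The new setting defaults to 1, which preserves the old behaviour; raising it keeps at least that many reader threads alive no matter how often slow-read backoff fires (see the profileFeedback hunk below).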
@@ -898,7 +898,7 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
             num_streams,
             sum_marks,
             min_marks_for_concurrent_read,
-            parts,
+            std::move(parts),
             data,
             metadata_snapshot,
             query_info.prewhere_info,
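The call site now moves parts into the pool. Together with the rvalue-reference parameter in the constructor below, this hands ownership of the ranges to the pool, so the constructor can work on the member parts_ranges rather than on a copy of the caller's collection.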
@@ -21,7 +21,7 @@ MergeTreeReadPool::MergeTreeReadPool(
     const size_t threads_,
     const size_t sum_marks_,
     const size_t min_marks_for_concurrent_read_,
-    RangesInDataParts parts_,
+    RangesInDataParts && parts_,
     const MergeTreeData & data_,
     const StorageMetadataPtr & metadata_snapshot_,
     const PrewhereInfoPtr & prewhere_info_,
@@ -38,11 +38,11 @@ MergeTreeReadPool::MergeTreeReadPool(
     , do_not_steal_tasks{do_not_steal_tasks_}
     , predict_block_size_bytes{preferred_block_size_bytes_ > 0}
     , prewhere_info{prewhere_info_}
-    , parts_ranges{parts_}
+    , parts_ranges{std::move(parts_)}
 {
     /// parts don't contain duplicate MergeTreeDataPart's.
-    const auto per_part_sum_marks = fillPerPartInfo(parts_, check_columns_);
-    fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_, min_marks_for_concurrent_read_);
+    const auto per_part_sum_marks = fillPerPartInfo(parts_ranges, check_columns_);
+    fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_ranges, min_marks_for_concurrent_read_);
 }
@@ -62,7 +62,24 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read,
         return nullptr;

     /// Steal task if nothing to do and it's not prohibited
-    const auto thread_idx = tasks_remaining_for_this_thread ? thread : *std::begin(remaining_thread_tasks);
+    auto thread_idx = thread;
+    if (!tasks_remaining_for_this_thread)
+    {
+        auto it = remaining_thread_tasks.lower_bound(backoff_state.current_threads);
+        // Grab all the tasks of a thread that was killed by backoff
+        if (it != remaining_thread_tasks.end())
+        {
+            threads_tasks[thread] = std::move(threads_tasks[*it]);
+            remaining_thread_tasks.erase(it);
+        }
+        else // Try to steal tasks from the next thread
+        {
+            it = remaining_thread_tasks.upper_bound(thread);
+            if (it == remaining_thread_tasks.end())
+                it = remaining_thread_tasks.begin();
+            thread_idx = *it;
+        }
+    }

     auto & thread_tasks = threads_tasks[thread_idx];
     auto & thread_task = thread_tasks.parts_and_ranges.back();
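The stealing policy reads in isolation as follows: a thread that has run out of work first looks for a task queue whose owner index is at or above backoff_state.current_threads, i.e. a thread already parked by read backoff, and adopts that queue wholesale; only when no such queue exists does it steal from the next live thread in wrap-around order. Below is a minimal standalone sketch of that selection logic. It compiles on its own; pickThread, ThreadTasks and the integer task ids are illustrative stand-ins for the pool's internals, not ClickHouse code.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <set>
#include <utility>
#include <vector>

// Illustrative stand-in: each thread owns a queue of opaque task ids.
using ThreadTasks = std::vector<int>;

// Decide which thread's queue the caller (`thread`) should draw from next.
// `current_threads` is the pool size currently allowed by backoff;
// `remaining` holds indices of threads that still have queued tasks.
// Assumes `remaining` is non-empty, mirroring the early return in getTask.
size_t pickThread(size_t thread, size_t current_threads,
                  std::map<size_t, ThreadTasks> & threads_tasks,
                  std::set<size_t> & remaining)
{
    // Prefer a queue orphaned by backoff: its owner index is >= current_threads.
    auto it = remaining.lower_bound(current_threads);
    if (it != remaining.end())
    {
        // Adopt the whole queue so the orphaned work is drained by one thread.
        threads_tasks[thread] = std::move(threads_tasks[*it]);
        remaining.erase(it);
        return thread;
    }

    // Otherwise steal from the next live thread, wrapping around.
    it = remaining.upper_bound(thread);
    if (it == remaining.end())
        it = remaining.begin();
    return *it;
}

int main()
{
    std::map<size_t, ThreadTasks> threads_tasks{{0, {}}, {1, {42}}, {2, {7, 8}}};
    std::set<size_t> remaining{1, 2};

    // Backoff shrank the pool to 2 threads, so thread 2's queue is orphaned:
    // thread 0 adopts it wholesale instead of stealing one task from thread 1.
    assert(pickThread(0, 2, threads_tasks, remaining) == 0);
    assert(threads_tasks[0].size() == 2);
}
```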
@@ -163,7 +180,7 @@ void MergeTreeReadPool::profileFeedback(const ReadBufferFromFileBase::ProfileInf
     std::lock_guard lock(mutex);

-    if (backoff_state.current_threads <= 1)
+    if (backoff_state.current_threads <= backoff_settings.min_concurrency)
         return;

     size_t throughput = info.bytes_read * 1000000000 / info.nanoseconds;
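With this guard, backoff stops parking threads once the pool has shrunk to the configured floor, rather than always being allowed to drop to a single thread.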
@@ -194,14 +211,14 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
 std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
-    RangesInDataParts & parts, const bool check_columns)
+    const RangesInDataParts & parts, const bool check_columns)
 {
     std::vector<size_t> per_part_sum_marks;
     Block sample_block = metadata_snapshot->getSampleBlock();

     for (const auto i : ext::range(0, parts.size()))
     {
-        auto & part = parts[i];
+        const auto & part = parts[i];

         /// Read marks for every data part.
         size_t sum_marks = 0;
@@ -238,21 +255,63 @@ std::vector<size_t> MergeTreeReadPool::fillPerPartInfo(
 void MergeTreeReadPool::fillPerThreadInfo(
     const size_t threads, const size_t sum_marks, std::vector<size_t> per_part_sum_marks,
-    RangesInDataParts & parts, const size_t min_marks_for_concurrent_read)
+    const RangesInDataParts & parts, const size_t min_marks_for_concurrent_read)
 {
     threads_tasks.resize(threads);
+    if (parts.empty())
+        return;
+
+    struct PartInfo
+    {
+        RangesInDataPart part;
+        size_t sum_marks;
+        size_t part_idx;
+    };
+
+    std::map<String, std::vector<PartInfo>> parts_per_disk;
+
+    for (size_t i = 0; i < parts.size(); ++i)
+    {
+        PartInfo part_info{parts[i], per_part_sum_marks[i], i};
+        if (parts[i].data_part->isStoredOnDisk())
+            parts_per_disk[parts[i].data_part->volume->getDisk()->getName()].push_back(std::move(part_info));
+        else
+            parts_per_disk[""].push_back(std::move(part_info));
+    }
+
     const size_t min_marks_per_thread = (sum_marks - 1) / threads + 1;

-    for (size_t i = 0; i < threads && !parts.empty(); ++i)
+    auto it = std::prev(parts_per_disk.end());
+    auto * current_parts = &it->second;
+
+    auto get_next_parts = [&]()
+    {
+        size_t n = parts_per_disk.size();
+        do
+        {
+            ++it;
+            if (it == parts_per_disk.end())
+                it = parts_per_disk.begin();
+            current_parts = &it->second;
+        } while (current_parts->empty() && --n);
+        return !current_parts->empty();
+    };
+
+    auto get_current_parts = [&]()
+    {
+        if (!current_parts->empty())
+            return true;
+        return get_next_parts();
+    };
+
+    for (size_t i = 0; i < threads && get_next_parts(); ++i)
     {
         auto need_marks = min_marks_per_thread;

-        while (need_marks > 0 && !parts.empty())
+        while (need_marks > 0 && get_current_parts())
         {
-            const auto part_idx = parts.size() - 1;
-            RangesInDataPart & part = parts.back();
-            size_t & marks_in_part = per_part_sum_marks.back();
+            RangesInDataPart & part = current_parts->back().part;
+            size_t & marks_in_part = current_parts->back().sum_marks;
+            const auto part_idx = current_parts->back().part_idx;

             /// Do not get too few rows from part.
             if (marks_in_part >= min_marks_for_concurrent_read &&
@@ -274,8 +333,7 @@ void MergeTreeReadPool::fillPerThreadInfo(
                 marks_in_ranges = marks_in_part;
                 need_marks -= marks_in_part;
-                parts.pop_back();
-                per_part_sum_marks.pop_back();
+                current_parts->pop_back();
             }
             else
             {
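The effect of the grouping is easiest to see with the pool reduced to toy types. In this sketch (an assumed simplification: disks become string keys, parts become plain mark counts, and next_disk mirrors the get_next_parts lambda above), three threads starting against three disks each begin on a different disk, so no disk serves more than one concurrent reader:

```cpp
#include <cstddef>
#include <iostream>
#include <iterator>
#include <map>
#include <string>
#include <vector>

int main()
{
    // Toy stand-in for parts_per_disk: disk name -> parts, each reduced
    // to its mark count. Three equal disks, three parts per disk.
    std::map<std::string, std::vector<int>> parts_per_disk{
        {"disk1", {10, 10, 10}},
        {"disk2", {10, 10, 10}},
        {"disk3", {10, 10, 10}},
    };

    const size_t threads = 3;
    auto it = std::prev(parts_per_disk.end());

    // Advance to the next disk that still has parts, wrapping around
    // (the same loop shape as get_next_parts in the diff).
    auto next_disk = [&]
    {
        size_t n = parts_per_disk.size();
        do
        {
            ++it;
            if (it == parts_per_disk.end())
                it = parts_per_disk.begin();
        } while (it->second.empty() && --n);
        return !it->second.empty();
    };

    // Because the cursor advances once per thread, each thread begins on a
    // different disk: with 3 threads and 3 disks every disk gets exactly
    // one concurrent reader instead of all three piling onto one disk.
    for (size_t t = 0; t < threads && next_disk(); ++t)
        std::cout << "thread " << t << " starts on " << it->first << '\n';
}
```

With the old code, all three threads would have popped parts off the back of one flat list, which on JBOD typically means hitting whichever disk happens to hold the last parts in that list.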
@@ -36,13 +36,16 @@ public:
         size_t min_interval_between_events_ms = 1000;
         /// Number of events to do backoff - to lower number of threads in pool.
         size_t min_events = 2;
+        /// Try keeping the minimal number of threads in the pool.
+        size_t min_concurrency = 1;

         /// Constants above is just an example.
         BackoffSettings(const Settings & settings)
             : min_read_latency_ms(settings.read_backoff_min_latency_ms.totalMilliseconds()),
             max_throughput(settings.read_backoff_max_throughput),
             min_interval_between_events_ms(settings.read_backoff_min_interval_between_events_ms.totalMilliseconds()),
-            min_events(settings.read_backoff_min_events)
+            min_events(settings.read_backoff_min_events),
+            min_concurrency(settings.read_backoff_min_concurrency)
         {
         }
@@ -68,7 +71,7 @@ private:
 public:
     MergeTreeReadPool(
         const size_t threads_, const size_t sum_marks_, const size_t min_marks_for_concurrent_read_,
-        RangesInDataParts parts_, const MergeTreeData & data_, const StorageMetadataPtr & metadata_snapshot_, const PrewhereInfoPtr & prewhere_info_,
+        RangesInDataParts && parts_, const MergeTreeData & data_, const StorageMetadataPtr & metadata_snapshot_, const PrewhereInfoPtr & prewhere_info_,
         const bool check_columns_, const Names & column_names_,
         const BackoffSettings & backoff_settings_, size_t preferred_block_size_bytes_,
         const bool do_not_steal_tasks_ = false);
@@ -88,11 +91,11 @@ public:
 private:
     std::vector<size_t> fillPerPartInfo(
-        RangesInDataParts & parts, const bool check_columns);
+        const RangesInDataParts & parts, const bool check_columns);

     void fillPerThreadInfo(
         const size_t threads, const size_t sum_marks, std::vector<size_t> per_part_sum_marks,
-        RangesInDataParts & parts, const size_t min_marks_for_concurrent_read);
+        const RangesInDataParts & parts, const size_t min_marks_for_concurrent_read);

     const MergeTreeData & data;
     StorageMetadataPtr metadata_snapshot;