提交 10bad32f 编写于 作者: N Nikolai Kochetov

Refactor code a little bit. Add comment.

上级 f995ef97
......@@ -267,51 +267,41 @@ void MergeTreeReadPool::fillPerThreadInfo(
size_t sum_marks;
size_t part_idx;
};
std::map<String, std::vector<PartInfo>> parts_per_disk;
for (size_t i = 0; i < parts.size(); ++i)
{
PartInfo part_info{parts[i], per_part_sum_marks[i], i};
if (parts[i].data_part->isStoredOnDisk())
parts_per_disk[parts[i].data_part->volume->getDisk()->getName()].push_back(std::move(part_info));
else
parts_per_disk[""].push_back(std::move(part_info));
}
const size_t min_marks_per_thread = (sum_marks - 1) / threads + 1;
using PartsInfo = std::vector<PartInfo>;
std::queue<PartsInfo> parts_queue;
auto it = std::prev(parts_per_disk.end());
auto * current_parts = &it->second;
auto get_next_parts = [&]()
{
size_t n = parts_per_disk.size();
do
/// Group parts by volume name.
/// We try to minimize the number of threads concurrently reading from the same volume.
/// It improves the performance for JBOD architecture.
std::map<String, std::vector<PartInfo>> parts_per_disk;
for (size_t i = 0; i < parts.size(); ++i)
{
++it;
if (it == parts_per_disk.end())
it = parts_per_disk.begin();
current_parts = &it->second;
} while (current_parts->empty() && --n);
return !current_parts->empty();
};
PartInfo part_info{parts[i], per_part_sum_marks[i], i};
if (parts[i].data_part->isStoredOnDisk())
parts_per_disk[parts[i].data_part->volume->getDisk()->getName()].push_back(std::move(part_info));
else
parts_per_disk[""].push_back(std::move(part_info));
}
auto get_current_parts = [&]()
{
if (!current_parts->empty())
return true;
return get_next_parts();
};
for (auto & info : parts_per_disk)
parts_queue.push(std::move(info.second));
}
for (size_t i = 0; i < threads && get_next_parts(); ++i)
const size_t min_marks_per_thread = (sum_marks - 1) / threads + 1;
for (size_t i = 0; i < threads && !parts_queue.empty(); ++i)
{
auto need_marks = min_marks_per_thread;
while (need_marks > 0 && get_current_parts())
while (need_marks > 0 && !parts_queue.empty())
{
RangesInDataPart & part = current_parts->back().part;
size_t & marks_in_part = current_parts->back().sum_marks;
const auto part_idx = current_parts->back().part_idx;
auto & current_parts = parts_queue.front();
RangesInDataPart & part = current_parts.back().part;
size_t & marks_in_part = current_parts.back().sum_marks;
const auto part_idx = current_parts.back().part_idx;
/// Do not get too few rows from part.
if (marks_in_part >= min_marks_for_concurrent_read &&
......@@ -333,7 +323,9 @@ void MergeTreeReadPool::fillPerThreadInfo(
marks_in_ranges = marks_in_part;
need_marks -= marks_in_part;
current_parts->pop_back();
current_parts.pop_back();
if (current_parts.empty())
parts_queue.pop();
}
else
{
......@@ -362,6 +354,17 @@ void MergeTreeReadPool::fillPerThreadInfo(
if (marks_in_ranges != 0)
remaining_thread_tasks.insert(i);
}
/// Before processing next thread, change volume if possible.
/// Different threads will likely start reading from different volumes,
/// which may improve read parallelism for JBOD.
/// It may also be helpful in case we have backoff threads.
/// Backoff threads will then likely reduce load on different disks, not the same one.
if (parts_queue.size() > 1)
{
parts_queue.push(std::move(parts_queue.front()));
parts_queue.pop();
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册