// MergeTreeReadPool.h — last change: "Merge", committed by Andrey Mironov.
#pragma once

#include <DB/Core/NamesAndTypes.h>
#include <DB/Storages/MergeTree/RangesInDataPart.h>
#include <statdaemons/ext/range.hpp>

#include <algorithm>
#include <limits>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>


namespace DB
{


struct MergeTreeReadTask
{
	MergeTreeData::DataPartPtr data_part;
	MarkRanges mark_ranges;
	std::size_t part_index_in_query;
	const Names & ordered_names;
	const NameSet & column_name_set;
	const NamesAndTypesList & columns;
	const NamesAndTypesList & pre_columns;
	const bool remove_prewhere_column;
A
Merge  
Andrey Mironov 已提交
23
	const bool should_reorder;
A
Merge  
Andrey Mironov 已提交
24

A
Merge  
Andrey Mironov 已提交
25 26 27 28
	MergeTreeReadTask(
		const MergeTreeData::DataPartPtr & data_part, const MarkRanges & ranges, const std::size_t part_index_in_query,
		const Names & ordered_names, const NameSet & column_name_set, const NamesAndTypesList & columns,
		const NamesAndTypesList & pre_columns, const bool remove_prewhere_column, const bool should_reorder)
A
Merge  
Andrey Mironov 已提交
29 30
		: data_part{data_part}, mark_ranges{ranges}, part_index_in_query{part_index_in_query},
		  ordered_names{ordered_names}, column_name_set{column_name_set}, columns{columns}, pre_columns{pre_columns},
A
Merge  
Andrey Mironov 已提交
31
		  remove_prewhere_column{remove_prewhere_column}, should_reorder{should_reorder}
A
Merge  
Andrey Mironov 已提交
32 33 34 35 36 37 38 39
	{}
};

using MergeTreeReadTaskPtr = std::unique_ptr<MergeTreeReadTask>;

class MergeTreeReadPool
{
public:
A
Merge  
Andrey Mironov 已提交
40 41 42 43
	MergeTreeReadPool(
		const RangesInDataParts & parts, MergeTreeData & data, const ExpressionActionsPtr & prewhere_actions,
		const String & prewhere_column_name, const bool check_columns, const Names & column_names)
		: parts{parts}, data{data}, column_names{column_names}
A
Merge  
Andrey Mironov 已提交
44
	{
A
Merge  
Andrey Mironov 已提交
45
		fillPerPartInfo(prewhere_actions, prewhere_column_name, check_columns);
A
Merge  
Andrey Mironov 已提交
46 47 48 49 50 51 52 53 54
	}

	MergeTreeReadPool(const MergeTreeReadPool &) = delete;
	MergeTreeReadPool & operator=(const MergeTreeReadPool &) = delete;

	MergeTreeReadTaskPtr getTask(const std::size_t min_marks_to_read)
	{
		const std::lock_guard<std::mutex> lock{mutex};

A
Merge  
Andrey Mironov 已提交
55
		if (remaining_part_indices.empty())
A
Merge  
Andrey Mironov 已提交
56 57 58
			return nullptr;

		/// find a part which has marks remaining
A
Merge  
Andrey Mironov 已提交
59
		const auto part_id = remaining_part_indices.back();
A
Merge  
Andrey Mironov 已提交
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88

		auto & part = parts[part_id];
		const auto & column_name_set = per_part_column_name_set[part_id];
		const auto & columns = per_part_columns[part_id];
		const auto & pre_columns = per_part_pre_columns[part_id];
		const auto remove_prewhere_column = per_part_remove_prewhere_column[part_id];
		auto & marks_in_part = per_part_sum_marks[part_id];

		/// Берём весь кусок, если он достаточно мал
		auto need_marks = std::min(marks_in_part, min_marks_to_read);

		/// Не будем оставлять в куске слишком мало строк.
		if (marks_in_part > need_marks &&
			marks_in_part - need_marks < min_marks_to_read)
			need_marks = marks_in_part;

		MarkRanges ranges_to_get_from_part;

		/// Возьмем весь кусок, если он достаточно мал.
		if (marks_in_part <= need_marks)
		{
			const auto marks_to_get_from_range = marks_in_part;

			/// Восстановим порядок отрезков.
			std::reverse(part.ranges.begin(), part.ranges.end());

			ranges_to_get_from_part = part.ranges;

			marks_in_part -= marks_to_get_from_range;
A
Merge  
Andrey Mironov 已提交
89 90

			remaining_part_indices.pop_back();
A
Merge  
Andrey Mironov 已提交
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
		}
		else
		{
			/// Цикл по отрезкам куска.
			while (need_marks > 0 && !part.ranges.empty())
			{
				auto & range = part.ranges.back();

				const std::size_t marks_in_range = range.end - range.begin;
				const std::size_t marks_to_get_from_range = std::min(marks_in_range, need_marks);

				ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range);
				range.begin += marks_to_get_from_range;
				if (range.begin == range.end)
					part.ranges.pop_back();

				marks_in_part -= marks_to_get_from_range;
				need_marks -= marks_to_get_from_range;
			}
A
Merge  
Andrey Mironov 已提交
110 111 112

			if (0 == marks_in_part)
				remaining_part_indices.pop_back();
A
Merge  
Andrey Mironov 已提交
113 114 115
		}

		return std::make_unique<MergeTreeReadTask>(
A
Merge  
Andrey Mironov 已提交
116 117
			part.data_part, ranges_to_get_from_part, part.part_index_in_query, column_names, column_name_set, columns,
			pre_columns, remove_prewhere_column, per_part_should_reorder[part_id]);
A
Merge  
Andrey Mironov 已提交
118 119 120
	}

public:
A
Merge  
Andrey Mironov 已提交
121 122
	void fillPerPartInfo(
		const ExpressionActionsPtr & prewhere_actions, const String & prewhere_column_name, const bool check_columns)
A
Merge  
Andrey Mironov 已提交
123
	{
A
Merge  
Andrey Mironov 已提交
124 125 126
		remaining_part_indices.reserve(parts.size());

		for (const auto i : ext::range(0, parts.size()))
A
Merge  
Andrey Mironov 已提交
127
		{
A
Merge  
Andrey Mironov 已提交
128 129 130 131 132 133 134 135 136 137 138 139 140 141
			auto & part = parts[i];

			/// Посчитаем засечки для каждого куска.
			size_t sum_marks = 0;
			/// Пусть отрезки будут перечислены справа налево, чтобы можно было выбрасывать самый левый отрезок с помощью pop_back().
			std::reverse(std::begin(part.ranges), std::end(part.ranges));

			for (const auto & range : part.ranges)
				sum_marks += range.end - range.begin;

			per_part_sum_marks.push_back(sum_marks);

			if (0 != sum_marks)
				remaining_part_indices.push_back(i);
A
Merge  
Andrey Mironov 已提交
142

A
Merge  
Andrey Mironov 已提交
143 144 145 146 147 148 149
			per_part_columns_lock.push_back(std::make_unique<Poco::ScopedReadRWLock>(
				part.data_part->columns_lock));

			/// inject column names required for DEFAULT evaluation in current part
			auto required_column_names = column_names;

			const auto injected_columns = injectRequiredColumns(part.data_part, required_column_names);
A
Merge  
Andrey Mironov 已提交
150
			auto should_reoder = !injected_columns.empty();
A
Merge  
Andrey Mironov 已提交
151 152 153 154 155 156 157 158 159 160 161 162 163

			Names required_pre_column_names;

			if (prewhere_actions)
			{
				/// collect columns required for PREWHERE evaluation
				required_pre_column_names = prewhere_actions->getRequiredColumns();

				/// there must be at least one column required for PREWHERE
				if (required_pre_column_names.empty())
					required_pre_column_names.push_back(required_column_names[0]);

				/// PREWHERE columns may require some additional columns for DEFAULT evaluation
A
Merge  
Andrey Mironov 已提交
164 165 166
				const auto injected_pre_columns = injectRequiredColumns(part.data_part, required_pre_column_names);
				if (!injected_pre_columns.empty())
					should_reoder = true;
A
Merge  
Andrey Mironov 已提交
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204

				/// will be used to distinguish between PREWHERE and WHERE columns when applying filter
				const NameSet pre_name_set{
					std::begin(required_pre_column_names), std::end(required_pre_column_names)
				};
				/** Если выражение в PREWHERE - не столбец таблицы, не нужно отдавать наружу столбец с ним
				 *	(от storage ожидают получить только столбцы таблицы). */
				per_part_remove_prewhere_column.push_back(0 == pre_name_set.count(prewhere_column_name));

				Names post_column_names;
				for (const auto & name : required_column_names)
					if (!pre_name_set.count(name))
						post_column_names.push_back(name);

				required_column_names = post_column_names;
			}
			else
				per_part_remove_prewhere_column.push_back(false);

			per_part_column_name_set.emplace_back(std::begin(required_column_names), std::end(required_column_names));

			if (check_columns)
			{
				/** Под part->columns_lock проверим, что все запрошенные столбцы в куске того же типа, что в таблице.
				 *	Это может быть не так во время ALTER MODIFY. */
				if (!required_pre_column_names.empty())
					data.check(part.data_part->columns, required_pre_column_names);
				if (!required_column_names.empty())
					data.check(part.data_part->columns, required_column_names);

				per_part_pre_columns.push_back(data.getColumnsList().addTypes(required_pre_column_names));
				per_part_columns.push_back(data.getColumnsList().addTypes(required_column_names));
			}
			else
			{
				per_part_pre_columns.push_back(part.data_part->columns.addTypes(required_pre_column_names));
				per_part_columns.push_back(part.data_part->columns.addTypes(required_column_names));
			}
A
Merge  
Andrey Mironov 已提交
205 206

			per_part_should_reorder.push_back(should_reoder);
A
Merge  
Andrey Mironov 已提交
207 208 209 210 211 212 213 214 215 216 217 218
		}
	}

	/** Если некоторых запрошенных столбцов нет в куске,
	 *	то выясняем, какие столбцы может быть необходимо дополнительно прочитать,
	 *	чтобы можно было вычислить DEFAULT выражение для этих столбцов.
	 *	Добавляет их в columns. */
	NameSet injectRequiredColumns(const MergeTreeData::DataPartPtr & part, Names & columns) const
	{
		NameSet required_columns{std::begin(columns), std::end(columns)};
		NameSet injected_columns;

A
Merge  
Andrey Mironov 已提交
219 220
		auto all_column_files_missing = true;

A
Merge  
Andrey Mironov 已提交
221 222 223 224 225 226
		for (size_t i = 0; i < columns.size(); ++i)
		{
			const auto & column_name = columns[i];

			/// column has files and hence does not require evaluation
			if (part->hasColumnFiles(column_name))
A
Merge  
Andrey Mironov 已提交
227 228
			{
				all_column_files_missing = false;
A
Merge  
Andrey Mironov 已提交
229
				continue;
A
Merge  
Andrey Mironov 已提交
230
			}
A
Merge  
Andrey Mironov 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255

			const auto default_it = data.column_defaults.find(column_name);
			/// columns has no explicit default expression
			if (default_it == std::end(data.column_defaults))
				continue;

			/// collect identifiers required for evaluation
			IdentifierNameSet identifiers;
			default_it->second.expression->collectIdentifierNames(identifiers);

			for (const auto & identifier : identifiers)
			{
				if (data.hasColumn(identifier))
				{
					/// ensure each column is added only once
					if (required_columns.count(identifier) == 0)
					{
						columns.emplace_back(identifier);
						required_columns.emplace(identifier);
						injected_columns.emplace(identifier);
					}
				}
			}
		}

A
Merge  
Andrey Mironov 已提交
256 257 258 259 260 261 262
		if (all_column_files_missing)
		{
			addMinimumSizeColumn(part, columns);
			/// correctly report added column
			injected_columns.insert(columns.back());
		}

A
Merge  
Andrey Mironov 已提交
263 264 265
		return injected_columns;
	}

A
Merge  
Andrey Mironov 已提交
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307
	/** Добавить столбец минимального размера.
	  * Используется в случае, когда ни один столбец не нужен, но нужно хотя бы знать количество строк.
	  * Добавляет в columns.
	  */
	void addMinimumSizeColumn(const MergeTreeData::DataPartPtr & part, Names & columns) const
	{
		const auto get_column_size = [this, &part] (const String & name) {
			const auto & files = part->checksums.files;

			const auto escaped_name = escapeForFileName(name);
			const auto bin_file_name = escaped_name + ".bin";
			const auto mrk_file_name = escaped_name + ".mrk";

			return files.find(bin_file_name)->second.file_size + files.find(mrk_file_name)->second.file_size;
		};

		const auto & storage_columns = data.getColumnsList();
		const NameAndTypePair * minimum_size_column = nullptr;
		auto minimum_size = std::numeric_limits<size_t>::max();

		for (const auto & column : storage_columns)
		{
			if (!part->hasColumnFiles(column.name))
				continue;

			const auto size = get_column_size(column.name);
			if (size < minimum_size)
			{
				minimum_size = size;
				minimum_size_column = &column;
			}
		}

		if (!minimum_size_column)
			throw Exception{
				"Could not find a column of minimum size in MergeTree",
				ErrorCodes::LOGICAL_ERROR
			};

		columns.push_back(minimum_size_column->name);
	}

A
Merge  
Andrey Mironov 已提交
308 309 310
	std::vector<std::unique_ptr<Poco::ScopedReadRWLock>> per_part_columns_lock;
	RangesInDataParts parts;
	std::vector<std::size_t> per_part_sum_marks;
A
Merge  
Andrey Mironov 已提交
311
	std::vector<std::size_t> remaining_part_indices;
A
Merge  
Andrey Mironov 已提交
312
	MergeTreeData & data;
A
Merge  
Andrey Mironov 已提交
313
	Names column_names;
A
Merge  
Andrey Mironov 已提交
314 315 316 317 318
	std::vector<NameSet> per_part_column_name_set;
	std::vector<NamesAndTypesList> per_part_columns;
	std::vector<NamesAndTypesList> per_part_pre_columns;
	/// @todo actually all of these values are either true or false for the whole query, thus no vector required
	std::vector<bool> per_part_remove_prewhere_column;
A
Merge  
Andrey Mironov 已提交
319
	std::vector<bool> per_part_should_reorder;
A
Merge  
Andrey Mironov 已提交
320 321 322 323 324 325 326 327

	mutable std::mutex mutex;
};

using MergeTreeReadPoolPtr = std::shared_ptr<MergeTreeReadPool>;


}