MergeTreeDataSelectExecutor.cpp 20.4 KB
Newer Older
M
Merge  
Michael Kolupaev 已提交
1
#include <DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h>
A
Merge  
Andrey Mironov 已提交
2
#include <DB/Storages/MergeTree/MergeTreeWhereOptimizer.h>
M
Merge  
Michael Kolupaev 已提交
3 4 5 6 7 8 9
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/DataStreams/ExpressionBlockInputStream.h>
#include <DB/DataStreams/FilterBlockInputStream.h>
#include <DB/DataStreams/ConcatBlockInputStream.h>
#include <DB/DataStreams/CollapsingFinalBlockInputStream.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
M
Merge  
Michael Kolupaev 已提交
10
#include <DB/Common/VirtualColumnUtils.h>
M
Merge  
Michael Kolupaev 已提交
11

12

M
Merge  
Michael Kolupaev 已提交
13 14 15
namespace DB
{

M
Merge  
Michael Kolupaev 已提交
16
/// Binds the executor to the given MergeTreeData and precomputes, in units of marks,
/// the thresholds that the storage settings express in rows.
MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(MergeTreeData & data_) : data(data_), log(&Logger::get(data.getLogName() + " (SelectExecutor)"))
{
	/// Each threshold is the row setting divided by index_granularity, rounded up.
	min_marks_for_seek = (data.settings.min_rows_for_seek + data.index_granularity - 1) / data.index_granularity;
	min_marks_for_concurrent_read = (data.settings.min_rows_for_concurrent_read + data.index_granularity - 1) / data.index_granularity;
	max_marks_to_use_cache = (data.settings.max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity;
}

M
Merge  
Michael Kolupaev 已提交
25 26 27 28 29 30 31 32 33 34 35 36 37
/// Build a block that consists only of the possible values of the virtual columns.
/// The result has a single String column named "_part" with one row per given part (the part name).
static Block getBlockWithVirtualColumns(const MergeTreeData::DataPartsVector & parts)
{
	ColumnWithNameAndType part_name_column(new ColumnString, new DataTypeString, "_part");

	/// Rows are appended in the order in which the parts were passed in.
	for (size_t i = 0; i < parts.size(); ++i)
		part_name_column.column->insert(parts[i]->name);

	Block block;
	block.insert(part_name_column);
	return block;
}

M
Merge  
Michael Kolupaev 已提交
38 39 40
/** Builds the input streams that read the data needed by a SELECT query.
  *
  * Steps, in order:
  *  - split the requested columns into real ones and the virtual ones (`_part`, `_part_index`);
  *  - optionally move part of WHERE into PREWHERE (settings.optimize_move_to_prewhere);
  *  - keep only the parts that match the condition on `_part` and may match the date condition;
  *  - process SAMPLE: turn it into a relative sample size, add a matching restriction
  *    to the key condition and prepare a filter expression;
  *  - compute the mark ranges to read from every remaining part;
  *  - spread the ranges among streams (the FINAL variant is used when select.final is set).
  *
  * column_names_to_return - requested columns; may contain `_part` and `_part_index`.
  * query                  - the query AST; must be an ASTSelectQuery (the cast throws otherwise).
  * context                - passed to the virtual-column filter and to the key conditions.
  * settings               - optimize_move_to_prewhere and use_uncompressed_cache are read here.
  * processed_stage        - set to QueryProcessingStage::FetchColumns.
  * max_block_size, threads - forwarded to the functions that create the streams.
  * part_index             - optional counter; it is incremented once for every part that passed
  *                          the `_part` and date filters. If null, a local counter starting at 0 is used.
  *
  * Throws Exception if the sample size is negative, if the sampling column has an unsupported type
  * or if the sampling column is not part of the primary key.
  */
BlockInputStreams MergeTreeDataSelectExecutor::read(
	const Names & column_names_to_return,
	ASTPtr query,
	const Context & context,
	const Settings & settings,
	QueryProcessingStage::Enum & processed_stage,
	const size_t max_block_size,
	const unsigned threads,
	size_t * part_index)
{
	size_t part_index_var = 0;
	if (!part_index)
		part_index = &part_index_var;

	MergeTreeData::DataPartsVector parts = data.getDataPartsVector();

	/// If the query has restrictions on the virtual column _part, only the matching parts will be selected.
	Names virt_column_names, real_column_names;
	for (const String & name : column_names_to_return)
	{
		if (name == "_part" || name == "_part_index")
			virt_column_names.push_back(name);
		else
			real_column_names.push_back(name);
	}

	/// If the query contains only virtual columns, at least one other column has to be requested.
	if (real_column_names.empty())
		real_column_names.push_back(ExpressionActions::getSmallestColumn(data.getColumnsList()));

	/// The reference form of typeid_cast is used (as for ASTLiteral below) so that a query of
	/// an unexpected type results in an exception instead of a null pointer dereference.
	ASTSelectQuery & select = typeid_cast<ASTSelectQuery &>(*query);

	/// Try transferring some condition from WHERE to PREWHERE if enabled and viable
	if (settings.optimize_move_to_prewhere)
		if (select.where_expression && !select.prewhere_expression)
			MergeTreeWhereOptimizer{select, data, column_names_to_return, log};

	Block virtual_columns_block = getBlockWithVirtualColumns(parts);

	/// If at least one virtual column is requested, try to use it for filtering.
	if (!virt_column_names.empty())
		VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context);

	std::multiset<String> values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");

	data.check(real_column_names);
	processed_stage = QueryProcessingStage::FetchColumns;

	PKCondition key_condition(query, context, data.getColumnsList(), data.getSortDescription());
	PKCondition date_condition(query, context, data.getColumnsList(), SortDescription(1, SortColumnDescription(data.date_column_name, 1)));

	/// Select the parts that may contain data satisfying date_condition and that match the condition on _part.
	{
		auto prev_parts = parts;
		parts.clear();

		for (const auto & part : prev_parts)
		{
			if (values.find(part->name) == values.end())
				continue;

			Field left = static_cast<UInt64>(part->left_date);
			Field right = static_cast<UInt64>(part->right_date);

			if (!date_condition.mayBeTrueInRange(&left, &right))
				continue;

			parts.push_back(part);
		}
	}

	/// Sorts the list of column names and removes duplicates from it.
	auto sort_and_deduplicate = [](Names & names)
	{
		std::sort(names.begin(), names.end());
		names.erase(std::unique(names.begin(), names.end()), names.end());
	};

	/// Sampling.
	Names column_names_to_read = real_column_names;
	UInt64 sampling_column_value_limit = 0;
	using ASTFunctionPtr = Poco::SharedPtr<ASTFunction>;
	ASTFunctionPtr filter_function;
	ExpressionActionsPtr filter_expression;
	double relative_sample_size = 0;

	if (select.sample_size)
	{
		ASTLiteral & sample_size_literal = typeid_cast<ASTLiteral &>(*select.sample_size);

		relative_sample_size = apply_visitor(FieldVisitorConvertToNumber<double>(), sample_size_literal.value);

		if (relative_sample_size < 0)
			throw Exception("Negative sample size", ErrorCodes::ARGUMENT_OUT_OF_BOUND);

		/// Convert an absolute sample size (like SAMPLE 1000000 - how many rows to read) into a relative one (which fraction of the data to read).
		if (relative_sample_size > 1)
		{
			size_t requested_count = apply_visitor(FieldVisitorConvertToNumber<UInt64>(), sample_size_literal.value);

			/// Find out how many rows would be read without sampling.
			LOG_DEBUG(log, "Preliminary index scan with condition: " << key_condition.toString());
			size_t total_count = 0;
			for (const auto & part : parts)
			{
				MarkRanges ranges = markRangesFromPkRange(part->index, key_condition);

				for (const auto & range : ranges)
					total_count += range.end - range.begin;
			}
			total_count *= data.index_granularity;

			/// When nothing would be read at all, the fraction is taken as 1 (i.e. no sampling);
			/// this is the value the plain floating-point division by zero also led to.
			relative_sample_size = total_count == 0
				? 1.
				: std::min(1., static_cast<double>(requested_count) / total_count);

			LOG_DEBUG(log, "Selected relative sample size: " << relative_sample_size);
		}

		/// SAMPLE 1 is the same as no SAMPLE at all.
		if (relative_sample_size == 1)
			relative_sample_size = 0;
	}

	if (relative_sample_size != 0)
	{
		UInt64 sampling_column_max = 0;
		DataTypePtr type = data.getPrimaryExpression()->getSampleBlock().getByName(data.sampling_expression->getColumnName()).type;

		if (type->getName() == "UInt64")
			sampling_column_max = std::numeric_limits<UInt64>::max();
		else if (type->getName() == "UInt32")
			sampling_column_max = std::numeric_limits<UInt32>::max();
		else if (type->getName() == "UInt16")
			sampling_column_max = std::numeric_limits<UInt16>::max();
		else if (type->getName() == "UInt8")
			sampling_column_max = std::numeric_limits<UInt8>::max();
		else
			throw Exception("Invalid sampling column type in storage parameters: " + type->getName() + ". Must be unsigned integer type.", ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);

		/// Add a condition to cut off something more during the second pass over the index.
		sampling_column_value_limit = static_cast<UInt64>(relative_sample_size * sampling_column_max);
		if (!key_condition.addCondition(data.sampling_expression->getColumnName(),
			Range::createRightBounded(sampling_column_value_limit, true)))
			throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);

		/// Expression for filtering: sampling_expression <= sampling_column_value_limit

		ASTPtr filter_function_args = new ASTExpressionList;
		filter_function_args->children.push_back(data.sampling_expression);
		filter_function_args->children.push_back(new ASTLiteral(StringRange(), sampling_column_value_limit));

		filter_function = new ASTFunction;
		filter_function->name = "lessOrEquals";
		filter_function->arguments = filter_function_args;
		filter_function->children.push_back(filter_function->arguments);

		filter_expression = ExpressionAnalyzer(filter_function, data.context, data.getColumnsList()).getActions(false);

		/// Add the columns needed for sampling_expression.
		std::vector<String> add_columns = filter_expression->getRequiredColumns();
		column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
		sort_and_deduplicate(column_names_to_read);
	}

	LOG_DEBUG(log, "Key condition: " << key_condition.toString());
	LOG_DEBUG(log, "Date condition: " << date_condition.toString());

	/// PREWHERE
	ExpressionActionsPtr prewhere_actions;
	String prewhere_column;
	if (select.prewhere_expression)
	{
		ExpressionAnalyzer analyzer(select.prewhere_expression, data.context, data.getColumnsList());
		prewhere_actions = analyzer.getActions(false);
		prewhere_column = select.prewhere_expression->getColumnName();
		/// TODO: To make subqueries work in PREWHERE, analyzer.getSetsWithSubqueries() could be saved here and executed later.
	}

	RangesInDataParts parts_with_ranges;

	/// Find out which range to read from each part.
	size_t sum_marks = 0;
	size_t sum_ranges = 0;
	for (auto & part : parts)
	{
		/// The counter advances for every part, including those that end up with no ranges.
		RangesInDataPart ranges(part, (*part_index)++);
		ranges.ranges = markRangesFromPkRange(part->index, key_condition);

		if (ranges.ranges.empty())
			continue;

		parts_with_ranges.push_back(ranges);

		sum_ranges += ranges.ranges.size();
		for (const auto & range : ranges.ranges)
			sum_marks += range.end - range.begin;
	}

	LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, "
			  << sum_marks << " marks to read from " << sum_ranges << " ranges");

	BlockInputStreams res;

	if (select.final)
	{
		/// Add the columns needed to calculate the primary key and the sign.
		std::vector<String> add_columns = data.getPrimaryExpression()->getRequiredColumns();
		column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
		column_names_to_read.push_back(data.sign_column);
		sort_and_deduplicate(column_names_to_read);

		res = spreadMarkRangesAmongThreadsFinal(
			parts_with_ranges,
			threads,
			column_names_to_read,
			max_block_size,
			settings.use_uncompressed_cache,
			prewhere_actions,
			prewhere_column,
			virt_column_names);
	}
	else
	{
		res = spreadMarkRangesAmongThreads(
			parts_with_ranges,
			threads,
			column_names_to_read,
			max_block_size,
			settings.use_uncompressed_cache,
			prewhere_actions,
			prewhere_column,
			virt_column_names);
	}

	/// With sampling enabled, every stream is additionally filtered by the sampling condition.
	if (relative_sample_size != 0)
		for (auto & stream : res)
			stream = new FilterBlockInputStream(new ExpressionBlockInputStream(stream, filter_expression), filter_function->getColumnName());

	return res;
}

/** Spreads the mark ranges of the given parts among at most `threads` input streams,
  * so that every stream gets roughly the same number of marks.
  *
  * parts                  - parts with the mark ranges to read (taken by value and consumed).
  * threads                - maximum number of resulting streams; 0 is treated as 1.
  * column_names           - columns to read from the parts.
  * max_block_size, use_uncompressed_cache, prewhere_actions, prewhere_column
  *                        - forwarded to MergeTreeBlockInputStream; the uncompressed cache is switched off
  *                          when the total number of marks exceeds max_marks_to_use_cache.
  * virt_columns           - requested virtual columns (`_part`, `_part_index`), added as constant columns.
  *
  * Returns no streams when there are no marks to read.
  * Throws Exception (LOGICAL_ERROR) if the marks could not be distributed.
  */
BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
	RangesInDataParts parts,
	size_t threads,
	const Names & column_names,
	size_t max_block_size,
	bool use_uncompressed_cache,
	ExpressionActionsPtr prewhere_actions,
	const String & prewhere_column,
	const Names & virt_columns)
{
	/// Shuffle the parts, just in case.
	std::random_shuffle(parts.begin(), parts.end());

	/// Count the marks in each part.
	std::vector<size_t> sum_marks_in_parts(parts.size());
	size_t sum_marks = 0;
	for (size_t i = 0; i < parts.size(); ++i)
	{
		/// Let the ranges be listed from right to left, so that the leftmost range can be dropped with pop_back().
		std::reverse(parts[i].ranges.begin(), parts[i].ranges.end());

		sum_marks_in_parts[i] = 0;
		for (const auto & range : parts[i].ranges)
			sum_marks_in_parts[i] += range.end - range.begin;

		sum_marks += sum_marks_in_parts[i];
	}

	if (sum_marks > max_marks_to_use_cache)
		use_uncompressed_cache = false;

	BlockInputStreams res;

	if (sum_marks > 0)
	{
		/// Guard against division by zero: zero threads is handled as a single thread.
		if (threads == 0)
			threads = 1;

		size_t min_marks_per_thread = (sum_marks - 1) / threads + 1;

		for (size_t i = 0; i < threads && !parts.empty(); ++i)
		{
			size_t need_marks = min_marks_per_thread;
			BlockInputStreams streams;

			/// Loop over the parts.
			while (need_marks > 0 && !parts.empty())
			{
				RangesInDataPart & part = parts.back();
				size_t & marks_in_part = sum_marks_in_parts.back();

				/// `part` becomes a dangling reference once the part is popped from `parts` below,
				/// so everything that is needed after that point is copied here.
				const auto data_part = part.data_part;
				const auto part_index_in_query = part.part_index_in_query;

				/// Do not take too few rows from a part.
				if (marks_in_part >= min_marks_for_concurrent_read &&
					need_marks < min_marks_for_concurrent_read)
					need_marks = min_marks_for_concurrent_read;

				/// Do not leave too few rows in a part.
				if (marks_in_part > need_marks &&
					marks_in_part - need_marks < min_marks_for_concurrent_read)
					need_marks = marks_in_part;

				MarkRanges ranges_to_get_from_part;

				/// Take the whole part if it is small enough.
				if (marks_in_part <= need_marks)
				{
					/// Restore the order of the ranges.
					std::reverse(part.ranges.begin(), part.ranges.end());

					ranges_to_get_from_part = part.ranges;

					need_marks -= marks_in_part;

					/// After these two calls neither `part` nor `marks_in_part` may be used.
					parts.pop_back();
					sum_marks_in_parts.pop_back();
				}
				else
				{
					/// Loop over the ranges of the part.
					while (need_marks > 0)
					{
						if (part.ranges.empty())
							throw Exception("Unexpected end of ranges while spreading marks among threads", ErrorCodes::LOGICAL_ERROR);

						MarkRange & range = part.ranges.back();
						size_t marks_in_range = range.end - range.begin;

						size_t marks_to_get_from_range = std::min(marks_in_range, need_marks);
						ranges_to_get_from_part.push_back(MarkRange(range.begin, range.begin + marks_to_get_from_range));
						range.begin += marks_to_get_from_range;
						marks_in_part -= marks_to_get_from_range;
						need_marks -= marks_to_get_from_range;
						if (range.begin == range.end)
							part.ranges.pop_back();
					}
				}

				streams.push_back(new MergeTreeBlockInputStream(
					data.getFullPath() + data_part->name + '/', max_block_size, column_names, data,
					data_part, ranges_to_get_from_part, use_uncompressed_cache,
					prewhere_actions, prewhere_column));

				/// Wrap the stream so that it also produces the requested virtual columns.
				for (const String & virt_column : virt_columns)
				{
					if (virt_column == "_part")
						streams.back() = new AddingConstColumnBlockInputStream<String>(
							streams.back(), new DataTypeString, data_part->name, "_part");
					else if (virt_column == "_part_index")
						streams.back() = new AddingConstColumnBlockInputStream<UInt64>(
							streams.back(), new DataTypeUInt64, part_index_in_query, "_part_index");
				}
			}

			/// Several pieces for one thread are read one after another.
			if (streams.size() == 1)
				res.push_back(streams[0]);
			else
				res.push_back(new ConcatBlockInputStream(streams));
		}

		if (!parts.empty())
			throw Exception("Couldn't spread marks among threads", ErrorCodes::LOGICAL_ERROR);
	}

	return res;
}

/** Creates the streams for a query with FINAL.
  *
  * One source stream is created per part (with the requested virtual columns added and the primary
  * expression applied). A single source stream is filtered by the positive-sign condition;
  * several source streams are merged by CollapsingFinalBlockInputStream. The result therefore
  * contains at most one stream, and is empty when `parts` is empty.
  *
  * `threads` is not used by this function. The uncompressed cache is switched off when the total
  * number of marks exceeds max_marks_to_use_cache. The remaining arguments are forwarded
  * to MergeTreeBlockInputStream.
  */
BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal(
	RangesInDataParts parts,
	size_t threads,
	const Names & column_names,
	size_t max_block_size,
	bool use_uncompressed_cache,
	ExpressionActionsPtr prewhere_actions,
	const String & prewhere_column,
	const Names & virt_columns)
{
	size_t sum_marks = 0;
	for (const auto & part : parts)
		for (const auto & range : part.ranges)
			sum_marks += range.end - range.begin;

	if (sum_marks > max_marks_to_use_cache)
		use_uncompressed_cache = false;

	ExpressionActionsPtr sign_filter_expression;
	String sign_filter_column;
	createPositiveSignCondition(sign_filter_expression, sign_filter_column);

	BlockInputStreams to_collapse;

	for (auto & part : parts)
	{
		BlockInputStreamPtr source_stream = new MergeTreeBlockInputStream(
			data.getFullPath() + part.data_part->name + '/', max_block_size, column_names, data,
			part.data_part, part.ranges, use_uncompressed_cache,
			prewhere_actions, prewhere_column);

		/// Wrap the stream so that it also produces the requested virtual columns.
		for (const String & virt_column : virt_columns)
		{
			if (virt_column == "_part")
				source_stream = new AddingConstColumnBlockInputStream<String>(
					source_stream, new DataTypeString, part.data_part->name, "_part");
			else if (virt_column == "_part_index")
				source_stream = new AddingConstColumnBlockInputStream<UInt64>(
					source_stream, new DataTypeUInt64, part.part_index_in_query, "_part_index");
		}

		to_collapse.push_back(new ExpressionBlockInputStream(source_stream, data.getPrimaryExpression()));
	}

	BlockInputStreams res;
	if (to_collapse.size() == 1)
		res.push_back(new FilterBlockInputStream(new ExpressionBlockInputStream(to_collapse[0], sign_filter_expression), sign_filter_column));
	else if (to_collapse.size() > 1)
		res.push_back(new CollapsingFinalBlockInputStream(to_collapse, data.getSortDescription(), data.sign_column));

	return res;
}

449
/** Builds the expression `equals(<sign column>, 1)`.
  *
  * out_expression - receives the actions that calculate the expression.
  * out_column     - receives the name of the resulting column.
  */
void MergeTreeDataSelectExecutor::createPositiveSignCondition(ExpressionActionsPtr & out_expression, String & out_column)
{
	/// Every node is handed over to an ASTPtr right after creation;
	/// the raw pointers are kept only to fill in the fields of the concrete types.
	ASTFunction * function = new ASTFunction;
	ASTPtr function_ptr = function;

	ASTExpressionList * arguments = new ASTExpressionList;
	ASTPtr arguments_ptr = arguments;

	ASTIdentifier * sign = new ASTIdentifier;
	ASTPtr sign_ptr = sign;

	ASTLiteral * one = new ASTLiteral;
	ASTPtr one_ptr = one;

	function->name = "equals";
	function->arguments = arguments_ptr;
	function->children.push_back(arguments_ptr);

	arguments->children.push_back(sign_ptr);
	arguments->children.push_back(one_ptr);

	sign->name = data.sign_column;
	sign->kind = ASTIdentifier::Column;

	one->type = new DataTypeInt8;
	one->value = Field(static_cast<Int64>(1));

	out_expression = ExpressionAnalyzer(function_ptr, data.context, data.getColumnsList()).getActions(false);
	out_column = function->getColumnName();
}

/** Calculates a set of mark ranges outside of which there can be no keys satisfying the condition.
  *
  * index         - the index of a part: key_size values per mark, where key_size is the size of the sort description.
  * key_condition - the condition on the primary key.
  *
  * If the condition is always true, a single range covering all marks is returned.
  * Otherwise the result contains the suspicious ranges from left to right; ranges that are
  * at most min_marks_for_seek marks apart are glued together.
  * An empty index with a non-trivial condition yields an empty result.
  */
MarkRanges MergeTreeDataSelectExecutor::markRangesFromPkRange(const MergeTreeData::DataPart::Index & index, PKCondition & key_condition)
{
	MarkRanges res;

	size_t key_size = data.getSortDescription().size();
	size_t marks_count = index.size() / key_size;

	/// If the index is not used.
	if (key_condition.alwaysTrue())
	{
		res.push_back(MarkRange(0, marks_count));
	}
	else if (marks_count != 0)
	{
		/// The check for marks_count above is required: with no marks the loop below would read
		/// index[0] of an empty index and could keep pushing the empty range (0, 0) forever.

		/** The stack always contains non-intersecting suspicious ranges, the leftmost one on top (back).
			* At each step take the left range and check whether it fits.
			* If it fits, split it into smaller ones and put them on the stack. If not, throw it away.
			* If the range is already one mark long, add it to the answer and throw it away.
			*/
		std::vector<MarkRange> ranges_stack;
		ranges_stack.push_back(MarkRange(0, marks_count));
		while (!ranges_stack.empty())
		{
			MarkRange range = ranges_stack.back();
			ranges_stack.pop_back();

			/// The last range has no right boundary in the index, so only its left boundary is checked.
			bool may_be_true;
			if (range.end == marks_count)
				may_be_true = key_condition.mayBeTrueAfter(&index[range.begin * key_size]);
			else
				may_be_true = key_condition.mayBeTrueInRange(&index[range.begin * key_size], &index[range.end * key_size]);

			if (!may_be_true)
				continue;

			if (range.end == range.begin + 1)
			{
				/// Found a useful gap between adjacent marks. Either add it to the last range or start a new range.
				if (res.empty() || range.begin - res.back().end > min_marks_for_seek)
					res.push_back(range);
				else
					res.back().end = range.end;
			}
			else
			{
				/// Split the range and put the result on the stack from right to left.
				size_t step = (range.end - range.begin - 1) / data.settings.coarse_index_granularity + 1;
				size_t end;

				for (end = range.end; end > range.begin + step; end -= step)
					ranges_stack.push_back(MarkRange(end - step, end));

				ranges_stack.push_back(MarkRange(range.begin, end));
			}
		}
	}

	return res;
}

}