MergeTreeData.cpp 27.1 KB
Newer Older
M
Merge  
Michael Kolupaev 已提交
1
#include <DB/Storages/MergeTree/MergeTreeData.h>
M
Merge  
Michael Kolupaev 已提交
2
#include <Yandex/time2str.h>
M
Merge  
Michael Kolupaev 已提交
3
#include <Poco/Ext/ScopedTry.h>
M
Merge  
Michael Kolupaev 已提交
4
#include <DB/Interpreters/ExpressionAnalyzer.h>
M
Merge  
Michael Kolupaev 已提交
5
#include <DB/Storages/MergeTree/MergeTreeReader.h>
M
Merge  
Michael Kolupaev 已提交
6 7
#include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
#include <DB/Storages/MergeTree/MergedBlockOutputStream.h>
M
Merge  
Michael Kolupaev 已提交
8 9 10
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/DataStreams/ExpressionBlockInputStream.h>
M
Merge  
Michael Kolupaev 已提交
11
#include <DB/DataStreams/copyData.h>
M
Merge  
Michael Kolupaev 已提交
12 13 14 15 16 17 18



namespace DB
{

MergeTreeData::MergeTreeData(
	const String & full_path_, NamesAndTypesListPtr columns_,
	const Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_, const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	Mode mode_,
	const String & sign_column_,
	const MergeTreeSettings & settings_,
	const String & log_name_)
	: context(context_),
	date_column_name(date_column_name_), sampling_expression(sampling_expression_),
	index_granularity(index_granularity_),
	mode(mode_), sign_column(sign_column_),
	settings(settings_), primary_expr_ast(primary_expr_ast_->clone()),
	full_path(full_path_), columns(columns_), log_name(log_name_),
	log(&Logger::get(log_name + " (Data)"))
{
	/// Make sure the table directory exists (created together with any missing parents).
	Poco::File table_dir(full_path);
	table_dir.createDirectories();

	/// Sort description: one entry with direction 1 per element of the primary key expression,
	/// keyed by that element's column name.
	const auto & key_elements = primary_expr_ast->children;
	sort_descr.reserve(key_elements.size());
	for (const ASTPtr & key_element : key_elements)
		sort_descr.push_back(SortColumnDescription(key_element->getColumnName(), 1));

	/// Actions computing the primary key, plus the sample block of the projected (getActions(true)) variant.
	/// Two independent analyzers are used, exactly as before.
	primary_expr = ExpressionAnalyzer(primary_expr_ast, context, *columns).getActions(false);
	primary_key_sample = ExpressionAnalyzer(primary_expr_ast, context, *columns).getActions(true)->getSampleBlock();

	loadDataParts();
}
M
Merge  
Michael Kolupaev 已提交
54

M
Merge  
Michael Kolupaev 已提交
55 56
UInt64 MergeTreeData::getMaxDataPartIndex()
{
	/// Largest `right` block number among the parts in data_parts; 0 when the set is empty.
	/// NOTE(review): data_parts is read without data_parts_mutex here — presumably callers guarantee
	/// exclusive access (e.g. during construction); confirm before calling from other contexts.
	UInt64 largest_right = 0;
	for (const auto & part : data_parts)
	{
		if (part->right > largest_right)
			largest_right = part->right;
	}
	return largest_right;
}

M
Merge  
Michael Kolupaev 已提交
65
std::string MergeTreeData::getModePrefix() const
M
Merge  
Michael Kolupaev 已提交
66
{
M
Merge  
Michael Kolupaev 已提交
67
	switch (mode)
M
Merge  
Michael Kolupaev 已提交
68
	{
M
Merge  
Michael Kolupaev 已提交
69 70 71
		case Ordinary: 		return "";
		case Collapsing: 	return "Collapsing";
		case Summing: 		return "Summing";
M
Merge  
Michael Kolupaev 已提交
72

M
Merge  
Michael Kolupaev 已提交
73 74
		default:
			throw Exception("Unknown mode of operation for MergeTreeData: " + toString(mode), ErrorCodes::LOGICAL_ERROR);
M
Merge  
Michael Kolupaev 已提交
75 76 77 78 79 80 81 82 83 84 85 86 87
	}
}


void MergeTreeData::loadDataParts()
{
	LOG_DEBUG(log, "Loading data parts");

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	data_parts.clear();

M
Merge  
Michael Kolupaev 已提交
88
	Strings all_file_names;
M
Merge  
Michael Kolupaev 已提交
89 90
	Poco::DirectoryIterator end;
	for (Poco::DirectoryIterator it(full_path); it != end; ++it)
M
Merge  
Michael Kolupaev 已提交
91
		all_file_names.push_back(it.name());
M
Merge  
Michael Kolupaev 已提交
92

M
Merge  
Michael Kolupaev 已提交
93 94 95
	Strings part_file_names;
	for (const String & file_name : all_file_names)
	{
M
Merge  
Michael Kolupaev 已提交
96 97 98 99 100
		/// Удаляем временные директории старше суток.
		if (0 == file_name.compare(0, strlen("tmp_"), "tmp_"))
			continue;

		if (0 == file_name.compare(0, strlen("old_"), "old_"))
M
Merge  
Michael Kolupaev 已提交
101 102 103 104 105 106
		{
			String new_file_name = file_name.substr(strlen("old_"));
			LOG_WARNING(log, "Renaming " << file_name << " to " << new_file_name << " for compatibility reasons");
			Poco::File(full_path + file_name).renameTo(full_path + new_file_name);
			part_file_names.push_back(new_file_name);
		}
M
Merge  
Michael Kolupaev 已提交
107
		else
M
Merge  
Michael Kolupaev 已提交
108
		{
M
Merge  
Michael Kolupaev 已提交
109
			part_file_names.push_back(file_name);
M
Merge  
Michael Kolupaev 已提交
110
		}
M
Merge  
Michael Kolupaev 已提交
111 112
	}

M
Merge  
Michael Kolupaev 已提交
113 114
	DataPartsVector broken_parts_to_remove;

M
Merge  
Michael Kolupaev 已提交
115
	Poco::RegularExpression::MatchVec matches;
M
Merge  
Michael Kolupaev 已提交
116
	for (const String & file_name : part_file_names)
M
Merge  
Michael Kolupaev 已提交
117
	{
118
		if (!ActiveDataPartSet::isPartDirectory(file_name, matches))
M
Merge  
Michael Kolupaev 已提交
119 120
			continue;

M
Merge  
Michael Kolupaev 已提交
121
		MutableDataPartPtr part = std::make_shared<DataPart>(*this);
122
		ActiveDataPartSet::parsePartName(file_name, *part, &matches);
M
Merge  
Michael Kolupaev 已提交
123 124
		part->name = file_name;

M
Merge  
Michael Kolupaev 已提交
125 126 127 128 129 130 131 132 133 134 135 136 137 138
		bool broken = false;

		try
		{
			part->loadIndex();
			part->loadChecksums();
			part->checkNotBroken();
		}
		catch (...)
		{
			broken = true;
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}

M
Merge  
Michael Kolupaev 已提交
139
		/// Игнорируем и, возможно, удаляем битые куски, которые могут образовываться после грубого перезапуска сервера.
M
Merge  
Michael Kolupaev 已提交
140
		if (broken)
M
Merge  
Michael Kolupaev 已提交
141 142 143 144
		{
			if (part->level == 0)
			{
				/// Восстановить куски нулевого уровня невозможно.
M
Merge  
Michael Kolupaev 已提交
145
				LOG_ERROR(log, "Removing broken part " << full_path + file_name << " because is't impossible to repair.");
M
Merge  
Michael Kolupaev 已提交
146
				broken_parts_to_remove.push_back(part);
M
Merge  
Michael Kolupaev 已提交
147 148 149
			}
			else
			{
M
Merge  
Michael Kolupaev 已提交
150 151 152 153 154 155 156 157 158 159
				/// Посмотрим, сколько кусков покрыты битым. Если хотя бы два, предполагаем, что битый кусок образован их
				///  слиянием, и мы ничего не потеряем, если его удалим.
				int contained_parts = 0;

				LOG_ERROR(log, "Part " << full_path + file_name << " is broken. Looking for parts to replace it.");

				for (const String & contained_name : part_file_names)
				{
					if (contained_name == file_name)
						continue;
160
					if (!ActiveDataPartSet::isPartDirectory(contained_name, matches))
M
Merge  
Michael Kolupaev 已提交
161 162
						continue;
					DataPart contained_part(*this);
163
					ActiveDataPartSet::parsePartName(contained_name, contained_part, &matches);
M
Merge  
Michael Kolupaev 已提交
164 165 166 167 168 169 170 171 172 173
					if (part->contains(contained_part))
					{
						LOG_ERROR(log, "Found part " << full_path + contained_name);
						++contained_parts;
					}
				}

				if (contained_parts >= 2)
				{
					LOG_ERROR(log, "Removing broken part " << full_path + file_name << " because it covers at least 2 other parts");
M
Merge  
Michael Kolupaev 已提交
174
					broken_parts_to_remove.push_back(part);
M
Merge  
Michael Kolupaev 已提交
175 176 177 178 179 180
				}
				else
				{
					LOG_ERROR(log, "Not removing broken part " << full_path + file_name
						<< " because it covers less than 2 parts. You need to resolve this manually");
				}
M
Merge  
Michael Kolupaev 已提交
181 182 183 184 185 186 187 188 189 190
			}

			continue;
		}

		part->modification_time = Poco::File(full_path + file_name).getLastModified().epochTime();

		data_parts.insert(part);
	}

M
Merge  
Michael Kolupaev 已提交
191 192 193 194 195 196 197
	if (broken_parts_to_remove.size() > 2)
		throw Exception("Suspiciously many (" + toString(broken_parts_to_remove.size()) + ") broken parts to remove.",
			ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);

	for (const auto & part : broken_parts_to_remove)
		part->remove();

M
Merge  
Michael Kolupaev 已提交
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
	all_data_parts = data_parts;

	/** Удаляем из набора актуальных кусков куски, которые содержатся в другом куске (которые были склеены),
	  *  но по каким-то причинам остались лежать в файловой системе.
	  * Удаление файлов будет произведено потом в методе clearOldParts.
	  */

	if (data_parts.size() >= 2)
	{
		DataParts::iterator prev_jt = data_parts.begin();
		DataParts::iterator curr_jt = prev_jt;
		++curr_jt;
		while (curr_jt != data_parts.end())
		{
			/// Куски данных за разные месяцы рассматривать не будем
			if ((*curr_jt)->left_month != (*curr_jt)->right_month
				|| (*curr_jt)->right_month != (*prev_jt)->left_month
				|| (*prev_jt)->left_month != (*prev_jt)->right_month)
			{
				++prev_jt;
				++curr_jt;
				continue;
			}

			if ((*curr_jt)->contains(**prev_jt))
			{
M
Merge  
Michael Kolupaev 已提交
224
				(*prev_jt)->remove_time = time(0);
M
Merge  
Michael Kolupaev 已提交
225 226 227 228 229 230
				data_parts.erase(prev_jt);
				prev_jt = curr_jt;
				++curr_jt;
			}
			else if ((*prev_jt)->contains(**curr_jt))
			{
M
Merge  
Michael Kolupaev 已提交
231
				(*curr_jt)->remove_time = time(0);
M
Merge  
Michael Kolupaev 已提交
232 233 234 235 236 237 238 239 240 241 242 243 244 245
				data_parts.erase(curr_jt++);
			}
			else
			{
				++prev_jt;
				++curr_jt;
			}
		}
	}

	LOG_DEBUG(log, "Loaded data parts (" << data_parts.size() << " items)");
}


M
Merge  
Michael Kolupaev 已提交
246
/** Deletes, from disk and from all_data_parts, parts that nobody else references and whose
  * remove_time is older than settings.old_parts_lifetime. Also removes "tmp_*" directories last
  * modified more than a day ago.
  * Returns the names of the parts that were removed.
  * Does nothing (returns an empty list) if all_data_parts_mutex is currently held by someone else.
  */
Strings MergeTreeData::clearOldParts()
{
	Poco::ScopedTry<Poco::FastMutex> lock;
	Strings res;

	/// If the method is already running in another thread (or all_data_parts is being modified right now), do nothing.
	if (!lock.lock(&all_data_parts_mutex))
		return res;

	/// A single timestamp for the whole pass: used by both the part-lifetime and the temporary-directory checks.
	const time_t now = time(0);

	for (DataParts::iterator it = all_data_parts.begin(); it != all_data_parts.end();)
	{
		/// all_data_parts itself holds one reference, so 1 means no other holder (data_parts included).
		int ref_count = it->use_count();
		if (ref_count == 1 && /// After this point ref_count cannot increase.
			(*it)->remove_time + settings.old_parts_lifetime < now)
		{
			LOG_DEBUG(log, "Removing part " << (*it)->name);

			res.push_back((*it)->name);
			(*it)->remove();
			all_data_parts.erase(it++);
		}
		else
			++it;
	}

	/// Remove temporary directories older than one day.
	const time_t max_tmp_dir_age = 86400;

	Strings all_file_names;
	Poco::DirectoryIterator end;
	for (Poco::DirectoryIterator it(full_path); it != end; ++it)
		all_file_names.push_back(it.name());

	for (const String & file_name : all_file_names)
	{
		if (0 != file_name.compare(0, strlen("tmp_"), "tmp_"))
			continue;

		/// Best effort: the directory may disappear between listing and inspection (presumably a temporary
		/// part being renamed into place concurrently), in which case Poco::File throws. Log and continue,
		/// so that the names of the parts already deleted above are still returned to the caller.
		try
		{
			Poco::File tmp_dir(full_path + file_name);
			if (tmp_dir.isDirectory() && tmp_dir.getLastModified().epochTime() + max_tmp_dir_age < now)
			{
				LOG_WARNING(log, "Removing temporary directory " << full_path << file_name);
				tmp_dir.remove(true);
			}
		}
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}
	}

	return res;
}

M
Merge  
Michael Kolupaev 已提交
299
void MergeTreeData::setPath(const String & new_full_path)
M
Merge  
Michael Kolupaev 已提交
300
{
301
	Poco::File(full_path).renameTo(new_full_path);
M
Merge  
Michael Kolupaev 已提交
302
	full_path = new_full_path;
303

304
	context.resetCaches();
M
Merge  
Michael Kolupaev 已提交
305 306
}

M
Merge  
Michael Kolupaev 已提交
307
void MergeTreeData::dropAllData()
M
Merge  
Michael Kolupaev 已提交
308 309 310 311
{
	data_parts.clear();
	all_data_parts.clear();

312
	context.resetCaches();
313

M
Merge  
Michael Kolupaev 已提交
314 315 316
	Poco::File(full_path).remove(true);
}

317
/** Deletes the files of column `column_name` from every part directory under full_path.
  * If the column is a nested subcolumn ("n.x") and remove_array_size_files is set, the files of the
  * nested structure itself ("n", e.g. its array size files) are deleted too.
  */
void MergeTreeData::removeColumnFiles(String column_name, bool remove_array_size_files)
{
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	/// Column files are named after escapeForFileName(<column name>) (see prepare/commitAlterModify), so the
	/// pattern must be built from the same escaping rather than by hand-escaping only '.' as "%2E".
	/// escapeForFileName is assumed to emit only word characters and %XX escapes, so the result is also
	/// safe to embed into a regular expression — NOTE(review): confirm against its definition.
	String column_pattern = escapeForFileName(column_name);

	size_t dot_pos = column_name.find('.');
	if (dot_pos != std::string::npos && remove_array_size_files)
	{
		String nested_column = escapeForFileName(column_name.substr(0, dot_pos));
		column_pattern = "(?:" + nested_column + "|" + column_pattern + ")";
	}

	/// Regex selecting the column's files to delete.
	Poco::RegularExpression re(column_pattern + "(?:(?:\\.|\\%2E).+){0,1}" +"(?:\\.mrk|\\.bin|\\.size\\d+\\.bin|\\.size\\d+\\.mrk)");

	/// Loop over all part directories.
	Poco::RegularExpression::MatchVec matches;
	Poco::DirectoryIterator end;
	for (Poco::DirectoryIterator it_dir = Poco::DirectoryIterator(full_path); it_dir != end; ++it_dir)
	{
		std::string dir_name = it_dir.name();

		if (!ActiveDataPartSet::isPartDirectory(dir_name, matches))
			continue;

		/// Loop over every file in the part directory.
		String full_dir_name = full_path + dir_name + "/";
		for (Poco::DirectoryIterator it_file(full_dir_name); it_file != end; ++it_file)
		{
			if (re.match(it_file.name()))
			{
				Poco::File file(full_dir_name + it_file.name());
				if (file.exists())
					file.remove();
			}
		}
	}
}

void MergeTreeData::createConvertExpression(const String & in_column_name, const String & out_type, ExpressionActionsPtr & out_expression, String & out_column)
{
M
Merge  
Michael Kolupaev 已提交
360 361 362
	Names out_names;
	out_expression = new ExpressionActions(
		NamesAndTypesList(1, NameAndTypePair(in_column_name, getDataTypeByName(in_column_name))), context.getSettingsRef());
M
Merge  
Michael Kolupaev 已提交
363

M
Merge  
Michael Kolupaev 已提交
364 365 366
	FunctionPtr function = context.getFunctionFactory().get("to" + out_type, context);
	out_expression->add(ExpressionActions::Action::applyFunction(function, Names(1, in_column_name)), out_names);
	out_expression->add(ExpressionActions::Action::removeColumn(in_column_name));
M
Merge  
Michael Kolupaev 已提交
367

M
Merge  
Michael Kolupaev 已提交
368
	out_column = out_names[0];
M
Merge  
Michael Kolupaev 已提交
369 370
}

M
Merge  
Michael Kolupaev 已提交
371 372 373 374 375 376 377 378 379 380
/// Returns the type of column `name` from `columns`; throws NO_SUCH_COLUMN_IN_TABLE if it is absent.
static DataTypePtr getDataTypeByName(const String & name, const NamesAndTypesList & columns)
{
	auto found = std::find_if(columns.begin(), columns.end(),
		[&name](const NameAndTypePair & column) { return column.first == name; });

	if (found == columns.end())
		throw Exception("No column " + name + " in table", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);

	return found->second;
}

381
/// одинаковыми считаются имена, вида "name.*"
M
Merge  
Michael Kolupaev 已提交
382
static bool namesWithDotEqual(const String & name_with_dot, const NameAndTypePair & name_type)
383 384 385 386
{
	return (name_with_dot == name_type.first.substr(0, name_with_dot.length()));
}

M
Merge  
Michael Kolupaev 已提交
387 388 389
/** Applies an ALTER to the column list (under both part-set mutexes). For DROP, also deletes the
  * dropped column's files from all parts and resets the context caches.
  */
void MergeTreeData::alter(const ASTAlterQuery::Parameters & params)
{
	{
		Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
		Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
		alterColumns(params, columns, context);
	}

	if (params.type == ASTAlterQuery::DROP)
	{
		String column_name = dynamic_cast<const ASTIdentifier &>(*params.column).name;

		/// If no columns of the form nested_name.* remain, also delete the array size files.
		bool remove_array_size_files = false;
		size_t dot_pos = column_name.find('.');
		if (dot_pos != std::string::npos)
		{
			/// The prefix must include the dot: "n." matches a remaining "n.b" but not an unrelated column "nx".
			const String nested_prefix = column_name.substr(0, dot_pos + 1);
			remove_array_size_files = (columns->end() == std::find_if(columns->begin(), columns->end(),
				boost::bind(namesWithDotEqual, nested_prefix, _1)));
		}
		removeColumnFiles(column_name, remove_array_size_files);

		context.resetCaches();
	}
}
M
Merge  
Michael Kolupaev 已提交
411

M
Merge  
Michael Kolupaev 已提交
412 413 414 415 416 417 418
/** First phase of ALTER MODIFY column type. For every active part, reads the column through the
  * conversion expression and writes the converted data into the part directory as a separate column
  * (named after the expression's output column). If the part has checksums, it also writes the
  * updated checksums into "<escaped out column>.checksums.txt". commitAlterModify later swaps these files in.
  * Nested and array types (old or new) are rejected.
  */
void MergeTreeData::prepareAlterModify(const ASTAlterQuery::Parameters & params)
{
	/// Snapshot of the active parts; the conversion below runs without holding data_parts_mutex.
	DataPartsVector parts;
	{
		Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
		parts = DataPartsVector(data_parts.begin(), data_parts.end());
	}

	Names column_names;
	const ASTNameTypePair & name_type = dynamic_cast<const ASTNameTypePair &>(*params.name_type);
	StringRange type_range = name_type.type->range;
	String type(type_range.first, type_range.second - type_range.first);
	DataTypePtr old_type_ptr = DB::getDataTypeByName(name_type.name, *columns);
	DataTypePtr new_type_ptr = context.getDataTypeFactory().get(type);
	if (dynamic_cast<DataTypeNested *>(old_type_ptr.get()) || dynamic_cast<DataTypeArray *>(old_type_ptr.get()) ||
		dynamic_cast<DataTypeNested *>(new_type_ptr.get()) || dynamic_cast<DataTypeArray *>(new_type_ptr.get()))
		throw Exception("ALTER MODIFY not supported for nested and array types");

	/// Only the modified column is read from each part.
	column_names.push_back(name_type.name);
	ExpressionActionsPtr expr;
	String out_column;
	createConvertExpression(name_type.name, type, expr, out_column);

	for (DataPartPtr & part : parts)
	{
		const String part_path = full_path + part->name + '/';

		MarkRanges ranges(1, MarkRange(0, part->size));
		ExpressionBlockInputStream in(new MergeTreeBlockInputStream(part_path,
			DEFAULT_MERGE_BLOCK_SIZE, column_names, *this, part, ranges, false, nullptr, ""), expr);
		MergedColumnOnlyOutputStream out(*this, part_path, true);
		in.readPrefix();
		out.writePrefix();

		try
		{
			while (Block b = in.read())
				out.write(b);

			in.readSuffix();
			DataPart::Checksums add_checksums = out.writeSuffixAndGetChecksums();

			/// Write the updated checksums into a temporary file.
			if (!part->checksums.empty())
			{
				DataPart::Checksums new_checksums = part->checksums;
				std::string escaped_name = escapeForFileName(name_type.name);
				std::string escaped_out_column = escapeForFileName(out_column);
				new_checksums.files[escaped_name  + ".bin"] = add_checksums.files[escaped_out_column + ".bin"];
				new_checksums.files[escaped_name  + ".mrk"] = add_checksums.files[escaped_out_column + ".mrk"];

				WriteBufferFromFile checksums_file(part_path + escaped_out_column + ".checksums.txt", 1024);
				new_checksums.writeText(checksums_file);
			}
		}
		catch (const Exception & e)
		{
			/// Presumably raised for parts that do not contain the column at all: nothing to convert there.
			if (e.code() != ErrorCodes::ALL_REQUESTED_COLUMNS_ARE_MISSING)
				throw;
		}
	}
}

/** Second phase of ALTER MODIFY. In every active part that has the column's ".bin" file:
  *  1. the old column files (and checksums.txt) are renamed with an ".old" suffix;
  *  2. the converted column files written by prepareAlterModify are renamed to the column's name,
  *     and "<escaped out column>.checksums.txt", if present, becomes checksums.txt;
  *  3. the ".old" files are deleted.
  * Then the context caches are reset and the column list is updated.
  */
void MergeTreeData::commitAlterModify(const ASTAlterQuery::Parameters & params)
{
	DataPartsVector parts;
	{
		Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
		parts = DataPartsVector(data_parts.begin(), data_parts.end());
	}

	const ASTNameTypePair & name_type = dynamic_cast<const ASTNameTypePair &>(*params.name_type);
	StringRange type_range = name_type.type->range;
	String type(type_range.first, type_range.second - type_range.first);

	/// Only out_column is needed here: it names the converted column files written by prepareAlterModify.
	ExpressionActionsPtr expr;
	String out_column;
	createConvertExpression(name_type.name, type, expr, out_column);

	const std::string escaped_name = escapeForFileName(name_type.name);
	const std::string escaped_out_column = escapeForFileName(out_column);

	/// Step 1: move the old column files aside by appending ".old".
	for (DataPartPtr & part : parts)
	{
		std::string part_path = full_path + part->name + '/';
		std::string path = part_path + escaped_name;
		if (Poco::File(path + ".bin").exists())
		{
			LOG_TRACE(log, "Renaming " << path + ".bin" << " to " << path + ".bin" + ".old");
			Poco::File(path + ".bin").renameTo(path + ".bin" + ".old");
			LOG_TRACE(log, "Renaming " << path + ".mrk" << " to " << path + ".mrk" + ".old");
			Poco::File(path + ".mrk").renameTo(path + ".mrk" + ".old");

			if (Poco::File(part_path + "checksums.txt").exists())
			{
				LOG_TRACE(log, "Renaming " << part_path + "checksums.txt" << " to " << part_path + "checksums.txt" + ".old");
				Poco::File(part_path + "checksums.txt").renameTo(part_path + "checksums.txt" + ".old");
			}
		}
	}

	/// Step 2: give the converted column files the column's name and install the updated checksums.
	for (DataPartPtr & part : parts)
	{
		std::string part_path = full_path + part->name + '/';
		std::string path = part_path + escaped_out_column;
		std::string new_path = part_path + escaped_name;
		if (Poco::File(path + ".bin").exists())
		{
			LOG_TRACE(log, "Renaming " << path + ".bin" << " to " << new_path + ".bin");
			Poco::File(path + ".bin").renameTo(new_path + ".bin");
			LOG_TRACE(log, "Renaming " << path + ".mrk" << " to " << new_path + ".mrk");
			Poco::File(path + ".mrk").renameTo(new_path + ".mrk");

			if (Poco::File(path + ".checksums.txt").exists())
			{
				/// Log the real destination (part_path + "checksums.txt"), not ".checksums.txt".
				LOG_TRACE(log, "Renaming " << path + ".checksums.txt" << " to " << part_path + "checksums.txt");
				Poco::File(path + ".checksums.txt").renameTo(part_path + "checksums.txt");
			}
		}
	}

	/// Step 3: delete the ".old" files.
	for (DataPartPtr & part : parts)
	{
		std::string part_path = full_path + part->name + '/';
		std::string path = part_path + escaped_name;
		if (Poco::File(path + ".bin" + ".old").exists())
		{
			LOG_TRACE(log, "Removing old column " << path + ".bin" + ".old");
			Poco::File(path + ".bin" + ".old").remove();
			LOG_TRACE(log, "Removing old column " << path + ".mrk" + ".old");
			Poco::File(path + ".mrk" + ".old").remove();

			if (Poco::File(part_path + "checksums.txt" + ".old").exists())
			{
				LOG_TRACE(log, "Removing old checksums " << part_path + "checksums.txt" + ".old");
				Poco::File(part_path + "checksums.txt" + ".old").remove();
			}
		}
	}

	context.resetCaches();

	{
		Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
		Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
		alterColumns(params, columns, context);
	}
}


M
Merge  
Michael Kolupaev 已提交
564
void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment)
{
	/// Delegates to renameTempPartAndReplace. A freshly added part is not expected to cover existing
	/// parts; if it does, they have still been replaced, and the situation is reported as an error.
	DataPartsVector covered = renameTempPartAndReplace(part, increment);
	if (covered.empty())
		return;

	LOG_ERROR(log, "Added part " << part->name << " covers " << toString(covered.size())
		<< " existing part(s) (including " << covered[0]->name << ")");
}

M
Merge  
Michael Kolupaev 已提交
574
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(MutableDataPartPtr part, Increment * increment)
{
	LOG_TRACE(log, "Renaming.");

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	const String old_path = getFullPath() + part->name + "/";

	/** For StorageMergeTree it matters that the part number is obtained atomically with adding the part to the set.
	  * Otherwise there is a race: a pair of parts whose number range contains a not-yet-added part could be merged.
	  */
	if (increment)
		part->left = part->right = increment->get(false);

	part->name = ActiveDataPartSet::getPartName(part->left_date, part->right_date, part->left, part->right, part->level);

	if (data_parts.count(part))
		throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);

	const String new_path = getFullPath() + part->name + "/";

	/// Rename the part directory.
	Poco::File(old_path).renameTo(new_path);

	/// Parts covered by `part` form a contiguous run in data_parts around the position where `part` would be
	/// inserted. First step left from the insertion point to find where that run begins...
	DataParts::iterator run_begin = data_parts.lower_bound(part);
	while (run_begin != data_parts.begin())
	{
		DataParts::iterator before = run_begin;
		--before;
		if (!part->contains(**before))
			break;
		run_begin = before;
	}

	/// ...then detach the whole run in one forward sweep, which yields the replaced parts in ascending order.
	DataPartsVector replaced;
	DataParts::iterator cursor = run_begin;
	while (cursor != data_parts.end() && part->contains(**cursor))
	{
		replaced.push_back(*cursor);
		(*cursor)->remove_time = time(0);
		data_parts.erase(cursor++);
	}

	data_parts.insert(part);
	all_data_parts.insert(part);

	return replaced;
}

M
Merge  
Michael Kolupaev 已提交
631
void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix)
{
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	/// The part must be known (active or not); otherwise fail before touching anything else.
	const size_t erased_count = all_data_parts.erase(part);
	if (erased_count == 0)
		throw Exception("No such data part", ErrorCodes::NO_SUCH_DATA_PART);

	/// It may or may not still be in the active set; erasing is harmless either way.
	data_parts.erase(part);
	part->renameAddPrefix(prefix);
}

M
Merge  
Michael Kolupaev 已提交
641 642 643 644 645 646 647
MergeTreeData::DataParts MergeTreeData::getDataParts()
{
	/// Returns a copy of the active part set, taken under data_parts_mutex.
	Poco::ScopedLock<Poco::FastMutex> guard(data_parts_mutex);
	DataParts snapshot = data_parts;
	return snapshot;
}

M
Merge  
Michael Kolupaev 已提交
648 649 650 651 652 653 654
MergeTreeData::DataParts MergeTreeData::getAllDataParts()
{
	/// Returns a copy of the set of all parts (active and inactive), taken under all_data_parts_mutex.
	Poco::ScopedLock<Poco::FastMutex> guard(all_data_parts_mutex);
	DataParts snapshot = all_data_parts;
	return snapshot;
}

M
Merge  
Michael Kolupaev 已提交
655
size_t MergeTreeData::getMaxPartsCountForMonth()
M
Merge  
Michael Kolupaev 已提交
656 657 658
{
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

M
Merge  
Michael Kolupaev 已提交
659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678
	size_t res = 0;
	size_t cur_count = 0;
	DayNum_t cur_month = DayNum_t(0);

	for (const auto & part : data_parts)
	{
		if (part->left_month == cur_month)
		{
			++cur_count;
		}
		else
		{
			cur_month = part->left_month;
			cur_count = 1;
		}

		res = std::max(res, cur_count);
	}

	return res;
M
Merge  
Michael Kolupaev 已提交
679 680
}

M
Merge  
Michael Kolupaev 已提交
681
MergeTreeData::DataPartPtr MergeTreeData::getContainingPart(const String & part_name, bool including_inactive)
{
	/// Probe object carrying the parsed range of part_name, used only as a lookup key.
	MutableDataPartPtr probe = std::make_shared<DataPart>(*this);
	ActiveDataPartSet::parsePartName(part_name, *probe);

	Poco::FastMutex & set_mutex = including_inactive ? all_data_parts_mutex : data_parts_mutex;
	Poco::ScopedLock<Poco::FastMutex> lock(set_mutex);

	DataParts & parts = including_inactive ? all_data_parts : data_parts;

	/// Only the neighbours of the probe's position can cover it: the element at lower_bound and the one before it.
	DataParts::iterator it = parts.lower_bound(probe);

	if (it != parts.end() && ((*it)->name == part_name || (*it)->contains(*probe)))
		return *it;

	if (it == parts.begin())
		return nullptr;

	--it;
	return (*it)->contains(*probe) ? *it : nullptr;
}

711

M
Merge  
Michael Kolupaev 已提交
712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741
void MergeTreeData::DataPart::Checksums::Checksum::checkEqual(const Checksum & rhs, bool have_uncompressed, const String & name) const
{
	/// For compressed files, compare the uncompressed size/hash when asked to; otherwise compare the on-disk size/hash.
	const bool compare_uncompressed = is_compressed && have_uncompressed;

	if (compare_uncompressed)
	{
		if (!rhs.is_compressed)
			throw Exception("No uncompressed checksum for file " + name, ErrorCodes::CHECKSUM_DOESNT_MATCH);
		if (rhs.uncompressed_size != uncompressed_size)
			throw Exception("Unexpected size of file " + name + " in data part", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
		if (rhs.uncompressed_hash != uncompressed_hash)
			throw Exception("Checksum mismatch for file " + name + " in data part", ErrorCodes::CHECKSUM_DOESNT_MATCH);
	}
	else
	{
		if (rhs.file_size != file_size)
			throw Exception("Unexpected size of file " + name + " in data part", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
		if (rhs.file_hash != file_hash)
			throw Exception("Checksum mismatch for file " + name + " in data part", ErrorCodes::CHECKSUM_DOESNT_MATCH);
	}
}

void MergeTreeData::DataPart::Checksums::Checksum::checkSize(const String & path) const
{
	Poco::File file(path);
	if (!file.exists())
		throw Exception(path + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
	size_t size = file.getSize();
	if (size != file_size)
		throw Exception(path + " has unexpected size: " + DB::toString(size) + " instead of " + DB::toString(file_size),
			ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}

void MergeTreeData::DataPart::Checksums::checkEqual(const Checksums & rhs, bool have_uncompressed) const
742
{
M
Merge  
Michael Kolupaev 已提交
743
	for (const auto & it : rhs.files)
744 745 746
	{
		const String & name = it.first;

M
Merge  
Michael Kolupaev 已提交
747
		if (!files.count(name))
748 749 750
			throw Exception("Unexpected file " + name + " in data part", ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART);
	}

M
Merge  
Michael Kolupaev 已提交
751
	for (const auto & it : files)
752 753 754
	{
		const String & name = it.first;

M
Merge  
Michael Kolupaev 已提交
755 756
		auto jt = rhs.files.find(name);
		if (jt == rhs.files.end())
757 758
			throw Exception("No file " + name + " in data part", ErrorCodes::NO_FILE_IN_DATA_PART);

M
Merge  
Michael Kolupaev 已提交
759 760 761
		it.second.checkEqual(jt->second, have_uncompressed, name);
	}
}
M
Merge  
Michael Kolupaev 已提交
762

M
Merge  
Michael Kolupaev 已提交
763 764 765 766 767 768
/// Runs Checksum::checkSize for every recorded file.
/// `path` is concatenated directly with each file name, so it presumably ends with '/' (confirm at callers).
void MergeTreeData::DataPart::Checksums::checkSizes(const String & path) const
{
	for (auto file_it = files.begin(); file_it != files.end(); ++file_it)
		file_it->second.checkSize(path + file_it->first);
}

772
/// Parses checksums from the text layout emitted by writeText, replacing the contents of `files`.
/// Returns false (with `files` left empty) when the stream declares format version 1, which is
/// accepted but not parsed here. Returns true after reading a version-2 listing.
/// Throws UNKNOWN_FORMAT for any other version; malformed input is reported by the
/// DB::assertString / DB::readText helpers.
bool MergeTreeData::DataPart::Checksums::readText(ReadBuffer & in)
{
	files.clear();

	DB::assertString("checksums format version: ", in);
	int format_version;
	DB::readText(format_version, in);

	const bool known_version = format_version >= 1 && format_version <= 2;
	if (!known_version)
		throw Exception("Bad checksums format version: " + DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT);

	/// Version 1 is recognized but not parsed; report it to the caller via the return value.
	if (format_version == 1)
		return false;

	DB::assertString("\n", in);

	size_t files_count;
	DB::readText(files_count, in);
	DB::assertString(" files:\n", in);

	for (size_t file_no = 0; file_no != files_count; ++file_no)
	{
		String file_name;
		Checksum checksum;

		DB::readString(file_name, in);

		DB::assertString("\n\tsize: ", in);
		DB::readText(checksum.file_size, in);

		DB::assertString("\n\thash: ", in);
		DB::readText(checksum.file_hash.first, in);
		DB::assertString(" ", in);
		DB::readText(checksum.file_hash.second, in);

		DB::assertString("\n\tcompressed: ", in);
		DB::readText(checksum.is_compressed, in);

		/// Only entries marked as compressed carry the extra uncompressed size/hash lines.
		if (checksum.is_compressed)
		{
			DB::assertString("\n\tuncompressed size: ", in);
			DB::readText(checksum.uncompressed_size, in);

			DB::assertString("\n\tuncompressed hash: ", in);
			DB::readText(checksum.uncompressed_hash.first, in);
			DB::assertString(" ", in);
			DB::readText(checksum.uncompressed_hash.second, in);
		}

		DB::assertString("\n", in);

		/// A repeated file name keeps the first entry (map insert does not overwrite).
		files.insert(std::make_pair(file_name, checksum));
	}

	return true;
}

/// Serializes all checksums in text format version 2, the layout readText parses:
///   "checksums format version: 2\n", then "<count> files:\n", then for each file:
///   "<name>\n\tsize: <file_size>\n\thash: <h1> <h2>\n\tcompressed: <flag>\n"
///   and, for compressed entries only,
///   "\tuncompressed size: <size>\n\tuncompressed hash: <h1> <h2>\n".
void MergeTreeData::DataPart::Checksums::writeText(WriteBuffer & out) const
{
	DB::writeString("checksums format version: 2\n", out);
	DB::writeText(files.size(), out);
	DB::writeString(" files:\n", out);

	for (auto entry = files.begin(); entry != files.end(); ++entry)
	{
		const Checksum & checksum = entry->second;

		DB::writeString(entry->first, out);

		DB::writeString("\n\tsize: ", out);
		DB::writeText(checksum.file_size, out);

		DB::writeString("\n\thash: ", out);
		DB::writeText(checksum.file_hash.first, out);
		DB::writeString(" ", out);
		DB::writeText(checksum.file_hash.second, out);

		DB::writeString("\n\tcompressed: ", out);
		DB::writeText(checksum.is_compressed, out);
		DB::writeString("\n", out);

		if (!checksum.is_compressed)
			continue;

		/// Extra section present only for compressed files.
		DB::writeString("\tuncompressed size: ", out);
		DB::writeText(checksum.uncompressed_size, out);
		DB::writeString("\n\tuncompressed hash: ", out);
		DB::writeText(checksum.uncompressed_hash.first, out);
		DB::writeString(" ", out);
		DB::writeText(checksum.uncompressed_hash.second, out);
		DB::writeString("\n", out);
	}
}

M
Merge  
Michael Kolupaev 已提交
852
}