MergeTreeData.cpp 30.7 KB
Newer Older
M
Merge  
Michael Kolupaev 已提交
1
#include <DB/Storages/MergeTree/MergeTreeData.h>
M
Merge  
Michael Kolupaev 已提交
2
#include <Yandex/time2str.h>
M
Merge  
Michael Kolupaev 已提交
3
#include <Poco/Ext/ScopedTry.h>
M
Merge  
Michael Kolupaev 已提交
4
#include <DB/Interpreters/ExpressionAnalyzer.h>
M
Merge  
Michael Kolupaev 已提交
5
#include <DB/Storages/MergeTree/MergeTreeReader.h>
M
Merge  
Michael Kolupaev 已提交
6 7
#include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
#include <DB/Storages/MergeTree/MergedBlockOutputStream.h>
M
Merge  
Michael Kolupaev 已提交
8 9 10
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/DataStreams/ExpressionBlockInputStream.h>
M
Merge  
Michael Kolupaev 已提交
11
#include <DB/DataStreams/copyData.h>
M
Merge  
Michael Kolupaev 已提交
12
#include <DB/IO/WriteBufferFromFile.h>
P
Merge  
Pavel Kartavyy 已提交
13
#include <algorithm>
M
Merge  
Michael Kolupaev 已提交
14 15 16 17 18 19 20



namespace DB
{

MergeTreeData::MergeTreeData(
	const String & full_path_, NamesAndTypesListPtr columns_,
	const Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_, const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	Mode mode_,
	const String & sign_column_,
	const MergeTreeSettings & settings_,
	const String & log_name_,
	bool require_part_metadata_,
	BrokenPartCallback broken_part_callback_)
	: context(context_),
	date_column_name(date_column_name_), sampling_expression(sampling_expression_),
	index_granularity(index_granularity_),
	mode(mode_), sign_column(sign_column_),
	settings(settings_), primary_expr_ast(primary_expr_ast_->clone()),
	require_part_metadata(require_part_metadata_),
	full_path(full_path_), columns(columns_),
	broken_part_callback(broken_part_callback_),
	log_name(log_name_), log(&Logger::get(log_name + " (Data)"))
{
	/// Make sure the table directory exists (created recursively if missing).
	Poco::File(full_path).createDirectories();

	/// The sort description has one entry per element of the primary key expression, each with direction 1.
	const auto & key_elements = primary_expr_ast->children;
	sort_descr.reserve(key_elements.size());
	for (const ASTPtr & key_element : key_elements)
		sort_descr.push_back(SortColumnDescription(key_element->getColumnName(), 1));

	/// The key AST is analyzed twice: the getActions(false) result is kept as primary_expr,
	/// and the sample block of the getActions(true) result becomes primary_key_sample.
	primary_expr = ExpressionAnalyzer(primary_expr_ast, context, *columns).getActions(false);
	primary_key_sample = ExpressionAnalyzer(primary_expr_ast, context, *columns).getActions(true)->getSampleBlock();

	/// Pick up the parts that already exist in the directory.
	loadDataParts();
}
M
Merge  
Michael Kolupaev 已提交
60

M
Merge  
Michael Kolupaev 已提交
61 62
UInt64 MergeTreeData::getMaxDataPartIndex()
{
	/// Largest `right` value over the active parts (data_parts); 0 when there are none.
	/// NOTE(review): data_parts is read without data_parts_mutex; presumably callers guarantee exclusive access — confirm.
	UInt64 result = 0;
	for (const auto & data_part : data_parts)
	{
		if (data_part->right > result)
			result = data_part->right;
	}
	return result;
}

M
Merge  
Michael Kolupaev 已提交
71
std::string MergeTreeData::getModePrefix() const
M
Merge  
Michael Kolupaev 已提交
72
{
M
Merge  
Michael Kolupaev 已提交
73
	switch (mode)
M
Merge  
Michael Kolupaev 已提交
74
	{
M
Merge  
Michael Kolupaev 已提交
75 76 77
		case Ordinary: 		return "";
		case Collapsing: 	return "Collapsing";
		case Summing: 		return "Summing";
S
Merge  
Sergey Fedorov 已提交
78
		case Aggregating: 	return "Aggregating";
M
Merge  
Michael Kolupaev 已提交
79

M
Merge  
Michael Kolupaev 已提交
80 81
		default:
			throw Exception("Unknown mode of operation for MergeTreeData: " + toString(mode), ErrorCodes::LOGICAL_ERROR);
M
Merge  
Michael Kolupaev 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94
	}
}


void MergeTreeData::loadDataParts()
{
	LOG_DEBUG(log, "Loading data parts");

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	data_parts.clear();

M
Merge  
Michael Kolupaev 已提交
95
	Strings all_file_names;
M
Merge  
Michael Kolupaev 已提交
96 97
	Poco::DirectoryIterator end;
	for (Poco::DirectoryIterator it(full_path); it != end; ++it)
M
Merge  
Michael Kolupaev 已提交
98
		all_file_names.push_back(it.name());
M
Merge  
Michael Kolupaev 已提交
99

M
Merge  
Michael Kolupaev 已提交
100 101 102
	Strings part_file_names;
	for (const String & file_name : all_file_names)
	{
M
Merge  
Michael Kolupaev 已提交
103 104 105 106 107
		/// Удаляем временные директории старше суток.
		if (0 == file_name.compare(0, strlen("tmp_"), "tmp_"))
			continue;

		if (0 == file_name.compare(0, strlen("old_"), "old_"))
M
Merge  
Michael Kolupaev 已提交
108 109 110 111 112 113
		{
			String new_file_name = file_name.substr(strlen("old_"));
			LOG_WARNING(log, "Renaming " << file_name << " to " << new_file_name << " for compatibility reasons");
			Poco::File(full_path + file_name).renameTo(full_path + new_file_name);
			part_file_names.push_back(new_file_name);
		}
M
Merge  
Michael Kolupaev 已提交
114
		else
M
Merge  
Michael Kolupaev 已提交
115
		{
M
Merge  
Michael Kolupaev 已提交
116
			part_file_names.push_back(file_name);
M
Merge  
Michael Kolupaev 已提交
117
		}
M
Merge  
Michael Kolupaev 已提交
118 119
	}

M
Merge  
Michael Kolupaev 已提交
120 121
	DataPartsVector broken_parts_to_remove;

M
Merge  
Michael Kolupaev 已提交
122
	Poco::RegularExpression::MatchVec matches;
M
Merge  
Michael Kolupaev 已提交
123
	for (const String & file_name : part_file_names)
M
Merge  
Michael Kolupaev 已提交
124
	{
125
		if (!ActiveDataPartSet::isPartDirectory(file_name, matches))
M
Merge  
Michael Kolupaev 已提交
126 127
			continue;

M
Merge  
Michael Kolupaev 已提交
128
		MutableDataPartPtr part = std::make_shared<DataPart>(*this);
129
		ActiveDataPartSet::parsePartName(file_name, *part, &matches);
M
Merge  
Michael Kolupaev 已提交
130 131
		part->name = file_name;

M
Merge  
Michael Kolupaev 已提交
132 133 134 135
		bool broken = false;

		try
		{
M
Merge  
Michael Kolupaev 已提交
136
			part->loadColumns();
M
Merge  
Michael Kolupaev 已提交
137
			part->loadChecksums();
M
Merge  
Michael Kolupaev 已提交
138
			part->loadIndex();
M
Merge  
Michael Kolupaev 已提交
139 140 141 142 143 144 145 146
			part->checkNotBroken();
		}
		catch (...)
		{
			broken = true;
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}

M
Merge  
Michael Kolupaev 已提交
147
		/// Игнорируем и, возможно, удаляем битые куски, которые могут образовываться после грубого перезапуска сервера.
M
Merge  
Michael Kolupaev 已提交
148
		if (broken)
M
Merge  
Michael Kolupaev 已提交
149 150 151 152
		{
			if (part->level == 0)
			{
				/// Восстановить куски нулевого уровня невозможно.
M
Merge  
Michael Kolupaev 已提交
153
				LOG_ERROR(log, "Removing broken part " << full_path + file_name << " because is't impossible to repair.");
M
Merge  
Michael Kolupaev 已提交
154
				broken_parts_to_remove.push_back(part);
M
Merge  
Michael Kolupaev 已提交
155 156 157
			}
			else
			{
M
Merge  
Michael Kolupaev 已提交
158 159 160 161 162 163 164 165 166 167
				/// Посмотрим, сколько кусков покрыты битым. Если хотя бы два, предполагаем, что битый кусок образован их
				///  слиянием, и мы ничего не потеряем, если его удалим.
				int contained_parts = 0;

				LOG_ERROR(log, "Part " << full_path + file_name << " is broken. Looking for parts to replace it.");

				for (const String & contained_name : part_file_names)
				{
					if (contained_name == file_name)
						continue;
168
					if (!ActiveDataPartSet::isPartDirectory(contained_name, matches))
M
Merge  
Michael Kolupaev 已提交
169 170
						continue;
					DataPart contained_part(*this);
171
					ActiveDataPartSet::parsePartName(contained_name, contained_part, &matches);
M
Merge  
Michael Kolupaev 已提交
172 173 174 175 176 177 178 179 180 181
					if (part->contains(contained_part))
					{
						LOG_ERROR(log, "Found part " << full_path + contained_name);
						++contained_parts;
					}
				}

				if (contained_parts >= 2)
				{
					LOG_ERROR(log, "Removing broken part " << full_path + file_name << " because it covers at least 2 other parts");
M
Merge  
Michael Kolupaev 已提交
182
					broken_parts_to_remove.push_back(part);
M
Merge  
Michael Kolupaev 已提交
183 184 185 186 187 188
				}
				else
				{
					LOG_ERROR(log, "Not removing broken part " << full_path + file_name
						<< " because it covers less than 2 parts. You need to resolve this manually");
				}
M
Merge  
Michael Kolupaev 已提交
189 190 191 192 193 194 195 196 197 198
			}

			continue;
		}

		part->modification_time = Poco::File(full_path + file_name).getLastModified().epochTime();

		data_parts.insert(part);
	}

M
Merge  
Michael Kolupaev 已提交
199 200 201 202 203 204 205
	if (broken_parts_to_remove.size() > 2)
		throw Exception("Suspiciously many (" + toString(broken_parts_to_remove.size()) + ") broken parts to remove.",
			ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);

	for (const auto & part : broken_parts_to_remove)
		part->remove();

M
Merge  
Michael Kolupaev 已提交
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
	all_data_parts = data_parts;

	/** Удаляем из набора актуальных кусков куски, которые содержатся в другом куске (которые были склеены),
	  *  но по каким-то причинам остались лежать в файловой системе.
	  * Удаление файлов будет произведено потом в методе clearOldParts.
	  */

	if (data_parts.size() >= 2)
	{
		DataParts::iterator prev_jt = data_parts.begin();
		DataParts::iterator curr_jt = prev_jt;
		++curr_jt;
		while (curr_jt != data_parts.end())
		{
			/// Куски данных за разные месяцы рассматривать не будем
			if ((*curr_jt)->left_month != (*curr_jt)->right_month
				|| (*curr_jt)->right_month != (*prev_jt)->left_month
				|| (*prev_jt)->left_month != (*prev_jt)->right_month)
			{
				++prev_jt;
				++curr_jt;
				continue;
			}

			if ((*curr_jt)->contains(**prev_jt))
			{
M
Merge  
Michael Kolupaev 已提交
232
				(*prev_jt)->remove_time = time(0);
M
Merge  
Michael Kolupaev 已提交
233 234 235 236 237 238
				data_parts.erase(prev_jt);
				prev_jt = curr_jt;
				++curr_jt;
			}
			else if ((*prev_jt)->contains(**curr_jt))
			{
M
Merge  
Michael Kolupaev 已提交
239
				(*curr_jt)->remove_time = time(0);
M
Merge  
Michael Kolupaev 已提交
240 241 242 243 244 245 246 247 248 249 250 251 252 253
				data_parts.erase(curr_jt++);
			}
			else
			{
				++prev_jt;
				++curr_jt;
			}
		}
	}

	LOG_DEBUG(log, "Loaded data parts (" << data_parts.size() << " items)");
}


M
Merge  
Michael Kolupaev 已提交
254
MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
{
	/// Deletes stale temporary directories, then detaches from all_data_parts (and returns) the inactive parts
	/// whose lifetime has expired. The caller is responsible for deleting the returned parts' files.
	Poco::ScopedTry<Poco::FastMutex> lock;
	DataPartsVector res;

	/// If the method is already running in another thread (or all_data_parts is being modified right now), do nothing.
	if (!lock.lock(&all_data_parts_mutex))
		return res;

	/// One timestamp for the whole pass, shared by the temporary-directory and the old-part age checks.
	const time_t now = time(0);

	/// Remove temporary directories older than one day (86400 seconds).
	Strings all_file_names;
	Poco::DirectoryIterator end;
	for (Poco::DirectoryIterator it(full_path); it != end; ++it)
		all_file_names.push_back(it.name());

	for (const String & file_name : all_file_names)
	{
		if (0 != file_name.compare(0, strlen("tmp_"), "tmp_"))
			continue;

		Poco::File tmp_dir(full_path + file_name);
		if (tmp_dir.isDirectory() && tmp_dir.getLastModified().epochTime() + 86400 < now)
		{
			LOG_WARNING(log, "Removing temporary directory " << full_path << file_name);
			tmp_dir.remove(true);
		}
	}

	for (DataParts::iterator it = all_data_parts.begin(); it != all_data_parts.end();)
	{
		/// use_count() returns long; keep it at full width rather than narrowing to int.
		const auto ref_count = it->use_count();
		if (ref_count == 1 && /// Only all_data_parts holds the part; per the original note, the count cannot grow after this.
			(*it)->remove_time < now &&
			now - (*it)->remove_time > settings.old_parts_lifetime)
		{
			res.push_back(*it);
			all_data_parts.erase(it++);
		}
		else
			++it;
	}

	return res;
}

M
Merge  
Michael Kolupaev 已提交
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319
void MergeTreeData::addOldParts(const MergeTreeData::DataPartsVector & parts)
{
	/// Inserts the given parts back into all_data_parts under all_data_parts_mutex.
	/// Presumably used to return parts previously taken by grabOldParts — verify against callers.
	Poco::ScopedLock<Poco::FastMutex> guard(all_data_parts_mutex);
	for (const auto & old_part : parts)
		all_data_parts.insert(old_part);
}

void MergeTreeData::clearOldParts()
{
	auto parts_to_remove = grabOldParts();

	for (DataPartPtr part : parts_to_remove)
	{
		LOG_DEBUG(log, "Removing part " << part->name);
		part->remove();
	}
}

M
Merge  
Michael Kolupaev 已提交
320
void MergeTreeData::setPath(const String & new_full_path, bool move_data)
M
Merge  
Michael Kolupaev 已提交
321
{
M
Merge  
Michael Kolupaev 已提交
322 323 324 325 326 327
	if (move_data)
	{
		Poco::File(full_path).renameTo(new_full_path);
		/// Если данные перемещать не нужно, значит их переместил кто-то другой. Расчитываем, что он еще и сбросил кеши.
		context.resetCaches();
	}
328

M
Merge  
Michael Kolupaev 已提交
329
	full_path = new_full_path;
M
Merge  
Michael Kolupaev 已提交
330 331
}

M
Merge  
Michael Kolupaev 已提交
332
void MergeTreeData::dropAllData()
M
Merge  
Michael Kolupaev 已提交
333 334 335 336
{
	data_parts.clear();
	all_data_parts.clear();

337
	context.resetCaches();
338

M
Merge  
Michael Kolupaev 已提交
339 340 341 342
	Poco::File(full_path).remove(true);
}


M
Merge  
Michael Kolupaev 已提交
343
void MergeTreeData::checkAlter(const AlterCommands & params)
M
Merge  
Michael Kolupaev 已提交
344
{
M
Merge  
Michael Kolupaev 已提交
345 346 347
	/// Проверим, что указанные преобразования можно совершить над списком столбцов без учета типов.
	NamesAndTypesList new_columns = *columns;
	params.apply(new_columns);
M
Merge  
Michael Kolupaev 已提交
348

M
Merge  
Michael Kolupaev 已提交
349 350 351 352 353
	/// Список столбцов, которые нельзя трогать.
	/// sampling_expression можно не учитывать, потому что он обязан содержаться в первичном ключе.
	Names keys = primary_expr->getRequiredColumns();
	keys.push_back(sign_column);
	std::sort(keys.begin(), keys.end());
M
Merge  
Michael Kolupaev 已提交
354

M
Merge  
Michael Kolupaev 已提交
355
	for (const AlterCommand & command : params)
356
	{
M
Merge  
Michael Kolupaev 已提交
357 358
		if (std::binary_search(keys.begin(), keys.end(), command.column_name))
			throw Exception("trying to ALTER key column " + command.column_name, ErrorCodes::ILLEGAL_COLUMN);
359 360
	}

M
Merge  
Michael Kolupaev 已提交
361 362 363 364
	/// Проверим, что преобразования типов возможны.
	ExpressionActionsPtr unused_expression;
	NameToNameMap unused_map;
	createConvertExpression(nullptr, *columns, new_columns, unused_expression, unused_map);
M
Merge  
Michael Kolupaev 已提交
365 366
}

M
Merge  
Michael Kolupaev 已提交
367 368
void MergeTreeData::createConvertExpression(DataPartPtr part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
	ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map)
{
	/// Builds, for a transition from old_columns to new_columns:
	///  - out_expression: conversion actions for columns whose type changes (left null if none do);
	///  - out_rename_map: file renames; an empty target means "delete this file".
	/// When `part` is given, only columns whose files exist in that part are considered.
	out_expression = nullptr;
	out_rename_map.clear();

	using NameToType = std::map<String, DataTypePtr>;
	NameToType new_types;
	for (const NameAndTypePair & column : new_columns)
		new_types[column.name] = column.type;

	/// Number of columns currently in each nested structure. Columns outside nested structures are counted too, which is harmless.
	std::map<String, int> nested_table_counts;
	for (const NameAndTypePair & column : old_columns)
		++nested_table_counts[DataTypeNested::extractNestedTableName(column.name)];

	for (const NameAndTypePair & column : old_columns)
	{
		const NameToType::const_iterator new_type_it = new_types.find(column.name);

		if (new_type_it == new_types.end())
		{
			/// The column is being dropped; nothing to do if the part has no files for it.
			if (part && !part->hasColumnFiles(column.name))
				continue;

			const String escaped_column = escapeForFileName(column.name);
			out_rename_map[escaped_column + ".bin"] = "";
			out_rename_map[escaped_column + ".mrk"] = "";

			/// Array columns share the .size0 files of their nested table; those files go away only with the last such column.
			if (!typeid_cast<const DataTypeArray *>(&*column.type))
				continue;

			const String nested_table = DataTypeNested::extractNestedTableName(column.name);
			if (--nested_table_counts[nested_table] == 0)
			{
				const String escaped_nested_table = escapeForFileName(nested_table);
				out_rename_map[escaped_nested_table + ".size0.bin"] = "";
				out_rename_map[escaped_nested_table + ".size0.mrk"] = "";
			}
			continue;
		}

		/// The column stays; it needs work only if its type changes and the part actually has its files.
		const String new_type_name = new_type_it->second->getName();
		if (new_type_name == column.type->getName())
			continue;
		if (part && !part->hasColumnFiles(column.name))
			continue;

		if (!out_expression)
			out_expression = new ExpressionActions(NamesAndTypesList(), context.getSettingsRef());

		/// Convert with the "to<NewType>" function, then drop the source column from the expression output.
		out_expression->addInput(ColumnWithNameAndType(nullptr, column.type, column.name));

		FunctionPtr function = context.getFunctionFactory().get("to" + new_type_name, context);
		Names out_names;
		out_expression->add(ExpressionAction::applyFunction(function, Names(1, column.name)), out_names);
		out_expression->add(ExpressionAction::removeColumn(column.name));

		/// The converted column's files are later renamed onto the original column's file names.
		const String escaped_expr = escapeForFileName(out_names[0]);
		const String escaped_column = escapeForFileName(column.name);
		out_rename_map[escaped_expr + ".bin"] = escaped_column + ".bin";
		out_rename_map[escaped_expr + ".mrk"] = escaped_column + ".mrk";
	}
}
M
Merge  
Michael Kolupaev 已提交
440

M
Merge  
Michael Kolupaev 已提交
441
MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(DataPartPtr part, const NamesAndTypesList & new_columns, bool skip_sanity_checks)
{
	/// Prepares (but does not commit) an ALTER of one part. Converted columns plus new checksums.txt/columns.txt are
	/// written to temporary files, and the transaction's rename_map records how to swap them in.
	/// Returns nullptr when the part needs no changes.
	ExpressionActionsPtr expression;
	AlterDataPartTransactionPtr transaction(new AlterDataPartTransaction(part)); /// Per the original note: locks the part against modification.
	createConvertExpression(part, part->columns, new_columns, expression, transaction->rename_map);

	/// Capture the size before clear() so the error message does not depend on what clear() resets.
	const size_t files_to_modify = transaction->rename_map.size();
	if (!skip_sanity_checks && files_to_modify > 5)
	{
		transaction->clear();

		throw Exception("Suspiciously many (" + toString(files_to_modify) + ") files need to be modified in part " + part->name
						+ ". Aborting just in case");
	}

	if (transaction->rename_map.empty())
	{
		transaction->clear();
		return nullptr;
	}

	DataPart::Checksums add_checksums;

	/// Apply the expression and write the result into temporary files.
	if (expression)
	{
		MarkRanges ranges(1, MarkRange(0, part->size));
		BlockInputStreamPtr part_in = new MergeTreeBlockInputStream(full_path + part->name + '/',
			DEFAULT_MERGE_BLOCK_SIZE, expression->getRequiredColumns(), *this, part, ranges, false, nullptr, "", false);
		ExpressionBlockInputStream in(part_in, expression);
		MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true);
		in.readPrefix();
		out.writePrefix();

		while (Block b = in.read())
			out.write(b);

		in.readSuffix();
		add_checksums = out.writeSuffixAndGetChecksums();
	}

	/// Update the checksums: deleted files drop out, converted files take over the original file names.
	/// Iterate by const reference to avoid copying both strings of every map entry.
	DataPart::Checksums new_checksums = part->checksums;
	for (const auto & entry : transaction->rename_map)
	{
		if (entry.second.empty())
			new_checksums.files.erase(entry.first);
		else
			new_checksums.files[entry.second] = add_checksums.files[entry.first];
	}

	/// Write the updated checksums to a temporary file (only if the part has checksums at all).
	if (!part->checksums.empty())
	{
		transaction->new_checksums = new_checksums;
		WriteBufferFromFile checksums_file(full_path + part->name + "/checksums.txt.tmp", 4096);
		new_checksums.writeText(checksums_file);
		transaction->rename_map["checksums.txt.tmp"] = "checksums.txt";
	}

	/// Write the updated column list to a temporary file.
	{
		transaction->new_columns = new_columns.filter(part->columns.getNames());
		WriteBufferFromFile columns_file(full_path + part->name + "/columns.txt.tmp", 4096);
		transaction->new_columns.writeText(columns_file);
		transaction->rename_map["columns.txt.tmp"] = "columns.txt";
	}

	return transaction;
}
M
Merge  
Michael Kolupaev 已提交
514

M
Merge  
Michael Kolupaev 已提交
515 516 517 518 519
void MergeTreeData::AlterDataPartTransaction::commit()
{
	if (!data_part)
		return;
	try
M
Merge  
Michael Kolupaev 已提交
520
	{
M
Merge  
Michael Kolupaev 已提交
521 522
		Poco::ScopedWriteRWLock lock(data_part->columns_lock);

M
Merge  
Michael Kolupaev 已提交
523
		String path = data_part->storage.full_path + data_part->name + "/";
M
Merge  
Michael Kolupaev 已提交
524 525

		/// 1) Переименуем старые файлы.
M
Merge  
Michael Kolupaev 已提交
526
		for (auto it : rename_map)
M
Merge  
Michael Kolupaev 已提交
527
		{
M
Merge  
Michael Kolupaev 已提交
528 529 530 531
			String name = it.second.empty() ? it.first : it.second;
			Poco::File(path + name).renameTo(path + name + ".tmp2");
		}

532
		/// 2) Переместим на их место новые и обновим метаданные в оперативке.
M
Merge  
Michael Kolupaev 已提交
533 534 535
		for (auto it : rename_map)
		{
			if (!it.second.empty())
M
Merge  
Michael Kolupaev 已提交
536
			{
M
Merge  
Michael Kolupaev 已提交
537
				Poco::File(path + it.first).renameTo(path + it.second);
M
Merge  
Michael Kolupaev 已提交
538
			}
M
Merge  
Michael Kolupaev 已提交
539
		}
540 541 542 543

		DataPart & mutable_part = const_cast<DataPart &>(*data_part);
		mutable_part.checksums = new_checksums;
		mutable_part.columns = new_columns;
M
Merge  
Michael Kolupaev 已提交
544 545 546 547 548 549 550 551

		/// 3) Удалим старые файлы.
		for (auto it : rename_map)
		{
			String name = it.second.empty() ? it.first : it.second;
			Poco::File(path + name + ".tmp2").remove();
		}

M
Merge  
Michael Kolupaev 已提交
552 553
		mutable_part.size_in_bytes = MergeTreeData::DataPart::calcTotalSize(path);

M
Merge  
Michael Kolupaev 已提交
554 555 556
		/// TODO: можно не сбрасывать кеши при добавлении столбца.
		data_part->storage.context.resetCaches();

M
Merge  
Michael Kolupaev 已提交
557
		clear();
M
Merge  
Michael Kolupaev 已提交
558 559 560 561
	}
	catch (...)
	{
		/// Если что-то пошло не так, не будем удалять временные файлы в деструкторе.
M
Merge  
Michael Kolupaev 已提交
562
		clear();
M
Merge  
Michael Kolupaev 已提交
563
		throw;
M
Merge  
Michael Kolupaev 已提交
564
	}
M
Merge  
Michael Kolupaev 已提交
565
}
M
Merge  
Michael Kolupaev 已提交
566

M
Merge  
Michael Kolupaev 已提交
567
MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction()
{
	/// If the transaction still owns a part (neither committed nor cleared), the ALTER is aborted:
	/// the freshly written files (keys of rename_map with a non-empty target) are deleted.
	/// Never throws; every failure is logged.
	try
	{
		if (!data_part)
			return;

		LOG_WARNING(data_part->storage.log, "Aborting ALTER of part " << data_part->name);

		String path = data_part->storage.full_path + data_part->name + "/";
		for (const auto & entry : rename_map)
		{
			/// An empty target marks a deletion, which produced no new file to clean up.
			if (entry.second.empty())
				continue;

			try
			{
				Poco::File file(path + entry.first);
				if (file.exists())
					file.remove();
			}
			catch (const Poco::Exception & e)
			{
				LOG_WARNING(data_part->storage.log, "Can't remove " << path + entry.first << ": " << e.displayText());
			}
		}
	}
	catch (...)
	{
		tryLogCurrentException(__PRETTY_FUNCTION__);
	}
}


M
Merge  
Michael Kolupaev 已提交
601
void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment, Transaction * out_transaction)
{
	/// Like renameTempPartAndReplace, but the new part is not expected to cover any existing part;
	/// if it does, an error is logged (the covered parts are still replaced).
	auto removed = renameTempPartAndReplace(part, increment, out_transaction);
	if (!removed.empty())
	{
		LOG_ERROR(log, "Added part " << part->name << " covers " << removed.size()
			<< " existing part(s) (including " << removed[0]->name << ")");
	}
}

M
Merge  
Michael Kolupaev 已提交
611 612
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
	MutableDataPartPtr part, Increment * increment, Transaction * out_transaction)
{
	/// Renames the temporary part directory to its final name and activates the part.
	/// Active parts it contains are removed from data_parts and returned (in ascending order).
	/// If an existing part already covers it, the part is only added to all_data_parts.
	if (out_transaction && out_transaction->data)
		throw Exception("Using the same MergeTreeData::Transaction for overlapping transactions is invalid");

	LOG_TRACE(log, "Renaming " << part->name << ".");

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	const String old_name = part->name;
	const String old_path = getFullPath() + old_name + "/";

	/** For StorageMergeTree it is important that the part number is obtained atomically with adding the part to the set.
	  * Otherwise there is a race condition: a pair of parts whose number ranges contain the not-yet-added part could be merged.
	  */
	if (increment)
		part->left = part->right = increment->get(false);

	const String new_name = ActiveDataPartSet::getPartName(part->left_date, part->right_date, part->left, part->right, part->level);

	/// Look the part up under its final identity, then restore the temporary identity before deciding.
	part->is_temp = false;
	part->name = new_name;
	const bool already_exists = data_parts.count(part) > 0;
	part->name = old_name;
	part->is_temp = true;

	if (already_exists)
		throw Exception("Part " + new_name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);

	const String new_path = getFullPath() + new_name + "/";

	/// Rename the directory on disk.
	Poco::File(old_path).renameTo(new_path);

	part->is_temp = false;
	part->name = new_name;

	bool covered_by_existing = false;
	DataPartsVector replaced;

	/// Parts contained in `part` sit next to each other in data_parts, around the position where `part` would be inserted.
	DataParts::iterator pos = data_parts.lower_bound(part);

	/// Walk to the left.
	while (pos != data_parts.begin())
	{
		--pos;
		if (!part->contains(**pos))
		{
			if ((*pos)->contains(*part))
				covered_by_existing = true;
			++pos;
			break;
		}
		replaced.push_back(*pos);
		(*pos)->remove_time = time(0);
		data_parts.erase(pos++); /// Yes, ++ and not --.
	}

	/// The left walk collected parts in descending order; the result must be ascending.
	std::reverse(replaced.begin(), replaced.end());

	/// Walk to the right.
	while (pos != data_parts.end())
	{
		if (!part->contains(**pos))
		{
			if ((*pos)->name == part->name || (*pos)->contains(*part))
				covered_by_existing = true;
			break;
		}
		replaced.push_back(*pos);
		(*pos)->remove_time = time(0);
		data_parts.erase(pos++);
	}

	if (covered_by_existing)
		LOG_WARNING(log, "Obsolete part " + part->name + " added");
	else
		data_parts.insert(part);

	all_data_parts.insert(part);

	if (out_transaction)
	{
		/// NOTE(review): `added_parts` receives the parts this call removed, and `removed_parts` receives the part it added.
		///  Presumably these fields describe what a rollback must do (replaceParts(remove, add)); confirm against the Transaction definition.
		out_transaction->data = this;
		out_transaction->added_parts = replaced;
		out_transaction->removed_parts = DataPartsVector(1, part);
	}

	return replaced;
}

M
Merge  
Michael Kolupaev 已提交
706
void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataPartsVector & add, bool clear_without_timeout)
M
Merge  
Michael Kolupaev 已提交
707 708 709 710 711 712 713
{
	LOG_TRACE(log, "Removing " << remove.size() << " parts and adding " << add.size() << " parts.");

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

	for (const DataPartPtr & part : remove)
	{
M
Merge  
Michael Kolupaev 已提交
714
		part->remove_time = clear_without_timeout ? 0 : time(0);
M
Merge  
Michael Kolupaev 已提交
715 716 717 718 719 720 721 722
		data_parts.erase(part);
	}
	for (const DataPartPtr & part : add)
	{
		data_parts.insert(part);
	}
}

M
Merge  
Michael Kolupaev 已提交
723
/** Removes `part` from both data_parts (active parts) and all_data_parts, then calls
  * part->renameAddPrefix(prefix). Throws NO_SUCH_DATA_PART if the part is not in all_data_parts.
  *
  * If restore_covered is set, the parts of all_data_parts that `part` contains are inserted back into
  * data_parts, trying to tile the range [part->left, part->right] without gaps. Every re-activated part
  * is logged; a gap or an uncovered edge is only reported via LOG_ERROR, not treated as fatal.
  */
void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix, bool restore_covered)
{
	LOG_INFO(log, "Renaming " << part->name << " to " << prefix << part->name << " and detaching it.");

	Poco::ScopedLock<Poco::FastMutex> guard(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> guard_all(all_data_parts_mutex);

	if (all_data_parts.erase(part) == 0)
		throw Exception("No such data part", ErrorCodes::NO_SUCH_DATA_PART);

	data_parts.erase(part);
	part->renameAddPrefix(prefix);

	if (!restore_covered)
		return;

	Strings activated_names;
	bool incomplete = false;
	/// The next block number that still has to be covered by a restored part.
	UInt64 next_left = part->left;

	/// Re-activates one covered part and moves the coverage boundary past its right end.
	auto activate = [&](const DataPartPtr & covered)
	{
		data_parts.insert(covered);
		next_left = covered->right + 1;
		activated_names.push_back(covered->name);
	};

	/// `part` has just been erased from all_data_parts, so this is the first element ordered after it.
	auto successor = all_data_parts.lower_bound(part);

	/// The element immediately before `part` in the ordering is the candidate for the leftmost covered piece.
	if (successor == all_data_parts.begin())
	{
		incomplete = true;
	}
	else
	{
		auto predecessor = successor;
		--predecessor;
		const DataPartPtr & candidate = *predecessor;
		if (part->contains(*candidate))
		{
			if (candidate->left != part->left)
				incomplete = true;
			activate(candidate);
		}
		else
			incomplete = true;
	}

	/// Walk forward over the following contained parts; ones starting before next_left overlap what is already restored.
	for (; successor != all_data_parts.end() && part->contains(**successor); ++successor)
	{
		const DataPartPtr & candidate = *successor;
		if (candidate->left < next_left)
			continue;
		if (candidate->left > next_left)
			incomplete = true;
		activate(candidate);
	}

	if (next_left != part->right + 1)
		incomplete = true;

	for (const String & name : activated_names)
	{
		LOG_INFO(log, "Activated part " << name);
	}

	if (incomplete)
		LOG_ERROR(log, "The set of parts restored in place of " << part->name << " looks incomplete. There might or might not be a data loss.");
}

M
Merge  
Michael Kolupaev 已提交
786 787 788 789 790 791 792
/// Returns a copy of data_parts (the active parts), taken while holding data_parts_mutex.
MergeTreeData::DataParts MergeTreeData::getDataParts()
{
	Poco::ScopedLock<Poco::FastMutex> guard(data_parts_mutex);
	DataParts snapshot = data_parts;
	return snapshot;
}

M
Merge  
Michael Kolupaev 已提交
793 794 795 796 797 798 799
/// Returns a copy of all_data_parts, taken while holding all_data_parts_mutex.
MergeTreeData::DataParts MergeTreeData::getAllDataParts()
{
	Poco::ScopedLock<Poco::FastMutex> guard(all_data_parts_mutex);
	DataParts snapshot = all_data_parts;
	return snapshot;
}

M
Merge  
Michael Kolupaev 已提交
800
size_t MergeTreeData::getMaxPartsCountForMonth()
M
Merge  
Michael Kolupaev 已提交
801 802 803
{
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

M
Merge  
Michael Kolupaev 已提交
804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823
	size_t res = 0;
	size_t cur_count = 0;
	DayNum_t cur_month = DayNum_t(0);

	for (const auto & part : data_parts)
	{
		if (part->left_month == cur_month)
		{
			++cur_count;
		}
		else
		{
			cur_month = part->left_month;
			cur_count = 1;
		}

		res = std::max(res, cur_count);
	}

	return res;
M
Merge  
Michael Kolupaev 已提交
824 825
}

M
Merge  
Michael Kolupaev 已提交
826 827 828 829 830 831 832
void MergeTreeData::delayInsertIfNeeded()
{
	size_t parts_count = getMaxPartsCountForMonth();
	if (parts_count > settings.parts_to_delay_insert)
	{
		double delay = std::pow(settings.insert_delay_step, parts_count - settings.parts_to_delay_insert);
		delay /= 1000;
833 834
		delay = std::min(delay, DBMS_MAX_DELAY_OF_INSERT);

M
Merge  
Michael Kolupaev 已提交
835 836 837 838 839 840
		LOG_INFO(log, "Delaying inserting block by "
			<< std::fixed << std::setprecision(4) << delay << "s because there are " << parts_count << " parts");
		std::this_thread::sleep_for(std::chrono::duration<double>(delay));
	}
}

M
Merge  
Michael Kolupaev 已提交
841
/** Returns the active part (from data_parts) that is named part_name or contains the part that
  * part_name describes, or nullptr if there is no such part.
  */
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name)
{
	MutableDataPartPtr probe(new DataPart(*this));
	ActiveDataPartSet::parsePartName(part_name, *probe);

	Poco::ScopedLock<Poco::FastMutex> guard(data_parts_mutex);

	/// A part can be covered only by the previous or the next element of data_parts.
	auto successor = data_parts.lower_bound(probe);

	if (successor != data_parts.end())
	{
		const DataPartPtr & candidate = *successor;
		if (candidate->name == part_name || candidate->contains(*probe))
			return candidate;
	}

	if (successor == data_parts.begin())
		return nullptr;

	auto predecessor = successor;
	--predecessor;
	if ((*predecessor)->contains(*probe))
		return *predecessor;

	return nullptr;
}

M
Merge  
Michael Kolupaev 已提交
869 870 871 872 873 874 875 876 877 878 879 880 881
/// Looks up a part by its exact name in all_data_parts (not just the active data_parts); returns nullptr if absent.
MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_name)
{
	MutableDataPartPtr probe(new DataPart(*this));
	ActiveDataPartSet::parsePartName(part_name, *probe);

	Poco::ScopedLock<Poco::FastMutex> guard(all_data_parts_mutex);

	auto found = all_data_parts.lower_bound(probe);
	if (found == all_data_parts.end() || (*found)->name != part_name)
		return nullptr;

	return *found;
}

882

M
Merge  
Michael Kolupaev 已提交
883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912
/** Compares this checksum with rhs for the file `name` and throws on the first mismatch.
  * If this checksum is for a compressed file and have_uncompressed is set, only the uncompressed size and hash
  * are compared (rhs must also be marked compressed); otherwise the file size and file hash are compared.
  */
void MergeTreeData::DataPart::Checksums::Checksum::checkEqual(const Checksum & rhs, bool have_uncompressed, const String & name) const
{
	const bool compare_uncompressed = is_compressed && have_uncompressed;

	if (!compare_uncompressed)
	{
		if (rhs.file_size != file_size)
			throw Exception("Unexpected size of file " + name + " in data part", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
		if (rhs.file_hash != file_hash)
			throw Exception("Checksum mismatch for file " + name + " in data part", ErrorCodes::CHECKSUM_DOESNT_MATCH);
		return;
	}

	if (!rhs.is_compressed)
		throw Exception("No uncompressed checksum for file " + name, ErrorCodes::CHECKSUM_DOESNT_MATCH);
	if (rhs.uncompressed_size != uncompressed_size)
		throw Exception("Unexpected size of file " + name + " in data part", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
	if (rhs.uncompressed_hash != uncompressed_hash)
		throw Exception("Checksum mismatch for file " + name + " in data part", ErrorCodes::CHECKSUM_DOESNT_MATCH);
}

/// Verifies that the file at `path` exists and that its size equals file_size; throws otherwise.
void MergeTreeData::DataPart::Checksums::Checksum::checkSize(const String & path) const
{
	Poco::File file(path);
	if (!file.exists())
		throw Exception(path + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);

	const size_t actual_size = file.getSize();
	if (actual_size == file_size)
		return;

	throw Exception(path + " has unexpected size: " + DB::toString(actual_size) + " instead of " + DB::toString(file_size),
		ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}

void MergeTreeData::DataPart::Checksums::checkEqual(const Checksums & rhs, bool have_uncompressed) const
913
{
M
Merge  
Michael Kolupaev 已提交
914
	for (const auto & it : rhs.files)
915 916 917
	{
		const String & name = it.first;

M
Merge  
Michael Kolupaev 已提交
918
		if (!files.count(name))
919 920 921
			throw Exception("Unexpected file " + name + " in data part", ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART);
	}

M
Merge  
Michael Kolupaev 已提交
922
	for (const auto & it : files)
923 924 925
	{
		const String & name = it.first;

M
Merge  
Michael Kolupaev 已提交
926 927
		auto jt = rhs.files.find(name);
		if (jt == rhs.files.end())
928 929
			throw Exception("No file " + name + " in data part", ErrorCodes::NO_FILE_IN_DATA_PART);

M
Merge  
Michael Kolupaev 已提交
930 931 932
		it.second.checkEqual(jt->second, have_uncompressed, name);
	}
}
M
Merge  
Michael Kolupaev 已提交
933

M
Merge  
Michael Kolupaev 已提交
934 935 936 937 938 939
/// Runs Checksum::checkSize for every listed file; `path` is prepended verbatim to each file name
/// (presumably a directory path ending in '/' — confirm at call sites).
void MergeTreeData::DataPart::Checksums::checkSizes(const String & path) const
{
	for (auto file_it = files.begin(); file_it != files.end(); ++file_it)
		file_it->second.checkSize(path + file_it->first);
}

943
/** Parses checksums in the text layout produced by writeText, replacing the contents of `files`.
  * Returns false (with `files` left empty) when the header declares format version 1, and true after
  * reading a version 2 listing. Throws UNKNOWN_FORMAT for any other version; the DB::assertString calls
  * enforce the exact layout (presumably throwing on mismatch).
  */
bool MergeTreeData::DataPart::Checksums::readText(ReadBuffer & in)
{
	files.clear();

	DB::assertString("checksums format version: ", in);
	int format_version;
	DB::readText(format_version, in);
	if (format_version < 1 || format_version > 2)
		throw Exception("Bad checksums format version: " + DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT);

	/// Version 1 listings are not parsed here.
	if (format_version == 1)
		return false;

	DB::assertString("\n", in);

	size_t files_count;
	DB::readText(files_count, in);
	DB::assertString(" files:\n", in);

	for (size_t remaining = files_count; remaining != 0; --remaining)
	{
		String file_name;
		Checksum checksum;

		DB::readString(file_name, in);

		DB::assertString("\n\tsize: ", in);
		DB::readText(checksum.file_size, in);

		DB::assertString("\n\thash: ", in);
		DB::readText(checksum.file_hash.first, in);
		DB::assertString(" ", in);
		DB::readText(checksum.file_hash.second, in);

		DB::assertString("\n\tcompressed: ", in);
		DB::readText(checksum.is_compressed, in);

		/// Compressed files carry an extra uncompressed size/hash section.
		if (checksum.is_compressed)
		{
			DB::assertString("\n\tuncompressed size: ", in);
			DB::readText(checksum.uncompressed_size, in);

			DB::assertString("\n\tuncompressed hash: ", in);
			DB::readText(checksum.uncompressed_hash.first, in);
			DB::assertString(" ", in);
			DB::readText(checksum.uncompressed_hash.second, in);
		}

		DB::assertString("\n", in);

		files.insert(std::make_pair(file_name, checksum));
	}

	return true;
}

/** Serializes `files` in the version-2 text layout that readText parses:
  *   checksums format version: 2
  *   <N> files:
  *   then, per file:
  *   <name>
  *   	size: <file_size>
  *   	hash: <file_hash.first> <file_hash.second>
  *   	compressed: <is_compressed>
  *   	uncompressed size: <uncompressed_size>                                   (compressed files only)
  *   	uncompressed hash: <uncompressed_hash.first> <uncompressed_hash.second>  (compressed files only)
  */
void MergeTreeData::DataPart::Checksums::writeText(WriteBuffer & out) const
{
	DB::writeString("checksums format version: 2\n", out);
	DB::writeText(files.size(), out);
	DB::writeString(" files:\n", out);

	for (auto file_it = files.begin(); file_it != files.end(); ++file_it)
	{
		const Checksum & checksum = file_it->second;

		DB::writeString(file_it->first, out);

		DB::writeString("\n\tsize: ", out);
		DB::writeText(checksum.file_size, out);

		DB::writeString("\n\thash: ", out);
		DB::writeText(checksum.file_hash.first, out);
		DB::writeString(" ", out);
		DB::writeText(checksum.file_hash.second, out);

		DB::writeString("\n\tcompressed: ", out);
		DB::writeText(checksum.is_compressed, out);
		DB::writeString("\n", out);

		if (!checksum.is_compressed)
			continue;

		DB::writeString("\tuncompressed size: ", out);
		DB::writeText(checksum.uncompressed_size, out);

		DB::writeString("\n\tuncompressed hash: ", out);
		DB::writeText(checksum.uncompressed_hash.first, out);
		DB::writeString(" ", out);
		DB::writeText(checksum.uncompressed_hash.second, out);
		DB::writeString("\n", out);
	}
}

M
Merge  
Michael Kolupaev 已提交
1023
}