MergeTreeData.cpp 40.9 KB
Newer Older
M
Merge  
Michael Kolupaev 已提交
1
#include <Poco/Ext/ScopedTry.h>
2 3

#include <DB/Storages/MergeTree/MergeTreeData.h>
M
Merge  
Michael Kolupaev 已提交
4
#include <DB/Interpreters/ExpressionAnalyzer.h>
M
Merge  
Michael Kolupaev 已提交
5
#include <DB/Storages/MergeTree/MergeTreeReader.h>
M
Merge  
Michael Kolupaev 已提交
6 7
#include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
#include <DB/Storages/MergeTree/MergedBlockOutputStream.h>
M
Merge  
Michael Kolupaev 已提交
8
#include <DB/Storages/MergeTree/MergeTreePartChecker.h>
M
Merge  
Michael Kolupaev 已提交
9 10 11
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/DataStreams/ExpressionBlockInputStream.h>
M
Merge  
Michael Kolupaev 已提交
12
#include <DB/DataStreams/copyData.h>
M
Merge  
Michael Kolupaev 已提交
13
#include <DB/IO/WriteBufferFromFile.h>
14
#include <DB/IO/CompressedReadBuffer.h>
A
Merge  
Alexey Milovidov 已提交
15
#include <DB/DataTypes/DataTypeDate.h>
A
Merge  
Andrey Mironov 已提交
16
#include <DB/DataTypes/DataTypeFixedString.h>
17
#include <DB/Common/localBackup.h>
A
Andrey Mironov 已提交
18
#include <DB/Functions/FunctionFactory.h>
19

P
Merge  
Pavel Kartavyy 已提交
20
#include <algorithm>
21
#include <iomanip>
M
Merge  
Michael Kolupaev 已提交
22 23 24 25 26 27 28



namespace DB
{

/** Stores the table description, validates it and prepares the data directory.
  *
  * Validation (an Exception is thrown when a check fails):
  *  - the date column must be present among the ordinary or materialized columns and have type Date;
  *  - columns_to_sum may be non-empty only in Summing mode, and every listed column
  *    must be present among the ordinary columns;
  *  - the primary key expression may be absent only in Unsorted mode.
  *
  * Creates the directory full_path (with parents) and the directory full_path + "detached".
  * When a primary key expression is given, builds the sort description (ascending by every
  *  key element), the primary key expression actions and the primary key sample block.
  */
MergeTreeData::MergeTreeData(
	const String & full_path_, NamesAndTypesListPtr columns_,
	const NamesAndTypesList & materialized_columns_,
	const NamesAndTypesList & alias_columns_,
	const ColumnDefaults & column_defaults_,
	const Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_, const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	Mode mode_,
	const String & sign_column_,
	const Names & columns_to_sum_,
	const MergeTreeSettings & settings_,
	const String & log_name_,
	bool require_part_metadata_,
	BrokenPartCallback broken_part_callback_)
	: ITableDeclaration{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
	date_column_name(date_column_name_), sampling_expression(sampling_expression_),
	index_granularity(index_granularity_),
	mode(mode_), sign_column(sign_column_), columns_to_sum(columns_to_sum_),
	settings(settings_), primary_expr_ast(primary_expr_ast_ ? primary_expr_ast_->clone() : nullptr),
	require_part_metadata(require_part_metadata_),
	full_path(full_path_), columns(columns_),
	broken_part_callback(broken_part_callback_),
	log_name(log_name_), log(&Logger::get(log_name + " (Data)"))
{
	/// Returns true if the date column is found in the given list.
	/// Throws if it is found but its type is not Date.
	/// The parameter is deliberately not named `columns`, to avoid shadowing the member.
	const auto check_date_exists = [this] (const NamesAndTypesList & columns_list)
	{
		for (const auto & column : columns_list)
		{
			if (column.name != date_column_name)
				continue;

			if (!typeid_cast<const DataTypeDate *>(column.type.get()))
				throw Exception("Date column (" + date_column_name + ") for storage of MergeTree family must have type Date."
					" Provided column of type " + column.type->getName() + "."
					" You may have separate column with type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);

			return true;
		}

		return false;
	};

	if (!check_date_exists(*columns) && !check_date_exists(materialized_columns))
		throw Exception{
			"Date column (" + date_column_name + ") does not exist in table declaration.",
			ErrorCodes::NO_SUCH_COLUMN_IN_TABLE};

	/// If columns_to_sum is specified, check that those columns exist.
	if (!columns_to_sum.empty())
	{
		if (mode != Summing)
			throw Exception("List of columns to sum for MergeTree cannot be specified in all modes except Summing.", ErrorCodes::LOGICAL_ERROR);

		for (const auto & column_to_sum : columns_to_sum)
			if (columns->end() == std::find_if(columns->begin(), columns->end(),
				[&](const NameAndTypePair & name_and_type) { return column_to_sum == name_and_type.name; }))
				throw Exception("Column " + column_to_sum + " listed in columns to sum does not exist in table declaration.",
					ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
	}

	/// Create the data directory if it does not exist yet.
	Poco::File(full_path).createDirectories();
	Poco::File(full_path + "detached").createDirectory();

	if (primary_expr_ast)
	{
		/// Initialize the sort description: one ascending entry per primary key element.
		sort_descr.reserve(primary_expr_ast->children.size());
		for (const ASTPtr & ast : primary_expr_ast->children)
		{
			String name = ast->getColumnName();
			sort_descr.push_back(SortColumnDescription(name, 1));
		}

		primary_expr = ExpressionAnalyzer(primary_expr_ast, context, getColumnsList()).getActions(false);

		ExpressionActionsPtr projected_expr = ExpressionAnalyzer(primary_expr_ast, context, getColumnsList()).getActions(true);
		primary_key_sample = projected_expr->getSampleBlock();
	}
	else if (mode != Unsorted)
		throw Exception("Primary key could be empty only for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS);
}
M
Merge  
Michael Kolupaev 已提交
111

M
Merge  
Michael Kolupaev 已提交
112 113
/// Returns the largest `right` value among the parts in data_parts, or 0 if there are none.
/// NOTE(review): data_parts is read here without taking data_parts_mutex - confirm that callers serialize access.
UInt64 MergeTreeData::getMaxDataPartIndex()
{
	UInt64 largest_right = 0;

	for (const auto & part : data_parts)
	{
		if (largest_right < part->right)
			largest_right = part->right;
	}

	return largest_right;
}

M
Merge  
Michael Kolupaev 已提交
121
std::string MergeTreeData::getModePrefix() const
M
Merge  
Michael Kolupaev 已提交
122
{
M
Merge  
Michael Kolupaev 已提交
123
	switch (mode)
M
Merge  
Michael Kolupaev 已提交
124
	{
M
Merge  
Michael Kolupaev 已提交
125 126 127
		case Ordinary: 		return "";
		case Collapsing: 	return "Collapsing";
		case Summing: 		return "Summing";
S
Merge  
Sergey Fedorov 已提交
128
		case Aggregating: 	return "Aggregating";
A
Merge  
Alexey Milovidov 已提交
129
		case Unsorted: 		return "Unsorted";
M
Merge  
Michael Kolupaev 已提交
130

M
Merge  
Michael Kolupaev 已提交
131 132
		default:
			throw Exception("Unknown mode of operation for MergeTreeData: " + toString(mode), ErrorCodes::LOGICAL_ERROR);
M
Merge  
Michael Kolupaev 已提交
133 134 135 136
	}
}


M
Merge  
Michael Kolupaev 已提交
137
void MergeTreeData::loadDataParts(bool skip_sanity_checks)
M
Merge  
Michael Kolupaev 已提交
138 139 140 141 142 143 144 145
{
	LOG_DEBUG(log, "Loading data parts");

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	data_parts.clear();

146
	Strings part_file_names;
M
Merge  
Michael Kolupaev 已提交
147 148
	Poco::DirectoryIterator end;
	for (Poco::DirectoryIterator it(full_path); it != end; ++it)
M
Merge  
Michael Kolupaev 已提交
149
	{
150 151
		/// Пропускаем временные директории старше суток.
		if (0 == it.name().compare(0, strlen("tmp_"), "tmp_"))
M
Merge  
Michael Kolupaev 已提交
152 153
			continue;

154
		part_file_names.push_back(it.name());
M
Merge  
Michael Kolupaev 已提交
155 156
	}

M
Merge  
Michael Kolupaev 已提交
157
	DataPartsVector broken_parts_to_remove;
M
Merge  
Michael Kolupaev 已提交
158 159
	DataPartsVector broken_parts_to_detach;
	size_t suspicious_broken_parts = 0;
M
Merge  
Michael Kolupaev 已提交
160

M
Merge  
Michael Kolupaev 已提交
161
	Poco::RegularExpression::MatchVec matches;
M
Merge  
Michael Kolupaev 已提交
162
	for (const String & file_name : part_file_names)
M
Merge  
Michael Kolupaev 已提交
163
	{
M
Merge  
Michael Kolupaev 已提交
164
		if (!ActiveDataPartSet::isPartDirectory(file_name, &matches))
M
Merge  
Michael Kolupaev 已提交
165 166
			continue;

M
Merge  
Michael Kolupaev 已提交
167
		MutableDataPartPtr part = std::make_shared<DataPart>(*this);
168
		ActiveDataPartSet::parsePartName(file_name, *part, &matches);
M
Merge  
Michael Kolupaev 已提交
169 170
		part->name = file_name;

M
Merge  
Michael Kolupaev 已提交
171 172 173 174
		bool broken = false;

		try
		{
M
Merge  
Michael Kolupaev 已提交
175 176
			part->loadColumns(require_part_metadata);
			part->loadChecksums(require_part_metadata);
M
Merge  
Michael Kolupaev 已提交
177
			part->loadIndex();
M
Merge  
Michael Kolupaev 已提交
178
			part->checkNotBroken(require_part_metadata);
M
Merge  
Michael Kolupaev 已提交
179 180 181 182 183 184 185
		}
		catch (...)
		{
			broken = true;
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}

M
Merge  
Michael Kolupaev 已提交
186
		/// Игнорируем и, возможно, удаляем битые куски, которые могут образовываться после грубого перезапуска сервера.
M
Merge  
Michael Kolupaev 已提交
187
		if (broken)
M
Merge  
Michael Kolupaev 已提交
188 189 190 191
		{
			if (part->level == 0)
			{
				/// Восстановить куски нулевого уровня невозможно.
192
				LOG_ERROR(log, "Removing broken part " << full_path + file_name << " because it's impossible to repair.");
M
Merge  
Michael Kolupaev 已提交
193
				broken_parts_to_remove.push_back(part);
M
Merge  
Michael Kolupaev 已提交
194 195 196
			}
			else
			{
M
Merge  
Michael Kolupaev 已提交
197 198 199 200 201
				/// Посмотрим, сколько кусков покрыты битым. Если хотя бы два, предполагаем, что битый кусок образован их
				///  слиянием, и мы ничего не потеряем, если его удалим.
				int contained_parts = 0;

				LOG_ERROR(log, "Part " << full_path + file_name << " is broken. Looking for parts to replace it.");
M
Merge  
Michael Kolupaev 已提交
202
				++suspicious_broken_parts;
M
Merge  
Michael Kolupaev 已提交
203 204 205 206 207

				for (const String & contained_name : part_file_names)
				{
					if (contained_name == file_name)
						continue;
M
Merge  
Michael Kolupaev 已提交
208
					if (!ActiveDataPartSet::isPartDirectory(contained_name, &matches))
M
Merge  
Michael Kolupaev 已提交
209 210
						continue;
					DataPart contained_part(*this);
211
					ActiveDataPartSet::parsePartName(contained_name, contained_part, &matches);
M
Merge  
Michael Kolupaev 已提交
212 213 214 215 216 217 218 219 220 221
					if (part->contains(contained_part))
					{
						LOG_ERROR(log, "Found part " << full_path + contained_name);
						++contained_parts;
					}
				}

				if (contained_parts >= 2)
				{
					LOG_ERROR(log, "Removing broken part " << full_path + file_name << " because it covers at least 2 other parts");
M
Merge  
Michael Kolupaev 已提交
222
					broken_parts_to_remove.push_back(part);
M
Merge  
Michael Kolupaev 已提交
223 224 225
				}
				else
				{
M
Merge  
Michael Kolupaev 已提交
226
					LOG_ERROR(log, "Detaching broken part " << full_path + file_name
M
Merge  
Michael Kolupaev 已提交
227
						<< " because it covers less than 2 parts. You need to resolve this manually");
M
Merge  
Michael Kolupaev 已提交
228
					broken_parts_to_detach.push_back(part);
M
Merge  
Michael Kolupaev 已提交
229
				}
M
Merge  
Michael Kolupaev 已提交
230 231 232 233 234 235 236 237 238 239
			}

			continue;
		}

		part->modification_time = Poco::File(full_path + file_name).getLastModified().epochTime();

		data_parts.insert(part);
	}

M
Merge  
Michael Kolupaev 已提交
240 241
	if (suspicious_broken_parts > 5 && !skip_sanity_checks)
		throw Exception("Suspiciously many (" + toString(suspicious_broken_parts) + ") broken parts to remove.",
M
Merge  
Michael Kolupaev 已提交
242 243 244 245
			ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);

	for (const auto & part : broken_parts_to_remove)
		part->remove();
M
Merge  
Michael Kolupaev 已提交
246
	for (const auto & part : broken_parts_to_detach)
247
		part->renameAddPrefix(true, "");
M
Merge  
Michael Kolupaev 已提交
248

M
Merge  
Michael Kolupaev 已提交
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
	all_data_parts = data_parts;

	/** Удаляем из набора актуальных кусков куски, которые содержатся в другом куске (которые были склеены),
	  *  но по каким-то причинам остались лежать в файловой системе.
	  * Удаление файлов будет произведено потом в методе clearOldParts.
	  */

	if (data_parts.size() >= 2)
	{
		DataParts::iterator prev_jt = data_parts.begin();
		DataParts::iterator curr_jt = prev_jt;
		++curr_jt;
		while (curr_jt != data_parts.end())
		{
			/// Куски данных за разные месяцы рассматривать не будем
			if ((*curr_jt)->left_month != (*curr_jt)->right_month
				|| (*curr_jt)->right_month != (*prev_jt)->left_month
				|| (*prev_jt)->left_month != (*prev_jt)->right_month)
			{
				++prev_jt;
				++curr_jt;
				continue;
			}

			if ((*curr_jt)->contains(**prev_jt))
			{
M
Merge  
Michael Kolupaev 已提交
275
				(*prev_jt)->remove_time = time(0);
M
Merge  
Michael Kolupaev 已提交
276 277 278 279 280 281
				data_parts.erase(prev_jt);
				prev_jt = curr_jt;
				++curr_jt;
			}
			else if ((*prev_jt)->contains(**curr_jt))
			{
M
Merge  
Michael Kolupaev 已提交
282
				(*curr_jt)->remove_time = time(0);
M
Merge  
Michael Kolupaev 已提交
283 284 285 286 287 288 289 290 291 292
				data_parts.erase(curr_jt++);
			}
			else
			{
				++prev_jt;
				++curr_jt;
			}
		}
	}

A
Merge  
Andrey Mironov 已提交
293 294
	calculateColumnSizes();

M
Merge  
Michael Kolupaev 已提交
295 296 297 298
	LOG_DEBUG(log, "Loaded data parts (" << data_parts.size() << " items)");
}


M
Merge  
Michael Kolupaev 已提交
299
/** Removes stale temporary directories and extracts old parts from all_data_parts.
  *
  * Returns an empty vector immediately if all_data_parts_mutex cannot be acquired without waiting.
  * Otherwise:
  *  - recursively removes directories named "tmp_*" that were last modified more than 86400 seconds ago;
  *  - moves into the result every part that is referenced only by all_data_parts and whose remove_time
  *    is more than settings.old_parts_lifetime in the past.
  * The files of the returned parts are not touched here.
  */
MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
{
	Poco::ScopedTry<Poco::FastMutex> lock;
	DataPartsVector res;

	/// If the method is already running in another thread (or all_data_parts is being modified right now),
	///  there is nothing to do.
	if (!lock.lock(&all_data_parts_mutex))
	{
		LOG_TRACE(log, "grabOldParts: all_data_parts is locked");
		return res;
	}

	/// Remove temporary directories older than one day.
	Poco::DirectoryIterator dir_end;
	for (Poco::DirectoryIterator dir_it{full_path}; dir_it != dir_end; ++dir_it)
	{
		if (0 != dir_it.name().compare(0, strlen("tmp_"), "tmp_"))
			continue;

		Poco::File tmp_dir(full_path + dir_it.name());

		const bool expired = tmp_dir.isDirectory() && tmp_dir.getLastModified().epochTime() + 86400 < time(0);
		if (!expired)
			continue;

		LOG_WARNING(log, "Removing temporary directory " << full_path << dir_it.name());
		tmp_dir.remove(true);
	}

	const time_t now = time(0);
	DataParts::iterator part_it = all_data_parts.begin();
	while (part_it != all_data_parts.end())
	{
		/// Once unique() holds, the reference count cannot grow any more.
		const bool is_old = part_it->unique() &&
			(*part_it)->remove_time < now &&
			now - (*part_it)->remove_time > settings.old_parts_lifetime;

		if (!is_old)
		{
			++part_it;
			continue;
		}

		res.push_back(*part_it);
		all_data_parts.erase(part_it++);
	}

	return res;
}

M
Merge  
Michael Kolupaev 已提交
344 345 346 347 348 349 350 351 352 353
/// Puts the given parts into all_data_parts, holding all_data_parts_mutex while doing so.
void MergeTreeData::addOldParts(const MergeTreeData::DataPartsVector & parts)
{
	Poco::ScopedLock<Poco::FastMutex> lock(all_data_parts_mutex);

	for (const auto & part : parts)
		all_data_parts.insert(part);
}

void MergeTreeData::clearOldParts()
{
	auto parts_to_remove = grabOldParts();

354
	for (const DataPartPtr & part : parts_to_remove)
M
Merge  
Michael Kolupaev 已提交
355 356 357 358 359 360
	{
		LOG_DEBUG(log, "Removing part " << part->name);
		part->remove();
	}
}

M
Merge  
Michael Kolupaev 已提交
361
void MergeTreeData::setPath(const String & new_full_path, bool move_data)
M
Merge  
Michael Kolupaev 已提交
362
{
M
Merge  
Michael Kolupaev 已提交
363 364 365 366 367 368
	if (move_data)
	{
		Poco::File(full_path).renameTo(new_full_path);
		/// Если данные перемещать не нужно, значит их переместил кто-то другой. Расчитываем, что он еще и сбросил кеши.
		context.resetCaches();
	}
369

M
Merge  
Michael Kolupaev 已提交
370
	full_path = new_full_path;
M
Merge  
Michael Kolupaev 已提交
371 372
}

M
Merge  
Michael Kolupaev 已提交
373
void MergeTreeData::dropAllData()
M
Merge  
Michael Kolupaev 已提交
374 375 376
{
	data_parts.clear();
	all_data_parts.clear();
A
Merge  
Andrey Mironov 已提交
377
	column_sizes.clear();
M
Merge  
Michael Kolupaev 已提交
378

379
	context.resetCaches();
380

M
Merge  
Michael Kolupaev 已提交
381 382 383 384
	Poco::File(full_path).remove(true);
}


M
Merge  
Michael Kolupaev 已提交
385
void MergeTreeData::checkAlter(const AlterCommands & params)
M
Merge  
Michael Kolupaev 已提交
386
{
M
Merge  
Michael Kolupaev 已提交
387
	/// Проверим, что указанные преобразования можно совершить над списком столбцов без учета типов.
388 389 390 391 392
	auto new_columns = *columns;
	auto new_materialized_columns = materialized_columns;
	auto new_alias_columns = alias_columns;
	auto new_column_defaults = column_defaults;
	params.apply(new_columns, new_materialized_columns, new_alias_columns, new_column_defaults);
M
Merge  
Michael Kolupaev 已提交
393

M
Merge  
Michael Kolupaev 已提交
394 395
	/// Список столбцов, которые нельзя трогать.
	/// sampling_expression можно не учитывать, потому что он обязан содержаться в первичном ключе.
A
Merge  
Alexey Milovidov 已提交
396 397 398 399 400 401

	Names keys;

	if (primary_expr)
		keys = primary_expr->getRequiredColumns();

M
Merge  
Michael Kolupaev 已提交
402
	keys.push_back(sign_column);
A
Merge  
Alexey Milovidov 已提交
403

M
Merge  
Michael Kolupaev 已提交
404
	std::sort(keys.begin(), keys.end());
M
Merge  
Michael Kolupaev 已提交
405

M
Merge  
Michael Kolupaev 已提交
406
	for (const AlterCommand & command : params)
407
	{
M
Merge  
Michael Kolupaev 已提交
408 409
		if (std::binary_search(keys.begin(), keys.end(), command.column_name))
			throw Exception("trying to ALTER key column " + command.column_name, ErrorCodes::ILLEGAL_COLUMN);
410 411
	}

M
Merge  
Michael Kolupaev 已提交
412 413 414
	/// Проверим, что преобразования типов возможны.
	ExpressionActionsPtr unused_expression;
	NameToNameMap unused_map;
415 416 417 418 419

	/// augment plain columns with materialized columns for convert expression creation
	new_columns.insert(std::end(new_columns),
		std::begin(new_materialized_columns), std::end(new_materialized_columns));
	createConvertExpression(nullptr, getColumnsList(), new_columns, unused_expression, unused_map);
M
Merge  
Michael Kolupaev 已提交
420 421
}

422
void MergeTreeData::createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
M
Merge  
Michael Kolupaev 已提交
423
	ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map)
M
Merge  
Michael Kolupaev 已提交
424
{
M
Merge  
Michael Kolupaev 已提交
425 426
	out_expression = nullptr;
	out_rename_map.clear();
M
Merge  
Michael Kolupaev 已提交
427

M
Merge  
Michael Kolupaev 已提交
428 429 430
	typedef std::map<String, DataTypePtr> NameToType;
	NameToType new_types;
	for (const NameAndTypePair & column : new_columns)
M
Merge  
Michael Kolupaev 已提交
431
	{
M
Merge  
Michael Kolupaev 已提交
432
		new_types[column.name] = column.type;
M
Merge  
Michael Kolupaev 已提交
433
	}
M
Merge  
Michael Kolupaev 已提交
434

M
Merge  
Michael Kolupaev 已提交
435 436 437
	/// Сколько столбцов сейчас в каждой вложенной структуре. Столбцы не из вложенных структур сюда тоже попадут и не помешают.
	std::map<String, int> nested_table_counts;
	for (const NameAndTypePair & column : old_columns)
M
Merge  
Michael Kolupaev 已提交
438
	{
M
Merge  
Michael Kolupaev 已提交
439
		++nested_table_counts[DataTypeNested::extractNestedTableName(column.name)];
M
Merge  
Michael Kolupaev 已提交
440
	}
441

M
Merge  
Michael Kolupaev 已提交
442
	for (const NameAndTypePair & column : old_columns)
M
Merge  
Michael Kolupaev 已提交
443
	{
M
Merge  
Michael Kolupaev 已提交
444
		if (!new_types.count(column.name))
445
		{
M
Merge  
Michael Kolupaev 已提交
446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466
			if (!part || part->hasColumnFiles(column.name))
			{
				/// Столбец нужно удалить.

				String escaped_column = escapeForFileName(column.name);
				out_rename_map[escaped_column + ".bin"] = "";
				out_rename_map[escaped_column + ".mrk"] = "";

				/// Если это массив или последний столбец вложенной структуры, нужно удалить файлы с размерами.
				if (typeid_cast<const DataTypeArray *>(&*column.type))
				{
					String nested_table = DataTypeNested::extractNestedTableName(column.name);
					/// Если это был последний столбец, относящийся к этим файлам .size0, удалим файлы.
					if (!--nested_table_counts[nested_table])
					{
						String escaped_nested_table = escapeForFileName(nested_table);
						out_rename_map[escaped_nested_table + ".size0.bin"] = "";
						out_rename_map[escaped_nested_table + ".size0.mrk"] = "";
					}
				}
			}
467
		}
M
Merge  
Michael Kolupaev 已提交
468 469
		else
		{
A
Merge  
Andrey Mironov 已提交
470 471
			const auto new_type = new_types[column.name].get();
			const String new_type_name = new_type->getName();
M
Merge  
Michael Kolupaev 已提交
472 473 474 475 476 477 478 479

			if (new_type_name != column.type->getName() &&
				(!part || part->hasColumnFiles(column.name)))
			{
				/// Нужно изменить тип столбца.

				if (!out_expression)
					out_expression = new ExpressionActions(NamesAndTypesList(), context.getSettingsRef());
M
Merge  
Michael Kolupaev 已提交
480

M
Merge  
Michael Kolupaev 已提交
481 482 483
				out_expression->addInput(ColumnWithNameAndType(nullptr, column.type, column.name));

				Names out_names;
A
Merge  
Andrey Mironov 已提交
484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503

				if (const auto fixed_string = typeid_cast<const DataTypeFixedString *>(new_type))
				{
					const auto width = fixed_string->getN();
					const auto string_width_column = toString(width);
					out_expression->addInput({ new ColumnConstUInt64{1, width}, new DataTypeUInt64, string_width_column });

					const auto function = FunctionFactory::instance().get("toFixedString", context);
					out_expression->add(ExpressionAction::applyFunction(function, Names{
						column.name, string_width_column
					}), out_names);

					out_expression->add(ExpressionAction::removeColumn(string_width_column));
				}
				else
				{
					const FunctionPtr & function = FunctionFactory::instance().get("to" + new_type_name, context);
					out_expression->add(ExpressionAction::applyFunction(function, Names{column.name}), out_names);
				}

M
Merge  
Michael Kolupaev 已提交
504 505
				out_expression->add(ExpressionAction::removeColumn(column.name));

A
Merge  
Andrey Mironov 已提交
506 507
				const String escaped_expr = escapeForFileName(out_names[0]);
				const String escaped_column = escapeForFileName(column.name);
M
Merge  
Michael Kolupaev 已提交
508 509 510 511
				out_rename_map[escaped_expr + ".bin"] = escaped_column + ".bin";
				out_rename_map[escaped_expr + ".mrk"] = escaped_column + ".mrk";
			}
		}
M
Merge  
Michael Kolupaev 已提交
512 513
	}
}
M
Merge  
Michael Kolupaev 已提交
514

515 516
/** Prepares an ALTER of a single part: writes converted columns, updated checksums and the updated
  *  column list into temporary files inside the part directory, and records the planned renames.
  *
  * Returns nullptr if nothing in the part has to change; otherwise returns the transaction,
  *  which is finished by AlterDataPartTransaction::commit.
  * Throws if more than 5 files would be modified and skip_sanity_checks is false.
  */
MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
	const DataPartPtr & part, const NamesAndTypesList & new_columns, bool skip_sanity_checks)
{
	ExpressionActionsPtr convert_expression;
	AlterDataPartTransactionPtr transaction(new AlterDataPartTransaction(part)); /// Blocks modification of the part.
	createConvertExpression(part, part->columns, new_columns, convert_expression, transaction->rename_map);

	const bool too_many_files = transaction->rename_map.size() > 5;
	if (!skip_sanity_checks && too_many_files)
	{
		transaction->clear();

		/// NOTE(review): rename_map is read after clear(); confirm that clear() leaves rename_map intact.
		throw Exception("Suspiciously many (" + toString(transaction->rename_map.size()) + ") files need to be modified in part " + part->name
						+ ". Aborting just in case");
	}

	if (transaction->rename_map.empty())
	{
		transaction->clear();
		return nullptr;
	}

	const String part_path = full_path + part->name + '/';

	DataPart::Checksums added_checksums;

	/// Apply the expression and write the result into temporary files.
	if (convert_expression)
	{
		MarkRanges ranges(1, MarkRange(0, part->size));
		BlockInputStreamPtr part_in = new MergeTreeBlockInputStream(part_path,
			DEFAULT_MERGE_BLOCK_SIZE, convert_expression->getRequiredColumns(), *this, part, ranges, false, nullptr, "", false);
		ExpressionBlockInputStream in(part_in, convert_expression);
		MergedColumnOnlyOutputStream out(*this, part_path, true, CompressionMethod::LZ4);

		in.readPrefix();
		out.writePrefix();

		while (Block block = in.read())
			out.write(block);

		in.readSuffix();
		added_checksums = out.writeSuffixAndGetChecksums();
	}

	/// Update the checksums: forget deleted files, take the new values for converted ones.
	DataPart::Checksums new_checksums = part->checksums;
	for (const auto & rename : transaction->rename_map)
	{
		if (rename.second.empty())
			new_checksums.files.erase(rename.first);
		else
			new_checksums.files[rename.second] = added_checksums.files[rename.first];
	}

	/// Write the updated checksums into a temporary file (only if the part had checksums at all).
	if (!part->checksums.empty())
	{
		transaction->new_checksums = new_checksums;
		WriteBufferFromFile checksums_file(part_path + "checksums.txt.tmp", 4096);
		new_checksums.write(checksums_file);
		transaction->rename_map["checksums.txt.tmp"] = "checksums.txt";
	}

	/// Write the updated list of columns into a temporary file.
	{
		transaction->new_columns = new_columns.filter(part->columns.getNames());
		WriteBufferFromFile columns_file(part_path + "columns.txt.tmp", 4096);
		transaction->new_columns.writeText(columns_file);
		transaction->rename_map["columns.txt.tmp"] = "columns.txt";
	}

	return transaction;
}
M
Merge  
Michael Kolupaev 已提交
589

M
Merge  
Michael Kolupaev 已提交
590 591 592 593 594
void MergeTreeData::AlterDataPartTransaction::commit()
{
	if (!data_part)
		return;
	try
M
Merge  
Michael Kolupaev 已提交
595
	{
M
Merge  
Michael Kolupaev 已提交
596 597
		Poco::ScopedWriteRWLock lock(data_part->columns_lock);

M
Merge  
Michael Kolupaev 已提交
598
		String path = data_part->storage.full_path + data_part->name + "/";
M
Merge  
Michael Kolupaev 已提交
599 600

		/// 1) Переименуем старые файлы.
M
Merge  
Michael Kolupaev 已提交
601
		for (auto it : rename_map)
M
Merge  
Michael Kolupaev 已提交
602
		{
M
Merge  
Michael Kolupaev 已提交
603 604 605 606
			String name = it.second.empty() ? it.first : it.second;
			Poco::File(path + name).renameTo(path + name + ".tmp2");
		}

607
		/// 2) Переместим на их место новые и обновим метаданные в оперативке.
M
Merge  
Michael Kolupaev 已提交
608 609 610
		for (auto it : rename_map)
		{
			if (!it.second.empty())
M
Merge  
Michael Kolupaev 已提交
611
			{
M
Merge  
Michael Kolupaev 已提交
612
				Poco::File(path + it.first).renameTo(path + it.second);
M
Merge  
Michael Kolupaev 已提交
613
			}
M
Merge  
Michael Kolupaev 已提交
614
		}
615 616 617 618

		DataPart & mutable_part = const_cast<DataPart &>(*data_part);
		mutable_part.checksums = new_checksums;
		mutable_part.columns = new_columns;
M
Merge  
Michael Kolupaev 已提交
619 620 621 622 623 624 625 626

		/// 3) Удалим старые файлы.
		for (auto it : rename_map)
		{
			String name = it.second.empty() ? it.first : it.second;
			Poco::File(path + name + ".tmp2").remove();
		}

M
Merge  
Michael Kolupaev 已提交
627 628
		mutable_part.size_in_bytes = MergeTreeData::DataPart::calcTotalSize(path);

M
Merge  
Michael Kolupaev 已提交
629 630 631
		/// TODO: можно не сбрасывать кеши при добавлении столбца.
		data_part->storage.context.resetCaches();

M
Merge  
Michael Kolupaev 已提交
632
		clear();
M
Merge  
Michael Kolupaev 已提交
633 634 635 636
	}
	catch (...)
	{
		/// Если что-то пошло не так, не будем удалять временные файлы в деструкторе.
M
Merge  
Michael Kolupaev 已提交
637
		clear();
M
Merge  
Michael Kolupaev 已提交
638
		throw;
M
Merge  
Michael Kolupaev 已提交
639
	}
M
Merge  
Michael Kolupaev 已提交
640
}
M
Merge  
Michael Kolupaev 已提交
641

M
Merge  
Michael Kolupaev 已提交
642
/** If the transaction still holds a part (i.e. it was not cleared), aborts the ALTER:
  *  removes the prepared temporary files, that is, every rename_map key that has a non-empty target.
  * Never throws: a failure to remove a single file is logged as a warning, anything else is logged
  *  via tryLogCurrentException.
  */
MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction()
{
	try
	{
		if (!data_part)
			return;

		LOG_WARNING(data_part->storage.log, "Aborting ALTER of part " << data_part->name);

		String path = data_part->storage.full_path + data_part->name + "/";
		for (auto rename : rename_map)
		{
			/// Entries with an empty target denote deletions of existing files; those files are left alone.
			if (rename.second.empty())
				continue;

			try
			{
				Poco::File tmp_file(path + rename.first);
				if (tmp_file.exists())
					tmp_file.remove();
			}
			catch (Poco::Exception & e)
			{
				LOG_WARNING(data_part->storage.log, "Can't remove " << path + rename.first << ": " << e.displayText());
			}
		}
	}
	catch (...)
	{
		tryLogCurrentException(__PRETTY_FUNCTION__);
	}
}


676
/** Renames a temporary part and adds it to the set, delegating to renameTempPartAndReplace.
  * The caller does not expect the new part to displace anything, so if the delegate
  *  reports displaced parts this is logged as an error (the displacement itself is not undone here).
  * Parameters are forwarded unchanged to renameTempPartAndReplace.
  */
void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, Increment * increment, Transaction * out_transaction)
{
	auto removed = renameTempPartAndReplace(part, increment, out_transaction);
	if (!removed.empty())
	{
		/// The original expression had a stray unary '+' before the " covers " literal; it is removed here.
		LOG_ERROR(log, "Added part " << part->name << " covers " << toString(removed.size())
			<< " existing part(s) (including " << removed[0]->name << ")");
	}
}

M
Merge  
Michael Kolupaev 已提交
686
/** Gives a temporary part its final name, renames its directory accordingly and registers the part.
  * Active parts that the new part contains are taken out of data_parts and returned in ascending order.
  * If an active part contains the new one (or has the same name), the new part is only put into all_data_parts.
  *
  * part            - the part to rename; its name, is_temp and (when increment is given) left/right are updated.
  * increment       - optional source of the part number.
  * out_transaction - optional; must not already be bound to a storage. On success it receives this storage,
  *                   the displaced parts as added_parts and the new part as removed_parts.
  *
  * Throws LOGICAL_ERROR when out_transaction is already in use and DUPLICATE_DATA_PART
  *  when a part with the final name is already present in data_parts.
  */
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
	MutableDataPartPtr & part, Increment * increment, Transaction * out_transaction)
{
	if (out_transaction && out_transaction->data)
		throw Exception("Using the same MergeTreeData::Transaction for overlapping transactions is invalid", ErrorCodes::LOGICAL_ERROR);

	LOG_TRACE(log, "Renaming " << part->name << ".");

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	const String temp_name = part->name;
	const String temp_path = getFullPath() + temp_name + "/";

	/** For StorageMergeTree it is important that obtaining the part number is atomic with adding the part to the set.
	  * Otherwise there is a race condition: a pair of parts whose number ranges
	  *  contain the not yet added part could get merged.
	  */
	if (increment)
		part->left = part->right = increment->get(false);

	const String final_name = ActiveDataPartSet::getPartName(part->left_date, part->right_date, part->left, part->right, part->level);

	/// Look the part up as if it already had its final identity, then put the temporary identity back.
	part->is_temp = false;
	part->name = final_name;
	const bool already_present = data_parts.count(part);
	part->name = temp_name;
	part->is_temp = true;

	if (already_present)
		throw Exception("Part " + final_name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);

	const String final_path = getFullPath() + final_name + "/";

	/// Rename the part directory.
	Poco::File(temp_path).renameTo(final_path);

	part->is_temp = false;
	part->name = final_name;

	bool covered_by_existing = false; /// Whether some existing part covers the new one.
	DataPartsVector displaced;

	/// Parts contained in the new part lie next to each other in data_parts, around the place where the new part would go.
	DataParts::iterator pos = data_parts.lower_bound(part);

	/// Go to the left. `pos` itself stays put; only the element just before it is examined and possibly erased.
	while (pos != data_parts.begin())
	{
		DataParts::iterator before = pos;
		--before;

		if (!part->contains(**before))
		{
			if ((*before)->contains(*part))
				covered_by_existing = true;
			break;
		}

		displaced.push_back(*before);
		(*before)->remove_time = time(0);
		removePartContributionToColumnSizes(*before);
		data_parts.erase(before);
	}

	/// The leftward walk collected the parts back to front; the result has to be in ascending order.
	std::reverse(displaced.begin(), displaced.end());

	/// Go to the right.
	while (pos != data_parts.end())
	{
		if (!part->contains(**pos))
		{
			if ((*pos)->name == part->name || (*pos)->contains(*part))
				covered_by_existing = true;
			break;
		}

		displaced.push_back(*pos);
		(*pos)->remove_time = time(0);
		removePartContributionToColumnSizes(*pos);
		data_parts.erase(pos++);
	}

	if (!covered_by_existing)
	{
		data_parts.insert(part);
		addPartContributionToColumnSizes(part);
	}
	else
	{
		LOG_WARNING(log, "Obsolete part " + part->name + " added");
	}

	all_data_parts.insert(part);

	if (out_transaction)
	{
		/// NOTE(review): the lists look inverted but presumably describe what undoing this call requires - confirm against Transaction.
		out_transaction->data = this;
		out_transaction->added_parts = displaced;
		out_transaction->removed_parts = DataPartsVector(1, part);
	}

	return displaced;
}

M
Merge  
Michael Kolupaev 已提交
784
void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataPartsVector & add, bool clear_without_timeout)
M
Merge  
Michael Kolupaev 已提交
785 786 787 788 789
{
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

	for (const DataPartPtr & part : remove)
	{
M
Merge  
Michael Kolupaev 已提交
790
		part->remove_time = clear_without_timeout ? 0 : time(0);
A
Merge  
Andrey Mironov 已提交
791
		removePartContributionToColumnSizes(part);
M
Merge  
Michael Kolupaev 已提交
792 793 794 795 796
		data_parts.erase(part);
	}
	for (const DataPartPtr & part : add)
	{
		data_parts.insert(part);
A
Merge  
Andrey Mironov 已提交
797
		addPartContributionToColumnSizes(part);
M
Merge  
Michael Kolupaev 已提交
798 799 800
	}
}

801
void MergeTreeData::attachPart(const DataPartPtr & part)
M
Merge  
Michael Kolupaev 已提交
802 803 804 805 806 807 808 809 810
{
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	if (!all_data_parts.insert(part).second)
		throw Exception("Part " + part->name + " is already attached", ErrorCodes::DUPLICATE_DATA_PART);
	data_parts.insert(part);
}

811
void MergeTreeData::renameAndDetachPart(const DataPartPtr & part, const String & prefix, bool restore_covered, bool move_to_detached)
M
Merge  
Michael Kolupaev 已提交
812
{
M
Merge  
Michael Kolupaev 已提交
813 814
	LOG_INFO(log, "Renaming " << part->name << " to " << prefix << part->name << " and detaching it.");

M
Merge  
Michael Kolupaev 已提交
815
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
M
Merge  
Michael Kolupaev 已提交
816
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
M
Merge  
Michael Kolupaev 已提交
817

M
Merge  
Michael Kolupaev 已提交
818
	if (!all_data_parts.erase(part))
M
Merge  
Michael Kolupaev 已提交
819
		throw Exception("No such data part", ErrorCodes::NO_SUCH_DATA_PART);
M
Merge  
Michael Kolupaev 已提交
820

A
Merge  
Andrey Mironov 已提交
821
	removePartContributionToColumnSizes(part);
M
Merge  
Michael Kolupaev 已提交
822
	data_parts.erase(part);
M
Merge  
Michael Kolupaev 已提交
823
	if (move_to_detached || !prefix.empty())
824
		part->renameAddPrefix(move_to_detached, prefix);
M
Merge  
Michael Kolupaev 已提交
825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841

	if (restore_covered)
	{
		auto it = all_data_parts.lower_bound(part);
		Strings restored;
		bool error = false;

		UInt64 pos = part->left;

		if (it != all_data_parts.begin())
		{
			--it;
			if (part->contains(**it))
			{
				if ((*it)->left != part->left)
					error = true;
				data_parts.insert(*it);
A
Merge  
Andrey Mironov 已提交
842
				addPartContributionToColumnSizes(*it);
M
Merge  
Michael Kolupaev 已提交
843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859
				pos = (*it)->right + 1;
				restored.push_back((*it)->name);
			}
			else
				error = true;
			++it;
		}
		else
			error = true;

		for (; it != all_data_parts.end() && part->contains(**it); ++it)
		{
			if ((*it)->left < pos)
				continue;
			if ((*it)->left > pos)
				error = true;
			data_parts.insert(*it);
A
Merge  
Andrey Mironov 已提交
860
			addPartContributionToColumnSizes(*it);
M
Merge  
Michael Kolupaev 已提交
861 862 863 864 865 866 867 868 869 870 871 872 873 874 875
			pos = (*it)->right + 1;
			restored.push_back((*it)->name);
		}

		if (pos != part->right + 1)
			error = true;

		for (const String & name : restored)
		{
			LOG_INFO(log, "Activated part " << name);
		}

		if (error)
			LOG_ERROR(log, "The set of parts restored in place of " << part->name << " looks incomplete. There might or might not be a data loss.");
	}
M
Merge  
Michael Kolupaev 已提交
876 877
}

878
void MergeTreeData::detachPartInPlace(const DataPartPtr & part)
M
Merge  
Michael Kolupaev 已提交
879 880 881 882
{
	renameAndDetachPart(part, "", false, false);
}

M
Merge  
Michael Kolupaev 已提交
883 884 885 886 887 888 889
/// Returns a copy of data_parts, made while data_parts_mutex is held.
MergeTreeData::DataParts MergeTreeData::getDataParts()
{
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

	DataParts snapshot(data_parts);
	return snapshot;
}

A
Merge  
Andrey Mironov 已提交
890 891 892 893
/// Returns the contents of data_parts as a vector, in the iteration order of the set; copied under data_parts_mutex.
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector()
{
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

	DataPartsVector snapshot(data_parts.begin(), data_parts.end());
	return snapshot;
}

M
Merge  
Michael Kolupaev 已提交
897 898 899 900 901 902 903
/// Returns a copy of all_data_parts, made while all_data_parts_mutex is held.
MergeTreeData::DataParts MergeTreeData::getAllDataParts()
{
	Poco::ScopedLock<Poco::FastMutex> lock(all_data_parts_mutex);

	DataParts snapshot(all_data_parts);
	return snapshot;
}

M
Merge  
Michael Kolupaev 已提交
904
size_t MergeTreeData::getMaxPartsCountForMonth()
M
Merge  
Michael Kolupaev 已提交
905 906 907
{
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

M
Merge  
Michael Kolupaev 已提交
908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927
	size_t res = 0;
	size_t cur_count = 0;
	DayNum_t cur_month = DayNum_t(0);

	for (const auto & part : data_parts)
	{
		if (part->left_month == cur_month)
		{
			++cur_count;
		}
		else
		{
			cur_month = part->left_month;
			cur_count = 1;
		}

		res = std::max(res, cur_count);
	}

	return res;
M
Merge  
Michael Kolupaev 已提交
928 929
}

930
/** Slows down or rejects an insert when one month has accumulated too many parts.
  * The delay is insert_delay_step raised to the number of parts above parts_to_delay_insert, taken as milliseconds.
  * If the delay exceeds DBMS_MAX_DELAY_OF_INSERT seconds the insert is rejected with TOO_MUCH_PARTS.
  * until - optional event; when given, the wait ends early if the event is signalled, otherwise the thread just sleeps.
  */
void MergeTreeData::delayInsertIfNeeded(Poco::Event * until)
{
	const size_t parts_count = getMaxPartsCountForMonth();

	/// Below the threshold there is nothing to do.
	if (!(parts_count > settings.parts_to_delay_insert))
		return;

	/// Seconds from here on.
	double delay = std::pow(settings.insert_delay_step, parts_count - settings.parts_to_delay_insert);
	delay /= 1000;

	if (delay > DBMS_MAX_DELAY_OF_INSERT)
	{
		ProfileEvents::increment(ProfileEvents::RejectedInserts);
		throw Exception("Too much parts. Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MUCH_PARTS);
	}

	ProfileEvents::increment(ProfileEvents::DelayedInserts);
	ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay * 1000);

	LOG_INFO(log, "Delaying inserting block by "
		<< std::fixed << std::setprecision(4) << delay << " sec. because there are " << parts_count << " parts");

	if (!until)
		std::this_thread::sleep_for(std::chrono::duration<double>(delay));
	else
		until->tryWait(delay * 1000);
}

M
Merge  
Michael Kolupaev 已提交
957
/** Finds the active part that has the given name or contains the part described by that name.
  * Returns nullptr when there is none.
  */
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name)
{
	/// A throw-away part object that only carries the parsed name; used as a search key.
	MutableDataPartPtr probe(new DataPart(*this));
	ActiveDataPartSet::parsePartName(part_name, *probe);

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

	/// A part can only be covered by the previous or the next element of data_parts.
	DataParts::iterator candidate = data_parts.lower_bound(probe);

	if (candidate != data_parts.end()
		&& ((*candidate)->name == part_name || (*candidate)->contains(*probe)))
	{
		return *candidate;
	}

	if (candidate == data_parts.begin())
		return nullptr;

	--candidate;
	if ((*candidate)->contains(*probe))
		return *candidate;

	return nullptr;
}

M
Merge  
Michael Kolupaev 已提交
985 986 987 988 989 990 991 992 993 994 995 996 997
/// Returns the part from all_data_parts whose name is exactly part_name, or nullptr if there is no such part.
MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_name)
{
	/// A throw-away part object that only carries the parsed name; used as a search key.
	MutableDataPartPtr probe(new DataPart(*this));
	ActiveDataPartSet::parsePartName(part_name, *probe);

	Poco::ScopedLock<Poco::FastMutex> lock(all_data_parts_mutex);

	DataParts::iterator found = all_data_parts.lower_bound(probe);
	if (found == all_data_parts.end() || !((*found)->name == part_name))
		return nullptr;

	return *found;
}

M
Merge  
Michael Kolupaev 已提交
998 999 1000 1001
/** Loads a part from the directory full_path + relative_path, repairing its metadata along the way:
  *  an existing columns.txt is deleted before the columns are loaded, and if the part has no checksums
  *  they are computed by MergeTreePartChecker and written to checksums.txt.
  * Returns the loaded part; it is not added to any set here.
  */
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const String & relative_path)
{
	const String part_path = full_path + relative_path;

	MutableDataPartPtr part = std::make_shared<DataPart>(*this);
	part->name = relative_path;
	ActiveDataPartSet::parsePartName(Poco::Path(relative_path).getFileName(), *part);

	/// The list of columns used to be written incorrectly. Delete it and create it anew.
	Poco::File columns_file(part_path + "/columns.txt");
	if (columns_file.exists())
		columns_file.remove();

	part->loadColumns(false);
	part->loadChecksums(false);
	part->loadIndex();
	part->checkNotBroken(false);

	part->modification_time = Poco::File(part_path).getLastModified().epochTime();

	/// Nothing more to do when the checksums were loaded.
	if (!part->checksums.empty())
		return part;

	/// There is no file with checksums: calculate them and write them down. The data gets checked along the way.
	MergeTreePartChecker::Settings checker_settings;
	checker_settings.setIndexGranularity(index_granularity);
	checker_settings.setRequireColumnFiles(true);
	MergeTreePartChecker::checkDataPart(part_path, checker_settings, context.getDataTypeFactory(), &part->checksums);

	const String checksums_tmp_path = part_path + "/checksums.txt.tmp";

	{
		/// The buffer lives in its own scope so that it is destroyed before the file is renamed.
		WriteBufferFromFile out(checksums_tmp_path, 4096);
		part->checksums.write(out);
	}

	Poco::File(checksums_tmp_path).renameTo(part_path + "/checksums.txt");

	return part;
}

1034

M
Merge  
Michael Kolupaev 已提交
1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064
/** Compares this checksum with rhs and throws if they differ.
  * When this checksum is for a compressed file and have_uncompressed is set, only the uncompressed size and hash
  *  are compared (rhs must then be compressed too); otherwise the size and hash of the file itself are compared.
  * name - file name used in the exception messages.
  */
void MergeTreeData::DataPart::Checksums::Checksum::checkEqual(const Checksum & rhs, bool have_uncompressed, const String & name) const
{
	const bool compare_uncompressed = is_compressed && have_uncompressed;

	if (!compare_uncompressed)
	{
		if (rhs.file_size != file_size)
			throw Exception("Unexpected size of file " + name + " in data part", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
		if (rhs.file_hash != file_hash)
			throw Exception("Checksum mismatch for file " + name + " in data part", ErrorCodes::CHECKSUM_DOESNT_MATCH);
		return;
	}

	if (!rhs.is_compressed)
		throw Exception("No uncompressed checksum for file " + name, ErrorCodes::CHECKSUM_DOESNT_MATCH);
	if (rhs.uncompressed_size != uncompressed_size)
		throw Exception("Unexpected size of file " + name + " in data part", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
	if (rhs.uncompressed_hash != uncompressed_hash)
		throw Exception("Checksum mismatch for file " + name + " in data part", ErrorCodes::CHECKSUM_DOESNT_MATCH);
}

/// Checks that the file at `path` exists and that its size on disk equals file_size; throws otherwise.
void MergeTreeData::DataPart::Checksums::Checksum::checkSize(const String & path) const
{
	Poco::File file(path);

	if (!file.exists())
		throw Exception(path + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);

	const size_t actual_size = file.getSize();
	if (actual_size == file_size)
		return;

	throw Exception(path + " has unexpected size: " + DB::toString(actual_size) + " instead of " + DB::toString(file_size),
		ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}

void MergeTreeData::DataPart::Checksums::checkEqual(const Checksums & rhs, bool have_uncompressed) const
1065
{
M
Merge  
Michael Kolupaev 已提交
1066
	for (const auto & it : rhs.files)
1067 1068 1069
	{
		const String & name = it.first;

M
Merge  
Michael Kolupaev 已提交
1070
		if (!files.count(name))
1071 1072 1073
			throw Exception("Unexpected file " + name + " in data part", ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART);
	}

M
Merge  
Michael Kolupaev 已提交
1074
	for (const auto & it : files)
1075 1076 1077
	{
		const String & name = it.first;

M
Merge  
Michael Kolupaev 已提交
1078 1079
		auto jt = rhs.files.find(name);
		if (jt == rhs.files.end())
1080 1081
			throw Exception("No file " + name + " in data part", ErrorCodes::NO_FILE_IN_DATA_PART);

M
Merge  
Michael Kolupaev 已提交
1082 1083 1084
		it.second.checkEqual(jt->second, have_uncompressed, name);
	}
}
M
Merge  
Michael Kolupaev 已提交
1085

M
Merge  
Michael Kolupaev 已提交
1086 1087 1088 1089 1090 1091
void MergeTreeData::DataPart::Checksums::checkSizes(const String & path) const
{
	for (const auto & it : files)
	{
		const String & name = it.first;
		it.second.checkSize(path + name);
1092 1093 1094
	}
}

1095
/** Reads checksums from `in`, replacing the current contents.
  * The stream starts with a text header carrying the format version; versions 2 to 4 are handed to the matching reader.
  * Returns false for version 1 (nothing is read beyond the header), otherwise the result of the version-specific reader.
  * Throws UNKNOWN_FORMAT for a version outside 1..4.
  */
bool MergeTreeData::DataPart::Checksums::read(ReadBuffer & in)
{
	files.clear();

	int format_version;
	DB::assertString("checksums format version: ", in);
	DB::readText(format_version, in);
	DB::assertString("\n", in);

	const bool version_known = format_version >= 1 && format_version <= 4;
	if (!version_known)
		throw Exception("Bad checksums format version: " + DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT);

	switch (format_version)
	{
		case 2:
			return read_v2(in);
		case 3:
			return read_v3(in);
		case 4:
			return read_v4(in);
		default:
			/// Version 1.
			return false;
	}
}

1119
/** Reads the body of the text format (version 2): the number of files, then for each file its name, size, hash,
  *  the compressed flag and, for compressed files, the uncompressed size and hash.
  * Entries are added to `files`; always returns true.
  */
bool MergeTreeData::DataPart::Checksums::read_v2(ReadBuffer & in)
{
	size_t count;

	DB::readText(count, in);
	DB::assertString(" files:\n", in);

	for (size_t remaining = count; remaining != 0; --remaining)
	{
		String file_name;
		Checksum checksum;

		DB::readString(file_name, in);

		DB::assertString("\n\tsize: ", in);
		DB::readText(checksum.file_size, in);

		/// A hash is written as two numbers separated by a space.
		DB::assertString("\n\thash: ", in);
		DB::readText(checksum.file_hash.first, in);
		DB::assertString(" ", in);
		DB::readText(checksum.file_hash.second, in);

		DB::assertString("\n\tcompressed: ", in);
		DB::readText(checksum.is_compressed, in);

		if (checksum.is_compressed)
		{
			DB::assertString("\n\tuncompressed size: ", in);
			DB::readText(checksum.uncompressed_size, in);

			DB::assertString("\n\tuncompressed hash: ", in);
			DB::readText(checksum.uncompressed_hash.first, in);
			DB::assertString(" ", in);
			DB::readText(checksum.uncompressed_hash.second, in);
		}

		DB::assertString("\n", in);

		files.emplace(file_name, checksum);
	}

	return true;
}

1157
/** Reads the body of the binary format (version 3): a varint file count, then for each file its name, size, hash,
  *  the compressed flag and, for compressed files, the uncompressed size and hash.
  * Entries are added to `files`; always returns true.
  */
bool MergeTreeData::DataPart::Checksums::read_v3(ReadBuffer & in)
{
	size_t count;
	DB::readVarUInt(count, in);

	for (size_t remaining = count; remaining != 0; --remaining)
	{
		String file_name;
		Checksum checksum;

		DB::readBinary(file_name, in);
		DB::readVarUInt(checksum.file_size, in);
		DB::readBinary(checksum.file_hash, in);
		DB::readBinary(checksum.is_compressed, in);

		/// The uncompressed fields are present in the stream only for compressed files.
		if (checksum.is_compressed)
		{
			DB::readVarUInt(checksum.uncompressed_size, in);
			DB::readBinary(checksum.uncompressed_hash, in);
		}

		files.emplace(std::move(file_name), checksum);
	}

	return true;
}

1185
/// Version 4 is the version 3 body read through a CompressedReadBuffer layered over `from`.
bool MergeTreeData::DataPart::Checksums::read_v4(ReadBuffer & from)
{
	CompressedReadBuffer decompressed{from};
	const bool result = read_v3(decompressed);
	return result;
}

/** Writes the checksums in format version 4: a text header line followed by the binary body
  *  (file count, then per file: name, size, hash, compressed flag and, for compressed files, uncompressed size and hash)
  *  passed through a CompressedWriteBuffer.
  * NOTE(review): nothing is flushed explicitly here; presumably the compressed buffer flushes on destruction - confirm.
  */
void MergeTreeData::DataPart::Checksums::write(WriteBuffer & to) const
{
	DB::writeString("checksums format version: 4\n", to);

	DB::CompressedWriteBuffer out{to, CompressionMethod::LZ4, 1 << 16};
	DB::writeVarUInt(files.size(), out);

	for (const auto & entry : files)
	{
		const Checksum & checksum = entry.second;

		DB::writeBinary(entry.first, out);
		DB::writeVarUInt(checksum.file_size, out);
		DB::writeBinary(checksum.file_hash, out);
		DB::writeBinary(checksum.is_compressed, out);

		/// The uncompressed fields are written only for compressed files.
		if (!checksum.is_compressed)
			continue;

		DB::writeVarUInt(checksum.uncompressed_size, out);
		DB::writeBinary(checksum.uncompressed_hash, out);
	}
}

A
Merge  
Andrey Mironov 已提交
1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257
void MergeTreeData::calculateColumnSizes()
{
	column_sizes.clear();

	for (const auto & part : data_parts)
		addPartContributionToColumnSizes(part);
}

void MergeTreeData::addPartContributionToColumnSizes(const DataPartPtr & part)
{
	const auto & files = part->checksums.files;

	for (const auto & column : *columns)
	{
		const auto escaped_name = escapeForFileName(column.name);
		const auto bin_file_name = escaped_name + ".bin";
		const auto mrk_file_name = escaped_name + ".mrk";

		auto & column_size = column_sizes[column.name];

		if (files.count(bin_file_name)) column_size += files.find(bin_file_name)->second.file_size;
		if (files.count(mrk_file_name)) column_size += files.find(mrk_file_name)->second.file_size;
	}
}

void MergeTreeData::removePartContributionToColumnSizes(const DataPartPtr & part)
{
	const auto & files = part->checksums.files;

	for (const auto & column : *columns)
	{
		const auto escaped_name = escapeForFileName(column.name);
		const auto bin_file_name = escaped_name + ".bin";
		const auto mrk_file_name = escaped_name + ".mrk";

		auto & column_size = column_sizes[column.name];

		if (files.count(bin_file_name)) column_size -= files.find(bin_file_name)->second.file_size;
		if (files.count(mrk_file_name)) column_size -= files.find(mrk_file_name)->second.file_size;
	}
}

A
Merge  
Alexey Milovidov 已提交
1258

1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292
void MergeTreeData::freezePartition(const std::string & prefix)
{
	LOG_DEBUG(log, "Freezing parts with prefix " + prefix);

	String clickhouse_path = Poco::Path(context.getPath()).makeAbsolute().toString();
	String shadow_path = clickhouse_path + "shadow/";
	Poco::File(shadow_path).createDirectories();
	String backup_path = shadow_path + toString(Increment(shadow_path + "increment.txt").get(true)) + "/";

	LOG_DEBUG(log, "Snapshot will be placed at " + backup_path);

	size_t parts_processed = 0;
	Poco::DirectoryIterator end;
	for (Poco::DirectoryIterator it(full_path); it != end; ++it)
	{
		if (0 == it.name().compare(0, prefix.size(), prefix))
		{
			LOG_DEBUG(log, "Freezing part " + it.name());

			String part_absolute_path = it.path().absolute().toString();
			if (0 != part_absolute_path.compare(0, clickhouse_path.size(), clickhouse_path))
				throw Exception("Part path " + part_absolute_path + " is not inside " + clickhouse_path, ErrorCodes::LOGICAL_ERROR);

			String backup_part_absolute_path = part_absolute_path;
			backup_part_absolute_path.replace(0, clickhouse_path.size(), backup_path);
			localBackup(part_absolute_path, backup_part_absolute_path);
			++parts_processed;
		}
	}

	LOG_DEBUG(log, "Freezed " << parts_processed << " parts");
}


A
Merge  
Alexey Milovidov 已提交
1293 1294 1295 1296 1297 1298 1299 1300 1301 1302
static std::pair<String, DayNum_t> getMonthNameAndDayNum(const Field & partition)
{
	String month_name = partition.getType() == Field::Types::UInt64
		? toString(partition.get<UInt64>())
		: partition.safeGet<String>();

	if (month_name.size() != 6 || !std::all_of(month_name.begin(), month_name.end(), isdigit))
		throw Exception("Invalid partition format: " + month_name + ". Partition should consist of 6 digits: YYYYMM",
			ErrorCodes::INVALID_PARTITION_NAME);

1303
	DayNum_t date = DateLUT::instance().YYYYMMDDToDayNum(parse<UInt32>(month_name + "01"));
A
Merge  
Alexey Milovidov 已提交
1304 1305

	/// Не можем просто сравнить date с нулем, потому что 0 тоже валидный DayNum.
1306
	if (month_name != toString(DateLUT::instance().toNumYYYYMMDD(date) / 100))
A
Merge  
Alexey Milovidov 已提交
1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323
		throw Exception("Invalid partition format: " + month_name + " doesn't look like month.",
			ErrorCodes::INVALID_PARTITION_NAME);

	return std::make_pair(month_name, date);
}


/// Returns the month name (YYYYMM) for the partition; validation is done by getMonthNameAndDayNum.
String MergeTreeData::getMonthName(const Field & partition)
{
	const auto name_and_day = getMonthNameAndDayNum(partition);
	return name_and_day.first;
}

/// Returns the DayNum of the first day of the partition's month; validation is done by getMonthNameAndDayNum.
DayNum_t MergeTreeData::getMonthDayNum(const Field & partition)
{
	const auto name_and_day = getMonthNameAndDayNum(partition);
	return name_and_day.second;
}

M
Merge  
Michael Kolupaev 已提交
1324
}