MergeTreeData.cpp 41.2 KB
Newer Older
M
Merge  
Michael Kolupaev 已提交
1
#include <Poco/Ext/ScopedTry.h>
2 3

#include <DB/Storages/MergeTree/MergeTreeData.h>
M
Merge  
Michael Kolupaev 已提交
4
#include <DB/Interpreters/ExpressionAnalyzer.h>
M
Merge  
Michael Kolupaev 已提交
5
#include <DB/Storages/MergeTree/MergeTreeReader.h>
M
Merge  
Michael Kolupaev 已提交
6 7
#include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
#include <DB/Storages/MergeTree/MergedBlockOutputStream.h>
M
Merge  
Michael Kolupaev 已提交
8
#include <DB/Storages/MergeTree/MergeTreePartChecker.h>
M
Merge  
Michael Kolupaev 已提交
9 10 11
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/DataStreams/ExpressionBlockInputStream.h>
M
Merge  
Michael Kolupaev 已提交
12
#include <DB/DataStreams/copyData.h>
M
Merge  
Michael Kolupaev 已提交
13
#include <DB/IO/WriteBufferFromFile.h>
14
#include <DB/IO/CompressedReadBuffer.h>
A
Merge  
Alexey Milovidov 已提交
15
#include <DB/DataTypes/DataTypeDate.h>
A
Merge  
Andrey Mironov 已提交
16
#include <DB/DataTypes/DataTypeFixedString.h>
17
#include <DB/Common/localBackup.h>
A
Andrey Mironov 已提交
18
#include <DB/Functions/FunctionFactory.h>
19
#include <Poco/DirectoryIterator.h>
20

P
Merge  
Pavel Kartavyy 已提交
21
#include <algorithm>
22
#include <iomanip>
23
#include <thread>
M
Merge  
Michael Kolupaev 已提交
24 25 26 27 28 29 30



namespace DB
{

/** Stores the table description and validates it:
  *  - the date column must be present among ordinary or materialized columns and have type Date;
  *  - columns_to_sum_ may be non-empty only in Summing mode and must name ordinary columns;
  *  - the primary key expression may be absent only in Unsorted mode.
  * The primary key AST is cloned rather than shared with the caller.
  * Also creates the data directory (with parents) and its "detached" subdirectory.
  */
MergeTreeData::MergeTreeData(
	const String & full_path_, NamesAndTypesListPtr columns_,
	const NamesAndTypesList & materialized_columns_,
	const NamesAndTypesList & alias_columns_,
	const ColumnDefaults & column_defaults_,
	const Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_, const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	Mode mode_,
	const String & sign_column_,
	const Names & columns_to_sum_,
	const MergeTreeSettings & settings_,
	const String & log_name_,
	bool require_part_metadata_,
	BrokenPartCallback broken_part_callback_)
	: ITableDeclaration{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
	date_column_name(date_column_name_), sampling_expression(sampling_expression_),
	index_granularity(index_granularity_),
	mode(mode_), sign_column(sign_column_), columns_to_sum(columns_to_sum_),
	settings(settings_), primary_expr_ast(primary_expr_ast_ ? primary_expr_ast_->clone() : nullptr),
	require_part_metadata(require_part_metadata_),
	full_path(full_path_), columns(columns_),
	broken_part_callback(broken_part_callback_),
	log_name(log_name_), log(&Logger::get(log_name + " (Data)"))
{
	/// Check that the date column exists and has type Date.
	/// Returns false if the column is not in the given list; throws if it is there with a wrong type.
	/// The parameter is deliberately not called `columns`, so it does not shadow the member.
	const auto check_date_exists = [this] (const NamesAndTypesList & candidate_columns)
	{
		for (const auto & column : candidate_columns)
		{
			if (column.name == date_column_name)
			{
				if (!typeid_cast<const DataTypeDate *>(column.type.get()))
					throw Exception("Date column (" + date_column_name + ") for storage of MergeTree family must have type Date."
						" Provided column of type " + column.type->getName() + "."
						" You may have separate column with type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
				return true;
			}
		}

		return false;
	};

	if (!check_date_exists(*columns) && !check_date_exists(materialized_columns))
		throw Exception{
			"Date column (" + date_column_name + ") does not exist in table declaration.",
			ErrorCodes::NO_SUCH_COLUMN_IN_TABLE};

	/// If columns_to_sum is specified, check that those columns exist.
	if (!columns_to_sum.empty())
	{
		if (mode != Summing)
			throw Exception("List of columns to sum for MergeTree cannot be specified in all modes except Summing.", ErrorCodes::LOGICAL_ERROR);

		for (const auto & column_to_sum : columns_to_sum)
			if (columns->end() == std::find_if(columns->begin(), columns->end(),
				[&](const NameAndTypePair & name_and_type) { return column_to_sum == name_and_type.name; }))
				/// Same error code as for a missing date column above.
				throw Exception("Column " + column_to_sum + " listed in columns to sum does not exist in table declaration.",
					ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
	}

	/// Create the directory if it does not exist.
	Poco::File(full_path).createDirectories();
	Poco::File(full_path + "detached").createDirectory();

	if (primary_expr_ast)
	{
		/// Initialize the sort description: one ascending entry per primary key element.
		sort_descr.reserve(primary_expr_ast->children.size());
		for (const ASTPtr & ast : primary_expr_ast->children)
		{
			String name = ast->getColumnName();
			sort_descr.push_back(SortColumnDescription(name, 1));
		}

		primary_expr = ExpressionAnalyzer(primary_expr_ast, context, getColumnsList()).getActions(false);

		/// A separate analyzer run with getActions(true) is used only to obtain the sample block of the key.
		ExpressionActionsPtr projected_expr = ExpressionAnalyzer(primary_expr_ast, context, getColumnsList()).getActions(true);
		primary_key_sample = projected_expr->getSampleBlock();
	}
	else if (mode != Unsorted)
		throw Exception("Primary key could be empty only for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS);
}
M
Merge  
Michael Kolupaev 已提交
113

M
Merge  
Michael Kolupaev 已提交
114 115
/// Returns the largest `right` value among the parts in data_parts, or 0 if there are no parts.
UInt64 MergeTreeData::getMaxDataPartIndex()
{
	UInt64 largest_index = 0;

	for (const auto & part : data_parts)
	{
		if (largest_index < part->right)
			largest_index = part->right;
	}

	return largest_index;
}

M
Merge  
Michael Kolupaev 已提交
123
std::string MergeTreeData::getModePrefix() const
M
Merge  
Michael Kolupaev 已提交
124
{
M
Merge  
Michael Kolupaev 已提交
125
	switch (mode)
M
Merge  
Michael Kolupaev 已提交
126
	{
M
Merge  
Michael Kolupaev 已提交
127 128 129
		case Ordinary: 		return "";
		case Collapsing: 	return "Collapsing";
		case Summing: 		return "Summing";
S
Merge  
Sergey Fedorov 已提交
130
		case Aggregating: 	return "Aggregating";
A
Merge  
Alexey Milovidov 已提交
131
		case Unsorted: 		return "Unsorted";
M
Merge  
Michael Kolupaev 已提交
132

M
Merge  
Michael Kolupaev 已提交
133 134
		default:
			throw Exception("Unknown mode of operation for MergeTreeData: " + toString(mode), ErrorCodes::LOGICAL_ERROR);
M
Merge  
Michael Kolupaev 已提交
135 136 137 138
	}
}


M
Merge  
Michael Kolupaev 已提交
139
void MergeTreeData::loadDataParts(bool skip_sanity_checks)
M
Merge  
Michael Kolupaev 已提交
140 141 142 143 144 145 146 147
{
	LOG_DEBUG(log, "Loading data parts");

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	data_parts.clear();

148
	Strings part_file_names;
M
Merge  
Michael Kolupaev 已提交
149 150
	Poco::DirectoryIterator end;
	for (Poco::DirectoryIterator it(full_path); it != end; ++it)
M
Merge  
Michael Kolupaev 已提交
151
	{
152 153
		/// Пропускаем временные директории старше суток.
		if (0 == it.name().compare(0, strlen("tmp_"), "tmp_"))
M
Merge  
Michael Kolupaev 已提交
154 155
			continue;

156
		part_file_names.push_back(it.name());
M
Merge  
Michael Kolupaev 已提交
157 158
	}

M
Merge  
Michael Kolupaev 已提交
159
	DataPartsVector broken_parts_to_remove;
M
Merge  
Michael Kolupaev 已提交
160 161
	DataPartsVector broken_parts_to_detach;
	size_t suspicious_broken_parts = 0;
M
Merge  
Michael Kolupaev 已提交
162

M
Merge  
Michael Kolupaev 已提交
163
	Poco::RegularExpression::MatchVec matches;
M
Merge  
Michael Kolupaev 已提交
164
	for (const String & file_name : part_file_names)
M
Merge  
Michael Kolupaev 已提交
165
	{
M
Merge  
Michael Kolupaev 已提交
166
		if (!ActiveDataPartSet::isPartDirectory(file_name, &matches))
M
Merge  
Michael Kolupaev 已提交
167 168
			continue;

M
Merge  
Michael Kolupaev 已提交
169
		MutableDataPartPtr part = std::make_shared<DataPart>(*this);
170
		ActiveDataPartSet::parsePartName(file_name, *part, &matches);
M
Merge  
Michael Kolupaev 已提交
171 172
		part->name = file_name;

M
Merge  
Michael Kolupaev 已提交
173 174 175 176
		bool broken = false;

		try
		{
M
Merge  
Michael Kolupaev 已提交
177 178
			part->loadColumns(require_part_metadata);
			part->loadChecksums(require_part_metadata);
M
Merge  
Michael Kolupaev 已提交
179
			part->loadIndex();
M
Merge  
Michael Kolupaev 已提交
180
			part->checkNotBroken(require_part_metadata);
M
Merge  
Michael Kolupaev 已提交
181 182 183 184 185 186 187
		}
		catch (...)
		{
			broken = true;
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}

M
Merge  
Michael Kolupaev 已提交
188
		/// Игнорируем и, возможно, удаляем битые куски, которые могут образовываться после грубого перезапуска сервера.
M
Merge  
Michael Kolupaev 已提交
189
		if (broken)
M
Merge  
Michael Kolupaev 已提交
190 191 192 193
		{
			if (part->level == 0)
			{
				/// Восстановить куски нулевого уровня невозможно.
194
				LOG_ERROR(log, "Removing broken part " << full_path + file_name << " because it's impossible to repair.");
M
Merge  
Michael Kolupaev 已提交
195
				broken_parts_to_remove.push_back(part);
M
Merge  
Michael Kolupaev 已提交
196 197 198
			}
			else
			{
M
Merge  
Michael Kolupaev 已提交
199 200 201 202 203
				/// Посмотрим, сколько кусков покрыты битым. Если хотя бы два, предполагаем, что битый кусок образован их
				///  слиянием, и мы ничего не потеряем, если его удалим.
				int contained_parts = 0;

				LOG_ERROR(log, "Part " << full_path + file_name << " is broken. Looking for parts to replace it.");
M
Merge  
Michael Kolupaev 已提交
204
				++suspicious_broken_parts;
M
Merge  
Michael Kolupaev 已提交
205 206 207 208 209

				for (const String & contained_name : part_file_names)
				{
					if (contained_name == file_name)
						continue;
M
Merge  
Michael Kolupaev 已提交
210
					if (!ActiveDataPartSet::isPartDirectory(contained_name, &matches))
M
Merge  
Michael Kolupaev 已提交
211 212
						continue;
					DataPart contained_part(*this);
213
					ActiveDataPartSet::parsePartName(contained_name, contained_part, &matches);
M
Merge  
Michael Kolupaev 已提交
214 215 216 217 218 219 220 221 222 223
					if (part->contains(contained_part))
					{
						LOG_ERROR(log, "Found part " << full_path + contained_name);
						++contained_parts;
					}
				}

				if (contained_parts >= 2)
				{
					LOG_ERROR(log, "Removing broken part " << full_path + file_name << " because it covers at least 2 other parts");
M
Merge  
Michael Kolupaev 已提交
224
					broken_parts_to_remove.push_back(part);
M
Merge  
Michael Kolupaev 已提交
225 226 227
				}
				else
				{
M
Merge  
Michael Kolupaev 已提交
228
					LOG_ERROR(log, "Detaching broken part " << full_path + file_name
M
Merge  
Michael Kolupaev 已提交
229
						<< " because it covers less than 2 parts. You need to resolve this manually");
M
Merge  
Michael Kolupaev 已提交
230
					broken_parts_to_detach.push_back(part);
M
Merge  
Michael Kolupaev 已提交
231
				}
M
Merge  
Michael Kolupaev 已提交
232 233 234 235 236 237 238 239 240 241
			}

			continue;
		}

		part->modification_time = Poco::File(full_path + file_name).getLastModified().epochTime();

		data_parts.insert(part);
	}

M
Merge  
Michael Kolupaev 已提交
242 243
	if (suspicious_broken_parts > 5 && !skip_sanity_checks)
		throw Exception("Suspiciously many (" + toString(suspicious_broken_parts) + ") broken parts to remove.",
M
Merge  
Michael Kolupaev 已提交
244 245 246 247
			ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);

	for (const auto & part : broken_parts_to_remove)
		part->remove();
M
Merge  
Michael Kolupaev 已提交
248
	for (const auto & part : broken_parts_to_detach)
249
		part->renameAddPrefix(true, "");
M
Merge  
Michael Kolupaev 已提交
250

M
Merge  
Michael Kolupaev 已提交
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
	all_data_parts = data_parts;

	/** Удаляем из набора актуальных кусков куски, которые содержатся в другом куске (которые были склеены),
	  *  но по каким-то причинам остались лежать в файловой системе.
	  * Удаление файлов будет произведено потом в методе clearOldParts.
	  */

	if (data_parts.size() >= 2)
	{
		DataParts::iterator prev_jt = data_parts.begin();
		DataParts::iterator curr_jt = prev_jt;
		++curr_jt;
		while (curr_jt != data_parts.end())
		{
			/// Куски данных за разные месяцы рассматривать не будем
			if ((*curr_jt)->left_month != (*curr_jt)->right_month
				|| (*curr_jt)->right_month != (*prev_jt)->left_month
				|| (*prev_jt)->left_month != (*prev_jt)->right_month)
			{
				++prev_jt;
				++curr_jt;
				continue;
			}

			if ((*curr_jt)->contains(**prev_jt))
			{
M
Merge  
Michael Kolupaev 已提交
277
				(*prev_jt)->remove_time = time(0);
M
Merge  
Michael Kolupaev 已提交
278 279 280 281 282 283
				data_parts.erase(prev_jt);
				prev_jt = curr_jt;
				++curr_jt;
			}
			else if ((*prev_jt)->contains(**curr_jt))
			{
M
Merge  
Michael Kolupaev 已提交
284
				(*curr_jt)->remove_time = time(0);
M
Merge  
Michael Kolupaev 已提交
285 286 287 288 289 290 291 292 293 294
				data_parts.erase(curr_jt++);
			}
			else
			{
				++prev_jt;
				++curr_jt;
			}
		}
	}

A
Merge  
Andrey Mironov 已提交
295 296
	calculateColumnSizes();

M
Merge  
Michael Kolupaev 已提交
297 298 299 300
	LOG_DEBUG(log, "Loaded data parts (" << data_parts.size() << " items)");
}


M
Merge  
Michael Kolupaev 已提交
301
/** Extracts from all_data_parts the parts that are referenced by nobody else and whose remove_time
  *  is more than settings.old_parts_lifetime in the past, and returns them.
  * Along the way, recursively deletes "tmp_*" directories last modified more than a day ago.
  * Does nothing and returns an empty vector if all_data_parts_mutex cannot be acquired immediately.
  */
MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
{
	Poco::ScopedTry<Poco::FastMutex> lock;
	DataPartsVector old_parts;

	/// If the method is already running in another thread (or all_data_parts is being changed right now), do nothing.
	if (!lock.lock(&all_data_parts_mutex))
	{
		LOG_TRACE(log, "grabOldParts: all_data_parts is locked");
		return old_parts;
	}

	/// Remove temporary directories older than a day.
	Poco::DirectoryIterator dir_end;
	for (Poco::DirectoryIterator dir_it{full_path}; dir_it != dir_end; ++dir_it)
	{
		if (0 != dir_it.name().compare(0, strlen("tmp_"), "tmp_"))
			continue;

		Poco::File tmp_dir(full_path + dir_it.name());

		if (tmp_dir.isDirectory() && tmp_dir.getLastModified().epochTime() + 86400 < time(0))
		{
			LOG_WARNING(log, "Removing temporary directory " << full_path << dir_it.name());
			tmp_dir.remove(true);
		}
	}

	const time_t now = time(0);
	DataParts::iterator part_it = all_data_parts.begin();
	while (part_it != all_data_parts.end())
	{
		const bool is_expired =
			part_it->unique() /// After this check the reference count cannot increase.
			&& (*part_it)->remove_time < now
			&& now - (*part_it)->remove_time > settings.old_parts_lifetime;

		if (!is_expired)
		{
			++part_it;
			continue;
		}

		old_parts.push_back(*part_it);
		all_data_parts.erase(part_it++);
	}

	return old_parts;
}

M
Merge  
Michael Kolupaev 已提交
346 347 348 349 350 351 352 353 354 355
/// Puts the given parts into all_data_parts under all_data_parts_mutex.
void MergeTreeData::addOldParts(const MergeTreeData::DataPartsVector & parts)
{
	Poco::ScopedLock<Poco::FastMutex> lock(all_data_parts_mutex);

	for (const auto & part : parts)
		all_data_parts.insert(part);
}

void MergeTreeData::clearOldParts()
{
	auto parts_to_remove = grabOldParts();

356
	for (const DataPartPtr & part : parts_to_remove)
M
Merge  
Michael Kolupaev 已提交
357 358 359 360 361 362
	{
		LOG_DEBUG(log, "Removing part " << part->name);
		part->remove();
	}
}

M
Merge  
Michael Kolupaev 已提交
363
void MergeTreeData::setPath(const String & new_full_path, bool move_data)
M
Merge  
Michael Kolupaev 已提交
364
{
M
Merge  
Michael Kolupaev 已提交
365 366
	if (move_data)
	{
A
Merge  
Andrey Mironov 已提交
367 368 369 370 371 372
		if (Poco::File{new_full_path}.exists())
			throw Exception{
				"Target path already exists: " + new_full_path,
				/// @todo existing target can also be a file, not directory
				ErrorCodes::DIRECTORY_ALREADY_EXISTS
			};
M
Merge  
Michael Kolupaev 已提交
373 374 375 376
		Poco::File(full_path).renameTo(new_full_path);
		/// Если данные перемещать не нужно, значит их переместил кто-то другой. Расчитываем, что он еще и сбросил кеши.
		context.resetCaches();
	}
377

M
Merge  
Michael Kolupaev 已提交
378
	full_path = new_full_path;
M
Merge  
Michael Kolupaev 已提交
379 380
}

M
Merge  
Michael Kolupaev 已提交
381
void MergeTreeData::dropAllData()
M
Merge  
Michael Kolupaev 已提交
382 383 384
{
	data_parts.clear();
	all_data_parts.clear();
A
Merge  
Andrey Mironov 已提交
385
	column_sizes.clear();
M
Merge  
Michael Kolupaev 已提交
386

387
	context.resetCaches();
388

M
Merge  
Michael Kolupaev 已提交
389 390 391 392
	Poco::File(full_path).remove(true);
}


M
Merge  
Michael Kolupaev 已提交
393
void MergeTreeData::checkAlter(const AlterCommands & params)
M
Merge  
Michael Kolupaev 已提交
394
{
M
Merge  
Michael Kolupaev 已提交
395
	/// Проверим, что указанные преобразования можно совершить над списком столбцов без учета типов.
396 397 398 399 400
	auto new_columns = *columns;
	auto new_materialized_columns = materialized_columns;
	auto new_alias_columns = alias_columns;
	auto new_column_defaults = column_defaults;
	params.apply(new_columns, new_materialized_columns, new_alias_columns, new_column_defaults);
M
Merge  
Michael Kolupaev 已提交
401

M
Merge  
Michael Kolupaev 已提交
402 403
	/// Список столбцов, которые нельзя трогать.
	/// sampling_expression можно не учитывать, потому что он обязан содержаться в первичном ключе.
A
Merge  
Alexey Milovidov 已提交
404 405 406 407 408 409

	Names keys;

	if (primary_expr)
		keys = primary_expr->getRequiredColumns();

M
Merge  
Michael Kolupaev 已提交
410
	keys.push_back(sign_column);
A
Merge  
Alexey Milovidov 已提交
411

M
Merge  
Michael Kolupaev 已提交
412
	std::sort(keys.begin(), keys.end());
M
Merge  
Michael Kolupaev 已提交
413

M
Merge  
Michael Kolupaev 已提交
414
	for (const AlterCommand & command : params)
415
	{
M
Merge  
Michael Kolupaev 已提交
416 417
		if (std::binary_search(keys.begin(), keys.end(), command.column_name))
			throw Exception("trying to ALTER key column " + command.column_name, ErrorCodes::ILLEGAL_COLUMN);
418 419
	}

M
Merge  
Michael Kolupaev 已提交
420 421 422
	/// Проверим, что преобразования типов возможны.
	ExpressionActionsPtr unused_expression;
	NameToNameMap unused_map;
423 424 425 426 427

	/// augment plain columns with materialized columns for convert expression creation
	new_columns.insert(std::end(new_columns),
		std::begin(new_materialized_columns), std::end(new_materialized_columns));
	createConvertExpression(nullptr, getColumnsList(), new_columns, unused_expression, unused_map);
M
Merge  
Michael Kolupaev 已提交
428 429
}

430
void MergeTreeData::createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
M
Merge  
Michael Kolupaev 已提交
431
	ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map)
M
Merge  
Michael Kolupaev 已提交
432
{
M
Merge  
Michael Kolupaev 已提交
433 434
	out_expression = nullptr;
	out_rename_map.clear();
M
Merge  
Michael Kolupaev 已提交
435

M
Merge  
Michael Kolupaev 已提交
436 437 438
	typedef std::map<String, DataTypePtr> NameToType;
	NameToType new_types;
	for (const NameAndTypePair & column : new_columns)
M
Merge  
Michael Kolupaev 已提交
439
	{
M
Merge  
Michael Kolupaev 已提交
440
		new_types[column.name] = column.type;
M
Merge  
Michael Kolupaev 已提交
441
	}
M
Merge  
Michael Kolupaev 已提交
442

M
Merge  
Michael Kolupaev 已提交
443 444 445
	/// Сколько столбцов сейчас в каждой вложенной структуре. Столбцы не из вложенных структур сюда тоже попадут и не помешают.
	std::map<String, int> nested_table_counts;
	for (const NameAndTypePair & column : old_columns)
M
Merge  
Michael Kolupaev 已提交
446
	{
M
Merge  
Michael Kolupaev 已提交
447
		++nested_table_counts[DataTypeNested::extractNestedTableName(column.name)];
M
Merge  
Michael Kolupaev 已提交
448
	}
449

M
Merge  
Michael Kolupaev 已提交
450
	for (const NameAndTypePair & column : old_columns)
M
Merge  
Michael Kolupaev 已提交
451
	{
M
Merge  
Michael Kolupaev 已提交
452
		if (!new_types.count(column.name))
453
		{
M
Merge  
Michael Kolupaev 已提交
454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474
			if (!part || part->hasColumnFiles(column.name))
			{
				/// Столбец нужно удалить.

				String escaped_column = escapeForFileName(column.name);
				out_rename_map[escaped_column + ".bin"] = "";
				out_rename_map[escaped_column + ".mrk"] = "";

				/// Если это массив или последний столбец вложенной структуры, нужно удалить файлы с размерами.
				if (typeid_cast<const DataTypeArray *>(&*column.type))
				{
					String nested_table = DataTypeNested::extractNestedTableName(column.name);
					/// Если это был последний столбец, относящийся к этим файлам .size0, удалим файлы.
					if (!--nested_table_counts[nested_table])
					{
						String escaped_nested_table = escapeForFileName(nested_table);
						out_rename_map[escaped_nested_table + ".size0.bin"] = "";
						out_rename_map[escaped_nested_table + ".size0.mrk"] = "";
					}
				}
			}
475
		}
M
Merge  
Michael Kolupaev 已提交
476 477
		else
		{
A
Merge  
Andrey Mironov 已提交
478 479
			const auto new_type = new_types[column.name].get();
			const String new_type_name = new_type->getName();
M
Merge  
Michael Kolupaev 已提交
480 481 482 483 484 485 486 487

			if (new_type_name != column.type->getName() &&
				(!part || part->hasColumnFiles(column.name)))
			{
				/// Нужно изменить тип столбца.

				if (!out_expression)
					out_expression = new ExpressionActions(NamesAndTypesList(), context.getSettingsRef());
M
Merge  
Michael Kolupaev 已提交
488

M
Merge  
Michael Kolupaev 已提交
489 490 491
				out_expression->addInput(ColumnWithNameAndType(nullptr, column.type, column.name));

				Names out_names;
A
Merge  
Andrey Mironov 已提交
492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511

				if (const auto fixed_string = typeid_cast<const DataTypeFixedString *>(new_type))
				{
					const auto width = fixed_string->getN();
					const auto string_width_column = toString(width);
					out_expression->addInput({ new ColumnConstUInt64{1, width}, new DataTypeUInt64, string_width_column });

					const auto function = FunctionFactory::instance().get("toFixedString", context);
					out_expression->add(ExpressionAction::applyFunction(function, Names{
						column.name, string_width_column
					}), out_names);

					out_expression->add(ExpressionAction::removeColumn(string_width_column));
				}
				else
				{
					const FunctionPtr & function = FunctionFactory::instance().get("to" + new_type_name, context);
					out_expression->add(ExpressionAction::applyFunction(function, Names{column.name}), out_names);
				}

M
Merge  
Michael Kolupaev 已提交
512 513
				out_expression->add(ExpressionAction::removeColumn(column.name));

A
Merge  
Andrey Mironov 已提交
514 515
				const String escaped_expr = escapeForFileName(out_names[0]);
				const String escaped_column = escapeForFileName(column.name);
M
Merge  
Michael Kolupaev 已提交
516 517 518 519
				out_rename_map[escaped_expr + ".bin"] = escaped_column + ".bin";
				out_rename_map[escaped_expr + ".mrk"] = escaped_column + ".mrk";
			}
		}
M
Merge  
Michael Kolupaev 已提交
520 521
	}
}
M
Merge  
Michael Kolupaev 已提交
522

523 524
/** Prepares the conversion of one part to new_columns.
  * Writes the converted columns, the updated checksums and the updated column list into temporary files
  *  inside the part directory and returns a transaction whose rename_map describes how to put them in place.
  * Returns nullptr if nothing has to be changed for this part.
  * Unless skip_sanity_checks is set, throws if more than 5 files would have to be modified.
  */
MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
	const DataPartPtr & part, const NamesAndTypesList & new_columns, bool skip_sanity_checks)
{
	ExpressionActionsPtr expression;
	AlterDataPartTransactionPtr transaction(new AlterDataPartTransaction(part)); /// Blocks changes to the part.
	createConvertExpression(part, part->columns, new_columns, expression, transaction->rename_map);

	/// Take the count before clear(), so the error message does not depend on what clear() resets.
	const size_t files_to_modify = transaction->rename_map.size();

	if (!skip_sanity_checks && files_to_modify > 5)
	{
		transaction->clear();

		throw Exception("Suspiciously many (" + toString(files_to_modify) + ") files need to be modified in part " + part->name
						+ ". Aborting just in case");
	}

	if (transaction->rename_map.empty())
	{
		transaction->clear();
		return nullptr;
	}

	DataPart::Checksums add_checksums;

	/// Apply the expression and write the result to temporary files.
	if (expression)
	{
		MarkRanges ranges(1, MarkRange(0, part->size));
		BlockInputStreamPtr part_in = new MergeTreeBlockInputStream(full_path + part->name + '/',
			DEFAULT_MERGE_BLOCK_SIZE, expression->getRequiredColumns(), *this, part, ranges, false, nullptr, "", false, 0, DBMS_DEFAULT_BUFFER_SIZE);
		ExpressionBlockInputStream in(part_in, expression);
		MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true, CompressionMethod::LZ4);
		in.readPrefix();
		out.writePrefix();

		while (Block b = in.read())
			out.write(b);

		in.readSuffix();
		add_checksums = out.writeSuffixAndGetChecksums();
	}

	/// Update the checksums: drop entries of deleted files, take entries of converted files from the new data.
	DataPart::Checksums new_checksums = part->checksums;
	for (const auto & rename : transaction->rename_map)
	{
		if (rename.second.empty())
			new_checksums.files.erase(rename.first);
		else
			new_checksums.files[rename.second] = add_checksums.files[rename.first];
	}

	/// Write the updated checksums to a temporary file (only if the part had checksums at all).
	if (!part->checksums.empty())
	{
		transaction->new_checksums = new_checksums;
		WriteBufferFromFile checksums_file(full_path + part->name + "/checksums.txt.tmp", 4096);
		new_checksums.write(checksums_file);
		transaction->rename_map["checksums.txt.tmp"] = "checksums.txt";
	}

	/// Write the updated list of columns to a temporary file.
	{
		transaction->new_columns = new_columns.filter(part->columns.getNames());
		WriteBufferFromFile columns_file(full_path + part->name + "/columns.txt.tmp", 4096);
		transaction->new_columns.writeText(columns_file);
		transaction->rename_map["columns.txt.tmp"] = "columns.txt";
	}

	return transaction;
}
M
Merge  
Michael Kolupaev 已提交
597

M
Merge  
Michael Kolupaev 已提交
598 599 600 601 602
void MergeTreeData::AlterDataPartTransaction::commit()
{
	if (!data_part)
		return;
	try
M
Merge  
Michael Kolupaev 已提交
603
	{
M
Merge  
Michael Kolupaev 已提交
604 605
		Poco::ScopedWriteRWLock lock(data_part->columns_lock);

M
Merge  
Michael Kolupaev 已提交
606
		String path = data_part->storage.full_path + data_part->name + "/";
M
Merge  
Michael Kolupaev 已提交
607 608

		/// 1) Переименуем старые файлы.
M
Merge  
Michael Kolupaev 已提交
609
		for (auto it : rename_map)
M
Merge  
Michael Kolupaev 已提交
610
		{
M
Merge  
Michael Kolupaev 已提交
611 612 613 614
			String name = it.second.empty() ? it.first : it.second;
			Poco::File(path + name).renameTo(path + name + ".tmp2");
		}

615
		/// 2) Переместим на их место новые и обновим метаданные в оперативке.
M
Merge  
Michael Kolupaev 已提交
616 617 618
		for (auto it : rename_map)
		{
			if (!it.second.empty())
M
Merge  
Michael Kolupaev 已提交
619
			{
M
Merge  
Michael Kolupaev 已提交
620
				Poco::File(path + it.first).renameTo(path + it.second);
M
Merge  
Michael Kolupaev 已提交
621
			}
M
Merge  
Michael Kolupaev 已提交
622
		}
623 624 625 626

		DataPart & mutable_part = const_cast<DataPart &>(*data_part);
		mutable_part.checksums = new_checksums;
		mutable_part.columns = new_columns;
M
Merge  
Michael Kolupaev 已提交
627 628 629 630 631 632 633 634

		/// 3) Удалим старые файлы.
		for (auto it : rename_map)
		{
			String name = it.second.empty() ? it.first : it.second;
			Poco::File(path + name + ".tmp2").remove();
		}

M
Merge  
Michael Kolupaev 已提交
635 636
		mutable_part.size_in_bytes = MergeTreeData::DataPart::calcTotalSize(path);

M
Merge  
Michael Kolupaev 已提交
637 638 639
		/// TODO: можно не сбрасывать кеши при добавлении столбца.
		data_part->storage.context.resetCaches();

M
Merge  
Michael Kolupaev 已提交
640
		clear();
M
Merge  
Michael Kolupaev 已提交
641 642 643 644
	}
	catch (...)
	{
		/// Если что-то пошло не так, не будем удалять временные файлы в деструкторе.
M
Merge  
Michael Kolupaev 已提交
645
		clear();
M
Merge  
Michael Kolupaev 已提交
646
		throw;
M
Merge  
Michael Kolupaev 已提交
647
	}
M
Merge  
Michael Kolupaev 已提交
648
}
M
Merge  
Michael Kolupaev 已提交
649

M
Merge  
Michael Kolupaev 已提交
650
/** If the transaction still holds a part when destroyed, the ALTER is considered aborted:
  *  the temporary files listed in rename_map are deleted from the part directory.
  * Never throws: failures are logged.
  */
MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction()
{
	try
	{
		if (!data_part)
			return;

		LOG_WARNING(data_part->storage.log, "Aborting ALTER of part " << data_part->name);

		const String part_dir = data_part->storage.full_path + data_part->name + "/";

		for (const auto & rename : rename_map)
		{
			/// An empty target marks a file scheduled for deletion, so there is no temporary file to clean up.
			if (rename.second.empty())
				continue;

			const String tmp_file_path = part_dir + rename.first;

			try
			{
				Poco::File tmp_file(tmp_file_path);
				if (tmp_file.exists())
					tmp_file.remove();
			}
			catch (Poco::Exception & e)
			{
				/// Failure to remove one file must not prevent the attempt to remove the others.
				LOG_WARNING(data_part->storage.log, "Can't remove " << tmp_file_path << ": " << e.displayText());
			}
		}
	}
	catch (...)
	{
		tryLogCurrentException(__PRETTY_FUNCTION__);
	}
}


684
/** Same as renameTempPartAndReplace, but the new part is not expected to cover any existing one.
  * If it does, the covered parts have still been replaced; the situation is only reported as an error in the log.
  * Parameters are forwarded unchanged to renameTempPartAndReplace.
  */
void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, Increment * increment, Transaction * out_transaction)
{
	auto removed = renameTempPartAndReplace(part, increment, out_transaction);
	if (!removed.empty())
	{
		/// A stray unary '+' before " covers " has been dropped; the logged text is unchanged.
		LOG_ERROR(log, "Added part " << part->name << " covers " << toString(removed.size())
			<< " existing part(s) (including " << removed[0]->name << ")");
	}
}

M
Merge  
Michael Kolupaev 已提交
694
/** Renames a temporary part to its final name and puts it into the working set,
  *  taking out of data_parts every part that the new one contains.
  *
  * part            - the temporary part; its name, is_temp and (when increment is given) left/right are updated.
  * increment       - if not null, a fresh number is taken from it and used as both left and right of the part.
  * out_transaction - if not null, it must not already be bound to a MergeTreeData (otherwise LOGICAL_ERROR);
  *                   on return it is filled with: data = this, added_parts = the replaced parts,
  *                   removed_parts = the new part.
  *                   NOTE(review): the field names presumably describe what a rollback has to do - confirm.
  *
  * Returns the replaced parts in ascending order.
  * Throws DUPLICATE_DATA_PART if a part with the final name is already in data_parts.
  */
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
	MutableDataPartPtr & part, Increment * increment, Transaction * out_transaction)
{
	if (out_transaction && out_transaction->data)
		throw Exception("Using the same MergeTreeData::Transaction for overlapping transactions is invalid", ErrorCodes::LOGICAL_ERROR);

	LOG_TRACE(log, "Renaming " << part->name << ".");

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	const String temp_name = part->name;
	const String temp_path = getFullPath() + temp_name + "/";

	/** For StorageMergeTree it is important that the part number is obtained atomically with adding the part to the set.
	  * Otherwise there is a race condition: a merge could happen for a pair of parts
	  *  whose number ranges contain the part that has not been added yet.
	  */
	if (increment)
		part->left = part->right = increment->get(false);

	const String final_name = ActiveDataPartSet::getPartName(part->left_date, part->right_date, part->left, part->right, part->level);

	/// Look the part up as if it already had its final identity, then put the temporary identity back.
	part->is_temp = false;
	part->name = final_name;
	const bool already_present = data_parts.count(part);
	part->name = temp_name;
	part->is_temp = true;

	if (already_present)
		throw Exception("Part " + final_name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);

	const String final_path = getFullPath() + final_name + "/";

	/// Rename the part on disk.
	Poco::File(temp_path).renameTo(final_path);

	part->is_temp = false;
	part->name = final_name;

	bool covered_by_existing = false; /// Whether the new part is itself covered by some existing part.
	DataPartsVector replaced;

	/// Records a covered part as replaced and takes it out of data_parts; leaves `pos` at the following element.
	auto retire = [&](DataParts::iterator & pos)
	{
		replaced.push_back(*pos);
		(*pos)->remove_time = time(0);
		removePartContributionToColumnSizes(*pos);
		data_parts.erase(pos++); /// Post-increment: step off the element before it is erased.
	};

	/// Parts contained in the new part are adjacent in data_parts and touch the place where the new part would be inserted.
	DataParts::iterator it = data_parts.lower_bound(part);

	/// Walk to the left.
	while (it != data_parts.begin())
	{
		--it;
		if (part->contains(**it))
		{
			retire(it);
			continue;
		}

		if ((*it)->contains(*part))
			covered_by_existing = true;
		++it;
		break;
	}

	/// The left walk collected parts in descending order; the result must be ascending.
	std::reverse(replaced.begin(), replaced.end());

	/// Walk to the right.
	while (it != data_parts.end())
	{
		if (part->contains(**it))
		{
			retire(it);
			continue;
		}

		if ((*it)->name == part->name || (*it)->contains(*part))
			covered_by_existing = true;
		break;
	}

	if (covered_by_existing)
	{
		LOG_WARNING(log, "Obsolete part " + part->name + " added");
	}
	else
	{
		data_parts.insert(part);
		addPartContributionToColumnSizes(part);
	}

	all_data_parts.insert(part);

	if (out_transaction)
	{
		out_transaction->data = this;
		out_transaction->added_parts = replaced;
		out_transaction->removed_parts = DataPartsVector(1, part);
	}

	return replaced;
}

M
Merge  
Michael Kolupaev 已提交
792
void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataPartsVector & add, bool clear_without_timeout)
M
Merge  
Michael Kolupaev 已提交
793 794 795 796 797
{
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

	for (const DataPartPtr & part : remove)
	{
M
Merge  
Michael Kolupaev 已提交
798
		part->remove_time = clear_without_timeout ? 0 : time(0);
A
Merge  
Andrey Mironov 已提交
799
		removePartContributionToColumnSizes(part);
M
Merge  
Michael Kolupaev 已提交
800 801 802 803 804
		data_parts.erase(part);
	}
	for (const DataPartPtr & part : add)
	{
		data_parts.insert(part);
A
Merge  
Andrey Mironov 已提交
805
		addPartContributionToColumnSizes(part);
M
Merge  
Michael Kolupaev 已提交
806 807 808
	}
}

809
void MergeTreeData::attachPart(const DataPartPtr & part)
M
Merge  
Michael Kolupaev 已提交
810 811 812 813 814 815 816 817 818
{
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	if (!all_data_parts.insert(part).second)
		throw Exception("Part " + part->name + " is already attached", ErrorCodes::DUPLICATE_DATA_PART);
	data_parts.insert(part);
}

819
void MergeTreeData::renameAndDetachPart(const DataPartPtr & part, const String & prefix, bool restore_covered, bool move_to_detached)
M
Merge  
Michael Kolupaev 已提交
820
{
M
Merge  
Michael Kolupaev 已提交
821 822
	LOG_INFO(log, "Renaming " << part->name << " to " << prefix << part->name << " and detaching it.");

M
Merge  
Michael Kolupaev 已提交
823
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
M
Merge  
Michael Kolupaev 已提交
824
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
M
Merge  
Michael Kolupaev 已提交
825

M
Merge  
Michael Kolupaev 已提交
826
	if (!all_data_parts.erase(part))
M
Merge  
Michael Kolupaev 已提交
827
		throw Exception("No such data part", ErrorCodes::NO_SUCH_DATA_PART);
M
Merge  
Michael Kolupaev 已提交
828

A
Merge  
Andrey Mironov 已提交
829
	removePartContributionToColumnSizes(part);
M
Merge  
Michael Kolupaev 已提交
830
	data_parts.erase(part);
M
Merge  
Michael Kolupaev 已提交
831
	if (move_to_detached || !prefix.empty())
832
		part->renameAddPrefix(move_to_detached, prefix);
M
Merge  
Michael Kolupaev 已提交
833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849

	if (restore_covered)
	{
		auto it = all_data_parts.lower_bound(part);
		Strings restored;
		bool error = false;

		UInt64 pos = part->left;

		if (it != all_data_parts.begin())
		{
			--it;
			if (part->contains(**it))
			{
				if ((*it)->left != part->left)
					error = true;
				data_parts.insert(*it);
A
Merge  
Andrey Mironov 已提交
850
				addPartContributionToColumnSizes(*it);
M
Merge  
Michael Kolupaev 已提交
851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867
				pos = (*it)->right + 1;
				restored.push_back((*it)->name);
			}
			else
				error = true;
			++it;
		}
		else
			error = true;

		for (; it != all_data_parts.end() && part->contains(**it); ++it)
		{
			if ((*it)->left < pos)
				continue;
			if ((*it)->left > pos)
				error = true;
			data_parts.insert(*it);
A
Merge  
Andrey Mironov 已提交
868
			addPartContributionToColumnSizes(*it);
M
Merge  
Michael Kolupaev 已提交
869 870 871 872 873 874 875 876 877 878 879 880 881 882 883
			pos = (*it)->right + 1;
			restored.push_back((*it)->name);
		}

		if (pos != part->right + 1)
			error = true;

		for (const String & name : restored)
		{
			LOG_INFO(log, "Activated part " << name);
		}

		if (error)
			LOG_ERROR(log, "The set of parts restored in place of " << part->name << " looks incomplete. There might or might not be a data loss.");
	}
M
Merge  
Michael Kolupaev 已提交
884 885
}

886
void MergeTreeData::detachPartInPlace(const DataPartPtr & part)
M
Merge  
Michael Kolupaev 已提交
887 888 889 890
{
	renameAndDetachPart(part, "", false, false);
}

M
Merge  
Michael Kolupaev 已提交
891 892 893 894 895 896 897
/// Returns a copy of data_parts taken under data_parts_mutex.
MergeTreeData::DataParts MergeTreeData::getDataParts()
{
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

	DataParts snapshot(data_parts);
	return snapshot;
}

A
Merge  
Andrey Mironov 已提交
898 899 900 901
/// Returns the contents of data_parts as a vector, in the set's iteration order; copied under data_parts_mutex.
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector()
{
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

	DataPartsVector snapshot(data_parts.begin(), data_parts.end());
	return snapshot;
}

M
Merge  
Michael Kolupaev 已提交
905 906 907 908 909 910 911
/// Returns a copy of all_data_parts taken under all_data_parts_mutex.
MergeTreeData::DataParts MergeTreeData::getAllDataParts()
{
	Poco::ScopedLock<Poco::FastMutex> lock(all_data_parts_mutex);

	DataParts snapshot(all_data_parts);
	return snapshot;
}

M
Merge  
Michael Kolupaev 已提交
912
size_t MergeTreeData::getMaxPartsCountForMonth()
M
Merge  
Michael Kolupaev 已提交
913 914 915
{
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

M
Merge  
Michael Kolupaev 已提交
916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935
	size_t res = 0;
	size_t cur_count = 0;
	DayNum_t cur_month = DayNum_t(0);

	for (const auto & part : data_parts)
	{
		if (part->left_month == cur_month)
		{
			++cur_count;
		}
		else
		{
			cur_month = part->left_month;
			cur_count = 1;
		}

		res = std::max(res, cur_count);
	}

	return res;
M
Merge  
Michael Kolupaev 已提交
936 937
}

938
/** Slows down an insert when a single month has accumulated too many parts.
  * With N = getMaxPartsCountForMonth(), nothing happens unless N > settings.parts_to_delay_insert.
  * Otherwise the delay in seconds is insert_delay_step ^ (N - parts_to_delay_insert) / 1000.
  * If the delay exceeds DBMS_MAX_DELAY_OF_INSERT, the insert is rejected with TOO_MUCH_PARTS.
  * until - if not null, the wait is done through until->tryWait so the event can cut it short;
  *         otherwise the calling thread sleeps for the whole delay.
  */
void MergeTreeData::delayInsertIfNeeded(Poco::Event * until)
{
	const size_t parts_count = getMaxPartsCountForMonth();
	if (!(parts_count > settings.parts_to_delay_insert))
		return;

	const auto excess_parts = parts_count - settings.parts_to_delay_insert;
	const double delay_unscaled = std::pow(settings.insert_delay_step, excess_parts);
	const double delay = delay_unscaled / 1000;

	if (delay > DBMS_MAX_DELAY_OF_INSERT)
	{
		ProfileEvents::increment(ProfileEvents::RejectedInserts);
		throw Exception("Too much parts. Merges are processing significantly slower than inserts.", ErrorCodes::TOO_MUCH_PARTS);
	}

	ProfileEvents::increment(ProfileEvents::DelayedInserts);
	ProfileEvents::increment(ProfileEvents::DelayedInsertsMilliseconds, delay * 1000);

	LOG_INFO(log, "Delaying inserting block by "
		<< std::fixed << std::setprecision(4) << delay << " sec. because there are " << parts_count << " parts");

	if (!until)
		std::this_thread::sleep_for(std::chrono::duration<double>(delay));
	else
		until->tryWait(delay * 1000);
}

M
Merge  
Michael Kolupaev 已提交
965
/** Finds a part in data_parts that either has exactly the given name or contains the part described by that name.
  * Returns nullptr if there is none.
  */
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name)
{
	/// A throwaway part object filled from the name, used only as a search key.
	MutableDataPartPtr probe(new DataPart(*this));
	ActiveDataPartSet::parsePartName(part_name, *probe);

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

	/// The part can be covered only by the previous or by the next element of data_parts.
	DataParts::iterator candidate = data_parts.lower_bound(probe);

	if (candidate != data_parts.end()
		&& ((*candidate)->name == part_name || (*candidate)->contains(*probe)))
		return *candidate;

	if (candidate == data_parts.begin())
		return nullptr;

	--candidate;
	if ((*candidate)->contains(*probe))
		return *candidate;

	return nullptr;
}

M
Merge  
Michael Kolupaev 已提交
993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005
/// Returns the part from all_data_parts whose name is exactly part_name, or nullptr if there is none.
MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_name)
{
	/// A throwaway part object filled from the name, used only as a search key.
	MutableDataPartPtr probe(new DataPart(*this));
	ActiveDataPartSet::parsePartName(part_name, *probe);

	Poco::ScopedLock<Poco::FastMutex> lock(all_data_parts_mutex);

	const DataParts::iterator found = all_data_parts.lower_bound(probe);
	const bool exact_match = (found != all_data_parts.end()) && ((*found)->name == part_name);
	if (!exact_match)
		return nullptr;

	return *found;
}

M
Merge  
Michael Kolupaev 已提交
1006 1007 1008 1009
/** Creates a part object for the directory full_path + relative_path and loads its metadata.
  * The part name is set to relative_path as given; its other fields are parsed from the last path component.
  * An existing columns.txt is deleted before loading.
  * If the part has no checksums after loading, they are computed by MergeTreePartChecker
  *  and written to checksums.txt through a temporary file.
  */
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const String & relative_path)
{
	MutableDataPartPtr part = std::make_shared<DataPart>(*this);
	part->name = relative_path;
	ActiveDataPartSet::parsePartName(Poco::Path(relative_path).getFileName(), *part);

	const String part_dir = full_path + relative_path;

	/// The list of columns used to be written incorrectly. Delete it so that it is created anew.
	Poco::File stale_columns(part_dir + "/columns.txt");
	if (stale_columns.exists())
		stale_columns.remove();

	part->loadColumns(false);
	part->loadChecksums(false);
	part->loadIndex();
	part->checkNotBroken(false);

	part->modification_time = Poco::File(part_dir).getLastModified().epochTime();

	/// If there is no file with checksums, calculate them and write the file. The data gets checked along the way.
	if (part->checksums.empty())
	{
		MergeTreePartChecker::Settings checker_settings;
		checker_settings.setIndexGranularity(index_granularity);
		checker_settings.setRequireColumnFiles(true);
		MergeTreePartChecker::checkDataPart(part_dir, checker_settings, context.getDataTypeFactory(), &part->checksums);

		const String tmp_checksums_path = part_dir + "/checksums.txt.tmp";

		{
			/// The buffer goes out of scope before the rename below.
			WriteBufferFromFile out(tmp_checksums_path, 4096);
			part->checksums.write(out);
		}

		Poco::File(tmp_checksums_path).renameTo(part_dir + "/checksums.txt");
	}

	return part;
}

1042

M
Merge  
Michael Kolupaev 已提交
1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072
/** Compares this checksum with rhs and throws on the first difference.
  * If this file is compressed and have_uncompressed is set, only the uncompressed size and hash are compared
  *  (and rhs must be compressed too); otherwise the on-disk file size and hash are compared.
  * name is used only in the exception messages.
  */
void MergeTreeData::DataPart::Checksums::Checksum::checkEqual(const Checksum & rhs, bool have_uncompressed, const String & name) const
{
	const bool compare_uncompressed = is_compressed && have_uncompressed;

	if (!compare_uncompressed)
	{
		if (rhs.file_size != file_size)
			throw Exception("Unexpected size of file " + name + " in data part", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
		if (rhs.file_hash != file_hash)
			throw Exception("Checksum mismatch for file " + name + " in data part", ErrorCodes::CHECKSUM_DOESNT_MATCH);
		return;
	}

	if (!rhs.is_compressed)
		throw Exception("No uncompressed checksum for file " + name, ErrorCodes::CHECKSUM_DOESNT_MATCH);
	if (rhs.uncompressed_size != uncompressed_size)
		throw Exception("Unexpected size of file " + name + " in data part", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
	if (rhs.uncompressed_hash != uncompressed_hash)
		throw Exception("Checksum mismatch for file " + name + " in data part", ErrorCodes::CHECKSUM_DOESNT_MATCH);
}

void MergeTreeData::DataPart::Checksums::Checksum::checkSize(const String & path) const
{
	Poco::File file(path);
	if (!file.exists())
		throw Exception(path + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
	size_t size = file.getSize();
	if (size != file_size)
		throw Exception(path + " has unexpected size: " + DB::toString(size) + " instead of " + DB::toString(file_size),
			ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}

void MergeTreeData::DataPart::Checksums::checkEqual(const Checksums & rhs, bool have_uncompressed) const
1073
{
M
Merge  
Michael Kolupaev 已提交
1074
	for (const auto & it : rhs.files)
1075 1076 1077
	{
		const String & name = it.first;

M
Merge  
Michael Kolupaev 已提交
1078
		if (!files.count(name))
1079 1080 1081
			throw Exception("Unexpected file " + name + " in data part", ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART);
	}

M
Merge  
Michael Kolupaev 已提交
1082
	for (const auto & it : files)
1083 1084 1085
	{
		const String & name = it.first;

M
Merge  
Michael Kolupaev 已提交
1086 1087
		auto jt = rhs.files.find(name);
		if (jt == rhs.files.end())
1088 1089
			throw Exception("No file " + name + " in data part", ErrorCodes::NO_FILE_IN_DATA_PART);

M
Merge  
Michael Kolupaev 已提交
1090 1091 1092
		it.second.checkEqual(jt->second, have_uncompressed, name);
	}
}
M
Merge  
Michael Kolupaev 已提交
1093

M
Merge  
Michael Kolupaev 已提交
1094 1095 1096 1097 1098 1099
/// For every recorded file, checks that path + file name exists and has the recorded size (see Checksum::checkSize).
void MergeTreeData::DataPart::Checksums::checkSizes(const String & path) const
{
	for (auto entry = files.begin(); entry != files.end(); ++entry)
		entry->second.checkSize(path + entry->first);
}

1103
/** Reads checksums from a buffer, replacing the current contents.
  * The input starts with the line "checksums format version: N"; versions 1 to 4 are accepted,
  *  anything else throws UNKNOWN_FORMAT.
  * Returns false for version 1 without reading further (files stays empty),
  *  otherwise the result of the reader for the given version.
  */
bool MergeTreeData::DataPart::Checksums::read(ReadBuffer & in)
{
	files.clear();

	DB::assertString("checksums format version: ", in);
	int format_version;
	DB::readText(format_version, in);
	DB::assertString("\n", in);

	switch (format_version)
	{
		case 1:
			return false;
		case 2:
			return read_v2(in);
		case 3:
			return read_v3(in);
		case 4:
			return read_v4(in);
		default:
			throw Exception("Bad checksums format version: " + DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT);
	}
}

1127
/** Reads the body of the text format (version 2): "<count> files:" followed by one record per file
  *  with its size, hash, compression flag and, for compressed files, the uncompressed size and hash.
  * Records are added to `files`. Always returns true; malformed input is reported by the read helpers.
  */
bool MergeTreeData::DataPart::Checksums::read_v2(ReadBuffer & in)
{
	size_t file_count;

	DB::readText(file_count, in);
	DB::assertString(" files:\n", in);

	for (size_t remaining = file_count; remaining > 0; --remaining)
	{
		String file_name;
		Checksum checksum;

		DB::readString(file_name, in);

		DB::assertString("\n\tsize: ", in);
		DB::readText(checksum.file_size, in);

		DB::assertString("\n\thash: ", in);
		DB::readText(checksum.file_hash.first, in);
		DB::assertString(" ", in);
		DB::readText(checksum.file_hash.second, in);

		DB::assertString("\n\tcompressed: ", in);
		DB::readText(checksum.is_compressed, in);

		/// The uncompressed fields are present only for compressed files.
		if (checksum.is_compressed)
		{
			DB::assertString("\n\tuncompressed size: ", in);
			DB::readText(checksum.uncompressed_size, in);

			DB::assertString("\n\tuncompressed hash: ", in);
			DB::readText(checksum.uncompressed_hash.first, in);
			DB::assertString(" ", in);
			DB::readText(checksum.uncompressed_hash.second, in);
		}

		DB::assertString("\n", in);

		files.insert(std::make_pair(file_name, checksum));
	}

	return true;
}

1165
/** Reads the body of the binary format (version 3): a varint record count, then for each file
  *  its name, size, hash, compression flag and, for compressed files, the uncompressed size and hash.
  * Records are added to `files`. Always returns true; malformed input is reported by the read helpers.
  */
bool MergeTreeData::DataPart::Checksums::read_v3(ReadBuffer & in)
{
	size_t file_count;

	DB::readVarUInt(file_count, in);

	for (size_t remaining = file_count; remaining > 0; --remaining)
	{
		String file_name;
		Checksum checksum;

		DB::readBinary(file_name, in);
		DB::readVarUInt(checksum.file_size, in);
		DB::readBinary(checksum.file_hash, in);
		DB::readBinary(checksum.is_compressed, in);

		/// The uncompressed fields are present only for compressed files.
		if (checksum.is_compressed)
		{
			DB::readVarUInt(checksum.uncompressed_size, in);
			DB::readBinary(checksum.uncompressed_hash, in);
		}

		files.emplace(std::move(file_name), checksum);
	}

	return true;
}

1193
/// Version 4 is the version 3 body read through a CompressedReadBuffer wrapped around `from`.
bool MergeTreeData::DataPart::Checksums::read_v4(ReadBuffer & from)
{
	CompressedReadBuffer decompressed{from};
	const bool result = read_v3(decompressed);
	return result;
}

/** Writes the checksums in format version 4: a plain-text version line,
  *  then the binary records (the layout read_v3 expects) through an LZ4 CompressedWriteBuffer.
  */
void MergeTreeData::DataPart::Checksums::write(WriteBuffer & to) const
{
	DB::writeString("checksums format version: 4\n", to);

	DB::CompressedWriteBuffer out{to, CompressionMethod::LZ4, 1 << 16};
	DB::writeVarUInt(files.size(), out);

	for (auto entry = files.begin(); entry != files.end(); ++entry)
	{
		const Checksum & checksum = entry->second;

		DB::writeBinary(entry->first, out);
		DB::writeVarUInt(checksum.file_size, out);
		DB::writeBinary(checksum.file_hash, out);
		DB::writeBinary(checksum.is_compressed, out);

		/// The uncompressed fields are written only for compressed files.
		if (!checksum.is_compressed)
			continue;

		DB::writeVarUInt(checksum.uncompressed_size, out);
		DB::writeBinary(checksum.uncompressed_hash, out);
	}
}

A
Merge  
Andrey Mironov 已提交
1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265
/** Recomputes column_sizes from scratch as the sum of the contributions of all parts in data_parts.
  * NOTE(review): no mutex is taken here - presumably the caller holds data_parts_mutex or runs single-threaded; confirm.
  */
void MergeTreeData::calculateColumnSizes()
{
	column_sizes.clear();

	for (auto it = data_parts.begin(); it != data_parts.end(); ++it)
		addPartContributionToColumnSizes(*it);
}

void MergeTreeData::addPartContributionToColumnSizes(const DataPartPtr & part)
{
	const auto & files = part->checksums.files;

	for (const auto & column : *columns)
	{
		const auto escaped_name = escapeForFileName(column.name);
		const auto bin_file_name = escaped_name + ".bin";
		const auto mrk_file_name = escaped_name + ".mrk";

		auto & column_size = column_sizes[column.name];

		if (files.count(bin_file_name)) column_size += files.find(bin_file_name)->second.file_size;
		if (files.count(mrk_file_name)) column_size += files.find(mrk_file_name)->second.file_size;
	}
}

void MergeTreeData::removePartContributionToColumnSizes(const DataPartPtr & part)
{
	const auto & files = part->checksums.files;

	for (const auto & column : *columns)
	{
		const auto escaped_name = escapeForFileName(column.name);
		const auto bin_file_name = escaped_name + ".bin";
		const auto mrk_file_name = escaped_name + ".mrk";

		auto & column_size = column_sizes[column.name];

		if (files.count(bin_file_name)) column_size -= files.find(bin_file_name)->second.file_size;
		if (files.count(mrk_file_name)) column_size -= files.find(mrk_file_name)->second.file_size;
	}
}

A
Merge  
Alexey Milovidov 已提交
1266

1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300
void MergeTreeData::freezePartition(const std::string & prefix)
{
	LOG_DEBUG(log, "Freezing parts with prefix " + prefix);

	String clickhouse_path = Poco::Path(context.getPath()).makeAbsolute().toString();
	String shadow_path = clickhouse_path + "shadow/";
	Poco::File(shadow_path).createDirectories();
	String backup_path = shadow_path + toString(Increment(shadow_path + "increment.txt").get(true)) + "/";

	LOG_DEBUG(log, "Snapshot will be placed at " + backup_path);

	size_t parts_processed = 0;
	Poco::DirectoryIterator end;
	for (Poco::DirectoryIterator it(full_path); it != end; ++it)
	{
		if (0 == it.name().compare(0, prefix.size(), prefix))
		{
			LOG_DEBUG(log, "Freezing part " + it.name());

			String part_absolute_path = it.path().absolute().toString();
			if (0 != part_absolute_path.compare(0, clickhouse_path.size(), clickhouse_path))
				throw Exception("Part path " + part_absolute_path + " is not inside " + clickhouse_path, ErrorCodes::LOGICAL_ERROR);

			String backup_part_absolute_path = part_absolute_path;
			backup_part_absolute_path.replace(0, clickhouse_path.size(), backup_path);
			localBackup(part_absolute_path, backup_part_absolute_path);
			++parts_processed;
		}
	}

	LOG_DEBUG(log, "Freezed " << parts_processed << " parts");
}


A
Merge  
Alexey Milovidov 已提交
1301 1302 1303 1304 1305 1306 1307 1308 1309 1310
static std::pair<String, DayNum_t> getMonthNameAndDayNum(const Field & partition)
{
	String month_name = partition.getType() == Field::Types::UInt64
		? toString(partition.get<UInt64>())
		: partition.safeGet<String>();

	if (month_name.size() != 6 || !std::all_of(month_name.begin(), month_name.end(), isdigit))
		throw Exception("Invalid partition format: " + month_name + ". Partition should consist of 6 digits: YYYYMM",
			ErrorCodes::INVALID_PARTITION_NAME);

1311
	DayNum_t date = DateLUT::instance().YYYYMMDDToDayNum(parse<UInt32>(month_name + "01"));
A
Merge  
Alexey Milovidov 已提交
1312 1313

	/// Не можем просто сравнить date с нулем, потому что 0 тоже валидный DayNum.
1314
	if (month_name != toString(DateLUT::instance().toNumYYYYMMDD(date) / 100))
A
Merge  
Alexey Milovidov 已提交
1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331
		throw Exception("Invalid partition format: " + month_name + " doesn't look like month.",
			ErrorCodes::INVALID_PARTITION_NAME);

	return std::make_pair(month_name, date);
}


/// Returns the YYYYMM name of the partition; throws INVALID_PARTITION_NAME if the value is not a valid month.
String MergeTreeData::getMonthName(const Field & partition)
{
	const std::pair<String, DayNum_t> name_and_day = getMonthNameAndDayNum(partition);
	return name_and_day.first;
}

/// Returns the DayNum of the first day of the partition's month; throws INVALID_PARTITION_NAME if the value is not a valid month.
DayNum_t MergeTreeData::getMonthDayNum(const Field & partition)
{
	const std::pair<String, DayNum_t> name_and_day = getMonthNameAndDayNum(partition);
	return name_and_day.second;
}

M
Merge  
Michael Kolupaev 已提交
1332
}