MergeTreeData.cpp 30.4 KB
Newer Older
M
Merge  
Michael Kolupaev 已提交
1
#include <DB/Storages/MergeTree/MergeTreeData.h>
M
Merge  
Michael Kolupaev 已提交
2
#include <Yandex/time2str.h>
M
Merge  
Michael Kolupaev 已提交
3
#include <Poco/Ext/ScopedTry.h>
M
Merge  
Michael Kolupaev 已提交
4
#include <DB/Interpreters/ExpressionAnalyzer.h>
M
Merge  
Michael Kolupaev 已提交
5
#include <DB/Storages/MergeTree/MergeTreeReader.h>
M
Merge  
Michael Kolupaev 已提交
6 7
#include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
#include <DB/Storages/MergeTree/MergedBlockOutputStream.h>
M
Merge  
Michael Kolupaev 已提交
8 9 10
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/DataStreams/ExpressionBlockInputStream.h>
M
Merge  
Michael Kolupaev 已提交
11
#include <DB/DataStreams/copyData.h>
M
Merge  
Michael Kolupaev 已提交
12 13 14 15 16 17



namespace DB
{

M
Merge  
Michael Kolupaev 已提交
18
/// Returns the trailing "parent/name" portion of a filesystem path; used to build short logger names.
/// One trailing '/' is ignored. If the path has fewer than two components (no '/', or only a leading '/'),
/// the path itself (without the trailing '/') is returned.
static String lastTwoPathComponents(String path)
{
	if (!path.empty() && path[path.size() - 1] == '/')
		path.erase(path.size() - 1);

	const size_t last_slash = path.rfind('/');
	if (last_slash == String::npos || last_slash == 0)
		return path;

	const size_t previous_slash = path.rfind('/', last_slash - 1);
	if (previous_slash == String::npos)
		return path;

	return path.substr(previous_slash + 1);
}
M
Merge  
Michael Kolupaev 已提交
30 31

MergeTreeData::MergeTreeData(
	const String & full_path_, NamesAndTypesListPtr columns_,
	const Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_, const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	Mode mode_,
	const String & sign_column_,
	const MergeTreeSettings & settings_)
	: context(context_),
	date_column_name(date_column_name_), sampling_expression(sampling_expression_),
	index_granularity(index_granularity_),
	mode(mode_), sign_column(sign_column_),
	settings(settings_), primary_expr_ast(primary_expr_ast_->clone()),
	full_path(full_path_), columns(columns_),
	log(&Logger::get("MergeTreeData: " + lastTwoPathComponents(full_path))),
	file_name_regexp("^(\\d{8})_(\\d{8})_(\\d+)_(\\d+)_(\\d+)")
{
	/// Make sure the table directory exists.
	Poco::File(full_path).createDirectories();

	/// Sort description: one entry per primary key expression, each with direction 1.
	const auto & key_elements = primary_expr_ast->children;
	sort_descr.reserve(key_elements.size());
	for (const auto & element : key_elements)
		sort_descr.push_back(SortColumnDescription(element->getColumnName(), 1));

	/// Actions that compute the primary key expression (getActions(false)).
	primary_expr = ExpressionAnalyzer(primary_expr_ast, context, *columns).getActions(false);

	/// getActions(true) is presumably the projected variant (only key columns kept);
	/// its sample block is stored as the primary key layout. TODO confirm.
	ExpressionActionsPtr projected_expr = ExpressionAnalyzer(primary_expr_ast, context, *columns).getActions(true);
	primary_key_sample = projected_expr->getSampleBlock();

	loadDataParts();
}
M
Merge  
Michael Kolupaev 已提交
67

M
Merge  
Michael Kolupaev 已提交
68 69
UInt64 MergeTreeData::getMaxDataPartIndex()
{
	/// Largest right block number among the active parts; 0 if there are none.
	/// NOTE(review): reads data_parts without taking data_parts_mutex — presumably callers hold it; confirm.
	UInt64 result = 0;
	for (const auto & part : data_parts)
	{
		if (part->right > result)
			result = part->right;
	}
	return result;
}

M
Merge  
Michael Kolupaev 已提交
78
std::string MergeTreeData::getModePrefix() const
M
Merge  
Michael Kolupaev 已提交
79
{
M
Merge  
Michael Kolupaev 已提交
80
	switch (mode)
M
Merge  
Michael Kolupaev 已提交
81
	{
M
Merge  
Michael Kolupaev 已提交
82 83 84
		case Ordinary: 		return "";
		case Collapsing: 	return "Collapsing";
		case Summing: 		return "Summing";
M
Merge  
Michael Kolupaev 已提交
85

M
Merge  
Michael Kolupaev 已提交
86 87
		default:
			throw Exception("Unknown mode of operation for MergeTreeData: " + toString(mode), ErrorCodes::LOGICAL_ERROR);
M
Merge  
Michael Kolupaev 已提交
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
	}
}




String MergeTreeData::getPartName(DayNum_t left_date, DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level)
{
	/// Part directory names have the form YYYYMMDD_YYYYMMDD_N_N_L:
	/// left date, right date, left block number, right block number, level.
	DateLUTSingleton & date_lut = DateLUTSingleton::instance();

	const unsigned left_date_id = Date2OrderedIdentifier(date_lut.fromDayNum(left_date));
	const unsigned right_date_id = Date2OrderedIdentifier(date_lut.fromDayNum(right_date));

	String name;
	{
		/// Scoped so the write buffer is finished with `name` before it is returned.
		WriteBufferFromString out(name);

		/// Appends "_<value>" to the name.
		auto append_component = [&out](UInt64 value)
		{
			writeChar('_', out);
			writeIntText(value, out);
		};

		writeIntText(left_date_id, out);
		append_component(right_date_id);
		append_component(left_id);
		append_component(right_id);
		append_component(level);
	}

	return name;
}


M
Merge  
Michael Kolupaev 已提交
121
void MergeTreeData::parsePartName(const String & file_name, DataPart & part, const Poco::RegularExpression::MatchVec * matches_p)
{
	/// Fills the date range, block numbers, level and months of `part` from its directory name.
	/// If `matches_p` is null, the name is matched here first; a non-part name throws BAD_DATA_PART_NAME.
	Poco::RegularExpression::MatchVec own_matches;
	if (matches_p == nullptr)
	{
		if (!isPartDirectory(file_name, own_matches))
			throw Exception("Unexpected part name: " + file_name, ErrorCodes::BAD_DATA_PART_NAME);
		matches_p = &own_matches;
	}

	const Poco::RegularExpression::MatchVec & matches = *matches_p;

	/// Text of the i-th capture group of file_name_regexp.
	auto group = [&](size_t i) { return file_name.substr(matches[i].offset, matches[i].length); };

	DateLUTSingleton & date_lut = DateLUTSingleton::instance();

	part.left_date = date_lut.toDayNum(OrderedIdentifier2Date(group(1)));
	part.right_date = date_lut.toDayNum(OrderedIdentifier2Date(group(2)));
	part.left = parse<UInt64>(group(3));
	part.right = parse<UInt64>(group(4));
	part.level = parse<UInt32>(group(5));

	part.left_month = date_lut.toFirstDayNumOfMonth(part.left_date);
	part.right_month = date_lut.toFirstDayNumOfMonth(part.right_date);
}


void MergeTreeData::loadDataParts()
{
	LOG_DEBUG(log, "Loading data parts");

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	data_parts.clear();

M
Merge  
Michael Kolupaev 已提交
155
	Strings all_file_names;
M
Merge  
Michael Kolupaev 已提交
156 157
	Poco::DirectoryIterator end;
	for (Poco::DirectoryIterator it(full_path); it != end; ++it)
M
Merge  
Michael Kolupaev 已提交
158
		all_file_names.push_back(it.name());
M
Merge  
Michael Kolupaev 已提交
159

M
Merge  
Michael Kolupaev 已提交
160 161 162
	Strings part_file_names;
	for (const String & file_name : all_file_names)
	{
M
Merge  
Michael Kolupaev 已提交
163 164 165 166 167
		/// Удаляем временные директории старше суток.
		if (0 == file_name.compare(0, strlen("tmp_"), "tmp_"))
			continue;

		if (0 == file_name.compare(0, strlen("old_"), "old_"))
M
Merge  
Michael Kolupaev 已提交
168 169 170 171 172 173
		{
			String new_file_name = file_name.substr(strlen("old_"));
			LOG_WARNING(log, "Renaming " << file_name << " to " << new_file_name << " for compatibility reasons");
			Poco::File(full_path + file_name).renameTo(full_path + new_file_name);
			part_file_names.push_back(new_file_name);
		}
M
Merge  
Michael Kolupaev 已提交
174
		else
M
Merge  
Michael Kolupaev 已提交
175
		{
M
Merge  
Michael Kolupaev 已提交
176
			part_file_names.push_back(file_name);
M
Merge  
Michael Kolupaev 已提交
177
		}
M
Merge  
Michael Kolupaev 已提交
178 179 180
	}

	Poco::RegularExpression::MatchVec matches;
M
Merge  
Michael Kolupaev 已提交
181
	for (const String & file_name : part_file_names)
M
Merge  
Michael Kolupaev 已提交
182 183 184 185
	{
		if (!isPartDirectory(file_name, matches))
			continue;

M
Merge  
Michael Kolupaev 已提交
186
		MutableDataPartPtr part = std::make_shared<DataPart>(*this);
M
Merge  
Michael Kolupaev 已提交
187
		parsePartName(file_name, *part, &matches);
M
Merge  
Michael Kolupaev 已提交
188 189
		part->name = file_name;

M
Merge  
Michael Kolupaev 已提交
190
		/// Игнорируем и, возможно, удаляем битые куски, которые могут образовываться после грубого перезапуска сервера.
M
Merge  
Michael Kolupaev 已提交
191 192 193 194 195
		if (isBrokenPart(full_path + file_name))
		{
			if (part->level == 0)
			{
				/// Восстановить куски нулевого уровня невозможно.
M
Merge  
Michael Kolupaev 已提交
196
				LOG_ERROR(log, "Removing broken part " << full_path + file_name << " because is't impossible to repair.");
M
Merge  
Michael Kolupaev 已提交
197 198 199 200
				part->remove();
			}
			else
			{
M
Merge  
Michael Kolupaev 已提交
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
				/// Посмотрим, сколько кусков покрыты битым. Если хотя бы два, предполагаем, что битый кусок образован их
				///  слиянием, и мы ничего не потеряем, если его удалим.
				int contained_parts = 0;

				LOG_ERROR(log, "Part " << full_path + file_name << " is broken. Looking for parts to replace it.");

				for (const String & contained_name : part_file_names)
				{
					if (contained_name == file_name)
						continue;
					if (!isPartDirectory(contained_name, matches))
						continue;
					DataPart contained_part(*this);
					parsePartName(contained_name, contained_part, &matches);
					if (part->contains(contained_part))
					{
						LOG_ERROR(log, "Found part " << full_path + contained_name);
						++contained_parts;
					}
				}

				if (contained_parts >= 2)
				{
					LOG_ERROR(log, "Removing broken part " << full_path + file_name << " because it covers at least 2 other parts");
					Poco::File(full_path + file_name).remove(true);
				}
				else
				{
					LOG_ERROR(log, "Not removing broken part " << full_path + file_name
						<< " because it covers less than 2 parts. You need to resolve this manually");
				}
M
Merge  
Michael Kolupaev 已提交
232 233 234 235 236 237 238 239 240 241
			}

			continue;
		}

		part->modification_time = Poco::File(full_path + file_name).getLastModified().epochTime();

		try
		{
			part->loadIndex();
242
			part->loadChecksums();
M
Merge  
Michael Kolupaev 已提交
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
		}
		catch (...)
		{
			/// Не будем вставлять в набор кусок с битым индексом. Пропустим кусок и позволим серверу запуститься.
			tryLogCurrentException(__PRETTY_FUNCTION__);
			continue;
		}

		data_parts.insert(part);
	}

	all_data_parts = data_parts;

	/** Удаляем из набора актуальных кусков куски, которые содержатся в другом куске (которые были склеены),
	  *  но по каким-то причинам остались лежать в файловой системе.
	  * Удаление файлов будет произведено потом в методе clearOldParts.
	  */

	if (data_parts.size() >= 2)
	{
		DataParts::iterator prev_jt = data_parts.begin();
		DataParts::iterator curr_jt = prev_jt;
		++curr_jt;
		while (curr_jt != data_parts.end())
		{
			/// Куски данных за разные месяцы рассматривать не будем
			if ((*curr_jt)->left_month != (*curr_jt)->right_month
				|| (*curr_jt)->right_month != (*prev_jt)->left_month
				|| (*prev_jt)->left_month != (*prev_jt)->right_month)
			{
				++prev_jt;
				++curr_jt;
				continue;
			}

			if ((*curr_jt)->contains(**prev_jt))
			{
M
Merge  
Michael Kolupaev 已提交
280 281
				LOG_INFO(log, "Part " << (*curr_jt)->name << " contains " << (*prev_jt)->name);
				(*prev_jt)->remove_time = time(0);
M
Merge  
Michael Kolupaev 已提交
282 283 284 285 286 287
				data_parts.erase(prev_jt);
				prev_jt = curr_jt;
				++curr_jt;
			}
			else if ((*prev_jt)->contains(**curr_jt))
			{
M
Merge  
Michael Kolupaev 已提交
288 289
				LOG_INFO(log, "Part " << (*prev_jt)->name << " contains " << (*curr_jt)->name);
				(*curr_jt)->remove_time = time(0);
M
Merge  
Michael Kolupaev 已提交
290 291 292 293 294 295 296 297 298 299 300 301 302 303
				data_parts.erase(curr_jt++);
			}
			else
			{
				++prev_jt;
				++curr_jt;
			}
		}
	}

	LOG_DEBUG(log, "Loaded data parts (" << data_parts.size() << " items)");
}


M
Merge  
Michael Kolupaev 已提交
304
Strings MergeTreeData::clearOldParts()
{
	/// Deletes inactive parts whose lifetime has expired and that nobody else references,
	/// plus "tmp_" directories older than one day. Returns the names of the deleted parts.
	Poco::ScopedTry<Poco::FastMutex> lock;
	Strings removed_names;

	/// If another thread is already doing this (or all_data_parts is being modified right now), do nothing.
	if (!lock.lock(&all_data_parts_mutex))
		return removed_names;

	const time_t now = time(0);
	DataParts::iterator it = all_data_parts.begin();
	while (it != all_data_parts.end())
	{
		/// use_count() == 1 means only all_data_parts holds the part; after that the count cannot grow.
		const bool unreferenced = it->use_count() == 1;
		if (!unreferenced || (*it)->remove_time + settings.old_parts_lifetime >= now)
		{
			++it;
			continue;
		}

		LOG_DEBUG(log, "Removing part " << (*it)->name);
		removed_names.push_back((*it)->name);
		(*it)->remove();
		all_data_parts.erase(it++);
	}

	/// Remove temporary directories older than one day.
	Strings all_file_names;
	Poco::DirectoryIterator end;
	for (Poco::DirectoryIterator entry(full_path); entry != end; ++entry)
		all_file_names.push_back(entry.name());

	for (const String & file_name : all_file_names)
	{
		if (0 != file_name.compare(0, strlen("tmp_"), "tmp_"))
			continue;

		Poco::File tmp_dir(full_path + file_name);
		if (tmp_dir.isDirectory() && tmp_dir.getLastModified().epochTime() + 86400 < time(0))
		{
			LOG_WARNING(log, "Removing temporary directory " << full_path << file_name);
			tmp_dir.remove(true);
		}
	}

	return removed_names;
}

M
Merge  
Michael Kolupaev 已提交
357
void MergeTreeData::setPath(const String & new_full_path)
M
Merge  
Michael Kolupaev 已提交
358
{
359
	Poco::File(full_path).renameTo(new_full_path);
M
Merge  
Michael Kolupaev 已提交
360
	full_path = new_full_path;
361

362
	context.resetCaches();
363

M
Merge  
Michael Kolupaev 已提交
364
	log = &Logger::get(lastTwoPathComponents(full_path));
M
Merge  
Michael Kolupaev 已提交
365 366
}

M
Merge  
Michael Kolupaev 已提交
367
void MergeTreeData::dropAllData()
M
Merge  
Michael Kolupaev 已提交
368 369 370 371
{
	data_parts.clear();
	all_data_parts.clear();

372
	context.resetCaches();
373

M
Merge  
Michael Kolupaev 已提交
374 375 376
	Poco::File(full_path).remove(true);
}

377
void MergeTreeData::removeColumnFiles(String column_name, bool remove_array_size_files)
{
	/// Deletes the data/mark files of column_name from every part directory.
	/// For a nested subcolumn "n.x" with remove_array_size_files set, the array-size files of "n" go too.
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	/// Column files are named with escapeForFileName(column name), the same escaping used for .mrk/.bin
	/// names elsewhere in this file. Escaping turns '.' into "%2E" (previously only the first dot was
	/// replaced by hand, so names with several dots or other escaped characters never matched).
	/// Presumably escaped names contain no regex metacharacters — TODO confirm against escapeForFileName.
	String name_pattern = escapeForFileName(column_name);

	size_t dot_pos = column_name.find('.');
	if (dot_pos != std::string::npos && remove_array_size_files)
	{
		String nested_pattern = escapeForFileName(column_name.substr(0, dot_pos));
		name_pattern = "(?:" + nested_pattern + "|" + name_pattern + ")";
	}

	/// The regex selects the column's files to delete.
	Poco::RegularExpression re(name_pattern + "(?:(?:\\.|\\%2E).+){0,1}" + "(?:\\.mrk|\\.bin|\\.size\\d+\\.bin|\\.size\\d+\\.mrk)");

	/// Walk over all part directories.
	Poco::RegularExpression::MatchVec matches;
	Poco::DirectoryIterator end;
	for (Poco::DirectoryIterator it_dir(full_path); it_dir != end; ++it_dir)
	{
		const std::string dir_name = it_dir.name();
		if (!isPartDirectory(dir_name, matches))
			continue;

		/// Walk over the files of this part.
		const String full_dir_name = full_path + dir_name + "/";
		for (Poco::DirectoryIterator it_file(full_dir_name); it_file != end; ++it_file)
		{
			if (!re.match(it_file.name()))
				continue;

			Poco::File file(full_dir_name + it_file.name());
			if (file.exists())
				file.remove();
		}
	}
}

void MergeTreeData::createConvertExpression(const String & in_column_name, const String & out_type, ExpressionActionsPtr & out_expression, String & out_column)
{
M
Merge  
Michael Kolupaev 已提交
420 421 422
	Names out_names;
	out_expression = new ExpressionActions(
		NamesAndTypesList(1, NameAndTypePair(in_column_name, getDataTypeByName(in_column_name))), context.getSettingsRef());
M
Merge  
Michael Kolupaev 已提交
423

M
Merge  
Michael Kolupaev 已提交
424 425 426
	FunctionPtr function = context.getFunctionFactory().get("to" + out_type, context);
	out_expression->add(ExpressionActions::Action::applyFunction(function, Names(1, in_column_name)), out_names);
	out_expression->add(ExpressionActions::Action::removeColumn(in_column_name));
M
Merge  
Michael Kolupaev 已提交
427

M
Merge  
Michael Kolupaev 已提交
428
	out_column = out_names[0];
M
Merge  
Michael Kolupaev 已提交
429 430
}

M
Merge  
Michael Kolupaev 已提交
431 432 433 434 435 436 437 438 439 440
/// Returns the type of column `name` in `columns`; throws NO_SUCH_COLUMN_IN_TABLE if it is absent.
static DataTypePtr getDataTypeByName(const String & name, const NamesAndTypesList & columns)
{
	for (NamesAndTypesList::const_iterator column = columns.begin(); column != columns.end(); ++column)
		if (column->first == name)
			return column->second;

	throw Exception("No column " + name + " in table", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
}

441
/// одинаковыми считаются имена, вида "name.*"
M
Merge  
Michael Kolupaev 已提交
442
static bool namesWithDotEqual(const String & name_with_dot, const NameAndTypePair & name_type)
443 444 445 446
{
	return (name_with_dot == name_type.first.substr(0, name_with_dot.length()));
}

M
Merge  
Michael Kolupaev 已提交
447 448 449
void MergeTreeData::alter(const ASTAlterQuery::Parameters & params)
{
	/// Apply the change to the in-memory column list under both part-set locks.
	{
		Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
		Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
		alterColumns(params, columns, context);
	}

	/// Only DROP needs on-disk work here: delete the dropped column's files from every part.
	if (params.type != ASTAlterQuery::DROP)
		return;

	const String column_name = dynamic_cast<const ASTIdentifier &>(*params.column).name;

	/// For a nested subcolumn "n.x": if no column of "n" remains, also delete the array-size files of "n".
	bool remove_array_size_files = false;
	const size_t dot_pos = column_name.find('.');
	if (dot_pos != std::string::npos)
	{
		const String nested_name = column_name.substr(0, dot_pos);
		auto belongs_to_nested = [&nested_name](const NameAndTypePair & column)
		{
			return namesWithDotEqual(nested_name, column);
		};
		remove_array_size_files = std::find_if(columns->begin(), columns->end(), belongs_to_nested) == columns->end();
	}
	removeColumnFiles(column_name, remove_array_size_files);

	context.resetCaches();
}
M
Merge  
Michael Kolupaev 已提交
471

M
Merge  
Michael Kolupaev 已提交
472 473 474 475 476 477 478
void MergeTreeData::prepareAlterModify(const ASTAlterQuery::Parameters & params)
{
	/// First phase of ALTER MODIFY: for every active part, read the column, convert it, and write the result
	/// to files named after the conversion's output column. commitAlterModify swaps them in afterwards.
	DataPartsVector parts;
	{
		Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
		parts = DataPartsVector(data_parts.begin(), data_parts.end());
	}

	const ASTNameTypePair & name_type = dynamic_cast<const ASTNameTypePair &>(*params.name_type);
	StringRange type_range = name_type.type->range;
	String type(type_range.first, type_range.second - type_range.first);

	/// Neither the old nor the new type may be an array or a nested structure.
	DataTypePtr old_type_ptr = DB::getDataTypeByName(name_type.name, *columns);
	DataTypePtr new_type_ptr = context.getDataTypeFactory().get(type);
	auto is_compound = [](const DataTypePtr & data_type)
	{
		return dynamic_cast<DataTypeNested *>(data_type.get()) || dynamic_cast<DataTypeArray *>(data_type.get());
	};
	if (is_compound(old_type_ptr) || is_compound(new_type_ptr))
		throw Exception("ALTER MODIFY not supported for nested and array types");

	Names column_names(1, name_type.name);
	ExpressionActionsPtr expr;
	String out_column;
	createConvertExpression(name_type.name, type, expr, out_column);

	for (DataPartPtr & part : parts)
	{
		const String part_path = full_path + part->name + '/';

		MarkRanges ranges(1, MarkRange(0, part->size));
		ExpressionBlockInputStream in(new MergeTreeBlockInputStream(part_path,
			DEFAULT_MERGE_BLOCK_SIZE, column_names, *this, part, ranges, false, nullptr, ""), expr);
		MergedColumnOnlyOutputStream out(*this, part_path, true);
		in.readPrefix();
		out.writePrefix();

		try
		{
			while (Block block = in.read())
				out.write(block);

			in.readSuffix();
			DataPart::Checksums add_checksums = out.writeSuffixAndGetChecksums();

			/// Write the updated checksums to a temporary file "<escaped out_column>.checksums.txt".
			if (!part->checksums.empty())
			{
				DataPart::Checksums new_checksums = part->checksums;
				const std::string escaped_name = escapeForFileName(name_type.name);
				const std::string escaped_out_column = escapeForFileName(out_column);
				new_checksums.files[escaped_name + ".bin"] = add_checksums.files[escaped_out_column + ".bin"];
				new_checksums.files[escaped_name + ".mrk"] = add_checksums.files[escaped_out_column + ".mrk"];

				WriteBufferFromFile checksums_file(part_path + escaped_out_column + ".checksums.txt", 1024);
				new_checksums.writeText(checksums_file);
			}
		}
		catch (const Exception & e)
		{
			/// Presumably raised when the part has no files for the column; such parts are skipped.
			if (e.code() != ErrorCodes::ALL_REQUESTED_COLUMNS_ARE_MISSING)
				throw;
		}
	}
}

void MergeTreeData::commitAlterModify(const ASTAlterQuery::Parameters & params)
{
	/// Second phase of ALTER MODIFY: in every active part, replace the column's files (and checksums.txt)
	/// with the converted temporary files written by prepareAlterModify, then update the column list.
	DataPartsVector parts;
	{
		Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
		parts = DataPartsVector(data_parts.begin(), data_parts.end());
	}

	const ASTNameTypePair & name_type = dynamic_cast<const ASTNameTypePair &>(*params.name_type);
	StringRange type_range = name_type.type->range;
	String type(type_range.first, type_range.second - type_range.first);

	/// Built only to learn the name of the temporary column that prepareAlterModify wrote.
	ExpressionActionsPtr expr;
	String out_column;
	createConvertExpression(name_type.name, type, expr, out_column);

	const std::string escaped_name = escapeForFileName(name_type.name);
	const std::string escaped_out_column = escapeForFileName(out_column);

	/// Renames a file and logs the real source and destination (one log line previously printed a wrong target).
	auto rename_file = [this](const std::string & from, const std::string & to)
	{
		LOG_TRACE(log, "Renaming " << from << " to " << to);
		Poco::File(from).renameTo(to);
	};

	/// 1. Move the current column files and checksums.txt aside with an ".old" suffix.
	for (DataPartPtr & part : parts)
	{
		const std::string part_path = full_path + part->name + '/';
		const std::string path = part_path + escaped_name;
		if (!Poco::File(path + ".bin").exists())
			continue;

		rename_file(path + ".bin", path + ".bin" + ".old");
		rename_file(path + ".mrk", path + ".mrk" + ".old");

		if (Poco::File(part_path + "checksums.txt").exists())
			rename_file(part_path + "checksums.txt", part_path + "checksums.txt" + ".old");
	}

	/// 2. Move the converted temporary files (and their checksums) into place.
	for (DataPartPtr & part : parts)
	{
		const std::string part_path = full_path + part->name + '/';
		const std::string path = part_path + escaped_out_column;
		const std::string new_path = part_path + escaped_name;
		if (!Poco::File(path + ".bin").exists())
			continue;

		rename_file(path + ".bin", new_path + ".bin");
		rename_file(path + ".mrk", new_path + ".mrk");

		if (Poco::File(path + ".checksums.txt").exists())
			rename_file(path + ".checksums.txt", part_path + "checksums.txt");
	}

	/// 3. Delete the ".old" files.
	for (DataPartPtr & part : parts)
	{
		const std::string part_path = full_path + part->name + '/';
		const std::string path = part_path + escaped_name;
		if (!Poco::File(path + ".bin" + ".old").exists())
			continue;

		LOG_TRACE(log, "Removing old column " << path + ".bin" + ".old");
		Poco::File(path + ".bin" + ".old").remove();
		LOG_TRACE(log, "Removing old column " << path + ".mrk" + ".old");
		Poco::File(path + ".mrk" + ".old").remove();

		if (Poco::File(part_path + "checksums.txt" + ".old").exists())
		{
			LOG_TRACE(log, "Removing old checksums " << part_path + "checksums.txt" + ".old");
			Poco::File(part_path + "checksums.txt" + ".old").remove();
		}
	}

	context.resetCaches();

	{
		Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
		Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
		alterColumns(params, columns, context);
	}
}


bool MergeTreeData::isPartDirectory(const String & dir_name, Poco::RegularExpression::MatchVec & matches) const
{
	/// True if dir_name matches file_name_regexp with exactly 6 match entries (whole match + 5 groups:
	/// two dates, two block numbers, level). `matches` is filled for later use by parsePartName.
	if (!file_name_regexp.match(dir_name, 0, matches))
		return false;
	return matches.size() == 6;
}


bool MergeTreeData::isBrokenPart(const String & path)
{
	/// Проверяем, что первичный ключ непуст.

	Poco::File index_file(path + "/primary.idx");

	if (!index_file.exists() || index_file.getSize() == 0)
	{
		LOG_ERROR(log, "Part " << path << " is broken: primary key is empty.");

		return true;
	}

	/// Проверяем, что все засечки непусты и имеют одинаковый размер.

	ssize_t marks_size = -1;
	for (NamesAndTypesList::const_iterator it = columns->begin(); it != columns->end(); ++it)
	{
		Poco::File marks_file(path + "/" + escapeForFileName(it->first) + ".mrk");

		/// при добавлении нового столбца в таблицу файлы .mrk не создаются. Не будем ничего удалять.
		if (!marks_file.exists())
			continue;

		if (marks_size == -1)
		{
			marks_size = marks_file.getSize();

			if (0 == marks_size)
			{
				LOG_ERROR(log, "Part " << path << " is broken: " << marks_file.path() << " is empty.");

				return true;
			}
		}
		else
		{
			if (static_cast<ssize_t>(marks_file.getSize()) != marks_size)
			{
				LOG_ERROR(log, "Part " << path << " is broken: marks have different sizes.");

				return true;
			}
		}
	}

	return false;
}

M
Merge  
Michael Kolupaev 已提交
679
void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment)
{
	/// Like renameTempPartAndReplace, but a newly added part is not expected to cover existing parts;
	/// if it does, the covered parts are still replaced and an error is logged.
	DataPartsVector removed = renameTempPartAndReplace(part, increment);
	if (!removed.empty())
	{
		/// (Fixed a stray unary '+' that was applied to the " covers " literal in the stream expression.)
		LOG_ERROR(log, "Added part " << part->name << " covers " << removed.size()
			<< " existing part(s) (including " << removed[0]->name << ")");
	}
}

M
Merge  
Michael Kolupaev 已提交
689
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(MutableDataPartPtr part, Increment * increment)
{
	/// Gives the temporary part its final name (and block number, if `increment` is given), renames its
	/// directory, removes from data_parts every part it contains and adds it to both sets.
	/// Returns the replaced parts in ascending order. Throws DUPLICATE_DATA_PART if the part already exists.
	LOG_TRACE(log, "Renaming.");

	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);

	const String old_path = getFullPath() + part->name + "/";

	/** For StorageMergeTree it is important that the part number is obtained atomically with adding the part
	  *  to the set. Otherwise there is a race: a pair of parts whose number ranges contain the not-yet-added
	  *  part could be merged.
	  */
	if (increment)
		part->left = part->right = increment->get(false);

	part->name = getPartName(part->left_date, part->right_date, part->left, part->right, part->level);

	if (data_parts.count(part))
		throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);

	const String new_path = getFullPath() + part->name + "/";
	Poco::File(old_path).renameTo(new_path);

	/// The parts contained in `part` form a contiguous run in data_parts around the position where `part`
	/// would be inserted. Find the run's bounds first, then detach it.
	const DataParts::iterator insertion_point = data_parts.lower_bound(part);

	DataParts::iterator run_begin = insertion_point;
	while (run_begin != data_parts.begin())
	{
		DataParts::iterator previous = run_begin;
		--previous;
		if (!part->contains(**previous))
			break;
		run_begin = previous;
	}

	DataParts::iterator run_end = insertion_point;
	while (run_end != data_parts.end() && part->contains(**run_end))
		++run_end;

	DataPartsVector replaced(run_begin, run_end);
	for (const DataPartPtr & replaced_part : replaced)
		replaced_part->remove_time = time(0);
	data_parts.erase(run_begin, run_end);

	data_parts.insert(part);
	all_data_parts.insert(part);

	return replaced;
}

M
Merge  
Michael Kolupaev 已提交
746
/** Removes `part` from both the active set (data_parts) and the full set (all_data_parts),
  *  then renames its directory by adding `prefix` (via DataPart::renameAddPrefix).
  * Throws NO_SUCH_DATA_PART if the part is not present in all_data_parts.
  * Both mutexes are taken in the same order as elsewhere: data_parts_mutex first, then all_data_parts_mutex.
  */
void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix)
{
	Poco::ScopedLock<Poco::FastMutex> active_parts_lock(data_parts_mutex);
	Poco::ScopedLock<Poco::FastMutex> all_parts_lock(all_data_parts_mutex);

	const size_t erased_from_all = all_data_parts.erase(part);
	if (erased_from_all == 0)
		throw Exception("No such data part", ErrorCodes::NO_SUCH_DATA_PART);

	/// The part may already be inactive; erasing a missing key is a no-op.
	data_parts.erase(part);

	/// NOTE(review): if this throws, the part has already been removed from both in-memory sets
	///  while its directory keeps the old name. Confirm this is the intended failure behavior.
	part->renameAddPrefix(prefix);
}

M
Merge  
Michael Kolupaev 已提交
756 757 758 759 760 761 762
/// Returns a copy of the active parts set, taken while holding data_parts_mutex.
MergeTreeData::DataParts MergeTreeData::getDataParts()
{
	Poco::ScopedLock<Poco::FastMutex> guard(data_parts_mutex);
	const DataParts & active_parts = data_parts;
	return active_parts;
}

M
Merge  
Michael Kolupaev 已提交
763 764 765 766 767 768 769
/// Returns a copy of the full parts set (active and inactive), taken while holding all_data_parts_mutex.
MergeTreeData::DataParts MergeTreeData::getAllDataParts()
{
	Poco::ScopedLock<Poco::FastMutex> guard(all_data_parts_mutex);
	const DataParts & every_part = all_data_parts;
	return every_part;
}

M
Merge  
Michael Kolupaev 已提交
770 771 772 773 774 775 776
size_t MergeTreeData::getDataPartsCount()
{
	Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);

	return data_parts.size();
}

M
Merge  
Michael Kolupaev 已提交
777
/** Finds a part whose name is `part_name`, or that contains the part described by `part_name`.
  * Searches all_data_parts when `including_inactive` is true, otherwise only the active data_parts.
  * Returns nullptr if no such part exists.
  */
MergeTreeData::DataPartPtr MergeTreeData::getContainingPart(const String & part_name, bool including_inactive)
{
	/// Temporary part built from the name, used only as a search key.
	MutableDataPartPtr probe(new DataPart(*this));
	parsePartName(part_name, *probe);

	Poco::FastMutex & parts_mutex = including_inactive ? all_data_parts_mutex : data_parts_mutex;
	Poco::ScopedLock<Poco::FastMutex> lock(parts_mutex);

	DataParts & parts = including_inactive ? all_data_parts : data_parts;

	/// A part can only be covered by the previous or the next element in the set.
	DataParts::iterator candidate = parts.lower_bound(probe);

	if (candidate != parts.end() && ((*candidate)->name == part_name || (*candidate)->contains(*probe)))
		return *candidate;

	if (candidate == parts.begin())
		return nullptr;

	--candidate;
	if ((*candidate)->contains(*probe))
		return *candidate;

	return nullptr;
}

807

M
Merge  
Michael Kolupaev 已提交
808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837
/** Verifies that `rhs` matches this checksum for file `name`.
  * If this file is compressed and `have_uncompressed` is set, compares the uncompressed size and hash,
  *  requiring `rhs` to be compressed too. Otherwise compares file_size and file_hash.
  * Throws CHECKSUM_DOESNT_MATCH or BAD_SIZE_OF_FILE_IN_DATA_PART on mismatch.
  */
void MergeTreeData::DataPart::Checksums::Checksum::checkEqual(const Checksum & rhs, bool have_uncompressed, const String & name) const
{
	const bool compare_uncompressed = is_compressed && have_uncompressed;

	if (compare_uncompressed)
	{
		if (!rhs.is_compressed)
			throw Exception("No uncompressed checksum for file " + name, ErrorCodes::CHECKSUM_DOESNT_MATCH);
		if (rhs.uncompressed_size != uncompressed_size)
			throw Exception("Unexpected size of file " + name + " in data part", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
		if (rhs.uncompressed_hash != uncompressed_hash)
			throw Exception("Checksum mismatch for file " + name + " in data part", ErrorCodes::CHECKSUM_DOESNT_MATCH);
	}
	else
	{
		if (rhs.file_size != file_size)
			throw Exception("Unexpected size of file " + name + " in data part", ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
		if (rhs.file_hash != file_hash)
			throw Exception("Checksum mismatch for file " + name + " in data part", ErrorCodes::CHECKSUM_DOESNT_MATCH);
	}
}

void MergeTreeData::DataPart::Checksums::Checksum::checkSize(const String & path) const
{
	Poco::File file(path);
	if (!file.exists())
		throw Exception(path + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
	size_t size = file.getSize();
	if (size != file_size)
		throw Exception(path + " has unexpected size: " + DB::toString(size) + " instead of " + DB::toString(file_size),
			ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);
}

void MergeTreeData::DataPart::Checksums::checkEqual(const Checksums & rhs, bool have_uncompressed) const
838
{
M
Merge  
Michael Kolupaev 已提交
839
	for (const auto & it : rhs.files)
840 841 842
	{
		const String & name = it.first;

M
Merge  
Michael Kolupaev 已提交
843
		if (!files.count(name))
844 845 846
			throw Exception("Unexpected file " + name + " in data part", ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART);
	}

M
Merge  
Michael Kolupaev 已提交
847
	for (const auto & it : files)
848 849 850
	{
		const String & name = it.first;

M
Merge  
Michael Kolupaev 已提交
851 852
		auto jt = rhs.files.find(name);
		if (jt == rhs.files.end())
853 854
			throw Exception("No file " + name + " in data part", ErrorCodes::NO_FILE_IN_DATA_PART);

M
Merge  
Michael Kolupaev 已提交
855 856 857
		it.second.checkEqual(jt->second, have_uncompressed, name);
	}
}
M
Merge  
Michael Kolupaev 已提交
858

M
Merge  
Michael Kolupaev 已提交
859 860 861 862 863 864
/// Runs Checksum::checkSize for every listed file, locating each as `path` + file name.
void MergeTreeData::DataPart::Checksums::checkSizes(const String & path) const
{
	for (auto entry = files.begin(); entry != files.end(); ++entry)
		entry->second.checkSize(path + entry->first);
}

868
/** Parses the textual checksums format that writeText emits, replacing the current contents of `files`.
  * Returns false for format version 1, which is not parsed further. Returns true after reading version 2.
  * Throws UNKNOWN_FORMAT for any other version. Layout mismatches are reported by DB::assertString.
  */
bool MergeTreeData::DataPart::Checksums::readText(ReadBuffer & in)
{
	files.clear();

	DB::assertString("checksums format version: ", in);
	int version;
	DB::readText(version, in);
	if (version < 1 || version > 2)
		throw Exception("Bad checksums format version: " + DB::toString(version), ErrorCodes::UNKNOWN_FORMAT);

	/// Version 1 is not parsed here; signal it to the caller.
	if (version == 1)
		return false;

	DB::assertString("\n",in);

	size_t files_count;
	DB::readText(files_count, in);
	DB::assertString(" files:\n", in);

	for (size_t file_no = 0; file_no < files_count; ++file_no)
	{
		String file_name;
		Checksum checksum;

		DB::readString(file_name, in);

		/// On-disk size and hash.
		DB::assertString("\n\tsize: ", in);
		DB::readText(checksum.file_size, in);
		DB::assertString("\n\thash: ", in);
		DB::readText(checksum.file_hash.first, in);
		DB::assertString(" ", in);
		DB::readText(checksum.file_hash.second, in);

		/// Compression flag, followed by uncompressed size and hash only for compressed files.
		DB::assertString("\n\tcompressed: ", in);
		DB::readText(checksum.is_compressed, in);
		if (checksum.is_compressed)
		{
			DB::assertString("\n\tuncompressed size: ", in);
			DB::readText(checksum.uncompressed_size, in);
			DB::assertString("\n\tuncompressed hash: ", in);
			DB::readText(checksum.uncompressed_hash.first, in);
			DB::assertString(" ", in);
			DB::readText(checksum.uncompressed_hash.second, in);
		}

		DB::assertString("\n", in);

		files.insert(std::make_pair(file_name, checksum));
	}

	return true;
}

/** Serializes `files` in checksums format version 2, the layout that readText parses.
  * For each file: name, size, hash, and the compression flag. For compressed files, the
  *  uncompressed size and hash follow.
  */
void MergeTreeData::DataPart::Checksums::writeText(WriteBuffer & out) const
{
	DB::writeString("checksums format version: 2\n", out);
	DB::writeText(files.size(), out);
	DB::writeString(" files:\n", out);

	for (auto entry = files.begin(); entry != files.end(); ++entry)
	{
		const Checksum & checksum = entry->second;

		DB::writeString(entry->first, out);

		/// On-disk size and hash.
		DB::writeString("\n\tsize: ", out);
		DB::writeText(checksum.file_size, out);
		DB::writeString("\n\thash: ", out);
		DB::writeText(checksum.file_hash.first, out);
		DB::writeString(" ", out);
		DB::writeText(checksum.file_hash.second, out);

		/// Compression flag; the line is terminated here.
		DB::writeString("\n\tcompressed: ", out);
		DB::writeText(checksum.is_compressed, out);
		DB::writeString("\n", out);

		if (!checksum.is_compressed)
			continue;

		DB::writeString("\tuncompressed size: ", out);
		DB::writeText(checksum.uncompressed_size, out);
		DB::writeString("\n\tuncompressed hash: ", out);
		DB::writeText(checksum.uncompressed_hash.first, out);
		DB::writeString(" ", out);
		DB::writeText(checksum.uncompressed_hash.second, out);
		DB::writeString("\n", out);
	}
}

M
Merge  
Michael Kolupaev 已提交
948
}