StorageReplicatedMergeTree.cpp 42.6 KB
Newer Older
M
Merge  
Michael Kolupaev 已提交
1
#include <DB/Storages/StorageReplicatedMergeTree.h>
M
Merge  
Michael Kolupaev 已提交
2
#include <DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
M
Merge  
Michael Kolupaev 已提交
3
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
M
Merge  
Michael Kolupaev 已提交
4 5 6
#include <DB/Parsers/formatAST.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/ReadBufferFromString.h>
M
Merge  
Michael Kolupaev 已提交
7
#include <time.h>
M
Merge  
Michael Kolupaev 已提交
8 9 10 11

namespace DB
{

M
Merge  
Michael Kolupaev 已提交
12 13 14 15 16

/// Pause lengths, in whole seconds. Judging by the names they pace the queue-processing and
/// merge-selecting loops; the code that uses them lies outside this part of the file.
const std::chrono::seconds QUEUE_UPDATE_SLEEP(5);
const std::chrono::seconds QUEUE_NO_WORK_SLEEP(5);
const std::chrono::seconds QUEUE_ERROR_SLEEP(1);
const std::chrono::seconds QUEUE_AFTER_WORK_SLEEP(0);
const std::chrono::seconds MERGE_SELECTING_SLEEP(5);
M
Merge  
Michael Kolupaev 已提交
18 19


M
Merge  
Michael Kolupaev 已提交
20 21 22
/** Constructs the table object and, if a ZooKeeper handle is available, prepares the replica.
  *
  * Without a ZooKeeper handle the object only calls goReadOnly() and returns.
  * Otherwise:
  *  - attach == false: the shared nodes are created if the table path does not exist yet,
  *    the table must have no parts on any replica, the local structure is checked against
  *    ZooKeeper and the nodes of this replica are created;
  *  - attach == true: the structure and the local set of parts are checked;
  *  - then the queue is loaded, the optional "unreplicated" directory is opened,
  *    an identifier for this instance is generated and restartingThread is started.
  */
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
	const String & zookeeper_path_,
	const String & replica_name_,
	bool attach,
	const String & path_, const String & database_name_, const String & name_,
	NamesAndTypesListPtr columns_,
	Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_,
	const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	MergeTreeData::Mode mode_,
	const String & sign_column_,
	const MergeTreeSettings & settings_)
	:
	context(context_), zookeeper(context.getZooKeeper()),
	table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'), zookeeper_path(zookeeper_path_),
	replica_name(replica_name_),
	data(	full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
			index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name),
	reader(data), writer(data), merger(data), fetcher(data),
	log(&Logger::get(database_name_ + "." + table_name + " (StorageReplicatedMergeTree)"))
{
	/// Nothing below can be done without ZooKeeper.
	if (!zookeeper)
	{
		goReadOnly();
		return;
	}

	/// Drop one trailing slash from the table path, if there is one.
	if (!zookeeper_path.empty() && zookeeper_path[zookeeper_path.size() - 1] == '/')
		zookeeper_path.resize(zookeeper_path.size() - 1);
	replica_path = zookeeper_path + "/replicas/" + replica_name;

	if (attach)
	{
		checkTableStructure();
		checkParts();
	}
	else
	{
		if (!zookeeper->exists(zookeeper_path))
			createTable();

		if (!isTableEmpty())
			throw Exception("Can't add new replica to non-empty table", ErrorCodes::ADDING_REPLICA_TO_NON_EMPTY_TABLE);

		checkTableStructure();
		createReplica();
	}

	loadQueue();

	/// A separate MergeTreeData (with its own reader and merger) is opened over "unreplicated/" if it exists.
	const String unreplicated_dir = full_path + "unreplicated/";
	if (Poco::File(unreplicated_dir).exists())
	{
		LOG_INFO(log, "Have unreplicated data");
		unreplicated_data.reset(new MergeTreeData(unreplicated_dir, columns_, context_, primary_expr_ast_,
			date_column_name_, sampling_expression_, index_granularity_, mode_, sign_column_, settings_,
			database_name_ + "." + table_name + "[unreplicated]"));
		unreplicated_reader.reset(new MergeTreeDataSelectExecutor(*unreplicated_data));
		unreplicated_merger.reset(new MergeTreeDataMerger(*unreplicated_data));
	}

	/// Generate a random identifier for this instance.
	/// NOTE(review): the value is just the nanoseconds field of the thread CPU clock.
	struct timespec cpu_time;
	if (clock_gettime(CLOCK_THREAD_CPUTIME_ID, &cpu_time))
		throwFromErrno("Cannot clock_gettime.", ErrorCodes::CANNOT_CLOCK_GETTIME);
	active_node_identifier = toString(cpu_time.tv_nsec);

	restarting_thread = std::thread(&StorageReplicatedMergeTree::restartingThread, this);
}

/** Factory function: allocates the storage, obtains its StoragePtr and, unless the table
  * is flagged read-only, registers an interserver endpoint named
  * "ReplicatedMergeTree:<replica path>" backed by ReplicatedMergeTreePartsServer.
  */
StoragePtr StorageReplicatedMergeTree::create(
	const String & zookeeper_path_,
	const String & replica_name_,
	bool attach,
	const String & path_, const String & database_name_, const String & name_,
	NamesAndTypesListPtr columns_,
	Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_,
	const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	MergeTreeData::Mode mode_,
	const String & sign_column_,
	const MergeTreeSettings & settings_)
{
	StorageReplicatedMergeTree * storage = new StorageReplicatedMergeTree(
		zookeeper_path_, replica_name_, attach,
		path_, database_name_, name_, columns_, context_,
		primary_expr_ast_, date_column_name_, sampling_expression_,
		index_granularity_, mode_, sign_column_, settings_);
	StoragePtr storage_ptr = storage->thisPtr();

	/// A read-only table gets no endpoint.
	if (storage->is_read_only)
		return storage_ptr;

	String endpoint_name = "ReplicatedMergeTree:" + storage->replica_path;
	InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(storage->data, storage_ptr);
	storage->endpoint_holder = new InterserverIOEndpointHolder(
		endpoint_name, endpoint, storage->context.getInterserverIOHandler());

	return storage_ptr;
}

M
Merge  
Michael Kolupaev 已提交
120 121 122 123 124 125 126 127
/// Renders an AST to text with formatAST(); a null pointer yields an empty string.
static String formattedAST(const ASTPtr & ast)
{
	std::stringstream out;
	if (ast)
		formatAST(*ast, out, 0, false, true);
	return out.str();
}
M
Merge  
Michael Kolupaev 已提交
128

M
Merge  
Michael Kolupaev 已提交
129
void StorageReplicatedMergeTree::createTable()
M
Merge  
Michael Kolupaev 已提交
130
{
M
Merge  
Michael Kolupaev 已提交
131
	zookeeper->create(zookeeper_path, "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
132

M
Merge  
Michael Kolupaev 已提交
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
	/// Запишем метаданные таблицы, чтобы реплики могли сверять с ними свою локальную структуру таблицы.
	std::stringstream metadata;
	metadata << "metadata format version: 1" << std::endl;
	metadata << "date column: " << data.date_column_name << std::endl;
	metadata << "sampling expression: " << formattedAST(data.sampling_expression) << std::endl;
	metadata << "index granularity: " << data.index_granularity << std::endl;
	metadata << "mode: " << static_cast<int>(data.mode) << std::endl;
	metadata << "sign column: " << data.sign_column << std::endl;
	metadata << "primary key: " << formattedAST(data.primary_expr_ast) << std::endl;
	metadata << "columns:" << std::endl;
	WriteBufferFromOStream buf(metadata);
	for (auto & it : data.getColumnsList())
	{
		writeBackQuotedString(it.first, buf);
		writeChar(' ', buf);
		writeString(it.second->getName(), buf);
		writeChar('\n', buf);
	}
	buf.next();
M
Merge  
Michael Kolupaev 已提交
152

M
Merge  
Michael Kolupaev 已提交
153
	zookeeper->create(zookeeper_path + "/metadata", metadata.str(), zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
154

M
Merge  
Michael Kolupaev 已提交
155 156 157 158 159
	zookeeper->create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent);
	zookeeper->create(zookeeper_path + "/blocks", "", zkutil::CreateMode::Persistent);
	zookeeper->create(zookeeper_path + "/block_numbers", "", zkutil::CreateMode::Persistent);
	zookeeper->create(zookeeper_path + "/leader_election", "", zkutil::CreateMode::Persistent);
	zookeeper->create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
160
}
M
Merge  
Michael Kolupaev 已提交
161

M
Merge  
Michael Kolupaev 已提交
162 163
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
	* Если нет - бросить исключение.
M
Merge  
Michael Kolupaev 已提交
164
	*/
M
Merge  
Michael Kolupaev 已提交
165 166
void StorageReplicatedMergeTree::checkTableStructure()
{
M
Merge  
Michael Kolupaev 已提交
167
	String metadata_str = zookeeper->get(zookeeper_path + "/metadata");
M
Merge  
Michael Kolupaev 已提交
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
	ReadBufferFromString buf(metadata_str);
	assertString("metadata format version: 1", buf);
	assertString("\ndate column: ", buf);
	assertString(data.date_column_name, buf);
	assertString("\nsampling expression: ", buf);
	assertString(formattedAST(data.sampling_expression), buf);
	assertString("\nindex granularity: ", buf);
	assertString(toString(data.index_granularity), buf);
	assertString("\nmode: ", buf);
	assertString(toString(static_cast<int>(data.mode)), buf);
	assertString("\nsign column: ", buf);
	assertString(data.sign_column, buf);
	assertString("\nprimary key: ", buf);
	assertString(formattedAST(data.primary_expr_ast), buf);
	assertString("\ncolumns:\n", buf);
	for (auto & it : data.getColumnsList())
	{
		String name;
		readBackQuotedString(name, buf);
		if (name != it.first)
			throw Exception("Unexpected column name in ZooKeeper: expected " + it.first + ", found " + name,
				ErrorCodes::UNKNOWN_IDENTIFIER);
		assertString(" ", buf);
		assertString(it.second->getName(), buf);
		assertString("\n", buf);
	}
M
Merge  
Michael Kolupaev 已提交
194
	assertEOF(buf);
M
Merge  
Michael Kolupaev 已提交
195
}
M
Merge  
Michael Kolupaev 已提交
196

M
Merge  
Michael Kolupaev 已提交
197 198
/// Creates the persistent node of this replica and its empty child nodes in ZooKeeper.
void StorageReplicatedMergeTree::createReplica()
{
	zookeeper->create(replica_path, "", zkutil::CreateMode::Persistent);

	static const char * const child_nodes[] = {"/host", "/log", "/log_pointers", "/queue", "/parts"};
	for (const char * child : child_nodes)
		zookeeper->create(replica_path + child, "", zkutil::CreateMode::Persistent);
}
M
Merge  
Michael Kolupaev 已提交
206

M
Merge  
Michael Kolupaev 已提交
207 208 209 210 211
void StorageReplicatedMergeTree::activateReplica()
{
	std::stringstream host;
	host << "host: " << context.getInterserverIOHost() << std::endl;
	host << "port: " << context.getInterserverIOPort() << std::endl;
M
Merge  
Michael Kolupaev 已提交
212

M
Merge  
Michael Kolupaev 已提交
213 214 215 216 217 218 219 220 221
	/** Если нода отмечена как активная, но отметка сделана в этом же экземпляре, удалим ее.
	  * Такое возможно только при истечении сессии в ZooKeeper.
	  * Здесь есть небольшой race condition (можем удалить не ту ноду, для которой сделали tryGet),
	  *  но он крайне маловероятен при нормальном использовании.
	  */
	String data;
	if (zookeeper->tryGet(replica_path + "/is_active", data) && data == active_node_identifier)
		zookeeper->tryRemove(replica_path + "/is_active");

M
Merge  
Michael Kolupaev 已提交
222
	/// Одновременно объявим, что эта реплика активна, и обновим хост.
M
Merge  
Michael Kolupaev 已提交
223
	zkutil::Ops ops;
M
Merge  
Michael Kolupaev 已提交
224
	ops.push_back(new zkutil::Op::Create(replica_path + "/is_active", "", zookeeper->getDefaultACL(), zkutil::CreateMode::Ephemeral));
M
Merge  
Michael Kolupaev 已提交
225
	ops.push_back(new zkutil::Op::SetData(replica_path + "/host", host.str(), -1));
M
Merge  
Michael Kolupaev 已提交
226 227 228

	try
	{
M
Merge  
Michael Kolupaev 已提交
229
		zookeeper->multi(ops);
M
Merge  
Michael Kolupaev 已提交
230 231 232 233 234 235 236 237 238
	}
	catch (zkutil::KeeperException & e)
	{
		if (e.code == zkutil::ReturnCode::NodeExists)
			throw Exception("Replica " + replica_path + " appears to be already active. If you're sure it's not, "
				"try again in a minute or remove znode " + replica_path + "/is_active manually", ErrorCodes::REPLICA_IS_ALREADY_ACTIVE);

		throw;
	}
M
Merge  
Michael Kolupaev 已提交
239

M
Merge  
Michael Kolupaev 已提交
240
	replica_is_active_node = zkutil::EphemeralNodeHolder::existing(replica_path + "/is_active", *zookeeper);
M
Merge  
Michael Kolupaev 已提交
241
}
M
Merge  
Michael Kolupaev 已提交
242

M
Merge  
Michael Kolupaev 已提交
243 244
bool StorageReplicatedMergeTree::isTableEmpty()
{
M
Merge  
Michael Kolupaev 已提交
245
	Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
M
Merge  
Michael Kolupaev 已提交
246 247
	for (const auto & replica : replicas)
	{
M
Merge  
Michael Kolupaev 已提交
248
		if (!zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts").empty())
M
Merge  
Michael Kolupaev 已提交
249 250 251 252
			return false;
	}
	return true;
}
M
Merge  
Michael Kolupaev 已提交
253

M
Merge  
Michael Kolupaev 已提交
254 255
void StorageReplicatedMergeTree::checkParts()
{
M
Merge  
Michael Kolupaev 已提交
256
	Strings expected_parts_vec = zookeeper->getChildren(replica_path + "/parts");
M
Merge  
Michael Kolupaev 已提交
257 258

	/// Куски в ZK.
M
Merge  
Michael Kolupaev 已提交
259 260
	NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end());

M
Merge  
Michael Kolupaev 已提交
261
	MergeTreeData::DataParts parts = data.getAllDataParts();
M
Merge  
Michael Kolupaev 已提交
262

M
Merge  
Michael Kolupaev 已提交
263
	/// Локальные куски, которых нет в ZK.
M
Merge  
Michael Kolupaev 已提交
264
	MergeTreeData::DataParts unexpected_parts;
M
Merge  
Michael Kolupaev 已提交
265

M
Merge  
Michael Kolupaev 已提交
266 267 268 269 270 271 272 273
	for (const auto & part : parts)
	{
		if (expected_parts.count(part->name))
		{
			expected_parts.erase(part->name);
		}
		else
		{
M
Merge  
Michael Kolupaev 已提交
274
			unexpected_parts.insert(part);
M
Merge  
Michael Kolupaev 已提交
275 276 277
		}
	}

M
Merge  
Michael Kolupaev 已提交
278
	/// Какие локальные куски добавить в ZK.
M
Merge  
Michael Kolupaev 已提交
279
	MergeTreeData::DataPartsVector parts_to_add;
M
Merge  
Michael Kolupaev 已提交
280 281 282 283

	/// Какие куски нужно забрать с других реплик.
	Strings parts_to_fetch;

M
Merge  
Michael Kolupaev 已提交
284 285
	for (const String & missing_name : expected_parts)
	{
M
Merge  
Michael Kolupaev 已提交
286
		/// Если локально не хватает какого-то куска, но есть покрывающий его кусок, можно заменить в ZK недостающий покрывающим.
M
Merge  
Michael Kolupaev 已提交
287
		auto containing = data.getContainingPart(missing_name);
M
Merge  
Michael Kolupaev 已提交
288 289 290 291 292 293 294 295 296 297
		if (containing)
		{
			LOG_ERROR(log, "Ignoring missing local part " << missing_name << " because part " << containing->name << " exists");
			if (unexpected_parts.count(containing))
			{
				parts_to_add.push_back(containing);
				unexpected_parts.erase(containing);
			}
		}
		else
M
Merge  
Michael Kolupaev 已提交
298
		{
M
Merge  
Michael Kolupaev 已提交
299
			parts_to_fetch.push_back(missing_name);
M
Merge  
Michael Kolupaev 已提交
300 301
		}
	}
M
Merge  
Michael Kolupaev 已提交
302

M
Merge  
Michael Kolupaev 已提交
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319
	for (const String & name : parts_to_fetch)
		expected_parts.erase(name);

	bool skip_sanity_check = false;
	if (zookeeper->exists(replica_path + "/flags/force_restore_data"))
	{
		skip_sanity_check = true;
		zookeeper->remove(replica_path + "/flags/force_restore_data");
	}

	String sanity_report =
		"There are " + toString(unexpected_parts.size()) + " unexpected parts, "
					 + toString(parts_to_add.size()) + " unexpectedly merged parts, "
					 + toString(expected_parts.size()) + " unexpectedly obsolete parts, "
					 + toString(parts_to_fetch.size()) + " missing parts";
	bool insane =
		parts_to_add.size() > 2 ||
M
Merge  
Michael Kolupaev 已提交
320
		unexpected_parts.size() > 2 ||
M
Merge  
Michael Kolupaev 已提交
321 322 323 324
		expected_parts.size() > 20 ||
		parts_to_fetch.size() > 2;

	if (skip_sanity_check)
M
Merge  
Michael Kolupaev 已提交
325
	{
M
Merge  
Michael Kolupaev 已提交
326 327 328 329 330 331 332
		LOG_WARNING(log, "Skipping the limits on severity of changes to data parts (flag "
			<< replica_path << "/flags/force_restore_data). " << sanity_report);
	}
	else if (insane)
	{
		throw Exception("The local set of parts of table " + getTableName() + " doesn't look like the set of parts in ZooKeeper. "
			+ sanity_report,
M
Merge  
Michael Kolupaev 已提交
333
			ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);
M
Merge  
Michael Kolupaev 已提交
334 335 336 337 338
	}

	/// Добавим в ZK информацию о кусках, покрывающих недостающие куски.
	for (MergeTreeData::DataPartPtr part : parts_to_add)
	{
M
Merge  
Michael Kolupaev 已提交
339
		LOG_ERROR(log, "Adding unexpected local part to ZooKeeper: " << part->name);
M
Merge  
Michael Kolupaev 已提交
340

M
Merge  
Michael Kolupaev 已提交
341 342
		zkutil::Ops ops;
		checkPartAndAddToZooKeeper(part, ops);
M
Merge  
Michael Kolupaev 已提交
343
		zookeeper->multi(ops);
M
Merge  
Michael Kolupaev 已提交
344
	}
M
Merge  
Michael Kolupaev 已提交
345

M
Merge  
Michael Kolupaev 已提交
346 347 348
	/// Удалим из ZK информацию о кусках, покрытых только что добавленными.
	for (const String & name : expected_parts)
	{
M
Merge  
Michael Kolupaev 已提交
349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367
		LOG_ERROR(log, "Removing unexpectedly merged local part from ZooKeeper: " << name);

		zkutil::Ops ops;
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
		zookeeper->multi(ops);
	}

	/// Добавим в очередь задание забрать недостающие куски с других реплик и уберем из ZK информацию, что они у нас есть.
	for (const String & name : parts_to_fetch)
	{
		LOG_ERROR(log, "Removing missing part from ZooKeeper and queueing a fetch: " << name);

		LogEntry log_entry;
		log_entry.type = LogEntry::GET_PART;
		log_entry.source_replica = replica_name;
		log_entry.new_part_name = name;

		/// Полагаемся, что это происходит до загрузки очереди (loadQueue).
M
Merge  
Michael Kolupaev 已提交
368 369 370
		zkutil::Ops ops;
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
M
Merge  
Michael Kolupaev 已提交
371 372
		ops.push_back(new zkutil::Op::Create(
			replica_path + "/queue/queue-", log_entry.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
M
Merge  
Michael Kolupaev 已提交
373
		zookeeper->multi(ops);
M
Merge  
Michael Kolupaev 已提交
374 375 376
	}

	/// Удалим лишние локальные куски.
M
Merge  
Michael Kolupaev 已提交
377 378
	for (MergeTreeData::DataPartPtr part : unexpected_parts)
	{
M
Merge  
Michael Kolupaev 已提交
379
		LOG_ERROR(log, "Renaming unexpected part " << part->name << " to ignored_" + part->name);
M
Merge  
Michael Kolupaev 已提交
380
		data.renameAndDetachPart(part, "ignored_");
M
Merge  
Michael Kolupaev 已提交
381 382
	}
}
M
Merge  
Michael Kolupaev 已提交
383

M
Merge  
Michael Kolupaev 已提交
384 385 386 387 388
/** Appends to `ops` the creation of "<replica>/parts/<name>" and of its "checksums" child
  * holding the serialized checksums of the part. Nothing is sent to ZooKeeper here.
  * Before that, if findReplicaHavingPart() names another replica and its checksums node can
  * be read, those checksums are compared with the local ones via checkEqual()
  * (presumably throwing on a mismatch - confirm against Checksums::checkEqual).
  */
void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops)
{
	String other_replica = findReplicaHavingPart(part->name, false);
	if (!other_replica.empty())
	{
		String other_checksums_str;
		const String other_checksums_path =
			zookeeper_path + "/replicas/" + other_replica + "/parts/" + part->name + "/checksums";

		if (zookeeper->tryGet(other_checksums_path, other_checksums_str))
		{
			auto other_checksums = MergeTreeData::DataPart::Checksums::parse(other_checksums_str);
			other_checksums.checkEqual(part->checksums, true);
		}
	}

	const String part_path = replica_path + "/parts/" + part->name;

	ops.push_back(new zkutil::Op::Create(
		part_path, "", zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
	ops.push_back(new zkutil::Op::Create(
		part_path + "/checksums", part->checksums.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
}

M
Merge  
Michael Kolupaev 已提交
409 410 411 412 413 414 415 416 417
void StorageReplicatedMergeTree::clearOldParts()
{
	Strings parts = data.clearOldParts();

	for (const String & name : parts)
	{
		zkutil::Ops ops;
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
M
Merge  
Michael Kolupaev 已提交
418
		zkutil::ReturnCode::type code = zookeeper->tryMulti(ops);
M
Merge  
Michael Kolupaev 已提交
419 420 421 422 423 424 425 426
		if (code != zkutil::ReturnCode::Ok)
			LOG_WARNING(log, "Couldn't remove part " << name << " from ZooKeeper: " << zkutil::ReturnCode::toString(code));
	}

	if (!parts.empty())
		LOG_DEBUG(log, "Removed " << parts.size() << " old parts");
}

M
Merge  
Michael Kolupaev 已提交
427 428
void StorageReplicatedMergeTree::clearOldLogs()
{
M
Merge  
Michael Kolupaev 已提交
429
	Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
M
Merge  
Michael Kolupaev 已提交
430 431 432 433
	UInt64 min_pointer = std::numeric_limits<UInt64>::max();
	for (const String & replica : replicas)
	{
		String pointer;
M
Merge  
Michael Kolupaev 已提交
434
		if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/log_pointers/" + replica_name, pointer))
M
Merge  
Michael Kolupaev 已提交
435 436 437 438
			return;
		min_pointer = std::min(min_pointer, parse<UInt64>(pointer));
	}

M
Merge  
Michael Kolupaev 已提交
439
	Strings entries = zookeeper->getChildren(replica_path + "/log");
M
Merge  
Michael Kolupaev 已提交
440 441 442 443 444 445 446 447
	std::sort(entries.begin(), entries.end());
	size_t removed = 0;

	for (const String & entry : entries)
	{
		UInt64 index = parse<UInt64>(entry.substr(strlen("log-")));
		if (index >= min_pointer)
			break;
M
Merge  
Michael Kolupaev 已提交
448
		zookeeper->remove(replica_path + "/log/" + entry);
M
Merge  
Michael Kolupaev 已提交
449 450 451 452 453 454 455 456 457 458
		++removed;
	}

	if (removed > 0)
		LOG_DEBUG(log, "Removed " << removed << " old log entries");
}

void StorageReplicatedMergeTree::clearOldBlocks()
{
	zkutil::Stat stat;
M
Merge  
Michael Kolupaev 已提交
459
	if (!zookeeper->exists(zookeeper_path + "/blocks", &stat))
M
Merge  
Michael Kolupaev 已提交
460 461
		throw Exception(zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);

M
Merge  
Michael Kolupaev 已提交
462 463
	int children_count = stat.getnumChildren();

M
Merge  
Michael Kolupaev 已提交
464
	/// Чтобы делать "асимптотически" меньше запросов exists, будем ждать, пока накопятся в 1.1 раза больше блоков, чем нужно.
M
Merge  
Michael Kolupaev 已提交
465
	if (static_cast<double>(children_count) < data.settings.replicated_deduplication_window * 1.1)
M
Merge  
Michael Kolupaev 已提交
466 467
		return;

M
Merge  
Michael Kolupaev 已提交
468 469 470
	LOG_TRACE(log, "Clearing about " << static_cast<size_t>(children_count) - data.settings.replicated_deduplication_window
		<< " old blocks from ZooKeeper");

M
Merge  
Michael Kolupaev 已提交
471
	Strings blocks = zookeeper->getChildren(zookeeper_path + "/blocks");
M
Merge  
Michael Kolupaev 已提交
472 473 474 475 476 477

	std::vector<std::pair<Int64, String> > timed_blocks;

	for (const String & block : blocks)
	{
		zkutil::Stat stat;
M
Merge  
Michael Kolupaev 已提交
478
		zookeeper->exists(zookeeper_path + "/blocks/" + block, &stat);
M
Merge  
Michael Kolupaev 已提交
479 480 481
		timed_blocks.push_back(std::make_pair(stat.getczxid(), block));
	}

M
Merge  
Michael Kolupaev 已提交
482
	std::sort(timed_blocks.begin(), timed_blocks.end(), std::greater<std::pair<Int64, String>>());
M
Merge  
Michael Kolupaev 已提交
483 484
	for (size_t i = data.settings.replicated_deduplication_window; i <  timed_blocks.size(); ++i)
	{
M
Merge  
Michael Kolupaev 已提交
485 486 487 488
		zkutil::Ops ops;
		ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/number", -1));
		ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksums", -1));
		ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second, -1));
M
Merge  
Michael Kolupaev 已提交
489
		zookeeper->multi(ops);
M
Merge  
Michael Kolupaev 已提交
490 491 492 493 494
	}

	LOG_TRACE(log, "Cleared " << blocks.size() - data.settings.replicated_deduplication_window << " old blocks from ZooKeeper");
}

M
Merge  
Michael Kolupaev 已提交
495 496 497 498
void StorageReplicatedMergeTree::loadQueue()
{
	Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);

M
Merge  
Michael Kolupaev 已提交
499
	Strings children = zookeeper->getChildren(replica_path + "/queue");
M
Merge  
Michael Kolupaev 已提交
500 501 502
	std::sort(children.begin(), children.end());
	for (const String & child : children)
	{
M
Merge  
Michael Kolupaev 已提交
503
		String s = zookeeper->get(replica_path + "/queue/" + child);
M
Merge  
Michael Kolupaev 已提交
504 505
		LogEntry entry = LogEntry::parse(s);
		entry.znode_name = child;
M
Merge  
Michael Kolupaev 已提交
506
		entry.tagPartsAsCurrentlyMerging(*this);
M
Merge  
Michael Kolupaev 已提交
507
		queue.push_back(entry);
M
Merge  
Michael Kolupaev 已提交
508 509 510 511 512 513
	}
}

void StorageReplicatedMergeTree::pullLogsToQueue()
{
	Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
M
Merge  
Michael Kolupaev 已提交
514

M
Merge  
Michael Kolupaev 已提交
515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545
	/// Сольем все логи в хронологическом порядке.

	struct LogIterator
	{
		String replica;		/// Имя реплики.
		UInt64 index;		/// Номер записи в логе (суффикс имени ноды).

		Int64 timestamp;	/// Время (czxid) создания записи в логе.
		String entry_str;	/// Сама запись.

		bool operator<(const LogIterator & rhs) const
		{
			return timestamp < rhs.timestamp;
		}

		bool readEntry(zkutil::ZooKeeper & zookeeper, const String & zookeeper_path)
		{
			String index_str = toString(index);
			while (index_str.size() < 10)
				index_str = '0' + index_str;
			zkutil::Stat stat;
			if (!zookeeper.tryGet(zookeeper_path + "/replicas/" + replica + "/log/log-" + index_str, entry_str, &stat))
				return false;
			timestamp = stat.getczxid();
			return true;
		}
	};

	typedef std::priority_queue<LogIterator> PriorityQueue;
	PriorityQueue priority_queue;

M
Merge  
Michael Kolupaev 已提交
546
	Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
M
Merge  
Michael Kolupaev 已提交
547

M
Merge  
Michael Kolupaev 已提交
548 549
	for (const String & replica : replicas)
	{
M
Merge  
Michael Kolupaev 已提交
550 551
		String index_str;
		UInt64 index;
M
Merge  
Michael Kolupaev 已提交
552

M
Merge  
Michael Kolupaev 已提交
553
		if (zookeeper->tryGet(replica_path + "/log_pointers/" + replica, index_str))
M
Merge  
Michael Kolupaev 已提交
554
		{
M
Michael Kolupaev 已提交
555
			index = parse<UInt64>(index_str);
M
Merge  
Michael Kolupaev 已提交
556 557 558 559
		}
		else
		{
			/// Если у нас еще нет указателя на лог этой реплики, поставим указатель на первую запись в нем.
M
Merge  
Michael Kolupaev 已提交
560
			Strings entries = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/log");
M
Merge  
Michael Kolupaev 已提交
561
			std::sort(entries.begin(), entries.end());
M
Michael Kolupaev 已提交
562
			index = entries.empty() ? 0 : parse<UInt64>(entries[0].substr(strlen("log-")));
M
Merge  
Michael Kolupaev 已提交
563

M
Merge  
Michael Kolupaev 已提交
564
			zookeeper->create(replica_path + "/log_pointers/" + replica, toString(index), zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
565 566
		}

M
Merge  
Michael Kolupaev 已提交
567 568 569 570
		LogIterator iterator;
		iterator.replica = replica;
		iterator.index = index;

M
Merge  
Michael Kolupaev 已提交
571
		if (iterator.readEntry(*zookeeper, zookeeper_path))
M
Merge  
Michael Kolupaev 已提交
572 573
			priority_queue.push(iterator);
	}
M
Merge  
Michael Kolupaev 已提交
574

M
Merge  
Michael Kolupaev 已提交
575 576
	size_t count = 0;

M
Merge  
Michael Kolupaev 已提交
577 578 579 580
	while (!priority_queue.empty())
	{
		LogIterator iterator = priority_queue.top();
		priority_queue.pop();
M
Merge  
Michael Kolupaev 已提交
581
		++count;
M
Merge  
Michael Kolupaev 已提交
582

M
Merge  
Michael Kolupaev 已提交
583
		LogEntry entry = LogEntry::parse(iterator.entry_str);
M
Merge  
Michael Kolupaev 已提交
584

M
Merge  
Michael Kolupaev 已提交
585 586 587
		/// Одновременно добавим запись в очередь и продвинем указатель на лог.
		zkutil::Ops ops;
		ops.push_back(new zkutil::Op::Create(
M
Merge  
Michael Kolupaev 已提交
588
			replica_path + "/queue/queue-", iterator.entry_str, zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
M
Merge  
Michael Kolupaev 已提交
589 590
		ops.push_back(new zkutil::Op::SetData(
			replica_path + "/log_pointers/" + iterator.replica, toString(iterator.index + 1), -1));
M
Merge  
Michael Kolupaev 已提交
591
		auto results = zookeeper->multi(ops);
M
Merge  
Michael Kolupaev 已提交
592 593

		String path_created = dynamic_cast<zkutil::OpResult::Create &>((*results)[0]).getPathCreated();
M
Merge  
Michael Kolupaev 已提交
594
		entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
M
Merge  
Michael Kolupaev 已提交
595
		entry.tagPartsAsCurrentlyMerging(*this);
M
Merge  
Michael Kolupaev 已提交
596 597 598
		queue.push_back(entry);

		++iterator.index;
M
Merge  
Michael Kolupaev 已提交
599
		if (iterator.readEntry(*zookeeper, zookeeper_path))
M
Merge  
Michael Kolupaev 已提交
600
			priority_queue.push(iterator);
M
Merge  
Michael Kolupaev 已提交
601
	}
M
Merge  
Michael Kolupaev 已提交
602 603 604

	if (count > 0)
		LOG_DEBUG(log, "Pulled " << count << " entries to queue");
M
Merge  
Michael Kolupaev 已提交
605 606
}

M
Merge  
Michael Kolupaev 已提交
607
bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
M
Merge  
Michael Kolupaev 已提交
608
{
M
Merge  
Michael Kolupaev 已提交
609 610 611 612 613 614 615 616 617 618 619
	if (entry.type == LogEntry::MERGE_PARTS)
	{
		/** Если какая-то из нужных частей сейчас передается или мерджится, подождем окончания этой операции.
		  * Иначе, даже если всех нужных частей для мерджа нет, нужно попытаться сделать мердж.
		  * Если каких-то частей не хватает, вместо мерджа будет попытка скачать кусок.
		  * Такая ситуация возможна, если получение какого-то куска пофейлилось, и его переместили в конец очереди.
		  */
		for (const auto & name : entry.parts_to_merge)
		{
			if (future_parts.count(name))
			{
M
Merge  
Michael Kolupaev 已提交
620
				LOG_TRACE(log, "Not merging into part " << entry.new_part_name << " because part " << name << " is not ready yet.");
M
Merge  
Michael Kolupaev 已提交
621 622 623 624 625 626
				return false;
			}
		}
	}

	return true;
M
Merge  
Michael Kolupaev 已提交
627 628 629 630 631 632 633 634 635 636 637
}

void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
{
	if (entry.type == LogEntry::GET_PART ||
		entry.type == LogEntry::MERGE_PARTS)
	{
		/// Если у нас уже есть этот кусок или покрывающий его кусок, ничего делать не нужно.
		MergeTreeData::DataPartPtr containing_part = data.getContainingPart(entry.new_part_name);

		/// Даже если кусок есть локально, его (в исключительных случаях) может не быть в zookeeper.
M
Merge  
Michael Kolupaev 已提交
638
		if (containing_part && zookeeper->exists(replica_path + "/parts/" + containing_part->name))
M
Merge  
Michael Kolupaev 已提交
639
		{
M
Merge  
Michael Kolupaev 已提交
640 641
			if (!(entry.type == LogEntry::GET_PART && entry.source_replica == replica_name))
				LOG_DEBUG(log, "Skipping action for part " + entry.new_part_name + " - part already exists");
M
Merge  
Michael Kolupaev 已提交
642
			return;
M
Merge  
Michael Kolupaev 已提交
643
		}
M
Merge  
Michael Kolupaev 已提交
644 645
	}

M
Merge  
Michael Kolupaev 已提交
646
	if (entry.type == LogEntry::GET_PART && entry.source_replica == replica_name)
M
Merge  
Michael Kolupaev 已提交
647
		LOG_ERROR(log, "Part " << entry.new_part_name << " from own log doesn't exist.");
M
Merge  
Michael Kolupaev 已提交
648 649 650

	bool do_fetch = false;

M
Merge  
Michael Kolupaev 已提交
651 652
	if (entry.type == LogEntry::GET_PART)
	{
M
Merge  
Michael Kolupaev 已提交
653
		do_fetch = true;
M
Merge  
Michael Kolupaev 已提交
654 655 656
	}
	else if (entry.type == LogEntry::MERGE_PARTS)
	{
M
Merge  
Michael Kolupaev 已提交
657
		MergeTreeData::DataPartsVector parts;
M
Merge  
Michael Kolupaev 已提交
658
		bool have_all_parts = true;;
M
Merge  
Michael Kolupaev 已提交
659 660 661
		for (const String & name : entry.parts_to_merge)
		{
			MergeTreeData::DataPartPtr part = data.getContainingPart(name);
M
Merge  
Michael Kolupaev 已提交
662 663 664 665 666 667 668 669 670 671 672 673
			if (!part)
			{
				have_all_parts = false;
				break;
			}
			if (part->name != name)
			{
				LOG_ERROR(log, "Log and parts set look inconsistent: " << name << " is covered by " << part->name
					<< " but should be merged into " << entry.new_part_name);
				have_all_parts = false;
				break;
			}
M
Merge  
Michael Kolupaev 已提交
674 675
			parts.push_back(part);
		}
M
Merge  
Michael Kolupaev 已提交
676

M
Merge  
Michael Kolupaev 已提交
677
		if (!have_all_parts)
M
Merge  
Michael Kolupaev 已提交
678
		{
M
Merge  
Michael Kolupaev 已提交
679 680 681
			/// Если нет всех нужных кусков, попробуем взять у кого-нибудь уже помердженный кусок.
			do_fetch = true;
			LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead");
M
Merge  
Michael Kolupaev 已提交
682
		}
M
Merge  
Michael Kolupaev 已提交
683 684 685 686 687
		else
		{
			MergeTreeData::DataPartPtr part = merger.mergeParts(parts, entry.new_part_name);

			zkutil::Ops ops;
M
Merge  
Michael Kolupaev 已提交
688
			checkPartAndAddToZooKeeper(part, ops);
M
Merge  
Michael Kolupaev 已提交
689

M
Merge  
Michael Kolupaev 已提交
690
			zookeeper->multi(ops);
M
Merge  
Michael Kolupaev 已提交
691 692 693

			ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
		}
M
Merge  
Michael Kolupaev 已提交
694 695 696 697 698
	}
	else
	{
		throw Exception("Unexpected log entry type: " + toString(static_cast<int>(entry.type)));
	}
M
Merge  
Michael Kolupaev 已提交
699 700 701 702 703

	if (do_fetch)
	{
		try
		{
M
Merge  
Michael Kolupaev 已提交
704 705 706 707 708 709
			String replica = findReplicaHavingPart(entry.new_part_name, true);
			if (replica.empty())
			{
				ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches);
				throw Exception("No active replica has part " + entry.new_part_name, ErrorCodes::NO_REPLICA_HAS_PART);
			}
M
Merge  
Michael Kolupaev 已提交
710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769
			fetchPart(entry.new_part_name, replica);

			if (entry.type == LogEntry::MERGE_PARTS)
				ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged);
		}
		catch (...)
		{
			/** Если не получилось скачать кусок, нужный для какого-то мерджа, лучше не пытаться получить другие куски для этого мерджа,
			  * а попытаться сразу получить помердженный кусок. Чтобы так получилось, переместим действия для получения остальных кусков
			  * для этого мерджа в конец очереди.
			  *
			  */
			try
			{
				Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);

				/// Найдем действие по объединению этого куска с другими. Запомним других.
				StringSet parts_for_merge;
				LogEntries::iterator merge_entry;
				for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
				{
					if (it->type == LogEntry::MERGE_PARTS)
					{
						if (std::find(it->parts_to_merge.begin(), it->parts_to_merge.end(), entry.new_part_name)
							!= it->parts_to_merge.end())
						{
							parts_for_merge = StringSet(it->parts_to_merge.begin(), it->parts_to_merge.end());
							merge_entry = it;
							break;
						}
					}
				}

				if (!parts_for_merge.empty())
				{
					/// Переместим в конец очереди действия, получающие parts_for_merge.
					for (LogEntries::iterator it = queue.begin(); it != queue.end();)
					{
						auto it0 = it;
						++it;

						if (it0 == merge_entry)
							break;

						if ((it0->type == LogEntry::MERGE_PARTS || it0->type == LogEntry::GET_PART)
							&& parts_for_merge.count(it0->new_part_name))
						{
							queue.splice(queue.end(), queue, it0, it);
						}
					}
				}
			}
			catch (...)
			{
				tryLogCurrentException(__PRETTY_FUNCTION__);
			}

			throw;
		}
	}
M
Merge  
Michael Kolupaev 已提交
770 771 772 773 774 775
}

void StorageReplicatedMergeTree::queueUpdatingThread()
{
	while (!shutdown_called)
	{
M
Merge  
Michael Kolupaev 已提交
776 777 778
		try
		{
			pullLogsToQueue();
M
Merge  
Michael Kolupaev 已提交
779 780

			clearOldParts();
M
Merge  
Michael Kolupaev 已提交
781 782 783 784 785 786 787

			/// Каждую минуту выбрасываем ненужные записи из лога.
			if (time(0) - clear_old_logs_time > 60)
			{
				clear_old_logs_time = time(0);
				clearOldLogs();
			}
M
Merge  
Michael Kolupaev 已提交
788 789 790 791 792 793
		}
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}

M
Merge  
Michael Kolupaev 已提交
794 795
		std::this_thread::sleep_for(QUEUE_UPDATE_SLEEP);
	}
M
Merge  
Michael Kolupaev 已提交
796
}
M
Merge  
Michael Kolupaev 已提交
797

M
Merge  
Michael Kolupaev 已提交
798 799 800 801 802
void StorageReplicatedMergeTree::queueThread()
{
	while (!shutdown_called)
	{
		LogEntry entry;
M
Merge  
Michael Kolupaev 已提交
803
		bool have_work = false;
M
Merge  
Michael Kolupaev 已提交
804

M
Merge  
Michael Kolupaev 已提交
805
		try
M
Merge  
Michael Kolupaev 已提交
806 807
		{
			Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
M
Merge  
Michael Kolupaev 已提交
808
			bool empty = queue.empty();
M
Merge  
Michael Kolupaev 已提交
809 810
			if (!empty)
			{
M
Merge  
Michael Kolupaev 已提交
811 812 813 814 815 816 817 818 819 820 821
				for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
				{
					if (shouldExecuteLogEntry(*it))
					{
						entry = *it;
						entry.tagPartAsFuture(*this);
						queue.erase(it);
						have_work = true;
						break;
					}
				}
M
Merge  
Michael Kolupaev 已提交
822 823
			}
		}
M
Merge  
Michael Kolupaev 已提交
824 825 826 827
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}
M
Merge  
Michael Kolupaev 已提交
828

M
Merge  
Michael Kolupaev 已提交
829
		if (!have_work)
M
Merge  
Michael Kolupaev 已提交
830 831 832 833
		{
			std::this_thread::sleep_for(QUEUE_NO_WORK_SLEEP);
			continue;
		}
M
Merge  
Michael Kolupaev 已提交
834

M
Merge  
Michael Kolupaev 已提交
835
		bool success = false;
M
Merge  
Michael Kolupaev 已提交
836

M
Merge  
Michael Kolupaev 已提交
837 838 839
		try
		{
			executeLogEntry(entry);
M
Merge  
Michael Kolupaev 已提交
840

M
Merge  
Michael Kolupaev 已提交
841
			auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry.znode_name);
M
Merge  
Michael Kolupaev 已提交
842 843 844
			if (code != zkutil::ReturnCode::Ok)
				LOG_ERROR(log, "Couldn't remove " << replica_path + "/queue/" + entry.znode_name << ": "
					<< zkutil::ReturnCode::toString(code) + ". There must be a bug somewhere. Ignoring it.");
M
Merge  
Michael Kolupaev 已提交
845 846 847

			success = true;
		}
M
Michael Kolupaev 已提交
848 849 850 851 852 853 854 855
		catch (Exception & e)
		{
			if (e.code() == ErrorCodes::NO_REPLICA_HAS_PART)
				/// Если ни у кого нет нужного куска, это нормальная ситуация; не будем писать в лог с уровнем Error.
				LOG_INFO(log, e.displayText());
			else
				tryLogCurrentException(__PRETTY_FUNCTION__);
		}
M
Merge  
Michael Kolupaev 已提交
856 857
		catch (...)
		{
M
Merge  
Michael Kolupaev 已提交
858
			tryLogCurrentException(__PRETTY_FUNCTION__);
M
Merge  
Michael Kolupaev 已提交
859 860 861 862 863 864 865
		}

		if (shutdown_called)
			break;

		if (success)
		{
M
Michael Kolupaev 已提交
866
			entry.currently_merging_tagger = nullptr;
M
Merge  
Michael Kolupaev 已提交
867 868 869 870 871 872
			std::this_thread::sleep_for(QUEUE_AFTER_WORK_SLEEP);
		}
		else
		{
			{
				/// Добавим действие, которое не получилось выполнить, в конец очереди.
M
Merge  
Michael Kolupaev 已提交
873
				entry.future_part_tagger = nullptr;
M
Merge  
Michael Kolupaev 已提交
874 875 876
				Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
				queue.push_back(entry);
			}
M
Michael Kolupaev 已提交
877
			entry.currently_merging_tagger = nullptr;
M
Merge  
Michael Kolupaev 已提交
878 879 880 881 882
			std::this_thread::sleep_for(QUEUE_ERROR_SLEEP);
		}
	}
}

M
Merge  
Michael Kolupaev 已提交
883 884 885 886 887 888
void StorageReplicatedMergeTree::mergeSelectingThread()
{
	pullLogsToQueue();

	while (!shutdown_called && is_leader_node)
	{
M
Michael Kolupaev 已提交
889
		bool success = false;
M
Merge  
Michael Kolupaev 已提交
890

M
Michael Kolupaev 已提交
891
		try
M
Merge  
Michael Kolupaev 已提交
892
		{
M
Michael Kolupaev 已提交
893
			size_t merges_queued = 0;
M
Merge  
Michael Kolupaev 已提交
894

M
Michael Kolupaev 已提交
895 896
			{
				Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
M
Merge  
Michael Kolupaev 已提交
897

M
Michael Kolupaev 已提交
898 899 900 901
				for (const auto & entry : queue)
					if (entry.type == LogEntry::MERGE_PARTS)
						++merges_queued;
			}
M
Merge  
Michael Kolupaev 已提交
902

M
Michael Kolupaev 已提交
903 904 905 906 907
			if (merges_queued >= data.settings.merging_threads)
			{
				std::this_thread::sleep_for(MERGE_SELECTING_SLEEP);
				continue;
			}
M
Merge  
Michael Kolupaev 已提交
908

M
Michael Kolupaev 已提交
909 910
			/// Есть ли активный мердж крупных кусков.
			bool has_big_merge = false;
M
Merge  
Michael Kolupaev 已提交
911 912

			{
M
Michael Kolupaev 已提交
913 914 915
				Poco::ScopedLock<Poco::FastMutex> lock(currently_merging_mutex);

				for (const auto & name : currently_merging)
M
Merge  
Michael Kolupaev 已提交
916
				{
M
Michael Kolupaev 已提交
917 918 919 920
					MergeTreeData::DataPartPtr part = data.getContainingPart(name);
					if (!part)
						continue;
					if (part->name != name)
M
Michael Kolupaev 已提交
921
					{
M
Merge  
Michael Kolupaev 已提交
922
						LOG_INFO(log, "currently_merging contains obsolete part " << name << " contained in " << part->name);
M
Michael Kolupaev 已提交
923 924
						continue;
					}
M
Michael Kolupaev 已提交
925 926 927 928 929
					if (part->size * data.index_granularity > 25 * 1024 * 1024)
					{
						has_big_merge = true;
						break;
					}
M
Merge  
Michael Kolupaev 已提交
930 931 932 933
				}
			}

			MergeTreeData::DataPartsVector parts;
M
Merge  
Michael Kolupaev 已提交
934

M
Merge  
Michael Kolupaev 已提交
935
			{
M
Merge  
Michael Kolupaev 已提交
936
				Poco::ScopedLock<Poco::FastMutex> lock(currently_merging_mutex);
M
Merge  
Michael Kolupaev 已提交
937

M
Merge  
Michael Kolupaev 已提交
938 939 940 941
				String merged_name;
				auto can_merge = std::bind(
					&StorageReplicatedMergeTree::canMergeParts, this, std::placeholders::_1, std::placeholders::_2);

M
Merge  
Michael Kolupaev 已提交
942 943 944 945
				if (merger.selectPartsToMerge(parts, merged_name, MergeTreeDataMerger::NO_LIMIT,
												false, false, has_big_merge, can_merge) ||
					merger.selectPartsToMerge(parts, merged_name, MergeTreeDataMerger::NO_LIMIT,
												true, false, has_big_merge, can_merge))
M
Merge  
Michael Kolupaev 已提交
946
				{
M
Merge  
Michael Kolupaev 已提交
947 948
					LogEntry entry;
					entry.type = LogEntry::MERGE_PARTS;
M
Merge  
Michael Kolupaev 已提交
949
					entry.source_replica = replica_name;
M
Merge  
Michael Kolupaev 已提交
950 951 952 953 954 955 956
					entry.new_part_name = merged_name;

					for (const auto & part : parts)
					{
						entry.parts_to_merge.push_back(part->name);
					}

M
Merge  
Michael Kolupaev 已提交
957
					zookeeper->create(replica_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
M
Merge  
Michael Kolupaev 已提交
958 959

					success = true;
M
Merge  
Michael Kolupaev 已提交
960
				}
M
Merge  
Michael Kolupaev 已提交
961
			}
M
Merge  
Michael Kolupaev 已提交
962

M
Merge  
Michael Kolupaev 已提交
963
			if (success)
M
Merge  
Michael Kolupaev 已提交
964
			{
M
Merge  
Michael Kolupaev 已提交
965 966 967 968 969 970
				/// Нужно загрузить новую запись в очередь перед тем, как в следующий раз выбирать куски для слияния.
				///  (чтобы куски пометились как currently_merging).
				pullLogsToQueue();

				String month_name = parts[0]->name.substr(0, 6);
				for (size_t i = 0; i + 1 < parts.size(); ++i)
M
Merge  
Michael Kolupaev 已提交
971
				{
M
Merge  
Michael Kolupaev 已提交
972 973 974 975 976 977 978
					/// Уберем больше не нужные отметки о несуществующих блоках.
					for (UInt64 number = parts[i]->right + 1; number <= parts[i + 1]->left - 1; ++number)
					{
						String number_str = toString(number);
						while (number_str.size() < 10)
							number_str = '0' + number_str;
						String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + number_str;
M
Merge  
Michael Kolupaev 已提交
979

M
Merge  
Michael Kolupaev 已提交
980
						zookeeper->tryRemove(path);
M
Merge  
Michael Kolupaev 已提交
981
					}
M
Merge  
Michael Kolupaev 已提交
982
				}
M
Merge  
Michael Kolupaev 已提交
983 984 985 986 987 988 989
			}
		}
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}

M
Merge  
Michael Kolupaev 已提交
990
		if (shutdown_called || !is_leader_node)
M
Merge  
Michael Kolupaev 已提交
991 992
			break;

M
Merge  
Michael Kolupaev 已提交
993 994 995 996 997 998 999 1000 1001 1002
		if (!success)
			std::this_thread::sleep_for(MERGE_SELECTING_SLEEP);
	}
}

void StorageReplicatedMergeTree::clearOldBlocksThread()
{
	while (!shutdown_called && is_leader_node)
	{
		try
M
Merge  
Michael Kolupaev 已提交
1003 1004 1005
		{
			clearOldBlocks();
		}
M
Merge  
Michael Kolupaev 已提交
1006 1007 1008 1009
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}
M
Merge  
Michael Kolupaev 已提交
1010

M
Merge  
Michael Kolupaev 已提交
1011 1012 1013 1014 1015 1016 1017 1018
		/// Спим минуту, но проверяем, нужно ли завершиться, каждую секунду.
		/// TODO: Лучше во всех подобных местах использовать condition variable.
		for (size_t i = 0; i < 60; ++i)
		{
			if (shutdown_called || !is_leader_node)
				break;
			std::this_thread::sleep_for(std::chrono::seconds(1));
		}
M
Merge  
Michael Kolupaev 已提交
1019 1020 1021 1022 1023 1024 1025 1026
	}
}

/** Predicate telling whether two parts may be merged together.
  * Returns false if either part is listed in currently_merging, or if some block number strictly between
  * the two parts is not in the ABANDONED state in ZooKeeper.
  * NOTE(review): currently_merging is read here without taking currently_merging_mutex; the visible caller
  * (mergeSelectingThread) holds that mutex while selecting parts - confirm for other callers.
  */
bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
	const bool already_merging = currently_merging.count(left->name) || currently_merging.count(right->name);
	if (already_merging)
		return false;

	const String month_name = left->name.substr(0, 6);
	const String blocks_prefix = zookeeper_path + "/block_numbers/" + month_name + "/block-";

	/// The parts can be merged if all numbers between them are abandoned, i.e. do not correspond to any blocks.
	for (UInt64 number = left->right + 1; number <= right->left - 1; ++number)
	{
		/// Left-pad the block number with zeros to 10 characters.
		String padded_number = toString(number);
		if (padded_number.size() < 10)
			padded_number.insert(0, 10 - padded_number.size(), '0');

		const String path = blocks_prefix + padded_number;

		if (AbandonableLockInZooKeeper::check(path, *zookeeper) == AbandonableLockInZooKeeper::ABANDONED)
			continue;

		LOG_DEBUG(log, "Can't merge parts " << left->name << " and " << right->name << " because block " << path << " exists");
		return false;
	}

	return true;
}

void StorageReplicatedMergeTree::becomeLeader()
{
M
Merge  
Michael Kolupaev 已提交
1049
	LOG_INFO(log, "Became leader");
M
Merge  
Michael Kolupaev 已提交
1050 1051
	is_leader_node = true;
	merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this);
M
Merge  
Michael Kolupaev 已提交
1052
	clear_old_blocks_thread = std::thread(&StorageReplicatedMergeTree::clearOldBlocksThread, this);
M
Merge  
Michael Kolupaev 已提交
1053 1054
}

M
Merge  
Michael Kolupaev 已提交
1055
/** Looks for a replica that has the given part registered in ZooKeeper.
  * @param part_name  name of the part to look for.
  * @param active     if true, only replicas that have an "is_active" node are considered.
  * @return the name of a suitable replica, or an empty string if there is none.
  */
String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name, bool active)
{
	Strings candidates = zookeeper->getChildren(zookeeper_path + "/replicas");

	/// Among the replicas that have the part, pick one with equal probability.
	std::random_shuffle(candidates.begin(), candidates.end());

	for (const String & candidate : candidates)
	{
		if (!zookeeper->exists(zookeeper_path + "/replicas/" + candidate + "/parts/" + part_name))
			continue;

		if (active && !zookeeper->exists(zookeeper_path + "/replicas/" + candidate + "/is_active"))
			continue;

		return candidate;
	}

	return "";
}

void StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_name)
{
M
Merge  
Michael Kolupaev 已提交
1074 1075
	LOG_DEBUG(log, "Fetching part " << part_name << " from " << replica_name);

M
Merge  
Michael Kolupaev 已提交
1076 1077 1078 1079 1080
	auto table_lock = lockStructure(true);

	String host;
	int port;

M
Merge  
Michael Kolupaev 已提交
1081
	String host_port_str = zookeeper->get(zookeeper_path + "/replicas/" + replica_name + "/host");
M
Merge  
Michael Kolupaev 已提交
1082 1083 1084 1085 1086 1087 1088 1089
	ReadBufferFromString buf(host_port_str);
	assertString("host: ", buf);
	readString(host, buf);
	assertString("\nport: ", buf);
	readText(port, buf);
	assertString("\n", buf);
	assertEOF(buf);

M
Merge  
Michael Kolupaev 已提交
1090
	MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(part_name, zookeeper_path + "/replicas/" + replica_name, host, port);
M
Merge  
Michael Kolupaev 已提交
1091 1092
	auto removed_parts = data.renameTempPartAndReplace(part);

M
Merge  
Michael Kolupaev 已提交
1093
	zkutil::Ops ops;
M
Merge  
Michael Kolupaev 已提交
1094
	checkPartAndAddToZooKeeper(part, ops);
M
Michael Kolupaev 已提交
1095

M
Merge  
Michael Kolupaev 已提交
1096
	zookeeper->multi(ops);
M
Merge  
Michael Kolupaev 已提交
1097

M
Michael Kolupaev 已提交
1098 1099 1100 1101 1102 1103
	for (const auto & removed_part : removed_parts)
	{
		LOG_DEBUG(log, "Part " << removed_part->name << " is rendered obsolete by fetching part " << part_name);
		ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
	}

M
Merge  
Michael Kolupaev 已提交
1104 1105
	ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);

M
Merge  
Michael Kolupaev 已提交
1106
	LOG_DEBUG(log, "Fetched part");
M
Merge  
Michael Kolupaev 已提交
1107
}
M
Merge  
Michael Kolupaev 已提交
1108

M
Merge  
Michael Kolupaev 已提交
1109 1110
void StorageReplicatedMergeTree::shutdown()
{
M
Merge  
Michael Kolupaev 已提交
1111
	if (permanent_shutdown_called)
M
Merge  
Michael Kolupaev 已提交
1112
		return;
M
Merge  
Michael Kolupaev 已提交
1113 1114 1115 1116 1117 1118
	permanent_shutdown_called = true;
	restarting_thread.join();
}

void StorageReplicatedMergeTree::partialShutdown()
{
M
Merge  
Michael Kolupaev 已提交
1119
	leader_election = nullptr;
M
Merge  
Michael Kolupaev 已提交
1120 1121 1122
	shutdown_called = true;
	replica_is_active_node = nullptr;

M
Merge  
Michael Kolupaev 已提交
1123 1124 1125 1126
	merger.cancelAll();
	if (unreplicated_merger)
		unreplicated_merger->cancelAll();

M
Merge  
Michael Kolupaev 已提交
1127
	LOG_TRACE(log, "Waiting for threads to finish");
M
Merge  
Michael Kolupaev 已提交
1128
	if (is_leader_node)
M
Merge  
Michael Kolupaev 已提交
1129
	{
M
Merge  
Michael Kolupaev 已提交
1130
		is_leader_node = false;
M
Merge  
Michael Kolupaev 已提交
1131 1132 1133 1134
		if (merge_selecting_thread.joinable())
			merge_selecting_thread.join();
		if (clear_old_blocks_thread.joinable())
			clear_old_blocks_thread.join();
M
Merge  
Michael Kolupaev 已提交
1135
	}
M
Merge  
Michael Kolupaev 已提交
1136 1137
	if (queue_updating_thread.joinable())
		queue_updating_thread.join();
M
Merge  
Michael Kolupaev 已提交
1138 1139
	for (auto & thread : queue_threads)
		thread.join();
M
Merge  
Michael Kolupaev 已提交
1140
	queue_threads.clear();
M
Merge  
Michael Kolupaev 已提交
1141
	LOG_TRACE(log, "Threads finished");
M
Merge  
Michael Kolupaev 已提交
1142 1143
}

M
Merge  
Michael Kolupaev 已提交
1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159
/** Switches the storage to read-only mode; restartingThread() calls it after an unexpected exception.
  * Sets the read-only and both shutdown flags, releases the leader election, the "replica is active" node
  * and the endpoint holder, cancels merges and clears the leader flag.
  * NOTE(review): unlike partialShutdown(), this neither cancels unreplicated_merger nor joins any thread -
  * confirm that this is intended.
  */
void StorageReplicatedMergeTree::goReadOnly()
{
	LOG_INFO(log, "Going to read-only mode");

	/// Flags first: write() checks is_read_only, the background loops check the shutdown flags.
	is_read_only = true;
	shutdown_called = true;
	permanent_shutdown_called = true;

	leader_election = nullptr;
	replica_is_active_node = nullptr;

	merger.cancelAll();
	is_leader_node = false;

	endpoint_holder = nullptr;
}

M
Merge  
Michael Kolupaev 已提交
1160 1161 1162 1163
void StorageReplicatedMergeTree::startup()
{
	shutdown_called = false;

M
Merge  
Michael Kolupaev 已提交
1164 1165 1166 1167
	merger.uncancelAll();
	if (unreplicated_merger)
		unreplicated_merger->uncancelAll();

M
Merge  
Michael Kolupaev 已提交
1168 1169
	activateReplica();

M
Merge  
Michael Kolupaev 已提交
1170
	leader_election = new zkutil::LeaderElection(zookeeper_path + "/leader_election", *zookeeper,
M
Merge  
Michael Kolupaev 已提交
1171 1172 1173 1174 1175 1176 1177
		std::bind(&StorageReplicatedMergeTree::becomeLeader, this), replica_name);

	queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this);
	for (size_t i = 0; i < data.settings.replication_threads; ++i)
		queue_threads.push_back(std::thread(&StorageReplicatedMergeTree::queueThread, this));
}

M
Merge  
Michael Kolupaev 已提交
1178 1179
void StorageReplicatedMergeTree::restartingThread()
{
M
Merge  
Michael Kolupaev 已提交
1180
	try
M
Merge  
Michael Kolupaev 已提交
1181
	{
M
Merge  
Michael Kolupaev 已提交
1182 1183 1184
		startup();

		while (!permanent_shutdown_called)
M
Merge  
Michael Kolupaev 已提交
1185
		{
M
Merge  
Michael Kolupaev 已提交
1186 1187 1188
			if (zookeeper->expired())
			{
				LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
M
Merge  
Michael Kolupaev 已提交
1189

M
Merge  
Michael Kolupaev 已提交
1190 1191 1192 1193
				/// Запретим писать в таблицу, пока подменяем zookeeper.
				LOG_TRACE(log, "Locking all operations");
				auto structure_lock = lockDataForAlter();
				LOG_TRACE(log, "Locked all operations");
M
Merge  
Michael Kolupaev 已提交
1194

M
Merge  
Michael Kolupaev 已提交
1195
				partialShutdown();
M
Merge  
Michael Kolupaev 已提交
1196

M
Merge  
Michael Kolupaev 已提交
1197 1198 1199 1200
				zookeeper = context.getZooKeeper();

				startup();
			}
M
Merge  
Michael Kolupaev 已提交
1201

M
Merge  
Michael Kolupaev 已提交
1202 1203 1204 1205 1206 1207
			std::this_thread::sleep_for(std::chrono::seconds(2));
		}
	}
	catch (...)
	{
		tryLogCurrentException("StorageReplicatedMergeTree::restartingThread");
M
Merge  
Michael Kolupaev 已提交
1208 1209
		LOG_ERROR(log, "Exception in restartingThread. The storage will be read-only until server restart.");
		goReadOnly();
M
Merge  
Michael Kolupaev 已提交
1210
		return;
M
Merge  
Michael Kolupaev 已提交
1211 1212
	}

M
Merge  
Michael Kolupaev 已提交
1213 1214 1215 1216 1217 1218 1219 1220 1221
	try
	{
		endpoint_holder = nullptr;
		partialShutdown();
	}
	catch (...)
	{
		tryLogCurrentException("StorageReplicatedMergeTree::restartingThread");
	}
M
Merge  
Michael Kolupaev 已提交
1222 1223
}

M
Merge  
Michael Kolupaev 已提交
1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243
/// Shuts the storage down; an exception thrown by shutdown() is logged and never leaves the destructor.
StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
{
	try
	{
		shutdown();
	}
	catch (...)
	{
		tryLogCurrentException("~StorageReplicatedMergeTree");
	}
}

/** Creates input streams for reading from the table.
  * The streams of the replicated data come from `reader`; if `unreplicated_reader` is set,
  * its streams are placed in front of them. All arguments are forwarded unchanged to both readers.
  */
BlockInputStreams StorageReplicatedMergeTree::read(
		const Names & column_names,
		ASTPtr query,
		const Settings & settings,
		QueryProcessingStage::Enum & processed_stage,
		size_t max_block_size,
		unsigned threads)
{
	BlockInputStreams streams = reader.read(column_names, query, settings, processed_stage, max_block_size, threads);

	if (!unreplicated_reader)
		return streams;

	BlockInputStreams unreplicated_streams =
		unreplicated_reader->read(column_names, query, settings, processed_stage, max_block_size, threads);
	streams.insert(streams.begin(), unreplicated_streams.begin(), unreplicated_streams.end());

	return streams;
}

M
Merge  
Michael Kolupaev 已提交
1255 1256
/** Creates an output stream for inserting into the table.
  * Throws Exception with code TABLE_IS_READ_ONLY if the storage is in read-only mode.
  * If `query` is an ASTInsertQuery, its insert_id is passed to the stream; otherwise an empty id is used.
  */
BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query)
{
	if (is_read_only)
		throw Exception("Table is in read only mode", ErrorCodes::TABLE_IS_READ_ONLY);

	String insert_id;

	ASTInsertQuery * insert_query = dynamic_cast<ASTInsertQuery *>(&*query);
	if (insert_query)
		insert_id = insert_query->insert_id;

	return new ReplicatedMergeTreeBlockOutputStream(*this, insert_id);
}
M
Merge  
Michael Kolupaev 已提交
1266

M
Merge  
Michael Kolupaev 已提交
1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285
bool StorageReplicatedMergeTree::optimize()
{
	/// Померджим какие-нибудь куски из директории unreplicated. TODO: Мерджить реплицируемые куски тоже.

	if (!unreplicated_data)
		return false;

	unreplicated_data->clearOldParts();

	MergeTreeData::DataPartsVector parts;
	String merged_name;
	auto always_can_merge = [](const MergeTreeData::DataPartPtr &a, const MergeTreeData::DataPartPtr &b) { return true; };
	if (!unreplicated_merger->selectPartsToMerge(parts, merged_name, 0, true, true, false, always_can_merge))
		return false;

	unreplicated_merger->mergeParts(parts, merged_name);
	return true;
}

M
Merge  
Michael Kolupaev 已提交
1286 1287
void StorageReplicatedMergeTree::drop()
{
M
Merge  
Michael Kolupaev 已提交
1288 1289
	shutdown();

M
Merge  
Michael Kolupaev 已提交
1290
	replica_is_active_node = nullptr;
M
Merge  
Michael Kolupaev 已提交
1291 1292 1293
	zookeeper->removeRecursive(replica_path);
	if (zookeeper->getChildren(zookeeper_path + "/replicas").empty())
		zookeeper->removeRecursive(zookeeper_path);
M
Merge  
Michael Kolupaev 已提交
1294 1295
}

M
Merge  
Michael Kolupaev 已提交
1296 1297 1298
/** Serializes the entry in text form (format version 1):
  *   format version: 1
  *   source replica: <name>
  * followed by either "get" and the part name, or "merge", one source part per line, "into" and the
  * resulting part name; the text always ends with a newline.
  */
void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const
{
	writeString("format version: 1\n", out);

	writeString("source replica: ", out);
	writeString(source_replica, out);
	writeString("\n", out);

	if (type == GET_PART)
	{
		writeString("get\n", out);
		writeString(new_part_name, out);
	}
	else if (type == MERGE_PARTS)
	{
		writeString("merge\n", out);
		for (const String & source_part : parts_to_merge)
		{
			writeString(source_part, out);
			writeString("\n", out);
		}
		writeString("into\n", out);
		writeString(new_part_name, out);
	}

	writeString("\n", out);
}

/** Parses the text form produced by writeText() (format version 1) into this entry.
  *
  * Expected layout: "format version: 1", "source replica: <name>", then either "get" and the part name,
  * or "merge", one source part per line, "into" and the resulting part name; a trailing newline is required.
  *
  * Throws if the text does not match the layout (via assertString) or if the entry type is neither
  * "get" nor "merge". An unknown type is rejected explicitly here rather than leaving `type` unset.
  * parts_to_merge is cleared before parsing, so re-reading into an already filled entry does not
  * accumulate source parts.
  */
void StorageReplicatedMergeTree::LogEntry::readText(ReadBuffer & in)
{
	String type_str;

	assertString("format version: 1\n", in);
	assertString("source replica: ", in);
	readString(source_replica, in);
	assertString("\n", in);
	readString(type_str, in);
	assertString("\n", in);

	parts_to_merge.clear();

	if (type_str == "get")
	{
		type = GET_PART;
		readString(new_part_name, in);
	}
	else if (type_str == "merge")
	{
		type = MERGE_PARTS;
		/// Source part names follow one per line, terminated by the line "into".
		/// On truncated input assertString() throws, so the loop cannot spin forever.
		while (true)
		{
			String s;
			readString(s, in);
			assertString("\n", in);
			if (s == "into")
				break;
			parts_to_merge.push_back(s);
		}
		readString(new_part_name, in);
	}
	else
	{
		throw Exception("Unknown log entry type: " + type_str);
	}
	assertString("\n", in);
}

M
Merge  
Michael Kolupaev 已提交
1355
}