StorageReplicatedMergeTree.cpp 36.0 KB
Newer Older
M
Merge  
Michael Kolupaev 已提交
1
#include <DB/Storages/StorageReplicatedMergeTree.h>
M
Merge  
Michael Kolupaev 已提交
2
#include <DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
M
Merge  
Michael Kolupaev 已提交
3
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
M
Merge  
Michael Kolupaev 已提交
4 5 6
#include <DB/Parsers/formatAST.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/ReadBufferFromString.h>
M
Merge  
Michael Kolupaev 已提交
7 8 9 10

namespace DB
{

M
Merge  
Michael Kolupaev 已提交
11 12 13 14 15

/// Pause between iterations of the queue-updating loop.
const std::chrono::seconds QUEUE_UPDATE_SLEEP(5);
/// Pause taken by the queue-processing loop when no entry is ready to execute.
const std::chrono::seconds QUEUE_NO_WORK_SLEEP(5);
/// NOTE(review): presumably the pause after a failed queue action; its use is outside this part of the file.
const std::chrono::seconds QUEUE_ERROR_SLEEP(1);
/// NOTE(review): presumably the pause after a successful queue action; its use is outside this part of the file.
const std::chrono::seconds QUEUE_AFTER_WORK_SLEEP(0);
M
Merge  
Michael Kolupaev 已提交
16
/// NOTE(review): presumably the pause between merge-selection attempts; its use is outside this part of the file.
const std::chrono::seconds MERGE_SELECTING_SLEEP(5);
M
Merge  
Michael Kolupaev 已提交
17 18


M
Merge  
Michael Kolupaev 已提交
19 20 21
/** Builds the storage object and synchronizes it with ZooKeeper.
  *
  * A single trailing '/' is stripped from the ZooKeeper path, and replica_path is set to
  * <zookeeper_path>/replicas/<replica_name>.
  *
  * If attach is false (a new replica is being added):
  *  - the table nodes are created in ZooKeeper if the table path does not exist yet;
  *  - ADDING_REPLICA_TO_NON_EMPTY_TABLE is thrown if any replica already has parts;
  *  - the local structure is checked against /metadata and the nodes of this replica are created.
  * If attach is true, the structure is checked and the local set of parts is reconciled with ZooKeeper.
  *
  * In both cases the queue stored in ZooKeeper is loaded afterwards.
  */
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
	const String & zookeeper_path_,
	const String & replica_name_,
	bool attach,
	const String & path_, const String & name_, NamesAndTypesListPtr columns_,
	Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_,
	const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	MergeTreeData::Mode mode_,
	const String & sign_column_,
	const MergeTreeSettings & settings_)
	:
	context(context_), zookeeper(context.getZooKeeper()),
	table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'), zookeeper_path(zookeeper_path_),
	replica_name(replica_name_),
	data(	full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
			index_granularity_,mode_, sign_column_, settings_),
	reader(data), writer(data), merger(data), fetcher(data),
	log(&Logger::get("StorageReplicatedMergeTree: " + table_name))
{
	/// Normalize the path so that child paths can be built by appending "/name".
	if (!zookeeper_path.empty() && *zookeeper_path.rbegin() == '/')
		zookeeper_path.erase(zookeeper_path.end() - 1);
	replica_path = zookeeper_path + "/replicas/" + replica_name;

	if (!attach)
	{
		if (!zookeeper.exists(zookeeper_path))
			createTable();

		if (!isTableEmpty())
			throw Exception("Can't add new replica to non-empty table", ErrorCodes::ADDING_REPLICA_TO_NON_EMPTY_TABLE);

		checkTableStructure();
		createReplica();
	}
	else
	{
		checkTableStructure();
		checkParts();
	}

	loadQueue();
}

/** Factory function: constructs the storage with the given arguments, obtains its StoragePtr
  * through thisPtr(), calls startup() on it and returns the pointer.
  * All arguments are forwarded to the constructor unchanged.
  */
StoragePtr StorageReplicatedMergeTree::create(
	const String & zookeeper_path_,
	const String & replica_name_,
	bool attach,
	const String & path_, const String & name_, NamesAndTypesListPtr columns_,
	Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_,
	const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	MergeTreeData::Mode mode_,
	const String & sign_column_,
	const MergeTreeSettings & settings_)
{
	StorageReplicatedMergeTree * res = new StorageReplicatedMergeTree(zookeeper_path_, replica_name_, attach,
		path_, name_, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
		index_granularity_, mode_, sign_column_, settings_);

	/// NOTE(review): the pointer is taken before startup(), presumably so that the object is already
	/// owned if startup() throws - confirm against thisPtr().
	StoragePtr res_ptr = res->thisPtr();
	res->startup();
	return res_ptr;
}

M
Merge  
Michael Kolupaev 已提交
87 88 89 90 91 92 93 94
/// Returns the text that formatAST produces for the given AST, or an empty string if the pointer is null.
static String formattedAST(const ASTPtr & ast)
{
	std::stringstream formatted;
	if (ast)
		formatAST(*ast, formatted, 0, false, true);
	return formatted.str();
}
M
Merge  
Michael Kolupaev 已提交
95

M
Merge  
Michael Kolupaev 已提交
96
void StorageReplicatedMergeTree::createTable()
M
Merge  
Michael Kolupaev 已提交
97
{
M
Merge  
Michael Kolupaev 已提交
98
	zookeeper.create(zookeeper_path, "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
99

M
Merge  
Michael Kolupaev 已提交
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
	/// Запишем метаданные таблицы, чтобы реплики могли сверять с ними свою локальную структуру таблицы.
	std::stringstream metadata;
	metadata << "metadata format version: 1" << std::endl;
	metadata << "date column: " << data.date_column_name << std::endl;
	metadata << "sampling expression: " << formattedAST(data.sampling_expression) << std::endl;
	metadata << "index granularity: " << data.index_granularity << std::endl;
	metadata << "mode: " << static_cast<int>(data.mode) << std::endl;
	metadata << "sign column: " << data.sign_column << std::endl;
	metadata << "primary key: " << formattedAST(data.primary_expr_ast) << std::endl;
	metadata << "columns:" << std::endl;
	WriteBufferFromOStream buf(metadata);
	for (auto & it : data.getColumnsList())
	{
		writeBackQuotedString(it.first, buf);
		writeChar(' ', buf);
		writeString(it.second->getName(), buf);
		writeChar('\n', buf);
	}
	buf.next();
M
Merge  
Michael Kolupaev 已提交
119

M
Merge  
Michael Kolupaev 已提交
120
	zookeeper.create(zookeeper_path + "/metadata", metadata.str(), zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
121

M
Merge  
Michael Kolupaev 已提交
122 123
	zookeeper.create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent);
	zookeeper.create(zookeeper_path + "/blocks", "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
124
	zookeeper.create(zookeeper_path + "/block_numbers", "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
125
	zookeeper.create(zookeeper_path + "/leader_election", "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
126
	zookeeper.create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
127
}
M
Merge  
Michael Kolupaev 已提交
128

M
Merge  
Michael Kolupaev 已提交
129 130
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
	* Если нет - бросить исключение.
M
Merge  
Michael Kolupaev 已提交
131
	*/
M
Merge  
Michael Kolupaev 已提交
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
void StorageReplicatedMergeTree::checkTableStructure()
{
	String metadata_str = zookeeper.get(zookeeper_path + "/metadata");
	ReadBufferFromString buf(metadata_str);
	assertString("metadata format version: 1", buf);
	assertString("\ndate column: ", buf);
	assertString(data.date_column_name, buf);
	assertString("\nsampling expression: ", buf);
	assertString(formattedAST(data.sampling_expression), buf);
	assertString("\nindex granularity: ", buf);
	assertString(toString(data.index_granularity), buf);
	assertString("\nmode: ", buf);
	assertString(toString(static_cast<int>(data.mode)), buf);
	assertString("\nsign column: ", buf);
	assertString(data.sign_column, buf);
	assertString("\nprimary key: ", buf);
	assertString(formattedAST(data.primary_expr_ast), buf);
	assertString("\ncolumns:\n", buf);
	for (auto & it : data.getColumnsList())
	{
		String name;
		readBackQuotedString(name, buf);
		if (name != it.first)
			throw Exception("Unexpected column name in ZooKeeper: expected " + it.first + ", found " + name,
				ErrorCodes::UNKNOWN_IDENTIFIER);
		assertString(" ", buf);
		assertString(it.second->getName(), buf);
		assertString("\n", buf);
	}
M
Merge  
Michael Kolupaev 已提交
161
	assertEOF(buf);
M
Merge  
Michael Kolupaev 已提交
162
}
M
Merge  
Michael Kolupaev 已提交
163

M
Merge  
Michael Kolupaev 已提交
164 165 166 167 168 169 170 171 172
/// Creates the persistent node of this replica and, in order, its empty
/// host, log, log_pointers, queue and parts child nodes.
void StorageReplicatedMergeTree::createReplica()
{
	zookeeper.create(replica_path, "", zkutil::CreateMode::Persistent);

	const char * const child_suffixes[] = {"/host", "/log", "/log_pointers", "/queue", "/parts"};
	for (const char * suffix : child_suffixes)
		zookeeper.create(replica_path + suffix, "", zkutil::CreateMode::Persistent);
}
M
Merge  
Michael Kolupaev 已提交
173

M
Merge  
Michael Kolupaev 已提交
174 175 176 177 178
void StorageReplicatedMergeTree::activateReplica()
{
	std::stringstream host;
	host << "host: " << context.getInterserverIOHost() << std::endl;
	host << "port: " << context.getInterserverIOPort() << std::endl;
M
Merge  
Michael Kolupaev 已提交
179

M
Merge  
Michael Kolupaev 已提交
180
	/// Одновременно объявим, что эта реплика активна, и обновим хост.
M
Merge  
Michael Kolupaev 已提交
181 182 183
	zkutil::Ops ops;
	ops.push_back(new zkutil::Op::Create(replica_path + "/is_active", "", zookeeper.getDefaultACL(), zkutil::CreateMode::Ephemeral));
	ops.push_back(new zkutil::Op::SetData(replica_path + "/host", host.str(), -1));
M
Merge  
Michael Kolupaev 已提交
184 185 186 187 188 189 190 191 192 193 194 195 196

	try
	{
		zookeeper.multi(ops);
	}
	catch (zkutil::KeeperException & e)
	{
		if (e.code == zkutil::ReturnCode::NodeExists)
			throw Exception("Replica " + replica_path + " appears to be already active. If you're sure it's not, "
				"try again in a minute or remove znode " + replica_path + "/is_active manually", ErrorCodes::REPLICA_IS_ALREADY_ACTIVE);

		throw;
	}
M
Merge  
Michael Kolupaev 已提交
197

M
Merge  
Michael Kolupaev 已提交
198 199
	replica_is_active_node = zkutil::EphemeralNodeHolder::existing(replica_path + "/is_active", zookeeper);
}
M
Merge  
Michael Kolupaev 已提交
200

M
Merge  
Michael Kolupaev 已提交
201 202 203 204 205 206 207 208 209 210
bool StorageReplicatedMergeTree::isTableEmpty()
{
	Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas");
	for (const auto & replica : replicas)
	{
		if (!zookeeper.getChildren(zookeeper_path + "/replicas/" + replica + "/parts").empty())
			return false;
	}
	return true;
}
M
Merge  
Michael Kolupaev 已提交
211

M
Merge  
Michael Kolupaev 已提交
212 213 214 215 216
void StorageReplicatedMergeTree::checkParts()
{
	Strings expected_parts_vec = zookeeper.getChildren(replica_path + "/parts");
	NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end());

M
Merge  
Michael Kolupaev 已提交
217
	MergeTreeData::DataParts parts = data.getAllDataParts();
M
Merge  
Michael Kolupaev 已提交
218

M
Merge  
Michael Kolupaev 已提交
219
	MergeTreeData::DataParts unexpected_parts;
M
Merge  
Michael Kolupaev 已提交
220 221 222 223 224 225 226 227
	for (const auto & part : parts)
	{
		if (expected_parts.count(part->name))
		{
			expected_parts.erase(part->name);
		}
		else
		{
M
Merge  
Michael Kolupaev 已提交
228
			unexpected_parts.insert(part);
M
Merge  
Michael Kolupaev 已提交
229 230 231
		}
	}

M
Merge  
Michael Kolupaev 已提交
232 233 234 235 236 237 238 239 240
	/// Если локально не хватает какого-то куска, но есть покрывающий его кусок, можно заменить в ZK недостающий покрывающим.
	MergeTreeData::DataPartsVector parts_to_add;
	for (const String & missing_name : expected_parts)
	{
		auto containing = data.getContainingPart(missing_name);
		if (!containing)
			throw Exception("Not found " + toString(expected_parts.size())
				+ " parts (including " + missing_name + ") in table " + data.getTableName(),
				ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
M
Merge  
Michael Kolupaev 已提交
241
		LOG_ERROR(log, "Ignoring missing local part " << missing_name << " because part " << containing->name << " exists");
M
Merge  
Michael Kolupaev 已提交
242 243 244 245 246 247
		if (unexpected_parts.count(containing))
		{
			parts_to_add.push_back(containing);
			unexpected_parts.erase(containing);
		}
	}
M
Merge  
Michael Kolupaev 已提交
248

M
Merge  
Michael Kolupaev 已提交
249 250 251 252 253
	if (parts_to_add.size() > 2 ||
		unexpected_parts.size() > 2 ||
		expected_parts.size() > 20)
	{
		throw Exception("The local set of parts of table " + data.getTableName() + " doesn't look like the set of parts in ZooKeeper."
M
Merge  
Michael Kolupaev 已提交
254
			" There are " + toString(unexpected_parts.size()) + " unexpected parts, "
M
Merge  
Michael Kolupaev 已提交
255 256
			+ toString(parts_to_add.size()) + " unexpectedly merged parts, "
			+ toString(expected_parts.size()) + " unexpectedly obsolete parts",
M
Merge  
Michael Kolupaev 已提交
257
			ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);
M
Merge  
Michael Kolupaev 已提交
258 259 260 261 262
	}

	/// Добавим в ZK информацию о кусках, покрывающих недостающие куски.
	for (MergeTreeData::DataPartPtr part : parts_to_add)
	{
M
Merge  
Michael Kolupaev 已提交
263
		LOG_ERROR(log, "Adding unexpected local part to ZooKeeper: " << part->name);
M
Merge  
Michael Kolupaev 已提交
264 265 266 267
		zkutil::Ops ops;
		checkPartAndAddToZooKeeper(part, ops);
		zookeeper.multi(ops);
	}
M
Merge  
Michael Kolupaev 已提交
268

M
Merge  
Michael Kolupaev 已提交
269 270 271 272 273 274 275 276 277 278
	/// Удалим из ZK информацию о кусках, покрытых только что добавленными.
	for (const String & name : expected_parts)
	{
		zkutil::Ops ops;
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
		zookeeper.multi(ops);
	}

	/// Удалим лишние локальные куски.
M
Merge  
Michael Kolupaev 已提交
279 280 281
	for (MergeTreeData::DataPartPtr part : unexpected_parts)
	{
		LOG_ERROR(log, "Unexpected part " << part->name << ". Renaming it to ignored_" + part->name);
M
Merge  
Michael Kolupaev 已提交
282
		data.renameAndDetachPart(part, "ignored_");
M
Merge  
Michael Kolupaev 已提交
283 284
	}
}
M
Merge  
Michael Kolupaev 已提交
285

M
Merge  
Michael Kolupaev 已提交
286 287 288 289 290
/** Appends to ops the operations that register the part for this replica in ZooKeeper:
  * creation of <replica_path>/parts/<name> and of its checksums child holding part->checksums.
  * The operations are only queued; the caller is responsible for executing them.
  *
  * Before that, if findReplicaHavingPart reports another replica with a part of the same name and
  * its checksums can be read, they are compared with the local ones through Checksums::checkEqual.
  * NOTE(review): checkEqual presumably throws on mismatch - confirm in MergeTreeData.
  */
void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops)
{
	String another_replica = findReplicaHavingPart(part->name, false);
	if (!another_replica.empty())
	{
		String checksums_str;
		if (zookeeper.tryGet(zookeeper_path + "/replicas/" + another_replica + "/parts/" + part->name + "/checksums", checksums_str))
		{
			auto checksums = MergeTreeData::DataPart::Checksums::parse(checksums_str);
			checksums.checkEqual(part->checksums, true);
		}
	}

	ops.push_back(new zkutil::Op::Create(
		replica_path + "/parts/" + part->name,
		"",
		zookeeper.getDefaultACL(),
		zkutil::CreateMode::Persistent));
	ops.push_back(new zkutil::Op::Create(
		replica_path + "/parts/" + part->name + "/checksums",
		part->checksums.toString(),
		zookeeper.getDefaultACL(),
		zkutil::CreateMode::Persistent));
}

M
Merge  
Michael Kolupaev 已提交
311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328
void StorageReplicatedMergeTree::clearOldParts()
{
	Strings parts = data.clearOldParts();

	for (const String & name : parts)
	{
		zkutil::Ops ops;
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
		zkutil::ReturnCode::type code = zookeeper.tryMulti(ops);
		if (code != zkutil::ReturnCode::Ok)
			LOG_WARNING(log, "Couldn't remove part " << name << " from ZooKeeper: " << zkutil::ReturnCode::toString(code));
	}

	if (!parts.empty())
		LOG_DEBUG(log, "Removed " << parts.size() << " old parts");
}

M
Merge  
Michael Kolupaev 已提交
329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
void StorageReplicatedMergeTree::clearOldLogs()
{
	Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas");
	UInt64 min_pointer = std::numeric_limits<UInt64>::max();
	for (const String & replica : replicas)
	{
		String pointer;
		if (!zookeeper.tryGet(zookeeper_path + "/replicas/" + replica + "/log_pointers/" + replica_name, pointer))
			return;
		min_pointer = std::min(min_pointer, parse<UInt64>(pointer));
	}

	Strings entries = zookeeper.getChildren(replica_path + "/log");
	std::sort(entries.begin(), entries.end());
	size_t removed = 0;

	for (const String & entry : entries)
	{
		UInt64 index = parse<UInt64>(entry.substr(strlen("log-")));
		if (index >= min_pointer)
			break;
		zookeeper.remove(replica_path + "/log/" + entry);
		++removed;
	}

	if (removed > 0)
		LOG_DEBUG(log, "Removed " << removed << " old log entries");
}

void StorageReplicatedMergeTree::clearOldBlocks()
{
	zkutil::Stat stat;
	if (!zookeeper.exists(zookeeper_path + "/blocks", &stat))
		throw Exception(zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);

M
Merge  
Michael Kolupaev 已提交
364 365
	int children_count = stat.getnumChildren();

M
Merge  
Michael Kolupaev 已提交
366
	/// Чтобы делать "асимптотически" меньше запросов exists, будем ждать, пока накопятся в 1.1 раза больше блоков, чем нужно.
M
Merge  
Michael Kolupaev 已提交
367
	if (static_cast<double>(children_count) < data.settings.replicated_deduplication_window * 1.1)
M
Merge  
Michael Kolupaev 已提交
368 369
		return;

M
Merge  
Michael Kolupaev 已提交
370 371 372
	LOG_TRACE(log, "Clearing about " << static_cast<size_t>(children_count) - data.settings.replicated_deduplication_window
		<< " old blocks from ZooKeeper");

M
Merge  
Michael Kolupaev 已提交
373 374 375 376 377 378 379 380 381 382 383
	Strings blocks = zookeeper.getChildren(zookeeper_path + "/blocks");

	std::vector<std::pair<Int64, String> > timed_blocks;

	for (const String & block : blocks)
	{
		zkutil::Stat stat;
		zookeeper.exists(zookeeper_path + "/blocks/" + block, &stat);
		timed_blocks.push_back(std::make_pair(stat.getczxid(), block));
	}

M
Merge  
Michael Kolupaev 已提交
384
	std::sort(timed_blocks.begin(), timed_blocks.end(), std::greater<std::pair<Int64, String>>());
M
Merge  
Michael Kolupaev 已提交
385 386
	for (size_t i = data.settings.replicated_deduplication_window; i <  timed_blocks.size(); ++i)
	{
M
Merge  
Michael Kolupaev 已提交
387 388 389 390 391
		zkutil::Ops ops;
		ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/number", -1));
		ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksums", -1));
		ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second, -1));
		zookeeper.multi(ops);
M
Merge  
Michael Kolupaev 已提交
392 393 394 395 396
	}

	LOG_TRACE(log, "Cleared " << blocks.size() - data.settings.replicated_deduplication_window << " old blocks from ZooKeeper");
}

M
Merge  
Michael Kolupaev 已提交
397 398 399 400 401 402 403 404
/** Loads the queue of this replica from <replica_path>/queue into memory.
  * Entries are read in ascending order of node names and appended to the queue under queue_mutex.
  * Each entry remembers the name of its node and is passed through tagPartsAsCurrentlyMerging.
  */
void StorageReplicatedMergeTree::loadQueue()
{
	Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);

	Strings children = zookeeper.getChildren(replica_path + "/queue");
	std::sort(children.begin(), children.end());
	for (const String & child : children)
	{
		String s = zookeeper.get(replica_path + "/queue/" + child);
		LogEntry entry = LogEntry::parse(s);
		entry.znode_name = child;
		entry.tagPartsAsCurrentlyMerging(*this);
		queue.push_back(entry);
	}
}

void StorageReplicatedMergeTree::pullLogsToQueue()
{
	Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
M
Merge  
Michael Kolupaev 已提交
416

M
Merge  
Michael Kolupaev 已提交
417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447
	/// Сольем все логи в хронологическом порядке.

	struct LogIterator
	{
		String replica;		/// Имя реплики.
		UInt64 index;		/// Номер записи в логе (суффикс имени ноды).

		Int64 timestamp;	/// Время (czxid) создания записи в логе.
		String entry_str;	/// Сама запись.

		bool operator<(const LogIterator & rhs) const
		{
			return timestamp < rhs.timestamp;
		}

		bool readEntry(zkutil::ZooKeeper & zookeeper, const String & zookeeper_path)
		{
			String index_str = toString(index);
			while (index_str.size() < 10)
				index_str = '0' + index_str;
			zkutil::Stat stat;
			if (!zookeeper.tryGet(zookeeper_path + "/replicas/" + replica + "/log/log-" + index_str, entry_str, &stat))
				return false;
			timestamp = stat.getczxid();
			return true;
		}
	};

	typedef std::priority_queue<LogIterator> PriorityQueue;
	PriorityQueue priority_queue;

M
Merge  
Michael Kolupaev 已提交
448
	Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas");
M
Merge  
Michael Kolupaev 已提交
449

M
Merge  
Michael Kolupaev 已提交
450 451
	for (const String & replica : replicas)
	{
M
Merge  
Michael Kolupaev 已提交
452 453
		String index_str;
		UInt64 index;
M
Merge  
Michael Kolupaev 已提交
454

M
Merge  
Michael Kolupaev 已提交
455
		if (zookeeper.tryGet(replica_path + "/log_pointers/" + replica, index_str))
M
Merge  
Michael Kolupaev 已提交
456
		{
M
Michael Kolupaev 已提交
457
			index = parse<UInt64>(index_str);
M
Merge  
Michael Kolupaev 已提交
458 459 460 461
		}
		else
		{
			/// Если у нас еще нет указателя на лог этой реплики, поставим указатель на первую запись в нем.
M
Merge  
Michael Kolupaev 已提交
462
			Strings entries = zookeeper.getChildren(zookeeper_path + "/replicas/" + replica + "/log");
M
Merge  
Michael Kolupaev 已提交
463
			std::sort(entries.begin(), entries.end());
M
Michael Kolupaev 已提交
464
			index = entries.empty() ? 0 : parse<UInt64>(entries[0].substr(strlen("log-")));
M
Merge  
Michael Kolupaev 已提交
465

M
Merge  
Michael Kolupaev 已提交
466
			zookeeper.create(replica_path + "/log_pointers/" + replica, toString(index), zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
467 468
		}

M
Merge  
Michael Kolupaev 已提交
469 470 471 472 473 474 475
		LogIterator iterator;
		iterator.replica = replica;
		iterator.index = index;

		if (iterator.readEntry(zookeeper, zookeeper_path))
			priority_queue.push(iterator);
	}
M
Merge  
Michael Kolupaev 已提交
476

M
Merge  
Michael Kolupaev 已提交
477 478
	size_t count = 0;

M
Merge  
Michael Kolupaev 已提交
479 480 481 482
	while (!priority_queue.empty())
	{
		LogIterator iterator = priority_queue.top();
		priority_queue.pop();
M
Merge  
Michael Kolupaev 已提交
483
		++count;
M
Merge  
Michael Kolupaev 已提交
484

M
Merge  
Michael Kolupaev 已提交
485
		LogEntry entry = LogEntry::parse(iterator.entry_str);
M
Merge  
Michael Kolupaev 已提交
486

M
Merge  
Michael Kolupaev 已提交
487 488 489 490 491 492 493 494 495
		/// Одновременно добавим запись в очередь и продвинем указатель на лог.
		zkutil::Ops ops;
		ops.push_back(new zkutil::Op::Create(
			replica_path + "/queue/queue-", iterator.entry_str, zookeeper.getDefaultACL(), zkutil::CreateMode::PersistentSequential));
		ops.push_back(new zkutil::Op::SetData(
			replica_path + "/log_pointers/" + iterator.replica, toString(iterator.index + 1), -1));
		auto results = zookeeper.multi(ops);

		String path_created = dynamic_cast<zkutil::OpResult::Create &>((*results)[0]).getPathCreated();
M
Merge  
Michael Kolupaev 已提交
496
		entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
M
Merge  
Michael Kolupaev 已提交
497
		entry.tagPartsAsCurrentlyMerging(*this);
M
Merge  
Michael Kolupaev 已提交
498 499 500 501 502
		queue.push_back(entry);

		++iterator.index;
		if (iterator.readEntry(zookeeper, zookeeper_path))
			priority_queue.push(iterator);
M
Merge  
Michael Kolupaev 已提交
503
	}
M
Merge  
Michael Kolupaev 已提交
504 505 506

	if (count > 0)
		LOG_DEBUG(log, "Pulled " << count << " entries to queue");
M
Merge  
Michael Kolupaev 已提交
507 508
}

M
Merge  
Michael Kolupaev 已提交
509
bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
M
Merge  
Michael Kolupaev 已提交
510
{
M
Merge  
Michael Kolupaev 已提交
511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528
	if (entry.type == LogEntry::MERGE_PARTS)
	{
		/** Если какая-то из нужных частей сейчас передается или мерджится, подождем окончания этой операции.
		  * Иначе, даже если всех нужных частей для мерджа нет, нужно попытаться сделать мердж.
		  * Если каких-то частей не хватает, вместо мерджа будет попытка скачать кусок.
		  * Такая ситуация возможна, если получение какого-то куска пофейлилось, и его переместили в конец очереди.
		  */
		for (const auto & name : entry.parts_to_merge)
		{
			if (future_parts.count(name))
			{
				LOG_TRACE(log, "Not merging into part " << entry.new_part_name << " yet because part " << name << " is not ready yet.");
				return false;
			}
		}
	}

	return true;
M
Merge  
Michael Kolupaev 已提交
529 530 531 532 533 534 535 536 537 538 539 540
}

void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
{
	if (entry.type == LogEntry::GET_PART ||
		entry.type == LogEntry::MERGE_PARTS)
	{
		/// Если у нас уже есть этот кусок или покрывающий его кусок, ничего делать не нужно.
		MergeTreeData::DataPartPtr containing_part = data.getContainingPart(entry.new_part_name);

		/// Даже если кусок есть локально, его (в исключительных случаях) может не быть в zookeeper.
		if (containing_part && zookeeper.exists(replica_path + "/parts/" + containing_part->name))
M
Merge  
Michael Kolupaev 已提交
541
		{
M
Merge  
Michael Kolupaev 已提交
542 543
			if (!(entry.type == LogEntry::GET_PART && entry.source_replica == replica_name))
				LOG_DEBUG(log, "Skipping action for part " + entry.new_part_name + " - part already exists");
M
Merge  
Michael Kolupaev 已提交
544
			return;
M
Merge  
Michael Kolupaev 已提交
545
		}
M
Merge  
Michael Kolupaev 已提交
546 547
	}

M
Merge  
Michael Kolupaev 已提交
548 549 550 551 552
	if (entry.type == LogEntry::GET_PART && entry.source_replica == replica_name)
		LOG_ERROR(log, "Part " << entry.new_part_name << " from own log doesn't exist. This is a bug.");

	bool do_fetch = false;

M
Merge  
Michael Kolupaev 已提交
553 554
	if (entry.type == LogEntry::GET_PART)
	{
M
Merge  
Michael Kolupaev 已提交
555
		do_fetch = true;
M
Merge  
Michael Kolupaev 已提交
556 557 558
	}
	else if (entry.type == LogEntry::MERGE_PARTS)
	{
M
Merge  
Michael Kolupaev 已提交
559
		MergeTreeData::DataPartsVector parts;
M
Merge  
Michael Kolupaev 已提交
560
		bool have_all_parts = true;;
M
Merge  
Michael Kolupaev 已提交
561 562 563
		for (const String & name : entry.parts_to_merge)
		{
			MergeTreeData::DataPartPtr part = data.getContainingPart(name);
M
Merge  
Michael Kolupaev 已提交
564 565 566 567 568 569 570 571 572 573 574 575
			if (!part)
			{
				have_all_parts = false;
				break;
			}
			if (part->name != name)
			{
				LOG_ERROR(log, "Log and parts set look inconsistent: " << name << " is covered by " << part->name
					<< " but should be merged into " << entry.new_part_name);
				have_all_parts = false;
				break;
			}
M
Merge  
Michael Kolupaev 已提交
576 577
			parts.push_back(part);
		}
M
Merge  
Michael Kolupaev 已提交
578

M
Merge  
Michael Kolupaev 已提交
579
		if (!have_all_parts)
M
Merge  
Michael Kolupaev 已提交
580
		{
M
Merge  
Michael Kolupaev 已提交
581 582 583
			/// Если нет всех нужных кусков, попробуем взять у кого-нибудь уже помердженный кусок.
			do_fetch = true;
			LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead");
M
Merge  
Michael Kolupaev 已提交
584
		}
M
Merge  
Michael Kolupaev 已提交
585 586 587 588 589
		else
		{
			MergeTreeData::DataPartPtr part = merger.mergeParts(parts, entry.new_part_name);

			zkutil::Ops ops;
M
Merge  
Michael Kolupaev 已提交
590
			checkPartAndAddToZooKeeper(part, ops);
M
Merge  
Michael Kolupaev 已提交
591 592 593 594 595

			zookeeper.multi(ops);

			ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
		}
M
Merge  
Michael Kolupaev 已提交
596 597 598 599 600
	}
	else
	{
		throw Exception("Unexpected log entry type: " + toString(static_cast<int>(entry.type)));
	}
M
Merge  
Michael Kolupaev 已提交
601 602 603 604 605

	if (do_fetch)
	{
		try
		{
M
Merge  
Michael Kolupaev 已提交
606 607 608 609 610 611
			String replica = findReplicaHavingPart(entry.new_part_name, true);
			if (replica.empty())
			{
				ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches);
				throw Exception("No active replica has part " + entry.new_part_name, ErrorCodes::NO_REPLICA_HAS_PART);
			}
M
Merge  
Michael Kolupaev 已提交
612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671
			fetchPart(entry.new_part_name, replica);

			if (entry.type == LogEntry::MERGE_PARTS)
				ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged);
		}
		catch (...)
		{
			/** Если не получилось скачать кусок, нужный для какого-то мерджа, лучше не пытаться получить другие куски для этого мерджа,
			  * а попытаться сразу получить помердженный кусок. Чтобы так получилось, переместим действия для получения остальных кусков
			  * для этого мерджа в конец очереди.
			  *
			  */
			try
			{
				Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);

				/// Найдем действие по объединению этого куска с другими. Запомним других.
				StringSet parts_for_merge;
				LogEntries::iterator merge_entry;
				for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
				{
					if (it->type == LogEntry::MERGE_PARTS)
					{
						if (std::find(it->parts_to_merge.begin(), it->parts_to_merge.end(), entry.new_part_name)
							!= it->parts_to_merge.end())
						{
							parts_for_merge = StringSet(it->parts_to_merge.begin(), it->parts_to_merge.end());
							merge_entry = it;
							break;
						}
					}
				}

				if (!parts_for_merge.empty())
				{
					/// Переместим в конец очереди действия, получающие parts_for_merge.
					for (LogEntries::iterator it = queue.begin(); it != queue.end();)
					{
						auto it0 = it;
						++it;

						if (it0 == merge_entry)
							break;

						if ((it0->type == LogEntry::MERGE_PARTS || it0->type == LogEntry::GET_PART)
							&& parts_for_merge.count(it0->new_part_name))
						{
							queue.splice(queue.end(), queue, it0, it);
						}
					}
				}
			}
			catch (...)
			{
				tryLogCurrentException(__PRETTY_FUNCTION__);
			}

			throw;
		}
	}
M
Merge  
Michael Kolupaev 已提交
672 673 674 675 676 677
}

void StorageReplicatedMergeTree::queueUpdatingThread()
{
	while (!shutdown_called)
	{
M
Merge  
Michael Kolupaev 已提交
678 679 680
		try
		{
			pullLogsToQueue();
M
Merge  
Michael Kolupaev 已提交
681 682

			clearOldParts();
M
Merge  
Michael Kolupaev 已提交
683 684 685 686 687 688 689

			/// Каждую минуту выбрасываем ненужные записи из лога.
			if (time(0) - clear_old_logs_time > 60)
			{
				clear_old_logs_time = time(0);
				clearOldLogs();
			}
M
Merge  
Michael Kolupaev 已提交
690 691 692 693 694 695
		}
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}

M
Merge  
Michael Kolupaev 已提交
696 697
		std::this_thread::sleep_for(QUEUE_UPDATE_SLEEP);
	}
M
Merge  
Michael Kolupaev 已提交
698
}
M
Merge  
Michael Kolupaev 已提交
699

M
Merge  
Michael Kolupaev 已提交
700 701 702 703 704
void StorageReplicatedMergeTree::queueThread()
{
	while (!shutdown_called)
	{
		LogEntry entry;
M
Merge  
Michael Kolupaev 已提交
705
		bool have_work = false;
M
Merge  
Michael Kolupaev 已提交
706

M
Merge  
Michael Kolupaev 已提交
707
		try
M
Merge  
Michael Kolupaev 已提交
708 709
		{
			Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
M
Merge  
Michael Kolupaev 已提交
710
			bool empty = queue.empty();
M
Merge  
Michael Kolupaev 已提交
711 712
			if (!empty)
			{
M
Merge  
Michael Kolupaev 已提交
713 714 715 716 717 718 719 720 721 722 723
				for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
				{
					if (shouldExecuteLogEntry(*it))
					{
						entry = *it;
						entry.tagPartAsFuture(*this);
						queue.erase(it);
						have_work = true;
						break;
					}
				}
M
Merge  
Michael Kolupaev 已提交
724 725
			}
		}
M
Merge  
Michael Kolupaev 已提交
726 727 728 729
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}
M
Merge  
Michael Kolupaev 已提交
730

M
Merge  
Michael Kolupaev 已提交
731
		if (!have_work)
M
Merge  
Michael Kolupaev 已提交
732 733 734 735
		{
			std::this_thread::sleep_for(QUEUE_NO_WORK_SLEEP);
			continue;
		}
M
Merge  
Michael Kolupaev 已提交
736

M
Merge  
Michael Kolupaev 已提交
737
		bool success = false;
M
Merge  
Michael Kolupaev 已提交
738

M
Merge  
Michael Kolupaev 已提交
739 740 741
		try
		{
			executeLogEntry(entry);
M
Merge  
Michael Kolupaev 已提交
742 743 744 745 746

			auto code = zookeeper.tryRemove(replica_path + "/queue/" + entry.znode_name);
			if (code != zkutil::ReturnCode::Ok)
				LOG_ERROR(log, "Couldn't remove " << replica_path + "/queue/" + entry.znode_name << ": "
					<< zkutil::ReturnCode::toString(code) + ". There must be a bug somewhere. Ignoring it.");
M
Merge  
Michael Kolupaev 已提交
747 748 749

			success = true;
		}
M
Michael Kolupaev 已提交
750 751 752 753 754 755 756 757
		catch (Exception & e)
		{
			if (e.code() == ErrorCodes::NO_REPLICA_HAS_PART)
				/// Если ни у кого нет нужного куска, это нормальная ситуация; не будем писать в лог с уровнем Error.
				LOG_INFO(log, e.displayText());
			else
				tryLogCurrentException(__PRETTY_FUNCTION__);
		}
M
Merge  
Michael Kolupaev 已提交
758 759
		catch (...)
		{
M
Merge  
Michael Kolupaev 已提交
760
			tryLogCurrentException(__PRETTY_FUNCTION__);
M
Merge  
Michael Kolupaev 已提交
761 762 763 764 765 766 767
		}

		if (shutdown_called)
			break;

		if (success)
		{
M
Michael Kolupaev 已提交
768
			entry.currently_merging_tagger = nullptr;
M
Merge  
Michael Kolupaev 已提交
769 770 771 772 773 774
			std::this_thread::sleep_for(QUEUE_AFTER_WORK_SLEEP);
		}
		else
		{
			{
				/// Добавим действие, которое не получилось выполнить, в конец очереди.
M
Merge  
Michael Kolupaev 已提交
775
				entry.future_part_tagger = nullptr;
M
Merge  
Michael Kolupaev 已提交
776 777 778
				Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
				queue.push_back(entry);
			}
M
Michael Kolupaev 已提交
779
			entry.currently_merging_tagger = nullptr;
M
Merge  
Michael Kolupaev 已提交
780 781 782 783 784
			std::this_thread::sleep_for(QUEUE_ERROR_SLEEP);
		}
	}
}

M
Merge  
Michael Kolupaev 已提交
785 786 787 788 789 790
void StorageReplicatedMergeTree::mergeSelectingThread()
{
	pullLogsToQueue();

	while (!shutdown_called && is_leader_node)
	{
M
Michael Kolupaev 已提交
791
		bool success = false;
M
Merge  
Michael Kolupaev 已提交
792

M
Michael Kolupaev 已提交
793
		try
M
Merge  
Michael Kolupaev 已提交
794
		{
M
Michael Kolupaev 已提交
795
			size_t merges_queued = 0;
M
Merge  
Michael Kolupaev 已提交
796

M
Michael Kolupaev 已提交
797 798
			{
				Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
M
Merge  
Michael Kolupaev 已提交
799

M
Michael Kolupaev 已提交
800 801 802 803
				for (const auto & entry : queue)
					if (entry.type == LogEntry::MERGE_PARTS)
						++merges_queued;
			}
M
Merge  
Michael Kolupaev 已提交
804

M
Michael Kolupaev 已提交
805 806 807 808 809
			if (merges_queued >= data.settings.merging_threads)
			{
				std::this_thread::sleep_for(MERGE_SELECTING_SLEEP);
				continue;
			}
M
Merge  
Michael Kolupaev 已提交
810

M
Michael Kolupaev 已提交
811 812
			/// Есть ли активный мердж крупных кусков.
			bool has_big_merge = false;
M
Merge  
Michael Kolupaev 已提交
813 814

			{
M
Michael Kolupaev 已提交
815 816 817
				Poco::ScopedLock<Poco::FastMutex> lock(currently_merging_mutex);

				for (const auto & name : currently_merging)
M
Merge  
Michael Kolupaev 已提交
818
				{
M
Michael Kolupaev 已提交
819 820 821 822
					MergeTreeData::DataPartPtr part = data.getContainingPart(name);
					if (!part)
						continue;
					if (part->name != name)
M
Michael Kolupaev 已提交
823 824 825 826
					{
						LOG_INFO(log, "currently_merging contains obsolete part " << name << " contained in" << part->name);
						continue;
					}
M
Michael Kolupaev 已提交
827 828 829 830 831
					if (part->size * data.index_granularity > 25 * 1024 * 1024)
					{
						has_big_merge = true;
						break;
					}
M
Merge  
Michael Kolupaev 已提交
832 833 834 835
				}
			}

			MergeTreeData::DataPartsVector parts;
M
Merge  
Michael Kolupaev 已提交
836

M
Merge  
Michael Kolupaev 已提交
837
			{
M
Merge  
Michael Kolupaev 已提交
838
				Poco::ScopedLock<Poco::FastMutex> lock(currently_merging_mutex);
M
Merge  
Michael Kolupaev 已提交
839

M
Merge  
Michael Kolupaev 已提交
840 841 842 843 844 845
				String merged_name;
				auto can_merge = std::bind(
					&StorageReplicatedMergeTree::canMergeParts, this, std::placeholders::_1, std::placeholders::_2);

				if (merger.selectPartsToMerge(parts, merged_name, 0, false, false, has_big_merge, can_merge) ||
					merger.selectPartsToMerge(parts, merged_name, 0,  true, false, has_big_merge, can_merge))
M
Merge  
Michael Kolupaev 已提交
846
				{
M
Merge  
Michael Kolupaev 已提交
847 848
					LogEntry entry;
					entry.type = LogEntry::MERGE_PARTS;
M
Merge  
Michael Kolupaev 已提交
849
					entry.source_replica = replica_name;
M
Merge  
Michael Kolupaev 已提交
850 851 852 853 854 855 856 857 858 859
					entry.new_part_name = merged_name;

					for (const auto & part : parts)
					{
						entry.parts_to_merge.push_back(part->name);
					}

					zookeeper.create(replica_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);

					success = true;
M
Merge  
Michael Kolupaev 已提交
860
				}
M
Merge  
Michael Kolupaev 已提交
861
			}
M
Merge  
Michael Kolupaev 已提交
862

M
Merge  
Michael Kolupaev 已提交
863 864 865
			/// Нужно загрузить новую запись в очередь перед тем, как в следующий раз выбирать куски для слияния.
			///  (чтобы куски пометились как currently_merging).
			pullLogsToQueue();
M
Merge  
Michael Kolupaev 已提交
866

M
Merge  
Michael Kolupaev 已提交
867 868 869 870 871 872 873 874 875
			for (size_t i = 0; i + 1 < parts.size(); ++i)
			{
				/// Уберем больше не нужные отметки о несуществующих блоках.
				for (UInt64 number = parts[i]->right + 1; number <= parts[i + 1]->left - 1; ++number)
				{
					String number_str = toString(number);
					while (number_str.size() < 10)
						number_str = '0' + number_str;
					String path = zookeeper_path + "/block_numbers/block-" + number_str;
M
Merge  
Michael Kolupaev 已提交
876

M
Merge  
Michael Kolupaev 已提交
877 878
					zookeeper.tryRemove(path);
				}
M
Merge  
Michael Kolupaev 已提交
879 880 881 882 883 884 885
			}
		}
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}

M
Merge  
Michael Kolupaev 已提交
886
		if (shutdown_called || !is_leader_node)
M
Merge  
Michael Kolupaev 已提交
887 888
			break;

M
Merge  
Michael Kolupaev 已提交
889 890 891 892 893 894 895 896 897 898
		if (!success)
			std::this_thread::sleep_for(MERGE_SELECTING_SLEEP);
	}
}

void StorageReplicatedMergeTree::clearOldBlocksThread()
{
	while (!shutdown_called && is_leader_node)
	{
		try
M
Merge  
Michael Kolupaev 已提交
899 900 901
		{
			clearOldBlocks();
		}
M
Merge  
Michael Kolupaev 已提交
902 903 904 905
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}
M
Merge  
Michael Kolupaev 已提交
906

M
Merge  
Michael Kolupaev 已提交
907 908 909 910 911 912 913 914
		/// Спим минуту, но проверяем, нужно ли завершиться, каждую секунду.
		/// TODO: Лучше во всех подобных местах использовать condition variable.
		for (size_t i = 0; i < 60; ++i)
		{
			if (shutdown_called || !is_leader_node)
				break;
			std::this_thread::sleep_for(std::chrono::seconds(1));
		}
M
Merge  
Michael Kolupaev 已提交
915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931
	}
}

/** Tells whether the two parts may be merged together.
  * Returns false if either part is listed in currently_merging, or if any block number strictly
  * between left->right and right->left has a node in ZooKeeper that is not in the ABANDONED state.
  *
  * currently_merging is read here without taking a lock; the caller visible in this file
  * (mergeSelectingThread) passes this function to selectPartsToMerge while holding currently_merging_mutex.
  */
bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
	if (currently_merging.count(left->name) || currently_merging.count(right->name))
		return false;

	/// The parts can be merged if all numbers between them are abandoned - do not correspond to any blocks.
	/// The bound is written as `number < right->left` rather than `number <= right->left - 1`
	/// so that it cannot wrap around when right->left is 0.
	for (UInt64 number = left->right + 1; number < right->left; ++number)
	{
		/// Block nodes are named with the number zero-padded to 10 digits.
		String number_str = toString(number);
		while (number_str.size() < 10)
			number_str = '0' + number_str;
		String path = zookeeper_path + "/block_numbers/block-" + number_str;

		if (AbandonableLockInZooKeeper::check(path, zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
		{
			LOG_DEBUG(log, "Can't merge parts " << left->name << " and " << right->name << " because block " << path << " exists");
			return false;
		}
	}

	return true;
}

void StorageReplicatedMergeTree::becomeLeader()
{
M
Merge  
Michael Kolupaev 已提交
943
	LOG_INFO(log, "Became leader");
M
Merge  
Michael Kolupaev 已提交
944 945
	is_leader_node = true;
	merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this);
M
Merge  
Michael Kolupaev 已提交
946
	clear_old_blocks_thread = std::thread(&StorageReplicatedMergeTree::clearOldBlocksThread, this);
M
Merge  
Michael Kolupaev 已提交
947 948
}

M
Merge  
Michael Kolupaev 已提交
949
/** Returns the name of a replica that has the given part registered in ZooKeeper,
  * or an empty string if there is none.
  * If `active` is true, only replicas that also have an is_active node are considered.
  */
String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name, bool active)
{
	const String replicas_path = zookeeper_path + "/replicas";
	Strings candidates = zookeeper.getChildren(replicas_path);

	/// Shuffle so that, among the replicas having the part, one is chosen with equal probability.
	std::random_shuffle(candidates.begin(), candidates.end());

	for (const String & candidate : candidates)
	{
		const String candidate_path = replicas_path + "/" + candidate;

		if (!zookeeper.exists(candidate_path + "/parts/" + part_name))
			continue;

		if (active && !zookeeper.exists(candidate_path + "/is_active"))
			continue;

		return candidate;
	}

	return "";
}

void StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_name)
{
M
Merge  
Michael Kolupaev 已提交
968 969
	LOG_DEBUG(log, "Fetching part " << part_name << " from " << replica_name);

M
Merge  
Michael Kolupaev 已提交
970 971 972 973 974 975 976 977 978 979 980 981 982 983
	auto table_lock = lockStructure(true);

	String host;
	int port;

	String host_port_str = zookeeper.get(zookeeper_path + "/replicas/" + replica_name + "/host");
	ReadBufferFromString buf(host_port_str);
	assertString("host: ", buf);
	readString(host, buf);
	assertString("\nport: ", buf);
	readText(port, buf);
	assertString("\n", buf);
	assertEOF(buf);

M
Merge  
Michael Kolupaev 已提交
984
	MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(part_name, zookeeper_path + "/replicas/" + replica_name, host, port);
M
Merge  
Michael Kolupaev 已提交
985 986
	auto removed_parts = data.renameTempPartAndReplace(part);

M
Merge  
Michael Kolupaev 已提交
987
	zkutil::Ops ops;
M
Merge  
Michael Kolupaev 已提交
988
	checkPartAndAddToZooKeeper(part, ops);
M
Michael Kolupaev 已提交
989

M
Merge  
Michael Kolupaev 已提交
990 991
	zookeeper.multi(ops);

M
Michael Kolupaev 已提交
992 993 994 995 996 997
	for (const auto & removed_part : removed_parts)
	{
		LOG_DEBUG(log, "Part " << removed_part->name << " is rendered obsolete by fetching part " << part_name);
		ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
	}

M
Merge  
Michael Kolupaev 已提交
998 999
	ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);

M
Merge  
Michael Kolupaev 已提交
1000
	LOG_DEBUG(log, "Fetched part");
M
Merge  
Michael Kolupaev 已提交
1001
}
M
Merge  
Michael Kolupaev 已提交
1002

M
Merge  
Michael Kolupaev 已提交
1003 1004 1005 1006
void StorageReplicatedMergeTree::shutdown()
{
	if (shutdown_called)
		return;
M
Merge  
Michael Kolupaev 已提交
1007
	leader_election = nullptr;
M
Merge  
Michael Kolupaev 已提交
1008 1009 1010 1011
	shutdown_called = true;
	replica_is_active_node = nullptr;
	endpoint_holder = nullptr;

M
Merge  
Michael Kolupaev 已提交
1012
	LOG_TRACE(log, "Waiting for threads to finish");
M
Merge  
Michael Kolupaev 已提交
1013
	if (is_leader_node)
M
Merge  
Michael Kolupaev 已提交
1014
	{
M
Merge  
Michael Kolupaev 已提交
1015
		is_leader_node = false;
M
Merge  
Michael Kolupaev 已提交
1016
		merge_selecting_thread.join();
M
Merge  
Michael Kolupaev 已提交
1017 1018
		clear_old_blocks_thread.join();
	}
M
Merge  
Michael Kolupaev 已提交
1019 1020 1021 1022
	queue_updating_thread.join();
	for (auto & thread : queue_threads)
		thread.join();
	LOG_TRACE(log, "Threads finished");
M
Merge  
Michael Kolupaev 已提交
1023 1024
}

M
Merge  
Michael Kolupaev 已提交
1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042
void StorageReplicatedMergeTree::startup()
{
	shutdown_called = false;

	String endpoint_name = "ReplicatedMergeTree:" + replica_path;
	InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(data, thisPtr());
	endpoint_holder = new InterserverIOEndpointHolder(endpoint_name, endpoint, context.getInterserverIOHandler());

	activateReplica();

	leader_election = new zkutil::LeaderElection(zookeeper_path + "/leader_election", zookeeper,
		std::bind(&StorageReplicatedMergeTree::becomeLeader, this), replica_name);

	queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this);
	for (size_t i = 0; i < data.settings.replication_threads; ++i)
		queue_threads.push_back(std::thread(&StorageReplicatedMergeTree::queueThread, this));
}

M
Merge  
Michael Kolupaev 已提交
1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065
/// Shuts the table down. Any exception from shutdown() is logged, not propagated out of the destructor.
StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
{
	try
	{
		shutdown();
	}
	catch (...)
	{
		tryLogCurrentException("~StorageReplicatedMergeTree");
	}
}

/// Reading is delegated entirely to `reader`; all arguments are passed through unchanged.
BlockInputStreams StorageReplicatedMergeTree::read(
	const Names & column_names,
	ASTPtr query,
	const Settings & settings,
	QueryProcessingStage::Enum & processed_stage,
	size_t max_block_size,
	unsigned threads)
{
	return reader.read(
		column_names,
		query,
		settings,
		processed_stage,
		max_block_size,
		threads);
}

M
Merge  
Michael Kolupaev 已提交
1066 1067 1068 1069 1070 1071 1072 1073
/** Creates the output stream for an INSERT into this table.
  * If `query` is an ASTInsertQuery, its insert_id is passed to the stream; otherwise an empty id is used.
  */
BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query)
{
	ASTInsertQuery * insert_query = dynamic_cast<ASTInsertQuery *>(&*query);
	const String insert_id = insert_query ? insert_query->insert_id : String();

	return new ReplicatedMergeTreeBlockOutputStream(*this, insert_id);
}
M
Merge  
Michael Kolupaev 已提交
1074 1075 1076

void StorageReplicatedMergeTree::drop()
{
M
Merge  
Michael Kolupaev 已提交
1077 1078
	shutdown();

M
Merge  
Michael Kolupaev 已提交
1079 1080 1081 1082 1083 1084
	replica_is_active_node = nullptr;
	zookeeper.removeRecursive(replica_path);
	if (zookeeper.getChildren(zookeeper_path + "/replicas").empty())
		zookeeper.removeRecursive(zookeeper_path);
}

M
Merge  
Michael Kolupaev 已提交
1085 1086 1087
/** Serializes the entry as text:
  *   format version: 1
  *   source replica: <source_replica>
  *   get | merge
  *   for get:   <new_part_name>
  *   for merge: one source part per line, then "into", then <new_part_name>
  * followed by a final newline. For any other type only the header and the final newline are written.
  */
void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const
{
	writeString("format version: 1\n", out);

	writeString("source replica: ", out);
	writeString(source_replica, out);
	writeString("\n", out);

	if (type == GET_PART)
	{
		writeString("get\n", out);
		writeString(new_part_name, out);
	}
	else if (type == MERGE_PARTS)
	{
		writeString("merge\n", out);
		for (const String & source_part : parts_to_merge)
		{
			writeString(source_part, out);
			writeString("\n", out);
		}
		writeString("into\n", out);
		writeString(new_part_name, out);
	}

	writeString("\n", out);
}

/** Parses the text form produced by writeText:
  *   format version: 1
  *   source replica: <source_replica>
  *   get | merge
  *   for get:   <new_part_name>
  *   for merge: one source part per line, then "into", then <new_part_name>
  * followed by a final newline.
  * Throws if the input does not match this layout, including an unrecognized entry type.
  */
void StorageReplicatedMergeTree::LogEntry::readText(ReadBuffer & in)
{
	String type_str;

	/// Start from a clean list so that reading into an already filled entry does not accumulate parts.
	parts_to_merge.clear();

	assertString("format version: 1\n", in);
	assertString("source replica: ", in);
	readString(source_replica, in);
	assertString("\n", in);
	readString(type_str, in);
	assertString("\n", in);

	if (type_str == "get")
	{
		type = GET_PART;
		readString(new_part_name, in);
	}
	else if (type_str == "merge")
	{
		type = MERGE_PARTS;
		/// Source part names go one per line until the "into" marker.
		while (true)
		{
			String s;
			readString(s, in);
			assertString("\n", in);
			if (s == "into")
				break;
			parts_to_merge.push_back(s);
		}
		readString(new_part_name, in);
	}
	else
	{
		/// Previously an unknown type was accepted silently, leaving `type` unchanged.
		throw Exception("Unknown log entry type: " + type_str);
	}

	assertString("\n", in);
}

M
Merge  
Michael Kolupaev 已提交
1144
}