StorageReplicatedMergeTree.cpp 35.9 KB
Newer Older
M
Merge  
Michael Kolupaev 已提交
1
#include <DB/Storages/StorageReplicatedMergeTree.h>
M
Merge  
Michael Kolupaev 已提交
2
#include <DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
M
Merge  
Michael Kolupaev 已提交
3
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
M
Merge  
Michael Kolupaev 已提交
4 5 6
#include <DB/Parsers/formatAST.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/ReadBufferFromString.h>
M
Merge  
Michael Kolupaev 已提交
7 8 9 10

namespace DB
{

M
Merge  
Michael Kolupaev 已提交
11 12 13 14 15

/// Sleep intervals for the replication queue threads.
/// QUEUE_UPDATE_SLEEP: pause between iterations of queueUpdatingThread.
/// QUEUE_NO_WORK_SLEEP: pause in queueThread when no queue entry can be executed.
/// QUEUE_ERROR_SLEEP / QUEUE_AFTER_WORK_SLEEP: presumably used after a failed / successful entry (usage not visible here).
const std::chrono::seconds QUEUE_UPDATE_SLEEP(5);
const std::chrono::seconds QUEUE_NO_WORK_SLEEP(5);
const std::chrono::seconds QUEUE_ERROR_SLEEP(1);
const std::chrono::seconds QUEUE_AFTER_WORK_SLEEP(0);
M
Merge  
Michael Kolupaev 已提交
16
/// Pause used by the merge-selecting logic (its use is not visible in this part of the file).
const std::chrono::seconds MERGE_SELECTING_SLEEP(5);
M
Merge  
Michael Kolupaev 已提交
17 18


M
Merge  
Michael Kolupaev 已提交
19 20 21
/** Creates (attach == false) or re-opens (attach == true) a replica of a replicated table.
  *
  * zookeeper_path_ - root node of the table in ZooKeeper (a trailing '/' is stripped);
  * replica_name_   - name of this replica under <zookeeper_path>/replicas.
  *
  * When not attaching, the table node is created if it doesn't exist yet, and the replica is registered.
  * Throws ADDING_REPLICA_TO_NON_EMPTY_TABLE if some replica of the table already has parts.
  * In both modes the table structure is checked against ZooKeeper. When attaching, the local parts are also
  * reconciled with ZooKeeper. Then the queue is loaded, the replica is marked active, leader election is
  * joined, and the queue-updating thread plus settings_.replication_threads queue threads are started.
  */
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
	const String & zookeeper_path_,
	const String & replica_name_,
	bool attach,
	const String & path_, const String & name_, NamesAndTypesListPtr columns_,
	Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_,
	const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	MergeTreeData::Mode mode_,
	const String & sign_column_,
	const MergeTreeSettings & settings_)
	:
	context(context_), zookeeper(context.getZooKeeper()),
	/// Members are initialized in their declaration order (in the header), not in the order of this list,
	/// so derive full_path and log from the name_ argument instead of the table_name member.
	table_name(name_), full_path(path_ + escapeForFileName(name_) + '/'), zookeeper_path(zookeeper_path_),
	replica_name(replica_name_),
	data(	full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
			index_granularity_,mode_, sign_column_, settings_),
	reader(data), writer(data), merger(data), fetcher(data),
	log(&Logger::get("StorageReplicatedMergeTree: " + name_))
{
	if (!zookeeper_path.empty() && *zookeeper_path.rbegin() == '/')
		zookeeper_path.erase(zookeeper_path.end() - 1);
	replica_path = zookeeper_path + "/replicas/" + replica_name;

	if (!attach)
	{
		if (!zookeeper.exists(zookeeper_path))
			createTable();

		if (!isTableEmpty())
			throw Exception("Can't add new replica to non-empty table", ErrorCodes::ADDING_REPLICA_TO_NON_EMPTY_TABLE);

		checkTableStructure();
		createReplica();
	}
	else
	{
		checkTableStructure();
		checkParts();
	}

	loadQueue();
	activateReplica();

	leader_election = new zkutil::LeaderElection(zookeeper_path + "/leader_election", zookeeper,
		std::bind(&StorageReplicatedMergeTree::becomeLeader, this), replica_name);

	/// Background threads are started last, after the replica is fully initialized.
	queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this);
	for (size_t i = 0; i < settings_.replication_threads; ++i)
		queue_threads.push_back(std::thread(&StorageReplicatedMergeTree::queueThread, this));
}

/** Factory: constructs the storage, obtains its StoragePtr via thisPtr() and registers an interserver IO
  * endpoint named "ReplicatedMergeTree:<replica_path>" backed by a ReplicatedMergeTreePartsServer
  * (presumably serving this replica's parts to other replicas). Returns the StoragePtr.
  */
StoragePtr StorageReplicatedMergeTree::create(
	const String & zookeeper_path_,
	const String & replica_name_,
	bool attach,
	const String & path_, const String & name_, NamesAndTypesListPtr columns_,
	Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_,
	const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	MergeTreeData::Mode mode_,
	const String & sign_column_,
	const MergeTreeSettings & settings_)
{
	auto * storage = new StorageReplicatedMergeTree(zookeeper_path_, replica_name_, attach,
		path_, name_, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
		index_granularity_, mode_, sign_column_, settings_);
	StoragePtr storage_ptr = storage->thisPtr();

	/// The endpoint holder is kept in the storage, so the endpoint stays registered while the storage lives.
	const String endpoint_name = "ReplicatedMergeTree:" + storage->replica_path;
	InterserverIOEndpointPtr parts_server = new ReplicatedMergeTreePartsServer(storage->data, storage_ptr);
	storage->endpoint_holder = new InterserverIOEndpointHolder(
		endpoint_name, parts_server, storage->context.getInterserverIOHandler());

	return storage_ptr;
}

M
Merge  
Michael Kolupaev 已提交
97 98 99 100 101 102 103 104
/// Serializes an AST with formatAST (same flags everywhere it is used here); a null AST yields "".
static String formattedAST(const ASTPtr & ast)
{
	if (ast)
	{
		std::stringstream out;
		formatAST(*ast, out, 0, false, true);
		return out.str();
	}
	return "";
}
M
Merge  
Michael Kolupaev 已提交
105

M
Merge  
Michael Kolupaev 已提交
106
void StorageReplicatedMergeTree::createTable()
M
Merge  
Michael Kolupaev 已提交
107
{
M
Merge  
Michael Kolupaev 已提交
108
	zookeeper.create(zookeeper_path, "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
109

M
Merge  
Michael Kolupaev 已提交
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
	/// Запишем метаданные таблицы, чтобы реплики могли сверять с ними свою локальную структуру таблицы.
	std::stringstream metadata;
	metadata << "metadata format version: 1" << std::endl;
	metadata << "date column: " << data.date_column_name << std::endl;
	metadata << "sampling expression: " << formattedAST(data.sampling_expression) << std::endl;
	metadata << "index granularity: " << data.index_granularity << std::endl;
	metadata << "mode: " << static_cast<int>(data.mode) << std::endl;
	metadata << "sign column: " << data.sign_column << std::endl;
	metadata << "primary key: " << formattedAST(data.primary_expr_ast) << std::endl;
	metadata << "columns:" << std::endl;
	WriteBufferFromOStream buf(metadata);
	for (auto & it : data.getColumnsList())
	{
		writeBackQuotedString(it.first, buf);
		writeChar(' ', buf);
		writeString(it.second->getName(), buf);
		writeChar('\n', buf);
	}
	buf.next();
M
Merge  
Michael Kolupaev 已提交
129

M
Merge  
Michael Kolupaev 已提交
130
	zookeeper.create(zookeeper_path + "/metadata", metadata.str(), zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
131

M
Merge  
Michael Kolupaev 已提交
132 133
	zookeeper.create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent);
	zookeeper.create(zookeeper_path + "/blocks", "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
134
	zookeeper.create(zookeeper_path + "/block_numbers", "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
135
	zookeeper.create(zookeeper_path + "/leader_election", "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
136
	zookeeper.create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
137
}
M
Merge  
Michael Kolupaev 已提交
138

M
Merge  
Michael Kolupaev 已提交
139 140
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
	* Если нет - бросить исключение.
M
Merge  
Michael Kolupaev 已提交
141
	*/
M
Merge  
Michael Kolupaev 已提交
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
void StorageReplicatedMergeTree::checkTableStructure()
{
	String metadata_str = zookeeper.get(zookeeper_path + "/metadata");
	ReadBufferFromString buf(metadata_str);
	assertString("metadata format version: 1", buf);
	assertString("\ndate column: ", buf);
	assertString(data.date_column_name, buf);
	assertString("\nsampling expression: ", buf);
	assertString(formattedAST(data.sampling_expression), buf);
	assertString("\nindex granularity: ", buf);
	assertString(toString(data.index_granularity), buf);
	assertString("\nmode: ", buf);
	assertString(toString(static_cast<int>(data.mode)), buf);
	assertString("\nsign column: ", buf);
	assertString(data.sign_column, buf);
	assertString("\nprimary key: ", buf);
	assertString(formattedAST(data.primary_expr_ast), buf);
	assertString("\ncolumns:\n", buf);
	for (auto & it : data.getColumnsList())
	{
		String name;
		readBackQuotedString(name, buf);
		if (name != it.first)
			throw Exception("Unexpected column name in ZooKeeper: expected " + it.first + ", found " + name,
				ErrorCodes::UNKNOWN_IDENTIFIER);
		assertString(" ", buf);
		assertString(it.second->getName(), buf);
		assertString("\n", buf);
	}
M
Merge  
Michael Kolupaev 已提交
171
	assertEOF(buf);
M
Merge  
Michael Kolupaev 已提交
172
}
M
Merge  
Michael Kolupaev 已提交
173

M
Merge  
Michael Kolupaev 已提交
174 175 176 177 178 179 180 181 182
void StorageReplicatedMergeTree::createReplica()
{
	zookeeper.create(replica_path, "", zkutil::CreateMode::Persistent);
	zookeeper.create(replica_path + "/host", "", zkutil::CreateMode::Persistent);
	zookeeper.create(replica_path + "/log", "", zkutil::CreateMode::Persistent);
	zookeeper.create(replica_path + "/log_pointers", "", zkutil::CreateMode::Persistent);
	zookeeper.create(replica_path + "/queue", "", zkutil::CreateMode::Persistent);
	zookeeper.create(replica_path + "/parts", "", zkutil::CreateMode::Persistent);
}
M
Merge  
Michael Kolupaev 已提交
183

M
Merge  
Michael Kolupaev 已提交
184 185 186 187 188
void StorageReplicatedMergeTree::activateReplica()
{
	std::stringstream host;
	host << "host: " << context.getInterserverIOHost() << std::endl;
	host << "port: " << context.getInterserverIOPort() << std::endl;
M
Merge  
Michael Kolupaev 已提交
189

M
Merge  
Michael Kolupaev 已提交
190
	/// Одновременно объявим, что эта реплика активна, и обновим хост.
M
Merge  
Michael Kolupaev 已提交
191 192 193
	zkutil::Ops ops;
	ops.push_back(new zkutil::Op::Create(replica_path + "/is_active", "", zookeeper.getDefaultACL(), zkutil::CreateMode::Ephemeral));
	ops.push_back(new zkutil::Op::SetData(replica_path + "/host", host.str(), -1));
M
Merge  
Michael Kolupaev 已提交
194 195 196 197 198 199 200 201 202 203 204 205 206

	try
	{
		zookeeper.multi(ops);
	}
	catch (zkutil::KeeperException & e)
	{
		if (e.code == zkutil::ReturnCode::NodeExists)
			throw Exception("Replica " + replica_path + " appears to be already active. If you're sure it's not, "
				"try again in a minute or remove znode " + replica_path + "/is_active manually", ErrorCodes::REPLICA_IS_ALREADY_ACTIVE);

		throw;
	}
M
Merge  
Michael Kolupaev 已提交
207

M
Merge  
Michael Kolupaev 已提交
208 209
	replica_is_active_node = zkutil::EphemeralNodeHolder::existing(replica_path + "/is_active", zookeeper);
}
M
Merge  
Michael Kolupaev 已提交
210

M
Merge  
Michael Kolupaev 已提交
211 212 213 214 215 216 217 218 219 220
bool StorageReplicatedMergeTree::isTableEmpty()
{
	Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas");
	for (const auto & replica : replicas)
	{
		if (!zookeeper.getChildren(zookeeper_path + "/replicas/" + replica + "/parts").empty())
			return false;
	}
	return true;
}
M
Merge  
Michael Kolupaev 已提交
221

M
Merge  
Michael Kolupaev 已提交
222 223 224 225 226
void StorageReplicatedMergeTree::checkParts()
{
	Strings expected_parts_vec = zookeeper.getChildren(replica_path + "/parts");
	NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end());

M
Merge  
Michael Kolupaev 已提交
227
	MergeTreeData::DataParts parts = data.getAllDataParts();
M
Merge  
Michael Kolupaev 已提交
228

M
Merge  
Michael Kolupaev 已提交
229
	MergeTreeData::DataParts unexpected_parts;
M
Merge  
Michael Kolupaev 已提交
230 231 232 233 234 235 236 237
	for (const auto & part : parts)
	{
		if (expected_parts.count(part->name))
		{
			expected_parts.erase(part->name);
		}
		else
		{
M
Merge  
Michael Kolupaev 已提交
238
			unexpected_parts.insert(part);
M
Merge  
Michael Kolupaev 已提交
239 240 241
		}
	}

M
Merge  
Michael Kolupaev 已提交
242 243 244 245 246 247 248 249 250
	/// Если локально не хватает какого-то куска, но есть покрывающий его кусок, можно заменить в ZK недостающий покрывающим.
	MergeTreeData::DataPartsVector parts_to_add;
	for (const String & missing_name : expected_parts)
	{
		auto containing = data.getContainingPart(missing_name);
		if (!containing)
			throw Exception("Not found " + toString(expected_parts.size())
				+ " parts (including " + missing_name + ") in table " + data.getTableName(),
				ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
M
Merge  
Michael Kolupaev 已提交
251
		LOG_ERROR(log, "Ignoring missing local part " << missing_name << " because part " << containing->name << " exists");
M
Merge  
Michael Kolupaev 已提交
252 253 254 255 256 257
		if (unexpected_parts.count(containing))
		{
			parts_to_add.push_back(containing);
			unexpected_parts.erase(containing);
		}
	}
M
Merge  
Michael Kolupaev 已提交
258

M
Merge  
Michael Kolupaev 已提交
259 260 261 262 263
	if (parts_to_add.size() > 2 ||
		unexpected_parts.size() > 2 ||
		expected_parts.size() > 20)
	{
		throw Exception("The local set of parts of table " + data.getTableName() + " doesn't look like the set of parts in ZooKeeper."
M
Merge  
Michael Kolupaev 已提交
264
			" There are " + toString(unexpected_parts.size()) + " unexpected parts, "
M
Merge  
Michael Kolupaev 已提交
265 266
			+ toString(parts_to_add.size()) + " unexpectedly merged parts, "
			+ toString(expected_parts.size()) + " unexpectedly obsolete parts",
M
Merge  
Michael Kolupaev 已提交
267
			ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);
M
Merge  
Michael Kolupaev 已提交
268 269 270 271 272
	}

	/// Добавим в ZK информацию о кусках, покрывающих недостающие куски.
	for (MergeTreeData::DataPartPtr part : parts_to_add)
	{
M
Merge  
Michael Kolupaev 已提交
273
		LOG_ERROR(log, "Adding unexpected local part to ZooKeeper: " << part->name);
M
Merge  
Michael Kolupaev 已提交
274 275 276 277
		zkutil::Ops ops;
		checkPartAndAddToZooKeeper(part, ops);
		zookeeper.multi(ops);
	}
M
Merge  
Michael Kolupaev 已提交
278

M
Merge  
Michael Kolupaev 已提交
279 280 281 282 283 284 285 286 287 288
	/// Удалим из ZK информацию о кусках, покрытых только что добавленными.
	for (const String & name : expected_parts)
	{
		zkutil::Ops ops;
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
		zookeeper.multi(ops);
	}

	/// Удалим лишние локальные куски.
M
Merge  
Michael Kolupaev 已提交
289 290 291
	for (MergeTreeData::DataPartPtr part : unexpected_parts)
	{
		LOG_ERROR(log, "Unexpected part " << part->name << ". Renaming it to ignored_" + part->name);
M
Merge  
Michael Kolupaev 已提交
292
		data.renameAndDetachPart(part, "ignored_");
M
Merge  
Michael Kolupaev 已提交
293 294
	}
}
M
Merge  
Michael Kolupaev 已提交
295

M
Merge  
Michael Kolupaev 已提交
296 297 298 299 300
/** Appends to `ops` the operations that register `part` for this replica in ZooKeeper: a node for the part
  * and a child "checksums" node. The caller executes `ops`.
  * If another replica found by findReplicaHavingPart has checksums stored for a part with the same name,
  * they are compared with ours first, and checkEqual is expected to throw on mismatch.
  */
void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops)
{
	String other_replica = findReplicaHavingPart(part->name, false);
	if (!other_replica.empty())
	{
		const String their_checksums_path =
			zookeeper_path + "/replicas/" + other_replica + "/parts/" + part->name + "/checksums";
		String their_checksums_str;
		if (zookeeper.tryGet(their_checksums_path, their_checksums_str))
		{
			auto their_checksums = MergeTreeData::DataPart::Checksums::parse(their_checksums_str);
			their_checksums.checkEqual(part->checksums, true);
		}
	}

	const String part_path = replica_path + "/parts/" + part->name;
	ops.push_back(new zkutil::Op::Create(
		part_path, "", zookeeper.getDefaultACL(), zkutil::CreateMode::Persistent));
	ops.push_back(new zkutil::Op::Create(
		part_path + "/checksums", part->checksums.toString(), zookeeper.getDefaultACL(), zkutil::CreateMode::Persistent));
}

M
Merge  
Michael Kolupaev 已提交
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338
void StorageReplicatedMergeTree::clearOldParts()
{
	Strings parts = data.clearOldParts();

	for (const String & name : parts)
	{
		zkutil::Ops ops;
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
		zkutil::ReturnCode::type code = zookeeper.tryMulti(ops);
		if (code != zkutil::ReturnCode::Ok)
			LOG_WARNING(log, "Couldn't remove part " << name << " from ZooKeeper: " << zkutil::ReturnCode::toString(code));
	}

	if (!parts.empty())
		LOG_DEBUG(log, "Removed " << parts.size() << " old parts");
}

M
Merge  
Michael Kolupaev 已提交
339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373
void StorageReplicatedMergeTree::clearOldLogs()
{
	Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas");
	UInt64 min_pointer = std::numeric_limits<UInt64>::max();
	for (const String & replica : replicas)
	{
		String pointer;
		if (!zookeeper.tryGet(zookeeper_path + "/replicas/" + replica + "/log_pointers/" + replica_name, pointer))
			return;
		min_pointer = std::min(min_pointer, parse<UInt64>(pointer));
	}

	Strings entries = zookeeper.getChildren(replica_path + "/log");
	std::sort(entries.begin(), entries.end());
	size_t removed = 0;

	for (const String & entry : entries)
	{
		UInt64 index = parse<UInt64>(entry.substr(strlen("log-")));
		if (index >= min_pointer)
			break;
		zookeeper.remove(replica_path + "/log/" + entry);
		++removed;
	}

	if (removed > 0)
		LOG_DEBUG(log, "Removed " << removed << " old log entries");
}

void StorageReplicatedMergeTree::clearOldBlocks()
{
	zkutil::Stat stat;
	if (!zookeeper.exists(zookeeper_path + "/blocks", &stat))
		throw Exception(zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);

M
Merge  
Michael Kolupaev 已提交
374 375
	int children_count = stat.getnumChildren();

M
Merge  
Michael Kolupaev 已提交
376
	/// Чтобы делать "асимптотически" меньше запросов exists, будем ждать, пока накопятся в 1.1 раза больше блоков, чем нужно.
M
Merge  
Michael Kolupaev 已提交
377
	if (static_cast<double>(children_count) < data.settings.replicated_deduplication_window * 1.1)
M
Merge  
Michael Kolupaev 已提交
378 379
		return;

M
Merge  
Michael Kolupaev 已提交
380 381 382
	LOG_TRACE(log, "Clearing about " << static_cast<size_t>(children_count) - data.settings.replicated_deduplication_window
		<< " old blocks from ZooKeeper");

M
Merge  
Michael Kolupaev 已提交
383 384 385 386 387 388 389 390 391 392 393
	Strings blocks = zookeeper.getChildren(zookeeper_path + "/blocks");

	std::vector<std::pair<Int64, String> > timed_blocks;

	for (const String & block : blocks)
	{
		zkutil::Stat stat;
		zookeeper.exists(zookeeper_path + "/blocks/" + block, &stat);
		timed_blocks.push_back(std::make_pair(stat.getczxid(), block));
	}

M
Merge  
Michael Kolupaev 已提交
394
	std::sort(timed_blocks.begin(), timed_blocks.end(), std::greater<std::pair<Int64, String>>());
M
Merge  
Michael Kolupaev 已提交
395 396
	for (size_t i = data.settings.replicated_deduplication_window; i <  timed_blocks.size(); ++i)
	{
M
Merge  
Michael Kolupaev 已提交
397 398 399 400 401
		zkutil::Ops ops;
		ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/number", -1));
		ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksums", -1));
		ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second, -1));
		zookeeper.multi(ops);
M
Merge  
Michael Kolupaev 已提交
402 403 404 405 406
	}

	LOG_TRACE(log, "Cleared " << blocks.size() - data.settings.replicated_deduplication_window << " old blocks from ZooKeeper");
}

M
Merge  
Michael Kolupaev 已提交
407 408 409 410 411 412 413 414
void StorageReplicatedMergeTree::loadQueue()
{
	Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);

	Strings children = zookeeper.getChildren(replica_path + "/queue");
	std::sort(children.begin(), children.end());
	for (const String & child : children)
	{
M
Merge  
Michael Kolupaev 已提交
415
		String s = zookeeper.get(replica_path + "/queue/" + child);
M
Merge  
Michael Kolupaev 已提交
416 417
		LogEntry entry = LogEntry::parse(s);
		entry.znode_name = child;
M
Merge  
Michael Kolupaev 已提交
418
		entry.tagPartsAsCurrentlyMerging(*this);
M
Merge  
Michael Kolupaev 已提交
419
		queue.push_back(entry);
M
Merge  
Michael Kolupaev 已提交
420 421 422 423 424 425
	}
}

void StorageReplicatedMergeTree::pullLogsToQueue()
{
	Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
M
Merge  
Michael Kolupaev 已提交
426

M
Merge  
Michael Kolupaev 已提交
427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457
	/// Сольем все логи в хронологическом порядке.

	struct LogIterator
	{
		String replica;		/// Имя реплики.
		UInt64 index;		/// Номер записи в логе (суффикс имени ноды).

		Int64 timestamp;	/// Время (czxid) создания записи в логе.
		String entry_str;	/// Сама запись.

		bool operator<(const LogIterator & rhs) const
		{
			return timestamp < rhs.timestamp;
		}

		bool readEntry(zkutil::ZooKeeper & zookeeper, const String & zookeeper_path)
		{
			String index_str = toString(index);
			while (index_str.size() < 10)
				index_str = '0' + index_str;
			zkutil::Stat stat;
			if (!zookeeper.tryGet(zookeeper_path + "/replicas/" + replica + "/log/log-" + index_str, entry_str, &stat))
				return false;
			timestamp = stat.getczxid();
			return true;
		}
	};

	typedef std::priority_queue<LogIterator> PriorityQueue;
	PriorityQueue priority_queue;

M
Merge  
Michael Kolupaev 已提交
458
	Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas");
M
Merge  
Michael Kolupaev 已提交
459

M
Merge  
Michael Kolupaev 已提交
460 461
	for (const String & replica : replicas)
	{
M
Merge  
Michael Kolupaev 已提交
462 463
		String index_str;
		UInt64 index;
M
Merge  
Michael Kolupaev 已提交
464

M
Merge  
Michael Kolupaev 已提交
465
		if (zookeeper.tryGet(replica_path + "/log_pointers/" + replica, index_str))
M
Merge  
Michael Kolupaev 已提交
466
		{
M
Michael Kolupaev 已提交
467
			index = parse<UInt64>(index_str);
M
Merge  
Michael Kolupaev 已提交
468 469 470 471
		}
		else
		{
			/// Если у нас еще нет указателя на лог этой реплики, поставим указатель на первую запись в нем.
M
Merge  
Michael Kolupaev 已提交
472
			Strings entries = zookeeper.getChildren(zookeeper_path + "/replicas/" + replica + "/log");
M
Merge  
Michael Kolupaev 已提交
473
			std::sort(entries.begin(), entries.end());
M
Michael Kolupaev 已提交
474
			index = entries.empty() ? 0 : parse<UInt64>(entries[0].substr(strlen("log-")));
M
Merge  
Michael Kolupaev 已提交
475

M
Merge  
Michael Kolupaev 已提交
476
			zookeeper.create(replica_path + "/log_pointers/" + replica, toString(index), zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
477 478
		}

M
Merge  
Michael Kolupaev 已提交
479 480 481 482 483 484 485
		LogIterator iterator;
		iterator.replica = replica;
		iterator.index = index;

		if (iterator.readEntry(zookeeper, zookeeper_path))
			priority_queue.push(iterator);
	}
M
Merge  
Michael Kolupaev 已提交
486

M
Merge  
Michael Kolupaev 已提交
487 488
	size_t count = 0;

M
Merge  
Michael Kolupaev 已提交
489 490 491 492
	while (!priority_queue.empty())
	{
		LogIterator iterator = priority_queue.top();
		priority_queue.pop();
M
Merge  
Michael Kolupaev 已提交
493
		++count;
M
Merge  
Michael Kolupaev 已提交
494

M
Merge  
Michael Kolupaev 已提交
495
		LogEntry entry = LogEntry::parse(iterator.entry_str);
M
Merge  
Michael Kolupaev 已提交
496

M
Merge  
Michael Kolupaev 已提交
497 498 499 500 501 502 503 504 505
		/// Одновременно добавим запись в очередь и продвинем указатель на лог.
		zkutil::Ops ops;
		ops.push_back(new zkutil::Op::Create(
			replica_path + "/queue/queue-", iterator.entry_str, zookeeper.getDefaultACL(), zkutil::CreateMode::PersistentSequential));
		ops.push_back(new zkutil::Op::SetData(
			replica_path + "/log_pointers/" + iterator.replica, toString(iterator.index + 1), -1));
		auto results = zookeeper.multi(ops);

		String path_created = dynamic_cast<zkutil::OpResult::Create &>((*results)[0]).getPathCreated();
M
Merge  
Michael Kolupaev 已提交
506
		entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
M
Merge  
Michael Kolupaev 已提交
507
		entry.tagPartsAsCurrentlyMerging(*this);
M
Merge  
Michael Kolupaev 已提交
508 509 510 511 512
		queue.push_back(entry);

		++iterator.index;
		if (iterator.readEntry(zookeeper, zookeeper_path))
			priority_queue.push(iterator);
M
Merge  
Michael Kolupaev 已提交
513
	}
M
Merge  
Michael Kolupaev 已提交
514 515 516

	if (count > 0)
		LOG_DEBUG(log, "Pulled " << count << " entries to queue");
M
Merge  
Michael Kolupaev 已提交
517 518
}

M
Merge  
Michael Kolupaev 已提交
519
bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
M
Merge  
Michael Kolupaev 已提交
520
{
M
Merge  
Michael Kolupaev 已提交
521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538
	if (entry.type == LogEntry::MERGE_PARTS)
	{
		/** Если какая-то из нужных частей сейчас передается или мерджится, подождем окончания этой операции.
		  * Иначе, даже если всех нужных частей для мерджа нет, нужно попытаться сделать мердж.
		  * Если каких-то частей не хватает, вместо мерджа будет попытка скачать кусок.
		  * Такая ситуация возможна, если получение какого-то куска пофейлилось, и его переместили в конец очереди.
		  */
		for (const auto & name : entry.parts_to_merge)
		{
			if (future_parts.count(name))
			{
				LOG_TRACE(log, "Not merging into part " << entry.new_part_name << " yet because part " << name << " is not ready yet.");
				return false;
			}
		}
	}

	return true;
M
Merge  
Michael Kolupaev 已提交
539 540 541 542 543 544 545 546 547 548 549 550
}

void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
{
	if (entry.type == LogEntry::GET_PART ||
		entry.type == LogEntry::MERGE_PARTS)
	{
		/// Если у нас уже есть этот кусок или покрывающий его кусок, ничего делать не нужно.
		MergeTreeData::DataPartPtr containing_part = data.getContainingPart(entry.new_part_name);

		/// Даже если кусок есть локально, его (в исключительных случаях) может не быть в zookeeper.
		if (containing_part && zookeeper.exists(replica_path + "/parts/" + containing_part->name))
M
Merge  
Michael Kolupaev 已提交
551
		{
M
Merge  
Michael Kolupaev 已提交
552 553
			if (!(entry.type == LogEntry::GET_PART && entry.source_replica == replica_name))
				LOG_DEBUG(log, "Skipping action for part " + entry.new_part_name + " - part already exists");
M
Merge  
Michael Kolupaev 已提交
554
			return;
M
Merge  
Michael Kolupaev 已提交
555
		}
M
Merge  
Michael Kolupaev 已提交
556 557
	}

M
Merge  
Michael Kolupaev 已提交
558 559 560 561 562
	if (entry.type == LogEntry::GET_PART && entry.source_replica == replica_name)
		LOG_ERROR(log, "Part " << entry.new_part_name << " from own log doesn't exist. This is a bug.");

	bool do_fetch = false;

M
Merge  
Michael Kolupaev 已提交
563 564
	if (entry.type == LogEntry::GET_PART)
	{
M
Merge  
Michael Kolupaev 已提交
565
		do_fetch = true;
M
Merge  
Michael Kolupaev 已提交
566 567 568
	}
	else if (entry.type == LogEntry::MERGE_PARTS)
	{
M
Merge  
Michael Kolupaev 已提交
569
		MergeTreeData::DataPartsVector parts;
M
Merge  
Michael Kolupaev 已提交
570
		bool have_all_parts = true;;
M
Merge  
Michael Kolupaev 已提交
571 572 573
		for (const String & name : entry.parts_to_merge)
		{
			MergeTreeData::DataPartPtr part = data.getContainingPart(name);
M
Merge  
Michael Kolupaev 已提交
574 575 576 577 578 579 580 581 582 583 584 585
			if (!part)
			{
				have_all_parts = false;
				break;
			}
			if (part->name != name)
			{
				LOG_ERROR(log, "Log and parts set look inconsistent: " << name << " is covered by " << part->name
					<< " but should be merged into " << entry.new_part_name);
				have_all_parts = false;
				break;
			}
M
Merge  
Michael Kolupaev 已提交
586 587
			parts.push_back(part);
		}
M
Merge  
Michael Kolupaev 已提交
588

M
Merge  
Michael Kolupaev 已提交
589
		if (!have_all_parts)
M
Merge  
Michael Kolupaev 已提交
590
		{
M
Merge  
Michael Kolupaev 已提交
591 592 593
			/// Если нет всех нужных кусков, попробуем взять у кого-нибудь уже помердженный кусок.
			do_fetch = true;
			LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead");
M
Merge  
Michael Kolupaev 已提交
594
		}
M
Merge  
Michael Kolupaev 已提交
595 596 597 598 599
		else
		{
			MergeTreeData::DataPartPtr part = merger.mergeParts(parts, entry.new_part_name);

			zkutil::Ops ops;
M
Merge  
Michael Kolupaev 已提交
600
			checkPartAndAddToZooKeeper(part, ops);
M
Merge  
Michael Kolupaev 已提交
601 602 603 604 605

			zookeeper.multi(ops);

			ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
		}
M
Merge  
Michael Kolupaev 已提交
606 607 608 609 610
	}
	else
	{
		throw Exception("Unexpected log entry type: " + toString(static_cast<int>(entry.type)));
	}
M
Merge  
Michael Kolupaev 已提交
611 612 613 614 615

	if (do_fetch)
	{
		try
		{
M
Merge  
Michael Kolupaev 已提交
616 617 618 619 620 621
			String replica = findReplicaHavingPart(entry.new_part_name, true);
			if (replica.empty())
			{
				ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches);
				throw Exception("No active replica has part " + entry.new_part_name, ErrorCodes::NO_REPLICA_HAS_PART);
			}
M
Merge  
Michael Kolupaev 已提交
622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681
			fetchPart(entry.new_part_name, replica);

			if (entry.type == LogEntry::MERGE_PARTS)
				ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged);
		}
		catch (...)
		{
			/** Если не получилось скачать кусок, нужный для какого-то мерджа, лучше не пытаться получить другие куски для этого мерджа,
			  * а попытаться сразу получить помердженный кусок. Чтобы так получилось, переместим действия для получения остальных кусков
			  * для этого мерджа в конец очереди.
			  *
			  */
			try
			{
				Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);

				/// Найдем действие по объединению этого куска с другими. Запомним других.
				StringSet parts_for_merge;
				LogEntries::iterator merge_entry;
				for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
				{
					if (it->type == LogEntry::MERGE_PARTS)
					{
						if (std::find(it->parts_to_merge.begin(), it->parts_to_merge.end(), entry.new_part_name)
							!= it->parts_to_merge.end())
						{
							parts_for_merge = StringSet(it->parts_to_merge.begin(), it->parts_to_merge.end());
							merge_entry = it;
							break;
						}
					}
				}

				if (!parts_for_merge.empty())
				{
					/// Переместим в конец очереди действия, получающие parts_for_merge.
					for (LogEntries::iterator it = queue.begin(); it != queue.end();)
					{
						auto it0 = it;
						++it;

						if (it0 == merge_entry)
							break;

						if ((it0->type == LogEntry::MERGE_PARTS || it0->type == LogEntry::GET_PART)
							&& parts_for_merge.count(it0->new_part_name))
						{
							queue.splice(queue.end(), queue, it0, it);
						}
					}
				}
			}
			catch (...)
			{
				tryLogCurrentException(__PRETTY_FUNCTION__);
			}

			throw;
		}
	}
M
Merge  
Michael Kolupaev 已提交
682 683 684 685 686 687
}

void StorageReplicatedMergeTree::queueUpdatingThread()
{
	while (!shutdown_called)
	{
M
Merge  
Michael Kolupaev 已提交
688 689 690
		try
		{
			pullLogsToQueue();
M
Merge  
Michael Kolupaev 已提交
691 692

			clearOldParts();
M
Merge  
Michael Kolupaev 已提交
693 694 695 696 697 698 699

			/// Каждую минуту выбрасываем ненужные записи из лога.
			if (time(0) - clear_old_logs_time > 60)
			{
				clear_old_logs_time = time(0);
				clearOldLogs();
			}
M
Merge  
Michael Kolupaev 已提交
700 701 702 703 704 705
		}
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}

M
Merge  
Michael Kolupaev 已提交
706 707
		std::this_thread::sleep_for(QUEUE_UPDATE_SLEEP);
	}
M
Merge  
Michael Kolupaev 已提交
708
}
M
Merge  
Michael Kolupaev 已提交
709

M
Merge  
Michael Kolupaev 已提交
710 711 712 713 714
void StorageReplicatedMergeTree::queueThread()
{
	while (!shutdown_called)
	{
		LogEntry entry;
M
Merge  
Michael Kolupaev 已提交
715
		bool have_work = false;
M
Merge  
Michael Kolupaev 已提交
716

M
Merge  
Michael Kolupaev 已提交
717
		try
M
Merge  
Michael Kolupaev 已提交
718 719
		{
			Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
M
Merge  
Michael Kolupaev 已提交
720
			bool empty = queue.empty();
M
Merge  
Michael Kolupaev 已提交
721 722
			if (!empty)
			{
M
Merge  
Michael Kolupaev 已提交
723 724 725 726 727 728 729 730 731 732 733
				for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
				{
					if (shouldExecuteLogEntry(*it))
					{
						entry = *it;
						entry.tagPartAsFuture(*this);
						queue.erase(it);
						have_work = true;
						break;
					}
				}
M
Merge  
Michael Kolupaev 已提交
734 735
			}
		}
M
Merge  
Michael Kolupaev 已提交
736 737 738 739
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}
M
Merge  
Michael Kolupaev 已提交
740

M
Merge  
Michael Kolupaev 已提交
741
		if (!have_work)
M
Merge  
Michael Kolupaev 已提交
742 743 744 745
		{
			std::this_thread::sleep_for(QUEUE_NO_WORK_SLEEP);
			continue;
		}
M
Merge  
Michael Kolupaev 已提交
746

M
Merge  
Michael Kolupaev 已提交
747
		bool success = false;
M
Merge  
Michael Kolupaev 已提交
748

M
Merge  
Michael Kolupaev 已提交
749 750 751
		try
		{
			executeLogEntry(entry);
M
Merge  
Michael Kolupaev 已提交
752 753 754 755 756

			auto code = zookeeper.tryRemove(replica_path + "/queue/" + entry.znode_name);
			if (code != zkutil::ReturnCode::Ok)
				LOG_ERROR(log, "Couldn't remove " << replica_path + "/queue/" + entry.znode_name << ": "
					<< zkutil::ReturnCode::toString(code) + ". There must be a bug somewhere. Ignoring it.");
M
Merge  
Michael Kolupaev 已提交
757 758 759

			success = true;
		}
M
Michael Kolupaev 已提交
760 761 762 763 764 765 766 767
		catch (Exception & e)
		{
			if (e.code() == ErrorCodes::NO_REPLICA_HAS_PART)
				/// Если ни у кого нет нужного куска, это нормальная ситуация; не будем писать в лог с уровнем Error.
				LOG_INFO(log, e.displayText());
			else
				tryLogCurrentException(__PRETTY_FUNCTION__);
		}
M
Merge  
Michael Kolupaev 已提交
768 769
		catch (...)
		{
M
Merge  
Michael Kolupaev 已提交
770
			tryLogCurrentException(__PRETTY_FUNCTION__);
M
Merge  
Michael Kolupaev 已提交
771 772 773 774 775 776 777
		}

		if (shutdown_called)
			break;

		if (success)
		{
M
Michael Kolupaev 已提交
778
			entry.currently_merging_tagger = nullptr;
M
Merge  
Michael Kolupaev 已提交
779 780 781 782 783 784
			std::this_thread::sleep_for(QUEUE_AFTER_WORK_SLEEP);
		}
		else
		{
			{
				/// Добавим действие, которое не получилось выполнить, в конец очереди.
M
Merge  
Michael Kolupaev 已提交
785
				entry.future_part_tagger = nullptr;
M
Merge  
Michael Kolupaev 已提交
786 787 788
				Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
				queue.push_back(entry);
			}
M
Michael Kolupaev 已提交
789
			entry.currently_merging_tagger = nullptr;
M
Merge  
Michael Kolupaev 已提交
790 791 792 793 794
			std::this_thread::sleep_for(QUEUE_ERROR_SLEEP);
		}
	}
}

M
Merge  
Michael Kolupaev 已提交
795 796 797 798 799 800
void StorageReplicatedMergeTree::mergeSelectingThread()
{
	pullLogsToQueue();

	while (!shutdown_called && is_leader_node)
	{
M
Michael Kolupaev 已提交
801
		bool success = false;
M
Merge  
Michael Kolupaev 已提交
802

M
Michael Kolupaev 已提交
803
		try
M
Merge  
Michael Kolupaev 已提交
804
		{
M
Michael Kolupaev 已提交
805
			size_t merges_queued = 0;
M
Merge  
Michael Kolupaev 已提交
806

M
Michael Kolupaev 已提交
807 808
			{
				Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
M
Merge  
Michael Kolupaev 已提交
809

M
Michael Kolupaev 已提交
810 811 812 813
				for (const auto & entry : queue)
					if (entry.type == LogEntry::MERGE_PARTS)
						++merges_queued;
			}
M
Merge  
Michael Kolupaev 已提交
814

M
Michael Kolupaev 已提交
815 816 817 818 819
			if (merges_queued >= data.settings.merging_threads)
			{
				std::this_thread::sleep_for(MERGE_SELECTING_SLEEP);
				continue;
			}
M
Merge  
Michael Kolupaev 已提交
820

M
Michael Kolupaev 已提交
821 822
			/// Есть ли активный мердж крупных кусков.
			bool has_big_merge = false;
M
Merge  
Michael Kolupaev 已提交
823 824

			{
M
Michael Kolupaev 已提交
825 826 827
				Poco::ScopedLock<Poco::FastMutex> lock(currently_merging_mutex);

				for (const auto & name : currently_merging)
M
Merge  
Michael Kolupaev 已提交
828
				{
M
Michael Kolupaev 已提交
829 830 831 832
					MergeTreeData::DataPartPtr part = data.getContainingPart(name);
					if (!part)
						continue;
					if (part->name != name)
M
Michael Kolupaev 已提交
833 834 835 836
					{
						LOG_INFO(log, "currently_merging contains obsolete part " << name << " contained in" << part->name);
						continue;
					}
M
Michael Kolupaev 已提交
837 838 839 840 841
					if (part->size * data.index_granularity > 25 * 1024 * 1024)
					{
						has_big_merge = true;
						break;
					}
M
Merge  
Michael Kolupaev 已提交
842 843 844 845
				}
			}

			MergeTreeData::DataPartsVector parts;
M
Merge  
Michael Kolupaev 已提交
846

M
Merge  
Michael Kolupaev 已提交
847
			{
M
Merge  
Michael Kolupaev 已提交
848
				Poco::ScopedLock<Poco::FastMutex> lock(currently_merging_mutex);
M
Merge  
Michael Kolupaev 已提交
849

M
Merge  
Michael Kolupaev 已提交
850 851 852 853 854 855
				String merged_name;
				auto can_merge = std::bind(
					&StorageReplicatedMergeTree::canMergeParts, this, std::placeholders::_1, std::placeholders::_2);

				if (merger.selectPartsToMerge(parts, merged_name, 0, false, false, has_big_merge, can_merge) ||
					merger.selectPartsToMerge(parts, merged_name, 0,  true, false, has_big_merge, can_merge))
M
Merge  
Michael Kolupaev 已提交
856
				{
M
Merge  
Michael Kolupaev 已提交
857 858
					LogEntry entry;
					entry.type = LogEntry::MERGE_PARTS;
M
Merge  
Michael Kolupaev 已提交
859
					entry.source_replica = replica_name;
M
Merge  
Michael Kolupaev 已提交
860 861 862 863 864 865 866 867 868 869
					entry.new_part_name = merged_name;

					for (const auto & part : parts)
					{
						entry.parts_to_merge.push_back(part->name);
					}

					zookeeper.create(replica_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);

					success = true;
M
Merge  
Michael Kolupaev 已提交
870
				}
M
Merge  
Michael Kolupaev 已提交
871
			}
M
Merge  
Michael Kolupaev 已提交
872

M
Merge  
Michael Kolupaev 已提交
873 874 875
			/// Нужно загрузить новую запись в очередь перед тем, как в следующий раз выбирать куски для слияния.
			///  (чтобы куски пометились как currently_merging).
			pullLogsToQueue();
M
Merge  
Michael Kolupaev 已提交
876

M
Merge  
Michael Kolupaev 已提交
877 878 879 880 881 882 883 884 885
			for (size_t i = 0; i + 1 < parts.size(); ++i)
			{
				/// Уберем больше не нужные отметки о несуществующих блоках.
				for (UInt64 number = parts[i]->right + 1; number <= parts[i + 1]->left - 1; ++number)
				{
					String number_str = toString(number);
					while (number_str.size() < 10)
						number_str = '0' + number_str;
					String path = zookeeper_path + "/block_numbers/block-" + number_str;
M
Merge  
Michael Kolupaev 已提交
886

M
Merge  
Michael Kolupaev 已提交
887 888
					zookeeper.tryRemove(path);
				}
M
Merge  
Michael Kolupaev 已提交
889 890 891 892 893 894 895
			}
		}
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}

M
Merge  
Michael Kolupaev 已提交
896
		if (shutdown_called || !is_leader_node)
M
Merge  
Michael Kolupaev 已提交
897 898
			break;

M
Merge  
Michael Kolupaev 已提交
899 900 901 902 903 904 905 906 907 908
		if (!success)
			std::this_thread::sleep_for(MERGE_SELECTING_SLEEP);
	}
}

void StorageReplicatedMergeTree::clearOldBlocksThread()
{
	while (!shutdown_called && is_leader_node)
	{
		try
M
Merge  
Michael Kolupaev 已提交
909 910 911
		{
			clearOldBlocks();
		}
M
Merge  
Michael Kolupaev 已提交
912 913 914 915
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}
M
Merge  
Michael Kolupaev 已提交
916

M
Merge  
Michael Kolupaev 已提交
917 918 919 920 921 922 923 924
		/// Спим минуту, но проверяем, нужно ли завершиться, каждую секунду.
		/// TODO: Лучше во всех подобных местах использовать condition variable.
		for (size_t i = 0; i < 60; ++i)
		{
			if (shutdown_called || !is_leader_node)
				break;
			std::this_thread::sleep_for(std::chrono::seconds(1));
		}
M
Merge  
Michael Kolupaev 已提交
925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941
	}
}

/** Merge predicate passed to the merger: can `left` and `right` be merged?
  * Returns false if either part is already being merged. Otherwise, every block number strictly
  * between the two parts must be an abandoned lock in ZooKeeper (i.e. no block can still appear there).
  * NOTE: reads currently_merging without locking; the caller (mergeSelectingThread) holds currently_merging_mutex.
  */
bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
	if (currently_merging.count(left->name) || currently_merging.count(right->name))
		return false;

	/// Parts can be merged only if all numbers between them are abandoned, i.e. correspond to no block.
	/// `number < right->left` instead of `number <= right->left - 1`: the latter wraps around (unsigned)
	/// when right->left == 0 and would scan almost the whole UInt64 range.
	for (UInt64 number = left->right + 1; number < right->left; ++number)
	{
		String number_str = toString(number);
		while (number_str.size() < 10)
			number_str = '0' + number_str;
		String path = zookeeper_path + "/block_numbers/block-" + number_str;

		if (AbandonableLockInZooKeeper::check(path, zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
		{
			LOG_DEBUG(log, "Can't merge parts " << left->name << " and " << right->name << " because block " << path << " exists");
			return false;
		}
	}

	return true;
}

void StorageReplicatedMergeTree::becomeLeader()
{
M
Merge  
Michael Kolupaev 已提交
953
	LOG_INFO(log, "Became leader");
M
Merge  
Michael Kolupaev 已提交
954 955
	is_leader_node = true;
	merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this);
M
Merge  
Michael Kolupaev 已提交
956
	clear_old_blocks_thread = std::thread(&StorageReplicatedMergeTree::clearOldBlocksThread, this);
M
Merge  
Michael Kolupaev 已提交
957 958
}

M
Merge  
Michael Kolupaev 已提交
959
/** Returns a randomly chosen replica that has `part_name` listed under its "parts" node in ZooKeeper,
  * or an empty string if there is none. With `active` set, only replicas that have an "is_active" node qualify.
  * NOTE(review): std::random_shuffle was removed in C++17; switch to std::shuffle when <random> is available.
  */
String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name, bool active)
{
	const String replicas_path = zookeeper_path + "/replicas";
	Strings candidates = zookeeper.getChildren(replicas_path);

	/// Shuffle so that among replicas having the part, each one is equally likely to be chosen.
	std::random_shuffle(candidates.begin(), candidates.end());

	for (size_t i = 0; i < candidates.size(); ++i)
	{
		const String candidate_path = replicas_path + "/" + candidates[i];

		if (!zookeeper.exists(candidate_path + "/parts/" + part_name))
			continue;
		if (active && !zookeeper.exists(candidate_path + "/is_active"))
			continue;

		return candidates[i];
	}

	return "";
}

void StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_name)
{
M
Merge  
Michael Kolupaev 已提交
978 979
	LOG_DEBUG(log, "Fetching part " << part_name << " from " << replica_name);

M
Merge  
Michael Kolupaev 已提交
980 981 982 983 984 985 986 987 988 989 990 991 992 993
	auto table_lock = lockStructure(true);

	String host;
	int port;

	String host_port_str = zookeeper.get(zookeeper_path + "/replicas/" + replica_name + "/host");
	ReadBufferFromString buf(host_port_str);
	assertString("host: ", buf);
	readString(host, buf);
	assertString("\nport: ", buf);
	readText(port, buf);
	assertString("\n", buf);
	assertEOF(buf);

M
Merge  
Michael Kolupaev 已提交
994
	MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(part_name, zookeeper_path + "/replicas/" + replica_name, host, port);
M
Merge  
Michael Kolupaev 已提交
995 996
	auto removed_parts = data.renameTempPartAndReplace(part);

M
Merge  
Michael Kolupaev 已提交
997
	zkutil::Ops ops;
M
Merge  
Michael Kolupaev 已提交
998
	checkPartAndAddToZooKeeper(part, ops);
M
Michael Kolupaev 已提交
999

M
Merge  
Michael Kolupaev 已提交
1000 1001
	zookeeper.multi(ops);

M
Michael Kolupaev 已提交
1002 1003 1004 1005 1006 1007
	for (const auto & removed_part : removed_parts)
	{
		LOG_DEBUG(log, "Part " << removed_part->name << " is rendered obsolete by fetching part " << part_name);
		ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
	}

M
Merge  
Michael Kolupaev 已提交
1008 1009
	ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);

M
Merge  
Michael Kolupaev 已提交
1010
	LOG_DEBUG(log, "Fetched part");
M
Merge  
Michael Kolupaev 已提交
1011
}
M
Merge  
Michael Kolupaev 已提交
1012

M
Merge  
Michael Kolupaev 已提交
1013 1014 1015 1016
void StorageReplicatedMergeTree::shutdown()
{
	if (shutdown_called)
		return;
M
Merge  
Michael Kolupaev 已提交
1017
	leader_election = nullptr;
M
Merge  
Michael Kolupaev 已提交
1018 1019 1020 1021
	shutdown_called = true;
	replica_is_active_node = nullptr;
	endpoint_holder = nullptr;

M
Merge  
Michael Kolupaev 已提交
1022
	LOG_TRACE(log, "Waiting for threads to finish");
M
Merge  
Michael Kolupaev 已提交
1023
	if (is_leader_node)
M
Merge  
Michael Kolupaev 已提交
1024
	{
M
Merge  
Michael Kolupaev 已提交
1025
		merge_selecting_thread.join();
M
Merge  
Michael Kolupaev 已提交
1026 1027
		clear_old_blocks_thread.join();
	}
M
Merge  
Michael Kolupaev 已提交
1028 1029 1030 1031
	queue_updating_thread.join();
	for (auto & thread : queue_threads)
		thread.join();
	LOG_TRACE(log, "Threads finished");
M
Merge  
Michael Kolupaev 已提交
1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056
}

/// Destructor: stops the background threads via shutdown().
/// Any exception is logged and swallowed, because a destructor must not throw.
StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
{
	try
	{
		shutdown();
	}
	catch (...)
	{
		/// Nothing else can be done here; just record the error.
		tryLogCurrentException("~StorageReplicatedMergeTree");
	}
}

/** SELECT from the table. Reading needs nothing replication-specific,
  * so the whole request is forwarded unchanged to `reader`.
  */
BlockInputStreams StorageReplicatedMergeTree::read(
	const Names & column_names,
	ASTPtr query,
	const Settings & settings,
	QueryProcessingStage::Enum & processed_stage,
	size_t max_block_size,
	unsigned threads)
{
	return reader.read(
		column_names, query, settings,
		processed_stage, max_block_size, threads);
}

M
Merge  
Michael Kolupaev 已提交
1057 1058 1059 1060 1061 1062 1063 1064
/** INSERT into the table: returns a ReplicatedMergeTreeBlockOutputStream for this storage.
  * If `query` is an ASTInsertQuery, its insert_id is passed to the stream; otherwise an empty id is passed.
  */
BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query)
{
	const ASTInsertQuery * insert_query = dynamic_cast<const ASTInsertQuery *>(&*query);
	String insert_id = insert_query ? insert_query->insert_id : String();

	return new ReplicatedMergeTreeBlockOutputStream(*this, insert_id);
}
M
Merge  
Michael Kolupaev 已提交
1065 1066 1067

void StorageReplicatedMergeTree::drop()
{
M
Merge  
Michael Kolupaev 已提交
1068 1069
	shutdown();

M
Merge  
Michael Kolupaev 已提交
1070 1071 1072 1073 1074 1075
	replica_is_active_node = nullptr;
	zookeeper.removeRecursive(replica_path);
	if (zookeeper.getChildren(zookeeper_path + "/replicas").empty())
		zookeeper.removeRecursive(zookeeper_path);
}

M
Merge  
Michael Kolupaev 已提交
1076 1077 1078
/** Serializes the log entry as text:
  *   format version: 1
  *   source replica: <source_replica>
  *   then for GET_PART:   "get", <new_part_name>
  *   or for MERGE_PARTS:  "merge", one source part name per line, "into", <new_part_name>
  * followed by a final newline. For any other type, only the header and the final newline are written.
  */
void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const
{
	writeString("format version: 1\n", out);

	writeString("source replica: ", out);
	writeString(source_replica, out);
	writeString("\n", out);

	if (type == GET_PART)
	{
		writeString("get\n", out);
		writeString(new_part_name, out);
	}
	else if (type == MERGE_PARTS)
	{
		writeString("merge\n", out);
		for (const String & source_part : parts_to_merge)
		{
			writeString(source_part, out);
			writeString("\n", out);
		}
		writeString("into\n", out);
		writeString(new_part_name, out);
	}

	writeString("\n", out);
}

/** Parses a log entry in the text format produced by writeText().
  * Throws if the input does not match that format, including when the entry type is neither "get" nor "merge".
  */
void StorageReplicatedMergeTree::LogEntry::readText(ReadBuffer & in)
{
	String type_str;

	assertString("format version: 1\n", in);
	assertString("source replica: ", in);
	readString(source_replica, in);
	assertString("\n", in);
	readString(type_str, in);
	assertString("\n", in);

	if (type_str == "get")
	{
		type = GET_PART;
		readString(new_part_name, in);
	}
	else if (type_str == "merge")
	{
		type = MERGE_PARTS;

		/// Start from an empty list so that reading into a reused entry does not accumulate stale parts.
		parts_to_merge.clear();
		while (true)
		{
			String s;
			readString(s, in);
			assertString("\n", in);
			if (s == "into")
				break;
			parts_to_merge.push_back(s);
		}
		readString(new_part_name, in);
	}
	else
	{
		/// Reject unknown types instead of silently leaving `type` with a stale or unset value.
		throw Exception("Unknown log entry type: " + type_str, ErrorCodes::CANNOT_PARSE_TEXT);
	}

	assertString("\n", in);
}

M
Merge  
Michael Kolupaev 已提交
1135
}