StorageReplicatedMergeTree.cpp 67.1 KB
Newer Older
M
Merge  
Michael Kolupaev 已提交
1
#include <DB/Storages/StorageReplicatedMergeTree.h>
M
Merge  
Michael Kolupaev 已提交
2
#include <DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
M
Merge  
Michael Kolupaev 已提交
3
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
M
Merge  
Michael Kolupaev 已提交
4
#include <DB/Storages/MergeTree/MergeTreePartChecker.h>
M
Merge  
Michael Kolupaev 已提交
5 6 7
#include <DB/Parsers/formatAST.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/ReadBufferFromString.h>
M
Merge  
Michael Kolupaev 已提交
8
#include <DB/Interpreters/InterpreterAlterQuery.h>
M
Merge  
Michael Kolupaev 已提交
9
#include <time.h>
M
Merge  
Michael Kolupaev 已提交
10 11 12 13

namespace DB
{

M
Merge  
Michael Kolupaev 已提交
14

M
Merge  
Michael Kolupaev 已提交
15
/// Intervals in milliseconds (judging by the _MS suffix).
/// NOTE(review): the code that uses them is outside the visible part of the file - presumably these are
/// the pauses after an error, between merge selection attempts and between cleanup passes; confirm.
const int ERROR_SLEEP_MS = 1000;
const int MERGE_SELECTING_SLEEP_MS = 5000;
const int CLEANUP_SLEEP_MS = 30000;
M
Merge  
Michael Kolupaev 已提交
18

M
Merge  
Michael Kolupaev 已提交
19 20 21 22 23 24 25 26 27
/// Convert a number to a string in the format of the suffixes of auto-increment nodes in ZooKeeper:
/// the decimal representation, left-padded with '0' up to 10 characters (longer values are returned as is).
static String padIndex(UInt64 index)
{
	static const size_t PADDED_WIDTH = 10;

	const String digits = toString(index);
	if (digits.size() >= PADDED_WIDTH)
		return digits;

	return String(PADDED_WIDTH - digits.size(), '0') + digits;
}

M
Merge  
Michael Kolupaev 已提交
28

M
Merge  
Michael Kolupaev 已提交
29 30 31
/** Constructs the storage and synchronizes it with ZooKeeper.
  *
  * Steps performed by the body:
  *  - if there is no ZooKeeper: throw when creating a new table (attach == false),
  *    otherwise call goReadOnly() and stop;
  *  - strip a single trailing '/' from zookeeper_path and compute replica_path;
  *  - when creating: create the table nodes if zookeeper_path does not exist yet, check the table structure
  *    and register the replica;
  *  - when attaching: check the table structure and the set of parts; the sanity limits are skipped if the
  *    one-shot flag <replica_path>/flags/force_restore_data exists (the flag is removed);
  *  - initialize virtual parts, load the queue, pick up the optional "unreplicated/" directory,
  *    generate active_node_identifier and start restartingThread.
  */
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
	const String & zookeeper_path_,
	const String & replica_name_,
	bool attach,
	const String & path_, const String & database_name_, const String & name_,
	NamesAndTypesListPtr columns_,
	Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_,
	const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	MergeTreeData::Mode mode_,
	const String & sign_column_,
	const MergeTreeSettings & settings_)
	:
	context(context_), zookeeper(context.getZooKeeper()), database_name(database_name_),
	table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'), zookeeper_path(zookeeper_path_),
	replica_name(replica_name_),
	data(	full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
			index_granularity_, mode_, sign_column_, settings_, database_name_ + "." + table_name, true,
			std::bind(&StorageReplicatedMergeTree::enqueuePartForCheck, this, std::placeholders::_1)),
	reader(data), writer(data), merger(data), fetcher(data),
	log(&Logger::get(database_name_ + "." + table_name + " (StorageReplicatedMergeTree)")),
	shutdown_event(false), permanent_shutdown_event(false)
{
	/// A new replicated table cannot be created without ZooKeeper.
	if (!zookeeper && !attach)
		throw Exception("Can't create replicated table without ZooKeeper", ErrorCodes::NO_ZOOKEEPER);

	/// An existing table attached without ZooKeeper: nothing below is executed.
	if (!zookeeper)
	{
		goReadOnly();
		return;
	}

	/// Drop one trailing slash so that child paths are built uniformly.
	const size_t zk_path_size = zookeeper_path.size();
	if (zk_path_size != 0 && zookeeper_path[zk_path_size - 1] == '/')
		zookeeper_path.resize(zk_path_size - 1);
	replica_path = zookeeper_path + "/replicas/" + replica_name;

	if (attach)
	{
		/// The flag node disables the sanity limits once: it is removed as soon as it is noticed.
		const String force_restore_flag = replica_path + "/flags/force_restore_data";
		const bool skip_sanity_checks = zookeeper->exists(force_restore_flag);
		if (skip_sanity_checks)
		{
			zookeeper->remove(force_restore_flag);

			LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag "
				<< force_restore_flag << ").");
		}

		checkTableStructure(skip_sanity_checks);
		checkParts(skip_sanity_checks);
	}
	else
	{
		if (!zookeeper->exists(zookeeper_path))
			createTable();

		checkTableStructure(false);
		createReplica();
	}

	initVirtualParts();
	loadQueue();

	/// Data in the "unreplicated/" subdirectory, if present, gets its own MergeTreeData, reader and merger.
	const String unreplicated_path = full_path + "unreplicated/";
	if (Poco::File(unreplicated_path).exists())
	{
		LOG_INFO(log, "Have unreplicated data");
		unreplicated_data.reset(new MergeTreeData(unreplicated_path, columns_, context_, primary_expr_ast_,
			date_column_name_, sampling_expression_, index_granularity_, mode_, sign_column_, settings_,
			database_name_ + "." + table_name + "[unreplicated]", false));
		unreplicated_reader.reset(new MergeTreeDataSelectExecutor(*unreplicated_data));
		unreplicated_merger.reset(new MergeTreeDataMerger(*unreplicated_data));
	}

	/// Generate a random identifier for this instance (the nanoseconds part of the thread CPU time clock).
	struct timespec thread_cpu_time;
	if (clock_gettime(CLOCK_THREAD_CPUTIME_ID, &thread_cpu_time))
		throwFromErrno("Cannot clock_gettime.", ErrorCodes::CANNOT_CLOCK_GETTIME);
	active_node_identifier = toString(thread_cpu_time.tv_nsec);

	restarting_thread = std::thread(&StorageReplicatedMergeTree::restartingThread, this);
}

/** Factory: constructs the storage and returns the pointer obtained from thisPtr().
  * Unless the storage ended up read-only, an interserver endpoint named
  * "ReplicatedMergeTree:<replica_path>" is also created and stored in endpoint_holder.
  */
StoragePtr StorageReplicatedMergeTree::create(
	const String & zookeeper_path_,
	const String & replica_name_,
	bool attach,
	const String & path_, const String & database_name_, const String & name_,
	NamesAndTypesListPtr columns_,
	Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_,
	const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	MergeTreeData::Mode mode_,
	const String & sign_column_,
	const MergeTreeSettings & settings_)
{
	StorageReplicatedMergeTree * storage = new StorageReplicatedMergeTree(
		zookeeper_path_, replica_name_, attach,
		path_, database_name_, name_, columns_, context_,
		primary_expr_ast_, date_column_name_, sampling_expression_,
		index_granularity_, mode_, sign_column_, settings_);
	StoragePtr storage_ptr = storage->thisPtr();

	/// A read-only storage does not get an endpoint.
	if (storage->is_read_only)
		return storage_ptr;

	String endpoint_name = "ReplicatedMergeTree:" + storage->replica_path;
	InterserverIOEndpointPtr endpoint = new ReplicatedMergeTreePartsServer(storage->data, *storage);
	storage->endpoint_holder = new InterserverIOEndpointHolder(
		endpoint_name, endpoint, storage->context.getInterserverIOHandler());

	return storage_ptr;
}

M
Merge  
Michael Kolupaev 已提交
142 143 144 145 146 147 148 149
/// Text form of an AST as written by formatAST; a null pointer yields an empty string.
/// NOTE(review): the meaning of the literal formatAST arguments (0, false, true) is not visible here - see its declaration.
static String formattedAST(const ASTPtr & ast)
{
	std::stringstream out;
	if (ast)
		formatAST(*ast, out, 0, false, true);
	return out.str();
}
M
Merge  
Michael Kolupaev 已提交
150

M
Merge  
Michael Kolupaev 已提交
151
void StorageReplicatedMergeTree::createTable()
M
Merge  
Michael Kolupaev 已提交
152
{
M
Merge  
Michael Kolupaev 已提交
153 154
	LOG_DEBUG(log, "Creating table " << zookeeper_path);

M
Merge  
Michael Kolupaev 已提交
155
	zookeeper->create(zookeeper_path, "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
156

M
Merge  
Michael Kolupaev 已提交
157
	/// Запишем метаданные таблицы, чтобы реплики могли сверять с ними параметры таблицы.
M
Merge  
Michael Kolupaev 已提交
158 159 160 161 162 163 164 165
	std::stringstream metadata;
	metadata << "metadata format version: 1" << std::endl;
	metadata << "date column: " << data.date_column_name << std::endl;
	metadata << "sampling expression: " << formattedAST(data.sampling_expression) << std::endl;
	metadata << "index granularity: " << data.index_granularity << std::endl;
	metadata << "mode: " << static_cast<int>(data.mode) << std::endl;
	metadata << "sign column: " << data.sign_column << std::endl;
	metadata << "primary key: " << formattedAST(data.primary_expr_ast) << std::endl;
M
Merge  
Michael Kolupaev 已提交
166

M
Merge  
Michael Kolupaev 已提交
167
	zookeeper->create(zookeeper_path + "/metadata", metadata.str(), zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
168
	zookeeper->create(zookeeper_path + "/columns", data.getColumnsList().toString(), zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
169

M
Merge  
Michael Kolupaev 已提交
170
	zookeeper->create(zookeeper_path + "/log", "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
171 172 173 174
	zookeeper->create(zookeeper_path + "/blocks", "", zkutil::CreateMode::Persistent);
	zookeeper->create(zookeeper_path + "/block_numbers", "", zkutil::CreateMode::Persistent);
	zookeeper->create(zookeeper_path + "/leader_election", "", zkutil::CreateMode::Persistent);
	zookeeper->create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
175
	zookeeper->create(zookeeper_path + "/flags", "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
176
	/// Создадим replicas в последнюю очередь, чтобы нельзя было добавить реплику, пока все остальные ноды не созданы.
M
Merge  
Michael Kolupaev 已提交
177
	zookeeper->create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
178
}
M
Merge  
Michael Kolupaev 已提交
179

M
Merge  
Michael Kolupaev 已提交
180 181
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
	* Если нет - бросить исключение.
M
Merge  
Michael Kolupaev 已提交
182
	*/
M
Merge  
Michael Kolupaev 已提交
183
void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks)
M
Merge  
Michael Kolupaev 已提交
184
{
M
Merge  
Michael Kolupaev 已提交
185
	String metadata_str = zookeeper->get(zookeeper_path + "/metadata");
M
Merge  
Michael Kolupaev 已提交
186 187 188 189 190 191 192 193 194 195 196 197 198
	ReadBufferFromString buf(metadata_str);
	assertString("metadata format version: 1", buf);
	assertString("\ndate column: ", buf);
	assertString(data.date_column_name, buf);
	assertString("\nsampling expression: ", buf);
	assertString(formattedAST(data.sampling_expression), buf);
	assertString("\nindex granularity: ", buf);
	assertString(toString(data.index_granularity), buf);
	assertString("\nmode: ", buf);
	assertString(toString(static_cast<int>(data.mode)), buf);
	assertString("\nsign column: ", buf);
	assertString(data.sign_column, buf);
	assertString("\nprimary key: ", buf);
M
Merge  
Michael Kolupaev 已提交
199 200
	/// NOTE: Можно сделать менее строгую проверку совпадения выражений, чтобы таблицы не ломались от небольших изменений
	///       в коде formatAST.
M
Merge  
Michael Kolupaev 已提交
201
	assertString(formattedAST(data.primary_expr_ast), buf);
M
Merge  
Michael Kolupaev 已提交
202
	assertString("\n", buf);
M
Merge  
Michael Kolupaev 已提交
203
	assertEOF(buf);
M
Merge  
Michael Kolupaev 已提交
204 205 206 207

	zkutil::Stat stat;
	auto columns = NamesAndTypesList::parse(zookeeper->get(zookeeper_path + "/columns", &stat), context.getDataTypeFactory());
	columns_version = stat.version;
M
Merge  
Michael Kolupaev 已提交
208
	if (columns != data.getColumnsList())
M
Merge  
Michael Kolupaev 已提交
209
	{
M
Merge  
Michael Kolupaev 已提交
210
		if (data.getColumnsList().sizeOfDifference(columns) <= 2 || skip_sanity_checks)
M
Merge  
Michael Kolupaev 已提交
211
		{
M
Merge  
Michael Kolupaev 已提交
212
			LOG_WARNING(log, "Table structure in ZooKeeper is a little different from local table structure. Assuming ALTER.");
213 214 215 216

			/// Без всяких блокировок, потому что таблица еще не создана.
			InterpreterAlterQuery::updateMetadata(database_name, table_name, columns, context);
			data.setColumnsList(columns);
M
Merge  
Michael Kolupaev 已提交
217
		}
M
Merge  
Michael Kolupaev 已提交
218
		else
M
Merge  
Michael Kolupaev 已提交
219
		{
M
Merge  
Michael Kolupaev 已提交
220 221
			throw Exception("Table structure in ZooKeeper is very different from local table structure.",
							ErrorCodes::INCOMPATIBLE_COLUMNS);
M
Merge  
Michael Kolupaev 已提交
222
		}
M
Merge  
Michael Kolupaev 已提交
223 224
	}
}
M
Merge  
Michael Kolupaev 已提交
225

M
Merge  
Michael Kolupaev 已提交
226 227
void StorageReplicatedMergeTree::createReplica()
{
M
Merge  
Michael Kolupaev 已提交
228
	LOG_DEBUG(log, "Creating replica " << replica_path);
M
Merge  
Michael Kolupaev 已提交
229

M
Merge  
Michael Kolupaev 已提交
230 231 232 233 234 235 236
	/** Запомним список других реплик.
	  * NOTE: Здесь есть race condition. Если почти одновременно добавить нескольких реплик, сразу же начиная в них писать,
	  *       небольшая часть данных может не реплицироваться.
	  */
	Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");

	/// Создадим пустую реплику.
M
Merge  
Michael Kolupaev 已提交
237
	zookeeper->create(replica_path, "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
238
	zookeeper->create(replica_path + "/columns", data.getColumnsList().toString(), zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
239
	zookeeper->create(replica_path + "/host", "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
240
	zookeeper->create(replica_path + "/log_pointer", "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
241 242
	zookeeper->create(replica_path + "/queue", "", zkutil::CreateMode::Persistent);
	zookeeper->create(replica_path + "/parts", "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
243

M
Merge  
Michael Kolupaev 已提交
244 245 246 247 248
	/** Нужно изменить данные ноды /replicas на что угодно, чтобы поток, удаляющий старые записи в логе,
	  *  споткнулся об это изменение и не удалил записи, которые мы еще не прочитали.
	  */
	zookeeper->set(zookeeper_path + "/replicas", "last added replica: " + replica_name);

M
Merge  
Michael Kolupaev 已提交
249
	if (replicas.empty())
M
Merge  
Michael Kolupaev 已提交
250 251
	{
		LOG_DEBUG(log, "No other replicas");
M
Merge  
Michael Kolupaev 已提交
252
		return;
M
Merge  
Michael Kolupaev 已提交
253
	}
M
Merge  
Michael Kolupaev 已提交
254

M
Merge  
Michael Kolupaev 已提交
255 256
	/// "Эталонная" реплика, у которой мы возьмем информацию о множестве кусков, очередь и указатель на лог.
	String source_replica = replicas[rand() % replicas.size()];
M
Merge  
Michael Kolupaev 已提交
257

M
Merge  
Michael Kolupaev 已提交
258 259
	LOG_INFO(log, "Will mimic " << source_replica);

M
Merge  
Michael Kolupaev 已提交
260 261 262 263
	String source_path = zookeeper_path + "/replicas/" + source_replica;

	/// Порядок следующих трех действий важен. Записи в логе могут продублироваться, но не могут потеряться.

M
Merge  
Michael Kolupaev 已提交
264 265
	/// Скопируем у эталонной реплики ссылку на лог.
	zookeeper->set(replica_path + "/log_pointer", zookeeper->get(source_path + "/log_pointer"));
M
Merge  
Michael Kolupaev 已提交
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295

	/// Запомним очередь эталонной реплики.
	Strings source_queue_names = zookeeper->getChildren(source_path + "/queue");
	std::sort(source_queue_names.begin(), source_queue_names.end());
	Strings source_queue;
	for (const String & entry_name : source_queue_names)
	{
		String entry;
		if (!zookeeper->tryGet(source_path + "/queue/" + entry_name, entry))
			continue;
		source_queue.push_back(entry);
	}

	/// Добавим в очередь задания на получение всех активных кусков, которые есть у эталонной реплики.
	Strings parts = zookeeper->getChildren(source_path + "/parts");
	ActiveDataPartSet active_parts_set;
	for (const String & part : parts)
	{
		active_parts_set.add(part);
	}
	Strings active_parts = active_parts_set.getParts();
	for (const String & name : active_parts)
	{
		LogEntry log_entry;
		log_entry.type = LogEntry::GET_PART;
		log_entry.source_replica = "";
		log_entry.new_part_name = name;

		zookeeper->create(replica_path + "/queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential);
	}
M
Merge  
Michael Kolupaev 已提交
296
	LOG_DEBUG(log, "Queued " << active_parts.size() << " parts to be fetched");
M
Merge  
Michael Kolupaev 已提交
297 298 299 300 301 302

	/// Добавим в очередь содержимое очереди эталонной реплики.
	for (const String & entry : source_queue)
	{
		zookeeper->create(replica_path + "/queue/queue-", entry, zkutil::CreateMode::PersistentSequential);
	}
M
Merge  
Michael Kolupaev 已提交
303
	LOG_DEBUG(log, "Copied " << source_queue.size() << " queue entries");
M
Merge  
Michael Kolupaev 已提交
304
}
M
Merge  
Michael Kolupaev 已提交
305

M
Merge  
Michael Kolupaev 已提交
306 307 308 309 310
void StorageReplicatedMergeTree::activateReplica()
{
	std::stringstream host;
	host << "host: " << context.getInterserverIOHost() << std::endl;
	host << "port: " << context.getInterserverIOPort() << std::endl;
M
Merge  
Michael Kolupaev 已提交
311

M
Merge  
Michael Kolupaev 已提交
312 313 314 315 316 317 318 319 320
	/** Если нода отмечена как активная, но отметка сделана в этом же экземпляре, удалим ее.
	  * Такое возможно только при истечении сессии в ZooKeeper.
	  * Здесь есть небольшой race condition (можем удалить не ту ноду, для которой сделали tryGet),
	  *  но он крайне маловероятен при нормальном использовании.
	  */
	String data;
	if (zookeeper->tryGet(replica_path + "/is_active", data) && data == active_node_identifier)
		zookeeper->tryRemove(replica_path + "/is_active");

M
Merge  
Michael Kolupaev 已提交
321
	/// Одновременно объявим, что эта реплика активна, и обновим хост.
M
Merge  
Michael Kolupaev 已提交
322
	zkutil::Ops ops;
M
Merge  
Michael Kolupaev 已提交
323
	ops.push_back(new zkutil::Op::Create(replica_path + "/is_active", "", zookeeper->getDefaultACL(), zkutil::CreateMode::Ephemeral));
M
Merge  
Michael Kolupaev 已提交
324
	ops.push_back(new zkutil::Op::SetData(replica_path + "/host", host.str(), -1));
M
Merge  
Michael Kolupaev 已提交
325 326 327

	try
	{
M
Merge  
Michael Kolupaev 已提交
328
		zookeeper->multi(ops);
M
Merge  
Michael Kolupaev 已提交
329 330 331
	}
	catch (zkutil::KeeperException & e)
	{
332
		if (e.code == ZNODEEXISTS)
M
Merge  
Michael Kolupaev 已提交
333 334 335 336 337
			throw Exception("Replica " + replica_path + " appears to be already active. If you're sure it's not, "
				"try again in a minute or remove znode " + replica_path + "/is_active manually", ErrorCodes::REPLICA_IS_ALREADY_ACTIVE);

		throw;
	}
M
Merge  
Michael Kolupaev 已提交
338

M
Merge  
Michael Kolupaev 已提交
339
	replica_is_active_node = zkutil::EphemeralNodeHolder::existing(replica_path + "/is_active", *zookeeper);
M
Merge  
Michael Kolupaev 已提交
340
}
M
Merge  
Michael Kolupaev 已提交
341

M
Merge  
Michael Kolupaev 已提交
342
void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
M
Merge  
Michael Kolupaev 已提交
343
{
M
Merge  
Michael Kolupaev 已提交
344
	Strings expected_parts_vec = zookeeper->getChildren(replica_path + "/parts");
M
Merge  
Michael Kolupaev 已提交
345 346

	/// Куски в ZK.
M
Merge  
Michael Kolupaev 已提交
347 348
	NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end());

M
Merge  
Michael Kolupaev 已提交
349
	MergeTreeData::DataParts parts = data.getAllDataParts();
M
Merge  
Michael Kolupaev 已提交
350

M
Merge  
Michael Kolupaev 已提交
351
	/// Локальные куски, которых нет в ZK.
M
Merge  
Michael Kolupaev 已提交
352
	MergeTreeData::DataParts unexpected_parts;
M
Merge  
Michael Kolupaev 已提交
353

M
Merge  
Michael Kolupaev 已提交
354 355 356 357 358 359 360 361
	for (const auto & part : parts)
	{
		if (expected_parts.count(part->name))
		{
			expected_parts.erase(part->name);
		}
		else
		{
M
Merge  
Michael Kolupaev 已提交
362
			unexpected_parts.insert(part);
M
Merge  
Michael Kolupaev 已提交
363 364 365
		}
	}

M
Merge  
Michael Kolupaev 已提交
366
	/// Какие локальные куски добавить в ZK.
M
Merge  
Michael Kolupaev 已提交
367
	MergeTreeData::DataPartsVector parts_to_add;
M
Merge  
Michael Kolupaev 已提交
368 369 370 371

	/// Какие куски нужно забрать с других реплик.
	Strings parts_to_fetch;

M
Merge  
Michael Kolupaev 已提交
372 373
	for (const String & missing_name : expected_parts)
	{
M
Merge  
Michael Kolupaev 已提交
374
		/// Если локально не хватает какого-то куска, но есть покрывающий его кусок, можно заменить в ZK недостающий покрывающим.
M
Merge  
Michael Kolupaev 已提交
375
		auto containing = data.getActiveContainingPart(missing_name);
M
Merge  
Michael Kolupaev 已提交
376 377 378 379 380 381 382 383 384 385
		if (containing)
		{
			LOG_ERROR(log, "Ignoring missing local part " << missing_name << " because part " << containing->name << " exists");
			if (unexpected_parts.count(containing))
			{
				parts_to_add.push_back(containing);
				unexpected_parts.erase(containing);
			}
		}
		else
M
Merge  
Michael Kolupaev 已提交
386
		{
M
Merge  
Michael Kolupaev 已提交
387
			parts_to_fetch.push_back(missing_name);
M
Merge  
Michael Kolupaev 已提交
388 389
		}
	}
M
Merge  
Michael Kolupaev 已提交
390

M
Merge  
Michael Kolupaev 已提交
391 392 393 394 395 396
	for (const String & name : parts_to_fetch)
		expected_parts.erase(name);

	String sanity_report =
		"There are " + toString(unexpected_parts.size()) + " unexpected parts, "
					 + toString(parts_to_add.size()) + " unexpectedly merged parts, "
M
Merge  
Michael Kolupaev 已提交
397
					 + toString(expected_parts.size()) + " missing obsolete parts, "
M
Merge  
Michael Kolupaev 已提交
398 399 400
					 + toString(parts_to_fetch.size()) + " missing parts";
	bool insane =
		parts_to_add.size() > 2 ||
M
Merge  
Michael Kolupaev 已提交
401
		unexpected_parts.size() > 2 ||
M
Merge  
Michael Kolupaev 已提交
402 403 404
		expected_parts.size() > 20 ||
		parts_to_fetch.size() > 2;

M
Merge  
Michael Kolupaev 已提交
405
	if (insane && !skip_sanity_checks)
M
Merge  
Michael Kolupaev 已提交
406 407 408
	{
		throw Exception("The local set of parts of table " + getTableName() + " doesn't look like the set of parts in ZooKeeper. "
			+ sanity_report,
M
Merge  
Michael Kolupaev 已提交
409
			ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);
M
Merge  
Michael Kolupaev 已提交
410 411
	}

M
Merge  
Michael Kolupaev 已提交
412 413 414 415 416
	if (insane)
	{
		LOG_WARNING(log, sanity_report);
	}

M
Merge  
Michael Kolupaev 已提交
417 418 419
	/// Добавим в ZK информацию о кусках, покрывающих недостающие куски.
	for (MergeTreeData::DataPartPtr part : parts_to_add)
	{
M
Merge  
Michael Kolupaev 已提交
420
		LOG_ERROR(log, "Adding unexpected local part to ZooKeeper: " << part->name);
M
Merge  
Michael Kolupaev 已提交
421

M
Merge  
Michael Kolupaev 已提交
422
		zkutil::Ops ops;
M
Merge  
Michael Kolupaev 已提交
423
		checkPartAndAddToZooKeeper(part, ops);
M
Merge  
Michael Kolupaev 已提交
424
		zookeeper->multi(ops);
M
Merge  
Michael Kolupaev 已提交
425
	}
M
Merge  
Michael Kolupaev 已提交
426

M
Merge  
Michael Kolupaev 已提交
427 428 429
	/// Удалим из ZK информацию о кусках, покрытых только что добавленными.
	for (const String & name : expected_parts)
	{
M
Merge  
Michael Kolupaev 已提交
430 431 432
		LOG_ERROR(log, "Removing unexpectedly merged local part from ZooKeeper: " << name);

		zkutil::Ops ops;
M
Merge  
Michael Kolupaev 已提交
433
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/columns", -1));
M
Merge  
Michael Kolupaev 已提交
434 435 436 437 438 439 440 441 442 443 444 445
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
		zookeeper->multi(ops);
	}

	/// Добавим в очередь задание забрать недостающие куски с других реплик и уберем из ZK информацию, что они у нас есть.
	for (const String & name : parts_to_fetch)
	{
		LOG_ERROR(log, "Removing missing part from ZooKeeper and queueing a fetch: " << name);

		LogEntry log_entry;
		log_entry.type = LogEntry::GET_PART;
M
Merge  
Michael Kolupaev 已提交
446
		log_entry.source_replica = "";
M
Merge  
Michael Kolupaev 已提交
447 448 449
		log_entry.new_part_name = name;

		/// Полагаемся, что это происходит до загрузки очереди (loadQueue).
M
Merge  
Michael Kolupaev 已提交
450
		zkutil::Ops ops;
M
Merge  
Michael Kolupaev 已提交
451
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/columns", -1));
M
Merge  
Michael Kolupaev 已提交
452 453
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
		ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
M
Merge  
Michael Kolupaev 已提交
454 455
		ops.push_back(new zkutil::Op::Create(
			replica_path + "/queue/queue-", log_entry.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
M
Merge  
Michael Kolupaev 已提交
456
		zookeeper->multi(ops);
M
Merge  
Michael Kolupaev 已提交
457 458 459
	}

	/// Удалим лишние локальные куски.
M
Merge  
Michael Kolupaev 已提交
460 461
	for (MergeTreeData::DataPartPtr part : unexpected_parts)
	{
M
Merge  
Michael Kolupaev 已提交
462
		LOG_ERROR(log, "Renaming unexpected part " << part->name << " to ignored_" + part->name);
M
Merge  
Michael Kolupaev 已提交
463
		data.renameAndDetachPart(part, "ignored_");
M
Merge  
Michael Kolupaev 已提交
464 465
	}
}
M
Merge  
Michael Kolupaev 已提交
466

M
Merge  
Michael Kolupaev 已提交
467 468 469 470 471 472 473 474 475
/// Fills virtual_parts with the names of the parts returned by data.getDataParts().
void StorageReplicatedMergeTree::initVirtualParts()
{
	const auto local_parts = data.getDataParts();
	for (auto it = local_parts.begin(); it != local_parts.end(); ++it)
		virtual_parts.add((*it)->name);
}

M
Merge  
Michael Kolupaev 已提交
476
/** Verifies a local part against the other replicas and appends to `ops` the operations that register it in ZooKeeper.
  *
  * For every replica (in random order) that has the same part with the same columns, the checksums are compared
  *  (checkEqual is expected to throw on mismatch - NOTE(review): confirm in MergeTreeData).
  * Nothing is written to ZooKeeper here; the caller executes `ops`. The appended operations are:
  *  a version check of <zookeeper_path>/columns and the creation of the part node with its /columns and /checksums children.
  */
void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataPartPtr part, zkutil::Ops & ops)
{
	check(part->columns);

	/// Remembered before the checks; used for the Check operation at the end.
	const int expected_columns_version = columns_version;

	Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
	std::random_shuffle(replicas.begin(), replicas.end());
	const String expected_columns_str = part->columns.toString();

	for (const String & replica : replicas)
	{
		const String remote_part_path = zookeeper_path + "/replicas/" + replica + "/parts/" + part->name;

		zkutil::Stat stat_before;
		zkutil::Stat stat_after;
		String columns_str;

		/// The replica does not have this part.
		if (!zookeeper->tryGet(remote_part_path + "/columns", columns_str, &stat_before))
			continue;

		if (columns_str != expected_columns_str)
		{
			LOG_INFO(log, "Not checking checksums of part " << part->name << " with replica " << replica
				<< " because columns are different");
			continue;
		}

		auto log_part_changed = [&]
		{
			LOG_INFO(log, "Not checking checksums of part " << part->name << " with replica " << replica
				<< " because part changed while we were reading its checksums");
		};

		/// Check that the version of the columns node did not change while we were reading the checksums.
		/// This guarantees that the columns and the checksums refer to the same data.
		String checksums_str;
		if (!zookeeper->tryGet(remote_part_path + "/checksums", checksums_str))
		{
			log_part_changed();
			continue;
		}
		if (!zookeeper->exists(remote_part_path + "/columns", &stat_after))
		{
			log_part_changed();
			continue;
		}
		if (stat_before.version != stat_after.version)
		{
			log_part_changed();
			continue;
		}

		auto remote_checksums = MergeTreeData::DataPart::Checksums::parse(checksums_str);
		remote_checksums.checkEqual(part->checksums, true);
	}

	const String local_part_path = replica_path + "/parts/" + part->name;

	ops.push_back(new zkutil::Op::Check(
		zookeeper_path + "/columns",
		expected_columns_version));
	ops.push_back(new zkutil::Op::Create(
		local_part_path,
		"",
		zookeeper->getDefaultACL(),
		zkutil::CreateMode::Persistent));
	ops.push_back(new zkutil::Op::Create(
		local_part_path + "/columns",
		part->columns.toString(),
		zookeeper->getDefaultACL(),
		zkutil::CreateMode::Persistent));
	ops.push_back(new zkutil::Op::Create(
		local_part_path + "/checksums",
		part->checksums.toString(),
		zookeeper->getDefaultACL(),
		zkutil::CreateMode::Persistent));
}

M
Merge  
Michael Kolupaev 已提交
533 534
void StorageReplicatedMergeTree::clearOldParts()
{
M
Merge  
Michael Kolupaev 已提交
535 536
	MergeTreeData::DataPartsVector parts = data.grabOldParts();
	size_t count = parts.size();
M
Merge  
Michael Kolupaev 已提交
537

M
Merge  
Michael Kolupaev 已提交
538 539 540 541
	if (!count)
		return;

	try
M
Merge  
Michael Kolupaev 已提交
542
	{
M
Merge  
Michael Kolupaev 已提交
543 544 545 546 547 548 549 550
		while (!parts.empty())
		{
			MergeTreeData::DataPartPtr part = parts.back();

			zkutil::Ops ops;
			ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name + "/columns", -1));
			ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name + "/checksums", -1));
			ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name, -1));
M
Merge  
Michael Kolupaev 已提交
551 552 553
			auto code = zookeeper->tryMulti(ops);
			if (code != ZOK)
				LOG_WARNING(log, "Couldn't remove " << part->name << " from ZooKeeper: " << zkutil::ZooKeeper::error2string(code));
M
Merge  
Michael Kolupaev 已提交
554 555 556 557 558 559 560 561 562

			part->remove();
			parts.pop_back();
		}
	}
	catch (...)
	{
		data.addOldParts(parts);
		throw;
M
Merge  
Michael Kolupaev 已提交
563 564
	}

M
Merge  
Michael Kolupaev 已提交
565
	LOG_DEBUG(log, "Removed " << count << " old parts");
M
Merge  
Michael Kolupaev 已提交
566 567
}

M
Merge  
Michael Kolupaev 已提交
568 569
void StorageReplicatedMergeTree::clearOldLogs()
{
M
Merge  
Michael Kolupaev 已提交
570 571 572 573 574 575 576 577 578 579 580
	zkutil::Stat stat;
	if (!zookeeper->exists(zookeeper_path + "/log", &stat))
		throw Exception(zookeeper_path + "/log doesn't exist", ErrorCodes::NOT_FOUND_NODE);

	int children_count = stat.numChildren;

	/// Будем ждать, пока накопятся в 1.1 раза больше записей, чем нужно.
	if (static_cast<double>(children_count) < data.settings.replicated_logs_to_keep * 1.1)
		return;

	Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas", &stat);
M
Merge  
Michael Kolupaev 已提交
581 582 583
	UInt64 min_pointer = std::numeric_limits<UInt64>::max();
	for (const String & replica : replicas)
	{
M
Merge  
Michael Kolupaev 已提交
584 585
		String pointer = zookeeper->get(zookeeper_path + "/replicas/" + replica + "/log_pointer");
		if (pointer.empty())
M
Merge  
Michael Kolupaev 已提交
586 587 588 589
			return;
		min_pointer = std::min(min_pointer, parse<UInt64>(pointer));
	}

M
Merge  
Michael Kolupaev 已提交
590
	Strings entries = zookeeper->getChildren(zookeeper_path + "/log");
M
Merge  
Michael Kolupaev 已提交
591
	std::sort(entries.begin(), entries.end());
M
Merge  
Michael Kolupaev 已提交
592

M
Merge  
Michael Kolupaev 已提交
593 594
	/// Не будем трогать последние replicated_logs_to_keep записей.
	entries.erase(entries.end() - std::min(entries.size(), data.settings.replicated_logs_to_keep), entries.end());
M
Merge  
Michael Kolupaev 已提交
595 596 597 598 599
	/// Не будем трогать записи, не меньшие min_pointer.
	entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_pointer)), entries.end());

	if (entries.empty())
		return;
M
Merge  
Michael Kolupaev 已提交
600

M
Merge  
Michael Kolupaev 已提交
601
	zkutil::Ops ops;
M
Merge  
Michael Kolupaev 已提交
602
	for (size_t i = 0; i < entries.size(); ++i)
M
Merge  
Michael Kolupaev 已提交
603
	{
M
Merge  
Michael Kolupaev 已提交
604 605 606
		ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/log/" + entries[i], -1));

		if (ops.size() > 400 || i + 1 == entries.size())
M
Merge  
Michael Kolupaev 已提交
607 608 609 610 611 612 613 614
		{
			/// Одновременно с очисткой лога проверим, не добавилась ли реплика с тех пор, как мы получили список реплик.
			ops.push_back(new zkutil::Op::Check(zookeeper_path + "/replicas", stat.version));
			zookeeper->multi(ops);
			ops.clear();
		}
	}

M
Merge  
Michael Kolupaev 已提交
615
	LOG_DEBUG(log, "Removed " << entries.size() << " old log entries: " << entries.front() << " - " << entries.back());
M
Merge  
Michael Kolupaev 已提交
616 617 618 619 620
}

void StorageReplicatedMergeTree::clearOldBlocks()
{
	zkutil::Stat stat;
M
Merge  
Michael Kolupaev 已提交
621
	if (!zookeeper->exists(zookeeper_path + "/blocks", &stat))
M
Merge  
Michael Kolupaev 已提交
622 623
		throw Exception(zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);

624
	int children_count = stat.numChildren;
M
Merge  
Michael Kolupaev 已提交
625

M
Merge  
Michael Kolupaev 已提交
626
	/// Чтобы делать "асимптотически" меньше запросов exists, будем ждать, пока накопятся в 1.1 раза больше блоков, чем нужно.
M
Merge  
Michael Kolupaev 已提交
627
	if (static_cast<double>(children_count) < data.settings.replicated_deduplication_window * 1.1)
M
Merge  
Michael Kolupaev 已提交
628 629
		return;

M
Merge  
Michael Kolupaev 已提交
630
	LOG_TRACE(log, "Clearing about " << static_cast<size_t>(children_count) - data.settings.replicated_deduplication_window
M
Merge  
Michael Kolupaev 已提交
631
		<< " old blocks from ZooKeeper. This might take several minutes.");
M
Merge  
Michael Kolupaev 已提交
632

M
Merge  
Michael Kolupaev 已提交
633
	Strings blocks = zookeeper->getChildren(zookeeper_path + "/blocks");
M
Merge  
Michael Kolupaev 已提交
634 635 636 637 638 639

	std::vector<std::pair<Int64, String> > timed_blocks;

	for (const String & block : blocks)
	{
		zkutil::Stat stat;
M
Merge  
Michael Kolupaev 已提交
640
		zookeeper->exists(zookeeper_path + "/blocks/" + block, &stat);
641
		timed_blocks.push_back(std::make_pair(stat.czxid, block));
M
Merge  
Michael Kolupaev 已提交
642 643
	}

M
Merge  
Michael Kolupaev 已提交
644
	zkutil::Ops ops;
M
Merge  
Michael Kolupaev 已提交
645
	std::sort(timed_blocks.begin(), timed_blocks.end(), std::greater<std::pair<Int64, String>>());
M
Merge  
Michael Kolupaev 已提交
646 647
	for (size_t i = data.settings.replicated_deduplication_window; i <  timed_blocks.size(); ++i)
	{
M
Merge  
Michael Kolupaev 已提交
648
		ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/number", -1));
M
Merge  
Michael Kolupaev 已提交
649
		ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/columns", -1));
M
Merge  
Michael Kolupaev 已提交
650 651
		ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksums", -1));
		ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second, -1));
M
Merge  
Michael Kolupaev 已提交
652 653 654 655 656
		if (ops.size() > 400)
		{
			zookeeper->multi(ops);
			ops.clear();
		}
M
Merge  
Michael Kolupaev 已提交
657
	}
M
Merge  
Michael Kolupaev 已提交
658 659
	if (!ops.empty())
		zookeeper->multi(ops);
M
Merge  
Michael Kolupaev 已提交
660 661 662 663

	LOG_TRACE(log, "Cleared " << blocks.size() - data.settings.replicated_deduplication_window << " old blocks from ZooKeeper");
}

M
Merge  
Michael Kolupaev 已提交
664 665 666 667
void StorageReplicatedMergeTree::loadQueue()
{
	Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);

M
Merge  
Michael Kolupaev 已提交
668
	Strings children = zookeeper->getChildren(replica_path + "/queue");
M
Merge  
Michael Kolupaev 已提交
669 670 671
	std::sort(children.begin(), children.end());
	for (const String & child : children)
	{
M
Merge  
Michael Kolupaev 已提交
672
		String s = zookeeper->get(replica_path + "/queue/" + child);
M
Merge  
Michael Kolupaev 已提交
673 674
		LogEntry entry = LogEntry::parse(s);
		entry.znode_name = child;
M
Merge  
Michael Kolupaev 已提交
675
		entry.addResultToVirtualParts(*this);
M
Merge  
Michael Kolupaev 已提交
676
		queue.push_back(entry);
M
Merge  
Michael Kolupaev 已提交
677 678 679
	}
}

M
Merge  
Michael Kolupaev 已提交
680
/// Copies new entries from the shared log into this replica's queue (both in ZooKeeper and
/// in memory), advancing the replica's log pointer with each copied entry.
/// If `next_update_event` is given, it is passed to the `exists` request for the next log
/// entry, and is set immediately if that entry already exists.
void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_event)
{
	Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);

	const String log_pointer_path = replica_path + "/log_pointer";
	const String log_entry_prefix = zookeeper_path + "/log/log-";

	UInt64 index = 0;
	String pointer_str = zookeeper->get(log_pointer_path);

	if (!pointer_str.empty())
	{
		index = parse<UInt64>(pointer_str);
	}
	else
	{
		/// If we do not have a log pointer yet, point it at the first entry of the log.
		Strings log_entries = zookeeper->getChildren(zookeeper_path + "/log");
		if (!log_entries.empty())
			index = parse<UInt64>(std::min_element(log_entries.begin(), log_entries.end())->substr(strlen("log-")));

		zookeeper->set(log_pointer_path, toString(index));
	}

	size_t pulled = 0;
	String serialized;
	while (zookeeper->tryGet(log_entry_prefix + padIndex(index), serialized))
	{
		++pulled;
		++index;

		LogEntry entry = LogEntry::parse(serialized);

		/// Add the entry to the queue and advance the log pointer in a single multi-request.
		zkutil::Ops ops;
		ops.push_back(new zkutil::Op::Create(
			replica_path + "/queue/queue-", serialized, zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
		ops.push_back(new zkutil::Op::SetData(
			log_pointer_path, toString(index), -1));
		auto multi_results = zookeeper->multi(ops);

		/// The queue node is sequential; take its actual name from the path that was created.
		String created_path = dynamic_cast<zkutil::Op::Create &>(ops[0]).getPathCreated();
		entry.znode_name = created_path.substr(created_path.find_last_of('/') + 1);
		entry.addResultToVirtualParts(*this);
		queue.push_back(entry);
	}

	if (next_update_event)
	{
		if (zookeeper->exists(log_entry_prefix + padIndex(index), nullptr, next_update_event))
			next_update_event->set();
	}

	if (pulled != 0)
	{
		if (queue_task_handle)
			queue_task_handle->wake();

		LOG_DEBUG(log, "Pulled " << pulled << " entries to queue");
	}
}

M
Merge  
Michael Kolupaev 已提交
738
bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
M
Merge  
Michael Kolupaev 已提交
739
{
M
Merge  
Michael Kolupaev 已提交
740
	if ((entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART) && future_parts.count(entry.new_part_name))
M
Merge  
Michael Kolupaev 已提交
741 742 743 744 745 746
	{
		LOG_DEBUG(log, "Not executing log entry for part " << entry.new_part_name <<
			" because another log entry for the same part is being processed. This shouldn't happen often.");
		return false;
	}

M
Merge  
Michael Kolupaev 已提交
747 748 749 750 751 752 753 754 755 756 757
	if (entry.type == LogEntry::MERGE_PARTS)
	{
		/** Если какая-то из нужных частей сейчас передается или мерджится, подождем окончания этой операции.
		  * Иначе, даже если всех нужных частей для мерджа нет, нужно попытаться сделать мердж.
		  * Если каких-то частей не хватает, вместо мерджа будет попытка скачать кусок.
		  * Такая ситуация возможна, если получение какого-то куска пофейлилось, и его переместили в конец очереди.
		  */
		for (const auto & name : entry.parts_to_merge)
		{
			if (future_parts.count(name))
			{
M
Merge  
Michael Kolupaev 已提交
758
				LOG_TRACE(log, "Not merging into part " << entry.new_part_name << " because part " << name << " is not ready yet.");
M
Merge  
Michael Kolupaev 已提交
759 760 761 762 763 764
				return false;
			}
		}
	}

	return true;
M
Merge  
Michael Kolupaev 已提交
765 766
}

M
Merge  
Michael Kolupaev 已提交
767
bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry, BackgroundProcessingPool::Context & pool_context)
M
Merge  
Michael Kolupaev 已提交
768 769 770 771 772
{
	if (entry.type == LogEntry::GET_PART ||
		entry.type == LogEntry::MERGE_PARTS)
	{
		/// Если у нас уже есть этот кусок или покрывающий его кусок, ничего делать не нужно.
M
Merge  
Michael Kolupaev 已提交
773
		MergeTreeData::DataPartPtr containing_part = data.getActiveContainingPart(entry.new_part_name);
M
Merge  
Michael Kolupaev 已提交
774 775

		/// Даже если кусок есть локально, его (в исключительных случаях) может не быть в zookeeper.
M
Merge  
Michael Kolupaev 已提交
776
		if (containing_part && zookeeper->exists(replica_path + "/parts/" + containing_part->name))
M
Merge  
Michael Kolupaev 已提交
777
		{
M
Merge  
Michael Kolupaev 已提交
778 779
			if (!(entry.type == LogEntry::GET_PART && entry.source_replica == replica_name))
				LOG_DEBUG(log, "Skipping action for part " + entry.new_part_name + " - part already exists");
M
Merge  
Michael Kolupaev 已提交
780
			return true;
M
Merge  
Michael Kolupaev 已提交
781
		}
M
Merge  
Michael Kolupaev 已提交
782 783
	}

M
Merge  
Michael Kolupaev 已提交
784
	if (entry.type == LogEntry::GET_PART && entry.source_replica == replica_name)
M
Merge  
Michael Kolupaev 已提交
785
		LOG_WARNING(log, "Part " << entry.new_part_name << " from own log doesn't exist.");
M
Merge  
Michael Kolupaev 已提交
786 787 788

	bool do_fetch = false;

M
Merge  
Michael Kolupaev 已提交
789 790
	if (entry.type == LogEntry::GET_PART)
	{
M
Merge  
Michael Kolupaev 已提交
791
		do_fetch = true;
M
Merge  
Michael Kolupaev 已提交
792 793 794
	}
	else if (entry.type == LogEntry::MERGE_PARTS)
	{
M
Merge  
Michael Kolupaev 已提交
795
		MergeTreeData::DataPartsVector parts;
796
		bool have_all_parts = true;
M
Merge  
Michael Kolupaev 已提交
797 798
		for (const String & name : entry.parts_to_merge)
		{
M
Merge  
Michael Kolupaev 已提交
799
			MergeTreeData::DataPartPtr part = data.getActiveContainingPart(name);
M
Merge  
Michael Kolupaev 已提交
800 801 802 803 804 805 806 807 808 809 810 811
			if (!part)
			{
				have_all_parts = false;
				break;
			}
			if (part->name != name)
			{
				LOG_ERROR(log, "Log and parts set look inconsistent: " << name << " is covered by " << part->name
					<< " but should be merged into " << entry.new_part_name);
				have_all_parts = false;
				break;
			}
M
Merge  
Michael Kolupaev 已提交
812 813
			parts.push_back(part);
		}
M
Merge  
Michael Kolupaev 已提交
814

M
Merge  
Michael Kolupaev 已提交
815
		if (!have_all_parts)
M
Merge  
Michael Kolupaev 已提交
816
		{
M
Merge  
Michael Kolupaev 已提交
817 818 819
			/// Если нет всех нужных кусков, попробуем взять у кого-нибудь уже помердженный кусок.
			do_fetch = true;
			LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead");
M
Merge  
Michael Kolupaev 已提交
820
		}
M
Merge  
Michael Kolupaev 已提交
821 822
		else
		{
M
Merge  
Michael Kolupaev 已提交
823 824 825
			/// Если собираемся сливать большие куски, увеличим счетчик потоков, сливающих большие куски.
			for (const auto & part : parts)
			{
M
Merge  
Michael Kolupaev 已提交
826
				if (part->size_in_bytes > data.settings.max_bytes_to_merge_parts_small)
M
Merge  
Michael Kolupaev 已提交
827 828 829 830 831 832 833
				{
					pool_context.incrementCounter("big merges");
					pool_context.incrementCounter("replicated big merges");
					break;
				}
			}

834 835
			auto table_lock = lockStructure(true);

M
Merge  
Michael Kolupaev 已提交
836 837
			MergeTreeData::Transaction transaction;
			MergeTreeData::DataPartPtr part = merger.mergeParts(parts, entry.new_part_name, &transaction);
M
Merge  
Michael Kolupaev 已提交
838 839

			zkutil::Ops ops;
M
Merge  
Michael Kolupaev 已提交
840
			checkPartAndAddToZooKeeper(part, ops);
M
Merge  
Michael Kolupaev 已提交
841

M
Merge  
Michael Kolupaev 已提交
842
			zookeeper->multi(ops);
M
Merge  
Michael Kolupaev 已提交
843
			transaction.commit();
M
Merge  
Michael Kolupaev 已提交
844
			merge_selecting_event.set();
M
Merge  
Michael Kolupaev 已提交
845 846 847

			ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
		}
M
Merge  
Michael Kolupaev 已提交
848 849 850 851 852
	}
	else
	{
		throw Exception("Unexpected log entry type: " + toString(static_cast<int>(entry.type)));
	}
M
Merge  
Michael Kolupaev 已提交
853 854 855

	if (do_fetch)
	{
M
Merge  
Michael Kolupaev 已提交
856 857
		String replica;

M
Merge  
Michael Kolupaev 已提交
858 859
		try
		{
M
Merge  
Michael Kolupaev 已提交
860
			replica = findReplicaHavingPart(entry.new_part_name, true);
M
Merge  
Michael Kolupaev 已提交
861 862 863 864 865
			if (replica.empty())
			{
				ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches);
				throw Exception("No active replica has part " + entry.new_part_name, ErrorCodes::NO_REPLICA_HAS_PART);
			}
M
Merge  
Michael Kolupaev 已提交
866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915
			fetchPart(entry.new_part_name, replica);

			if (entry.type == LogEntry::MERGE_PARTS)
				ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged);
		}
		catch (...)
		{
			/** Если не получилось скачать кусок, нужный для какого-то мерджа, лучше не пытаться получить другие куски для этого мерджа,
			  * а попытаться сразу получить помердженный кусок. Чтобы так получилось, переместим действия для получения остальных кусков
			  * для этого мерджа в конец очереди.
			  *
			  */
			try
			{
				Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);

				/// Найдем действие по объединению этого куска с другими. Запомним других.
				StringSet parts_for_merge;
				LogEntries::iterator merge_entry;
				for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
				{
					if (it->type == LogEntry::MERGE_PARTS)
					{
						if (std::find(it->parts_to_merge.begin(), it->parts_to_merge.end(), entry.new_part_name)
							!= it->parts_to_merge.end())
						{
							parts_for_merge = StringSet(it->parts_to_merge.begin(), it->parts_to_merge.end());
							merge_entry = it;
							break;
						}
					}
				}

				if (!parts_for_merge.empty())
				{
					/// Переместим в конец очереди действия, получающие parts_for_merge.
					for (LogEntries::iterator it = queue.begin(); it != queue.end();)
					{
						auto it0 = it;
						++it;

						if (it0 == merge_entry)
							break;

						if ((it0->type == LogEntry::MERGE_PARTS || it0->type == LogEntry::GET_PART)
							&& parts_for_merge.count(it0->new_part_name))
						{
							queue.splice(queue.end(), queue, it0, it);
						}
					}
M
Merge  
Michael Kolupaev 已提交
916 917 918 919 920 921 922 923 924

					/** Если этого куска ни у кого нет, но в очереди упоминается мердж с его участием, то наверно этот кусок такой старый,
					  *  что его все померджили и удалили. Не будем бросать исключение, чтобы queueTask лишний раз не спала.
					  */
					if (replica.empty())
					{
						LOG_INFO(log, "No replica has part " << entry.new_part_name << ". Will fetch merged part instead.");
						return false;
					}
M
Merge  
Michael Kolupaev 已提交
925
				}
M
Merge  
Michael Kolupaev 已提交
926 927 928 929

				/// Если ни у кого нет куска, и в очереди нет слияний с его участием, проверим, есть ли у кого-то покрывающий его.
				if (replica.empty())
					enqueuePartForCheck(entry.new_part_name);
M
Merge  
Michael Kolupaev 已提交
930 931 932 933 934 935 936 937 938
			}
			catch (...)
			{
				tryLogCurrentException(__PRETTY_FUNCTION__);
			}

			throw;
		}
	}
M
Merge  
Michael Kolupaev 已提交
939 940

	return true;
M
Merge  
Michael Kolupaev 已提交
941 942 943 944 945 946
}

void StorageReplicatedMergeTree::queueUpdatingThread()
{
	while (!shutdown_called)
	{
M
Merge  
Michael Kolupaev 已提交
947 948
		try
		{
M
Merge  
Michael Kolupaev 已提交
949
			pullLogsToQueue(queue_updating_event);
M
Merge  
Michael Kolupaev 已提交
950

M
Merge  
Michael Kolupaev 已提交
951
			queue_updating_event->wait();
M
Merge  
Michael Kolupaev 已提交
952 953 954 955 956
		}
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);

M
Merge  
Michael Kolupaev 已提交
957 958
			queue_updating_event->tryWait(ERROR_SLEEP_MS);
		}
M
Merge  
Michael Kolupaev 已提交
959
	}
M
Merge  
Michael Kolupaev 已提交
960 961

	LOG_DEBUG(log, "queue updating thread finished");
M
Merge  
Michael Kolupaev 已提交
962
}
M
Merge  
Michael Kolupaev 已提交
963

M
Merge  
Michael Kolupaev 已提交
964
/// Takes the first queue entry that shouldExecuteLogEntry allows, and executes it.
/// On success the entry's node is removed from ZooKeeper; otherwise the entry is put back
/// at the end of the in-memory queue.
/// Returns false if there was nothing to execute or if executing threw an exception.
bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & pool_context)
{
	LogEntry entry;
	bool have_work = false;

	try
	{
		Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);

		for (LogEntries::iterator candidate = queue.begin(); candidate != queue.end(); ++candidate)
		{
			if (!shouldExecuteLogEntry(*candidate))
				continue;

			entry = *candidate;
			entry.tagPartAsFuture(*this);
			queue.erase(candidate);
			have_work = true;
			break;
		}
	}
	catch (...)
	{
		tryLogCurrentException(__PRETTY_FUNCTION__);
	}

	if (!have_work)
		return false;

	/// Stays true unless the try block below runs to its end.
	bool exception = true;
	bool success = false;

	try
	{
		success = executeLogEntry(entry, pool_context);

		if (success)
		{
			auto code = zookeeper->tryRemove(replica_path + "/queue/" + entry.znode_name);
			if (code != ZOK)
			{
				LOG_ERROR(log, "Couldn't remove " << replica_path + "/queue/" + entry.znode_name << ": "
					<< zkutil::ZooKeeper::error2string(code) + ". There must be a bug somewhere. Ignoring it.");
			}
		}

		exception = false;
	}
	catch (Exception & e)
	{
		if (e.code() == ErrorCodes::NO_REPLICA_HAS_PART)
		{
			/// If nobody has the required part, probably not all replicas are running; do not log at Error level.
			LOG_INFO(log, e.displayText());
		}
		else
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}
	}
	catch (...)
	{
		tryLogCurrentException(__PRETTY_FUNCTION__);
	}

	if (!success)
	{
		/// Put the action that could not be executed at the end of the queue.
		entry.future_part_tagger = nullptr;
		Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
		queue.push_back(entry);
	}

	/// If there was no exception, there is no need to sleep.
	return !exception;
}

M
Merge  
Michael Kolupaev 已提交
1038 1039 1040 1041 1042 1043
void StorageReplicatedMergeTree::mergeSelectingThread()
{
	pullLogsToQueue();

	while (!shutdown_called && is_leader_node)
	{
M
Michael Kolupaev 已提交
1044
		bool success = false;
M
Merge  
Michael Kolupaev 已提交
1045

M
Michael Kolupaev 已提交
1046
		try
M
Merge  
Michael Kolupaev 已提交
1047
		{
M
Michael Kolupaev 已提交
1048
			size_t merges_queued = 0;
M
Merge  
Michael Kolupaev 已提交
1049
			/// Есть ли в очереди или в фоновом потоке мердж крупных кусков.
M
Merge  
Michael Kolupaev 已提交
1050
			bool has_big_merge = context.getBackgroundPool().getCounter("replicated big merges") > 0;
M
Merge  
Michael Kolupaev 已提交
1051

M
Merge  
Michael Kolupaev 已提交
1052
			if (!has_big_merge)
M
Michael Kolupaev 已提交
1053 1054
			{
				Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
M
Merge  
Michael Kolupaev 已提交
1055

M
Michael Kolupaev 已提交
1056
				for (const auto & entry : queue)
M
Merge  
Michael Kolupaev 已提交
1057
				{
M
Michael Kolupaev 已提交
1058
					if (entry.type == LogEntry::MERGE_PARTS)
M
Merge  
Michael Kolupaev 已提交
1059
					{
M
Michael Kolupaev 已提交
1060
						++merges_queued;
M
Merge  
Michael Kolupaev 已提交
1061 1062 1063 1064 1065

						if (!has_big_merge)
						{
							for (const String & name : entry.parts_to_merge)
							{
M
Merge  
Michael Kolupaev 已提交
1066
								MergeTreeData::DataPartPtr part = data.getActiveContainingPart(name);
M
Merge  
Michael Kolupaev 已提交
1067 1068
								if (!part || part->name != name)
									continue;
M
Merge  
Michael Kolupaev 已提交
1069
								if (part->size_in_bytes > data.settings.max_bytes_to_merge_parts_small)
M
Merge  
Michael Kolupaev 已提交
1070 1071 1072 1073 1074 1075 1076 1077
								{
									has_big_merge = true;
									break;
								}
							}
						}
					}
				}
M
Michael Kolupaev 已提交
1078
			}
M
Merge  
Michael Kolupaev 已提交
1079

M
Merge  
Michael Kolupaev 已提交
1080
			do
M
Michael Kolupaev 已提交
1081
			{
M
Merge  
Michael Kolupaev 已提交
1082
				if (merges_queued >= data.settings.max_replicated_merges_in_queue)
M
Merge  
Michael Kolupaev 已提交
1083
					break;
M
Merge  
Michael Kolupaev 已提交
1084

M
Merge  
Michael Kolupaev 已提交
1085
				MergeTreeData::DataPartsVector parts;
M
Merge  
Michael Kolupaev 已提交
1086

M
Merge  
Michael Kolupaev 已提交
1087 1088 1089 1090
				String merged_name;
				auto can_merge = std::bind(
					&StorageReplicatedMergeTree::canMergeParts, this, std::placeholders::_1, std::placeholders::_2);

M
Merge  
Michael Kolupaev 已提交
1091 1092 1093
				if (!merger.selectPartsToMerge(parts, merged_name, MergeTreeDataMerger::NO_LIMIT,
												false, false, has_big_merge, can_merge) &&
					!merger.selectPartsToMerge(parts, merged_name, MergeTreeDataMerger::NO_LIMIT,
M
Merge  
Michael Kolupaev 已提交
1094
												true, false, has_big_merge, can_merge))
M
Merge  
Michael Kolupaev 已提交
1095
					break;
M
Merge  
Michael Kolupaev 已提交
1096

M
Merge  
Michael Kolupaev 已提交
1097 1098 1099 1100
				LogEntry entry;
				entry.type = LogEntry::MERGE_PARTS;
				entry.source_replica = replica_name;
				entry.new_part_name = merged_name;
M
Merge  
Michael Kolupaev 已提交
1101

M
Merge  
Michael Kolupaev 已提交
1102 1103 1104
				for (const auto & part : parts)
				{
					entry.parts_to_merge.push_back(part->name);
M
Merge  
Michael Kolupaev 已提交
1105 1106
				}

M
Merge  
Michael Kolupaev 已提交
1107
				zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
M
Merge  
Michael Kolupaev 已提交
1108

M
Merge  
Michael Kolupaev 已提交
1109
				/// Нужно загрузить новую запись в очередь перед тем, как в следующий раз выбирать куски для слияния.
M
Merge  
Michael Kolupaev 已提交
1110
				///  (чтобы кусок добавился в virtual_parts).
M
Merge  
Michael Kolupaev 已提交
1111 1112 1113 1114
				pullLogsToQueue();

				String month_name = parts[0]->name.substr(0, 6);
				for (size_t i = 0; i + 1 < parts.size(); ++i)
M
Merge  
Michael Kolupaev 已提交
1115
				{
M
Merge  
Michael Kolupaev 已提交
1116 1117 1118
					/// Уберем больше не нужные отметки о несуществующих блоках.
					for (UInt64 number = parts[i]->right + 1; number <= parts[i + 1]->left - 1; ++number)
					{
M
Merge  
Michael Kolupaev 已提交
1119
						String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number);
M
Merge  
Michael Kolupaev 已提交
1120

M
Merge  
Michael Kolupaev 已提交
1121
						zookeeper->tryRemove(path);
M
Merge  
Michael Kolupaev 已提交
1122
					}
M
Merge  
Michael Kolupaev 已提交
1123
				}
M
Merge  
Michael Kolupaev 已提交
1124 1125

				success = true;
M
Merge  
Michael Kolupaev 已提交
1126
			}
M
Merge  
Michael Kolupaev 已提交
1127
			while(false);
M
Merge  
Michael Kolupaev 已提交
1128 1129 1130 1131 1132 1133
		}
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}

M
Merge  
Michael Kolupaev 已提交
1134
		if (shutdown_called || !is_leader_node)
M
Merge  
Michael Kolupaev 已提交
1135 1136
			break;

M
Merge  
Michael Kolupaev 已提交
1137
		if (!success)
M
Merge  
Michael Kolupaev 已提交
1138
			merge_selecting_event.tryWait(MERGE_SELECTING_SLEEP_MS);
M
Merge  
Michael Kolupaev 已提交
1139
	}
M
Merge  
Michael Kolupaev 已提交
1140 1141

	LOG_DEBUG(log, "merge selecting thread finished");
M
Merge  
Michael Kolupaev 已提交
1142 1143
}

M
Merge  
Michael Kolupaev 已提交
1144
void StorageReplicatedMergeTree::cleanupThread()
M
Merge  
Michael Kolupaev 已提交
1145
{
M
Merge  
Michael Kolupaev 已提交
1146
	while (!shutdown_called)
M
Merge  
Michael Kolupaev 已提交
1147 1148
	{
		try
M
Merge  
Michael Kolupaev 已提交
1149
		{
M
Merge  
Michael Kolupaev 已提交
1150 1151 1152 1153 1154 1155 1156
			clearOldParts();

			if (is_leader_node)
			{
				clearOldLogs();
				clearOldBlocks();
			}
M
Merge  
Michael Kolupaev 已提交
1157
		}
M
Merge  
Michael Kolupaev 已提交
1158 1159 1160 1161
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}
M
Merge  
Michael Kolupaev 已提交
1162

M
Merge  
Michael Kolupaev 已提交
1163
		shutdown_event.tryWait(CLEANUP_SLEEP_MS);
M
Merge  
Michael Kolupaev 已提交
1164
	}
M
Merge  
Michael Kolupaev 已提交
1165 1166

	LOG_DEBUG(log, "cleanup thread finished");
M
Merge  
Michael Kolupaev 已提交
1167 1168
}

M
Merge  
Michael Kolupaev 已提交
1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199
void StorageReplicatedMergeTree::alterThread()
{
	bool force_recheck_parts = true;

	while (!shutdown_called)
	{
		try
		{
			zkutil::Stat stat;
			String columns_str = zookeeper->get(zookeeper_path + "/columns", &stat, alter_thread_event);
			NamesAndTypesList columns = NamesAndTypesList::parse(columns_str, context.getDataTypeFactory());

			bool changed = false;

			/// Проверим, что описание столбцов изменилось.
			/// Чтобы не останавливать лишний раз все запросы в таблицу, проверим сначала под локом на чтение.
			{
				auto table_lock = lockStructure(false);
				if (columns != data.getColumnsList())
					changed = true;
			}

			/// Если описание столбцов изменилось, обновим структуру таблицы локально.
			if (changed)
			{
				auto table_lock = lockStructureForAlter();
				if (columns != data.getColumnsList())
				{
					LOG_INFO(log, "Columns list changed in ZooKeeper. Applying changes locally.");
					InterpreterAlterQuery::updateMetadata(database_name, table_name, columns, context);
					data.setColumnsList(columns);
M
Merge  
Michael Kolupaev 已提交
1200 1201
					if (unreplicated_data)
						unreplicated_data->setColumnsList(columns);
M
Merge  
Michael Kolupaev 已提交
1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218
					columns_version = stat.version;
					LOG_INFO(log, "Applied changes to table.");
				}
				else
				{
					changed = false;
				}
			}

			/// Обновим куски.
			if (changed || force_recheck_parts)
			{
				if (changed)
					LOG_INFO(log, "ALTER-ing parts");

				int changed_parts = 0;

M
Merge  
Michael Kolupaev 已提交
1219 1220
				auto parts = data.getDataParts();

M
Merge  
Michael Kolupaev 已提交
1221 1222 1223
				for (const MergeTreeData::DataPartPtr & part : parts)
				{
					/// Обновим кусок и запишем результат во временные файлы.
M
Merge  
Michael Kolupaev 已提交
1224 1225
					/// TODO: Можно пропускать проверку на слишком большие изменения, если в ZooKeeper есть, например,
					///  нода /flags/force_alter.
M
Merge  
Michael Kolupaev 已提交
1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242
					auto transaction = data.alterDataPart(part, columns);

					if (!transaction)
						continue;

					++changed_parts;

					/// Обновим метаданные куска в ZooKeeper.
					zkutil::Ops ops;
					ops.push_back(new zkutil::Op::SetData(replica_path + "/parts/" + part->name + "/columns", part->columns.toString(), -1));
					ops.push_back(new zkutil::Op::SetData(replica_path + "/parts/" + part->name + "/checksums", part->checksums.toString(), -1));
					zookeeper->multi(ops);

					/// Применим изменения файлов.
					transaction->commit();
				}

M
Merge  
Michael Kolupaev 已提交
1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259
				/// То же самое для нереплицируемых данных.
				if (unreplicated_data)
				{
					parts = unreplicated_data->getDataParts();

					for (const MergeTreeData::DataPartPtr & part : parts)
					{
						auto transaction = unreplicated_data->alterDataPart(part, columns);

						if (!transaction)
							continue;

						++changed_parts;

						transaction->commit();
					}
				}
M
Merge  
Michael Kolupaev 已提交
1260 1261 1262

				zookeeper->set(replica_path + "/columns", columns.toString());

M
Merge  
Michael Kolupaev 已提交
1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282
				if (changed || changed_parts != 0)
					LOG_INFO(log, "ALTER-ed " << changed_parts << " parts");
				force_recheck_parts = false;
			}

			alter_thread_event->wait();
		}
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);

			force_recheck_parts = true;

			alter_thread_event->tryWait(ERROR_SLEEP_MS);
		}
	}

	LOG_DEBUG(log, "alter thread finished");
}

M
Merge  
Michael Kolupaev 已提交
1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321
/** Drops the record of a part from this replica's ZooKeeper subtree and, in the same
  * multi request, appends a GET_PART entry for that part to the replica's queue.
  * After the request succeeds, the same entry is appended to the in-memory queue.
  *
  * part_name - name of the part; its nodes live under replica_path + "/parts/" + part_name.
  *
  * Whatever zookeeper->multi() throws propagates to the caller; in that case
  * the in-memory queue is not modified.
  */
void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_name)
{
	String part_path = replica_path + "/parts/" + part_name;

	/// The entry asks to get the part; the source replica is left empty.
	LogEntry log_entry;
	log_entry.type = LogEntry::GET_PART;
	log_entry.source_replica = "";
	log_entry.new_part_name = part_name;

	zkutil::Ops ops;
	ops.push_back(new zkutil::Op::Create(
		replica_path + "/queue/queue-", log_entry.toString(), zookeeper->getDefaultACL(),
		zkutil::CreateMode::PersistentSequential));
	/// Children are removed before the part node itself.
	ops.push_back(new zkutil::Op::Remove(part_path + "/checksums", -1));
	ops.push_back(new zkutil::Op::Remove(part_path + "/columns", -1));
	ops.push_back(new zkutil::Op::Remove(part_path, -1));

	/// The returned results are not needed: the created path is read back from the Create op below.
	zookeeper->multi(ops);

	{
		Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);

		/// The node was created as sequential, so its final name is known only after the request.
		String path_created = dynamic_cast<zkutil::Op::Create &>(ops[0]).getPathCreated();
		log_entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
		log_entry.addResultToVirtualParts(*this);
		queue.push_back(log_entry);
	}
}

void StorageReplicatedMergeTree::enqueuePartForCheck(const String & name)
{
	Poco::ScopedLock<Poco::FastMutex> lock(parts_to_check_mutex);

	if (parts_to_check_set.count(name))
		return;
	parts_to_check_queue.push_back(name);
	parts_to_check_set.insert(name);
	parts_to_check_event.set();
}

M
Merge  
Michael Kolupaev 已提交
1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351
void StorageReplicatedMergeTree::partCheckThread()
{
	while (!shutdown_called)
	{
		try
		{
			/// Достанем из очереди кусок для проверки.
			String part_name;
			{
				Poco::ScopedLock<Poco::FastMutex> lock(parts_to_check_mutex);
				if (parts_to_check_queue.empty())
				{
					if (!parts_to_check_set.empty())
					{
						LOG_ERROR(log, "Non-empty parts_to_check_set with empty parts_to_check_queue. This is a bug.");
						parts_to_check_set.clear();
					}
				}
				else
				{
					part_name = parts_to_check_queue.front();
				}
			}
			if (part_name.empty())
			{
				parts_to_check_event.wait();
				continue;
			}

			LOG_WARNING(log, "Checking part " << part_name);
M
Merge  
Michael Kolupaev 已提交
1352
			ProfileEvents::increment(ProfileEvents::ReplicatedPartChecks);
M
Merge  
Michael Kolupaev 已提交
1353

M
Merge  
Michael Kolupaev 已提交
1354
			auto part = data.getActiveContainingPart(part_name);
M
Merge  
Michael Kolupaev 已提交
1355 1356 1357 1358 1359 1360 1361 1362 1363 1364
			String part_path = replica_path + "/parts/" + part_name;

			/// Этого или покрывающего куска у нас нет.
			if (!part)
			{
				/// Если кусок есть в ZooKeeper, удалим его оттуда и добавим в очередь задание скачать его.
				if (zookeeper->exists(part_path))
				{
					LOG_WARNING(log, "Part " << part_name << " exists in ZooKeeper but not locally. "
						"Removing from ZooKeeper and queueing a fetch.");
M
Merge  
Michael Kolupaev 已提交
1365
					ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
M
Merge  
Michael Kolupaev 已提交
1366

M
Merge  
Michael Kolupaev 已提交
1367
					removePartAndEnqueueFetch(part_name);
M
Merge  
Michael Kolupaev 已提交
1368 1369 1370 1371
				}
				/// Если куска нет в ZooKeeper, проверим есть ли он хоть у кого-то.
				else
				{
M
Merge  
Michael Kolupaev 已提交
1372 1373
					ActiveDataPartSet::Part part_info;
					ActiveDataPartSet::parsePartName(part_name, part_info);
M
Merge  
Michael Kolupaev 已提交
1374

M
Merge  
Michael Kolupaev 已提交
1375 1376 1377 1378
					/** Будем проверять только куски, не полученные в результате слияния.
					  * Для кусков, полученных в результате слияния такая проверка была бы некорректной,
					  *  потому что слитого куска может еще ни у кого не быть.
					  */
M
Merge  
Michael Kolupaev 已提交
1379 1380 1381 1382 1383
					if (part_info.left != part_info.right)
					{
						LOG_WARNING(log, "Not checking if part " << part_name << " is lost because it is a result of a merge.");
					}
					else
M
Merge  
Michael Kolupaev 已提交
1384
					{
M
Merge  
Michael Kolupaev 已提交
1385 1386 1387 1388 1389
						LOG_WARNING(log, "Checking if anyone has part covering " << part_name << ".");

						bool found = false;
						Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
						for (const String & replica : replicas)
M
Merge  
Michael Kolupaev 已提交
1390
						{
M
Merge  
Michael Kolupaev 已提交
1391 1392
							Strings parts = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts");
							for (const String & part_on_replica : parts)
M
Merge  
Michael Kolupaev 已提交
1393
							{
M
Merge  
Michael Kolupaev 已提交
1394 1395 1396 1397 1398 1399
								if (part_on_replica == part_name || ActiveDataPartSet::contains(part_on_replica, part_name))
								{
									found = true;
									LOG_WARNING(log, "Found part " << part_on_replica << " on " << replica);
									break;
								}
M
Merge  
Michael Kolupaev 已提交
1400
							}
M
Merge  
Michael Kolupaev 已提交
1401 1402
							if (found)
								break;
M
Merge  
Michael Kolupaev 已提交
1403 1404
						}

M
Merge  
Michael Kolupaev 已提交
1405
						if (!found)
M
Merge  
Michael Kolupaev 已提交
1406
						{
M
Merge  
Michael Kolupaev 已提交
1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419
							/** Такая ситуация возможна при нормальной работе, без потери данных, например, так:
							* ReplicatedMergeTreeBlockOutputStream записал кусок, попытался добавить его в ZK,
							*  получил operation timeout, удалил локальный кусок и бросил исключение,
							*  а на самом деле, кусок добавился в ZK.
							*/
							LOG_ERROR(log, "No replica has part covering " << part_name << ". This part is lost forever. "
								<< "There might or might not be a data loss.");
							ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);

							/** Если ни у кого нет такого куска, удалим его из нашей очереди.
							  *
							  * Еще можно было бы добавить его в block_numbers, чтобы он не мешал слияниям,
							  *  но если так сделать, ZooKeeper почему-то пропустит один номер для автоинкремента,
M
Merge  
Michael Kolupaev 已提交
1420 1421
							  *  и в номерах блоков все равно останется дырка.
							  * TODO: можно это исправить, сделав две директории block_numbers: для автоинкрементных и ручных нод.
M
Merge  
Michael Kolupaev 已提交
1422
							  */
M
Merge  
Michael Kolupaev 已提交
1423

M
Merge  
Michael Kolupaev 已提交
1424
							{
M
Merge  
Michael Kolupaev 已提交
1425 1426 1427 1428 1429 1430 1431
								Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);

								/** NOTE: Не удалятся записи в очереди, которые сейчас выполняются.
								* Они пофейлятся и положат кусок снова в очередь на проверку.
								* Расчитываем, что это редкая ситуация.
								*/
								for (LogEntries::iterator it = queue.begin(); it != queue.end(); )
M
Merge  
Michael Kolupaev 已提交
1432
								{
M
Merge  
Michael Kolupaev 已提交
1433 1434 1435 1436 1437 1438 1439 1440 1441
									if (it->new_part_name == part_name)
									{
										zookeeper->remove(replica_path + "/queue/" + it->znode_name);
										queue.erase(it++);
									}
									else
									{
										++it;
									}
M
Merge  
Michael Kolupaev 已提交
1442 1443 1444 1445
								}
							}
						}
					}
M
Merge  
Michael Kolupaev 已提交
1446 1447 1448 1449 1450 1451 1452 1453
				}
			}
			/// У нас есть этот кусок, и он активен.
			else if (part->name == part_name)
			{
				/// Если кусок есть в ZooKeeper, сверим его данные с его чексуммами, а их с ZooKeeper.
				if (zookeeper->exists(replica_path + "/parts/" + part_name))
				{
M
Merge  
Michael Kolupaev 已提交
1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468
					LOG_WARNING(log, "Checking data of part " << part_name << ".");

					try
					{
						auto zk_checksums = MergeTreeData::DataPart::Checksums::parse(
							zookeeper->get(replica_path + "/parts/" + part_name + "/checksums"));
						zk_checksums.checkEqual(part->checksums, true);

						auto zk_columns = NamesAndTypesList::parse(
							zookeeper->get(replica_path + "/parts/" + part_name + "/columns"), context.getDataTypeFactory());
						if (part->columns != zk_columns)
							throw Exception("Columns of local part " + part_name + " are different from ZooKeeper");

						MergeTreePartChecker::checkDataPart(
							data.getFullPath() + part_name, data.index_granularity, true, context.getDataTypeFactory());
M
Merge  
Michael Kolupaev 已提交
1469

M
Merge  
Michael Kolupaev 已提交
1470 1471 1472 1473 1474 1475
						LOG_INFO(log, "Part " << part_name << " looks good.");
					}
					catch (...)
					{
						tryLogCurrentException(__PRETTY_FUNCTION__);

M
Merge  
Michael Kolupaev 已提交
1476 1477
						LOG_ERROR(log, "Part " << part_name << " looks broken. Removing it and queueing a fetch.");
						ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
M
Merge  
Michael Kolupaev 已提交
1478 1479 1480 1481

						removePartAndEnqueueFetch(part_name);

						/// Удалим кусок локально.
M
Merge  
Michael Kolupaev 已提交
1482
						data.renameAndDetachPart(part, "broken_");
M
Merge  
Michael Kolupaev 已提交
1483
					}
M
Merge  
Michael Kolupaev 已提交
1484 1485 1486 1487
				}
				/// Если куска нет в ZooKeeper, удалим его локально.
				else
				{
M
Merge  
Michael Kolupaev 已提交
1488 1489
					ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);

M
Merge  
Michael Kolupaev 已提交
1490 1491
					LOG_ERROR(log, "Unexpected part " << part_name << ". Removing.");
					data.renameAndDetachPart(part, "unexpected_");
M
Merge  
Michael Kolupaev 已提交
1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522
				}
			}
			else
			{
				/// Если у нас есть покрывающий кусок, игнорируем все проблемы с этим куском.
				/// В худшем случае в лог еще old_parts_lifetime секунд будут валиться ошибки, пока кусок не удалится как старый.
			}

			/// Удалим кусок из очереди.
			{
				Poco::ScopedLock<Poco::FastMutex> lock(parts_to_check_mutex);
				if (parts_to_check_queue.empty() || parts_to_check_queue.front() != part_name)
				{
					LOG_ERROR(log, "Someone changed parts_to_check_queue.front(). This is a bug.");
				}
				else
				{
					parts_to_check_queue.pop_front();
					parts_to_check_set.erase(part_name);
				}
			}
		}
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
			parts_to_check_event.tryWait(ERROR_SLEEP_MS);
		}
	}
}


M
Merge  
Michael Kolupaev 已提交
1523 1524
/** Tells whether the two parts may be merged with each other.
  * Returns false if
  *  - either part is already covered by another part in virtual_parts (it is going to be merged into a bigger one);
  *  - either part is not registered under replica_path + "/parts/" in ZooKeeper;
  *  - some block number strictly between left->right and right->left is not in the ABANDONED state
  *    according to AbandonableLockInZooKeeper::check.
  * Otherwise returns true.
  */
bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
	/// If one of the parts is already going to be merged into a bigger one, do not agree to merge it.
	if (virtual_parts.getContainingPart(left->name) != left->name ||
		virtual_parts.getContainingPart(right->name) != right->name)
		return false;

	/// If there is no information in ZK about one of the parts, do not merge.
	if (!zookeeper->exists(replica_path + "/parts/" + left->name) ||
		!zookeeper->exists(replica_path + "/parts/" + right->name))
		return false;

	/// The first six characters of the part name select the block_numbers subdirectory.
	String month_name = left->name.substr(0, 6);

	/// The parts can be merged if all numbers between them are abandoned - do not correspond to any blocks.
	/// The bound is written as `number < right->left` instead of `number <= right->left - 1`:
	///  both are the same for right->left >= 1, but the subtraction would wrap around
	///  for an unsigned zero and make the loop run over (almost) all UInt64 values.
	for (UInt64 number = left->right + 1; number < right->left; ++number)
	{
		String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number);

		if (AbandonableLockInZooKeeper::check(path, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
			return false;
	}

	return true;
}

void StorageReplicatedMergeTree::becomeLeader()
{
M
Merge  
Michael Kolupaev 已提交
1551
	LOG_INFO(log, "Became leader");
M
Merge  
Michael Kolupaev 已提交
1552 1553 1554 1555
	is_leader_node = true;
	merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this);
}

M
Merge  
Michael Kolupaev 已提交
1556
/** Looks for a replica that has the part `part_name` registered in ZooKeeper.
  * If `active` is true, only replicas that also have the "is_active" node are accepted.
  * Replicas are examined in random order, so one of the suitable replicas is picked at random.
  * Returns the replica name, or an empty string if no suitable replica was found.
  */
String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name, bool active)
{
	const String replicas_root = zookeeper_path + "/replicas";

	Strings candidates = zookeeper->getChildren(replicas_root);

	/// Shuffle, so that the first suitable replica found below is a random one among the suitable.
	std::random_shuffle(candidates.begin(), candidates.end());

	for (const String & candidate : candidates)
	{
		const String candidate_root = replicas_root + "/" + candidate;

		if (!zookeeper->exists(candidate_root + "/parts/" + part_name))
			continue;

		/// The liveness node is looked at only when the caller asked for an active replica.
		if (active && !zookeeper->exists(candidate_root + "/is_active"))
			continue;

		return candidate;
	}

	return "";
}

void StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_name)
{
M
Merge  
Michael Kolupaev 已提交
1575 1576
	LOG_DEBUG(log, "Fetching part " << part_name << " from " << replica_name);

M
Merge  
Michael Kolupaev 已提交
1577 1578 1579 1580 1581
	auto table_lock = lockStructure(true);

	String host;
	int port;

M
Merge  
Michael Kolupaev 已提交
1582
	String host_port_str = zookeeper->get(zookeeper_path + "/replicas/" + replica_name + "/host");
M
Merge  
Michael Kolupaev 已提交
1583 1584 1585 1586 1587 1588 1589 1590
	ReadBufferFromString buf(host_port_str);
	assertString("host: ", buf);
	readString(host, buf);
	assertString("\nport: ", buf);
	readText(port, buf);
	assertString("\n", buf);
	assertEOF(buf);

M
Merge  
Michael Kolupaev 已提交
1591
	MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(part_name, zookeeper_path + "/replicas/" + replica_name, host, port);
M
Merge  
Michael Kolupaev 已提交
1592 1593 1594

	MergeTreeData::Transaction transaction;
	auto removed_parts = data.renameTempPartAndReplace(part, nullptr, &transaction);
M
Merge  
Michael Kolupaev 已提交
1595

M
Merge  
Michael Kolupaev 已提交
1596
	zkutil::Ops ops;
M
Merge  
Michael Kolupaev 已提交
1597
	checkPartAndAddToZooKeeper(part, ops);
M
Michael Kolupaev 已提交
1598

M
Merge  
Michael Kolupaev 已提交
1599
	zookeeper->multi(ops);
M
Merge  
Michael Kolupaev 已提交
1600
	transaction.commit();
M
Merge  
Michael Kolupaev 已提交
1601
	merge_selecting_event.set();
M
Merge  
Michael Kolupaev 已提交
1602

M
Michael Kolupaev 已提交
1603 1604 1605 1606 1607 1608
	for (const auto & removed_part : removed_parts)
	{
		LOG_DEBUG(log, "Part " << removed_part->name << " is rendered obsolete by fetching part " << part_name);
		ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
	}

M
Merge  
Michael Kolupaev 已提交
1609 1610
	ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);

M
Merge  
Michael Kolupaev 已提交
1611
	LOG_DEBUG(log, "Fetched part " << part_name << " from " << replica_name);
M
Merge  
Michael Kolupaev 已提交
1612
}
M
Merge  
Michael Kolupaev 已提交
1613

M
Merge  
Michael Kolupaev 已提交
1614 1615
void StorageReplicatedMergeTree::shutdown()
{
M
Merge  
Michael Kolupaev 已提交
1616
	if (permanent_shutdown_called)
M
Merge  
Michael Kolupaev 已提交
1617 1618 1619
	{
		if (restarting_thread.joinable())
			restarting_thread.join();
M
Merge  
Michael Kolupaev 已提交
1620
		return;
M
Merge  
Michael Kolupaev 已提交
1621
	}
M
Merge  
Michael Kolupaev 已提交
1622

M
Merge  
Michael Kolupaev 已提交
1623
	permanent_shutdown_called = true;
M
Merge  
Michael Kolupaev 已提交
1624
	permanent_shutdown_event.set();
M
Merge  
Michael Kolupaev 已提交
1625
	restarting_thread.join();
M
Merge  
Michael Kolupaev 已提交
1626 1627

	endpoint_holder = nullptr;
M
Merge  
Michael Kolupaev 已提交
1628 1629 1630 1631
}

void StorageReplicatedMergeTree::partialShutdown()
{
M
Merge  
Michael Kolupaev 已提交
1632
	leader_election = nullptr;
M
Merge  
Michael Kolupaev 已提交
1633
	shutdown_called = true;
M
Merge  
Michael Kolupaev 已提交
1634
	shutdown_event.set();
M
Merge  
Michael Kolupaev 已提交
1635 1636 1637
	merge_selecting_event.set();
	queue_updating_event->set();
	alter_thread_event->set();
M
Merge  
Michael Kolupaev 已提交
1638
	alter_query_event->set();
M
Merge  
Michael Kolupaev 已提交
1639
	parts_to_check_event.set();
M
Merge  
Michael Kolupaev 已提交
1640 1641
	replica_is_active_node = nullptr;

M
Merge  
Michael Kolupaev 已提交
1642 1643 1644 1645
	merger.cancelAll();
	if (unreplicated_merger)
		unreplicated_merger->cancelAll();

M
Merge  
Michael Kolupaev 已提交
1646
	LOG_TRACE(log, "Waiting for threads to finish");
M
Merge  
Michael Kolupaev 已提交
1647
	if (is_leader_node)
M
Merge  
Michael Kolupaev 已提交
1648
	{
M
Merge  
Michael Kolupaev 已提交
1649
		is_leader_node = false;
M
Merge  
Michael Kolupaev 已提交
1650 1651
		if (merge_selecting_thread.joinable())
			merge_selecting_thread.join();
M
Merge  
Michael Kolupaev 已提交
1652
	}
M
Merge  
Michael Kolupaev 已提交
1653 1654
	if (queue_updating_thread.joinable())
		queue_updating_thread.join();
M
Merge  
Michael Kolupaev 已提交
1655 1656 1657 1658
	if (cleanup_thread.joinable())
		cleanup_thread.join();
	if (alter_thread.joinable())
		alter_thread.join();
M
Merge  
Michael Kolupaev 已提交
1659 1660
	if (part_check_thread.joinable())
		part_check_thread.join();
M
Merge  
Michael Kolupaev 已提交
1661 1662
	if (queue_task_handle)
		context.getBackgroundPool().removeTask(queue_task_handle);
M
Merge  
Michael Kolupaev 已提交
1663
	queue_task_handle.reset();
M
Merge  
Michael Kolupaev 已提交
1664
	LOG_TRACE(log, "Threads finished");
M
Merge  
Michael Kolupaev 已提交
1665 1666
}

M
Merge  
Michael Kolupaev 已提交
1667 1668 1669 1670 1671 1672
void StorageReplicatedMergeTree::goReadOnly()
{
	LOG_INFO(log, "Going to read-only mode");

	is_read_only = true;
	permanent_shutdown_called = true;
M
Merge  
Michael Kolupaev 已提交
1673
	permanent_shutdown_event.set();
M
Merge  
Michael Kolupaev 已提交
1674

M
Merge  
Michael Kolupaev 已提交
1675
	partialShutdown();
M
Merge  
Michael Kolupaev 已提交
1676 1677
}

M
Merge  
Michael Kolupaev 已提交
1678 1679 1680
void StorageReplicatedMergeTree::startup()
{
	shutdown_called = false;
M
Merge  
Michael Kolupaev 已提交
1681
	shutdown_event.reset();
M
Merge  
Michael Kolupaev 已提交
1682

M
Merge  
Michael Kolupaev 已提交
1683 1684 1685 1686
	merger.uncancelAll();
	if (unreplicated_merger)
		unreplicated_merger->uncancelAll();

M
Merge  
Michael Kolupaev 已提交
1687 1688
	activateReplica();

M
Merge  
Michael Kolupaev 已提交
1689
	leader_election = new zkutil::LeaderElection(zookeeper_path + "/leader_election", *zookeeper,
M
Merge  
Michael Kolupaev 已提交
1690 1691 1692
		std::bind(&StorageReplicatedMergeTree::becomeLeader, this), replica_name);

	queue_updating_thread = std::thread(&StorageReplicatedMergeTree::queueUpdatingThread, this);
M
Merge  
Michael Kolupaev 已提交
1693
	cleanup_thread = std::thread(&StorageReplicatedMergeTree::cleanupThread, this);
M
Merge  
Michael Kolupaev 已提交
1694
	alter_thread = std::thread(&StorageReplicatedMergeTree::alterThread, this);
M
Merge  
Michael Kolupaev 已提交
1695
	part_check_thread = std::thread(&StorageReplicatedMergeTree::partCheckThread, this);
M
Merge  
Michael Kolupaev 已提交
1696 1697
	queue_task_handle = context.getBackgroundPool().addTask(
		std::bind(&StorageReplicatedMergeTree::queueTask, this, std::placeholders::_1));
M
Merge  
Michael Kolupaev 已提交
1698 1699
}

M
Merge  
Michael Kolupaev 已提交
1700 1701
void StorageReplicatedMergeTree::restartingThread()
{
M
Merge  
Michael Kolupaev 已提交
1702
	try
M
Merge  
Michael Kolupaev 已提交
1703
	{
M
Merge  
Michael Kolupaev 已提交
1704 1705 1706
		startup();

		while (!permanent_shutdown_called)
M
Merge  
Michael Kolupaev 已提交
1707
		{
M
Merge  
Michael Kolupaev 已提交
1708 1709 1710
			if (zookeeper->expired())
			{
				LOG_WARNING(log, "ZooKeeper session has expired. Switching to a new session.");
M
Merge  
Michael Kolupaev 已提交
1711

M
Merge  
Michael Kolupaev 已提交
1712
				/// Запретим писать в таблицу, пока подменяем zookeeper.
M
Merge  
Michael Kolupaev 已提交
1713
				LOG_TRACE(log, "Locking INSERTs");
M
Merge  
Michael Kolupaev 已提交
1714
				auto structure_lock = lockDataForAlter();
M
Merge  
Michael Kolupaev 已提交
1715
				LOG_TRACE(log, "Locked INSERTs");
M
Merge  
Michael Kolupaev 已提交
1716

M
Merge  
Michael Kolupaev 已提交
1717
				partialShutdown();
M
Merge  
Michael Kolupaev 已提交
1718

M
Merge  
Michael Kolupaev 已提交
1719 1720 1721 1722
				zookeeper = context.getZooKeeper();

				startup();
			}
M
Merge  
Michael Kolupaev 已提交
1723

M
Merge  
Michael Kolupaev 已提交
1724
			permanent_shutdown_event.tryWait(60 * 1000);
M
Merge  
Michael Kolupaev 已提交
1725 1726 1727 1728 1729
		}
	}
	catch (...)
	{
		tryLogCurrentException("StorageReplicatedMergeTree::restartingThread");
M
Merge  
Michael Kolupaev 已提交
1730 1731
		LOG_ERROR(log, "Exception in restartingThread. The storage will be read-only until server restart.");
		goReadOnly();
M
Merge  
Michael Kolupaev 已提交
1732
		LOG_DEBUG(log, "restarting thread finished");
M
Merge  
Michael Kolupaev 已提交
1733
		return;
M
Merge  
Michael Kolupaev 已提交
1734 1735
	}

M
Merge  
Michael Kolupaev 已提交
1736 1737 1738 1739 1740 1741 1742 1743 1744
	try
	{
		endpoint_holder = nullptr;
		partialShutdown();
	}
	catch (...)
	{
		tryLogCurrentException("StorageReplicatedMergeTree::restartingThread");
	}
M
Merge  
Michael Kolupaev 已提交
1745 1746

	LOG_DEBUG(log, "restarting thread finished");
M
Merge  
Michael Kolupaev 已提交
1747 1748
}

M
Merge  
Michael Kolupaev 已提交
1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768
/// Shuts the storage down. Exceptions from shutdown() are logged and never leave the destructor.
StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
{
	try
	{
		shutdown();
	}
	catch (...)
	{
		tryLogCurrentException("~StorageReplicatedMergeTree");
	}
}

/** Creates streams for reading from the table.
  * All arguments are passed unchanged to the reader of replicated data and, if there is
  *  unreplicated data, to its reader as well (processed_stage is passed by reference to both).
  * Streams of unreplicated data are placed before the streams of replicated data in the result.
  */
BlockInputStreams StorageReplicatedMergeTree::read(
		const Names & column_names,
		ASTPtr query,
		const Settings & settings,
		QueryProcessingStage::Enum & processed_stage,
		size_t max_block_size,
		unsigned threads)
{
	BlockInputStreams streams = reader.read(column_names, query, settings, processed_stage, max_block_size, threads);

	if (!unreplicated_reader)
		return streams;

	BlockInputStreams unreplicated_streams = unreplicated_reader->read(
		column_names, query, settings, processed_stage, max_block_size, threads);

	/// Inserted at the beginning, not appended.
	streams.insert(streams.begin(), unreplicated_streams.begin(), unreplicated_streams.end());

	return streams;
}

M
Merge  
Michael Kolupaev 已提交
1780 1781
/** Creates a stream for writing to the table.
  * Throws an exception with the code TABLE_IS_READ_ONLY if the table is in read-only mode.
  * If `query` is an INSERT query, its insert_id is passed to the stream; otherwise the id is empty.
  */
BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query)
{
	if (is_read_only)
		throw Exception("Table is in read only mode", ErrorCodes::TABLE_IS_READ_ONLY);

	ASTInsertQuery * insert_query = typeid_cast<ASTInsertQuery *>(&*query);

	String insert_id;
	if (insert_query)
		insert_id = insert_query->insert_id;

	return new ReplicatedMergeTreeBlockOutputStream(*this, insert_id);
}
M
Merge  
Michael Kolupaev 已提交
1791

M
Merge  
Michael Kolupaev 已提交
1792 1793
bool StorageReplicatedMergeTree::optimize()
{
1794 1795 1796
	/// Померджим какие-нибудь куски из директории unreplicated.
	/// TODO: Мерджить реплицируемые куски тоже.
	/// TODO: Не давать вызывать это из нескольких потоков сразу: один кусок может принять участие в нескольких несовместимых слияниях.
M
Merge  
Michael Kolupaev 已提交
1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812

	if (!unreplicated_data)
		return false;

	unreplicated_data->clearOldParts();

	MergeTreeData::DataPartsVector parts;
	String merged_name;
	auto always_can_merge = [](const MergeTreeData::DataPartPtr &a, const MergeTreeData::DataPartPtr &b) { return true; };
	if (!unreplicated_merger->selectPartsToMerge(parts, merged_name, 0, true, true, false, always_can_merge))
		return false;

	unreplicated_merger->mergeParts(parts, merged_name);
	return true;
}

M
Merge  
Michael Kolupaev 已提交
1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881
void StorageReplicatedMergeTree::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context)
{
	LOG_DEBUG(log, "Doing ALTER");

	NamesAndTypesList new_columns;
	String new_columns_str;
	int new_columns_version;
	zkutil::Stat stat;

	{
		auto table_lock = lockStructureForAlter();

		data.checkAlter(params);

		new_columns = data.getColumnsList();
		params.apply(new_columns);

		new_columns_str = new_columns.toString();

		/// Делаем ALTER.
		zookeeper->set(zookeeper_path + "/columns", new_columns_str, -1, &stat);

		new_columns_version = stat.version;
	}

	LOG_DEBUG(log, "Updated columns in ZooKeeper. Waiting for replicas to apply changes.");

	/// Ждем, пока все реплики обновят данные.

	/// Подпишемся на изменения столбцов, чтобы перестать ждать, если кто-то еще сделает ALTER.
	if (!zookeeper->exists(zookeeper_path + "/columns", &stat, alter_query_event))
		throw Exception(zookeeper_path + "/columns doesn't exist", ErrorCodes::NOT_FOUND_NODE);
	if (stat.version != new_columns_version)
	{
		LOG_WARNING(log, zookeeper_path + "/columns changed before this ALTER finished; "
			"overlapping ALTER-s are fine but use caution with nontransitive changes");
		return;
	}

	Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
	for (const String & replica : replicas)
	{
		LOG_DEBUG(log, "Waiting for " << replica << " to apply changes");

		while (!shutdown_called)
		{
			String replica_columns_str;

			/// Реплику могли успеть удалить.
			if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/columns", replica_columns_str, &stat))
			{
				LOG_WARNING(log, replica << " was removed");
				break;
			}

			int replica_columns_version = stat.version;

			if (replica_columns_str == new_columns_str)
				break;

			if (!zookeeper->exists(zookeeper_path + "/columns", &stat))
				throw Exception(zookeeper_path + "/columns doesn't exist", ErrorCodes::NOT_FOUND_NODE);
			if (stat.version != new_columns_version)
			{
				LOG_WARNING(log, zookeeper_path + "/columns changed before ALTER finished; "
					"overlapping ALTER-s are fine but use caution with nontransitive changes");
				return;
			}

M
Merge  
Michael Kolupaev 已提交
1882
			if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/columns", &stat, alter_query_event))
M
Merge  
Michael Kolupaev 已提交
1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900
			{
				LOG_WARNING(log, replica << " was removed");
				break;
			}

			if (stat.version != replica_columns_version)
				continue;

			alter_query_event->wait();
		}

		if (shutdown_called)
			break;
	}

	LOG_DEBUG(log, "ALTER finished");
}

M
Merge  
Michael Kolupaev 已提交
1901 1902
void StorageReplicatedMergeTree::drop()
{
M
Merge  
Michael Kolupaev 已提交
1903 1904 1905
	if (!zookeeper)
		throw Exception("Can't drop replicated table without ZooKeeper", ErrorCodes::NO_ZOOKEEPER);

M
Merge  
Michael Kolupaev 已提交
1906 1907
	shutdown();

M
Merge  
Michael Kolupaev 已提交
1908
	LOG_INFO(log, "Removing replica " << replica_path);
M
Merge  
Michael Kolupaev 已提交
1909
	replica_is_active_node = nullptr;
M
Merge  
Michael Kolupaev 已提交
1910 1911
	zookeeper->tryRemoveRecursive(replica_path);

M
Merge  
Michael Kolupaev 已提交
1912
	/// Проверяем, что zookeeper_path существует: его могла удалить другая реплика после выполнения предыдущей строки.
M
Merge  
Michael Kolupaev 已提交
1913 1914
	Strings replicas;
	if (zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) == ZOK && replicas.empty())
M
Merge  
Michael Kolupaev 已提交
1915 1916
	{
		LOG_INFO(log, "Removing table " << zookeeper_path << " (this might take several minutes)");
M
Merge  
Michael Kolupaev 已提交
1917
		zookeeper->tryRemoveRecursive(zookeeper_path);
M
Merge  
Michael Kolupaev 已提交
1918
	}
M
Merge  
Michael Kolupaev 已提交
1919 1920

	data.dropAllData();
M
Merge  
Michael Kolupaev 已提交
1921 1922
}

M
Merge  
Michael Kolupaev 已提交
1923 1924 1925
/** Serializes the entry in text form:
  *
  *   format version: 1
  *   source replica: <source_replica>
  *   get                          or    merge
  *   <new_part_name>                    <one line per part to merge>
  *                                      into
  *                                      <new_part_name>
  *
  * followed by a trailing line feed. For a type other than GET_PART and MERGE_PARTS
  *  only the two header lines and the trailing line feed are written.
  */
void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const
{
	writeString("format version: 1\n", out);

	writeString("source replica: ", out);
	writeString(source_replica, out);
	writeString("\n", out);

	if (type == GET_PART)
	{
		writeString("get\n", out);
		writeString(new_part_name, out);
	}
	else if (type == MERGE_PARTS)
	{
		writeString("merge\n", out);
		for (const String & source_part : parts_to_merge)
		{
			writeString(source_part, out);
			writeString("\n", out);
		}
		writeString("into\n", out);
		writeString(new_part_name, out);
	}

	writeString("\n", out);
}

/** Parses the entry from the text form produced by writeText():
  *  the line "format version: 1", the line "source replica: <name>", the type line ("get" or "merge"),
  *  then the part name for "get", or the names of the source parts, the line "into"
  *  and the name of the resulting part for "merge"; a line feed ends the entry.
  *
  * The previous contents of parts_to_merge are discarded, so the entry may be read into repeatedly.
  * Throws an exception if the text does not match the format or the type line is not recognized;
  *  in that case the entry may be left partially filled.
  */
void StorageReplicatedMergeTree::LogEntry::readText(ReadBuffer & in)
{
	String type_str;

	assertString("format version: 1\n", in);
	assertString("source replica: ", in);
	readString(source_replica, in);
	assertString("\n", in);
	readString(type_str, in);
	assertString("\n", in);

	/// Parts left over from a previous read must not be mixed into this entry.
	parts_to_merge.clear();

	if (type_str == "get")
	{
		type = GET_PART;
		readString(new_part_name, in);
	}
	else if (type_str == "merge")
	{
		type = MERGE_PARTS;

		/// The names of the source parts go one per line until the line "into".
		while (true)
		{
			String s;
			readString(s, in);
			assertString("\n", in);
			if (s == "into")
				break;
			parts_to_merge.push_back(s);
		}
		readString(new_part_name, in);
	}
	else
	{
		/// Without this, `type` would keep whatever value it had before the call.
		throw Exception("Unknown log entry type: " + type_str);
	}

	assertString("\n", in);
}

M
Merge  
Michael Kolupaev 已提交
1982
}