StorageReplicatedMergeTree.cpp 144.2 KB
Newer Older
1 2 3
#include <zkutil/Types.h>
#include <zkutil/KeeperException.h>

4
#include <DB/Core/FieldVisitors.h>
5

6
#include <DB/Storages/ColumnsDescription.h>
M
Merge  
Michael Kolupaev 已提交
7
#include <DB/Storages/StorageReplicatedMergeTree.h>
M
Merge  
Michael Kolupaev 已提交
8
#include <DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
9
#include <DB/Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
10
#include <DB/Storages/MergeTree/MergeList.h>
A
Merge  
Andrey Mironov 已提交
11
#include <DB/Storages/MergeTree/MergeTreeWhereOptimizer.h>
12
#include <DB/Storages/MergeTree/ReplicatedMergeTreeAddress.h>
A
Merge  
Alexey Milovidov 已提交
13
#include <DB/Storages/MergeTree/ReshardingWorker.h>
14

15 16
#include <DB/Databases/IDatabase.h>

M
Merge  
Michael Kolupaev 已提交
17
#include <DB/Parsers/formatAST.h>
18
#include <DB/Parsers/ASTInsertQuery.h>
19
#include <DB/Parsers/ASTSelectQuery.h>
A
Alexey Milovidov 已提交
20
#include <DB/Parsers/queryToString.h>
21

M
Merge  
Michael Kolupaev 已提交
22
#include <DB/IO/ReadBufferFromString.h>
23
#include <DB/IO/Operators.h>
24

M
Merge  
Michael Kolupaev 已提交
25
#include <DB/Interpreters/InterpreterAlterQuery.h>
26
#include <DB/Interpreters/PartLog.h>
27

28
#include <DB/DataStreams/AddingConstColumnBlockInputStream.h>
29 30 31
#include <DB/DataStreams/RemoteBlockInputStream.h>
#include <DB/DataStreams/NullBlockOutputStream.h>
#include <DB/DataStreams/copyData.h>
32

33
#include <DB/Common/Macros.h>
34
#include <DB/Common/VirtualColumnUtils.h>
A
Merge  
Alexey Milovidov 已提交
35
#include <DB/Common/formatReadable.h>
36
#include <DB/Common/setThreadName.h>
37
#include <DB/Common/escapeForFileName.h>
38
#include <DB/Common/StringUtils.h>
39

40
#include <Poco/DirectoryIterator.h>
M
Merge  
Michael Kolupaev 已提交
41

42
#include <DB/Common/ThreadPool.h>
A
Merge  
Alexey Milovidov 已提交
43 44

#include <ext/range.hpp>
A
Alexey Milovidov 已提交
45 46
#include <ext/scope_guard.hpp>

A
Merge  
Alexey Milovidov 已提交
47 48 49 50 51 52
#include <cfenv>
#include <ctime>
#include <thread>
#include <future>


53 54 55 56 57 58 59 60
namespace ProfileEvents
{
	extern const Event ReplicatedPartMerges;
	extern const Event ReplicatedPartFailedFetches;
	extern const Event ReplicatedPartFetchesOfMerged;
	extern const Event ObsoleteReplicatedParts;
	extern const Event ReplicatedPartFetches;
}
61

M
Merge  
Michael Kolupaev 已提交
62 63 64
namespace DB
{

65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
namespace ErrorCodes
{
	extern const int NO_ZOOKEEPER;
	extern const int INCORRECT_DATA;
	extern const int INCOMPATIBLE_COLUMNS;
	extern const int REPLICA_IS_ALREADY_EXIST;
	extern const int NO_SUCH_REPLICA;
	extern const int NO_REPLICA_HAS_PART;
	extern const int LOGICAL_ERROR;
	extern const int TOO_MANY_UNEXPECTED_DATA_PARTS;
	extern const int ABORTED;
	extern const int REPLICA_IS_NOT_IN_QUORUM;
	extern const int TABLE_IS_READ_ONLY;
	extern const int NOT_FOUND_NODE;
	extern const int NO_ACTIVE_REPLICAS;
	extern const int LEADERSHIP_CHANGED;
	extern const int TABLE_IS_READ_ONLY;
	extern const int TABLE_WAS_NOT_DROPPED;
	extern const int PARTITION_ALREADY_EXISTS;
	extern const int TOO_MUCH_RETRIES_TO_FETCH_PARTS;
	extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
	extern const int PARTITION_DOESNT_EXIST;
A
Merge  
Alexey Milovidov 已提交
87 88 89 90 91 92
	extern const int INCONSISTENT_TABLE_ACCROSS_SHARDS;
	extern const int INSUFFICIENT_SPACE_FOR_RESHARDING;
	extern const int RESHARDING_NO_WORKER;
	extern const int INVALID_PARTITIONS_INTERVAL;
	extern const int RESHARDING_INVALID_PARAMETERS;
	extern const int INVALID_SHARD_WEIGHT;
A
Merge  
Alexey Arno 已提交
93
	extern const int DUPLICATE_SHARD_PATHS;
94
	extern const int RESHARDING_COORDINATOR_DELETED;
A
Merge  
Alexey Arno 已提交
95 96 97 98
	extern const int RESHARDING_NO_SUCH_COORDINATOR;
	extern const int RESHARDING_NO_COORDINATOR_MEMBERSHIP;
	extern const int RESHARDING_ALREADY_SUBSCRIBED;
	extern const int RESHARDING_INVALID_QUERY;
99
	extern const int RWLOCK_NO_SUCH_LOCK;
A
Merge  
Alexey Arno 已提交
100
	extern const int NO_SUCH_BARRIER;
101 102
	extern const int CHECKSUM_DOESNT_MATCH;
	extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
103
	extern const int UNFINISHED;
104
	extern const int METADATA_MISMATCH;
105
	extern const int RESHARDING_NULLABLE_SHARDING_KEY;
106 107
}

M
Merge  
Michael Kolupaev 已提交
108

A
Merge  
Alexey Milovidov 已提交
109 110
static const auto QUEUE_UPDATE_ERROR_SLEEP_MS 	= 1 * 1000;
static const auto MERGE_SELECTING_SLEEP_MS		= 5 * 1000;
M
Merge  
Michael Kolupaev 已提交
111

112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
/** Добавляемым блокам данных присваиваются некоторые номера - целые числа.
  * Для добавляемых обычным способом (INSERT) блоков, номера выделяются по возрастанию.
  * Слияния делаются для диапазонов номеров блоков на числовой прямой:
  *  если в слиянии участвуют номера блоков x, z, и есть блок с номером y, что x < y < z, то блок с номером y тоже участвует в слиянии.
  * Это требуется для сохранения свойств некоторых операций, которые могут производиться при слиянии - например, в CollapsingMergeTree.
  * В частности, это позволяет во время слияния знать, что в одном куске все данные были добавлены раньше, чем все данные в другом куске.
  *
  * Изредка возникает необходимость добавить в таблицу какой-то заведомо старый кусок данных,
  *  чтобы он воспринимался как старый в логике работы CollapsingMergeTree.
  * Такой кусок данных можно добавить с помощью специального запроса ATTACH.
  * И в этом случае, мы должны выделить этому куску номера меньшие, чем номера всех остальных кусков.
  * В связи с этим, номера обычных кусков, добавляемых INSERT-ом, начинаются не с нуля, а с большего числа,
  *  а меньшие номера считаются "зарезервированными".
  *
  * Почему это число равно 200?
  * Дело в том, что раньше не поддерживались отрицательные номера блоков.
  * А также, слияние сделано так, что при увеличении количества кусков, вставка новых кусков специально замедляется,
  *  пока слияния не успеют уменьшить число кусков; и это было рассчитано примерно для 200 кусков.
  * А значит, что при вставке в таблицу всех кусков из другой таблицы, 200 номеров наверняка достаточно.
  * В свою очередь, это число выбрано почти наугад.
  */
133
extern const Int64 RESERVED_BLOCK_NUMBERS = 200;
M
Merge  
Michael Kolupaev 已提交
134

M
Merge  
Michael Kolupaev 已提交
135

136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
/** Для каждого куска есть сразу три места, где он должен быть:
  * 1. В оперативке (RAM), MergeTreeData::data_parts, all_data_parts.
  * 2. В файловой системе (FS), директория с данными таблицы.
  * 3. В ZooKeeper (ZK).
  *
  * При добавлении куска, его надо добавить сразу в эти три места.
  * Это делается так:
  * - [FS] сначала записываем кусок во временную директорию на файловой системе;
  * - [FS] переименовываем временный кусок в результирующий на файловой системе;
  * - [RAM] сразу же после этого добавляем его в data_parts, и удаляем из data_parts покрываемые им куски;
  * - [RAM] также устанавливаем объект Transaction, который в случае исключения (в следующем пункте),
  *   откатит изменения в data_parts (из предыдущего пункта) назад;
  * - [ZK] затем отправляем транзакцию (multi) на добавление куска в ZooKeeper (и ещё некоторых действий);
  * - [FS, ZK] кстати, удаление покрываемых (старых) кусков из файловой системы, из ZooKeeper и из all_data_parts
  *   делается отложенно, через несколько минут.
  *
  * Здесь нет никакой атомарности.
  * Можно было бы добиться атомарности с помощью undo/redo логов и флага в DataPart, когда он полностью готов.
  * Но это было бы неудобно - пришлось бы писать undo/redo логи для каждого Part-а в ZK, а это увеличило бы и без того большое количество взаимодействий.
  *
  * Вместо этого, мы вынуждены работать в ситуации, когда в любой момент времени
  *  (из другого потока, или после рестарта сервера) может наблюдаться недоделанная до конца транзакция.
  *  (заметим - для этого кусок должен быть в RAM)
  * Из этих случаев наиболее частый - когда кусок уже есть в data_parts, но его ещё нет в ZooKeeper.
  * Этот случай надо отличить от случая, когда такая ситуация достигается вследствие какого-то повреждения состояния.
  *
  * Делаем это с помощью порога на время.
  * Если кусок достаточно молодой, то его отсутствие в ZooKeeper будем воспринимать оптимистично - как будто он просто не успел ещё туда добавиться
  *  - как будто транзакция ещё не выполнена, но скоро выполнится.
  * А если кусок старый, то его отсутствие в ZooKeeper будем воспринимать как недоделанную транзакцию, которую нужно откатить.
  *
  * PS. Возможно, было бы лучше добавить в DataPart флаг о том, что кусок вставлен в ZK.
  * Но здесь уже слишком легко запутаться с консистентностью этого флага.
  */
170
extern const int MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER = 5 * 60;
171 172


A
Merge  
Alexey Milovidov 已提交
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
void StorageReplicatedMergeTree::setZooKeeper(zkutil::ZooKeeperPtr zookeeper)
{
	std::lock_guard<std::mutex> lock(current_zookeeper_mutex);
	current_zookeeper = zookeeper;
}

zkutil::ZooKeeperPtr StorageReplicatedMergeTree::tryGetZooKeeper()
{
	std::lock_guard<std::mutex> lock(current_zookeeper_mutex);
	return current_zookeeper;
}

zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeper()
{
	auto res = tryGetZooKeeper();
	if (!res)
		throw Exception("Cannot get ZooKeeper", ErrorCodes::NO_ZOOKEEPER);
	return res;
}


M
Merge  
Michael Kolupaev 已提交
194 195 196
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
	const String & zookeeper_path_,
	const String & replica_name_,
M
Merge  
Michael Kolupaev 已提交
197
	bool attach,
M
Merge  
Michael Kolupaev 已提交
198 199
	const String & path_, const String & database_name_, const String & name_,
	NamesAndTypesListPtr columns_,
200
	const NamesAndTypesList & materialized_columns_,
201 202
	const NamesAndTypesList & alias_columns_,
	const ColumnDefaults & column_defaults_,
M
Merge  
Michael Kolupaev 已提交
203
	Context & context_,
M
Merge  
Michael Kolupaev 已提交
204 205 206 207
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_,
	const ASTPtr & sampling_expression_,
	size_t index_granularity_,
A
Alexey Milovidov 已提交
208
	const MergeTreeData::MergingParams & merging_params_,
209
	bool has_force_restore_data_flag,
M
Merge  
Michael Kolupaev 已提交
210
	const MergeTreeSettings & settings_)
211
	: IStorage{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
A
Merge  
Alexey Milovidov 已提交
212
	current_zookeeper(context.getZooKeeper()), database_name(database_name_),
M
Merge  
Michael Kolupaev 已提交
213 214 215
	table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'),
	zookeeper_path(context.getMacros().expand(zookeeper_path_)),
	replica_name(context.getMacros().expand(replica_name_)),
216 217
	data(database_name, table_name,
		full_path, columns_,
218 219
		materialized_columns_, alias_columns_, column_defaults_,
		context_, primary_expr_ast_, date_column_name_,
A
Merge  
Alexey Milovidov 已提交
220
		sampling_expression_, index_granularity_, merging_params_,
221
		settings_, database_name_ + "." + table_name, true, attach,
222
		[this] (const std::string & name) { enqueuePartForCheck(name); }),
223
	reader(data), writer(data, context), merger(data, context.getBackgroundPool()), fetcher(data), sharded_partition_uploader_client(*this),
224 225
	shutdown_event(false), part_check_thread(*this),
	log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))
M
Merge  
Michael Kolupaev 已提交
226
{
227 228
	if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
		zookeeper_path.resize(zookeeper_path.size() - 1);
M
Merge  
Michael Kolupaev 已提交
229 230 231 232
	replica_path = zookeeper_path + "/replicas/" + replica_name;

	bool skip_sanity_checks = false;

233
	try
M
Merge  
Michael Kolupaev 已提交
234
	{
A
Merge  
Alexey Milovidov 已提交
235
		if (current_zookeeper && current_zookeeper->exists(replica_path + "/flags/force_restore_data"))
236 237
		{
			skip_sanity_checks = true;
A
Merge  
Alexey Milovidov 已提交
238
			current_zookeeper->remove(replica_path + "/flags/force_restore_data");
M
Merge  
Michael Kolupaev 已提交
239

240 241 242
			LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag "
				<< replica_path << "/flags/force_restore_data).");
		}
243 244 245 246 247 248
		else if (has_force_restore_data_flag)
		{
			skip_sanity_checks = true;

			LOG_WARNING(log, "Skipping the limits on severity of changes to data parts and columns (flag force_restore_data).");
		}
249 250 251 252 253 254 255
	}
	catch (const zkutil::KeeperException & e)
	{
		/// Не удалось соединиться с ZK (об этом стало известно при попытке выполнить первую операцию).
		if (e.code == ZCONNECTIONLOSS)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
A
Merge  
Alexey Milovidov 已提交
256
			current_zookeeper = nullptr;
257 258 259
		}
		else
			throw;
M
Merge  
Michael Kolupaev 已提交
260 261 262 263
	}

	data.loadDataParts(skip_sanity_checks);

A
Merge  
Alexey Milovidov 已提交
264
	if (!current_zookeeper)
M
Merge  
Michael Kolupaev 已提交
265
	{
M
Merge  
Michael Kolupaev 已提交
266 267 268
		if (!attach)
			throw Exception("Can't create replicated table without ZooKeeper", ErrorCodes::NO_ZOOKEEPER);

269
		/// Не активируем реплику. Она будет в режиме readonly.
A
Merge  
Alexey Milovidov 已提交
270 271
		LOG_ERROR(log, "No ZooKeeper: table will be in readonly mode.");
		is_readonly = true;
M
Merge  
Michael Kolupaev 已提交
272 273 274
		return;
	}

M
Merge  
Michael Kolupaev 已提交
275 276
	if (!attach)
	{
A
Merge  
Alexey Milovidov 已提交
277 278 279
		if (!data.getDataParts().empty())
			throw Exception("Data directory for table already containing data parts - probably it was unclean DROP table or manual intervention. You must either clear directory by hand or use ATTACH TABLE instead of CREATE TABLE if you need to use that parts.", ErrorCodes::INCORRECT_DATA);

M
Merge  
Michael Kolupaev 已提交
280
		createTableIfNotExists();
M
Merge  
Michael Kolupaev 已提交
281

M
Merge  
Michael Kolupaev 已提交
282
		checkTableStructure(false, false);
M
Merge  
Michael Kolupaev 已提交
283
		createReplica();
M
Merge  
Michael Kolupaev 已提交
284 285 286
	}
	else
	{
M
Merge  
Michael Kolupaev 已提交
287
		checkTableStructure(skip_sanity_checks, true);
M
Merge  
Michael Kolupaev 已提交
288
		checkParts(skip_sanity_checks);
M
Merge  
Michael Kolupaev 已提交
289
	}
M
Merge  
Michael Kolupaev 已提交
290

291 292
	createNewZooKeeperNodes();

M
Merge  
Michael Kolupaev 已提交
293 294 295
	String unreplicated_path = full_path + "unreplicated/";
	if (Poco::File(unreplicated_path).exists())
	{
296 297 298
		unreplicated_data = std::make_unique<MergeTreeData>(
			database_name, table_name,
			unreplicated_path, columns_,
299
			materialized_columns_, alias_columns_, column_defaults_,
300
			context_, primary_expr_ast_,
A
Merge  
Alexey Milovidov 已提交
301
			date_column_name_, sampling_expression_, index_granularity_, merging_params_, settings_,
302
			database_name_ + "." + table_name + "[unreplicated]", /* require_part_metadata = */ false, /* attach = */ true);
303 304 305

		unreplicated_data->loadDataParts(skip_sanity_checks);

306 307 308 309 310 311 312
		if (unreplicated_data->getDataPartsVector().empty())
		{
			unreplicated_data.reset();
		}
		else
		{
			LOG_INFO(log, "Have unreplicated data");
313
			unreplicated_reader = std::make_unique<MergeTreeDataSelectExecutor>(*unreplicated_data);
P
proller 已提交
314
			unreplicated_merger = std::make_unique<MergeTreeDataMerger>(*unreplicated_data, context.getBackgroundPool());
315
		}
M
Merge  
Michael Kolupaev 已提交
316
	}
M
Merge  
Michael Kolupaev 已提交
317

A
Alexey Milovidov 已提交
318 319 320 321
	queue.initialize(
		zookeeper_path, replica_path,
		database_name + "." + table_name + " (ReplicatedMergeTreeQueue)",
		data.getDataParts(), current_zookeeper);
M
Merge  
Michael Kolupaev 已提交
322

323 324
	queue.pullLogsToQueue(current_zookeeper, nullptr);

325
	/// В этом потоке реплика будет активирована.
326
	restarting_thread = std::make_unique<ReplicatedMergeTreeRestartingThread>(*this);
M
Merge  
Michael Kolupaev 已提交
327 328
}

329

330 331 332 333 334 335 336 337 338 339 340
void StorageReplicatedMergeTree::createNewZooKeeperNodes()
{
	auto zookeeper = getZooKeeper();

	/// Работа с кворумом.
	zookeeper->createIfNotExists(zookeeper_path + "/quorum", "");
	zookeeper->createIfNotExists(zookeeper_path + "/quorum/last_part", "");
	zookeeper->createIfNotExists(zookeeper_path + "/quorum/failed_parts", "");

	/// Отслеживание отставания реплик.
	zookeeper->createIfNotExists(replica_path + "/min_unprocessed_insert_time", "");
341
	zookeeper->createIfNotExists(replica_path + "/max_processed_insert_time", "");
342 343 344
}


M
Merge  
Michael Kolupaev 已提交
345 346 347
StoragePtr StorageReplicatedMergeTree::create(
	const String & zookeeper_path_,
	const String & replica_name_,
M
Merge  
Michael Kolupaev 已提交
348
	bool attach,
M
Merge  
Michael Kolupaev 已提交
349 350
	const String & path_, const String & database_name_, const String & name_,
	NamesAndTypesListPtr columns_,
351
	const NamesAndTypesList & materialized_columns_,
352 353
	const NamesAndTypesList & alias_columns_,
	const ColumnDefaults & column_defaults_,
M
Merge  
Michael Kolupaev 已提交
354
	Context & context_,
M
Merge  
Michael Kolupaev 已提交
355 356 357 358
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_,
	const ASTPtr & sampling_expression_,
	size_t index_granularity_,
A
Alexey Milovidov 已提交
359
	const MergeTreeData::MergingParams & merging_params_,
360
	bool has_force_restore_data_flag_,
M
Merge  
Michael Kolupaev 已提交
361 362
	const MergeTreeSettings & settings_)
{
363
	auto res = make_shared(
364 365
		zookeeper_path_, replica_name_, attach,
		path_, database_name_, name_,
366
		columns_, materialized_columns_, alias_columns_, column_defaults_,
367
		context_, primary_expr_ast_, date_column_name_,
A
Merge  
Alexey Milovidov 已提交
368
		sampling_expression_, index_granularity_,
369 370
		merging_params_, has_force_restore_data_flag_, settings_);
	StoragePtr res_ptr = res;
371

A
Merge  
Alexey Milovidov 已提交
372 373
	auto get_endpoint_holder = [&res](InterserverIOEndpointPtr endpoint)
	{
374
		return std::make_shared<InterserverIOEndpointHolder>(
375 376 377
			endpoint->getId(res->replica_path),
			endpoint,
			res->context.getInterserverIOHandler());
A
Merge  
Alexey Milovidov 已提交
378 379
	};

380
	if (res->tryGetZooKeeper())
M
Merge  
Michael Kolupaev 已提交
381
	{
A
Merge  
Alexey Milovidov 已提交
382
		{
383
			InterserverIOEndpointPtr endpoint = std::make_shared<DataPartsExchange::Service>(res->data, res_ptr);
A
Merge  
Alexey Milovidov 已提交
384 385 386 387 388 389
			res->endpoint_holder = get_endpoint_holder(endpoint);
		}

		/// Сервисы для перешардирования.

		{
390
			InterserverIOEndpointPtr endpoint = std::make_shared<RemoteDiskSpaceMonitor::Service>(res->context);
A
Merge  
Alexey Milovidov 已提交
391 392 393 394
			res->disk_space_monitor_endpoint_holder = get_endpoint_holder(endpoint);
		}

		{
395
			InterserverIOEndpointPtr endpoint = std::make_shared<ShardedPartitionUploader::Service>(res_ptr);
A
Merge  
Alexey Arno 已提交
396
			res->sharded_partition_uploader_endpoint_holder = get_endpoint_holder(endpoint);
A
Merge  
Alexey Milovidov 已提交
397 398 399
		}

		{
400
			InterserverIOEndpointPtr endpoint = std::make_shared<RemoteQueryExecutor::Service>(res->context);
A
Merge  
Alexey Milovidov 已提交
401 402
			res->remote_query_executor_endpoint_holder = get_endpoint_holder(endpoint);
		}
A
Merge  
Alexey Arno 已提交
403 404

		{
405
			InterserverIOEndpointPtr endpoint = std::make_shared<RemotePartChecker::Service>(res_ptr);
A
Merge  
Alexey Arno 已提交
406 407
			res->remote_part_checker_endpoint_holder = get_endpoint_holder(endpoint);
		}
M
Merge  
Michael Kolupaev 已提交
408
	}
409

410
	return res;
M
Merge  
Michael Kolupaev 已提交
411 412
}

A
Merge  
Alexey Milovidov 已提交
413

M
Merge  
Michael Kolupaev 已提交
414 415 416 417 418 419 420 421
static String formattedAST(const ASTPtr & ast)
{
	if (!ast)
		return "";
	std::stringstream ss;
	formatAST(*ast, ss, 0, false, true);
	return ss.str();
}
M
Merge  
Michael Kolupaev 已提交
422

A
Merge  
Alexey Milovidov 已提交
423

424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441
namespace
{
	/** Основные параметры движка таблицы для сохранения в ZooKeeper.
	  * Позволяет проверить, что они совпадают с локальными.
	  */
	struct TableMetadata
	{
		const MergeTreeData & data;

		TableMetadata(const MergeTreeData & data_)
			: data(data_) {}

		void write(WriteBuffer & out) const
		{
			out << "metadata format version: 1" << "\n"
				<< "date column: " << data.date_column_name << "\n"
				<< "sampling expression: " << formattedAST(data.sampling_expression) << "\n"
				<< "index granularity: " << data.index_granularity << "\n"
A
Merge  
Alexey Milovidov 已提交
442
				<< "mode: " << static_cast<int>(data.merging_params.mode) << "\n"
A
Alexey Milovidov 已提交
443
				<< "sign column: " << data.merging_params.sign_column << "\n"
444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492
				<< "primary key: " << formattedAST(data.primary_expr_ast) << "\n";
		}

		String toString() const
		{
			String res;
			WriteBufferFromString out(res);
			write(out);
			return res;
		}

		void check(ReadBuffer & in) const
		{
			/// TODO Можно сделать менее громоздко.

			in >> "metadata format version: 1";

			in >> "\ndate column: ";
			String read_date_column_name;
			in >> read_date_column_name;

			if (read_date_column_name != data.date_column_name)
				throw Exception("Existing table metadata in ZooKeeper differs in date index column."
					" Stored in ZooKeeper: " + read_date_column_name + ", local: " + data.date_column_name,
					ErrorCodes::METADATA_MISMATCH);

			in >> "\nsampling expression: ";
			String read_sample_expression;
			String local_sample_expression = formattedAST(data.sampling_expression);
			in >> read_sample_expression;

			if (read_sample_expression != local_sample_expression)
				throw Exception("Existing table metadata in ZooKeeper differs in sample expression."
					" Stored in ZooKeeper: " + read_sample_expression + ", local: " + local_sample_expression,
					ErrorCodes::METADATA_MISMATCH);

			in >> "\nindex granularity: ";
			size_t read_index_granularity = 0;
			in >> read_index_granularity;

			if (read_index_granularity != data.index_granularity)
				throw Exception("Existing table metadata in ZooKeeper differs in index granularity."
					" Stored in ZooKeeper: " + DB::toString(read_index_granularity) + ", local: " + DB::toString(data.index_granularity),
					ErrorCodes::METADATA_MISMATCH);

			in >> "\nmode: ";
			int read_mode = 0;
			in >> read_mode;

A
Merge  
Alexey Milovidov 已提交
493
			if (read_mode != static_cast<int>(data.merging_params.mode))
494
				throw Exception("Existing table metadata in ZooKeeper differs in mode of merge operation."
A
Merge  
Alexey Milovidov 已提交
495 496
					" Stored in ZooKeeper: " + DB::toString(read_mode) + ", local: "
					+ DB::toString(static_cast<int>(data.merging_params.mode)),
497 498 499 500 501 502
					ErrorCodes::METADATA_MISMATCH);

			in >> "\nsign column: ";
			String read_sign_column;
			in >> read_sign_column;

A
Alexey Milovidov 已提交
503
			if (read_sign_column != data.merging_params.sign_column)
504
				throw Exception("Existing table metadata in ZooKeeper differs in sign column."
A
Alexey Milovidov 已提交
505
					" Stored in ZooKeeper: " + read_sign_column + ", local: " + data.merging_params.sign_column,
506 507 508 509 510 511 512 513
					ErrorCodes::METADATA_MISMATCH);

			in >> "\nprimary key: ";
			String read_primary_key;
			String local_primary_key = formattedAST(data.primary_expr_ast);
			in >> read_primary_key;

			/// NOTE: Можно сделать менее строгую проверку совпадения выражений, чтобы таблицы не ломались от небольших изменений
514
			///	   в коде formatAST.
515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532
			if (read_primary_key != local_primary_key)
				throw Exception("Existing table metadata in ZooKeeper differs in primary key."
					" Stored in ZooKeeper: " + read_primary_key + ", local: " + local_primary_key,
					ErrorCodes::METADATA_MISMATCH);

			in >> "\n";
			assertEOF(in);
		}

		void check(const String & s) const
		{
			ReadBufferFromString in(s);
			check(in);
		}
	};
}


M
Merge  
Michael Kolupaev 已提交
533
void StorageReplicatedMergeTree::createTableIfNotExists()
M
Merge  
Michael Kolupaev 已提交
534
{
A
Merge  
Alexey Milovidov 已提交
535 536
	auto zookeeper = getZooKeeper();

M
Merge  
Michael Kolupaev 已提交
537 538
	if (zookeeper->exists(zookeeper_path))
		return;
M
Merge  
Michael Kolupaev 已提交
539

M
Merge  
Michael Kolupaev 已提交
540
	LOG_DEBUG(log, "Creating table " << zookeeper_path);
M
Merge  
Michael Kolupaev 已提交
541

M
Merge  
Michael Kolupaev 已提交
542 543
	zookeeper->createAncestors(zookeeper_path);

M
Merge  
Michael Kolupaev 已提交
544
	/// Запишем метаданные таблицы, чтобы реплики могли сверять с ними параметры таблицы.
545
	String metadata = TableMetadata(data).toString();
M
Merge  
Michael Kolupaev 已提交
546

A
Alexey Milovidov 已提交
547 548
	auto acl = zookeeper->getDefaultACL();

M
Merge  
Michael Kolupaev 已提交
549
	zkutil::Ops ops;
550
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(zookeeper_path, "",
551
		acl, zkutil::CreateMode::Persistent));
552
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(zookeeper_path + "/metadata", metadata,
553
		acl, zkutil::CreateMode::Persistent));
554
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(zookeeper_path + "/columns", ColumnsDescription<false>{
555 556 557
		data.getColumnsListNonMaterialized(), data.materialized_columns,
		data.alias_columns, data.column_defaults}.toString(),
		acl, zkutil::CreateMode::Persistent));
558
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(zookeeper_path + "/log", "",
559
		acl, zkutil::CreateMode::Persistent));
560
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(zookeeper_path + "/blocks", "",
561
		acl, zkutil::CreateMode::Persistent));
562
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(zookeeper_path + "/block_numbers", "",
563
		acl, zkutil::CreateMode::Persistent));
564
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(zookeeper_path + "/nonincrement_block_numbers", "",
565
		acl, zkutil::CreateMode::Persistent));
566
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(zookeeper_path + "/leader_election", "",
567
		acl, zkutil::CreateMode::Persistent));
568
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(zookeeper_path + "/temp", "",
569
		acl, zkutil::CreateMode::Persistent));
570
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(zookeeper_path + "/replicas", "",
571
		acl, zkutil::CreateMode::Persistent));
M
Merge  
Michael Kolupaev 已提交
572 573 574 575

	auto code = zookeeper->tryMulti(ops);
	if (code != ZOK && code != ZNODEEXISTS)
		throw zkutil::KeeperException(code);
M
Merge  
Michael Kolupaev 已提交
576
}
M
Merge  
Michael Kolupaev 已提交
577

A
Merge  
Alexey Milovidov 已提交
578

M
Merge  
Michael Kolupaev 已提交
579 580
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
	* Если нет - бросить исключение.
M
Merge  
Michael Kolupaev 已提交
581
	*/
M
Merge  
Michael Kolupaev 已提交
582
void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks, bool allow_alter)
M
Merge  
Michael Kolupaev 已提交
583
{
A
Merge  
Alexey Milovidov 已提交
584 585
	auto zookeeper = getZooKeeper();

M
Merge  
Michael Kolupaev 已提交
586
	String metadata_str = zookeeper->get(zookeeper_path + "/metadata");
587
	TableMetadata(data).check(metadata_str);
M
Merge  
Michael Kolupaev 已提交
588 589

	zkutil::Stat stat;
590
	auto columns_desc = ColumnsDescription<true>::parse(zookeeper->get(zookeeper_path + "/columns", &stat));
591 592 593 594 595

	auto & columns = columns_desc.columns;
	auto & materialized_columns = columns_desc.materialized;
	auto & alias_columns = columns_desc.alias;
	auto & column_defaults = columns_desc.defaults;
M
Merge  
Michael Kolupaev 已提交
596
	columns_version = stat.version;
597

598 599 600 601
	if (columns != data.getColumnsListNonMaterialized() ||
		materialized_columns != data.materialized_columns ||
		alias_columns != data.alias_columns ||
		column_defaults != data.column_defaults)
M
Merge  
Michael Kolupaev 已提交
602
	{
603 604 605 606
		if (allow_alter &&
			(skip_sanity_checks ||
			 data.getColumnsListNonMaterialized().sizeOfDifference(columns) +
			 data.materialized_columns.sizeOfDifference(materialized_columns) <= 2))
M
Merge  
Michael Kolupaev 已提交
607
		{
M
Merge  
Michael Kolupaev 已提交
608
			LOG_WARNING(log, "Table structure in ZooKeeper is a little different from local table structure. Assuming ALTER.");
609 610

			/// Без всяких блокировок, потому что таблица еще не создана.
611 612 613 614
			context.getDatabase(database_name)->alterTable(
				context, table_name,
				columns, materialized_columns, alias_columns, column_defaults, {});

615
			data.setColumnsList(columns);
616 617 618
			data.materialized_columns = std::move(materialized_columns);
			data.alias_columns = std::move(alias_columns);
			data.column_defaults = std::move(column_defaults);
M
Merge  
Michael Kolupaev 已提交
619
		}
M
Merge  
Michael Kolupaev 已提交
620
		else
M
Merge  
Michael Kolupaev 已提交
621
		{
622
			throw Exception("Table structure in ZooKeeper is too much different from local table structure.",
M
Merge  
Michael Kolupaev 已提交
623
							ErrorCodes::INCOMPATIBLE_COLUMNS);
M
Merge  
Michael Kolupaev 已提交
624
		}
M
Merge  
Michael Kolupaev 已提交
625 626
	}
}
M
Merge  
Michael Kolupaev 已提交
627

A
Merge  
Alexey Milovidov 已提交
628

629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647
/** При необходимости восстановить кусок, реплика сама добавляет в свою очередь запись на его получение.
  * Какое поставить время для этой записи в очереди? Время учитывается при расчёте отставания реплики.
  * Для этих целей имеет смысл использовать время создания недостающего куска
  *  (то есть, при расчёте отставания будет учитано, насколько старый кусок нам нужно восстановить).
  */
static time_t tryGetPartCreateTime(zkutil::ZooKeeperPtr & zookeeper, const String & replica_path, const String & part_name)
{
	time_t res = 0;

	/// Узнаем время создания part-а, если он ещё существует (не был, например, смерджен).
	zkutil::Stat stat;
	String unused;
	if (zookeeper->tryGet(replica_path + "/parts/" + part_name, unused, &stat))
		res = stat.ctime / 1000;

	return res;
}


M
Merge  
Michael Kolupaev 已提交
648 649
void StorageReplicatedMergeTree::createReplica()
{
A
Merge  
Alexey Milovidov 已提交
650 651
	auto zookeeper = getZooKeeper();

M
Merge  
Michael Kolupaev 已提交
652
	LOG_DEBUG(log, "Creating replica " << replica_path);
M
Merge  
Michael Kolupaev 已提交
653

M
Merge  
Michael Kolupaev 已提交
654
	/// Создадим пустую реплику. Ноду columns создадим в конце - будем использовать ее в качестве признака, что создание реплики завершено.
A
Alexey Milovidov 已提交
655
	auto acl = zookeeper->getDefaultACL();
M
Merge  
Michael Kolupaev 已提交
656
	zkutil::Ops ops;
657 658 659 660 661 662
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(replica_path, "", acl, zkutil::CreateMode::Persistent));
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(replica_path + "/host", "", acl, zkutil::CreateMode::Persistent));
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(replica_path + "/log_pointer", "", acl, zkutil::CreateMode::Persistent));
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(replica_path + "/queue", "", acl, zkutil::CreateMode::Persistent));
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(replica_path + "/parts", "", acl, zkutil::CreateMode::Persistent));
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(replica_path + "/flags", "", acl, zkutil::CreateMode::Persistent));
663 664 665 666 667 668 669 670

	try
	{
		zookeeper->multi(ops);
	}
	catch (const zkutil::KeeperException & e)
	{
		if (e.code == ZNODEEXISTS)
671
			throw Exception("Replica " + replica_path + " already exists.", ErrorCodes::REPLICA_IS_ALREADY_EXIST);
672 673 674

		throw;
	}
M
Merge  
Michael Kolupaev 已提交
675

M
Merge  
Michael Kolupaev 已提交
676 677 678 679 680
	/** Нужно изменить данные ноды /replicas на что угодно, чтобы поток, удаляющий старые записи в логе,
	  *  споткнулся об это изменение и не удалил записи, которые мы еще не прочитали.
	  */
	zookeeper->set(zookeeper_path + "/replicas", "last added replica: " + replica_name);

M
Merge  
Michael Kolupaev 已提交
681
	Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
M
Merge  
Michael Kolupaev 已提交
682

M
Merge  
Michael Kolupaev 已提交
683 684 685 686
	/** "Эталонная" реплика, у которой мы возьмем информацию о множестве кусков, очередь и указатель на лог.
	  * Возьмем случайную из реплик, созданных раньше этой.
	  */
	String source_replica;
M
Merge  
Michael Kolupaev 已提交
687

M
Merge  
Michael Kolupaev 已提交
688 689 690
	Stat stat;
	zookeeper->exists(replica_path, &stat);
	auto my_create_time = stat.czxid;
M
Merge  
Michael Kolupaev 已提交
691

M
Merge  
Michael Kolupaev 已提交
692 693
	std::random_shuffle(replicas.begin(), replicas.end());
	for (const String & replica : replicas)
M
Merge  
Michael Kolupaev 已提交
694
	{
M
Merge  
Michael Kolupaev 已提交
695 696 697 698 699 700 701 702
		if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica, &stat))
			throw Exception("Replica " + zookeeper_path + "/replicas/" + replica + " was removed from right under our feet.",
							ErrorCodes::NO_SUCH_REPLICA);
		if (stat.czxid < my_create_time)
		{
			source_replica = replica;
			break;
		}
M
Merge  
Michael Kolupaev 已提交
703 704
	}

M
Merge  
Michael Kolupaev 已提交
705
	if (source_replica.empty())
M
Merge  
Michael Kolupaev 已提交
706
	{
M
Merge  
Michael Kolupaev 已提交
707
		LOG_INFO(log, "This is the first replica");
M
Merge  
Michael Kolupaev 已提交
708
	}
M
Merge  
Michael Kolupaev 已提交
709
	else
M
Merge  
Michael Kolupaev 已提交
710
	{
M
Merge  
Michael Kolupaev 已提交
711
		LOG_INFO(log, "Will mimic " << source_replica);
M
Merge  
Michael Kolupaev 已提交
712

M
Merge  
Michael Kolupaev 已提交
713
		String source_path = zookeeper_path + "/replicas/" + source_replica;
M
Merge  
Michael Kolupaev 已提交
714

M
Merge  
Michael Kolupaev 已提交
715 716
		/** Если эталонная реплика еще не до конца создана, подождем.
		  * NOTE: Если при ее создании что-то пошло не так, можем провисеть тут вечно.
717 718 719
		  *	   Можно создавать на время создания эфемерную ноду, чтобы быть уверенным, что реплика создается, а не заброшена.
		  *	   То же можно делать и для таблицы. Можно автоматически удалять ноду реплики/таблицы,
		  *		если видно, что она создана не до конца, а создающий ее умер.
M
Merge  
Michael Kolupaev 已提交
720 721 722 723 724
		  */
		while (!zookeeper->exists(source_path + "/columns"))
		{
			LOG_INFO(log, "Waiting for replica " << source_path << " to be fully created");

725
			zkutil::EventPtr event = std::make_shared<Poco::Event>();
M
Merge  
Michael Kolupaev 已提交
726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753
			if (zookeeper->exists(source_path + "/columns", nullptr, event))
			{
				LOG_WARNING(log, "Oops, a watch has leaked");
				break;
			}

			event->wait();
		}

		/// Порядок следующих трех действий важен. Записи в логе могут продублироваться, но не могут потеряться.

		/// Скопируем у эталонной реплики ссылку на лог.
		zookeeper->set(replica_path + "/log_pointer", zookeeper->get(source_path + "/log_pointer"));

		/// Запомним очередь эталонной реплики.
		Strings source_queue_names = zookeeper->getChildren(source_path + "/queue");
		std::sort(source_queue_names.begin(), source_queue_names.end());
		Strings source_queue;
		for (const String & entry_name : source_queue_names)
		{
			String entry;
			if (!zookeeper->tryGet(source_path + "/queue/" + entry_name, entry))
				continue;
			source_queue.push_back(entry);
		}

		/// Добавим в очередь задания на получение всех активных кусков, которые есть у эталонной реплики.
		Strings parts = zookeeper->getChildren(source_path + "/parts");
754 755
		ActiveDataPartSet active_parts_set(parts);

M
Merge  
Michael Kolupaev 已提交
756 757 758 759 760 761 762
		Strings active_parts = active_parts_set.getParts();
		for (const String & name : active_parts)
		{
			LogEntry log_entry;
			log_entry.type = LogEntry::GET_PART;
			log_entry.source_replica = "";
			log_entry.new_part_name = name;
763
			log_entry.create_time = tryGetPartCreateTime(zookeeper, source_path, name);
764

M
Merge  
Michael Kolupaev 已提交
765 766 767 768 769 770 771 772 773
			zookeeper->create(replica_path + "/queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential);
		}
		LOG_DEBUG(log, "Queued " << active_parts.size() << " parts to be fetched");

		/// Добавим в очередь содержимое очереди эталонной реплики.
		for (const String & entry : source_queue)
		{
			zookeeper->create(replica_path + "/queue/queue-", entry, zkutil::CreateMode::PersistentSequential);
		}
774

775
		/// Далее оно будет загружено в переменную queue в методе queue.initialize.
776

M
Merge  
Michael Kolupaev 已提交
777
		LOG_DEBUG(log, "Copied " << source_queue.size() << " queue entries");
M
Merge  
Michael Kolupaev 已提交
778
	}
M
Merge  
Michael Kolupaev 已提交
779

780 781 782 783 784 785
	zookeeper->create(replica_path + "/columns", ColumnsDescription<false>{
			data.getColumnsListNonMaterialized(),
			data.materialized_columns,
			data.alias_columns,
			data.column_defaults
		}.toString(), zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
786
}
M
Merge  
Michael Kolupaev 已提交
787 788


M
Merge  
Michael Kolupaev 已提交
789
void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
M
Merge  
Michael Kolupaev 已提交
790
{
A
Merge  
Alexey Milovidov 已提交
791 792
	auto zookeeper = getZooKeeper();

M
Merge  
Michael Kolupaev 已提交
793
	Strings expected_parts_vec = zookeeper->getChildren(replica_path + "/parts");
M
Merge  
Michael Kolupaev 已提交
794 795

	/// Куски в ZK.
M
Merge  
Michael Kolupaev 已提交
796 797
	NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end());

M
Merge  
Michael Kolupaev 已提交
798
	MergeTreeData::DataParts parts = data.getAllDataParts();
M
Merge  
Michael Kolupaev 已提交
799

M
Merge  
Michael Kolupaev 已提交
800
	/// Локальные куски, которых нет в ZK.
M
Merge  
Michael Kolupaev 已提交
801
	MergeTreeData::DataParts unexpected_parts;
M
Merge  
Michael Kolupaev 已提交
802

M
Merge  
Michael Kolupaev 已提交
803 804 805 806 807
	for (const auto & part : parts)
	{
		if (expected_parts.count(part->name))
			expected_parts.erase(part->name);
		else
M
Merge  
Michael Kolupaev 已提交
808
			unexpected_parts.insert(part);
M
Merge  
Michael Kolupaev 已提交
809 810
	}

M
Merge  
Michael Kolupaev 已提交
811
	/// Какие локальные куски добавить в ZK.
M
Merge  
Michael Kolupaev 已提交
812
	MergeTreeData::DataPartsVector parts_to_add;
M
Merge  
Michael Kolupaev 已提交
813 814 815 816

	/// Какие куски нужно забрать с других реплик.
	Strings parts_to_fetch;

M
Merge  
Michael Kolupaev 已提交
817 818
	for (const String & missing_name : expected_parts)
	{
M
Merge  
Michael Kolupaev 已提交
819
		/// Если локально не хватает какого-то куска, но есть покрывающий его кусок, можно заменить в ZK недостающий покрывающим.
M
Merge  
Michael Kolupaev 已提交
820
		auto containing = data.getActiveContainingPart(missing_name);
M
Merge  
Michael Kolupaev 已提交
821 822 823 824 825 826 827 828 829 830
		if (containing)
		{
			LOG_ERROR(log, "Ignoring missing local part " << missing_name << " because part " << containing->name << " exists");
			if (unexpected_parts.count(containing))
			{
				parts_to_add.push_back(containing);
				unexpected_parts.erase(containing);
			}
		}
		else
M
Merge  
Michael Kolupaev 已提交
831
		{
M
Merge  
Michael Kolupaev 已提交
832
			LOG_ERROR(log, "Fetching missing part " << missing_name);
M
Merge  
Michael Kolupaev 已提交
833
			parts_to_fetch.push_back(missing_name);
M
Merge  
Michael Kolupaev 已提交
834 835
		}
	}
M
Merge  
Michael Kolupaev 已提交
836

M
Merge  
Michael Kolupaev 已提交
837 838 839
	for (const String & name : parts_to_fetch)
		expected_parts.erase(name);

840 841 842 843 844 845 846 847 848 849 850 851 852 853 854
	/** Для проверки адекватности, для кусков, которые есть в ФС, но нет в ZK, будем учитывать только не самые новые куски.
	  * Потому что неожиданные новые куски обычно возникают лишь оттого, что они не успели записаться в ZK при грубом перезапуске сервера.
	  * Также это возникает от дедуплицированных кусков, которые не успели удалиться.
	  */
	size_t unexpected_parts_nonnew = 0;
	for (const auto & part : unexpected_parts)
		if (part->level > 0 || part->right < RESERVED_BLOCK_NUMBERS)
			++unexpected_parts_nonnew;

	String sanity_report = "There are "
			+ toString(unexpected_parts.size()) + " unexpected parts ("
			+ toString(unexpected_parts_nonnew) + " of them is not just-written), "
			+ toString(parts_to_add.size()) + " unexpectedly merged parts, "
			+ toString(expected_parts.size()) + " missing obsolete parts, "
			+ toString(parts_to_fetch.size()) + " missing parts";
M
Merge  
Michael Kolupaev 已提交
855

856
	/** Можно автоматически синхронизировать данные,
857 858
	  *  если количество ошибок каждого из четырёх типов не больше соответствующих порогов,
	  *  или если отношение общего количества ошибок к общему количеству кусков (минимальному - в локальной файловой системе или в ZK)
859
	  *  не больше некоторого отношения (например 5%).
860 861 862 863
	  *
	  * Большое количество несовпадений в данных на файловой системе и ожидаемых данных
	  *  может свидетельствовать об ошибке конфигурации (сервер случайно подключили как реплику не от того шарда).
	  * В этом случае, защитный механизм не даёт стартовать серверу.
864 865 866
	  */

	size_t min_parts_local_or_expected = std::min(expected_parts_vec.size(), parts.size());
867
	size_t total_difference = parts_to_add.size() + unexpected_parts_nonnew + expected_parts.size() + parts_to_fetch.size();
868 869 870

	bool insane =
		(parts_to_add.size() > data.settings.replicated_max_unexpectedly_merged_parts
871
			|| unexpected_parts_nonnew > data.settings.replicated_max_unexpected_parts
872 873
			|| expected_parts.size() > data.settings.replicated_max_missing_obsolete_parts
			|| parts_to_fetch.size() > data.settings.replicated_max_missing_active_parts)
874
		&& (total_difference > min_parts_local_or_expected * data.settings.replicated_max_ratio_of_wrong_parts);
M
Merge  
Michael Kolupaev 已提交
875

876 877 878 879 880 881
	if (insane && !skip_sanity_checks)
		throw Exception("The local set of parts of table " + getTableName() + " doesn't look like the set of parts in ZooKeeper. "
			+ sanity_report, ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);

	if (total_difference > 0)
		LOG_WARNING(log, sanity_report);
M
Merge  
Michael Kolupaev 已提交
882

M
Merge  
Michael Kolupaev 已提交
883
	/// Добавим в ZK информацию о кусках, покрывающих недостающие куски.
884
	for (const MergeTreeData::DataPartPtr & part : parts_to_add)
M
Merge  
Michael Kolupaev 已提交
885
	{
M
Merge  
Michael Kolupaev 已提交
886
		LOG_ERROR(log, "Adding unexpected local part to ZooKeeper: " << part->name);
M
Merge  
Michael Kolupaev 已提交
887

M
Merge  
Michael Kolupaev 已提交
888
		zkutil::Ops ops;
M
Merge  
Michael Kolupaev 已提交
889
		checkPartAndAddToZooKeeper(part, ops);
M
Merge  
Michael Kolupaev 已提交
890
		zookeeper->multi(ops);
M
Merge  
Michael Kolupaev 已提交
891
	}
M
Merge  
Michael Kolupaev 已提交
892

M
Merge  
Michael Kolupaev 已提交
893 894 895
	/// Удалим из ZK информацию о кусках, покрытых только что добавленными.
	for (const String & name : expected_parts)
	{
M
Merge  
Michael Kolupaev 已提交
896 897 898
		LOG_ERROR(log, "Removing unexpectedly merged local part from ZooKeeper: " << name);

		zkutil::Ops ops;
899
		removePartFromZooKeeper(name, ops);
M
Merge  
Michael Kolupaev 已提交
900 901 902 903 904 905 906 907 908 909
		zookeeper->multi(ops);
	}

	/// Добавим в очередь задание забрать недостающие куски с других реплик и уберем из ZK информацию, что они у нас есть.
	for (const String & name : parts_to_fetch)
	{
		LOG_ERROR(log, "Removing missing part from ZooKeeper and queueing a fetch: " << name);

		LogEntry log_entry;
		log_entry.type = LogEntry::GET_PART;
M
Merge  
Michael Kolupaev 已提交
910
		log_entry.source_replica = "";
M
Merge  
Michael Kolupaev 已提交
911
		log_entry.new_part_name = name;
912
		log_entry.create_time = tryGetPartCreateTime(zookeeper, replica_path, name);
M
Merge  
Michael Kolupaev 已提交
913

914
		/// Полагаемся, что это происходит до загрузки очереди (queue.initialize).
M
Merge  
Michael Kolupaev 已提交
915
		zkutil::Ops ops;
916
		removePartFromZooKeeper(name, ops);
917
		ops.emplace_back(std::make_unique<zkutil::Op::Create>(
M
Merge  
Michael Kolupaev 已提交
918
			replica_path + "/queue/queue-", log_entry.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
M
Merge  
Michael Kolupaev 已提交
919
		zookeeper->multi(ops);
M
Merge  
Michael Kolupaev 已提交
920 921 922
	}

	/// Удалим лишние локальные куски.
923
	for (const MergeTreeData::DataPartPtr & part : unexpected_parts)
M
Merge  
Michael Kolupaev 已提交
924
	{
M
Merge  
Michael Kolupaev 已提交
925
		LOG_ERROR(log, "Renaming unexpected part " << part->name << " to ignored_" + part->name);
M
Merge  
Michael Kolupaev 已提交
926
		data.renameAndDetachPart(part, "ignored_", true);
M
Merge  
Michael Kolupaev 已提交
927 928
	}
}
M
Merge  
Michael Kolupaev 已提交
929

A
Merge  
Alexey Milovidov 已提交
930

931 932
void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(
	const MergeTreeData::DataPartPtr & part, zkutil::Ops & ops, String part_name)
M
Merge  
Michael Kolupaev 已提交
933
{
A
Merge  
Alexey Milovidov 已提交
934 935
	auto zookeeper = getZooKeeper();

M
Merge  
Michael Kolupaev 已提交
936 937 938
	if (part_name.empty())
		part_name = part->name;

939
	check(part->columns);
M
Merge  
Michael Kolupaev 已提交
940
	int expected_columns_version = columns_version;
941

M
Merge  
Michael Kolupaev 已提交
942 943 944 945 946
	Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
	std::random_shuffle(replicas.begin(), replicas.end());
	String expected_columns_str = part->columns.toString();

	for (const String & replica : replicas)
M
Merge  
Michael Kolupaev 已提交
947
	{
M
Merge  
Michael Kolupaev 已提交
948 949
		zkutil::Stat stat_before, stat_after;
		String columns_str;
M
Merge  
Michael Kolupaev 已提交
950
		if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name + "/columns", columns_str, &stat_before))
M
Merge  
Michael Kolupaev 已提交
951 952 953
			continue;
		if (columns_str != expected_columns_str)
		{
M
Merge  
Michael Kolupaev 已提交
954
			LOG_INFO(log, "Not checking checksums of part " << part_name << " with replica " << replica
M
Merge  
Michael Kolupaev 已提交
955 956 957
				<< " because columns are different");
			continue;
		}
M
Merge  
Michael Kolupaev 已提交
958
		String checksums_str;
M
Merge  
Michael Kolupaev 已提交
959 960
		/// Проверим, что версия ноды со столбцами не изменилась, пока мы читали checksums.
		/// Это гарантирует, что столбцы и чексуммы относятся к одним и тем же данным.
M
Merge  
Michael Kolupaev 已提交
961 962
		if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name + "/checksums", checksums_str) ||
			!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name + "/columns", &stat_after) ||
M
Merge  
Michael Kolupaev 已提交
963
			stat_before.version != stat_after.version)
M
Merge  
Michael Kolupaev 已提交
964
		{
M
Merge  
Michael Kolupaev 已提交
965
			LOG_INFO(log, "Not checking checksums of part " << part_name << " with replica " << replica
M
Merge  
Michael Kolupaev 已提交
966 967
				<< " because part changed while we were reading its checksums");
			continue;
M
Merge  
Michael Kolupaev 已提交
968
		}
M
Merge  
Michael Kolupaev 已提交
969 970 971

		auto checksums = MergeTreeData::DataPart::Checksums::parse(checksums_str);
		checksums.checkEqual(part->checksums, true);
M
Merge  
Michael Kolupaev 已提交
972 973
	}

M
Merge  
Michael Kolupaev 已提交
974
	if (zookeeper->exists(replica_path + "/parts/" + part_name))
M
Merge  
Michael Kolupaev 已提交
975
	{
M
Merge  
Michael Kolupaev 已提交
976
		LOG_ERROR(log, "checkPartAndAddToZooKeeper: node " << replica_path + "/parts/" + part_name << " already exists");
M
Merge  
Michael Kolupaev 已提交
977 978 979
		return;
	}

A
Alexey Milovidov 已提交
980 981
	auto acl = zookeeper->getDefaultACL();

982
	ops.emplace_back(std::make_unique<zkutil::Op::Check>(
983 984
		zookeeper_path + "/columns",
		expected_columns_version));
985
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(
M
Merge  
Michael Kolupaev 已提交
986
		replica_path + "/parts/" + part_name,
M
Merge  
Michael Kolupaev 已提交
987
		"",
A
Alexey Milovidov 已提交
988
		acl,
M
Merge  
Michael Kolupaev 已提交
989
		zkutil::CreateMode::Persistent));
990
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(
M
Merge  
Michael Kolupaev 已提交
991
		replica_path + "/parts/" + part_name + "/columns",
M
Merge  
Michael Kolupaev 已提交
992
		part->columns.toString(),
A
Alexey Milovidov 已提交
993
		acl,
M
Merge  
Michael Kolupaev 已提交
994
		zkutil::CreateMode::Persistent));
995
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(
M
Merge  
Michael Kolupaev 已提交
996
		replica_path + "/parts/" + part_name + "/checksums",
M
Merge  
Michael Kolupaev 已提交
997
		part->checksums.toString(),
A
Alexey Milovidov 已提交
998
		acl,
M
Merge  
Michael Kolupaev 已提交
999 1000 1001
		zkutil::CreateMode::Persistent));
}

A
Merge  
Alexey Milovidov 已提交
1002

1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013
void StorageReplicatedMergeTree::addNewPartToZooKeeper(const MergeTreeData::DataPartPtr & part, zkutil::Ops & ops, String part_name)
{
	auto zookeeper = getZooKeeper();

	if (part_name.empty())
		part_name = part->name;

	check(part->columns);

	auto acl = zookeeper->getDefaultACL();

1014
	ops.emplace_back(std::make_unique<zkutil::Op::Check>(
1015 1016
		zookeeper_path + "/columns",
		columns_version));
1017
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(
1018 1019 1020 1021
		replica_path + "/parts/" + part_name,
		"",
		acl,
		zkutil::CreateMode::Persistent));
1022
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(
1023 1024 1025 1026
		replica_path + "/parts/" + part_name + "/columns",
		part->columns.toString(),
		acl,
		zkutil::CreateMode::Persistent));
1027
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(
1028 1029 1030 1031 1032 1033 1034
		replica_path + "/parts/" + part_name + "/checksums",
		part->checksums.toString(),
		acl,
		zkutil::CreateMode::Persistent));
}


M
Merge  
Michael Kolupaev 已提交
1035
void StorageReplicatedMergeTree::pullLogsToQueue(zkutil::EventPtr next_update_event)
M
Merge  
Michael Kolupaev 已提交
1036
{
A
Alexey Milovidov 已提交
1037
	if (queue.pullLogsToQueue(getZooKeeper(), next_update_event))
M
Merge  
Michael Kolupaev 已提交
1038
	{
A
Alexey Milovidov 已提交
1039 1040
		if (queue_task_handle)
			queue_task_handle->wake();
M
Merge  
Michael Kolupaev 已提交
1041
	}
M
Merge  
Michael Kolupaev 已提交
1042 1043
}

A
Merge  
Alexey Milovidov 已提交
1044

1045
bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
M
Merge  
Michael Kolupaev 已提交
1046
{
M
Merge  
Michael Kolupaev 已提交
1047
	if (entry.type == LogEntry::DROP_RANGE)
M
Merge  
Michael Kolupaev 已提交
1048 1049 1050 1051
	{
		executeDropRange(entry);
		return true;
	}
M
Merge  
Michael Kolupaev 已提交
1052

M
Merge  
Michael Kolupaev 已提交
1053
	if (entry.type == LogEntry::GET_PART ||
M
Merge  
Michael Kolupaev 已提交
1054 1055
		entry.type == LogEntry::MERGE_PARTS ||
		entry.type == LogEntry::ATTACH_PART)
M
Merge  
Michael Kolupaev 已提交
1056 1057
	{
		/// Если у нас уже есть этот кусок или покрывающий его кусок, ничего делать не нужно.
M
Merge  
Michael Kolupaev 已提交
1058
		MergeTreeData::DataPartPtr containing_part = data.getActiveContainingPart(entry.new_part_name);
M
Merge  
Michael Kolupaev 已提交
1059

1060
		/// Даже если кусок есть локально, его (в исключительных случаях) может не быть в zookeeper. Проверим, что он там есть.
1061
		if (containing_part && getZooKeeper()->exists(replica_path + "/parts/" + containing_part->name))
M
Merge  
Michael Kolupaev 已提交
1062
		{
M
Merge  
Michael Kolupaev 已提交
1063
			if (!(entry.type == LogEntry::GET_PART && entry.source_replica == replica_name))
1064
				LOG_DEBUG(log, "Skipping action for part " << entry.new_part_name << " - part already exists.");
M
Merge  
Michael Kolupaev 已提交
1065
			return true;
M
Merge  
Michael Kolupaev 已提交
1066
		}
M
Merge  
Michael Kolupaev 已提交
1067 1068
	}

M
Merge  
Michael Kolupaev 已提交
1069
	if (entry.type == LogEntry::GET_PART && entry.source_replica == replica_name)
M
Merge  
Michael Kolupaev 已提交
1070
		LOG_WARNING(log, "Part " << entry.new_part_name << " from own log doesn't exist.");
M
Merge  
Michael Kolupaev 已提交
1071

1072
	/// Возможно, этот кусок нам не нужен, так как при записи с кворумом, кворум пофейлился (см. ниже про /quorum/failed_parts).
1073
	if (entry.quorum && getZooKeeper()->exists(zookeeper_path + "/quorum/failed_parts/" + entry.new_part_name))
1074 1075 1076 1077 1078
	{
		LOG_DEBUG(log, "Skipping action for part " << entry.new_part_name << " because quorum for that part was failed.");
		return true;	/// NOTE Удаление из virtual_parts не делается, но оно нужно только для мерджей.
	}

M
Merge  
Michael Kolupaev 已提交
1079 1080
	bool do_fetch = false;

M
Merge  
Michael Kolupaev 已提交
1081 1082
	if (entry.type == LogEntry::GET_PART)
	{
M
Merge  
Michael Kolupaev 已提交
1083
		do_fetch = true;
M
Merge  
Michael Kolupaev 已提交
1084
	}
M
Merge  
Michael Kolupaev 已提交
1085 1086 1087 1088
	else if (entry.type == LogEntry::ATTACH_PART)
	{
		do_fetch = !executeAttachPart(entry);
	}
M
Merge  
Michael Kolupaev 已提交
1089 1090
	else if (entry.type == LogEntry::MERGE_PARTS)
	{
1091 1092 1093 1094 1095 1096 1097 1098
		std::stringstream log_message;
		log_message << "Executing log entry to merge parts ";
		for (auto i : ext::range(0, entry.parts_to_merge.size()))
			log_message << (i != 0 ? ", " : "") << entry.parts_to_merge[i];
		log_message << " to " << entry.new_part_name;

		LOG_TRACE(log, log_message.rdbuf());

M
Merge  
Michael Kolupaev 已提交
1099
		MergeTreeData::DataPartsVector parts;
1100
		bool have_all_parts = true;
M
Merge  
Michael Kolupaev 已提交
1101 1102
		for (const String & name : entry.parts_to_merge)
		{
M
Merge  
Michael Kolupaev 已提交
1103
			MergeTreeData::DataPartPtr part = data.getActiveContainingPart(name);
M
Merge  
Michael Kolupaev 已提交
1104 1105 1106 1107 1108 1109 1110
			if (!part)
			{
				have_all_parts = false;
				break;
			}
			if (part->name != name)
			{
M
Merge  
Michael Kolupaev 已提交
1111 1112
				LOG_WARNING(log, "Part " << name << " is covered by " << part->name
					<< " but should be merged into " << entry.new_part_name << ". This shouldn't happen often.");
M
Merge  
Michael Kolupaev 已提交
1113 1114 1115
				have_all_parts = false;
				break;
			}
M
Merge  
Michael Kolupaev 已提交
1116 1117
			parts.push_back(part);
		}
M
Merge  
Michael Kolupaev 已提交
1118

M
Merge  
Michael Kolupaev 已提交
1119
		if (!have_all_parts)
M
Merge  
Michael Kolupaev 已提交
1120
		{
M
Merge  
Michael Kolupaev 已提交
1121 1122 1123
			/// Если нет всех нужных кусков, попробуем взять у кого-нибудь уже помердженный кусок.
			do_fetch = true;
			LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead");
M
Merge  
Michael Kolupaev 已提交
1124
		}
1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145
		else if (entry.create_time + data.settings.prefer_fetch_merged_part_time_threshold <= time(nullptr))
		{
			/// If entry is old enough, and have enough size, and part are exists in any replica,
			///  then prefer fetching of merged part from replica.

			size_t sum_parts_size_in_bytes = 0;
			for (const auto & part : parts)
				sum_parts_size_in_bytes += part->size_in_bytes;

			if (sum_parts_size_in_bytes >= data.settings.prefer_fetch_merged_part_size_threshold)
			{
				String replica = findReplicaHavingPart(entry.new_part_name, true);	/// NOTE excessive ZK requests for same data later, may remove.
				if (!replica.empty())
				{
					do_fetch = true;
					LOG_DEBUG(log, "Preffering to fetch " << entry.new_part_name << " from replica");
				}
			}
		}

		if (!do_fetch)
M
Merge  
Michael Kolupaev 已提交
1146
		{
1147
			size_t estimated_space_for_merge = MergeTreeDataMerger::estimateDiskSpaceForMerge(parts);
1148 1149

			/// Может бросить исключение.
1150
			DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::reserve(full_path, estimated_space_for_merge);
A
Merge  
Alexey Milovidov 已提交
1151

M
Merge  
Michael Kolupaev 已提交
1152
			auto table_lock = lockStructure(false);
1153

1154
			MergeList::EntryPtr merge_entry = context.getMergeList().insert(database_name, table_name, entry.new_part_name, parts);
M
Merge  
Michael Kolupaev 已提交
1155
			MergeTreeData::Transaction transaction;
A
Merge  
Alexey Arno 已提交
1156
			size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io;
1157

1158
			auto part = merger.mergePartsToTemporaryPart(
1159
				parts, entry.new_part_name, *merge_entry, aio_threshold, entry.create_time, reserved_space.get());
M
Merge  
Michael Kolupaev 已提交
1160 1161 1162

			zkutil::Ops ops;

1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184
			try
			{
				/// Здесь проверяются чексуммы и заполняется ops. Реально кусок добавляется в ZK чуть ниже, при выполнении multi.
				checkPartAndAddToZooKeeper(part, ops, entry.new_part_name);
			}
			catch (const Exception & e)
			{
				if (e.code() == ErrorCodes::CHECKSUM_DOESNT_MATCH
					|| e.code() == ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART)
				{
					do_fetch = true;
					part->remove();

					LOG_ERROR(log, getCurrentExceptionMessage(false) << ". "
						"Data after merge is not byte-identical to data on another replicas. "
						"There could be several reasons: "
						"1. Using newer version of compression library after server update. "
						"2. Using another compression method. "
						"3. Non-deterministic compression algorithm (highly unlikely). "
						"4. Non-deterministic merge algorithm due to logical error in code. "
						"5. Data corruption in memory due to bug in code. "
						"6. Data corruption in memory due to hardware issue. "
1185
						"7. Manual modification of source data after server startup. "
1186 1187 1188
						"8. Manual modification of checksums stored in ZooKeeper. "
						"We will download merged part from replica to force byte-identical result.");
				}
1189 1190
				else
					throw;
1191
			}
M
Merge  
Michael Kolupaev 已提交
1192

1193 1194 1195
			if (!do_fetch)
			{
				merger.renameMergedTemporaryPart(parts, part, entry.new_part_name, &transaction);
1196
				getZooKeeper()->multi(ops);		/// After long merge, get fresh ZK handle, because previous session may be expired.
M
Merge  
Michael Kolupaev 已提交
1197

1198 1199
				/** Удаление старых кусков из ZK и с диска делается отложенно - см. ReplicatedMergeTreeCleanupThread, clearOldParts.
				  */
1200

1201 1202 1203 1204 1205
				/** При ZCONNECTIONLOSS или ZOPERATIONTIMEOUT можем зря откатить локальные изменения кусков.
				  * Это не проблема, потому что в таком случае слияние останется в очереди, и мы попробуем снова.
				  */
				transaction.commit();
				merge_selecting_event.set();
M
Merge  
Michael Kolupaev 已提交
1206

1207 1208
				ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
			}
M
Merge  
Michael Kolupaev 已提交
1209
		}
M
Merge  
Michael Kolupaev 已提交
1210 1211 1212 1213 1214
	}
	else
	{
		throw Exception("Unexpected log entry type: " + toString(static_cast<int>(entry.type)));
	}
M
Merge  
Michael Kolupaev 已提交
1215 1216 1217

	if (do_fetch)
	{
1218 1219
		String covering_part;
		String replica = findReplicaHavingCoveringPart(entry.new_part_name, true, covering_part);
M
Merge  
Michael Kolupaev 已提交
1220

1221
		if (replica.empty() && entry.type == LogEntry::ATTACH_PART)
M
Merge  
Michael Kolupaev 已提交
1222
		{
1223 1224 1225
			/** Если ATTACH - куска может не быть, потому что реплика, на которой кусок есть, ещё сама не успела его прицепить.
				* В таком случае, надо подождать этого.
				*/
A
Merge  
Alexey Milovidov 已提交
1226

1227 1228 1229 1230
			/// Кусок должен быть на реплике-инициаторе.
			if (entry.source_replica.empty() || entry.source_replica == replica_name)
				throw Exception("Logical error: no source replica specified for ATTACH_PART log entry;"
					" or trying to fetch part on source replica", ErrorCodes::LOGICAL_ERROR);
A
Merge  
Alexey Milovidov 已提交
1231

1232 1233
			throw Exception("No active replica has attached part " + entry.new_part_name + " or covering part yet", ErrorCodes::NO_REPLICA_HAS_PART);
		}
A
Merge  
Alexey Milovidov 已提交
1234

1235 1236
		try
		{
M
Merge  
Michael Kolupaev 已提交
1237 1238
			if (replica.empty())
			{
1239 1240 1241 1242 1243 1244 1245 1246 1247 1248
				/** Если кусок должен быть записан с кворумом, и кворум ещё недостигнут,
				  *  то (из-за того, что кусок невозможно прямо сейчас скачать),
				  *  кворумную запись следует считать безуспешной.
				  * TODO Сложный код, вынести отдельно.
				  */
				if (entry.quorum)
				{
					if (entry.type != LogEntry::GET_PART)
						throw Exception("Logical error: log entry with quorum but type is not GET_PART", ErrorCodes::LOGICAL_ERROR);

1249
					if (entry.block_id.empty())
1250
						throw Exception("Logical error: log entry with quorum has empty block_id", ErrorCodes::LOGICAL_ERROR);
1251

1252 1253 1254 1255 1256 1257 1258 1259
					LOG_DEBUG(log, "No active replica has part " << entry.new_part_name << " which needs to be written with quorum."
						" Will try to mark that quorum as failed.");

					/** Атомарно:
					  * - если реплики не стали активными;
					  * - если существует узел quorum с этим куском;
					  * - удалим узел quorum;
					  * - установим nonincrement_block_numbers, чтобы разрешить мерджи через номер потерянного куска;
1260 1261
					  * - добавим кусок в список quorum/failed_parts;
					  * - если кусок ещё не удалён из списка для дедупликации blocks/block_num, то удалим его;
1262
					  *
1263 1264 1265 1266 1267 1268 1269 1270
					  * Если что-то изменится, то ничего не сделаем - попадём сюда снова в следующий раз.
					  */

					/** Соберём версии узлов host у реплик.
					  * Когда реплика становится активной, она в той же транзакции (с созданием is_active), меняет значение host.
					  * Это позволит проследить, что реплики не стали активными.
					  */

1271 1272
					auto zookeeper = getZooKeeper();

1273 1274 1275 1276 1277 1278 1279 1280 1281
					Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");

					zkutil::Ops ops;

					for (size_t i = 0, size = replicas.size(); i < size; ++i)
					{
						Stat stat;
						String path = zookeeper_path + "/replicas/" + replicas[i] + "/host";
						zookeeper->get(path, &stat);
1282
						ops.emplace_back(std::make_unique<zkutil::Op::Check>(path, stat.version));
1283 1284 1285 1286 1287
					}

					/// Проверяем, что пока мы собирали версии, не ожила реплика с нужным куском.
					replica = findReplicaHavingPart(entry.new_part_name, true);

1288 1289
					/// Также за это время могла быть создана совсем новая реплика.
					/// Но если на старых не появится куска, то на новой его тоже не может быть.
1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300

					if (replica.empty())
					{
						Stat quorum_stat;
						String quorum_path = zookeeper_path + "/quorum/status";
						String quorum_str = zookeeper->get(quorum_path, &quorum_stat);
						ReplicatedMergeTreeQuorumEntry quorum_entry;
						quorum_entry.fromString(quorum_str);

						if (quorum_entry.part_name == entry.new_part_name)
						{
1301
							ops.emplace_back(std::make_unique<zkutil::Op::Remove>(quorum_path, quorum_stat.version));
1302 1303 1304 1305 1306 1307 1308 1309 1310

							const auto partition_str = entry.new_part_name.substr(0, 6);
							ActiveDataPartSet::Part part_info;
							ActiveDataPartSet::parsePartName(entry.new_part_name, part_info);

							if (part_info.left != part_info.right)
								throw Exception("Logical error: log entry with quorum for part covering more than one block number",
									ErrorCodes::LOGICAL_ERROR);

1311 1312
							zookeeper->createIfNotExists(zookeeper_path + "/nonincrement_block_numbers/" + partition_str, "");

A
Alexey Milovidov 已提交
1313 1314
							auto acl = zookeeper->getDefaultACL();

1315
							ops.emplace_back(std::make_unique<zkutil::Op::Create>(
1316 1317
								zookeeper_path + "/nonincrement_block_numbers/" + partition_str + "/block-" + padIndex(part_info.left),
								"",
A
Alexey Milovidov 已提交
1318
								acl,
1319 1320
								zkutil::CreateMode::Persistent));

1321
							ops.emplace_back(std::make_unique<zkutil::Op::Create>(
1322 1323
								zookeeper_path + "/quorum/failed_parts/" + entry.new_part_name,
								"",
A
Alexey Milovidov 已提交
1324
								acl,
1325 1326
								zkutil::CreateMode::Persistent));

1327 1328 1329
							/// Удаление из blocks.
							if (zookeeper->exists(zookeeper_path + "/blocks/" + entry.block_id))
							{
1330 1331 1332
								ops.emplace_back(std::make_unique<zkutil::Op::Remove>(zookeeper_path + "/blocks/" + entry.block_id + "/number", -1));
								ops.emplace_back(std::make_unique<zkutil::Op::Remove>(zookeeper_path + "/blocks/" + entry.block_id + "/checksum", -1));
								ops.emplace_back(std::make_unique<zkutil::Op::Remove>(zookeeper_path + "/blocks/" + entry.block_id, -1));
1333 1334
							}

1335 1336 1337 1338 1339 1340 1341 1342 1343 1344
							auto code = zookeeper->tryMulti(ops);

							if (code == ZOK)
							{
								LOG_DEBUG(log, "Marked quorum for part " << entry.new_part_name << " as failed.");
								return true;	/// NOTE Удаление из virtual_parts не делается, но оно нужно только для мерджей.
							}
							else if (code == ZBADVERSION || code == ZNONODE || code == ZNODEEXISTS)
							{
								LOG_DEBUG(log, "State was changed or isn't expected when trying to mark quorum for part "
1345
									<< entry.new_part_name << " as failed. Code: " << zerror(code));
1346 1347 1348 1349 1350 1351 1352 1353
							}
							else
								throw zkutil::KeeperException(code);
						}
						else
						{
							LOG_WARNING(log, "No active replica has part " << entry.new_part_name
								<< ", but that part needs quorum and /quorum/status contains entry about another part " << quorum_entry.part_name
1354 1355
								<< ". It means that part was successfully written to " << entry.quorum
								<< " replicas, but then all of them goes offline."
1356 1357 1358 1359 1360 1361 1362 1363
								<< " Or it is a bug.");
						}
					}
				}

				if (replica.empty())
				{
					ProfileEvents::increment(ProfileEvents::ReplicatedPartFailedFetches);
1364
					throw Exception("No active replica has part " + entry.new_part_name + " or covering part", ErrorCodes::NO_REPLICA_HAS_PART);
1365
				}
M
Merge  
Michael Kolupaev 已提交
1366
			}
A
Merge  
Alexey Milovidov 已提交
1367

A
Alexey Milovidov 已提交
1368 1369
			if (!fetchPart(covering_part, zookeeper_path + "/replicas/" + replica, false, entry.quorum))
				return false;
M
Merge  
Michael Kolupaev 已提交
1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381

			if (entry.type == LogEntry::MERGE_PARTS)
				ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged);
		}
		catch (...)
		{
			/** Если не получилось скачать кусок, нужный для какого-то мерджа, лучше не пытаться получить другие куски для этого мерджа,
			  * а попытаться сразу получить помердженный кусок. Чтобы так получилось, переместим действия для получения остальных кусков
			  * для этого мерджа в конец очереди.
			  */
			try
			{
A
Alexey Milovidov 已提交
1382
				auto parts_for_merge = queue.moveSiblingPartsForMergeToEndOfQueue(entry.new_part_name);
M
Merge  
Michael Kolupaev 已提交
1383

A
Alexey Milovidov 已提交
1384
				if (!parts_for_merge.empty() && replica.empty())
M
Merge  
Michael Kolupaev 已提交
1385
				{
A
Alexey Milovidov 已提交
1386 1387
					LOG_INFO(log, "No active replica has part " << entry.new_part_name << ". Will fetch merged part instead.");
					return false;
M
Merge  
Michael Kolupaev 已提交
1388
				}
M
Merge  
Michael Kolupaev 已提交
1389

1390 1391 1392
				/** Если ни у какой активной реплики нет куска, и в очереди нет слияний с его участием,
				  * проверим, есть ли у любой (активной или неактивной) реплики такой кусок или покрывающий его.
				  */
M
Merge  
Michael Kolupaev 已提交
1393 1394
				if (replica.empty())
					enqueuePartForCheck(entry.new_part_name);
M
Merge  
Michael Kolupaev 已提交
1395 1396 1397 1398 1399 1400 1401 1402 1403
			}
			catch (...)
			{
				tryLogCurrentException(__PRETTY_FUNCTION__);
			}

			throw;
		}
	}
M
Merge  
Michael Kolupaev 已提交
1404 1405

	return true;
M
Merge  
Michael Kolupaev 已提交
1406 1407
}

A
Merge  
Alexey Milovidov 已提交
1408

M
Merge  
Michael Kolupaev 已提交
1409
void StorageReplicatedMergeTree::executeDropRange(const StorageReplicatedMergeTree::LogEntry & entry)
M
Merge  
Michael Kolupaev 已提交
1410
{
M
Merge  
Michael Kolupaev 已提交
1411
	LOG_INFO(log, (entry.detach ? "Detaching" : "Removing") << " parts inside " << entry.new_part_name << ".");
M
Merge  
Michael Kolupaev 已提交
1412

1413
	queue.removeGetsAndMergesInRange(getZooKeeper(), entry.new_part_name);
M
Merge  
Michael Kolupaev 已提交
1414

M
Merge  
Michael Kolupaev 已提交
1415
	LOG_DEBUG(log, (entry.detach ? "Detaching" : "Removing") << " parts.");
M
Merge  
Michael Kolupaev 已提交
1416 1417 1418
	size_t removed_parts = 0;

	/// Удалим куски, содержащиеся в удаляемом диапазоне.
1419 1420 1421 1422 1423
	/// Важно, чтобы не осталось и старых кусков (оставшихся после мерджа), так как иначе,
	///  после добавления новой реплики, эта новая реплика их скачает, но не удалит.
	/// А также, если этого не делать, куски будут оживать после перезапуска сервера.
	/// Поэтому, используем getAllDataParts.
	auto parts = data.getAllDataParts();
M
Merge  
Michael Kolupaev 已提交
1424 1425 1426 1427
	for (const auto & part : parts)
	{
		if (!ActiveDataPartSet::contains(entry.new_part_name, part->name))
			continue;
1428

M
Merge  
Michael Kolupaev 已提交
1429 1430 1431
		LOG_DEBUG(log, "Removing part " << part->name);
		++removed_parts;

M
Merge  
Michael Kolupaev 已提交
1432 1433 1434 1435
		/// Если кусок удалять не нужно, надежнее переместить директорию до изменений в ZooKeeper.
		if (entry.detach)
			data.renameAndDetachPart(part);

M
Merge  
Michael Kolupaev 已提交
1436
		zkutil::Ops ops;
1437
		removePartFromZooKeeper(part->name, ops);
1438
		auto code = getZooKeeper()->tryMulti(ops);
1439

1440 1441
		/// Если кусок уже удалён (например, потому что он так и не был добавлен в ZK из-за сбоя,
		///  см. ReplicatedMergeTreeBlockOutputStream), то всё Ок.
1442 1443
		if (code != ZOK && code != ZNONODE)
			throw zkutil::KeeperException(code);
M
Merge  
Michael Kolupaev 已提交
1444

M
Merge  
Michael Kolupaev 已提交
1445
		/// Если кусок нужно удалить, надежнее удалить директорию после изменений в ZooKeeper.
M
Merge  
Michael Kolupaev 已提交
1446
		if (!entry.detach)
M
Merge  
Michael Kolupaev 已提交
1447
			data.replaceParts({part}, {}, true);
M
Merge  
Michael Kolupaev 已提交
1448 1449
	}

M
Merge  
Michael Kolupaev 已提交
1450
	LOG_INFO(log, (entry.detach ? "Detached " : "Removed ") << removed_parts << " parts inside " << entry.new_part_name << ".");
M
Merge  
Michael Kolupaev 已提交
1451 1452
}

A
Merge  
Alexey Milovidov 已提交
1453

M
Merge  
Michael Kolupaev 已提交
1454 1455 1456 1457 1458 1459
bool StorageReplicatedMergeTree::executeAttachPart(const StorageReplicatedMergeTree::LogEntry & entry)
{
	String source_path = (entry.attach_unreplicated ? "unreplicated/" : "detached/") + entry.source_part_name;

	LOG_INFO(log, "Attaching part " << entry.source_part_name << " from " << source_path << " as " << entry.new_part_name);

M
Merge  
Michael Kolupaev 已提交
1460
	if (!Poco::File(data.getFullPath() + source_path).exists())
M
Merge  
Michael Kolupaev 已提交
1461
	{
M
Merge  
Michael Kolupaev 已提交
1462
		LOG_INFO(log, "No part at " << source_path << ". Will fetch it instead");
M
Merge  
Michael Kolupaev 已提交
1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480
		return false;
	}

	LOG_DEBUG(log, "Checking data");
	MergeTreeData::MutableDataPartPtr part = data.loadPartAndFixMetadata(source_path);

	zkutil::Ops ops;
	checkPartAndAddToZooKeeper(part, ops, entry.new_part_name);

	if (entry.attach_unreplicated && unreplicated_data)
	{
		MergeTreeData::DataPartPtr unreplicated_part = unreplicated_data->getPartIfExists(entry.source_part_name);
		if (unreplicated_part)
			unreplicated_data->detachPartInPlace(unreplicated_part);
		else
			LOG_WARNING(log, "Unreplicated part " << entry.source_part_name << " is already detached");
	}

1481
	getZooKeeper()->multi(ops);
M
Merge  
Michael Kolupaev 已提交
1482 1483 1484 1485 1486 1487 1488 1489 1490

	/// NOTE: Не можем использовать renameTempPartAndAdd, потому что кусок не временный - если что-то пойдет не так, его не нужно удалять.
	part->renameTo(entry.new_part_name);
	part->name = entry.new_part_name;
	ActiveDataPartSet::parsePartName(part->name, *part);

	data.attachPart(part);

	LOG_INFO(log, "Finished attaching part " << entry.new_part_name);
M
Merge  
Michael Kolupaev 已提交
1491

M
Merge  
Michael Kolupaev 已提交
1492 1493 1494
	/// На месте удаленных кусков могут появиться новые, с другими данными.
	context.resetCaches();

M
Merge  
Michael Kolupaev 已提交
1495 1496 1497
	return true;
}

A
Merge  
Alexey Milovidov 已提交
1498

M
Merge  
Michael Kolupaev 已提交
1499 1500
void StorageReplicatedMergeTree::queueUpdatingThread()
{
1501 1502
	setThreadName("ReplMTQueueUpd");

M
Merge  
Michael Kolupaev 已提交
1503 1504
	while (!shutdown_called)
	{
M
Merge  
Michael Kolupaev 已提交
1505 1506
		try
		{
M
Merge  
Michael Kolupaev 已提交
1507
			pullLogsToQueue(queue_updating_event);
1508
			queue_updating_event->wait();
M
Merge  
Michael Kolupaev 已提交
1509
		}
1510
		catch (const zkutil::KeeperException & e)
M
Merge  
Michael Kolupaev 已提交
1511 1512
		{
			if (e.code == ZINVALIDSTATE)
1513
				restarting_thread->wakeup();
M
Merge  
Michael Kolupaev 已提交
1514 1515

			tryLogCurrentException(__PRETTY_FUNCTION__);
1516
			queue_updating_event->tryWait(QUEUE_UPDATE_ERROR_SLEEP_MS);
M
Merge  
Michael Kolupaev 已提交
1517
		}
M
Merge  
Michael Kolupaev 已提交
1518 1519 1520
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
1521
			queue_updating_event->tryWait(QUEUE_UPDATE_ERROR_SLEEP_MS);
M
Merge  
Michael Kolupaev 已提交
1522
		}
M
Merge  
Michael Kolupaev 已提交
1523
	}
M
Merge  
Michael Kolupaev 已提交
1524

1525
	LOG_DEBUG(log, "Queue updating thread finished");
M
Merge  
Michael Kolupaev 已提交
1526
}
M
Merge  
Michael Kolupaev 已提交
1527

A
Merge  
Alexey Milovidov 已提交
1528

1529
bool StorageReplicatedMergeTree::queueTask()
M
Merge  
Michael Kolupaev 已提交
1530
{
1531 1532
	/// Этот объект будет помечать элемент очереди как выполняющийся.
	ReplicatedMergeTreeQueue::SelectedEntry selected;
M
Merge  
Michael Kolupaev 已提交
1533

M
Merge  
Michael Kolupaev 已提交
1534 1535
	try
	{
1536
		selected = queue.selectEntryToProcess(merger, data);
M
Merge  
Michael Kolupaev 已提交
1537 1538 1539 1540 1541
	}
	catch (...)
	{
		tryLogCurrentException(__PRETTY_FUNCTION__);
	}
M
Merge  
Michael Kolupaev 已提交
1542

1543 1544
	LogEntryPtr & entry = selected.first;

M
Merge  
Michael Kolupaev 已提交
1545
	if (!entry)
M
Merge  
Michael Kolupaev 已提交
1546
		return false;
M
Merge  
Michael Kolupaev 已提交
1547

1548 1549
	time_t prev_attempt_time = entry->last_attempt_time;

1550
	bool res = queue.processEntry([this]{ return getZooKeeper(); }, entry, [&](LogEntryPtr & entry)
M
Merge  
Michael Kolupaev 已提交
1551
	{
1552
		try
M
Merge  
Michael Kolupaev 已提交
1553
		{
1554
			return executeLogEntry(*entry);
A
Alexey Milovidov 已提交
1555 1556 1557 1558
		}
		catch (const Exception & e)
		{
			if (e.code() == ErrorCodes::NO_REPLICA_HAS_PART)
1559
			{
A
Alexey Milovidov 已提交
1560 1561
				/// Если ни у кого нет нужного куска, наверно, просто не все реплики работают; не будем писать в лог с уровнем Error.
				LOG_INFO(log, e.displayText());
1562
			}
A
Alexey Milovidov 已提交
1563 1564 1565 1566 1567 1568 1569
			else if (e.code() == ErrorCodes::ABORTED)
			{
				/// Прерванный мердж или скачивание куска - не ошибка.
				LOG_INFO(log, e.message());
			}
			else
				tryLogCurrentException(__PRETTY_FUNCTION__);
1570

1571 1572 1573 1574
			/** Это исключение будет записано в элемент очереди, и его можно будет посмотреть с помощью таблицы system.replication_queue.
			  * Поток, выполняющий это действие, будет спать несколько секунд после исключения.
			  * См. функцию queue.processEntry.
			  */
1575
			throw;
1576 1577 1578
		}
		catch (...)
		{
M
Merge  
Michael Kolupaev 已提交
1579
			tryLogCurrentException(__PRETTY_FUNCTION__);
1580
			throw;
M
Merge  
Michael Kolupaev 已提交
1581
		}
A
Alexey Milovidov 已提交
1582
	});
M
Merge  
Michael Kolupaev 已提交
1583

1584 1585 1586
	/// Будем спать, если обработка прошла неуспешно и если мы недавно уже обрабатывали эту запись.
	bool need_sleep = !res && (entry->last_attempt_time - prev_attempt_time < 10);

M
Merge  
Michael Kolupaev 已提交
1587
	/// Если не было исключения, не нужно спать.
1588
	return !need_sleep;
M
Merge  
Michael Kolupaev 已提交
1589 1590
}

A
Merge  
Alexey Milovidov 已提交
1591

1592 1593 1594 1595
bool StorageReplicatedMergeTree::canMergeParts(
	const MergeTreeData::DataPartPtr & left,
	const MergeTreeData::DataPartPtr & right,
	MemoizedPartsThatCouldBeMerged * memo)
M
Merge  
Michael Kolupaev 已提交
1596
{
A
Merge  
Alexey Milovidov 已提交
1597 1598 1599 1600 1601 1602 1603
	/** Может много времени тратиться на определение, можно ли мерджить два рядом стоящих куска.
	  * Два рядом стоящих куска можно мерджить, если все номера блоков между их номерами не используются ("заброшены", abandoned).
	  * Это значит, что между этими кусками не может быть вставлен другой кусок.
	  *
	  * Но если номера соседних блоков отличаются достаточно сильно (обычно, если между ними много "заброшенных" блоков),
	  *  то делается слишком много чтений из ZooKeeper, чтобы узнать, можно ли их мерджить.
	  *
1604
	  * Воспользуемся утверждением, что если пару кусков было можно мерджить, и их мердж ещё не запланирован,
1605 1606
	  *  то и сейчас их можно мерджить, и будем запоминать это состояние (если задан параметр memo),
	  *  чтобы не делать много раз одинаковые запросы в ZooKeeper.
A
Merge  
Alexey Milovidov 已提交
1607 1608 1609 1610
	  *
	  * TODO Интересно, как это сочетается с DROP PARTITION и затем ATTACH PARTITION.
	  */

1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626
	/// Если какой-то из кусков уже собираются слить в больший, не соглашаемся его сливать.
	if (queue.partWillBeMergedOrMergesDisabled(left->name)
		|| (left.get() != right.get() && queue.partWillBeMergedOrMergesDisabled(right->name)))
		return false;

	auto key = std::make_pair(left->name, right->name);
	if (memo && memo->count(key))
		return true;

	String month_name = left->name.substr(0, 6);
	auto zookeeper = getZooKeeper();

	/// Нельзя сливать куски, среди которых находится кусок, для которого неудовлетворён кворум.
	/// Замечание: теоретически, это можно было бы разрешить. Но это сделает логику более сложной.
	String quorum_node_value;
	if (zookeeper->tryGet(zookeeper_path + "/quorum/status", quorum_node_value))
A
Merge  
Alexey Milovidov 已提交
1627
	{
1628 1629
		ReplicatedMergeTreeQuorumEntry quorum_entry;
		quorum_entry.fromString(quorum_node_value);
1630

1631 1632
		ActiveDataPartSet::Part part_info;
		ActiveDataPartSet::parsePartName(quorum_entry.part_name, part_info);
A
Merge  
Alexey Milovidov 已提交
1633

1634 1635
		if (part_info.left != part_info.right)
			throw Exception("Logical error: part written with quorum covers more than one block numbers", ErrorCodes::LOGICAL_ERROR);
1636

1637 1638 1639
		if (left->right <= part_info.left && right->left >= part_info.right)
			return false;
	}
1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654

	/// Won't merge last_part even if quorum is satisfied, because we gonna check if replica has this part
	/// on SELECT execution.
	String quorum_last_part;
	if (zookeeper->tryGet(zookeeper_path + "/quorum/last_part", quorum_last_part) && quorum_last_part.empty() == false)
	{
		ActiveDataPartSet::Part part_info;
		ActiveDataPartSet::parsePartName(quorum_last_part, part_info);

		if (part_info.left != part_info.right)
			throw Exception("Logical error: part written with quorum covers more than one block numbers", ErrorCodes::LOGICAL_ERROR);

		if (left->right <= part_info.left && right->left >= part_info.right)
			return false;
	}
1655

1656 1657 1658 1659 1660 1661 1662 1663 1664 1665
	/// Можно слить куски, если все номера между ними заброшены - не соответствуют никаким блокам.
	for (Int64 number = left->right + 1; number <= right->left - 1; ++number)
	{
		/** Для номеров до RESERVED_BLOCK_NUMBERS не используется AbandonableLock
			*  - такие номера не могут быть "заброшены" - то есть, не использованными для кусков.
			* Это номера кусков, которые были добавлены с помощью ALTER ... ATTACH.
			* Они должны идти без пропусков (для каждого номера должен быть кусок).
			* Проверяем, что для всех таких номеров есть куски,
			*  иначе, через "дыры" - отсутствующие куски, нельзя мерджить.
			*/
1666

1667 1668 1669 1670 1671 1672 1673
		if (number < RESERVED_BLOCK_NUMBERS)
		{
			if (!data.hasBlockNumberInMonth(number, left->month))
				return false;
		}
		else
		{
1674
			String path1 = zookeeper_path +	             "/block_numbers/" + month_name + "/block-" + padIndex(number);
1675
			String path2 = zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number);
1676

1677 1678
			if (AbandonableLockInZooKeeper::check(path1, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED &&
				AbandonableLockInZooKeeper::check(path2, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
1679 1680
				return false;
		}
1681
	}
1682

1683 1684
	if (memo)
		memo->insert(key);
A
Merge  
Alexey Milovidov 已提交
1685

1686 1687
	return true;
}
1688

1689

1690 1691 1692
void StorageReplicatedMergeTree::mergeSelectingThread()
{
	setThreadName("ReplMTMergeSel");
A
Alexey Milovidov 已提交
1693
	LOG_DEBUG(log, "Merge selecting thread started");
1694 1695 1696 1697 1698 1699 1700 1701 1702

	bool need_pull = true;

	MemoizedPartsThatCouldBeMerged memoized_parts_that_could_be_merged;

	auto can_merge = [&memoized_parts_that_could_be_merged, this]
		(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right) -> bool
	{
		return canMergeParts(left, right, &memoized_parts_that_could_be_merged);
A
Merge  
Alexey Milovidov 已提交
1703 1704
	};

M
Merge  
Michael Kolupaev 已提交
1705 1706
	while (!shutdown_called && is_leader_node)
	{
M
Michael Kolupaev 已提交
1707
		bool success = false;
M
Merge  
Michael Kolupaev 已提交
1708

M
Michael Kolupaev 已提交
1709
		try
M
Merge  
Michael Kolupaev 已提交
1710
		{
M
Merge  
Michael Kolupaev 已提交
1711 1712
			if (need_pull)
			{
A
Alexey Milovidov 已提交
1713 1714
				/// Нужно загрузить новые записи в очередь перед тем, как выбирать куски для слияния.
				///  (чтобы мы знали, какие куски уже собираются сливать).
M
Merge  
Michael Kolupaev 已提交
1715 1716 1717 1718
				pullLogsToQueue();
				need_pull = false;
			}

A
Alexey Milovidov 已提交
1719 1720
			std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);

1721 1722 1723
			/** If many merges is already queued, then will queue only small enough merges.
			  * Otherwise merge queue could be filled with only large merges,
			  *  and in the same time, many small parts could be created and won't be merged.
1724
			  */
1725
			size_t merges_queued = queue.countMerges();
1726

1727
			if (merges_queued >= data.settings.max_replicated_merges_in_queue)
M
Michael Kolupaev 已提交
1728
			{
1729 1730 1731 1732
				LOG_TRACE(log, "Number of queued merges (" << merges_queued
					<< ") is greater than max_replicated_merges_in_queue ("
					<< data.settings.max_replicated_merges_in_queue << "), so won't select new parts to merge.");
			}
A
Alexey Milovidov 已提交
1733 1734 1735 1736
			else
			{
				MergeTreeData::DataPartsVector parts;
				String merged_name;
M
Merge  
Michael Kolupaev 已提交
1737

1738
				if (merger.selectPartsToMerge(
1739 1740 1741
					parts, merged_name, false,
					merger.getMaxPartsSizeForMerge(data.settings.max_replicated_merges_in_queue, merges_queued),
					can_merge)
A
Alexey Milovidov 已提交
1742 1743 1744 1745 1746
					&& createLogEntryToMergeParts(parts, merged_name))
				{
					success = true;
					need_pull = true;
				}
M
Merge  
Michael Kolupaev 已提交
1747 1748 1749 1750 1751 1752 1753
			}
		}
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}

M
Merge  
Michael Kolupaev 已提交
1754
		if (shutdown_called || !is_leader_node)
M
Merge  
Michael Kolupaev 已提交
1755 1756
			break;

M
Merge  
Michael Kolupaev 已提交
1757
		if (!success)
M
Merge  
Michael Kolupaev 已提交
1758
			merge_selecting_event.tryWait(MERGE_SELECTING_SLEEP_MS);
M
Merge  
Michael Kolupaev 已提交
1759
	}
M
Merge  
Michael Kolupaev 已提交
1760

1761
	LOG_DEBUG(log, "Merge selecting thread finished");
M
Merge  
Michael Kolupaev 已提交
1762 1763
}

M
Merge  
Michael Kolupaev 已提交
1764

1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799
bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
	const MergeTreeData::DataPartsVector & parts, const String & merged_name, ReplicatedMergeTreeLogEntryData * out_log_entry)
{
	auto zookeeper = getZooKeeper();

	bool all_in_zk = true;
	for (const auto & part : parts)
	{
		/// Если о каком-то из кусков нет информации в ZK, не будем сливать.
		if (!zookeeper->exists(replica_path + "/parts/" + part->name))
		{
			all_in_zk = false;

			if (part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < time(0))
			{
				LOG_WARNING(log, "Part " << part->name << " (that was selected for merge)"
					<< " with age " << (time(0) - part->modification_time)
					<< " seconds exists locally but not in ZooKeeper."
					<< " Won't do merge with that part and will check it.");
				enqueuePartForCheck(part->name);
			}
		}
	}
	if (!all_in_zk)
		return false;

	LogEntry entry;
	entry.type = LogEntry::MERGE_PARTS;
	entry.source_replica = replica_name;
	entry.new_part_name = merged_name;
	entry.create_time = time(0);

	for (const auto & part : parts)
		entry.parts_to_merge.push_back(part->name);

A
Alexey Milovidov 已提交
1800 1801
	String path_created = zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
	entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
1802 1803 1804 1805 1806 1807 1808

	String month_name = parts[0]->name.substr(0, 6);
	for (size_t i = 0; i + 1 < parts.size(); ++i)
	{
		/// Уберем больше не нужные отметки о несуществующих блоках.
		for (Int64 number = std::max(RESERVED_BLOCK_NUMBERS, parts[i]->right + 1); number <= parts[i + 1]->left - 1; ++number)
		{
1809
			zookeeper->tryRemove(zookeeper_path +	           "/block_numbers/" + month_name + "/block-" + padIndex(number));
1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820
			zookeeper->tryRemove(zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number));
		}
	}

	if (out_log_entry)
		*out_log_entry = entry;

	return true;
}


1821 1822 1823 1824
void StorageReplicatedMergeTree::removePartFromZooKeeper(const String & part_name, zkutil::Ops & ops)
{
	String part_path = replica_path + "/parts/" + part_name;

1825 1826 1827
	ops.emplace_back(std::make_unique<zkutil::Op::Remove>(part_path + "/checksums", -1));
	ops.emplace_back(std::make_unique<zkutil::Op::Remove>(part_path + "/columns", -1));
	ops.emplace_back(std::make_unique<zkutil::Op::Remove>(part_path, -1));
1828 1829 1830
}


M
Merge  
Michael Kolupaev 已提交
1831 1832
void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_name)
{
A
Merge  
Alexey Milovidov 已提交
1833 1834
	auto zookeeper = getZooKeeper();

M
Merge  
Michael Kolupaev 已提交
1835 1836
	String part_path = replica_path + "/parts/" + part_name;

1837
	LogEntryPtr log_entry = std::make_shared<LogEntry>();
M
Merge  
Michael Kolupaev 已提交
1838
	log_entry->type = LogEntry::GET_PART;
1839
	log_entry->create_time = tryGetPartCreateTime(zookeeper, replica_path, part_name);
M
Merge  
Michael Kolupaev 已提交
1840 1841
	log_entry->source_replica = "";
	log_entry->new_part_name = part_name;
M
Merge  
Michael Kolupaev 已提交
1842 1843

	zkutil::Ops ops;
1844
	ops.emplace_back(std::make_unique<zkutil::Op::Create>(
M
Merge  
Michael Kolupaev 已提交
1845
		replica_path + "/queue/queue-", log_entry->toString(), zookeeper->getDefaultACL(),
M
Merge  
Michael Kolupaev 已提交
1846
		zkutil::CreateMode::PersistentSequential));
1847 1848 1849

	removePartFromZooKeeper(part_name, ops);

M
Merge  
Michael Kolupaev 已提交
1850 1851
	auto results = zookeeper->multi(ops);

1852
	String path_created = dynamic_cast<zkutil::Op::Create &>(*ops[0]).getPathCreated();
A
Alexey Milovidov 已提交
1853
	log_entry->znode_name = path_created.substr(path_created.find_last_of('/') + 1);
1854
	queue.insert(zookeeper, log_entry);
M
Merge  
Michael Kolupaev 已提交
1855 1856
}

A
Merge  
Alexey Milovidov 已提交
1857

M
Merge  
Michael Kolupaev 已提交
1858 1859
void StorageReplicatedMergeTree::becomeLeader()
{
1860 1861
	std::lock_guard<std::mutex> lock(leader_node_mutex);

1862 1863 1864
	if (shutdown_called)
		return;

M
Merge  
Michael Kolupaev 已提交
1865
	LOG_INFO(log, "Became leader");
M
Merge  
Michael Kolupaev 已提交
1866 1867 1868 1869
	is_leader_node = true;
	merge_selecting_thread = std::thread(&StorageReplicatedMergeTree::mergeSelectingThread, this);
}

A
Merge  
Alexey Milovidov 已提交
1870

M
Merge  
Michael Kolupaev 已提交
1871
String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name, bool active)
M
Merge  
Michael Kolupaev 已提交
1872
{
A
Merge  
Alexey Milovidov 已提交
1873
	auto zookeeper = getZooKeeper();
M
Merge  
Michael Kolupaev 已提交
1874
	Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
M
Merge  
Michael Kolupaev 已提交
1875

1876
	/// Select replicas in uniformly random order.
M
Merge  
Michael Kolupaev 已提交
1877 1878 1879 1880
	std::random_shuffle(replicas.begin(), replicas.end());

	for (const String & replica : replicas)
	{
A
Alexey Milovidov 已提交
1881 1882 1883 1884
		/// We don't interested in ourself.
		if (replica == replica_name)
			continue;

M
Merge  
Michael Kolupaev 已提交
1885 1886
		if (zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name) &&
			(!active || zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")))
M
Merge  
Michael Kolupaev 已提交
1887
			return replica;
1888

A
Alexey Milovidov 已提交
1889
		/// Obviously, replica could become inactive or even vanish after return from this method.
M
Merge  
Michael Kolupaev 已提交
1890 1891
	}

1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905
	return {};
}


String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(const String & part_name, bool active, String & out_covering_part_name)
{
	auto zookeeper = getZooKeeper();
	Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");

	/// Select replicas in uniformly random order.
	std::random_shuffle(replicas.begin(), replicas.end());

	for (const String & replica : replicas)
	{
A
Alexey Milovidov 已提交
1906 1907 1908
		if (replica == replica_name)
			continue;

1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928
		if (active && !zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
			continue;

		String largest_part_found;
		Strings parts = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts");
		for (const String & part_on_replica : parts)
		{
			if (part_on_replica == part_name || ActiveDataPartSet::contains(part_on_replica, part_name))
			{
				if (largest_part_found.empty()
					|| ActiveDataPartSet::contains(part_on_replica, largest_part_found))
				{
					largest_part_found = part_on_replica;
				}
			}
		}

		if (!largest_part_found.empty())
		{
			out_covering_part_name = largest_part_found;
A
Alexey Milovidov 已提交
1929
			return replica;
1930 1931 1932 1933
		}
	}

	return {};
M
Merge  
Michael Kolupaev 已提交
1934 1935
}

A
Merge  
Alexey Milovidov 已提交
1936

1937 1938
/** Если для куска отслеживается кворум, то обновить информацию о нём в ZK.
  */
1939
void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
1940
{
1941
	auto zookeeper = getZooKeeper();
1942

1943
	/// Информация, на какие реплики был добавлен кусок, если кворум ещё не достигнут.
1944
	const String quorum_status_path = zookeeper_path + "/quorum/status";
1945 1946 1947
	/// Имя предыдущего куска, для которого был достигнут кворум.
	const String quorum_last_part_path = zookeeper_path + "/quorum/last_part";

1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966
	String value;
	zkutil::Stat stat;

	/// Если узла нет, значит по всем кворумным INSERT-ам уже был достигнут кворум, и ничего делать не нужно.
	while (zookeeper->tryGet(quorum_status_path, value, &stat))
	{
		ReplicatedMergeTreeQuorumEntry quorum_entry;
		quorum_entry.fromString(value);

		if (quorum_entry.part_name != part_name)
		{
			/// Кворум уже был достигнут. Более того, уже начался другой INSERT с кворумом.
			break;
		}

		quorum_entry.replicas.insert(replica_name);

		if (quorum_entry.replicas.size() >= quorum_entry.required_number_of_replicas)
		{
1967 1968 1969
			/// Кворум достигнут. Удаляем узел, а также обновляем информацию о последнем куске, который был успешно записан с кворумом.

			zkutil::Ops ops;
1970 1971
			ops.emplace_back(std::make_unique<zkutil::Op::Remove>(quorum_status_path, stat.version));
			ops.emplace_back(std::make_unique<zkutil::Op::SetData>(quorum_last_part_path, part_name, -1));
1972
			auto code = zookeeper->tryMulti(ops);
1973

1974 1975 1976 1977 1978
			if (code == ZOK)
			{
				break;
			}
			else if (code == ZNONODE)
1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995
			{
				/// Кворум уже был достигнут.
				break;
			}
			else if (code == ZBADVERSION)
			{
				/// Узел успели обновить. Надо заново его прочитать и повторить все действия.
				continue;
			}
			else
				throw zkutil::KeeperException(code, quorum_status_path);
		}
		else
		{
			/// Обновляем узел, прописывая туда на одну реплику больше.
			auto code = zookeeper->trySet(quorum_status_path, quorum_entry.toString(), stat.version);

1996 1997 1998 1999 2000
			if (code == ZOK)
			{
				break;
			}
			else if (code == ZNONODE)
2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016
			{
				/// Кворум уже был достигнут.
				break;
			}
			else if (code == ZBADVERSION)
			{
				/// Узел успели обновить. Надо заново его прочитать и повторить все действия.
				continue;
			}
			else
				throw zkutil::KeeperException(code, quorum_status_path);
		}
	}
}


A
Alexey Milovidov 已提交
2017
bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum)
M
Merge  
Michael Kolupaev 已提交
2018
{
A
Alexey Milovidov 已提交
2019 2020 2021 2022 2023 2024 2025 2026 2027 2028
	{
		std::lock_guard<std::mutex> lock(currently_fetching_parts_mutex);
		if (!currently_fetching_parts.insert(part_name).second)
		{
			LOG_DEBUG(log, "Part " << part_name << " is already fetching right now");
			return false;
		}
	}

	SCOPE_EXIT
P
proller 已提交
2029
	({
A
Alexey Milovidov 已提交
2030 2031
		std::lock_guard<std::mutex> lock(currently_fetching_parts_mutex);
		currently_fetching_parts.erase(part_name);
P
proller 已提交
2032
	});
A
Alexey Milovidov 已提交
2033

2034
	LOG_DEBUG(log, "Fetching part " << part_name << " from " << replica_path);
M
Merge  
Michael Kolupaev 已提交
2035

2036 2037 2038
	TableStructureReadLockPtr table_lock;
	if (!to_detached)
		table_lock = lockStructure(true);
M
Merge  
Michael Kolupaev 已提交
2039

2040
	ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host"));
M
Merge  
Michael Kolupaev 已提交
2041

2042 2043 2044 2045
	Stopwatch stopwatch;
	PartLogElement elem;
	elem.event_time = time(0);

A
Alexey Milovidov 已提交
2046 2047
	MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(
		part_name, replica_path, address.host, address.replication_port, to_detached);
M
Merge  
Michael Kolupaev 已提交
2048

2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062
	PartLog * part_log = context.getPartLog();
	if (part_log)
	{
		elem.event_type = PartLogElement::DOWNLOAD_PART;
		elem.size_in_bytes = part->size_in_bytes;
		elem.duration_ms = stopwatch.elapsed() / 10000000;

		elem.database_name = part->storage.getDatabaseName();
		elem.table_name = part->storage.getTableName();
		elem.part_name = part->name;

		part_log->add(elem);
	}

2063 2064 2065
	if (!to_detached)
	{
		zkutil::Ops ops;
A
Alexey Milovidov 已提交
2066 2067 2068 2069 2070 2071

		/** NOTE
		  * Здесь возникает эксепшен, если произошёл ALTER с изменением типа столбца или удалением столбца,
		  *  а кусок на удалённом сервере ещё не модифицирован.
		  * Через некоторое время одна из следующих попыток сделать fetchPart будет успешной.
		  */
2072
		checkPartAndAddToZooKeeper(part, ops, part_name);
M
Merge  
Michael Kolupaev 已提交
2073

2074 2075
		MergeTreeData::Transaction transaction;
		auto removed_parts = data.renameTempPartAndReplace(part, nullptr, &transaction);
M
Merge  
Michael Kolupaev 已提交
2076

2077
		getZooKeeper()->multi(ops);
2078
		transaction.commit();
2079 2080

		/** Если для этого куска отслеживается кворум, то надо его обновить.
2081
		  * Если не успеем, в случае потери сессии, при перезапуске сервера - см. метод ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart.
2082
		  */
2083 2084
		if (quorum)
			updateQuorum(part_name);
2085

2086
		merge_selecting_event.set();
M
Merge  
Michael Kolupaev 已提交
2087

2088 2089 2090 2091 2092 2093 2094
		for (const auto & removed_part : removed_parts)
		{
			LOG_DEBUG(log, "Part " << removed_part->name << " is rendered obsolete by fetching part " << part_name);
			ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
		}
	}
	else
M
Michael Kolupaev 已提交
2095
	{
2096
		Poco::File(data.getFullPath() + "detached/tmp_" + part_name).renameTo(data.getFullPath() + "detached/" + part_name);
M
Michael Kolupaev 已提交
2097 2098
	}

M
Merge  
Michael Kolupaev 已提交
2099 2100
	ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);

2101
	LOG_DEBUG(log, "Fetched part " << part_name << " from " << replica_path << (to_detached ? " (to 'detached' directory)" : ""));
A
Alexey Milovidov 已提交
2102
	return true;
M
Merge  
Michael Kolupaev 已提交
2103
}
M
Merge  
Michael Kolupaev 已提交
2104

A
Merge  
Alexey Milovidov 已提交
2105

M
Merge  
Michael Kolupaev 已提交
2106 2107
void StorageReplicatedMergeTree::shutdown()
{
2108 2109 2110 2111 2112 2113
	/** This must be done before waiting for restarting_thread.
	  * Because restarting_thread will wait for finishing of tasks in background pool,
	  *  and parts are fetched in that tasks.
	  */
	fetcher.cancel();

2114
	if (restarting_thread)
M
Merge  
Michael Kolupaev 已提交
2115
	{
2116 2117
		restarting_thread->stop();
		restarting_thread.reset();
M
Merge  
Michael Kolupaev 已提交
2118
	}
M
Merge  
Michael Kolupaev 已提交
2119

2120 2121 2122 2123 2124
	if (endpoint_holder)
	{
		endpoint_holder->cancel();
		endpoint_holder = nullptr;
	}
A
Merge  
Alexey Milovidov 已提交
2125

2126 2127 2128 2129 2130
	if (disk_space_monitor_endpoint_holder)
	{
		disk_space_monitor_endpoint_holder->cancel();
		disk_space_monitor_endpoint_holder = nullptr;
	}
A
Merge  
Alexey Milovidov 已提交
2131
	disk_space_monitor_client.cancel();
A
Merge  
Alexey Milovidov 已提交
2132

2133 2134 2135 2136 2137
	if (sharded_partition_uploader_endpoint_holder)
	{
		sharded_partition_uploader_endpoint_holder->cancel();
		sharded_partition_uploader_endpoint_holder = nullptr;
	}
A
Merge  
Alexey Arno 已提交
2138
	sharded_partition_uploader_client.cancel();
A
Merge  
Alexey Milovidov 已提交
2139

2140 2141 2142 2143 2144
	if (remote_query_executor_endpoint_holder)
	{
		remote_query_executor_endpoint_holder->cancel();
		remote_query_executor_endpoint_holder = nullptr;
	}
A
Merge  
Alexey Milovidov 已提交
2145
	remote_query_executor_client.cancel();
A
Merge  
Alexey Arno 已提交
2146 2147 2148 2149 2150 2151

	if (remote_part_checker_endpoint_holder)
	{
		remote_part_checker_endpoint_holder->cancel();
		remote_part_checker_endpoint_holder = nullptr;
	}
M
Merge  
Michael Kolupaev 已提交
2152 2153 2154
}


M
Merge  
Michael Kolupaev 已提交
2155 2156 2157 2158 2159 2160 2161 2162
StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
{
	try
	{
		shutdown();
	}
	catch(...)
	{
2163
		tryLogCurrentException(__PRETTY_FUNCTION__);
M
Merge  
Michael Kolupaev 已提交
2164 2165 2166
	}
}

A
Merge  
Alexey Milovidov 已提交
2167

M
Merge  
Michael Kolupaev 已提交
2168
BlockInputStreams StorageReplicatedMergeTree::read(
2169 2170 2171 2172 2173 2174 2175
	const Names & column_names,
	ASTPtr query,
	const Context & context,
	const Settings & settings,
	QueryProcessingStage::Enum & processed_stage,
	const size_t max_block_size,
	const unsigned threads)
M
Merge  
Michael Kolupaev 已提交
2176
{
2177 2178 2179 2180 2181 2182 2183 2184
	/** У таблицы может быть два вида данных:
	  * - реплицируемые данные;
	  * - старые, нереплицируемые данные - они лежат отдельно и их целостность никак не контролируется.
	  * А ещё движок таблицы предоставляет возможность использовать "виртуальные столбцы".
	  * Один из них - _replicated позволяет определить, из какой части прочитаны данные,
	  *  или, при использовании в WHERE - выбрать данные только из одной части.
	  */

M
Merge  
Michael Kolupaev 已提交
2185 2186 2187 2188 2189 2190 2191 2192
	Names virt_column_names;
	Names real_column_names;
	for (const auto & it : column_names)
		if (it == "_replicated")
			virt_column_names.push_back(it);
		else
			real_column_names.push_back(it);

2193
	auto & select = typeid_cast<const ASTSelectQuery &>(*query);
A
Merge  
Andrey Mironov 已提交
2194 2195

	/// Try transferring some condition from WHERE to PREWHERE if enabled and viable
2196
	if (settings.optimize_move_to_prewhere && select.where_expression && !select.prewhere_expression && !select.final())
2197
		MergeTreeWhereOptimizer{query, context, data, real_column_names, log};
A
Merge  
Andrey Mironov 已提交
2198

M
Merge  
Michael Kolupaev 已提交
2199
	Block virtual_columns_block;
2200
	auto column = std::make_shared<ColumnUInt8>(2);
M
Merge  
Michael Kolupaev 已提交
2201 2202 2203
	ColumnPtr column_ptr = column;
	column->getData()[0] = 0;
	column->getData()[1] = 1;
2204
	virtual_columns_block.insert(ColumnWithTypeAndName(column_ptr, std::make_shared<DataTypeUInt8>(), "_replicated"));
M
Merge  
Michael Kolupaev 已提交
2205

2206
	/// Если запрошен столбец _replicated, пробуем индексировать.
M
Merge  
Michael Kolupaev 已提交
2207
	if (!virt_column_names.empty())
2208
		VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context);
M
Merge  
Michael Kolupaev 已提交
2209

2210
	std::multiset<UInt8> values = VirtualColumnUtils::extractSingleValueFromBlock<UInt8>(virtual_columns_block, "_replicated");
M
Merge  
Michael Kolupaev 已提交
2211

M
Merge  
Michael Kolupaev 已提交
2212 2213
	BlockInputStreams res;

M
Merge  
Michael Kolupaev 已提交
2214 2215
	size_t part_index = 0;

2216 2217 2218 2219 2220 2221
	/** Настройки parallel_replica_offset и parallel_replicas_count позволяют читать с одной реплики одну часть данных, а с другой - другую.
	  * Для реплицируемых, данные разбиваются таким же механизмом, как работает секция SAMPLE.
	  * А для нереплицируемых данных, так как их целостность между репликами не контролируется,
	  *  с первой (settings.parallel_replica_offset == 0) реплики выбираются все данные, а с остальных - никакие.
	  */

2222
	if ((settings.parallel_replica_offset == 0) && unreplicated_reader && values.count(0))
M
Merge  
Michael Kolupaev 已提交
2223
	{
2224 2225
		res = unreplicated_reader->read(real_column_names, query,
										context, settings, processed_stage,
2226
										max_block_size, threads, &part_index, 0);
M
Merge  
Michael Kolupaev 已提交
2227 2228 2229 2230 2231 2232

		for (auto & virtual_column : virt_column_names)
		{
			if (virtual_column == "_replicated")
			{
				for (auto & stream : res)
2233
					stream = std::make_shared<AddingConstColumnBlockInputStream<UInt8>>(stream, std::make_shared<DataTypeUInt8>(), 0, "_replicated");
M
Merge  
Michael Kolupaev 已提交
2234 2235 2236 2237
			}
		}
	}

M
Merge  
Michael Kolupaev 已提交
2238
	if (values.count(1))
M
Merge  
Michael Kolupaev 已提交
2239
	{
2240 2241 2242 2243 2244 2245 2246 2247 2248
		/** Настройка select_sequential_consistency имеет два смысла:
		  * 1. Кидать исключение, если на реплике есть не все куски, которые были записаны на кворум остальных реплик.
		  * 2. Не читать куски, которые ещё не были записаны на кворум реплик.
		  * Для этого приходится синхронно сходить в ZooKeeper.
		  */
		Int64 max_block_number_to_read = 0;
		if (settings.select_sequential_consistency)
		{
			auto zookeeper = getZooKeeper();
A
Merge  
Alexey Milovidov 已提交
2249

2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278
			String last_part;
			zookeeper->tryGet(zookeeper_path + "/quorum/last_part", last_part);

			if (!last_part.empty() && !data.getPartIfExists(last_part))	/// TODO Отключение реплики при распределённых запросах.
				throw Exception("Replica doesn't have part " + last_part + " which was successfully written to quorum of other replicas."
					" Send query to another replica or disable 'select_sequential_consistency' setting.", ErrorCodes::REPLICA_IS_NOT_IN_QUORUM);

			if (last_part.empty())	/// Если ещё ни один кусок не был записан с кворумом.
			{
				String quorum_str;
				if (zookeeper->tryGet(zookeeper_path + "/quorum/status", quorum_str))
				{
					ReplicatedMergeTreeQuorumEntry quorum_entry;
					quorum_entry.fromString(quorum_str);
					ActiveDataPartSet::Part part_info;
					ActiveDataPartSet::parsePartName(quorum_entry.part_name, part_info);
					max_block_number_to_read = part_info.left - 1;
				}
			}
			else
			{
				ActiveDataPartSet::Part part_info;
				ActiveDataPartSet::parsePartName(last_part, part_info);
				max_block_number_to_read = part_info.right;
			}
		}

		auto res2 = reader.read(
			real_column_names, query, context, settings, processed_stage, max_block_size, threads, &part_index, max_block_number_to_read);
M
Merge  
Michael Kolupaev 已提交
2279 2280 2281 2282 2283 2284

		for (auto & virtual_column : virt_column_names)
		{
			if (virtual_column == "_replicated")
			{
				for (auto & stream : res2)
2285
					stream = std::make_shared<AddingConstColumnBlockInputStream<UInt8>>(stream, std::make_shared<DataTypeUInt8>(), 1, "_replicated");
M
Merge  
Michael Kolupaev 已提交
2286 2287 2288
			}
		}

M
Merge  
Michael Kolupaev 已提交
2289
		res.insert(res.end(), res2.begin(), res2.end());
M
Merge  
Michael Kolupaev 已提交
2290 2291 2292
	}

	return res;
M
Merge  
Michael Kolupaev 已提交
2293 2294
}

A
Merge  
Alexey Milovidov 已提交
2295

A
Merge  
Alexey Milovidov 已提交
2296
void StorageReplicatedMergeTree::assertNotReadonly() const
M
Merge  
Michael Kolupaev 已提交
2297
{
2298 2299
	if (is_readonly)
		throw Exception("Table is in readonly mode", ErrorCodes::TABLE_IS_READ_ONLY);
A
Merge  
Alexey Milovidov 已提交
2300 2301 2302 2303 2304 2305
}


BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query, const Settings & settings)
{
	assertNotReadonly();
M
Merge  
Michael Kolupaev 已提交
2306

M
Merge  
Michael Kolupaev 已提交
2307
	String insert_id;
M
Merge  
Michael Kolupaev 已提交
2308 2309 2310
	if (query)
		if (ASTInsertQuery * insert = typeid_cast<ASTInsertQuery *>(&*query))
			insert_id = insert->insert_id;
M
Merge  
Michael Kolupaev 已提交
2311

2312
	return std::make_shared<ReplicatedMergeTreeBlockOutputStream>(*this, insert_id,
2313
		settings.insert_quorum, settings.insert_quorum_timeout.totalMilliseconds());
M
Merge  
Michael Kolupaev 已提交
2314
}
M
Merge  
Michael Kolupaev 已提交
2315

A
Merge  
Alexey Milovidov 已提交
2316

2317
bool StorageReplicatedMergeTree::optimize(const String & partition, bool final, const Settings & settings)
M
Merge  
Michael Kolupaev 已提交
2318
{
2319 2320 2321 2322 2323
	/// Если есть не реплицируемые данные, то мерджим сначала их.
	if (unreplicated_data)
	{
		std::lock_guard<std::mutex> lock(unreplicated_mutex);
		unreplicated_data->clearOldParts();
M
Merge  
Michael Kolupaev 已提交
2324

2325 2326 2327
		MergeTreeData::DataPartsVector parts;
		String merged_name;
		auto always_can_merge = [](const MergeTreeData::DataPartPtr & a, const MergeTreeData::DataPartPtr & b) { return true; };
M
Merge  
Michael Kolupaev 已提交
2328

2329
		if (unreplicated_merger->selectPartsToMerge(parts, merged_name, true, 0, always_can_merge))
2330
		{
2331
			MergeList::EntryPtr merge_entry = context.getMergeList().insert(database_name, table_name, merged_name, parts);
M
Merge  
Michael Kolupaev 已提交
2332

2333 2334
			auto new_part = unreplicated_merger->mergePartsToTemporaryPart(
				parts, merged_name, *merge_entry, settings.min_bytes_to_use_direct_io, time(0));
M
Merge  
Michael Kolupaev 已提交
2335

2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361
			unreplicated_merger->renameMergedTemporaryPart(parts, new_part, merged_name, nullptr);
			return true;
		}
	}

	assertNotReadonly();

	if (!is_leader_node)
		throw Exception("Method OPTIMIZE for ReplicatedMergeTree could be called only on leader replica", ErrorCodes::NOT_IMPLEMENTED);

	auto can_merge = [this]
		(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
	{
		return canMergeParts(left, right, nullptr);
	};

	pullLogsToQueue();

	ReplicatedMergeTreeLogEntryData merge_entry;
	{
		std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);

		MergeTreeData::DataPartsVector parts;
		String merged_name;

		size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
M
Merge  
Michael Kolupaev 已提交
2362

2363
		bool selected = false;
2364

2365 2366
		if (partition.empty())
		{
2367
			selected = merger.selectPartsToMerge(parts, merged_name, false, data.settings.max_bytes_to_merge_at_max_space_in_pool, can_merge);
2368 2369 2370 2371 2372 2373
		}
		else
		{
			DayNum_t month = MergeTreeData::getMonthFromName(partition);
			selected = merger.selectAllPartsToMergeWithinPartition(parts, merged_name, disk_space, can_merge, month, final);
		}
2374

2375 2376 2377 2378 2379 2380
		if (!selected)
			return false;

		if (!createLogEntryToMergeParts(parts, merged_name, &merge_entry))
			return false;
	}
2381

2382
	waitForAllReplicasToProcessLogEntry(merge_entry);
M
Merge  
Michael Kolupaev 已提交
2383 2384 2385
	return true;
}

A
Merge  
Alexey Milovidov 已提交
2386

M
Merge  
Michael Kolupaev 已提交
2387
void StorageReplicatedMergeTree::alter(const AlterCommands & params,
A
Merge  
Alexey Milovidov 已提交
2388
	const String & database_name, const String & table_name, const Context & context)
M
Merge  
Michael Kolupaev 已提交
2389
{
A
Merge  
Alexey Milovidov 已提交
2390 2391
	assertNotReadonly();

M
Merge  
Michael Kolupaev 已提交
2392 2393 2394
	LOG_DEBUG(log, "Doing ALTER");

	int new_columns_version;
P
proller 已提交
2395
	String new_columns_str;
M
Merge  
Michael Kolupaev 已提交
2396 2397 2398
	zkutil::Stat stat;

	{
P
proller 已提交
2399 2400
		/// Just to read current structure. Alter will be done in separate thread.
		auto table_lock = lockStructure(false);
M
Merge  
Michael Kolupaev 已提交
2401

2402 2403
		if (is_readonly)
			throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY);
M
Merge  
Michael Kolupaev 已提交
2404

M
Merge  
Michael Kolupaev 已提交
2405 2406
		data.checkAlter(params);

2407 2408 2409 2410
		for (const AlterCommand & param : params)
			if (param.type == AlterCommand::MODIFY_PRIMARY_KEY)
				throw Exception("Modification of primary key is not supported for replicated tables", ErrorCodes::NOT_IMPLEMENTED);

P
proller 已提交
2411 2412 2413 2414
		NamesAndTypesList new_columns = data.getColumnsListNonMaterialized();
		NamesAndTypesList new_materialized_columns = data.materialized_columns;
		NamesAndTypesList new_alias_columns = data.alias_columns;
		ColumnDefaults new_column_defaults = data.column_defaults;
2415
		params.apply(new_columns, new_materialized_columns, new_alias_columns, new_column_defaults);
M
Merge  
Michael Kolupaev 已提交
2416

2417 2418 2419 2420
		new_columns_str = ColumnsDescription<false>{
			new_columns, new_materialized_columns,
			new_alias_columns, new_column_defaults
		}.toString();
M
Merge  
Michael Kolupaev 已提交
2421 2422

		/// Делаем ALTER.
2423
		getZooKeeper()->set(zookeeper_path + "/columns", new_columns_str, -1, &stat);
M
Merge  
Michael Kolupaev 已提交
2424 2425 2426 2427 2428 2429

		new_columns_version = stat.version;
	}

	LOG_DEBUG(log, "Updated columns in ZooKeeper. Waiting for replicas to apply changes.");

2430
	/// Wait until all replicas will apply ALTER.
M
Merge  
Michael Kolupaev 已提交
2431

2432
	/// Subscribe to change of columns, to finish waiting if someone will do another ALTER.
2433
	if (!getZooKeeper()->exists(zookeeper_path + "/columns", &stat, alter_query_event))
M
Merge  
Michael Kolupaev 已提交
2434
		throw Exception(zookeeper_path + "/columns doesn't exist", ErrorCodes::NOT_FOUND_NODE);
2435

M
Merge  
Michael Kolupaev 已提交
2436 2437 2438 2439 2440 2441 2442
	if (stat.version != new_columns_version)
	{
		LOG_WARNING(log, zookeeper_path + "/columns changed before this ALTER finished; "
			"overlapping ALTER-s are fine but use caution with nontransitive changes");
		return;
	}

2443
	Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
2444 2445 2446 2447 2448 2449

	std::set<String> inactive_replicas;
	std::set<String> timed_out_replicas;

	time_t replication_alter_columns_timeout = context.getSettingsRef().replication_alter_columns_timeout;

M
Merge  
Michael Kolupaev 已提交
2450 2451 2452 2453 2454 2455
	for (const String & replica : replicas)
	{
		LOG_DEBUG(log, "Waiting for " << replica << " to apply changes");

		while (!shutdown_called)
		{
2456
			/// Replica could be inactive.
2457
			if (!getZooKeeper()->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
2458 2459 2460 2461 2462 2463 2464 2465
			{
				LOG_WARNING(log, "Replica " << replica << " is not active during ALTER query."
					" ALTER will be done asynchronously when replica becomes active.");

				inactive_replicas.emplace(replica);
				break;
			}

M
Merge  
Michael Kolupaev 已提交
2466 2467
			String replica_columns_str;

2468
			/// Replica could has been removed.
2469
			if (!getZooKeeper()->tryGet(zookeeper_path + "/replicas/" + replica + "/columns", replica_columns_str, &stat))
M
Merge  
Michael Kolupaev 已提交
2470 2471 2472 2473 2474 2475 2476
			{
				LOG_WARNING(log, replica << " was removed");
				break;
			}

			int replica_columns_version = stat.version;

2477
			/// The ALTER has been successfully applied.
M
Merge  
Michael Kolupaev 已提交
2478 2479 2480
			if (replica_columns_str == new_columns_str)
				break;

2481
			if (!getZooKeeper()->exists(zookeeper_path + "/columns", &stat))
M
Merge  
Michael Kolupaev 已提交
2482
				throw Exception(zookeeper_path + "/columns doesn't exist", ErrorCodes::NOT_FOUND_NODE);
2483

M
Merge  
Michael Kolupaev 已提交
2484 2485 2486 2487 2488 2489 2490
			if (stat.version != new_columns_version)
			{
				LOG_WARNING(log, zookeeper_path + "/columns changed before ALTER finished; "
					"overlapping ALTER-s are fine but use caution with nontransitive changes");
				return;
			}

2491
			if (!getZooKeeper()->exists(zookeeper_path + "/replicas/" + replica + "/columns", &stat, alter_query_event))
M
Merge  
Michael Kolupaev 已提交
2492 2493 2494 2495 2496 2497 2498 2499
			{
				LOG_WARNING(log, replica << " was removed");
				break;
			}

			if (stat.version != replica_columns_version)
				continue;

2500 2501 2502
			if (!replication_alter_columns_timeout)
			{
				alter_query_event->wait();
2503
				/// Everything is fine.
2504 2505 2506
			}
			else if (alter_query_event->tryWait(replication_alter_columns_timeout * 1000))
			{
2507
				/// Everything is fine.
2508 2509 2510 2511 2512 2513 2514
			}
			else
			{
				LOG_WARNING(log, "Timeout when waiting for replica " << replica << " to apply ALTER."
					" ALTER will be done asynchronously.");

				timed_out_replicas.emplace(replica);
2515
				break;
2516
			}
M
Merge  
Michael Kolupaev 已提交
2517 2518 2519
		}

		if (shutdown_called)
2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552
			throw Exception("Alter is not finished because table shutdown was called. Alter will be done after table restart.",
				ErrorCodes::UNFINISHED);

		if (!inactive_replicas.empty() || !timed_out_replicas.empty())
		{
			std::stringstream exception_message;
			exception_message << "Alter is not finished because";

			if (!inactive_replicas.empty())
			{
				exception_message << " some replicas are inactive right now";

				for (auto it = inactive_replicas.begin(); it != inactive_replicas.end(); ++it)
					exception_message << (it == inactive_replicas.begin() ? ": " : ", ") << *it;
			}

			if (!timed_out_replicas.empty() && !inactive_replicas.empty())
				exception_message << " and";

			if (!timed_out_replicas.empty())
			{
				exception_message << " timeout when waiting for some replicas";

				for (auto it = timed_out_replicas.begin(); it != timed_out_replicas.end(); ++it)
					exception_message << (it == timed_out_replicas.begin() ? ": " : ", ") << *it;

				exception_message << " (replication_alter_columns_timeout = " << replication_alter_columns_timeout << ")";
			}

			exception_message << ". Alter will be done asynchronously.";

			throw Exception(exception_message.str(), ErrorCodes::UNFINISHED);
		}
M
Merge  
Michael Kolupaev 已提交
2553 2554 2555 2556 2557
	}

	LOG_DEBUG(log, "ALTER finished");
}

M
Merge  
Michael Kolupaev 已提交
2558 2559 2560 2561 2562

/// Название воображаемого куска, покрывающего все возможные куски в указанном месяце с номерами в указанном диапазоне.
static String getFakePartNameForDrop(const String & month_name, UInt64 left, UInt64 right)
{
	/// Диапазон дат - весь месяц.
A
Merge  
Alexey Arno 已提交
2563
	const auto & lut = DateLUT::instance();
2564
	time_t start_time = lut.YYYYMMDDToDate(parse<UInt32>(month_name + "01"));
M
Merge  
Michael Kolupaev 已提交
2565 2566 2567 2568
	DayNum_t left_date = lut.toDayNum(start_time);
	DayNum_t right_date = DayNum_t(static_cast<size_t>(left_date) + lut.daysInMonth(start_time) - 1);

	/// Уровень - right-left+1: кусок не мог образоваться в результате такого или большего количества слияний.
2569
	/// TODO This is not true for parts after ATTACH.
M
Merge  
Michael Kolupaev 已提交
2570 2571 2572
	return ActiveDataPartSet::getPartName(left_date, right_date, left, right, right - left + 1);
}

A
Merge  
Alexey Milovidov 已提交
2573

2574
void StorageReplicatedMergeTree::dropUnreplicatedPartition(const Field & partition, const bool detach, const Settings & settings)
M
Merge  
Michael Kolupaev 已提交
2575
{
A
Merge  
Andrey Mironov 已提交
2576 2577 2578 2579 2580
	if (!unreplicated_data)
		return;

	/// Просит завершить мерджи и не позволяет им начаться.
	/// Это защищает от "оживания" данных за удалённую партицию после завершения мерджа.
2581
	auto merge_blocker = unreplicated_merger->cancel();
A
Merge  
Andrey Mironov 已提交
2582 2583 2584 2585 2586 2587 2588 2589 2590
	auto structure_lock = lockStructure(true);

	const DayNum_t month = MergeTreeData::getMonthDayNum(partition);

	size_t removed_parts = 0;
	MergeTreeData::DataParts parts = unreplicated_data->getDataParts();

	for (const auto & part : parts)
	{
A
Merge  
Alexey Milovidov 已提交
2591
		if (part->month != month)
A
Merge  
Andrey Mironov 已提交
2592 2593 2594 2595 2596
			continue;

		LOG_DEBUG(log, "Removing unreplicated part " << part->name);
		++removed_parts;

2597 2598 2599 2600
		if (detach)
			unreplicated_data->renameAndDetachPart(part, "");
		else
			unreplicated_data->replaceParts({part}, {}, false);
A
Merge  
Andrey Mironov 已提交
2601 2602
	}

2603
	LOG_INFO(log, (detach ? "Detached " : "Removed ") << removed_parts << " unreplicated parts inside " << applyVisitor(FieldVisitorToString(), partition) << ".");
A
Merge  
Andrey Mironov 已提交
2604 2605 2606
}


2607 2608
void StorageReplicatedMergeTree::dropPartition(
	ASTPtr query, const Field & field, bool detach, bool unreplicated, const Settings & settings)
A
Merge  
Andrey Mironov 已提交
2609 2610 2611
{
	if (unreplicated)
	{
2612
		dropUnreplicatedPartition(field, detach, settings);
A
Merge  
Andrey Mironov 已提交
2613 2614 2615
		return;
	}

A
Merge  
Alexey Milovidov 已提交
2616 2617
	assertNotReadonly();

A
Merge  
Alexey Milovidov 已提交
2618
	String month_name = MergeTreeData::getMonthName(field);
M
Merge  
Michael Kolupaev 已提交
2619 2620

	if (!is_leader_node)
2621 2622 2623
	{
		/// Проксируем запрос в лидера.

2624
		auto live_replicas = getZooKeeper()->getChildren(zookeeper_path + "/leader_election");
2625 2626 2627
		if (live_replicas.empty())
			throw Exception("No active replicas", ErrorCodes::NO_ACTIVE_REPLICAS);

2628
		std::sort(live_replicas.begin(), live_replicas.end());
2629
		const auto leader = getZooKeeper()->get(zookeeper_path + "/leader_election/" + live_replicas.front());
2630 2631 2632 2633

		if (leader == replica_name)
			throw Exception("Leader was suddenly changed or logical error.", ErrorCodes::LEADERSHIP_CHANGED);

2634
		ReplicatedMergeTreeAddress leader_address(getZooKeeper()->get(zookeeper_path + "/replicas/" + leader + "/host"));
2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655

		auto new_query = query->clone();
		auto & alter = typeid_cast<ASTAlterQuery &>(*new_query);

		alter.database = leader_address.database;
		alter.table = leader_address.table;

		/// NOTE Работает только если есть доступ от пользователя default без пароля. Можно исправить с помощью добавления параметра в конфиг сервера.

		Connection connection(
			leader_address.host,
			leader_address.queries_port,
			leader_address.database,
			"", "", "ClickHouse replica");

		RemoteBlockInputStream stream(connection, formattedAST(new_query), &settings);
		NullBlockOutputStream output;

		copyData(stream, output);
		return;
	}
M
Merge  
Michael Kolupaev 已提交
2656

2657 2658
	auto number_and_exists = data.getMinBlockNumberForMonth(data.getMonthFromName(month_name));

2659 2660 2661
	/// Даже если в партиции нет данных, то всё-равно нужно отметить диапазон для удаления.
	/// - Потому что до выполнения DETACH, могут выполниться задачи на скачивание кусков в эту партицию.
	Int64 left = number_and_exists.second ? number_and_exists.first : RESERVED_BLOCK_NUMBERS;
2662

M
Merge  
Michael Kolupaev 已提交
2663 2664 2665 2666
	/** Пропустим один номер в block_numbers для удаляемого месяца, и будем удалять только куски до этого номера.
	  * Это запретит мерджи удаляемых кусков с новыми вставляемыми данными.
	  * Инвариант: в логе не появятся слияния удаляемых кусков с другими кусками.
	  * NOTE: Если понадобится аналогично поддержать запрос DROP PART, для него придется придумать какой-нибудь новый механизм,
2667
	  *	    чтобы гарантировать этот инвариант.
M
Merge  
Michael Kolupaev 已提交
2668
	  */
A
Merge  
Alexey Milovidov 已提交
2669
	Int64 right;
M
Merge  
Michael Kolupaev 已提交
2670 2671

	{
A
Merge  
Alexey Arno 已提交
2672
		AbandonableLockInZooKeeper block_number_lock = allocateBlockNumber(month_name);
M
Merge  
Michael Kolupaev 已提交
2673 2674 2675 2676 2677 2678
		right = block_number_lock.getNumber();
		block_number_lock.unlock();
	}

	/// Такого никогда не должно происходить.
	if (right == 0)
2679
		throw Exception("Logical error: newly allocated block number is zero", ErrorCodes::LOGICAL_ERROR);
M
Merge  
Michael Kolupaev 已提交
2680 2681
	--right;

2682
	String fake_part_name = getFakePartNameForDrop(month_name, left, right);
M
Merge  
Michael Kolupaev 已提交
2683

A
Alexey Milovidov 已提交
2684
	/** Запретим выбирать для слияния удаляемые куски.
M
Merge  
Michael Kolupaev 已提交
2685 2686 2687
	  * Инвариант: после появления в логе записи DROP_RANGE, в логе не появятся слияния удаляемых кусков.
	  */
	{
2688
		std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
A
Alexey Milovidov 已提交
2689
		queue.disableMergesInRange(fake_part_name);
M
Merge  
Michael Kolupaev 已提交
2690 2691
	}

2692 2693
	LOG_DEBUG(log, "Disabled merges in range " << left << " - " << right << " for month " << month_name);

A
Merge  
Alexey Arno 已提交
2694 2695 2696 2697 2698 2699
	/// Наконец, добившись нужных инвариантов, можно положить запись в лог.
	LogEntry entry;
	entry.type = LogEntry::DROP_RANGE;
	entry.source_replica = replica_name;
	entry.new_part_name = fake_part_name;
	entry.detach = detach;
2700
	String log_znode_path = getZooKeeper()->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
A
Merge  
Alexey Arno 已提交
2701 2702
	entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
	entry.create_time = time(0);
A
Merge  
Alexey Milovidov 已提交
2703

A
Merge  
Alexey Arno 已提交
2704 2705 2706 2707 2708 2709 2710 2711
	/// Если надо - дожидаемся выполнения операции на себе или на всех репликах.
	if (settings.replication_alter_partitions_sync != 0)
	{
		if (settings.replication_alter_partitions_sync == 1)
			waitForReplicaToProcessLogEntry(replica_name, entry);
		else
			waitForAllReplicasToProcessLogEntry(entry);
	}
A
Merge  
Alexey Milovidov 已提交
2712
}
A
Merge  
Alexey Milovidov 已提交
2713

A
Merge  
Alexey Arno 已提交
2714

2715
void StorageReplicatedMergeTree::attachPartition(ASTPtr query, const Field & field, bool unreplicated, bool attach_part, const Settings & settings)
M
Merge  
Michael Kolupaev 已提交
2716
{
A
Merge  
Alexey Milovidov 已提交
2717 2718
	assertNotReadonly();

A
Merge  
Alexey Milovidov 已提交
2719
	String partition;
M
Merge  
Michael Kolupaev 已提交
2720

A
Merge  
Alexey Milovidov 已提交
2721
	if (attach_part)
A
Merge  
Alexey Milovidov 已提交
2722
		partition = field.safeGet<String>();
A
Merge  
Alexey Milovidov 已提交
2723 2724
	else
		partition = MergeTreeData::getMonthName(field);
M
Merge  
Michael Kolupaev 已提交
2725 2726 2727 2728 2729 2730 2731 2732 2733 2734 2735 2736 2737

	String source_dir = (unreplicated ? "unreplicated/" : "detached/");

	/// Составим список кусков, которые нужно добавить.
	Strings parts;
	if (attach_part)
	{
		parts.push_back(partition);
	}
	else
	{
		LOG_DEBUG(log, "Looking for parts for partition " << partition << " in " << source_dir);
		ActiveDataPartSet active_parts;
2738 2739

		std::set<String> part_names;
M
Merge  
Michael Kolupaev 已提交
2740 2741 2742 2743 2744
		for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
		{
			String name = it.name();
			if (!ActiveDataPartSet::isPartDirectory(name))
				continue;
2745
			if (!startsWith(name, partition))
M
Merge  
Michael Kolupaev 已提交
2746 2747 2748
				continue;
			LOG_DEBUG(log, "Found part " << name);
			active_parts.add(name);
2749
			part_names.insert(name);
M
Merge  
Michael Kolupaev 已提交
2750 2751 2752
		}
		LOG_DEBUG(log, active_parts.size() << " of them are active");
		parts = active_parts.getParts();
2753 2754 2755 2756 2757 2758 2759 2760

		/// Неактивные куски переименовываем, чтобы они не могли быть прикреплены в случае повторного ATTACH-а.
		for (const auto & name : part_names)
		{
			String containing_part = active_parts.getContainingPart(name);
			if (!containing_part.empty() && containing_part != name)
				Poco::File(full_path + source_dir + name).renameTo(full_path + source_dir + "inactive_" + name);
		}
M
Merge  
Michael Kolupaev 已提交
2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772
	}

	/// Синхронно проверим, что добавляемые куски существуют и не испорчены хотя бы на этой реплике. Запишем checksums.txt, если его нет.
	LOG_DEBUG(log, "Checking parts");
	for (const String & part : parts)
	{
		LOG_DEBUG(log, "Checking part " << part);
		data.loadPartAndFixMetadata(source_dir + part);
	}

	/// Выделим добавляемым кускам максимальные свободные номера, меньшие RESERVED_BLOCK_NUMBERS.
	/// NOTE: Проверка свободности номеров никак не синхронизируется. Выполнять несколько запросов ATTACH/DETACH/DROP одновременно нельзя.
A
Merge  
Alexey Milovidov 已提交
2773
	Int64 min_used_number = RESERVED_BLOCK_NUMBERS;
2774
	DayNum_t month = MergeTreeData::getMonthFromPartPrefix(partition);
M
Merge  
Michael Kolupaev 已提交
2775

2776
	auto num_and_exists = data.getMinBlockNumberForMonth(month);
2777
	if (num_and_exists.second && num_and_exists.first < min_used_number)
2778
		min_used_number = num_and_exists.first;
M
Merge  
Michael Kolupaev 已提交
2779 2780 2781 2782 2783 2784 2785 2786 2787 2788

	/// Добавим записи в лог.
	std::reverse(parts.begin(), parts.end());
	std::list<LogEntry> entries;
	zkutil::Ops ops;
	for (const String & part_name : parts)
	{
		ActiveDataPartSet::Part part;
		ActiveDataPartSet::parsePartName(part_name, part);
		part.left = part.right = --min_used_number;
2789
		part.level = 0;		/// previous level has no sense after attach.
M
Merge  
Michael Kolupaev 已提交
2790 2791 2792 2793 2794 2795 2796 2797 2798 2799 2800
		String new_part_name = ActiveDataPartSet::getPartName(part.left_date, part.right_date, part.left, part.right, part.level);

		LOG_INFO(log, "Will attach " << part_name << " as " << new_part_name);

		entries.emplace_back();
		LogEntry & entry = entries.back();
		entry.type = LogEntry::ATTACH_PART;
		entry.source_replica = replica_name;
		entry.source_part_name = part_name;
		entry.new_part_name = new_part_name;
		entry.attach_unreplicated = unreplicated;
2801 2802
		entry.create_time = time(0);

2803
		ops.emplace_back(std::make_unique<zkutil::Op::Create>(
2804
			zookeeper_path + "/log/log-", entry.toString(), getZooKeeper()->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
M
Merge  
Michael Kolupaev 已提交
2805 2806
	}

A
Merge  
Alexey Milovidov 已提交
2807
	LOG_DEBUG(log, "Adding attaches to log");
A
Merge  
Alexey Milovidov 已提交
2808

2809
	getZooKeeper()->multi(ops);
2810

2811 2812
	/// Если надо - дожидаемся выполнения операции на себе или на всех репликах.
	if (settings.replication_alter_partitions_sync != 0)
M
Merge  
Michael Kolupaev 已提交
2813
	{
2814 2815 2816
		size_t i = 0;
		for (LogEntry & entry : entries)
		{
2817
			String log_znode_path = dynamic_cast<zkutil::Op::Create &>(*ops[i]).getPathCreated();
2818 2819 2820 2821 2822 2823
			entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);

			if (settings.replication_alter_partitions_sync == 1)
				waitForReplicaToProcessLogEntry(replica_name, entry);
			else
				waitForAllReplicasToProcessLogEntry(entry);
M
Merge  
Michael Kolupaev 已提交
2824

2825 2826
			++i;
		}
M
Merge  
Michael Kolupaev 已提交
2827
	}
M
Merge  
Michael Kolupaev 已提交
2828 2829
}

2830 2831 2832
bool StorageReplicatedMergeTree::checkTableCanBeDropped() const
{
	/// Consider only synchronized data
2833
	context.checkTableCanBeDropped(database_name, table_name, getData().getTotalCompressedSize());
2834 2835
	return true;
}
A
Merge  
Alexey Milovidov 已提交
2836

M
Merge  
Michael Kolupaev 已提交
2837 2838
void StorageReplicatedMergeTree::drop()
{
2839 2840
	{
		auto zookeeper = tryGetZooKeeper();
A
Merge  
Alexey Milovidov 已提交
2841

2842 2843
		if (is_readonly || !zookeeper)
			throw Exception("Can't drop readonly replicated table (need to drop data in ZooKeeper as well)", ErrorCodes::TABLE_IS_READ_ONLY);
M
Merge  
Michael Kolupaev 已提交
2844

2845
		// checkTableCanBeDropped(); // uncomment to feel yourself safe
2846

2847
		shutdown();
M
Merge  
Michael Kolupaev 已提交
2848

2849 2850
		if (zookeeper->expired())
			throw Exception("Table was not dropped because ZooKeeper session has expired.", ErrorCodes::TABLE_WAS_NOT_DROPPED);
2851

2852 2853 2854
		LOG_INFO(log, "Removing replica " << replica_path);
		replica_is_active_node = nullptr;
		zookeeper->tryRemoveRecursive(replica_path);
M
Merge  
Michael Kolupaev 已提交
2855

2856 2857 2858 2859 2860 2861 2862
		/// Проверяем, что zookeeper_path существует: его могла удалить другая реплика после выполнения предыдущей строки.
		Strings replicas;
		if (zookeeper->tryGetChildren(zookeeper_path + "/replicas", replicas) == ZOK && replicas.empty())
		{
			LOG_INFO(log, "Removing table " << zookeeper_path << " (this might take several minutes)");
			zookeeper->tryRemoveRecursive(zookeeper_path);
		}
M
Merge  
Michael Kolupaev 已提交
2863
	}
M
Merge  
Michael Kolupaev 已提交
2864 2865

	data.dropAllData();
M
Merge  
Michael Kolupaev 已提交
2866 2867
}

A
Merge  
Alexey Milovidov 已提交
2868

M
Merge  
Michael Kolupaev 已提交
2869 2870 2871 2872 2873 2874 2875 2876 2877 2878 2879 2880 2881 2882 2883
void StorageReplicatedMergeTree::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
{
	std::string new_full_path = new_path_to_db + escapeForFileName(new_table_name) + '/';

	data.setPath(new_full_path, true);
	if (unreplicated_data)
		unreplicated_data->setPath(new_full_path + "unreplicated/", false);

	database_name = new_database_name;
	table_name = new_table_name;
	full_path = new_full_path;

	/// TODO: Можно обновить названия логгеров.
}

A
Merge  
Alexey Milovidov 已提交
2884

2885 2886 2887 2888 2889 2890 2891 2892
bool StorageReplicatedMergeTree::existsNodeCached(const std::string & path)
{
	{
		std::lock_guard<std::mutex> lock(existing_nodes_cache_mutex);
		if (existing_nodes_cache.count(path))
			return true;
	}

2893
	bool res = getZooKeeper()->exists(path);
2894 2895 2896 2897 2898 2899 2900 2901 2902 2903 2904

	if (res)
	{
		std::lock_guard<std::mutex> lock(existing_nodes_cache_mutex);
		existing_nodes_cache.insert(path);
	}

	return res;
}


M
Merge  
Michael Kolupaev 已提交
2905 2906
AbandonableLockInZooKeeper StorageReplicatedMergeTree::allocateBlockNumber(const String & month_name)
{
A
Merge  
Alexey Milovidov 已提交
2907 2908
	auto zookeeper = getZooKeeper();

M
Merge  
Michael Kolupaev 已提交
2909
	String month_path = zookeeper_path + "/block_numbers/" + month_name;
2910
	if (!existsNodeCached(month_path))
M
Merge  
Michael Kolupaev 已提交
2911
	{
A
Merge  
Alexey Milovidov 已提交
2912
		/// Создадим в block_numbers ноду для месяца и пропустим в ней N=RESERVED_BLOCK_NUMBERS значений инкремента.
M
Merge  
Michael Kolupaev 已提交
2913 2914 2915
		/// Нужно, чтобы в будущем при необходимости можно было добавить данные в начало.
		zkutil::Ops ops;
		auto acl = zookeeper->getDefaultACL();
2916
		ops.emplace_back(std::make_unique<zkutil::Op::Create>(month_path, "", acl, zkutil::CreateMode::Persistent));
M
Merge  
Michael Kolupaev 已提交
2917
		for (size_t i = 0; i < RESERVED_BLOCK_NUMBERS; ++i)
M
Merge  
Michael Kolupaev 已提交
2918
		{
2919 2920
			ops.emplace_back(std::make_unique<zkutil::Op::Create>(month_path + "/skip_increment", "", acl, zkutil::CreateMode::Persistent));
			ops.emplace_back(std::make_unique<zkutil::Op::Remove>(month_path + "/skip_increment", -1));
M
Merge  
Michael Kolupaev 已提交
2921 2922 2923 2924 2925 2926 2927 2928 2929 2930
		}
		/// Игнорируем ошибки - не получиться могло только если кто-то еще выполнил эту строчку раньше нас.
		zookeeper->tryMulti(ops);
	}

	return AbandonableLockInZooKeeper(
		zookeeper_path + "/block_numbers/" + month_name + "/block-",
		zookeeper_path + "/temp", *zookeeper);
}

A
Merge  
Alexey Milovidov 已提交
2931

2932
void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry)
M
Merge  
Michael Kolupaev 已提交
2933
{
M
Merge  
Michael Kolupaev 已提交
2934 2935
	LOG_DEBUG(log, "Waiting for all replicas to process " << entry.znode_name);

2936
	Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
A
Merge  
Alexey Milovidov 已提交
2937 2938 2939 2940 2941 2942 2943
	for (const String & replica : replicas)
		waitForReplicaToProcessLogEntry(replica, entry);

	LOG_DEBUG(log, "Finished waiting for all replicas to process " << entry.znode_name);
}


2944
void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String & replica, const ReplicatedMergeTreeLogEntryData & entry)
A
Merge  
Alexey Milovidov 已提交
2945
{
2946 2947
	String entry_str = entry.toString();
	String log_node_name;
M
Merge  
Michael Kolupaev 已提交
2948

2949 2950 2951 2952 2953 2954 2955 2956 2957
	/** В эту функцию могут передать entry двух видов:
	  * 1. (более часто) Из директории log - общего лога, откуда реплики копируют записи в свою queue.
	  * 2. Из директории queue одной из реплик.
	  *
	  * Проблема в том, что номера (sequential нод) элементов очереди в log и в queue не совпадают.
	  * (И в queue не совпадают номера у одного и того же элемента лога для разных реплик.)
	  *
	  * Поэтому следует рассматривать эти случаи по-отдельности.
	  */
M
Merge  
Michael Kolupaev 已提交
2958

2959 2960 2961 2962 2963 2964
	/** Первое - нужно дождаться, пока реплика возьмёт к себе в queue элемент очереди из log,
	  *  если она ещё этого не сделала (см. функцию pullLogsToQueue).
	  *
	  * Для этого проверяем её узел log_pointer - максимальный номер взятого элемента из log плюс единица.
	  */

2965
	if (startsWith(entry.znode_name, "log-"))
A
Merge  
Alexey Milovidov 已提交
2966
	{
2967 2968
		/** В этом случае просто берём номер из имени ноды log-xxxxxxxxxx.
		  */
M
Merge  
Michael Kolupaev 已提交
2969

2970 2971
		UInt64 log_index = parse<UInt64>(entry.znode_name.substr(entry.znode_name.size() - 10));
		log_node_name = entry.znode_name;
M
Merge  
Michael Kolupaev 已提交
2972

2973 2974 2975 2976 2977
		LOG_DEBUG(log, "Waiting for " << replica << " to pull " << log_node_name << " to queue");

		/// Дождемся, пока запись попадет в очередь реплики.
		while (true)
		{
2978
			zkutil::EventPtr event = std::make_shared<Poco::Event>();
2979

2980
			String log_pointer = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
2981 2982 2983
			if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index)
				break;

2984
			event->wait();
2985
		}
A
Merge  
Alexey Milovidov 已提交
2986
	}
2987
	else if (startsWith(entry.znode_name, "queue-"))
2988 2989 2990 2991 2992
	{
		/** В этом случае номер log-ноды неизвестен. Нужно просмотреть все от log_pointer до конца,
		  *  ища ноду с таким же содержимым. И если мы её не найдём - значит реплика уже взяла эту запись в свою queue.
		  */

2993
		String log_pointer = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer");
M
Merge  
Michael Kolupaev 已提交
2994

2995
		Strings log_entries = getZooKeeper()->getChildren(zookeeper_path + "/log");
2996 2997 2998 2999 3000 3001 3002 3003 3004 3005 3006
		UInt64 log_index = 0;
		bool found = false;

		for (const String & log_entry_name : log_entries)
		{
			log_index = parse<UInt64>(log_entry_name.substr(log_entry_name.size() - 10));

			if (!log_pointer.empty() && log_index < parse<UInt64>(log_pointer))
				continue;

			String log_entry_str;
3007
			bool exists = getZooKeeper()->tryGet(zookeeper_path + "/log/" + log_entry_name, log_entry_str);
3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022
			if (exists && entry_str == log_entry_str)
			{
				found = true;
				log_node_name = log_entry_name;
				break;
			}
		}

		if (found)
		{
			LOG_DEBUG(log, "Waiting for " << replica << " to pull " << log_node_name << " to queue");

			/// Дождемся, пока запись попадет в очередь реплики.
			while (true)
			{
3023
				zkutil::EventPtr event = std::make_shared<Poco::Event>();
3024

3025
				String log_pointer = getZooKeeper()->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
3026 3027 3028
				if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index)
					break;

3029
				event->wait();
3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044
			}
		}
	}
	else
		throw Exception("Logical error: unexpected name of log node: " + entry.znode_name, ErrorCodes::LOGICAL_ERROR);

	if (!log_node_name.empty())
		LOG_DEBUG(log, "Looking for node corresponding to " << log_node_name << " in " << replica << " queue");
	else
		LOG_DEBUG(log, "Looking for corresponding node in " << replica << " queue");

	/** Второе - найдем соответствующую запись в очереди указанной реплики (replica).
	  * Её номер может не совпадать ни с log-узлом, ни с queue-узлом у текущей реплики (у нас).
	  * Поэтому, ищем путём сравнения содержимого.
	  */
M
Merge  
Michael Kolupaev 已提交
3045

3046
	Strings queue_entries = getZooKeeper()->getChildren(zookeeper_path + "/replicas/" + replica + "/queue");
3047
	String queue_entry_to_wait_for;
M
Merge  
Michael Kolupaev 已提交
3048

A
Merge  
Alexey Milovidov 已提交
3049 3050 3051
	for (const String & entry_name : queue_entries)
	{
		String queue_entry_str;
3052
		bool exists = getZooKeeper()->tryGet(zookeeper_path + "/replicas/" + replica + "/queue/" + entry_name, queue_entry_str);
3053
		if (exists && queue_entry_str == entry_str)
M
Merge  
Michael Kolupaev 已提交
3054
		{
3055
			queue_entry_to_wait_for = entry_name;
A
Merge  
Alexey Milovidov 已提交
3056
			break;
M
Merge  
Michael Kolupaev 已提交
3057
		}
A
Merge  
Alexey Milovidov 已提交
3058
	}
M
Merge  
Michael Kolupaev 已提交
3059

A
Merge  
Alexey Milovidov 已提交
3060
	/// Пока искали запись, ее уже выполнили и удалили.
3061 3062
	if (queue_entry_to_wait_for.empty())
	{
A
Alexey Milovidov 已提交
3063
		LOG_DEBUG(log, "No corresponding node found. Assuming it has been already processed." " Found " << queue_entries.size() << " nodes.");
A
Merge  
Alexey Milovidov 已提交
3064
		return;
3065 3066 3067
	}

	LOG_DEBUG(log, "Waiting for " << queue_entry_to_wait_for << " to disappear from " << replica << " queue");
M
Merge  
Michael Kolupaev 已提交
3068

3069
	/// Третье - дождемся, пока запись исчезнет из очереди реплики.
3070
	getZooKeeper()->waitForDisappear(zookeeper_path + "/replicas/" + replica + "/queue/" + queue_entry_to_wait_for);
M
Merge  
Michael Kolupaev 已提交
3071 3072 3073
}


3074
void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
3075
{
A
Merge  
Alexey Milovidov 已提交
3076
	auto zookeeper = tryGetZooKeeper();
A
Merge  
Alexey Milovidov 已提交
3077

3078
	res.is_leader = is_leader_node;
3079
	res.is_readonly = is_readonly;
3080
	res.is_session_expired = !zookeeper || zookeeper->expired();
3081

A
Alexey Milovidov 已提交
3082
	res.queue = queue.getStatus();
3083
	res.parts_to_check = part_check_thread.size();
3084 3085 3086 3087 3088 3089

	res.zookeeper_path = zookeeper_path;
	res.replica_name = replica_name;
	res.replica_path = replica_path;
	res.columns_version = columns_version;

3090
	if (res.is_session_expired || !with_zk_fields)
3091 3092 3093 3094 3095 3096 3097 3098 3099 3100
	{
		res.log_max_index = 0;
		res.log_pointer = 0;
		res.total_replicas = 0;
		res.active_replicas = 0;
	}
	else
	{
		auto log_entries = zookeeper->getChildren(zookeeper_path + "/log");

3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112
		if (log_entries.empty())
		{
			res.log_max_index = 0;
		}
		else
		{
			const String & last_log_entry = *std::max_element(log_entries.begin(), log_entries.end());
			res.log_max_index = parse<UInt64>(last_log_entry.substr(strlen("log-")));
		}

		String log_pointer_str = zookeeper->get(replica_path + "/log_pointer");
		res.log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123

		auto all_replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
		res.total_replicas = all_replicas.size();

		res.active_replicas = 0;
		for (const String & replica : all_replicas)
			if (zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
				++res.active_replicas;
	}
}

3124

3125 3126 3127
void StorageReplicatedMergeTree::getQueue(LogEntriesData & res, String & replica_name_)
{
	replica_name_ = replica_name;
A
Alexey Milovidov 已提交
3128
	queue.getEntries(res);
3129 3130 3131
}


3132
void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, time_t & out_relative_delay)
3133
{
3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150
	assertNotReadonly();

	/** Абсолютная задержка - задержка отставания текущей реплики от реального времени.
	  */

	time_t min_unprocessed_insert_time = 0;
	time_t max_processed_insert_time = 0;
	queue.getInsertTimes(min_unprocessed_insert_time, max_processed_insert_time);

	time_t current_time = time(0);
	out_absolute_delay = 0;
	out_relative_delay = 0;

	if (min_unprocessed_insert_time)
		out_absolute_delay = current_time - min_unprocessed_insert_time;

	/** Относительная задержка - максимальная разница абсолютной задержки от какой-либо другой реплики,
3151
	  *  (если эта реплика отстаёт от какой-либо другой живой реплики, или ноль, иначе).
3152 3153 3154 3155 3156 3157 3158 3159 3160
	  * Вычисляется только если абсолютная задержка достаточно большая.
	  */

	if (out_absolute_delay < static_cast<time_t>(data.settings.min_relative_delay_to_yield_leadership))
		return;

	auto zookeeper = getZooKeeper();

	time_t max_replicas_unprocessed_insert_time = 0;
3161 3162
	bool have_replica_with_nothing_unprocessed = false;

3163 3164 3165 3166 3167 3168 3169
	Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");

	for (const auto & replica : replicas)
	{
		if (replica == replica_name)
			continue;

3170 3171 3172 3173
		/// Пропускаем неживые реплики.
		if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
			continue;

3174 3175 3176 3177 3178
		String value;
		if (!zookeeper->tryGet(zookeeper_path + "/replicas/" + replica + "/min_unprocessed_insert_time", value))
			continue;

		time_t replica_time = value.empty() ? 0 : parse<time_t>(value);
3179 3180 3181

		if (replica_time == 0)
		{
3182 3183 3184 3185 3186 3187 3188 3189
			/** Замечание:
			  * Вывод о том, что реплика не отстаёт, может быть неверен,
			  *  потому что информация о min_unprocessed_insert_time берётся
			  *  только из той части лога, которая перенесена в очередь.
			  * Если у реплики почему-то не работает queueUpdatingThread,
			  *  то и min_unprocessed_insert_time будет неправильным.
			  */

3190 3191 3192 3193
			have_replica_with_nothing_unprocessed = true;
			break;
		}

3194 3195 3196
		if (replica_time > max_replicas_unprocessed_insert_time)
			max_replicas_unprocessed_insert_time = replica_time;
	}
3197

3198 3199 3200
	if (have_replica_with_nothing_unprocessed)
		out_relative_delay = out_absolute_delay;
	else if (max_replicas_unprocessed_insert_time > min_unprocessed_insert_time)
3201
		out_relative_delay = max_replicas_unprocessed_insert_time - min_unprocessed_insert_time;
3202 3203 3204
}


3205
void StorageReplicatedMergeTree::fetchPartition(const Field & partition, const String & from_, const Settings & settings)
3206
{
3207 3208
	String partition_str = MergeTreeData::getMonthName(partition);

3209
	String from = from_;
3210 3211
	if (from.back() == '/')
		from.resize(from.size() - 1);
3212

3213 3214
	LOG_INFO(log, "Will fetch partition " << partition_str << " from shard " << from_);

3215 3216 3217 3218 3219
	/** Проверим, что в директории detached (куда мы будем записывать скаченные куски) ещё нет такой партиции.
	  * Ненадёжно (есть race condition) - такая партиция может появиться чуть позже.
	  */
	Poco::DirectoryIterator dir_end;
	for (Poco::DirectoryIterator dir_it{data.getFullPath() + "detached/"}; dir_it != dir_end; ++dir_it)
3220
		if (startsWith(dir_it.name(), partition_str))
3221
			throw Exception("Detached partition " + partition_str + " already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS);
3222

3223
	zkutil::Strings replicas;
3224
	zkutil::Strings active_replicas;
3225
	String best_replica;
3226

3227 3228
	{
		auto zookeeper = getZooKeeper();
3229

3230 3231
		/// Список реплик шарда-источника.
		replicas = zookeeper->getChildren(from + "/replicas");
3232

3233 3234
		/// Оставим только активные реплики.
		active_replicas.reserve(replicas.size());
3235

3236 3237 3238
		for (const String & replica : replicas)
			if (zookeeper->exists(from + "/replicas/" + replica + "/is_active"))
				active_replicas.push_back(replica);
3239

3240 3241
		if (active_replicas.empty())
			throw Exception("No active replicas for shard " + from, ErrorCodes::NO_ACTIVE_REPLICAS);
3242

3243 3244 3245 3246 3247 3248 3249 3250
		/** Надо выбрать лучшую (наиболее актуальную) реплику.
		* Это реплика с максимальным log_pointer, затем с минимальным размером queue.
		* NOTE Это не совсем лучший критерий. Для скачивания старых партиций это не имеет смысла,
		*  и было бы неплохо уметь выбирать реплику, ближайшую по сети.
		* NOTE Разумеется, здесь есть data race-ы. Можно решить ретраями.
		*/
		Int64 max_log_pointer = -1;
		UInt64 min_queue_size = std::numeric_limits<UInt64>::max();
3251

3252
		for (const String & replica : active_replicas)
3253
		{
3254 3255 3256 3257 3258 3259 3260 3261 3262 3263 3264 3265 3266 3267 3268 3269
			String current_replica_path = from + "/replicas/" + replica;

			String log_pointer_str = zookeeper->get(current_replica_path + "/log_pointer");
			Int64 log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);

			zkutil::Stat stat;
			zookeeper->get(current_replica_path + "/queue", &stat);
			size_t queue_size = stat.numChildren;

			if (log_pointer > max_log_pointer
				|| (log_pointer == max_log_pointer && queue_size < min_queue_size))
			{
				max_log_pointer = log_pointer;
				min_queue_size = queue_size;
				best_replica = replica;
			}
3270 3271 3272 3273 3274 3275 3276 3277 3278
		}
	}

	if (best_replica.empty())
		throw Exception("Logical error: cannot choose best replica.", ErrorCodes::LOGICAL_ERROR);

	LOG_INFO(log, "Found " << replicas.size() << " replicas, " << active_replicas.size() << " of them are active."
		<< " Selected " << best_replica << " to fetch from.");

3279 3280
	String best_replica_path = from + "/replicas/" + best_replica;

3281
	/// Выясним, какие куски есть на лучшей реплике.
3282

3283 3284 3285 3286 3287 3288 3289 3290 3291 3292 3293 3294 3295 3296 3297
	/** Пытаемся скачать эти куски.
	  * Часть из них могла удалиться из-за мерджа.
	  * В этом случае, обновляем информацию о доступных кусках и пробуем снова.
	  */

	unsigned try_no = 0;
	Strings missing_parts;
	do
	{
		if (try_no)
			LOG_INFO(log, "Some of parts (" << missing_parts.size() << ") are missing. Will try to fetch covering parts.");

		if (try_no >= 5)
			throw Exception("Too much retries to fetch parts from " + best_replica_path, ErrorCodes::TOO_MUCH_RETRIES_TO_FETCH_PARTS);

3298
		Strings parts = getZooKeeper()->getChildren(best_replica_path + "/parts");
3299 3300 3301 3302 3303 3304 3305 3306 3307 3308
		ActiveDataPartSet active_parts_set(parts);
		Strings parts_to_fetch;

		if (missing_parts.empty())
		{
			parts_to_fetch = active_parts_set.getParts();

			/// Оставляем только куски нужной партиции.
			Strings parts_to_fetch_partition;
			for (const String & part : parts_to_fetch)
3309
				if (startsWith(part, partition_str))
3310 3311 3312
					parts_to_fetch_partition.push_back(part);

			parts_to_fetch = std::move(parts_to_fetch_partition);
3313 3314 3315

			if (parts_to_fetch.empty())
				throw Exception("Partition " + partition_str + " on " + best_replica_path + " doesn't exist", ErrorCodes::PARTITION_DOESNT_EXIST);
3316 3317 3318 3319 3320 3321 3322 3323 3324 3325 3326 3327 3328 3329 3330 3331 3332 3333 3334 3335
		}
		else
		{
			for (const String & missing_part : missing_parts)
			{
				String containing_part = active_parts_set.getContainingPart(missing_part);
				if (!containing_part.empty())
					parts_to_fetch.push_back(containing_part);
				else
					LOG_WARNING(log, "Part " << missing_part << " on replica " << best_replica_path << " has been vanished.");
			}
		}

		LOG_INFO(log, "Parts to fetch: " << parts_to_fetch.size());

		missing_parts.clear();
		for (const String & part : parts_to_fetch)
		{
			try
			{
3336
				fetchPart(part, best_replica_path, true, 0);
3337 3338 3339 3340 3341 3342 3343 3344 3345 3346
			}
			catch (const DB::Exception & e)
			{
				if (e.code() != ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER)
					throw;

				LOG_INFO(log, e.displayText());
				missing_parts.push_back(part);
			}
		}
3347

3348 3349
		++try_no;
	} while (!missing_parts.empty());
3350 3351 3352
}


3353
void StorageReplicatedMergeTree::freezePartition(const Field & partition, const String & with_name, const Settings & settings)
3354 3355 3356 3357 3358 3359
{
	/// Префикс может быть произвольным. Не обязательно месяц - можно указать лишь год.
	String prefix = partition.getType() == Field::Types::UInt64
		? toString(partition.get<UInt64>())
		: partition.safeGet<String>();

3360
	data.freezePartition(prefix, with_name);
3361
	if (unreplicated_data)
3362
		unreplicated_data->freezePartition(prefix, with_name);
3363 3364
}

3365

A
Merge  
Alexey Arno 已提交
3366 3367 3368
void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String & database_name,
	const Field & first_partition, const Field & last_partition,
	const WeightedZooKeeperPaths & weighted_zookeeper_paths,
A
Merge  
Alexey Arno 已提交
3369
	const ASTPtr & sharding_key_expr, bool do_copy, const Field & coordinator,
A
Merge  
Alexey Milovidov 已提交
3370 3371 3372 3373
	const Settings & settings)
{
	auto & resharding_worker = context.getReshardingWorker();
	if (!resharding_worker.isStarted())
A
Merge  
Alexey Arno 已提交
3374
		throw Exception{"Resharding background thread is not running", ErrorCodes::RESHARDING_NO_WORKER};
A
Merge  
Alexey Milovidov 已提交
3375

3376
	bool has_coordinator = !coordinator.isNull();
A
Merge  
Alexey Arno 已提交
3377 3378 3379 3380 3381 3382 3383 3384 3385 3386 3387 3388 3389 3390 3391
	std::string coordinator_id;
	UInt64 block_number = 0;

	/// List of local partitions that need to be resharded.
	ReshardingWorker::PartitionList partition_list;

	/// The aforementioned list comprises:
	/// - first, the list of partitions that are to be resharded on more than one
	/// shard. Given any such partition, a job runs on each shard under the supervision
	/// of a coordinator;
	/// - second, the list of partitions that are to be resharded only on this shard.
	/// The iterator below indicates the beginning of the list of these so-called
	/// uncoordinated partitions.
	ReshardingWorker::PartitionList::const_iterator uncoordinated_begin;

A
Merge  
Alexey Arno 已提交
3392 3393 3394 3395 3396 3397 3398 3399 3400 3401 3402 3403 3404 3405 3406 3407 3408
	std::string dumped_coordinator_state;

	auto handle_exception = [&](const std::string & msg = "")
	{
		try
		{
			/// Before jobs are submitted, errors and cancellations are both
			/// considered as errors.
			resharding_worker.setStatus(coordinator_id, ReshardingWorker::STATUS_ERROR, msg);
			dumped_coordinator_state = resharding_worker.dumpCoordinatorState(coordinator_id);
		}
		catch (...)
		{
			tryLogCurrentException(__PRETTY_FUNCTION__);
		}
	};

A
Merge  
Alexey Arno 已提交
3409
	try
A
Merge  
Alexey Arno 已提交
3410
	{
3411 3412 3413 3414 3415
		zkutil::RWLock deletion_lock;

		if (has_coordinator)
		{
			coordinator_id = coordinator.get<const String &>();
A
Alexey Arno 已提交
3416
			deletion_lock = resharding_worker.createDeletionLock(coordinator_id);
3417 3418 3419 3420
		}

		zkutil::RWLock::Guard<zkutil::RWLock::Read, zkutil::RWLock::NonBlocking> guard{deletion_lock};
		if (!deletion_lock.ownsLock())
A
Merge  
Alexey Arno 已提交
3421
			throw Exception{"Coordinator has been deleted", ErrorCodes::RESHARDING_COORDINATOR_DELETED};
3422 3423 3424 3425

		if (has_coordinator)
			block_number = resharding_worker.subscribe(coordinator_id, queryToString(query));

3426
		NameAndTypePair column_desc = ITableDeclaration::getColumn(sharding_key_expr->getColumnName());
3427
		if (column_desc.type->isNullable())
3428 3429
			throw Exception{"Sharding key must not be nullable", ErrorCodes::RESHARDING_NULLABLE_SHARDING_KEY};

A
Merge  
Alexey Arno 已提交
3430
		for (const auto & weighted_path : weighted_zookeeper_paths)
A
Merge  
Alexey Arno 已提交
3431 3432 3433
		{
			UInt64 weight = weighted_path.second;
			if (weight == 0)
A
Merge  
Alexey Arno 已提交
3434
				throw Exception{"Shard has invalid weight", ErrorCodes::INVALID_SHARD_WEIGHT};
A
Merge  
Alexey Arno 已提交
3435
		}
A
Merge  
Alexey Arno 已提交
3436

A
Merge  
Alexey Arno 已提交
3437 3438 3439 3440 3441 3442 3443
		{
			std::vector<std::string> all_paths;
			all_paths.reserve(weighted_zookeeper_paths.size());
			for (const auto & weighted_path : weighted_zookeeper_paths)
				all_paths.push_back(weighted_path.first);
			std::sort(all_paths.begin(), all_paths.end());
			if (std::adjacent_find(all_paths.begin(), all_paths.end()) != all_paths.end())
A
Merge  
Alexey Arno 已提交
3444
				throw Exception{"Shard paths must be distinct", ErrorCodes::DUPLICATE_SHARD_PATHS};
A
Merge  
Alexey Arno 已提交
3445
		}
A
Merge  
Alexey Milovidov 已提交
3446

A
Merge  
Alexey Arno 已提交
3447 3448
		DayNum_t first_partition_num = !first_partition.isNull() ? MergeTreeData::getMonthDayNum(first_partition) : DayNum_t();
		DayNum_t last_partition_num = !last_partition.isNull() ? MergeTreeData::getMonthDayNum(last_partition) : DayNum_t();
A
Merge  
Alexey Milovidov 已提交
3449

A
Merge  
Alexey Arno 已提交
3450 3451 3452
		if (first_partition_num && last_partition_num)
		{
			if (first_partition_num > last_partition_num)
A
Merge  
Alexey Arno 已提交
3453
				throw Exception{"Invalid interval of partitions", ErrorCodes::INVALID_PARTITIONS_INTERVAL};
A
Merge  
Alexey Arno 已提交
3454
		}
A
Merge  
Alexey Milovidov 已提交
3455

A
Merge  
Alexey Arno 已提交
3456
		if (!first_partition_num && last_partition_num)
A
Merge  
Alexey Arno 已提交
3457
			throw Exception{"Received invalid parameters for resharding", ErrorCodes::RESHARDING_INVALID_PARAMETERS};
A
Merge  
Alexey Milovidov 已提交
3458

A
Merge  
Alexey Arno 已提交
3459
		bool include_all = !first_partition_num;
A
Merge  
Alexey Milovidov 已提交
3460

A
Merge  
Alexey Arno 已提交
3461 3462 3463 3464 3465 3466 3467 3468 3469 3470 3471 3472 3473 3474 3475
		/// Составить список локальных партиций, которые надо перешардировать.
		std::set<std::string> unique_partition_list;
		const MergeTreeData::DataParts & data_parts = data.getDataParts();
		for (MergeTreeData::DataParts::iterator it = data_parts.cbegin(); it != data_parts.cend(); ++it)
		{
			const MergeTreeData::DataPartPtr & current_part = *it;
			DayNum_t month = current_part->month;
			if (include_all || ((month >= first_partition_num) && (month <= last_partition_num)))
				unique_partition_list.insert(MergeTreeData::getMonthName(month));
		}

		partition_list.assign(unique_partition_list.begin(), unique_partition_list.end());

		if (partition_list.empty())
		{
3476
			if (!has_coordinator)
A
Merge  
Alexey Arno 已提交
3477
				throw Exception{"No existing partition found", ErrorCodes::PARTITION_DOESNT_EXIST};
A
Merge  
Alexey Arno 已提交
3478 3479 3480 3481 3482 3483 3484 3485 3486 3487 3488 3489
		}
		else
		{
			/// Убедиться, что структуры локальной и реплицируемых таблиц совпадают.
			enforceShardsConsistency(weighted_zookeeper_paths);

			/// Проверить, что для всех задач имеется достаточно свободного места локально и на всех репликах.
			auto replica_to_space_info = gatherReplicaSpaceInfo(weighted_zookeeper_paths);
			for (const auto & partition : partition_list)
			{
				size_t partition_size = data.getPartitionSize(partition);
				if (!checkSpaceForResharding(replica_to_space_info, partition_size))
A
Merge  
Alexey Arno 已提交
3490 3491
					throw Exception{"Insufficient space available for resharding operation "
						"on partition " + partition, ErrorCodes::INSUFFICIENT_SPACE_FOR_RESHARDING};
A
Merge  
Alexey Arno 已提交
3492 3493 3494
			}
		}

3495
		if (has_coordinator)
A
Merge  
Alexey Arno 已提交
3496 3497 3498 3499 3500
		{
			size_t old_node_count = resharding_worker.getNodeCount(coordinator_id);
			resharding_worker.addPartitions(coordinator_id, partition_list);
			resharding_worker.waitForCheckCompletion(coordinator_id);

3501 3502
			/// At this point, all the performers know exactly the number of partitions
			/// that are to be processed.
A
Merge  
Alexey Arno 已提交
3503 3504 3505

			auto count = resharding_worker.getPartitionCount(coordinator_id);
			if (count == 0)
A
Merge  
Alexey Arno 已提交
3506
				throw Exception{"No existing partition found", ErrorCodes::PARTITION_DOESNT_EXIST};
A
Merge  
Alexey Arno 已提交
3507 3508 3509 3510 3511 3512 3513 3514 3515

			if (partition_list.empty())
			{
				/// We have no partitions, so we opt out.
				resharding_worker.unsubscribe(coordinator_id);
			}

			resharding_worker.waitForOptOutCompletion(coordinator_id, old_node_count);

3516 3517
			/// At this point, all the performers that actually have some partitions
			/// are in a coherent state.
A
Merge  
Alexey Arno 已提交
3518 3519 3520 3521 3522 3523 3524 3525

			if (partition_list.empty())
				return;

			if (resharding_worker.getNodeCount(coordinator_id) == 1)
			{
				/// Degenerate case: we are the only participating node.
				/// All our jobs are uncoordinated.
A
Alexey Arno 已提交
3526
				deletion_lock.release();
A
Merge  
Alexey Arno 已提交
3527 3528 3529 3530 3531 3532 3533 3534 3535
				resharding_worker.deleteCoordinator(coordinator_id);
				uncoordinated_begin = partition_list.cbegin();
			}
			else
			{
				/// Split the list of partitions into a list of coordinated jobs
				/// and a list of uncoordinated jobs.
				uncoordinated_begin = resharding_worker.categorizePartitions(coordinator_id, partition_list);
			}
A
Merge  
Alexey Milovidov 已提交
3536

A
Merge  
Alexey Arno 已提交
3537 3538 3539
			if (uncoordinated_begin == partition_list.cbegin())
			{
				coordinator_id.clear();
3540
				has_coordinator = false;
A
Merge  
Alexey Arno 已提交
3541 3542 3543 3544 3545 3546 3547
			}
		}
		else
		{
			/// All our jobs are uncoordinated.
			uncoordinated_begin = partition_list.cbegin();
		}
A
Merge  
Alexey Milovidov 已提交
3548

A
Merge  
Alexey Arno 已提交
3549 3550 3551 3552 3553 3554 3555 3556 3557 3558 3559
		/// First, submit coordinated background resharding jobs.
		for (auto it = partition_list.cbegin(); it != uncoordinated_begin; ++it)
		{
			ReshardingJob job;
			job.database_name = database_name;
			job.table_name = getTableName();
			job.partition = *it;
			job.paths = weighted_zookeeper_paths;
			job.sharding_key_expr = sharding_key_expr;
			job.coordinator_id = coordinator_id;
			job.block_number = block_number;
A
Merge  
Alexey Arno 已提交
3560
			job.do_copy = do_copy;
A
Merge  
Alexey Milovidov 已提交
3561

A
Merge  
Alexey Arno 已提交
3562 3563 3564 3565 3566 3567 3568 3569 3570 3571 3572 3573
			resharding_worker.submitJob(job);
		}

		/// Then, submit uncoordinated background resharding jobs.
		for (auto it = uncoordinated_begin; it != partition_list.cend(); ++it)
		{
			ReshardingJob job;
			job.database_name = database_name;
			job.table_name = getTableName();
			job.partition = *it;
			job.paths = weighted_zookeeper_paths;
			job.sharding_key_expr = sharding_key_expr;
A
Merge  
Alexey Arno 已提交
3574
			job.do_copy = do_copy;
A
Merge  
Alexey Arno 已提交
3575 3576 3577 3578 3579

			resharding_worker.submitJob(job);
		}
	}
	catch (const Exception & ex)
A
Merge  
Alexey Milovidov 已提交
3580
	{
3581
		if (has_coordinator)
A
Merge  
Alexey Arno 已提交
3582 3583 3584 3585 3586 3587
		{
			if ((ex.code() == ErrorCodes::RESHARDING_NO_SUCH_COORDINATOR) ||
				(ex.code() == ErrorCodes::RESHARDING_NO_COORDINATOR_MEMBERSHIP) ||
				(ex.code() == ErrorCodes::RESHARDING_ALREADY_SUBSCRIBED) ||
				(ex.code() == ErrorCodes::RESHARDING_INVALID_QUERY))
			{
3588 3589 3590 3591 3592 3593
				/// Any of these errors occurs only when a user attempts to send
				/// manually a query ALTER TABLE ... RESHARD ... that specifies
				/// the parameter COORDINATE WITH, in spite of the fact that no user
				/// should ever use this parameter. Since taking into account such
				/// errors may botch an ongoing distributed resharding job, we
				/// intentionally ignore them.
A
Merge  
Alexey Arno 已提交
3594
			}
3595
			else if ((ex.code() == ErrorCodes::RWLOCK_NO_SUCH_LOCK) ||
A
Merge  
Alexey Arno 已提交
3596
				(ex.code() == ErrorCodes::NO_SUCH_BARRIER) ||
3597
				(ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED))
3598
			{
3599 3600
				/// For any reason the coordinator has disappeared. So obviously
				/// we don't have any means to notify other nodes of an error.
A
Merge  
Alexey Arno 已提交
3601
			}
3602 3603 3604 3605
			else if (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED)
			{
				/// nothing here
			}
A
Merge  
Alexey Arno 已提交
3606
			else
3607
			{
A
Merge  
Alexey Arno 已提交
3608
				handle_exception(ex.message());
3609 3610
				LOG_ERROR(log, dumped_coordinator_state);
			}
A
Merge  
Alexey Arno 已提交
3611 3612 3613
		}

		throw;
A
Merge  
Alexey Milovidov 已提交
3614
	}
A
Merge  
Alexey Arno 已提交
3615 3616 3617 3618 3619 3620 3621 3622 3623
	catch (const std::exception & ex)
	{
		if (has_coordinator)
		{
			handle_exception(ex.what());
			LOG_ERROR(log, dumped_coordinator_state);
		}
		throw;
	}
A
Merge  
Alexey Arno 已提交
3624 3625
	catch (...)
	{
3626
		if (has_coordinator)
A
Merge  
Alexey Arno 已提交
3627
		{
A
Merge  
Alexey Arno 已提交
3628 3629
			handle_exception();
			LOG_ERROR(log, dumped_coordinator_state);
A
Merge  
Alexey Arno 已提交
3630 3631 3632
		}
		throw;
	}
A
Merge  
Alexey Milovidov 已提交
3633 3634 3635 3636 3637 3638 3639 3640 3641 3642 3643 3644 3645
}

void StorageReplicatedMergeTree::enforceShardsConsistency(const WeightedZooKeeperPaths & weighted_zookeeper_paths)
{
	const auto & columns = getColumnsList();

	auto zookeeper = getZooKeeper();

	for (const auto & weighted_path : weighted_zookeeper_paths)
	{
		auto columns_str = zookeeper->get(weighted_path.first + "/columns");
		auto columns_desc = ColumnsDescription<true>::parse(columns_str);

3646
		if (!std::equal(columns.begin(), columns.end(), columns_desc.columns.begin()))
A
Merge  
Alexey Arno 已提交
3647
			throw Exception{"Table is inconsistent accross shards", ErrorCodes::INCONSISTENT_TABLE_ACCROSS_SHARDS};
A
Merge  
Alexey Milovidov 已提交
3648 3649 3650 3651 3652 3653 3654 3655 3656 3657 3658 3659 3660 3661
	}
}

StorageReplicatedMergeTree::ReplicaToSpaceInfo
StorageReplicatedMergeTree::gatherReplicaSpaceInfo(const WeightedZooKeeperPaths & weighted_zookeeper_paths)
{
	struct TaskInfo
	{
		TaskInfo(const std::string & replica_path_,
			const ReplicatedMergeTreeAddress & address_)
			: replica_path(replica_path_), address(address_)
		{
		}

3662 3663 3664 3665 3666 3667
		TaskInfo(const TaskInfo &) = delete;
		TaskInfo & operator=(const TaskInfo &) = delete;

		TaskInfo(TaskInfo &&) = default;
		TaskInfo & operator=(TaskInfo &&) = default;

A
Merge  
Alexey Milovidov 已提交
3668 3669 3670 3671 3672 3673 3674 3675 3676 3677 3678 3679 3680 3681 3682 3683 3684 3685 3686 3687 3688 3689 3690
		std::string replica_path;
		ReplicatedMergeTreeAddress address;
	};

	using TaskInfoList = std::vector<TaskInfo>;
	TaskInfoList task_info_list;

	ReplicaToSpaceInfo replica_to_space_info;

	/// Теперь проверяем наличие свободного места на удаленных репликах.
	UInt64 total_weight = 0;
	for (const auto & weighted_path : weighted_zookeeper_paths)
	{
		UInt64 weight = weighted_path.second;
		total_weight += weight;
	}

	auto & local_space_info = replica_to_space_info[replica_path];
	local_space_info.factor = 1.1;
	local_space_info.available_size = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);

	for (const auto & weighted_path : weighted_zookeeper_paths)
	{
3691 3692
		auto zookeeper = getZooKeeper();

A
Merge  
Alexey Milovidov 已提交
3693 3694 3695 3696 3697 3698 3699 3700 3701 3702 3703 3704 3705 3706 3707 3708 3709 3710 3711 3712 3713
		const auto & path = weighted_path.first;
		UInt64 weight = weighted_path.second;

		long double factor = (weight / static_cast<long double>(total_weight)) * 1.1;

		auto children = zookeeper->getChildren(path + "/replicas");
		for (const auto & child : children)
		{
			const std::string child_replica_path = path + "/replicas/" + child;
			if (child_replica_path != replica_path)
			{
				replica_to_space_info[child_replica_path].factor = factor;

				auto host = zookeeper->get(child_replica_path + "/host");
				ReplicatedMergeTreeAddress host_desc(host);

				task_info_list.emplace_back(child_replica_path, host_desc);
			}
		}
	}

3714
	ThreadPool pool(task_info_list.size());
A
Merge  
Alexey Milovidov 已提交
3715 3716 3717 3718 3719 3720 3721 3722 3723 3724 3725 3726

	using Tasks = std::vector<std::packaged_task<size_t()> >;
	Tasks tasks(task_info_list.size());

	try
	{
		for (size_t i = 0; i < task_info_list.size(); ++i)
		{
			const auto & entry = task_info_list[i];
			const auto & replica_path = entry.replica_path;
			const auto & address = entry.address;

A
Merge  
Alexey Arno 已提交
3727
			InterserverIOEndpointLocation location{replica_path, address.host, address.replication_port};
A
Merge  
Alexey Milovidov 已提交
3728

A
Merge  
Alexey Arno 已提交
3729 3730
			tasks[i] = Tasks::value_type{std::bind(&RemoteDiskSpaceMonitor::Client::getFreeSpace,
				&disk_space_monitor_client, location)};
A
Merge  
Alexey Milovidov 已提交
3731 3732 3733 3734 3735 3736 3737 3738 3739 3740 3741 3742 3743 3744 3745 3746 3747 3748 3749 3750 3751 3752 3753 3754 3755 3756 3757 3758 3759 3760 3761 3762 3763
			pool.schedule([i, &tasks]{ tasks[i](); });
		}
	}
	catch (...)
	{
		pool.wait();
		throw;
	}

	pool.wait();

	for (size_t i = 0; i < task_info_list.size(); ++i)
	{
		size_t remote_available_size = tasks[i].get_future().get();
		const auto & remote_replica_path = task_info_list[i].replica_path;
		replica_to_space_info.at(remote_replica_path).available_size = remote_available_size;
	}

	return replica_to_space_info;
}

bool StorageReplicatedMergeTree::checkSpaceForResharding(const ReplicaToSpaceInfo & replica_to_space_info,
	size_t partition_size) const
{
	/// Безопасное умножение.
	auto scale_size = [](size_t size, long double factor)
	{
		feclearexcept(FE_OVERFLOW);
		feclearexcept(FE_UNDERFLOW);

		long double result = static_cast<long double>(size) * factor;

		if ((fetestexcept(FE_OVERFLOW) != 0) || (fetestexcept(FE_UNDERFLOW) != 0))
A
Merge  
Alexey Arno 已提交
3764
			throw Exception{"StorageReplicatedMergeTree: floating point exception triggered", ErrorCodes::LOGICAL_ERROR};
A
Merge  
Alexey Milovidov 已提交
3765
		if (result > static_cast<long double>(std::numeric_limits<size_t>::max()))
A
Merge  
Alexey Arno 已提交
3766
			throw Exception{"StorageReplicatedMergeTree: integer overflow", ErrorCodes::LOGICAL_ERROR};
A
Merge  
Alexey Milovidov 已提交
3767 3768 3769 3770 3771 3772 3773 3774 3775 3776 3777 3778 3779 3780

		return static_cast<size_t>(result);
	};

	for (const auto & entry : replica_to_space_info)
	{
		const auto & info = entry.second;
		size_t required_size = scale_size(partition_size, info.factor);
		if (info.available_size < required_size)
			return false;
	}

	return true;
}
3781

M
Merge  
Michael Kolupaev 已提交
3782
}