StorageReplicatedMergeTree.cpp 11.6 KB
Newer Older
M
Merge  
Michael Kolupaev 已提交
1
#include <DB/Storages/StorageReplicatedMergeTree.h>
M
Merge  
Michael Kolupaev 已提交
2
#include <DB/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
M
Merge  
Michael Kolupaev 已提交
3
#include <DB/Storages/MergeTree/ReplicatedMergeTreePartsExchange.h>
M
Merge  
Michael Kolupaev 已提交
4 5 6
#include <DB/Parsers/formatAST.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/IO/ReadBufferFromString.h>
M
Merge  
Michael Kolupaev 已提交
7 8 9 10 11 12 13

namespace DB
{

/** Constructs a replica of a replicated MergeTree table.
  * zookeeper_path_ - ZooKeeper node shared by all replicas of the table (trailing '/' characters are stripped).
  * replica_name_   - name of this replica; its node is <zookeeper_path>/replicas/<replica_name>.
  * attach          - false: create the shared table nodes if they do not exist yet, require that no replica
  *                   has any parts, check the stored structure and register this replica;
  *                   true: check the stored structure and compare local parts with those registered in ZooKeeper.
  * In both cases the replica is then marked active.
  * NOTE(review): exists() followed by createTable() is not atomic; two replicas created concurrently
  * for a brand-new table may both try to create it and one of them will get an exception.
  */
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
	const String & zookeeper_path_,
	const String & replica_name_,
	bool attach,
	const String & path_, const String & name_, NamesAndTypesListPtr columns_,
	Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_,
	const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	MergeTreeData::Mode mode_,
	const String & sign_column_,
	const MergeTreeSettings & settings_)
	:
	context(context_), zookeeper(context.getZooKeeper()),
	path(path_), name(name_), full_path(path + escapeForFileName(name) + '/'), zookeeper_path(zookeeper_path_),
	replica_name(replica_name_),
	data(	full_path, columns_, context_, primary_expr_ast_, date_column_name_, sampling_expression_,
			index_granularity_, mode_, sign_column_, settings_),
	reader(data), writer(data),
	log(&Logger::get("StorageReplicatedMergeTree")),
	shutdown_called(false)
{
	/// Strip every trailing '/', not just one: child paths are built below by appending "/replicas/...",
	/// and a leftover slash would produce an empty path component, which ZooKeeper rejects.
	while (!zookeeper_path.empty() && *zookeeper_path.rbegin() == '/')
		zookeeper_path.erase(zookeeper_path.end() - 1);
	replica_path = zookeeper_path + "/replicas/" + replica_name;

	if (!attach)
	{
		/// New replica: create the table in ZooKeeper if needed; only an empty table may get a new replica.
		if (!zookeeper.exists(zookeeper_path))
			createTable();

		if (!isTableEmpty())
			throw Exception("Can't add new replica to non-empty table", ErrorCodes::ADDING_REPLICA_TO_NON_EMPTY_TABLE);

		checkTableStructure();
		createReplica();
	}
	else
	{
		/// Existing replica: verify the structure and reconcile local parts with ZooKeeper.
		checkTableStructure();
		checkParts();
	}

	activateReplica();
}

/** Factory: constructs the storage (see the constructor for the meaning of the arguments),
  * obtains its StoragePtr via thisPtr() and registers an interserver IO endpoint named
  * "ReplicatedMergeTree:<replica_path>" backed by a ReplicatedMergeTreePartsServer
  * (presumably serving this replica's data parts to other replicas - confirm).
  */
StoragePtr StorageReplicatedMergeTree::create(
	const String & zookeeper_path_,
	const String & replica_name_,
	bool attach,
	const String & path_, const String & name_, NamesAndTypesListPtr columns_,
	Context & context_,
	ASTPtr & primary_expr_ast_,
	const String & date_column_name_,
	const ASTPtr & sampling_expression_,
	size_t index_granularity_,
	MergeTreeData::Mode mode_,
	const String & sign_column_,
	const MergeTreeSettings & settings_)
{
	StorageReplicatedMergeTree * storage = new StorageReplicatedMergeTree(
		zookeeper_path_, replica_name_, attach,
		path_, name_, columns_, context_,
		primary_expr_ast_, date_column_name_, sampling_expression_,
		index_granularity_, mode_, sign_column_, settings_);
	StoragePtr storage_ptr = storage->thisPtr();

	const String endpoint_name = "ReplicatedMergeTree:" + storage->replica_path;
	InterserverIOEndpointPtr parts_server = new ReplicatedMergeTreePartsServer(storage->data, storage_ptr);
	storage->endpoint_holder = new InterserverIOEndpointHolder(
		endpoint_name, parts_server, storage->context.getInterserverIOHandler());

	return storage_ptr;
}

M
Merge  
Michael Kolupaev 已提交
82 83 84 85 86 87 88 89
/// Renders an AST to text via formatAST (indent 0, no highlighting, one line);
/// a null AST renders as the empty string.
static String formattedAST(const ASTPtr & ast)
{
	String result;
	if (ast)
	{
		std::ostringstream out;
		formatAST(*ast, out, 0, false, true);
		result = out.str();
	}
	return result;
}
M
Merge  
Michael Kolupaev 已提交
90

M
Merge  
Michael Kolupaev 已提交
91
void StorageReplicatedMergeTree::createTable()
M
Merge  
Michael Kolupaev 已提交
92
{
M
Merge  
Michael Kolupaev 已提交
93
	zookeeper.create(zookeeper_path, "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
94

M
Merge  
Michael Kolupaev 已提交
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
	/// Запишем метаданные таблицы, чтобы реплики могли сверять с ними свою локальную структуру таблицы.
	std::stringstream metadata;
	metadata << "metadata format version: 1" << std::endl;
	metadata << "date column: " << data.date_column_name << std::endl;
	metadata << "sampling expression: " << formattedAST(data.sampling_expression) << std::endl;
	metadata << "index granularity: " << data.index_granularity << std::endl;
	metadata << "mode: " << static_cast<int>(data.mode) << std::endl;
	metadata << "sign column: " << data.sign_column << std::endl;
	metadata << "primary key: " << formattedAST(data.primary_expr_ast) << std::endl;
	metadata << "columns:" << std::endl;
	WriteBufferFromOStream buf(metadata);
	for (auto & it : data.getColumnsList())
	{
		writeBackQuotedString(it.first, buf);
		writeChar(' ', buf);
		writeString(it.second->getName(), buf);
		writeChar('\n', buf);
	}
	buf.next();
M
Merge  
Michael Kolupaev 已提交
114

M
Merge  
Michael Kolupaev 已提交
115
	zookeeper.create(zookeeper_path + "/metadata", metadata.str(), zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
116

M
Merge  
Michael Kolupaev 已提交
117 118 119 120
	/// Создадим нужные "директории".
	zookeeper.create(zookeeper_path + "/replicas", "", zkutil::CreateMode::Persistent);
	zookeeper.create(zookeeper_path + "/blocks", "", zkutil::CreateMode::Persistent);
	zookeeper.create(zookeeper_path + "/block-numbers", "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
121
	zookeeper.create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent);
M
Merge  
Michael Kolupaev 已提交
122
}
M
Merge  
Michael Kolupaev 已提交
123

M
Merge  
Michael Kolupaev 已提交
124 125
/** Проверить, что список столбцов и настройки таблицы совпадают с указанными в ZK (/metadata).
	* Если нет - бросить исключение.
M
Merge  
Michael Kolupaev 已提交
126
	*/
M
Merge  
Michael Kolupaev 已提交
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
void StorageReplicatedMergeTree::checkTableStructure()
{
	String metadata_str = zookeeper.get(zookeeper_path + "/metadata");
	ReadBufferFromString buf(metadata_str);
	assertString("metadata format version: 1", buf);
	assertString("\ndate column: ", buf);
	assertString(data.date_column_name, buf);
	assertString("\nsampling expression: ", buf);
	assertString(formattedAST(data.sampling_expression), buf);
	assertString("\nindex granularity: ", buf);
	assertString(toString(data.index_granularity), buf);
	assertString("\nmode: ", buf);
	assertString(toString(static_cast<int>(data.mode)), buf);
	assertString("\nsign column: ", buf);
	assertString(data.sign_column, buf);
	assertString("\nprimary key: ", buf);
	assertString(formattedAST(data.primary_expr_ast), buf);
	assertString("\ncolumns:\n", buf);
	for (auto & it : data.getColumnsList())
	{
		String name;
		readBackQuotedString(name, buf);
		if (name != it.first)
			throw Exception("Unexpected column name in ZooKeeper: expected " + it.first + ", found " + name,
				ErrorCodes::UNKNOWN_IDENTIFIER);
		assertString(" ", buf);
		assertString(it.second->getName(), buf);
		assertString("\n", buf);
	}
M
Merge  
Michael Kolupaev 已提交
156
	assertEOF(buf);
M
Merge  
Michael Kolupaev 已提交
157
}
M
Merge  
Michael Kolupaev 已提交
158

M
Merge  
Michael Kolupaev 已提交
159 160 161 162 163 164 165 166 167
void StorageReplicatedMergeTree::createReplica()
{
	zookeeper.create(replica_path, "", zkutil::CreateMode::Persistent);
	zookeeper.create(replica_path + "/host", "", zkutil::CreateMode::Persistent);
	zookeeper.create(replica_path + "/log", "", zkutil::CreateMode::Persistent);
	zookeeper.create(replica_path + "/log_pointers", "", zkutil::CreateMode::Persistent);
	zookeeper.create(replica_path + "/queue", "", zkutil::CreateMode::Persistent);
	zookeeper.create(replica_path + "/parts", "", zkutil::CreateMode::Persistent);
}
M
Merge  
Michael Kolupaev 已提交
168

M
Merge  
Michael Kolupaev 已提交
169 170 171 172 173
void StorageReplicatedMergeTree::activateReplica()
{
	std::stringstream host;
	host << "host: " << context.getInterserverIOHost() << std::endl;
	host << "port: " << context.getInterserverIOPort() << std::endl;
M
Merge  
Michael Kolupaev 已提交
174

M
Merge  
Michael Kolupaev 已提交
175 176 177 178 179
	/// Одновременно объявим, что эта реплика активна и обновим хост.
	zkutil::Ops ops;
	ops.push_back(new zkutil::Op::Create(replica_path + "/is_active", "", zookeeper.getDefaultACL(), zkutil::CreateMode::Ephemeral));
	ops.push_back(new zkutil::Op::SetData(replica_path + "/host", host.str(), -1));
	zookeeper.multi(ops);
M
Merge  
Michael Kolupaev 已提交
180

M
Merge  
Michael Kolupaev 已提交
181 182
	replica_is_active_node = zkutil::EphemeralNodeHolder::existing(replica_path + "/is_active", zookeeper);
}
M
Merge  
Michael Kolupaev 已提交
183

M
Merge  
Michael Kolupaev 已提交
184 185 186 187 188 189 190 191 192 193
bool StorageReplicatedMergeTree::isTableEmpty()
{
	Strings replicas = zookeeper.getChildren(zookeeper_path + "/replicas");
	for (const auto & replica : replicas)
	{
		if (!zookeeper.getChildren(zookeeper_path + "/replicas/" + replica + "/parts").empty())
			return false;
	}
	return true;
}
M
Merge  
Michael Kolupaev 已提交
194

M
Merge  
Michael Kolupaev 已提交
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
void StorageReplicatedMergeTree::checkParts()
{
	Strings expected_parts_vec = zookeeper.getChildren(replica_path + "/parts");
	NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end());

	MergeTreeData::DataParts parts = data.getDataParts();

	MergeTreeData::DataPartsVector unexpected_parts;
	for (const auto & part : parts)
	{
		if (expected_parts.count(part->name))
		{
			expected_parts.erase(part->name);
		}
		else
		{
			unexpected_parts.push_back(part);
		}
	}

	if (!expected_parts.empty())
		throw Exception("Not found " + toString(expected_parts.size())
M
Merge  
Michael Kolupaev 已提交
217
			+ " parts (including " + *expected_parts.begin() + ") in table " + data.getTableName(),
M
Merge  
Michael Kolupaev 已提交
218 219 220 221 222 223 224 225 226 227
			ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);

	if (unexpected_parts.size() > 1)
		throw Exception("More than one unexpected part (including " + unexpected_parts[0]->name
			+ ") in table " + data.getTableName(),
			ErrorCodes::TOO_MANY_UNEXPECTED_DATA_PARTS);

	for (MergeTreeData::DataPartPtr part : unexpected_parts)
	{
		LOG_ERROR(log, "Unexpected part " << part->name << ". Renaming it to ignored_" + part->name);
M
Merge  
Michael Kolupaev 已提交
228
		data.renameAndDetachPart(part, "ignored_");
M
Merge  
Michael Kolupaev 已提交
229 230
	}
}
M
Merge  
Michael Kolupaev 已提交
231 232 233 234 235 236 237 238 239 240 241 242

/// Replication-queue handling and part fetching are not implemented yet:
/// each of the following methods unconditionally throws NOT_IMPLEMENTED.

void StorageReplicatedMergeTree::loadQueue()
{
	throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}

void StorageReplicatedMergeTree::pullLogsToQueue()
{
	throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}

void StorageReplicatedMergeTree::optimizeQueue()
{
	throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}

void StorageReplicatedMergeTree::executeSomeQueueEntry()
{
	throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}

bool StorageReplicatedMergeTree::tryExecute(const LogEntry & /* entry */)
{
	throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}

String StorageReplicatedMergeTree::findReplicaHavingPart(const String & /* part_name */)
{
	throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}

void StorageReplicatedMergeTree::getPart(const String & /* name */, const String & /* replica_name */)
{
	throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
}

M
Merge  
Michael Kolupaev 已提交
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
/** Stops the replica: resets replica_is_active_node and then endpoint_holder
  * (presumably removing the ephemeral is_active node and unregistering the interserver IO endpoint).
  * Only the first call has any effect; later calls return immediately.
  */
void StorageReplicatedMergeTree::shutdown()
{
	if (!shutdown_called)
	{
		shutdown_called = true;
		replica_is_active_node = nullptr;
		endpoint_holder = nullptr;

		/// It seems that, to make a deadlock impossible, we will have to wait here until MyInterserverIOEndpoint is removed.
	}
}

/// Destructor: calls shutdown(). Any exception it throws is logged via tryLogCurrentException
/// instead of escaping the destructor.
StorageReplicatedMergeTree::~StorageReplicatedMergeTree()
{
	try
	{
		shutdown();
	}
	catch (...)
	{
		/// Destructors must not throw.
		tryLogCurrentException("~StorageReplicatedMergeTree");
	}
}

/// Reading needs nothing replication-specific: the call is delegated unchanged to the MergeTree reader.
BlockInputStreams StorageReplicatedMergeTree::read(
	const Names & column_names,
	ASTPtr query,
	const Settings & settings,
	QueryProcessingStage::Enum & processed_stage,
	size_t max_block_size,
	unsigned threads)
{
	return reader.read(
		column_names, query, settings,
		processed_stage, max_block_size, threads);
}

M
Merge  
Michael Kolupaev 已提交
280 281 282 283 284 285 286 287
/** Returns a ReplicatedMergeTreeBlockOutputStream for inserting into this table.
  * If `query` is an ASTInsertQuery, its insert_id is passed to the stream; otherwise
  * (including a null query) the insert_id is empty.
  */
BlockOutputStreamPtr StorageReplicatedMergeTree::write(ASTPtr query)
{
	String insert_id;
	/// Cast the raw pointer obtained via get() instead of &*query: dereferencing a null ASTPtr
	/// is invalid, whereas dynamic_cast of a null pointer simply yields null.
	if (ASTInsertQuery * insert = dynamic_cast<ASTInsertQuery *>(query.get()))
		insert_id = insert->insert_id;

	return new ReplicatedMergeTreeBlockOutputStream(*this, insert_id);
}
M
Merge  
Michael Kolupaev 已提交
288 289 290 291 292 293 294 295 296

/** Removes this replica from ZooKeeper. It resets replica_is_active_node, then deletes <replica_path>
  * recursively. If no replicas remain under <zookeeper_path>/replicas, the whole table node is deleted too.
  * NOTE(review): the emptiness check and the final removeRecursive are not atomic; a replica registered
  * in between would be deleted together with the table node.
  */
void StorageReplicatedMergeTree::drop()
{
	replica_is_active_node = nullptr;
	zookeeper.removeRecursive(replica_path);

	const bool was_last_replica = zookeeper.getChildren(zookeeper_path + "/replicas").empty();
	if (was_last_replica)
		zookeeper.removeRecursive(zookeeper_path);
}

M
Merge  
Michael Kolupaev 已提交
297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349
/** Serializes the entry in the text form read back by readText():
  *   "format version: 1\n"
  *   GET_PART:    "get\n" <new_part_name> "\n"
  *   MERGE_PARTS: "merge\n" (<part> "\n")* "into\n" <new_part_name> "\n"
  * For any other type, only the header and the final "\n" are written.
  */
void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const
{
	writeString("format version: 1\n", out);

	if (type == GET_PART)
	{
		writeString("get\n", out);
		writeString(new_part_name, out);
	}
	else if (type == MERGE_PARTS)
	{
		writeString("merge\n", out);
		for (const String & source_part : parts_to_merge)
		{
			writeString(source_part, out);
			writeString("\n", out);
		}
		writeString("into\n", out);
		writeString(new_part_name, out);
	}

	writeString("\n", out);
}

/** Parses an entry in the text form produced by writeText():
  *   "format version: 1\n"
  *   "get\n" <new_part_name> "\n"                              -> GET_PART
  *   "merge\n" (<part> "\n")* "into\n" <new_part_name> "\n"    -> MERGE_PARTS
  * Throws CANNOT_PARSE_TEXT for an unknown entry type; malformed input otherwise
  * fails in assertString.
  */
void StorageReplicatedMergeTree::LogEntry::readText(ReadBuffer & in)
{
	String type_str;

	assertString("format version: 1\n", in);
	readString(type_str, in);
	assertString("\n", in);

	if (type_str == "get")
	{
		type = GET_PART;
		readString(new_part_name, in);
	}
	else if (type_str == "merge")
	{
		type = MERGE_PARTS;
		/// Start from an empty list so that re-reading into the same entry does not accumulate parts.
		parts_to_merge.clear();
		while (true)
		{
			String source_part;
			readString(source_part, in);
			assertString("\n", in);
			if (source_part == "into")
				break;
			parts_to_merge.push_back(source_part);
		}
		readString(new_part_name, in);
	}
	else
	{
		/// Previously an unknown type was silently accepted, leaving `type` unset.
		throw Exception("Unknown log entry type: " + type_str, ErrorCodes::CANNOT_PARSE_TEXT);
	}

	assertString("\n", in);
}

M
Merge  
Michael Kolupaev 已提交
350
}