提交 faafb365 编写于 作者: M Michael Kolupaev

Merge

上级 201e6318
......@@ -353,6 +353,43 @@ public:
typedef std::vector<DataPartPtr> DataPartsVector;
/// Некоторые операции над множеством кусков могут возвращать такой объект.
/// Если не был вызван commit, деструктор откатывает операцию.
class Transaction : private boost::noncopyable
{
public:
Transaction() {}
void commit()
{
data = nullptr;
removed_parts.clear();
added_parts.clear();
}
~Transaction()
{
try
{
if (data && (!removed_parts.empty() || !added_parts.empty()))
{
LOG_DEBUG(data->log, "Undoing transaction");
data->replaceParts(removed_parts, added_parts);
}
}
catch(...)
{
tryLogCurrentException("~MergeTreeData::Transaction");
}
}
private:
friend class MergeTreeData;
MergeTreeData * data = nullptr;
DataPartsVector removed_parts;
DataPartsVector added_parts;
};
/// Режим работы. См. выше.
enum Mode
{
......@@ -422,13 +459,18 @@ public:
/** Переименовывает временный кусок в постоянный и добавляет его в рабочий набор.
* Если increment!=nullptr, индекс куска берется из инкремента. Иначе индекс куска не меняется.
* Предполагается, что кусок не пересекается с существующими.
* Если out_transaction не nullptr, присваивает туда объект, позволяющий откатить добавление куска (но не переименование).
*/
void renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment = nullptr);
void renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment = nullptr, Transaction * out_transaction = nullptr);
/** То же, что renameTempPartAndAdd, но кусок может покрывать существующие куски.
* Удаляет и возвращает все куски, покрытые добавляемым (в возрастающем порядке).
*/
DataPartsVector renameTempPartAndReplace(MutableDataPartPtr part, Increment * increment = nullptr);
DataPartsVector renameTempPartAndReplace(MutableDataPartPtr part, Increment * increment = nullptr, Transaction * out_transaction = nullptr);
/** Убирает из рабочего набора куски remove и добавляет куски add.
*/
void replaceParts(const DataPartsVector & remove, const DataPartsVector & add);
/** Переименовывает кусок в prefix_кусок и убирает его из рабочего набора.
* Лучше использовать только когда никто не может читать или писать этот кусок
......
......@@ -35,7 +35,8 @@ public:
const AllowedMergingPredicate & can_merge);
/// Сливает куски.
MergeTreeData::DataPartPtr mergeParts(const MergeTreeData::DataPartsVector & parts, const String & merged_name);
MergeTreeData::DataPartPtr mergeParts(
const MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeTreeData::Transaction * out_transaction = nullptr);
/// Примерное количество места на диске, нужное для мерджа. С запасом.
size_t estimateDiskSpaceForMerge(const MergeTreeData::DataPartsVector & parts);
......
......@@ -45,7 +45,8 @@ public:
LOG_DEBUG(log, "Wrote block " << part_number << " with ID " << block_id);
storage.data.renameTempPartAndAdd(part);
MergeTreeData::Transaction transaction; /// Если не получится добавить кусок в ZK, снова уберем его из рабочего набора.
storage.data.renameTempPartAndAdd(part, nullptr, &transaction);
StorageReplicatedMergeTree::LogEntry log_entry;
log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
......@@ -81,39 +82,35 @@ public:
block_number_lock.getUnlockOps(ops);
auto code = storage.zookeeper->tryMulti(ops);
if (code != ZOK)
if (code == ZOK)
{
if (code == ZNODEEXISTS)
transaction.commit();
}
else if (code == ZNODEEXISTS)
{
/// Если блок с таким ID уже есть в таблице, откатим его вставку.
String expected_checksums_str;
if (!block_id.empty() && storage.zookeeper->tryGet(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str))
{
/// Если блок с таким ID уже есть в таблице, не будем его вставлять, и удалим только что записанные данные.
/// NOTE: В короткое время между renameTempPartAndAdd и deletePart в таблице на этой реплике доступны
/// продублированные данные.
String expected_checksums_str;
if (!block_id.empty() && storage.zookeeper->tryGet(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str))
{
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")");
auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str);
auto found_checksums = part->checksums;
storage.data.deletePart(part);
/// Если данные отличались от тех, что были вставлены ранее с тем же ID, бросим исключение.
expected_checksums.checkEqual(part->checksums, true);
}
else
{
throw Exception("Unexpected ZNODEEXISTS while adding block " + toString(part_number) + " with ID " + block_id + ": "
+ zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")");
auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str);
/// Если данные отличались от тех, что были вставлены ранее с тем же ID, бросим исключение.
expected_checksums.checkEqual(part->checksums, true);
}
else
{
throw Exception("Unexpected error while adding block " + toString(part_number) + " with ID " + block_id + ": "
throw Exception("Unexpected ZNODEEXISTS while adding block " + toString(part_number) + " with ID " + block_id + ": "
+ zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
}
else
{
throw Exception("Unexpected error while adding block " + toString(part_number) + " with ID " + block_id + ": "
+ zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
}
}
......
......@@ -584,9 +584,9 @@ void MergeTreeData::commitAlterModify(const ASTAlterQuery::Parameters & params)
}
void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment)
void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment, Transaction * out_transaction)
{
auto removed = renameTempPartAndReplace(part, increment);
auto removed = renameTempPartAndReplace(part, increment, out_transaction);
if (!removed.empty())
{
LOG_ERROR(log, "Added part " << part->name << + " covers " << toString(removed.size())
......@@ -594,8 +594,12 @@ void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr part, Increment * in
}
}
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(MutableDataPartPtr part, Increment * increment)
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
MutableDataPartPtr part, Increment * increment, Transaction * out_transaction)
{
if (out_transaction && out_transaction->data)
throw Exception("Using the same MergeTreeData::Transaction for overlapping transactions is invalid");
LOG_TRACE(log, "Renaming " << part->name << ".");
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
......@@ -665,9 +669,33 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(MutableDa
all_data_parts.insert(part);
if (out_transaction)
{
out_transaction->data = this;
out_transaction->added_parts = res;
out_transaction->removed_parts = DataPartsVector(1, part);
}
return res;
}
void MergeTreeData::replaceParts(const DataPartsVector & remove, const DataPartsVector & add)
{
LOG_TRACE(log, "Removing " << remove.size() << " parts and adding " << add.size() << " parts.");
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
for (const DataPartPtr & part : remove)
{
part->remove_time = time(0);
data_parts.erase(part);
}
for (const DataPartPtr & part : add)
{
data_parts.insert(part);
}
}
void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix)
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
......
......@@ -248,7 +248,8 @@ bool MergeTreeDataMerger::selectPartsToMerge(MergeTreeData::DataPartsVector & pa
/// parts должны быть отсортированы.
MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(const MergeTreeData::DataPartsVector & parts, const String & merged_name)
MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(
const MergeTreeData::DataPartsVector & parts, const String & merged_name, MergeTreeData::Transaction * out_transaction)
{
LOG_DEBUG(log, "Merging " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name << " into " << merged_name);
......@@ -329,7 +330,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(const MergeTreeData::
new_data_part->size_in_bytes = MergeTreeData::DataPart::calcTotalSize(new_part_tmp_path);
/// Переименовываем новый кусок, добавляем в набор и убираем исходные куски.
auto replaced_parts = data.renameTempPartAndReplace(new_data_part);
auto replaced_parts = data.renameTempPartAndReplace(new_data_part, nullptr, out_transaction);
if (new_data_part->name != merged_name)
LOG_ERROR(log, "Unexpected part name: " << new_data_part->name << " instead of " << merged_name);
......
......@@ -776,12 +776,14 @@ void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
}
else
{
MergeTreeData::DataPartPtr part = merger.mergeParts(parts, entry.new_part_name);
MergeTreeData::Transaction transaction;
MergeTreeData::DataPartPtr part = merger.mergeParts(parts, entry.new_part_name, &transaction);
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops);
zookeeper->multi(ops);
transaction.commit();
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
}
......@@ -1183,12 +1185,15 @@ void StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
assertEOF(buf);
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(part_name, zookeeper_path + "/replicas/" + replica_name, host, port);
auto removed_parts = data.renameTempPartAndReplace(part);
MergeTreeData::Transaction transaction;
auto removed_parts = data.renameTempPartAndReplace(part, nullptr, &transaction);
zkutil::Ops ops;
checkPartAndAddToZooKeeper(part, ops);
zookeeper->multi(ops);
transaction.commit();
for (const auto & removed_part : removed_parts)
{
......
......@@ -89,6 +89,8 @@ private:
{
while (!shutdown)
{
bool success = false;
try
{
Strings children = zookeeper.getChildren(path);
......@@ -106,6 +108,8 @@ private:
if (zookeeper.exists(path + "/" + *(it - 1), nullptr, event))
event->tryWait(60 * 1000);
success = true;
}
catch (const DB::Exception & e)
{
......@@ -126,6 +130,9 @@ private:
{
LOG_ERROR(log, "Unknown exception in LeaderElection");
}
if (!success)
std::this_thread::sleep_for(std::chrono::seconds(10));
}
}
};
......
......@@ -294,7 +294,7 @@ std::string ZooKeeper::get(const std::string & path, Stat * stat, EventPtr watch
if (tryGet(path, res, stat, watch))
return res;
else
throw KeeperException("Fail to get data for node " + path);
throw KeeperException("Can't get data for node " + path + ": node doesn't exist");
}
bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat_, EventPtr watch)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册