提交 4e975b43 编写于 作者: M Michael Kolupaev

Merge

上级 4a45c7c0
......@@ -26,6 +26,10 @@
M(FunctionExecute, "Function executes") \
M(MarkCacheHits, "Mark cache hits") \
M(MarkCacheMisses, "Mark cache misses") \
M(ReplicatedPartFetches, "Replicated part fetches") \
M(ObsoleteReplicatedParts, "Replicated parts rendered obsolete by fetches") \
M(ReplicatedPartMerges, "Replicated part merges") \
M(ReplicatedPartFetchesOfMerged, "Replicated part merges replaced with fetches") \
\
M(END, "")
......
......@@ -346,14 +346,16 @@ public:
*/
DataPartPtr getContainingPart(const String & part_name);
/** Удаляет куски old_parts и добавляет кусок new_part. Если какого-нибудь из удаляемых кусков нет, бросает исключение.
*/
void replaceParts(DataPartsVector old_parts, DataPartPtr new_part);
/** Переименовывает временный кусок в постоянный и добавляет его в рабочий набор.
* Если increment!=nullptr, индекс куска берется из инкремента. Иначе индекс куска не меняется.
* Предполагается, что кусок не пересекается с существующими.
*/
void renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment = nullptr);
/** То же, что renameTempPartAndAdd, но кусок может покрывать существующие куски.
* Удаляет и возвращает все куски, покрытые добавляемым (в возрастающем порядке).
*/
void renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment);
DataPartsVector renameTempPartAndReplace(MutableDataPartPtr part, Increment * increment = nullptr);
/** Переименовывает кусок в prefix_кусок и убирает его из рабочего набора.
* Лучше использовать только когда никто не может читать или писать этот кусок
......
......@@ -45,10 +45,11 @@ public:
continue;
}
storage.data.renameTempPartAndAdd(part, nullptr);
storage.data.renameTempPartAndAdd(part);
StorageReplicatedMergeTree::LogEntry log_entry;
log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART;
log_entry.source_replica = storage.replica_name;
log_entry.new_part_name = part->name;
String checksums_str = part->checksums.toString();
......
......@@ -67,6 +67,7 @@ public:
private:
friend class ReplicatedMergeTreeBlockOutputStream;
/// Добавляет куски в множество currently_merging.
struct CurrentlyMergingPartsTagger
{
Strings parts;
......@@ -105,6 +106,36 @@ private:
typedef Poco::SharedPtr<CurrentlyMergingPartsTagger> CurrentlyMergingPartsTaggerPtr;
/// Добавляет кусок в множество future_parts.
struct FuturePartTagger
{
String part;
StorageReplicatedMergeTree & storage;
FuturePartTagger(const String & part_, StorageReplicatedMergeTree & storage_)
: part(part_), storage(storage_)
{
if (!storage.future_parts.insert(part).second)
throw Exception("Tagging already tagged future part " + part + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
~FuturePartTagger()
{
try
{
Poco::ScopedLock<Poco::FastMutex> lock(storage.queue_mutex);
if (!storage.currently_merging.erase(part))
throw Exception("Untagging already untagged future part " + part + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
};
typedef Poco::SharedPtr<FuturePartTagger> FuturePartTaggerPtr;
struct LogEntry
{
enum Type
......@@ -116,10 +147,12 @@ private:
String znode_name;
Type type;
String source_replica;
String new_part_name;
Strings parts_to_merge;
CurrentlyMergingPartsTaggerPtr currently_merging_tagger;
FuturePartTaggerPtr future_part_tagger;
void tagPartsAsCurrentlyMerging(StorageReplicatedMergeTree & storage)
{
......@@ -127,6 +160,12 @@ private:
currently_merging_tagger = new CurrentlyMergingPartsTagger(parts_to_merge, storage);
}
void tagPartAsFuture(StorageReplicatedMergeTree & storage)
{
if (type == MERGE_PARTS || type == GET_PART)
future_part_tagger = new FuturePartTagger(new_part_name, storage);
}
void writeText(WriteBuffer & out) const;
void readText(ReadBuffer & in);
......@@ -162,12 +201,17 @@ private:
StringSet currently_merging;
Poco::FastMutex currently_merging_mutex;
/** "Очередь" того, что нужно сделать на этой реплике, чтобы всех догнать. Берется из ZooKeeper (/replicas/me/queue/).
* В ZK записи в хронологическом порядке. Здесь записи в том порядке, в котором их лучше выполнять.
/** Очередь того, что нужно сделать на этой реплике, чтобы всех догнать. Берется из ZooKeeper (/replicas/me/queue/).
* В ZK записи в хронологическом порядке. Здесь - не обязательно.
*/
LogEntries queue;
Poco::FastMutex queue_mutex;
/** Куски, которые появятся в результате действий, выполняемых прямо сейчас фоновыми потоками (этих действий нет в очереди).
* Использовать под залоченным queue_mutex.
*/
StringSet future_parts;
String table_name;
String full_path;
......@@ -256,11 +300,10 @@ private:
*/
void pullLogsToQueue();
/** Делает преобразования над очередью:
* - Если есть MERGE_PARTS кусков, не все из которых у нас есть, заменяем его на GET_PART и
* убираем GET_PART для всех составляющих его кусков. NOTE: Наверно, это будет плохо работать. Придумать эвристики получше.
/** Можно ли сейчас попробовать выполнить это действие. Если нет, нужно оставить его в очереди и попробовать выполнить другое.
* Вызывается под queue_mutex.
*/
void optimizeQueue();
bool shouldExecuteLogEntry(const LogEntry & entry);
/** Выполнить действие из очереди. Бросает исключение, если что-то не так.
*/
......@@ -274,11 +317,11 @@ private:
*/
void queueThread();
void becomeLeader();
/// Выбор кусков для слияния.
/** В бесконечном цикле выбирается куски для слияния и записывает в лог.
void becomeLeader();
/** В бесконечном цикле выбирает куски для слияния и записывает в лог.
*/
void mergeSelectingThread();
......
......@@ -661,23 +661,17 @@ Strings MergeTreeData::tryRestorePart(const String & path, const String & file_n
return restored_parts;
}
void MergeTreeData::replaceParts(DataPartsVector old_parts, DataPartPtr new_part)
void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment)
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
Poco::ScopedLock<Poco::FastMutex> all_lock(all_data_parts_mutex);
for (size_t i = 0; i < old_parts.size(); ++i)
if (data_parts.end() == data_parts.find(old_parts[i]))
throw Exception("Logical error: cannot find data part " + old_parts[i]->name + " in list", ErrorCodes::LOGICAL_ERROR);
data_parts.insert(new_part);
all_data_parts.insert(new_part);
for (size_t i = 0; i < old_parts.size(); ++i)
data_parts.erase(data_parts.find(old_parts[i]));
auto removed = renameTempPartAndReplace(part, increment);
if (!removed.empty())
{
LOG_ERROR(log, "Added part " << part->name << + " covers " << toString(removed.size())
<< " existing part(s) (including " << removed[0]->name << ")");
}
}
void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr part, Increment * increment)
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(MutableDataPartPtr part, Increment * increment)
{
LOG_TRACE(log, "Renaming.");
......@@ -686,17 +680,14 @@ void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr part, Increment * in
String old_path = getFullPath() + part->name + "/";
UInt64 part_id = part->left;
/** Важно, что получение номера куска происходит атомарно с добавлением этого куска в набор.
/** Для StorageMergeTree важно, что получение номера куска происходит атомарно с добавлением этого куска в набор.
* Иначе есть race condition - может произойти слияние пары кусков, диапазоны номеров которых
* содержат ещё не добавленный кусок.
*/
if (increment)
part_id = increment->get(false);
part->left = part->right = increment->get(false);
part->left = part->right = part_id;
part->name = getPartName(part->left_date, part->right_date, part_id, part_id, 0);
part->name = getPartName(part->left_date, part->right_date, part->left, part->right, 0);
if (data_parts.count(part))
throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
......@@ -706,8 +697,30 @@ void MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr part, Increment * in
/// Переименовываем кусок.
Poco::File(old_path).renameTo(new_path);
DataPartsVector res;
/// Куски, содержащиеся в part, идут в data_parts подряд, задевая место, куда вставился бы сам part.
DataParts::iterator it = data_parts.lower_bound(part);
/// Пойдем влево.
while (it != data_parts.begin())
{
--it;
if (!part->contains(**it))
break;
res.push_back(*it);
data_parts.erase(it++); /// Да, ++, а не --.
}
std::reverse(res.begin(), res.end()); /// Нужно получить куски в порядке возрастания.
/// Пойдем вправо.
while (it != data_parts.end() && part->contains(**it))
{
res.push_back(*it);
data_parts.erase(it++);
}
data_parts.insert(part);
all_data_parts.insert(part);
return res;
}
void MergeTreeData::renameAndDetachPart(DataPartPtr part, const String & prefix)
......
......@@ -248,7 +248,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(const MergeTreeData::
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
data.parsePartName(merged_name, *new_data_part);
new_data_part->name = merged_name;
new_data_part->name = "tmp_" + merged_name;
/** Читаем из всех кусков, сливаем и пишем в новый.
* Попутно вычисляем выражение для сортировки.
......@@ -285,8 +285,7 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(const MergeTreeData::
throw Exception("Unknown mode of operation for MergeTreeData: " + toString(data.mode), ErrorCodes::LOGICAL_ERROR);
}
String new_part_tmp_path = data.getFullPath() + "tmp_" + new_data_part->name + "/";
String new_part_res_path = data.getFullPath() + new_data_part->name + "/";
String new_part_tmp_path = data.getFullPath() + "tmp_" + merged_name + "/";
MergedBlockOutputStreamPtr to = new MergedBlockOutputStream(data, new_part_tmp_path, data.getColumnsList());
......@@ -314,15 +313,33 @@ MergeTreeData::DataPartPtr MergeTreeDataMerger::mergeParts(const MergeTreeData::
if (0 == to->marksCount())
{
LOG_INFO(log, "All rows have been deleted while merging from " << parts.front()->name << " to " << parts.back()->name);
return nullptr;
throw Exception("All rows have been deleted while merging from " + parts.front()->name
+ " to " + parts.back()->name, ErrorCodes::LOGICAL_ERROR);
}
/// Переименовываем кусок.
Poco::File(new_part_tmp_path).renameTo(new_part_res_path);
/// Переименовываем новый кусок, добавляем в набор и убираем исходные куски.
auto replaced_parts = data.renameTempPartAndReplace(new_data_part);
/// Добавляем новый кусок в набор.
data.replaceParts(parts, new_data_part);
if (new_data_part->name != merged_name)
LOG_ERROR(log, "Unexpected part name: " << new_data_part->name << " instead of " << merged_name);
/// Проверим, что удалились все исходные куски и только они.
if (replaced_parts.size() != parts.size())
{
LOG_ERROR(log, "Unexpected number of parts removed when adding " << new_data_part->name << ": " << replaced_parts.size()
<< " instead of " << parts.size());
}
else
{
for (size_t i = 0; i < parts.size(); ++i)
{
if (parts[i]->name != replaced_parts[i]->name)
{
LOG_ERROR(log, "Unexpected part removed when adding " << new_data_part->name << ": " << replaced_parts[i]->name
<< " instead of " << parts[i]->name);
}
}
}
LOG_TRACE(log, "Merged " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name);
......
......@@ -369,8 +369,26 @@ void StorageReplicatedMergeTree::pullLogsToQueue()
LOG_DEBUG(log, "Pulled " << count << " entries to queue");
}
void StorageReplicatedMergeTree::optimizeQueue()
bool StorageReplicatedMergeTree::shouldExecuteLogEntry(const LogEntry & entry)
{
if (entry.type == LogEntry::MERGE_PARTS)
{
/** Если какая-то из нужных частей сейчас передается или мерджится, подождем окончания этой операции.
* Иначе, даже если всех нужных частей для мерджа нет, нужно попытаться сделать мердж.
* Если каких-то частей не хватает, вместо мерджа будет попытка скачать кусок.
* Такая ситуация возможна, если получение какого-то куска пофейлилось, и его переместили в конец очереди.
*/
for (const auto & name : entry.parts_to_merge)
{
if (future_parts.count(name))
{
LOG_TRACE(log, "Not merging into part " << entry.new_part_name << " yet because part " << name << " is not ready yet.");
return false;
}
}
}
return true;
}
void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
......@@ -384,54 +402,148 @@ void StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
/// Даже если кусок есть локально, его (в исключительных случаях) может не быть в zookeeper.
if (containing_part && zookeeper.exists(replica_path + "/parts/" + containing_part->name))
{
LOG_DEBUG(log, "Skipping action for part " + entry.new_part_name + " - part already exists");
if (!(entry.type == LogEntry::GET_PART && entry.source_replica == replica_name))
LOG_DEBUG(log, "Skipping action for part " + entry.new_part_name + " - part already exists");
return;
}
}
if (entry.type == LogEntry::GET_PART && entry.source_replica == replica_name)
LOG_ERROR(log, "Part " << entry.new_part_name << " from own log doesn't exist. This is a bug.");
bool do_fetch = false;
if (entry.type == LogEntry::GET_PART)
{
String replica = findActiveReplicaHavingPart(entry.new_part_name);
fetchPart(entry.new_part_name, replica);
do_fetch = true;
}
else if (entry.type == LogEntry::MERGE_PARTS)
{
MergeTreeData::DataPartsVector parts;
bool have_all_parts = true;;
for (const String & name : entry.parts_to_merge)
{
MergeTreeData::DataPartPtr part = data.getContainingPart(name);
if (!part || part->name != name)
throw Exception("Part to merge doesn't exist: " + name, ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
if (!part)
{
have_all_parts = false;
break;
}
if (part->name != name)
{
LOG_ERROR(log, "Log and parts set look inconsistent: " << name << " is covered by " << part->name
<< " but should be merged into " << entry.new_part_name);
have_all_parts = false;
break;
}
parts.push_back(part);
}
MergeTreeData::DataPartPtr part = merger.mergeParts(parts, entry.new_part_name);
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part->name,
"",
zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part->name + "/checksums",
part->checksums.toString(),
zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
for (const auto & part : parts)
if (!have_all_parts)
{
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name, -1));
/// Если нет всех нужных кусков, попробуем взять у кого-нибудь уже помердженный кусок.
do_fetch = true;
LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead");
}
else
{
MergeTreeData::DataPartPtr part = merger.mergeParts(parts, entry.new_part_name);
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part->name,
"",
zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
ops.push_back(new zkutil::Op::Create(
replica_path + "/parts/" + part->name + "/checksums",
part->checksums.toString(),
zookeeper.getDefaultACL(),
zkutil::CreateMode::Persistent));
for (const auto & part : parts)
{
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name, -1));
}
zookeeper.multi(ops);
zookeeper.multi(ops);
data.clearOldParts();
data.clearOldParts();
ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);
}
}
else
{
throw Exception("Unexpected log entry type: " + toString(static_cast<int>(entry.type)));
}
if (do_fetch)
{
try
{
String replica = findActiveReplicaHavingPart(entry.new_part_name);
fetchPart(entry.new_part_name, replica);
if (entry.type == LogEntry::MERGE_PARTS)
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetchesOfMerged);
}
catch (...)
{
/** Если не получилось скачать кусок, нужный для какого-то мерджа, лучше не пытаться получить другие куски для этого мерджа,
* а попытаться сразу получить помердженный кусок. Чтобы так получилось, переместим действия для получения остальных кусков
* для этого мерджа в конец очереди.
*
*/
try
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
/// Найдем действие по объединению этого куска с другими. Запомним других.
StringSet parts_for_merge;
LogEntries::iterator merge_entry;
for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
{
if (it->type == LogEntry::MERGE_PARTS)
{
if (std::find(it->parts_to_merge.begin(), it->parts_to_merge.end(), entry.new_part_name)
!= it->parts_to_merge.end())
{
parts_for_merge = StringSet(it->parts_to_merge.begin(), it->parts_to_merge.end());
merge_entry = it;
break;
}
}
}
if (!parts_for_merge.empty())
{
/// Переместим в конец очереди действия, получающие parts_for_merge.
for (LogEntries::iterator it = queue.begin(); it != queue.end();)
{
auto it0 = it;
++it;
if (it0 == merge_entry)
break;
if ((it0->type == LogEntry::MERGE_PARTS || it0->type == LogEntry::GET_PART)
&& parts_for_merge.count(it0->new_part_name))
{
queue.splice(queue.end(), queue, it0, it);
}
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
throw;
}
}
}
void StorageReplicatedMergeTree::queueUpdatingThread()
......@@ -441,7 +553,6 @@ void StorageReplicatedMergeTree::queueUpdatingThread()
try
{
pullLogsToQueue();
optimizeQueue();
}
catch (...)
{
......@@ -457,19 +568,33 @@ void StorageReplicatedMergeTree::queueThread()
while (!shutdown_called)
{
LogEntry entry;
bool empty;
bool have_work = false;
try
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
empty = queue.empty();
bool empty = queue.empty();
if (!empty)
{
entry = queue.front();
queue.pop_front();
for (LogEntries::iterator it = queue.begin(); it != queue.end(); ++it)
{
if (shouldExecuteLogEntry(*it))
{
entry = *it;
entry.tagPartAsFuture(*this);
queue.erase(it);
have_work = true;
break;
}
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
if (empty)
if (!have_work)
{
std::this_thread::sleep_for(QUEUE_NO_WORK_SLEEP);
continue;
......@@ -480,7 +605,11 @@ void StorageReplicatedMergeTree::queueThread()
try
{
executeLogEntry(entry);
zookeeper.remove(replica_path + "/queue/" + entry.znode_name);
auto code = zookeeper.tryRemove(replica_path + "/queue/" + entry.znode_name);
if (code != zkutil::ReturnCode::Ok)
LOG_ERROR(log, "Couldn't remove " << replica_path + "/queue/" + entry.znode_name << ": "
<< zkutil::ReturnCode::toString(code) + ". There must be a bug somewhere. Ignoring it.");
success = true;
}
......@@ -500,6 +629,7 @@ void StorageReplicatedMergeTree::queueThread()
{
{
/// Добавим действие, которое не получилось выполнить, в конец очереди.
entry.future_part_tagger = nullptr;
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
queue.push_back(entry);
}
......@@ -571,6 +701,7 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
{
LogEntry entry;
entry.type = LogEntry::MERGE_PARTS;
entry.source_replica = replica_name;
entry.new_part_name = merged_name;
for (const auto & part : parts)
......@@ -681,7 +812,13 @@ void StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
assertEOF(buf);
MergeTreeData::MutableDataPartPtr part = fetcher.fetchPart(part_name, zookeeper_path + "/replicas/" + replica_name, host, port);
data.renameTempPartAndAdd(part, nullptr);
auto removed_parts = data.renameTempPartAndReplace(part);
for (const auto & removed_part : removed_parts)
{
LOG_DEBUG(log, "Part " << removed_part->name << " is rendered obsolete by fetching part " << part_name);
ProfileEvents::increment(ProfileEvents::ObsoleteReplicatedParts);
}
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Create(
......@@ -696,6 +833,8 @@ void StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
zkutil::CreateMode::Persistent));
zookeeper.multi(ops);
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
LOG_DEBUG(log, "Fetched part");
}
......@@ -762,6 +901,9 @@ void StorageReplicatedMergeTree::drop()
void StorageReplicatedMergeTree::LogEntry::writeText(WriteBuffer & out) const
{
writeString("format version: 1\n", out);
writeString("source replica: ", out);
writeString(source_replica, out);
writeString("\n", out);
switch (type)
{
case GET_PART:
......@@ -788,6 +930,8 @@ void StorageReplicatedMergeTree::LogEntry::readText(ReadBuffer & in)
assertString("format version: 1\n", in);
readString(type_str, in);
assertString("\nsource replica: ", in);
readString(source_replica, in);
assertString("\n", in);
if (type_str == "get")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册