提交 85c722ca 编写于 作者: M Michael Kolupaev

Merge

上级 a6e8c35b
......@@ -617,9 +617,17 @@ public:
*/
void renameAndDetachPart(DataPartPtr part, const String & prefix);
/** Удалить неактуальные куски. Возвращает имена удаленных кусков.
/** Возвращает старые неактуальные куски, которые можно удалить. Одновременно удаляет их из списка кусков, но не с диска.
*/
Strings clearOldParts();
DataPartsVector grabOldParts();
/** Обращает изменения, сделанные grabOldParts().
*/
void addOldParts(const DataPartsVector & parts);
/** Удалить неактуальные куски.
*/
void clearOldParts();
/** После вызова dropAllData больше ничего вызывать нельзя.
* Удаляет директорию с данными и сбрасывает кеши разжатых блоков и засечек.
......@@ -703,9 +711,6 @@ private:
/// Загрузить множество кусков с данными с диска. Вызывается один раз - при создании объекта.
void loadDataParts();
/// Определить, не битые ли данные в директории. Проверяет индекс и засечеки, но не сами данные.
bool isBrokenPart(const String & path);
/** Выражение, преобразующее типы столбцов.
* Если преобразований типов нет, out_expression=nullptr.
* out_rename_map отображает файлы-столбцы на выходе выражения в новые файлы таблицы.
......
......@@ -86,36 +86,52 @@ public:
zkutil::CreateMode::PersistentSequential));
block_number_lock.getUnlockOps(ops);
auto code = storage.zookeeper->tryMulti(ops);
if (code == ZOK)
try
{
transaction.commit();
storage.merge_selecting_event.set();
}
else if (code == ZNODEEXISTS)
{
/// Если блок с таким ID уже есть в таблице, откатим его вставку.
String expected_checksums_str;
if (!block_id.empty() && storage.zookeeper->tryGet(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str))
auto code = storage.zookeeper->tryMulti(ops);
if (code == ZOK)
{
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")");
auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str);
/// Если данные отличались от тех, что были вставлены ранее с тем же ID, бросим исключение.
expected_checksums.checkEqual(part->checksums, true);
transaction.commit();
storage.merge_selecting_event.set();
}
else if (code == ZNODEEXISTS)
{
/// Если блок с таким ID уже есть в таблице, откатим его вставку.
String expected_checksums_str;
if (!block_id.empty() && storage.zookeeper->tryGet(
storage.zookeeper_path + "/blocks/" + block_id + "/checksums", expected_checksums_str))
{
LOG_INFO(log, "Block with ID " << block_id << " already exists; ignoring it (removing part " << part->name << ")");
auto expected_checksums = MergeTreeData::DataPart::Checksums::parse(expected_checksums_str);
/// Если данные отличались от тех, что были вставлены ранее с тем же ID, бросим исключение.
expected_checksums.checkEqual(part->checksums, true);
}
else
{
throw Exception("Unexpected ZNODEEXISTS while adding block " + toString(part_number) + " with ID " + block_id + ": "
+ zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
}
else
{
throw Exception("Unexpected ZNODEEXISTS while adding block " + toString(part_number) + " with ID " + block_id + ": "
throw Exception("Unexpected error while adding block " + toString(part_number) + " with ID " + block_id + ": "
+ zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
}
}
else
catch (zkutil::KeeperException & e)
{
throw Exception("Unexpected error while adding block " + toString(part_number) + " with ID " + block_id + ": "
+ zkutil::ZooKeeper::error2string(code), ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR);
/** Если потерялось соединение, и мы не знаем, применились ли изменения, нельзя удалять локальный кусок:
* если изменения применились, в /blocks/ появился вставленный блок, и его нельзя будет вставить снова.
*/
if (e.code == ZOPERATIONTIMEOUT ||
e.code == ZCONNECTIONLOSS)
{
transaction.commit();
}
throw;
}
}
}
......
......@@ -251,34 +251,14 @@ void MergeTreeData::loadDataParts()
}
Strings MergeTreeData::clearOldParts()
MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
{
Poco::ScopedTry<Poco::FastMutex> lock;
Strings res;
DataPartsVector res;
/// Если метод уже вызван из другого потока (или если all_data_parts прямо сейчас меняют), то можно ничего не делать.
if (!lock.lock(&all_data_parts_mutex))
{
return res;
}
time_t now = time(0);
for (DataParts::iterator it = all_data_parts.begin(); it != all_data_parts.end();)
{
int ref_count = it->use_count();
if (ref_count == 1 && /// После этого ref_count не может увеличиться.
(*it)->remove_time < now &&
now - (*it)->remove_time > settings.old_parts_lifetime)
{
LOG_DEBUG(log, "Removing part " << (*it)->name);
res.push_back((*it)->name);
(*it)->remove();
all_data_parts.erase(it++);
}
else
++it;
}
/// Удаляем временные директории старше суток.
Strings all_file_names;
......@@ -302,9 +282,41 @@ Strings MergeTreeData::clearOldParts()
}
}
time_t now = time(0);
for (DataParts::iterator it = all_data_parts.begin(); it != all_data_parts.end();)
{
int ref_count = it->use_count();
if (ref_count == 1 && /// После этого ref_count не может увеличиться.
(*it)->remove_time < now &&
now - (*it)->remove_time > settings.old_parts_lifetime)
{
res.push_back(*it);
all_data_parts.erase(it++);
}
else
++it;
}
return res;
}
void MergeTreeData::addOldParts(const MergeTreeData::DataPartsVector & parts)
{
Poco::ScopedLock<Poco::FastMutex> lock(all_data_parts_mutex);
all_data_parts.insert(parts.begin(), parts.end());
}
void MergeTreeData::clearOldParts()
{
auto parts_to_remove = grabOldParts();
for (DataPartPtr part : parts_to_remove)
{
LOG_DEBUG(log, "Removing part " << part->name);
part->remove();
}
}
void MergeTreeData::setPath(const String & new_full_path)
{
Poco::File(full_path).renameTo(new_full_path);
......
......@@ -522,21 +522,35 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(MergeTreeData::DataP
void StorageReplicatedMergeTree::clearOldParts()
{
Strings parts = data.clearOldParts();
MergeTreeData::DataPartsVector parts = data.grabOldParts();
size_t count = parts.size();
for (const String & name : parts)
if (!count)
return;
try
{
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + name, -1));
int32_t code = zookeeper->tryMulti(ops);
if (code != ZOK)
LOG_DEBUG(log, "Couldn't remove part " << name << " from ZooKeeper: " << zkutil::ZooKeeper::error2string(code));
while (!parts.empty())
{
MergeTreeData::DataPartPtr part = parts.back();
zkutil::Ops ops;
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(replica_path + "/parts/" + part->name, -1));
zookeeper->multi(ops);
part->remove();
parts.pop_back();
}
}
catch (...)
{
data.addOldParts(parts);
throw;
}
if (!parts.empty())
LOG_DEBUG(log, "Removed " << parts.size() << " old parts");
LOG_DEBUG(log, "Removed " << count << " old parts");
}
void StorageReplicatedMergeTree::clearOldLogs()
......@@ -1462,16 +1476,8 @@ void StorageReplicatedMergeTree::partCheckThread()
{
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
/// Если этот кусок еще и получен в результате слияния, это уже чересчур странно.
if (part->left != part->right)
{
LOG_ERROR(log, "Unexpected part " << part_name << " is a result of a merge. You have to resolve this manually.");
}
else
{
LOG_ERROR(log, "Unexpected part " << part_name << ". Removing.");
data.renameAndDetachPart(part, "unexpected_");
}
LOG_ERROR(log, "Unexpected part " << part_name << ". Removing.");
data.renameAndDetachPart(part, "unexpected_");
}
}
else
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册