提交 c782f7f1 编写于 作者: A Alexey Milovidov

Merge

......@@ -96,6 +96,11 @@ public:
return UNLOCKED;
}
static void createAbandonedIfNotExists(const String & path, zkutil::ZooKeeper & zookeeper)
{
zookeeper.createIfNotExists(path, "");
}
private:
zkutil::ZooKeeper & zookeeper;
String path_prefix;
......
......@@ -129,6 +129,7 @@ public:
e.code == ZCONNECTIONLOSS)
{
transaction.commit();
storage.enqueuePartForCheck(part->name);
}
throw;
......
......@@ -170,6 +170,7 @@ void StorageReplicatedMergeTree::createTable()
zookeeper->create(zookeeper_path + "/log", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/blocks", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/block_numbers", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/nonincrement_block_numbers", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/leader_election", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/temp", "", zkutil::CreateMode::Persistent);
zookeeper->create(zookeeper_path + "/flags", "", zkutil::CreateMode::Persistent);
......@@ -649,14 +650,12 @@ void StorageReplicatedMergeTree::clearOldBlocks()
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/columns", -1));
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second + "/checksums", -1));
ops.push_back(new zkutil::Op::Remove(zookeeper_path + "/blocks/" + timed_blocks[i].second, -1));
if (ops.size() > 400)
if (ops.size() > 400 || i + 1 == timed_blocks.size())
{
zookeeper->multi(ops);
ops.clear();
}
}
if (!ops.empty())
zookeeper->multi(ops);
LOG_TRACE(log, "Cleared " << blocks.size() - data.settings.replicated_deduplication_window << " old blocks from ZooKeeper");
}
......@@ -1039,7 +1038,7 @@ bool StorageReplicatedMergeTree::queueTask(BackgroundProcessingPool::Context & p
void StorageReplicatedMergeTree::mergeSelectingThread()
{
pullLogsToQueue();
bool need_pull = true;
while (!shutdown_called && is_leader_node)
{
......@@ -1047,6 +1046,14 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
try
{
if (need_pull)
{
/// Нужно загрузить новую запись в очередь перед тем, как выбирать куски для слияния.
/// (чтобы кусок добавился в virtual_parts).
pullLogsToQueue();
need_pull = false;
}
size_t merges_queued = 0;
/// Есть ли в очереди или в фоновом потоке мердж крупных кусков.
bool has_big_merge = context.getBackgroundPool().getCounter("replicated big merges") > 0;
......@@ -1106,11 +1113,9 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
entry.parts_to_merge.push_back(part->name);
}
zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
need_pull = true;
/// Нужно загрузить новую запись в очередь перед тем, как в следующий раз выбирать куски для слияния.
/// (чтобы кусок добавился в virtual_parts).
pullLogsToQueue();
zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
String month_name = parts[0]->name.substr(0, 6);
for (size_t i = 0; i + 1 < parts.size(); ++i)
......@@ -1118,9 +1123,8 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
/// Уберем больше не нужные отметки о несуществующих блоках.
for (UInt64 number = parts[i]->right + 1; number <= parts[i + 1]->left - 1; ++number)
{
String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number);
zookeeper->tryRemove(path);
zookeeper->tryRemove(zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number));
zookeeper->tryRemove(zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number));
}
}
......@@ -1378,11 +1382,7 @@ void StorageReplicatedMergeTree::partCheckThread()
* Для кусков, полученных в результате слияния такая проверка была бы некорректной,
* потому что слитого куска может еще ни у кого не быть.
*/
if (part_info.left != part_info.right)
{
LOG_WARNING(log, "Not checking if part " << part_name << " is lost because it is a result of a merge.");
}
else
if (part_info.left == part_info.right)
{
LOG_WARNING(log, "Checking if anyone has part covering " << part_name << ".");
......@@ -1406,22 +1406,12 @@ void StorageReplicatedMergeTree::partCheckThread()
if (!found)
{
/** Такая ситуация возможна при нормальной работе, без потери данных, например, так:
* ReplicatedMergeTreeBlockOutputStream записал кусок, попытался добавить его в ZK,
* получил operation timeout, удалил локальный кусок и бросил исключение,
* а на самом деле, кусок добавился в ZK.
*/
LOG_ERROR(log, "No replica has part covering " << part_name << ". This part is lost forever. "
<< "There might or might not be a data loss.");
LOG_ERROR(log, "No replica has part covering " << part_name);
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
/** Если ни у кого нет такого куска, удалим его из нашей очереди.
*
* Еще можно было бы добавить его в block_numbers, чтобы он не мешал слияниям,
* но если так сделать, ZooKeeper почему-то пропустит один номер для автоинкремента,
* и в номерах блоков все равно останется дырка.
* TODO: можно это исправить, сделав две директории block_numbers: для автоинкрементных и ручных нод.
*/
/// Если ни у кого нет такого куска, удалим его из нашей очереди.
bool was_in_queue = false;
{
Poco::ScopedLock<Poco::FastMutex> lock(queue_mutex);
......@@ -1436,6 +1426,7 @@ void StorageReplicatedMergeTree::partCheckThread()
{
zookeeper->remove(replica_path + "/queue/" + it->znode_name);
queue.erase(it++);
was_in_queue = true;
}
else
{
......@@ -1443,6 +1434,26 @@ void StorageReplicatedMergeTree::partCheckThread()
}
}
}
if (was_in_queue)
{
/** Такая ситуация возможна, если на всех репликах, где был кусок, он испортился.
* Например, у реплики, которая только что его записала, отключили питание, и данные не записались из кеша на диск.
*/
LOG_ERROR(log, "Part " << part_name << " is lost forever. Say goodbye to a piece of data!");
/** Нужно добавить отсутствующий кусок в block_numbers, чтобы он не мешал слияниям.
* Вот только в сам block_numbers мы его добавить не можем - если так сделать,
* ZooKeeper зачем-то пропустит один номер для автоинкремента,
* и в номерах блоков все равно останется дырка.
* Специально из-за этого приходится отдельно иметь nonincrement_block_numbers.
*/
zookeeper->createIfNotExists(zookeeper_path + "/nonincrement_block_numbers", "");
zookeeper->createIfNotExists(zookeeper_path + "/nonincrement_block_numbers/" + part_name.substr(0, 6), "");
AbandonableLockInZooKeeper::createAbandonedIfNotExists(
zookeeper_path + "/nonincrement_block_numbers/" + part_name.substr(0, 6) + "/block-" + padIndex(part_info.left),
*zookeeper);
}
}
}
}
......@@ -1539,9 +1550,11 @@ bool StorageReplicatedMergeTree::canMergeParts(const MergeTreeData::DataPartPtr
/// Можно слить куски, если все номера между ними заброшены - не соответствуют никаким блокам.
for (UInt64 number = left->right + 1; number <= right->left - 1; ++number)
{
String path = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number);
String path1 = zookeeper_path + "/block_numbers/" + month_name + "/block-" + padIndex(number);
String path2 = zookeeper_path + "/nonincrement_block_numbers/" + month_name + "/block-" + padIndex(number);
if (AbandonableLockInZooKeeper::check(path, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
if (AbandonableLockInZooKeeper::check(path1, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED &&
AbandonableLockInZooKeeper::check(path2, *zookeeper) != AbandonableLockInZooKeeper::ABANDONED)
return false;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册