提交 99b2d135 编写于 作者: A Alexey Zatelepin 提交者: alexey-milovidov

clear deduplication blocks when doing DROP PARTITION [#CLICKHOUSE-3208]

上级 3ffb2391
......@@ -131,7 +131,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
hash.get128(hash_value.bytes);
/// We take the hash from the data as ID. That is, do not insert the same data twice.
block_id = toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]);
block_id = part->info.partition_id + "_" + toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]);
LOG_DEBUG(log, "Wrote block with ID '" << block_id << "', " << block.rows() << " rows");
}
......@@ -331,7 +331,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
}
catch (const zkutil::KeeperException & e)
{
/** If the connection is lost, and we do not know if the changes were applied, you can not delete the local chunk
/** If the connection is lost, and we do not know if the changes were applied, you can not delete the local part
* if the changes were applied, the inserted block appeared in `/blocks/`, and it can not be inserted again.
*/
if (e.code == ZOPERATIONTIMEOUT ||
......
......@@ -16,8 +16,9 @@ namespace ErrorCodes
ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_)
: storage(storage_),
log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, CleanupThread)")),
thread([this] { run(); }),
cached_block_stats(std::make_unique<NodesStatCache>()) {}
thread([this] { run(); })
{
}
void ReplicatedMergeTreeCleanupThread::run()
......@@ -110,131 +111,104 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
}
namespace
{
/// Just a subset of zkutil::Stat fields required for the cache
struct RequiredStat
{
int64_t ctime = 0;
int32_t numChildren = 0;
RequiredStat() = default;
RequiredStat(const RequiredStat &) = default;
explicit RequiredStat(const zkutil::Stat & s) : ctime(s.ctime), numChildren(s.numChildren) {};
explicit RequiredStat(Int64 ctime_) : ctime(ctime_) {}
};
}
/// Just a node name with its ZooKeeper's stat
struct ReplicatedMergeTreeCleanupThread::NodeWithStat
{
String node;
RequiredStat stat;
Int64 ctime = 0;
NodeWithStat() = default;
NodeWithStat(const String & node_, const RequiredStat & stat_) : node(node_), stat(stat_) {}
NodeWithStat(String node_, Int64 ctime_) : node(std::move(node_)), ctime(ctime_) {}
static bool greaterByTime (const NodeWithStat & lhs, const NodeWithStat & rhs)
static bool greaterByTime(const NodeWithStat & lhs, const NodeWithStat & rhs)
{
return std::greater<void>()(std::forward_as_tuple(lhs.stat.ctime, lhs.node), std::forward_as_tuple(rhs.stat.ctime, rhs.node));
return std::forward_as_tuple(lhs.ctime, lhs.node) > std::forward_as_tuple(rhs.ctime, rhs.node);
}
};
/// Use simple map node_name -> zkutil::Stat (only required fields) as the cache
/// It is not declared in the header explicitly to hide extra implementation dependent structs like RequiredStat
class ReplicatedMergeTreeCleanupThread::NodesStatCache : public std::map<String, RequiredStat> {};
void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
{
auto zookeeper = storage.getZooKeeper();
std::vector<NodeWithStat> timed_blocks;
getBlocksSortedByTime(zookeeper, timed_blocks);
getBlocksSortedByTime(*zookeeper, timed_blocks);
if (timed_blocks.empty())
return;
/// Use ZooKeeper's first node (last according to time) timestamp as "current" time.
Int64 current_time = timed_blocks.front().stat.ctime;
Int64 current_time = timed_blocks.front().ctime;
Int64 time_threshold = std::max(static_cast<Int64>(0), current_time - static_cast<Int64>(1000 * storage.data.settings.replicated_deduplication_window_seconds));
/// Virtual node, all nodes that are "greater" than this one will be deleted
NodeWithStat block_threshold("", RequiredStat(time_threshold));
NodeWithStat block_threshold{{}, time_threshold};
size_t current_deduplication_window = std::min(timed_blocks.size(), storage.data.settings.replicated_deduplication_window.value);
auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window;
auto first_outdated_block_time_threshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);
/// TODO After about half a year, we could remain only multi op, because there will be no obsolete children nodes.
zkutil::Ops ops;
std::vector<std::pair<String, zkutil::ZooKeeper::TryRemoveFuture>> try_remove_futures;
for (auto it = first_outdated_block; it != timed_blocks.end(); ++it)
{
String path = storage.zookeeper_path + "/blocks/" + it->node;
if (it->stat.numChildren == 0)
{
ops.emplace_back(new zkutil::Op::Remove(path, -1));
if (ops.size() >= zkutil::MULTI_BATCH_SIZE)
{
zookeeper->multi(ops);
ops.clear();
}
}
else
zookeeper->removeRecursive(path);
try_remove_futures.emplace_back(path, zookeeper->asyncTryRemove(path));
}
if (!ops.empty())
for (auto & pair : try_remove_futures)
{
zookeeper->multi(ops);
ops.clear();
const String & path = pair.first;
int32_t rc = pair.second.get();
if (rc == ZNOTEMPTY)
{
/// Can happen if there are leftover block nodes with children created by previous server versions.
zookeeper->removeRecursive(path);
}
else if (rc != ZOK)
LOG_WARNING(log,
"Error while deleting ZooKeeper path `" << path << "`: " + zkutil::ZooKeeper::error2string(rc) << ", ignoring.");
}
auto num_nodes_to_delete = timed_blocks.end() - first_outdated_block;
LOG_TRACE(log, "Cleared " << num_nodes_to_delete << " old blocks from ZooKeeper");
if (num_nodes_to_delete)
LOG_TRACE(log, "Cleared " << num_nodes_to_delete << " old blocks from ZooKeeper");
}
void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeperPtr & zookeeper, std::vector<NodeWithStat> & timed_blocks)
void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper & zookeeper, std::vector<NodeWithStat> & timed_blocks)
{
timed_blocks.clear();
Strings blocks;
zkutil::Stat stat;
if (ZOK != zookeeper->tryGetChildren(storage.zookeeper_path + "/blocks", blocks, &stat))
if (ZOK != zookeeper.tryGetChildren(storage.zookeeper_path + "/blocks", blocks, &stat))
throw Exception(storage.zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);
/// Clear already deleted blocks from the cache, cached_block_ctime should be subset of blocks
{
NameSet blocks_set(blocks.begin(), blocks.end());
for (auto it = cached_block_stats->begin(); it != cached_block_stats->end();)
for (auto it = cached_block_stats.begin(); it != cached_block_stats.end();)
{
if (!blocks_set.count(it->first))
it = cached_block_stats->erase(it);
it = cached_block_stats.erase(it);
else
++it;
}
}
auto not_cached_blocks = stat.numChildren - cached_block_stats->size();
auto not_cached_blocks = stat.numChildren - cached_block_stats.size();
if (not_cached_blocks)
{
LOG_TRACE(log, "Checking " << stat.numChildren << " blocks (" << not_cached_blocks << " are not cached)"
<< " to clear old ones from ZooKeeper. This might take several minutes.");
<< " to clear old ones from ZooKeeper.");
}
std::vector<std::pair<String, zkutil::ZooKeeper::ExistsFuture>> exists_futures;
for (const String & block : blocks)
{
auto it = cached_block_stats->find(block);
if (it == cached_block_stats->end())
auto it = cached_block_stats.find(block);
if (it == cached_block_stats.end())
{
/// New block. Fetch its stat stat asynchronously
exists_futures.emplace_back(block, zookeeper->asyncExists(storage.zookeeper_path + "/blocks/" + block));
/// New block. Fetch its stat asynchronously.
exists_futures.emplace_back(block, zookeeper.asyncExists(storage.zookeeper_path + "/blocks/" + block));
}
else
{
......@@ -247,11 +221,11 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeperPt
for (auto & elem : exists_futures)
{
zkutil::ZooKeeper::StatAndExists status = elem.second.get();
if (!status.exists)
throw zkutil::KeeperException("A block node was suddenly deleted", ZNONODE);
cached_block_stats->emplace(elem.first, RequiredStat(status.stat));
timed_blocks.emplace_back(elem.first, RequiredStat(status.stat));
if (status.exists)
{
cached_block_stats.emplace(elem.first, status.stat.ctime);
timed_blocks.emplace_back(elem.first, status.stat.ctime);
}
}
std::sort(timed_blocks.begin(), timed_blocks.end(), NodeWithStat::greaterByTime);
......
......@@ -33,15 +33,15 @@ private:
/// Remove old records from ZooKeeper.
void clearOldLogs();
/// Remove old block hashes from ZooKeeper. This makes a leading replica.
/// Remove old block hashes from ZooKeeper. This is done by the leader replica.
void clearOldBlocks();
class NodesStatCache;
struct NodeWithStat;
std::unique_ptr<NodesStatCache> cached_block_stats;
using NodeCTimeCache = std::map<String, Int64>;
NodeCTimeCache cached_block_stats;
/// Returns list of blocks (with their stat) sorted by ctime in descending order
void getBlocksSortedByTime(std::shared_ptr<zkutil::ZooKeeper> & zookeeper, std::vector<NodeWithStat> & timed_blocks);
struct NodeWithStat;
/// Returns list of blocks (with their stat) sorted by ctime in descending order.
void getBlocksSortedByTime(zkutil::ZooKeeper & zookeeper, std::vector<NodeWithStat> & timed_blocks);
/// TODO Removing old quorum/failed_parts
/// TODO Removing old nonincrement_block_numbers
......
......@@ -2669,7 +2669,8 @@ static String getFakePartNameCoveringPartRange(
}
String StorageReplicatedMergeTree::getFakePartNameCoveringAllPartsInPartition(const String & partition_id)
String StorageReplicatedMergeTree::getFakePartNameCoveringAllPartsInPartition(
const String & partition_id, Int64 * out_min_block, Int64 * out_max_block)
{
/// Even if there is no data in the partition, you still need to mark the range for deletion.
/// - Because before executing DETACH, tasks for downloading parts to this partition can be executed.
......@@ -2695,6 +2696,11 @@ String StorageReplicatedMergeTree::getFakePartNameCoveringAllPartsInPartition(co
return {};
--right;
if (out_min_block)
*out_min_block = left;
if (out_max_block)
*out_max_block = right;
return getFakePartNameCoveringPartRange(data.format_version, partition_id, left, right);
}
......@@ -2740,6 +2746,8 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & query, const ASTPt
{
assertNotReadonly();
zkutil::ZooKeeperPtr zookeeper = getZooKeeper();
if (!is_leader_node)
{
sendRequestToLeaderReplica(query, context.getSettingsRef());
......@@ -2747,7 +2755,10 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & query, const ASTPt
}
String partition_id = data.getPartitionIDFromQuery(partition, context);
String fake_part_name = getFakePartNameCoveringAllPartsInPartition(partition_id);
Int64 min_block = 0;
Int64 max_block = 0;
String fake_part_name = getFakePartNameCoveringAllPartsInPartition(partition_id, &min_block, &max_block);
if (fake_part_name.empty())
{
......@@ -2755,6 +2766,8 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & query, const ASTPt
return;
}
clearBlocksInPartition(*zookeeper, partition_id, min_block, max_block);
/** Forbid to choose the parts to be deleted for merging.
* Invariant: after the `DROP_RANGE` entry appears in the log, merge of deleted parts will not appear in the log.
*/
......@@ -2773,7 +2786,7 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & query, const ASTPt
entry.detach = detach;
entry.create_time = time(nullptr);
String log_znode_path = getZooKeeper()->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
String log_znode_path = zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
/// If necessary, wait until the operation is performed on itself or on all replicas.
......@@ -4037,4 +4050,56 @@ void StorageReplicatedMergeTree::removePartsFromZooKeeper(zkutil::ZooKeeperPtr &
}
void StorageReplicatedMergeTree::clearBlocksInPartition(
zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num)
{
Strings blocks;
if (ZOK != zookeeper.tryGetChildren(zookeeper_path + "/blocks", blocks))
throw Exception(zookeeper_path + "/blocks doesn't exist", ErrorCodes::NOT_FOUND_NODE);
String partition_prefix = partition_id + "_";
std::vector<std::pair<String, zkutil::ZooKeeper::TryGetFuture>> get_futures;
for (const String & block_id : blocks)
{
if (startsWith(block_id, partition_prefix))
{
String path = zookeeper_path + "/blocks/" + block_id;
get_futures.emplace_back(path, zookeeper.asyncTryGet(path));
}
}
std::vector<std::pair<String, zkutil::ZooKeeper::TryRemoveFuture>> to_delete_futures;
for (auto & pair : get_futures)
{
const String & path = pair.first;
zkutil::ZooKeeper::ValueAndStatAndExists result = pair.second.get();
if (!result.exists)
continue;
ReadBufferFromString buf(result.value);
Int64 block_num = 0;
bool parsed = tryReadIntText(block_num, buf) && buf.eof();
if (!parsed || (min_block_num <= block_num && block_num <= max_block_num))
to_delete_futures.emplace_back(path, zookeeper.asyncTryRemove(path));
}
for (auto & pair : to_delete_futures)
{
const String & path = pair.first;
int32_t rc = pair.second.get();
if (rc == ZNOTEMPTY)
{
/// Can happen if there are leftover block nodes with children created by previous server versions.
zookeeper.removeRecursive(path);
}
else if (rc != ZOK)
LOG_WARNING(log,
"Error while deleting ZooKeeper path `" << path << "`: " + zkutil::ZooKeeper::error2string(rc) << ", ignoring.");
}
LOG_TRACE(log, "Deleted " << to_delete_futures.size() << " deduplication block IDs in partition ID " << partition_id);
}
}
......@@ -289,7 +289,7 @@ private:
/// Limiting parallel fetches per one table
std::atomic_uint current_table_fetches {0};
/// Streams
/// Threads.
/// A thread that keeps track of the updates in the logs of all replicas and loads them into the queue.
std::thread queue_updating_thread;
......@@ -455,14 +455,18 @@ private:
void assertNotReadonly() const;
/// The name of an imaginary part covering all parts in the specified partition (at the call moment).
/// Returns empty string if partition is empty.
String getFakePartNameCoveringAllPartsInPartition(const String & partition_id);
/// Returns empty string if the partition doesn't exist yet.
String getFakePartNameCoveringAllPartsInPartition(
const String & partition_id, Int64 * out_min_block = nullptr, Int64 * out_max_block = nullptr);
/// Check for a node in ZK. If it is, remember this information, and then immediately answer true.
std::unordered_set<std::string> existing_nodes_cache;
std::mutex existing_nodes_cache_mutex;
bool existsNodeCached(const std::string & path);
/// Remove block IDs from `blocks/` in ZooKeeper for the given partition ID in the given block number range.
void clearBlocksInPartition(
zkutil::ZooKeeper & zookeeper, const String & partition_id, Int64 min_block_num, Int64 max_block_num);
/// Resharding.
struct ReplicaSpaceInfo
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册