提交 a4c1a4ee 编写于 作者: V Vitaliy Lyudvichenko

Add requested changes. [#CLICKHOUSE-3207]

上级 7e90f34c
......@@ -97,7 +97,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
{
ops.emplace_back(std::make_unique<zkutil::Op::Remove>(storage.zookeeper_path + "/log/" + entries[i], -1));
if (ops.size() > 400 || i + 1 == entries.size())
if (ops.size() > 4 * zkutil::MULTI_BATCH_SIZE || i + 1 == entries.size())
{
/// Simultaneously with clearing the log, we check to see if replica was added since we received replicas list.
ops.emplace_back(std::make_unique<zkutil::Op::Check>(storage.zookeeper_path + "/replicas", stat.version));
......@@ -109,13 +109,15 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
LOG_DEBUG(log, "Removed " << entries.size() << " old log entries: " << entries.front() << " - " << entries.back());
}
namespace
{
/// Just a subset of zkutil::Stat fields required for the cache
struct RequiredStat
{
Int64 ctime;
int numChildren;
int64_t ctime = 0;
int32_t numChildren = 0;
RequiredStat() = default;
RequiredStat(const RequiredStat &) = default;
......@@ -125,12 +127,7 @@ struct RequiredStat
}
class ReplicatedMergeTreeCleanupThread::NodesStatCache : public std::map<String, RequiredStat>
{
};
/// Just a node name with its ZooKeeper's stat
struct ReplicatedMergeTreeCleanupThread::NodeWithStat
{
String node;
......@@ -141,10 +138,14 @@ struct ReplicatedMergeTreeCleanupThread::NodeWithStat
static bool greaterByTime (const NodeWithStat & lhs, const NodeWithStat & rhs)
{
return (lhs.stat.ctime != rhs.stat.ctime) ? lhs.stat.ctime > rhs.stat.ctime : lhs.node > rhs.node;
return std::greater<void>()(std::forward_as_tuple(lhs.stat.ctime, lhs.node), std::forward_as_tuple(rhs.stat.ctime, rhs.node));
}
};
/// Use simple map node_name -> zkutil::Stat (only required fields) as the cache
/// It is not declared in the header explicitly to hide extra implementation dependent structs like RequiredStat
class ReplicatedMergeTreeCleanupThread::NodesStatCache : public std::map<String, RequiredStat> {};
void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
{
......@@ -158,7 +159,9 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
/// Use ZooKeeper's first node (last according to time) timestamp as "current" time.
Int64 current_time = timed_blocks.front().stat.ctime;
Int64 time_threshold = std::max(0L, current_time - static_cast<Int64>(storage.data.settings.replicated_deduplication_window_seconds));
Int64 time_threshold = std::max(0L, current_time - static_cast<Int64>(1000 * storage.data.settings.replicated_deduplication_window_seconds));
/// Virtual node, all nodes that are "greater" than this one will be deleted
NodeWithStat block_threshold("", RequiredStat(time_threshold));
size_t current_deduplication_window = std::min(timed_blocks.size(), storage.data.settings.replicated_deduplication_window);
......@@ -208,7 +211,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
if (num_nodes_not_deleted)
{
LOG_ERROR(log, "There was a problem with deleting " << num_nodes_not_deleted << " (from " << num_nodes_to_delete << ")"
LOG_ERROR(log, "There was a problem with deleting " << num_nodes_not_deleted << " (of " << num_nodes_to_delete << ")"
<< " old blocks from ZooKeeper, error: " << zkutil::ZooKeeper::error2string(last_error_code));
}
else
......@@ -238,7 +241,7 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeperPt
}
auto not_cached_blocks = stat.numChildren - cached_block_stats->size();
LOG_TRACE(log, "Checking " << stat.numChildren << " blocks (" << not_cached_blocks << " are not cached)"
LOG_TRACE(log, "Checking " << stat.numChildren << " blocks (" << not_cached_blocks << " are not cached)"
<< " to clear old ones from ZooKeeper. This might take several minutes.");
std::vector<std::pair<String, zkutil::ZooKeeper::ExistsFuture>> exists_futures;
......
......@@ -40,7 +40,7 @@ private:
struct NodeWithStat;
std::unique_ptr<NodesStatCache> cached_block_stats;
/// Returns list of blocks with stat sorted by ctime
/// Returns list of blocks (with their stat) sorted by ctime in descending order
void getBlocksSortedByTime(std::shared_ptr<zkutil::ZooKeeper> & zookeeper, std::vector<NodeWithStat> & timed_blocks);
/// TODO Removing old quorum/failed_parts
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册