提交 ea01a3b2 编写于 作者: A Amos Bird 提交者: alexey-milovidov

add OPTIMIZE FINAL support for ReplicatedMergeTree.

上级 a0814164
......@@ -243,7 +243,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
bool MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition(
FuturePart & future_part,
size_t available_disk_space,
size_t & available_disk_space,
const AllowedMergingPredicate & can_merge,
const String & partition_id,
bool final,
......@@ -306,6 +306,7 @@ bool MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition(
LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts.front()->name << " to " << parts.back()->name);
future_part.assign(std::move(parts));
available_disk_space -= required_disk_space;
return true;
}
......
......@@ -71,7 +71,7 @@ public:
*/
bool selectAllPartsToMergeWithinPartition(
FuturePart & future_part,
size_t available_disk_space,
size_t & available_disk_space,
const AllowedMergingPredicate & can_merge,
const String & partition_id,
bool final,
......
......@@ -106,7 +106,6 @@ namespace ErrorCodes
extern const int INCORRECT_FILE_NAME;
extern const int CANNOT_ASSIGN_OPTIMIZE;
extern const int KEEPER_EXCEPTION;
extern const int BAD_ARGUMENTS;
}
namespace ActionLocks
......@@ -2905,29 +2904,9 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
std::lock_guard<std::mutex> merge_selecting_lock(merge_selecting_mutex);
size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
MergeTreeDataMergerMutator::FuturePart future_merged_part;
String disable_reason;
bool selected = false;
auto zookeeper = getZooKeeper();
ReplicatedMergeTreeMergePredicate can_merge = queue.getMergePredicate(zookeeper);
if (!partition)
{
if (final)
throw Exception("FINAL flag for OPTIMIZE query on Replicated table is meaningful only with specified PARTITION", ErrorCodes::BAD_ARGUMENTS);
selected = merger_mutator.selectPartsToMerge(
future_merged_part, true, data.settings.max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason);
}
else
{
String partition_id = data.getPartitionIDFromQuery(partition, context);
selected = merger_mutator.selectAllPartsToMergeWithinPartition(
future_merged_part, disk_space, can_merge, partition_id, final, &disable_reason);
}
auto handle_noop = [&] (const String & message)
{
if (context.getSettingsRef().optimize_throw_if_noop)
......@@ -2935,14 +2914,50 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
return false;
};
if (!selected)
if (!partition && final)
{
LOG_INFO(log, "Cannot select parts for optimization" + (disable_reason.empty() ? "" : ": " + disable_reason));
return handle_noop(disable_reason);
MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector();
std::unordered_set<String> partition_ids;
for (const MergeTreeData::DataPartPtr & part : data_parts)
partition_ids.emplace(part->info.partition_id);
for (const String & partition_id : partition_ids)
{
MergeTreeDataMergerMutator::FuturePart future_merged_part;
bool selected = merger_mutator.selectAllPartsToMergeWithinPartition(
future_merged_part, disk_space, can_merge, partition_id, true, nullptr);
if (selected &&
!createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate, &merge_entry))
return handle_noop("Can't create merge queue node in ZooKeeper");
}
}
else
{
MergeTreeDataMergerMutator::FuturePart future_merged_part;
String disable_reason;
bool selected = false;
if (!partition)
{
selected = merger_mutator.selectPartsToMerge(
future_merged_part, true, data.settings.max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason);
}
else
{
String partition_id = data.getPartitionIDFromQuery(partition, context);
selected = merger_mutator.selectAllPartsToMergeWithinPartition(
future_merged_part, disk_space, can_merge, partition_id, final, &disable_reason);
}
if (!selected)
{
LOG_INFO(log, "Cannot select parts for optimization" + (disable_reason.empty() ? "" : ": " + disable_reason));
return handle_noop(disable_reason);
}
if (!createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate, &merge_entry))
return handle_noop("Can't create merge queue node in ZooKeeper");
if (!createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate, &merge_entry))
return handle_noop("Can't create merge queue node in ZooKeeper");
}
}
/// TODO: Bad setting name for such purpose
......
2000-01-01 1 first 1
2000-01-01 1 first 2
2000-01-01 2 first 2
2000-01-02 1 first 3
2000-01-01 1 first 3
2000-01-01 2 first 2
2000-01-02 1 first 3
DROP TABLE IF EXISTS test.partitioned_by_tuple_replica1;
DROP TABLE IF EXISTS test.partitioned_by_tuple_replica2;
CREATE TABLE test.partitioned_by_tuple_replica1(d Date, x UInt8, w String, y UInt8) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/test/partitioned_by_tuple', '1') PARTITION BY (d, x) ORDER BY (d, x, w);
CREATE TABLE test.partitioned_by_tuple_replica2(d Date, x UInt8, w String, y UInt8) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/test/partitioned_by_tuple', '2') PARTITION BY (d, x) ORDER BY (d, x, w);
INSERT INTO test.partitioned_by_tuple_replica1 VALUES ('2000-01-02', 1, 'first', 3);
INSERT INTO test.partitioned_by_tuple_replica1 VALUES ('2000-01-01', 2, 'first', 2);
INSERT INTO test.partitioned_by_tuple_replica1 VALUES ('2000-01-01', 1, 'first', 1), ('2000-01-01', 1, 'first', 2);
OPTIMIZE TABLE test.partitioned_by_tuple_replica1;
SELECT * FROM test.partitioned_by_tuple_replica2 ORDER BY d, x, w, y;
OPTIMIZE TABLE test.partitioned_by_tuple_replica1 FINAL;
SELECT * FROM test.partitioned_by_tuple_replica2 ORDER BY d, x, w, y;
DROP TABLE test.partitioned_by_tuple_replica1;
DROP TABLE test.partitioned_by_tuple_replica2;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册