From ea01a3b22eb8a9dd405de6b669dfce24629a0ca3 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Fri, 6 Jul 2018 23:25:22 +0800 Subject: [PATCH] add OPTIMIZE FINAL support for ReplicatedMergeTree. --- .../MergeTree/MergeTreeDataMergerMutator.cpp | 3 +- .../MergeTree/MergeTreeDataMergerMutator.h | 2 +- .../Storages/StorageReplicatedMergeTree.cpp | 67 ++++++++++++------- ...ated_without_partition_zookeeper.reference | 7 ++ ...replicated_without_partition_zookeeper.sql | 19 ++++++ 5 files changed, 70 insertions(+), 28 deletions(-) create mode 100644 dbms/tests/queries/0_stateless/00661_optimize_final_replicated_without_partition_zookeeper.reference create mode 100644 dbms/tests/queries/0_stateless/00661_optimize_final_replicated_without_partition_zookeeper.sql diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 870d690183..3597482388 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -243,7 +243,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge( bool MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition( FuturePart & future_part, - size_t available_disk_space, + size_t & available_disk_space, const AllowedMergingPredicate & can_merge, const String & partition_id, bool final, @@ -306,6 +306,7 @@ bool MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition( LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts.front()->name << " to " << parts.back()->name); future_part.assign(std::move(parts)); + available_disk_space -= required_disk_space; return true; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 68f2df6cdb..5e6c9b634b 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -71,7 +71,7 @@ public: */ bool selectAllPartsToMergeWithinPartition( FuturePart & future_part, - size_t available_disk_space, + size_t & available_disk_space, const AllowedMergingPredicate & can_merge, const String & partition_id, bool final, diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 75d5721061..199e23394a 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -106,7 +106,6 @@ namespace ErrorCodes extern const int INCORRECT_FILE_NAME; extern const int CANNOT_ASSIGN_OPTIMIZE; extern const int KEEPER_EXCEPTION; - extern const int BAD_ARGUMENTS; } namespace ActionLocks @@ -2905,29 +2904,9 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p std::lock_guard merge_selecting_lock(merge_selecting_mutex); size_t disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path); - - MergeTreeDataMergerMutator::FuturePart future_merged_part; - String disable_reason; - bool selected = false; - auto zookeeper = getZooKeeper(); ReplicatedMergeTreeMergePredicate can_merge = queue.getMergePredicate(zookeeper); - if (!partition) - { - if (final) - throw Exception("FINAL flag for OPTIMIZE query on Replicated table is meaningful only with specified PARTITION", ErrorCodes::BAD_ARGUMENTS); - - selected = merger_mutator.selectPartsToMerge( - future_merged_part, true, data.settings.max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason); - } - else - { - String partition_id = data.getPartitionIDFromQuery(partition, context); - selected = merger_mutator.selectAllPartsToMergeWithinPartition( - future_merged_part, disk_space, can_merge, partition_id, final, &disable_reason); - } - auto handle_noop = [&] (const String & message) { if (context.getSettingsRef().optimize_throw_if_noop) @@ -2935,14 +2914,50 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p return false; }; - if (!selected) + if (!partition && final) { - LOG_INFO(log, "Cannot select parts for optimization" + (disable_reason.empty() ? "" : ": " + disable_reason)); - return handle_noop(disable_reason); + MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector(); + std::unordered_set partition_ids; + + for (const MergeTreeData::DataPartPtr & part : data_parts) + partition_ids.emplace(part->info.partition_id); + + for (const String & partition_id : partition_ids) + { + MergeTreeDataMergerMutator::FuturePart future_merged_part; + bool selected = merger_mutator.selectAllPartsToMergeWithinPartition( + future_merged_part, disk_space, can_merge, partition_id, true, nullptr); + if (selected && + !createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate, &merge_entry)) + return handle_noop("Can't create merge queue node in ZooKeeper"); + } } + else + { + MergeTreeDataMergerMutator::FuturePart future_merged_part; + String disable_reason; + bool selected = false; + if (!partition) + { + selected = merger_mutator.selectPartsToMerge( + future_merged_part, true, data.settings.max_bytes_to_merge_at_max_space_in_pool, can_merge, &disable_reason); + } + else + { + String partition_id = data.getPartitionIDFromQuery(partition, context); + selected = merger_mutator.selectAllPartsToMergeWithinPartition( + future_merged_part, disk_space, can_merge, partition_id, final, &disable_reason); + } + + if (!selected) + { + LOG_INFO(log, "Cannot select parts for optimization" + (disable_reason.empty() ? "" : ": " + disable_reason)); + return handle_noop(disable_reason); + } - if (!createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate, &merge_entry)) - return handle_noop("Can't create merge queue node in ZooKeeper"); + if (!createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate, &merge_entry)) + return handle_noop("Can't create merge queue node in ZooKeeper"); + } } /// TODO: Bad setting name for such purpose diff --git a/dbms/tests/queries/0_stateless/00661_optimize_final_replicated_without_partition_zookeeper.reference b/dbms/tests/queries/0_stateless/00661_optimize_final_replicated_without_partition_zookeeper.reference new file mode 100644 index 0000000000..c56a995623 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00661_optimize_final_replicated_without_partition_zookeeper.reference @@ -0,0 +1,7 @@ +2000-01-01 1 first 1 +2000-01-01 1 first 2 +2000-01-01 2 first 2 +2000-01-02 1 first 3 +2000-01-01 1 first 3 +2000-01-01 2 first 2 +2000-01-02 1 first 3 diff --git a/dbms/tests/queries/0_stateless/00661_optimize_final_replicated_without_partition_zookeeper.sql b/dbms/tests/queries/0_stateless/00661_optimize_final_replicated_without_partition_zookeeper.sql new file mode 100644 index 0000000000..372b09d2c2 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00661_optimize_final_replicated_without_partition_zookeeper.sql @@ -0,0 +1,19 @@ +DROP TABLE IF EXISTS test.partitioned_by_tuple_replica1; +DROP TABLE IF EXISTS test.partitioned_by_tuple_replica2; +CREATE TABLE test.partitioned_by_tuple_replica1(d Date, x UInt8, w String, y UInt8) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/test/partitioned_by_tuple', '1') PARTITION BY (d, x) ORDER BY (d, x, w); +CREATE TABLE test.partitioned_by_tuple_replica2(d Date, x UInt8, w String, y UInt8) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/test/partitioned_by_tuple', '2') PARTITION BY (d, x) ORDER BY (d, x, w); + +INSERT INTO test.partitioned_by_tuple_replica1 VALUES ('2000-01-02', 1, 'first', 3); +INSERT INTO test.partitioned_by_tuple_replica1 VALUES ('2000-01-01', 2, 'first', 2); +INSERT INTO test.partitioned_by_tuple_replica1 VALUES ('2000-01-01', 1, 'first', 1), ('2000-01-01', 1, 'first', 2); + +OPTIMIZE TABLE test.partitioned_by_tuple_replica1; + +SELECT * FROM test.partitioned_by_tuple_replica2 ORDER BY d, x, w, y; + +OPTIMIZE TABLE test.partitioned_by_tuple_replica1 FINAL; + +SELECT * FROM test.partitioned_by_tuple_replica2 ORDER BY d, x, w, y; + +DROP TABLE test.partitioned_by_tuple_replica1; +DROP TABLE test.partitioned_by_tuple_replica2; -- GitLab