From 15ee6207abff75386ade7d59ddf8e0903bc2d848 Mon Sep 17 00:00:00 2001 From: Igor Mineev Date: Fri, 24 May 2019 22:03:07 +0300 Subject: [PATCH] Chg 'Schema' to 'StoragePolicy'. Add system.disks. Add system.storage_politics. Add disk to system.parts. Add policy to system.tables. --- dbms/src/Common/ErrorCodes.cpp | 3 +- dbms/src/Interpreters/Context.cpp | 31 +++-- dbms/src/Interpreters/Context.h | 4 +- dbms/src/Storages/IStorage.h | 3 + .../Storages/MergeTree/DataPartsExchange.cpp | 7 +- .../Storages/MergeTree/DiskSpaceMonitor.cpp | 108 +++++++++++------- .../src/Storages/MergeTree/DiskSpaceMonitor.h | 33 ++++-- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 24 ++-- dbms/src/Storages/MergeTree/MergeTreeData.h | 8 +- .../MergeTree/MergeTreeDataMergerMutator.cpp | 2 +- .../Storages/MergeTree/MergeTreeSettings.h | 2 +- dbms/src/Storages/StorageMergeTree.cpp | 8 +- .../Storages/StorageReplicatedMergeTree.cpp | 6 +- dbms/src/Storages/System/CMakeLists.txt | 2 +- .../Storages/System/StorageSystemDisks.cpp | 69 +++++++++++ dbms/src/Storages/System/StorageSystemDisks.h | 38 ++++++ .../Storages/System/StorageSystemParts.cpp | 2 + .../System/StorageSystemPartsColumns.cpp | 3 + .../System/StorageSystemStoragePolicies.cpp | 75 ++++++++++++ .../System/StorageSystemStoragePolicies.h | 38 ++++++ .../Storages/System/StorageSystemTables.cpp | 13 +++ .../Storages/System/attachSystemTables.cpp | 4 + 22 files changed, 389 insertions(+), 94 deletions(-) create mode 100644 dbms/src/Storages/System/StorageSystemDisks.cpp create mode 100644 dbms/src/Storages/System/StorageSystemDisks.h create mode 100644 dbms/src/Storages/System/StorageSystemStoragePolicies.cpp create mode 100644 dbms/src/Storages/System/StorageSystemStoragePolicies.h diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 32ab02da8e..4cdda0e5d0 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -427,8 +427,9 @@ namespace ErrorCodes extern const int 
BAD_TTL_EXPRESSION = 450; extern const int BAD_TTL_FILE = 451; extern const int SETTING_CONSTRAINT_VIOLATION = 452; - extern const int UNKNOWN_SCHEMA = 453; + extern const int UNKNOWN_POLICY = 453; extern const int UNKNOWN_DISK = 454; + extern const int UNKNOWN_PROTOCOL = 455; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index d8696828ff..e38d876154 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -146,10 +146,10 @@ struct ContextShared std::unique_ptr ddl_worker; /// Process ddl commands from zk. /// Rules for selecting the compression settings, depending on the size of the part. mutable std::unique_ptr compression_codec_selector; - /// Storage schema chooser; + /// Storage disk chooser mutable std::unique_ptr merge_tree_disk_selector; - /// Storage schema chooser; - mutable std::unique_ptr merge_tree_schema_selector; + /// Storage policy chooser + mutable std::unique_ptr merge_tree_storage_policy_selector; std::optional merge_tree_settings; /// Settings of MergeTree* engines. 
size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default) size_t max_partition_size_to_drop = 50000000000lu; /// Protects MergeTree partitions from accidental DROP (50GB by default) @@ -1725,18 +1725,33 @@ DiskSelector & Context::getDiskSelector() const } -const SchemaPtr & Context::getSchema(const String & name) const +const StoragePolicyPtr & Context::getStoragePolicy(const String &name) const { auto lock = getLock(); - if (!shared->merge_tree_schema_selector) + if (!shared->merge_tree_storage_policy_selector) { - constexpr auto config_name = "storage_configuration.schemas"; + constexpr auto config_name = "storage_configuration.policies"; auto & config = getConfigRef(); - shared->merge_tree_schema_selector = std::make_unique(config, config_name, getDiskSelector()); + shared->merge_tree_storage_policy_selector = std::make_unique(config, config_name, getDiskSelector()); } - return (*shared->merge_tree_schema_selector)[name]; + return (*shared->merge_tree_storage_policy_selector)[name]; +} + + +StoragePolicySelector & Context::getStoragePolicySelector() const +{ + auto lock = getLock(); + + if (!shared->merge_tree_storage_policy_selector) + { + constexpr auto config_name = "storage_configuration.policies"; + auto & config = getConfigRef(); + + shared->merge_tree_storage_policy_selector = std::make_unique(config, config_name, getDiskSelector()); + } + return *shared->merge_tree_storage_policy_selector; } diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 8bdf51e9bc..a2db00bc8c 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -443,8 +443,10 @@ public: /// Provides storage disks const DiskPtr & getDisk(const String & name) const; + StoragePolicySelector & getStoragePolicySelector() const; + /// Provides storage politics schemes - const SchemaPtr & getSchema(const String & name) const; + const StoragePolicyPtr & getStoragePolicy(const String 
&name) const; /// Get the server uptime in seconds. time_t getUptimeSeconds() const; diff --git a/dbms/src/Storages/IStorage.h b/dbms/src/Storages/IStorage.h index 301b784b5a..3ca6416b0b 100644 --- a/dbms/src/Storages/IStorage.h +++ b/dbms/src/Storages/IStorage.h @@ -335,6 +335,9 @@ public: /// Returns additional columns that need to be read for FINAL to work. virtual Names getColumnsRequiredForFinal() const { return {}; } + /// Returns storage policy if table supports it + virtual StoragePolicyPtr getStoragePolicy() const { return {}; } + using ITableDeclaration::ITableDeclaration; using std::enable_shared_from_this::shared_from_this; diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp index 3a3c21a374..96d7d7319e 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp @@ -26,7 +26,7 @@ namespace ErrorCodes extern const int CANNOT_WRITE_TO_OSTREAM; extern const int CHECKSUM_DOESNT_MATCH; extern const int UNKNOWN_TABLE; - extern const int UNKNOWN_ACTION; + extern const int UNKNOWN_PROTOCOL; } namespace DataPartsExchange @@ -62,7 +62,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo else if (protocol_version == "1") part_name = params.get("part_name"); else - throw Exception("Unsupported protocol version", ErrorCodes::UNKNOWN_ACTION); ///@TODO_IGR ASK Is it true error code? + throw Exception("Unsupported fetch protocol version", ErrorCodes::UNKNOWN_PROTOCOL); static std::atomic_uint total_sends {0}; @@ -232,10 +232,11 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( auto reservation = data.reserveSpaceForPart(sum_files_size); return downloadPart(part_name, replica_path, to_detached, tmp_prefix_, std::move(reservation), in); } - catch (...) 
///@TODO_IGR catch exception + catch (const Exception & e) ///@TODO_IGR ASK maybe catch connection and others error here { if (!protocol_error) throw; + LOG_WARNING(log, "Looks like old ClickHouse version node. Trying to use fetch protocol version 0"); ///@TODO_IGR ASK new msg } /// Protocol error diff --git a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.cpp b/dbms/src/Storages/MergeTree/DiskSpaceMonitor.cpp index 19babc9ee7..1b7f5e5b8e 100644 --- a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.cpp +++ b/dbms/src/Storages/MergeTree/DiskSpaceMonitor.cpp @@ -1,5 +1,7 @@ #include +#include + #include #include @@ -88,18 +90,24 @@ void DiskSelector::add(const DiskPtr & disk) disks.emplace(disk->getName(), disk); } -Schema::Volume::Volume(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DiskSelector & disk_selector) +StoragePolicy::Volume::Volume(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DiskSelector & disk_selector) { Poco::Util::AbstractConfiguration::Keys keys; config.keys(config_prefix, keys); - for (const auto & name : keys) + Logger * logger = &Logger::get("StorageConfiguration"); + + for (const auto & disk : keys) { - if (startsWith(name, "disk")) + if (startsWith(disk, "disk")) { - auto disk_name = config.getString(config_prefix + "." + name); + auto disk_name = config.getString(config_prefix + "." + disk); disks.push_back(disk_selector[disk_name]); } + else + { + LOG_WARNING(logger, "Unused param " << config_prefix << '.' 
<< disk); + } } if (disks.empty()) @@ -113,8 +121,6 @@ Schema::Volume::Volume(const Poco::Util::AbstractConfiguration & config, const s ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); } - Logger * logger = &Logger::get("StorageConfiguration"); - if (has_max_bytes) { max_data_part_size = config.getUInt64(config_prefix + ".max_data_part_size_bytes"); @@ -144,12 +150,13 @@ Schema::Volume::Volume(const Poco::Util::AbstractConfiguration & config, const s { max_data_part_size = std::numeric_limits::max(); } - constexpr UInt64 SIZE_8MB = 8ull << 20u; - if (max_data_part_size < SIZE_8MB) - LOG_WARNING(logger, "Volume max_data_part_size is too low (" << max_data_part_size << " < " << SIZE_8MB << ")"); + constexpr UInt64 MIN_PART_SIZE = 8u * 1024u * 1024u; + if (max_data_part_size < MIN_PART_SIZE) + LOG_WARNING(logger, "Volume max_data_part_size is too low (" << formatReadableSizeWithBinarySuffix(max_data_part_size) << + " < " << formatReadableSizeWithBinarySuffix(MIN_PART_SIZE) << ")"); } -DiskSpaceMonitor::ReservationPtr Schema::Volume::reserve(UInt64 expected_size) const +DiskSpaceMonitor::ReservationPtr StoragePolicy::Volume::reserve(UInt64 expected_size) const { /// This volume can not store files which size greater than max_data_part_size @@ -163,19 +170,19 @@ DiskSpaceMonitor::ReservationPtr Schema::Volume::reserve(UInt64 expected_size) c size_t index = (start_from + i) % disks_num; auto reservation = DiskSpaceMonitor::tryToReserve(disks[index], expected_size); - if (reservation && *reservation) + if (reservation && reservation->isValid()) return reservation; } return {}; } -DiskSpaceMonitor::ReservationPtr Schema::Volume::reserveAtDisk(const DiskPtr & disk, UInt64 expected_size) const +DiskSpaceMonitor::ReservationPtr StoragePolicy::Volume::reserveAtDisk(const DiskPtr & disk, UInt64 expected_size) const { /// This volume can not store files which size greater than max_data_part_size /// Reserve and Warn it if (expected_size > max_data_part_size) - 
LOG_WARNING(&Logger::get("StorageSchema"), "Volume max_data_part_size limit exceed: " << expected_size); + LOG_WARNING(&Logger::get("StoragePolicy"), "Volume max_data_part_size limit exceed: " << expected_size); size_t disks_num = disks.size(); for (size_t i = 0; i != disks_num; ++i) @@ -185,15 +192,15 @@ DiskSpaceMonitor::ReservationPtr Schema::Volume::reserveAtDisk(const DiskPtr & d } auto reservation = DiskSpaceMonitor::tryToReserve(disks[i], expected_size); - if (reservation && *reservation) + if (reservation && reservation->isValid()) return reservation; return {}; } - LOG_DEBUG(&Logger::get("StorageSchema"), "Volume has no disk " << disk->getName()); + LOG_DEBUG(&Logger::get("StoragePolicy"), "Volume has no disk " << disk->getName()); return {}; } -UInt64 Schema::Volume::getMaxUnreservedFreeSpace() const +UInt64 StoragePolicy::Volume::getMaxUnreservedFreeSpace() const { UInt64 res = 0; for (const auto & disk : disks) @@ -201,7 +208,7 @@ UInt64 Schema::Volume::getMaxUnreservedFreeSpace() const return res; } -Schema::Schema(const String & name_, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, +StoragePolicy::StoragePolicy(const String & name_, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DiskSelector & disks) : name(name_) { Poco::Util::AbstractConfiguration::Keys keys; @@ -209,16 +216,29 @@ Schema::Schema(const String & name_, const Poco::Util::AbstractConfiguration & c for (const auto & attr_name : keys) { - if (!startsWith(name, "volume")) + if (!startsWith(attr_name, "volume")) throw Exception("Unknown element in config: " + config_prefix + "." + attr_name + ", must be 'volume'", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); volumes.emplace_back(config, config_prefix + "." 
+ attr_name, disks); } if (volumes.empty()) - throw Exception("Schema must contain at least one Volume", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); + throw Exception("StoragePolicy must contain at least one Volume", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); + + /// Check that disks are unique in Policy + std::set disk_names; + for (const auto & volume : volumes) + { + for (const auto & disk : volume.disks) + { + if (disk_names.find(disk->getName()) != disk_names.end()) + throw Exception("StoragePolicy disks must not be repeated: " + disk->getName(), ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); + + disk_names.insert(disk->getName()); + } + } } -Schema::Disks Schema::getDisks() const +StoragePolicy::Disks StoragePolicy::getDisks() const { Disks res; for (const auto & volume : volumes) @@ -227,24 +247,24 @@ Schema::Disks Schema::getDisks() const return res; } -DiskPtr Schema::getAnyDisk() const +DiskPtr StoragePolicy::getAnyDisk() const { - /// Schema must contain at least one Volume + /// StoragePolicy must contain at least one Volume /// Volume must contain at least one Disk if (volumes.empty()) { - LOG_ERROR(&Logger::get("StorageSchema"), "No volumes at schema " << name); - throw Exception("Schema has no Volumes. it's a bug", ErrorCodes::NOT_ENOUGH_SPACE); + LOG_ERROR(&Logger::get("StoragePolicy"), "No volumes at StoragePolicy " << name); + throw Exception("StoragePolicy has no Volumes. it's a bug", ErrorCodes::NOT_ENOUGH_SPACE); } if (volumes[0].disks.empty()) { - LOG_ERROR(&Logger::get("StorageSchema"), "No Disks at volume 0 at schema " << name); - throw Exception("Schema Volume 1 has no Disks. it's a bug", ErrorCodes::NOT_ENOUGH_SPACE); + LOG_ERROR(&Logger::get("StoragePolicy"), "No Disks at volume 0 at StoragePolicy " << name); + throw Exception("StoragePolicy Volume 1 has no Disks. 
it's a bug", ErrorCodes::NOT_ENOUGH_SPACE); } return volumes[0].disks[0]; } -UInt64 Schema::getMaxUnreservedFreeSpace() const +UInt64 StoragePolicy::getMaxUnreservedFreeSpace() const { UInt64 res = 0; for (const auto & volume : volumes) @@ -252,7 +272,7 @@ UInt64 Schema::getMaxUnreservedFreeSpace() const return res; } -DiskSpaceMonitor::ReservationPtr Schema::reserve(UInt64 expected_size) const +DiskSpaceMonitor::ReservationPtr StoragePolicy::reserve(UInt64 expected_size) const { for (const auto & volume : volumes) { @@ -263,7 +283,7 @@ DiskSpaceMonitor::ReservationPtr Schema::reserve(UInt64 expected_size) const return {}; } -DiskSpaceMonitor::ReservationPtr Schema::reserveAtDisk(const DiskPtr & disk, UInt64 expected_size) const +DiskSpaceMonitor::ReservationPtr StoragePolicy::reserveAtDisk(const DiskPtr & disk, UInt64 expected_size) const { for (const auto & volume : volumes) { @@ -274,7 +294,7 @@ DiskSpaceMonitor::ReservationPtr Schema::reserveAtDisk(const DiskPtr & disk, UIn return {}; } -DiskSpaceMonitor::ReservationPtr Schema::reserveOnMaxDiskWithoutReservation() const +DiskSpaceMonitor::ReservationPtr StoragePolicy::reserveOnMaxDiskWithoutReservation() const { UInt64 max_space = 0; DiskPtr max_disk; @@ -293,35 +313,35 @@ DiskSpaceMonitor::ReservationPtr Schema::reserveOnMaxDiskWithoutReservation() co return DiskSpaceMonitor::tryToReserve(max_disk, 0); } -SchemaSelector::SchemaSelector(const Poco::Util::AbstractConfiguration & config, const String& config_prefix, const DiskSelector & disks) +StoragePolicySelector::StoragePolicySelector(const Poco::Util::AbstractConfiguration & config, const String& config_prefix, const DiskSelector & disks) { Poco::Util::AbstractConfiguration::Keys keys; config.keys(config_prefix, keys); - Logger * logger = &Logger::get("SchemaSelector"); + Logger * logger = &Logger::get("StoragePolicySelector"); for (const auto & name : keys) { if (!std::all_of(name.begin(), name.end(), isWordCharASCII)) - throw Exception("Schema name can 
contain only alphanumeric and '_' (" + name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); - schemas.emplace(name, std::make_shared(name, config, config_prefix + "." + name, disks)); - LOG_INFO(logger, "Storage schema " << name << " loaded"); + throw Exception("StoragePolicy name can contain only alphanumeric and '_' (" + name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); + policies.emplace(name, std::make_shared(name, config, config_prefix + "." + name, disks)); + LOG_INFO(logger, "Storage policy " << name << " loaded"); } - constexpr auto default_schema_name = "default"; + constexpr auto default_storage_policy_name = "default"; constexpr auto default_disk_name = "default"; - if (schemas.find(default_schema_name) == schemas.end()) - schemas.emplace(default_schema_name, - std::make_shared(default_schema_name, - Schema::Volumes{{std::vector{disks[default_disk_name]}, + if (policies.find(default_storage_policy_name) == policies.end()) + policies.emplace(default_storage_policy_name, + std::make_shared(default_storage_policy_name, + StoragePolicy::Volumes{{std::vector{disks[default_disk_name]}, std::numeric_limits::max()}})); } -const SchemaPtr & SchemaSelector::operator[](const String & name) const +const StoragePolicyPtr & StoragePolicySelector::operator[](const String & name) const { - auto it = schemas.find(name); - if (it == schemas.end()) - throw Exception("Unknown schema " + name, ErrorCodes::UNKNOWN_SCHEMA); + auto it = policies.find(name); + if (it == policies.end()) + throw Exception("Unknown StoragePolicy " + name, ErrorCodes::UNKNOWN_POLICY); return it->second; } diff --git a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h b/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h index e06d27c5fc..13d4315d25 100644 --- a/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h +++ b/dbms/src/Storages/MergeTree/DiskSpaceMonitor.h @@ -26,7 +26,7 @@ namespace ErrorCodes extern const int NOT_ENOUGH_SPACE; extern const int UNKNOWN_ELEMENT_IN_CONFIG; extern const int 
EXCESSIVE_ELEMENT_IN_CONFIG; - extern const int UNKNOWN_SCHEMA; + extern const int UNKNOWN_POLICY; extern const int UNKNOWN_DISK; } @@ -197,7 +197,7 @@ public: valid = true; } - explicit operator bool() const noexcept { + bool isValid() const { return valid; } @@ -270,19 +270,21 @@ public: void add(const DiskPtr & disk); + const auto & getDisksMap() const { return disks; } + private: std::map disks; }; -class Schema +class StoragePolicy { public: using Disks = std::vector; class Volume { - friend class Schema; + friend class StoragePolicy; public: Volume(std::vector disks_, UInt64 max_data_part_size_) @@ -319,19 +321,20 @@ public: UInt64 getMaxUnreservedFreeSpace() const; - private: UInt64 max_data_part_size = std::numeric_limits::max(); Disks disks; + + private: mutable std::atomic last_used = 0; }; using Volumes = std::vector; - Schema(const String & name_, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, + StoragePolicy(const String & name_, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DiskSelector & disks); - Schema(const String & name_, Volumes volumes_) : volumes(std::move(volumes_)), name(name_) { } + StoragePolicy(const String & name_, Volumes volumes_) : volumes(std::move(volumes_)), name(name_) { } Disks getDisks() const; @@ -339,6 +342,8 @@ public: UInt64 getMaxUnreservedFreeSpace() const; + const String & getName() const { return name; } + /// Returns valid reservation or null DiskSpaceMonitor::ReservationPtr reserve(UInt64 expected_size) const; @@ -349,22 +354,26 @@ public: /// Do not use this function when it is possible to predict size!!! 
DiskSpaceMonitor::ReservationPtr reserveOnMaxDiskWithoutReservation() const; + const auto & getVolumes() const { return volumes; } + private: Volumes volumes; String name; }; -using SchemaPtr = std::shared_ptr; +using StoragePolicyPtr = std::shared_ptr; -class SchemaSelector +class StoragePolicySelector { public: - SchemaSelector(const Poco::Util::AbstractConfiguration & config, const String& config_prefix, const DiskSelector & disks); + StoragePolicySelector(const Poco::Util::AbstractConfiguration & config, const String& config_prefix, const DiskSelector & disks); + + const StoragePolicyPtr & operator[](const String & name) const; - const SchemaPtr & operator[](const String & name) const; + const auto & getPoliciesMap() const { return policies; } private: - std::map schemas; + std::map policies; }; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 8274c7b37f..098dbbc0e7 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -114,7 +114,7 @@ MergeTreeData::MergeTreeData( ttl_table_ast(ttl_table_ast_), require_part_metadata(require_part_metadata_), database_name(database_), table_name(table_), - schema(context_.getSchema(settings.storage_schema_name)), + storage_policy(context_.getStoragePolicy(settings.storage_policy_name)), broken_part_callback(broken_part_callback_), log_name(database_name + "." 
+ table_name), log(&Logger::get(log_name)), data_parts_by_info(data_parts_indexes.get()), @@ -187,7 +187,7 @@ MergeTreeData::MergeTreeData( /// If not choose any if (version_file_path.empty()) - version_file_path = getFullPathOnDisk(schema->getAnyDisk()) + "format_version.txt"; + version_file_path = getFullPathOnDisk(storage_policy->getAnyDisk()) + "format_version.txt"; bool version_file_exists = Poco::File(version_file_path).exists(); @@ -766,7 +766,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) std::vector> part_names_with_disks; Poco::DirectoryIterator end; - auto disks = schema->getDisks(); + auto disks = storage_policy->getDisks(); for (auto disk_ptr : disks) { @@ -1123,7 +1123,7 @@ void MergeTreeData::rename(const String & /*new_path_to_db*/, const String & new auto old_file_table_name = escapeForFileName(table_name); auto new_file_table_name = escapeForFileName(new_table_name); - auto disks = schema->getDisks(); + auto disks = storage_policy->getDisks(); for (const auto & disk : disks) { @@ -2604,13 +2604,13 @@ MergeTreeData::DataPartsVector MergeTreeData::getAllDataPartsVector(MergeTreeDat DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpaceForPart(UInt64 expected_size) { - constexpr UInt64 SIZE_1MB = 1ull << 20u; ///@TODO_IGR ASK Is it OK? - constexpr UInt64 MAGIC_CONST = 1; + constexpr UInt64 RESERVATION_MIN_ESTIMATION_SIZE = 1u * 1024u * 1024u; ///@TODO_IGR ASK Is it OK? 
+ constexpr UInt64 RESERVATION_MULTIPLY_ESTIMATION_FACTOR = 1; - if (expected_size < SIZE_1MB) - expected_size = SIZE_1MB; + if (expected_size < RESERVATION_MIN_ESTIMATION_SIZE) + expected_size = RESERVATION_MIN_ESTIMATION_SIZE; - auto reservation = reserveSpace(expected_size * MAGIC_CONST); + auto reservation = reserveSpace(expected_size * RESERVATION_MULTIPLY_ESTIMATION_FACTOR); if (reservation) return reservation; @@ -2820,12 +2820,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size) { - return schema->reserve(expected_size); + return storage_policy->reserve(expected_size); } DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpaceAtDisk(const DiskPtr & disk, UInt64 expected_size) { - return schema->reserveAtDisk(disk, expected_size); + return storage_policy->reserveAtDisk(disk, expected_size); } String MergeTreeData::getFullPathOnDisk(const DiskPtr & disk) const @@ -2836,7 +2836,7 @@ String MergeTreeData::getFullPathOnDisk(const DiskPtr & disk) const Strings MergeTreeData::getDataPaths() const { Strings res; - auto disks = schema->getDisks(); + auto disks = storage_policy->getDisks(); for (const auto & disk : disks) res.push_back(getFullPathOnDisk(disk)); return res; diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index 2b7df38598..d238865092 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -313,7 +313,7 @@ public: }; - /// Attach the table corresponding to the directory in full_path inside schema (must end with /), with the given columns. + /// Attach the table corresponding to the directory in full_path inside policy (must end with /), with the given columns. /// Correctness of names and paths is not checked. /// /// date_column_name - if not empty, the name of the Date column used for partitioning by month. 
@@ -356,6 +356,8 @@ public: Names getColumnsRequiredForSampling() const override { return columns_required_for_sampling; } Names getColumnsRequiredForFinal() const override { return sorting_key_expr->getRequiredColumns(); } + StoragePolicyPtr getStoragePolicy() const override { return storage_policy; } + bool supportsPrewhere() const override { return true; } bool supportsSampling() const override { return sample_by_ast != nullptr; } @@ -609,7 +611,7 @@ public: /// Choose disk with max available free space /// Reserves 0 bytes - DiskSpaceMonitor::ReservationPtr reserveOnMaxDiskWithoutReservation() { return schema->reserveOnMaxDiskWithoutReservation(); } + DiskSpaceMonitor::ReservationPtr reserveOnMaxDiskWithoutReservation() { return storage_policy->reserveOnMaxDiskWithoutReservation(); } MergeTreeDataFormatVersion format_version; @@ -687,7 +689,7 @@ protected: String database_name; String table_name; - SchemaPtr schema; + StoragePolicyPtr storage_policy; /// Current column sizes in compressed and uncompressed form. 
ColumnSizeByName column_sizes; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 554d429f57..a9456c5db1 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -152,7 +152,7 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSize(size_t pool_size, size_ data.settings.max_bytes_to_merge_at_max_space_in_pool, static_cast(free_entries) / data.settings.number_of_free_entries_in_pool_to_lower_max_size_of_merge); - return std::min(max_size, static_cast(data.schema->getMaxUnreservedFreeSpace() / DISK_USAGE_COEFFICIENT_TO_SELECT)); + return std::min(max_size, static_cast(data.storage_policy->getMaxUnreservedFreeSpace() / DISK_USAGE_COEFFICIENT_TO_SELECT)); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeSettings.h b/dbms/src/Storages/MergeTree/MergeTreeSettings.h index ef453f35ed..061efa9a48 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSettings.h +++ b/dbms/src/Storages/MergeTree/MergeTreeSettings.h @@ -79,7 +79,7 @@ struct MergeTreeSettings : public SettingsCollection M(SettingUInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024, "Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled).") \ M(SettingUInt64, index_granularity_bytes, 0, "Approximate amount of bytes in single granule (0 - disabled).") \ M(SettingInt64, merge_with_ttl_timeout, 3600 * 24, "Minimal time in seconds, when merge with TTL can be repeated.") \ - M(SettingString, storage_schema_name, "default", "Name of storage schema") + M(SettingString, storage_policy_name, "default", "Name of storage policy") DECLARE_SETTINGS_COLLECTION(LIST_OF_MERGE_TREE_SETTINGS) diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index beaba58959..40467070d2 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -336,7 +336,7 @@ 
public: void StorageMergeTree::mutate(const MutationCommands & commands, const Context &) { /// Choose any disk. - auto disk = schema->getAnyDisk(); + auto disk = storage_policy->getAnyDisk(); MergeTreeMutationEntry entry(commands, getFullPathOnDisk(disk), insert_increment.get()); String file_name; { @@ -489,7 +489,7 @@ bool StorageMergeTree::merge( } else { - UInt64 disk_space = schema->getMaxUnreservedFreeSpace(); + UInt64 disk_space = storage_policy->getMaxUnreservedFreeSpace(); selected = merger_mutator.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason); } @@ -580,7 +580,7 @@ bool StorageMergeTree::tryMutatePart() std::optional tagger; { /// DataPArt can be store only at one disk. Get Max of free space at all disks - UInt64 disk_space = schema->getMaxUnreservedFreeSpace(); + UInt64 disk_space = storage_policy->getMaxUnreservedFreeSpace(); std::lock_guard lock(currently_merging_mutex); @@ -975,7 +975,7 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par { LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir); ActiveDataPartSet active_parts(format_version); - const auto disks = schema->getDisks(); + const auto disks = storage_policy->getDisks(); for (const DiskPtr & disk : disks) { const auto full_path = getFullPathOnDisk(disk); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index f969821850..772bbe533e 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -3002,7 +3002,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p for (const DataPartPtr & part : data_parts) partition_ids.emplace(part->info.partition_id); - UInt64 disk_space = schema->getMaxUnreservedFreeSpace(); + UInt64 disk_space = storage_policy->getMaxUnreservedFreeSpace(); for (const String & partition_id : 
partition_ids) { @@ -3029,7 +3029,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p } else { - UInt64 disk_space = schema->getMaxUnreservedFreeSpace(); + UInt64 disk_space = storage_policy->getMaxUnreservedFreeSpace(); String partition_id = getPartitionIDFromQuery(partition, query_context); selected = merger_mutator.selectAllPartsToMergeWithinPartition( future_merged_part, disk_space, can_merge, partition_id, final, &disable_reason); @@ -3555,7 +3555,7 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool ActiveDataPartSet active_parts(format_version); std::set part_names; - const auto disks = schema->getDisks(); + const auto disks = storage_policy->getDisks(); for (const DiskPtr & disk : disks) { const auto full_path = getFullPathOnDisk(disk); diff --git a/dbms/src/Storages/System/CMakeLists.txt b/dbms/src/Storages/System/CMakeLists.txt index d59befa56b..34981776c3 100644 --- a/dbms/src/Storages/System/CMakeLists.txt +++ b/dbms/src/Storages/System/CMakeLists.txt @@ -11,5 +11,5 @@ configure_file (StorageSystemBuildOptions.generated.cpp.in ${CONFIG_BUILD}) include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake) add_headers_and_sources(storages_system .) 
list (APPEND storages_system_sources ${CONFIG_BUILD}) -add_library(clickhouse_storages_system ${storages_system_headers} ${storages_system_sources}) +add_library(clickhouse_storages_system ${storages_system_headers} ${storages_system_sources} StorageSystemDisks.cpp StorageSystemStoragePolicies.cpp StorageSystemStoragePolicies.h) target_link_libraries(clickhouse_storages_system PRIVATE dbms common string_utils clickhouse_common_zookeeper) diff --git a/dbms/src/Storages/System/StorageSystemDisks.cpp b/dbms/src/Storages/System/StorageSystemDisks.cpp new file mode 100644 index 0000000000..f0c4077bb0 --- /dev/null +++ b/dbms/src/Storages/System/StorageSystemDisks.cpp @@ -0,0 +1,69 @@ +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +} + + +StorageSystemDisks::StorageSystemDisks(const std::string & name_) + : name(name_) +{ + setColumns(ColumnsDescription( + { + {"name", std::make_shared()}, + {"path", std::make_shared()}, + {"free_space", std::make_shared()}, + {"total_space", std::make_shared()}, + {"keep_free_space", std::make_shared()}, + })); +} + +BlockInputStreams StorageSystemDisks::read( + const Names & column_names, + const SelectQueryInfo & /*query_info*/, + const Context & context, + QueryProcessingStage::Enum /*processed_stage*/, + const size_t /*max_block_size*/, + const unsigned /*num_streams*/) +{ + check(column_names); + + MutableColumnPtr col_name_mut = ColumnString::create(); + MutableColumnPtr col_path_mut = ColumnString::create(); + MutableColumnPtr col_free_mut = ColumnUInt64::create(); + MutableColumnPtr col_total_mut = ColumnUInt64::create(); + MutableColumnPtr col_keep_mut = ColumnUInt64::create(); + + const auto & disk_selector = context.getDiskSelector(); + + for (const auto & [name, disk_ptr] : disk_selector.getDisksMap()) + { + col_name_mut->insert(name); + col_path_mut->insert(disk_ptr->getPath()); + col_free_mut->insert(disk_ptr->getAvailableSpace()); + col_total_mut->insert(disk_ptr->getTotalSpace()); + 
col_keep_mut->insert(disk_ptr->getKeepingFreeSpace()); + } + + ColumnPtr col_name = std::move(col_name_mut); + ColumnPtr col_path = std::move(col_path_mut); + ColumnPtr col_free = std::move(col_free_mut); + ColumnPtr col_total = std::move(col_total_mut); + ColumnPtr col_keep = std::move(col_keep_mut); + + Block res = getSampleBlock().cloneEmpty(); + size_t col_num = 0; + res.getByPosition(col_num++).column = col_name; + res.getByPosition(col_num++).column = col_path; + res.getByPosition(col_num++).column = col_free; + res.getByPosition(col_num++).column = col_total; + res.getByPosition(col_num++).column = col_keep; + + return BlockInputStreams(1, std::make_shared(res)); +} + +} diff --git a/dbms/src/Storages/System/StorageSystemDisks.h b/dbms/src/Storages/System/StorageSystemDisks.h new file mode 100644 index 0000000000..45be4be905 --- /dev/null +++ b/dbms/src/Storages/System/StorageSystemDisks.h @@ -0,0 +1,38 @@ +#pragma once + +#include +#include +#include +#include + + +namespace DB +{ + +class Context; + + +/** Implements the system table `disks`, which allows you to get information about all disks. 
+*/ +class StorageSystemDisks : public ext::shared_ptr_helper, public IStorage +{ +public: + std::string getName() const override { return "SystemDisks"; } + std::string getTableName() const override { return name; } + + BlockInputStreams read( + const Names & column_names, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + unsigned num_streams) override; + +private: + const std::string name; + +protected: + StorageSystemDisks(const std::string & name_); +}; + +} diff --git a/dbms/src/Storages/System/StorageSystemParts.cpp b/dbms/src/Storages/System/StorageSystemParts.cpp index 9a8298f966..34af18a4cb 100644 --- a/dbms/src/Storages/System/StorageSystemParts.cpp +++ b/dbms/src/Storages/System/StorageSystemParts.cpp @@ -42,6 +42,7 @@ StorageSystemParts::StorageSystemParts(const std::string & name) {"database", std::make_shared()}, {"table", std::make_shared()}, {"engine", std::make_shared()}, + {"disk_name", std::make_shared()}, ///@TODO_IGR ASK is it OK? {"path", std::make_shared()}, } ) @@ -95,6 +96,7 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns, const Stor columns[i++]->insert(info.database); columns[i++]->insert(info.table); columns[i++]->insert(info.engine); + columns[i++]->insert(part->disk->getName()); columns[i++]->insert(part->getFullPath()); if (has_state_column) diff --git a/dbms/src/Storages/System/StorageSystemPartsColumns.cpp b/dbms/src/Storages/System/StorageSystemPartsColumns.cpp index 29207e0724..f71c311f6f 100644 --- a/dbms/src/Storages/System/StorageSystemPartsColumns.cpp +++ b/dbms/src/Storages/System/StorageSystemPartsColumns.cpp @@ -42,6 +42,7 @@ StorageSystemPartsColumns::StorageSystemPartsColumns(const std::string & name) {"database", std::make_shared()}, {"table", std::make_shared()}, {"engine", std::make_shared()}, + {"disk_name", std::make_shared()}, ///@TODO_IGR ASK is it OK?
{"path", std::make_shared()}, {"column", std::make_shared()}, @@ -130,7 +131,9 @@ void StorageSystemPartsColumns::processNextStorage(MutableColumns & columns, con columns[j++]->insert(info.database); columns[j++]->insert(info.table); columns[j++]->insert(info.engine); + columns[j++]->insert(part->disk->getName()); columns[j++]->insert(part->getFullPath()); + columns[j++]->insert(column.name); columns[j++]->insert(column.type->getName()); diff --git a/dbms/src/Storages/System/StorageSystemStoragePolicies.cpp b/dbms/src/Storages/System/StorageSystemStoragePolicies.cpp new file mode 100644 index 0000000000..b8a58dadde --- /dev/null +++ b/dbms/src/Storages/System/StorageSystemStoragePolicies.cpp @@ -0,0 +1,75 @@ +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ +} + + +StorageSystemStoragePolicies::StorageSystemStoragePolicies(const std::string & name_) + : name(name_) +{ + setColumns(ColumnsDescription( + { + {"name", std::make_shared()}, + {"volume_priority", std::make_shared()}, + {"disks", std::make_shared(std::make_shared())}, + {"max_data_part_size", std::make_shared()}, + })); +} + +BlockInputStreams StorageSystemStoragePolicies::read( + const Names & column_names, + const SelectQueryInfo & /*query_info*/, + const Context & context, + QueryProcessingStage::Enum /*processed_stage*/, + const size_t /*max_block_size*/, + const unsigned /*num_streams*/) +{ + check(column_names); + + MutableColumnPtr col_name_mut = ColumnString::create(); + MutableColumnPtr col_priority_mut = ColumnUInt64::create(); + MutableColumnPtr col_disks_mut = ColumnArray::create(ColumnString::create()); + MutableColumnPtr col_max_part_size_mut = ColumnUInt64::create(); + + const auto & policy_selector = context.getStoragePolicySelector(); + + for (const auto & [name, policy_ptr] : policy_selector.getPoliciesMap()) + { + const auto & volumes = policy_ptr->getVolumes(); + for (size_t i = 0; i != volumes.size(); ++i) + { + col_name_mut->insert(name); + 
col_priority_mut->insert(i); + Array disks; + disks.reserve(volumes[i].disks.size()); + for (const auto & disk_ptr : volumes[i].disks) + disks.push_back(disk_ptr->getName()); + col_disks_mut->insert(disks); + col_max_part_size_mut->insert(volumes[i].max_data_part_size); + } + } + + ColumnPtr col_name = std::move(col_name_mut); + ColumnPtr col_priority = std::move(col_priority_mut); + ColumnPtr col_disks = std::move(col_disks_mut); + ColumnPtr col_max_part_size = std::move(col_max_part_size_mut); + + Block res = getSampleBlock().cloneEmpty(); + size_t col_num = 0; + res.getByPosition(col_num++).column = col_name; + res.getByPosition(col_num++).column = col_priority; + res.getByPosition(col_num++).column = col_disks; + res.getByPosition(col_num++).column = col_max_part_size; + + return BlockInputStreams(1, std::make_shared(res)); +} + +} diff --git a/dbms/src/Storages/System/StorageSystemStoragePolicies.h b/dbms/src/Storages/System/StorageSystemStoragePolicies.h new file mode 100644 index 0000000000..f29abab5ec --- /dev/null +++ b/dbms/src/Storages/System/StorageSystemStoragePolicies.h @@ -0,0 +1,38 @@ +#pragma once + +#include +#include +#include +#include + + +namespace DB +{ + +class Context; + + +/** Implements the system table `storage_policies`, which allows you to get information about all storage policies.
+*/ +class StorageSystemStoragePolicies : public ext::shared_ptr_helper, public IStorage +{ +public: + std::string getName() const override { return "SystemStoragePolicies"; } + std::string getTableName() const override { return name; } + + BlockInputStreams read( + const Names & column_names, + const SelectQueryInfo & query_info, + const Context & context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + unsigned num_streams) override; + +private: + const std::string name; + +protected: + StorageSystemStoragePolicies(const std::string & name_); +}; + +} diff --git a/dbms/src/Storages/System/StorageSystemTables.cpp b/dbms/src/Storages/System/StorageSystemTables.cpp index 92f5bdda92..e80066c89f 100644 --- a/dbms/src/Storages/System/StorageSystemTables.cpp +++ b/dbms/src/Storages/System/StorageSystemTables.cpp @@ -45,6 +45,7 @@ StorageSystemTables::StorageSystemTables(const std::string & name_) {"sorting_key", std::make_shared()}, {"primary_key", std::make_shared()}, {"sampling_key", std::make_shared()}, + {"storage_policy", std::make_shared()}, })); } @@ -161,6 +162,9 @@ protected: if (columns_mask[src_index++]) res_columns[res_index++]->insertDefault(); + + if (columns_mask[src_index++]) + res_columns[res_index++]->insertDefault(); } } @@ -308,6 +312,15 @@ protected: else res_columns[res_index++]->insertDefault(); } + + if (columns_mask[src_index++]) + { + auto policy = table->getStoragePolicy(); + if (policy) + res_columns[res_index++]->insert(policy->getName()); + else + res_columns[res_index++]->insertDefault(); + } } } diff --git a/dbms/src/Storages/System/attachSystemTables.cpp b/dbms/src/Storages/System/attachSystemTables.cpp index 34e03032ff..420cc5d053 100644 --- a/dbms/src/Storages/System/attachSystemTables.cpp +++ b/dbms/src/Storages/System/attachSystemTables.cpp @@ -33,6 +33,8 @@ #include #include #include +#include +#include namespace DB @@ -65,6 +67,8 @@ void attachSystemTablesServer(IDatabase & system_database, bool 
has_zookeeper) attachSystemTablesLocal(system_database); system_database.attachTable("parts", StorageSystemParts::create("parts")); system_database.attachTable("parts_columns", StorageSystemPartsColumns::create("parts_columns")); + system_database.attachTable("disks", StorageSystemDisks::create("disks")); + system_database.attachTable("storage_policies", StorageSystemStoragePolicies::create("storage_policies")); system_database.attachTable("processes", StorageSystemProcesses::create("processes")); system_database.attachTable("metrics", StorageSystemMetrics::create("metrics")); system_database.attachTable("merges", StorageSystemMerges::create("merges")); -- GitLab