提交 15ee6207 编写于 作者: I Igor Mineev

Chg 'Schema' to 'StoragePolicy'. Add system.disks. Add...

Chg 'Schema' to 'StoragePolicy'. Add system.disks. Add system.storage_politics. Add disk to system.parts. Add policy to system.tables.
上级 4f61c9d3
......@@ -427,8 +427,9 @@ namespace ErrorCodes
extern const int BAD_TTL_EXPRESSION = 450;
extern const int BAD_TTL_FILE = 451;
extern const int SETTING_CONSTRAINT_VIOLATION = 452;
extern const int UNKNOWN_SCHEMA = 453;
extern const int UNKNOWN_POLICY = 453;
extern const int UNKNOWN_DISK = 454;
extern const int UNKNOWN_PROTOCOL = 455;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
......
......@@ -146,10 +146,10 @@ struct ContextShared
std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
/// Rules for selecting the compression settings, depending on the size of the part.
mutable std::unique_ptr<CompressionCodecSelector> compression_codec_selector;
/// Storage schema chooser;
/// Storage disk chooser
mutable std::unique_ptr<DiskSelector> merge_tree_disk_selector;
/// Storage schema chooser;
mutable std::unique_ptr<SchemaSelector> merge_tree_schema_selector;
/// Storage policy chooser
mutable std::unique_ptr<StoragePolicySelector> merge_tree_storage_policy_selector;
std::optional<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines.
size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
size_t max_partition_size_to_drop = 50000000000lu; /// Protects MergeTree partitions from accidental DROP (50GB by default)
......@@ -1725,18 +1725,33 @@ DiskSelector & Context::getDiskSelector() const
}
const SchemaPtr & Context::getSchema(const String & name) const
const StoragePolicyPtr & Context::getStoragePolicy(const String &name) const
{
auto lock = getLock();
if (!shared->merge_tree_schema_selector)
if (!shared->merge_tree_storage_policy_selector)
{
constexpr auto config_name = "storage_configuration.schemas";
constexpr auto config_name = "storage_configuration.policies";
auto & config = getConfigRef();
shared->merge_tree_schema_selector = std::make_unique<SchemaSelector>(config, config_name, getDiskSelector());
shared->merge_tree_storage_policy_selector = std::make_unique<StoragePolicySelector>(config, config_name, getDiskSelector());
}
return (*shared->merge_tree_schema_selector)[name];
return (*shared->merge_tree_storage_policy_selector)[name];
}
StoragePolicySelector & Context::getStoragePolicySelector() const
{
auto lock = getLock();
if (!shared->merge_tree_storage_policy_selector)
{
constexpr auto config_name = "storage_configuration.policies";
auto & config = getConfigRef();
shared->merge_tree_storage_policy_selector = std::make_unique<StoragePolicySelector>(config, config_name, getDiskSelector());
}
return *shared->merge_tree_storage_policy_selector;
}
......
......@@ -443,8 +443,10 @@ public:
/// Provides storage disks
const DiskPtr & getDisk(const String & name) const;
StoragePolicySelector & getStoragePolicySelector() const;
/// Provides storage politics schemes
const SchemaPtr & getSchema(const String & name) const;
const StoragePolicyPtr & getStoragePolicy(const String &name) const;
/// Get the server uptime in seconds.
time_t getUptimeSeconds() const;
......
......@@ -335,6 +335,9 @@ public:
/// Returns additional columns that need to be read for FINAL to work.
virtual Names getColumnsRequiredForFinal() const { return {}; }
/// Returns storage policy if table supports it
virtual StoragePolicyPtr getStoragePolicy() const { return {}; }
using ITableDeclaration::ITableDeclaration;
using std::enable_shared_from_this<IStorage>::shared_from_this;
......
......@@ -26,7 +26,7 @@ namespace ErrorCodes
extern const int CANNOT_WRITE_TO_OSTREAM;
extern const int CHECKSUM_DOESNT_MATCH;
extern const int UNKNOWN_TABLE;
extern const int UNKNOWN_ACTION;
extern const int UNKNOWN_PROTOCOL;
}
namespace DataPartsExchange
......@@ -62,7 +62,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
else if (protocol_version == "1")
part_name = params.get("part_name");
else
throw Exception("Unsupported protocol version", ErrorCodes::UNKNOWN_ACTION); ///@TODO_IGR ASK Is it true error code?
throw Exception("Unsupported fetch protocol version", ErrorCodes::UNKNOWN_PROTOCOL);
static std::atomic_uint total_sends {0};
......@@ -232,10 +232,11 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
auto reservation = data.reserveSpaceForPart(sum_files_size);
return downloadPart(part_name, replica_path, to_detached, tmp_prefix_, std::move(reservation), in);
}
catch (...) ///@TODO_IGR catch exception
catch (const Exception & e) ///@TODO_IGR ASK maybe catch connection and others error here
{
if (!protocol_error)
throw;
LOG_WARNING(log, "Looks like old ClickHouse version node. Trying to use fetch protocol version 0"); ///@TODO_IGR ASK new msg
}
/// Protocol error
......
#include <Storages/MergeTree/DiskSpaceMonitor.h>
#include <set>
#include <Common/escapeForFileName.h>
#include <Poco/File.h>
......@@ -88,18 +90,24 @@ void DiskSelector::add(const DiskPtr & disk)
disks.emplace(disk->getName(), disk);
}
Schema::Volume::Volume(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DiskSelector & disk_selector)
StoragePolicy::Volume::Volume(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DiskSelector & disk_selector)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
for (const auto & name : keys)
Logger * logger = &Logger::get("StorageConfiguration");
for (const auto & disk : keys)
{
if (startsWith(name, "disk"))
if (startsWith(disk, "disk"))
{
auto disk_name = config.getString(config_prefix + "." + name);
auto disk_name = config.getString(config_prefix + "." + disk);
disks.push_back(disk_selector[disk_name]);
}
else
{
LOG_WARNING(logger, "Unused param " << config_prefix << '.' << disk);
}
}
if (disks.empty())
......@@ -113,8 +121,6 @@ Schema::Volume::Volume(const Poco::Util::AbstractConfiguration & config, const s
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
}
Logger * logger = &Logger::get("StorageConfiguration");
if (has_max_bytes)
{
max_data_part_size = config.getUInt64(config_prefix + ".max_data_part_size_bytes");
......@@ -144,12 +150,13 @@ Schema::Volume::Volume(const Poco::Util::AbstractConfiguration & config, const s
{
max_data_part_size = std::numeric_limits<UInt64>::max();
}
constexpr UInt64 SIZE_8MB = 8ull << 20u;
if (max_data_part_size < SIZE_8MB)
LOG_WARNING(logger, "Volume max_data_part_size is too low (" << max_data_part_size << " < " << SIZE_8MB << ")");
constexpr UInt64 MIN_PART_SIZE = 8u * 1024u * 1024u;
if (max_data_part_size < MIN_PART_SIZE)
LOG_WARNING(logger, "Volume max_data_part_size is too low (" << formatReadableSizeWithBinarySuffix(max_data_part_size) <<
" < " << formatReadableSizeWithBinarySuffix(MIN_PART_SIZE) << ")");
}
DiskSpaceMonitor::ReservationPtr Schema::Volume::reserve(UInt64 expected_size) const
DiskSpaceMonitor::ReservationPtr StoragePolicy::Volume::reserve(UInt64 expected_size) const
{
/// This volume can not store files which size greater than max_data_part_size
......@@ -163,19 +170,19 @@ DiskSpaceMonitor::ReservationPtr Schema::Volume::reserve(UInt64 expected_size) c
size_t index = (start_from + i) % disks_num;
auto reservation = DiskSpaceMonitor::tryToReserve(disks[index], expected_size);
if (reservation && *reservation)
if (reservation && reservation->isValid())
return reservation;
}
return {};
}
DiskSpaceMonitor::ReservationPtr Schema::Volume::reserveAtDisk(const DiskPtr & disk, UInt64 expected_size) const
DiskSpaceMonitor::ReservationPtr StoragePolicy::Volume::reserveAtDisk(const DiskPtr & disk, UInt64 expected_size) const
{
/// This volume can not store files which size greater than max_data_part_size
/// Reserve and Warn it
if (expected_size > max_data_part_size)
LOG_WARNING(&Logger::get("StorageSchema"), "Volume max_data_part_size limit exceed: " << expected_size);
LOG_WARNING(&Logger::get("StoragePolicy"), "Volume max_data_part_size limit exceed: " << expected_size);
size_t disks_num = disks.size();
for (size_t i = 0; i != disks_num; ++i)
......@@ -185,15 +192,15 @@ DiskSpaceMonitor::ReservationPtr Schema::Volume::reserveAtDisk(const DiskPtr & d
}
auto reservation = DiskSpaceMonitor::tryToReserve(disks[i], expected_size);
if (reservation && *reservation)
if (reservation && reservation->isValid())
return reservation;
return {};
}
LOG_DEBUG(&Logger::get("StorageSchema"), "Volume has no disk " << disk->getName());
LOG_DEBUG(&Logger::get("StoragePolicy"), "Volume has no disk " << disk->getName());
return {};
}
UInt64 Schema::Volume::getMaxUnreservedFreeSpace() const
UInt64 StoragePolicy::Volume::getMaxUnreservedFreeSpace() const
{
UInt64 res = 0;
for (const auto & disk : disks)
......@@ -201,7 +208,7 @@ UInt64 Schema::Volume::getMaxUnreservedFreeSpace() const
return res;
}
Schema::Schema(const String & name_, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
StoragePolicy::StoragePolicy(const String & name_, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
const DiskSelector & disks) : name(name_)
{
Poco::Util::AbstractConfiguration::Keys keys;
......@@ -209,16 +216,29 @@ Schema::Schema(const String & name_, const Poco::Util::AbstractConfiguration & c
for (const auto & attr_name : keys)
{
if (!startsWith(name, "volume"))
if (!startsWith(attr_name, "volume"))
throw Exception("Unknown element in config: " + config_prefix + "." + attr_name + ", must be 'volume'",
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
volumes.emplace_back(config, config_prefix + "." + attr_name, disks);
}
if (volumes.empty())
throw Exception("Schema must contain at least one Volume", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
throw Exception("StoragePolicy must contain at least one Volume", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
/// Check that disks are unique in Policy
std::set<String> disk_names;
for (const auto & volume : volumes)
{
for (const auto & disk : volume.disks)
{
if (disk_names.find(disk->getName()) != disk_names.end())
throw Exception("StoragePolicy disks must not be repeated: " + disk->getName(), ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
disk_names.insert(disk->getName());
}
}
}
Schema::Disks Schema::getDisks() const
StoragePolicy::Disks StoragePolicy::getDisks() const
{
Disks res;
for (const auto & volume : volumes)
......@@ -227,24 +247,24 @@ Schema::Disks Schema::getDisks() const
return res;
}
DiskPtr Schema::getAnyDisk() const
DiskPtr StoragePolicy::getAnyDisk() const
{
/// Schema must contain at least one Volume
/// StoragePolicy must contain at least one Volume
/// Volume must contain at least one Disk
if (volumes.empty())
{
LOG_ERROR(&Logger::get("StorageSchema"), "No volumes at schema " << name);
throw Exception("Schema has no Volumes. it's a bug", ErrorCodes::NOT_ENOUGH_SPACE);
LOG_ERROR(&Logger::get("StoragePolicy"), "No volumes at StoragePolicy " << name);
throw Exception("StoragePolicy has no Volumes. it's a bug", ErrorCodes::NOT_ENOUGH_SPACE);
}
if (volumes[0].disks.empty())
{
LOG_ERROR(&Logger::get("StorageSchema"), "No Disks at volume 0 at schema " << name);
throw Exception("Schema Volume 1 has no Disks. it's a bug", ErrorCodes::NOT_ENOUGH_SPACE);
LOG_ERROR(&Logger::get("StoragePolicy"), "No Disks at volume 0 at StoragePolicy " << name);
throw Exception("StoragePolicy Volume 1 has no Disks. it's a bug", ErrorCodes::NOT_ENOUGH_SPACE);
}
return volumes[0].disks[0];
}
UInt64 Schema::getMaxUnreservedFreeSpace() const
UInt64 StoragePolicy::getMaxUnreservedFreeSpace() const
{
UInt64 res = 0;
for (const auto & volume : volumes)
......@@ -252,7 +272,7 @@ UInt64 Schema::getMaxUnreservedFreeSpace() const
return res;
}
DiskSpaceMonitor::ReservationPtr Schema::reserve(UInt64 expected_size) const
DiskSpaceMonitor::ReservationPtr StoragePolicy::reserve(UInt64 expected_size) const
{
for (const auto & volume : volumes)
{
......@@ -263,7 +283,7 @@ DiskSpaceMonitor::ReservationPtr Schema::reserve(UInt64 expected_size) const
return {};
}
DiskSpaceMonitor::ReservationPtr Schema::reserveAtDisk(const DiskPtr & disk, UInt64 expected_size) const
DiskSpaceMonitor::ReservationPtr StoragePolicy::reserveAtDisk(const DiskPtr & disk, UInt64 expected_size) const
{
for (const auto & volume : volumes)
{
......@@ -274,7 +294,7 @@ DiskSpaceMonitor::ReservationPtr Schema::reserveAtDisk(const DiskPtr & disk, UIn
return {};
}
DiskSpaceMonitor::ReservationPtr Schema::reserveOnMaxDiskWithoutReservation() const
DiskSpaceMonitor::ReservationPtr StoragePolicy::reserveOnMaxDiskWithoutReservation() const
{
UInt64 max_space = 0;
DiskPtr max_disk;
......@@ -293,35 +313,35 @@ DiskSpaceMonitor::ReservationPtr Schema::reserveOnMaxDiskWithoutReservation() co
return DiskSpaceMonitor::tryToReserve(max_disk, 0);
}
SchemaSelector::SchemaSelector(const Poco::Util::AbstractConfiguration & config, const String& config_prefix, const DiskSelector & disks)
StoragePolicySelector::StoragePolicySelector(const Poco::Util::AbstractConfiguration & config, const String& config_prefix, const DiskSelector & disks)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
Logger * logger = &Logger::get("SchemaSelector");
Logger * logger = &Logger::get("StoragePolicySelector");
for (const auto & name : keys)
{
if (!std::all_of(name.begin(), name.end(), isWordCharASCII))
throw Exception("Schema name can contain only alphanumeric and '_' (" + name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
schemas.emplace(name, std::make_shared<Schema>(name, config, config_prefix + "." + name, disks));
LOG_INFO(logger, "Storage schema " << name << " loaded");
throw Exception("StoragePolicy name can contain only alphanumeric and '_' (" + name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
policies.emplace(name, std::make_shared<StoragePolicy>(name, config, config_prefix + "." + name, disks));
LOG_INFO(logger, "Storage policy " << name << " loaded");
}
constexpr auto default_schema_name = "default";
constexpr auto default_storage_policy_name = "default";
constexpr auto default_disk_name = "default";
if (schemas.find(default_schema_name) == schemas.end())
schemas.emplace(default_schema_name,
std::make_shared<Schema>(default_schema_name,
Schema::Volumes{{std::vector<DiskPtr>{disks[default_disk_name]},
if (policies.find(default_storage_policy_name) == policies.end())
policies.emplace(default_storage_policy_name,
std::make_shared<StoragePolicy>(default_storage_policy_name,
StoragePolicy::Volumes{{std::vector<DiskPtr>{disks[default_disk_name]},
std::numeric_limits<UInt64>::max()}}));
}
const SchemaPtr & SchemaSelector::operator[](const String & name) const
const StoragePolicyPtr & StoragePolicySelector::operator[](const String & name) const
{
auto it = schemas.find(name);
if (it == schemas.end())
throw Exception("Unknown schema " + name, ErrorCodes::UNKNOWN_SCHEMA);
auto it = policies.find(name);
if (it == policies.end())
throw Exception("Unknown StoragePolicy " + name, ErrorCodes::UNKNOWN_POLICY);
return it->second;
}
......
......@@ -26,7 +26,7 @@ namespace ErrorCodes
extern const int NOT_ENOUGH_SPACE;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int UNKNOWN_SCHEMA;
extern const int UNKNOWN_POLICY;
extern const int UNKNOWN_DISK;
}
......@@ -197,7 +197,7 @@ public:
valid = true;
}
explicit operator bool() const noexcept {
bool isValid() const {
return valid;
}
......@@ -270,19 +270,21 @@ public:
void add(const DiskPtr & disk);
const auto & getDisksMap() const { return disks; }
private:
std::map<String, DiskPtr> disks;
};
class Schema
class StoragePolicy
{
public:
using Disks = std::vector<DiskPtr>;
class Volume
{
friend class Schema;
friend class StoragePolicy;
public:
Volume(std::vector<DiskPtr> disks_, UInt64 max_data_part_size_)
......@@ -319,19 +321,20 @@ public:
UInt64 getMaxUnreservedFreeSpace() const;
private:
UInt64 max_data_part_size = std::numeric_limits<decltype(max_data_part_size)>::max();
Disks disks;
private:
mutable std::atomic<size_t> last_used = 0;
};
using Volumes = std::vector<Volume>;
Schema(const String & name_, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
StoragePolicy(const String & name_, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
const DiskSelector & disks);
Schema(const String & name_, Volumes volumes_) : volumes(std::move(volumes_)), name(name_) { }
StoragePolicy(const String & name_, Volumes volumes_) : volumes(std::move(volumes_)), name(name_) { }
Disks getDisks() const;
......@@ -339,6 +342,8 @@ public:
UInt64 getMaxUnreservedFreeSpace() const;
const String & getName() const { return name; }
/// Returns valid reservation or null
DiskSpaceMonitor::ReservationPtr reserve(UInt64 expected_size) const;
......@@ -349,22 +354,26 @@ public:
/// Do not use this function when it is possible to predict size!!!
DiskSpaceMonitor::ReservationPtr reserveOnMaxDiskWithoutReservation() const;
const auto & getVolumes() const { return volumes; }
private:
Volumes volumes;
String name;
};
using SchemaPtr = std::shared_ptr<const Schema>;
using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
class SchemaSelector
class StoragePolicySelector
{
public:
SchemaSelector(const Poco::Util::AbstractConfiguration & config, const String& config_prefix, const DiskSelector & disks);
StoragePolicySelector(const Poco::Util::AbstractConfiguration & config, const String& config_prefix, const DiskSelector & disks);
const StoragePolicyPtr & operator[](const String & name) const;
const SchemaPtr & operator[](const String & name) const;
const auto & getPoliciesMap() const { return policies; }
private:
std::map<String, SchemaPtr> schemas;
std::map<String, StoragePolicyPtr> policies;
};
}
......@@ -114,7 +114,7 @@ MergeTreeData::MergeTreeData(
ttl_table_ast(ttl_table_ast_),
require_part_metadata(require_part_metadata_),
database_name(database_), table_name(table_),
schema(context_.getSchema(settings.storage_schema_name)),
storage_policy(context_.getStoragePolicy(settings.storage_policy_name)),
broken_part_callback(broken_part_callback_),
log_name(database_name + "." + table_name), log(&Logger::get(log_name)),
data_parts_by_info(data_parts_indexes.get<TagByInfo>()),
......@@ -187,7 +187,7 @@ MergeTreeData::MergeTreeData(
/// If not choose any
if (version_file_path.empty())
version_file_path = getFullPathOnDisk(schema->getAnyDisk()) + "format_version.txt";
version_file_path = getFullPathOnDisk(storage_policy->getAnyDisk()) + "format_version.txt";
bool version_file_exists = Poco::File(version_file_path).exists();
......@@ -766,7 +766,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
std::vector<std::pair<String, DiskPtr>> part_names_with_disks;
Poco::DirectoryIterator end;
auto disks = schema->getDisks();
auto disks = storage_policy->getDisks();
for (auto disk_ptr : disks)
{
......@@ -1123,7 +1123,7 @@ void MergeTreeData::rename(const String & /*new_path_to_db*/, const String & new
auto old_file_table_name = escapeForFileName(table_name);
auto new_file_table_name = escapeForFileName(new_table_name);
auto disks = schema->getDisks();
auto disks = storage_policy->getDisks();
for (const auto & disk : disks)
{
......@@ -2604,13 +2604,13 @@ MergeTreeData::DataPartsVector MergeTreeData::getAllDataPartsVector(MergeTreeDat
DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpaceForPart(UInt64 expected_size)
{
constexpr UInt64 SIZE_1MB = 1ull << 20u; ///@TODO_IGR ASK Is it OK?
constexpr UInt64 MAGIC_CONST = 1;
constexpr UInt64 RESERVATION_MIN_ESTIMATION_SIZE = 1u * 1024u * 1024u; ///@TODO_IGR ASK Is it OK?
constexpr UInt64 RESERVATION_MULTIPLY_ESTIMATION_FACTOR = 1;
if (expected_size < SIZE_1MB)
expected_size = SIZE_1MB;
if (expected_size < RESERVATION_MIN_ESTIMATION_SIZE)
expected_size = RESERVATION_MIN_ESTIMATION_SIZE;
auto reservation = reserveSpace(expected_size * MAGIC_CONST);
auto reservation = reserveSpace(expected_size * RESERVATION_MULTIPLY_ESTIMATION_FACTOR);
if (reservation)
return reservation;
......@@ -2820,12 +2820,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg
DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size)
{
return schema->reserve(expected_size);
return storage_policy->reserve(expected_size);
}
DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpaceAtDisk(const DiskPtr & disk, UInt64 expected_size)
{
return schema->reserveAtDisk(disk, expected_size);
return storage_policy->reserveAtDisk(disk, expected_size);
}
String MergeTreeData::getFullPathOnDisk(const DiskPtr & disk) const
......@@ -2836,7 +2836,7 @@ String MergeTreeData::getFullPathOnDisk(const DiskPtr & disk) const
Strings MergeTreeData::getDataPaths() const
{
Strings res;
auto disks = schema->getDisks();
auto disks = storage_policy->getDisks();
for (const auto & disk : disks)
res.push_back(getFullPathOnDisk(disk));
return res;
......
......@@ -313,7 +313,7 @@ public:
};
/// Attach the table corresponding to the directory in full_path inside schema (must end with /), with the given columns.
/// Attach the table corresponding to the directory in full_path inside policy (must end with /), with the given columns.
/// Correctness of names and paths is not checked.
///
/// date_column_name - if not empty, the name of the Date column used for partitioning by month.
......@@ -356,6 +356,8 @@ public:
Names getColumnsRequiredForSampling() const override { return columns_required_for_sampling; }
Names getColumnsRequiredForFinal() const override { return sorting_key_expr->getRequiredColumns(); }
StoragePolicyPtr getStoragePolicy() const override { return storage_policy; }
bool supportsPrewhere() const override { return true; }
bool supportsSampling() const override { return sample_by_ast != nullptr; }
......@@ -609,7 +611,7 @@ public:
/// Choose disk with max available free space
/// Reserves 0 bytes
DiskSpaceMonitor::ReservationPtr reserveOnMaxDiskWithoutReservation() { return schema->reserveOnMaxDiskWithoutReservation(); }
DiskSpaceMonitor::ReservationPtr reserveOnMaxDiskWithoutReservation() { return storage_policy->reserveOnMaxDiskWithoutReservation(); }
MergeTreeDataFormatVersion format_version;
......@@ -687,7 +689,7 @@ protected:
String database_name;
String table_name;
SchemaPtr schema;
StoragePolicyPtr storage_policy;
/// Current column sizes in compressed and uncompressed form.
ColumnSizeByName column_sizes;
......
......@@ -152,7 +152,7 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSize(size_t pool_size, size_
data.settings.max_bytes_to_merge_at_max_space_in_pool,
static_cast<double>(free_entries) / data.settings.number_of_free_entries_in_pool_to_lower_max_size_of_merge);
return std::min(max_size, static_cast<UInt64>(data.schema->getMaxUnreservedFreeSpace() / DISK_USAGE_COEFFICIENT_TO_SELECT));
return std::min(max_size, static_cast<UInt64>(data.storage_policy->getMaxUnreservedFreeSpace() / DISK_USAGE_COEFFICIENT_TO_SELECT));
}
......
......@@ -79,7 +79,7 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
M(SettingUInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024, "Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled).") \
M(SettingUInt64, index_granularity_bytes, 0, "Approximate amount of bytes in single granule (0 - disabled).") \
M(SettingInt64, merge_with_ttl_timeout, 3600 * 24, "Minimal time in seconds, when merge with TTL can be repeated.") \
M(SettingString, storage_schema_name, "default", "Name of storage schema")
M(SettingString, storage_policy_name, "default", "Name of storage policy")
DECLARE_SETTINGS_COLLECTION(LIST_OF_MERGE_TREE_SETTINGS)
......
......@@ -336,7 +336,7 @@ public:
void StorageMergeTree::mutate(const MutationCommands & commands, const Context &)
{
/// Choose any disk.
auto disk = schema->getAnyDisk();
auto disk = storage_policy->getAnyDisk();
MergeTreeMutationEntry entry(commands, getFullPathOnDisk(disk), insert_increment.get());
String file_name;
{
......@@ -489,7 +489,7 @@ bool StorageMergeTree::merge(
}
else
{
UInt64 disk_space = schema->getMaxUnreservedFreeSpace();
UInt64 disk_space = storage_policy->getMaxUnreservedFreeSpace();
selected = merger_mutator.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason);
}
......@@ -580,7 +580,7 @@ bool StorageMergeTree::tryMutatePart()
std::optional<CurrentlyMergingPartsTagger> tagger;
{
/// DataPArt can be store only at one disk. Get Max of free space at all disks
UInt64 disk_space = schema->getMaxUnreservedFreeSpace();
UInt64 disk_space = storage_policy->getMaxUnreservedFreeSpace();
std::lock_guard lock(currently_merging_mutex);
......@@ -975,7 +975,7 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par
{
LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir);
ActiveDataPartSet active_parts(format_version);
const auto disks = schema->getDisks();
const auto disks = storage_policy->getDisks();
for (const DiskPtr & disk : disks)
{
const auto full_path = getFullPathOnDisk(disk);
......
......@@ -3002,7 +3002,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
for (const DataPartPtr & part : data_parts)
partition_ids.emplace(part->info.partition_id);
UInt64 disk_space = schema->getMaxUnreservedFreeSpace();
UInt64 disk_space = storage_policy->getMaxUnreservedFreeSpace();
for (const String & partition_id : partition_ids)
{
......@@ -3029,7 +3029,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p
}
else
{
UInt64 disk_space = schema->getMaxUnreservedFreeSpace();
UInt64 disk_space = storage_policy->getMaxUnreservedFreeSpace();
String partition_id = getPartitionIDFromQuery(partition, query_context);
selected = merger_mutator.selectAllPartsToMergeWithinPartition(
future_merged_part, disk_space, can_merge, partition_id, final, &disable_reason);
......@@ -3555,7 +3555,7 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool
ActiveDataPartSet active_parts(format_version);
std::set<String> part_names;
const auto disks = schema->getDisks();
const auto disks = storage_policy->getDisks();
for (const DiskPtr & disk : disks)
{
const auto full_path = getFullPathOnDisk(disk);
......
......@@ -11,5 +11,5 @@ configure_file (StorageSystemBuildOptions.generated.cpp.in ${CONFIG_BUILD})
include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
add_headers_and_sources(storages_system .)
list (APPEND storages_system_sources ${CONFIG_BUILD})
add_library(clickhouse_storages_system ${storages_system_headers} ${storages_system_sources})
add_library(clickhouse_storages_system ${storages_system_headers} ${storages_system_sources} StorageSystemDisks.cpp StorageSystemStoragePolicies.cpp StorageSystemStoragePolicies.h)
target_link_libraries(clickhouse_storages_system PRIVATE dbms common string_utils clickhouse_common_zookeeper)
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemDisks.h>
namespace DB
{
namespace ErrorCodes
{
}
StorageSystemDisks::StorageSystemDisks(const std::string & name_)
: name(name_)
{
setColumns(ColumnsDescription(
{
{"name", std::make_shared<DataTypeString>()},
{"path", std::make_shared<DataTypeString>()},
{"free_space", std::make_shared<DataTypeUInt64>()},
{"total_space", std::make_shared<DataTypeUInt64>()},
{"keep_free_space", std::make_shared<DataTypeUInt64>()},
}));
}
BlockInputStreams StorageSystemDisks::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
check(column_names);
MutableColumnPtr col_name_mut = ColumnString::create();
MutableColumnPtr col_path_mut = ColumnString::create();
MutableColumnPtr col_free_mut = ColumnUInt64::create();
MutableColumnPtr col_total_mut = ColumnUInt64::create();
MutableColumnPtr col_keep_mut = ColumnUInt64::create();
const auto & disk_selector = context.getDiskSelector();
for (const auto & [name, disk_ptr] : disk_selector.getDisksMap())
{
col_name_mut->insert(name);
col_path_mut->insert(disk_ptr->getPath());
col_free_mut->insert(disk_ptr->getAvailableSpace());
col_total_mut->insert(disk_ptr->getTotalSpace());
col_keep_mut->insert(disk_ptr->getKeepingFreeSpace());
}
ColumnPtr col_name = std::move(col_name_mut);
ColumnPtr col_path = std::move(col_path_mut);
ColumnPtr col_free = std::move(col_free_mut);
ColumnPtr col_total = std::move(col_total_mut);
ColumnPtr col_keep = std::move(col_keep_mut);
Block res = getSampleBlock().cloneEmpty();
size_t col_num = 0;
res.getByPosition(col_num++).column = col_name;
res.getByPosition(col_num++).column = col_path;
res.getByPosition(col_num++).column = col_free;
res.getByPosition(col_num++).column = col_total;
res.getByPosition(col_num++).column = col_keep;
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(res));
}
}
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Formats/FormatSettings.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/MergeTreeData.h>
namespace DB
{
class Context;
/** Implements the system table `disks`, which allows you to get information about all disks.
*/
class StorageSystemDisks : public ext::shared_ptr_helper<StorageSystemDisks>, public IStorage
{
public:
std::string getName() const override { return "SystemDisks"; }
std::string getTableName() const override { return name; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
private:
const std::string name;
protected:
StorageSystemDisks(const std::string & name_);
};
}
......@@ -42,6 +42,7 @@ StorageSystemParts::StorageSystemParts(const std::string & name)
{"database", std::make_shared<DataTypeString>()},
{"table", std::make_shared<DataTypeString>()},
{"engine", std::make_shared<DataTypeString>()},
{"disk_name", std::make_shared<DataTypeString>()}, ///@TODO_IGR ASK is is OK?
{"path", std::make_shared<DataTypeString>()},
}
)
......@@ -95,6 +96,7 @@ void StorageSystemParts::processNextStorage(MutableColumns & columns, const Stor
columns[i++]->insert(info.database);
columns[i++]->insert(info.table);
columns[i++]->insert(info.engine);
columns[i++]->insert(part->disk->getName());
columns[i++]->insert(part->getFullPath());
if (has_state_column)
......
......@@ -42,6 +42,7 @@ StorageSystemPartsColumns::StorageSystemPartsColumns(const std::string & name)
{"database", std::make_shared<DataTypeString>()},
{"table", std::make_shared<DataTypeString>()},
{"engine", std::make_shared<DataTypeString>()},
{"disk_name", std::make_shared<DataTypeString>()}, ///@TODO_IGR ASK is is OK?
{"path", std::make_shared<DataTypeString>()},
{"column", std::make_shared<DataTypeString>()},
......@@ -130,7 +131,9 @@ void StorageSystemPartsColumns::processNextStorage(MutableColumns & columns, con
columns[j++]->insert(info.database);
columns[j++]->insert(info.table);
columns[j++]->insert(info.engine);
columns[j++]->insert(part->disk->getName());
columns[j++]->insert(part->getFullPath());
columns[j++]->insert(column.name);
columns[j++]->insert(column.type->getName());
......
#include <Columns/ColumnArray.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemStoragePolicies.h>
#include <DataTypes/DataTypeArray.h>
namespace DB
{
namespace ErrorCodes
{
}
StorageSystemStoragePolicies::StorageSystemStoragePolicies(const std::string & name_)
: name(name_)
{
setColumns(ColumnsDescription(
{
{"name", std::make_shared<DataTypeString>()},
{"volume_priority", std::make_shared<DataTypeUInt64>()},
{"disks", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"max_data_part_size", std::make_shared<DataTypeUInt64>()},
}));
}
BlockInputStreams StorageSystemStoragePolicies::read(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
check(column_names);
MutableColumnPtr col_name_mut = ColumnString::create();
MutableColumnPtr col_priority_mut = ColumnUInt64::create();
MutableColumnPtr col_disks_mut = ColumnArray::create(ColumnString::create());
MutableColumnPtr col_max_part_size_mut = ColumnUInt64::create();
const auto & policy_selector = context.getStoragePolicySelector();
for (const auto & [name, policy_ptr] : policy_selector.getPoliciesMap())
{
const auto & volumes = policy_ptr->getVolumes();
for (size_t i = 0; i != volumes.size(); ++i)
{
col_name_mut->insert(name);
col_priority_mut->insert(i);
Array disks;
disks.reserve(volumes[i].disks.size());
for (const auto & disk_ptr : volumes[i].disks)
disks.push_back(disk_ptr->getName());
col_disks_mut->insert(disks);
col_max_part_size_mut->insert(volumes[i].max_data_part_size);
}
}
ColumnPtr col_name = std::move(col_name_mut);
ColumnPtr col_priority = std::move(col_priority_mut);
ColumnPtr col_disks = std::move(col_disks_mut);
ColumnPtr col_max_part_size = std::move(col_max_part_size_mut);
Block res = getSampleBlock().cloneEmpty();
size_t col_num = 0;
res.getByPosition(col_num++).column = col_name;
res.getByPosition(col_num++).column = col_priority;
res.getByPosition(col_num++).column = col_disks;
res.getByPosition(col_num++).column = col_max_part_size;
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(res));
}
}
#pragma once
#include <ext/shared_ptr_helper.h>
#include <Formats/FormatSettings.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/MergeTreeData.h>
namespace DB
{
class Context;
/** Implements the system table `storage`, which allows you to get information about all disks.
*/
class StorageSystemStoragePolicies : public ext::shared_ptr_helper<StorageSystemStoragePolicies>, public IStorage
{
public:
std::string getName() const override { return "SystemStoragePolicies"; }
std::string getTableName() const override { return name; }
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override;
private:
const std::string name;
protected:
StorageSystemStoragePolicies(const std::string & name_);
};
}
......@@ -45,6 +45,7 @@ StorageSystemTables::StorageSystemTables(const std::string & name_)
{"sorting_key", std::make_shared<DataTypeString>()},
{"primary_key", std::make_shared<DataTypeString>()},
{"sampling_key", std::make_shared<DataTypeString>()},
{"storage_policy", std::make_shared<DataTypeString>()},
}));
}
......@@ -161,6 +162,9 @@ protected:
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
if (columns_mask[src_index++])
res_columns[res_index++]->insertDefault();
}
}
......@@ -308,6 +312,15 @@ protected:
else
res_columns[res_index++]->insertDefault();
}
if (columns_mask[src_index++])
{
auto policy = table->getStoragePolicy();
if (policy)
res_columns[res_index++]->insert(policy->getName());
else
res_columns[res_index++]->insertDefault();
}
}
}
......
......@@ -33,6 +33,8 @@
#include <Storages/System/StorageSystemTables.h>
#include <Storages/System/StorageSystemZooKeeper.h>
#include <Storages/System/StorageSystemContributors.h>
#include <Storages/System/StorageSystemDisks.h>
#include <Storages/System/StorageSystemStoragePolicies.h>
namespace DB
......@@ -65,6 +67,8 @@ void attachSystemTablesServer(IDatabase & system_database, bool has_zookeeper)
attachSystemTablesLocal(system_database);
system_database.attachTable("parts", StorageSystemParts::create("parts"));
system_database.attachTable("parts_columns", StorageSystemPartsColumns::create("parts_columns"));
system_database.attachTable("disks", StorageSystemDisks::create("disks"));
system_database.attachTable("storage_policies", StorageSystemStoragePolicies::create("storage_policies"));
system_database.attachTable("processes", StorageSystemProcesses::create("processes"));
system_database.attachTable("metrics", StorageSystemMetrics::create("metrics"));
system_database.attachTable("merges", StorageSystemMerges::create("merges"));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册