提交 724b7be5 编写于 作者: A alesapin

Merge branch 'volumes-related-refactorings' of...

Merge branch 'volumes-related-refactorings' of https://github.com/excitoon-favorites/ClickHouse into volumes-refactorings
......@@ -106,7 +106,7 @@ void ClusterCopierApp::mainImpl()
context->setConfig(loaded_config.configuration);
context->setApplicationType(Context::ApplicationType::LOCAL);
context->setPath(process_path);
context->setPath(process_path + "/");
registerFunctions();
registerAggregateFunctions();
......
......@@ -497,6 +497,9 @@ namespace ErrorCodes
extern const int CANNOT_CONNECT_RABBITMQ = 530;
extern const int CANNOT_FSTAT = 531;
extern const int LDAP_ERROR = 532;
extern const int INCONSISTENT_RESERVATIONS = 533;
extern const int NO_RESERVATIONS_PROVIDED = 534;
extern const int UNKNOWN_RAID_TYPE = 535;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
......
......@@ -25,6 +25,7 @@ using DiskDirectoryIteratorPtr = std::unique_ptr<IDiskDirectoryIterator>;
class IReservation;
using ReservationPtr = std::unique_ptr<IReservation>;
using Reservations = std::vector<ReservationPtr>;
class ReadBufferFromFileBase;
class WriteBufferFromFileBase;
......
......@@ -10,6 +10,25 @@ namespace DB
namespace ErrorCodes
{
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int INCONSISTENT_RESERVATIONS;
extern const int NO_RESERVATIONS_PROVIDED;
extern const int UNKNOWN_VOLUME_TYPE;
}
String volumeTypeToString(VolumeType type)
{
switch (type)
{
case VolumeType::JBOD:
return "JBOD";
case VolumeType::RAID1:
return "RAID1";
case VolumeType::SINGLE_DISK:
return "SINGLE_DISK";
case VolumeType::UNKNOWN:
return "UNKNOWN";
}
throw Exception("Unknown volume type, please add it to DB::volumeTypeToString", ErrorCodes::UNKNOWN_VOLUME_TYPE);
}
IVolume::IVolume(
......@@ -40,4 +59,43 @@ UInt64 IVolume::getMaxUnreservedFreeSpace() const
return res;
}
MultiDiskReservation::MultiDiskReservation(Reservations & reservations_, UInt64 size_)
: reservations(std::move(reservations_))
, size(size_)
{
if (reservations.empty())
{
throw Exception("At least one reservation must be provided to MultiDiskReservation", ErrorCodes::NO_RESERVATIONS_PROVIDED);
}
for (auto & reservation : reservations)
{
if (reservation->getSize() != size_)
{
throw Exception("Reservations must have same size", ErrorCodes::INCONSISTENT_RESERVATIONS);
}
}
}
Disks MultiDiskReservation::getDisks() const
{
Disks res;
res.reserve(reservations.size());
for (const auto & reservation : reservations)
{
res.push_back(reservation->getDisk());
}
return res;
}
void MultiDiskReservation::update(UInt64 new_size)
{
for (auto & reservation : reservations)
{
reservation->update(new_size);
}
size = new_size;
}
}
......@@ -11,10 +11,13 @@ namespace DB
enum class VolumeType
{
JBOD,
RAID1,
SINGLE_DISK,
UNKNOWN
};
String volumeTypeToString(VolumeType t);
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
using Volumes = std::vector<VolumePtr>;
......@@ -61,4 +64,21 @@ protected:
const String name;
};
class MultiDiskReservation : public IReservation
{
public:
MultiDiskReservation(Reservations & reservations, UInt64 size);
UInt64 getSize() const override { return size; }
DiskPtr getDisk(size_t i) const override { return reservations[i]->getDisk(); }
Disks getDisks() const override;
void update(UInt64 new_size) override;
private:
Reservations reservations;
UInt64 size;
};
}
......@@ -22,5 +22,6 @@ public:
};
using VolumeSingleDiskPtr = std::shared_ptr<SingleDiskVolume>;
using VolumesSingleDiskPtr = std::vector<VolumeSingleDiskPtr>;
}
......@@ -71,7 +71,7 @@ StoragePolicy::StoragePolicy(
}
StoragePolicy::StoragePolicy(String name_, VolumesJBOD volumes_, double move_factor_)
StoragePolicy::StoragePolicy(String name_, Volumes volumes_, double move_factor_)
: volumes(std::move(volumes_)), name(std::move(name_)), move_factor(move_factor_)
{
if (volumes.empty())
......@@ -257,7 +257,7 @@ StoragePolicySelector::StoragePolicySelector(
{
auto default_volume = std::make_shared<VolumeJBOD>(default_volume_name, std::vector<DiskPtr>{disks->get(default_disk_name)}, 0);
auto default_policy = std::make_shared<StoragePolicy>(default_storage_policy_name, VolumesJBOD{default_volume}, 0.0);
auto default_policy = std::make_shared<StoragePolicy>(default_storage_policy_name, Volumes{default_volume}, 0.0);
policies.emplace(default_storage_policy_name, default_policy);
}
}
......
......@@ -4,6 +4,7 @@
#include <Disks/IDisk.h>
#include <Disks/IVolume.h>
#include <Disks/VolumeJBOD.h>
#include <Disks/VolumeRAID1.h>
#include <Disks/SingleDiskVolume.h>
#include <IO/WriteHelpers.h>
#include <Common/CurrentMetrics.h>
......@@ -33,7 +34,7 @@ class StoragePolicy
public:
StoragePolicy(String name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks);
StoragePolicy(String name_, VolumesJBOD volumes_, double move_factor_);
StoragePolicy(String name_, Volumes volumes_, double move_factor_);
bool isDefaultPolicy() const;
......@@ -65,16 +66,16 @@ public:
/// Do not use this function when it is possible to predict size.
ReservationPtr makeEmptyReservationOnLargestDisk() const;
const VolumesJBOD & getVolumes() const { return volumes; }
const Volumes & getVolumes() const { return volumes; }
/// Returns number [0., 1.] -- fraction of free space on disk
/// which should be kept with help of background moves
double getMoveFactor() const { return move_factor; }
/// Get volume by index from storage_policy
VolumeJBODPtr getVolume(size_t i) const { return (i < volumes_names.size() ? volumes[i] : VolumeJBODPtr()); }
VolumePtr getVolume(size_t i) const { return (i < volumes_names.size() ? volumes[i] : VolumePtr()); }
VolumeJBODPtr getVolumeByName(const String & volume_name) const
VolumePtr getVolumeByName(const String & volume_name) const
{
auto it = volumes_names.find(volume_name);
if (it == volumes_names.end())
......@@ -86,7 +87,7 @@ public:
void checkCompatibleWith(const StoragePolicyPtr & new_storage_policy) const;
private:
VolumesJBOD volumes;
Volumes volumes;
const String name;
std::map<String, size_t> volumes_names;
......
......@@ -64,7 +64,8 @@ DiskPtr VolumeJBOD::getNextDisk()
ReservationPtr VolumeJBOD::reserve(UInt64 bytes)
{
/// This volume can not store files which size greater than max_data_part_size
/// This volume can not store data which size is greater than `max_data_part_size`
/// to ensure that parts of size greater than that go to another volume(s).
if (max_data_part_size != 0 && bytes > max_data_part_size)
return {};
......
......@@ -32,6 +32,7 @@ public:
/// - Used with policy for temporary data
/// - Ignores all limitations
/// - Shares last access with reserve()
// TODO: Remove getNextDisk, move it's behaviour to VolumeJBOD::getDisk(), if it was called without argument.
DiskPtr getNextDisk();
/// Uses Round-robin to choose disk for reservation.
......
#include "VolumeRAID1.h"
#include <Common/StringUtils/StringUtils.h>
#include <Common/quoteString.h>
namespace DB
{
ReservationPtr VolumeRAID1::reserve(UInt64 bytes)
{
/// This volume can not store data which size is greater than `max_data_part_size`
/// to ensure that parts of size greater than that go to another volume(s).
if (max_data_part_size != 0 && bytes > max_data_part_size)
return {};
Reservations res(disks.size());
for (size_t i = 0; i < disks.size(); ++i)
{
res[i] = disks[i]->reserve(bytes);
if (!res[i])
return {};
}
return std::make_unique<MultiDiskReservation>(res, bytes);
}
}
#pragma once
#include <Disks/createVolume.h>
#include <Disks/VolumeJBOD.h>
namespace DB
{
class VolumeRAID1 : public VolumeJBOD
{
public:
VolumeRAID1(String name_, Disks disks_, UInt64 max_data_part_size_)
: VolumeJBOD(name_, disks_, max_data_part_size_)
{
}
VolumeRAID1(
String name_,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr disk_selector
) : VolumeJBOD(name_, config, config_prefix, disk_selector)
{
}
VolumeType getType() const override { return VolumeType::RAID1; }
ReservationPtr reserve(UInt64 bytes) override;
};
using VolumeRAID1Ptr = std::shared_ptr<VolumeRAID1>;
}
#include "createVolume.h"
#include <Disks/SingleDiskVolume.h>
#include <Disks/VolumeJBOD.h>
#include <Disks/VolumeRAID1.h>
#include <boost/algorithm/string.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_RAID_TYPE;
}
VolumePtr createVolumeFromReservation(const ReservationPtr & reservation, VolumePtr other_volume)
{
if (other_volume->getType() == VolumeType::JBOD || other_volume->getType() == VolumeType::SINGLE_DISK)
{
/// Since reservation on JBOD choices one of disks and makes reservation there, volume
/// Since reservation on JBOD chooses one of disks and makes reservation there, volume
/// for such type of reservation will be with one disk.
return std::make_shared<SingleDiskVolume>(other_volume->getName(), reservation->getDisk());
}
if (other_volume->getType() == VolumeType::RAID1)
{
auto volume = std::dynamic_pointer_cast<VolumeRAID1>(other_volume);
return std::make_shared<VolumeRAID1>(volume->getName(), reservation->getDisks(), volume->max_data_part_size);
}
return nullptr;
}
VolumePtr createVolumeFromConfig(
String name,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr disk_selector
)
{
auto has_raid_type = config.has(config_prefix + ".raid_type");
if (!has_raid_type)
{
return std::make_shared<VolumeJBOD>(name, config, config_prefix, disk_selector);
}
String raid_type = config.getString(config_prefix + ".raid_type");
if (raid_type == "JBOD")
{
return std::make_shared<VolumeJBOD>(name, config, config_prefix, disk_selector);
}
throw Exception("Unknown raid type '" + raid_type + "'", ErrorCodes::UNKNOWN_RAID_TYPE);
}
}
#pragma once
#include <Disks/IVolume.h>
#include <Disks/VolumeJBOD.h>
#include <Disks/SingleDiskVolume.h>
namespace DB
{
VolumePtr createVolumeFromReservation(const ReservationPtr & reservation, VolumePtr other_volume);
VolumePtr createVolumeFromConfig(
String name_,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
DiskSelectorPtr disk_selector
);
}
......@@ -16,6 +16,7 @@ SRCS(
SingleDiskVolume.cpp
StoragePolicy.cpp
VolumeJBOD.cpp
VolumeRAID1.cpp
)
END()
......@@ -765,7 +765,17 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
&& worth_convert_to_two_level)
{
size_t size = current_memory_usage + params.min_free_disk_space;
const std::string tmp_path = params.tmp_volume->getNextDisk()->getPath();
std::string tmp_path;
VolumeJBODPtr vol;
if ((vol = dynamic_pointer_cast<VolumeJBOD>(params.tmp_volume)) != nullptr)
{
tmp_path = vol->getNextDisk()->getPath();
}
else
{
tmp_path = params.tmp_volume->getDisk()->getPath();
}
// enoughSpaceInDirectory() is not enough to make it right, since
// another process (or another thread of aggregator) can consume all
......@@ -851,9 +861,21 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
ReadableSize(uncompressed_bytes / elapsed_seconds),
ReadableSize(compressed_bytes / elapsed_seconds));
}
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
{
return writeToTemporaryFile(data_variants, params.tmp_volume->getNextDisk()->getPath());
String tmp_path;
auto volume_jbod = std::dynamic_pointer_cast<VolumeJBOD>(params.tmp_volume);
if (volume_jbod)
{
tmp_path = volume_jbod->getNextDisk()->getPath();
}
else
{
tmp_path = params.tmp_volume->getDisk()->getPath();
}
return writeToTemporaryFile(data_variants, tmp_path);
}
......
......@@ -24,6 +24,8 @@
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/SizeLimits.h>
#include <Disks/SingleDiskVolume.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/AggregationCommon.h>
......@@ -45,9 +47,6 @@ namespace ErrorCodes
class IBlockOutputStream;
class VolumeJBOD;
using VolumeJBODPtr = std::shared_ptr<VolumeJBOD>;
/** Different data structures that can be used for aggregation
* For efficiency, the aggregation data itself is put into the pool.
* Data and pool ownership (states of aggregate functions)
......@@ -878,7 +877,7 @@ public:
/// Return empty result when aggregating without keys on empty set.
bool empty_result_for_aggregation_by_empty_set;
VolumeJBODPtr tmp_volume;
VolumePtr tmp_volume;
/// Settings is used to determine cache size. No threads are created.
size_t max_threads;
......@@ -891,7 +890,7 @@ public:
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_,
bool empty_result_for_aggregation_by_empty_set_,
VolumeJBODPtr tmp_volume_, size_t max_threads_,
VolumePtr tmp_volume_, size_t max_threads_,
size_t min_free_disk_space_)
: src_header(src_header_),
keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
......
......@@ -319,7 +319,7 @@ struct ContextShared
ConfigurationPtr config; /// Global configuration settings.
String tmp_path; /// Path to the temporary files that occur when processing the request.
mutable VolumeJBODPtr tmp_volume; /// Volume for the the temporary files that occur when processing the request.
mutable VolumePtr tmp_volume; /// Volume for the the temporary files that occur when processing the request.
mutable std::optional<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
mutable std::optional<ExternalDictionariesLoader> external_dictionaries_loader;
......@@ -547,7 +547,7 @@ String Context::getDictionariesLibPath() const
return shared->dictionaries_lib_path;
}
VolumeJBODPtr Context::getTemporaryVolume() const
VolumePtr Context::getTemporaryVolume() const
{
auto lock = getLock();
return shared->tmp_volume;
......@@ -572,7 +572,7 @@ void Context::setPath(const String & path)
shared->dictionaries_lib_path = shared->path + "dictionaries_lib/";
}
VolumeJBODPtr Context::setTemporaryStorage(const String & path, const String & policy_name)
VolumePtr Context::setTemporaryStorage(const String & path, const String & policy_name)
{
std::lock_guard lock(shared->storage_policies_mutex);
......@@ -583,7 +583,7 @@ VolumeJBODPtr Context::setTemporaryStorage(const String & path, const String & p
shared->tmp_path += '/';
auto disk = std::make_shared<DiskLocal>("_tmp_default", shared->tmp_path, 0);
shared->tmp_volume = std::make_shared<VolumeJBOD>("_tmp_default", std::vector<DiskPtr>{disk}, 0);
shared->tmp_volume = std::make_shared<SingleDiskVolume>("_tmp_default", disk);
}
else
{
......
......@@ -108,8 +108,8 @@ using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
class VolumeJBOD;
using VolumeJBODPtr = std::shared_ptr<VolumeJBOD>;
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
struct NamedSession;
......@@ -227,14 +227,14 @@ public:
String getUserFilesPath() const;
String getDictionariesLibPath() const;
VolumeJBODPtr getTemporaryVolume() const;
VolumePtr getTemporaryVolume() const;
void setPath(const String & path);
void setFlagsPath(const String & path);
void setUserFilesPath(const String & path);
void setDictionariesLibPath(const String & path);
VolumeJBODPtr setTemporaryStorage(const String & path, const String & policy_name = "");
VolumePtr setTemporaryStorage(const String & path, const String & policy_name = "");
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
......
......@@ -203,7 +203,7 @@ BlockInputStreamPtr SortedBlocksWriter::streamFromFile(const TmpFilePtr & file)
String SortedBlocksWriter::getPath() const
{
return volume->getNextDisk()->getPath();
return volume->getDisk()->getPath();
}
......
......@@ -16,8 +16,8 @@ class TableJoin;
class MergeJoinCursor;
struct MergeJoinEqualRange;
class VolumeJBOD;
using VolumeJBODPtr = std::shared_ptr<VolumeJBOD>;
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
struct SortedBlocksWriter
{
......@@ -63,7 +63,7 @@ struct SortedBlocksWriter
std::mutex insert_mutex;
std::condition_variable flush_condvar;
const SizeLimits & size_limits;
VolumeJBODPtr volume;
VolumePtr volume;
Block sample_block;
const SortDescription & sort_description;
Blocks inserted_blocks;
......@@ -76,7 +76,7 @@ struct SortedBlocksWriter
size_t flush_number = 0;
size_t flush_inflight = 0;
SortedBlocksWriter(const SizeLimits & size_limits_, VolumeJBODPtr volume_, const Block & sample_block_,
SortedBlocksWriter(const SizeLimits & size_limits_, VolumePtr volume_, const Block & sample_block_,
const SortDescription & description, size_t rows_in_block_, size_t num_files_to_merge_, const String & codec_)
: size_limits(size_limits_)
, volume(volume_)
......
......@@ -14,7 +14,7 @@
namespace DB
{
TableJoin::TableJoin(const Settings & settings, VolumeJBODPtr tmp_volume_)
TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_)
: size_limits(SizeLimits{settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode})
, default_max_bytes(settings.default_max_bytes_in_join)
, join_use_nulls(settings.join_use_nulls)
......
......@@ -24,8 +24,8 @@ class DictionaryReader;
struct Settings;
class VolumeJBOD;
using VolumeJBODPtr = std::shared_ptr<VolumeJBOD>;
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
class TableJoin
{
......@@ -71,11 +71,11 @@ class TableJoin
/// Original name -> name. Only ranamed columns.
std::unordered_map<String, String> renames;
VolumeJBODPtr tmp_volume;
VolumePtr tmp_volume;
public:
TableJoin() = default;
TableJoin(const Settings &, VolumeJBODPtr tmp_volume);
TableJoin(const Settings &, VolumePtr tmp_volume);
/// for StorageJoin
TableJoin(SizeLimits limits, bool use_nulls, ASTTableJoin::Kind kind, ASTTableJoin::Strictness strictness,
......@@ -97,7 +97,7 @@ public:
ASTTableJoin::Strictness strictness() const { return table_join.strictness; }
bool sameStrictnessAndKind(ASTTableJoin::Strictness, ASTTableJoin::Kind) const;
const SizeLimits & sizeLimits() const { return size_limits; }
VolumeJBODPtr getTemporaryVolume() { return tmp_volume; }
VolumePtr getTemporaryVolume() { return tmp_volume; }
bool allowMergeJoin() const;
bool allowDictJoin(const String & dict_key, const Block & sample_block, Names &, NamesAndTypesList &) const;
bool preferMergeJoin() const { return join_algorithm == JoinAlgorithm::PREFER_PARTIAL_MERGE; }
......
......@@ -366,7 +366,7 @@ void HTTPHandler::processQuery(
if (buffer_until_eof)
{
const std::string tmp_path(context.getTemporaryVolume()->getNextDisk()->getPath());
const std::string tmp_path(context.getTemporaryVolume()->getDisk()->getPath());
const std::string tmp_path_template(tmp_path + "http_buffers/");
auto create_tmp_disk_buffer = [tmp_path_template] (const WriteBufferPtr &)
......
......@@ -561,7 +561,9 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
/// and keep monitor thread out from reading incomplete data
std::string first_file_tmp_path{};
const auto & [disk, data_path] = storage.getPath();
auto reservation = storage.getStoragePolicy()->reserve(block.bytes());
auto disk = reservation->getDisk()->getPath();
auto data_path = storage.getRelativeDataPath();
auto it = dir_names.begin();
/// on first iteration write block to a temporary directory for subsequent
......
......@@ -290,7 +290,7 @@ StorageDistributed::StorageDistributed(
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & storage_policy_,
const String & storage_policy_name_,
const String & relative_data_path_,
bool attach_)
: IStorage(id_)
......@@ -300,7 +300,6 @@ StorageDistributed::StorageDistributed(
, log(&Poco::Logger::get("StorageDistributed (" + id_.table_name + ")"))
, cluster_name(global_context->getMacros()->expand(cluster_name_))
, has_sharding_key(sharding_key_)
, storage_policy(storage_policy_)
, relative_data_path(relative_data_path_)
{
StorageInMemoryMetadata storage_metadata;
......@@ -316,7 +315,11 @@ StorageDistributed::StorageDistributed(
}
if (!relative_data_path.empty())
createStorage();
{
storage_policy = global_context->getStoragePolicy(storage_policy_name_);
if (storage_policy->getVolumes().size() != 1)
throw Exception("Storage policy for Distributed table, should have exactly one volume", ErrorCodes::BAD_ARGUMENTS);
}
/// Sanity check. Skip check if the table is already created to allow the server to start.
if (!attach_ && !cluster_name.empty())
......@@ -336,34 +339,14 @@ StorageDistributed::StorageDistributed(
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & storage_policy_,
const String & storage_policy_name_,
const String & relative_data_path_,
bool attach)
: StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, storage_policy_, relative_data_path_, attach)
: StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, storage_policy_name_, relative_data_path_, attach)
{
remote_table_function_ptr = std::move(remote_table_function_ptr_);
}
void StorageDistributed::createStorage()
{
/// Create default policy with the relative_data_path_
if (storage_policy.empty())
{
std::string path(global_context->getPath());
/// Disk must ends with '/'
if (!path.ends_with('/'))
path += '/';
auto disk = std::make_shared<DiskLocal>("default", path, 0);
volume = std::make_shared<VolumeJBOD>("default", std::vector<DiskPtr>{disk}, 0);
}
else
{
auto policy = global_context->getStoragePolicy(storage_policy);
if (policy->getVolumes().size() != 1)
throw Exception("Policy for Distributed table, should have exactly one volume", ErrorCodes::BAD_ARGUMENTS);
volume = policy->getVolume(0);
}
}
StoragePtr StorageDistributed::createWithOwnCluster(
const StorageID & table_id_,
......@@ -539,7 +522,7 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMeta
const auto & settings = context.getSettingsRef();
/// Ban an attempt to make async insert into the table belonging to DatabaseMemory
if (!volume && !owned_cluster && !settings.insert_distributed_sync)
if (!storage_policy && !owned_cluster && !settings.insert_distributed_sync)
{
throw Exception("Storage " + getName() + " must has own data directory to enable asynchronous inserts",
ErrorCodes::BAD_ARGUMENTS);
......@@ -595,10 +578,10 @@ void StorageDistributed::startup()
if (remote_database.empty() && !remote_table_function_ptr)
LOG_WARNING(log, "Name of remote database is empty. Default database will be used implicitly.");
if (!volume)
if (!storage_policy)
return;
for (const DiskPtr & disk : volume->getDisks())
for (const DiskPtr & disk : storage_policy->getDisks())
createDirectoryMonitors(disk->getPath());
for (const String & path : getDataPaths())
......@@ -632,7 +615,7 @@ void StorageDistributed::drop()
LOG_DEBUG(log, "Removing pending blocks for async INSERT from filesystem on DROP TABLE");
auto disks = volume->getDisks();
auto disks = storage_policy->getDisks();
for (const auto & disk : disks)
disk->removeRecursive(relative_data_path);
......@@ -646,7 +629,7 @@ Strings StorageDistributed::getDataPaths() const
if (relative_data_path.empty())
return paths;
for (const DiskPtr & disk : volume->getDisks())
for (const DiskPtr & disk : storage_policy->getDisks())
paths.push_back(disk->getPath() + relative_data_path);
return paths;
......@@ -669,9 +652,7 @@ void StorageDistributed::truncate(const ASTPtr &, const StorageMetadataPtr &, co
StoragePolicyPtr StorageDistributed::getStoragePolicy() const
{
if (storage_policy.empty())
return {};
return global_context->getStoragePolicy(storage_policy);
return storage_policy;
}
void StorageDistributed::createDirectoryMonitors(const std::string & disk)
......@@ -718,11 +699,6 @@ size_t StorageDistributed::getShardCount() const
return getCluster()->getShardCount();
}
std::pair<const std::string &, const std::string &> StorageDistributed::getPath()
{
return {volume->getNextDisk()->getPath(), relative_data_path};
}
ClusterPtr StorageDistributed::getCluster() const
{
return owned_cluster ? owned_cluster : global_context->getCluster(cluster_name);
......@@ -868,9 +844,11 @@ void StorageDistributed::rename(const String & new_path_to_table_data, const Sto
renameOnDisk(new_path_to_table_data);
renameInMemory(new_table_id);
}
void StorageDistributed::renameOnDisk(const String & new_path_to_table_data)
{
for (const DiskPtr & disk : volume->getDisks())
for (const DiskPtr & disk : storage_policy->getDisks())
{
const String path(disk->getPath());
auto new_path = path + new_path_to_table_data;
......@@ -925,7 +903,7 @@ void registerStorageDistributed(StorageFactory & factory)
String remote_table = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
const auto & sharding_key = engine_args.size() >= 4 ? engine_args[3] : nullptr;
const auto & storage_policy = engine_args.size() >= 5 ? engine_args[4]->as<ASTLiteral &>().value.safeGet<String>() : "";
const auto & storage_policy = engine_args.size() >= 5 ? engine_args[4]->as<ASTLiteral &>().value.safeGet<String>() : "default";
/// Check that sharding_key exists in the table and has numeric type.
if (sharding_key)
......
......@@ -18,8 +18,8 @@ namespace DB
struct Settings;
class Context;
class VolumeJBOD;
using VolumeJBODPtr = std::shared_ptr<VolumeJBOD>;
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
......@@ -102,7 +102,7 @@ public:
const ExpressionActionsPtr & getShardingKeyExpr() const { return sharding_key_expr; }
const String & getShardingKeyColumnName() const { return sharding_key_column_name; }
size_t getShardCount() const;
std::pair<const std::string &, const std::string &> getPath();
const String & getRelativeDataPath() const { return relative_data_path; }
std::string getRemoteDatabaseName() const { return remote_database; }
std::string getRemoteTableName() const { return remote_table; }
std::string getClusterName() const { return cluster_name; } /// Returns empty string if tables is used by TableFunctionRemote
......@@ -163,7 +163,7 @@ protected:
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & storage_policy_,
const String & storage_policy_name_,
const String & relative_data_path_,
bool attach_);
......@@ -175,16 +175,14 @@ protected:
const String & cluster_name_,
const Context & context_,
const ASTPtr & sharding_key_,
const String & storage_policy_,
const String & storage_policy_name_,
const String & relative_data_path_,
bool attach);
void createStorage();
String storage_policy;
String relative_data_path;
/// Can be empty if relative_data_path is empty. In this case, a directory for the data to be sent is not created.
VolumeJBODPtr volume;
StoragePolicyPtr storage_policy;
struct ClusterNodeData
{
......
#include <Storages/System/StorageSystemStoragePolicies.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnNullable.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemStoragePolicies.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNullable.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Interpreters/Context.h>
......@@ -24,9 +27,11 @@ StorageSystemStoragePolicies::StorageSystemStoragePolicies(const std::string & n
{"volume_name", std::make_shared<DataTypeString>()},
{"volume_priority", std::make_shared<DataTypeUInt64>()},
{"disks", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"max_data_part_size", std::make_shared<DataTypeUInt64>()},
{"move_factor", std::make_shared<DataTypeFloat32>()}
{"volume_type", std::make_shared<DataTypeString>()},
{"max_data_part_size", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>())},
{"move_factor", std::make_shared<DataTypeFloat32>()},
}));
// TODO: Add string column with custom volume-type-specific options
setInMemoryMetadata(storage_metadata);
}
......@@ -45,7 +50,8 @@ Pipes StorageSystemStoragePolicies::read(
MutableColumnPtr col_volume_name = ColumnString::create();
MutableColumnPtr col_priority = ColumnUInt64::create();
MutableColumnPtr col_disks = ColumnArray::create(ColumnString::create());
MutableColumnPtr col_max_part_size = ColumnUInt64::create();
MutableColumnPtr col_volume_type = ColumnString::create();
MutableColumnPtr col_max_part_size = ColumnNullable::create(ColumnUInt64::create(), ColumnUInt8::create());
MutableColumnPtr col_move_factor = ColumnFloat32::create();
for (const auto & [policy_name, policy_ptr] : context.getPoliciesMap())
......@@ -61,7 +67,12 @@ Pipes StorageSystemStoragePolicies::read(
for (const auto & disk_ptr : volumes[i]->getDisks())
disks.push_back(disk_ptr->getName());
col_disks->insert(disks);
col_max_part_size->insert(volumes[i]->max_data_part_size);
col_volume_type->insert(volumeTypeToString(volumes[i]->getType()));
auto volume_jbod = std::dynamic_pointer_cast<VolumeJBOD>(volumes[i]);
if (volume_jbod)
col_max_part_size->insert(volume_jbod->max_data_part_size);
else
col_max_part_size->insert(Null{});
col_move_factor->insert(policy_ptr->getMoveFactor());
}
}
......@@ -71,6 +82,7 @@ Pipes StorageSystemStoragePolicies::read(
res_columns.emplace_back(std::move(col_volume_name));
res_columns.emplace_back(std::move(col_priority));
res_columns.emplace_back(std::move(col_disks));
res_columns.emplace_back(std::move(col_volume_type));
res_columns.emplace_back(std::move(col_max_part_size));
res_columns.emplace_back(std::move(col_move_factor));
......
......@@ -73,6 +73,7 @@ def test_system_tables(start_cluster):
"volume_name": "main",
"volume_priority": "1",
"disks": ["jbod1"],
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
},
......@@ -81,6 +82,7 @@ def test_system_tables(start_cluster):
"volume_name": "external",
"volume_priority": "2",
"disks": ["external"],
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
},
......@@ -89,6 +91,7 @@ def test_system_tables(start_cluster):
"volume_name": "m",
"volume_priority": "1",
"disks": ["jbod1"],
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
},
......@@ -97,6 +100,7 @@ def test_system_tables(start_cluster):
"volume_name": "e",
"volume_priority": "2",
"disks": ["external"],
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
},
......@@ -105,6 +109,7 @@ def test_system_tables(start_cluster):
"volume_name": "main",
"volume_priority": "1",
"disks": ["jbod1", "jbod2"],
"volume_type": "JBOD",
"max_data_part_size": "10485760",
"move_factor": 0.1,
},
......@@ -113,6 +118,7 @@ def test_system_tables(start_cluster):
"volume_name": "external",
"volume_priority": "2",
"disks": ["external"],
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
},
......@@ -121,6 +127,7 @@ def test_system_tables(start_cluster):
"volume_name": "main",
"volume_priority": "1",
"disks": ["jbod1"],
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.7,
},
......@@ -129,6 +136,7 @@ def test_system_tables(start_cluster):
"volume_name": "external",
"volume_priority": "2",
"disks": ["external"],
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.7,
},
......@@ -137,6 +145,7 @@ def test_system_tables(start_cluster):
"volume_name": "small",
"volume_priority": "1",
"disks": ["default"],
"volume_type": "JBOD",
"max_data_part_size": "2097152",
"move_factor": 0.1,
},
......@@ -145,6 +154,7 @@ def test_system_tables(start_cluster):
"volume_name": "big",
"volume_priority": "2",
"disks": ["external"],
"volume_type": "JBOD",
"max_data_part_size": "20971520",
"move_factor": 0.1,
},
......@@ -153,6 +163,7 @@ def test_system_tables(start_cluster):
"volume_name": "special_warning_zero_volume",
"volume_priority": "1",
"disks": ["default"],
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
},
......@@ -161,6 +172,7 @@ def test_system_tables(start_cluster):
"volume_name": "special_warning_default_volume",
"volume_priority": "2",
"disks": ["external"],
"volume_type": "JBOD",
"max_data_part_size": "0",
"move_factor": 0.1,
},
......@@ -169,6 +181,7 @@ def test_system_tables(start_cluster):
"volume_name": "special_warning_small_volume",
"volume_priority": "3",
"disks": ["jbod1"],
"volume_type": "JBOD",
"max_data_part_size": "1024",
"move_factor": 0.1,
},
......@@ -177,6 +190,7 @@ def test_system_tables(start_cluster):
"volume_name": "special_warning_big_volume",
"volume_priority": "4",
"disks": ["jbod2"],
"volume_type": "JBOD",
"max_data_part_size": "1024000000",
"move_factor": 0.1,
},
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册