提交 d6f72dc2 编写于 作者: M Maxim Akhmedov

Specify supported features for each storage engine, expose them via...

Specify supported features for each storage engine, expose them via system.table_engines table, properly check feature support in storage factory.
上级 c456ee5a
......@@ -411,7 +411,7 @@ bool StorageKafka::streamToViews()
void registerStorageKafka(StorageFactory & factory)
{
factory.registerStorage("Kafka", [](const StorageFactory::Arguments & args)
auto creator_fn = [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
size_t args_count = engine_args.size();
......@@ -636,7 +636,9 @@ void registerStorageKafka(StorageFactory & factory)
return StorageKafka::create(
args.table_id, args.context, args.columns,
brokers, group, topics, format, row_delimiter, schema, num_consumers, max_block_size, skip_broken, intermediate_commit);
});
};
factory.registerStorage("Kafka", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
}
......
......@@ -669,21 +669,28 @@ static StoragePtr create(const StorageFactory::Arguments & args)
void registerStorageMergeTree(StorageFactory & factory)
{
factory.registerStorage("MergeTree", create);
factory.registerStorage("CollapsingMergeTree", create);
factory.registerStorage("ReplacingMergeTree", create);
factory.registerStorage("AggregatingMergeTree", create);
factory.registerStorage("SummingMergeTree", create);
factory.registerStorage("GraphiteMergeTree", create);
factory.registerStorage("VersionedCollapsingMergeTree", create);
factory.registerStorage("ReplicatedMergeTree", create);
factory.registerStorage("ReplicatedCollapsingMergeTree", create);
factory.registerStorage("ReplicatedReplacingMergeTree", create);
factory.registerStorage("ReplicatedAggregatingMergeTree", create);
factory.registerStorage("ReplicatedSummingMergeTree", create);
factory.registerStorage("ReplicatedGraphiteMergeTree", create);
factory.registerStorage("ReplicatedVersionedCollapsingMergeTree", create);
StorageFactory::StorageFeatures features{
.supports_settings = true,
.supports_skipping_indices = true,
.supports_sort_order = true,
.supports_ttl = true,
};
factory.registerStorage("MergeTree", create, features);
factory.registerStorage("CollapsingMergeTree", create, features);
factory.registerStorage("ReplacingMergeTree", create, features);
factory.registerStorage("AggregatingMergeTree", create, features);
factory.registerStorage("SummingMergeTree", create, features);
factory.registerStorage("GraphiteMergeTree", create, features);
factory.registerStorage("VersionedCollapsingMergeTree", create, features);
factory.registerStorage("ReplicatedMergeTree", create, features);
factory.registerStorage("ReplicatedCollapsingMergeTree", create, features);
factory.registerStorage("ReplicatedReplacingMergeTree", create, features);
factory.registerStorage("ReplicatedAggregatingMergeTree", create, features);
factory.registerStorage("ReplicatedSummingMergeTree", create, features);
factory.registerStorage("ReplicatedGraphiteMergeTree", create, features);
factory.registerStorage("ReplicatedVersionedCollapsingMergeTree", create, features);
}
}
......@@ -31,9 +31,9 @@ static void checkAllTypesAreAllowedInTable(const NamesAndTypesList & names_and_t
}
void StorageFactory::registerStorage(const std::string & name, Creator creator)
void StorageFactory::registerStorage(const std::string & name, CreatorFn creator_fn, StorageFeatures features)
{
if (!storages.emplace(name, std::move(creator)).second)
if (!storages.emplace(name, Creator{std::move(creator_fn), features}).second)
throw Exception("TableFunctionFactory: the table function name '" + name + "' is not unique",
ErrorCodes::LOGICAL_ERROR);
}
......@@ -93,24 +93,6 @@ StoragePtr StorageFactory::get(
name = engine_def.name;
if (storage_def->settings && !endsWith(name, "MergeTree") && name != "Kafka" && name != "Join")
{
throw Exception(
"Engine " + name + " doesn't support SETTINGS clause. "
"Currently only the MergeTree family of engines, Kafka engine and Join engine support it",
ErrorCodes::BAD_ARGUMENTS);
}
if ((storage_def->partition_by || storage_def->primary_key || storage_def->order_by || storage_def->sample_by ||
storage_def->ttl_table || !columns.getColumnTTLs().empty() ||
(query.columns_list && query.columns_list->indices && !query.columns_list->indices->children.empty()))
&& !endsWith(name, "MergeTree"))
{
throw Exception(
"Engine " + name + " doesn't support PARTITION BY, PRIMARY KEY, ORDER BY, TTL or SAMPLE BY clauses and skipping indices. "
"Currently only the MergeTree family of engines supports them", ErrorCodes::BAD_ARGUMENTS);
}
if (name == "View")
{
throw Exception(
......@@ -142,6 +124,45 @@ StoragePtr StorageFactory::get(
throw Exception("Unknown table engine " + name, ErrorCodes::UNKNOWN_STORAGE);
}
auto checkFeature = [&](String feature_description, FeatureMatcherFn feature_matcher_fn)
{
if (!feature_matcher_fn(it->second.features)) {
String msg = "Engine " + name + " doesn't support " + feature_description + ". "
"Currently only the following engines have support for the feature: [";
auto supporting_engines = getAllRegisteredNamesByFeatureMatcherFn(feature_matcher_fn);
for (size_t index = 0; index < supporting_engines.size(); ++index)
{
if (index)
msg += ", ";
msg += supporting_engines[index];
}
msg += "]";
throw Exception(msg, ErrorCodes::BAD_ARGUMENTS);
}
};
//if (storage_def->settings && !endsWith(name, "MergeTree") && name != "Kafka" && name != "Join")
if (storage_def->settings)
checkFeature(
"SETTINGS clause",
[](StorageFeatures features) { return features.supports_settings; });
if (storage_def->partition_by || storage_def->primary_key || storage_def->order_by || storage_def->sample_by)
checkFeature(
"PARTITION_BY, PRIMARY_KEY, ORDER_BY or SAMPLE_BY clauses",
[](StorageFeatures features) { return features.supports_sort_order; });
if (storage_def->ttl_table || !columns.getColumnTTLs().empty())
checkFeature(
"TTL clause",
[](StorageFeatures features) { return features.supports_ttl; });
if (query.columns_list && query.columns_list->indices && !query.columns_list->indices->children.empty())
checkFeature(
"skipping indices",
[](StorageFeatures features) { return features.supports_skipping_indices; });
Arguments arguments
{
.engine_name = name,
......@@ -158,7 +179,7 @@ StoragePtr StorageFactory::get(
.has_force_restore_data_flag = has_force_restore_data_flag
};
return it->second(arguments);
return it->second.creator_fn(arguments);
}
StorageFactory & StorageFactory::instance()
......
......@@ -46,7 +46,22 @@ public:
bool has_force_restore_data_flag;
};
using Creator = std::function<StoragePtr(const Arguments & arguments)>;
struct StorageFeatures
{
bool supports_settings = false;
bool supports_skipping_indices = false;
bool supports_sort_order = false;
bool supports_ttl = false;
};
using CreatorFn = std::function<StoragePtr(const Arguments & arguments)>;
struct Creator
{
CreatorFn creator_fn;
StorageFeatures features;
};
using Storages = std::unordered_map<std::string, Creator>;
StoragePtr get(
const ASTCreateQuery & query,
......@@ -59,9 +74,14 @@ public:
/// Register a table engine by its name.
/// No locking, you must register all engines before usage of get.
void registerStorage(const std::string & name, Creator creator);
void registerStorage(const std::string & name, CreatorFn creator_fn, StorageFeatures features = StorageFeatures{
.supports_settings = false,
.supports_skipping_indices = false,
.supports_sort_order = false,
.supports_ttl = false,
});
const auto & getAllStorages() const
const Storages & getAllStorages() const
{
return storages;
}
......@@ -74,8 +94,18 @@ public:
return result;
}
using FeatureMatcherFn = std::function<bool(StorageFeatures)>;
std::vector<String> getAllRegisteredNamesByFeatureMatcherFn(FeatureMatcherFn feature_matcher_fn) const
{
std::vector<String> result;
for (const auto& pair : storages)
if (feature_matcher_fn(pair.second.features))
result.push_back(pair.first);
return result;
}
private:
using Storages = std::unordered_map<std::string, Creator>;
Storages storages;
};
......
......@@ -95,7 +95,7 @@ size_t StorageJoin::getSize() const { return join->getTotalRowCount(); }
void registerStorageJoin(StorageFactory & factory)
{
factory.registerStorage("Join", [](const StorageFactory::Arguments & args)
auto creator_fn = [](const StorageFactory::Arguments & args)
{
/// Join(ANY, LEFT, k1, k2, ...)
......@@ -209,7 +209,9 @@ void registerStorageJoin(StorageFactory & factory)
args.constraints,
join_any_take_last_row,
args.context);
});
};
factory.registerStorage("Join", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
}
template <typename T>
......
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/StorageFactory.h>
#include <Storages/System/StorageSystemTableEngines.h>
......@@ -7,15 +8,22 @@ namespace DB
NamesAndTypesList StorageSystemTableEngines::getNamesAndTypes()
{
return {{"name", std::make_shared<DataTypeString>()}};
return {{"name", std::make_shared<DataTypeString>()},
{"supports_settings", std::make_shared<DataTypeUInt8>()},
{"supports_skipping_indices", std::make_shared<DataTypeUInt8>()},
{"supports_sort_order", std::make_shared<DataTypeUInt8>()},
{"supports_ttl", std::make_shared<DataTypeUInt8>()}};
}
void StorageSystemTableEngines::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
const auto & storages = StorageFactory::instance().getAllStorages();
for (const auto & pair : storages)
for (const auto & pair : StorageFactory::instance().getAllStorages())
{
res_columns[0]->insert(pair.first);
res_columns[1]->insert(pair.second.features.supports_settings);
res_columns[2]->insert(pair.second.features.supports_skipping_indices);
res_columns[3]->insert(pair.second.features.supports_sort_order);
res_columns[4]->insert(pair.second.features.supports_ttl);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册