未验证 提交 04206db7 编写于 作者: T tavplubix 提交者: GitHub

Merge pull request #15974 from ClickHouse/merging_14295

Merging #14295
......@@ -52,6 +52,7 @@ RUN apt-get update \
RUN echo "TSAN_OPTIONS='verbosity=1000 halt_on_error=1 history_size=7'" >> /etc/environment; \
echo "UBSAN_OPTIONS='print_stacktrace=1'" >> /etc/environment; \
echo "MSAN_OPTIONS='abort_on_error=1'" >> /etc/environment; \
echo "LSAN_OPTIONS='suppressions=/usr/share/clickhouse-test/config/lsan_suppressions.txt'" >> /etc/environment; \
ln -s /usr/lib/llvm-${LLVM_VERSION}/bin/llvm-symbolizer /usr/bin/llvm-symbolizer;
# Sanitizer options for current shell (not current, but the one that will be spawned on "docker run")
# (but w/o verbosity for TSAN, otherwise test.reference will not match)
......
......@@ -54,9 +54,12 @@ std::pair<String, StoragePtr> createTableFromAST(
if (ast_create_query.as_table_function)
{
const auto & table_function = ast_create_query.as_table_function->as<ASTFunction &>();
const auto & factory = TableFunctionFactory::instance();
StoragePtr storage = factory.get(table_function.name, context)->execute(ast_create_query.as_table_function, context, ast_create_query.table);
auto table_function = factory.get(ast_create_query.as_table_function, context);
ColumnsDescription columns;
if (ast_create_query.columns_list && ast_create_query.columns_list->columns)
columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, false);
StoragePtr storage = table_function->execute(ast_create_query.as_table_function, context, ast_create_query.table, std::move(columns));
storage->renameInMemory(ast_create_query);
return {ast_create_query.table, storage};
}
......
......@@ -254,9 +254,12 @@ void DatabaseOrdinary::alterTable(const Context & context, const StorageID & tab
auto & ast_create_query = ast->as<ASTCreateQuery &>();
if (ast_create_query.as_table_function)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot alter table {} because it was created AS table function", backQuote(table_name));
bool has_structure = ast_create_query.columns_list && ast_create_query.columns_list->columns;
if (ast_create_query.as_table_function && !has_structure)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot alter table {} because it was created AS table function"
" and doesn't have structure in metadata", backQuote(table_name));
assert(has_structure);
ASTPtr new_columns = InterpreterCreateQuery::formatColumns(metadata.columns);
ASTPtr new_indices = InterpreterCreateQuery::formatIndices(metadata.secondary_indices);
ASTPtr new_constraints = InterpreterCreateQuery::formatConstraints(metadata.constraints);
......
......@@ -159,8 +159,7 @@ void SelectStreamFactory::createForShard(
if (table_func_ptr)
{
const auto * table_function = table_func_ptr->as<ASTFunction>();
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context);
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_func_ptr, context);
main_table_storage = table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName());
}
else
......
......@@ -933,7 +933,7 @@ StoragePtr Context::executeTableFunction(const ASTPtr & table_expression)
if (!res)
{
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression->as<ASTFunction>()->name, *this);
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression, *this);
/// Run it and remember the result
res = table_function_ptr->execute(table_expression, *this, table_function_ptr->getName());
......
......@@ -453,6 +453,9 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
if (create.columns_list)
{
if (create.as_table_function && (create.columns_list->indices || create.columns_list->constraints))
throw Exception("Indexes and constraints are not supported for table functions", ErrorCodes::INCORRECT_QUERY);
if (create.columns_list->columns)
{
bool sanity_check_compression_codecs = !create.attach && !context.getSettingsRef().allow_suspicious_codecs;
......@@ -489,7 +492,12 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
properties.columns = ColumnsDescription(as_select_sample.getNamesAndTypesList());
}
else if (create.as_table_function)
return {};
{
/// Table function without columns list.
auto table_function = TableFunctionFactory::instance().get(create.as_table_function, context);
properties.columns = table_function->getActualTableStructure(context);
assert(!properties.columns.empty());
}
else
throw Exception("Incorrect CREATE query: required list of column descriptions or AS section or SELECT.", ErrorCodes::INCORRECT_QUERY);
......@@ -575,9 +583,12 @@ void InterpreterCreateQuery::validateTableStructure(const ASTCreateQuery & creat
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
{
if (create.as_table_function)
return;
if (create.storage || create.is_view || create.is_materialized_view || create.is_live_view || create.is_dictionary)
{
if (create.temporary && create.storage->engine->name != "Memory")
if (create.temporary && create.storage && create.storage->engine && create.storage->engine->name != "Memory")
throw Exception(
"Temporary tables can only be created with ENGINE = Memory, not " + create.storage->engine->name,
ErrorCodes::INCORRECT_QUERY);
......@@ -757,9 +768,8 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
/// NOTE: CREATE query may be rewritten by Storage creator or table function
if (create.as_table_function)
{
const auto & table_function = create.as_table_function->as<ASTFunction &>();
const auto & factory = TableFunctionFactory::instance();
res = factory.get(table_function.name, context)->execute(create.as_table_function, context, create.table);
res = factory.get(create.as_table_function, context)->execute(create.as_table_function, context, create.table, properties.columns);
res->renameInMemory({create.database, create.table, create.uuid});
}
else
......
......@@ -72,23 +72,16 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()
table_expression.subquery->children.at(0), context).getNamesAndTypesList();
columns = ColumnsDescription(std::move(names_and_types));
}
else if (table_expression.table_function)
{
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression.table_function, context);
columns = table_function_ptr->getActualTableStructure(context);
}
else
{
StoragePtr table;
if (table_expression.table_function)
{
const auto & table_function = table_expression.table_function->as<ASTFunction &>();
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function.name, context);
/// Run the table function and remember the result
table = table_function_ptr->execute(table_expression.table_function, context, table_function_ptr->getName());
}
else
{
auto table_id = context.resolveStorageID(table_expression.database_and_table_name);
context.checkAccess(AccessType::SHOW_COLUMNS, table_id);
table = DatabaseCatalog::instance().getTable(table_id, context);
}
auto table_id = context.resolveStorageID(table_expression.database_and_table_name);
context.checkAccess(AccessType::SHOW_COLUMNS, table_id);
auto table = DatabaseCatalog::instance().getTable(table_id, context);
auto table_lock = table->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
columns = metadata_snapshot->getColumns();
......
......@@ -67,9 +67,8 @@ StoragePtr InterpreterInsertQuery::getTable(ASTInsertQuery & query)
{
if (query.table_function)
{
const auto * table_function = query.table_function->as<ASTFunction>();
const auto & factory = TableFunctionFactory::instance();
TableFunctionPtr table_function_ptr = factory.get(table_function->name, context);
TableFunctionPtr table_function_ptr = factory.get(query.table_function, context);
return table_function_ptr->execute(query.table_function, context, table_function_ptr->getName());
}
......
......@@ -1114,6 +1114,7 @@ void InterpreterSelectQuery::executeFetchColumns(
bool optimize_trivial_count =
syntax_analyzer_result->optimize_trivial_count
&& storage
&& storage->getName() != "MaterializeMySQL"
&& !filter_info
&& processing_stage == QueryProcessingStage::FetchColumns
&& query_analyzer->hasAggregation()
......
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Storages/IStorage.h>
#include <DataStreams/OneBlockInputStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Rewrite original query removing joined tables from it
bool removeJoin(ASTSelectQuery & select)
{
const auto & tables = select.tables();
if (!tables || tables->children.size() < 2)
return false;
const auto & joined_table = tables->children[1]->as<ASTTablesInSelectQueryElement &>();
if (!joined_table.table_join)
return false;
/// The most simple temporary solution: leave only the first table in query.
/// TODO: we also need to remove joined columns and related functions (taking in account aliases if any).
tables->children.resize(1);
return true;
}
Block getHeaderForProcessingStage(
const IStorage & storage,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage)
{
switch (processed_stage)
{
case QueryProcessingStage::FetchColumns:
{
Block header = metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID());
if (query_info.prewhere_info)
{
query_info.prewhere_info->prewhere_actions->execute(header);
if (query_info.prewhere_info->remove_prewhere_column)
header.erase(query_info.prewhere_info->prewhere_column_name);
}
return header;
}
case QueryProcessingStage::WithMergeableState:
case QueryProcessingStage::Complete:
case QueryProcessingStage::WithMergeableStateAfterAggregation:
case QueryProcessingStage::MAX:
{
auto query = query_info.query->clone();
removeJoin(*query->as<ASTSelectQuery>());
auto stream = std::make_shared<OneBlockInputStream>(
metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals(), storage.getStorageID()));
return InterpreterSelectQuery(query, context, stream, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}
}
throw Exception("Logical Error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR);
}
}
#pragma once
#include <Core/Block.h>
#include <Core/Names.h>
#include <Core/QueryProcessingStage.h>
namespace DB
{
class IStorage;
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
struct SelectQueryInfo;
class Context;
class ASTSelectQuery;
bool removeJoin(ASTSelectQuery & select);
Block getHeaderForProcessingStage(
const IStorage & storage,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage);
}
......@@ -63,6 +63,7 @@ SRCS(
ExtractExpressionInfoVisitor.cpp
FillingRow.cpp
getClusterName.cpp
getHeaderForProcessingStage.cpp
getTableExpressions.cpp
HashJoin.cpp
IdentifierSemantic.cpp
......
......@@ -282,13 +282,23 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
if (as_table_function)
{
if (columns_list)
{
frame.expression_list_always_start_on_new_line = true;
settings.ostr << (settings.one_line ? " (" : "\n(");
FormatStateStacked frame_nested = frame;
columns_list->formatImpl(settings, state, frame_nested);
settings.ostr << (settings.one_line ? ")" : "\n)");
frame.expression_list_always_start_on_new_line = false;
}
settings.ostr << (settings.hilite ? hilite_keyword : "") << " AS " << (settings.hilite ? hilite_none : "");
as_table_function->formatImpl(settings, state, frame);
}
frame.expression_list_always_start_on_new_line = true;
if (columns_list)
if (columns_list && !as_table_function)
{
settings.ostr << (settings.one_line ? " (" : "\n(");
FormatStateStacked frame_nested = frame;
......
......@@ -416,7 +416,12 @@ bool ParserCreateTableQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
return false;
if (!storage_p.parse(pos, storage, expected) && !is_temporary)
return false;
{
if (!s_as.ignore(pos, expected))
return false;
if (!table_function_p.parse(pos, as_table_function, expected))
return false;
}
}
else
{
......
......@@ -360,12 +360,14 @@ StorageDistributed::StorageDistributed(
const ASTPtr & sharding_key_,
const String & storage_policy_name_,
const String & relative_data_path_,
bool attach_)
bool attach_,
ClusterPtr owned_cluster_)
: IStorage(id_)
, remote_database(remote_database_)
, remote_table(remote_table_)
, global_context(std::make_unique<Context>(context_))
, log(&Poco::Logger::get("StorageDistributed (" + id_.table_name + ")"))
, owned_cluster(std::move(owned_cluster_))
, cluster_name(global_context->getMacros()->expand(cluster_name_))
, has_sharding_key(sharding_key_)
, relative_data_path(relative_data_path_)
......@@ -411,39 +413,13 @@ StorageDistributed::StorageDistributed(
const ASTPtr & sharding_key_,
const String & storage_policy_name_,
const String & relative_data_path_,
bool attach)
: StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, storage_policy_name_, relative_data_path_, attach)
bool attach,
ClusterPtr owned_cluster_)
: StorageDistributed(id_, columns_, constraints_, String{}, String{}, cluster_name_, context_, sharding_key_, storage_policy_name_, relative_data_path_, attach, std::move(owned_cluster_))
{
remote_table_function_ptr = std::move(remote_table_function_ptr_);
}
StoragePtr StorageDistributed::createWithOwnCluster(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const String & remote_database_, /// database on remote servers.
const String & remote_table_, /// The name of the table on the remote servers.
ClusterPtr owned_cluster_,
const Context & context_)
{
auto res = create(table_id_, columns_, ConstraintsDescription{}, remote_database_, remote_table_, String{}, context_, ASTPtr(), String(), String(), false);
res->owned_cluster = std::move(owned_cluster_);
return res;
}
StoragePtr StorageDistributed::createWithOwnCluster(
const StorageID & table_id_,
const ColumnsDescription & columns_,
ASTPtr & remote_table_function_ptr_,
ClusterPtr & owned_cluster_,
const Context & context_)
{
auto res = create(table_id_, columns_, ConstraintsDescription{}, remote_table_function_ptr_, String{}, context_, ASTPtr(), String(), String(), false);
res->owned_cluster = owned_cluster_;
return res;
}
QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(const Context &context, QueryProcessingStage::Enum to_stage, const ASTPtr & query_ptr) const
{
const auto & settings = context.getSettingsRef();
......
......@@ -42,21 +42,6 @@ class StorageDistributed final : public ext::shared_ptr_helper<StorageDistribute
public:
~StorageDistributed() override;
static StoragePtr createWithOwnCluster(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const String & remote_database_, /// database on remote servers.
const String & remote_table_, /// The name of the table on the remote servers.
ClusterPtr owned_cluster_,
const Context & context_);
static StoragePtr createWithOwnCluster(
const StorageID & table_id_,
const ColumnsDescription & columns_,
ASTPtr & remote_table_function_ptr_, /// Table function ptr.
ClusterPtr & owned_cluster_,
const Context & context_);
std::string getName() const override { return "Distributed"; }
bool supportsSampling() const override { return true; }
......@@ -165,7 +150,8 @@ protected:
const ASTPtr & sharding_key_,
const String & storage_policy_name_,
const String & relative_data_path_,
bool attach_);
bool attach_,
ClusterPtr owned_cluster_ = {});
StorageDistributed(
const StorageID & id_,
......@@ -177,7 +163,8 @@ protected:
const ASTPtr & sharding_key_,
const String & storage_policy_name_,
const String & relative_data_path_,
bool attach);
bool attach,
ClusterPtr owned_cluster_ = {});
String relative_data_path;
......
......@@ -51,6 +51,7 @@ namespace ErrorCodes
extern const int UNKNOWN_IDENTIFIER;
extern const int INCORRECT_FILE_NAME;
extern const int FILE_DOESNT_EXIST;
extern const int INCOMPATIBLE_COLUMNS;
}
namespace
......@@ -125,11 +126,33 @@ void checkCreationIsAllowed(const Context & context_global, const std::string &
}
}
Strings StorageFile::getPathsList(const String & table_path, const String & user_files_path, const Context & context)
{
String user_files_absolute_path = Poco::Path(user_files_path).makeAbsolute().makeDirectory().toString();
Poco::Path poco_path = Poco::Path(table_path);
if (poco_path.isRelative())
poco_path = Poco::Path(user_files_absolute_path, poco_path);
Strings paths;
const String path = poco_path.absolute().toString();
if (path.find_first_of("*?{") == std::string::npos)
paths.push_back(path);
else
paths = listFilesWithRegexpMatching("/", path);
for (const auto & cur_path : paths)
checkCreationIsAllowed(context, user_files_absolute_path, cur_path);
return paths;
}
StorageFile::StorageFile(int table_fd_, CommonArguments args)
: StorageFile(args)
{
if (args.context.getApplicationType() == Context::ApplicationType::SERVER)
throw Exception("Using file descriptor as source of storage isn't allowed for server daemons", ErrorCodes::DATABASE_ACCESS_DENIED);
if (args.format_name == "Distributed")
throw Exception("Distributed format is allowed only with explicit file path", ErrorCodes::INCORRECT_FILE_NAME);
is_db_table = false;
use_table_fd = true;
......@@ -144,32 +167,22 @@ StorageFile::StorageFile(const std::string & table_path_, const std::string & us
: StorageFile(args)
{
is_db_table = false;
std::string user_files_absolute_path = Poco::Path(user_files_path).makeAbsolute().makeDirectory().toString();
Poco::Path poco_path = Poco::Path(table_path_);
if (poco_path.isRelative())
poco_path = Poco::Path(user_files_absolute_path, poco_path);
const std::string path = poco_path.absolute().toString();
if (path.find_first_of("*?{") == std::string::npos)
paths.push_back(path);
else
paths = listFilesWithRegexpMatching("/", path);
for (const auto & cur_path : paths)
checkCreationIsAllowed(args.context, user_files_absolute_path, cur_path);
paths = getPathsList(table_path_, user_files_path, args.context);
if (args.format_name == "Distributed")
{
if (!paths.empty())
{
auto & first_path = paths[0];
Block header = StorageDistributedDirectoryMonitor::createStreamFromFile(first_path)->getHeader();
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(ColumnsDescription(header.getNamesAndTypesList()));
setInMemoryMetadata(storage_metadata);
}
if (paths.empty())
throw Exception("Cannot get table structure from file, because no files match specified name", ErrorCodes::INCORRECT_FILE_NAME);
auto & first_path = paths[0];
Block header = StorageDistributedDirectoryMonitor::createStreamFromFile(first_path)->getHeader();
StorageInMemoryMetadata storage_metadata;
auto columns = ColumnsDescription(header.getNamesAndTypesList());
if (!args.columns.empty() && columns != args.columns)
throw Exception("Table structure and file structure are different", ErrorCodes::INCOMPATIBLE_COLUMNS);
storage_metadata.setColumns(columns);
setInMemoryMetadata(storage_metadata);
}
}
......@@ -178,6 +191,8 @@ StorageFile::StorageFile(const std::string & relative_table_dir_path, CommonArgu
{
if (relative_table_dir_path.empty())
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
if (args.format_name == "Distributed")
throw Exception("Distributed format is allowed only with explicit file path", ErrorCodes::INCORRECT_FILE_NAME);
String table_dir_path = base_path + relative_table_dir_path + "/";
Poco::File(table_dir_path).createDirectories();
......
......@@ -60,6 +60,8 @@ public:
NamesAndTypesList getVirtuals() const override;
static Strings getPathsList(const String & table_path, const String & user_files_path, const Context & context);
protected:
friend class StorageFileSource;
friend class StorageFileBlockOutputStream;
......
......@@ -23,7 +23,7 @@ namespace DB
{
StorageMaterializeMySQL::StorageMaterializeMySQL(const StoragePtr & nested_storage_, const DatabaseMaterializeMySQL * database_)
: IStorage(nested_storage_->getStorageID()), nested_storage(nested_storage_), database(database_)
: StorageProxy(nested_storage_->getStorageID()), nested_storage(nested_storage_), database(database_)
{
auto nested_memory_metadata = nested_storage->getInMemoryMetadata();
StorageInMemoryMetadata in_memory_metadata;
......
......@@ -4,31 +4,41 @@
#if USE_MYSQL
#include <Storages/IStorage.h>
#include <Storages/StorageProxy.h>
#include <Databases/MySQL/DatabaseMaterializeMySQL.h>
namespace DB
{
class StorageMaterializeMySQL final : public ext::shared_ptr_helper<StorageMaterializeMySQL>, public IStorage
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
class StorageMaterializeMySQL final : public ext::shared_ptr_helper<StorageMaterializeMySQL>, public StorageProxy
{
friend struct ext::shared_ptr_helper<StorageMaterializeMySQL>;
public:
String getName() const override { return "MaterializeMySQL"; }
bool supportsFinal() const override { return nested_storage->supportsFinal(); }
bool supportsSampling() const override { return nested_storage->supportsSampling(); }
StorageMaterializeMySQL(const StoragePtr & nested_storage_, const DatabaseMaterializeMySQL * database_);
Pipe read(
const Names & column_names, const StorageMetadataPtr & metadata_snapshot, const SelectQueryInfo & query_info,
const Context & context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, unsigned num_streams) override;
BlockOutputStreamPtr write(const ASTPtr &, const StorageMetadataPtr &, const Context &) override { throwNotAllowed(); }
NamesAndTypesList getVirtuals() const override;
ColumnSizeByName getColumnSizes() const override;
private:
StoragePtr getNested() const override { return nested_storage; }
[[noreturn]] void throwNotAllowed() const
{
throw Exception("This method is not allowed for MaterializeMySQ", ErrorCodes::NOT_IMPLEMENTED);
}
StoragePtr nested_storage;
const DatabaseMaterializeMySQL * database;
};
......
......@@ -9,6 +9,7 @@
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
......@@ -42,23 +43,6 @@ namespace ErrorCodes
namespace
{
/// Rewrite original query removing joined tables from it
bool removeJoin(ASTSelectQuery & select)
{
const auto & tables = select.tables();
if (!tables || tables->children.size() < 2)
return false;
const auto & joined_table = tables->children[1]->as<ASTTablesInSelectQueryElement &>();
if (!joined_table.table_join)
return false;
/// The most simple temporary solution: leave only the first table in query.
/// TODO: we also need to remove joined columns and related functions (taking in account aliases if any).
tables->children.resize(1);
return true;
}
void modifySelect(ASTSelectQuery & select, const TreeRewriterResult & rewriter_result)
{
if (removeJoin(select))
......@@ -83,7 +67,6 @@ void modifySelect(ASTSelectQuery & select, const TreeRewriterResult & rewriter_r
}
StorageMerge::StorageMerge(
const StorageID & table_id_,
const ColumnsDescription & columns_,
......@@ -203,7 +186,7 @@ Pipe StorageMerge::read(
modified_context->setSetting("optimize_move_to_prewhere", false);
/// What will be result structure depending on query processed stage in source tables?
Block header = getQueryHeader(column_names, metadata_snapshot, query_info, context, processed_stage);
Block header = getHeaderForProcessingStage(*this, column_names, metadata_snapshot, query_info, context, processed_stage);
/** First we make list of selected tables to find out its size.
* This is necessary to correctly pass the recommended number of threads to each table.
......@@ -456,42 +439,6 @@ void StorageMerge::alter(
setInMemoryMetadata(storage_metadata);
}
Block StorageMerge::getQueryHeader(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage)
{
switch (processed_stage)
{
case QueryProcessingStage::FetchColumns:
{
Block header = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
if (query_info.prewhere_info)
{
query_info.prewhere_info->prewhere_actions->execute(header);
if (query_info.prewhere_info->remove_prewhere_column)
header.erase(query_info.prewhere_info->prewhere_column_name);
}
return header;
}
case QueryProcessingStage::WithMergeableState:
case QueryProcessingStage::Complete:
case QueryProcessingStage::WithMergeableStateAfterAggregation:
case QueryProcessingStage::MAX:
{
auto query = query_info.query->clone();
removeJoin(*query->as<ASTSelectQuery>());
auto stream = std::make_shared<OneBlockInputStream>(
metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()));
return InterpreterSelectQuery(query, context, stream, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
}
}
throw Exception("Logical Error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR);
}
void StorageMerge::convertingSourceStream(
const Block & header,
const StorageMetadataPtr & metadata_snapshot,
......
......@@ -76,13 +76,6 @@ protected:
const String & table_name_regexp_,
const Context & context_);
Block getQueryHeader(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage);
Pipe createSources(
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
......
#pragma once
#include <Storages/IStorage.h>
#include <Processors/Pipe.h>
namespace DB
{
class StorageProxy : public IStorage
{
public:
StorageProxy(const StorageID & table_id_) : IStorage(table_id_) {}
virtual StoragePtr getNested() const = 0;
String getName() const override { return "StorageProxy"; }
bool isRemote() const override { return getNested()->isRemote(); }
bool isView() const override { return getNested()->isView(); }
bool supportsSampling() const override { return getNested()->supportsSampling(); }
bool supportsFinal() const override { return getNested()->supportsFinal(); }
bool supportsPrewhere() const override { return getNested()->supportsPrewhere(); }
bool supportsReplication() const override { return getNested()->supportsReplication(); }
bool supportsParallelInsert() const override { return getNested()->supportsParallelInsert(); }
bool supportsDeduplication() const override { return getNested()->supportsDeduplication(); }
bool supportsSettings() const override { return getNested()->supportsSettings(); }
bool noPushingToViews() const override { return getNested()->noPushingToViews(); }
bool hasEvenlyDistributedRead() const override { return getNested()->hasEvenlyDistributedRead(); }
ColumnSizeByName getColumnSizes() const override { return getNested()->getColumnSizes(); }
NamesAndTypesList getVirtuals() const override { return getNested()->getVirtuals(); }
QueryProcessingStage::Enum getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum to_stage, const ASTPtr & ast) const override
{
return getNested()->getQueryProcessingStage(context, to_stage, ast);
}
BlockInputStreams watch(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override
{
return getNested()->watch(column_names, query_info, context, processed_stage, max_block_size, num_streams);
}
Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override
{
return getNested()->read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
}
BlockOutputStreamPtr write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const Context & context) override
{
return getNested()->write(query, metadata_snapshot, context);
}
void drop() override { getNested()->drop(); }
void truncate(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const Context & context,
TableExclusiveLockHolder & lock) override
{
getNested()->truncate(query, metadata_snapshot, context, lock);
}
void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override
{
getNested()->rename(new_path_to_table_data, new_table_id);
IStorage::renameInMemory(new_table_id);
}
void renameInMemory(const StorageID & new_table_id) override
{
getNested()->renameInMemory(new_table_id);
IStorage::renameInMemory(new_table_id);
}
void alter(const AlterCommands & params, const Context & context, TableLockHolder & alter_lock_holder) override
{
getNested()->alter(params, context, alter_lock_holder);
IStorage::setInMemoryMetadata(getNested()->getInMemoryMetadata());
}
void checkAlterIsPossible(const AlterCommands & commands, const Settings & settings) const override
{
getNested()->checkAlterIsPossible(commands, settings);
}
Pipe alterPartition(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const PartitionCommands & commands,
const Context & context) override
{
return getNested()->alterPartition(query, metadata_snapshot, commands, context);
}
void checkAlterPartitionIsPossible(const PartitionCommands & commands, const StorageMetadataPtr & metadata_snapshot, const Settings & settings) const override
{
getNested()->checkAlterPartitionIsPossible(commands, metadata_snapshot, settings);
}
bool optimize(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const ASTPtr & partition,
bool final,
bool deduplicate,
const Context & context) override
{
return getNested()->optimize(query, metadata_snapshot, partition, final, deduplicate, context);
}
void mutate(const MutationCommands & commands, const Context & context) override { getNested()->mutate(commands, context); }
CancellationCode killMutation(const String & mutation_id) override { return getNested()->killMutation(mutation_id); }
void startup() override { getNested()->startup(); }
void shutdown() override { getNested()->shutdown(); }
ActionLock getActionLock(StorageActionBlockType action_type) override { return getNested()->getActionLock(action_type); }
bool supportsIndexForIn() const override { return getNested()->supportsIndexForIn(); }
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context, const StorageMetadataPtr & metadata_snapshot) const override
{
return getNested()->mayBenefitFromIndexForIn(left_in_operand, query_context, metadata_snapshot);
}
CheckResults checkData(const ASTPtr & query , const Context & context) override { return getNested()->checkData(query, context); }
void checkTableCanBeDropped() const override { getNested()->checkTableCanBeDropped(); }
void checkPartitionCanBeDropped(const ASTPtr & partition) override { getNested()->checkPartitionCanBeDropped(partition); }
Strings getDataPaths() const override { return getNested()->getDataPaths(); }
StoragePolicyPtr getStoragePolicy() const override { return getNested()->getStoragePolicy(); }
std::optional<UInt64> totalRows() const override { return getNested()->totalRows(); }
std::optional<UInt64> totalBytes() const override { return getNested()->totalBytes(); }
std::optional<UInt64> lifetimeRows() const override { return getNested()->lifetimeRows(); }
std::optional<UInt64> lifetimeBytes() const override { return getNested()->lifetimeBytes(); }
};
}
#pragma once
#include <Storages/IStorage.h>
#include <TableFunctions/ITableFunction.h>
#include <Processors/Pipe.h>
#include <Storages/StorageProxy.h>
#include <Common/CurrentThread.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Interpreters/getHeaderForProcessingStage.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCOMPATIBLE_COLUMNS;
}
using GetNestedStorageFunc = std::function<StoragePtr()>;
/// Lazily creates underlying storage.
/// Adds ConversionTransform in case of structure mismatch.
class StorageTableFunctionProxy final : public StorageProxy
{
public:
StorageTableFunctionProxy(const StorageID & table_id_, GetNestedStorageFunc get_nested_,
ColumnsDescription cached_columns, bool add_conversion_ = true)
: StorageProxy(table_id_), get_nested(std::move(get_nested_)), add_conversion(add_conversion_)
{
StorageInMemoryMetadata cached_metadata;
cached_metadata.setColumns(std::move(cached_columns));
setInMemoryMetadata(cached_metadata);
}
StoragePtr getNested() const override
{
std::lock_guard lock{nested_mutex};
if (nested)
return nested;
auto nested_storage = get_nested();
nested_storage->startup();
nested_storage->renameInMemory(getStorageID());
nested = nested_storage;
get_nested = {};
return nested;
}
String getName() const override
{
std::lock_guard lock{nested_mutex};
if (nested)
return nested->getName();
return StorageProxy::getName();
}
void startup() override { }
void shutdown() override
{
std::lock_guard lock{nested_mutex};
if (nested)
nested->shutdown();
}
void drop() override
{
std::lock_guard lock{nested_mutex};
if (nested)
nested->drop();
}
Pipe read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned num_streams) override
{
String cnames;
for (const auto & c : column_names)
cnames += c + " ";
auto storage = getNested();
auto nested_metadata = storage->getInMemoryMetadataPtr();
auto pipe = storage->read(column_names, nested_metadata, query_info, context,
processed_stage, max_block_size, num_streams);
if (!pipe.empty() && add_conversion)
{
auto to_header = getHeaderForProcessingStage(*this, column_names, metadata_snapshot,
query_info, context, processed_stage);
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(
header,
to_header,
ConvertingTransform::MatchColumnsMode::Name);
});
}
return pipe;
}
BlockOutputStreamPtr write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
const Context & context) override
{
auto storage = getNested();
auto cached_structure = metadata_snapshot->getSampleBlock();
auto actual_structure = storage->getInMemoryMetadataPtr()->getSampleBlock();
if (!blocksHaveEqualStructure(actual_structure, cached_structure) && add_conversion)
{
throw Exception("Source storage and table function have different structure", ErrorCodes::INCOMPATIBLE_COLUMNS);
}
return storage->write(query, metadata_snapshot, context);
}
void renameInMemory(const StorageID & new_table_id) override
{
std::lock_guard lock{nested_mutex};
if (nested)
StorageProxy::renameInMemory(new_table_id);
else
IStorage::renameInMemory(new_table_id);
}
bool isView() const override { return false; }
void checkTableCanBeDropped() const override {}
private:
mutable std::mutex nested_mutex;
mutable GetNestedStorageFunc get_nested;
mutable StoragePtr nested;
const bool add_conversion;
};
}
......@@ -34,6 +34,7 @@ public:
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
std::string getName() const override;
private:
BridgeHelperPtr bridge_helper;
......@@ -61,8 +62,6 @@ private:
size_t max_block_size) const override;
Block getHeaderBlock(const Names & column_names, const StorageMetadataPtr & metadata_snapshot) const override;
std::string getName() const override;
};
}
......@@ -38,10 +38,8 @@ ColumnsDescription getStructureOfRemoteTableInShard(
{
if (shard_info.isLocal())
{
const auto * table_function = table_func_ptr->as<ASTFunction>();
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context);
auto storage_ptr = table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName());
return storage_ptr->getInMemoryMetadataPtr()->getColumns();
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_func_ptr, context);
return table_function_ptr->getActualTableStructure(context);
}
auto table_func_name = queryToString(table_func_ptr);
......
......@@ -6,3 +6,4 @@ list(REMOVE_ITEM clickhouse_table_functions_headers ITableFunction.h TableFuncti
add_library(clickhouse_table_functions ${clickhouse_table_functions_sources})
target_link_libraries(clickhouse_table_functions PRIVATE clickhouse_parsers clickhouse_storages_system dbms)
#include <TableFunctions/ITableFunction.h>
#include <Interpreters/Context.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageTableFunction.h>
#include <Access/AccessFlags.h>
#include <Common/ProfileEvents.h>
......@@ -13,11 +14,23 @@ namespace ProfileEvents
namespace DB
{
StoragePtr ITableFunction::execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
StoragePtr ITableFunction::execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name,
ColumnsDescription cached_columns) const
{
ProfileEvents::increment(ProfileEvents::TableFunctionExecute);
context.checkAccess(AccessType::CREATE_TEMPORARY_TABLE | StorageFactory::instance().getSourceAccessType(getStorageTypeName()));
return executeImpl(ast_function, context, table_name);
if (cached_columns.empty() || (hasStaticStructure() && cached_columns == getActualTableStructure(context)))
return executeImpl(ast_function, context, table_name, std::move(cached_columns));
auto get_storage = [=, tf = shared_from_this()]() -> StoragePtr
{
return tf->executeImpl(ast_function, context, table_name, cached_columns);
};
/// It will request actual table structure and create underlying storage lazily
return std::make_shared<StorageTableFunctionProxy>(StorageID(getDatabaseName(), table_name), std::move(get_storage),
std::move(cached_columns), needStructureConversion());
}
}
......@@ -2,6 +2,7 @@
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/ColumnsDescription.h>
#include <memory>
#include <string>
......@@ -21,9 +22,18 @@ class Context;
* Example:
* SELECT count() FROM remote('example01-01-1', merge, hits)
* - go to `example01-01-1`, in `merge` database, `hits` table.
*
*
* When creating table AS table_function(...) we probably don't know structure of the table
* and have to request if from remote server, because structure is required to create a Storage.
* To avoid failures on server startup, we write obtained structure to metadata file.
* So, table function may have two different columns lists:
* - cached_columns written to metadata
* - the list returned from getActualTableStructure(...)
* See StorageTableFunctionProxy.
*/
class ITableFunction
class ITableFunction : public std::enable_shared_from_this<ITableFunction>
{
public:
static inline std::string getDatabaseName() { return "_table_function"; }
......@@ -31,13 +41,24 @@ public:
/// Get the main function name.
virtual std::string getName() const = 0;
/// Returns true if we always know table structure when executing table function
/// (e.g. structure is specified in table function arguments)
virtual bool hasStaticStructure() const { return false; }
/// Returns false if storage returned by table function supports type conversion (e.g. StorageDistributed)
virtual bool needStructureConversion() const { return true; }
virtual void parseArguments(const ASTPtr & /*ast_function*/, const Context & /*context*/) {}
/// Returns actual table structure probably requested from remote server, may fail
virtual ColumnsDescription getActualTableStructure(const Context & /*context*/) const = 0;
/// Create storage according to the query.
StoragePtr execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const;
StoragePtr execute(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns_ = {}) const;
virtual ~ITableFunction() {}
private:
virtual StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const = 0;
virtual StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const = 0;
virtual const char * getStorageTypeName() const = 0;
};
......
......@@ -9,11 +9,12 @@
#include <Common/typeid_cast.h>
#include <Storages/StorageFile.h>
#include <Storages/Distributed/DirectoryMonitor.h>
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
namespace DB
{
......@@ -21,9 +22,10 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int INCORRECT_FILE_NAME;
}
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
void ITableFunctionFileLike::parseArguments(const ASTPtr & ast_function, const Context & context)
{
/// Parse args
ASTs & args_func = ast_function->children;
......@@ -39,8 +41,8 @@ StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & ast_function, cons
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
std::string filename = args[0]->as<ASTLiteral &>().value.safeGet<String>();
std::string format = args[1]->as<ASTLiteral &>().value.safeGet<String>();
filename = args[0]->as<ASTLiteral &>().value.safeGet<String>();
format = args[1]->as<ASTLiteral &>().value.safeGet<String>();
if (args.size() == 2 && getName() == "file")
{
......@@ -51,24 +53,33 @@ StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & ast_function, cons
throw Exception("Table function '" + getName() + "' requires 3 or 4 arguments: filename, format, structure and compression method (default auto).",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ColumnsDescription columns;
std::string compression_method = "auto";
if (args.size() > 2)
{
auto structure = args[2]->as<ASTLiteral &>().value.safeGet<String>();
columns = parseColumnsListFromString(structure, context);
}
structure = args[2]->as<ASTLiteral &>().value.safeGet<String>();
if (args.size() == 4)
compression_method = args[3]->as<ASTLiteral &>().value.safeGet<String>();
}
/// Create table
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
auto columns = getActualTableStructure(context);
StoragePtr storage = getStorage(filename, format, columns, const_cast<Context &>(context), table_name, compression_method);
storage->startup();
return storage;
}
ColumnsDescription ITableFunctionFileLike::getActualTableStructure(const Context & context) const
{
if (structure.empty())
{
assert(getName() == "file" && format == "Distributed");
Strings paths = StorageFile::getPathsList(filename, context.getUserFilesPath(), context);
if (paths.empty())
throw Exception("Cannot get table structure from file, because no files match specified name", ErrorCodes::INCORRECT_FILE_NAME);
auto read_stream = StorageDistributedDirectoryMonitor::createStreamFromFile(paths[0]);
return ColumnsDescription{read_stream->getHeader().getNamesAndTypesList()};
}
return parseColumnsListFromString(structure, context);
}
}
......@@ -13,8 +13,21 @@ class Context;
class ITableFunctionFileLike : public ITableFunction
{
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
virtual StoragePtr getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const String & compression_method) const = 0;
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const String & compression_method) const = 0;
ColumnsDescription getActualTableStructure(const Context & context) const override;
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
bool hasStaticStructure() const override { return true; }
String filename;
String format;
String structure;
String compression_method = "auto";
};
}
......@@ -24,11 +24,10 @@ namespace DB
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int UNKNOWN_EXCEPTION;
extern const int LOGICAL_ERROR;
}
StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
void ITableFunctionXDBC::parseArguments(const ASTPtr & ast_function, const Context & context)
{
const auto & args_func = ast_function->as<ASTFunction &>();
......@@ -44,10 +43,6 @@ StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Co
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
std::string connection_string;
std::string schema_name;
std::string remote_table_name;
if (args.size() == 3)
{
connection_string = args[0]->as<ASTLiteral &>().value.safeGet<String>();
......@@ -60,11 +55,16 @@ StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Co
remote_table_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
}
/* Infer external table structure */
/// Have to const_cast, because bridges store their commands inside context
BridgeHelperPtr helper = createBridgeHelper(const_cast<Context &>(context), context.getSettingsRef().http_receive_timeout.value, connection_string);
helper = createBridgeHelper(const_cast<Context &>(context), context.getSettingsRef().http_receive_timeout.value, connection_string);
helper->startBridgeSync();
}
ColumnsDescription ITableFunctionXDBC::getActualTableStructure(const Context & context) const
{
assert(helper);
/* Infer external table structure */
Poco::URI columns_info_uri = helper->getColumnsInfoURI();
columns_info_uri.addQueryParameter("connection_string", connection_string);
if (!schema_name.empty())
......@@ -73,7 +73,7 @@ StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Co
const auto use_nulls = context.getSettingsRef().external_table_functions_use_nulls;
columns_info_uri.addQueryParameter("external_table_functions_use_nulls",
Poco::NumberFormatter::format(use_nulls));
Poco::NumberFormatter::format(use_nulls));
ReadWriteBufferFromHTTP buf(columns_info_uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context));
......@@ -81,11 +81,14 @@ StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & ast_function, const Co
readStringBinary(columns_info, buf);
NamesAndTypesList columns = NamesAndTypesList::parse(columns_info);
auto result = std::make_shared<StorageXDBC>(StorageID(getDatabaseName(), table_name), schema_name, remote_table_name, ColumnsDescription{columns}, context, helper);
if (!result)
throw Exception("Failed to instantiate storage from table function " + getName(), ErrorCodes::UNKNOWN_EXCEPTION);
return ColumnsDescription{columns};
}
StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
assert(helper);
auto columns = getActualTableStructure(context);
auto result = std::make_shared<StorageXDBC>(StorageID(getDatabaseName(), table_name), schema_name, remote_table_name, columns, context, helper);
result->startup();
return result;
}
......
......@@ -18,12 +18,21 @@ namespace DB
class ITableFunctionXDBC : public ITableFunction
{
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
/* A factory method to create bridge helper, that will assist in remote interaction */
virtual BridgeHelperPtr createBridgeHelper(Context & context,
const Poco::Timespan & http_timeout_,
const std::string & connection_string_) const = 0;
ColumnsDescription getActualTableStructure(const Context & context) const override;
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
String connection_string;
String schema_name;
String remote_table_name;
BridgeHelperPtr helper;
};
class TableFunctionJDBC : public ITableFunctionXDBC
......
......@@ -3,6 +3,7 @@
#include <Interpreters/Context.h>
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>
#include <Parsers/ASTFunction.h>
namespace DB
......@@ -28,19 +29,21 @@ void TableFunctionFactory::registerFunction(const std::string & name, Value crea
}
TableFunctionPtr TableFunctionFactory::get(
const std::string & name,
const ASTPtr & ast_function,
const Context & context) const
{
auto res = tryGet(name, context);
const auto * table_function = ast_function->as<ASTFunction>();
auto res = tryGet(table_function->name, context);
if (!res)
{
auto hints = getHints(name);
auto hints = getHints(table_function->name);
if (!hints.empty())
throw Exception("Unknown table function " + name + ". Maybe you meant: " + toString(hints), ErrorCodes::UNKNOWN_FUNCTION);
throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "Unknown table function {}. Maybe you meant: {}", table_function->name , toString(hints));
else
throw Exception("Unknown table function " + name, ErrorCodes::UNKNOWN_FUNCTION);
throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "Unknown table function {}", table_function->name);
}
res->parseArguments(ast_function, context);
return res;
}
......
......@@ -41,7 +41,7 @@ public:
}
/// Throws an exception if not found.
TableFunctionPtr get(const std::string & name, const Context & context) const;
TableFunctionPtr get(const ASTPtr & ast_function, const Context & context) const;
/// Returns nullptr if not found.
TableFunctionPtr tryGet(const std::string & name, const Context & context) const;
......
......@@ -9,9 +9,10 @@
namespace DB
{
StoragePtr TableFunctionFile::getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const std::string & compression_method) const
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const std::string & compression_method_) const
{
StorageFile::CommonArguments args{StorageID(getDatabaseName(), table_name), format, compression_method, columns, ConstraintsDescription{}, global_context};
StorageFile::CommonArguments args{StorageID(getDatabaseName(), table_name), format_, compression_method_, columns, ConstraintsDescription{}, global_context};
return StorageFile::create(source, global_context.getUserFilesPath(), args);
}
......
......@@ -23,6 +23,7 @@ public:
private:
StoragePtr getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const std::string & compression_method) const override;
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const std::string & compression_method_) const override;
const char * getStorageTypeName() const override { return "File"; }
};}
......@@ -26,7 +26,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
StoragePtr TableFunctionGenerateRandom::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
void TableFunctionGenerateRandom::parseArguments(const ASTPtr & ast_function, const Context & /*context*/)
{
ASTs & args_func = ast_function->children;
......@@ -58,11 +58,7 @@ StoragePtr TableFunctionGenerateRandom::executeImpl(const ASTPtr & ast_function,
}
/// Parsing first argument as table structure and creating a sample block
std::string structure = args[0]->as<const ASTLiteral &>().value.safeGet<String>();
UInt64 max_string_length = 10;
UInt64 max_array_length = 10;
std::optional<UInt64> random_seed;
structure = args[0]->as<const ASTLiteral &>().value.safeGet<String>();
if (args.size() >= 2)
{
......@@ -76,10 +72,16 @@ StoragePtr TableFunctionGenerateRandom::executeImpl(const ASTPtr & ast_function,
if (args.size() == 4)
max_array_length = args[3]->as<const ASTLiteral &>().value.safeGet<UInt64>();
}
ColumnsDescription TableFunctionGenerateRandom::getActualTableStructure(const Context & context) const
{
return parseColumnsListFromString(structure, context);
}
ColumnsDescription columns = parseColumnsListFromString(structure, context);
StoragePtr TableFunctionGenerateRandom::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
auto columns = getActualTableStructure(context);
auto res = StorageGenerateRandom::create(StorageID(getDatabaseName(), table_name), columns, max_array_length, max_string_length, random_seed);
res->startup();
return res;
......
......@@ -13,9 +13,19 @@ class TableFunctionGenerateRandom : public ITableFunction
public:
static constexpr auto name = "generateRandom";
std::string getName() const override { return name; }
bool hasStaticStructure() const override { return true; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "GenerateRandom"; }
ColumnsDescription getActualTableStructure(const Context & context) const override;
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
String structure;
UInt64 max_string_length = 10;
UInt64 max_array_length = 10;
std::optional<UInt64> random_seed;
};
......
......@@ -10,15 +10,17 @@
namespace DB
{
StoragePtr TableFunctionHDFS::getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const String & compression_method) const
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const String & compression_method_) const
{
return StorageHDFS::create(source,
return StorageHDFS::create(
source,
StorageID(getDatabaseName(), table_name),
format,
format_,
columns,
ConstraintsDescription{},
global_context,
compression_method);
compression_method_);
}
......
......@@ -26,7 +26,8 @@ public:
private:
StoragePtr getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const String & compression_method) const override;
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const String & compression_method_) const override;
const char * getStorageTypeName() const override { return "HDFS"; }
};
......
......@@ -22,7 +22,7 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
StoragePtr TableFunctionInput::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
void TableFunctionInput::parseArguments(const ASTPtr & ast_function, const Context & context)
{
const auto * function = ast_function->as<ASTFunction>();
......@@ -35,12 +35,18 @@ StoragePtr TableFunctionInput::executeImpl(const ASTPtr & ast_function, const Co
throw Exception("Table function '" + getName() + "' requires exactly 1 argument: structure",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String structure = evaluateConstantExpressionOrIdentifierAsLiteral(args[0], context)->as<ASTLiteral &>().value.safeGet<String>();
auto columns = parseColumnsListFromString(structure, context);
StoragePtr storage = StorageInput::create(StorageID(getDatabaseName(), table_name), columns);
structure = evaluateConstantExpressionOrIdentifierAsLiteral(args[0], context)->as<ASTLiteral &>().value.safeGet<String>();
}
storage->startup();
ColumnsDescription TableFunctionInput::getActualTableStructure(const Context & context) const
{
return parseColumnsListFromString(structure, context);
}
StoragePtr TableFunctionInput::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
auto storage = StorageInput::create(StorageID(getDatabaseName(), table_name), getActualTableStructure(context));
storage->startup();
return storage;
}
......
......@@ -15,10 +15,16 @@ class TableFunctionInput : public ITableFunction
public:
static constexpr auto name = "input";
std::string getName() const override { return name; }
bool hasStaticStructure() const override { return true; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "Input"; }
ColumnsDescription getActualTableStructure(const Context & context) const override;
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
String structure;
};
}
......@@ -45,8 +45,7 @@ static NamesAndTypesList chooseColumns(const String & source_database, const Str
return any_table->getInMemoryMetadataPtr()->getColumns().getAllPhysical();
}
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
void TableFunctionMerge::parseArguments(const ASTPtr & ast_function, const Context & context)
{
ASTs & args_func = ast_function->children;
......@@ -65,15 +64,24 @@ StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & ast_function, const Co
args[0] = evaluateConstantExpressionForDatabaseName(args[0], context);
args[1] = evaluateConstantExpressionAsLiteral(args[1], context);
String source_database = args[0]->as<ASTLiteral &>().value.safeGet<String>();
String table_name_regexp = args[1]->as<ASTLiteral &>().value.safeGet<String>();
source_database = args[0]->as<ASTLiteral &>().value.safeGet<String>();
table_name_regexp = args[1]->as<ASTLiteral &>().value.safeGet<String>();
}
ColumnsDescription TableFunctionMerge::getActualTableStructure(const Context & context) const
{
return ColumnsDescription{chooseColumns(source_database, table_name_regexp, context)};
}
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
auto res = StorageMerge::create(
StorageID(getDatabaseName(), table_name),
ColumnsDescription{chooseColumns(source_database, table_name_regexp, context)},
getActualTableStructure(context),
source_database,
table_name_regexp,
context);
res->startup();
return res;
}
......
......@@ -16,8 +16,14 @@ public:
static constexpr auto name = "merge";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "Merge"; }
ColumnsDescription getActualTableStructure(const Context & context) const override;
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
String source_database;
String table_name_regexp;
};
......
......@@ -24,8 +24,6 @@
# include <Databases/MySQL/DatabaseConnectionMySQL.h> // for fetchTablesColumnsList
# include <mysqlxx/Pool.h>
namespace DB
{
......@@ -38,8 +36,7 @@ namespace ErrorCodes
extern const int UNKNOWN_TABLE;
}
StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
void TableFunctionMySQL::parseArguments(const ASTPtr & ast_function, const Context & context)
{
const auto & args_func = ast_function->as<ASTFunction &>();
......@@ -55,14 +52,12 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
std::string host_port = args[0]->as<ASTLiteral &>().value.safeGet<String>();
std::string remote_database_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
std::string remote_table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
std::string user_name = args[3]->as<ASTLiteral &>().value.safeGet<String>();
std::string password = args[4]->as<ASTLiteral &>().value.safeGet<String>();
String host_port = args[0]->as<ASTLiteral &>().value.safeGet<String>();
remote_database_name = args[1]->as<ASTLiteral &>().value.safeGet<String>();
remote_table_name = args[2]->as<ASTLiteral &>().value.safeGet<String>();
user_name = args[3]->as<ASTLiteral &>().value.safeGet<String>();
password = args[4]->as<ASTLiteral &>().value.safeGet<String>();
bool replace_query = false;
std::string on_duplicate_clause;
if (args.size() >= 6)
replace_query = args[5]->as<ASTLiteral &>().value.safeGet<UInt64>() > 0;
if (args.size() == 7)
......@@ -74,27 +69,46 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co
ErrorCodes::BAD_ARGUMENTS);
/// 3306 is the default MySQL port number
auto parsed_host_port = parseAddress(host_port, 3306);
parsed_host_port = parseAddress(host_port, 3306);
}
ColumnsDescription TableFunctionMySQL::getActualTableStructure(const Context & context) const
{
assert(!parsed_host_port.first.empty());
if (!pool)
pool.emplace(remote_database_name, parsed_host_port.first, user_name, password, parsed_host_port.second);
mysqlxx::Pool pool(remote_database_name, parsed_host_port.first, user_name, password, parsed_host_port.second);
const auto & settings = context.getSettingsRef();
const auto tables_and_columns = fetchTablesColumnsList(pool, remote_database_name, {remote_table_name}, settings.external_table_functions_use_nulls, settings.mysql_datatypes_support_level);
const auto tables_and_columns = fetchTablesColumnsList(*pool, remote_database_name, {remote_table_name}, settings.external_table_functions_use_nulls, settings.mysql_datatypes_support_level);
const auto columns = tables_and_columns.find(remote_table_name);
if (columns == tables_and_columns.end())
throw Exception("MySQL table " + backQuoteIfNeed(remote_database_name) + "." + backQuoteIfNeed(remote_table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
return ColumnsDescription{columns->second};
}
StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
assert(!parsed_host_port.first.empty());
if (!pool)
pool.emplace(remote_database_name, parsed_host_port.first, user_name, password, parsed_host_port.second);
auto columns = getActualTableStructure(context);
auto res = StorageMySQL::create(
StorageID(getDatabaseName(), table_name),
std::move(pool),
std::move(*pool),
remote_database_name,
remote_table_name,
replace_query,
on_duplicate_clause,
ColumnsDescription{columns->second},
columns,
ConstraintsDescription{},
context);
pool.reset();
res->startup();
return res;
}
......
#pragma once
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_MYSQL
#include <TableFunctions/ITableFunction.h>
#include <mysqlxx/Pool.h>
namespace DB
......@@ -19,8 +24,23 @@ public:
return name;
}
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "MySQL"; }
ColumnsDescription getActualTableStructure(const Context & context) const override;
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
std::pair<std::string, UInt16> parsed_host_port;
String remote_database_name;
String remote_table_name;
String user_name;
String password;
bool replace_query = false;
String on_duplicate_clause;
mutable std::optional<mysqlxx::Pool> pool;
};
}
#endif
......@@ -17,23 +17,30 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
StoragePtr TableFunctionNull::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
void TableFunctionNull::parseArguments(const ASTPtr & ast_function, const Context & context)
{
if (const auto * function = ast_function->as<ASTFunction>())
{
auto arguments = function->arguments->children;
const auto * function = ast_function->as<ASTFunction>();
if (!function || !function->arguments)
throw Exception("Table function '" + getName() + "' requires 'structure'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (arguments.size() != 1)
throw Exception("Table function '" + getName() + "' requires 'structure'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const auto & arguments = function->arguments->children;
if (arguments.size() != 1)
throw Exception("Table function '" + getName() + "' requires 'structure'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
auto structure = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[0], context)->as<ASTLiteral>()->value.safeGet<String>();
ColumnsDescription columns = parseColumnsListFromString(structure, context);
structure = evaluateConstantExpressionOrIdentifierAsLiteral(arguments[0], context)->as<ASTLiteral>()->value.safeGet<String>();
}
auto res = StorageNull::create(StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription());
res->startup();
return res;
}
throw Exception("Table function '" + getName() + "' requires 'structure'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ColumnsDescription TableFunctionNull::getActualTableStructure(const Context & context) const
{
return parseColumnsListFromString(structure, context);
}
StoragePtr TableFunctionNull::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
auto columns = getActualTableStructure(context);
auto res = StorageNull::create(StorageID(getDatabaseName(), table_name), columns, ConstraintsDescription());
res->startup();
return res;
}
void registerTableFunctionNull(TableFunctionFactory & factory)
......
......@@ -17,8 +17,13 @@ public:
static constexpr auto name = "null";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const String & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "Null"; }
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
ColumnsDescription getActualTableStructure(const Context & context) const override;
String structure;
};
}
......@@ -7,6 +7,7 @@
#include <Storages/System/StorageSystemNumbers.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include <DataTypes/DataTypesNumber.h>
#include "registerTableFunctions.h"
......@@ -18,8 +19,16 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
template <bool multithreaded>
ColumnsDescription TableFunctionNumbers<multithreaded>::getActualTableStructure(const Context & /*context*/) const
{
/// NOTE: https://bugs.llvm.org/show_bug.cgi?id=47418
return ColumnsDescription{{{"number", std::make_shared<DataTypeUInt64>()}}};
}
template <bool multithreaded>
StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
if (const auto * function = ast_function->as<ASTFunction>())
{
......@@ -28,7 +37,6 @@ StoragePtr TableFunctionNumbers<multithreaded>::executeImpl(const ASTPtr & ast_f
if (arguments.size() != 1 && arguments.size() != 2)
throw Exception("Table function '" + getName() + "' requires 'length' or 'offset, length'.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
UInt64 offset = arguments.size() == 2 ? evaluateArgument(context, arguments[0]) : 0;
UInt64 length = arguments.size() == 2 ? evaluateArgument(context, arguments[1]) : evaluateArgument(context, arguments[0]);
......
......@@ -17,11 +17,14 @@ class TableFunctionNumbers : public ITableFunction
public:
static constexpr auto name = multithreaded ? "numbers_mt" : "numbers";
std::string getName() const override { return name; }
bool hasStaticStructure() const override { return true; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "SystemNumbers"; }
UInt64 evaluateArgument(const Context & context, ASTPtr & argument) const;
ColumnsDescription getActualTableStructure(const Context & context) const override;
};
......
......@@ -27,7 +27,8 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
void TableFunctionRemote::parseArguments(const ASTPtr & ast_function, const Context & context)
{
ASTs & args_func = ast_function->children;
......@@ -44,7 +45,6 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
String cluster_description;
String remote_database;
String remote_table;
ASTPtr remote_table_function_ptr;
String username;
String password;
......@@ -136,7 +136,6 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
for (auto ast : args)
setIdentifierSpecial(ast);
ClusterPtr cluster;
if (!cluster_name.empty())
{
/// Use an existing cluster from the main config
......@@ -189,30 +188,54 @@ StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & ast_function, const C
if (!remote_table_function_ptr && remote_table.empty())
throw Exception("The name of remote table cannot be empty", ErrorCodes::BAD_ARGUMENTS);
auto remote_table_id = StorageID::createEmpty();
remote_table_id.database_name = remote_database;
remote_table_id.table_name = remote_table;
auto structure_remote_table = getStructureOfRemoteTable(*cluster, remote_table_id, context, remote_table_function_ptr);
}
StoragePtr TableFunctionRemote::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const
{
/// StorageDistributed supports mismatching structure of remote table, so we can use outdated structure for CREATE ... AS remote(...)
/// without additional conversion in StorageTableFunctionProxy
if (cached_columns.empty())
cached_columns = getActualTableStructure(context);
assert(cluster);
StoragePtr res = remote_table_function_ptr
? StorageDistributed::createWithOwnCluster(
? StorageDistributed::create(
StorageID(getDatabaseName(), table_name),
structure_remote_table,
cached_columns,
ConstraintsDescription{},
remote_table_function_ptr,
cluster,
context)
: StorageDistributed::createWithOwnCluster(
String{},
context,
ASTPtr{},
String{},
String{},
false,
cluster)
: StorageDistributed::create(
StorageID(getDatabaseName(), table_name),
structure_remote_table,
remote_database,
remote_table,
cluster,
context);
cached_columns,
ConstraintsDescription{},
remote_table_id.database_name,
remote_table_id.table_name,
String{},
context,
ASTPtr{},
String{},
String{},
false,
cluster);
res->startup();
return res;
}
ColumnsDescription TableFunctionRemote::getActualTableStructure(const Context & context) const
{
assert(cluster);
return getStructureOfRemoteTable(*cluster, remote_table_id, context, remote_table_function_ptr);
}
TableFunctionRemote::TableFunctionRemote(const std::string & name_, bool secure_)
: name{name_}, secure{secure_}
......
#pragma once
#include <TableFunctions/ITableFunction.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/StorageID.h>
namespace DB
......@@ -20,14 +22,24 @@ public:
std::string getName() const override { return name; }
ColumnsDescription getActualTableStructure(const Context & context) const override;
bool needStructureConversion() const override { return false; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "Distributed"; }
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
std::string name;
bool is_cluster_function;
std::string help_message;
bool secure;
ClusterPtr cluster;
StorageID remote_table_id = StorageID::createEmpty();
ASTPtr remote_table_function_ptr;
};
}
......@@ -21,7 +21,7 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
StoragePtr TableFunctionS3::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
void TableFunctionS3::parseArguments(const ASTPtr & ast_function, const Context & context)
{
/// Parse args
ASTs & args_func = ast_function->children;
......@@ -38,11 +38,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & ast_function, const Conte
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
String filename = args[0]->as<ASTLiteral &>().value.safeGet<String>();
String format;
String structure;
String access_key_id;
String secret_access_key;
filename = args[0]->as<ASTLiteral &>().value.safeGet<String>();
if (args.size() < 5)
{
......@@ -57,47 +53,38 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & ast_function, const Conte
structure = args[4]->as<ASTLiteral &>().value.safeGet<String>();
}
String compression_method;
if (args.size() == 4 || args.size() == 6)
compression_method = args.back()->as<ASTLiteral &>().value.safeGet<String>();
else
compression_method = "auto";
}
ColumnsDescription columns = parseColumnsListFromString(structure, context);
ColumnsDescription TableFunctionS3::getActualTableStructure(const Context & context) const
{
return parseColumnsListFromString(structure, context);
}
/// Create table
StoragePtr storage = getStorage(filename, access_key_id, secret_access_key, format, columns, const_cast<Context &>(context), table_name, compression_method);
StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
Poco::URI uri (filename);
S3::URI s3_uri (uri);
UInt64 min_upload_part_size = context.getSettingsRef().s3_min_upload_part_size;
StoragePtr storage = StorageS3::create(
s3_uri,
access_key_id,
secret_access_key,
StorageID(getDatabaseName(), table_name),
format,
min_upload_part_size,
getActualTableStructure(context),
ConstraintsDescription{},
const_cast<Context &>(context),
compression_method);
storage->startup();
return storage;
}
StoragePtr TableFunctionS3::getStorage(
const String & source,
const String & access_key_id,
const String & secret_access_key,
const String & format,
const ColumnsDescription & columns,
Context & global_context,
const std::string & table_name,
const String & compression_method)
{
Poco::URI uri (source);
S3::URI s3_uri (uri);
UInt64 min_upload_part_size = global_context.getSettingsRef().s3_min_upload_part_size;
return StorageS3::create(
s3_uri,
access_key_id,
secret_access_key,
StorageID(getDatabaseName(), table_name),
format,
min_upload_part_size,
columns,
ConstraintsDescription{},
global_context,
compression_method);
}
void registerTableFunctionS3(TableFunctionFactory & factory)
{
......
......@@ -22,24 +22,26 @@ public:
{
return name;
}
bool hasStaticStructure() const override { return true; }
protected:
StoragePtr executeImpl(
const ASTPtr & ast_function,
const Context & context,
const std::string & table_name) const override;
static StoragePtr getStorage(
const String & source,
const String & access_key_id,
const String & secret_access_key,
const String & format,
const ColumnsDescription & columns,
Context & global_context,
const std::string & table_name,
const String & compression_method);
ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "S3"; }
ColumnsDescription getActualTableStructure(const Context & context) const override;
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
String filename;
String format;
String structure;
String access_key_id;
String secret_access_key;
String compression_method = "auto";
};
class TableFunctionCOS : public TableFunctionS3
......
......@@ -10,10 +10,12 @@
namespace DB
{
StoragePtr TableFunctionURL::getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const String & compression_method) const
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const String & compression_method_) const
{
Poco::URI uri(source);
return StorageURL::create(uri, StorageID(getDatabaseName(), table_name), format, columns, ConstraintsDescription{}, global_context, compression_method);
return StorageURL::create(uri, StorageID(getDatabaseName(), table_name), format_, columns, ConstraintsDescription{},
global_context, compression_method_);
}
void registerTableFunctionURL(TableFunctionFactory & factory)
......
......@@ -21,7 +21,8 @@ public:
private:
StoragePtr getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const String & compression_method) const override;
const String & source, const String & format_, const ColumnsDescription & columns, Context & global_context,
const std::string & table_name, const String & compression_method_) const override;
const char * getStorageTypeName() const override { return "URL"; }
};
......
......@@ -62,8 +62,10 @@ static void parseAndInsertValues(MutableColumns & res_columns, const ASTs & args
}
}
StoragePtr TableFunctionValues::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
void TableFunctionValues::parseArguments(const ASTPtr & ast_function, const Context & /*context*/)
{
ASTs & args_func = ast_function->children;
if (args_func.size() != 1)
......@@ -83,9 +85,18 @@ StoragePtr TableFunctionValues::executeImpl(const ASTPtr & ast_function, const C
"Got '{}' instead", getName(), args[0]->formatForErrorMessage()),
ErrorCodes::BAD_ARGUMENTS);
}
std::string structure = args[0]->as<ASTLiteral &>().value.safeGet<String>();
ColumnsDescription columns = parseColumnsListFromString(structure, context);
structure = args[0]->as<ASTLiteral &>().value.safeGet<String>();
}
ColumnsDescription TableFunctionValues::getActualTableStructure(const Context & context) const
{
return parseColumnsListFromString(structure, context);
}
StoragePtr TableFunctionValues::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
auto columns = getActualTableStructure(context);
Block sample_block;
for (const auto & name_type : columns.getOrdinary())
......@@ -93,6 +104,8 @@ StoragePtr TableFunctionValues::executeImpl(const ASTPtr & ast_function, const C
MutableColumns res_columns = sample_block.cloneEmptyColumns();
ASTs & args = ast_function->children.at(0)->children;
/// Parsing other arguments as values and inserting them into columns
parseAndInsertValues(res_columns, args, sample_block, context);
......
......@@ -12,9 +12,15 @@ class TableFunctionValues : public ITableFunction
public:
static constexpr auto name = "values";
std::string getName() const override { return name; }
bool hasStaticStructure() const override { return true; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "Values"; }
ColumnsDescription getActualTableStructure(const Context & context) const override;
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
String structure;
};
......
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Storages/StorageView.h>
......@@ -16,24 +15,37 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
StoragePtr TableFunctionView::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
void TableFunctionView::parseArguments(const ASTPtr & ast_function, const Context & /*context*/)
{
if (const auto * function = ast_function->as<ASTFunction>())
const auto * function = ast_function->as<ASTFunction>();
if (function)
{
if (auto * select = function->tryGetQueryArgument())
{
auto sample = InterpreterSelectWithUnionQuery::getSampleBlock(function->arguments->children[0] /* ASTPtr */, context);
auto columns = ColumnsDescription(sample.getNamesAndTypesList());
ASTCreateQuery create;
create.select = select;
auto res = StorageView::create(StorageID(getDatabaseName(), table_name), create, columns);
res->startup();
return res;
create.set(create.select, select->clone());
return;
}
}
throw Exception("Table function '" + getName() + "' requires a query argument.", ErrorCodes::BAD_ARGUMENTS);
}
ColumnsDescription TableFunctionView::getActualTableStructure(const Context & context) const
{
assert(create.select);
assert(create.children.size() == 1);
assert(create.children[0]->as<ASTSelectWithUnionQuery>());
auto sample = InterpreterSelectWithUnionQuery::getSampleBlock(create.children[0], context);
return ColumnsDescription(sample.getNamesAndTypesList());
}
StoragePtr TableFunctionView::executeImpl(const ASTPtr & /*ast_function*/, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
auto columns = getActualTableStructure(context);
auto res = StorageView::create(StorageID(getDatabaseName(), table_name), create, columns);
res->startup();
return res;
}
void registerTableFunctionView(TableFunctionFactory & factory)
{
factory.registerFunction<TableFunctionView>();
......
#pragma once
#include <TableFunctions/ITableFunction.h>
#include <Parsers/ASTCreateQuery.h>
#include <common/types.h>
namespace DB
{
......@@ -17,10 +17,13 @@ public:
static constexpr auto name = "view";
std::string getName() const override { return name; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const String & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "View"; }
UInt64 evaluateArgument(const Context & context, ASTPtr & argument) const;
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
ColumnsDescription getActualTableStructure(const Context & context) const override;
ASTCreateQuery create;
};
......
......@@ -3,8 +3,8 @@
#include <TableFunctions/TableFunctionFactory.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Common/typeid_cast.h>
#include <Storages/System/StorageSystemZeros.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/Context.h>
#include "registerTableFunctions.h"
......@@ -18,8 +18,16 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
template <bool multithreaded>
ColumnsDescription TableFunctionZeros<multithreaded>::getActualTableStructure(const Context & /*context*/) const
{
/// NOTE: https://bugs.llvm.org/show_bug.cgi?id=47418
return ColumnsDescription{{{"zero", std::make_shared<DataTypeUInt8>()}}};
}
template <bool multithreaded>
StoragePtr TableFunctionZeros<multithreaded>::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
StoragePtr TableFunctionZeros<multithreaded>::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
if (const auto * function = ast_function->as<ASTFunction>())
{
......
......@@ -17,11 +17,14 @@ class TableFunctionZeros : public ITableFunction
public:
static constexpr auto name = multithreaded ? "zeros_mt" : "zeros";
std::string getName() const override { return name; }
bool hasStaticStructure() const override { return true; }
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "SystemZeros"; }
UInt64 evaluateArgument(const Context & context, ASTPtr & argument) const;
ColumnsDescription getActualTableStructure(const Context & context) const override;
};
......
# See https://bugs.llvm.org/show_bug.cgi?id=47418
# leak:getActualTableStructure
......@@ -34,7 +34,7 @@ def test_single_file(started_cluster, cluster):
assert out == '1\ta\n2\tbb\n3\tccc\n'
query = "create table t (dummy UInt32) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_1/shard1_replica1/1.bin');" \
query = "create table t (x UInt64, s String) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_1/shard1_replica1/1.bin');" \
"select * from t"
out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
......@@ -57,7 +57,7 @@ def test_two_files(started_cluster, cluster):
assert out == '0\t_\n1\ta\n2\tbb\n3\tccc\n'
query = "create table t (dummy UInt32) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_2/shard1_replica1/{1,2,3,4}.bin');" \
query = "create table t (x UInt64, s String) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_2/shard1_replica1/{1,2,3,4}.bin');" \
"select * from t order by x"
out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
......@@ -77,7 +77,7 @@ def test_single_file_old(started_cluster, cluster):
assert out == '1\ta\n2\tbb\n3\tccc\n'
query = "create table t (dummy UInt32) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_3/default@not_existing:9000/1.bin');" \
query = "create table t (x UInt64, s String) engine = File('Distributed', '/var/lib/clickhouse/data/test/distr_3/default@not_existing:9000/1.bin');" \
"select * from t"
out = node.exec_in_container(['/usr/bin/clickhouse', 'local', '--stacktrace', '-q', query])
......
CREATE TABLE default.file\n(\n `n` Int8\n)\nENGINE = File(\'TSVWithNamesAndTypes\')
CREATE TABLE default.buffer\n(\n `n` Int8\n)\nENGINE = Buffer(\'default\', \'file\', 16, 10, 200, 10000, 1000000, 10000000, 1000000000)
CREATE TABLE default.merge\n(\n `n` Int8\n)\nENGINE = Merge(\'default\', \'distributed\')
CREATE TABLE default.merge_tf AS merge(\'default\', \'.*\')
CREATE TABLE default.merge_tf\n(\n `n` Int8\n) AS merge(\'default\', \'.*\')
CREATE TABLE default.distributed\n(\n `n` Int8\n)\nENGINE = Distributed(\'test_shard_localhost\', \'default\', \'file\')
CREATE TABLE default.distributed_tf AS cluster(\'test_shard_localhost\', \'default\', \'buffer\')
CREATE TABLE default.distributed_tf\n(\n `n` Int8\n) AS cluster(\'test_shard_localhost\', \'default\', \'buffer\')
CREATE TABLE default.url\n(\n `n` UInt64,\n `col` String\n)\nENGINE = URL(\'https://localhost:8443/?query=select+n,+_table+from+default.merge+format+CSV\', \'CSV\')
CREATE TABLE default.rich_syntax AS remote(\'localhos{x|y|t}\', cluster(\'test_shard_localhost\', remote(\'127.0.0.{1..4}\', \'default\', \'view\')))
CREATE TABLE default.rich_syntax\n(\n `n` Int64\n) AS remote(\'localhos{x|y|t}\', cluster(\'test_shard_localhost\', remote(\'127.0.0.{1..4}\', \'default\', \'view\')))
CREATE VIEW default.view\n(\n `n` Int64\n) AS\nSELECT toInt64(n) AS n\nFROM \n(\n SELECT toString(n) AS n\n FROM default.merge\n WHERE _table != \'qwerty\'\n ORDER BY _table ASC\n)\nUNION ALL\nSELECT *\nFROM default.file
CREATE DICTIONARY default.dict\n(\n `n` UInt64,\n `col` String DEFAULT \'42\'\n)\nPRIMARY KEY n\nSOURCE(CLICKHOUSE(HOST \'localhost\' PORT 9440 SECURE 1 USER \'default\' TABLE \'url\'))\nLIFETIME(MIN 0 MAX 1)\nLAYOUT(CACHE(SIZE_IN_CELLS 1))
16
CREATE TABLE test_01457.tf_remote\n(\n `n` Int8\n) AS remote(\'localhost\', \'default\', \'tmp\')
CREATE TABLE test_01457.tf_remote_explicit_structure\n(\n `n` UInt64\n) AS remote(\'localhost\', \'default\', \'tmp\')
CREATE TABLE test_01457.tf_numbers\n(\n `number` String\n) AS numbers(1)
CREATE TABLE test_01457.tf_merge\n(\n `n` Int8\n) AS merge(\'default\', \'tmp\')
42
0 Int8
0 Int8
0 UInt64
0 String
0 Int8
DROP DATABASE IF EXISTS test_01457;
CREATE DATABASE test_01457;
CREATE TABLE tmp (n Int8) ENGINE=Memory;
CREATE TABLE test_01457.tf_remote AS remote('localhost', currentDatabase(), 'tmp');
SHOW CREATE TABLE test_01457.tf_remote;
CREATE TABLE test_01457.tf_remote_explicit_structure (n UInt64) AS remote('localhost', currentDatabase(), 'tmp');
SHOW CREATE TABLE test_01457.tf_remote_explicit_structure;
CREATE TABLE test_01457.tf_numbers (number String) AS numbers(1);
SHOW CREATE TABLE test_01457.tf_numbers;
CREATE TABLE test_01457.tf_merge AS merge(currentDatabase(), 'tmp');
SHOW CREATE TABLE test_01457.tf_merge;
DROP TABLE tmp;
DETACH DATABASE test_01457;
ATTACH DATABASE test_01457;
CREATE TABLE tmp (n Int8) ENGINE=Memory;
INSERT INTO test_01457.tf_remote_explicit_structure VALUES ('42');
SELECT * FROM tmp;
TRUNCATE TABLE tmp;
INSERT INTO test_01457.tf_remote VALUES (0);
SELECT (*,).1 AS c, toTypeName(c) FROM tmp;
SELECT (*,).1 AS c, toTypeName(c) FROM test_01457.tf_remote;
SELECT (*,).1 AS c, toTypeName(c) FROM test_01457.tf_remote_explicit_structure;
SELECT (*,).1 AS c, toTypeName(c) FROM test_01457.tf_numbers;
SELECT (*,).1 AS c, toTypeName(c) FROM test_01457.tf_merge;
DROP DATABASE test_01457;
CREATE TABLE default.table_from_remote AS remote(\'localhost\', \'system\', \'numbers\')
CREATE TABLE default.table_from_remote AS remote(\'localhost\', \'system\', \'numbers\')
CREATE TABLE default.table_from_numbers AS numbers(1000)
CREATE TABLE default.table_from_numbers AS numbers(1000)
CREATE TABLE default.table_from_remote\n(\n `number` UInt64\n) AS remote(\'localhost\', \'system\', \'numbers\')
CREATE TABLE default.table_from_remote\n(\n `number` UInt64,\n `col` UInt8\n) AS remote(\'localhost\', \'system\', \'numbers\')
CREATE TABLE default.table_from_numbers\n(\n `number` UInt64\n) AS numbers(1000)
CREATE TABLE default.table_from_numbers\n(\n `number` UInt64\n) AS numbers(1000)
CREATE TABLE default.table_from_select\n(\n `number` UInt64\n)\nENGINE = MergeTree()\nORDER BY tuple()\nSETTINGS index_granularity = 8192
CREATE TABLE default.table_from_select\n(\n `number` UInt64,\n `col` UInt8\n)\nENGINE = MergeTree()\nORDER BY tuple()\nSETTINGS index_granularity = 8192
1
......@@ -6,7 +6,7 @@ CREATE TABLE table_from_remote AS remote('localhost', 'system', 'numbers');
SHOW CREATE TABLE table_from_remote;
ALTER TABLE table_from_remote ADD COLUMN col UInt8; --{serverError 48}
ALTER TABLE table_from_remote ADD COLUMN col UInt8;
SHOW CREATE TABLE table_from_remote;
......@@ -26,8 +26,6 @@ ALTER TABLE table_from_select ADD COLUMN col UInt8;
SHOW CREATE TABLE table_from_select;
SELECT 1;
DROP TABLE IF EXISTS table_from_remote;
DROP TABLE IF EXISTS table_from_select;
DROP TABLE IF EXISTS table_from_numbers;
......@@ -141,6 +141,7 @@
01460_DistributedFilesToInsert
01474_executable_dictionary
01474_bad_global_join
01457_create_as_table_function_structure
01473_event_time_microseconds
01461_query_start_time_microseconds
01455_shard_leaf_max_rows_bytes_to_read
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册