提交 70dc82f9 编写于 作者: A Alexander Tokmakov

add StorageID to ASTInsertQuery

上级 fa27ecf3
......@@ -252,7 +252,7 @@ void MySQLHandler::comFieldList(ReadBuffer & payload)
ComFieldList packet;
packet.readPayload(payload);
String database = connection_context.getCurrentDatabase();
StoragePtr tablePtr = connection_context.getTable(database, packet.table);
StoragePtr tablePtr = DatabaseCatalog::instance().getTable({database, packet.table});
for (const NameAndTypePair & column: tablePtr->getColumns().getAll())
{
ColumnDefinition column_definition(
......
......@@ -463,11 +463,11 @@ void TCPHandler::processInsertQuery(const Settings & connection_settings)
/// Send ColumnsDescription for insertion table
if (client_revision >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA)
{
const auto & db_and_table = query_context->getInsertionTable();
const auto & table_id = query_context->getInsertionTable();
if (query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
{
if (!db_and_table.second.empty())
sendTableColumns(query_context->getTable(db_and_table.first, db_and_table.second)->getColumns());
if (!table_id.empty())
sendTableColumns(DatabaseCatalog::instance().getTable(table_id)->getColumns());
}
}
......
......@@ -19,12 +19,12 @@ namespace ErrorCodes
CheckConstraintsBlockOutputStream::CheckConstraintsBlockOutputStream(
const String & table_,
const StorageID & table_id_,
const BlockOutputStreamPtr & output_,
const Block & header_,
const ConstraintsDescription & constraints_,
const Context & context_)
: table(table_),
: table_id(table_id_),
output(output_),
header(header_),
constraints(constraints_),
......@@ -62,7 +62,7 @@ void CheckConstraintsBlockOutputStream::write(const Block & block)
std::stringstream exception_message;
exception_message << "Constraint " << backQuote(constraints.constraints[i]->name)
<< " for table " << backQuote(table)
<< " for table " << table_id.getNameForLogs()
<< " is violated at row " << (rows_written + row_idx + 1)
<< ". Expression: (" << serializeAST(*(constraints.constraints[i]->expr), true) << ")"
<< ". Column values";
......
......@@ -2,6 +2,7 @@
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/ConstraintsDescription.h>
#include <Storages/StorageID.h>
namespace DB
......@@ -15,7 +16,7 @@ class CheckConstraintsBlockOutputStream : public IBlockOutputStream
{
public:
CheckConstraintsBlockOutputStream(
const String & table_,
const StorageID & table_,
const BlockOutputStreamPtr & output_,
const Block & header_,
const ConstraintsDescription & constraints_,
......@@ -30,7 +31,7 @@ public:
void writeSuffix() override;
private:
String table;
StorageID table_id;
BlockOutputStreamPtr output;
Block header;
const ConstraintsDescription constraints;
......
......@@ -56,9 +56,9 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
res_stream = context.getInputFormat(format, *input_buffer_contacenated, header, context.getSettings().max_insert_block_size);
if (context.getSettingsRef().input_format_defaults_for_omitted_fields && !ast_insert_query->table.empty() && !input_function)
if (context.getSettingsRef().input_format_defaults_for_omitted_fields && !ast_insert_query->table_id.empty() && !input_function)
{
StoragePtr storage = context.getTable(ast_insert_query->database, ast_insert_query->table);
StoragePtr storage = DatabaseCatalog::instance().getTable(ast_insert_query->table_id);
auto column_defaults = storage->getColumns().getDefaults();
if (!column_defaults.empty())
res_stream = std::make_shared<AddingDefaultsBlockInputStream>(res_stream, column_defaults, context);
......
......@@ -61,8 +61,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
query = materialized_view->getInnerQuery();
std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>();
insert->database = inner_table_id.database_name;
insert->table = inner_table_id.table_name;
insert->table_id = inner_table_id;
/// Get list of columns we get from select query.
auto header = InterpreterSelectQuery(query, *views_context, SelectQueryOptions().analyze())
......
......@@ -160,7 +160,7 @@ private:
using ProgressCallback = std::function<void(const Progress & progress)>;
ProgressCallback progress_callback; /// Callback for tracking progress of query execution.
QueryStatus * process_list_elem = nullptr; /// For tracking total resource usage for query.
std::pair<String, String> insertion_table; /// Saved insertion table in query context
StorageID insertion_table = StorageID::createEmpty(); /// Saved insertion table in query context
String default_format; /// Format, used when server formats data by itself and if query does not have FORMAT specification.
/// Thus, used in HTTP interface. If not specified - then some globally default format is used.
......@@ -297,6 +297,7 @@ public:
ResolveCurrentDatabase = 2u, /// Use current database
ResolveOrdinary = ResolveGlobal | ResolveCurrentDatabase, /// If database name is not specified, use current database
ResolveExternal = 4u, /// Try get external table
ResolveExternalOrGlobal = ResolveGlobal | ResolveExternal, /// If external table doesn't exist, database name must be specifies
ResolveAll = ResolveExternal | ResolveOrdinary /// If database name is not specified, try get external table,
/// if external table not found use current database.
};
......@@ -332,8 +333,8 @@ public:
void killCurrentQuery();
void setInsertionTable(std::pair<String, String> && db_and_table) { insertion_table = db_and_table; }
const std::pair<String, String> & getInsertionTable() const { return insertion_table; }
void setInsertionTable(StorageID db_and_table) { insertion_table = std::move(db_and_table); }
const StorageID & getInsertionTable() const { return insertion_table; }
String getDefaultFormat() const; /// If default_format is not specified, some global default format is returned.
void setDefaultFormat(const String & name);
......
......@@ -662,11 +662,7 @@ BlockIO InterpreterCreateQuery::fillTableIfNeeded(const ASTCreateQuery & create)
&& !create.is_view && !create.is_live_view && (!create.is_materialized_view || create.is_populate))
{
auto insert = std::make_shared<ASTInsertQuery>();
if (!create.temporary)
insert->database = create.database;
insert->table = create.table;
insert->table_id = context.getSessionContext().resolveStorageID({create.database, create.table, create.uuid}, Context::ResolveExternalOrGlobal);
insert->select = create.select->clone();
if (create.temporary && !context.getSessionContext().hasQueryContext())
......
......@@ -56,10 +56,8 @@ StoragePtr InterpreterInsertQuery::getTable(ASTInsertQuery & query)
return table_function_ptr->execute(query.table_function, context, table_function_ptr->getName());
}
/// Into what table to write.
if (query.database.empty() && !context.isExternalTableExist(query.table))
query.database = context.getCurrentDatabase();
return context.getTable(query.database, query.table);
query.table_id = context.resolveStorageID(query.table_id);
return DatabaseCatalog::instance().getTable(query.table_id);
}
Block InterpreterInsertQuery::getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table)
......@@ -83,7 +81,7 @@ Block InterpreterInsertQuery::getSampleBlock(const ASTInsertQuery & query, const
/// The table does not have a column with that name
if (!table_sample.has(current_name))
throw Exception("No such column " + current_name + " in table " + query.table, ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
throw Exception("No such column " + current_name + " in table " + query.table_id.getNameForLogs(), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
if (!allow_materialized && !table_sample_non_materialized.has(current_name))
throw Exception("Cannot insert column " + current_name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN);
......@@ -107,8 +105,8 @@ BlockIO InterpreterInsertQuery::execute()
auto table_lock = table->lockStructureForShare(true, context.getInitialQueryId());
auto query_sample_block = getSampleBlock(query, table);
if (!query.table.empty() && !query.database.empty() /* always allow access to temporary tables */)
context.checkAccess(AccessType::INSERT, query.database, query.table, query_sample_block.getNames());
if (!query.table_function)
context.checkAccess(AccessType::INSERT, query.table_id.database_name, query.table_id.table_name, query_sample_block.getNames());
BlockInputStreams in_streams;
size_t out_streams_size = 1;
......@@ -159,7 +157,7 @@ BlockIO InterpreterInsertQuery::execute()
out, query_sample_block, out->getHeader(), table->getColumns().getDefaults(), context);
if (const auto & constraints = table->getConstraints(); !constraints.empty())
out = std::make_shared<CheckConstraintsBlockOutputStream>(query.table,
out = std::make_shared<CheckConstraintsBlockOutputStream>(query.table_id,
out, query_sample_block, table->getConstraints(), context);
auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
......@@ -207,10 +205,9 @@ BlockIO InterpreterInsertQuery::execute()
}
std::pair<String, String> InterpreterInsertQuery::getDatabaseTable() const
StorageID InterpreterInsertQuery::getDatabaseTable() const
{
const auto & query = query_ptr->as<ASTInsertQuery &>();
return {query.database, query.table};
return query_ptr->as<ASTInsertQuery &>().table_id;
}
}
......@@ -29,7 +29,7 @@ public:
*/
BlockIO execute() override;
std::pair<String, String> getDatabaseTable() const;
StorageID getDatabaseTable() const;
private:
StoragePtr getTable(ASTInsertQuery & query);
......
......@@ -339,8 +339,7 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
/// This is needed to support DEFAULT-columns in table.
std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>();
insert->database = table_id.database_name;
insert->table = table_id.table_name;
insert->table_id = table_id;
ASTPtr query_ptr(insert.release());
InterpreterInsertQuery interpreter(query_ptr, context);
......
......@@ -330,9 +330,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
{
/// Save insertion table (not table function). TODO: support remote() table function.
auto db_table = insert_interpreter->getDatabaseTable();
if (!db_table.second.empty())
context.setInsertionTable(std::move(db_table));
auto table_id = insert_interpreter->getDatabaseTable();
if (!table_id.empty())
context.setInsertionTable(std::move(table_id));
}
if (process_list_entry)
......
......@@ -2,6 +2,7 @@
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTFunction.h>
#include <Common/quoteString.h>
#include <IO/WriteHelpers.h>
namespace DB
......@@ -25,7 +26,8 @@ void ASTInsertQuery::formatImpl(const FormatSettings & settings, FormatState & s
}
else
settings.ostr << (settings.hilite ? hilite_none : "")
<< (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table);
<< (!table_id.database_name.empty() ? backQuoteIfNeed(table_id.database_name) + "." : "") << backQuoteIfNeed(table_id.table_name)
<< (table_id.hasUUID() ? " UUID " : "") << (table_id.hasUUID() ? quoteString(toString(table_id.uuid)) : "");
if (columns)
{
......
#pragma once
#include <Parsers/IAST.h>
#include <Storages/StorageID.h>
namespace DB
{
......@@ -12,8 +12,7 @@ namespace DB
class ASTInsertQuery : public IAST
{
public:
String database;
String table;
StorageID table_id = StorageID::createEmpty();
ASTPtr columns;
String format;
ASTPtr select;
......@@ -31,7 +30,7 @@ public:
void tryFindInputFunction(ASTPtr & input_function) const;
/** Get the text that identifies this element. */
String getID(char delim) const override { return "InsertQuery" + (delim + database) + delim + table; }
String getID(char delim) const override { return "InsertQuery" + (delim + table_id.database_name) + delim + table_id.table_name; }
ASTPtr clone() const override
{
......
......@@ -151,8 +151,8 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
else
{
tryGetIdentifierNameInto(database, query->database);
tryGetIdentifierNameInto(table, query->table);
tryGetIdentifierNameInto(database, query->table_id.database_name);
tryGetIdentifierNameInto(table, query->table_id.table_name);
}
tryGetIdentifierNameInto(format, query->format);
......
......@@ -353,8 +353,7 @@ bool StorageKafka::streamToViews()
// Create an INSERT query for streaming data
auto insert = std::make_shared<ASTInsertQuery>();
insert->database = table_id.database_name;
insert->table = table_id.table_name;
insert->table_id = table_id;
const Settings & settings = global_context.getSettingsRef();
size_t block_size = max_block_size;
......
......@@ -643,9 +643,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
auto insert = std::make_shared<ASTInsertQuery>();
insert->database = destination_id.database_name;
insert->table = destination_id.table_name;
insert->table_id = destination_id;
/** We will insert columns that are the intersection set of columns of the buffer table and the subordinate table.
* This will support some of the cases (but not all) when the table structure does not match.
......
......@@ -110,8 +110,7 @@ ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, co
ASTPtr createInsertToRemoteTableQuery(const std::string & database, const std::string & table, const Block & sample_block_non_materialized)
{
auto query = std::make_shared<ASTInsertQuery>();
query->database = database;
query->table = table;
query->table_id = StorageID(database, table);
auto columns = std::make_shared<ASTExpressionList>();
query->columns = columns;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册