diff --git a/docker/test/fuzzer/run-fuzzer.sh b/docker/test/fuzzer/run-fuzzer.sh index 9af401238a36a71d887b272a5724041d8c5cef6d..766fec76179fa0e4ddf70f25bab089b533f6e2d4 100755 --- a/docker/test/fuzzer/run-fuzzer.sh +++ b/docker/test/fuzzer/run-fuzzer.sh @@ -190,7 +190,7 @@ case "$stage" in # Lost connection to the server. This probably means that the server died # with abort. echo "failure" > status.txt - if ! grep -ao "Received signal.*\|Logical error.*\|Assertion.*failed\|Failed assertion.*\|.*runtime error: .*\|.*is located.*\|SUMMARY: MemorySanitizer:.*\|SUMMARY: ThreadSanitizer:.*" server.log > description.txt + if ! grep -ao "Received signal.*\|Logical error.*\|Assertion.*failed\|Failed assertion.*\|.*runtime error: .*\|.*is located.*\|SUMMARY: MemorySanitizer:.*\|SUMMARY: ThreadSanitizer:.*\|.*_LIBCPP_ASSERT.*" server.log > description.txt then echo "Lost connection to server. See the logs." > description.txt fi diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index dba9385fe27056b64e90840de35257dd8a439a15..86db7742c9776f932e2e59279ec9921611da3fc7 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -100,8 +100,8 @@ endif() list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD}) list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON}) -list (APPEND dbms_sources Functions/IFunction.cpp Functions/FunctionFactory.cpp Functions/FunctionHelpers.cpp Functions/extractTimeZoneFromFunctionArguments.cpp) -list (APPEND dbms_headers Functions/IFunctionImpl.h Functions/FunctionFactory.h Functions/FunctionHelpers.h Functions/extractTimeZoneFromFunctionArguments.h) +list (APPEND dbms_sources Functions/IFunction.cpp Functions/FunctionFactory.cpp Functions/FunctionHelpers.cpp Functions/extractTimeZoneFromFunctionArguments.cpp Functions/replicate.cpp) +list (APPEND dbms_headers Functions/IFunctionImpl.h Functions/FunctionFactory.h Functions/FunctionHelpers.h Functions/extractTimeZoneFromFunctionArguments.h Functions/replicate.h) list (APPEND dbms_sources AggregateFunctions/AggregateFunctionFactory.cpp diff --git a/src/Core/ExternalTable.cpp b/src/Core/ExternalTable.cpp index 722bc5705c3c3b0d3281084ac141cb4cae33c8a9..767ed9599509c67432b11741bda31721574dcc2b 100644 --- a/src/Core/ExternalTable.cpp +++ b/src/Core/ExternalTable.cpp @@ -1,18 +1,24 @@ #include +#include #include #include +#include +#include +#include #include -#include +#include #include #include #include -#include -#include + #include #include #include +#include + #include #include +#include #include @@ -39,7 +45,7 @@ ExternalTableDataPtr BaseExternalTable::getData(const Context & context) return data; } -void BaseExternalTable::clean() +void BaseExternalTable::clear() { name.clear(); file.clear(); @@ -49,17 +55,6 @@ void BaseExternalTable::clean() read_buffer.reset(); } -/// Function for debugging information output -void BaseExternalTable::write() -{ - std::cerr << "file " << file << std::endl; - std::cerr << "name " << name << std::endl; - std::cerr << "format " << format << std::endl; - std::cerr << "structure: \n"; - for (const auto & elem : structure) - std::cerr << '\t' << elem.first << ' ' << elem.second << std::endl; -} - void BaseExternalTable::parseStructureFromStructureField(const std::string & argument) { std::vector vals; @@ -182,7 +177,7 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header, executor->execute(/*num_threads = */ 1); /// We are ready to receive the next file, for this we clear all the information received - clean(); + clear(); } } diff --git 
a/src/Core/ExternalTable.h b/src/Core/ExternalTable.h index f26af1cc6ca4b54113d063cb2f166690ba6622d8..0d8e0aaf8acd45ec0d478e70637c49ffa5aa6aee 100644 --- a/src/Core/ExternalTable.h +++ b/src/Core/ExternalTable.h @@ -61,10 +61,7 @@ public: protected: /// Clear all accumulated information - void clean(); - - /// Function for debugging information output - void write(); + void clear(); /// Construct the `structure` vector from the text field `structure` virtual void parseStructureFromStructureField(const std::string & argument); diff --git a/src/DataStreams/AddingDefaultBlockOutputStream.cpp b/src/DataStreams/AddingDefaultBlockOutputStream.cpp index 74300a371fb983c62bb3d3c73747b064c1e02e03..db1542801d6605663b32fc21c78dbab3d7b6f16e 100644 --- a/src/DataStreams/AddingDefaultBlockOutputStream.cpp +++ b/src/DataStreams/AddingDefaultBlockOutputStream.cpp @@ -1,13 +1,27 @@ #include #include +#include namespace DB { +AddingDefaultBlockOutputStream::AddingDefaultBlockOutputStream( + const BlockOutputStreamPtr & output_, + const Block & header_, + const ColumnsDescription & columns_, + const Context & context_) + : output(output_), header(header_) +{ + auto dag = addMissingDefaults(header_, output->getHeader().getNamesAndTypesList(), columns_, context_); + adding_defaults_actions = std::make_shared(std::move(dag)); +} + void AddingDefaultBlockOutputStream::write(const Block & block) { - output->write(addMissingDefaults(block, output_block.getNamesAndTypesList(), columns, context)); + auto copy = block; + adding_defaults_actions->execute(copy); + output->write(copy); } void AddingDefaultBlockOutputStream::flush() diff --git a/src/DataStreams/AddingDefaultBlockOutputStream.h b/src/DataStreams/AddingDefaultBlockOutputStream.h index 5b46c533f7f616bb5924f8e150713612d1411e2b..5fbbe2aed6045ef23f86da48a44334c911a0dda9 100644 --- a/src/DataStreams/AddingDefaultBlockOutputStream.h +++ b/src/DataStreams/AddingDefaultBlockOutputStream.h @@ -8,6 +8,9 @@ namespace DB { +class ExpressionActions; +using ExpressionActionsPtr = std::shared_ptr; + class Context; /** This stream adds three types of columns into block @@ -22,13 +25,8 @@ public: AddingDefaultBlockOutputStream( const BlockOutputStreamPtr & output_, const Block & header_, - const Block & output_block_, const ColumnsDescription & columns_, - const Context & context_) - : output(output_), header(header_), output_block(output_block_), - columns(columns_), context(context_) - { - } + const Context & context_); Block getHeader() const override { return header; } void write(const Block & block) override; @@ -41,10 +39,7 @@ public: private: BlockOutputStreamPtr output; const Block header; - /// Blocks after this stream should have this structure - const Block output_block; - const ColumnsDescription columns; - const Context & context; + ExpressionActionsPtr adding_defaults_actions; }; diff --git a/src/DataStreams/AddingDefaultsBlockInputStream.cpp b/src/DataStreams/AddingDefaultsBlockInputStream.cpp index 160d1b4fb7631ef5800b7eafc437b82cb244a495..4b8dcff18705aa12f1a1a60752ec9886b6dc4f2d 100644 --- a/src/DataStreams/AddingDefaultsBlockInputStream.cpp +++ b/src/DataStreams/AddingDefaultsBlockInputStream.cpp @@ -171,7 +171,12 @@ Block AddingDefaultsBlockInputStream::readImpl() if (!evaluate_block.columns()) evaluate_block.insert({ColumnConst::create(ColumnUInt8::create(1, 0), res.rows()), std::make_shared(), "_dummy"}); - evaluateMissingDefaults(evaluate_block, header.getNamesAndTypesList(), columns, context, false); + auto dag = 
evaluateMissingDefaults(evaluate_block, header.getNamesAndTypesList(), columns, context, false); + if (dag) + { + auto actions = std::make_shared(std::move(dag)); + actions->execute(evaluate_block); + } std::unordered_map mixed_columns; diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 86cf12fbf68aa1a94c782e12bdb8c8090622dc84..f7f32cf9b6f5fccbb5ef70258917797424106647 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Functions/array/arrayDifference.cpp b/src/Functions/array/arrayDifference.cpp index c02533c2564866b3c68f86447a99c8cbb1f2827b..2c71c58867f64a5f0d024f9688d95977e44c48db 100644 --- a/src/Functions/array/arrayDifference.cpp +++ b/src/Functions/array/arrayDifference.cpp @@ -83,9 +83,9 @@ struct ArrayDifferenceImpl } res_ptr = ColumnArray::create(std::move(res_nested), array.getOffsetsPtr()); return true; - } + static ColumnPtr execute(const ColumnArray & array, ColumnPtr mapped) { ColumnPtr res; @@ -107,7 +107,6 @@ struct ArrayDifferenceImpl else throw Exception("Unexpected column for arrayDifference: " + mapped->getName(), ErrorCodes::ILLEGAL_COLUMN); } - }; struct NameArrayDifference { static constexpr auto name = "arrayDifference"; }; diff --git a/src/Functions/materialize.h b/src/Functions/materialize.h index ccdbe455c34f203d7c33636101a35fd85c82d083..5b06ac36da7910b4ca11d48490f525af8b7bdeef 100644 --- a/src/Functions/materialize.h +++ b/src/Functions/materialize.h @@ -27,6 +27,8 @@ public: return name; } + bool useDefaultImplementationForLowCardinalityColumns() const override { return false; } + size_t getNumberOfArguments() const override { return 1; diff --git a/src/Functions/replicate.cpp b/src/Functions/replicate.cpp index adbb37a7c911aff6288a42655e25161f2b97bf46..ca391bec6ce786548e0691fe4c13eb9269cfd57a 100644 --- a/src/Functions/replicate.cpp +++ b/src/Functions/replicate.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -11,60 +12,50 @@ namespace ErrorCodes { extern const int ILLEGAL_TYPE_OF_ARGUMENT; extern const int ILLEGAL_COLUMN; + extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION; } -namespace +DataTypePtr FunctionReplicate::getReturnTypeImpl(const DataTypes & arguments) const { + if (arguments.size() < 2) + throw Exception(ErrorCodes::TOO_FEW_ARGUMENTS_FOR_FUNCTION, + "Function {} expects at least two arguments, got {}", getName(), arguments.size()); -/** Creates an array, multiplying the column (the first argument) by the number of elements in the array (the second argument).
- */ -class FunctionReplicate : public IFunction -{ -public: - static constexpr auto name = "replicate"; - - static FunctionPtr create(const Context &) - { - return std::make_shared(); - } - - String getName() const override - { - return name; - } - - size_t getNumberOfArguments() const override + for (size_t i = 1; i < arguments.size(); ++i) { - return 2; - } - - bool useDefaultImplementationForNulls() const override { return false; } - - DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override - { - const DataTypeArray * array_type = checkAndGetDataType(arguments[1].get()); + const DataTypeArray * array_type = checkAndGetDataType(arguments[i].get()); if (!array_type) - throw Exception("Second argument for function " + getName() + " must be array.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); - return std::make_shared(arguments[0]); + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, + "Argument {} for function {} must be array.", + i + 1, getName()); } + return std::make_shared(arguments[0]); +} + +ColumnPtr FunctionReplicate::executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const +{ + ColumnPtr first_column = arguments[0].column; + ColumnPtr offsets; - ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override + for (size_t i = 1; i < arguments.size(); ++i) { - ColumnPtr first_column = arguments[0].column; - const ColumnArray * array_column = checkAndGetColumn(arguments[1].column.get()); + const ColumnArray * array_column = checkAndGetColumn(arguments[i].column.get()); ColumnPtr temp_column; if (!array_column) { - const auto * const_array_column = checkAndGetColumnConst(arguments[1].column.get()); + const auto * const_array_column = checkAndGetColumnConst(arguments[i].column.get()); if (!const_array_column) throw Exception("Unexpected column for replicate", ErrorCodes::ILLEGAL_COLUMN); temp_column = const_array_column->convertToFullColumn(); array_column = checkAndGetColumn(temp_column.get()); } - return ColumnArray::create(first_column->replicate(array_column->getOffsets())->convertToFullColumnIfConst(), array_column->getOffsetsPtr()); + + if (!offsets || offsets->empty()) + offsets = array_column->getOffsetsPtr(); } -}; + const auto & offsets_data = assert_cast(*offsets).getData(); + return ColumnArray::create(first_column->replicate(offsets_data)->convertToFullColumnIfConst(), offsets); } void registerFunctionReplicate(FunctionFactory & factory) diff --git a/src/Functions/replicate.h b/src/Functions/replicate.h new file mode 100644 index 0000000000000000000000000000000000000000..9a33951b2a31f9f527123e02009e3b1556376599 --- /dev/null +++ b/src/Functions/replicate.h @@ -0,0 +1,40 @@ +#pragma once +#include + +namespace DB +{ + +class Context; + +/// Creates an array, multiplying the column (the first argument) by the number of elements in the array (the second argument). +/// Function may accept more than two arguments. If so, the first array with non-empty offsets is chosen.
+class FunctionReplicate : public IFunction +{ +public: + static constexpr auto name = "replicate"; + + static FunctionPtr create(const Context &) + { + return std::make_shared(); + } + + String getName() const override + { + return name; + } + + size_t getNumberOfArguments() const override + { + return 0; + } + + bool isVariadic() const override { return true; } + + bool useDefaultImplementationForNulls() const override { return false; } + + DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override; + + ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override; +}; + +} diff --git a/src/Interpreters/ActionsDAG.cpp b/src/Interpreters/ActionsDAG.cpp index 8d2613372168aa7b5135bd11005aa3ff3eaae67f..12942371d4f753b301ac58594f6b898b78f27d20 100644 --- a/src/Interpreters/ActionsDAG.cpp +++ b/src/Interpreters/ActionsDAG.cpp @@ -55,10 +55,10 @@ ActionsDAG::ActionsDAG(const ColumnsWithTypeAndName & inputs_) } } -ActionsDAG::Node & ActionsDAG::addNode(Node node, bool can_replace) +ActionsDAG::Node & ActionsDAG::addNode(Node node, bool can_replace, bool add_to_index) { auto it = index.find(node.result_name); - if (it != index.end() && !can_replace) + if (it != index.end() && !can_replace && add_to_index) throw Exception("Column '" + node.result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN); auto & res = nodes.emplace_back(std::move(node)); @@ -66,7 +66,8 @@ ActionsDAG::Node & ActionsDAG::addNode(Node node, bool can_replace) if (res.type == ActionType::INPUT) inputs.emplace_back(&res); - index.replace(&res); + if (add_to_index) + index.replace(&res); return res; } @@ -100,7 +101,7 @@ const ActionsDAG::Node & ActionsDAG::addInput(ColumnWithTypeAndName column, bool return addNode(std::move(node), can_replace); } -const ActionsDAG::Node & ActionsDAG::addColumn(ColumnWithTypeAndName column, bool can_replace) +const ActionsDAG::Node & ActionsDAG::addColumn(ColumnWithTypeAndName column, bool can_replace, bool materialize) { if (!column.column) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add column {} because it is nullptr", column.name); @@ -111,7 +112,22 @@ const ActionsDAG::Node & ActionsDAG::addColumn(ColumnWithTypeAndName column, boo node.result_name = std::move(column.name); node.column = std::move(column.column); - return addNode(std::move(node), can_replace); + auto * res = &addNode(std::move(node), can_replace, !materialize); + + if (materialize) + { + auto & name = res->result_name; + + FunctionOverloadResolverPtr func_builder_materialize = + std::make_shared( + std::make_unique( + std::make_shared())); + + res = &addFunction(func_builder_materialize, {res}, {}, true, false); + res = &addAlias(*res, name, true); + } + + return *res; } const ActionsDAG::Node & ActionsDAG::addAlias(const std::string & name, std::string alias, bool can_replace) @@ -152,7 +168,8 @@ const ActionsDAG::Node & ActionsDAG::addFunction( const FunctionOverloadResolverPtr & function, const Names & argument_names, std::string result_name, - const Context & context [[maybe_unused]]) + const Context & context [[maybe_unused]], + bool can_replace) { const auto & all_settings = context.getSettingsRef(); settings.max_temporary_columns = all_settings.max_temporary_columns; @@ -171,14 +188,15 @@ const ActionsDAG::Node & ActionsDAG::addFunction( for (const auto & name : argument_names) children.push_back(&getNode(name)); - return addFunction(function, children, std::move(result_name), false); + return addFunction(function, children, 
std::move(result_name), can_replace); } ActionsDAG::Node & ActionsDAG::addFunction( const FunctionOverloadResolverPtr & function, Inputs children, std::string result_name, - bool can_replace) + bool can_replace, + bool add_to_index) { size_t num_arguments = children.size(); @@ -258,7 +276,7 @@ ActionsDAG::Node & ActionsDAG::addFunction( node.result_name = std::move(result_name); - return addNode(std::move(node), can_replace); + return addNode(std::move(node), can_replace, add_to_index); } @@ -637,6 +655,26 @@ bool ActionsDAG::trivial() const return true; } +void ActionsDAG::addMaterializingOutputActions() +{ + FunctionOverloadResolverPtr func_builder_materialize = + std::make_shared( + std::make_unique( + std::make_shared())); + + Index new_index; + std::vector index_nodes(index.begin(), index.end()); + for (auto * node : index_nodes) + { + auto & name = node->result_name; + node = &addFunction(func_builder_materialize, {node}, {}, true, false); + node = &addAlias(*node, name, true); + new_index.insert(node); + } + + index.swap(new_index); +} + ActionsDAGPtr ActionsDAG::makeConvertingActions( const ColumnsWithTypeAndName & source, const ColumnsWithTypeAndName & result, diff --git a/src/Interpreters/ActionsDAG.h b/src/Interpreters/ActionsDAG.h index e13a9bd62b3a27f57ae317cf42b302e935d30ce3..3c8778e239a07c5595e5b163edc5e043adb6087b 100644 --- a/src/Interpreters/ActionsDAG.h +++ b/src/Interpreters/ActionsDAG.h @@ -198,14 +198,15 @@ public: const Node & addInput(std::string name, DataTypePtr type, bool can_replace = false); const Node & addInput(ColumnWithTypeAndName column, bool can_replace = false); - const Node & addColumn(ColumnWithTypeAndName column, bool can_replace = false); + const Node & addColumn(ColumnWithTypeAndName column, bool can_replace = false, bool materialize = false); const Node & addAlias(const std::string & name, std::string alias, bool can_replace = false); const Node & addArrayJoin(const std::string & source_name, std::string result_name); const Node & addFunction( const FunctionOverloadResolverPtr & function, const Names & argument_names, std::string result_name, - const Context & context); + const Context & context, + bool can_replace = false); /// Call addAlias several times. void addAliases(const NamesWithAliases & aliases); @@ -232,6 +233,9 @@ public: ActionsDAGPtr clone() const; + /// Applies the materialize() function to every output. + /// Also adds aliases so the result names remain unchanged. + void addMaterializingOutputActions(); enum class MatchColumnsMode { @@ -275,7 +279,7 @@ public: SplitResult splitActionsForFilter(const std::string & column_name) const; private: - Node & addNode(Node node, bool can_replace = false); + Node & addNode(Node node, bool can_replace = false, bool add_to_index = true); Node & getNode(const std::string & name); Node & addAlias(Node & child, std::string alias, bool can_replace); @@ -283,7 +287,8 @@ private: const FunctionOverloadResolverPtr & function, Inputs children, std::string result_name, - bool can_replace); + bool can_replace, + bool add_to_index = true); ActionsDAGPtr cloneEmpty() const { diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 5c99d39dc2e072ab72fc0ad577f7df75b0049954..ca4a313da62a4448914c853e0ee01d3f7ac4815b 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -331,7 +331,7 @@ struct ContextShared mutable std::optional external_models_loader; String default_profile_name; /// Default profile name used for default values.
String system_profile_name; /// Profile used by system processes - AccessControlManager access_control_manager; + std::unique_ptr access_control_manager; mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks. mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files. ProcessList process_list; /// Executing queries at the moment. @@ -388,7 +388,8 @@ struct ContextShared Context::ConfigReloadCallback config_reload_callback; ContextShared() - : macros(std::make_unique()) + : access_control_manager(std::make_unique()) + , macros(std::make_unique()) { /// TODO: make it singleton (?) static std::atomic num_calls{0}; @@ -434,6 +435,7 @@ struct ContextShared /// Preemptive destruction is important, because these objects may have a refcount to ContextShared (cyclic reference). /// TODO: Get rid of this. + access_control_manager.reset(); system_logs.reset(); embedded_dictionaries.reset(); external_dictionaries_loader.reset(); @@ -640,7 +642,7 @@ void Context::setConfig(const ConfigurationPtr & config) { auto lock = getLock(); shared->config = config; - shared->access_control_manager.setExternalAuthenticatorsConfig(*shared->config); + shared->access_control_manager->setExternalAuthenticatorsConfig(*shared->config); } const Poco::Util::AbstractConfiguration & Context::getConfigRef() const @@ -652,25 +654,25 @@ const Poco::Util::AbstractConfiguration & Context::getConfigRef() const AccessControlManager & Context::getAccessControlManager() { - return shared->access_control_manager; + return *shared->access_control_manager; } const AccessControlManager & Context::getAccessControlManager() const { - return shared->access_control_manager; + return *shared->access_control_manager; } void Context::setExternalAuthenticatorsConfig(const Poco::Util::AbstractConfiguration & config) { auto lock = getLock(); - shared->access_control_manager.setExternalAuthenticatorsConfig(config); + shared->access_control_manager->setExternalAuthenticatorsConfig(config); } void Context::setUsersConfig(const ConfigurationPtr & config) { auto lock = getLock(); shared->users_config = config; - shared->access_control_manager.setUsersConfig(*shared->users_config); + shared->access_control_manager->setUsersConfig(*shared->users_config); } ConfigurationPtr Context::getUsersConfig() diff --git a/src/Interpreters/GlobalSubqueriesVisitor.h b/src/Interpreters/GlobalSubqueriesVisitor.h index cde59d1e6c99568858af7e48bcb329c78c405c73..80d133ebea615cf4c0844b80199ca07563929c6e 100644 --- a/src/Interpreters/GlobalSubqueriesVisitor.h +++ b/src/Interpreters/GlobalSubqueriesVisitor.h @@ -12,7 +12,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 55c4d19206fb5493380627edaecaeecd6974982c..6b488a3edc3208a8ecdd883073d59b60186ee254 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -166,7 +166,7 @@ BlockIO InterpreterInsertQuery::execute() BlockIO res; StoragePtr table = getTable(query); - auto table_lock = table->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout); + auto table_lock = table->lockForShare(context.getInitialQueryId(), settings.lock_acquire_timeout); auto metadata_snapshot = table->getInMemoryMetadataPtr(); auto query_sample_block = getSampleBlock(query, table, metadata_snapshot); @@ -289,7 +289,7 @@ BlockIO InterpreterInsertQuery::execute() new_settings.max_threads = 
std::max(1, settings.max_insert_threads); - if (settings.min_insert_block_size_rows) + if (settings.min_insert_block_size_rows && table->prefersLargeBlocks()) new_settings.max_block_size = settings.min_insert_block_size_rows; Context new_context = context; @@ -341,20 +341,22 @@ BlockIO InterpreterInsertQuery::execute() /// Actually we don't know structure of input blocks from query/table, /// because some clients break insertion protocol (columns != header) out = std::make_shared( - out, query_sample_block, out->getHeader(), metadata_snapshot->getColumns(), context); + out, query_sample_block, metadata_snapshot->getColumns(), context); /// It's important to squash blocks as early as possible (before other transforms), /// because other transforms may work inefficient if block size is small. /// Do not squash blocks if it is a sync INSERT into Distributed, since it lead to double bufferization on client and server side. /// Client-side bufferization might cause excessive timeouts (especially in case of big blocks). - if (!(context.getSettingsRef().insert_distributed_sync && table->isRemote()) && !no_squash && !query.watch) + if (!(settings.insert_distributed_sync && table->isRemote()) && !no_squash && !query.watch) { + bool table_prefers_large_blocks = table->prefersLargeBlocks(); + out = std::make_shared( out, out->getHeader(), - context.getSettingsRef().min_insert_block_size_rows, - context.getSettingsRef().min_insert_block_size_bytes); + table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size, + table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0); } auto out_wrapper = std::make_shared(out); diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index 86706701141f8a6f4fb7f2088a5f4ddb928c390b..0e9683de95f10f17bfcfae6a07bd226dfb999507 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -606,7 +606,7 @@ void InterpreterSystemQuery::flushDistributed(ASTSystemQuery &) context.checkAccess(AccessType::SYSTEM_FLUSH_DISTRIBUTED, table_id); if (auto * storage_distributed = dynamic_cast(DatabaseCatalog::instance().getTable(table_id, context).get())) - storage_distributed->flushClusterNodesAllData(); + storage_distributed->flushClusterNodesAllData(context); else throw Exception("Table " + table_id.getNameForLogs() + " is not distributed", ErrorCodes::BAD_ARGUMENTS); } diff --git a/src/Interpreters/RewriteCountVariantsVisitor.cpp b/src/Interpreters/RewriteCountVariantsVisitor.cpp index f7cce82a478dfa3221b6bae4a3426545676a3178..63bf5fce2e8dd87ba22da8d39adbadf43aac15d1 100644 --- a/src/Interpreters/RewriteCountVariantsVisitor.cpp +++ b/src/Interpreters/RewriteCountVariantsVisitor.cpp @@ -23,7 +23,7 @@ void RewriteCountVariantsVisitor::visit(ASTPtr & node) void RewriteCountVariantsVisitor::visit(ASTFunction & func) { - if (func.arguments->children.empty() || func.arguments->children.size() > 1 || !func.arguments->children[0]) + if (!func.arguments || func.arguments->children.empty() || func.arguments->children.size() > 1 || !func.arguments->children[0]) return; auto name = Poco::toLower(func.name); diff --git a/src/Interpreters/addMissingDefaults.cpp b/src/Interpreters/addMissingDefaults.cpp index 37a0812826b0589cfd2a010b3faf11b01a9788ea..9e8ce1f75b415ce91134391f05738d4134fb32a4 100644 --- a/src/Interpreters/addMissingDefaults.cpp +++ b/src/Interpreters/addMissingDefaults.cpp @@ -7,75 +7,85 @@ #include #include #include +#include +#include 
+#include namespace DB { -Block addMissingDefaults( - const Block & block, +ActionsDAGPtr addMissingDefaults( + const Block & header, const NamesAndTypesList & required_columns, const ColumnsDescription & columns, const Context & context) { /// For missing columns of nested structure, you need to create not a column of empty arrays, but a column of arrays of correct lengths. /// First, remember the offset columns for all arrays in the block. - std::map offset_columns; + std::map nested_groups; - for (size_t i = 0, size = block.columns(); i < size; ++i) + for (size_t i = 0, size = header.columns(); i < size; ++i) { - const auto & elem = block.getByPosition(i); + const auto & elem = header.getByPosition(i); - if (const ColumnArray * array = typeid_cast(&*elem.column)) + if (typeid_cast(&*elem.column)) { String offsets_name = Nested::extractTableName(elem.name); - auto & offsets_column = offset_columns[offsets_name]; - /// If for some reason there are different offset columns for one nested structure, then we take nonempty. - if (!offsets_column || offsets_column->empty()) - offsets_column = array->getOffsetsPtr(); + auto & group = nested_groups[offsets_name]; + if (group.empty()) + group.push_back({}); + + group.push_back(elem.name); } } - const size_t rows = block.rows(); - Block res; + auto actions = std::make_shared(header.getColumnsWithTypeAndName()); + + FunctionOverloadResolverPtr func_builder_replicate = FunctionFactory::instance().get("replicate", context); /// We take given columns from input block and missed columns without default value /// (default and materialized will be computed later). for (const auto & column : required_columns) { - if (block.has(column.name)) - { - res.insert(block.getByName(column.name)); + if (header.has(column.name)) continue; - } if (columns.hasDefault(column.name)) continue; String offsets_name = Nested::extractTableName(column.name); - if (offset_columns.count(offsets_name)) + if (nested_groups.count(offsets_name)) { - ColumnPtr offsets_column = offset_columns[offsets_name]; + DataTypePtr nested_type = typeid_cast(*column.type).getNestedType(); - UInt64 nested_rows = rows ? get((*offsets_column)[rows - 1]) : 0; + ColumnPtr nested_column = nested_type->createColumnConstWithDefaultValue(0); + const auto & constant = actions->addColumn({std::move(nested_column), nested_type, column.name}, true); + + auto & group = nested_groups[offsets_name]; + group[0] = constant.result_name; + actions->addFunction(func_builder_replicate, group, constant.result_name, context, true); - ColumnPtr nested_column = nested_type->createColumnConstWithDefaultValue(nested_rows)->convertToFullColumnIfConst(); - auto new_column = ColumnArray::create(nested_column, offsets_column); - res.insert(ColumnWithTypeAndName(std::move(new_column), column.type, column.name)); continue; } /** It is necessary to turn a constant column into a full column, since in part of blocks (from other parts), * it can be full (or the interpreter may decide that it is constant everywhere). */ - auto new_column = column.type->createColumnConstWithDefaultValue(rows)->convertToFullColumnIfConst(); - res.insert(ColumnWithTypeAndName(std::move(new_column), column.type, column.name)); + auto new_column = column.type->createColumnConstWithDefaultValue(0); + actions->addColumn({std::move(new_column), column.type, column.name}, true, true); } /// Computes explicitly specified values by default and materialized columns. 
- evaluateMissingDefaults(res, required_columns, columns, context); - return res; + if (auto dag = evaluateMissingDefaults(actions->getResultColumns(), required_columns, columns, context)) + actions = ActionsDAG::merge(std::move(*actions), std::move(*dag)); + else + /// Removes unused columns and reorders result. + /// The same is done in evaluateMissingDefaults if a non-empty DAG is returned. + actions->removeUnusedActions(required_columns.getNames()); + + return actions; } } diff --git a/src/Interpreters/addMissingDefaults.h b/src/Interpreters/addMissingDefaults.h index ed5d5ce61ff6bf2b7f4df1d1a9bc8187318cfbb5..e746c7cc9e66a4348b1c8da039fa481b43ca1597 100644 --- a/src/Interpreters/addMissingDefaults.h +++ b/src/Interpreters/addMissingDefaults.h @@ -2,6 +2,7 @@ #include #include +#include namespace DB @@ -12,14 +13,17 @@ class Context; class NamesAndTypesList; class ColumnsDescription; +class ActionsDAG; +using ActionsDAGPtr = std::shared_ptr; + /** Adds three types of columns into block * 1. Columns, that are missed inside request, but present in table without defaults (missed columns) * 2. Columns, that are missed inside request, but present in table with defaults (columns with default values) * 3. Columns that materialized from other columns (materialized columns) * All three types of columns are materialized (not constants). */ -Block addMissingDefaults( - const Block & block, +ActionsDAGPtr addMissingDefaults( + const Block & header, const NamesAndTypesList & required_columns, const ColumnsDescription & columns, const Context & context); diff --git a/src/Interpreters/inplaceBlockConversions.cpp b/src/Interpreters/inplaceBlockConversions.cpp index ab74aa7d6317e510b584eca216ff9291f1e7d36c..eba03d7aa61a6d0720579f529f822bd70dffcf1e 100644 --- a/src/Interpreters/inplaceBlockConversions.cpp +++ b/src/Interpreters/inplaceBlockConversions.cpp @@ -24,7 +24,7 @@ namespace { /// Add all required expressions for missing columns calculation -void addDefaultRequiredExpressionsRecursively(Block & block, const String & required_column, const ColumnsDescription & columns, ASTPtr default_expr_list_accum, NameSet & added_columns) +void addDefaultRequiredExpressionsRecursively(const Block & block, const String & required_column, const ColumnsDescription & columns, ASTPtr default_expr_list_accum, NameSet & added_columns) { checkStackSize(); if (block.has(required_column) || added_columns.count(required_column)) @@ -52,7 +52,7 @@ void addDefaultRequiredExpressionsRecursively(Block & block, const String & requ } } -ASTPtr defaultRequiredExpressions(Block & block, const NamesAndTypesList & required_columns, const ColumnsDescription & columns) +ASTPtr defaultRequiredExpressions(const Block & block, const NamesAndTypesList & required_columns, const ColumnsDescription & columns) { ASTPtr default_expr_list = std::make_shared(); @@ -87,67 +87,29 @@ ASTPtr convertRequiredExpressions(Block & block, const NamesAndTypesList & requi return conversion_expr_list; } -void executeExpressionsOnBlock( - Block & block, +ActionsDAGPtr createExpressions( + const Block & header, ASTPtr expr_list, bool save_unneeded_columns, const NamesAndTypesList & required_columns, const Context & context) { if (!expr_list) - return; - - if (!save_unneeded_columns) - { - auto syntax_result = TreeRewriter(context).analyze(expr_list, block.getNamesAndTypesList()); - ExpressionAnalyzer{expr_list, syntax_result, context}.getActions(true)->execute(block); - return; - } - - /** ExpressionAnalyzer eliminates "unused" columns, in order to ensure their
safety - * we are going to operate on a copy instead of the original block */ - Block copy_block{block}; + return nullptr; - auto syntax_result = TreeRewriter(context).analyze(expr_list, block.getNamesAndTypesList()); + auto syntax_result = TreeRewriter(context).analyze(expr_list, header.getNamesAndTypesList()); auto expression_analyzer = ExpressionAnalyzer{expr_list, syntax_result, context}; - auto required_source_columns = syntax_result->requiredSourceColumns(); - auto rows_was = copy_block.rows(); - - // Delete all not needed columns in DEFAULT expression. - // They can intersect with columns added in PREWHERE - // test 00950_default_prewhere - // CLICKHOUSE-4523 - for (const auto & delete_column : copy_block.getNamesAndTypesList()) - { - if (std::find(required_source_columns.begin(), required_source_columns.end(), delete_column.name) == required_source_columns.end()) - { - copy_block.erase(delete_column.name); - } - } + auto dag = std::make_shared(header.getNamesAndTypesList()); + auto actions = expression_analyzer.getActionsDAG(true, !save_unneeded_columns); + dag = ActionsDAG::merge(std::move(*dag), std::move(*actions)); - if (copy_block.columns() == 0) + if (save_unneeded_columns) { - // Add column to indicate block size in execute() - copy_block.insert({DataTypeUInt8().createColumnConst(rows_was, 0u), std::make_shared(), "__dummy"}); + dag->removeUnusedActions(required_columns.getNames()); + dag->addMaterializingOutputActions(); } - expression_analyzer.getActions(true)->execute(copy_block); - - /// move evaluated columns to the original block, materializing them at the same time - size_t pos = 0; - for (auto col = required_columns.begin(); col != required_columns.end(); ++col, ++pos) - { - if (copy_block.has(col->name)) - { - auto evaluated_col = copy_block.getByName(col->name); - evaluated_col.column = evaluated_col.column->convertToFullColumnIfConst(); - - if (block.has(col->name)) - block.getByName(col->name) = std::move(evaluated_col); - else - block.insert(pos, std::move(evaluated_col)); - } - } + return dag; } } @@ -157,19 +119,25 @@ void performRequiredConversions(Block & block, const NamesAndTypesList & require ASTPtr conversion_expr_list = convertRequiredExpressions(block, required_columns); if (conversion_expr_list->children.empty()) return; - executeExpressionsOnBlock(block, conversion_expr_list, true, required_columns, context); + + if (auto dag = createExpressions(block, conversion_expr_list, true, required_columns, context)) + { + auto expression = std::make_shared(std::move(dag)); + expression->execute(block); + } } -void evaluateMissingDefaults(Block & block, +ActionsDAGPtr evaluateMissingDefaults( + const Block & header, const NamesAndTypesList & required_columns, const ColumnsDescription & columns, const Context & context, bool save_unneeded_columns) { if (!columns.hasDefaults()) - return; + return nullptr; - ASTPtr default_expr_list = defaultRequiredExpressions(block, required_columns, columns); - executeExpressionsOnBlock(block, default_expr_list, save_unneeded_columns, required_columns, context); + ASTPtr expr_list = defaultRequiredExpressions(header, required_columns, columns); + return createExpressions(header, expr_list, save_unneeded_columns, required_columns, context); } } diff --git a/src/Interpreters/inplaceBlockConversions.h b/src/Interpreters/inplaceBlockConversions.h index 066975ab4bc1aa6ce6910822bef1ef27ba108056..63540e2994db9d7eb63deaab717250c0f37d1346 100644 --- a/src/Interpreters/inplaceBlockConversions.h +++ 
b/src/Interpreters/inplaceBlockConversions.h @@ -2,6 +2,7 @@ #include #include +#include namespace DB @@ -12,9 +13,13 @@ class Context; class NamesAndTypesList; class ColumnsDescription; -/// Adds missing defaults to block according to required_columns -/// using columns description -void evaluateMissingDefaults(Block & block, +class ActionsDAG; +using ActionsDAGPtr = std::shared_ptr; + +/// Creates actions which add missing defaults to the block according to required_columns, using the columns description. +/// Returns nullptr if no actions are required. +ActionsDAGPtr evaluateMissingDefaults( + const Block & header, const NamesAndTypesList & required_columns, const ColumnsDescription & columns, const Context & context, bool save_unneeded_columns = true); diff --git a/src/Processors/QueryPlan/AddingMissedStep.cpp b/src/Processors/QueryPlan/AddingMissedStep.cpp deleted file mode 100644 index 359d0d46a878bc0502cb1bc45215dea8756375e4..0000000000000000000000000000000000000000 --- a/src/Processors/QueryPlan/AddingMissedStep.cpp +++ /dev/null @@ -1,45 +0,0 @@ -#include -#include -#include -#include - -namespace DB -{ - -static ITransformingStep::Traits getTraits() -{ - return ITransformingStep::Traits - { - { - .preserves_distinct_columns = false, /// TODO: check if true later. - .returns_single_stream = false, - .preserves_number_of_streams = true, - .preserves_sorting = true, - }, - { - .preserves_number_of_rows = true, - } - }; -} - -AddingMissedStep::AddingMissedStep( - const DataStream & input_stream_, - Block result_header_, - ColumnsDescription columns_, - const Context & context_) - : ITransformingStep(input_stream_, result_header_, getTraits()) - , columns(std::move(columns_)) - , context(context_) -{ - updateDistinctColumns(output_stream->header, output_stream->distinct_columns); -} - -void AddingMissedStep::transformPipeline(QueryPipeline & pipeline) -{ - pipeline.addSimpleTransform([&](const Block & header) - { - return std::make_shared(header, output_stream->header, columns, context); - }); -} - -} diff --git a/src/Processors/QueryPlan/AddingMissedStep.h b/src/Processors/QueryPlan/AddingMissedStep.h deleted file mode 100644 index ce755b79fdf791965937e9bc8133d037fbf89aee..0000000000000000000000000000000000000000 --- a/src/Processors/QueryPlan/AddingMissedStep.h +++ /dev/null @@ -1,26 +0,0 @@ -#pragma once -#include -#include - -namespace DB -{ - -/// Convert one block structure to another. See ConvertingTransform.
-class AddingMissedStep : public ITransformingStep -{ -public: - AddingMissedStep(const DataStream & input_stream_, - Block result_header_, - ColumnsDescription columns_, - const Context & context_); - - String getName() const override { return "AddingMissed"; } - - void transformPipeline(QueryPipeline & pipeline) override; - -private: - ColumnsDescription columns; - const Context & context; -}; - -} diff --git a/src/Processors/Transforms/AddingMissedTransform.cpp b/src/Processors/Transforms/AddingMissedTransform.cpp deleted file mode 100644 index 1344cce22a745c5fe3c6539aa6cd6197f5fb5825..0000000000000000000000000000000000000000 --- a/src/Processors/Transforms/AddingMissedTransform.cpp +++ /dev/null @@ -1,27 +0,0 @@ -#include -#include - - -namespace DB -{ - -AddingMissedTransform::AddingMissedTransform( - Block header_, - Block result_header_, - const ColumnsDescription & columns_, - const Context & context_) - : ISimpleTransform(std::move(header_), std::move(result_header_), false) - , columns(columns_), context(context_) -{ -} - -void AddingMissedTransform::transform(Chunk & chunk) -{ - auto num_rows = chunk.getNumRows(); - Block src = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()); - - auto res = addMissingDefaults(src, getOutputPort().getHeader().getNamesAndTypesList(), columns, context); - chunk.setColumns(res.getColumns(), num_rows); -} - -} diff --git a/src/Processors/Transforms/AddingMissedTransform.h b/src/Processors/Transforms/AddingMissedTransform.h deleted file mode 100644 index 561f908acef29ccaad1162ace59c5372fc8b5c71..0000000000000000000000000000000000000000 --- a/src/Processors/Transforms/AddingMissedTransform.h +++ /dev/null @@ -1,36 +0,0 @@ -#pragma once - -#include -#include - - -namespace DB -{ - - -/** This stream adds three types of columns into block - * 1. Columns, that are missed inside request, but present in table without defaults (missed columns) - * 2. Columns, that are missed inside request, but present in table with defaults (columns with default values) - * 3. Columns that materialized from other columns (materialized columns) - * All three types of columns are materialized (not constants). 
- */ -class AddingMissedTransform : public ISimpleTransform -{ -public: - AddingMissedTransform( - Block header_, - Block result_header_, - const ColumnsDescription & columns_, - const Context & context_); - - String getName() const override { return "AddingMissed"; } - -private: - void transform(Chunk &) override; - - const ColumnsDescription columns; - const Context & context; -}; - - -} diff --git a/src/Processors/ya.make b/src/Processors/ya.make index 2862873a920f40443adb8ce9a32a29825cc8576d..34ff61d03c52c5fd5b2aed1ce85c2fdd5de9742d 100644 --- a/src/Processors/ya.make +++ b/src/Processors/ya.make @@ -93,7 +93,6 @@ SRCS( Port.cpp QueryPipeline.cpp QueryPlan/AddingDelayedSourceStep.cpp - QueryPlan/AddingMissedStep.cpp QueryPlan/AggregatingStep.cpp QueryPlan/ArrayJoinStep.cpp QueryPlan/CreatingSetsStep.cpp @@ -137,7 +136,6 @@ SRCS( Sources/SinkToOutputStream.cpp Sources/SourceFromInputStream.cpp Sources/SourceWithProgress.cpp - Transforms/AddingMissedTransform.cpp Transforms/AddingSelectorTransform.cpp Transforms/AggregatingInOrderTransform.cpp Transforms/AggregatingTransform.cpp diff --git a/src/Server/HTTPHandler.cpp b/src/Server/HTTPHandler.cpp index 5e0d1f0ac66a3b5a312fe2ae330aa36c8c64f846..eb4d6119c6fe0e4a30e8e4fae5408cd25bf1c1db 100644 --- a/src/Server/HTTPHandler.cpp +++ b/src/Server/HTTPHandler.cpp @@ -800,7 +800,6 @@ bool DynamicQueryHandler::customizeQueryParam(Context & context, const std::stri std::string DynamicQueryHandler::getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context) { - if (likely(!startsWith(request.getContentType(), "multipart/form-data"))) { /// Part of the query can be passed in the 'query' parameter and the rest in the request body diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index fa213dcdc551434f94dec3c860474737f12057ce..d66639ef111099096090fab7201b9fd85582a54b 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -22,7 +22,6 @@ #include #include #include -#include #include #include #include @@ -1181,33 +1180,44 @@ bool TCPHandler::receiveData(bool scalar) if (block) { if (scalar) + { + /// Scalar value query_context->addScalar(temporary_id.table_name, block); - else + } + else if (!state.need_receive_data_for_insert && !state.need_receive_data_for_input) { - /// If there is an insert request, then the data should be written directly to `state.io.out`. - /// Otherwise, we write the blocks in the temporary `external_table_name` table. - if (!state.need_receive_data_for_insert && !state.need_receive_data_for_input) + /// Data for external tables + + auto resolved = query_context->tryResolveStorageID(temporary_id, Context::ResolveExternal); + StoragePtr storage; + /// If such a table does not exist, create it. + if (resolved) { - auto resolved = query_context->tryResolveStorageID(temporary_id, Context::ResolveExternal); - StoragePtr storage; - /// If such a table does not exist, create it. - if (resolved) - storage = DatabaseCatalog::instance().getTable(resolved, *query_context); - else - { - NamesAndTypesList columns = block.getNamesAndTypesList(); - auto temporary_table = TemporaryTableHolder(*query_context, ColumnsDescription{columns}, {}); - storage = temporary_table.getTable(); - query_context->addExternalTable(temporary_id.table_name, std::move(temporary_table)); - } - auto metadata_snapshot = storage->getInMemoryMetadataPtr(); - /// The data will be written directly to the table. 
- state.io.out = storage->write(ASTPtr(), metadata_snapshot, *query_context); + storage = DatabaseCatalog::instance().getTable(resolved, *query_context); } - if (state.need_receive_data_for_input) - state.block_for_input = block; else - state.io.out->write(block); + { + NamesAndTypesList columns = block.getNamesAndTypesList(); + auto temporary_table = TemporaryTableHolder(*query_context, ColumnsDescription{columns}, {}); + storage = temporary_table.getTable(); + query_context->addExternalTable(temporary_id.table_name, std::move(temporary_table)); + } + auto metadata_snapshot = storage->getInMemoryMetadataPtr(); + /// The data will be written directly to the table. + auto temporary_table_out = storage->write(ASTPtr(), metadata_snapshot, *query_context); + temporary_table_out->write(block); + temporary_table_out->writeSuffix(); + + } + else if (state.need_receive_data_for_input) + { + /// 'input' table function. + state.block_for_input = block; + } + else + { + /// INSERT query. + state.io.out->write(block); } return true; } diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 1c0149ac261dd5259108a7e1f5d3d8a164da93c3..651688f41bb911e6c925e02df46751f6b48573e2 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -131,6 +131,10 @@ public: /// Returns true if the storage supports reading of subcolumns of complex types. virtual bool supportsSubcolumns() const { return false; } + /// Requires squashing small blocks into large ones for optimal storage. + /// This is true for most storages that store data on disk. + virtual bool prefersLargeBlocks() const { return true; } + /// Optional size information of each physical column. /// Currently it's only used by the MergeTree family for query optimizations. diff --git a/src/Storages/MergeTree/IMergeTreeReader.cpp b/src/Storages/MergeTree/IMergeTreeReader.cpp index 0140b32e12cf6d1d112db820a9b3345cd28030fb..f28ca28b124e1c1b9c2318631e08bd3aa3c8610c 100644 --- a/src/Storages/MergeTree/IMergeTreeReader.cpp +++ b/src/Storages/MergeTree/IMergeTreeReader.cpp @@ -186,7 +186,13 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns additional_columns.insert({res_columns[pos], name_and_type->type, name_and_type->name}); } - DB::evaluateMissingDefaults(additional_columns, columns, metadata_snapshot->getColumns(), storage.global_context); + auto dag = DB::evaluateMissingDefaults( + additional_columns, columns, metadata_snapshot->getColumns(), storage.global_context); + if (dag) + { + auto actions = std::make_shared(std::move(dag)); + actions->execute(additional_columns); + } /// Move columns from block.
name_and_type = columns.begin(); diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index a4cc0b8ed84f8e946288cc6061e63c20e440dce8..c6e77a56db65bacf2e82e3e6db2938f0aa564f72 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1490,16 +1490,31 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, const S getPartitionIDFromQuery(command.partition, global_context); } - /// Some type changes for version column is allowed despite it's a part of sorting key - if (command.type == AlterCommand::MODIFY_COLUMN && command.column_name == merging_params.version_column) + if (command.column_name == merging_params.version_column) { - const IDataType * new_type = command.data_type.get(); - const IDataType * old_type = old_types[command.column_name]; + /// Some type changes for the version column are allowed even though it is part of the sorting key + if (command.type == AlterCommand::MODIFY_COLUMN) + { + const IDataType * new_type = command.data_type.get(); + const IDataType * old_type = old_types[command.column_name]; - checkVersionColumnTypesConversion(old_type, new_type, command.column_name); + checkVersionColumnTypesConversion(old_type, new_type, command.column_name); - /// No other checks required - continue; + /// No other checks required + continue; + } + else if (command.type == AlterCommand::DROP_COLUMN) + { + throw Exception( + "Trying to ALTER DROP version " + backQuoteIfNeed(command.column_name) + " column", + ErrorCodes::ALTER_OF_COLUMN_IS_FORBIDDEN); + } + else if (command.type == AlterCommand::RENAME_COLUMN) + { + throw Exception( + "Trying to ALTER RENAME version " + backQuoteIfNeed(command.column_name) + " column", + ErrorCodes::ALTER_OF_COLUMN_IS_FORBIDDEN); + } } if (command.type == AlterCommand::MODIFY_ORDER_BY && !is_custom_partitioned) diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 81a6539780c2a314d0eecbf2f33ce6caf357a8f9..f2bbf53bd97d8f68b7e69f7ba6caf5077ea180b4 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -140,7 +140,7 @@ void MergeTreeDataPartWriterWide::shiftCurrentMark(const Granules & granules_wri /// If we didn't finished last granule than we will continue to write it from new block if (!last_granule.is_complete) { - if (settings.blocks_are_granules_size) + if (settings.can_use_adaptive_granularity && settings.blocks_are_granules_size) throw Exception(ErrorCodes::LOGICAL_ERROR, "Incomplete granules are not allowed while blocks are granules size. " "Mark number {} (rows {}), rows written in last mark {}, rows to write in last mark from block {} (from row {}), total marks currently {}", last_granule.mark_number, index_granularity.getMarkRows(last_granule.mark_number), rows_written_in_last_mark, @@ -506,7 +506,7 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch WrittenOffsetColumns offset_columns; if (rows_written_in_last_mark > 0) { - if (settings.blocks_are_granules_size) + if (settings.can_use_adaptive_granularity && settings.blocks_are_granules_size) throw Exception(ErrorCodes::LOGICAL_ERROR, "Incomplete granule is not allowed while blocks are granules size even for last granule.
" "Mark number {} (rows {}), rows written for last mark {}, total marks {}", getCurrentMark(), index_granularity.getMarkRows(getCurrentMark()), rows_written_in_last_mark, index_granularity.getMarksCount()); diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index ce74567c62b86bf9ab2d0a0a212a64505d004249..bf02a04c704a36b5b99c009accef109fd9149f62 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include #include #include @@ -246,10 +246,15 @@ void StorageBuffer::read( if (query_plan.isInitialized()) { - auto adding_missed = std::make_unique( + auto actions = addMissingDefaults( + query_plan.getCurrentDataStream().header, + header_after_adding_defaults.getNamesAndTypesList(), + metadata_snapshot->getColumns(), + context); + + auto adding_missed = std::make_unique( query_plan.getCurrentDataStream(), - header_after_adding_defaults, - metadata_snapshot->getColumns(), context); + std::move(actions)); adding_missed->setStepDescription("Add columns missing in destination table"); query_plan.addStep(std::move(adding_missed)); diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 02ee70dc8f4d0f8bf6f704f38859dd3d54524506..c08dc38fa2d98823bb38b63e883674f5d101d4fb 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -681,7 +681,7 @@ void StorageDistributed::truncate(const ASTPtr &, const StorageMetadataPtr &, co for (auto it = cluster_nodes_data.begin(); it != cluster_nodes_data.end();) { - it->second.shutdownAndDropAllData(); + it->second.directory_monitor->shutdownAndDropAllData(); it = cluster_nodes_data.erase(it); } @@ -799,16 +799,6 @@ ClusterPtr StorageDistributed::getOptimizedCluster(const Context & context, cons return cluster; } -void StorageDistributed::ClusterNodeData::flushAllData() const -{ - directory_monitor->flushAllData(); -} - -void StorageDistributed::ClusterNodeData::shutdownAndDropAllData() const -{ - directory_monitor->shutdownAndDropAllData(); -} - IColumn::Selector StorageDistributed::createSelector(const ClusterPtr cluster, const ColumnWithTypeAndName & result) { const auto & slot_to_shard = cluster->getSlotToShard(); @@ -892,13 +882,24 @@ ActionLock StorageDistributed::getActionLock(StorageActionBlockType type) return {}; } -void StorageDistributed::flushClusterNodesAllData() +void StorageDistributed::flushClusterNodesAllData(const Context & context) { - std::lock_guard lock(cluster_nodes_mutex); + /// Sync SYSTEM FLUSH DISTRIBUTED with TRUNCATE + auto table_lock = lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout); + + std::vector> directory_monitors; + + { + std::lock_guard lock(cluster_nodes_mutex); + + directory_monitors.reserve(cluster_nodes_data.size()); + for (auto & node : cluster_nodes_data) + directory_monitors.push_back(node.second.directory_monitor); + } /// TODO: Maybe it should be executed in parallel - for (auto & node : cluster_nodes_data) - node.second.flushAllData(); + for (auto & node : directory_monitors) + node->flushAllData(); } void StorageDistributed::rename(const String & new_path_to_table_data, const StorageID & new_table_id) diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 585efafddfb5767c01a665b3e0284b09ba698e97..4d3869f7c5c2892bfff069681d3297fb0971e09e 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -114,7 +114,7 @@ public: /// (note 
that monitors are created lazily, i.e. until at least one INSERT executed) std::vector getDirectoryMonitorsStatuses() const; - void flushClusterNodesAllData(); + void flushClusterNodesAllData(const Context & context); ClusterPtr getCluster() const; @@ -200,11 +200,8 @@ protected: struct ClusterNodeData { - std::unique_ptr directory_monitor; + std::shared_ptr directory_monitor; ConnectionPoolPtr connection_pool; - - void flushAllData() const; - void shutdownAndDropAllData() const; }; std::unordered_map cluster_nodes_data; mutable std::mutex cluster_nodes_mutex; diff --git a/src/Storages/StorageMemory.cpp b/src/Storages/StorageMemory.cpp index 1474fbcee029f7ca63523a02d1c1b7cac54d3a6d..4530d93c274ade340935d8d1a7452c6a4a2615fd 100644 --- a/src/Storages/StorageMemory.cpp +++ b/src/Storages/StorageMemory.cpp @@ -104,33 +104,46 @@ private: class MemoryBlockOutputStream : public IBlockOutputStream { public: - explicit MemoryBlockOutputStream( + MemoryBlockOutputStream( StorageMemory & storage_, const StorageMetadataPtr & metadata_snapshot_) : storage(storage_) , metadata_snapshot(metadata_snapshot_) - {} + { + } Block getHeader() const override { return metadata_snapshot->getSampleBlock(); } void write(const Block & block) override { - const auto size_bytes_diff = block.allocatedBytes(); - const auto size_rows_diff = block.rows(); - metadata_snapshot->check(block, true); - { - std::lock_guard lock(storage.mutex); - auto new_data = std::make_unique(*(storage.data.get())); - new_data->push_back(block); - storage.data.set(std::move(new_data)); + new_blocks.emplace_back(block); + } + + void writeSuffix() override + { + size_t inserted_bytes = 0; + size_t inserted_rows = 0; - storage.total_size_bytes.fetch_add(size_bytes_diff, std::memory_order_relaxed); - storage.total_size_rows.fetch_add(size_rows_diff, std::memory_order_relaxed); + for (const auto & block : new_blocks) + { + inserted_bytes += block.allocatedBytes(); + inserted_rows += block.rows(); } + std::lock_guard lock(storage.mutex); + + auto new_data = std::make_unique(*(storage.data.get())); + new_data->insert(new_data->end(), new_blocks.begin(), new_blocks.end()); + + storage.data.set(std::move(new_data)); + storage.total_size_bytes.fetch_add(inserted_bytes, std::memory_order_relaxed); + storage.total_size_rows.fetch_add(inserted_rows, std::memory_order_relaxed); } + private: + Blocks new_blocks; + StorageMemory & storage; StorageMetadataPtr metadata_snapshot; }; diff --git a/src/Storages/StorageMemory.h b/src/Storages/StorageMemory.h index 702cb265ea928dff39f67f1825a8ff73be97b989..dc695427156bca5913201a1553c9d545a945f26b 100644 --- a/src/Storages/StorageMemory.h +++ b/src/Storages/StorageMemory.h @@ -40,9 +40,11 @@ public: unsigned num_streams) override; bool supportsParallelInsert() const override { return true; } - bool supportsSubcolumns() const override { return true; } + /// Smaller blocks (e.g. 64K rows) are better for CPU cache. 
+ bool prefersLargeBlocks() const override { return false; } + BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, const Context & context) override; void drop() override; diff --git a/tests/performance/decimal_aggregates.xml b/tests/performance/decimal_aggregates.xml index 615c3201843ad7d0a705438f4d8cd3f4578b26a2..f7bc2ac18681feb1d867f8e749b8275b2e763846 100644 --- a/tests/performance/decimal_aggregates.xml +++ b/tests/performance/decimal_aggregates.xml @@ -11,7 +11,7 @@ SELECT min(d32), max(d32), argMin(x, d32), argMax(x, d32) FROM t SELECT min(d64), max(d64), argMin(x, d64), argMax(x, d64) FROM t SELECT min(d128), max(d128), argMin(x, d128), argMax(x, d128) FROM t - + SELECT avg(d32), sum(d32), sumWithOverflow(d32) FROM t SELECT avg(d64), sum(d64), sumWithOverflow(d64) FROM t SELECT avg(d128), sum(d128), sumWithOverflow(d128) FROM t @@ -19,11 +19,11 @@ SELECT uniq(d32), uniqCombined(d32), uniqExact(d32), uniqHLL12(d32) FROM (SELECT * FROM t LIMIT 10000000) SELECT uniq(d64), uniqCombined(d64), uniqExact(d64), uniqHLL12(d64) FROM (SELECT * FROM t LIMIT 10000000) SELECT uniq(d128), uniqCombined(d128), uniqExact(d128), uniqHLL12(d128) FROM (SELECT * FROM t LIMIT 1000000) - + SELECT median(d32), medianExact(d32), medianExactWeighted(d32, 2) FROM (SELECT * FROM t LIMIT 10000000) SELECT median(d64), medianExact(d64), medianExactWeighted(d64, 2) FROM (SELECT * FROM t LIMIT 1000000) SELECT median(d128), medianExact(d128), medianExactWeighted(d128, 2) FROM (SELECT * FROM t LIMIT 1000000) - + SELECT quantile(d32), quantileExact(d32), quantileExactWeighted(d32, 2) FROM (SELECT * FROM t LIMIT 10000000) SELECT quantile(d64), quantileExact(d64), quantileExactWeighted(d64, 2) FROM (SELECT * FROM t LIMIT 1000000) SELECT quantile(d128), quantileExact(d128), quantileExactWeighted(d128, 2) FROM (SELECT * FROM t LIMIT 1000000) @@ -31,8 +31,8 @@ SELECT quantilesExact(0.1, 0.9)(d32), quantilesExactWeighted(0.1, 0.9)(d32, 2) FROM (SELECT * FROM t LIMIT 10000000) SELECT quantilesExact(0.1, 0.9)(d64), quantilesExactWeighted(0.1, 0.9)(d64, 2) FROM (SELECT * FROM t LIMIT 1000000) SELECT quantilesExact(0.1, 0.9)(d128), quantilesExactWeighted(0.1, 0.9)(d128, 2) FROM (SELECT * FROM t LIMIT 1000000) - + SELECT varPop(d32), varSamp(d32), stddevPop(d32) FROM t - SELECT varPop(d64), varSamp(d64), stddevPop(d64) FROM (SELECT * FROM t LIMIT 1000000) - SELECT varPop(d128), varSamp(d128), stddevPop(d128) FROM (SELECT * FROM t LIMIT 1000000) + SELECT varPop(d64), varSamp(d64), stddevPop(d64) FROM (SELECT * FROM t LIMIT 10000000) + SELECT varPop(d128), varSamp(d128), stddevPop(d128) FROM (SELECT * FROM t LIMIT 10000000) diff --git a/tests/performance/memory_cache_friendliness.xml b/tests/performance/memory_cache_friendliness.xml new file mode 100644 index 0000000000000000000000000000000000000000..92b796615405f00571b440acf164146228219922 --- /dev/null +++ b/tests/performance/memory_cache_friendliness.xml @@ -0,0 +1,8 @@ + + CREATE TABLE test_memory (x UInt64) ENGINE Memory + INSERT INTO test_memory SELECT 1 FROM numbers(1000000000) + + SELECT sum(x * x + x) FROM test_memory + + DROP TABLE IF EXISTS test_memory + diff --git a/tests/queries/0_stateless/00341_squashing_insert_select2.sql b/tests/queries/0_stateless/00341_squashing_insert_select2.sql index 469fdaaa64ae1df980ebb2cab1ee5cef84c9a0a9..3eb5a2682e039098d116ebe180b5af1536f5048c 100644 --- a/tests/queries/0_stateless/00341_squashing_insert_select2.sql +++ b/tests/queries/0_stateless/00341_squashing_insert_select2.sql @@ -1,5 +1,5 
@@ DROP TABLE IF EXISTS numbers_squashed; -CREATE TABLE numbers_squashed (number UInt8) ENGINE = Memory; +CREATE TABLE numbers_squashed (number UInt8) ENGINE = StripeLog; SET min_insert_block_size_rows = 100; SET min_insert_block_size_bytes = 0; diff --git a/tests/queries/0_stateless/00979_live_view_watch_live_moving_avg.py b/tests/queries/0_stateless/00979_live_view_watch_live_moving_avg.py.disabled similarity index 100% rename from tests/queries/0_stateless/00979_live_view_watch_live_moving_avg.py rename to tests/queries/0_stateless/00979_live_view_watch_live_moving_avg.py.disabled diff --git a/tests/queries/0_stateless/01455_optimize_trivial_insert_select.sql b/tests/queries/0_stateless/01455_optimize_trivial_insert_select.sql index de470fe6a572e60266c32395e24b77d18376c7a9..5b59bc065ddaf6b2f77e136fde93a5d7427f5956 100644 --- a/tests/queries/0_stateless/01455_optimize_trivial_insert_select.sql +++ b/tests/queries/0_stateless/01455_optimize_trivial_insert_select.sql @@ -1,7 +1,9 @@ SET max_insert_threads = 1, max_threads = 100, min_insert_block_size_rows = 1048576, max_block_size = 65536; -CREATE TEMPORARY TABLE t (x UInt64); +DROP TABLE IF EXISTS t; +CREATE TABLE t (x UInt64) ENGINE = StripeLog; -- For trivial INSERT SELECT, max_threads is lowered to max_insert_threads and max_block_size is changed to min_insert_block_size_rows. INSERT INTO t SELECT * FROM numbers_mt(1000000); SET max_threads = 1; -- If data was inserted by more threads, we will probably see data out of order. SELECT DISTINCT blockSize(), runningDifference(x) FROM t; +DROP TABLE t; diff --git a/tests/queries/0_stateless/01712_no_adaptive_granularity_vertical_merge.reference b/tests/queries/0_stateless/01712_no_adaptive_granularity_vertical_merge.reference new file mode 100644 index 0000000000000000000000000000000000000000..51acb06639407c3655bb5799773c20f3dfd1bd04 --- /dev/null +++ b/tests/queries/0_stateless/01712_no_adaptive_granularity_vertical_merge.reference @@ -0,0 +1,6 @@ +1 1 +2 2 +1 1 +2 2 +1 1 +2 2 diff --git a/tests/queries/0_stateless/01712_no_adaptive_granularity_vertical_merge.sql b/tests/queries/0_stateless/01712_no_adaptive_granularity_vertical_merge.sql new file mode 100644 index 0000000000000000000000000000000000000000..0acf6992c1e468dea1c1084ab7f5e14e9507c6be --- /dev/null +++ b/tests/queries/0_stateless/01712_no_adaptive_granularity_vertical_merge.sql @@ -0,0 +1,30 @@ +DROP TABLE IF EXISTS old_school_table; + +CREATE TABLE old_school_table +( + key UInt64, + value String +) +ENGINE = MergeTree() +ORDER BY key +SETTINGS index_granularity_bytes = 0, enable_mixed_granularity_parts = 0, min_bytes_for_wide_part = 0, +vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 1; + +INSERT INTO old_school_table VALUES (1, '1'); +INSERT INTO old_school_table VALUES (2, '2'); + +OPTIMIZE TABLE old_school_table FINAL; + +SELECT * FROM old_school_table ORDER BY key; + +OPTIMIZE TABLE old_school_table FINAL; -- just to be sure + +SELECT * FROM old_school_table ORDER BY key; + +ALTER TABLE old_school_table MODIFY SETTING vertical_merge_algorithm_min_rows_to_activate = 10000, vertical_merge_algorithm_min_columns_to_activate = 10000; + +OPTIMIZE TABLE old_school_table FINAL; -- and horizontal merge + +SELECT * FROM old_school_table ORDER BY key; + +DROP TABLE IF EXISTS old_school_table; diff --git a/tests/queries/0_stateless/01714_alter_drop_version.reference b/tests/queries/0_stateless/01714_alter_drop_version.reference new file mode 100644 index 
0000000000000000000000000000000000000000..72749c905a314fb1c4bdabd91f28aef935074b97 --- /dev/null +++ b/tests/queries/0_stateless/01714_alter_drop_version.reference @@ -0,0 +1 @@ +1 1 1 diff --git a/tests/queries/0_stateless/01714_alter_drop_version.sql b/tests/queries/0_stateless/01714_alter_drop_version.sql new file mode 100644 index 0000000000000000000000000000000000000000..e3d5db3385917b78330ed3d18d62721ae366b569 --- /dev/null +++ b/tests/queries/0_stateless/01714_alter_drop_version.sql @@ -0,0 +1,23 @@ +DROP TABLE IF EXISTS alter_drop_version; + +CREATE TABLE alter_drop_version +( + `key` UInt64, + `value` String, + `ver` Int8 +) +ENGINE = ReplacingMergeTree(ver) +ORDER BY key; + +INSERT INTO alter_drop_version VALUES (1, '1', 1); + +ALTER TABLE alter_drop_version DROP COLUMN ver; --{serverError 524} +ALTER TABLE alter_drop_version RENAME COLUMN ver TO rev; --{serverError 524} + +DETACH TABLE alter_drop_version; + +ATTACH TABLE alter_drop_version; + +SELECT * FROM alter_drop_version; + +DROP TABLE IF EXISTS alter_drop_version;
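
A note on the StorageBuffer::read change above: the dedicated "adding missed" step is replaced by a generic ExpressionStep built from the DAG that addMissingDefaults() returns, so filling in absent destination columns becomes an ordinary expression over the source header. A minimal sketch of what that expression has to compute, with deliberately simplified stand-in types (the real code operates on ActionsDAG and DB::Block, not std containers):

// Sketch only: columns are flat int64 vectors and defaults are constants;
// illustrates the semantics of "add columns missing in destination table".
#include <cstdint>
#include <map>
#include <string>
#include <vector>

using Column = std::vector<int64_t>;
using Block = std::map<std::string, Column>;   // column name -> data

Block addMissingDefaults(
    const Block & block,                                  // what the source provides
    const std::vector<std::string> & required,            // what the destination needs
    const std::map<std::string, int64_t> & defaults,      // explicit DEFAULT expressions
    size_t rows)
{
    Block result;
    for (const auto & name : required)
    {
        if (auto it = block.find(name); it != block.end())
            result[name] = it->second;                    // column already present
        else if (auto d = defaults.find(name); d != defaults.end())
            result[name] = Column(rows, d->second);       // materialize the DEFAULT
        else
            result[name] = Column(rows, 0);               // fall back to the type's default
    }
    return result;
}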
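
The StorageDistributed changes above close a race between SYSTEM FLUSH DISTRIBUTED and TRUNCATE: flushAllData() can block for a long time, so the flush now takes a share lock on the table, snapshots the monitors under cluster_nodes_mutex, and releases the mutex before doing any I/O; switching directory_monitor to shared_ptr keeps a monitor alive even if TRUNCATE erases its map entry mid-flush. A minimal, self-contained sketch of that snapshot-then-work pattern (Monitor and Nodes are illustrative stand-ins, not the ClickHouse classes):

#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

struct Monitor                       // stands in for the directory monitor
{
    void flushAllData() { /* blocking I/O, may take a long time */ }
};

class Nodes
{
public:
    void flushAll()
    {
        std::vector<std::shared_ptr<Monitor>> snapshot;
        {
            std::lock_guard lock(mutex);          // hold the lock only to copy pointers
            snapshot.reserve(monitors.size());
            for (auto & [name, monitor] : monitors)
                snapshot.push_back(monitor);
        }
        for (auto & monitor : snapshot)           // slow work runs without the lock;
            monitor->flushAllData();              // shared_ptr keeps erased monitors alive
    }

private:
    std::mutex mutex;
    std::map<std::string, std::shared_ptr<Monitor>> monitors;
};

In this sketch, if a concurrent truncate erases an entry mid-flush, the snapshot's reference keeps the monitor alive until its flushAllData() returns, and the last owner destroys it.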
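
The StorageMemory change batches all blocks of an INSERT in the output stream and commits them once in writeSuffix(), so the mutex is taken and the block list copied once per query rather than once per block. A minimal sketch of that accumulate-then-publish scheme, with a shared_ptr standing in for the MultiVersion holder used by the real storage:

#include <atomic>
#include <cstddef>
#include <memory>
#include <mutex>
#include <vector>

using Block = std::vector<int>;               // stand-in for DB::Block
using Blocks = std::vector<Block>;

struct Storage
{
    std::mutex mutex;                         // guards the pointer, not the blocks
    std::shared_ptr<const Blocks> data = std::make_shared<Blocks>();
    std::atomic<size_t> total_rows{0};
};

struct MemoryOutputStream
{
    explicit MemoryOutputStream(Storage & storage_) : storage(storage_) {}

    void write(const Block & block) { new_blocks.push_back(block); }  // no lock per block

    void writeSuffix()                        // single commit point per INSERT
    {
        size_t inserted_rows = 0;
        for (const auto & block : new_blocks)
            inserted_rows += block.size();

        std::lock_guard lock(storage.mutex);
        auto new_data = std::make_shared<Blocks>(*storage.data);      // copy current version
        new_data->insert(new_data->end(), new_blocks.begin(), new_blocks.end());
        storage.data = std::move(new_data);   // publish; readers holding the old
                                              // shared_ptr snapshot are unaffected
        storage.total_rows.fetch_add(inserted_rows, std::memory_order_relaxed);
    }

    Storage & storage;
    Blocks new_blocks;
};

This also pairs with the new prefersLargeBlocks() == false: at the default max_block_size of 65536 rows, one UInt64 column is 65536 * 8 = 512 KiB per block, roughly L2-cache-sized, while a 1048576-row insert block occupies 8 MiB per column and spills to L3 or DRAM; the new memory_cache_friendliness.xml test appears to target exactly that effect with sum(x * x + x) over a billion-row Memory table.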
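
Finally, 01714_alter_drop_version.sql pins the expected validation: dropping or renaming the version column of a ReplacingMergeTree must fail with error 524 (ALTER_OF_COLUMN_IS_FORBIDDEN, if memory serves), since existing parts were merged according to that column. A sketch of the shape such a guard takes (illustrative names, not the actual MergeTree code path):

#include <set>
#include <stdexcept>
#include <string>

enum class AlterKind { DropColumn, RenameColumn };

// Reject ALTERs that touch columns the engine relies on (sorting key,
// ReplacingMergeTree version column, ...); error code 524 is what the
// test above expects from the server.
void checkAlterIsPossible(
    AlterKind kind,
    const std::string & column,
    const std::set<std::string> & engine_columns)
{
    if ((kind == AlterKind::DropColumn || kind == AlterKind::RenameColumn)
        && engine_columns.count(column))
        throw std::runtime_error(
            "Cannot ALTER column " + column
            + ": it is used by the table engine (code 524)");
}

The test then DETACHes and ATTACHes the table and re-reads the row, checking that the rejected ALTERs left the on-disk metadata intact.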