Commit 1275be58 authored by Alexander Kuzmenkov

Merge remote-tracking branch 'origin/master' into HEAD

......@@ -190,7 +190,7 @@ case "$stage" in
# Lost connection to the server. This probably means that the server died
# with abort.
echo "failure" > status.txt
if ! grep -ao "Received signal.*\|Logical error.*\|Assertion.*failed\|Failed assertion.*\|.*runtime error: .*\|.*is located.*\|SUMMARY: MemorySanitizer:.*\|SUMMARY: ThreadSanitizer:.*" server.log > description.txt
if ! grep -ao "Received signal.*\|Logical error.*\|Assertion.*failed\|Failed assertion.*\|.*runtime error: .*\|.*is located.*\|SUMMARY: MemorySanitizer:.*\|SUMMARY: ThreadSanitizer:.*\|.*_LIBCPP_ASSERT.*" server.log > description.txt
then
echo "Lost connection to server. See the logs." > description.txt
fi
......
......@@ -100,8 +100,8 @@ endif()
list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})
list (APPEND dbms_sources Functions/IFunction.cpp Functions/FunctionFactory.cpp Functions/FunctionHelpers.cpp Functions/extractTimeZoneFromFunctionArguments.cpp)
list (APPEND dbms_headers Functions/IFunctionImpl.h Functions/FunctionFactory.h Functions/FunctionHelpers.h Functions/extractTimeZoneFromFunctionArguments.h)
list (APPEND dbms_sources Functions/IFunction.cpp Functions/FunctionFactory.cpp Functions/FunctionHelpers.cpp Functions/extractTimeZoneFromFunctionArguments.cpp Functions/replicate.cpp)
list (APPEND dbms_headers Functions/IFunctionImpl.h Functions/FunctionFactory.h Functions/FunctionHelpers.h Functions/extractTimeZoneFromFunctionArguments.h Functions/replicate.h)
list (APPEND dbms_sources
AggregateFunctions/AggregateFunctionFactory.cpp
......
#include <boost/program_options.hpp>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataTypes/DataTypeFactory.h>
#include <Storages/IStorage.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/ConstraintsDescription.h>
#include <Interpreters/Context.h>
#include <IO/copyData.h>
#include <Interpreters/DatabaseCatalog.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/LimitReadBuffer.h>
#include <Storages/StorageMemory.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Pipe.h>
#include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Core/ExternalTable.h>
#include <Poco/Net/MessageHeader.h>
#include <Formats/FormatFactory.h>
#include <common/find_symbols.h>
......@@ -39,7 +45,7 @@ ExternalTableDataPtr BaseExternalTable::getData(const Context & context)
return data;
}
void BaseExternalTable::clean()
void BaseExternalTable::clear()
{
name.clear();
file.clear();
......@@ -49,17 +55,6 @@ void BaseExternalTable::clean()
read_buffer.reset();
}
/// Function for debugging information output
void BaseExternalTable::write()
{
std::cerr << "file " << file << std::endl;
std::cerr << "name " << name << std::endl;
std::cerr << "format " << format << std::endl;
std::cerr << "structure: \n";
for (const auto & elem : structure)
std::cerr << '\t' << elem.first << ' ' << elem.second << std::endl;
}
void BaseExternalTable::parseStructureFromStructureField(const std::string & argument)
{
std::vector<std::string> vals;
......@@ -182,7 +177,7 @@ void ExternalTablesHandler::handlePart(const Poco::Net::MessageHeader & header,
executor->execute(/*num_threads = */ 1);
/// We are ready to receive the next file; to do this, we clear all the information received
clean();
clear();
}
}
......@@ -61,10 +61,7 @@ public:
protected:
/// Clear all accumulated information
void clean();
/// Function for debugging information output
void write();
void clear();
/// Construct the `structure` vector from the text field `structure`
virtual void parseStructureFromStructureField(const std::string & argument);
......
#include <DataStreams/AddingDefaultBlockOutputStream.h>
#include <Interpreters/addMissingDefaults.h>
#include <Interpreters/ExpressionActions.h>
namespace DB
{
AddingDefaultBlockOutputStream::AddingDefaultBlockOutputStream(
const BlockOutputStreamPtr & output_,
const Block & header_,
const ColumnsDescription & columns_,
const Context & context_)
: output(output_), header(header_)
{
auto dag = addMissingDefaults(header_, output->getHeader().getNamesAndTypesList(), columns_, context_);
adding_defaults_actions = std::make_shared<ExpressionActions>(std::move(dag));
}
void AddingDefaultBlockOutputStream::write(const Block & block)
{
output->write(addMissingDefaults(block, output_block.getNamesAndTypesList(), columns, context));
auto copy = block;
adding_defaults_actions->execute(copy);
output->write(copy);
}
void AddingDefaultBlockOutputStream::flush()
......
......@@ -8,6 +8,9 @@
namespace DB
{
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class Context;
/** This stream adds three types of columns into block
......@@ -22,13 +25,8 @@ public:
AddingDefaultBlockOutputStream(
const BlockOutputStreamPtr & output_,
const Block & header_,
const Block & output_block_,
const ColumnsDescription & columns_,
const Context & context_)
: output(output_), header(header_), output_block(output_block_),
columns(columns_), context(context_)
{
}
const Context & context_);
Block getHeader() const override { return header; }
void write(const Block & block) override;
......@@ -41,10 +39,7 @@ public:
private:
BlockOutputStreamPtr output;
const Block header;
/// Blocks after this stream should have this structure
const Block output_block;
const ColumnsDescription columns;
const Context & context;
ExpressionActionsPtr adding_defaults_actions;
};
......
......@@ -171,7 +171,12 @@ Block AddingDefaultsBlockInputStream::readImpl()
if (!evaluate_block.columns())
evaluate_block.insert({ColumnConst::create(ColumnUInt8::create(1, 0), res.rows()), std::make_shared<DataTypeUInt8>(), "_dummy"});
evaluateMissingDefaults(evaluate_block, header.getNamesAndTypesList(), columns, context, false);
auto dag = evaluateMissingDefaults(evaluate_block, header.getNamesAndTypesList(), columns, context, false);
if (dag)
{
auto actions = std::make_shared<ExpressionActions>(std::move(dag));
actions->execute(evaluate_block);
}
std::unordered_map<size_t, MutableColumnPtr> mixed_columns;
......
......@@ -5,7 +5,6 @@
#include <Interpreters/Context.h>
#include <Core/Settings.h>
#include <DataStreams/MaterializingBlockOutputStream.h>
#include <DataStreams/SquashingBlockOutputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <Formats/FormatSettings.h>
#include <Processors/Formats/IRowInputFormat.h>
......
......@@ -83,9 +83,9 @@ struct ArrayDifferenceImpl
}
res_ptr = ColumnArray::create(std::move(res_nested), array.getOffsetsPtr());
return true;
}
static ColumnPtr execute(const ColumnArray & array, ColumnPtr mapped)
{
ColumnPtr res;
......@@ -107,7 +107,6 @@ struct ArrayDifferenceImpl
else
throw Exception("Unexpected column for arrayDifference: " + mapped->getName(), ErrorCodes::ILLEGAL_COLUMN);
}
};
struct NameArrayDifference { static constexpr auto name = "arrayDifference"; };
......
......@@ -27,6 +27,8 @@ public:
return name;
}
bool useDefaultImplementationForLowCardinalityColumns() const override { return false; }
size_t getNumberOfArguments() const override
{
return 1;
......
#include <Functions/replicate.h>
#include <Functions/IFunctionImpl.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
......@@ -11,60 +12,50 @@ namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ILLEGAL_COLUMN;
extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION;
}
namespace
DataTypePtr FunctionReplicate::getReturnTypeImpl(const DataTypes & arguments) const
{
if (arguments.size() < 2)
throw Exception(ErrorCodes::TOO_FEW_ARGUMENTS_FOR_FUNCTION,
"Function {} expect at least two arguments, got {}", getName(), arguments.size());
/** Creates an array, multiplying the column (the first argument) by the number of elements in the array (the second argument).
*/
class FunctionReplicate : public IFunction
{
public:
static constexpr auto name = "replicate";
static FunctionPtr create(const Context &)
{
return std::make_shared<FunctionReplicate>();
}
String getName() const override
{
return name;
}
size_t getNumberOfArguments() const override
for (size_t i = 1; i < arguments.size(); ++i)
{
return 2;
}
bool useDefaultImplementationForNulls() const override { return false; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
const DataTypeArray * array_type = checkAndGetDataType<DataTypeArray>(arguments[1].get());
const DataTypeArray * array_type = checkAndGetDataType<DataTypeArray>(arguments[i].get());
if (!array_type)
throw Exception("Second argument for function " + getName() + " must be array.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeArray>(arguments[0]);
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Argument {} for function {} must be array.",
i + 1, getName());
}
return std::make_shared<DataTypeArray>(arguments[0]);
}
ColumnPtr FunctionReplicate::executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const
{
ColumnPtr first_column = arguments[0].column;
ColumnPtr offsets;
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override
for (size_t i = 1; i < arguments.size(); ++i)
{
ColumnPtr first_column = arguments[0].column;
const ColumnArray * array_column = checkAndGetColumn<ColumnArray>(arguments[1].column.get());
const ColumnArray * array_column = checkAndGetColumn<ColumnArray>(arguments[i].column.get());
ColumnPtr temp_column;
if (!array_column)
{
const auto * const_array_column = checkAndGetColumnConst<ColumnArray>(arguments[1].column.get());
const auto * const_array_column = checkAndGetColumnConst<ColumnArray>(arguments[i].column.get());
if (!const_array_column)
throw Exception("Unexpected column for replicate", ErrorCodes::ILLEGAL_COLUMN);
temp_column = const_array_column->convertToFullColumn();
array_column = checkAndGetColumn<ColumnArray>(temp_column.get());
}
return ColumnArray::create(first_column->replicate(array_column->getOffsets())->convertToFullColumnIfConst(), array_column->getOffsetsPtr());
if (!offsets || offsets->empty())
offsets = array_column->getOffsetsPtr();
}
};
const auto & offsets_data = assert_cast<const ColumnArray::ColumnOffsets &>(*offsets).getData();
return ColumnArray::create(first_column->replicate(offsets_data)->convertToFullColumnIfConst(), offsets);
}
void registerFunctionReplicate(FunctionFactory & factory)
......
#pragma once
#include <Functions/IFunctionImpl.h>
namespace DB
{
class Context;
/// Creates an array, multiplying the column (the first argument) by the number of elements in the array (the second argument).
/// The function may accept more than two arguments. If so, the first array with non-empty offsets is chosen.
class FunctionReplicate : public IFunction
{
public:
static constexpr auto name = "replicate";
static FunctionPtr create(const Context &)
{
return std::make_shared<FunctionReplicate>();
}
String getName() const override
{
return name;
}
size_t getNumberOfArguments() const override
{
return 0;
}
bool isVariadic() const override { return true; }
bool useDefaultImplementationForNulls() const override { return false; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override;
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override;
};
}
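For reference, a sketch of the user-visible semantics. The function is registered in the FunctionFactory, so it can be called directly; the literal results in the comments are illustrative:

SELECT replicate(1, [1, 2, 3]);   -- [1, 1, 1]: the scalar is repeated to the size of the array
SELECT replicate('x', [10, 20]);  -- ['x', 'x']: only the array's length matters, not its values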
......@@ -55,10 +55,10 @@ ActionsDAG::ActionsDAG(const ColumnsWithTypeAndName & inputs_)
}
}
ActionsDAG::Node & ActionsDAG::addNode(Node node, bool can_replace)
ActionsDAG::Node & ActionsDAG::addNode(Node node, bool can_replace, bool add_to_index)
{
auto it = index.find(node.result_name);
if (it != index.end() && !can_replace)
if (it != index.end() && !can_replace && add_to_index)
throw Exception("Column '" + node.result_name + "' already exists", ErrorCodes::DUPLICATE_COLUMN);
auto & res = nodes.emplace_back(std::move(node));
......@@ -66,7 +66,8 @@ ActionsDAG::Node & ActionsDAG::addNode(Node node, bool can_replace)
if (res.type == ActionType::INPUT)
inputs.emplace_back(&res);
index.replace(&res);
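/// Nodes that are not added to the index remain in the DAG as intermediate results and are not visible as outputs.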
if (add_to_index)
index.replace(&res);
return res;
}
......@@ -100,7 +101,7 @@ const ActionsDAG::Node & ActionsDAG::addInput(ColumnWithTypeAndName column, bool
return addNode(std::move(node), can_replace);
}
const ActionsDAG::Node & ActionsDAG::addColumn(ColumnWithTypeAndName column, bool can_replace)
const ActionsDAG::Node & ActionsDAG::addColumn(ColumnWithTypeAndName column, bool can_replace, bool materialize)
{
if (!column.column)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add column {} because it is nullptr", column.name);
......@@ -111,7 +112,22 @@ const ActionsDAG::Node & ActionsDAG::addColumn(ColumnWithTypeAndName column, boo
node.result_name = std::move(column.name);
node.column = std::move(column.column);
return addNode(std::move(node), can_replace);
auto * res = &addNode(std::move(node), can_replace, !materialize);
if (materialize)
{
auto & name = res->result_name;
FunctionOverloadResolverPtr func_builder_materialize =
std::make_shared<FunctionOverloadResolverAdaptor>(
std::make_unique<DefaultOverloadResolver>(
std::make_shared<FunctionMaterialize>()));
res = &addFunction(func_builder_materialize, {res}, {}, true, false);
res = &addAlias(*res, name, true);
}
return *res;
}
const ActionsDAG::Node & ActionsDAG::addAlias(const std::string & name, std::string alias, bool can_replace)
......@@ -152,7 +168,8 @@ const ActionsDAG::Node & ActionsDAG::addFunction(
const FunctionOverloadResolverPtr & function,
const Names & argument_names,
std::string result_name,
const Context & context [[maybe_unused]])
const Context & context [[maybe_unused]],
bool can_replace)
{
const auto & all_settings = context.getSettingsRef();
settings.max_temporary_columns = all_settings.max_temporary_columns;
......@@ -171,14 +188,15 @@ const ActionsDAG::Node & ActionsDAG::addFunction(
for (const auto & name : argument_names)
children.push_back(&getNode(name));
return addFunction(function, children, std::move(result_name), false);
return addFunction(function, children, std::move(result_name), can_replace);
}
ActionsDAG::Node & ActionsDAG::addFunction(
const FunctionOverloadResolverPtr & function,
Inputs children,
std::string result_name,
bool can_replace)
bool can_replace,
bool add_to_index)
{
size_t num_arguments = children.size();
......@@ -258,7 +276,7 @@ ActionsDAG::Node & ActionsDAG::addFunction(
node.result_name = std::move(result_name);
return addNode(std::move(node), can_replace);
return addNode(std::move(node), can_replace, add_to_index);
}
......@@ -637,6 +655,26 @@ bool ActionsDAG::trivial() const
return true;
}
void ActionsDAG::addMaterializingOutputActions()
{
FunctionOverloadResolverPtr func_builder_materialize =
std::make_shared<FunctionOverloadResolverAdaptor>(
std::make_unique<DefaultOverloadResolver>(
std::make_shared<FunctionMaterialize>()));
Index new_index;
std::vector<Node *> index_nodes(index.begin(), index.end());
for (auto * node : index_nodes)
{
auto & name = node->result_name;
node = &addFunction(func_builder_materialize, {node}, {}, true, false);
node = &addAlias(*node, name, true);
new_index.insert(node);
}
index.swap(new_index);
}
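The effect of wrapping outputs in materialize() can be checked from SQL; a small sketch, assuming the introspection function dumpColumnStructure (which prints whether a column is a Const wrapper):

SELECT dumpColumnStructure(1);               -- a Const(...) wrapper around a single value
SELECT dumpColumnStructure(materialize(1));  -- a plain full column of size 1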
ActionsDAGPtr ActionsDAG::makeConvertingActions(
const ColumnsWithTypeAndName & source,
const ColumnsWithTypeAndName & result,
......
......@@ -198,14 +198,15 @@ public:
const Node & addInput(std::string name, DataTypePtr type, bool can_replace = false);
const Node & addInput(ColumnWithTypeAndName column, bool can_replace = false);
const Node & addColumn(ColumnWithTypeAndName column, bool can_replace = false);
const Node & addColumn(ColumnWithTypeAndName column, bool can_replace = false, bool materialize = false);
const Node & addAlias(const std::string & name, std::string alias, bool can_replace = false);
const Node & addArrayJoin(const std::string & source_name, std::string result_name);
const Node & addFunction(
const FunctionOverloadResolverPtr & function,
const Names & argument_names,
std::string result_name,
const Context & context);
const Context & context,
bool can_replace = false);
/// Call addAlias several times.
void addAliases(const NamesWithAliases & aliases);
......@@ -232,6 +233,9 @@ public:
ActionsDAGPtr clone() const;
/// Apply the materialize() function to every output.
/// Also add aliases so that the result names remain unchanged.
void addMaterializingOutputActions();
enum class MatchColumnsMode
{
......@@ -275,7 +279,7 @@ public:
SplitResult splitActionsForFilter(const std::string & column_name) const;
private:
Node & addNode(Node node, bool can_replace = false);
Node & addNode(Node node, bool can_replace = false, bool add_to_index = true);
Node & getNode(const std::string & name);
Node & addAlias(Node & child, std::string alias, bool can_replace);
......@@ -283,7 +287,8 @@ private:
const FunctionOverloadResolverPtr & function,
Inputs children,
std::string result_name,
bool can_replace);
bool can_replace,
bool add_to_index = true);
ActionsDAGPtr cloneEmpty() const
{
......
......@@ -331,7 +331,7 @@ struct ContextShared
mutable std::optional<ExternalModelsLoader> external_models_loader;
String default_profile_name; /// Default profile name used for default values.
String system_profile_name; /// Profile used by system processes
AccessControlManager access_control_manager;
std::unique_ptr<AccessControlManager> access_control_manager;
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
ProcessList process_list; /// Executing queries at the moment.
......@@ -388,7 +388,8 @@ struct ContextShared
Context::ConfigReloadCallback config_reload_callback;
ContextShared()
: macros(std::make_unique<Macros>())
: access_control_manager(std::make_unique<AccessControlManager>())
, macros(std::make_unique<Macros>())
{
/// TODO: make it singleton (?)
static std::atomic<size_t> num_calls{0};
......@@ -434,6 +435,7 @@ struct ContextShared
/// Preemptive destruction is important, because these objects may have a refcount to ContextShared (cyclic reference).
/// TODO: Get rid of this.
access_control_manager.reset();
system_logs.reset();
embedded_dictionaries.reset();
external_dictionaries_loader.reset();
......@@ -640,7 +642,7 @@ void Context::setConfig(const ConfigurationPtr & config)
{
auto lock = getLock();
shared->config = config;
shared->access_control_manager.setExternalAuthenticatorsConfig(*shared->config);
shared->access_control_manager->setExternalAuthenticatorsConfig(*shared->config);
}
const Poco::Util::AbstractConfiguration & Context::getConfigRef() const
......@@ -652,25 +654,25 @@ const Poco::Util::AbstractConfiguration & Context::getConfigRef() const
AccessControlManager & Context::getAccessControlManager()
{
return shared->access_control_manager;
return *shared->access_control_manager;
}
const AccessControlManager & Context::getAccessControlManager() const
{
return shared->access_control_manager;
return *shared->access_control_manager;
}
void Context::setExternalAuthenticatorsConfig(const Poco::Util::AbstractConfiguration & config)
{
auto lock = getLock();
shared->access_control_manager.setExternalAuthenticatorsConfig(config);
shared->access_control_manager->setExternalAuthenticatorsConfig(config);
}
void Context::setUsersConfig(const ConfigurationPtr & config)
{
auto lock = getLock();
shared->users_config = config;
shared->access_control_manager.setUsersConfig(*shared->users_config);
shared->access_control_manager->setUsersConfig(*shared->users_config);
}
ConfigurationPtr Context::getUsersConfig()
......
......@@ -12,7 +12,6 @@
#include <Core/Block.h>
#include <Core/NamesAndTypes.h>
#include <Databases/IDatabase.h>
#include <Storages/StorageMemory.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/IdentifierSemantic.h>
......
......@@ -166,7 +166,7 @@ BlockIO InterpreterInsertQuery::execute()
BlockIO res;
StoragePtr table = getTable(query);
auto table_lock = table->lockForShare(context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout);
auto table_lock = table->lockForShare(context.getInitialQueryId(), settings.lock_acquire_timeout);
auto metadata_snapshot = table->getInMemoryMetadataPtr();
auto query_sample_block = getSampleBlock(query, table, metadata_snapshot);
......@@ -289,7 +289,7 @@ BlockIO InterpreterInsertQuery::execute()
new_settings.max_threads = std::max<UInt64>(1, settings.max_insert_threads);
if (settings.min_insert_block_size_rows)
if (settings.min_insert_block_size_rows && table->prefersLargeBlocks())
new_settings.max_block_size = settings.min_insert_block_size_rows;
Context new_context = context;
......@@ -341,20 +341,22 @@ BlockIO InterpreterInsertQuery::execute()
/// Actually we don't know the structure of input blocks from the query/table,
/// because some clients break the insertion protocol (columns != header)
out = std::make_shared<AddingDefaultBlockOutputStream>(
out, query_sample_block, out->getHeader(), metadata_snapshot->getColumns(), context);
out, query_sample_block, metadata_snapshot->getColumns(), context);
/// It's important to squash blocks as early as possible (before other transforms),
/// because other transforms may work inefficiently if the block size is small.
/// Do not squash blocks if it is a sync INSERT into Distributed, since that leads to double buffering on the client and server side.
/// Client-side buffering might cause excessive timeouts (especially for big blocks).
if (!(context.getSettingsRef().insert_distributed_sync && table->isRemote()) && !no_squash && !query.watch)
if (!(settings.insert_distributed_sync && table->isRemote()) && !no_squash && !query.watch)
{
bool table_prefers_large_blocks = table->prefersLargeBlocks();
out = std::make_shared<SquashingBlockOutputStream>(
out,
out->getHeader(),
context.getSettingsRef().min_insert_block_size_rows,
context.getSettingsRef().min_insert_block_size_bytes);
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0);
}
auto out_wrapper = std::make_shared<CountingBlockOutputStream>(out);
......
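A minimal sketch of the new squashing rule (the table name is hypothetical; it mirrors the regression test near the end of this diff). For a disk-based table the squashing threshold is min_insert_block_size_rows; for a Memory table, which does not prefer large blocks, max_block_size is used instead:

SET min_insert_block_size_rows = 1048576, min_insert_block_size_bytes = 0;
CREATE TABLE squash_demo (n UInt64) ENGINE = StripeLog;
INSERT INTO squash_demo SELECT number FROM numbers_mt(1000000);
SELECT DISTINCT blockSize() FROM squash_demo; -- expect one large stored block rather than many 64K blocks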
......@@ -606,7 +606,7 @@ void InterpreterSystemQuery::flushDistributed(ASTSystemQuery &)
context.checkAccess(AccessType::SYSTEM_FLUSH_DISTRIBUTED, table_id);
if (auto * storage_distributed = dynamic_cast<StorageDistributed *>(DatabaseCatalog::instance().getTable(table_id, context).get()))
storage_distributed->flushClusterNodesAllData();
storage_distributed->flushClusterNodesAllData(context);
else
throw Exception("Table " + table_id.getNameForLogs() + " is not distributed", ErrorCodes::BAD_ARGUMENTS);
}
......
......@@ -23,7 +23,7 @@ void RewriteCountVariantsVisitor::visit(ASTPtr & node)
void RewriteCountVariantsVisitor::visit(ASTFunction & func)
{
if (func.arguments->children.empty() || func.arguments->children.size() > 1 || !func.arguments->children[0])
if (!func.arguments || func.arguments->children.empty() || func.arguments->children.size() > 1 || !func.arguments->children[0])
return;
auto name = Poco::toLower(func.name);
......
......@@ -7,75 +7,85 @@
#include <Interpreters/inplaceBlockConversions.h>
#include <Core/Block.h>
#include <Storages/ColumnsDescription.h>
#include <Interpreters/ExpressionActions.h>
#include <Functions/IFunctionAdaptors.h>
#include <Functions/materialize.h>
namespace DB
{
Block addMissingDefaults(
const Block & block,
ActionsDAGPtr addMissingDefaults(
const Block & header,
const NamesAndTypesList & required_columns,
const ColumnsDescription & columns,
const Context & context)
{
/// For missing columns of a nested structure, we need to create not a column of empty arrays but a column of arrays of the correct lengths.
/// First, remember the offset columns for all arrays in the block.
std::map<String, ColumnPtr> offset_columns;
std::map<String, Names> nested_groups;
for (size_t i = 0, size = block.columns(); i < size; ++i)
for (size_t i = 0, size = header.columns(); i < size; ++i)
{
const auto & elem = block.getByPosition(i);
const auto & elem = header.getByPosition(i);
if (const ColumnArray * array = typeid_cast<const ColumnArray *>(&*elem.column))
if (typeid_cast<const ColumnArray *>(&*elem.column))
{
String offsets_name = Nested::extractTableName(elem.name);
auto & offsets_column = offset_columns[offsets_name];
/// If for some reason there are different offset columns for one nested structure, then we take a non-empty one.
if (!offsets_column || offsets_column->empty())
offsets_column = array->getOffsetsPtr();
auto & group = nested_groups[offsets_name];
if (group.empty())
group.push_back({});
group.push_back(elem.name);
}
}
const size_t rows = block.rows();
Block res;
auto actions = std::make_shared<ActionsDAG>(header.getColumnsWithTypeAndName());
FunctionOverloadResolverPtr func_builder_replicate = FunctionFactory::instance().get("replicate", context);
/// We take the given columns from the input block, plus the missing columns without a default value
/// (default and materialized columns will be computed later).
for (const auto & column : required_columns)
{
if (block.has(column.name))
{
res.insert(block.getByName(column.name));
if (header.has(column.name))
continue;
}
if (columns.hasDefault(column.name))
continue;
String offsets_name = Nested::extractTableName(column.name);
if (offset_columns.count(offsets_name))
if (nested_groups.count(offsets_name))
{
ColumnPtr offsets_column = offset_columns[offsets_name];
DataTypePtr nested_type = typeid_cast<const DataTypeArray &>(*column.type).getNestedType();
UInt64 nested_rows = rows ? get<UInt64>((*offsets_column)[rows - 1]) : 0;
ColumnPtr nested_column = nested_type->createColumnConstWithDefaultValue(0);
const auto & constant = actions->addColumn({std::move(nested_column), nested_type, column.name}, true);
auto & group = nested_groups[offsets_name];
group[0] = constant.result_name;
actions->addFunction(func_builder_replicate, group, constant.result_name, context, true);
ColumnPtr nested_column = nested_type->createColumnConstWithDefaultValue(nested_rows)->convertToFullColumnIfConst();
auto new_column = ColumnArray::create(nested_column, offsets_column);
res.insert(ColumnWithTypeAndName(std::move(new_column), column.type, column.name));
continue;
}
/** It is necessary to turn a constant column into a full column, since in some of the blocks (from other parts),
* it can be full (or the interpreter may decide that it is constant everywhere).
*/
auto new_column = column.type->createColumnConstWithDefaultValue(rows)->convertToFullColumnIfConst();
res.insert(ColumnWithTypeAndName(std::move(new_column), column.type, column.name));
auto new_column = column.type->createColumnConstWithDefaultValue(0);
actions->addColumn({std::move(new_column), column.type, column.name}, true, true);
}
/// Computes the values explicitly specified by DEFAULT and MATERIALIZED column expressions.
evaluateMissingDefaults(res, required_columns, columns, context);
return res;
if (auto dag = evaluateMissingDefaults(actions->getResultColumns(), required_columns, columns, context))
actions = ActionsDAG::merge(std::move(*actions), std::move(*dag));
else
/// Removes unused columns and reorders result.
/// The same is done in evaluateMissingDefaults if a non-empty DAG is returned.
actions->removeUnusedActions(required_columns.getNames());
return actions;
}
}
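The user-visible behavior these actions implement, as a short sketch (the table and values are illustrative):

CREATE TABLE defaults_demo
(
    x UInt32,
    y UInt32 DEFAULT x * 2,
    s String
) ENGINE = Memory;

INSERT INTO defaults_demo (x) VALUES (21);
-- y is computed from its DEFAULT expression, s gets the type's default value,
-- and both arrive as full (non-constant) columns.
SELECT x, y, s FROM defaults_demo; -- 21, 42, ''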
......@@ -2,6 +2,7 @@
#include <unordered_map>
#include <string>
#include <memory>
namespace DB
......@@ -12,14 +13,17 @@ class Context;
class NamesAndTypesList;
class ColumnsDescription;
class ActionsDAG;
using ActionsDAGPtr = std::shared_ptr<ActionsDAG>;
/** Adds three types of columns into a block:
* 1. Columns that are missing from the request but present in the table without defaults (missing columns)
* 2. Columns that are missing from the request but present in the table with defaults (columns with default values)
* 3. Columns that are materialized from other columns (materialized columns)
* All three types of columns are materialized (not constants).
*/
Block addMissingDefaults(
const Block & block,
ActionsDAGPtr addMissingDefaults(
const Block & header,
const NamesAndTypesList & required_columns,
const ColumnsDescription & columns,
const Context & context);
......
......@@ -24,7 +24,7 @@ namespace
{
/// Add all required expressions for missing columns calculation
void addDefaultRequiredExpressionsRecursively(Block & block, const String & required_column, const ColumnsDescription & columns, ASTPtr default_expr_list_accum, NameSet & added_columns)
void addDefaultRequiredExpressionsRecursively(const Block & block, const String & required_column, const ColumnsDescription & columns, ASTPtr default_expr_list_accum, NameSet & added_columns)
{
checkStackSize();
if (block.has(required_column) || added_columns.count(required_column))
......@@ -52,7 +52,7 @@ void addDefaultRequiredExpressionsRecursively(Block & block, const String & requ
}
}
ASTPtr defaultRequiredExpressions(Block & block, const NamesAndTypesList & required_columns, const ColumnsDescription & columns)
ASTPtr defaultRequiredExpressions(const Block & block, const NamesAndTypesList & required_columns, const ColumnsDescription & columns)
{
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
......@@ -87,67 +87,29 @@ ASTPtr convertRequiredExpressions(Block & block, const NamesAndTypesList & requi
return conversion_expr_list;
}
void executeExpressionsOnBlock(
Block & block,
ActionsDAGPtr createExpressions(
const Block & header,
ASTPtr expr_list,
bool save_unneeded_columns,
const NamesAndTypesList & required_columns,
const Context & context)
{
if (!expr_list)
return;
if (!save_unneeded_columns)
{
auto syntax_result = TreeRewriter(context).analyze(expr_list, block.getNamesAndTypesList());
ExpressionAnalyzer{expr_list, syntax_result, context}.getActions(true)->execute(block);
return;
}
/** ExpressionAnalyzer eliminates "unused" columns, in order to ensure their safety
* we are going to operate on a copy instead of the original block */
Block copy_block{block};
return nullptr;
auto syntax_result = TreeRewriter(context).analyze(expr_list, block.getNamesAndTypesList());
auto syntax_result = TreeRewriter(context).analyze(expr_list, header.getNamesAndTypesList());
auto expression_analyzer = ExpressionAnalyzer{expr_list, syntax_result, context};
auto required_source_columns = syntax_result->requiredSourceColumns();
auto rows_was = copy_block.rows();
// Delete all not needed columns in DEFAULT expression.
// They can intersect with columns added in PREWHERE
// test 00950_default_prewhere
// CLICKHOUSE-4523
for (const auto & delete_column : copy_block.getNamesAndTypesList())
{
if (std::find(required_source_columns.begin(), required_source_columns.end(), delete_column.name) == required_source_columns.end())
{
copy_block.erase(delete_column.name);
}
}
auto dag = std::make_shared<ActionsDAG>(header.getNamesAndTypesList());
auto actions = expression_analyzer.getActionsDAG(true, !save_unneeded_columns);
dag = ActionsDAG::merge(std::move(*dag), std::move(*actions));
if (copy_block.columns() == 0)
if (save_unneeded_columns)
{
// Add column to indicate block size in execute()
copy_block.insert({DataTypeUInt8().createColumnConst(rows_was, 0u), std::make_shared<DataTypeUInt8>(), "__dummy"});
dag->removeUnusedActions(required_columns.getNames());
dag->addMaterializingOutputActions();
}
expression_analyzer.getActions(true)->execute(copy_block);
/// move evaluated columns to the original block, materializing them at the same time
size_t pos = 0;
for (auto col = required_columns.begin(); col != required_columns.end(); ++col, ++pos)
{
if (copy_block.has(col->name))
{
auto evaluated_col = copy_block.getByName(col->name);
evaluated_col.column = evaluated_col.column->convertToFullColumnIfConst();
if (block.has(col->name))
block.getByName(col->name) = std::move(evaluated_col);
else
block.insert(pos, std::move(evaluated_col));
}
}
return dag;
}
}
......@@ -157,19 +119,25 @@ void performRequiredConversions(Block & block, const NamesAndTypesList & require
ASTPtr conversion_expr_list = convertRequiredExpressions(block, required_columns);
if (conversion_expr_list->children.empty())
return;
executeExpressionsOnBlock(block, conversion_expr_list, true, required_columns, context);
if (auto dag = createExpressions(block, conversion_expr_list, true, required_columns, context))
{
auto expression = std::make_shared<ExpressionActions>(std::move(dag));
expression->execute(block);
}
}
void evaluateMissingDefaults(Block & block,
ActionsDAGPtr evaluateMissingDefaults(
const Block & header,
const NamesAndTypesList & required_columns,
const ColumnsDescription & columns,
const Context & context, bool save_unneeded_columns)
{
if (!columns.hasDefaults())
return;
return nullptr;
ASTPtr default_expr_list = defaultRequiredExpressions(block, required_columns, columns);
executeExpressionsOnBlock(block, default_expr_list, save_unneeded_columns, required_columns, context);
ASTPtr expr_list = defaultRequiredExpressions(header, required_columns, columns);
return createExpressions(header, expr_list, save_unneeded_columns, required_columns, context);
}
}
......@@ -2,6 +2,7 @@
#include <unordered_map>
#include <string>
#include <memory>
namespace DB
......@@ -12,9 +13,13 @@ class Context;
class NamesAndTypesList;
class ColumnsDescription;
/// Adds missing defaults to block according to required_columns
/// using columns description
void evaluateMissingDefaults(Block & block,
class ActionsDAG;
using ActionsDAGPtr = std::shared_ptr<ActionsDAG>;
/// Creates actions which add missing defaults to a block according to required_columns, using the columns description.
/// Returns nullptr if no actions are required.
ActionsDAGPtr evaluateMissingDefaults(
const Block & header,
const NamesAndTypesList & required_columns,
const ColumnsDescription & columns,
const Context & context, bool save_unneeded_columns = true);
......
#include <Processors/QueryPlan/AddingMissedStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/AddingMissedTransform.h>
#include <IO/Operators.h>
namespace DB
{
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false, /// TODO: check if true later.
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = true,
}
};
}
AddingMissedStep::AddingMissedStep(
const DataStream & input_stream_,
Block result_header_,
ColumnsDescription columns_,
const Context & context_)
: ITransformingStep(input_stream_, result_header_, getTraits())
, columns(std::move(columns_))
, context(context_)
{
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
}
void AddingMissedStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AddingMissedTransform>(header, output_stream->header, columns, context);
});
}
}
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Storages/ColumnsDescription.h>
namespace DB
{
/// Convert one block structure to another. See ConvertingTransform.
class AddingMissedStep : public ITransformingStep
{
public:
AddingMissedStep(const DataStream & input_stream_,
Block result_header_,
ColumnsDescription columns_,
const Context & context_);
String getName() const override { return "AddingMissed"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
ColumnsDescription columns;
const Context & context;
};
}
#include <Processors/Transforms/AddingMissedTransform.h>
#include <Interpreters/addMissingDefaults.h>
namespace DB
{
AddingMissedTransform::AddingMissedTransform(
Block header_,
Block result_header_,
const ColumnsDescription & columns_,
const Context & context_)
: ISimpleTransform(std::move(header_), std::move(result_header_), false)
, columns(columns_), context(context_)
{
}
void AddingMissedTransform::transform(Chunk & chunk)
{
auto num_rows = chunk.getNumRows();
Block src = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
auto res = addMissingDefaults(src, getOutputPort().getHeader().getNamesAndTypesList(), columns, context);
chunk.setColumns(res.getColumns(), num_rows);
}
}
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Storages/ColumnsDescription.h>
namespace DB
{
/** This stream adds three types of columns into a block:
* 1. Columns that are missing from the request but present in the table without defaults (missing columns)
* 2. Columns that are missing from the request but present in the table with defaults (columns with default values)
* 3. Columns that are materialized from other columns (materialized columns)
* All three types of columns are materialized (not constants).
*/
class AddingMissedTransform : public ISimpleTransform
{
public:
AddingMissedTransform(
Block header_,
Block result_header_,
const ColumnsDescription & columns_,
const Context & context_);
String getName() const override { return "AddingMissed"; }
private:
void transform(Chunk &) override;
const ColumnsDescription columns;
const Context & context;
};
}
......@@ -93,7 +93,6 @@ SRCS(
Port.cpp
QueryPipeline.cpp
QueryPlan/AddingDelayedSourceStep.cpp
QueryPlan/AddingMissedStep.cpp
QueryPlan/AggregatingStep.cpp
QueryPlan/ArrayJoinStep.cpp
QueryPlan/CreatingSetsStep.cpp
......@@ -137,7 +136,6 @@ SRCS(
Sources/SinkToOutputStream.cpp
Sources/SourceFromInputStream.cpp
Sources/SourceWithProgress.cpp
Transforms/AddingMissedTransform.cpp
Transforms/AddingSelectorTransform.cpp
Transforms/AggregatingInOrderTransform.cpp
Transforms/AggregatingTransform.cpp
......
......@@ -800,7 +800,6 @@ bool DynamicQueryHandler::customizeQueryParam(Context & context, const std::stri
std::string DynamicQueryHandler::getQuery(Poco::Net::HTTPServerRequest & request, HTMLForm & params, Context & context)
{
if (likely(!startsWith(request.getContentType(), "multipart/form-data")))
{
/// Part of the query can be passed in the 'query' parameter and the rest in the request body
......
......@@ -22,7 +22,6 @@
#include <Interpreters/TablesStatus.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Storages/StorageMemory.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Core/ExternalTable.h>
......@@ -1181,33 +1180,44 @@ bool TCPHandler::receiveData(bool scalar)
if (block)
{
if (scalar)
{
/// Scalar value
query_context->addScalar(temporary_id.table_name, block);
else
}
else if (!state.need_receive_data_for_insert && !state.need_receive_data_for_input)
{
/// If there is an insert request, then the data should be written directly to `state.io.out`.
/// Otherwise, we write the blocks in the temporary `external_table_name` table.
if (!state.need_receive_data_for_insert && !state.need_receive_data_for_input)
/// Data for external tables
auto resolved = query_context->tryResolveStorageID(temporary_id, Context::ResolveExternal);
StoragePtr storage;
/// If such a table does not exist, create it.
if (resolved)
{
auto resolved = query_context->tryResolveStorageID(temporary_id, Context::ResolveExternal);
StoragePtr storage;
/// If such a table does not exist, create it.
if (resolved)
storage = DatabaseCatalog::instance().getTable(resolved, *query_context);
else
{
NamesAndTypesList columns = block.getNamesAndTypesList();
auto temporary_table = TemporaryTableHolder(*query_context, ColumnsDescription{columns}, {});
storage = temporary_table.getTable();
query_context->addExternalTable(temporary_id.table_name, std::move(temporary_table));
}
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
/// The data will be written directly to the table.
state.io.out = storage->write(ASTPtr(), metadata_snapshot, *query_context);
storage = DatabaseCatalog::instance().getTable(resolved, *query_context);
}
if (state.need_receive_data_for_input)
state.block_for_input = block;
else
state.io.out->write(block);
{
NamesAndTypesList columns = block.getNamesAndTypesList();
auto temporary_table = TemporaryTableHolder(*query_context, ColumnsDescription{columns}, {});
storage = temporary_table.getTable();
query_context->addExternalTable(temporary_id.table_name, std::move(temporary_table));
}
auto metadata_snapshot = storage->getInMemoryMetadataPtr();
/// The data will be written directly to the table.
auto temporary_table_out = storage->write(ASTPtr(), metadata_snapshot, *query_context);
temporary_table_out->write(block);
temporary_table_out->writeSuffix();
}
else if (state.need_receive_data_for_input)
{
/// 'input' table function.
state.block_for_input = block;
}
else
{
/// INSERT query.
state.io.out->write(block);
}
return true;
}
......
......@@ -131,6 +131,10 @@ public:
/// Returns true if the storage supports reading of subcolumns of complex types.
virtual bool supportsSubcolumns() const { return false; }
/// Prefers that small blocks be squashed into large ones for optimal storage.
/// This is true for most storages that store data on disk.
virtual bool prefersLargeBlocks() const { return true; }
/// Optional size information of each physical column.
/// Currently it's only used by the MergeTree family for query optimizations.
......
......@@ -186,7 +186,13 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
additional_columns.insert({res_columns[pos], name_and_type->type, name_and_type->name});
}
DB::evaluateMissingDefaults(additional_columns, columns, metadata_snapshot->getColumns(), storage.global_context);
auto dag = DB::evaluateMissingDefaults(
additional_columns, columns, metadata_snapshot->getColumns(), storage.global_context);
if (dag)
{
auto actions = std::make_shared<ExpressionActions>(std::move(dag));
actions->execute(additional_columns);
}
/// Move columns from block.
name_and_type = columns.begin();
......
......@@ -1490,16 +1490,31 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, const S
getPartitionIDFromQuery(command.partition, global_context);
}
/// Some type changes for the version column are allowed even though it is part of the sorting key
if (command.type == AlterCommand::MODIFY_COLUMN && command.column_name == merging_params.version_column)
if (command.column_name == merging_params.version_column)
{
const IDataType * new_type = command.data_type.get();
const IDataType * old_type = old_types[command.column_name];
/// Some type changes for the version column are allowed even though it is part of the sorting key
if (command.type == AlterCommand::MODIFY_COLUMN)
{
const IDataType * new_type = command.data_type.get();
const IDataType * old_type = old_types[command.column_name];
checkVersionColumnTypesConversion(old_type, new_type, command.column_name);
checkVersionColumnTypesConversion(old_type, new_type, command.column_name);
/// No other checks required
continue;
/// No other checks required
continue;
}
else if (command.type == AlterCommand::DROP_COLUMN)
{
throw Exception(
"Trying to ALTER DROP version " + backQuoteIfNeed(command.column_name) + " column",
ErrorCodes::ALTER_OF_COLUMN_IS_FORBIDDEN);
}
else if (command.type == AlterCommand::RENAME_COLUMN)
{
throw Exception(
"Trying to ALTER RENAME version " + backQuoteIfNeed(command.column_name) + " column",
ErrorCodes::ALTER_OF_COLUMN_IS_FORBIDDEN);
}
}
if (command.type == AlterCommand::MODIFY_ORDER_BY && !is_custom_partitioned)
......
......@@ -140,7 +140,7 @@ void MergeTreeDataPartWriterWide::shiftCurrentMark(const Granules & granules_wri
/// If we didn't finish the last granule, then we will continue writing it from the new block
if (!last_granule.is_complete)
{
if (settings.blocks_are_granules_size)
if (settings.can_use_adaptive_granularity && settings.blocks_are_granules_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Incomplete granules are not allowed while blocks are granules size. "
"Mark number {} (rows {}), rows written in last mark {}, rows to write in last mark from block {} (from row {}), total marks currently {}",
last_granule.mark_number, index_granularity.getMarkRows(last_granule.mark_number), rows_written_in_last_mark,
......@@ -506,7 +506,7 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch
WrittenOffsetColumns offset_columns;
if (rows_written_in_last_mark > 0)
{
if (settings.blocks_are_granules_size)
if (settings.can_use_adaptive_granularity && settings.blocks_are_granules_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Incomplete granule is not allowed while blocks are granules size even for last granule. "
"Mark number {} (rows {}), rows written for last mark {}, total marks {}",
getCurrentMark(), index_granularity.getMarkRows(getCurrentMark()), rows_written_in_last_mark, index_granularity.getMarksCount());
......
......@@ -4,7 +4,7 @@
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/castColumn.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Processors/QueryPlan/AddingMissedStep.h>
#include <Interpreters/addMissingDefaults.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/StorageBuffer.h>
#include <Storages/StorageFactory.h>
......@@ -246,10 +246,15 @@ void StorageBuffer::read(
if (query_plan.isInitialized())
{
auto adding_missed = std::make_unique<AddingMissedStep>(
auto actions = addMissingDefaults(
query_plan.getCurrentDataStream().header,
header_after_adding_defaults.getNamesAndTypesList(),
metadata_snapshot->getColumns(),
context);
auto adding_missed = std::make_unique<ExpressionStep>(
query_plan.getCurrentDataStream(),
header_after_adding_defaults,
metadata_snapshot->getColumns(), context);
std::move(actions));
adding_missed->setStepDescription("Add columns missing in destination table");
query_plan.addStep(std::move(adding_missed));
......
......@@ -681,7 +681,7 @@ void StorageDistributed::truncate(const ASTPtr &, const StorageMetadataPtr &, co
for (auto it = cluster_nodes_data.begin(); it != cluster_nodes_data.end();)
{
it->second.shutdownAndDropAllData();
it->second.directory_monitor->shutdownAndDropAllData();
it = cluster_nodes_data.erase(it);
}
......@@ -799,16 +799,6 @@ ClusterPtr StorageDistributed::getOptimizedCluster(const Context & context, cons
return cluster;
}
void StorageDistributed::ClusterNodeData::flushAllData() const
{
directory_monitor->flushAllData();
}
void StorageDistributed::ClusterNodeData::shutdownAndDropAllData() const
{
directory_monitor->shutdownAndDropAllData();
}
IColumn::Selector StorageDistributed::createSelector(const ClusterPtr cluster, const ColumnWithTypeAndName & result)
{
const auto & slot_to_shard = cluster->getSlotToShard();
......@@ -892,13 +882,24 @@ ActionLock StorageDistributed::getActionLock(StorageActionBlockType type)
return {};
}
void StorageDistributed::flushClusterNodesAllData()
void StorageDistributed::flushClusterNodesAllData(const Context & context)
{
std::lock_guard lock(cluster_nodes_mutex);
/// Sync SYSTEM FLUSH DISTRIBUTED with TRUNCATE
auto table_lock = lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
std::vector<std::shared_ptr<StorageDistributedDirectoryMonitor>> directory_monitors;
{
std::lock_guard lock(cluster_nodes_mutex);
directory_monitors.reserve(cluster_nodes_data.size());
for (auto & node : cluster_nodes_data)
directory_monitors.push_back(node.second.directory_monitor);
}
/// TODO: Maybe it should be executed in parallel
for (auto & node : cluster_nodes_data)
node.second.flushAllData();
for (auto & node : directory_monitors)
node->flushAllData();
}
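A sketch of the interaction this change makes safe (the table name is hypothetical):

-- Session 1: push all pending blocks of a Distributed table to the shards.
SYSTEM FLUSH DISTRIBUTED dist_demo;
-- Session 2: TRUNCATE takes an exclusive lock, so it can no longer race with
-- the flush, which now holds a share lock for its whole duration.
TRUNCATE TABLE dist_demo;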
void StorageDistributed::rename(const String & new_path_to_table_data, const StorageID & new_table_id)
......
......@@ -114,7 +114,7 @@ public:
/// (note that monitors are created lazily, i.e. not until at least one INSERT has been executed)
std::vector<StorageDistributedDirectoryMonitor::Status> getDirectoryMonitorsStatuses() const;
void flushClusterNodesAllData();
void flushClusterNodesAllData(const Context & context);
ClusterPtr getCluster() const;
......@@ -200,11 +200,8 @@ protected:
struct ClusterNodeData
{
std::unique_ptr<StorageDistributedDirectoryMonitor> directory_monitor;
std::shared_ptr<StorageDistributedDirectoryMonitor> directory_monitor;
ConnectionPoolPtr connection_pool;
void flushAllData() const;
void shutdownAndDropAllData() const;
};
std::unordered_map<std::string, ClusterNodeData> cluster_nodes_data;
mutable std::mutex cluster_nodes_mutex;
......
......@@ -104,33 +104,46 @@ private:
class MemoryBlockOutputStream : public IBlockOutputStream
{
public:
explicit MemoryBlockOutputStream(
MemoryBlockOutputStream(
StorageMemory & storage_,
const StorageMetadataPtr & metadata_snapshot_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
{}
{
}
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override
{
const auto size_bytes_diff = block.allocatedBytes();
const auto size_rows_diff = block.rows();
metadata_snapshot->check(block, true);
{
std::lock_guard lock(storage.mutex);
auto new_data = std::make_unique<Blocks>(*(storage.data.get()));
new_data->push_back(block);
storage.data.set(std::move(new_data));
new_blocks.emplace_back(block);
}
void writeSuffix() override
{
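/// Commit all buffered blocks at once, so concurrent readers never observe a partially written INSERT.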
size_t inserted_bytes = 0;
size_t inserted_rows = 0;
storage.total_size_bytes.fetch_add(size_bytes_diff, std::memory_order_relaxed);
storage.total_size_rows.fetch_add(size_rows_diff, std::memory_order_relaxed);
for (const auto & block : new_blocks)
{
inserted_bytes += block.allocatedBytes();
inserted_rows += block.rows();
}
std::lock_guard lock(storage.mutex);
auto new_data = std::make_unique<Blocks>(*(storage.data.get()));
new_data->insert(new_data->end(), new_blocks.begin(), new_blocks.end());
storage.data.set(std::move(new_data));
storage.total_size_bytes.fetch_add(inserted_bytes, std::memory_order_relaxed);
storage.total_size_rows.fetch_add(inserted_rows, std::memory_order_relaxed);
}
private:
Blocks new_blocks;
StorageMemory & storage;
StorageMetadataPtr metadata_snapshot;
};
......
......@@ -40,9 +40,11 @@ public:
unsigned num_streams) override;
bool supportsParallelInsert() const override { return true; }
bool supportsSubcolumns() const override { return true; }
/// Smaller blocks (e.g. 64K rows) are better for CPU cache.
bool prefersLargeBlocks() const override { return false; }
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & metadata_snapshot, const Context & context) override;
void drop() override;
......
......@@ -11,7 +11,7 @@
<query>SELECT min(d32), max(d32), argMin(x, d32), argMax(x, d32) FROM t</query>
<query>SELECT min(d64), max(d64), argMin(x, d64), argMax(x, d64) FROM t</query>
<query>SELECT min(d128), max(d128), argMin(x, d128), argMax(x, d128) FROM t</query>
<query>SELECT avg(d32), sum(d32), sumWithOverflow(d32) FROM t</query>
<query>SELECT avg(d64), sum(d64), sumWithOverflow(d64) FROM t</query>
<query>SELECT avg(d128), sum(d128), sumWithOverflow(d128) FROM t</query>
......@@ -19,11 +19,11 @@
<query>SELECT uniq(d32), uniqCombined(d32), uniqExact(d32), uniqHLL12(d32) FROM (SELECT * FROM t LIMIT 10000000)</query>
<query>SELECT uniq(d64), uniqCombined(d64), uniqExact(d64), uniqHLL12(d64) FROM (SELECT * FROM t LIMIT 10000000)</query>
<query>SELECT uniq(d128), uniqCombined(d128), uniqExact(d128), uniqHLL12(d128) FROM (SELECT * FROM t LIMIT 1000000)</query>
<query>SELECT median(d32), medianExact(d32), medianExactWeighted(d32, 2) FROM (SELECT * FROM t LIMIT 10000000)</query>
<query>SELECT median(d64), medianExact(d64), medianExactWeighted(d64, 2) FROM (SELECT * FROM t LIMIT 1000000)</query>
<query>SELECT median(d128), medianExact(d128), medianExactWeighted(d128, 2) FROM (SELECT * FROM t LIMIT 1000000)</query>
<query>SELECT quantile(d32), quantileExact(d32), quantileExactWeighted(d32, 2) FROM (SELECT * FROM t LIMIT 10000000)</query>
<query>SELECT quantile(d64), quantileExact(d64), quantileExactWeighted(d64, 2) FROM (SELECT * FROM t LIMIT 1000000)</query>
<query>SELECT quantile(d128), quantileExact(d128), quantileExactWeighted(d128, 2) FROM (SELECT * FROM t LIMIT 1000000)</query>
......@@ -31,8 +31,8 @@
<query>SELECT quantilesExact(0.1, 0.9)(d32), quantilesExactWeighted(0.1, 0.9)(d32, 2) FROM (SELECT * FROM t LIMIT 10000000)</query>
<query>SELECT quantilesExact(0.1, 0.9)(d64), quantilesExactWeighted(0.1, 0.9)(d64, 2) FROM (SELECT * FROM t LIMIT 1000000)</query>
<query>SELECT quantilesExact(0.1, 0.9)(d128), quantilesExactWeighted(0.1, 0.9)(d128, 2) FROM (SELECT * FROM t LIMIT 1000000)</query>
<query>SELECT varPop(d32), varSamp(d32), stddevPop(d32) FROM t</query>
<query>SELECT varPop(d64), varSamp(d64), stddevPop(d64) FROM (SELECT * FROM t LIMIT 1000000)</query>
<query>SELECT varPop(d128), varSamp(d128), stddevPop(d128) FROM (SELECT * FROM t LIMIT 1000000)</query>
<query>SELECT varPop(d64), varSamp(d64), stddevPop(d64) FROM (SELECT * FROM t LIMIT 10000000)</query>
<query>SELECT varPop(d128), varSamp(d128), stddevPop(d128) FROM (SELECT * FROM t LIMIT 10000000)</query>
</test>
<test>
<create_query>CREATE TABLE test_memory (x UInt64) ENGINE Memory</create_query>
<fill_query>INSERT INTO test_memory SELECT 1 FROM numbers(1000000000)</fill_query>
<query>SELECT sum(x * x + x) FROM test_memory</query>
<drop_query>DROP TABLE IF EXISTS test_memory</drop_query>
</test>
DROP TABLE IF EXISTS numbers_squashed;
CREATE TABLE numbers_squashed (number UInt8) ENGINE = Memory;
CREATE TABLE numbers_squashed (number UInt8) ENGINE = StripeLog;
SET min_insert_block_size_rows = 100;
SET min_insert_block_size_bytes = 0;
......
SET max_insert_threads = 1, max_threads = 100, min_insert_block_size_rows = 1048576, max_block_size = 65536;
CREATE TEMPORARY TABLE t (x UInt64);
DROP TABLE IF EXISTS t;
CREATE TABLE t (x UInt64) ENGINE = StripeLog;
-- For trivial INSERT SELECT, max_threads is lowered to max_insert_threads and max_block_size is changed to min_insert_block_size_rows.
INSERT INTO t SELECT * FROM numbers_mt(1000000);
SET max_threads = 1;
-- If data was inserted by more threads, we will probably see data out of order.
SELECT DISTINCT blockSize(), runningDifference(x) FROM t;
DROP TABLE t;
DROP TABLE IF EXISTS old_school_table;
CREATE TABLE old_school_table
(
key UInt64,
value String
)
ENGINE = MergeTree()
ORDER BY key
SETTINGS index_granularity_bytes = 0, enable_mixed_granularity_parts = 0, min_bytes_for_wide_part = 0,
vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 1;
INSERT INTO old_school_table VALUES (1, '1');
INSERT INTO old_school_table VALUES (2, '2');
OPTIMIZE TABLE old_school_table FINAL;
SELECT * FROM old_school_table ORDER BY key;
OPTIMIZE TABLE old_school_table FINAL; -- just to be sure
SELECT * FROM old_school_table ORDER BY key;
ALTER TABLE old_school_table MODIFY SETTING vertical_merge_algorithm_min_rows_to_activate = 10000, vertical_merge_algorithm_min_columns_to_activate = 10000;
OPTIMIZE TABLE old_school_table FINAL; -- and horizontal merge
SELECT * FROM old_school_table ORDER BY key;
DROP TABLE IF EXISTS old_school_table;
DROP TABLE IF EXISTS alter_drop_version;
CREATE TABLE alter_drop_version
(
`key` UInt64,
`value` String,
`ver` Int8
)
ENGINE = ReplacingMergeTree(ver)
ORDER BY key;
INSERT INTO alter_drop_version VALUES (1, '1', 1);
ALTER TABLE alter_drop_version DROP COLUMN ver; --{serverError 524}
ALTER TABLE alter_drop_version RENAME COLUMN ver TO rev; --{serverError 524}
DETACH TABLE alter_drop_version;
ATTACH TABLE alter_drop_version;
SELECT * FROM alter_drop_version;
DROP TABLE IF EXISTS alter_drop_version;