Commit 66f099cf authored by: Alexey Milovidov

Merge branch 'master' of github.com:yandex/ClickHouse

......@@ -450,7 +450,7 @@ void DatabaseOrdinary::alterTable(
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults,
const ASTModifier & engine_modifier)
const ASTModifier & storage_modifier)
{
/// Read the definition of the table and replace the necessary parts with new ones.
......@@ -471,14 +471,10 @@ void DatabaseOrdinary::alterTable(
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
ASTPtr new_columns = InterpreterCreateQuery::formatColumns(columns, materialized_columns, alias_columns, column_defaults);
auto it = std::find(ast_create_query.children.begin(), ast_create_query.children.end(), ast_create_query.columns);
if (it == ast_create_query.children.end())
throw Exception("Logical error: cannot find columns child in ASTCreateQuery", ErrorCodes::LOGICAL_ERROR);
*it = new_columns;
ast_create_query.columns = new_columns;
if (engine_modifier)
engine_modifier(ast_create_query.storage);
ast_create_query.replace(ast_create_query.columns, new_columns);
if (storage_modifier)
storage_modifier(*ast_create_query.storage);
statement = getTableDefinitionFromCreateQuery(ast);
......
......@@ -27,7 +27,7 @@ String getTableDefinitionFromCreateQuery(const ASTPtr & query)
create.if_not_exists = false;
create.is_populate = false;
String engine = typeid_cast<ASTFunction &>(*create.storage).name;
String engine = create.storage->engine->name;
/// For the View engine it is necessary to save the SELECT query itself; for other engines it is the opposite
if (engine != "View" && engine != "MaterializedView")
......@@ -59,7 +59,7 @@ std::pair<String, StoragePtr> createTableFromDefinition(
/// - the database has not been created yet;
/// - the code is simpler, since the query is already brought to a suitable form.
InterpreterCreateQuery::ColumnsInfo columns_info = InterpreterCreateQuery::getColumnsInfo(ast_create_query.columns, context);
InterpreterCreateQuery::ColumnsInfo columns_info = InterpreterCreateQuery::getColumnsInfo(*ast_create_query.columns, context);
String storage_name;
......@@ -68,14 +68,14 @@ std::pair<String, StoragePtr> createTableFromDefinition(
else if (ast_create_query.is_materialized_view)
storage_name = "MaterializedView";
else
storage_name = typeid_cast<ASTFunction &>(*ast_create_query.storage).name;
storage_name = ast_create_query.storage->engine->name;
return
{
ast_create_query.table,
StorageFactory::instance().get(
storage_name, database_data_path, ast_create_query.table, database_name, context,
context.getGlobalContext(), ast, columns_info.columns,
context.getGlobalContext(), ast_create_query, columns_info.columns,
columns_info.materialized_columns, columns_info.alias_columns, columns_info.column_defaults,
true, has_force_restore_data_flag)
};
......
......@@ -107,7 +107,7 @@ public:
IDatabase & to_database,
const String & to_name) = 0;
using ASTModifier = std::function<void(ASTPtr &)>;
using ASTModifier = std::function<void(IAST &)>;
/// Change the table structure in metadata.
/// You must call it under the TableStructureLock of the corresponding table. If storage_modifier is empty, the storage definition is not changed.
......
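A hedged sketch (hypothetical caller code, not part of this commit) of a modifier written against the new std::function<void(IAST &)> signature; ASTStorage, ASTFunction::name and typeid_cast are taken from elsewhere in this diff, and the engine name chosen here is purely illustrative:
/// Hypothetical storage_modifier passed to alterTable(): it now receives the storage AST node by reference.
auto storage_modifier = [](IAST & storage_ast)
{
    auto & storage = typeid_cast<ASTStorage &>(storage_ast);
    if (storage.engine)
        storage.engine->name = "ReplacingMergeTree"; /// illustrative change only
};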
......@@ -79,23 +79,26 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
if (!create.storage)
{
database_engine_name = "Ordinary"; /// Default database engine.
auto func = std::make_shared<ASTFunction>();
func->name = database_engine_name;
create.storage = func;
auto engine = std::make_shared<ASTFunction>();
engine->name = database_engine_name;
auto storage = std::make_shared<ASTStorage>();
storage->set(storage->engine, engine);
create.set(create.storage, storage);
}
else
{
const ASTFunction & engine_id = typeid_cast<const ASTFunction &>(*create.storage);
const ASTStorage & storage = *create.storage;
const ASTFunction & engine = *storage.engine;
/// Currently, there are no database engines that support any arguments.
if (engine_id.arguments || engine_id.parameters)
if (engine.arguments || engine.parameters
|| storage.partition_by || storage.order_by || storage.sample_by || storage.settings)
{
std::stringstream ostr;
formatAST(*create.storage, ostr, 0, false, false);
formatAST(storage, ostr, 0, false, false);
throw Exception("Unknown database engine: " + ostr.str(), ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}
database_engine_name = engine_id.name;
database_engine_name = engine.name;
}
String database_name_escaped = escapeForFileName(database_name);
......@@ -160,11 +163,8 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
using ColumnsAndDefaults = std::pair<NamesAndTypesList, ColumnDefaults>;
/// Convert the AST into a list of columns with types. Columns of Nested type are expanded into a list of real columns.
static ColumnsAndDefaults parseColumns(
ASTPtr expression_list, const Context & context)
static ColumnsAndDefaults parseColumns(const ASTExpressionList & column_list_ast, const Context & context)
{
auto & column_list_ast = typeid_cast<ASTExpressionList &>(*expression_list);
/// list of table columns in correct order
NamesAndTypesList columns{};
ColumnDefaults defaults{};
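To make the Nested expansion concrete: a hypothetical column (not from this diff) declared as
    n Nested(a UInt8, b String)
is unfolded by parseColumns into the real columns
    n.a Array(UInt8)
    n.b Array(String)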
......@@ -177,7 +177,7 @@ static ColumnsAndDefaults parseColumns(
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
default_expr_list->children.reserve(column_list_ast.children.size());
for (auto & ast : column_list_ast.children)
for (const auto & ast : column_list_ast.children)
{
auto & col_decl = typeid_cast<ASTColumnDeclaration &>(*ast);
......@@ -346,7 +346,7 @@ ASTPtr InterpreterCreateQuery::formatColumns(NamesAndTypesList columns,
InterpreterCreateQuery::ColumnsInfo InterpreterCreateQuery::getColumnsInfo(
const ASTPtr & columns, const Context & context)
const ASTExpressionList & columns, const Context & context)
{
ColumnsInfo res;
......@@ -370,7 +370,7 @@ InterpreterCreateQuery::ColumnsInfo InterpreterCreateQuery::setColumns(
if (create.columns)
{
res = getColumnsInfo(create.columns, context);
res = getColumnsInfo(*create.columns, context);
}
else if (!create.as_table.empty())
{
......@@ -391,16 +391,9 @@ InterpreterCreateQuery::ColumnsInfo InterpreterCreateQuery::setColumns(
/// Even if the query has a list of columns, canonicalize it (unfold Nested columns).
ASTPtr new_columns = formatColumns(*res.columns, res.materialized_columns, res.alias_columns, res.column_defaults);
if (create.columns)
{
auto it = std::find(create.children.begin(), create.children.end(), create.columns);
if (it != create.children.end())
*it = new_columns;
else
create.children.push_back(new_columns);
}
create.replace(create.columns, new_columns);
else
create.children.push_back(new_columns);
create.columns = new_columns;
create.set(create.columns, new_columns);
/// Check for duplicates
std::set<String> all_columns;
......@@ -421,22 +414,23 @@ InterpreterCreateQuery::ColumnsInfo InterpreterCreateQuery::setColumns(
}
String InterpreterCreateQuery::setEngine(
ASTCreateQuery & create, const StoragePtr & as_storage) const
String InterpreterCreateQuery::setEngine(ASTCreateQuery & create, const StoragePtr & as_storage) const
{
String storage_name;
auto set_engine = [&](const char * engine)
{
storage_name = engine;
auto func = std::make_shared<ASTFunction>();
func->name = engine;
create.storage = func;
auto engine_ast = std::make_shared<ASTFunction>();
engine_ast->name = engine;
auto storage_ast = std::make_shared<ASTStorage>();
storage_ast->set(storage_ast->engine, engine_ast);
create.set(create.storage, storage_ast);
};
if (create.storage)
{
storage_name = typeid_cast<ASTFunction &>(*create.storage).name;
storage_name = create.storage->engine->name;
}
else if (!create.as_table.empty())
{
......@@ -445,17 +439,17 @@ String InterpreterCreateQuery::setEngine(
String as_database_name = create.as_database.empty() ? context.getCurrentDatabase() : create.as_database;
String as_table_name = create.as_table;
auto as_create_ptr = context.getCreateQuery(as_database_name, as_table_name);
auto & as_create = typeid_cast<const ASTCreateQuery &>(*as_create_ptr);
ASTPtr as_create_ptr = context.getCreateQuery(as_database_name, as_table_name);
const auto & as_create = typeid_cast<const ASTCreateQuery &>(*as_create_ptr);
if (!create.storage)
{
if (as_create.is_view || as_create.is_materialized_view)
create.storage = as_create.inner_storage;
create.set(create.storage, as_create.inner_storage->ptr());
else
create.storage = as_create.storage;
create.set(create.storage, as_create.storage->ptr());
storage_name = typeid_cast<const ASTFunction &>(*create.storage).name;
storage_name = create.storage->engine->name;
}
else
storage_name = as_storage->getName();
......@@ -494,7 +488,8 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
/// For `view` type tables, you may need `sample_block` to get the columns.
if (create.select && (!create.attach || (!create.columns && (create.is_view || create.is_materialized_view))))
{
interpreter_select = std::make_unique<InterpreterSelectQuery>(create.select, context);
create.select->setDatabaseIfNeeded(database_name);
interpreter_select = std::make_unique<InterpreterSelectQuery>(create.select->ptr(), context);
as_select_sample = interpreter_select->getSampleBlock();
}
......@@ -543,7 +538,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
res = StorageFactory::instance().get(
storage_name, data_path, table_name, database_name, context,
context.getGlobalContext(), query_ptr, columns.columns,
context.getGlobalContext(), create, columns.columns,
columns.materialized_columns, columns.alias_columns, columns.column_defaults, create.attach, false);
if (create.is_temporary)
......
......@@ -11,6 +11,7 @@ namespace DB
class Context;
class ASTCreateQuery;
class ASTExpressionList;
class IStorage;
using StoragePtr = std::shared_ptr<IStorage>;
......@@ -52,7 +53,7 @@ public:
};
/// Obtain information about columns, their types and default values, for the case when columns in the CREATE query are specified explicitly.
static ColumnsInfo getColumnsInfo(const ASTPtr & columns, const Context & context);
static ColumnsInfo getColumnsInfo(const ASTExpressionList & columns, const Context & context);
private:
BlockIO createDatabase(ASTCreateQuery & create);
......
......@@ -12,7 +12,7 @@ class ASTSetQuery;
using ASTPtr = std::shared_ptr<IAST>;
/** Change one or several settings, for session or globally, or just for current context.
/** Change one or several settings for the session or just for the current context.
*/
class InterpreterSetQuery : public IInterpreter
{
......@@ -20,7 +20,7 @@ public:
InterpreterSetQuery(const ASTPtr & query_ptr_, Context & context_)
: query_ptr(query_ptr_), context(context_) {}
/** Usual SET query. Set setting for session or globally (if GLOBAL was specified).
/** Usual SET query. Set setting for the session.
*/
BlockIO execute() override;
......@@ -34,8 +34,6 @@ public:
private:
ASTPtr query_ptr;
Context & context;
void executeImpl(const ASTSetQuery & ast, Context & target);
};
......
......@@ -298,13 +298,11 @@ struct Settings
/* Timeout for DDL query responses from all hosts in cluster. Negative value means infinite. */ \
M(SettingInt64, distributed_ddl_task_timeout, 120) \
\
/** If true, and the date parameter of MergeTree engines is an expression (not a column name), \
* it will be interpreted as the partitioning expression, allowing custom partitions. \
* IMPORTANT: Don't use this setting just yet. \
* It is for testing purposes, the syntax will likely change soon and the server will not be able \
* to load the tables created this way. You have been warned. \
/** If true, allow parameters of storage engines such as partitioning expression, primary key, etc. \
* to be set not in the engine parameters but as separate clauses (PARTITION BY, ORDER BY...) \
* Enable this setting to allow custom MergeTree partitions. \
*/ \
M(SettingBool, experimental_merge_tree_allow_custom_partitions, false) \
M(SettingBool, experimental_allow_extended_storage_definition_syntax, false) \
/* Timeout for flushing data from streaming storages. */ \
M(SettingMilliseconds, stream_flush_interval_ms, DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS) \
/* Schema identifier (used by schema-based formats) */ \
......
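For example (hypothetical table definition, not from this commit), with this setting enabled the MergeTree parameters can be written as separate clauses:
ENGINE = MergeTree PARTITION BY toYYYYMM(EventDate) ORDER BY (CounterID, EventDate, intHash32(UserID)) SAMPLE BY intHash32(UserID) SETTINGS index_granularity = 8192
instead of the classic positional form ENGINE = MergeTree(EventDate, intHash32(UserID), (CounterID, EventDate, intHash32(UserID)), 8192).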
......@@ -10,7 +10,7 @@
#include <Common/Stopwatch.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ASTRenameQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTInsertQuery.h>
......@@ -86,7 +86,7 @@ public:
Context & context_,
const String & database_name_,
const String & table_name_,
const String & engine_,
const String & storage_def_,
size_t flush_interval_milliseconds_);
~SystemLog();
......@@ -105,8 +105,8 @@ private:
Context & context;
const String database_name;
const String table_name;
const String storage_def;
StoragePtr table;
const String engine;
const size_t flush_interval_milliseconds;
using QueueItem = std::pair<bool, LogElement>; /// First element is shutdown flag for thread.
......@@ -142,10 +142,10 @@ template <typename LogElement>
SystemLog<LogElement>::SystemLog(Context & context_,
const String & database_name_,
const String & table_name_,
const String & engine_,
const String & storage_def_,
size_t flush_interval_milliseconds_)
: context(context_),
database_name(database_name_), table_name(table_name_), engine(engine_),
database_name(database_name_), table_name(table_name_), storage_def(storage_def_),
flush_interval_milliseconds(flush_interval_milliseconds_)
{
log = &Logger::get("SystemLog (" + database_name + "." + table_name + ")");
......@@ -328,11 +328,13 @@ void SystemLog<LogElement>::prepareTable()
create->table = table_name;
Block sample = LogElement::createBlock();
create->columns = InterpreterCreateQuery::formatColumns(sample.getColumnsList());
create->set(create->columns, InterpreterCreateQuery::formatColumns(sample.getColumnsList()));
ParserFunction engine_parser;
create->storage = parseQuery(engine_parser, engine.data(), engine.data() + engine.size(), "ENGINE to create table for" + LogElement::name());
ParserStorage storage_parser;
ASTPtr storage_ast = parseQuery(
storage_parser, storage_def.data(), storage_def.data() + storage_def.size(),
"Storage to create table for " + LogElement::name());
create->set(create->storage, storage_ast);
InterpreterCreateQuery(create, context).execute();
......
#include <Core/Block.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnsNumber.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ExpressionElementParsers.h>
......@@ -23,7 +22,7 @@ namespace ErrorCodes
}
std::pair<Field, std::shared_ptr<IDataType>> evaluateConstantExpression(std::shared_ptr<IAST> & node, const Context & context)
std::pair<Field, std::shared_ptr<IDataType>> evaluateConstantExpression(const ASTPtr & node, const Context & context)
{
ExpressionActionsPtr expr_for_constant_folding = ExpressionAnalyzer(
node, context, nullptr, NamesAndTypesList{{ "_dummy", std::make_shared<DataTypeUInt8>() }}).getConstActions();
......@@ -51,7 +50,7 @@ std::pair<Field, std::shared_ptr<IDataType>> evaluateConstantExpression(std::sha
}
ASTPtr evaluateConstantExpressionAsLiteral(ASTPtr & node, const Context & context)
ASTPtr evaluateConstantExpressionAsLiteral(const ASTPtr & node, const Context & context)
{
if (typeid_cast<const ASTLiteral *>(node.get()))
return node;
......@@ -61,7 +60,7 @@ ASTPtr evaluateConstantExpressionAsLiteral(ASTPtr & node, const Context & contex
}
ASTPtr evaluateConstantExpressionOrIdentifierAsLiteral(ASTPtr & node, const Context & context)
ASTPtr evaluateConstantExpressionOrIdentifierAsLiteral(const ASTPtr & node, const Context & context)
{
if (auto id = typeid_cast<const ASTIdentifier *>(node.get()))
return std::make_shared<ASTLiteral>(node->range, Field(id->name));
......
......@@ -2,13 +2,13 @@
#include <memory>
#include <Core/Field.h>
#include <Parsers/IAST.h>
#include <Parsers/IParser.h>
namespace DB
{
class IAST;
class Context;
class IDataType;
......@@ -17,13 +17,13 @@ class IDataType;
* Used in rare cases - for elements of set for IN, for data to INSERT.
* Quite suboptimal.
*/
std::pair<Field, std::shared_ptr<IDataType>> evaluateConstantExpression(std::shared_ptr<IAST> & node, const Context & context);
std::pair<Field, std::shared_ptr<IDataType>> evaluateConstantExpression(const ASTPtr & node, const Context & context);
/** Evaluate constant expression
* and returns ASTLiteral with its value.
*/
std::shared_ptr<IAST> evaluateConstantExpressionAsLiteral(std::shared_ptr<IAST> & node, const Context & context);
ASTPtr evaluateConstantExpressionAsLiteral(const ASTPtr & node, const Context & context);
/** Evaluate constant expression
......@@ -31,7 +31,7 @@ std::shared_ptr<IAST> evaluateConstantExpressionAsLiteral(std::shared_ptr<IAST>
* Also, if AST is identifier, then return string literal with its name.
* Useful in places where some name may be specified as identifier, or as result of a constant expression.
*/
std::shared_ptr<IAST> evaluateConstantExpressionOrIdentifierAsLiteral(std::shared_ptr<IAST> & node, const Context & context);
ASTPtr evaluateConstantExpressionOrIdentifierAsLiteral(const ASTPtr & node, const Context & context);
/** Parses a name of an object which could be written in 3 forms:
* name, `name` or 'name' */
......
......@@ -2,6 +2,8 @@
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTQueryWithOutput.h>
#include <Parsers/ASTQueryWithOnCluster.h>
......@@ -9,6 +11,69 @@
namespace DB
{
class ASTStorage : public IAST
{
public:
ASTFunction * engine = nullptr;
IAST * partition_by = nullptr;
IAST * order_by = nullptr;
IAST * sample_by = nullptr;
ASTSetQuery * settings = nullptr;
ASTStorage() = default;
ASTStorage(StringRange range_) : IAST(range_) {}
String getID() const override { return "Storage definition"; }
ASTPtr clone() const override
{
auto res = std::make_shared<ASTStorage>(*this);
res->children.clear();
if (engine)
res->set(res->engine, engine->clone());
if (partition_by)
res->set(res->partition_by, partition_by->clone());
if (order_by)
res->set(res->order_by, order_by->clone());
if (sample_by)
res->set(res->sample_by, sample_by->clone());
if (settings)
res->set(res->settings, settings->clone());
return res;
}
void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override
{
if (engine)
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << "ENGINE" << (s.hilite ? hilite_none : "") << " = ";
engine->formatImpl(s, state, frame);
}
if (partition_by)
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << "PARTITION BY " << (s.hilite ? hilite_none : "");
partition_by->formatImpl(s, state, frame);
}
if (order_by)
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << "ORDER BY " << (s.hilite ? hilite_none : "");
order_by->formatImpl(s, state, frame);
}
if (sample_by)
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << "SAMPLE BY " << (s.hilite ? hilite_none : "");
sample_by->formatImpl(s, state, frame);
}
if (settings)
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << "SETTINGS " << (s.hilite ? hilite_none : "");
settings->formatImpl(s, state, frame);
}
}
};
/// CREATE TABLE or ATTACH TABLE query
class ASTCreateQuery : public ASTQueryWithOutput, public ASTQueryWithOnCluster
......@@ -22,12 +87,12 @@ public:
bool is_temporary{false};
String database;
String table;
ASTPtr columns;
ASTPtr storage;
ASTPtr inner_storage; /// Internal engine for the CREATE MATERIALIZED VIEW query
ASTExpressionList * columns = nullptr;
ASTStorage * storage = nullptr;
ASTStorage * inner_storage = nullptr; /// Internal engine for the CREATE MATERIALIZED VIEW query
String as_database;
String as_table;
ASTPtr select;
ASTSelectQuery * select = nullptr;
ASTCreateQuery() = default;
ASTCreateQuery(const StringRange range_) : ASTQueryWithOutput(range_) {}
......@@ -40,10 +105,14 @@ public:
auto res = std::make_shared<ASTCreateQuery>(*this);
res->children.clear();
if (columns) { res->columns = columns->clone(); res->children.push_back(res->columns); }
if (storage) { res->storage = storage->clone(); res->children.push_back(res->storage); }
if (select) { res->select = select->clone(); res->children.push_back(res->select); }
if (inner_storage) { res->inner_storage = inner_storage->clone(); res->children.push_back(res->inner_storage); }
if (columns)
res->set(res->columns, columns->clone());
if (storage)
res->set(res->storage, storage->clone());
if (select)
res->set(res->select, select->clone());
if (inner_storage)
res->set(res->inner_storage, inner_storage->clone());
cloneOutputOptions(*res);
......@@ -77,10 +146,7 @@ protected:
formatOnCluster(settings);
if (storage)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " ENGINE" << (settings.hilite ? hilite_none : "") << " = ";
storage->formatImpl(settings, state, frame);
}
return;
}
......@@ -119,16 +185,10 @@ protected:
}
if (storage && !is_materialized_view && !is_view)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " ENGINE" << (settings.hilite ? hilite_none : "") << " = ";
storage->formatImpl(settings, state, frame);
}
if (inner_storage)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " ENGINE" << (settings.hilite ? hilite_none : "") << " = ";
inner_storage->formatImpl(settings, state, frame);
}
if (is_populate)
{
......
......@@ -17,14 +17,14 @@ public:
ASTQueryWithOutput() = default;
explicit ASTQueryWithOutput(const StringRange range_) : IAST(range_) {}
void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override final;
protected:
/// NOTE: call this helper at the end of the clone() method of descendant class.
void cloneOutputOptions(ASTQueryWithOutput & cloned) const;
/// Format only the query part of the AST (without output options).
virtual void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const = 0;
void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override final;
};
......
......@@ -315,15 +315,7 @@ void ASTSelectQuery::formatQueryImpl(const FormatSettings & s, FormatState & sta
if (settings)
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "SETTINGS " << (s.hilite ? hilite_none : "");
const ASTSetQuery & ast_set = typeid_cast<const ASTSetQuery &>(*settings);
for (ASTSetQuery::Changes::const_iterator it = ast_set.changes.begin(); it != ast_set.changes.end(); ++it)
{
if (it != ast_set.changes.begin())
s.ostr << ", ";
s.ostr << it->name << " = " << applyVisitor(FieldVisitorToString(), it->value);
}
settings->formatImpl(s, state, frame);
}
if (next_union_all)
......
......@@ -14,6 +14,8 @@ namespace DB
class ASTSetQuery : public IAST
{
public:
bool is_standalone = true; /// If false, this AST is a part of another query, such as SELECT.
struct Change
{
String name;
......@@ -31,10 +33,10 @@ public:
ASTPtr clone() const override { return std::make_shared<ASTSetQuery>(*this); }
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "SET " << (settings.hilite ? hilite_none : "");
if (is_standalone)
settings.ostr << (settings.hilite ? hilite_keyword : "") << "SET " << (settings.hilite ? hilite_none : "");
for (ASTSetQuery::Changes::const_iterator it = changes.begin(); it != changes.end(); ++it)
{
......
......@@ -20,6 +20,7 @@ namespace ErrorCodes
extern const int NOT_A_COLUMN;
extern const int UNKNOWN_TYPE_OF_AST_NODE;
extern const int UNKNOWN_ELEMENT_IN_AST;
extern const int LOGICAL_ERROR;
}
using IdentifierNameSet = std::set<String>;
......@@ -33,7 +34,7 @@ class WriteBuffer;
/** Element of the syntax tree (hereinafter - directed acyclic graph with elements of semantics)
*/
class IAST
class IAST : public std::enable_shared_from_this<IAST>
{
public:
ASTs children;
......@@ -66,6 +67,8 @@ public:
/** Get the text that identifies this element. */
virtual String getID() const = 0;
ASTPtr ptr() { return shared_from_this(); }
/** Get a deep copy of the tree. */
virtual ASTPtr clone() const = 0;
......@@ -110,6 +113,42 @@ public:
child->collectIdentifierNames(set);
}
template <typename T>
void set(T * & field, const ASTPtr & child)
{
if (!child)
return;
T * casted = dynamic_cast<T *>(child.get());
if (!casted)
throw Exception("Could not cast AST subtree", ErrorCodes::LOGICAL_ERROR);
children.push_back(child);
field = casted;
}
template <typename T>
void replace(T * & field, const ASTPtr & child)
{
if (!child)
throw Exception("Trying to replace AST subtree with nullptr", ErrorCodes::LOGICAL_ERROR);
T * casted = dynamic_cast<T *>(child.get());
if (!casted)
throw Exception("Could not cast AST subtree", ErrorCodes::LOGICAL_ERROR);
for (ASTPtr & current_child : children)
{
if (current_child.get() == field)
{
current_child = child;
field = casted;
return;
}
}
throw Exception("AST subtree not found in children", ErrorCodes::LOGICAL_ERROR);
}
/// Convert to a string.
......
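The set()/replace() helpers are the heart of this change: typed child pointers such as ASTCreateQuery::storage stay in sync with the generic children vector. A minimal standalone sketch of the same pattern (plain C++, not ClickHouse code; the names are invented for illustration):
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

struct Node
{
    std::vector<std::shared_ptr<Node>> children;
    virtual ~Node() = default;

    /// Attach a child and remember it through a typed pointer (mirrors IAST::set).
    template <typename T>
    void set(T *& field, const std::shared_ptr<Node> & child)
    {
        if (!child)
            return;
        T * casted = dynamic_cast<T *>(child.get());
        if (!casted)
            throw std::runtime_error("Could not cast subtree");
        children.push_back(child);
        field = casted;
    }

    /// Swap the owning pointer in place, keeping the typed alias valid (mirrors IAST::replace).
    template <typename T>
    void replace(T *& field, const std::shared_ptr<Node> & child)
    {
        if (!child)
            throw std::runtime_error("Trying to replace subtree with nullptr");
        T * casted = dynamic_cast<T *>(child.get());
        if (!casted)
            throw std::runtime_error("Could not cast subtree");
        for (auto & current : children)
        {
            if (current.get() == field)
            {
                current = child;
                field = casted;
                return;
            }
        }
        throw std::runtime_error("Subtree not found in children");
    }
};

struct Storage : Node
{
    std::string engine;
    explicit Storage(std::string engine_) : engine(std::move(engine_)) {}
};

struct CreateQuery : Node
{
    Storage * storage = nullptr; /// typed view into one of the children
};

int main()
{
    auto create = std::make_shared<CreateQuery>();
    create->set(create->storage, std::make_shared<Storage>("MergeTree"));
    std::cout << create->storage->engine << " children=" << create->children.size() << '\n';

    create->replace(create->storage, std::make_shared<Storage>("Log"));
    std::cout << create->storage->engine << " children=" << create->children.size() << '\n';
}
The important property is that replace() swaps the subtree without growing children, which is exactly what DatabaseOrdinary::alterTable relies on when it rewrites the columns list above.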
......@@ -5,6 +5,7 @@
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ParserSelectQuery.h>
#include <Parsers/ParserSetQuery.h>
namespace DB
......@@ -106,21 +107,78 @@ bool ParserColumnDeclarationList::parseImpl(Pos & pos, ASTPtr & node, Expected &
}
bool ParserEngine::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
bool ParserStorage::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_engine("ENGINE");
ParserToken s_eq(TokenType::Equals);
ParserIdentifierWithOptionalParameters storage_p;
ParserKeyword s_partition_by("PARTITION BY");
ParserKeyword s_order_by("ORDER BY");
ParserKeyword s_sample_by("SAMPLE BY");
ParserKeyword s_settings("SETTINGS");
if (s_engine.ignore(pos, expected))
ParserIdentifierWithOptionalParameters ident_with_optional_params_p;
ParserExpression expression_p;
ParserSetQuery settings_p(/* parse_only_internals_ = */ true);
Pos begin = pos;
ASTPtr engine;
ASTPtr partition_by;
ASTPtr order_by;
ASTPtr sample_by;
ASTPtr settings;
if (!s_engine.ignore(pos, expected))
return false;
s_eq.ignore(pos, expected);
if (!ident_with_optional_params_p.parse(pos, engine, expected))
return false;
while (true)
{
if (!s_eq.ignore(pos, expected))
return false;
if (!partition_by && s_partition_by.ignore(pos, expected))
{
if (expression_p.parse(pos, partition_by, expected))
continue;
else
return false;
}
if (!storage_p.parse(pos, node, expected))
return false;
if (!order_by && s_order_by.ignore(pos, expected))
{
if (expression_p.parse(pos, order_by, expected))
continue;
else
return false;
}
if (!sample_by && s_sample_by.ignore(pos, expected))
{
if (expression_p.parse(pos, sample_by, expected))
continue;
else
return false;
}
if (s_settings.ignore(pos, expected))
{
if (!settings_p.parse(pos, settings, expected))
return false;
}
break;
}
auto storage = std::make_shared<ASTStorage>(StringRange(begin, pos));
storage->set(storage->engine, engine);
storage->set(storage->partition_by, partition_by);
storage->set(storage->order_by, order_by);
storage->set(storage->sample_by, sample_by);
storage->set(storage->settings, settings);
node = storage;
return true;
}
......@@ -136,16 +194,16 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_database("DATABASE");
ParserKeyword s_if_not_exists("IF NOT EXISTS");
ParserKeyword s_as("AS");
ParserKeyword s_select("SELECT");
ParserKeyword s_view("VIEW");
ParserKeyword s_materialized("MATERIALIZED");
ParserKeyword s_populate("POPULATE");
ParserToken s_dot(TokenType::Dot);
ParserToken s_lparen(TokenType::OpeningRoundBracket);
ParserToken s_rparen(TokenType::ClosingRoundBracket);
ParserEngine engine_p;
ParserStorage storage_p;
ParserIdentifier name_p;
ParserColumnDeclarationList columns_p;
ParserSelectQuery select_p;
ASTPtr database;
ASTPtr table;
......@@ -190,7 +248,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false;
}
engine_p.parse(pos, storage, expected);
storage_p.parse(pos, storage, expected);
}
else if (s_table.ignore(pos, expected))
{
......@@ -222,39 +280,31 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!s_rparen.ignore(pos, expected))
return false;
if (!engine_p.parse(pos, storage, expected))
if (!storage_p.parse(pos, storage, expected) && !is_temporary)
return false;
/// For engine VIEW, you also need to read AS SELECT
if (storage && (typeid_cast<ASTFunction &>(*storage).name == "View"
|| typeid_cast<ASTFunction &>(*storage).name == "MaterializedView"))
if (storage)
{
if (!s_as.ignore(pos, expected))
return false;
Pos before_select = pos;
if (!s_select.ignore(pos, expected))
return false;
pos = before_select;
ParserSelectQuery select_p;
select_p.parse(pos, select, expected);
const auto & storage_ast = typeid_cast<const ASTStorage &>(*storage);
/// For engine VIEW, you also need to read AS SELECT
if (storage_ast.engine->name == "View" || storage_ast.engine->name == "MaterializedView")
{
if (!s_as.ignore(pos, expected))
return false;
if (!select_p.parse(pos, select, expected))
return false;
}
}
}
else
{
engine_p.parse(pos, storage, expected);
storage_p.parse(pos, storage, expected);
if (!s_as.ignore(pos, expected))
return false;
/// AS SELECT ...
Pos before_select = pos;
if (s_select.ignore(pos, expected))
{
pos = before_select;
ParserSelectQuery select_p;
select_p.parse(pos, select, expected);
}
else
if (!select_p.parse(pos, select, expected)) /// AS SELECT ...
{
/// AS [db.]table
if (!name_p.parse(pos, as_table, expected))
......@@ -268,7 +318,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
/// Optional - ENGINE can be specified.
engine_p.parse(pos, storage, expected);
storage_p.parse(pos, storage, expected);
}
}
}
......@@ -315,7 +365,7 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
/// Optional - internal ENGINE for MATERIALIZED VIEW can be specified
engine_p.parse(pos, inner_storage, expected);
storage_p.parse(pos, inner_storage, expected);
if (s_populate.ignore(pos, expected))
is_populate = true;
......@@ -344,25 +394,15 @@ bool ParserCreateQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (table)
query->table = typeid_cast<ASTIdentifier &>(*table).name;
query->cluster = cluster_str;
if (inner_storage)
query->inner_storage = inner_storage;
query->set(query->inner_storage, inner_storage);
query->columns = columns;
query->storage = storage;
query->set(query->columns, columns);
query->set(query->storage, storage);
if (as_database)
query->as_database = typeid_cast<ASTIdentifier &>(*as_database).name;
if (as_table)
query->as_table = typeid_cast<ASTIdentifier &>(*as_table).name;
query->select = select;
if (columns)
query->children.push_back(columns);
if (storage)
query->children.push_back(storage);
if (select)
query->children.push_back(select);
if (inner_storage)
query->children.push_back(inner_storage);
query->set(query->select, select);
return true;
}
......
......@@ -187,11 +187,11 @@ protected:
};
/** ENGINE = name. */
class ParserEngine : public IParserBase
/** ENGINE = name [PARTITION BY expr] [ORDER BY expr] [SAMPLE BY expr] [SETTINGS name = value, ...] */
class ParserStorage : public IParserBase
{
protected:
const char * getName() const { return "ENGINE"; }
const char * getName() const { return "storage definition"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
};
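A hedged usage sketch, modeled directly on the SystemLog::prepareTable change earlier in this diff (the storage_def string and the create object holding an ASTCreateQuery are hypothetical):
/// Hypothetical: parse an extended storage definition and attach it to an ASTCreateQuery.
String storage_def = "ENGINE = MergeTree PARTITION BY toYYYYMM(event_date) "
                     "ORDER BY (event_date, event_time) SETTINGS index_granularity = 8192";
ParserStorage storage_parser;
ASTPtr storage_ast = parseQuery(
    storage_parser, storage_def.data(), storage_def.data() + storage_def.size(),
    "storage definition");
create->set(create->storage, storage_ast); /// `create` is a std::shared_ptr<ASTCreateQuery> obtained elsewhere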
......
......@@ -47,7 +47,6 @@ bool ParserSetQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!parse_only_internals)
{
ParserKeyword s_set("SET");
ParserKeyword s_global("GLOBAL");
if (!s_set.ignore(pos, expected))
return false;
......@@ -69,6 +68,7 @@ bool ParserSetQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
auto query = std::make_shared<ASTSetQuery>(StringRange(begin, pos));
node = query;
query->is_standalone = !parse_only_internals;
query->changes = changes;
return true;
......
......@@ -8,7 +8,7 @@ namespace DB
{
/** Query like this:
* SET [GLOBAL] name1 = value1, name2 = value2, ...
* SET name1 = value1, name2 = value2, ...
*/
class ParserSetQuery : public IParserBase
{
......@@ -19,7 +19,7 @@ protected:
const char * getName() const override { return "SET query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
/// Parse the list `name = value` pairs, without SET [GLOBAL].
/// Parse the list `name = value` pairs, without SET.
bool parse_only_internals;
};
......
......@@ -30,6 +30,7 @@ using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
using BlockInputStreams = std::vector<BlockInputStreamPtr>;
class ASTCreateQuery;
class IStorage;
......
......@@ -83,7 +83,7 @@ MergeTreeBlockSizePredictor::MergeTreeBlockSizePredictor(
const MergeTreeData::DataPartPtr & data_part_, const Names & columns, const Block & sample_block)
: data_part(data_part_)
{
number_of_rows_in_part = data_part->getExactSizeRows();
number_of_rows_in_part = data_part->rows_count;
/// Initialize with the sample block until update() is called.
initialize(sample_block, columns);
}
......
......@@ -86,7 +86,6 @@ MergeTreeData::MergeTreeData(
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_,
size_t index_granularity_,
const MergingParams & merging_params_,
const MergeTreeSettings & settings_,
const String & log_name_,
......@@ -96,7 +95,7 @@ MergeTreeData::MergeTreeData(
PartsCleanCallback parts_clean_callback_)
: ITableDeclaration{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
sampling_expression(sampling_expression_),
index_granularity(index_granularity_),
index_granularity(settings_.index_granularity),
merging_params(merging_params_),
settings(settings_),
primary_expr_ast(primary_expr_ast_),
......@@ -110,11 +109,16 @@ MergeTreeData::MergeTreeData(
{
merging_params.check(*columns);
if (primary_expr_ast && merging_params.mode == MergingParams::Unsorted)
throw Exception("Primary key cannot be set for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS);
if (!primary_expr_ast && merging_params.mode != MergingParams::Unsorted)
throw Exception("Primary key could be empty only for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS);
throw Exception("Primary key can be empty only for UnsortedMergeTree", ErrorCodes::BAD_ARGUMENTS);
initPrimaryKey();
if (sampling_expression && (!primary_expr_ast || !primary_key_sample.has(sampling_expression->getColumnName())))
throw Exception("Sampling expression must be present in the primary key", ErrorCodes::BAD_ARGUMENTS);
MergeTreeDataFormatVersion min_format_version(0);
if (!date_column_name.empty())
{
......@@ -577,7 +581,7 @@ void MergeTreeData::clearOldTemporaryDirectories(ssize_t custom_directories_life
time_t current_time = time(nullptr);
ssize_t deadline = (custom_directories_lifetime_seconds >= 0)
? current_time - custom_directories_lifetime_seconds
: current_time - settings.temporary_directories_lifetime;
: current_time - settings.temporary_directories_lifetime.totalSeconds();
/// Delete temporary directories older than a day.
Poco::DirectoryIterator end;
......@@ -622,7 +626,7 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
{
if (it->unique() && /// After this ref_count cannot increase.
(*it)->remove_time < now &&
now - (*it)->remove_time > settings.old_parts_lifetime)
now - (*it)->remove_time > settings.old_parts_lifetime.totalSeconds())
{
res.push_back(*it);
all_data_parts.erase(it++);
......@@ -1087,7 +1091,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
else
{
const IDataType & type = *new_primary_key_sample.safeGetByPosition(i).type;
new_index[i] = type.createConstColumn(part->size, type.getDefault())->convertToFullColumnIfConst();
new_index[i] = type.createConstColumn(part->marks_count, type.getDefault())->convertToFullColumnIfConst();
}
}
......@@ -1098,7 +1102,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
WriteBufferFromFile index_file(index_tmp_path);
HashingWriteBuffer index_stream(index_file);
for (size_t i = 0, size = part->size; i < size; ++i)
for (size_t i = 0, marks_count = part->marks_count; i < marks_count; ++i)
for (size_t j = 0; j < new_key_size; ++j)
new_primary_key_sample.getByPosition(j).type->serializeBinary(*new_index[j].get(), i, index_stream);
......@@ -1118,7 +1122,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
/// Apply the expression and write the result to temporary files.
if (expression)
{
MarkRanges ranges{MarkRange(0, part->size)};
MarkRanges ranges{MarkRange(0, part->marks_count)};
BlockInputStreamPtr part_in = std::make_shared<MergeTreeBlockInputStream>(
*this, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, expression->getRequiredColumns(), ranges,
false, nullptr, "", false, 0, DBMS_DEFAULT_BUFFER_SIZE, false);
......
......@@ -62,7 +62,8 @@ namespace ErrorCodes
/// Part directory - / partition-id _ min-id _ max-id _ level /
/// Inside the part directory:
/// The same files as for month-partitioned tables, plus
/// partition.dat - contains the value of the partitioning expression
/// count.txt - contains total number of rows in this part.
/// partition.dat - contains the value of the partitioning expression.
/// minmax_[Column].idx - MinMax indexes (see MergeTreeDataPart::MinMaxIndex class) for the columns required by the partitioning expression.
///
/// Several modes are implemented. Modes determine additional actions during merge:
......@@ -236,7 +237,6 @@ public:
/// primary_expr_ast - expression used for sorting; empty for UnsortedMergeTree.
/// date_column_name - if not empty, the name of the Date column used for partitioning by month.
/// Otherwise, partition_expr_ast is used for partitioning.
/// index_granularity - how many rows correspond to one primary key value.
/// require_part_metadata - should checksums.txt and columns.txt exist in the part directory.
/// attach - whether the existing table is attached or the new table is created.
MergeTreeData( const String & database_, const String & table_,
......@@ -249,7 +249,6 @@ public:
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported.
size_t index_granularity_,
const MergingParams & merging_params_,
const MergeTreeSettings & settings_,
const String & log_name_,
......
......@@ -499,7 +499,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
std::shared_lock<std::shared_mutex> part_lock(part->columns_lock);
merge_entry->total_size_bytes_compressed += part->size_in_bytes;
merge_entry->total_size_marks += part->size;
merge_entry->total_size_marks += part->marks_count;
}
MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
......@@ -557,7 +557,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
for (const auto & part : parts)
{
auto input = std::make_unique<MergeTreeBlockInputStream>(
data, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, merging_column_names, MarkRanges(1, MarkRange(0, part->size)),
data, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, merging_column_names, MarkRanges(1, MarkRange(0, part->marks_count)),
false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
input->setProgressCallback(MergeProgressCallback(
......@@ -691,7 +691,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
for (size_t part_num = 0; part_num < parts.size(); ++part_num)
{
auto column_part_stream = std::make_shared<MergeTreeBlockInputStream>(
data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_name_, MarkRanges{MarkRange(0, parts[part_num]->size)},
data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_name_, MarkRanges{MarkRange(0, parts[part_num]->marks_count)},
false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false, Names{}, 0, true);
column_part_stream->setProgressCallback(MergeProgressCallbackVerticalStep(
......@@ -755,7 +755,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
to.writeSuffixAndFinalizePart(new_data_part, &all_columns, &checksums_gathered_columns);
/// For convenience, even CollapsingSortedBlockInputStream can not return zero rows.
if (0 == to.marksCount())
if (0 == to.getRowsCount())
throw Exception("Empty part after merge", ErrorCodes::LOGICAL_ERROR);
return new_data_part;
......@@ -862,12 +862,16 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
/// Merge all parts of the partition.
size_t total_input_rows = 0;
for (const MergeTreeData::DataPartPtr & part : parts)
{
total_input_rows += part->rows_count;
std::shared_lock<std::shared_mutex> part_lock(part->columns_lock);
merge_entry->total_size_bytes_compressed += part->size_in_bytes;
merge_entry->total_size_marks += part->size;
merge_entry->total_size_marks += part->marks_count;
}
MergeTreeData::DataPart::ColumnToSize merged_column_to_size;
......@@ -882,22 +886,18 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
BlockInputStreams src_streams;
size_t sum_rows_approx = 0;
const auto rows_total = merge_entry->total_size_marks * data.index_granularity;
for (size_t i = 0; i < parts.size(); ++i)
{
MarkRanges ranges(1, MarkRange(0, parts[i]->size));
MarkRanges ranges(1, MarkRange(0, parts[i]->marks_count));
auto input = std::make_unique<MergeTreeBlockInputStream>(
data, parts[i], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_names,
ranges, false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
input->setProgressCallback([&merge_entry, rows_total] (const Progress & value)
input->setProgressCallback([&merge_entry, total_input_rows] (const Progress & value)
{
const auto new_rows_read = merge_entry->rows_read += value.rows;
merge_entry->progress = static_cast<Float64>(new_rows_read) / rows_total;
merge_entry->progress = static_cast<Float64>(new_rows_read) / total_input_rows;
merge_entry->bytes_read_uncompressed += value.bytes;
});
......@@ -906,8 +906,6 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
std::make_shared<ExpressionBlockInputStream>(BlockInputStreamPtr(std::move(input)), data.getPrimaryExpression())));
else
src_streams.emplace_back(std::move(input));
sum_rows_approx += parts[i]->size * data.index_granularity;
}
/// Sharding of merged blocks.
......@@ -1038,7 +1036,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes;
if (disk_reservation)
disk_reservation->update(static_cast<size_t>((1 - std::min(1., 1. * rows_written / sum_rows_approx)) * initial_reservation));
disk_reservation->update(static_cast<size_t>((1 - std::min(1., 1. * rows_written / total_input_rows)) * initial_reservation));
}
}
......@@ -1050,7 +1048,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
abortReshardPartitionIfRequested();
MergedBlockOutputStreamPtr & output_stream = per_shard_output.at(shard_no);
if (0 == output_stream->marksCount())
if (0 == output_stream->getRowsCount())
{
/// There was no data in this shard. Ignore.
LOG_WARNING(log, "No data in partition for shard " + job.paths[shard_no].first);
......
......@@ -319,6 +319,7 @@ void MergeTreeDataPart::MinMaxIndex::store(const MergeTreeData & storage, const
HashingWriteBuffer out_hashing(out);
type->serializeBinary(min_values[i], out_hashing);
type->serializeBinary(max_values[i], out_hashing);
out_hashing.next();
checksums.files[file_name].file_size = out_hashing.count();
checksums.files[file_name].file_hash = out_hashing.getHash();
}
......@@ -426,43 +427,6 @@ String MergeTreeDataPart::getColumnNameWithMinumumCompressedSize() const
}
size_t MergeTreeDataPart::getExactSizeRows() const
{
size_t rows_approx = storage.index_granularity * size;
for (const NameAndTypePair & column : columns)
{
ColumnPtr column_col = column.type->createColumn();
const auto checksum = tryGetBinChecksum(column.name);
/// Should be fixed non-nullable column
if (!checksum || !column_col->isFixed() || column_col->isNullable())
continue;
size_t sizeof_field = column_col->sizeOfField();
size_t rows = checksum->uncompressed_size / sizeof_field;
if (checksum->uncompressed_size % sizeof_field != 0)
{
throw Exception(
"Column " + column.name + " has indivisible uncompressed size " + toString(checksum->uncompressed_size)
+ ", sizeof " + toString(sizeof_field),
ErrorCodes::LOGICAL_ERROR);
}
if (!(rows_approx - storage.index_granularity < rows && rows <= rows_approx))
{
throw Exception("Unexpected size of column " + column.name + ": " + toString(rows) + " rows",
ErrorCodes::LOGICAL_ERROR);
}
return rows;
}
throw Exception("Data part doesn't contain fixed size column (even Date column)", ErrorCodes::LOGICAL_ERROR);
}
String MergeTreeDataPart::getFullPath() const
{
if (relative_path.empty())
......@@ -647,6 +611,7 @@ void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksu
loadColumns(require_columns_checksums);
loadChecksums(require_columns_checksums);
loadIndex();
loadRowsCount(); /// Must be called after loadIndex() as it uses the value of `marks_count`.
loadPartitionAndMinMaxIndex();
if (check_consistency)
checkConsistency(require_columns_checksums);
......@@ -655,13 +620,12 @@ void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksu
void MergeTreeDataPart::loadIndex()
{
/// Size - in number of marks.
if (!size)
if (!marks_count)
{
if (columns.empty())
throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
size = Poco::File(getFullPath() + escapeForFileName(columns.front().name) + ".mrk")
marks_count = Poco::File(getFullPath() + escapeForFileName(columns.front().name) + ".mrk")
.getSize() / MERGE_TREE_MARK_SIZE;
}
......@@ -675,20 +639,20 @@ void MergeTreeDataPart::loadIndex()
for (size_t i = 0; i < key_size; ++i)
{
index[i] = storage.primary_key_data_types[i]->createColumn();
index[i]->reserve(size);
index[i]->reserve(marks_count);
}
String index_path = getFullPath() + "primary.idx";
ReadBufferFromFile index_file = openForReading(index_path);
for (size_t i = 0; i < size; ++i)
for (size_t i = 0; i < marks_count; ++i)
for (size_t j = 0; j < key_size; ++j)
storage.primary_key_data_types[j]->deserializeBinary(*index[j].get(), index_file);
for (size_t i = 0; i < key_size; ++i)
if (index[i]->size() != size)
if (index[i]->size() != marks_count)
throw Exception("Cannot read all data from index file " + index_path
+ "(expected size: " + toString(size) + ", read: " + toString(index[i]->size()) + ")",
+ "(expected size: " + toString(marks_count) + ", read: " + toString(index[i]->size()) + ")",
ErrorCodes::CANNOT_READ_ALL_DATA);
if (!index_file.eof())
......@@ -740,6 +704,54 @@ void MergeTreeDataPart::loadChecksums(bool require)
assertEOF(file);
}
void MergeTreeDataPart::loadRowsCount()
{
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
String path = getFullPath() + "count.txt";
if (!Poco::File(path).exists())
throw Exception("No count.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
ReadBufferFromFile file = openForReading(path);
readIntText(rows_count, file);
assertEOF(file);
}
else
{
size_t rows_approx = storage.index_granularity * marks_count;
for (const NameAndTypePair & column : columns)
{
ColumnPtr column_col = column.type->createColumn();
const auto checksum = tryGetBinChecksum(column.name);
/// Should be fixed non-nullable column
if (!checksum || !column_col->isFixed() || column_col->isNullable())
continue;
size_t sizeof_field = column_col->sizeOfField();
rows_count = checksum->uncompressed_size / sizeof_field;
if (checksum->uncompressed_size % sizeof_field != 0)
{
throw Exception(
"Column " + column.name + " has indivisible uncompressed size " + toString(checksum->uncompressed_size)
+ ", sizeof " + toString(sizeof_field),
ErrorCodes::LOGICAL_ERROR);
}
if (!(rows_count <= rows_approx && rows_approx < rows_count + storage.index_granularity))
throw Exception(
"Unexpected size of column " + column.name + ": " + toString(rows_count) + " rows",
ErrorCodes::LOGICAL_ERROR);
return;
}
throw Exception("Data part doesn't contain fixed size column (even Date column)", ErrorCodes::LOGICAL_ERROR);
}
}
void MergeTreeDataPart::accumulateColumnSizes(ColumnToSize & column_to_size) const
{
std::shared_lock<std::shared_mutex> part_lock(columns_lock);
......@@ -799,6 +811,9 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
if (!checksums.files.count("count.txt"))
throw Exception("No checksum for count.txt", ErrorCodes::NO_FILE_IN_DATA_PART);
if (storage.partition_expr && !checksums.files.count("partition.dat"))
throw Exception("No checksum for partition.dat", ErrorCodes::NO_FILE_IN_DATA_PART);
......@@ -827,6 +842,8 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
{
check_file_not_empty(path + "count.txt");
if (storage.partition_expr)
check_file_not_empty(path + "partition.dat");
......
......@@ -108,9 +108,6 @@ struct MergeTreeDataPart
/// If no checksums are present returns the name of the first physically existing column.
String getColumnNameWithMinumumCompressedSize() const;
/// If part has column with fixed size, will return exact size of part (in rows)
size_t getExactSizeRows() const;
/// Returns full path to part dir
String getFullPath() const;
......@@ -132,7 +129,8 @@ struct MergeTreeDataPart
/// Examples: 'detached/tmp_fetch_<name>', 'tmp_<name>', '<name>'
mutable String relative_path;
size_t size = 0; /// in number of marks.
size_t rows_count = 0;
size_t marks_count = 0;
std::atomic<size_t> size_in_bytes {0}; /// size in bytes, 0 - if not counted;
/// is used from several threads without locks (it is changed with ALTER).
time_t modification_time = 0;
......@@ -239,9 +237,13 @@ private:
/// If checksums.txt exists, reads files' checksums (and sizes) from it
void loadChecksums(bool require);
/// Loads index file. Also calculates this->size if size=0
/// Loads index file. Also calculates this->marks_count if marks_count = 0
void loadIndex();
/// Load rows count for this part from disk (for the newer storage format version).
/// For the older format version calculates rows count from the size of a column with a fixed size.
void loadRowsCount();
void loadPartitionAndMinMaxIndex();
void checkConsistency(bool require_part_metadata);
......
......@@ -526,7 +526,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
ranges.ranges = markRangesFromPKRange(part->index, key_condition, settings);
else
ranges.ranges = MarkRanges{MarkRange{0, part->size}};
ranges.ranges = MarkRanges{MarkRange{0, part->marks_count}};
if (!ranges.ranges.empty())
{
......
......@@ -125,6 +125,7 @@ void MergeTreePartition::store(const MergeTreeData & storage, const String & par
HashingWriteBuffer out_hashing(out);
for (size_t i = 0; i < value.size(); ++i)
storage.partition_expr_column_types[i]->serializeBinary(value[i], out_hashing);
out_hashing.next();
checksums.files["partition.dat"].file_size = out_hashing.count();
checksums.files["partition.dat"].file_hash = out_hashing.getHash();
}
......
......@@ -403,7 +403,7 @@ void MergeTreeReader::addStream(const String & name, const IDataType & type, con
std::string filename = name + NULL_MAP_EXTENSION;
streams.emplace(filename, std::make_unique<Stream>(
path + escaped_column_name, NULL_MAP_EXTENSION, data_part->size,
path + escaped_column_name, NULL_MAP_EXTENSION, data_part->marks_count,
all_mark_ranges, mark_cache, save_marks_in_cache,
uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
......@@ -425,7 +425,7 @@ void MergeTreeReader::addStream(const String & name, const IDataType & type, con
if (!streams.count(size_name))
streams.emplace(size_name, std::make_unique<Stream>(
path + escaped_size_name, DATA_FILE_EXTENSION, data_part->size,
path + escaped_size_name, DATA_FILE_EXTENSION, data_part->marks_count,
all_mark_ranges, mark_cache, save_marks_in_cache,
uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
......@@ -436,7 +436,7 @@ void MergeTreeReader::addStream(const String & name, const IDataType & type, con
}
else
streams.emplace(name, std::make_unique<Stream>(
path + escaped_column_name, DATA_FILE_EXTENSION, data_part->size,
path + escaped_column_name, DATA_FILE_EXTENSION, data_part->marks_count,
all_mark_ranges, mark_cache, save_marks_in_cache,
uncompressed_cache, aio_threshold, max_read_buffer_size, profile_callback, clock_type));
}
......
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Parsers/ASTCreateQuery.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_CONFIG_PARAMETER;
extern const int BAD_ARGUMENTS;
}
void MergeTreeSettings::loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config)
{
if (!config.has(config_elem))
return;
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_elem, config_keys);
for (const String & key : config_keys)
{
String value = config.getString(config_elem + "." + key);
#define SET(TYPE, NAME, DEFAULT) \
else if (key == #NAME) NAME.set(value);
if (false) {}
APPLY_FOR_MERGE_TREE_SETTINGS(SET)
else
throw Exception("Unknown MergeTree setting " + key + " in config", ErrorCodes::INVALID_CONFIG_PARAMETER);
#undef SET
}
}
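For one hypothetical entry of APPLY_FOR_MERGE_TREE_SETTINGS (say, index_granularity), the SET macro above expands the dispatch into a plain chain of branches, roughly:
if (false) {}
else if (key == "index_granularity") index_granularity.set(value);
else
    throw Exception("Unknown MergeTree setting " + key + " in config", ErrorCodes::INVALID_CONFIG_PARAMETER);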
void MergeTreeSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{
for (const ASTSetQuery::Change & setting : storage_def.settings->changes)
{
#define SET(TYPE, NAME, DEFAULT) \
else if (setting.name == #NAME) NAME.set(setting.value);
if (false) {}
APPLY_FOR_MERGE_TREE_SETTINGS(SET)
else
throw Exception(
"Unknown setting " + setting.name + " for storage " + storage_def.engine->name,
ErrorCodes::BAD_ARGUMENTS);
#undef SET
}
}
else
{
auto settings_ast = std::make_shared<ASTSetQuery>();
settings_ast->is_standalone = false;
storage_def.set(storage_def.settings, settings_ast);
}
ASTSetQuery::Changes & changes = storage_def.settings->changes;
#define ADD_IF_ABSENT(NAME) \
if (std::find_if(changes.begin(), changes.end(), \
[](const ASTSetQuery::Change & c) { return c.name == #NAME; }) \
== changes.end()) \
changes.push_back(ASTSetQuery::Change{#NAME, NAME.value});
APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(ADD_IF_ABSENT);
#undef ADD_IF_ABSENT
}
}
......@@ -3,173 +3,143 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Core/Defines.h>
#include <Core/Types.h>
#include <Common/Exception.h>
#include <Interpreters/SettingsCommon.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_CONFIG_PARAMETER;
}
class ASTStorage;
/** Advanced settings of MergeTree.
* Could be loaded from config.
/** Settings for the MergeTree family of engines.
* Could be loaded from config or from a CREATE TABLE query (SETTINGS clause).
*/
struct MergeTreeSettings
{
/** Merge settings. */
/// Maximum in total size of parts to merge, when there are maximum (minimum) free threads in background pool (or entries in replication queue).
size_t max_bytes_to_merge_at_max_space_in_pool = size_t(150) * 1024 * 1024 * 1024;
size_t max_bytes_to_merge_at_min_space_in_pool = 1024 * 1024;
/// How many tasks of merging parts are allowed simultaneously in ReplicatedMergeTree queue.
size_t max_replicated_merges_in_queue = 16;
/// When there is less than specified number of free entries in pool (or replicated queue),
/// start to lower maximum size of merge to process (or to put in queue).
/// This is to allow small merges to process - not filling the pool with long running merges.
size_t number_of_free_entries_in_pool_to_lower_max_size_of_merge = 8;
/// How many seconds to keep obsolete parts.
time_t old_parts_lifetime = 8 * 60;
/// How many seconds to keep tmp_-directories.
time_t temporary_directories_lifetime = 86400;
/** Inserts settings. */
/// If table contains at least that many active parts, artificially slow down insert into table.
size_t parts_to_delay_insert = 150;
/// If more than this number active parts, throw 'Too much parts ...' exception
size_t parts_to_throw_insert = 300;
/// Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts.
size_t max_delay_to_insert = 1;
/** Replication settings. */
/// How many last blocks of hashes should be kept in ZooKeeper (old blocks will be deleted).
size_t replicated_deduplication_window = 100;
/// Similar to previous, but determines old blocks by their lifetime.
/// Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one "window".
/// You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.
size_t replicated_deduplication_window_seconds = 7 * 24 * 60 * 60; /// one week
/// Keep about this number of last records in ZooKeeper log, even if they are obsolete.
/// It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning.
size_t replicated_logs_to_keep = 100;
/// After specified amount of time passed after replication log entry creation
/// and sum size of parts is greater than threshold,
/// prefer fetching merged part from replica instead of doing merge locally.
/// To speed up very long merges.
time_t prefer_fetch_merged_part_time_threshold = 3600;
size_t prefer_fetch_merged_part_size_threshold = 10ULL * 1024 * 1024 * 1024;
/// Max broken parts, if more - deny automatic deletion.
size_t max_suspicious_broken_parts = 10;
/// Not apply ALTER if number of files for modification(deletion, addition) more than this.
size_t max_files_to_modify_in_alter_columns = 75;
/// Not apply ALTER, if number of files for deletion more than this.
size_t max_files_to_remove_in_alter_columns = 50;
/// If ratio of wrong parts to total number of parts is less than this - allow to start.
double replicated_max_ratio_of_wrong_parts = 0.5;
/// Limit parallel fetches
size_t replicated_max_parallel_fetches = 0;
size_t replicated_max_parallel_fetches_for_table = 0;
/// Limit parallel sends
size_t replicated_max_parallel_sends = 0;
size_t replicated_max_parallel_sends_for_table = 0;
/// If true, Replicated tables replicas on this node will try to acquire leadership.
bool replicated_can_become_leader = true;
/// In seconds.
size_t zookeeper_session_expiration_check_period = 60;
/** Check delay of replicas settings. */
/// Period to check replication delay and compare with other replicas.
size_t check_delay_period = 60;
/// Period to clean old queue logs, blocks hashes and parts
size_t cleanup_delay_period = 30;
/// Minimal delay from other replicas to yield leadership. Here and further 0 means unlimited.
size_t min_relative_delay_to_yield_leadership = 120;
/// Minimal delay from other replicas to close, stop serving requests and not return Ok during status check.
size_t min_relative_delay_to_close = 300;
/// Minimal absolute delay to close, stop serving requests and not return Ok during status check.
size_t min_absolute_delay_to_close = 0;
/// Enable usage of Vertical merge algorithm.
size_t enable_vertical_merge_algorithm = 1;
/// Minimal (approximate) sum of rows in merging parts to activate Vertical merge algorithm
size_t vertical_merge_algorithm_min_rows_to_activate = 16 * DEFAULT_MERGE_BLOCK_SIZE;
/// Minimal amount of non-PK columns to activate Vertical merge algorithm
size_t vertical_merge_algorithm_min_columns_to_activate = 11;
void loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config)
{
#define SET(NAME, GETTER) \
try \
{ \
NAME = config.GETTER(config_elem + "." #NAME, NAME); \
} \
catch (const Poco::Exception & e) \
{ \
throw Exception( \
"Invalid config parameter: " + config_elem + "/" #NAME + ": " + e.message() + ".", \
ErrorCodes::INVALID_CONFIG_PARAMETER); \
}
SET(max_bytes_to_merge_at_max_space_in_pool, getUInt64);
SET(max_bytes_to_merge_at_min_space_in_pool, getUInt64);
SET(max_replicated_merges_in_queue, getUInt64);
SET(number_of_free_entries_in_pool_to_lower_max_size_of_merge, getUInt64);
SET(old_parts_lifetime, getUInt64);
SET(temporary_directories_lifetime, getUInt64);
SET(parts_to_delay_insert, getUInt64);
SET(parts_to_throw_insert, getUInt64);
SET(max_delay_to_insert, getUInt64);
SET(replicated_deduplication_window, getUInt64);
SET(replicated_deduplication_window_seconds, getUInt64);
SET(replicated_logs_to_keep, getUInt64);
SET(prefer_fetch_merged_part_time_threshold, getUInt64);
SET(prefer_fetch_merged_part_size_threshold, getUInt64);
SET(max_suspicious_broken_parts, getUInt64);
SET(max_files_to_modify_in_alter_columns, getUInt64);
SET(max_files_to_remove_in_alter_columns, getUInt64);
SET(replicated_max_ratio_of_wrong_parts, getDouble);
SET(replicated_max_parallel_fetches, getUInt64);
SET(replicated_max_parallel_fetches_for_table, getUInt64);
SET(replicated_max_parallel_sends, getUInt64);
SET(replicated_max_parallel_sends_for_table, getUInt64);
SET(replicated_can_become_leader, getBool);
SET(zookeeper_session_expiration_check_period, getUInt64);
SET(check_delay_period, getUInt64);
SET(cleanup_delay_period, getUInt64);
SET(min_relative_delay_to_yield_leadership, getUInt64);
SET(min_relative_delay_to_close, getUInt64);
SET(min_absolute_delay_to_close, getUInt64);
SET(enable_vertical_merge_algorithm, getUInt64);
SET(vertical_merge_algorithm_min_rows_to_activate, getUInt64);
SET(vertical_merge_algorithm_min_columns_to_activate, getUInt64);
#undef SET
}
#define APPLY_FOR_MERGE_TREE_SETTINGS(M) \
/** How many rows correspond to one primary key value. */ \
M(SettingUInt64, index_granularity, 8192) \
\
/** Merge settings. */ \
\
/** Maximum in total size of parts to merge, when there are maximum (minimum) free threads \
* in background pool (or entries in replication queue). */ \
M(SettingUInt64, max_bytes_to_merge_at_max_space_in_pool, 150ULL * 1024 * 1024 * 1024) \
M(SettingUInt64, max_bytes_to_merge_at_min_space_in_pool, 1024 * 1024) \
\
/** How many tasks of merging parts are allowed simultaneously in ReplicatedMergeTree queue. */ \
M(SettingUInt64, max_replicated_merges_in_queue, 16) \
\
/** When there is less than specified number of free entries in pool (or replicated queue), \
* start to lower maximum size of merge to process (or to put in queue). \
* This is to allow small merges to process - not filling the pool with long running merges. */ \
M(SettingUInt64, number_of_free_entries_in_pool_to_lower_max_size_of_merge, 8) \
\
/** How many seconds to keep obsolete parts. */ \
M(SettingSeconds, old_parts_lifetime, 8 * 60) \
\
/** How many seconds to keep tmp_-directories. */ \
M(SettingSeconds, temporary_directories_lifetime, 86400) \
\
/** Inserts settings. */ \
\
/** If table contains at least that many active parts, artificially slow down insert into table. */ \
M(SettingUInt64, parts_to_delay_insert, 150) \
\
/** If there are more than this number of active parts, throw the 'Too much parts ...' exception. */ \
M(SettingUInt64, parts_to_throw_insert, 300) \
\
/** Max delay of inserting data into MergeTree table in seconds, if there are a lot of unmerged parts. */ \
M(SettingUInt64, max_delay_to_insert, 1) \
\
/** Replication settings. */ \
\
/** How many last blocks of hashes should be kept in ZooKeeper (old blocks will be deleted). */ \
M(SettingUInt64, replicated_deduplication_window, 100) \
/** Similar to previous, but determines old blocks by their lifetime. \
* The hash of an inserted block will be deleted (and the block will not be deduplicated after) \
* if it is outside of one "window". You can set a very big replicated_deduplication_window to avoid \
* duplicating INSERTs during that period of time. */ \
M(SettingUInt64, replicated_deduplication_window_seconds, 7 * 24 * 60 * 60) /** one week */ \
\
/** Keep about this number of last records in ZooKeeper log, even if they are obsolete. \
* It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning. */ \
M(SettingUInt64, replicated_logs_to_keep, 100) \
\
/** After specified amount of time passed after replication log entry creation \
* and sum size of parts is greater than threshold, \
* prefer fetching merged part from replica instead of doing merge locally. \
* To speed up very long merges. */ \
M(SettingSeconds, prefer_fetch_merged_part_time_threshold, 3600) \
M(SettingUInt64, prefer_fetch_merged_part_size_threshold, 10ULL * 1024 * 1024 * 1024) \
\
/** Max broken parts, if more - deny automatic deletion. */ \
M(SettingUInt64, max_suspicious_broken_parts, 10) \
\
/** Do not apply ALTER if the number of files for modification (deletion, addition) is more than this. */ \
M(SettingUInt64, max_files_to_modify_in_alter_columns, 75) \
/** Do not apply ALTER if the number of files for deletion is more than this. */ \
M(SettingUInt64, max_files_to_remove_in_alter_columns, 50) \
\
/** If ratio of wrong parts to total number of parts is less than this - allow to start. */ \
M(SettingFloat, replicated_max_ratio_of_wrong_parts, 0.5) \
\
/** Limit parallel fetches */ \
M(SettingUInt64, replicated_max_parallel_fetches, 0) \
M(SettingUInt64, replicated_max_parallel_fetches_for_table, 0) \
/** Limit parallel sends */ \
M(SettingUInt64, replicated_max_parallel_sends, 0) \
M(SettingUInt64, replicated_max_parallel_sends_for_table, 0) \
\
/** If true, Replicated tables replicas on this node will try to acquire leadership. */ \
M(SettingBool, replicated_can_become_leader, true) \
\
M(SettingSeconds, zookeeper_session_expiration_check_period, 60) \
\
/** Check delay of replicas settings. */ \
\
/** Period to check replication delay and compare with other replicas. */ \
M(SettingUInt64, check_delay_period, 60) \
\
/** Period to clean old queue logs, blocks hashes and parts */ \
M(SettingUInt64, cleanup_delay_period, 30) \
\
/** Minimal delay from other replicas to yield leadership. Here and further 0 means unlimited. */ \
M(SettingUInt64, min_relative_delay_to_yield_leadership, 120) \
\
/** Minimal delay from other replicas to close, stop serving requests and not return Ok \
* during status check. */ \
M(SettingUInt64, min_relative_delay_to_close, 300) \
\
/** Minimal absolute delay to close, stop serving requests and not return Ok during status check. */ \
M(SettingUInt64, min_absolute_delay_to_close, 0) \
\
/** Enable usage of Vertical merge algorithm. */ \
M(SettingUInt64, enable_vertical_merge_algorithm, 1) \
\
/** Minimal (approximate) sum of rows in merging parts to activate Vertical merge algorithm */ \
M(SettingUInt64, vertical_merge_algorithm_min_rows_to_activate, 16 * DEFAULT_MERGE_BLOCK_SIZE) \
\
/** Minimal amount of non-PK columns to activate Vertical merge algorithm */ \
M(SettingUInt64, vertical_merge_algorithm_min_columns_to_activate, 11)
/// Settings that should not change after the creation of a table.
#define APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(M) \
M(index_granularity)
#define DECLARE(TYPE, NAME, DEFAULT) \
TYPE NAME {DEFAULT};
APPLY_FOR_MERGE_TREE_SETTINGS(DECLARE)
#undef DECLARE
public:
void loadFromConfig(const String & config_elem, Poco::Util::AbstractConfiguration & config);
/// NOTE: will rewrite the AST to add immutable settings.
void loadFromQuery(ASTStorage & storage_def);
};
}
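For reference, roughly what the three X-macros generate for a single entry such as `M(SettingUInt64, index_granularity, 8192)`; the exact interface of SettingUInt64 lives in Interpreters/SettingsCommon.h and is assumed here rather than quoted:

// DECLARE inside the struct body becomes a member with its default value:
//     SettingUInt64 index_granularity {8192};
//
// SET inside loadFromQuery becomes one branch of the else-if chain:
//     else if (setting.name == "index_granularity") index_granularity.set(setting.value);
//
// ADD_IF_ABSENT (applied only to APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS) pushes the
// current value into the SETTINGS clause of the stored CREATE query when the user did
// not mention it, so an immutable setting like index_granularity is pinned explicitly:
//     changes.push_back(ASTSetQuery::Change{"index_granularity", index_granularity.value});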
......@@ -408,7 +408,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
column_streams.clear();
if (marks_count == 0)
if (rows_count == 0)
{
/// A part is empty - all records are deleted.
Poco::File(part_path).remove(true);
......@@ -419,6 +419,13 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
{
new_part->partition.store(storage, part_path, checksums);
new_part->minmax_idx.store(storage, part_path, checksums);
WriteBufferFromFile count_out(part_path + "count.txt", 4096);
HashingWriteBuffer count_out_hashing(count_out);
writeIntText(rows_count, count_out_hashing);
count_out_hashing.next();
checksums.files["count.txt"].file_size = count_out_hashing.count();
checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
}
{
......@@ -433,7 +440,8 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
checksums.write(out);
}
new_part->size = marks_count;
new_part->rows_count = rows_count;
new_part->marks_count = marks_count;
new_part->modification_time = time(nullptr);
new_part->columns = *total_column_list;
new_part->index.swap(index_columns);
......@@ -441,11 +449,6 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
new_part->size_in_bytes = MergeTreeData::DataPart::calcTotalSize(new_part->getFullPath());
}
size_t MergedBlockOutputStream::marksCount()
{
return marks_count;
}
void MergedBlockOutputStream::init()
{
Poco::File(part_path).createDirectories();
......@@ -525,6 +528,8 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
}
}
rows_count += rows;
{
/** While filling index (index_columns), disable memory tracker.
* Because memory is allocated here (maybe in context of INSERT query),
......
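A small sketch (not part of this commit) of how the count.txt file written in writeSuffixAndFinalizePart could be read back when a part is loaded; it assumes the ReadBufferFromFile / readIntText / assertEOF helpers behave as the analogous writers used above:

#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <string>

static size_t readRowsCountFromPart(const std::string & part_path)
{
    DB::ReadBufferFromFile in(part_path + "count.txt");
    size_t rows = 0;
    DB::readIntText(rows, in);
    DB::assertEOF(in);   /// count.txt holds nothing but the decimal row count
    return rows;
}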
......@@ -130,8 +130,8 @@ public:
const NamesAndTypesList * total_columns_list = nullptr,
MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr);
/// How many marks are already written.
size_t marksCount();
/// How many rows are already written.
size_t getRowsCount() const { return rows_count; }
private:
void init();
......@@ -145,6 +145,7 @@ private:
NamesAndTypesList columns_list;
String part_path;
size_t rows_count = 0;
size_t marks_count = 0;
std::unique_ptr<WriteBufferFromFile> index_file_stream;
......
......@@ -85,7 +85,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
std::sort(entries.begin(), entries.end());
/// We will not touch the last `replicated_logs_to_keep` records.
entries.erase(entries.end() - std::min(entries.size(), storage.data.settings.replicated_logs_to_keep), entries.end());
entries.erase(entries.end() - std::min(entries.size(), storage.data.settings.replicated_logs_to_keep.value), entries.end());
/// We will not touch records that are no less than `min_pointer`.
entries.erase(std::lower_bound(entries.begin(), entries.end(), "log-" + padIndex(min_pointer)), entries.end());
......@@ -164,7 +164,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
/// Virtual node, all nodes that are "greater" than this one will be deleted
NodeWithStat block_threshold("", RequiredStat(time_threshold));
size_t current_deduplication_window = std::min(timed_blocks.size(), storage.data.settings.replicated_deduplication_window);
size_t current_deduplication_window = std::min(timed_blocks.size(), storage.data.settings.replicated_deduplication_window.value);
auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window;
auto first_outdated_block_time_threshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);
......
......@@ -50,7 +50,7 @@ void ReplicatedMergeTreeRestartingThread::run()
constexpr auto retry_period_ms = 10 * 1000;
/// The frequency of checking expiration of session in ZK.
Int64 check_period_ms = storage.data.settings.zookeeper_session_expiration_check_period * 1000;
Int64 check_period_ms = storage.data.settings.zookeeper_session_expiration_check_period.totalSeconds() * 1000;
/// Periodicity of checking lag of replica.
if (check_period_ms > static_cast<Int64>(storage.data.settings.check_delay_period) * 1000)
......
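The extra `.value` and `.totalSeconds()` calls in the hunks above follow from the settings now being SettingUInt64 / SettingSeconds wrappers instead of plain size_t / time_t. A hypothetical, stripped-down stand-in (not the real wrapper) showing why call sites such as std::min need the explicit accessor:

#include <algorithm>
#include <cstddef>

/// Hypothetical, simplified stand-in for SettingUInt64: value plus a "changed" flag,
/// with no implicit conversion back to size_t.
struct DemoSettingUInt64
{
    size_t value = 0;
    bool changed = false;

    void set(size_t x) { value = x; changed = true; }
};

size_t limitByWindow(size_t candidate, const DemoSettingUInt64 & window)
{
    /// std::min deduces a single common type, so the wrapped value has to be unwrapped
    /// explicitly - the same reason the cleanup thread now writes ".value".
    return std::min(candidate, window.value);
}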
......@@ -16,19 +16,18 @@ namespace DB
StoragePtr StorageDictionary::create(
const String & table_name,
Context & context,
ASTPtr & query,
const ASTCreateQuery & query,
NamesAndTypesListPtr columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
const ColumnDefaults & column_defaults)
{
ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*query);
const ASTFunction & function = typeid_cast<const ASTFunction &> (*create.storage);
const ASTFunction & engine = *query.storage->engine;
String dictionary_name;
if (function.arguments)
if (engine.arguments)
{
std::stringstream iss;
function.arguments->format(IAST::FormatSettings(iss, false, false));
engine.arguments->format(IAST::FormatSettings(iss, false, false));
dictionary_name = iss.str();
}
......
......@@ -24,7 +24,7 @@ class StorageDictionary : private ext::shared_ptr_helper<StorageDictionary>, pub
public:
static StoragePtr create(const String & table_name_,
Context & context_,
ASTPtr & query_,
const ASTCreateQuery & query,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
......
......@@ -24,7 +24,7 @@ public:
const String & database_name,
Context & local_context,
Context & context,
ASTPtr & query,
ASTCreateQuery & query,
NamesAndTypesListPtr columns,
const NamesAndTypesList & materialized_columns,
const NamesAndTypesList & alias_columns,
......
......@@ -57,7 +57,7 @@ StorageMaterializedView::StorageMaterializedView(
const String & table_name_,
const String & database_name_,
Context & context_,
ASTPtr & query_,
const ASTCreateQuery & query,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
......@@ -66,20 +66,13 @@ StorageMaterializedView::StorageMaterializedView(
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
database_name(database_name_), context(context_), columns(columns_)
{
ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*query_);
if (!create.select)
if (!query.select)
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);
if (!create.inner_storage)
if (!query.inner_storage)
throw Exception("ENGINE of MaterializedView should be specified explicitly", ErrorCodes::INCORRECT_QUERY);
ASTSelectQuery & select = typeid_cast<ASTSelectQuery &>(*create.select);
/// If the internal query does not specify a database, retrieve it from the context and write it to the query.
select.setDatabaseIfNeeded(database_name);
extractDependentTable(select, select_database_name, select_table_name);
extractDependentTable(*query.select, select_database_name, select_table_name);
if (!select_table_name.empty())
context.getGlobalContext().addDependency(
......@@ -87,7 +80,7 @@ StorageMaterializedView::StorageMaterializedView(
DatabaseAndTableName(database_name, table_name));
String inner_table_name = getInnerTableName();
inner_query = create.select;
inner_query = query.select->ptr();
/// If there is an ATTACH request, then the internal table must already be connected.
if (!attach_)
......@@ -96,10 +89,8 @@ StorageMaterializedView::StorageMaterializedView(
auto manual_create_query = std::make_shared<ASTCreateQuery>();
manual_create_query->database = database_name;
manual_create_query->table = inner_table_name;
manual_create_query->columns = create.columns;
manual_create_query->children.push_back(manual_create_query->columns);
manual_create_query->storage = create.inner_storage;
manual_create_query->children.push_back(manual_create_query->storage);
manual_create_query->set(manual_create_query->columns, query.columns->ptr());
manual_create_query->set(manual_create_query->storage, query.inner_storage->ptr());
/// Execute the query.
try
......@@ -176,5 +167,4 @@ StoragePtr StorageMaterializedView::getInnerTable() const
return context.getTable(database_name, getInnerTableName());
}
}
......@@ -58,7 +58,7 @@ private:
const String & table_name_,
const String & database_name_,
Context & context_,
ASTPtr & query_,
const ASTCreateQuery & query,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
......
......@@ -44,10 +44,9 @@ StorageMergeTree::StorageMergeTree(
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported.
size_t index_granularity_,
const MergeTreeData::MergingParams & merging_params_,
bool has_force_restore_data_flag,
const MergeTreeSettings & settings_)
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'),
context(context_), background_pool(context_.getBackgroundPool()),
......@@ -55,7 +54,7 @@ StorageMergeTree::StorageMergeTree(
full_path, columns_,
materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name, partition_expr_ast_,
sampling_expression_, index_granularity_, merging_params_,
sampling_expression_, merging_params_,
settings_, database_name_ + "." + table_name, false, attach),
reader(data), writer(data), merger(data, context.getBackgroundPool()),
log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)"))
......@@ -198,9 +197,9 @@ void StorageMergeTree::alter(
auto table_hard_lock = lockStructureForAlter(__PRETTY_FUNCTION__);
IDatabase::ASTModifier engine_modifier;
IDatabase::ASTModifier storage_modifier;
if (primary_key_is_modified)
engine_modifier = [&new_primary_key_ast] (ASTPtr & engine_ast)
storage_modifier = [&new_primary_key_ast] (IAST & ast)
{
auto tuple = std::make_shared<ASTFunction>(new_primary_key_ast->range);
tuple->name = "tuple";
......@@ -209,13 +208,14 @@ void StorageMergeTree::alter(
/// Primary key is in the second place in table engine description and can be represented as a tuple.
/// TODO: Not always in second place. If there is a sampling key, then the third one. Fix it.
typeid_cast<ASTExpressionList &>(*typeid_cast<ASTFunction &>(*engine_ast).arguments).children.at(1) = tuple;
auto & storage_ast = typeid_cast<ASTStorage &>(ast);
typeid_cast<ASTExpressionList &>(*storage_ast.engine->arguments).children.at(1) = tuple;
};
context.getDatabase(database_name)->alterTable(
context, table_name,
new_columns, new_materialized_columns, new_alias_columns, new_column_defaults,
engine_modifier);
storage_modifier);
materialized_columns = new_materialized_columns;
alias_columns = new_alias_columns;
......
......@@ -119,7 +119,6 @@ private:
* primary_expr_ast - expression for sorting;
* date_column_name - if not empty, the name of the column with the date used for partitioning by month;
otherwise, partition_expr_ast is used as the partitioning expression;
* index_granularity - for how many rows one index value is written.
*/
StorageMergeTree(
const String & path_,
......@@ -135,10 +134,9 @@ private:
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported.
size_t index_granularity_,
const MergeTreeData::MergingParams & merging_params_,
bool has_force_restore_data_flag,
const MergeTreeSettings & settings_);
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag);
/** Determines what parts should be merged and merges it.
* If aggressive - when selects parts don't takes into account their ratio size and novelty (used for OPTIMIZE query).
......
......@@ -188,10 +188,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_,
size_t index_granularity_,
const MergeTreeData::MergingParams & merging_params_,
bool has_force_restore_data_flag,
const MergeTreeSettings & settings_)
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag)
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, context(context_),
current_zookeeper(context.getZooKeeper()), database_name(database_name_),
table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'),
......@@ -201,7 +200,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
full_path, columns_,
materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name, partition_expr_ast_,
sampling_expression_, index_granularity_, merging_params_,
sampling_expression_, merging_params_,
settings_, database_name_ + "." + table_name, true, attach,
[this] (const std::string & name) { enqueuePartForCheck(name); },
[this] () { clearOldPartsAndRemoveFromZK(); }),
......@@ -311,18 +310,17 @@ StoragePtr StorageReplicatedMergeTree::create(
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_,
size_t index_granularity_,
const MergeTreeData::MergingParams & merging_params_,
bool has_force_restore_data_flag_,
const MergeTreeSettings & settings_)
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag_)
{
auto res = make_shared(
zookeeper_path_, replica_name_, attach,
path_, database_name_, name_,
columns_, materialized_columns_, alias_columns_, column_defaults_,
context_, primary_expr_ast_, date_column_name, partition_expr_ast_,
sampling_expression_, index_granularity_,
merging_params_, has_force_restore_data_flag_, settings_);
sampling_expression_, merging_params_, settings_,
has_force_restore_data_flag_);
StoragePtr res_ptr = res;
auto get_endpoint_holder = [&res](InterserverIOEndpointPtr endpoint)
......@@ -1096,7 +1094,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
do_fetch = true;
LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead");
}
else if (entry.create_time + data.settings.prefer_fetch_merged_part_time_threshold <= time(nullptr))
else if (entry.create_time + data.settings.prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr))
{
/// If entry is old enough, and have enough size, and part are exists in any replica,
/// then prefer fetching of merged part from replica.
......@@ -1239,7 +1237,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
static std::atomic_uint total_fetches {0};
if (data.settings.replicated_max_parallel_fetches && total_fetches >= data.settings.replicated_max_parallel_fetches)
{
throw Exception("Too much total fetches from replicas, maximum: " + toString(data.settings.replicated_max_parallel_fetches),
throw Exception("Too much total fetches from replicas, maximum: " + data.settings.replicated_max_parallel_fetches.toString(),
ErrorCodes::TOO_MUCH_FETCHES);
}
......@@ -1248,7 +1246,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry)
if (data.settings.replicated_max_parallel_fetches_for_table && current_table_fetches >= data.settings.replicated_max_parallel_fetches_for_table)
{
throw Exception("Too much fetches from replicas for table, maximum: " + toString(data.settings.replicated_max_parallel_fetches_for_table),
throw Exception("Too much fetches from replicas for table, maximum: " + data.settings.replicated_max_parallel_fetches_for_table.toString(),
ErrorCodes::TOO_MUCH_FETCHES);
}
......
......@@ -91,10 +91,9 @@ public:
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_, /// nullptr, if sampling is not supported.
size_t index_granularity_,
const MergeTreeData::MergingParams & merging_params_,
bool has_force_restore_data_flag,
const MergeTreeSettings & settings_);
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag);
void startup() override;
void shutdown() override;
......@@ -339,10 +338,9 @@ private:
const String & date_column_name,
const ASTPtr & partition_expr_ast_,
const ASTPtr & sampling_expression_,
size_t index_granularity_,
const MergeTreeData::MergingParams & merging_params_,
bool has_force_restore_data_flag,
const MergeTreeSettings & settings_);
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag);
/// Initialization.
......
......@@ -12,7 +12,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INCORRECT_QUERY;
}
......@@ -20,7 +20,7 @@ StorageView::StorageView(
const String & table_name_,
const String & database_name_,
Context & context_,
ASTPtr & query_,
const ASTCreateQuery & query,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
......@@ -28,13 +28,10 @@ StorageView::StorageView(
: IStorage{materialized_columns_, alias_columns_, column_defaults_}, table_name(table_name_),
database_name(database_name_), context(context_), columns(columns_)
{
ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*query_);
ASTSelectQuery & select = typeid_cast<ASTSelectQuery &>(*create.select);
if (!query.select)
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);
/// If the internal query does not specify a database, retrieve it from the context and write it to the query.
select.setDatabaseIfNeeded(database_name);
inner_query = create.select;
inner_query = query.select->ptr();
}
......
......@@ -47,7 +47,7 @@ private:
const String & table_name_,
const String & database_name_,
Context & context_,
ASTPtr & query_,
const ASTCreateQuery & query,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
......
......@@ -209,7 +209,7 @@ BlockInputStreams StorageSystemParts::read(
}
block.getByPosition(i++).column->insert(part->name);
block.getByPosition(i++).column->insert(static_cast<UInt64>(active_parts.count(part)));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->size));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->marks_count));
size_t marks_size = 0;
for (const NameAndTypePair & it : part->columns)
......@@ -221,7 +221,7 @@ BlockInputStreams StorageSystemParts::read(
}
block.getByPosition(i++).column->insert(static_cast<UInt64>(marks_size));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->getExactSizeRows()));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->rows_count));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->size_in_bytes));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->modification_time));
block.getByPosition(i++).column->insert(static_cast<UInt64>(part->remove_time));
......
......@@ -48,3 +48,15 @@ Sum before DROP PARTITION:
15
Sum after DROP PARTITION:
8
*** Table without columns with fixed size ***
Parts:
1 1_1_1_0 2
2 2_2_2_0 2
Before DROP PARTITION:
a
aa
b
cc
After DROP PARTITION:
aa
cc
-- IMPORTANT: Don't use this setting just yet.
-- It is for testing purposes, the syntax will likely change soon and the server will not be able
-- to load the tables created this way. You have been warned.
SET experimental_merge_tree_allow_custom_partitions = 1;
SET experimental_allow_extended_storage_definition_syntax = 1;
SELECT '*** Not partitioned ***';
DROP TABLE IF EXISTS test.not_partitioned;
CREATE TABLE test.not_partitioned(x UInt8) ENGINE = MergeTree(tuple(), x, 8192);
CREATE TABLE test.not_partitioned(x UInt8) ENGINE MergeTree ORDER BY x;
INSERT INTO test.not_partitioned VALUES (1), (2), (3);
INSERT INTO test.not_partitioned VALUES (4), (5);
......@@ -28,7 +25,7 @@ DROP TABLE test.not_partitioned;
SELECT '*** Partitioned by week ***';
DROP TABLE IF EXISTS test.partitioned_by_week;
CREATE TABLE test.partitioned_by_week(d Date, x UInt8) ENGINE = MergeTree(toMonday(d), x, 8192);
CREATE TABLE test.partitioned_by_week(d Date, x UInt8) ENGINE = MergeTree PARTITION BY toMonday(d) ORDER BY x;
-- 2000-01-03 belongs to a different week than 2000-01-01 and 2000-01-02
INSERT INTO test.partitioned_by_week VALUES ('2000-01-01', 1), ('2000-01-02', 2), ('2000-01-03', 3);
......@@ -51,7 +48,7 @@ DROP TABLE test.partitioned_by_week;
SELECT '*** Partitioned by a (Date, UInt8) tuple ***';
DROP TABLE IF EXISTS test.partitioned_by_tuple;
CREATE TABLE test.partitioned_by_tuple(d Date, x UInt8, y UInt8) ENGINE = MergeTree((d, x), x, 8192);
CREATE TABLE test.partitioned_by_tuple(d Date, x UInt8, y UInt8) ENGINE MergeTree ORDER BY x PARTITION BY (d, x);
INSERT INTO test.partitioned_by_tuple VALUES ('2000-01-01', 1, 1), ('2000-01-01', 2, 2), ('2000-01-02', 1, 3);
INSERT INTO test.partitioned_by_tuple VALUES ('2000-01-02', 1, 4), ('2000-01-01', 1, 5);
......@@ -74,7 +71,7 @@ DROP TABLE test.partitioned_by_tuple;
SELECT '*** Partitioned by String ***';
DROP TABLE IF EXISTS test.partitioned_by_string;
CREATE TABLE test.partitioned_by_string(s String, x UInt8) ENGINE = MergeTree(tuple(s), x, 8192);
CREATE TABLE test.partitioned_by_string(s String, x UInt8) ENGINE = MergeTree PARTITION BY s ORDER BY x;
INSERT INTO test.partitioned_by_string VALUES ('aaa', 1), ('aaa', 2), ('bbb', 3);
INSERT INTO test.partitioned_by_string VALUES ('bbb', 4), ('aaa', 5);
......@@ -92,3 +89,21 @@ SELECT 'Sum after DROP PARTITION:';
SELECT sum(x) FROM test.partitioned_by_string;
DROP TABLE test.partitioned_by_string;
SELECT '*** Table without columns with fixed size ***';
DROP TABLE IF EXISTS test.without_fixed_size_columns;
CREATE TABLE test.without_fixed_size_columns(s String) ENGINE MergeTree PARTITION BY length(s) ORDER BY s;
INSERT INTO test.without_fixed_size_columns VALUES ('a'), ('aa'), ('b'), ('cc');
SELECT 'Parts:';
SELECT partition, name, rows FROM system.parts WHERE database = 'test' AND table = 'without_fixed_size_columns' AND active ORDER BY name;
SELECT 'Before DROP PARTITION:';
SELECT * FROM test.without_fixed_size_columns ORDER BY s;
ALTER TABLE test.without_fixed_size_columns DROP PARTITION 1;
SELECT 'After DROP PARTITION:';
SELECT * FROM test.without_fixed_size_columns ORDER BY s;
DROP TABLE test.without_fixed_size_columns;
......@@ -48,3 +48,15 @@ Sum before DROP PARTITION:
15
Sum after DROP PARTITION:
8
*** Table without columns with fixed size ***
Parts:
1 1_0_0_1 2
2 2_0_0_0 2
Before DROP PARTITION:
a
aa
b
cc
After DROP PARTITION:
aa
cc
-- IMPORTANT: Don't use this setting just yet.
-- It is for testing purposes, the syntax will likely change soon and the server will not be able
-- to load the tables created this way. You have been warned.
SET experimental_merge_tree_allow_custom_partitions = 1;
SET experimental_allow_extended_storage_definition_syntax = 1;
SET replication_alter_partitions_sync = 2;
......@@ -9,8 +6,8 @@ SELECT '*** Not partitioned ***';
DROP TABLE IF EXISTS test.not_partitioned_replica1;
DROP TABLE IF EXISTS test.not_partitioned_replica2;
CREATE TABLE test.not_partitioned_replica1(x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/not_partitioned', '1', tuple(), x, 8192);
CREATE TABLE test.not_partitioned_replica2(x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/not_partitioned', '2', tuple(), x, 8192);
CREATE TABLE test.not_partitioned_replica1(x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/not_partitioned', '1') ORDER BY x;
CREATE TABLE test.not_partitioned_replica2(x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/not_partitioned', '2') ORDER BY x;
INSERT INTO test.not_partitioned_replica1 VALUES (1), (2), (3);
INSERT INTO test.not_partitioned_replica1 VALUES (4), (5);
......@@ -34,8 +31,8 @@ SELECT '*** Partitioned by week ***';
DROP TABLE IF EXISTS test.partitioned_by_week_replica1;
DROP TABLE IF EXISTS test.partitioned_by_week_replica2;
CREATE TABLE test.partitioned_by_week_replica1(d Date, x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_week', '1', toMonday(d), x, 8192);
CREATE TABLE test.partitioned_by_week_replica2(d Date, x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_week', '2', toMonday(d), x, 8192);
CREATE TABLE test.partitioned_by_week_replica1(d Date, x UInt8) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_week', '1') PARTITION BY toMonday(d) ORDER BY x;
CREATE TABLE test.partitioned_by_week_replica2(d Date, x UInt8) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_week', '2') PARTITION BY toMonday(d) ORDER BY x;
-- 2000-01-03 belongs to a different week than 2000-01-01 and 2000-01-02
INSERT INTO test.partitioned_by_week_replica1 VALUES ('2000-01-01', 1), ('2000-01-02', 2), ('2000-01-03', 3);
......@@ -60,8 +57,8 @@ SELECT '*** Partitioned by a (Date, UInt8) tuple ***';
DROP TABLE IF EXISTS test.partitioned_by_tuple_replica1;
DROP TABLE IF EXISTS test.partitioned_by_tuple_replica2;
CREATE TABLE test.partitioned_by_tuple_replica1(d Date, x UInt8, y UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_tuple', '1', (d, x), x, 8192);
CREATE TABLE test.partitioned_by_tuple_replica2(d Date, x UInt8, y UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_tuple', '2', (d, x), x, 8192);
CREATE TABLE test.partitioned_by_tuple_replica1(d Date, x UInt8, y UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_tuple', '1') ORDER BY x PARTITION BY (d, x);
CREATE TABLE test.partitioned_by_tuple_replica2(d Date, x UInt8, y UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_tuple', '2') ORDER BY x PARTITION BY (d, x);
INSERT INTO test.partitioned_by_tuple_replica1 VALUES ('2000-01-01', 1, 1), ('2000-01-01', 2, 2), ('2000-01-02', 1, 3);
INSERT INTO test.partitioned_by_tuple_replica1 VALUES ('2000-01-02', 1, 4), ('2000-01-01', 1, 5);
......@@ -86,8 +83,8 @@ SELECT '*** Partitioned by String ***';
DROP TABLE IF EXISTS test.partitioned_by_string_replica1;
DROP TABLE IF EXISTS test.partitioned_by_string_replica2;
CREATE TABLE test.partitioned_by_string_replica1(s String, x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_string', '1', tuple(s), x, 8192);
CREATE TABLE test.partitioned_by_string_replica2(s String, x UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_string', '2', tuple(s), x, 8192);
CREATE TABLE test.partitioned_by_string_replica1(s String, x UInt8) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_string', '1') PARTITION BY s ORDER BY x;
CREATE TABLE test.partitioned_by_string_replica2(s String, x UInt8) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/partitioned_by_string', '2') PARTITION BY s ORDER BY x;
INSERT INTO test.partitioned_by_string_replica1 VALUES ('aaa', 1), ('aaa', 2), ('bbb', 3);
INSERT INTO test.partitioned_by_string_replica1 VALUES ('bbb', 4), ('aaa', 5);
......@@ -106,3 +103,26 @@ SELECT sum(x) FROM test.partitioned_by_string_replica2;
DROP TABLE test.partitioned_by_string_replica1;
DROP TABLE test.partitioned_by_string_replica2;
SELECT '*** Table without columns with fixed size ***';
DROP TABLE IF EXISTS test.without_fixed_size_columns_replica1;
DROP TABLE IF EXISTS test.without_fixed_size_columns_replica2;
CREATE TABLE test.without_fixed_size_columns_replica1(s String) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/without_fixed_size_columns', '1') PARTITION BY length(s) ORDER BY s;
CREATE TABLE test.without_fixed_size_columns_replica2(s String) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/without_fixed_size_columns', '2') PARTITION BY length(s) ORDER BY s;
INSERT INTO test.without_fixed_size_columns_replica1 VALUES ('a'), ('aa'), ('b'), ('cc');
OPTIMIZE TABLE test.without_fixed_size_columns_replica2 PARTITION 1 FINAL; -- Wait for replication.
SELECT 'Parts:';
SELECT partition, name, rows FROM system.parts WHERE database = 'test' AND table = 'without_fixed_size_columns_replica2' AND active ORDER BY name;
SELECT 'Before DROP PARTITION:';
SELECT * FROM test.without_fixed_size_columns_replica2 ORDER BY s;
ALTER TABLE test.without_fixed_size_columns_replica1 DROP PARTITION 1;
SELECT 'After DROP PARTITION:';
SELECT * FROM test.without_fixed_size_columns_replica2 ORDER BY s;
DROP TABLE test.without_fixed_size_columns_replica1;
DROP TABLE test.without_fixed_size_columns_replica2;
*** Without PARTITION BY and ORDER BY ***
1
2
*** Replicated with sampling ***
1
*** Replacing with implicit version ***
2017-10-23 1 c
*** Replicated Collapsing ***
2017-10-23 2 1
*** Table definition with SETTINGS ***
0
0
SET experimental_allow_extended_storage_definition_syntax = 1;
SELECT '*** Without PARTITION BY and ORDER BY ***';
DROP TABLE IF EXISTS test.unsorted;
CREATE TABLE test.unsorted(x UInt32) ENGINE UnsortedMergeTree;
INSERT INTO test.unsorted VALUES (1), (2);
SELECT * FROM test.unsorted;
DROP TABLE test.unsorted;
SELECT '*** Replicated with sampling ***';
DROP TABLE IF EXISTS test.replicated_with_sampling;
CREATE TABLE test.replicated_with_sampling(x UInt8)
ENGINE ReplicatedMergeTree('/clickhouse/tables/test/replicated_with_sampling', 'r1')
ORDER BY x
SAMPLE BY x;
INSERT INTO test.replicated_with_sampling VALUES (1), (128);
SELECT sum(x) FROM test.replicated_with_sampling SAMPLE 1/2;
DROP TABLE test.replicated_with_sampling;
SELECT '*** Replacing with implicit version ***';
DROP TABLE IF EXISTS test.replacing;
CREATE TABLE test.replacing(d Date, x UInt32, s String) ENGINE = ReplacingMergeTree ORDER BY x PARTITION BY d;
INSERT INTO test.replacing VALUES ('2017-10-23', 1, 'a');
INSERT INTO test.replacing VALUES ('2017-10-23', 1, 'b');
INSERT INTO test.replacing VALUES ('2017-10-23', 1, 'c');
OPTIMIZE TABLE test.replacing PARTITION '2017-10-23' FINAL;
SELECT * FROM test.replacing;
DROP TABLE test.replacing;
SELECT '*** Replicated Collapsing ***';
DROP TABLE IF EXISTS test.replicated_collapsing;
CREATE TABLE test.replicated_collapsing(d Date, x UInt32, sign Int8)
ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/test/replicated_collapsing', 'r1', sign)
PARTITION BY toYYYYMM(d) ORDER BY d;
INSERT INTO test.replicated_collapsing VALUES ('2017-10-23', 1, 1);
INSERT INTO test.replicated_collapsing VALUES ('2017-10-23', 1, -1), ('2017-10-23', 2, 1);
OPTIMIZE TABLE test.replicated_collapsing PARTITION 201710 FINAL;
SELECT * FROM test.replicated_collapsing;
DROP TABLE test.replicated_collapsing;
SELECT '*** Table definition with SETTINGS ***';
DROP TABLE IF EXISTS test.with_settings;
CREATE TABLE test.with_settings(x UInt32)
ENGINE ReplicatedMergeTree('/clickhouse/tables/test/with_settings', 'r1')
ORDER BY x
SETTINGS replicated_can_become_leader = 0;
SELECT sleep(1); -- If replicated_can_become_leader were true, this replica would become the leader after 1 second.
SELECT is_leader FROM system.replicas WHERE database = 'test' AND table = 'with_settings';
DROP TABLE test.with_settings;