未验证 提交 490722ad 编写于 作者: A alesapin 提交者: GitHub

Merge pull request #6723 from yandex/bad_size_of_marks_skip_idx_bug

Fix segmentation fault in vertical merge with skip indices
......@@ -726,7 +726,7 @@ void ExpressionActions::addImpl(ExpressionAction action, Names & new_names)
new_names.push_back(action.result_name);
new_names.insert(new_names.end(), action.array_joined_columns.begin(), action.array_joined_columns.end());
/// Compiled functions are custom functions and them don't need building
/// Compiled functions are custom functions and they don't need building
if (action.type == ExpressionAction::APPLY_FUNCTION && !action.is_function_compiled)
{
if (sample_block.has(action.result_name))
......
......@@ -289,13 +289,15 @@ void MutationsInterpreter::prepare(bool dry_run)
throw Exception("Unknown mutation command type: " + DB::toString<int>(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND);
}
/// We cares about affected indices because we also need to rewrite them
/// when one of index columns updated or filtered with delete
if (!affected_indices_columns.empty())
{
if (!stages.empty())
{
std::vector<Stage> stages_copy;
/// Copy all filled stages except index calculation stage.
for (const auto &stage : stages)
for (const auto & stage : stages)
{
stages_copy.emplace_back(context);
stages_copy.back().column_to_updated = stage.column_to_updated;
......
......@@ -41,7 +41,7 @@ public:
return res;
}
void formatImpl(const FormatSettings & s, FormatState &state, FormatStateStacked frame) const override
void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override
{
frame.need_parens = false;
std::string indent_str = s.one_line ? "" : std::string(4 * frame.indent, ' ');
......
......@@ -336,9 +336,9 @@ void IMergedBlockOutputStream::calculateAndSerializeSkipIndices(
size_t skip_index_current_mark = 0;
/// Filling and writing skip indices like in IMergedBlockOutputStream::writeColumn
for (size_t i = 0; i < storage.skip_indices.size(); ++i)
for (size_t i = 0; i < skip_indices.size(); ++i)
{
const auto index = storage.skip_indices[i];
const auto index = skip_indices[i];
auto & stream = *skip_indices_streams[i];
size_t prev_pos = 0;
skip_index_current_mark = skip_index_mark;
......
......@@ -358,10 +358,10 @@ void MergeTreeData::setProperties(
const auto & index_decl = std::dynamic_pointer_cast<ASTIndexDeclaration>(index_ast);
new_indices.push_back(
MergeTreeIndexFactory::instance().get(
all_columns,
std::dynamic_pointer_cast<ASTIndexDeclaration>(index_decl->clone()),
global_context));
MergeTreeIndexFactory::instance().get(
all_columns,
std::dynamic_pointer_cast<ASTIndexDeclaration>(index_decl->clone()),
global_context));
if (indices_names.find(new_indices.back()->name) != indices_names.end())
throw Exception(
......@@ -1293,7 +1293,7 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & c
}
if (columns_alter_forbidden.count(command.column_name))
throw Exception("trying to ALTER key column " + command.column_name, ErrorCodes::ILLEGAL_COLUMN);
throw Exception("Trying to ALTER key column " + command.column_name, ErrorCodes::ILLEGAL_COLUMN);
if (columns_alter_metadata_only.count(command.column_name))
{
......@@ -1578,7 +1578,8 @@ void MergeTreeData::alterDataPart(
if (expression)
{
BlockInputStreamPtr part_in = std::make_shared<MergeTreeSequentialBlockInputStream>(
*this, part, expression->getRequiredColumns(), false, /* take_column_types_from_storage = */ false);
*this, part, expression->getRequiredColumns(), false, /* take_column_types_from_storage = */ false);
auto compression_codec = global_context.chooseCompressionCodec(
part->bytes_on_disk,
......@@ -1600,7 +1601,8 @@ void MergeTreeData::alterDataPart(
true /* sync */,
compression_codec,
true /* skip_offsets */,
{},
/// Don't recalc indices because indices alter is restricted
std::vector<MergeTreeIndexPtr>{},
unused_written_offsets,
part->index_granularity,
&part->index_granularity_info);
......
......@@ -621,6 +621,13 @@ public:
(settings->enable_mixed_granularity_parts || !has_non_adaptive_index_granularity_parts);
}
/// Get constant pointer to storage settings.
/// Copy this pointer into your scope and you will
/// get consistent settings.
MergeTreeSettingsPtr getSettings() const
{
return storage_settings.get();
}
MergeTreeDataFormatVersion format_version;
......@@ -679,13 +686,6 @@ public:
bool has_non_adaptive_index_granularity_parts = false;
/// Get constant pointer to storage settings.
/// Copy this pointer into your scope and you will
/// get consistent settings.
MergeTreeSettingsPtr getSettings() const
{
return storage_settings.get();
}
protected:
......
......@@ -379,7 +379,7 @@ static void extractMergingAndGatheringColumns(
std::set<String> key_columns(sort_key_columns_vec.cbegin(), sort_key_columns_vec.cend());
for (const auto & index : indexes)
{
Names index_columns_vec = index->expr->getRequiredColumns();
Names index_columns_vec = index->getColumnsRequiredForIndexCalc();
std::copy(index_columns_vec.cbegin(), index_columns_vec.cend(),
std::inserter(key_columns, key_columns.end()));
}
......@@ -828,6 +828,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
rows_sources_read_buf.seek(0, 0);
ColumnGathererStream column_gathered_stream(column_name, column_part_streams, rows_sources_read_buf);
MergedColumnOnlyOutputStream column_to(
data,
column_gathered_stream.getHeader(),
......@@ -835,10 +836,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
false,
compression_codec,
false,
{},
/// we don't need to recalc indices here
/// because all of them were already recalculated and written
/// as key part of vertical merge
std::vector<MergeTreeIndexPtr>{},
written_offset_columns,
to.getIndexGranularity()
);
to.getIndexGranularity());
size_t column_elems_written = 0;
column_to.writePrefix();
......@@ -1019,9 +1023,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
for (size_t i = 0; i < data.skip_indices.size(); ++i)
{
const auto & index = data.skip_indices[i];
const auto & index_cols = index->expr->getRequiredColumns();
auto it = find(cbegin(index_cols), cend(index_cols), col);
if (it != cend(index_cols) && indices_to_recalc.insert(index).second)
const auto & index_cols = index->getColumnsRequiredForIndexCalc();
auto it = std::find(std::cbegin(index_cols), std::cend(index_cols), col);
if (it != std::cend(index_cols) && indices_to_recalc.insert(index).second)
{
ASTPtr expr_list = MergeTreeData::extractKeyExpressionList(
storage_from_source_part->getIndices().indices[i]->expr->clone());
......@@ -1038,6 +1042,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
auto indices_recalc_expr = ExpressionAnalyzer(
indices_recalc_expr_list,
indices_recalc_syntax, context).getActions(false);
/// We can update only one column, but some skip idx expression may depend on several
/// columns (c1 + c2 * c3). It works because in stream was created with help of
/// MutationsInterpreter which knows about skip indices and stream 'in' already has
/// all required columns.
/// TODO move this logic to single place.
in = std::make_shared<MaterializingBlockInputStream>(
std::make_shared<ExpressionBlockInputStream>(in, indices_recalc_expr));
}
......
......@@ -51,6 +51,7 @@ std::unique_ptr<IMergeTreeIndex> MergeTreeIndexFactory::get(
return lft + ", " + rht.first;
}),
ErrorCodes::INCORRECT_QUERY);
return it->second(columns, node, context);
}
......
......@@ -104,11 +104,25 @@ public:
virtual MergeTreeIndexConditionPtr createIndexCondition(
const SelectQueryInfo & query_info, const Context & context) const = 0;
Names getColumnsRequiredForIndexCalc() const { return expr->getRequiredColumns(); }
/// Index name
String name;
/// Index expression (x * y)
/// with columns arguments
ExpressionActionsPtr expr;
/// Names of columns for index
Names columns;
/// Data types of columns
DataTypes data_types;
/// Block with columns and data_types
Block header;
/// Skip index granularity
size_t granularity;
};
......
SET allow_experimental_data_skipping_indices=1;
DROP TABLE IF EXISTS test_vertical_merge;
CREATE TABLE test_vertical_merge (
k UInt64,
val1 UInt64,
val2 UInt64,
INDEX idx1 val1 * val2 TYPE minmax GRANULARITY 1,
INDEX idx2 val1 * k TYPE minmax GRANULARITY 1
) ENGINE MergeTree()
ORDER BY k
SETTINGS vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 1;
INSERT INTO test_vertical_merge SELECT number, number + 5, number * 12 from numbers(1000);
SELECT COUNT() from test_vertical_merge WHERE val2 <= 2400;
OPTIMIZE TABLE test_vertical_merge FINAL;
SELECT COUNT() from test_vertical_merge WHERE val2 <= 2400;
DROP TABLE IF EXISTS test_vertical_merge;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册