Commit 2abf4bbc authored by alesapin

Not working state

Parent: e7aa209b
......@@ -307,6 +307,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
}
//std::cerr << "Source header:" << source_header.dumpStructure() << std::endl;
if (storage)
table_lock = storage->lockStructureForShare(false, context->getInitialQueryId());
......
......@@ -19,6 +19,7 @@
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/formatAST.h>
#include <IO/WriteHelpers.h>
#include <Parsers/queryToString.h>
namespace DB
......@@ -172,7 +173,9 @@ MutationsInterpreter::MutationsInterpreter(
, context(context_)
, can_execute(can_execute_)
{
std::cerr << "STORAGE IS NULLPTR:" << (storage == nullptr) << std::endl;
mutation_ast = prepare(!can_execute);
std::cerr << "Mutations ast:" << queryToString(mutation_ast) << std::endl;
SelectQueryOptions limits = SelectQueryOptions().analyze(!can_execute).ignoreLimits();
select_interpreter = std::make_unique<InterpreterSelectQuery>(mutation_ast, context, storage, limits);
}
......@@ -259,15 +262,22 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
if (commands.empty())
throw Exception("Empty mutation commands list", ErrorCodes::LOGICAL_ERROR);
std::cerr << "PREPARING\n";
const ColumnsDescription & columns_desc = storage->getColumns();
const IndicesDescription & indices_desc = storage->getIndices();
std::cerr << "COLUMNS RECEIVED:" << columns_desc.toString() << std::endl;
NamesAndTypesList all_columns = columns_desc.getAllPhysical();
std::cerr << "COMMANDS SIZE:" << commands.size() << std::endl;
NameSet updated_columns;
for (const MutationCommand & command : commands)
{
for (const auto & kv : command.column_to_update_expression)
{
std::cerr << "COLUMN:" << kv.first << std::endl;
updated_columns.insert(kv.first);
}
}
/// We need to know which columns affect which MATERIALIZED columns and data skipping indices
......@@ -311,6 +321,7 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
/// First, break a sequence of commands into stages.
for (const auto & command : commands)
{
std::cerr << "Processing command:" << command.ast << std::endl;
if (command.type == MutationCommand::DELETE)
{
if (stages.empty() || !stages.back().column_to_updated.empty())
......@@ -381,9 +392,16 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
const auto required_columns = syntax_result->requiredSourceColumns();
affected_indices_columns.insert(std::cbegin(required_columns), std::cend(required_columns));
}
else if (command.type == MutationCommand::CAST)
else if (command.type == MutationCommand::READ)
{
stages.back().column_to_updated.emplace(command.column_name, makeASTFunction("CAST", command.column_name, command.type_name));
if (stages.empty() || !stages.back().column_to_updated.empty())
stages.emplace_back(context);
if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
stages.emplace_back(context);
/// TODO(alesap)
if (command.data_type)
stages.back().column_to_updated.emplace(command.column_name, std::make_shared<ASTIdentifier>(command.column_name));
}
else
throw Exception("Unknown mutation command type: " + DB::toString<int>(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND);
......@@ -427,6 +445,8 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> & prepared_stages, bool dry_run)
{
NamesAndTypesList all_columns = storage->getColumns().getAllPhysical();
std::cerr << "Prepare interpreter storage columns:" << all_columns.toString() << std::endl;
/// Next, for each stage calculate columns changed by this and previous stages.
for (size_t i = 0; i < prepared_stages.size(); ++i)
......
#include "evaluateMissingDefaults.h"
#include <iostream>
#include <Core/Block.h>
#include <Parsers/queryToString.h>
#include <Storages/ColumnDefault.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTWithAlias.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <utility>
#include <DataTypes/DataTypesNumber.h>
......@@ -14,7 +19,8 @@
namespace DB
{
static ASTPtr requiredExpressions(Block & block, const NamesAndTypesList & required_columns, const ColumnDefaults & column_defaults)
namespace {
ASTPtr defaultRequiredExpressions(Block & block, const NamesAndTypesList & required_columns, const ColumnDefaults & column_defaults)
{
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
......@@ -27,8 +33,7 @@ static ASTPtr requiredExpressions(Block & block, const NamesAndTypesList & requi
/// expressions must be cloned to prevent modification by the ExpressionAnalyzer
if (it != column_defaults.end())
default_expr_list->children.emplace_back(
setAlias(it->second.expression->clone(), it->first));
default_expr_list->children.emplace_back(setAlias(it->second.expression->clone(), it->first));
}
if (default_expr_list->children.empty())
......@@ -36,22 +41,49 @@ static ASTPtr requiredExpressions(Block & block, const NamesAndTypesList & requi
return default_expr_list;
}
void evaluateMissingDefaults(Block & block,
const NamesAndTypesList & required_columns,
const ColumnDefaults & column_defaults,
const Context & context, bool save_unneeded_columns)
ASTPtr convertRequiredExpressions(Block & block, const NamesAndTypesList & required_columns)
{
if (column_defaults.empty())
return;
ASTPtr conversion_expr_list = std::make_shared<ASTExpressionList>();
for (const auto & required_column : required_columns)
{
if (!block.has(required_column.name))
continue;
//throw Exception("Required conversion of column " + required_column.name + " which is absent in block. It's a bug", ErrorCodes::LOGICAL_ERROR);
auto column_in_block = block.getByName(required_column.name);
//std::cerr << "Looking at:" << required_column.name << std::endl;
//std::cerr << "In block type:" << column_in_block.type->getName() << std::endl;
//std::cerr << "Required type:" << required_column.type->getName() << std::endl;
if (column_in_block.type->equals(*required_column.type))
{
//std::cerr << "TYPES ARE SAME\n";
continue;
}
//std::cerr << "TYPES ARE DIFFERENT\n";
auto cast_func = makeASTFunction(
"CAST", std::make_shared<ASTIdentifier>(required_column.name), std::make_shared<ASTLiteral>(required_column.type->getName()));
conversion_expr_list->children.emplace_back(setAlias(cast_func, required_column.name));
ASTPtr default_expr_list = requiredExpressions(block, required_columns, column_defaults);
if (!default_expr_list)
}
return conversion_expr_list;
}
void executeExpressionsOnBlock(
Block & block,
ASTPtr expr_list,
bool save_unneeded_columns,
const NamesAndTypesList & required_columns,
const Context & context)
{
if (!expr_list)
return;
if (!save_unneeded_columns)
{
auto syntax_result = SyntaxAnalyzer(context).analyze(default_expr_list, block.getNamesAndTypesList());
ExpressionAnalyzer{default_expr_list, syntax_result, context}.getActions(true)->execute(block);
auto syntax_result = SyntaxAnalyzer(context).analyze(expr_list, block.getNamesAndTypesList());
ExpressionAnalyzer{expr_list, syntax_result, context}.getActions(true)->execute(block);
return;
}
......@@ -59,8 +91,8 @@ void evaluateMissingDefaults(Block & block,
* we are going to operate on a copy instead of the original block */
Block copy_block{block};
auto syntax_result = SyntaxAnalyzer(context).analyze(default_expr_list, block.getNamesAndTypesList());
auto expression_analyzer = ExpressionAnalyzer{default_expr_list, syntax_result, context};
auto syntax_result = SyntaxAnalyzer(context).analyze(expr_list, block.getNamesAndTypesList());
auto expression_analyzer = ExpressionAnalyzer{expr_list, syntax_result, context};
auto required_source_columns = syntax_result->requiredSourceColumns();
auto rows_was = copy_block.rows();
......@@ -82,7 +114,9 @@ void evaluateMissingDefaults(Block & block,
copy_block.insert({DataTypeUInt8().createColumnConst(rows_was, 0u), std::make_shared<DataTypeUInt8>(), "__dummy"});
}
//std::cerr << "Block before expression:" << copy_block.dumpStructure() << std::endl;
expression_analyzer.getActions(true)->execute(copy_block);
//std::cerr << "Block after expression:" << copy_block.dumpStructure() << std::endl;
/// move evaluated columns to the original block, materializing them at the same time
size_t pos = 0;
......@@ -93,9 +127,36 @@ void evaluateMissingDefaults(Block & block,
auto evaluated_col = copy_block.getByName(col->name);
evaluated_col.column = evaluated_col.column->convertToFullColumnIfConst();
block.insert(pos, std::move(evaluated_col));
if (block.has(col->name))
block.getByName(col->name) = std::move(evaluated_col);
else
block.insert(pos, std::move(evaluated_col));
}
}
}
}
void performRequiredConversions(Block & block, const NamesAndTypesList & required_columns, const Context & context)
{
ASTPtr conversion_expr_list = convertRequiredExpressions(block, required_columns);
//std::cerr << queryToString(conversion_expr_list) << std::endl;
//std::cerr << "Block:" << block.dumpStructure() << std::endl;
if (conversion_expr_list->children.empty())
return;
executeExpressionsOnBlock(block, conversion_expr_list, true, required_columns, context);
}
void evaluateMissingDefaults(Block & block,
const NamesAndTypesList & required_columns,
const ColumnDefaults & column_defaults,
const Context & context, bool save_unneeded_columns)
{
if (column_defaults.empty())
return;
ASTPtr default_expr_list = defaultRequiredExpressions(block, required_columns, column_defaults);
executeExpressionsOnBlock(block, default_expr_list, save_unneeded_columns, required_columns, context);
}
}
......@@ -17,4 +17,7 @@ void evaluateMissingDefaults(Block & block,
const std::unordered_map<std::string, ColumnDefault> & column_defaults,
const Context & context, bool save_unneeded_columns = true);
void performRequiredConversions(Block & block,
const NamesAndTypesList & required_columns,
const Context & context);
}
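A minimal usage sketch (not part of this commit) for the performRequiredConversions declaration above, assuming a DB::Context is already available and the header lives under Interpreters/; the column name and the types are purely illustrative:

```cpp
#include <memory>
#include <Core/Block.h>
#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/evaluateMissingDefaults.h>

/// `block` still carries column "value" with the part's on-disk type (say UInt8),
/// while the table now declares it as UInt64 after ALTER MODIFY COLUMN.
/// performRequiredConversions is expected to build "CAST(value, 'UInt64') AS value"
/// and execute it over the block, leaving the column with the requested type.
void convertValueColumn(DB::Block & block, const DB::Context & context)
{
    DB::NamesAndTypesList required_columns{{"value", std::make_shared<DB::DataTypeUInt64>()}};
    DB::performRequiredConversions(block, required_columns, context);
}
```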
......@@ -49,7 +49,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
if (command_ast->type == ASTAlterCommand::ADD_COLUMN)
{
AlterCommand command;
command.ast = command_ast;
command.ast = command_ast->clone();
command.type = AlterCommand::ADD_COLUMN;
const auto & ast_col_decl = command_ast->col_decl->as<ASTColumnDeclaration &>();
......@@ -90,7 +90,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
throw Exception("\"ALTER TABLE table CLEAR COLUMN column\" queries are not supported yet. Use \"CLEAR COLUMN column IN PARTITION\".", ErrorCodes::NOT_IMPLEMENTED);
AlterCommand command;
command.ast = command_ast;
command.ast = command_ast->clone();
command.type = AlterCommand::DROP_COLUMN;
command.column_name = getIdentifierName(command_ast->column);
command.if_exists = command_ast->if_exists;
......@@ -99,7 +99,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
else if (command_ast->type == ASTAlterCommand::MODIFY_COLUMN)
{
AlterCommand command;
command.ast = command_ast;
command.ast = command_ast->clone();
command.type = AlterCommand::MODIFY_COLUMN;
const auto & ast_col_decl = command_ast->col_decl->as<ASTColumnDeclaration &>();
......@@ -135,7 +135,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
else if (command_ast->type == ASTAlterCommand::COMMENT_COLUMN)
{
AlterCommand command;
command.ast = command_ast;
command.ast = command_ast->clone();
command.type = COMMENT_COLUMN;
command.column_name = getIdentifierName(command_ast->column);
const auto & ast_comment = command_ast->comment->as<ASTLiteral &>();
......@@ -146,7 +146,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
else if (command_ast->type == ASTAlterCommand::MODIFY_ORDER_BY)
{
AlterCommand command;
command.ast = command_ast;
command.ast = command_ast->clone();
command.type = AlterCommand::MODIFY_ORDER_BY;
command.order_by = command_ast->order_by;
return command;
......@@ -154,7 +154,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
else if (command_ast->type == ASTAlterCommand::ADD_INDEX)
{
AlterCommand command;
command.ast = command_ast;
command.ast = command_ast->clone();
command.index_decl = command_ast->index_decl;
command.type = AlterCommand::ADD_INDEX;
......@@ -172,7 +172,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
else if (command_ast->type == ASTAlterCommand::ADD_CONSTRAINT)
{
AlterCommand command;
command.ast = command_ast;
command.ast = command_ast->clone();
command.constraint_decl = command_ast->constraint_decl;
command.type = AlterCommand::ADD_CONSTRAINT;
......@@ -190,7 +190,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
throw Exception("\"ALTER TABLE table CLEAR COLUMN column\" queries are not supported yet. Use \"CLEAR COLUMN column IN PARTITION\".", ErrorCodes::NOT_IMPLEMENTED);
AlterCommand command;
command.ast = command_ast;
command.ast = command_ast->clone();
command.if_exists = command_ast->if_exists;
command.type = AlterCommand::DROP_CONSTRAINT;
command.constraint_name = command_ast->constraint->as<ASTIdentifier &>().name;
......@@ -203,7 +203,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
throw Exception("\"ALTER TABLE table CLEAR INDEX index\" queries are not supported yet. Use \"CLEAR INDEX index IN PARTITION\".", ErrorCodes::NOT_IMPLEMENTED);
AlterCommand command;
command.ast = command_ast;
command.ast = command_ast->clone();
command.type = AlterCommand::DROP_INDEX;
command.index_name = command_ast->index->as<ASTIdentifier &>().name;
command.if_exists = command_ast->if_exists;
......@@ -213,7 +213,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
else if (command_ast->type == ASTAlterCommand::MODIFY_TTL)
{
AlterCommand command;
command.ast = command_ast;
command.ast = command_ast->clone();
command.type = AlterCommand::MODIFY_TTL;
command.ttl = command_ast->ttl;
return command;
......@@ -221,7 +221,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
else if (command_ast->type == ASTAlterCommand::MODIFY_SETTING)
{
AlterCommand command;
command.ast = command_ast;
command.ast = command_ast->clone();
command.type = AlterCommand::MODIFY_SETTING;
command.settings_changes = command_ast->settings_changes->as<ASTSetQuery &>().changes;
return command;
......@@ -499,12 +499,18 @@ bool isMetadataOnlyConversion(const IDataType * from, const IDataType * to)
bool AlterCommand::isRequireMutationStage(const StorageInMemoryMetadata & metadata) const
{
if (ignore)
return false;
if (type == DROP_COLUMN)
return true;
if (type != MODIFY_COLUMN || data_type == nullptr)
return false;
for (const auto & column : metadata.columns.getAllPhysical())
{
if (column.name == column_name && !isMetadataOnlyConversion(column.type, data_type))
if (column.name == column_name && !isMetadataOnlyConversion(column.type.get(), data_type.get()))
return true;
}
return false;
......@@ -534,11 +540,11 @@ std::optional<MutationCommand> AlterCommand::tryConvertToMutationCommand(const S
MutationCommand result;
result.type = MutationCommand::Type::CAST;
result.type = MutationCommand::Type::READ;
result.column_name = column_name;
result.data_type = data_type;
result.predicate = nullptr;
result.ast = ast;
result.ast = ast->clone();
return result;
}
......@@ -835,7 +841,7 @@ bool AlterCommands::isCommentAlter() const
}
MutationCommands getMutationCommands(const StorageInMemoryMetadata & metadata) const
MutationCommands AlterCommands::getMutationCommands(const StorageInMemoryMetadata & metadata) const
{
MutationCommands result;
for (const auto & alter_cmd : *this)
......
......@@ -101,6 +101,7 @@ Block IStorage::getSampleBlockForColumns(const Names & column_names) const
Block res;
NamesAndTypesList all_columns = getColumns().getAll();
//std::cerr << "ALL LLLL COLUMNS:" << all_columns.toString() << std::endl;
std::unordered_map<String, DataTypePtr> columns_map;
for (const auto & elem : all_columns)
columns_map.emplace(elem.name, elem.type);
......@@ -120,6 +121,7 @@ Block IStorage::getSampleBlockForColumns(const Names & column_names) const
}
}
//std::cerr << "RES:" << res.dumpStructure() << std::endl;
return res;
}
......
......@@ -44,6 +44,8 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
save_marks_in_cache(save_marks_in_cache_),
virt_column_names(virt_column_names_)
{
//std::cerr << "HEADER IN SELECT PROCESSOR:" << getPort().getHeader().dumpStructure() << std::endl;
//std::cerr << "STACK:" << StackTrace().toString() << std::endl;
header_without_virtual_columns = getPort().getHeader();
for (auto it = virt_column_names.rbegin(); it != virt_column_names.rend(); ++it)
......
......@@ -241,10 +241,10 @@ MergeTreeReadTaskColumns getReadTaskColumns(const MergeTreeData & storage, const
{
/// Under owned_data_part->columns_lock we check that all requested columns are of the same type as in the table.
/// This may be not true in case of ALTER MODIFY.
if (!pre_column_names.empty())
storage.check(data_part->columns, pre_column_names);
if (!column_names.empty())
storage.check(data_part->columns, column_names);
//if (!pre_column_names.empty())
// storage.check(data_part->columns, pre_column_names);
//if (!column_names.empty())
// storage.check(data_part->columns, column_names);
const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical();
result.pre_columns = physical_columns.addTypes(pre_column_names);
......
......@@ -1536,15 +1536,15 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, const S
}
}
if (commands.isModifyingData())
{
/// Check that type conversions are possible.
ExpressionActionsPtr unused_expression;
NameToNameMap unused_map;
bool unused_bool;
createConvertExpression(nullptr, getColumns().getAllPhysical(), metadata.columns.getAllPhysical(),
getIndices().indices, metadata.indices.indices, unused_expression, unused_map, unused_bool);
}
//if (commands.isModifyingData())
//{
// /// Check that type conversions are possible.
// ExpressionActionsPtr unused_expression;
// NameToNameMap unused_map;
// bool unused_bool;
// createConvertExpression(nullptr, getColumns().getAllPhysical(), metadata.columns.getAllPhysical(),
// getIndices().indices, metadata.indices.indices, unused_expression, unused_map, unused_bool);
//}
}
void MergeTreeData::createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns,
......
......@@ -962,6 +962,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(
data, space_reservation->getDisk(), future_part.name, future_part.part_info);
new_data_part->relative_path = "tmp_mut_" + future_part.name;
new_data_part->is_temp = true;
new_data_part->ttl_infos = source_part->ttl_infos;
......@@ -988,6 +989,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
const auto data_settings = data.getSettings();
Block in_header = in->getHeader();
std::cerr << "Mutations header:" << in_header.dumpStructure() << std::endl;
UInt64 watch_prev_elapsed = 0;
MergeStageProgress stage_progress(1.0);
......@@ -1069,6 +1071,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
/// Don't change granularity type while mutating subset of columns
auto mrk_extension = source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension();
/// Skip updated files
for (const auto & entry : updated_header)
{
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
......@@ -1087,6 +1091,33 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
files_to_skip.insert(index->getFileName() + mrk_extension);
}
/// TODO(alesap) better
for (const auto & part_column : source_part->columns)
{
bool found = false;
for (const auto & all_column : all_columns)
{
if (part_column.name == all_column.name)
{
found = true;
break;
}
}
if (!found)
{
std::cerr << "REMOVING COLUMN:" << part_column.name << std::endl;
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
{
String stream_name = IDataType::getFileNameForStream(part_column.name, substream_path);
files_to_skip.insert(stream_name + ".bin");
files_to_skip.insert(stream_name + mrk_extension);
};
IDataType::SubstreamPath stream_path;
part_column.type->enumerateStreams(callback, stream_path);
}
}
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator dir_it(source_part->getFullPath()); dir_it != dir_end; ++dir_it)
{
......@@ -1101,37 +1132,46 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
merge_entry->columns_written = all_columns.size() - updated_header.columns();
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
MergedColumnOnlyOutputStream out(
data,
updated_header,
new_part_tmp_path,
/* sync = */ false,
compression_codec,
/* skip_offsets = */ false,
std::vector<MergeTreeIndexPtr>(indices_to_recalc.begin(), indices_to_recalc.end()),
unused_written_offsets,
source_part->index_granularity,
&source_part->index_granularity_info
);
new_data_part->checksums = source_part->checksums;
if (updated_header.columns() != 0)
{
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
MergedColumnOnlyOutputStream out(
data,
updated_header,
new_part_tmp_path,
/* sync = */ false,
compression_codec,
/* skip_offsets = */ false,
std::vector<MergeTreeIndexPtr>(indices_to_recalc.begin(), indices_to_recalc.end()),
unused_written_offsets,
source_part->index_granularity,
&source_part->index_granularity_info
);
in->readPrefix();
out.writePrefix();
Block block;
while (check_not_cancelled() && (block = in->read()))
{
out.write(block);
merge_entry->rows_written += block.rows();
merge_entry->bytes_written_uncompressed += block.bytes();
}
in->readPrefix();
out.writePrefix();
in->readSuffix();
Block block;
while (check_not_cancelled() && (block = in->read()))
{
out.write(block);
auto changed_checksums = out.writeSuffixAndGetChecksums();
merge_entry->rows_written += block.rows();
merge_entry->bytes_written_uncompressed += block.bytes();
new_data_part->checksums.add(std::move(changed_checksums));
}
in->readSuffix();
auto changed_checksums = out.writeSuffixAndGetChecksums();
for (const String & file_to_skip : files_to_skip)
if (new_data_part->checksums.files.count(file_to_skip))
new_data_part->checksums.files.erase(file_to_skip);
new_data_part->checksums = source_part->checksums;
new_data_part->checksums.add(std::move(changed_checksums));
{
/// Write file with checksums.
WriteBufferFromFile out_checksums(new_part_tmp_path + "checksums.txt", 4096);
......@@ -1144,8 +1184,17 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end());
for (auto it = new_data_part->columns.begin(); it != new_data_part->columns.end();)
{
if (source_columns_name_set.count(it->name) || updated_header.has(it->name))
if (updated_header.has(it->name))
{
auto updated_type = updated_header.getByName(it->name).type;
if (updated_type != it->type)
it->type = updated_type;
++it;
}
else if (source_columns_name_set.count(it->name))
{
++it;
}
else
it = new_data_part->columns.erase(it);
}
......
......@@ -644,6 +644,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
}
else
{
std::cerr << "Spreading marks among streams\n";
res = spreadMarkRangesAmongStreams(
std::move(parts_with_ranges),
num_streams,
......@@ -763,6 +764,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
num_streams, sum_marks, min_marks_for_concurrent_read, parts, data, query_info.prewhere_info, true,
column_names, MergeTreeReadPool::BackoffSettings(settings), settings.preferred_block_size_bytes, false);
std::cerr << "POOL HEADER:" << pool->getHeader().dumpStructure() << std::endl;
/// Let's estimate total number of rows for progress bar.
LOG_TRACE(log, "Reading approx. " << total_rows << " rows with " << num_streams << " streams");
......@@ -790,6 +792,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
{
RangesInDataPart & part = parts[part_index];
std::cerr << "Creating sequential stream from part:" << part_index << std::endl;
auto source = std::make_shared<MergeTreeSelectProcessor>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
......
......@@ -618,6 +618,7 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
}
merge_tree_reader->evaluateMissingDefaults(block, columns);
merge_tree_reader->performRequiredConversions(columns);
}
read_result.columns.reserve(read_result.columns.size() + columns.size());
......@@ -637,6 +638,8 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar
if (should_evaluate_missing_defaults)
merge_tree_reader->evaluateMissingDefaults({}, read_result.columns);
merge_tree_reader->performRequiredConversions(read_result.columns);
}
else
read_result.columns.clear();
......
......@@ -155,6 +155,7 @@ MarkRanges MergeTreeReadPool::getRestMarks(const MergeTreeDataPart & part, const
Block MergeTreeReadPool::getHeader() const
{
//////////////////std::cerr << "COLUMN NAMES IN POOL:" << column_names.front() << std::endl;
return data.getSampleBlockForColumns(column_names);
}
......
......@@ -56,10 +56,32 @@ MergeTreeReader::MergeTreeReader(
, mmap_threshold(mmap_threshold_)
, max_read_buffer_size(max_read_buffer_size_)
{
//std::cerr << "Merge tree reader created for part:" << data_part->name << std::endl;
try
{
for (const NameAndTypePair & column_from_part : data_part->columns)
{
columns_from_part[column_from_part.name] = column_from_part.type;
}
for (const NameAndTypePair & column : columns)
addStreams(column.name, *column.type, profile_callback_, clock_type_);
{
//std::cerr << "Column name to read:" << column.name << std::endl;
if (columns_from_part.count(column.name))
{
//std::cerr << "With type:" << columns_from_part[column.name]->getName() << std::endl;
//std::cerr << "Original type:" << column.type->getName() << std::endl;
addStreams(column.name, *columns_from_part[column.name], profile_callback_, clock_type_);
}
else
{
//std::cerr << "Original type:" << column.type->getName() << std::endl;
addStreams(column.name, *column.type, profile_callback_, clock_type_);
}
}
//std::cerr << "COLUMNS IN CONSTRUCTOR:" << columns.toString() << std::endl;
}
catch (...)
{
......@@ -95,12 +117,17 @@ size_t MergeTreeReader::readRows(size_t from_mark, bool continue_reading, size_t
auto name_and_type = columns.begin();
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
{
auto & [name, type] = *name_and_type;
String & name = name_and_type->name;
DataTypePtr type;
if (columns_from_part.count(name))
type = columns_from_part[name];
else
type = name_and_type->type;
/// The column is already present in the block so we will append the values to the end.
bool append = res_columns[pos] != nullptr;
if (!append)
res_columns[pos] = name_and_type->type->createColumn();
res_columns[pos] = type->createColumn();
/// To keep offsets shared. TODO Very dangerous. Get rid of this.
MutableColumnPtr column = res_columns[pos]->assumeMutable();
......@@ -214,6 +241,7 @@ void MergeTreeReader::readData(
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
bool with_offsets)
{
//std::cerr << "READ DATA:" << name << " with type:" << type.getName() << std::endl;
auto get_stream_getter = [&](bool stream_for_prefix) -> IDataType::InputStreamGetter
{
return [&, stream_for_prefix](const IDataType::SubstreamPath & substream_path) -> ReadBuffer *
......@@ -370,7 +398,7 @@ void MergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
size_t num_columns = columns.size();
if (res_columns.size() != num_columns)
throw Exception("invalid number of columns passed to MergeTreeReader::fillMissingColumns. "
throw Exception("invalid number of columns passed to MergeTreeReader::evaluateMissingDefaults. "
"Expected " + toString(num_columns) + ", "
"got " + toString(res_columns.size()), ErrorCodes::LOGICAL_ERROR);
......@@ -400,4 +428,65 @@ void MergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
}
}
void MergeTreeReader::performRequiredConversions(Columns & res_columns)
{
try
{
size_t num_columns = columns.size();
if (res_columns.size() != num_columns)
{
throw Exception(
"invalid number of columns passed to MergeTreeReader::performRequiredConversions. "
"Expected "
+ toString(num_columns)
+ ", "
"got "
+ toString(res_columns.size()),
ErrorCodes::LOGICAL_ERROR);
}
Block copy_block;
auto name_and_type = columns.begin();
//std::cerr << "DATAPART NAMES AND TYPES:" << data_part->columns.toString() << std::endl;
//std::cerr << "REQUIRED COLUMNS NAMES AND TYPES:" << columns.toString() << std::endl;
//std::cerr << "RES COLUMNS SIZE:" << res_columns.size() << std::endl;
//std::cerr << "RES COLUMNS STRUCTURE:\n";
//for (const auto & column : res_columns)
//{
// std::cerr << column->dumpStructure() << std::endl;
//}
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
{
//std::cerr << "POS:" << pos << std::endl;
if (res_columns[pos] == nullptr)
continue;
//std::cerr << "POS NAME:" << name_and_type->name << std::endl;
//std::cerr << "POS TYPE:" << name_and_type->type->getName() << std::endl;
if (columns_from_part.count(name_and_type->name))
copy_block.insert({res_columns[pos], columns_from_part[name_and_type->name], name_and_type->name});
else
copy_block.insert({res_columns[pos], name_and_type->type, name_and_type->name});
}
//std::cerr << "Copy block: " << copy_block.dumpStructure() << std::endl;
DB::performRequiredConversions(copy_block, columns, storage.global_context);
std::cerr << "Result copy block: " << copy_block.dumpStructure() << std::endl;
/// Move columns from block.
name_and_type = columns.begin();
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
{
res_columns[pos] = std::move(copy_block.getByName(name_and_type->name).column);
}
}
catch(Exception & e)
{
/// Better diagnostics.
e.addMessage("(while reading from part " + path + ")");
throw;
}
}
}
......@@ -45,6 +45,9 @@ public:
/// Evaluate defaulted columns if necessary.
void evaluateMissingDefaults(Block additional_columns, Columns & res_columns);
/// Perform conversions TODO(alesap)
void performRequiredConversions(Columns & res_columns);
const NamesAndTypesList & getColumns() const { return columns; }
size_t numColumnsInResult() const { return columns.size(); }
......@@ -74,6 +77,8 @@ private:
/// Columns that are read.
NamesAndTypesList columns;
std::unordered_map<String, DataTypePtr> columns_from_part;
UncompressedCache * uncompressed_cache;
MarkCache * mark_cache;
/// If save_marks_in_cache is false, then, if marks are not in cache, we will load them but won't save in the cache, to avoid evicting other data.
......
......@@ -11,25 +11,25 @@ namespace ErrorCodes
extern const int MEMORY_LIMIT_EXCEEDED;
}
static Block replaceTypes(Block && header, const MergeTreeData::DataPartPtr & data_part)
{
/// Types may be different during ALTER (when this stream is used to perform an ALTER).
/// NOTE: We may use similar code to implement non blocking ALTERs.
for (const auto & name_type : data_part->columns)
{
if (header.has(name_type.name))
{
auto & elem = header.getByName(name_type.name);
if (!elem.type->equals(*name_type.type))
{
elem.type = name_type.type;
elem.column = elem.type->createColumn();
}
}
}
return std::move(header);
}
//static Block replaceTypes(Block && header, const MergeTreeData::DataPartPtr & data_part)
//{
// /// Types may be different during ALTER (when this stream is used to perform an ALTER).
// /// NOTE: We may use similar code to implement non blocking ALTERs.
// for (const auto & name_type : data_part->columns)
// {
// if (header.has(name_type.name))
// {
// auto & elem = header.getByName(name_type.name);
// if (!elem.type->equals(*name_type.type))
// {
// elem.type = name_type.type;
// elem.column = elem.type->createColumn();
// }
// }
// }
//
// return std::move(header);
//}
MergeTreeSelectProcessor::MergeTreeSelectProcessor(
const MergeTreeData & storage_,
......@@ -51,7 +51,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
bool quiet)
:
MergeTreeBaseSelectProcessor{
replaceTypes(storage_.getSampleBlockForColumns(required_columns_), owned_data_part_),
storage_.getSampleBlockForColumns(required_columns_),
storage_, prewhere_info_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, min_bytes_to_use_direct_io_, min_bytes_to_use_mmap_io_,
max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_, virt_column_names_},
......
......@@ -35,18 +35,20 @@ MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream(
addTotalRowsApprox(data_part->rows_count);
header = storage.getSampleBlockForColumns(columns_to_read);
fixHeader(header);
//fixHeader(header);
/// Add columns because we don't want to read empty blocks
injectRequiredColumns(storage, data_part, columns_to_read);
NamesAndTypesList columns_for_reader;
if (take_column_types_from_storage)
{
std::cerr << "Taking columns from storage\n";
const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical();
columns_for_reader = physical_columns.addTypes(columns_to_read);
}
else
{
std::cerr << "Taking columns from data part\n";
/// take columns from data_part
columns_for_reader = data_part->columns.addTypes(columns_to_read);
}
......@@ -107,6 +109,8 @@ try
if (should_evaluate_missing_defaults)
reader->evaluateMissingDefaults({}, columns);
reader->performRequiredConversions(columns);
res = header.cloneEmpty();
/// Reorder columns and fill result block.
......
......@@ -67,10 +67,10 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
case FINISH_ALTER: /// Just make local /metadata and /columns consistent with global
out << "alter\n";
for (const String & s : source_parts)
out << s << '\n';
out << "finish";
out << required_mutation_znode << "\n";
out << "finish\n";
break;
default:
throw Exception("Unknown log entry type: " + DB::toString<int>(type), ErrorCodes::LOGICAL_ERROR);
}
......@@ -161,18 +161,14 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
else if (type_str == "alter")
{
type = FINISH_ALTER;
while (!in.eof())
{
String s;
in >> s >> "\n";
if (s == "finish")
break;
source_parts.push_back(s);
}
in >> required_mutation_znode >> "\nfinish\n";
}
std::cerr << "Read backn\n";
in >> "\n";
std::cerr << "Readed\n";
/// Optional field.
if (!in.eof())
in >> "quorum: " >> quorum >> "\n";
......
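For orientation, a sketch of the FINISH_ALTER fragment that the writeText/readText pair above now agree on; surrounding entry fields are omitted and the mutation znode name is purely illustrative:

```
alter
0000000001
finish
```

The entry no longer lists source parts; instead it records the znode of the mutation that must finish before the metadata switch (see required_mutation_znode below).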
......@@ -89,6 +89,9 @@ struct ReplicatedMergeTreeLogEntryData
/// For DROP_RANGE, true means that the parts need not be deleted, but moved to the `detached` directory.
bool detach = false;
/// For ALTER TODO(alesap)
String required_mutation_znode;
/// REPLACE PARTITION FROM command
struct ReplaceRangeEntry
{
......@@ -111,6 +114,10 @@ struct ReplicatedMergeTreeLogEntryData
/// selection of merges. These parts are added to queue.virtual_parts.
Strings getVirtualPartNames() const
{
/// Doesn't produce any part
if (type == FINISH_ALTER)
return {};
/// DROP_RANGE does not add a real part, but we must disable merges in that range
if (type == DROP_RANGE)
return {new_part_name};
......
......@@ -517,6 +517,8 @@ void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, C
}
catch (...)
{
std::cerr << "DIE HERE:\n";
tryLogCurrentException(log);
/// If it fails, the data in RAM is incorrect. In order to avoid possible further corruption of data in ZK, we will kill ourselves.
/// This is possible only if there is an unknown logical error.
std::terminate();
......@@ -532,7 +534,8 @@ void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, C
}
static Names getPartNamesToMutate(
namespace {
Names getPartNamesToMutate(
const ReplicatedMergeTreeMutationEntry & mutation, const ActiveDataPartSet & parts)
{
Names result;
......@@ -556,8 +559,9 @@ static Names getPartNamesToMutate(
return result;
}
}
Names getPartNamesToMutate(ReplicatedMergeTreeMutationEntry & entry) const
Names ReplicatedMergeTreeQueue::getCurrentPartNamesToMutate(ReplicatedMergeTreeMutationEntry & entry) const
{
return getPartNamesToMutate(entry, current_parts);
}
......@@ -1008,16 +1012,12 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (entry.type == LogEntry::FINISH_ALTER)
{
for (const auto & name : entry.source_parts)
{
if (future_parts.count(name))
{
String reason = "Not altering storage because part " + name
+ " is not ready yet (log entry for that part is being processed).";
LOG_TRACE(log, reason);
out_postpone_reason = reason;
return false;
}
std::cerr << "Entry finish alter\n";
if (mutations_by_znode.count(entry.required_mutation_znode) && !mutations_by_znode.at(entry.required_mutation_znode).is_done) {
String reason = "Not altering storage because mutation " + entry.required_mutation_znode + " is not ready yet (mutation is beeing processed).";
LOG_TRACE(log, reason);
out_postpone_reason = reason;
return false;
}
}
......
......@@ -117,6 +117,7 @@ private:
String latest_fail_reason;
};
/// Mapping from znode path to Mutations Status
std::map<String, MutationStatus> mutations_by_znode;
std::unordered_map<String, std::map<Int64, MutationStatus *>> mutations_by_partition;
/// Znode ID of the latest mutation that is done.
......@@ -339,7 +340,7 @@ public:
/// Adds a subscriber
SubscriberHandler addSubscriber(SubscriberCallBack && callback);
Names getPartNamesToMutate(ReplicatedMergeTreeMutationEntry & entry) const;
Names getCurrentPartNamesToMutate(ReplicatedMergeTreeMutationEntry & entry) const;
struct Status
{
......
......@@ -199,7 +199,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
storage.mutations_updating_task->activateAndSchedule();
storage.mutations_finalizing_task->activateAndSchedule();
storage.cleanup_thread.start();
storage.alter_thread.start();
//storage.alter_thread.start();
storage.part_check_thread.start();
return true;
......@@ -346,7 +346,7 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
storage.mutations_finalizing_task->deactivate();
storage.cleanup_thread.stop();
storage.alter_thread.stop();
//storage.alter_thread.stop();
storage.part_check_thread.stop();
LOG_TRACE(log, "Threads finished");
......
......@@ -39,6 +39,7 @@ public:
for (auto & pipe : pipes)
streams.emplace_back(std::make_shared<TreeExecutorBlockInputStream>(std::move(pipe)));
//std::cerr << "Streams header:" << streams.back()->getHeader().dumpStructure() << std::endl;
return streams;
}
......
......@@ -2,12 +2,14 @@
#include <IO/Operators.h>
#include <Parsers/formatAST.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Parsers/ParserAlterQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ASTAssignment.h>
#include <Parsers/ASTIdentifier.h>
#include <Common/typeid_cast.h>
#include <Common/quoteString.h>
#include <DataTypes/DataTypeFactory.h>
namespace DB
......@@ -19,7 +21,7 @@ namespace ErrorCodes
extern const int MULTIPLE_ASSIGNMENTS_TO_COLUMN;
}
std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command)
std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command, bool parse_modify)
{
if (command->type == ASTAlterCommand::DELETE)
{
......@@ -55,6 +57,24 @@ std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command)
res.index_name = command->index->as<ASTIdentifier &>().name;
return res;
}
else if (parse_modify && command->type == ASTAlterCommand::MODIFY_COLUMN)
{
MutationCommand res;
res.ast = command->ptr();
res.type = MutationCommand::Type::READ;
const auto & ast_col_decl = command->col_decl->as<ASTColumnDeclaration &>();
res.column_name = ast_col_decl.name;
res.data_type = DataTypeFactory::instance().get(ast_col_decl.type);
return res;
}
else if (parse_modify && command->type == ASTAlterCommand::DROP_COLUMN)
{
MutationCommand res;
res.ast = command->ptr();
res.type = MutationCommand::Type::READ;
res.column_name = getIdentifierName(command->column);
return res;
}
else
return {};
}
......@@ -85,7 +105,7 @@ void MutationCommands::readText(ReadBuffer & in)
p_alter_commands, commands_str.data(), commands_str.data() + commands_str.length(), "mutation commands list", 0);
for (ASTAlterCommand * command_ast : commands_ast->as<ASTAlterCommandList &>().commands)
{
auto command = MutationCommand::parse(command_ast);
auto command = MutationCommand::parse(command_ast, true);
if (!command)
throw Exception("Unknown mutation command type: " + DB::toString<int>(command_ast->type), ErrorCodes::UNKNOWN_MUTATION_COMMAND);
push_back(std::move(*command));
......
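A hedged sketch (not part of this commit) of how the extended parse(command, parse_modify) entry point can be driven for a serialized command list, mirroring MutationCommands::readText above; the include paths are assumptions:

```cpp
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ParserAlterQuery.h>
#include <Parsers/parseQuery.h>
#include <Storages/MutationCommands.h>

/// Parse an ALTER command list and keep MODIFY COLUMN / DROP COLUMN entries
/// as READ mutation commands (parse_modify = true), as readText does above.
DB::MutationCommands parseMutationCommands(const std::string & commands_str)
{
    DB::ParserAlterCommandList p_alter_commands;
    DB::ASTPtr commands_ast = DB::parseQuery(
        p_alter_commands, commands_str.data(), commands_str.data() + commands_str.size(),
        "mutation commands list", 0);

    DB::MutationCommands result;
    for (DB::ASTAlterCommand * command_ast : commands_ast->as<DB::ASTAlterCommandList &>().commands)
        if (auto command = DB::MutationCommand::parse(command_ast, /* parse_modify = */ true))
            result.push_back(std::move(*command));
    return result;
}
```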
......@@ -27,7 +27,7 @@ struct MutationCommand
DELETE,
UPDATE,
MATERIALIZE_INDEX,
CAST /// for ALTER MODIFY column
READ
};
Type type = EMPTY;
......@@ -46,7 +46,7 @@ struct MutationCommand
String column_name;
DataTypePtr data_type;
static std::optional<MutationCommand> parse(ASTAlterCommand * command);
static std::optional<MutationCommand> parse(ASTAlterCommand * command, bool parse_modify=false);
};
/// Multiple mutation commands, possible from different ALTER queries
......
......@@ -208,7 +208,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
zookeeper_path(global_context.getMacros()->expand(zookeeper_path_, database_name_, table_name_)),
replica_name(global_context.getMacros()->expand(replica_name_, database_name_, table_name_)),
reader(*this), writer(*this), merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads()),
queue(*this), fetcher(*this), cleanup_thread(*this), alter_thread(*this),
queue(*this), fetcher(*this), cleanup_thread(*this),
part_check_thread(*this), restarting_thread(*this)
{
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
......@@ -1156,34 +1156,37 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
}
bool StorageReplicatedMergeTree::tryFinishAlter(const StorageReplicatedMergeTree::LogEntry & entry)
bool StorageReplicatedMergeTree::tryFinishAlter(const StorageReplicatedMergeTree::LogEntry & /*entry*/)
{
std::cerr << "Trying to finish alter\n";
auto zookeeper = getZooKeeper();
String columns_path = zookeeper_path + "/columns";
auto columns_znode = zookeeper->get(columns_path);
if (!columns_znode.exists)
String columns_str;
Coordination::Stat columns_znode_stat;
if (!zookeeper->tryGet(columns_path, columns_str, &columns_znode_stat))
throw Exception(columns_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE);
int32_t columns_version = columns_znode.stat.version;
int32_t columns_version_zk = columns_znode_stat.version;
String metadata_path = zookeeper_path + "/metadata";
auto metadata_znode = zookeeper->get(metadata_path);
if (!metadata_znode.exists)
String metadata_str;
Coordination::Stat metadata_znode_stat;
if (!zookeeper->tryGet(metadata_path, metadata_str, &metadata_znode_stat))
throw Exception(metadata_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE);
int32_t metadata_version = metadata_znode.stat.version;
int32_t metadata_version_zk = metadata_znode_stat.version;
const bool changed_columns_version = (columns_version_zk != this->columns_version);
const bool changed_metadata_version = (metadata_version_zk != this->metadata_version);
const bool changed_columns_version = (columns_version != storage.columns_version);
const bool changed_metadata_version = (metadata_version != storage.metadata_version);
std::cerr << "Versions changed: columns" << changed_columns_version << " metadata:" << changed_metadata_version << std::endl;
if (!(changed_columns_version || changed_metadata_version))
return;
return true;
const String & columns_str = columns_znode.contents;
auto columns_in_zk = ColumnsDescription::parse(columns_str);
const String & metadata_str = metadata_znode.contents;
auto metadata_in_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str);
auto metadata_diff = ReplicatedMergeTreeTableMetadata(storage).checkAndFindDiff(metadata_in_zk, /* allow_alter = */ true);
auto metadata_diff = ReplicatedMergeTreeTableMetadata(*this).checkAndFindDiff(metadata_in_zk, /* allow_alter = */ true);
MergeTreeData::DataParts parts;
......@@ -1194,7 +1197,7 @@ bool StorageReplicatedMergeTree::tryFinishAlter(const StorageReplicatedMergeTree
auto table_lock = lockExclusively(RWLockImpl::NO_QUERY);
if (columns_in_zk == storage.getColumns() && metadata_diff.empty())
if (columns_in_zk == getColumns() && metadata_diff.empty())
{
LOG_INFO(
log,
......@@ -1210,8 +1213,8 @@ bool StorageReplicatedMergeTree::tryFinishAlter(const StorageReplicatedMergeTree
LOG_INFO(log, "Applied changes to the metadata of the table.");
}
columns_version = columns_version;
metadata_version = metadata_version;
this->columns_version = columns_version_zk;
this->metadata_version = metadata_version_zk;
recalculateColumnSizes();
/// Update metadata ZK nodes for a specific replica.
......@@ -1220,6 +1223,7 @@ bool StorageReplicatedMergeTree::tryFinishAlter(const StorageReplicatedMergeTree
if (changed_metadata_version)
zookeeper->set(replica_path + "/metadata", metadata_str);
}
return true;
}
bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedMergeTree::LogEntry & entry)
......@@ -3291,13 +3295,32 @@ void StorageReplicatedMergeTree::alter(
}
auto ast_to_str = [](ASTPtr query) -> String
struct ChangedNode
{
ChangedNode(const String & table_path_, String name_, String new_value_)
: table_path(table_path_), name(std::move(name_)), shared_path(table_path + "/" + name), new_value(std::move(new_value_))
{
}
const String & table_path;
String name;
String shared_path;
String new_value;
int32_t new_version = -1; /// Initialization is to suppress (useless) false positive warning found by cppcheck.
};
auto ast_to_str = [](ASTPtr query) -> String {
if (!query)
return "";
return queryToString(query);
};
std::cerr << " Columns preparation to alter:" << getColumns().getAllPhysical().toString() << std::endl;
/// /columns and /metadata nodes
std::vector<ChangedNode> changed_nodes;
{
/// Just to read current structure. Alter will be done in separate thread.
auto table_lock = lockStructureForShare(false, query_context.getCurrentQueryId());
......@@ -3357,215 +3380,21 @@ void StorageReplicatedMergeTree::alter(
entry.type = LogEntry::FINISH_ALTER;
entry.source_replica = replica_name;
if (maybe_mutation_commands)
{
ReplicatedMergeTreeMutationEntry entry = mutateImpl(*maybe_mutation_commands, context);
entry.source_parts = queue.getPartNamesToMutate(entry);
}
std::cerr << " Columns before mutation:" << getColumns().getAllPhysical().toString() << std::endl;
entry.new_part_name = new_part_name;
entry.new_part_name = "";
entry.create_time = time(nullptr);
zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
String path_created = getZooKeeper()->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
waitForAllReplicasToProcessLogEntry(entry);
///// Wait until all replicas will apply ALTER.
//for (const auto & node : changed_nodes)
//{
// Coordination::Stat stat;
// /// Subscribe to change of shared ZK metadata nodes, to finish waiting if someone will do another ALTER.
// if (!getZooKeeper()->exists(node.shared_path, &stat, alter_query_event))
// throw Exception(node.shared_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE);
// if (stat.version != node.new_version)
// {
// LOG_WARNING(log, node.shared_path + " changed before this ALTER finished; " +
// "overlapping ALTER-s are fine but use caution with nontransitive changes");
// return;
// }
//}
//Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
//std::set<String> inactive_replicas;
//std::set<String> timed_out_replicas;
//time_t replication_alter_columns_timeout = query_context.getSettingsRef().replication_alter_columns_timeout;
///// This code is quite similar with waitMutationToFinishOnReplicas
///// but contains more complicated details (versions manipulations, multiple nodes, etc.).
///// It will be removed soon in favor of alter-modify implementation on top of mutations.
///// TODO (alesap)
//for (const String & replica : replicas)
//{
// LOG_DEBUG(log, "Waiting for " << replica << " to apply changes");
// while (!partial_shutdown_called)
// {
// auto zookeeper = getZooKeeper();
// /// Replica could be inactive.
// if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
// {
// LOG_WARNING(log, "Replica " << replica << " is not active during ALTER query."
// " ALTER will be done asynchronously when replica becomes active.");
// inactive_replicas.emplace(replica);
// break;
// }
// struct ReplicaNode
// {
// explicit ReplicaNode(String path_) : path(std::move(path_)) {}
// String path;
// String value;
// int32_t version = -1;
// };
// std::vector<ReplicaNode> replica_nodes;
// for (const auto & node : changed_nodes)
// replica_nodes.emplace_back(node.getReplicaPath(replica));
// bool replica_was_removed = false;
// for (auto & node : replica_nodes)
// {
// Coordination::Stat stat;
// /// Replica could have been removed.
// if (!zookeeper->tryGet(node.path, node.value, &stat))
// {
// LOG_WARNING(log, replica << " was removed");
// replica_was_removed = true;
// break;
// }
// node.version = stat.version;
// }
// if (replica_was_removed)
// break;
// bool alter_was_applied = true;
// for (size_t i = 0; i < replica_nodes.size(); ++i)
// {
// if (replica_nodes[i].value != changed_nodes[i].new_value)
// {
// alter_was_applied = false;
// break;
// }
// }
// /// The ALTER has been successfully applied.
// if (alter_was_applied)
// break;
// for (const auto & node : changed_nodes)
// {
// Coordination::Stat stat;
// if (!zookeeper->exists(node.shared_path, &stat))
// throw Exception(node.shared_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE);
// if (stat.version != node.new_version)
// {
// LOG_WARNING(log, node.shared_path + " changed before this ALTER finished; "
// "overlapping ALTER-s are fine but use caution with nontransitive changes");
// return;
// }
// }
// bool replica_nodes_changed_concurrently = false;
// for (const auto & replica_node : replica_nodes)
// {
// Coordination::Stat stat;
// if (!zookeeper->exists(replica_node.path, &stat, alter_query_event))
// {
// LOG_WARNING(log, replica << " was removed");
// replica_was_removed = true;
// break;
// }
// if (stat.version != replica_node.version)
// {
// replica_nodes_changed_concurrently = true;
// break;
// }
// }
// if (replica_was_removed)
// break;
// if (replica_nodes_changed_concurrently)
// continue;
// /// alter_query_event subscribed with zookeeper watch callback to /replicas/{replica}/metadata
// /// and /replicas/{replica}/columns nodes for the current replica + shared nodes /columns and /metadata,
// /// which is common for all replicas. If changes happen with this nodes (delete, set and create)
// /// than event will be notified and wait will be interrupted.
// ///
// /// ReplicatedMergeTreeAlterThread responsible for local /replicas/{replica}/metadata and
// /// /replicas/{replica}/columns changes. Shared /columns and /metadata nodes can be changed by *newer*
// /// concurrent alter from another replica. First of all it will update shared nodes and we will have no
// /// ability to identify that our *current* alter finished. So we cannot do anything better than just
// /// return from *current* alter with success result.
// if (!replication_alter_columns_timeout)
// {
// alter_query_event->wait();
// /// Everything is fine.
// }
// else if (alter_query_event->tryWait(replication_alter_columns_timeout * 1000))
// {
// /// Everything is fine.
// }
// else
// {
// LOG_WARNING(log, "Timeout when waiting for replica " << replica << " to apply ALTER."
// " ALTER will be done asynchronously.");
// timed_out_replicas.emplace(replica);
// break;
// }
// }
// if (partial_shutdown_called)
// throw Exception("Alter is not finished because table shutdown was called. Alter will be done after table restart.",
// ErrorCodes::UNFINISHED);
// if (!inactive_replicas.empty() || !timed_out_replicas.empty())
// {
// std::stringstream exception_message;
// exception_message << "Alter is not finished because";
// if (!inactive_replicas.empty())
// {
// exception_message << " some replicas are inactive right now";
// for (auto it = inactive_replicas.begin(); it != inactive_replicas.end(); ++it)
// exception_message << (it == inactive_replicas.begin() ? ": " : ", ") << *it;
// }
// if (!timed_out_replicas.empty() && !inactive_replicas.empty())
// exception_message << " and";
// if (!timed_out_replicas.empty())
// {
// exception_message << " timeout when waiting for some replicas";
// for (auto it = timed_out_replicas.begin(); it != timed_out_replicas.end(); ++it)
// exception_message << (it == timed_out_replicas.begin() ? ": " : ", ") << *it;
// exception_message << " (replication_alter_columns_timeout = " << replication_alter_columns_timeout << ")";
// }
// exception_message << ". Alter will be done asynchronously.";
// throw Exception(exception_message.str(), ErrorCodes::UNFINISHED);
// }
//}
//LOG_DEBUG(log, "ALTER finished");
if (!maybe_mutation_commands.empty())
{
std::cerr << "We have mutation commands:" << maybe_mutation_commands.size() << std::endl;
ReplicatedMergeTreeMutationEntry mutation_entry = mutateImpl(maybe_mutation_commands, query_context);
}
}
void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context)
......@@ -4519,10 +4348,10 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const
void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const Context & query_context)
{
mutateImpl(commands, context);
mutateImpl(commands, query_context);
}
StorageReplicatedMergeTree::mutateImpl(const MutationCommands & commands, const Context & query_context)
ReplicatedMergeTreeMutationEntry StorageReplicatedMergeTree::mutateImpl(const MutationCommands & commands, const Context & query_context)
{
/// Overview of the mutation algorithm.
///
......@@ -4627,16 +4456,16 @@ StorageReplicatedMergeTree::mutateImpl(const MutationCommands & commands, const
}
/// we have to wait
if (query_context.getSettingsRef().mutations_sync != 0)
{
//if (query_context.getSettingsRef().mutations_sync != 0)
//{
Strings replicas;
if (query_context.getSettingsRef().mutations_sync == 2) /// wait for all replicas
replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
else if (query_context.getSettingsRef().mutations_sync == 1) /// just wait for ourself
replicas.push_back(replica_path);
//else if (query_context.getSettingsRef().mutations_sync == 1) /// just wait for ourself
// replicas.push_back(replica_path);
waitMutationToFinishOnReplicas(replicas, entry.znode_name);
}
//}
return entry;
}
......
......@@ -288,9 +288,6 @@ private:
/// A thread that removes old parts, log entries, and blocks.
ReplicatedMergeTreeCleanupThread cleanup_thread;
/// A thread monitoring changes to the column list in ZooKeeper and updating the parts in accordance with these changes.
ReplicatedMergeTreeAlterThread alter_thread;
/// A thread that checks the data of the parts, as well as the queue of the parts to be checked.
ReplicatedMergeTreePartCheckThread part_check_thread;
......