diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index b147c5f4887651112017a34cee72cab11b2e8a45..16163721c63e5b5bfcd6b3504e5fbe24b19a00cd 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -307,6 +307,7 @@ InterpreterSelectQuery::InterpreterSelectQuery( } } + //std::cerr << "Source header:" << source_header.dumpStructure() << std::endl; if (storage) table_lock = storage->lockStructureForShare(false, context->getInitialQueryId()); diff --git a/dbms/src/Interpreters/MutationsInterpreter.cpp b/dbms/src/Interpreters/MutationsInterpreter.cpp index 4a585cf424f3dd7939f13d1d79f0e59283b65e77..5ae7a8996d437fe95bba01eef21590eee1250f64 100644 --- a/dbms/src/Interpreters/MutationsInterpreter.cpp +++ b/dbms/src/Interpreters/MutationsInterpreter.cpp @@ -19,6 +19,7 @@ #include #include #include +#include namespace DB @@ -172,7 +173,9 @@ MutationsInterpreter::MutationsInterpreter( , context(context_) , can_execute(can_execute_) { + std::cerr << "STORAGE IS NULLPTR:" << (storage == nullptr) << std::endl; mutation_ast = prepare(!can_execute); + std::cerr << "Mutations ast:" << queryToString(mutation_ast) << std::endl; SelectQueryOptions limits = SelectQueryOptions().analyze(!can_execute).ignoreLimits(); select_interpreter = std::make_unique(mutation_ast, context, storage, limits); } @@ -259,15 +262,22 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run) if (commands.empty()) throw Exception("Empty mutation commands list", ErrorCodes::LOGICAL_ERROR); + std::cerr << "PREPARING\n"; + const ColumnsDescription & columns_desc = storage->getColumns(); const IndicesDescription & indices_desc = storage->getIndices(); + std::cerr << "COLUMNS RECEIVED:" << columns_desc.toString() << std::endl; NamesAndTypesList all_columns = columns_desc.getAllPhysical(); + std::cerr << "COMMANDS SIZE:" << commands.size() << std::endl; NameSet updated_columns; for (const MutationCommand & command : commands) { for (const auto & kv : command.column_to_update_expression) + { + std::cerr << "COLUMN:" << kv.first << std::endl; updated_columns.insert(kv.first); + } } /// We need to know which columns affect which MATERIALIZED columns and data skipping indices @@ -311,6 +321,7 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run) /// First, break a sequence of commands into stages. for (const auto & command : commands) { + std::cerr << "Processing command:" << command.ast << std::endl; if (command.type == MutationCommand::DELETE) { if (stages.empty() || !stages.back().column_to_updated.empty()) @@ -381,9 +392,16 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run) const auto required_columns = syntax_result->requiredSourceColumns(); affected_indices_columns.insert(std::cbegin(required_columns), std::cend(required_columns)); } - else if (command.type == MutationCommand::CAST) + else if (command.type == MutationCommand::READ) { - stages.back().column_to_updated.emplace(command.column_name, makeASTFunction("CAST", command.column_name, command.type_name)); + if (stages.empty() || !stages.back().column_to_updated.empty()) + stages.emplace_back(context); + if (stages.size() == 1) /// First stage only supports filtering and can't update columns. + stages.emplace_back(context); + + /// TODO(alesap) + if (command.data_type) + stages.back().column_to_updated.emplace(command.column_name, std::make_shared(command.column_name)); } else throw Exception("Unknown mutation command type: " + DB::toString(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND); @@ -427,6 +445,8 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run) ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector & prepared_stages, bool dry_run) { NamesAndTypesList all_columns = storage->getColumns().getAllPhysical(); + std::cerr << "Prepare interpreter storage columns:" << all_columns.toString() << std::endl; + /// Next, for each stage calculate columns changed by this and previous stages. for (size_t i = 0; i < prepared_stages.size(); ++i) diff --git a/dbms/src/Interpreters/evaluateMissingDefaults.cpp b/dbms/src/Interpreters/evaluateMissingDefaults.cpp index bef41488793244730ebdf67aa6f0b9b7f36bb7d5..c5cb36e3979157efc4214864243ce2d6d50bfac2 100644 --- a/dbms/src/Interpreters/evaluateMissingDefaults.cpp +++ b/dbms/src/Interpreters/evaluateMissingDefaults.cpp @@ -1,12 +1,17 @@ #include "evaluateMissingDefaults.h" +#include #include +#include #include #include #include #include #include #include +#include +#include +#include #include #include @@ -14,7 +19,8 @@ namespace DB { -static ASTPtr requiredExpressions(Block & block, const NamesAndTypesList & required_columns, const ColumnDefaults & column_defaults) +namespace { +ASTPtr defaultRequiredExpressions(Block & block, const NamesAndTypesList & required_columns, const ColumnDefaults & column_defaults) { ASTPtr default_expr_list = std::make_shared(); @@ -27,8 +33,7 @@ static ASTPtr requiredExpressions(Block & block, const NamesAndTypesList & requi /// expressions must be cloned to prevent modification by the ExpressionAnalyzer if (it != column_defaults.end()) - default_expr_list->children.emplace_back( - setAlias(it->second.expression->clone(), it->first)); + default_expr_list->children.emplace_back(setAlias(it->second.expression->clone(), it->first)); } if (default_expr_list->children.empty()) @@ -36,22 +41,49 @@ static ASTPtr requiredExpressions(Block & block, const NamesAndTypesList & requi return default_expr_list; } -void evaluateMissingDefaults(Block & block, - const NamesAndTypesList & required_columns, - const ColumnDefaults & column_defaults, - const Context & context, bool save_unneeded_columns) +ASTPtr convertRequiredExpressions(Block & block, const NamesAndTypesList & required_columns) { - if (column_defaults.empty()) - return; + ASTPtr conversion_expr_list = std::make_shared(); + for (const auto & required_column : required_columns) + { + if (!block.has(required_column.name)) + continue; + //throw Exception("Required conversion of column " + required_column.name + " which is absent in block. It's a bug", ErrorCodes::LOGICAL_ERROR); + + auto column_in_block = block.getByName(required_column.name); + //std::cerr << "Looking at:" << required_column.name << std::endl; + //std::cerr << "In block type:" << column_in_block.type->getName() << std::endl; + //std::cerr << "Required type:" << required_column.type->getName() << std::endl; + if (column_in_block.type->equals(*required_column.type)) + { + //std::cerr << "TYPES ARE SAME\n"; + continue; + } + //std::cerr << "TYPES ARE DIFFERENT\n"; + + auto cast_func = makeASTFunction( + "CAST", std::make_shared(required_column.name), std::make_shared(required_column.type->getName())); + + conversion_expr_list->children.emplace_back(setAlias(cast_func, required_column.name)); - ASTPtr default_expr_list = requiredExpressions(block, required_columns, column_defaults); - if (!default_expr_list) + } + return conversion_expr_list; +} + +void executeExpressionsOnBlock( + Block & block, + ASTPtr expr_list, + bool save_unneeded_columns, + const NamesAndTypesList & required_columns, + const Context & context) +{ + if (!expr_list) return; if (!save_unneeded_columns) { - auto syntax_result = SyntaxAnalyzer(context).analyze(default_expr_list, block.getNamesAndTypesList()); - ExpressionAnalyzer{default_expr_list, syntax_result, context}.getActions(true)->execute(block); + auto syntax_result = SyntaxAnalyzer(context).analyze(expr_list, block.getNamesAndTypesList()); + ExpressionAnalyzer{expr_list, syntax_result, context}.getActions(true)->execute(block); return; } @@ -59,8 +91,8 @@ void evaluateMissingDefaults(Block & block, * we are going to operate on a copy instead of the original block */ Block copy_block{block}; - auto syntax_result = SyntaxAnalyzer(context).analyze(default_expr_list, block.getNamesAndTypesList()); - auto expression_analyzer = ExpressionAnalyzer{default_expr_list, syntax_result, context}; + auto syntax_result = SyntaxAnalyzer(context).analyze(expr_list, block.getNamesAndTypesList()); + auto expression_analyzer = ExpressionAnalyzer{expr_list, syntax_result, context}; auto required_source_columns = syntax_result->requiredSourceColumns(); auto rows_was = copy_block.rows(); @@ -82,7 +114,9 @@ void evaluateMissingDefaults(Block & block, copy_block.insert({DataTypeUInt8().createColumnConst(rows_was, 0u), std::make_shared(), "__dummy"}); } + //std::cerr << "Block before expression:" << copy_block.dumpStructure() << std::endl; expression_analyzer.getActions(true)->execute(copy_block); + //std::cerr << "Block after expression:" << copy_block.dumpStructure() << std::endl; /// move evaluated columns to the original block, materializing them at the same time size_t pos = 0; @@ -93,9 +127,36 @@ void evaluateMissingDefaults(Block & block, auto evaluated_col = copy_block.getByName(col->name); evaluated_col.column = evaluated_col.column->convertToFullColumnIfConst(); - block.insert(pos, std::move(evaluated_col)); + if (block.has(col->name)) + block.getByName(col->name) = std::move(evaluated_col); + else + block.insert(pos, std::move(evaluated_col)); } } } } + +void performRequiredConversions(Block & block, const NamesAndTypesList & required_columns, const Context & context) +{ + ASTPtr conversion_expr_list = convertRequiredExpressions(block, required_columns); + //std::cerr << queryToString(conversion_expr_list) << std::endl; + //std::cerr << "Block:" << block.dumpStructure() << std::endl; + if (conversion_expr_list->children.empty()) + return; + executeExpressionsOnBlock(block, conversion_expr_list, true, required_columns, context); +} + +void evaluateMissingDefaults(Block & block, + const NamesAndTypesList & required_columns, + const ColumnDefaults & column_defaults, + const Context & context, bool save_unneeded_columns) +{ + if (column_defaults.empty()) + return; + + ASTPtr default_expr_list = defaultRequiredExpressions(block, required_columns, column_defaults); + executeExpressionsOnBlock(block, default_expr_list, save_unneeded_columns, required_columns, context); +} + +} diff --git a/dbms/src/Interpreters/evaluateMissingDefaults.h b/dbms/src/Interpreters/evaluateMissingDefaults.h index 320fb35c9cbf5701dd890b9f5babab6b644a19b8..51db620c86f7047aa5bbe504ba639d1c5885444a 100644 --- a/dbms/src/Interpreters/evaluateMissingDefaults.h +++ b/dbms/src/Interpreters/evaluateMissingDefaults.h @@ -17,4 +17,7 @@ void evaluateMissingDefaults(Block & block, const std::unordered_map & column_defaults, const Context & context, bool save_unneeded_columns = true); +void performRequiredConversions(Block & block, + const NamesAndTypesList & required_columns, + const Context & context); } diff --git a/dbms/src/Storages/AlterCommands.cpp b/dbms/src/Storages/AlterCommands.cpp index 58e21caedd6df675eb593f52c5ce60f31a2340ac..b663f2a232275d1176f1ecdfe23d08d6310e147f 100644 --- a/dbms/src/Storages/AlterCommands.cpp +++ b/dbms/src/Storages/AlterCommands.cpp @@ -49,7 +49,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ if (command_ast->type == ASTAlterCommand::ADD_COLUMN) { AlterCommand command; - command.ast = command_ast; + command.ast = command_ast->clone(); command.type = AlterCommand::ADD_COLUMN; const auto & ast_col_decl = command_ast->col_decl->as(); @@ -90,7 +90,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ throw Exception("\"ALTER TABLE table CLEAR COLUMN column\" queries are not supported yet. Use \"CLEAR COLUMN column IN PARTITION\".", ErrorCodes::NOT_IMPLEMENTED); AlterCommand command; - command.ast = command_ast; + command.ast = command_ast->clone(); command.type = AlterCommand::DROP_COLUMN; command.column_name = getIdentifierName(command_ast->column); command.if_exists = command_ast->if_exists; @@ -99,7 +99,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ else if (command_ast->type == ASTAlterCommand::MODIFY_COLUMN) { AlterCommand command; - command.ast = command_ast; + command.ast = command_ast->clone(); command.type = AlterCommand::MODIFY_COLUMN; const auto & ast_col_decl = command_ast->col_decl->as(); @@ -135,7 +135,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ else if (command_ast->type == ASTAlterCommand::COMMENT_COLUMN) { AlterCommand command; - command.ast = command_ast; + command.ast = command_ast->clone(); command.type = COMMENT_COLUMN; command.column_name = getIdentifierName(command_ast->column); const auto & ast_comment = command_ast->comment->as(); @@ -146,7 +146,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ else if (command_ast->type == ASTAlterCommand::MODIFY_ORDER_BY) { AlterCommand command; - command.ast = command_ast; + command.ast = command_ast->clone(); command.type = AlterCommand::MODIFY_ORDER_BY; command.order_by = command_ast->order_by; return command; @@ -154,7 +154,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ else if (command_ast->type == ASTAlterCommand::ADD_INDEX) { AlterCommand command; - command.ast = command_ast; + command.ast = command_ast->clone(); command.index_decl = command_ast->index_decl; command.type = AlterCommand::ADD_INDEX; @@ -172,7 +172,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ else if (command_ast->type == ASTAlterCommand::ADD_CONSTRAINT) { AlterCommand command; - command.ast = command_ast; + command.ast = command_ast->clone(); command.constraint_decl = command_ast->constraint_decl; command.type = AlterCommand::ADD_CONSTRAINT; @@ -190,7 +190,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ throw Exception("\"ALTER TABLE table CLEAR COLUMN column\" queries are not supported yet. Use \"CLEAR COLUMN column IN PARTITION\".", ErrorCodes::NOT_IMPLEMENTED); AlterCommand command; - command.ast = command_ast; + command.ast = command_ast->clone(); command.if_exists = command_ast->if_exists; command.type = AlterCommand::DROP_CONSTRAINT; command.constraint_name = command_ast->constraint->as().name; @@ -203,7 +203,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ throw Exception("\"ALTER TABLE table CLEAR INDEX index\" queries are not supported yet. Use \"CLEAR INDEX index IN PARTITION\".", ErrorCodes::NOT_IMPLEMENTED); AlterCommand command; - command.ast = command_ast; + command.ast = command_ast->clone(); command.type = AlterCommand::DROP_INDEX; command.index_name = command_ast->index->as().name; command.if_exists = command_ast->if_exists; @@ -213,7 +213,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ else if (command_ast->type == ASTAlterCommand::MODIFY_TTL) { AlterCommand command; - command.ast = command_ast; + command.ast = command_ast->clone(); command.type = AlterCommand::MODIFY_TTL; command.ttl = command_ast->ttl; return command; @@ -221,7 +221,7 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ else if (command_ast->type == ASTAlterCommand::MODIFY_SETTING) { AlterCommand command; - command.ast = command_ast; + command.ast = command_ast->clone(); command.type = AlterCommand::MODIFY_SETTING; command.settings_changes = command_ast->settings_changes->as().changes; return command; @@ -499,12 +499,18 @@ bool isMetadataOnlyConversion(const IDataType * from, const IDataType * to) bool AlterCommand::isRequireMutationStage(const StorageInMemoryMetadata & metadata) const { + if (ignore) + return false; + + if (type == DROP_COLUMN) + return true; + if (type != MODIFY_COLUMN || data_type == nullptr) return false; for (const auto & column : metadata.columns.getAllPhysical()) { - if (column.name == column_name && !isMetadataOnlyConversion(column.type, data_type)) + if (column.name == column_name && !isMetadataOnlyConversion(column.type.get(), data_type.get())) return true; } return false; @@ -534,11 +540,11 @@ std::optional AlterCommand::tryConvertToMutationCommand(const S MutationCommand result; - result.type = MutationCommand::Type::CAST; + result.type = MutationCommand::Type::READ; result.column_name = column_name; result.data_type = data_type; result.predicate = nullptr; - result.ast = ast; + result.ast = ast->clone(); return result; } @@ -835,7 +841,7 @@ bool AlterCommands::isCommentAlter() const } -MutationCommands getMutationCommands(const StorageInMemoryMetadata & metadata) const +MutationCommands AlterCommands::getMutationCommands(const StorageInMemoryMetadata & metadata) const { MutationCommands result; for (const auto & alter_cmd : *this) diff --git a/dbms/src/Storages/IStorage.cpp b/dbms/src/Storages/IStorage.cpp index e48e989659741dda1eccf20ae526d9f09fe6e6d2..13f86b9750d615f10296ded025da5d223e83231a 100644 --- a/dbms/src/Storages/IStorage.cpp +++ b/dbms/src/Storages/IStorage.cpp @@ -101,6 +101,7 @@ Block IStorage::getSampleBlockForColumns(const Names & column_names) const Block res; NamesAndTypesList all_columns = getColumns().getAll(); + //std::cerr << "ALL LLLL COLUMNS:" << all_columns.toString() << std::endl; std::unordered_map columns_map; for (const auto & elem : all_columns) columns_map.emplace(elem.name, elem.type); @@ -120,6 +121,7 @@ Block IStorage::getSampleBlockForColumns(const Names & column_names) const } } + //std::cerr << "RES:" << res.dumpStructure() << std::endl; return res; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp b/dbms/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp index a519e2a4b71ef6246420235c93e7c9182f5c3b1e..963296dd30f0ce98c1a3142d5109c5197bbe5517 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeBaseSelectProcessor.cpp @@ -44,6 +44,8 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor( save_marks_in_cache(save_marks_in_cache_), virt_column_names(virt_column_names_) { + //std::cerr << "HEADER IN SELECT PROCESSOR:" << getPort().getHeader().dumpStructure() << std::endl; + //std::cerr << "STACK:" << StackTrace().toString() << std::endl; header_without_virtual_columns = getPort().getHeader(); for (auto it = virt_column_names.rbegin(); it != virt_column_names.rend(); ++it) diff --git a/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp b/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp index 920697f3c32910dfd3c7257761de312582965c44..05e137ddb557364563046646ba560b8abd6fc61a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp @@ -241,10 +241,10 @@ MergeTreeReadTaskColumns getReadTaskColumns(const MergeTreeData & storage, const { /// Under owned_data_part->columns_lock we check that all requested columns are of the same type as in the table. /// This may be not true in case of ALTER MODIFY. - if (!pre_column_names.empty()) - storage.check(data_part->columns, pre_column_names); - if (!column_names.empty()) - storage.check(data_part->columns, column_names); + //if (!pre_column_names.empty()) + // storage.check(data_part->columns, pre_column_names); + //if (!column_names.empty()) + // storage.check(data_part->columns, column_names); const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical(); result.pre_columns = physical_columns.addTypes(pre_column_names); diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 3b7c90ff9fb7dbf1ae984231832105b7f45f87aa..7a0af8fab7046ea91e7cdacd9db1b317dd6795ab 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -1536,15 +1536,15 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, const S } } - if (commands.isModifyingData()) - { - /// Check that type conversions are possible. - ExpressionActionsPtr unused_expression; - NameToNameMap unused_map; - bool unused_bool; - createConvertExpression(nullptr, getColumns().getAllPhysical(), metadata.columns.getAllPhysical(), - getIndices().indices, metadata.indices.indices, unused_expression, unused_map, unused_bool); - } + //if (commands.isModifyingData()) + //{ + // /// Check that type conversions are possible. + // ExpressionActionsPtr unused_expression; + // NameToNameMap unused_map; + // bool unused_bool; + // createConvertExpression(nullptr, getColumns().getAllPhysical(), metadata.columns.getAllPhysical(), + // getIndices().indices, metadata.indices.indices, unused_expression, unused_map, unused_bool); + //} } void MergeTreeData::createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns, diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 3295c863ef9dcf636693935bc8991fee45926a0b..6e8756b1c814853df3c7363a2a0298e36282f0fd 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -962,6 +962,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared( data, space_reservation->getDisk(), future_part.name, future_part.part_info); + new_data_part->relative_path = "tmp_mut_" + future_part.name; new_data_part->is_temp = true; new_data_part->ttl_infos = source_part->ttl_infos; @@ -988,6 +989,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor const auto data_settings = data.getSettings(); Block in_header = in->getHeader(); + std::cerr << "Mutations header:" << in_header.dumpStructure() << std::endl; UInt64 watch_prev_elapsed = 0; MergeStageProgress stage_progress(1.0); @@ -1069,6 +1071,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor /// Don't change granularity type while mutating subset of columns auto mrk_extension = source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension(); + + /// Skip updated files for (const auto & entry : updated_header) { IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path) @@ -1087,6 +1091,33 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor files_to_skip.insert(index->getFileName() + mrk_extension); } + /// TODO(alesap) better + for (const auto & part_column : source_part->columns) + { + bool found = false; + for (const auto & all_column : all_columns) + { + if (part_column.name == all_column.name) + { + found = true; + break; + } + } + if (!found) + { + std::cerr << "REMOVING COLUMN:" << part_column.name << std::endl; + IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path) + { + String stream_name = IDataType::getFileNameForStream(part_column.name, substream_path); + files_to_skip.insert(stream_name + ".bin"); + files_to_skip.insert(stream_name + mrk_extension); + }; + + IDataType::SubstreamPath stream_path; + part_column.type->enumerateStreams(callback, stream_path); + } + } + Poco::DirectoryIterator dir_end; for (Poco::DirectoryIterator dir_it(source_part->getFullPath()); dir_it != dir_end; ++dir_it) { @@ -1101,37 +1132,46 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor merge_entry->columns_written = all_columns.size() - updated_header.columns(); - IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets; - MergedColumnOnlyOutputStream out( - data, - updated_header, - new_part_tmp_path, - /* sync = */ false, - compression_codec, - /* skip_offsets = */ false, - std::vector(indices_to_recalc.begin(), indices_to_recalc.end()), - unused_written_offsets, - source_part->index_granularity, - &source_part->index_granularity_info - ); + new_data_part->checksums = source_part->checksums; + if (updated_header.columns() != 0) + { + IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets; + MergedColumnOnlyOutputStream out( + data, + updated_header, + new_part_tmp_path, + /* sync = */ false, + compression_codec, + /* skip_offsets = */ false, + std::vector(indices_to_recalc.begin(), indices_to_recalc.end()), + unused_written_offsets, + source_part->index_granularity, + &source_part->index_granularity_info + ); + + in->readPrefix(); + out.writePrefix(); + + Block block; + while (check_not_cancelled() && (block = in->read())) + { + out.write(block); + + merge_entry->rows_written += block.rows(); + merge_entry->bytes_written_uncompressed += block.bytes(); + } - in->readPrefix(); - out.writePrefix(); + in->readSuffix(); - Block block; - while (check_not_cancelled() && (block = in->read())) - { - out.write(block); + auto changed_checksums = out.writeSuffixAndGetChecksums(); - merge_entry->rows_written += block.rows(); - merge_entry->bytes_written_uncompressed += block.bytes(); + new_data_part->checksums.add(std::move(changed_checksums)); } - in->readSuffix(); - auto changed_checksums = out.writeSuffixAndGetChecksums(); + for (const String & file_to_skip : files_to_skip) + if (new_data_part->checksums.files.count(file_to_skip)) + new_data_part->checksums.files.erase(file_to_skip); - new_data_part->checksums = source_part->checksums; - new_data_part->checksums.add(std::move(changed_checksums)); { /// Write file with checksums. WriteBufferFromFile out_checksums(new_part_tmp_path + "checksums.txt", 4096); @@ -1144,8 +1184,17 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end()); for (auto it = new_data_part->columns.begin(); it != new_data_part->columns.end();) { - if (source_columns_name_set.count(it->name) || updated_header.has(it->name)) + if (updated_header.has(it->name)) + { + auto updated_type = updated_header.getByName(it->name).type; + if (updated_type != it->type) + it->type = updated_type; ++it; + } + else if (source_columns_name_set.count(it->name)) + { + ++it; + } else it = new_data_part->columns.erase(it); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 09c4fe835d62ea4190bd1a956637a36ce6cd149a..043e389458aa263be3a771a4e15844c944395034 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -644,6 +644,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts( } else { + std::cerr << "Spreading marks among streams\n"; res = spreadMarkRangesAmongStreams( std::move(parts_with_ranges), num_streams, @@ -763,6 +764,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams( num_streams, sum_marks, min_marks_for_concurrent_read, parts, data, query_info.prewhere_info, true, column_names, MergeTreeReadPool::BackoffSettings(settings), settings.preferred_block_size_bytes, false); + std::cerr << "POOL HEADER:" << pool->getHeader().dumpStructure() << std::endl; /// Let's estimate total number of rows for progress bar. LOG_TRACE(log, "Reading approx. " << total_rows << " rows with " << num_streams << " streams"); @@ -790,6 +792,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams( { RangesInDataPart & part = parts[part_index]; + std::cerr << "Creating sequential stream from part:" << part_index << std::endl; auto source = std::make_shared( data, part.data_part, max_block_size, settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache, diff --git a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp index a09bd548b6451f71dea217f29f8d9d88db974c32..b78f2bc49eb34898f0f1bc1984e76729d034c6e4 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp @@ -618,6 +618,7 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar } merge_tree_reader->evaluateMissingDefaults(block, columns); + merge_tree_reader->performRequiredConversions(columns); } read_result.columns.reserve(read_result.columns.size() + columns.size()); @@ -637,6 +638,8 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, Mar if (should_evaluate_missing_defaults) merge_tree_reader->evaluateMissingDefaults({}, read_result.columns); + + merge_tree_reader->performRequiredConversions(read_result.columns); } else read_result.columns.clear(); diff --git a/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp b/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp index d308667a67bdc408f3c15a61d22b95cf72c905a8..e22b10b83194bb91f6af87d8e1ddd92db29baed7 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReadPool.cpp @@ -155,6 +155,7 @@ MarkRanges MergeTreeReadPool::getRestMarks(const MergeTreeDataPart & part, const Block MergeTreeReadPool::getHeader() const { + //////////////////std::cerr << "COLUMN NAMES IN POOL:" << column_names.front() << std::endl; return data.getSampleBlockForColumns(column_names); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp index 72c31a9dcb1e86b2ed8cc667cf5776e899b4be52..f3dcf3adafc25fb1d8e290afc9e44ef67f28bd6a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp @@ -56,10 +56,32 @@ MergeTreeReader::MergeTreeReader( , mmap_threshold(mmap_threshold_) , max_read_buffer_size(max_read_buffer_size_) { + //std::cerr << "Merge tree reader created for part:" << data_part->name << std::endl; try { + for (const NameAndTypePair & column_from_part : data_part->columns) + { + columns_from_part[column_from_part.name] = column_from_part.type; + } + for (const NameAndTypePair & column : columns) - addStreams(column.name, *column.type, profile_callback_, clock_type_); + { + //std::cerr << "Column name to read:" << column.name << std::endl; + if (columns_from_part.count(column.name)) + { + //std::cerr << "With type:" << columns_from_part[column.name]->getName() << std::endl; + //std::cerr << "Original type:" << column.type->getName() << std::endl; + + addStreams(column.name, *columns_from_part[column.name], profile_callback_, clock_type_); + } + else + { + //std::cerr << "Original type:" << column.type->getName() << std::endl; + addStreams(column.name, *column.type, profile_callback_, clock_type_); + } + + } + //std::cerr << "COLUMNS IN CONSTRUCTOR:" << columns.toString() << std::endl; } catch (...) { @@ -95,12 +117,17 @@ size_t MergeTreeReader::readRows(size_t from_mark, bool continue_reading, size_t auto name_and_type = columns.begin(); for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type) { - auto & [name, type] = *name_and_type; + String & name = name_and_type->name; + DataTypePtr type; + if (columns_from_part.count(name)) + type = columns_from_part[name]; + else + type = name_and_type->type; /// The column is already present in the block so we will append the values to the end. bool append = res_columns[pos] != nullptr; if (!append) - res_columns[pos] = name_and_type->type->createColumn(); + res_columns[pos] = type->createColumn(); /// To keep offsets shared. TODO Very dangerous. Get rid of this. MutableColumnPtr column = res_columns[pos]->assumeMutable(); @@ -214,6 +241,7 @@ void MergeTreeReader::readData( size_t from_mark, bool continue_reading, size_t max_rows_to_read, bool with_offsets) { + //std::cerr << "READ DATA:" << name << " with type:" << type.getName() << std::endl; auto get_stream_getter = [&](bool stream_for_prefix) -> IDataType::InputStreamGetter { return [&, stream_for_prefix](const IDataType::SubstreamPath & substream_path) -> ReadBuffer * @@ -370,7 +398,7 @@ void MergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns size_t num_columns = columns.size(); if (res_columns.size() != num_columns) - throw Exception("invalid number of columns passed to MergeTreeReader::fillMissingColumns. " + throw Exception("invalid number of columns passed to MergeTreeReader::evaluateMissingDefaults. " "Expected " + toString(num_columns) + ", " "got " + toString(res_columns.size()), ErrorCodes::LOGICAL_ERROR); @@ -400,4 +428,65 @@ void MergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns } } +void MergeTreeReader::performRequiredConversions(Columns & res_columns) +{ + try + { + size_t num_columns = columns.size(); + + if (res_columns.size() != num_columns) + { + throw Exception( + "invalid number of columns passed to MergeTreeReader::performRequiredConversions. " + "Expected " + + toString(num_columns) + + ", " + "got " + + toString(res_columns.size()), + ErrorCodes::LOGICAL_ERROR); + } + + Block copy_block; + auto name_and_type = columns.begin(); + //std::cerr << "DATAPART NAMES AND TYPES:" << data_part->columns.toString() << std::endl; + //std::cerr << "REQUIRED COLUMNS NAMES AND TYPES:" << columns.toString() << std::endl; + //std::cerr << "RES COLUMNS SIZE:" << res_columns.size() << std::endl; + //std::cerr << "RES COLUMNS STRUCTURE:\n"; + //for (const auto & column : res_columns) + //{ + // std::cerr << column->dumpStructure() << std::endl; + //} + + for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type) + { + //std::cerr << "POS:" << pos << std::endl; + if (res_columns[pos] == nullptr) + continue; + + //std::cerr << "POS NAME:" << name_and_type->name << std::endl; + //std::cerr << "POS TYPE:" << name_and_type->type->getName() << std::endl; + if (columns_from_part.count(name_and_type->name)) + copy_block.insert({res_columns[pos], columns_from_part[name_and_type->name], name_and_type->name}); + else + copy_block.insert({res_columns[pos], name_and_type->type, name_and_type->name}); + } + + //std::cerr << "Copy block: " << copy_block.dumpStructure() << std::endl; + DB::performRequiredConversions(copy_block, columns, storage.global_context); + std::cerr << "Result copy block: " << copy_block.dumpStructure() << std::endl; + + /// Move columns from block. + name_and_type = columns.begin(); + for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type) + { + res_columns[pos] = std::move(copy_block.getByName(name_and_type->name).column); + } + } + catch(Exception & e) + { + /// Better diagnostics. + e.addMessage("(while reading from part " + path + ")"); + throw; + } +} } diff --git a/dbms/src/Storages/MergeTree/MergeTreeReader.h b/dbms/src/Storages/MergeTree/MergeTreeReader.h index b0642c0610818e1995aecf0a7693cf4b22fa7ece..d0562fe3300956fac0af29b199df4f437b6e84d1 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReader.h +++ b/dbms/src/Storages/MergeTree/MergeTreeReader.h @@ -45,6 +45,9 @@ public: /// Evaluate defaulted columns if necessary. void evaluateMissingDefaults(Block additional_columns, Columns & res_columns); + /// Perform conversions TODO(alesap) + void performRequiredConversions(Columns & res_columns); + const NamesAndTypesList & getColumns() const { return columns; } size_t numColumnsInResult() const { return columns.size(); } @@ -74,6 +77,8 @@ private: /// Columns that are read. NamesAndTypesList columns; + std::unordered_map columns_from_part; + UncompressedCache * uncompressed_cache; MarkCache * mark_cache; /// If save_marks_in_cache is false, then, if marks are not in cache, we will load them but won't save in the cache, to avoid evicting other data. diff --git a/dbms/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp b/dbms/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp index dac42859eef4a51e632a066927fadb777f243c4b..8c483c3fa68ff6865825aeae05d2cec47decf76c 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeSelectProcessor.cpp @@ -11,25 +11,25 @@ namespace ErrorCodes extern const int MEMORY_LIMIT_EXCEEDED; } -static Block replaceTypes(Block && header, const MergeTreeData::DataPartPtr & data_part) -{ - /// Types may be different during ALTER (when this stream is used to perform an ALTER). - /// NOTE: We may use similar code to implement non blocking ALTERs. - for (const auto & name_type : data_part->columns) - { - if (header.has(name_type.name)) - { - auto & elem = header.getByName(name_type.name); - if (!elem.type->equals(*name_type.type)) - { - elem.type = name_type.type; - elem.column = elem.type->createColumn(); - } - } - } - - return std::move(header); -} +//static Block replaceTypes(Block && header, const MergeTreeData::DataPartPtr & data_part) +//{ +// /// Types may be different during ALTER (when this stream is used to perform an ALTER). +// /// NOTE: We may use similar code to implement non blocking ALTERs. +// for (const auto & name_type : data_part->columns) +// { +// if (header.has(name_type.name)) +// { +// auto & elem = header.getByName(name_type.name); +// if (!elem.type->equals(*name_type.type)) +// { +// elem.type = name_type.type; +// elem.column = elem.type->createColumn(); +// } +// } +// } +// +// return std::move(header); +//} MergeTreeSelectProcessor::MergeTreeSelectProcessor( const MergeTreeData & storage_, @@ -51,7 +51,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor( bool quiet) : MergeTreeBaseSelectProcessor{ - replaceTypes(storage_.getSampleBlockForColumns(required_columns_), owned_data_part_), + storage_.getSampleBlockForColumns(required_columns_), storage_, prewhere_info_, max_block_size_rows_, preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, min_bytes_to_use_direct_io_, min_bytes_to_use_mmap_io_, max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_, virt_column_names_}, diff --git a/dbms/src/Storages/MergeTree/MergeTreeSequentialBlockInputStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeSequentialBlockInputStream.cpp index 372c29a3ac305c46bfad1be6f3cf1e5ada80173b..08beda32d741224e4b6dd90abcfba9accfe38f25 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSequentialBlockInputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeSequentialBlockInputStream.cpp @@ -35,18 +35,20 @@ MergeTreeSequentialBlockInputStream::MergeTreeSequentialBlockInputStream( addTotalRowsApprox(data_part->rows_count); header = storage.getSampleBlockForColumns(columns_to_read); - fixHeader(header); + //fixHeader(header); /// Add columns because we don't want to read empty blocks injectRequiredColumns(storage, data_part, columns_to_read); NamesAndTypesList columns_for_reader; if (take_column_types_from_storage) { + std::cerr << "Taking columns from storage\n"; const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical(); columns_for_reader = physical_columns.addTypes(columns_to_read); } else { + std::cerr << "Taking columns from data part\n"; /// take columns from data_part columns_for_reader = data_part->columns.addTypes(columns_to_read); } @@ -107,6 +109,8 @@ try if (should_evaluate_missing_defaults) reader->evaluateMissingDefaults({}, columns); + reader->performRequiredConversions(columns); + res = header.cloneEmpty(); /// Reorder columns and fill result block. diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp index 0599a35905d18237db37d506ce0b09ce1e569ce3..7c6cfa42873f438461f51b86d72b28ea90dc6c4d 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp @@ -67,10 +67,10 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const case FINISH_ALTER: /// Just make local /metadata and /columns consistent with global out << "alter\n"; - for (const String & s : source_parts) - out << s << '\n'; - out << "finish"; + out << required_mutation_znode << "\n"; + out << "finish\n"; break; + default: throw Exception("Unknown log entry type: " + DB::toString(type), ErrorCodes::LOGICAL_ERROR); } @@ -161,18 +161,14 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) else if (type_str == "alter") { type = FINISH_ALTER; - while (!in.eof()) - { - String s; - in >> s >> "\n"; - if (s == "finish") - break; - source_parts.push_back(s); - } + in >> required_mutation_znode >> "\nfinish\n"; } + std::cerr << "Read backn\n"; in >> "\n"; + std::cerr << "Readed\n"; + /// Optional field. if (!in.eof()) in >> "quorum: " >> quorum >> "\n"; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h index 7d63c20ce9b24b84e9631c2b5571328254c9b9a8..f7744d5a110180b1dacd78ecbd842ef5d901d54d 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h @@ -89,6 +89,9 @@ struct ReplicatedMergeTreeLogEntryData /// For DROP_RANGE, true means that the parts need not be deleted, but moved to the `detached` directory. bool detach = false; + /// For ALTER TODO(alesap) + String required_mutation_znode; + /// REPLACE PARTITION FROM command struct ReplaceRangeEntry { @@ -111,6 +114,10 @@ struct ReplicatedMergeTreeLogEntryData /// selection of merges. These parts are added to queue.virtual_parts. Strings getVirtualPartNames() const { + /// Doesn't produce any part + if (type == FINISH_ALTER) + return {}; + /// DROP_RANGE does not add a real part, but we must disable merges in that range if (type == DROP_RANGE) return {new_part_name}; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 41e1d408b52f4e46762df5a8fc5676d85e39b761..9af93996c0c61c679479c348386225cf37eef5ab 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -517,6 +517,8 @@ void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, C } catch (...) { + std::cerr << "DIE HERE:\n"; + tryLogCurrentException(log); /// If it fails, the data in RAM is incorrect. In order to avoid possible further corruption of data in ZK, we will kill ourselves. /// This is possible only if there is an unknown logical error. std::terminate(); @@ -532,7 +534,8 @@ void ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, C } -static Names getPartNamesToMutate( +namespace { +Names getPartNamesToMutate( const ReplicatedMergeTreeMutationEntry & mutation, const ActiveDataPartSet & parts) { Names result; @@ -556,8 +559,9 @@ static Names getPartNamesToMutate( return result; } +} -Names getPartNamesToMutate(ReplicatedMergeTreeMutationEntry & entry) const +Names ReplicatedMergeTreeQueue::getCurrentPartNamesToMutate(ReplicatedMergeTreeMutationEntry & entry) const { return getPartNamesToMutate(entry, current_parts); } @@ -1008,16 +1012,12 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( if (entry.type == LogEntry::FINISH_ALTER) { - for (const auto & name : entry.source_parts) - { - if (future_parts.count(name)) - { - String reason = "Not altering storage because part " + name - + " is not ready yet (log entry for that part is being processed)."; - LOG_TRACE(log, reason); - out_postpone_reason = reason; - return false; - } + std::cerr << "Entry finish alter\n"; + if (mutations_by_znode.count(entry.required_mutation_znode) && !mutations_by_znode.at(entry.required_mutation_znode).is_done) { + String reason = "Not altering storage because mutation " + entry.required_mutation_znode + " is not ready yet (mutation is beeing processed)."; + LOG_TRACE(log, reason); + out_postpone_reason = reason; + return false; } } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 6ed2f1889edc24c21082248a38eec473e152027f..cad23df6f46602431174a8fbae01d776b29bf197 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -117,6 +117,7 @@ private: String latest_fail_reason; }; + /// Mapping from znode path to Mutations Status std::map mutations_by_znode; std::unordered_map> mutations_by_partition; /// Znode ID of the latest mutation that is done. @@ -339,7 +340,7 @@ public: /// Adds a subscriber SubscriberHandler addSubscriber(SubscriberCallBack && callback); - Names getPartNamesToMutate(ReplicatedMergeTreeMutationEntry & entry) const; + Names getCurrentPartNamesToMutate(ReplicatedMergeTreeMutationEntry & entry) const; struct Status { diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index fce4479c16f452c517d0209f55b526ee6ad6257d..5789bd1f9a724af89459ac9cfb1f349316e82d17 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -199,7 +199,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup() storage.mutations_updating_task->activateAndSchedule(); storage.mutations_finalizing_task->activateAndSchedule(); storage.cleanup_thread.start(); - storage.alter_thread.start(); + //storage.alter_thread.start(); storage.part_check_thread.start(); return true; @@ -346,7 +346,7 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown() storage.mutations_finalizing_task->deactivate(); storage.cleanup_thread.stop(); - storage.alter_thread.stop(); + //storage.alter_thread.stop(); storage.part_check_thread.stop(); LOG_TRACE(log, "Threads finished"); diff --git a/dbms/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h b/dbms/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h index 6865cc956faf8982afd40c483e5dfebf407801da..2ad4abf607b3eedcfb1dea651145ce519af9e229 100644 --- a/dbms/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h +++ b/dbms/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h @@ -39,6 +39,7 @@ public: for (auto & pipe : pipes) streams.emplace_back(std::make_shared(std::move(pipe))); + //std::cerr << "Streams header:" << streams.back()->getHeader().dumpStructure() << std::endl; return streams; } diff --git a/dbms/src/Storages/MutationCommands.cpp b/dbms/src/Storages/MutationCommands.cpp index f8bc781f166b115536f9b0ee1ce4ef2dd5b515f3..2d261a8c0b79b5a69b2cdf1d91e4ecf3a3e8f232 100644 --- a/dbms/src/Storages/MutationCommands.cpp +++ b/dbms/src/Storages/MutationCommands.cpp @@ -2,12 +2,14 @@ #include #include #include +#include #include #include #include #include #include #include +#include namespace DB @@ -19,7 +21,7 @@ namespace ErrorCodes extern const int MULTIPLE_ASSIGNMENTS_TO_COLUMN; } -std::optional MutationCommand::parse(ASTAlterCommand * command) +std::optional MutationCommand::parse(ASTAlterCommand * command, bool parse_modify) { if (command->type == ASTAlterCommand::DELETE) { @@ -55,6 +57,24 @@ std::optional MutationCommand::parse(ASTAlterCommand * command) res.index_name = command->index->as().name; return res; } + else if (parse_modify && command->type == ASTAlterCommand::MODIFY_COLUMN) + { + MutationCommand res; + res.ast = command->ptr(); + res.type = MutationCommand::Type::READ; + const auto & ast_col_decl = command->col_decl->as(); + res.column_name = ast_col_decl.name; + res.data_type = DataTypeFactory::instance().get(ast_col_decl.type); + return res; + } + else if (parse_modify && command->type == ASTAlterCommand::DROP_COLUMN) + { + MutationCommand res; + res.ast = command->ptr(); + res.type = MutationCommand::Type::READ; + res.column_name = getIdentifierName(command->column); + return res; + } else return {}; } @@ -85,7 +105,7 @@ void MutationCommands::readText(ReadBuffer & in) p_alter_commands, commands_str.data(), commands_str.data() + commands_str.length(), "mutation commands list", 0); for (ASTAlterCommand * command_ast : commands_ast->as().commands) { - auto command = MutationCommand::parse(command_ast); + auto command = MutationCommand::parse(command_ast, true); if (!command) throw Exception("Unknown mutation command type: " + DB::toString(command_ast->type), ErrorCodes::UNKNOWN_MUTATION_COMMAND); push_back(std::move(*command)); diff --git a/dbms/src/Storages/MutationCommands.h b/dbms/src/Storages/MutationCommands.h index ce84c08e6319b96e1ca5c27652c4493bc492f811..d808fbca4d5906230b7b4c7c909b69a93a45fade 100644 --- a/dbms/src/Storages/MutationCommands.h +++ b/dbms/src/Storages/MutationCommands.h @@ -27,7 +27,7 @@ struct MutationCommand DELETE, UPDATE, MATERIALIZE_INDEX, - CAST /// for ALTER MODIFY column + READ }; Type type = EMPTY; @@ -46,7 +46,7 @@ struct MutationCommand String column_name; DataTypePtr data_type; - static std::optional parse(ASTAlterCommand * command); + static std::optional parse(ASTAlterCommand * command, bool parse_modify=false); }; /// Multiple mutation commands, possible from different ALTER queries diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 3998189b81ad9f89896163b89d94d1878f65594f..15318aecefb76d8e4bd3295268287e00541ea0bc 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -208,7 +208,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( zookeeper_path(global_context.getMacros()->expand(zookeeper_path_, database_name_, table_name_)), replica_name(global_context.getMacros()->expand(replica_name_, database_name_, table_name_)), reader(*this), writer(*this), merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads()), - queue(*this), fetcher(*this), cleanup_thread(*this), alter_thread(*this), + queue(*this), fetcher(*this), cleanup_thread(*this), part_check_thread(*this), restarting_thread(*this) { if (!zookeeper_path.empty() && zookeeper_path.back() == '/') @@ -1156,34 +1156,37 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry) } -bool StorageReplicatedMergeTree::tryFinishAlter(const StorageReplicatedMergeTree::LogEntry & entry) +bool StorageReplicatedMergeTree::tryFinishAlter(const StorageReplicatedMergeTree::LogEntry & /*entry*/) { + std::cerr << "Trying to finish alter\n"; auto zookeeper = getZooKeeper(); String columns_path = zookeeper_path + "/columns"; - auto columns_znode = zookeeper->get(columns_path); - if (!columns_znode.exists) + String columns_str; + Coordination::Stat columns_znode_stat; + if (!zookeeper->tryGet(columns_path, columns_str, &columns_znode_stat)) throw Exception(columns_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE); - int32_t columns_version = columns_znode.stat.version; + int32_t columns_version_zk = columns_znode_stat.version; String metadata_path = zookeeper_path + "/metadata"; - auto metadata_znode = zookeeper->get(metadata_path); - if (!metadata_znode.exists) + String metadata_str; + Coordination::Stat metadata_znode_stat; + + if (!zookeeper->tryGet(metadata_path, metadata_str, &metadata_znode_stat)) throw Exception(metadata_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE); - int32_t metadata_version = metadata_znode.stat.version; + int32_t metadata_version_zk = metadata_znode_stat.version; + + const bool changed_columns_version = (columns_version_zk != this->columns_version); + const bool changed_metadata_version = (metadata_version_zk != this->metadata_version); - const bool changed_columns_version = (columns_version != storage.columns_version); - const bool changed_metadata_version = (metadata_version != storage.metadata_version); + std::cerr << "Versions changed: columns" << changed_columns_version << " metadata:" << changed_metadata_version << std::endl; if (!(changed_columns_version || changed_metadata_version)) - return; + return true; - const String & columns_str = columns_znode.contents; auto columns_in_zk = ColumnsDescription::parse(columns_str); - - const String & metadata_str = metadata_znode.contents; auto metadata_in_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str); - auto metadata_diff = ReplicatedMergeTreeTableMetadata(storage).checkAndFindDiff(metadata_in_zk, /* allow_alter = */ true); + auto metadata_diff = ReplicatedMergeTreeTableMetadata(*this).checkAndFindDiff(metadata_in_zk, /* allow_alter = */ true); MergeTreeData::DataParts parts; @@ -1194,7 +1197,7 @@ bool StorageReplicatedMergeTree::tryFinishAlter(const StorageReplicatedMergeTree auto table_lock = lockExclusively(RWLockImpl::NO_QUERY); - if (columns_in_zk == storage.getColumns() && metadata_diff.empty()) + if (columns_in_zk == getColumns() && metadata_diff.empty()) { LOG_INFO( log, @@ -1210,8 +1213,8 @@ bool StorageReplicatedMergeTree::tryFinishAlter(const StorageReplicatedMergeTree LOG_INFO(log, "Applied changes to the metadata of the table."); } - columns_version = columns_version; - metadata_version = metadata_version; + this->columns_version = columns_version_zk; + this->metadata_version = metadata_version_zk; recalculateColumnSizes(); /// Update metadata ZK nodes for a specific replica. @@ -1220,6 +1223,7 @@ bool StorageReplicatedMergeTree::tryFinishAlter(const StorageReplicatedMergeTree if (changed_metadata_version) zookeeper->set(replica_path + "/metadata", metadata_str); } + return true; } bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedMergeTree::LogEntry & entry) @@ -3291,13 +3295,32 @@ void StorageReplicatedMergeTree::alter( } - auto ast_to_str = [](ASTPtr query) -> String + struct ChangedNode { + ChangedNode(const String & table_path_, String name_, String new_value_) + : table_path(table_path_), name(std::move(name_)), shared_path(table_path + "/" + name), new_value(std::move(new_value_)) + { + } + + const String & table_path; + String name; + + String shared_path; + + String new_value; + int32_t new_version = -1; /// Initialization is to suppress (useless) false positive warning found by cppcheck. + }; + + auto ast_to_str = [](ASTPtr query) -> String { if (!query) return ""; return queryToString(query); }; + std::cerr << " Columns preparation to alter:" << getColumns().getAllPhysical().toString() << std::endl; + + /// /columns and /metadata nodes + std::vector changed_nodes; { /// Just to read current structure. Alter will be done in separate thread. auto table_lock = lockStructureForShare(false, query_context.getCurrentQueryId()); @@ -3357,215 +3380,21 @@ void StorageReplicatedMergeTree::alter( entry.type = LogEntry::FINISH_ALTER; entry.source_replica = replica_name; - if (maybe_mutation_commands) - { - ReplicatedMergeTreeMutationEntry entry = mutateImpl(*maybe_mutation_commands, context); - entry.source_parts = queue.getPartNamesToMutate(entry); - } + std::cerr << " Columns before mutation:" << getColumns().getAllPhysical().toString() << std::endl; - entry.new_part_name = new_part_name; + entry.new_part_name = ""; entry.create_time = time(nullptr); - zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential); + String path_created = getZooKeeper()->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential); + entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1); waitForAllReplicasToProcessLogEntry(entry); - ///// Wait until all replicas will apply ALTER. - - //for (const auto & node : changed_nodes) - //{ - // Coordination::Stat stat; - // /// Subscribe to change of shared ZK metadata nodes, to finish waiting if someone will do another ALTER. - // if (!getZooKeeper()->exists(node.shared_path, &stat, alter_query_event)) - // throw Exception(node.shared_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE); - - // if (stat.version != node.new_version) - // { - // LOG_WARNING(log, node.shared_path + " changed before this ALTER finished; " + - // "overlapping ALTER-s are fine but use caution with nontransitive changes"); - // return; - // } - //} - - //Strings replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas"); - - //std::set inactive_replicas; - //std::set timed_out_replicas; - - //time_t replication_alter_columns_timeout = query_context.getSettingsRef().replication_alter_columns_timeout; - - ///// This code is quite similar with waitMutationToFinishOnReplicas - ///// but contains more complicated details (versions manipulations, multiple nodes, etc.). - ///// It will be removed soon in favor of alter-modify implementation on top of mutations. - ///// TODO (alesap) - //for (const String & replica : replicas) - //{ - // LOG_DEBUG(log, "Waiting for " << replica << " to apply changes"); - - // while (!partial_shutdown_called) - // { - // auto zookeeper = getZooKeeper(); - - // /// Replica could be inactive. - // if (!zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active")) - // { - // LOG_WARNING(log, "Replica " << replica << " is not active during ALTER query." - // " ALTER will be done asynchronously when replica becomes active."); - - // inactive_replicas.emplace(replica); - // break; - // } - - // struct ReplicaNode - // { - // explicit ReplicaNode(String path_) : path(std::move(path_)) {} - - // String path; - // String value; - // int32_t version = -1; - // }; - - // std::vector replica_nodes; - // for (const auto & node : changed_nodes) - // replica_nodes.emplace_back(node.getReplicaPath(replica)); - - // bool replica_was_removed = false; - // for (auto & node : replica_nodes) - // { - // Coordination::Stat stat; - - // /// Replica could has been removed. - // if (!zookeeper->tryGet(node.path, node.value, &stat)) - // { - // LOG_WARNING(log, replica << " was removed"); - // replica_was_removed = true; - // break; - // } - - // node.version = stat.version; - // } - - // if (replica_was_removed) - // break; - - // bool alter_was_applied = true; - // for (size_t i = 0; i < replica_nodes.size(); ++i) - // { - // if (replica_nodes[i].value != changed_nodes[i].new_value) - // { - // alter_was_applied = false; - // break; - // } - // } - - // /// The ALTER has been successfully applied. - // if (alter_was_applied) - // break; - - // for (const auto & node : changed_nodes) - // { - // Coordination::Stat stat; - // if (!zookeeper->exists(node.shared_path, &stat)) - // throw Exception(node.shared_path + " doesn't exist", ErrorCodes::NOT_FOUND_NODE); - - // if (stat.version != node.new_version) - // { - // LOG_WARNING(log, node.shared_path + " changed before this ALTER finished; " - // "overlapping ALTER-s are fine but use caution with nontransitive changes"); - // return; - // } - // } - - // bool replica_nodes_changed_concurrently = false; - // for (const auto & replica_node : replica_nodes) - // { - // Coordination::Stat stat; - // if (!zookeeper->exists(replica_node.path, &stat, alter_query_event)) - // { - // LOG_WARNING(log, replica << " was removed"); - // replica_was_removed = true; - // break; - // } - - // if (stat.version != replica_node.version) - // { - // replica_nodes_changed_concurrently = true; - // break; - // } - // } - - // if (replica_was_removed) - // break; - - // if (replica_nodes_changed_concurrently) - // continue; - - // /// alter_query_event subscribed with zookeeper watch callback to /repliacs/{replica}/metadata - // /// and /replicas/{replica}/columns nodes for current relica + shared nodes /columns and /metadata, - // /// which is common for all replicas. If changes happen with this nodes (delete, set and create) - // /// than event will be notified and wait will be interrupted. - // /// - // /// ReplicatedMergeTreeAlterThread responsible for local /replicas/{replica}/metadata and - // /// /replicas/{replica}/columns changes. Shared /columns and /metadata nodes can be changed by *newer* - // /// concurrent alter from other replica. First of all it will update shared nodes and we will have no - // /// ability to identify, that our *current* alter finshed. So we cannot do anything better than just - // /// return from *current* alter with success result. - // if (!replication_alter_columns_timeout) - // { - // alter_query_event->wait(); - // /// Everything is fine. - // } - // else if (alter_query_event->tryWait(replication_alter_columns_timeout * 1000)) - // { - // /// Everything is fine. - // } - // else - // { - // LOG_WARNING(log, "Timeout when waiting for replica " << replica << " to apply ALTER." - // " ALTER will be done asynchronously."); - - // timed_out_replicas.emplace(replica); - // break; - // } - // } - - // if (partial_shutdown_called) - // throw Exception("Alter is not finished because table shutdown was called. Alter will be done after table restart.", - // ErrorCodes::UNFINISHED); - - // if (!inactive_replicas.empty() || !timed_out_replicas.empty()) - // { - // std::stringstream exception_message; - // exception_message << "Alter is not finished because"; - - // if (!inactive_replicas.empty()) - // { - // exception_message << " some replicas are inactive right now"; - - // for (auto it = inactive_replicas.begin(); it != inactive_replicas.end(); ++it) - // exception_message << (it == inactive_replicas.begin() ? ": " : ", ") << *it; - // } - - // if (!timed_out_replicas.empty() && !inactive_replicas.empty()) - // exception_message << " and"; - - // if (!timed_out_replicas.empty()) - // { - // exception_message << " timeout when waiting for some replicas"; - - // for (auto it = timed_out_replicas.begin(); it != timed_out_replicas.end(); ++it) - // exception_message << (it == timed_out_replicas.begin() ? ": " : ", ") << *it; - - // exception_message << " (replication_alter_columns_timeout = " << replication_alter_columns_timeout << ")"; - // } - - // exception_message << ". Alter will be done asynchronously."; - - // throw Exception(exception_message.str(), ErrorCodes::UNFINISHED); - // } - //} - - //LOG_DEBUG(log, "ALTER finished"); + if (!maybe_mutation_commands.empty()) + { + std::cerr << "We have mutation commands:" << maybe_mutation_commands.size() << std::endl; + ReplicatedMergeTreeMutationEntry mutation_entry = mutateImpl(maybe_mutation_commands, query_context); + } } void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context) @@ -4519,10 +4348,10 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const Context & query_context) { - mutateImpl(commands, context); + mutateImpl(commands, query_context); } -StorageReplicatedMergeTree::mutateImpl(const MutationCommands & commands, const Context & query_context) +ReplicatedMergeTreeMutationEntry StorageReplicatedMergeTree::mutateImpl(const MutationCommands & commands, const Context & query_context) { /// Overview of the mutation algorithm. /// @@ -4627,16 +4456,16 @@ StorageReplicatedMergeTree::mutateImpl(const MutationCommands & commands, const } /// we have to wait - if (query_context.getSettingsRef().mutations_sync != 0) - { + //if (query_context.getSettingsRef().mutations_sync != 0) + //{ Strings replicas; if (query_context.getSettingsRef().mutations_sync == 2) /// wait for all replicas replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas"); - else if (query_context.getSettingsRef().mutations_sync == 1) /// just wait for ourself - replicas.push_back(replica_path); + //else if (query_context.getSettingsRef().mutations_sync == 1) /// just wait for ourself + // replicas.push_back(replica_path); waitMutationToFinishOnReplicas(replicas, entry.znode_name); - } + //} return entry; } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index 0ccdf046892699839ef36ee77242d612449a5dfb..6ad7b08b34ef5a08791b4509823451156a85bd5d 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -288,9 +288,6 @@ private: /// A thread that removes old parts, log entries, and blocks. ReplicatedMergeTreeCleanupThread cleanup_thread; - /// A thread monitoring changes to the column list in ZooKeeper and updating the parts in accordance with these changes. - ReplicatedMergeTreeAlterThread alter_thread; - /// A thread that checks the data of the parts, as well as the queue of the parts to be checked. ReplicatedMergeTreePartCheckThread part_check_thread;