diff --git a/dbms/src/Storages/TrivialBuffer.cpp b/dbms/src/Storages/TrivialBuffer.cpp index 7f2702bce6b7a5c4555e190e0a4a2bfeae94d816..2d08b4ef8b43e16e2f5f30754f0818a9f07f01b0 100644 --- a/dbms/src/Storages/TrivialBuffer.cpp +++ b/dbms/src/Storages/TrivialBuffer.cpp @@ -18,18 +18,18 @@ namespace ProfileEvents { - extern const Event TrivialBufferFlush; - extern const Event TrivialBufferErrorOnFlush; - extern const Event TrivialBufferPassedAllMinThresholds; - extern const Event TrivialBufferPassedTimeMaxThreshold; - extern const Event TrivialBufferPassedRowsMaxThreshold; - extern const Event TrivialBufferPassedBytesMaxThreshold; + extern const Event TrivialBufferFlush; + extern const Event TrivialBufferErrorOnFlush; + extern const Event TrivialBufferPassedAllMinThresholds; + extern const Event TrivialBufferPassedTimeMaxThreshold; + extern const Event TrivialBufferPassedRowsMaxThreshold; + extern const Event TrivialBufferPassedBytesMaxThreshold; } namespace CurrentMetrics { - extern const Metric TrivialBufferRows; - extern const Metric TrivialBufferBytes; + extern const Metric TrivialBufferRows; + extern const Metric TrivialBufferBytes; } @@ -38,347 +38,347 @@ namespace DB namespace ErrorCodes { - extern const int INFINITE_LOOP; - extern const int BLOCKS_HAS_DIFFERENT_STRUCTURE; + extern const int INFINITE_LOOP; + extern const int BLOCKS_HAS_DIFFERENT_STRUCTURE; } StoragePtr TrivialBuffer::create(const std::string & name_, NamesAndTypesListPtr columns_, - const NamesAndTypesList & materialized_columns_, - const NamesAndTypesList & alias_columns_, - const ColumnDefaults & column_defaults_, - Context & context_, const size_t num_blocks_to_deduplicate_, - const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, + const NamesAndTypesList & materialized_columns_, + const NamesAndTypesList & alias_columns_, + const ColumnDefaults & column_defaults_, + Context & context_, const size_t num_blocks_to_deduplicate_, + const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, const String & destination_database_, const String & destination_table_) { - return make_shared( - name_, columns_, materialized_columns_, alias_columns_, column_defaults_, - context_, num_blocks_to_deduplicate_, min_thresholds_, max_thresholds_, - destination_database_, destination_table_); + return make_shared( + name_, columns_, materialized_columns_, alias_columns_, column_defaults_, + context_, num_blocks_to_deduplicate_, min_thresholds_, max_thresholds_, + destination_database_, destination_table_); } TrivialBuffer::TrivialBuffer(const std::string & name_, NamesAndTypesListPtr columns_, - const NamesAndTypesList & materialized_columns_, - const NamesAndTypesList & alias_columns_, - const ColumnDefaults & column_defaults_, - Context & context_, const size_t num_blocks_to_deduplicate_, - const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, - const String & destination_database_, const String & destination_table_) - : IStorage{materialized_columns_, alias_columns_, column_defaults_}, - name(name_), columns(columns_), context(context_), - num_blocks_to_deduplicate(num_blocks_to_deduplicate_), - current_hashes(std::make_unique()), - previous_hashes(std::make_unique()), - min_thresholds(min_thresholds_), max_thresholds(max_thresholds_), - destination_database(destination_database_), destination_table(destination_table_), - no_destination(destination_database.empty() && destination_table.empty()), - log(&Logger::get("TrivialBuffer (" + name + ")")), - flush_thread(&TrivialBuffer::flushThread, this) + const NamesAndTypesList & materialized_columns_, + const NamesAndTypesList & alias_columns_, + const ColumnDefaults & column_defaults_, + Context & context_, const size_t num_blocks_to_deduplicate_, + const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, + const String & destination_database_, const String & destination_table_) + : IStorage{materialized_columns_, alias_columns_, column_defaults_}, + name(name_), columns(columns_), context(context_), + num_blocks_to_deduplicate(num_blocks_to_deduplicate_), + current_hashes(std::make_unique()), + previous_hashes(std::make_unique()), + min_thresholds(min_thresholds_), max_thresholds(max_thresholds_), + destination_database(destination_database_), destination_table(destination_table_), + no_destination(destination_database.empty() && destination_table.empty()), + log(&Logger::get("TrivialBuffer (" + name + ")")), + flush_thread(&TrivialBuffer::flushThread, this) { } class TrivialBufferBlockInputStream : public IProfilingBlockInputStream { public: - TrivialBufferBlockInputStream(const Names & column_names_, TrivialBuffer & buffer_) - : column_names(column_names_.begin(), column_names_.end()), buffer(buffer_) {} + TrivialBufferBlockInputStream(const Names & column_names_, TrivialBuffer & buffer_) + : column_names(column_names_.begin(), column_names_.end()), buffer(buffer_) {} - String getName() const { return "TrivialBuffer"; } + String getName() const { return "TrivialBuffer"; } - String getID() const - { - std::stringstream res; - res << "TrivialBuffer(" << &buffer; + String getID() const + { + std::stringstream res; + res << "TrivialBuffer(" << &buffer; - for (const auto & name : column_names) - res << ", " << name; + for (const auto & name : column_names) + res << ", " << name; - res << ")"; - return res.str(); - } + res << ")"; + return res.str(); + } protected: - Block readImpl() - { - Block res; + Block readImpl() + { + Block res; - if (has_been_read) - return res; - has_been_read = true; + if (has_been_read) + return res; + has_been_read = true; - std::lock_guard lock(buffer.mutex); + std::lock_guard lock(buffer.mutex); - for (auto & block : buffer.data) - { - if (!block.rows()) - continue; + for (auto & block : buffer.data) + { + if (!block.rows()) + continue; - for (const auto & name : column_names) - { - auto & col = block.getByName(name); - res.insert(ColumnWithTypeAndName(col.column->clone(), col.type, name)); - } - } + for (const auto & name : column_names) + { + auto & col = block.getByName(name); + res.insert(ColumnWithTypeAndName(col.column->clone(), col.type, name)); + } + } - return res; - } + return res; + } private: - Names column_names; - TrivialBuffer & buffer; - bool has_been_read = false; + Names column_names; + TrivialBuffer & buffer; + bool has_been_read = false; }; BlockInputStreams TrivialBuffer::read( - const Names & column_names, - ASTPtr query, - const Context & context, - const Settings & settings, - QueryProcessingStage::Enum & processed_stage, - size_t max_block_size, - unsigned threads) + const Names & column_names, + ASTPtr query, + const Context & context, + const Settings & settings, + QueryProcessingStage::Enum & processed_stage, + size_t max_block_size, + unsigned threads) { - processed_stage = QueryProcessingStage::FetchColumns; + processed_stage = QueryProcessingStage::FetchColumns; - BlockInputStreams streams; + BlockInputStreams streams; - if (!no_destination) - { - auto destination = context.getTable(destination_database, destination_table); + if (!no_destination) + { + auto destination = context.getTable(destination_database, destination_table); - if (destination.get() == this) - throw Exception("Destination table is myself. Read will cause infinite loop.", - ErrorCodes::INFINITE_LOOP); + if (destination.get() == this) + throw Exception("Destination table is myself. Read will cause infinite loop.", + ErrorCodes::INFINITE_LOOP); - /** Отключаем оптимизацию "перенос в PREWHERE", - * так как Buffer не поддерживает PREWHERE. - */ - Settings modified_settings = settings; - modified_settings.optimize_move_to_prewhere = false; + /** Отключаем оптимизацию "перенос в PREWHERE", + * так как Buffer не поддерживает PREWHERE. + */ + Settings modified_settings = settings; + modified_settings.optimize_move_to_prewhere = false; - streams = destination->read(column_names, query, context, modified_settings, - processed_stage, max_block_size, threads); - } + streams = destination->read(column_names, query, context, modified_settings, + processed_stage, max_block_size, threads); + } - streams.push_back(std::make_shared(column_names, - *this)); + streams.push_back(std::make_shared(column_names, + *this)); - /** Если источники из таблицы были обработаны до какой-то не начальной стадии выполнения запроса, - * то тогда источники из буферов надо тоже обернуть в конвейер обработки до той же стадии. - */ - if (processed_stage > QueryProcessingStage::FetchColumns) - streams.back() = InterpreterSelectQuery(query, context, processed_stage, 0, - streams.back()).execute().in; + /** Если источники из таблицы были обработаны до какой-то не начальной стадии выполнения запроса, + * то тогда источники из буферов надо тоже обернуть в конвейер обработки до той же стадии. + */ + if (processed_stage > QueryProcessingStage::FetchColumns) + streams.back() = InterpreterSelectQuery(query, context, processed_stage, 0, + streams.back()).execute().in; - return streams; + return streams; } void TrivialBuffer::addBlock(const Block & block) { - SipHash hash; - block.updateHash(hash); - HashType block_hash = hash.get64(); - - std::lock_guard lock(mutex); - if (current_hashes->find(block_hash) == current_hashes->end() - && previous_hashes->find(block_hash) == previous_hashes->end()) - { - if (current_hashes->size() >= num_blocks_to_deduplicate / 2) - { - previous_hashes = std::move(current_hashes); - current_hashes = std::make_unique(); - } - current_hashes->insert(block_hash); - current_rows += block.rows(); - current_bytes += block.bytes(); - data.push_back(block); - - CurrentMetrics::add(CurrentMetrics::TrivialBufferRows, current_rows); - CurrentMetrics::add(CurrentMetrics::TrivialBufferBytes, current_bytes); - } - else - { - //NOTE: может быть, если нашли этот блок в previous, - // то надо его хэш перенести в current? - /* - auto it = previous_hashes->find(block_hash); - if (it != previous_hashes->end()) - { - current_hashes->insert(it); - previous_hashes->erase(it); - } - */ - } + SipHash hash; + block.updateHash(hash); + HashType block_hash = hash.get64(); + + std::lock_guard lock(mutex); + if (current_hashes->find(block_hash) == current_hashes->end() + && previous_hashes->find(block_hash) == previous_hashes->end()) + { + if (current_hashes->size() >= num_blocks_to_deduplicate / 2) + { + previous_hashes = std::move(current_hashes); + current_hashes = std::make_unique(); + } + current_hashes->insert(block_hash); + current_rows += block.rows(); + current_bytes += block.bytes(); + data.push_back(block); + + CurrentMetrics::add(CurrentMetrics::TrivialBufferRows, current_rows); + CurrentMetrics::add(CurrentMetrics::TrivialBufferBytes, current_bytes); + } + else + { + //NOTE: может быть, если нашли этот блок в previous, + // то надо его хэш перенести в current? + /* + auto it = previous_hashes->find(block_hash); + if (it != previous_hashes->end()) + { + current_hashes->insert(it); + previous_hashes->erase(it); + } + */ + } } void TrivialBuffer::flush(bool check_thresholds) { - Block block_to_write; - time_t current_time = time(0); - - time_t time_passed = 0; - - if (data.empty()) - return; - - { - std::lock_guard lock(mutex); - - if (first_write_time) - time_passed = current_time - first_write_time; - - if (check_thresholds) - { - if (!checkThresholdsImpl(current_rows, current_bytes, time_passed)) - return; - } - else - { - if (current_rows == 0) - return; - } - - /// Collecting BlockList into single block. - block_to_write = data.front().cloneEmpty(); - block_to_write.checkNumberOfRows(); - for (auto & block : data) - { - block.checkNumberOfRows(); - for (size_t column_no = 0, columns = block.columns(); column_no < columns; ++column_no) - { - IColumn & col_to = *block_to_write.safeGetByPosition(column_no).column.get(); - const IColumn & col_from = *block.getByName(col_to.getName()).column.get(); - - col_to.insertRangeFrom(col_from, 0, block.rows()); - } - - } - first_write_time = 0; - - ProfileEvents::increment(ProfileEvents::TrivialBufferFlush); - - LOG_TRACE(log, "Flushing buffer with " << block_to_write.rows() << " rows, " << block_to_write.bytes() << " bytes, age " << time_passed << " seconds."); - - if (no_destination) - return; - - /** For simplicity, buffer is locked during write. - * We could unlock buffer temporary, but it would lead to too much difficulties: - * - data, that is written, will not be visible for SELECTs; - * - new data could be appended to buffer, and in case of exception, we must merge it with old data, that has not been written; - * - this could lead to infinite memory growth. - */ - try - { - writeBlockToDestination(block_to_write, context.tryGetTable(destination_database, destination_table)); - data.clear(); - - CurrentMetrics::sub(CurrentMetrics::TrivialBufferRows, block_to_write.rows()); - CurrentMetrics::sub(CurrentMetrics::TrivialBufferBytes, block_to_write.bytes()); - - } - catch (...) - { - ProfileEvents::increment(ProfileEvents::TrivialBufferErrorOnFlush); - - if (!first_write_time) - first_write_time = current_time; - - /// Через некоторое время будет следующая попытка записать. - throw; - } - } + Block block_to_write; + time_t current_time = time(0); + + time_t time_passed = 0; + + if (data.empty()) + return; + + { + std::lock_guard lock(mutex); + + if (first_write_time) + time_passed = current_time - first_write_time; + + if (check_thresholds) + { + if (!checkThresholdsImpl(current_rows, current_bytes, time_passed)) + return; + } + else + { + if (current_rows == 0) + return; + } + + /// Collecting BlockList into single block. + block_to_write = data.front().cloneEmpty(); + block_to_write.checkNumberOfRows(); + for (auto & block : data) + { + block.checkNumberOfRows(); + for (size_t column_no = 0, columns = block.columns(); column_no < columns; ++column_no) + { + IColumn & col_to = *block_to_write.safeGetByPosition(column_no).column.get(); + const IColumn & col_from = *block.getByName(col_to.getName()).column.get(); + + col_to.insertRangeFrom(col_from, 0, block.rows()); + } + + } + first_write_time = 0; + + ProfileEvents::increment(ProfileEvents::TrivialBufferFlush); + + LOG_TRACE(log, "Flushing buffer with " << block_to_write.rows() << " rows, " << block_to_write.bytes() << " bytes, age " << time_passed << " seconds."); + + if (no_destination) + return; + + /** For simplicity, buffer is locked during write. + * We could unlock buffer temporary, but it would lead to too much difficulties: + * - data, that is written, will not be visible for SELECTs; + * - new data could be appended to buffer, and in case of exception, we must merge it with old data, that has not been written; + * - this could lead to infinite memory growth. + */ + try + { + writeBlockToDestination(block_to_write, context.tryGetTable(destination_database, destination_table)); + data.clear(); + + CurrentMetrics::sub(CurrentMetrics::TrivialBufferRows, block_to_write.rows()); + CurrentMetrics::sub(CurrentMetrics::TrivialBufferBytes, block_to_write.bytes()); + + } + catch (...) + { + ProfileEvents::increment(ProfileEvents::TrivialBufferErrorOnFlush); + + if (!first_write_time) + first_write_time = current_time; + + /// Через некоторое время будет следующая попытка записать. + throw; + } + } } class TrivialBufferBlockOutputStream : public IBlockOutputStream { public: - TrivialBufferBlockOutputStream(TrivialBuffer & buffer_) : buffer(buffer_) {} - void write(const Block & block) override - { - if (!block) - return; - - size_t rows = block.rows(); - size_t bytes = block.bytes(); - if (!rows) - return; - - StoragePtr destination; - if (!buffer.no_destination) - { - destination = buffer.context.tryGetTable(buffer.destination_database, - buffer.destination_table); - - if (destination) - { - if (destination.get() == &buffer) - throw Exception("Destination table is myself. Write will " - "cause infinite loop.", ErrorCodes::INFINITE_LOOP); - - /// Проверяем структуру таблицы. - try - { - destination->check(block, true); - } - catch (Exception & e) - { - e.addMessage("(when looking at destination table " - + buffer.destination_database + "." - + buffer.destination_table + ")"); - throw; - } - } - } - - /// Вставляем блок в список блоков. - - time_t current_time = time(0); - if (buffer.checkThresholds(current_time, rows, bytes)) + TrivialBufferBlockOutputStream(TrivialBuffer & buffer_) : buffer(buffer_) {} + void write(const Block & block) override + { + if (!block) + return; + + size_t rows = block.rows(); + size_t bytes = block.bytes(); + if (!rows) + return; + + StoragePtr destination; + if (!buffer.no_destination) + { + destination = buffer.context.tryGetTable(buffer.destination_database, + buffer.destination_table); + + if (destination) + { + if (destination.get() == &buffer) + throw Exception("Destination table is myself. Write will " + "cause infinite loop.", ErrorCodes::INFINITE_LOOP); + + /// Проверяем структуру таблицы. + try + { + destination->check(block, true); + } + catch (Exception & e) + { + e.addMessage("(when looking at destination table " + + buffer.destination_database + "." + + buffer.destination_table + ")"); + throw; + } + } + } + + /// Вставляем блок в список блоков. + + time_t current_time = time(0); + if (buffer.checkThresholds(current_time, rows, bytes)) { /** Если после вставки в буфер, ограничения будут превышены, - * то будем сбрасывать буфер. + * то будем сбрасывать буфер. * Это также защищает от неограниченного потребления оперативки, - * так как в случае невозможности записать в таблицу, + * так как в случае невозможности записать в таблицу, * будет выкинуто исключение, а новые данные не будут добавлены в буфер. */ buffer.flush(true); } - if (!buffer.first_write_time) + if (!buffer.first_write_time) buffer.first_write_time = current_time; - buffer.addBlock(block); - } + buffer.addBlock(block); + } private: - TrivialBuffer & buffer; + TrivialBuffer & buffer; }; BlockOutputStreamPtr TrivialBuffer::write(ASTPtr query, const Settings & settings) { - return std::make_shared(*this); + return std::make_shared(*this); } void TrivialBuffer::shutdown() { - shutdown_event.set(); - - if (flush_thread.joinable()) - flush_thread.join(); - - try - { - ///NOTE: в StorageBuffer здесь используется optimize({}, {}, context.getSettings()). - /// Зачем? - flush(false); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } + shutdown_event.set(); + + if (flush_thread.joinable()) + flush_thread.join(); + + try + { + ///NOTE: в StorageBuffer здесь используется optimize({}, {}, context.getSettings()). + /// Зачем? + flush(false); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } } @@ -395,162 +395,162 @@ void TrivialBuffer::shutdown() */ bool TrivialBuffer::optimize(const String & partition, bool final, const Settings & settings) { - if (!partition.empty()) - throw Exception("Partition cannot be specified when optimizing table of type Buffer", - ErrorCodes::NOT_IMPLEMENTED); + if (!partition.empty()) + throw Exception("Partition cannot be specified when optimizing table of type Buffer", + ErrorCodes::NOT_IMPLEMENTED); - if (final) - throw Exception("FINAL cannot be specified when optimizing table of type Buffer", - ErrorCodes::NOT_IMPLEMENTED); + if (final) + throw Exception("FINAL cannot be specified when optimizing table of type Buffer", + ErrorCodes::NOT_IMPLEMENTED); - flush(false); - return true; + flush(false); + return true; } bool TrivialBuffer::checkThresholds(const time_t current_time, const size_t additional_rows, - const size_t additional_bytes) const + const size_t additional_bytes) const { - time_t time_passed = 0; - if (first_write_time) - time_passed = current_time - first_write_time; + time_t time_passed = 0; + if (first_write_time) + time_passed = current_time - first_write_time; - size_t rows = current_rows + additional_rows; - size_t bytes = current_bytes + additional_bytes; + size_t rows = current_rows + additional_rows; + size_t bytes = current_bytes + additional_bytes; - return checkThresholdsImpl(rows, bytes, time_passed); + return checkThresholdsImpl(rows, bytes, time_passed); } bool TrivialBuffer::checkThresholdsImpl(const size_t rows, const size_t bytes, - const time_t time_passed) const + const time_t time_passed) const { - if (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes) - { - ProfileEvents::increment(ProfileEvents::TrivialBufferPassedAllMinThresholds); - return true; - } - - if (time_passed > max_thresholds.time) - { - ProfileEvents::increment(ProfileEvents::TrivialBufferPassedTimeMaxThreshold); - return true; - } - - if (rows > max_thresholds.rows) - { - ProfileEvents::increment(ProfileEvents::TrivialBufferPassedRowsMaxThreshold); - return true; - } - - if (bytes > max_thresholds.bytes) - { - ProfileEvents::increment(ProfileEvents::TrivialBufferPassedBytesMaxThreshold); - return true; - } - - return false; + if (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes) + { + ProfileEvents::increment(ProfileEvents::TrivialBufferPassedAllMinThresholds); + return true; + } + + if (time_passed > max_thresholds.time) + { + ProfileEvents::increment(ProfileEvents::TrivialBufferPassedTimeMaxThreshold); + return true; + } + + if (rows > max_thresholds.rows) + { + ProfileEvents::increment(ProfileEvents::TrivialBufferPassedRowsMaxThreshold); + return true; + } + + if (bytes > max_thresholds.bytes) + { + ProfileEvents::increment(ProfileEvents::TrivialBufferPassedBytesMaxThreshold); + return true; + } + + return false; } void TrivialBuffer::flushThread() { - setThreadName("BufferFlush"); - - do - { - try - { - flush(true); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } while (!shutdown_event.tryWait(1000)); + setThreadName("BufferFlush"); + + do + { + try + { + flush(true); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } while (!shutdown_event.tryWait(1000)); } void TrivialBuffer::writeBlockToDestination(const Block & block, StoragePtr table) { - if (no_destination || !block) - return; - - if (!table) - { - LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table << " doesn't exist. Block of data is discarded."); - return; - } - - auto insert = std::make_shared(); - - insert->database = destination_database; - insert->table = destination_table; - - /** Будем вставлять столбцы, являющиеся пересечением множества столбцов таблицы-буфера и подчинённой таблицы. - * Это позволит поддержать часть случаев (но не все), когда структура таблицы не совпадает. - */ - Block structure_of_destination_table = table->getSampleBlock(); - Names columns_intersection; - columns_intersection.reserve(block.columns()); - for (size_t i : ext::range(0, structure_of_destination_table.columns())) - { - auto dst_col = structure_of_destination_table.getByPosition(i); - if (block.has(dst_col.name)) - { - if (block.getByName(dst_col.name).type->getName() != dst_col.type->getName()) - { - LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table - << " have different type of column " << dst_col.name << ". Block of data is discarded."); - return; - } - - columns_intersection.push_back(dst_col.name); - } - } - - if (columns_intersection.empty()) - { - LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table << " have no common columns with block in buffer. Block of data is discarded."); - return; - } - - if (columns_intersection.size() != block.columns()) - LOG_WARNING(log, "Not all columns from block in buffer exist in destination table " - << destination_database << "." << destination_table << ". Some columns are discarded."); - - auto list_of_columns = std::make_shared(); - insert->columns = list_of_columns; - list_of_columns->children.reserve(columns_intersection.size()); - for (const String & column : columns_intersection) - list_of_columns->children.push_back(std::make_shared(StringRange(), column, ASTIdentifier::Column)); - - InterpreterInsertQuery interpreter{insert, context}; - - auto block_io = interpreter.execute(); - block_io.out->writePrefix(); - block_io.out->write(block); - block_io.out->writeSuffix(); + if (no_destination || !block) + return; + + if (!table) + { + LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table << " doesn't exist. Block of data is discarded."); + return; + } + + auto insert = std::make_shared(); + + insert->database = destination_database; + insert->table = destination_table; + + /** Будем вставлять столбцы, являющиеся пересечением множества столбцов таблицы-буфера и подчинённой таблицы. + * Это позволит поддержать часть случаев (но не все), когда структура таблицы не совпадает. + */ + Block structure_of_destination_table = table->getSampleBlock(); + Names columns_intersection; + columns_intersection.reserve(block.columns()); + for (size_t i : ext::range(0, structure_of_destination_table.columns())) + { + auto dst_col = structure_of_destination_table.getByPosition(i); + if (block.has(dst_col.name)) + { + if (block.getByName(dst_col.name).type->getName() != dst_col.type->getName()) + { + LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table + << " have different type of column " << dst_col.name << ". Block of data is discarded."); + return; + } + + columns_intersection.push_back(dst_col.name); + } + } + + if (columns_intersection.empty()) + { + LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table << " have no common columns with block in buffer. Block of data is discarded."); + return; + } + + if (columns_intersection.size() != block.columns()) + LOG_WARNING(log, "Not all columns from block in buffer exist in destination table " + << destination_database << "." << destination_table << ". Some columns are discarded."); + + auto list_of_columns = std::make_shared(); + insert->columns = list_of_columns; + list_of_columns->children.reserve(columns_intersection.size()); + for (const String & column : columns_intersection) + list_of_columns->children.push_back(std::make_shared(StringRange(), column, ASTIdentifier::Column)); + + InterpreterInsertQuery interpreter{insert, context}; + + auto block_io = interpreter.execute(); + block_io.out->writePrefix(); + block_io.out->write(block); + block_io.out->writeSuffix(); } void TrivialBuffer::alter(const AlterCommands & params, const String & database_name, - const String & table_name, const Context & context) + const String & table_name, const Context & context) { - for (const auto & param : params) - if (param.type == AlterCommand::MODIFY_PRIMARY_KEY) - throw Exception("Storage engine " + getName() + " doesn't support primary key.", - ErrorCodes::NOT_IMPLEMENTED); + for (const auto & param : params) + if (param.type == AlterCommand::MODIFY_PRIMARY_KEY) + throw Exception("Storage engine " + getName() + " doesn't support primary key.", + ErrorCodes::NOT_IMPLEMENTED); - auto lock = lockStructureForAlter(); + auto lock = lockStructureForAlter(); - /// Чтобы не осталось блоков старой структуры. - ///NOTE: опять здесь optimize({}, {}, context.getSettings()) - flush(false); + /// Чтобы не осталось блоков старой структуры. + ///NOTE: опять здесь optimize({}, {}, context.getSettings()) + flush(false); - params.apply(*columns, materialized_columns, alias_columns, column_defaults); + params.apply(*columns, materialized_columns, alias_columns, column_defaults); - context.getDatabase(database_name)->alterTable( - context, table_name, - *columns, materialized_columns, alias_columns, column_defaults, {}); + context.getDatabase(database_name)->alterTable( + context, table_name, + *columns, materialized_columns, alias_columns, column_defaults, {}); } }