diff --git a/dbms/include/DB/Interpreters/Join.h b/dbms/include/DB/Interpreters/Join.h index a436bfa31cc2a4d463cd2f582249f9176b075f02..5e3df42b1c1375d10251b3518e554f660b7b1ed8 100644 --- a/dbms/include/DB/Interpreters/Join.h +++ b/dbms/include/DB/Interpreters/Join.h @@ -169,6 +169,13 @@ private: size_t max_bytes; OverflowMode overflow_mode; + /** Защищает работу с состоянием в функциях insertFromBlock и joinBlock. + * Эти функции могут вызываться одновременно из разных потоков только при использовании StorageJoin, + * и StorageJoin вызывает только эти две функции. + * Поэтому остальные функции не защинены. + */ + mutable Poco::RWLock rwlock; + void init(Set::Type type_); template diff --git a/dbms/include/DB/Storages/StorageJoin.h b/dbms/include/DB/Storages/StorageJoin.h new file mode 100644 index 0000000000000000000000000000000000000000..96d72053bd22e111f6c45b1eb8fe7a721414c310 --- /dev/null +++ b/dbms/include/DB/Storages/StorageJoin.h @@ -0,0 +1,70 @@ +#pragma once + +#include +#include + + +namespace DB +{ + +/** Позволяет сохранить состояние для последующего использования в правой части JOIN. + * При вставке в таблицу, данные будут вставлены в состояние, + * а также записаны в файл-бэкап, для восстановления после перезапуска. + * Чтение из таблицы напрямую невозможно - возможно лишь указание в правой части JOIN. + * + * NOTE: В основном, повторяет StorageSet. Можно обобщить. + */ +class StorageJoin : public IStorage +{ +public: + static StoragePtr create( + const String & path_, + const String & name_, + const Names & key_names_, + ASTJoin::Kind kind_, ASTJoin::Strictness strictness_, + NamesAndTypesListPtr columns_, + const NamesAndTypesList & materialized_columns_, + const NamesAndTypesList & alias_columns_, + const ColumnDefaults & column_defaults_) + { + return (new StorageJoin{ + path_, name_, columns_, + materialized_columns_, alias_columns_, column_defaults_})->thisPtr(); + } + + String getName() const override { return "Join"; } + String getTableName() const override { return name; } + + const NamesAndTypesList & getColumnsListImpl() const override { return *columns; } + + BlockOutputStreamPtr write(ASTPtr query) override; + + void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override; + + /// Получить доступ к внутренностям. + JoinPtr & getJoin() { return join; } + +private: + String path; + String name; + NamesAndTypesListPtr columns; + + UInt64 increment = 0; /// Для имён файлов бэкапа. + JoinPtr join; + + StorageJoin( + const String & path_, + const String & name_, + const Names & key_names_, + ASTJoin::Kind kind_, ASTJoin::Strictness strictness_, + NamesAndTypesListPtr columns_, + const NamesAndTypesList & materialized_columns_, + const NamesAndTypesList & alias_columns_, + const ColumnDefaults & column_defaults_); + + /// Восстановление из бэкапа. + void restore(); + void restoreFromFile(const String & file_path, const DataTypeFactory & data_type_factory); +}; + +} diff --git a/dbms/include/DB/Storages/StorageSet.h b/dbms/include/DB/Storages/StorageSet.h index f590d520f5cb62ab801ffbce8352b687d48fd316..94b93e533f9aa835739ddd7f662512682c537b92 100644 --- a/dbms/include/DB/Storages/StorageSet.h +++ b/dbms/include/DB/Storages/StorageSet.h @@ -7,6 +7,31 @@ namespace DB { + +/** Общая часть StorageSet и StorageJoin. + */ +class StorageSetAndJoinBase : public IStorage +{ +protected: + String path; + String name; + NamesAndTypesListPtr columns; + + UInt64 increment = 0; /// Для имён файлов бэкапа. + + String getTableName() const override { return name; } + const NamesAndTypesList & getColumnsListImpl() const override { return *columns; } + + void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override; + + /// Восстановление из бэкапа. + void restore(); + void restoreFromFile(const String & file_path, const DataTypeFactory & data_type_factory); + + virtual void insertBlock(const Block & block) = 0; +}; + + /** Позволяет сохранить множество для последующего использования в правой части оператора IN. * При вставке в таблицу, данные будут вставлены в множество, * а также записаны в файл-бэкап, для восстановления после перезапуска. diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index bd04816e9210350e3e27f9cabea016d3037db1db..ff8575370e8690ab59f4e0a69879f6fa2840d772 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -261,6 +261,8 @@ void Join::insertFromBlockImpl(Maps & maps, size_t rows, const ConstColumnPlainP bool Join::insertFromBlock(const Block & block) { + Poco::ScopedWriteRWLock lock(rwlock); + size_t keys_size = key_names_right.size(); ConstColumnPlainPtrs key_columns(keys_size); @@ -530,6 +532,8 @@ void Join::joinBlockImpl(Block & block, Maps & maps) void Join::joinBlock(Block & block) { + Poco::ScopedReadRWLock lock(rwlock); + if (kind == ASTJoin::Left && strictness == ASTJoin::Any) joinBlockImpl(block, maps_any); else if (kind == ASTJoin::Inner && strictness == ASTJoin::Any) diff --git a/dbms/src/Storages/StorageJoin.cpp b/dbms/src/Storages/StorageJoin.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b1bc141c425e8e091ed6be50b529b4c5ac5b8bd4 --- /dev/null +++ b/dbms/src/Storages/StorageJoin.cpp @@ -0,0 +1,141 @@ +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + + +class JoinBlockOutputStream : public IBlockOutputStream +{ +public: + JoinBlockOutputStream(JoinPtr & join_, const String & backup_path_, const String & backup_tmp_path_, const String & backup_file_name_) + : join(join_), + backup_path(backup_path_), backup_tmp_path(backup_tmp_path_), + backup_file_name(backup_file_name_), + backup_buf(backup_tmp_path + backup_file_name), + compressed_backup_buf(backup_buf), + backup_stream(compressed_backup_buf) + { + } + + void write(const Block & block) override + { + join->insertFromBlock(block); + backup_stream.write(block); + } + + void writeSuffix() override + { + backup_stream.flush(); + compressed_backup_buf.next(); + backup_buf.next(); + + Poco::File(backup_tmp_path + backup_file_name).renameTo(backup_path + backup_file_name); + } + +private: + JoinPtr join; + String backup_path; + String backup_tmp_path; + String backup_file_name; + WriteBufferFromFile backup_buf; + CompressedWriteBuffer compressed_backup_buf; + NativeBlockOutputStream backup_stream; +}; + + +BlockOutputStreamPtr StorageJoin::write(ASTPtr query) +{ + ++increment; + return new JoinBlockOutputStream(join, path, path + "tmp/", toString(increment) + ".bin"); +} + + +StorageJoin::StorageJoin( + const String & path_, + const String & name_, + NamesAndTypesListPtr columns_, + const NamesAndTypesList & materialized_columns_, + const NamesAndTypesList & alias_columns_, + const ColumnDefaults & column_defaults_) + : IStorage{materialized_columns_, alias_columns_, column_defaults_}, + path(path_ + escapeForFileName(name_) + '/'), name(name_), columns(columns_) +{ + restore(); +} + + +void StorageJoin::restore() +{ + Poco::File tmp_dir(path + "tmp/"); + if (!tmp_dir.exists()) + { + tmp_dir.createDirectories(); + return; + } + + constexpr auto file_suffix = ".bin"; + constexpr auto file_suffix_size = strlen(file_suffix); + + DataTypeFactory data_type_factory; + + Poco::DirectoryIterator dir_end; + for (Poco::DirectoryIterator dir_it(path); dir_end != dir_it; ++dir_it) + { + const auto & name = dir_it.name(); + + if (dir_it->isFile() + && name.size() > file_suffix_size + && 0 == name.compare(name.size() - file_suffix_size, file_suffix_size, file_suffix) + && dir_it->getSize() > 0) + { + /// Вычисляем максимальный номер имеющихся файлов с бэкапом, чтобы добавлять следующие файлы с большими номерами. + UInt64 file_num = parse(name.substr(0, name.size() - file_suffix_size)); + if (file_num > increment) + increment = file_num; + + restoreFromFile(dir_it->path(), data_type_factory); + } + } +} + + +void StorageJoin::restoreFromFile(const String & file_path, const DataTypeFactory & data_type_factory) +{ + ReadBufferFromFile backup_buf(file_path); + CompressedReadBuffer compressed_backup_buf(backup_buf); + NativeBlockInputStream backup_stream(compressed_backup_buf, data_type_factory); + + backup_stream.readPrefix(); + while (Block block = backup_stream.read()) + join->insertFromBlock(block); + backup_stream.readSuffix(); + + /// TODO Добавить скорость, сжатые байты, объём данных в памяти, коэффициент сжатия... Обобщить всё логгирование статистики в проекте. + LOG_INFO(&Logger::get("StorageJoin"), std::fixed << std::setprecision(2) + << "Loaded from backup file " << file_path << ". " + << backup_stream.getInfo().rows << " rows, " + << backup_stream.getInfo().bytes / 1048576.0 << " MiB. " + << "Join has " << join->getTotalRowCount() << " unique rows."); +} + + +void StorageJoin::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) +{ + /// Переименовываем директорию с данными. + String new_path = new_path_to_db + escapeForFileName(new_table_name); + Poco::File(path).renameTo(new_path); + + path = new_path + "/"; + name = new_table_name; +} + + +}