From 10f188708bfd0144e6ba144b5be229c517373459 Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Fri, 14 Apr 2017 02:53:11 +0300 Subject: [PATCH] Update TrivialBuffer.h --- dbms/src/Storages/TrivialBuffer.h | 194 +++++++++++++++--------------- 1 file changed, 97 insertions(+), 97 deletions(-) diff --git a/dbms/src/Storages/TrivialBuffer.h b/dbms/src/Storages/TrivialBuffer.h index e3cb500f15..a8722573f4 100644 --- a/dbms/src/Storages/TrivialBuffer.h +++ b/dbms/src/Storages/TrivialBuffer.h @@ -38,110 +38,110 @@ friend class TrivialBufferBlockInputStream; friend class TrivialBufferBlockOutputStream; public: - /// Пороги. - struct Thresholds - { - time_t time; /// Количество секунд от момента вставки первой строчки в блок. - size_t rows; /// Количество строк в блоке. - size_t bytes; /// Количество (несжатых) байт в блоке. - }; - - static StoragePtr create(const std::string & name_, NamesAndTypesListPtr columns_, - const NamesAndTypesList & materialized_columns_, - const NamesAndTypesList & alias_columns_, - const ColumnDefaults & column_defaults_, - Context & context_, size_t num_blocks_to_deduplicate_, - const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, - const String & destination_database_, const String & destination_table_); - - std::string getName() const override { return "TrivialBuffer"; } - std::string getTableName() const override { return name; } - - const NamesAndTypesList & getColumnsListImpl() const override { return *columns; } - - BlockInputStreams read( - const Names & column_names, - ASTPtr query, - const Context & context, - const Settings & settings, - QueryProcessingStage::Enum & processed_stage, - size_t max_block_size = DEFAULT_BLOCK_SIZE, - unsigned threads = 1) override; - - BlockOutputStreamPtr write(ASTPtr query, const Settings & settings) override; - - bool checkThresholds(const time_t current_time, const size_t additional_rows = 0, - const size_t additional_bytes = 0) const; - bool checkThresholdsImpl(const size_t rows, const size_t bytes, - const time_t time_passed) const; - - /// Сбрасывает все буферы в подчинённую таблицу. - void shutdown() override; - bool optimize(const String & partition, bool final, const Settings & settings) override; - - void rename(const String & new_path_to_db, const String & new_database_name, - const String & new_table_name) override { name = new_table_name; } - - bool supportsSampling() const override { return true; } - bool supportsPrewhere() const override { return true; } - bool supportsFinal() const override { return true; } - bool supportsIndexForIn() const override { return true; } - bool supportsParallelReplicas() const override { return true; } - - /// Структура подчинённой таблицы не проверяется и не изменяется. - void alter(const AlterCommands & params, const String & database_name, - const String & table_name, const Context & context) override; + /// Пороги. + struct Thresholds + { + time_t time; /// Количество секунд от момента вставки первой строчки в блок. + size_t rows; /// Количество строк в блоке. + size_t bytes; /// Количество (несжатых) байт в блоке. + }; + + static StoragePtr create(const std::string & name_, NamesAndTypesListPtr columns_, + const NamesAndTypesList & materialized_columns_, + const NamesAndTypesList & alias_columns_, + const ColumnDefaults & column_defaults_, + Context & context_, size_t num_blocks_to_deduplicate_, + const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, + const String & destination_database_, const String & destination_table_); + + std::string getName() const override { return "TrivialBuffer"; } + std::string getTableName() const override { return name; } + + const NamesAndTypesList & getColumnsListImpl() const override { return *columns; } + + BlockInputStreams read( + const Names & column_names, + ASTPtr query, + const Context & context, + const Settings & settings, + QueryProcessingStage::Enum & processed_stage, + size_t max_block_size = DEFAULT_BLOCK_SIZE, + unsigned threads = 1) override; + + BlockOutputStreamPtr write(ASTPtr query, const Settings & settings) override; + + bool checkThresholds(const time_t current_time, const size_t additional_rows = 0, + const size_t additional_bytes = 0) const; + bool checkThresholdsImpl(const size_t rows, const size_t bytes, + const time_t time_passed) const; + + /// Сбрасывает все буферы в подчинённую таблицу. + void shutdown() override; + bool optimize(const String & partition, bool final, const Settings & settings) override; + + void rename(const String & new_path_to_db, const String & new_database_name, + const String & new_table_name) override { name = new_table_name; } + + bool supportsSampling() const override { return true; } + bool supportsPrewhere() const override { return true; } + bool supportsFinal() const override { return true; } + bool supportsIndexForIn() const override { return true; } + bool supportsParallelReplicas() const override { return true; } + + /// Структура подчинённой таблицы не проверяется и не изменяется. + void alter(const AlterCommands & params, const String & database_name, + const String & table_name, const Context & context) override; private: - String name; - NamesAndTypesListPtr columns; + String name; + NamesAndTypesListPtr columns; - Context & context; + Context & context; - std::mutex mutex; + std::mutex mutex; - BlocksList data; + BlocksList data; - size_t current_rows = 0; + size_t current_rows = 0; size_t current_bytes = 0; - time_t first_write_time = 0; - const size_t num_blocks_to_deduplicate; - using HashType = UInt64; - using DeduplicationBuffer = std::unordered_set; - /// Вставка хэшей новый блоков идет в current_hashes, lookup - в - /// обоих set'ах. Когда current_hashes переполняется, current сбрасывается - /// в previous, а в current создается новый set. - std::unique_ptr current_hashes, previous_hashes; - const Thresholds min_thresholds; - const Thresholds max_thresholds; - - const String destination_database; - const String destination_table; - /// Если задано - не записывать данные из буфера, а просто опустошать буфер. - bool no_destination; - - Poco::Logger * log; - - Poco::Event shutdown_event; - /// Выполняет сброс данных по таймауту. - std::thread flush_thread; - - TrivialBuffer(const std::string & name_, NamesAndTypesListPtr columns_, - const NamesAndTypesList & materialized_columns_, - const NamesAndTypesList & alias_columns_, - const ColumnDefaults & column_defaults_, - Context & context_, size_t num_blocks_to_deduplicate_, - const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, - const String & destination_database_, const String & destination_table_); - - void addBlock(const Block & block); - /// Аргумент table передаётся, так как иногда вычисляется заранее. Он должен - /// соответствовать destination-у. - void writeBlockToDestination(const Block & block, StoragePtr table); - - - void flush(bool check_thresholds = true); - void flushThread(); + time_t first_write_time = 0; + const size_t num_blocks_to_deduplicate; + using HashType = UInt64; + using DeduplicationBuffer = std::unordered_set; + /// Вставка хэшей новый блоков идет в current_hashes, lookup - в + /// обоих set'ах. Когда current_hashes переполняется, current сбрасывается + /// в previous, а в current создается новый set. + std::unique_ptr current_hashes, previous_hashes; + const Thresholds min_thresholds; + const Thresholds max_thresholds; + + const String destination_database; + const String destination_table; + /// Если задано - не записывать данные из буфера, а просто опустошать буфер. + bool no_destination; + + Poco::Logger * log; + + Poco::Event shutdown_event; + /// Выполняет сброс данных по таймауту. + std::thread flush_thread; + + TrivialBuffer(const std::string & name_, NamesAndTypesListPtr columns_, + const NamesAndTypesList & materialized_columns_, + const NamesAndTypesList & alias_columns_, + const ColumnDefaults & column_defaults_, + Context & context_, size_t num_blocks_to_deduplicate_, + const Thresholds & min_thresholds_, const Thresholds & max_thresholds_, + const String & destination_database_, const String & destination_table_); + + void addBlock(const Block & block); + /// Аргумент table передаётся, так как иногда вычисляется заранее. Он должен + /// соответствовать destination-у. + void writeBlockToDestination(const Block & block, StoragePtr table); + + + void flush(bool check_thresholds = true); + void flushThread(); }; } -- GitLab