From dc96e6d1cfda91167712d0b92aad6e97dd0ea640 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 31 Jan 2020 18:26:10 +0300 Subject: [PATCH] Processors support for StorageMemory reading. --- dbms/src/Storages/StorageMemory.cpp | 33 +++++++++++++++-------------- dbms/src/Storages/StorageMemory.h | 4 +++- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/dbms/src/Storages/StorageMemory.cpp b/dbms/src/Storages/StorageMemory.cpp index b2cb5682f6..597657187d 100644 --- a/dbms/src/Storages/StorageMemory.cpp +++ b/dbms/src/Storages/StorageMemory.cpp @@ -6,6 +6,8 @@ #include #include +#include +#include namespace DB @@ -17,34 +19,34 @@ namespace ErrorCodes } -class MemoryBlockInputStream : public IBlockInputStream +class MemorySource : public SourceWithProgress { public: - MemoryBlockInputStream(const Names & column_names_, BlocksList::iterator begin_, BlocksList::iterator end_, const StorageMemory & storage_) - : column_names(column_names_), begin(begin_), end(end_), it(begin), storage(storage_) {} + MemorySource(Names column_names_, BlocksList::iterator begin_, BlocksList::iterator end_, const StorageMemory & storage) + : SourceWithProgress(storage.getSampleBlockForColumns(column_names)) + , column_names(std::move(column_names_)), begin(begin_), end(end_), it(begin) {} String getName() const override { return "Memory"; } - Block getHeader() const override { return storage.getSampleBlockForColumns(column_names); } - protected: - Block readImpl() override + Chunk generate() override { if (it == end) { - return Block(); + return {}; } else { Block src = *it; - Block res; + Columns columns; + columns.reserve(column_names.size()); /// Add only required columns to `res`. - for (size_t i = 0, size = column_names.size(); i < size; ++i) - res.insert(src.getByName(column_names[i])); + for (const auto & name : column_names) + columns.emplace_back(src.getByName(name).column); ++it; - return res; + return Chunk(std::move(columns), src.rows()); } } private: @@ -52,7 +54,6 @@ private: BlocksList::iterator begin; BlocksList::iterator end; BlocksList::iterator it; - const StorageMemory & storage; }; @@ -82,7 +83,7 @@ StorageMemory::StorageMemory(const StorageID & table_id_, ColumnsDescription col } -BlockInputStreams StorageMemory::read( +Pipes StorageMemory::readWithProcessors( const Names & column_names, const SelectQueryInfo & /*query_info*/, const Context & /*context*/, @@ -99,7 +100,7 @@ BlockInputStreams StorageMemory::read( if (num_streams > size) num_streams = size; - BlockInputStreams res; + Pipes pipes; for (size_t stream = 0; stream < num_streams; ++stream) { @@ -109,10 +110,10 @@ BlockInputStreams StorageMemory::read( std::advance(begin, stream * size / num_streams); std::advance(end, (stream + 1) * size / num_streams); - res.push_back(std::make_shared(column_names, begin, end, *this)); + pipes.emplace_back(std::make_shared(column_names, begin, end, *this)); } - return res; + return pipes; } diff --git a/dbms/src/Storages/StorageMemory.h b/dbms/src/Storages/StorageMemory.h index 1e66b17606..d6a83cdc69 100644 --- a/dbms/src/Storages/StorageMemory.h +++ b/dbms/src/Storages/StorageMemory.h @@ -28,7 +28,7 @@ public: size_t getSize() const { return data.size(); } - BlockInputStreams read( + Pipes readWithProcessors( const Names & column_names, const SelectQueryInfo & query_info, const Context & context, @@ -36,6 +36,8 @@ public: size_t max_block_size, unsigned num_streams) override; + bool supportProcessorsPipeline() const override { return true; } + BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override; void drop(TableStructureWriteLockHolder &) override; -- GitLab