提交 dc96e6d1 编写于 作者: N Nikolai Kochetov

Processors support for StorageMemory reading.

上级 fc783971
......@@ -6,6 +6,8 @@
#include <Storages/StorageFactory.h>
#include <IO/WriteHelpers.h>
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
namespace DB
......@@ -17,34 +19,34 @@ namespace ErrorCodes
}
class MemoryBlockInputStream : public IBlockInputStream
class MemorySource : public SourceWithProgress
{
public:
MemoryBlockInputStream(const Names & column_names_, BlocksList::iterator begin_, BlocksList::iterator end_, const StorageMemory & storage_)
: column_names(column_names_), begin(begin_), end(end_), it(begin), storage(storage_) {}
MemorySource(Names column_names_, BlocksList::iterator begin_, BlocksList::iterator end_, const StorageMemory & storage)
: SourceWithProgress(storage.getSampleBlockForColumns(column_names))
, column_names(std::move(column_names_)), begin(begin_), end(end_), it(begin) {}
String getName() const override { return "Memory"; }
Block getHeader() const override { return storage.getSampleBlockForColumns(column_names); }
protected:
Block readImpl() override
Chunk generate() override
{
if (it == end)
{
return Block();
return {};
}
else
{
Block src = *it;
Block res;
Columns columns;
columns.reserve(column_names.size());
/// Add only required columns to `res`.
for (size_t i = 0, size = column_names.size(); i < size; ++i)
res.insert(src.getByName(column_names[i]));
for (const auto & name : column_names)
columns.emplace_back(src.getByName(name).column);
++it;
return res;
return Chunk(std::move(columns), src.rows());
}
}
private:
......@@ -52,7 +54,6 @@ private:
BlocksList::iterator begin;
BlocksList::iterator end;
BlocksList::iterator it;
const StorageMemory & storage;
};
......@@ -82,7 +83,7 @@ StorageMemory::StorageMemory(const StorageID & table_id_, ColumnsDescription col
}
BlockInputStreams StorageMemory::read(
Pipes StorageMemory::readWithProcessors(
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
......@@ -99,7 +100,7 @@ BlockInputStreams StorageMemory::read(
if (num_streams > size)
num_streams = size;
BlockInputStreams res;
Pipes pipes;
for (size_t stream = 0; stream < num_streams; ++stream)
{
......@@ -109,10 +110,10 @@ BlockInputStreams StorageMemory::read(
std::advance(begin, stream * size / num_streams);
std::advance(end, (stream + 1) * size / num_streams);
res.push_back(std::make_shared<MemoryBlockInputStream>(column_names, begin, end, *this));
pipes.emplace_back(std::make_shared<MemorySource>(column_names, begin, end, *this));
}
return res;
return pipes;
}
......
......@@ -28,7 +28,7 @@ public:
size_t getSize() const { return data.size(); }
BlockInputStreams read(
Pipes readWithProcessors(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
......@@ -36,6 +36,8 @@ public:
size_t max_block_size,
unsigned num_streams) override;
bool supportProcessorsPipeline() const override { return true; }
BlockOutputStreamPtr write(const ASTPtr & query, const Context & context) override;
void drop(TableStructureWriteLockHolder &) override;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册