StorageMemory.cpp 3.7 KB
Newer Older
1
#include <Common/Exception.h>
A
Alexey Milovidov 已提交
2

3
#include <DataStreams/IBlockInputStream.h>
4

5
#include <Storages/StorageMemory.h>
6
#include <Storages/StorageFactory.h>
A
Alexey Milovidov 已提交
7

8 9
#include <IO/WriteHelpers.h>

A
Alexey Milovidov 已提交
10 11 12 13

namespace DB
{

14 15 16 17 18 19
namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}


20
class MemoryBlockInputStream : public IBlockInputStream
A
Alexey Milovidov 已提交
21
{
22
public:
23 24
    MemoryBlockInputStream(const Names & column_names_, BlocksList::iterator begin_, BlocksList::iterator end_, const StorageMemory & storage_)
        : column_names(column_names_), begin(begin_), end(end_), it(begin), storage(storage_) {}
A
Alexey Milovidov 已提交
25

26
    String getName() const override { return "Memory"; }
A
Alexey Milovidov 已提交
27

28
    Block getHeader() const override { return storage.getSampleBlockForColumns(column_names); }
29

30
protected:
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
    Block readImpl() override
    {
        if (it == end)
        {
            return Block();
        }
        else
        {
            Block src = *it;
            Block res;

            /// Add only required columns to `res`.
            for (size_t i = 0, size = column_names.size(); i < size; ++i)
                res.insert(src.getByName(column_names[i]));

            ++it;
            return res;
        }
    }
50
private:
51 52 53 54
    Names column_names;
    BlocksList::iterator begin;
    BlocksList::iterator end;
    BlocksList::iterator it;
55
    const StorageMemory & storage;
56
};
A
Alexey Milovidov 已提交
57 58


59
class MemoryBlockOutputStream : public IBlockOutputStream
A
Alexey Milovidov 已提交
60
{
61
public:
62
    explicit MemoryBlockOutputStream(StorageMemory & storage_) : storage(storage_) {}
63

64 65
    Block getHeader() const override { return storage.getSampleBlock(); }

66 67 68
    void write(const Block & block) override
    {
        storage.check(block, true);
A
Alexey Milovidov 已提交
69
        std::lock_guard lock(storage.mutex);
70 71
        storage.data.push_back(block);
    }
72
private:
73
    StorageMemory & storage;
74
};
A
Alexey Milovidov 已提交
75 76


77 78
StorageMemory::StorageMemory(const StorageID & table_id_, ColumnsDescription columns_description_, ConstraintsDescription constraints_)
    : IStorage(table_id_)
79
{
A
Alexey Milovidov 已提交
80 81
    setColumns(std::move(columns_description_));
    setConstraints(std::move(constraints_));
82 83 84
}


85
BlockInputStreams StorageMemory::read(
86
    const Names & column_names,
A
Alexey Milovidov 已提交
87
    const SelectQueryInfo & /*query_info*/,
88 89
    const Context & /*context*/,
    QueryProcessingStage::Enum /*processed_stage*/,
A
Alexey Milovidov 已提交
90
    size_t /*max_block_size*/,
91
    unsigned num_streams)
A
Alexey Milovidov 已提交
92
{
93
    check(column_names);
94

A
Alexey Milovidov 已提交
95
    std::lock_guard lock(mutex);
96

97
    size_t size = data.size();
98

99 100
    if (num_streams > size)
        num_streams = size;
101

102
    BlockInputStreams res;
103

104
    for (size_t stream = 0; stream < num_streams; ++stream)
105 106 107
    {
        BlocksList::iterator begin = data.begin();
        BlocksList::iterator end = data.begin();
108

109 110
        std::advance(begin, stream * size / num_streams);
        std::advance(end, (stream + 1) * size / num_streams);
111

112
        res.push_back(std::make_shared<MemoryBlockInputStream>(column_names, begin, end, *this));
113
    }
114

115
    return res;
A
Alexey Milovidov 已提交
116 117
}

118

A
Alexey Milovidov 已提交
119
BlockOutputStreamPtr StorageMemory::write(
120
    const ASTPtr & /*query*/, const Context & /*context*/)
A
Alexey Milovidov 已提交
121
{
122
    return std::make_shared<MemoryBlockOutputStream>(*this);
A
Alexey Milovidov 已提交
123 124
}

A
Alexey Milovidov 已提交
125

A
Alexander Tokmakov 已提交
126
void StorageMemory::drop()
A
Alexey Milovidov 已提交
127
{
A
Alexey Milovidov 已提交
128
    std::lock_guard lock(mutex);
129
    data.clear();
A
Alexey Milovidov 已提交
130 131
}

132
void StorageMemory::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
Z
zhang2014 已提交
133
{
A
Alexey Milovidov 已提交
134
    std::lock_guard lock(mutex);
Z
zhang2014 已提交
135 136 137
    data.clear();
}

138 139 140 141 142 143 144 145 146 147

void registerStorageMemory(StorageFactory & factory)
{
    factory.registerStorage("Memory", [](const StorageFactory::Arguments & args)
    {
        if (!args.engine_args.empty())
            throw Exception(
                "Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

148
        return StorageMemory::create(args.table_id, args.columns, args.constraints);
149 150 151
    });
}

A
Alexey Milovidov 已提交
152
}