StorageMemory.cpp 5.0 KB
Newer Older
1
#include <Common/Exception.h>
A
Alexey Milovidov 已提交
2

3
#include <DataStreams/IBlockInputStream.h>
4

5
#include <Storages/StorageMemory.h>
6
#include <Storages/StorageFactory.h>
A
Alexey Milovidov 已提交
7

8
#include <IO/WriteHelpers.h>
9 10
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
11

A
Alexey Milovidov 已提交
12 13 14 15

namespace DB
{

16 17 18 19 20 21
/// Error codes referenced in this file. They are only declared here (`extern`);
/// the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}


22
class MemorySource : public SourceWithProgress
A
Alexey Milovidov 已提交
23
{
24
public:
25 26 27 28 29 30
    MemorySource(
        Names column_names_,
        BlocksList::iterator begin_,
        BlocksList::iterator end_,
        const StorageMemory & storage,
        const StorageMetadataPtr & metadata_snapshot)
A
alesapin 已提交
31
        : SourceWithProgress(metadata_snapshot->getSampleBlockForColumns(column_names_, storage.getVirtuals(), storage.getStorageID()))
32 33
        , column_names(std::move(column_names_))
        , begin(begin_)
34
        , end(end_) /// [begin, end]
35 36 37
        , it(begin)
    {
    }
A
Alexey Milovidov 已提交
38

39
    String getName() const override { return "Memory"; }
A
Alexey Milovidov 已提交
40

41
protected:
42
    Chunk generate() override
43
    {
N
Nikolai Kochetov 已提交
44
        if (is_finished)
45
        {
46
            return {};
47 48 49 50
        }
        else
        {
            Block src = *it;
51 52
            Columns columns;
            columns.reserve(column_names.size());
53 54

            /// Add only required columns to `res`.
55 56
            for (const auto & name : column_names)
                columns.emplace_back(src.getByName(name).column);
57

58
            if (it == end)
N
Nikolai Kochetov 已提交
59
                is_finished = true;
60 61
            else
                ++it;
62
            return Chunk(std::move(columns), src.rows());
63 64
        }
    }
65
private:
66 67 68 69
    Names column_names;
    BlocksList::iterator begin;
    BlocksList::iterator end;
    BlocksList::iterator it;
N
Nikolai Kochetov 已提交
70
    bool is_finished = false;
71
};
A
Alexey Milovidov 已提交
72 73


74
class MemoryBlockOutputStream : public IBlockOutputStream
A
Alexey Milovidov 已提交
75
{
76
public:
77 78 79 80 81 82
    explicit MemoryBlockOutputStream(
        StorageMemory & storage_,
        const StorageMetadataPtr & metadata_snapshot_)
        : storage(storage_)
        , metadata_snapshot(metadata_snapshot_)
    {}
83

84
    Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
85

86 87
    void write(const Block & block) override
    {
A
alesapin 已提交
88
        metadata_snapshot->check(block, true);
A
Alexey Milovidov 已提交
89
        std::lock_guard lock(storage.mutex);
90 91
        storage.data.push_back(block);
    }
92
private:
93
    StorageMemory & storage;
94
    StorageMetadataPtr metadata_snapshot;
95
};
A
Alexey Milovidov 已提交
96 97


98 99
/// Builds the in-memory metadata (columns and constraints) for the table
/// and installs it on the storage.
StorageMemory::StorageMemory(const StorageID & table_id_, ColumnsDescription columns_description_, ConstraintsDescription constraints_)
    : IStorage(table_id_)
{
    StorageInMemoryMetadata metadata;

    /// Both descriptions were taken by value, so they can be moved into the metadata.
    metadata.setColumns(std::move(columns_description_));
    metadata.setConstraints(std::move(constraints_));

    setInMemoryMetadata(metadata);
}


108
/// Splits the stored blocks into at most `num_streams` contiguous, non-empty ranges
/// and returns one MemorySource pipe per range.
///
/// - `column_names` is validated against `metadata_snapshot` (and the storage's virtual
///   columns) before anything else.
/// - The block list is partitioned while holding `mutex`; stream `i` covers the blocks
///   with indices [i * size / num_streams, (i + 1) * size / num_streams).
/// - If the table has no blocks (or `num_streams` is 0), an empty set of pipes is returned.
Pipes StorageMemory::read(
    const Names & column_names,
    const StorageMetadataPtr & metadata_snapshot,
    const SelectQueryInfo & /*query_info*/,
    const Context & /*context*/,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t /*max_block_size*/,
    unsigned num_streams)
{
    metadata_snapshot->check(column_names, getVirtuals(), getStorageID());

    std::lock_guard lock(mutex);

    size_t size = data.size();

    /// Never create more streams than there are blocks.
    if (num_streams > size)
        num_streams = size;

    Pipes pipes;

    /// Walk the list exactly once: `cursor` always points at the block with index `cursor_offset`,
    /// which is the first block of the next stream. Re-advancing from data.begin() for every
    /// stream would traverse the list repeatedly while the mutex is held.
    BlocksList::iterator cursor = data.begin();
    size_t cursor_offset = 0;

    for (size_t stream = 0; stream < num_streams; ++stream)
    {
        const size_t next_offset = (stream + 1) * size / num_streams;

        /// Empty range: nothing to read for this stream.
        if (next_offset == cursor_offset)
            continue;

        BlocksList::iterator first = cursor;

        /// MemorySource takes an inclusive range, so stop on the last block of this stream.
        std::advance(cursor, next_offset - cursor_offset - 1);
        BlocksList::iterator last = cursor;

        /// Step to the first block of the following stream (or to data.end()).
        ++cursor;
        cursor_offset = next_offset;

        pipes.emplace_back(std::make_shared<MemorySource>(column_names, first, last, *this, metadata_snapshot));
    }

    return pipes;
}

147

148
/// Creates an output stream that appends written blocks to this storage.
/// The query and context arguments are not used.
BlockOutputStreamPtr StorageMemory::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{
    BlockOutputStreamPtr sink = std::make_shared<MemoryBlockOutputStream>(*this, metadata_snapshot);
    return sink;
}

A
Alexey Milovidov 已提交
153

A
Alexander Tokmakov 已提交
154
void StorageMemory::drop()
A
Alexey Milovidov 已提交
155
{
A
Alexey Milovidov 已提交
156
    std::lock_guard lock(mutex);
157
    data.clear();
A
Alexey Milovidov 已提交
158 159
}

A
alesapin 已提交
160
void StorageMemory::truncate(
A
alesapin 已提交
161
    const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &)
Z
zhang2014 已提交
162
{
A
Alexey Milovidov 已提交
163
    std::lock_guard lock(mutex);
Z
zhang2014 已提交
164 165 166
    data.clear();
}

167 168 169 170
std::optional<UInt64> StorageMemory::totalRows() const
{
    UInt64 rows = 0;
    std::lock_guard lock(mutex);
A
Alexey Milovidov 已提交
171
    for (const auto & buffer : data)
172 173 174 175
        rows += buffer.rows();
    return rows;
}

176 177 178 179
std::optional<UInt64> StorageMemory::totalBytes() const
{
    UInt64 bytes = 0;
    std::lock_guard lock(mutex);
A
Alexey Milovidov 已提交
180
    for (const auto & buffer : data)
181
        bytes += buffer.allocatedBytes();
182 183 184
    return bytes;
}

185 186 187 188 189 190 191 192 193 194

/// Registers the "Memory" table engine in the storage factory.
/// The engine accepts no arguments: if any are given, an exception with code
/// NUMBER_OF_ARGUMENTS_DOESNT_MATCH is thrown.
void registerStorageMemory(StorageFactory & factory)
{
    factory.registerStorage("Memory", [](const StorageFactory::Arguments & args)
    {
        const bool has_engine_args = !args.engine_args.empty();
        if (has_engine_args)
        {
            throw Exception(
                "Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
        }

        return StorageMemory::create(args.table_id, args.columns, args.constraints);
    });
}

A
Alexey Milovidov 已提交
199
}