StorageSet.cpp 8.0 KB
Newer Older
1
#include <Storages/StorageSet.h>
2
#include <Storages/StorageFactory.h>
3
#include <IO/ReadBufferFromFile.h>
P
proller 已提交
4
#include <Compression/CompressedReadBuffer.h>
5
#include <IO/WriteBufferFromFile.h>
P
proller 已提交
6
#include <Compression/CompressedWriteBuffer.h>
7 8
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
A
Alexey Milovidov 已提交
9
#include <Common/formatReadable.h>
10
#include <Common/escapeForFileName.h>
11
#include <Common/StringUtils/StringUtils.h>
12
#include <Interpreters/Set.h>
13
#include <Interpreters/Context.h>
14
#include <Poco/DirectoryIterator.h>
V
Vxider 已提交
15 16
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
17 18 19 20 21


namespace DB
{

22 23 24 25 26
/// Error codes thrown from this translation unit. They are only declared here (extern);
/// their definitions live elsewhere in the project.
namespace ErrorCodes
{
    extern const int INCORRECT_FILE_NAME;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}


A
Alexey Milovidov 已提交
34 35 36
class SetOrJoinBlockOutputStream : public IBlockOutputStream
{
public:
37 38 39
    SetOrJoinBlockOutputStream(
        StorageSetOrJoinBase & table_, const StorageMetadataPtr & metadata_snapshot_,
        const String & backup_path_, const String & backup_tmp_path_,
V
Vxider 已提交
40
        const String & backup_file_name_, bool persistent_);
A
Alexey Milovidov 已提交
41

42
    Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
43 44
    void write(const Block & block) override;
    void writeSuffix() override;
A
Alexey Milovidov 已提交
45 46

private:
47
    StorageSetOrJoinBase & table;
48
    StorageMetadataPtr metadata_snapshot;
49 50 51 52 53 54
    String backup_path;
    String backup_tmp_path;
    String backup_file_name;
    WriteBufferFromFile backup_buf;
    CompressedWriteBuffer compressed_backup_buf;
    NativeBlockOutputStream backup_stream;
V
Vxider 已提交
55
    bool persistent;
A
Alexey Milovidov 已提交
56 57 58
};


59 60 61 62 63
/// Opens the temporary backup file (backup_tmp_path_ + backup_file_name_) and stacks the
/// compression and Native-format writers on top of it. Note: the file is opened regardless
/// of `persistent_`.
SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(
    StorageSetOrJoinBase & table_,
    const StorageMetadataPtr & metadata_snapshot_,
    const String & backup_path_,
    const String & backup_tmp_path_,
    const String & backup_file_name_,
    bool persistent_)
    : table(table_)
    , metadata_snapshot(metadata_snapshot_)
    , backup_path(backup_path_)
    , backup_tmp_path(backup_tmp_path_)
    , backup_file_name(backup_file_name_)
    , backup_buf(backup_tmp_path_ + backup_file_name_)
    , compressed_backup_buf(backup_buf)
    , backup_stream(compressed_backup_buf, 0, metadata_snapshot_->getSampleBlock())
    , persistent(persistent_)
{
    /// Everything is set up in the member initializer list.
}
77

78 79
void SetOrJoinBlockOutputStream::write(const Block & block)
{
80 81
    /// Sort columns in the block. This is necessary, since Set and Join count on the same column order in different blocks.
    Block sorted_block = block.sortColumns();
82

83
    table.insertBlock(sorted_block);
V
Vxider 已提交
84
    if (persistent)
V
Vxider 已提交
85
        backup_stream.write(sorted_block);
86
}
87

88 89
void SetOrJoinBlockOutputStream::writeSuffix()
{
90
    table.finishInsert();
V
Vxider 已提交
91
    if (persistent)
V
Vxider 已提交
92 93 94 95
    {
        backup_stream.flush();
        compressed_backup_buf.next();
        backup_buf.next();
96

V
Vxider 已提交
97 98
        Poco::File(backup_tmp_path + backup_file_name).renameTo(backup_path + backup_file_name);
    }
99
}
100 101


102
/// Creates the output stream for one INSERT. Each stream gets its own backup file
/// "<n>.bin", where n is the incremented counter. The file is written under "<path>tmp/"
/// and is destined for "<path>".
BlockOutputStreamPtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{
    const String backup_file_name = toString(++increment) + ".bin";
    const String backup_tmp_path = path + "tmp/";

    return std::make_shared<SetOrJoinBlockOutputStream>(
        *this, metadata_snapshot, path, backup_tmp_path, backup_file_name, persistent);
}


109
/// Shared constructor for Set/Join storages. It stores the table metadata (columns and
/// constraints) and resolves the data directory as context_.getPath() + relative_path_.
/// Throws INCORRECT_FILE_NAME when relative_path_ is empty.
StorageSetOrJoinBase::StorageSetOrJoinBase(
    const String & relative_path_,
    const StorageID & table_id_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const Context & context_,
    bool persistent_)
    : IStorage(table_id_)
    , persistent(persistent_)
{
    StorageInMemoryMetadata metadata;
    metadata.setColumns(columns_);
    metadata.setConstraints(constraints_);
    setInMemoryMetadata(metadata);

    if (relative_path_.empty())
        throw Exception("Join and Set storages require data path", ErrorCodes::INCORRECT_FILE_NAME);

    base_path = context_.getPath();
    path = base_path + relative_path_;
}


StorageSet::StorageSet(
A
Alexander Tokmakov 已提交
134
    const String & relative_path_,
135
    const StorageID & table_id_,
A
Alexey Milovidov 已提交
136
    const ColumnsDescription & columns_,
A
Alexander Tokmakov 已提交
137
    const ConstraintsDescription & constraints_,
V
Vxider 已提交
138
    const Context & context_,
V
Vxider 已提交
139 140
    bool persistent_)
    : StorageSetOrJoinBase{relative_path_, table_id_, columns_, constraints_, context_, persistent_},
141
    set(std::make_shared<Set>(SizeLimits(), false, true))
142
{
143 144

    Block header = getInMemoryMetadataPtr()->getSampleBlock();
145 146 147
    header = header.sortColumns();
    set->setHeader(header);

148
    restore();
149 150 151
}


A
Alexey Milovidov 已提交
152
void StorageSet::insertBlock(const Block & block) { set->insertFromBlock(block); }
153
void StorageSet::finishInsert() { set->finishInsert(); }
Z
zhang2014 已提交
154

155
size_t StorageSet::getSize() const { return set->getTotalRowCount(); }
156 157
std::optional<UInt64> StorageSet::totalRows(const Settings &) const { return set->getTotalRowCount(); }
std::optional<UInt64> StorageSet::totalBytes(const Settings &) const { return set->getTotalByteCount(); }
158

A
alesapin 已提交
159
void StorageSet::truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &)
Z
zhang2014 已提交
160
{
161 162 163 164
    Poco::File(path).remove(true);
    Poco::File(path).createDirectories();
    Poco::File(path + "tmp/").createDirectories();

165
    Block header = metadata_snapshot->getSampleBlock();
166
    header = header.sortColumns();
167

Z
zhang2014 已提交
168
    increment = 0;
169
    set = std::make_shared<Set>(SizeLimits(), false, true);
170
    set->setHeader(header);
171
}
172 173


174
void StorageSetOrJoinBase::restore()
175
{
176 177 178 179 180 181 182
    Poco::File tmp_dir(path + "tmp/");
    if (!tmp_dir.exists())
    {
        tmp_dir.createDirectories();
        return;
    }

A
Alexey Milovidov 已提交
183
    static const char * file_suffix = ".bin";
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
    static const auto file_suffix_size = strlen(".bin");

    Poco::DirectoryIterator dir_end;
    for (Poco::DirectoryIterator dir_it(path); dir_end != dir_it; ++dir_it)
    {
        const auto & name = dir_it.name();

        if (dir_it->isFile()
            && endsWith(name, file_suffix)
            && dir_it->getSize() > 0)
        {
            /// Calculate the maximum number of available files with a backup to add the following files with large numbers.
            UInt64 file_num = parse<UInt64>(name.substr(0, name.size() - file_suffix_size));
            if (file_num > increment)
                increment = file_num;

            restoreFromFile(dir_it->path());
        }
    }
203 204 205
}


206
void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
207
{
208 209
    ReadBufferFromFile backup_buf(file_path);
    CompressedReadBuffer compressed_backup_buf(backup_buf);
210
    NativeBlockInputStream backup_stream(compressed_backup_buf, 0);
211 212

    backup_stream.readPrefix();
213

214 215
    while (Block block = backup_stream.read())
        insertBlock(block);
216 217

    finishInsert();
218 219 220
    backup_stream.readSuffix();

    /// TODO Add speed, compressed bytes, data volume in memory, compression ratio ... Generalize all statistics logging in project.
A
Alexey Milovidov 已提交
221
    LOG_INFO(&Poco::Logger::get("StorageSetOrJoinBase"), "Loaded from backup file {}. {} rows, {}. State has {} unique rows.",
222
        file_path, backup_stream.getProfileInfo().rows, ReadableSize(backup_stream.getProfileInfo().bytes), getSize());
223 224 225
}


A
Alexander Tokmakov 已提交
226
void StorageSetOrJoinBase::rename(const String & new_path_to_table_data, const StorageID & new_table_id)
227
{
228
    /// Rename directory with data.
229
    String new_path = base_path + new_path_to_table_data;
230
    Poco::File(path).renameTo(new_path);
231

A
Alexander Tokmakov 已提交
232
    path = new_path;
A
Alexander Tokmakov 已提交
233
    renameInMemory(new_table_id);
234 235 236
}


237 238 239 240 241 242 243 244 245
/// Registers the "Set" table engine. The engine accepts no engine arguments. An optional
/// SETTINGS clause is loaded into SetSettings, and its `persistent` value is passed to StorageSet.
void registerStorageSet(StorageFactory & factory)
{
    auto create_set = [](const StorageFactory::Arguments & args)
    {
        if (!args.engine_args.empty())
            throw Exception(
                "Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        auto set_settings = std::make_unique<SetSettings>();
        if (args.storage_def->settings)
            set_settings->loadFromQuery(*args.storage_def);

        return StorageSet::create(
            args.relative_data_path, args.table_id, args.columns, args.constraints, args.context, set_settings->persistent);
    };

    factory.registerStorage("Set", create_set, StorageFactory::StorageFeatures{ .supports_settings = true, });
}


259
}