StorageSet.cpp 6.8 KB
Newer Older
1
#include <Storages/StorageSet.h>
#include <Storages/StorageFactory.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <Common/formatReadable.h>
#include <Common/escapeForFileName.h>
#include <Common/StringUtils/StringUtils.h>
#include <Interpreters/Set.h>
#include <Interpreters/Context.h>
#include <Poco/DirectoryIterator.h>

#include <map>
15 16 17 18 19


namespace DB
{

20 21 22 23 24
/// Error codes thrown from this file. They are only declared here; the definitions live elsewhere in the project.
/// INCORRECT_FILE_NAME is thrown by the StorageSetOrJoinBase constructor,
/// NUMBER_OF_ARGUMENTS_DOESNT_MATCH by the "Set" engine factory.
namespace ErrorCodes
{
    extern const int INCORRECT_FILE_NAME;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}


A
Alexey Milovidov 已提交
32 33 34
class SetOrJoinBlockOutputStream : public IBlockOutputStream
{
public:
35 36
    SetOrJoinBlockOutputStream(StorageSetOrJoinBase & table_,
        const String & backup_path_, const String & backup_tmp_path_, const String & backup_file_name_);
A
Alexey Milovidov 已提交
37

38
    Block getHeader() const override { return table.getSampleBlock(); }
39 40
    void write(const Block & block) override;
    void writeSuffix() override;
A
Alexey Milovidov 已提交
41 42

private:
43 44 45 46 47 48 49
    StorageSetOrJoinBase & table;
    String backup_path;
    String backup_tmp_path;
    String backup_file_name;
    WriteBufferFromFile backup_buf;
    CompressedWriteBuffer compressed_backup_buf;
    NativeBlockOutputStream backup_stream;
A
Alexey Milovidov 已提交
50 51 52
};


53
/// Builds the backup pipeline: file buffer in the temporary directory, a compression layer
/// on top of it, and a Native block stream using the table's sample block as header.
/// Initializers are listed in member declaration order.
SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(
    StorageSetOrJoinBase & table_,
    const String & backup_path_,
    const String & backup_tmp_path_,
    const String & backup_file_name_)
    : table(table_)
    , backup_path(backup_path_)
    , backup_tmp_path(backup_tmp_path_)
    , backup_file_name(backup_file_name_)
    /// Written to the temporary directory first; writeSuffix() renames it into backup_path.
    , backup_buf(backup_tmp_path_ + backup_file_name_)
    , compressed_backup_buf(backup_buf)
    , backup_stream(compressed_backup_buf, 0, table_.getSampleBlock())
{
}
63

64 65
void SetOrJoinBlockOutputStream::write(const Block & block)
{
66 67
    /// Sort columns in the block. This is necessary, since Set and Join count on the same column order in different blocks.
    Block sorted_block = block.sortColumns();
68

69 70
    table.insertBlock(sorted_block);
    backup_stream.write(sorted_block);
71
}
72

73 74
void SetOrJoinBlockOutputStream::writeSuffix()
{
75
    table.finishInsert();
76 77 78
    backup_stream.flush();
    compressed_backup_buf.next();
    backup_buf.next();
79

80
    Poco::File(backup_tmp_path + backup_file_name).renameTo(backup_path + backup_file_name);
81
}
82 83


84
/// Creates the INSERT sink. Every insert gets its own backup file "<n>.bin", where <n> comes from
/// `increment`. The file is written under "<path>tmp/", and the sink later moves it into `path`.
BlockOutputStreamPtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & /*context*/)
{
    const UInt64 backup_number = ++increment;
    const String tmp_directory = path + "tmp/";
    const String file_name = toString(backup_number) + ".bin";

    return std::make_shared<SetOrJoinBlockOutputStream>(*this, path, tmp_directory, file_name);
}


91
/// Common constructor of the Set and Join storages.
/// It records columns and constraints in the in-memory metadata and resolves the absolute data path.
/// @throws Exception (INCORRECT_FILE_NAME) when `relative_path_` is empty.
StorageSetOrJoinBase::StorageSetOrJoinBase(
    const String & relative_path_,
    const StorageID & table_id_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const Context & context_)
    : IStorage(table_id_)
{
    StorageInMemoryMetadata storage_metadata;
    storage_metadata.setColumns(columns_);
    storage_metadata.setConstraints(constraints_);
    setInMemoryMetadata(storage_metadata);

    /// These engines keep backup files on disk, so a data path is mandatory.
    if (relative_path_.empty())
        throw Exception("Join and Set storages require data path", ErrorCodes::INCORRECT_FILE_NAME);

    base_path = context_.getPath();
    path = base_path + relative_path_;
}


StorageSet::StorageSet(
A
Alexander Tokmakov 已提交
114
    const String & relative_path_,
115
    const StorageID & table_id_,
A
Alexey Milovidov 已提交
116
    const ColumnsDescription & columns_,
A
Alexander Tokmakov 已提交
117 118
    const ConstraintsDescription & constraints_,
    const Context & context_)
119
    : StorageSetOrJoinBase{relative_path_, table_id_, columns_, constraints_, context_},
120
    set(std::make_shared<Set>(SizeLimits(), false, true))
121
{
122 123 124 125
    Block header = getSampleBlock();
    header = header.sortColumns();
    set->setHeader(header);

126
    restore();
127 128 129
}


A
Alexey Milovidov 已提交
130
void StorageSet::insertBlock(const Block & block) { set->insertFromBlock(block); }
131
void StorageSet::finishInsert() { set->finishInsert(); }
Z
zhang2014 已提交
132 133
size_t StorageSet::getSize() const { return set->getTotalRowCount(); }

134

135
void StorageSet::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &)
Z
zhang2014 已提交
136
{
137 138 139 140
    Poco::File(path).remove(true);
    Poco::File(path).createDirectories();
    Poco::File(path + "tmp/").createDirectories();

141 142
    Block header = getSampleBlock();
    header = header.sortColumns();
143

Z
zhang2014 已提交
144
    increment = 0;
145
    set = std::make_shared<Set>(SizeLimits(), false, true);
146
    set->setHeader(header);
147
}
148 149


150
void StorageSetOrJoinBase::restore()
151
{
152 153 154 155 156 157 158
    Poco::File tmp_dir(path + "tmp/");
    if (!tmp_dir.exists())
    {
        tmp_dir.createDirectories();
        return;
    }

A
Alexey Milovidov 已提交
159
    static const char * file_suffix = ".bin";
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
    static const auto file_suffix_size = strlen(".bin");

    Poco::DirectoryIterator dir_end;
    for (Poco::DirectoryIterator dir_it(path); dir_end != dir_it; ++dir_it)
    {
        const auto & name = dir_it.name();

        if (dir_it->isFile()
            && endsWith(name, file_suffix)
            && dir_it->getSize() > 0)
        {
            /// Calculate the maximum number of available files with a backup to add the following files with large numbers.
            UInt64 file_num = parse<UInt64>(name.substr(0, name.size() - file_suffix_size));
            if (file_num > increment)
                increment = file_num;

            restoreFromFile(dir_it->path());
        }
    }
179 180 181
}


182
void StorageSetOrJoinBase::restoreFromFile(const String & file_path)
183
{
184 185
    ReadBufferFromFile backup_buf(file_path);
    CompressedReadBuffer compressed_backup_buf(backup_buf);
186
    NativeBlockInputStream backup_stream(compressed_backup_buf, 0);
187 188

    backup_stream.readPrefix();
189

190 191
    while (Block block = backup_stream.read())
        insertBlock(block);
192 193

    finishInsert();
194 195 196
    backup_stream.readSuffix();

    /// TODO Add speed, compressed bytes, data volume in memory, compression ratio ... Generalize all statistics logging in project.
A
Alexey Milovidov 已提交
197
    LOG_INFO(&Poco::Logger::get("StorageSetOrJoinBase"), "Loaded from backup file {}. {} rows, {}. State has {} unique rows.",
198
        file_path, backup_stream.getProfileInfo().rows, ReadableSize(backup_stream.getProfileInfo().bytes), getSize());
199 200 201
}


A
Alexander Tokmakov 已提交
202
void StorageSetOrJoinBase::rename(const String & new_path_to_table_data, const StorageID & new_table_id)
203
{
204
    /// Rename directory with data.
205
    String new_path = base_path + new_path_to_table_data;
206
    Poco::File(path).renameTo(new_path);
207

A
Alexander Tokmakov 已提交
208
    path = new_path;
A
Alexander Tokmakov 已提交
209
    renameInMemory(new_table_id);
210 211 212
}


213 214 215 216 217 218 219 220 221
/// Registers the "Set" table engine. The engine takes no arguments; any given arguments are rejected.
void registerStorageSet(StorageFactory & factory)
{
    factory.registerStorage("Set", [](const StorageFactory::Arguments & args)
    {
        const auto arg_count = args.engine_args.size();
        if (arg_count != 0)
            throw Exception(
                "Engine " + args.engine_name + " doesn't support any arguments (" + toString(arg_count) + " given)",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        return StorageSet::create(args.relative_data_path, args.table_id, args.columns, args.constraints, args.context);
    });
}


227
}