提交 2c124ea3 编写于 作者: A Alexey Milovidov

dbms: StorageJoin: development [#METR-2944].

上级 684b2e70
#pragma once
#include <DB/Storages/IStorage.h>
#include <DB/Storages/StorageSet.h>
#include <DB/Interpreters/Join.h>
......@@ -11,10 +11,8 @@ namespace DB
* При вставке в таблицу, данные будут вставлены в состояние,
* а также записаны в файл-бэкап, для восстановления после перезапуска.
* Чтение из таблицы напрямую невозможно - возможно лишь указание в правой части JOIN.
*
* NOTE: В основном, повторяет StorageSet. Можно обобщить.
*/
class StorageJoin : public IStorage
class StorageJoin : public StorageSetOrJoinBase
{
public:
static StoragePtr create(
......@@ -28,28 +26,22 @@ public:
const ColumnDefaults & column_defaults_)
{
return (new StorageJoin{
path_, name_, columns_,
path_, name_,
key_names_, kind_, strictness_,
columns_,
materialized_columns_, alias_columns_, column_defaults_})->thisPtr();
}
String getName() const override { return "Join"; }
String getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
BlockOutputStreamPtr write(ASTPtr query) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
/// Получить доступ к внутренностям.
JoinPtr & getJoin() { return join; }
private:
String path;
String name;
NamesAndTypesListPtr columns;
const Names & key_names;
ASTJoin::Kind kind; /// LEFT | INNER
ASTJoin::Strictness strictness; /// ANY | ALL
UInt64 increment = 0; /// Для имён файлов бэкапа.
JoinPtr join;
StorageJoin(
......@@ -62,9 +54,8 @@ private:
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_);
/// Восстановление из бэкапа.
void restore();
void restoreFromFile(const String & file_path, const DataTypeFactory & data_type_factory);
void insertBlock(const Block & block) override { join->insertFromBlock(block); }
size_t getSize() const override { return join->getTotalRowCount(); };
};
}
#pragma once
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/Storages/IStorage.h>
#include <DB/Interpreters/Set.h>
......@@ -10,25 +14,62 @@ namespace DB
/** Общая часть StorageSet и StorageJoin.
*/
class StorageSetAndJoinBase : public IStorage
class StorageSetOrJoinBase : public IStorage
{
friend class SetOrJoinBlockOutputStream;
public:
String getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
BlockOutputStreamPtr write(ASTPtr query) override;
protected:
StorageSetOrJoinBase(
const String & path_,
const String & name_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_);
String path;
String name;
NamesAndTypesListPtr columns;
UInt64 increment = 0; /// Для имён файлов бэкапа.
String getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
/// Восстановление из бэкапа.
void restore();
private:
void restoreFromFile(const String & file_path, const DataTypeFactory & data_type_factory);
/// Вставить блок в состояние.
virtual void insertBlock(const Block & block) = 0;
virtual size_t getSize() const = 0;
};
class SetOrJoinBlockOutputStream : public IBlockOutputStream
{
public:
SetOrJoinBlockOutputStream(StorageSetOrJoinBase & table_,
const String & backup_path_, const String & backup_tmp_path_, const String & backup_file_name_);
void write(const Block & block) override;
void writeSuffix() override;
private:
StorageSetOrJoinBase & table;
String backup_path;
String backup_tmp_path;
String backup_file_name;
WriteBufferFromFile backup_buf;
CompressedWriteBuffer compressed_backup_buf;
NativeBlockOutputStream backup_stream;
};
......@@ -37,7 +78,7 @@ protected:
* а также записаны в файл-бэкап, для восстановления после перезапуска.
* Чтение из таблицы напрямую невозможно - возможно лишь указание в правой части оператора IN.
*/
class StorageSet : public IStorage
class StorageSet : public StorageSetOrJoinBase
{
public:
static StoragePtr create(
......@@ -54,23 +95,11 @@ public:
}
String getName() const override { return "Set"; }
String getTableName() const override { return name; }
const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
BlockOutputStreamPtr write(ASTPtr query) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override;
/// Получить доступ к внутренностям.
SetPtr & getSet() { return set; }
private:
String path;
String name;
NamesAndTypesListPtr columns;
UInt64 increment = 0; /// Для имён файлов бэкапа.
SetPtr set { new Set{Limits{}} };
StorageSet(
......@@ -81,9 +110,8 @@ private:
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_);
/// Восстановление из бэкапа.
void restore();
void restoreFromFile(const String & file_path, const DataTypeFactory & data_type_factory);
void insertBlock(const Block & block) override { set->insertFromBlock(block); }
size_t getSize() const override { return set->getTotalRowCount(); };
};
}
......@@ -26,6 +26,7 @@
#include <DB/Storages/StorageChunkMerger.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Storages/StorageSet.h>
#include <DB/Storages/StorageJoin.h>
namespace DB
......
#include <DB/Storages/StorageJoin.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/DataStreams/NativeBlockInputStream.h>
#include <DB/Common/escapeForFileName.h>
namespace DB
{
class JoinBlockOutputStream : public IBlockOutputStream
{
public:
JoinBlockOutputStream(JoinPtr & join_, const String & backup_path_, const String & backup_tmp_path_, const String & backup_file_name_)
: join(join_),
backup_path(backup_path_), backup_tmp_path(backup_tmp_path_),
backup_file_name(backup_file_name_),
backup_buf(backup_tmp_path + backup_file_name),
compressed_backup_buf(backup_buf),
backup_stream(compressed_backup_buf)
{
}
void write(const Block & block) override
{
join->insertFromBlock(block);
backup_stream.write(block);
}
void writeSuffix() override
{
backup_stream.flush();
compressed_backup_buf.next();
backup_buf.next();
Poco::File(backup_tmp_path + backup_file_name).renameTo(backup_path + backup_file_name);
}
private:
JoinPtr join;
String backup_path;
String backup_tmp_path;
String backup_file_name;
WriteBufferFromFile backup_buf;
CompressedWriteBuffer compressed_backup_buf;
NativeBlockOutputStream backup_stream;
};
BlockOutputStreamPtr StorageJoin::write(ASTPtr query)
{
++increment;
return new JoinBlockOutputStream(join, path, path + "tmp/", toString(increment) + ".bin");
}
StorageJoin::StorageJoin(
const String & path_,
const String & name_,
const Names & key_names_,
ASTJoin::Kind kind_, ASTJoin::Strictness strictness_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
path(path_ + escapeForFileName(name_) + '/'), name(name_), columns(columns_)
: StorageSetOrJoinBase{path_, name_, columns_, materialized_columns_, alias_columns_, column_defaults_},
key_names(key_names_), kind(kind_), strictness(strictness_)
{
join = new Join(key_names, key_names, Limits(), kind, strictness);
restore();
}
void StorageJoin::restore()
{
Poco::File tmp_dir(path + "tmp/");
if (!tmp_dir.exists())
{
tmp_dir.createDirectories();
return;
}
constexpr auto file_suffix = ".bin";
constexpr auto file_suffix_size = strlen(file_suffix);
DataTypeFactory data_type_factory;
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator dir_it(path); dir_end != dir_it; ++dir_it)
{
const auto & name = dir_it.name();
if (dir_it->isFile()
&& name.size() > file_suffix_size
&& 0 == name.compare(name.size() - file_suffix_size, file_suffix_size, file_suffix)
&& dir_it->getSize() > 0)
{
/// Вычисляем максимальный номер имеющихся файлов с бэкапом, чтобы добавлять следующие файлы с большими номерами.
UInt64 file_num = parse<UInt64>(name.substr(0, name.size() - file_suffix_size));
if (file_num > increment)
increment = file_num;
restoreFromFile(dir_it->path(), data_type_factory);
}
}
}
void StorageJoin::restoreFromFile(const String & file_path, const DataTypeFactory & data_type_factory)
{
ReadBufferFromFile backup_buf(file_path);
CompressedReadBuffer compressed_backup_buf(backup_buf);
NativeBlockInputStream backup_stream(compressed_backup_buf, data_type_factory);
backup_stream.readPrefix();
while (Block block = backup_stream.read())
join->insertFromBlock(block);
backup_stream.readSuffix();
/// TODO Добавить скорость, сжатые байты, объём данных в памяти, коэффициент сжатия... Обобщить всё логгирование статистики в проекте.
LOG_INFO(&Logger::get("StorageJoin"), std::fixed << std::setprecision(2)
<< "Loaded from backup file " << file_path << ". "
<< backup_stream.getInfo().rows << " rows, "
<< backup_stream.getInfo().bytes / 1048576.0 << " MiB. "
<< "Join has " << join->getTotalRowCount() << " unique rows.");
}
void StorageJoin::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
{
/// Переименовываем директорию с данными.
String new_path = new_path_to_db + escapeForFileName(new_table_name);
Poco::File(path).renameTo(new_path);
path = new_path + "/";
name = new_table_name;
}
}
#include <DB/Storages/StorageSet.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/DataStreams/NativeBlockInputStream.h>
#include <DB/Common/escapeForFileName.h>
......@@ -12,53 +9,42 @@ namespace DB
{
class SetBlockOutputStream : public IBlockOutputStream
SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(StorageSetOrJoinBase & table_,
const String & backup_path_, const String & backup_tmp_path_, const String & backup_file_name_)
: table(table_),
backup_path(backup_path_), backup_tmp_path(backup_tmp_path_),
backup_file_name(backup_file_name_),
backup_buf(backup_tmp_path + backup_file_name),
compressed_backup_buf(backup_buf),
backup_stream(compressed_backup_buf)
{
public:
SetBlockOutputStream(SetPtr & set_, const String & backup_path_, const String & backup_tmp_path_, const String & backup_file_name_)
: set(set_),
backup_path(backup_path_), backup_tmp_path(backup_tmp_path_),
backup_file_name(backup_file_name_),
backup_buf(backup_tmp_path + backup_file_name),
compressed_backup_buf(backup_buf),
backup_stream(compressed_backup_buf)
{
}
}
void write(const Block & block) override
{
set->insertFromBlock(block);
backup_stream.write(block);
}
void SetOrJoinBlockOutputStream::write(const Block & block)
{
table.insertBlock(block);
backup_stream.write(block);
}
void writeSuffix() override
{
backup_stream.flush();
compressed_backup_buf.next();
backup_buf.next();
void SetOrJoinBlockOutputStream::writeSuffix()
{
backup_stream.flush();
compressed_backup_buf.next();
backup_buf.next();
Poco::File(backup_tmp_path + backup_file_name).renameTo(backup_path + backup_file_name);
}
Poco::File(backup_tmp_path + backup_file_name).renameTo(backup_path + backup_file_name);
}
private:
SetPtr set;
String backup_path;
String backup_tmp_path;
String backup_file_name;
WriteBufferFromFile backup_buf;
CompressedWriteBuffer compressed_backup_buf;
NativeBlockOutputStream backup_stream;
};
BlockOutputStreamPtr StorageSet::write(ASTPtr query)
BlockOutputStreamPtr StorageSetOrJoinBase::write(ASTPtr query)
{
++increment;
return new SetBlockOutputStream(set, path, path + "tmp/", toString(increment) + ".bin");
return new SetOrJoinBlockOutputStream(*this, path, path + "tmp/", toString(increment) + ".bin");
}
StorageSet::StorageSet(
StorageSetOrJoinBase::StorageSetOrJoinBase(
const String & path_,
const String & name_,
NamesAndTypesListPtr columns_,
......@@ -67,12 +53,25 @@ StorageSet::StorageSet(
const ColumnDefaults & column_defaults_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
path(path_ + escapeForFileName(name_) + '/'), name(name_), columns(columns_)
{
}
StorageSet::StorageSet(
const String & path_,
const String & name_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_)
: StorageSetOrJoinBase{path_, name_, columns_, materialized_columns_, alias_columns_, column_defaults_}
{
restore();
}
void StorageSet::restore()
void StorageSetOrJoinBase::restore()
{
Poco::File tmp_dir(path + "tmp/");
if (!tmp_dir.exists())
......@@ -107,7 +106,7 @@ void StorageSet::restore()
}
void StorageSet::restoreFromFile(const String & file_path, const DataTypeFactory & data_type_factory)
void StorageSetOrJoinBase::restoreFromFile(const String & file_path, const DataTypeFactory & data_type_factory)
{
ReadBufferFromFile backup_buf(file_path);
CompressedReadBuffer compressed_backup_buf(backup_buf);
......@@ -115,19 +114,19 @@ void StorageSet::restoreFromFile(const String & file_path, const DataTypeFactory
backup_stream.readPrefix();
while (Block block = backup_stream.read())
set->insertFromBlock(block);
insertBlock(block);
backup_stream.readSuffix();
/// TODO Добавить скорость, сжатые байты, объём данных в памяти, коэффициент сжатия... Обобщить всё логгирование статистики в проекте.
LOG_INFO(&Logger::get("StorageSet"), std::fixed << std::setprecision(2)
LOG_INFO(&Logger::get("StorageSetOrJoinBase"), std::fixed << std::setprecision(2)
<< "Loaded from backup file " << file_path << ". "
<< backup_stream.getInfo().rows << " rows, "
<< backup_stream.getInfo().bytes / 1048576.0 << " MiB. "
<< "Set has " << set->getTotalRowCount() << " unique rows.");
<< "State has " << getSize() << " unique rows.");
}
void StorageSet::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
void StorageSetOrJoinBase::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
{
/// Переименовываем директорию с данными.
String new_path = new_path_to_db + escapeForFileName(new_table_name);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册