提交 a8f4ed62 编写于 作者: M Michael Kolupaev

Merge

上级 6a3043e7
......@@ -112,7 +112,7 @@ public:
};
typedef std::map<String, Checksum> FileChecksums;
FileChecksums file_checksums;
FileChecksums files;
/// Checks that the set of columns and their checksums match. Throws an exception if they do not.
void check(const Checksums & rhs) const;
......@@ -120,6 +120,11 @@ public:
/// Serializes and deserializes in human-readable form.
void readText(ReadBuffer & in);
void writeText(WriteBuffer & out) const;
/// Returns true when no per-file checksums are recorded in `files`.
bool empty() const
{
return files.empty();
}
};
/// Binds the part to `storage_`; `size_in_bytes` starts at 0.
DataPart(MergeTreeData & storage_) : storage(storage_), size_in_bytes(0) {}
......
......@@ -2,6 +2,7 @@
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
#include <DB/IO/HashingWriteBuffer.h>
#include <DB/Storages/MergeTree/MergeTreeData.h>
#include <DB/Common/escapeForFileName.h>
......@@ -22,26 +23,41 @@ protected:
typedef std::set<std::string> OffsetColumns;
/** Output buffers for one column: the data file (.bin) and the marks file (.mrk).
  * Callers write through the HashingWriteBuffer members `compressed` and `marks`,
  * which are constructed over `compressed_buf` and `marks_file` respectively;
  * their count()/getHash() values are what addToChecksums() reports.
  */
struct ColumnStream
{
    /// escaped_column_name_ - file-name-safe column name, used as the default key in addToChecksums().
    /// data_path / marks_path - files to create (truncated if they already exist).
    ColumnStream(const String & escaped_column_name_, const String & data_path, const std::string & marks_path) :
        escaped_column_name(escaped_column_name_),
        plain_file(data_path, DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY),
        compressed_buf(plain_file),
        marks_file(marks_path, 4096, O_TRUNC | O_CREAT | O_WRONLY),
        compressed(compressed_buf), marks(marks_file) {}

    /// Members are initialized in declaration order, so every buffer must be declared
    /// after the buffer it is constructed from. Do not reorder.
    String escaped_column_name;
    WriteBufferFromFile plain_file;
    CompressedWriteBuffer compressed_buf;
    WriteBufferFromFile marks_file;
    HashingWriteBuffer compressed;
    HashingWriteBuffer marks;

    /// Pushes pending data of the data stream, the underlying file and the marks stream.
    void finalize()
    {
        compressed.next();
        plain_file.next();
        marks.next();
    }

    /// Calls sync() on both underlying files.
    void sync()
    {
        plain_file.sync();
        marks_file.sync();
    }

    /// Records size and hash of the .bin and .mrk streams into `checksums`.
    /// `name` is the file name without extension; when empty, escaped_column_name is used.
    void addToChecksums(MergeTreeData::DataPart::Checksums & checksums, String name = "")
    {
        if (name.empty())
            name = escaped_column_name;

        /// One map lookup per file instead of one per field.
        auto & bin_checksum = checksums.files[name + ".bin"];
        bin_checksum.size = compressed.count();
        bin_checksum.hash = compressed.getHash();

        auto & mrk_checksum = checksums.files[name + ".mrk"];
        mrk_checksum.size = marks.count();
        mrk_checksum.hash = marks.getHash();
    }
};
......@@ -64,6 +80,7 @@ protected:
+ ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
column_streams[size_name] = new ColumnStream(
escaped_size_name,
path + escaped_size_name + ".bin",
path + escaped_size_name + ".mrk");
......@@ -71,6 +88,7 @@ protected:
}
else
column_streams[name] = new ColumnStream(
escaped_column_name,
path + escaped_column_name + ".bin",
path + escaped_column_name + ".mrk");
}
......@@ -106,7 +124,7 @@ protected:
else
{
limit = storage.index_granularity;
writeIntBinary(stream.plain.count(), stream.marks);
writeIntBinary(stream.plain_file.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}
......@@ -133,7 +151,7 @@ protected:
else
{
limit = storage.index_granularity;
writeIntBinary(stream.plain.count(), stream.marks);
writeIntBinary(stream.plain_file.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}
......@@ -162,7 +180,8 @@ public:
{
Poco::File(part_path).createDirectories();
index_stream = new WriteBufferFromFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
index_file_stream = new WriteBufferFromFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
index_stream = new HashingWriteBuffer(*index_file_stream);
columns_list = storage.getColumnsList();
for (const auto & it : columns_list)
......@@ -207,23 +226,44 @@ public:
size_t written_for_last_mark = (storage.index_granularity - index_offset + rows) % storage.index_granularity;
index_offset = (storage.index_granularity - written_for_last_mark) % storage.index_granularity;
}
/// Not supported by this stream: always throws NOT_IMPLEMENTED.
/// The suffix is written by writeSuffixAndGetChecksums(), which also returns the checksums.
void writeSuffix() override
{
throw Exception("Method writeSuffix is not supported by MergedBlockOutputStream", ErrorCodes::NOT_IMPLEMENTED);
}
void writeSuffix()
MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums()
{
/// Заканчиваем запись.
/// Заканчиваем запись и достаем чексуммы.
MergeTreeData::DataPart::Checksums checksums;
index_stream->next();
index_stream = NULL;
checksums.files["primary.idx"].size = index_stream->count();
checksums.files["primary.idx"].hash = index_stream->getHash();
for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it)
{
it->second->finalize();
it->second->addToChecksums(checksums);
}
index_stream = NULL;
column_streams.clear();
if (marks_count == 0)
{
/// Кусок пустой - все записи удалились.
Poco::File(part_path).remove(true);
checksums.files.clear();
}
else
{
/// Записываем файл с чексуммами.
WriteBufferFromFile out(part_path + "checksums.txt", 1024);
checksums.writeText(out);
}
return checksums;
}
MergeTreeData::DataPart::Index & getIndex()
......@@ -243,7 +283,8 @@ private:
size_t marks_count;
SharedPtr<WriteBufferFromFile> index_stream;
SharedPtr<WriteBufferFromFile> index_file_stream;
SharedPtr<HashingWriteBuffer> index_stream;
MergeTreeData::DataPart::Index index_vec;
};
......@@ -284,19 +325,30 @@ public:
index_offset = (storage.index_granularity - written_for_last_mark) % storage.index_granularity;
}
void writeSuffix()
void writeSuffix() override
{
throw Exception("Method writeSuffix is not supported by MergedColumnOnlyOutputStream", ErrorCodes::NOT_IMPLEMENTED);
}
/** Finalizes every column stream, collects its checksums and renames the files
  * written under `prefix` to their final names inside the part directory.
  * Afterwards the stream set is cleared and `initialized` is reset.
  * Returns the collected checksums, keyed by the escaped column name.
  */
MergeTreeData::DataPart::Checksums writeSuffixAndGetChecksums()
{
    MergeTreeData::DataPart::Checksums result;

    for (auto it = column_streams.begin(); it != column_streams.end(); ++it)
    {
        auto & stream = it->second;

        stream->finalize();
        if (sync)
            stream->sync();

        const std::string escaped = escapeForFileName(it->first);
        stream->addToChecksums(result, escaped);

        /// Files were written under a prefixed name; move them to the final name.
        const std::string prefixed_base = part_path + prefix + escaped;
        const std::string final_base = part_path + escaped;
        Poco::File(prefixed_base + ".bin").renameTo(final_base + ".bin");
        Poco::File(prefixed_base + ".mrk").renameTo(final_base + ".mrk");
    }

    column_streams.clear();
    initialized = false;

    return result;
}
private:
......
......@@ -9,6 +9,7 @@
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/DataStreams/ExpressionBlockInputStream.h>
#include <DB/DataStreams/copyData.h>
......@@ -366,26 +367,15 @@ void MergeTreeData::removeColumnFiles(String column_name)
/** Builds the actions that convert column `in_column_name` to type `out_type`.
  *
  * in_column_name - column to convert; its current type is looked up with getDataTypeByName().
  * out_type       - target type name; the conversion function is looked up as "to" + out_type.
  * out_expression - receives the actions: apply the conversion function, then drop the source column.
  * out_column     - receives the name of the column produced by the conversion function.
  */
void MergeTreeData::createConvertExpression(const String & in_column_name, const String & out_type, ExpressionActionsPtr & out_expression, String & out_column)
{
    Names out_names;
    out_expression = new ExpressionActions(
        NamesAndTypesList(1, NameAndTypePair(in_column_name, getDataTypeByName(in_column_name))), context.getSettingsRef());

    FunctionPtr function = context.getFunctionFactory().get("to" + out_type, context);
    out_expression->add(ExpressionActions::Action::applyFunction(function, Names(1, in_column_name)), out_names);
    out_expression->add(ExpressionActions::Action::removeColumn(in_column_name));

    /// The name of the added result column is taken from out_names, which the add() call above fills in.
    out_column = out_names[0];
}
static DataTypePtr getDataTypeByName(const String & name, const NamesAndTypesList & columns)
......@@ -445,18 +435,29 @@ void MergeTreeData::prepareAlterModify(const ASTAlterQuery::Parameters & params)
ExpressionBlockInputStream in(new MergeTreeBlockInputStream(full_path + part->name + '/',
DEFAULT_MERGE_BLOCK_SIZE, column_name, *this, part, ranges, false, NULL, ""), expr);
MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true);
in.readPrefix();
out.writePrefix();
try
{
while(DB::Block b = in.read())
{
/// оставляем только столбец с результатом
b.erase(0);
out.write(b);
in.readSuffix();
DataPart::Checksums add_checksums = out.writeSuffixAndGetChecksums();
/// Запишем обновленные контрольные суммы во временный файл.
if (!part->checksums.empty())
{
DataPart::Checksums new_checksums = part->checksums;
std::string escaped_name = escapeForFileName(name_type.name);
std::string escaped_out_column = escapeForFileName(out_column);
new_checksums.files[escaped_name + ".bin"] = add_checksums.files[escaped_out_column + ".bin"];
new_checksums.files[escaped_name + ".mrk"] = add_checksums.files[escaped_out_column + ".mrk"];
WriteBufferFromFile checksums_file(full_path + part->name + '/' + escaped_out_column + ".checksums.txt", 1024);
new_checksums.writeText(checksums_file);
}
LOG_TRACE(log, "Write Suffix");
out.writeSuffix();
}
catch (const Exception & e)
{
......@@ -486,40 +487,63 @@ void MergeTreeData::commitAlterModify(const ASTAlterQuery::Parameters & params)
/// переименовываем старые столбцы, добавляя расширение .old
for (DataPartPtr & part : parts)
{
std::string path = full_path + part->name + '/' + escapeForFileName(name_type.name);
std::string part_path = full_path + part->name + '/';
std::string path = part_path + escapeForFileName(name_type.name);
if (Poco::File(path + ".bin").exists())
{
LOG_TRACE(log, "Renaming " << path + ".bin" << " to " << path + ".bin" + ".old");
Poco::File(path + ".bin").renameTo(path + ".bin" + ".old");
LOG_TRACE(log, "Renaming " << path + ".mrk" << " to " << path + ".mrk" + ".old");
Poco::File(path + ".mrk").renameTo(path + ".mrk" + ".old");
if (Poco::File(part_path + "checksums.txt").exists())
{
LOG_TRACE(log, "Renaming " << part_path + "checksums.txt" << " to " << part_path + "checksums.txt" + ".old");
Poco::File(part_path + "checksums.txt").renameTo(part_path + "checksums.txt" + ".old");
}
}
}
/// переименовываем временные столбцы
for (DataPartPtr & part : parts)
{
std::string path = full_path + part->name + '/' + escapeForFileName(out_column);
std::string new_path = full_path + part->name + '/' + escapeForFileName(name_type.name);
std::string part_path = full_path + part->name + '/';
std::string name = escapeForFileName(out_column);
std::string new_name = escapeForFileName(name_type.name);
std::string path = part_path + name;
std::string new_path = part_path + new_name;
if (Poco::File(path + ".bin").exists())
{
LOG_TRACE(log, "Renaming " << path + ".bin" << " to " << new_path + ".bin");
Poco::File(path + ".bin").renameTo(new_path + ".bin");
LOG_TRACE(log, "Renaming " << path + ".mrk" << " to " << new_path + ".mrk");
Poco::File(path + ".mrk").renameTo(new_path + ".mrk");
if (Poco::File(path + ".checksums.txt").exists())
{
LOG_TRACE(log, "Renaming " << path + ".checksums.txt" << " to " << part_path + ".checksums.txt");
Poco::File(path + ".checksums.txt").renameTo(part_path + "checksums.txt");
}
}
}
// удаляем старые столбцы
for (DataPartPtr & part : parts)
{
std::string path = full_path + part->name + '/' + escapeForFileName(name_type.name);
std::string part_path = full_path + part->name + '/';
std::string path = part_path + escapeForFileName(name_type.name);
if (Poco::File(path + ".bin" + ".old").exists())
{
LOG_TRACE(log, "Removing old column " << path + ".bin" + ".old");
Poco::File(path + ".bin" + ".old").remove();
LOG_TRACE(log, "Removing old column " << path + ".mrk" + ".old");
Poco::File(path + ".mrk" + ".old").remove();
if (Poco::File(part_path + "checksums.txt" + ".old").exists())
{
LOG_TRACE(log, "Removing old checksums " << part_path + "checksums.txt" + ".old");
Poco::File(part_path + "checksums.txt" + ".old").remove();
}
}
}
......@@ -691,20 +715,20 @@ MergeTreeData::DataParts MergeTreeData::getDataParts()
void MergeTreeData::DataPart::Checksums::check(const Checksums & rhs) const
{
for (const auto & it : rhs.file_checksums)
for (const auto & it : rhs.files)
{
const String & name = it.first;
if (!file_checksums.count(name))
if (!files.count(name))
throw Exception("Unexpected file " + name + " in data part", ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART);
}
for (const auto & it : file_checksums)
for (const auto & it : files)
{
const String & name = it.first;
auto jt = rhs.file_checksums.find(name);
if (jt == rhs.file_checksums.end())
auto jt = rhs.files.find(name);
if (jt == rhs.files.end())
throw Exception("No file " + name + " in data part", ErrorCodes::NO_FILE_IN_DATA_PART);
const Checksum & expected = it.second;
......@@ -720,7 +744,7 @@ void MergeTreeData::DataPart::Checksums::check(const Checksums & rhs) const
void MergeTreeData::DataPart::Checksums::readText(ReadBuffer & in)
{
file_checksums.clear();
files.clear();
size_t count;
DB::assertString("checksums format version: 1\n", in);
......@@ -741,17 +765,17 @@ void MergeTreeData::DataPart::Checksums::readText(ReadBuffer & in)
DB::readText(sum.hash.second, in);
DB::assertString("\n", in);
file_checksums.insert(std::make_pair(name, sum));
files.insert(std::make_pair(name, sum));
}
}
void MergeTreeData::DataPart::Checksums::writeText(WriteBuffer & out) const
{
DB::writeString("checksums format version: 1\n", out);
DB::writeText(file_checksums.size(), out);
DB::writeText(files.size(), out);
DB::writeString(" files:\n", out);
for (const auto & it : file_checksums)
for (const auto & it : files)
{
DB::writeString(it.first, out);
DB::writeString("\n\tsize: ", out);
......
......@@ -311,7 +311,7 @@ String MergeTreeDataMerger::mergeParts(const MergeTreeData::DataPartsVector & pa
}
merged_stream->readSuffix();
to->writeSuffix();
new_data_part->checksums = to->writeSuffixAndGetChecksums();
new_data_part->index.swap(to->getIndex());
......
......@@ -106,7 +106,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa
out.writePrefix();
out.write(block);
out.writeSuffix();
MergeTreeData::DataPart::Checksums checksums = out.writeSuffixAndGetChecksums();
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data);
new_data_part->left_date = DayNum_t(min_date);
......@@ -120,6 +120,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithDa
new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date);
new_data_part->right_month = date_lut.toFirstDayNumOfMonth(new_data_part->right_date);
new_data_part->index.swap(out.getIndex());
new_data_part->checksums = checksums;
return new_data_part;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册