提交 e063ccc1 编写于 作者: A Alexey Milovidov

dbms: StorageBuffer: additions [#METR-13297].

上级 b3c060b8
......@@ -19,10 +19,10 @@ namespace DB
* При чтении, читает как из своих буферов, так и из подчинённой таблицы.
*
* Буфер представляет собой набор из num_shards блоков.
* При записи, выбирается номер блока по остатку от деления ThreadNumber на num_buckets (или один из других),
* При записи, выбирается номер блока по остатку от деления ThreadNumber на num_shards (или один из других),
* и в соответствующий блок добавляются строчки.
* При использовании блока, он блокируется некоторым mutex-ом. Если при записи, соответствующий блок уже занят
* - пробуем заблокировать следующий по кругу блок, и так не более num_buckets раз (далее блокируемся).
* - пробуем заблокировать следующий по кругу блок, и так не более num_shards раз (далее блокируемся).
* Пороги проверяются при вставке, а также, периодически, в фоновом потоке (чтобы реализовать пороги по времени).
* Пороги действуют независимо для каждого shard-а. Каждый shard может быть сброшен независимо от других.
* Если в таблицу вставляется блок, который сам по себе превышает max-пороги, то он записывается сразу в подчинённую таблицу без буферизации.
......@@ -78,8 +78,7 @@ public:
bool supportsFinal() const override { return true; }
bool supportsPrewhere() const override { return true; }
/// в подтаблицах добавлять и удалять столбы нужно вручную
/// структура подтаблиц не проверяется
/// Структура подчинённой таблицы не проверяется и не изменяется.
void alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context) override;
private:
......@@ -119,6 +118,9 @@ private:
void flushBuffer(Buffer & buffer, bool check_thresholds);
bool checkThresholds(Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0);
/// Аргумент table передаётся, так как иногда вычисляется заранее. Он должен соответствовать destination-у.
void writeBlockToDestination(const Block & block, StoragePtr table);
Poco::Event shutdown_event;
void flushThread();
};
......
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/Interpreters/InterpreterInsertQuery.h>
#include <DB/Interpreters/InterpreterAlterQuery.h>
#include <DB/Storages/StorageBuffer.h>
#include <DB/Parsers/ASTInsertQuery.h>
#include <Poco/Ext/ThreadNumber.h>
#include <statdaemons/ext/range.hpp>
namespace DB
{
......@@ -116,6 +120,24 @@ BlockInputStreams StorageBuffer::read(
}
static void appendBlock(const Block & from, Block & to)
{
size_t rows = from.rows();
for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
{
const IColumn & col_from = *from.getByPosition(column_no).column.get();
IColumn & col_to = *to.getByPosition(column_no).column.get();
if (col_from.getName() != col_to.getName())
throw Exception("Cannot append block to another: different type of columns at index " + toString(column_no)
+ ". Block 1: " + from.dumpStructure() + ". Block 2: " + to.dumpStructure(), ErrorCodes::BLOCKS_HAS_DIFFERENT_STRUCTURE);
for (size_t row_no = 0; row_no < rows; ++row_no)
col_to.insertFrom(col_from, row_no);
}
}
class BufferBlockOutputStream : public IBlockOutputStream
{
public:
......@@ -130,13 +152,33 @@ public:
if (!rows)
return;
StoragePtr destination;
if (!storage.no_destination)
{
destination = storage.context.tryGetTable(storage.destination_database, storage.destination_table);
/// Проверяем структуру таблицы.
try
{
destination->check(block, true);
}
catch (Exception & e)
{
e.addMessage("(when looking at destination table " + storage.destination_database + "." + storage.destination_table + ")");
throw;
}
}
size_t bytes = block.bytes();
/// Если блок уже превышает максимальные ограничения, то пишем минуя буфер.
if (rows > storage.max_thresholds.rows || bytes > storage.max_thresholds.bytes)
{
LOG_TRACE(storage.log, "Writing block with " << rows << " rows, " << bytes << " bytes directly.");
writeDirect(block);
if (!storage.no_destination)
{
LOG_TRACE(storage.log, "Writing block with " << rows << " rows, " << bytes << " bytes directly.");
storage.writeBlockToDestination(block, destination);
}
return;
}
......@@ -184,34 +226,16 @@ private:
buffer.first_write_time = 0;
lock.unlock();
appendBlock(block, block_to_write);
writeDirect(block_to_write);
if (!storage.no_destination)
{
appendBlock(block, block_to_write);
storage.writeBlockToDestination(block_to_write,
storage.context.tryGetTable(storage.destination_database, storage.destination_table));
}
}
else
appendBlock(block, buffer.data);
}
void appendBlock(const Block & from, Block & to)
{
size_t rows = from.rows();
for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
{
const IColumn & col_from = *from.getByPosition(column_no).column.get();
IColumn & col_to = *to.unsafeGetByPosition(column_no).column.get();
for (size_t row_no = 0; row_no < rows; ++row_no)
col_to.insertFrom(col_from, row_no);
}
}
void writeDirect(const Block & block)
{
auto table = storage.context.getTable(storage.destination_database, storage.destination_table);
auto dst = table->write(nullptr);
dst->writePrefix();
dst->write(block);
dst->writeSuffix();
}
};
......@@ -263,14 +287,16 @@ bool StorageBuffer::checkThresholds(Buffer & buffer, time_t current_time, size_t
void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
{
StoragePtr table;
Block block_to_write;
if (!no_destination)
table = context.tryGetTable(destination_database, destination_table);
time_t current_time = check_thresholds ? time(0) : 0;
/** Довольно много проблем из-за того, что хотим блокировать буфер лишь на короткое время.
* Под блокировкой, получаем из буфера блок, и заменяем в нём блок на новый пустой.
* Затем пытаемся записать полученный блок в подчинённую таблицу.
* Если этого не получилось - кладём данные обратно в буфер.
* Для этого также делается блокировка структуры таблицы.
* Замечание: может быть, стоит избавиться от такой сложности.
*/
{
std::lock_guard<std::mutex> lock(buffer.mutex);
......@@ -281,21 +307,99 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
buffer.first_write_time = 0;
}
if (!table)
if (no_destination)
return;
try
{
if (!no_destination)
LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table << " doesn't exists.");
writeBlockToDestination(block_to_write, context.tryGetTable(destination_database, destination_table));
}
catch (...)
{
/// Возвращаем блок на место в буфер.
std::lock_guard<std::mutex> lock(buffer.mutex);
if (buffer.data)
{
/** Так как структура таблицы не изменилась, можно склеить два блока.
* Замечание: остаётся проблема - из-за того, что в разных попытках вставляются разные блоки,
* теряется идемпотентность вставки в ReplicatedMergeTree.
*/
appendBlock(block_to_write, buffer.data);
buffer.data.swap(block_to_write);
}
if (!buffer.first_write_time)
buffer.first_write_time = current_time;
/// Через некоторое время будет следующая попытка записать.
throw;
}
}
void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr table)
{
if (no_destination || !block)
return;
if (!table)
{
LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table << " doesn't exist. Block of data is discarded.");
return;
}
ASTInsertQuery * insert = new ASTInsertQuery;
ASTPtr ast_ptr = insert;
insert->database = destination_database;
insert->table = destination_table;
/** Будем вставлять столбцы, являющиеся пересечением множества столбцов таблицы-буфера и подчинённой таблицы.
* Это позволит поддержать часть случаев (но не все), когда структура таблицы не совпадает.
*/
Block structure_of_destination_table = table->getSampleBlock();
Names columns_intersection;
columns_intersection.reserve(block.columns());
for (size_t i : ext::range(0, structure_of_destination_table.columns()))
{
auto dst_col = structure_of_destination_table.unsafeGetByPosition(i);
if (block.has(dst_col.name))
{
if (block.getByName(dst_col.name).type->getName() != dst_col.type->getName())
{
LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table
<< " have different type of column " << dst_col.name << ". Block of data is discarded.");
return;
}
columns_intersection.push_back(dst_col.name);
}
}
if (block_to_write)
if (columns_intersection.empty())
{
auto dst = table->write(nullptr);
dst->writePrefix();
dst->write(block_to_write);
dst->writeSuffix();
LOG_ERROR(log, "Destination table " << destination_database << "." << destination_table << " have no common columns with block in buffer. Block of data is discarded.");
return;
}
if (columns_intersection.size() != block.columns())
LOG_WARNING(log, "Not all columns from block in buffer exist in destination table "
<< destination_database << "." << destination_table << ". Some columns are discarded.");
ASTExpressionList * list_of_columns = new ASTExpressionList;
insert->columns = list_of_columns;
list_of_columns->children.reserve(columns_intersection.size());
for (const String & column : columns_intersection)
list_of_columns->children.push_back(new ASTIdentifier(StringRange(), column, ASTIdentifier::Column));
InterpreterInsertQuery interpreter{ast_ptr, context};
auto block_io = interpreter.execute();
block_io.out->writePrefix();
block_io.out->write(block);
block_io.out->writeSuffix();
}
......@@ -305,8 +409,7 @@ void StorageBuffer::flushThread()
{
try
{
for (auto & buf : buffers)
flushBuffer(buf, true);
optimize();
}
catch (...)
{
......@@ -319,6 +422,10 @@ void StorageBuffer::flushThread()
void StorageBuffer::alter(const AlterCommands & params, const String & database_name, const String & table_name, Context & context)
{
auto lock = lockStructureForAlter();
/// Чтобы не осталось блоков старой структуры.
optimize();
params.apply(*columns);
InterpreterAlterQuery::updateMetadata(database_name, table_name, *columns, context);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册