提交 db8d82f1 编写于 作者: A Alexey Milovidov

dbms: Buffer: better [#METR-19249].

上级 e4738347
......@@ -128,7 +128,8 @@ private:
void flushAllBuffers(bool check_thresholds = true);
/// Сбросить буфер. Если выставлено check_thresholds - сбрасывает только если превышены пороги.
void flushBuffer(Buffer & buffer, bool check_thresholds);
bool checkThresholds(Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0);
bool checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const;
bool checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const;
/// Аргумент table передаётся, так как иногда вычисляется заранее. Он должен соответствовать destination-у.
void writeBlockToDestination(const Block & block, StoragePtr table);
......
......@@ -238,6 +238,8 @@ private:
void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer, std::unique_lock<std::mutex> && lock)
{
time_t current_time = time(0);
/// Сортируем столбцы в блоке. Это нужно, чтобы было проще потом конкатенировать блоки.
Block sorted_block = block.sortColumns();
......@@ -245,7 +247,7 @@ private:
{
buffer.data = sorted_block.cloneEmpty();
}
else if (storage.checkThresholds(buffer, time(0), sorted_block.rowsInFirstColumn(), sorted_block.bytes()))
else if (storage.checkThresholds(buffer, current_time, sorted_block.rowsInFirstColumn(), sorted_block.bytes()))
{
/** Если после вставки в буфер, ограничения будут превышены, то будем сбрасывать буфер.
* Это также защищает от неограниченного потребления оперативки, так как в случае невозможности записать в таблицу,
......@@ -258,7 +260,7 @@ private:
}
if (!buffer.first_write_time)
buffer.first_write_time = time(0);
buffer.first_write_time = current_time;
appendBlock(sorted_block, buffer.data);
}
......@@ -297,7 +299,7 @@ bool StorageBuffer::optimize(const Settings & settings)
}
bool StorageBuffer::checkThresholds(Buffer & buffer, time_t current_time, size_t additional_rows, size_t additional_bytes)
bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows, size_t additional_bytes) const
{
time_t time_passed = 0;
if (buffer.first_write_time)
......@@ -306,6 +308,12 @@ bool StorageBuffer::checkThresholds(Buffer & buffer, time_t current_time, size_t
size_t rows = buffer.data.rowsInFirstColumn() + additional_rows;
size_t bytes = buffer.data.bytes() + additional_bytes;
return checkThresholdsImpl(rows, bytes, time_passed);
}
bool StorageBuffer::checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const
{
return
(time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes)
|| (time_passed > max_thresholds.time || rows > max_thresholds.rows || bytes > max_thresholds.bytes);
......@@ -322,7 +330,7 @@ void StorageBuffer::flushAllBuffers(const bool check_thresholds)
void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
{
Block block_to_write = buffer.data.cloneEmpty();
time_t current_time = check_thresholds ? time(0) : 0;
time_t current_time = time(0);
size_t rows = 0;
size_t bytes = 0;
......@@ -344,7 +352,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
if (check_thresholds)
{
if (!checkThresholds(buffer, current_time))
if (!checkThresholdsImpl(rows, bytes, time_passed))
return;
}
else
......
......@@ -5,7 +5,7 @@ clickhouse-client -n --query="
DROP TABLE IF EXISTS test.buffer;
CREATE TABLE test.dst (x UInt64, d Date DEFAULT today()) ENGINE = MergeTree(d, x, 8192);
CREATE TABLE test.buffer (x UInt64, d Date DEFAULT today()) ENGINE = Buffer(test, dst, 16, 1, 100, 10000, 10, 1000, 100000);
CREATE TABLE test.buffer (x UInt64, d Date DEFAULT today()) ENGINE = Buffer(test, dst, 16, 1, 10, 100, 1000, 10000, 100000);
";
seq 1 1000 | sed -r -e 's/^(.+)$/INSERT INTO test.buffer (x) VALUES (\1);/' | clickhouse-client -n &
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册