提交 442ff1d3 编写于 作者: A Alexey Milovidov

StorageBuffer: simplification; added metrics [#METR-23888].

上级 fc27e531
......@@ -29,6 +29,8 @@
M(ZooKeeperWatch) \
M(DelayedInserts) \
M(ContextLockWait) \
M(StorageBufferRows) \
M(StorageBufferBytes) \
namespace CurrentMetrics
......
......@@ -100,6 +100,8 @@
\
M(RegexpCreated) \
M(ContextLock) \
\
M(StorageBufferErrorOnFlush) \
namespace ProfileEvents
{
......
......@@ -8,11 +8,24 @@
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTExpressionList.h>
#include <DB/Common/setThreadName.h>
#include <DB/Common/CurrentMetrics.h>
#include <Poco/Ext/ThreadNumber.h>
#include <ext/range.hpp>
/// Declaration only (extern): the event itself is defined elsewhere.
/// Incremented when writing a buffer's block to the destination table throws during a flush.
namespace ProfileEvents
{
extern const Event StorageBufferErrorOnFlush;
}
/// Declarations only (extern): the metrics themselves are defined elsewhere.
/// They are increased by the rows/bytes of each block appended to a buffer
/// and decreased by the rows/bytes of the block taken out of a buffer for flushing.
namespace CurrentMetrics
{
extern const Metric StorageBufferRows;
extern const Metric StorageBufferBytes;
}
namespace DB
{
......@@ -159,6 +172,11 @@ static void appendBlock(const Block & from, Block & to)
throw Exception("Cannot append to empty block", ErrorCodes::LOGICAL_ERROR);
size_t rows = from.rows();
size_t bytes = from.bytes();
CurrentMetrics::add(CurrentMetrics::StorageBufferRows, rows);
CurrentMetrics::add(CurrentMetrics::StorageBufferBytes, bytes);
for (size_t column_no = 0, columns = to.columns(); column_no < columns; ++column_no)
{
const IColumn & col_from = *from.getByPosition(column_no).column.get();
......@@ -178,7 +196,7 @@ class BufferBlockOutputStream : public IBlockOutputStream
public:
BufferBlockOutputStream(StorageBuffer & storage_) : storage(storage_) {}
void write(const Block & block)
void write(const Block & block) override
{
if (!block)
return;
......@@ -269,7 +287,7 @@ private:
*/
lock.unlock();
storage.flushBuffer(buffer, false);
storage.flushBuffer(buffer, true);
lock.lock();
}
......@@ -365,14 +383,6 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
size_t bytes = 0;
time_t time_passed = 0;
/** Довольно много проблем из-за того, что хотим блокировать буфер лишь на короткое время.
* Под блокировкой, получаем из буфера блок, и заменяем в нём блок на новый пустой.
* Затем пытаемся записать полученный блок в подчинённую таблицу.
* Если этого не получилось - кладём данные обратно в буфер.
* Замечание: может быть, стоит избавиться от такой сложности.
*
* NOTE During flush, data is not seen by SELECTs.
*/
{
std::lock_guard<std::mutex> lock(buffer.mutex);
......@@ -396,39 +406,42 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
buffer.data.swap(block_to_write);
buffer.first_write_time = 0;
}
LOG_TRACE(log, "Flushing buffer with " << rows << " rows, " << bytes << " bytes, age " << time_passed << " seconds.");
CurrentMetrics::sub(CurrentMetrics::StorageBufferRows, block_to_write.rows());
CurrentMetrics::sub(CurrentMetrics::StorageBufferBytes, block_to_write.bytes());
if (no_destination)
return;
LOG_TRACE(log, "Flushing buffer with " << rows << " rows, " << bytes << " bytes, age " << time_passed << " seconds.");
try
{
writeBlockToDestination(block_to_write, context.tryGetTable(destination_database, destination_table));
}
catch (...)
{
/// Возвращаем блок на место в буфер.
std::lock_guard<std::mutex> lock(buffer.mutex);
if (no_destination)
return;
if (buffer.data)
/** For simplicity, buffer is locked during write.
* We could unlock the buffer temporarily, but it would lead to too many difficulties:
* - data that is being written would not be visible to SELECTs;
* - new data could be appended to the buffer, and in case of an exception, we would have to merge it with the old data that has not been written;
* - this could lead to infinite memory growth.
*/
try
{
/** Так как структура таблицы не изменилась, можно склеить два блока.
* Замечание: остаётся проблема - из-за того, что в разных попытках вставляются разные блоки,
* теряется идемпотентность вставки в ReplicatedMergeTree.
*/
appendBlock(buffer.data, block_to_write);
writeBlockToDestination(block_to_write, context.tryGetTable(destination_database, destination_table));
}
catch (...)
{
ProfileEvents::increment(ProfileEvents::StorageBufferErrorOnFlush);
buffer.data.swap(block_to_write);
/// Возвращаем блок на место в буфер.
if (!buffer.first_write_time)
buffer.first_write_time = current_time;
CurrentMetrics::add(CurrentMetrics::StorageBufferRows, block_to_write.rows());
CurrentMetrics::add(CurrentMetrics::StorageBufferBytes, block_to_write.bytes());
/// Через некоторое время будет следующая попытка записать.
throw;
buffer.data.swap(block_to_write);
if (!buffer.first_write_time)
buffer.first_write_time = current_time;
/// Через некоторое время будет следующая попытка записать.
throw;
}
}
}
......
......@@ -76,9 +76,9 @@ public:
{
}
String getName() const { return "Log"; }
String getName() const override { return "Log"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "Log(" << storage.getTableName() << ", " << &storage << ", " << mark_number << ", " << rows_limit;
......@@ -91,7 +91,7 @@ public:
}
protected:
Block readImpl();
Block readImpl() override;
private:
size_t block_size;
......@@ -140,7 +140,7 @@ public:
addStream(column.name, *column.type);
}
~LogBlockOutputStream()
~LogBlockOutputStream() override
{
try
{
......@@ -152,8 +152,8 @@ public:
}
}
void write(const Block & block);
void writeSuffix();
void write(const Block & block) override;
void writeSuffix() override;
private:
StorageLog & storage;
......
......@@ -16,9 +16,9 @@ public:
MemoryBlockInputStream(const Names & column_names_, BlocksList::iterator begin_, BlocksList::iterator end_)
: column_names(column_names_), begin(begin_), end(end_), it(begin) {}
String getName() const { return "Memory"; }
String getName() const override { return "Memory"; }
String getID() const
String getID() const override
{
std::stringstream res;
res << "Memory(" << &*begin << ", " << &*end;
......@@ -31,7 +31,7 @@ public:
}
protected:
Block readImpl()
Block readImpl() override
{
if (it == end)
{
......@@ -63,7 +63,7 @@ class MemoryBlockOutputStream : public IBlockOutputStream
public:
MemoryBlockOutputStream(StorageMemory & storage_) : storage(storage_) {}
void write(const Block & block)
void write(const Block & block) override
{
storage.check(block, true);
std::lock_guard<std::mutex> lock(storage.mutex);
......
......@@ -54,12 +54,12 @@ public:
: block_size(block_size_), column_names(column_names_), column_types(column_names.size()),
storage(storage_), max_read_buffer_size(max_read_buffer_size_) {}
String getName() const { return "TinyLog"; }
String getName() const override { return "TinyLog"; }
String getID() const;
String getID() const override;
protected:
Block readImpl();
Block readImpl() override;
private:
size_t block_size;
Names column_names;
......@@ -98,7 +98,7 @@ public:
addStream(col.name, *col.type);
}
~TinyLogBlockOutputStream()
~TinyLogBlockOutputStream() override
{
try
{
......@@ -110,8 +110,8 @@ public:
}
}
void write(const Block & block);
void writeSuffix();
void write(const Block & block) override;
void writeSuffix() override;
private:
StorageTinyLog & storage;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册