提交 32a45d0d 编写于 作者: A Azat Khuzhin

Implement lifetime_rows/lifetime_bytes for Buffer engine

Buffer engine is usually used on INSERTs, but right now there is no way
to track number of INSERTed rows per-table, since only summary metrics
exists:
- StorageBufferRows
- StorageBufferBytes

But it can be pretty useful to track INSERTed rows rate (and it can be
exposed via http_handlers for i.e. prometheus)
上级 433fdffc
......@@ -46,8 +46,8 @@ This table contains the following columns (the column type is shown in brackets)
- If the table stores data on disk, returns used space on disk (i.e. compressed).
- If the table stores data in memory, returns approximated number of used bytes in memory.
- `lifetime_rows` (Nullable(UInt64)) - Total number of rows INSERTed since server start.
- `lifetime_rows` (Nullable(UInt64)) - Total number of rows INSERTed since server start (only for `Buffer` tables).
- `lifetime_bytes` (Nullable(UInt64)) - Total number of bytes INSERTed since server start.
- `lifetime_bytes` (Nullable(UInt64)) - Total number of bytes INSERTed since server start (only for `Buffer` tables).
The `system.tables` table is used in `SHOW TABLES` query implementation.
......@@ -279,7 +279,7 @@ Pipes StorageBuffer::read(
}
static void appendBlock(const Block & from, Block & to)
static void appendBlock(StorageBuffer::LifeTimeWrites &writes, const Block & from, Block & to)
{
if (!to)
throw Exception("Cannot append to empty block", ErrorCodes::LOGICAL_ERROR);
......@@ -295,6 +295,9 @@ static void appendBlock(const Block & from, Block & to)
CurrentMetrics::add(CurrentMetrics::StorageBufferRows, rows);
CurrentMetrics::add(CurrentMetrics::StorageBufferBytes, bytes);
writes.rows += rows;
writes.bytes += bytes;
size_t old_rows = to.rows();
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
......@@ -446,7 +449,7 @@ private:
if (!buffer.first_write_time)
buffer.first_write_time = current_time;
appendBlock(sorted_block, buffer.data);
appendBlock(storage.writes, sorted_block, buffer.data);
}
};
......
#pragma once
#include <mutex>
#include <atomic>
#include <thread>
#include <ext/shared_ptr_helper.h>
#include <Core/NamesAndTypes.h>
......@@ -50,6 +51,12 @@ public:
size_t rows; /// The number of rows in the block.
size_t bytes; /// The number of (uncompressed) bytes in the block.
};
/// Lifetime
struct LifeTimeWrites
{
std::atomic<size_t> rows = 0;
std::atomic<size_t> bytes = 0;
};
std::string getName() const override { return "Buffer"; }
......@@ -94,6 +101,10 @@ public:
std::optional<UInt64> totalRows() const override;
std::optional<UInt64> totalBytes() const override;
std::optional<UInt64> lifetimeRows() const override { return writes.rows; }
std::optional<UInt64> lifetimeBytes() const override { return writes.bytes; }
private:
Context global_context;
......@@ -114,6 +125,8 @@ private:
StorageID destination_id;
bool allow_materialized;
LifeTimeWrites writes;
Poco::Logger * log;
void flushAllBuffers(bool check_thresholds = true);
......
......@@ -33,3 +33,8 @@ Check total_bytes/total_rows for Memory
Check total_bytes/total_rows for Buffer
0 0
256 50
Check lifetime_bytes/lifetime_rows for Buffer
100 50
100 50
200 100
200 100
......@@ -84,6 +84,8 @@ SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tab
DROP TABLE check_system_tables;
SELECT 'Check total_bytes/total_rows for Buffer';
DROP TABLE IF EXISTS check_system_tables;
DROP TABLE IF EXISTS check_system_tables_null;
CREATE TABLE check_system_tables_null (key UInt16) ENGINE = Null();
CREATE TABLE check_system_tables (key UInt16) ENGINE = Buffer(
currentDatabase(),
......@@ -96,5 +98,14 @@ CREATE TABLE check_system_tables (key UInt16) ENGINE = Buffer(
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables';
INSERT INTO check_system_tables SELECT * FROM numbers_mt(50);
SELECT total_bytes, total_rows FROM system.tables WHERE name = 'check_system_tables';
SELECT 'Check lifetime_bytes/lifetime_rows for Buffer';
SELECT lifetime_bytes, lifetime_rows FROM system.tables WHERE name = 'check_system_tables';
OPTIMIZE TABLE check_system_tables; -- flush
SELECT lifetime_bytes, lifetime_rows FROM system.tables WHERE name = 'check_system_tables';
INSERT INTO check_system_tables SELECT * FROM numbers_mt(50);
SELECT lifetime_bytes, lifetime_rows FROM system.tables WHERE name = 'check_system_tables';
OPTIMIZE TABLE check_system_tables; -- flush
SELECT lifetime_bytes, lifetime_rows FROM system.tables WHERE name = 'check_system_tables';
DROP TABLE check_system_tables;
DROP TABLE check_system_tables_null;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册