From fd414800d5cab227e8bcc7024f218c7e847ac446 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 3 Jan 2014 08:20:13 +0000 Subject: [PATCH] dbms: added ProfileEvents system [#METR-2944]. --- dbms/include/DB/Common/ProfileEvents.h | 62 +++++++++++++++++++ dbms/include/DB/IO/CompressedReadBuffer.h | 6 ++ dbms/include/DB/IO/ReadBufferFromFile.h | 2 + .../DB/IO/ReadBufferFromFileDescriptor.h | 6 ++ dbms/include/DB/IO/UncompressedCache.h | 3 + dbms/include/DB/IO/WriteBufferFromFile.h | 4 ++ .../include/DB/Storages/StorageSystemEvents.h | 42 +++++++++++++ dbms/src/Common/ProfileEvents.cpp | 7 +++ .../Interpreters/InterpreterInsertQuery.cpp | 2 +- .../Interpreters/InterpreterSelectQuery.cpp | 2 + dbms/src/Interpreters/executeQuery.cpp | 6 ++ dbms/src/Server/Server.cpp | 6 +- dbms/src/Storages/StorageSystemEvents.cpp | 62 +++++++++++++++++++ 13 files changed, 207 insertions(+), 3 deletions(-) create mode 100644 dbms/include/DB/Common/ProfileEvents.h create mode 100644 dbms/include/DB/Storages/StorageSystemEvents.h create mode 100644 dbms/src/Common/ProfileEvents.cpp create mode 100644 dbms/src/Storages/StorageSystemEvents.cpp diff --git a/dbms/include/DB/Common/ProfileEvents.h b/dbms/include/DB/Common/ProfileEvents.h new file mode 100644 index 0000000000..8860f25a37 --- /dev/null +++ b/dbms/include/DB/Common/ProfileEvents.h @@ -0,0 +1,62 @@ +#pragma once + +#include + + +/** Позволяет считать количество различных событий, произошедших в программе + * - для высокоуровневого профайлинга. + */ + +#define APPLY_FOR_EVENTS(M) \ + M(Query, "Queries") \ + M(SelectQuery, "Select queries") \ + M(InsertQuery, "Insert queries") \ + M(FileOpen, "File opens") \ + M(Seek, "Seeks") \ + M(ReadBufferFromFileDescriptorRead, "ReadBufferFromFileDescriptor reads") \ + M(ReadCompressedBytes, "Read compressed bytes") \ + M(CompressedReadBufferBlocks, "Read decompressed blocks") \ + M(CompressedReadBufferBytes, "Read decompressed bytes") \ + M(UncompressedCacheHits, "Uncompressed cache hits") \ + M(UncompressedCacheMisses, "Uncompressed cache misses") \ + \ + M(END, "") + +namespace ProfileEvents +{ + /// Виды событий. + enum Event + { + #define M(NAME, DESCRIPTION) NAME, + APPLY_FOR_EVENTS(M) + #undef M + }; + + + /// Получить текстовое описание события по его enum-у. + inline const char * getDescription(Event event) + { + static const char * descriptions[] = + { + #define M(NAME, DESCRIPTION) DESCRIPTION, + APPLY_FOR_EVENTS(M) + #undef M + }; + + return descriptions[event]; + } + + + /// Счётчики - сколько раз каждое из событий произошло. + extern size_t counters[Event::END]; + + + /// Увеличить счётчик события. Потокобезопасно. + inline void increment(Event event, size_t amount = 1) + { + __sync_fetch_and_add(&counters[event], amount); + } +} + + +#undef APPLY_FOR_EVENTS diff --git a/dbms/include/DB/IO/CompressedReadBuffer.h b/dbms/include/DB/IO/CompressedReadBuffer.h index 83c236e3af..3c9f44e412 100644 --- a/dbms/include/DB/IO/CompressedReadBuffer.h +++ b/dbms/include/DB/IO/CompressedReadBuffer.h @@ -7,6 +7,7 @@ #include #include +#include #include #include #include @@ -50,6 +51,8 @@ private: if (size_compressed > DBMS_MAX_COMPRESSED_SIZE) throw Exception("Too large size_compressed. Most likely corrupted data.", ErrorCodes::TOO_LARGE_SIZE_COMPRESSED); + ProfileEvents::increment(ProfileEvents::ReadCompressedBytes, size_compressed + sizeof(checksum)); + size_decompressed = qlz_size_decompressed(&own_compressed_buffer[0]); /// Находится ли сжатый блок целиком в буфере in? @@ -74,6 +77,9 @@ private: void decompress(char * to, size_t size_decompressed) { + ProfileEvents::increment(ProfileEvents::CompressedReadBufferBlocks); + ProfileEvents::increment(ProfileEvents::CompressedReadBufferBytes, size_decompressed); + /// Старший бит первого байта определяет использованный метод сжатия. if ((compressed_buffer[0] & 0x80) == 0) { diff --git a/dbms/include/DB/IO/ReadBufferFromFile.h b/dbms/include/DB/IO/ReadBufferFromFile.h index aa412815bf..6e8878fd9b 100644 --- a/dbms/include/DB/IO/ReadBufferFromFile.h +++ b/dbms/include/DB/IO/ReadBufferFromFile.h @@ -20,6 +20,8 @@ public: char * existing_memory = NULL, size_t alignment = 0) : ReadBufferFromFileDescriptor(-1, buf_size, existing_memory, alignment), file_name(file_name_) { + ProfileEvents::increment(ProfileEvents::FileOpen); + fd = open(file_name.c_str(), O_RDONLY); if (-1 == fd) diff --git a/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h b/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h index d97a1c9a47..4419ce0041 100644 --- a/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h +++ b/dbms/include/DB/IO/ReadBufferFromFileDescriptor.h @@ -3,6 +3,8 @@ #include #include +#include + #include #include @@ -27,6 +29,8 @@ protected: size_t bytes_read = 0; while (!bytes_read) { + ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorRead); + ssize_t res = ::read(fd, internal_buffer.begin(), internal_buffer.size()); if (!res) break; @@ -84,6 +88,8 @@ public: } else { + ProfileEvents::increment(ProfileEvents::Seek); + pos = working_buffer.end(); off_t res = lseek(fd, new_pos, SEEK_SET); if (-1 == res) diff --git a/dbms/include/DB/IO/UncompressedCache.h b/dbms/include/DB/IO/UncompressedCache.h index b3c8c86b9d..301bc9bc35 100644 --- a/dbms/include/DB/IO/UncompressedCache.h +++ b/dbms/include/DB/IO/UncompressedCache.h @@ -6,6 +6,7 @@ #include #include +#include #include #include @@ -68,11 +69,13 @@ public: if (cell && cell->key == key) { + ProfileEvents::increment(ProfileEvents::UncompressedCacheHits); ++hits; return cell; } else { + ProfileEvents::increment(ProfileEvents::UncompressedCacheMisses); ++misses; return NULL; } diff --git a/dbms/include/DB/IO/WriteBufferFromFile.h b/dbms/include/DB/IO/WriteBufferFromFile.h index 7b2d9ee4d4..d0cf7e409b 100644 --- a/dbms/include/DB/IO/WriteBufferFromFile.h +++ b/dbms/include/DB/IO/WriteBufferFromFile.h @@ -4,6 +4,8 @@ #include #include +#include + #include @@ -22,6 +24,8 @@ public: char * existing_memory = NULL, size_t alignment = 0) : WriteBufferFromFileDescriptor(-1, buf_size, existing_memory, alignment), file_name(file_name_) { + ProfileEvents::increment(ProfileEvents::FileOpen); + fd = open(file_name.c_str(), flags == -1 ? O_WRONLY | O_TRUNC | O_CREAT : flags, mode); if (-1 == fd) diff --git a/dbms/include/DB/Storages/StorageSystemEvents.h b/dbms/include/DB/Storages/StorageSystemEvents.h new file mode 100644 index 0000000000..86f632fd04 --- /dev/null +++ b/dbms/include/DB/Storages/StorageSystemEvents.h @@ -0,0 +1,42 @@ +#pragma once + +#include + +#include +#include + + +namespace DB +{ + +using Poco::SharedPtr; + + +/** Реализует системную таблицу events, которая позволяет получить информацию для профайлинга. + */ +class StorageSystemEvents : public IStorage +{ +public: + static StoragePtr create(const std::string & name_); + + std::string getName() const { return "SystemEvents"; } + std::string getTableName() const { return name; } + + const NamesAndTypesList & getColumnsList() const { return columns; } + + BlockInputStreams read( + const Names & column_names, + ASTPtr query, + const Settings & settings, + QueryProcessingStage::Enum & processed_stage, + size_t max_block_size = DEFAULT_BLOCK_SIZE, + unsigned threads = 1); + +private: + const std::string name; + NamesAndTypesList columns; + + StorageSystemEvents(const std::string & name_); +}; + +} diff --git a/dbms/src/Common/ProfileEvents.cpp b/dbms/src/Common/ProfileEvents.cpp new file mode 100644 index 0000000000..24b025f36a --- /dev/null +++ b/dbms/src/Common/ProfileEvents.cpp @@ -0,0 +1,7 @@ +#include + + +namespace ProfileEvents +{ + size_t counters[Event::END]; /// Глобальная переменная - инициализируется нулями. +} diff --git a/dbms/src/Interpreters/InterpreterInsertQuery.cpp b/dbms/src/Interpreters/InterpreterInsertQuery.cpp index 12b4271273..967db071f0 100644 --- a/dbms/src/Interpreters/InterpreterInsertQuery.cpp +++ b/dbms/src/Interpreters/InterpreterInsertQuery.cpp @@ -19,7 +19,7 @@ namespace DB InterpreterInsertQuery::InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_) : query_ptr(query_ptr_), context(context_) { - + ProfileEvents::increment(ProfileEvents::InsertQuery); } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index d36a1722e1..9555d3e077 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -29,6 +29,8 @@ namespace DB void InterpreterSelectQuery::init(BlockInputStreamPtr input_) { + ProfileEvents::increment(ProfileEvents::SelectQuery); + if (settings.limits.max_subquery_depth && subquery_depth > settings.limits.max_subquery_depth) throw Exception("Too deep subqueries. Maximum: " + toString(settings.limits.max_subquery_depth), ErrorCodes::TOO_DEEP_SUBQUERIES); diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index 7f2c3981c4..4788c81af4 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -1,3 +1,5 @@ +#include + #include #include @@ -25,6 +27,8 @@ void executeQuery( bool internal, QueryProcessingStage::Enum stage) { + ProfileEvents::increment(ProfileEvents::Query); + ParserQuery parser; ASTPtr ast; std::string expected; @@ -112,6 +116,8 @@ BlockIO executeQuery( bool internal, QueryProcessingStage::Enum stage) { + ProfileEvents::increment(ProfileEvents::Query); + ParserQuery parser; ASTPtr ast; std::string expected; diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 93f4427d15..24b50ae107 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include "Server.h" @@ -126,8 +127,9 @@ int Server::main(const std::vector & args) global_context->addTable("system", "one", StorageSystemOne::create("one")); global_context->addTable("system", "numbers", StorageSystemNumbers::create("numbers")); global_context->addTable("system", "tables", StorageSystemTables::create("tables", *global_context)); - global_context->addTable("system", "databases", StorageSystemDatabases::create("databases", *global_context)); - global_context->addTable("system", "processes", StorageSystemProcesses::create("processes", *global_context)); + global_context->addTable("system", "databases", StorageSystemDatabases::create("databases", *global_context)); + global_context->addTable("system", "processes", StorageSystemProcesses::create("processes", *global_context)); + global_context->addTable("system", "events", StorageSystemEvents::create("events")); global_context->setCurrentDatabase(config.getString("default_database", "default")); diff --git a/dbms/src/Storages/StorageSystemEvents.cpp b/dbms/src/Storages/StorageSystemEvents.cpp new file mode 100644 index 0000000000..da6e118034 --- /dev/null +++ b/dbms/src/Storages/StorageSystemEvents.cpp @@ -0,0 +1,62 @@ +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + + +StorageSystemEvents::StorageSystemEvents(const std::string & name_) + : name(name_) +{ + columns.push_back(NameAndTypePair("event", new DataTypeString)); + columns.push_back(NameAndTypePair("value", new DataTypeUInt64)); +} + +StoragePtr StorageSystemEvents::create(const std::string & name_) +{ + return (new StorageSystemEvents(name_))->thisPtr(); +} + + +BlockInputStreams StorageSystemEvents::read( + const Names & column_names, ASTPtr query, const Settings & settings, + QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads) +{ + check(column_names); + processed_stage = QueryProcessingStage::FetchColumns; + + Block block; + + ColumnWithNameAndType col_event; + col_event.name = "event"; + col_event.type = new DataTypeString; + col_event.column = new ColumnString; + block.insert(col_event); + + ColumnWithNameAndType col_value; + col_value.name = "value"; + col_value.type = new DataTypeUInt64; + col_value.column = new ColumnUInt64; + block.insert(col_value); + + for (size_t i = 0; i < ProfileEvents::END; ++i) + { + UInt64 value = ProfileEvents::counters[i]; + + if (0 != value) + { + col_event.column->insert(String(ProfileEvents::getDescription(ProfileEvents::Event(i)))); + col_value.column->insert(value); + } + } + + return BlockInputStreams(1, new OneBlockInputStream(block)); +} + + +} -- GitLab