diff --git a/dbms/include/DB/DataStreams/BlockIO.h b/dbms/include/DB/DataStreams/BlockIO.h index 6c3c83a63bdb5227ffab69917a1b0d13b914a198..b0e69bbb27d212ca1e215d40129ed176dcefc807 100644 --- a/dbms/include/DB/DataStreams/BlockIO.h +++ b/dbms/include/DB/DataStreams/BlockIO.h @@ -2,12 +2,13 @@ #include #include -#include namespace DB { +class ProcessListEntry; + struct BlockIO { /** process_list_entry должен уничтожаться позже, чем in и out, @@ -15,7 +16,7 @@ struct BlockIO * (MemoryTracker * current_memory_tracker), * которая может использоваться до уничтожения in и out. */ - ProcessList::EntryPtr process_list_entry; + std::shared_ptr process_list_entry; BlockInputStreamPtr in; BlockOutputStreamPtr out; @@ -38,6 +39,8 @@ struct BlockIO return *this; } + + ~BlockIO(); }; } diff --git a/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h b/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h index 0b8fbe16a62d2df4bafab07bdadc173a39bb3a82..95f16118c3c19de74688a2353dfb9edfe4c39c9c 100644 --- a/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h @@ -3,7 +3,6 @@ #include #include -#include #include #include @@ -14,6 +13,7 @@ namespace DB { class QuotaForIntervals; +class ProcessListElement; /** Смотрит за тем, как работает источник блоков. @@ -82,7 +82,7 @@ public: * На основе этой информации будет проверяться квота, и некоторые ограничения. * Также эта информация будет доступна в запросе SHOW PROCESSLIST. */ - void setProcessListElement(ProcessList::Element * elem); + void setProcessListElement(ProcessListElement * elem); /** Установить информацию о приблизительном общем количестве строк, которых нужно прочитать. */ @@ -154,7 +154,7 @@ protected: BlockStreamProfileInfo info; std::atomic is_cancelled{false}; ProgressCallback progress_callback; - ProcessList::Element * process_list_elem = nullptr; + ProcessListElement * process_list_elem = nullptr; bool enabled_extremes = false; diff --git a/dbms/include/DB/Interpreters/ProcessList.h b/dbms/include/DB/Interpreters/ProcessList.h index 40aba52e0698fc35a572740fc4a8be52027f6564..7bdbb4e3e9562bdb1de393efe58aa112d2d74cae 100644 --- a/dbms/include/DB/Interpreters/ProcessList.h +++ b/dbms/include/DB/Interpreters/ProcessList.h @@ -2,7 +2,7 @@ #include #include -#include +#include #include #include #include @@ -13,6 +13,7 @@ #include #include #include +#include namespace DB @@ -36,13 +37,16 @@ struct ProcessListElement MemoryTracker memory_tracker; + QueryPriorities::Handle priority_handle; + bool is_cancelled = false; ProcessListElement(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_, - size_t max_memory_usage) - : query(query_), user(user_), query_id(query_id_), ip_address(ip_address_), memory_tracker(max_memory_usage) + size_t max_memory_usage, QueryPriorities::Handle && priority_handle_) + : query(query_), user(user_), query_id(query_id_), ip_address(ip_address_), memory_tracker(max_memory_usage), + priority_handle(std::move(priority_handle_)) { current_memory_tracker = &memory_tracker; } @@ -55,126 +59,81 @@ struct ProcessListElement bool update(const Progress & value) { progress.incrementPiecewiseAtomically(value); + + if (priority_handle) + priority_handle->waitIfNeed(std::chrono::seconds(1)); /// NOTE Можно сделать настраиваемым таймаут. + return !is_cancelled; } }; +class ProcessList; + + +/// Держит итератор на список, и удаляет элемент из списка в деструкторе. +class ProcessListEntry +{ +private: + using Container = std::list; + + ProcessList & parent; + Container::iterator it; +public: + ProcessListEntry(ProcessList & parent_, Container::iterator it_) + : parent(parent_), it(it_) {} + + ~ProcessListEntry(); + + ProcessListElement * operator->() { return &*it; } + const ProcessListElement * operator->() const { return &*it; } + + ProcessListElement & get() { return *it; } + const ProcessListElement & get() const { return *it; } +}; + + class ProcessList { - friend class Entry; + friend class ProcessListEntry; public: using Element = ProcessListElement; + using Entry = ProcessListEntry; /// list, чтобы итераторы не инвалидировались. NOTE: можно заменить на cyclic buffer, но почти незачем. - typedef std::list Containter; + using Container = std::list; /// Query_id -> Element * - typedef std::unordered_map QueryToElement; + using QueryToElement = std::unordered_map; /// User -> Query_id -> Element * - typedef std::unordered_map UserToQueries; + using UserToQueries = std::unordered_map; private: mutable Poco::FastMutex mutex; mutable Poco::Condition have_space; /// Количество одновременно выполняющихся запросов стало меньше максимального. - Containter cont; + Container cont; size_t cur_size; /// В C++03 std::list::size не O(1). size_t max_size; /// Если 0 - не ограничено. Иначе, если пытаемся добавить больше - кидается исключение. UserToQueries user_to_queries; - - /// Держит итератор на список, и удаляет элемент из списка в деструкторе. - class Entry - { - private: - ProcessList & parent; - Containter::iterator it; - public: - Entry(ProcessList & parent_, Containter::iterator it_) - : parent(parent_), it(it_) {} - - ~Entry() - { - Poco::ScopedLock lock(parent.mutex); - - /// В случае, если запрос отменяется, данные о нем удаляются из мапа в момент отмены. - if (!it->is_cancelled && !it->query_id.empty()) - { - UserToQueries::iterator queries = parent.user_to_queries.find(it->user); - if (queries != parent.user_to_queries.end()) - { - QueryToElement::iterator element = queries->second.find(it->query_id); - if (element != queries->second.end()) - queries->second.erase(element); - } - } - - parent.cont.erase(it); - --parent.cur_size; - parent.have_space.signal(); - } - - Element * operator->() { return &*it; } - const Element * operator->() const { return &*it; } - - Element & get() { return *it; } - const Element & get() const { return *it; } - }; + QueryPriorities priorities; public: ProcessList(size_t max_size_ = 0) : cur_size(0), max_size(max_size_) {} - typedef Poco::SharedPtr EntryPtr; + typedef std::shared_ptr EntryPtr; /** Зарегистрировать выполняющийся запрос. Возвращает refcounted объект, который удаляет запрос из списка при уничтожении. * Если выполняющихся запросов сейчас слишком много - ждать не более указанного времени. * Если времени не хватило - кинуть исключение. */ EntryPtr insert(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_, - size_t max_memory_usage = 0, size_t max_wait_milliseconds = DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS, bool replace_running_query = false) - { - EntryPtr res; - - { - Poco::ScopedLock lock(mutex); - - if (max_size && cur_size >= max_size && (!max_wait_milliseconds || !have_space.tryWait(mutex, max_wait_milliseconds))) - throw Exception("Too much simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MUCH_SIMULTANEOUS_QUERIES); - - if (!query_id_.empty()) - { - UserToQueries::iterator queries = user_to_queries.find(user_); - - if (queries != user_to_queries.end()) - { - QueryToElement::iterator element = queries->second.find(query_id_); - if (element != queries->second.end()) - { - if (!replace_running_query) - throw Exception("Query with id = " + query_id_ + " is already running.", - ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING); - element->second->is_cancelled = true; - /// В случае если запрос отменяется, данные о нем удаляются из мапа в момент отмены. - queries->second.erase(element); - } - } - } - - ++cur_size; - - res = new Entry(*this, cont.emplace(cont.end(), query_, user_, query_id_, ip_address_, max_memory_usage)); - - if (!query_id_.empty()) - user_to_queries[user_][query_id_] = &res->get(); - } - - return res; - } + size_t max_memory_usage, size_t max_wait_milliseconds, bool replace_running_query, QueryPriorities::Priority priority); /// Количество одновременно выполняющихся запросов. size_t size() const { return cur_size; } /// Получить текущее состояние (копию) списка запросов. - Containter get() const + Container get() const { Poco::ScopedLock lock(mutex); return cont; diff --git a/dbms/include/DB/Interpreters/QueryPriorities.h b/dbms/include/DB/Interpreters/QueryPriorities.h new file mode 100644 index 0000000000000000000000000000000000000000..5239fe696316a68a5f37fec55d13a0ce2ba89241 --- /dev/null +++ b/dbms/include/DB/Interpreters/QueryPriorities.h @@ -0,0 +1,117 @@ +#pragma once + +#include +#include +#include +#include +#include + + +/** Реализует приоритеты запросов. + * Позволяет приостанавливать выполнение запроса, если выполняется хотя бы один более приоритетный запрос. + * + * Величина приоритета - целое число, чем меньше - тем больше приоритет. + * + * Приоритет 0 считается особенным - запросы с таким приоритетом выполняются всегда, + * не зависят от других запросов и не влияют на другие запросы. + * То есть 0 означает - не использовать приоритеты. + * + * NOTE Возможности сделать лучше: + * - реализовать ограничение на максимальное количество запросов с таким приоритетом. + */ +class QueryPriorities +{ +public: + using Priority = int; + +private: + friend struct Handle; + + using Count = int; + + /// Количество выполняющихся сейчас запросов с заданным приоритетом. + using Container = std::map; + + std::mutex mutex; + std::condition_variable condvar; + Container container; + + + /** Если есть более приоритетные запросы - спать, пока они не перестанут быть или не истечёт таймаут. + * Возвращает true, если более приоритетные запросы исчезли на момент возврата из функции, false, если истёк таймаут. + */ + template + bool waitIfNeed(Priority priority, Duration timeout) + { + if (0 == priority) + return true; + + std::unique_lock lock(mutex); + + while (true) + { + /// Если ли хотя бы один более приоритетный запрос? + bool found = false; + for (const auto & value : container) + { + if (value.first >= priority) + break; + + if (value.second > 0) + { + found = true; + break; + } + } + + if (!found) + return true; + + if (std::cv_status::timeout == condvar.wait_for(lock, timeout)) + return false; + } + } + +public: + struct HandleImpl + { + private: + QueryPriorities & parent; + QueryPriorities::Container::value_type & value; + + public: + HandleImpl(QueryPriorities & parent_, QueryPriorities::Container::value_type & value_) + : parent(parent_), value(value_) {} + + ~HandleImpl() + { + { + std::lock_guard lock(parent.mutex); + --value.second; + } + parent.condvar.notify_all(); + } + + template + bool waitIfNeed(Duration timeout) + { + return parent.waitIfNeed(value.first, timeout); + } + }; + + using Handle = std::shared_ptr; + + /** Зарегистрировать, что запрос с заданным приоритетом выполняется. + * Возвращается объект, в деструкторе которого, запись о запросе удаляется. + */ + Handle insert(Priority priority) + { + if (0 == priority) + return {}; + + std::lock_guard lock(mutex); + auto it = container.emplace(priority, 0).first; + ++it->second; + return std::make_shared(*this, *it); + } +}; diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h index e178e784936027f93a7742428c505943e0dfb699..8d799249f71d2b8af3fabc9d6f34586e1ae1f0a6 100644 --- a/dbms/include/DB/Interpreters/Settings.h +++ b/dbms/include/DB/Interpreters/Settings.h @@ -132,6 +132,9 @@ struct Settings \ /** Позволяет выбирать метод сжатия данных при записи */\ M(SettingCompressionMethod, network_compression_method, CompressionMethod::LZ4) \ + \ + /** Приоритет запроса. 1 - самый высокий, больше - ниже; 0 - не использовать приоритеты. */ \ + M(SettingUInt64, priority, 0) \ /// Всевозможные ограничения на выполнение запроса. Limits limits; diff --git a/dbms/src/DataStreams/BlockIO.cpp b/dbms/src/DataStreams/BlockIO.cpp new file mode 100644 index 0000000000000000000000000000000000000000..83b0fac54e22d94eb4c4c60bbd2b11f65880f394 --- /dev/null +++ b/dbms/src/DataStreams/BlockIO.cpp @@ -0,0 +1,9 @@ +#include +#include + +namespace DB +{ + +BlockIO::~BlockIO() = default; + +} diff --git a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp index e3a14875e2bed83e75b86b446e2658dfb65d7732..93216ea73f42a854ab62fa2a6e7dc628693c2fe3 100644 --- a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp +++ b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp @@ -5,6 +5,7 @@ #include #include +#include #include @@ -320,7 +321,7 @@ void IProfilingBlockInputStream::setProgressCallback(ProgressCallback callback) } -void IProfilingBlockInputStream::setProcessListElement(ProcessList::Element * elem) +void IProfilingBlockInputStream::setProcessListElement(ProcessListElement * elem) { process_list_elem = elem; diff --git a/dbms/src/Interpreters/ProcessList.cpp b/dbms/src/Interpreters/ProcessList.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e15bef4002f284dc49464f84bd33b275d008b662 --- /dev/null +++ b/dbms/src/Interpreters/ProcessList.cpp @@ -0,0 +1,72 @@ +#include + +namespace DB +{ + + +ProcessList::EntryPtr ProcessList::insert( + const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_, + size_t max_memory_usage, size_t max_wait_milliseconds, bool replace_running_query, QueryPriorities::Priority priority) +{ + EntryPtr res; + + { + Poco::ScopedLock lock(mutex); + + if (max_size && cur_size >= max_size && (!max_wait_milliseconds || !have_space.tryWait(mutex, max_wait_milliseconds))) + throw Exception("Too much simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MUCH_SIMULTANEOUS_QUERIES); + + if (!query_id_.empty()) + { + UserToQueries::iterator queries = user_to_queries.find(user_); + + if (queries != user_to_queries.end()) + { + QueryToElement::iterator element = queries->second.find(query_id_); + if (element != queries->second.end()) + { + if (!replace_running_query) + throw Exception("Query with id = " + query_id_ + " is already running.", + ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING); + element->second->is_cancelled = true; + /// В случае если запрос отменяется, данные о нем удаляются из мапа в момент отмены. + queries->second.erase(element); + } + } + } + + ++cur_size; + + res.reset(new Entry(*this, cont.emplace(cont.end(), + query_, user_, query_id_, ip_address_, max_memory_usage, priorities.insert(priority)))); + + if (!query_id_.empty()) + user_to_queries[user_][query_id_] = &res->get(); + } + + return res; +} + + +ProcessListEntry::~ProcessListEntry() +{ + Poco::ScopedLock lock(parent.mutex); + + /// В случае, если запрос отменяется, данные о нем удаляются из мапа в момент отмены. + if (!it->is_cancelled && !it->query_id.empty()) + { + ProcessList::UserToQueries::iterator queries = parent.user_to_queries.find(it->user); + if (queries != parent.user_to_queries.end()) + { + ProcessList::QueryToElement::iterator element = queries->second.find(it->query_id); + if (element != queries->second.end()) + queries->second.erase(element); + } + } + + parent.cont.erase(it); + --parent.cur_size; + parent.have_space.signal(); +} + +} diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index b8d79f5751ef31f2c45ba0a815921cd191062a39..91fdca2b2ad921bde889d214be95a5cafe3e9003 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -14,6 +14,7 @@ #include #include +#include #include @@ -88,11 +89,14 @@ static std::tuple executeQueryImpl( ProcessList::EntryPtr process_list_entry; if (!internal && nullptr == typeid_cast(&*ast)) { + const Settings & settings = context.getSettingsRef(); + process_list_entry = context.getProcessList().insert( query, context.getUser(), context.getCurrentQueryId(), context.getIPAddress(), - context.getSettingsRef().limits.max_memory_usage, - context.getSettingsRef().queue_max_wait_ms.totalMilliseconds(), - context.getSettingsRef().replace_running_query); + settings.limits.max_memory_usage, + settings.queue_max_wait_ms.totalMilliseconds(), + settings.replace_running_query, + settings.priority); context.setProcessListElement(&process_list_entry->get()); } diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 09124ad946fd2dddec47dc054d29a78600b8a987..2188ae8a15c302b0824cc6488756758c43c03bc6 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include diff --git a/dbms/src/Storages/StorageSystemProcesses.cpp b/dbms/src/Storages/StorageSystemProcesses.cpp index 4baa62436a2f2fe3961389f7a7bff17b6df7f635..ed083c6d9da8256d1c2c1b55d225b62581fd80b5 100644 --- a/dbms/src/Storages/StorageSystemProcesses.cpp +++ b/dbms/src/Storages/StorageSystemProcesses.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include