diff --git a/dbms/include/DB/Columns/ColumnReplicated.h b/dbms/include/DB/Columns/ColumnReplicated.h deleted file mode 100644 index 32abf953c6723ba96d4be8f245f7818fd94c9016..0000000000000000000000000000000000000000 --- a/dbms/include/DB/Columns/ColumnReplicated.h +++ /dev/null @@ -1,25 +0,0 @@ -#pragma once - -#include - - -namespace DB -{ - -/** Содержит промежуточные данные для вычисления выражений в функциях высшего порядка. - * Это - вложенный столбец произвольного размера. - * Сам ColumnReplicated притворяется, как столбец указанного в конструкторе размера. - */ -class ColumnReplicated final : public IColumnDummy -{ -public: - ColumnReplicated(size_t s_, ColumnPtr nested_) : IColumnDummy(s_), nested(nested_) {} - std::string getName() const override { return "ColumnReplicated"; } - ColumnPtr cloneDummy(size_t s_) const override { return new ColumnReplicated(s_, nested); } - - ColumnPtr & getData() { return nested; } -private: - ColumnPtr nested; -}; - -} diff --git a/dbms/include/DB/Columns/IColumnDummy.h b/dbms/include/DB/Columns/IColumnDummy.h index 509c56b14a87d41eddfb9323404f2f31dac7de73..d9c559f68f8a0e4ec27cb5ae3957f62637fd63ce 100644 --- a/dbms/include/DB/Columns/IColumnDummy.h +++ b/dbms/include/DB/Columns/IColumnDummy.h @@ -41,12 +41,7 @@ public: ColumnPtr filter(const Filter & filt) const override { - size_t new_size = 0; - for (Filter::const_iterator it = filt.begin(); it != filt.end(); ++it) - if (*it) - ++new_size; - - return cloneDummy(new_size); + return cloneDummy(countBytesInFilter(filt)); } ColumnPtr permute(const Permutation & perm, size_t limit) const override diff --git a/dbms/include/DB/DataStreams/BlockIO.h b/dbms/include/DB/DataStreams/BlockIO.h index 6c3c83a63bdb5227ffab69917a1b0d13b914a198..b0e69bbb27d212ca1e215d40129ed176dcefc807 100644 --- a/dbms/include/DB/DataStreams/BlockIO.h +++ b/dbms/include/DB/DataStreams/BlockIO.h @@ -2,12 +2,13 @@ #include #include -#include namespace DB { +class ProcessListEntry; + struct BlockIO { /** process_list_entry должен уничтожаться позже, чем in и out, @@ -15,7 +16,7 @@ struct BlockIO * (MemoryTracker * current_memory_tracker), * которая может использоваться до уничтожения in и out. */ - ProcessList::EntryPtr process_list_entry; + std::shared_ptr process_list_entry; BlockInputStreamPtr in; BlockOutputStreamPtr out; @@ -38,6 +39,8 @@ struct BlockIO return *this; } + + ~BlockIO(); }; } diff --git a/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h b/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h index 0b8fbe16a62d2df4bafab07bdadc173a39bb3a82..95f16118c3c19de74688a2353dfb9edfe4c39c9c 100644 --- a/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/IProfilingBlockInputStream.h @@ -3,7 +3,6 @@ #include #include -#include #include #include @@ -14,6 +13,7 @@ namespace DB { class QuotaForIntervals; +class ProcessListElement; /** Смотрит за тем, как работает источник блоков. @@ -82,7 +82,7 @@ public: * На основе этой информации будет проверяться квота, и некоторые ограничения. * Также эта информация будет доступна в запросе SHOW PROCESSLIST. */ - void setProcessListElement(ProcessList::Element * elem); + void setProcessListElement(ProcessListElement * elem); /** Установить информацию о приблизительном общем количестве строк, которых нужно прочитать. */ @@ -154,7 +154,7 @@ protected: BlockStreamProfileInfo info; std::atomic is_cancelled{false}; ProgressCallback progress_callback; - ProcessList::Element * process_list_elem = nullptr; + ProcessListElement * process_list_elem = nullptr; bool enabled_extremes = false; diff --git a/dbms/include/DB/Functions/FunctionsHigherOrder.h b/dbms/include/DB/Functions/FunctionsHigherOrder.h index 1194a8498557268dcd19e2a9075284d094c75759..d04bf89620fc2bd03d93e31319c5baba48a87337 100644 --- a/dbms/include/DB/Functions/FunctionsHigherOrder.h +++ b/dbms/include/DB/Functions/FunctionsHigherOrder.h @@ -5,7 +5,6 @@ #include #include -#include #include #include @@ -580,7 +579,7 @@ public: ColumnWithNameAndType replicated_column = block.getByPosition(prerequisites[prerequisite_index]); replicated_column.name = name; - replicated_column.column = typeid_cast(*replicated_column.column).getData(); + replicated_column.column = typeid_cast(*replicated_column.column).getDataPtr(); temp_block.insert(replicated_column); ++prerequisite_index; diff --git a/dbms/include/DB/Functions/FunctionsMiscellaneous.h b/dbms/include/DB/Functions/FunctionsMiscellaneous.h index de7d94274dd93ff55ca11005cb0bbe94238eb76a..063203625014600bac6c7d944b337afddad7122a 100644 --- a/dbms/include/DB/Functions/FunctionsMiscellaneous.h +++ b/dbms/include/DB/Functions/FunctionsMiscellaneous.h @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -51,9 +50,8 @@ namespace DB * arrayJoin(arr) - особая функция - выполнить её напрямую нельзя; * используется только чтобы получить тип результата соответствующего выражения. * - * replicate(x, arr) - копирует x столько раз, сколько элементов в массиве arr; - * например: replicate(1, ['a', 'b', 'c']) = 1, 1, 1. - * не предназначена для пользователя, а используется только как prerequisites для функций высшего порядка. + * replicate(x, arr) - создаёт массив такого же размера как arr, все элементы которого равны x; + * например: replicate(1, ['a', 'b', 'c']) = [1, 1, 1]. * * sleep(n) - спит n секунд каждый блок. * @@ -570,18 +568,15 @@ public: }; -/** Размножает столбец (первый аргумент) по количеству элементов в массиве (втором аргументе). - * Не предназначена для внешнего использования. - * Так как возвращаемый столбец будет иметь несовпадающий размер с исходными, - * то результат не может быть потом использован в том же блоке, что и аргументы. +/** Создаёт массив, размножая столбец (первый аргумент) по количеству элементов в массиве (втором аргументе). * Используется только в качестве prerequisites для функций высшего порядка. */ class FunctionReplicate : public IFunction { +public: static constexpr auto name = "replicate"; static IFunction * create(const Context & context) { return new FunctionReplicate; } - /// Получить имя функции. String getName() const { @@ -600,7 +595,7 @@ class FunctionReplicate : public IFunction if (!array_type) throw Exception("Second argument for function " + getName() + " must be array.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); - return arguments[0]->clone(); + return new DataTypeArray(arguments[0]->clone()); } /// Выполнить функцию над блоком. @@ -620,7 +615,9 @@ class FunctionReplicate : public IFunction array_column = typeid_cast(&*temp_column); } - block.getByPosition(result).column = new ColumnReplicated(first_column->size(), first_column->replicate(array_column->getOffsets())); + block.getByPosition(result).column = new ColumnArray( + first_column->replicate(array_column->getOffsets()), + array_column->getOffsetsColumn()); } }; diff --git a/dbms/include/DB/Interpreters/ProcessList.h b/dbms/include/DB/Interpreters/ProcessList.h index 40aba52e0698fc35a572740fc4a8be52027f6564..7bdbb4e3e9562bdb1de393efe58aa112d2d74cae 100644 --- a/dbms/include/DB/Interpreters/ProcessList.h +++ b/dbms/include/DB/Interpreters/ProcessList.h @@ -2,7 +2,7 @@ #include #include -#include +#include #include #include #include @@ -13,6 +13,7 @@ #include #include #include +#include namespace DB @@ -36,13 +37,16 @@ struct ProcessListElement MemoryTracker memory_tracker; + QueryPriorities::Handle priority_handle; + bool is_cancelled = false; ProcessListElement(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_, - size_t max_memory_usage) - : query(query_), user(user_), query_id(query_id_), ip_address(ip_address_), memory_tracker(max_memory_usage) + size_t max_memory_usage, QueryPriorities::Handle && priority_handle_) + : query(query_), user(user_), query_id(query_id_), ip_address(ip_address_), memory_tracker(max_memory_usage), + priority_handle(std::move(priority_handle_)) { current_memory_tracker = &memory_tracker; } @@ -55,126 +59,81 @@ struct ProcessListElement bool update(const Progress & value) { progress.incrementPiecewiseAtomically(value); + + if (priority_handle) + priority_handle->waitIfNeed(std::chrono::seconds(1)); /// NOTE Можно сделать настраиваемым таймаут. + return !is_cancelled; } }; +class ProcessList; + + +/// Держит итератор на список, и удаляет элемент из списка в деструкторе. +class ProcessListEntry +{ +private: + using Container = std::list; + + ProcessList & parent; + Container::iterator it; +public: + ProcessListEntry(ProcessList & parent_, Container::iterator it_) + : parent(parent_), it(it_) {} + + ~ProcessListEntry(); + + ProcessListElement * operator->() { return &*it; } + const ProcessListElement * operator->() const { return &*it; } + + ProcessListElement & get() { return *it; } + const ProcessListElement & get() const { return *it; } +}; + + class ProcessList { - friend class Entry; + friend class ProcessListEntry; public: using Element = ProcessListElement; + using Entry = ProcessListEntry; /// list, чтобы итераторы не инвалидировались. NOTE: можно заменить на cyclic buffer, но почти незачем. - typedef std::list Containter; + using Container = std::list; /// Query_id -> Element * - typedef std::unordered_map QueryToElement; + using QueryToElement = std::unordered_map; /// User -> Query_id -> Element * - typedef std::unordered_map UserToQueries; + using UserToQueries = std::unordered_map; private: mutable Poco::FastMutex mutex; mutable Poco::Condition have_space; /// Количество одновременно выполняющихся запросов стало меньше максимального. - Containter cont; + Container cont; size_t cur_size; /// В C++03 std::list::size не O(1). size_t max_size; /// Если 0 - не ограничено. Иначе, если пытаемся добавить больше - кидается исключение. UserToQueries user_to_queries; - - /// Держит итератор на список, и удаляет элемент из списка в деструкторе. - class Entry - { - private: - ProcessList & parent; - Containter::iterator it; - public: - Entry(ProcessList & parent_, Containter::iterator it_) - : parent(parent_), it(it_) {} - - ~Entry() - { - Poco::ScopedLock lock(parent.mutex); - - /// В случае, если запрос отменяется, данные о нем удаляются из мапа в момент отмены. - if (!it->is_cancelled && !it->query_id.empty()) - { - UserToQueries::iterator queries = parent.user_to_queries.find(it->user); - if (queries != parent.user_to_queries.end()) - { - QueryToElement::iterator element = queries->second.find(it->query_id); - if (element != queries->second.end()) - queries->second.erase(element); - } - } - - parent.cont.erase(it); - --parent.cur_size; - parent.have_space.signal(); - } - - Element * operator->() { return &*it; } - const Element * operator->() const { return &*it; } - - Element & get() { return *it; } - const Element & get() const { return *it; } - }; + QueryPriorities priorities; public: ProcessList(size_t max_size_ = 0) : cur_size(0), max_size(max_size_) {} - typedef Poco::SharedPtr EntryPtr; + typedef std::shared_ptr EntryPtr; /** Зарегистрировать выполняющийся запрос. Возвращает refcounted объект, который удаляет запрос из списка при уничтожении. * Если выполняющихся запросов сейчас слишком много - ждать не более указанного времени. * Если времени не хватило - кинуть исключение. */ EntryPtr insert(const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_, - size_t max_memory_usage = 0, size_t max_wait_milliseconds = DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS, bool replace_running_query = false) - { - EntryPtr res; - - { - Poco::ScopedLock lock(mutex); - - if (max_size && cur_size >= max_size && (!max_wait_milliseconds || !have_space.tryWait(mutex, max_wait_milliseconds))) - throw Exception("Too much simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MUCH_SIMULTANEOUS_QUERIES); - - if (!query_id_.empty()) - { - UserToQueries::iterator queries = user_to_queries.find(user_); - - if (queries != user_to_queries.end()) - { - QueryToElement::iterator element = queries->second.find(query_id_); - if (element != queries->second.end()) - { - if (!replace_running_query) - throw Exception("Query with id = " + query_id_ + " is already running.", - ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING); - element->second->is_cancelled = true; - /// В случае если запрос отменяется, данные о нем удаляются из мапа в момент отмены. - queries->second.erase(element); - } - } - } - - ++cur_size; - - res = new Entry(*this, cont.emplace(cont.end(), query_, user_, query_id_, ip_address_, max_memory_usage)); - - if (!query_id_.empty()) - user_to_queries[user_][query_id_] = &res->get(); - } - - return res; - } + size_t max_memory_usage, size_t max_wait_milliseconds, bool replace_running_query, QueryPriorities::Priority priority); /// Количество одновременно выполняющихся запросов. size_t size() const { return cur_size; } /// Получить текущее состояние (копию) списка запросов. - Containter get() const + Container get() const { Poco::ScopedLock lock(mutex); return cont; diff --git a/dbms/include/DB/Interpreters/QueryLog.h b/dbms/include/DB/Interpreters/QueryLog.h new file mode 100644 index 0000000000000000000000000000000000000000..50f21c31b34fc78bd065717256ca947822fad4ac --- /dev/null +++ b/dbms/include/DB/Interpreters/QueryLog.h @@ -0,0 +1,245 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + + +/** Позволяет логгировать информацию о выполнении запросов: + * - о начале выполнения запроса; + * - метрики производительности, после выполнения запроса; + * - об ошибках при выполнении запроса. + * + * Логгирование производится асинхронно. Данные передаются в очередь, откуда их читает отдельный поток. + * Этот поток записывает лог в предназначенную для этого таблицу не чаще, чем с заданной периодичностью. + */ + +/** Что логгировать. + * Структура может меняться при изменении версии сервера. + * Если при первой записи обнаруживается, что имеющаяся таблица с логами имеет неподходящую стрктуру, + * то эта таблица переименовывается (откладывается в сторону) и создаётся новая таблица. + */ +struct QueryLogElement +{ + enum Type + { + SHUTDOWN = 0, /// Эта запись имеет служебное значение. + QUERY_START = 1, + QUERY_FINISH = 2, + }; + + enum Interface + { + TCP = 1, + HTTP = 2, + OLAP_HTTP = 3, + }; + + enum HTTPMethod + { + UNKNOWN = 0, + GET = 1, + POST = 2, + }; + + Type type; + + /// В зависимости от типа, не все поля могут быть заполнены. + + time_t event_time; + time_t query_start_time; + UInt64 query_duration_ms; + + UInt64 read_rows; + UInt64 read_bytes; + + UInt64 result_rows; + UInt64 result_bytes; + + String query; + + Interface interface; + HTTPMethod http_method; + Poco::Net::IPAddress ip_address; + String user; + String query_id; +}; + + +#define DBMS_QUERY_LOG_QUEUE_SIZE 1024 + + +class QueryLog : private boost::noncopyable +{ +public: + + /** Передаётся имя таблицы, в которую писать лог. + * Если таблица не существует, то она создаётся с движком MergeTree, с ключём по event_time. + * Если таблица существует, то проверяется, подходящая ли у неё структура. + * Если структура подходящая, то будет использоваться эта таблица. + * Если нет - то существующая таблица переименовывается в такую же, но с добавлением суффикса _N на конце, + * где N - минимальное число, начиная с 1 такое, что таблицы с таким именем ещё нет; + * и создаётся новая таблица, как будто существующей таблицы не было. + */ + QueryLog(Context & context_, const String & database_name_, const String & table_name_, size_t flush_interval_milliseconds_) + : context(context_), database_name(database_name_), table_name(table_name_), flush_interval_milliseconds(flush_interval_milliseconds_) + { + data.reserve(DBMS_QUERY_LOG_QUEUE_SIZE); + + // TODO + + saving_thread = std::thread([this] { threadFunction(); }); + } + + ~QueryLog() + { + /// Говорим потоку, что надо завершиться. + QueryLogElement elem; + elem.type = QueryLogElement::SHUTDOWN; + queue.push(elem); + + saving_thread.join(); + } + + /** Добавить запись в лог. + * Сохранение в таблицу делается асинхронно, и в случае сбоя, запись может никуда не попасть. + */ + void add(const QueryLogElement & element) + { + /// Здесь может быть блокировка. Возможно, в случае переполнения очереди, лучше сразу кидать эксепшен. Или даже отказаться от логгирования запроса. + queue.push(element); + } + +private: + Context & context; + const String database_name; + const String table_name; + StoragePtr table; + const size_t flush_interval_milliseconds; + + /// Очередь всё-таки ограничена. Но размер достаточно большой, чтобы не блокироваться во всех нормальных ситуациях. + ConcurrentBoundedQueue queue {DBMS_QUERY_LOG_QUEUE_SIZE}; + + /** Данные, которые были вынуты из очереди. Здесь данные накапливаются, пока не пройдёт достаточное количество времени. + * Можно было бы использовать двойную буферизацию, но предполагается, + * что запись в таблицу с логом будет быстрее, чем обработка большой пачки запросов. + */ + std::vector data; + + /** В этом потоке данные вынимаются из queue, складываются в data, а затем вставляются в таблицу. + */ + std::thread saving_thread; + + + void threadFunction() + { + Stopwatch time_after_last_write; + bool first = true; + + while (true) + { + try + { + if (first) + { + time_after_last_write.restart(); + first = false; + } + + QueryLogElement element; + bool has_element = false; + + if (data.empty()) + { + element = queue.pop(); + has_element = true; + } + else + { + size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000; + if (milliseconds_elapsed < flush_interval_milliseconds) + has_element = queue.tryPop(element, flush_interval_milliseconds - milliseconds_elapsed); + } + + if (has_element) + { + if (element.type = QueryLogElement::SHUTDOWN) + { + flush(); + break; + } + else + data.push_back(element); + } + + size_t milliseconds_elapsed = time_after_last_write.elapsed() / 1000000; + if (milliseconds_elapsed >= flush_interval_milliseconds) + { + /// Записываем данные в таблицу. + flush(); + time_after_last_write.restart(); + } + } + catch (...) + { + /// В случае ошибки теряем накопленные записи, чтобы не блокироваться. + data.clear(); + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } + } + + Block createBlock() + { + return { + {new ColumnUInt8, new DataTypeUInt8, "type"}, + {new ColumnUInt32, new DataTypeDateTime, "event_time"}, + {new ColumnUInt32, new DataTypeDateTime, "query_start_time"}, + }; + + /* time_t event_time; + time_t query_start_time; + UInt64 query_duration_ms; + + UInt64 read_rows; + UInt64 read_bytes; + + UInt64 result_rows; + UInt64 result_bytes; + + String query; + + Interface interface; + HTTPMethod http_method; + Poco::Net::IPAddress ip_address; + String user; + String query_id;*/ + } + + void flush() + { + try + { + Block block = createBlock(); + + // TODO Формирование блока и запись. + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + + data.clear(); + } +}; + + +} diff --git a/dbms/include/DB/Interpreters/QueryPriorities.h b/dbms/include/DB/Interpreters/QueryPriorities.h new file mode 100644 index 0000000000000000000000000000000000000000..5239fe696316a68a5f37fec55d13a0ce2ba89241 --- /dev/null +++ b/dbms/include/DB/Interpreters/QueryPriorities.h @@ -0,0 +1,117 @@ +#pragma once + +#include +#include +#include +#include +#include + + +/** Реализует приоритеты запросов. + * Позволяет приостанавливать выполнение запроса, если выполняется хотя бы один более приоритетный запрос. + * + * Величина приоритета - целое число, чем меньше - тем больше приоритет. + * + * Приоритет 0 считается особенным - запросы с таким приоритетом выполняются всегда, + * не зависят от других запросов и не влияют на другие запросы. + * То есть 0 означает - не использовать приоритеты. + * + * NOTE Возможности сделать лучше: + * - реализовать ограничение на максимальное количество запросов с таким приоритетом. + */ +class QueryPriorities +{ +public: + using Priority = int; + +private: + friend struct Handle; + + using Count = int; + + /// Количество выполняющихся сейчас запросов с заданным приоритетом. + using Container = std::map; + + std::mutex mutex; + std::condition_variable condvar; + Container container; + + + /** Если есть более приоритетные запросы - спать, пока они не перестанут быть или не истечёт таймаут. + * Возвращает true, если более приоритетные запросы исчезли на момент возврата из функции, false, если истёк таймаут. + */ + template + bool waitIfNeed(Priority priority, Duration timeout) + { + if (0 == priority) + return true; + + std::unique_lock lock(mutex); + + while (true) + { + /// Если ли хотя бы один более приоритетный запрос? + bool found = false; + for (const auto & value : container) + { + if (value.first >= priority) + break; + + if (value.second > 0) + { + found = true; + break; + } + } + + if (!found) + return true; + + if (std::cv_status::timeout == condvar.wait_for(lock, timeout)) + return false; + } + } + +public: + struct HandleImpl + { + private: + QueryPriorities & parent; + QueryPriorities::Container::value_type & value; + + public: + HandleImpl(QueryPriorities & parent_, QueryPriorities::Container::value_type & value_) + : parent(parent_), value(value_) {} + + ~HandleImpl() + { + { + std::lock_guard lock(parent.mutex); + --value.second; + } + parent.condvar.notify_all(); + } + + template + bool waitIfNeed(Duration timeout) + { + return parent.waitIfNeed(value.first, timeout); + } + }; + + using Handle = std::shared_ptr; + + /** Зарегистрировать, что запрос с заданным приоритетом выполняется. + * Возвращается объект, в деструкторе которого, запись о запросе удаляется. + */ + Handle insert(Priority priority) + { + if (0 == priority) + return {}; + + std::lock_guard lock(mutex); + auto it = container.emplace(priority, 0).first; + ++it->second; + return std::make_shared(*this, *it); + } +}; diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h index e178e784936027f93a7742428c505943e0dfb699..8d799249f71d2b8af3fabc9d6f34586e1ae1f0a6 100644 --- a/dbms/include/DB/Interpreters/Settings.h +++ b/dbms/include/DB/Interpreters/Settings.h @@ -132,6 +132,9 @@ struct Settings \ /** Позволяет выбирать метод сжатия данных при записи */\ M(SettingCompressionMethod, network_compression_method, CompressionMethod::LZ4) \ + \ + /** Приоритет запроса. 1 - самый высокий, больше - ниже; 0 - не использовать приоритеты. */ \ + M(SettingUInt64, priority, 0) \ /// Всевозможные ограничения на выполнение запроса. Limits limits; diff --git a/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h b/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h index ec4509298b72909c94e0da1c78df8217db01b227..3cfdc8d89423076353c5ac6eb2c0861ab2c61e85 100644 --- a/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h +++ b/dbms/include/DB/Storages/MergeTree/ActiveDataPartSet.h @@ -42,9 +42,6 @@ public: if (right != rhs.right) return right < rhs.right; - if (level != rhs.level) - return level < rhs.level; - return false; } @@ -53,7 +50,6 @@ public: { return left_month == rhs.left_month /// Куски за разные месяцы не объединяются && right_month == rhs.right_month - && level > rhs.level && left_date <= rhs.left_date && right_date >= rhs.right_date && left <= rhs.left diff --git a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h index 43b59d706272c4475f5e1e0bf62835e107ced5f0..57a55b9566ca42b003b0e14af959ebc9d3100f1c 100644 --- a/dbms/include/DB/Storages/StorageReplicatedMergeTree.h +++ b/dbms/include/DB/Storages/StorageReplicatedMergeTree.h @@ -134,7 +134,7 @@ public: void getStatus(Status & res, bool with_zk_fields = true); private: - void dropUnreplicatedPartition(const Field & partition, const Settings & settings); + void dropUnreplicatedPartition(const Field & partition, bool detach, const Settings & settings); friend class ReplicatedMergeTreeBlockOutputStream; friend class ReplicatedMergeTreeRestartingThread; diff --git a/dbms/src/DataStreams/BlockIO.cpp b/dbms/src/DataStreams/BlockIO.cpp new file mode 100644 index 0000000000000000000000000000000000000000..83b0fac54e22d94eb4c4c60bbd2b11f65880f394 --- /dev/null +++ b/dbms/src/DataStreams/BlockIO.cpp @@ -0,0 +1,9 @@ +#include +#include + +namespace DB +{ + +BlockIO::~BlockIO() = default; + +} diff --git a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp index e3a14875e2bed83e75b86b446e2658dfb65d7732..93216ea73f42a854ab62fa2a6e7dc628693c2fe3 100644 --- a/dbms/src/DataStreams/IProfilingBlockInputStream.cpp +++ b/dbms/src/DataStreams/IProfilingBlockInputStream.cpp @@ -5,6 +5,7 @@ #include #include +#include #include @@ -320,7 +321,7 @@ void IProfilingBlockInputStream::setProgressCallback(ProgressCallback callback) } -void IProfilingBlockInputStream::setProcessListElement(ProcessList::Element * elem) +void IProfilingBlockInputStream::setProcessListElement(ProcessListElement * elem) { process_list_elem = elem; diff --git a/dbms/src/Functions/FunctionsMiscellaneous.cpp b/dbms/src/Functions/FunctionsMiscellaneous.cpp index 59dd89481f19538392b0eb5f583a63facc32732e..e2a478933c177719059f8cd77548dd581c15fe0f 100644 --- a/dbms/src/Functions/FunctionsMiscellaneous.cpp +++ b/dbms/src/Functions/FunctionsMiscellaneous.cpp @@ -326,6 +326,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory) factory.registerFunction(); factory.registerFunction(); factory.registerFunction(); + factory.registerFunction(); factory.registerFunction(); factory.registerFunction(); diff --git a/dbms/src/Interpreters/ProcessList.cpp b/dbms/src/Interpreters/ProcessList.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e15bef4002f284dc49464f84bd33b275d008b662 --- /dev/null +++ b/dbms/src/Interpreters/ProcessList.cpp @@ -0,0 +1,72 @@ +#include + +namespace DB +{ + + +ProcessList::EntryPtr ProcessList::insert( + const String & query_, const String & user_, const String & query_id_, const Poco::Net::IPAddress & ip_address_, + size_t max_memory_usage, size_t max_wait_milliseconds, bool replace_running_query, QueryPriorities::Priority priority) +{ + EntryPtr res; + + { + Poco::ScopedLock lock(mutex); + + if (max_size && cur_size >= max_size && (!max_wait_milliseconds || !have_space.tryWait(mutex, max_wait_milliseconds))) + throw Exception("Too much simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MUCH_SIMULTANEOUS_QUERIES); + + if (!query_id_.empty()) + { + UserToQueries::iterator queries = user_to_queries.find(user_); + + if (queries != user_to_queries.end()) + { + QueryToElement::iterator element = queries->second.find(query_id_); + if (element != queries->second.end()) + { + if (!replace_running_query) + throw Exception("Query with id = " + query_id_ + " is already running.", + ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING); + element->second->is_cancelled = true; + /// В случае если запрос отменяется, данные о нем удаляются из мапа в момент отмены. + queries->second.erase(element); + } + } + } + + ++cur_size; + + res.reset(new Entry(*this, cont.emplace(cont.end(), + query_, user_, query_id_, ip_address_, max_memory_usage, priorities.insert(priority)))); + + if (!query_id_.empty()) + user_to_queries[user_][query_id_] = &res->get(); + } + + return res; +} + + +ProcessListEntry::~ProcessListEntry() +{ + Poco::ScopedLock lock(parent.mutex); + + /// В случае, если запрос отменяется, данные о нем удаляются из мапа в момент отмены. + if (!it->is_cancelled && !it->query_id.empty()) + { + ProcessList::UserToQueries::iterator queries = parent.user_to_queries.find(it->user); + if (queries != parent.user_to_queries.end()) + { + ProcessList::QueryToElement::iterator element = queries->second.find(it->query_id); + if (element != queries->second.end()) + queries->second.erase(element); + } + } + + parent.cont.erase(it); + --parent.cur_size; + parent.have_space.signal(); +} + +} diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp index b8d79f5751ef31f2c45ba0a815921cd191062a39..91fdca2b2ad921bde889d214be95a5cafe3e9003 100644 --- a/dbms/src/Interpreters/executeQuery.cpp +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -14,6 +14,7 @@ #include #include +#include #include @@ -88,11 +89,14 @@ static std::tuple executeQueryImpl( ProcessList::EntryPtr process_list_entry; if (!internal && nullptr == typeid_cast(&*ast)) { + const Settings & settings = context.getSettingsRef(); + process_list_entry = context.getProcessList().insert( query, context.getUser(), context.getCurrentQueryId(), context.getIPAddress(), - context.getSettingsRef().limits.max_memory_usage, - context.getSettingsRef().queue_max_wait_ms.totalMilliseconds(), - context.getSettingsRef().replace_running_query); + settings.limits.max_memory_usage, + settings.queue_max_wait_ms.totalMilliseconds(), + settings.replace_running_query, + settings.priority); context.setProcessListElement(&process_list_entry->get()); } diff --git a/dbms/src/Parsers/ParserAlterQuery.cpp b/dbms/src/Parsers/ParserAlterQuery.cpp index f3dd0c02d9b7567bbcf2c815327d29aa91a97c4e..cf9338051847e288dc8b9a2420ea2281bc925e5b 100644 --- a/dbms/src/Parsers/ParserAlterQuery.cpp +++ b/dbms/src/Parsers/ParserAlterQuery.cpp @@ -150,6 +150,12 @@ bool ParserAlterQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_pa { ws.ignore(pos, end); + if (s_unreplicated.ignore(pos, end, max_parsed_pos, expected)) + { + params.unreplicated = true; + ws.ignore(pos, end); + } + if (!s_partition.ignore(pos, end, max_parsed_pos, expected)) return false; diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 09124ad946fd2dddec47dc054d29a78600b8a987..2188ae8a15c302b0824cc6488756758c43c03bc6 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 88e9dbd49c543b62f59bf0a41d1b4901acb9313a..5bb155a3b064c1ffc4d641fb827adc362c9c1e79 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -340,8 +340,10 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads( const Names & virt_columns, const Settings & settings) { - size_t min_marks_for_concurrent_read = (settings.merge_tree_min_rows_for_concurrent_read + data.index_granularity - 1) / data.index_granularity; - size_t max_marks_to_use_cache = (settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity; + const size_t min_marks_for_concurrent_read = + (settings.merge_tree_min_rows_for_concurrent_read + data.index_granularity - 1) / data.index_granularity; + const size_t max_marks_to_use_cache = + (settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity; /// На всякий случай перемешаем куски. std::random_shuffle(parts.begin(), parts.end()); @@ -354,12 +356,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads( /// Пусть отрезки будут перечислены справа налево, чтобы можно было выбрасывать самый левый отрезок с помощью pop_back(). std::reverse(parts[i].ranges.begin(), parts[i].ranges.end()); - sum_marks_in_parts[i] = 0; - for (size_t j = 0; j < parts[i].ranges.size(); ++j) - { - MarkRange & range = parts[i].ranges[j]; + for (const auto & range : parts[i].ranges) sum_marks_in_parts[i] += range.end - range.begin; - } + sum_marks += sum_marks_in_parts[i]; } @@ -370,7 +369,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads( if (sum_marks > 0) { - size_t min_marks_per_thread = (sum_marks - 1) / threads + 1; + const size_t min_marks_per_thread = (sum_marks - 1) / threads + 1; for (size_t i = 0; i < threads && !parts.empty(); ++i) { @@ -415,10 +414,11 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads( throw Exception("Unexpected end of ranges while spreading marks among threads", ErrorCodes::LOGICAL_ERROR); MarkRange & range = part.ranges.back(); - size_t marks_in_range = range.end - range.begin; - size_t marks_to_get_from_range = std::min(marks_in_range, need_marks); - ranges_to_get_from_part.push_back(MarkRange(range.begin, range.begin + marks_to_get_from_range)); + const size_t marks_in_range = range.end - range.begin; + const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks); + + ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range); range.begin += marks_to_get_from_range; marks_in_part -= marks_to_get_from_range; need_marks -= marks_to_get_from_range; diff --git a/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp b/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp index c8e8a84f3fe5c9dd14c55e53fe659ef93d954a24..3bd49d17231cfcee3522ff799f31c25b88062820 100644 --- a/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreePartChecker.cpp @@ -11,6 +11,7 @@ #include #include #include +#include namespace DB @@ -115,6 +116,12 @@ struct Stream readIntBinary(mrk_mark.offset_in_compressed_file, mrk_hashing_buf); readIntBinary(mrk_mark.offset_in_decompressed_block, mrk_hashing_buf); + /// На всякий случай, сохраним смещение в файле и размер предыдущего блока. + SCOPE_EXIT( + prev_offset_in_compressed_file = mrk_mark.offset_in_compressed_file; + prev_buffer_size = uncompressed_hashing_buf.buffer().size(); + ); + bool has_alternative_mark = false; MarkInCompressedFile alternative_data_mark; MarkInCompressedFile data_mark; @@ -138,6 +145,18 @@ struct Stream if (uncompressed_hashing_buf.eof()) return; } + else if (uncompressed_hashing_buf.offset() == 0) + { + /// Восстановим засечку на конец предыдущего блока по сохраненным данным + has_alternative_mark = true; + alternative_data_mark.offset_in_compressed_file = prev_offset_in_compressed_file; + alternative_data_mark.offset_in_decompressed_block = prev_buffer_size; + + if (mrk_mark == alternative_data_mark) + return; + } + + std::cout << "mrk_mark " << mrk_mark.offset_in_compressed_file << ' ' << mrk_mark.offset_in_decompressed_block << std::endl; data_mark.offset_in_compressed_file = compressed_hashing_buf.count() - uncompressing_buf.getSizeCompressed(); data_mark.offset_in_decompressed_block = uncompressed_hashing_buf.offset(); @@ -161,6 +180,10 @@ struct Stream checksums.files[name + ".mrk"] = MergeTreeData::DataPart::Checksums::Checksum( mrk_hashing_buf.count(), mrk_hashing_buf.getHash()); } + +private: + size_t prev_offset_in_compressed_file{}; + size_t prev_buffer_size{}; }; /// Возвращает количество строк. Добавляет в checksums чексуммы всех файлов столбца. diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 1cd57a70216a051fdf2af3be20a937c6d2fc6a8e..b9fb09b4e42645db81a636a7f30acdcbf5f6ce72 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -2224,7 +2224,7 @@ static String getFakePartNameForDrop(const String & month_name, UInt64 left, UIn } -void StorageReplicatedMergeTree::dropUnreplicatedPartition(const Field & partition, const Settings & settings) +void StorageReplicatedMergeTree::dropUnreplicatedPartition(const Field & partition, const bool detach, const Settings & settings) { if (!unreplicated_data) return; @@ -2247,10 +2247,13 @@ void StorageReplicatedMergeTree::dropUnreplicatedPartition(const Field & partiti LOG_DEBUG(log, "Removing unreplicated part " << part->name); ++removed_parts; - unreplicated_data->replaceParts({part}, {}, false); + if (detach) + unreplicated_data->renameAndDetachPart(part, ""); + else + unreplicated_data->replaceParts({part}, {}, false); } - LOG_INFO(log, "Removed " << removed_parts << " unreplicated parts inside " << apply_visitor(FieldVisitorToString(), partition) << "."); + LOG_INFO(log, (detach ? "Detached " : "Removed ") << removed_parts << " unreplicated parts inside " << apply_visitor(FieldVisitorToString(), partition) << "."); } @@ -2258,13 +2261,7 @@ void StorageReplicatedMergeTree::dropPartition(const Field & field, bool detach, { if (unreplicated) { - if (detach) - throw Exception{ - "DETACH UNREPLICATED PATITION not supported", - ErrorCodes::LOGICAL_ERROR - }; - - dropUnreplicatedPartition(field, settings); + dropUnreplicatedPartition(field, detach, settings); return; } diff --git a/dbms/src/Storages/StorageSystemProcesses.cpp b/dbms/src/Storages/StorageSystemProcesses.cpp index 4baa62436a2f2fe3961389f7a7bff17b6df7f635..ed083c6d9da8256d1c2c1b55d225b62581fd80b5 100644 --- a/dbms/src/Storages/StorageSystemProcesses.cpp +++ b/dbms/src/Storages/StorageSystemProcesses.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include diff --git a/dbms/tests/queries/0_stateless/00177_inserts_through_http_parts.sh b/dbms/tests/queries/0_stateless/00177_inserts_through_http_parts.sh index 4c7834d605b2254f8d6a9f73ff8e9c046e7300f7..788f2bda147409c762dd201972d903afd7d3d7fb 100755 --- a/dbms/tests/queries/0_stateless/00177_inserts_through_http_parts.sh +++ b/dbms/tests/queries/0_stateless/00177_inserts_through_http_parts.sh @@ -1,11 +1,11 @@ #!/bin/bash -curl 'http://localhost:8123/?query=DROP+TABLE' -d 'IF EXISTS test.insert' -curl 'http://localhost:8123/?query=CREATE' -d 'TABLE test.insert (x UInt8) ENGINE = Memory' -curl 'http://localhost:8123/' -d 'INSERT INTO test.insert VALUES (1),(2)' -curl 'http://localhost:8123/?query=INSERT+INTO+test.insert+VALUES' -d '(3),(4)' -curl 'http://localhost:8123/?query=INSERT+INTO+test.insert' -d 'VALUES (5),(6)' -curl 'http://localhost:8123/?query=INSERT+INTO+test.insert+VALUES+(7)' -d ',(8)' -curl 'http://localhost:8123/?query=INSERT+INTO+test.insert+VALUES+(9),(10)' -d ' ' -curl 'http://localhost:8123/' -d 'SELECT x FROM test.insert ORDER BY x' -curl 'http://localhost:8123/?query=DROP+TABLE' -d 'test.insert' +curl -sS 'http://localhost:8123/?query=DROP+TABLE' -d 'IF EXISTS test.insert' +curl -sS 'http://localhost:8123/?query=CREATE' -d 'TABLE test.insert (x UInt8) ENGINE = Memory' +curl -sS 'http://localhost:8123/' -d 'INSERT INTO test.insert VALUES (1),(2)' +curl -sS 'http://localhost:8123/?query=INSERT+INTO+test.insert+VALUES' -d '(3),(4)' +curl -sS 'http://localhost:8123/?query=INSERT+INTO+test.insert' -d 'VALUES (5),(6)' +curl -sS 'http://localhost:8123/?query=INSERT+INTO+test.insert+VALUES+(7)' -d ',(8)' +curl -sS 'http://localhost:8123/?query=INSERT+INTO+test.insert+VALUES+(9),(10)' -d ' ' +curl -sS 'http://localhost:8123/' -d 'SELECT x FROM test.insert ORDER BY x' +curl -sS 'http://localhost:8123/?query=DROP+TABLE' -d 'test.insert' diff --git a/dbms/tests/queries/0_stateless/00178_function_replicate.reference b/dbms/tests/queries/0_stateless/00178_function_replicate.reference new file mode 100644 index 0000000000000000000000000000000000000000..4fdec92dcf38938e50407807c55db71f1b95a4fa --- /dev/null +++ b/dbms/tests/queries/0_stateless/00178_function_replicate.reference @@ -0,0 +1,10 @@ +0 [] [] [] [] [] +1 [0] [1] ['1'] [[0]] [['0']] +2 [0,1] [2,2] ['2','2'] [[0,1],[0,1]] [['0','1'],['0','1']] +3 [0,1,2] [3,3,3] ['3','3','3'] [[0,1,2],[0,1,2],[0,1,2]] [['0','1','2'],['0','1','2'],['0','1','2']] +4 [0,1,2,3] [4,4,4,4] ['4','4','4','4'] [[0,1,2,3],[0,1,2,3],[0,1,2,3],[0,1,2,3]] [['0','1','2','3'],['0','1','2','3'],['0','1','2','3'],['0','1','2','3']] +5 [0,1,2,3,4] [5,5,5,5,5] ['5','5','5','5','5'] [[0,1,2,3,4],[0,1,2,3,4],[0,1,2,3,4],[0,1,2,3,4],[0,1,2,3,4]] [['0','1','2','3','4'],['0','1','2','3','4'],['0','1','2','3','4'],['0','1','2','3','4'],['0','1','2','3','4']] +6 [0,1,2,3,4,5] [6,6,6,6,6,6] ['6','6','6','6','6','6'] [[0,1,2,3,4,5],[0,1,2,3,4,5],[0,1,2,3,4,5],[0,1,2,3,4,5],[0,1,2,3,4,5],[0,1,2,3,4,5]] [['0','1','2','3','4','5'],['0','1','2','3','4','5'],['0','1','2','3','4','5'],['0','1','2','3','4','5'],['0','1','2','3','4','5'],['0','1','2','3','4','5']] +7 [0,1,2,3,4,5,6] [7,7,7,7,7,7,7] ['7','7','7','7','7','7','7'] [[0,1,2,3,4,5,6],[0,1,2,3,4,5,6],[0,1,2,3,4,5,6],[0,1,2,3,4,5,6],[0,1,2,3,4,5,6],[0,1,2,3,4,5,6],[0,1,2,3,4,5,6]] [['0','1','2','3','4','5','6'],['0','1','2','3','4','5','6'],['0','1','2','3','4','5','6'],['0','1','2','3','4','5','6'],['0','1','2','3','4','5','6'],['0','1','2','3','4','5','6'],['0','1','2','3','4','5','6']] +8 [0,1,2,3,4,5,6,7] [8,8,8,8,8,8,8,8] ['8','8','8','8','8','8','8','8'] [[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7],[0,1,2,3,4,5,6,7]] [['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7'],['0','1','2','3','4','5','6','7']] +9 [0,1,2,3,4,5,6,7,8] [9,9,9,9,9,9,9,9,9] ['9','9','9','9','9','9','9','9','9'] [[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8],[0,1,2,3,4,5,6,7,8]] [['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8'],['0','1','2','3','4','5','6','7','8']] diff --git a/dbms/tests/queries/0_stateless/00178_function_replicate.sql b/dbms/tests/queries/0_stateless/00178_function_replicate.sql new file mode 100644 index 0000000000000000000000000000000000000000..13ce1c243645d227cc7833727043ccbee43c8ad1 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00178_function_replicate.sql @@ -0,0 +1,9 @@ +SELECT + number, + range(number) AS arr, + replicate(number, arr), + replicate(toString(number), arr), + replicate(range(number), arr), + replicate(arrayMap(x -> toString(x), range(number)), arr) +FROM system.numbers +LIMIT 10; diff --git a/dbms/tests/queries/0_stateless/00179_lambdas_with_common_expressions_and_filter.reference b/dbms/tests/queries/0_stateless/00179_lambdas_with_common_expressions_and_filter.reference new file mode 100644 index 0000000000000000000000000000000000000000..eb4ff6138fd27e6db85a38a18a7c1a3a772985b7 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00179_lambdas_with_common_expressions_and_filter.reference @@ -0,0 +1,5 @@ +[0] +[0,1,2] +[0,1,2,3,4] +[0,1,2,3,4,5,6] +[0,1,2,3,4,5,6,7,8] diff --git a/dbms/tests/queries/0_stateless/00179_lambdas_with_common_expressions_and_filter.sql b/dbms/tests/queries/0_stateless/00179_lambdas_with_common_expressions_and_filter.sql new file mode 100644 index 0000000000000000000000000000000000000000..b5eefa57a9f406d8d803421d99e5626321b58605 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00179_lambdas_with_common_expressions_and_filter.sql @@ -0,0 +1,3 @@ +SELECT arrayMap(x -> number != -1 ? x : 0, arr) +FROM (SELECT number, range(number) AS arr FROM system.numbers LIMIT 10) +WHERE number % 2 = 1 AND arrayExists(x -> number != -1, arr);