From 7cc2871e30c5f9cdc3b4fbf45b7df5ff36bad55b Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Fri, 1 Feb 2013 19:02:04 +0000 Subject: [PATCH] dbms: allowed to send custom settings with query; sending current settings to remote servers for distributed query processing [#CONV-2944]. --- dbms/include/DB/Client/Connection.h | 6 +- dbms/include/DB/Core/Defines.h | 2 + dbms/include/DB/Core/Protocol.h | 3 +- .../DB/DataStreams/RemoteBlockInputStream.h | 26 ++++- dbms/include/DB/Interpreters/Context.h | 1 + dbms/include/DB/Interpreters/Limits.h | 100 +++++++++++++++++- dbms/include/DB/Interpreters/Settings.h | 69 ++++++++++++ dbms/include/DB/Storages/IStorage.h | 7 ++ dbms/include/DB/Storages/StorageDistributed.h | 1 + dbms/include/DB/Storages/StorageLog.h | 1 + dbms/include/DB/Storages/StorageMemory.h | 1 + dbms/include/DB/Storages/StorageMerge.h | 1 + dbms/include/DB/Storages/StorageMergeTree.h | 1 + .../DB/Storages/StorageSystemDatabases.h | 1 + .../DB/Storages/StorageSystemNumbers.h | 1 + dbms/include/DB/Storages/StorageSystemOne.h | 1 + .../include/DB/Storages/StorageSystemTables.h | 1 + dbms/include/DB/Storages/StorageTinyLog.h | 1 + dbms/src/Client/Connection.cpp | 12 ++- dbms/src/DataStreams/tests/union_stream2.cpp | 2 +- .../Interpreters/InterpreterSelectQuery.cpp | 2 +- dbms/src/Server/HTTPHandler.h | 1 - dbms/src/Server/TCPHandler.cpp | 32 ++++-- dbms/src/Server/TCPHandler.h | 9 +- dbms/src/Storages/StorageDistributed.cpp | 3 +- dbms/src/Storages/StorageLog.cpp | 1 + dbms/src/Storages/StorageMemory.cpp | 1 + dbms/src/Storages/StorageMerge.cpp | 2 + dbms/src/Storages/StorageMergeTree.cpp | 1 + dbms/src/Storages/StorageSystemDatabases.cpp | 3 +- dbms/src/Storages/StorageSystemNumbers.cpp | 3 +- dbms/src/Storages/StorageSystemOne.cpp | 3 +- dbms/src/Storages/StorageSystemTables.cpp | 3 +- dbms/src/Storages/StorageTinyLog.cpp | 1 + 34 files changed, 268 insertions(+), 35 deletions(-) diff --git a/dbms/include/DB/Client/Connection.h b/dbms/include/DB/Client/Connection.h index 9e9304693a..d75162bbdf 100644 --- a/dbms/include/DB/Client/Connection.h +++ b/dbms/include/DB/Client/Connection.h @@ -18,6 +18,8 @@ #include #include +#include + namespace DB { @@ -72,7 +74,9 @@ public: String getServerAddress() const; /// query_id не должен быть равен 0. - void sendQuery(const String & query, UInt64 query_id_ = 1, UInt64 stage = QueryProcessingStage::Complete); + void sendQuery(const String & query, UInt64 query_id_ = 1, UInt64 stage = QueryProcessingStage::Complete, + const Settings * settings = NULL); + void sendCancel(); void sendData(const Block & block); diff --git a/dbms/include/DB/Core/Defines.h b/dbms/include/DB/Core/Defines.h index fe84b173eb..d8b1fd8f79 100644 --- a/dbms/include/DB/Core/Defines.h +++ b/dbms/include/DB/Core/Defines.h @@ -22,5 +22,7 @@ #define DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE 128 #define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3 +#define DBMS_MIN_REVISION_WITH_PER_QUERY_SETTINGS 28558 + /// Суффикс имени для столбца, содержащего смещения массива. #define ARRAY_SIZES_COLUMN_NAME_SUFFIX ".size" diff --git a/dbms/include/DB/Core/Protocol.h b/dbms/include/DB/Core/Protocol.h index e462abce66..3c2663242c 100644 --- a/dbms/include/DB/Core/Protocol.h +++ b/dbms/include/DB/Core/Protocol.h @@ -65,7 +65,8 @@ namespace Protocol enum Enum { Hello = 0, /// Имя, версия, ревизия, БД по-умолчанию. - Query = 1, /** Идентификатор запроса, информация, до какой стадии исполнять запрос, + Query = 1, /** Идентификатор запроса, настройки на отдельный запрос, + * информация, до какой стадии исполнять запрос, * использовать ли сжатие, текст запроса (без данных для INSERT-а). */ Data = 2, /// Блок данных со сжатием или без. diff --git a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h index ed737ef35f..ba365b76cb 100644 --- a/dbms/include/DB/DataStreams/RemoteBlockInputStream.h +++ b/dbms/include/DB/DataStreams/RemoteBlockInputStream.h @@ -15,21 +15,35 @@ namespace DB class RemoteBlockInputStream : public IProfilingBlockInputStream { public: - RemoteBlockInputStream(Connection & connection_, const String & query_, + RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_, QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete) : connection(connection_), query(query_), stage(stage_), sent_query(false), finished(false), was_cancelled(false), got_exception_from_server(false), log(&Logger::get("RemoteBlockInputStream (" + connection.getServerAddress() + ")")) { + if (settings_) + { + send_settings = true; + settings = *settings_; + } + else + send_settings = false; } /// Захватывает владение соединением из пула. - RemoteBlockInputStream(ConnectionPool::Entry pool_entry_, const String & query_, + RemoteBlockInputStream(ConnectionPool::Entry pool_entry_, const String & query_, const Settings * settings_, QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete) : pool_entry(pool_entry_), connection(*pool_entry), query(query_), stage(stage_), sent_query(false), finished(false), was_cancelled(false), got_exception_from_server(false), log(&Logger::get("RemoteBlockInputStream (" + connection.getServerAddress() + ")")) { + if (settings_) + { + send_settings = true; + settings = *settings_; + } + else + send_settings = false; } @@ -38,8 +52,8 @@ public: BlockInputStreamPtr clone() { return pool_entry.isNull() - ? new RemoteBlockInputStream(connection, query, stage) - : new RemoteBlockInputStream(pool_entry, query, stage); + ? new RemoteBlockInputStream(connection, query, send_settings ? &settings : NULL, stage) + : new RemoteBlockInputStream(pool_entry, query, send_settings ? &settings : NULL, stage); } @@ -127,7 +141,7 @@ protected: { if (!sent_query) { - connection.sendQuery(query, 1, stage); + connection.sendQuery(query, 1, stage, send_settings ? &settings : NULL); sent_query = true; } @@ -173,6 +187,8 @@ private: Connection & connection; const String query; + bool send_settings; + Settings settings; QueryProcessingStage::Enum stage; /// Отправили запрос (это делается перед получением первого блока). diff --git a/dbms/include/DB/Interpreters/Context.h b/dbms/include/DB/Interpreters/Context.h index 9bebca63ab..0ee2ed39db 100644 --- a/dbms/include/DB/Interpreters/Context.h +++ b/dbms/include/DB/Interpreters/Context.h @@ -130,6 +130,7 @@ public: void setGlobalContext(Context & context_) { global_context = &context_; } const Settings & getSettingsRef() const { return settings; }; + Settings & getSettingsRef() { return settings; }; }; diff --git a/dbms/include/DB/Interpreters/Limits.h b/dbms/include/DB/Interpreters/Limits.h index bfa148a74e..a85b5d7d84 100644 --- a/dbms/include/DB/Interpreters/Limits.h +++ b/dbms/include/DB/Interpreters/Limits.h @@ -4,6 +4,11 @@ #include #include +#include +#include +#include +#include + namespace DB { @@ -19,13 +24,23 @@ struct Limits /// Что делать, если ограничение превышено. enum OverflowMode { - THROW, /// Кинуть исключение. - BREAK, /// Прервать выполнение запроса, вернуть что есть. - ANY, /** Только для GROUP BY: не добавлять новые строки в набор, - * но продолжать агрегировать для ключей, успевших попасть в набор. - */ + THROW = 0, /// Кинуть исключение. + BREAK = 1, /// Прервать выполнение запроса, вернуть что есть. + ANY = 2, /** Только для GROUP BY: не добавлять новые строки в набор, + * но продолжать агрегировать для ключей, успевших попасть в набор. + */ }; + static String toString(OverflowMode mode) + { + const char * strings[] = { "throw", "break", "any" }; + + if (mode < THROW || mode > ANY) + throw Exception("Unknown overflow mode", ErrorCodes::UNKNOWN_OVERFLOW_MODE); + + return strings[mode]; + } + /** Ограничения на чтение из самых "глубоких" источников. * То есть, только в самом глубоком подзапросе. * При чтении с удалённого сервера, проверяется только на удалённом сервере. @@ -121,7 +136,82 @@ struct Limits return true; } + /// Установить настройку по имени. Прочитать сериализованное значение из буфера. + bool trySet(const String & name, ReadBuffer & buf) + { + if ( name == "max_rows_to_read" + || name == "max_bytes_to_read" + || name == "max_rows_to_group_by" + || name == "max_rows_to_sort" + || name == "max_bytes_to_sort" + || name == "max_result_rows" + || name == "max_result_bytes" + || name == "max_execution_time" + || name == "min_execution_speed" + || name == "timeout_before_checking_execution_speed" + || name == "max_columns_to_read" + || name == "max_temporary_columns" + || name == "max_temporary_non_const_columns" + || name == "max_subquery_depth" + || name == "max_pipeline_depth" + || name == "max_ast_depth" + || name == "max_ast_elements" + || name == "readonly") + { + UInt64 value = 0; + readVarUInt(value, buf); + + if (!trySet(name, value)) + throw Exception("Logical error: unknown setting " + name, ErrorCodes::UNKNOWN_SETTING); + } + else if (name == "read_overflow_mode" + || name == "group_by_overflow_mode" + || name == "sort_overflow_mode" + || name == "result_overflow_mode" + || name == "timeout_overflow_mode") + { + String value; + readBinary(value, buf); + + if (!trySet(name, value)) + throw Exception("Logical error: unknown setting " + name, ErrorCodes::UNKNOWN_SETTING); + } + else + return false; + + return true; + } + private: + friend class Settings; + + /// Записать все настройки в буфер. (В отличие от соответствующего метода в Settings, пустая строка на конце не пишется). + void serialize(WriteBuffer & buf) const + { + writeStringBinary("max_rows_to_read", buf); writeVarUInt(max_rows_to_read, buf); + writeStringBinary("max_bytes_to_read", buf); writeVarUInt(max_bytes_to_read, buf); + writeStringBinary("read_overflow_mode", buf); writeStringBinary(toString(read_overflow_mode), buf); + writeStringBinary("max_rows_to_group_by", buf); writeVarUInt(max_rows_to_group_by, buf); + writeStringBinary("group_by_overflow_mode", buf); writeStringBinary(toString(group_by_overflow_mode), buf); + writeStringBinary("max_rows_to_sort", buf); writeVarUInt(max_rows_to_sort, buf); + writeStringBinary("max_bytes_to_sort", buf); writeVarUInt(max_bytes_to_sort, buf); + writeStringBinary("sort_overflow_mode", buf); writeStringBinary(toString(sort_overflow_mode), buf); + writeStringBinary("max_result_rows", buf); writeVarUInt(max_result_rows, buf); + writeStringBinary("max_result_bytes", buf); writeVarUInt(max_result_bytes, buf); + writeStringBinary("max_execution_time", buf); writeVarUInt(max_execution_time.totalSeconds(), buf); + writeStringBinary("timeout_overflow_mode", buf); writeStringBinary(toString(timeout_overflow_mode), buf); + writeStringBinary("min_execution_speed", buf); writeVarUInt(min_execution_speed, buf); + writeStringBinary("timeout_before_checking_execution_speed", buf); writeVarUInt(timeout_before_checking_execution_speed.totalSeconds(), buf); + writeStringBinary("max_columns_to_read", buf); writeVarUInt(max_columns_to_read, buf); + writeStringBinary("max_temporary_columns", buf); writeVarUInt(max_temporary_columns, buf); + writeStringBinary("max_temporary_non_const_columns", buf); writeVarUInt(max_temporary_non_const_columns, buf); + writeStringBinary("max_subquery_depth", buf); writeVarUInt(max_subquery_depth, buf); + writeStringBinary("max_pipeline_depth", buf); writeVarUInt(max_pipeline_depth, buf); + writeStringBinary("max_ast_depth", buf); writeVarUInt(max_ast_depth, buf); + writeStringBinary("max_ast_elements", buf); writeVarUInt(max_ast_elements, buf); + writeStringBinary("readonly", buf); writeVarUInt(readonly, buf); + } + OverflowMode getOverflowModeForGroupBy(const String & s) { if (s == "throw") return THROW; diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h index 24d1df1331..f7d61783b0 100644 --- a/dbms/include/DB/Interpreters/Settings.h +++ b/dbms/include/DB/Interpreters/Settings.h @@ -4,6 +4,11 @@ #include #include +#include +#include +#include +#include + #include @@ -77,6 +82,70 @@ struct Settings else if (!limits.trySet(name, value)) throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING); } + + /// Установить настройку по имени. Прочитать сериализованное значение из буфера. + void set(const String & name, ReadBuffer & buf) + { + if ( name == "max_block_size" + || name == "max_threads" + || name == "max_query_size" + || name == "asynchronous" + || name == "interactive_delay" + || name == "connect_timeout" + || name == "receive_timeout" + || name == "send_timeout" + || name == "poll_interval" + || name == "connect_timeout_with_failover_ms" + || name == "max_distributed_connections" + || name == "distributed_connections_pool_size" + || name == "connections_with_failover_max_tries") + { + UInt64 value = 0; + readVarUInt(value, buf); + set(name, value); + } + else if (!limits.trySet(name, buf)) + throw Exception("Unknown setting " + name, ErrorCodes::UNKNOWN_SETTING); + } + + /// Прочитать настройки из буфера. Они записаны как набор name-value пар, идущих подряд, заканчивающихся пустым name. + void deserialize(ReadBuffer & buf) + { + while (true) + { + String name; + readBinary(name, buf); + + /// Пустая строка - это маркер конца настроек. + if (name.empty()) + break; + + set(name, buf); + } + } + + /// Записать все настройки в буфер. + void serialize(WriteBuffer & buf) const + { + writeStringBinary("max_block_size", buf); writeVarUInt(max_block_size, buf); + writeStringBinary("max_threads", buf); writeVarUInt(max_threads, buf); + writeStringBinary("max_query_size", buf); writeVarUInt(max_query_size, buf); + writeStringBinary("asynchronous", buf); writeVarUInt(asynchronous, buf); + writeStringBinary("interactive_delay", buf); writeVarUInt(interactive_delay, buf); + writeStringBinary("connect_timeout", buf); writeVarUInt(connect_timeout.totalSeconds(), buf); + writeStringBinary("receive_timeout", buf); writeVarUInt(receive_timeout.totalSeconds(), buf); + writeStringBinary("send_timeout", buf); writeVarUInt(send_timeout.totalSeconds(), buf); + writeStringBinary("poll_interval", buf); writeVarUInt(poll_interval, buf); + writeStringBinary("connect_timeout_with_failover_ms", buf); writeVarUInt(connect_timeout_with_failover_ms.totalMilliseconds(), buf); + writeStringBinary("max_distributed_connections", buf); writeVarUInt(max_distributed_connections, buf); + writeStringBinary("distributed_connections_pool_size", buf); writeVarUInt(distributed_connections_pool_size, buf); + writeStringBinary("connections_with_failover_max_tries", buf); writeVarUInt(connections_with_failover_max_tries, buf); + + limits.serialize(buf); + + /// Пустая строка - это маркер конца настроек. + writeStringBinary("", buf); + } }; diff --git a/dbms/include/DB/Storages/IStorage.h b/dbms/include/DB/Storages/IStorage.h index b54ff37844..0dba06191d 100644 --- a/dbms/include/DB/Storages/IStorage.h +++ b/dbms/include/DB/Storages/IStorage.h @@ -11,6 +11,8 @@ #include +#include + #include @@ -64,12 +66,17 @@ public: * (Обычно функция только читает столбцы из списка, но в других случаях, * например, запрос может быть частично обработан на удалённом сервере.) * + * settings - настройки на один запрос. + * Обычно Storage не заботится об этих настройках, так как они применяются в интерпретаторе. + * Но, например, при распределённой обработке запроса, настройки передаются на удалённый сервер. + * * threads - рекомендация, сколько потоков возвращать, * если хранилище может возвращать разное количество потоков. */ virtual BlockInputStreams read( const Names & column_names, ASTPtr query, + const Settings & settings, QueryProcessingStage::Enum & processed_stage, size_t max_block_size = DEFAULT_BLOCK_SIZE, unsigned threads = 1) diff --git a/dbms/include/DB/Storages/StorageDistributed.h b/dbms/include/DB/Storages/StorageDistributed.h index 68320e4549..ed2ccedb20 100644 --- a/dbms/include/DB/Storages/StorageDistributed.h +++ b/dbms/include/DB/Storages/StorageDistributed.h @@ -54,6 +54,7 @@ public: BlockInputStreams read( const Names & column_names, ASTPtr query, + const Settings & settings, QueryProcessingStage::Enum & processed_stage, size_t max_block_size = DEFAULT_BLOCK_SIZE, unsigned threads = 1); diff --git a/dbms/include/DB/Storages/StorageLog.h b/dbms/include/DB/Storages/StorageLog.h index b361bc68c4..e2206dc243 100644 --- a/dbms/include/DB/Storages/StorageLog.h +++ b/dbms/include/DB/Storages/StorageLog.h @@ -128,6 +128,7 @@ public: BlockInputStreams read( const Names & column_names, ASTPtr query, + const Settings & settings, QueryProcessingStage::Enum & processed_stage, size_t max_block_size = DEFAULT_BLOCK_SIZE, unsigned threads = 1); diff --git a/dbms/include/DB/Storages/StorageMemory.h b/dbms/include/DB/Storages/StorageMemory.h index 45376b9514..953d85d1de 100644 --- a/dbms/include/DB/Storages/StorageMemory.h +++ b/dbms/include/DB/Storages/StorageMemory.h @@ -60,6 +60,7 @@ public: BlockInputStreams read( const Names & column_names, ASTPtr query, + const Settings & settings, QueryProcessingStage::Enum & processed_stage, size_t max_block_size = DEFAULT_BLOCK_SIZE, unsigned threads = 1); diff --git a/dbms/include/DB/Storages/StorageMerge.h b/dbms/include/DB/Storages/StorageMerge.h index 1d06d46216..21ad43b9a3 100644 --- a/dbms/include/DB/Storages/StorageMerge.h +++ b/dbms/include/DB/Storages/StorageMerge.h @@ -31,6 +31,7 @@ public: BlockInputStreams read( const Names & column_names, ASTPtr query, + const Settings & settings, QueryProcessingStage::Enum & processed_stage, size_t max_block_size = DEFAULT_BLOCK_SIZE, unsigned threads = 1); diff --git a/dbms/include/DB/Storages/StorageMergeTree.h b/dbms/include/DB/Storages/StorageMergeTree.h index 2969a33f2c..9a85638b57 100644 --- a/dbms/include/DB/Storages/StorageMergeTree.h +++ b/dbms/include/DB/Storages/StorageMergeTree.h @@ -128,6 +128,7 @@ public: BlockInputStreams read( const Names & column_names, ASTPtr query, + const Settings & settings, QueryProcessingStage::Enum & processed_stage, size_t max_block_size = DEFAULT_BLOCK_SIZE, unsigned threads = 1); diff --git a/dbms/include/DB/Storages/StorageSystemDatabases.h b/dbms/include/DB/Storages/StorageSystemDatabases.h index 019a52c3fc..c5dc1e0925 100644 --- a/dbms/include/DB/Storages/StorageSystemDatabases.h +++ b/dbms/include/DB/Storages/StorageSystemDatabases.h @@ -28,6 +28,7 @@ public: BlockInputStreams read( const Names & column_names, ASTPtr query, + const Settings & settings, QueryProcessingStage::Enum & processed_stage, size_t max_block_size = DEFAULT_BLOCK_SIZE, unsigned threads = 1); diff --git a/dbms/include/DB/Storages/StorageSystemNumbers.h b/dbms/include/DB/Storages/StorageSystemNumbers.h index e9f3371b5d..b74ad41197 100644 --- a/dbms/include/DB/Storages/StorageSystemNumbers.h +++ b/dbms/include/DB/Storages/StorageSystemNumbers.h @@ -43,6 +43,7 @@ public: BlockInputStreams read( const Names & column_names, ASTPtr query, + const Settings & settings, QueryProcessingStage::Enum & processed_stage, size_t max_block_size = DEFAULT_BLOCK_SIZE, unsigned threads = 1); diff --git a/dbms/include/DB/Storages/StorageSystemOne.h b/dbms/include/DB/Storages/StorageSystemOne.h index b31b253435..b97720808a 100644 --- a/dbms/include/DB/Storages/StorageSystemOne.h +++ b/dbms/include/DB/Storages/StorageSystemOne.h @@ -25,6 +25,7 @@ public: BlockInputStreams read( const Names & column_names, ASTPtr query, + const Settings & settings, QueryProcessingStage::Enum & processed_stage, size_t max_block_size = DEFAULT_BLOCK_SIZE, unsigned threads = 1); diff --git a/dbms/include/DB/Storages/StorageSystemTables.h b/dbms/include/DB/Storages/StorageSystemTables.h index 6ad765f23b..252efb3eb6 100644 --- a/dbms/include/DB/Storages/StorageSystemTables.h +++ b/dbms/include/DB/Storages/StorageSystemTables.h @@ -28,6 +28,7 @@ public: BlockInputStreams read( const Names & column_names, ASTPtr query, + const Settings & settings, QueryProcessingStage::Enum & processed_stage, size_t max_block_size = DEFAULT_BLOCK_SIZE, unsigned threads = 1); diff --git a/dbms/include/DB/Storages/StorageTinyLog.h b/dbms/include/DB/Storages/StorageTinyLog.h index 738b3c5b08..66f57e9052 100644 --- a/dbms/include/DB/Storages/StorageTinyLog.h +++ b/dbms/include/DB/Storages/StorageTinyLog.h @@ -104,6 +104,7 @@ public: BlockInputStreams read( const Names & column_names, ASTPtr query, + const Settings & settings, QueryProcessingStage::Enum & processed_stage, size_t max_block_size = DEFAULT_BLOCK_SIZE, unsigned threads = 1); diff --git a/dbms/src/Client/Connection.cpp b/dbms/src/Client/Connection.cpp index c616703d55..d6fbed37c8 100644 --- a/dbms/src/Client/Connection.cpp +++ b/dbms/src/Client/Connection.cpp @@ -185,7 +185,7 @@ bool Connection::ping() } -void Connection::sendQuery(const String & query, UInt64 query_id_, UInt64 stage) +void Connection::sendQuery(const String & query, UInt64 query_id_, UInt64 stage, const Settings * settings) { forceConnected(); @@ -195,6 +195,16 @@ void Connection::sendQuery(const String & query, UInt64 query_id_, UInt64 stage) writeVarUInt(Protocol::Client::Query, *out); writeIntBinary(query_id, *out); + + /// Настройки на отдельный запрос. + if (server_revision >= DBMS_MIN_REVISION_WITH_PER_QUERY_SETTINGS) + { + if (settings) + settings->serialize(*out); + else + writeStringBinary("", *out); + } + writeVarUInt(stage, *out); writeVarUInt(compression, *out); diff --git a/dbms/src/DataStreams/tests/union_stream2.cpp b/dbms/src/DataStreams/tests/union_stream2.cpp index 94904d0dd8..e9af790a9e 100644 --- a/dbms/src/DataStreams/tests/union_stream2.cpp +++ b/dbms/src/DataStreams/tests/union_stream2.cpp @@ -40,7 +40,7 @@ int main(int argc, char ** argv) DB::StoragePtr table = context.getTable("default", "hits6"); DB::QueryProcessingStage::Enum stage; - DB::BlockInputStreams streams = table->read(column_names, NULL, stage, settings.max_block_size, settings.max_threads); + DB::BlockInputStreams streams = table->read(column_names, NULL, settings, stage, settings.max_block_size, settings.max_threads); for (size_t i = 0, size = streams.size(); i < size; ++i) streams[i] = new DB::AsynchronousBlockInputStream(streams[i]); diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index bd8ea0028b..298214e21e 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -283,7 +283,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(BlockInpu /// Инициализируем изначальные потоки данных, на которые накладываются преобразования запроса. Таблица или подзапрос? if (!query.table || !dynamic_cast(&*query.table)) - streams = table->read(required_columns, query_ptr, from_stage, settings.max_block_size, settings.max_threads); + streams = table->read(required_columns, query_ptr, settings, from_stage, settings.max_block_size, settings.max_threads); else streams.push_back(maybeAsynchronous(interpreter_subquery->execute(), settings.asynchronous)); diff --git a/dbms/src/Server/HTTPHandler.h b/dbms/src/Server/HTTPHandler.h index 4f7498a2a5..ccf129b6b1 100644 --- a/dbms/src/Server/HTTPHandler.h +++ b/dbms/src/Server/HTTPHandler.h @@ -15,7 +15,6 @@ public: : server(server_) , log(&Logger::get("HTTPHandler")) { - LOG_TRACE(log, "In constructor."); } void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response); diff --git a/dbms/src/Server/TCPHandler.cpp b/dbms/src/Server/TCPHandler.cpp index 740ed60444..ac5bf3f698 100644 --- a/dbms/src/Server/TCPHandler.cpp +++ b/dbms/src/Server/TCPHandler.cpp @@ -76,8 +76,13 @@ void TCPHandler::runImpl() SharedPtr exception; try - { - /// Пакет Query. (Также, если пришёл пакет Ping - обрабатываем его и продолжаем ждать Query.) + { + /// Восстанавливаем контекст запроса. + query_context = connection_context; + + /** Пакет Query. (Также, если пришёл пакет Ping - обрабатываем его и продолжаем ждать Query.) + * Могут прийти настройки на отдельный запрос, которые модифицируют query_context. + */ receivePacket(); /// Обрабатываем Query @@ -207,7 +212,7 @@ void TCPHandler::processOrdinaryQuery() while (true) { - if (async_in.poll(connection_context.getSettingsRef().interactive_delay / 1000)) + if (async_in.poll(query_context.getSettingsRef().interactive_delay / 1000)) { block = async_in.read(); break; @@ -254,7 +259,6 @@ void TCPHandler::receiveHello() String client_name; UInt64 client_version_major = 0; UInt64 client_version_minor = 0; - UInt64 client_revision = 0; readVarUInt(packet_type, *in); if (packet_type != Protocol::Client::Hello) @@ -335,6 +339,12 @@ void TCPHandler::receiveQuery() readIntBinary(state.query_id, *in); + /// Настройки на отдельный запрос. + if (client_revision >= DBMS_MIN_REVISION_WITH_PER_QUERY_SETTINGS) + { + query_context.getSettingsRef().deserialize(*in); + } + readVarUInt(stage, *in); state.stage = QueryProcessingStage::Enum(stage); @@ -343,7 +353,7 @@ void TCPHandler::receiveQuery() readStringBinary(state.query, *in); - state.io = executeQuery(state.query, connection_context, state.stage); + state.io = executeQuery(state.query, query_context, state.stage); } @@ -356,12 +366,12 @@ bool TCPHandler::receiveData() else state.maybe_compressed_in = in; - state.block_in = connection_context.getFormatFactory().getInput( + state.block_in = query_context.getFormatFactory().getInput( "Native", *state.maybe_compressed_in, state.io.out_sample, - connection_context.getSettingsRef().max_block_size, - connection_context.getDataTypeFactory()); + query_context.getSettingsRef().max_block_size, + query_context.getDataTypeFactory()); } /// Прочитать из сети один блок и засунуть его в state.io.out (данные для INSERT-а) @@ -381,7 +391,7 @@ bool TCPHandler::isQueryCancelled() if (state.is_cancelled || state.sent_all_data) return true; - if (after_check_cancelled.elapsed() / 1000 < connection_context.getSettingsRef().interactive_delay) + if (after_check_cancelled.elapsed() / 1000 < query_context.getSettingsRef().interactive_delay) return false; after_check_cancelled.restart(); @@ -421,7 +431,7 @@ void TCPHandler::sendData(Block & block) else state.maybe_compressed_out = out; - state.block_out = connection_context.getFormatFactory().getOutput( + state.block_out = query_context.getFormatFactory().getOutput( "Native", *state.maybe_compressed_out, state.io.in_sample); @@ -466,7 +476,7 @@ void TCPHandler::sendProgress(size_t rows, size_t bytes) if (state.sent_all_data) return; - if (after_send_progress.elapsed() / 1000 < connection_context.getSettingsRef().interactive_delay) + if (after_send_progress.elapsed() / 1000 < query_context.getSettingsRef().interactive_delay) return; after_send_progress.restart(); diff --git a/dbms/src/Server/TCPHandler.h b/dbms/src/Server/TCPHandler.h index 7811e591ff..b98130f932 100644 --- a/dbms/src/Server/TCPHandler.h +++ b/dbms/src/Server/TCPHandler.h @@ -69,10 +69,10 @@ class TCPHandler : public Poco::Net::TCPServerConnection { public: TCPHandler(Server & server_, const Poco::Net::StreamSocket & socket_) - : Poco::Net::TCPServerConnection(socket_), server(server_) - , log(&Logger::get("TCPHandler")), connection_context(server.global_context) + : Poco::Net::TCPServerConnection(socket_), server(server_), + log(&Logger::get("TCPHandler")), client_revision(0), + connection_context(server.global_context), query_context(connection_context) { - LOG_TRACE(log, "In constructor."); } void run(); @@ -81,7 +81,10 @@ private: Server & server; Logger * log; + UInt64 client_revision; + Context connection_context; + Context query_context; SharedPtr in; SharedPtr out; diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index a3aa8c69f2..67342c6571 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -58,6 +58,7 @@ StorageDistributed::StorageDistributed( BlockInputStreams StorageDistributed::read( const Names & column_names, ASTPtr query, + const Settings & settings, QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads) @@ -79,7 +80,7 @@ BlockInputStreams StorageDistributed::read( BlockInputStreams res; for (ConnectionPools::iterator it = pools.begin(); it != pools.end(); ++it) - res.push_back(new RemoteBlockInputStream((*it)->get(), modified_query, processed_stage)); + res.push_back(new RemoteBlockInputStream((*it)->get(), modified_query, &settings, processed_stage)); return res; } diff --git a/dbms/src/Storages/StorageLog.cpp b/dbms/src/Storages/StorageLog.cpp index e244aafdf3..821af20ea1 100644 --- a/dbms/src/Storages/StorageLog.cpp +++ b/dbms/src/Storages/StorageLog.cpp @@ -302,6 +302,7 @@ void StorageLog::rename(const String & new_path_to_db, const String & new_name) BlockInputStreams StorageLog::read( const Names & column_names, ASTPtr query, + const Settings & settings, QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads) diff --git a/dbms/src/Storages/StorageMemory.cpp b/dbms/src/Storages/StorageMemory.cpp index 860d35eb16..9b6acb42d6 100644 --- a/dbms/src/Storages/StorageMemory.cpp +++ b/dbms/src/Storages/StorageMemory.cpp @@ -50,6 +50,7 @@ StorageMemory::StorageMemory(const std::string & name_, NamesAndTypesListPtr col BlockInputStreams StorageMemory::read( const Names & column_names, ASTPtr query, + const Settings & settings, QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads) diff --git a/dbms/src/Storages/StorageMerge.cpp b/dbms/src/Storages/StorageMerge.cpp index ac04ea28dd..7d32dc84c2 100644 --- a/dbms/src/Storages/StorageMerge.cpp +++ b/dbms/src/Storages/StorageMerge.cpp @@ -19,6 +19,7 @@ StorageMerge::StorageMerge( BlockInputStreams StorageMerge::read( const Names & column_names, ASTPtr query, + const Settings & settings, QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads) @@ -52,6 +53,7 @@ BlockInputStreams StorageMerge::read( BlockInputStreams source_streams = (*it)->read( column_names, query, + settings, tmp_processed_stage, max_block_size, selected_tables.size() > threads ? 1 : (threads / selected_tables.size())); diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index aaf6f110bf..7098ea3881 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -819,6 +819,7 @@ BlockOutputStreamPtr StorageMergeTree::write(ASTPtr query) BlockInputStreams StorageMergeTree::read( const Names & column_names_to_return, ASTPtr query, + const Settings & settings, QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads) diff --git a/dbms/src/Storages/StorageSystemDatabases.cpp b/dbms/src/Storages/StorageSystemDatabases.cpp index ee779d7fb3..3f0255c6d7 100644 --- a/dbms/src/Storages/StorageSystemDatabases.cpp +++ b/dbms/src/Storages/StorageSystemDatabases.cpp @@ -16,7 +16,8 @@ StorageSystemDatabases::StorageSystemDatabases(const std::string & name_, const BlockInputStreams StorageSystemDatabases::read( - const Names & column_names, ASTPtr query, QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads) + const Names & column_names, ASTPtr query, const Settings & settings, + QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads) { check(column_names); processed_stage = QueryProcessingStage::FetchColumns; diff --git a/dbms/src/Storages/StorageSystemNumbers.cpp b/dbms/src/Storages/StorageSystemNumbers.cpp index 25ab2c44d7..39a4b61de3 100644 --- a/dbms/src/Storages/StorageSystemNumbers.cpp +++ b/dbms/src/Storages/StorageSystemNumbers.cpp @@ -47,7 +47,8 @@ StorageSystemNumbers::StorageSystemNumbers(const std::string & name_) BlockInputStreams StorageSystemNumbers::read( - const Names & column_names, ASTPtr query, QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads) + const Names & column_names, ASTPtr query, const Settings & settings, + QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads) { check(column_names); processed_stage = QueryProcessingStage::FetchColumns; diff --git a/dbms/src/Storages/StorageSystemOne.cpp b/dbms/src/Storages/StorageSystemOne.cpp index d1ed97a22e..2ff0cbbac7 100644 --- a/dbms/src/Storages/StorageSystemOne.cpp +++ b/dbms/src/Storages/StorageSystemOne.cpp @@ -19,7 +19,8 @@ StorageSystemOne::StorageSystemOne(const std::string & name_) BlockInputStreams StorageSystemOne::read( - const Names & column_names, ASTPtr query, QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads) + const Names & column_names, ASTPtr query, const Settings & settings, + QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads) { check(column_names); processed_stage = QueryProcessingStage::FetchColumns; diff --git a/dbms/src/Storages/StorageSystemTables.cpp b/dbms/src/Storages/StorageSystemTables.cpp index 883f7c67b7..1efefb64e4 100644 --- a/dbms/src/Storages/StorageSystemTables.cpp +++ b/dbms/src/Storages/StorageSystemTables.cpp @@ -18,7 +18,8 @@ StorageSystemTables::StorageSystemTables(const std::string & name_, const Contex BlockInputStreams StorageSystemTables::read( - const Names & column_names, ASTPtr query, QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads) + const Names & column_names, ASTPtr query, const Settings & settings, + QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads) { check(column_names); processed_stage = QueryProcessingStage::FetchColumns; diff --git a/dbms/src/Storages/StorageTinyLog.cpp b/dbms/src/Storages/StorageTinyLog.cpp index 0beb83ec46..7d94980a8e 100644 --- a/dbms/src/Storages/StorageTinyLog.cpp +++ b/dbms/src/Storages/StorageTinyLog.cpp @@ -224,6 +224,7 @@ void StorageTinyLog::rename(const String & new_path_to_db, const String & new_na BlockInputStreams StorageTinyLog::read( const Names & column_names, ASTPtr query, + const Settings & settings, QueryProcessingStage::Enum & processed_stage, size_t max_block_size, unsigned threads) -- GitLab