diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp index 632c754a8d193e0e38b105640cec004720a3b602..40a630c566a1640e195bb5d7fa6aab316ff9dc98 100644 --- a/dbms/src/Common/ErrorCodes.cpp +++ b/dbms/src/Common/ErrorCodes.cpp @@ -368,6 +368,7 @@ namespace ErrorCodes extern const int INSERT_WAS_DEDUPLICATED = 389; extern const int CANNOT_GET_CREATE_TABLE_QUERY = 390; extern const int EXTERNAL_LIBRARY_ERROR = 391; + extern const int QUERY_IS_PROHIBITED = 392; extern const int KEEPER_EXCEPTION = 999; diff --git a/dbms/src/Interpreters/DDLWorker.cpp b/dbms/src/Interpreters/DDLWorker.cpp index eea11fbb3ff9ab56eac00f1733cfb9d21a887341..3ae9a1d5a060d4bf0113b4b0ee1464628a5e5652 100644 --- a/dbms/src/Interpreters/DDLWorker.cpp +++ b/dbms/src/Interpreters/DDLWorker.cpp @@ -54,6 +54,7 @@ namespace ErrorCodes extern const int UNKNOWN_TYPE_OF_QUERY; extern const int UNFINISHED; extern const int UNKNOWN_STATUS_OF_DISTRIBUTED_DDL_TASK; + extern const int QUERY_IS_PROHIBITED; } @@ -980,21 +981,32 @@ public: if (is_cancelled) return res; - auto elapsed_seconds = watch.elapsedSeconds(); - if (timeout_seconds >= 0 && elapsed_seconds > timeout_seconds) + if (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds) { - throw Exception("Watching task " + node_path + " is executing too long (" + toString(std::round(elapsed_seconds)) + " sec.)", - ErrorCodes::TIMEOUT_EXCEEDED); + size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished; + size_t num_active_hosts = current_active_hosts.size(); + + std::stringstream msg; + msg << "Watching task " << node_path << " is executing longer than distributed_ddl_task_timeout" + << " (=" << timeout_seconds << ") seconds." + << " There are " << num_unfinished_hosts << " unfinished hosts" + << " (" << num_active_hosts << " of them are currently active)" + << ", they are going to execute the query in background"; + + throw Exception(msg.str(), ErrorCodes::TIMEOUT_EXCEEDED); } if (num_hosts_finished != 0 || try_number != 0) - std::this_thread::sleep_for(std::chrono::milliseconds(50 * std::min(static_cast(20), try_number + 1))); + { + auto current_sleep_for = std::chrono::milliseconds(std::min(static_cast(1000), 50 * (try_number + 1))); + std::this_thread::sleep_for(current_sleep_for); + } /// TODO: add shared lock if (!zookeeper->exists(node_path)) { throw Exception("Cannot provide query execution status. The query's node " + node_path - + " had been deleted by the cleaner since it was finished (or its lifetime is expired)", + + " has been deleted by the cleaner since it was finished (or its lifetime is expired)", ErrorCodes::UNFINISHED); } @@ -1003,7 +1015,7 @@ public: if (new_hosts.empty()) continue; - Strings cur_active_hosts = getChildrenAllowNoNode(zookeeper, node_path + "/active"); + current_active_hosts = getChildrenAllowNoNode(zookeeper, node_path + "/active"); MutableColumns columns = sample.cloneEmptyColumns(); for (const String & host_id : new_hosts) @@ -1019,12 +1031,14 @@ public: UInt16 port; Cluster::Address::fromString(host_id, host, port); + ++num_hosts_finished; + columns[0]->insert(host); columns[1]->insert(static_cast(port)); columns[2]->insert(static_cast(status.code)); columns[3]->insert(status.message); - columns[4]->insert(static_cast(waiting_hosts.size() - (++num_hosts_finished))); - columns[5]->insert(static_cast(cur_active_hosts.size())); + columns[4]->insert(static_cast(waiting_hosts.size() - num_hosts_finished)); + columns[5]->insert(static_cast(current_active_hosts.size())); } res = sample.cloneWithColumns(std::move(columns)); } @@ -1086,6 +1100,7 @@ private: NameSet waiting_hosts; /// hosts from task host list NameSet finished_hosts; /// finished hosts from host list NameSet ignoring_hosts; /// appeared hosts that are not in hosts list + Strings current_active_hosts; /// Hosts that were in active state at the last check size_t num_hosts_finished = 0; Int64 timeout_seconds = 120; @@ -1104,6 +1119,9 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont throw Exception("Distributed execution is not supported for such DDL queries", ErrorCodes::NOT_IMPLEMENTED); } + if (!context.getSettingsRef().allow_distributed_ddl) + throw Exception("Distributed DDL queries are prohibited for the user", ErrorCodes::QUERY_IS_PROHIBITED); + if (auto query_alter = dynamic_cast(query_ptr.get())) { for (const auto & param : query_alter->parameters) diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 0a7255091860bdbaca76805d4b2cf4890ae59bdc..26a40def4fad0016286597dc1b036308310b5714 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -170,7 +170,7 @@ struct Settings \ M(SettingBool, insert_distributed_sync, false, "If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster.") \ M(SettingUInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.") \ - M(SettingInt64, distributed_ddl_task_timeout, 120, "Timeout for DDL query responses from all hosts in cluster. Negative value means infinite.") \ + M(SettingInt64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. Negative value means infinite.") \ M(SettingMilliseconds, stream_flush_interval_ms, DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS, "Timeout for flushing data from streaming storages.") \ M(SettingString, format_schema, "", "Schema identifier (used by schema-based formats)") \ M(SettingBool, insert_allow_materialized_columns, 0, "If setting is enabled, Allow materialized columns in INSERT.") \ @@ -180,7 +180,8 @@ struct Settings M(SettingBool, optimize_throw_if_noop, false, "If setting is enabled and OPTIMIZE query didn't actually assign a merge then an explanatory exception is thrown") \ M(SettingBool, use_index_for_in_with_subqueries, true, "Try using an index if there is a subquery or a table expression on the right side of the IN operator.") \ \ - M(SettingBool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.") + M(SettingBool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.") \ + M(SettingBool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.") /// Possible limits for query execution. diff --git a/dbms/src/Interpreters/SettingsCommon.h b/dbms/src/Interpreters/SettingsCommon.h index 59c1c0dac28560da449caa44cbc1a5f946f2dc1c..68838b43b07b20a9caf95e5c8bec8017db87e12b 100644 --- a/dbms/src/Interpreters/SettingsCommon.h +++ b/dbms/src/Interpreters/SettingsCommon.h @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -57,7 +58,7 @@ struct SettingInt void set(const Field & x) { - set(safeGet(x)); + set(applyVisitor(FieldVisitorConvertToNumber(), x)); } void set(const String & x)