提交 3f3b7f61 编写于 作者: A Alexey Zatelepin 提交者: alexey-milovidov

always send changed settings for remote connections [#CLICKHOUSE-3151]

上级 ee457eca
......@@ -12,12 +12,9 @@ namespace ErrorCodes
}
MultiplexedConnections::MultiplexedConnections(Connection * connection_, const Settings * settings_, ThrottlerPtr throttler_)
MultiplexedConnections::MultiplexedConnections(Connection & connection, const Settings & settings_, ThrottlerPtr throttler_)
: settings(settings_), throttler(throttler_), supports_parallel_execution(false)
{
if (connection_ == nullptr)
throw Exception("Invalid connection specified", ErrorCodes::LOGICAL_ERROR);
active_connection_total_count = 1;
ShardState shard_state;
......@@ -30,8 +27,8 @@ MultiplexedConnections::MultiplexedConnections(Connection * connection_, const S
replica_state.connection_index = 0;
replica_state.shard_state = &shard_states[0];
connection_->setThrottler(throttler);
connections.push_back(connection_);
connection.setThrottler(throttler);
connections.push_back(&connection);
auto res = replica_map.emplace(connections[0]->socket.impl()->sockfd(), replica_state);
if (!res.second)
......@@ -39,7 +36,7 @@ MultiplexedConnections::MultiplexedConnections(Connection * connection_, const S
}
MultiplexedConnections::MultiplexedConnections(
ConnectionPoolWithFailover & pool_, const Settings * settings_, ThrottlerPtr throttler_,
ConnectionPoolWithFailover & pool_, const Settings & settings_, ThrottlerPtr throttler_,
bool append_extra_info, PoolMode pool_mode_, const QualifiedTableName * main_table)
: settings(settings_), throttler(throttler_), pool_mode(pool_mode_)
{
......@@ -53,7 +50,7 @@ MultiplexedConnections::MultiplexedConnections(
}
MultiplexedConnections::MultiplexedConnections(
const ConnectionPoolWithFailoverPtrs & pools_, const Settings * settings_, ThrottlerPtr throttler_,
const ConnectionPoolWithFailoverPtrs & pools_, const Settings & settings_, ThrottlerPtr throttler_,
bool append_extra_info, PoolMode pool_mode_, const QualifiedTableName * main_table)
: settings(settings_), throttler(throttler_), pool_mode(pool_mode_)
{
......@@ -110,44 +107,27 @@ void MultiplexedConnections::sendQuery(
if (supports_parallel_execution)
{
if (settings == nullptr)
/// Each shard has one or more replicas.
auto it = connections.begin();
for (const auto & shard_state : shard_states)
{
/// Each shard has one address.
auto it = connections.begin();
for (size_t i = 0; i < shard_states.size(); ++i)
Settings query_settings = settings;
query_settings.parallel_replicas_count = shard_state.active_connection_count;
UInt64 offset = 0;
for (size_t i = 0; i < shard_state.allocated_connection_count; ++i)
{
Connection * connection = *it;
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
connection->sendQuery(query, query_id, stage, nullptr, client_info, with_pending_data);
query_settings.parallel_replica_offset = offset;
connection->sendQuery(query, query_id, stage, &query_settings, client_info, with_pending_data);
++offset;
++it;
}
}
else
{
/// Each shard has one or more replicas.
auto it = connections.begin();
for (const auto & shard_state : shard_states)
{
Settings query_settings = *settings;
query_settings.parallel_replicas_count = shard_state.active_connection_count;
UInt64 offset = 0;
for (size_t i = 0; i < shard_state.allocated_connection_count; ++i)
{
Connection * connection = *it;
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
query_settings.parallel_replica_offset = offset;
connection->sendQuery(query, query_id, stage, &query_settings, client_info, with_pending_data);
++offset;
++it;
}
}
}
}
else
{
......@@ -155,7 +135,7 @@ void MultiplexedConnections::sendQuery(
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
connection->sendQuery(query, query_id, stage, settings, client_info, with_pending_data);
connection->sendQuery(query, query_id, stage, &settings, client_info, with_pending_data);
}
sent_query = true;
......@@ -280,9 +260,9 @@ void MultiplexedConnections::initFromShard(ConnectionPoolWithFailover & pool, co
{
std::vector<IConnectionPool::Entry> entries;
if (main_table)
entries = pool.getManyChecked(settings, pool_mode, *main_table);
entries = pool.getManyChecked(&settings, pool_mode, *main_table);
else
entries = pool.getMany(settings, pool_mode);
entries = pool.getMany(&settings, pool_mode);
/// If getMany() did not allocate connections and did not throw exceptions, this means that
/// `skip_unavailable_shards` was set. Then just return.
......@@ -424,7 +404,7 @@ MultiplexedConnections::ReplicaMap::iterator MultiplexedConnections::waitForRead
read_list.push_back(connection->socket);
}
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings->receive_timeout);
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.receive_timeout);
if (n == 0)
throw Exception("Timeout exceeded while reading from " + dumpAddressesUnlocked(), ErrorCodes::TIMEOUT_EXCEEDED);
......
......@@ -20,22 +20,20 @@ class MultiplexedConnections final : private boost::noncopyable
{
public:
/// Accepts ready connection.
MultiplexedConnections(Connection * connection_, const Settings * settings_, ThrottlerPtr throttler_);
MultiplexedConnections(Connection & connection, const Settings & settings_, ThrottlerPtr throttler_);
/** Accepts a pool from which it will be necessary to get one or more connections.
* If the append_extra_info flag is set, additional information appended to each received block.
* If the get_all_replicas flag is set, all connections are selected.
*/
MultiplexedConnections(
ConnectionPoolWithFailover & pool_, const Settings * settings_, ThrottlerPtr throttler_,
ConnectionPoolWithFailover & pool_, const Settings & settings_, ThrottlerPtr throttler_,
bool append_extra_info, PoolMode pool_mode_, const QualifiedTableName * main_table = nullptr);
/** Accepts pools, one for each shard, from which one will need to get one or more connections.
* If the append_extra_info flag is set, additional information appended to each received block.
* If the do_broadcast flag is set, all connections are received.
*/
MultiplexedConnections(
const ConnectionPoolWithFailoverPtrs & pools_, const Settings * settings_, ThrottlerPtr throttler_,
const ConnectionPoolWithFailoverPtrs & pools_, const Settings & settings_, ThrottlerPtr throttler_,
bool append_extra_info, PoolMode pool_mode_, const QualifiedTableName * main_table = nullptr);
/// Send all content of external tables to replicas.
......@@ -131,7 +129,7 @@ private:
void invalidateReplica(ReplicaMap::iterator it);
private:
const Settings * settings;
const Settings & settings;
Connections connections;
ReplicaMap replica_map;
......
......@@ -17,21 +17,23 @@ namespace ErrorCodes
RemoteBlockInputStream::RemoteBlockInputStream(Connection & connection_, const String & query_,
const Settings * settings_, const Context & context_, ThrottlerPtr throttler_,
const Context & context_, const Settings * settings, ThrottlerPtr throttler_,
const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: connection(&connection_), query(query_), throttler(throttler_), external_tables(external_tables_),
stage(stage_), context(context_)
{
init(settings_);
if (settings)
context.setSettings(*settings);
}
RemoteBlockInputStream::RemoteBlockInputStream(const ConnectionPoolWithFailoverPtr & pool_, const String & query_,
const Settings * settings_, const Context & context_, ThrottlerPtr throttler_,
const Context & context_, const Settings * settings, ThrottlerPtr throttler_,
const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: pool(pool_), query(query_), throttler(throttler_), external_tables(external_tables_),
stage(stage_), context(context_)
{
init(settings_);
if (settings)
context.setSettings(*settings);
}
RemoteBlockInputStream::~RemoteBlockInputStream()
......@@ -215,30 +217,18 @@ void RemoteBlockInputStream::readSuffixImpl()
void RemoteBlockInputStream::createMultiplexedConnections()
{
Settings * multiplexed_connections_settings = send_settings ? &context.getSettingsRef() : nullptr;
const QualifiedTableName * main_table_ptr = main_table ? &main_table.value() : nullptr;
if (connection != nullptr)
multiplexed_connections = std::make_unique<MultiplexedConnections>(
connection, multiplexed_connections_settings, throttler);
*connection, context.getSettingsRef(), throttler);
else if (pool != nullptr)
multiplexed_connections = std::make_unique<MultiplexedConnections>(
*pool, multiplexed_connections_settings, throttler,
*pool, context.getSettingsRef(), throttler,
append_extra_info, pool_mode, main_table_ptr);
else
throw Exception("Internal error", ErrorCodes::LOGICAL_ERROR);
}
void RemoteBlockInputStream::init(const Settings * settings)
{
if (settings)
{
send_settings = true;
context.setSettings(*settings);
}
else
send_settings = false;
}
void RemoteBlockInputStream::sendQuery()
{
createMultiplexedConnections();
......
......@@ -20,14 +20,16 @@ namespace DB
class RemoteBlockInputStream : public IProfilingBlockInputStream
{
public:
/// Takes already set connection
RemoteBlockInputStream(Connection & connection_, const String & query_, const Settings * settings_,
const Context & context_, ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(),
/// Takes already set connection.
/// If `settings` is nullptr, settings will be taken from context.
RemoteBlockInputStream(Connection & connection_, const String & query_, const Context & context_,
const Settings * settings = nullptr, ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Takes a pool and gets one or several connections from it
RemoteBlockInputStream(const ConnectionPoolWithFailoverPtr & pool_, const String & query_, const Settings * settings_,
const Context & context_, ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(),
/// Takes a pool and gets one or several connections from it.
/// If `settings` is nullptr, settings will be taken from context.
RemoteBlockInputStream(const ConnectionPoolWithFailoverPtr & pool_, const String & query_, const Context & context_,
const Settings * settings = nullptr, ThrottlerPtr throttler_ = nullptr, const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
~RemoteBlockInputStream() override;
......@@ -82,8 +84,6 @@ protected:
bool hasThrownException() const;
private:
void init(const Settings * settings);
void sendQuery();
/// If wasn't sent yet, send request to cancell all connections to replicas
......@@ -99,7 +99,6 @@ private:
std::unique_ptr<MultiplexedConnections> multiplexed_connections;
const String query;
bool send_settings;
/// If != nullptr, used to limit network trafic
ThrottlerPtr throttler;
/// Temporary tables needed to be sent to remote servers
......
......@@ -71,7 +71,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::loadAll()
*/
if (is_local)
return executeQuery(load_all_query, context, true).in;
return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, nullptr, context);
return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, context);
}
......@@ -101,7 +101,7 @@ BlockInputStreamPtr ClickHouseDictionarySource::createStreamForSelectiveLoad(con
{
if (is_local)
return executeQuery(query, context, true).in;
return std::make_shared<RemoteBlockInputStream>(pool, query, nullptr, context);
return std::make_shared<RemoteBlockInputStream>(pool, query, context);
}
}
......@@ -30,7 +30,7 @@ void AlterStreamFactory::createForShard(
}
else
{
auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, &context.getSettingsRef(), context, throttler);
auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, context, nullptr, throttler);
stream->setPoolMode(PoolMode::GET_ONE);
res.emplace_back(std::move(stream));
}
......
......@@ -46,7 +46,7 @@ void DescribeStreamFactory::createForShard(
}
auto remote_stream = std::make_shared<RemoteBlockInputStream>(
shard_info.pool, query, &context.getSettingsRef(), context, throttler);
shard_info.pool, query, context, nullptr, throttler);
remote_stream->setPoolMode(PoolMode::GET_ALL);
remote_stream->appendExtraInfo();
res.emplace_back(std::move(remote_stream));
......
......@@ -38,7 +38,7 @@ void SelectStreamFactory::createForShard(
}
else
{
auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, &context.getSettingsRef(), context, throttler, external_tables, processed_stage);
auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, context, nullptr, throttler, external_tables, processed_stage);
stream->setPoolMode(PoolMode::GET_MANY);
stream->setMainTable(main_table);
res.emplace_back(std::move(stream));
......
......@@ -296,7 +296,7 @@ private:
void execute(ConnectionPool::Entry & connection, Query & query)
{
Stopwatch watch;
RemoteBlockInputStream stream(*connection, query, &settings, global_context, nullptr, Tables(), query_processing_stage);
RemoteBlockInputStream stream(*connection, query, global_context, &settings, nullptr, Tables(), query_processing_stage);
Progress progress;
stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
......
......@@ -1072,7 +1072,7 @@ private:
statistics.last_query_rows_read = 0;
statistics.last_query_bytes_read = 0;
RemoteBlockInputStream stream(connection, query, &settings, global_context, nullptr, Tables() /*, query_processing_stage*/);
RemoteBlockInputStream stream(connection, query, global_context, &settings);
stream.setProgressCallback([&](const Progress & value)
{
......
......@@ -3086,7 +3086,7 @@ void StorageReplicatedMergeTree::sendRequestToLeaderReplica(const ASTPtr & query
leader_address.database,
"", "", "ClickHouse replica");
RemoteBlockInputStream stream(connection, formattedAST(new_query), &settings, context);
RemoteBlockInputStream stream(connection, formattedAST(new_query), context, &settings);
NullBlockOutputStream output;
copyData(stream, output);
......
......@@ -25,7 +25,6 @@ NamesAndTypesList getStructureOfRemoteTable(
{
/// Request for a table description
String query = "DESC TABLE " + backQuoteIfNeed(database) + "." + backQuoteIfNeed(table);
Settings settings = context.getSettings();
NamesAndTypesList res;
/// Send to the first any remote shard.
......@@ -34,9 +33,7 @@ NamesAndTypesList getStructureOfRemoteTable(
if (shard_info.isLocal())
return context.getTable(database, table)->getColumnsList();
BlockInputStreamPtr input = std::make_shared<RemoteBlockInputStream>(
shard_info.pool, query, &settings, context, nullptr,
Tables(), QueryProcessingStage::Complete);
BlockInputStreamPtr input = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, context);
input->readPrefix();
const DataTypeFactory & data_type_factory = DataTypeFactory::instance();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册