未验证 提交 058f86de 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #3254 from yandex/group_by_two_level_compatibility

Fixed incompatibility when versions prior to 18.12.17 are used on remote servers and newer is used on initiating server, and GROUP BY both fixed and non-fixed keys, and when two-level group by method is activated
......@@ -231,6 +231,14 @@ void Connection::getServerVersion(String & name, UInt64 & version_major, UInt64
revision = server_revision;
}
UInt64 Connection::getServerRevision()
{
if (!connected)
connect();
return server_revision;
}
const String & Connection::getServerTimezone()
{
if (!connected)
......
......@@ -106,6 +106,7 @@ public:
void setDefaultDatabase(const String & database);
void getServerVersion(String & name, UInt64 & version_major, UInt64 & version_minor, UInt64 & version_patch, UInt64 & revision);
UInt64 getServerRevision();
const String & getServerTimezone();
const String & getServerDisplayName();
......
......@@ -153,13 +153,9 @@ ConnectionPoolWithFailover::tryGetEntry(
{
result.entry = pool.get(settings, /* force_connected = */ false);
String server_name;
UInt64 server_version_major;
UInt64 server_version_minor;
UInt64 server_version_patch;
UInt64 server_revision;
UInt64 server_revision = 0;
if (table_to_check)
result.entry->getServerVersion(server_name, server_version_major, server_version_minor, server_version_patch, server_revision);
server_revision = result.entry->getServerRevision();
if (!table_to_check || server_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS)
{
......
......@@ -87,28 +87,36 @@ void MultiplexedConnections::sendQuery(
if (sent_query)
throw Exception("Query already sent.", ErrorCodes::LOGICAL_ERROR);
if (replica_states.size() > 1)
Settings modified_settings = settings;
for (auto & replica : replica_states)
{
Settings query_settings = settings;
query_settings.parallel_replicas_count = replica_states.size();
if (!replica.connection)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < replica_states.size(); ++i)
if (replica.connection->getServerRevision() < DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD)
{
Connection * connection = replica_states[i].connection;
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
/// Disable two-level aggregation due to version incompatibility.
modified_settings.group_by_two_level_threshold = 0;
modified_settings.group_by_two_level_threshold_bytes = 0;
}
}
query_settings.parallel_replica_offset = i;
connection->sendQuery(query, query_id, stage, &query_settings, client_info, with_pending_data);
size_t num_replicas = replica_states.size();
if (num_replicas > 1)
{
/// Use multiple replicas for parallel query processing.
modified_settings.parallel_replicas_count = num_replicas;
for (size_t i = 0; i < num_replicas; ++i)
{
modified_settings.parallel_replica_offset = i;
replica_states[i].connection->sendQuery(query, query_id, stage, &modified_settings, client_info, with_pending_data);
}
}
else
{
Connection * connection = replica_states[0].connection;
if (connection == nullptr)
throw Exception("MultiplexedConnections: Internal error", ErrorCodes::LOGICAL_ERROR);
connection->sendQuery(query, query_id, stage, &settings, client_info, with_pending_data);
/// Use single replica.
replica_states[0].connection->sendQuery(query, query_id, stage, &modified_settings, client_info, with_pending_data);
}
sent_query = true;
......
......@@ -47,6 +47,10 @@
#define DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME 54372
#define DBMS_MIN_REVISION_WITH_VERSION_PATCH 54401
#define DBMS_MIN_REVISION_WITH_SERVER_LOGS 54406
/// Minimum revision with exactly the same set of aggregation methods and rules to select them.
/// Two-level (bucketed) aggregation is incompatible if servers are inconsistent in these rules
/// (keys will be placed in different buckets and result will not be fully aggregated).
#define DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD 54408
/// Version of ClickHouse TCP protocol. Set to git tag with latest protocol change.
#define DBMS_TCP_PROTOCOL_VERSION 54226
......
......@@ -2085,7 +2085,7 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
/** `minus one` means the absence of information about the bucket
* - in the case of single-level aggregation, as well as for blocks with "overflowing" values.
* If there is at least one block with a bucket number greater than zero, then there was a two-level aggregation.
* If there is at least one block with a bucket number greater or equal than zero, then there was a two-level aggregation.
*/
auto max_bucket = bucket_to_blocks.rbegin()->first;
size_t has_two_level = max_bucket >= 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册