diff --git a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp index f648b7f3fb7ddadc3784e840dcbe175714d20a2b..11fa64739d360c551ec4ab4199974e379bd2c762 100644 --- a/dbms/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/dbms/src/Storages/MergeTree/DataPartsExchange.cpp @@ -25,7 +25,6 @@ namespace ErrorCodes extern const int BAD_SIZE_OF_FILE_IN_DATA_PART; extern const int CANNOT_WRITE_TO_OSTREAM; extern const int CHECKSUM_DOESNT_MATCH; - extern const int UNKNOWN_PROTOCOL; extern const int INSECURE_PATH; } @@ -34,10 +33,8 @@ namespace DataPartsExchange namespace { - -static constexpr auto REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE = "0"; -static constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE = "1"; -static constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS = "2"; +static constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE = 1; +static constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS = 2; std::string getEndpointId(const std::string & node_id) @@ -54,15 +51,10 @@ std::string Service::getId(const std::string & node_id) const void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*body*/, WriteBuffer & out, Poco::Net::HTTPServerResponse & response) { - String client_protocol_version = params.get("client_protocol_version", REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE); + int client_protocol_version = parse(params.get("client_protocol_version", "0")); String part_name = params.get("part"); - if (client_protocol_version != REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS - && client_protocol_version != REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE - && client_protocol_version != REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE) - throw Exception("Unsupported fetch protocol version", ErrorCodes::UNKNOWN_PROTOCOL); - const auto data_settings = data.getSettings(); /// Validation of the input that may come from malicious replica. @@ -79,7 +71,9 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo response.setChunkedTransferEncoding(false); return; } - response.addCookie({"server_protocol_version", REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS}); + + /// We pretend to work as older server version, to be sure that client will correctly process our version + response.addCookie({"server_protocol_version", toString(std::min(client_protocol_version, REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS))}); ++total_sends; SCOPE_EXIT({--total_sends;}); @@ -107,10 +101,10 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo MergeTreeData::DataPart::Checksums data_checksums; - if (client_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE || client_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS) + if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE) writeBinary(checksums.getTotalSizeOnDisk(), out); - if (client_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS) + if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS) { WriteBufferFromOwnString ttl_infos_buffer; part->ttl_infos.write(ttl_infos_buffer); @@ -202,7 +196,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( { {"endpoint", getEndpointId(replica_path)}, {"part", part_name}, - {"client_protocol_version", REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS}, + {"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)}, {"compress", "false"} }); @@ -224,15 +218,14 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( data_settings->replicated_max_parallel_fetches_for_host }; - auto server_protocol_version = in.getResponseCookie("server_protocol_version", REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE); - + int server_protocol_version = parse(in.getResponseCookie("server_protocol_version", "0")); ReservationPtr reservation; - if (server_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE || server_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS) + if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE) { size_t sum_files_size; readBinary(sum_files_size, in); - if (server_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS) + if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS) { IMergeTreeDataPart::TTLInfos ttl_infos; String ttl_infos_string; diff --git a/dbms/tests/integration/test_backward_compatability/__init__.py b/dbms/tests/integration/test_backward_compatability/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/dbms/tests/integration/test_backward_compatability/test.py b/dbms/tests/integration/test_backward_compatability/test.py new file mode 100644 index 0000000000000000000000000000000000000000..914153342f890a39fede8631e9bdf36f083f37eb --- /dev/null +++ b/dbms/tests/integration/test_backward_compatability/test.py @@ -0,0 +1,31 @@ +import pytest + +import helpers.client as client +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) +node1 = cluster.add_instance('node1', with_zookeeper=True, image='yandex/clickhouse-server:19.17.8.54', stay_alive=True, with_installed_binary=True) +node2 = cluster.add_instance('node2', with_zookeeper=True) + +@pytest.fixture(scope="module") +def start_cluster(): + try: + cluster.start() + for i, node in enumerate([node1, node2]): + node.query( + '''CREATE TABLE t(date Date, id UInt32) + ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/t', '{}') + PARTITION BY toYYYYMM(date) + ORDER BY id'''.format(i)) + + yield cluster + + finally: + cluster.shutdown() + + +def test_backward_compatability(start_cluster): + node2.query("INSERT INTO t VALUES (today(), 1)") + node1.query("SYSTEM SYNC REPLICA t", timeout=10) + + assert node1.query("SELECT count() FROM t") == "1\n"