未验证 提交 686f70fe 编写于 作者: A alesapin 提交者: GitHub

Merge pull request #9412 from ClickHouse/CurtizJ-test-backward-compatability

Fix replication protocol backward incompatibility.
......@@ -25,7 +25,6 @@ namespace ErrorCodes
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
extern const int CANNOT_WRITE_TO_OSTREAM;
extern const int CHECKSUM_DOESNT_MATCH;
extern const int UNKNOWN_PROTOCOL;
extern const int INSECURE_PATH;
}
......@@ -34,10 +33,8 @@ namespace DataPartsExchange
namespace
{
static constexpr auto REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE = "0";
static constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE = "1";
static constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS = "2";
static constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE = 1;
static constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS = 2;
std::string getEndpointId(const std::string & node_id)
......@@ -54,15 +51,10 @@ std::string Service::getId(const std::string & node_id) const
void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*body*/, WriteBuffer & out, Poco::Net::HTTPServerResponse & response)
{
String client_protocol_version = params.get("client_protocol_version", REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE);
int client_protocol_version = parse<int>(params.get("client_protocol_version", "0"));
String part_name = params.get("part");
if (client_protocol_version != REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS
&& client_protocol_version != REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE
&& client_protocol_version != REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE)
throw Exception("Unsupported fetch protocol version", ErrorCodes::UNKNOWN_PROTOCOL);
const auto data_settings = data.getSettings();
/// Validation of the input that may come from malicious replica.
......@@ -79,7 +71,9 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
response.setChunkedTransferEncoding(false);
return;
}
response.addCookie({"server_protocol_version", REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS});
/// We pretend to work as older server version, to be sure that client will correctly process our version
response.addCookie({"server_protocol_version", toString(std::min(client_protocol_version, REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS))});
++total_sends;
SCOPE_EXIT({--total_sends;});
......@@ -107,10 +101,10 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
MergeTreeData::DataPart::Checksums data_checksums;
if (client_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE || client_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
writeBinary(checksums.getTotalSizeOnDisk(), out);
if (client_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
{
WriteBufferFromOwnString ttl_infos_buffer;
part->ttl_infos.write(ttl_infos_buffer);
......@@ -202,7 +196,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
{
{"endpoint", getEndpointId(replica_path)},
{"part", part_name},
{"client_protocol_version", REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS},
{"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)},
{"compress", "false"}
});
......@@ -224,15 +218,14 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
data_settings->replicated_max_parallel_fetches_for_host
};
auto server_protocol_version = in.getResponseCookie("server_protocol_version", REPLICATION_PROTOCOL_VERSION_WITHOUT_PARTS_SIZE);
int server_protocol_version = parse<int>(in.getResponseCookie("server_protocol_version", "0"));
ReservationPtr reservation;
if (server_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE || server_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
{
size_t sum_files_size;
readBinary(sum_files_size, in);
if (server_protocol_version == REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
{
IMergeTreeDataPart::TTLInfos ttl_infos;
String ttl_infos_string;
......
import pytest
import helpers.client as client
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_zookeeper=True, image='yandex/clickhouse-server:19.17.8.54', stay_alive=True, with_installed_binary=True)
node2 = cluster.add_instance('node2', with_zookeeper=True)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
for i, node in enumerate([node1, node2]):
node.query(
'''CREATE TABLE t(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/t', '{}')
PARTITION BY toYYYYMM(date)
ORDER BY id'''.format(i))
yield cluster
finally:
cluster.shutdown()
def test_backward_compatability(start_cluster):
node2.query("INSERT INTO t VALUES (today(), 1)")
node1.query("SYSTEM SYNC REPLICA t", timeout=10)
assert node1.query("SELECT count() FROM t") == "1\n"
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册