未验证 提交 24b0be2c 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #2727 from alesapin/CLICKHOUSE-3832

CLICKHOUSE-3832: Add HTTP Basic authentification in replication protocol
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
......@@ -23,14 +24,40 @@ namespace ErrorCodes
extern const int TOO_MANY_SIMULTANEOUS_QUERIES;
}
std::pair<String, bool> InterserverIOHTTPHandler::checkAuthentication(Poco::Net::HTTPServerRequest & request) const
{
const auto & config = server.config();
if (config.has("interserver_http_credentials.user"))
{
if (!request.hasCredentials())
return {"Server requires HTTP Basic authentification, but client doesn't provide it", false};
String scheme, info;
request.getCredentials(scheme, info);
if (scheme != "Basic")
return {"Server requires HTTP Basic authentification but client provides another method", false};
String user = config.getString("interserver_http_credentials.user");
String password = config.getString("interserver_http_credentials.password", "");
Poco::Net::HTTPBasicCredentials credentials(info);
if (std::make_pair(user, password) != std::make_pair(credentials.getUsername(), credentials.getPassword()))
return {"Incorrect user or password in HTTP Basic authentification", false};
}
else if (request.hasCredentials())
{
return {"Client requires HTTP Basic authentification, but server doesn't provide it", false};
}
return {"", true};
}
void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
HTMLForm params(request);
LOG_TRACE(log, "Request URI: " << request.getURI());
/// NOTE: You can do authentication here if you need to.
String endpoint_name = params.get("endpoint");
bool compress = params.get("compress") == "true";
......@@ -65,8 +92,18 @@ void InterserverIOHTTPHandler::handleRequest(Poco::Net::HTTPServerRequest & requ
try
{
processQuery(request, response);
LOG_INFO(log, "Done processing query");
if (auto [msg, success] = checkAuthentication(request); success)
{
processQuery(request, response);
LOG_INFO(log, "Done processing query");
}
else
{
response.setStatusAndReason(Poco::Net::HTTPServerResponse::HTTP_UNAUTHORIZED);
if (!response.sent())
response.send() << msg << std::endl;
LOG_WARNING(log, "Query processing failed request: '" << request.getURI() << "' authentification failed");
}
}
catch (Exception & e)
{
......
......@@ -34,6 +34,8 @@ private:
CurrentMetrics::Increment metric_increment{CurrentMetrics::InterserverConnection};
void processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
std::pair<String, bool> checkAuthentication(Poco::Net::HTTPServerRequest & request) const;
};
}
......@@ -230,6 +230,17 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->setInterserverIOAddress(this_host, port);
}
if (config().has("interserver_http_credentials"))
{
String user = config().getString("interserver_http_credentials.user", "");
String password = config().getString("interserver_http_credentials.password", "");
if (user.empty())
throw Exception("Configuration parameter interserver_http_credentials user can't be empty", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
global_context->setInterverserCredentials(user, password);
}
if (config().has("macros"))
global_context->setMacros(std::make_unique<Macros>(config(), "macros"));
......
......@@ -18,6 +18,7 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(const Poco::URI & uri,
const std::string & method_,
OutStreamCallback out_stream_callback,
const ConnectionTimeouts & timeouts,
const Poco::Net::HTTPBasicCredentials & credentials,
size_t buffer_size_)
: ReadBuffer(nullptr, 0),
uri{uri},
......@@ -30,6 +31,9 @@ ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(const Poco::URI & uri,
if (out_stream_callback)
request.setChunkedTransferEncoding(true);
if (!credentials.getUsername().empty())
credentials.authenticate(request);
Poco::Net::HTTPResponse response;
LOG_TRACE((&Logger::get("ReadWriteBufferFromHTTP")), "Sending request to " << uri.toString());
......
#pragma once
#include <functional>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/URI.h>
#include <IO/ReadBuffer.h>
......@@ -32,6 +33,7 @@ public:
const std::string & method = {},
OutStreamCallback out_stream_callback = {},
const ConnectionTimeouts & timeouts = {},
const Poco::Net::HTTPBasicCredentials & credentials = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
bool nextImpl() override;
......
......@@ -109,6 +109,8 @@ struct ContextShared
String interserver_io_host; /// The host name by which this server is available for other servers.
UInt16 interserver_io_port = 0; /// and port.
String interserver_io_user;
String interserver_io_password;
String path; /// Path to the data directory, with a slash at the end.
String tmp_path; /// The path to the temporary files that occur when processing the request.
......@@ -1378,6 +1380,17 @@ void Context::setInterserverIOAddress(const String & host, UInt16 port)
shared->interserver_io_port = port;
}
void Context::setInterverserCredentials(const String & user, const String & password)
{
shared->interserver_io_user = user;
shared->interserver_io_password = password;
}
std::pair<String, String> Context::getInterserverCredentials() const
{
return { shared->interserver_io_user, shared->interserver_io_password };
}
std::pair<String, UInt16> Context::getInterserverIOAddress() const
{
......
......@@ -249,6 +249,11 @@ public:
/// How other servers can access this for downloading replicated data.
void setInterserverIOAddress(const String & host, UInt16 port);
std::pair<String, UInt16> getInterserverIOAddress() const;
// Credentials which server will use to communicate with others
void setInterverserCredentials(const String & user, const String & password);
std::pair<String, String> getInterserverCredentials() const;
/// The port that the server listens for executing SQL queries.
UInt16 getTCPPort() const;
......
......@@ -161,6 +161,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
const String & host,
int port,
const ConnectionTimeouts & timeouts,
const String & user,
const String & password,
bool to_detached,
const String & tmp_prefix_)
{
......@@ -175,7 +177,14 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
{"compress", "false"}
});
ReadWriteBufferFromHTTP in{uri, Poco::Net::HTTPRequest::HTTP_POST, {}, timeouts};
Poco::Net::HTTPBasicCredentials creds{};
if (!user.empty())
{
creds.setUsername(user);
creds.setPassword(password);
}
ReadWriteBufferFromHTTP in{uri, Poco::Net::HTTPRequest::HTTP_POST, {}, timeouts, creds};
static const String TMP_PREFIX = "tmp_fetch_";
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
......
......@@ -54,6 +54,8 @@ public:
const String & host,
int port,
const ConnectionTimeouts & timeouts,
const String & user,
const String & password,
bool to_detached = false,
const String & tmp_prefix_ = "");
......
......@@ -1971,9 +1971,10 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
String replica_path = zookeeper_path + "/replicas/" + part_desc->replica;
ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef());
auto [user, password] = context.getInterserverCredentials();
part_desc->res_part = fetcher.fetchPart(part_desc->found_new_part_name, replica_path,
address.host, address.replication_port, timeouts, false, TMP_PREFIX + "fetch_");
address.host, address.replication_port, timeouts, user, password, false, TMP_PREFIX + "fetch_");
/// TODO: check columns_version of fetched part
......@@ -2706,10 +2707,11 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef());
auto [user, password] = context.getInterserverCredentials();
try
{
part = fetcher.fetchPart(part_name, replica_path, address.host, address.replication_port, timeouts, to_detached);
part = fetcher.fetchPart(part_name, replica_path, address.host, address.replication_port, timeouts, user, password, to_detached);
if (!to_detached)
{
......
<yandex>
<interserver_http_port>9009</interserver_http_port>
<interserver_http_credentials>
<user>admin</user>
<password>222</password>
</interserver_http_credentials>
</yandex>
<yandex>
<interserver_http_port>9009</interserver_http_port>
<interserver_http_credentials>
<user>root</user>
<password>111</password>
</interserver_http_credentials>
</yandex>
<yandex>
<interserver_http_port>9009</interserver_http_port>
</yandex>
<yandex>
<remote_servers>
<test_cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<default_database>test</default_database>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<default_database>test</default_database>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<default_database>test</default_database>
<host>node3</host>
<port>9000</port>
</replica>
<replica>
<default_database>test</default_database>
<host>node4</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<default_database>test</default_database>
<host>node5</host>
<port>9000</port>
</replica>
<replica>
<default_database>test</default_database>
<host>node7</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<default_database>test</default_database>
<host>node7</host>
<port>9000</port>
</replica>
<replica>
<default_database>test</default_database>
<host>node8</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>
import time
import pytest
from helpers.cluster import ClickHouseCluster
def _fill_nodes(nodes, shard):
for node in nodes:
node.query(
'''
CREATE DATABASE test;
CREATE TABLE test_table(date Date, id UInt32, dummy UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test{shard}/replicated', '{replica}', date, id, 8192);
'''.format(shard=shard, replica=node.name))
cluster = ClickHouseCluster(__file__, server_bin_path="/home/alesap/ClickHouse/dbms/programs/clickhouse-server")
node1 = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml', 'configs/credentials1.xml'], with_zookeeper=True)
node2 = cluster.add_instance('node2', main_configs=['configs/remote_servers.xml', 'configs/credentials1.xml'], with_zookeeper=True)
@pytest.fixture(scope="module")
def same_credentials_cluster():
try:
cluster.start()
_fill_nodes([node1, node2], 1)
yield cluster
finally:
cluster.shutdown()
def test_same_credentials(same_credentials_cluster):
node1.query("insert into test_table values ('2017-06-16', 111, 0)")
time.sleep(1)
assert node1.query("SELECT id FROM test_table order by id") == '111\n'
assert node2.query("SELECT id FROM test_table order by id") == '111\n'
node2.query("insert into test_table values ('2017-06-17', 222, 1)")
time.sleep(1)
assert node1.query("SELECT id FROM test_table order by id") == '111\n222\n'
assert node2.query("SELECT id FROM test_table order by id") == '111\n222\n'
node3 = cluster.add_instance('node3', main_configs=['configs/remote_servers.xml', 'configs/no_credentials.xml'], with_zookeeper=True)
node4 = cluster.add_instance('node4', main_configs=['configs/remote_servers.xml', 'configs/no_credentials.xml'], with_zookeeper=True)
@pytest.fixture(scope="module")
def no_credentials_cluster():
try:
cluster.start()
_fill_nodes([node3, node4], 2)
yield cluster
finally:
cluster.shutdown()
def test_no_credentials(no_credentials_cluster):
node3.query("insert into test_table values ('2017-06-18', 111, 0)")
time.sleep(1)
assert node3.query("SELECT id FROM test_table order by id") == '111\n'
assert node4.query("SELECT id FROM test_table order by id") == '111\n'
node4.query("insert into test_table values ('2017-06-19', 222, 1)")
time.sleep(1)
assert node3.query("SELECT id FROM test_table order by id") == '111\n222\n'
assert node4.query("SELECT id FROM test_table order by id") == '111\n222\n'
node5 = cluster.add_instance('node5', main_configs=['configs/remote_servers.xml', 'configs/credentials1.xml'], with_zookeeper=True)
node6 = cluster.add_instance('node6', main_configs=['configs/remote_servers.xml', 'configs/credentials2.xml'], with_zookeeper=True)
@pytest.fixture(scope="module")
def different_credentials_cluster():
try:
cluster.start()
_fill_nodes([node5, node6], 3)
yield cluster
finally:
cluster.shutdown()
def test_different_credentials(different_credentials_cluster):
node5.query("insert into test_table values ('2017-06-20', 111, 0)")
time.sleep(1)
assert node5.query("SELECT id FROM test_table order by id") == '111\n'
assert node6.query("SELECT id FROM test_table order by id") == ''
node6.query("insert into test_table values ('2017-06-21', 222, 1)")
time.sleep(1)
assert node5.query("SELECT id FROM test_table order by id") == '111\n'
assert node6.query("SELECT id FROM test_table order by id") == '222\n'
node7 = cluster.add_instance('node7', main_configs=['configs/remote_servers.xml', 'configs/credentials1.xml'], with_zookeeper=True)
node8 = cluster.add_instance('node8', main_configs=['configs/remote_servers.xml', 'configs/no_credentials.xml'], with_zookeeper=True)
@pytest.fixture(scope="module")
def credentials_and_no_credentials_cluster():
try:
cluster.start()
_fill_nodes([node7, node8], 4)
yield cluster
finally:
cluster.shutdown()
def test_credentials_and_no_credentials(credentials_and_no_credentials_cluster):
node7.query("insert into test_table values ('2017-06-21', 111, 0)")
time.sleep(1)
assert node7.query("SELECT id FROM test_table order by id") == '111\n'
assert node8.query("SELECT id FROM test_table order by id") == ''
node8.query("insert into test_table values ('2017-06-22', 222, 1)")
time.sleep(1)
assert node7.query("SELECT id FROM test_table order by id") == '111\n'
assert node8.query("SELECT id FROM test_table order by id") == '222\n'
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册