Unverified commit fc8b8d23, authored by vdimir, committed by GitHub

Merge pull request #18812 from excitoon-favorites/betters3endpoints

Added prefix-based S3 endpoint settings
......@@ -114,6 +114,10 @@ CREATE TABLE big_table (name String, value UInt32) ENGINE = S3('https://storage.
- `_path` — Path to the file.
- `_file` — Name of the file.

**See Also**

- [Virtual columns](../../../engines/table-engines/index.md#table_engines-virtual_columns)

## S3-related settings {#settings}

The following settings can be set before query execution or placed into the configuration file.
......@@ -124,8 +128,29 @@ The following settings can be set before query execution or placed into configur
Security consideration: if a malicious user can specify arbitrary S3 URLs, `s3_max_redirects` must be set to zero to avoid [SSRF](https://en.wikipedia.org/wiki/Server-side_request_forgery) attacks; alternatively, `remote_host_filter` must be specified in the server configuration.

### Endpoint-based settings {#endpointsettings}

The following settings can be specified in the configuration file for a given endpoint (which will be matched by the exact prefix of a URL; see the short matching sketch after the example below):

- `endpoint` — Mandatory. Specifies the prefix of an endpoint.
- `access_key_id` and `secret_access_key` — Optional. Specifies credentials to use with the given endpoint.
- `use_environment_credentials` — Optional, default value is `false`. If set to `true`, the S3 client will try to obtain credentials from environment variables and Amazon EC2 metadata for the given endpoint.
- `header` — Optional, can be specified multiple times. Adds the specified HTTP header to a request to the given endpoint.

This configuration also applies to S3 disks in the `MergeTree` table engine family.

Example:
```
<s3>
    <endpoint-name>
        <endpoint>https://storage.yandexcloud.net/my-test-bucket-768/</endpoint>
        <!-- <access_key_id>ACCESS_KEY_ID</access_key_id> -->
        <!-- <secret_access_key>SECRET_ACCESS_KEY</secret_access_key> -->
        <!-- <use_environment_credentials>false</use_environment_credentials> -->
        <!-- <header>Authorization: Bearer SOME-TOKEN</header> -->
    </endpoint-name>
</s3>
```
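
To make the prefix matching concrete, here is a small illustrative Python check (not part of the PR; the URLs are examples only) showing which table URLs would pick up the settings of the example `endpoint` above:

```
# Illustrative only: endpoint settings are applied when the requested URL
# starts with the configured endpoint prefix.
endpoint = "https://storage.yandexcloud.net/my-test-bucket-768/"

for url in (
    "https://storage.yandexcloud.net/my-test-bucket-768/data/file.csv",  # matches -> settings applied
    "https://storage.yandexcloud.net/another-bucket/file.csv",           # no match -> defaults used
):
    print(url, "->", "matches" if url.startswith(endpoint) else "no match")
```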
[Original article](https://clickhouse.tech/docs/en/operations/table_engines/s3/) <!--hide-->
......@@ -216,7 +216,8 @@ StorageS3::StorageS3(
    storage_metadata.setConstraints(constraints_);
    setInMemoryMetadata(storage_metadata);

    /// Look up endpoint-based settings by the full URL, so that prefix matching can take the bucket/path into account.
    auto settings = context_.getStorageS3Settings().getSettings(uri.uri.toString());

    Aws::Auth::AWSCredentials credentials(access_key_id_, secret_access_key_);
    /// If credentials were not passed explicitly, fall back to the ones configured for the matching endpoint.
    if (access_key_id_.empty())
        credentials = Aws::Auth::AWSCredentials(std::move(settings.access_key_id), std::move(settings.secret_access_key));
......
......@@ -3,6 +3,8 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h>
#include <boost/algorithm/string/predicate.hpp>
namespace DB
{
......@@ -23,39 +25,50 @@ void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::U
    for (const String & key : config_keys)
    {
        if (config.has(config_elem + "." + key + ".endpoint"))
        {
            auto endpoint = config.getString(config_elem + "." + key + ".endpoint");
            auto access_key_id = config.getString(config_elem + "." + key + ".access_key_id", "");
            auto secret_access_key = config.getString(config_elem + "." + key + ".secret_access_key", "");
            std::optional<bool> use_environment_credentials;
            if (config.has(config_elem + "." + key + ".use_environment_credentials"))
            {
                use_environment_credentials = config.getBool(config_elem + "." + key + ".use_environment_credentials");
            }

            HeaderCollection headers;
            Poco::Util::AbstractConfiguration::Keys subconfig_keys;
            config.keys(config_elem + "." + key, subconfig_keys);
            for (const String & subkey : subconfig_keys)
            {
                if (subkey.starts_with("header"))
                {
                    auto header_str = config.getString(config_elem + "." + key + "." + subkey);
                    auto delimiter = header_str.find(':');
                    if (delimiter == String::npos)
                        throw Exception("Malformed s3 header value", ErrorCodes::INVALID_CONFIG_PARAMETER);
                    headers.emplace_back(HttpHeader{header_str.substr(0, delimiter), header_str.substr(delimiter + 1, String::npos)});
                }
            }

            settings.emplace(endpoint, S3AuthSettings{std::move(access_key_id), std::move(secret_access_key), std::move(headers), use_environment_credentials});
        }
    }
}
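
The `header` entries above are split on the first `:` into a name/value pair. A quick standalone Python illustration of that splitting rule (not part of the PR):

```
def parse_header(header_str):
    # Everything before the first ':' is the header name; the rest, including
    # any further ':' characters, is the value (a leading space is kept).
    delimiter = header_str.find(':')
    if delimiter == -1:
        raise ValueError('Malformed s3 header value')
    return header_str[:delimiter], header_str[delimiter + 1:]

print(parse_header('Authorization: Bearer SOME-TOKEN'))
# ('Authorization', ' Bearer SOME-TOKEN')
```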
S3AuthSettings StorageS3Settings::getSettings(const String & endpoint) const
{
    std::lock_guard lock(mutex);
    if (auto setting = settings.find(endpoint); setting != settings.end())
        return setting->second;

    auto next_prefix_setting = settings.upper_bound(endpoint);

    /// The linear-time scan could be replaced with a logarithmic lookup using a prefix-tree map.
    for (auto possible_prefix_setting = next_prefix_setting; possible_prefix_setting != settings.begin();)
    {
        std::advance(possible_prefix_setting, -1);
        if (boost::algorithm::starts_with(endpoint, possible_prefix_setting->first))
            return possible_prefix_setting->second;
    }

    return {};
}
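
A minimal Python sketch of the same lookup (illustrative names only, not part of the PR): try an exact match first, then walk backwards from the URL's insertion point in the sorted set of configured prefixes and return the first prefix the URL starts with.

```
import bisect

def get_settings(settings, url):
    """settings: dict mapping endpoint prefix -> auth settings."""
    # Exact match first, mirroring settings.find(endpoint) above.
    if url in settings:
        return settings[url]

    # Any key that is a prefix of `url` sorts at or before `url`,
    # so walk backwards from the upper bound (linear, as in the C++ code).
    keys = sorted(settings)
    for key in reversed(keys[:bisect.bisect_right(keys, url)]):
        if url.startswith(key):
            return settings[key]
    return None

endpoints = {"https://storage.yandexcloud.net/my-test-bucket-768/": {"use_environment_credentials": False}}
print(get_settings(endpoints, "https://storage.yandexcloud.net/my-test-bucket-768/data.csv"))
```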
......
......@@ -61,7 +61,10 @@ class QueryTimeoutExceedException(Exception):
class QueryRuntimeException(Exception):
    def __init__(self, message, returncode, stderr):
        super(QueryRuntimeException, self).__init__(message)
        self.returncode = returncode
        self.stderr = stderr
class CommandRequest:
......@@ -106,7 +109,7 @@ class CommandRequest:
        if (self.process.returncode != 0 or stderr) and not self.ignore_error:
            raise QueryRuntimeException(
                'Client failed! Return code: {}, stderr: {}'.format(self.process.returncode, stderr),
                self.process.returncode, stderr)

        return stdout
......@@ -122,7 +125,7 @@ class CommandRequest:
            raise QueryTimeoutExceedException('Client timed out!')

        if (self.process.returncode == 0):
            raise QueryRuntimeException(
                'Client expected to be failed but succeeded! stdout: {}'.format(stdout),
                self.process.returncode, stderr)

        return stderr
......
......@@ -4,5 +4,8 @@
            <endpoint>http://resolver:8080</endpoint>
            <header>Authorization: Bearer TOKEN</header>
        </s3_mock>
        <s3_mock_restricted_directory>
            <endpoint>http://resolver:8080/root-with-auth/restricteddirectory/</endpoint>
        </s3_mock_restricted_directory>
    </s3>
</yandex>
from bottle import abort, route, run, request, response


@route('/redirected/<_path:path>')
def infinite_redirect(_path):
    response.set_header("Location", request.url)
    response.status = 307
    return 'Redirected'


@route('/<_bucket>/<_path:path>')
def server(_bucket, _path):
    for name in request.headers:
        if name == 'Authorization' and request.headers[name] == 'Bearer TOKEN':
......
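
The switch from `<_path>` to `<_path:path>` matters because Bottle's default route wildcard stops at `/`, while the `path` filter also consumes slashes. A standalone sketch (not part of the PR) of the behaviour this enables:

```
from bottle import Bottle

app = Bottle()

@app.route('/<_bucket>/<_path:path>')
def server(_bucket, _path):
    # For a request to '/root-with-auth/restricteddirectory/test.csv' this
    # receives _bucket='root-with-auth' and _path='restricteddirectory/test.csv';
    # with a plain '<_path>' wildcard such nested keys would not match at all (404).
    return '{} {}'.format(_bucket, _path)
```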
......@@ -410,6 +410,20 @@ def test_custom_auth_headers(cluster):
assert result == '1\t2\t3\n'
def test_custom_auth_headers_exclusion(cluster):
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    filename = "test.csv"
    get_query = f"SELECT * FROM s3('http://resolver:8080/{cluster.minio_restricted_bucket}/restricteddirectory/{filename}', 'CSV', '{table_format}')"

    instance = cluster.instances["dummy"]  # type: ClickHouseInstance
    with pytest.raises(helpers.client.QueryRuntimeException) as ei:
        result = run_query(instance, get_query)
        print(result)

    assert ei.value.returncode == 243
    assert '403 Forbidden' in ei.value.stderr
def test_infinite_redirect(cluster):
    bucket = "redirected"
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
......