提交 70e50ecf 编写于 作者: P Pervakov Grigory

Base implementation of IDisk interface for S3

上级 3a8dbf43
......@@ -112,7 +112,7 @@ void FileChecker::save() const
out->next();
}
disk->moveFile(tmp_files_info_path, files_info_path);
disk->replaceFile(tmp_files_info_path, files_info_path);
}
void FileChecker::load(Map & local_map, const String & path) const
......
......@@ -15,7 +15,7 @@ namespace ErrorCodes
extern const int PATH_ACCESS_DENIED;
}
std::mutex DiskLocal::mutex;
std::mutex IDisk::reservationMutex;
ReservationPtr DiskLocal::reserve(UInt64 bytes)
{
......@@ -26,7 +26,7 @@ ReservationPtr DiskLocal::reserve(UInt64 bytes)
bool DiskLocal::tryReserve(UInt64 bytes)
{
std::lock_guard lock(mutex);
std::lock_guard lock(IDisk::reservationMutex);
if (bytes == 0)
{
LOG_DEBUG(&Logger::get("DiskLocal"), "Reserving 0 bytes on disk " << backQuote(name));
......@@ -71,7 +71,7 @@ UInt64 DiskLocal::getAvailableSpace() const
UInt64 DiskLocal::getUnreservedSpace() const
{
std::lock_guard lock(mutex);
std::lock_guard lock(IDisk::reservationMutex);
auto available_space = getAvailableSpace();
available_space -= std::min(available_space, reserved_bytes);
return available_space;
......@@ -161,10 +161,9 @@ std::unique_ptr<WriteBuffer> DiskLocal::writeFile(const String & path, size_t bu
return std::make_unique<WriteBufferFromFile>(disk_path + path, buf_size, flags);
}
void DiskLocalReservation::update(UInt64 new_size)
{
std::lock_guard lock(DiskLocal::mutex);
std::lock_guard lock(IDisk::reservationMutex);
disk->reserved_bytes -= size;
size = new_size;
disk->reserved_bytes += size;
......@@ -174,7 +173,7 @@ DiskLocalReservation::~DiskLocalReservation()
{
try
{
std::lock_guard lock(DiskLocal::mutex);
std::lock_guard lock(IDisk::reservationMutex);
if (disk->reserved_bytes < size)
{
disk->reserved_bytes = 0;
......
......@@ -4,7 +4,6 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <mutex>
#include <Poco/DirectoryIterator.h>
#include <Poco/File.h>
......@@ -79,8 +78,6 @@ private:
const String disk_path;
const UInt64 keep_free_space_bytes;
/// Used for reservation counters modification
static std::mutex mutex;
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;
};
......
#include "DiskS3.h"
#if USE_AWS_S3
# include "DiskFactory.h"
# include <IO/ReadBufferFromS3.h>
# include <IO/S3Common.h>
# include <IO/WriteBufferFromS3.h>
# include <Poco/File.h>
# include <Poco/FileStream.h>
# include <Common/quoteString.h>
# include <random>
# include <aws/s3/model/CopyObjectRequest.h>
# include <aws/s3/model/DeleteObjectRequest.h>
# include <aws/s3/model/GetObjectRequest.h>
# include <aws/s3/model/HeadObjectRequest.h>
namespace DB
{
namespace ErrorCodes
{
extern const int FILE_ALREADY_EXISTS;
extern const int FILE_DOESNT_EXIST;
extern const int PATH_ACCESS_DENIED;
extern const int S3_ERROR;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
}
namespace
{
/// Throws DB::Exception if an AWS SDK call did not succeed; does nothing otherwise.
///
/// The exception always carries ErrorCodes::S3_ERROR. The numeric AWS error type must not be
/// used as the exception code: it is an unrelated enum whose values would be interpreted as
/// arbitrary, unrelated server error codes. The AWS error type is kept in the message instead.
template <typename R, typename E>
void throwIfError(Aws::Utils::Outcome<R, E> && response)
{
    if (response.IsSuccess())
        return;

    const auto & err = response.GetError();
    throw Exception(std::to_string(static_cast<int>(err.GetErrorType())) + ": " + err.GetMessage(), ErrorCodes::S3_ERROR);
}
class WriteIndirectBufferFromS3 : public WriteBufferFromS3
{
public:
WriteIndirectBufferFromS3(
std::shared_ptr<Aws::S3::S3Client> & client_ptr_,
const String & bucket_,
const String & metadata_path_,
const String & s3_path_,
size_t buf_size_)
: WriteBufferFromS3(client_ptr_, bucket_, s3_path_, DEFAULT_BLOCK_SIZE, buf_size_)
, metadata_path(metadata_path_)
, s3_path(s3_path_)
{
}
void finalize() override
{
WriteBufferFromS3::finalize();
Poco::FileOutputStream(metadata_path) << s3_path;
finalized = true;
}
~WriteIndirectBufferFromS3() override
{
if (!finalized)
{
WriteBufferFromS3::finalize();
Poco::FileOutputStream(metadata_path) << s3_path;
}
}
private:
bool finalized = false;
const String metadata_path;
const String s3_path;
};
}
/// Stores the disk name, the S3 client, the bucket, the key prefix inside the bucket
/// (`s3_root_path_`) and the local directory that holds metadata files (`metadata_path_`).
/// All arguments are taken by value and moved into the members; no I/O is performed here.
DiskS3::DiskS3(String name_, std::shared_ptr<Aws::S3::S3Client> client_, String bucket_, String s3_root_path_, String metadata_path_)
: name(std::move(name_))
, client(std::move(client_))
, bucket(std::move(bucket_))
, s3_root_path(std::move(s3_root_path_))
, metadata_path(std::move(metadata_path_))
{
}
/// Books `bytes` on this disk. Returns an empty pointer when the reservation is refused,
/// otherwise a reservation object that shares ownership of the disk.
ReservationPtr DiskS3::reserve(UInt64 bytes)
{
    if (tryReserve(bytes))
    {
        auto self = std::static_pointer_cast<DiskS3>(shared_from_this());
        return std::make_unique<DiskS3Reservation>(self, bytes);
    }

    return {};
}
bool DiskS3::exists(const String & path) const
{
return Poco::File(metadata_path + path).exists();
}
bool DiskS3::isFile(const String & path) const
{
return Poco::File(metadata_path + path).isFile();
}
bool DiskS3::isDirectory(const String & path) const
{
return Poco::File(metadata_path + path).isDirectory();
}
size_t DiskS3::getFileSize(const String & path) const
{
Aws::S3::Model::GetObjectRequest request;
request.SetBucket(bucket);
request.SetKey(getS3Path(path));
auto outcome = client->GetObject(request);
if (!outcome.IsSuccess())
{
auto & err = outcome.GetError();
throw Exception(err.GetMessage(), static_cast<int>(err.GetErrorType()));
}
else
{
return outcome.GetResult().GetContentLength();
}
}
void DiskS3::createDirectory(const String & path)
{
Poco::File(metadata_path + path).createDirectory();
}
void DiskS3::createDirectories(const String & path)
{
Poco::File(metadata_path + path).createDirectories();
}
/// Returns an iterator over the local metadata directory that corresponds to `path`.
/// The iterator reports entries relative to the disk, i.e. prefixed with `path`.
DiskDirectoryIteratorPtr DiskS3::iterateDirectory(const String & path)
{
    const String metadata_dir = metadata_path + path;
    return std::make_unique<DiskS3DirectoryIterator>(metadata_dir, path);
}
void DiskS3::moveFile(const String & from_path, const String & to_path)
{
if (exists(to_path))
throw Exception("File already exists " + to_path, ErrorCodes::FILE_ALREADY_EXISTS);
Poco::File(metadata_path + from_path).renameTo(metadata_path + to_path);
}
void DiskS3::replaceFile(const String & from_path, const String & to_path)
{
Poco::File from_file(metadata_path + from_path);
Poco::File to_file(metadata_path + to_path);
if (to_file.exists())
{
Poco::File tmp_file(metadata_path + to_path + ".old");
to_file.renameTo(tmp_file.path());
from_file.renameTo(metadata_path + to_path);
remove(to_path + ".old");
}
else
from_file.renameTo(to_file.path());
}
/// Copies the S3 object behind `from_path` to a freshly named object and points the
/// metadata file of `to_path` at the copy. An existing destination is removed first.
/// Throws FILE_DOESNT_EXIST if the source has no metadata file.
void DiskS3::copyFile(const String & from_path, const String & to_path)
{
    /// Resolve the source before touching the destination, so that a missing source
    /// does not cost us the existing destination file.
    const String s3_from_path = getS3Path(from_path);

    if (exists(to_path))
        remove(to_path);

    /// Same key layout as in writeFile: "<root><path>.<random suffix>".
    const String s3_to_path = s3_root_path + to_path + '.' + getRandomSuffix();

    Aws::S3::Model::CopyObjectRequest req;
    req.SetBucket(bucket);
    /// S3 expects the copy source as "<source bucket>/<source key>", not as a bare key.
    req.SetCopySource(bucket + "/" + s3_from_path);
    req.SetKey(s3_to_path);
    throwIfError(client->CopyObject(req));

    Poco::FileOutputStream(metadata_path + to_path) << s3_to_path;
}
std::unique_ptr<ReadBuffer> DiskS3::readFile(const String & path, size_t buf_size) const
{
return std::make_unique<ReadBufferFromS3>(client, bucket, getS3Path(path), buf_size);
}
std::unique_ptr<WriteBuffer> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode)
{
if (!exists(path) || mode == WriteMode::Rewrite)
{
String new_s3_path = s3_root_path + path + '.' + getRandomSuffix();
return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata_path + path, new_s3_path, buf_size);
}
else
{
auto old_s3_path = getS3Path(path);
ReadBufferFromS3 read_buffer(client, bucket, old_s3_path, buf_size);
auto writeBuffer = std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata_path + path, old_s3_path, buf_size);
std::vector<char> buffer(buf_size);
while (!read_buffer.eof())
writeBuffer->write(buffer.data(), read_buffer.read(buffer.data(), buf_size));
return writeBuffer;
}
}
/// Reads the S3 key stored in the local metadata file of `path`.
/// Throws FILE_DOESNT_EXIST when there is no metadata entry for `path`.
String DiskS3::getS3Path(const String & path) const
{
    if (exists(path))
    {
        String stored_key;
        Poco::FileInputStream metadata_stream(metadata_path + path);
        metadata_stream >> stored_key;
        return stored_key;
    }

    throw Exception("File not found: " + path, ErrorCodes::FILE_DOESNT_EXIST);
}
/// Produces 16 random lowercase latin letters, used to make S3 keys unique.
/// A new generator seeded from std::random_device is created on every call.
String DiskS3::getRandomSuffix() const
{
    std::mt19937 engine{std::random_device{}()};
    std::uniform_int_distribution<int> letter('a', 'z');

    String suffix(16, ' ');
    for (size_t pos = 0; pos < suffix.size(); ++pos)
        suffix[pos] = letter(engine);

    return suffix;
}
/// Attempts to account `bytes` as reserved on this disk.
/// Returns true and bumps the reservation counters on success, false if there is
/// not enough unreserved space. Counters are modified under IDisk::reservationMutex.
bool DiskS3::tryReserve(UInt64 bytes)
{
    std::lock_guard lock(IDisk::reservationMutex);

    /// Zero-sized reservations always succeed and only increase the reservation count.
    if (bytes == 0)
    {
        LOG_DEBUG(&Logger::get("DiskS3"), "Reserving 0 bytes on s3 disk " << backQuote(name));
        ++reservation_count;
        return true;
    }

    const auto total_available = getAvailableSpace();
    const UInt64 unreserved_space = total_available - std::min(total_available, reserved_bytes);
    if (unreserved_space < bytes)
        return false;

    LOG_DEBUG(
        &Logger::get("DiskS3"),
        "Reserving " << formatReadableSizeWithBinarySuffix(bytes) << " on disk " << backQuote(name) << ", having unreserved "
                     << formatReadableSizeWithBinarySuffix(unreserved_space) << ".");
    ++reservation_count;
    reserved_bytes += bytes;
    return true;
}
void DiskS3::remove(const String & path)
{
if (exists(path))
{
Aws::S3::Model::DeleteObjectRequest request;
request.SetBucket(bucket);
request.SetKey(getS3Path(path));
throwIfError(client->DeleteObject(request));
Poco::File(metadata_path + path).remove(true);
}
}
/// Returns the reserved bytes to the disk and decrements its reservation count.
/// Inconsistent counters are clamped and reported through the "DiskS3" logger
/// (the messages used to be attributed to "DiskLocal", which made them misleading).
/// Never throws: any exception is logged and swallowed.
DiskS3Reservation::~DiskS3Reservation()
{
    try
    {
        std::lock_guard lock(IDisk::reservationMutex);

        if (disk->reserved_bytes < size)
        {
            /// More is being released than was accounted for: clamp to zero instead of wrapping around.
            disk->reserved_bytes = 0;
            LOG_ERROR(&Logger::get("DiskS3"), "Unbalanced reservations size for disk '" + disk->getName() + "'.");
        }
        else
        {
            disk->reserved_bytes -= size;
        }

        if (disk->reservation_count == 0)
            LOG_ERROR(&Logger::get("DiskS3"), "Unbalanced reservation count for disk '" + disk->getName() + "'.");
        else
            --disk->reservation_count;
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
/// Registers the "s3" disk type in the factory.
///
/// The creator reads `endpoint`, `access_key_id` and `secret_access_key` from the config,
/// prepares the local metadata directory "<server path>/disks/<name>/", and verifies access
/// to the bucket by writing, reading back and deleting a small "test_acl" file.
void registerDiskS3(DiskFactory & factory)
{
    auto creator = [](const String & name,
                      const Poco::Util::AbstractConfiguration & config,
                      const String & config_prefix,
                      const Context & context) -> DiskPtr
    {
        S3::URI uri(Poco::URI(config.getString(config_prefix + ".endpoint")));

        /// Validate the configuration before producing any side effects.
        /// The emptiness check comes first: back() on an empty string is undefined behaviour.
        if (uri.key.empty() || uri.key.back() != '/')
            throw Exception("S3 path must ends with '/', but '" + uri.key + "' doesn't.", ErrorCodes::LOGICAL_ERROR);

        Poco::File disk{context.getPath() + "disks/" + name};
        disk.createDirectories();

        auto client = S3::ClientFactory::instance().create(
            uri.endpoint, config.getString(config_prefix + ".access_key_id"), config.getString(config_prefix + ".secret_access_key"));

        String metadata_path = context.getPath() + "disks/" + name + "/";

        auto s3disk = std::make_shared<DiskS3>(name, client, uri.bucket, uri.key, metadata_path);

        /// Write access check. The buffer is finalized when it goes out of scope.
        {
            auto file = s3disk->writeFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
            file->write("test", 4);
        }

        /// Read access check: the data written above must come back unchanged.
        {
            auto file = s3disk->readFile("test_acl", DBMS_DEFAULT_BUFFER_SIZE);
            String buf(4, '0');
            file->readStrict(buf.data(), 4);
            if (buf != "test")
                throw Exception("No read accecss to S3 bucket in disk " + name, ErrorCodes::PATH_ACCESS_DENIED);
        }

        /// Delete access check and cleanup of both the S3 object and its metadata file.
        {
            String s3_path;
            Poco::FileInputStream(metadata_path + "test_acl") >> s3_path;

            Aws::S3::Model::DeleteObjectRequest request;
            request.SetBucket(uri.bucket);
            request.SetKey(s3_path);
            throwIfError(client->DeleteObject(request));

            Poco::File(metadata_path + "test_acl").remove();
        }

        return s3disk;
    };

    factory.registerDiskType("s3", creator);
}
}
#endif
#pragma once
#include <Common/config.h>
#if USE_AWS_S3
# include "DiskFactory.h"
# include <aws/s3/S3Client.h>
# include <Poco/DirectoryIterator.h>
namespace DB
{
/// IDisk implementation that keeps file contents in an S3 bucket.
///
/// For every file a small metadata file is kept on the local filesystem under `metadata_path`;
/// its content is the key of the S3 object that holds the data. Directory structure, existence
/// checks, renames and iteration are served from that local metadata tree.
class DiskS3 : public IDisk
{
public:
/// The reservation object updates reserved_bytes / reservation_count directly.
friend class DiskS3Reservation;
DiskS3(String name_, std::shared_ptr<Aws::S3::S3Client> client_, String bucket_, String s3_root_path_, String metadata_path_);
const String & getName() const override { return name; }
/// Returns the key prefix inside the bucket, not a local filesystem path.
const String & getPath() const override { return s3_root_path; }
ReservationPtr reserve(UInt64 bytes) override;
/// Space is reported as unlimited: all three getters return the maximum UInt64 value.
UInt64 getTotalSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getAvailableSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getUnreservedSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getKeepingFreeSpace() const override { return 0; }
bool exists(const String & path) const override;
bool isFile(const String & path) const override;
bool isDirectory(const String & path) const override;
size_t getFileSize(const String & path) const override;
void createDirectory(const String & path) override;
void createDirectories(const String & path) override;
/// Calls remove() for every entry reported by iterateDirectory(path).
void clearDirectory(const String & path) override
{
for (auto it{iterateDirectory(path)}; it->isValid(); it->next())
remove(it->path());
}
/// Delegates to moveFile().
void moveDirectory(const String & from_path, const String & to_path) override { moveFile(from_path, to_path); }
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
void copyFile(const String & from_path, const String & to_path) override;
std::unique_ptr<ReadBuffer> readFile(const String & path, size_t buf_size) const override;
std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size, WriteMode mode) override;
private:
/// Reads the S3 key from the metadata file of `path`.
String getS3Path(const String & path) const;
/// Random suffix appended to S3 keys of newly written objects.
String getRandomSuffix() const;
bool tryReserve(UInt64 bytes);
/// Deletes the S3 object and the metadata entry of `path`.
void remove(const String & path);
private:
const String name;
std::shared_ptr<Aws::S3::S3Client> client;
const String bucket;
/// Key prefix inside the bucket under which objects are created.
const String s3_root_path;
/// Local directory that holds the metadata files.
const String metadata_path;
/// Reservation counters; modified under IDisk::reservationMutex.
UInt64 reserved_bytes = 0;
UInt64 reservation_count = 0;
};
using DiskS3Ptr = std::shared_ptr<DiskS3>;
class DiskS3DirectoryIterator : public IDiskDirectoryIterator
{
public:
DiskS3DirectoryIterator(const String & full_path, const String & folder_path_) : iter(full_path), folder_path(folder_path_) {}
void next() override { ++iter; }
bool isValid() const override { return iter != Poco::DirectoryIterator(); }
String path() const override
{
if (iter->isDirectory())
return folder_path + iter.name() + '/';
else
return folder_path + iter.name();
}
private:
Poco::DirectoryIterator iter;
String folder_path;
};
/// Reservation of `size` bytes on a DiskS3.
/// Holds shared ownership of the disk; the destructor (defined in the .cpp file)
/// gives the reserved bytes back to the disk.
class DiskS3Reservation : public IReservation
{
public:
/// The DiskSpaceReservedForMerge metric is incremented by `size_` for the lifetime of the object.
DiskS3Reservation(const DiskS3Ptr & disk_, UInt64 size_)
: disk(disk_), size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size_)
{
}
UInt64 getSize() const override { return size; }
DiskPtr getDisk() const override { return disk; }
/// Replaces the reserved amount with `new_size`, adjusting the disk counter under
/// IDisk::reservationMutex. The metric increment is not changed here.
void update(UInt64 new_size) override
{
std::lock_guard lock(IDisk::reservationMutex);
disk->reserved_bytes -= size;
size = new_size;
disk->reserved_bytes += size;
}
~DiskS3Reservation() override;
private:
DiskS3Ptr disk;
UInt64 size;
CurrentMetrics::Increment metric_increment;
};
}
#endif
......@@ -6,6 +6,7 @@
#include <Common/Exception.h>
#include <memory>
#include <mutex>
#include <utility>
#include <boost/noncopyable.hpp>
#include <Poco/Path.h>
......@@ -125,6 +126,10 @@ public:
/// Open the file for write and return WriteBuffer object.
virtual std::unique_ptr<WriteBuffer> writeFile(const String & path, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, WriteMode mode = WriteMode::Rewrite) = 0;
public:
/// Used for reservation counters modification
static std::mutex reservationMutex;
};
using DiskPtr = std::shared_ptr<IDisk>;
......@@ -151,7 +156,7 @@ public:
/**
* Information about reserved size on particular disk.
*/
class IReservation
class IReservation : boost::noncopyable
{
public:
/// Get reservation size.
......
#include "DiskFactory.h"
#include "registerDisks.h"
#include "DiskFactory.h"
#include <Common/config.h>
namespace DB
{
void registerDiskLocal(DiskFactory & factory);
void registerDiskMemory(DiskFactory & factory);
#if USE_AWS_S3
void registerDiskS3(DiskFactory & factory);
#endif
void registerDisks()
{
......@@ -12,6 +18,9 @@ void registerDisks()
registerDiskLocal(factory);
registerDiskMemory(factory);
#if USE_AWS_S3
registerDiskS3(factory);
#endif
}
}
......@@ -93,11 +93,10 @@ namespace S3
if (!endpoint.empty())
cfg.endpointOverride = endpoint;
auto cred_provider = std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(access_key_id,
secret_access_key);
Aws::Auth::AWSCredentials cred(access_key_id, secret_access_key);
return std::make_shared<Aws::S3::S3Client>(
std::move(cred_provider), // Credentials provider.
cred, // Aws credentials.
std::move(cfg), // Client configuration.
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, // Sign policy.
endpoint.empty() // Use virtual addressing only if endpoint is not specified.
......@@ -105,7 +104,7 @@ namespace S3
}
URI::URI(Poco::URI & uri_)
URI::URI(const Poco::URI & uri_)
{
static const std::regex BUCKET_KEY_PATTERN("([^/]+)/(.*)");
......
......@@ -49,7 +49,7 @@ struct URI
String bucket;
String key;
explicit URI (Poco::URI & uri_);
explicit URI (const Poco::URI & uri_);
};
}
......
......@@ -69,6 +69,7 @@ void WriteBufferFromS3::nextImpl()
void WriteBufferFromS3::finalize()
{
next();
temporary_buffer->finalize();
if (!buffer_string.empty())
{
......
......@@ -425,6 +425,12 @@ void StorageTinyLog::truncate(const ASTPtr &, const Context &, TableStructureWri
addFiles(column.name, *column.type);
}
/// Drops the table data: under an exclusive lock on `rwlock`, clears the table directory
/// on the disk and forgets the list of column files.
void StorageTinyLog::drop(TableStructureWriteLockHolder &)
{
    std::unique_lock<std::shared_mutex> exclusive_lock{rwlock};

    disk->clearDirectory(table_path);
    files.clear();
}
void registerStorageTinyLog(StorageFactory & factory)
{
......
......@@ -46,6 +46,8 @@ public:
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
void drop(TableStructureWriteLockHolder &) override;
protected:
StorageTinyLog(
DiskPtr disk_,
......
<?xml version="1.0"?>
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
<storage_configuration>
<disks>
<default>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</default>
</disks>
</storage_configuration>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
</yandex>
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>
import logging
import pytest
from helpers.cluster import ClickHouseCluster
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
def prepare_s3_bucket(cluster):
    """Recreate the test bucket so that the tests start from an empty bucket.

    If the bucket already exists, every object in it is deleted first: S3-compatible
    storages refuse to remove a non-empty bucket, so leftovers from an aborted run
    would otherwise make the fixture fail. No access policy is configured here.

    :param cluster: object exposing ``minio_client`` and ``minio_bucket``.
    """
    minio_client = cluster.minio_client
    bucket = cluster.minio_bucket

    if minio_client.bucket_exists(bucket):
        # Materialize the listing before deleting, so removal never races with iteration.
        for obj in list(minio_client.list_objects(bucket, recursive=True)):
            minio_client.remove_object(bucket, obj.object_name)
        minio_client.remove_bucket(bucket)

    minio_client.make_bucket(bucket)
@pytest.fixture(scope="module")
def cluster():
    """Module-scoped fixture: starts a one-node cluster with MinIO and an empty test bucket.

    The cluster is shut down when the module's tests are finished, and also when
    start-up or bucket preparation fails.
    """
    # Construct the cluster outside the try block: if construction itself fails there is
    # nothing to shut down, and touching the still-unbound local in `finally` would raise
    # UnboundLocalError and hide the original error.
    cluster = ClickHouseCluster(__file__)
    try:
        cluster.add_instance("node", config_dir="configs", with_minio=True)
        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        prepare_s3_bucket(cluster)
        logging.info("S3 bucket created")

        yield cluster
    finally:
        cluster.shutdown()
def test_tinylog_s3(cluster):
    """TinyLog on the S3 disk: data survives two inserts, objects appear in the bucket
    under the table prefix, and DROP TABLE removes them."""
    node = cluster.instances["node"]
    minio = cluster.minio_client

    def stored_object_names():
        # Drop the last 17 characters of every key: '.' plus the 16-letter random suffix.
        listing = minio.list_objects(cluster.minio_bucket, 'data/data/default/s3_test/')
        return [obj.object_name[:-17] for obj in listing]

    node.query("CREATE TABLE s3_test (id UInt64) Engine=TinyLog")

    node.query("INSERT INTO s3_test SELECT number FROM numbers(3)")
    assert node.query("SELECT * FROM s3_test") == "0\n1\n2\n"
    assert stored_object_names() == [
        'data/data/default/s3_test/id.bin', 'data/data/default/s3_test/tmp_sizes.json']

    node.query("INSERT INTO s3_test SELECT number + 3 FROM numbers(3)")
    assert node.query("SELECT * FROM s3_test") == "0\n1\n2\n3\n4\n5\n"
    assert stored_object_names() == [
        'data/data/default/s3_test/id.bin', 'data/data/default/s3_test/tmp_sizes.json']

    node.query("DROP TABLE s3_test")
    remaining = list(minio.list_objects(cluster.minio_bucket, 'data/data/default/s3_test/'))
    assert len(remaining) == 0
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册