未验证 提交 c3828a1f 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #19660 from CurtizJ/sync-directory

Better abstractions in disk interface
......@@ -37,7 +37,6 @@ SRCS(
CurrentMetrics.cpp
CurrentThread.cpp
DNSResolver.cpp
DirectorySyncGuard.cpp
Dwarf.cpp
Elf.cpp
ErrorCodes.cpp
......
......@@ -175,24 +175,14 @@ void DiskDecorator::truncateFile(const String & path, size_t size)
delegate->truncateFile(path, size);
}
int DiskDecorator::open(const String & path, int flags) const
{
return delegate->open(path, flags);
}
void DiskDecorator::close(int fd) const
{
delegate->close(fd);
}
void DiskDecorator::sync(int fd) const
Executor & DiskDecorator::getExecutor()
{
delegate->sync(fd);
return delegate->getExecutor();
}
Executor & DiskDecorator::getExecutor()
SyncGuardPtr DiskDecorator::getDirectorySyncGuard(const String & path) const
{
return delegate->getExecutor();
return delegate->getDirectorySyncGuard(path);
}
}
......@@ -48,11 +48,9 @@ public:
void setReadOnly(const String & path) override;
void createHardLink(const String & src_path, const String & dst_path) override;
void truncateFile(const String & path, size_t size) override;
int open(const String & path, int flags) const override;
void close(int fd) const override;
void sync(int fd) const override;
const String getType() const override { return delegate->getType(); }
Executor & getExecutor() override;
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;
protected:
DiskPtr delegate;
......
......@@ -5,6 +5,7 @@
#include <Interpreters/Context.h>
#include <Common/filesystemHelpers.h>
#include <Common/quoteString.h>
#include <Disks/LocalDirectorySyncGuard.h>
#include <IO/createReadBufferFromFileBase.h>
#include <common/logger_useful.h>
......@@ -20,10 +21,6 @@ namespace ErrorCodes
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int PATH_ACCESS_DENIED;
extern const int INCORRECT_DISK_INDEX;
extern const int FILE_DOESNT_EXIST;
extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_FSYNC;
extern const int CANNOT_CLOSE_FILE;
extern const int CANNOT_TRUNCATE_FILE;
extern const int CANNOT_UNLINK;
extern const int CANNOT_RMDIR;
......@@ -315,26 +312,9 @@ void DiskLocal::copy(const String & from_path, const std::shared_ptr<IDisk> & to
IDisk::copy(from_path, to_disk, to_path); /// Copy files through buffers.
}
int DiskLocal::open(const String & path, int flags) const
SyncGuardPtr DiskLocal::getDirectorySyncGuard(const String & path) const
{
String full_path = disk_path + path;
int fd = ::open(full_path.c_str(), flags);
if (-1 == fd)
throwFromErrnoWithPath("Cannot open file " + full_path, full_path,
errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE);
return fd;
}
void DiskLocal::close(int fd) const
{
if (-1 == ::close(fd))
throw Exception("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE);
}
void DiskLocal::sync(int fd) const
{
if (-1 == ::fsync(fd))
throw Exception("Cannot fsync", ErrorCodes::CANNOT_FSYNC);
return std::make_unique<LocalDirectorySyncGuard>(disk_path + path);
}
DiskPtr DiskLocalReservation::getDisk(size_t i) const
......
......@@ -98,14 +98,12 @@ public:
void createHardLink(const String & src_path, const String & dst_path) override;
int open(const String & path, int flags) const override;
void close(int fd) const override;
void sync(int fd) const override;
void truncateFile(const String & path, size_t size) override;
const String getType() const override { return "local"; }
SyncGuardPtr getDirectorySyncGuard(const String & path) const override;
private:
bool tryReserve(UInt64 bytes);
......
......@@ -436,21 +436,6 @@ void DiskMemory::setReadOnly(const String &)
throw Exception("Method setReadOnly is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
}
int DiskMemory::open(const String & /*path*/, int /*flags*/) const
{
throw Exception("Method open is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
}
void DiskMemory::close(int /*fd*/) const
{
throw Exception("Method close is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
}
void DiskMemory::sync(int /*fd*/) const
{
throw Exception("Method sync is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
}
void DiskMemory::truncateFile(const String & path, size_t size)
{
std::lock_guard lock(mutex);
......
......@@ -89,10 +89,6 @@ public:
void createHardLink(const String & src_path, const String & dst_path) override;
int open(const String & path, int flags) const override;
void close(int fd) const override;
void sync(int fd) const override;
void truncateFile(const String & path, size_t size) override;
const String getType() const override { return "memory"; }
......
......@@ -76,4 +76,9 @@ void IDisk::truncateFile(const String &, size_t)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate operation is not implemented for disk of type {}", getType());
}
/// Base implementation: the path is ignored and an empty (null) guard pointer is returned.
/// Callers therefore have to be prepared to receive a null SyncGuardPtr.
SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const
{
    return SyncGuardPtr{};
}
}
......@@ -57,6 +57,19 @@ public:
using SpacePtr = std::shared_ptr<Space>;
/**
 * A guard that should synchronize the state of a file or directory
 * with the storage device (e.g. fsync in POSIX) in its destructor.
 *
 * Guards are destroyed through a pointer to this base class, hence the
 * virtual destructor. Copying is disabled: a guard stands for a single
 * pending synchronization, and copying through the base would slice the
 * derived guard or make two objects release the same underlying resource.
 */
class ISyncGuard
{
public:
    ISyncGuard() = default;
    virtual ~ISyncGuard() = default;

    ISyncGuard(const ISyncGuard &) = delete;
    ISyncGuard & operator=(const ISyncGuard &) = delete;
};
using SyncGuardPtr = std::unique_ptr<ISyncGuard>;
/**
* A unit of storage persisting data and metadata.
* Abstract underlying storage technology.
......@@ -174,15 +187,6 @@ public:
/// Create hardlink from `src_path` to `dst_path`.
virtual void createHardLink(const String & src_path, const String & dst_path) = 0;
/// Wrapper for POSIX open
virtual int open(const String & path, int flags) const = 0;
/// Wrapper for POSIX close
virtual void close(int fd) const = 0;
/// Wrapper for POSIX fsync
virtual void sync(int fd) const = 0;
/// Truncate file to specified size.
virtual void truncateFile(const String & path, size_t size);
......@@ -195,6 +199,9 @@ public:
/// Returns executor to perform asynchronous operations.
virtual Executor & getExecutor() { return *executor; }
/// Returns a guard that ensures synchronization of directory metadata with the storage device.
virtual SyncGuardPtr getDirectorySyncGuard(const String & path) const;
private:
std::unique_ptr<Executor> executor;
};
......
#include <Common/DirectorySyncGuard.h>
#include <Disks/LocalDirectorySyncGuard.h>
#include <Common/Exception.h>
#include <Disks/IDisk.h>
#include <fcntl.h> // O_RDWR
......@@ -14,14 +14,20 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_FSYNC;
extern const int FILE_DOESNT_EXIST;
extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_CLOSE_FILE;
}
DirectorySyncGuard::DirectorySyncGuard(const DiskPtr & disk_, const String & path)
: disk(disk_)
, fd(disk_->open(path, O_DIRECTORY))
{}
/// Opens the directory at `full_path` with O_DIRECTORY and keeps the descriptor in `fd`.
/// Throws via throwFromErrnoWithPath if the directory cannot be opened:
/// FILE_DOESNT_EXIST when errno is ENOENT, CANNOT_OPEN_FILE otherwise.
LocalDirectorySyncGuard::LocalDirectorySyncGuard(const String & full_path)
    : fd(::open(full_path.c_str(), O_DIRECTORY))
{
    if (fd != -1)
        return;

    /// Pick the error code from errno before anything else can touch it.
    const int error_code = (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE;
    throwFromErrnoWithPath("Cannot open file " + full_path, full_path, error_code);
}
DirectorySyncGuard::~DirectorySyncGuard()
LocalDirectorySyncGuard::~LocalDirectorySyncGuard()
{
try
{
......@@ -29,8 +35,11 @@ DirectorySyncGuard::~DirectorySyncGuard()
if (fcntl(fd, F_FULLFSYNC, 0))
throwFromErrno("Cannot fcntl(F_FULLFSYNC)", ErrorCodes::CANNOT_FSYNC);
#endif
disk->sync(fd);
disk->close(fd);
if (-1 == ::fsync(fd))
throw Exception("Cannot fsync", ErrorCodes::CANNOT_FSYNC);
if (-1 == ::close(fd))
throw Exception("Cannot close file", ErrorCodes::CANNOT_CLOSE_FILE);
}
catch (...)
{
......
#pragma once
#include <string>
#include <memory>
#include <Disks/IDisk.h>
namespace DB
{
......@@ -13,17 +12,16 @@ using DiskPtr = std::shared_ptr<IDisk>;
/// It's used to keep descriptor open, while doing some operations with it, and do fsync at the end.
/// Guarantees for the sequence 'close-reopen-fsync' may depend on the kernel version.
/// Source: linux-fsdevel mailing-list https://marc.info/?l=linux-fsdevel&m=152535409207496
class DirectorySyncGuard
class LocalDirectorySyncGuard final : public ISyncGuard
{
public:
/// NOTE: If you have already opened descriptor, it's preferred to use
/// this constructor instead of constructor with path.
DirectorySyncGuard(const DiskPtr & disk_, int fd_) : disk(disk_), fd(fd_) {}
DirectorySyncGuard(const DiskPtr & disk_, const std::string & path);
~DirectorySyncGuard();
LocalDirectorySyncGuard(int fd_) : fd(fd_) {}
LocalDirectorySyncGuard(const String & full_path);
~LocalDirectorySyncGuard() override;
private:
DiskPtr disk;
int fd = -1;
};
......
......@@ -36,7 +36,6 @@ namespace ErrorCodes
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int UNKNOWN_FORMAT;
extern const int INCORRECT_DISK_INDEX;
extern const int NOT_IMPLEMENTED;
extern const int PATH_ACCESS_DENIED;
extern const int CANNOT_DELETE_DIRECTORY;
}
......@@ -878,21 +877,6 @@ void DiskS3::setReadOnly(const String & path)
metadata.save();
}
int DiskS3::open(const String & /*path*/, int /*flags*/) const
{
throw Exception("Method open is not implemented for S3 disks", ErrorCodes::NOT_IMPLEMENTED);
}
void DiskS3::close(int /*fd*/) const
{
throw Exception("Method close is not implemented for S3 disks", ErrorCodes::NOT_IMPLEMENTED);
}
void DiskS3::sync(int /*fd*/) const
{
throw Exception("Method sync is not implemented for S3 disks", ErrorCodes::NOT_IMPLEMENTED);
}
void DiskS3::shutdown()
{
/// This call stops any next retry attempts for ongoing S3 requests.
......
......@@ -105,10 +105,6 @@ public:
void setReadOnly(const String & path) override;
int open(const String & path, int flags) const override;
void close(int fd) const override;
void sync(int fd) const override;
const String getType() const override { return "s3"; }
void shutdown() override;
......
......@@ -17,6 +17,7 @@ SRCS(
DiskSelector.cpp
IDisk.cpp
IVolume.cpp
LocalDirectorySyncGuard.cpp
SingleDiskVolume.cpp
StoragePolicy.cpp
VolumeJBOD.cpp
......
......@@ -7,7 +7,6 @@
#include <Common/quoteString.h>
#include <Common/hex.h>
#include <Common/ActionBlocker.h>
#include <Common/DirectorySyncGuard.h>
#include <common/StringRef.h>
#include <Interpreters/Context.h>
#include <Interpreters/Cluster.h>
......@@ -178,6 +177,13 @@ namespace
|| code == ErrorCodes::CANNOT_DECOMPRESS
|| (!remote_error && code == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF);
}
/// Returns the directory sync guard of `disk` for `path` when `dir_fsync` is enabled.
/// When it is disabled the disk is not consulted and a null guard is returned.
SyncGuardPtr getDirectorySyncGuard(bool dir_fsync, const DiskPtr & disk, const String & path)
{
    if (!dir_fsync)
        return nullptr;

    return disk->getDirectorySyncGuard(path);
}
}
......@@ -244,10 +250,7 @@ void StorageDistributedDirectoryMonitor::shutdownAndDropAllData()
task_handle->deactivate();
}
std::optional<DirectorySyncGuard> dir_sync_guard;
if (dir_fsync)
dir_sync_guard.emplace(disk, relative_path);
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
Poco::File(path).remove(true);
}
......@@ -448,10 +451,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
throw;
}
std::optional<DirectorySyncGuard> dir_sync_guard;
if (dir_fsync)
dir_sync_guard.emplace(disk, relative_path);
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
Poco::File{file_path}.remove();
metric_pending_files.sub();
......@@ -537,9 +537,7 @@ struct StorageDistributedDirectoryMonitor::Batch
/// Temporary file is required for atomicity.
String tmp_file{parent.current_batch_file_path + ".tmp"};
std::optional<DirectorySyncGuard> dir_sync_guard;
if (dir_fsync)
dir_sync_guard.emplace(parent.disk, parent.relative_path);
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, parent.disk, parent.relative_path);
if (Poco::File{tmp_file}.exists())
LOG_ERROR(parent.log, "Temporary file {} exists. Unclean shutdown?", backQuote(tmp_file));
......@@ -607,10 +605,7 @@ struct StorageDistributedDirectoryMonitor::Batch
{
LOG_TRACE(parent.log, "Sent a batch of {} files.", file_indices.size());
std::optional<DirectorySyncGuard> dir_sync_guard;
if (dir_fsync)
dir_sync_guard.emplace(parent.disk, parent.relative_path);
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, parent.disk, parent.relative_path);
for (UInt64 file_index : file_indices)
Poco::File{file_index_to_path.at(file_index)}.remove();
}
......@@ -813,9 +808,7 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
}
{
std::optional<DirectorySyncGuard> dir_sync_guard;
if (dir_fsync)
dir_sync_guard.emplace(disk, relative_path);
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
/// current_batch.txt will not exist if there was no send
/// (this is the case when all batches that were pending have been marked as pending)
......@@ -834,13 +827,8 @@ void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_p
Poco::File{broken_path}.createDirectory();
std::optional<DirectorySyncGuard> dir_sync_guard;
std::optional<DirectorySyncGuard> broken_dir_sync_guard;
if (dir_fsync)
{
broken_dir_sync_guard.emplace(disk, relative_path + "/broken/");
dir_sync_guard.emplace(disk, relative_path);
}
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
auto broken_dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path + "/broken/");
Poco::File{file_path}.renameTo(broken_file_path);
......
......@@ -29,7 +29,6 @@
#include <Common/escapeForFileName.h>
#include <Common/CurrentThread.h>
#include <Common/createHardLink.h>
#include <Common/DirectorySyncGuard.h>
#include <common/logger_useful.h>
#include <ext/range.h>
#include <ext/scope_guard.h>
......@@ -623,11 +622,11 @@ void DistributedBlockOutputStream::writeToShard(const Block & block, const std::
auto make_directory_sync_guard = [&](const std::string & current_path)
{
std::unique_ptr<DirectorySyncGuard> guard;
SyncGuardPtr guard;
if (dir_fsync)
{
const std::string relative_path(data_path + current_path);
guard = std::make_unique<DirectorySyncGuard>(disk, relative_path);
guard = disk->getDirectorySyncGuard(relative_path);
}
return guard;
};
......
......@@ -5,7 +5,6 @@
#include <Disks/SingleDiskVolume.h>
#include <Common/CurrentMetrics.h>
#include <Common/NetException.h>
#include <Common/DirectorySyncGuard.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <IO/HTTPCommon.h>
#include <ext/scope_guard.h>
......@@ -398,9 +397,9 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
disk->createDirectories(part_download_path);
std::optional<DirectorySyncGuard> sync_guard;
SyncGuardPtr sync_guard;
if (data.getSettings()->fsync_part_directory)
sync_guard.emplace(disk, part_download_path);
sync_guard = disk->getDirectorySyncGuard(part_download_path);
MergeTreeData::DataPart::Checksums checksums;
for (size_t i = 0; i < files; ++i)
......
......@@ -11,7 +11,6 @@
#include <Storages/MergeTree/checkDataPart.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/escapeForFileName.h>
#include <Common/DirectorySyncGuard.h>
#include <Common/CurrentMetrics.h>
#include <common/JSON.h>
#include <common/logger_useful.h>
......@@ -1002,9 +1001,9 @@ void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_
volume->getDisk()->moveFile(from, to);
relative_path = new_relative_path;
std::optional<DirectorySyncGuard> sync_guard;
SyncGuardPtr sync_guard;
if (storage.getSettings()->fsync_part_directory)
sync_guard.emplace(volume->getDisk(), to);
sync_guard = volume->getDisk()->getDirectorySyncGuard(to);
}
......
......@@ -29,7 +29,6 @@
#include <Common/interpolate.h>
#include <Common/typeid_cast.h>
#include <Common/escapeForFileName.h>
#include <Common/DirectorySyncGuard.h>
#include <Parsers/queryToString.h>
#include <cmath>
......@@ -780,9 +779,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
gathering_column_names.clear();
}
std::optional<DirectorySyncGuard> sync_guard;
SyncGuardPtr sync_guard;
if (data.getSettings()->fsync_part_directory)
sync_guard.emplace(disk, new_part_tmp_path);
sync_guard = disk->getDirectorySyncGuard(new_part_tmp_path);
/** Read from all parts, merge and write into a new one.
* In passing, we calculate expression for sorting.
......@@ -1182,9 +1181,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
disk->createDirectories(new_part_tmp_path);
std::optional<DirectorySyncGuard> sync_guard;
SyncGuardPtr sync_guard;
if (data.getSettings()->fsync_part_directory)
sync_guard.emplace(disk, new_part_tmp_path);
sync_guard = disk->getDirectorySyncGuard(new_part_tmp_path);
/// Don't change granularity type while mutating subset of columns
auto mrk_extension = source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension(new_data_part->getType())
......
......@@ -12,7 +12,6 @@
#include <IO/WriteHelpers.h>
#include <Poco/File.h>
#include <Common/typeid_cast.h>
#include <Common/DirectorySyncGuard.h>
#include <Parsers/queryToString.h>
......@@ -362,7 +361,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
new_data_part->minmax_idx = std::move(minmax_idx);
new_data_part->is_temp = true;
std::optional<DirectorySyncGuard> sync_guard;
SyncGuardPtr sync_guard;
if (new_data_part->isStoredOnDisk())
{
/// The name could be non-unique in case of stale files from previous runs.
......@@ -378,7 +377,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
disk->createDirectories(full_path);
if (data.getSettings()->fsync_part_directory)
sync_guard.emplace(disk, full_path);
sync_guard = disk->getDirectorySyncGuard(full_path);
}
if (metadata_snapshot->hasRowsTTL())
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册