提交 82e7631d 编写于 作者: Z Zhichao Cao 提交者: Facebook GitHub Bot

Replace Status with IOStatus in the backupable_db (#8820)

Summary:
In order to populate the IOStatus up to the higher level, replace some of the Status to IOStatus.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8820

Test Plan: make check

Reviewed By: pdillinger

Differential Revision: D30967215

Pulled By: zhichao-cao

fbshipit-source-id: ccf9d5cfbd9d3de047c464aaa85f9fa43b474903
上级 5c92aa38
......@@ -11,24 +11,24 @@
namespace ROCKSDB_NAMESPACE {
Status LineFileReader::Create(const std::shared_ptr<FileSystem>& fs,
IOStatus LineFileReader::Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<LineFileReader>* reader,
IODebugContext* dbg) {
std::unique_ptr<FSSequentialFile> file;
Status s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
if (s.ok()) {
IOStatus io_s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
if (io_s.ok()) {
reader->reset(new LineFileReader(std::move(file), fname));
}
return s;
return io_s;
}
bool LineFileReader::ReadLine(std::string* out) {
assert(out);
if (!status_.ok()) {
if (!io_status_.ok()) {
// Status should be checked (or permit unchecked) any time we return false.
status_.MustCheck();
io_status_.MustCheck();
return false;
}
out->clear();
......@@ -44,16 +44,16 @@ bool LineFileReader::ReadLine(std::string* out) {
return true;
}
if (at_eof_) {
status_.MustCheck();
io_status_.MustCheck();
return false;
}
// else flush and reload buffer
out->append(buf_begin_, buf_end_ - buf_begin_);
Slice result;
status_ = sfr_.Read(buf_.size(), &result, buf_.data());
io_status_ = sfr_.Read(buf_.size(), &result, buf_.data());
IOSTATS_ADD(bytes_read, result.size());
if (!status_.ok()) {
status_.MustCheck();
if (!io_status_.ok()) {
io_status_.MustCheck();
return false;
}
if (result.size() != buf_.size()) {
......
......@@ -17,7 +17,7 @@ class LineFileReader {
private:
std::array<char, 8192> buf_;
SequentialFileReader sfr_;
Status status_;
IOStatus io_status_;
const char* buf_begin_ = buf_.data();
const char* buf_end_ = buf_.data();
size_t line_number_ = 0;
......@@ -29,7 +29,7 @@ class LineFileReader {
explicit LineFileReader(Args&&... args)
: sfr_(std::forward<Args&&>(args)...) {}
static Status Create(const std::shared_ptr<FileSystem>& fs,
static IOStatus Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<LineFileReader>* reader,
IODebugContext* dbg);
......@@ -53,7 +53,7 @@ class LineFileReader {
// Returns any error encountered during read. The error is considered
// permanent and no retry or recovery is attempted with the same
// LineFileReader.
const Status& GetStatus() const { return status_; }
const IOStatus& GetStatus() const { return io_status_; }
};
} // namespace ROCKSDB_NAMESPACE
......@@ -22,20 +22,20 @@
#include "util/rate_limiter.h"
namespace ROCKSDB_NAMESPACE {
Status SequentialFileReader::Create(
IOStatus SequentialFileReader::Create(
const std::shared_ptr<FileSystem>& fs, const std::string& fname,
const FileOptions& file_opts, std::unique_ptr<SequentialFileReader>* reader,
IODebugContext* dbg) {
std::unique_ptr<FSSequentialFile> file;
Status s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
if (s.ok()) {
IOStatus io_s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
if (io_s.ok()) {
reader->reset(new SequentialFileReader(std::move(file), fname));
}
return s;
return io_s;
}
Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
Status s;
IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
IOStatus io_s;
if (use_direct_io()) {
#ifndef ROCKSDB_LITE
size_t offset = offset_.fetch_add(n);
......@@ -48,9 +48,9 @@ Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
buf.Alignment(alignment);
buf.AllocateNewBuffer(size);
Slice tmp;
s = file_->PositionedRead(aligned_offset, size, IOOptions(), &tmp,
io_s = file_->PositionedRead(aligned_offset, size, IOOptions(), &tmp,
buf.BufferStart(), nullptr);
if (s.ok() && offset_advance < tmp.size()) {
if (io_s.ok() && offset_advance < tmp.size()) {
buf.Size(tmp.size());
r = buf.Read(scratch, offset_advance,
std::min(tmp.size() - offset_advance, n));
......@@ -58,17 +58,17 @@ Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
*result = Slice(scratch, r);
#endif // !ROCKSDB_LITE
} else {
s = file_->Read(n, IOOptions(), result, scratch, nullptr);
io_s = file_->Read(n, IOOptions(), result, scratch, nullptr);
}
IOSTATS_ADD(bytes_read, result->size());
return s;
return io_s;
}
Status SequentialFileReader::Skip(uint64_t n) {
IOStatus SequentialFileReader::Skip(uint64_t n) {
#ifndef ROCKSDB_LITE
if (use_direct_io()) {
offset_ += static_cast<size_t>(n);
return Status::OK();
return IOStatus::OK();
}
#endif // !ROCKSDB_LITE
return file_->Skip(n);
......
......@@ -41,7 +41,7 @@ class SequentialFileReader {
: file_name_(_file_name),
file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size),
io_tracer, _file_name) {}
static Status Create(const std::shared_ptr<FileSystem>& fs,
static IOStatus Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<SequentialFileReader>* reader,
IODebugContext* dbg);
......@@ -49,9 +49,9 @@ class SequentialFileReader {
SequentialFileReader(const SequentialFileReader&) = delete;
SequentialFileReader& operator=(const SequentialFileReader&) = delete;
Status Read(size_t n, Slice* result, char* scratch);
IOStatus Read(size_t n, Slice* result, char* scratch);
Status Skip(uint64_t n);
IOStatus Skip(uint64_t n);
FSSequentialFile* file() { return file_.get(); }
......
......@@ -17,6 +17,7 @@
#include <vector>
#include "rocksdb/env.h"
#include "rocksdb/io_status.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
......@@ -406,25 +407,25 @@ class BackupEngineReadOnlyBase {
std::vector<BackupID>* corrupt_backup_ids) const = 0;
// Restore to specified db_dir and wal_dir from backup_id.
virtual Status RestoreDBFromBackup(const RestoreOptions& options,
virtual IOStatus RestoreDBFromBackup(const RestoreOptions& options,
BackupID backup_id,
const std::string& db_dir,
const std::string& wal_dir) const = 0;
// keep for backward compatibility.
virtual Status RestoreDBFromBackup(
virtual IOStatus RestoreDBFromBackup(
BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
const RestoreOptions& options = RestoreOptions()) const {
return RestoreDBFromBackup(options, backup_id, db_dir, wal_dir);
}
// Like RestoreDBFromBackup but restores from latest non-corrupt backup_id
virtual Status RestoreDBFromLatestBackup(
virtual IOStatus RestoreDBFromLatestBackup(
const RestoreOptions& options, const std::string& db_dir,
const std::string& wal_dir) const = 0;
// keep for backward compatibility.
virtual Status RestoreDBFromLatestBackup(
virtual IOStatus RestoreDBFromLatestBackup(
const std::string& db_dir, const std::string& wal_dir,
const RestoreOptions& options = RestoreOptions()) const {
return RestoreDBFromLatestBackup(options, db_dir, wal_dir);
......@@ -445,7 +446,7 @@ class BackupEngineReadOnlyBase {
// their sizes (and checksums) when the BackupEngine was opened.
//
// Returns Status::OK() if all checks are good
virtual Status VerifyBackup(BackupID backup_id,
virtual IOStatus VerifyBackup(BackupID backup_id,
bool verify_with_checksum = false) const = 0;
};
......@@ -457,12 +458,12 @@ class BackupEngineAppendOnlyBase {
virtual ~BackupEngineAppendOnlyBase() {}
// same as CreateNewBackup, but stores extra application metadata.
virtual Status CreateNewBackupWithMetadata(
virtual IOStatus CreateNewBackupWithMetadata(
const CreateBackupOptions& options, DB* db,
const std::string& app_metadata, BackupID* new_backup_id = nullptr) = 0;
// keep here for backward compatibility.
virtual Status CreateNewBackupWithMetadata(
virtual IOStatus CreateNewBackupWithMetadata(
DB* db, const std::string& app_metadata, bool flush_before_backup = false,
std::function<void()> progress_callback = []() {}) {
CreateBackupOptions options;
......@@ -514,7 +515,7 @@ class BackupEngineAppendOnlyBase {
// with Append or Write operations in another BackupEngine on the same
// backup_dir, because temporary files will be treated as obsolete and
// deleted.
virtual Status GarbageCollect() = 0;
virtual IOStatus GarbageCollect() = 0;
};
// A backup engine for organizing and managing backups.
......@@ -585,13 +586,13 @@ class BackupEngine : public BackupEngineReadOnlyBase,
// Deletes old backups, keeping latest num_backups_to_keep alive.
// See also DeleteBackup.
virtual Status PurgeOldBackups(uint32_t num_backups_to_keep) = 0;
virtual IOStatus PurgeOldBackups(uint32_t num_backups_to_keep) = 0;
// Deletes a specific backup. If this operation (or PurgeOldBackups)
// is not completed due to crash, power failure, etc. the state
// will be cleaned up the next time you call DeleteBackup,
// PurgeOldBackups, or GarbageCollect.
virtual Status DeleteBackup(BackupID backup_id) = 0;
virtual IOStatus DeleteBackup(BackupID backup_id) = 0;
};
// A variant of BackupEngine that only allows "Read" operations. See
......
......@@ -128,17 +128,17 @@ class BackupEngineImpl {
bool read_only = false);
~BackupEngineImpl();
Status CreateNewBackupWithMetadata(const CreateBackupOptions& options, DB* db,
const std::string& app_metadata,
IOStatus CreateNewBackupWithMetadata(const CreateBackupOptions& options,
DB* db, const std::string& app_metadata,
BackupID* new_backup_id_ptr);
Status PurgeOldBackups(uint32_t num_backups_to_keep);
IOStatus PurgeOldBackups(uint32_t num_backups_to_keep);
Status DeleteBackup(BackupID backup_id);
IOStatus DeleteBackup(BackupID backup_id);
void StopBackup() { stop_backup_.store(true, std::memory_order_release); }
Status GarbageCollect();
IOStatus GarbageCollect();
// The returned BackupInfos are in chronological order, which means the
// latest backup comes last.
......@@ -150,21 +150,21 @@ class BackupEngineImpl {
void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) const;
Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id,
const std::string& db_dir,
IOStatus RestoreDBFromBackup(const RestoreOptions& options,
BackupID backup_id, const std::string& db_dir,
const std::string& wal_dir) const;
Status RestoreDBFromLatestBackup(const RestoreOptions& options,
IOStatus RestoreDBFromLatestBackup(const RestoreOptions& options,
const std::string& db_dir,
const std::string& wal_dir) const {
// Note: don't read latest_valid_backup_id_ outside of lock
return RestoreDBFromBackup(options, kLatestBackupIDMarker, db_dir, wal_dir);
}
Status VerifyBackup(BackupID backup_id,
IOStatus VerifyBackup(BackupID backup_id,
bool verify_with_checksum = false) const;
Status Initialize();
IOStatus Initialize();
ShareFilesNaming GetNamingNoFlags() const {
return options_.share_files_with_checksum_naming &
......@@ -178,12 +178,12 @@ class BackupEngineImpl {
private:
void DeleteChildren(const std::string& dir,
uint32_t file_type_filter = 0) const;
Status DeleteBackupNoGC(BackupID backup_id);
IOStatus DeleteBackupNoGC(BackupID backup_id);
// Extends the "result" map with pathname->size mappings for the contents of
// "dir" in "env". Pathnames are prefixed with "dir".
Status ReadChildFileCurrentSizes(
const std::string& dir, Env* env,
IOStatus ReadChildFileCurrentSizes(
const std::string& dir, const std::shared_ptr<FileSystem>&,
std::unordered_map<std::string, uint64_t>* result) const;
struct FileInfo {
......@@ -345,14 +345,15 @@ class BackupEngineImpl {
BackupMeta(
const std::string& meta_filename, const std::string& meta_tmp_filename,
std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos,
Env* env)
Env* env, const std::shared_ptr<FileSystem>& fs)
: timestamp_(0),
sequence_number_(0),
size_(0),
meta_filename_(meta_filename),
meta_tmp_filename_(meta_tmp_filename),
file_infos_(file_infos),
env_(env) {}
env_(env),
fs_(fs) {}
BackupMeta(const BackupMeta&) = delete;
BackupMeta& operator=(const BackupMeta&) = delete;
......@@ -386,9 +387,9 @@ class BackupEngineImpl {
app_metadata_ = app_metadata;
}
Status AddFile(std::shared_ptr<FileInfo> file_info);
IOStatus AddFile(std::shared_ptr<FileInfo> file_info);
Status Delete(bool delete_meta = true);
IOStatus Delete(bool delete_meta = true);
bool Empty() const { return files_.empty(); }
......@@ -404,12 +405,12 @@ class BackupEngineImpl {
}
// @param abs_path_to_size Pre-fetched file sizes (bytes).
Status LoadFromFile(
IOStatus LoadFromFile(
const std::string& backup_dir,
const std::unordered_map<std::string, uint64_t>& abs_path_to_size,
RateLimiter* rate_limiter, Logger* info_log,
std::unordered_set<std::string>* reported_ignored_fields);
Status StoreToFile(
IOStatus StoreToFile(
bool sync, const TEST_FutureSchemaVersion2Options* test_future_options);
std::string GetInfoString() {
......@@ -438,8 +439,8 @@ class BackupEngineImpl {
dst_dir.replace(i, kMetaDirSlash.size(), kPrivateDirSlash);
// Make the RemapSharedFileSystem
std::shared_ptr<FileSystem> remap_fs =
std::make_shared<RemapSharedFileSystem>(
env_->GetFileSystem(), dst_dir, src_base_dir, files_);
std::make_shared<RemapSharedFileSystem>(fs_, dst_dir, src_base_dir,
files_);
// Make it read-only for safety
remap_fs = std::make_shared<ReadOnlyFileSystem>(remap_fs);
// Make an Env wrapper
......@@ -462,6 +463,8 @@ class BackupEngineImpl {
std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_;
Env* env_;
mutable std::shared_ptr<Env> env_for_open_;
std::shared_ptr<FileSystem> fs_;
IOOptions iooptions_ = IOOptions();
}; // BackupMeta
void SetBackupInfoFromBackupMeta(BackupID id, const BackupMeta& meta,
......@@ -534,16 +537,16 @@ class BackupEngineImpl {
//
// @param src If non-empty, the file is copied from this pathname.
// @param contents If non-empty, the file will be created with these contents.
Status CopyOrCreateFile(const std::string& src, const std::string& dst,
const std::string& contents, Env* src_env,
Env* dst_env, const EnvOptions& src_env_options,
bool sync, RateLimiter* rate_limiter,
uint64_t* size = nullptr,
std::string* checksum_hex = nullptr,
IOStatus CopyOrCreateFile(
const std::string& src, const std::string& dst,
const std::string& contents, Env* src_env, Env* dst_env,
const EnvOptions& src_env_options, bool sync, RateLimiter* rate_limiter,
uint64_t* size = nullptr, std::string* checksum_hex = nullptr,
uint64_t size_limit = 0,
std::function<void()> progress_callback = []() {});
Status ReadFileAndComputeChecksum(const std::string& src, Env* src_env,
IOStatus ReadFileAndComputeChecksum(const std::string& src,
const std::shared_ptr<FileSystem>& src_fs,
const EnvOptions& src_env_options,
uint64_t size_limit,
std::string* checksum_hex) const;
......@@ -563,13 +566,13 @@ class BackupEngineImpl {
// channel when the BackupEngineImpl is shutdown, these will also have
// Status that have not been checked. This
// TODO: Fix those issues so that the Status
status.PermitUncheckedError();
io_status.PermitUncheckedError();
}
uint64_t size;
std::string checksum_hex;
std::string db_id;
std::string db_session_id;
Status status;
IOStatus io_status;
};
// Exactly one of src_path and contents must be non-empty. If src_path is
......@@ -756,7 +759,7 @@ class BackupEngineImpl {
// copied.
// @param fname Name of destination file and, in case of copy, source file.
// @param contents If non-empty, the file will be created with these contents.
Status AddBackupFileWorkItem(
IOStatus AddBackupFileWorkItem(
std::unordered_set<std::string>& live_dst_paths,
std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
BackupID backup_id, bool shared, const std::string& src_dir,
......@@ -773,7 +776,7 @@ class BackupEngineImpl {
BackupID latest_backup_id_;
BackupID latest_valid_backup_id_;
std::map<BackupID, std::unique_ptr<BackupMeta>> backups_;
std::map<BackupID, std::pair<Status, std::unique_ptr<BackupMeta>>>
std::map<BackupID, std::pair<IOStatus, std::unique_ptr<BackupMeta>>>
corrupt_backups_;
std::unordered_map<std::string,
std::shared_ptr<FileInfo>> backuped_file_infos_;
......@@ -785,16 +788,19 @@ class BackupEngineImpl {
Env* backup_env_;
// directories
std::unique_ptr<Directory> backup_directory_;
std::unique_ptr<Directory> shared_directory_;
std::unique_ptr<Directory> meta_directory_;
std::unique_ptr<Directory> private_directory_;
std::unique_ptr<FSDirectory> backup_directory_;
std::unique_ptr<FSDirectory> shared_directory_;
std::unique_ptr<FSDirectory> meta_directory_;
std::unique_ptr<FSDirectory> private_directory_;
static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL; // 5MB
bool read_only_;
BackupStatistics backup_statistics_;
std::unordered_set<std::string> reported_ignored_fields_;
static const size_t kMaxAppMetaSize = 1024 * 1024; // 1MB
std::shared_ptr<FileSystem> db_fs_;
std::shared_ptr<FileSystem> backup_fs_;
IOOptions io_options_ = IOOptions();
public:
std::unique_ptr<TEST_FutureSchemaVersion2Options> test_future_options_;
......@@ -813,20 +819,20 @@ class BackupEngineImplThreadSafe : public BackupEngine,
~BackupEngineImplThreadSafe() override {}
using BackupEngine::CreateNewBackupWithMetadata;
Status CreateNewBackupWithMetadata(const CreateBackupOptions& options, DB* db,
const std::string& app_metadata,
IOStatus CreateNewBackupWithMetadata(const CreateBackupOptions& options,
DB* db, const std::string& app_metadata,
BackupID* new_backup_id) override {
WriteLock lock(&mutex_);
return impl_.CreateNewBackupWithMetadata(options, db, app_metadata,
new_backup_id);
}
Status PurgeOldBackups(uint32_t num_backups_to_keep) override {
IOStatus PurgeOldBackups(uint32_t num_backups_to_keep) override {
WriteLock lock(&mutex_);
return impl_.PurgeOldBackups(num_backups_to_keep);
}
Status DeleteBackup(BackupID backup_id) override {
IOStatus DeleteBackup(BackupID backup_id) override {
WriteLock lock(&mutex_);
return impl_.DeleteBackup(backup_id);
}
......@@ -836,7 +842,7 @@ class BackupEngineImplThreadSafe : public BackupEngine,
impl_.StopBackup();
}
Status GarbageCollect() override {
IOStatus GarbageCollect() override {
WriteLock lock(&mutex_);
return impl_.GarbageCollect();
}
......@@ -867,22 +873,22 @@ class BackupEngineImplThreadSafe : public BackupEngine,
}
using BackupEngine::RestoreDBFromBackup;
Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id,
const std::string& db_dir,
IOStatus RestoreDBFromBackup(const RestoreOptions& options,
BackupID backup_id, const std::string& db_dir,
const std::string& wal_dir) const override {
ReadLock lock(&mutex_);
return impl_.RestoreDBFromBackup(options, backup_id, db_dir, wal_dir);
}
using BackupEngine::RestoreDBFromLatestBackup;
Status RestoreDBFromLatestBackup(const RestoreOptions& options,
const std::string& db_dir,
IOStatus RestoreDBFromLatestBackup(
const RestoreOptions& options, const std::string& db_dir,
const std::string& wal_dir) const override {
// Defer to above function, which locks
return RestoreDBFromBackup(options, kLatestBackupIDMarker, db_dir, wal_dir);
}
Status VerifyBackup(BackupID backup_id,
IOStatus VerifyBackup(BackupID backup_id,
bool verify_with_checksum = false) const override {
ReadLock lock(&mutex_);
return impl_.VerifyBackup(backup_id, verify_with_checksum);
......@@ -940,6 +946,8 @@ BackupEngineImpl::BackupEngineImpl(const BackupEngineOptions& options,
options_.restore_rate_limiter.reset(
NewGenericRateLimiter(options_.restore_rate_limit));
}
db_fs_ = db_env_->GetFileSystem();
backup_fs_ = backup_env_->GetFileSystem();
}
BackupEngineImpl::~BackupEngineImpl() {
......@@ -953,7 +961,7 @@ BackupEngineImpl::~BackupEngineImpl() {
}
}
Status BackupEngineImpl::Initialize() {
IOStatus BackupEngineImpl::Initialize() {
assert(!initialized_);
initialized_ = true;
if (read_only_) {
......@@ -976,7 +984,7 @@ Status BackupEngineImpl::Initialize() {
}
// gather the list of directories that we need to create
std::vector<std::pair<std::string, std::unique_ptr<Directory>*>>
std::vector<std::pair<std::string, std::unique_ptr<FSDirectory>*>>
directories;
directories.emplace_back(GetAbsolutePath(), &backup_directory_);
if (options_.share_table_files) {
......@@ -994,23 +1002,26 @@ Status BackupEngineImpl::Initialize() {
directories.emplace_back(meta_path, &meta_directory_);
// create all the dirs we need
for (const auto& d : directories) {
auto s = backup_env_->CreateDirIfMissing(d.first);
if (s.ok()) {
s = backup_env_->NewDirectory(d.first, d.second);
IOStatus io_s =
backup_fs_->CreateDirIfMissing(d.first, io_options_, nullptr);
if (io_s.ok()) {
io_s =
backup_fs_->NewDirectory(d.first, io_options_, d.second, nullptr);
}
if (!s.ok()) {
return s;
if (!io_s.ok()) {
return io_s;
}
}
}
std::vector<std::string> backup_meta_files;
{
auto s = backup_env_->GetChildren(meta_path, &backup_meta_files);
if (s.IsNotFound()) {
return Status::NotFound(meta_path + " is missing");
} else if (!s.ok()) {
return s;
IOStatus io_s = backup_fs_->GetChildren(meta_path, io_options_,
&backup_meta_files, nullptr);
if (io_s.IsNotFound()) {
return IOStatus::NotFound(meta_path + " is missing");
} else if (!io_s.ok()) {
return io_s;
}
}
// create backups_ structure
......@@ -1034,7 +1045,7 @@ Status BackupEngineImpl::Initialize() {
backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
GetBackupMetaFile(backup_id, false /* tmp */),
GetBackupMetaFile(backup_id, true /* tmp */),
&backuped_file_infos_, backup_env_))));
&backuped_file_infos_, backup_env_, backup_fs_))));
}
latest_backup_id_ = 0;
......@@ -1045,12 +1056,12 @@ Status BackupEngineImpl::Initialize() {
options_.info_log,
"Backup Engine started with destroy_old_data == true, deleting all "
"backups");
auto s = PurgeOldBackups(0);
if (s.ok()) {
s = GarbageCollect();
IOStatus io_s = PurgeOldBackups(0);
if (io_s.ok()) {
io_s = GarbageCollect();
}
if (!s.ok()) {
return s;
if (!io_s.ok()) {
return io_s;
}
} else { // Load data from storage
// abs_path_to_size: maps absolute paths of files in backup directory to
......@@ -1061,11 +1072,11 @@ Status BackupEngineImpl::Initialize() {
for (const auto& rel_dir :
{GetSharedFileRel(), GetSharedFileWithChecksumRel()}) {
const auto abs_dir = GetAbsolutePath(rel_dir);
Status s =
ReadChildFileCurrentSizes(abs_dir, backup_env_, &abs_path_to_size);
if (!s.ok()) {
IOStatus io_s =
ReadChildFileCurrentSizes(abs_dir, backup_fs_, &abs_path_to_size);
if (!io_s.ok()) {
// I/O error likely impacting all backups
return s;
return io_s;
}
}
// load the backups if any, until valid_backups_to_open of the latest
......@@ -1084,26 +1095,26 @@ Status BackupEngineImpl::Initialize() {
// Insert files and their sizes in backup sub-directories
// (private/backup_id) to abs_path_to_size
Status s = ReadChildFileCurrentSizes(
GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_env_,
IOStatus io_s = ReadChildFileCurrentSizes(
GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_fs_,
&abs_path_to_size);
if (s.ok()) {
s = backup_iter->second->LoadFromFile(
if (io_s.ok()) {
io_s = backup_iter->second->LoadFromFile(
options_.backup_dir, abs_path_to_size,
options_.backup_rate_limiter.get(), options_.info_log,
&reported_ignored_fields_);
}
if (s.IsCorruption() || s.IsNotSupported()) {
if (io_s.IsCorruption() || io_s.IsNotSupported()) {
ROCKS_LOG_INFO(options_.info_log, "Backup %u corrupted -- %s",
backup_iter->first, s.ToString().c_str());
corrupt_backups_.insert(
std::make_pair(backup_iter->first,
std::make_pair(s, std::move(backup_iter->second))));
} else if (!s.ok()) {
backup_iter->first, io_s.ToString().c_str());
corrupt_backups_.insert(std::make_pair(
backup_iter->first,
std::make_pair(io_s, std::move(backup_iter->second))));
} else if (!io_s.ok()) {
// Distinguish corruption errors from errors in the backup Env.
// Errors in the backup Env (i.e., this code path) will cause Open() to
// fail, whereas corruption errors would not cause Open() failures.
return s;
return io_s;
} else {
ROCKS_LOG_INFO(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s",
backup_iter->first,
......@@ -1168,7 +1179,7 @@ Status BackupEngineImpl::Initialize() {
uint64_t prev_bytes_written = IOSTATS(bytes_written);
CopyOrCreateResult result;
result.status = CopyOrCreateFile(
result.io_status = CopyOrCreateFile(
work_item.src_path, work_item.dst_path, work_item.contents,
work_item.src_env, work_item.dst_env, work_item.src_env_options,
work_item.sync, work_item.rate_limiter, &result.size,
......@@ -1182,7 +1193,7 @@ Status BackupEngineImpl::Initialize() {
result.db_id = work_item.db_id;
result.db_session_id = work_item.db_session_id;
if (result.status.ok() && !work_item.src_checksum_hex.empty()) {
if (result.io_status.ok() && !work_item.src_checksum_hex.empty()) {
// unknown checksum function name implies no db table file checksum in
// db manifest; work_item.src_checksum_hex not empty means
// backup engine has calculated its crc32c checksum for the table
......@@ -1194,9 +1205,9 @@ Status BackupEngineImpl::Initialize() {
std::string checksum_info(
"Expected checksum is " + work_item.src_checksum_hex +
" while computed checksum is " + result.checksum_hex);
result.status =
Status::Corruption("Checksum mismatch after copying to " +
work_item.dst_path + ": " + checksum_info);
result.io_status = IOStatus::Corruption(
"Checksum mismatch after copying to " + work_item.dst_path +
": " + checksum_info);
}
} else {
// FIXME(peterd): dead code?
......@@ -1217,16 +1228,16 @@ Status BackupEngineImpl::Initialize() {
}
ROCKS_LOG_INFO(options_.info_log, "Initialized BackupEngine");
return Status::OK();
return IOStatus::OK();
}
Status BackupEngineImpl::CreateNewBackupWithMetadata(
IOStatus BackupEngineImpl::CreateNewBackupWithMetadata(
const CreateBackupOptions& options, DB* db, const std::string& app_metadata,
BackupID* new_backup_id_ptr) {
assert(initialized_);
assert(!read_only_);
if (app_metadata.size() > kMaxAppMetaSize) {
return Status::InvalidArgument("App metadata too large");
return IOStatus::InvalidArgument("App metadata too large");
}
if (options.decrease_background_thread_cpu_priority) {
......@@ -1246,8 +1257,8 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
assert(backups_.find(new_backup_id) == backups_.end());
auto private_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id));
Status s = backup_env_->FileExists(private_dir);
if (s.ok()) {
IOStatus io_s = backup_fs_->FileExists(private_dir, io_options_, nullptr);
if (io_s.ok()) {
// maybe last backup failed and left partial state behind, clean it up.
// need to do this before updating backups_ such that a private dir
// named after new_backup_id will be cleaned up.
......@@ -1255,17 +1266,17 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
// of the latest full backup, then there could be more than one next
// id with a private dir, the last thing to be deleted in delete
// backup, but all will be cleaned up with a GarbageCollect.)
s = GarbageCollect();
} else if (s.IsNotFound()) {
io_s = GarbageCollect();
} else if (io_s.IsNotFound()) {
// normal case, the new backup's private dir doesn't exist yet
s = Status::OK();
io_s = IOStatus::OK();
}
auto ret = backups_.insert(std::make_pair(
new_backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
GetBackupMetaFile(new_backup_id, false /* tmp */),
GetBackupMetaFile(new_backup_id, true /* tmp */),
&backuped_file_infos_, backup_env_))));
&backuped_file_infos_, backup_env_, backup_fs_))));
assert(ret.second == true);
auto& new_backup = ret.first->second;
new_backup->RecordTimestamp();
......@@ -1283,8 +1294,8 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
"DEPRECATED and could lead to data loss.");
}
if (s.ok()) {
s = backup_env_->CreateDir(private_dir);
if (io_s.ok()) {
io_s = backup_fs_->CreateDir(private_dir, io_options_, nullptr);
}
// A set into which we will insert the dst_paths that are calculated for live
......@@ -1298,7 +1309,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
Status disabled = db->DisableFileDeletions();
DBOptions db_options = db->GetDBOptions();
Statistics* stats = db_options.statistics.get();
if (s.ok()) {
if (io_s.ok()) {
CheckpointImpl checkpoint(db);
uint64_t sequence_number = 0;
FileChecksumGenFactory* db_checksum_factory =
......@@ -1312,26 +1323,27 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
: false;
EnvOptions src_raw_env_options(db_options);
RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
s = checkpoint.CreateCustomCheckpoint(
io_s = status_to_io_status(checkpoint.CreateCustomCheckpoint(
db_options,
[&](const std::string& /*src_dirname*/, const std::string& /*fname*/,
FileType) {
// custom checkpoint will switch to calling copy_file_cb after it sees
// NotSupported returned from link_file_cb.
return Status::NotSupported();
return IOStatus::NotSupported();
} /* link_file_cb */,
[&](const std::string& src_dirname, const std::string& fname,
uint64_t size_limit_bytes, FileType type,
const std::string& checksum_func_name,
const std::string& checksum_val) {
if (type == kWalFile && !options_.backup_log_files) {
return Status::OK();
return IOStatus::OK();
}
Log(options_.info_log, "add file for backup %s", fname.c_str());
uint64_t size_bytes = 0;
Status st;
IOStatus io_st;
if (type == kTableFile || type == kBlobFile) {
st = db_env_->GetFileSize(src_dirname + fname, &size_bytes);
io_st = db_fs_->GetFileSize(src_dirname + fname, io_options_,
&size_bytes, nullptr);
}
EnvOptions src_env_options;
switch (type) {
......@@ -1358,8 +1370,8 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
src_env_options = src_raw_env_options;
break;
}
if (st.ok()) {
st = AddBackupFileWorkItem(
if (io_st.ok()) {
io_st = AddBackupFileWorkItem(
live_dst_paths, backup_items_to_finish, new_backup_id,
options_.share_table_files &&
(type == kTableFile || type == kBlobFile),
......@@ -1370,7 +1382,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
options.progress_callback, "" /* contents */,
checksum_func_name, checksum_val);
}
return st;
return io_st;
} /* copy_file_cb */,
[&](const std::string& fname, const std::string& contents,
FileType type) {
......@@ -1383,28 +1395,28 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
false /* shared_checksum */, options.progress_callback, contents);
} /* create_file_cb */,
&sequence_number, options.flush_before_backup ? 0 : port::kMaxUint64,
compare_checksum);
if (s.ok()) {
compare_checksum));
if (io_s.ok()) {
new_backup->SetSequenceNumber(sequence_number);
}
}
ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish.");
Status item_status;
IOStatus item_io_status;
for (auto& item : backup_items_to_finish) {
item.result.wait();
auto result = item.result.get();
item_status = result.status;
if (item_status.ok() && item.shared && item.needed_to_copy) {
item_status =
item.backup_env->RenameFile(item.dst_path_tmp, item.dst_path);
item_io_status = result.io_status;
if (item_io_status.ok() && item.shared && item.needed_to_copy) {
item_io_status = item.backup_env->GetFileSystem()->RenameFile(
item.dst_path_tmp, item.dst_path, io_options_, nullptr);
}
if (item_status.ok()) {
item_status = new_backup.get()->AddFile(std::make_shared<FileInfo>(
if (item_io_status.ok()) {
item_io_status = new_backup.get()->AddFile(std::make_shared<FileInfo>(
item.dst_relative, result.size, result.checksum_hex, result.db_id,
result.db_session_id));
}
if (!item_status.ok()) {
s = item_status;
if (!item_io_status.ok()) {
io_s = item_io_status;
}
}
......@@ -1414,33 +1426,34 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
}
auto backup_time = backup_env_->NowMicros() - start_backup;
if (s.ok()) {
if (io_s.ok()) {
// persist the backup metadata on the disk
s = new_backup->StoreToFile(options_.sync, test_future_options_.get());
io_s = new_backup->StoreToFile(options_.sync, test_future_options_.get());
}
if (s.ok() && options_.sync) {
std::unique_ptr<Directory> backup_private_directory;
backup_env_->NewDirectory(
GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
&backup_private_directory);
if (io_s.ok() && options_.sync) {
std::unique_ptr<FSDirectory> backup_private_directory;
backup_fs_
->NewDirectory(GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
io_options_, &backup_private_directory, nullptr)
.PermitUncheckedError();
if (backup_private_directory != nullptr) {
s = backup_private_directory->Fsync();
io_s = backup_private_directory->Fsync(io_options_, nullptr);
}
if (s.ok() && private_directory_ != nullptr) {
s = private_directory_->Fsync();
if (io_s.ok() && private_directory_ != nullptr) {
io_s = private_directory_->Fsync(io_options_, nullptr);
}
if (s.ok() && meta_directory_ != nullptr) {
s = meta_directory_->Fsync();
if (io_s.ok() && meta_directory_ != nullptr) {
io_s = meta_directory_->Fsync(io_options_, nullptr);
}
if (s.ok() && shared_directory_ != nullptr) {
s = shared_directory_->Fsync();
if (io_s.ok() && shared_directory_ != nullptr) {
io_s = shared_directory_->Fsync(io_options_, nullptr);
}
if (s.ok() && backup_directory_ != nullptr) {
s = backup_directory_->Fsync();
if (io_s.ok() && backup_directory_ != nullptr) {
io_s = backup_directory_->Fsync(io_options_, nullptr);
}
}
if (s.ok()) {
if (io_s.ok()) {
backup_statistics_.IncrementNumberSuccessBackup();
// here we know that we succeeded and installed the new backup
latest_backup_id_ = new_backup_id;
......@@ -1466,7 +1479,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
backup_statistics_.IncrementNumberFailBackup();
// clean all the files we might have created
ROCKS_LOG_INFO(options_.info_log, "Backup failed -- %s",
s.ToString().c_str());
io_s.ToString().c_str());
ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s\n",
backup_statistics_.ToString().c_str());
// delete files that we might have already written
......@@ -1477,15 +1490,15 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
RecordTick(stats, BACKUP_READ_BYTES, IOSTATS(bytes_read) - prev_bytes_read);
RecordTick(stats, BACKUP_WRITE_BYTES,
IOSTATS(bytes_written) - prev_bytes_written);
return s;
return io_s;
}
Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
IOStatus BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
assert(initialized_);
assert(!read_only_);
// Best effort deletion even with errors
Status overall_status = Status::OK();
IOStatus overall_status = IOStatus::OK();
ROCKS_LOG_INFO(options_.info_log, "Purging old backups, keeping %u",
num_backups_to_keep);
......@@ -1497,25 +1510,25 @@ Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
}
for (auto backup_id : to_delete) {
// Do not GC until end
auto s = DeleteBackupNoGC(backup_id);
if (!s.ok()) {
overall_status = s;
IOStatus io_s = DeleteBackupNoGC(backup_id);
if (!io_s.ok()) {
overall_status = io_s;
}
}
// Clean up after any incomplete backup deletion, potentially from
// earlier session.
if (might_need_garbage_collect_) {
auto s = GarbageCollect();
if (!s.ok() && overall_status.ok()) {
overall_status = s;
IOStatus io_s = GarbageCollect();
if (!io_s.ok() && overall_status.ok()) {
overall_status = io_s;
}
}
return overall_status;
}
Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
auto s1 = DeleteBackupNoGC(backup_id);
auto s2 = Status::OK();
IOStatus BackupEngineImpl::DeleteBackup(BackupID backup_id) {
IOStatus s1 = DeleteBackupNoGC(backup_id);
IOStatus s2 = IOStatus::OK();
// Clean up after any incomplete backup deletion, potentially from
// earlier session.
......@@ -1534,26 +1547,26 @@ Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
}
// Does not auto-GarbageCollect nor lock
Status BackupEngineImpl::DeleteBackupNoGC(BackupID backup_id) {
IOStatus BackupEngineImpl::DeleteBackupNoGC(BackupID backup_id) {
assert(initialized_);
assert(!read_only_);
ROCKS_LOG_INFO(options_.info_log, "Deleting backup %u", backup_id);
auto backup = backups_.find(backup_id);
if (backup != backups_.end()) {
auto s = backup->second->Delete();
if (!s.ok()) {
return s;
IOStatus io_s = backup->second->Delete();
if (!io_s.ok()) {
return io_s;
}
backups_.erase(backup);
} else {
auto corrupt = corrupt_backups_.find(backup_id);
if (corrupt == corrupt_backups_.end()) {
return Status::NotFound("Backup not found");
return IOStatus::NotFound("Backup not found");
}
auto s = corrupt->second.second->Delete();
if (!s.ok()) {
return s;
IOStatus io_s = corrupt->second.second->Delete();
if (!io_s.ok()) {
return io_s;
}
corrupt->second.first.PermitUncheckedError();
corrupt_backups_.erase(corrupt);
......@@ -1565,11 +1578,12 @@ Status BackupEngineImpl::DeleteBackupNoGC(BackupID backup_id) {
std::vector<std::string> to_delete;
for (auto& itr : backuped_file_infos_) {
if (itr.second->refs == 0) {
Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
IOStatus io_s = backup_fs_->DeleteFile(GetAbsolutePath(itr.first),
io_options_, nullptr);
ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", itr.first.c_str(),
s.ToString().c_str());
io_s.ToString().c_str());
to_delete.push_back(itr.first);
if (!s.ok()) {
if (!io_s.ok()) {
// Trying again later might work
might_need_garbage_collect_ = true;
}
......@@ -1582,14 +1596,15 @@ Status BackupEngineImpl::DeleteBackupNoGC(BackupID backup_id) {
// take care of private dirs -- GarbageCollect() will take care of them
// if they are not empty
std::string private_dir = GetPrivateFileRel(backup_id);
Status s = backup_env_->DeleteDir(GetAbsolutePath(private_dir));
IOStatus io_s =
backup_fs_->DeleteDir(GetAbsolutePath(private_dir), io_options_, nullptr);
ROCKS_LOG_INFO(options_.info_log, "Deleting private dir %s -- %s",
private_dir.c_str(), s.ToString().c_str());
if (!s.ok()) {
private_dir.c_str(), io_s.ToString().c_str());
if (!io_s.ok()) {
// Full gc or trying again later might work
might_need_garbage_collect_ = true;
}
return Status::OK();
return IOStatus::OK();
}
void BackupEngineImpl::SetBackupInfoFromBackupMeta(
......@@ -1660,10 +1675,9 @@ void BackupEngineImpl::GetCorruptedBackups(
}
}
Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
BackupID backup_id,
const std::string& db_dir,
const std::string& wal_dir) const {
IOStatus BackupEngineImpl::RestoreDBFromBackup(
const RestoreOptions& options, BackupID backup_id,
const std::string& db_dir, const std::string& wal_dir) const {
assert(initialized_);
if (backup_id == kLatestBackupIDMarker) {
// Note: Read latest_valid_backup_id_ inside of lock
......@@ -1675,11 +1689,11 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
}
auto backup_itr = backups_.find(backup_id);
if (backup_itr == backups_.end()) {
return Status::NotFound("Backup not found");
return IOStatus::NotFound("Backup not found");
}
auto& backup = backup_itr->second;
if (backup->Empty()) {
return Status::NotFound("Backup not found");
return IOStatus::NotFound("Backup not found");
}
ROCKS_LOG_INFO(options_.info_log, "Restoring backup id %u\n", backup_id);
......@@ -1687,8 +1701,10 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
static_cast<int>(options.keep_log_files));
// just in case. Ignore errors
db_env_->CreateDirIfMissing(db_dir).PermitUncheckedError();
db_env_->CreateDirIfMissing(wal_dir).PermitUncheckedError();
db_fs_->CreateDirIfMissing(db_dir, io_options_, nullptr)
.PermitUncheckedError();
db_fs_->CreateDirIfMissing(wal_dir, io_options_, nullptr)
.PermitUncheckedError();
if (options.keep_log_files) {
// delete files in db_dir, but keep all the log files
......@@ -1696,7 +1712,7 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
// move all the files from archive dir to wal_dir
std::string archive_dir = ArchivalDirectory(wal_dir);
std::vector<std::string> archive_files;
db_env_->GetChildren(archive_dir, &archive_files)
db_fs_->GetChildren(archive_dir, io_options_, &archive_files, nullptr)
.PermitUncheckedError(); // ignore errors
for (const auto& f : archive_files) {
uint64_t number;
......@@ -1706,12 +1722,12 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
ROCKS_LOG_INFO(options_.info_log,
"Moving log file from archive/ to wal_dir: %s",
f.c_str());
Status s =
db_env_->RenameFile(archive_dir + "/" + f, wal_dir + "/" + f);
if (!s.ok()) {
IOStatus io_s = db_fs_->RenameFile(
archive_dir + "/" + f, wal_dir + "/" + f, io_options_, nullptr);
if (!io_s.ok()) {
// if we can't move log file from archive_dir to wal_dir,
// we should fail, since it might mean data loss
return s;
return io_s;
}
}
}
......@@ -1721,12 +1737,12 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
DeleteChildren(db_dir);
}
Status s;
IOStatus io_s;
std::vector<RestoreAfterCopyOrCreateWorkItem> restore_items_to_finish;
std::string temporary_current_file;
std::string final_current_file;
std::unique_ptr<Directory> db_dir_for_fsync;
std::unique_ptr<Directory> wal_dir_for_fsync;
std::unique_ptr<FSDirectory> db_dir_for_fsync;
std::unique_ptr<FSDirectory> wal_dir_for_fsync;
for (const auto& file_info : backup->GetFiles()) {
const std::string& file = file_info->filename;
......@@ -1738,7 +1754,7 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
FileType type;
bool ok = ParseFileName(dst, &number, &type);
if (!ok) {
return Status::Corruption("Backup corrupted: Fail to parse filename " +
return IOStatus::Corruption("Backup corrupted: Fail to parse filename " +
dst);
}
// 3. Construct the final path
......@@ -1746,17 +1762,19 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
if (type == kWalFile) {
dst = wal_dir + "/" + dst;
if (options_.sync && !wal_dir_for_fsync) {
s = db_env_->NewDirectory(wal_dir, &wal_dir_for_fsync);
if (!s.ok()) {
return s;
io_s = db_fs_->NewDirectory(wal_dir, io_options_, &wal_dir_for_fsync,
nullptr);
if (!io_s.ok()) {
return io_s;
}
}
} else {
dst = db_dir + "/" + dst;
if (options_.sync && !db_dir_for_fsync) {
s = db_env_->NewDirectory(db_dir, &db_dir_for_fsync);
if (!s.ok()) {
return s;
io_s = db_fs_->NewDirectory(db_dir, io_options_, &db_dir_for_fsync,
nullptr);
if (!io_s.ok()) {
return io_s;
}
}
}
......@@ -1782,19 +1800,19 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
restore_items_to_finish.push_back(
std::move(after_copy_or_create_work_item));
}
Status item_status;
IOStatus item_io_status;
for (auto& item : restore_items_to_finish) {
item.result.wait();
auto result = item.result.get();
item_status = result.status;
item_io_status = result.io_status;
// Note: It is possible that both of the following bad-status cases occur
// during copying. But, we only return one status.
if (!item_status.ok()) {
s = item_status;
if (!item_io_status.ok()) {
io_s = item_io_status;
break;
} else if (!item.checksum_hex.empty() &&
item.checksum_hex != result.checksum_hex) {
s = Status::Corruption(
io_s = IOStatus::Corruption(
"While restoring " + item.from_file + " -> " + item.to_file +
": expected checksum is " + item.checksum_hex +
" while computed checksum is " + result.checksum_hex);
......@@ -1804,35 +1822,36 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
// When enabled, the first Fsync is to ensure all files are fully persisted
// before renaming CURRENT.tmp
if (s.ok() && db_dir_for_fsync) {
if (io_s.ok() && db_dir_for_fsync) {
ROCKS_LOG_INFO(options_.info_log, "Restore: fsync\n");
s = db_dir_for_fsync->Fsync();
io_s = db_dir_for_fsync->Fsync(io_options_, nullptr);
}
if (s.ok() && wal_dir_for_fsync) {
s = wal_dir_for_fsync->Fsync();
if (io_s.ok() && wal_dir_for_fsync) {
io_s = wal_dir_for_fsync->Fsync(io_options_, nullptr);
}
if (s.ok() && !temporary_current_file.empty()) {
if (io_s.ok() && !temporary_current_file.empty()) {
ROCKS_LOG_INFO(options_.info_log, "Restore: atomic rename CURRENT.tmp\n");
assert(!final_current_file.empty());
s = db_env_->RenameFile(temporary_current_file, final_current_file);
io_s = db_fs_->RenameFile(temporary_current_file, final_current_file,
io_options_, nullptr);
}
if (s.ok() && db_dir_for_fsync && !temporary_current_file.empty()) {
if (io_s.ok() && db_dir_for_fsync && !temporary_current_file.empty()) {
// Second Fsync is to ensure the final atomic rename of DB restore is
// fully persisted even if power goes out right after restore operation
// returns success
assert(db_dir_for_fsync);
s = db_dir_for_fsync->Fsync();
io_s = db_dir_for_fsync->Fsync(io_options_, nullptr);
}
ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n",
s.ToString().c_str());
return s;
io_s.ToString().c_str());
return io_s;
}
Status BackupEngineImpl::VerifyBackup(BackupID backup_id,
IOStatus BackupEngineImpl::VerifyBackup(BackupID backup_id,
bool verify_with_checksum) const {
assert(initialized_);
// Check if backup_id is corrupted, or valid and registered
......@@ -1843,12 +1862,12 @@ Status BackupEngineImpl::VerifyBackup(BackupID backup_id,
auto backup_itr = backups_.find(backup_id);
if (backup_itr == backups_.end()) {
return Status::NotFound();
return IOStatus::NotFound();
}
auto& backup = backup_itr->second;
if (backup->Empty()) {
return Status::NotFound();
return IOStatus::NotFound();
}
ROCKS_LOG_INFO(options_.info_log, "Verifying backup id %u\n", backup_id);
......@@ -1860,7 +1879,7 @@ Status BackupEngineImpl::VerifyBackup(BackupID backup_id,
const auto abs_dir = GetAbsolutePath(rel_dir);
// Shared directories allowed to be missing in some cases. Expected but
// missing files will be reported a few lines down.
ReadChildFileCurrentSizes(abs_dir, backup_env_, &curr_abs_path_to_size)
ReadChildFileCurrentSizes(abs_dir, backup_fs_, &curr_abs_path_to_size)
.PermitUncheckedError();
}
......@@ -1869,7 +1888,7 @@ Status BackupEngineImpl::VerifyBackup(BackupID backup_id,
const auto abs_path = GetAbsolutePath(file_info->filename);
// check existence of the file
if (curr_abs_path_to_size.find(abs_path) == curr_abs_path_to_size.end()) {
return Status::NotFound("File missing: " + abs_path);
return IOStatus::NotFound("File missing: " + abs_path);
}
// verify file size
if (file_info->size != curr_abs_path_to_size[abs_path]) {
......@@ -1877,7 +1896,7 @@ Status BackupEngineImpl::VerifyBackup(BackupID backup_id,
ToString(file_info->size) +
" while found file size is " +
ToString(curr_abs_path_to_size[abs_path]));
return Status::Corruption("File corrupted: File size mismatch for " +
return IOStatus::Corruption("File corrupted: File size mismatch for " +
abs_path + ": " + size_info);
}
if (verify_with_checksum && !file_info->checksum_hex.empty()) {
......@@ -1885,29 +1904,30 @@ Status BackupEngineImpl::VerifyBackup(BackupID backup_id,
std::string checksum_hex;
ROCKS_LOG_INFO(options_.info_log, "Verifying %s checksum...\n",
abs_path.c_str());
Status s = ReadFileAndComputeChecksum(abs_path, backup_env_, EnvOptions(),
IOStatus io_s =
ReadFileAndComputeChecksum(abs_path, backup_fs_, EnvOptions(),
0 /* size_limit */, &checksum_hex);
if (!s.ok()) {
return s;
if (!io_s.ok()) {
return io_s;
} else if (file_info->checksum_hex != checksum_hex) {
std::string checksum_info(
"Expected checksum is " + file_info->checksum_hex +
" while computed checksum is " + checksum_hex);
return Status::Corruption("File corrupted: Checksum mismatch for " +
return IOStatus::Corruption("File corrupted: Checksum mismatch for " +
abs_path + ": " + checksum_info);
}
}
}
return Status::OK();
return IOStatus::OK();
}
Status BackupEngineImpl::CopyOrCreateFile(
IOStatus BackupEngineImpl::CopyOrCreateFile(
const std::string& src, const std::string& dst, const std::string& contents,
Env* src_env, Env* dst_env, const EnvOptions& src_env_options, bool sync,
RateLimiter* rate_limiter, uint64_t* size, std::string* checksum_hex,
uint64_t size_limit, std::function<void()> progress_callback) {
assert(src.empty() != contents.empty());
Status s;
IOStatus io_s;
std::unique_ptr<FSWritableFile> dst_file;
std::unique_ptr<FSSequentialFile> src_file;
FileOptions dst_file_options;
......@@ -1923,14 +1943,14 @@ Status BackupEngineImpl::CopyOrCreateFile(
size_limit = std::numeric_limits<uint64_t>::max();
}
s = dst_env->GetFileSystem()->NewWritableFile(dst, dst_file_options,
io_s = dst_env->GetFileSystem()->NewWritableFile(dst, dst_file_options,
&dst_file, nullptr);
if (s.ok() && !src.empty()) {
s = src_env->GetFileSystem()->NewSequentialFile(
if (io_s.ok() && !src.empty()) {
io_s = src_env->GetFileSystem()->NewSequentialFile(
src, FileOptions(src_env_options), &src_file, nullptr);
}
if (!s.ok()) {
return s;
if (!io_s.ok()) {
return io_s;
}
size_t buf_size =
......@@ -1950,12 +1970,12 @@ Status BackupEngineImpl::CopyOrCreateFile(
uint64_t processed_buffer_size = 0;
do {
if (stop_backup_.load(std::memory_order_acquire)) {
return Status::Incomplete("Backup stopped");
return status_to_io_status(Status::Incomplete("Backup stopped"));
}
if (!src.empty()) {
size_t buffer_to_read =
(buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
s = src_reader->Read(buffer_to_read, &data, buf.get());
io_s = src_reader->Read(buffer_to_read, &data, buf.get());
if (rate_limiter != nullptr) {
rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
RateLimiter::OpType::kRead);
......@@ -1970,8 +1990,8 @@ Status BackupEngineImpl::CopyOrCreateFile(
(src.length() > 4 && src.rfind(".sst") == src.length() - 4) ? &data
: nullptr);
if (!s.ok()) {
return s;
if (!io_s.ok()) {
return io_s;
}
if (size != nullptr) {
......@@ -1980,7 +2000,7 @@ Status BackupEngineImpl::CopyOrCreateFile(
if (checksum_hex != nullptr) {
checksum_value = crc32c::Extend(checksum_value, data.data(), data.size());
}
s = dest_writer->Append(data);
io_s = dest_writer->Append(data);
if (rate_limiter != nullptr) {
rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
RateLimiter::OpType::kWrite);
......@@ -1990,24 +2010,24 @@ Status BackupEngineImpl::CopyOrCreateFile(
std::lock_guard<std::mutex> lock(byte_report_mutex_);
progress_callback();
}
} while (s.ok() && contents.empty() && data.size() > 0 && size_limit > 0);
} while (io_s.ok() && contents.empty() && data.size() > 0 && size_limit > 0);
// Convert uint32_t checksum to hex checksum
if (checksum_hex != nullptr) {
checksum_hex->assign(ChecksumInt32ToHex(checksum_value));
}
if (s.ok() && sync) {
s = dest_writer->Sync(false);
if (io_s.ok() && sync) {
io_s = dest_writer->Sync(false);
}
if (s.ok()) {
s = dest_writer->Close();
if (io_s.ok()) {
io_s = dest_writer->Close();
}
return s;
return io_s;
}
// fname will always start with "/"
Status BackupEngineImpl::AddBackupFileWorkItem(
IOStatus BackupEngineImpl::AddBackupFileWorkItem(
std::unordered_set<std::string>& live_dst_paths,
std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
BackupID backup_id, bool shared, const std::string& src_dir,
......@@ -2040,7 +2060,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
// Step 0: Check if default checksum function name is passed in
if (kDbFileChecksumFuncName == src_checksum_func_name) {
if (src_checksum_str == kUnknownFileChecksum) {
return Status::Aborted("Unknown checksum value for " + fname);
return status_to_io_status(
Status::Aborted("Unknown checksum value for " + fname));
}
checksum_hex = ChecksumStrToHex(src_checksum_str);
}
......@@ -2061,14 +2082,14 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
// since the session id should suffice to avoid file name collision in
// the shared_checksum directory.
if (checksum_hex.empty() && db_session_id.empty()) {
Status s = ReadFileAndComputeChecksum(
src_dir + fname, db_env_, src_env_options, size_limit, &checksum_hex);
if (!s.ok()) {
return s;
IOStatus io_s = ReadFileAndComputeChecksum(
src_dir + fname, db_fs_, src_env_options, size_limit, &checksum_hex);
if (!io_s.ok()) {
return io_s;
}
}
if (size_bytes == port::kMaxUint64) {
return Status::NotFound("File missing: " + src_dir + fname);
return IOStatus::NotFound("File missing: " + src_dir + fname);
}
// dst_relative depends on the following conditions:
// 1) the naming scheme is kUseDbSessionId,
......@@ -2122,7 +2143,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
if (shared && !same_path) {
// Should be in shared directory but not a live path, check existence in
// shared directory
Status exist = backup_env_->FileExists(final_dest_path);
IOStatus exist =
backup_fs_->FileExists(final_dest_path, io_options_, nullptr);
if (exist.ok()) {
file_exists = true;
} else if (exist.IsNotFound()) {
......@@ -2146,7 +2168,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
fname.c_str());
need_to_copy = true;
// Defer any failure reporting to when we try to write the file
backup_env_->DeleteFile(final_dest_path).PermitUncheckedError();
backup_fs_->DeleteFile(final_dest_path, io_options_, nullptr)
.PermitUncheckedError();
} else {
// file exists and referenced
if (checksum_hex.empty()) {
......@@ -2178,11 +2201,11 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
// ID, but even in that case, we double check the file sizes in
// BackupMeta::AddFile.
} else {
Status s = ReadFileAndComputeChecksum(src_dir + fname, db_env_,
src_env_options, size_limit,
&checksum_hex);
if (!s.ok()) {
return s;
IOStatus io_s = ReadFileAndComputeChecksum(src_dir + fname, db_fs_,
src_env_options,
size_limit, &checksum_hex);
if (!io_s.ok()) {
return io_s;
}
}
}
......@@ -2222,21 +2245,22 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
temp_dest_path, final_dest_path, dst_relative);
backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
CopyOrCreateResult result;
result.status = Status::OK();
result.io_status = IOStatus::OK();
result.size = size_bytes;
result.checksum_hex = std::move(checksum_hex);
result.db_id = std::move(db_id);
result.db_session_id = std::move(db_session_id);
promise_result.set_value(std::move(result));
}
return Status::OK();
return IOStatus::OK();
}
Status BackupEngineImpl::ReadFileAndComputeChecksum(
const std::string& src, Env* src_env, const EnvOptions& src_env_options,
uint64_t size_limit, std::string* checksum_hex) const {
IOStatus BackupEngineImpl::ReadFileAndComputeChecksum(
const std::string& src, const std::shared_ptr<FileSystem>& src_fs,
const EnvOptions& src_env_options, uint64_t size_limit,
std::string* checksum_hex) const {
if (checksum_hex == nullptr) {
return Status::Aborted("Checksum pointer is null");
return status_to_io_status(Status::Aborted("Checksum pointer is null"));
}
uint32_t checksum_value = 0;
if (size_limit == 0) {
......@@ -2244,11 +2268,10 @@ Status BackupEngineImpl::ReadFileAndComputeChecksum(
}
std::unique_ptr<SequentialFileReader> src_reader;
Status s = SequentialFileReader::Create(src_env->GetFileSystem(), src,
FileOptions(src_env_options),
&src_reader, nullptr);
if (!s.ok()) {
return s;
IOStatus io_s = SequentialFileReader::Create(
src_fs, src, FileOptions(src_env_options), &src_reader, nullptr);
if (!io_s.ok()) {
return io_s;
}
RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
......@@ -2260,17 +2283,17 @@ Status BackupEngineImpl::ReadFileAndComputeChecksum(
do {
if (stop_backup_.load(std::memory_order_acquire)) {
return Status::Incomplete("Backup stopped");
return status_to_io_status(Status::Incomplete("Backup stopped"));
}
size_t buffer_to_read =
(buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
s = src_reader->Read(buffer_to_read, &data, buf.get());
io_s = src_reader->Read(buffer_to_read, &data, buf.get());
if (rate_limiter != nullptr) {
rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
RateLimiter::OpType::kRead);
}
if (!s.ok()) {
return s;
if (!io_s.ok()) {
return io_s;
}
size_limit -= data.size();
......@@ -2279,7 +2302,7 @@ Status BackupEngineImpl::ReadFileAndComputeChecksum(
checksum_hex->assign(ChecksumInt32ToHex(checksum_value));
return s;
return io_s;
}
Status BackupEngineImpl::GetFileDbIdentities(Env* src_env,
......@@ -2347,7 +2370,8 @@ Status BackupEngineImpl::GetFileDbIdentities(Env* src_env,
void BackupEngineImpl::DeleteChildren(const std::string& dir,
uint32_t file_type_filter) const {
std::vector<std::string> children;
db_env_->GetChildren(dir, &children).PermitUncheckedError(); // ignore errors
db_fs_->GetChildren(dir, io_options_, &children, nullptr)
.PermitUncheckedError(); // ignore errors
for (const auto& f : children) {
uint64_t number;
......@@ -2357,36 +2381,38 @@ void BackupEngineImpl::DeleteChildren(const std::string& dir,
// don't delete this file
continue;
}
db_env_->DeleteFile(dir + "/" + f).PermitUncheckedError(); // ignore errors
db_fs_->DeleteFile(dir + "/" + f, io_options_, nullptr)
.PermitUncheckedError(); // ignore errors
}
}
Status BackupEngineImpl::ReadChildFileCurrentSizes(
const std::string& dir, Env* env,
IOStatus BackupEngineImpl::ReadChildFileCurrentSizes(
const std::string& dir, const std::shared_ptr<FileSystem>& fs,
std::unordered_map<std::string, uint64_t>* result) const {
assert(result != nullptr);
std::vector<Env::FileAttributes> files_attrs;
Status status = env->FileExists(dir);
if (status.ok()) {
status = env->GetChildrenFileAttributes(dir, &files_attrs);
} else if (status.IsNotFound()) {
IOStatus io_status = fs->FileExists(dir, io_options_, nullptr);
if (io_status.ok()) {
io_status =
fs->GetChildrenFileAttributes(dir, io_options_, &files_attrs, nullptr);
} else if (io_status.IsNotFound()) {
// Insert no entries can be considered success
status = Status::OK();
io_status = IOStatus::OK();
}
const bool slash_needed = dir.empty() || dir.back() != '/';
for (const auto& file_attrs : files_attrs) {
result->emplace(dir + (slash_needed ? "/" : "") + file_attrs.name,
file_attrs.size_bytes);
}
return status;
return io_status;
}
Status BackupEngineImpl::GarbageCollect() {
IOStatus BackupEngineImpl::GarbageCollect() {
assert(!read_only_);
// We will make a best effort to remove all garbage even in the presence
// of inconsistencies or I/O failures that inhibit finding garbage.
Status overall_status = Status::OK();
IOStatus overall_status = IOStatus::OK();
// If all goes well, we don't need another auto-GC this session
might_need_garbage_collect_ = false;
......@@ -2402,14 +2428,15 @@ Status BackupEngineImpl::GarbageCollect() {
} else {
shared_path = GetAbsolutePath(GetSharedFileRel());
}
auto s = backup_env_->FileExists(shared_path);
if (s.ok()) {
s = backup_env_->GetChildren(shared_path, &shared_children);
} else if (s.IsNotFound()) {
s = Status::OK();
IOStatus io_s = backup_fs_->FileExists(shared_path, io_options_, nullptr);
if (io_s.ok()) {
io_s = backup_fs_->GetChildren(shared_path, io_options_,
&shared_children, nullptr);
} else if (io_s.IsNotFound()) {
io_s = IOStatus::OK();
}
if (!s.ok()) {
overall_status = s;
if (!io_s.ok()) {
overall_status = io_s;
// Trying again later might work
might_need_garbage_collect_ = true;
}
......@@ -2427,11 +2454,12 @@ Status BackupEngineImpl::GarbageCollect() {
child_itr->second->refs == 0) {
// this might be a directory, but DeleteFile will just fail in that
// case, so we're good
Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
IOStatus io_s = backup_fs_->DeleteFile(GetAbsolutePath(rel_fname),
io_options_, nullptr);
ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
rel_fname.c_str(), s.ToString().c_str());
rel_fname.c_str(), io_s.ToString().c_str());
backuped_file_infos_.erase(rel_fname);
if (!s.ok()) {
if (!io_s.ok()) {
// Trying again later might work
might_need_garbage_collect_ = true;
}
......@@ -2442,10 +2470,11 @@ Status BackupEngineImpl::GarbageCollect() {
// delete obsolete private files
std::vector<std::string> private_children;
{
auto s = backup_env_->GetChildren(GetAbsolutePath(kPrivateDirName),
&private_children);
if (!s.ok()) {
overall_status = s;
IOStatus io_s =
backup_fs_->GetChildren(GetAbsolutePath(kPrivateDirName), io_options_,
&private_children, nullptr);
if (!io_s.ok()) {
overall_status = io_s;
// Trying again later might work
might_need_garbage_collect_ = true;
}
......@@ -2463,23 +2492,27 @@ Status BackupEngineImpl::GarbageCollect() {
std::string full_private_path =
GetAbsolutePath(GetPrivateFileRel(backup_id));
std::vector<std::string> subchildren;
if (backup_env_->GetChildren(full_private_path, &subchildren).ok()) {
if (backup_fs_
->GetChildren(full_private_path, io_options_, &subchildren, nullptr)
.ok()) {
for (auto& subchild : subchildren) {
Status s = backup_env_->DeleteFile(full_private_path + subchild);
IOStatus io_s = backup_fs_->DeleteFile(full_private_path + subchild,
io_options_, nullptr);
ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
(full_private_path + subchild).c_str(),
s.ToString().c_str());
if (!s.ok()) {
io_s.ToString().c_str());
if (!io_s.ok()) {
// Trying again later might work
might_need_garbage_collect_ = true;
}
}
}
// finally delete the private dir
Status s = backup_env_->DeleteDir(full_private_path);
IOStatus io_s =
backup_fs_->DeleteDir(full_private_path, io_options_, nullptr);
ROCKS_LOG_INFO(options_.info_log, "Deleting dir %s -- %s",
full_private_path.c_str(), s.ToString().c_str());
if (!s.ok()) {
full_private_path.c_str(), io_s.ToString().c_str());
if (!io_s.ok()) {
// Trying again later might work
might_need_garbage_collect_ = true;
}
......@@ -2491,7 +2524,7 @@ Status BackupEngineImpl::GarbageCollect() {
// ------- BackupMeta class --------
Status BackupEngineImpl::BackupMeta::AddFile(
IOStatus BackupEngineImpl::BackupMeta::AddFile(
std::shared_ptr<FileInfo> file_info) {
auto itr = file_infos_->find(file_info->filename);
if (itr == file_infos_->end()) {
......@@ -2501,7 +2534,7 @@ Status BackupEngineImpl::BackupMeta::AddFile(
itr->second->refs = 1;
} else {
// if this happens, something is seriously wrong
return Status::Corruption("In memory metadata insertion error");
return IOStatus::Corruption("In memory metadata insertion error");
}
} else {
// Compare sizes, because we scanned that off the filesystem on both
......@@ -2514,7 +2547,7 @@ Status BackupEngineImpl::BackupMeta::AddFile(
msg.append(
" If this DB file checks as not corrupt, try deleting old"
" backups or backing up to a different backup directory.");
return Status::Corruption(msg);
return IOStatus::Corruption(msg);
}
if (file_info->checksum_hex.empty()) {
// No checksum available to check
......@@ -2536,7 +2569,7 @@ Status BackupEngineImpl::BackupMeta::AddFile(
msg.append(
" If this DB file checks as not corrupt, try deleting old"
" backups or backing up to a different backup directory.");
return Status::Corruption(msg);
return IOStatus::Corruption(msg);
}
++itr->second->refs; // increase refcount if already present
}
......@@ -2544,26 +2577,26 @@ Status BackupEngineImpl::BackupMeta::AddFile(
size_ += file_info->size;
files_.push_back(itr->second);
return Status::OK();
return IOStatus::OK();
}
Status BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
Status s;
IOStatus BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
IOStatus io_s;
for (const auto& file : files_) {
--file->refs; // decrease refcount
}
files_.clear();
// delete meta file
if (delete_meta) {
s = env_->FileExists(meta_filename_);
if (s.ok()) {
s = env_->DeleteFile(meta_filename_);
} else if (s.IsNotFound()) {
s = Status::OK(); // nothing to delete
io_s = fs_->FileExists(meta_filename_, iooptions_, nullptr);
if (io_s.ok()) {
io_s = fs_->DeleteFile(meta_filename_, iooptions_, nullptr);
} else if (io_s.IsNotFound()) {
io_s = IOStatus::OK(); // nothing to delete
}
}
timestamp_ = 0;
return s;
return io_s;
}
// Constants for backup meta file schema (see LoadFromFile)
......@@ -2637,7 +2670,7 @@ const std::string kNonIgnorableFieldPrefix{"ni::"};
// * Footer meta fields:
// * None yet (future use for meta file checksum anticipated)
//
Status BackupEngineImpl::BackupMeta::LoadFromFile(
IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
const std::string& backup_dir,
const std::unordered_map<std::string, uint64_t>& abs_path_to_size,
RateLimiter* rate_limiter, Logger* info_log,
......@@ -2647,11 +2680,10 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
std::unique_ptr<LineFileReader> backup_meta_reader;
{
Status s =
LineFileReader::Create(env_->GetFileSystem(), meta_filename_,
FileOptions(), &backup_meta_reader, nullptr);
if (!s.ok()) {
return s;
IOStatus io_s = LineFileReader::Create(fs_, meta_filename_, FileOptions(),
&backup_meta_reader, nullptr);
if (!io_s.ok()) {
return io_s;
}
}
......@@ -2671,12 +2703,12 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
if (ver == "2" || StartsWith(ver, "2.")) {
schema_major_version = 2;
} else {
return Status::NotSupported(
return IOStatus::NotSupported(
"Unsupported/unrecognized schema version: " + ver);
}
line.clear();
} else if (line.empty()) {
return Status::Corruption("Unexpected empty line");
return IOStatus::Corruption("Unexpected empty line");
}
}
if (!line.empty()) {
......@@ -2702,7 +2734,7 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
RateLimiter::OpType::kRead);
}
if (line.empty()) {
return Status::Corruption("Unexpected empty line");
return IOStatus::Corruption("Unexpected empty line");
}
// Number -> number of files -> exit loop reading optional meta fields
if (line[0] >= '0' && line[0] <= '9') {
......@@ -2712,7 +2744,7 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
// else, must be a meta field assignment
auto space_pos = line.find_first_of(' ');
if (space_pos == std::string::npos) {
return Status::Corruption("Expected number of files or meta field");
return IOStatus::Corruption("Expected number of files or meta field");
}
std::string field_name = line.substr(0, space_pos);
std::string field_data = line.substr(space_pos + 1);
......@@ -2720,14 +2752,14 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
// app metadata present
bool decode_success = Slice(field_data).DecodeHex(&app_metadata_);
if (!decode_success) {
return Status::Corruption(
return IOStatus::Corruption(
"Failed to decode stored hex encoded app metadata");
}
} else if (schema_major_version < 2) {
return Status::Corruption("Expected number of files or \"" +
return IOStatus::Corruption("Expected number of files or \"" +
kAppMetaDataFieldName + "\" field");
} else if (StartsWith(field_name, kNonIgnorableFieldPrefix)) {
return Status::NotSupported("Unrecognized non-ignorable meta field " +
return IOStatus::NotSupported("Unrecognized non-ignorable meta field " +
field_name + " (from future version?)");
} else {
// Warn the first time we see any particular unrecognized meta field
......@@ -2747,7 +2779,7 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
std::vector<std::string> components = StringSplit(line, ' ');
if (components.size() < 1) {
return Status::Corruption("Empty line instead of file entry.");
return IOStatus::Corruption("Empty line instead of file entry.");
}
if (schema_major_version >= 2 && components.size() == 2 &&
line == kFooterMarker) {
......@@ -2765,30 +2797,30 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
std::string abs_path = backup_dir + "/" + filename;
auto e = abs_path_to_size.find(abs_path);
if (e == abs_path_to_size.end()) {
return Status::Corruption("Pathname in meta file not found on disk: " +
abs_path);
return IOStatus::Corruption(
"Pathname in meta file not found on disk: " + abs_path);
}
actual_size = e->second;
}
if (schema_major_version >= 2) {
if (components.size() % 2 != 1) {
return Status::Corruption(
return IOStatus::Corruption(
"Bad number of line components for file entry.");
}
} else {
// Check restricted original schema
if (components.size() < 3) {
return Status::Corruption("File checksum is missing for " + filename +
return IOStatus::Corruption("File checksum is missing for " + filename +
" in " + meta_filename_);
}
if (components[1] != kFileCrc32cFieldName) {
return Status::Corruption("Unknown checksum type for " + filename +
return IOStatus::Corruption("Unknown checksum type for " + filename +
" in " + meta_filename_);
}
if (components.size() > 3) {
return Status::Corruption("Extra data for entry " + filename + " in " +
meta_filename_);
return IOStatus::Corruption("Extra data for entry " + filename +
" in " + meta_filename_);
}
}
......@@ -2801,7 +2833,7 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
uint32_t checksum_value =
static_cast<uint32_t>(strtoul(field_data.c_str(), nullptr, 10));
if (field_data != ROCKSDB_NAMESPACE::ToString(checksum_value)) {
return Status::Corruption("Invalid checksum value for " + filename +
return IOStatus::Corruption("Invalid checksum value for " + filename +
" in " + meta_filename_);
}
checksum_hex = ChecksumInt32ToHex(checksum_value);
......@@ -2809,12 +2841,12 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
uint64_t ex_size =
std::strtoull(field_data.c_str(), nullptr, /*base*/ 10);
if (ex_size != actual_size) {
return Status::Corruption("For file " + filename + " expected size " +
ToString(ex_size) + " but found size" +
ToString(actual_size));
return IOStatus::Corruption(
"For file " + filename + " expected size " + ToString(ex_size) +
" but found size" + ToString(actual_size));
}
} else if (StartsWith(field_name, kNonIgnorableFieldPrefix)) {
return Status::NotSupported("Unrecognized non-ignorable file field " +
return IOStatus::NotSupported("Unrecognized non-ignorable file field " +
field_name + " (from future version?)");
} else {
// Warn the first time we see any particular unrecognized file field
......@@ -2836,16 +2868,16 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
RateLimiter::OpType::kRead);
}
if (line.empty()) {
return Status::Corruption("Unexpected empty line");
return IOStatus::Corruption("Unexpected empty line");
}
auto space_pos = line.find_first_of(' ');
if (space_pos == std::string::npos) {
return Status::Corruption("Expected footer field");
return IOStatus::Corruption("Expected footer field");
}
std::string field_name = line.substr(0, space_pos);
std::string field_data = line.substr(space_pos + 1);
if (StartsWith(field_name, kNonIgnorableFieldPrefix)) {
return Status::NotSupported("Unrecognized non-ignorable field " +
return IOStatus::NotSupported("Unrecognized non-ignorable field " +
field_name + " (from future version?)");
} else if (reported_ignored_fields->insert("footer:" + field_name)
.second) {
......@@ -2858,39 +2890,40 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
}
{
Status s = backup_meta_reader->GetStatus();
if (!s.ok()) {
return s;
IOStatus io_s = backup_meta_reader->GetStatus();
if (!io_s.ok()) {
return io_s;
}
}
if (num_files != files.size()) {
return Status::Corruption(
return IOStatus::Corruption(
"Inconsistent number of files or missing/incomplete header in " +
meta_filename_);
}
files_.reserve(files.size());
for (const auto& file_info : files) {
Status s = AddFile(file_info);
if (!s.ok()) {
return s;
IOStatus io_s = AddFile(file_info);
if (!io_s.ok()) {
return io_s;
}
}
return Status::OK();
return IOStatus::OK();
}
Status BackupEngineImpl::BackupMeta::StoreToFile(
IOStatus BackupEngineImpl::BackupMeta::StoreToFile(
bool sync, const TEST_FutureSchemaVersion2Options* test_future_options) {
Status s;
std::unique_ptr<WritableFile> backup_meta_file;
EnvOptions env_options;
env_options.use_mmap_writes = false;
env_options.use_direct_writes = false;
s = env_->NewWritableFile(meta_tmp_filename_, &backup_meta_file, env_options);
if (!s.ok()) {
return s;
IOStatus io_s;
std::unique_ptr<FSWritableFile> backup_meta_file;
FileOptions file_options;
file_options.use_mmap_writes = false;
file_options.use_direct_writes = false;
io_s = fs_->NewWritableFile(meta_tmp_filename_, file_options,
&backup_meta_file, nullptr);
if (!io_s.ok()) {
return io_s;
}
std::ostringstream buf;
......@@ -2938,18 +2971,19 @@ Status BackupEngineImpl::BackupMeta::StoreToFile(
}
}
s = backup_meta_file->Append(Slice(buf.str()));
io_s = backup_meta_file->Append(Slice(buf.str()), iooptions_, nullptr);
IOSTATS_ADD(bytes_written, buf.str().size());
if (s.ok() && sync) {
s = backup_meta_file->Sync();
if (io_s.ok() && sync) {
io_s = backup_meta_file->Sync(iooptions_, nullptr);
}
if (s.ok()) {
s = backup_meta_file->Close();
if (io_s.ok()) {
io_s = backup_meta_file->Close(iooptions_, nullptr);
}
if (s.ok()) {
s = env_->RenameFile(meta_tmp_filename_, meta_filename_);
if (io_s.ok()) {
io_s = fs_->RenameFile(meta_tmp_filename_, meta_filename_, iooptions_,
nullptr);
}
return s;
return io_s;
}
Status BackupEngineReadOnly::Open(const BackupEngineOptions& options, Env* env,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册