提交 ae36e509 编写于 作者: D Dhruba Borthakur

The BackupAPI should also list the length of the manifest file.

Summary:
The GetLiveFiles() api lists the set of sst files and the current
MANIFEST file. But the database continues to append new data to the
MANIFEST file even when the application is backing it up to the
backup location. This means that the database-version that is
stored in the MANIFEST FILE in the backup location
does not correspond to the sst files returned by GetLiveFiles.

This API adds a new parameter to GetLiveFiles. This new parmeter
returns the current size of the MANIFEST file.

Test Plan: Unit test attached.

Reviewers: heyongqiang

Reviewed By: heyongqiang

Differential Revision: https://reviews.facebook.net/D5631
上级 dd45b8cd
......@@ -26,7 +26,10 @@ Status DBImpl::EnableFileDeletions() {
return Status::OK();
}
Status DBImpl::GetLiveFiles(std::vector<std::string>& ret) {
Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
uint64_t* manifest_file_size) {
*manifest_file_size = 0;
// flush all dirty data to disk.
Status status = Flush(FlushOptions());
......@@ -55,6 +58,9 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret) {
ret[live.size()+1] = DescriptorFileName("",
versions_->ManifestFileNumber());
// find length of manifest file while holding the mutex lock
*manifest_file_size = versions_->ManifestFileSize();
return Status::OK();
}
......
......@@ -51,7 +51,8 @@ class DBImpl : public DB {
virtual Status Flush(const FlushOptions& options);
virtual Status DisableFileDeletions();
virtual Status EnableFileDeletions();
virtual Status GetLiveFiles(std::vector<std::string>&);
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size);
// Extra methods (for testing) that are not in the public DB interface
......
......@@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <algorithm>
#include "leveldb/db.h"
#include "leveldb/filter_policy.h"
#include "db/db_impl.h"
......@@ -1704,27 +1705,69 @@ TEST(DBTest, SnapshotFiles) {
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
// get a file snapshot
uint64_t manifest_number = 0;
uint64_t manifest_size = 0;
std::vector<std::string> files;
dbfull()->DisableFileDeletions();
dbfull()->GetLiveFiles(files);
dbfull()->GetLiveFiles(files, &manifest_size);
// CURRENT, MANIFEST, *.sst files
ASSERT_EQ(files.size(), 3);
uint64_t number = 0;
FileType type;
// copy these files to a new snapshot directory
std::string snapdir = dbname_ + ".snapdir/";
std::string mkdir = "mkdir -p " + snapdir;
ASSERT_EQ(system(mkdir.c_str()), 0);
for (int i = 0; i < files.size(); i++) {
for (unsigned int i = 0; i < files.size(); i++) {
std::string src = dbname_ + "/" + files[i];
std::string dest = snapdir + "/" + files[i];
std::string cmd = "cp " + src + " " + dest;
ASSERT_EQ(system(cmd.c_str()), 0);
uint64_t size;
ASSERT_OK(env_->GetFileSize(src, &size));
// record the number and the size of the
// latest manifest file
if (ParseFileName(files[i].substr(1), &number, &type)) {
if (type == kDescriptorFile) {
if (number > manifest_number) {
manifest_number = number;
ASSERT_GE(size, manifest_size);
size = manifest_size; // copy only valid MANIFEST data
}
}
}
SequentialFile* srcfile;
ASSERT_OK(env_->NewSequentialFile(src, &srcfile));
WritableFile* destfile;
ASSERT_OK(env_->NewWritableFile(dest, &destfile));
char buffer[4096];
Slice slice;
while (size > 0) {
uint64_t one = std::min(sizeof(buffer), size);
ASSERT_OK(srcfile->Read(one, &slice, buffer));
ASSERT_OK(destfile->Append(slice));
size -= slice.size();
}
ASSERT_OK(destfile->Close());
delete destfile;
delete srcfile;
}
// release file snapshot
dbfull()->DisableFileDeletions();
// overwrite one key, this key should not appear in the snapshot
std::vector<std::string> extras;
for (unsigned int i = 0; i < 1; i++) {
extras.push_back(RandomString(&rnd, 100000));
ASSERT_OK(Put(Key(i), extras[i]));
}
// verify that data in the snapshot are correct
Options opts;
DB* snapdb;
......@@ -1734,11 +1777,44 @@ TEST(DBTest, SnapshotFiles) {
ReadOptions roptions;
std::string val;
for (int i = 0; i < 80; i++) {
for (unsigned int i = 0; i < 80; i++) {
stat = snapdb->Get(roptions, Key(i), &val);
ASSERT_EQ(values[i].compare(val), 0);
}
delete snapdb;
// look at the new live files after we added an 'extra' key
// and after we took the first snapshot.
uint64_t new_manifest_number = 0;
uint64_t new_manifest_size = 0;
std::vector<std::string> newfiles;
dbfull()->DisableFileDeletions();
dbfull()->GetLiveFiles(newfiles, &new_manifest_size);
// find the new manifest file. assert that this manifest file is
// the same one as in the previous snapshot. But its size should be
// larger because we added an extra key after taking the
// previous shapshot.
for (unsigned int i = 0; i < newfiles.size(); i++) {
std::string src = dbname_ + "/" + newfiles[i];
// record the lognumber and the size of the
// latest manifest file
if (ParseFileName(newfiles[i].substr(1), &number, &type)) {
if (type == kDescriptorFile) {
if (number > new_manifest_number) {
uint64_t size;
new_manifest_number = number;
ASSERT_OK(env_->GetFileSize(src, &size));
ASSERT_GE(size, new_manifest_size);
}
}
}
}
ASSERT_EQ(manifest_number, new_manifest_number);
ASSERT_GT(new_manifest_size, manifest_size);
// release file snapshot
dbfull()->DisableFileDeletions();
}
// Multi-threaded test:
......@@ -1936,7 +2012,7 @@ class ModelDB: public DB {
virtual Status EnableFileDeletions() {
return Status::OK();
}
virtual Status GetLiveFiles(std::vector<std::string>&) {
virtual Status GetLiveFiles(std::vector<std::string>&, uint64_t* size) {
return Status::OK();
}
......
......@@ -269,7 +269,8 @@ Version::Version(VersionSet* vset)
file_to_compact_(NULL),
file_to_compact_level_(-1),
compaction_score_(-1),
compaction_level_(-1) {
compaction_level_(-1),
offset_manifest_file_(0) {
files_ = new std::vector<FileMetaData*>[vset->NumberLevels()];
}
......@@ -699,20 +700,20 @@ VersionSet::VersionSet(const std::string& dbname,
descriptor_log_(NULL),
dummy_versions_(this),
current_(NULL) {
compact_pointer_ = new std::string[options_->num_levels];
max_file_size_ = new uint64_t[options_->num_levels];
level_max_bytes_ = new uint64_t[options->num_levels];
int target_file_size_multiplier = options_->target_file_size_multiplier;
int max_bytes_multiplier = options_->max_bytes_for_level_multiplier;
for (int i = 0; i < options_->num_levels; i++) {
if (i > 1) {
max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier;
level_max_bytes_[i] = level_max_bytes_[i-1] * max_bytes_multiplier;
} else {
max_file_size_[i] = options_->target_file_size_base;
level_max_bytes_[i] = options_->max_bytes_for_level_base;
}
}
compact_pointer_ = new std::string[options_->num_levels];
max_file_size_ = new uint64_t[options_->num_levels];
level_max_bytes_ = new uint64_t[options->num_levels];
int target_file_size_multiplier = options_->target_file_size_multiplier;
int max_bytes_multiplier = options_->max_bytes_for_level_multiplier;
for (int i = 0; i < options_->num_levels; i++) {
if (i > 1) {
max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier;
level_max_bytes_[i] = level_max_bytes_[i-1] * max_bytes_multiplier;
} else {
max_file_size_[i] = options_->target_file_size_base;
level_max_bytes_[i] = options_->max_bytes_for_level_base;
}
}
AppendVersion(new Version(this));
}
......@@ -769,6 +770,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
std::string new_manifest_file;
uint64_t new_manifest_file_size = 0;
Status s;
if (descriptor_log_ == NULL) {
// No reason to unlock *mu here since we only hit this path in the
......@@ -807,11 +809,15 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
}
// find offset in manifest file where this version is stored.
new_manifest_file_size = descriptor_file_->GetFileSize();
mu->Lock();
}
// Install the new version
if (s.ok()) {
v->offset_manifest_file_ = new_manifest_file_size;
AppendVersion(v);
log_number_ = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
......@@ -857,6 +863,11 @@ Status VersionSet::Recover() {
if (!s.ok()) {
return s;
}
uint64_t manifest_file_size;
s = env_->GetFileSize(dscname, &manifest_file_size);
if (!s.ok()) {
return s;
}
bool have_log_number = false;
bool have_prev_log_number = false;
......@@ -936,6 +947,7 @@ Status VersionSet::Recover() {
builder.SaveTo(v);
// Install recovered version
Finalize(v);
v->offset_manifest_file_ = manifest_file_size;
AppendVersion(v);
manifest_file_number_ = next_file;
next_file_number_ = next_file + 1;
......
......@@ -131,6 +131,9 @@ class Version {
double compaction_score_;
int compaction_level_;
// The offset in the manifest file where this version is stored.
uint64_t offset_manifest_file_;
explicit Version(VersionSet* vset);
~Version();
......@@ -257,6 +260,9 @@ class VersionSet {
// of files per level. Uses *scratch as backing store.
const char* LevelDataSizeSummary(LevelSummaryStorage* scratch) const;
// Return the size of the current manifest file
const uint64_t ManifestFileSize() { return current_->offset_manifest_file_; }
private:
class Builder;
......
......@@ -164,9 +164,13 @@ class DB {
virtual Status EnableFileDeletions() = 0;
// Retrieve the list of all files in the database. The files are
// related to the dbname and are not absolute paths. This list
// can be used to generate a backup.
virtual Status GetLiveFiles(std::vector<std::string>&) = 0;
// relative to the dbname and are not absolute paths. This list
// can be used to generate a backup. The valid size of the manifest
// file is returned in manifest_file_size. The manifest file is
// an ever growing file, but only the portion specified
// by manifest_file_size is valid for this snapshot.
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size) = 0;
private:
// No copying allowed
......
......@@ -233,6 +233,13 @@ class WritableFile {
return Sync();
}
/*
* Get the size of valid data in the file.
*/
virtual uint64_t GetFileSize() {
return 0;
}
private:
// No copying allowed
WritableFile(const WritableFile&);
......
......@@ -319,6 +319,16 @@ class PosixMmapFile : public WritableFile {
// fdatasync because pending_sync_ has already been cleared.
return Sync();
}
/**
* Get the size of valid data in the file. This will not match the
* size that is returned from the filesystem because we use mmap
* to extend file by map_size every time.
*/
virtual uint64_t GetFileSize() {
size_t used = dst_ - base_;
return file_offset_ + used;
}
};
static int LockOrUnlock(const std::string& fname, int fd, bool lock) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册