提交 d4627e6d 编写于 作者: S sheki

Move WAL files to archive directory, instead of deleting.

Summary:
Create a directory "archive" in the DB directory.
During DeleteObsolteFiles move the WAL files (*.log) to the Archive directory,
instead of deleting.

Test Plan: Created a DB using DB_Bench. Reopened it. Checked if files move.

Reviewers: dhruba

Reviewed By: dhruba

Differential Revision: https://reviews.facebook.net/D6975
上级 d29f1819
......@@ -529,6 +529,10 @@ void leveldb_options_set_db_log_dir(
opt->rep.db_log_dir = db_log_dir;
}
void leveldb_options_set_WAL_ttl_seconds(leveldb_options_t* opt, uint64_t ttl) {
opt->rep.WAL_ttl_seconds = ttl;
}
leveldb_comparator_t* leveldb_comparator_create(
void* state,
void (*destructor)(void*),
......
......@@ -225,6 +225,9 @@ static bool FLAGS_disable_auto_compactions = false;
// that compacts Levelk with LevelK+1
static int FLAGS_source_compaction_factor = 1;
// Set the TTL for the WAL Files.
static uint64_t FLAGS_WAL_ttl_seconds = 0;
extern bool useOsBuffer;
extern bool useFsReadAhead;
extern bool useMmapRead;
......@@ -963,6 +966,7 @@ class Benchmark {
options.level0_slowdown_writes_trigger =
FLAGS_level0_slowdown_writes_trigger;
options.compression = FLAGS_compression_type;
options.WAL_ttl_seconds = FLAGS_WAL_ttl_seconds;
if (FLAGS_min_level_to_compress >= 0) {
assert(FLAGS_min_level_to_compress <= FLAGS_num_levels);
options.compression_per_level = new CompressionType[FLAGS_num_levels];
......@@ -1439,6 +1443,8 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--source_compaction_factor=%d%c",
&n, &junk) == 1 && n > 0) {
FLAGS_source_compaction_factor = n;
} else if (sscanf(argv[i], "--wal_ttl=%d%c", &n, &junk) == 1) {
FLAGS_WAL_ttl_seconds = static_cast<uint64_t>(n);
} else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
exit(1);
......
......@@ -5,12 +5,13 @@
#include "db/db_impl.h"
#include <algorithm>
#include <climits>
#include <cstdio>
#include <set>
#include <string>
#include <stdint.h>
#include <stdio.h>
#include <vector>
#include <algorithm>
#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
......@@ -42,6 +43,8 @@ namespace leveldb {
void dumpLeveldbBuildVersion(Logger * log);
const std::string DBImpl::ARCHIVAL_DIR = "archive";
static Status NewLogger(const std::string& dbname,
const std::string& db_log_dir,
Env* env,
......@@ -225,6 +228,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
host_name_ = "localhost";
}
last_log_ts = 0;
}
DBImpl::~DBImpl() {
......@@ -329,6 +333,18 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
}
}
std::string DBImpl::GetArchivalDirectoryName() {
return dbname_ + "/" + ARCHIVAL_DIR;
}
const Status DBImpl::CreateArchivalDirectory() {
if (options_.WAL_ttl_seconds > 0) {
std::string archivalPath = GetArchivalDirectoryName();
return env_->CreateDirIfMissing(archivalPath);
}
return Status::OK();
}
// Returns the list of live files in 'live' and the list
// of all files in the filesystem in 'allfiles'.
void DBImpl::FindObsoleteFiles(DeletionState& deletion_state) {
......@@ -372,6 +388,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
uint64_t number;
FileType type;
std::vector<std::string> old_log_files;
for (size_t i = 0; i < state.allfiles.size(); i++) {
if (ParseFileName(state.allfiles[i], &number, &type)) {
bool keep = true;
......@@ -406,6 +423,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
}
if (!keep) {
const std::string currentFile = state.allfiles[i];
if (type == kTableFile) {
// record the files to be evicted from the cache
state.files_to_evict.push_back(number);
......@@ -413,11 +431,22 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
Log(options_.info_log, "Delete type=%d #%lld\n",
int(type),
static_cast<unsigned long long>(number));
Status st = env_->DeleteFile(dbname_ + "/" + state.allfiles[i]);
if(!st.ok()) {
Log(options_.info_log, "Delete type=%d #%lld FAILED\n",
if (type == kLogFile && options_.WAL_ttl_seconds > 0) {
Status st = env_->RenameFile(dbname_ + "/" + currentFile,
dbname_ + "/" + ARCHIVAL_DIR + "/" + currentFile);
if (!st.ok()) {
Log(options_.info_log, "RenameFile type=%d #%lld FAILED\n",
int(type),
static_cast<unsigned long long>(number));
}
} else {
Status st = env_->DeleteFile(dbname_ + "/" + currentFile);
if(!st.ok()) {
Log(options_.info_log, "Delete type=%d #%lld FAILED\n",
int(type),
static_cast<unsigned long long>(number));
}
}
}
}
......@@ -446,12 +475,40 @@ void DBImpl::EvictObsoleteFiles(DeletionState& state) {
void DBImpl::DeleteObsoleteFiles() {
mutex_.AssertHeld();
DeletionState deletion_state;
std::set<uint64_t> live;
std::vector<std::string> allfiles;
std::vector<uint64_t> files_to_evict;
FindObsoleteFiles(deletion_state);
PurgeObsoleteFiles(deletion_state);
EvictObsoleteFiles(deletion_state);
PurgeObsoleteWALFiles();
}
void DBImpl::PurgeObsoleteWALFiles() {
if (options_.WAL_ttl_seconds != ULONG_MAX && options_.WAL_ttl_seconds > 0) {
std::vector<std::string> WALFiles;
std::string archivalDir = dbname_ + "/" + ARCHIVAL_DIR;
env_->GetChildren(archivalDir, &WALFiles);
int64_t currentTime;
const Status status = env_->GetCurrentTime(&currentTime);
assert(status.ok());
for (std::vector<std::string>::iterator it = WALFiles.begin();
it != WALFiles.end();
++it) {
uint64_t fileMTime;
const std::string filePath = archivalDir + "/" + *it;
const Status s = env_->GetFileModificationTime(filePath, &fileMTime);
if (s.ok()) {
if (status.ok() &&
(currentTime - fileMTime > options_.WAL_ttl_seconds)) {
Status delStatus = env_->DeleteFile(filePath);
if (!delStatus.ok()) {
Log(options_.info_log,
"Failed Deleting a WAL file Error : i%s",
delStatus.ToString().c_str());
}
}
} // Ignore errors.
}
}
}
Status DBImpl::Recover(VersionEdit* edit, bool no_log_recory,
......@@ -2056,9 +2113,14 @@ Status DB::Open(const Options& options, const std::string& dbname,
"no_block_cache is true while block_cache is not NULL");
}
DBImpl* impl = new DBImpl(options, dbname);
Status s = impl->CreateArchivalDirectory();
if (!s.ok()) {
delete impl;
return s;
}
impl->mutex_.Lock();
VersionEdit edit(impl->NumberLevels());
Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
......
......@@ -55,6 +55,9 @@ class DBImpl : public DB {
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size);
// Return's the path of the archival directory.
std::string GetArchivalDirectoryName();
// Extra methods (for testing) that are not in the public DB interface
// Compact any files in the named level that overlap [*begin,*end]
......@@ -113,6 +116,8 @@ protected:
void MaybeIgnoreError(Status* s) const;
const Status CreateArchivalDirectory();
// Delete any unneeded files and stale in-memory entries.
void DeleteObsoleteFiles();
......@@ -172,6 +177,7 @@ protected:
// Removes the file listed in files_to_evict from the table_cache
void EvictObsoleteFiles(DeletionState& deletion_state);
void PurgeObsoleteWALFiles();
// Constant after construction
const InternalFilterPolicy internal_filter_policy_;
bool owns_info_log_;
......@@ -292,6 +298,7 @@ protected:
CompactionStats* stats_;
static const int KEEP_LOG_FILE_NUM = 1000;
static const std::string ARCHIVAL_DIR;
std::string db_absolute_path_;
// count of the number of contiguous delaying writes
......
......@@ -3,6 +3,8 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <algorithm>
#include <set>
#include "leveldb/db.h"
#include "leveldb/filter_policy.h"
#include "db/db_impl.h"
......@@ -1188,7 +1190,7 @@ TEST(DBTest, RepeatedWritesToSameKey) {
// We must have at most one file per level except for level-0,
// which may have up to kL0_StopWritesTrigger files.
const int kMaxFiles = dbfull()->NumberLevels() +
dbfull()->Level0StopWriteTrigger();
dbfull()->Level0StopWriteTrigger();
Random rnd(301);
std::string value = RandomString(&rnd, 2 * options.write_buffer_size);
......@@ -1703,7 +1705,7 @@ TEST(DBTest, DeletionMarkers2) {
TEST(DBTest, OverlapInLevel0) {
do {
int tmp = dbfull()->MaxMemCompactionLevel();
int tmp = dbfull()->MaxMemCompactionLevel();
ASSERT_EQ(tmp, 2) << "Fix test to match config";
// Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0.
......@@ -2194,6 +2196,73 @@ TEST(DBTest, SnapshotFiles) {
dbfull()->DisableFileDeletions();
}
void ListLogFiles(Env* env,
const std::string& path,
std::vector<uint64_t>* logFiles) {
std::vector<std::string> files;
env->GetChildren(path, &files);
uint64_t number;
FileType type;
for (size_t i = 0; i < files.size(); ++i) {
if (ParseFileName(files[i], &number, &type)) {
if (type == kLogFile) {
logFiles->push_back(number);
}
}
}
}
TEST(DBTest, WALArchival) {
std::string value(1024, '1');
Options options = CurrentOptions();
options.create_if_missing = true;
options.WAL_ttl_seconds = 1000;
DestroyAndReopen(&options);
// TEST : Create DB with a ttl.
// Put some keys. Count the log files present in the DB just after insert.
// Re-open db. Causes deletion/archival to take place.
// Assert that the files moved under "/archive".
std::string archiveDir = dbfull()->GetArchivalDirectoryName();
for (int i = 0; i < 10; ++i) {
for (int j = 0; j < 10; ++j) {
ASSERT_OK(Put(Key(10*i+j), value));
}
std::vector<uint64_t> logFiles;
ListLogFiles(env_, dbname_, &logFiles);
options.create_if_missing = false;
Reopen(&options);
std::vector<uint64_t> logs;
ListLogFiles(env_, archiveDir, &logs);
std::set<uint64_t> archivedFiles(logs.begin(), logs.end());
for (std::vector<uint64_t>::iterator it = logFiles.begin();
it != logFiles.end();
++it) {
ASSERT_TRUE(archivedFiles.find(*it) != archivedFiles.end());
}
}
// REOPEN database with 0 TTL. all files should have been deleted.
std::vector<uint64_t> logFiles;
ListLogFiles(env_, archiveDir, &logFiles);
ASSERT_TRUE(logFiles.size() > 0);
options.WAL_ttl_seconds = 1;
env_->SleepForMicroseconds(2*1000*1000);
Reopen(&options);
logFiles.clear();
ListLogFiles(env_, archiveDir, &logFiles);
ASSERT_TRUE(logFiles.size() == 0);
}
TEST(DBTest, ReadCompaction) {
std::string value(4096, '4'); // a string of size 4K
......
......@@ -90,10 +90,15 @@ class HdfsEnv : public Env {
virtual Status CreateDir(const std::string& name);
virtual Status CreateDirIfMissing(const std::string& name);
virtual Status DeleteDir(const std::string& name);
virtual Status GetFileSize(const std::string& fname, uint64_t* size);
virtual Status GetFileModificationTime(const std::string& fname,
uint64_t* file_mtime);
virtual Status RenameFile(const std::string& src, const std::string& target);
virtual Status LockFile(const std::string& fname, FileLock** lock);
......@@ -247,10 +252,17 @@ class HdfsEnv : public Env {
virtual Status CreateDir(const std::string& name){return notsup;}
virtual Status CreateDirIfMissing(const std::string& name){return notsup;}
virtual Status DeleteDir(const std::string& name){return notsup;}
virtual Status GetFileSize(const std::string& fname, uint64_t* size){return notsup;}
virtual Status GetFileModificationTime(const std::string& fname,
uint64_t* time) {
return notsup;
}
virtual Status RenameFile(const std::string& src, const std::string& target){return notsup;}
virtual Status LockFile(const std::string& fname, FileLock** lock){return notsup;}
......
......@@ -316,6 +316,10 @@ class InMemoryEnv : public EnvWrapper {
return Status::OK();
}
virtual Status CreateDirIfMissing(const std::string& dirname) {
return Status::OK();
}
virtual Status DeleteDir(const std::string& dirname) {
return Status::OK();
}
......@@ -330,6 +334,11 @@ class InMemoryEnv : public EnvWrapper {
return Status::OK();
}
virtual Status GetFileModificationTime(const std::string& fname,
uint64_t* time) {
return Status::NotSupported("getFileMTime", "Not supported in MemEnv");
}
virtual Status RenameFile(const std::string& src,
const std::string& target) {
MutexLock lock(&mutex_);
......
......@@ -81,15 +81,22 @@ class Env {
// Delete the named file.
virtual Status DeleteFile(const std::string& fname) = 0;
// Create the specified directory.
// Create the specified directory. Returns error if directory exists.
virtual Status CreateDir(const std::string& dirname) = 0;
// Creates directory if missing. Return Ok if it exists, or successful in
// Creating.
virtual Status CreateDirIfMissing(const std::string& dirname) = 0;
// Delete the specified directory.
virtual Status DeleteDir(const std::string& dirname) = 0;
// Store the size of fname in *file_size.
virtual Status GetFileSize(const std::string& fname, uint64_t* file_size) = 0;
// Store the last modification time of fname in *file_mtime.
virtual Status GetFileModificationTime(const std::string& fname,
uint64_t* file_mtime) = 0;
// Rename file src to target.
virtual Status RenameFile(const std::string& src,
const std::string& target) = 0;
......@@ -323,10 +330,19 @@ class EnvWrapper : public Env {
}
Status DeleteFile(const std::string& f) { return target_->DeleteFile(f); }
Status CreateDir(const std::string& d) { return target_->CreateDir(d); }
Status CreateDirIfMissing(const std::string& d) {
return target_->CreateDirIfMissing(d);
}
Status DeleteDir(const std::string& d) { return target_->DeleteDir(d); }
Status GetFileSize(const std::string& f, uint64_t* s) {
return target_->GetFileSize(f, s);
}
Status GetFileModificationTime(const std::string& fname,
uint64_t* file_mtime) {
return target_->GetFileModificationTime(fname, file_mtime);
}
Status RenameFile(const std::string& s, const std::string& t) {
return target_->RenameFile(s, t);
}
......
......@@ -343,6 +343,16 @@ struct Options {
// Disable automatic compactions. Manual compactions can still
// be issued on this database.
bool disable_auto_compactions;
// The number of seconds a WAL(write ahead log) should be kept after it has
// been marked as Not Live. If the value is set. The WAL files are moved to
// the archive direcotory and deleted after the given TTL.
// If set to 0, WAL files are deleted as soon as they are not required by
// the database.
// If set to std::numeric_limits<uint64_t>::max() the WAL files will never be
// deleted.
// Default : 0
uint64_t WAL_ttl_seconds;
};
// Options that control read operations
......
......@@ -431,6 +431,16 @@ Status HdfsEnv::CreateDir(const std::string& name) {
return IOError(name, errno);
};
Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
const int value = hdfsExists(fileSys_, name.c_str());
// Not atomic. state might change b/w hdfsExists and CreateDir.
if (value == 0) {
return Status::OK();
} else {
return CreateDir(name);
}
};
Status HdfsEnv::DeleteDir(const std::string& name) {
return DeleteFile(name);
};
......@@ -446,6 +456,18 @@ Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
return IOError(fname, errno);
}
Status HdfsEnv::GetFileModificationTime(const std::string& fname,
uint64_t* time) {
hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str());
if (pFileInfo != NULL) {
*time = static_cast<uint64_t>(pFileInfo->mLastMod);
hdfsFreeFileInfo(pFileInfo, 1);
return Status::OK();
}
return IOError(fname, errno);
}
// The rename is not atomic. HDFS does not allow a renaming if the
// target already exists. So, we delete the target before attemting the
// rename.
......
......@@ -601,6 +601,16 @@ class PosixEnv : public Env {
return result;
};
virtual Status CreateDirIfMissing(const std::string& name) {
Status result;
if (mkdir(name.c_str(), 0755) != 0) {
if (errno != EEXIST) {
result = IOError(name, errno);
}
}
return result;
};
virtual Status DeleteDir(const std::string& name) {
Status result;
if (rmdir(name.c_str()) != 0) {
......@@ -621,6 +631,15 @@ class PosixEnv : public Env {
return s;
}
virtual Status GetFileModificationTime(const std::string& fname,
uint64_t* file_mtime) {
struct stat s;
if (stat(fname.c_str(), &s) !=0) {
return IOError(fname, errno);
}
*file_mtime = static_cast<uint64_t>(s.st_mtime);
return Status::OK();
}
virtual Status RenameFile(const std::string& src, const std::string& target) {
Status result;
if (rename(src.c_str(), target.c_str()) != 0) {
......
......@@ -53,7 +53,8 @@ Options::Options()
table_cache_numshardbits(4),
compaction_filter_args(NULL),
CompactionFilter(NULL),
disable_auto_compactions(false) {
disable_auto_compactions(false),
WAL_ttl_seconds(0){
}
void
......@@ -140,6 +141,8 @@ Options::Dump(
CompactionFilter);
Log(log," Options.disable_auto_compactions: %d",
disable_auto_compactions);
Log(log," Options.WAL_ttl_seconds: %ld",
WAL_ttl_seconds);
} // Options::Dump
} // namespace leveldb
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册