提交 02675026 编写于 作者: D dyniusz

Support for LevelDB SST with .ldb suffix

Summary:
	Handle SST files with both ".sst" and ".ldb" suffix.
	This enables user to migrate from leveldb to rocksdb.

Test Plan:
        Added unit test with DB operating on SSTs with names schema.
        See db/dc_test.cc:SSTsWithLdbSuffixHandling for details

Reviewers: yhchiang, sdong, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D48003
上级 5855cdb6
......@@ -4,6 +4,7 @@
### New Features
* Added single delete operation as a more efficient way to delete keys that have not been overwritten.
* Added experimental AddFile() to DB interface that allow users to add files created by SstFileWriter into an empty Database, see include/rocksdb/sst_file_writer.h and DB::AddFile() for more info.
* Added support for opening SST files with .ldb suffix which enables opening LevelDB databases.
### Public API Changes
* Added SingleDelete() to the DB interface.
......
......@@ -4481,6 +4481,10 @@ Status DBImpl::CheckConsistency() {
uint64_t fsize = 0;
Status s = env_->GetFileSize(file_path, &fsize);
if (!s.ok() &&
env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok()) {
s = Status::OK();
}
if (!s.ok()) {
corruption_messages +=
"Can't access " + md.name + ": " + s.ToString() + "\n";
......
......@@ -26,7 +26,6 @@
#include "db/filename.h"
#include "db/dbformat.h"
#include "db/db_impl.h"
#include "db/filename.h"
#include "db/job_context.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
......@@ -9639,6 +9638,47 @@ TEST_F(DBTest, AddExternalSstFileMultiThreaded) {
kSkipFIFOCompaction));
}
// 1 Create some SST files by inserting K-V pairs into DB
// 2 Close DB and change suffix from ".sst" to ".ldb" for every other SST file
// 3 Open DB and check if all key can be read
TEST_F(DBTest, SSTsWithLdbSuffixHandling) {
Options options = CurrentOptions();
options.write_buffer_size = 110 << 10; // 110KB
options.num_levels = 4;
DestroyAndReopen(options);
Random rnd(301);
int key_id = 0;
for (int i = 0; i < 10; ++i) {
GenerateNewFile(&rnd, &key_id, false);
}
Flush();
Close();
int const num_files = GetSstFileCount(dbname_);
ASSERT_GT(num_files, 0);
std::vector<std::string> filenames;
GetSstFiles(dbname_, &filenames);
int num_ldb_files = 0;
for (unsigned int i = 0; i < filenames.size(); ++i) {
if (i & 1) {
continue;
}
std::string const rdb_name = dbname_ + "/" + filenames[i];
std::string const ldb_name = Rocks2LevelTableFileName(rdb_name);
ASSERT_TRUE(env_->RenameFile(rdb_name, ldb_name).ok());
++num_ldb_files;
}
ASSERT_GT(num_ldb_files, 0);
ASSERT_EQ(num_files, GetSstFileCount(dbname_));
Reopen(options);
for (int k = 0; k < key_id; ++k) {
ASSERT_NE("NOT_FOUND", Get(Key(k)));
}
Destroy(options);
}
INSTANTIATE_TEST_CASE_P(DBTestWithParam, DBTestWithParam,
::testing::Values(1, 4));
......
......@@ -24,6 +24,9 @@
namespace rocksdb {
static const std::string kRocksDbTFileExt = "sst";
static const std::string kLevelDbTFileExt = "ldb";
// Given a path, flatten the path name by replacing all chars not in
// {[0-9,a-z,A-Z,-,_,.]} with _. And append '_LOG\0' at the end.
// Return the number of chars stored in dest not including the trailing '\0'.
......@@ -78,7 +81,16 @@ std::string ArchivedLogFileName(const std::string& name, uint64_t number) {
}
std::string MakeTableFileName(const std::string& path, uint64_t number) {
return MakeFileName(path, number, "sst");
return MakeFileName(path, number, kRocksDbTFileExt.c_str());
}
std::string Rocks2LevelTableFileName(const std::string& fullname) {
assert(fullname.size() > kRocksDbTFileExt.size() + 1);
if (fullname.size() <= kRocksDbTFileExt.size() + 1) {
return "";
}
return fullname.substr(0, fullname.size() - kRocksDbTFileExt.size()) +
kLevelDbTFileExt;
}
uint64_t TableFileNameToNumber(const std::string& name) {
......@@ -273,17 +285,23 @@ bool ParseFileName(const std::string& fname, uint64_t* number,
if (!ConsumeDecimalNumber(&rest, &num)) {
return false;
}
if (rest.size() <= 1 || rest[0] != '.') {
return false;
}
rest.remove_prefix(1);
Slice suffix = rest;
if (suffix == Slice(".log")) {
if (suffix == Slice("log")) {
*type = kLogFile;
if (log_type && !archive_dir_found) {
*log_type = kAliveLogFile;
}
} else if (archive_dir_found) {
return false; // Archive dir can contain only log files
} else if (suffix == Slice(".sst")) {
} else if (suffix == Slice(kRocksDbTFileExt) ||
suffix == Slice(kLevelDbTFileExt)) {
*type = kTableFile;
} else if (suffix == Slice(".dbtmp")) {
} else if (suffix == Slice("dbtmp")) {
*type = kTempFile;
} else {
return false;
......
......@@ -55,6 +55,10 @@ extern std::string ArchivedLogFileName(const std::string& dbname,
extern std::string MakeTableFileName(const std::string& name, uint64_t number);
// Return the name of sstable with LevelDB suffix
// created from RocksDB sstable suffixed name
extern std::string Rocks2LevelTableFileName(const std::string& fullname);
// the reverse function of MakeTableFileName
// TODO(yhchiang): could merge this function with ParseFileName()
extern uint64_t TableFileNameToNumber(const std::string& name);
......
......@@ -666,6 +666,7 @@ class VersionSet {
Status GetMetadataForFile(uint64_t number, int* filelevel,
FileMetaData** metadata, ColumnFamilyData** cfd);
// This function doesn't support leveldb SST filenames
void GetLiveFilesMetaData(std::vector<LiveFileMetaData> *metadata);
void GetObsoleteFiles(std::vector<FileMetaData*>* files,
......
......@@ -794,19 +794,22 @@ std::string DBTestBase::DumpSSTableList() {
return property;
}
void DBTestBase::GetSstFiles(std::string path,
std::vector<std::string>* files) {
env_->GetChildren(path, files);
files->erase(
std::remove_if(files->begin(), files->end(), [](std::string name) {
uint64_t number;
FileType type;
return !(ParseFileName(name, &number, &type) && type == kTableFile);
}), files->end());
}
int DBTestBase::GetSstFileCount(std::string path) {
std::vector<std::string> files;
env_->GetChildren(path, &files);
int sst_count = 0;
uint64_t number;
FileType type;
for (size_t i = 0; i < files.size(); i++) {
if (ParseFileName(files[i], &number, &type) && type == kTableFile) {
sst_count++;
}
}
return sst_count;
GetSstFiles(path, &files);
return static_cast<int>(files.size());
}
// this will generate non-overlapping files since it keeps increasing key_idx
......
......@@ -618,6 +618,8 @@ class DBTestBase : public testing::Test {
std::string DumpSSTableList();
void GetSstFiles(std::string path, std::vector<std::string>* files);
int GetSstFileCount(std::string path);
// this will generate non-overlapping files since it keeps increasing key_idx
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册