Commit 12e030a9 authored by Islam AbdelRahman

Use CompactRangeOptions for CompactRange

Summary:
This diff updates DB::CompactRange to use CompactRangeOptions instead of multiple parameters.
The old CompactRange overloads are still available but deprecated.
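
For context, a minimal before/after sketch of a call site (the open DB pointer and the key range are illustrative, not taken from this diff):

```cpp
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"

// Sketch: the three positional flags of the old API now travel in
// CompactRangeOptions. Assumes an already-open rocksdb::DB* db.
void CompactKeyRange(rocksdb::DB* db) {
  rocksdb::Slice begin("a"), end("z");

  // Old, now-deprecated form:
  //   db->CompactRange(&begin, &end, /*change_level=*/true,
  //                    /*target_level=*/2, /*target_path_id=*/0);

  // New form:
  rocksdb::CompactRangeOptions options;
  options.change_level = true;
  options.target_level = 2;
  rocksdb::Status s = db->CompactRange(options, &begin, &end);
}
```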

Test Plan:
make all check
make rocksdbjava
USE_CLANG=1 make all
OPT=-DROCKSDB_LITE make release

Reviewers: sdong, yhchiang, igor

Reviewed By: igor

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D40209
Parent c89369f5
......@@ -16,6 +16,7 @@
* options.hard_rate_limit is deprecated.
* When options.soft_rate_limit or options.level0_slowdown_writes_trigger is triggered, the way writes are slowed down is changed: the write rate to the DB is limited to options.delayed_write_rate.
* DB::GetApproximateSizes() adds a parameter to allow the estimation to include data in the mem table, with the default being not to include it. It is currently only supported for the skip list mem table (see the sketch below).
* DB::CompactRange() now accepts CompactRangeOptions instead of multiple parameters. CompactRangeOptions is defined in include/rocksdb/options.h.
## 3.11.0 (5/19/2015)
### New Features
......
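
The GetApproximateSizes() note above refers to the new trailing include_memtable parameter (visible in the DBImpl declaration later in this diff). A sketch under assumptions: an open rocksdb::DB* db and an illustrative key range.

```cpp
#include "rocksdb/db.h"

// Sketch: estimate the on-disk size of ["a", "z"), opting in to memtable
// data via the new trailing bool. Per the release note, memtable
// estimation is only supported for the skip list mem table.
void EstimateSize(rocksdb::DB* db) {
  rocksdb::Range range(rocksdb::Slice("a"), rocksdb::Slice("z"));
  uint64_t size = 0;
  db->GetApproximateSizes(db->DefaultColumnFamily(), &range, 1, &size,
                          /*include_memtable=*/true);
}
```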
......@@ -77,6 +77,7 @@ using rocksdb::BackupEngine;
using rocksdb::BackupableDBOptions;
using rocksdb::BackupInfo;
using rocksdb::RestoreOptions;
using rocksdb::CompactRangeOptions;
using std::shared_ptr;
......@@ -1006,6 +1007,7 @@ void rocksdb_compact_range(
const char* limit_key, size_t limit_key_len) {
Slice a, b;
db->rep->CompactRange(
CompactRangeOptions(),
// Pass nullptr Slice if corresponding "const char*" is nullptr
(start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr),
(limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr));
......@@ -1018,7 +1020,7 @@ void rocksdb_compact_range_cf(
const char* limit_key, size_t limit_key_len) {
Slice a, b;
db->rep->CompactRange(
column_family->rep,
CompactRangeOptions(), column_family->rep,
// Pass nullptr Slice if corresponding "const char*" is nullptr
(start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr),
(limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr));
......
......@@ -215,11 +215,13 @@ class ColumnFamilyTest : public testing::Test {
}
void CompactAll(int cf) {
ASSERT_OK(db_->CompactRange(handles_[cf], nullptr, nullptr));
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), handles_[cf], nullptr,
nullptr));
}
void Compact(int cf, const Slice& start, const Slice& limit) {
ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit));
ASSERT_OK(
db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
}
int NumTableFilesAtLevel(int level, int cf) {
......
......@@ -309,16 +309,18 @@ class CompactionJobStatsTest : public testing::Test {
void Compact(int cf, const Slice& start, const Slice& limit,
uint32_t target_path_id) {
ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit, false, -1,
target_path_id));
CompactRangeOptions compact_options;
compact_options.target_path_id = target_path_id;
ASSERT_OK(db_->CompactRange(compact_options, handles_[cf], &start, &limit));
}
void Compact(int cf, const Slice& start, const Slice& limit) {
ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit));
ASSERT_OK(
db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
}
void Compact(const Slice& start, const Slice& limit) {
ASSERT_OK(db_->CompactRange(&start, &limit));
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &limit));
}
void TEST_Compact(int level, int cf, const Slice& start, const Slice& limit) {
......
......@@ -3748,7 +3748,7 @@ class Benchmark {
void Compact(ThreadState* thread) {
DB* db = SelectDB(thread);
db->CompactRange(nullptr, nullptr);
db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
}
void PrintStats(const char* key) {
......
......@@ -1328,11 +1328,10 @@ void DBImpl::NotifyOnFlushCompleted(
#endif // ROCKSDB_LITE
}
Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end,
bool change_level, int target_level,
uint32_t target_path_id) {
if (target_path_id >= db_options_.db_paths.size()) {
Status DBImpl::CompactRange(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) {
if (options.target_path_id >= db_options_.db_paths.size()) {
return Status::InvalidArgument("Invalid target path ID");
}
......@@ -1362,8 +1361,8 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
cfd->NumberLevels() > 1) {
// Always compact all files together.
s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
cfd->NumberLevels() - 1, target_path_id, begin,
end);
cfd->NumberLevels() - 1, options.target_path_id,
begin, end);
final_output_level = cfd->NumberLevels() - 1;
} else {
for (int level = 0; level <= max_level_with_files; level++) {
......@@ -1384,8 +1383,8 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
output_level = ColumnFamilyData::kCompactToBaseLevel;
}
}
s = RunManualCompaction(cfd, level, output_level, target_path_id, begin,
end);
s = RunManualCompaction(cfd, level, output_level, options.target_path_id,
begin, end);
if (!s.ok()) {
break;
}
......@@ -1403,8 +1402,8 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
return s;
}
if (change_level) {
s = ReFitLevel(cfd, final_output_level, target_level);
if (options.change_level) {
s = ReFitLevel(cfd, final_output_level, options.target_level);
}
LogFlush(db_options_.info_log);
......
......@@ -125,10 +125,9 @@ class DBImpl : public DB {
const Range* range, int n, uint64_t* sizes,
bool include_memtable = false) override;
using DB::CompactRange;
virtual Status CompactRange(ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end,
bool change_level = false, int target_level = -1,
uint32_t target_path_id = 0) override;
virtual Status CompactRange(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) override;
using DB::CompactFiles;
virtual Status CompactFiles(const CompactionOptions& compact_options,
......
......@@ -58,10 +58,9 @@ class DBImplReadOnly : public DBImpl {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DBImpl::CompactRange;
virtual Status CompactRange(ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end,
bool reduce_level = false, int target_level = -1,
uint32_t target_path_id = 0) override {
virtual Status CompactRange(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) override {
return Status::NotSupported("Not supported operation in read only mode.");
}
......
This diff is collapsed.
......@@ -201,8 +201,11 @@ TEST_F(DeleteFileTest, PurgeObsoleteFilesTest) {
// 2 ssts, 1 manifest
CheckFileTypeCounts(dbname_, 0, 2, 1);
std::string first("0"), last("999999");
CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 2;
Slice first_slice(first), last_slice(last);
db_->CompactRange(&first_slice, &last_slice, true, 2);
db_->CompactRange(compact_options, &first_slice, &last_slice);
// 1 sst after compaction
CheckFileTypeCounts(dbname_, 0, 1, 1);
......@@ -211,7 +214,7 @@ TEST_F(DeleteFileTest, PurgeObsoleteFilesTest) {
Iterator *itr = 0;
CreateTwoLevels();
itr = db_->NewIterator(ReadOptions());
db_->CompactRange(&first_slice, &last_slice, true, 2);
db_->CompactRange(compact_options, &first_slice, &last_slice);
// 3 sst after compaction with live iterator
CheckFileTypeCounts(dbname_, 0, 3, 1);
delete itr;
......
......@@ -659,7 +659,7 @@ class FaultInjectionTest : public testing::Test {
Build(write_options, 0, num_pre_sync);
if (sync_use_compact_) {
db_->CompactRange(nullptr, nullptr);
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
}
write_options.sync = false;
Build(write_options, num_pre_sync, num_post_sync);
......
......@@ -201,7 +201,8 @@ TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
ASSERT_OK(Flush(static_cast<int>(i)));
const Slice kStart = "a";
const Slice kEnd = "z";
ASSERT_OK(dbfull()->CompactRange(handles_[i], &kStart, &kEnd));
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[i],
&kStart, &kEnd));
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
}
......
......@@ -294,7 +294,7 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) {
db->Flush(o);
cout << "Compaction started ...\n";
db->CompactRange(nullptr, nullptr);
db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
cout << "Compaction ended\n";
dumpDb(db);
......@@ -341,7 +341,7 @@ void testPartialMerge(Counters* counters, DB* db, size_t max_merge,
tmp_sum += i;
}
db->Flush(o);
db->CompactRange(nullptr, nullptr);
db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_EQ(tmp_sum, counters->assert_get("b"));
if (count > max_merge) {
// in this case, FullMerge should be called instead.
......@@ -360,7 +360,7 @@ void testPartialMerge(Counters* counters, DB* db, size_t max_merge,
tmp_sum += i;
}
db->Flush(o);
db->CompactRange(nullptr, nullptr);
db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_EQ(tmp_sum, counters->assert_get("c"));
ASSERT_EQ(num_partial_merge_calls, 0U);
}
......@@ -467,7 +467,7 @@ void runTest(int argc, const string& dbname, const bool use_ttl = false) {
counters.add("test-key", 1);
counters.add("test-key", 1);
counters.add("test-key", 1);
db->CompactRange(nullptr, nullptr);
db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
}
DB* reopen_db;
......
......@@ -33,6 +33,7 @@ struct ReadOptions;
struct WriteOptions;
struct FlushOptions;
struct CompactionOptions;
struct CompactRangeOptions;
struct TableProperties;
class WriteBatch;
class Env;
......@@ -415,25 +416,42 @@ class DB {
// begin==nullptr is treated as a key before all keys in the database.
// end==nullptr is treated as a key after all keys in the database.
// Therefore the following call will compact the entire database:
// db->CompactRange(nullptr, nullptr);
// db->CompactRange(options, nullptr, nullptr);
// Note that after the entire database is compacted, all data are pushed
// down to the last level containing any data. If the total data size
// after compaction is reduced, that level might not be appropriate for
// hosting all the files. In this case, client could set change_level
// to true, to move the files back to the minimum level capable of holding
// the data set or a given level (specified by non-negative target_level).
// Compaction outputs should be placed in options.db_paths[target_path_id].
// Behavior is undefined if target_path_id is out of range.
virtual Status CompactRange(ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end,
bool change_level = false, int target_level = -1,
uint32_t target_path_id = 0) = 0;
virtual Status CompactRange(const Slice* begin, const Slice* end,
bool change_level = false, int target_level = -1,
uint32_t target_path_id = 0) {
return CompactRange(DefaultColumnFamily(), begin, end, change_level,
target_level, target_path_id);
// down to the last level containing any data. If the total data size after
// compaction is reduced, that level might not be appropriate for hosting all
// the files. In this case, the client can set options.change_level to true to
// move the files back to the minimum level capable of holding the data set,
// or to a given level (specified by a non-negative options.target_level).
virtual Status CompactRange(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) = 0;
virtual Status CompactRange(const CompactRangeOptions& options,
const Slice* begin, const Slice* end) {
return CompactRange(options, DefaultColumnFamily(), begin, end);
}
__attribute__((deprecated)) virtual Status
CompactRange(ColumnFamilyHandle* column_family, const Slice* begin,
const Slice* end, bool change_level = false,
int target_level = -1, uint32_t target_path_id = 0) {
CompactRangeOptions options;
options.change_level = change_level;
options.target_level = target_level;
options.target_path_id = target_path_id;
return CompactRange(options, column_family, begin, end);
}
__attribute__((deprecated)) virtual Status
CompactRange(const Slice* begin, const Slice* end,
bool change_level = false, int target_level = -1,
uint32_t target_path_id = 0) {
CompactRangeOptions options;
options.change_level = change_level;
options.target_level = target_level;
options.target_path_id = target_path_id;
return CompactRange(options, DefaultColumnFamily(), begin, end);
}
virtual Status SetOptions(ColumnFamilyHandle* column_family,
const std::unordered_map<std::string, std::string>& new_options) {
return Status::NotSupported("Not implemented");
......
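
Tying the header comment above together: a nullptr begin/end compacts the whole database, and change_level pulls the output back down afterwards. A minimal sketch, assuming an open rocksdb::DB* db:

```cpp
#include "rocksdb/db.h"
#include "rocksdb/options.h"

// Compact the entire database, then move the files back to the minimum
// level capable of holding the data set (target_level stays at -1,
// which asks for the minimum capable level).
void FullCompaction(rocksdb::DB* db) {
  rocksdb::CompactRangeOptions options;
  options.change_level = true;
  rocksdb::Status s = db->CompactRange(options, nullptr, nullptr);
}
```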
......@@ -1237,6 +1237,19 @@ struct CompactionOptions {
: compression(kSnappyCompression),
output_file_size_limit(std::numeric_limits<uint64_t>::max()) {}
};
// CompactRangeOptions is used by the CompactRange() call.
struct CompactRangeOptions {
// If true, compacted files will be moved to the minimum level capable
// of holding the data, or to a given level (specified by a non-negative
// target_level).
bool change_level = false;
// If change_level is true and target_level has a non-negative value,
// compacted files will be moved to target_level.
int target_level = -1;
// Compaction outputs will be placed in options.db_paths[target_path_id].
// Behavior is undefined if target_path_id is out of range.
uint32_t target_path_id = 0;
};
} // namespace rocksdb
#endif // STORAGE_ROCKSDB_INCLUDE_OPTIONS_H_
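
As a sketch of the remaining field, target_path_id (assuming the DB was opened with at least two entries in Options::db_paths; the paths here are illustrative):

```cpp
#include "rocksdb/db.h"
#include "rocksdb/options.h"

// Direct compaction outputs to the second configured path, e.g. when the
// DB was opened with db_paths = { {"/fast/ssd", ...}, {"/big/hdd", ...} }.
// Note that DBImpl::CompactRange (earlier in this diff) returns
// Status::InvalidArgument for an out-of-range target_path_id.
void CompactToSecondPath(rocksdb::DB* db) {
  rocksdb::CompactRangeOptions options;
  options.target_path_id = 1;  // outputs go to db_paths[1]
  rocksdb::Status s = db->CompactRange(options, nullptr, nullptr);
}
```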
......@@ -127,12 +127,10 @@ class StackableDB : public DB {
}
using DB::CompactRange;
virtual Status CompactRange(ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end,
bool change_level = false, int target_level = -1,
uint32_t target_path_id = 0) override {
return db_->CompactRange(column_family, begin, end, change_level,
target_level, target_path_id);
virtual Status CompactRange(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) override {
return db_->CompactRange(options, column_family, begin, end);
}
using DB::CompactFiles;
......
......@@ -1476,13 +1476,15 @@ void rocksdb_compactrange_helper(JNIEnv* env, rocksdb::DB* db,
jint jtarget_level, jint jtarget_path_id) {
rocksdb::Status s;
rocksdb::CompactRangeOptions compact_options;
compact_options.change_level = jreduce_level;
compact_options.target_level = jtarget_level;
compact_options.target_path_id = static_cast<uint32_t>(jtarget_path_id);
if (cf_handle != nullptr) {
s = db->CompactRange(cf_handle, nullptr, nullptr, jreduce_level,
jtarget_level, static_cast<uint32_t>(jtarget_path_id));
s = db->CompactRange(compact_options, cf_handle, nullptr, nullptr);
} else {
// backwards compatibility
s = db->CompactRange(nullptr, nullptr, jreduce_level,
jtarget_level, static_cast<uint32_t>(jtarget_path_id));
s = db->CompactRange(compact_options, nullptr, nullptr);
}
if (s.ok()) {
......@@ -1533,13 +1535,15 @@ void rocksdb_compactrange_helper(JNIEnv* env, rocksdb::DB* db,
const rocksdb::Slice end_slice(reinterpret_cast<char*>(end), jend_len);
rocksdb::Status s;
rocksdb::CompactRangeOptions compact_options;
compact_options.change_level = jreduce_level;
compact_options.target_level = jtarget_level;
compact_options.target_path_id = static_cast<uint32_t>(jtarget_path_id);
if (cf_handle != nullptr) {
s = db->CompactRange(cf_handle, &begin_slice, &end_slice, jreduce_level,
jtarget_level, static_cast<uint32_t>(jtarget_path_id));
s = db->CompactRange(compact_options, cf_handle, &begin_slice, &end_slice);
} else {
// backwards compatibility
s = db->CompactRange(&begin_slice, &end_slice, jreduce_level,
jtarget_level, static_cast<uint32_t>(jtarget_path_id));
s = db->CompactRange(compact_options, &begin_slice, &end_slice);
}
env->ReleaseByteArrayElements(jbegin, begin, JNI_ABORT);
......
......@@ -441,7 +441,7 @@ void CompactorCommand::DoCommand() {
end = new Slice(to_);
}
db_->CompactRange(begin, end);
db_->CompactRange(CompactRangeOptions(), begin, end);
exec_state_ = LDBCommandExecuteResult::Succeed("");
delete begin;
......@@ -519,7 +519,7 @@ void DBLoaderCommand::DoCommand() {
cout << "Warning: " << bad_lines << " bad lines ignored." << endl;
}
if (compact_) {
db_->CompactRange(nullptr, nullptr);
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
}
}
......@@ -1204,7 +1204,7 @@ void ReduceDBLevelsCommand::DoCommand() {
}
// Compact the whole DB to put all files to the highest level.
fprintf(stdout, "Compacting the db...\n");
db_->CompactRange(nullptr, nullptr);
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
CloseDB();
EnvOptions soptions;
......@@ -1309,9 +1309,10 @@ void ChangeCompactionStyleCommand::DoCommand() {
files_per_level.c_str());
// manual compact into a single file and move the file to level 0
db_->CompactRange(nullptr, nullptr,
true /* reduce level */,
0 /* reduce to level 0 */);
CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 0;
db_->CompactRange(compact_options, nullptr, nullptr);
// verify compaction result
files_per_level = "";
......
......@@ -77,7 +77,7 @@ TEST_F(ManualCompactionTest, CompactTouchesAllKeys) {
db->Put(WriteOptions(), Slice("key4"), Slice("destroy"));
Slice key4("key4");
db->CompactRange(nullptr, &key4);
db->CompactRange(CompactRangeOptions(), nullptr, &key4);
Iterator* itr = db->NewIterator(ReadOptions());
itr->SeekToFirst();
ASSERT_TRUE(itr->Valid());
......@@ -130,7 +130,7 @@ TEST_F(ManualCompactionTest, Test) {
rocksdb::Slice greatest(end_key.data(), end_key.size());
// commenting out the line below causes the example to work correctly
db->CompactRange(&least, &greatest);
db->CompactRange(CompactRangeOptions(), &least, &greatest);
// count the keys
rocksdb::Iterator* iter = db->NewIterator(rocksdb::ReadOptions());
......
......@@ -54,10 +54,9 @@ class CompactedDBImpl : public DBImpl {
return Status::NotSupported("Not supported in compacted db mode.");
}
using DBImpl::CompactRange;
virtual Status CompactRange(ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end,
bool change_level = false, int target_level = -1,
uint32_t target_path_id = 0) override {
virtual Status CompactRange(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) override {
return Status::NotSupported("Not supported in compacted db mode.");
}
......
......@@ -515,7 +515,7 @@ TEST_F(StringAppendOperatorTest, PersistentFlushAndCompaction) {
slists.Append("c", "bbnagnagsx");
slists.Append("a", "sa");
slists.Append("b", "df");
db->CompactRange(nullptr, nullptr);
db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
slists.Get("a", &a);
slists.Get("b", &b);
slists.Get("c", &c);
......@@ -536,7 +536,7 @@ TEST_F(StringAppendOperatorTest, PersistentFlushAndCompaction) {
ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx\nrogosh");
// Compact, Get
db->CompactRange(nullptr, nullptr);
db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_EQ(a, "x\nt\nr\nsa\ngh\njk");
ASSERT_EQ(b, "y\n2\nmonkey\ndf\nl;");
ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx\nrogosh");
......@@ -544,7 +544,7 @@ TEST_F(StringAppendOperatorTest, PersistentFlushAndCompaction) {
// Append, Flush, Compact, Get
slists.Append("b", "afcg");
db->Flush(rocksdb::FlushOptions());
db->CompactRange(nullptr, nullptr);
db->CompactRange(CompactRangeOptions(), nullptr, nullptr);
slists.Get("b", &b);
ASSERT_EQ(b, "y\n2\nmonkey\ndf\nl;\nafcg");
}
......
......@@ -589,7 +589,7 @@ class SpatialDBImpl : public SpatialDB {
Status t = Flush(FlushOptions(), cfh);
if (t.ok()) {
t = CompactRange(cfh, nullptr, nullptr);
t = CompactRange(CompactRangeOptions(), cfh, nullptr, nullptr);
}
{
......
......@@ -168,9 +168,9 @@ class TtlTest : public testing::Test {
// Runs a manual compaction
void ManualCompact(ColumnFamilyHandle* cf = nullptr) {
if (cf == nullptr) {
db_ttl_->CompactRange(nullptr, nullptr);
db_ttl_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
} else {
db_ttl_->CompactRange(cf, nullptr, nullptr);
db_ttl_->CompactRange(CompactRangeOptions(), cf, nullptr, nullptr);
}
}
......