提交 cee1e8a0 编写于 作者: I Islam AbdelRahman

Parallelize LoadTableHandlers

Summary: Add a new option that all LoadTableHandlers to use multiple threads to load files on DB Open and Recover

Test Plan:
make check -j64
COMPILE_WITH_TSAN=1 make check -j64
DISABLE_JEMALLOC=1 make all valgrind_check -j64 (still running)

Reviewers: yhchiang, anthony, rven, kradhakrishnan, igor, sdong

Reviewed By: sdong

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D43755
上级 4249f159
......@@ -8377,6 +8377,62 @@ TEST_F(DBTest, UnsupportedManualSync) {
ASSERT_TRUE(s.IsNotSupported());
}
TEST_F(DBTest, OpenDBWithInfiniteMaxOpenFiles) {
// Open DB with infinite max open files
// - First iteration use 1 thread to open files
// - Second iteration use 5 threads to open files
for (int iter = 0; iter < 2; iter++) {
Options options;
options.create_if_missing = true;
options.write_buffer_size = 100000;
options.disable_auto_compactions = true;
options.max_open_files = -1;
if (iter == 0) {
options.max_file_opening_threads = 1;
} else {
options.max_file_opening_threads = 5;
}
options = CurrentOptions(options);
DestroyAndReopen(options);
// Create 12 Files in L0 (then move then to L2)
for (int i = 0; i < 12; i++) {
std::string k = "L2_" + Key(i);
ASSERT_OK(Put(k, k + std::string(1000, 'a')));
ASSERT_OK(Flush());
}
CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 2;
db_->CompactRange(compact_options, nullptr, nullptr);
// Create 12 Files in L0
for (int i = 0; i < 12; i++) {
std::string k = "L0_" + Key(i);
ASSERT_OK(Put(k, k + std::string(1000, 'a')));
ASSERT_OK(Flush());
}
Close();
// Reopening the DB will load all exisitng files
Reopen(options);
ASSERT_EQ("12,0,12", FilesPerLevel(0));
std::vector<std::vector<FileMetaData>> files;
dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files);
for (const auto& level : files) {
for (const auto& file : level) {
ASSERT_TRUE(file.table_reader_handle != nullptr);
}
}
for (int i = 0; i < 12; i++) {
ASSERT_EQ(Get("L0_" + Key(i)), "L0_" + Key(i) + std::string(1000, 'a'));
ASSERT_EQ(Get("L2_" + Key(i)), "L2_" + Key(i) + std::string(1000, 'a'));
}
}
}
INSTANTIATE_TEST_CASE_P(DBTestWithParam, DBTestWithParam,
::testing::Values(1, 4));
......
......@@ -15,7 +15,9 @@
#include <inttypes.h>
#include <algorithm>
#include <atomic>
#include <set>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>
......@@ -278,12 +280,26 @@ class VersionBuilder::Rep {
CheckConsistency(vstorage);
}
void LoadTableHandlers() {
void LoadTableHandlers(int max_threads) {
assert(table_cache_ != nullptr);
std::vector<FileMetaData*> files_meta;
for (int level = 0; level < base_vstorage_->num_levels(); level++) {
for (auto& file_meta_pair : levels_[level].added_files) {
auto* file_meta = file_meta_pair.second;
assert(!file_meta->table_reader_handle);
files_meta.push_back(file_meta);
}
}
std::atomic<size_t> next_file_meta_idx(0);
std::function<void()> load_handlers_func = [&]() {
while (true) {
size_t file_idx = next_file_meta_idx.fetch_add(1);
if (file_idx >= files_meta.size()) {
break;
}
auto* file_meta = files_meta[file_idx];
table_cache_->FindTable(
env_options_, *(base_vstorage_->InternalComparator()),
file_meta->fd, &file_meta->table_reader_handle, false);
......@@ -293,6 +309,19 @@ class VersionBuilder::Rep {
file_meta->table_reader_handle);
}
}
};
if (max_threads <= 1) {
load_handlers_func();
} else {
std::vector<std::thread> threads;
for (int i = 0; i < max_threads; i++) {
threads.emplace_back(load_handlers_func);
}
for (auto& t : threads) {
t.join();
}
}
}
......@@ -321,7 +350,9 @@ void VersionBuilder::Apply(VersionEdit* edit) { rep_->Apply(edit); }
void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
rep_->SaveTo(vstorage);
}
void VersionBuilder::LoadTableHandlers() { rep_->LoadTableHandlers(); }
void VersionBuilder::LoadTableHandlers(int max_threads) {
rep_->LoadTableHandlers(max_threads);
}
void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level,
FileMetaData* f) {
rep_->MaybeAddFile(vstorage, level, f);
......
......@@ -30,7 +30,7 @@ class VersionBuilder {
int level);
void Apply(VersionEdit* edit);
void SaveTo(VersionStorageInfo* vstorage);
void LoadTableHandlers();
void LoadTableHandlers(int max_threads = 1);
void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f);
private:
......
......@@ -2393,9 +2393,9 @@ Status VersionSet::Recover(
auto* builder = builders_iter->second->version_builder();
if (db_options_->max_open_files == -1) {
// unlimited table cache. Pre-load table handle now.
// Need to do it out of the mutex.
builder->LoadTableHandlers();
// unlimited table cache. Pre-load table handle now.
// Need to do it out of the mutex.
builder->LoadTableHandlers(db_options_->max_file_opening_threads);
}
Version* v = new Version(cfd, this, current_version_number_++);
......
......@@ -810,6 +810,11 @@ struct DBOptions {
// Default: 5000
int max_open_files;
// If max_open_files is -1, DB will open all files on DB::Open(). You can
// use this option to increase the number of threads used to open the files.
// Default: 1
int max_file_opening_threads;
// Once write-ahead logs exceed this size, we will start forcing the flush of
// column families whose memtables are backed by the oldest live WAL file
// (i.e. the ones that are causing all the space amplification). If set to 0
......
......@@ -208,6 +208,7 @@ DBOptions::DBOptions()
info_log_level(DEBUG_LEVEL),
#endif // NDEBUG
max_open_files(5000),
max_file_opening_threads(1),
max_total_wal_size(0),
statistics(nullptr),
disableDataSync(false),
......@@ -256,6 +257,7 @@ DBOptions::DBOptions(const Options& options)
info_log(options.info_log),
info_log_level(options.info_log_level),
max_open_files(options.max_open_files),
max_file_opening_threads(options.max_file_opening_threads),
max_total_wal_size(options.max_total_wal_size),
statistics(options.statistics),
disableDataSync(options.disableDataSync),
......@@ -306,6 +308,7 @@ void DBOptions::Dump(Logger* log) const {
Warn(log, " Options.env: %p", env);
Warn(log, " Options.info_log: %p", info_log.get());
Warn(log, " Options.max_open_files: %d", max_open_files);
Warn(log, "Options.max_file_opening_threads: %d", max_file_opening_threads);
Warn(log, " Options.max_total_wal_size: %" PRIu64, max_total_wal_size);
Warn(log, " Options.disableDataSync: %d", disableDataSync);
Warn(log, " Options.use_fsync: %d", use_fsync);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册