提交 c4e19b77 编写于 作者: O omegaga

Add a read option to enable background purge when cleaning up iterators

Summary:
Add a read option `background_purge_on_iterator_cleanup` to avoid deleting files in foreground when destroying iterators.
Instead, a job is scheduled in high priority queue and would be executed in a separate background thread.

Test Plan: Add a variant of PurgeObsoleteFileTest. Turn on background purge option in the new test, and use sleeping task to ensure files are deleted in background.

Reviewers: IslamAbdelRahman, sdong

Reviewed By: IslamAbdelRahman

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D59499
上级 fa813f74
......@@ -7,6 +7,7 @@
### New Features
* Add avoid_flush_during_recovery option.
* Add a read option background_purge_on_iterator_cleanup to avoid deleting files in foreground when destroying iterators. Instead, a job is scheduled in high priority queue and would be executed in a separate background thread.
## 4.9.0 (6/9/2016)
### Public API changes
......
......@@ -328,6 +328,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
num_running_compactions_(0),
bg_flush_scheduled_(0),
num_running_flushes_(0),
bg_purge_scheduled_(0),
disable_delete_obsolete_files_(0),
delete_obsolete_files_next_run_(
options.env->NowMicros() +
......@@ -407,7 +408,9 @@ DBImpl::~DBImpl() {
bg_flush_scheduled_ -= flushes_unscheduled;
// Wait for background work to finish
while (bg_compaction_scheduled_ || bg_flush_scheduled_) {
while (bg_compaction_scheduled_ || bg_flush_scheduled_ ||
bg_purge_scheduled_) {
TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
bg_cv_.Wait();
}
EraseThreadStatusDbInfo();
......@@ -880,11 +883,42 @@ bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
}
}; // namespace
// Delete obsolete files and log status and information of file deletion
void DBImpl::DeleteObsoleteFileImpl(Status file_deletion_status, int job_id,
const std::string& fname, FileType type,
uint64_t number, uint32_t path_id) {
if (type == kTableFile) {
file_deletion_status = DeleteSSTFile(&db_options_, fname, path_id);
} else {
file_deletion_status = env_->DeleteFile(fname);
}
if (file_deletion_status.ok()) {
Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
"[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", job_id,
fname.c_str(), type, number, file_deletion_status.ToString().c_str());
} else if (env_->FileExists(fname).IsNotFound()) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[JOB %d] Tried to delete a non-existing file %s type=%d #%" PRIu64
" -- %s\n",
job_id, fname.c_str(), type, number,
file_deletion_status.ToString().c_str());
} else {
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"[JOB %d] Failed to delete %s type=%d #%" PRIu64 " -- %s\n", job_id,
fname.c_str(), type, number, file_deletion_status.ToString().c_str());
}
if (type == kTableFile) {
EventHelpers::LogAndNotifyTableFileDeletion(
&event_logger_, job_id, number, fname, file_deletion_status, GetName(),
db_options_.listeners);
}
}
// Diffs the files listed in filenames and those that do not
// belong to live files are posibly removed. Also, removes all the
// files in sst_delete_files and log_delete_files.
// It is not necessary to hold the mutex when invoking this method.
void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
// we'd better have sth to delete
assert(state.HaveSomethingToDelete());
......@@ -1012,33 +1046,12 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
}
#endif // !ROCKSDB_LITE
Status file_deletion_status;
if (type == kTableFile) {
file_deletion_status = DeleteSSTFile(&db_options_, fname, path_id);
if (schedule_only) {
InstrumentedMutexLock guard_lock(&mutex_);
SchedulePendingPurge(fname, type, number, path_id, state.job_id);
} else {
file_deletion_status = env_->DeleteFile(fname);
}
if (file_deletion_status.ok()) {
Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
"[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", state.job_id,
fname.c_str(), type, number,
file_deletion_status.ToString().c_str());
} else if (env_->FileExists(fname).IsNotFound()) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"[JOB %d] Tried to delete a non-existing file %s type=%d #%" PRIu64
" -- %s\n",
state.job_id, fname.c_str(), type, number,
file_deletion_status.ToString().c_str());
} else {
Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
"[JOB %d] Failed to delete %s type=%d #%" PRIu64 " -- %s\n",
state.job_id, fname.c_str(), type, number,
file_deletion_status.ToString().c_str());
}
if (type == kTableFile) {
EventHelpers::LogAndNotifyTableFileDeletion(
&event_logger_, state.job_id, number, fname,
file_deletion_status, GetName(),
db_options_.listeners);
DeleteObsoleteFileImpl(file_deletion_status, state.job_id, fname, type,
number, path_id);
}
}
......@@ -2800,6 +2813,15 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
}
}
void DBImpl::SchedulePurge() {
mutex_.AssertHeld();
assert(opened_successfully_);
// Purge operations are put into High priority queue
bg_purge_scheduled_++;
env_->Schedule(&DBImpl::BGWorkPurge, this, Env::Priority::HIGH, nullptr);
}
int DBImpl::BGCompactionsAllowed() const {
if (write_controller_.NeedSpeedupCompaction()) {
return db_options_.max_background_compactions;
......@@ -2854,6 +2876,14 @@ void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
}
}
void DBImpl::SchedulePendingPurge(std::string fname, FileType type,
uint64_t number, uint32_t path_id,
int job_id) {
mutex_.AssertHeld();
PurgeFileInfo file_info(fname, type, number, path_id, job_id);
purge_queue_.push_back(std::move(file_info));
}
void DBImpl::BGWorkFlush(void* db) {
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
TEST_SYNC_POINT("DBImpl::BGWorkFlush");
......@@ -2869,6 +2899,12 @@ void DBImpl::BGWorkCompaction(void* arg) {
reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction(ca.m);
}
void DBImpl::BGWorkPurge(void* db) {
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
TEST_SYNC_POINT("DBImpl::BGWorkPurge");
reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge();
}
void DBImpl::UnscheduleCallback(void* arg) {
CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
delete reinterpret_cast<CompactionArg*>(arg);
......@@ -2878,6 +2914,34 @@ void DBImpl::UnscheduleCallback(void* arg) {
TEST_SYNC_POINT("DBImpl::UnscheduleCallback");
}
void DBImpl::BackgroundCallPurge() {
mutex_.Lock();
while (!purge_queue_.empty()) {
auto purge_file = purge_queue_.begin();
auto fname = purge_file->fname;
auto type = purge_file->type;
auto number = purge_file->number;
auto path_id = purge_file->path_id;
auto job_id = purge_file->job_id;
purge_queue_.pop_front();
mutex_.Unlock();
Status file_deletion_status;
DeleteObsoleteFileImpl(file_deletion_status, job_id, fname, type, number,
path_id);
mutex_.Lock();
}
bg_purge_scheduled_--;
bg_cv_.SignalAll();
// IMPORTANT:there should be no code after calling SignalAll. This call may
// signal the DB destructor that it's OK to proceed with destruction. In
// that case, all DB variables will be dealloacated and referencing them
// will cause trouble.
mutex_.Unlock();
}
Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
LogBuffer* log_buffer) {
mutex_.AssertHeld();
......@@ -3477,12 +3541,17 @@ bool DBImpl::MCOverlap(ManualCompaction* m, ManualCompaction* m1) {
namespace {
struct IterState {
IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version)
: db(_db), mu(_mu), super_version(_super_version) {}
IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version,
const ReadOptions* _read_options)
: db(_db),
mu(_mu),
super_version(_super_version),
read_options(_read_options) {}
DBImpl* db;
InstrumentedMutex* mu;
SuperVersion* super_version;
const ReadOptions* read_options;
};
static void CleanupIteratorState(void* arg1, void* arg2) {
......@@ -3492,6 +3561,8 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
// Job id == 0 means that this is not our background process, but rather
// user thread
JobContext job_context(0);
bool background_purge =
state->read_options->background_purge_on_iterator_cleanup;
state->mu->Lock();
state->super_version->Cleanup();
......@@ -3500,7 +3571,17 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
delete state->super_version;
if (job_context.HaveSomethingToDelete()) {
state->db->PurgeObsoleteFiles(job_context);
if (background_purge) {
// PurgeObsoleteFiles here does not delete files. Instead, it adds the
// files to be deleted to a job queue, and deletes it in a separate
// background thread.
state->db->PurgeObsoleteFiles(job_context, true /* schedule only */);
state->mu->Lock();
state->db->SchedulePurge();
state->mu->Unlock();
} else {
state->db->PurgeObsoleteFiles(job_context);
}
}
job_context.Clean();
}
......@@ -3526,7 +3607,8 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
super_version->current->AddIterators(read_options, env_options_,
&merge_iter_builder);
internal_iter = merge_iter_builder.Finish();
IterState* cleanup = new IterState(this, &mutex_, super_version);
IterState* cleanup =
new IterState(this, &mutex_, super_version, &read_options);
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
return internal_iter;
......
......@@ -374,7 +374,10 @@ class DBImpl : public DB {
// belong to live files are posibly removed. Also, removes all the
// files in sst_delete_files and log_delete_files.
// It is not necessary to hold the mutex when invoking this method.
void PurgeObsoleteFiles(const JobContext& background_contet);
void PurgeObsoleteFiles(const JobContext& background_contet,
bool schedule_only = false);
void SchedulePurge();
ColumnFamilyHandle* DefaultColumnFamily() const override;
......@@ -554,6 +557,8 @@ class DBImpl : public DB {
struct WriteContext;
struct PurgeFileInfo;
Status NewDB();
// Recover the descriptor from persistent storage. May do a significant
......@@ -569,6 +574,10 @@ class DBImpl : public DB {
// Delete any unneeded files and stale in-memory entries.
void DeleteObsoleteFiles();
// Delete obsolete files and log status and information of file deletion
void DeleteObsoleteFileImpl(Status file_deletion_status, int job_id,
const std::string& fname, FileType type,
uint64_t number, uint32_t path_id);
// Background process needs to call
// auto x = CaptureCurrentFileNumberInPendingOutputs()
......@@ -640,11 +649,15 @@ class DBImpl : public DB {
void MaybeScheduleFlushOrCompaction();
void SchedulePendingFlush(ColumnFamilyData* cfd);
void SchedulePendingCompaction(ColumnFamilyData* cfd);
void SchedulePendingPurge(std::string fname, FileType type, uint64_t number,
uint32_t path_id, int job_id);
static void BGWorkCompaction(void* arg);
static void BGWorkFlush(void* db);
static void BGWorkPurge(void* arg);
static void UnscheduleCallback(void* arg);
void BackgroundCallCompaction(void* arg);
void BackgroundCallFlush();
void BackgroundCallPurge();
Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer, void* m = 0);
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
......@@ -695,9 +708,9 @@ class DBImpl : public DB {
// * if AnyManualCompaction, whenever a compaction finishes, even if it hasn't
// made any progress
// * whenever a compaction made any progress
// * whenever bg_flush_scheduled_ value decreases (i.e. whenever a flush is
// done, even if it didn't make any progress)
// * whenever there is an error in background flush or compaction
// * whenever bg_flush_scheduled_ or bg_purge_scheduled_ value decreases
// (i.e. whenever a flush is done, even if it didn't make any progress)
// * whenever there is an error in background purge, flush or compaction
InstrumentedCondVar bg_cv_;
uint64_t logfile_number_;
std::deque<uint64_t>
......@@ -817,6 +830,19 @@ class DBImpl : public DB {
// State is protected with db mutex.
std::list<uint64_t> pending_outputs_;
// PurgeFileInfo is a structure to hold information of files to be deleted in
// purge_queue_
struct PurgeFileInfo {
std::string fname;
FileType type;
uint64_t number;
uint32_t path_id;
int job_id;
PurgeFileInfo(std::string fn, FileType t, uint64_t num, uint32_t pid,
int jid)
: fname(fn), type(t), number(num), path_id(pid), job_id(jid) {}
};
// flush_queue_ and compaction_queue_ hold column families that we need to
// flush and compact, respectively.
// A column family is inserted into flush_queue_ when it satisfies condition
......@@ -841,6 +867,9 @@ class DBImpl : public DB {
// invariant(column family present in compaction_queue_ <==>
// ColumnFamilyData::pending_compaction_ == true)
std::deque<ColumnFamilyData*> compaction_queue_;
// A queue to store filenames of the files to be purged
std::deque<PurgeFileInfo> purge_queue_;
int unscheduled_flushes_;
int unscheduled_compactions_;
......@@ -856,6 +885,9 @@ class DBImpl : public DB {
// stores the number of flushes are currently running
int num_running_flushes_;
// number of background obsolete file purge jobs, submitted to the HIGH pool
int bg_purge_scheduled_;
// Information for a manual compaction
struct ManualCompaction {
ColumnFamilyData* cfd;
......
......@@ -9,20 +9,21 @@
#ifndef ROCKSDB_LITE
#include "rocksdb/db.h"
#include <stdlib.h>
#include <map>
#include <string>
#include <vector>
#include "db/db_impl.h"
#include "db/filename.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/transaction_log.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "rocksdb/env.h"
#include "rocksdb/transaction_log.h"
#include <vector>
#include <stdlib.h>
#include <map>
#include <string>
namespace rocksdb {
......@@ -152,6 +153,15 @@ class DeleteFileTest : public testing::Test {
ASSERT_EQ(required_manifest, manifest_cnt);
}
static void DoSleep(void* arg) {
auto test = reinterpret_cast<DeleteFileTest*>(arg);
test->env_->SleepForMicroseconds(2 * 1000 * 1000);
}
// An empty job to guard all jobs are processed
static void GuardFinish(void* arg) {
TEST_SYNC_POINT("DeleteFileTest::GuardFinish");
}
};
TEST_F(DeleteFileTest, AddKeysAndQueryLevels) {
......@@ -231,6 +241,81 @@ TEST_F(DeleteFileTest, PurgeObsoleteFilesTest) {
CloseDB();
}
TEST_F(DeleteFileTest, BackgroundPurgeTest) {
std::string first("0"), last("999999");
CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 2;
Slice first_slice(first), last_slice(last);
// We keep an iterator alive
Iterator* itr = 0;
CreateTwoLevels();
ReadOptions options;
options.background_purge_on_iterator_cleanup = true;
itr = db_->NewIterator(options);
db_->CompactRange(compact_options, &first_slice, &last_slice);
// 3 sst after compaction with live iterator
CheckFileTypeCounts(dbname_, 0, 3, 1);
test::SleepingBackgroundTask sleeping_task_before;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
&sleeping_task_before, Env::Priority::HIGH);
delete itr;
test::SleepingBackgroundTask sleeping_task_after;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
&sleeping_task_after, Env::Priority::HIGH);
// Make sure no purges are executed foreground
CheckFileTypeCounts(dbname_, 0, 3, 1);
sleeping_task_before.WakeUp();
sleeping_task_before.WaitUntilDone();
// Make sure all background purges are executed
sleeping_task_after.WakeUp();
sleeping_task_after.WaitUntilDone();
// 1 sst after iterator deletion
CheckFileTypeCounts(dbname_, 0, 1, 1);
CloseDB();
}
TEST_F(DeleteFileTest, BackgroundPurgeTestMultipleJobs) {
std::string first("0"), last("999999");
CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 2;
Slice first_slice(first), last_slice(last);
// We keep an iterator alive
CreateTwoLevels();
ReadOptions options;
options.background_purge_on_iterator_cleanup = true;
Iterator* itr1 = db_->NewIterator(options);
CreateTwoLevels();
Iterator* itr2 = db_->NewIterator(options);
db_->CompactRange(compact_options, &first_slice, &last_slice);
// 5 sst files after 2 compactions with 2 live iterators
CheckFileTypeCounts(dbname_, 0, 5, 1);
// ~DBImpl should wait until all BGWorkPurge are finished
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::~DBImpl:WaitJob", "DBImpl::BGWorkPurge"},
{"DeleteFileTest::GuardFinish",
"DeleteFileTest::BackgroundPurgeTestMultipleJobs:DBClose"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
delete itr1;
env_->Schedule(&DeleteFileTest::DoSleep, this, Env::Priority::HIGH);
delete itr2;
env_->Schedule(&DeleteFileTest::GuardFinish, nullptr, Env::Priority::HIGH);
CloseDB();
TEST_SYNC_POINT("DeleteFileTest::BackgroundPurgeTestMultipleJobs:DBClose");
// 1 sst after iterator deletion
CheckFileTypeCounts(dbname_, 0, 1, 1);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DeleteFileTest, DeleteFileWithIterator) {
CreateTwoLevels();
ReadOptions options;
......
......@@ -1487,6 +1487,12 @@ struct ReadOptions {
// Default: false
bool pin_data;
// If true, when PurgeObsoleteFile is called in CleanupIteratorState, we
// schedule a background job in the flush job queue and delete obsolete files
// in background.
// Default: false
bool background_purge_on_iterator_cleanup;
// If non-zero, NewIterator will create a new table reader which
// performs reads of the given size. Using a large size (> 2MB) can
// improve the performance of forward iteration on spinning disks.
......
......@@ -811,6 +811,7 @@ ReadOptions::ReadOptions()
total_order_seek(false),
prefix_same_as_start(false),
pin_data(false),
background_purge_on_iterator_cleanup(false),
readahead_size(0) {
XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
reinterpret_cast<ReadOptions*>(this));
......@@ -827,6 +828,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache)
total_order_seek(false),
prefix_same_as_start(false),
pin_data(false),
background_purge_on_iterator_cleanup(false),
readahead_size(0) {
XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
reinterpret_cast<ReadOptions*>(this));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册