提交 0b3172d0 编写于 作者: Y Yueh-Hsuan Chiang

Add EventListener::OnTableFileDeletion()

Summary:
Add EventListener::OnTableFileDeletion(), which will be
called when a table file is deleted.

Test Plan: Extend three existing tests in db_test to verify the deleted files.

Reviewers: rven, anthony, kradhakrishnan, igor, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D38931
上级 2d0b9e5f
...@@ -350,12 +350,15 @@ DBImpl::~DBImpl() { ...@@ -350,12 +350,15 @@ DBImpl::~DBImpl() {
if (opened_successfully_) { if (opened_successfully_) {
JobContext job_context(next_job_id_.fetch_add(1)); JobContext job_context(next_job_id_.fetch_add(1));
FindObsoleteFiles(&job_context, true); FindObsoleteFiles(&job_context, true);
mutex_.Unlock();
// manifest number starting from 2 // manifest number starting from 2
job_context.manifest_file_number = 1; job_context.manifest_file_number = 1;
if (job_context.HaveSomethingToDelete()) { if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context); PurgeObsoleteFiles(job_context);
} }
job_context.Clean(); job_context.Clean();
mutex_.Lock();
} }
for (auto l : logs_to_free_) { for (auto l : logs_to_free_) {
...@@ -520,7 +523,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, ...@@ -520,7 +523,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
job_context->min_pending_output = std::numeric_limits<uint64_t>::max(); job_context->min_pending_output = std::numeric_limits<uint64_t>::max();
} }
// get obsolete files // Get obsolete files. This function will also update the list of
// pending files in VersionSet().
versions_->GetObsoleteFiles(&job_context->sst_delete_files, versions_->GetObsoleteFiles(&job_context->sst_delete_files,
job_context->min_pending_output); job_context->min_pending_output);
...@@ -714,10 +718,10 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) { ...@@ -714,10 +718,10 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
file_deletion_status.ToString().c_str()); file_deletion_status.ToString().c_str());
} }
if (type == kTableFile) { if (type == kTableFile) {
event_logger_.Log() << "job" << state.job_id << "event" EventHelpers::LogAndNotifyTableFileDeletion(
<< "table_file_deletion" &event_logger_, state.job_id, number, fname,
<< "file_number" << number file_deletion_status, GetName(),
<< "status" << file_deletion_status.ToString(); db_options_.listeners);
} }
} }
...@@ -751,10 +755,13 @@ void DBImpl::DeleteObsoleteFiles() { ...@@ -751,10 +755,13 @@ void DBImpl::DeleteObsoleteFiles() {
mutex_.AssertHeld(); mutex_.AssertHeld();
JobContext job_context(next_job_id_.fetch_add(1)); JobContext job_context(next_job_id_.fetch_add(1));
FindObsoleteFiles(&job_context, true); FindObsoleteFiles(&job_context, true);
mutex_.Unlock();
if (job_context.HaveSomethingToDelete()) { if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context); PurgeObsoleteFiles(job_context);
} }
job_context.Clean(); job_context.Clean();
mutex_.Lock();
} }
Status DBImpl::Directories::CreateAndNewDirectory( Status DBImpl::Directories::CreateAndNewDirectory(
...@@ -1433,7 +1440,7 @@ Status DBImpl::CompactFiles( ...@@ -1433,7 +1440,7 @@ Status DBImpl::CompactFiles(
// FindObsoleteFiles(). This is because job_context does not // FindObsoleteFiles(). This is because job_context does not
// catch all created files if compaction failed. // catch all created files if compaction failed.
FindObsoleteFiles(&job_context, !s.ok()); FindObsoleteFiles(&job_context, !s.ok());
} } // release the mutex
// delete unnecessary files if any, this is done outside the mutex // delete unnecessary files if any, this is done outside the mutex
if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
...@@ -1444,6 +1451,7 @@ Status DBImpl::CompactFiles( ...@@ -1444,6 +1451,7 @@ Status DBImpl::CompactFiles(
// It also applies to access other states that DB owns. // It also applies to access other states that DB owns.
log_buffer.FlushBufferToLog(); log_buffer.FlushBufferToLog();
if (job_context.HaveSomethingToDelete()) { if (job_context.HaveSomethingToDelete()) {
// no mutex is locked here. No need to Unlock() and Lock() here.
PurgeObsoleteFiles(job_context); PurgeObsoleteFiles(job_context);
} }
job_context.Clean(); job_context.Clean();
...@@ -3948,9 +3956,11 @@ Status DBImpl::DeleteFile(std::string name) { ...@@ -3948,9 +3956,11 @@ Status DBImpl::DeleteFile(std::string name) {
} }
FindObsoleteFiles(&job_context, false); FindObsoleteFiles(&job_context, false);
} // lock released here } // lock released here
LogFlush(db_options_.info_log); LogFlush(db_options_.info_log);
// remove files outside the db-lock // remove files outside the db-lock
if (job_context.HaveSomethingToDelete()) { if (job_context.HaveSomethingToDelete()) {
// Call PurgeObsoleteFiles() without holding mutex.
PurgeObsoleteFiles(job_context); PurgeObsoleteFiles(job_context);
} }
job_context.Clean(); job_context.Clean();
......
...@@ -11400,6 +11400,38 @@ TEST_F(DBTest, MigrateToDynamicLevelMaxBytesBase) { ...@@ -11400,6 +11400,38 @@ TEST_F(DBTest, MigrateToDynamicLevelMaxBytesBase) {
ASSERT_EQ(NumTableFilesAtLevel(2), 0); ASSERT_EQ(NumTableFilesAtLevel(2), 0);
} }
namespace {
class OnFileDeletionListener : public EventListener {
public:
OnFileDeletionListener() :
matched_count_(0),
expected_file_name_("") {}
void SetExpectedFileName(
const std::string file_name) {
expected_file_name_ = file_name;
}
void VerifyMatchedCount(size_t expected_value) {
ASSERT_EQ(matched_count_, expected_value);
}
void OnTableFileDeleted(
const TableFileDeletionInfo& info) override {
if (expected_file_name_ != "") {
ASSERT_EQ(expected_file_name_, info.file_path);
expected_file_name_ = "";
matched_count_++;
}
}
private:
size_t matched_count_;
std::string expected_file_name_;
};
} // namespace
TEST_F(DBTest, DynamicLevelCompressionPerLevel) { TEST_F(DBTest, DynamicLevelCompressionPerLevel) {
if (!Snappy_Supported()) { if (!Snappy_Supported()) {
return; return;
...@@ -11432,6 +11464,9 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel) { ...@@ -11432,6 +11464,9 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel) {
options.compression_per_level[1] = kNoCompression; options.compression_per_level[1] = kNoCompression;
options.compression_per_level[2] = kSnappyCompression; options.compression_per_level[2] = kSnappyCompression;
OnFileDeletionListener* listener = new OnFileDeletionListener();
options.listeners.emplace_back(listener);
DestroyAndReopen(options); DestroyAndReopen(options);
// Insert more than 80K. L4 should be base level. Neither L0 nor L4 should // Insert more than 80K. L4 should be base level. Neither L0 nor L4 should
...@@ -11464,8 +11499,11 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel) { ...@@ -11464,8 +11499,11 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel) {
ColumnFamilyMetaData cf_meta; ColumnFamilyMetaData cf_meta;
db_->GetColumnFamilyMetaData(&cf_meta); db_->GetColumnFamilyMetaData(&cf_meta);
for (auto file : cf_meta.levels[4].files) { for (auto file : cf_meta.levels[4].files) {
listener->SetExpectedFileName(dbname_ + file.name);
ASSERT_OK(dbfull()->DeleteFile(file.name)); ASSERT_OK(dbfull()->DeleteFile(file.name));
} }
listener->VerifyMatchedCount(cf_meta.levels[4].files.size());
int num_keys = 0; int num_keys = 0;
std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions())); std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
...@@ -12162,6 +12200,8 @@ TEST_F(DBTest, DeleteMovedFileAfterCompaction) { ...@@ -12162,6 +12200,8 @@ TEST_F(DBTest, DeleteMovedFileAfterCompaction) {
options.create_if_missing = true; options.create_if_missing = true;
options.level0_file_num_compaction_trigger = options.level0_file_num_compaction_trigger =
2; // trigger compaction when we have 2 files 2; // trigger compaction when we have 2 files
OnFileDeletionListener* listener = new OnFileDeletionListener();
options.listeners.emplace_back(listener);
DestroyAndReopen(options); DestroyAndReopen(options);
Random rnd(301); Random rnd(301);
...@@ -12214,12 +12254,14 @@ TEST_F(DBTest, DeleteMovedFileAfterCompaction) { ...@@ -12214,12 +12254,14 @@ TEST_F(DBTest, DeleteMovedFileAfterCompaction) {
ASSERT_EQ("0,0,2", FilesPerLevel(0)); ASSERT_EQ("0,0,2", FilesPerLevel(0));
// iterator is holding the file // iterator is holding the file
ASSERT_TRUE(env_->FileExists(dbname_ + "/" + moved_file_name)); ASSERT_TRUE(env_->FileExists(dbname_ + moved_file_name));
listener->SetExpectedFileName(dbname_ + moved_file_name);
iterator.reset(); iterator.reset();
// this file should have been compacted away // this file should have been compacted away
ASSERT_TRUE(!env_->FileExists(dbname_ + "/" + moved_file_name)); ASSERT_TRUE(!env_->FileExists(dbname_ + moved_file_name));
listener->VerifyMatchedCount(1);
} }
} }
...@@ -12393,6 +12435,10 @@ TEST_F(DBTest, DeleteObsoleteFilesPendingOutputs) { ...@@ -12393,6 +12435,10 @@ TEST_F(DBTest, DeleteObsoleteFilesPendingOutputs) {
2; // trigger compaction when we have 2 files 2; // trigger compaction when we have 2 files
options.max_background_flushes = 2; options.max_background_flushes = 2;
options.max_background_compactions = 2; options.max_background_compactions = 2;
OnFileDeletionListener* listener = new OnFileDeletionListener();
options.listeners.emplace_back(listener);
Reopen(options); Reopen(options);
Random rnd(301); Random rnd(301);
...@@ -12441,6 +12487,7 @@ TEST_F(DBTest, DeleteObsoleteFilesPendingOutputs) { ...@@ -12441,6 +12487,7 @@ TEST_F(DBTest, DeleteObsoleteFilesPendingOutputs) {
db_->GetLiveFilesMetaData(&metadata); db_->GetLiveFilesMetaData(&metadata);
ASSERT_EQ(metadata.size(), 1U); ASSERT_EQ(metadata.size(), 1U);
auto file_on_L2 = metadata[0].name; auto file_on_L2 = metadata[0].name;
listener->SetExpectedFileName(dbname_ + file_on_L2);
ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr)); ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr));
ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0)); ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));
...@@ -12456,7 +12503,8 @@ TEST_F(DBTest, DeleteObsoleteFilesPendingOutputs) { ...@@ -12456,7 +12503,8 @@ TEST_F(DBTest, DeleteObsoleteFilesPendingOutputs) {
ASSERT_EQ(metadata.size(), 2U); ASSERT_EQ(metadata.size(), 2U);
// This file should have been deleted // This file should have been deleted
ASSERT_TRUE(!env_->FileExists(dbname_ + "/" + file_on_L2)); ASSERT_TRUE(!env_->FileExists(dbname_ + file_on_L2));
listener->VerifyMatchedCount(1);
} }
TEST_F(DBTest, CloseSpeedup) { TEST_F(DBTest, CloseSpeedup) {
......
...@@ -73,4 +73,36 @@ void EventHelpers::LogAndNotifyTableFileCreation( ...@@ -73,4 +73,36 @@ void EventHelpers::LogAndNotifyTableFileCreation(
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
} }
void EventHelpers::LogAndNotifyTableFileDeletion(
EventLogger* event_logger, int job_id,
uint64_t file_number, const std::string& file_path,
const Status& status, const std::string& dbname,
const std::vector<std::shared_ptr<EventListener>>& listeners) {
JSONWriter jwriter;
AppendCurrentTime(&jwriter);
jwriter << "job" << job_id
<< "event" << "table_file_deletion"
<< "file_number" << file_number;
if (!status.ok()) {
jwriter << "status" << status.ToString();
}
jwriter.EndObject();
event_logger->Log(jwriter);
#ifndef ROCKSDB_LITE
TableFileDeletionInfo info;
info.db_name = dbname;
info.job_id = job_id;
info.file_path = file_path;
info.status = status;
for (auto listener : listeners) {
listener->OnTableFileDeleted(info);
}
#endif // !ROCKSDB_LITE
}
} // namespace rocksdb } // namespace rocksdb
...@@ -23,6 +23,11 @@ class EventHelpers { ...@@ -23,6 +23,11 @@ class EventHelpers {
EventLogger* event_logger, EventLogger* event_logger,
const std::vector<std::shared_ptr<EventListener>>& listeners, const std::vector<std::shared_ptr<EventListener>>& listeners,
const FileDescriptor& fd, const TableFileCreationInfo& info); const FileDescriptor& fd, const TableFileCreationInfo& info);
static void LogAndNotifyTableFileDeletion(
EventLogger* event_logger, int job_id,
uint64_t file_number, const std::string& file_path,
const Status& status, const std::string& db_name,
const std::vector<std::shared_ptr<EventListener>>& listeners);
}; };
} // namespace rocksdb } // namespace rocksdb
...@@ -39,6 +39,17 @@ struct TableFileCreationInfo { ...@@ -39,6 +39,17 @@ struct TableFileCreationInfo {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
struct TableFileDeletionInfo {
// The name of the database where the file was deleted.
std::string db_name;
// The path to the deleted file.
std::string file_path;
// The id of the job which deleted the file.
int job_id;
// The status indicating whether the deletion was successfull or not.
Status status;
};
struct CompactionJobInfo { struct CompactionJobInfo {
CompactionJobInfo() = default; CompactionJobInfo() = default;
explicit CompactionJobInfo(const CompactionJobStats& _stats) : explicit CompactionJobInfo(const CompactionJobStats& _stats) :
...@@ -122,6 +133,20 @@ class EventListener { ...@@ -122,6 +133,20 @@ class EventListener {
bool triggered_writes_slowdown, bool triggered_writes_slowdown,
bool triggered_writes_stop) {} bool triggered_writes_stop) {}
// A call-back function for RocksDB which will be called whenever
// a SST file is deleted. Different from OnCompactionCompleted and
// OnFlushCompleted, this call-back is designed for external logging
// service and thus only provide string parameters instead
// of a pointer to DB. Applications that build logic basic based
// on file creations and deletions is suggested to implement
// OnFlushCompleted and OnCompactionCompleted.
//
// Note that if applications would like to use the passed reference
// outside this function call, they should make copies from the
// returned value.
virtual void OnTableFileDeleted(
const TableFileDeletionInfo& info) {}
// A call-back function for RocksDB which will be called whenever // A call-back function for RocksDB which will be called whenever
// a registered RocksDB compacts a file. The default implementation // a registered RocksDB compacts a file. The default implementation
// is a no-op. // is a no-op.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册