提交 56dd0341 编写于 作者: S sdong

read_options.background_purge_on_iterator_cleanup to cover forward iterator...

read_options.background_purge_on_iterator_cleanup to cover forward iterator and log file closing too.

Summary: With read_options.background_purge_on_iterator_cleanup=true, File deletion and closing can still happen in forward iterator, or WAL file closing. Cover those cases too.

Test Plan: I am adding unit tests.

Reviewers: andrewkr, IslamAbdelRahman, yiwu

Reviewed By: yiwu

Subscribers: leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D61503
上级 ccecf3f4
......@@ -869,7 +869,7 @@ arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS)
autovector_test: util/autovector_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
column_family_test: db/column_family_test.o $(LIBOBJECTS) $(TESTHARNESS)
column_family_test: db/column_family_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
table_properties_collector_test: db/table_properties_collector_test.o $(LIBOBJECTS) $(TESTHARNESS)
......
......@@ -2837,6 +2837,217 @@ TEST_F(ColumnFamilyTest, CompactionSpeedupTwoColumnFamilies) {
ASSERT_EQ(2, dbfull()->BGCompactionsAllowed());
}
#ifndef ROCKSDB_LITE
TEST_F(ColumnFamilyTest, FlushCloseWALFiles) {
SpecialEnv env(Env::Default());
db_options_.env = &env;
db_options_.max_background_flushes = 1;
column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2));
Open();
CreateColumnFamilies({"one"});
ASSERT_OK(Put(1, "fodor", "mirko"));
ASSERT_OK(Put(0, "fodor", "mirko"));
ASSERT_OK(Put(1, "fodor", "mirko"));
// Block flush jobs from running
test::SleepingBackgroundTask sleeping_task;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
Env::Priority::HIGH);
WriteOptions wo;
wo.sync = true;
ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
ASSERT_EQ(2, env.num_open_wal_file_.load());
sleeping_task.WakeUp();
sleeping_task.WaitUntilDone();
WaitForFlush(1);
ASSERT_EQ(1, env.num_open_wal_file_.load());
Reopen();
ASSERT_EQ("mirko", Get(0, "fodor"));
ASSERT_EQ("mirko", Get(1, "fodor"));
db_options_.env = env_;
Close();
}
#endif // !ROCKSDB_LITE
TEST_F(ColumnFamilyTest, IteratorCloseWALFile1) {
SpecialEnv env(Env::Default());
db_options_.env = &env;
db_options_.max_background_flushes = 1;
column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2));
Open();
CreateColumnFamilies({"one"});
ASSERT_OK(Put(1, "fodor", "mirko"));
// Create an iterator holding the current super version.
Iterator* it = db_->NewIterator(ReadOptions(), handles_[1]);
// A flush will make `it` hold the last reference of its super version.
Flush(1);
ASSERT_OK(Put(1, "fodor", "mirko"));
ASSERT_OK(Put(0, "fodor", "mirko"));
ASSERT_OK(Put(1, "fodor", "mirko"));
// Flush jobs will close previous WAL files after finishing. By
// block flush jobs from running, we trigger a condition where
// the iterator destructor should close the WAL files.
test::SleepingBackgroundTask sleeping_task;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
Env::Priority::HIGH);
WriteOptions wo;
wo.sync = true;
ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
ASSERT_EQ(2, env.num_open_wal_file_.load());
// Deleting the iterator will clear its super version, triggering
// closing all files
delete it;
ASSERT_EQ(1, env.num_open_wal_file_.load());
sleeping_task.WakeUp();
sleeping_task.WaitUntilDone();
WaitForFlush(1);
Reopen();
ASSERT_EQ("mirko", Get(0, "fodor"));
ASSERT_EQ("mirko", Get(1, "fodor"));
db_options_.env = env_;
Close();
}
TEST_F(ColumnFamilyTest, IteratorCloseWALFile2) {
SpecialEnv env(Env::Default());
// Allow both of flush and purge job to schedule.
env.SetBackgroundThreads(2, Env::HIGH);
db_options_.env = &env;
db_options_.max_background_flushes = 1;
column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(2));
Open();
CreateColumnFamilies({"one"});
ASSERT_OK(Put(1, "fodor", "mirko"));
// Create an iterator holding the current super version.
ReadOptions ro;
ro.background_purge_on_iterator_cleanup = true;
Iterator* it = db_->NewIterator(ro, handles_[1]);
// A flush will make `it` hold the last reference of its super version.
Flush(1);
ASSERT_OK(Put(1, "fodor", "mirko"));
ASSERT_OK(Put(0, "fodor", "mirko"));
ASSERT_OK(Put(1, "fodor", "mirko"));
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"ColumnFamilyTest::IteratorCloseWALFile2:0",
"DBImpl::BGWorkPurge:start"},
{"ColumnFamilyTest::IteratorCloseWALFile2:2",
"DBImpl::BackgroundCallFlush:start"},
{"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
WriteOptions wo;
wo.sync = true;
ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
ASSERT_EQ(2, env.num_open_wal_file_.load());
// Deleting the iterator will clear its super version, triggering
// closing all files
delete it;
ASSERT_EQ(2, env.num_open_wal_file_.load());
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0");
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1");
ASSERT_EQ(1, env.num_open_wal_file_.load());
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2");
WaitForFlush(1);
ASSERT_EQ(1, env.num_open_wal_file_.load());
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
Reopen();
ASSERT_EQ("mirko", Get(0, "fodor"));
ASSERT_EQ("mirko", Get(1, "fodor"));
db_options_.env = env_;
Close();
}
#ifndef ROCKSDB_LITE // TEST functions are not supported in lite
TEST_F(ColumnFamilyTest, ForwardIteratorCloseWALFile) {
SpecialEnv env(Env::Default());
// Allow both of flush and purge job to schedule.
env.SetBackgroundThreads(2, Env::HIGH);
db_options_.env = &env;
db_options_.max_background_flushes = 1;
column_family_options_.memtable_factory.reset(new SpecialSkipListFactory(3));
column_family_options_.level0_file_num_compaction_trigger = 2;
Open();
CreateColumnFamilies({"one"});
ASSERT_OK(Put(1, "fodor", "mirko"));
ASSERT_OK(Put(1, "fodar2", "mirko"));
Flush(1);
// Create an iterator holding the current super version, as well as
// the SST file just flushed.
ReadOptions ro;
ro.tailing = true;
ro.background_purge_on_iterator_cleanup = true;
Iterator* it = db_->NewIterator(ro, handles_[1]);
// A flush will make `it` hold the last reference of its super version.
ASSERT_OK(Put(1, "fodor", "mirko"));
ASSERT_OK(Put(1, "fodar2", "mirko"));
Flush(1);
WaitForCompaction();
ASSERT_OK(Put(1, "fodor", "mirko"));
ASSERT_OK(Put(1, "fodor", "mirko"));
ASSERT_OK(Put(0, "fodor", "mirko"));
ASSERT_OK(Put(1, "fodor", "mirko"));
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"ColumnFamilyTest::IteratorCloseWALFile2:0",
"DBImpl::BGWorkPurge:start"},
{"ColumnFamilyTest::IteratorCloseWALFile2:2",
"DBImpl::BackgroundCallFlush:start"},
{"DBImpl::BGWorkPurge:end", "ColumnFamilyTest::IteratorCloseWALFile2:1"},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
WriteOptions wo;
wo.sync = true;
ASSERT_OK(db_->Put(wo, handles_[1], "fodor", "mirko"));
env.delete_count_.store(0);
ASSERT_EQ(2, env.num_open_wal_file_.load());
// Deleting the iterator will clear its super version, triggering
// closing all files
it->Seek("");
ASSERT_EQ(2, env.num_open_wal_file_.load());
ASSERT_EQ(0, env.delete_count_.load());
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:0");
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:1");
ASSERT_EQ(1, env.num_open_wal_file_.load());
ASSERT_EQ(1, env.delete_count_.load());
TEST_SYNC_POINT("ColumnFamilyTest::IteratorCloseWALFile2:2");
WaitForFlush(1);
ASSERT_EQ(1, env.num_open_wal_file_.load());
ASSERT_EQ(1, env.delete_count_.load());
delete it;
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
Reopen();
ASSERT_EQ("mirko", Get(0, "fodor"));
ASSERT_EQ("mirko", Get(1, "fodor"));
db_options_.env = env_;
Close();
}
#endif // !ROCKSDB_LITE
// Disable on windows because SyncWAL requires env->IsSyncThreadSafe()
// to return true which is not so in unbuffered mode.
#ifndef OS_WIN
......
......@@ -716,6 +716,16 @@ uint64_t DBImpl::FindMinLogContainingOutstandingPrep() {
return min_log;
}
void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
if (!job_context->logs_to_free.empty()) {
for (auto l : job_context->logs_to_free) {
AddToLogsToFreeQueue(l);
}
job_context->logs_to_free.clear();
SchedulePurge();
}
}
// * Returns the list of live files in 'sst_live'
// If it's doing full scan:
// * Returns the list of all files in the filesystem in
......@@ -2988,8 +2998,9 @@ void DBImpl::BGWorkCompaction(void* arg) {
void DBImpl::BGWorkPurge(void* db) {
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
TEST_SYNC_POINT("DBImpl::BGWorkPurge");
TEST_SYNC_POINT("DBImpl::BGWorkPurge:start");
reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge();
TEST_SYNC_POINT("DBImpl::BGWorkPurge:end");
}
void DBImpl::UnscheduleCallback(void* arg) {
......@@ -3004,20 +3015,32 @@ void DBImpl::UnscheduleCallback(void* arg) {
void DBImpl::BackgroundCallPurge() {
mutex_.Lock();
while (!purge_queue_.empty()) {
auto purge_file = purge_queue_.begin();
auto fname = purge_file->fname;
auto type = purge_file->type;
auto number = purge_file->number;
auto path_id = purge_file->path_id;
auto job_id = purge_file->job_id;
purge_queue_.pop_front();
// We use one single loop to clear both queues so that after existing the loop
// both queues are empty. This is stricter than what is needed, but can make
// it easier for us to reason the correctness.
while (!purge_queue_.empty() || !logs_to_free_queue_.empty()) {
if (!purge_queue_.empty()) {
auto purge_file = purge_queue_.begin();
auto fname = purge_file->fname;
auto type = purge_file->type;
auto number = purge_file->number;
auto path_id = purge_file->path_id;
auto job_id = purge_file->job_id;
purge_queue_.pop_front();
mutex_.Unlock();
Status file_deletion_status;
DeleteObsoleteFileImpl(file_deletion_status, job_id, fname, type, number,
path_id);
mutex_.Lock();
mutex_.Unlock();
Status file_deletion_status;
DeleteObsoleteFileImpl(file_deletion_status, job_id, fname, type, number,
path_id);
mutex_.Lock();
} else {
assert(!logs_to_free_queue_.empty());
log::Writer* log_writer = *(logs_to_free_queue_.begin());
logs_to_free_queue_.pop_front();
mutex_.Unlock();
delete log_writer;
mutex_.Lock();
}
}
bg_purge_scheduled_--;
......@@ -3084,6 +3107,8 @@ void DBImpl::BackgroundCallFlush() {
JobContext job_context(next_job_id_.fetch_add(1), true);
assert(bg_flush_scheduled_);
TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
{
InstrumentedMutexLock l(&mutex_);
......@@ -3655,6 +3680,9 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
state->mu->Lock();
state->super_version->Cleanup();
state->db->FindObsoleteFiles(&job_context, false, true);
if (state->background_purge) {
state->db->ScheduleBgLogWriterClose(&job_context);
}
state->mu->Unlock();
delete state->super_version;
......
......@@ -365,6 +365,10 @@ class DBImpl : public DB {
// compaction status.
int BGCompactionsAllowed() const;
// move logs pending closing from job_context to the DB queue and
// schedule a purge
void ScheduleBgLogWriterClose(JobContext* job_context);
// Returns the list of live files in 'live' and the list
// of all files in the filesystem in 'candidate_files'.
// If force == false and the last call was less than
......@@ -493,6 +497,9 @@ class DBImpl : public DB {
void MarkLogAsHavingPrepSectionFlushed(uint64_t log);
void MarkLogAsContainingPrepSection(uint64_t log);
void AddToLogsToFreeQueue(log::Writer* log_writer) {
logs_to_free_queue_.push_back(log_writer);
}
Status NewDB();
......@@ -879,6 +886,9 @@ class DBImpl : public DB {
// A queue to store filenames of the files to be purged
std::deque<PurgeFileInfo> purge_queue_;
// A queue to store log writers to close
std::deque<log::Writer*> logs_to_free_queue_;
int unscheduled_flushes_;
int unscheduled_compactions_;
......
......@@ -30,6 +30,8 @@ SpecialEnv::SpecialEnv(Env* base)
manifest_write_error_.store(false, std::memory_order_release);
log_write_error_.store(false, std::memory_order_release);
random_file_open_counter_.store(0, std::memory_order_relaxed);
delete_count_.store(0, std::memory_order_relaxed);
num_open_wal_file_.store(0);
log_write_slowdown_ = 0;
bytes_written_ = 0;
sync_counter_ = 0;
......
......@@ -285,7 +285,10 @@ class SpecialEnv : public EnvWrapper {
class WalFile : public WritableFile {
public:
WalFile(SpecialEnv* env, unique_ptr<WritableFile>&& b)
: env_(env), base_(std::move(b)) {}
: env_(env), base_(std::move(b)) {
env_->num_open_wal_file_.fetch_add(1);
}
virtual ~WalFile() { env_->num_open_wal_file_.fetch_add(-1); }
Status Append(const Slice& data) override {
#if !(defined NDEBUG) || !defined(OS_WIN)
TEST_SYNC_POINT("SpecialEnv::WalFile::Append:1");
......@@ -443,6 +446,11 @@ class SpecialEnv : public EnvWrapper {
addon_time_.load();
}
virtual Status DeleteFile(const std::string& fname) override {
delete_count_.fetch_add(1);
return target()->DeleteFile(fname);
}
Random rnd_;
port::Mutex rnd_mutex_; // Lock to pretect rnd_
......@@ -470,6 +478,9 @@ class SpecialEnv : public EnvWrapper {
// Slow down every log write, in micro-seconds.
std::atomic<int> log_write_slowdown_;
// Number of WAL files that are still open for write.
std::atomic<int> num_open_wal_file_;
bool count_random_reads_;
anon::AtomicCounter random_read_counter_;
std::atomic<size_t> random_read_bytes_counter_;
......@@ -494,6 +505,8 @@ class SpecialEnv : public EnvWrapper {
std::atomic<int64_t> addon_time_;
std::atomic<int> delete_count_;
bool time_elapse_only_sleep_;
bool no_sleep_;
......
......@@ -16,12 +16,17 @@
#include <vector>
#include <string>
#include "db/dbformat.h"
#include "db/column_family.h"
#include "db/dbformat.h"
#include "db/flush_scheduler.h"
#include "db/internal_stats.h"
#include "db/job_context.h"
#include "db/log_writer.h"
#include "db/memtable_list.h"
#include "db/snapshot_impl.h"
#include "db/version_edit.h"
#include "db/write_controller.h"
#include "db/write_thread.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
......@@ -33,11 +38,6 @@
#include "util/instrumented_mutex.h"
#include "util/stop_watch.h"
#include "util/thread_local.h"
#include "db/internal_stats.h"
#include "db/write_controller.h"
#include "db/flush_scheduler.h"
#include "db/write_thread.h"
#include "db/job_context.h"
namespace rocksdb {
......
......@@ -153,10 +153,14 @@ void ForwardIterator::SVCleanup() {
db_->mutex_.Lock();
sv_->Cleanup();
db_->FindObsoleteFiles(&job_context, false, true);
if (read_options_.background_purge_on_iterator_cleanup) {
db_->ScheduleBgLogWriterClose(&job_context);
}
db_->mutex_.Unlock();
delete sv_;
if (job_context.HaveSomethingToDelete()) {
db_->PurgeObsoleteFiles(job_context);
db_->PurgeObsoleteFiles(
job_context, read_options_.background_purge_on_iterator_cleanup);
}
job_context.Clean();
}
......
......@@ -12,7 +12,6 @@
#include <string>
#include <vector>
#include "db/column_family.h"
#include "db/log_writer.h"
namespace rocksdb {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册