提交 e06cf1a0 编写于 作者: M Mike Kolupaev

[wal changes 3/3] method in DB to sync WAL without blocking writers

Summary:
Subj. We really need this feature.

Previous diff D40899 has most of the changes to make this possible, this diff just adds the method.

Test Plan: `make check`, the new test fails without this diff; ran with ASAN, TSAN and valgrind.

Reviewers: igor, rven, IslamAbdelRahman, anthony, kradhakrishnan, tnovak, yhchiang, sdong

Reviewed By: sdong

Subscribers: MarkCallaghan, maykov, hermanlee4, yoshinorim, tnovak, dhruba

Differential Revision: https://reviews.facebook.net/D40905
上级 5dc3e688
......@@ -43,6 +43,7 @@
* DB:Open() will fail if the compression specified in Options is not linked with the binary. If you see this failure, recompile RocksDB with compression libraries present on your system. Also, previously our default compression was snappy. This behavior is now changed. Now, the default compression is snappy only if it's available on the system. If it isn't we change the default to kNoCompression.
* We changed how we account for memory used in block cache. Previously, we only counted the sum of block sizes currently present in block cache. Now, we count the actual memory usage of the blocks. For example, a block of size 4.5KB will use 8KB memory with jemalloc. This might decrease your memory usage and possibly decrease performance. Increase block cache size if you see this happening after an upgrade.
* Add BackupEngineImpl.options_.max_background_operations to specify the maximum number of operations that may be performed in parallel. Add support for parallelized backup and restore.
* Add DB::SyncWAL() that does a WAL sync without blocking writers.
## 3.11.0 (5/19/2015)
### New Features
......
......@@ -612,11 +612,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
}
}
Status DBImpl::SyncLog(log::Writer* log) {
assert(log);
return log->file()->Sync(db_options_.use_fsync);
}
namespace {
bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
const JobContext::CandidateFileInfo& second) {
......@@ -1951,6 +1946,85 @@ Status DBImpl::Flush(const FlushOptions& flush_options,
return FlushMemTable(cfh->cfd(), flush_options);
}
Status DBImpl::SyncWAL() {
autovector<log::Writer*, 1> logs_to_sync;
bool need_log_dir_sync;
uint64_t current_log_number;
{
InstrumentedMutexLock l(&mutex_);
assert(!logs_.empty());
// This SyncWAL() call only cares about logs up to this number.
current_log_number = logfile_number_;
while (logs_.front().number <= current_log_number &&
logs_.front().getting_synced) {
log_sync_cv_.Wait();
}
// First check that logs are safe to sync in background.
for (auto it = logs_.begin();
it != logs_.end() && it->number <= current_log_number; ++it) {
if (!it->writer->file()->writable_file()->IsSyncThreadSafe()) {
return Status::NotSupported(
"SyncWAL() is not supported for this implementation of WAL file",
db_options_.allow_mmap_writes
? "try setting Options::allow_mmap_writes to false"
: Slice());
}
}
for (auto it = logs_.begin();
it != logs_.end() && it->number <= current_log_number; ++it) {
auto& log = *it;
assert(!log.getting_synced);
log.getting_synced = true;
logs_to_sync.push_back(log.writer.get());
}
need_log_dir_sync = !log_dir_synced_;
}
Status status;
for (log::Writer* log : logs_to_sync) {
status = log->file()->SyncWithoutFlush(db_options_.use_fsync);
if (!status.ok()) {
break;
}
}
if (status.ok() && need_log_dir_sync) {
status = directories_.GetWalDir()->Fsync();
}
{
InstrumentedMutexLock l(&mutex_);
MarkLogsSynced(current_log_number, need_log_dir_sync, status);
}
return status;
}
void DBImpl::MarkLogsSynced(
uint64_t up_to, bool synced_dir, const Status& status) {
mutex_.AssertHeld();
if (synced_dir &&
logfile_number_ == up_to &&
status.ok()) {
log_dir_synced_ = true;
}
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
auto& log = *it;
assert(log.getting_synced);
if (status.ok() && logs_.size() > 1) {
logs_to_free_.push_back(log.writer.release());
logs_.erase(it++);
} else {
log.getting_synced = false;
++it;
}
}
log_sync_cv_.SignalAll();
}
SequenceNumber DBImpl::GetLatestSequenceNumber() const {
return versions_->LastSequence();
}
......@@ -3475,13 +3549,14 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
uint64_t last_sequence = versions_->LastSequence();
WriteThread::Writer* last_writer = &w;
autovector<WriteBatch*> write_batch_group;
bool need_wal_sync = !write_options.disableWAL && write_options.sync;
bool need_log_sync = !write_options.disableWAL && write_options.sync;
bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
if (status.ok()) {
last_batch_group_size_ =
write_thread_.BuildBatchGroup(&last_writer, &write_batch_group);
if (need_wal_sync) {
if (need_log_sync) {
while (logs_.front().getting_synced) {
log_sync_cv_.Wait();
}
......@@ -3543,7 +3618,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
log_empty_ = false;
log_size = log_entry.size();
RecordTick(stats_, WAL_FILE_BYTES, log_size);
if (status.ok() && need_wal_sync) {
if (status.ok() && need_log_sync) {
RecordTick(stats_, WAL_FILE_SYNCED);
StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
// It's safe to access logs_ with unlocked mutex_ here because:
......@@ -3554,18 +3629,17 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// - as long as other threads don't modify it, it's safe to read
// from std::deque from multiple threads concurrently.
for (auto& log : logs_) {
status = SyncLog(log.writer.get());
status = log.writer->file()->Sync(db_options_.use_fsync);
if (!status.ok()) {
break;
}
}
if (status.ok() && !log_dir_synced_) {
if (status.ok() && need_log_dir_sync) {
// We only sync WAL directory the first time WAL syncing is
// requested, so that in case users never turn on WAL sync,
// we can avoid the disk I/O in the write code path.
status = directories_.GetWalDir()->Fsync();
}
log_dir_synced_ = true;
}
}
if (status.ok()) {
......@@ -3616,16 +3690,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
mutex_.AssertHeld();
if (need_wal_sync) {
while (logs_.size() > 1) {
auto& log = logs_.front();
assert(log.getting_synced);
logs_to_free_.push_back(log.writer.release());
logs_.pop_front();
}
assert(logs_.back().getting_synced);
logs_.back().getting_synced = false;
log_sync_cv_.SignalAll();
if (need_log_sync) {
MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
}
write_thread_.ExitWriteThread(&w, last_writer, status);
......@@ -3714,7 +3780,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(lfile), opt_env_opt));
new_log = new log::Writer(std::move(file_writer));
log_dir_synced_ = false;
}
}
......@@ -3739,6 +3804,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
logfile_number_ = new_log_number;
assert(new_log != nullptr);
log_empty_ = true;
log_dir_synced_ = false;
logs_.emplace_back(logfile_number_, std::unique_ptr<log::Writer>(new_log));
alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
......
......@@ -158,6 +158,7 @@ class DBImpl : public DB {
using DB::Flush;
virtual Status Flush(const FlushOptions& options,
ColumnFamilyHandle* column_family) override;
virtual Status SyncWAL() override;
virtual SequenceNumber GetLatestSequenceNumber() const override;
......@@ -308,8 +309,6 @@ class DBImpl : public DB {
// It is not necessary to hold the mutex when invoking this method.
void PurgeObsoleteFiles(const JobContext& background_contet);
Status SyncLog(log::Writer* log);
ColumnFamilyHandle* DefaultColumnFamily() const override;
const SnapshotList& snapshots() const { return snapshots_; }
......@@ -497,6 +496,9 @@ class DBImpl : public DB {
void AddToFlushQueue(ColumnFamilyData* cfd);
ColumnFamilyData* PopFirstFromFlushQueue();
// helper function to call after some of the logs_ were synced
void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status);
// table_cache_ provides its own synchronization
std::shared_ptr<Cache> table_cache_;
......
......@@ -5172,6 +5172,10 @@ class ModelDB: public DB {
return ret;
}
virtual Status SyncWAL() override {
return Status::OK();
}
virtual Status DisableFileDeletions() override { return Status::OK(); }
virtual Status EnableFileDeletions(bool force) override {
return Status::OK();
......@@ -8418,8 +8422,16 @@ TEST_F(DBTest, DeleteSchedulerMultipleDBPaths) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBTest, UnsupportedManualSync) {
DestroyAndReopen(CurrentOptions());
env_->is_wal_sync_thread_safe_.store(false);
Status s = db_->SyncWAL();
ASSERT_TRUE(s.IsNotSupported());
}
INSTANTIATE_TEST_CASE_P(DBTestWithParam, DBTestWithParam,
::testing::Values(1, 4));
} // namespace rocksdb
#endif
......
......@@ -142,6 +142,7 @@ class TestWritableFile : public WritableFile {
virtual Status Close() override;
virtual Status Flush() override;
virtual Status Sync() override;
virtual bool IsSyncThreadSafe() const override { return true; }
private:
FileState state_;
......@@ -891,6 +892,41 @@ TEST_P(FaultInjectionTest, UninstalledCompaction) {
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_P(FaultInjectionTest, ManualLogSyncTest) {
SleepingBackgroundTask sleeping_task_low;
env_->SetBackgroundThreads(1, Env::HIGH);
// Block the job queue to prevent flush job from running.
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::HIGH);
WriteOptions write_options;
write_options.sync = false;
std::string key_space, value_space;
ASSERT_OK(
db_->Put(write_options, Key(1, &key_space), Value(1, &value_space)));
FlushOptions flush_options;
flush_options.wait = false;
ASSERT_OK(db_->Flush(flush_options));
ASSERT_OK(
db_->Put(write_options, Key(2, &key_space), Value(2, &value_space)));
ASSERT_OK(db_->SyncWAL());
env_->SetFilesystemActive(false);
NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
sleeping_task_low.WakeUp();
ASSERT_OK(OpenDB());
std::string val;
Value(2, &value_space);
ASSERT_OK(ReadValue(2, &val));
ASSERT_EQ(value_space, val);
Value(1, &value_space);
ASSERT_OK(ReadValue(1, &val));
ASSERT_EQ(value_space, val);
}
INSTANTIATE_TEST_CASE_P(FaultTest, FaultInjectionTest, ::testing::Bool());
} // namespace rocksdb
......
......@@ -545,6 +545,12 @@ class DB {
return Flush(options, DefaultColumnFamily());
}
// Sync the wal. Note that Write() followed by SyncWAL() is not exactly the
// same as Write() with sync=true: in the latter case the changes won't be
// visible until the sync is done.
// Currently only works if allow_mmap_writes = false in Options.
virtual Status SyncWAL() = 0;
// The sequence number of the most recent transaction.
virtual SequenceNumber GetLatestSequenceNumber() const = 0;
......
......@@ -461,6 +461,12 @@ class WritableFile {
return Sync();
}
// true if Sync() and Fsync() are safe to call concurrently with Append()
// and Flush().
virtual bool IsSyncThreadSafe() const {
return false;
}
/*
* Change the priority in rate limiter if rate limiting is enabled.
* If rate limiting is not enabled, this call has no effect.
......@@ -616,7 +622,7 @@ class RandomRWFile {
class Directory {
public:
virtual ~Directory() {}
// Fsync directory
// Fsync directory. Can be called concurrently from multiple threads.
virtual Status Fsync() = 0;
};
......@@ -894,6 +900,7 @@ class WritableFileWrapper : public WritableFile {
Status Flush() override { return target_->Flush(); }
Status Sync() override { return target_->Sync(); }
Status Fsync() override { return target_->Fsync(); }
bool IsSyncThreadSafe() const override { return target_->IsSyncThreadSafe(); }
void SetIOPriority(Env::IOPriority pri) override {
target_->SetIOPriority(pri);
}
......
......@@ -975,7 +975,9 @@ struct DBOptions {
// Allow the OS to mmap file for reading sst tables. Default: false
bool allow_mmap_reads;
// Allow the OS to mmap file for writing. Default: false
// Allow the OS to mmap file for writing.
// DB::SyncWAL() only works if this is set to false.
// Default: false
bool allow_mmap_writes;
// Disable child process inherit open files. Default: true
......
......@@ -192,6 +192,10 @@ class StackableDB : public DB {
return db_->Flush(fopts, column_family);
}
virtual Status SyncWAL() override {
return db_->SyncWAL();
}
#ifndef ROCKSDB_LITE
virtual Status DisableFileDeletions() override {
......
......@@ -223,6 +223,9 @@ class SpecialEnv : public EnvWrapper {
++env_->sync_counter_;
return base_->Sync();
}
bool IsSyncThreadSafe() const override {
return env_->is_wal_sync_thread_safe_.load();
}
private:
SpecialEnv* env_;
......@@ -389,6 +392,8 @@ class SpecialEnv : public EnvWrapper {
std::atomic<int64_t> addon_time_;
bool no_sleep_;
std::atomic<bool> is_wal_sync_thread_safe_ {true};
};
class DBTestBase : public testing::Test {
......
......@@ -679,6 +679,10 @@ class PosixWritableFile : public WritableFile {
return Status::OK();
}
virtual bool IsSyncThreadSafe() const override {
return true;
}
virtual uint64_t GetFileSize() override { return filesize_; }
virtual Status InvalidateCache(size_t offset, size_t length) override {
......
......@@ -154,11 +154,7 @@ Status WritableFileWriter::Sync(bool use_fsync) {
}
TEST_KILL_RANDOM(rocksdb_kill_odds);
if (pending_sync_) {
if (use_fsync) {
s = writable_file_->Fsync();
} else {
s = writable_file_->Sync();
}
s = SyncInternal(use_fsync);
if (!s.ok()) {
return s;
}
......@@ -171,6 +167,25 @@ Status WritableFileWriter::Sync(bool use_fsync) {
return Status::OK();
}
Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
if (!writable_file_->IsSyncThreadSafe()) {
return Status::NotSupported(
"Can't WritableFileWriter::SyncWithoutFlush() because "
"WritableFile::IsSyncThreadSafe() is false");
}
return SyncInternal(use_fsync);
}
Status WritableFileWriter::SyncInternal(bool use_fsync) {
Status s;
if (use_fsync) {
s = writable_file_->Fsync();
} else {
s = writable_file_->Sync();
}
return s;
}
Status WritableFileWriter::RangeSync(off_t offset, off_t nbytes) {
IOSTATS_TIMER_GUARD(range_sync_nanos);
return writable_file_->RangeSync(offset, nbytes);
......
......@@ -74,6 +74,11 @@ class WritableFileWriter {
Status Sync(bool use_fsync);
// Sync only the data that was already Flush()ed. Safe to call concurrently
// with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(),
// returns NotSupported status.
Status SyncWithoutFlush(bool use_fsync);
uint64_t GetFileSize() { return filesize_; }
Status InvalidateCache(size_t offset, size_t length) {
......@@ -85,6 +90,7 @@ class WritableFileWriter {
private:
Status RangeSync(off_t offset, off_t nbytes);
size_t RequestToken(size_t bytes);
Status SyncInternal(bool use_fsync);
};
class RandomRWFileAccessor {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册