diff --git a/db/db_impl.cc b/db/db_impl.cc index 3227d6d88b62020ef5aa1ac8d3ec4c4fae1f637d..c2e853c885ee15c2d324e9819bb8179add2731a8 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -548,6 +548,11 @@ int DBImpl::Level0StopWriteTrigger() { return options_.level0_stop_writes_trigger; } +Status DBImpl::Flush(const FlushOptions& options) { + Status status = FlushMemTable(options); + return status; +} + void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { assert(level >= 0); @@ -582,27 +587,36 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { } } -Status DBImpl::TEST_CompactMemTable() { +Status DBImpl::FlushMemTable(const FlushOptions& options) { // NULL batch means just wait for earlier writes to be done Status s = Write(WriteOptions(), NULL); - if (s.ok()) { + if (s.ok() && options.wait) { // Wait until the compaction completes - s = TEST_WaitForCompactMemTable(); + s = WaitForCompactMemTable(); } return s; } +Status DBImpl::WaitForCompactMemTable() { + Status s; + // Wait until the compaction completes + MutexLock l(&mutex_); + while (imm_ != NULL && bg_error_.ok()) { + bg_cv_.Wait(); + } + if (imm_ != NULL) { + s = bg_error_; + } + return s; +} + + +Status DBImpl::TEST_CompactMemTable() { + return FlushMemTable(FlushOptions()); +} + Status DBImpl::TEST_WaitForCompactMemTable() { - Status s; - // Wait until the compaction completes - MutexLock l(&mutex_); - while (imm_ != NULL && bg_error_.ok()) { - bg_cv_.Wait(); - } - if (imm_ != NULL) { - s = bg_error_; - } - return s; + return WaitForCompactMemTable(); } Status DBImpl::TEST_WaitForCompact() { diff --git a/db/db_impl.h b/db/db_impl.h index 525ed706f88ccf6a2b04e00c6435032b04a96813..9ff82caf57e51c0ee61b88a271447e22cef26836 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -43,6 +43,7 @@ class DBImpl : public DB { virtual int NumberLevels(); virtual int MaxMemCompactionLevel(); virtual int Level0StopWriteTrigger(); + virtual Status Flush(const FlushOptions& options); // Extra methods (for testing) that are not in the public DB interface @@ -100,6 +101,12 @@ class DBImpl : public DB { Status MakeRoomForWrite(bool force /* compact even if there is room? */); WriteBatch* BuildBatchGroup(Writer** last_writer); + // Force current memtable contents to be flushed. + Status FlushMemTable(const FlushOptions& options); + + // Wait for memtable compaction + Status WaitForCompactMemTable(); + void MaybeScheduleCompaction(); static void BGWork(void* db); void BackgroundCall(); diff --git a/db/db_test.cc b/db/db_test.cc index 5e6c809e1307fdf9d8d34711c06b7f0a529daf72..105a8879b9504aeb3423504a7e977746d290ac94 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -784,11 +784,11 @@ TEST(DBTest, WAL) { WriteOptions writeOpt = WriteOptions(); writeOpt.disableWAL = true; ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1")); - ASSERT_OK(dbfull()->Put(writeOpt, "baz", "v1")); + ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v1")); Reopen(); ASSERT_EQ("NOT_FOUND", Get("foo")); - ASSERT_EQ("NOT_FOUND", Get("baz")); + ASSERT_EQ("NOT_FOUND", Get("bar")); writeOpt.disableWAL = false; ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v2")); @@ -812,6 +812,40 @@ TEST(DBTest, WAL) { ASSERT_EQ("v3", Get("foo")); } +TEST(DBTest, FLUSH) { + Options options = CurrentOptions(); + WriteOptions writeOpt = WriteOptions(); + writeOpt.disableWAL = true; + ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1")); + // this will not flush the last 2 writes + dbfull()->Flush(FlushOptions()); + ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v1")); + + Reopen(); + ASSERT_EQ("v1", Get("foo")); + ASSERT_EQ("NOT_FOUND", Get("bar")); + + writeOpt.disableWAL = true; + ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v2")); + ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v2")); + dbfull()->Flush(FlushOptions()); + + Reopen(); + ASSERT_EQ("v2", Get("bar")); + ASSERT_EQ("v2", Get("foo")); + + writeOpt.disableWAL = false; + ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v3")); + ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v3")); + dbfull()->Flush(FlushOptions()); + + Reopen(); + // 'foo' should be there because its put + // has WAL enabled. + ASSERT_EQ("v3", Get("foo")); + ASSERT_EQ("v3", Get("bar")); +} + TEST(DBTest, RecoveryWithEmptyLog) { do { ASSERT_OK(Put("foo", "v1")); @@ -1758,6 +1792,11 @@ class ModelDB: public DB { return -1; } + virtual Status Flush(const leveldb::FlushOptions& options) { + Status ret; + return ret; + } + private: class ModelIter: public Iterator { public: diff --git a/include/leveldb/db.h b/include/leveldb/db.h index c2fd5532e710b0846845ee520d22e06dfa3aff18..2764fcfa7961fcb104c382decda4f41668ef27d7 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -19,6 +19,7 @@ static const int kMinorVersion = 4; struct Options; struct ReadOptions; struct WriteOptions; +struct FlushOptions; class WriteBatch; // Abstract handle to particular state of a DB. @@ -150,6 +151,9 @@ class DB { // Number of files in level-0 that would stop writes. virtual int Level0StopWriteTrigger() = 0; + // Flush all mem-table data. + virtual Status Flush(const FlushOptions& options) = 0; + private: // No copying allowed DB(const DB&); diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 5fa66f4326b6f6e1b09b45db78630a8676630f18..41c356ffbc5be100263c40442fd2f5680b73986f 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -254,6 +254,17 @@ struct WriteOptions { } }; +// Options that control flush operations +struct FlushOptions { + // If true, the flush will wait until the flush is done. + // Default: true + bool wait; + + FlushOptions() + : wait(true) { + } +}; + } // namespace leveldb #endif // STORAGE_LEVELDB_INCLUDE_OPTIONS_H_