提交 72630236 编写于 作者: G gabor@google.com

Bugfixes: for Get(), don't hold mutex while writing log.

- Fix bug in Get: when it triggers a compaction, it could sometimes
  mark the compaction with the wrong level (if there was a gap
  in the set of levels examined for the Get).

- Do not hold mutex while writing to the log file or to the
  MANIFEST file.

  Added a new benchmark that runs a writer thread concurrently with
  reader threads.

  Percentiles
  ------------------------------
  micros/op: avg  median 99   99.9  99.99  99.999 max
  ------------------------------------------------------
  before:    42   38     110  225   32000  42000  48000
  after:     24   20     55   65    130    1100   7000

- Fixed race in optimized Get.  It should have been using the
  pinned memtables, not the current memtables.



git-svn-id: https://leveldb.googlecode.com/svn/trunk@50 62dab493-f737-651d-591e-8d6aee1b9529
上级 e3584f9c
...@@ -280,6 +280,7 @@ struct ThreadState { ...@@ -280,6 +280,7 @@ struct ThreadState {
int tid; // 0..n-1 when running in n threads int tid; // 0..n-1 when running in n threads
Random rand; // Has different seeds for different threads Random rand; // Has different seeds for different threads
Stats stats; Stats stats;
SharedState* shared;
ThreadState(int index) ThreadState(int index)
: tid(index), : tid(index),
...@@ -418,13 +419,14 @@ class Benchmark { ...@@ -418,13 +419,14 @@ class Benchmark {
// Reset parameters that may be overriddden bwlow // Reset parameters that may be overriddden bwlow
num_ = FLAGS_num; num_ = FLAGS_num;
reads_ = num_; reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
value_size_ = FLAGS_value_size; value_size_ = FLAGS_value_size;
entries_per_batch_ = 1; entries_per_batch_ = 1;
write_options_ = WriteOptions(); write_options_ = WriteOptions();
void (Benchmark::*method)(ThreadState*) = NULL; void (Benchmark::*method)(ThreadState*) = NULL;
bool fresh_db = false; bool fresh_db = false;
int num_threads = FLAGS_threads;
if (name == Slice("fillseq")) { if (name == Slice("fillseq")) {
fresh_db = true; fresh_db = true;
...@@ -460,6 +462,9 @@ class Benchmark { ...@@ -460,6 +462,9 @@ class Benchmark {
} else if (name == Slice("readrandomsmall")) { } else if (name == Slice("readrandomsmall")) {
reads_ /= 1000; reads_ /= 1000;
method = &Benchmark::ReadRandom; method = &Benchmark::ReadRandom;
} else if (name == Slice("readwhilewriting")) {
num_threads++; // Add extra thread for writing
method = &Benchmark::ReadWhileWriting;
} else if (name == Slice("compact")) { } else if (name == Slice("compact")) {
method = &Benchmark::Compact; method = &Benchmark::Compact;
} else if (name == Slice("crc32c")) { } else if (name == Slice("crc32c")) {
...@@ -494,7 +499,7 @@ class Benchmark { ...@@ -494,7 +499,7 @@ class Benchmark {
} }
if (method != NULL) { if (method != NULL) {
RunBenchmark(name, method); RunBenchmark(num_threads, name, method);
} }
} }
} }
...@@ -535,8 +540,8 @@ class Benchmark { ...@@ -535,8 +540,8 @@ class Benchmark {
} }
} }
void RunBenchmark(Slice name, void (Benchmark::*method)(ThreadState*)) { void RunBenchmark(int n, Slice name,
const int n = FLAGS_threads; void (Benchmark::*method)(ThreadState*)) {
SharedState shared; SharedState shared;
shared.total = n; shared.total = n;
shared.num_initialized = 0; shared.num_initialized = 0;
...@@ -549,6 +554,7 @@ class Benchmark { ...@@ -549,6 +554,7 @@ class Benchmark {
arg[i].method = method; arg[i].method = method;
arg[i].shared = &shared; arg[i].shared = &shared;
arg[i].thread = new ThreadState(i); arg[i].thread = new ThreadState(i);
arg[i].thread->shared = &shared;
Env::Default()->StartThread(ThreadBody, &arg[i]); Env::Default()->StartThread(ThreadBody, &arg[i]);
} }
...@@ -688,7 +694,6 @@ class Benchmark { ...@@ -688,7 +694,6 @@ class Benchmark {
RandomGenerator gen; RandomGenerator gen;
WriteBatch batch; WriteBatch batch;
Status s; Status s;
std::string val;
int64_t bytes = 0; int64_t bytes = 0;
for (int i = 0; i < num_; i += entries_per_batch_) { for (int i = 0; i < num_; i += entries_per_batch_) {
batch.Clear(); batch.Clear();
...@@ -760,6 +765,36 @@ class Benchmark { ...@@ -760,6 +765,36 @@ class Benchmark {
} }
} }
void ReadWhileWriting(ThreadState* thread) {
if (thread->tid > 0) {
ReadRandom(thread);
} else {
// Special thread that keeps writing until other threads are done.
RandomGenerator gen;
while (true) {
{
MutexLock l(&thread->shared->mu);
if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
// Other threads have finished
break;
}
}
const int k = thread->rand.Next() % FLAGS_num;
char key[100];
snprintf(key, sizeof(key), "%016d", k);
Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
}
// Do not count any of the preceding work/delay in stats.
thread->stats.Start();
}
}
void Compact(ThreadState* thread) { void Compact(ThreadState* thread) {
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
dbi->TEST_CompactMemTable(); dbi->TEST_CompactMemTable();
......
...@@ -113,6 +113,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) ...@@ -113,6 +113,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
logfile_(NULL), logfile_(NULL),
logfile_number_(0), logfile_number_(0),
log_(NULL), log_(NULL),
logger_(NULL),
logger_cv_(&mutex_),
bg_compaction_scheduled_(false), bg_compaction_scheduled_(false),
manual_compaction_(NULL) { manual_compaction_(NULL) {
mem_->Ref(); mem_->Ref();
...@@ -308,6 +310,11 @@ Status DBImpl::Recover(VersionEdit* edit) { ...@@ -308,6 +310,11 @@ Status DBImpl::Recover(VersionEdit* edit) {
std::sort(logs.begin(), logs.end()); std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) { for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], edit, &max_sequence); s = RecoverLogFile(logs[i], edit, &max_sequence);
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
} }
if (s.ok()) { if (s.ok()) {
...@@ -485,7 +492,7 @@ Status DBImpl::CompactMemTable() { ...@@ -485,7 +492,7 @@ Status DBImpl::CompactMemTable() {
if (s.ok()) { if (s.ok()) {
edit.SetPrevLogNumber(0); edit.SetPrevLogNumber(0);
edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
s = versions_->LogAndApply(&edit); s = versions_->LogAndApply(&edit, &mutex_);
} }
if (s.ok()) { if (s.ok()) {
...@@ -523,7 +530,10 @@ void DBImpl::TEST_CompactRange( ...@@ -523,7 +530,10 @@ void DBImpl::TEST_CompactRange(
Status DBImpl::TEST_CompactMemTable() { Status DBImpl::TEST_CompactMemTable() {
MutexLock l(&mutex_); MutexLock l(&mutex_);
LoggerId self;
AcquireLoggingResponsibility(&self);
Status s = MakeRoomForWrite(true /* force compaction */); Status s = MakeRoomForWrite(true /* force compaction */);
ReleaseLoggingResponsibility(&self);
if (s.ok()) { if (s.ok()) {
// Wait until the compaction completes // Wait until the compaction completes
while (imm_ != NULL && bg_error_.ok()) { while (imm_ != NULL && bg_error_.ok()) {
...@@ -600,7 +610,7 @@ void DBImpl::BackgroundCompaction() { ...@@ -600,7 +610,7 @@ void DBImpl::BackgroundCompaction() {
c->edit()->DeleteFile(c->level(), f->number); c->edit()->DeleteFile(c->level(), f->number);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest); f->smallest, f->largest);
status = versions_->LogAndApply(c->edit()); status = versions_->LogAndApply(c->edit(), &mutex_);
VersionSet::LevelSummaryStorage tmp; VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number), static_cast<unsigned long long>(f->number),
...@@ -748,7 +758,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { ...@@ -748,7 +758,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
} }
compact->outputs.clear(); compact->outputs.clear();
Status s = versions_->LogAndApply(compact->compaction->edit()); Status s = versions_->LogAndApply(compact->compaction->edit(), &mutex_);
if (s.ok()) { if (s.ok()) {
compact->compaction->ReleaseInputs(); compact->compaction->ReleaseInputs();
DeleteObsoleteFiles(); DeleteObsoleteFiles();
...@@ -1004,9 +1014,9 @@ Status DBImpl::Get(const ReadOptions& options, ...@@ -1004,9 +1014,9 @@ Status DBImpl::Get(const ReadOptions& options,
mutex_.Unlock(); mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any). // First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot); LookupKey lkey(key, snapshot);
if (mem_->Get(lkey, value, &s)) { if (mem->Get(lkey, value, &s)) {
// Done // Done
} else if (imm_ != NULL && imm_->Get(lkey, value, &s)) { } else if (imm != NULL && imm->Get(lkey, value, &s)) {
// Done // Done
} else { } else {
s = current->Get(options, lkey, value, &stats); s = current->Get(options, lkey, value, &stats);
...@@ -1053,34 +1063,65 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { ...@@ -1053,34 +1063,65 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
return DB::Delete(options, key); return DB::Delete(options, key);
} }
// There is at most one thread that is the current logger. This call
// waits until preceding logger(s) have finished and becomes the
// current logger.
void DBImpl::AcquireLoggingResponsibility(LoggerId* self) {
while (logger_ != NULL) {
logger_cv_.Wait();
}
logger_ = self;
}
void DBImpl::ReleaseLoggingResponsibility(LoggerId* self) {
assert(logger_ == self);
logger_ = NULL;
logger_cv_.SignalAll();
}
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Status status; Status status;
MutexLock l(&mutex_); MutexLock l(&mutex_);
LoggerId self;
AcquireLoggingResponsibility(&self);
status = MakeRoomForWrite(false); // May temporarily release lock and wait status = MakeRoomForWrite(false); // May temporarily release lock and wait
uint64_t last_sequence = versions_->LastSequence(); uint64_t last_sequence = versions_->LastSequence();
if (status.ok()) { if (status.ok()) {
WriteBatchInternal::SetSequence(updates, last_sequence + 1); WriteBatchInternal::SetSequence(updates, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(updates); last_sequence += WriteBatchInternal::Count(updates);
versions_->SetLastSequence(last_sequence);
// Add to log and apply to memtable // Add to log and apply to memtable. We can release the lock during
status = log_->AddRecord(WriteBatchInternal::Contents(updates)); // this phase since the "logger_" flag protects against concurrent
if (status.ok() && options.sync) { // loggers and concurrent writes into mem_.
status = logfile_->Sync(); {
} assert(logger_ == &self);
if (status.ok()) { mutex_.Unlock();
status = WriteBatchInternal::InsertInto(updates, mem_); status = log_->AddRecord(WriteBatchInternal::Contents(updates));
if (status.ok() && options.sync) {
status = logfile_->Sync();
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_);
}
mutex_.Lock();
assert(logger_ == &self);
} }
versions_->SetLastSequence(last_sequence);
} }
if (options.post_write_snapshot != NULL) { if (options.post_write_snapshot != NULL) {
*options.post_write_snapshot = *options.post_write_snapshot =
status.ok() ? snapshots_.New(last_sequence) : NULL; status.ok() ? snapshots_.New(last_sequence) : NULL;
} }
ReleaseLoggingResponsibility(&self);
return status; return status;
} }
// REQUIRES: mutex_ is held
// REQUIRES: this thread is the current logger
Status DBImpl::MakeRoomForWrite(bool force) { Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld(); mutex_.AssertHeld();
assert(logger_ != NULL);
bool allow_delay = !force; bool allow_delay = !force;
Status s; Status s;
while (true) { while (true) {
...@@ -1249,7 +1290,7 @@ Status DB::Open(const Options& options, const std::string& dbname, ...@@ -1249,7 +1290,7 @@ Status DB::Open(const Options& options, const std::string& dbname,
impl->logfile_ = lfile; impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number; impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile); impl->log_ = new log::Writer(lfile);
s = impl->versions_->LogAndApply(&edit); s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
} }
if (s.ok()) { if (s.ok()) {
impl->DeleteObsoleteFiles(); impl->DeleteObsoleteFiles();
......
...@@ -87,6 +87,11 @@ class DBImpl : public DB { ...@@ -87,6 +87,11 @@ class DBImpl : public DB {
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base); Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base);
// Only thread is allowed to log at a time.
struct LoggerId { }; // Opaque identifier for logging thread
void AcquireLoggingResponsibility(LoggerId* self);
void ReleaseLoggingResponsibility(LoggerId* self);
Status MakeRoomForWrite(bool force /* compact even if there is room? */); Status MakeRoomForWrite(bool force /* compact even if there is room? */);
struct CompactionState; struct CompactionState;
...@@ -126,6 +131,8 @@ class DBImpl : public DB { ...@@ -126,6 +131,8 @@ class DBImpl : public DB {
WritableFile* logfile_; WritableFile* logfile_;
uint64_t logfile_number_; uint64_t logfile_number_;
log::Writer* log_; log::Writer* log_;
LoggerId* logger_; // NULL, or the id of the current logging thread
port::CondVar logger_cv_; // For threads waiting to log
SnapshotList snapshots_; SnapshotList snapshots_;
// Set of table files to protect from deletion because they are // Set of table files to protect from deletion because they are
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/table.h" #include "leveldb/table.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/mutexlock.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "util/testutil.h" #include "util/testutil.h"
...@@ -345,6 +346,41 @@ TEST(DBTest, GetPicksCorrectFile) { ...@@ -345,6 +346,41 @@ TEST(DBTest, GetPicksCorrectFile) {
ASSERT_EQ("vx", Get("x")); ASSERT_EQ("vx", Get("x"));
} }
TEST(DBTest, GetEncountersEmptyLevel) {
// Arrange for the following to happen:
// * sstable A in level 0
// * nothing in level 1
// * sstable B in level 2
// Then do enough Get() calls to arrange for an automatic compaction
// of sstable A. A bug would cause the compaction to be marked as
// occuring at level 1 (instead of the correct level 0).
// Step 1: First place sstables in levels 0 and 2
int compaction_count = 0;
while (NumTableFilesAtLevel(0) == 0 ||
NumTableFilesAtLevel(2) == 0) {
ASSERT_LE(compaction_count, 100) << "could not fill levels 0 and 2";
compaction_count++;
Put("a", "begin");
Put("z", "end");
dbfull()->TEST_CompactMemTable();
}
// Step 2: clear level 1 if necessary.
dbfull()->TEST_CompactRange(1, "a", "z");
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 1);
// Step 3: read until level 0 compaction disappears.
int read_count = 0;
while (NumTableFilesAtLevel(0) > 0) {
ASSERT_LE(read_count, 10000) << "did not trigger level 0 compaction";
read_count++;
ASSERT_EQ("NOT_FOUND", Get("missing"));
}
}
TEST(DBTest, IterEmpty) { TEST(DBTest, IterEmpty) {
Iterator* iter = db_->NewIterator(ReadOptions()); Iterator* iter = db_->NewIterator(ReadOptions());
...@@ -1355,6 +1391,9 @@ void BM_LogAndApply(int iters, int num_base_files) { ...@@ -1355,6 +1391,9 @@ void BM_LogAndApply(int iters, int num_base_files) {
Env* env = Env::Default(); Env* env = Env::Default();
port::Mutex mu;
MutexLock l(&mu);
InternalKeyComparator cmp(BytewiseComparator()); InternalKeyComparator cmp(BytewiseComparator());
Options options; Options options;
VersionSet vset(dbname, &options, NULL, &cmp); VersionSet vset(dbname, &options, NULL, &cmp);
...@@ -1366,7 +1405,7 @@ void BM_LogAndApply(int iters, int num_base_files) { ...@@ -1366,7 +1405,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
vbase.AddFile(2, fnum++, 1 /* file size */, start, limit); vbase.AddFile(2, fnum++, 1 /* file size */, start, limit);
} }
ASSERT_OK(vset.LogAndApply(&vbase)); ASSERT_OK(vset.LogAndApply(&vbase, &mu));
uint64_t start_micros = env->NowMicros(); uint64_t start_micros = env->NowMicros();
...@@ -1376,7 +1415,7 @@ void BM_LogAndApply(int iters, int num_base_files) { ...@@ -1376,7 +1415,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
InternalKey start(MakeKey(2*fnum), 1, kTypeValue); InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
vedit.AddFile(2, fnum++, 1 /* file size */, start, limit); vedit.AddFile(2, fnum++, 1 /* file size */, start, limit);
vset.LogAndApply(&vedit); vset.LogAndApply(&vedit, &mu);
} }
uint64_t stop_micros = env->NowMicros(); uint64_t stop_micros = env->NowMicros();
unsigned int us = stop_micros - start_micros; unsigned int us = stop_micros - start_micros;
......
...@@ -250,6 +250,7 @@ Status Version::Get(const ReadOptions& options, ...@@ -250,6 +250,7 @@ Status Version::Get(const ReadOptions& options,
stats->seek_file = NULL; stats->seek_file = NULL;
stats->seek_file_level = -1; stats->seek_file_level = -1;
FileMetaData* last_file_read = NULL; FileMetaData* last_file_read = NULL;
int last_file_read_level = -1;
// We can search level-by-level since entries never hop across // We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data // levels. Therefore we are guaranteed that if we find data
...@@ -301,11 +302,12 @@ Status Version::Get(const ReadOptions& options, ...@@ -301,11 +302,12 @@ Status Version::Get(const ReadOptions& options,
if (last_file_read != NULL && stats->seek_file == NULL) { if (last_file_read != NULL && stats->seek_file == NULL) {
// We have had more than one seek for this read. Charge the 1st file. // We have had more than one seek for this read. Charge the 1st file.
stats->seek_file = last_file_read; stats->seek_file = last_file_read;
stats->seek_file_level = (i == 0 ? level - 1 : level); stats->seek_file_level = last_file_read_level;
} }
FileMetaData* f = files[i]; FileMetaData* f = files[i];
last_file_read = f; last_file_read = f;
last_file_read_level = level;
Iterator* iter = vset_->table_cache_->NewIterator( Iterator* iter = vset_->table_cache_->NewIterator(
options, options,
...@@ -609,7 +611,7 @@ void VersionSet::AppendVersion(Version* v) { ...@@ -609,7 +611,7 @@ void VersionSet::AppendVersion(Version* v) {
v->next_->prev_ = v; v->next_->prev_ = v;
} }
Status VersionSet::LogAndApply(VersionEdit* edit) { Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
if (edit->has_log_number_) { if (edit->has_log_number_) {
assert(edit->log_number_ >= log_number_); assert(edit->log_number_ >= log_number_);
assert(edit->log_number_ < next_file_number_); assert(edit->log_number_ < next_file_number_);
...@@ -637,6 +639,8 @@ Status VersionSet::LogAndApply(VersionEdit* edit) { ...@@ -637,6 +639,8 @@ Status VersionSet::LogAndApply(VersionEdit* edit) {
std::string new_manifest_file; std::string new_manifest_file;
Status s; Status s;
if (descriptor_log_ == NULL) { if (descriptor_log_ == NULL) {
// No reason to unlock *mu here since we only hit this path in the
// first call to LogAndApply (when opening the database).
assert(descriptor_file_ == NULL); assert(descriptor_file_ == NULL);
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_); new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
edit->SetNextFile(next_file_number_); edit->SetNextFile(next_file_number_);
...@@ -647,20 +651,27 @@ Status VersionSet::LogAndApply(VersionEdit* edit) { ...@@ -647,20 +651,27 @@ Status VersionSet::LogAndApply(VersionEdit* edit) {
} }
} }
// Write new record to MANIFEST log // Unlock during expensive MANIFEST log write
if (s.ok()) { {
std::string record; mu->Unlock();
edit->EncodeTo(&record);
s = descriptor_log_->AddRecord(record); // Write new record to MANIFEST log
if (s.ok()) { if (s.ok()) {
s = descriptor_file_->Sync(); std::string record;
edit->EncodeTo(&record);
s = descriptor_log_->AddRecord(record);
if (s.ok()) {
s = descriptor_file_->Sync();
}
} }
}
// If we just created a new descriptor file, install it by writing a // If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it. // new CURRENT file that points to it.
if (s.ok() && !new_manifest_file.empty()) { if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_); s = SetCurrentFile(env_, dbname_, manifest_file_number_);
}
mu->Lock();
} }
// Install the new version // Install the new version
...@@ -776,6 +787,9 @@ Status VersionSet::Recover() { ...@@ -776,6 +787,9 @@ Status VersionSet::Recover() {
if (!have_prev_log_number) { if (!have_prev_log_number) {
prev_log_number = 0; prev_log_number = 0;
} }
MarkFileNumberUsed(prev_log_number);
MarkFileNumberUsed(log_number);
} }
if (s.ok()) { if (s.ok()) {
...@@ -794,6 +808,12 @@ Status VersionSet::Recover() { ...@@ -794,6 +808,12 @@ Status VersionSet::Recover() {
return s; return s;
} }
void VersionSet::MarkFileNumberUsed(uint64_t number) {
if (next_file_number_ <= number) {
next_file_number_ = number + 1;
}
}
static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) { static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
int64_t sum = 0; int64_t sum = 0;
for (size_t i = 0; i < files.size(); i++) { for (size_t i = 0; i < files.size(); i++) {
......
...@@ -138,15 +138,14 @@ class VersionSet { ...@@ -138,15 +138,14 @@ class VersionSet {
// Apply *edit to the current version to form a new descriptor that // Apply *edit to the current version to form a new descriptor that
// is both saved to persistent state and installed as the new // is both saved to persistent state and installed as the new
// current version. // current version. Will release *mu while actually writing to the file.
Status LogAndApply(VersionEdit* edit); // REQUIRES: *mu is held on entry.
// REQUIRES: no other thread concurrently calls LogAndApply()
Status LogAndApply(VersionEdit* edit, port::Mutex* mu);
// Recover the last saved descriptor from persistent storage. // Recover the last saved descriptor from persistent storage.
Status Recover(); Status Recover();
// Save current contents to *log
Status WriteSnapshot(log::Writer* log);
// Return the current version. // Return the current version.
Version* current() const { return current_; } Version* current() const { return current_; }
...@@ -171,6 +170,9 @@ class VersionSet { ...@@ -171,6 +170,9 @@ class VersionSet {
last_sequence_ = s; last_sequence_ = s;
} }
// Mark the specified file number as used.
void MarkFileNumberUsed(uint64_t number);
// Return the current log file number. // Return the current log file number.
uint64_t LogNumber() const { return log_number_; } uint64_t LogNumber() const { return log_number_; }
...@@ -247,6 +249,9 @@ class VersionSet { ...@@ -247,6 +249,9 @@ class VersionSet {
void SetupOtherInputs(Compaction* c); void SetupOtherInputs(Compaction* c);
// Save current contents to *log
Status WriteSnapshot(log::Writer* log);
void AppendVersion(Version* v); void AppendVersion(Version* v);
Env* const env_; Env* const env_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册