version_set.cc 32.3 KB
Newer Older
J
jorlow@chromium.org 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/version_set.h"

#include <algorithm>
#include <stdio.h>
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/table_cache.h"
14 15
#include "leveldb/env.h"
#include "leveldb/table_builder.h"
J
jorlow@chromium.org 已提交
16 17 18 19 20 21 22
#include "table/merger.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"

namespace leveldb {

J
jorlow@chromium.org 已提交
23 24 25
static const int kTargetFileSize = 2 * 1048576;

// Maximum bytes of overlaps in grandparent (i.e., level+2) before we
26
// stop building a single file in a level->level+1 compaction.
J
jorlow@chromium.org 已提交
27
static const int64_t kMaxGrandParentOverlapBytes = 10 * kTargetFileSize;
28

J
jorlow@chromium.org 已提交
29
static double MaxBytesForLevel(int level) {
30 31 32 33 34 35
  // Note: the result for level zero is not really used since we set
  // the level-0 compaction threshold based on number of files.
  double result = 10 * 1048576.0;  // Result for both level-0 and level-1
  while (level > 1) {
    result *= 10;
    level--;
J
jorlow@chromium.org 已提交
36
  }
37
  return result;
J
jorlow@chromium.org 已提交
38 39 40
}

static uint64_t MaxFileSizeForLevel(int level) {
J
jorlow@chromium.org 已提交
41
  return kTargetFileSize;  // We could vary per level to reduce number of files?
J
jorlow@chromium.org 已提交
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
}

namespace {
std::string IntSetToString(const std::set<uint64_t>& s) {
  std::string result = "{";
  for (std::set<uint64_t>::const_iterator it = s.begin();
       it != s.end();
       ++it) {
    result += (result.size() > 1) ? "," : "";
    result += NumberToString(*it);
  }
  result += "}";
  return result;
}
}

Version::~Version() {
  assert(refs_ == 0);
60 61 62 63 64 65

  // Remove from linked list
  prev_->next_ = next_;
  next_->prev_ = prev_;

  // Drop references to files
J
jorlow@chromium.org 已提交
66
  for (int level = 0; level < config::kNumLevels; level++) {
D
dgrogan@chromium.org 已提交
67
    for (size_t i = 0; i < files_[level].size(); i++) {
J
jorlow@chromium.org 已提交
68
      FileMetaData* f = files_[level][i];
69
      assert(f->refs > 0);
J
jorlow@chromium.org 已提交
70 71 72 73 74 75 76 77 78 79 80
      f->refs--;
      if (f->refs <= 0) {
        delete f;
      }
    }
  }
}

// An internal iterator.  For a given version/level pair, yields
// information about the files in the level.  For a given entry, key()
// is the largest key that occurs in the file, and value() is an
J
jorlow@chromium.org 已提交
81 82
// 16-byte value containing the file number and file size, both
// encoded using EncodeFixed64.
J
jorlow@chromium.org 已提交
83 84
class Version::LevelFileNumIterator : public Iterator {
 public:
85
  LevelFileNumIterator(const InternalKeyComparator& icmp,
J
jorlow@chromium.org 已提交
86
                       const std::vector<FileMetaData*>* flist)
87
      : icmp_(icmp),
J
jorlow@chromium.org 已提交
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
        flist_(flist),
        index_(flist->size()) {        // Marks as invalid
  }
  virtual bool Valid() const {
    return index_ < flist_->size();
  }
  virtual void Seek(const Slice& target) {
    uint32_t left = 0;
    uint32_t right = flist_->size() - 1;
    while (left < right) {
      uint32_t mid = (left + right) / 2;
      int cmp = icmp_.Compare((*flist_)[mid]->largest.Encode(), target);
      if (cmp < 0) {
        // Key at "mid.largest" is < than "target".  Therefore all
        // files at or before "mid" are uninteresting.
        left = mid + 1;
      } else {
        // Key at "mid.largest" is >= "target".  Therefore all files
        // after "mid" are uninteresting.
        right = mid;
      }
    }
    index_ = left;
  }
  virtual void SeekToFirst() { index_ = 0; }
  virtual void SeekToLast() {
    index_ = flist_->empty() ? 0 : flist_->size() - 1;
  }
  virtual void Next() {
    assert(Valid());
    index_++;
  }
  virtual void Prev() {
    assert(Valid());
    if (index_ == 0) {
      index_ = flist_->size();  // Marks as invalid
    } else {
      index_--;
    }
  }
  Slice key() const {
    assert(Valid());
    return (*flist_)[index_]->largest.Encode();
  }
  Slice value() const {
    assert(Valid());
    EncodeFixed64(value_buf_, (*flist_)[index_]->number);
J
jorlow@chromium.org 已提交
135
    EncodeFixed64(value_buf_+8, (*flist_)[index_]->file_size);
J
jorlow@chromium.org 已提交
136 137 138 139 140 141
    return Slice(value_buf_, sizeof(value_buf_));
  }
  virtual Status status() const { return Status::OK(); }
 private:
  const InternalKeyComparator icmp_;
  const std::vector<FileMetaData*>* const flist_;
D
dgrogan@chromium.org 已提交
142
  uint32_t index_;
J
jorlow@chromium.org 已提交
143

J
jorlow@chromium.org 已提交
144 145
  // Backing store for value().  Holds the file number and size.
  mutable char value_buf_[16];
J
jorlow@chromium.org 已提交
146 147 148 149 150 151
};

static Iterator* GetFileIterator(void* arg,
                                 const ReadOptions& options,
                                 const Slice& file_value) {
  TableCache* cache = reinterpret_cast<TableCache*>(arg);
J
jorlow@chromium.org 已提交
152
  if (file_value.size() != 16) {
J
jorlow@chromium.org 已提交
153 154 155
    return NewErrorIterator(
        Status::Corruption("FileReader invoked with unexpected value"));
  } else {
J
jorlow@chromium.org 已提交
156 157 158
    return cache->NewIterator(options,
                              DecodeFixed64(file_value.data()),
                              DecodeFixed64(file_value.data() + 8));
J
jorlow@chromium.org 已提交
159 160 161 162 163 164
  }
}

Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
                                            int level) const {
  return NewTwoLevelIterator(
165
      new LevelFileNumIterator(vset_->icmp_, &files_[level]),
J
jorlow@chromium.org 已提交
166 167 168 169 170 171
      &GetFileIterator, vset_->table_cache_, options);
}

void Version::AddIterators(const ReadOptions& options,
                           std::vector<Iterator*>* iters) {
  // Merge all level zero files together since they may overlap
D
dgrogan@chromium.org 已提交
172
  for (size_t i = 0; i < files_[0].size(); i++) {
J
jorlow@chromium.org 已提交
173
    iters->push_back(
J
jorlow@chromium.org 已提交
174 175
        vset_->table_cache_->NewIterator(
            options, files_[0][i]->number, files_[0][i]->file_size));
J
jorlow@chromium.org 已提交
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
  }

  // For levels > 0, we can use a concatenating iterator that sequentially
  // walks through the non-overlapping files in the level, opening them
  // lazily.
  for (int level = 1; level < config::kNumLevels; level++) {
    if (!files_[level].empty()) {
      iters->push_back(NewConcatenatingIterator(options, level));
    }
  }
}

void Version::Ref() {
  ++refs_;
}

void Version::Unref() {
193
  assert(this != &vset_->dummy_versions_);
J
jorlow@chromium.org 已提交
194 195 196
  assert(refs_ >= 1);
  --refs_;
  if (refs_ == 0) {
197
    delete this;
J
jorlow@chromium.org 已提交
198 199 200 201 202 203 204 205 206 207 208
  }
}

std::string Version::DebugString() const {
  std::string r;
  for (int level = 0; level < config::kNumLevels; level++) {
    // E.g., level 1: 17:123['a' .. 'd'] 20:43['e' .. 'g']
    r.append("level ");
    AppendNumberTo(&r, level);
    r.push_back(':');
    const std::vector<FileMetaData*>& files = files_[level];
D
dgrogan@chromium.org 已提交
209
    for (size_t i = 0; i < files.size(); i++) {
J
jorlow@chromium.org 已提交
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
      r.push_back(' ');
      AppendNumberTo(&r, files[i]->number);
      r.push_back(':');
      AppendNumberTo(&r, files[i]->file_size);
      r.append("['");
      AppendEscapedStringTo(&r, files[i]->smallest.Encode());
      r.append("' .. '");
      AppendEscapedStringTo(&r, files[i]->largest.Encode());
      r.append("']");
    }
    r.push_back('\n');
  }
  return r;
}

// A helper class so we can efficiently apply a whole sequence
// of edits to a particular state without creating intermediate
// Versions that contain full copies of the intermediate state.
class VersionSet::Builder {
 private:
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
  // Helper to sort by v->files_[file_number].smallest
  struct BySmallestKey {
    const InternalKeyComparator* internal_comparator;

    bool operator()(FileMetaData* f1, FileMetaData* f2) const {
      int r = internal_comparator->Compare(f1->smallest, f2->smallest);
      if (r != 0) {
        return (r < 0);
      } else {
        // Break ties by file number
        return (f1->number < f2->number);
      }
    }
  };

  typedef std::set<FileMetaData*, BySmallestKey> FileSet;
  struct LevelState {
    std::set<uint64_t> deleted_files;
    FileSet* added_files;
  };

J
jorlow@chromium.org 已提交
251
  VersionSet* vset_;
252 253
  Version* base_;
  LevelState levels_[config::kNumLevels];
J
jorlow@chromium.org 已提交
254 255 256 257

 public:
  // Initialize a builder with the files from *base and other info from *vset
  Builder(VersionSet* vset, Version* base)
258 259 260 261 262
      : vset_(vset),
        base_(base) {
    base_->Ref();
    BySmallestKey cmp;
    cmp.internal_comparator = &vset_->icmp_;
J
jorlow@chromium.org 已提交
263
    for (int level = 0; level < config::kNumLevels; level++) {
264
      levels_[level].added_files = new FileSet(cmp);
J
jorlow@chromium.org 已提交
265 266 267 268 269
    }
  }

  ~Builder() {
    for (int level = 0; level < config::kNumLevels; level++) {
270 271 272 273 274
      std::vector<FileMetaData*> to_unref(levels_[level].added_files->begin(),
                                          levels_[level].added_files->end());
      delete levels_[level].added_files;
      for (int i = 0; i < to_unref.size(); i++) {
        FileMetaData* f = to_unref[i];
J
jorlow@chromium.org 已提交
275 276 277 278 279 280
        f->refs--;
        if (f->refs <= 0) {
          delete f;
        }
      }
    }
281
    base_->Unref();
J
jorlow@chromium.org 已提交
282 283 284 285 286
  }

  // Apply all of the edits in *edit to the current state.
  void Apply(VersionEdit* edit) {
    // Update compaction pointers
D
dgrogan@chromium.org 已提交
287
    for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
J
jorlow@chromium.org 已提交
288 289 290 291 292 293 294 295 296 297 298 299
      const int level = edit->compact_pointers_[i].first;
      vset_->compact_pointer_[level] =
          edit->compact_pointers_[i].second.Encode().ToString();
    }

    // Delete files
    const VersionEdit::DeletedFileSet& del = edit->deleted_files_;
    for (VersionEdit::DeletedFileSet::const_iterator iter = del.begin();
         iter != del.end();
         ++iter) {
      const int level = iter->first;
      const uint64_t number = iter->second;
300
      levels_[level].deleted_files.insert(number);
J
jorlow@chromium.org 已提交
301 302 303
    }

    // Add new files
D
dgrogan@chromium.org 已提交
304
    for (size_t i = 0; i < edit->new_files_.size(); i++) {
J
jorlow@chromium.org 已提交
305 306 307
      const int level = edit->new_files_[i].first;
      FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
      f->refs = 1;
308 309
      levels_[level].deleted_files.erase(f->number);
      levels_[level].added_files->insert(f);
J
jorlow@chromium.org 已提交
310 311 312 313 314
    }
  }

  // Save the current state in *v.
  void SaveTo(Version* v) {
315 316
    BySmallestKey cmp;
    cmp.internal_comparator = &vset_->icmp_;
J
jorlow@chromium.org 已提交
317
    for (int level = 0; level < config::kNumLevels; level++) {
318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341
      // Merge the set of added files with the set of pre-existing files.
      // Drop any deleted files.  Store the result in *v.
      const std::vector<FileMetaData*>& base_files = base_->files_[level];
      std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();
      std::vector<FileMetaData*>::const_iterator base_end = base_files.end();
      const FileSet* added = levels_[level].added_files;
      v->files_[level].reserve(base_files.size() + added->size());
      for (FileSet::const_iterator added_iter = added->begin();
           added_iter != added->end();
           ++added_iter) {
        // Add all smaller files listed in base_
        for (std::vector<FileMetaData*>::const_iterator bpos
                 = std::upper_bound(base_iter, base_end, *added_iter, cmp);
             base_iter != bpos;
             ++base_iter) {
          MaybeAddFile(v, level, *base_iter);
        }

        MaybeAddFile(v, level, *added_iter);
      }

      // Add remaining base files
      for (; base_iter != base_end; ++base_iter) {
        MaybeAddFile(v, level, *base_iter);
J
jorlow@chromium.org 已提交
342
      }
343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367

#ifndef NDEBUG
      // Make sure there is no overlap in levels > 0
      if (level > 0) {
        for (int i = 1; i < v->files_[level].size(); i++) {
          const InternalKey& prev_end = v->files_[level][i-1]->largest;
          const InternalKey& this_begin = v->files_[level][i]->smallest;
          if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
            fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
                    EscapeString(prev_end.Encode()).c_str(),
                    EscapeString(this_begin.Encode()).c_str());
            abort();
          }
        }
      }
#endif
    }
  }

  void MaybeAddFile(Version* v, int level, FileMetaData* f) {
    if (levels_[level].deleted_files.count(f->number) > 0) {
      // File is deleted: do nothing
    } else {
      f->refs++;
      v->files_[level].push_back(f);
J
jorlow@chromium.org 已提交
368 369 370 371 372 373 374 375 376 377 378 379 380 381 382
    }
  }
};

VersionSet::VersionSet(const std::string& dbname,
                       const Options* options,
                       TableCache* table_cache,
                       const InternalKeyComparator* cmp)
    : env_(options->env),
      dbname_(dbname),
      options_(options),
      table_cache_(table_cache),
      icmp_(*cmp),
      next_file_number_(2),
      manifest_file_number_(0),  // Filled by Recover()
383 384 385
      last_sequence_(0),
      log_number_(0),
      prev_log_number_(0),
J
jorlow@chromium.org 已提交
386 387
      descriptor_file_(NULL),
      descriptor_log_(NULL),
388 389 390
      dummy_versions_(this),
      current_(NULL) {
  AppendVersion(new Version(this));
J
jorlow@chromium.org 已提交
391 392 393
}

VersionSet::~VersionSet() {
394 395
  current_->Unref();
  assert(dummy_versions_.next_ == &dummy_versions_);  // List must be empty
J
jorlow@chromium.org 已提交
396 397 398 399
  delete descriptor_log_;
  delete descriptor_file_;
}

400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417
void VersionSet::AppendVersion(Version* v) {
  // Make "v" current
  assert(v->refs_ == 0);
  assert(v != current_);
  if (current_ != NULL) {
    current_->Unref();
  }
  current_ = v;
  v->Ref();

  // Append to linked list
  v->prev_ = dummy_versions_.prev_;
  v->next_ = &dummy_versions_;
  v->prev_->next_ = v;
  v->next_->prev_ = v;
}

Status VersionSet::LogAndApply(VersionEdit* edit) {
418 419 420 421 422 423 424 425 426 427 428
  if (edit->has_log_number_) {
    assert(edit->log_number_ >= log_number_);
    assert(edit->log_number_ < next_file_number_);
  } else {
    edit->SetLogNumber(log_number_);
  }

  if (!edit->has_prev_log_number_) {
    edit->SetPrevLogNumber(prev_log_number_);
  }

J
jorlow@chromium.org 已提交
429
  edit->SetNextFile(next_file_number_);
430
  edit->SetLastSequence(last_sequence_);
J
jorlow@chromium.org 已提交
431 432 433 434 435 436 437

  Version* v = new Version(this);
  {
    Builder builder(this, current_);
    builder.Apply(edit);
    builder.SaveTo(v);
  }
438
  Finalize(v);
J
jorlow@chromium.org 已提交
439 440 441

  // Initialize new descriptor log file if necessary by creating
  // a temporary file that contains a snapshot of the current version.
442 443 444 445 446 447 448 449 450 451
  std::string new_manifest_file;
  Status s;
  if (descriptor_log_ == NULL) {
    assert(descriptor_file_ == NULL);
    new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
    edit->SetNextFile(next_file_number_);
    s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
    if (s.ok()) {
      descriptor_log_ = new log::Writer(descriptor_file_);
      s = WriteSnapshot(descriptor_log_);
J
jorlow@chromium.org 已提交
452 453 454
    }
  }

455
  // Write new record to MANIFEST log
J
jorlow@chromium.org 已提交
456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472
  if (s.ok()) {
    std::string record;
    edit->EncodeTo(&record);
    s = descriptor_log_->AddRecord(record);
    if (s.ok()) {
      s = descriptor_file_->Sync();
    }
  }

  // If we just created a new descriptor file, install it by writing a
  // new CURRENT file that points to it.
  if (s.ok() && !new_manifest_file.empty()) {
    s = SetCurrentFile(env_, dbname_, manifest_file_number_);
  }

  // Install the new version
  if (s.ok()) {
473
    AppendVersion(v);
474 475
    log_number_ = edit->log_number_;
    prev_log_number_ = edit->prev_log_number_;
J
jorlow@chromium.org 已提交
476 477 478 479 480 481 482 483 484 485 486 487 488 489
  } else {
    delete v;
    if (!new_manifest_file.empty()) {
      delete descriptor_log_;
      delete descriptor_file_;
      descriptor_log_ = NULL;
      descriptor_file_ = NULL;
      env_->DeleteFile(new_manifest_file);
    }
  }

  return s;
}

490
Status VersionSet::Recover() {
J
jorlow@chromium.org 已提交
491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516
  struct LogReporter : public log::Reader::Reporter {
    Status* status;
    virtual void Corruption(size_t bytes, const Status& s) {
      if (this->status->ok()) *this->status = s;
    }
  };

  // Read "CURRENT" file, which contains a pointer to the current manifest file
  std::string current;
  Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
  if (!s.ok()) {
    return s;
  }
  if (current.empty() || current[current.size()-1] != '\n') {
    return Status::Corruption("CURRENT file does not end with newline");
  }
  current.resize(current.size() - 1);

  std::string dscname = dbname_ + "/" + current;
  SequentialFile* file;
  s = env_->NewSequentialFile(dscname, &file);
  if (!s.ok()) {
    return s;
  }

  bool have_log_number = false;
517
  bool have_prev_log_number = false;
J
jorlow@chromium.org 已提交
518 519 520
  bool have_next_file = false;
  bool have_last_sequence = false;
  uint64_t next_file = 0;
521 522 523
  uint64_t last_sequence = 0;
  uint64_t log_number = 0;
  uint64_t prev_log_number = 0;
J
jorlow@chromium.org 已提交
524 525 526 527 528
  Builder builder(this, current_);

  {
    LogReporter reporter;
    reporter.status = &s;
529
    log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
J
jorlow@chromium.org 已提交
530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548
    Slice record;
    std::string scratch;
    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
      VersionEdit edit;
      s = edit.DecodeFrom(record);
      if (s.ok()) {
        if (edit.has_comparator_ &&
            edit.comparator_ != icmp_.user_comparator()->Name()) {
          s = Status::InvalidArgument(
              edit.comparator_ + "does not match existing comparator ",
              icmp_.user_comparator()->Name());
        }
      }

      if (s.ok()) {
        builder.Apply(&edit);
      }

      if (edit.has_log_number_) {
549
        log_number = edit.log_number_;
J
jorlow@chromium.org 已提交
550 551 552
        have_log_number = true;
      }

553 554 555 556 557
      if (edit.has_prev_log_number_) {
        prev_log_number = edit.prev_log_number_;
        have_prev_log_number = true;
      }

J
jorlow@chromium.org 已提交
558 559 560 561 562 563
      if (edit.has_next_file_number_) {
        next_file = edit.next_file_number_;
        have_next_file = true;
      }

      if (edit.has_last_sequence_) {
564
        last_sequence = edit.last_sequence_;
J
jorlow@chromium.org 已提交
565 566 567 568 569 570 571 572 573 574 575 576 577 578 579
        have_last_sequence = true;
      }
    }
  }
  delete file;
  file = NULL;

  if (s.ok()) {
    if (!have_next_file) {
      s = Status::Corruption("no meta-nextfile entry in descriptor");
    } else if (!have_log_number) {
      s = Status::Corruption("no meta-lognumber entry in descriptor");
    } else if (!have_last_sequence) {
      s = Status::Corruption("no last-sequence-number entry in descriptor");
    }
580 581 582 583

    if (!have_prev_log_number) {
      prev_log_number = 0;
    }
J
jorlow@chromium.org 已提交
584 585 586 587 588
  }

  if (s.ok()) {
    Version* v = new Version(this);
    builder.SaveTo(v);
589 590 591 592 593 594 595 596
    // Install recovered version
    Finalize(v);
    AppendVersion(v);
    manifest_file_number_ = next_file;
    next_file_number_ = next_file + 1;
    last_sequence_ = last_sequence;
    log_number_ = log_number;
    prev_log_number_ = prev_log_number;
J
jorlow@chromium.org 已提交
597 598 599 600 601
  }

  return s;
}

602 603
static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
  int64_t sum = 0;
D
dgrogan@chromium.org 已提交
604
  for (size_t i = 0; i < files.size(); i++) {
605 606 607 608 609
    sum += files[i]->file_size;
  }
  return sum;
}

610
void VersionSet::Finalize(Version* v) {
J
jorlow@chromium.org 已提交
611 612 613 614
  // Precomputed best level for next compaction
  int best_level = -1;
  double best_score = -1;

615
  for (int level = 0; level < config::kNumLevels-1; level++) {
616
    double score;
J
jorlow@chromium.org 已提交
617
    if (level == 0) {
618 619 620 621 622 623 624 625 626 627 628
      // We treat level-0 specially by bounding the number of files
      // instead of number of bytes for two reasons:
      //
      // (1) With larger write-buffer sizes, it is nice not to do too
      // many level-0 compactions.
      //
      // (2) The files in level-0 are merged on every read and
      // therefore we wish to avoid too many files when the individual
      // file size is small (perhaps because of a small write-buffer
      // setting, or very high compression ratios, or lots of
      // overwrites/deletions).
629 630
      score = v->files_[level].size() /
          static_cast<double>(config::kL0_CompactionTrigger);
631 632 633 634
    } else {
      // Compute the ratio of current size to size limit.
      const uint64_t level_bytes = TotalFileSize(v->files_[level]);
      score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
J
jorlow@chromium.org 已提交
635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665
    }

    if (score > best_score) {
      best_level = level;
      best_score = score;
    }
  }

  v->compaction_level_ = best_level;
  v->compaction_score_ = best_score;
}

Status VersionSet::WriteSnapshot(log::Writer* log) {
  // TODO: Break up into multiple records to reduce memory usage on recovery?

  // Save metadata
  VersionEdit edit;
  edit.SetComparatorName(icmp_.user_comparator()->Name());

  // Save compaction pointers
  for (int level = 0; level < config::kNumLevels; level++) {
    if (!compact_pointer_[level].empty()) {
      InternalKey key;
      key.DecodeFrom(compact_pointer_[level]);
      edit.SetCompactPointer(level, key);
    }
  }

  // Save files
  for (int level = 0; level < config::kNumLevels; level++) {
    const std::vector<FileMetaData*>& files = current_->files_[level];
D
dgrogan@chromium.org 已提交
666
    for (size_t i = 0; i < files.size(); i++) {
J
jorlow@chromium.org 已提交
667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682
      const FileMetaData* f = files[i];
      edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest);
    }
  }

  std::string record;
  edit.EncodeTo(&record);
  return log->AddRecord(record);
}

int VersionSet::NumLevelFiles(int level) const {
  assert(level >= 0);
  assert(level < config::kNumLevels);
  return current_->files_[level].size();
}

683 684 685 686 687 688 689 690 691 692 693 694 695 696 697
const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
  // Update code if kNumLevels changes
  assert(config::kNumLevels == 7);
  snprintf(scratch->buffer, sizeof(scratch->buffer),
           "files[ %d %d %d %d %d %d %d ]",
           int(current_->files_[0].size()),
           int(current_->files_[1].size()),
           int(current_->files_[2].size()),
           int(current_->files_[3].size()),
           int(current_->files_[4].size()),
           int(current_->files_[5].size()),
           int(current_->files_[6].size()));
  return scratch->buffer;
}

J
jorlow@chromium.org 已提交
698 699 700 701
uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
  uint64_t result = 0;
  for (int level = 0; level < config::kNumLevels; level++) {
    const std::vector<FileMetaData*>& files = v->files_[level];
D
dgrogan@chromium.org 已提交
702
    for (size_t i = 0; i < files.size(); i++) {
J
jorlow@chromium.org 已提交
703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718
      if (icmp_.Compare(files[i]->largest, ikey) <= 0) {
        // Entire file is before "ikey", so just add the file size
        result += files[i]->file_size;
      } else if (icmp_.Compare(files[i]->smallest, ikey) > 0) {
        // Entire file is after "ikey", so ignore
        if (level > 0) {
          // Files other than level 0 are sorted by meta->smallest, so
          // no further files in this level will contain data for
          // "ikey".
          break;
        }
      } else {
        // "ikey" falls in the range for this table.  Add the
        // approximate offset of "ikey" within the table.
        Table* tableptr;
        Iterator* iter = table_cache_->NewIterator(
J
jorlow@chromium.org 已提交
719
            ReadOptions(), files[i]->number, files[i]->file_size, &tableptr);
J
jorlow@chromium.org 已提交
720 721 722 723 724 725 726 727 728 729 730
        if (tableptr != NULL) {
          result += tableptr->ApproximateOffsetOf(ikey.Encode());
        }
        delete iter;
      }
    }
  }
  return result;
}

void VersionSet::AddLiveFiles(std::set<uint64_t>* live) {
731 732 733
  for (Version* v = dummy_versions_.next_;
       v != &dummy_versions_;
       v = v->next_) {
J
jorlow@chromium.org 已提交
734 735
    for (int level = 0; level < config::kNumLevels; level++) {
      const std::vector<FileMetaData*>& files = v->files_[level];
D
dgrogan@chromium.org 已提交
736
      for (size_t i = 0; i < files.size(); i++) {
J
jorlow@chromium.org 已提交
737 738 739 740 741 742
        live->insert(files[i]->number);
      }
    }
  }
}

743 744 745 746
int64_t VersionSet::NumLevelBytes(int level) const {
  assert(level >= 0);
  assert(level < config::kNumLevels);
  return TotalFileSize(current_->files_[level]);
J
jorlow@chromium.org 已提交
747 748 749 750
}

int64_t VersionSet::MaxNextLevelOverlappingBytes() {
  int64_t result = 0;
751 752
  std::vector<FileMetaData*> overlaps;
  for (int level = 0; level < config::kNumLevels - 1; level++) {
D
dgrogan@chromium.org 已提交
753
    for (size_t i = 0; i < current_->files_[level].size(); i++) {
754 755
      const FileMetaData* f = current_->files_[level][i];
      GetOverlappingInputs(level+1, f->smallest, f->largest, &overlaps);
J
jorlow@chromium.org 已提交
756
      const int64_t sum = TotalFileSize(overlaps);
757 758 759 760 761 762 763 764
      if (sum > result) {
        result = sum;
      }
    }
  }
  return result;
}

J
jorlow@chromium.org 已提交
765 766 767 768 769 770 771 772 773 774
// Store in "*inputs" all files in "level" that overlap [begin,end]
void VersionSet::GetOverlappingInputs(
    int level,
    const InternalKey& begin,
    const InternalKey& end,
    std::vector<FileMetaData*>* inputs) {
  inputs->clear();
  Slice user_begin = begin.user_key();
  Slice user_end = end.user_key();
  const Comparator* user_cmp = icmp_.user_comparator();
D
dgrogan@chromium.org 已提交
775
  for (size_t i = 0; i < current_->files_[level].size(); i++) {
J
jorlow@chromium.org 已提交
776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794
    FileMetaData* f = current_->files_[level][i];
    if (user_cmp->Compare(f->largest.user_key(), user_begin) < 0 ||
        user_cmp->Compare(f->smallest.user_key(), user_end) > 0) {
      // Either completely before or after range; skip it
    } else {
      inputs->push_back(f);
    }
  }
}

// Stores the minimal range that covers all entries in inputs in
// *smallest, *largest.
// REQUIRES: inputs is not empty
void VersionSet::GetRange(const std::vector<FileMetaData*>& inputs,
                          InternalKey* smallest,
                          InternalKey* largest) {
  assert(!inputs.empty());
  smallest->Clear();
  largest->Clear();
D
dgrogan@chromium.org 已提交
795
  for (size_t i = 0; i < inputs.size(); i++) {
J
jorlow@chromium.org 已提交
796 797 798 799 800 801 802 803 804 805 806 807 808 809 810
    FileMetaData* f = inputs[i];
    if (i == 0) {
      *smallest = f->smallest;
      *largest = f->largest;
    } else {
      if (icmp_.Compare(f->smallest, *smallest) < 0) {
        *smallest = f->smallest;
      }
      if (icmp_.Compare(f->largest, *largest) > 0) {
        *largest = f->largest;
      }
    }
  }
}

811 812 813 814 815 816 817 818 819 820 821 822
// Stores the minimal range that covers all entries in inputs1 and inputs2
// in *smallest, *largest.
// REQUIRES: inputs is not empty
void VersionSet::GetRange2(const std::vector<FileMetaData*>& inputs1,
                           const std::vector<FileMetaData*>& inputs2,
                           InternalKey* smallest,
                           InternalKey* largest) {
  std::vector<FileMetaData*> all = inputs1;
  all.insert(all.end(), inputs2.begin(), inputs2.end());
  GetRange(all, smallest, largest);
}

J
jorlow@chromium.org 已提交
823 824 825 826 827 828 829 830 831 832 833 834 835 836 837
Iterator* VersionSet::MakeInputIterator(Compaction* c) {
  ReadOptions options;
  options.verify_checksums = options_->paranoid_checks;
  options.fill_cache = false;

  // Level-0 files have to be merged together.  For other levels,
  // we will make a concatenating iterator per level.
  // TODO(opt): use concatenating iterator for level-0 if there is no overlap
  const int space = (c->level() == 0 ? c->inputs_[0].size() + 1 : 2);
  Iterator** list = new Iterator*[space];
  int num = 0;
  for (int which = 0; which < 2; which++) {
    if (!c->inputs_[which].empty()) {
      if (c->level() + which == 0) {
        const std::vector<FileMetaData*>& files = c->inputs_[which];
D
dgrogan@chromium.org 已提交
838
        for (size_t i = 0; i < files.size(); i++) {
J
jorlow@chromium.org 已提交
839 840
          list[num++] = table_cache_->NewIterator(
              options, files[i]->number, files[i]->file_size);
J
jorlow@chromium.org 已提交
841 842 843 844
        }
      } else {
        // Create concatenating iterator for the files from this level
        list[num++] = NewTwoLevelIterator(
845
            new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]),
J
jorlow@chromium.org 已提交
846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861
            &GetFileIterator, table_cache_, options);
      }
    }
  }
  assert(num <= space);
  Iterator* result = NewMergingIterator(&icmp_, list, num);
  delete[] list;
  return result;
}

Compaction* VersionSet::PickCompaction() {
  if (!NeedsCompaction()) {
    return NULL;
  }
  const int level = current_->compaction_level_;
  assert(level >= 0);
862
  assert(level+1 < config::kNumLevels);
J
jorlow@chromium.org 已提交
863 864 865 866 867 868

  Compaction* c = new Compaction(level);
  c->input_version_ = current_;
  c->input_version_->Ref();

  // Pick the first file that comes after compact_pointer_[level]
D
dgrogan@chromium.org 已提交
869
  for (size_t i = 0; i < current_->files_[level].size(); i++) {
J
jorlow@chromium.org 已提交
870 871 872 873 874 875 876 877 878 879 880 881 882 883
    FileMetaData* f = current_->files_[level][i];
    if (compact_pointer_[level].empty() ||
        icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
      c->inputs_[0].push_back(f);
      break;
    }
  }
  if (c->inputs_[0].empty()) {
    // Wrap-around to the beginning of the key space
    c->inputs_[0].push_back(current_->files_[level][0]);
  }

  // Files in level 0 may overlap each other, so pick up all overlapping ones
  if (level == 0) {
884 885
    InternalKey smallest, largest;
    GetRange(c->inputs_[0], &smallest, &largest);
J
jorlow@chromium.org 已提交
886 887 888 889 890 891 892
    // Note that the next call will discard the file we placed in
    // c->inputs_[0] earlier and replace it with an overlapping set
    // which will include the picked file.
    GetOverlappingInputs(0, smallest, largest, &c->inputs_[0]);
    assert(!c->inputs_[0].empty());
  }

893 894 895 896 897 898 899 900 901 902
  SetupOtherInputs(c);

  return c;
}

void VersionSet::SetupOtherInputs(Compaction* c) {
  const int level = c->level();
  InternalKey smallest, largest;
  GetRange(c->inputs_[0], &smallest, &largest);

J
jorlow@chromium.org 已提交
903 904
  GetOverlappingInputs(level+1, smallest, largest, &c->inputs_[1]);

905 906 907 908
  // Get entire range covered by compaction
  InternalKey all_start, all_limit;
  GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);

J
jorlow@chromium.org 已提交
909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930
  // See if we can grow the number of inputs in "level" without
  // changing the number of "level+1" files we pick up.
  if (!c->inputs_[1].empty()) {
    std::vector<FileMetaData*> expanded0;
    GetOverlappingInputs(level, all_start, all_limit, &expanded0);
    if (expanded0.size() > c->inputs_[0].size()) {
      InternalKey new_start, new_limit;
      GetRange(expanded0, &new_start, &new_limit);
      std::vector<FileMetaData*> expanded1;
      GetOverlappingInputs(level+1, new_start, new_limit, &expanded1);
      if (expanded1.size() == c->inputs_[1].size()) {
        Log(env_, options_->info_log,
            "Expanding@%d %d+%d to %d+%d\n",
            level,
            int(c->inputs_[0].size()),
            int(c->inputs_[1].size()),
            int(expanded0.size()),
            int(expanded1.size()));
        smallest = new_start;
        largest = new_limit;
        c->inputs_[0] = expanded0;
        c->inputs_[1] = expanded1;
931
        GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
J
jorlow@chromium.org 已提交
932 933 934 935
      }
    }
  }

936 937 938 939 940 941
  // Compute the set of grandparent files that overlap this compaction
  // (parent == level+1; grandparent == level+2)
  if (level + 2 < config::kNumLevels) {
    GetOverlappingInputs(level + 2, all_start, all_limit, &c->grandparents_);
  }

J
jorlow@chromium.org 已提交
942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970
  if (false) {
    Log(env_, options_->info_log, "Compacting %d '%s' .. '%s'",
        level,
        EscapeString(smallest.Encode()).c_str(),
        EscapeString(largest.Encode()).c_str());
  }

  // Update the place where we will do the next compaction for this level.
  // We update this immediately instead of waiting for the VersionEdit
  // to be applied so that if the compaction fails, we will try a different
  // key range next time.
  compact_pointer_[level] = largest.Encode().ToString();
  c->edit_.SetCompactPointer(level, largest);
}

Compaction* VersionSet::CompactRange(
    int level,
    const InternalKey& begin,
    const InternalKey& end) {
  std::vector<FileMetaData*> inputs;
  GetOverlappingInputs(level, begin, end, &inputs);
  if (inputs.empty()) {
    return NULL;
  }

  Compaction* c = new Compaction(level);
  c->input_version_ = current_;
  c->input_version_->Ref();
  c->inputs_[0] = inputs;
971
  SetupOtherInputs(c);
J
jorlow@chromium.org 已提交
972 973 974 975 976 977
  return c;
}

Compaction::Compaction(int level)
    : level_(level),
      max_output_file_size_(MaxFileSizeForLevel(level)),
978 979
      input_version_(NULL),
      grandparent_index_(0),
J
jorlow@chromium.org 已提交
980 981
      seen_key_(false),
      overlapped_bytes_(0) {
J
jorlow@chromium.org 已提交
982 983 984 985 986 987 988 989 990 991 992
  for (int i = 0; i < config::kNumLevels; i++) {
    level_ptrs_[i] = 0;
  }
}

Compaction::~Compaction() {
  if (input_version_ != NULL) {
    input_version_->Unref();
  }
}

993
bool Compaction::IsTrivialMove() const {
J
jorlow@chromium.org 已提交
994
  // Avoid a move if there is lots of overlapping grandparent data.
995 996
  // Otherwise, the move could create a parent file that will require
  // a very expensive merge later on.
J
jorlow@chromium.org 已提交
997 998 999
  return (num_input_files(0) == 1 &&
          num_input_files(1) == 0 &&
          TotalFileSize(grandparents_) <= kMaxGrandParentOverlapBytes);
1000 1001
}

J
jorlow@chromium.org 已提交
1002 1003
void Compaction::AddInputDeletions(VersionEdit* edit) {
  for (int which = 0; which < 2; which++) {
D
dgrogan@chromium.org 已提交
1004
    for (size_t i = 0; i < inputs_[which].size(); i++) {
J
jorlow@chromium.org 已提交
1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030
      edit->DeleteFile(level_ + which, inputs_[which][i]->number);
    }
  }
}

bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
  // Maybe use binary search to find right entry instead of linear search?
  const Comparator* user_cmp = input_version_->vset_->icmp_.user_comparator();
  for (int lvl = level_ + 2; lvl < config::kNumLevels; lvl++) {
    const std::vector<FileMetaData*>& files = input_version_->files_[lvl];
    for (; level_ptrs_[lvl] < files.size(); ) {
      FileMetaData* f = files[level_ptrs_[lvl]];
      if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
        // We've advanced far enough
        if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) {
          // Key falls in this file's range, so definitely not base level
          return false;
        }
        break;
      }
      level_ptrs_[lvl]++;
    }
  }
  return true;
}

1031
bool Compaction::ShouldStopBefore(const Slice& internal_key) {
1032 1033 1034
  // Scan to find earliest grandparent file that contains key.
  const InternalKeyComparator* icmp = &input_version_->vset_->icmp_;
  while (grandparent_index_ < grandparents_.size() &&
1035 1036
      icmp->Compare(internal_key,
                    grandparents_[grandparent_index_]->largest.Encode()) > 0) {
J
jorlow@chromium.org 已提交
1037 1038 1039
    if (seen_key_) {
      overlapped_bytes_ += grandparents_[grandparent_index_]->file_size;
    }
1040 1041
    grandparent_index_++;
  }
J
jorlow@chromium.org 已提交
1042
  seen_key_ = true;
1043

J
jorlow@chromium.org 已提交
1044 1045 1046
  if (overlapped_bytes_ > kMaxGrandParentOverlapBytes) {
    // Too much overlap for current output; start new output
    overlapped_bytes_ = 0;
1047 1048 1049 1050 1051 1052
    return true;
  } else {
    return false;
  }
}

J
jorlow@chromium.org 已提交
1053 1054 1055 1056 1057 1058 1059 1060
void Compaction::ReleaseInputs() {
  if (input_version_ != NULL) {
    input_version_->Unref();
    input_version_ = NULL;
  }
}

}