//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/version_set.h"
I
Igor Canadi 已提交
11

L
liuhuahang 已提交
12
#ifndef __STDC_FORMAT_MACROS
I
Igor Canadi 已提交
13
#define __STDC_FORMAT_MACROS
L
liuhuahang 已提交
14 15
#endif

I
Igor Canadi 已提交
16
#include <inttypes.h>
17
#include <stdio.h>
赵明 已提交
18

J
jorlow@chromium.org 已提交
19
#include <algorithm>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include <terark/valvec.hpp>
赵明 已提交
27

28
#include "db/compaction.h"
29
#include "db/internal_stats.h"
J
jorlow@chromium.org 已提交
30 31 32
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
33
#include "db/merge_context.h"
34
#include "db/merge_helper.h"
J
jorlow@chromium.org 已提交
35
#include "db/table_cache.h"
S
sdong 已提交
36
#include "db/version_builder.h"
37
#include "monitoring/file_read_sample.h"
38
#include "monitoring/perf_context_imp.h"
39 40
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
41
#include "rocksdb/write_buffer_manager.h"
42 43
#include "table/format.h"
#include "table/get_context.h"
S
sdong 已提交
44
#include "table/internal_iterator.h"
45
#include "table/merging_iterator.h"
46
#include "table/meta_blocks.h"
47 48 49
#include "table/plain_table_factory.h"
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
赵明 已提交
50
#include "util/c_style_callback.h"
J
jorlow@chromium.org 已提交
51
#include "util/coding.h"
52
#include "util/file_reader_writer.h"
53
#include "util/filename.h"
54
#include "util/logging.h"
55
#include "util/stop_watch.h"
56
#include "util/string_util.h"
S
sdong 已提交
57
#include "util/sync_point.h"
J
jorlow@chromium.org 已提交
58

59
namespace rocksdb {
J
jorlow@chromium.org 已提交
60

61 62
namespace {

63
// Find File in LevelFilesBrief data structure
64 65
// Within an index range defined by left and right
int FindFileInRange(const InternalKeyComparator& icmp,
赵明 已提交
66 67
                    const LevelFilesBrief& file_level, const Slice& key,
                    uint32_t left, uint32_t right) {
68 69 70
  return static_cast<int>(
      terark::lower_bound_ex_n(file_level.files, left, right, key,
                               TERARK_FIELD(largest_key), "" < icmp));
71 72
}

73
// Determines whether `iter` contains an entry whose user key falls inside
// [smallest_user_key, largest_user_key] (compared with `ucmp`), writing the
// answer to `*overlap`.
//
// The iterator is sought to an internal key built from `smallest_user_key`
// with kMaxSequenceNumber / kValueTypeForSeek (presumably the earliest-sorting
// internal key for that user key); there is an overlap iff the entry it lands
// on has a user key <= largest_user_key.
//
// Returns the iterator's status. If the seek itself fails, that status is
// returned before `*overlap` is written. An entry whose key cannot be parsed
// yields Status::Corruption.
Status OverlapWithIterator(const Comparator* ucmp,
                           const Slice& smallest_user_key,
                           const Slice& largest_user_key,
                           InternalIterator* iter, bool* overlap) {
  InternalKey seek_target(smallest_user_key, kMaxSequenceNumber,
                          kValueTypeForSeek);
  iter->Seek(seek_target.Encode());
  if (!iter->status().ok()) {
    return iter->status();
  }

  *overlap = false;
  if (!iter->Valid()) {
    // Nothing at or after the range start: no overlap.
    return iter->status();
  }

  ParsedInternalKey first_entry;
  if (!ParseInternalKey(iter->key(), &first_entry)) {
    return Status::Corruption("DB have corrupted keys");
  }
  *overlap = ucmp->Compare(first_entry.user_key, largest_user_key) <= 0;
  return iter->status();
}

99 100 101 102 103 104 105 106
// Class to help choose the next file to search for the particular key.
// Searches and returns files level by level.
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in a smaller level, later levels are irrelevant (unless we
// are MergeInProgress).
class FilePicker {
 public:
107 108 109 110 111
  FilePicker(std::vector<FileMetaData*>* files, const Slice& user_key,
             const Slice& ikey, autovector<LevelFilesBrief>* file_levels,
             unsigned int num_levels, FileIndexer* file_indexer,
             const Comparator* user_comparator,
             const InternalKeyComparator* internal_comparator)
112
      : num_levels_(num_levels),
113
        curr_level_(static_cast<unsigned int>(-1)),
114
        returned_file_level_(static_cast<unsigned int>(-1)),
115
        hit_file_level_(static_cast<unsigned int>(-1)),
116 117
        search_left_bound_(0),
        search_right_bound_(FileIndexer::kLevelMaxIndex),
118
#ifndef NDEBUG
119
        files_(files),
120
#endif
121
        level_files_brief_(file_levels),
122
        is_hit_file_last_in_level_(false),
123
        curr_file_level_(nullptr),
124 125 126 127 128
        user_key_(user_key),
        ikey_(ikey),
        file_indexer_(file_indexer),
        user_comparator_(user_comparator),
        internal_comparator_(internal_comparator) {
129 130 131
#ifdef NDEBUG
    (void)files;
#endif
132 133 134 135
    // Setup member variables to search first level.
    search_ended_ = !PrepareNextLevel();
    if (!search_ended_) {
      // Prefetch Level 0 table data to avoid cache miss if possible.
136 137
      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
138 139 140 141 142 143 144
        if (r) {
          r->Prepare(ikey);
        }
      }
    }
  }

145
  int GetCurrentLevel() const { return curr_level_; }
146

147 148 149 150 151
  FdWithKeyRange* GetNextFile() {
    while (!search_ended_) {  // Loops over different levels.
      while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
        // Loops over all files in current level.
        FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
152
        hit_file_level_ = curr_level_;
153 154
        is_hit_file_last_in_level_ =
            curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
155 156 157 158
        int cmp_largest = -1;

        // Do key range filtering of files or/and fractional cascading if:
        // (1) not all the files are in level 0, or
赵星宇 已提交
159
        // (2) there are more than 3 current level files
赵明 已提交
160 161 162
        // If there are only 3 or less current level files in the system, we
        // skip the key range filtering. In this case, more likely, the system
        // is highly tuned to minimize number of tables queried by each query,
163 164 165 166 167 168
        // so it is unlikely that key range filtering is more efficient than
        // querying the files.
        if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
          // Check if key is within a file's range. If search left bound and
          // right bound point to the same find, we are sure key falls in
          // range.
赵明 已提交
169 170 171 172 173 174 175
          assert(curr_level_ == 0 ||
                 curr_index_in_curr_level_ == start_index_in_curr_level_ ||
                 user_comparator_->Compare(
                     user_key_, ExtractUserKey(f->smallest_key)) <= 0);

          int cmp_smallest = user_comparator_->Compare(
              user_key_, ExtractUserKey(f->smallest_key));
176
          if (cmp_smallest >= 0) {
赵明 已提交
177 178
            cmp_largest = user_comparator_->Compare(
                user_key_, ExtractUserKey(f->largest_key));
179 180 181 182 183
          }

          // Setup file search bound for the next level based on the
          // comparison results
          if (curr_level_ > 0) {
赵明 已提交
184 185 186
            file_indexer_->GetNextLevelIndex(
                curr_level_, curr_index_in_curr_level_, cmp_smallest,
                cmp_largest, &search_left_bound_, &search_right_bound_);
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
          }
          // Key falls out of current file's range
          if (cmp_smallest < 0 || cmp_largest > 0) {
            if (curr_level_ == 0) {
              ++curr_index_in_curr_level_;
              continue;
            } else {
              // Search next level.
              break;
            }
          }
        }
#ifndef NDEBUG
        // Sanity check to make sure that the files are correctly sorted
        if (prev_file_) {
          if (curr_level_ != 0) {
            int comp_sign = internal_comparator_->Compare(
                prev_file_->largest_key, f->smallest_key);
            assert(comp_sign < 0);
          } else {
            // level == 0, the current file cannot be newer than the previous
            // one. Use compressed data structure, has no attribute seqNo
            assert(curr_index_in_curr_level_ > 0);
赵明 已提交
210 211 212
            assert(
                !NewestFirstBySeqNo(files_[0][curr_index_in_curr_level_],
                                    files_[0][curr_index_in_curr_level_ - 1]));
213 214 215 216
          }
        }
        prev_file_ = f;
#endif
217
        returned_file_level_ = curr_level_;
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
        if (curr_level_ > 0 && cmp_largest < 0) {
          // No more files to search in this level.
          search_ended_ = !PrepareNextLevel();
        } else {
          ++curr_index_in_curr_level_;
        }
        return f;
      }
      // Start searching next level.
      search_ended_ = !PrepareNextLevel();
    }
    // Search ended.
    return nullptr;
  }

233 234 235 236
  // getter for current file level
  // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
  unsigned int GetHitFileLevel() { return hit_file_level_; }

237 238 239 240
  // Returns true if the most recent "hit file" (i.e., one returned by
  // GetNextFile()) is at the last index in its level.
  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }

241 242 243
 private:
  unsigned int num_levels_;
  unsigned int curr_level_;
244
  unsigned int returned_file_level_;
245
  unsigned int hit_file_level_;
246 247
  int32_t search_left_bound_;
  int32_t search_right_bound_;
248
#ifndef NDEBUG
249
  std::vector<FileMetaData*>* files_;
250
#endif
251
  autovector<LevelFilesBrief>* level_files_brief_;
252
  bool search_ended_;
253
  bool is_hit_file_last_in_level_;
254
  LevelFilesBrief* curr_file_level_;
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
  unsigned int curr_index_in_curr_level_;
  unsigned int start_index_in_curr_level_;
  Slice user_key_;
  Slice ikey_;
  FileIndexer* file_indexer_;
  const Comparator* user_comparator_;
  const InternalKeyComparator* internal_comparator_;
#ifndef NDEBUG
  FdWithKeyRange* prev_file_;
#endif

  // Setup local variables to search next level.
  // Returns false if there are no more levels to search.
  bool PrepareNextLevel() {
    curr_level_++;
    while (curr_level_ < num_levels_) {
271
      curr_file_level_ = &(*level_files_brief_)[curr_level_];
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
      if (curr_file_level_->num_files == 0) {
        // When current level is empty, the search bound generated from upper
        // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
        // also empty.
        assert(search_left_bound_ == 0);
        assert(search_right_bound_ == -1 ||
               search_right_bound_ == FileIndexer::kLevelMaxIndex);
        // Since current level is empty, it will need to search all files in
        // the next level
        search_left_bound_ = 0;
        search_right_bound_ = FileIndexer::kLevelMaxIndex;
        curr_level_++;
        continue;
      }

      // Some files may overlap each other. We find
      // all files that overlap user_key and process them in order from
      // newest to oldest. In the context of merge-operator, this can occur at
      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
      // are always compacted into a single entry).
      int32_t start_index;
      if (curr_level_ == 0) {
        // On Level-0, we read through all files to check for overlap.
        start_index = 0;
      } else {
        // On Level-n (n>=1), files are sorted. Binary search to find the
        // earliest file whose largest key >= ikey. Search left bound and
        // right bound are used to narrow the range.
300
        if (search_left_bound_ <= search_right_bound_) {
301
          if (search_right_bound_ == FileIndexer::kLevelMaxIndex) {
302 303
            search_right_bound_ =
                static_cast<int32_t>(curr_file_level_->num_files) - 1;
304
          }
305 306 307 308
          // `search_right_bound_` is an inclusive upper-bound, but since it was
          // determined based on user key, it is still possible the lookup key
          // falls to the right of `search_right_bound_`'s corresponding file.
          // So, pass a limit one higher, which allows us to detect this case.
309 310 311
          start_index =
              FindFileInRange(*internal_comparator_, *curr_file_level_, ikey_,
                              static_cast<uint32_t>(search_left_bound_),
312 313 314 315 316 317 318 319 320 321
                              static_cast<uint32_t>(search_right_bound_) + 1);
          if (start_index == search_right_bound_ + 1) {
            // `ikey_` comes after `search_right_bound_`. The lookup key does
            // not exist on this level, so let's skip this level and do a full
            // binary search on the next level.
            search_left_bound_ = 0;
            search_right_bound_ = FileIndexer::kLevelMaxIndex;
            curr_level_++;
            continue;
          }
322 323
        } else {
          // search_left_bound > search_right_bound, key does not exist in
C
clark.kang 已提交
324
          // this level. Since no comparison is done in this level, it will
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344
          // need to search all files in the next level.
          search_left_bound_ = 0;
          search_right_bound_ = FileIndexer::kLevelMaxIndex;
          curr_level_++;
          continue;
        }
      }
      start_index_in_curr_level_ = start_index;
      curr_index_in_curr_level_ = start_index;
#ifndef NDEBUG
      prev_file_ = nullptr;
#endif
      return true;
    }
    // curr_level_ = num_levels_. So, no more levels to search.
    return false;
  }
};
}  // anonymous namespace

赵明 已提交
345
VersionStorageInfo::~VersionStorageInfo() {
  // `files_` points one slot past the start of its array so that
  // files_[-1] is addressable (Version's destructor walks levels from -1);
  // release the array through its real base pointer.
  auto* const allocation_base = files_ - 1;
  delete[] allocation_base;
}
S
sdong 已提交
346

J
jorlow@chromium.org 已提交
347 348
Version::~Version() {
  assert(refs_ == 0);

  // Unlink this version from its doubly-linked list.
  prev_->next_ = next_;
  next_->prev_ = prev_;

  // Drop this version's reference on every file, including the level -1
  // slot. A file whose count reaches zero is queued on the VersionSet's
  // obsolete_files_ together with the data path selected by its path id.
  for (int level = -1; level < storage_info_.num_levels_; ++level) {
    for (FileMetaData* file : storage_info_.files_[level]) {
      assert(file->refs > 0);
      --file->refs;
      if (file->refs > 0) {
        continue;  // Still referenced by another version.
      }
      assert(cfd_ != nullptr);
      const uint32_t path_id = file->fd.GetPathId();
      assert(path_id < cfd_->ioptions()->cf_paths.size());
      vset_->obsolete_files_.push_back(
          ObsoleteFileInfo(file, cfd_->ioptions()->cf_paths[path_id].path));
    }
  }
}

371
int FindFile(const InternalKeyComparator& icmp,
赵明 已提交
372
             const LevelFilesBrief& file_level, const Slice& key) {
373 374
  return FindFileInRange(icmp, file_level, key, 0,
                         static_cast<uint32_t>(file_level.num_files));
375 376
}

377
void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level,
奏之章 已提交
378 379
                               const std::vector<FileMetaData*>& files,
                               Arena* arena) {
F
Feng Zhu 已提交
380 381 382 383 384 385
  assert(file_level);
  assert(arena);

  size_t num = files.size();
  file_level->num_files = num;
  char* mem = arena->AllocateAligned(num * sizeof(FdWithKeyRange));
奏之章 已提交
386
  file_level->files = new (mem) FdWithKeyRange[num];
F
Feng Zhu 已提交
387 388 389 390 391 392 393 394 395 396 397 398 399 400

  for (size_t i = 0; i < num; i++) {
    Slice smallest_key = files[i]->smallest.Encode();
    Slice largest_key = files[i]->largest.Encode();

    // Copy key slice to sequential memory
    size_t smallest_size = smallest_key.size();
    size_t largest_size = largest_key.size();
    mem = arena->AllocateAligned(smallest_size + largest_size);
    memcpy(mem, smallest_key.data(), smallest_size);
    memcpy(mem + smallest_size, largest_key.data(), largest_size);

    FdWithKeyRange& f = file_level->files[i];
    f.fd = files[i]->fd;
401
    f.file_metadata = files[i];
F
Feng Zhu 已提交
402 403 404 405 406
    f.smallest_key = Slice(mem, smallest_size);
    f.largest_key = Slice(mem + smallest_size, largest_size);
  }
}

赵明 已提交
407 408
static bool AfterFile(const Comparator* ucmp, const Slice* user_key,
                      const FdWithKeyRange* f) {
A
Abhishek Kona 已提交
409 410
  // nullptr user_key occurs before all keys and is therefore never after *f
  return (user_key != nullptr &&
411
          ucmp->Compare(*user_key, ExtractUserKey(f->largest_key)) > 0);
G
Gabor Cselle 已提交
412 413
}

赵明 已提交
414 415
static bool BeforeFile(const Comparator* ucmp, const Slice* user_key,
                       const FdWithKeyRange* f) {
A
Abhishek Kona 已提交
416 417
  // nullptr user_key occurs after all keys and is therefore never before *f
  return (user_key != nullptr &&
418
          ucmp->Compare(*user_key, ExtractUserKey(f->smallest_key)) < 0);
G
Gabor Cselle 已提交
419 420
}

赵明 已提交
421 422 423 424 425
bool SomeFileOverlapsRange(const InternalKeyComparator& icmp,
                           bool disjoint_sorted_files,
                           const LevelFilesBrief& file_level,
                           const Slice* smallest_user_key,
                           const Slice* largest_user_key) {
G
Gabor Cselle 已提交
426 427 428
  const Comparator* ucmp = icmp.user_comparator();
  if (!disjoint_sorted_files) {
    // Need to check against all files
429 430
    for (size_t i = 0; i < file_level.num_files; i++) {
      const FdWithKeyRange* f = &(file_level.files[i]);
G
Gabor Cselle 已提交
431 432 433 434 435 436 437 438 439 440 441 442
      if (AfterFile(ucmp, smallest_user_key, f) ||
          BeforeFile(ucmp, largest_user_key, f)) {
        // No overlap
      } else {
        return true;  // Overlap
      }
    }
    return false;
  }

  // Binary search over file list
  uint32_t index = 0;
A
Abhishek Kona 已提交
443
  if (smallest_user_key != nullptr) {
A
Amy Xu 已提交
444
    // Find the leftmost possible internal key for smallest_user_key
445
    InternalKey small;
A
Amy Xu 已提交
446
    small.SetMinPossibleForUserKey(*smallest_user_key);
447
    index = FindFile(icmp, file_level, small.Encode());
G
Gabor Cselle 已提交
448 449
  }

450
  if (index >= file_level.num_files) {
G
Gabor Cselle 已提交
451 452 453 454
    // beginning of range is after all files, so no overlap.
    return false;
  }

455
  return !BeforeFile(ucmp, largest_user_key, &file_level.files[index]);
456 457
}

458
namespace {
459

Z
ZhaoMing 已提交
460
class LevelIterator final : public InternalIterator, public Snapshot {
J
jorlow@chromium.org 已提交
461
 public:
462 463 464
  LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
                const EnvOptions& env_options,
                const InternalKeyComparator& icomparator,
465
                const LevelFilesBrief* flevel,
Z
ZhaoMing 已提交
466
                const DependenceMap& dependence_map,
467
                const SliceTransform* prefix_extractor, bool should_sample,
468
                HistogramImpl* file_read_hist, bool for_compaction,
赵明 已提交
469
                bool skip_filters, int level, RangeDelAggregator* range_del_agg)
470 471
      : table_cache_(table_cache),
        read_options_(read_options),
Z
ZhaoMing 已提交
472
        snapshot_(0),
473 474
        env_options_(env_options),
        icomparator_(icomparator),
F
Feng Zhu 已提交
475
        flevel_(flevel),
Z
ZhaoMing 已提交
476
        dependence_map_(dependence_map),
477
        prefix_extractor_(prefix_extractor),
478 479 480 481 482 483
        file_read_hist_(file_read_hist),
        should_sample_(should_sample),
        for_compaction_(for_compaction),
        skip_filters_(skip_filters),
        file_index_(flevel_->num_files),
        level_(level),
赵明 已提交
484
        range_del_agg_(range_del_agg) {
485 486
    // Empty level is not supported.
    assert(flevel_ != nullptr && flevel_->num_files > 0);
Z
ZhaoMing 已提交
487 488 489 490 491 492
    if (read_options_.snapshot != nullptr) {
      snapshot_ = read_options_.snapshot->GetSequenceNumber();
      read_options_.snapshot = this;
    }
    read_options_.iterate_lower_bound = nullptr;
    read_options_.iterate_upper_bound = nullptr;
A
Aaron Gao 已提交
493 494
  }

495 496
  virtual ~LevelIterator() { delete file_iter_.Set(nullptr); }

赵明 已提交
497
  SequenceNumber GetSequenceNumber() const override { return snapshot_; }
Z
ZhaoMing 已提交
498

499 500 501 502 503 504 505 506 507
  virtual void Seek(const Slice& target) override;
  virtual void SeekForPrev(const Slice& target) override;
  virtual void SeekToFirst() override;
  virtual void SeekToLast() override;
  virtual void Next() override;
  virtual void Prev() override;

  virtual bool Valid() const override { return file_iter_.Valid(); }
  virtual Slice key() const override {
Z
ZhaoMing 已提交
508
    assert(file_iter_.Valid());
509
    return file_iter_.key();
J
jorlow@chromium.org 已提交
510
  }
Z
ZhaoMing 已提交
511
  virtual LazyBuffer value() const override {
Z
ZhaoMing 已提交
512
    assert(file_iter_.Valid());
Z
ZhaoMing 已提交
513
    return file_iter_.value();
Z
ZhaoMing 已提交
514
  }
515
  virtual Status status() const override {
516
    return file_iter_.iter() ? file_iter_.status() : Status::OK();
J
jorlow@chromium.org 已提交
517
  }
I
Igor Sugak 已提交
518

J
jorlow@chromium.org 已提交
519
 private:
520 521 522 523
  void SkipEmptyFileForward();
  void SkipEmptyFileBackward();
  void SetFileIterator(InternalIterator* iter);
  void InitFileIterator(size_t new_file_index);
J
jorlow@chromium.org 已提交
524

525 526 527
  const Slice& file_smallest_key(size_t file_index) {
    assert(file_index < flevel_->num_files);
    return flevel_->files[file_index].smallest_key;
J
jorlow@chromium.org 已提交
528 529
  }

530
  bool KeyReachedUpperBound(const Slice& internal_key) {
531 532 533 534 535 536
    return read_options_.iterate_upper_bound != nullptr &&
           icomparator_.user_comparator()->Compare(
               ExtractUserKey(internal_key),
               *read_options_.iterate_upper_bound) >= 0;
  }

537 538 539 540 541 542 543
  InternalIterator* NewFileIterator() {
    assert(file_index_ < flevel_->num_files);
    auto file_meta = flevel_->files[file_index_];
    if (should_sample_) {
      sample_file_read_inc(file_meta.file_metadata);
    }
    return table_cache_->NewIterator(
544
        read_options_, env_options_, icomparator_, *file_meta.file_metadata,
Z
ZhaoMing 已提交
545
        dependence_map_, range_del_agg_, prefix_extractor_,
奏之章 已提交
546
        nullptr /* don't need reference to table */, file_read_hist_,
赵明 已提交
547
        for_compaction_, nullptr /* arena */, skip_filters_, level_);
548 549
  }

L
Lei Jin 已提交
550
  TableCache* table_cache_;
Z
ZhaoMing 已提交
551 552
  ReadOptions read_options_;
  SequenceNumber snapshot_;
L
Lei Jin 已提交
553 554
  const EnvOptions& env_options_;
  const InternalKeyComparator& icomparator_;
555
  const LevelFilesBrief* flevel_;
Z
ZhaoMing 已提交
556
  const DependenceMap& dependence_map_;
557
  mutable FileDescriptor current_value_;
558
  const SliceTransform* prefix_extractor_;
559

560
  HistogramImpl* file_read_hist_;
561
  bool should_sample_;
L
Lei Jin 已提交
562
  bool for_compaction_;
563
  bool skip_filters_;
564
  size_t file_index_;
565
  int level_;
566
  RangeDelAggregator* range_del_agg_;
567
  IteratorWrapper file_iter_;  // May be nullptr
L
Lei Jin 已提交
568
};
T
Tyler Harter 已提交
569

570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622
void LevelIterator::Seek(const Slice& target) {
  size_t new_file_index = FindFile(icomparator_, *flevel_, target);

  InitFileIterator(new_file_index);
  if (file_iter_.iter() != nullptr) {
    file_iter_.Seek(target);
  }
  SkipEmptyFileForward();
}

// Positions at the last entry at or before `target`. When `target` is past
// the largest key of every file, the search starts in the final file.
void LevelIterator::SeekForPrev(const Slice& target) {
  size_t file_index = FindFile(icomparator_, *flevel_, target);
  if (file_index >= flevel_->num_files) {
    file_index = flevel_->num_files - 1;
  }

  InitFileIterator(file_index);
  if (file_iter_.iter() == nullptr) {
    return;
  }
  file_iter_.SeekForPrev(target);
  SkipEmptyFileBackward();
}

// Positions at the first entry of the level, skipping leading files that
// yield nothing.
void LevelIterator::SeekToFirst() {
  const size_t first_file = 0;
  InitFileIterator(first_file);
  if (file_iter_.iter()) {
    file_iter_.SeekToFirst();
  }
  SkipEmptyFileForward();
}

// Positions at the last entry of the level, skipping trailing files that
// yield nothing.
void LevelIterator::SeekToLast() {
  const size_t last_file = flevel_->num_files - 1;
  InitFileIterator(last_file);
  if (file_iter_.iter()) {
    file_iter_.SeekToLast();
  }
  SkipEmptyFileBackward();
}

// Advances one entry, moving into later files when the current one runs out.
// Precondition: Valid().
void LevelIterator::Next() {
  assert(Valid());
  file_iter_.Next();
  // The current file may now be exhausted; continue in the following files.
  SkipEmptyFileForward();
}

// Steps back one entry, moving into earlier files when the current one runs
// out. Precondition: Valid().
void LevelIterator::Prev() {
  assert(Valid());
  file_iter_.Prev();
  // The current file may now be exhausted; continue in the preceding files.
  SkipEmptyFileBackward();
}

void LevelIterator::SkipEmptyFileForward() {
  while (file_iter_.iter() == nullptr ||
623 624
         (!file_iter_.Valid() && file_iter_.status().ok() &&
          !file_iter_.iter()->IsOutOfBound())) {
625 626 627
    // Move to next file
    if (file_index_ >= flevel_->num_files - 1) {
      // Already at the last file
奏之章 已提交
628
      file_iter_.SetValid(false);
629 630 631
      return;
    }
    if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) {
奏之章 已提交
632
      file_iter_.SetValid(false);
633 634 635 636 637 638 639 640 641 642 643
      return;
    }
    InitFileIterator(file_index_ + 1);
    if (file_iter_.iter() != nullptr) {
      file_iter_.SeekToFirst();
    }
  }
}

// Moves backward file by file until the wrapped iterator is positioned on an
// entry or reports a non-OK status. Marks the iterator invalid once the first
// file has been passed.
void LevelIterator::SkipEmptyFileBackward() {
  for (;;) {
    const bool need_prev_file =
        file_iter_.iter() == nullptr ||
        (!file_iter_.Valid() && file_iter_.status().ok());
    if (!need_prev_file) {
      return;
    }
    if (file_index_ == 0) {
      // No earlier file to fall back to.
      file_iter_.SetValid(false);
      return;
    }
    InitFileIterator(file_index_ - 1);
    if (file_iter_.iter()) {
      file_iter_.SeekToLast();
    }
  }
}

// Installs `iter` (may be nullptr) as the wrapped per-file iterator. Set()
// hands back the previously wrapped iterator, which this object owns and
// frees here.
void LevelIterator::SetFileIterator(InternalIterator* iter) {
  auto* const previous = file_iter_.Set(iter);
  delete previous;
}

// Makes file_iter_ refer to the file at `new_file_index`.
// An index past the last file releases the wrapped iterator (file_index_
// still records that index). The current iterator is kept only if it already
// refers to the requested file and its status is not Incomplete; otherwise a
// fresh table iterator replaces it.
void LevelIterator::InitFileIterator(size_t new_file_index) {
  if (new_file_index >= flevel_->num_files) {
    file_index_ = new_file_index;
    SetFileIterator(nullptr);
    return;
  }

  // If the file iterator shows incomplete, we try it again if users seek
  // to the same file, as this time we may go to a different data block
  // which is cached in block cache.
  const bool reuse_current = file_iter_.iter() != nullptr &&
                             !file_iter_.status().IsIncomplete() &&
                             new_file_index == file_index_;
  if (reuse_current) {
    return;
  }
  file_index_ = new_file_index;
  SetFileIterator(NewFileIterator());
}

S
sdong 已提交
684 685
// A wrapper of version builder which references the current version in
// constructor and unref it in the destructor.
686
// Both of the constructor and destructor need to be called inside DB Mutex.
S
sdong 已提交
687 688 689
class BaseReferencedVersionBuilder {
 public:
  explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd)
690
      : version_builder_(new VersionBuilder(
691
            cfd->current()->version_set()->env_options(), cfd->table_cache(),
Z
ZhaoMing 已提交
692
            cfd->current()->storage_info(), cfd->ioptions()->info_log)),
S
sdong 已提交
693 694 695
        version_(cfd->current()) {
    version_->Ref();
  }
696 697 698 699
  ~BaseReferencedVersionBuilder() {
    delete version_builder_;
    version_->Unref();
  }
700
  VersionBuilder* version_builder() { return version_builder_; }
701
  VersionStorageInfo* version_storage() { return version_->storage_info(); }
S
sdong 已提交
702 703

 private:
704
  VersionBuilder* version_builder_;
S
sdong 已提交
705 706
  Version* version_;
};
707 708
}  // anonymous namespace

709 710
// Loads the table properties of `file_meta` into `*tp`.
// The table cache is asked first without doing I/O. Only when that reports
// Incomplete (table not cached) are the properties read directly from the
// file: `*fname` when provided, otherwise the path derived from cf_paths.
// Any other table-cache error, or a failure opening/reading the file, is
// returned unchanged.
Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
                                   const FileMetaData* file_meta,
                                   const std::string* fname) const {
  auto table_cache = cfd_->table_cache();
  auto ioptions = cfd_->ioptions();

  // 1. Try the table cache without I/O.
  Status s = table_cache->GetTableProperties(
      env_options_, cfd_->internal_comparator(), file_meta->fd, tp,
      mutable_cf_options_.prefix_extractor.get(), true /* no io */);
  // Incomplete is expected by design when the table is not cached; every
  // other outcome (including success) is final.
  if (!s.IsIncomplete()) {
    return s;
  }

  // 2. Table is not present in table cache, we'll read the table properties
  // directly from the properties block in the file.
  const std::string file_name =
      fname != nullptr
          ? *fname
          : TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(),
                          file_meta->fd.GetPathId());
  std::unique_ptr<RandomAccessFile> file;
  s = ioptions->env->NewRandomAccessFile(file_name, &file, env_options_);
  if (!s.ok()) {
    return s;
  }

  std::unique_ptr<RandomAccessFileReader> file_reader(
      new RandomAccessFileReader(
          std::move(file), file_name, nullptr /* env */, nullptr /* stats */,
          0 /* hist_type */, nullptr /* file_read_hist */,
          nullptr /* rate_limiter */, false /* for_compaction*/,
          ioptions->listeners));
  // Passing kInvalidTableMagicNumber bypasses the footer's magic number check.
  TableProperties* raw_table_properties = nullptr;
  s = ReadTableProperties(
      file_reader.get(), file_meta->fd.GetFileSize(),
      Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
      &raw_table_properties, false /* compression_type_missing */);
  if (!s.ok()) {
    return s;
  }
  RecordTick(ioptions->statistics, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);

  tp->reset(raw_table_properties);
  return s;
}

// Collects the table properties of every file on every level (starting at
// the level -1 slot) into `props`, stopping at the first failing level.
Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
  for (int level = -1; level < storage_info_.num_levels_; ++level) {
    Status level_status = GetPropertiesOfAllTables(props, level);
    if (!level_status.ok()) {
      return level_status;
    }
  }
  return Status::OK();
}

// Adds the table properties of every file on `level` to `props`, keyed by
// the file's table path. Returns the first GetTableProperties error; entries
// added before that error remain in `props`.
Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props,
                                         int level) {
  for (FileMetaData* file_meta : storage_info_.files_[level]) {
    const auto fname =
        TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
                      file_meta->fd.GetPathId());
    // GetTableProperties tries the table cache first and falls back to
    // reading the file itself.
    std::shared_ptr<const TableProperties> table_properties;
    Status s = GetTableProperties(&table_properties, file_meta, &fname);
    if (!s.ok()) {
      return s;
    }
    props->insert({fname, table_properties});
  }
  return Status::OK();
}

796 797 798
// Collects table properties into `props`, keyed by table file name. It
// covers every table file on levels 0 .. num_non_empty_levels()-1 that
// overlaps any of the `n` user-key ranges in `range`. Files already present
// in `props` are not reloaded.
//
// `include_blob` controls whether dependence files are visited:
//  - false: only ordinary (non-map) tables found directly on the levels are
//    reported. Map SSTs are skipped, and so are the dependence files of
//    ordinary tables.
//  - true: map SSTs are expanded into the tables they reference, which are
//    then visited in turn. The dependence files of ordinary tables
//    (presumably their blob / separated-value files) are reported too.
// Map SSTs themselves are never reported.
//
// Returns Status::Aborted if a dependence file is missing from the dependence
// map, or the first error returned by GetTableProperties.
Status Version::GetPropertiesOfTablesInRange(const Range* range, std::size_t n,
                                             TablePropertiesCollection* props,
                                             bool include_blob) const {
  // Loads and records the properties of `file_meta`, unless they were
  // already collected under the same file name.
  auto push_props = [&](FileMetaData* file_meta) -> Status {
    auto fname =
        TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
                      file_meta->fd.GetPathId());
    if (props->count(fname) != 0) {
      return Status::OK();
    }
    // GetTableProperties uses the table cache when the table is already
    // open there.
    std::shared_ptr<const TableProperties> table_properties;
    Status s = GetTableProperties(&table_properties, file_meta, &fname);
    if (s.ok()) {
      props->insert({fname, table_properties});
    }
    return s;
  };

  for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
    for (decltype(n) i = 0; i < n; i++) {
      // Convert user_key into a corresponding internal key.
      InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
      InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
      std::vector<FileMetaData*> files;
      storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr,
                                         false);
      // `files` grows while it is being scanned (the dependences of map SSTs
      // are appended to it), so iterate by index rather than by iterator.
      for (size_t j = 0; j < files.size(); ++j) {
        FileMetaData* file_meta = files[j];
        const bool is_map_sst = file_meta->prop.is_map_sst();
        if (!is_map_sst) {
          Status s = push_props(file_meta);
          if (!s.ok()) {
            return s;
          }
        }
        // Previously this skip applied only to map SSTs, so the dependence
        // files of ordinary tables were reported even when include_blob was
        // false. The flag now governs every dependence file uniformly.
        if (!include_blob) {
          continue;
        }
        for (auto& dependence : file_meta->prop.dependence) {
          auto find =
              storage_info_.dependence_map_.find(dependence.file_number);
          if (find == storage_info_.dependence_map_.end()) {
            return Status::Aborted(
                "Version::GetPropertiesOfTablesInRange missing dependence sst");
          }
          if (is_map_sst) {
            // Visit the mapped table later in this same loop.
            files.push_back(find->second);
          } else {
            Status s = push_props(find->second);
            if (!s.ok()) {
              return s;
            }
          }
        }
      }
    }
  }

  return Status::OK();
}

855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875
// Sums the table properties of the files on `level` into one
// TableProperties object and returns it through `tp`. A negative `level`
// means all levels, including the dependence level. `*tp` is left untouched
// on error.
Status Version::GetAggregatedTableProperties(
    std::shared_ptr<const TableProperties>* tp, int level) {
  TablePropertiesCollection collected;
  Status s = level < 0 ? GetPropertiesOfAllTables(&collected)
                       : GetPropertiesOfAllTables(&collected, level);
  if (!s.ok()) {
    return s;
  }

  auto aggregated = std::make_shared<TableProperties>();
  for (const auto& entry : collected) {
    aggregated->Add(*entry.second);
  }
  *tp = std::move(aggregated);
  return Status::OK();
}

876 877
size_t Version::GetMemoryUsageByTableReaders() {
  size_t total_usage = 0;
S
sdong 已提交
878
  for (auto& file_level : storage_info_.level_files_brief_) {
879 880
    for (size_t i = 0; i < file_level.num_files; i++) {
      total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
881 882
          env_options_, cfd_->internal_comparator(), file_level.files[i].fd,
          mutable_cf_options_.prefix_extractor.get());
883 884
    }
  }
奏之章 已提交
885 886 887 888 889
  for (auto file_meta : storage_info_.LevelFiles(-1)) {
    total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
        env_options_, cfd_->internal_comparator(), file_meta->fd,
        mutable_cf_options_.prefix_extractor.get());
  }
890 891 892
  return total_usage;
}

赵明 已提交
893 894 895 896 897 898 899 900 901 902 903 904 905
// Maps the current read amplification to a compaction-pressure score in
// [0, 1]:
//  - 0 below the (offset) slowdown trigger;
//  - 1 at or above the (offset) stop trigger;
//  - linear in between.
// Both L0 triggers are offset by num_levels - 1 (presumably one sorted run
// per non-L0 level).
double Version::GetCompactionLoad() const {
  const double read_amp = storage_info_.read_amplification();
  const int extra_runs = cfd_->ioptions()->num_levels - 1;
  const int slowdown_at =
      mutable_cf_options_.level0_slowdown_writes_trigger + extra_runs;
  const int stop_at =
      mutable_cf_options_.level0_stop_writes_trigger + extra_runs;

  if (read_amp < slowdown_at) {
    return 0;
  }
  if (read_amp >= stop_at) {
    return 1;
  }
  const int span = std::max(1, stop_at - slowdown_at);
  return (read_amp - slowdown_at) / span;
}

906 907
double Version::GetGarbageCollectionLoad() const {
  double sum = 0, antiquated = 0;
Z
ZhaoMing 已提交
908
  for (auto f : storage_info_.LevelFiles(-1)) {
909
    if (!f->is_gc_permitted() || f->being_compacted) {
910 911
      continue;
    }
912
    sum += f->prop.num_entries;
913 914
    antiquated += f->num_antiquation;
  }
Z
ZhaoMing 已提交
915
  return sum > 0 ? antiquated / sum : sum;
916 917
}

918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936
// Fills `cf_meta` with this column family's name, total size and file
// count. It also adds one level entry, with per-file SstFileMetaData, for
// each level 0 .. NumberLevels()-1. Dependence-level (-1) files are not
// reported.
void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
  assert(cf_meta);
  assert(cfd_);

  cf_meta->name = cfd_->GetName();
  cf_meta->size = 0;
  cf_meta->file_count = 0;
  cf_meta->levels.clear();

  const auto& cf_paths = cfd_->ioptions()->cf_paths;
  auto* vstorage = storage_info();
  const int num_levels = cfd_->NumberLevels();

  for (int lvl = 0; lvl < num_levels; ++lvl) {
    const auto& level_files = vstorage->LevelFiles(lvl);
    cf_meta->file_count += level_files.size();

    uint64_t bytes_in_level = 0;
    std::vector<SstFileMetaData> sst_metas;
    for (const auto& file : level_files) {
      // A path id outside cf_paths is reported under the last configured
      // path.
      const uint32_t path_id = file->fd.GetPathId();
      std::string file_path;
      if (path_id < cf_paths.size()) {
        file_path = cf_paths[path_id].path;
      } else {
        assert(!cf_paths.empty());
        file_path = cf_paths.back().path;
      }

      sst_metas.emplace_back(SstFileMetaData{
          MakeTableFileName("", file->fd.GetNumber()), file_path,
          static_cast<size_t>(file->fd.GetFileSize()), file->fd.smallest_seqno,
          file->fd.largest_seqno, file->smallest.user_key().ToString(),
          file->largest.user_key().ToString(),
          file->stats.num_reads_sampled.load(std::memory_order_relaxed),
          file->being_compacted});
      SstFileMetaData& added = sst_metas.back();
      added.num_entries = file->prop.num_entries;
      added.num_deletions = file->prop.num_deletions;

      bytes_in_level += file->fd.GetFileSize();
    }
    cf_meta->levels.emplace_back(lvl, bytes_in_level, std::move(sst_metas));
    cf_meta->size += bytes_in_level;
  }
}

960 961
// Returns the total on-disk size, in bytes, of the table files on every
// level, including the dependence level (-1).
uint64_t Version::GetSstFilesSize() {
  uint64_t total_bytes = 0;
  int lvl = -1;
  while (lvl < storage_info_.num_levels_) {
    for (auto* meta : storage_info_.LevelFiles(lvl)) {
      total_bytes += meta->fd.GetFileSize();
    }
    ++lvl;
  }
  return total_bytes;
}
969

S
sdong 已提交
970
uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
971 972
  // Estimation will be inaccurate when:
  // (1) there exist merge keys
S
sdong 已提交
973 974
  // (2) keys are directly overwritten
  // (3) deletion on non-existing keys
C
chenchanglong 已提交
975
  if (lsm_num_entries_ <= lsm_num_deletions_) {
976 977
    return 0;
  }
C
chenchanglong 已提交
978
  return lsm_num_entries_ - lsm_num_deletions_;
S
sdong 已提交
979 980
}

981 982 983
double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
    int level) const {
  assert(level < num_levels_);
Z
ZhaoMing 已提交
984 985 986 987 988 989 990

  struct {
    std::pair<uint64_t, uint64_t> (*callback)(void* args, const FileMetaData* f,
                                              uint64_t file_number);
    void* args;
  } get_file_info;
  auto get_file_size_lambda = [this, &get_file_info](
991 992 993
                                  const FileMetaData* f,
                                  uint64_t file_number = uint64_t(
                                      -1)) -> std::pair<uint64_t, uint64_t> {
Z
ZhaoMing 已提交
994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018
    if (f == nullptr) {
      auto find = dependence_map_.find(file_number);
      if (find == dependence_map_.end()) {
        // TODO log error
        return {};
      }
      f = find->second;
    } else {
      assert(file_number == uint64_t(-1));
    }
    uint64_t file_size = f->fd.GetFileSize();
    uint64_t data_size = f->prop.raw_key_size + f->prop.raw_value_size;
    if (f->prop.is_map_sst()) {
      for (auto& dependence : f->prop.dependence) {
        auto pair = get_file_info.callback(get_file_info.args, nullptr,
                                           dependence.file_number);
        file_size += pair.first;
        data_size += pair.second;
      }
    }
    return std::make_pair(file_size, data_size);
  };
  get_file_info.callback = c_style_callback(get_file_size_lambda);
  get_file_info.args = &get_file_size_lambda;

1019 1020 1021
  uint64_t sum_file_size_bytes = 0;
  uint64_t sum_data_size_bytes = 0;
  for (auto* file_meta : files_[level]) {
Z
ZhaoMing 已提交
1022 1023 1024
    auto pair = get_file_size_lambda(file_meta);
    sum_file_size_bytes += pair.first;
    sum_data_size_bytes += pair.second;
1025 1026 1027 1028 1029 1030 1031
  }
  if (sum_file_size_bytes == 0) {
    return -1.0;
  }
  return static_cast<double>(sum_data_size_bytes) / sum_file_size_bytes;
}

1032 1033
void Version::AddIterators(const ReadOptions& read_options,
                           const EnvOptions& soptions,
A
Andrew Kryczka 已提交
1034 1035
                           MergeIteratorBuilder* merge_iter_builder,
                           RangeDelAggregator* range_del_agg) {
S
sdong 已提交
1036
  assert(storage_info_.finalized_);
1037

1038
  for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
A
Andrew Kryczka 已提交
1039 1040
    AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level,
                         range_del_agg);
1041 1042 1043 1044 1045 1046
  }
}

void Version::AddIteratorsForLevel(const ReadOptions& read_options,
                                   const EnvOptions& soptions,
                                   MergeIteratorBuilder* merge_iter_builder,
A
Andrew Kryczka 已提交
1047 1048
                                   int level,
                                   RangeDelAggregator* range_del_agg) {
1049 1050 1051 1052 1053 1054
  assert(storage_info_.finalized_);
  if (level >= storage_info_.num_non_empty_levels()) {
    // This is an empty level
    return;
  } else if (storage_info_.LevelFilesBrief(level).num_files == 0) {
    // No files in this level
S
sdong 已提交
1055 1056 1057
    return;
  }

1058 1059
  bool should_sample = should_sample_file_read();

1060
  auto* arena = merge_iter_builder->GetArena();
Z
ZhaoMing 已提交
1061
  if (level <= 0 || storage_info_.LevelFilesBrief(level).num_files == 1) {
奏之章 已提交
1062 1063
    // Merge all level zero files together since they may overlap.
    // Or there is only one file ...
奏之章 已提交
1064 1065
    for (size_t i = 0; i < storage_info_.LevelFilesBrief(level).num_files;
         i++) {
奏之章 已提交
1066
      const auto& file = storage_info_.LevelFilesBrief(level).files[i];
1067
      merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
奏之章 已提交
1068
          read_options, soptions, cfd_->internal_comparator(),
Z
ZhaoMing 已提交
1069
          *file.file_metadata, storage_info_.dependence_map(), range_del_agg,
奏之章 已提交
1070
          mutable_cf_options_.prefix_extractor.get(), nullptr,
奏之章 已提交
1071
          cfd_->internal_stats()->GetFileReadHist(level), false, arena,
1072
          false /* skip_filters */, 0 /* level */));
1073
    }
1074 1075 1076 1077 1078 1079 1080 1081 1082
    if (should_sample) {
      // Count ones for every L0 files. This is done per iterator creation
      // rather than Seek(), while files in other levels are recored per seek.
      // If users execute one range query per iterator, there may be some
      // discrepancy here.
      for (FileMetaData* meta : storage_info_.LevelFiles(0)) {
        sample_file_read_inc(meta);
      }
    }
1083
  } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
1084 1085 1086
    // For levels > 0, we can use a concatenating iterator that sequentially
    // walks through the non-overlapping files in the level, opening them
    // lazily.
1087 1088 1089
    auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
    merge_iter_builder->AddIterator(new (mem) LevelIterator(
        cfd_->table_cache(), read_options, soptions,
1090
        cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
Z
ZhaoMing 已提交
1091
        storage_info_.dependence_map(),
奏之章 已提交
1092 1093
        mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
        cfd_->internal_stats()->GetFileReadHist(level),
1094 1095
        false /* for_compaction */, IsFilterSkipped(level), level,
        range_del_agg));
1096 1097 1098
  }
}

1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110
// Determines whether `level` holds data overlapping the user-key range
// [smallest_user_key, largest_user_key]. *overlap is reset to false first,
// then set to true when either of these holds:
//  - OverlapWithIterator reports a key in the range;
//  - a range tombstone collected while scanning overlaps the range.
// Level 0, and any level holding a single file, is checked file by file;
// files whose key range lies entirely outside the query are skipped. Other
// levels are checked through one LevelIterator.
// Returns the first error reported by OverlapWithIterator.
Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
                                         const EnvOptions& env_options,
                                         const Slice& smallest_user_key,
                                         const Slice& largest_user_key,
                                         int level, bool* overlap) {
  assert(storage_info_.finalized_);

  // Bind by reference: the aggregator only needs a stable pointer to the
  // comparator, and the one owned by cfd_ outlives this call.
  const auto& icmp = cfd_->internal_comparator();
  auto ucmp = icmp.user_comparator();

  Arena arena;
  Status status;
  ReadRangeDelAggregator range_del_agg(&icmp,
                                       kMaxSequenceNumber /* upper_bound */);

  *overlap = false;

  const auto& level_files = storage_info_.LevelFilesBrief(level);
  if (level == 0 || level_files.num_files == 1) {
    for (size_t i = 0; i < level_files.num_files; i++) {
      const auto file = &level_files.files[i];
      if (AfterFile(ucmp, &smallest_user_key, file) ||
          BeforeFile(ucmp, &largest_user_key, file)) {
        continue;
      }
      // Pass the real level rather than 0: this branch also serves
      // single-file levels above L0.
      ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
          read_options, env_options, cfd_->internal_comparator(),
          *file->file_metadata, storage_info_.dependence_map(), &range_del_agg,
          mutable_cf_options_.prefix_extractor.get(), nullptr,
          cfd_->internal_stats()->GetFileReadHist(level), false, &arena,
          false /* skip_filters */, level));
      status = OverlapWithIterator(ucmp, smallest_user_key, largest_user_key,
                                   iter.get(), overlap);
      if (!status.ok() || *overlap) {
        break;
      }
    }
  } else if (level_files.num_files > 0) {
    auto mem = arena.AllocateAligned(sizeof(LevelIterator));
    ScopedArenaIterator iter(new (mem) LevelIterator(
        cfd_->table_cache(), read_options, env_options,
        cfd_->internal_comparator(), &level_files,
        storage_info_.dependence_map(),
        mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
        cfd_->internal_stats()->GetFileReadHist(level),
        false /* for_compaction */, IsFilterSkipped(level), level,
        &range_del_agg));
    status = OverlapWithIterator(ucmp, smallest_user_key, largest_user_key,
                                 iter.get(), overlap);
  }

  if (status.ok() && *overlap == false &&
      range_del_agg.IsRangeOverlapped(smallest_user_key, largest_user_key)) {
    *overlap = true;
  }
  return status;
}

S
sdong 已提交
1157 1158
// Creates empty storage for `levels` levels. Per-level containers and
// counters start empty or zero, and the structure starts un-finalized.
VersionStorageInfo::VersionStorageInfo(
    const InternalKeyComparator* internal_comparator,
    const Comparator* user_comparator, int levels,
    CompactionStyle compaction_style, bool _force_consistency_checks)
    : internal_comparator_(internal_comparator),
      user_comparator_(user_comparator),
      num_levels_(levels),
      num_non_empty_levels_(0),
      file_indexer_(user_comparator),
      compaction_style_(compaction_style),
      // One more vector than there are levels: the extra leading slot holds
      // the dependence files (level -1); see the constructor body.
      files_(new std::vector<FileMetaData*>[num_levels_ + 1]),
      base_level_(num_levels_ == 1 ? -1 : 1),
      level_multiplier_(0.0),
      files_by_compaction_pri_(num_levels_),
      level0_non_overlapping_(false),
      next_file_to_compact_by_size_(num_levels_),
      compaction_score_(num_levels_),
      compaction_level_(num_levels_),
      l0_delay_trigger_count_(0),
      blob_file_size_(0),
      blob_num_entries_(0),
      blob_num_deletions_(0),
      blob_num_antiquation_(0),
      lsm_file_size_(0),
      lsm_num_entries_(0),
      lsm_num_deletions_(0),
      estimated_compaction_needed_bytes_(0),
      total_garbage_ratio_(0),
      finalized_(false),
      is_pick_compaction_fail(false),
      is_pick_garbage_collection_fail(false),
      force_consistency_checks_(_force_consistency_checks) {
  // Advance the base pointer by one slot. files_[-1] then addresses the
  // dependence level and files_[0 .. num_levels_-1] the regular levels.
  // NOTE(review): the matching delete[] presumably uses files_ - 1; confirm.
  files_ += 1;
}
1192

I
Igor Canadi 已提交
1193
// Creates a Version of `column_family_data` belonging to `vset`.
// A null `column_family_data` yields a dummy Version:
//  - the cached logger, statistics, table cache and merge operator are null;
//  - the storage has zero levels, level compaction style and no consistency
//    checks.
Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
                 const EnvOptions& env_opt,
                 const MutableCFOptions mutable_cf_options,
                 uint64_t version_number)
    : env_(vset->env_),
      cfd_(column_family_data),
      info_log_(column_family_data != nullptr
                    ? column_family_data->ioptions()->info_log
                    : nullptr),
      db_statistics_(column_family_data != nullptr
                         ? column_family_data->ioptions()->statistics
                         : nullptr),
      table_cache_(column_family_data != nullptr
                       ? column_family_data->table_cache()
                       : nullptr),
      merge_operator_(column_family_data != nullptr
                          ? column_family_data->ioptions()->merge_operator
                          : nullptr),
      storage_info_(
          column_family_data != nullptr
              ? &column_family_data->internal_comparator()
              : nullptr,
          column_family_data != nullptr
              ? column_family_data->user_comparator()
              : nullptr,
          column_family_data != nullptr ? column_family_data->NumberLevels()
                                        : 0,
          column_family_data != nullptr
              ? column_family_data->ioptions()->compaction_style
              : kCompactionStyleLevel,
          column_family_data != nullptr
              ? column_family_data->ioptions()->force_consistency_checks
              : false),
      vset_(vset),
      next_(this),
      prev_(this),
      refs_(0),
      env_options_(env_opt),
      mutable_cf_options_(mutable_cf_options),
      version_number_(version_number) {}

Z
ZhaoMing 已提交
1220
// Resolves a lazily separated value held in `buffer`. It looks the entry up
// in the dependence file recorded in the buffer's context. The context
// words are:
//   [0] user-key data pointer
//   [1] user-key size
//   [2] sequence number
//   [3] address of the target file's DependenceMap entry
// This matches the layout TransToCombined writes; presumably that is how
// such buffers reach here.
// Returns the table error, the corruption reason, or Corruption("Separate
// value missing") when no value/merge entry exists at exactly that sequence.
Status Version::fetch_buffer(LazyBuffer* buffer) const {
  auto ctx = get_context(buffer);
  Slice user_key(reinterpret_cast<const char*>(ctx->data[0]), ctx->data[1]);
  const uint64_t sequence = ctx->data[2];
  const auto dep_entry =
      *reinterpret_cast<DependenceMap::value_type*>(ctx->data[3]);

  bool value_found = false;
  SequenceNumber found_seq;
  // Named `lookup` so it does not shadow the get_context() call above.
  GetContext lookup(cfd_->internal_comparator().user_comparator(), nullptr,
                    cfd_->ioptions()->info_log, db_statistics_,
                    GetContext::kNotFound, user_key, buffer, &value_found,
                    nullptr, nullptr, nullptr, env_, &found_seq);

  IterKey seek_key;
  seek_key.SetInternalKey(user_key, sequence, kValueTypeForSeek);
  auto s = table_cache_->Get(
      ReadOptions(), cfd_->internal_comparator(), *dep_entry.second,
      storage_info_.dependence_map(), seek_key.GetInternalKey(), &lookup,
      mutable_cf_options_.prefix_extractor.get(), nullptr, true);
  if (!s.ok()) {
    return s;
  }

  const bool hit = found_seq == sequence &&
                   (lookup.State() == GetContext::kFound ||
                    lookup.State() == GetContext::kMerge);
  if (!hit) {
    if (lookup.State() == GetContext::kCorrupt) {
      return std::move(lookup).CorruptReason();
    }
    char buf[128];
    snprintf(buf, sizeof buf,
             "file number = %" PRIu64 "(%" PRIu64 "), sequence = %" PRIu64,
             dep_entry.second->fd.GetNumber(), dep_entry.first, sequence);
    return Status::Corruption("Separate value missing", buf);
  }

  assert(buffer->file_number() == dep_entry.second->fd.GetNumber());
  return Status::OK();
}

Z
ZhaoMing 已提交
1257
void Version::TransToCombined(const Slice& user_key, uint64_t sequence,
Z
ZhaoMing 已提交
1258 1259
                              LazyBuffer& value) const {
  auto s = value.fetch();
Z
ZhaoMing 已提交
1260
  if (!s.ok()) {
Z
ZhaoMing 已提交
1261
    value.reset(std::move(s));
Z
ZhaoMing 已提交
1262 1263
    return;
  }
Z
ZhaoMing 已提交
1264
  uint64_t file_number = SeparateHelper::DecodeFileNumber(value.slice());
Z
ZhaoMing 已提交
1265
  auto& dependence_map = storage_info_.dependence_map();
Z
ZhaoMing 已提交
1266 1267 1268 1269
  auto find = dependence_map.find(file_number);
  if (find == dependence_map.end()) {
    value.reset(Status::Corruption("Separate value dependence missing"));
  } else {
赵明 已提交
1270 1271 1272
    value.reset(this,
                {reinterpret_cast<uint64_t>(user_key.data()), user_key.size(),
                 sequence, reinterpret_cast<uint64_t>(&*find)},
Z
ZhaoMing 已提交
1273
                Slice::Invalid(), find->second->fd.GetNumber());
Z
ZhaoMing 已提交
1274
  }
Z
ZhaoMing 已提交
1275 1276
}

Z
ZhaoMing 已提交
1277
void Version::Get(const ReadOptions& read_options, const Slice& user_key,
Z
ZhaoMing 已提交
1278
                  const LookupKey& k, LazyBuffer* value, Status* status,
A
Andrew Kryczka 已提交
1279
                  MergeContext* merge_context,
1280
                  SequenceNumber* max_covering_tombstone_seq, bool* value_found,
Z
ZhaoMing 已提交
1281 1282
                  bool* key_exists, SequenceNumber* seq,
                  ReadCallback* callback) {
1283
  Slice ikey = k.internal_key();
1284 1285

  assert(status->ok() || status->IsMergeInProgress());
1286

1287 1288 1289 1290 1291
  if (key_exists != nullptr) {
    // will falsify below if not found
    *key_exists = true;
  }

S
sdong 已提交
1292
  GetContext get_context(
S
sdong 已提交
1293
      user_comparator(), merge_operator_, info_log_, db_statistics_,
S
sdong 已提交
1294
      status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
Z
ZhaoMing 已提交
1295 1296
      value, value_found, merge_context, this, max_covering_tombstone_seq,
      this->env_, seq, callback);
1297

S
sdong 已提交
1298 1299 1300 1301
  FilePicker fp(
      storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
      storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
      user_comparator(), internal_comparator());
1302
  FdWithKeyRange* f = fp.GetNextFile();
1303

1304
  while (f != nullptr) {
赵明 已提交
1305
    if (get_context.is_finished()) {
1306 1307 1308
      // The remaining files we look at will only contain covered keys, so we
      // stop here.
      break;
1309
    }
1310 1311 1312
    if (get_context.sample()) {
      sample_file_read_inc(f->file_metadata);
    }
1313

1314 1315 1316 1317
    bool timer_enabled =
        GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
        get_perf_context()->per_level_perf_context_enabled;
    StopWatchNano timer(env_, timer_enabled /* auto_start */);
1318
    *status = table_cache_->Get(
Z
ZhaoMing 已提交
1319 1320
        read_options, *internal_comparator(), *f->file_metadata,
        storage_info_.dependence_map(), ikey, &get_context,
奏之章 已提交
1321
        mutable_cf_options_.prefix_extractor.get(),
1322
        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
1323
        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
1324 1325
                        fp.IsHitFileLastInLevel()),
        fp.GetCurrentLevel());
1326
    // TODO: examine the behavior for corrupted key
1327 1328 1329 1330
    if (timer_enabled) {
      PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
                                fp.GetCurrentLevel());
    }
1331 1332
    if (!status->ok()) {
      return;
1333
    }
1334

1335 1336
    // report the counters before returning
    if (get_context.State() != GetContext::kNotFound &&
1337 1338 1339
        get_context.State() != GetContext::kMerge &&
        db_statistics_ != nullptr) {
      get_context.ReportCounters();
1340
    }
1341 1342 1343 1344
    switch (get_context.State()) {
      case GetContext::kNotFound:
        // Keep searching in other files
        break;
1345
      case GetContext::kMerge:
1346
        // TODO: update per-level perfcontext user_key_return_count for kMerge
1347
        break;
1348
      case GetContext::kFound:
1349 1350 1351 1352 1353 1354 1355
        if (fp.GetHitFileLevel() == 0) {
          RecordTick(db_statistics_, GET_HIT_L0);
        } else if (fp.GetHitFileLevel() == 1) {
          RecordTick(db_statistics_, GET_HIT_L1);
        } else if (fp.GetHitFileLevel() >= 2) {
          RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
        }
赵明 已提交
1356 1357
        PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
                                  fp.GetHitFileLevel());
1358
        return;
1359 1360 1361
      case GetContext::kDeleted:
        // Use empty error message for speed
        *status = Status::NotFound();
1362
        return;
1363
      case GetContext::kCorrupt:
Z
ZhaoMing 已提交
1364
        *status = std::move(get_context).CorruptReason();
1365
        return;
1366
    }
1367
    f = fp.GetNextFile();
1368 1369
  }

1370 1371
  if (db_statistics_ != nullptr) {
    get_context.ReportCounters();
1372
  }
1373
  if (GetContext::kMerge == get_context.State()) {
1374
    if (!merge_operator_) {
奏之章 已提交
1375
      *status = Status::InvalidArgument(
1376 1377 1378
          "merge_operator is not properly initialized.");
      return;
    }
1379 1380
    // merge_operands are in saver and we hit the beginning of the key history
    // do a final merge of nullptr and operands;
M
Maysam Yabandeh 已提交
1381
    *status = MergeHelper::TimedFullMerge(
赵明 已提交
1382 1383
        merge_operator_, user_key, nullptr, merge_context->GetOperands(), value,
        info_log_, db_statistics_, env_, true);
Z
ZhaoMing 已提交
1384
    if (status->ok()) {
Z
ZhaoMing 已提交
1385
      value->pin();
Z
ZhaoMing 已提交
1386
    }
1387
  } else {
1388 1389 1390
    if (key_exists != nullptr) {
      *key_exists = false;
    }
赵明 已提交
1391
    *status = Status::NotFound();  // Use an empty error message for speed
1392
  }
1393 1394
}

Z
ZhaoMing 已提交
1395
void Version::GetKey(const Slice& user_key, const Slice& ikey, Status* status,
Z
ZhaoMing 已提交
1396
                     ValueType* type, SequenceNumber* seq, LazyBuffer* value) {
Z
ZhaoMing 已提交
1397 1398 1399
  bool value_found;
  GetContext get_context(cfd_->internal_comparator().user_comparator(), nullptr,
                         cfd_->ioptions()->info_log, db_statistics_,
Z
ZhaoMing 已提交
1400
                         GetContext::kNotFound, user_key, value, &value_found,
Z
ZhaoMing 已提交
1401 1402 1403 1404 1405 1406 1407 1408 1409 1410
                         nullptr, nullptr, nullptr, env_, seq);
  ReadOptions options;

  FilePicker fp(
      storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
      storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
      user_comparator(), internal_comparator());
  FdWithKeyRange* f = fp.GetNextFile();

  while (f != nullptr) {
赵明 已提交
1411 1412 1413 1414 1415
    *status =
        table_cache_->Get(options, *internal_comparator(), *f->file_metadata,
                          storage_info_.dependence_map(), ikey, &get_context,
                          mutable_cf_options_.prefix_extractor.get(), nullptr,
                          true, fp.GetCurrentLevel());
Z
ZhaoMing 已提交
1416 1417 1418 1419 1420 1421 1422
    if (!status->ok()) {
      return;
    }
    switch (get_context.State()) {
      case GetContext::kNotFound:
        break;
      case GetContext::kMerge:
Z
ZhaoMing 已提交
1423
        *type = get_context.is_index() ? kTypeMergeIndex : kTypeMerge;
Z
ZhaoMing 已提交
1424 1425
        return;
      case GetContext::kFound:
Z
ZhaoMing 已提交
1426
        *type = get_context.is_index() ? kTypeValueIndex : kTypeValue;
Z
ZhaoMing 已提交
1427 1428 1429 1430 1431
        return;
      case GetContext::kDeleted:
        *status = Status::NotFound();
        return;
      case GetContext::kCorrupt:
Z
ZhaoMing 已提交
1432
        *status = std::move(get_context).CorruptReason();
Z
ZhaoMing 已提交
1433 1434 1435 1436 1437 1438 1439
        return;
    }
    f = fp.GetNextFile();
  }
  *status = Status::NotFound();
}

1440
bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
1441 1442
  // Reaching the bottom level implies misses at all upper levels, so we'll
  // skip checking the filters when we predict a hit.
1443 1444
  return cfd_->ioptions()->optimize_filters_for_hits &&
         (level > 0 || is_file_last_in_level) &&
1445 1446 1447
         level == storage_info_.num_non_empty_levels() - 1;
}

S
sdong 已提交
1448
void VersionStorageInfo::GenerateLevelFilesBrief() {
1449
  level_files_brief_.resize(num_non_empty_levels_);
1450
  for (int level = 0; level < num_non_empty_levels_; level++) {
赵明 已提交
1451 1452
    DoGenerateLevelFilesBrief(&level_files_brief_[level], files_[level],
                              &arena_);
1453 1454 1455
  }
}

Z
ZhaoMing 已提交
1456 1457
void Version::PrepareApply(const MutableCFOptions& mutable_cf_options) {
  storage_info_.ComputeCompensatedSizes();
S
sdong 已提交
1458
  storage_info_.UpdateNumNonEmptyLevels();
1459
  storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
Y
Yi Wu 已提交
1460
  storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
S
sdong 已提交
1461 1462
  storage_info_.GenerateFileIndexer();
  storage_info_.GenerateLevelFilesBrief();
1463
  storage_info_.GenerateLevel0NonOverlapping();
1464
  storage_info_.GenerateBottommostFiles();
1465 1466
}

S
sdong 已提交
1467
// Adds `file_meta`'s size and entry counts to the running totals.
//  - Files for which GC is forbidden go to the lsm_* totals.
//  - All other files go to the blob_* totals, which also accumulate
//    num_antiquation.
void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) {
  const uint64_t file_bytes = file_meta->fd.GetFileSize();
  const auto& prop = file_meta->prop;

  if (!file_meta->is_gc_forbidden()) {
    blob_file_size_ += file_bytes;
    blob_num_entries_ += prop.num_entries;
    blob_num_deletions_ += prop.num_deletions;
    blob_num_antiquation_ += file_meta->num_antiquation;
    return;
  }

  lsm_file_size_ += file_bytes;
  lsm_num_entries_ += prop.num_entries;
  lsm_num_deletions_ += prop.num_deletions;
}

// Sets FileMetaData::compensated_file_size for every file in levels
// -1 .. num_levels_ - 1 whose value is still 0; files that already carry a
// non-zero value are left untouched.
//
// For a non-map file the compensated size is
//   FileSizeWithBlob(f) + prop.num_deletions * GetAverageValueSize().
// For a map SST it is the sum, over prop.dependence, of each dependency's
// compensated size (resolved through dependence_map_), scaled by
// dependence.entry_count / max(1, that dependency's num_entries).
void VersionStorageInfo::ComputeCompensatedSizes() {
1481 1482
  uint64_t average_value_size = GetAverageValueSize();

Z
ZhaoMing 已提交
1483 1484 1485 1486 1487
  // A lambda cannot name itself, so recursion into dependencies goes through
  // this (function pointer, opaque argument) pair. It is pointed at the
  // lambda right after the lambda is defined; c_style_callback() is assumed
  // to yield a function that forwards to the lambda passed as `args`.
  struct {
    uint64_t (*callback)(void* args, const FileMetaData* f,
                         uint64_t file_number, uint64_t entry_count);
    void* args;
  } compute_compensated_size;
1488 1489 1490 1491
  // Returns the compensated size of `f`, or -- when `f` is nullptr -- of the
  // file registered under `file_number` in dependence_map_ (0 if unknown).
  // A non-zero `entry_count` scales the result by
  // entry_count / max(1, num_entries) of the resolved file.
  auto compute_compensated_size_lambda =
      [this, average_value_size, &compute_compensated_size](
          const FileMetaData* f, uint64_t file_number = uint64_t(-1),
          uint64_t entry_count = 0) -> uint64_t {
Z
ZhaoMing 已提交
1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509
    if (f == nullptr) {
      auto find = dependence_map_.find(file_number);
      if (find == dependence_map_.end()) {
        // TODO log error
        return 0;
      }
      f = find->second;
    } else {
      // A file passed directly must not also be identified by number.
      assert(file_number == uint64_t(-1));
    }
    uint64_t file_size = 0;
    if (f->prop.is_map_sst()) {
      // Map SST: accumulate the (partial) compensated size of every file it
      // depends on, recursing through the C-style callback.
      for (auto& dependence : f->prop.dependence) {
        file_size += compute_compensated_size.callback(
            compute_compensated_size.args, nullptr, dependence.file_number,
            dependence.entry_count);
      }
    } else {
陈常龙 已提交
1510
      // Each deletion is counted as one average-sized value.
      file_size =
1511
          FileSizeWithBlob(f) + f->prop.num_deletions * average_value_size;
Z
ZhaoMing 已提交
1512 1513 1514 1515 1516 1517 1518 1519
    }
    // NOTE(review): `file_size * entry_count` is computed in uint64_t and
    // could overflow for very large products; FileSize() performs the same
    // proportional scaling in double.
    return entry_count == 0 ? file_size
                            : file_size * entry_count /
                                  std::max<uint64_t>(1, f->prop.num_entries);
  };
  compute_compensated_size.callback =
      c_style_callback(compute_compensated_size_lambda);
  compute_compensated_size.args = &compute_compensated_size_lambda;
1520

1521
  // compute the compensated size
Z
ZhaoMing 已提交
1522
  // Level -1 is visited too (files_ is indexed from -1 here).
  for (int level = -1; level < num_levels_; level++) {
1523
    for (auto* file_meta : files_[level]) {
1524
      // Here we only compute compensated_file_size for those file_meta
I
Igor Canadi 已提交
1525 1526 1527
      // whose compensated_file_size is uninitialized (== 0). This is true only
      // for files that have been created right now and no other thread has
      // access to them. That's why we can safely mutate compensated_file_size.
1528
      if (file_meta->compensated_file_size == 0) {
1529 1530
        file_meta->compensated_file_size =
            compute_compensated_size_lambda(file_meta);
1531
      }
1532 1533 1534 1535
    }
  }
}

S
sdong 已提交
1536 1537
int VersionStorageInfo::MaxInputLevel() const {
  if (compaction_style_ == kCompactionStyleLevel) {
1538
    return num_levels() - 2;
S
sdong 已提交
1539 1540 1541 1542
  }
  return 0;
}

1543 1544 1545 1546 1547 1548 1549 1550
// Deepest level compaction may write to. With allow_ingest_behind the last
// level is excluded, so outputs stop one level above it.
int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
  const int last_level = num_levels() - 1;
  if (!allow_ingest_behind) {
    return last_level;
  }
  assert(num_levels() > 1);
  return last_level - 1;
}

1551 1552 1553 1554
// Estimates how many bytes level-style compaction still has to rewrite and
// stores the result in estimated_compaction_needed_bytes_. Other compaction
// styles always get 0.
void VersionStorageInfo::EstimateCompactionBytesNeeded(
    const MutableCFOptions& mutable_cf_options) {
  // Only implemented for level-based compaction
  if (compaction_style_ != kCompactionStyleLevel) {
1555
    estimated_compaction_needed_bytes_ = 0;
1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568
    return;
  }

  // Start from Level 0, if level 0 qualifies compaction to level 1,
  // we estimate the size of compaction.
  // Then we move on to the next level and see whether it qualifies compaction
  // to the next level. The size of the level is estimated as the actual size
  // on the level plus the input bytes from the previous level if there is any.
  // If it exceeds, take the exceeded bytes as compaction input and add the size
  // of the compaction size to the total size.
  // We keep doing it to Level 2, 3, etc, until the last level and return the
  // accumulated bytes.

I
Igor Canadi 已提交
1569
  // Bytes that overflow the current level and are carried into the next one.
  uint64_t bytes_compact_to_next_level = 0;
1570 1571 1572 1573
  uint64_t level_size = 0;
  for (auto* f : files_[0]) {
    level_size += f->fd.GetFileSize();
  }
1574 1575
  // Level 0
  // L0 is considered triggered when it holds at least
  // level0_file_num_compaction_trigger files or at least
  // max_bytes_for_level_base bytes; then all of L0 is counted and carried.
  bool level0_compact_triggered = false;
1576 1577 1578
  if (static_cast<int>(files_[0].size()) >=
          mutable_cf_options.level0_file_num_compaction_trigger ||
      level_size >= mutable_cf_options.max_bytes_for_level_base) {
1579
    level0_compact_triggered = true;
1580 1581
    estimated_compaction_needed_bytes_ = level_size;
    bytes_compact_to_next_level = level_size;
1582 1583 1584 1585 1586
  } else {
    estimated_compaction_needed_bytes_ = 0;
  }

  // Level 1 and up.
1587
  // Size of level+1 measured while estimating fan-out in the previous
  // iteration; reused (and cross-checked in debug builds) as that level's
  // size in the next iteration instead of summing its files again.
  uint64_t bytes_next_level = 0;
1588
  for (int level = base_level(); level <= MaxInputLevel(); level++) {
1589
    level_size = 0;
1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603
    if (bytes_next_level > 0) {
#ifndef NDEBUG
      uint64_t level_size2 = 0;
      for (auto* f : files_[level]) {
        level_size2 += f->fd.GetFileSize();
      }
      assert(level_size2 == bytes_next_level);
#endif
      level_size = bytes_next_level;
      bytes_next_level = 0;
    } else {
      for (auto* f : files_[level]) {
        level_size += f->fd.GetFileSize();
      }
1604 1605 1606 1607 1608 1609 1610 1611
    }
    if (level == base_level() && level0_compact_triggered) {
      // Add base level size to compaction if level0 compaction triggered.
      estimated_compaction_needed_bytes_ += level_size;
    }
    // Add size added by previous compaction
    level_size += bytes_compact_to_next_level;
    bytes_compact_to_next_level = 0;
I
Igor Canadi 已提交
1612
    uint64_t level_target = MaxBytesForLevel(level);
1613 1614
    if (level_size > level_target) {
      bytes_compact_to_next_level = level_size - level_target;
1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631
      // Estimate the actual compaction fan-out ratio as size ratio between
      // the two levels.
      // Concretely: needed += overflow * (next_level_size / level_size + 1),
      // i.e. the overflow bytes themselves plus the proportional share of
      // level+1 they are merged with. Nothing is added when level+1 is empty
      // or does not exist.

      assert(bytes_next_level == 0);
      if (level + 1 < num_levels_) {
        for (auto* f : files_[level + 1]) {
          bytes_next_level += f->fd.GetFileSize();
        }
      }
      if (bytes_next_level > 0) {
        assert(level_size > 0);
        estimated_compaction_needed_bytes_ += static_cast<uint64_t>(
            static_cast<double>(bytes_compact_to_next_level) *
            (static_cast<double>(bytes_next_level) /
                 static_cast<double>(level_size) +
             1));
      }
1632 1633 1634 1635
    }
  }
}

S
Sagar Vemuri 已提交
1636 1637
namespace {
uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions,
1638
                                 const MutableCFOptions& mutable_cf_options,
S
Sagar Vemuri 已提交
1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651
                                 const std::vector<FileMetaData*>& files) {
  uint32_t ttl_expired_files_count = 0;

  int64_t _current_time;
  auto status = ioptions.env->GetCurrentTime(&_current_time);
  if (status.ok()) {
    const uint64_t current_time = static_cast<uint64_t>(_current_time);
    for (auto f : files) {
      if (!f->being_compacted && f->fd.table_reader != nullptr &&
          f->fd.table_reader->GetTableProperties() != nullptr) {
        auto creation_time =
            f->fd.table_reader->GetTableProperties()->creation_time;
        if (creation_time > 0 &&
1652 1653
            creation_time < (current_time -
                             mutable_cf_options.compaction_options_fifo.ttl)) {
S
Sagar Vemuri 已提交
1654 1655 1656 1657 1658 1659 1660 1661 1662
          ttl_expired_files_count++;
        }
      }
    }
  }
  return ttl_expired_files_count;
}
}  // anonymous namespace

S
sdong 已提交
1663
void VersionStorageInfo::ComputeCompactionScore(
Y
Yi Wu 已提交
1664
    const ImmutableCFOptions& immutable_cf_options,
1665
    const MutableCFOptions& mutable_cf_options) {
S
sdong 已提交
1666
  for (int level = 0; level <= MaxInputLevel(); level++) {
1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679
    double score;
    if (level == 0) {
      // We treat level-0 specially by bounding the number of files
      // instead of number of bytes for two reasons:
      //
      // (1) With larger write-buffer sizes, it is nice not to do too
      // many level-0 compactions.
      //
      // (2) The files in level-0 are merged on every read and
      // therefore we wish to avoid too many files when the individual
      // file size is small (perhaps because of a small write-buffer
      // setting, or very high compression ratios, or lots of
      // overwrites/deletions).
S
sdong 已提交
1680
      int num_sorted_runs = 0;
I
Igor Canadi 已提交
1681
      uint64_t total_size = 0;
1682 1683 1684
      for (auto* f : files_[level]) {
        if (!f->being_compacted) {
          total_size += f->compensated_file_size;
S
sdong 已提交
1685
          num_sorted_runs++;
1686 1687
        }
      }
S
sdong 已提交
1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698
      if (compaction_style_ == kCompactionStyleUniversal) {
        // For universal compaction, we use level0 score to indicate
        // compaction score for the whole DB. Adding other levels as if
        // they are L0 files.
        for (int i = 1; i < num_levels(); i++) {
          if (!files_[i].empty() && !files_[i][0]->being_compacted) {
            num_sorted_runs++;
          }
        }
      }

S
sdong 已提交
1699
      if (compaction_style_ == kCompactionStyleFIFO) {
1700 1701 1702
        score = static_cast<double>(total_size) /
                mutable_cf_options.compaction_options_fifo.max_table_files_size;
        if (mutable_cf_options.compaction_options_fifo.allow_compaction) {
1703 1704 1705 1706 1707
          score = std::max(
              static_cast<double>(num_sorted_runs) /
                  mutable_cf_options.level0_file_num_compaction_trigger,
              score);
        }
1708 1709 1710 1711 1712
        if (mutable_cf_options.compaction_options_fifo.ttl > 0) {
          score = std::max(
              static_cast<double>(GetExpiredTtlFilesCount(
                  immutable_cf_options, mutable_cf_options, files_[level])),
              score);
S
Sagar Vemuri 已提交
1713
        }
1714

1715
      } else {
S
sdong 已提交
1716
        score = static_cast<double>(num_sorted_runs) /
1717
                mutable_cf_options.level0_file_num_compaction_trigger;
A
Andrew Kryczka 已提交
1718 1719 1720 1721
        if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
          // Level-based involves L0->L0 compactions that can lead to oversized
          // L0 files. Take into account size as well to avoid later giant
          // compactions to the base level.
赵明 已提交
1722 1723 1724
          score =
              std::max(score, static_cast<double>(total_size) /
                                  mutable_cf_options.max_bytes_for_level_base);
A
Andrew Kryczka 已提交
1725
        }
1726 1727 1728
      }
    } else {
      // Compute the ratio of current size to size limit.
I
Igor Canadi 已提交
1729 1730
      uint64_t level_bytes_no_compacting = 0;
      for (auto f : files_[level]) {
1731
        if (!f->being_compacted) {
I
Igor Canadi 已提交
1732 1733 1734 1735
          level_bytes_no_compacting += f->compensated_file_size;
        }
      }
      score = static_cast<double>(level_bytes_no_compacting) /
1736
              MaxBytesForLevel(level);
1737 1738 1739 1740 1741 1742 1743
    }
    compaction_level_[level] = level;
    compaction_score_[level] = score;
  }

  // sort all the levels based on their score. Higher scores get listed
  // first. Use bubble sort because the number of entries are small.
1744 1745
  for (int i = 0; i < num_levels() - 2; i++) {
    for (int j = i + 1; j < num_levels() - 1; j++) {
1746 1747 1748 1749 1750 1751 1752 1753 1754 1755
      if (compaction_score_[i] < compaction_score_[j]) {
        double score = compaction_score_[i];
        int level = compaction_level_[i];
        compaction_score_[i] = compaction_score_[j];
        compaction_level_[i] = compaction_level_[j];
        compaction_score_[j] = score;
        compaction_level_[j] = level;
      }
    }
  }
陈常龙 已提交
1756 1757 1758 1759

  // Calculate total_garbage_ratio_ as criterion for NeedsGarbageCollection().
  double num_entries = 0;
  for (auto& f : LevelFiles(-1)) {
Z
ZhaoMing 已提交
1760
    if (f->is_gc_forbidden()) {
M
WIP  
maguoshun 已提交
1761 1762
      continue;
    }
陈常龙 已提交
1763 1764 1765
    total_garbage_ratio_ += f->num_antiquation;
    num_entries += f->prop.num_entries;
  }
赵明 已提交
1766
  total_garbage_ratio_ /= std::max<double>(1, num_entries);
陈常龙 已提交
1767

C
chenchanglong 已提交
1768
  is_pick_compaction_fail = false;
1769
  ComputeFilesMarkedForCompaction();
1770
  ComputeBottommostFilesMarkedForCompaction();
1771 1772
  if (mutable_cf_options.ttl > 0) {
    ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
S
Sagar Vemuri 已提交
1773
  }
1774
  EstimateCompactionBytesNeeded(mutable_cf_options);
1775 1776 1777 1778
}

void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
  files_marked_for_compaction_.clear();
1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791
  int last_qualify_level = 0;

  // Do not include files from the last level with data
  // If table properties collector suggests a file on the last level,
  // we should not move it to a new level.
  for (int level = num_levels() - 1; level >= 1; level--) {
    if (!files_[level].empty()) {
      last_qualify_level = level - 1;
      break;
    }
  }

  for (int level = 0; level <= last_qualify_level; level++) {
1792 1793 1794 1795 1796 1797
    for (auto* f : files_[level]) {
      if (!f->being_compacted && f->marked_for_compaction) {
        files_marked_for_compaction_.emplace_back(level, f);
      }
    }
  }
1798 1799
}

S
Sagar Vemuri 已提交
1800
void VersionStorageInfo::ComputeExpiredTtlFiles(
1801 1802
    const ImmutableCFOptions& ioptions, const uint64_t ttl) {
  assert(ttl > 0);
S
Sagar Vemuri 已提交
1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818

  expired_ttl_files_.clear();

  int64_t _current_time;
  auto status = ioptions.env->GetCurrentTime(&_current_time);
  if (!status.ok()) {
    return;
  }
  const uint64_t current_time = static_cast<uint64_t>(_current_time);

  for (int level = 0; level < num_levels() - 1; level++) {
    for (auto f : files_[level]) {
      if (!f->being_compacted && f->fd.table_reader != nullptr &&
          f->fd.table_reader->GetTableProperties() != nullptr) {
        auto creation_time =
            f->fd.table_reader->GetTableProperties()->creation_time;
1819
        if (creation_time > 0 && creation_time < (current_time - ttl)) {
S
Sagar Vemuri 已提交
1820 1821 1822 1823 1824 1825 1826
          expired_ttl_files_.emplace_back(level, f);
        }
      }
    }
  }
}

1827
namespace {
1828 1829 1830

// used to sort files by size
struct Fsize {
1831
  size_t index;
1832 1833 1834
  FileMetaData* file;
};

赵明 已提交
1835
}  // anonymous namespace
1836

赵明 已提交
1837 1838 1839
// Appends `f` to files_[level] and registers it in dependence_map_ under its
// own file number.
//
// For level -1, the numbers in f->prop.inheritance_chain are also mapped to
// `f`: all of them when `exists` is null, otherwise only those for which
// exists(exists_args, number) returns true. For other levels, a map SST sets
// kHasMapSst in space_amplification_[level]. For any level, a file with range
// deletions sets kHasRangeDeletion.
//
// In debug builds, a file on level > 0 whose smallest key is not strictly
// after the largest key of the level's current last file is reported to
// `info_log` (when non-null) and triggers assert(false).
void VersionStorageInfo::AddFile(int level, FileMetaData* f,
                                 bool (*exists)(void*, uint64_t),
                                 void* exists_args, Logger* info_log) {
S
sdong 已提交
1840
  auto* level_files = &files_[level];
C
chenchanglong 已提交
1841
// Must not overlap
1842
#ifndef NDEBUG
奏之章 已提交
1843
  if (level > 0 && !level_files->empty() &&
1844 1845 1846
      internal_comparator_->Compare(level_files->back()->largest,
                                    f->smallest) >= 0) {
    auto* f2 = level_files->back();
1847
    if (info_log != nullptr) {
1848 1849 1850 1851
      Error(info_log,
            "Adding new file %" PRIu64
            " range (%s, %s) to level %d but overlapping "
            "with existing file %" PRIu64 " %s %s",
奏之章 已提交
1852 1853 1854 1855
            f->fd.GetNumber(), f->smallest.DebugString(true).c_str(),
            f->largest.DebugString(true).c_str(), level, f2->fd.GetNumber(),
            f2->smallest.DebugString(true).c_str(),
            f2->largest.DebugString(true).c_str());
1856 1857 1858 1859
      LogFlush(info_log);
    }
    assert(false);
  }
1860 1861
#else
  (void)info_log;
1862
#endif
S
sdong 已提交
1863
  level_files->push_back(f);
Z
ZhaoMing 已提交
1864
  // emplace() keeps an existing mapping if this number is already present.
  dependence_map_.emplace(f->fd.GetNumber(), f);
奏之章 已提交
1865
  if (level == -1) {
陈常龙 已提交
1866 1867 1868
    // `exists` decides which inheritance-chain numbers get mapped to this
    // file: when it is null, every number in f's inheritance chain is mapped;
    // otherwise only numbers for which exists(exists_args, number) is true.
赵明 已提交
1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880
    if (exists == nullptr) {
      for (auto file_number : f->prop.inheritance_chain) {
        assert(dependence_map_.count(file_number) == 0);
        dependence_map_.emplace(file_number, f);
      }
    } else {
      for (auto file_number : f->prop.inheritance_chain) {
        assert(dependence_map_.count(file_number) == 0);
        if (exists(exists_args, file_number)) {
          dependence_map_.emplace(file_number, f);
        }
      }
Z
ZhaoMing 已提交
1881 1882
    }
  } else {
赵明 已提交
1883
    if (f->prop.is_map_sst()) {
赵明 已提交
1884
      space_amplification_[level] |= kHasMapSst;
Z
ZhaoMing 已提交
1885 1886
    }
  }
赵明 已提交
1887
  // NOTE(review): unlike kHasMapSst, this flag is also set for level -1;
  // presumably space_amplification_ accepts index -1 like files_ -- confirm.
  if (f->prop.has_range_deletions()) {
赵明 已提交
1888 1889 1890 1891
    space_amplification_[level] |= kHasRangeDeletion;
  }
}

1892 1893
// Records the oldest snapshot sequence number and takes one reference on
// every file held in levels -1 .. num_levels_ - 1.
void VersionStorageInfo::FinishAddFile(SequenceNumber _oldest_snapshot_seqnum) {
  oldest_snapshot_seqnum_ = _oldest_snapshot_seqnum;
  for (int level = -1; level < num_levels_; ++level) {
    auto& level_files = files_[level];
    for (FileMetaData* file : level_files) {
      file->refs += 1;
    }
  }
}

赵明 已提交
1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914
// Returns the on-disk size of `f` (or, when `f` is nullptr, of the file
// registered under `file_number` in dependence_map_; 0 if unknown). For a map
// SST the sizes of its dependencies are added recursively. A non-zero
// `entry_count` scales the total by entry_count / max(1, num_entries).
uint64_t VersionStorageInfo::FileSize(const FileMetaData* f,
                                      uint64_t file_number,
                                      uint64_t entry_count) const {
  if (f != nullptr) {
    assert(file_number == uint64_t(-1));
  } else {
    auto it = dependence_map_.find(file_number);
    if (it == dependence_map_.end()) {
      // TODO log error
      return 0;
    }
    f = it->second;
  }

  uint64_t total = f->fd.GetFileSize();
  if (f->prop.is_map_sst()) {
    for (const auto& dep : f->prop.dependence) {
      total += FileSize(nullptr, dep.file_number, dep.entry_count);
    }
  }

  const uint64_t denominator = std::max<uint64_t>(1, f->prop.num_entries);
  assert(entry_count <= denominator);
  if (entry_count == 0) {
    return total;
  }
  return uint64_t(double(total) * entry_count / denominator);
}

uint64_t VersionStorageInfo::FileSizeWithBlob(const FileMetaData* f,
                                              bool recursive,
Z
ZhaoMing 已提交
1930
                                              double ratio) const {
赵明 已提交
1931
  uint64_t file_size = f->fd.GetFileSize();
Z
ZhaoMing 已提交
1932
  if (recursive || f->prop.is_map_sst()) {
赵明 已提交
1933
    for (auto& dependence : f->prop.dependence) {
Z
ZhaoMing 已提交
1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944
      auto find = dependence_map_.find(dependence.file_number);
      if (find == dependence_map_.end()) {
        // TODO log error
        continue;
      }
      double new_ratio =
          find->second->prop.num_entries == 0
              ? ratio
              : ratio * dependence.entry_count / find->second->prop.num_entries;
      file_size +=
          FileSizeWithBlob(find->second, f->prop.is_map_sst(), new_ratio);
Z
ZhaoMing 已提交
1945 1946
    }
  }
Z
ZhaoMing 已提交
1947
  return uint64_t(ratio * file_size);
Z
ZhaoMing 已提交
1948 1949
}

1950 1951 1952 1953
// Version::PrepareApply() need to be called before calling the function, or
// following functions called:
// 1. UpdateNumNonEmptyLevels();
// 2. CalculateBaseBytes();
1954
// 3. UpdateFilesByCompactionPri();
1955 1956
// 4. GenerateFileIndexer();
// 5. GenerateLevelFilesBrief();
1957
// 6. GenerateLevel0NonOverlapping();
1958
// 7. GenerateBottommostFiles();
1959 1960 1961
void VersionStorageInfo::SetFinalized() {
  finalized_ = true;
#ifndef NDEBUG
S
sdong 已提交
1962 1963 1964 1965 1966 1967
  if (compaction_style_ != kCompactionStyleLevel) {
    // Not level based compaction.
    return;
  }
  assert(base_level_ < 0 || num_levels() == 1 ||
         (base_level_ >= 1 && base_level_ < num_levels()));
1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995
  // Verify all levels newer than base_level are empty except L0
  for (int level = 1; level < base_level(); level++) {
    assert(NumLevelBytes(level) == 0);
  }
  uint64_t max_bytes_prev_level = 0;
  for (int level = base_level(); level < num_levels() - 1; level++) {
    if (LevelFiles(level).size() == 0) {
      continue;
    }
    assert(MaxBytesForLevel(level) >= max_bytes_prev_level);
    max_bytes_prev_level = MaxBytesForLevel(level);
  }
  int num_empty_non_l0_level = 0;
  for (int level = 0; level < num_levels(); level++) {
    assert(LevelFiles(level).size() == 0 ||
           LevelFiles(level).size() == LevelFilesBrief(level).num_files);
    if (level > 0 && NumLevelBytes(level) > 0) {
      num_empty_non_l0_level++;
    }
    if (LevelFiles(level).size() > 0) {
      assert(level < num_non_empty_levels());
    }
  }
  assert(compaction_level_.size() > 0);
  assert(compaction_level_.size() == compaction_score_.size());
#endif
}

S
sdong 已提交
1996
// Trims trailing empty levels: num_non_empty_levels_ becomes one past the
// deepest level holding at least one file (0 when every level is empty).
void VersionStorageInfo::UpdateNumNonEmptyLevels() {
  int count = num_levels_;
  while (count > 0 && files_[count - 1].empty()) {
    --count;
  }
  num_non_empty_levels_ = count;
}

2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039
namespace {
// Sort `temp` based on ratio of overlapping size over file size
//
// For each file in `files`, sums the file_size of the `next_level_files`
// entries whose key range intersects it and keys the file by
// overlapping_bytes * 1024 / file_size; `temp` is then ordered by that key
// (terark::sort_ex_a with a key extractor -- presumably ascending, so the
// least-overlapping files come first; confirm against terark docs).
//
// The scan over `next_level_files` uses one forward-only iterator shared by
// all files, which assumes both vectors are sorted by key and that `files`
// do not overlap each other.
// NOTE(review): UpdateFilesByCompactionPri also calls this for level 0, where
// files may overlap; the shared iterator can then under-count overlap.
void SortFileByOverlappingRatio(
    const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files,
    const std::vector<FileMetaData*>& next_level_files,
    std::vector<Fsize>* temp) {
  std::unordered_map<uint64_t, uint64_t> file_to_order;
  auto next_level_it = next_level_files.begin();

  for (auto& file : files) {
    uint64_t overlapping_bytes = 0;
    // Skip files in next level that is smaller than current file
    while (next_level_it != next_level_files.end() &&
           icmp.Compare((*next_level_it)->largest, file->smallest) < 0) {
      next_level_it++;
    }

    while (next_level_it != next_level_files.end() &&
           icmp.Compare((*next_level_it)->smallest, file->largest) < 0) {
      overlapping_bytes += (*next_level_it)->fd.file_size;

      if (icmp.Compare((*next_level_it)->largest, file->largest) > 0) {
        // next level file cross large boundary of current file.
        // Keep the iterator here: the next file in `files` may overlap it too.
        break;
      }
      next_level_it++;
    }

    assert(file->fd.file_size != 0);
    file_to_order[file->fd.GetNumber()] =
        overlapping_bytes * 1024u / file->fd.file_size;
  }

2040 2041 2042
  // Every file in `temp` was keyed above, so operator[] never inserts here.
  terark::sort_ex_a(*temp, [&](const Fsize& f) {
    return file_to_order[f.file->fd.GetNumber()];
  });
2043 2044 2045
}
}  // namespace

2046
void VersionStorageInfo::UpdateFilesByCompactionPri(
Y
Yi Wu 已提交
2047
    CompactionPri compaction_pri) {
2048 2049
  if (compaction_style_ == kCompactionStyleNone ||
      compaction_style_ == kCompactionStyleFIFO ||
S
sdong 已提交
2050
      compaction_style_ == kCompactionStyleUniversal) {
I
Igor Canadi 已提交
2051 2052 2053
    // don't need this
    return;
  }
2054
  // No need to sort the highest level because it is never compacted.
2055
  for (int level = 0; level < num_levels() - 1; level++) {
2056
    const std::vector<FileMetaData*>& files = files_[level];
2057 2058
    auto& files_by_compaction_pri = files_by_compaction_pri_[level];
    assert(files_by_compaction_pri.size() == 0);
2059 2060 2061

    // populate a temp vector for sorting based on size
    std::vector<Fsize> temp(files.size());
2062
    for (size_t i = 0; i < files.size(); i++) {
2063 2064 2065 2066
      temp[i].index = i;
      temp[i].file = files[i];
    }

S
sdong 已提交
2067 2068
    // sort the top number_of_files_to_sort_ based on file size
    size_t num = VersionStorageInfo::kNumberFilesToSort;
2069 2070
    if (num > temp.size()) {
      num = temp.size();
2071
    }
Y
Yi Wu 已提交
2072
    switch (compaction_pri) {
2073
      case kByCompensatedSize:
2074
        std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
L
leipeng 已提交
2075
                          TERARK_CMP(file->compensated_file_size, >));
2076
        break;
2077
      case kOldestLargestSeqFirst:
L
leipeng 已提交
2078
        terark::sort_a(temp, TERARK_CMP(file->fd.largest_seqno, <));
2079
        break;
2080
      case kOldestSmallestSeqFirst:
L
leipeng 已提交
2081
        terark::sort_a(temp, TERARK_CMP(file->fd.smallest_seqno, <));
2082
        break;
2083 2084 2085 2086
      case kMinOverlappingRatio:
        SortFileByOverlappingRatio(*internal_comparator_, files_[level],
                                   files_[level + 1], &temp);
        break;
2087 2088 2089
      default:
        assert(false);
    }
2090 2091
    assert(temp.size() == files.size());

2092
    // initialize files_by_compaction_pri_
2093 2094
    for (size_t i = 0; i < temp.size(); i++) {
      files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
2095 2096
    }
    next_file_to_compact_by_size_[level] = 0;
2097
    assert(files_[level].size() == files_by_compaction_pri_[level].size());
2098 2099 2100
  }
}

2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111
void VersionStorageInfo::GenerateLevel0NonOverlapping() {
  assert(!finalized_);
  level0_non_overlapping_ = true;
  if (level_files_brief_.size() == 0) {
    return;
  }

  // A copy of L0 files sorted by smallest key
  std::vector<FdWithKeyRange> level0_sorted_file(
      level_files_brief_[0].files,
      level_files_brief_[0].files + level_files_brief_[0].num_files);
L
leipeng 已提交
2112
  auto icmp = internal_comparator_;
L
leipeng 已提交
2113
  terark::sort_a(level0_sorted_file, TERARK_FIELD(smallest_key) < *icmp);
2114 2115 2116 2117

  for (size_t i = 1; i < level0_sorted_file.size(); ++i) {
    FdWithKeyRange& f = level0_sorted_file[i];
    FdWithKeyRange& prev = level0_sorted_file[i - 1];
L
leipeng 已提交
2118
    if (icmp->Compare(prev.largest_key, f.smallest_key) >= 0) {
2119 2120 2121 2122 2123 2124
      level0_non_overlapping_ = false;
      break;
    }
  }
}

2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137
void VersionStorageInfo::GenerateBottommostFiles() {
  assert(!finalized_);
  assert(bottommost_files_.empty());
  for (size_t level = 0; level < level_files_brief_.size(); ++level) {
    for (size_t file_idx = 0; file_idx < level_files_brief_[level].num_files;
         ++file_idx) {
      const FdWithKeyRange& f = level_files_brief_[level].files[file_idx];
      int l0_file_idx;
      if (level == 0) {
        l0_file_idx = static_cast<int>(file_idx);
      } else {
        l0_file_idx = -1;
      }
2138 2139 2140
      Slice smallest_user_key = ExtractUserKey(f.smallest_key);
      Slice largest_user_key = ExtractUserKey(f.largest_key);
      if (!RangeMightExistAfterSortedRun(smallest_user_key, largest_user_key,
2141 2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 2160 2161 2162
                                         static_cast<int>(level),
                                         l0_file_idx)) {
        bottommost_files_.emplace_back(static_cast<int>(level),
                                       f.file_metadata);
      }
    }
  }
}

// Advances the oldest snapshot sequence number (it must not move backwards).
// The bottommost-file marks are recomputed only once it passes
// bottommost_files_mark_threshold_.
void VersionStorageInfo::UpdateOldestSnapshot(SequenceNumber seqnum) {
  assert(seqnum >= oldest_snapshot_seqnum_);
  oldest_snapshot_seqnum_ = seqnum;
  if (oldest_snapshot_seqnum_ <= bottommost_files_mark_threshold_) {
    return;
  }
  ComputeBottommostFilesMarkedForCompaction();
}

void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
  bottommost_files_marked_for_compaction_.clear();
  bottommost_files_mark_threshold_ = kMaxSequenceNumber;
  for (auto& level_and_file : bottommost_files_) {
    if (!level_and_file.second->being_compacted &&
2163
        level_and_file.second->fd.largest_seqno != 0 &&
赵明 已提交
2164
        level_and_file.second->prop.num_deletions > 1) {
2165 2166 2167
      // largest_seqno might be nonzero due to containing the final key in an
      // earlier compaction, whose seqnum we didn't zero out. Multiple deletions
      // ensures the file really contains deleted or overwritten keys.
2168
      if (level_and_file.second->fd.largest_seqno < oldest_snapshot_seqnum_) {
2169 2170 2171 2172
        bottommost_files_marked_for_compaction_.push_back(level_and_file);
      } else {
        bottommost_files_mark_threshold_ =
            std::min(bottommost_files_mark_threshold_,
2173
                     level_and_file.second->fd.largest_seqno);
2174 2175 2176 2177 2178
      }
    }
  }
}

赵明 已提交
2179
// Adds one reference to this Version; balanced by Unref().
void Version::Ref() {
  refs_ += 1;
}
J
jorlow@chromium.org 已提交
2180

2181
bool Version::Unref() {
J
jorlow@chromium.org 已提交
2182 2183 2184
  assert(refs_ >= 1);
  --refs_;
  if (refs_ == 0) {
2185
    delete this;
2186
    return true;
J
jorlow@chromium.org 已提交
2187
  }
2188
  return false;
J
jorlow@chromium.org 已提交
2189 2190
}

S
sdong 已提交
2191 2192 2193
bool VersionStorageInfo::OverlapInLevel(int level,
                                        const Slice* smallest_user_key,
                                        const Slice* largest_user_key) {
2194 2195 2196 2197
  if (level >= num_non_empty_levels_) {
    // empty level, no overlap
    return false;
  }
S
sdong 已提交
2198
  return SomeFileOverlapsRange(*internal_comparator_, (level > 0),
2199
                               level_files_brief_[level], smallest_user_key,
2200
                               largest_user_key);
G
Gabor Cselle 已提交
2201 2202 2203
}

// Store in "*inputs" all files in "level" that overlap [begin,end]
A
Abhishek Kona 已提交
2204
// If hint_index is specified, then it points to a file in the
2205 2206
// overlapping range.
// The file_index returns a pointer to any file in an overlapping range.
S
sdong 已提交
2207 2208
void VersionStorageInfo::GetOverlappingInputs(
    int level, const InternalKey* begin, const InternalKey* end,
2209
    std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
2210
    bool expand_range, InternalKey** next_smallest) const {
2211 2212 2213 2214 2215
  if (level >= num_non_empty_levels_) {
    // this level is empty, no overlapping inputs
    return;
  }

G
Gabor Cselle 已提交
2216
  inputs->clear();
2217 2218 2219
  if (file_index) {
    *file_index = -1;
  }
S
sdong 已提交
2220
  const Comparator* user_cmp = user_comparator_;
2221
  if (level > 0) {
2222 2223
    GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index,
                                          file_index, false, next_smallest);
2224 2225
    return;
  }
A
Aaron Gao 已提交
2226

2227 2228 2229 2230 2231 2232
  if (next_smallest) {
    // next_smallest key only makes sense for non-level 0, where files are
    // non-overlapping
    *next_smallest = nullptr;
  }

2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256
  Slice user_begin, user_end;
  if (begin != nullptr) {
    user_begin = begin->user_key();
  }
  if (end != nullptr) {
    user_end = end->user_key();
  }

  // index stores the file index need to check.
  std::list<size_t> index;
  for (size_t i = 0; i < level_files_brief_[level].num_files; i++) {
    index.emplace_back(i);
  }

  while (!index.empty()) {
    bool found_overlapping_file = false;
    auto iter = index.begin();
    while (iter != index.end()) {
      FdWithKeyRange* f = &(level_files_brief_[level].files[*iter]);
      const Slice file_start = ExtractUserKey(f->smallest_key);
      const Slice file_limit = ExtractUserKey(f->largest_key);
      if (begin != nullptr && user_cmp->Compare(file_limit, user_begin) < 0) {
        // "f" is completely before specified range; skip it
        iter++;
2257 2258
      } else if (end != nullptr &&
                 user_cmp->Compare(file_start, user_end) > 0) {
2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275
        // "f" is completely after specified range; skip it
        iter++;
      } else {
        // if overlap
        inputs->emplace_back(files_[level][*iter]);
        found_overlapping_file = true;
        // record the first file index.
        if (file_index && *file_index == -1) {
          *file_index = static_cast<int>(*iter);
        }
        // the related file is overlap, erase to avoid checking again.
        iter = index.erase(iter);
        if (expand_range) {
          if (begin != nullptr &&
              user_cmp->Compare(file_start, user_begin) < 0) {
            user_begin = file_start;
          }
2276
          if (end != nullptr && user_cmp->Compare(file_limit, user_end) > 0) {
2277 2278
            user_end = file_limit;
          }
H
Hans Wennborg 已提交
2279 2280
        }
      }
G
Gabor Cselle 已提交
2281
    }
2282 2283 2284 2285
    // if all the files left are not overlap, break
    if (!found_overlapping_file) {
      break;
    }
G
Gabor Cselle 已提交
2286
  }
2287 2288
}

A
Aaron Gao 已提交
2289 2290 2291 2292 2293 2294 2295 2296 2297
// Collect into "*inputs" the files of "level" that lie within [begin, end]
// with a "clean cut" between the chosen files and their neighbours, so that
// compacting them never loses part of a key. A nullptr begin (end) stands for
// the smallest key of the first file (the largest key of the last file).
// If hint_index != -1 it names a file known to be in the range and the
// binary search is skipped. *file_index (when non-null) starts at -1 and is
// set to the index of a file found in the range, if any.
// L0 is not supported: for L0, an empty level, or a level at or beyond
// num_non_empty_levels_, "*inputs" is simply left empty.
void VersionStorageInfo::GetCleanInputsWithinInterval(
    int level, const InternalKey* begin, const InternalKey* end,
    std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
  inputs->clear();
  if (file_index != nullptr) {
    *file_index = -1;
  }

  const bool unsupported_or_empty = level == 0 ||
                                    level >= num_non_empty_levels_ ||
                                    level_files_brief_[level].num_files == 0;
  if (unsupported_or_empty) {
    return;
  }

  const auto& brief = level_files_brief_[level];
  const InternalKey* lower =
      begin != nullptr ? begin : &brief.files[0].file_metadata->smallest;
  const InternalKey* upper =
      end != nullptr
          ? end
          : &brief.files[brief.num_files - 1].file_metadata->largest;

  GetOverlappingInputsRangeBinarySearch(level, lower, upper, inputs, hint_index,
                                        file_index, true /* within_interval */);
}

2321 2322 2323 2324
// Store in "*inputs" all files in "level" that overlap [begin,end]
// Employ binary search to find at least one file that overlaps the
// specified range. From that file, iterate backwards and
// forwards to find all overlapping files.
A
Aaron Gao 已提交
2325 2326 2327 2328
// if within_range is set, then only store the maximum clean inputs
// within range [begin, end]. "clean" means there is a boudnary
// between the files in "*inputs" and the surrounding files
void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
2329
    int level, const InternalKey* begin, const InternalKey* end,
A
Aaron Gao 已提交
2330
    std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
2331
    bool within_interval, InternalKey** next_smallest) const {
2332 2333 2334
  assert(level > 0);
  int min = 0;
  int mid = 0;
2335
  int max = static_cast<int>(files_[level].size()) - 1;
2336
  bool foundOverlap = false;
2337
  auto user_cmp = user_comparator_;
2338 2339 2340 2341 2342 2343 2344 2345 2346

  // if the caller already knows the index of a file that has overlap,
  // then we can skip the binary search.
  if (hint_index != -1) {
    mid = hint_index;
    foundOverlap = true;
  }

  while (!foundOverlap && min <= max) {
奏之章 已提交
2347
    mid = (min + max) / 2;
2348
    FdWithKeyRange* f = &(level_files_brief_[level].files[mid]);
2349 2350 2351 2352
    auto& smallest = f->file_metadata->smallest;
    auto& largest = f->file_metadata->largest;
    if ((!within_interval && sstableKeyCompare(user_cmp, begin, largest) > 0) ||
        (within_interval && sstableKeyCompare(user_cmp, begin, smallest) > 0)) {
2353
      min = mid + 1;
A
Aaron Gao 已提交
2354
    } else if ((!within_interval &&
2355
                sstableKeyCompare(user_cmp, smallest, end) > 0) ||
A
Aaron Gao 已提交
2356
               (within_interval &&
2357
                sstableKeyCompare(user_cmp, largest, end) > 0)) {
2358 2359 2360 2361 2362 2363
      max = mid - 1;
    } else {
      foundOverlap = true;
      break;
    }
  }
A
Abhishek Kona 已提交
2364

2365 2366
  // If there were no overlapping files, return immediately.
  if (!foundOverlap) {
2367 2368 2369
    if (next_smallest) {
      next_smallest = nullptr;
    }
2370 2371
    return;
  }
2372 2373 2374 2375
  // returns the index where an overlap is found
  if (file_index) {
    *file_index = mid;
  }
A
Aaron Gao 已提交
2376 2377 2378

  int start_index, end_index;
  if (within_interval) {
赵明 已提交
2379 2380
    ExtendFileRangeWithinInterval(level, begin, end, mid, &start_index,
                                  &end_index);
A
Aaron Gao 已提交
2381
  } else {
赵明 已提交
2382 2383
    ExtendFileRangeOverlappingInterval(level, begin, end, mid, &start_index,
                                       &end_index);
2384
    assert(end_index >= start_index);
A
Aaron Gao 已提交
2385 2386 2387 2388 2389
  }
  // insert overlapping files into vector
  for (int i = start_index; i <= end_index; i++) {
    inputs->push_back(files_[level][i]);
  }
2390 2391 2392 2393 2394 2395 2396 2397 2398

  if (next_smallest != nullptr) {
    // Provide the next key outside the range covered by inputs
    if (++end_index < static_cast<int>(files_[level].size())) {
      **next_smallest = files_[level][end_index]->smallest;
    } else {
      *next_smallest = nullptr;
    }
  }
2399
}
A
Abhishek Kona 已提交
2400

A
Aaron Gao 已提交
2401 2402 2403
// Store in *start_index and *end_index the range of all files in
// "level" that overlap [begin,end]
// The mid_index specifies the index of at least one file that
2404 2405
// overlaps the specified range. From that file, iterate backward
// and forward to find all overlapping files.
S
sdong 已提交
2406
// Use FileLevel in searching, make it faster
A
Aaron Gao 已提交
2407
void VersionStorageInfo::ExtendFileRangeOverlappingInterval(
2408
    int level, const InternalKey* begin, const InternalKey* end,
A
Aaron Gao 已提交
2409
    unsigned int mid_index, int* start_index, int* end_index) const {
2410
  auto user_cmp = user_comparator_;
2411
  const FdWithKeyRange* files = level_files_brief_[level].files;
2412 2413
#ifndef NDEBUG
  {
A
Aaron Gao 已提交
2414 2415 2416
    // assert that the file at mid_index overlaps with the range
    assert(mid_index < level_files_brief_[level].num_files);
    const FdWithKeyRange* f = &files[mid_index];
2417 2418 2419 2420
    auto& smallest = f->file_metadata->smallest;
    auto& largest = f->file_metadata->largest;
    if (sstableKeyCompare(user_cmp, begin, smallest) <= 0) {
      assert(sstableKeyCompare(user_cmp, smallest, end) <= 0);
2421
    } else {
赵明 已提交
2422 2423
      // fprintf(stderr, "ExtendFileRangeOverlappingInterval\n%s - %s\n%s"
      //                 " - %s\n%d %d\n",
2424 2425 2426 2427 2428 2429 2430
      //         begin ? begin->DebugString().c_str() : "(null)",
      //         end ? end->DebugString().c_str() : "(null)",
      //         smallest->DebugString().c_str(),
      //         largest->DebugString().c_str(),
      //         sstableKeyCompare(user_cmp, smallest, begin),
      //         sstableKeyCompare(user_cmp, largest, begin));
      assert(sstableKeyCompare(user_cmp, begin, largest) <= 0);
2431 2432 2433
    }
  }
#endif
A
Aaron Gao 已提交
2434 2435
  *start_index = mid_index + 1;
  *end_index = mid_index;
T
Tamir Duberstein 已提交
2436
  int count __attribute__((__unused__));
2437
  count = 0;
2438 2439

  // check backwards from 'mid' to lower indices
奏之章 已提交
2440
  for (int i = mid_index; i >= 0; i--) {
2441
    const FdWithKeyRange* f = &files[i];
2442 2443
    auto& largest = f->file_metadata->largest;
    if (sstableKeyCompare(user_cmp, begin, largest) <= 0) {
A
Aaron Gao 已提交
2444
      *start_index = i;
2445
      assert((count++, true));
2446 2447 2448 2449 2450
    } else {
      break;
    }
  }
  // check forward from 'mid+1' to higher indices
赵明 已提交
2451 2452
  for (unsigned int i = mid_index + 1; i < level_files_brief_[level].num_files;
       i++) {
2453
    const FdWithKeyRange* f = &files[i];
2454 2455
    auto& smallest = f->file_metadata->smallest;
    if (sstableKeyCompare(user_cmp, smallest, end) <= 0) {
2456
      assert((count++, true));
A
Aaron Gao 已提交
2457
      *end_index = i;
2458 2459 2460 2461
    } else {
      break;
    }
  }
A
Aaron Gao 已提交
2462
  assert(count == *end_index - *start_index + 1);
2463 2464
}

A
Aaron Gao 已提交
2465 2466 2467 2468 2469 2470 2471 2472
// Store in *start_index and *end_index the clean range of all files in
// "level" within [begin,end]
// The mid_index specifies the index of at least one file within
// the specified range. From that file, iterate backward
// and forward to find all overlapping files and then "shrink" to
// the clean range required.
// Use FileLevel in searching, make it faster
void VersionStorageInfo::ExtendFileRangeWithinInterval(
2473
    int level, const InternalKey* begin, const InternalKey* end,
A
Aaron Gao 已提交
2474 2475
    unsigned int mid_index, int* start_index, int* end_index) const {
  assert(level != 0);
2476
  auto* user_cmp = user_comparator_;
2477
  const FdWithKeyRange* files = level_files_brief_[level].files;
A
Aaron Gao 已提交
2478 2479 2480 2481 2482
#ifndef NDEBUG
  {
    // assert that the file at mid_index is within the range
    assert(mid_index < level_files_brief_[level].num_files);
    const FdWithKeyRange* f = &files[mid_index];
2483 2484 2485 2486
    auto& smallest = f->file_metadata->smallest;
    auto& largest = f->file_metadata->largest;
    assert(sstableKeyCompare(user_cmp, begin, smallest) <= 0 &&
           sstableKeyCompare(user_cmp, largest, end) <= 0);
A
Aaron Gao 已提交
2487 2488
  }
#endif
赵明 已提交
2489 2490
  ExtendFileRangeOverlappingInterval(level, begin, end, mid_index, start_index,
                                     end_index);
A
Aaron Gao 已提交
2491 2492 2493 2494
  int left = *start_index;
  int right = *end_index;
  // shrink from left to right
  while (left <= right) {
2495 2496
    auto& smallest = files[left].file_metadata->smallest;
    if (sstableKeyCompare(user_cmp, begin, smallest) > 0) {
A
Aaron Gao 已提交
2497 2498 2499 2500
      left++;
      continue;
    }
    if (left > 0) {  // If not first file
2501 2502
      auto& largest = files[left - 1].file_metadata->largest;
      if (sstableKeyCompare(user_cmp, smallest, largest) == 0) {
A
Aaron Gao 已提交
2503 2504 2505
        left++;
        continue;
      }
2506
    }
A
Aaron Gao 已提交
2507
    break;
2508
  }
A
Aaron Gao 已提交
2509 2510
  // shrink from right to left
  while (left <= right) {
2511 2512
    auto& largest = files[right].file_metadata->largest;
    if (sstableKeyCompare(user_cmp, largest, end) > 0) {
A
Aaron Gao 已提交
2513 2514
      right--;
      continue;
2515
    }
A
Aaron Gao 已提交
2516 2517
    if (right < static_cast<int>(level_files_brief_[level].num_files) -
                    1) {  // If not the last file
2518 2519
      auto& smallest = files[right + 1].file_metadata->smallest;
      if (sstableKeyCompare(user_cmp, smallest, largest) == 0) {
A
Aaron Gao 已提交
2520 2521 2522 2523 2524 2525
        // The last user key in range overlaps with the next file's first key
        right--;
        continue;
      }
    }
    break;
2526 2527
  }

A
Aaron Gao 已提交
2528 2529
  *start_index = left;
  *end_index = right;
2530 2531
}

S
sdong 已提交
2532
uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
2533
  assert(level >= 0);
2534
  assert(level < num_levels());
2535 2536 2537
  return TotalFileSize(files_[level]);
}

S
sdong 已提交
2538 2539
// Formats a one-line summary of the LSM shape into scratch and returns
// scratch->buffer. The summary has up to four parts:
// - For level-style compaction with more than one level and a non-zero
//   level_multiplier_, a prefix with the base level, the multiplier and the
//   base level's max bytes.
// - "files[<count per level>]".
// - compaction_score_[0], printed as "max score".
// - If any files are marked for compaction, their count.
// Output is truncated to fit the buffer and is always NUL-terminated.
const char* VersionStorageInfo::LevelSummary(
    LevelSummaryStorage* scratch) const {
  const int capacity = static_cast<int>(sizeof(scratch->buffer));
  int len = 0;
  // snprintf returns the length it *would* have written, which can exceed the
  // space left. Clamping keeps "sizeof(buffer) - len" from going negative
  // (and wrapping to a huge size_t) in the next call.
  auto advance = [&len, capacity](int ret) {
    if (ret > 0) {
      len = std::min(len + ret, capacity - 1);
    }
  };

  if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
    assert(base_level_ < static_cast<int>(level_max_bytes_.size()));
    if (level_multiplier_ != 0.0) {
      advance(snprintf(
          scratch->buffer, sizeof(scratch->buffer),
          "base level %d level multiplier %.2f max bytes base %" PRIu64 " ",
          base_level_, level_multiplier_, level_max_bytes_[base_level_]));
    }
  }
  advance(snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
                   "files["));
  for (int i = 0; i < num_levels(); i++) {
    int sz = capacity - len;
    int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size()));
    if (ret < 0 || ret >= sz) break;
    len += ret;
  }
  // Overwrite the trailing space after the last count. Only step back over an
  // actual space, so "[" survives when no count fit.
  if (len > 0 && scratch->buffer[len - 1] == ' ') {
    --len;
  }
  advance(snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
                   "] max score %.2f", compaction_score_[0]));

  if (!files_marked_for_compaction_.empty()) {
    snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
             " (%" ROCKSDB_PRIszt " files need compaction)",
             files_marked_for_compaction_.size());
  }

  return scratch->buffer;
}

S
sdong 已提交
2574 2575
// Renders every file of "level" into scratch and returns scratch->buffer:
//   files_size[#<number>(seq=<smallest_seqno>,sz=<size>,<being_compacted>) ...]
// The size comes from AppendHumanBytes. Stops adding entries at the first one
// that would not fit; the closing "]" is still written.
const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch,
                                                 int level) const {
  char* const out = scratch->buffer;
  const int capacity = sizeof(scratch->buffer);
  int used = snprintf(out, sizeof(scratch->buffer), "files_size[");

  const auto& level_files = files_[level];
  for (const auto& meta : level_files) {
    const int room = capacity - used;
    char size_text[16];
    AppendHumanBytes(meta->fd.GetFileSize(), size_text, sizeof(size_text));
    const int written =
        snprintf(out + used, room, "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
                 meta->fd.GetNumber(), meta->fd.smallest_seqno, size_text,
                 static_cast<int>(meta->being_compacted));
    if (written < 0 || written >= room) {
      break;
    }
    used += written;
  }

  // Intended to drop the space that follows the last entry.
  if (!level_files.empty() && used > 0) {
    --used;
  }
  snprintf(out + used, sizeof(scratch->buffer) - used, "]");
  return out;
}

S
sdong 已提交
2598
int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
2599 2600
  uint64_t result = 0;
  std::vector<FileMetaData*> overlaps;
2601
  for (int level = 1; level < num_levels() - 1; level++) {
2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612
    for (const auto& f : files_[level]) {
      GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps);
      const uint64_t sum = TotalFileSize(overlaps);
      if (sum > result) {
        result = sum;
      }
    }
  }
  return result;
}

2613 2614 2615 2616 2617 2618 2619 2620 2621 2622
// Returns the target size in bytes for "level" from level_max_bytes_, which
// CalculateBaseBytes fills in. The L0 entry is not really used, because the
// L0 compaction threshold is based on the number of files.
uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
  assert(level >= 0 && level < static_cast<int>(level_max_bytes_.size()));
  const uint64_t target = level_max_bytes_[level];
  return target;
}

void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
                                            const MutableCFOptions& options) {
S
sdong 已提交
2623 2624 2625
  // Special logic to set number of sorted runs.
  // It is to match the previous behavior when all files are in L0.
  int num_l0_count = static_cast<int>(files_[0].size());
V
vrofze 已提交
2626
  if (compaction_style_ == kCompactionStyleUniversal &&
Z
ZhaoMing 已提交
2627
      !ioptions.enable_lazy_compaction) {
S
sdong 已提交
2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638
    // For universal compaction, we use level0 score to indicate
    // compaction score for the whole DB. Adding other levels as if
    // they are L0 files.
    for (int i = 1; i < num_levels(); i++) {
      if (!files_[i].empty()) {
        num_l0_count++;
      }
    }
  }
  set_l0_delay_trigger_count(num_l0_count);

2639 2640
  level_max_bytes_.resize(ioptions.num_levels);
  if (!ioptions.level_compaction_dynamic_level_bytes) {
S
sdong 已提交
2641
    base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;
2642 2643 2644 2645 2646 2647 2648 2649 2650

    // Calculate for static bytes base case
    for (int i = 0; i < ioptions.num_levels; ++i) {
      if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
        level_max_bytes_[i] = options.max_bytes_for_level_base;
      } else if (i > 1) {
        level_max_bytes_[i] = MultiplyCheckOverflow(
            MultiplyCheckOverflow(level_max_bytes_[i - 1],
                                  options.max_bytes_for_level_multiplier),
S
sdong 已提交
2651
            options.MaxBytesMultiplerAdditional(i - 1));
2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685
      } else {
        level_max_bytes_[i] = options.max_bytes_for_level_base;
      }
    }
  } else {
    uint64_t max_level_size = 0;

    int first_non_empty_level = -1;
    // Find size of non-L0 level of most data.
    // Cannot use the size of the last level because it can be empty or less
    // than previous levels after compaction.
    for (int i = 1; i < num_levels_; i++) {
      uint64_t total_size = 0;
      for (const auto& f : files_[i]) {
        total_size += f->fd.GetFileSize();
      }
      if (total_size > 0 && first_non_empty_level == -1) {
        first_non_empty_level = i;
      }
      if (total_size > max_level_size) {
        max_level_size = total_size;
      }
    }

    // Prefill every level's max bytes to disallow compaction from there.
    for (int i = 0; i < num_levels_; i++) {
      level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
    }

    if (max_level_size == 0) {
      // No data for L1 and up. L0 compacts to last level directly.
      // No compaction from L1+ needs to be scheduled.
      base_level_ = num_levels_ - 1;
    } else {
2686 2687 2688 2689 2690 2691 2692
      uint64_t l0_size = 0;
      for (const auto& f : files_[0]) {
        l0_size += f->fd.GetFileSize();
      }

      uint64_t base_bytes_max =
          std::max(options.max_bytes_for_level_base, l0_size);
2693 2694
      uint64_t base_bytes_min = static_cast<uint64_t>(
          base_bytes_max / options.max_bytes_for_level_multiplier);
2695 2696 2697 2698 2699

      // Try whether we can make last level's target size to be max_level_size
      uint64_t cur_level_size = max_level_size;
      for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
        // Round up after dividing
2700 2701
        cur_level_size = static_cast<uint64_t>(
            cur_level_size / options.max_bytes_for_level_multiplier);
2702 2703 2704
      }

      // Calculate base level and its size.
2705
      uint64_t base_level_size;
2706 2707 2708 2709
      if (cur_level_size <= base_bytes_min) {
        // Case 1. If we make target size of last level to be max_level_size,
        // target size of the first non-empty level would be smaller than
        // base_bytes_min. We set it be base_bytes_min.
2710
        base_level_size = base_bytes_min + 1U;
2711
        base_level_ = first_non_empty_level;
2712 2713 2714
        ROCKS_LOG_WARN(ioptions.info_log,
                       "More existing levels in DB than needed. "
                       "max_bytes_for_level_multiplier may not be guaranteed.");
2715 2716 2717 2718 2719
      } else {
        // Find base level (where L0 data is compacted to).
        base_level_ = first_non_empty_level;
        while (base_level_ > 1 && cur_level_size > base_bytes_max) {
          --base_level_;
2720 2721
          cur_level_size = static_cast<uint64_t>(
              cur_level_size / options.max_bytes_for_level_multiplier);
2722 2723 2724 2725
        }
        if (cur_level_size > base_bytes_max) {
          // Even L1 will be too large
          assert(base_level_ == 1);
2726
          base_level_size = base_bytes_max;
2727
        } else {
2728
          base_level_size = cur_level_size;
2729 2730 2731
        }
      }

2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754
      level_multiplier_ = options.max_bytes_for_level_multiplier;
      assert(base_level_size > 0);
      if (l0_size > base_level_size &&
          (l0_size > options.max_bytes_for_level_base ||
           static_cast<int>(files_[0].size() / 2) >=
               options.level0_file_num_compaction_trigger)) {
        // We adjust the base level according to actual L0 size, and adjust
        // the level multiplier accordingly, when:
        //   1. the L0 size is larger than level size base, or
        //   2. number of L0 files reaches twice the L0->L1 compaction trigger
        // We don't do this otherwise to keep the LSM-tree structure stable
        // unless the L0 compation is backlogged.
        base_level_size = l0_size;
        if (base_level_ == num_levels_ - 1) {
          level_multiplier_ = 1.0;
        } else {
          level_multiplier_ = std::pow(
              static_cast<double>(max_level_size) /
                  static_cast<double>(base_level_size),
              1.0 / static_cast<double>(num_levels_ - base_level_ - 1));
        }
      }

2755
      uint64_t level_size = base_level_size;
2756 2757
      for (int i = base_level_; i < num_levels_; i++) {
        if (i > base_level_) {
2758
          level_size = MultiplyCheckOverflow(level_size, level_multiplier_);
2759
        }
2760 2761 2762 2763 2764
        // Don't set any level below base_bytes_max. Otherwise, the LSM can
        // assume an hourglass shape where L1+ sizes are smaller than L0. This
        // causes compaction scoring, which depends on level sizes, to favor L1+
        // at the expense of L0, which may fill up and stall.
        level_max_bytes_[i] = std::max(level_size, base_bytes_max);
2765 2766 2767 2768 2769
      }
    }
  }
}

A
Andres Notzli 已提交
2770
uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
陈常龙 已提交
2771
  // See VersionStorageInfo::GetEstimatedActiveKeys
Z
ZhaoMing 已提交
2772 2773
  if (lsm_num_entries_ <= lsm_num_deletions_) {
    return 0;
A
Andres Notzli 已提交
2774
  }
2775
  double r = double(lsm_num_entries_ - lsm_num_deletions_) / lsm_num_entries_;
Z
ZhaoMing 已提交
2776 2777 2778 2779 2780
  return size_t(r * (lsm_file_size_ +
                     (blob_num_entries_ > blob_num_antiquation_
                          ? double(blob_num_entries_ - blob_num_antiquation_) /
                                blob_num_entries_ * blob_file_size_
                          : 0)));
A
Andres Notzli 已提交
2781 2782
}

2783
bool VersionStorageInfo::RangeMightExistAfterSortedRun(
2784 2785
    const Slice& smallest_user_key, const Slice& largest_user_key,
    int last_level, int last_l0_idx) {
2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806
  assert((last_l0_idx != -1) == (last_level == 0));
  // TODO(ajkr): this preserves earlier behavior where we considered an L0 file
  // bottommost only if it's the oldest L0 file and there are no files on older
  // levels. It'd be better to consider it bottommost if there's no overlap in
  // older levels/files.
  if (last_level == 0 &&
      last_l0_idx != static_cast<int>(LevelFiles(0).size() - 1)) {
    return true;
  }

  // Checks whether there are files living beyond the `last_level`. If lower
  // levels have files, it checks for overlap between [`smallest_key`,
  // `largest_key`] and those files. Bottomlevel optimizations can be made if
  // there are no files in lower levels or if there is no overlap with the files
  // in the lower levels.
  for (int level = last_level + 1; level < num_levels(); level++) {
    // The range is not in the bottommost level if there are files in lower
    // levels when the `last_level` is 0 or if there are files in lower levels
    // which overlap with [`smallest_key`, `largest_key`].
    if (files_[level].size() > 0 &&
        (last_level == 0 ||
2807
         OverlapInLevel(level, &smallest_user_key, &largest_user_key))) {
2808 2809 2810 2811 2812
      return true;
    }
  }
  return false;
}
A
Andres Notzli 已提交
2813

2814
void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
奏之章 已提交
2815
  for (int level = -1; level < storage_info_.num_levels(); level++) {
S
sdong 已提交
2816
    const std::vector<FileMetaData*>& files = storage_info_.files_[level];
2817
    for (const auto& file : files) {
2818
      live->push_back(file->fd);
2819 2820 2821 2822
    }
  }
}

2823
std::string Version::DebugString(bool hex, bool print_stats) const {
J
jorlow@chromium.org 已提交
2824
  std::string r;
S
sdong 已提交
2825
  for (int level = 0; level < storage_info_.num_levels_; level++) {
2826 2827 2828 2829
    // E.g.,
    //   --- level 1 ---
    //   17:123['a' .. 'd']
    //   20:43['e' .. 'g']
2830 2831 2832
    //
    // if print_stats=true:
    //   17:123['a' .. 'd'](4096)
2833
    r.append("--- level ");
J
jorlow@chromium.org 已提交
2834
    AppendNumberTo(&r, level);
2835 2836
    r.append(" --- version# ");
    AppendNumberTo(&r, version_number_);
2837
    r.append(" ---\n");
S
sdong 已提交
2838
    const std::vector<FileMetaData*>& files = storage_info_.files_[level];
D
dgrogan@chromium.org 已提交
2839
    for (size_t i = 0; i < files.size(); i++) {
J
jorlow@chromium.org 已提交
2840
      r.push_back(' ');
2841
      AppendNumberTo(&r, files[i]->fd.GetNumber());
J
jorlow@chromium.org 已提交
2842
      r.push_back(':');
2843
      AppendNumberTo(&r, files[i]->fd.GetFileSize());
G
Gabor Cselle 已提交
2844
      r.append("[");
Z
Zheng Shao 已提交
2845
      r.append(files[i]->smallest.DebugString(hex));
G
Gabor Cselle 已提交
2846
      r.append(" .. ");
Z
Zheng Shao 已提交
2847
      r.append(files[i]->largest.DebugString(hex));
2848 2849 2850 2851
      r.append("]");
      if (print_stats) {
        r.append("(");
        r.append(ToString(
奏之章 已提交
2852
            files[i]->stats.num_reads_sampled.load(std::memory_order_relaxed)));
2853 2854 2855
        r.append(")");
      }
      r.append("\n");
J
jorlow@chromium.org 已提交
2856 2857 2858 2859 2860
    }
  }
  return r;
}

2861 2862 2863 2864
// A pending MANIFEST write request. Requests are queued so that several of
// them can be written to the manifest file as one batch.
struct VersionSet::ManifestWriter {
  Status status;
  bool done = false;
  InstrumentedCondVar cv;
  ColumnFamilyData* cfd;
  const MutableCFOptions mutable_cf_options;
  const autovector<VersionEdit*>& edit_list;

  explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
                          const MutableCFOptions& cf_options,
                          const autovector<VersionEdit*>& e)
      : cv(mu), cfd(_cfd), mutable_cf_options(cf_options), edit_list(e) {}
};

2880
VersionSet::VersionSet(const std::string& dbname,
S
Siying Dong 已提交
2881
                       const ImmutableDBOptions* _db_options,
赵明 已提交
2882 2883
                       const EnvOptions& storage_options, bool seq_per_batch,
                       Cache* table_cache,
2884
                       WriteBufferManager* write_buffer_manager,
2885
                       WriteController* write_controller)
2886
    : column_family_set_(
S
Siying Dong 已提交
2887
          new ColumnFamilySet(dbname, _db_options, storage_options, table_cache,
2888
                              write_buffer_manager, write_controller)),
S
Siying Dong 已提交
2889
      env_(_db_options->env),
J
jorlow@chromium.org 已提交
2890
      dbname_(dbname),
S
Siying Dong 已提交
2891
      db_options_(_db_options),
J
jorlow@chromium.org 已提交
2892 2893
      next_file_number_(2),
      manifest_file_number_(0),  // Filled by Recover()
2894
      options_file_number_(0),
2895
      pending_manifest_file_number_(0),
2896
      last_sequence_(0),
2897
      last_allocated_sequence_(0),
2898
      last_published_sequence_(0),
2899
      prev_log_number_(0),
A
Abhishek Kona 已提交
2900
      current_version_number_(0),
2901
      manifest_file_size_(0),
2902
      manifest_edit_count_(0),
R
rockeet 已提交
2903
      seq_per_batch_(seq_per_batch),
2904
      env_options_(storage_options) {}
J
jorlow@chromium.org 已提交
2905

2906 2907 2908 2909 2910
// Cache-entry callback used by ~VersionSet. It treats the entry value as a
// TableReader and closes it. The size_t argument is ignored.
void CloseTables(void* ptr, size_t) {
  auto* reader = static_cast<TableReader*>(ptr);
  reader->Close();
}

J
jorlow@chromium.org 已提交
2911
// Tears down the VersionSet in three steps:
// 1. Close every TableReader held by the table cache.
// 2. Destroy column_family_set_ explicitly here, because its destructor
//    depends on this VersionSet.
// 3. For each obsolete file, release its table_reader_handle (if any), evict
//    it from the table cache, and delete its metadata.
VersionSet::~VersionSet() {
  Cache* const cache = column_family_set_->get_table_cache();
  cache->ApplyToAllCacheEntries(&CloseTables, false /* thread_safe */);
  column_family_set_.reset();

  for (auto& obsolete : obsolete_files_) {
    const auto& meta = obsolete.metadata;
    if (meta->table_reader_handle) {
      cache->Release(meta->table_reader_handle);
      TableCache::Evict(cache, meta->fd.GetNumber());
    }
    obsolete.DeleteMetadata();
  }
  obsolete_files_.clear();
}

2927 2928
// Installs "v" as the current version of column_family_data:
// 1. Compute v's compaction score and mark its storage info finalized.
// 2. Drop the reference on the previous current version (if any), make v
//    current, and take a reference on it.
// 3. Link v into the doubly-linked version list just before the
//    dummy_versions() sentinel.
void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
                               Version* v) {
  v->storage_info()->ComputeCompactionScore(
      *column_family_data->ioptions(),
      *column_family_data->GetLatestMutableCFOptions());
  v->storage_info_.SetFinalized();

  // Swap v in as the current version.
  assert(v->refs_ == 0);
  Version* const previous = column_family_data->current();
  assert(v != previous);
  if (previous != nullptr) {
    assert(previous->refs_ > 0);
    previous->Unref();
  }
  column_family_data->SetCurrent(v);
  v->Ref();

  // Splice v in between the sentinel's predecessor and the sentinel.
  Version* const sentinel = column_family_data->dummy_versions();
  Version* const tail = sentinel->prev_;
  v->prev_ = tail;
  v->next_ = sentinel;
  tail->next_ = v;
  sentinel->prev_ = v;
}

2955 2956 2957 2958 2959 2960 2961
Status VersionSet::ProcessManifestWrites(
    std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
    Directory* db_directory, bool new_descriptor_log,
    const ColumnFamilyOptions* new_cf_options) {
  assert(!writers.empty());
  ManifestWriter& first_writer = writers.front();
  ManifestWriter* last_writer = &first_writer;
2962

2963 2964
  assert(!manifest_writers_.empty());
  assert(manifest_writers_.front() == &first_writer);
A
Abhishek Kona 已提交
2965

2966
  autovector<VersionEdit*> batch_edits;
2967 2968 2969 2970 2971 2972 2973 2974
  autovector<Version*> versions;
  autovector<const MutableCFOptions*> mutable_cf_options_ptrs;
  std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards;

  if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
    // No group commits for column family add or drop
    LogAndApplyCFHelper(first_writer.edit_list.front());
    batch_edits.push_back(first_writer.edit_list.front());
2975
  } else {
2976
    auto it = manifest_writers_.cbegin();
2977
    size_t group_start = std::numeric_limits<size_t>::max();
2978 2979
    while (it != manifest_writers_.cend()) {
      if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) {
2980 2981 2982
        // no group commits for column family add or drop
        break;
      }
2983 2984
      last_writer = *(it++);
      assert(last_writer != nullptr);
2985
      assert(last_writer->cfd != nullptr);
2986 2987 2988 2989 2990 2991 2992 2993 2994 2995 2996 2997 2998 2999 3000 3001 3002 3003 3004 3005 3006 3007 3008 3009 3010 3011 3012 3013 3014 3015
      if (last_writer->cfd->IsDropped()) {
        // If we detect a dropped CF at this point, and the corresponding
        // version edits belong to an atomic group, then we need to find out
        // the preceding version edits in the same atomic group, and update
        // their `remaining_entries_` member variable because we are NOT going
        // to write the version edits' of dropped CF to the MANIFEST. If we
        // don't update, then Recover can report corrupted atomic group because
        // the `remaining_entries_` do not match.
        if (!batch_edits.empty()) {
          if (batch_edits.back()->is_in_atomic_group_ &&
              batch_edits.back()->remaining_entries_ > 0) {
            assert(group_start < batch_edits.size());
            const auto& edit_list = last_writer->edit_list;
            size_t k = 0;
            while (k < edit_list.size()) {
              if (!edit_list[k]->is_in_atomic_group_) {
                break;
              } else if (edit_list[k]->remaining_entries_ == 0) {
                ++k;
                break;
              }
              ++k;
            }
            for (auto i = group_start; i < batch_edits.size(); ++i) {
              assert(static_cast<uint32_t>(k) <=
                     batch_edits.back()->remaining_entries_);
              batch_edits[i]->remaining_entries_ -= static_cast<uint32_t>(k);
            }
          }
        }
3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045
        continue;
      }
      // We do a linear search on versions because versions is small.
      // TODO(yanqin) maybe consider unordered_map
      Version* version = nullptr;
      VersionBuilder* builder = nullptr;
      for (int i = 0; i != static_cast<int>(versions.size()); ++i) {
        uint32_t cf_id = last_writer->cfd->GetID();
        if (versions[i]->cfd()->GetID() == cf_id) {
          version = versions[i];
          assert(!builder_guards.empty() &&
                 builder_guards.size() == versions.size());
          builder = builder_guards[i]->version_builder();
          TEST_SYNC_POINT_CALLBACK(
              "VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id);
          break;
        }
      }
      if (version == nullptr) {
        version = new Version(last_writer->cfd, this, env_options_,
                              last_writer->mutable_cf_options,
                              current_version_number_++);
        versions.push_back(version);
        mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
        builder_guards.emplace_back(
            new BaseReferencedVersionBuilder(last_writer->cfd));
        builder = builder_guards.back()->version_builder();
      }
      assert(builder != nullptr);  // make checker happy
      for (const auto& e : last_writer->edit_list) {
3046 3047 3048 3049 3050 3051 3052 3053 3054
        if (e->is_in_atomic_group_) {
          if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ ||
              (batch_edits.back()->is_in_atomic_group_ &&
               batch_edits.back()->remaining_entries_ == 0)) {
            group_start = batch_edits.size();
          }
        } else if (group_start != std::numeric_limits<size_t>::max()) {
          group_start = std::numeric_limits<size_t>::max();
        }
3055 3056
        LogAndApplyHelper(last_writer->cfd, builder, version, e, mu);
        batch_edits.push_back(e);
3057
      }
3058
    }
J
jorlow@chromium.org 已提交
3059 3060
  }

3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096
#ifndef NDEBUG
  // Verify that version edits of atomic groups have correct
  // remaining_entries_.
  size_t k = 0;
  while (k < batch_edits.size()) {
    while (k < batch_edits.size() && !batch_edits[k]->is_in_atomic_group_) {
      ++k;
    }
    if (k == batch_edits.size()) {
      break;
    }
    size_t i = k;
    while (i < batch_edits.size()) {
      if (!batch_edits[i]->is_in_atomic_group_) {
        break;
      }
      assert(i - k + batch_edits[i]->remaining_entries_ ==
             batch_edits[k]->remaining_entries_);
      if (batch_edits[i]->remaining_entries_ == 0) {
        ++i;
        break;
      }
      ++i;
    }
    assert(batch_edits[i - 1]->is_in_atomic_group_);
    assert(0 == batch_edits[i - 1]->remaining_entries_);
    std::vector<VersionEdit*> tmp;
    for (size_t j = k; j != i; ++j) {
      tmp.emplace_back(batch_edits[j]);
    }
    TEST_SYNC_POINT_CALLBACK(
        "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", &tmp);
    k = i;
  }
#endif  // NDEBUG

3097
  uint64_t new_manifest_file_size = 0;
3098
  Status s;
A
Abhishek Kona 已提交
3099

3100
  assert(pending_manifest_file_number_ == 0);
3101
  if (!descriptor_log_ ||
3102 3103
      manifest_file_size_ > db_options_->max_manifest_file_size ||
      manifest_edit_count_ > db_options_->max_manifest_edit_count) {
3104
    pending_manifest_file_number_ = NewFileNumber();
3105
    batch_edits.back()->SetNextFile(next_file_number_.load());
A
Abhishek Kona 已提交
3106
    new_descriptor_log = true;
3107 3108
  } else {
    pending_manifest_file_number_ = manifest_file_number_;
A
Abhishek Kona 已提交
3109 3110
  }

I
Igor Canadi 已提交
3111
  if (new_descriptor_log) {
3112 3113
    // if we are writing out new snapshot make sure to persist max column
    // family.
I
Igor Canadi 已提交
3114
    if (column_family_set_->GetMaxColumnFamily() > 0) {
3115
      first_writer.edit_list.front()->SetMaxColumnFamily(
3116
          column_family_set_->GetMaxColumnFamily());
I
Igor Canadi 已提交
3117
    }
J
jorlow@chromium.org 已提交
3118 3119
  }

3120
  {
3121
    EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_);
3122
    mu->Unlock();
3123

Z
ZhaoMing 已提交
3124 3125 3126 3127 3128 3129 3130 3131 3132
    if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
      for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
        assert(!builder_guards.empty() &&
               builder_guards.size() == versions.size());
        auto* builder = builder_guards[i]->version_builder();
        builder->SaveTo(versions[i]->storage_info());
      }
    }

S
sdong 已提交
3133
    TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
3134 3135
    if (!first_writer.edit_list.front()->IsColumnFamilyManipulation() &&
        column_family_set_->get_table_cache()->GetCapacity() ==
3136
            TableCache::kInfiniteCapacity) {
3137 3138 3139 3140 3141 3142 3143 3144 3145 3146
      for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
        assert(!builder_guards.empty() &&
               builder_guards.size() == versions.size());
        assert(!mutable_cf_options_ptrs.empty() &&
               builder_guards.size() == versions.size());
        ColumnFamilyData* cfd = versions[i]->cfd_;
        builder_guards[i]->version_builder()->LoadTableHandlers(
            cfd->internal_stats(), cfd->ioptions()->optimize_filters_for_hits,
            mutable_cf_options_ptrs[i]->prefix_extractor.get());
      }
3147 3148
    }

I
Igor Canadi 已提交
3149 3150
    // This is fine because everything inside of this block is serialized --
    // only one thread can be here at the same time
I
Igor Canadi 已提交
3151
    if (new_descriptor_log) {
3152
      // create new manifest file
3153 3154
      ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
                     pending_manifest_file_number_);
3155 3156
      std::string descriptor_fname =
          DescriptorFileName(dbname_, pending_manifest_file_number_);
3157
      std::unique_ptr<WritableFile> descriptor_file;
3158 3159
      s = NewWritableFile(env_, descriptor_fname, &descriptor_file,
                          opt_env_opts);
I
Igor Canadi 已提交
3160
      if (s.ok()) {
3161
        descriptor_file->SetPreallocationBlockSize(
3162
            db_options_->manifest_preallocation_size);
3163

3164
        std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
3165 3166
            std::move(descriptor_file), descriptor_fname, opt_env_opts, nullptr,
            db_options_->listeners));
3167 3168
        descriptor_log_.reset(
            new log::Writer(std::move(file_writer), 0, false));
I
Igor Canadi 已提交
3169 3170 3171 3172
        s = WriteSnapshot(descriptor_log_.get());
      }
    }

3173 3174
    if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
      for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
Z
ZhaoMing 已提交
3175
        versions[i]->PrepareApply(*mutable_cf_options_ptrs[i]);
3176
      }
3177
    }
3178

3179
    // Write new records to MANIFEST log
J
jorlow@chromium.org 已提交
3180
    if (s.ok()) {
I
Igor Canadi 已提交
3181 3182
      for (auto& e : batch_edits) {
        std::string record;
3183
        if (!e->EncodeTo(&record)) {
3184 3185
          s = Status::Corruption("Unable to encode VersionEdit:" +
                                 e->DebugString(true));
3186 3187
          break;
        }
S
sdong 已提交
3188 3189
        TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
                         rocksdb_kill_odds * REDUCE_ODDS2);
3190 3191 3192 3193 3194
        s = descriptor_log_->AddRecord(record);
        if (!s.ok()) {
          break;
        }
      }
S
sdong 已提交
3195 3196
      if (s.ok()) {
        s = SyncManifest(env_, db_options_, descriptor_log_->file());
3197
      }
3198
      if (!s.ok()) {
3199
        ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
3200
                        s.ToString().c_str());
3201
      }
J
jorlow@chromium.org 已提交
3202 3203
    }

3204 3205
    // If we just created a new descriptor file, install it by writing a
    // new CURRENT file that points to it.
3206
    if (s.ok() && new_descriptor_log) {
A
Aaron Gao 已提交
3207 3208
      s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,
                         db_directory);
3209 3210
    }

3211 3212 3213 3214
    if (s.ok()) {
      // find offset in manifest file where this version is stored.
      new_manifest_file_size = descriptor_log_->file()->GetFileSize();
    }
A
Abhishek Kona 已提交
3215

3216
    if (first_writer.edit_list.front()->is_column_family_drop_) {
3217
      TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
3218 3219 3220 3221
      TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
      TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
    }

3222
    LogFlush(db_options_->info_log);
3223
    TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
3224
    mu->Lock();
J
jorlow@chromium.org 已提交
3225 3226
  }

Z
ZhaoMing 已提交
3227 3228
  if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
    for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3229 3230 3231 3232
      assert(!builder_guards.empty() &&
             builder_guards.size() == versions.size());
      versions[i]->storage_info()->FinishAddFile(
          builder_guards[i]->version_storage()->oldest_snapshot_seqnum());
Z
ZhaoMing 已提交
3233 3234 3235
    }
  }

3236
  // Append the old manifest file to the obsolete_manifest_ list to be deleted
3237 3238 3239 3240 3241 3242
  // by PurgeObsoleteFiles later.
  if (s.ok() && new_descriptor_log) {
    obsolete_manifests_.emplace_back(
        DescriptorFileName("", manifest_file_number_));
  }

3243
  // Install the new versions
J
jorlow@chromium.org 已提交
3244
  if (s.ok()) {
3245
    if (first_writer.edit_list.front()->is_column_family_add_) {
3246
      assert(batch_edits.size() == 1);
3247
      assert(new_cf_options != nullptr);
3248 3249
      CreateColumnFamily(*new_cf_options, first_writer.edit_list.front());
    } else if (first_writer.edit_list.front()->is_column_family_drop_) {
3250
      assert(batch_edits.size() == 1);
3251 3252 3253
      first_writer.cfd->SetDropped();
      if (first_writer.cfd->Unref()) {
        delete first_writer.cfd;
3254 3255
      }
    } else {
3256 3257 3258 3259 3260 3261 3262 3263 3264 3265 3266 3267 3268 3269 3270 3271 3272 3273
      // Each version in versions corresponds to a column family.
      // For each column family, update its log number indicating that logs
      // with number smaller than this should be ignored.
      for (const auto version : versions) {
        uint64_t max_log_number_in_batch = 0;
        uint32_t cf_id = version->cfd_->GetID();
        for (const auto& e : batch_edits) {
          if (e->has_log_number_ && e->column_family_ == cf_id) {
            max_log_number_in_batch =
                std::max(max_log_number_in_batch, e->log_number_);
          }
        }
        if (max_log_number_in_batch != 0) {
          assert(version->cfd_->GetLogNumber() <= max_log_number_in_batch);
          version->cfd_->SetLogNumber(max_log_number_in_batch);
        }
      }

3274
      uint64_t last_min_log_number_to_keep = 0;
I
Igor Canadi 已提交
3275
      for (auto& e : batch_edits) {
S
Siying Dong 已提交
3276
        if (e->has_min_log_number_to_keep_) {
3277 3278
          last_min_log_number_to_keep =
              std::max(last_min_log_number_to_keep, e->min_log_number_to_keep_);
S
Siying Dong 已提交
3279
        }
I
Igor Canadi 已提交
3280
      }
3281

3282
      if (last_min_log_number_to_keep != 0) {
S
Siying Dong 已提交
3283
        // Should only be set in 2PC mode.
3284
        MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep);
S
Siying Dong 已提交
3285 3286
      }

3287 3288 3289 3290
      for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
        ColumnFamilyData* cfd = versions[i]->cfd_;
        AppendVersion(cfd, versions[i]);
      }
3291
    }
3292
    manifest_file_number_ = pending_manifest_file_number_;
3293
    manifest_file_size_ = new_manifest_file_size;
3294 3295 3296 3297 3298
    if (new_descriptor_log) {
      manifest_edit_count_ = 0;
    } else {
      manifest_edit_count_ += batch_edits.size();
    }
3299
    prev_log_number_ = first_writer.edit_list.front()->prev_log_number_;
J
jorlow@chromium.org 已提交
3300
  } else {
3301 3302
    std::string version_edits;
    for (auto& e : batch_edits) {
3303 3304 3305 3306 3307 3308 3309
      version_edits += ("\n" + e->DebugString(true));
    }
    ROCKS_LOG_ERROR(db_options_->info_log,
                    "Error in committing version edit to MANIFEST: %s",
                    version_edits.c_str());
    for (auto v : versions) {
      delete v;
3310
    }
3311
    if (new_descriptor_log) {
3312 3313 3314
      ROCKS_LOG_INFO(db_options_->info_log,
                     "Deleting manifest %" PRIu64 " current manifest %" PRIu64
                     "\n",
3315
                     manifest_file_number_, pending_manifest_file_number_);
3316
      descriptor_log_.reset();
3317 3318
      env_->DeleteFile(
          DescriptorFileName(dbname_, pending_manifest_file_number_));
J
jorlow@chromium.org 已提交
3319 3320
    }
  }
Z
ZhaoMing 已提交
3321 3322 3323
  for (auto e : batch_edits) {
    e->DoApplyCallback(s);
  }
3324

3325
  pending_manifest_file_number_ = 0;
J
jorlow@chromium.org 已提交
3326

3327 3328 3329 3330
  // wake up all the waiting writers
  while (true) {
    ManifestWriter* ready = manifest_writers_.front();
    manifest_writers_.pop_front();
3331 3332 3333 3334 3335 3336 3337 3338 3339 3340
    bool need_signal = true;
    for (const auto& w : writers) {
      if (&w == ready) {
        need_signal = false;
        break;
      }
    }
    ready->status = s;
    ready->done = true;
    if (need_signal) {
3341 3342
      ready->cv.Signal();
    }
3343 3344 3345
    if (ready == last_writer) {
      break;
    }
3346 3347 3348 3349
  }
  if (!manifest_writers_.empty()) {
    manifest_writers_.front()->cv.Signal();
  }
J
jorlow@chromium.org 已提交
3350 3351 3352
  return s;
}

3353 3354 3355
// 'datas' is grammatically incorrect. We still use this notation to indicate
// that this variable represents a collection of column_family_data.
//
// Commits one list of VersionEdits per column family to the MANIFEST.
//
// One ManifestWriter per entry of `column_family_datas` is appended to
// manifest_writers_. The caller then blocks on the first writer's condition
// variable until either:
//   * that writer reaches the head of the queue, in which case this call
//     performs the write itself via ProcessManifestWrites(), or
//   * another thread's group commit has already completed it, in which case
//     the status recorded in that first writer is returned.
//
// Parameters:
//   column_family_datas     - target column families; a single nullptr entry
//                             denotes a column family add.
//   mutable_cf_options_list - one entry per column family.
//   edit_lists              - one list of edits per column family.
//   mu                      - must be held by the caller (asserted).
//   db_directory, new_descriptor_log, new_cf_options
//                           - forwarded to ProcessManifestWrites();
//                             new_cf_options must be non-null for an add.
//
// Returns OK immediately when there are no edits at all. Returns
// ShutdownInProgress when every targeted column family has been dropped.
Status VersionSet::LogAndApply(
    const autovector<ColumnFamilyData*>& column_family_datas,
    const autovector<const MutableCFOptions*>& mutable_cf_options_list,
    const autovector<autovector<VersionEdit*>>& edit_lists,
    InstrumentedMutex* mu, Directory* db_directory, bool new_descriptor_log,
    const ColumnFamilyOptions* new_cf_options) {
  mu->AssertHeld();
  int num_edits = 0;
  for (const auto& elist : edit_lists) {
    num_edits += static_cast<int>(elist.size());
  }
  if (num_edits == 0) {
    return Status::OK();
  } else if (num_edits > 1) {
#ifndef NDEBUG
    // Column family add/drop edits must be committed on their own, never
    // batched with other edits.
    for (const auto& edit_list : edit_lists) {
      for (const auto& edit : edit_list) {
        assert(!edit->IsColumnFamilyManipulation());
      }
    }
#endif /* ! NDEBUG */
  }

  int num_cfds = static_cast<int>(column_family_datas.size());
  if (num_cfds == 1 && column_family_datas[0] == nullptr) {
    // Column family add: exactly one edit, and options must be supplied.
    assert(edit_lists.size() == 1 && edit_lists[0].size() == 1);
    assert(edit_lists[0][0]->is_column_family_add_);
    assert(new_cf_options != nullptr);
  }
  // std::deque never relocates existing elements on emplace_back, so the
  // addresses pushed into manifest_writers_ below stay valid while more
  // writers are appended.
  std::deque<ManifestWriter> writers;
  if (num_cfds > 0) {
    assert(static_cast<size_t>(num_cfds) == mutable_cf_options_list.size());
    assert(static_cast<size_t>(num_cfds) == edit_lists.size());
  }
  for (int i = 0; i < num_cfds; ++i) {
    writers.emplace_back(mu, column_family_datas[i],
                         *mutable_cf_options_list[i], edit_lists[i]);
    manifest_writers_.push_back(&writers[i]);
  }
  assert(!writers.empty());
  ManifestWriter& first_writer = writers.front();
  while (!first_writer.done && &first_writer != manifest_writers_.front()) {
    first_writer.cv.Wait();
  }
  if (first_writer.done) {
    // All non-CF-manipulation operations can be grouped together and
    // committed to MANIFEST. They should all have finished. The status code
    // is stored in the first manifest writer.
#ifndef NDEBUG
    for (const auto& writer : writers) {
      assert(writer.done);
    }
#endif /* !NDEBUG */
    return first_writer.status;
  }

  int num_undropped_cfds = 0;
  for (auto cfd : column_family_datas) {
    // if cfd == nullptr, it is a column family add.
    if (cfd == nullptr || !cfd->IsDropped()) {
      ++num_undropped_cfds;
    }
  }
  if (0 == num_undropped_cfds) {
    // TODO (yanqin) maybe use a different status code to denote column family
    // drop other than OK and ShutdownInProgress
    // This call's writers are at the head of the queue; remove all of them.
    for (int i = 0; i != num_cfds; ++i) {
      manifest_writers_.pop_front();
    }
    // Notify new head of manifest write queue.
    if (!manifest_writers_.empty()) {
      manifest_writers_.front()->cv.Signal();
    }
    return Status::ShutdownInProgress();
  }

  return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,
                               new_cf_options);
}

I
Igor Canadi 已提交
3435 3436
// Stamps a column family add/drop edit with bookkeeping fields before it is
// written to the MANIFEST:
//   * the current next-file number;
//   * the last sequence number: last_allocated_sequence_ when
//     two_write_queues is enabled, last_sequence_ otherwise;
//   * for a drop, the current max column family ID.
void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
  assert(edit->IsColumnFamilyManipulation());
  edit->SetNextFile(next_file_number_.load());
  // The log might have data that is not visible to the memtable and hence
  // has not updated last_sequence_ yet. It is also possible that the log is
  // expecting some new data that is not written yet. Since LastSequence is an
  // upper bound on the sequence, it is ok to record
  // last_allocated_sequence_ as the last sequence.
  edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
                                                      : last_sequence_);
  if (edit->is_column_family_drop_) {
    // if we drop column family, we have to make sure to save max column family,
    // so that we don't reuse existing ID
    edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
  }
}

S
sdong 已提交
3452
void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
A
Andrew Kryczka 已提交
3453
                                   VersionBuilder* builder, Version* /*v*/,
3454
                                   VersionEdit* edit, InstrumentedMutex* mu) {
3455 3456 3457
#ifdef NDEBUG
  (void)cfd;
#endif
3458
  mu->AssertHeld();
I
Igor Canadi 已提交
3459
  assert(!edit->IsColumnFamilyManipulation());
3460

3461 3462
  if (edit->has_log_number_) {
    assert(edit->log_number_ >= cfd->GetLogNumber());
3463
    assert(edit->log_number_ < next_file_number_.load());
I
Igor Canadi 已提交
3464
  }
3465

I
Igor Canadi 已提交
3466 3467 3468
  if (!edit->has_prev_log_number_) {
    edit->SetPrevLogNumber(prev_log_number_);
  }
3469
  edit->SetNextFile(next_file_number_.load());
M
Maysam Yabandeh 已提交
3470 3471 3472 3473
  // The log might have data that is not visible to memtbale and hence have not
  // updated the last_sequence_ yet. It is also possible that the log has is
  // expecting some new data that is not written yet. Since LastSequence is an
  // upper bound on the sequence, it is ok to record
3474 3475 3476
  // last_allocated_sequence_ as the last sequence.
  edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
                                                      : last_sequence_);
I
Igor Canadi 已提交
3477

3478
  builder->Apply(edit);
3479 3480
}

3481 3482 3483 3484 3485 3486 3487 3488 3489 3490 3491 3492 3493 3494 3495 3496 3497 3498 3499 3500 3501 3502 3503 3504 3505 3506 3507 3508 3509 3510 3511 3512 3513 3514 3515 3516 3517 3518 3519 3520 3521 3522 3523 3524 3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535 3536 3537 3538 3539 3540 3541 3542 3543 3544 3545 3546 3547 3548 3549 3550 3551 3552 3553 3554 3555 3556 3557 3558 3559 3560 3561 3562 3563 3564 3565 3566 3567 3568 3569 3570 3571 3572 3573 3574 3575 3576 3577 3578 3579 3580 3581 3582 3583 3584 3585 3586 3587 3588 3589 3590 3591 3592 3593 3594 3595 3596 3597 3598 3599 3600 3601 3602 3603 3604 3605 3606 3607
// Replays one VersionEdit read from the MANIFEST during recovery.
//
// The edit falls into one of three categories:
//   * column family add  - if the caller supplied options for the family,
//                          the column family and its builder are created;
//                          otherwise the family is recorded in
//                          `column_families_not_found`.
//   * column family drop - the matching builder and column family are torn
//                          down, or the `column_families_not_found` entry is
//                          erased.
//   * anything else      - forwarded to the column family's builder, unless
//                          the family is one the caller did not open, in
//                          which case it is ignored.
// Afterwards the recovery outputs are updated from whichever optional fields
// the edit carries: prev log number, next file number, max column family,
// min log number to keep, and last sequence. The `log_number` output
// parameter is not written here; per-CF log numbers are set directly on the
// ColumnFamilyData.
//
// Returns Corruption for duplicate adds, drops of unknown families, or edits
// that reference an unknown family. Returns InvalidArgument when the edit's
// comparator name differs from the column family's. Returns OK otherwise.
Status VersionSet::ApplyOneVersionEdit(
    VersionEdit& edit,
    const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
    std::unordered_map<int, std::string>& column_families_not_found,
    std::unordered_map<uint32_t, BaseReferencedVersionBuilder*>& builders,
    bool* have_log_number, uint64_t* /* log_number */,
    bool* have_prev_log_number, uint64_t* previous_log_number,
    bool* have_next_file, uint64_t* next_file, bool* have_last_sequence,
    SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep,
    uint32_t* max_column_family) {
  // A column family appears in at most one of these two maps:
  //  - builders: an add record was seen and the caller supplied options;
  //  - column_families_not_found: an add record was seen but the caller gave
  //    no options for it (the entry is removed again by a later drop).
  auto builder_it = builders.find(edit.column_family_);
  const bool cf_in_builders = (builder_it != builders.end());
  const bool cf_in_not_found =
      column_families_not_found.count(edit.column_family_) > 0;
  assert(!(cf_in_not_found && cf_in_builders));

  ColumnFamilyData* cfd = nullptr;

  if (edit.is_column_family_add_) {
    if (cf_in_builders || cf_in_not_found) {
      return Status::Corruption(
          "Manifest adding the same column family twice: " +
          edit.column_family_name_);
    }
    const auto opts_it = name_to_options.find(edit.column_family_name_);
    if (opts_it != name_to_options.end()) {
      cfd = CreateColumnFamily(opts_it->second, &edit);
      cfd->set_initialized();
      builders.insert(
          {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
    } else {
      column_families_not_found.insert(
          {edit.column_family_, edit.column_family_name_});
    }
  } else if (edit.is_column_family_drop_) {
    if (cf_in_builders) {
      // Release the builder first, then drop our reference to the family.
      delete builder_it->second;
      builders.erase(builder_it);
      cfd = column_family_set_->GetColumnFamily(edit.column_family_);
      assert(cfd != nullptr);
      if (!cfd->Unref()) {
        // who else can have reference to cfd!?
        assert(false);
      } else {
        delete cfd;
        cfd = nullptr;
      }
    } else if (cf_in_not_found) {
      column_families_not_found.erase(edit.column_family_);
    } else {
      return Status::Corruption(
          "Manifest - dropping non-existing column family");
    }
  } else if (!cf_in_not_found) {
    // A regular edit (file add/delete, ...) for a family the caller opened.
    if (!cf_in_builders) {
      return Status::Corruption(
          "Manifest record referencing unknown column family");
    }
    cfd = column_family_set_->GetColumnFamily(edit.column_family_);
    // cf_in_builders implies the column family was created.
    assert(cfd != nullptr);
    builder_it->second->version_builder()->Apply(&edit);
  }

  if (cfd != nullptr) {
    if (edit.has_log_number_) {
      if (cfd->GetLogNumber() <= edit.log_number_) {
        cfd->SetLogNumber(edit.log_number_);
        *have_log_number = true;
      } else {
        ROCKS_LOG_WARN(
            db_options_->info_log,
            "MANIFEST corruption detected, but ignored - Log numbers in "
            "records NOT monotonically increasing");
      }
    }
    const bool comparator_mismatch =
        edit.has_comparator_ &&
        edit.comparator_ != cfd->user_comparator()->Name();
    if (comparator_mismatch) {
      return Status::InvalidArgument(
          cfd->user_comparator()->Name(),
          "does not match existing comparator " + edit.comparator_);
    }
  }

  if (edit.has_prev_log_number_) {
    *have_prev_log_number = true;
    *previous_log_number = edit.prev_log_number_;
  }
  if (edit.has_next_file_number_) {
    *have_next_file = true;
    *next_file = edit.next_file_number_;
  }
  if (edit.has_max_column_family_) {
    *max_column_family = edit.max_column_family_;
  }
  if (edit.has_min_log_number_to_keep_) {
    *min_log_number_to_keep =
        std::max(*min_log_number_to_keep, edit.min_log_number_to_keep_);
  }
  if (edit.has_last_sequence_) {
    *have_last_sequence = true;
    *last_sequence = edit.last_sequence_;
  }
  return Status::OK();
}

I
Igor Canadi 已提交
3608
Status VersionSet::Recover(
3609 3610
    const std::vector<ColumnFamilyDescriptor>& column_families,
    bool read_only) {
I
Igor Canadi 已提交
3611 3612 3613 3614 3615 3616 3617
  std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
  for (auto cf : column_families) {
    cf_name_to_options.insert({cf.name, cf.options});
  }
  // keeps track of column families in manifest that were not found in
  // column families parameters. if those column families are not dropped
  // by subsequent manifest records, Recover() will return failure status
I
Igor Canadi 已提交
3618
  std::unordered_map<int, std::string> column_families_not_found;
J
jorlow@chromium.org 已提交
3619 3620

  // Read "CURRENT" file, which contains a pointer to the current manifest file
K
kailiu 已提交
3621
  std::string manifest_filename;
赵明 已提交
3622 3623
  Status s =
      ReadFileToString(env_, CurrentFileName(dbname_), &manifest_filename);
J
jorlow@chromium.org 已提交
3624 3625 3626
  if (!s.ok()) {
    return s;
  }
赵明 已提交
3627
  if (manifest_filename.empty() || manifest_filename.back() != '\n') {
J
jorlow@chromium.org 已提交
3628 3629
    return Status::Corruption("CURRENT file does not end with newline");
  }
K
kailiu 已提交
3630 3631
  // remove the trailing '\n'
  manifest_filename.resize(manifest_filename.size() - 1);
3632 3633 3634 3635 3636 3637
  FileType type;
  bool parse_ok =
      ParseFileName(manifest_filename, &manifest_file_number_, &type);
  if (!parse_ok || type != kDescriptorFile) {
    return Status::Corruption("CURRENT file corrupted");
  }
J
jorlow@chromium.org 已提交
3638

3639 3640
  ROCKS_LOG_INFO(db_options_->info_log, "Recovering from manifest file: %s\n",
                 manifest_filename.c_str());
H
heyongqiang 已提交
3641

K
kailiu 已提交
3642
  manifest_filename = dbname_ + "/" + manifest_filename;
3643
  std::unique_ptr<SequentialFileReader> manifest_file_reader;
3644
  {
3645
    std::unique_ptr<SequentialFile> manifest_file;
3646
    s = env_->NewSequentialFile(manifest_filename, &manifest_file,
3647
                                env_->OptimizeForManifestRead(env_options_));
3648 3649 3650 3651
    if (!s.ok()) {
      return s;
    }
    manifest_file_reader.reset(
3652
        new SequentialFileReader(std::move(manifest_file), manifest_filename));
J
jorlow@chromium.org 已提交
3653
  }
I
Igor Canadi 已提交
3654
  uint64_t current_manifest_file_size;
3655
  uint64_t current_manifest_edit_count = 0;
I
Igor Canadi 已提交
3656
  s = env_->GetFileSize(manifest_filename, &current_manifest_file_size);
3657 3658 3659
  if (!s.ok()) {
    return s;
  }
J
jorlow@chromium.org 已提交
3660 3661

  bool have_log_number = false;
3662
  bool have_prev_log_number = false;
J
jorlow@chromium.org 已提交
3663 3664 3665
  bool have_next_file = false;
  bool have_last_sequence = false;
  uint64_t next_file = 0;
3666 3667
  uint64_t last_sequence = 0;
  uint64_t log_number = 0;
I
Igor Canadi 已提交
3668
  uint64_t previous_log_number = 0;
3669
  uint32_t max_column_family = 0;
S
Siying Dong 已提交
3670
  uint64_t min_log_number_to_keep = 0;
S
sdong 已提交
3671
  std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;
J
jorlow@chromium.org 已提交
3672

3673
  // add default column family
3674
  auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
I
Igor Canadi 已提交
3675
  if (default_cf_iter == cf_name_to_options.end()) {
I
Igor Canadi 已提交
3676
    return Status::InvalidArgument("Default column family not specified");
I
Igor Canadi 已提交
3677
  }
I
Igor Canadi 已提交
3678
  VersionEdit default_cf_edit;
3679
  default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
I
Igor Canadi 已提交
3680 3681 3682
  default_cf_edit.SetColumnFamily(0);
  ColumnFamilyData* default_cfd =
      CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
3683 3684 3685
  // In recovery, nobody else can access it, so it's fine to set it to be
  // initialized earlier.
  default_cfd->set_initialized();
S
sdong 已提交
3686
  builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)});
3687

J
jorlow@chromium.org 已提交
3688
  {
I
Igor Canadi 已提交
3689
    VersionSet::LogReporter reporter;
J
jorlow@chromium.org 已提交
3690
    reporter.status = &s;
3691
    log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
3692 3693
                       true /* checksum */, 0 /* log_number */,
                       false /* retry_after_eof */);
J
jorlow@chromium.org 已提交
3694 3695
    Slice record;
    std::string scratch;
3696 3697
    std::vector<VersionEdit> replay_buffer;
    size_t num_entries_decoded = 0;
J
jorlow@chromium.org 已提交
3698
    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
3699
      VersionEdit edit;
J
jorlow@chromium.org 已提交
3700
      s = edit.DecodeFrom(record);
3701 3702
      if (!s.ok()) {
        break;
J
jorlow@chromium.org 已提交
3703
      }
3704
      ++current_manifest_edit_count;
J
jorlow@chromium.org 已提交
3705

3706 3707 3708
      if (edit.is_in_atomic_group_) {
        if (replay_buffer.empty()) {
          replay_buffer.resize(edit.remaining_entries_ + 1);
3709 3710
          TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:FirstInAtomicGroup",
                                   &edit);
I
Igor Canadi 已提交
3711
        }
3712 3713 3714
        ++num_entries_decoded;
        if (num_entries_decoded + edit.remaining_entries_ !=
            static_cast<uint32_t>(replay_buffer.size())) {
3715 3716 3717 3718
          TEST_SYNC_POINT_CALLBACK(
              "VersionSet::Recover:IncorrectAtomicGroupSize", &edit);
          s = Status::Corruption("corrupted atomic group");
          break;
I
Igor Canadi 已提交
3719
        }
3720 3721
        replay_buffer[num_entries_decoded - 1] = std::move(edit);
        if (num_entries_decoded == replay_buffer.size()) {
3722 3723
          TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:LastInAtomicGroup",
                                   &edit);
3724
          for (auto& e : replay_buffer) {
Z
ZhaoMing 已提交
3725
            e.set_open_db(true);
3726 3727 3728 3729 3730 3731 3732 3733 3734
            s = ApplyOneVersionEdit(
                e, cf_name_to_options, column_families_not_found, builders,
                &have_log_number, &log_number, &have_prev_log_number,
                &previous_log_number, &have_next_file, &next_file,
                &have_last_sequence, &last_sequence, &min_log_number_to_keep,
                &max_column_family);
            if (!s.ok()) {
              break;
            }
I
Igor Canadi 已提交
3735
          }
3736 3737
          replay_buffer.clear();
          num_entries_decoded = 0;
3738
        }
3739
        TEST_SYNC_POINT("VersionSet::Recover:AtomicGroup");
3740 3741
      } else {
        if (!replay_buffer.empty()) {
3742 3743 3744 3745
          TEST_SYNC_POINT_CALLBACK(
              "VersionSet::Recover:AtomicGroupMixedWithNormalEdits", &edit);
          s = Status::Corruption("corrupted atomic group");
          break;
I
Igor Canadi 已提交
3746
        }
Z
ZhaoMing 已提交
3747
        edit.set_open_db(true);
3748 3749 3750 3751 3752 3753
        s = ApplyOneVersionEdit(
            edit, cf_name_to_options, column_families_not_found, builders,
            &have_log_number, &log_number, &have_prev_log_number,
            &previous_log_number, &have_next_file, &next_file,
            &have_last_sequence, &last_sequence, &min_log_number_to_keep,
            &max_column_family);
3754
      }
3755 3756
      if (!s.ok()) {
        break;
J
jorlow@chromium.org 已提交
3757 3758 3759 3760 3761 3762 3763 3764 3765 3766 3767 3768
      }
    }
  }

  if (s.ok()) {
    if (!have_next_file) {
      s = Status::Corruption("no meta-nextfile entry in descriptor");
    } else if (!have_log_number) {
      s = Status::Corruption("no meta-lognumber entry in descriptor");
    } else if (!have_last_sequence) {
      s = Status::Corruption("no last-sequence-number entry in descriptor");
    }
3769 3770

    if (!have_prev_log_number) {
I
Igor Canadi 已提交
3771
      previous_log_number = 0;
3772
    }
3773

3774 3775
    column_family_set_->UpdateMaxColumnFamily(max_column_family);

S
Siying Dong 已提交
3776 3777 3778
    // When reading DB generated using old release, min_log_number_to_keep=0.
    // All log files will be scanned for potential prepare entries.
    MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
A
Andrew Kryczka 已提交
3779 3780
    MarkFileNumberUsed(previous_log_number);
    MarkFileNumberUsed(log_number);
J
jorlow@chromium.org 已提交
3781 3782
  }

I
Igor Canadi 已提交
3783
  // there were some column families in the MANIFEST that weren't specified
3784
  // in the argument. This is OK in read_only mode
Z
ZhaoMing 已提交
3785
  if (!read_only && !column_families_not_found.empty()) {
3786
    std::string list_of_not_found;
I
Igor Canadi 已提交
3787 3788
    for (const auto& cf : column_families_not_found) {
      list_of_not_found += ", " + cf.second;
3789 3790
    }
    list_of_not_found = list_of_not_found.substr(2);
I
Igor Canadi 已提交
3791
    s = Status::InvalidArgument(
I
Igor Canadi 已提交
3792 3793
        "You have to open all column families. Column families not opened: " +
        list_of_not_found);
I
Igor Canadi 已提交
3794 3795
  }

3796 3797 3798 3799 3800 3801 3802 3803 3804 3805 3806 3807
  if (s.ok()) {
    for (auto cfd : *column_family_set_) {
      assert(builders.count(cfd->GetID()) > 0);
      auto* builder = builders[cfd->GetID()]->version_builder();
      if (!builder->CheckConsistencyForNumLevels()) {
        s = Status::InvalidArgument(
            "db has more levels than options.num_levels");
        break;
      }
    }
  }

J
jorlow@chromium.org 已提交
3808
  if (s.ok()) {
Z
ZhaoMing 已提交
3809
    for (auto cfd : *column_family_set_) {
3810 3811 3812
      if (cfd->IsDropped()) {
        continue;
      }
3813 3814 3815
      if (read_only) {
        cfd->table_cache()->SetTablesAreImmortal();
      }
3816
      assert(cfd->initialized());
3817 3818
      auto builders_iter = builders.find(cfd->GetID());
      assert(builders_iter != builders.end());
3819
      auto* builder = builders_iter->second->version_builder();
3820

3821 3822
      if (GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
          TableCache::kInfiniteCapacity) {
3823 3824
        // unlimited table cache. Pre-load table handle now.
        // Need to do it out of the mutex.
3825
        builder->LoadTableHandlers(
Z
ZhaoMing 已提交
3826
            cfd->internal_stats(),
3827
            false /* prefetch_index_and_filter_in_cache */,
Z
ZhaoMing 已提交
3828 3829
            cfd->GetLatestMutableCFOptions()->prefix_extractor.get(),
            db_options_->max_file_opening_threads);
3830
      }
3831 3832 3833
      builder->UpgradeFileMetaData(
          cfd->GetLatestMutableCFOptions()->prefix_extractor.get(),
          db_options_->max_file_opening_threads);
3834

3835 3836 3837
      Version* v = new Version(cfd, this, env_options_,
                               *cfd->GetLatestMutableCFOptions(),
                               current_version_number_++);
S
sdong 已提交
3838
      builder->SaveTo(v->storage_info());
3839 3840
      v->storage_info()->FinishAddFile(
          builders_iter->second->version_storage()->oldest_snapshot_seqnum());
3841

3842
      // Install recovered version
Z
ZhaoMing 已提交
3843
      v->PrepareApply(*cfd->GetLatestMutableCFOptions());
I
Igor Canadi 已提交
3844
      AppendVersion(cfd, v);
3845
    }
3846

I
Igor Canadi 已提交
3847
    manifest_file_size_ = current_manifest_file_size;
3848
    manifest_edit_count_ = current_manifest_edit_count;
3849
    next_file_number_.store(next_file + 1);
3850
    last_allocated_sequence_ = last_sequence;
3851
    last_published_sequence_ = last_sequence;
3852
    last_sequence_ = last_sequence;
I
Igor Canadi 已提交
3853
    prev_log_number_ = previous_log_number;
H
heyongqiang 已提交
3854

3855 3856
    ROCKS_LOG_INFO(
        db_options_->info_log,
3857
        "Recovered from manifest file:%s succeeded,"
K
Kai Liu 已提交
3858 3859
        "manifest_file_number is %lu, next_file_number is %lu, "
        "last_sequence is %lu, log_number is %lu,"
3860
        "prev_log_number is %lu,"
S
Siying Dong 已提交
3861 3862
        "max_column_family is %u,"
        "min_log_number_to_keep is %lu\n",
3863
        manifest_filename.c_str(), (unsigned long)manifest_file_number_,
3864
        (unsigned long)next_file_number_.load(), (unsigned long)last_sequence_,
3865
        (unsigned long)log_number, (unsigned long)prev_log_number_,
S
Siying Dong 已提交
3866
        column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
3867 3868

    for (auto cfd : *column_family_set_) {
3869 3870 3871
      if (cfd->IsDropped()) {
        continue;
      }
3872 3873 3874
      ROCKS_LOG_INFO(db_options_->info_log,
                     "Column family [%s] (ID %u), log number is %" PRIu64 "\n",
                     cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
3875
    }
J
jorlow@chromium.org 已提交
3876 3877
  }

Y
Yi Wu 已提交
3878
  for (auto& builder : builders) {
3879 3880 3881
    delete builder.second;
  }

J
jorlow@chromium.org 已提交
3882 3883 3884
  return s;
}

I
Igor Canadi 已提交
3885 3886 3887 3888 3889 3890 3891 3892 3893 3894 3895
Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
                                      const std::string& dbname, Env* env) {
  // Default EnvOptions are good enough here: they only influence
  // performance, not correctness.
  EnvOptions soptions;

  // CURRENT holds the file name of the active MANIFEST followed by '\n'.
  std::string current;
  Status s = ReadFileToString(env, CurrentFileName(dbname), &current);
  if (!s.ok()) {
    return s;
  }
  if (current.empty() || current.back() != '\n') {
    return Status::Corruption("CURRENT file does not end with newline");
  }
  current.pop_back();
  std::string dscname = dbname + "/" + current;

  std::unique_ptr<SequentialFileReader> file_reader;
  {
    std::unique_ptr<SequentialFile> file;
    s = env->NewSequentialFile(dscname, &file, soptions);
    if (!s.ok()) {
      return s;
    }
    file_reader.reset(new SequentialFileReader(std::move(file), dscname));
  }

  // ID -> name for every column family alive after replaying the MANIFEST.
  // The default column family is always implicitly present.
  std::map<uint32_t, std::string> live_families;
  live_families.insert({0, kDefaultColumnFamilyName});

  VersionSet::LogReporter reporter;
  reporter.status = &s;
  log::Reader reader(nullptr, std::move(file_reader), &reporter,
                     true /* checksum */, 0 /* log_number */,
                     false /* retry_after_eof */);

  Slice record;
  std::string scratch;
  while (reader.ReadRecord(&record, &scratch) && s.ok()) {
    VersionEdit edit;
    s = edit.DecodeFrom(record);
    if (!s.ok()) {
      break;
    }
    const bool already_known = live_families.count(edit.column_family_) > 0;
    if (edit.is_column_family_add_) {
      if (already_known) {
        s = Status::Corruption("Manifest adding the same column family twice");
        break;
      }
      live_families.insert({edit.column_family_, edit.column_family_name_});
    } else if (edit.is_column_family_drop_) {
      if (!already_known) {
        s = Status::Corruption(
            "Manifest - dropping non-existing column family");
        break;
      }
      live_families.erase(edit.column_family_);
    }
  }

  // The output is always cleared; it is only filled when the replay succeeded.
  column_families->clear();
  if (s.ok()) {
    for (const auto& entry : live_families) {
      column_families->push_back(entry.second);
    }
  }
  return s;
}
3957

I
Igor Canadi 已提交
3958
#ifndef ROCKSDB_LITE
3959 3960
Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
                                        const Options* options,
                                        const EnvOptions& env_options,
                                        int new_levels) {
  // Rewrites the default column family's current version so that it uses
  // `new_levels` levels, then persists it via LogAndApply. Succeeds only if
  // at most one of the levels in [new_levels - 1, current_levels) holds files;
  // that level's files are moved to level new_levels - 1.
  if (new_levels <= 1) {
    return Status::InvalidArgument(
        "Number of levels needs to be bigger than 1");
  }

  // Private VersionSet over the DB's MANIFEST, recovering only the default
  // column family.
  ImmutableDBOptions db_options(*options);
  // NOTE(review): if max_open_files can be negative here (e.g. -1), the int
  // expression below converts to a very large size_t capacity; confirm that
  // this is the intended behaviour.
  std::shared_ptr<Cache> tc(NewLRUCache(options->max_open_files - 10,
                                        options->table_cache_numshardbits));
  WriteController wc(options->delayed_write_rate);
  WriteBufferManager wb(options->db_write_buffer_size);
  const bool seq_per_batch = false;
  VersionSet versions(dbname, &db_options, env_options, seq_per_batch, tc.get(),
                      &wb, &wc);

  std::vector<ColumnFamilyDescriptor> dummy;
  ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
                                          ColumnFamilyOptions(*options));
  dummy.push_back(dummy_descriptor);
  Status status = versions.Recover(dummy);
  if (!status.ok()) {
    return status;
  }

  Version* current_version =
      versions.GetColumnFamilySet()->GetDefault()->current();
  auto* vstorage = current_version->storage_info();
  int current_levels = vstorage->num_levels();

  if (current_levels <= new_levels) {
    return Status::OK();
  }

  // Make sure there are file only on one level from
  // (new_levels-1) to (current_levels-1)
  int first_nonempty_level = -1;
  int first_nonempty_level_filenum = 0;
  for (int i = new_levels - 1; i < current_levels; i++) {
    int file_num = vstorage->NumLevelFiles(i);
    if (file_num != 0) {
      if (first_nonempty_level < 0) {
        first_nonempty_level = i;
        first_nonempty_level_filenum = file_num;
      } else {
        char msg[255];
        snprintf(msg, sizeof(msg),
                 "Found at least two levels containing files: "
                 "[%d:%d],[%d:%d].\n",
                 first_nonempty_level, first_nonempty_level_filenum, i,
                 file_num);
        return Status::InvalidArgument(msg);
      }
    }
  }

  // we need to allocate an array with the old number of levels size to
  // avoid SIGSEGV in WriteSnapshot()
  // however, all levels bigger or equal to new_levels will be empty.
  // The array is indexed from -1, so allocate one extra slot and shift the
  // pointer forward by one; the matching delete[] shifts it back.
  std::vector<FileMetaData*>* new_files_list =
      new std::vector<FileMetaData*>[current_levels + 1];
  ++new_files_list;
  for (int i = -1; i < new_levels - 1; i++) {
    new_files_list[i] = vstorage->LevelFiles(i);
  }

  // Move the single non-empty deep level (if any) into the new last level.
  if (first_nonempty_level > 0) {
    new_files_list[new_levels - 1] = vstorage->LevelFiles(first_nonempty_level);
  }

  delete[](vstorage->files_ - 1);
  vstorage->files_ = new_files_list;
  vstorage->num_levels_ = new_levels;

  // Persist the reshaped version; LogAndApply expects its mutex to be held.
  MutableCFOptions mutable_cf_options(*options);
  VersionEdit ve;
  InstrumentedMutex dummy_mutex;
  InstrumentedMutexLock l(&dummy_mutex);
  return versions.LogAndApply(versions.GetColumnFamilySet()->GetDefault(),
                              mutable_cf_options, &ve, &dummy_mutex, nullptr,
                              true);
}

I
Igor Canadi 已提交
4046
Status VersionSet::DumpManifest(Options& options, std::string& dscname,
                                bool verbose, bool hex, bool json) {
  // Replays the MANIFEST at `dscname`, printing each edit (when `verbose` or
  // `json`), then a per-column-family summary and the global counters.
  // Returns the first decode/consistency error encountered, if any.

  // Open the specified manifest file.
  std::unique_ptr<SequentialFileReader> file_reader;
  Status s;
  {
    std::unique_ptr<SequentialFile> file;
    s = options.env->NewSequentialFile(
        dscname, &file, env_->OptimizeForManifestRead(env_options_));
    if (!s.ok()) {
      return s;
    }
    file_reader.reset(new SequentialFileReader(std::move(file), dscname));
  }

  bool have_prev_log_number = false;
  bool have_next_file = false;
  bool have_last_sequence = false;
  uint64_t next_file = 0;
  uint64_t last_sequence = 0;
  uint64_t previous_log_number = 0;
  int count = 0;
  std::unordered_map<uint32_t, std::string> comparators;
  // Owned builders, one per live column family. They are freed at the end of
  // this function on every path, including error paths.
  std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;

  // add default column family
  VersionEdit default_cf_edit;
  default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
  default_cf_edit.SetColumnFamily(0);
  ColumnFamilyData* default_cfd =
      CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit);
  builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)});

  {
    VersionSet::LogReporter reporter;
    reporter.status = &s;
    log::Reader reader(nullptr, std::move(file_reader), &reporter,
                       true /* checksum */, 0 /* log_number */,
                       false /* retry_after_eof */);
    Slice record;
    std::string scratch;
    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
      VersionEdit edit;
      s = edit.DecodeFrom(record);
      if (!s.ok()) {
        break;
      }

      // Write out each individual edit
      if (verbose && !json) {
        printf("%s\n", edit.DebugString(hex).c_str());
      } else if (json) {
        printf("%s\n", edit.DebugJSON(count, hex).c_str());
      }
      count++;

      bool cf_in_builders =
          builders.find(edit.column_family_) != builders.end();

      if (edit.has_comparator_) {
        comparators.insert({edit.column_family_, edit.comparator_});
      }

      ColumnFamilyData* cfd = nullptr;

      if (edit.is_column_family_add_) {
        if (cf_in_builders) {
          s = Status::Corruption(
              "Manifest adding the same column family twice");
          break;
        }
        cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
        cfd->set_initialized();
        builders.insert(
            {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
      } else if (edit.is_column_family_drop_) {
        if (!cf_in_builders) {
          s = Status::Corruption(
              "Manifest - dropping non-existing column family");
          break;
        }
        auto builder_iter = builders.find(edit.column_family_);
        delete builder_iter->second;
        builders.erase(builder_iter);
        comparators.erase(edit.column_family_);
        cfd = column_family_set_->GetColumnFamily(edit.column_family_);
        assert(cfd != nullptr);
        cfd->Unref();
        delete cfd;
        cfd = nullptr;
      } else {
        if (!cf_in_builders) {
          s = Status::Corruption(
              "Manifest record referencing unknown column family");
          break;
        }

        cfd = column_family_set_->GetColumnFamily(edit.column_family_);
        // this should never happen since cf_in_builders is true
        assert(cfd != nullptr);

        // if it is not column family add or column family drop,
        // then it's a file add/delete, which should be forwarded
        // to builder
        auto builder = builders.find(edit.column_family_);
        assert(builder != builders.end());
        builder->second->version_builder()->Apply(&edit);
      }

      if (cfd != nullptr && edit.has_log_number_) {
        cfd->SetLogNumber(edit.log_number_);
      }

      if (edit.has_prev_log_number_) {
        previous_log_number = edit.prev_log_number_;
        have_prev_log_number = true;
      }

      if (edit.has_next_file_number_) {
        next_file = edit.next_file_number_;
        have_next_file = true;
      }

      if (edit.has_last_sequence_) {
        last_sequence = edit.last_sequence_;
        have_last_sequence = true;
      }

      if (edit.has_max_column_family_) {
        column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
      }

      if (edit.has_min_log_number_to_keep_) {
        MarkMinLogNumberToKeep2PC(edit.min_log_number_to_keep_);
      }
    }
  }
  file_reader.reset();

  if (s.ok()) {
    if (!have_next_file) {
      s = Status::Corruption("no meta-nextfile entry in descriptor");
      printf("no meta-nextfile entry in descriptor");
    } else if (!have_last_sequence) {
      printf("no last-sequence-number entry in descriptor");
      s = Status::Corruption("no last-sequence-number entry in descriptor");
    }

    if (!have_prev_log_number) {
      previous_log_number = 0;
    }
  }

  if (s.ok()) {
    for (auto cfd : *column_family_set_) {
      if (cfd->IsDropped()) {
        continue;
      }
      auto builders_iter = builders.find(cfd->GetID());
      assert(builders_iter != builders.end());
      auto builder = builders_iter->second->version_builder();

      // Materialize a throwaway Version just to print it.
      Version* v = new Version(cfd, this, env_options_,
                               *cfd->GetLatestMutableCFOptions(),
                               current_version_number_++);
      builder->SaveTo(v->storage_info());
      v->storage_info()->FinishAddFile(
          builders_iter->second->version_storage()->oldest_snapshot_seqnum());
      v->PrepareApply(*cfd->GetLatestMutableCFOptions());

      printf("--------------- Column family \"%s\"  (ID %u) --------------\n",
             cfd->GetName().c_str(), (unsigned int)cfd->GetID());
      printf("log number: %lu\n", (unsigned long)cfd->GetLogNumber());
      auto comparator = comparators.find(cfd->GetID());
      if (comparator != comparators.end()) {
        printf("comparator: %s\n", comparator->second.c_str());
      } else {
        printf("comparator: <NO COMPARATOR>\n");
      }
      printf("%s \n", v->DebugString(hex).c_str());
      delete v;
    }

    next_file_number_.store(next_file + 1);
    last_allocated_sequence_ = last_sequence;
    last_published_sequence_ = last_sequence;
    last_sequence_ = last_sequence;
    prev_log_number_ = previous_log_number;

    printf(
        "next_file_number %lu last_sequence "
        "%lu  prev_log_number %lu max_column_family %u min_log_number_to_keep "
        "%" PRIu64 "\n",
        (unsigned long)next_file_number_.load(), (unsigned long)last_sequence,
        (unsigned long)previous_log_number,
        column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
  }

  // Free builders on every path. Previously this only happened when `s` was
  // OK, so any corruption error leaked every builder, unlike Recover(), which
  // frees them unconditionally.
  for (auto& builder : builders) {
    delete builder.second;
  }

  return s;
}
I
Igor Canadi 已提交
4251
#endif  // ROCKSDB_LITE
4252

A
Andrew Kryczka 已提交
4253 4254 4255
void VersionSet::MarkFileNumberUsed(uint64_t number) {
  // only called during recovery and repair which are single threaded, so this
  // works because there can't be concurrent calls
4256 4257
  if (next_file_number_.load(std::memory_order_relaxed) <= number) {
    next_file_number_.store(number + 1, std::memory_order_relaxed);
4258 4259 4260
  }
}

S
Siying Dong 已提交
4261 4262 4263 4264 4265 4266 4267 4268
// Raises min_log_number_to_keep_2pc_ to `number` (never lowers it).
// Called only either from ::LogAndApply which is protected by mutex or during
// recovery which is single-threaded, so relaxed ordering is sufficient.
void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
  const auto kept = min_log_number_to_keep_2pc_.load(std::memory_order_relaxed);
  if (number > kept) {
    min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed);
  }
}

J
jorlow@chromium.org 已提交
4269 4270
Status VersionSet::WriteSnapshot(log::Writer* log) {
  // TODO: Break up into multiple records to reduce memory usage on recovery?

  // WARNING: This method doesn't hold a mutex!!
  // It runs without the DB mutex, but only within the single-threaded
  // LogAndApply. Column families are only added/dropped from that same
  // thread, so iterating column_family_set_ here is safe.

  // Encodes `edit` and appends it to the log as a single record.
  auto append_edit = [log](VersionEdit& edit) -> Status {
    std::string record;
    if (!edit.EncodeTo(&record)) {
      return Status::Corruption("Unable to Encode VersionEdit:" +
                                edit.DebugString(true));
    }
    return log->AddRecord(record);
  };

  for (auto cfd : *column_family_set_) {
    if (cfd->IsDropped()) {
      continue;
    }
    assert(cfd->initialized());

    // Record 1: column family identity and comparator name. The default
    // column family (ID 0) always exists, so it is not added explicitly.
    VersionEdit cf_info_edit;
    if (cfd->GetID() != 0) {
      cf_info_edit.AddColumnFamily(cfd->GetName());
      cf_info_edit.SetColumnFamily(cfd->GetID());
    }
    cf_info_edit.SetComparatorName(
        cfd->internal_comparator().user_comparator()->Name());
    Status s = append_edit(cf_info_edit);
    if (!s.ok()) {
      return s;
    }

    // Record 2: every file of the current version (starting at level -1)
    // plus the column family's log number.
    VersionEdit files_edit;
    files_edit.SetColumnFamily(cfd->GetID());
    auto* vstorage = cfd->current()->storage_info();
    for (int level = -1; level < cfd->NumberLevels(); ++level) {
      for (const auto& f : vstorage->LevelFiles(level)) {
        files_edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
                           f->fd.GetFileSize(), f->smallest, f->largest,
                           f->fd.smallest_seqno, f->fd.largest_seqno,
                           f->marked_for_compaction, f->prop);
      }
    }
    files_edit.SetLogNumber(cfd->GetLogNumber());
    s = append_edit(files_edit);
    if (!s.ok()) {
      return s;
    }
  }

  return Status::OK();
}

4334 4335 4336 4337 4338 4339
// TODO(aekmekji): in CompactionJob::GenSubcompactionBoundaries(), this
// function is called repeatedly with consecutive pairs of slices. For example
// if the slice list is [a, b, c, d] this function is called with arguments
// (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where
// we avoid doing binary search for the keys b and c twice and instead somehow
// maintain state of where they first appear in the files.
4340
uint64_t VersionSet::ApproximateSize(Version* v, const Slice& start,
4341 4342
                                     const Slice& end, int start_level,
                                     int end_level) {
4343 4344
  // pre-condition
  assert(v->cfd_->internal_comparator().Compare(start, end) <= 0);
4345

4346
  uint64_t size = 0;
S
sdong 已提交
4347
  const auto* vstorage = v->storage_info();
4348 4349 4350
  end_level = end_level == -1
                  ? vstorage->num_non_empty_levels()
                  : std::min(end_level, vstorage->num_non_empty_levels());
4351

4352 4353 4354
  assert(start_level <= end_level);

  for (int level = start_level; level < end_level; level++) {
4355 4356 4357 4358 4359 4360 4361 4362 4363 4364 4365 4366 4367 4368 4369 4370
    const LevelFilesBrief& files_brief = vstorage->LevelFilesBrief(level);
    if (!files_brief.num_files) {
      // empty level, skip exploration
      continue;
    }

    if (!level) {
      // level 0 data is sorted order, handle the use case explicitly
      size += ApproximateSizeLevel0(v, files_brief, start, end);
      continue;
    }

    assert(level > 0);
    assert(files_brief.num_files > 0);

    // identify the file position for starting key
I
Igor Canadi 已提交
4371 4372 4373
    const uint64_t idx_start = FindFileInRange(
        v->cfd_->internal_comparator(), files_brief, start,
        /*start=*/0, static_cast<uint32_t>(files_brief.num_files - 1));
4374 4375 4376 4377 4378 4379 4380 4381 4382 4383 4384 4385 4386 4387 4388 4389 4390 4391 4392 4393
    assert(idx_start < files_brief.num_files);

    // scan all files from the starting position until the ending position
    // inferred from the sorted order
    for (uint64_t i = idx_start; i < files_brief.num_files; i++) {
      uint64_t val;
      val = ApproximateSize(v, files_brief.files[i], end);
      if (!val) {
        // the files after this will not have the range
        break;
      }

      size += val;

      if (i == idx_start) {
        // subtract the bytes needed to be scanned to get to the starting
        // key
        val = ApproximateSize(v, files_brief.files[i], start);
        assert(size >= val);
        size -= val;
J
jorlow@chromium.org 已提交
4394 4395 4396
      }
    }
  }
4397 4398 4399 4400 4401 4402 4403 4404 4405 4406 4407 4408 4409 4410 4411 4412 4413 4414 4415 4416 4417 4418 4419 4420 4421

  return size;
}

uint64_t VersionSet::ApproximateSizeLevel0(Version* v,
                                           const LevelFilesBrief& files_brief,
                                           const Slice& key_start,
                                           const Slice& key_end) {
  // Level 0 files are not kept in sorted order, so every file is visited.
  // Each contributes the bytes between its offsets for key_start and key_end.
  uint64_t total = 0;
  for (size_t idx = 0; idx < files_brief.num_files; ++idx) {
    const FdWithKeyRange& file = files_brief.files[idx];
    const uint64_t begin_offset = ApproximateSize(v, file, key_start);
    const uint64_t end_offset = ApproximateSize(v, file, key_end);
    assert(end_offset >= begin_offset);
    total += end_offset - begin_offset;
  }
  return total;
}

uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
                                     const Slice& key) {
  // pre-condition
  assert(v);

  // A lambda cannot name itself. The estimator below therefore recurses
  // through this C-style function pointer plus opaque argument, which are
  // filled in once the lambda exists.
  struct {
    uint64_t (*callback)(void*, const FileMetaData*, uint64_t);
    void* args;
  } self_ref;

  // Estimates the bytes of `meta` that sort before `key`, scaled by the
  // fraction `entry_count / num_entries` of the file being accounted for.
  auto estimate_before_key = [v, &self_ref, &key](
                                 const FileMetaData* meta,
                                 uint64_t entry_count) -> uint64_t {
    auto vstorage = v->storage_info();

    if (meta->prop.is_map_sst()) {
      // A map SST is resolved through its dependencies. Each one is looked up
      // in the version's dependence map and estimated with its own entry count.
      uint64_t total = 0;
      auto& dependence_map = vstorage->dependence_map();
      for (auto& dep : meta->prop.dependence) {
        auto found = dependence_map.find(dep.file_number);
        if (found == dependence_map.end()) {
          // TODO log error
          continue;
        }
        total +=
            self_ref.callback(self_ref.args, found->second, dep.entry_count);
      }
      return total;
    }

    const double ratio = meta->prop.num_entries == 0
                             ? 1
                             : double(entry_count) / meta->prop.num_entries;
    auto& icomp = v->cfd_->internal_comparator();
    if (icomp.Compare(meta->largest.Encode(), key) <= 0) {
      // Entire file is before "key", so just add the file size
      return vstorage->FileSizeWithBlob(meta, true, ratio);
    }
    if (icomp.Compare(meta->smallest.Encode(), key) > 0) {
      // Entire file is after "key", so ignore
      return 0;
    }

    // "key" falls in the range for this table. Ask the table reader for the
    // approximate offset of "key", opening the table via the cache if needed.
    uint64_t offset = 0;
    TableReader* reader = meta->fd.table_reader;
    if (reader != nullptr) {
      offset = reader->ApproximateOffsetOf(key);
    } else {
      TableCache* table_cache = v->cfd_->table_cache();
      Cache::Handle* handle = nullptr;
      auto s = table_cache->FindTable(
          v->env_options_, v->cfd_->internal_comparator(), meta->fd, &handle,
          v->GetMutableCFOptions().prefix_extractor.get());
      if (s.ok()) {
        reader = table_cache->GetTableReaderFromHandle(handle);
        offset = reader->ApproximateOffsetOf(key);
        table_cache->ReleaseHandle(handle);
      }
    }
    if (offset == 0) {
      return 0;
    }
    // Rescale the offset from raw file size onto FileSizeWithBlob(...).
    return uint64_t(double(offset) / meta->fd.GetFileSize() *
                    vstorage->FileSizeWithBlob(meta, true, ratio));
  };

  self_ref.callback = c_style_callback(estimate_before_key);
  self_ref.args = &estimate_before_key;

  return estimate_before_key(f.file_metadata,
                             f.file_metadata->prop.num_entries);
}

4485
void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
4486 4487
  // pre-calculate space requirement
  int64_t total_files = 0;
I
Igor Canadi 已提交
4488
  for (auto cfd : *column_family_set_) {
4489 4490 4491
    if (!cfd->initialized()) {
      continue;
    }
4492 4493
    Version* dummy_versions = cfd->dummy_versions();
    for (Version* v = dummy_versions->next_; v != dummy_versions;
I
Igor Canadi 已提交
4494
         v = v->next_) {
S
sdong 已提交
4495
      const auto* vstorage = v->storage_info();
奏之章 已提交
4496
      for (int level = -1; level < vstorage->num_levels(); level++) {
S
sdong 已提交
4497
        total_files += vstorage->LevelFiles(level).size();
4498
      }
4499 4500 4501 4502
    }
  }

  // just one time extension to the right size
4503
  live_list->reserve(live_list->size() + static_cast<size_t>(total_files));
4504

I
Igor Canadi 已提交
4505
  for (auto cfd : *column_family_set_) {
4506 4507 4508
    if (!cfd->initialized()) {
      continue;
    }
4509 4510
    auto* current = cfd->current();
    bool found_current = false;
4511 4512
    Version* dummy_versions = cfd->dummy_versions();
    for (Version* v = dummy_versions->next_; v != dummy_versions;
I
Igor Canadi 已提交
4513
         v = v->next_) {
4514 4515 4516
      v->AddLiveFiles(live_list);
      if (v == current) {
        found_current = true;
J
jorlow@chromium.org 已提交
4517 4518
      }
    }
4519 4520 4521 4522 4523
    if (!found_current && current != nullptr) {
      // Should never happen unless it is a bug.
      assert(false);
      current->AddLiveFiles(live_list);
    }
J
jorlow@chromium.org 已提交
4524 4525 4526
  }
}

4527
InternalIterator* VersionSet::MakeInputIterator(
    const Compaction* c, RangeDelAggregator* range_del_agg,
    const EnvOptions& env_options_compactions) {
  // Builds the merged input iterator for compaction `c`. Inputs at level <= 0
  // (and any input with exactly one file) get one table iterator per file;
  // other inputs get a single concatenating LevelIterator.
  auto cfd = c->column_family_data();
  ReadOptions read_options;
  read_options.verify_checksums = true;
  read_options.fill_cache = false;
  // Compaction iterators shouldn't be confined to a single prefix.
  // Compactions use Seek() for
  // (a) concurrent compactions,
  // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
  read_options.total_order_seek = true;

  auto& dependence_map = c->input_version()->storage_info()->dependence_map();

  // Child iterators grow with the loop. The old pre-sized raw array assumed
  // only input 0 could be at level <= 0 and was guarded only by an assert.
  // A vector cannot overflow and needs no manual delete[].
  // TODO(opt): use concatenating iterator for level-0 if there is no overlap
  std::vector<InternalIterator*> list;
  for (size_t which = 0; which < c->num_input_levels(); which++) {
    const LevelFilesBrief* flevel = c->input_levels(which);
    if (flevel->num_files == 0) {
      continue;
    }
    if (c->level(which) <= 0 || flevel->num_files == 1) {
      for (size_t i = 0; i < flevel->num_files; i++) {
        list.push_back(cfd->table_cache()->NewIterator(
            read_options, env_options_compactions, cfd->internal_comparator(),
            *flevel->files[i].file_metadata, dependence_map, range_del_agg,
            c->mutable_cf_options()->prefix_extractor.get(),
            nullptr /* table_reader_ptr */,
            nullptr /* no per level latency histogram */,
            true /* for_compaction */, nullptr /* arena */,
            false /* skip_filters */, c->level(which) /* level */));
      }
    } else {
      // Create concatenating iterator for the files from this level
      list.push_back(new LevelIterator(
          cfd->table_cache(), read_options, env_options_compactions,
          cfd->internal_comparator(), flevel, dependence_map,
          c->mutable_cf_options()->prefix_extractor.get(),
          false /* should_sample */,
          nullptr /* no per level latency histogram */,
          true /* for_compaction */, false /* skip_filters */,
          static_cast<int>(which) /* level */, range_del_agg));
    }
  }

  return NewMergingIterator(&cfd->internal_comparator(), list.data(),
                            static_cast<int>(list.size()));
}

A
Abhishek Kona 已提交
4584
// verify that the files listed in this compaction are present
4585 4586
// in the current version
bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
4587
#ifndef NDEBUG
Z
ZhaoMing 已提交
4588 4589 4590
  if (c->compaction_type() == kGarbageCollection) {
    return true;
  }
I
Igor Canadi 已提交
4591
  Version* version = c->column_family_data()->current();
S
sdong 已提交
4592
  const VersionStorageInfo* vstorage = version->storage_info();
4593
  if (c->input_version() != version) {
4594 4595
    ROCKS_LOG_INFO(
        db_options_->info_log,
4596 4597
        "[%s] compaction output being applied to a different base version from"
        " input version",
I
Igor Canadi 已提交
4598
        c->column_family_data()->GetName().c_str());
S
sdong 已提交
4599 4600 4601

    if (vstorage->compaction_style_ == kCompactionStyleLevel &&
        c->start_level() == 0 && c->num_input_levels() > 2U) {
4602 4603 4604 4605 4606 4607 4608 4609 4610 4611 4612
      // We are doing a L0->base_level compaction. The assumption is if
      // base level is not L1, levels from L1 to base_level - 1 is empty.
      // This is ensured by having one compaction from L0 going on at the
      // same time in level-based compaction. So that during the time, no
      // compaction/flush can put files to those levels.
      for (int l = c->start_level() + 1; l < c->output_level(); l++) {
        if (vstorage->NumLevelFiles(l) != 0) {
          return false;
        }
      }
    }
4613 4614
  }

4615
  for (size_t input = 0; input < c->num_input_levels(); ++input) {
4616
    int level = c->level(input);
4617
    for (size_t i = 0; i < c->num_input_files(input); ++i) {
4618 4619
      uint64_t number = c->input(input, i)->fd.GetNumber();
      bool found = false;
4620
      for (size_t j = 0; j < vstorage->files_[level].size(); j++) {
4621 4622 4623 4624 4625
        FileMetaData* f = vstorage->files_[level][j];
        if (f->fd.GetNumber() == number) {
          found = true;
          break;
        }
4626
      }
4627 4628
      if (!found) {
        return false;  // input files non existent in current version
4629 4630 4631
      }
    }
  }
4632 4633
#else
  (void)c;
4634
#endif
赵明 已提交
4635
  return true;  // everything good
4636 4637
}

4638
// Looks up the SST file with file number `number` in the current version of
// every initialized column family.
//
// On success, stores the file's metadata in `*meta`, its level in
// `*filelevel` and its owning column family in `*cfd`, then returns OK.
// Returns NotFound if no such file exists; the out-params are untouched then.
//
// NOTE(review): levels are scanned from 0 here, while GetLiveFilesMetaData
// and GetTotalSstFilesSize also scan level -1. Confirm that files reported
// at level -1 are meant to be invisible to this lookup.
Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
                                      FileMetaData** meta,
                                      ColumnFamilyData** cfd) {
  for (auto candidate : *column_family_set_) {
    if (!candidate->initialized()) {
      continue;  // skip column families that are not initialized
    }
    const auto* storage = candidate->current()->storage_info();
    const int level_count = storage->num_levels();
    for (int lvl = 0; lvl < level_count; ++lvl) {
      for (const auto& f : storage->LevelFiles(lvl)) {
        if (f->fd.GetNumber() != number) {
          continue;
        }
        *meta = f;
        *filelevel = lvl;
        *cfd = candidate;
        return Status::OK();
      }
    }
  }
  return Status::NotFound("File not present in any level");
}

4661
void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
I
Igor Canadi 已提交
4662
  for (auto cfd : *column_family_set_) {
4663
    if (cfd->IsDropped() || !cfd->initialized()) {
4664 4665
      continue;
    }
赵明 已提交
4666
    for (int level = -1; level < cfd->NumberLevels(); level++) {
S
sdong 已提交
4667 4668
      for (const auto& file :
           cfd->current()->storage_info()->LevelFiles(level)) {
4669
        LiveFileMetaData filemetadata;
4670
        filemetadata.column_family_name = cfd->GetName();
4671
        uint32_t path_id = file->fd.GetPathId();
4672 4673
        if (path_id < cfd->ioptions()->cf_paths.size()) {
          filemetadata.db_path = cfd->ioptions()->cf_paths[path_id].path;
4674
        } else {
4675 4676
          assert(!cfd->ioptions()->cf_paths.empty());
          filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
4677 4678
        }
        filemetadata.name = MakeTableFileName("", file->fd.GetNumber());
4679
        filemetadata.level = level;
4680
        filemetadata.size = static_cast<size_t>(file->fd.GetFileSize());
I
Igor Canadi 已提交
4681 4682
        filemetadata.smallestkey = file->smallest.user_key().ToString();
        filemetadata.largestkey = file->largest.user_key().ToString();
4683 4684
        filemetadata.smallest_seqno = file->fd.smallest_seqno;
        filemetadata.largest_seqno = file->fd.largest_seqno;
赵明 已提交
4685 4686
        filemetadata.num_reads_sampled =
            file->stats.num_reads_sampled.load(std::memory_order_relaxed);
4687
        filemetadata.being_compacted = file->being_compacted;
4688
        filemetadata.num_entries = file->prop.num_entries;
赵明 已提交
4689
        filemetadata.num_deletions = file->prop.num_deletions;
4690 4691
        metadata->push_back(filemetadata);
      }
4692 4693 4694 4695
    }
  }
}

4696
void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
4697
                                  std::vector<std::string>* manifest_filenames,
I
Igor Canadi 已提交
4698
                                  uint64_t min_pending_output) {
4699 4700
  assert(manifest_filenames->empty());
  obsolete_manifests_.swap(*manifest_filenames);
4701 4702 4703 4704
  std::vector<ObsoleteFileInfo> pending_files;
  for (auto& f : obsolete_files_) {
    if (f.metadata->fd.GetNumber() < min_pending_output) {
      files->push_back(std::move(f));
I
Igor Canadi 已提交
4705
    } else {
4706
      pending_files.push_back(std::move(f));
I
Igor Canadi 已提交
4707 4708 4709
    }
  }
  obsolete_files_.swap(pending_files);
I
Igor Canadi 已提交
4710 4711
}

4712
// Builds the in-memory state for the column family described by `edit`, which
// must be a column-family-add edit. Steps:
//   1. Register the column family with a dummy version-list head.
//   2. Append an initial version with computed level target sizes.
//   3. Create its first memtable.
//   4. Record the edit's log number.
// Returns the newly created ColumnFamilyData.
ColumnFamilyData* VersionSet::CreateColumnFamily(
    const ColumnFamilyOptions& cf_options, VersionEdit* edit) {
  assert(edit->is_column_family_add_);

  // Sentinel head of the new column family's version list. It is Ref()'d once
  // so it can later be released with Unref() rather than an explicit `delete`,
  // because ~Version is private.
  MutableCFOptions placeholder_options;
  Version* list_head =
      new Version(nullptr, this, env_options_, placeholder_options);
  list_head->Ref();

  auto created = column_family_set_->CreateColumnFamily(
      edit->column_family_name_, edit->column_family_, list_head, cf_options);

  // First real version of the column family; consumes the next version number.
  Version* initial = new Version(created, this, env_options_,
                                 *created->GetLatestMutableCFOptions(),
                                 current_version_number_++);

  // Fill level target base information before appending the version.
  initial->storage_info()->CalculateBaseBytes(
      *created->ioptions(), *created->GetLatestMutableCFOptions());
  AppendVersion(created, initial);

  // Reading GetLatestMutableCFOptions() without the mutex is safe here:
  // the column family is not yet available to clients.
  created->CreateNewMemtable(*created->GetLatestMutableCFOptions(),
                             seq_per_batch_, LastSequence());
  created->SetLogNumber(edit->log_number_);
  return created;
}

4742 4743 4744 4745 4746 4747 4748 4749
// Counts the versions linked into the circular list whose sentinel head is
// `dummy_versions`. The head itself is not counted.
uint64_t VersionSet::GetNumLiveVersions(Version* dummy_versions) {
  uint64_t live = 0;
  Version* cursor = dummy_versions->next_;
  while (cursor != dummy_versions) {
    ++live;
    cursor = cursor->next_;
  }
  return live;
}

4750 4751 4752 4753 4754
// Sums the sizes of all SST files referenced by any version in the circular
// list headed by `dummy_versions`, scanning levels -1 through num_levels_-1.
// A file shared by several versions is counted once. Files are deduplicated
// by their packed (file number, path id) value.
uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
  std::unordered_set<uint64_t> seen;
  uint64_t total = 0;
  Version* node = dummy_versions->next_;
  while (node != dummy_versions) {
    VersionStorageInfo* vstorage = node->storage_info();
    for (int lvl = -1; lvl < vstorage->num_levels_; ++lvl) {
      for (auto meta : vstorage->LevelFiles(lvl)) {
        const bool first_sighting =
            seen.insert(meta->fd.packed_number_and_path_id).second;
        if (first_sighting) {
          total += meta->fd.GetFileSize();
        }
      }
    }
    node = node->next_;
  }
  return total;
}

4766
}  // namespace rocksdb