//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/version_set.h"
I
Igor Canadi 已提交
11

12
#include <stdio.h>
J
jorlow@chromium.org 已提交
13
#include <algorithm>
14
#include <array>
15
#include <cinttypes>
16
#include <list>
I
Igor Canadi 已提交
17
#include <map>
I
Igor Canadi 已提交
18
#include <set>
19
#include <string>
20
#include <unordered_map>
21
#include <vector>
22
#include "compaction/compaction.h"
23
#include "db/internal_stats.h"
J
jorlow@chromium.org 已提交
24 25 26
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
27
#include "db/merge_context.h"
28
#include "db/merge_helper.h"
29
#include "db/pinned_iterators_manager.h"
J
jorlow@chromium.org 已提交
30
#include "db/table_cache.h"
S
sdong 已提交
31
#include "db/version_builder.h"
32
#include "file/filename.h"
33 34 35
#include "file/random_access_file_reader.h"
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
36
#include "monitoring/file_read_sample.h"
37
#include "monitoring/perf_context_imp.h"
38
#include "monitoring/persistent_stats_history.h"
39 40
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
41
#include "rocksdb/write_buffer_manager.h"
42 43
#include "table/format.h"
#include "table/get_context.h"
S
sdong 已提交
44
#include "table/internal_iterator.h"
45
#include "table/merging_iterator.h"
46
#include "table/meta_blocks.h"
47
#include "table/multiget_context.h"
48
#include "table/plain/plain_table_factory.h"
49 50
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
51
#include "test_util/sync_point.h"
J
jorlow@chromium.org 已提交
52
#include "util/coding.h"
53
#include "util/stop_watch.h"
54
#include "util/string_util.h"
55
#include "util/user_comparator_wrapper.h"
J
jorlow@chromium.org 已提交
56

57
namespace ROCKSDB_NAMESPACE {
J
jorlow@chromium.org 已提交
58

namespace {

// Find the first file in the LevelFilesBrief whose largest key is >= `key`,
// searching within the index range [left, right).
int FindFileInRange(const InternalKeyComparator& icmp,
    const LevelFilesBrief& file_level,
    const Slice& key,
    uint32_t left,
    uint32_t right) {
  auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
    return icmp.InternalKeyComparator::Compare(f.largest_key, k) < 0;
  };
  const auto &b = file_level.files;
  return static_cast<int>(std::lower_bound(b + left,
                                           b + right, key, cmp) - b);
}

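// Check whether `iter` has any entry whose user key falls in the range
// [smallest_user_key, largest_user_key]. On success, `*overlap` is set
// accordingly; any iterator error status is propagated to the caller.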
Status OverlapWithIterator(const Comparator* ucmp,
    const Slice& smallest_user_key,
    const Slice& largest_user_key,
    InternalIterator* iter,
    bool* overlap) {
  InternalKey range_start(smallest_user_key, kMaxSequenceNumber,
                          kValueTypeForSeek);
  iter->Seek(range_start.Encode());
  if (!iter->status().ok()) {
    return iter->status();
  }

  *overlap = false;
  if (iter->Valid()) {
    ParsedInternalKey seek_result;
    if (!ParseInternalKey(iter->key(), &seek_result)) {
      return Status::Corruption("DB has corrupted keys");
    }

    if (ucmp->CompareWithoutTimestamp(seek_result.user_key, largest_user_key) <=
        0) {
      *overlap = true;
    }
  }

  return iter->status();
}

// Class to help choose the next file to search for the particular key.
// Searches and returns files level by level.
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in a smaller level, later levels are irrelevant (unless we
// are MergeInProgress).
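//
// Rough usage sketch (for illustration only; this mirrors how Version::Get
// drives the picker, with the surrounding setup assumed rather than shown):
//
//   FilePicker fp(files, user_key, ikey, &level_files_brief, num_levels,
//                 &file_indexer, user_comparator, internal_comparator);
//   for (FdWithKeyRange* f = fp.GetNextFile(); f != nullptr;
//        f = fp.GetNextFile()) {
//     // probe the table referenced by f; stop once the key is resolved
//   }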
class FilePicker {
 public:
  FilePicker(std::vector<FileMetaData*>* files, const Slice& user_key,
             const Slice& ikey, autovector<LevelFilesBrief>* file_levels,
             unsigned int num_levels, FileIndexer* file_indexer,
             const Comparator* user_comparator,
             const InternalKeyComparator* internal_comparator)
      : num_levels_(num_levels),
        curr_level_(static_cast<unsigned int>(-1)),
        returned_file_level_(static_cast<unsigned int>(-1)),
        hit_file_level_(static_cast<unsigned int>(-1)),
        search_left_bound_(0),
        search_right_bound_(FileIndexer::kLevelMaxIndex),
#ifndef NDEBUG
        files_(files),
#endif
        level_files_brief_(file_levels),
        is_hit_file_last_in_level_(false),
        curr_file_level_(nullptr),
        user_key_(user_key),
        ikey_(ikey),
        file_indexer_(file_indexer),
        user_comparator_(user_comparator),
        internal_comparator_(internal_comparator) {
#ifdef NDEBUG
    (void)files;
#endif
    // Setup member variables to search first level.
    search_ended_ = !PrepareNextLevel();
    if (!search_ended_) {
      // Prefetch Level 0 table data to avoid cache miss if possible.
      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
        if (r) {
          r->Prepare(ikey);
        }
      }
    }
  }

  int GetCurrentLevel() const { return curr_level_; }

  FdWithKeyRange* GetNextFile() {
    while (!search_ended_) {  // Loops over different levels.
      while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
        // Loops over all files in current level.
        FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
        hit_file_level_ = curr_level_;
        is_hit_file_last_in_level_ =
            curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
        int cmp_largest = -1;

        // Do key range filtering of files and/or fractional cascading if:
        // (1) not all the files are in level 0, or
        // (2) there are more than 3 current level files
        // If there are only 3 or fewer current level files in the system, we
        // skip the key range filtering. In this case, more likely, the system
        // is highly tuned to minimize the number of tables queried by each
        // query, so it is unlikely that key range filtering is more efficient
        // than querying the files.
        if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
          // Check if key is within a file's range. If search left bound and
          // right bound point to the same file, we are sure key falls in
          // range.
          assert(curr_level_ == 0 ||
                 curr_index_in_curr_level_ == start_index_in_curr_level_ ||
                 user_comparator_->CompareWithoutTimestamp(
                     user_key_, ExtractUserKey(f->smallest_key)) <= 0);

          int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
              user_key_, ExtractUserKey(f->smallest_key));
          if (cmp_smallest >= 0) {
            cmp_largest = user_comparator_->CompareWithoutTimestamp(
                user_key_, ExtractUserKey(f->largest_key));
          }

          // Setup file search bound for the next level based on the
          // comparison results
          if (curr_level_ > 0) {
            file_indexer_->GetNextLevelIndex(curr_level_,
                                            curr_index_in_curr_level_,
                                            cmp_smallest, cmp_largest,
                                            &search_left_bound_,
                                            &search_right_bound_);
          }
          // Key falls out of current file's range
          if (cmp_smallest < 0 || cmp_largest > 0) {
            if (curr_level_ == 0) {
              ++curr_index_in_curr_level_;
              continue;
            } else {
              // Search next level.
              break;
            }
          }
        }
#ifndef NDEBUG
        // Sanity check to make sure that the files are correctly sorted
        if (prev_file_) {
          if (curr_level_ != 0) {
            int comp_sign = internal_comparator_->Compare(
                prev_file_->largest_key, f->smallest_key);
            assert(comp_sign < 0);
          } else {
            // level == 0, the current file cannot be newer than the previous
            // one. The compressed data structure has no seqno attribute, so
            // compare the corresponding FileMetaData entries instead.
            assert(curr_index_in_curr_level_ > 0);
            assert(!NewestFirstBySeqNo(files_[0][curr_index_in_curr_level_],
                  files_[0][curr_index_in_curr_level_-1]));
          }
        }
        prev_file_ = f;
#endif
        returned_file_level_ = curr_level_;
        if (curr_level_ > 0 && cmp_largest < 0) {
          // No more files to search in this level.
          search_ended_ = !PrepareNextLevel();
        } else {
          ++curr_index_in_curr_level_;
        }
        return f;
      }
      // Start searching next level.
      search_ended_ = !PrepareNextLevel();
    }
    // Search ended.
    return nullptr;
  }

  // getter for current file level
  // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
  unsigned int GetHitFileLevel() { return hit_file_level_; }

  // Returns true if the most recent "hit file" (i.e., one returned by
  // GetNextFile()) is at the last index in its level.
  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }

 private:
  unsigned int num_levels_;
  unsigned int curr_level_;
  unsigned int returned_file_level_;
  unsigned int hit_file_level_;
  int32_t search_left_bound_;
  int32_t search_right_bound_;
#ifndef NDEBUG
  std::vector<FileMetaData*>* files_;
#endif
  autovector<LevelFilesBrief>* level_files_brief_;
  bool search_ended_;
  bool is_hit_file_last_in_level_;
  LevelFilesBrief* curr_file_level_;
  unsigned int curr_index_in_curr_level_;
  unsigned int start_index_in_curr_level_;
  Slice user_key_;
  Slice ikey_;
  FileIndexer* file_indexer_;
  const Comparator* user_comparator_;
  const InternalKeyComparator* internal_comparator_;
#ifndef NDEBUG
  FdWithKeyRange* prev_file_;
#endif

  // Setup local variables to search next level.
  // Returns false if there are no more levels to search.
  bool PrepareNextLevel() {
    curr_level_++;
    while (curr_level_ < num_levels_) {
      curr_file_level_ = &(*level_files_brief_)[curr_level_];
      if (curr_file_level_->num_files == 0) {
        // When current level is empty, the search bound generated from upper
        // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
        // also empty.
        assert(search_left_bound_ == 0);
        assert(search_right_bound_ == -1 ||
               search_right_bound_ == FileIndexer::kLevelMaxIndex);
        // Since current level is empty, it will need to search all files in
        // the next level
        search_left_bound_ = 0;
        search_right_bound_ = FileIndexer::kLevelMaxIndex;
        curr_level_++;
        continue;
      }

      // Some files may overlap each other. We find
      // all files that overlap user_key and process them in order from
      // newest to oldest. In the context of merge-operator, this can occur at
      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
      // are always compacted into a single entry).
      int32_t start_index;
      if (curr_level_ == 0) {
        // On Level-0, we read through all files to check for overlap.
        start_index = 0;
      } else {
        // On Level-n (n>=1), files are sorted. Binary search to find the
        // earliest file whose largest key >= ikey. Search left bound and
        // right bound are used to narrow the range.
        if (search_left_bound_ <= search_right_bound_) {
          if (search_right_bound_ == FileIndexer::kLevelMaxIndex) {
            search_right_bound_ =
                static_cast<int32_t>(curr_file_level_->num_files) - 1;
          }
          // `search_right_bound_` is an inclusive upper-bound, but since it was
          // determined based on user key, it is still possible the lookup key
          // falls to the right of `search_right_bound_`'s corresponding file.
          // So, pass a limit one higher, which allows us to detect this case.
          start_index =
              FindFileInRange(*internal_comparator_, *curr_file_level_, ikey_,
                              static_cast<uint32_t>(search_left_bound_),
                              static_cast<uint32_t>(search_right_bound_) + 1);
          if (start_index == search_right_bound_ + 1) {
            // `ikey_` comes after `search_right_bound_`. The lookup key does
            // not exist on this level, so let's skip this level and do a full
            // binary search on the next level.
            search_left_bound_ = 0;
            search_right_bound_ = FileIndexer::kLevelMaxIndex;
            curr_level_++;
            continue;
          }
        } else {
          // search_left_bound > search_right_bound, key does not exist in
          // this level. Since no comparison is done in this level, it will
          // need to search all files in the next level.
          search_left_bound_ = 0;
          search_right_bound_ = FileIndexer::kLevelMaxIndex;
          curr_level_++;
          continue;
        }
      }
      start_index_in_curr_level_ = start_index;
      curr_index_in_curr_level_ = start_index;
#ifndef NDEBUG
      prev_file_ = nullptr;
#endif
      return true;
    }
    // curr_level_ = num_levels_. So, no more levels to search.
    return false;
  }
};

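// A batched variant of FilePicker used by MultiGet. It walks the levels once
// for a whole MultiGetRange of keys, keeps per-key search bounds in
// FilePickerContext, and for each file returned by GetNextFile() exposes the
// subrange of keys that may belong to that file via CurrentFileRange().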
class FilePickerMultiGet {
 private:
  struct FilePickerContext;

 public:
  FilePickerMultiGet(MultiGetRange* range,
                     autovector<LevelFilesBrief>* file_levels,
                     unsigned int num_levels, FileIndexer* file_indexer,
                     const Comparator* user_comparator,
                     const InternalKeyComparator* internal_comparator)
      : num_levels_(num_levels),
        curr_level_(static_cast<unsigned int>(-1)),
        returned_file_level_(static_cast<unsigned int>(-1)),
        hit_file_level_(static_cast<unsigned int>(-1)),
        range_(range),
        batch_iter_(range->begin()),
        batch_iter_prev_(range->begin()),
        maybe_repeat_key_(false),
        current_level_range_(*range, range->begin(), range->end()),
        current_file_range_(*range, range->begin(), range->end()),
        level_files_brief_(file_levels),
        is_hit_file_last_in_level_(false),
        curr_file_level_(nullptr),
        file_indexer_(file_indexer),
        user_comparator_(user_comparator),
        internal_comparator_(internal_comparator) {
    for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
      fp_ctx_array_[iter.index()] =
          FilePickerContext(0, FileIndexer::kLevelMaxIndex);
    }

    // Setup member variables to search first level.
    search_ended_ = !PrepareNextLevel();
    if (!search_ended_) {
      // REVISIT
      // Prefetch Level 0 table data to avoid cache miss if possible.
      // As of now, only PlainTableReader and CuckooTableReader do any
      // prefetching. This may not be necessary anymore once we implement
      // batching in those table readers
      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
        if (r) {
          for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
            r->Prepare(iter->ikey);
          }
        }
      }
    }
  }

  int GetCurrentLevel() const { return curr_level_; }

  // Iterates through files in the current level until it finds a file that
  // contains at least one key from the MultiGet batch
  bool GetNextFileInLevelWithKeys(MultiGetRange* next_file_range,
                                  size_t* file_index, FdWithKeyRange** fd,
                                  bool* is_last_key_in_file) {
    size_t curr_file_index = *file_index;
    FdWithKeyRange* f = nullptr;
    bool file_hit = false;
    int cmp_largest = -1;
    if (curr_file_index >= curr_file_level_->num_files) {
      // In the unlikely case the next key is a duplicate of the current key,
      // and the current key is the last in the level and the internal key
      // was not found, we need to skip lookup for the remaining keys and
      // reset the search bounds
      if (batch_iter_ != current_level_range_.end()) {
        ++batch_iter_;
        for (; batch_iter_ != current_level_range_.end(); ++batch_iter_) {
          struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
          fp_ctx.search_left_bound = 0;
          fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
        }
      }
      return false;
    }
    // Loops over keys in the MultiGet batch until it finds a file with
    // at least one of the keys. Then it keeps moving forward until the
    // last key in the batch that falls in that file
    while (batch_iter_ != current_level_range_.end() &&
           (fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level ==
                curr_file_index ||
            !file_hit)) {
      struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
      f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level];
      Slice& user_key = batch_iter_->ukey;

      // Do key range filtering of files and/or fractional cascading if:
      // (1) not all the files are in level 0, or
      // (2) there are more than 3 current level files
      // If there are only 3 or fewer current level files in the system, we
      // skip the key range filtering. In this case, more likely, the system
      // is highly tuned to minimize the number of tables queried by each
      // query, so it is unlikely that key range filtering is more efficient
      // than querying the files.
      if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
        // Check if key is within a file's range. If search left bound and
        // right bound point to the same find, we are sure key falls in
        // range.
        assert(curr_level_ == 0 ||
               fp_ctx.curr_index_in_curr_level ==
                   fp_ctx.start_index_in_curr_level ||
               user_comparator_->Compare(user_key,
                                         ExtractUserKey(f->smallest_key)) <= 0);

        int cmp_smallest = user_comparator_->Compare(
            user_key, ExtractUserKey(f->smallest_key));
        if (cmp_smallest >= 0) {
          cmp_largest = user_comparator_->Compare(
              user_key, ExtractUserKey(f->largest_key));
        } else {
          cmp_largest = -1;
        }

        // Setup file search bound for the next level based on the
        // comparison results
        if (curr_level_ > 0) {
          file_indexer_->GetNextLevelIndex(
              curr_level_, fp_ctx.curr_index_in_curr_level, cmp_smallest,
              cmp_largest, &fp_ctx.search_left_bound,
              &fp_ctx.search_right_bound);
        }
        // Key falls out of current file's range
        if (cmp_smallest < 0 || cmp_largest > 0) {
          next_file_range->SkipKey(batch_iter_);
        } else {
          file_hit = true;
        }
      } else {
        file_hit = true;
      }
      if (cmp_largest == 0) {
        // cmp_largest is 0, which means the next key will not be in this
        // file, so stop looking further. Also don't increment batch_iter_
        // as we may have to look for this key in the next file if we don't
        // find it in this one
        break;
      } else {
        if (curr_level_ == 0) {
          // We need to look through all files in level 0
          ++fp_ctx.curr_index_in_curr_level;
        }
        ++batch_iter_;
      }
      if (!file_hit) {
        curr_file_index =
            (batch_iter_ != current_level_range_.end())
                ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
                : curr_file_level_->num_files;
      }
    }

    *fd = f;
    *file_index = curr_file_index;
    *is_last_key_in_file = cmp_largest == 0;
    return file_hit;
  }

  FdWithKeyRange* GetNextFile() {
    while (!search_ended_) {
      // Start searching next level.
      if (batch_iter_ == current_level_range_.end()) {
        search_ended_ = !PrepareNextLevel();
        continue;
      } else {
        if (maybe_repeat_key_) {
          maybe_repeat_key_ = false;
          // Check if we found the final value for the last key in the
          // previous lookup range. If we did, then there's no need to look
          // any further for that key, so advance batch_iter_. Else, keep
          // batch_iter_ positioned on that key so we look it up again in
          // the next file
          // For L0, always advance the key because we will look in the next
          // file regardless for all keys not found yet
          if (current_level_range_.CheckKeyDone(batch_iter_) ||
              curr_level_ == 0) {
            ++batch_iter_;
          }
        }
        // batch_iter_prev_ will become the start key for the next file
        // lookup
        batch_iter_prev_ = batch_iter_;
      }

      MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
                                    current_level_range_.end());
      size_t curr_file_index =
          (batch_iter_ != current_level_range_.end())
              ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
              : curr_file_level_->num_files;
      FdWithKeyRange* f;
      bool is_last_key_in_file;
      if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
                                      &is_last_key_in_file)) {
        search_ended_ = !PrepareNextLevel();
      } else {
        MultiGetRange::Iterator upper_key = batch_iter_;
        if (is_last_key_in_file) {
          // Since cmp_largest is 0, batch_iter_ still points to the last key
          // that falls in this file, instead of the next one. Increment
          // upper_key so we can set the range properly for SST MultiGet
          ++upper_key;
          ++(fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level);
          maybe_repeat_key_ = true;
        }
        // Set the range for this file
        current_file_range_ =
            MultiGetRange(next_file_range, batch_iter_prev_, upper_key);
        returned_file_level_ = curr_level_;
        hit_file_level_ = curr_level_;
        is_hit_file_last_in_level_ =
            curr_file_index == curr_file_level_->num_files - 1;
        return f;
      }
    }

    // Search ended
    return nullptr;
  }

  // getter for current file level
  // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
  unsigned int GetHitFileLevel() { return hit_file_level_; }

  // Returns true if the most recent "hit file" (i.e., one returned by
  // GetNextFile()) is at the last index in its level.
  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }

  const MultiGetRange& CurrentFileRange() { return current_file_range_; }

 private:
  unsigned int num_levels_;
  unsigned int curr_level_;
  unsigned int returned_file_level_;
  unsigned int hit_file_level_;

  struct FilePickerContext {
    int32_t search_left_bound;
    int32_t search_right_bound;
    unsigned int curr_index_in_curr_level;
    unsigned int start_index_in_curr_level;

    FilePickerContext(int32_t left, int32_t right)
        : search_left_bound(left), search_right_bound(right),
          curr_index_in_curr_level(0), start_index_in_curr_level(0) {}

    FilePickerContext() = default;
  };
  std::array<FilePickerContext, MultiGetContext::MAX_BATCH_SIZE> fp_ctx_array_;
  MultiGetRange* range_;
  // Iterator to iterate through the keys in a MultiGet batch, that gets reset
  // at the beginning of each level. Each call to GetNextFile() will position
  // batch_iter_ at or right after the last key that was found in the returned
  // SST file
  MultiGetRange::Iterator batch_iter_;
  // An iterator that records the previous position of batch_iter_, i.e. the
  // last key found in the previous SST file, in order to serve as the start
  // of the batch key range for the next SST file
  MultiGetRange::Iterator batch_iter_prev_;
  bool maybe_repeat_key_;
  MultiGetRange current_level_range_;
  MultiGetRange current_file_range_;
  autovector<LevelFilesBrief>* level_files_brief_;
  bool search_ended_;
  bool is_hit_file_last_in_level_;
  LevelFilesBrief* curr_file_level_;
  FileIndexer* file_indexer_;
  const Comparator* user_comparator_;
  const InternalKeyComparator* internal_comparator_;

  // Setup local variables to search next level.
  // Returns false if there are no more levels to search.
  bool PrepareNextLevel() {
    if (curr_level_ == 0) {
      MultiGetRange::Iterator mget_iter = current_level_range_.begin();
      if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
          curr_file_level_->num_files) {
        batch_iter_prev_ = current_level_range_.begin();
        batch_iter_ = current_level_range_.begin();
        return true;
      }
    }

    curr_level_++;
    // Reset key range to saved value
    while (curr_level_ < num_levels_) {
      bool level_contains_keys = false;
      curr_file_level_ = &(*level_files_brief_)[curr_level_];
      if (curr_file_level_->num_files == 0) {
        // When current level is empty, the search bound generated from upper
        // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
        // also empty.

        for (auto mget_iter = current_level_range_.begin();
             mget_iter != current_level_range_.end(); ++mget_iter) {
          struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];

          assert(fp_ctx.search_left_bound == 0);
          assert(fp_ctx.search_right_bound == -1 ||
                 fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex);
          // Since current level is empty, it will need to search all files in
          // the next level
          fp_ctx.search_left_bound = 0;
          fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
        }
        // Skip all subsequent empty levels
        do {
          ++curr_level_;
        } while ((curr_level_ < num_levels_) &&
                 (*level_files_brief_)[curr_level_].num_files == 0);
        continue;
      }

      // Some files may overlap each other. We find
      // all files that overlap user_key and process them in order from
      // newest to oldest. In the context of merge-operator, this can occur at
      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
      // are always compacted into a single entry).
      int32_t start_index = -1;
      current_level_range_ =
          MultiGetRange(*range_, range_->begin(), range_->end());
      for (auto mget_iter = current_level_range_.begin();
           mget_iter != current_level_range_.end(); ++mget_iter) {
        struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
        if (curr_level_ == 0) {
          // On Level-0, we read through all files to check for overlap.
          start_index = 0;
          level_contains_keys = true;
        } else {
          // On Level-n (n>=1), files are sorted. Binary search to find the
          // earliest file whose largest key >= ikey. Search left bound and
          // right bound are used to narrow the range.
          if (fp_ctx.search_left_bound <= fp_ctx.search_right_bound) {
            if (fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex) {
              fp_ctx.search_right_bound =
                  static_cast<int32_t>(curr_file_level_->num_files) - 1;
            }
            // `search_right_bound_` is an inclusive upper-bound, but since it
            // was determined based on user key, it is still possible the lookup
            // key falls to the right of `search_right_bound_`'s corresponding
            // file. So, pass a limit one higher, which allows us to detect this
            // case.
            Slice& ikey = mget_iter->ikey;
            start_index = FindFileInRange(
                *internal_comparator_, *curr_file_level_, ikey,
                static_cast<uint32_t>(fp_ctx.search_left_bound),
                static_cast<uint32_t>(fp_ctx.search_right_bound) + 1);
            if (start_index == fp_ctx.search_right_bound + 1) {
              // `ikey_` comes after `search_right_bound_`. The lookup key does
              // not exist on this level, so let's skip this level and do a full
              // binary search on the next level.
              fp_ctx.search_left_bound = 0;
              fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
              current_level_range_.SkipKey(mget_iter);
              continue;
            } else {
              level_contains_keys = true;
            }
          } else {
            // search_left_bound > search_right_bound, key does not exist in
            // this level. Since no comparison is done in this level, it will
            // need to search all files in the next level.
            fp_ctx.search_left_bound = 0;
            fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
            current_level_range_.SkipKey(mget_iter);
            continue;
          }
        }
        fp_ctx.start_index_in_curr_level = start_index;
        fp_ctx.curr_index_in_curr_level = start_index;
      }
      if (level_contains_keys) {
        batch_iter_prev_ = current_level_range_.begin();
        batch_iter_ = current_level_range_.begin();
        return true;
      }
      curr_level_++;
    }
    // curr_level_ = num_levels_. So, no more levels to search.
    return false;
  }
};
}  // anonymous namespace

VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }

Version::~Version() {
  assert(refs_ == 0);

  // Remove from linked list
  prev_->next_ = next_;
  next_->prev_ = prev_;

  // Drop references to files
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (size_t i = 0; i < storage_info_.files_[level].size(); i++) {
      FileMetaData* f = storage_info_.files_[level][i];
      assert(f->refs > 0);
      f->refs--;
      if (f->refs <= 0) {
        assert(cfd_ != nullptr);
        uint32_t path_id = f->fd.GetPathId();
        assert(path_id < cfd_->ioptions()->cf_paths.size());
        vset_->obsolete_files_.push_back(
            ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path));
      }
    }
  }
}

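// Return the index of the first file in `file_level` whose largest key is at
// least `key` (in internal key order), or `num_files` if no such file exists.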
int FindFile(const InternalKeyComparator& icmp,
             const LevelFilesBrief& file_level,
             const Slice& key) {
  return FindFileInRange(icmp, file_level, key, 0,
                         static_cast<uint32_t>(file_level.num_files));
}

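// Populate `file_level` from `files`, copying each file's smallest and
// largest keys into memory allocated from `arena`.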
void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level,
        const std::vector<FileMetaData*>& files,
        Arena* arena) {
  assert(file_level);
  assert(arena);

  size_t num = files.size();
  file_level->num_files = num;
  char* mem = arena->AllocateAligned(num * sizeof(FdWithKeyRange));
  file_level->files = new (mem)FdWithKeyRange[num];

  for (size_t i = 0; i < num; i++) {
    Slice smallest_key = files[i]->smallest.Encode();
    Slice largest_key = files[i]->largest.Encode();

    // Copy key slice to sequential memory
    size_t smallest_size = smallest_key.size();
    size_t largest_size = largest_key.size();
    mem = arena->AllocateAligned(smallest_size + largest_size);
    memcpy(mem, smallest_key.data(), smallest_size);
    memcpy(mem + smallest_size, largest_key.data(), largest_size);

    FdWithKeyRange& f = file_level->files[i];
    f.fd = files[i]->fd;
    f.file_metadata = files[i];
    f.smallest_key = Slice(mem, smallest_size);
    f.largest_key = Slice(mem + smallest_size, largest_size);
  }
}

static bool AfterFile(const Comparator* ucmp,
                      const Slice* user_key, const FdWithKeyRange* f) {
  // nullptr user_key occurs before all keys and is therefore never after *f
  return (user_key != nullptr &&
          ucmp->CompareWithoutTimestamp(*user_key,
                                        ExtractUserKey(f->largest_key)) > 0);
}

static bool BeforeFile(const Comparator* ucmp,
                       const Slice* user_key, const FdWithKeyRange* f) {
  // nullptr user_key occurs after all keys and is therefore never before *f
  return (user_key != nullptr &&
          ucmp->CompareWithoutTimestamp(*user_key,
                                        ExtractUserKey(f->smallest_key)) < 0);
}

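// Return true if some file in `file_level` overlaps the user key range
// [*smallest_user_key, *largest_user_key]. A nullptr bound is treated as
// unbounded on that side. If `disjoint_sorted_files` is true, the files are
// assumed to be non-overlapping and sorted, so a binary search is used.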
bool SomeFileOverlapsRange(
    const InternalKeyComparator& icmp,
    bool disjoint_sorted_files,
    const LevelFilesBrief& file_level,
    const Slice* smallest_user_key,
    const Slice* largest_user_key) {
  const Comparator* ucmp = icmp.user_comparator();
  if (!disjoint_sorted_files) {
    // Need to check against all files
    for (size_t i = 0; i < file_level.num_files; i++) {
      const FdWithKeyRange* f = &(file_level.files[i]);
      if (AfterFile(ucmp, smallest_user_key, f) ||
          BeforeFile(ucmp, largest_user_key, f)) {
        // No overlap
      } else {
        return true;  // Overlap
      }
    }
    return false;
  }

  // Binary search over file list
  uint32_t index = 0;
  if (smallest_user_key != nullptr) {
    // Find the leftmost possible internal key for smallest_user_key
    InternalKey small;
    small.SetMinPossibleForUserKey(*smallest_user_key);
    index = FindFile(icmp, file_level, small.Encode());
  }

  if (index >= file_level.num_files) {
    // beginning of range is after all files, so no overlap.
    return false;
  }

  return !BeforeFile(ucmp, largest_user_key, &file_level.files[index]);
}

namespace {

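// An internal iterator over all the files of one level. File iterators are
// opened lazily through the TableCache, and the wrapper transparently moves
// between files as the cursor crosses file boundaries.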
class LevelIterator final : public InternalIterator {
 public:
  LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
                const FileOptions& file_options,
                const InternalKeyComparator& icomparator,
                const LevelFilesBrief* flevel,
                const SliceTransform* prefix_extractor, bool should_sample,
                HistogramImpl* file_read_hist, TableReaderCaller caller,
                bool skip_filters, int level, RangeDelAggregator* range_del_agg,
                const std::vector<AtomicCompactionUnitBoundary>*
                    compaction_boundaries = nullptr)
      : table_cache_(table_cache),
        read_options_(read_options),
        file_options_(file_options),
        icomparator_(icomparator),
        user_comparator_(icomparator.user_comparator()),
        flevel_(flevel),
        prefix_extractor_(prefix_extractor),
        file_read_hist_(file_read_hist),
        should_sample_(should_sample),
        caller_(caller),
        skip_filters_(skip_filters),
        file_index_(flevel_->num_files),
        level_(level),
        range_del_agg_(range_del_agg),
        pinned_iters_mgr_(nullptr),
        compaction_boundaries_(compaction_boundaries) {
    // Empty level is not supported.
    assert(flevel_ != nullptr && flevel_->num_files > 0);
  }

  ~LevelIterator() override { delete file_iter_.Set(nullptr); }

  void Seek(const Slice& target) override;
  void SeekForPrev(const Slice& target) override;
  void SeekToFirst() override;
  void SeekToLast() override;
  void Next() final override;
  bool NextAndGetResult(IterateResult* result) override;
  void Prev() override;

  bool Valid() const override { return file_iter_.Valid(); }
  Slice key() const override {
    assert(Valid());
    return file_iter_.key();
  }

  Slice value() const override {
    assert(Valid());
    return file_iter_.value();
  }

  Status status() const override {
    return file_iter_.iter() ? file_iter_.status() : Status::OK();
  }

  inline bool MayBeOutOfLowerBound() override {
    assert(Valid());
    return may_be_out_of_lower_bound_ && file_iter_.MayBeOutOfLowerBound();
  }

  inline bool MayBeOutOfUpperBound() override {
    assert(Valid());
    return file_iter_.MayBeOutOfUpperBound();
  }

  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    if (file_iter_.iter()) {
      file_iter_.SetPinnedItersMgr(pinned_iters_mgr);
    }
  }

  bool IsKeyPinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_.iter() && file_iter_.IsKeyPinned();
  }

  bool IsValuePinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_.iter() && file_iter_.IsValuePinned();
  }

 private:
  // Return true if at least one invalid file is seen and skipped.
  bool SkipEmptyFileForward();
  void SkipEmptyFileBackward();
  void SetFileIterator(InternalIterator* iter);
  void InitFileIterator(size_t new_file_index);

  // Called by both Next() and NextAndGetResult(). Force inline.
  void NextImpl() {
    assert(Valid());
    file_iter_.Next();
    SkipEmptyFileForward();
  }

  const Slice& file_smallest_key(size_t file_index) {
    assert(file_index < flevel_->num_files);
    return flevel_->files[file_index].smallest_key;
  }

  bool KeyReachedUpperBound(const Slice& internal_key) {
    return read_options_.iterate_upper_bound != nullptr &&
           user_comparator_.CompareWithoutTimestamp(
               ExtractUserKey(internal_key), /*a_has_ts=*/true,
               *read_options_.iterate_upper_bound, /*b_has_ts=*/false) >= 0;
  }

  InternalIterator* NewFileIterator() {
    assert(file_index_ < flevel_->num_files);
    auto file_meta = flevel_->files[file_index_];
    if (should_sample_) {
      sample_file_read_inc(file_meta.file_metadata);
    }

    const InternalKey* smallest_compaction_key = nullptr;
    const InternalKey* largest_compaction_key = nullptr;
    if (compaction_boundaries_ != nullptr) {
      smallest_compaction_key = (*compaction_boundaries_)[file_index_].smallest;
      largest_compaction_key = (*compaction_boundaries_)[file_index_].largest;
    }
    CheckMayBeOutOfLowerBound();
    return table_cache_->NewIterator(
        read_options_, file_options_, icomparator_, *file_meta.file_metadata,
        range_del_agg_, prefix_extractor_,
        nullptr /* don't need reference to table */, file_read_hist_, caller_,
        /*arena=*/nullptr, skip_filters_, level_, smallest_compaction_key,
        largest_compaction_key);
  }

  // Check whether the current file is fully within iterate_lower_bound.
  //
  // Note MyRocks may update iterate bounds between seeks. To work around
  // this, we need to check and update may_be_out_of_lower_bound_ accordingly.
  void CheckMayBeOutOfLowerBound() {
    if (read_options_.iterate_lower_bound != nullptr &&
        file_index_ < flevel_->num_files) {
      may_be_out_of_lower_bound_ =
          user_comparator_.Compare(
              ExtractUserKey(file_smallest_key(file_index_)),
              *read_options_.iterate_lower_bound) < 0;
    }
  }

  TableCache* table_cache_;
  const ReadOptions read_options_;
  const FileOptions& file_options_;
  const InternalKeyComparator& icomparator_;
  const UserComparatorWrapper user_comparator_;
  const LevelFilesBrief* flevel_;
  mutable FileDescriptor current_value_;
  // `prefix_extractor_` may be non-null even for total order seek. Checking
  // this variable is not the right way to identify whether prefix iterator
  // is used.
  const SliceTransform* prefix_extractor_;

  HistogramImpl* file_read_hist_;
  bool should_sample_;
  TableReaderCaller caller_;
  bool skip_filters_;
  bool may_be_out_of_lower_bound_ = true;
  size_t file_index_;
  int level_;
  RangeDelAggregator* range_del_agg_;
  IteratorWrapper file_iter_;  // May be nullptr
  PinnedIteratorsManager* pinned_iters_mgr_;

  // To be propagated to RangeDelAggregator in order to safely truncate range
  // tombstones.
  const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries_;
};

void LevelIterator::Seek(const Slice& target) {
  // Check whether the seek key falls within the same file
  bool need_to_reseek = true;
  if (file_iter_.iter() != nullptr && file_index_ < flevel_->num_files) {
    const FdWithKeyRange& cur_file = flevel_->files[file_index_];
    if (icomparator_.InternalKeyComparator::Compare(
            target, cur_file.largest_key) <= 0 &&
        icomparator_.InternalKeyComparator::Compare(
            target, cur_file.smallest_key) >= 0) {
      need_to_reseek = false;
      assert(static_cast<size_t>(FindFile(icomparator_, *flevel_, target)) ==
             file_index_);
    }
  }
  if (need_to_reseek) {
    TEST_SYNC_POINT("LevelIterator::Seek:BeforeFindFile");
    size_t new_file_index = FindFile(icomparator_, *flevel_, target);
    InitFileIterator(new_file_index);
  }

  if (file_iter_.iter() != nullptr) {
    file_iter_.Seek(target);
  }
  if (SkipEmptyFileForward() && prefix_extractor_ != nullptr &&
      !read_options_.total_order_seek && !read_options_.auto_prefix_mode &&
      file_iter_.iter() != nullptr && file_iter_.Valid()) {
    // We've skipped the file we initially positioned to. In the prefix
    // seek case, it is likely that the file is skipped because of
    // prefix bloom or hash, where more keys are skipped. We then check
    // the current key and invalidate the iterator if the prefix is
    // already passed.
    // When doing prefix iterator seek, when keys for one prefix have
    // been exhausted, it can jump to any key that is larger. Here we are
    // enforcing a stricter contract than that, in order to make it easier for
    // higher layers (merging and DB iterator) to reason about correctness:
    // 1. Within the prefix, the result should be accurate.
    // 2. If keys for the prefix are exhausted, the iterator is either
    //    positioned to the next key after the prefix, or made invalid.
    // A side benefit will be that it invalidates the iterator earlier so that
    // the upper level merging iterator can merge fewer child iterators.
    Slice target_user_key = ExtractUserKey(target);
    Slice file_user_key = ExtractUserKey(file_iter_.key());
    if (prefix_extractor_->InDomain(target_user_key) &&
        (!prefix_extractor_->InDomain(file_user_key) ||
         user_comparator_.Compare(
             prefix_extractor_->Transform(target_user_key),
             prefix_extractor_->Transform(file_user_key)) != 0)) {
      SetFileIterator(nullptr);
    }
  }
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::SeekForPrev(const Slice& target) {
  size_t new_file_index = FindFile(icomparator_, *flevel_, target);
  if (new_file_index >= flevel_->num_files) {
    new_file_index = flevel_->num_files - 1;
  }

  InitFileIterator(new_file_index);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekForPrev(target);
    SkipEmptyFileBackward();
  }
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::SeekToFirst() {
  InitFileIterator(0);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekToFirst();
  }
  SkipEmptyFileForward();
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::SeekToLast() {
  InitFileIterator(flevel_->num_files - 1);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekToLast();
  }
  SkipEmptyFileBackward();
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::Next() { NextImpl(); }

bool LevelIterator::NextAndGetResult(IterateResult* result) {
  NextImpl();
  bool is_valid = Valid();
  if (is_valid) {
    result->key = key();
    result->may_be_out_of_upper_bound = MayBeOutOfUpperBound();
  }
  return is_valid;
}

void LevelIterator::Prev() {
  assert(Valid());
  file_iter_.Prev();
  SkipEmptyFileBackward();
}

bool LevelIterator::SkipEmptyFileForward() {
  bool seen_empty_file = false;
  while (file_iter_.iter() == nullptr ||
         (!file_iter_.Valid() && file_iter_.status().ok() &&
          !file_iter_.iter()->IsOutOfBound())) {
    seen_empty_file = true;
    // Move to next file
    if (file_index_ >= flevel_->num_files - 1) {
      // Already at the last file
      SetFileIterator(nullptr);
      break;
    }
    if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) {
      SetFileIterator(nullptr);
      break;
    }
    InitFileIterator(file_index_ + 1);
    if (file_iter_.iter() != nullptr) {
      file_iter_.SeekToFirst();
    }
  }
  return seen_empty_file;
}

void LevelIterator::SkipEmptyFileBackward() {
  while (file_iter_.iter() == nullptr ||
         (!file_iter_.Valid() && file_iter_.status().ok())) {
    // Move to previous file
    if (file_index_ == 0) {
      // Already the first file
      SetFileIterator(nullptr);
      return;
    }
    InitFileIterator(file_index_ - 1);
    if (file_iter_.iter() != nullptr) {
      file_iter_.SeekToLast();
    }
  }
}

void LevelIterator::SetFileIterator(InternalIterator* iter) {
  if (pinned_iters_mgr_ && iter) {
    iter->SetPinnedItersMgr(pinned_iters_mgr_);
  }

  InternalIterator* old_iter = file_iter_.Set(iter);
  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
    pinned_iters_mgr_->PinIterator(old_iter);
  } else {
    delete old_iter;
  }
}

void LevelIterator::InitFileIterator(size_t new_file_index) {
  if (new_file_index >= flevel_->num_files) {
    file_index_ = new_file_index;
    SetFileIterator(nullptr);
    return;
  } else {
    // If the file iterator shows incomplete, we try it again if users seek
    // to the same file, as this time we may go to a different data block
    // which is cached in block cache.
    //
    if (file_iter_.iter() != nullptr && !file_iter_.status().IsIncomplete() &&
        new_file_index == file_index_) {
      // file_iter_ is already constructed with this iterator, so
      // no need to change anything
    } else {
      file_index_ = new_file_index;
      InternalIterator* iter = NewFileIterator();
      SetFileIterator(iter);
    }
  }
}
}  // anonymous namespace

// A wrapper of version builder which references the current version in
// the constructor and unrefs it in the destructor.
// Both the constructor and destructor need to be called while holding the DB
// mutex.
class BaseReferencedVersionBuilder {
 public:
  explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd)
      : version_builder_(new VersionBuilder(
            cfd->current()->version_set()->file_options(), cfd->table_cache(),
            cfd->current()->storage_info(), cfd->ioptions()->info_log)),
        version_(cfd->current()) {
    version_->Ref();
  }
  ~BaseReferencedVersionBuilder() {
    version_->Unref();
  }
  VersionBuilder* version_builder() { return version_builder_.get(); }

 private:
  std::unique_ptr<VersionBuilder> version_builder_;
  Version* version_;
};

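// Load the table properties of `file_meta` into `*tp`: first try the table
// cache without IO, and if the table is not cached, read the properties
// block directly from the file.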
Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
                                   const FileMetaData* file_meta,
                                   const std::string* fname) const {
  auto table_cache = cfd_->table_cache();
  auto ioptions = cfd_->ioptions();
  Status s = table_cache->GetTableProperties(
      file_options_, cfd_->internal_comparator(), file_meta->fd, tp,
      mutable_cf_options_.prefix_extractor.get(), true /* no io */);
  if (s.ok()) {
    return s;
  }

  // We only ignore error type `Incomplete` since it's by design that we
  // disallow reading the table when it's not in the table cache.
  if (!s.IsIncomplete()) {
    return s;
  }

  // 2. Table is not present in table cache, we'll read the table properties
  // directly from the properties block in the file.
  std::unique_ptr<FSRandomAccessFile> file;
  std::string file_name;
  if (fname != nullptr) {
    file_name = *fname;
  } else {
    file_name =
      TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(),
                    file_meta->fd.GetPathId());
  }
  s = ioptions->fs->NewRandomAccessFile(file_name, file_options_, &file,
                                        nullptr);
  if (!s.ok()) {
    return s;
  }

  TableProperties* raw_table_properties;
  // By setting the magic number to kInvalidTableMagicNumber, we can bypass
  // the magic number check in the footer.
  std::unique_ptr<RandomAccessFileReader> file_reader(
      new RandomAccessFileReader(
          std::move(file), file_name, nullptr /* env */, nullptr /* stats */,
          0 /* hist_type */, nullptr /* file_read_hist */,
          nullptr /* rate_limiter */, ioptions->listeners));
  s = ReadTableProperties(
      file_reader.get(), file_meta->fd.GetFileSize(),
      Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
      &raw_table_properties, false /* compression_type_missing */);
  if (!s.ok()) {
    return s;
  }
  RecordTick(ioptions->statistics, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);

  *tp = std::shared_ptr<const TableProperties>(raw_table_properties);
  return s;
}

Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
  Status s;
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    s = GetPropertiesOfAllTables(props, level);
    if (!s.ok()) {
      return s;
    }
  }

  return Status::OK();
}

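// Write a human-readable summary of up to `max_entries_to_print` range
// tombstones contained in this version's tables into `*out_str`.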
Status Version::TablesRangeTombstoneSummary(int max_entries_to_print,
                                            std::string* out_str) {
  if (max_entries_to_print <= 0) {
    return Status::OK();
  }
  int num_entries_left = max_entries_to_print;

  std::stringstream ss;

  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (const auto& file_meta : storage_info_.files_[level]) {
      auto fname =
          TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
                        file_meta->fd.GetPathId());

      ss << "=== file : " << fname << " ===\n";

      TableCache* table_cache = cfd_->table_cache();
      std::unique_ptr<FragmentedRangeTombstoneIterator> tombstone_iter;

      Status s = table_cache->GetRangeTombstoneIterator(
          ReadOptions(), cfd_->internal_comparator(), *file_meta,
          &tombstone_iter);
      if (!s.ok()) {
        return s;
      }
      if (tombstone_iter) {
        tombstone_iter->SeekToFirst();

        while (tombstone_iter->Valid() && num_entries_left > 0) {
          ss << "start: " << tombstone_iter->start_key().ToString(true)
             << " end: " << tombstone_iter->end_key().ToString(true)
             << " seq: " << tombstone_iter->seq() << '\n';
          tombstone_iter->Next();
          num_entries_left--;
        }
        if (num_entries_left <= 0) {
          break;
        }
      }
    }
    if (num_entries_left <= 0) {
      break;
    }
  }
  assert(num_entries_left >= 0);
  if (num_entries_left <= 0) {
    ss << "(results may not be complete)\n";
  }

  *out_str = ss.str();
  return Status::OK();
}

Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props,
                                         int level) {
  for (const auto& file_meta : storage_info_.files_[level]) {
    auto fname =
        TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
                      file_meta->fd.GetPathId());
    // 1. If the table is already present in table cache, load table
    // properties from there.
    std::shared_ptr<const TableProperties> table_properties;
    Status s = GetTableProperties(&table_properties, file_meta, &fname);
    if (s.ok()) {
      props->insert({fname, table_properties});
    } else {
      return s;
    }
  }

  return Status::OK();
}

Status Version::GetPropertiesOfTablesInRange(
    const Range* range, std::size_t n, TablePropertiesCollection* props) const {
  for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
    for (decltype(n) i = 0; i < n; i++) {
      // Convert user_key into a corresponding internal key.
      InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
      InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
      std::vector<FileMetaData*> files;
      storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr,
                                         false);
      for (const auto& file_meta : files) {
        auto fname =
            TableFileName(cfd_->ioptions()->cf_paths,
                          file_meta->fd.GetNumber(), file_meta->fd.GetPathId());
        if (props->count(fname) == 0) {
          // 1. If the table is already present in table cache, load table
          // properties from there.
          std::shared_ptr<const TableProperties> table_properties;
          Status s = GetTableProperties(&table_properties, file_meta, &fname);
          if (s.ok()) {
            props->insert({fname, table_properties});
          } else {
            return s;
          }
        }
      }
    }
  }

  return Status::OK();
}

Status Version::GetAggregatedTableProperties(
    std::shared_ptr<const TableProperties>* tp, int level) {
  TablePropertiesCollection props;
  Status s;
  if (level < 0) {
    s = GetPropertiesOfAllTables(&props);
  } else {
    s = GetPropertiesOfAllTables(&props, level);
  }
  if (!s.ok()) {
    return s;
  }

  auto* new_tp = new TableProperties();
  for (const auto& item : props) {
    new_tp->Add(*item.second);
  }
  tp->reset(new_tp);
  return Status::OK();
}

size_t Version::GetMemoryUsageByTableReaders() {
  size_t total_usage = 0;
  for (auto& file_level : storage_info_.level_files_brief_) {
    for (size_t i = 0; i < file_level.num_files; i++) {
      total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
          file_options_, cfd_->internal_comparator(), file_level.files[i].fd,
          mutable_cf_options_.prefix_extractor.get());
    }
  }
  return total_usage;
}

void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
  assert(cf_meta);
  assert(cfd_);

  cf_meta->name = cfd_->GetName();
  cf_meta->size = 0;
  cf_meta->file_count = 0;
  cf_meta->levels.clear();

  auto* ioptions = cfd_->ioptions();
  auto* vstorage = storage_info();

  for (int level = 0; level < cfd_->NumberLevels(); level++) {
    uint64_t level_size = 0;
    cf_meta->file_count += vstorage->LevelFiles(level).size();
    std::vector<SstFileMetaData> files;
    for (const auto& file : vstorage->LevelFiles(level)) {
      uint32_t path_id = file->fd.GetPathId();
      std::string file_path;
      if (path_id < ioptions->cf_paths.size()) {
        file_path = ioptions->cf_paths[path_id].path;
      } else {
        assert(!ioptions->cf_paths.empty());
        file_path = ioptions->cf_paths.back().path;
      }
      const uint64_t file_number = file->fd.GetNumber();
      files.emplace_back(SstFileMetaData{
          MakeTableFileName("", file_number), file_number, file_path,
          static_cast<size_t>(file->fd.GetFileSize()), file->fd.smallest_seqno,
          file->fd.largest_seqno, file->smallest.user_key().ToString(),
          file->largest.user_key().ToString(),
          file->stats.num_reads_sampled.load(std::memory_order_relaxed),
          file->being_compacted, file->oldest_blob_file_number,
          file->TryGetOldestAncesterTime(), file->TryGetFileCreationTime(),
          file->file_checksum, file->file_checksum_func_name});
      files.back().num_entries = file->num_entries;
      files.back().num_deletions = file->num_deletions;
      level_size += file->fd.GetFileSize();
    }
    cf_meta->levels.emplace_back(
        level, level_size, std::move(files));
    cf_meta->size += level_size;
  }
}

uint64_t Version::GetSstFilesSize() {
  uint64_t sst_files_size = 0;
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (const auto& file_meta : storage_info_.LevelFiles(level)) {
      sst_files_size += file_meta->fd.GetFileSize();
    }
  }
  return sst_files_size;
}

void Version::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
  uint64_t oldest_time = port::kMaxUint64;
  for (int level = 0; level < storage_info_.num_non_empty_levels_; level++) {
    for (FileMetaData* meta : storage_info_.LevelFiles(level)) {
      assert(meta->fd.table_reader != nullptr);
      uint64_t file_creation_time = meta->TryGetFileCreationTime();
      if (file_creation_time == kUnknownFileCreationTime) {
        *creation_time = 0;
        return;
      }
      if (file_creation_time < oldest_time) {
        oldest_time = file_creation_time;
      }
    }
  }
  *creation_time = oldest_time;
}

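// Estimates the number of live (non-deleted) keys in this version from the
// table-property samples collected so far:
//   est = current_num_non_deletions_ - current_num_deletions_
// scaled up by file_count / current_num_samples_ when only a subset of the
// files has been sampled. E.g. (illustrative numbers only): if 3 of 12 files
// were sampled and contribute est = 900, the estimate becomes
// 900 * 12 / 3 = 3600.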
uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
  // Estimation will be inaccurate when:
  // (1) there exist merge keys
  // (2) keys are directly overwritten
  // (3) deletion on non-existing keys
  // (4) low number of samples
  if (current_num_samples_ == 0) {
    return 0;
  }

  if (current_num_non_deletions_ <= current_num_deletions_) {
    return 0;
  }

  uint64_t est = current_num_non_deletions_ - current_num_deletions_;

  uint64_t file_count = 0;
  for (int level = 0; level < num_levels_; ++level) {
    file_count += files_[level].size();
  }

  if (current_num_samples_ < file_count) {
    // casting to avoid overflowing
    return
      static_cast<uint64_t>(
        (est * static_cast<double>(file_count) / current_num_samples_)
      );
  } else {
    return est;
  }
}

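// Returns the ratio of raw (uncompressed) key+value bytes to on-disk file
// bytes for the given level, or -1.0 if the level stores no data.
// E.g. (illustrative numbers only): 200 MB of raw data held in 80 MB of SST
// files yields a ratio of 2.5.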
double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
    int level) const {
  assert(level < num_levels_);
  uint64_t sum_file_size_bytes = 0;
  uint64_t sum_data_size_bytes = 0;
  for (auto* file_meta : files_[level]) {
    sum_file_size_bytes += file_meta->fd.GetFileSize();
    sum_data_size_bytes += file_meta->raw_key_size + file_meta->raw_value_size;
  }
  if (sum_file_size_bytes == 0) {
    return -1.0;
  }
  return static_cast<double>(sum_data_size_bytes) / sum_file_size_bytes;
}

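// Registers the iterators needed to read this version with
// merge_iter_builder: every L0 file gets its own iterator (L0 files may
// overlap), while each deeper non-empty level is covered by a single
// lazily-opening LevelIterator. Per-level handling is delegated to
// AddIteratorsForLevel() below.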
void Version::AddIterators(const ReadOptions& read_options,
                           const FileOptions& soptions,
                           MergeIteratorBuilder* merge_iter_builder,
                           RangeDelAggregator* range_del_agg) {
  assert(storage_info_.finalized_);

  for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
    AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level,
                         range_del_agg);
  }
}

void Version::AddIteratorsForLevel(const ReadOptions& read_options,
                                   const FileOptions& soptions,
                                   MergeIteratorBuilder* merge_iter_builder,
                                   int level,
                                   RangeDelAggregator* range_del_agg) {
  assert(storage_info_.finalized_);
  if (level >= storage_info_.num_non_empty_levels()) {
    // This is an empty level
    return;
  } else if (storage_info_.LevelFilesBrief(level).num_files == 0) {
    // No files in this level
    return;
  }

  bool should_sample = should_sample_file_read();

  auto* arena = merge_iter_builder->GetArena();
  if (level == 0) {
    // Merge all level zero files together since they may overlap
    for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
      const auto& file = storage_info_.LevelFilesBrief(0).files[i];
      merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
          read_options, soptions, cfd_->internal_comparator(),
          *file.file_metadata, range_del_agg,
          mutable_cf_options_.prefix_extractor.get(), nullptr,
          cfd_->internal_stats()->GetFileReadHist(0),
          TableReaderCaller::kUserIterator, arena,
          /*skip_filters=*/false, /*level=*/0,
          /*smallest_compaction_key=*/nullptr,
          /*largest_compaction_key=*/nullptr));
    }
    if (should_sample) {
      // Count one for every L0 file. This is done per iterator creation
      // rather than Seek(), while files in other levels are recorded per seek.
      // If users execute one range query per iterator, there may be some
      // discrepancy here.
      for (FileMetaData* meta : storage_info_.LevelFiles(0)) {
        sample_file_read_inc(meta);
      }
    }
  } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
    // For levels > 0, we can use a concatenating iterator that sequentially
    // walks through the non-overlapping files in the level, opening them
    // lazily.
    auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
    merge_iter_builder->AddIterator(new (mem) LevelIterator(
        cfd_->table_cache(), read_options, soptions,
        cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
        mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
        cfd_->internal_stats()->GetFileReadHist(level),
        TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
        range_del_agg, /*largest_compaction_key=*/nullptr));
  }
}

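// Sets *overlap to true if any key or range tombstone in the given level
// falls inside [smallest_user_key, largest_user_key]. L0 files are probed
// one by one; for levels > 0 a single LevelIterator over the sorted files is
// sufficient.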
Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
                                         const FileOptions& file_options,
                                         const Slice& smallest_user_key,
                                         const Slice& largest_user_key,
                                         int level, bool* overlap) {
  assert(storage_info_.finalized_);

  auto icmp = cfd_->internal_comparator();
  auto ucmp = icmp.user_comparator();

  Arena arena;
  Status status;
  ReadRangeDelAggregator range_del_agg(&icmp,
                                       kMaxSequenceNumber /* upper_bound */);

  *overlap = false;

  if (level == 0) {
    for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
      const auto file = &storage_info_.LevelFilesBrief(0).files[i];
      if (AfterFile(ucmp, &smallest_user_key, file) ||
          BeforeFile(ucmp, &largest_user_key, file)) {
        continue;
      }
      ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
          read_options, file_options, cfd_->internal_comparator(),
          *file->file_metadata, &range_del_agg,
          mutable_cf_options_.prefix_extractor.get(), nullptr,
          cfd_->internal_stats()->GetFileReadHist(0),
          TableReaderCaller::kUserIterator, &arena,
          /*skip_filters=*/false, /*level=*/0,
          /*smallest_compaction_key=*/nullptr,
          /*largest_compaction_key=*/nullptr));
      status = OverlapWithIterator(
          ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
      if (!status.ok() || *overlap) {
        break;
      }
    }
  } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
    auto mem = arena.AllocateAligned(sizeof(LevelIterator));
    ScopedArenaIterator iter(new (mem) LevelIterator(
        cfd_->table_cache(), read_options, file_options,
        cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
        mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
        cfd_->internal_stats()->GetFileReadHist(level),
        TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
        &range_del_agg));
    status = OverlapWithIterator(
        ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
  }

  if (status.ok() && *overlap == false &&
      range_del_agg.IsRangeOverlapped(smallest_user_key, largest_user_key)) {
    *overlap = true;
  }
  return status;
}

VersionStorageInfo::VersionStorageInfo(
    const InternalKeyComparator* internal_comparator,
    const Comparator* user_comparator, int levels,
    CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage,
    bool _force_consistency_checks)
    : internal_comparator_(internal_comparator),
      user_comparator_(user_comparator),
      // cfd is nullptr if Version is dummy
      num_levels_(levels),
      num_non_empty_levels_(0),
      file_indexer_(user_comparator),
      compaction_style_(compaction_style),
      files_(new std::vector<FileMetaData*>[num_levels_]),
      base_level_(num_levels_ == 1 ? -1 : 1),
      level_multiplier_(0.0),
      files_by_compaction_pri_(num_levels_),
      level0_non_overlapping_(false),
      next_file_to_compact_by_size_(num_levels_),
      compaction_score_(num_levels_),
      compaction_level_(num_levels_),
      l0_delay_trigger_count_(0),
      accumulated_file_size_(0),
      accumulated_raw_key_size_(0),
      accumulated_raw_value_size_(0),
      accumulated_num_non_deletions_(0),
      accumulated_num_deletions_(0),
      current_num_non_deletions_(0),
      current_num_deletions_(0),
      current_num_samples_(0),
      estimated_compaction_needed_bytes_(0),
      finalized_(false),
      force_consistency_checks_(_force_consistency_checks) {
  if (ref_vstorage != nullptr) {
    accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
    accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_;
    accumulated_raw_value_size_ = ref_vstorage->accumulated_raw_value_size_;
    accumulated_num_non_deletions_ =
        ref_vstorage->accumulated_num_non_deletions_;
    accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_;
    current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
    current_num_deletions_ = ref_vstorage->current_num_deletions_;
    current_num_samples_ = ref_vstorage->current_num_samples_;
    oldest_snapshot_seqnum_ = ref_vstorage->oldest_snapshot_seqnum_;
  }
}

Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
                 const FileOptions& file_opt,
                 const MutableCFOptions mutable_cf_options,
                 uint64_t version_number)
    : env_(vset->env_),
      cfd_(column_family_data),
      info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log),
      db_statistics_((cfd_ == nullptr) ? nullptr
                                       : cfd_->ioptions()->statistics),
      table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
      merge_operator_((cfd_ == nullptr) ? nullptr
                                        : cfd_->ioptions()->merge_operator),
      storage_info_(
          (cfd_ == nullptr) ? nullptr : &cfd_->internal_comparator(),
          (cfd_ == nullptr) ? nullptr : cfd_->user_comparator(),
          cfd_ == nullptr ? 0 : cfd_->NumberLevels(),
          cfd_ == nullptr ? kCompactionStyleLevel
                          : cfd_->ioptions()->compaction_style,
          (cfd_ == nullptr || cfd_->current() == nullptr)
              ? nullptr
              : cfd_->current()->storage_info(),
          cfd_ == nullptr ? false : cfd_->ioptions()->force_consistency_checks),
      vset_(vset),
      next_(this),
      prev_(this),
      refs_(0),
      file_options_(file_opt),
      mutable_cf_options_(mutable_cf_options),
      version_number_(version_number) {}

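// Point lookup of `k` within this version. Candidate files are visited in the
// order produced by FilePicker, and the search stops as soon as get_context
// reports a definitive outcome (found, deleted, corrupt, or an unexpected
// blob index). If the key history is exhausted while merge operands are still
// pending, a final merge against a nullptr base value is performed here.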
void Version::Get(const ReadOptions& read_options, const LookupKey& k,
                  PinnableSlice* value, std::string* timestamp, Status* status,
                  MergeContext* merge_context,
                  SequenceNumber* max_covering_tombstone_seq, bool* value_found,
                  bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
                  bool* is_blob, bool do_merge) {
  Slice ikey = k.internal_key();
  Slice user_key = k.user_key();

  assert(status->ok() || status->IsMergeInProgress());

  if (key_exists != nullptr) {
    // will falsify below if not found
    *key_exists = true;
  }

  PinnedIteratorsManager pinned_iters_mgr;
  uint64_t tracing_get_id = BlockCacheTraceHelper::kReservedGetId;
  if (vset_ && vset_->block_cache_tracer_ &&
      vset_->block_cache_tracer_->is_tracing_enabled()) {
    tracing_get_id = vset_->block_cache_tracer_->NextGetId();
  }
  GetContext get_context(
      user_comparator(), merge_operator_, info_log_, db_statistics_,
      status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
      do_merge ? value : nullptr, do_merge ? timestamp : nullptr, value_found,
      merge_context, do_merge, max_covering_tombstone_seq, this->env_, seq,
      merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
      tracing_get_id);

  // Pin blocks that we read to hold merge operands
  if (merge_operator_) {
    pinned_iters_mgr.StartPinning();
  }

  FilePicker fp(
      storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
      storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
      user_comparator(), internal_comparator());
  FdWithKeyRange* f = fp.GetNextFile();

  while (f != nullptr) {
    if (*max_covering_tombstone_seq > 0) {
      // The remaining files we look at will only contain covered keys, so we
      // stop here.
      break;
    }
    if (get_context.sample()) {
      sample_file_read_inc(f->file_metadata);
    }

    bool timer_enabled =
        GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
        get_perf_context()->per_level_perf_context_enabled;
    StopWatchNano timer(env_, timer_enabled /* auto_start */);
    bool timer_enabled =
        GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
        get_perf_context()->per_level_perf_context_enabled;
    StopWatchNano timer(env_, timer_enabled /* auto_start */);
    *status = table_cache_->Get(
        read_options, *internal_comparator(), *f->file_metadata, ikey,
        &get_context, mutable_cf_options_.prefix_extractor.get(),
        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
                        fp.IsHitFileLastInLevel()),
        fp.GetCurrentLevel());
    // TODO: examine the behavior for corrupted key
    if (timer_enabled) {
      PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
                                fp.GetCurrentLevel());
    }
    if (!status->ok()) {
      return;
    }

    // report the counters before returning
    if (get_context.State() != GetContext::kNotFound &&
        get_context.State() != GetContext::kMerge &&
        db_statistics_ != nullptr) {
      get_context.ReportCounters();
    }
    switch (get_context.State()) {
      case GetContext::kNotFound:
        // Keep searching in other files
        break;
      case GetContext::kMerge:
        // TODO: update per-level perfcontext user_key_return_count for kMerge
        break;
      case GetContext::kFound:
        if (fp.GetHitFileLevel() == 0) {
          RecordTick(db_statistics_, GET_HIT_L0);
        } else if (fp.GetHitFileLevel() == 1) {
          RecordTick(db_statistics_, GET_HIT_L1);
        } else if (fp.GetHitFileLevel() >= 2) {
          RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
        }
        PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
                                  fp.GetHitFileLevel());
        return;
      case GetContext::kDeleted:
        // Use empty error message for speed
        *status = Status::NotFound();
        return;
      case GetContext::kCorrupt:
        *status = Status::Corruption("corrupted key for ", user_key);
        return;
      case GetContext::kBlobIndex:
        ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
        *status = Status::NotSupported(
            "Encounter unexpected blob index. Please open DB with "
            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
        return;
    }
    f = fp.GetNextFile();
  }
  if (db_statistics_ != nullptr) {
    get_context.ReportCounters();
  }
  if (GetContext::kMerge == get_context.State()) {
    if (!do_merge) {
      *status = Status::OK();
      return;
    }
    if (!merge_operator_) {
      *status = Status::InvalidArgument(
          "merge_operator is not properly initialized.");
      return;
    }
    // merge_operands are in saver and we hit the beginning of the key history
    // do a final merge of nullptr and operands;
    std::string* str_value = value != nullptr ? value->GetSelf() : nullptr;
    *status = MergeHelper::TimedFullMerge(
        merge_operator_, user_key, nullptr, merge_context->GetOperands(),
        str_value, info_log_, db_statistics_, env_,
        nullptr /* result_operand */, true);
    if (LIKELY(value != nullptr)) {
      value->PinSelf();
    }
  } else {
    if (key_exists != nullptr) {
      *key_exists = false;
    }
    *status = Status::NotFound(); // Use an empty error message for speed
  }
}

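// Batched counterpart of Get(): looks up every key in *range against this
// version. One GetContext is kept per key, and table_cache_->MultiGet() is
// called once per candidate file with the subset of keys that may be present
// in it. Keys that still have pending merge operands after all files have
// been visited are resolved at the end, mirroring the single-key path.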
void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
                       ReadCallback* callback, bool* is_blob) {
  PinnedIteratorsManager pinned_iters_mgr;

  // Pin blocks that we read to hold merge operands
  if (merge_operator_) {
    pinned_iters_mgr.StartPinning();
  }
  uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;

  if (vset_ && vset_->block_cache_tracer_ &&
      vset_->block_cache_tracer_->is_tracing_enabled()) {
    tracing_mget_id = vset_->block_cache_tracer_->NextGetId();
  }
  // Even though we know the batch size won't be > MAX_BATCH_SIZE,
  // use autovector in order to avoid unnecessary construction of GetContext
  // objects, which is expensive
  autovector<GetContext, 16> get_ctx;
  for (auto iter = range->begin(); iter != range->end(); ++iter) {
    assert(iter->s->ok() || iter->s->IsMergeInProgress());
    get_ctx.emplace_back(
        user_comparator(), merge_operator_, info_log_, db_statistics_,
        iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey,
        iter->value, /*timestamp*/ nullptr, nullptr, &(iter->merge_context),
        true, &iter->max_covering_tombstone_seq, this->env_, nullptr,
        merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
        tracing_mget_id);
    // MergeInProgress status, if set, has been transferred to the get_context
    // state, so we set status to ok here. From now on, the iter status will
    // be used for IO errors, and get_context state will be used for any
    // key level errors
    *(iter->s) = Status::OK();
  }
  int get_ctx_index = 0;
  for (auto iter = range->begin(); iter != range->end();
       ++iter, get_ctx_index++) {
    iter->get_context = &(get_ctx[get_ctx_index]);
  }

  MultiGetRange file_picker_range(*range, range->begin(), range->end());
  FilePickerMultiGet fp(
      &file_picker_range,
      &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
      &storage_info_.file_indexer_, user_comparator(), internal_comparator());
  FdWithKeyRange* f = fp.GetNextFile();

  while (f != nullptr) {
    MultiGetRange file_range = fp.CurrentFileRange();
    bool timer_enabled =
        GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
        get_perf_context()->per_level_perf_context_enabled;
    StopWatchNano timer(env_, timer_enabled /* auto_start */);
    Status s = table_cache_->MultiGet(
        read_options, *internal_comparator(), *f->file_metadata, &file_range,
        mutable_cf_options_.prefix_extractor.get(),
        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
                        fp.IsHitFileLastInLevel()),
        fp.GetCurrentLevel());
    // TODO: examine the behavior for corrupted key
    if (timer_enabled) {
      PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
                                fp.GetCurrentLevel());
    }
    if (!s.ok()) {
      // TODO: Set status for individual keys appropriately
      for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
        *iter->s = s;
        file_range.MarkKeyDone(iter);
      }
      return;
    }
    uint64_t batch_size = 0;
    for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
      GetContext& get_context = *iter->get_context;
      Status* status = iter->s;
      // The Status in the KeyContext takes precedence over GetContext state
      // Status may be an error if there were any IO errors in the table
      // reader. We never expect Status to be NotFound(), as that is
      // determined by get_context
      assert(!status->IsNotFound());
      if (!status->ok()) {
        file_range.MarkKeyDone(iter);
        continue;
      }

      if (get_context.sample()) {
        sample_file_read_inc(f->file_metadata);
      }
      batch_size++;
      // report the counters before returning
      if (get_context.State() != GetContext::kNotFound &&
          get_context.State() != GetContext::kMerge &&
          db_statistics_ != nullptr) {
        get_context.ReportCounters();
      } else {
        if (iter->max_covering_tombstone_seq > 0) {
          // The remaining files we look at will only contain covered keys, so
          // we stop here for this key
          file_picker_range.SkipKey(iter);
        }
      }
      switch (get_context.State()) {
        case GetContext::kNotFound:
          // Keep searching in other files
          break;
        case GetContext::kMerge:
          // TODO: update per-level perfcontext user_key_return_count for kMerge
          break;
        case GetContext::kFound:
          if (fp.GetHitFileLevel() == 0) {
            RecordTick(db_statistics_, GET_HIT_L0);
          } else if (fp.GetHitFileLevel() == 1) {
            RecordTick(db_statistics_, GET_HIT_L1);
          } else if (fp.GetHitFileLevel() >= 2) {
            RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
          }
          PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
                                    fp.GetHitFileLevel());
          file_range.MarkKeyDone(iter);
          continue;
        case GetContext::kDeleted:
          // Use empty error message for speed
          *status = Status::NotFound();
          file_range.MarkKeyDone(iter);
          continue;
        case GetContext::kCorrupt:
          *status =
              Status::Corruption("corrupted key for ", iter->lkey->user_key());
          file_range.MarkKeyDone(iter);
          continue;
        case GetContext::kBlobIndex:
          ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
          *status = Status::NotSupported(
              "Encounter unexpected blob index. Please open DB with "
              "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
          file_range.MarkKeyDone(iter);
          continue;
      }
    }
    RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
    if (file_picker_range.empty()) {
      break;
    }
    f = fp.GetNextFile();
  }

  // Process any left over keys
  for (auto iter = range->begin(); iter != range->end(); ++iter) {
    GetContext& get_context = *iter->get_context;
    Status* status = iter->s;
    Slice user_key = iter->lkey->user_key();

    if (db_statistics_ != nullptr) {
      get_context.ReportCounters();
    }
    if (GetContext::kMerge == get_context.State()) {
      if (!merge_operator_) {
        *status = Status::InvalidArgument(
            "merge_operator is not properly initialized.");
        range->MarkKeyDone(iter);
        continue;
      }
      // merge_operands are in saver and we hit the beginning of the key history
      // do a final merge of nullptr and operands;
      std::string* str_value =
          iter->value != nullptr ? iter->value->GetSelf() : nullptr;
      *status = MergeHelper::TimedFullMerge(
          merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
          str_value, info_log_, db_statistics_, env_,
          nullptr /* result_operand */, true);
      if (LIKELY(iter->value != nullptr)) {
        iter->value->PinSelf();
      }
    } else {
      range->MarkKeyDone(iter);
      *status = Status::NotFound();  // Use an empty error message for speed
    }
  }
}

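// Returns true when the filter block check can be skipped for a file at the
// given level. This requires optimize_filters_for_hits and only applies to
// the bottommost non-empty level (for L0, additionally only to its last
// file). E.g. (illustrative): with 5 non-empty levels, filters are still
// consulted on levels 0-3 and skipped only on level 4.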
bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
  // Reaching the bottom level implies misses at all upper levels, so we'll
  // skip checking the filters when we predict a hit.
  return cfd_->ioptions()->optimize_filters_for_hits &&
         (level > 0 || is_file_last_in_level) &&
         level == storage_info_.num_non_empty_levels() - 1;
}

void VersionStorageInfo::GenerateLevelFilesBrief() {
  level_files_brief_.resize(num_non_empty_levels_);
  for (int level = 0; level < num_non_empty_levels_; level++) {
    DoGenerateLevelFilesBrief(
        &level_files_brief_[level], files_[level], &arena_);
  }
}

void Version::PrepareApply(
    const MutableCFOptions& mutable_cf_options,
    bool update_stats) {
  UpdateAccumulatedStats(update_stats);
  storage_info_.UpdateNumNonEmptyLevels();
  storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
  storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
  storage_info_.GenerateFileIndexer();
  storage_info_.GenerateLevelFilesBrief();
  storage_info_.GenerateLevel0NonOverlapping();
  storage_info_.GenerateBottommostFiles();
}

bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
  if (file_meta->init_stats_from_file ||
      file_meta->compensated_file_size > 0) {
    return false;
  }
  std::shared_ptr<const TableProperties> tp;
  Status s = GetTableProperties(&tp, file_meta);
  file_meta->init_stats_from_file = true;
  if (!s.ok()) {
    ROCKS_LOG_ERROR(vset_->db_options_->info_log,
                    "Unable to load table properties for file %" PRIu64
                    " --- %s\n",
                    file_meta->fd.GetNumber(), s.ToString().c_str());
    return false;
  }
  if (tp.get() == nullptr) return false;
  file_meta->num_entries = tp->num_entries;
  file_meta->num_deletions = tp->num_deletions;
  file_meta->raw_value_size = tp->raw_value_size;
  file_meta->raw_key_size = tp->raw_key_size;

  return true;
}

void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) {
  TEST_SYNC_POINT_CALLBACK("VersionStorageInfo::UpdateAccumulatedStats",
                           nullptr);

  assert(file_meta->init_stats_from_file);
  accumulated_file_size_ += file_meta->fd.GetFileSize();
  accumulated_raw_key_size_ += file_meta->raw_key_size;
  accumulated_raw_value_size_ += file_meta->raw_value_size;
  accumulated_num_non_deletions_ +=
      file_meta->num_entries - file_meta->num_deletions;
  accumulated_num_deletions_ += file_meta->num_deletions;

  current_num_non_deletions_ +=
      file_meta->num_entries - file_meta->num_deletions;
  current_num_deletions_ += file_meta->num_deletions;
  current_num_samples_++;
}

void VersionStorageInfo::RemoveCurrentStats(FileMetaData* file_meta) {
  if (file_meta->init_stats_from_file) {
    current_num_non_deletions_ -=
        file_meta->num_entries - file_meta->num_deletions;
    current_num_deletions_ -= file_meta->num_deletions;
    current_num_samples_--;
  }
}

void Version::UpdateAccumulatedStats(bool update_stats) {
  if (update_stats) {
    // maximum number of table properties loaded from files.
    const int kMaxInitCount = 20;
    int init_count = 0;
    // here only the first kMaxInitCount files which haven't been
    // initialized from file will be updated with num_deletions.
    // The motivation here is to cap the maximum I/O per Version creation.
    // The reason for choosing files from lower-level instead of higher-level
    // is that such design is able to propagate the initialization from
    // lower-level to higher-level:  When the num_deletions of lower-level
    // files are updated, it will make the lower-level files have accurate
    // compensated_file_size, making lower-level to higher-level compaction
    // will be triggered, which creates higher-level files whose num_deletions
    // will be updated here.
    for (int level = 0;
         level < storage_info_.num_levels_ && init_count < kMaxInitCount;
         ++level) {
      for (auto* file_meta : storage_info_.files_[level]) {
        if (MaybeInitializeFileMetaData(file_meta)) {
          // each FileMeta will be initialized only once.
          storage_info_.UpdateAccumulatedStats(file_meta);
          // when option "max_open_files" is -1, all the file metadata has
          // already been read, so MaybeInitializeFileMetaData() won't incur
          // any I/O cost. "max_open_files=-1" means that the table cache passed
          // to the VersionSet and then to the ColumnFamilySet has a size of
          // TableCache::kInfiniteCapacity
          if (vset_->GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
              TableCache::kInfiniteCapacity) {
            continue;
          }
          if (++init_count >= kMaxInitCount) {
            break;
          }
        }
      }
    }
    // In case all sampled-files contain only deletion entries, then we
    // load the table-property of a file in higher-level to initialize
    // that value.
    for (int level = storage_info_.num_levels_ - 1;
         storage_info_.accumulated_raw_value_size_ == 0 && level >= 0;
         --level) {
      for (int i = static_cast<int>(storage_info_.files_[level].size()) - 1;
           storage_info_.accumulated_raw_value_size_ == 0 && i >= 0; --i) {
        if (MaybeInitializeFileMetaData(storage_info_.files_[level][i])) {
          storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]);
        }
      }
    }
  }

  storage_info_.ComputeCompensatedSizes();
}

void VersionStorageInfo::ComputeCompensatedSizes() {
  static const int kDeletionWeightOnCompaction = 2;
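  // A deletion-heavy file is charged extra "compensated" bytes so it looks
  // larger to the compaction picker:
  //   compensated_file_size = file_size +
  //       (2 * num_deletions - num_entries) * average_value_size *
  //       kDeletionWeightOnCompaction
  // E.g. (illustrative numbers only): a 10 MB file with 80k deletions out of
  // 100k entries and an average value size of 100 bytes is charged an extra
  // (160k - 100k) * 100 * 2 bytes = 12 MB, i.e. about 22 MB in total.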
  uint64_t average_value_size = GetAverageValueSize();

  // compute the compensated size
  for (int level = 0; level < num_levels_; level++) {
    for (auto* file_meta : files_[level]) {
      // Here we only compute compensated_file_size for those file_meta
      // whose compensated_file_size is uninitialized (== 0). This is true only
      // for files that have been created right now and no other thread has
      // access to them. That's why we can safely mutate compensated_file_size.
      if (file_meta->compensated_file_size == 0) {
        file_meta->compensated_file_size = file_meta->fd.GetFileSize();
        // Here we only boost the size of deletion entries of a file when
        // the number of deletion entries is greater than the number of
        // non-deletion entries in the file.  The motivation here is that in
        // a stable workload, the number of deletion entries should be roughly
        // equal to the number of non-deletion entries.  If we compensate the
        // size of deletion entries in a stable workload, the deletion
        // compensation logic might introduce unwanted effects which change the
        // shape of the LSM tree.
        if (file_meta->num_deletions * 2 >= file_meta->num_entries) {
          file_meta->compensated_file_size +=
              (file_meta->num_deletions * 2 - file_meta->num_entries) *
              average_value_size * kDeletionWeightOnCompaction;
        }
      }
    }
  }
}

int VersionStorageInfo::MaxInputLevel() const {
  if (compaction_style_ == kCompactionStyleLevel) {
    return num_levels() - 2;
  }
  return 0;
}

int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
  if (allow_ingest_behind) {
    assert(num_levels() > 1);
    return num_levels() - 2;
  }
  return num_levels() - 1;
}

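// Rough estimate of how many bytes compactions still need to rewrite to bring
// every level back under its target size. When a level overflows by `extra`
// bytes, the charge for pushing them down is roughly
//   extra * (next_level_size / level_size + 1)
// i.e. the overflowing input plus the proportional amount of next-level data
// it is expected to overlap. E.g. (illustrative numbers only): 1 GB of
// overflow on a 10 GB level sitting above a 100 GB level adds about
// 1 GB * (100 / 10 + 1) = 11 GB to the estimate.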
void VersionStorageInfo::EstimateCompactionBytesNeeded(
    const MutableCFOptions& mutable_cf_options) {
  // Only implemented for level-based compaction
  if (compaction_style_ != kCompactionStyleLevel) {
    estimated_compaction_needed_bytes_ = 0;
    return;
  }

  // Start from Level 0, if level 0 qualifies compaction to level 1,
  // we estimate the size of compaction.
  // Then we move on to the next level and see whether it qualifies compaction
  // to the next level. The size of the level is estimated as the actual size
  // on the level plus the input bytes from the previous level if there is any.
  // If it exceeds, take the exceeded bytes as compaction input and add the size
  // of the compaction size to total size.
  // We keep doing it to Level 2, 3, etc, until the last level and return the
  // accumulated bytes.

  uint64_t bytes_compact_to_next_level = 0;
  uint64_t level_size = 0;
  for (auto* f : files_[0]) {
    level_size += f->fd.GetFileSize();
  }
  // Level 0
  bool level0_compact_triggered = false;
  if (static_cast<int>(files_[0].size()) >=
          mutable_cf_options.level0_file_num_compaction_trigger ||
      level_size >= mutable_cf_options.max_bytes_for_level_base) {
    level0_compact_triggered = true;
    estimated_compaction_needed_bytes_ = level_size;
    bytes_compact_to_next_level = level_size;
  } else {
    estimated_compaction_needed_bytes_ = 0;
  }

  // Level 1 and up.
  uint64_t bytes_next_level = 0;
  for (int level = base_level(); level <= MaxInputLevel(); level++) {
    level_size = 0;
    if (bytes_next_level > 0) {
#ifndef NDEBUG
      uint64_t level_size2 = 0;
      for (auto* f : files_[level]) {
        level_size2 += f->fd.GetFileSize();
      }
      assert(level_size2 == bytes_next_level);
#endif
      level_size = bytes_next_level;
      bytes_next_level = 0;
    } else {
      for (auto* f : files_[level]) {
        level_size += f->fd.GetFileSize();
      }
    }
    if (level == base_level() && level0_compact_triggered) {
      // Add base level size to compaction if level0 compaction triggered.
      estimated_compaction_needed_bytes_ += level_size;
    }
    // Add size added by previous compaction
    level_size += bytes_compact_to_next_level;
    bytes_compact_to_next_level = 0;
    uint64_t level_target = MaxBytesForLevel(level);
    if (level_size > level_target) {
      bytes_compact_to_next_level = level_size - level_target;
      // Estimate the actual compaction fan-out ratio as size ratio between
      // the two levels.

      assert(bytes_next_level == 0);
      if (level + 1 < num_levels_) {
        for (auto* f : files_[level + 1]) {
          bytes_next_level += f->fd.GetFileSize();
        }
      }
      if (bytes_next_level > 0) {
        assert(level_size > 0);
        estimated_compaction_needed_bytes_ += static_cast<uint64_t>(
            static_cast<double>(bytes_compact_to_next_level) *
            (static_cast<double>(bytes_next_level) /
                 static_cast<double>(level_size) +
             1));
      }
    }
  }
}

namespace {
uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions,
                                 const MutableCFOptions& mutable_cf_options,
                                 const std::vector<FileMetaData*>& files) {
  uint32_t ttl_expired_files_count = 0;

  int64_t _current_time;
  auto status = ioptions.env->GetCurrentTime(&_current_time);
  if (status.ok()) {
    const uint64_t current_time = static_cast<uint64_t>(_current_time);
    for (FileMetaData* f : files) {
      if (!f->being_compacted) {
        uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
        if (oldest_ancester_time != 0 &&
            oldest_ancester_time < (current_time - mutable_cf_options.ttl)) {
          ttl_expired_files_count++;
        }
      }
    }
  }
  return ttl_expired_files_count;
}
}  // anonymous namespace

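// Recomputes the compaction score of every level. L0 (and, under universal
// compaction, the whole LSM) is scored by the number of sorted runs relative
// to level0_file_num_compaction_trigger, with additional size-based terms for
// level-style and FIFO compaction; every other level is scored as the bytes
// not currently being compacted divided by MaxBytesForLevel(). Levels are
// then ordered by decreasing score, and the TTL, periodic-compaction, and
// bottommost file markings plus the compaction-bytes estimate are refreshed.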
void VersionStorageInfo::ComputeCompactionScore(
    const ImmutableCFOptions& immutable_cf_options,
    const MutableCFOptions& mutable_cf_options) {
  for (int level = 0; level <= MaxInputLevel(); level++) {
    double score;
    if (level == 0) {
      // We treat level-0 specially by bounding the number of files
      // instead of number of bytes for two reasons:
      //
      // (1) With larger write-buffer sizes, it is nice not to do too
      // many level-0 compactions.
      //
      // (2) The files in level-0 are merged on every read and
      // therefore we wish to avoid too many files when the individual
      // file size is small (perhaps because of a small write-buffer
      // setting, or very high compression ratios, or lots of
      // overwrites/deletions).
S
sdong 已提交
2386
      int num_sorted_runs = 0;
I
Igor Canadi 已提交
2387
      uint64_t total_size = 0;
2388 2389 2390
      for (auto* f : files_[level]) {
        if (!f->being_compacted) {
          total_size += f->compensated_file_size;
S
sdong 已提交
2391
          num_sorted_runs++;
2392 2393
        }
      }
S
sdong 已提交
2394 2395 2396 2397 2398 2399 2400 2401 2402 2403 2404
      if (compaction_style_ == kCompactionStyleUniversal) {
        // For universal compaction, we use level0 score to indicate
        // compaction score for the whole DB. Adding other levels as if
        // they are L0 files.
        for (int i = 1; i < num_levels(); i++) {
          if (!files_[i].empty() && !files_[i][0]->being_compacted) {
            num_sorted_runs++;
          }
        }
      }

S
sdong 已提交
2405
      if (compaction_style_ == kCompactionStyleFIFO) {
2406 2407 2408
        score = static_cast<double>(total_size) /
                mutable_cf_options.compaction_options_fifo.max_table_files_size;
        if (mutable_cf_options.compaction_options_fifo.allow_compaction) {
2409 2410 2411 2412 2413
          score = std::max(
              static_cast<double>(num_sorted_runs) /
                  mutable_cf_options.level0_file_num_compaction_trigger,
              score);
        }
2414
        if (mutable_cf_options.ttl > 0) {
2415 2416 2417 2418
          score = std::max(
              static_cast<double>(GetExpiredTtlFilesCount(
                  immutable_cf_options, mutable_cf_options, files_[level])),
              score);
S
        }

      } else {
        score = static_cast<double>(num_sorted_runs) /
                mutable_cf_options.level0_file_num_compaction_trigger;
          // Level-based involves L0->L0 compactions that can lead to oversized
          // L0 files. Take into account size as well to avoid later giant
          // compactions to the base level.
          score = std::max(
2429 2430
              score, static_cast<double>(total_size) /
                     mutable_cf_options.max_bytes_for_level_base);
A
Andrew Kryczka 已提交
2431
        }
2432 2433 2434
      }
    } else {
      // Compute the ratio of current size to size limit.
I
Igor Canadi 已提交
2435 2436
      uint64_t level_bytes_no_compacting = 0;
      for (auto f : files_[level]) {
2437
        if (!f->being_compacted) {
I
Igor Canadi 已提交
2438 2439 2440 2441
          level_bytes_no_compacting += f->compensated_file_size;
        }
      }
      score = static_cast<double>(level_bytes_no_compacting) /
2442
              MaxBytesForLevel(level);
2443 2444 2445 2446 2447 2448 2449
    }
    compaction_level_[level] = level;
    compaction_score_[level] = score;
  }

  // sort all the levels based on their score. Higher scores get listed
  // first. Use bubble sort because the number of entries are small.
2450 2451
  for (int i = 0; i < num_levels() - 2; i++) {
    for (int j = i + 1; j < num_levels() - 1; j++) {
2452 2453 2454 2455 2456 2457 2458 2459 2460 2461
      if (compaction_score_[i] < compaction_score_[j]) {
        double score = compaction_score_[i];
        int level = compaction_level_[i];
        compaction_score_[i] = compaction_score_[j];
        compaction_level_[i] = compaction_level_[j];
        compaction_score_[j] = score;
        compaction_level_[j] = level;
      }
    }
  }
  ComputeFilesMarkedForCompaction();
  ComputeBottommostFilesMarkedForCompaction();
  if (mutable_cf_options.ttl > 0) {
    ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
  }
  if (mutable_cf_options.periodic_compaction_seconds > 0) {
    ComputeFilesMarkedForPeriodicCompaction(
        immutable_cf_options, mutable_cf_options.periodic_compaction_seconds);
  }
  EstimateCompactionBytesNeeded(mutable_cf_options);
}

void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
  files_marked_for_compaction_.clear();
  int last_qualify_level = 0;

  // Do not include files from the last level with data
  // If table properties collector suggests a file on the last level,
  // we should not move it to a new level.
  for (int level = num_levels() - 1; level >= 1; level--) {
    if (!files_[level].empty()) {
      last_qualify_level = level - 1;
      break;
    }
  }

  for (int level = 0; level <= last_qualify_level; level++) {
    for (auto* f : files_[level]) {
      if (!f->being_compacted && f->marked_for_compaction) {
        files_marked_for_compaction_.emplace_back(level, f);
      }
    }
  }
}

void VersionStorageInfo::ComputeExpiredTtlFiles(
    const ImmutableCFOptions& ioptions, const uint64_t ttl) {
  assert(ttl > 0);

  expired_ttl_files_.clear();

  int64_t _current_time;
  auto status = ioptions.env->GetCurrentTime(&_current_time);
  if (!status.ok()) {
    return;
  }
  const uint64_t current_time = static_cast<uint64_t>(_current_time);

  for (int level = 0; level < num_levels() - 1; level++) {
    for (FileMetaData* f : files_[level]) {
      if (!f->being_compacted) {
        uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
        if (oldest_ancester_time > 0 &&
            oldest_ancester_time < (current_time - ttl)) {
          expired_ttl_files_.emplace_back(level, f);
        }
      }
    }
  }
}

void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
    const ImmutableCFOptions& ioptions,
    const uint64_t periodic_compaction_seconds) {
  assert(periodic_compaction_seconds > 0);

  files_marked_for_periodic_compaction_.clear();

  int64_t temp_current_time;
  auto status = ioptions.env->GetCurrentTime(&temp_current_time);
  if (!status.ok()) {
    return;
  }
  const uint64_t current_time = static_cast<uint64_t>(temp_current_time);

  // If periodic_compaction_seconds is larger than current time, periodic
  // compaction can't possibly be triggered.
  if (periodic_compaction_seconds > current_time) {
    return;
  }

  const uint64_t allowed_time_limit =
      current_time - periodic_compaction_seconds;

  for (int level = 0; level < num_levels(); level++) {
    for (auto f : files_[level]) {
      if (!f->being_compacted) {
        // Compute a file's modification time in the following order:
        // 1. Use file_creation_time table property if it is > 0.
        // 2. Use creation_time table property if it is > 0.
        // 3. Use file's mtime metadata if the above two table properties are 0.
        // Don't consider the file at all if the modification time cannot be
        // correctly determined based on the above conditions.
        uint64_t file_modification_time = f->TryGetFileCreationTime();
        if (file_modification_time == kUnknownFileCreationTime) {
          file_modification_time = f->TryGetOldestAncesterTime();
        }
        if (file_modification_time == kUnknownOldestAncesterTime) {
          auto file_path = TableFileName(ioptions.cf_paths, f->fd.GetNumber(),
                                         f->fd.GetPathId());
          status = ioptions.env->GetFileModificationTime(
              file_path, &file_modification_time);
          if (!status.ok()) {
            ROCKS_LOG_WARN(ioptions.info_log,
                           "Can't get file modification time: %s: %s",
                           file_path.c_str(), status.ToString().c_str());
            continue;
          }
        }
        if (file_modification_time > 0 &&
            file_modification_time < allowed_time_limit) {
          files_marked_for_periodic_compaction_.emplace_back(level, f);
        }
      }
    }
  }
}

namespace {

// used to sort files by size
struct Fsize {
  size_t index;
  FileMetaData* file;
};

// Comparator that is used to sort files based on their size
// In normal mode: descending size
bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
  return (first.file->compensated_file_size >
      second.file->compensated_file_size);
}
} // anonymous namespace

void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) {
  auto* level_files = &files_[level];
  // Must not overlap
#ifndef NDEBUG
  if (level > 0 && !level_files->empty() &&
      internal_comparator_->Compare(
          (*level_files)[level_files->size() - 1]->largest, f->smallest) >= 0) {
    auto* f2 = (*level_files)[level_files->size() - 1];
    if (info_log != nullptr) {
      Error(info_log, "Adding new file %" PRIu64
                      " range (%s, %s) to level %d but overlapping "
                      "with existing file %" PRIu64 " %s %s",
            f->fd.GetNumber(), f->smallest.DebugString(true).c_str(),
            f->largest.DebugString(true).c_str(), level, f2->fd.GetNumber(),
            f2->smallest.DebugString(true).c_str(),
            f2->largest.DebugString(true).c_str());
      LogFlush(info_log);
    }
    assert(false);
  }
#else
  (void)info_log;
#endif
  f->refs++;
  level_files->push_back(f);
}

// Version::PrepareApply() needs to be called before calling this function, or
// the following functions must have been called:
// 1. UpdateNumNonEmptyLevels();
// 2. CalculateBaseBytes();
// 3. UpdateFilesByCompactionPri();
// 4. GenerateFileIndexer();
// 5. GenerateLevelFilesBrief();
// 6. GenerateLevel0NonOverlapping();
// 7. GenerateBottommostFiles();
void VersionStorageInfo::SetFinalized() {
  finalized_ = true;
#ifndef NDEBUG
  if (compaction_style_ != kCompactionStyleLevel) {
    // Not level based compaction.
    return;
  }
  assert(base_level_ < 0 || num_levels() == 1 ||
         (base_level_ >= 1 && base_level_ < num_levels()));
  // Verify all levels newer than base_level are empty except L0
  for (int level = 1; level < base_level(); level++) {
    assert(NumLevelBytes(level) == 0);
  }
  uint64_t max_bytes_prev_level = 0;
  for (int level = base_level(); level < num_levels() - 1; level++) {
    if (LevelFiles(level).size() == 0) {
      continue;
    }
    assert(MaxBytesForLevel(level) >= max_bytes_prev_level);
    max_bytes_prev_level = MaxBytesForLevel(level);
  }
  int num_empty_non_l0_level = 0;
  for (int level = 0; level < num_levels(); level++) {
    assert(LevelFiles(level).size() == 0 ||
           LevelFiles(level).size() == LevelFilesBrief(level).num_files);
    if (level > 0 && NumLevelBytes(level) > 0) {
      num_empty_non_l0_level++;
    }
    if (LevelFiles(level).size() > 0) {
      assert(level < num_non_empty_levels());
    }
  }
  assert(compaction_level_.size() > 0);
  assert(compaction_level_.size() == compaction_score_.size());
#endif
}

void VersionStorageInfo::UpdateNumNonEmptyLevels() {
  num_non_empty_levels_ = num_levels_;
  for (int i = num_levels_ - 1; i >= 0; i--) {
    if (files_[i].size() != 0) {
      return;
    } else {
      num_non_empty_levels_ = i;
    }
  }
}

namespace {
// Sort `temp` based on ratio of overlapping size over file size
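// Added note: each file is assigned the score
//   overlapping_bytes * 1024 / compensated_file_size
// and files are then ordered by ascending score, so files that overlap the
// next level the least relative to their own size are picked first.
// Illustrative numbers: a file with a 10 MB compensated size overlapping
// 5 MB of the next level scores 512; overlapping 20 MB scores 2048.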
void SortFileByOverlappingRatio(
    const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files,
    const std::vector<FileMetaData*>& next_level_files,
    std::vector<Fsize>* temp) {
  std::unordered_map<uint64_t, uint64_t> file_to_order;
  auto next_level_it = next_level_files.begin();

  for (auto& file : files) {
    uint64_t overlapping_bytes = 0;
    // Skip files in the next level whose key range ends before this file begins
    while (next_level_it != next_level_files.end() &&
           icmp.Compare((*next_level_it)->largest, file->smallest) < 0) {
      next_level_it++;
    }

    while (next_level_it != next_level_files.end() &&
           icmp.Compare((*next_level_it)->smallest, file->largest) < 0) {
      overlapping_bytes += (*next_level_it)->fd.file_size;

      if (icmp.Compare((*next_level_it)->largest, file->largest) > 0) {
        // the next-level file crosses the upper boundary of the current file.
        break;
      }
      next_level_it++;
    }

    assert(file->compensated_file_size != 0);
    file_to_order[file->fd.GetNumber()] =
        overlapping_bytes * 1024u / file->compensated_file_size;
  }

  std::sort(temp->begin(), temp->end(),
            [&](const Fsize& f1, const Fsize& f2) -> bool {
              return file_to_order[f1.file->fd.GetNumber()] <
                     file_to_order[f2.file->fd.GetNumber()];
            });
}
}  // namespace

void VersionStorageInfo::UpdateFilesByCompactionPri(
    CompactionPri compaction_pri) {
  if (compaction_style_ == kCompactionStyleNone ||
      compaction_style_ == kCompactionStyleFIFO ||
      compaction_style_ == kCompactionStyleUniversal) {
    // don't need this
    return;
  }
  // No need to sort the highest level because it is never compacted.
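  // Added note: files_by_compaction_pri_[level] ends up holding indices into
  // files_[level], ordered so that the most attractive candidate under the
  // configured CompactionPri comes first.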
  for (int level = 0; level < num_levels() - 1; level++) {
    const std::vector<FileMetaData*>& files = files_[level];
    auto& files_by_compaction_pri = files_by_compaction_pri_[level];
    assert(files_by_compaction_pri.size() == 0);

    // populate a temp vector for sorting based on size
    std::vector<Fsize> temp(files.size());
    for (size_t i = 0; i < files.size(); i++) {
      temp[i].index = i;
      temp[i].file = files[i];
    }

    // sort the top number_of_files_to_sort_ based on file size
    size_t num = VersionStorageInfo::kNumberFilesToSort;
    if (num > temp.size()) {
      num = temp.size();
    }
    switch (compaction_pri) {
      case kByCompensatedSize:
        std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
                          CompareCompensatedSizeDescending);
        break;
      case kOldestLargestSeqFirst:
        std::sort(temp.begin(), temp.end(),
                  [](const Fsize& f1, const Fsize& f2) -> bool {
                    return f1.file->fd.largest_seqno <
                           f2.file->fd.largest_seqno;
                  });
        break;
      case kOldestSmallestSeqFirst:
        std::sort(temp.begin(), temp.end(),
                  [](const Fsize& f1, const Fsize& f2) -> bool {
                    return f1.file->fd.smallest_seqno <
                           f2.file->fd.smallest_seqno;
                  });
        break;
      case kMinOverlappingRatio:
        SortFileByOverlappingRatio(*internal_comparator_, files_[level],
                                   files_[level + 1], &temp);
        break;
      default:
        assert(false);
    }
    assert(temp.size() == files.size());

    // initialize files_by_compaction_pri_
    for (size_t i = 0; i < temp.size(); i++) {
      files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
    }
    next_file_to_compact_by_size_[level] = 0;
    assert(files_[level].size() == files_by_compaction_pri_[level].size());
  }
}

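// Added note: level0_non_overlapping_ records whether the current L0 files
// happen to form a sorted run with no key-range overlap. The check sorts a
// copy of the L0 file metadata by smallest key and verifies that each file
// starts strictly after the previous one ends.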
void VersionStorageInfo::GenerateLevel0NonOverlapping() {
  assert(!finalized_);
  level0_non_overlapping_ = true;
  if (level_files_brief_.size() == 0) {
    return;
  }

  // A copy of L0 files sorted by smallest key
  std::vector<FdWithKeyRange> level0_sorted_file(
      level_files_brief_[0].files,
      level_files_brief_[0].files + level_files_brief_[0].num_files);
  std::sort(level0_sorted_file.begin(), level0_sorted_file.end(),
            [this](const FdWithKeyRange& f1, const FdWithKeyRange& f2) -> bool {
              return (internal_comparator_->Compare(f1.smallest_key,
                                                    f2.smallest_key) < 0);
            });

  for (size_t i = 1; i < level0_sorted_file.size(); ++i) {
    FdWithKeyRange& f = level0_sorted_file[i];
    FdWithKeyRange& prev = level0_sorted_file[i - 1];
    if (internal_comparator_->Compare(prev.largest_key, f.smallest_key) >= 0) {
      level0_non_overlapping_ = false;
      break;
    }
  }
}

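// Added note: a file is treated as "bottommost" here when
// RangeMightExistAfterSortedRun() reports that no older sorted run could
// still contain keys in the file's user-key range.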
void VersionStorageInfo::GenerateBottommostFiles() {
  assert(!finalized_);
  assert(bottommost_files_.empty());
  for (size_t level = 0; level < level_files_brief_.size(); ++level) {
    for (size_t file_idx = 0; file_idx < level_files_brief_[level].num_files;
         ++file_idx) {
      const FdWithKeyRange& f = level_files_brief_[level].files[file_idx];
      int l0_file_idx;
      if (level == 0) {
        l0_file_idx = static_cast<int>(file_idx);
      } else {
        l0_file_idx = -1;
      }
      Slice smallest_user_key = ExtractUserKey(f.smallest_key);
      Slice largest_user_key = ExtractUserKey(f.largest_key);
      if (!RangeMightExistAfterSortedRun(smallest_user_key, largest_user_key,
                                         static_cast<int>(level),
                                         l0_file_idx)) {
        bottommost_files_.emplace_back(static_cast<int>(level),
                                       f.file_metadata);
      }
    }
  }
}

void VersionStorageInfo::UpdateOldestSnapshot(SequenceNumber seqnum) {
  assert(seqnum >= oldest_snapshot_seqnum_);
  oldest_snapshot_seqnum_ = seqnum;
  if (oldest_snapshot_seqnum_ > bottommost_files_mark_threshold_) {
    ComputeBottommostFilesMarkedForCompaction();
  }
}

void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
  bottommost_files_marked_for_compaction_.clear();
  bottommost_files_mark_threshold_ = kMaxSequenceNumber;
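  // Added note: among files passing the checks below, those whose
  // largest_seqno is already below the oldest snapshot are marked right away;
  // for the others, the smallest qualifying largest_seqno is remembered so
  // that UpdateOldestSnapshot() can redo this computation once the snapshot
  // horizon passes that threshold.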
  for (auto& level_and_file : bottommost_files_) {
    if (!level_and_file.second->being_compacted &&
        level_and_file.second->fd.largest_seqno != 0 &&
        level_and_file.second->num_deletions > 1) {
      // largest_seqno might be nonzero due to containing the final key in an
      // earlier compaction, whose seqnum we didn't zero out. Multiple deletions
      // ensure the file really contains deleted or overwritten keys.
      if (level_and_file.second->fd.largest_seqno < oldest_snapshot_seqnum_) {
        bottommost_files_marked_for_compaction_.push_back(level_and_file);
      } else {
        bottommost_files_mark_threshold_ =
            std::min(bottommost_files_mark_threshold_,
                     level_and_file.second->fd.largest_seqno);
      }
    }
  }
}

void Version::Ref() {
  ++refs_;
}

bool Version::Unref() {
  assert(refs_ >= 1);
  --refs_;
  if (refs_ == 0) {
    delete this;
    return true;
  }
  return false;
}

bool VersionStorageInfo::OverlapInLevel(int level,
                                        const Slice* smallest_user_key,
                                        const Slice* largest_user_key) {
  if (level >= num_non_empty_levels_) {
    // empty level, no overlap
    return false;
  }
  return SomeFileOverlapsRange(*internal_comparator_, (level > 0),
                               level_files_brief_[level], smallest_user_key,
                               largest_user_key);
}

// Store in "*inputs" all files in "level" that overlap [begin,end]
// If hint_index is specified, then it points to a file in the
// overlapping range.
// The file_index returns a pointer to any file in an overlapping range.
void VersionStorageInfo::GetOverlappingInputs(
    int level, const InternalKey* begin, const InternalKey* end,
    std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
    bool expand_range, InternalKey** next_smallest) const {
  if (level >= num_non_empty_levels_) {
    // this level is empty, no overlapping inputs
    return;
  }

  inputs->clear();
  if (file_index) {
    *file_index = -1;
  }
  const Comparator* user_cmp = user_comparator_;
  if (level > 0) {
    GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index,
                                          file_index, false, next_smallest);
    return;
  }

  if (next_smallest) {
    // next_smallest key only makes sense for non-level 0, where files are
    // non-overlapping
    *next_smallest = nullptr;
  }

  Slice user_begin, user_end;
  if (begin != nullptr) {
    user_begin = begin->user_key();
  }
  if (end != nullptr) {
    user_end = end->user_key();
  }

  // index stores the indices of the files that still need to be checked.
  std::list<size_t> index;
  for (size_t i = 0; i < level_files_brief_[level].num_files; i++) {
    index.emplace_back(i);
  }
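
  // Added note: the loop below repeatedly scans the remaining L0 files; when
  // expand_range is true it grows [user_begin, user_end] to cover every
  // overlapping file found, and it stops once a full pass over the remaining
  // files adds nothing new.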

  while (!index.empty()) {
    bool found_overlapping_file = false;
    auto iter = index.begin();
    while (iter != index.end()) {
      FdWithKeyRange* f = &(level_files_brief_[level].files[*iter]);
      const Slice file_start = ExtractUserKey(f->smallest_key);
      const Slice file_limit = ExtractUserKey(f->largest_key);
      if (begin != nullptr &&
          user_cmp->CompareWithoutTimestamp(file_limit, user_begin) < 0) {
        // "f" is completely before specified range; skip it
        iter++;
      } else if (end != nullptr &&
                 user_cmp->CompareWithoutTimestamp(file_start, user_end) > 0) {
        // "f" is completely after specified range; skip it
        iter++;
      } else {
        // if overlap
        inputs->emplace_back(files_[level][*iter]);
        found_overlapping_file = true;
        // record the first file index.
        if (file_index && *file_index == -1) {
          *file_index = static_cast<int>(*iter);
        }
        // this file overlaps, so erase it to avoid checking it again.
        iter = index.erase(iter);
        if (expand_range) {
          if (begin != nullptr &&
              user_cmp->CompareWithoutTimestamp(file_start, user_begin) < 0) {
            user_begin = file_start;
          }
          if (end != nullptr &&
              user_cmp->CompareWithoutTimestamp(file_limit, user_end) > 0) {
            user_end = file_limit;
          }
        }
      }
    }
    // if none of the remaining files overlap, stop
    if (!found_overlapping_file) {
      break;
    }
  }
}

// Store in "*inputs" the files in "level" that are within range [begin,end]
// Guarantee a "clean cut" boundary between the files in inputs
// and the surrounding files and the maximum number of files.
// This will ensure that no parts of a key are lost during compaction.
// If hint_index is specified, then it points to a file in the range.
// The file_index returns a pointer to any file in an overlapping range.
void VersionStorageInfo::GetCleanInputsWithinInterval(
    int level, const InternalKey* begin, const InternalKey* end,
    std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
  inputs->clear();
  if (file_index) {
    *file_index = -1;
  }
  if (level >= num_non_empty_levels_ || level == 0 ||
      level_files_brief_[level].num_files == 0) {
    // this level is empty, no inputs within range
    // also don't support clean input interval within L0
    return;
  }

  GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs,
                                        hint_index, file_index,
                                        true /* within_interval */);
}

// Store in "*inputs" all files in "level" that overlap [begin,end]
// Employ binary search to find at least one file that overlaps the
// specified range. From that file, iterate backwards and
// forwards to find all overlapping files.
// if within_interval is set, then only store the maximum clean inputs
// within range [begin, end]. "clean" means there is a boundary
// between the files in "*inputs" and the surrounding files
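// Added example (illustrative): for level files [a,c] [e,g] [i,k] and the
// range [d,j], the overlap mode returns {[e,g], [i,k]}, while the
// within_interval ("clean cut") mode returns only {[e,g]}, because [i,k]
// extends past j.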
void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
    int level, const InternalKey* begin, const InternalKey* end,
    std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
    bool within_interval, InternalKey** next_smallest) const {
  assert(level > 0);

  auto user_cmp = user_comparator_;
  const FdWithKeyRange* files = level_files_brief_[level].files;
  const int num_files = static_cast<int>(level_files_brief_[level].num_files);

  // begin to use binary search to find lower bound
  // and upper bound.
  int start_index = 0;
  int end_index = num_files;

  if (begin != nullptr) {
    // if within_interval is true, compare against the file's smallest key so
    // that std::lower_bound finds non-overlapping ranges.
    auto cmp = [&user_cmp, &within_interval](const FdWithKeyRange& f,
                                             const InternalKey* k) {
      auto& file_key = within_interval ? f.file_metadata->smallest
                                       : f.file_metadata->largest;
      return sstableKeyCompare(user_cmp, file_key, *k) < 0;
    };

    start_index = static_cast<int>(
        std::lower_bound(files,
                         files + (hint_index == -1 ? num_files : hint_index),
                         begin, cmp) -
        files);

    if (start_index > 0 && within_interval) {
      bool is_overlapping = true;
      while (is_overlapping && start_index < num_files) {
        auto& pre_limit = files[start_index - 1].file_metadata->largest;
        auto& cur_start = files[start_index].file_metadata->smallest;
        is_overlapping = sstableKeyCompare(user_cmp, pre_limit, cur_start) == 0;
        start_index += is_overlapping;
      }
    }
  }

  if (end != nullptr) {
    // if within_interval is true, compare against the file's largest key so
    // that std::upper_bound finds non-overlapping ranges.
    auto cmp = [&user_cmp, &within_interval](const InternalKey* k,
                                             const FdWithKeyRange& f) {
      auto& file_key = within_interval ? f.file_metadata->largest
                                       : f.file_metadata->smallest;
      return sstableKeyCompare(user_cmp, *k, file_key) < 0;
    };

    end_index = static_cast<int>(
        std::upper_bound(files + start_index, files + num_files, end, cmp) -
        files);

    if (end_index < num_files && within_interval) {
      bool is_overlapping = true;
      while (is_overlapping && end_index > start_index) {
        auto& next_start = files[end_index].file_metadata->smallest;
        auto& cur_limit = files[end_index - 1].file_metadata->largest;
        is_overlapping =
            sstableKeyCompare(user_cmp, cur_limit, next_start) == 0;
        end_index -= is_overlapping;
      }
    }
  }

  assert(start_index <= end_index);

  // If there were no overlapping files, return immediately.
  if (start_index == end_index) {
    if (next_smallest) {
      *next_smallest = nullptr;
    }
    return;
  }

  assert(start_index < end_index);

  // returns the index where an overlap is found
  if (file_index) {
    *file_index = start_index;
  }

  // insert overlapping files into vector
  for (int i = start_index; i < end_index; i++) {
    inputs->push_back(files_[level][i]);
  }

  if (next_smallest != nullptr) {
    // Provide the next key outside the range covered by inputs
    if (end_index < static_cast<int>(files_[level].size())) {
      **next_smallest = files_[level][end_index]->smallest;
    } else {
      *next_smallest = nullptr;
    }
  }
}

uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
  assert(level >= 0);
  assert(level < num_levels());
  return TotalFileSize(files_[level]);
}

const char* VersionStorageInfo::LevelSummary(
    LevelSummaryStorage* scratch) const {
  int len = 0;
  if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
    assert(base_level_ < static_cast<int>(level_max_bytes_.size()));
    if (level_multiplier_ != 0.0) {
      len = snprintf(
          scratch->buffer, sizeof(scratch->buffer),
          "base level %d level multiplier %.2f max bytes base %" PRIu64 " ",
          base_level_, level_multiplier_, level_max_bytes_[base_level_]);
    }
  }
  len +=
      snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "files[");
  for (int i = 0; i < num_levels(); i++) {
    int sz = sizeof(scratch->buffer) - len;
    int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size()));
    if (ret < 0 || ret >= sz) break;
    len += ret;
  }
  if (len > 0) {
    // overwrite the last space
    --len;
  }
  len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
                  "] max score %.2f", compaction_score_[0]);

  if (!files_marked_for_compaction_.empty()) {
    snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
             " (%" ROCKSDB_PRIszt " files need compaction)",
             files_marked_for_compaction_.size());
  }

  return scratch->buffer;
}

const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch,
                                                 int level) const {
  int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
  for (const auto& f : files_[level]) {
    int sz = sizeof(scratch->buffer) - len;
    char sztxt[16];
    AppendHumanBytes(f->fd.GetFileSize(), sztxt, sizeof(sztxt));
    int ret = snprintf(scratch->buffer + len, sz,
                       "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
                       f->fd.GetNumber(), f->fd.smallest_seqno, sztxt,
                       static_cast<int>(f->being_compacted));
    if (ret < 0 || ret >= sz)
      break;
    len += ret;
  }
  // overwrite the last space (only if files_[level].size() is non-zero)
  if (files_[level].size() && len > 0) {
    --len;
  }
  snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
  return scratch->buffer;
}

int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
  uint64_t result = 0;
  std::vector<FileMetaData*> overlaps;
  for (int level = 1; level < num_levels() - 1; level++) {
    for (const auto& f : files_[level]) {
      GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps);
      const uint64_t sum = TotalFileSize(overlaps);
      if (sum > result) {
        result = sum;
      }
    }
  }
  return result;
}

uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
  // Note: the result for level zero is not really used since we set
  // the level-0 compaction threshold based on number of files.
  assert(level >= 0);
  assert(level < static_cast<int>(level_max_bytes_.size()));
  return level_max_bytes_[level];
}

void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
                                            const MutableCFOptions& options) {
  // Special logic to set number of sorted runs.
  // It is to match the previous behavior when all files are in L0.
  int num_l0_count = static_cast<int>(files_[0].size());
  if (compaction_style_ == kCompactionStyleUniversal) {
    // For universal compaction, we use level0 score to indicate
    // compaction score for the whole DB. Adding other levels as if
    // they are L0 files.
    for (int i = 1; i < num_levels(); i++) {
      if (!files_[i].empty()) {
        num_l0_count++;
      }
    }
  }
  set_l0_delay_trigger_count(num_l0_count);

  level_max_bytes_.resize(ioptions.num_levels);
  if (!ioptions.level_compaction_dynamic_level_bytes) {
    base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;

    // Calculate for static bytes base case
    for (int i = 0; i < ioptions.num_levels; ++i) {
      if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
        level_max_bytes_[i] = options.max_bytes_for_level_base;
      } else if (i > 1) {
        level_max_bytes_[i] = MultiplyCheckOverflow(
            MultiplyCheckOverflow(level_max_bytes_[i - 1],
                                  options.max_bytes_for_level_multiplier),
            options.MaxBytesMultiplerAdditional(i - 1));
      } else {
        level_max_bytes_[i] = options.max_bytes_for_level_base;
      }
    }
  } else {
    uint64_t max_level_size = 0;

    int first_non_empty_level = -1;
    // Find size of non-L0 level of most data.
    // Cannot use the size of the last level because it can be empty or less
    // than previous levels after compaction.
    for (int i = 1; i < num_levels_; i++) {
      uint64_t total_size = 0;
      for (const auto& f : files_[i]) {
        total_size += f->fd.GetFileSize();
      }
      if (total_size > 0 && first_non_empty_level == -1) {
        first_non_empty_level = i;
      }
      if (total_size > max_level_size) {
        max_level_size = total_size;
      }
    }

    // Prefill every level's max bytes to disallow compaction from there.
    for (int i = 0; i < num_levels_; i++) {
      level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
    }

    if (max_level_size == 0) {
      // No data for L1 and up. L0 compacts to last level directly.
      // No compaction from L1+ needs to be scheduled.
      base_level_ = num_levels_ - 1;
    } else {
      uint64_t l0_size = 0;
      for (const auto& f : files_[0]) {
        l0_size += f->fd.GetFileSize();
      }

      uint64_t base_bytes_max =
          std::max(options.max_bytes_for_level_base, l0_size);
      uint64_t base_bytes_min = static_cast<uint64_t>(
          base_bytes_max / options.max_bytes_for_level_multiplier);

      // Try whether we can make last level's target size to be max_level_size
      uint64_t cur_level_size = max_level_size;
      for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
        // Round up after dividing
        cur_level_size = static_cast<uint64_t>(
            cur_level_size / options.max_bytes_for_level_multiplier);
      }

      // Calculate base level and its size.
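      // Added example (illustrative numbers): with num_levels = 4,
      // max_bytes_for_level_base = 256MB, multiplier = 10, a tiny L0, data
      // only in L2/L3, and max_level_size = 10GB, the loop above leaves
      // cur_level_size = 1GB. That exceeds base_bytes_max (256MB), so the
      // code below walks base_level_ down to 1 with base_level_size = 100MB,
      // and the final targets become roughly L1 = 256MB, L2 = 1GB, L3 = 10GB.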
      uint64_t base_level_size;
      if (cur_level_size <= base_bytes_min) {
        // Case 1. If we make target size of last level to be max_level_size,
        // target size of the first non-empty level would be smaller than
        // base_bytes_min. We set it be base_bytes_min.
        base_level_size = base_bytes_min + 1U;
        base_level_ = first_non_empty_level;
        ROCKS_LOG_INFO(ioptions.info_log,
                       "More existing levels in DB than needed. "
                       "max_bytes_for_level_multiplier may not be guaranteed.");
      } else {
        // Find base level (where L0 data is compacted to).
        base_level_ = first_non_empty_level;
        while (base_level_ > 1 && cur_level_size > base_bytes_max) {
          --base_level_;
          cur_level_size = static_cast<uint64_t>(
              cur_level_size / options.max_bytes_for_level_multiplier);
        }
        if (cur_level_size > base_bytes_max) {
          // Even L1 will be too large
          assert(base_level_ == 1);
          base_level_size = base_bytes_max;
        } else {
          base_level_size = cur_level_size;
        }
      }

      level_multiplier_ = options.max_bytes_for_level_multiplier;
      assert(base_level_size > 0);
      if (l0_size > base_level_size &&
          (l0_size > options.max_bytes_for_level_base ||
           static_cast<int>(files_[0].size() / 2) >=
               options.level0_file_num_compaction_trigger)) {
        // We adjust the base level according to actual L0 size, and adjust
        // the level multiplier accordingly, when:
        //   1. the L0 size is larger than level size base, or
        //   2. number of L0 files reaches twice the L0->L1 compaction trigger
        // We don't do this otherwise to keep the LSM-tree structure stable
        // unless the L0 compaction is backlogged.
        base_level_size = l0_size;
        if (base_level_ == num_levels_ - 1) {
          level_multiplier_ = 1.0;
        } else {
          level_multiplier_ = std::pow(
              static_cast<double>(max_level_size) /
                  static_cast<double>(base_level_size),
              1.0 / static_cast<double>(num_levels_ - base_level_ - 1));
        }
      }

      uint64_t level_size = base_level_size;
      for (int i = base_level_; i < num_levels_; i++) {
        if (i > base_level_) {
          level_size = MultiplyCheckOverflow(level_size, level_multiplier_);
        }
        // Don't set any level below base_bytes_max. Otherwise, the LSM can
        // assume an hourglass shape where L1+ sizes are smaller than L0. This
        // causes compaction scoring, which depends on level sizes, to favor L1+
        // at the expense of L0, which may fill up and stall.
        level_max_bytes_[i] = std::max(level_size, base_bytes_max);
      }
    }
  }
}

uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
  // Estimate the live data size by adding up the size of the last level for all
  // key ranges. Note: Estimate depends on the ordering of files in level 0
  // because files in level 0 can be overlapping.
  uint64_t size = 0;

  auto ikey_lt = [this](InternalKey* x, InternalKey* y) {
    return internal_comparator_->Compare(*x, *y) < 0;
  };
  // (Ordered) map of largest keys in non-overlapping files
  std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt);

  for (int l = num_levels_ - 1; l >= 0; l--) {
    bool found_end = false;
    for (auto file : files_[l]) {
      // Find the first file where the largest key is larger than the smallest
      // key of the current file. If this file does not overlap with the
      // current file, none of the files in the map does. If there is
      // no potential overlap, we can safely insert the rest of this level
      // (if the level is not 0) into the map without checking again because
      // the elements in the level are sorted and non-overlapping.
      auto lb = (found_end && l != 0) ?
        ranges.end() : ranges.lower_bound(&file->smallest);
      found_end = (lb == ranges.end());
      if (found_end || internal_comparator_->Compare(
            file->largest, (*lb).second->smallest) < 0) {
          ranges.emplace_hint(lb, &file->largest, file);
          size += file->fd.file_size;
      }
    }
  }
  return size;
}

bool VersionStorageInfo::RangeMightExistAfterSortedRun(
    const Slice& smallest_user_key, const Slice& largest_user_key,
    int last_level, int last_l0_idx) {
  assert((last_l0_idx != -1) == (last_level == 0));
  // TODO(ajkr): this preserves earlier behavior where we considered an L0 file
  // bottommost only if it's the oldest L0 file and there are no files on older
  // levels. It'd be better to consider it bottommost if there's no overlap in
  // older levels/files.
  if (last_level == 0 &&
      last_l0_idx != static_cast<int>(LevelFiles(0).size() - 1)) {
    return true;
  }

  // Checks whether there are files living beyond the `last_level`. If lower
  // levels have files, it checks for overlap between [`smallest_key`,
  // `largest_key`] and those files. Bottom-level optimizations can be made if
  // there are no files in lower levels or if there is no overlap with the files
  // in the lower levels.
  for (int level = last_level + 1; level < num_levels(); level++) {
    // The range is not in the bottommost level if there are files in lower
    // levels when the `last_level` is 0 or if there are files in lower levels
    // which overlap with [`smallest_key`, `largest_key`].
    if (files_[level].size() > 0 &&
        (last_level == 0 ||
         OverlapInLevel(level, &smallest_user_key, &largest_user_key))) {
      return true;
    }
  }
  return false;
}

void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
  for (int level = 0; level < storage_info_.num_levels(); level++) {
    const std::vector<FileMetaData*>& files = storage_info_.files_[level];
    for (const auto& file : files) {
      live->push_back(file->fd);
    }
  }
}

std::string Version::DebugString(bool hex, bool print_stats) const {
  std::string r;
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    // E.g.,
    //   --- level 1 ---
    //   17:123[1 .. 124]['a' .. 'd']
    //   20:43[124 .. 128]['e' .. 'g']
    //
    // if print_stats=true:
    //   17:123[1 .. 124]['a' .. 'd'](4096)
    r.append("--- level ");
    AppendNumberTo(&r, level);
    r.append(" --- version# ");
    AppendNumberTo(&r, version_number_);
    r.append(" ---\n");
    const std::vector<FileMetaData*>& files = storage_info_.files_[level];
    for (size_t i = 0; i < files.size(); i++) {
      r.push_back(' ');
      AppendNumberTo(&r, files[i]->fd.GetNumber());
      r.push_back(':');
      AppendNumberTo(&r, files[i]->fd.GetFileSize());
      r.append("[");
      AppendNumberTo(&r, files[i]->fd.smallest_seqno);
      r.append(" .. ");
      AppendNumberTo(&r, files[i]->fd.largest_seqno);
      r.append("]");
      r.append("[");
      r.append(files[i]->smallest.DebugString(hex));
      r.append(" .. ");
      r.append(files[i]->largest.DebugString(hex));
      r.append("]");
      if (files[i]->oldest_blob_file_number != kInvalidBlobFileNumber) {
        r.append(" blob_file:");
        AppendNumberTo(&r, files[i]->oldest_blob_file_number);
      }
      if (print_stats) {
        r.append("(");
        r.append(ToString(
            files[i]->stats.num_reads_sampled.load(std::memory_order_relaxed)));
        r.append(")");
      }
      r.append("\n");
    }
  }
  return r;
}

// this is used to batch writes to the manifest file
struct VersionSet::ManifestWriter {
  Status status;
  bool done;
  InstrumentedCondVar cv;
  ColumnFamilyData* cfd;
  const MutableCFOptions mutable_cf_options;
  const autovector<VersionEdit*>& edit_list;

  explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
                          const MutableCFOptions& cf_options,
                          const autovector<VersionEdit*>& e)
      : done(false),
        cv(mu),
        cfd(_cfd),
        mutable_cf_options(cf_options),
        edit_list(e) {}
};

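// Added note: version edits in an atomic group carry remaining_entries_, the
// number of edits still expected after the current one. The buffer sizes
// replay_buffer_ from the first edit it sees, and treats a size mismatch, or
// a normal edit arriving mid-group, as corruption.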
Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) {
  assert(edit);
  if (edit->is_in_atomic_group_) {
    TEST_SYNC_POINT("AtomicGroupReadBuffer::AddEdit:AtomicGroup");
    if (replay_buffer_.empty()) {
      replay_buffer_.resize(edit->remaining_entries_ + 1);
      TEST_SYNC_POINT_CALLBACK(
          "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", edit);
    }
    read_edits_in_atomic_group_++;
    if (read_edits_in_atomic_group_ + edit->remaining_entries_ !=
        static_cast<uint32_t>(replay_buffer_.size())) {
      TEST_SYNC_POINT_CALLBACK(
          "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize", edit);
      return Status::Corruption("corrupted atomic group");
    }
    replay_buffer_[read_edits_in_atomic_group_ - 1] = *edit;
    if (read_edits_in_atomic_group_ == replay_buffer_.size()) {
      TEST_SYNC_POINT_CALLBACK(
          "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", edit);
      return Status::OK();
    }
    return Status::OK();
  }

  // A normal edit.
  if (!replay_buffer().empty()) {
    TEST_SYNC_POINT_CALLBACK(
        "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits", edit);
    return Status::Corruption("corrupted atomic group");
  }
  return Status::OK();
}

bool AtomicGroupReadBuffer::IsFull() const {
  return read_edits_in_atomic_group_ == replay_buffer_.size();
}

bool AtomicGroupReadBuffer::IsEmpty() const { return replay_buffer_.empty(); }

void AtomicGroupReadBuffer::Clear() {
  read_edits_in_atomic_group_ = 0;
  replay_buffer_.clear();
}

VersionSet::VersionSet(const std::string& dbname,
                       const ImmutableDBOptions* _db_options,
                       const FileOptions& storage_options, Cache* table_cache,
                       WriteBufferManager* write_buffer_manager,
                       WriteController* write_controller,
                       BlockCacheTracer* const block_cache_tracer)
    : column_family_set_(new ColumnFamilySet(
          dbname, _db_options, storage_options, table_cache,
          write_buffer_manager, write_controller, block_cache_tracer)),
      env_(_db_options->env),
      fs_(_db_options->fs.get()),
      dbname_(dbname),
      db_options_(_db_options),
      next_file_number_(2),
      manifest_file_number_(0),  // Filled by Recover()
      options_file_number_(0),
      pending_manifest_file_number_(0),
      last_sequence_(0),
      last_allocated_sequence_(0),
      last_published_sequence_(0),
      prev_log_number_(0),
      current_version_number_(0),
      manifest_file_size_(0),
      file_options_(storage_options),
      block_cache_tracer_(block_cache_tracer) {}

VersionSet::~VersionSet() {
  // we need to delete column_family_set_ because its destructor depends on
  // VersionSet
  Cache* table_cache = column_family_set_->get_table_cache();
  column_family_set_.reset();
  for (auto& file : obsolete_files_) {
    if (file.metadata->table_reader_handle) {
      table_cache->Release(file.metadata->table_reader_handle);
      TableCache::Evict(table_cache, file.metadata->fd.GetNumber());
    }
    file.DeleteMetadata();
  }
  obsolete_files_.clear();
}

void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
                               Version* v) {
  // compute new compaction score
  v->storage_info()->ComputeCompactionScore(
      *column_family_data->ioptions(),
      *column_family_data->GetLatestMutableCFOptions());

  // Mark v finalized
  v->storage_info_.SetFinalized();

  // Make "v" current
  assert(v->refs_ == 0);
  Version* current = column_family_data->current();
  assert(v != current);
  if (current != nullptr) {
    assert(current->refs_ > 0);
    current->Unref();
  }
  column_family_data->SetCurrent(v);
  v->Ref();

  // Append to linked list
  v->prev_ = column_family_data->dummy_versions()->prev_;
  v->next_ = column_family_data->dummy_versions();
  v->prev_->next_ = v;
  v->next_->prev_ = v;
}

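// Added summary: this routine group-commits the queued manifest writers. It
// batches their edits (column family add/drop is written alone), builds the
// new Version for each affected column family, appends the edits to the
// MANIFEST (rolling it and rewriting CURRENT when needed), and finally
// installs the new versions, or rolls everything back on error.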
Status VersionSet::ProcessManifestWrites(
    std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
    FSDirectory* db_directory, bool new_descriptor_log,
    const ColumnFamilyOptions* new_cf_options) {
  assert(!writers.empty());
  ManifestWriter& first_writer = writers.front();
  ManifestWriter* last_writer = &first_writer;

  assert(!manifest_writers_.empty());
  assert(manifest_writers_.front() == &first_writer);

  autovector<VersionEdit*> batch_edits;
  autovector<Version*> versions;
  autovector<const MutableCFOptions*> mutable_cf_options_ptrs;
  std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards;

  if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
    // No group commits for column family add or drop
    LogAndApplyCFHelper(first_writer.edit_list.front());
    batch_edits.push_back(first_writer.edit_list.front());
  } else {
    auto it = manifest_writers_.cbegin();
    size_t group_start = std::numeric_limits<size_t>::max();
    while (it != manifest_writers_.cend()) {
      if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) {
        // no group commits for column family add or drop
        break;
      }
      last_writer = *(it++);
      assert(last_writer != nullptr);
      assert(last_writer->cfd != nullptr);
      if (last_writer->cfd->IsDropped()) {
        // If we detect a dropped CF at this point, and the corresponding
        // version edits belong to an atomic group, then we need to find out
        // the preceding version edits in the same atomic group, and update
        // their `remaining_entries_` member variable because we are NOT going
        // to write the version edits' of dropped CF to the MANIFEST. If we
        // don't update, then Recover can report corrupted atomic group because
        // the `remaining_entries_` do not match.
        if (!batch_edits.empty()) {
          if (batch_edits.back()->is_in_atomic_group_ &&
              batch_edits.back()->remaining_entries_ > 0) {
            assert(group_start < batch_edits.size());
            const auto& edit_list = last_writer->edit_list;
            size_t k = 0;
            while (k < edit_list.size()) {
              if (!edit_list[k]->is_in_atomic_group_) {
                break;
              } else if (edit_list[k]->remaining_entries_ == 0) {
                ++k;
                break;
              }
              ++k;
            }
            for (auto i = group_start; i < batch_edits.size(); ++i) {
              assert(static_cast<uint32_t>(k) <=
                     batch_edits.back()->remaining_entries_);
              batch_edits[i]->remaining_entries_ -= static_cast<uint32_t>(k);
            }
          }
        }
        continue;
      }
      // We do a linear search on versions because versions is small.
      // TODO(yanqin) maybe consider unordered_map
      Version* version = nullptr;
      VersionBuilder* builder = nullptr;
      for (int i = 0; i != static_cast<int>(versions.size()); ++i) {
        uint32_t cf_id = last_writer->cfd->GetID();
        if (versions[i]->cfd()->GetID() == cf_id) {
          version = versions[i];
          assert(!builder_guards.empty() &&
                 builder_guards.size() == versions.size());
          builder = builder_guards[i]->version_builder();
          TEST_SYNC_POINT_CALLBACK(
              "VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id);
          break;
        }
      }
      if (version == nullptr) {
        version = new Version(last_writer->cfd, this, file_options_,
                              last_writer->mutable_cf_options,
                              current_version_number_++);
        versions.push_back(version);
        mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
        builder_guards.emplace_back(
            new BaseReferencedVersionBuilder(last_writer->cfd));
        builder = builder_guards.back()->version_builder();
      }
      assert(builder != nullptr);  // make checker happy
      for (const auto& e : last_writer->edit_list) {
        if (e->is_in_atomic_group_) {
          if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ ||
              (batch_edits.back()->is_in_atomic_group_ &&
               batch_edits.back()->remaining_entries_ == 0)) {
            group_start = batch_edits.size();
          }
        } else if (group_start != std::numeric_limits<size_t>::max()) {
          group_start = std::numeric_limits<size_t>::max();
        }
        Status s = LogAndApplyHelper(last_writer->cfd, builder, e, mu);
        if (!s.ok()) {
          // free up the allocated memory
          for (auto v : versions) {
            delete v;
          }
          return s;
        }
        batch_edits.push_back(e);
      }
    }
    for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
      assert(!builder_guards.empty() &&
             builder_guards.size() == versions.size());
      auto* builder = builder_guards[i]->version_builder();
      Status s = builder->SaveTo(versions[i]->storage_info());
      if (!s.ok()) {
        // free up the allocated memory
        for (auto v : versions) {
          delete v;
        }
        return s;
      }
    }
  }

#ifndef NDEBUG
  // Verify that version edits of atomic groups have correct
  // remaining_entries_.
  size_t k = 0;
  while (k < batch_edits.size()) {
    while (k < batch_edits.size() && !batch_edits[k]->is_in_atomic_group_) {
      ++k;
    }
    if (k == batch_edits.size()) {
      break;
    }
    size_t i = k;
    while (i < batch_edits.size()) {
      if (!batch_edits[i]->is_in_atomic_group_) {
        break;
      }
      assert(i - k + batch_edits[i]->remaining_entries_ ==
             batch_edits[k]->remaining_entries_);
      if (batch_edits[i]->remaining_entries_ == 0) {
        ++i;
        break;
      }
      ++i;
    }
    assert(batch_edits[i - 1]->is_in_atomic_group_);
    assert(0 == batch_edits[i - 1]->remaining_entries_);
    std::vector<VersionEdit*> tmp;
    for (size_t j = k; j != i; ++j) {
      tmp.emplace_back(batch_edits[j]);
    }
    TEST_SYNC_POINT_CALLBACK(
        "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", &tmp);
    k = i;
  }
#endif  // NDEBUG

  uint64_t new_manifest_file_size = 0;
  Status s;

  assert(pending_manifest_file_number_ == 0);
  if (!descriptor_log_ ||
      manifest_file_size_ > db_options_->max_manifest_file_size) {
    TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest");
    new_descriptor_log = true;
  } else {
    pending_manifest_file_number_ = manifest_file_number_;
  }

  // Local cached copy of state variable(s). WriteCurrentStateToManifest()
  // reads its content after releasing db mutex to avoid race with
  // SwitchMemtable().
  std::unordered_map<uint32_t, MutableCFState> curr_state;
  if (new_descriptor_log) {
    pending_manifest_file_number_ = NewFileNumber();
    batch_edits.back()->SetNextFile(next_file_number_.load());

    // if we are writing out new snapshot make sure to persist max column
    // family.
    if (column_family_set_->GetMaxColumnFamily() > 0) {
      first_writer.edit_list.front()->SetMaxColumnFamily(
          column_family_set_->GetMaxColumnFamily());
    }
    for (const auto* cfd : *column_family_set_) {
      assert(curr_state.find(cfd->GetID()) == curr_state.end());
      curr_state[cfd->GetID()] = {cfd->GetLogNumber()};
    }
  }

  {
    FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
    mu->Unlock();

    TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
    if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
      for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
        assert(!builder_guards.empty() &&
               builder_guards.size() == versions.size());
        assert(!mutable_cf_options_ptrs.empty() &&
               builder_guards.size() == versions.size());
        ColumnFamilyData* cfd = versions[i]->cfd_;
        s = builder_guards[i]->version_builder()->LoadTableHandlers(
            cfd->internal_stats(), cfd->ioptions()->optimize_filters_for_hits,
            true /* prefetch_index_and_filter_in_cache */,
            false /* is_initial_load */,
            mutable_cf_options_ptrs[i]->prefix_extractor.get());
        if (!s.ok()) {
          if (db_options_->paranoid_checks) {
            break;
          }
          s = Status::OK();
        }
      }
    }

    if (s.ok() && new_descriptor_log) {
      // This is fine because everything inside of this block is serialized --
      // only one thread can be here at the same time
      // create new manifest file
      ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
                     pending_manifest_file_number_);
      std::string descriptor_fname =
          DescriptorFileName(dbname_, pending_manifest_file_number_);
      std::unique_ptr<FSWritableFile> descriptor_file;
      s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
                          opt_file_opts);
      if (s.ok()) {
        descriptor_file->SetPreallocationBlockSize(
            db_options_->manifest_preallocation_size);

        std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
            std::move(descriptor_file), descriptor_fname, opt_file_opts, env_,
            nullptr, db_options_->listeners));
        descriptor_log_.reset(
            new log::Writer(std::move(file_writer), 0, false));
        s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get());
      }
    }

    if (s.ok()) {
      if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
        for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
          versions[i]->PrepareApply(*mutable_cf_options_ptrs[i], true);
        }
      }

      // Write new records to MANIFEST log
#ifndef NDEBUG
      size_t idx = 0;
#endif
      for (auto& e : batch_edits) {
        std::string record;
        if (!e->EncodeTo(&record)) {
          s = Status::Corruption("Unable to encode VersionEdit:" +
                                 e->DebugString(true));
          break;
        }
        TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
                         rocksdb_kill_odds * REDUCE_ODDS2);
#ifndef NDEBUG
        if (batch_edits.size() > 1 && batch_edits.size() - 1 == idx) {
          TEST_SYNC_POINT_CALLBACK(
              "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
              nullptr);
          TEST_SYNC_POINT(
              "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1");
        }
        ++idx;
#endif /* !NDEBUG */
        s = descriptor_log_->AddRecord(record);
        if (!s.ok()) {
          break;
        }
      }
      if (s.ok()) {
        s = SyncManifest(env_, db_options_, descriptor_log_->file());
      }
      if (!s.ok()) {
        ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
                        s.ToString().c_str());
      }
    }

    // If we just created a new descriptor file, install it by writing a
    // new CURRENT file that points to it.
    if (s.ok() && new_descriptor_log) {
      s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,
                         db_directory);
      TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
    }

    if (s.ok()) {
      // find offset in manifest file where this version is stored.
      new_manifest_file_size = descriptor_log_->file()->GetFileSize();
    }

    if (first_writer.edit_list.front()->is_column_family_drop_) {
      TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
      TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
      TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
    }

    LogFlush(db_options_->info_log);
    TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
    mu->Lock();
  }

  // Append the old manifest file to the obsolete_manifest_ list to be deleted
  // by PurgeObsoleteFiles later.
  if (s.ok() && new_descriptor_log) {
    obsolete_manifests_.emplace_back(
        DescriptorFileName("", manifest_file_number_));
  }

  // Install the new versions
  if (s.ok()) {
    if (first_writer.edit_list.front()->is_column_family_add_) {
      assert(batch_edits.size() == 1);
3924
      assert(new_cf_options != nullptr);
3925 3926
      CreateColumnFamily(*new_cf_options, first_writer.edit_list.front());
    } else if (first_writer.edit_list.front()->is_column_family_drop_) {
3927
      assert(batch_edits.size() == 1);
3928
      first_writer.cfd->SetDropped();
3929
      first_writer.cfd->UnrefAndTryDelete();
3930
    } else {
3931 3932 3933 3934 3935 3936 3937 3938 3939 3940 3941 3942 3943 3944 3945 3946 3947 3948
      // Each version in versions corresponds to a column family.
      // For each column family, update its log number indicating that logs
      // with number smaller than this should be ignored.
      for (const auto version : versions) {
        uint64_t max_log_number_in_batch = 0;
        uint32_t cf_id = version->cfd_->GetID();
        for (const auto& e : batch_edits) {
          if (e->has_log_number_ && e->column_family_ == cf_id) {
            max_log_number_in_batch =
                std::max(max_log_number_in_batch, e->log_number_);
          }
        }
        if (max_log_number_in_batch != 0) {
          assert(version->cfd_->GetLogNumber() <= max_log_number_in_batch);
          version->cfd_->SetLogNumber(max_log_number_in_batch);
        }
      }

3949
      uint64_t last_min_log_number_to_keep = 0;
I
Igor Canadi 已提交
3950
      for (auto& e : batch_edits) {
S
Siying Dong 已提交
3951
        if (e->has_min_log_number_to_keep_) {
3952 3953
          last_min_log_number_to_keep =
              std::max(last_min_log_number_to_keep, e->min_log_number_to_keep_);
S
Siying Dong 已提交
3954
        }
I
Igor Canadi 已提交
3955
      }
3956

3957
      if (last_min_log_number_to_keep != 0) {
S
Siying Dong 已提交
3958
        // Should only be set in 2PC mode.
3959
        MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep);
S
Siying Dong 已提交
3960 3961
      }

3962 3963 3964 3965
      for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
        ColumnFamilyData* cfd = versions[i]->cfd_;
        AppendVersion(cfd, versions[i]);
      }
3966
    }
3967
    manifest_file_number_ = pending_manifest_file_number_;
3968
    manifest_file_size_ = new_manifest_file_size;
3969
    prev_log_number_ = first_writer.edit_list.front()->prev_log_number_;
J
jorlow@chromium.org 已提交
3970
  } else {
3971 3972
    std::string version_edits;
    for (auto& e : batch_edits) {
3973 3974 3975 3976 3977
      version_edits += ("\n" + e->DebugString(true));
    }
    ROCKS_LOG_ERROR(db_options_->info_log,
                    "Error in committing version edit to MANIFEST: %s",
                    version_edits.c_str());
C
Cheng Chang 已提交
3978 3979 3980
    for (auto v : versions) {
      delete v;
    }
3981 3982 3983 3984
    // If manifest append failed for whatever reason, the file could be
    // corrupted. So we need to force the next version update to start a
    // new manifest file.
    descriptor_log_.reset();
3985
    if (new_descriptor_log) {
3986 3987 3988
      ROCKS_LOG_INFO(db_options_->info_log,
                     "Deleting manifest %" PRIu64 " current manifest %" PRIu64
                     "\n",
3989
                     manifest_file_number_, pending_manifest_file_number_);
3990 3991
      env_->DeleteFile(
          DescriptorFileName(dbname_, pending_manifest_file_number_));
J
jorlow@chromium.org 已提交
3992 3993
    }
  }
3994

3995
  pending_manifest_file_number_ = 0;
J
jorlow@chromium.org 已提交
3996

3997 3998 3999 4000
  // wake up all the waiting writers
  while (true) {
    ManifestWriter* ready = manifest_writers_.front();
    manifest_writers_.pop_front();
4001 4002 4003 4004 4005 4006 4007 4008 4009 4010
    bool need_signal = true;
    for (const auto& w : writers) {
      if (&w == ready) {
        need_signal = false;
        break;
      }
    }
    ready->status = s;
    ready->done = true;
    if (need_signal) {
4011 4012
      ready->cv.Signal();
    }
4013 4014 4015
    if (ready == last_writer) {
      break;
    }
4016 4017 4018 4019
  }
  if (!manifest_writers_.empty()) {
    manifest_writers_.front()->cv.Signal();
  }
J
jorlow@chromium.org 已提交
4020 4021 4022
  return s;
}

4023
// 'datas' is gramatically incorrect. We still use this notation to indicate
4024 4025
// that this variable represents a collection of column_family_data.
Status VersionSet::LogAndApply(
4026 4027 4028
    const autovector<ColumnFamilyData*>& column_family_datas,
    const autovector<const MutableCFOptions*>& mutable_cf_options_list,
    const autovector<autovector<VersionEdit*>>& edit_lists,
4029
    InstrumentedMutex* mu, FSDirectory* db_directory, bool new_descriptor_log,
4030 4031 4032 4033 4034 4035 4036 4037 4038 4039 4040 4041 4042 4043 4044 4045 4046 4047 4048 4049 4050 4051 4052 4053 4054 4055 4056 4057 4058 4059
    const ColumnFamilyOptions* new_cf_options) {
  mu->AssertHeld();
  int num_edits = 0;
  for (const auto& elist : edit_lists) {
    num_edits += static_cast<int>(elist.size());
  }
  if (num_edits == 0) {
    return Status::OK();
  } else if (num_edits > 1) {
#ifndef NDEBUG
    for (const auto& edit_list : edit_lists) {
      for (const auto& edit : edit_list) {
        assert(!edit->IsColumnFamilyManipulation());
      }
    }
#endif /* ! NDEBUG */
  }

  int num_cfds = static_cast<int>(column_family_datas.size());
  if (num_cfds == 1 && column_family_datas[0] == nullptr) {
    assert(edit_lists.size() == 1 && edit_lists[0].size() == 1);
    assert(edit_lists[0][0]->is_column_family_add_);
    assert(new_cf_options != nullptr);
  }
  std::deque<ManifestWriter> writers;
  if (num_cfds > 0) {
    assert(static_cast<size_t>(num_cfds) == mutable_cf_options_list.size());
    assert(static_cast<size_t>(num_cfds) == edit_lists.size());
  }
  for (int i = 0; i < num_cfds; ++i) {
4060 4061
    writers.emplace_back(mu, column_family_datas[i],
                         *mutable_cf_options_list[i], edit_lists[i]);
4062 4063 4064 4065 4066 4067 4068 4069 4070 4071 4072 4073 4074 4075 4076 4077 4078 4079 4080 4081 4082 4083 4084 4085 4086 4087 4088 4089 4090 4091 4092 4093 4094 4095
    manifest_writers_.push_back(&writers[i]);
  }
  assert(!writers.empty());
  ManifestWriter& first_writer = writers.front();
  while (!first_writer.done && &first_writer != manifest_writers_.front()) {
    first_writer.cv.Wait();
  }
  if (first_writer.done) {
    // All non-CF-manipulation operations can be grouped together and committed
    // to MANIFEST. They should all have finished. The status code is stored in
    // the first manifest writer.
#ifndef NDEBUG
    for (const auto& writer : writers) {
      assert(writer.done);
    }
#endif /* !NDEBUG */
    return first_writer.status;
  }

  int num_undropped_cfds = 0;
  for (auto cfd : column_family_datas) {
    // if cfd == nullptr, it is a column family add.
    if (cfd == nullptr || !cfd->IsDropped()) {
      ++num_undropped_cfds;
    }
  }
  if (0 == num_undropped_cfds) {
    for (int i = 0; i != num_cfds; ++i) {
      manifest_writers_.pop_front();
    }
    // Notify new head of manifest write queue.
    if (!manifest_writers_.empty()) {
      manifest_writers_.front()->cv.Signal();
    }
4096
    return Status::ColumnFamilyDropped();
4097 4098 4099 4100 4101 4102
  }

  return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,
                               new_cf_options);
}

I
Igor Canadi 已提交
4103 4104
void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
  assert(edit->IsColumnFamilyManipulation());
4105
  edit->SetNextFile(next_file_number_.load());
M
Maysam Yabandeh 已提交
4106 4107 4108 4109
  // The log might have data that is not visible to memtbale and hence have not
  // updated the last_sequence_ yet. It is also possible that the log has is
  // expecting some new data that is not written yet. Since LastSequence is an
  // upper bound on the sequence, it is ok to record
4110 4111 4112
  // last_allocated_sequence_ as the last sequence.
  edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
                                                      : last_sequence_);
I
Igor Canadi 已提交
4113 4114 4115 4116 4117 4118 4119
  if (edit->is_column_family_drop_) {
    // if we drop column family, we have to make sure to save max column family,
    // so that we don't reuse existing ID
    edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
  }
}

4120 4121 4122
Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
                                     VersionBuilder* builder, VersionEdit* edit,
                                     InstrumentedMutex* mu) {
4123 4124 4125
#ifdef NDEBUG
  (void)cfd;
#endif
4126
  mu->AssertHeld();
I
Igor Canadi 已提交
4127
  assert(!edit->IsColumnFamilyManipulation());
4128

4129 4130
  if (edit->has_log_number_) {
    assert(edit->log_number_ >= cfd->GetLogNumber());
4131
    assert(edit->log_number_ < next_file_number_.load());
I
Igor Canadi 已提交
4132
  }
4133

I
Igor Canadi 已提交
4134 4135 4136
  if (!edit->has_prev_log_number_) {
    edit->SetPrevLogNumber(prev_log_number_);
  }
4137
  edit->SetNextFile(next_file_number_.load());
M
Maysam Yabandeh 已提交
4138 4139 4140 4141
  // The log might have data that is not visible to memtbale and hence have not
  // updated the last_sequence_ yet. It is also possible that the log has is
  // expecting some new data that is not written yet. Since LastSequence is an
  // upper bound on the sequence, it is ok to record
4142 4143 4144
  // last_allocated_sequence_ as the last sequence.
  edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
                                                      : last_sequence_);
I
Igor Canadi 已提交
4145

4146 4147 4148
  Status s = builder->Apply(edit);

  return s;
4149 4150
}

4151
Status VersionSet::ApplyOneVersionEditToBuilder(
4152 4153 4154
    VersionEdit& edit,
    const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
    std::unordered_map<int, std::string>& column_families_not_found,
4155 4156
    std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
        builders,
4157
    VersionEditParams* version_edit_params) {
4158 4159 4160 4161 4162 4163 4164 4165 4166 4167 4168 4169 4170 4171 4172 4173 4174 4175 4176 4177 4178 4179 4180
  // Not found means that user didn't supply that column
  // family option AND we encountered column family add
  // record. Once we encounter column family drop record,
  // we will delete the column family from
  // column_families_not_found.
  bool cf_in_not_found = (column_families_not_found.find(edit.column_family_) !=
                          column_families_not_found.end());
  // in builders means that user supplied that column family
  // option AND that we encountered column family add record
  bool cf_in_builders = builders.find(edit.column_family_) != builders.end();

  // they can't both be true
  assert(!(cf_in_not_found && cf_in_builders));

  ColumnFamilyData* cfd = nullptr;

  if (edit.is_column_family_add_) {
    if (cf_in_builders || cf_in_not_found) {
      return Status::Corruption(
          "Manifest adding the same column family twice: " +
          edit.column_family_name_);
    }
    auto cf_options = name_to_options.find(edit.column_family_name_);
4181 4182 4183 4184 4185 4186
    // implicitly add persistent_stats column family without requiring user
    // to specify
    bool is_persistent_stats_column_family =
        edit.column_family_name_.compare(kPersistentStatsColumnFamilyName) == 0;
    if (cf_options == name_to_options.end() &&
        !is_persistent_stats_column_family) {
4187 4188 4189
      column_families_not_found.insert(
          {edit.column_family_, edit.column_family_name_});
    } else {
4190 4191 4192 4193 4194 4195 4196 4197
      // recover persistent_stats CF from a DB that already contains it
      if (is_persistent_stats_column_family) {
        ColumnFamilyOptions cfo;
        OptimizeForPersistentStats(&cfo);
        cfd = CreateColumnFamily(cfo, &edit);
      } else {
        cfd = CreateColumnFamily(cf_options->second, &edit);
      }
4198
      cfd->set_initialized();
4199 4200 4201
      builders.insert(std::make_pair(
          edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
                                   new BaseReferencedVersionBuilder(cfd))));
4202 4203 4204 4205 4206 4207 4208 4209
    }
  } else if (edit.is_column_family_drop_) {
    if (cf_in_builders) {
      auto builder = builders.find(edit.column_family_);
      assert(builder != builders.end());
      builders.erase(builder);
      cfd = column_family_set_->GetColumnFamily(edit.column_family_);
      assert(cfd != nullptr);
4210
      if (cfd->UnrefAndTryDelete()) {
4211 4212 4213 4214 4215 4216 4217 4218 4219 4220 4221 4222 4223 4224 4225 4226 4227 4228 4229 4230 4231 4232 4233 4234 4235 4236
        cfd = nullptr;
      } else {
        // who else can have reference to cfd!?
        assert(false);
      }
    } else if (cf_in_not_found) {
      column_families_not_found.erase(edit.column_family_);
    } else {
      return Status::Corruption(
          "Manifest - dropping non-existing column family");
    }
  } else if (!cf_in_not_found) {
    if (!cf_in_builders) {
      return Status::Corruption(
          "Manifest record referencing unknown column family");
    }

    cfd = column_family_set_->GetColumnFamily(edit.column_family_);
    // this should never happen since cf_in_builders is true
    assert(cfd != nullptr);

    // if it is not column family add or column family drop,
    // then it's a file add/delete, which should be forwarded
    // to builder
    auto builder = builders.find(edit.column_family_);
    assert(builder != builders.end());
4237 4238 4239 4240
    Status s = builder->second->version_builder()->Apply(&edit);
    if (!s.ok()) {
      return s;
    }
4241
  }
4242
  return ExtractInfoFromVersionEdit(cfd, edit, version_edit_params);
4243
}
4244

4245
Status VersionSet::ExtractInfoFromVersionEdit(
4246 4247
    ColumnFamilyData* cfd, const VersionEdit& from_edit,
    VersionEditParams* version_edit_params) {
4248
  if (cfd != nullptr) {
4249 4250 4251 4252 4253
    if (from_edit.has_db_id_) {
      version_edit_params->SetDBId(from_edit.db_id_);
    }
    if (from_edit.has_log_number_) {
      if (cfd->GetLogNumber() > from_edit.log_number_) {
4254 4255 4256 4257 4258
        ROCKS_LOG_WARN(
            db_options_->info_log,
            "MANIFEST corruption detected, but ignored - Log numbers in "
            "records NOT monotonically increasing");
      } else {
4259 4260
        cfd->SetLogNumber(from_edit.log_number_);
        version_edit_params->SetLogNumber(from_edit.log_number_);
4261 4262
      }
    }
4263 4264
    if (from_edit.has_comparator_ &&
        from_edit.comparator_ != cfd->user_comparator()->Name()) {
4265 4266
      return Status::InvalidArgument(
          cfd->user_comparator()->Name(),
4267
          "does not match existing comparator " + from_edit.comparator_);
4268 4269 4270
    }
  }

4271 4272
  if (from_edit.has_prev_log_number_) {
    version_edit_params->SetPrevLogNumber(from_edit.prev_log_number_);
4273 4274
  }

4275 4276
  if (from_edit.has_next_file_number_) {
    version_edit_params->SetNextFile(from_edit.next_file_number_);
4277 4278
  }

4279 4280
  if (from_edit.has_max_column_family_) {
    version_edit_params->SetMaxColumnFamily(from_edit.max_column_family_);
4281 4282
  }

4283 4284 4285 4286
  if (from_edit.has_min_log_number_to_keep_) {
    version_edit_params->min_log_number_to_keep_ =
        std::max(version_edit_params->min_log_number_to_keep_,
                 from_edit.min_log_number_to_keep_);
4287 4288
  }

4289 4290
  if (from_edit.has_last_sequence_) {
    version_edit_params->SetLastSequence(from_edit.last_sequence_);
4291 4292 4293 4294
  }
  return Status::OK();
}

4295 4296
Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
                                          FileSystem* fs,
4297 4298
                                          std::string* manifest_path,
                                          uint64_t* manifest_file_number) {
4299
  assert(fs != nullptr);
4300
  assert(manifest_path != nullptr);
4301 4302
  assert(manifest_file_number != nullptr);

4303
  std::string fname;
4304
  Status s = ReadFileToString(fs, CurrentFileName(dbname), &fname);
4305 4306 4307 4308 4309 4310 4311 4312 4313
  if (!s.ok()) {
    return s;
  }
  if (fname.empty() || fname.back() != '\n') {
    return Status::Corruption("CURRENT file does not end with newline");
  }
  // remove the trailing '\n'
  fname.resize(fname.size() - 1);
  FileType type;
4314
  bool parse_ok = ParseFileName(fname, manifest_file_number, &type);
4315 4316 4317
  if (!parse_ok || type != kDescriptorFile) {
    return Status::Corruption("CURRENT file corrupted");
  }
4318 4319
  *manifest_path = dbname;
  if (dbname.back() != '/') {
4320 4321 4322 4323 4324 4325
    manifest_path->push_back('/');
  }
  *manifest_path += fname;
  return Status::OK();
}

4326 4327 4328 4329 4330 4331
Status VersionSet::ReadAndRecover(
    log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
    const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
    std::unordered_map<int, std::string>& column_families_not_found,
    std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
        builders,
4332
    VersionEditParams* version_edit_params, std::string* db_id) {
4333 4334 4335 4336 4337 4338 4339 4340 4341 4342 4343 4344
  assert(reader != nullptr);
  assert(read_buffer != nullptr);
  Status s;
  Slice record;
  std::string scratch;
  size_t recovered_edits = 0;
  while (reader->ReadRecord(&record, &scratch) && s.ok()) {
    VersionEdit edit;
    s = edit.DecodeFrom(record);
    if (!s.ok()) {
      break;
    }
4345 4346 4347 4348 4349 4350
    if (edit.has_db_id_) {
      db_id_ = edit.GetDbId();
      if (db_id != nullptr) {
        db_id->assign(edit.GetDbId());
      }
    }
4351 4352 4353 4354 4355 4356 4357 4358 4359
    s = read_buffer->AddEdit(&edit);
    if (!s.ok()) {
      break;
    }
    if (edit.is_in_atomic_group_) {
      if (read_buffer->IsFull()) {
        // Apply edits in an atomic group when we have read all edits in the
        // group.
        for (auto& e : read_buffer->replay_buffer()) {
4360 4361 4362
          s = ApplyOneVersionEditToBuilder(e, name_to_options,
                                           column_families_not_found, builders,
                                           version_edit_params);
4363 4364 4365 4366 4367 4368 4369 4370 4371 4372 4373 4374
          if (!s.ok()) {
            break;
          }
          recovered_edits++;
        }
        if (!s.ok()) {
          break;
        }
        read_buffer->Clear();
      }
    } else {
      // Apply a normal edit immediately.
4375 4376 4377
      s = ApplyOneVersionEditToBuilder(edit, name_to_options,
                                       column_families_not_found, builders,
                                       version_edit_params);
4378 4379 4380 4381 4382 4383 4384 4385 4386 4387 4388 4389 4390 4391
      if (s.ok()) {
        recovered_edits++;
      }
    }
  }
  if (!s.ok()) {
    // Clear the buffer if we fail to decode/apply an edit.
    read_buffer->Clear();
  }
  TEST_SYNC_POINT_CALLBACK("VersionSet::ReadAndRecover:RecoveredEdits",
                           &recovered_edits);
  return s;
}

I
Igor Canadi 已提交
4392
Status VersionSet::Recover(
4393 4394
    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
    std::string* db_id) {
I
Igor Canadi 已提交
4395
  std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
4396 4397
  for (const auto& cf : column_families) {
    cf_name_to_options.emplace(cf.name, cf.options);
I
Igor Canadi 已提交
4398 4399 4400 4401
  }
  // keeps track of column families in manifest that were not found in
  // column families parameters. if those column families are not dropped
  // by subsequent manifest records, Recover() will return failure status
I
Igor Canadi 已提交
4402
  std::unordered_map<int, std::string> column_families_not_found;
J
jorlow@chromium.org 已提交
4403 4404

  // Read "CURRENT" file, which contains a pointer to the current manifest file
4405
  std::string manifest_path;
4406
  Status s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
4407
                                    &manifest_file_number_);
J
jorlow@chromium.org 已提交
4408 4409 4410 4411
  if (!s.ok()) {
    return s;
  }

4412
  ROCKS_LOG_INFO(db_options_->info_log, "Recovering from manifest file: %s\n",
4413
                 manifest_path.c_str());
H
heyongqiang 已提交
4414

4415
  std::unique_ptr<SequentialFileReader> manifest_file_reader;
4416
  {
4417 4418 4419 4420
    std::unique_ptr<FSSequentialFile> manifest_file;
    s = fs_->NewSequentialFile(manifest_path,
                               fs_->OptimizeForManifestRead(file_options_),
                               &manifest_file, nullptr);
4421 4422 4423 4424
    if (!s.ok()) {
      return s;
    }
    manifest_file_reader.reset(
4425 4426
        new SequentialFileReader(std::move(manifest_file), manifest_path,
                                 db_options_->log_readahead_size));
J
jorlow@chromium.org 已提交
4427 4428
  }

4429 4430
  std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
      builders;
J
jorlow@chromium.org 已提交
4431

4432
  // add default column family
4433
  auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
I
Igor Canadi 已提交
4434
  if (default_cf_iter == cf_name_to_options.end()) {
I
Igor Canadi 已提交
4435
    return Status::InvalidArgument("Default column family not specified");
I
Igor Canadi 已提交
4436
  }
I
Igor Canadi 已提交
4437
  VersionEdit default_cf_edit;
4438
  default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
I
Igor Canadi 已提交
4439 4440 4441
  default_cf_edit.SetColumnFamily(0);
  ColumnFamilyData* default_cfd =
      CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
4442 4443 4444
  // In recovery, nobody else can access it, so it's fine to set it to be
  // initialized earlier.
  default_cfd->set_initialized();
4445 4446 4447
  builders.insert(
      std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
                            new BaseReferencedVersionBuilder(default_cfd))));
4448
  uint64_t current_manifest_file_size = 0;
4449
  VersionEditParams version_edit_params;
J
jorlow@chromium.org 已提交
4450
  {
I
Igor Canadi 已提交
4451
    VersionSet::LogReporter reporter;
J
jorlow@chromium.org 已提交
4452
    reporter.status = &s;
4453
    log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
4454
                       true /* checksum */, 0 /* log_number */);
J
jorlow@chromium.org 已提交
4455 4456
    Slice record;
    std::string scratch;
4457
    AtomicGroupReadBuffer read_buffer;
4458 4459 4460
    s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options,
                       column_families_not_found, builders,
                       &version_edit_params, db_id);
4461 4462
    current_manifest_file_size = reader.GetReadOffset();
    assert(current_manifest_file_size != 0);
J
jorlow@chromium.org 已提交
4463 4464 4465
  }

  if (s.ok()) {
4466
    if (!version_edit_params.has_next_file_number_) {
J
jorlow@chromium.org 已提交
4467
      s = Status::Corruption("no meta-nextfile entry in descriptor");
4468
    } else if (!version_edit_params.has_log_number_) {
J
jorlow@chromium.org 已提交
4469
      s = Status::Corruption("no meta-lognumber entry in descriptor");
4470
    } else if (!version_edit_params.has_last_sequence_) {
J
jorlow@chromium.org 已提交
4471 4472
      s = Status::Corruption("no last-sequence-number entry in descriptor");
    }
4473

4474 4475
    if (!version_edit_params.has_prev_log_number_) {
      version_edit_params.SetPrevLogNumber(0);
4476
    }
4477

4478 4479
    column_family_set_->UpdateMaxColumnFamily(
        version_edit_params.max_column_family_);
4480

S
Siying Dong 已提交
4481 4482
    // When reading DB generated using old release, min_log_number_to_keep=0.
    // All log files will be scanned for potential prepare entries.
4483 4484 4485
    MarkMinLogNumberToKeep2PC(version_edit_params.min_log_number_to_keep_);
    MarkFileNumberUsed(version_edit_params.prev_log_number_);
    MarkFileNumberUsed(version_edit_params.log_number_);
J
jorlow@chromium.org 已提交
4486 4487
  }

I
Igor Canadi 已提交
4488
  // there were some column families in the MANIFEST that weren't specified
4489
  // in the argument. This is OK in read_only mode
4490
  if (read_only == false && !column_families_not_found.empty()) {
4491
    std::string list_of_not_found;
I
Igor Canadi 已提交
4492 4493
    for (const auto& cf : column_families_not_found) {
      list_of_not_found += ", " + cf.second;
4494 4495
    }
    list_of_not_found = list_of_not_found.substr(2);
I
Igor Canadi 已提交
4496
    s = Status::InvalidArgument(
I
Igor Canadi 已提交
4497 4498
        "You have to open all column families. Column families not opened: " +
        list_of_not_found);
I
Igor Canadi 已提交
4499 4500
  }

4501 4502 4503 4504 4505 4506 4507 4508 4509 4510 4511 4512
  if (s.ok()) {
    for (auto cfd : *column_family_set_) {
      assert(builders.count(cfd->GetID()) > 0);
      auto* builder = builders[cfd->GetID()]->version_builder();
      if (!builder->CheckConsistencyForNumLevels()) {
        s = Status::InvalidArgument(
            "db has more levels than options.num_levels");
        break;
      }
    }
  }

J
jorlow@chromium.org 已提交
4513
  if (s.ok()) {
I
Igor Canadi 已提交
4514
    for (auto cfd : *column_family_set_) {
4515 4516 4517
      if (cfd->IsDropped()) {
        continue;
      }
4518 4519 4520
      if (read_only) {
        cfd->table_cache()->SetTablesAreImmortal();
      }
4521
      assert(cfd->initialized());
4522 4523
      auto builders_iter = builders.find(cfd->GetID());
      assert(builders_iter != builders.end());
4524
      auto builder = builders_iter->second->version_builder();
4525

4526 4527
      // unlimited table cache. Pre-load table handle now.
      // Need to do it out of the mutex.
4528
      s = builder->LoadTableHandlers(
4529 4530 4531 4532
          cfd->internal_stats(), db_options_->max_file_opening_threads,
          false /* prefetch_index_and_filter_in_cache */,
          true /* is_initial_load */,
          cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
4533 4534 4535 4536 4537 4538
      if (!s.ok()) {
        if (db_options_->paranoid_checks) {
          return s;
        }
        s = Status::OK();
      }
4539

4540
      Version* v = new Version(cfd, this, file_options_,
4541 4542
                               *cfd->GetLatestMutableCFOptions(),
                               current_version_number_++);
S
sdong 已提交
4543
      builder->SaveTo(v->storage_info());
4544

4545
      // Install recovered version
4546 4547
      v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
          !(db_options_->skip_stats_update_on_db_open));
I
Igor Canadi 已提交
4548
      AppendVersion(cfd, v);
4549
    }
4550

I
Igor Canadi 已提交
4551
    manifest_file_size_ = current_manifest_file_size;
4552 4553 4554 4555 4556
    next_file_number_.store(version_edit_params.next_file_number_ + 1);
    last_allocated_sequence_ = version_edit_params.last_sequence_;
    last_published_sequence_ = version_edit_params.last_sequence_;
    last_sequence_ = version_edit_params.last_sequence_;
    prev_log_number_ = version_edit_params.prev_log_number_;
H
heyongqiang 已提交
4557

4558 4559
    ROCKS_LOG_INFO(
        db_options_->info_log,
4560
        "Recovered from manifest file:%s succeeded,"
4561 4562 4563 4564
        "manifest_file_number is %" PRIu64 ", next_file_number is %" PRIu64
        ", last_sequence is %" PRIu64 ", log_number is %" PRIu64
        ",prev_log_number is %" PRIu64 ",max_column_family is %" PRIu32
        ",min_log_number_to_keep is %" PRIu64 "\n",
4565
        manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
4566 4567 4568
        last_sequence_.load(), version_edit_params.log_number_,
        prev_log_number_, column_family_set_->GetMaxColumnFamily(),
        min_log_number_to_keep_2pc());
4569 4570

    for (auto cfd : *column_family_set_) {
4571 4572 4573
      if (cfd->IsDropped()) {
        continue;
      }
4574
      ROCKS_LOG_INFO(db_options_->info_log,
4575 4576
                     "Column family [%s] (ID %" PRIu32
                     "), log number is %" PRIu64 "\n",
4577
                     cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
4578
    }
J
jorlow@chromium.org 已提交
4579 4580 4581 4582 4583
  }

  return s;
}

I
Igor Canadi 已提交
4584
Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
4585 4586
                                      const std::string& dbname,
                                      FileSystem* fs) {
I
Igor Canadi 已提交
4587 4588
  // these are just for performance reasons, not correcntes,
  // so we're fine using the defaults
4589
  FileOptions soptions;
I
Igor Canadi 已提交
4590
  // Read "CURRENT" file, which contains a pointer to the current manifest file
4591 4592
  std::string manifest_path;
  uint64_t manifest_file_number;
4593 4594
  Status s =
      GetCurrentManifestPath(dbname, fs, &manifest_path, &manifest_file_number);
I
Igor Canadi 已提交
4595 4596 4597
  if (!s.ok()) {
    return s;
  }
4598

4599
  std::unique_ptr<SequentialFileReader> file_reader;
4600
  {
4601 4602
    std::unique_ptr<FSSequentialFile> file;
    s = fs->NewSequentialFile(manifest_path, soptions, &file, nullptr);
4603 4604
    if (!s.ok()) {
      return s;
I
Igor Canadi 已提交
4605
  }
4606
  file_reader.reset(new SequentialFileReader(std::move(file), manifest_path));
4607
  }
I
Igor Canadi 已提交
4608 4609 4610

  std::map<uint32_t, std::string> column_family_names;
  // default column family is always implicitly there
4611
  column_family_names.insert({0, kDefaultColumnFamilyName});
I
Igor Canadi 已提交
4612 4613
  VersionSet::LogReporter reporter;
  reporter.status = &s;
4614
  log::Reader reader(nullptr, std::move(file_reader), &reporter,
4615
                     true /* checksum */, 0 /* log_number */);
I
Igor Canadi 已提交
4616 4617 4618
  Slice record;
  std::string scratch;
  while (reader.ReadRecord(&record, &scratch) && s.ok()) {
4619 4620 4621 4622 4623 4624
    VersionEdit edit;
    s = edit.DecodeFrom(record);
    if (!s.ok()) {
      break;
    }
    if (edit.is_column_family_add_) {
4625 4626 4627 4628 4629
      if (column_family_names.find(edit.column_family_) !=
          column_family_names.end()) {
        s = Status::Corruption("Manifest adding the same column family twice");
        break;
      }
4630 4631 4632
      column_family_names.insert(
          {edit.column_family_, edit.column_family_name_});
    } else if (edit.is_column_family_drop_) {
4633 4634 4635 4636 4637 4638
      if (column_family_names.find(edit.column_family_) ==
          column_family_names.end()) {
        s = Status::Corruption(
            "Manifest - dropping non-existing column family");
        break;
      }
4639 4640
      column_family_names.erase(edit.column_family_);
    }
I
Igor Canadi 已提交
4641 4642 4643 4644 4645 4646
  }

  column_families->clear();
  if (s.ok()) {
    for (const auto& iter : column_family_names) {
      column_families->push_back(iter.second);
4647
    }
I
Igor Canadi 已提交
4648 4649 4650 4651
  }

  return s;
}
4652

I
Igor Canadi 已提交
4653
#ifndef ROCKSDB_LITE
4654 4655
Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
                                        const Options* options,
4656
                                        const FileOptions& file_options,
4657 4658 4659 4660 4661 4662
                                        int new_levels) {
  if (new_levels <= 1) {
    return Status::InvalidArgument(
        "Number of levels needs to be bigger than 1");
  }

4663
  ImmutableDBOptions db_options(*options);
I
Igor Canadi 已提交
4664
  ColumnFamilyOptions cf_options(*options);
4665 4666
  std::shared_ptr<Cache> tc(NewLRUCache(options->max_open_files - 10,
                                        options->table_cache_numshardbits));
S
sdong 已提交
4667
  WriteController wc(options->delayed_write_rate);
4668
  WriteBufferManager wb(options->db_write_buffer_size);
4669
  VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc,
4670
                      /*block_cache_tracer=*/nullptr);
4671 4672
  Status status;

4673
  std::vector<ColumnFamilyDescriptor> dummy;
4674
  ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
I
Igor Canadi 已提交
4675
                                          ColumnFamilyOptions(*options));
I
Igor Canadi 已提交
4676
  dummy.push_back(dummy_descriptor);
4677
  status = versions.Recover(dummy);
4678 4679 4680 4681
  if (!status.ok()) {
    return status;
  }

4682
  Version* current_version =
4683
      versions.GetColumnFamilySet()->GetDefault()->current();
S
sdong 已提交
4684
  auto* vstorage = current_version->storage_info();
4685
  int current_levels = vstorage->num_levels();
4686 4687 4688 4689 4690 4691 4692 4693 4694 4695

  if (current_levels <= new_levels) {
    return Status::OK();
  }

  // Make sure there are file only on one level from
  // (new_levels-1) to (current_levels-1)
  int first_nonempty_level = -1;
  int first_nonempty_level_filenum = 0;
  for (int i = new_levels - 1; i < current_levels; i++) {
S
sdong 已提交
4696
    int file_num = vstorage->NumLevelFiles(i);
4697 4698 4699 4700 4701 4702 4703 4704 4705 4706 4707 4708 4709 4710 4711 4712
    if (file_num != 0) {
      if (first_nonempty_level < 0) {
        first_nonempty_level = i;
        first_nonempty_level_filenum = file_num;
      } else {
        char msg[255];
        snprintf(msg, sizeof(msg),
                 "Found at least two levels containing files: "
                 "[%d:%d],[%d:%d].\n",
                 first_nonempty_level, first_nonempty_level_filenum, i,
                 file_num);
        return Status::InvalidArgument(msg);
      }
    }
  }

I
Igor Canadi 已提交
4713
  // we need to allocate an array with the old number of levels size to
4714
  // avoid SIGSEGV in WriteCurrentStatetoManifest()
I
Igor Canadi 已提交
4715
  // however, all levels bigger or equal to new_levels will be empty
4716
  std::vector<FileMetaData*>* new_files_list =
I
Igor Canadi 已提交
4717
      new std::vector<FileMetaData*>[current_levels];
4718
  for (int i = 0; i < new_levels - 1; i++) {
S
sdong 已提交
4719
    new_files_list[i] = vstorage->LevelFiles(i);
4720 4721 4722
  }

  if (first_nonempty_level > 0) {
S
sdong 已提交
4723
    new_files_list[new_levels - 1] = vstorage->LevelFiles(first_nonempty_level);
4724 4725
  }

S
sdong 已提交
4726 4727 4728
  delete[] vstorage -> files_;
  vstorage->files_ = new_files_list;
  vstorage->num_levels_ = new_levels;
4729

Y
Yi Wu 已提交
4730
  MutableCFOptions mutable_cf_options(*options);
4731
  VersionEdit ve;
4732 4733
  InstrumentedMutex dummy_mutex;
  InstrumentedMutexLock l(&dummy_mutex);
4734 4735 4736
  return versions.LogAndApply(
      versions.GetColumnFamilySet()->GetDefault(),
      mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
4737 4738
}

4739 4740 4741 4742 4743 4744 4745 4746 4747 4748 4749 4750 4751 4752 4753 4754 4755 4756 4757 4758 4759 4760 4761 4762 4763 4764 4765 4766
// Get the checksum information including the checksum and checksum function
// name of all SST files in VersionSet. Store the information in
// FileChecksumList which contains a map from file number to its checksum info.
// If DB is not running, make sure call VersionSet::Recover() to load the file
// metadata from Manifest to VersionSet before calling this function.
Status VersionSet::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
  // Clean the previously stored checksum information if any.
  if (checksum_list == nullptr) {
    return Status::InvalidArgument("checksum_list is nullptr");
  }
  checksum_list->reset();

  for (auto cfd : *column_family_set_) {
    if (cfd->IsDropped() || !cfd->initialized()) {
      continue;
    }
    for (int level = 0; level < cfd->NumberLevels(); level++) {
      for (const auto& file :
           cfd->current()->storage_info()->LevelFiles(level)) {
        checksum_list->InsertOneFileChecksum(file->fd.GetNumber(),
                                             file->file_checksum,
                                             file->file_checksum_func_name);
      }
    }
  }
  return Status::OK();
}

I
Igor Canadi 已提交
4767
Status VersionSet::DumpManifest(Options& options, std::string& dscname,
4768
                                bool verbose, bool hex, bool json) {
4769
  // Open the specified manifest file.
4770
  std::unique_ptr<SequentialFileReader> file_reader;
4771 4772
  Status s;
  {
4773 4774 4775 4776 4777
    std::unique_ptr<FSSequentialFile> file;
    s = options.file_system->NewSequentialFile(
        dscname,
        options.file_system->OptimizeForManifestRead(file_options_), &file,
        nullptr);
4778 4779 4780
    if (!s.ok()) {
      return s;
    }
4781 4782
    file_reader.reset(new SequentialFileReader(
        std::move(file), dscname, db_options_->log_readahead_size));
4783 4784 4785 4786 4787 4788 4789
  }

  bool have_prev_log_number = false;
  bool have_next_file = false;
  bool have_last_sequence = false;
  uint64_t next_file = 0;
  uint64_t last_sequence = 0;
I
Igor Canadi 已提交
4790
  uint64_t previous_log_number = 0;
4791
  int count = 0;
4792
  std::unordered_map<uint32_t, std::string> comparators;
4793 4794
  std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
      builders;
4795 4796 4797

  // add default column family
  VersionEdit default_cf_edit;
4798
  default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
4799 4800 4801
  default_cf_edit.SetColumnFamily(0);
  ColumnFamilyData* default_cfd =
      CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit);
4802 4803 4804
  builders.insert(
      std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
                            new BaseReferencedVersionBuilder(default_cfd))));
4805 4806

  {
I
Igor Canadi 已提交
4807
    VersionSet::LogReporter reporter;
4808
    reporter.status = &s;
4809
    log::Reader reader(nullptr, std::move(file_reader), &reporter,
4810
                       true /* checksum */, 0 /* log_number */);
4811 4812 4813
    Slice record;
    std::string scratch;
    while (reader.ReadRecord(&record, &scratch) && s.ok()) {
4814
      VersionEdit edit;
4815
      s = edit.DecodeFrom(record);
4816 4817
      if (!s.ok()) {
        break;
4818 4819
      }

4820
      // Write out each individual edit
4821 4822 4823 4824
      if (verbose && !json) {
        printf("%s\n", edit.DebugString(hex).c_str());
      } else if (json) {
        printf("%s\n", edit.DebugJSON(count, hex).c_str());
4825 4826 4827
      }
      count++;

4828 4829 4830 4831 4832
      bool cf_in_builders =
          builders.find(edit.column_family_) != builders.end();

      if (edit.has_comparator_) {
        comparators.insert({edit.column_family_, edit.comparator_});
4833 4834
      }

4835 4836
      ColumnFamilyData* cfd = nullptr;

4837 4838 4839 4840 4841 4842
      if (edit.is_column_family_add_) {
        if (cf_in_builders) {
          s = Status::Corruption(
              "Manifest adding the same column family twice");
          break;
        }
4843
        cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
4844
        cfd->set_initialized();
4845 4846 4847
        builders.insert(std::make_pair(
            edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
                                     new BaseReferencedVersionBuilder(cfd))));
4848 4849 4850 4851 4852 4853 4854 4855 4856
      } else if (edit.is_column_family_drop_) {
        if (!cf_in_builders) {
          s = Status::Corruption(
              "Manifest - dropping non-existing column family");
          break;
        }
        auto builder_iter = builders.find(edit.column_family_);
        builders.erase(builder_iter);
        comparators.erase(edit.column_family_);
4857
        cfd = column_family_set_->GetColumnFamily(edit.column_family_);
4858
        assert(cfd != nullptr);
4859
        cfd->UnrefAndTryDelete();
4860
        cfd = nullptr;
4861 4862 4863 4864 4865 4866 4867
      } else {
        if (!cf_in_builders) {
          s = Status::Corruption(
              "Manifest record referencing unknown column family");
          break;
        }

4868
        cfd = column_family_set_->GetColumnFamily(edit.column_family_);
4869 4870 4871 4872 4873 4874 4875 4876
        // this should never happen since cf_in_builders is true
        assert(cfd != nullptr);

        // if it is not column family add or column family drop,
        // then it's a file add/delete, which should be forwarded
        // to builder
        auto builder = builders.find(edit.column_family_);
        assert(builder != builders.end());
4877 4878 4879 4880
        s = builder->second->version_builder()->Apply(&edit);
        if (!s.ok()) {
          break;
        }
4881 4882
      }

4883 4884 4885 4886
      if (cfd != nullptr && edit.has_log_number_) {
        cfd->SetLogNumber(edit.log_number_);
      }

S
Siying Dong 已提交
4887

4888
      if (edit.has_prev_log_number_) {
I
Igor Canadi 已提交
4889
        previous_log_number = edit.prev_log_number_;
4890 4891 4892 4893 4894 4895 4896 4897 4898 4899 4900 4901
        have_prev_log_number = true;
      }

      if (edit.has_next_file_number_) {
        next_file = edit.next_file_number_;
        have_next_file = true;
      }

      if (edit.has_last_sequence_) {
        last_sequence = edit.last_sequence_;
        have_last_sequence = true;
      }
4902 4903 4904 4905

      if (edit.has_max_column_family_) {
        column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
      }
S
Siying Dong 已提交
4906 4907 4908 4909

      if (edit.has_min_log_number_to_keep_) {
        MarkMinLogNumberToKeep2PC(edit.min_log_number_to_keep_);
      }
4910 4911
    }
  }
4912
  file_reader.reset();
4913 4914 4915 4916 4917 4918 4919 4920 4921 4922 4923

  if (s.ok()) {
    if (!have_next_file) {
      s = Status::Corruption("no meta-nextfile entry in descriptor");
      printf("no meta-nextfile entry in descriptor");
    } else if (!have_last_sequence) {
      printf("no last-sequence-number entry in descriptor");
      s = Status::Corruption("no last-sequence-number entry in descriptor");
    }

    if (!have_prev_log_number) {
I
Igor Canadi 已提交
4924
      previous_log_number = 0;
4925 4926 4927 4928
    }
  }

  if (s.ok()) {
4929
    for (auto cfd : *column_family_set_) {
4930 4931 4932
      if (cfd->IsDropped()) {
        continue;
      }
4933 4934
      auto builders_iter = builders.find(cfd->GetID());
      assert(builders_iter != builders.end());
4935
      auto builder = builders_iter->second->version_builder();
4936

4937
      Version* v = new Version(cfd, this, file_options_,
4938 4939
                               *cfd->GetLatestMutableCFOptions(),
                               current_version_number_++);
S
sdong 已提交
4940
      builder->SaveTo(v->storage_info());
4941
      v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);
4942

4943 4944 4945 4946
      printf("--------------- Column family \"%s\"  (ID %" PRIu32
             ") --------------\n",
             cfd->GetName().c_str(), cfd->GetID());
      printf("log number: %" PRIu64 "\n", cfd->GetLogNumber());
4947 4948 4949 4950 4951 4952 4953 4954 4955
      auto comparator = comparators.find(cfd->GetID());
      if (comparator != comparators.end()) {
        printf("comparator: %s\n", comparator->second.c_str());
      } else {
        printf("comparator: <NO COMPARATOR>\n");
      }
      printf("%s \n", v->DebugString(hex).c_str());
      delete v;
    }
4956

4957
    next_file_number_.store(next_file + 1);
4958
    last_allocated_sequence_ = last_sequence;
4959
    last_published_sequence_ = last_sequence;
4960
    last_sequence_ = last_sequence;
I
Igor Canadi 已提交
4961
    prev_log_number_ = previous_log_number;
4962

4963 4964 4965 4966 4967 4968 4969
    printf("next_file_number %" PRIu64 " last_sequence %" PRIu64
           "  prev_log_number %" PRIu64 " max_column_family %" PRIu32
           " min_log_number_to_keep "
           "%" PRIu64 "\n",
           next_file_number_.load(), last_sequence, previous_log_number,
           column_family_set_->GetMaxColumnFamily(),
           min_log_number_to_keep_2pc());
4970
  }
4971

4972 4973
  return s;
}
I
Igor Canadi 已提交
4974
#endif  // ROCKSDB_LITE
4975

A
Andrew Kryczka 已提交
4976 4977 4978
void VersionSet::MarkFileNumberUsed(uint64_t number) {
  // only called during recovery and repair which are single threaded, so this
  // works because there can't be concurrent calls
4979 4980
  if (next_file_number_.load(std::memory_order_relaxed) <= number) {
    next_file_number_.store(number + 1, std::memory_order_relaxed);
4981 4982
  }
}
S
Siying Dong 已提交
4983 4984 4985 4986 4987 4988 4989 4990
// Called only either from ::LogAndApply which is protected by mutex or during
// recovery which is single-threaded.
void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
  if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) {
    min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed);
  }
}

4991 4992 4993
Status VersionSet::WriteCurrentStateToManifest(
    const std::unordered_map<uint32_t, MutableCFState>& curr_state,
    log::Writer* log) {
J
jorlow@chromium.org 已提交
4994
  // TODO: Break up into multiple records to reduce memory usage on recovery?
4995

I
Igor Canadi 已提交
4996 4997
  // WARNING: This method doesn't hold a mutex!!

I
Igor Canadi 已提交
4998 4999
  // This is done without DB mutex lock held, but only within single-threaded
  // LogAndApply. Column family manipulations can only happen within LogAndApply
I
Igor Canadi 已提交
5000
  // (the same single thread), so we're safe to iterate.
5001 5002 5003 5004 5005

  if (db_options_->write_dbid_to_manifest) {
    VersionEdit edit_for_db_id;
    assert(!db_id_.empty());
    edit_for_db_id.SetDBId(db_id_);
5006
    edit_for_db_id.SetStateUponManifestSwitch(true);
5007 5008 5009 5010 5011 5012 5013 5014 5015 5016 5017
    std::string db_id_record;
    if (!edit_for_db_id.EncodeTo(&db_id_record)) {
      return Status::Corruption("Unable to Encode VersionEdit:" +
                                edit_for_db_id.DebugString(true));
    }
    Status add_record = log->AddRecord(db_id_record);
    if (!add_record.ok()) {
      return add_record;
    }
  }

I
Igor Canadi 已提交
5018
  for (auto cfd : *column_family_set_) {
5019 5020 5021
    if (cfd->IsDropped()) {
      continue;
    }
5022
    assert(cfd->initialized());
5023 5024 5025
    {
      // Store column family info
      VersionEdit edit;
5026
      if (cfd->GetID() != 0) {
5027 5028
        // default column family is always there,
        // no need to explicitly write it
5029 5030
        edit.AddColumnFamily(cfd->GetName());
        edit.SetColumnFamily(cfd->GetID());
I
Igor Canadi 已提交
5031
      }
5032
      edit.SetStateUponManifestSwitch(true);
I
Igor Canadi 已提交
5033 5034 5035
      edit.SetComparatorName(
          cfd->internal_comparator().user_comparator()->Name());
      std::string record;
5036 5037 5038 5039
      if (!edit.EncodeTo(&record)) {
        return Status::Corruption(
            "Unable to Encode VersionEdit:" + edit.DebugString(true));
      }
I
Igor Canadi 已提交
5040 5041 5042
      Status s = log->AddRecord(record);
      if (!s.ok()) {
        return s;
5043
      }
5044
    }
5045

5046 5047 5048
    {
      // Save files
      VersionEdit edit;
5049
      edit.SetColumnFamily(cfd->GetID());
5050
      edit.SetStateUponManifestSwitch(true);
5051

I
Igor Canadi 已提交
5052
      for (int level = 0; level < cfd->NumberLevels(); level++) {
S
sdong 已提交
5053 5054
        for (const auto& f :
             cfd->current()->storage_info()->LevelFiles(level)) {
5055 5056
          edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
                       f->fd.GetFileSize(), f->smallest, f->largest,
5057
                       f->fd.smallest_seqno, f->fd.largest_seqno,
5058
                       f->marked_for_compaction, f->oldest_blob_file_number,
5059 5060
                       f->oldest_ancester_time, f->file_creation_time,
                       f->file_checksum, f->file_checksum_func_name);
5061 5062
        }
      }
5063 5064 5065 5066
      const auto iter = curr_state.find(cfd->GetID());
      assert(iter != curr_state.end());
      uint64_t log_number = iter->second.log_number;
      edit.SetLogNumber(log_number);
5067
      std::string record;
5068 5069 5070 5071
      if (!edit.EncodeTo(&record)) {
        return Status::Corruption(
            "Unable to Encode VersionEdit:" + edit.DebugString(true));
      }
5072 5073 5074
      Status s = log->AddRecord(record);
      if (!s.ok()) {
        return s;
5075
      }
5076 5077
    }
  }
5078 5079 5080 5081 5082 5083 5084 5085 5086 5087
  VersionEdit end_flag;
  end_flag.SetStateUponManifestSwitch(true);
  end_flag.SetManifestSwitched(true);
  std::string end_record;
  if (!end_flag.EncodeTo(&end_record)) {
    return Status::Corruption("Unable to Encode VersionEdit:" +
                              end_flag.DebugString(true));
  }
  Status s_end_record = log->AddRecord(end_record);
  return  s_end_record;
J
jorlow@chromium.org 已提交
5088 5089
}

5090 5091 5092 5093 5094 5095
// TODO(aekmekji): in CompactionJob::GenSubcompactionBoundaries(), this
// function is called repeatedly with consecutive pairs of slices. For example
// if the slice list is [a, b, c, d] this function is called with arguments
// (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where
// we avoid doing binary search for the keys b and c twice and instead somehow
// maintain state of where they first appear in the files.
5096 5097
uint64_t VersionSet::ApproximateSize(const SizeApproximationOptions& options,
                                     Version* v, const Slice& start,
5098
                                     const Slice& end, int start_level,
5099
                                     int end_level, TableReaderCaller caller) {
5100 5101
  const auto& icmp = v->cfd_->internal_comparator();

5102
  // pre-condition
5103
  assert(icmp.Compare(start, end) <= 0);
5104

5105
  uint64_t total_full_size = 0;
S
sdong 已提交
5106
  const auto* vstorage = v->storage_info();
5107 5108 5109
  const int num_non_empty_levels = vstorage->num_non_empty_levels();
  end_level = (end_level == -1) ? num_non_empty_levels
                                : std::min(end_level, num_non_empty_levels);
5110

5111 5112
  assert(start_level <= end_level);

5113 5114 5115 5116 5117 5118 5119 5120 5121 5122 5123 5124 5125 5126 5127 5128 5129 5130 5131 5132
  // Outline of the optimization that uses options.files_size_error_margin.
  // When approximating the files total size that is used to store a keys range,
  // we first sum up the sizes of the files that fully fall into the range.
  // Then we sum up the sizes of all the files that may intersect with the range
  // (this includes all files in L0 as well). Then, if total_intersecting_size
  // is smaller than total_full_size * options.files_size_error_margin - we can
  // infer that the intersecting files have a sufficiently negligible
  // contribution to the total size, and we can approximate the storage required
  // for the keys in range as just half of the intersecting_files_size.
  // E.g., if the value of files_size_error_margin is 0.1, then the error of the
  // approximation is limited to only ~10% of the total size of files that fully
  // fall into the keys range. In such case, this helps to avoid a costly
  // process of binary searching the intersecting files that is required only
  // for a more precise calculation of the total size.

  autovector<FdWithKeyRange*, 32> first_files;
  autovector<FdWithKeyRange*, 16> last_files;

  // scan all the levels
  for (int level = start_level; level < end_level; ++level) {
5133
    const LevelFilesBrief& files_brief = vstorage->LevelFilesBrief(level);
5134
    if (files_brief.num_files == 0) {
5135 5136 5137 5138
      // empty level, skip exploration
      continue;
    }

5139 5140 5141 5142 5143 5144 5145
    if (level == 0) {
      // level 0 files are not in sorted order, we need to iterate through
      // the list to compute the total bytes that require scanning,
      // so handle the case explicitly (similarly to first_files case)
      for (size_t i = 0; i < files_brief.num_files; i++) {
        first_files.push_back(&files_brief.files[i]);
      }
5146 5147 5148 5149 5150 5151
      continue;
    }

    assert(level > 0);
    assert(files_brief.num_files > 0);

5152 5153 5154 5155 5156
    // identify the file position for start key
    const int idx_start =
        FindFileInRange(icmp, files_brief, start, 0,
                        static_cast<uint32_t>(files_brief.num_files - 1));
    assert(static_cast<size_t>(idx_start) < files_brief.num_files);
5157

5158 5159 5160 5161 5162 5163 5164 5165 5166
    // identify the file position for end key
    int idx_end = idx_start;
    if (icmp.Compare(files_brief.files[idx_end].largest_key, end) < 0) {
      idx_end =
          FindFileInRange(icmp, files_brief, end, idx_start,
                          static_cast<uint32_t>(files_brief.num_files - 1));
    }
    assert(idx_end >= idx_start &&
           static_cast<size_t>(idx_end) < files_brief.num_files);
5167

5168 5169 5170 5171 5172 5173 5174 5175
    // scan all files from the starting index to the ending index
    // (inferred from the sorted order)

    // first scan all the intermediate full files (excluding first and last)
    for (int i = idx_start + 1; i < idx_end; ++i) {
      uint64_t file_size = files_brief.files[i].fd.GetFileSize();
      // The entire file falls into the range, so we can just take its size.
      assert(file_size ==
5176
             ApproximateSize(v, files_brief.files[i], start, end, caller));
5177 5178 5179 5180 5181 5182 5183 5184 5185
      total_full_size += file_size;
    }

    // save the first and the last files (which may be the same file), so we
    // can scan them later.
    first_files.push_back(&files_brief.files[idx_start]);
    if (idx_start != idx_end) {
      // we need to estimate size for both files, only if they are different
      last_files.push_back(&files_brief.files[idx_end]);
J
jorlow@chromium.org 已提交
5186 5187
    }
  }
5188

5189 5190 5191 5192 5193 5194 5195 5196
  // The sum of all file sizes that intersect the [start, end] keys range.
  uint64_t total_intersecting_size = 0;
  for (const auto* file_ptr : first_files) {
    total_intersecting_size += file_ptr->fd.GetFileSize();
  }
  for (const auto* file_ptr : last_files) {
    total_intersecting_size += file_ptr->fd.GetFileSize();
  }
5197

5198 5199 5200 5201 5202 5203 5204 5205 5206 5207 5208 5209
  // Now scan all the first & last files at each level, and estimate their size.
  // If the total_intersecting_size is less than X% of the total_full_size - we
  // want to approximate the result in order to avoid the costly binary search
  // inside ApproximateSize. We use half of file size as an approximation below.

  const double margin = options.files_size_error_margin;
  if (margin > 0 && total_intersecting_size <
                        static_cast<uint64_t>(total_full_size * margin)) {
    total_full_size += total_intersecting_size / 2;
  } else {
    // Estimate for all the first files, at each level
    for (const auto file_ptr : first_files) {
5210
      total_full_size += ApproximateSize(v, *file_ptr, start, end, caller);
5211 5212 5213 5214
    }

    // Estimate for all the last files, at each level
    for (const auto file_ptr : last_files) {
5215 5216 5217
      // We could use ApproximateSize here, but calling ApproximateOffsetOf
      // directly is just more efficient.
      total_full_size += ApproximateOffsetOf(v, *file_ptr, end, caller);
5218
    }
5219
  }
5220 5221

  return total_full_size;
5222 5223
}

uint64_t VersionSet::ApproximateOffsetOf(Version* v, const FdWithKeyRange& f,
                                         const Slice& key,
                                         TableReaderCaller caller) {
  // pre-condition
  assert(v);
  const auto& icmp = v->cfd_->internal_comparator();

  uint64_t result = 0;
  if (icmp.Compare(f.largest_key, key) <= 0) {
    // Entire file is before "key", so just add the file size
    result = f.fd.GetFileSize();
  } else if (icmp.Compare(f.smallest_key, key) > 0) {
    // Entire file is after "key", so ignore
    result = 0;
  } else {
    // "key" falls in the range for this table.  Add the
    // approximate offset of "key" within the table.
    TableCache* table_cache = v->cfd_->table_cache();
    if (table_cache != nullptr) {
      result = table_cache->ApproximateOffsetOf(
          key, f.file_metadata->fd, caller, icmp,
          v->GetMutableCFOptions().prefix_extractor.get());
    }
  }
  return result;
}

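// Returns an approximation of the number of bytes in file `f` that fall
// within the key range [start, end]. Files that do not overlap the range
// contribute zero; partial overlaps are estimated from the approximate
// offsets of the range boundaries within the file.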
uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
                                     const Slice& start, const Slice& end,
                                     TableReaderCaller caller) {
  // pre-condition
  assert(v);
  const auto& icmp = v->cfd_->internal_comparator();
  assert(icmp.Compare(start, end) <= 0);

  if (icmp.Compare(f.largest_key, start) <= 0 ||
      icmp.Compare(f.smallest_key, end) > 0) {
    // Entire file is before or after the start/end keys range
    return 0;
  }

  if (icmp.Compare(f.smallest_key, start) >= 0) {
    // Start of the range is before the file start - approximate by end offset
    return ApproximateOffsetOf(v, f, end, caller);
  }

  if (icmp.Compare(f.largest_key, end) < 0) {
    // End of the range is after the file end - approximate by subtracting
    // start offset from the file size
    uint64_t start_offset = ApproximateOffsetOf(v, f, start, caller);
    assert(f.fd.GetFileSize() >= start_offset);
    return f.fd.GetFileSize() - start_offset;
  }

  // The interval falls entirely in the range for this file.
  TableCache* table_cache = v->cfd_->table_cache();
  if (table_cache == nullptr) {
    return 0;
  }
  return table_cache->ApproximateSize(
      start, end, f.file_metadata->fd, caller, icmp,
      v->GetMutableCFOptions().prefix_extractor.get());
}

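// Appends the file descriptors of all SST files referenced by any live
// Version of any initialized column family to `live_list`. The required
// space is counted first so the vector is grown only once.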
void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
  // pre-calculate space requirement
  int64_t total_files = 0;
  for (auto cfd : *column_family_set_) {
    if (!cfd->initialized()) {
      continue;
    }
    Version* dummy_versions = cfd->dummy_versions();
    for (Version* v = dummy_versions->next_; v != dummy_versions;
         v = v->next_) {
      const auto* vstorage = v->storage_info();
      for (int level = 0; level < vstorage->num_levels(); level++) {
        total_files += vstorage->LevelFiles(level).size();
      }
    }
  }

  // just one time extension to the right size
  live_list->reserve(live_list->size() + static_cast<size_t>(total_files));

  for (auto cfd : *column_family_set_) {
    if (!cfd->initialized()) {
      continue;
    }
    auto* current = cfd->current();
    bool found_current = false;
    Version* dummy_versions = cfd->dummy_versions();
    for (Version* v = dummy_versions->next_; v != dummy_versions;
         v = v->next_) {
      v->AddLiveFiles(live_list);
      if (v == current) {
        found_current = true;
      }
    }
    if (!found_current && current != nullptr) {
      // Should never happen unless it is a bug.
      assert(false);
      current->AddLiveFiles(live_list);
    }
  }
}

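// Builds the iterator a compaction reads its input from: every level-0 input
// file gets its own table iterator, each higher input level is covered by a
// single concatenating LevelIterator, and all of them are combined into one
// merging iterator.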
InternalIterator* VersionSet::MakeInputIterator(
    const Compaction* c, RangeDelAggregator* range_del_agg,
    const FileOptions& file_options_compactions) {
  auto cfd = c->column_family_data();
  ReadOptions read_options;
  read_options.verify_checksums = true;
  read_options.fill_cache = false;
  // Compaction iterators shouldn't be confined to a single prefix.
  // Compactions use Seek() for
  // (a) concurrent compactions,
  // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
  read_options.total_order_seek = true;

  // Level-0 files have to be merged together.  For other levels,
  // we will make a concatenating iterator per level.
  // TODO(opt): use concatenating iterator for level-0 if there is no overlap
  const size_t space = (c->level() == 0 ? c->input_levels(0)->num_files +
                                              c->num_input_levels() - 1
                                        : c->num_input_levels());
  InternalIterator** list = new InternalIterator* [space];
  size_t num = 0;
  for (size_t which = 0; which < c->num_input_levels(); which++) {
    if (c->input_levels(which)->num_files != 0) {
      if (c->level(which) == 0) {
        const LevelFilesBrief* flevel = c->input_levels(which);
        for (size_t i = 0; i < flevel->num_files; i++) {
          list[num++] = cfd->table_cache()->NewIterator(
              read_options, file_options_compactions,
              cfd->internal_comparator(),
              *flevel->files[i].file_metadata, range_del_agg,
              c->mutable_cf_options()->prefix_extractor.get(),
              /*table_reader_ptr=*/nullptr,
              /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
              /*arena=*/nullptr,
              /*skip_filters=*/false, /*level=*/static_cast<int>(which),
              /*smallest_compaction_key=*/nullptr,
              /*largest_compaction_key=*/nullptr);
        }
      } else {
        // Create concatenating iterator for the files from this level
        list[num++] = new LevelIterator(
            cfd->table_cache(), read_options, file_options_compactions,
            cfd->internal_comparator(), c->input_levels(which),
            c->mutable_cf_options()->prefix_extractor.get(),
            /*should_sample=*/false,
            /*no per level latency histogram=*/nullptr,
            TableReaderCaller::kCompaction, /*skip_filters=*/false,
            /*level=*/static_cast<int>(which), range_del_agg,
            c->boundaries(which));
      }
    }
  }
  assert(num <= space);
  InternalIterator* result =
      NewMergingIterator(&c->column_family_data()->internal_comparator(), list,
                         static_cast<int>(num));
  delete[] list;
  return result;
}

// verify that the files listed in this compaction are present
// in the current version
bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
#ifndef NDEBUG
  Version* version = c->column_family_data()->current();
  const VersionStorageInfo* vstorage = version->storage_info();
  if (c->input_version() != version) {
    ROCKS_LOG_INFO(
        db_options_->info_log,
        "[%s] compaction output being applied to a different base version from"
        " input version",
        c->column_family_data()->GetName().c_str());

    if (vstorage->compaction_style_ == kCompactionStyleLevel &&
        c->start_level() == 0 && c->num_input_levels() > 2U) {
      // We are doing an L0->base_level compaction. The assumption is that if
      // the base level is not L1, then levels L1 through base_level - 1 are
      // empty. This is ensured by allowing only one compaction from L0 at a
      // time in level-based compaction, so during that time no other
      // compaction/flush can put files into those levels.
      for (int l = c->start_level() + 1; l < c->output_level(); l++) {
        if (vstorage->NumLevelFiles(l) != 0) {
          return false;
        }
      }
    }
  }

  for (size_t input = 0; input < c->num_input_levels(); ++input) {
    int level = c->level(input);
    for (size_t i = 0; i < c->num_input_files(input); ++i) {
      uint64_t number = c->input(input, i)->fd.GetNumber();
      bool found = false;
      for (size_t j = 0; j < vstorage->files_[level].size(); j++) {
        FileMetaData* f = vstorage->files_[level][j];
        if (f->fd.GetNumber() == number) {
          found = true;
          break;
        }
      }
      if (!found) {
        return false;  // input files do not exist in the current version
      }
    }
  }
#else
  (void)c;
#endif
  return true;  // everything good
}

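// Looks up the table file with the given number in the current Version of
// every column family and returns its metadata, level, and owning column
// family; returns NotFound if no level of any column family contains it.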
Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
                                      FileMetaData** meta,
                                      ColumnFamilyData** cfd) {
  for (auto cfd_iter : *column_family_set_) {
    if (!cfd_iter->initialized()) {
      continue;
    }
    Version* version = cfd_iter->current();
    const auto* vstorage = version->storage_info();
    for (int level = 0; level < vstorage->num_levels(); level++) {
      for (const auto& file : vstorage->LevelFiles(level)) {
        if (file->fd.GetNumber() == number) {
          *meta = file;
          *filelevel = level;
          *cfd = cfd_iter;
          return Status::OK();
        }
      }
    }
  }
  return Status::NotFound("File not present in any level");
}

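// Collects per-file metadata (path, level, size, key range, sequence numbers,
// read statistics, checksum information) for every SST file referenced by the
// current Version of each live column family.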
void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
  for (auto cfd : *column_family_set_) {
    if (cfd->IsDropped() || !cfd->initialized()) {
      continue;
    }
    for (int level = 0; level < cfd->NumberLevels(); level++) {
      for (const auto& file :
           cfd->current()->storage_info()->LevelFiles(level)) {
        LiveFileMetaData filemetadata;
        filemetadata.column_family_name = cfd->GetName();
        uint32_t path_id = file->fd.GetPathId();
        if (path_id < cfd->ioptions()->cf_paths.size()) {
          filemetadata.db_path = cfd->ioptions()->cf_paths[path_id].path;
        } else {
          assert(!cfd->ioptions()->cf_paths.empty());
          filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
        }
        const uint64_t file_number = file->fd.GetNumber();
        filemetadata.name = MakeTableFileName("", file_number);
        filemetadata.file_number = file_number;
        filemetadata.level = level;
        filemetadata.size = static_cast<size_t>(file->fd.GetFileSize());
        filemetadata.smallestkey = file->smallest.user_key().ToString();
        filemetadata.largestkey = file->largest.user_key().ToString();
        filemetadata.smallest_seqno = file->fd.smallest_seqno;
        filemetadata.largest_seqno = file->fd.largest_seqno;
        filemetadata.num_reads_sampled = file->stats.num_reads_sampled.load(
            std::memory_order_relaxed);
        filemetadata.being_compacted = file->being_compacted;
        filemetadata.num_entries = file->num_entries;
        filemetadata.num_deletions = file->num_deletions;
        filemetadata.oldest_blob_file_number = file->oldest_blob_file_number;
        filemetadata.file_checksum = file->file_checksum;
        filemetadata.file_checksum_func_name = file->file_checksum_func_name;
        metadata->push_back(filemetadata);
      }
    }
  }
}

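// Hands obsolete MANIFEST names to the caller and moves obsolete table files
// whose numbers are below `min_pending_output` into `files`; entries with
// higher numbers stay queued until a later call.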
void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
                                  std::vector<std::string>* manifest_filenames,
                                  uint64_t min_pending_output) {
  assert(manifest_filenames->empty());
  obsolete_manifests_.swap(*manifest_filenames);
  std::vector<ObsoleteFileInfo> pending_files;
  for (auto& f : obsolete_files_) {
    if (f.metadata->fd.GetNumber() < min_pending_output) {
      files->push_back(std::move(f));
    } else {
      pending_files.push_back(std::move(f));
    }
  }
  obsolete_files_.swap(pending_files);
}

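// Creates the in-memory state for the column family described by `edit`: a
// dummy Version list head, an initial Version with its level size targets
// filled in, and a new memtable seeded with the latest mutable options and
// the current last sequence number.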
ColumnFamilyData* VersionSet::CreateColumnFamily(
    const ColumnFamilyOptions& cf_options, VersionEdit* edit) {
  assert(edit->is_column_family_add_);

  MutableCFOptions dummy_cf_options;
  Version* dummy_versions =
      new Version(nullptr, this, file_options_, dummy_cf_options);
  // Ref() dummy version once so that later we can call Unref() to delete it
  // by avoiding calling "delete" explicitly (~Version is private)
  dummy_versions->Ref();
  auto new_cfd = column_family_set_->CreateColumnFamily(
      edit->column_family_name_, edit->column_family_, dummy_versions,
      cf_options);

  Version* v = new Version(new_cfd, this, file_options_,
                           *new_cfd->GetLatestMutableCFOptions(),
                           current_version_number_++);

  // Fill level target base information.
  v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(),
                                        *new_cfd->GetLatestMutableCFOptions());
  AppendVersion(new_cfd, v);
  // GetLatestMutableCFOptions() is safe here without mutex since the
  // cfd is not available to client
  new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(),
                             LastSequence());
  new_cfd->SetLogNumber(edit->log_number_);
  return new_cfd;
}

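// Counts the Versions in the circular list anchored at `dummy_versions`,
// excluding the dummy head itself.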
uint64_t VersionSet::GetNumLiveVersions(Version* dummy_versions) {
  uint64_t count = 0;
  for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
    count++;
  }
  return count;
}

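// Sums the sizes of all SST files referenced by the Versions in this list,
// counting each file only once even if several Versions reference it
// (de-duplicated by packed file number and path id).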
uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
  std::unordered_set<uint64_t> unique_files;
  uint64_t total_files_size = 0;
  for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
    VersionStorageInfo* storage_info = v->storage_info();
    for (int level = 0; level < storage_info->num_levels_; level++) {
      for (const auto& file_meta : storage_info->LevelFiles(level)) {
        if (unique_files.find(file_meta->fd.packed_number_and_path_id) ==
            unique_files.end()) {
          unique_files.insert(file_meta->fd.packed_number_and_path_id);
          total_files_size += file_meta->fd.GetFileSize();
        }
      }
    }
  }
  return total_files_size;
}

ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname,
                                       const ImmutableDBOptions* _db_options,
                                       const FileOptions& _file_options,
                                       Cache* table_cache,
                                       WriteBufferManager* write_buffer_manager,
                                       WriteController* write_controller)
    : VersionSet(dbname, _db_options, _file_options, table_cache,
                 write_buffer_manager, write_controller,
                 /*block_cache_tracer=*/nullptr),
      number_of_edits_to_skip_(0) {}

ReactiveVersionSet::~ReactiveVersionSet() {}

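// Recovery path of a secondary (tailing) instance: replay the primary's
// current MANIFEST once, build Versions for the opened column families, and
// keep the manifest reader around for subsequent ReadAndApply() calls.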
Status ReactiveVersionSet::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
    std::unique_ptr<log::Reader::Reporter>* manifest_reporter,
    std::unique_ptr<Status>* manifest_reader_status) {
  assert(manifest_reader != nullptr);
  assert(manifest_reporter != nullptr);
  assert(manifest_reader_status != nullptr);

  std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
  for (const auto& cf : column_families) {
    cf_name_to_options.insert({cf.name, cf.options});
  }

  // add default column family
  auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
  if (default_cf_iter == cf_name_to_options.end()) {
    return Status::InvalidArgument("Default column family not specified");
  }
  VersionEdit default_cf_edit;
  default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
  default_cf_edit.SetColumnFamily(0);
  ColumnFamilyData* default_cfd =
      CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
  // In recovery, nobody else can access it, so it's fine to set it to be
  // initialized earlier.
  default_cfd->set_initialized();
  std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
      builders;
  std::unordered_map<int, std::string> column_families_not_found;
  builders.insert(
      std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
                            new BaseReferencedVersionBuilder(default_cfd))));

  manifest_reader_status->reset(new Status());
  manifest_reporter->reset(new LogReporter());
  static_cast<LogReporter*>(manifest_reporter->get())->status =
      manifest_reader_status->get();
  Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
  log::Reader* reader = manifest_reader->get();

  int retry = 0;
  VersionEdit version_edit;
  while (s.ok() && retry < 1) {
    assert(reader != nullptr);
    Slice record;
    std::string scratch;
    s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options,
                       column_families_not_found, builders, &version_edit);
    if (s.ok()) {
      bool enough = version_edit.has_next_file_number_ &&
                    version_edit.has_log_number_ &&
                    version_edit.has_last_sequence_;
      if (enough) {
        for (const auto& cf : column_families) {
          auto cfd = column_family_set_->GetColumnFamily(cf.name);
          if (cfd == nullptr) {
            enough = false;
            break;
          }
        }
      }
      if (enough) {
        for (const auto& cf : column_families) {
          auto cfd = column_family_set_->GetColumnFamily(cf.name);
          assert(cfd != nullptr);
          if (!cfd->IsDropped()) {
            auto builder_iter = builders.find(cfd->GetID());
            assert(builder_iter != builders.end());
            auto builder = builder_iter->second->version_builder();
            assert(builder != nullptr);
            s = builder->LoadTableHandlers(
                cfd->internal_stats(), db_options_->max_file_opening_threads,
                false /* prefetch_index_and_filter_in_cache */,
                true /* is_initial_load */,
                cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
            if (!s.ok()) {
              enough = false;
              if (s.IsPathNotFound()) {
                s = Status::OK();
              }
              break;
            }
          }
        }
      }
      if (enough) {
        break;
      }
    }
    ++retry;
  }

  if (s.ok()) {
    if (!version_edit.has_prev_log_number_) {
      version_edit.prev_log_number_ = 0;
    }
    column_family_set_->UpdateMaxColumnFamily(version_edit.max_column_family_);

    MarkMinLogNumberToKeep2PC(version_edit.min_log_number_to_keep_);
    MarkFileNumberUsed(version_edit.prev_log_number_);
    MarkFileNumberUsed(version_edit.log_number_);

    for (auto cfd : *column_family_set_) {
      assert(builders.count(cfd->GetID()) > 0);
      auto builder = builders[cfd->GetID()]->version_builder();
      if (!builder->CheckConsistencyForNumLevels()) {
        s = Status::InvalidArgument(
            "db has more levels than options.num_levels");
        break;
      }
    }
  }

  if (s.ok()) {
    for (auto cfd : *column_family_set_) {
      if (cfd->IsDropped()) {
        continue;
      }
      assert(cfd->initialized());
      auto builders_iter = builders.find(cfd->GetID());
      assert(builders_iter != builders.end());
      auto* builder = builders_iter->second->version_builder();

      Version* v = new Version(cfd, this, file_options_,
                               *cfd->GetLatestMutableCFOptions(),
                               current_version_number_++);
      builder->SaveTo(v->storage_info());

      // Install recovered version
      v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
                      !(db_options_->skip_stats_update_on_db_open));
      AppendVersion(cfd, v);
    }
    next_file_number_.store(version_edit.next_file_number_ + 1);
    last_allocated_sequence_ = version_edit.last_sequence_;
    last_published_sequence_ = version_edit.last_sequence_;
    last_sequence_ = version_edit.last_sequence_;
    prev_log_number_ = version_edit.prev_log_number_;
    for (auto cfd : *column_family_set_) {
      if (cfd->IsDropped()) {
        continue;
      }
      ROCKS_LOG_INFO(db_options_->info_log,
                     "Column family [%s] (ID %u), log number is %" PRIu64 "\n",
                     cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
    }
  }
  return s;
}

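// Tails the primary's MANIFEST: reads any new records, applies the decoded
// VersionEdits (buffering edits that belong to an atomic group until the
// group is complete), switches to a newer MANIFEST when the primary rolls it
// over, and reports the column families whose Version changed via
// `cfds_changed`.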
Status ReactiveVersionSet::ReadAndApply(
    InstrumentedMutex* mu,
    std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
    std::unordered_set<ColumnFamilyData*>* cfds_changed) {
  assert(manifest_reader != nullptr);
  assert(cfds_changed != nullptr);
  mu->AssertHeld();

  Status s;
  uint64_t applied_edits = 0;
  while (s.ok()) {
    Slice record;
    std::string scratch;
    log::Reader* reader = manifest_reader->get();
    std::string old_manifest_path = reader->file()->file_name();
    while (reader->ReadRecord(&record, &scratch)) {
      VersionEdit edit;
      s = edit.DecodeFrom(record);
      if (!s.ok()) {
        break;
      }

      // Skip the first VersionEdits of each MANIFEST generated by
      // VersionSet::WriteCurrentStatetoManifest.
      if (number_of_edits_to_skip_ > 0) {
        ColumnFamilyData* cfd =
            column_family_set_->GetColumnFamily(edit.column_family_);
        if (cfd != nullptr && !cfd->IsDropped()) {
          --number_of_edits_to_skip_;
        }
        continue;
      }

      s = read_buffer_.AddEdit(&edit);
      if (!s.ok()) {
        break;
      }
      VersionEdit temp_edit;
      if (edit.is_in_atomic_group_) {
        if (read_buffer_.IsFull()) {
          // Apply edits in an atomic group when we have read all edits in the
          // group.
          for (auto& e : read_buffer_.replay_buffer()) {
            s = ApplyOneVersionEditToBuilder(e, cfds_changed, &temp_edit);
            if (!s.ok()) {
              break;
            }
            applied_edits++;
          }
          if (!s.ok()) {
            break;
          }
          read_buffer_.Clear();
        }
      } else {
        // Apply a normal edit immediately.
        s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit);
        if (s.ok()) {
          applied_edits++;
        }
      }
    }
    if (!s.ok()) {
      // Clear the buffer if we fail to decode/apply an edit.
      read_buffer_.Clear();
    }
    // It's possible that:
    // 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
    // 2) we have finished reading the current MANIFEST.
    // 3) we have encountered an IOError reading the current MANIFEST.
    // We need to look for the next MANIFEST and start from there. If we cannot
    // find the next MANIFEST, we should exit the loop.
    s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
    reader = manifest_reader->get();
    if (s.ok()) {
      if (reader->file()->file_name() == old_manifest_path) {
        // Still processing the same MANIFEST, thus no need to continue this
        // loop since no record is available if we have reached here.
        break;
      } else {
        // We have switched to a new MANIFEST whose first records have been
        // generated by VersionSet::WriteCurrentStatetoManifest. Since the
        // secondary instance has already finished recovering upon start, there
        // is no need for the secondary to process these records. Actually, if
        // the secondary were to replay these records, the secondary may end up
        // adding the same SST files AGAIN to each column family, causing
        // consistency checks done by VersionBuilder to fail. Therefore, we
        // record the number of records to skip at the beginning of the new
        // MANIFEST and ignore them.
        number_of_edits_to_skip_ = 0;
        for (auto* cfd : *column_family_set_) {
          if (cfd->IsDropped()) {
            continue;
          }
          // Increase number_of_edits_to_skip by 2 because
          // WriteCurrentStatetoManifest() writes 2 version edits for each
          // column family at the beginning of the newly-generated MANIFEST.
          // TODO(yanqin) remove hard-coded value.
          if (db_options_->write_dbid_to_manifest) {
            number_of_edits_to_skip_ += 3;
          } else {
            number_of_edits_to_skip_ += 2;
          }
        }
      }
    }
  }

  if (s.ok()) {
    for (auto cfd : *column_family_set_) {
      auto builder_iter = active_version_builders_.find(cfd->GetID());
      if (builder_iter == active_version_builders_.end()) {
        continue;
      }
      auto builder = builder_iter->second->version_builder();
      if (!builder->CheckConsistencyForNumLevels()) {
        s = Status::InvalidArgument(
            "db has more levels than options.num_levels");
        break;
      }
    }
  }
  TEST_SYNC_POINT_CALLBACK("ReactiveVersionSet::ReadAndApply:AppliedEdits",
                           &applied_edits);
  return s;
}

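// Applies a single VersionEdit read from the primary's MANIFEST to the
// corresponding column family's version builder, installs a new Version when
// the table handlers load successfully, and accumulates file-number and
// sequence-number bookkeeping into `version_edit`.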
Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
    VersionEdit& edit, std::unordered_set<ColumnFamilyData*>* cfds_changed,
    VersionEdit* version_edit) {
  ColumnFamilyData* cfd =
      column_family_set_->GetColumnFamily(edit.column_family_);

  // If we cannot find this column family in our column family set, then it
  // may be a new column family created by the primary after the secondary
  // starts. It is also possible that the secondary instance opens only a subset
  // of column families. Ignore it for now.
  if (nullptr == cfd) {
    return Status::OK();
  }
  if (active_version_builders_.find(edit.column_family_) ==
          active_version_builders_.end() &&
      !cfd->IsDropped()) {
    std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(
        new BaseReferencedVersionBuilder(cfd));
    active_version_builders_.insert(
        std::make_pair(edit.column_family_, std::move(builder_guard)));
  }

  auto builder_iter = active_version_builders_.find(edit.column_family_);
  assert(builder_iter != active_version_builders_.end());
  auto builder = builder_iter->second->version_builder();
  assert(builder != nullptr);

  if (edit.is_column_family_add_) {
    // TODO (yanqin) for now the secondary ignores column families created
    // after Open. This also simplifies handling of switching to a new MANIFEST
    // and processing the snapshot of the system at the beginning of the
    // MANIFEST.
  } else if (edit.is_column_family_drop_) {
    // Drop the column family by setting it to be 'dropped' without destroying
    // the column family handle.
    // TODO (haoyu) figure out how to handle column family drop for
    // secondary instance. (Is it possible that the ref count for cfd is 0 but
    // the ref count for its versions is higher than 0?)
    cfd->SetDropped();
    if (cfd->UnrefAndTryDelete()) {
      cfd = nullptr;
    }
    active_version_builders_.erase(builder_iter);
  } else {
    Status s = builder->Apply(&edit);
    if (!s.ok()) {
      return s;
    }
  }
  Status s = ExtractInfoFromVersionEdit(cfd, edit, version_edit);
  if (!s.ok()) {
    return s;
  }

  if (cfd != nullptr && !cfd->IsDropped()) {
    s = builder->LoadTableHandlers(
        cfd->internal_stats(), db_options_->max_file_opening_threads,
        false /* prefetch_index_and_filter_in_cache */,
        false /* is_initial_load */,
        cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
    TEST_SYNC_POINT_CALLBACK(
        "ReactiveVersionSet::ApplyOneVersionEditToBuilder:"
        "AfterLoadTableHandlers",
        &s);

    if (s.ok()) {
      auto version = new Version(cfd, this, file_options_,
                                 *cfd->GetLatestMutableCFOptions(),
                                 current_version_number_++);
      builder->SaveTo(version->storage_info());
      version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
      AppendVersion(cfd, version);
      active_version_builders_.erase(builder_iter);
      if (cfds_changed->count(cfd) == 0) {
        cfds_changed->insert(cfd);
      }
    } else if (s.IsPathNotFound()) {
      s = Status::OK();
    }
    // Some other error has occurred during LoadTableHandlers.
  }

  if (version_edit->HasNextFile()) {
    next_file_number_.store(version_edit->next_file_number_ + 1);
  }
  if (version_edit->has_last_sequence_) {
    last_allocated_sequence_ = version_edit->last_sequence_;
    last_published_sequence_ = version_edit->last_sequence_;
    last_sequence_ = version_edit->last_sequence_;
  }
  if (version_edit->has_prev_log_number_) {
    prev_log_number_ = version_edit->prev_log_number_;
    MarkFileNumberUsed(version_edit->prev_log_number_);
  }
  if (version_edit->has_log_number_) {
    MarkFileNumberUsed(version_edit->log_number_);
  }
  column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
  MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
  return s;
}

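// Re-reads the CURRENT file and, if it names a MANIFEST different from the
// one currently being read, opens the new MANIFEST and resets
// `manifest_reader` to start from its beginning; retries while the named
// MANIFEST is not yet visible on the file system.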
Status ReactiveVersionSet::MaybeSwitchManifest(
    log::Reader::Reporter* reporter,
    std::unique_ptr<log::FragmentBufferedReader>* manifest_reader) {
  assert(manifest_reader != nullptr);
  Status s;
  do {
    std::string manifest_path;
    s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
                               &manifest_file_number_);
    std::unique_ptr<FSSequentialFile> manifest_file;
    if (s.ok()) {
      if (nullptr == manifest_reader->get() ||
          manifest_reader->get()->file()->file_name() != manifest_path) {
        TEST_SYNC_POINT(
            "ReactiveVersionSet::MaybeSwitchManifest:"
            "AfterGetCurrentManifestPath:0");
        TEST_SYNC_POINT(
            "ReactiveVersionSet::MaybeSwitchManifest:"
            "AfterGetCurrentManifestPath:1");
        s = fs_->NewSequentialFile(manifest_path,
                                   env_->OptimizeForManifestRead(file_options_),
                                   &manifest_file, nullptr);
      } else {
        // No need to switch manifest.
        break;
      }
    }
    std::unique_ptr<SequentialFileReader> manifest_file_reader;
    if (s.ok()) {
      manifest_file_reader.reset(
          new SequentialFileReader(std::move(manifest_file), manifest_path,
                                   db_options_->log_readahead_size));
      manifest_reader->reset(new log::FragmentBufferedReader(
          nullptr, std::move(manifest_file_reader), reporter,
          true /* checksum */, 0 /* log_number */));
      ROCKS_LOG_INFO(db_options_->info_log, "Switched to new manifest: %s\n",
                     manifest_path.c_str());
      // TODO (yanqin) every time we switch to a new MANIFEST, we clear the
      // active_version_builders_ map because we choose to construct the
      // versions from scratch, thanks to the first part of each MANIFEST
      // written by VersionSet::WriteCurrentStatetoManifest. This is not
      // necessary, but we choose this at present for the sake of simplicity.
      active_version_builders_.clear();
    }
  } while (s.IsPathNotFound());
  return s;
}

}  // namespace ROCKSDB_NAMESPACE