db_impl.cc 157.3 KB
Newer Older
1 2 3 4 5
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
J
jorlow@chromium.org 已提交
6 7 8 9 10 11
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_impl.h"

L
liuhuahang 已提交
12
#ifndef __STDC_FORMAT_MACROS
I
Igor Canadi 已提交
13
#define __STDC_FORMAT_MACROS
L
liuhuahang 已提交
14 15
#endif

I
Igor Canadi 已提交
16
#include <inttypes.h>
J
jorlow@chromium.org 已提交
17
#include <algorithm>
18 19
#include <climits>
#include <cstdio>
J
jorlow@chromium.org 已提交
20
#include <set>
21
#include <stdexcept>
22 23
#include <stdint.h>
#include <string>
24
#include <unordered_set>
25
#include <unordered_map>
T
Tomislav Novak 已提交
26
#include <utility>
27
#include <vector>
28

J
jorlow@chromium.org 已提交
29
#include "db/builder.h"
I
Igor Canadi 已提交
30
#include "db/flush_job.h"
31
#include "db/db_iter.h"
K
kailiu 已提交
32
#include "db/dbformat.h"
J
jorlow@chromium.org 已提交
33 34 35 36
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
K
kailiu 已提交
37
#include "db/memtable_list.h"
38
#include "db/merge_context.h"
39
#include "db/merge_helper.h"
J
jorlow@chromium.org 已提交
40
#include "db/table_cache.h"
K
kailiu 已提交
41
#include "db/table_properties_collector.h"
L
Lei Jin 已提交
42
#include "db/forward_iterator.h"
43
#include "db/transaction_log_impl.h"
J
jorlow@chromium.org 已提交
44 45
#include "db/version_set.h"
#include "db/write_batch_internal.h"
46
#include "port/port.h"
I
Igor Canadi 已提交
47
#include "rocksdb/cache.h"
48
#include "port/likely.h"
49 50 51 52
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
I
Igor Canadi 已提交
53
#include "rocksdb/version.h"
54 55
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
S
Siying Dong 已提交
56
#include "rocksdb/table.h"
J
jorlow@chromium.org 已提交
57
#include "table/block.h"
58
#include "table/block_based_table_factory.h"
J
jorlow@chromium.org 已提交
59
#include "table/merger.h"
K
kailiu 已提交
60
#include "table/table_builder.h"
J
jorlow@chromium.org 已提交
61
#include "table/two_level_iterator.h"
62
#include "util/auto_roll_logger.h"
K
kailiu 已提交
63
#include "util/autovector.h"
64
#include "util/build_version.h"
J
jorlow@chromium.org 已提交
65
#include "util/coding.h"
I
Igor Canadi 已提交
66
#include "util/hash_skiplist_rep.h"
67
#include "util/hash_linklist_rep.h"
J
jorlow@chromium.org 已提交
68
#include "util/logging.h"
H
Haobo Xu 已提交
69
#include "util/log_buffer.h"
J
jorlow@chromium.org 已提交
70
#include "util/mutexlock.h"
71
#include "util/perf_context_imp.h"
72
#include "util/iostats_context_imp.h"
73
#include "util/stop_watch.h"
74
#include "util/sync_point.h"
J
jorlow@chromium.org 已提交
75

76
namespace rocksdb {
J
jorlow@chromium.org 已提交
77

78
// Name given to the default column family.
const std::string kDefaultColumnFamilyName = "default";
79

I
Igor Canadi 已提交
80
// Defined in another translation unit; judging by its name it presumably
// writes the RocksDB build version to `log`.
void DumpRocksDBBuildVersion(Logger* log);
81

S
Stanislau Hlebik 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94 95
// Scratch state for a single write. SuperVersions and log writers queued in
// it are owned by the context and deleted by its destructor.
struct DBImpl::WriteContext {
  autovector<SuperVersion*> superversions_to_free_;
  autovector<log::Writer*> logs_to_free_;

  WriteContext() = default;

  // The context owns the raw pointers it stores, so a copy would delete each
  // of them twice. Forbid copying.
  WriteContext(const WriteContext&) = delete;
  WriteContext& operator=(const WriteContext&) = delete;

  ~WriteContext() {
    for (SuperVersion* sv : superversions_to_free_) {
      delete sv;
    }
    for (log::Writer* log : logs_to_free_) {
      delete log;
    }
  }
};

J
jorlow@chromium.org 已提交
96 97 98
struct DBImpl::CompactionState {
  Compaction* const compaction;

  // If there were two snapshots with seq numbers s1 and
  // s2 and s1 < s2, and if we find two instances of a key k1 that lie
  // entirely within s1 and s2, then the earlier version of k1 can be safely
  // deleted because that version is not visible in any snapshot.
  std::vector<SequenceNumber> existing_snapshots;

  // Files produced by compaction
  struct Output {
    uint64_t number;
    uint32_t path_id;
    uint64_t file_size;
    InternalKey smallest, largest;
    SequenceNumber smallest_seqno, largest_seqno;
  };
  std::vector<Output> outputs;
  std::list<uint64_t> allocated_file_numbers;

  // State kept for output being generated
  unique_ptr<WritableFile> outfile;
  unique_ptr<TableBuilder> builder;

  uint64_t total_bytes;

  // Returns the most recently added output. Requires !outputs.empty().
  Output* current_output() { return &outputs.back(); }

  explicit CompactionState(Compaction* c)
      : compaction(c),
        total_bytes(0) {
  }

  // Create a client visible context of this compaction, as the
  // CompactionFilter::Context type (the "V1" flavor).
  CompactionFilter::Context GetFilterContextV1() {
    CompactionFilter::Context context;
    context.is_full_compaction = compaction->IsFullCompaction();
    context.is_manual_compaction = compaction->IsManualCompaction();
    return context;
  }

  // Create a client visible context of this compaction, as a
  // CompactionFilterContext.
  CompactionFilterContext GetFilterContext() {
    CompactionFilterContext context;
    context.is_full_compaction = compaction->IsFullCompaction();
    context.is_manual_compaction = compaction->IsManualCompaction();
    return context;
  }

  std::vector<std::string> key_str_buf_;
  std::vector<std::string> existing_value_str_buf_;
  // new_value_buf_ will only be appended if a value changes
  std::vector<std::string> new_value_buf_;
  // if value_changed_buf_[i] is true
  // new_value_buf_ will add a new entry with the changed value
  std::vector<bool> value_changed_buf_;
  // to_delete_buf_[i] is true iff key_str_buf_[i] is deleted
  std::vector<bool> to_delete_buf_;

  std::vector<std::string> other_key_str_buf_;
  std::vector<std::string> other_value_str_buf_;

  // Slices pointing into the *_str_buf_ batch buffers above; those buffers
  // must stay alive (and unmodified) while these are in use.
  std::vector<Slice> combined_key_buf_;
  std::vector<Slice> combined_value_buf_;

  std::string cur_prefix_;

  // Buffers the kv-pair that will be run through compaction filter V2
  // in the future.
  void BufferKeyValueSlices(const Slice& key, const Slice& value) {
    key_str_buf_.emplace_back(key.ToString());
    existing_value_str_buf_.emplace_back(value.ToString());
  }

  // Buffers the kv-pair that will not be run through compaction filter V2
  // in the future.
  void BufferOtherKeyValueSlices(const Slice& key, const Slice& value) {
    other_key_str_buf_.emplace_back(key.ToString());
    other_value_str_buf_.emplace_back(value.ToString());
  }

  // Add a kv-pair to the combined buffer
  void AddToCombinedKeyValueSlices(const Slice& key, const Slice& value) {
    // The real strings are stored in the batch buffers
    combined_key_buf_.emplace_back(key);
    combined_value_buf_.emplace_back(value);
  }

  // Merges key_str_buf_/existing_value_str_buf_ and
  // other_key_str_buf_/other_value_str_buf_ into the combined buffers, in
  // the order defined by `comparator`. When a key compares equal in both
  // buffers, the entry from key_str_buf_ is taken first, so every iteration
  // advances exactly one cursor and the loop always terminates.
  void MergeKeyValueSliceBuffer(const InternalKeyComparator* comparator) {
    size_t i = 0;
    size_t j = 0;
    const size_t total_size = key_str_buf_.size() + other_key_str_buf_.size();
    combined_key_buf_.reserve(total_size);
    combined_value_buf_.reserve(total_size);

    while (i < key_str_buf_.size() || j < other_key_str_buf_.size()) {
      bool take_other;
      if (i >= key_str_buf_.size()) {
        take_other = true;
      } else if (j >= other_key_str_buf_.size()) {
        take_other = false;
      } else {
        take_other =
            comparator->Compare(key_str_buf_[i], other_key_str_buf_[j]) > 0;
      }

      if (take_other) {
        AddToCombinedKeyValueSlices(other_key_str_buf_[j],
                                    other_value_str_buf_[j]);
        ++j;
      } else {
        AddToCombinedKeyValueSlices(key_str_buf_[i],
                                    existing_value_str_buf_[i]);
        ++i;
      }
    }
  }

  void CleanupBatchBuffer() {
    to_delete_buf_.clear();
    key_str_buf_.clear();
    existing_value_str_buf_.clear();
    new_value_buf_.clear();
    value_changed_buf_.clear();

    to_delete_buf_.shrink_to_fit();
    key_str_buf_.shrink_to_fit();
    existing_value_str_buf_.shrink_to_fit();
    new_value_buf_.shrink_to_fit();
    value_changed_buf_.shrink_to_fit();

    other_key_str_buf_.clear();
    other_value_str_buf_.clear();
    other_key_str_buf_.shrink_to_fit();
    other_value_str_buf_.shrink_to_fit();
  }

  void CleanupMergedBuffer() {
    combined_key_buf_.clear();
    combined_value_buf_.clear();
    combined_key_buf_.shrink_to_fit();
    combined_value_buf_.shrink_to_fit();
  }
};

// Sanitizes a combined Options object by sanitizing its DB-wide part and
// its column-family part separately, then recombining them.
Options SanitizeOptions(const std::string& dbname,
                        const InternalKeyComparator* icmp,
                        const Options& src) {
  auto sanitized_db_part = SanitizeOptions(dbname, DBOptions(src));
  auto sanitized_cf_part = SanitizeOptions(icmp, ColumnFamilyOptions(src));
  return Options(sanitized_db_part, sanitized_cf_part);
}

// Returns a copy of `src` with missing or out-of-range DB-wide options
// replaced by usable values:
//  * max_open_files is clipped to [20, 1000000] unless it is -1,
//  * an info logger is created if none was given (left null on failure),
//  * bytes_per_sync defaults to 1MB when no rate limiter is set,
//  * wal_dir defaults to dbname and loses one trailing '/',
//  * db_paths defaults to a single entry for dbname with no size limit.
DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
  DBOptions result = src;

  // max_open_files == -1 means an "infinite" number of open files; any other
  // value is clipped to a sane range.
  if (result.max_open_files != -1) {
    ClipToRange(&result.max_open_files, 20, 1000000);
  }

  if (result.info_log == nullptr) {
    Status s = CreateLoggerFromOptions(dbname, result.db_log_dir, src.env,
                                       result, &result.info_log);
    if (!s.ok()) {
      // No place suitable for logging
      result.info_log = nullptr;
    }
  }

  if (!result.rate_limiter) {
    if (result.bytes_per_sync == 0) {
      result.bytes_per_sync = 1024 * 1024;
    }
  }

  if (result.wal_dir.empty()) {
    // Use dbname as default
    result.wal_dir = dbname;
  }
  // wal_dir can still be empty here when dbname itself is empty; back() on
  // an empty string is undefined behavior, so check first.
  if (!result.wal_dir.empty() && result.wal_dir.back() == '/') {
    result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
  }

  if (result.db_paths.empty()) {
    result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
  }

  return result;
}

284 285
namespace {

286 287
Status SanitizeOptionsByTable(
    const DBOptions& db_opts,
288 289 290
    const std::vector<ColumnFamilyDescriptor>& column_families) {
  Status s;
  for (auto cf : column_families) {
291
    s = cf.options.table_factory->SanitizeOptions(db_opts, cf.options);
292 293 294 295 296 297 298
    if (!s.ok()) {
      return s;
    }
  }
  return Status::OK();
}

299
CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions) {
300 301 302 303 304 305
  // Compressing memtable flushes might not help unless the sequential load
  // optimization is used for leveled compaction. Otherwise the CPU and
  // latency overhead is not offset by saving much space.

  bool can_compress;

306
  if (ioptions.compaction_style == kCompactionStyleUniversal) {
307
    can_compress =
308
        (ioptions.compaction_options_universal.compression_size_percent < 0);
309 310
  } else {
    // For leveled compress when min_level_to_compress == 0.
311 312
    can_compress = ioptions.compression_per_level.empty() ||
                   ioptions.compression_per_level[0] != kNoCompression;
313 314 315
  }

  if (can_compress) {
316
    return ioptions.compression;
317 318 319 320
  } else {
    return kNoCompression;
  }
}
321
}  // namespace
322

I
Igor Canadi 已提交
323
// Constructs the DB object: sanitizes the options, creates the table cache,
// the VersionSet and the column family memtable accessor, and writes
// diagnostic information to the info log. It does not open or recover any
// on-disk state.
DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
    : env_(options.env),
      dbname_(dbname),
      db_options_(SanitizeOptions(dbname, options)),
      stats_(db_options_.statistics.get()),
      db_lock_(nullptr),
      mutex_(options.use_adaptive_mutex),
      shutting_down_(false),
      bg_cv_(&mutex_),
      logfile_number_(0),
      log_empty_(true),
      default_cf_handle_(nullptr),
      total_log_size_(0),
      max_total_in_memory_state_(0),
      tmp_batch_(),
      bg_schedule_needed_(false),
      bg_compaction_scheduled_(0),
      bg_manual_only_(0),
      bg_flush_scheduled_(0),
      manual_compaction_(nullptr),
      disable_delete_obsolete_files_(0),
      delete_obsolete_files_last_run_(options.env->NowMicros()),
      last_stats_dump_time_microsec_(0),
      flush_on_destroy_(false),
      env_options_(options),
#ifndef ROCKSDB_LITE
      wal_manager_(db_options_, env_options_),
#endif  // ROCKSDB_LITE
      bg_work_gate_closed_(false),
      refitting_level_(false),
      opened_successfully_(false) {
  env_->GetAbsolutePath(dbname, &db_absolute_path_);

  // Reserve ten files or so for other uses and give the rest to TableCache.
  // max_open_files == -1 means "infinite" open files, so use a large number.
  int table_cache_size = 4194304;
  if (db_options_.max_open_files != -1) {
    table_cache_size = db_options_.max_open_files - 10;
  }
  table_cache_ = NewLRUCache(table_cache_size,
                             db_options_.table_cache_numshardbits,
                             db_options_.table_cache_remove_scan_count_limit);

  versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
                                 table_cache_.get(), &write_controller_));
  column_family_memtables_.reset(new ColumnFamilyMemTablesImpl(
      versions_->GetColumnFamilySet(), &flush_scheduler_));

  // Diagnostics: build version, DB file summary and the effective options.
  DumpRocksDBBuildVersion(db_options_.info_log.get());
  DumpDBFileSummary(db_options_, dbname_);
  db_options_.Dump(db_options_.info_log.get());

  LogFlush(db_options_.info_log);
}

// Shuts the DB down. In order, it:
//  * optionally flushes non-empty memtables,
//  * waits for scheduled background flushes/compactions,
//  * releases the default column family handle,
//  * purges obsolete files (only if the DB was opened successfully),
//  * destroys the VersionSet and unlocks the DB lock file.
DBImpl::~DBImpl() {
  mutex_.Lock();

  if (flush_on_destroy_) {
    // The mutex is dropped around each FlushMemTable() call; Ref()/Unref()
    // hold a reference on the column family across that window.
    for (auto column_family : *versions_->GetColumnFamilySet()) {
      if (column_family->mem()->IsEmpty()) {
        continue;
      }
      column_family->Ref();
      mutex_.Unlock();
      FlushMemTable(column_family, FlushOptions());
      mutex_.Lock();
      column_family->Unref();
    }
    versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
  }

  // Tell background work to stop, then wait until nothing is scheduled.
  shutting_down_.store(true, std::memory_order_release);
  while (bg_compaction_scheduled_ != 0 || bg_flush_scheduled_ != 0) {
    bg_cv_.Wait();
  }

  flush_scheduler_.Clear();

  if (default_cf_handle_ != nullptr) {
    // we need to delete handle outside of lock because it does its own locking
    mutex_.Unlock();
    delete default_cf_handle_;
    mutex_.Lock();
  }

  // Clean up obsolete files due to SuperVersion release.
  // (1) Need to delete to obsolete files before closing because RepairDB()
  // scans all existing files in the file system and builds manifest file.
  // Keeping obsolete files confuses the repair process.
  // (2) Need to check if we Open()/Recover() the DB successfully before
  // deleting because if VersionSet recover fails (may be due to corrupted
  // manifest file), it is not able to identify live files correctly. As a
  // result, all "live" files can get deleted by accident. However, corrupted
  // manifest is recoverable by RepairDB().
  if (opened_successfully_) {
    JobContext job_context;
    FindObsoleteFiles(&job_context, true);
    // manifest number starting from 2. With manifest_file_number == 1,
    // PurgeObsoleteFiles keeps every descriptor file numbered 1 or higher.
    job_context.manifest_file_number = 1;
    if (job_context.HaveSomethingToDelete()) {
      PurgeObsoleteFiles(job_context);
    }
  }

  // versions need to be destroyed before table_cache since it can hold
  // references to table_cache.
  versions_.reset();
  mutex_.Unlock();

  if (db_lock_ != nullptr) {
    env_->UnlockFile(db_lock_);
  }

  LogFlush(db_options_.info_log);
}

// Creates a brand-new, empty database: writes MANIFEST 1 containing a single
// VersionEdit, then points CURRENT at it. If writing the manifest record
// fails, the manifest file is deleted and the error returned.
Status DBImpl::NewDB() {
  // Initial state: log number 0, next file number 2 (number 1 is taken by
  // the manifest written below), last sequence 0.
  VersionEdit new_db;
  new_db.SetLogNumber(0);
  new_db.SetNextFile(2);
  new_db.SetLastSequence(0);

  Log(db_options_.info_log, "Creating manifest 1 \n");
  const std::string manifest = DescriptorFileName(dbname_, 1);

  unique_ptr<WritableFile> manifest_file;
  Status status = env_->NewWritableFile(
      manifest, &manifest_file, env_->OptimizeForManifestWrite(env_options_));
  if (!status.ok()) {
    return status;
  }
  manifest_file->SetPreallocationBlockSize(
      db_options_.manifest_preallocation_size);

  {
    // The writer takes ownership of the file; the scope ends its lifetime
    // before CURRENT is touched.
    log::Writer manifest_writer(std::move(manifest_file));
    std::string encoded_edit;
    new_db.EncodeTo(&encoded_edit);
    status = manifest_writer.AddRecord(encoded_edit);
  }

  if (!status.ok()) {
    env_->DeleteFile(manifest);
    return status;
  }
  // Make "CURRENT" file that points to the new manifest file.
  return SetCurrentFile(env_, dbname_, 1, db_directory_.get());
}

// With paranoid_checks disabled, logs a non-OK *s and resets it to OK.
// Otherwise (or when *s is already OK) leaves *s untouched.
void DBImpl::MaybeIgnoreError(Status* s) const {
  const bool should_ignore = !s->ok() && !db_options_.paranoid_checks;
  if (!should_ignore) {
    return;
  }
  Log(db_options_.info_log, "Ignoring error %s", s->ToString().c_str());
  *s = Status::OK();
}

476
// Creates the WAL archive directory under wal_dir when either
// WAL_ttl_seconds or WAL_size_limit_MB is set; otherwise does nothing and
// returns OK.
const Status DBImpl::CreateArchivalDirectory() {
  const bool wal_archiving_enabled =
      db_options_.WAL_ttl_seconds > 0 || db_options_.WAL_size_limit_MB > 0;
  if (!wal_archiving_enabled) {
    return Status::OK();
  }
  return env_->CreateDirIfMissing(ArchivalDirectory(db_options_.wal_dir));
}

484
void DBImpl::PrintStatistics() {
485
  auto dbstats = db_options_.statistics.get();
486
  if (dbstats) {
487
    Log(db_options_.info_log,
488 489
        "STATISTCS:\n %s",
        dbstats->ToString().c_str());
490 491 492
  }
}

493
void DBImpl::MaybeDumpStats() {
494
  if (db_options_.stats_dump_period_sec == 0) return;
H
Haobo Xu 已提交
495 496 497 498

  const uint64_t now_micros = env_->NowMicros();

  if (last_stats_dump_time_microsec_ +
499
      db_options_.stats_dump_period_sec * 1000000
H
Haobo Xu 已提交
500 501 502 503 504 505
      <= now_micros) {
    // Multiple threads could race in here simultaneously.
    // However, the last one will update last_stats_dump_time_microsec_
    // atomically. We could see more than one dump during one dump
    // period in rare cases.
    last_stats_dump_time_microsec_ = now_micros;
506

507 508 509 510 511 512
    bool tmp1 = false;
    bool tmp2 = false;
    DBPropertyType cf_property_type =
        GetPropertyType("rocksdb.cfstats", &tmp1, &tmp2);
    DBPropertyType db_property_type =
        GetPropertyType("rocksdb.dbstats", &tmp1, &tmp2);
H
Haobo Xu 已提交
513
    std::string stats;
514 515 516
    {
      MutexLock l(&mutex_);
      for (auto cfd : *versions_->GetColumnFamilySet()) {
517 518
        cfd->internal_stats()->GetStringProperty(cf_property_type,
                                                 "rocksdb.cfstats", &stats);
519
      }
520 521
      default_cf_internal_stats_->GetStringProperty(db_property_type,
                                                    "rocksdb.dbstats", &stats);
522
    }
523 524
    Log(db_options_.info_log, "------- DUMPING STATS -------");
    Log(db_options_.info_log, "%s", stats.c_str());
525

526
    PrintStatistics();
527 528 529
  }
}

530
// Returns the list of live files in 'sst_live' and the list
K
kailiu 已提交
531
// of all files in the filesystem in 'candidate_files'.
I
Igor Canadi 已提交
532 533
// no_full_scan = true -- never do the full scan using GetChildren()
// force = false -- don't force the full scan, except every
534
//  db_options_.delete_obsolete_files_period_micros
I
Igor Canadi 已提交
535
// force = true -- force the full scan
I
Igor Canadi 已提交
536
void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
I
Igor Canadi 已提交
537
                               bool no_full_scan) {
D
Dhruba Borthakur 已提交
538 539
  mutex_.AssertHeld();

540
  // if deletion is disabled, do nothing
541
  if (disable_delete_obsolete_files_ > 0) {
542 543 544
    return;
  }

545 546 547 548 549
  bool doing_the_full_scan = false;

  // logic for figurint out if we're doing the full scan
  if (no_full_scan) {
    doing_the_full_scan = false;
550
  } else if (force || db_options_.delete_obsolete_files_period_micros == 0) {
551 552 553 554
    doing_the_full_scan = true;
  } else {
    const uint64_t now_micros = env_->NowMicros();
    if (delete_obsolete_files_last_run_ +
555
        db_options_.delete_obsolete_files_period_micros < now_micros) {
556 557 558 559 560
      doing_the_full_scan = true;
      delete_obsolete_files_last_run_ = now_micros;
    }
  }

I
Igor Canadi 已提交
561
  // get obsolete files
I
Igor Canadi 已提交
562
  versions_->GetObsoleteFiles(&job_context->sst_delete_files);
I
Igor Canadi 已提交
563

I
Igor Canadi 已提交
564
  // store the current filenum, lognum, etc
I
Igor Canadi 已提交
565 566
  job_context->manifest_file_number = versions_->ManifestFileNumber();
  job_context->pending_manifest_file_number =
567
      versions_->PendingManifestFileNumber();
I
Igor Canadi 已提交
568 569
  job_context->log_number = versions_->MinLogNumber();
  job_context->prev_log_number = versions_->PrevLogNumber();
I
Igor Canadi 已提交
570

I
Igor Canadi 已提交
571
  if (!doing_the_full_scan && !job_context->HaveSomethingToDelete()) {
572 573 574 575 576 577 578
    // avoid filling up sst_live if we're sure that we
    // are not going to do the full scan and that we don't have
    // anything to delete at the moment
    return;
  }

  // don't delete live files
579
  for (auto pair : pending_outputs_) {
I
Igor Canadi 已提交
580
    job_context->sst_live.emplace_back(pair.first, pair.second, 0);
581
  }
I
Igor Canadi 已提交
582
  versions_->AddLiveFiles(&job_context->sst_live);
I
Igor Canadi 已提交
583

584
  if (doing_the_full_scan) {
585 586
    for (uint32_t path_id = 0;
         path_id < db_options_.db_paths.size(); path_id++) {
587 588 589
      // set of all files in the directory. We'll exclude files that are still
      // alive in the subsequent processings.
      std::vector<std::string> files;
590
      env_->GetChildren(db_options_.db_paths[path_id].path,
591
                        &files);  // Ignore errors
592
      for (std::string file : files) {
I
Igor Canadi 已提交
593
        // TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes
I
Igor Canadi 已提交
594
        job_context->candidate_files.emplace_back("/" + file, path_id);
595 596
      }
    }
597 598

    //Add log files in wal_dir
599
    if (db_options_.wal_dir != dbname_) {
600
      std::vector<std::string> log_files;
601
      env_->GetChildren(db_options_.wal_dir, &log_files);  // Ignore errors
602
      for (std::string log_file : log_files) {
I
Igor Canadi 已提交
603
        job_context->candidate_files.emplace_back(log_file, 0);
604 605
      }
    }
606
    // Add info log files in db_log_dir
607
    if (!db_options_.db_log_dir.empty() && db_options_.db_log_dir != dbname_) {
608
      std::vector<std::string> info_log_files;
609 610
      // Ignore errors
      env_->GetChildren(db_options_.db_log_dir, &info_log_files);
611
      for (std::string log_file : info_log_files) {
I
Igor Canadi 已提交
612
        job_context->candidate_files.emplace_back(log_file, 0);
613 614
      }
    }
615
  }
616 617
}

618
namespace {
I
Igor Canadi 已提交
619 620
bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
                          const JobContext::CandidateFileInfo& second) {
621 622 623 624 625
  if (first.file_name > second.file_name) {
    return true;
  } else if (first.file_name < second.file_name) {
    return false;
  } else {
626
    return (first.path_id > second.path_id);
627 628 629 630
  }
}
};  // namespace

D
Dhruba Borthakur 已提交
631
// Diffs the files listed in filenames and those that do not
I
Igor Canadi 已提交
632
// belong to live files are posibly removed. Also, removes all the
633
// files in sst_delete_files and log_delete_files.
634
// It is not necessary to hold the mutex when invoking this method.
I
Igor Canadi 已提交
635
void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
636 637
  // we'd better have sth to delete
  assert(state.HaveSomethingToDelete());
638

I
Igor Canadi 已提交
639 640 641 642
  // this checks if FindObsoleteFiles() was run before. If not, don't do
  // PurgeObsoleteFiles(). If FindObsoleteFiles() was run, we need to also
  // run PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true
  if (state.manifest_file_number == 0) {
I
Igor Canadi 已提交
643 644
    return;
  }
645

646
  // Now, convert live list to an unordered map, WITHOUT mutex held;
647
  // set is slow.
648
  std::unordered_map<uint64_t, const FileDescriptor*> sst_live_map;
I
Igor Canadi 已提交
649
  for (const FileDescriptor& fd : state.sst_live) {
650 651
    sst_live_map[fd.GetNumber()] = &fd;
  }
I
Igor Canadi 已提交
652

I
Igor Canadi 已提交
653 654 655 656
  auto candidate_files = state.candidate_files;
  candidate_files.reserve(candidate_files.size() +
                          state.sst_delete_files.size() +
                          state.log_delete_files.size());
K
kailiu 已提交
657 658
  // We may ignore the dbname when generating the file names.
  const char* kDumbDbName = "";
659
  for (auto file : state.sst_delete_files) {
660 661 662
    candidate_files.emplace_back(
        MakeTableFileName(kDumbDbName, file->fd.GetNumber()),
        file->fd.GetPathId());
663
    delete file;
I
Igor Canadi 已提交
664 665
  }

K
kailiu 已提交
666 667
  for (auto file_num : state.log_delete_files) {
    if (file_num > 0) {
668 669
      candidate_files.emplace_back(LogFileName(kDumbDbName, file_num).substr(1),
                                   0);
I
Igor Canadi 已提交
670 671
    }
  }
672

K
kailiu 已提交
673
  // dedup state.candidate_files so we don't try to delete the same
I
Igor Canadi 已提交
674
  // file twice
675
  sort(candidate_files.begin(), candidate_files.end(), CompareCandidateFile);
676 677
  candidate_files.erase(unique(candidate_files.begin(), candidate_files.end()),
                        candidate_files.end());
J
jorlow@chromium.org 已提交
678

679
  std::vector<std::string> old_info_log_files;
680
  InfoLogPrefix info_log_prefix(!db_options_.db_log_dir.empty(), dbname_);
681 682 683
  for (const auto& candidate_file : candidate_files) {
    std::string to_delete = candidate_file.file_name;
    uint32_t path_id = candidate_file.path_id;
K
kailiu 已提交
684 685 686
    uint64_t number;
    FileType type;
    // Ignore file if we cannot recognize it.
687
    if (!ParseFileName(to_delete, &number, info_log_prefix.prefix, &type)) {
K
kailiu 已提交
688 689
      continue;
    }
J
jorlow@chromium.org 已提交
690

K
kailiu 已提交
691 692 693 694 695 696 697 698
    bool keep = true;
    switch (type) {
      case kLogFile:
        keep = ((number >= state.log_number) ||
                (number == state.prev_log_number));
        break;
      case kDescriptorFile:
        // Keep my manifest file, and any newer incarnations'
699
        // (can happen during manifest roll)
K
kailiu 已提交
700 701 702
        keep = (number >= state.manifest_file_number);
        break;
      case kTableFile:
703
        keep = (sst_live_map.find(number) != sst_live_map.end());
K
kailiu 已提交
704 705 706
        break;
      case kTempFile:
        // Any temp files that are currently being written to must
707 708 709 710
        // be recorded in pending_outputs_, which is inserted into "live".
        // Also, SetCurrentFile creates a temp file when writing out new
        // manifest, which is equal to state.pending_manifest_file_number. We
        // should not delete that file
711
        keep = (sst_live_map.find(number) != sst_live_map.end()) ||
712
               (number == state.pending_manifest_file_number);
K
kailiu 已提交
713 714 715 716
        break;
      case kInfoLogFile:
        keep = true;
        if (number != 0) {
717
          old_info_log_files.push_back(to_delete);
J
jorlow@chromium.org 已提交
718
        }
K
kailiu 已提交
719 720 721 722 723 724 725 726 727 728 729 730 731
        break;
      case kCurrentFile:
      case kDBLockFile:
      case kIdentityFile:
      case kMetaDatabase:
        keep = true;
        break;
    }

    if (keep) {
      continue;
    }

732
    std::string fname;
K
kailiu 已提交
733 734
    if (type == kTableFile) {
      // evict from cache
735
      TableCache::Evict(table_cache_.get(), number);
736
      fname = TableFileName(db_options_.db_paths, number, path_id);
737
    } else {
738 739
      fname = ((type == kLogFile) ?
          db_options_.wal_dir : dbname_) + "/" + to_delete;
K
kailiu 已提交
740
    }
741

I
Igor Canadi 已提交
742 743 744 745 746 747 748 749
#ifdef ROCKSDB_LITE
    Status s = env_->DeleteFile(fname);
    Log(db_options_.info_log, "Delete %s type=%d #%" PRIu64 " -- %s\n",
        fname.c_str(), type, number, s.ToString().c_str());
#else   // not ROCKSDB_LITE
    if (type == kLogFile && (db_options_.WAL_ttl_seconds > 0 ||
                             db_options_.WAL_size_limit_MB > 0)) {
      wal_manager_.ArchiveWALFile(fname, number);
K
kailiu 已提交
750 751
    } else {
      Status s = env_->DeleteFile(fname);
752
      Log(db_options_.info_log, "Delete %s type=%d #%" PRIu64 " -- %s\n",
753
          fname.c_str(), type, number, s.ToString().c_str());
J
jorlow@chromium.org 已提交
754
    }
I
Igor Canadi 已提交
755
#endif  // ROCKSDB_LITE
J
jorlow@chromium.org 已提交
756
  }
H
heyongqiang 已提交
757

758
  // Delete old info log files.
759
  size_t old_info_log_file_count = old_info_log_files.size();
760
  if (old_info_log_file_count >= db_options_.keep_log_file_num) {
761
    std::sort(old_info_log_files.begin(), old_info_log_files.end());
762
    size_t end = old_info_log_file_count - db_options_.keep_log_file_num;
763
    for (unsigned int i = 0; i <= end; i++) {
764
      std::string& to_delete = old_info_log_files.at(i);
765 766 767
      std::string full_path_to_delete = (db_options_.db_log_dir.empty() ?
           dbname_ : db_options_.db_log_dir) + "/" + to_delete;
      Log(db_options_.info_log, "Delete info log file %s\n",
768 769
          full_path_to_delete.c_str());
      Status s = env_->DeleteFile(full_path_to_delete);
770
      if (!s.ok()) {
771
        Log(db_options_.info_log, "Delete info log file %s FAILED -- %s\n",
772 773
            to_delete.c_str(), s.ToString().c_str());
      }
H
heyongqiang 已提交
774 775
    }
  }
I
Igor Canadi 已提交
776 777 778
#ifndef ROCKSDB_LITE
  wal_manager_.PurgeObsoleteWALFiles();
#endif  // ROCKSDB_LITE
779
  LogFlush(db_options_.info_log);
D
Dhruba Borthakur 已提交
780 781 782 783
}

void DBImpl::DeleteObsoleteFiles() {
  mutex_.AssertHeld();
I
Igor Canadi 已提交
784 785 786 787
  JobContext job_context;
  FindObsoleteFiles(&job_context, true);
  if (job_context.HaveSomethingToDelete()) {
    PurgeObsoleteFiles(job_context);
788
  }
789 790
}

791
// Recovers database state while opening the DB.
//
// When not read-only, this first performs the on-disk setup:
//   - ensures dbname_ and every configured db_path directory exist;
//   - opens the DB directory handle and acquires the LOCK file;
//   - creates a new DB if CURRENT is missing (only if create_if_missing);
//   - writes an IDENTITY file if one is absent.
//
// It then recovers the VersionSet (and runs CheckConsistency() when
// paranoid_checks is set). Next it replays every WAL file in wal_dir whose
// number is >= MinLogNumber() or equals PrevLogNumber(). Finally it
// recomputes max_total_in_memory_state_ from each column family's mutable
// options.
//
// REQUIRES: mutex_ is held and db_lock_ has not been taken yet.
// @param column_families         descriptors passed to VersionSet::Recover.
// @param read_only               skip all directory/lock/file creation; also
//                                forwarded to VersionSet::Recover and
//                                RecoverLogFiles.
// @param error_if_log_file_exist fail with Corruption if any WAL file would
//                                have to be replayed.
// @return the first failing status, or OK.
Status DBImpl::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
    bool error_if_log_file_exist) {
  mutex_.AssertHeld();

  bool is_new_db = false;
  assert(db_lock_ == nullptr);
  if (!read_only) {
    // We call CreateDirIfMissing() as the directory may already exist (if we
    // are reopening a DB), when this happens we don't want creating the
    // directory to cause an error. However, we need to check if creating the
    // directory fails or else we may get an obscure message about the lock
    // file not existing. One real-world example of this occurring is if
    // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
    // when dbname_ is "dir/db" but when "dir" doesn't exist.
    Status s = env_->CreateDirIfMissing(dbname_);
    if (!s.ok()) {
      return s;
    }

    for (const auto& db_path : db_options_.db_paths) {
      s = env_->CreateDirIfMissing(db_path.path);
      if (!s.ok()) {
        return s;
      }
    }

    s = env_->NewDirectory(dbname_, &db_directory_);
    if (!s.ok()) {
      return s;
    }

    s = env_->LockFile(LockFileName(dbname_), &db_lock_);
    if (!s.ok()) {
      return s;
    }

    if (!env_->FileExists(CurrentFileName(dbname_))) {
      if (db_options_.create_if_missing) {
        s = NewDB();
        is_new_db = true;
        if (!s.ok()) {
          return s;
        }
      } else {
        return Status::InvalidArgument(
            dbname_, "does not exist (create_if_missing is false)");
      }
    } else if (db_options_.error_if_exists) {
      return Status::InvalidArgument(
          dbname_, "exists (error_if_exists is true)");
    }

    // Check for the IDENTITY file and create it if not there
    if (!env_->FileExists(IdentityFileName(dbname_))) {
      s = SetIdentityFile(env_, dbname_);
      if (!s.ok()) {
        return s;
      }
    }
  }

  Status s = versions_->Recover(column_families, read_only);
  if (db_options_.paranoid_checks && s.ok()) {
    s = CheckConsistency();
  }
  if (s.ok()) {
    SequenceNumber max_sequence(0);
    default_cf_handle_ = new ColumnFamilyHandleImpl(
        versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
    default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
    single_column_family_mode_ =
        versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;

    // Recover from all newer log files than the ones named in the
    // descriptor (new log files may have been added by the previous
    // incarnation without registering them in the descriptor).
    //
    // Note that PrevLogNumber() is no longer used, but we pay
    // attention to it in case we are recovering a database
    // produced by an older version of rocksdb.
    const uint64_t min_log = versions_->MinLogNumber();
    const uint64_t prev_log = versions_->PrevLogNumber();
    std::vector<std::string> filenames;
    s = env_->GetChildren(db_options_.wal_dir, &filenames);
    if (!s.ok()) {
      return s;
    }

    std::vector<uint64_t> logs;
    for (const auto& filename : filenames) {
      uint64_t number;
      FileType type;
      if (!ParseFileName(filename, &number, &type) || type != kLogFile) {
        continue;
      }
      if (is_new_db) {
        // A freshly created DB must not pick up stray WAL files.
        return Status::Corruption(
            "While creating a new Db, wal_dir contains "
            "existing log file: ",
            filename);
      }
      if (number >= min_log || number == prev_log) {
        logs.push_back(number);
      }
    }

    if (!logs.empty() && error_if_log_file_exist) {
      return Status::Corruption(
          "The db was opened in readonly mode with error_if_log_file_exist "
          "flag but a log file already exists");
    }

    if (!logs.empty()) {
      // Recover in the order in which the logs were generated
      std::sort(logs.begin(), logs.end());
      s = RecoverLogFiles(logs, &max_sequence, read_only);
      if (!s.ok()) {
        // Clear memtables if recovery failed
        for (auto cfd : *versions_->GetColumnFamilySet()) {
          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions());
        }
      }
    }
    SetTickerCount(stats_, SEQUENCE_NUMBER, versions_->LastSequence());
  }

  // Initial value
  max_total_in_memory_state_ = 0;
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
    max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
                                  mutable_cf_options->max_write_buffer_number;
  }

  return s;
}

S
Stanislau Hlebik 已提交
928 929 930
// Replays the WAL files named by `log_numbers` into the column families'
// memtables.
//
// For each log, in order:
//   - its number is marked as used in the VersionSet;
//   - the file is opened (open failures go through MaybeIgnoreError and are
//     skipped when ignorable);
//   - every record is applied with WriteBatchInternal::InsertInto, ignoring
//     updates for column families that no longer exist.
// `*max_sequence` is raised to the last sequence number seen, and
// VersionSet::LastSequence() is advanced to match after each log.
//
// Unless `read_only`:
//   - memtables handed out by flush_scheduler_ during replay are written to
//     level-0 tables;
//   - any remaining non-empty memtable is written at the end;
//   - each column family's log number is advanced past the last replayed log
//     via LogAndApply.
//
// When paranoid_checks is on and skip_log_error_on_recovery is off, a
// corruption reported by the log reader stops replay and is returned instead
// of being overwritten by later successful inserts.
//
// REQUIRES: mutex_ held; log_numbers are sorted in ascending order.
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
                               SequenceNumber* max_sequence, bool read_only) {
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;  // nullptr if db_options_.paranoid_checks==false or
                     //            db_options_.skip_log_error_on_recovery==true
    void Corruption(size_t bytes, const Status& s) override {
      Log(info_log, "%s%s: dropping %d bytes; %s",
          (this->status == nullptr ? "(ignoring error) " : ""),
          fname, static_cast<int>(bytes), s.ToString().c_str());
      // Keep only the first error reported.
      if (this->status != nullptr && this->status->ok()) *this->status = s;
    }
  };

  mutex_.AssertHeld();
  Status status;
  std::unordered_map<int, VersionEdit> version_edits;
  // no need to refcount because iteration is under mutex
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    VersionEdit edit;
    edit.SetColumnFamily(cfd->GetID());
    version_edits.insert({cfd->GetID(), edit});
  }

  for (auto log_number : log_numbers) {
    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number.  So we manually
    // update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(log_number);
    // Open the log file
    std::string fname = LogFileName(db_options_.wal_dir, log_number);
    unique_ptr<SequentialFile> file;
    status = env_->NewSequentialFile(fname, &file, env_options_);
    if (!status.ok()) {
      MaybeIgnoreError(&status);
      if (!status.ok()) {
        return status;
      }
      // Fail with one log file, but that's ok.
      // Try next one.
      continue;
    }

    // Create the log reader.
    LogReporter reporter;
    reporter.env = env_;
    reporter.info_log = db_options_.info_log.get();
    reporter.fname = fname.c_str();
    reporter.status =
        (db_options_.paranoid_checks && !db_options_.skip_log_error_on_recovery
             ? &status
             : nullptr);
    // We intentionally make log::Reader do checksumming even if
    // paranoid_checks==false so that corruptions cause entire commits
    // to be skipped instead of propagating bad information (like overly
    // large sequence numbers).
    log::Reader reader(std::move(file), &reporter, true /*checksum*/,
                       0 /*initial_offset*/);
    Log(db_options_.info_log, "Recovering log #%" PRIu64 "", log_number);

    // Read all the records and add to a memtable. Stop as soon as the
    // reporter records an error; otherwise the next InsertInto() would
    // overwrite (and thereby hide) it.
    std::string scratch;
    Slice record;
    WriteBatch batch;
    while (reader.ReadRecord(&record, &scratch) && status.ok()) {
      if (record.size() < 12) {
        reporter.Corruption(record.size(),
                            Status::Corruption("log record too small"));
        continue;
      }
      WriteBatchInternal::SetContents(&batch, record);

      // If column family was not found, it might mean that the WAL write
      // batch references to the column family that was dropped after the
      // insert. We don't want to fail the whole write batch in that case --
      // we just ignore the update.
      // That's why we set ignore missing column families to true
      status = WriteBatchInternal::InsertInto(
          &batch, column_family_memtables_.get(), true, log_number);

      MaybeIgnoreError(&status);
      if (!status.ok()) {
        return status;
      }
      const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
                                      WriteBatchInternal::Count(&batch) - 1;
      if (last_seq > *max_sequence) {
        *max_sequence = last_seq;
      }

      if (!read_only) {
        // we can do this because this is called before client has access to the
        // DB and there is only a single thread operating on DB
        ColumnFamilyData* cfd;

        while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) {
          cfd->Unref();
          // If this asserts, it means that InsertInto failed in
          // filtering updates to already-flushed column families
          assert(cfd->GetLogNumber() <= log_number);
          auto iter = version_edits.find(cfd->GetID());
          assert(iter != version_edits.end());
          VersionEdit* edit = &iter->second;
          status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit);
          if (!status.ok()) {
            // Reflect errors immediately so that conditions like full
            // file-systems cause the DB::Open() to fail.
            return status;
          }
          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions());
        }
      }
    }

    flush_scheduler_.Clear();
    if (!status.ok()) {
      // Every other error path returns from inside the loop above, so a
      // non-OK status here was reported by the log reader. That only happens
      // when paranoid_checks is on and skip_log_error_on_recovery is off.
      return status;
    }
    if (versions_->LastSequence() < *max_sequence) {
      versions_->SetLastSequence(*max_sequence);
    }
  }

  // log_numbers.back() below is undefined on an empty vector.
  if (!read_only && !log_numbers.empty()) {
    // no need to refcount since client still doesn't have access
    // to the DB and can not drop column families while we iterate
    auto max_log_number = log_numbers.back();
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      auto iter = version_edits.find(cfd->GetID());
      assert(iter != version_edits.end());
      VersionEdit* edit = &iter->second;

      if (cfd->GetLogNumber() > max_log_number) {
        // Column family cfd has already flushed the data
        // from all logs. Memtable has to be empty because
        // we filter the updates based on log_number
        // (in WriteBatch::InsertInto)
        assert(cfd->mem()->GetFirstSequenceNumber() == 0);
        assert(edit->NumEntries() == 0);
        continue;
      }

      // flush the final memtable (if non-empty)
      if (cfd->mem()->GetFirstSequenceNumber() != 0) {
        status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit);
        if (!status.ok()) {
          // Recovery failed
          break;
        }
        cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions());
      }

      // write MANIFEST with update
      // writing log_number in the manifest means that any log file
      // with number strongly less than (log_number + 1) is already
      // recovered and should be ignored on next reincarnation.
      // Since we already recovered max_log_number, we want all logs
      // with numbers `<= max_log_number` (includes this one) to be ignored
      edit->SetLogNumber(max_log_number + 1);
      // we must mark the next log number as used, even though it's
      // not actually used. that is because VersionSet assumes
      // VersionSet::next_file_number_ always to be strictly greater than any
      // log number
      versions_->MarkFileNumberUsed(max_log_number + 1);
      status = versions_->LogAndApply(
          cfd, *cfd->GetLatestMutableCFOptions(), edit, &mutex_);
      if (!status.ok()) {
        // Recovery failed
        break;
      }
    }
  }

  return status;
}

1105 1106
// Writes the contents of `mem` to a new level-0 table during WAL recovery.
// If the build succeeds and produces a non-empty file, the file is recorded
// in `edit`; applying `edit` is left to the caller.
//
// REQUIRES: mutex_ held. The mutex is released around BuildTable() and is
// held again on return. Flush statistics are recorded whether or not the
// build succeeded.
Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
                                           VersionEdit* edit) {
  mutex_.AssertHeld();
  const uint64_t begin_micros = env_->NowMicros();

  FileMetaData meta;
  meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
  // Track the new file number as a pending output while it is being built
  // (path id 0 is used for level-0 files).
  pending_outputs_[meta.fd.GetNumber()] = 0;

  ReadOptions read_opts;
  read_opts.total_order_seek = true;
  Arena arena;  // the memtable iterator below is allocated from this arena
  Status s;
  {
    ScopedArenaIterator mem_iter(mem->NewIterator(read_opts, &arena));
    const SequenceNumber newest_snapshot = snapshots_.GetNewest();
    const SequenceNumber earliest_seqno_in_memtable =
        mem->GetFirstSequenceNumber();
    Log(db_options_.info_log, "[%s] Level-0 table #%" PRIu64 ": started",
        cfd->GetName().c_str(), meta.fd.GetNumber());

    // Build the table without holding the DB mutex.
    mutex_.Unlock();
    s = BuildTable(dbname_, env_, *cfd->ioptions(), env_options_,
                   cfd->table_cache(), mem_iter.get(), &meta,
                   cfd->internal_comparator(), newest_snapshot,
                   earliest_seqno_in_memtable,
                   GetCompressionFlush(*cfd->ioptions()),
                   cfd->ioptions()->compression_opts, Env::IO_HIGH);
    LogFlush(db_options_.info_log);
    mutex_.Lock();

    Log(db_options_.info_log,
        "[%s] Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
        cfd->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(),
        s.ToString().c_str());
  }
  pending_outputs_.erase(meta.fd.GetNumber());

  const int level = 0;
  // A zero file size means the file has been deleted, so it must not be
  // added to the manifest.
  if (s.ok() && meta.fd.GetFileSize() > 0) {
    edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
                  meta.fd.GetFileSize(), meta.smallest, meta.largest,
                  meta.smallest_seqno, meta.largest_seqno);
  }

  InternalStats::CompactionStats flush_stats(1);
  flush_stats.micros = env_->NowMicros() - begin_micros;
  flush_stats.bytes_written = meta.fd.GetFileSize();
  flush_stats.files_out_levelnp1 = 1;
  cfd->internal_stats()->AddCompactionStats(level, flush_stats);
  cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
                                    meta.fd.GetFileSize());
  RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
  return s;
}

1162 1163
// Flushes the immutable memtables of `cfd` to an SST file by running a
// FlushJob.
//
// On success it:
//   - installs a new SuperVersion;
//   - sets `*madeProgress` (if non-null);
//   - logs a level summary to `log_buffer`;
//   - unless obsolete-file deletion is disabled, moves WAL files numbered
//     below MinLogNumber() from alive_log_files_ into
//     job_context->log_delete_files.
//
// On a non-shutdown failure with paranoid_checks set, the error is stored in
// bg_error_ (if no error is recorded yet). Flush I/O stats are always
// recorded.
//
// REQUIRES: mutex_ held; cfd has a pending flush of a non-empty imm().
Status DBImpl::FlushMemTableToOutputFile(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer) {
  mutex_.AssertHeld();
  assert(cfd->imm()->size() != 0);
  assert(cfd->imm()->IsFlushPending());

  FlushJob flush_job(dbname_, cfd, db_options_, mutable_cf_options,
                     env_options_, versions_.get(), &mutex_, &shutting_down_,
                     &pending_outputs_, snapshots_.GetNewest(), job_context,
                     log_buffer, db_directory_.get(),
                     GetCompressionFlush(*cfd->ioptions()), stats_);

  Status s = flush_job.Run();

  if (s.ok()) {
    InstallSuperVersionBackground(cfd, job_context, mutable_cf_options);
    if (madeProgress) {
      *madeProgress = true;
    }
    VersionStorageInfo::LevelSummaryStorage tmp;
    LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(),
                cfd->current()->storage_info()->LevelSummary(&tmp));

    if (disable_delete_obsolete_files_ == 0) {
      // WAL files older than every column family's log number are no longer
      // needed; hand them to the job context for deletion.
      while (!alive_log_files_.empty() &&
             alive_log_files_.front().number < versions_->MinLogNumber()) {
        const auto& earliest = alive_log_files_.front();
        job_context->log_delete_files.push_back(earliest.number);
        total_log_size_ -= earliest.size;
        alive_log_files_.pop_front();
      }
    }
  }

  if (!s.ok() && !s.IsShutdownInProgress() && db_options_.paranoid_checks &&
      bg_error_.ok()) {
    // if a bad error happened (not ShutdownInProgress) and paranoid_checks is
    // true, mark DB read-only
    bg_error_ = s;
  }
  RecordFlushIOStats();
  return s;
}

1208
// Manually compacts the key range [begin, end] of `column_family`. A nullptr
// bound means unbounded on that side.
//
// Steps:
//   1. Flush the memtable.
//   2. Find the deepest level >= 1 whose files overlap the range.
//   3. Run a manual compaction from each level 0..that level.
//   4. If `reduce_level`, move the resulting files with ReFitLevel().
//   5. Reschedule any automatic work that the manual run may have preempted.
//
// Returns InvalidArgument for an out-of-range `target_path_id`, otherwise
// the first failing status.
Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
                            const Slice* begin, const Slice* end,
                            bool reduce_level, int target_level,
                            uint32_t target_path_id) {
  if (target_path_id >= db_options_.db_paths.size()) {
    return Status::InvalidArgument("Invalid target path ID");
  }

  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();

  Status s = FlushMemTable(cfd, FlushOptions());
  if (!s.ok()) {
    LogFlush(db_options_.info_log);
    return s;
  }

  // Deepest level (never level 0) that has files overlapping [begin, end].
  int max_level_with_files = 0;
  {
    MutexLock l(&mutex_);
    Version* base = cfd->current();
    for (int lvl = cfd->NumberLevels() - 1; lvl >= 1; --lvl) {
      if (base->storage_info()->OverlapInLevel(lvl, begin, end)) {
        max_level_with_files = lvl;
        break;
      }
    }
  }

  // Universal and FIFO compactions always output to the input level.
  const bool same_level_style =
      cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
      cfd->ioptions()->compaction_style == kCompactionStyleFIFO;
  for (int level = 0; level <= max_level_with_files; level++) {
    // The bottom-most level also compacts into itself. Level 0 is never
    // treated as bottom-most: if all files are in level 0, they go to level 1.
    const bool bottommost = level == max_level_with_files && level > 0;
    const int output_level =
        (same_level_style || bottommost) ? level : level + 1;
    s = RunManualCompaction(cfd, level, output_level, target_path_id, begin,
                            end);
    if (!s.ok()) {
      LogFlush(db_options_.info_log);
      return s;
    }
  }

  if (reduce_level) {
    s = ReFitLevel(cfd, max_level_with_files, target_level);
  }
  LogFlush(db_options_.info_log);

  {
    MutexLock l(&mutex_);
    // An automatic compaction that has been scheduled might have been
    // preempted by the manual compactions. Need to schedule it back.
    MaybeScheduleFlushOrCompaction();
  }

  return s;
}

1269 1270
// Applies the mutable option overrides in `options_map` to `column_family`.
// Every input key/value is logged, together with the outcome; on success the
// new mutable options are dumped to the info log.
// Returns false for an empty map or when the column family rejects the
// options, true otherwise.
bool DBImpl::SetOptions(ColumnFamilyHandle* column_family,
    const std::unordered_map<std::string, std::string>& options_map) {
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  if (options_map.empty()) {
    Log(db_options_.info_log, "SetOptions() on column family [%s], empty input",
        cfd->GetName().c_str());
    return false;
  }

  MutableCFOptions new_options;
  bool succeed = false;
  {
    MutexLock l(&mutex_);
    succeed = cfd->SetOptions(options_map);
    if (succeed) {
      // Snapshot the options while still holding the mutex.
      new_options = *cfd->GetLatestMutableCFOptions();
    }
  }

  Log(db_options_.info_log, "SetOptions() on column family [%s], inputs:",
      cfd->GetName().c_str());
  for (const auto& kv : options_map) {
    Log(db_options_.info_log, "%s: %s\n", kv.first.c_str(), kv.second.c_str());
  }

  if (!succeed) {
    Log(db_options_.info_log, "[%s] SetOptions failed",
        cfd->GetName().c_str());
    return false;
  }
  Log(db_options_.info_log, "[%s] SetOptions succeeded",
      cfd->GetName().c_str());
  new_options.Dump(db_options_.info_log.get());
  return true;
}

1304
// return the same level if it cannot be moved
1305 1306
int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
    const MutableCFOptions& mutable_cf_options, int level) {
1307
  mutex_.AssertHeld();
S
sdong 已提交
1308
  const auto* vstorage = cfd->current()->storage_info();
1309
  int minimum_level = level;
1310
  for (int i = level - 1; i > 0; --i) {
1311
    // stop if level i is not empty
S
sdong 已提交
1312
    if (vstorage->NumLevelFiles(i) > 0) break;
1313
    // stop if level i is too small (cannot fit the level files)
1314
    if (mutable_cf_options.MaxBytesForLevel(i) <
S
sdong 已提交
1315
        vstorage->NumLevelBytes(level)) {
1316 1317
      break;
    }
1318 1319 1320 1321 1322 1323

    minimum_level = i;
  }
  return minimum_level;
}

I
Igor Canadi 已提交
1324 1325
// Moves every file of `level` in `cfd` to a smaller level without rewriting
// the files.
//
// A negative `target_level` means "the smallest level the files fit into"
// (see FindMinimumEmptyLevelFitting). Only one refit may run at a time; a
// concurrent caller gets NotSupported.
//
// Background work is gated off, and in-flight flushes/compactions are waited
// for, while the VersionEdit is applied. A new SuperVersion is installed
// afterwards.
// REQUIRES: mutex_ NOT held (it is acquired here).
Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
  assert(level < cfd->NumberLevels());

  SuperVersion* sv_to_delete = nullptr;
  SuperVersion* fresh_sv = new SuperVersion();

  mutex_.Lock();

  // Only one thread may refit at a time.
  if (refitting_level_) {
    mutex_.Unlock();
    Log(db_options_.info_log, "ReFitLevel: another thread is refitting");
    delete fresh_sv;
    return Status::NotSupported("another thread is refitting");
  }
  refitting_level_ = true;

  // Close the background-work gate, then wait for running jobs to finish.
  bg_work_gate_closed_ = true;
  while (bg_compaction_scheduled_ > 0 || bg_flush_scheduled_) {
    Log(db_options_.info_log,
        "RefitLevel: waiting for background threads to stop: %d %d",
        bg_compaction_scheduled_, bg_flush_scheduled_);
    bg_cv_.Wait();
  }

  const MutableCFOptions mutable_cf_options =
      *cfd->GetLatestMutableCFOptions();
  const int to_level =
      target_level < 0
          ? FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level)
          : target_level;
  assert(to_level <= level);

  Status status;
  if (to_level < level) {
    Log(db_options_.info_log, "[%s] Before refitting:\n%s",
        cfd->GetName().c_str(), cfd->current()->DebugString().data());

    // Re-register each file of `level` under `to_level`.
    VersionEdit edit;
    edit.SetColumnFamily(cfd->GetID());
    for (const auto& f : cfd->current()->storage_info()->LevelFiles(level)) {
      edit.DeleteFile(level, f->fd.GetNumber());
      edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
                   f->fd.GetFileSize(), f->smallest, f->largest,
                   f->smallest_seqno, f->largest_seqno);
    }
    Log(db_options_.info_log, "[%s] Apply version edit:\n%s",
        cfd->GetName().c_str(), edit.DebugString().data());

    status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
                                    db_directory_.get());
    // InstallSuperVersion takes ownership of fresh_sv and hands back the
    // superversion to release once the mutex is dropped.
    sv_to_delete = InstallSuperVersion(cfd, fresh_sv, mutable_cf_options);
    fresh_sv = nullptr;

    Log(db_options_.info_log, "[%s] LogAndApply: %s\n", cfd->GetName().c_str(),
        status.ToString().data());

    if (status.ok()) {
      Log(db_options_.info_log, "[%s] After refitting:\n%s",
          cfd->GetName().c_str(), cfd->current()->DebugString().data());
    }
  }

  refitting_level_ = false;
  bg_work_gate_closed_ = false;

  mutex_.Unlock();
  delete sv_to_delete;
  delete fresh_sv;
  return status;
}

1400 1401 1402
// Returns the number of levels configured for `column_family`.
int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
  const auto* handle = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  return handle->cfd()->NumberLevels();
}

1405 1406
// Returns max_mem_compaction_level from the current SuperVersion's mutable
// options of `column_family`, read under mutex_.
int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* column_family) {
  auto* handle = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  MutexLock lock(&mutex_);
  const SuperVersion* sv = handle->cfd()->GetSuperVersion();
  return sv->mutable_cf_options.max_mem_compaction_level;
}

1412 1413
// Returns level0_stop_writes_trigger from the current SuperVersion's mutable
// options of `column_family`, read under mutex_.
int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
  auto* handle = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  MutexLock lock(&mutex_);
  const SuperVersion* sv = handle->cfd()->GetSuperVersion();
  return sv->mutable_cf_options.level0_stop_writes_trigger;
}

L
Lei Jin 已提交
1419
// Public entry point: flushes the memtable of `column_family` according to
// `flush_options` (see FlushMemTable).
Status DBImpl::Flush(const FlushOptions& flush_options,
                     ColumnFamilyHandle* column_family) {
  ColumnFamilyData* cfd =
      reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  return FlushMemTable(cfd, flush_options);
}

1425
// Returns the last sequence number recorded in the VersionSet.
SequenceNumber DBImpl::GetLatestSequenceNumber() const {
  const SequenceNumber latest = versions_->LastSequence();
  return latest;
}

I
Igor Canadi 已提交
1429
// Runs one manual compaction of `cfd` from `input_level` into `output_level`,
// writing to db path `output_path_id`, limited to [begin, end]. A nullptr
// bound means unbounded. Universal and FIFO styles always compact everything.
//
// Blocks until the compaction is done and returns its status.
// REQUIRES: mutex_ NOT held (it is acquired here).
Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
                                   int output_level, uint32_t output_path_id,
                                   const Slice* begin, const Slice* end) {
  assert(input_level >= 0);

  // For universal compaction, we enforce every manual compaction to compact
  // all files (FIFO is treated the same way).
  const bool whole_range =
      cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
      cfd->ioptions()->compaction_style == kCompactionStyleFIFO;

  InternalKey begin_storage, end_storage;

  ManualCompaction manual;
  manual.cfd = cfd;
  manual.input_level = input_level;
  manual.output_level = output_level;
  manual.output_path_id = output_path_id;
  manual.done = false;
  manual.in_progress = false;

  if (begin != nullptr && !whole_range) {
    begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
    manual.begin = &begin_storage;
  } else {
    manual.begin = nullptr;
  }
  if (end != nullptr && !whole_range) {
    end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
    manual.end = &end_storage;
  } else {
    manual.end = nullptr;
  }

  MutexLock l(&mutex_);

  // While bg_manual_only_ is non-zero, MaybeScheduleFlushOrCompaction does
  // not schedule automatic compactions. Wait for the ones already scheduled
  // to drain so this manual compaction may cover any key range. Several
  // callers can get past this loop at once; only one of them schedules its
  // compaction at a time, and the others wait on bg_cv_ below.
  ++bg_manual_only_;
  while (bg_compaction_scheduled_ > 0) {
    Log(db_options_.info_log,
        "[%s] Manual compaction waiting for all other scheduled background "
        "compactions to finish",
        cfd->GetName().c_str());
    bg_cv_.Wait();
  }

  Log(db_options_.info_log, "[%s] Manual compaction starting",
      cfd->GetName().c_str());

  // bg_error_ is not checked here: on error the compaction itself sets
  // manual.status to bg_error_ and marks manual.done.
  while (!manual.done) {
    assert(bg_manual_only_ > 0);
    if (manual_compaction_ == nullptr) {
      // Nobody else is running a manual compaction: schedule ours.
      manual_compaction_ = &manual;
      bg_compaction_scheduled_++;
      env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW);
    } else {
      // Running either this or some other manual compaction
      bg_cv_.Wait();
    }
  }

  assert(!manual.in_progress);
  assert(bg_manual_only_ > 0);
  --bg_manual_only_;
  return manual.status;
}

1511
// Switches `cfd` to a new memtable (and new log file), requests a flush of
// the now-immutable memtables, and schedules background work. If
// `flush_options.wait` is set, it then blocks until the flush finishes.
//
// Returns OK immediately when there is nothing to flush.
// REQUIRES: mutex_ NOT held (it is acquired here).
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                             const FlushOptions& flush_options) {
  Status s;
  {
    // `context` is declared before `guard_lock`, so it is destroyed after the
    // mutex has been released.
    WriteContext context;
    MutexLock guard_lock(&mutex_);

    const bool nothing_to_flush =
        cfd->imm()->size() == 0 && cfd->mem()->IsEmpty();
    if (nothing_to_flush) {
      return Status::OK();
    }

    WriteThread::Writer writer(&mutex_);
    s = write_thread_.EnterWriteThread(&writer, 0);
    assert(s.ok() && !writer.done);  // No timeout and nobody should do our job

    // SetNewMemtableAndNewLogFile() will release and reacquire mutex
    // during execution
    s = SetNewMemtableAndNewLogFile(cfd, &context);
    cfd->imm()->FlushRequested();
    MaybeScheduleFlushOrCompaction();

    write_thread_.ExitWriteThread(&writer, &writer, s);
  }

  if (!s.ok() || !flush_options.wait) {
    return s;
  }
  // Wait until the compaction completes
  return WaitForFlushMemTable(cfd);
}

1543
// Blocks until `cfd` has no immutable memtables left, or a background error
// is recorded. Returns that background error if there is one, otherwise OK.
// REQUIRES: mutex_ NOT held (it is acquired here).
Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
  MutexLock l(&mutex_);
  while (bg_error_.ok() && cfd->imm()->size() > 0) {
    bg_cv_.Wait();
  }
  return bg_error_.ok() ? Status::OK() : bg_error_;
}

1556
void DBImpl::MaybeScheduleFlushOrCompaction() {
J
jorlow@chromium.org 已提交
1557
  mutex_.AssertHeld();
1558
  bg_schedule_needed_ = false;
1559 1560
  if (bg_work_gate_closed_) {
    // gate closed for backgrond work
I
Igor Canadi 已提交
1561
  } else if (shutting_down_.load(std::memory_order_acquire)) {
J
jorlow@chromium.org 已提交
1562 1563
    // DB is being deleted; no more background compactions
  } else {
1564
    bool is_flush_pending = false;
1565
    // no need to refcount since we're under a mutex
1566
    for (auto cfd : *versions_->GetColumnFamilySet()) {
I
Igor Canadi 已提交
1567
      if (cfd->imm()->IsFlushPending()) {
1568 1569 1570
        is_flush_pending = true;
      }
    }
1571
    if (is_flush_pending) {
1572
      // memtable flush needed
1573
      if (bg_flush_scheduled_ < db_options_.max_background_flushes) {
1574 1575
        bg_flush_scheduled_++;
        env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH);
1576
      } else if (db_options_.max_background_flushes > 0) {
1577
        bg_schedule_needed_ = true;
1578
      }
1579
    }
1580
    bool is_compaction_needed = false;
1581
    // no need to refcount since we're under a mutex
1582
    for (auto cfd : *versions_->GetColumnFamilySet()) {
S
sdong 已提交
1583
      if (cfd->current()->storage_info()->NeedsCompaction()) {
1584 1585 1586 1587
        is_compaction_needed = true;
        break;
      }
    }
1588

1589 1590
    // Schedule BGWorkCompaction if there's a compaction pending (or a memtable
    // flush, but the HIGH pool is not enabled)
1591 1592 1593 1594
    // Do it only if max_background_compactions hasn't been reached and
    // bg_manual_only_ == 0
    if (!bg_manual_only_ &&
        (is_compaction_needed ||
1595 1596
         (is_flush_pending && db_options_.max_background_flushes == 0))) {
      if (bg_compaction_scheduled_ < db_options_.max_background_compactions) {
1597 1598 1599 1600 1601
        bg_compaction_scheduled_++;
        env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW);
      } else {
        bg_schedule_needed_ = true;
      }
1602 1603 1604 1605
    }
  }
}

1606
void DBImpl::RecordFlushIOStats() {
I
Igor Canadi 已提交
1607
  RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
1608 1609 1610 1611
  IOSTATS_RESET(bytes_written);
}

void DBImpl::RecordCompactionIOStats() {
L
Lei Jin 已提交
1612
  RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
1613
  IOSTATS_RESET(bytes_read);
L
Lei Jin 已提交
1614
  RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
1615 1616 1617
  IOSTATS_RESET(bytes_written);
}

1618
void DBImpl::BGWorkFlush(void* db) {
1619
  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
1620 1621 1622 1623
  reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush();
}

void DBImpl::BGWorkCompaction(void* db) {
1624
  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
1625 1626 1627
  reinterpret_cast<DBImpl*>(db)->BackgroundCallCompaction();
}

I
Igor Canadi 已提交
1628
// Flushes the pending immutable memtables of every column family.
//   madeProgress - forwarded to FlushMemTableToOutputFile().
//   job_context  - collects obsolete-file information for the caller.
//   log_buffer   - messages are buffered here and emitted by the caller.
// Returns bg_error_ immediately if it is already set. Otherwise returns OK,
// or the first flush failure encountered; a failure in one column family
// does not stop the flushing of the others. Requires mutex_ to be held.
Status DBImpl::BackgroundFlush(bool* madeProgress, JobContext* job_context,
                               LogBuffer* log_buffer) {
  mutex_.AssertHeld();

  if (!bg_error_.ok()) {
    return bg_error_;
  }

  // call_status is failure if at least one flush was a failure. even if
  // flushing one column family reports a failure, we will continue flushing
  // other column families. however, call_status will be a failure in that case.
  Status call_status;
  // refcounting in iteration
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    cfd->Ref();
    Status flush_status;
    const MutableCFOptions mutable_cf_options =
      *cfd->GetLatestMutableCFOptions();
    while (flush_status.ok() && cfd->imm()->IsFlushPending()) {
      LogToBuffer(
          log_buffer,
          "BackgroundCallFlush doing FlushMemTableToOutputFile with column "
          "family [%s], flush slots available %d",
          cfd->GetName().c_str(),
          db_options_.max_background_flushes - bg_flush_scheduled_);
      flush_status = FlushMemTableToOutputFile(
          cfd, mutable_cf_options, madeProgress, job_context, log_buffer);
    }
    if (call_status.ok() && !flush_status.ok()) {
      call_status = flush_status;
    }
    cfd->Unref();
  }
  versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
  return call_status;
}

1665
void DBImpl::BackgroundCallFlush() {
1666
  bool madeProgress = false;
I
Igor Canadi 已提交
1667
  JobContext job_context(true);
1668 1669
  assert(bg_flush_scheduled_);

1670
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
H
Haobo Xu 已提交
1671 1672 1673 1674
  {
    MutexLock l(&mutex_);

    Status s;
I
Igor Canadi 已提交
1675
    if (!shutting_down_.load(std::memory_order_acquire)) {
I
Igor Canadi 已提交
1676
      s = BackgroundFlush(&madeProgress, &job_context, &log_buffer);
H
Haobo Xu 已提交
1677 1678 1679 1680 1681
      if (!s.ok()) {
        // Wait a little bit before retrying background compaction in
        // case this is an environmental problem and we do not want to
        // chew up resources for failed compactions for the duration of
        // the problem.
1682 1683
        uint64_t error_cnt =
          default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
H
Haobo Xu 已提交
1684 1685
        bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
        mutex_.Unlock();
1686
        Log(db_options_.info_log,
1687 1688 1689
            "Waiting after background flush error: %s"
            "Accumulated background error counts: %" PRIu64,
            s.ToString().c_str(), error_cnt);
H
Haobo Xu 已提交
1690
        log_buffer.FlushBufferToLog();
1691
        LogFlush(db_options_.info_log);
H
Haobo Xu 已提交
1692 1693 1694 1695 1696 1697 1698
        env_->SleepForMicroseconds(1000000);
        mutex_.Lock();
      }
    }

    // If !s.ok(), this means that Flush failed. In that case, we want
    // to delete all obsolete files and we force FindObsoleteFiles()
I
Igor Canadi 已提交
1699
    FindObsoleteFiles(&job_context, !s.ok());
H
Haobo Xu 已提交
1700
    // delete unnecessary files if any, this is done outside the mutex
I
Igor Canadi 已提交
1701
    if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
1702
      mutex_.Unlock();
1703 1704 1705 1706 1707
      // Have to flush the info logs before bg_flush_scheduled_--
      // because if bg_flush_scheduled_ becomes 0 and the lock is
      // released, the deconstructor of DB can kick in and destroy all the
      // states of DB so info_log might not be available after that point.
      // It also applies to access other states that DB owns.
H
Haobo Xu 已提交
1708
      log_buffer.FlushBufferToLog();
I
Igor Canadi 已提交
1709 1710
      if (job_context.HaveSomethingToDelete()) {
        PurgeObsoleteFiles(job_context);
1711
      }
1712 1713
      mutex_.Lock();
    }
I
Igor Canadi 已提交
1714

H
Haobo Xu 已提交
1715
    bg_flush_scheduled_--;
1716 1717 1718 1719 1720
    // Any time the mutex is released After finding the work to do, another
    // thread might execute MaybeScheduleFlushOrCompaction(). It is possible
    // that there is a pending job but it is not scheduled because of the
    // max thread limit.
    if (madeProgress || bg_schedule_needed_) {
H
Haobo Xu 已提交
1721 1722
      MaybeScheduleFlushOrCompaction();
    }
I
Igor Canadi 已提交
1723
    RecordFlushIOStats();
H
Haobo Xu 已提交
1724
    bg_cv_.SignalAll();
I
Igor Canadi 已提交
1725 1726 1727 1728
    // IMPORTANT: there should be no code after calling SignalAll. This call may
    // signal the DB destructor that it's OK to proceed with destruction. In
    // that case, all DB variables will be dealloacated and referencing them
    // will cause trouble.
1729
  }
J
jorlow@chromium.org 已提交
1730 1731
}

1732
void DBImpl::BackgroundCallCompaction() {
1733
  bool madeProgress = false;
I
Igor Canadi 已提交
1734
  JobContext job_context(true);
H
Haobo Xu 已提交
1735 1736

  MaybeDumpStats();
1737
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
1738 1739 1740 1741
  {
    MutexLock l(&mutex_);
    assert(bg_compaction_scheduled_);
    Status s;
I
Igor Canadi 已提交
1742
    if (!shutting_down_.load(std::memory_order_acquire)) {
I
Igor Canadi 已提交
1743
      s = BackgroundCompaction(&madeProgress, &job_context, &log_buffer);
1744 1745 1746 1747 1748
      if (!s.ok()) {
        // Wait a little bit before retrying background compaction in
        // case this is an environmental problem and we do not want to
        // chew up resources for failed compactions for the duration of
        // the problem.
1749 1750
        uint64_t error_cnt =
          default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
1751 1752
        bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
        mutex_.Unlock();
H
Haobo Xu 已提交
1753
        log_buffer.FlushBufferToLog();
1754
        Log(db_options_.info_log,
1755 1756 1757
            "Waiting after background compaction error: %s, "
            "Accumulated background error counts: %" PRIu64,
            s.ToString().c_str(), error_cnt);
1758
        LogFlush(db_options_.info_log);
1759 1760 1761 1762
        env_->SleepForMicroseconds(1000000);
        mutex_.Lock();
      }
    }
H
Haobo Xu 已提交
1763

1764 1765
    // If !s.ok(), this means that Compaction failed. In that case, we want
    // to delete all obsolete files we might have created and we force
I
Igor Canadi 已提交
1766 1767 1768
    // FindObsoleteFiles(). This is because job_context does not
    // catch all created files if compaction failed.
    FindObsoleteFiles(&job_context, !s.ok());
1769 1770

    // delete unnecessary files if any, this is done outside the mutex
I
Igor Canadi 已提交
1771
    if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
1772
      mutex_.Unlock();
1773 1774 1775 1776 1777
      // Have to flush the info logs before bg_compaction_scheduled_--
      // because if bg_flush_scheduled_ becomes 0 and the lock is
      // released, the deconstructor of DB can kick in and destroy all the
      // states of DB so info_log might not be available after that point.
      // It also applies to access other states that DB owns.
H
Haobo Xu 已提交
1778
      log_buffer.FlushBufferToLog();
I
Igor Canadi 已提交
1779 1780
      if (job_context.HaveSomethingToDelete()) {
        PurgeObsoleteFiles(job_context);
1781
      }
1782 1783
      mutex_.Lock();
    }
D
Dhruba Borthakur 已提交
1784

1785
    bg_compaction_scheduled_--;
J
jorlow@chromium.org 已提交
1786

1787 1788
    versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();

1789 1790 1791
    // Previous compaction may have produced too many files in a level,
    // So reschedule another compaction if we made progress in the
    // last compaction.
1792 1793 1794 1795 1796 1797
    //
    // Also, any time the mutex is released After finding the work to do,
    // another thread might execute MaybeScheduleFlushOrCompaction(). It is
    // possible  that there is a pending job but it is not scheduled because of
    // the max thread limit.
    if (madeProgress || bg_schedule_needed_) {
1798 1799
      MaybeScheduleFlushOrCompaction();
    }
1800 1801
    if (madeProgress || bg_compaction_scheduled_ == 0 || bg_manual_only_ > 0) {
      // signal if
I
Igor Canadi 已提交
1802
      // * madeProgress -- need to wakeup DelayWrite
1803 1804 1805 1806 1807 1808
      // * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
      // * bg_manual_only_ > 0 -- need to wakeup RunManualCompaction
      // If none of this is true, there is no need to signal since nobody is
      // waiting for it
      bg_cv_.SignalAll();
    }
I
Igor Canadi 已提交
1809 1810 1811 1812
    // IMPORTANT: there should be no code after calling SignalAll. This call may
    // signal the DB destructor that it's OK to proceed with destruction. In
    // that case, all DB variables will be dealloacated and referencing them
    // will cause trouble.
1813
  }
J
jorlow@chromium.org 已提交
1814 1815
}

I
Igor Canadi 已提交
1816
// Performs one round of background compaction work. Requires mutex_ held.
//
// Steps:
//  1. If bg_error_ is set, fail any pending manual compaction with it and
//     return it.
//  2. If a manual compaction exists but is already being processed, return OK
//     without doing automatic work.
//  3. Flush every pending immutable memtable first (flush preempts
//     compaction).
//  4. Pick a compaction: the requested manual range, or the first column
//     family with auto compactions enabled for which PickCompaction() yields
//     one.
//  5. Execute it as a FIFO deletion compaction, a trivial move, or a full
//     compaction via DoCompactionWork().
//  6. On a non-shutdown error, log it and record it in bg_error_ when
//     paranoid_checks is enabled.
//  7. For a manual compaction, record its status and advance or finish its
//     range.
// *madeProgress is set to true when any work was applied.
Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
                                    LogBuffer* log_buffer) {
  *madeProgress = false;
  mutex_.AssertHeld();

  bool is_manual = (manual_compaction_ != nullptr) &&
                   (manual_compaction_->in_progress == false);

  if (!bg_error_.ok()) {
    if (is_manual) {
      manual_compaction_->status = bg_error_;
      manual_compaction_->done = true;
      manual_compaction_->in_progress = false;
      manual_compaction_ = nullptr;
    }
    return bg_error_;
  }

  if (is_manual) {
    // another thread cannot pick up the same work
    manual_compaction_->in_progress = true;
  } else if (manual_compaction_ != nullptr) {
    // there should be no automatic compactions running when manual compaction
    // is running
    return Status::OK();
  }

  // FLUSH preempts compaction
  Status flush_stat;
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    const MutableCFOptions mutable_cf_options =
      *cfd->GetLatestMutableCFOptions();
    while (cfd->imm()->IsFlushPending()) {
      LogToBuffer(
          log_buffer,
          "BackgroundCompaction doing FlushMemTableToOutputFile, "
          "compaction slots available %d",
          db_options_.max_background_compactions - bg_compaction_scheduled_);
      cfd->Ref();
      flush_stat = FlushMemTableToOutputFile(
          cfd, mutable_cf_options, madeProgress, job_context, log_buffer);
      cfd->Unref();
      if (!flush_stat.ok()) {
        if (is_manual) {
          manual_compaction_->status = flush_stat;
          manual_compaction_->done = true;
          manual_compaction_->in_progress = false;
          manual_compaction_ = nullptr;
        }
        return flush_stat;
      }
    }
  }

  // Compaction makes a copy of the latest MutableCFOptions. It should be used
  // throughout the compaction procedure to make sure consistency. It will
  // eventually be installed into SuperVersion
  unique_ptr<Compaction> c;
  // CompactRange() may redirect manual_end (it receives its address); a
  // nullptr afterwards means the whole requested range was covered.
  InternalKey manual_end_storage;
  InternalKey* manual_end = &manual_end_storage;
  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    assert(m->in_progress);
    c.reset(m->cfd->CompactRange(
          *m->cfd->GetLatestMutableCFOptions(), m->input_level, m->output_level,
          m->output_path_id, m->begin, m->end, &manual_end));
    if (!c) {
      m->done = true;
    }
    LogToBuffer(log_buffer,
                "[%s] Manual compaction from level-%d to level-%d from %s .. "
                "%s; will stop at %s\n",
                m->cfd->GetName().c_str(), m->input_level, m->output_level,
                (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
                (m->end ? m->end->DebugString().c_str() : "(end)"),
                ((m->done || manual_end == nullptr)
                     ? "(end)"
                     : manual_end->DebugString().c_str()));
  } else {
    // no need to refcount in iteration since it's always under a mutex
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      // Pick up latest mutable CF Options and use it throughout the
      // compaction job
      auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
      if (!mutable_cf_options->disable_auto_compactions) {
        // NOTE: try to avoid unnecessary copy of MutableCFOptions if
        // compaction is not necessary. Need to make sure mutex is held
        // until we make a copy in the following code
        c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
        if (c != nullptr) {
          // update statistics
          MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
                      c->inputs(0)->size());
          break;
        }
      }
    }
  }

  Status status;
  if (!c) {
    // Nothing to do
    LogToBuffer(log_buffer, "Compaction nothing to do");
  } else if (c->IsDeletionCompaction()) {
    // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old
    // file if there is alive snapshot pointing to it
    assert(c->num_input_files(1) == 0);
    assert(c->level() == 0);
    assert(c->column_family_data()->ioptions()->compaction_style ==
           kCompactionStyleFIFO);
    for (const auto& f : *c->inputs(0)) {
      c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
    }
    status = versions_->LogAndApply(
        c->column_family_data(), *c->mutable_cf_options(), c->edit(),
        &mutex_, db_directory_.get());
    InstallSuperVersionBackground(c->column_family_data(), job_context,
                                  *c->mutable_cf_options());
    LogToBuffer(log_buffer, "[%s] Deleted %d files\n",
                c->column_family_data()->GetName().c_str(),
                c->num_input_files(0));
    c->ReleaseCompactionFiles(status);
    *madeProgress = true;
  } else if (!is_manual && c->IsTrivialMove()) {
    // Move file to next level
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
    c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetPathId(),
                       f->fd.GetFileSize(), f->smallest, f->largest,
                       f->smallest_seqno, f->largest_seqno);
    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(),
                                    c->edit(), &mutex_, db_directory_.get());
    // Use latest MutableCFOptions
    InstallSuperVersionBackground(c->column_family_data(), job_context,
                                  *c->mutable_cf_options());

    VersionStorageInfo::LevelSummaryStorage tmp;
    LogToBuffer(log_buffer, "[%s] Moved #%" PRIu64 " to level-%d %" PRIu64
                            " bytes %s: %s\n",
                c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
                c->level() + 1, f->fd.GetFileSize(), status.ToString().c_str(),
                c->input_version()->storage_info()->LevelSummary(&tmp));
    c->ReleaseCompactionFiles(status);
    *madeProgress = true;
  } else {
    MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel.
    CompactionState* compact = new CompactionState(c.get());
    status = DoCompactionWork(compact, *c->mutable_cf_options(), job_context,
                              log_buffer);
    // CleanupCompaction() deletes `compact`.
    CleanupCompaction(compact, status);
    c->ReleaseCompactionFiles(status);
    c->ReleaseInputs();
    *madeProgress = true;
  }
  c.reset();

  if (status.ok()) {
    // Done
  } else if (status.IsShutdownInProgress()) {
    // Ignore compaction errors found during shutting down
  } else {
    Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, "Compaction error: %s",
        status.ToString().c_str());
    if (db_options_.paranoid_checks && bg_error_.ok()) {
      bg_error_ = status;
    }
  }

  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    if (!status.ok()) {
      m->status = status;
      m->done = true;
    }
    // For universal compaction:
    //   Because universal compaction always happens at level 0, so one
    //   compaction will pick up all overlapped files. No files will be
    //   filtered out due to size limit and left for a successive compaction.
    //   So we can safely conclude the current compaction.
    //
    //   Also note that, if we don't stop here, then the current compaction
    //   writes a new file back to level 0, which will be used in successive
    //   compaction. Hence the manual compaction will never finish.
    //
    // Stop the compaction if manual_end points to nullptr -- this means
    // that we compacted the whole range. manual_end should always point
    // to nullptr in case of universal compaction
    if (manual_end == nullptr) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range.  Update *m
      // to the range that is left to be compacted.
      // Universal and FIFO compactions should always compact the whole range
      assert(m->cfd->ioptions()->compaction_style != kCompactionStyleUniversal);
      assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
      m->tmp_storage = *manual_end;
      m->begin = &m->tmp_storage;
    }
    m->in_progress = false; // not being processed anymore
    manual_compaction_ = nullptr;
  }
  return status;
}

2023
// Releases the resources of a finished (or aborted) compaction and deletes
// `compact`.
//   - Abandons and drops a table builder that is still open.
//   - Removes each output's file number from pending_outputs_.
//   - On failure (`status` not OK), evicts every output from the table
//     cache, because the compaction was not committed.
// Requires mutex_ to be held.
void DBImpl::CleanupCompaction(CompactionState* compact, Status status) {
  mutex_.AssertHeld();
  if (compact->builder != nullptr) {
    // May happen if we get a shutdown call in the middle of compaction
    compact->builder->Abandon();
    compact->builder.reset();
  } else {
    assert(!status.ok() || compact->outfile == nullptr);
  }
  for (const CompactionState::Output& out : compact->outputs) {
    pending_outputs_.erase(out.number);

    // If this file was inserted into the table cache then remove
    // it here because this compaction was not committed.
    if (!status.ok()) {
      TableCache::Evict(table_cache_.get(), out.number);
    }
  }
  delete compact;
}

2045
// Allocate the file numbers for the output file. We allocate as
2046
// many output file numbers as there are files in level+1 (at least one)
2047 2048 2049
// Insert them into pending_outputs so that they do not get deleted.
void DBImpl::AllocateCompactionOutputFileNumbers(CompactionState* compact) {
  mutex_.AssertHeld();
2050 2051
  assert(compact != nullptr);
  assert(compact->builder == nullptr);
2052
  int filesNeeded = compact->compaction->num_input_files(1);
2053
  for (int i = 0; i < std::max(filesNeeded, 1); i++) {
2054
    uint64_t file_number = versions_->NewFileNumber();
2055
    pending_outputs_[file_number] = compact->compaction->GetOutputPathId();
2056 2057 2058 2059 2060 2061 2062
    compact->allocated_file_numbers.push_back(file_number);
  }
}

// Frees up unused file numbers: removes every number still queued in
// compact->allocated_file_numbers from pending_outputs_. Requires mutex_ to
// be held.
void DBImpl::ReleaseCompactionUnusedFileNumbers(CompactionState* compact) {
  mutex_.AssertHeld();
  for (const auto file_number : compact->allocated_file_numbers) {
    pending_outputs_.erase(file_number);
  }
}

2068 2069
// Opens the next output table file of a compaction and creates its table
// builder. Uses a pre-allocated file number when one is left; otherwise
// locks mutex_ to allocate a new number and register it in pending_outputs_.
// Must therefore be called WITHOUT mutex_ held. On NewWritableFile failure,
// logs the error and returns it; no output entry is added in that case.
Status DBImpl::OpenCompactionOutputFile(
    CompactionState* compact, const MutableCFOptions& mutable_cf_options) {
  assert(compact != nullptr);
  assert(compact->builder == nullptr);
  uint64_t file_number;
  // If we have not yet exhausted the pre-allocated file numbers,
  // then use the one from the front. Otherwise, we have to acquire
  // the heavyweight lock and allocate a new file number.
  if (!compact->allocated_file_numbers.empty()) {
    file_number = compact->allocated_file_numbers.front();
    compact->allocated_file_numbers.pop_front();
  } else {
    mutex_.Lock();
    file_number = versions_->NewFileNumber();
    pending_outputs_[file_number] = compact->compaction->GetOutputPathId();
    mutex_.Unlock();
  }
  // Make the output file
  std::string fname = TableFileName(db_options_.db_paths, file_number,
                                    compact->compaction->GetOutputPathId());
  Status s = env_->NewWritableFile(fname, &compact->outfile, env_options_);

  if (!s.ok()) {
    Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
        "[%s] OpenCompactionOutputFiles for table #%" PRIu64 " "
        "fails at NewWritableFile with status %s",
        compact->compaction->column_family_data()->GetName().c_str(),
        file_number, s.ToString().c_str());
    LogFlush(db_options_.info_log);
    return s;
  }
  CompactionState::Output out;
  out.number = file_number;
  out.path_id = compact->compaction->GetOutputPathId();
  out.smallest.Clear();
  out.largest.Clear();
  out.smallest_seqno = out.largest_seqno = 0;

  compact->outputs.push_back(out);
  compact->outfile->SetIOPriority(Env::IO_LOW);
  compact->outfile->SetPreallocationBlockSize(
      compact->compaction->OutputFilePreallocationSize(mutable_cf_options));

  ColumnFamilyData* cfd = compact->compaction->column_family_data();
  compact->builder.reset(NewTableBuilder(
      *cfd->ioptions(), cfd->internal_comparator(), compact->outfile.get(),
      compact->compaction->OutputCompressionType(),
      cfd->ioptions()->compression_opts));
  LogFlush(db_options_.info_log);
  return s;
}

// Completes the current compaction output file.
//   - If `input` reports an error, the builder is abandoned; otherwise the
//     table is finished.
//   - Records the resulting file size in the current output and in
//     compact->total_bytes.
//   - On success, syncs (unless disableDataSync) and closes the file.
//   - If the table has entries, verifies it by opening an iterator through
//     the table cache.
// Returns the first error encountered.
Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
                                          Iterator* input) {
  assert(compact != nullptr);
  assert(compact->outfile);
  assert(compact->builder != nullptr);

  const uint64_t output_number = compact->current_output()->number;
  const uint32_t output_path_id = compact->current_output()->path_id;
  assert(output_number != 0);

  // Check for iterator errors
  Status s = input->status();
  const uint64_t current_entries = compact->builder->NumEntries();
  if (s.ok()) {
    s = compact->builder->Finish();
  } else {
    compact->builder->Abandon();
  }
  const uint64_t current_bytes = compact->builder->FileSize();
  compact->current_output()->file_size = current_bytes;
  compact->total_bytes += current_bytes;
  compact->builder.reset();

  // Finish and check for file errors
  if (s.ok() && !db_options_.disableDataSync) {
    StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
    s = db_options_.use_fsync ? compact->outfile->Fsync()
                              : compact->outfile->Sync();
  }
  if (s.ok()) {
    s = compact->outfile->Close();
  }
  compact->outfile.reset();

  if (s.ok() && current_entries > 0) {
    // Verify that the table is usable
    ColumnFamilyData* cfd = compact->compaction->column_family_data();
    FileDescriptor fd(output_number, output_path_id, current_bytes);
    std::unique_ptr<Iterator> iter(cfd->table_cache()->NewIterator(
        ReadOptions(), env_options_, cfd->internal_comparator(), fd));
    s = iter->status();
    iter.reset();
    if (s.ok()) {
      Log(db_options_.info_log, "[%s] Generated table #%" PRIu64 ": %" PRIu64
                             " keys, %" PRIu64 " bytes",
          cfd->GetName().c_str(), output_number, current_entries,
          current_bytes);
    }
  }
  return s;
}


2177
// Commits a finished compaction to the version set. First checks that the
// input files still exist in their original levels (returns Corruption if
// not). Then records the input deletions and one AddFile per output in the
// compaction's edit, and applies that edit with LogAndApply().
// Requires mutex_ to be held.
Status DBImpl::InstallCompactionResults(CompactionState* compact,
    const MutableCFOptions& mutable_cf_options, LogBuffer* log_buffer) {
  mutex_.AssertHeld();

  // paranoia: verify that the files that we started with
  // still exist in the current version and in the same original level.
  // This ensures that a concurrent compaction did not erroneously
  // pick the same files to compact.
  if (!versions_->VerifyCompactionFileConsistency(compact->compaction)) {
    Log(db_options_.info_log, "[%s] Compaction %d@%d + %d@%d files aborted",
        compact->compaction->column_family_data()->GetName().c_str(),
        compact->compaction->num_input_files(0), compact->compaction->level(),
        compact->compaction->num_input_files(1),
        compact->compaction->output_level());
    return Status::Corruption("Compaction input files inconsistent");
  }

  LogToBuffer(log_buffer, "[%s] Compacted %d@%d + %d@%d files => %lld bytes",
              compact->compaction->column_family_data()->GetName().c_str(),
              compact->compaction->num_input_files(0),
              compact->compaction->level(),
              compact->compaction->num_input_files(1),
              compact->compaction->output_level(),
              static_cast<long long>(compact->total_bytes));

  // Add compaction outputs
  compact->compaction->AddInputDeletions(compact->compaction->edit());
  for (const CompactionState::Output& out : compact->outputs) {
    compact->compaction->edit()->AddFile(compact->compaction->output_level(),
                                         out.number, out.path_id, out.file_size,
                                         out.smallest, out.largest,
                                         out.smallest_seqno, out.largest_seqno);
  }
  return versions_->LogAndApply(compact->compaction->column_family_data(),
                                mutable_cf_options,
                                compact->compaction->edit(), &mutex_,
                                db_directory_.get());
}

2217 2218 2219 2220 2221 2222 2223
// Given a sequence number, return the sequence number of the
// earliest snapshot that this sequence number is visible in.
// The snapshots themselves are arranged in ascending order of
// sequence numbers.
// Employ a sequential search because the number of snapshots
// is typically small.
//
// On success *prev_snapshot is set to the snapshot that immediately precedes
// the returned one in `snapshots` (0 if the returned one is the first).
// If every snapshot is smaller than `in`, the caller has violated its
// contract: the condition is logged, asserted in debug builds, 0 is returned
// and *prev_snapshot is left untouched.
inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
  SequenceNumber in, std::vector<SequenceNumber>& snapshots,
  SequenceNumber* prev_snapshot) {
  SequenceNumber prev = 0;
  for (const auto cur : snapshots) {
    assert(prev <= cur);  // snapshots must be sorted ascending
    if (cur >= in) {
      *prev_snapshot = prev;
      return cur;
    }
    prev = cur;
    assert(prev);
  }
  // Do not index the last element blindly: with an empty vector that would
  // read out of bounds just to build the log message.
  const SequenceNumber max_seqid = snapshots.empty() ? 0 : snapshots.back();
  Log(db_options_.info_log,
      "Looking for seqid %" PRIu64 " but maxseqid is %" PRIu64 "", in,
      max_seqid);
  assert(0);
  return 0;
}

I
Igor Canadi 已提交
2243 2244 2245
// Lets an in-progress compaction flush the column family's immutable
// memtables itself when no dedicated flush threads are configured
// (max_background_flushes == 0).
//
// Acquires mutex_ internally, so the caller must not hold it.
// Returns the wall-clock microseconds spent here (0 when nothing was done);
// DoCompactionWork subtracts this from the reported compaction time.
uint64_t DBImpl::CallFlushDuringCompaction(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    JobContext* job_context, LogBuffer* log_buffer) {
  // Background flush threads exist; leave the flush to them.
  if (db_options_.max_background_flushes > 0) {
    return 0;
  }

  // Cheap, lock-free pre-check before taking the DB mutex.
  const bool flush_needed =
      cfd->imm()->imm_flush_needed.load(std::memory_order_relaxed);
  if (!flush_needed) {
    return 0;
  }

  const uint64_t flush_start_micros = env_->NowMicros();
  mutex_.Lock();
  // Re-check under the mutex: the hint above may be stale.
  if (cfd->imm()->IsFlushPending()) {
    // Keep the column family alive for the duration of the flush.
    cfd->Ref();
    FlushMemTableToOutputFile(cfd, mutable_cf_options, nullptr, job_context,
                              log_buffer);
    cfd->Unref();
    bg_cv_.SignalAll();  // Wakeup DelayWrite() if necessary
  }
  mutex_.Unlock();

  log_buffer->FlushBufferToLog();
  return env_->NowMicros() - flush_start_micros;
}

D
Danny Guo 已提交
2267
// Core compaction loop: walks the input entries, decides for each one whether
// it is dropped (shadowed, obsolete tombstone, or filtered), resolves merge
// operands, and appends surviving entries to the current compaction output
// file, rolling over to a new file when ShouldStopBefore() or the size limit
// says so.
//
// Parameters:
//   mutable_cf_options    forwarded to output-file creation and to the
//                         opportunistic memtable flush.
//   is_snapshot_supported when false (and visible_at_tip == 0) every entry is
//                         treated as visible in "snapshot 0".
//   visible_at_tip        non-zero when there are no snapshots (the caller then
//                         sets it to the last sequence number).
//   earliest_snapshot     oldest snapshot; governs tombstone dropping and
//                         sequence-number zeroing at the bottommost level.
//   latest_snapshot       newest user snapshot; entries newer than it are
//                         eligible for the compaction filter.
//   imm_micros            accumulates time spent flushing memtables.
//   input                 compaction input iterator; advanced by one entry per
//                         loop iteration unless MergeUntil() already moved it.
//   is_compaction_v2      when true, entries are read from
//                         compact->combined_key_buf_/combined_value_buf_
//                         (already filtered by CompactionFilterV2) instead of
//                         from `input`.
//   num_output_records    incremented once per record written; must not be
//                         null.
// Returns the first non-OK status hit while writing outputs, or
// InvalidArgument if a merge operand is found without a merge operator.
Status DBImpl::ProcessKeyValueCompaction(
    const MutableCFOptions& mutable_cf_options, bool is_snapshot_supported,
    SequenceNumber visible_at_tip, SequenceNumber earliest_snapshot,
    SequenceNumber latest_snapshot, JobContext* job_context,
    bool bottommost_level, int64_t* imm_micros, Iterator* input,
    CompactionState* compact, bool is_compaction_v2, int* num_output_records,
    LogBuffer* log_buffer) {
  assert(num_output_records != nullptr);

  size_t combined_idx = 0;
  Status status;
  std::string compaction_filter_value;
  ParsedInternalKey ikey;
  IterKey current_user_key;
  bool has_current_user_key = false;
  IterKey delete_key;
  SequenceNumber last_sequence_for_key __attribute__((unused)) =
    kMaxSequenceNumber;
  SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
  ColumnFamilyData* cfd = compact->compaction->column_family_data();
  MergeHelper merge(
      cfd->user_comparator(), cfd->ioptions()->merge_operator,
      db_options_.info_log.get(), cfd->ioptions()->min_partial_merge_operands,
      false /* internal key corruption is expected */);
  auto compaction_filter = cfd->ioptions()->compaction_filter;
  // Owns a factory-created filter when no shared filter instance is set.
  std::unique_ptr<CompactionFilter> compaction_filter_from_factory;
  if (!compaction_filter) {
    auto context = compact->GetFilterContextV1();
    compaction_filter_from_factory =
        cfd->ioptions()->compaction_filter_factory->CreateCompactionFilter(
            context);
    compaction_filter = compaction_filter_from_factory.get();
  }

  int64_t key_drop_user = 0;
  int64_t key_drop_newer_entry = 0;
  int64_t key_drop_obsolete = 0;
  int64_t loop_cnt = 0;

  // Publishes the locally accumulated drop counters (then resets them) and
  // the compaction IO stats. Batching avoids a statistics update per key.
  auto flush_stats = [&]() {
    if (key_drop_user > 0) {
      RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
      key_drop_user = 0;
    }
    if (key_drop_newer_entry > 0) {
      RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
                 key_drop_newer_entry);
      key_drop_newer_entry = 0;
    }
    if (key_drop_obsolete > 0) {
      RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, key_drop_obsolete);
      key_drop_obsolete = 0;
    }
    RecordCompactionIOStats();
  };

  while (input->Valid() && !shutting_down_.load(std::memory_order_acquire) &&
         !cfd->IsDropped() && status.ok()) {
    if (++loop_cnt > 1000) {
      flush_stats();
      loop_cnt = 0;
    }
    // FLUSH preempts compaction
    // TODO(icanadi) this currently only checks if flush is necessary on
    // compacting column family. we should also check if flush is necessary on
    // other column families, too
    (*imm_micros) += CallFlushDuringCompaction(cfd, mutable_cf_options,
                                               job_context, log_buffer);

    Slice key;
    Slice value;
    // If is_compaction_v2 is on, kv-pairs are reset to the prefix batch.
    // This prefix batch should contain results after calling
    // compaction_filter_v2.
    //
    // If is_compaction_v2 is off, this function will go through all the
    // kv-pairs in input.
    if (!is_compaction_v2) {
      key = input->key();
      value = input->value();
    } else {
      if (combined_idx >= compact->combined_key_buf_.size()) {
        break;
      }
      key = compact->combined_key_buf_[combined_idx];
      value = compact->combined_value_buf_[combined_idx];

      ++combined_idx;
    }

    if (compact->compaction->ShouldStopBefore(key) &&
        compact->builder != nullptr) {
      status = FinishCompactionOutputFile(compact, input);
      if (!status.ok()) {
        break;
      }
    }

    // Handle key/value, add to state, etc.
    bool drop = false;
    bool current_entry_is_merging = false;
    if (!ParseInternalKey(key, &ikey)) {
      // Do not hide error keys
      // TODO: error key stays in db forever? Figure out the intention/rationale
      // v10 error v8 : we cannot hide v8 even though it's pretty obvious.
      current_user_key.Clear();
      has_current_user_key = false;
      last_sequence_for_key = kMaxSequenceNumber;
      visible_in_snapshot = kMaxSequenceNumber;
    } else {
      if (!has_current_user_key ||
          cfd->user_comparator()->Compare(ikey.user_key,
                                          current_user_key.GetKey()) != 0) {
        // First occurrence of this user key
        current_user_key.SetKey(ikey.user_key);
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
        visible_in_snapshot = kMaxSequenceNumber;
        // apply the compaction filter to the first occurrence of the user key
        if (compaction_filter && !is_compaction_v2 &&
            ikey.type == kTypeValue &&
            (visible_at_tip || ikey.sequence > latest_snapshot)) {
          // If the user has specified a compaction filter and the sequence
          // number is greater than any external snapshot, then invoke the
          // filter.
          // If the return value of the compaction filter is true, replace
          // the entry with a delete marker.
          bool value_changed = false;
          compaction_filter_value.clear();
          bool to_delete = compaction_filter->Filter(
              compact->compaction->level(), ikey.user_key, value,
              &compaction_filter_value, &value_changed);
          if (to_delete) {
            // make a copy of the original key and convert it to a delete
            delete_key.SetInternalKey(ExtractUserKey(key), ikey.sequence,
                                      kTypeDeletion);
            // anchor the key again
            key = delete_key.GetKey();
            // needed because ikey is backed by key
            ParseInternalKey(key, &ikey);
            // no value associated with delete
            value.clear();
            ++key_drop_user;
          } else if (value_changed) {
            value = compaction_filter_value;
          }
        }
      }

      // If there are no snapshots, then this kv affects visibility at tip.
      // Otherwise, search through all existing snapshots to find
      // the earliest snapshot that is affected by this kv.
      SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
      SequenceNumber visible = 0;
      if (visible_at_tip) {
        visible = visible_at_tip;
      } else if (is_snapshot_supported) {
        visible = findEarliestVisibleSnapshot(
            ikey.sequence, compact->existing_snapshots, &prev_snapshot);
      }

      if (visible_in_snapshot == visible) {
        // If the earliest snapshot is which this key is visible in
        // is the same as the visibility of a previous instance of the
        // same key, then this kv is not visible in any snapshot.
        // Hidden by an newer entry for same user key
        // TODO: why not > ?
        assert(last_sequence_for_key >= ikey.sequence);
        drop = true;    // (A)
        ++key_drop_newer_entry;
      } else if (ikey.type == kTypeDeletion &&
          ikey.sequence <= earliest_snapshot &&
          compact->compaction->KeyNotExistsBeyondOutputLevel(ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
        drop = true;
        ++key_drop_obsolete;
      } else if (ikey.type == kTypeMerge) {
        if (!merge.HasOperator()) {
          LogToBuffer(log_buffer, "Options::merge_operator is null.");
          status = Status::InvalidArgument(
              "merge_operator is not properly initialized.");
          break;
        }
        // We know the merge type entry is not hidden, otherwise we would
        // have hit (A)
        // We encapsulate the merge related state machine in a different
        // object to minimize change to the existing flow. Turn out this
        // logic could also be nicely re-used for memtable flush purge
        // optimization in BuildTable.
        int steps = 0;
        merge.MergeUntil(input, prev_snapshot, bottommost_level,
            db_options_.statistics.get(), &steps);
        // Skip the Merge ops. In v2 mode `input` is advanced once per
        // processed entry, so combined_idx (already bumped for the current
        // entry) is moved forward by the same number of steps.
        combined_idx = combined_idx - 1 + steps;

        current_entry_is_merging = true;
        if (merge.IsSuccess()) {
          // Successfully found Put/Delete/(end-of-key-range) while merging
          // Get the merge result
          key = merge.key();
          ParseInternalKey(key, &ikey);
          value = merge.value();
        } else {
          // Did not find a Put/Delete/(end-of-key-range) while merging
          // We now have some stack of merge operands to write out.
          // NOTE: key,value, and ikey are now referring to old entries.
          //       These will be correctly set below.
          assert(!merge.keys().empty());
          assert(merge.keys().size() == merge.values().size());

          // Hack to make sure last_sequence_for_key is correct
          ParseInternalKey(merge.keys().front(), &ikey);
        }
      }

      last_sequence_for_key = ikey.sequence;
      visible_in_snapshot = visible;
    }

    if (!drop) {
      // We may write a single key (e.g.: for Put/Delete or successful merge).
      // Or we may instead have to write a sequence/list of keys.
      // We have to write a sequence iff we have an unsuccessful merge
      bool has_merge_list = current_entry_is_merging && !merge.IsSuccess();
      const std::deque<std::string>* keys = nullptr;
      const std::deque<std::string>* values = nullptr;
      std::deque<std::string>::const_reverse_iterator key_iter;
      std::deque<std::string>::const_reverse_iterator value_iter;
      if (has_merge_list) {
        keys = &merge.keys();
        values = &merge.values();
        key_iter = keys->rbegin();    // The back (*rbegin()) is the first key
        value_iter = values->rbegin();

        key = Slice(*key_iter);
        value = Slice(*value_iter);
      }

      // If we have a list of keys to write, traverse the list.
      // If we have a single key to write, simply write that key.
      while (true) {
        // Invariant: key,value,ikey will always be the next entry to write
        Slice newkey = key;
        std::string kstr;

        // Zeroing out the sequence number leads to better compression.
        // If this is the bottommost level (no files in lower levels)
        // and the earliest snapshot is larger than this seqno
        // then we can squash the seqno to zero.
        if (bottommost_level && ikey.sequence < earliest_snapshot &&
            ikey.type != kTypeMerge) {
          assert(ikey.type != kTypeDeletion);
          // make a copy because updating in place would cause problems
          // with the priority queue that is managing the input key iterator.
          // Write through &kstr[0]: mutating via c_str() is undefined.
          kstr.assign(key.data(), key.size());
          UpdateInternalKey(&kstr[0], kstr.size(), static_cast<uint64_t>(0),
                            ikey.type);
          newkey = Slice(kstr);
        }

        assert((key.clear(), 1)); // we do not need 'key' anymore

        // Open output file if necessary
        if (compact->builder == nullptr) {
          status = OpenCompactionOutputFile(compact, mutable_cf_options);
          if (!status.ok()) {
            break;
          }
        }

        SequenceNumber seqno = GetInternalKeySeqno(newkey);
        if (compact->builder->NumEntries() == 0) {
          compact->current_output()->smallest.DecodeFrom(newkey);
          compact->current_output()->smallest_seqno = seqno;
        } else {
          compact->current_output()->smallest_seqno =
            std::min(compact->current_output()->smallest_seqno, seqno);
        }
        compact->current_output()->largest.DecodeFrom(newkey);
        compact->builder->Add(newkey, value);
        ++(*num_output_records);
        compact->current_output()->largest_seqno =
          std::max(compact->current_output()->largest_seqno, seqno);

        // Close output file if it is big enough
        if (compact->builder->FileSize() >=
            compact->compaction->MaxOutputFileSize()) {
          status = FinishCompactionOutputFile(compact, input);
          if (!status.ok()) {
            break;
          }
        }

        // If we have a list of entries, move to next element
        // If we only had one entry, then break the loop.
        if (has_merge_list) {
          ++key_iter;
          ++value_iter;

          // If at end of list
          if (key_iter == keys->rend() || value_iter == values->rend()) {
            // Sanity Check: if one ends, then both end
            assert(key_iter == keys->rend() && value_iter == values->rend());
            break;
          }

          // Otherwise not at end of list. Update key, value, and ikey.
          key = Slice(*key_iter);
          value = Slice(*value_iter);
          ParseInternalKey(key, &ikey);

        } else {
          // Only had one item to begin with (Put/Delete)
          break;
        }
      }  // while (true)
    }  // if (!drop)

    // MergeUntil has moved input to the next entry
    if (!current_entry_is_merging) {
      input->Next();
    }
  }
  flush_stats();

  return status;
}

// Runs the user's CompactionFilterV2 over the entries buffered in
// compact->key_str_buf_ / existing_value_str_buf_ and applies its decisions
// in place:
//   - to_delete_buf_[i]      -> key i is rewritten as a deletion marker (same
//                               sequence number) and its value is cleared;
//   - value_changed_buf_[i]  -> value i is replaced by the next unconsumed
//                               entry of new_value_buf_ (consumed in order).
// No-op if either argument is null.
void DBImpl::CallCompactionFilterV2(CompactionState* compact,
  CompactionFilterV2* compaction_filter_v2) {
  if (compact == nullptr || compaction_filter_v2 == nullptr) {
    return;
  }

  // Assemble slice vectors for user keys and existing values.
  // We also keep track of our parsed internal key structs because
  // we may need to access the sequence number in the event that
  // keys are garbage collected during the filter process.
  std::vector<ParsedInternalKey> ikey_buf;
  std::vector<Slice> user_key_buf;
  std::vector<Slice> existing_value_buf;
  ikey_buf.reserve(compact->key_str_buf_.size());
  user_key_buf.reserve(compact->key_str_buf_.size());
  existing_value_buf.reserve(compact->existing_value_str_buf_.size());

  for (const auto& key : compact->key_str_buf_) {
    ParsedInternalKey ikey;
    ParseInternalKey(Slice(key), &ikey);
    ikey_buf.emplace_back(ikey);
    user_key_buf.emplace_back(ikey.user_key);
  }
  for (const auto& value : compact->existing_value_str_buf_) {
    existing_value_buf.emplace_back(Slice(value));
  }

  // DoCompactionWork only buffers kTypeValue entries newer than every
  // snapshot here; the filter decides, per entry, whether to delete it or
  // replace its value.
  compact->to_delete_buf_ = compaction_filter_v2->Filter(
      compact->compaction->level(),
      user_key_buf, existing_value_buf,
      &compact->new_value_buf_,
      &compact->value_changed_buf_);

  // The filter must return exactly one decision per buffered entry.
  assert(compact->to_delete_buf_.size() ==
      compact->key_str_buf_.size());
  assert(compact->to_delete_buf_.size() ==
      compact->existing_value_str_buf_.size());
  assert(compact->to_delete_buf_.size() ==
      compact->value_changed_buf_.size());

  size_t new_value_idx = 0;
  for (size_t i = 0; i < compact->to_delete_buf_.size(); ++i) {
    if (compact->to_delete_buf_[i]) {
      // update the string buffer directly
      // the Slice buffer points to the updated buffer
      UpdateInternalKey(&compact->key_str_buf_[i][0],
                        compact->key_str_buf_[i].size(),
                        ikey_buf[i].sequence,
                        kTypeDeletion);

      // no value associated with delete
      compact->existing_value_str_buf_[i].clear();
      RecordTick(stats_, COMPACTION_KEY_DROP_USER);
    } else if (compact->value_changed_buf_[i]) {
      // A filter that flags more changed values than it returns would make
      // this an out-of-bounds read; keep the existing value in that case.
      assert(new_value_idx < compact->new_value_buf_.size());
      if (new_value_idx < compact->new_value_buf_.size()) {
        compact->existing_value_str_buf_[i] =
            compact->new_value_buf_[new_value_idx++];
      }
    }
  }  // for
}

// Executes a picked compaction: with mutex_ held on entry, it snapshots the
// sequence-number state and allocates output file numbers, then releases the
// mutex while reading/merging the inputs and writing the outputs (through
// ProcessKeyValueCompaction, optionally batched by prefix for
// CompactionFilterV2), re-acquires the mutex, records statistics and, on
// success, installs the results and a new SuperVersion.
// Returns the first error encountered (including ShutdownInProgress when the
// DB shuts down or the column family is dropped mid-compaction).
Status DBImpl::DoCompactionWork(CompactionState* compact,
                                const MutableCFOptions& mutable_cf_options,
                                JobContext* job_context,
                                LogBuffer* log_buffer) {
  assert(compact);
  compact->CleanupBatchBuffer();
  compact->CleanupMergedBuffer();

  // Generate file_levels_ for compaction before making Iterator
  compact->compaction->GenerateFileLevels();
  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
  ColumnFamilyData* cfd = compact->compaction->column_family_data();
  LogToBuffer(
      log_buffer,
      "[%s] Compacting %d@%d + %d@%d files, score %.2f slots available %d",
      cfd->GetName().c_str(), compact->compaction->num_input_files(0),
      compact->compaction->level(), compact->compaction->num_input_files(1),
      compact->compaction->output_level(), compact->compaction->score(),
      db_options_.max_background_compactions - bg_compaction_scheduled_);
  char scratch[2345];
  compact->compaction->Summary(scratch, sizeof(scratch));
  LogToBuffer(log_buffer, "[%s] Compaction start summary: %s\n",
              cfd->GetName().c_str(), scratch);

  assert(cfd->current()->storage_info()->NumLevelFiles(
             compact->compaction->level()) > 0);
  assert(compact->builder == nullptr);
  assert(!compact->outfile);

  SequenceNumber visible_at_tip = 0;
  SequenceNumber earliest_snapshot;
  SequenceNumber latest_snapshot = 0;
  snapshots_.getAll(compact->existing_snapshots);
  if (compact->existing_snapshots.empty()) {
    // optimize for fast path if there are no snapshots
    visible_at_tip = versions_->LastSequence();
    earliest_snapshot = visible_at_tip;
  } else {
    latest_snapshot = compact->existing_snapshots.back();
    // Add the current seqno as the 'latest' virtual
    // snapshot to the end of this list.
    compact->existing_snapshots.push_back(versions_->LastSequence());
    earliest_snapshot = compact->existing_snapshots[0];
  }

  // Is this compaction producing files at the bottommost level?
  bool bottommost_level = compact->compaction->BottomMostLevel();

  // Allocate the output file numbers before we release the lock
  AllocateCompactionOutputFileNumbers(compact);

  bool is_snapshot_supported = IsSnapshotSupported();
  // Release mutex while we're actually doing the compaction work
  mutex_.Unlock();
  log_buffer->FlushBufferToLog();

  int num_output_records = 0;
  const uint64_t start_micros = env_->NowMicros();
  unique_ptr<Iterator> input(versions_->MakeInputIterator(compact->compaction));
  input->SeekToFirst();

  Status status;
  ParsedInternalKey ikey;
  auto context = compact->GetFilterContext();
  std::unique_ptr<CompactionFilterV2> compaction_filter_from_factory_v2 =
      cfd->ioptions()->compaction_filter_factory_v2->
          CreateCompactionFilterV2(context);
  auto compaction_filter_v2 = compaction_filter_from_factory_v2.get();

  if (!compaction_filter_v2) {
    status = ProcessKeyValueCompaction(
        mutable_cf_options, is_snapshot_supported, visible_at_tip,
        earliest_snapshot, latest_snapshot, job_context, bottommost_level,
        &imm_micros, input.get(), compact, false, &num_output_records,
        log_buffer);
  } else {
    // temp_backup_input always point to the start of the current buffer
    // temp_backup_input = backup_input;
    // iterate through input,
    // 1) buffer ineligible keys and value keys into 2 separate buffers;
    // 2) send value_buffer to compaction filter and alternate the values;
    // 3) merge value_buffer with ineligible_value_buffer;
    // 4) run the modified "compaction" using the old for loop.
    bool prefix_initialized = false;
    shared_ptr<Iterator> backup_input(
        versions_->MakeInputIterator(compact->compaction));
    backup_input->SeekToFirst();
    while (backup_input->Valid() &&
           !shutting_down_.load(std::memory_order_acquire) &&
           !cfd->IsDropped()) {
      // FLUSH preempts compaction
      // TODO(icanadi) this currently only checks if flush is necessary on
      // compacting column family. we should also check if flush is necessary on
      // other column families, too
      imm_micros += CallFlushDuringCompaction(cfd, mutable_cf_options,
                                              job_context, log_buffer);

      Slice key = backup_input->key();
      Slice value = backup_input->value();

      if (!ParseInternalKey(key, &ikey)) {
        // log error
        Log(db_options_.info_log, "[%s] Failed to parse key: %s",
            cfd->GetName().c_str(), key.ToString().c_str());
        // Previously this `continue`d without advancing backup_input, which
        // re-read the same key forever. An unparseable key has no prefix and
        // cannot be routed through the prefix batches without desynchronizing
        // `input`, so fail the compaction instead.
        status = Status::Corruption("Failed to parse key during compaction");
        break;
      }

      const SliceTransform* transformer =
          cfd->ioptions()->compaction_filter_factory_v2->GetPrefixExtractor();
      const auto key_prefix = transformer->Transform(ikey.user_key);
      if (!prefix_initialized) {
        compact->cur_prefix_ = key_prefix.ToString();
        prefix_initialized = true;
      }
      // If the prefix remains the same, keep buffering
      if (key_prefix.compare(Slice(compact->cur_prefix_)) == 0) {
        // Apply the compaction filter V2 to all the kv pairs sharing
        // the same prefix
        if (ikey.type == kTypeValue &&
            (visible_at_tip || ikey.sequence > latest_snapshot)) {
          // Buffer all keys sharing the same prefix for CompactionFilterV2
          // Iterate through keys to check prefix
          compact->BufferKeyValueSlices(key, value);
        } else {
          // buffer ineligible keys
          compact->BufferOtherKeyValueSlices(key, value);
        }
        backup_input->Next();
        continue;
      }

      // Now prefix changes, this batch is done.
      // Call compaction filter on the buffered values to change the value
      if (compact->key_str_buf_.size() > 0) {
        CallCompactionFilterV2(compact, compaction_filter_v2);
      }
      compact->cur_prefix_ = key_prefix.ToString();

      // Merge this batch of data (values + ineligible keys)
      compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator());

      // Done buffering for the current prefix. Spit it out to disk
      // Now just iterate through all the kv-pairs
      status = ProcessKeyValueCompaction(
          mutable_cf_options, is_snapshot_supported, visible_at_tip,
          earliest_snapshot, latest_snapshot, job_context, bottommost_level,
          &imm_micros, input.get(), compact, true, &num_output_records,
          log_buffer);

      if (!status.ok()) {
        break;
      }

      // After writing the kv-pairs, we can safely remove the reference
      // to the string buffer and clean them up
      compact->CleanupBatchBuffer();
      compact->CleanupMergedBuffer();
      // Buffer the key that triggers the mismatch in prefix
      if (ikey.type == kTypeValue &&
        (visible_at_tip || ikey.sequence > latest_snapshot)) {
        compact->BufferKeyValueSlices(key, value);
      } else {
        compact->BufferOtherKeyValueSlices(key, value);
      }
      backup_input->Next();
      if (!backup_input->Valid()) {
        // If this is the single last value, we need to merge it.
        if (compact->key_str_buf_.size() > 0) {
          CallCompactionFilterV2(compact, compaction_filter_v2);
        }
        compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator());

        status = ProcessKeyValueCompaction(
            mutable_cf_options, is_snapshot_supported, visible_at_tip,
            earliest_snapshot, latest_snapshot, job_context, bottommost_level,
            &imm_micros, input.get(), compact, true, &num_output_records,
            log_buffer);

        compact->CleanupBatchBuffer();
        compact->CleanupMergedBuffer();
      }
    }  // done processing all prefix batches

    if (status.ok()) {
      // A read error ends the loop above just like end-of-data does;
      // surface it instead of treating the input as complete.
      status = backup_input->status();
    }

    // finish the last batch -- but never let it overwrite an earlier error,
    // otherwise a failed compaction could be reported (and installed) as OK.
    if (status.ok()) {
      if (compact->key_str_buf_.size() > 0) {
        CallCompactionFilterV2(compact, compaction_filter_v2);
      }
      compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
      status = ProcessKeyValueCompaction(
          mutable_cf_options, is_snapshot_supported, visible_at_tip,
          earliest_snapshot, latest_snapshot, job_context, bottommost_level,
          &imm_micros, input.get(), compact, true, &num_output_records,
          log_buffer);
    }
  }  // checking for compaction filter v2

  if (status.ok() &&
      (shutting_down_.load(std::memory_order_acquire) || cfd->IsDropped())) {
    status = Status::ShutdownInProgress(
        "Database shutdown or Column family drop during compaction");
  }
  if (status.ok() && compact->builder != nullptr) {
    status = FinishCompactionOutputFile(compact, input.get());
  }
  if (status.ok()) {
    status = input->status();
  }
  input.reset();

  if (!db_options_.disableDataSync) {
    db_directory_->Fsync();
  }

  InternalStats::CompactionStats stats(1);
  stats.micros = env_->NowMicros() - start_micros - imm_micros;
  stats.files_in_leveln = compact->compaction->num_input_files(0);
  stats.files_in_levelnp1 = compact->compaction->num_input_files(1);
  MeasureTime(stats_, COMPACTION_TIME, stats.micros);

  int num_output_files = static_cast<int>(compact->outputs.size());
  if (compact->builder != nullptr) {
    // An error occurred so ignore the last output.
    assert(num_output_files > 0);
    --num_output_files;
  }
  stats.files_out_levelnp1 = num_output_files;

  uint64_t num_input_records = 0;

  for (int i = 0; i < compact->compaction->num_input_files(0); i++) {
    stats.bytes_readn += compact->compaction->input(0, i)->fd.GetFileSize();
    stats.num_input_records += compact->compaction->input(0, i)->num_entries;
    num_input_records += compact->compaction->input(0, i)->num_entries;
  }

  for (int i = 0; i < compact->compaction->num_input_files(1); i++) {
    stats.bytes_readnp1 += compact->compaction->input(1, i)->fd.GetFileSize();
    num_input_records += compact->compaction->input(1, i)->num_entries;
  }

  for (int i = 0; i < num_output_files; i++) {
    stats.bytes_written += compact->outputs[i].file_size;
  }
  stats.num_dropped_records =
      static_cast<int>(num_input_records) - num_output_records;

  RecordCompactionIOStats();

  LogFlush(db_options_.info_log);
  mutex_.Lock();
  cfd->internal_stats()->AddCompactionStats(
      compact->compaction->output_level(), stats);

  // if there were any unused file number (mostly in case of
  // compaction error), free up the entry from pending_outputs
  ReleaseCompactionUnusedFileNumbers(compact);

  if (status.ok()) {
    status = InstallCompactionResults(compact, mutable_cf_options, log_buffer);
    InstallSuperVersionBackground(cfd, job_context, mutable_cf_options);
  }
  VersionStorageInfo::LevelSummaryStorage tmp;
  LogToBuffer(log_buffer,
              "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
              "files in(%d, %d) out(%d) "
              "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
              "write-amplify(%.1f) %s, records in: %d, records dropped: %d\n",
              cfd->GetName().c_str(),
              cfd->current()->storage_info()->LevelSummary(&tmp),
              (stats.bytes_readn + stats.bytes_readnp1) /
                  static_cast<double>(stats.micros),
              stats.bytes_written / static_cast<double>(stats.micros),
              compact->compaction->output_level(), stats.files_in_leveln,
              stats.files_in_levelnp1, stats.files_out_levelnp1,
              stats.bytes_readn / 1048576.0, stats.bytes_readnp1 / 1048576.0,
              stats.bytes_written / 1048576.0,
              (stats.bytes_written + stats.bytes_readnp1 + stats.bytes_readn) /
                  (double)stats.bytes_readn,
              stats.bytes_written / (double)stats.bytes_readn,
              status.ToString().c_str(), stats.num_input_records,
              stats.num_dropped_records);

  return status;
}

2954 2955
namespace {
struct IterState {
I
Igor Canadi 已提交
2956 2957
  IterState(DBImpl* _db, port::Mutex* _mu, SuperVersion* _super_version)
      : db(_db), mu(_mu), super_version(_super_version) {}
2958 2959

  DBImpl* db;
2960
  port::Mutex* mu;
2961
  SuperVersion* super_version;
2962 2963 2964 2965
};

static void CleanupIteratorState(void* arg1, void* arg2) {
  IterState* state = reinterpret_cast<IterState*>(arg1);
2966

2967
  if (state->super_version->Unref()) {
I
Igor Canadi 已提交
2968
    JobContext job_context;
2969

2970 2971
    state->mu->Lock();
    state->super_version->Cleanup();
I
Igor Canadi 已提交
2972
    state->db->FindObsoleteFiles(&job_context, false, true);
2973 2974 2975
    state->mu->Unlock();

    delete state->super_version;
I
Igor Canadi 已提交
2976 2977
    if (job_context.HaveSomethingToDelete()) {
      state->db->PurgeObsoleteFiles(job_context);
2978
    }
I
Igor Canadi 已提交
2979
  }
T
Tomislav Novak 已提交
2980

2981 2982
  delete state;
}
H
Hans Wennborg 已提交
2983
}  // namespace
2984

L
Lei Jin 已提交
2985
// Builds a merging iterator over everything visible through `super_version`:
// the mutable memtable, the immutable memtables and the table files of the
// current Version (L0 - Ln). Child iterators are allocated from `arena`,
// which must be non-null. The caller's reference on `super_version` is handed
// to the returned iterator: CleanupIteratorState is registered to release it
// (presumably when the iterator is destroyed -- see Iterator::RegisterCleanup).
Iterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
                                      ColumnFamilyData* cfd,
                                      SuperVersion* super_version,
                                      Arena* arena) {
  assert(arena != nullptr);

  MergeIteratorBuilder builder(&cfd->internal_comparator(), arena);
  // Mutable memtable first, then immutable memtables, then table files.
  builder.AddIterator(super_version->mem->NewIterator(read_options, arena));
  super_version->imm->AddIterators(read_options, &builder);
  super_version->current->AddIterators(read_options, env_options_, &builder);

  Iterator* const merged = builder.Finish();
  merged->RegisterCleanup(CleanupIteratorState,
                          new IterState(this, &mutex_, super_version),
                          nullptr);
  return merged;
}

3008 3009 3010 3011
// Returns the handle of the default column family stored in
// default_cf_handle_.
ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
  ColumnFamilyHandle* const default_handle = default_cf_handle_;
  return default_handle;
}

L
Lei Jin 已提交
3012
// Point lookup of `key` in `column_family`. Forwards to GetImpl and relies on
// GetImpl's default for its `value_found` argument.
Status DBImpl::Get(const ReadOptions& read_options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   std::string* value) {
  const Status result = GetImpl(read_options, column_family, key, value);
  return result;
}

I
Igor Canadi 已提交
3018 3019
// JobContext gets created and destroyed outside of the lock -- we use this
// conveniently to:
// * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete SuperVersion()s outside of the lock -- superversions_to_free
//
// However, if InstallSuperVersion() gets called twice with the same
// job_context, we can't reuse the SuperVersion() that got malloced, because
// the first call already used it. In that rare case, we take a hit and create
// a new SuperVersion() inside of the mutex. We do a similar thing for
// superversions_to_free.
void DBImpl::InstallSuperVersionBackground(
    ColumnFamilyData* cfd, JobContext* job_context,
3033
    const MutableCFOptions& mutable_cf_options) {
3034
  mutex_.AssertHeld();
I
Igor Canadi 已提交
3035 3036 3037 3038
  SuperVersion* old_superversion = InstallSuperVersion(
      cfd, job_context->new_superversion, mutable_cf_options);
  job_context->new_superversion = nullptr;
  job_context->superversions_to_free.push_back(old_superversion);
I
Igor Canadi 已提交
3039 3040
}

L
Lei Jin 已提交
3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060 3061 3062 3063 3064 3065 3066
// Installs `new_sv` (or a freshly allocated SuperVersion when `new_sv` is
// nullptr) as the current SuperVersion of `cfd` and returns the previous one,
// which may be nullptr. Callers in this file either delete the returned
// SuperVersion or queue it for deletion.
// Also re-evaluates flush/compaction scheduling and adjusts
// max_total_in_memory_state_ by the difference between the old and the new
// memtable budget (write_buffer_size * max_write_buffer_number).
// REQUIRES: mutex_ is held.
SuperVersion* DBImpl::InstallSuperVersion(
    ColumnFamilyData* cfd, SuperVersion* new_sv,
    const MutableCFOptions& mutable_cf_options) {
  mutex_.AssertHeld();
  auto* old = cfd->InstallSuperVersion(
      new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options);

  // We want to schedule potential flush or compactions since new options may
  // have been picked up in this new version. New options may cause flush
  // compaction trigger condition to change.
  MaybeScheduleFlushOrCompaction();

  // Update max_total_in_memory_state_.
  // Must be a 64-bit unsigned type: `auto x = 0` would deduce int and
  // truncate the size_t product for large write buffers.
  uint64_t old_memtable_size = 0;
  if (old) {
    old_memtable_size = old->mutable_cf_options.write_buffer_size *
                        old->mutable_cf_options.max_write_buffer_number;
  }
  max_total_in_memory_state_ =
      max_total_in_memory_state_ - old_memtable_size +
      mutable_cf_options.write_buffer_size *
      mutable_cf_options.max_write_buffer_number;
  return old;
}

// Shared point-lookup path behind Get() and KeyMayExist().
// Reads at read_options.snapshot, or at the latest sequence number when no
// snapshot is given. Searches, in order, the mutable memtable, the immutable
// memtables, and finally the table files of the referenced SuperVersion.
// `value_found` is forwarded only to the table-file lookup. Records
// MEMTABLE_HIT / MEMTABLE_MISS, NUMBER_KEYS_READ and BYTES_READ tickers.
Status DBImpl::GetImpl(const ReadOptions& read_options,
                       ColumnFamilyHandle* column_family, const Slice& key,
                       std::string* value, bool* value_found) {
  StopWatch sw(env_, stats_, DB_GET);
  PERF_TIMER_GUARD(get_snapshot_time);

  auto* const cfd =
      reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();

  // The read sequence number is chosen before the SuperVersion is acquired.
  const SequenceNumber snapshot =
      (read_options.snapshot == nullptr)
          ? versions_->LastSequence()
          : reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
                ->number_;

  SuperVersion* const super_version = GetAndRefSuperVersion(cfd);

  // Receives merge operands if a merge is encountered during the lookup.
  MergeContext merge_context;

  // `s` is in/out for every layer: on entry it is OK or MergeInProgress, and
  // merge_context holds the pending operands in the latter case.
  Status s;
  LookupKey lkey(key, snapshot);
  PERF_TIMER_STOP(get_snapshot_time);

  // `||` short-circuits, so immutable memtables are consulted only when the
  // mutable memtable had no answer.
  const bool memtable_hit =
      super_version->mem->Get(lkey, value, &s, &merge_context) ||
      super_version->imm->Get(lkey, value, &s, &merge_context);
  if (memtable_hit) {
    RecordTick(stats_, MEMTABLE_HIT);
  } else {
    PERF_TIMER_GUARD(get_from_output_files_time);
    super_version->current->Get(read_options, lkey, value, &s,
                                &merge_context, value_found);
    RecordTick(stats_, MEMTABLE_MISS);
  }

  {
    PERF_TIMER_GUARD(get_post_process_time);

    ReturnAndCleanupSuperVersion(cfd, super_version);

    RecordTick(stats_, NUMBER_KEYS_READ);
    RecordTick(stats_, BYTES_READ, value->size());
  }
  return s;
}

3120
std::vector<Status> DBImpl::MultiGet(
L
Lei Jin 已提交
3121
    const ReadOptions& read_options,
3122
    const std::vector<ColumnFamilyHandle*>& column_family,
3123
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
3124

L
Lei Jin 已提交
3125
  StopWatch sw(env_, stats_, DB_MULTIGET);
3126
  PERF_TIMER_GUARD(get_snapshot_time);
K
Kai Liu 已提交
3127

3128
  SequenceNumber snapshot;
3129

3130
  struct MultiGetColumnFamilyData {
I
Igor Canadi 已提交
3131
    ColumnFamilyData* cfd;
3132 3133 3134 3135 3136
    SuperVersion* super_version;
  };
  std::unordered_map<uint32_t, MultiGetColumnFamilyData*> multiget_cf_data;
  // fill up and allocate outside of mutex
  for (auto cf : column_family) {
3137 3138 3139 3140 3141 3142
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(cf);
    auto cfd = cfh->cfd();
    if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) {
      auto mgcfd = new MultiGetColumnFamilyData();
      mgcfd->cfd = cfd;
      multiget_cf_data.insert({cfd->GetID(), mgcfd});
3143 3144 3145
    }
  }

3146
  mutex_.Lock();
L
Lei Jin 已提交
3147 3148 3149
  if (read_options.snapshot != nullptr) {
    snapshot = reinterpret_cast<const SnapshotImpl*>(
        read_options.snapshot)->number_;
3150 3151 3152
  } else {
    snapshot = versions_->LastSequence();
  }
3153
  for (auto mgd_iter : multiget_cf_data) {
3154 3155
    mgd_iter.second->super_version =
        mgd_iter.second->cfd->GetSuperVersion()->Ref();
3156
  }
3157
  mutex_.Unlock();
3158

3159 3160
  // Contain a list of merge operations if merge occurs.
  MergeContext merge_context;
3161

3162
  // Note: this always resizes the values array
3163 3164 3165
  size_t num_keys = keys.size();
  std::vector<Status> stat_list(num_keys);
  values->resize(num_keys);
3166 3167

  // Keep track of bytes that we read for statistics-recording later
3168
  uint64_t bytes_read = 0;
L
Lei Jin 已提交
3169
  PERF_TIMER_STOP(get_snapshot_time);
3170 3171 3172 3173

  // For each of the given keys, apply the entire "get" process as follows:
  // First look in the memtable, then in the immutable memtable (if any).
  // s is both in/out. When in, s could either be OK or MergeInProgress.
3174
  // merge_operands will contain the sequence of merges in the latter case.
3175
  for (size_t i = 0; i < num_keys; ++i) {
3176
    merge_context.Clear();
3177
    Status& s = stat_list[i];
3178 3179 3180
    std::string* value = &(*values)[i];

    LookupKey lkey(keys[i], snapshot);
3181 3182
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
    auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
3183 3184 3185
    assert(mgd_iter != multiget_cf_data.end());
    auto mgd = mgd_iter->second;
    auto super_version = mgd->super_version;
3186
    if (super_version->mem->Get(lkey, value, &s, &merge_context)) {
3187
      // Done
3188
    } else if (super_version->imm->Get(lkey, value, &s, &merge_context)) {
3189 3190
      // Done
    } else {
3191
      PERF_TIMER_GUARD(get_from_output_files_time);
L
Lei Jin 已提交
3192 3193
      super_version->current->Get(read_options, lkey, value, &s,
                                  &merge_context);
3194 3195 3196
    }

    if (s.ok()) {
3197
      bytes_read += value->size();
3198 3199 3200 3201
    }
  }

  // Post processing (decrement reference counts and record statistics)
3202
  PERF_TIMER_GUARD(get_post_process_time);
3203 3204
  autovector<SuperVersion*> superversions_to_delete;

I
Igor Canadi 已提交
3205
  // TODO(icanadi) do we need lock here or just around Cleanup()?
3206 3207 3208 3209 3210 3211
  mutex_.Lock();
  for (auto mgd_iter : multiget_cf_data) {
    auto mgd = mgd_iter.second;
    if (mgd->super_version->Unref()) {
      mgd->super_version->Cleanup();
      superversions_to_delete.push_back(mgd->super_version);
3212 3213
    }
  }
3214 3215 3216 3217 3218 3219 3220
  mutex_.Unlock();

  for (auto td : superversions_to_delete) {
    delete td;
  }
  for (auto mgd : multiget_cf_data) {
    delete mgd.second;
3221
  }
3222

L
Lei Jin 已提交
3223 3224 3225
  RecordTick(stats_, NUMBER_MULTIGET_CALLS);
  RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
  RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
L
Lei Jin 已提交
3226
  PERF_TIMER_STOP(get_post_process_time);
3227

3228
  return stat_list;
3229 3230
}

L
Lei Jin 已提交
3231
// Creates a column family named `column_family_name` with `cf_options`.
// On success *handle receives a new ColumnFamilyHandleImpl; on any failure
// *handle is left as nullptr. Returns InvalidArgument if a column family with
// that name already exists. Runs entirely under mutex_.
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
                                  const std::string& column_family_name,
                                  ColumnFamilyHandle** handle) {
  *handle = nullptr;
  MutexLock l(&mutex_);

  const bool name_taken =
      versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) !=
      nullptr;
  if (name_taken) {
    return Status::InvalidArgument("Column family already exists");
  }

  // Describe the new column family as a VersionEdit.
  const uint32_t next_id =
      versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
  VersionEdit edit;
  edit.AddColumnFamily(column_family_name);
  edit.SetColumnFamily(next_id);
  edit.SetLogNumber(logfile_number_);
  edit.SetComparatorName(cf_options.comparator->Name());

  // LogAndApply will both write the creation in MANIFEST and create
  // ColumnFamilyData object
  Options opt(db_options_, cf_options);
  Status s = versions_->LogAndApply(
      nullptr, MutableCFOptions(opt, ImmutableCFOptions(opt)), &edit, &mutex_,
      db_directory_.get(), false, &cf_options);
  if (!s.ok()) {
    Log(db_options_.info_log, "Creating column family [%s] FAILED -- %s",
        column_family_name.c_str(), s.ToString().c_str());
    return s;
  }

  single_column_family_mode_ = false;
  auto cfd =
      versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
  assert(cfd != nullptr);
  // The column family is brand new, so the replaced SuperVersion (if any) is
  // freed right away.
  delete InstallSuperVersion(cfd, nullptr, *cfd->GetLatestMutableCFOptions());
  *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
  Log(db_options_.info_log, "Created column family [%s] (ID %u)",
      column_family_name.c_str(), static_cast<unsigned>(cfd->GetID()));
  return s;
}

3270 3271 3272 3273
// Drops `column_family` by logging a drop edit to the MANIFEST from the
// single write thread. The default column family (ID 0) cannot be dropped,
// and dropping an already-dropped family returns InvalidArgument.
// On success the family's memtable budget
// (write_buffer_size * max_write_buffer_number) is subtracted from
// max_total_in_memory_state_.
Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();
  if (cfd->GetID() == 0) {
    return Status::InvalidArgument("Can't drop default column family");
  }

  VersionEdit edit;
  edit.DropColumnFamily();
  edit.SetColumnFamily(cfd->GetID());

  Status s;
  {
    MutexLock l(&mutex_);
    if (cfd->IsDropped()) {
      s = Status::InvalidArgument("Column family already dropped!\n");
    }
    if (s.ok()) {
      // we drop column family from a single write thread
      WriteThread::Writer w(&mutex_);
      s = write_thread_.EnterWriteThread(&w, 0);
      assert(s.ok() && !w.done);  // No timeout and nobody should do our job
      s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                 &edit, &mutex_);
      write_thread_.ExitWriteThread(&w, &w, s);
    }
    if (s.ok()) {
      assert(cfd->IsDropped());
      // max_total_in_memory_state_ is written under mutex_ in
      // InstallSuperVersion and read under mutex_ in Write, so it must be
      // updated while the lock is still held.
      auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
      max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size *
                                    mutable_cf_options->max_write_buffer_number;
    }
  }

  if (s.ok()) {
    Log(db_options_.info_log, "Dropped column family with id %u\n",
        cfd->GetID());
  } else {
    Log(db_options_.info_log,
        "Dropping column family with id %u FAILED -- %s\n",
        cfd->GetID(), s.ToString().c_str());
  }

  return s;
}

L
Lei Jin 已提交
3314
bool DBImpl::KeyMayExist(const ReadOptions& read_options,
3315 3316
                         ColumnFamilyHandle* column_family, const Slice& key,
                         std::string* value, bool* value_found) {
3317
  if (value_found != nullptr) {
K
Kai Liu 已提交
3318 3319
    // falsify later if key-may-exist but can't fetch value
    *value_found = true;
3320
  }
L
Lei Jin 已提交
3321
  ReadOptions roptions = read_options;
3322
  roptions.read_tier = kBlockCacheTier; // read from block cache only
3323
  auto s = GetImpl(roptions, column_family, key, value, value_found);
K
Kai Liu 已提交
3324

3325
  // If block_cache is enabled and the index block of the table didn't
K
Kai Liu 已提交
3326 3327 3328
  // not present in block_cache, the return value will be Status::Incomplete.
  // In this case, key may still exist in the table.
  return s.ok() || s.IsIncomplete();
3329 3330
}

3331
// Returns a new iterator over `column_family`.
// With read_options.tailing, returns a DB iterator over a ForwardIterator
// (nullptr in ROCKSDB_LITE builds, which do not support tailing). Otherwise,
// returns an ArenaWrappedDBIter reading at read_options.snapshot, or at the
// latest sequence number when no snapshot is given.
Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
                              ColumnFamilyHandle* column_family) {
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();

  if (read_options.tailing) {
#ifdef ROCKSDB_LITE
    // not supported in lite version
    return nullptr;
#else
    SuperVersion* tailing_sv = cfd->GetReferencedSuperVersion(&mutex_);
    auto forward_iter =
        new ForwardIterator(this, read_options, cfd, tailing_sv);
    return NewDBIterator(
        env_, *cfd->ioptions(), cfd->user_comparator(), forward_iter,
        kMaxSequenceNumber,
        tailing_sv->mutable_cf_options.max_sequential_skip_in_iterations,
        read_options.iterate_upper_bound);
#endif
  }

  // The latest sequence number is read before the SuperVersion is referenced;
  // keep this order (presumably so the chosen sequence never covers writes
  // the pinned SuperVersion cannot see -- confirm before changing).
  const SequenceNumber latest_snapshot = versions_->LastSequence();
  SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);

  SequenceNumber snapshot = latest_snapshot;
  if (read_options.snapshot != nullptr) {
    snapshot =
        reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
  }

  // Try to generate a DB iterator tree in continuous memory area to be
  // cache friendly. Here is an example of result:
  // +-------------------------------+
  // |                               |
  // | ArenaWrappedDBIter            |
  // |  +                            |
  // |  +---> Inner Iterator   ------------+
  // |  |                            |     |
  // |  |    +-- -- -- -- -- -- -- --+     |
  // |  +--- | Arena                 |     |
  // |       |                       |     |
  // |          Allocated Memory:    |     |
  // |       |   +-------------------+     |
  // |       |   | DBIter            | <---+
  // |           |  +                |
  // |       |   |  +-> iter_  ------------+
  // |       |   |                   |     |
  // |       |   +-------------------+     |
  // |       |   | MergingIterator   | <---+
  // |           |  +                |
  // |       |   |  +->child iter1  ------------+
  // |       |   |  |                |          |
  // |           |  +->child iter2  ----------+ |
  // |       |   |  |                |        | |
  // |       |   |  +->child iter3  --------+ | |
  // |           |                   |      | | |
  // |       |   +-------------------+      | | |
  // |       |   | Iterator1         | <--------+
  // |       |   +-------------------+      | |
  // |       |   | Iterator2         | <------+
  // |       |   +-------------------+      |
  // |       |   | Iterator3         | <----+
  // |       |   +-------------------+
  // |       |                       |
  // +-------+-----------------------+
  //
  // ArenaWrappedDBIter inlines an arena area where all the iterators in the
  // iterator tree are allocated in the order of being accessed when
  // querying.
  // Laying out the iterators in the order of being accessed makes it more
  // likely that any iterator pointer is close to the iterator it points to so
  // that they are likely to be in the same cache line and/or page.
  ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
      env_, *cfd->ioptions(), cfd->user_comparator(), snapshot,
      sv->mutable_cf_options.max_sequential_skip_in_iterations,
      read_options.iterate_upper_bound);

  Iterator* internal_iter =
      NewInternalIterator(read_options, cfd, sv, db_iter->GetArena());
  db_iter->SetIterUnderDBIter(internal_iter);

  return db_iter;
}

3415
// Creates one iterator per entry of `column_families`, in order, and appends
// it to *iterators (which is cleared first). Tailing iterators are not
// supported in ROCKSDB_LITE builds (returns InvalidArgument). Non-tailing
// iterators all read at the same sequence number: read_options.snapshot when
// set, otherwise the latest sequence number at the time of the call.
// read_options.iterate_upper_bound is honored the same way as NewIterator
// honors it.
Status DBImpl::NewIterators(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  iterators->clear();
  iterators->reserve(column_families.size());

  if (read_options.tailing) {
#ifdef ROCKSDB_LITE
    return Status::InvalidArgument(
        "Tailing interator not supported in RocksDB lite");
#else
    for (auto cfh : column_families) {
      auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
      SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
      auto iter = new ForwardIterator(this, read_options, cfd, sv);
      // Pass the upper bound through, matching NewIterator().
      iterators->push_back(NewDBIterator(
          env_, *cfd->ioptions(), cfd->user_comparator(), iter,
          kMaxSequenceNumber,
          sv->mutable_cf_options.max_sequential_skip_in_iterations,
          read_options.iterate_upper_bound));
    }
#endif
  } else {
    SequenceNumber latest_snapshot = versions_->LastSequence();
    // Loop-invariant: every iterator reads at the same sequence number.
    const SequenceNumber snapshot =
        read_options.snapshot != nullptr
            ? reinterpret_cast<const SnapshotImpl*>(
                  read_options.snapshot)->number_
            : latest_snapshot;

    for (size_t i = 0; i < column_families.size(); ++i) {
      auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
          column_families[i])->cfd();
      SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);

      // Pass the upper bound through, matching NewIterator().
      ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
          env_, *cfd->ioptions(), cfd->user_comparator(), snapshot,
          sv->mutable_cf_options.max_sequential_skip_in_iterations,
          read_options.iterate_upper_bound);
      Iterator* internal_iter = NewInternalIterator(
          read_options, cfd, sv, db_iter->GetArena());
      db_iter->SetIterUnderDBIter(internal_iter);
      iterators->push_back(db_iter);
    }
  }

  return Status::OK();
}

3464 3465 3466 3467 3468 3469 3470 3471 3472
// Snapshots are supported only if the memtable of every column family
// supports them.
bool DBImpl::IsSnapshotSupported() const {
  bool all_supported = true;
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    if (!cfd->mem()->IsSnapshotSupported()) {
      all_supported = false;
      break;
    }
  }
  return all_supported;
}

J
jorlow@chromium.org 已提交
3473
// Takes a snapshot at the latest sequence number, under mutex_.
// Returns nullptr if the underlying memtable does not support snapshots.
const Snapshot* DBImpl::GetSnapshot() {
  MutexLock lock(&mutex_);
  if (IsSnapshotSupported()) {
    return snapshots_.New(versions_->LastSequence());
  }
  return nullptr;
}

void DBImpl::ReleaseSnapshot(const Snapshot* s) {
  MutexLock l(&mutex_);
3482
  snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s));
J
jorlow@chromium.org 已提交
3483 3484 3485
}

// Convenience methods
3486 3487
// Writes `key` -> `val` into `column_family` via the generic DB::Put helper.
Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
                   const Slice& key, const Slice& val) {
  const Status put_status = DB::Put(o, column_family, key, val);
  return put_status;
}

3491 3492 3493
// Merges `val` into `key` of `column_family` via the generic DB::Merge
// helper. Returns NotSupported when the column family was opened without a
// merge_operator.
Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
                     const Slice& key, const Slice& val) {
  auto* const handle_impl =
      reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  if (handle_impl->cfd()->ioptions()->merge_operator) {
    return DB::Merge(o, column_family, key, val);
  }
  return Status::NotSupported("Provide a merge_operator when opening DB");
}

L
Lei Jin 已提交
3501
// Deletes `key` from `column_family` via the generic DB::Delete helper.
Status DBImpl::Delete(const WriteOptions& write_options,
                      ColumnFamilyHandle* column_family, const Slice& key) {
  const Status delete_status = DB::Delete(write_options, column_family, key);
  return delete_status;
}

L
Lei Jin 已提交
3506
// Applies `my_batch` atomically.
//
// Writers queue in write_thread_. The writer that reaches the front may
// group later writers' batches with its own (BuildBatchGroup) and perform
// the write on their behalf, in which case those writers return early with
// the group's status (w.done).
//
// Before writing, the group leader may:
//   * switch memtables/logs when the total WAL size exceeds
//     max_total_wal_size (defaulting to 4 * max_total_in_memory_state_),
//   * surface a background error,
//   * run scheduled flushes,
//   * stall or delay per write_controller_.
// It then appends the group to the WAL (unless disableWAL), optionally
// syncs, inserts into memtables, and publishes the new last sequence number.
//
// Returns TimedOut if write_options.timeout_hint_us elapses first.
// Returns Corruption for a null batch.
Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
  if (my_batch == nullptr) {
    return Status::Corruption("Batch is nullptr!");
  }
  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  WriteThread::Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = write_options.sync;
  w.disableWAL = write_options.disableWAL;
  w.in_batch_group = false;
  w.done = false;
  w.timeout_hint_us = write_options.timeout_hint_us;

  uint64_t expiration_time = 0;
  bool has_timeout = false;
  if (w.timeout_hint_us == 0) {
    w.timeout_hint_us = WriteThread::kNoTimeOut;
  } else {
    expiration_time = env_->NowMicros() + w.timeout_hint_us;
    has_timeout = true;
  }

  if (!write_options.disableWAL) {
    RecordTick(stats_, WRITE_WITH_WAL);
    default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, 1);
  }

  WriteContext context;
  mutex_.Lock();
  Status status = write_thread_.EnterWriteThread(&w, expiration_time);
  assert(status.ok() || status.IsTimedOut());
  if (status.IsTimedOut()) {
    mutex_.Unlock();
    RecordTick(stats_, WRITE_TIMEDOUT);
    return Status::TimedOut();
  }
  if (w.done) {  // write was done by someone else
    default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER,
                                           1);
    mutex_.Unlock();
    RecordTick(stats_, WRITE_DONE_BY_OTHER);
    return w.status;
  }

  RecordTick(stats_, WRITE_DONE_BY_SELF);
  default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);

  // Once reaches this point, the current writer "w" will try to do its write
  // job.  It may also pick up some of the remaining writers in the "writers_"
  // when it finds suitable, and finish them in the same write batch.
  // This is how a write job could be done by the other writer.
  assert(!single_column_family_mode_ ||
         versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);

  uint64_t max_total_wal_size = (db_options_.max_total_wal_size == 0)
                                    ? 4 * max_total_in_memory_state_
                                    : db_options_.max_total_wal_size;
  if (UNLIKELY(!single_column_family_mode_) &&
      alive_log_files_.begin()->getting_flushed == false &&
      total_log_size_ > max_total_wal_size) {
    uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number;
    alive_log_files_.begin()->getting_flushed = true;
    Log(db_options_.info_log,
        "Flushing all column families with data in WAL number %" PRIu64
        ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
        flush_column_family_if_log_file, total_log_size_, max_total_wal_size);
    // no need to refcount because drop is happening in write thread, so can't
    // happen while we're in the write thread
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->GetLogNumber() <= flush_column_family_if_log_file) {
        status = SetNewMemtableAndNewLogFile(cfd, &context);
        if (!status.ok()) {
          break;
        }
        cfd->imm()->FlushRequested();
      }
    }
    MaybeScheduleFlushOrCompaction();
  }

  if (UNLIKELY(status.ok() && !bg_error_.ok())) {
    status = bg_error_;
  }

  if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
    status = ScheduleFlushes(&context);
  }

  // The hint covers the whole condition: a stalled/delayed write is the
  // unlikely case, not a successful status.
  if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
                               write_controller_.GetDelay() > 0))) {
    status = DelayWrite(expiration_time);
  }

  if (UNLIKELY(status.ok() && has_timeout &&
               env_->NowMicros() > expiration_time)) {
    status = Status::TimedOut();
  }

  uint64_t last_sequence = versions_->LastSequence();
  WriteThread::Writer* last_writer = &w;
  if (status.ok()) {
    autovector<WriteBatch*> write_batch_group;
    write_thread_.BuildBatchGroup(&last_writer, &write_batch_group);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into memtables
    {
      mutex_.Unlock();
      WriteBatch* updates = nullptr;
      if (write_batch_group.size() == 1) {
        updates = write_batch_group[0];
      } else {
        updates = &tmp_batch_;
        for (size_t i = 0; i < write_batch_group.size(); ++i) {
          WriteBatchInternal::Append(updates, write_batch_group[i]);
        }
      }

      const SequenceNumber current_sequence = last_sequence + 1;
      WriteBatchInternal::SetSequence(updates, current_sequence);
      int my_batch_count = WriteBatchInternal::Count(updates);
      last_sequence += my_batch_count;
      const uint64_t batch_size = WriteBatchInternal::ByteSize(updates);
      // Record statistics
      RecordTick(stats_, NUMBER_KEYS_WRITTEN, my_batch_count);
      RecordTick(stats_, BYTES_WRITTEN, batch_size);
      if (write_options.disableWAL) {
        flush_on_destroy_ = true;
      }
      PERF_TIMER_STOP(write_pre_and_post_process_time);

      uint64_t log_size = 0;
      // Set only when the WAL is actually synced below, so the internal
      // stat agrees with the WAL_FILE_SYNCED ticker.
      bool wal_synced = false;
      if (!write_options.disableWAL) {
        PERF_TIMER_GUARD(write_wal_time);
        Slice log_entry = WriteBatchInternal::Contents(updates);
        status = log_->AddRecord(log_entry);
        total_log_size_ += log_entry.size();
        alive_log_files_.back().AddSize(log_entry.size());
        log_empty_ = false;
        log_size = log_entry.size();
        RecordTick(stats_, WAL_FILE_BYTES, log_size);
        if (status.ok() && write_options.sync) {
          RecordTick(stats_, WAL_FILE_SYNCED);
          wal_synced = true;
          StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
          if (db_options_.use_fsync) {
            status = log_->file()->Fsync();
          } else {
            status = log_->file()->Sync();
          }
        }
      }
      if (status.ok()) {
        PERF_TIMER_GUARD(write_memtable_time);

        status = WriteBatchInternal::InsertInto(
            updates, column_family_memtables_.get(),
            write_options.ignore_missing_column_families, 0, this, false);
        // A non-OK status here indicates iteration failure (either in-memory
        // writebatch corruption (very bad), or the client specified invalid
        // column family).  This will later on trigger bg_error_.
        //
        // Note that existing logic was not sound. Any partial failure writing
        // into the memtable would result in a state that some write ops might
        // have succeeded in memtable but Status reports error for all writes.

        SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence);
      }
      PERF_TIMER_START(write_pre_and_post_process_time);
      if (updates == &tmp_batch_) {
        tmp_batch_.Clear();
      }
      mutex_.Lock();
      // internal stats
      default_cf_internal_stats_->AddDBStats(
          InternalStats::BYTES_WRITTEN, batch_size);
      default_cf_internal_stats_->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN,
                                             my_batch_count);
      if (!write_options.disableWAL) {
        if (wal_synced) {
          default_cf_internal_stats_->AddDBStats(
              InternalStats::WAL_FILE_SYNCED, 1);
        }
        default_cf_internal_stats_->AddDBStats(
            InternalStats::WAL_FILE_BYTES, log_size);
      }
      if (status.ok()) {
        versions_->SetLastSequence(last_sequence);
      }
    }
  }
  if (db_options_.paranoid_checks && !status.ok() &&
      !status.IsTimedOut() && bg_error_.ok()) {
    bg_error_ = status; // stop compaction & fail any further writes
  }

  write_thread_.ExitWriteThread(&w, last_writer, status);
  mutex_.Unlock();

  if (status.IsTimedOut()) {
    RecordTick(stats_, WRITE_TIMEDOUT);
  }

  return status;
}

3711 3712
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
I
Igor Canadi 已提交
3713
// Applies write backpressure requested by write_controller_.
// A non-zero |expiration_time| bounds how long a stopped writer waits; once
// NowMicros() passes it, TimedOut is returned. Otherwise returns bg_error_
// (OK unless a background error was recorded while waiting).
Status DBImpl::DelayWrite(uint64_t expiration_time) {
  StopWatch sw(env_, stats_, WRITE_STALL);
  const bool bounded_wait = (expiration_time > 0);

  // Writes are delayed but not stopped: sleep for the requested delay with
  // the DB mutex released, then re-acquire it.
  auto delay = write_controller_.GetDelay();
  if (!write_controller_.IsStopped() && delay > 0) {
    mutex_.Unlock();
    env_->SleepForMicroseconds(delay);
    mutex_.Lock();
  }

  // Writes are stopped: wait on bg_cv_ until they resume or a background
  // error appears, giving up early if a deadline was supplied and passed.
  while (bg_error_.ok() && write_controller_.IsStopped()) {
    if (!bounded_wait) {
      bg_cv_.Wait();
      continue;
    }
    bg_cv_.TimedWait(expiration_time);
    if (env_->NowMicros() > expiration_time) {
      return Status::TimedOut();
    }
  }

  return bg_error_;
}

I
Igor Canadi 已提交
3737 3738 3739 3740 3741 3742 3743 3744 3745 3746 3747
// Drains flush_scheduler_: each queued column family gets a fresh memtable
// via SetNewMemtableAndNewLogFile(). Returns the first failure immediately.
// If at least one column family was dequeued and all switches succeeded,
// MaybeScheduleFlushOrCompaction() is invoked before returning OK.
Status DBImpl::ScheduleFlushes(WriteContext* context) {
  bool any_dequeued = false;
  for (ColumnFamilyData* cfd = flush_scheduler_.GetNextColumnFamily();
       cfd != nullptr; cfd = flush_scheduler_.GetNextColumnFamily()) {
    any_dequeued = true;
    auto switch_status = SetNewMemtableAndNewLogFile(cfd, context);
    // Drop the reference that came with the dequeued column family
    // (presumably taken when it was scheduled -- see FlushScheduler).
    if (cfd->Unref()) {
      delete cfd;
    }
    if (!switch_status.ok()) {
      return switch_status;
    }
  }
  if (any_dequeued) {
    MaybeScheduleFlushOrCompaction();
  }
  return Status::OK();
}

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
// Moves cfd's active memtable onto its immutable list and installs a fresh
// memtable plus a new SuperVersion. If the current WAL has already received
// writes (!log_empty_) a new WAL file is created first; an empty WAL is
// reused. File/object allocation happens with the DB mutex released.
// On failure to create the WAL, the reserved file number is returned to
// versions_ and the error is propagated without touching cfd.
Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
                                           WriteContext* context) {
  mutex_.AssertHeld();
  unique_ptr<WritableFile> lfile;
  log::Writer* new_log = nullptr;
  MemTable* new_mem = nullptr;

  // Attempt to switch to a new memtable and trigger flush of old.
  // Do this without holding the dbmutex lock.
  assert(versions_->PrevLogNumber() == 0);
  bool creating_new_log = !log_empty_;
  uint64_t new_log_number =
      creating_new_log ? versions_->NewFileNumber() : logfile_number_;
  SuperVersion* new_superversion = nullptr;
  // Copy the options while the mutex is held; they are used after unlocking.
  const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
  mutex_.Unlock();
  Status s;
  {
    if (creating_new_log) {
      s = env_->NewWritableFile(
          LogFileName(db_options_.wal_dir, new_log_number),
          &lfile, env_->OptimizeForLogWrite(env_options_));
      if (s.ok()) {
        // Our final size should be less than write_buffer_size
        // (compression, etc) but err on the side of caution.
        lfile->SetPreallocationBlockSize(
            1.1 * mutable_cf_options.write_buffer_size);
        new_log = new log::Writer(std::move(lfile));
      }
    }

    if (s.ok()) {
      new_mem = new MemTable(cfd->internal_comparator(), *cfd->ioptions(),
                             mutable_cf_options);
      new_superversion = new SuperVersion();
    }
  }
  // Only report success when the new memtable actually exists; previously
  // the success message was logged even when WAL creation failed.
  if (s.ok()) {
    Log(db_options_.info_log,
        "[%s] New memtable created with log file: #%" PRIu64 "\n",
        cfd->GetName().c_str(), new_log_number);
  } else {
    Log(db_options_.info_log,
        "[%s] Failed to create log file #%" PRIu64 " for new memtable: %s\n",
        cfd->GetName().c_str(), new_log_number, s.ToString().c_str());
  }
  mutex_.Lock();
  if (!s.ok()) {
    // how do we fail if we're not creating new log?
    assert(creating_new_log);
    // Avoid chewing through file number space in a tight loop.
    versions_->ReuseLogFileNumber(new_log_number);
    assert(!new_mem);
    assert(!new_log);
    return s;
  }
  if (creating_new_log) {
    logfile_number_ = new_log_number;
    assert(new_log != nullptr);
    // The old writer is handed to |context| instead of being deleted here
    // (presumably so the caller can free it outside the mutex).
    context->logs_to_free_.push_back(log_.release());
    log_.reset(new_log);
    log_empty_ = true;
    alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
    for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
      // all this is just optimization to delete logs that
      // are no longer needed -- if CF is empty, that means it
      // doesn't need that particular log to stay alive, so we just
      // advance the log number. no need to persist this in the manifest
      if (loop_cfd->mem()->GetFirstSequenceNumber() == 0 &&
          loop_cfd->imm()->size() == 0) {
        loop_cfd->SetLogNumber(logfile_number_);
      }
    }
  }
  cfd->mem()->SetNextLogNumber(logfile_number_);
  cfd->imm()->Add(cfd->mem());
  new_mem->Ref();
  cfd->SetMemtable(new_mem);
  context->superversions_to_free_.push_back(
      InstallSuperVersion(cfd, new_superversion, mutable_cf_options));
  return s;
}

I
Igor Canadi 已提交
3835
#ifndef ROCKSDB_LITE
I
Igor Canadi 已提交
3836 3837 3838 3839 3840
// Collects table properties for every table file in the column family's
// current Version. The Version is referenced under the DB mutex so it stays
// alive while the (unlocked) property read happens.
Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
                                        TablePropertiesCollection* props) {
  auto handle = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cf_data = handle->cfd();

  Version* pinned;
  {
    MutexLock lock(&mutex_);
    pinned = cf_data->current();
    pinned->Ref();
  }

  auto result = pinned->GetPropertiesOfAllTables(props);

  {
    MutexLock lock(&mutex_);
    pinned->Unref();
  }
  return result;
}
I
Igor Canadi 已提交
3856
#endif  // ROCKSDB_LITE
3857

I
Igor Canadi 已提交
3858 3859 3860 3861
const std::string& DBImpl::GetName() const {
  return dbname_;
}

3862 3863 3864 3865
// Returns the Env used by this DB instance (env_).
Env* DBImpl::GetEnv() const { return this->env_; }

3866 3867
// Returns the Options of the column family behind |column_family|.
const Options& DBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
  auto handle = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cf_data = handle->cfd();
  return *(cf_data->options());
}

3871
bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
3872
                         const Slice& property, std::string* value) {
3873 3874
  bool is_int_property = false;
  bool need_out_of_mutex = false;
3875 3876 3877
  DBPropertyType property_type =
      GetPropertyType(property, &is_int_property, &need_out_of_mutex);

3878
  value->clear();
3879 3880 3881 3882 3883 3884 3885 3886 3887 3888 3889 3890 3891 3892 3893
  if (is_int_property) {
    uint64_t int_value;
    bool ret_value = GetIntPropertyInternal(column_family, property_type,
                                            need_out_of_mutex, &int_value);
    if (ret_value) {
      *value = std::to_string(int_value);
    }
    return ret_value;
  } else {
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
    auto cfd = cfh->cfd();
    MutexLock l(&mutex_);
    return cfd->internal_stats()->GetStringProperty(property_type, property,
                                                    value);
  }
J
jorlow@chromium.org 已提交
3894 3895
}

3896 3897
// Integer-only property lookup: returns false for properties that are not
// integer-valued, otherwise defers to GetIntPropertyInternal().
bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
                            const Slice& property, uint64_t* value) {
  bool is_int_property = false;
  bool need_out_of_mutex = false;
  DBPropertyType property_type =
      GetPropertyType(property, &is_int_property, &need_out_of_mutex);
  return is_int_property &&
         GetIntPropertyInternal(column_family, property_type,
                                need_out_of_mutex, value);
}

// Resolves an integer property. Properties flagged need_out_of_mutex are read
// against a referenced SuperVersion without taking the DB mutex; all others
// are read from InternalStats while holding it.
bool DBImpl::GetIntPropertyInternal(ColumnFamilyHandle* column_family,
                                    DBPropertyType property_type,
                                    bool need_out_of_mutex, uint64_t* value) {
  auto handle = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cf_data = handle->cfd();

  if (need_out_of_mutex) {
    SuperVersion* super_version = GetAndRefSuperVersion(cf_data);
    const bool found = cf_data->internal_stats()->GetIntPropertyOutOfMutex(
        property_type, super_version->current, value);
    ReturnAndCleanupSuperVersion(cf_data, super_version);
    return found;
  }

  MutexLock lock(&mutex_);
  return cf_data->internal_stats()->GetIntProperty(property_type, value, this);
}

// Obtains a SuperVersion for |cfd| via
// ColumnFamilyData::GetThreadLocalSuperVersion(). Callers in this file pair
// it with ReturnAndCleanupSuperVersion().
SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
  // TODO(ljin): consider using GetReferencedSuperVersion() directly
  SuperVersion* acquired = cfd->GetThreadLocalSuperVersion(&mutex_);
  return acquired;
}

// Gives |sv| back to the column family's thread-local slot. If the slot does
// not accept it, the reference is dropped here; when it was the last one the
// SuperVersion is cleaned up (under the DB mutex) and deleted.
void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
                                          SuperVersion* sv) {
  if (cfd->ReturnThreadLocalSuperVersion(sv)) {
    return;
  }

  if (sv->Unref()) {
    {
      MutexLock lock(&mutex_);
      sv->Cleanup();
    }
    // Deletion happens after the mutex is released.
    delete sv;
    RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
  }
  RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
}

3953
void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family,
3954
                                 const Range* range, int n, uint64_t* sizes) {
J
jorlow@chromium.org 已提交
3955 3956
  // TODO(opt): better implementation
  Version* v;
3957 3958
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();
J
jorlow@chromium.org 已提交
3959 3960
  {
    MutexLock l(&mutex_);
3961
    v = cfd->current();
3962
    v->Ref();
J
jorlow@chromium.org 已提交
3963 3964 3965 3966 3967 3968 3969 3970 3971 3972 3973 3974 3975 3976 3977 3978 3979
  }

  for (int i = 0; i < n; i++) {
    // Convert user_key into a corresponding internal key.
    InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
    uint64_t start = versions_->ApproximateOffsetOf(v, k1);
    uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
    sizes[i] = (limit >= start ? limit - start : 0);
  }

  {
    MutexLock l(&mutex_);
    v->Unref();
  }
}

I
Igor Canadi 已提交
3980 3981 3982 3983 3984
#ifndef ROCKSDB_LITE
// Returns an iterator over WAL updates starting at |seq|. Fails with
// NotFound when |seq| is beyond the last sequence number written.
Status DBImpl::GetUpdatesSince(
    SequenceNumber seq, unique_ptr<TransactionLogIterator>* iter,
    const TransactionLogIterator::ReadOptions& read_options) {
  RecordTick(stats_, GET_UPDATES_SINCE_CALLS);
  const bool not_yet_written = seq > versions_->LastSequence();
  if (not_yet_written) {
    return Status::NotFound("Requested sequence not yet written in the db");
  }
  // WAL iteration itself is delegated to the WAL manager.
  return wal_manager_.GetUpdatesSince(seq, iter, read_options, versions_.get());
}

3992 3993 3994
// Deletes one externally-named DB file.
//  - WAL files: only archived logs may be deleted; they are removed directly
//    from wal_dir. Other logs yield NotSupported.
//  - SST files: removed from the column family's Version via LogAndApply.
//    A file being compacted is skipped (returns OK); the file must not have
//    any files in deeper levels; a level-0 file must be the oldest one.
//  - Any other name yields InvalidArgument.
// Every rejection is logged to info_log. For SST files that reach the edit,
// obsolete files are purged outside the DB mutex and a flush/compaction may
// be scheduled.
Status DBImpl::DeleteFile(std::string name) {
  uint64_t number;
  FileType type;
  WalFileType log_type;
  if (!ParseFileName(name, &number, &type, &log_type) ||
      (type != kTableFile && type != kLogFile)) {
    Log(db_options_.info_log, "DeleteFile %s failed.\n", name.c_str());
    return Status::InvalidArgument("Invalid file name");
  }

  Status status;
  if (type == kLogFile) {
    // Only allow deleting archived log files
    if (log_type != kArchivedLogFile) {
      Log(db_options_.info_log, "DeleteFile %s failed - not archived log.\n",
          name.c_str());
      return Status::NotSupported("Delete only supported for archived logs");
    }
    status = env_->DeleteFile(db_options_.wal_dir + "/" + name);
    if (!status.ok()) {
      Log(db_options_.info_log, "DeleteFile %s failed -- %s.\n",
          name.c_str(), status.ToString().c_str());
    }
    return status;
  }

  int level;
  FileMetaData* metadata;
  ColumnFamilyData* cfd;
  VersionEdit edit;
  JobContext job_context(true);
  {
    MutexLock l(&mutex_);
    status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd);
    if (!status.ok()) {
      Log(db_options_.info_log, "DeleteFile %s failed. File not found\n",
          name.c_str());
      return Status::InvalidArgument("File not found");
    }
    assert(level < cfd->NumberLevels());

    // If the file is being compacted no need to delete.
    if (metadata->being_compacted) {
      Log(db_options_.info_log,
          "DeleteFile %s Skipped. File about to be compacted\n", name.c_str());
      return Status::OK();
    }

    // Only the files in the last level can be deleted externally.
    // This is to make sure that any deletion tombstones are not
    // lost. Check that the level passed is the last level.
    auto* vstorage = cfd->current()->storage_info();
    for (int i = level + 1; i < cfd->NumberLevels(); i++) {
      if (vstorage->NumLevelFiles(i) != 0) {
        Log(db_options_.info_log,
            "DeleteFile %s FAILED. File not in last level\n", name.c_str());
        return Status::InvalidArgument("File not in last level");
      }
    }
    // if level == 0, it has to be the oldest file
    if (level == 0 &&
        vstorage->LevelFiles(0).back()->fd.GetNumber() != number) {
      // Logged like every other rejection above.
      Log(db_options_.info_log,
          "DeleteFile %s FAILED. File in level 0, but not oldest\n",
          name.c_str());
      return Status::InvalidArgument("File in level 0, but not oldest");
    }
    edit.SetColumnFamily(cfd->GetID());
    edit.DeleteFile(level, number);
    status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                    &edit, &mutex_, db_directory_.get());
    if (status.ok()) {
      InstallSuperVersionBackground(cfd, &job_context,
                                    *cfd->GetLatestMutableCFOptions());
    }
    FindObsoleteFiles(&job_context, false);
  }  // lock released here
  LogFlush(db_options_.info_log);
  // remove files outside the db-lock
  if (job_context.HaveSomethingToDelete()) {
    PurgeObsoleteFiles(job_context);
  }
  {
    MutexLock l(&mutex_);
    // schedule flush if file deletion means we freed the space for flushes to
    // continue
    MaybeScheduleFlushOrCompaction();
  }
  return status;
}

4080
void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
4081
  MutexLock l(&mutex_);
4082
  versions_->GetLiveFilesMetaData(metadata);
4083
}
I
Igor Canadi 已提交
4084
#endif  // ROCKSDB_LITE
4085

I
Igor Canadi 已提交
4086 4087 4088 4089 4090 4091 4092
// Verifies that every live SST file listed by the VersionSet can be stat'ed
// and has the size recorded in the manifest. All problems are collected and
// reported together as a single Corruption status. Requires mutex_ held.
Status DBImpl::CheckConsistency() {
  mutex_.AssertHeld();
  std::vector<LiveFileMetaData> live_files;
  versions_->GetLiveFilesMetaData(&live_files);

  std::string problems;
  for (const auto& file : live_files) {
    std::string path = file.db_path + "/" + file.name;

    uint64_t actual_size = 0;
    Status stat_status = env_->GetFileSize(path, &actual_size);
    if (!stat_status.ok()) {
      problems += "Can't access " + file.name + ": " + stat_status.ToString() +
                  "\n";
      continue;
    }
    if (actual_size != file.size) {
      problems += "Sst file size mismatch: " + path +
                  ". Size recorded in manifest " + std::to_string(file.size) +
                  ", actual size " + std::to_string(actual_size) + "\n";
    }
  }

  if (!problems.empty()) {
    return Status::Corruption(problems);
  }
  return Status::OK();
}

4114 4115 4116 4117 4118 4119 4120 4121 4122 4123 4124 4125 4126 4127 4128 4129 4130 4131 4132 4133 4134 4135 4136 4137 4138 4139 4140
// Reads the DB identity file (IdentityFileName(dbname_)) into |identity|,
// stripping a single trailing '\n' if present. Returns the first error from
// opening, sizing, or reading the file; |identity| is untouched on error.
Status DBImpl::GetDbIdentity(std::string& identity) {
  std::string idfilename = IdentityFileName(dbname_);
  unique_ptr<SequentialFile> idfile;
  const EnvOptions soptions;
  Status s = env_->NewSequentialFile(idfilename, &idfile, soptions);
  if (!s.ok()) {
    return s;
  }
  uint64_t file_size;
  s = env_->GetFileSize(idfilename, &file_size);
  if (!s.ok()) {
    return s;
  }
  // Heap-allocated scratch buffer: the previous `char buffer[file_size]` was
  // a variable-length array (not standard C++) whose size came from the file
  // on disk, so a large or corrupted identity file could overflow the stack.
  std::string buffer(static_cast<size_t>(file_size), '\0');
  Slice id;
  s = idfile->Read(static_cast<size_t>(file_size), &id, &buffer[0]);
  if (!s.ok()) {
    return s;
  }
  // |id| may point into |buffer|; copy it out before the buffer goes away.
  identity.assign(id.data(), id.size());
  // If last character is '\n' remove it from identity
  if (!identity.empty() && identity.back() == '\n') {
    identity.pop_back();
  }
  return s;
}

J
jorlow@chromium.org 已提交
4141 4142
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
4143
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
               const Slice& key, const Slice& value) {
  // Conservative up-front reservation for a one-record batch: 8 bytes of
  // header, 4 bytes for the count, 1 byte for the record type, plus 11 spare
  // bytes for the encoded key and value lengths (24 bytes in total).
  const size_t kReservedOverhead = 8 + 4 + 1 + 11;
  WriteBatch single_put(key.size() + value.size() + kReservedOverhead);
  single_put.Put(column_family, key, value);
  return Write(opt, &single_put);
}

4153 4154
// Convenience wrapper: applies a single deletion of |key| through a
// one-entry WriteBatch.
Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                  const Slice& key) {
  WriteBatch single_delete;
  single_delete.Delete(column_family, key);
  return Write(opt, &single_delete);
}

4160 4161
// Convenience wrapper: applies a single Merge of |value| into |key| through
// a one-entry WriteBatch.
Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                 const Slice& key, const Slice& value) {
  WriteBatch single_merge;
  single_merge.Merge(column_family, key, value);
  return Write(opt, &single_merge);
}

4167
// Default implementation -- returns not supported status
L
Lei Jin 已提交
4168
Status DB::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
                              const std::string& column_family_name,
                              ColumnFamilyHandle** handle) {
  // Base-class fallback: column family creation is unsupported here.
  return Status::NotSupported("");
}
4173
Status DB::DropColumnFamily(ColumnFamilyHandle* column_family) {
  // Base-class fallback: column family removal is unsupported here.
  return Status::NotSupported("");
}

J
jorlow@chromium.org 已提交
4177 4178
DB::~DB() {
  // Nothing to release at the base-class level.
}

J
Jim Paton 已提交
4179
// Single-column-family convenience overload: opens |dbname| with only the
// default column family (configured from |options|) and discards its handle.
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  DBOptions db_options(options);
  ColumnFamilyOptions cf_options(options);
  std::vector<ColumnFamilyDescriptor> column_families;
  column_families.emplace_back(kDefaultColumnFamilyName, cf_options);

  std::vector<ColumnFamilyHandle*> handles;
  Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
  if (!s.ok()) {
    return s;
  }
  assert(handles.size() == 1);
  // Safe to delete: DBImpl always holds its own reference to the default
  // column family.
  delete handles[0];
  return s;
}

4196 4197
// Opens (or, per the options, creates) the DB at |dbname| with the given
// column families.
// On success, *dbptr owns the new DB and |handles| holds one handle per entry
// of |column_families|, in the same order. Missing column families are
// created only if db_options.create_missing_column_families is set.
// If the failure happens after option validation, the partially-built
// DBImpl and any handles created for it are released, *dbptr is nullptr and
// |handles| is empty.
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
                const std::vector<ColumnFamilyDescriptor>& column_families,
                std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
  Status s = SanitizeOptionsByTable(db_options, column_families);
  if (!s.ok()) {
    return s;
  }
  if (db_options.db_paths.size() > 1) {
    for (const auto& cf : column_families) {
      if (cf.options.compaction_style != kCompactionStyleUniversal) {
        return Status::NotSupported(
            "More than one DB paths are only supported in "
            "universal compaction style. ");
      }
    }

    if (db_options.db_paths.size() > 4) {
      return Status::NotSupported(
        "More than four DB paths are not supported yet. ");
    }
  }

  *dbptr = nullptr;
  handles->clear();

  // The initial WAL is preallocated based on the largest write buffer.
  size_t max_write_buffer_size = 0;
  for (const auto& cf : column_families) {
    max_write_buffer_size =
        std::max(max_write_buffer_size, cf.options.write_buffer_size);
  }

  DBImpl* impl = new DBImpl(db_options, dbname);
  s = impl->env_->CreateDirIfMissing(impl->db_options_.wal_dir);
  if (s.ok()) {
    for (const auto& db_path : impl->db_options_.db_paths) {
      s = impl->env_->CreateDirIfMissing(db_path.path);
      if (!s.ok()) {
        break;
      }
    }
  }

  if (!s.ok()) {
    delete impl;
    return s;
  }

  s = impl->CreateArchivalDirectory();
  if (!s.ok()) {
    delete impl;
    return s;
  }

  impl->mutex_.Lock();
  // Handles create_if_missing, error_if_exists
  s = impl->Recover(column_families);
  if (s.ok()) {
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    unique_ptr<WritableFile> lfile;
    EnvOptions soptions(db_options);
    s = impl->db_options_.env->NewWritableFile(
        LogFileName(impl->db_options_.wal_dir, new_log_number), &lfile,
        impl->db_options_.env->OptimizeForLogWrite(soptions));
    if (s.ok()) {
      lfile->SetPreallocationBlockSize(1.1 * max_write_buffer_size);
      impl->logfile_number_ = new_log_number;
      impl->log_.reset(new log::Writer(std::move(lfile)));

      // set column family handles
      for (const auto& cf : column_families) {
        auto cfd =
            impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
        if (cfd != nullptr) {
          handles->push_back(
              new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
        } else {
          if (db_options.create_missing_column_families) {
            // missing column family, create it
            ColumnFamilyHandle* handle;
            impl->mutex_.Unlock();
            s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
            impl->mutex_.Lock();
            if (s.ok()) {
              handles->push_back(handle);
            } else {
              break;
            }
          } else {
            s = Status::InvalidArgument("Column family not found: ", cf.name);
            break;
          }
        }
      }
    }
    if (s.ok()) {
      for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
        delete impl->InstallSuperVersion(
            cfd, nullptr, *cfd->GetLatestMutableCFOptions());
      }
      impl->alive_log_files_.push_back(
          DBImpl::LogFileNumberSize(impl->logfile_number_));
      impl->DeleteObsoleteFiles();
      impl->MaybeScheduleFlushOrCompaction();
      s = impl->db_directory_->Fsync();
    }
  }

  if (s.ok()) {
    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
      // Universal/FIFO compaction cannot open a DB with files above level 0.
      if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
          cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
        auto* vstorage = cfd->current()->storage_info();
        for (int i = 1; i < vstorage->NumberLevels(); ++i) {
          int num_files = vstorage->NumLevelFiles(i);
          if (num_files > 0) {
            s = Status::InvalidArgument(
                "Not all files are at level 0. Cannot "
                "open with universal or FIFO compaction style.");
            break;
          }
        }
      }
      if (cfd->ioptions()->merge_operator != nullptr &&
          !cfd->mem()->IsMergeOperatorSupported()) {
        // Status::InvalidArgument does no printf-style formatting, so the
        // column family name is spliced into the message explicitly (the old
        // code emitted a literal "%s").
        s = Status::InvalidArgument(
            "The memtable of column family " + cfd->GetName() +
            " does not support merge operator since its "
            "options.merge_operator is non-null");
      }
      if (!s.ok()) {
        break;
      }
    }
  }

  impl->mutex_.Unlock();

  if (s.ok()) {
    impl->opened_successfully_ = true;
    *dbptr = impl;
  } else {
    for (auto h : *handles) {
      delete h;
    }
    handles->clear();
    delete impl;
  }
  return s;
}

4344 4345 4346
// Lists the column family names of the DB at |name| by delegating to
// VersionSet::ListColumnFamilies with the Env from |db_options|.
Status DB::ListColumnFamilies(const DBOptions& db_options,
                              const std::string& name,
                              std::vector<std::string>* column_families) {
  Env* env = db_options.env;
  return VersionSet::ListColumnFamilies(column_families, name, env);
}

4350 4351 4352
Snapshot::~Snapshot() {
  // Nothing to release at the base-class level.
}

J
jorlow@chromium.org 已提交
4353
// Removes every file belonging to the DB at |dbname|. That covers regular DB
// files, WAL files (from wal_dir), nested meta databases, SST files in each
// configured db_path, and archived WAL files. It then removes the lock file
// and the directories.
// Returns OK if nothing was found to delete, the LockFile error if the DB
// lock could not be taken, or otherwise the first deletion error seen.
// Failures from the final unlock/cleanup steps are ignored.
Status DestroyDB(const std::string& dbname, const Options& options) {
  const InternalKeyComparator comparator(options.comparator);
  const Options& soptions(SanitizeOptions(dbname, &comparator, options));
  Env* env = soptions.env;

  std::string archivedir = ArchivalDirectory(dbname);
  std::vector<std::string> filenames;
  // Ignore error in case directory does not exist
  env->GetChildren(dbname, &filenames);
  if (dbname != soptions.wal_dir) {
    std::vector<std::string> wal_children;
    env->GetChildren(soptions.wal_dir, &wal_children);
    filenames.insert(filenames.end(), wal_children.begin(),
                     wal_children.end());
    archivedir = ArchivalDirectory(soptions.wal_dir);
  }

  if (filenames.empty()) {
    return Status::OK();
  }

  FileLock* lock;
  const std::string lockname = LockFileName(dbname);
  Status result = env->LockFile(lockname, &lock);
  if (!result.ok()) {
    return result;
  }

  // Keep deleting after a failure, but report only the first one.
  auto note_failure = [&result](const Status& del) {
    if (result.ok() && !del.ok()) {
      result = del;
    }
  };

  uint64_t number;
  FileType type;
  InfoLogPrefix info_log_prefix(!options.db_log_dir.empty(), dbname);
  for (const std::string& fname : filenames) {
    // Unparseable names are left alone; the lock file is removed last.
    if (!ParseFileName(fname, &number, info_log_prefix.prefix, &type) ||
        type == kDBLockFile) {
      continue;
    }
    Status del;
    switch (type) {
      case kMetaDatabase:
        del = DestroyDB(dbname + "/" + fname, options);
        break;
      case kLogFile:
        del = env->DeleteFile(soptions.wal_dir + "/" + fname);
        break;
      default:
        del = env->DeleteFile(dbname + "/" + fname);
        break;
    }
    note_failure(del);
  }

  // SST files in every configured db_path. |filenames| is deliberately
  // reused as the output vector, as in the original implementation.
  for (auto& db_path : options.db_paths) {
    env->GetChildren(db_path.path, &filenames);
    for (const std::string& fname : filenames) {
      if (ParseFileName(fname, &number, &type) && type == kTableFile) {
        note_failure(env->DeleteFile(db_path.path + "/" + fname));
      }
    }
  }

  // Archived WAL files.
  std::vector<std::string> archived;
  env->GetChildren(archivedir, &archived);
  for (const std::string& fname : archived) {
    if (ParseFileName(fname, &number, &type) && type == kLogFile) {
      note_failure(env->DeleteFile(archivedir + "/" + fname));
    }
  }
  // ignore case where no archival directory is present.
  env->DeleteDir(archivedir);

  env->UnlockFile(lock);  // Ignore error since state is already gone
  env->DeleteFile(lockname);
  env->DeleteDir(dbname);  // Ignore error in case dir contains other files
  env->DeleteDir(soptions.wal_dir);
  return result;
}

4434 4435
//
// A global method that can dump out the build version
I
Igor Canadi 已提交
4436
void DumpRocksDBBuildVersion(Logger* log) {
#if !defined(IOS_CROSS_COMPILE)
  // Xcode builds do not run build_detect_version, so util/build_version.cc
  // (presumably the source of the rocksdb_build_* strings) is not generated
  // there; everything is skipped in that configuration.
  Log(log, "RocksDB version: %d.%d.%d\n",
      ROCKSDB_MAJOR, ROCKSDB_MINOR, ROCKSDB_PATCH);
  Log(log, "Git sha %s", rocksdb_build_git_sha);
  Log(log, "Compile time %s %s", rocksdb_build_compile_time,
      rocksdb_build_compile_date);
#endif
}

4448
}  // namespace rocksdb