//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_impl.h"

#include <algorithm>
13 14
#include <climits>
#include <cstdio>
J
jorlow@chromium.org 已提交
15
#include <set>
16
#include <stdexcept>
17 18
#include <stdint.h>
#include <string>
19
#include <unordered_set>
20
#include <unordered_map>
T
Tomislav Novak 已提交
21
#include <utility>
22
#include <vector>
23

J
jorlow@chromium.org 已提交
24 25
#include "db/builder.h"
#include "db/dbformat.h"
26
#include "db/db_iter.h"
J
jorlow@chromium.org 已提交
27 28 29 30
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
31
#include "db/memtablelist.h"
32
#include "db/merge_context.h"
33
#include "db/merge_helper.h"
T
Tyler Harter 已提交
34
#include "db/prefix_filter_iterator.h"
J
jorlow@chromium.org 已提交
35
#include "db/table_cache.h"
K
kailiu 已提交
36
#include "db/table_properties_collector.h"
T
Tomislav Novak 已提交
37
#include "db/tailing_iter.h"
38
#include "db/transaction_log_impl.h"
J
jorlow@chromium.org 已提交
39 40
#include "db/version_set.h"
#include "db/write_batch_internal.h"
41
#include "port/port.h"
I
Igor Canadi 已提交
42
#include "rocksdb/cache.h"
43 44
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
45
#include "rocksdb/column_family.h"
46 47 48 49
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
S
Siying Dong 已提交
50 51
#include "rocksdb/table.h"
#include "port/port.h"
J
jorlow@chromium.org 已提交
52
#include "table/block.h"
53
#include "table/block_based_table_factory.h"
J
jorlow@chromium.org 已提交
54 55
#include "table/merger.h"
#include "table/two_level_iterator.h"
56 57
#include "util/auto_roll_logger.h"
#include "util/build_version.h"
J
jorlow@chromium.org 已提交
58
#include "util/coding.h"
I
Igor Canadi 已提交
59
#include "util/hash_skiplist_rep.h"
J
jorlow@chromium.org 已提交
60 61
#include "util/logging.h"
#include "util/mutexlock.h"
62
#include "util/perf_context_imp.h"
63
#include "util/stop_watch.h"
64
#include "util/autovector.h"
J
jorlow@chromium.org 已提交
65

66
namespace rocksdb {
J
jorlow@chromium.org 已提交
67

68
const std::string default_column_family_name("default");
69

70 71
void dumpLeveldbBuildVersion(Logger * log);

72 73 74 75 76
// Information kept for every waiting writer.
//
// The plain fields are value-initialized in the constructor so that a Writer
// never carries indeterminate values; the code that enqueues a writer
// presumably overwrites them before use (not visible here — confirm).
struct DBImpl::Writer {
  Status status;
  WriteBatch* batch;
  bool sync;
  bool disableWAL;
  bool done;
  port::CondVar cv;

  // `mu` is the mutex the condition variable is associated with.
  explicit Writer(port::Mutex* mu)
      : batch(nullptr), sync(false), disableWAL(false), done(false), cv(mu) { }
};

J
jorlow@chromium.org 已提交
84 85 86
struct DBImpl::CompactionState {
  Compaction* const compaction;

  // If there were two snapshots with seq numbers s1 and
  // s2 and s1 < s2, and if we find two instances of a key k1 that lie
  // entirely within s1 and s2, then the earlier version of k1 can be safely
  // deleted because that version is not visible in any snapshot.
  std::vector<SequenceNumber> existing_snapshots;

  // Files produced by compaction
  struct Output {
    uint64_t number;
    uint64_t file_size;
    InternalKey smallest, largest;
    SequenceNumber smallest_seqno, largest_seqno;
  };
  std::vector<Output> outputs;
  std::list<uint64_t> allocated_file_numbers;

  // State kept for output being generated
  unique_ptr<WritableFile> outfile;
  unique_ptr<TableBuilder> builder;

  uint64_t total_bytes;

  // Returns the most recently appended output (the one being generated).
  // Precondition: `outputs` is non-empty. Indexing size()-1 on an empty
  // vector is undefined behavior, so the precondition is asserted.
  Output* current_output() {
    assert(!outputs.empty());
    return &outputs.back();
  }

  explicit CompactionState(Compaction* c)
      : compaction(c),
        total_bytes(0) {
  }

  // Create a client visible context of this compaction
  CompactionFilter::Context GetFilterContext() {
    CompactionFilter::Context context;
    context.is_full_compaction = compaction->IsFullCompaction();
    return context;
  }
};

124
namespace {

// Forces *ptr into the range [minvalue, maxvalue]; used to make
// user-supplied options reasonable. Both comparisons are performed after
// converting *ptr to V. The lower bound is applied last, so if
// maxvalue < minvalue the result is minvalue.
// (Members of an anonymous namespace already have internal linkage, so no
// `static` is needed.)
template <class T, class V>
void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  if (maxvalue < static_cast<V>(*ptr)) {
    *ptr = maxvalue;
  }
  if (minvalue > static_cast<V>(*ptr)) {
    *ptr = minvalue;
  }
}

}  // anonymous namespace

J
jorlow@chromium.org 已提交
133 134
// Sanitizes a combined Options object by splitting it into its DB-wide and
// per-column-family parts, sanitizing each with its dedicated overload, and
// recombining the results.
Options SanitizeOptions(const std::string& dbname,
                        const InternalKeyComparator* icmp,
                        const InternalFilterPolicy* ipolicy,
                        const Options& src) {
  auto sanitized_db = SanitizeOptions(dbname, DBOptions(src));
  auto sanitized_cf =
      SanitizeOptions(icmp, ipolicy, ColumnFamilyOptions(src));
  return Options(sanitized_db, sanitized_cf);
}

// Returns a copy of `src` with the DB-wide options forced into usable values:
//  - max_open_files is clamped to [20, 1000000];
//  - if no info logger was supplied, one is created with
//    CreateLoggerFromOptions(); on failure info_log stays null (no logging);
//  - an empty wal_dir defaults to `dbname`.
DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
  DBOptions sanitized(src);
  ClipToRange(&sanitized.max_open_files, 20, 1000000);

  if (sanitized.info_log == nullptr) {
    const Status logger_status =
        CreateLoggerFromOptions(dbname, sanitized.db_log_dir, src.env,
                                sanitized, &sanitized.info_log);
    if (!logger_status.ok()) {
      // No place suitable for logging.
      sanitized.info_log = nullptr;
    }
  }

  // WAL files go next to the data files unless told otherwise.
  if (sanitized.wal_dir.empty()) {
    sanitized.wal_dir = dbname;
  }

  return sanitized;
}

S
Siying Dong 已提交
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
// Chooses the compression type for a file written at `level`.
//  - enable_compression == false: always kNoCompression.
//  - options.compression_per_level non-empty: the entry for `level`, with
//    `level` clamped to [0, size - 1]. A level of -1 (used mostly for
//    backwards compatibility when the builder doesn't know the file's level)
//    maps to level 0's entry; a level past the end uses the last entry.
//  - otherwise: options.compression.
CompressionType GetCompressionType(const Options& options, int level,
                                   const bool enable_compression) {
  if (!enable_compression) {
    // disable compression
    return kNoCompression;
  }
  // If the user has specified a different compression type for each level,
  // then pick the compression for that level.
  if (!options.compression_per_level.empty()) {
    // size() is unsigned; convert explicitly to int (rather than relying on
    // an implicit narrowing) so that the clamp below, which must handle
    // negative levels, is done in signed arithmetic.
    const int n =
        static_cast<int>(options.compression_per_level.size()) - 1;
    return options.compression_per_level[std::max(0, std::min(level, n))];
  }
  return options.compression;
}

184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
// Chooses the compression used when flushing a memtable.
// Compressing memtable flushes might not help unless the sequential load
// optimization is used for leveled compaction. Otherwise the CPU and
// latency overhead is not offset by saving much space.
//  - Universal compaction: compress only when
//    compaction_options_universal.compression_size_percent is negative.
//  - Other styles: compress when level 0 would be compressed according to
//    GetCompressionType().
// When compression is allowed, options.compression is returned.
CompressionType GetCompressionFlush(const Options& options) {
  const bool is_universal =
      options.compaction_style == kCompactionStyleUniversal;
  const bool can_compress =
      is_universal
          ? (options.compaction_options_universal.compression_size_percent < 0)
          : (GetCompressionType(options, 0, true) != kNoCompression);
  return can_compress ? options.compression : kNoCompression;
}

J
jorlow@chromium.org 已提交
206 207
// Builds a DBImpl for `dbname`:
//  - sanitizes `options` into options_;
//  - sizes the table cache from max_open_files;
//  - creates the VersionSet and the column-family memtable view;
//  - dumps build/option info and records the host name (falling back to
//    "localhost").
DBImpl::DBImpl(const Options& options, const std::string& dbname)
    : env_(options.env),
      dbname_(dbname),
      internal_comparator_(options.comparator),
      options_(SanitizeOptions(dbname, &internal_comparator_,
                               &internal_filter_policy_, options)),
      internal_filter_policy_(options.filter_policy),
      // Reserve ten files or so for other uses and give the rest to TableCache.
      table_cache_(NewLRUCache(options_.max_open_files - 10,
                               options_.table_cache_numshardbits,
                               options_.table_cache_remove_scan_count_limit)),
      db_lock_(nullptr),
      mutex_(options.use_adaptive_mutex),
      shutting_down_(nullptr),
      bg_cv_(&mutex_),
      logfile_number_(0),
      tmp_batch_(),
      bg_compaction_scheduled_(0),
      bg_manual_only_(0),
      bg_flush_scheduled_(0),
      bg_logstats_scheduled_(false),
      manual_compaction_(nullptr),
      logger_(nullptr),
      disable_delete_obsolete_files_(0),
      delete_obsolete_files_last_run_(options.env->NowMicros()),
      purge_wal_files_last_run_(0),
      last_stats_dump_time_microsec_(0),
      default_interval_to_delete_obsolete_WAL_(600),
      flush_on_destroy_(false),
      delayed_writes_(0),
      storage_options_(options),
      bg_work_gate_closed_(false),
      refitting_level_(false) {
  env_->GetAbsolutePath(dbname, &db_absolute_path_);

  // Core metadata structures.
  versions_.reset(
      new VersionSet(dbname_, &options_, storage_options_, table_cache_.get()));
  column_family_memtables_.reset(
      new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));

  // Record build and configuration in the info log.
  dumpLeveldbBuildVersion(options_.info_log.get());
  options_.Dump(options_.info_log.get());

  // Resolve the host name, falling back to "localhost".
  char host_buf[100];
  if (env_->GetHostName(host_buf, 100L).ok()) {
    host_name_ = host_buf;
  } else {
    Log(options_.info_log, "Can't get hostname, use localhost as host name.");
    host_name_ = "localhost";
  }
  last_log_ts = 0;

  LogFlush(options_.info_log);
}

// Tears down the DB:
//  - optionally flushes non-empty memtables;
//  - signals shutdown and waits for all scheduled background work
//    (compaction, flush, log stats) to finish;
//  - releases the DB lock file.
DBImpl::~DBImpl() {
  if (flush_on_destroy_) {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      // Skip memtables whose first sequence number is still 0.
      if (cfd->mem()->GetFirstSequenceNumber() == 0) {
        continue;
      }
      FlushMemTable(cfd, FlushOptions());
    }
  }

  // Wait for background work to finish.
  mutex_.Lock();
  shutting_down_.Release_Store(this);  // Any non-nullptr value is ok
  while (bg_compaction_scheduled_ || bg_flush_scheduled_ ||
         bg_logstats_scheduled_) {
    bg_cv_.Wait();
  }
  mutex_.Unlock();

  if (db_lock_ != nullptr) {
    env_->UnlockFile(db_lock_);
  }

  LogFlush(options_.info_log);
}

A
Abhishek Kona 已提交
288
// Do not flush and close database elegantly. Simulate a crash.
289 290 291 292 293 294
void DBImpl::TEST_Destroy_DBImpl() {
  // ensure that no new memtable flushes can occur
  flush_on_destroy_ = false;

  // wait till all background compactions are done.
  mutex_.Lock();
295 296 297
  while (bg_compaction_scheduled_ ||
         bg_flush_scheduled_ ||
         bg_logstats_scheduled_) {
298 299 300 301
    bg_cv_.Wait();
  }

  // Prevent new compactions from occuring.
302
  bg_work_gate_closed_ = true;
303 304
  const int LargeNumber = 10000000;
  bg_compaction_scheduled_ += LargeNumber;
305

306
  mutex_.Unlock();
I
Igor Canadi 已提交
307
  LogFlush(options_.info_log);
308 309

  // force release the lock file.
310
  if (db_lock_ != nullptr) {
311 312
    env_->UnlockFile(db_lock_);
  }
313 314 315 316

  log_.reset();
  versions_.reset();
  table_cache_.reset();
317 318
}

A
Abhishek Kona 已提交
319 320 321
// Test hook: the manifest file number currently reported by the VersionSet.
uint64_t DBImpl::TEST_Current_Manifest_FileNo() {
  const uint64_t manifest_number = versions_->ManifestFileNumber();
  return manifest_number;
}
322

J
jorlow@chromium.org 已提交
323
// Creates a brand-new database:
//  - writes a manifest (descriptor #1) holding one VersionEdit (comparator
//    name, log number 0, next file 2, last sequence 0);
//  - points CURRENT at that manifest.
// If writing the edit fails, the partially written manifest is deleted.
Status DBImpl::NewDB() {
  VersionEdit initial_edit;
  initial_edit.SetComparatorName(
      internal_comparator_.user_comparator()->Name());
  initial_edit.SetLogNumber(0);
  initial_edit.SetNextFile(2);
  initial_edit.SetLastSequence(0);

  const std::string manifest_path = DescriptorFileName(dbname_, 1);
  unique_ptr<WritableFile> manifest_file;
  Status s = env_->NewWritableFile(manifest_path, &manifest_file,
                                   storage_options_);
  if (!s.ok()) {
    return s;
  }
  manifest_file->SetPreallocationBlockSize(
      options_.manifest_preallocation_size);

  // The writer takes ownership of the file and is destroyed at scope end,
  // before CURRENT is updated.
  {
    log::Writer manifest_writer(std::move(manifest_file));
    std::string encoded_edit;
    initial_edit.EncodeTo(&encoded_edit);
    s = manifest_writer.AddRecord(encoded_edit);
  }

  if (!s.ok()) {
    env_->DeleteFile(manifest_path);
    return s;
  }

  // Make "CURRENT" file that points to the new manifest file.
  s = SetCurrentFile(env_, dbname_, 1);
  return s;
}

// Unless paranoid_checks is set, logs a non-OK *s and then resets it to OK.
// OK statuses, and all statuses under paranoid_checks, are left unchanged.
void DBImpl::MaybeIgnoreError(Status* s) const {
  const bool leave_untouched = s->ok() || options_.paranoid_checks;
  if (leave_untouched) {
    return;
  }
  Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
  *s = Status::OK();
}

361
// Ensures the WAL archive directory under options_.wal_dir exists when
// archiving is in use (WAL_ttl_seconds > 0 or WAL_size_limit_MB > 0).
// Returns OK without touching the filesystem otherwise.
const Status DBImpl::CreateArchivalDirectory() {
  const bool archiving =
      options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0;
  if (!archiving) {
    return Status::OK();
  }
  std::string archive_dir = ArchivalDirectory(options_.wal_dir);
  return env_->CreateDirIfMissing(archive_dir);
}

369
void DBImpl::PrintStatistics() {
370
  auto dbstats = options_.statistics.get();
371 372
  if (dbstats) {
    Log(options_.info_log,
373 374
        "STATISTCS:\n %s",
        dbstats->ToString().c_str());
375 376 377
  }
}

378
void DBImpl::MaybeDumpStats() {
H
Haobo Xu 已提交
379 380 381 382 383 384 385 386 387 388 389 390 391
  if (options_.stats_dump_period_sec == 0) return;

  const uint64_t now_micros = env_->NowMicros();

  if (last_stats_dump_time_microsec_ +
      options_.stats_dump_period_sec * 1000000
      <= now_micros) {
    // Multiple threads could race in here simultaneously.
    // However, the last one will update last_stats_dump_time_microsec_
    // atomically. We could see more than one dump during one dump
    // period in rare cases.
    last_stats_dump_time_microsec_ = now_micros;
    std::string stats;
392
    GetProperty("rocksdb.stats", &stats);
H
Haobo Xu 已提交
393
    Log(options_.info_log, "%s", stats.c_str());
394
    PrintStatistics();
395 396 397
  }
}

398 399
// Returns the list of live files in 'sst_live' and the list
// of all files in the filesystem in 'all_files'.
I
Igor Canadi 已提交
400 401 402 403 404 405 406
// no_full_scan = true -- never do the full scan using GetChildren()
// force = false -- don't force the full scan, except every
//  options_.delete_obsolete_files_period_micros
// force = true -- force the full scan
void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
                               bool force,
                               bool no_full_scan) {
D
Dhruba Borthakur 已提交
407 408
  mutex_.AssertHeld();

409
  // if deletion is disabled, do nothing
410
  if (disable_delete_obsolete_files_ > 0) {
411 412 413
    return;
  }

414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429
  bool doing_the_full_scan = false;

  // logic for figurint out if we're doing the full scan
  if (no_full_scan) {
    doing_the_full_scan = false;
  } else if (force || options_.delete_obsolete_files_period_micros == 0) {
    doing_the_full_scan = true;
  } else {
    const uint64_t now_micros = env_->NowMicros();
    if (delete_obsolete_files_last_run_ +
        options_.delete_obsolete_files_period_micros < now_micros) {
      doing_the_full_scan = true;
      delete_obsolete_files_last_run_ = now_micros;
    }
  }

I
Igor Canadi 已提交
430 431 432
  // get obsolete files
  versions_->GetObsoleteFiles(&deletion_state.sst_delete_files);

I
Igor Canadi 已提交
433 434
  // store the current filenum, lognum, etc
  deletion_state.manifest_file_number = versions_->ManifestFileNumber();
435
  deletion_state.log_number = versions_->MinLogNumber();
I
Igor Canadi 已提交
436 437
  deletion_state.prev_log_number = versions_->PrevLogNumber();

438 439 440 441 442 443 444 445
  if (!doing_the_full_scan && !deletion_state.HaveSomethingToDelete()) {
    // avoid filling up sst_live if we're sure that we
    // are not going to do the full scan and that we don't have
    // anything to delete at the moment
    return;
  }

  // don't delete live files
I
Igor Canadi 已提交
446 447 448 449
  deletion_state.sst_live.assign(pending_outputs_.begin(),
                                 pending_outputs_.end());
  versions_->AddLiveFiles(&deletion_state.sst_live);

450 451 452 453 454 455 456 457 458 459 460 461 462
  if (doing_the_full_scan) {
    // set of all files in the directory
    env_->GetChildren(dbname_, &deletion_state.all_files); // Ignore errors

    //Add log files in wal_dir
    if (options_.wal_dir != dbname_) {
      std::vector<std::string> log_files;
      env_->GetChildren(options_.wal_dir, &log_files); // Ignore errors
      deletion_state.all_files.insert(
        deletion_state.all_files.end(),
        log_files.begin(),
        log_files.end()
      );
463
    }
464
  }
465 466
}

D
Dhruba Borthakur 已提交
467
// Diffs the files listed in filenames and those that do not
I
Igor Canadi 已提交
468
// belong to live files are posibly removed. Also, removes all the
469
// files in sst_delete_files and log_delete_files.
470
// It is not necessary to hold the mutex when invoking this method.
D
Dhruba Borthakur 已提交
471
void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
472 473 474 475 476 477 478 479

  // check if there is anything to do
  if (!state.all_files.size() &&
      !state.sst_delete_files.size() &&
      !state.log_delete_files.size()) {
    return;
  }

I
Igor Canadi 已提交
480 481 482 483
  // this checks if FindObsoleteFiles() was run before. If not, don't do
  // PurgeObsoleteFiles(). If FindObsoleteFiles() was run, we need to also
  // run PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true
  if (state.manifest_file_number == 0) {
I
Igor Canadi 已提交
484 485 486
    return;
  }

J
jorlow@chromium.org 已提交
487 488
  uint64_t number;
  FileType type;
H
heyongqiang 已提交
489
  std::vector<std::string> old_log_files;
490

491 492
  // Now, convert live list to an unordered set, WITHOUT mutex held;
  // set is slow.
493 494
  std::unordered_set<uint64_t> live_set(state.sst_live.begin(),
                                        state.sst_live.end());
I
Igor Canadi 已提交
495

496 497 498
  state.all_files.reserve(state.all_files.size() +
      state.sst_delete_files.size());
  for (auto file : state.sst_delete_files) {
I
Igor Canadi 已提交
499
    state.all_files.push_back(TableFileName("", file->number).substr(1));
500
    delete file;
I
Igor Canadi 已提交
501 502
  }

503 504 505
  state.all_files.reserve(state.all_files.size() +
      state.log_delete_files.size());
  for (auto filenum : state.log_delete_files) {
I
Igor Canadi 已提交
506
    if (filenum > 0) {
I
Igor Canadi 已提交
507
      state.all_files.push_back(LogFileName("", filenum).substr(1));
I
Igor Canadi 已提交
508 509
    }
  }
510

I
Igor Canadi 已提交
511 512 513 514 515 516
  // dedup state.all_files so we don't try to delete the same
  // file twice
  sort(state.all_files.begin(), state.all_files.end());
  auto unique_end = unique(state.all_files.begin(), state.all_files.end());

  for (size_t i = 0; state.all_files.begin() + i < unique_end; i++) {
517
    if (ParseFileName(state.all_files[i], &number, &type)) {
J
jorlow@chromium.org 已提交
518 519 520
      bool keep = true;
      switch (type) {
        case kLogFile:
I
Igor Canadi 已提交
521 522
          keep = ((number >= state.log_number) ||
                  (number == state.prev_log_number));
J
jorlow@chromium.org 已提交
523 524 525 526
          break;
        case kDescriptorFile:
          // Keep my manifest file, and any newer incarnations'
          // (in case there is a race that allows other incarnations)
I
Igor Canadi 已提交
527
          keep = (number >= state.manifest_file_number);
J
jorlow@chromium.org 已提交
528 529
          break;
        case kTableFile:
530
          keep = (live_set.find(number) != live_set.end());
J
jorlow@chromium.org 已提交
531 532 533 534
          break;
        case kTempFile:
          // Any temp files that are currently being written to must
          // be recorded in pending_outputs_, which is inserted into "live"
535
          keep = (live_set.find(number) != live_set.end());
J
jorlow@chromium.org 已提交
536
          break;
H
heyongqiang 已提交
537 538 539
        case kInfoLogFile:
          keep = true;
          if (number != 0) {
540
            old_log_files.push_back(state.all_files[i]);
H
heyongqiang 已提交
541 542
          }
          break;
J
jorlow@chromium.org 已提交
543 544
        case kCurrentFile:
        case kDBLockFile:
M
Mayank Agarwal 已提交
545
        case kIdentityFile:
K
Kosie van der Merwe 已提交
546
        case kMetaDatabase:
J
jorlow@chromium.org 已提交
547 548 549 550 551 552
          keep = true;
          break;
      }

      if (!keep) {
        if (type == kTableFile) {
I
Igor Canadi 已提交
553
          // evict from cache
I
Igor Canadi 已提交
554
          TableCache::Evict(table_cache_.get(), number);
J
jorlow@chromium.org 已提交
555
        }
I
Igor Canadi 已提交
556 557
        std::string fname = ((type == kLogFile) ? options_.wal_dir : dbname_) +
            "/" + state.all_files[i];
K
Kai Liu 已提交
558 559 560 561
        Log(options_.info_log,
            "Delete type=%d #%lu",
            int(type),
            (unsigned long)number);
562

I
Igor Canadi 已提交
563 564 565
        Status st;
        if (type == kLogFile && (options_.WAL_ttl_seconds > 0 ||
              options_.WAL_size_limit_MB > 0)) {
I
Igor Canadi 已提交
566 567
            st = env_->RenameFile(fname,
                ArchivedLogFileName(options_.wal_dir, number));
I
Igor Canadi 已提交
568
            if (!st.ok()) {
K
Kai Liu 已提交
569
              Log(options_.info_log,
I
Igor Canadi 已提交
570 571
                  "RenameFile logfile #%lu FAILED -- %s\n",
                  (unsigned long)number, st.ToString().c_str());
I
Igor Canadi 已提交
572
            }
573
        } else {
I
Igor Canadi 已提交
574
          st = env_->DeleteFile(fname);
575
          if (!st.ok()) {
I
Igor Canadi 已提交
576 577
            Log(options_.info_log, "Delete type=%d #%lu FAILED -- %s\n",
                int(type), (unsigned long)number, st.ToString().c_str());
578
          }
H
heyongqiang 已提交
579
        }
J
jorlow@chromium.org 已提交
580 581 582
      }
    }
  }
H
heyongqiang 已提交
583

584
  // Delete old info log files.
K
Kai Liu 已提交
585 586 587 588 589
  size_t old_log_file_count = old_log_files.size();
  // NOTE: Currently we only support log purge when options_.db_log_dir is
  // located in `dbname` directory.
  if (old_log_file_count >= options_.keep_log_file_num &&
      options_.db_log_dir.empty()) {
H
heyongqiang 已提交
590
    std::sort(old_log_files.begin(), old_log_files.end());
K
Kai Liu 已提交
591
    size_t end = old_log_file_count - options_.keep_log_file_num;
592
    for (unsigned int i = 0; i <= end; i++) {
H
heyongqiang 已提交
593
      std::string& to_delete = old_log_files.at(i);
D
Dhruba Borthakur 已提交
594 595
      // Log(options_.info_log, "Delete type=%d %s\n",
      //     int(kInfoLogFile), to_delete.c_str());
H
heyongqiang 已提交
596 597 598
      env_->DeleteFile(dbname_ + "/" + to_delete);
    }
  }
599
  PurgeObsoleteWALFiles();
I
Igor Canadi 已提交
600
  LogFlush(options_.info_log);
D
Dhruba Borthakur 已提交
601 602 603 604 605
}

void DBImpl::DeleteObsoleteFiles() {
  mutex_.AssertHeld();
  DeletionState deletion_state;
I
Igor Canadi 已提交
606
  FindObsoleteFiles(deletion_state, true);
D
Dhruba Borthakur 已提交
607
  PurgeObsoleteFiles(deletion_state);
608 609
}

610 611 612 613 614 615 616 617
// 1. Go through all archived files and
//    a. if ttl is enabled, delete outdated files
//    b. if archive size limit is enabled, delete empty files,
//        compute file number and size.
// 2. If size limit is enabled:
//    a. compute how many files should be deleted
//    b. get sorted non-empty archived logs
//    c. delete what should be deleted
618
void DBImpl::PurgeObsoleteWALFiles() {
619 620 621 622 623 624
  bool const ttl_enabled = options_.WAL_ttl_seconds > 0;
  bool const size_limit_enabled =  options_.WAL_size_limit_MB > 0;
  if (!ttl_enabled && !size_limit_enabled) {
    return;
  }

625 626
  int64_t current_time;
  Status s = env_->GetCurrentTime(&current_time);
627 628 629 630 631 632 633 634
  if (!s.ok()) {
    Log(options_.info_log, "Can't get current time: %s", s.ToString().c_str());
    assert(false);
    return;
  }
  uint64_t const now_seconds = static_cast<uint64_t>(current_time);
  uint64_t const time_to_check = (ttl_enabled && !size_limit_enabled) ?
    options_.WAL_ttl_seconds / 2 : default_interval_to_delete_obsolete_WAL_;
635

636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675
  if (purge_wal_files_last_run_ + time_to_check > now_seconds) {
    return;
  }

  purge_wal_files_last_run_ = now_seconds;

  std::string archival_dir = ArchivalDirectory(options_.wal_dir);
  std::vector<std::string> files;
  s = env_->GetChildren(archival_dir, &files);
  if (!s.ok()) {
    Log(options_.info_log, "Can't get archive files: %s", s.ToString().c_str());
    assert(false);
    return;
  }

  size_t log_files_num = 0;
  uint64_t log_file_size = 0;

  for (auto& f : files) {
    uint64_t number;
    FileType type;
    if (ParseFileName(f, &number, &type) && type == kLogFile) {
      std::string const file_path = archival_dir + "/" + f;
      if (ttl_enabled) {
        uint64_t file_m_time;
        Status const s = env_->GetFileModificationTime(file_path,
          &file_m_time);
        if (!s.ok()) {
          Log(options_.info_log, "Can't get file mod time: %s: %s",
              file_path.c_str(), s.ToString().c_str());
          continue;
        }
        if (now_seconds - file_m_time > options_.WAL_ttl_seconds) {
          Status const s = env_->DeleteFile(file_path);
          if (!s.ok()) {
            Log(options_.info_log, "Can't delete file: %s: %s",
                file_path.c_str(), s.ToString().c_str());
            continue;
          }
          continue;
676
        }
677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729
      }

      if (size_limit_enabled) {
        uint64_t file_size;
        Status const s = env_->GetFileSize(file_path, &file_size);
        if (!s.ok()) {
          Log(options_.info_log, "Can't get file size: %s: %s",
              file_path.c_str(), s.ToString().c_str());
          return;
        } else {
          if (file_size > 0) {
            log_file_size = std::max(log_file_size, file_size);
            ++log_files_num;
          } else {
            Status s = env_->DeleteFile(file_path);
            if (!s.ok()) {
              Log(options_.info_log, "Can't delete file: %s: %s",
                  file_path.c_str(), s.ToString().c_str());
              continue;
            }
          }
        }
      }
    }
  }

  if (0 == log_files_num || !size_limit_enabled) {
    return;
  }

  size_t const files_keep_num = options_.WAL_size_limit_MB *
    1024 * 1024 / log_file_size;
  if (log_files_num <= files_keep_num) {
    return;
  }

  size_t files_del_num = log_files_num - files_keep_num;
  VectorLogPtr archived_logs;
  AppendSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile);

  if (files_del_num > archived_logs.size()) {
    Log(options_.info_log, "Trying to delete more archived log files than "
        "exist. Deleting all");
    files_del_num = archived_logs.size();
  }

  for (size_t i = 0; i < files_del_num; ++i) {
    std::string const file_path = archived_logs[i]->PathName();
    Status const s = DeleteFile(file_path);
    if (!s.ok()) {
      Log(options_.info_log, "Can't delete file: %s: %s",
          file_path.c_str(), s.ToString().c_str());
      continue;
730 731
    }
  }
D
Dhruba Borthakur 已提交
732 733
}

734
// Recovers database state. Unless read_only, it first:
//   - creates the DB directory if needed and opens and locks it;
//   - creates a new DB when CURRENT is missing and create_if_missing is set,
//     and rejects an existing DB when error_if_exists is set;
//   - creates the IDENTITY file if it is absent.
// It then recovers the MANIFEST for `column_families` and replays, in
// ascending order, every WAL whose number is >= MinLogNumber() or equals
// PrevLogNumber().
//
// Requires mutex_ to be held. When error_if_log_file_exist is true and any
// such WAL is present, recovery fails with Status::Corruption.
Status DBImpl::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
    bool error_if_log_file_exist) {
  mutex_.AssertHeld();

  assert(db_lock_ == nullptr);
  if (!read_only) {
    // We call CreateDirIfMissing() as the directory may already exist (if we
    // are reopening a DB), when this happens we don't want creating the
    // directory to cause an error. However, we need to check if creating the
    // directory fails or else we may get an obscure message about the lock
    // file not existing. One real-world example of this occurring is if
    // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
    // when dbname_ is "dir/db" but when "dir" doesn't exist.
    Status s = env_->CreateDirIfMissing(dbname_);
    if (!s.ok()) {
      return s;
    }

    s = env_->NewDirectory(dbname_, &db_directory_);
    if (!s.ok()) {
      return s;
    }

    s = env_->LockFile(LockFileName(dbname_), &db_lock_);
    if (!s.ok()) {
      return s;
    }

    if (!env_->FileExists(CurrentFileName(dbname_))) {
      if (options_.create_if_missing) {
        // TODO: add merge_operator name check
        s = NewDB();
        if (!s.ok()) {
          return s;
        }
      } else {
        return Status::InvalidArgument(
            dbname_, "does not exist (create_if_missing is false)");
      }
    } else if (options_.error_if_exists) {
      return Status::InvalidArgument(
          dbname_, "exists (error_if_exists is true)");
    }

    // Check for the IDENTITY file and create it if not there
    if (!env_->FileExists(IdentityFileName(dbname_))) {
      s = SetIdentityFile(env_, dbname_);
      if (!s.ok()) {
        return s;
      }
    }
  }

  Status s = versions_->Recover(column_families);
  if (s.ok()) {
    SequenceNumber max_sequence(0);
    default_cfd_ = versions_->GetColumnFamilySet()->GetDefault();

    // Recover from all newer log files than the ones named in the
    // descriptor (new log files may have been added by the previous
    // incarnation without registering them in the descriptor).
    //
    // Note that PrevLogNumber() is no longer used, but we pay
    // attention to it in case we are recovering a database
    // produced by an older version of rocksdb.
    const uint64_t min_log = versions_->MinLogNumber();
    const uint64_t prev_log = versions_->PrevLogNumber();
    std::vector<std::string> filenames;
    s = env_->GetChildren(options_.wal_dir, &filenames);
    if (!s.ok()) {
      return s;
    }
    std::vector<uint64_t> logs;
    for (const auto& filename : filenames) {
      uint64_t number;
      FileType type;
      if (ParseFileName(filename, &number, &type) && type == kLogFile &&
          (number >= min_log || number == prev_log)) {
        logs.push_back(number);
      }
    }

    if (!logs.empty() && error_if_log_file_exist) {
      // The two literals are concatenated; the trailing space keeps
      // "error_if_log_file_exist" and "flag" as separate words.
      return Status::Corruption(
          "The db was opened in readonly mode with error_if_log_file_exist "
          "flag but a log file already exists");
    }

    // Recover in the order in which the logs were generated
    std::sort(logs.begin(), logs.end());
    for (size_t i = 0; s.ok() && i < logs.size(); i++) {
      // The previous incarnation may not have written any MANIFEST
      // records after allocating this log number.  So we manually
      // update the file number allocation counter in VersionSet.
      versions_->MarkFileNumberUsed(logs[i]);
      s = RecoverLogFile(logs[i], &max_sequence, read_only);
    }

    if (s.ok()) {
      if (versions_->LastSequence() < max_sequence) {
        versions_->SetLastSequence(max_sequence);
      }
      SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER,
                     versions_->LastSequence());
    }
  }

  return s;
}

I
Igor Canadi 已提交
847 848
// Replays the records of WAL file `log_number` into the column families'
// memtables and raises *max_sequence to the highest sequence number seen.
//
// Every record is checksummed. Corruptions reported by the log reader are
// logged. When paranoid_checks is set and skip_log_error_on_recovery is not,
// the first corruption stops the replay and is returned to the caller.
// Otherwise the corrupted data is dropped and replay continues.
//
// Unless read_only is set:
//   - a memtable that exceeds its write_buffer_size during replay is written
//     out as a level-0 table;
//   - the remaining non-empty memtables are written out at the end;
//   - each column family that had not already moved past this log gets
//     log_number + 1 recorded as its log number in the MANIFEST.
//
// Requires mutex_ to be held. It is released temporarily while level-0
// tables are built.
Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
                              bool read_only) {
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;  // nullptr if options_.paranoid_checks==false or
                     //            options_.skip_log_error_on_recovery==true
    virtual void Corruption(size_t bytes, const Status& s) {
      Log(info_log, "%s%s: dropping %d bytes; %s",
          (this->status == nullptr ? "(ignoring error) " : ""),
          fname, static_cast<int>(bytes), s.ToString().c_str());
      // Only the first corruption is remembered.
      if (this->status != nullptr && this->status->ok()) *this->status = s;
    }
  };

  mutex_.AssertHeld();

  // One pending VersionEdit per column family. It collects the level-0 files
  // produced while replaying this log.
  std::unordered_map<int, VersionEdit> version_edits;
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    VersionEdit edit;
    edit.SetColumnFamily(cfd->GetID());
    version_edits.insert({cfd->GetID(), edit});
  }

  // Open the log file
  std::string fname = LogFileName(options_.wal_dir, log_number);
  unique_ptr<SequentialFile> file;
  Status status = env_->NewSequentialFile(fname, &file, storage_options_);
  if (!status.ok()) {
    MaybeIgnoreError(&status);
    return status;
  }

  // Create the log reader.
  LogReporter reporter;
  reporter.env = env_;
  reporter.info_log = options_.info_log.get();
  reporter.fname = fname.c_str();
  reporter.status = (options_.paranoid_checks &&
                     !options_.skip_log_error_on_recovery ? &status : nullptr);
  // We intentionally make log::Reader do checksumming even if
  // paranoid_checks==false so that corruptions cause entire commits
  // to be skipped instead of propagating bad information (like overly
  // large sequence numbers).
  log::Reader reader(std::move(file), &reporter, true/*checksum*/,
                     0/*initial_offset*/);
  Log(options_.info_log, "Recovering log #%lu",
      (unsigned long) log_number);

  // Read all the records and add to a memtable
  std::string scratch;
  Slice record;
  WriteBatch batch;
  // `status` is also the reporter's sink. Once a corruption has been recorded
  // there, stop replaying. Without the status check, the next InsertInto()
  // would overwrite the corruption and it would be silently lost.
  while (reader.ReadRecord(&record, &scratch) && status.ok()) {
    if (record.size() < 12) {
      reporter.Corruption(
          record.size(), Status::Corruption("log record too small"));
      continue;
    }
    WriteBatchInternal::SetContents(&batch, record);

    // filter out all the column families that have already
    // flushed memtables with log_number
    column_family_memtables_->SetLogNumber(log_number);
    status = WriteBatchInternal::InsertInto(
        &batch, column_family_memtables_.get(), &options_);
    column_family_memtables_->SetLogNumber(0);

    MaybeIgnoreError(&status);
    if (!status.ok()) {
      return status;
    }
    const SequenceNumber last_seq =
        WriteBatchInternal::Sequence(&batch) +
        WriteBatchInternal::Count(&batch) - 1;
    if (last_seq > *max_sequence) {
      *max_sequence = last_seq;
    }

    if (!read_only) {
      for (auto cfd : *versions_->GetColumnFamilySet()) {
        if (cfd->mem()->ApproximateMemoryUsage() >
            cfd->options()->write_buffer_size) {
          // If this asserts, it means that ColumnFamilyMemTablesImpl failed in
          // filtering updates to already-flushed column families
          assert(cfd->GetLogNumber() <= log_number);
          auto iter = version_edits.find(cfd->GetID());
          assert(iter != version_edits.end());
          VersionEdit* edit = &iter->second;
          status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit);
          // we still want to clear the memtable, even if the recovery failed
          cfd->CreateNewMemtable();
          if (!status.ok()) {
            // Reflect errors immediately so that conditions like full
            // file-systems cause the DB::Open() to fail.
            return status;
          }
        }
      }
    }
  }

  // A corruption reported by the reader (only possible when reporter.status
  // is non-null) fails recovery. Nothing from this partially replayed log is
  // flushed or recorded in the MANIFEST.
  if (!status.ok()) {
    return status;
  }

  if (!read_only) {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      auto iter = version_edits.find(cfd->GetID());
      assert(iter != version_edits.end());
      VersionEdit* edit = &iter->second;

      if (cfd->GetLogNumber() > log_number) {
        // Column family cfd has already flushed the data
        // from log_number. Memtable has to be empty because
        // we filter the updates based on log_number
        // (in ColumnFamilyMemTablesImpl)
        assert(cfd->mem()->GetFirstSequenceNumber() == 0);
        assert(edit->NumEntries() == 0);
        continue;
      }

      // flush the final memtable (if non-empty)
      if (cfd->mem()->GetFirstSequenceNumber() != 0) {
        status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit);
      }
      // we still want to clear the memtable, even if the recovery failed
      cfd->CreateNewMemtable();
      if (!status.ok()) {
        return status;
      }

      // write MANIFEST with update
      // writing log number in the manifest means that any log file
      // with number strongly less than (log_number + 1) is already
      // recovered and should be ignored on next reincarnation.
      // Since we already recovered log_number, we want all logs
      // with numbers `<= log_number` (includes this one) to be ignored
      edit->SetLogNumber(log_number + 1);
      // we must mark the next log number as used, even though it's
      // not actually used. that is because VersionSet assumes
      // VersionSet::next_file_number_ always to be strictly greater than any
      // log number
      versions_->MarkFileNumberUsed(log_number + 1);
      status = versions_->LogAndApply(cfd, edit, &mutex_);
      if (!status.ok()) {
        return status;
      }
    }
  }

  return status;
}

999 1000
// Builds a table file from the contents of `mem` during WAL replay. If a
// non-empty file was produced, it is recorded in `edit` at level 0.
// mutex_ must be held on entry. It is released while the table is built and
// re-acquired before returning. Compaction stats and the COMPACT_WRITE_BYTES
// ticker are updated whatever the outcome.
Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
                                           VersionEdit* edit) {
  mutex_.AssertHeld();
  const uint64_t started_at = env_->NowMicros();

  FileMetaData out;
  out.number = versions_->NewFileNumber();
  // Keep the number in pending_outputs_ while building, so the file is not
  // treated as obsolete by a concurrent deletion pass.
  pending_outputs_.insert(out.number);
  Iterator* mem_iter = mem->NewIterator();
  const SequenceNumber newest_snapshot = snapshots_.GetNewest();
  const SequenceNumber earliest_seqno = mem->GetFirstSequenceNumber();
  Log(options_.info_log, "Level-0 table #%lu: started",
      (unsigned long) out.number);

  mutex_.Unlock();
  Status build_status = BuildTable(dbname_, env_, options_, storage_options_,
                                   cfd->table_cache(), mem_iter, &out,
                                   cfd->user_comparator(), newest_snapshot,
                                   earliest_seqno,
                                   GetCompressionFlush(options_));
  LogFlush(options_.info_log);
  mutex_.Lock();

  Log(options_.info_log, "Level-0 table #%lu: %lu bytes %s",
      (unsigned long) out.number, (unsigned long) out.file_size,
      build_status.ToString().c_str());
  delete mem_iter;
  pending_outputs_.erase(out.number);

  // A zero file_size means the output file has already been deleted, so it
  // must not be added to the manifest.
  const int output_level = 0;
  const bool produced_file = build_status.ok() && out.file_size > 0;
  if (produced_file) {
    edit->AddFile(output_level, out.number, out.file_size, out.smallest,
                  out.largest, out.smallest_seqno, out.largest_seqno);
  }

  InternalStats::CompactionStats stats;
  stats.micros = env_->NowMicros() - started_at;
  stats.bytes_written = out.file_size;
  stats.files_out_levelnp1 = 1;
  cfd->internal_stats()->AddCompactionStats(output_level, stats);
  RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, out.file_size);
  return build_status;
}

1050 1051
// Writes the merged contents of `mems` into one new table file. The file
// number is stored in *filenumber. If a non-empty file was produced, it is
// added to `edit`. With at most one background compaction thread and
// level-style compaction, PickLevelForMemTableOutput may place it deeper than
// level 0; otherwise it goes to level 0.
//
// Requires mutex_ to be held and `mems` to be non-empty. The mutex is released
// while the table is built and the DB directory is fsynced, then re-acquired
// before returning. The file number is intentionally left in
// pending_outputs_ (see the comment below).
//
// Returns the BuildTable() status or, if that succeeded, the status of the
// directory fsync (when data sync is enabled).
Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
                                std::vector<MemTable*>& mems, VersionEdit* edit,
                                uint64_t* filenumber) {
  mutex_.AssertHeld();
  assert(!mems.empty());
  const uint64_t start_micros = env_->NowMicros();
  FileMetaData meta;
  meta.number = versions_->NewFileNumber();
  *filenumber = meta.number;
  pending_outputs_.insert(meta.number);

  const SequenceNumber newest_snapshot = snapshots_.GetNewest();
  const SequenceNumber earliest_seqno_in_memtable =
    mems[0]->GetFirstSequenceNumber();
  Version* base = cfd->current();
  base->Ref();          // it is likely that we do not need this reference
  Status s;
  {
    mutex_.Unlock();
    std::vector<Iterator*> list;
    list.reserve(mems.size());
    for (MemTable* m : mems) {
      Log(options_.info_log,
          "Flushing memtable with log file: %lu\n",
          (unsigned long)m->GetLogNumber());
      list.push_back(m->NewIterator());
    }
    Iterator* iter = NewMergingIterator(&cfd->internal_comparator(),
                                        list.data(), list.size());
    Log(options_.info_log,
        "Level-0 flush table #%lu: started",
        (unsigned long)meta.number);

    s = BuildTable(dbname_, env_, options_, storage_options_,
                   cfd->table_cache(), iter, &meta, cfd->user_comparator(),
                   newest_snapshot, earliest_seqno_in_memtable,
                   GetCompressionFlush(options_));
    LogFlush(options_.info_log);
    delete iter;
    Log(options_.info_log, "Level-0 flush table #%lu: %lu bytes %s",
        (unsigned long) meta.number,
        (unsigned long) meta.file_size,
        s.ToString().c_str());
    // Make the new file's directory entry durable before the file is added
    // to `edit` below. A failed fsync is reported instead of being ignored.
    // There is nothing to sync if building the table already failed.
    if (s.ok() && !options_.disableDataSync) {
      s = db_directory_->Fsync();
      if (!s.ok()) {
        Log(options_.info_log,
            "Level-0 flush table #%lu: directory fsync failed: %s",
            (unsigned long) meta.number, s.ToString().c_str());
      }
    }
    mutex_.Lock();
  }
  base->Unref();

  // re-acquire the most current version
  base = cfd->current();

  // There could be multiple threads writing to its own level-0 file.
  // The pending_outputs cannot be cleared here, otherwise this newly
  // created file might not be considered as a live-file by another
  // compaction thread that is concurrently deleting obsolete files.
  // The pending_outputs can be cleared only after the new version is
  // committed so that other threads can recognize this file as a
  // valid one.
  // pending_outputs_.erase(meta.number);

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  int level = 0;
  if (s.ok() && meta.file_size > 0) {
    const Slice min_user_key = meta.smallest.user_key();
    const Slice max_user_key = meta.largest.user_key();
    // if we have more than 1 background thread, then we cannot
    // insert files directly into higher levels because some other
    // threads could be concurrently producing compacted files for
    // that key range.
    if (base != nullptr && options_.max_background_compactions <= 1 &&
        cfd->options()->compaction_style == kCompactionStyleLevel) {
      level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
    }
    edit->AddFile(level, meta.number, meta.file_size,
                  meta.smallest, meta.largest,
                  meta.smallest_seqno, meta.largest_seqno);
  }

  InternalStats::CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros;
  stats.bytes_written = meta.file_size;
  cfd->internal_stats()->AddCompactionStats(level, stats);
  RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, meta.file_size);
  return s;
}

1138 1139
// Flushes the immutable memtables picked for `cfd` into one new table file
// and installs the result. Requires mutex_ to be held; WriteLevel0Table()
// releases and re-acquires it.
//
// Returns IOError if a flush is not pending, if no memtable was picked, or if
// the DB started shutting down during the flush. Otherwise it returns the
// result of InstallMemtableFlushResults(). On success:
//   - a new SuperVersion is installed;
//   - *madeProgress (if non-null) is set to true;
//   - unless obsolete-file deletion is disabled, the WAL numbers of the
//     flushed memtables are queued in deletion_state.log_delete_files.
Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
                                         bool* madeProgress,
                                         DeletionState& deletion_state) {
  mutex_.AssertHeld();
  assert(cfd->imm()->size() != 0);

  if (!cfd->imm()->IsFlushPending()) {
    Log(options_.info_log, "FlushMemTableToOutputFile already in progress");
    return Status::IOError("FlushMemTableToOutputFile already in progress");
  }

  std::vector<MemTable*> to_flush;
  cfd->imm()->PickMemtablesToFlush(&to_flush);
  if (to_flush.empty()) {
    Log(options_.info_log, "Nothing in memstore to flush");
    return Status::IOError("Nothing in memstore to flush");
  }

  // The picked memtables are (implicitly) ordered by creation time. The
  // first one's VersionEdit carries the metadata for this flush.
  VersionEdit* edit = to_flush.front()->GetEdits();
  edit->SetPrevLogNumber(0);
  // Logs numbered below this value will no longer be picked up for recovery.
  edit->SetLogNumber(to_flush.back()->GetNextLogNumber());
  edit->SetColumnFamily(cfd->GetID());

  // WAL numbers backing the flushed memtables; queued for deletion on success.
  std::vector<uint64_t> flushed_logs;
  for (MemTable* mem : to_flush) {
    flushed_logs.push_back(mem->GetLogNumber());
  }

  // WriteLevel0Table() releases and re-acquires mutex_.
  uint64_t file_number;
  Status s = WriteLevel0Table(cfd, to_flush, edit, &file_number);
  if (s.ok() && shutting_down_.Acquire_Load()) {
    s = Status::IOError(
      "Database shutdown started during memtable compaction"
    );
  }

  // Replace the immutable memtables with the generated table. The outcome so
  // far is passed in as `s`.
  s = cfd->imm()->InstallMemtableFlushResults(
      cfd, to_flush, versions_.get(), s, &mutex_, options_.info_log.get(),
      file_number, pending_outputs_, &deletion_state.memtables_to_free,
      db_directory_.get());
  if (!s.ok()) {
    return s;
  }

  InstallSuperVersion(cfd, deletion_state);
  if (madeProgress != nullptr) {
    *madeProgress = true;
  }
  MaybeScheduleLogDBDeployStats();

  if (disable_delete_obsolete_files_ == 0) {
    deletion_state.log_delete_files.insert(
        deletion_state.log_delete_files.end(),
        flushed_logs.begin(), flushed_logs.end());
  }
  return s;
}

1209 1210 1211
// Manually compacts the key range delimited by begin/end in `column_family`:
//   1. Flushes the column family's memtable.
//   2. Compacts every level from 0 up to the deepest level (at least 1) that
//      overlaps the range.
//   3. If reduce_level is set, moves that level's files via ReFitLevel()
//      towards `target_level`.
// The info log is flushed before returning. The first failing step's status
// is returned.
Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
                            const Slice* begin, const Slice* end,
                            bool reduce_level, int target_level) {
  ColumnFamilyData* cfd = nullptr;
  {
    MutexLock lock(&mutex_);
    cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
  }
  // Calling DB methods with an unknown ColumnFamilyHandle is undefined
  // behavior, so this is only asserted.
  assert(cfd != nullptr);

  Status s = FlushMemTable(cfd, FlushOptions());
  if (!s.ok()) {
    LogFlush(options_.info_log);
    return s;
  }

  // Deepest level (searching from level 1) whose files overlap the range.
  int max_level_with_files = 1;
  {
    MutexLock lock(&mutex_);
    Version* base = cfd->current();
    for (int level = 1; level < cfd->NumberLevels(); level++) {
      if (base->OverlapInLevel(level, begin, end)) {
        max_level_with_files = level;
      }
    }
  }

  for (int level = 0; level <= max_level_with_files; level++) {
    // Universal compaction, and compaction of the bottom-most level, write
    // back to the input level; otherwise the output goes one level down.
    const bool same_level_output =
        cfd->options()->compaction_style == kCompactionStyleUniversal ||
        level == max_level_with_files;
    const int output_level = same_level_output ? level : level + 1;
    s = RunManualCompaction(cfd, level, output_level, begin, end);
    if (!s.ok()) {
      LogFlush(options_.info_log);
      return s;
    }
  }

  if (reduce_level) {
    s = ReFitLevel(cfd, max_level_with_files, target_level);
  }
  LogFlush(options_.info_log);
  return s;
}

// return the same level if it cannot be moved
I
Igor Canadi 已提交
1259
int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int level) {
1260
  mutex_.AssertHeld();
I
Igor Canadi 已提交
1261
  Version* current = cfd->current();
1262
  int minimum_level = level;
1263
  for (int i = level - 1; i > 0; --i) {
1264
    // stop if level i is not empty
1265
    if (current->NumLevelFiles(i) > 0) break;
1266
    // stop if level i is too small (cannot fit the level files)
I
Igor Canadi 已提交
1267
    if (cfd->compaction_picker()->MaxBytesForLevel(i) <
1268 1269 1270
        current->NumLevelBytes(level)) {
      break;
    }
1271 1272 1273 1274 1275 1276

    minimum_level = i;
  }
  return minimum_level;
}

I
Igor Canadi 已提交
1277 1278
// Moves every file of `level` in `cfd` to a lower-numbered level: to
// `target_level` when it is non-negative, otherwise to the level chosen by
// FindMinimumEmptyLevelFitting().
//
// Only one refit may run at a time; a concurrent call gets
// Status::NotSupported. Background work is gated off, and the call waits for
// running flushes and compactions before applying the VersionEdit. The gate is
// reopened afterwards.
//
// Acquires mutex_ itself, so the caller must not hold it. Returns the
// LogAndApply() status, or OK when no move is needed.
Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
  assert(level < cfd->NumberLevels());

  // Allocated before taking mutex_; whatever is left over is freed after
  // releasing it.
  SuperVersion* fresh_sv = new SuperVersion();
  SuperVersion* retired_sv = nullptr;

  mutex_.Lock();

  // Only one thread may refit at a time.
  if (refitting_level_) {
    mutex_.Unlock();
    Log(options_.info_log, "ReFitLevel: another thread is refitting");
    delete fresh_sv;
    return Status::NotSupported("another thread is refitting");
  }
  refitting_level_ = true;

  // Close the gate on background work and wait until none is running.
  bg_work_gate_closed_ = true;
  while (bg_compaction_scheduled_ > 0 || bg_flush_scheduled_) {
    Log(options_.info_log,
        "RefitLevel: waiting for background threads to stop: %d %d",
        bg_compaction_scheduled_, bg_flush_scheduled_);
    bg_cv_.Wait();
  }

  // Move to a smaller (or the same) level.
  const int dest_level = (target_level < 0)
                             ? FindMinimumEmptyLevelFitting(cfd, level)
                             : target_level;
  assert(dest_level <= level);

  Status status;
  if (dest_level < level) {
    Log(options_.info_log, "Before refitting:\n%s",
        cfd->current()->DebugString().data());

    VersionEdit move_edit;
    move_edit.SetColumnFamily(cfd->GetID());
    for (const auto& file : cfd->current()->files_[level]) {
      move_edit.DeleteFile(level, file->number);
      move_edit.AddFile(dest_level, file->number, file->file_size,
                        file->smallest, file->largest, file->smallest_seqno,
                        file->largest_seqno);
    }
    Log(options_.info_log, "Apply version edit:\n%s",
        move_edit.DebugString().data());

    status = versions_->LogAndApply(cfd, &move_edit, &mutex_,
                                    db_directory_.get());
    // fresh_sv is handed to the column family; it must not be deleted here.
    retired_sv = cfd->InstallSuperVersion(fresh_sv);
    fresh_sv = nullptr;

    Log(options_.info_log, "LogAndApply: %s\n", status.ToString().data());
    if (status.ok()) {
      Log(options_.info_log, "After refitting:\n%s",
          cfd->current()->DebugString().data());
    }
  }

  refitting_level_ = false;
  bg_work_gate_closed_ = false;
  mutex_.Unlock();

  // Free outside the lock (deleting nullptr is a no-op).
  delete retired_sv;
  delete fresh_sv;
  return status;
}

1347
int DBImpl::NumberLevels(const ColumnFamilyHandle& column_family) {
I
Igor Canadi 已提交
1348 1349 1350 1351 1352
  mutex_.Lock();
  auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
  mutex_.Unlock();
  assert(cfd != nullptr);
  return cfd->NumberLevels();
1353 1354
}

1355
int DBImpl::MaxMemCompactionLevel(const ColumnFamilyHandle& column_family) {
I
Igor Canadi 已提交
1356 1357 1358 1359 1360
  mutex_.Lock();
  auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
  mutex_.Unlock();
  assert(cfd != nullptr);
  return cfd->options()->max_mem_compaction_level;
1361 1362
}

1363
int DBImpl::Level0StopWriteTrigger(const ColumnFamilyHandle& column_family) {
I
Igor Canadi 已提交
1364 1365 1366 1367 1368
  mutex_.Lock();
  auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
  mutex_.Unlock();
  assert(cfd != nullptr);
  return cfd->options()->level0_stop_writes_trigger;
1369 1370
}

1371 1372
// Flushes the memtable(s) of `column_family` according to `options`.
// The handle lookup happens under mutex_; the lock is released before
// calling FlushMemTable(), whose wait path locks mutex_ itself.
Status DBImpl::Flush(const FlushOptions& options,
                     const ColumnFamilyHandle& column_family) {
  ColumnFamilyData* cfd = nullptr;
  {
    MutexLock lock(&mutex_);
    cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
  }
  assert(cfd != nullptr);
  return FlushMemTable(cfd, options);
}

1381
// Returns the most recent sequence number known to the version set.
SequenceNumber DBImpl::GetLatestSequenceNumber() const {
  const SequenceNumber latest = versions_->LastSequence();
  return latest;
}

1385
// Creates a TransactionLogIterator that replays WAL updates starting at
// sequence number `seq` and stores it in `*iter`.
//
// Returns IOError if `seq` is beyond the last written sequence number, any
// error from listing/filtering the WAL files, and otherwise the status of the
// freshly constructed iterator.
Status DBImpl::GetUpdatesSince(SequenceNumber seq,
                               unique_ptr<TransactionLogIterator>* iter) {
  RecordTick(options_.statistics.get(), GET_UPDATES_SINCE_CALLS);

  if (seq > versions_->LastSequence()) {
    return Status::IOError("Requested sequence not yet written in the db");
  }

  // Gather all WAL files in sorted order, then use a binary search to drop
  // the ones that cannot contain `seq`, so not every file has to be opened.
  std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr);
  Status s = GetSortedWalFiles(*wal_files);
  if (s.ok()) {
    s = RetainProbableWalFiles(*wal_files, seq);
  }
  if (!s.ok()) {
    return s;
  }

  iter->reset(new TransactionLogIteratorImpl(options_.wal_dir, &options_,
                                             storage_options_, seq,
                                             std::move(wal_files), this));
  return (*iter)->status();
}

1415 1416
// Trims `all_logs` (sorted by starting sequence number) so that it begins
// with the file that may contain `target`: an exact match on the starting
// sequence, or otherwise the last file whose starting sequence is below
// `target`. The last WAL file is always kept. Always returns OK.
Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs,
                                      const SequenceNumber target) {
  // Signed bounds: `hi` can legitimately become -1 when `target` precedes
  // the first file's starting sequence.
  long lo = 0;
  long hi = static_cast<long>(all_logs.size()) - 1;
  // Binary search, so we avoid opening every file.
  while (lo <= hi) {
    const long mid = lo + (hi - lo) / 2;  // Avoids overflow of lo + hi.
    const SequenceNumber mid_seq = all_logs.at(mid)->StartSequence();
    if (mid_seq < target) {
      lo = mid + 1;
    } else if (mid_seq > target) {
      hi = mid - 1;
    } else {
      hi = mid;
      break;
    }
  }
  // Drop everything before `hi`; a negative `hi` means keep every file.
  const size_t first_kept = (hi > 0) ? static_cast<size_t>(hi) : 0;
  all_logs.erase(all_logs.begin(), all_logs.begin() + first_kept);
  return Status::OK();
}

1438 1439 1440
bool DBImpl::CheckWalFileExistsAndEmpty(const WalFileType type,
                                        const uint64_t number) {
  const std::string fname = (type == kAliveLogFile) ?
1441 1442
    LogFileName(options_.wal_dir, number) :
    ArchivedLogFileName(options_.wal_dir, number);
1443 1444
  uint64_t file_size;
  Status s = env_->GetFileSize(fname, &file_size);
1445
  return (s.ok() && (file_size == 0));
1446 1447
}

1448 1449
// Reads the first record of WAL file `number` into `*result`.
//
// For an alive log, a failed read is retried at the archived location, in
// case the file was archived in the meantime. If both reads fail, the result
// is IOError. An archived log is read directly, and its status is returned.
// Any other file type yields NotSupported.
Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number,
                               WriteBatch* const result) {
  if (type == kArchivedLogFile) {
    return ReadFirstLine(ArchivedLogFileName(options_.wal_dir, number),
                         result);
  }
  if (type != kAliveLogFile) {
    return Status::NotSupported("File Type Not Known: " + std::to_string(type));
  }

  if (ReadFirstLine(LogFileName(options_.wal_dir, number), result).ok()) {
    return Status::OK();
  }
  // Check if the file got moved to the archive.
  const std::string archived_file =
      ArchivedLogFileName(options_.wal_dir, number);
  if (!ReadFirstLine(archived_file, result).ok()) {
    return Status::IOError("Log File has been deleted: " + archived_file);
  }
  return Status::OK();
}

// Opens log file `fname`, reads its first record and loads it into `*batch`.
//
// Returns the open error if the file cannot be opened. Returns
// IOError("Corruption noted") if the record is shorter than 12 bytes.
// Returns IOError("Error reading from file ...") if no record could be read,
// or if, with paranoid_checks on, the reader reported corruption.
Status DBImpl::ReadFirstLine(const std::string& fname,
                             WriteBatch* const batch) {
  // Logs every corruption reported by the log reader. When `status` is
  // non-null, the first reported corruption is also recorded there.
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;  // nullptr if options_.paranoid_checks==false
    virtual void Corruption(size_t bytes, const Status& s) {
      Log(info_log, "%s%s: dropping %d bytes; %s",
          (this->status == nullptr ? "(ignoring error) " : ""),
          fname, static_cast<int>(bytes), s.ToString().c_str());
      if (this->status != nullptr && this->status->ok()) *this->status = s;
    }
  };

  unique_ptr<SequentialFile> file;
  Status status = env_->NewSequentialFile(fname, &file, storage_options_);
  if (!status.ok()) {
    return status;
  }

  LogReporter reporter;
  reporter.env = env_;
  reporter.info_log = options_.info_log.get();
  reporter.fname = fname.c_str();
  // In paranoid mode, a reported corruption lands in `status` and makes the
  // read below fail.
  reporter.status = options_.paranoid_checks ? &status : nullptr;
  log::Reader reader(std::move(file), &reporter, true /*checksum*/,
                     0 /*initial_offset*/);

  std::string scratch;
  Slice record;
  const bool got_record = reader.ReadRecord(&record, &scratch);
  if (!got_record || !status.ok()) {
    return Status::IOError("Error reading from file " + fname);
  }

  // NOTE(review): 12 is presumably the WriteBatch header size (8-byte
  // sequence + 4-byte count); confirm against WriteBatchInternal.
  if (record.size() < 12) {
    reporter.Corruption(record.size(),
                        Status::Corruption("log record too small"));
    // TODO: read records until the first non-corrupt entry?
    return Status::IOError("Corruption noted");
  }
  WriteBatchInternal::SetContents(batch, record);
  return Status::OK();
}

1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530
// Strict-weak-ordering comparator for std::sort over VectorLogPtr.
// It orders entries using LogFileImpl's operator<. Every entry is expected
// to be a LogFileImpl, which is the only LogFile type that
// AppendSortedWalsOfType creates.
struct CompareLogByPointer {
  // const: comparators should be callable on const instances, as standard
  // algorithms and containers may hold them that way.
  bool operator() (const unique_ptr<LogFile>& a,
                   const unique_ptr<LogFile>& b) const {
    LogFileImpl* a_impl = dynamic_cast<LogFileImpl*>(a.get());
    LogFileImpl* b_impl = dynamic_cast<LogFileImpl*>(b.get());
    // dynamic_cast yields nullptr for any other LogFile subtype, and
    // dereferencing that would be undefined behavior.
    assert(a_impl != nullptr && b_impl != nullptr);
    return *a_impl < *b_impl;
  }
};

// Lists the WAL files in directory `path`, records each one as a LogFileImpl
// of type `log_type` (with its first sequence number and size), appends them
// to `log_files`, and sorts only the newly appended entries.
//
// WAL files whose first record cannot be read are skipped if they are empty.
// Otherwise the read error is returned. A GetChildren or GetFileSize failure
// is also returned.
Status DBImpl::AppendSortedWalsOfType(const std::string& path,
    VectorLogPtr& log_files, WalFileType log_type) {
  std::vector<std::string> all_files;
  const Status status = env_->GetChildren(path, &all_files);
  if (!status.ok()) {
    return status;
  }
  log_files.reserve(log_files.size() + all_files.size());
  // Remember where the new entries start by index rather than by iterator.
  // push_back always invalidates the past-the-end iterator. Also, only the
  // entries appended here should be sorted, not the last pre-existing one.
  const size_t first_new = log_files.size();
  for (const auto& f : all_files) {
    uint64_t number;
    FileType type;
    if (ParseFileName(f, &number, &type) && type == kLogFile) {
      WriteBatch batch;
      Status s = ReadFirstRecord(log_type, number, &batch);
      if (!s.ok()) {
        // An empty log has no first record; skip it instead of failing.
        if (CheckWalFileExistsAndEmpty(log_type, number)) {
          continue;
        }
        return s;
      }

      uint64_t size_bytes;
      s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
      if (!s.ok()) {
        return s;
      }

      log_files.push_back(unique_ptr<LogFile>(new LogFileImpl(
          number, log_type, WriteBatchInternal::Sequence(&batch), size_bytes)));
    }
  }
  CompareLogByPointer compare_log_files;
  std::sort(log_files.begin() + first_new, log_files.end(), compare_log_files);
  return status;
}

I
Igor Canadi 已提交
1570 1571
// Runs a manual compaction of [begin, end] (nullptr means unbounded) from
// `input_level` to `output_level` in column family `cfd`, and blocks until it
// finishes, the DB starts shutting down, or a background error is set.
//
// Returns the manual compaction's status. If the wait was cut short by a
// background error before the compaction completed, that error is returned
// rather than OK.
Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
                                   int output_level, const Slice* begin,
                                   const Slice* end) {
  assert(input_level >= 0);

  InternalKey begin_storage, end_storage;

  ManualCompaction manual;
  manual.cfd = cfd;
  manual.input_level = input_level;
  manual.output_level = output_level;
  manual.done = false;
  manual.in_progress = false;
  // For universal compaction, we enforce every manual compaction to compact
  // all files.
  if (begin == nullptr ||
      cfd->options()->compaction_style == kCompactionStyleUniversal) {
    manual.begin = nullptr;
  } else {
    begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
    manual.begin = &begin_storage;
  }
  if (end == nullptr ||
      cfd->options()->compaction_style == kCompactionStyleUniversal) {
    manual.end = nullptr;
  } else {
    end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
    manual.end = &end_storage;
  }

  MutexLock l(&mutex_);

  // When a manual compaction arrives, temporarily disable scheduling of
  // non-manual compactions and wait until the number of scheduled compaction
  // jobs drops to zero. This is needed to ensure that this manual compaction
  // can compact any range of keys/files.
  //
  // bg_manual_only_ is non-zero when at least one thread is inside
  // RunManualCompaction(), i.e. during that time no other compaction will
  // get scheduled (see MaybeScheduleFlushOrCompaction).
  //
  // Note that the following loop doesn't stop more than one thread calling
  // RunManualCompaction() from getting to the second while loop below.
  // However, only one of them will actually schedule compaction, while
  // others will wait on a condition variable until it completes.

  ++bg_manual_only_;
  while (bg_compaction_scheduled_ > 0) {
    Log(options_.info_log,
        "Manual compaction waiting for all other scheduled background "
        "compactions to finish");
    bg_cv_.Wait();
  }

  Log(options_.info_log, "Manual compaction starting");

  while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) {
    assert(bg_manual_only_ > 0);
    if (manual_compaction_ != nullptr) {
      // Running either this or some other manual compaction
      bg_cv_.Wait();
    } else {
      manual_compaction_ = &manual;
      MaybeScheduleFlushOrCompaction();
    }
  }

  // The loop above can exit early (shutdown or background error) while a
  // background thread is still working on `manual`. `manual` lives in this
  // stack frame, so wait until that thread is done with it. The thread
  // clears in_progress under mutex_ at the end of BackgroundCompaction, and
  // BackgroundCallCompaction then signals bg_cv_.
  while (manual.in_progress) {
    bg_cv_.Wait();
  }
  // If `manual` was published but never picked up, unpublish it so no
  // background thread later dereferences a dangling pointer.
  if (manual_compaction_ == &manual) {
    manual_compaction_ = nullptr;
  }
  // Do not report success for a range that was never (fully) compacted
  // because a background error stopped the wait.
  if (!manual.done && manual.status.ok() && !bg_error_.ok()) {
    manual.status = bg_error_;
  }

  assert(!manual.in_progress);
  assert(bg_manual_only_ > 0);
  --bg_manual_only_;
  return manual.status;
}

L
Lei Jin 已提交
1643 1644 1645
// Test hook: manually compacts [begin, end] of `level` in the default column
// family. Under universal compaction the output stays at `level`; every other
// style outputs to `level + 1`.
Status DBImpl::TEST_CompactRange(int level,
                                 const Slice* begin,
                                 const Slice* end) {
  int output_level = level + 1;
  if (options_.compaction_style == kCompactionStyleUniversal) {
    output_level = level;
  }
  return RunManualCompaction(default_cfd_, level, output_level, begin, end);
}

1652 1653
// Waits for earlier writes to complete. If that succeeds and options.wait is
// set, it then blocks until `cfd`'s pending memtable flush is done.
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                             const FlushOptions& options) {
  // A nullptr batch means "just wait for earlier writes to be done".
  Status s = Write(WriteOptions(), nullptr);
  if (!s.ok() || !options.wait) {
    return s;
  }
  return WaitForFlushMemTable(cfd);
}

1663
// Blocks (holding mutex_ except while waiting on bg_cv_) until `cfd` has no
// immutable memtables left or a background error is set. Returns that
// background error, or OK.
Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
  MutexLock l(&mutex_);
  while (cfd->imm()->size() > 0 && bg_error_.ok()) {
    bg_cv_.Wait();
  }
  return bg_error_.ok() ? Status::OK() : bg_error_;
}

1676
// Test hook: flushes the default column family using default FlushOptions.
Status DBImpl::TEST_FlushMemTable() {
  const FlushOptions default_options = FlushOptions();
  return FlushMemTable(default_cfd_, default_options);
}

1680
// Test hook: waits for the default column family's immutable memtables to be
// flushed (see WaitForFlushMemTable).
Status DBImpl::TEST_WaitForFlushMemTable() {
  auto* const cfd = default_cfd_;
  return WaitForFlushMemTable(cfd);
}

// Test hook: waits until no background job is scheduled or a background
// error is set, then returns the background error status.
//
// TODO: a bug here. This function actually does not necessarily wait for
// compaction. It waits for scheduled compaction OR flush to finish.
Status DBImpl::TEST_WaitForCompact() {
  MutexLock l(&mutex_);
  for (;;) {
    const bool busy = bg_compaction_scheduled_ || bg_flush_scheduled_;
    if (!busy || !bg_error_.ok()) {
      break;
    }
    bg_cv_.Wait();
  }
  return bg_error_;
}

1699
void DBImpl::MaybeScheduleFlushOrCompaction() {
J
jorlow@chromium.org 已提交
1700
  mutex_.AssertHeld();
1701 1702
  if (bg_work_gate_closed_) {
    // gate closed for backgrond work
J
jorlow@chromium.org 已提交
1703 1704 1705
  } else if (shutting_down_.Acquire_Load()) {
    // DB is being deleted; no more background compactions
  } else {
1706 1707 1708 1709 1710 1711
    bool is_flush_pending = false;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->imm()->IsFlushPending()) {
        is_flush_pending = true;
      }
    }
1712 1713 1714 1715 1716 1717
    if (is_flush_pending &&
        (bg_flush_scheduled_ < options_.max_background_flushes)) {
      // memtable flush needed
      bg_flush_scheduled_++;
      env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH);
    }
1718 1719 1720 1721 1722 1723 1724
    bool is_compaction_needed = false;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->current()->NeedsCompaction()) {
        is_compaction_needed = true;
        break;
      }
    }
1725

1726 1727 1728 1729
    // Schedule BGWorkCompaction if there's a compaction pending (or a memtable
    // flush, but the HIGH pool is not enabled). Do it only if
    // max_background_compactions hasn't been reached and, in case
    // bg_manual_only_ > 0, if it's a manual compaction.
1730
    if ((manual_compaction_ || is_compaction_needed ||
1731
         (is_flush_pending && (options_.max_background_flushes <= 0))) &&
1732 1733 1734
        bg_compaction_scheduled_ < options_.max_background_compactions &&
        (!bg_manual_only_ || manual_compaction_)) {

1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748
      bg_compaction_scheduled_++;
      env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW);
    }
  }
}

void DBImpl::BGWorkFlush(void* db) {
  reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush();
}

void DBImpl::BGWorkCompaction(void* db) {
  reinterpret_cast<DBImpl*>(db)->BackgroundCallCompaction();
}

I
Igor Canadi 已提交
1749 1750
// Flushes every column family's pending immutable memtables. Once a flush
// fails, no further flushes are attempted, and that failure is returned.
Status DBImpl::BackgroundFlush(bool* madeProgress,
                               DeletionState& deletion_state) {
  Status stat;
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    while (stat.ok()) {
      if (!cfd->imm()->IsFlushPending()) {
        break;
      }
      Log(options_.info_log,
          "BackgroundCallFlush doing FlushMemTableToOutputFile with column "
          "family %u, flush slots available %d",
          cfd->GetID(), options_.max_background_flushes - bg_flush_scheduled_);
      stat = FlushMemTableToOutputFile(cfd, madeProgress, deletion_state);
    }
  }
  return stat;
}

1764
void DBImpl::BackgroundCallFlush() {
1765
  bool madeProgress = false;
I
Igor Canadi 已提交
1766
  DeletionState deletion_state(options_.max_write_buffer_number, true);
1767 1768 1769
  assert(bg_flush_scheduled_);
  MutexLock l(&mutex_);

I
Igor Canadi 已提交
1770
  Status s;
1771
  if (!shutting_down_.Acquire_Load()) {
I
Igor Canadi 已提交
1772
    s = BackgroundFlush(&madeProgress, deletion_state);
1773 1774 1775 1776 1777 1778 1779 1780 1781
    if (!s.ok()) {
      // Wait a little bit before retrying background compaction in
      // case this is an environmental problem and we do not want to
      // chew up resources for failed compactions for the duration of
      // the problem.
      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
      Log(options_.info_log, "Waiting after background flush error: %s",
          s.ToString().c_str());
      mutex_.Unlock();
I
Igor Canadi 已提交
1782
      LogFlush(options_.info_log);
1783 1784 1785 1786 1787
      env_->SleepForMicroseconds(1000000);
      mutex_.Lock();
    }
  }

I
Igor Canadi 已提交
1788 1789 1790
  // If !s.ok(), this means that Flush failed. In that case, we want
  // to delete all obsolete files and we force FindObsoleteFiles()
  FindObsoleteFiles(deletion_state, !s.ok());
I
Igor Canadi 已提交
1791
  // delete unnecessary files if any, this is done outside the mutex
I
Igor Canadi 已提交
1792 1793 1794 1795 1796 1797
  if (deletion_state.HaveSomethingToDelete()) {
    mutex_.Unlock();
    PurgeObsoleteFiles(deletion_state);
    mutex_.Lock();
  }

1798
  bg_flush_scheduled_--;
1799 1800 1801
  if (madeProgress) {
    MaybeScheduleFlushOrCompaction();
  }
1802
  bg_cv_.SignalAll();
J
jorlow@chromium.org 已提交
1803 1804
}

1805

1806 1807 1808 1809
void DBImpl::TEST_PurgeObsoleteteWAL() {
  PurgeObsoleteWALFiles();
}

1810 1811
uint64_t DBImpl::TEST_GetLevel0TotalSize() {
  MutexLock l(&mutex_);
1812
  return default_cfd_->current()->NumLevelBytes(0);
1813 1814
}

1815
void DBImpl::BackgroundCallCompaction() {
1816
  bool madeProgress = false;
I
Igor Canadi 已提交
1817
  DeletionState deletion_state(options_.max_write_buffer_number, true);
H
Haobo Xu 已提交
1818 1819 1820

  MaybeDumpStats();

J
jorlow@chromium.org 已提交
1821
  MutexLock l(&mutex_);
1822
  // Log(options_.info_log, "XXX BG Thread %llx process new work item", pthread_self());
J
jorlow@chromium.org 已提交
1823
  assert(bg_compaction_scheduled_);
I
Igor Canadi 已提交
1824
  Status s;
H
hans@chromium.org 已提交
1825
  if (!shutting_down_.Acquire_Load()) {
I
Igor Canadi 已提交
1826
    s = BackgroundCompaction(&madeProgress, deletion_state);
1827 1828 1829 1830 1831 1832 1833 1834 1835
    if (!s.ok()) {
      // Wait a little bit before retrying background compaction in
      // case this is an environmental problem and we do not want to
      // chew up resources for failed compactions for the duration of
      // the problem.
      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
      Log(options_.info_log, "Waiting after background compaction error: %s",
          s.ToString().c_str());
      mutex_.Unlock();
I
Igor Canadi 已提交
1836
      LogFlush(options_.info_log);
1837 1838 1839
      env_->SleepForMicroseconds(1000000);
      mutex_.Lock();
    }
J
jorlow@chromium.org 已提交
1840
  }
1841

I
Igor Canadi 已提交
1842 1843 1844 1845 1846
  // If !s.ok(), this means that Compaction failed. In that case, we want
  // to delete all obsolete files we might have created and we force
  // FindObsoleteFiles(). This is because deletion_state does not catch
  // all created files if compaction failed.
  FindObsoleteFiles(deletion_state, !s.ok());
1847

I
Igor Canadi 已提交
1848
  // delete unnecessary files if any, this is done outside the mutex
I
Igor Canadi 已提交
1849
  if (deletion_state.HaveSomethingToDelete()) {
D
Dhruba Borthakur 已提交
1850 1851
    mutex_.Unlock();
    PurgeObsoleteFiles(deletion_state);
1852
    mutex_.Lock();
D
Dhruba Borthakur 已提交
1853 1854
  }

1855
  bg_compaction_scheduled_--;
J
jorlow@chromium.org 已提交
1856

1857 1858
  MaybeScheduleLogDBDeployStats();

J
jorlow@chromium.org 已提交
1859
  // Previous compaction may have produced too many files in a level,
A
Abhishek Kona 已提交
1860
  // So reschedule another compaction if we made progress in the
1861 1862
  // last compaction.
  if (madeProgress) {
1863
    MaybeScheduleFlushOrCompaction();
1864
  }
H
hans@chromium.org 已提交
1865
  bg_cv_.SignalAll();
1866

J
jorlow@chromium.org 已提交
1867 1868
}

A
Abhishek Kona 已提交
1869
// Runs one unit of background compaction work. Must be called with mutex_
// held. The sequence is:
//   1. flush any pending immutable memtables (see TODO below);
//   2. run either the published, not-yet-started manual compaction, or an
//      automatic compaction picked from the column families (unless
//      disable_auto_compactions is set);
//   3. update the manual compaction's bookkeeping.
// *madeProgress is reset to false on entry. It is set to true after a trivial
// move or a compaction run; flushes may also set it via
// FlushMemTableToOutputFile.
Status DBImpl::BackgroundCompaction(bool* madeProgress,
                                    DeletionState& deletion_state) {
  *madeProgress = false;
  mutex_.AssertHeld();

  bool is_manual = (manual_compaction_ != nullptr) &&
                   (manual_compaction_->in_progress == false);
  if (is_manual) {
    // another thread cannot pick up the same work
    manual_compaction_->in_progress = true;
  }

  // TODO: remove memtable flush from formal compaction
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    while (cfd->imm()->IsFlushPending()) {
      // GetID() is unsigned, so print it with %u (as BackgroundFlush does).
      Log(options_.info_log,
          "BackgroundCompaction doing FlushMemTableToOutputFile with column "
          "family %u, compaction slots available %d",
          cfd->GetID(),
          options_.max_background_compactions - bg_compaction_scheduled_);
      Status stat =
          FlushMemTableToOutputFile(cfd, madeProgress, deletion_state);
      if (!stat.ok()) {
        if (is_manual) {
          manual_compaction_->status = stat;
          manual_compaction_->done = true;
          manual_compaction_->in_progress = false;
          manual_compaction_ = nullptr;
        }
        return stat;
      }
    }
  }

  unique_ptr<Compaction> c;
  InternalKey manual_end_storage;
  InternalKey* manual_end = &manual_end_storage;
  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    assert(m->in_progress);
    c.reset(m->cfd->CompactRange(m->input_level, m->output_level, m->begin,
                                 m->end, &manual_end));
    if (!c) {
      m->done = true;
    }
    Log(options_.info_log,
        "Manual compaction from level-%d to level-%d from %s .. %s; will stop "
        "at %s\n",
        m->input_level,
        m->output_level,
        (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
        (m->end ? m->end->DebugString().c_str() : "(end)"),
        ((m->done || manual_end == nullptr)
             ? "(end)"
             : manual_end->DebugString().c_str()));
  } else if (!options_.disable_auto_compactions) {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      c.reset(cfd->PickCompaction());
      if (c != nullptr) {
        // update statistics
        MeasureTime(options_.statistics.get(), NUM_FILES_IN_SINGLE_COMPACTION,
                    c->inputs(0)->size());
        break;
      }
    }
  }

  Status status;
  if (!c) {
    // Nothing to do
    Log(options_.info_log, "Compaction nothing to do");
  } else if (!is_manual && c->IsTrivialMove()) {
    // Move file to next level
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    c->edit()->DeleteFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
                       f->smallest, f->largest,
                       f->smallest_seqno, f->largest_seqno);
    status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_,
                                    db_directory_.get());
    InstallSuperVersion(c->column_family_data(), deletion_state);

    Version::LevelSummaryStorage tmp;
    Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
        static_cast<unsigned long long>(f->number), c->level() + 1,
        static_cast<unsigned long long>(f->file_size),
        status.ToString().c_str(), c->input_version()->LevelSummary(&tmp));
    c->ReleaseCompactionFiles(status);
    *madeProgress = true;
  } else {
    MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel.
    CompactionState* compact = new CompactionState(c.get());
    status = DoCompactionWork(compact, deletion_state);
    CleanupCompaction(compact, status);
    c->ReleaseCompactionFiles(status);
    c->ReleaseInputs();
    *madeProgress = true;
  }
  c.reset();

  if (status.ok()) {
    // Done
  } else if (shutting_down_.Acquire_Load()) {
    // Ignore compaction errors found during shutting down
  } else {
    Log(options_.info_log,
        "Compaction error: %s", status.ToString().c_str());
    if (options_.paranoid_checks && bg_error_.ok()) {
      bg_error_ = status;
    }
  }

  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    if (!status.ok()) {
      m->status = status;
      m->done = true;
    }
    // For universal compaction:
    //   Because universal compaction always happens at level 0, so one
    //   compaction will pick up all overlapped files. No files will be
    //   filtered out due to size limit and left for a successive compaction.
    //   So we can safely conclude the current compaction.
    //
    //   Also note that, if we don't stop here, then the current compaction
    //   writes a new file back to level 0, which will be used in successive
    //   compaction. Hence the manual compaction will never finish.
    //
    // Stop the compaction if manual_end points to nullptr -- this means
    // that we compacted the whole range. manual_end should always point
    // to nullptr in case of universal compaction
    if (manual_end == nullptr) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range.  Update *m
      // to the range that is left to be compacted.
      // Universal compaction should always compact the whole range. Check
      // the column family's own options (what RunManualCompaction uses to
      // decide), not the DB-wide options_.
      assert(m->cfd->options()->compaction_style != kCompactionStyleUniversal);
      m->tmp_storage = *manual_end;
      m->begin = &m->tmp_storage;
    }
    m->in_progress = false; // not being processed anymore
    manual_compaction_ = nullptr;
  }
  return status;
}

2018
// Releases the resources of a finished compaction and deletes `compact`.
// Must be called with mutex_ held. It abandons any half-built output table,
// un-protects the output file numbers in pending_outputs_, and, if the
// compaction failed, evicts its outputs from the table cache.
void DBImpl::CleanupCompaction(CompactionState* compact, Status status) {
  mutex_.AssertHeld();
  if (compact->builder == nullptr) {
    assert(compact->outfile == nullptr);
  } else {
    // May happen if we get a shutdown call in the middle of compaction
    compact->builder->Abandon();
    compact->builder.reset();
  }
  const bool failed = !status.ok();
  for (const CompactionState::Output& out : compact->outputs) {
    pending_outputs_.erase(out.number);
    // Outputs of an uncommitted compaction may already be in the table
    // cache; evict them.
    if (failed) {
      TableCache::Evict(table_cache_.get(), out.number);
    }
  }
  delete compact;
}

2040
// Allocate the file numbers for the output file. We allocate as
2041
// many output file numbers as there are files in level+1 (at least one)
2042 2043 2044
// Insert them into pending_outputs so that they do not get deleted.
void DBImpl::AllocateCompactionOutputFileNumbers(CompactionState* compact) {
  mutex_.AssertHeld();
2045 2046
  assert(compact != nullptr);
  assert(compact->builder == nullptr);
2047
  int filesNeeded = compact->compaction->num_input_files(1);
2048
  for (int i = 0; i < std::max(filesNeeded, 1); i++) {
2049 2050 2051 2052 2053 2054 2055 2056 2057
    uint64_t file_number = versions_->NewFileNumber();
    pending_outputs_.insert(file_number);
    compact->allocated_file_numbers.push_back(file_number);
  }
}

// Frees up the pre-allocated output file numbers that OpenCompactionOutputFile
// did not consume. They are removed from pending_outputs_, so they are no
// longer protected from deletion. The list is then cleared so that none of
// these now-unprotected numbers can still be handed out as an output file,
// which also makes repeated calls harmless. Must be called with mutex_ held.
void DBImpl::ReleaseCompactionUnusedFileNumbers(CompactionState* compact) {
  mutex_.AssertHeld();
  for (const auto file_number : compact->allocated_file_numbers) {
    pending_outputs_.erase(file_number);
  }
  compact->allocated_file_numbers.clear();
}

J
jorlow@chromium.org 已提交
2064
// Creates the next output table file of `compact` and a table builder on top
// of it. Called without mutex_ held: a pre-allocated file number is used when
// one is left, otherwise mutex_ is taken briefly to allocate a fresh number
// and register it in pending_outputs_.
// The new Output entry is appended to compact->outputs before the file is
// created, so it is present even when file creation fails; the returned
// status reports that failure and no builder is installed in that case.
Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
  assert(compact != nullptr);
  assert(compact->builder == nullptr);

  uint64_t file_number = 0;
  if (compact->allocated_file_numbers.empty()) {
    // Pre-allocated numbers are exhausted: fall back to the heavyweight lock.
    mutex_.Lock();
    file_number = versions_->NewFileNumber();
    pending_outputs_.insert(file_number);
    mutex_.Unlock();
  } else {
    file_number = compact->allocated_file_numbers.front();
    compact->allocated_file_numbers.pop_front();
  }

  CompactionState::Output out;
  out.number = file_number;
  out.smallest.Clear();
  out.largest.Clear();
  out.smallest_seqno = 0;
  out.largest_seqno = 0;
  compact->outputs.push_back(out);

  // Make the output file
  const std::string fname = TableFileName(dbname_, file_number);
  Status s = env_->NewWritableFile(fname, &compact->outfile, storage_options_);
  if (s.ok()) {
    const int output_level = compact->compaction->output_level();
    ColumnFamilyData* cfd = compact->compaction->column_family_data();
    // Over-estimate slightly so we don't end up just barely crossing
    // the threshold.
    compact->outfile->SetPreallocationBlockSize(
        1.1 * cfd->compaction_picker()->MaxFileSizeForLevel(output_level));

    const CompressionType compression_type = GetCompressionType(
        options_, output_level, compact->compaction->enable_compression());
    compact->builder.reset(
        GetTableBuilder(options_, compact->outfile.get(), compression_type));
  }
  LogFlush(options_.info_log);
  return s;
}

// Completes the current output table of `compact`.
//
// If `input` reports an error the builder is abandoned, otherwise finished.
// The resulting file size is recorded in the output and in total_bytes. Then,
// unless disableDataSync is set, the file is synced (Fsync when use_fsync,
// Sync otherwise) and closed. Finally a non-empty table is opened once
// through the table cache to verify it is readable.
// Returns the first error encountered; on success a summary line is logged.
Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
                                          Iterator* input) {
  assert(compact != nullptr);
  assert(compact->outfile);
  assert(compact->builder != nullptr);

  auto* const output = compact->current_output();
  const uint64_t output_number = output->number;
  assert(output_number != 0);

  // Check for iterator errors
  Status s = input->status();
  const uint64_t current_entries = compact->builder->NumEntries();
  if (!s.ok()) {
    compact->builder->Abandon();
  } else {
    s = compact->builder->Finish();
  }
  const uint64_t current_bytes = compact->builder->FileSize();
  output->file_size = current_bytes;
  compact->total_bytes += current_bytes;
  compact->builder.reset();

  // Finish and check for file errors
  if (s.ok() && !options_.disableDataSync) {
    StopWatch sw(env_, options_.statistics.get(),
                 COMPACTION_OUTFILE_SYNC_MICROS, false);
    s = options_.use_fsync ? compact->outfile->Fsync()
                           : compact->outfile->Sync();
  }
  if (s.ok()) {
    s = compact->outfile->Close();
  }
  compact->outfile.reset();

  if (s.ok() && current_entries > 0) {
    // Verify that the table is usable
    ColumnFamilyData* cfd = compact->compaction->column_family_data();
    Iterator* iter = cfd->table_cache()->NewIterator(
        ReadOptions(), storage_options_, output_number, current_bytes);
    s = iter->status();
    delete iter;
    if (s.ok()) {
      Log(options_.info_log,
          "Generated table #%lu: %lu keys, %lu bytes",
          (unsigned long) output_number,
          (unsigned long) current_entries,
          (unsigned long) current_bytes);
    }
  }
  return s;
}


// Applies the results of a finished compaction.
//
// First checks that the input files still exist in the current version at
// their original levels (returning IOError otherwise). It then records the
// input deletions and every output file in the compaction's VersionEdit, and
// persists the edit through VersionSet::LogAndApply().
// Must be called with mutex_ held.
Status DBImpl::InstallCompactionResults(CompactionState* compact) {
  mutex_.AssertHeld();
  Compaction* const compaction = compact->compaction;

  // paranoia: verify that the files that we started with
  // still exist in the current version and in the same original level.
  // This ensures that a concurrent compaction did not erroneously
  // pick the same files to compact.
  if (!versions_->VerifyCompactionFileConsistency(compaction)) {
    Log(options_.info_log,  "Compaction %d@%d + %d@%d files aborted",
        compaction->num_input_files(0), compaction->level(),
        compaction->num_input_files(1), compaction->level() + 1);
    return Status::IOError("Compaction input files inconsistent");
  }

  Log(options_.info_log,  "Compacted %d@%d + %d@%d files => %lld bytes",
      compaction->num_input_files(0), compaction->level(),
      compaction->num_input_files(1), compaction->level() + 1,
      static_cast<long long>(compact->total_bytes));

  // Add compaction outputs
  VersionEdit* const edit = compaction->edit();
  compaction->AddInputDeletions(edit);
  const int output_level = compaction->output_level();
  for (const CompactionState::Output& out : compact->outputs) {
    edit->AddFile(output_level, out.number, out.file_size, out.smallest,
                  out.largest, out.smallest_seqno, out.largest_seqno);
  }
  return versions_->LogAndApply(compaction->column_family_data(), edit,
                                &mutex_, db_directory_.get());
}

2204 2205 2206 2207 2208 2209 2210 2211
//
// Given a sequence number, return the sequence number of the
// earliest snapshot that this sequence number is visible in.
// The snapshots themselves are arranged in ascending order of
// sequence numbers.
// Employ a sequential search because the total number of
// snapshots are typically small.
//
// On success *prev_snapshot receives the snapshot immediately preceding the
// returned one, or 0 if there is none. If no snapshot >= `in` exists (which
// callers should prevent, e.g. DoCompactionWork appends LastSequence() as a
// virtual final snapshot), the function logs, asserts, and returns 0 without
// touching *prev_snapshot.
inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
  SequenceNumber in, std::vector<SequenceNumber>& snapshots,
  SequenceNumber* prev_snapshot) {
  // `prev` is always read below, so no unused-variable attribute is needed.
  SequenceNumber prev = 0;
  for (const auto cur : snapshots) {
    assert(prev <= cur);
    if (cur >= in) {
      *prev_snapshot = prev;
      return cur;
    }
    prev = cur; // assignment
    assert(prev);
  }
  // Guard the diagnostic: indexing the last element of an empty vector would
  // be undefined behavior.
  Log(options_.info_log,
      "Looking for seqid %lu but maxseqid is %lu",
      (unsigned long)in,
      (unsigned long)(snapshots.empty() ? 0 : snapshots.back()));
  assert(0);
  return 0;
}

I
Igor Canadi 已提交
2232 2233
// Runs the compaction described by `compact`.
//
// It merges all input files and drops entries that are hidden by newer
// versions, obsolete deletion markers, or removed by the compaction filter.
// The surviving entries are written into new output tables, and on success
// the result is installed. Called with mutex_ held; the mutex is released
// for the bulk of the work and re-acquired before returning.
//
// Pending immutable-memtable flushes are given priority while compacting;
// they run under the mutex and their time is excluded from the stats.
//
// Returns the first error encountered (a shutdown request is reported as
// IOError). Once any step fails no further entries are processed, so a failed
// output file can never be followed by new outputs that hide the error.
Status DBImpl::DoCompactionWork(CompactionState* compact,
                                DeletionState& deletion_state) {
  assert(compact);
  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
  ColumnFamilyData* cfd = compact->compaction->column_family_data();
  Log(options_.info_log,
      "[CF %u] Compacting %d@%d + %d@%d files, score %.2f slots available %d",
      cfd->GetID(), compact->compaction->num_input_files(0),
      compact->compaction->level(), compact->compaction->num_input_files(1),
      compact->compaction->output_level(), compact->compaction->score(),
      options_.max_background_compactions - bg_compaction_scheduled_);
  char scratch[256];
  compact->compaction->Summary(scratch, sizeof(scratch));
  Log(options_.info_log, "Compaction start summary: %s\n", scratch);

  assert(compact->compaction->input_version()->NumLevelFiles(
             compact->compaction->level()) > 0);
  assert(compact->builder == nullptr);
  assert(!compact->outfile);

  SequenceNumber visible_at_tip = 0;
  SequenceNumber earliest_snapshot;
  SequenceNumber latest_snapshot = 0;
  snapshots_.getAll(compact->existing_snapshots);
  if (compact->existing_snapshots.size() == 0) {
    // optimize for fast path if there are no snapshots
    visible_at_tip = versions_->LastSequence();
    earliest_snapshot = visible_at_tip;
  } else {
    latest_snapshot = compact->existing_snapshots.back();
    // Add the current seqno as the 'latest' virtual
    // snapshot to the end of this list.
    compact->existing_snapshots.push_back(versions_->LastSequence());
    earliest_snapshot = compact->existing_snapshots[0];
  }

  // Is this compaction producing files at the bottommost level?
  bool bottommost_level = compact->compaction->BottomMostLevel();

  // Allocate the output file numbers before we release the lock
  AllocateCompactionOutputFileNumbers(compact);

  // Release mutex while we're actually doing the compaction work
  mutex_.Unlock();

  const uint64_t start_micros = env_->NowMicros();
  unique_ptr<Iterator> input(versions_->MakeInputIterator(compact->compaction));
  input->SeekToFirst();
  Status status;
  ParsedInternalKey ikey;
  std::string current_user_key;
  bool has_current_user_key = false;
  SequenceNumber last_sequence_for_key __attribute__((unused)) =
    kMaxSequenceNumber;
  SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
  std::string compaction_filter_value;
  std::vector<char> delete_key; // for compaction filter
  MergeHelper merge(cfd->user_comparator(), options_.merge_operator.get(),
                    options_.info_log.get(),
                    false /* internal key corruption is expected */);
  auto compaction_filter = cfd->options()->compaction_filter;
  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
  if (!compaction_filter) {
    auto context = compact->GetFilterContext();
    compaction_filter_from_factory =
      options_.compaction_filter_factory->CreateCompactionFilter(context);
    compaction_filter = compaction_filter_from_factory.get();
  }

  // status.ok() is part of the loop condition on purpose. The inner write
  // loop below can only `break` out of itself when opening or finishing an
  // output file fails. Without this check, the next iteration could open a
  // fresh output file, overwrite `status` with OK, and end up installing a
  // compaction whose earlier output was never written successfully.
  while (input->Valid() && !shutting_down_.Acquire_Load() && status.ok()) {
    // Prioritize immutable compaction work
    // TODO: remove memtable flush from normal compaction work
    if (cfd->imm()->imm_flush_needed.NoBarrier_Load() != nullptr) {
      const uint64_t imm_start = env_->NowMicros();
      LogFlush(options_.info_log);
      mutex_.Lock();
      if (cfd->imm()->IsFlushPending()) {
        FlushMemTableToOutputFile(cfd, nullptr, deletion_state);
        bg_cv_.SignalAll();  // Wakeup MakeRoomForWrite() if necessary
      }
      mutex_.Unlock();
      imm_micros += (env_->NowMicros() - imm_start);
    }

    Slice key = input->key();
    Slice value = input->value();

    if (compact->compaction->ShouldStopBefore(key) &&
        compact->builder != nullptr) {
      status = FinishCompactionOutputFile(compact, input.get());
      if (!status.ok()) {
        break;
      }
    }

    // Handle key/value, add to state, etc.
    bool drop = false;
    bool current_entry_is_merging = false;
    if (!ParseInternalKey(key, &ikey)) {
      // Do not hide error keys
      // TODO: error key stays in db forever? Figure out the intention/rationale
      // v10 error v8 : we cannot hide v8 even though it's pretty obvious.
      current_user_key.clear();
      has_current_user_key = false;
      last_sequence_for_key = kMaxSequenceNumber;
      visible_in_snapshot = kMaxSequenceNumber;
    } else {
      if (!has_current_user_key ||
          cfd->user_comparator()->Compare(ikey.user_key,
                                          Slice(current_user_key)) != 0) {
        // First occurrence of this user key
        current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
        visible_in_snapshot = kMaxSequenceNumber;

        // apply the compaction filter to the first occurrence of the user key
        if (compaction_filter &&
            ikey.type == kTypeValue &&
            (visible_at_tip || ikey.sequence > latest_snapshot)) {
          // If the user has specified a compaction filter and the sequence
          // number is greater than any external snapshot, then invoke the
          // filter.
          // If the return value of the compaction filter is true, replace
          // the entry with a delete marker.
          bool value_changed = false;
          compaction_filter_value.clear();
          bool to_delete = compaction_filter->Filter(
              compact->compaction->level(), ikey.user_key, value,
              &compaction_filter_value, &value_changed);
          if (to_delete) {
            // make a copy of the original key
            delete_key.assign(key.data(), key.data() + key.size());
            // convert it to a delete
            UpdateInternalKey(&delete_key[0], delete_key.size(),
                              ikey.sequence, kTypeDeletion);
            // anchor the key again
            key = Slice(&delete_key[0], delete_key.size());
            // needed because ikey is backed by key
            ParseInternalKey(key, &ikey);
            // no value associated with delete
            value.clear();
            RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_USER);
          } else if (value_changed) {
            value = compaction_filter_value;
          }
        }
      }

      // If there are no snapshots, then this kv affects visibility at tip.
      // Otherwise, search through all existing snapshots to find
      // the earliest snapshot that is affected by this kv.
      SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot
      SequenceNumber visible = visible_at_tip ?
        visible_at_tip :
        findEarliestVisibleSnapshot(ikey.sequence,
                                    compact->existing_snapshots,
                                    &prev_snapshot);

      if (visible_in_snapshot == visible) {
        // If the earliest snapshot in which this key is visible
        // is the same as the visibility of a previous instance of the
        // same key, then this kv is not visible in any snapshot.
        // Hidden by a newer entry for same user key
        // TODO: why not > ?
        assert(last_sequence_for_key >= ikey.sequence);
        drop = true;    // (A)
        RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_NEWER_ENTRY);
      } else if (ikey.type == kTypeDeletion &&
                 ikey.sequence <= earliest_snapshot &&
                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
        drop = true;
        RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_OBSOLETE);
      } else if (ikey.type == kTypeMerge) {
        // We know the merge type entry is not hidden, otherwise we would
        // have hit (A)
        // We encapsulate the merge related state machine in a different
        // object to minimize change to the existing flow. It turns out this
        // logic could also be nicely re-used for memtable flush purge
        // optimization in BuildTable.
        merge.MergeUntil(input.get(), prev_snapshot, bottommost_level,
                         options_.statistics.get());
        current_entry_is_merging = true;
        if (merge.IsSuccess()) {
          // Successfully found Put/Delete/(end-of-key-range) while merging
          // Get the merge result
          key = merge.key();
          ParseInternalKey(key, &ikey);
          value = merge.value();
        } else {
          // Did not find a Put/Delete/(end-of-key-range) while merging
          // We now have some stack of merge operands to write out.
          // NOTE: key,value, and ikey are now referring to old entries.
          //       These will be correctly set below.
          assert(!merge.keys().empty());
          assert(merge.keys().size() == merge.values().size());

          // Hack to make sure last_sequence_for_key is correct
          ParseInternalKey(merge.keys().front(), &ikey);
        }
      }

      last_sequence_for_key = ikey.sequence;
      visible_in_snapshot = visible;
    }
#if 0
    Log(options_.info_log,
        "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
        "%d smallest_snapshot: %d level: %d bottommost %d",
        ikey.user_key.ToString().c_str(),
        (int)ikey.sequence, ikey.type, kTypeValue, drop,
        compact->compaction->IsBaseLevelForKey(ikey.user_key),
        (int)last_sequence_for_key, (int)earliest_snapshot,
        compact->compaction->level(), bottommost_level);
#endif

    if (!drop) {
      // We may write a single key (e.g.: for Put/Delete or successful merge).
      // Or we may instead have to write a sequence/list of keys.
      // We have to write a sequence iff we have an unsuccessful merge
      bool has_merge_list = current_entry_is_merging && !merge.IsSuccess();
      const std::deque<std::string>* keys = nullptr;
      const std::deque<std::string>* values = nullptr;
      std::deque<std::string>::const_reverse_iterator key_iter;
      std::deque<std::string>::const_reverse_iterator value_iter;
      if (has_merge_list) {
        keys = &merge.keys();
        values = &merge.values();
        key_iter = keys->rbegin();    // The back (*rbegin()) is the first key
        value_iter = values->rbegin();

        key = Slice(*key_iter);
        value = Slice(*value_iter);
      }

      // If we have a list of keys to write, traverse the list.
      // If we have a single key to write, simply write that key.
      while (true) {
        // Invariant: key,value,ikey will always be the next entry to write
        char* kptr = (char*)key.data();
        std::string kstr;

        // Zeroing out the sequence number leads to better compression.
        // If this is the bottommost level (no files in lower levels)
        // and the earliest snapshot is larger than this seqno
        // then we can squash the seqno to zero.
        if (bottommost_level && ikey.sequence < earliest_snapshot &&
            ikey.type != kTypeMerge) {
          assert(ikey.type != kTypeDeletion);
          // make a copy because updating in place would cause problems
          // with the priority queue that is managing the input key iterator
          kstr.assign(key.data(), key.size());
          kptr = (char *)kstr.c_str();
          UpdateInternalKey(kptr, key.size(), (uint64_t)0, ikey.type);
        }

        Slice newkey(kptr, key.size());
        assert((key.clear(), 1)); // we do not need 'key' anymore

        // Open output file if necessary
        if (compact->builder == nullptr) {
          status = OpenCompactionOutputFile(compact);
          if (!status.ok()) {
            // Only leaves this inner loop; the outer loop condition then
            // stops the compaction because status is no longer OK.
            break;
          }
        }

        SequenceNumber seqno = GetInternalKeySeqno(newkey);
        if (compact->builder->NumEntries() == 0) {
          compact->current_output()->smallest.DecodeFrom(newkey);
          compact->current_output()->smallest_seqno = seqno;
        } else {
          compact->current_output()->smallest_seqno =
            std::min(compact->current_output()->smallest_seqno, seqno);
        }
        compact->current_output()->largest.DecodeFrom(newkey);
        compact->builder->Add(newkey, value);
        compact->current_output()->largest_seqno =
          std::max(compact->current_output()->largest_seqno, seqno);

        // Close output file if it is big enough
        if (compact->builder->FileSize() >=
            compact->compaction->MaxOutputFileSize()) {
          status = FinishCompactionOutputFile(compact, input.get());
          if (!status.ok()) {
            // Only leaves this inner loop; see the outer loop condition.
            break;
          }
        }

        // If we have a list of entries, move to next element
        // If we only had one entry, then break the loop.
        if (has_merge_list) {
          ++key_iter;
          ++value_iter;

          // If at end of list
          if (key_iter == keys->rend() || value_iter == values->rend()) {
            // Sanity Check: if one ends, then both end
            assert(key_iter == keys->rend() && value_iter == values->rend());
            break;
          }

          // Otherwise not at end of list. Update key, value, and ikey.
          key = Slice(*key_iter);
          value = Slice(*value_iter);
          ParseInternalKey(key, &ikey);

        } else {
          // Only had one item to begin with (Put/Delete)
          break;
        }
      }
    }

    // MergeUntil has moved input to the next entry
    if (!current_entry_is_merging) {
      input->Next();
    }
  }

  if (status.ok() && shutting_down_.Acquire_Load()) {
    status = Status::IOError("Database shutdown started during compaction");
  }
  if (status.ok() && compact->builder != nullptr) {
    status = FinishCompactionOutputFile(compact, input.get());
  }
  if (status.ok()) {
    status = input->status();
  }
  input.reset();

  if (!options_.disableDataSync) {
    db_directory_->Fsync();
  }

  InternalStats::CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros - imm_micros;
  MeasureTime(options_.statistics.get(), COMPACTION_TIME, stats.micros);
  stats.files_in_leveln = compact->compaction->num_input_files(0);
  stats.files_in_levelnp1 = compact->compaction->num_input_files(1);

  int num_output_files = compact->outputs.size();
  if (compact->builder != nullptr) {
    // An error occurred so ignore the last output.
    assert(num_output_files > 0);
    --num_output_files;
  }
  stats.files_out_levelnp1 = num_output_files;

  for (int i = 0; i < compact->compaction->num_input_files(0); i++) {
    stats.bytes_readn += compact->compaction->input(0, i)->file_size;
    RecordTick(options_.statistics.get(), COMPACT_READ_BYTES,
               compact->compaction->input(0, i)->file_size);
  }

  for (int i = 0; i < compact->compaction->num_input_files(1); i++) {
    stats.bytes_readnp1 += compact->compaction->input(1, i)->file_size;
    RecordTick(options_.statistics.get(), COMPACT_READ_BYTES,
               compact->compaction->input(1, i)->file_size);
  }

  for (int i = 0; i < num_output_files; i++) {
    stats.bytes_written += compact->outputs[i].file_size;
    RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES,
               compact->outputs[i].file_size);
  }

  LogFlush(options_.info_log);
  mutex_.Lock();
  cfd->internal_stats()->AddCompactionStats(compact->compaction->output_level(),
                                            stats);

  // if there were any unused file number (mostly in case of
  // compaction error), free up the entry from pending_outputs
  ReleaseCompactionUnusedFileNumbers(compact);

  if (status.ok()) {
    status = InstallCompactionResults(compact);
    InstallSuperVersion(cfd, deletion_state);
  }
  Version::LevelSummaryStorage tmp;
  Log(options_.info_log,
      "compacted to: %s, %.1f MB/sec, level %d, files in(%d, %d) out(%d) "
      "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
      "write-amplify(%.1f) %s\n",
      compact->compaction->input_version()->LevelSummary(&tmp),
      (stats.bytes_readn + stats.bytes_readnp1 + stats.bytes_written) /
          (double)stats.micros,
      compact->compaction->output_level(), stats.files_in_leveln,
      stats.files_in_levelnp1, stats.files_out_levelnp1,
      stats.bytes_readn / 1048576.0, stats.bytes_readnp1 / 1048576.0,
      stats.bytes_written / 1048576.0,
      (stats.bytes_written + stats.bytes_readnp1 + stats.bytes_readn) /
          (double)stats.bytes_readn,
      stats.bytes_written / (double)stats.bytes_readn,
      status.ToString().c_str());

  return status;
}

2639 2640
namespace {
struct IterState {
2641 2642
  IterState(DBImpl* db, port::Mutex* mu, SuperVersion* super_version)
      : db(db), mu(mu), super_version(super_version) {}
2643 2644

  DBImpl* db;
2645
  port::Mutex* mu;
2646
  SuperVersion* super_version;
2647 2648 2649 2650
};

static void CleanupIteratorState(void* arg1, void* arg2) {
  IterState* state = reinterpret_cast<IterState*>(arg1);
2651 2652
  DBImpl::DeletionState deletion_state(state->db->GetOptions().
                                       max_write_buffer_number);
2653 2654 2655 2656 2657 2658 2659 2660 2661 2662

  bool need_cleanup = state->super_version->Unref();
  if (need_cleanup) {
    state->mu->Lock();
    state->super_version->Cleanup();
    state->db->FindObsoleteFiles(deletion_state, false, true);
    state->mu->Unlock();

    delete state->super_version;
    state->db->PurgeObsoleteFiles(deletion_state);
I
Igor Canadi 已提交
2663
  }
T
Tomislav Novak 已提交
2664

2665 2666
  delete state;
}
H
Hans Wennborg 已提交
2667
}  // namespace
2668

J
jorlow@chromium.org 已提交
2669
// Builds an iterator over one column family by merging, with the internal
// comparator, the mutable memtable, every immutable memtable, and the files
// of levels L0 - Ln in `super_version`.
// Consumes one reference on `super_version` (callers Ref() it beforehand);
// CleanupIteratorState releases it when the returned iterator is destroyed.
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
                                      ColumnFamilyData* cfd,
                                      SuperVersion* super_version) {
  std::vector<Iterator*> children;
  // Mutable memtable first, then immutable memtables, then table files.
  children.push_back(super_version->mem->NewIterator(options));
  super_version->imm->AddIterators(options, &children);
  super_version->current->AddIterators(options, storage_options_, &children);

  Iterator* const merged = NewMergingIterator(
      &cfd->internal_comparator(), &children[0], children.size());
  merged->RegisterCleanup(CleanupIteratorState,
                          new IterState(this, &mutex_, super_version),
                          nullptr);
  return merged;
}

// Test helper: an internal iterator over the default column family's current
// SuperVersion, using default ReadOptions. The SuperVersion is referenced
// under the mutex, and the iterator itself is built after it is released.
Iterator* DBImpl::TEST_NewInternalIterator() {
  SuperVersion* super_version = nullptr;
  {
    MutexLock lock(&mutex_);
    super_version = default_cfd_->GetSuperVersion()->Ref();
  }
  return NewInternalIterator(ReadOptions(), default_cfd_, super_version);
}

T
Tomislav Novak 已提交
2696
std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
2697
    const ReadOptions& options, ColumnFamilyData* cfd,
T
Tomislav Novak 已提交
2698 2699 2700
    uint64_t* superversion_number) {

  mutex_.Lock();
2701
  SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
T
Tomislav Novak 已提交
2702
  if (superversion_number != nullptr) {
2703
    *superversion_number = cfd->GetSuperVersionNumber();
T
Tomislav Novak 已提交
2704 2705 2706
  }
  mutex_.Unlock();

2707
  Iterator* mutable_iter = super_version->mem->NewIterator(options);
T
Tomislav Novak 已提交
2708
  // create a DBIter that only uses memtable content; see NewIterator()
2709
  mutable_iter = NewDBIterator(&dbname_, env_, options_, cfd->user_comparator(),
T
Tomislav Novak 已提交
2710 2711 2712
                               mutable_iter, kMaxSequenceNumber);

  std::vector<Iterator*> list;
2713 2714
  super_version->imm->AddIterators(options, &list);
  super_version->current->AddIterators(options, storage_options_, &list);
2715 2716
  Iterator* immutable_iter =
      NewMergingIterator(&cfd->internal_comparator(), &list[0], list.size());
T
Tomislav Novak 已提交
2717 2718

  // create a DBIter that only uses memtable content; see NewIterator()
2719 2720 2721
  immutable_iter =
      NewDBIterator(&dbname_, env_, options_, cfd->user_comparator(),
                    immutable_iter, kMaxSequenceNumber);
T
Tomislav Novak 已提交
2722

2723 2724 2725 2726 2727 2728 2729 2730
  // register cleanups
  mutable_iter->RegisterCleanup(CleanupIteratorState,
    new IterState(this, &mutex_, super_version), nullptr);

  // bump the ref one more time since it will be Unref'ed twice
  immutable_iter->RegisterCleanup(CleanupIteratorState,
    new IterState(this, &mutex_, super_version->Ref()), nullptr);

T
Tomislav Novak 已提交
2731 2732 2733
  return std::make_pair(mutable_iter, immutable_iter);
}

J
jorlow@chromium.org 已提交
2734
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
2735
  MutexLock l(&mutex_);
2736
  return default_cfd_->current()->MaxNextLevelOverlappingBytes();
2737 2738
}

J
jorlow@chromium.org 已提交
2739
// Point lookup of `key` in `column_family`; all of the work, including
// filling `value`, is delegated to GetImpl().
Status DBImpl::Get(const ReadOptions& options,
                   const ColumnFamilyHandle& column_family,
                   const Slice& key, std::string* value) {
  return GetImpl(options, column_family, key, value);
}

I
Igor Canadi 已提交
2745 2746 2747 2748 2749 2750 2751 2752 2753 2754
// DeletionState gets created and destructed outside of the lock -- we
// use this convinently to:
// * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete one SuperVersion() outside of the lock -- superversion_to_free
//
// However, if InstallSuperVersion() gets called twice with the same,
// deletion_state, we can't reuse the SuperVersion() that got malloced because
// first call already used it. In that rare case, we take a hit and create a
// new SuperVersion() inside of the mutex. We do similar thing
// for superversion_to_free
2755 2756
void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd,
                                 DeletionState& deletion_state) {
2757
  mutex_.AssertHeld();
I
Igor Canadi 已提交
2758 2759 2760 2761
  // if new_superversion == nullptr, it means somebody already used it
  SuperVersion* new_superversion =
    (deletion_state.new_superversion != nullptr) ?
    deletion_state.new_superversion : new SuperVersion();
2762
  SuperVersion* old_superversion = cfd->InstallSuperVersion(new_superversion);
I
Igor Canadi 已提交
2763 2764 2765 2766 2767 2768 2769 2770 2771
  deletion_state.new_superversion = nullptr;
  if (deletion_state.superversion_to_free != nullptr) {
    // somebody already put it there
    delete old_superversion;
  } else {
    deletion_state.superversion_to_free = old_superversion;
  }
}

2772
// Shared implementation behind Get() and KeyMayExist().
//
// Pins the column family's current SuperVersion under mutex_, then looks the
// key up without holding the lock:
//  1. the mutable memtable,
//  2. the immutable memtables,
//  3. the files of the current Version (this path may produce seek stats
//     that are fed back through UpdateStats() when seek compaction is on).
// `value_found` is forwarded to Version::Get() only.
Status DBImpl::GetImpl(const ReadOptions& options,
                       const ColumnFamilyHandle& column_family,
                       const Slice& key, std::string* value,
                       bool* value_found) {
  StopWatch sw(env_, options_.statistics.get(), DB_GET, false);

  mutex_.Lock();
  auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
  // this is asserting because client calling DB methods with undefined
  // ColumnFamilyHandle is undefined behavior.
  assert(cfd != nullptr);
  SuperVersion* get_version = cfd->GetSuperVersion()->Ref();
  mutex_.Unlock();

  SequenceNumber snapshot;
  if (options.snapshot != nullptr) {
    snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
  } else {
    snapshot = versions_->LastSequence();
  }

  bool have_stat_update = false;
  Version::GetStats stats;

  // Prepare to store a list of merge operations if merge occurs.
  MergeContext merge_context;

  Status s;
  // First look in the memtable, then in the immutable memtable (if any).
  // s is both in/out. When in, s could either be OK or MergeInProgress.
  // merge_context will contain the sequence of merges in the latter case.
  LookupKey lkey(key, snapshot);
  if (get_version->mem->Get(lkey, value, &s, merge_context, options_)) {
    // Done
    RecordTick(options_.statistics.get(), MEMTABLE_HIT);
  } else if (get_version->imm->Get(lkey, value, &s, merge_context, options_)) {
    // Done
    RecordTick(options_.statistics.get(), MEMTABLE_HIT);
  } else {
    get_version->current->Get(options, lkey, value, &s, &merge_context, &stats,
                              options_, value_found);
    have_stat_update = true;
    RecordTick(options_.statistics.get(), MEMTABLE_MISS);
  }

  // Drop our SuperVersion reference. Cleanup() runs under mutex_, while the
  // final delete happens after the lock has been released.
  bool delete_get_version = false;
  if (!options_.disable_seek_compaction && have_stat_update) {
    mutex_.Lock();
    if (get_version->current->UpdateStats(stats)) {
      MaybeScheduleFlushOrCompaction();
    }
    if (get_version->Unref()) {
      get_version->Cleanup();
      delete_get_version = true;
    }
    mutex_.Unlock();
  } else {
    if (get_version->Unref()) {
      mutex_.Lock();
      get_version->Cleanup();
      mutex_.Unlock();
      delete_get_version = true;
    }
  }
  if (delete_get_version) {
    delete get_version;
  }

  // Note, tickers are atomic now - no lock protection needed any more.
  RecordTick(options_.statistics.get(), NUMBER_KEYS_READ);
  // Only count bytes of a successful lookup: on a miss or error *value may
  // still hold whatever the caller passed in. This matches MultiGet().
  if (s.ok()) {
    RecordTick(options_.statistics.get(), BYTES_READ, value->size());
  }
  return s;
}

2846 2847 2848 2849
std::vector<Status> DBImpl::MultiGet(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle>& column_family,
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
2850

2851
  StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET, false);
2852
  SequenceNumber snapshot;
2853

2854 2855 2856 2857 2858 2859 2860 2861 2862 2863 2864 2865 2866
  struct MultiGetColumnFamilyData {
    SuperVersion* super_version;
    Version::GetStats stats;
    bool have_stat_update = false;
  };
  std::unordered_map<uint32_t, MultiGetColumnFamilyData*> multiget_cf_data;
  // fill up and allocate outside of mutex
  for (auto cf : column_family) {
    if (multiget_cf_data.find(cf.id) == multiget_cf_data.end()) {
      multiget_cf_data.insert({cf.id, new MultiGetColumnFamilyData()});
    }
  }

2867
  mutex_.Lock();
2868 2869 2870 2871 2872
  if (options.snapshot != nullptr) {
    snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
  } else {
    snapshot = versions_->LastSequence();
  }
2873 2874 2875 2876 2877
  for (auto mgd_iter : multiget_cf_data) {
    auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(mgd_iter.first);
    assert(cfd != nullptr);
    mgd_iter.second->super_version = cfd->GetSuperVersion()->Ref();
  }
2878
  mutex_.Unlock();
2879

2880 2881
  // Contain a list of merge operations if merge occurs.
  MergeContext merge_context;
2882

2883
  // Note: this always resizes the values array
2884 2885 2886
  size_t num_keys = keys.size();
  std::vector<Status> stat_list(num_keys);
  values->resize(num_keys);
2887 2888

  // Keep track of bytes that we read for statistics-recording later
2889
  uint64_t bytes_read = 0;
2890 2891 2892 2893

  // For each of the given keys, apply the entire "get" process as follows:
  // First look in the memtable, then in the immutable memtable (if any).
  // s is both in/out. When in, s could either be OK or MergeInProgress.
2894
  // merge_operands will contain the sequence of merges in the latter case.
2895
  for (size_t i = 0; i < num_keys; ++i) {
2896
    merge_context.Clear();
2897
    Status& s = stat_list[i];
2898 2899 2900
    std::string* value = &(*values)[i];

    LookupKey lkey(keys[i], snapshot);
2901 2902 2903 2904 2905
    auto mgd_iter = multiget_cf_data.find(column_family[i].id);
    assert(mgd_iter != multiget_cf_data.end());
    auto mgd = mgd_iter->second;
    auto super_version = mgd->super_version;
    if (super_version->mem->Get(lkey, value, &s, merge_context, options_)) {
2906
      // Done
2907 2908
    } else if (super_version->imm->Get(lkey, value, &s, merge_context,
                                       options_)) {
2909 2910
      // Done
    } else {
2911 2912 2913
      super_version->current->Get(options, lkey, value, &s, &merge_context,
                                  &mgd->stats, options_);
      mgd->have_stat_update = true;
2914 2915 2916
    }

    if (s.ok()) {
2917
      bytes_read += value->size();
2918 2919 2920
    }
  }

2921 2922 2923 2924 2925 2926 2927 2928 2929 2930
  autovector<SuperVersion*> superversions_to_delete;

  bool schedule_flush_or_compaction = false;
  mutex_.Lock();
  for (auto mgd_iter : multiget_cf_data) {
    auto mgd = mgd_iter.second;
    if (!options_.disable_seek_compaction && mgd->have_stat_update) {
      if (mgd->super_version->current->UpdateStats(mgd->stats)) {
        schedule_flush_or_compaction = true;
      }
2931
    }
2932 2933 2934
    if (mgd->super_version->Unref()) {
      mgd->super_version->Cleanup();
      superversions_to_delete.push_back(mgd->super_version);
2935 2936
    }
  }
2937 2938 2939 2940 2941 2942 2943 2944 2945 2946
  if (schedule_flush_or_compaction) {
    MaybeScheduleFlushOrCompaction();
  }
  mutex_.Unlock();

  for (auto td : superversions_to_delete) {
    delete td;
  }
  for (auto mgd : multiget_cf_data) {
    delete mgd.second;
2947
  }
2948

2949
  RecordTick(options_.statistics.get(), NUMBER_MULTIGET_CALLS);
2950 2951
  RecordTick(options_.statistics.get(), NUMBER_MULTIGET_KEYS_READ, num_keys);
  RecordTick(options_.statistics.get(), NUMBER_MULTIGET_BYTES_READ, bytes_read);
2952

2953
  return stat_list;
2954 2955
}

2956
// Creates a column family named `column_family_name` with `options`.
// handle->id is assigned the next column family id (even if the operation
// later fails). The creation is recorded via VersionSet::LogAndApply() first;
// in-memory structures are only created if that succeeds.
// Returns InvalidArgument if the name is already in use.
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
                                  const std::string& column_family_name,
                                  ColumnFamilyHandle* handle) {
  MutexLock l(&mutex_);
  if (versions_->GetColumnFamilySet()->Exists(column_family_name)) {
    return Status::InvalidArgument("Column family already exists");
  }
  VersionEdit edit;
  edit.AddColumnFamily(column_family_name);
  handle->id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
  edit.SetColumnFamily(handle->id);
  Status s = versions_->LogAndApply(default_cfd_, &edit, &mutex_);
  if (s.ok()) {
    // add to internal data structures
    versions_->CreateColumnFamily(options, &edit);
    // Report creation only once it actually happened.
    Log(options_.info_log, "Created column family %s\n",
        column_family_name.c_str());
  }
  return s;
}

// Drops the column family with column_family.id. The default column family
// (id 0) cannot be dropped (InvalidArgument); an unknown id yields NotFound.
// The drop is recorded via VersionSet::LogAndApply(); afterwards obsolete
// files are collected under mutex_ and purged after the lock is released.
Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) {
  if (column_family.id == 0) {
    return Status::InvalidArgument("Can't drop default column family");
  }
  mutex_.Lock();
  if (!versions_->GetColumnFamilySet()->Exists(column_family.id)) {
    // Release the lock before bailing out: returning with mutex_ still held
    // would leave the DB mutex locked for every later caller.
    mutex_.Unlock();
    return Status::NotFound("Column family not found");
  }
  VersionEdit edit;
  edit.DropColumnFamily();
  edit.SetColumnFamily(column_family.id);
  Status s = versions_->LogAndApply(default_cfd_, &edit, &mutex_);
  if (s.ok()) {
    // remove from internal data structures
    versions_->DropColumnFamily(&edit);
  }
  DeletionState deletion_state;
  FindObsoleteFiles(deletion_state, false, true);
  mutex_.Unlock();
  PurgeObsoleteFiles(deletion_state);
  if (s.ok()) {
    // Report the drop only once it actually happened.
    Log(options_.info_log, "Dropped column family with id %u\n",
        column_family.id);
  }
  return s;
}

3002
bool DBImpl::KeyMayExist(const ReadOptions& options,
3003 3004
                         const ColumnFamilyHandle& column_family,
                         const Slice& key, std::string* value,
3005 3006
                         bool* value_found) {
  if (value_found != nullptr) {
K
Kai Liu 已提交
3007 3008
    // falsify later if key-may-exist but can't fetch value
    *value_found = true;
3009
  }
3010 3011
  ReadOptions roptions = options;
  roptions.read_tier = kBlockCacheTier; // read from block cache only
3012
  auto s = GetImpl(roptions, column_family, key, value, value_found);
K
Kai Liu 已提交
3013 3014 3015 3016 3017

  // If options.block_cache != nullptr and the index block of the table didn't
  // not present in block_cache, the return value will be Status::Incomplete.
  // In this case, key may still exist in the table.
  return s.ok() || s.IsIncomplete();
3018 3019
}

3020 3021
// Creates an iterator over `column_family`.
//  * options.tailing: returns a TailingIterator.
//  * otherwise: pins the current SuperVersion (under mutex_) and wraps
//    NewInternalIterator() in a DBIter that reads at options.snapshot, or
//    at the latest sequence number when no snapshot is given.
// If options.prefix is set, the result is additionally wrapped in a
// PrefixFilterIterator, which excludes keys not beginning with the prefix.
Iterator* DBImpl::NewIterator(const ReadOptions& options,
                              const ColumnFamilyHandle& column_family) {
  const bool tailing = options.tailing;

  mutex_.Lock();
  auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
  assert(cfd != nullptr);
  SuperVersion* super_version =
      tailing ? nullptr : cfd->GetSuperVersion()->Ref();
  const SequenceNumber latest_snapshot =
      tailing ? static_cast<SequenceNumber>(0) : versions_->LastSequence();
  mutex_.Unlock();

  Iterator* result = nullptr;
  if (tailing) {
    result = new TailingIterator(this, options, cfd);
  } else {
    Iterator* internal_iter = NewInternalIterator(options, cfd, super_version);
    const SequenceNumber read_sequence =
        (options.snapshot != nullptr)
            ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
            : latest_snapshot;
    result = NewDBIterator(&dbname_, env_, options_, cfd->user_comparator(),
                           internal_iter, read_sequence);
  }

  if (options.prefix) {
    result = new PrefixFilterIterator(result, *options.prefix,
                                      options_.prefix_extractor);
  }
  return result;
}

3055 3056 3057 3058 3059 3060 3061 3062
// Multi-column-family iterator creation. Not implemented yet: always returns
// NotSupported and leaves `iterators` untouched.
Status DBImpl::NewIterators(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle>& column_family,
    std::vector<Iterator*>* iterators) {
  // TODO
  Status not_implemented = Status::NotSupported("Not yet!");
  return not_implemented;
}

J
jorlow@chromium.org 已提交
3063 3064
// Registers and returns a new snapshot at versions_->LastSequence(), taken
// under mutex_.
const Snapshot* DBImpl::GetSnapshot() {
  MutexLock lock(&mutex_);
  const SequenceNumber latest = versions_->LastSequence();
  return snapshots_.New(latest);
}

void DBImpl::ReleaseSnapshot(const Snapshot* s) {
  MutexLock l(&mutex_);
3070
  snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s));
J
jorlow@chromium.org 已提交
3071 3072 3073
}

// Convenience methods
3074 3075 3076 3077
// Stores `key` -> `val` in `column_family` by delegating to DB::Put().
Status DBImpl::Put(const WriteOptions& o,
                   const ColumnFamilyHandle& column_family, const Slice& key,
                   const Slice& val) {
  Status result = DB::Put(o, column_family, key, val);
  return result;
}

3080 3081
// Merges `val` into `key` in `column_family` via DB::Merge(). Requires a
// merge_operator in the DB options; otherwise returns NotSupported.
Status DBImpl::Merge(const WriteOptions& o,
                     const ColumnFamilyHandle& column_family, const Slice& key,
                     const Slice& val) {
  if (options_.merge_operator) {
    return DB::Merge(o, column_family, key, val);
  }
  return Status::NotSupported("Provide a merge_operator when opening DB");
}

3090 3091 3092 3093
// Removes `key` from `column_family` by delegating to DB::Delete().
Status DBImpl::Delete(const WriteOptions& options,
                      const ColumnFamilyHandle& column_family,
                      const Slice& key) {
  Status result = DB::Delete(options, column_family, key);
  return result;
}

3096 3097 3098 3099
// Applies `my_batch` to the DB.
//
// Writers queue in writers_. The writer at the head of the queue:
//  1. calls MakeRoomForWrite() for every column family (force is set when
//     my_batch is null, which is used for compactions),
//  2. groups its batch with compatible batches queued behind it
//     (BuildBatchGroup),
//  3. with mutex_ released, appends the group to the WAL (unless disableWAL,
//     optionally syncing) and inserts it into the memtables,
//  4. completes every grouped writer with the same status and wakes the next
//     queue head.
// A writer whose batch was handled by another writer just returns that
// writer's status. Throws std::runtime_error if memtable insertion fails.
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.disableWAL = options.disableWAL;
  w.done = false;

  StopWatch sw(env_, options_.statistics.get(), DB_WRITE, false);
  mutex_.Lock();
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }

  if (!options.disableWAL) {
    RecordTick(options_.statistics.get(), WRITE_WITH_WAL, 1);
  }

  if (w.done) {
    mutex_.Unlock();
    RecordTick(options_.statistics.get(), WRITE_DONE_BY_OTHER, 1);
    return w.status;
  } else {
    RecordTick(options_.statistics.get(), WRITE_DONE_BY_SELF, 1);
  }

  Status status;
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    // May temporarily unlock and wait.
    status = MakeRoomForWrite(cfd, my_batch == nullptr);
    if (!status.ok()) {
      break;
    }
  }
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != nullptr) {  // nullptr batch is for compactions
    autovector<WriteBatch*> write_batch_group;
    BuildBatchGroup(&last_writer, &write_batch_group);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into memtables
    {
      mutex_.Unlock();
      WriteBatch* updates = nullptr;
      if (write_batch_group.size() == 1) {
        updates = write_batch_group[0];
      } else {
        updates = &tmp_batch_;
        for (size_t i = 0; i < write_batch_group.size(); ++i) {
          WriteBatchInternal::Append(updates, write_batch_group[i]);
        }
      }

      const SequenceNumber current_sequence = last_sequence + 1;
      WriteBatchInternal::SetSequence(updates, current_sequence);
      int my_batch_count = WriteBatchInternal::Count(updates);
      last_sequence += my_batch_count;
      // Record statistics
      RecordTick(options_.statistics.get(),
                 NUMBER_KEYS_WRITTEN, my_batch_count);
      RecordTick(options_.statistics.get(),
                 BYTES_WRITTEN,
                 WriteBatchInternal::ByteSize(updates));
      if (options.disableWAL) {
        flush_on_destroy_ = true;
      }

      if (!options.disableWAL) {
        StopWatchNano timer(env_);
        StartPerfTimer(&timer);
        Slice log_entry = WriteBatchInternal::Contents(updates);
        status = log_->AddRecord(log_entry);
        RecordTick(options_.statistics.get(), WAL_FILE_SYNCED, 1);
        RecordTick(options_.statistics.get(), WAL_FILE_BYTES, log_entry.size());
        BumpPerfTime(&perf_context.wal_write_time, &timer);
        if (status.ok() && options.sync) {
          // The StopWatch must be a named local. An unnamed temporary is
          // destroyed at the end of its own statement, so it could never
          // span the Fsync()/Sync() call it is meant to time.
          StopWatch sync_sw(env_, options_.statistics.get(),
                            WAL_FILE_SYNC_MICROS);
          if (options_.use_fsync) {
            status = log_->file()->Fsync();
          } else {
            status = log_->file()->Sync();
          }
        }
      }
      if (status.ok()) {
        // TODO(icanadi) this accesses column_family_set_ without any lock.
        // We'll need to add a spinlock for reading that we also lock when we
        // write to a column family (only on column family add/drop, which is
        // a very rare action)
        status = WriteBatchInternal::InsertInto(
            updates, column_family_memtables_.get(), &options_, this,
            options_.filter_deletes);

        if (!status.ok()) {
          // Panic for in-memory corruptions
          // Note that existing logic was not sound. Any partial failure writing
          // into the memtable would result in a state that some write ops might
          // have succeeded in memtable but Status reports error for all writes.
          throw std::runtime_error("In memory WriteBatch corruption!");
        }
        SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER,
                       last_sequence);
      }
      if (updates == &tmp_batch_) tmp_batch_.Clear();
      mutex_.Lock();
      if (status.ok()) {
        versions_->SetLastSequence(last_sequence);
      }
    }
  }
  if (options_.paranoid_checks && !status.ok() && bg_error_.ok()) {
    bg_error_ = status; // stop compaction & fail any further writes
  }

  // Complete every writer up to and including last_writer.
  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }
  mutex_.Unlock();
  return status;
}

3233
// REQUIRES: Writer list must be non-empty
3234
// REQUIRES: First writer must have a non-nullptr batch
3235 3236
void DBImpl::BuildBatchGroup(Writer** last_writer,
                             autovector<WriteBatch*>* write_batch_group) {
3237 3238
  assert(!writers_.empty());
  Writer* first = writers_.front();
3239
  assert(first->batch != nullptr);
3240 3241

  size_t size = WriteBatchInternal::ByteSize(first->batch);
3242
  write_batch_group->push_back(first->batch);
3243 3244 3245 3246 3247 3248 3249 3250 3251 3252 3253 3254 3255 3256 3257 3258 3259 3260 3261

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128<<10)) {
    max_size = size + (128<<10);
  }

  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  ++iter;  // Advance past "first"
  for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

H
heyongqiang 已提交
3262 3263 3264 3265 3266 3267
    if (!w->disableWAL && first->disableWAL) {
      // Do not include a write that needs WAL into a batch that has
      // WAL disabled.
      break;
    }

3268
    if (w->batch != nullptr) {
3269 3270 3271 3272 3273 3274
      size += WriteBatchInternal::ByteSize(w->batch);
      if (size > max_size) {
        // Do not make batch too big
        break;
      }

3275
      write_batch_group->push_back(w->batch);
3276 3277 3278 3279 3280
    }
    *last_writer = w;
  }
}

3281 3282 3283
// This function computes the amount of time in microseconds by which a write
// should be delayed based on the number of level-0 files according to the
// following formula:
J
Jim Paton 已提交
3284 3285 3286 3287
// if n < bottom, return 0;
// if n >= top, return 1000;
// otherwise, let r = (n - bottom) /
//                    (top - bottom)
3288 3289 3290 3291
//  and return r^2 * 1000.
// The goal of this formula is to gradually increase the rate at which writes
// are slowed. We also tried linear delay (r * 1000), but it seemed to do
// slightly worse. There is no other particular reason for choosing quadratic.
M
Mark Callaghan 已提交
3292
uint64_t DBImpl::SlowdownAmount(int n, double bottom, double top) {
3293
  uint64_t delay;
J
Jim Paton 已提交
3294
  if (n >= top) {
3295 3296
    delay = 1000;
  }
J
Jim Paton 已提交
3297
  else if (n < bottom) {
3298 3299 3300 3301
    delay = 0;
  }
  else {
    // If we are here, we know that:
J
Jim Paton 已提交
3302
    //   level0_start_slowdown <= n < level0_slowdown
3303
    // since the previous two conditions are false.
M
Mark Callaghan 已提交
3304 3305
    double how_much =
      (double) (n - bottom) /
J
Jim Paton 已提交
3306
              (top - bottom);
M
Mark Callaghan 已提交
3307
    delay = std::max(how_much * how_much * 1000, 100.0);
3308 3309 3310 3311 3312
  }
  assert(delay <= 1000);
  return delay;
}

3313
// REQUIRES: mutex_ is held
3314
// REQUIRES: this thread is currently at the front of the writer queue
3315
Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
3316
  mutex_.AssertHeld();
3317
  assert(!writers_.empty());
3318
  bool allow_delay = !force;
J
Jim Paton 已提交
3319 3320
  bool allow_hard_rate_limit_delay = !force;
  bool allow_soft_rate_limit_delay = !force;
3321
  uint64_t rate_limit_delay_millis = 0;
3322
  Status s;
3323
  double score;
3324

3325 3326 3327 3328 3329
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
3330
    } else if (allow_delay && cfd->NeedSlowdownForNumLevel0Files()) {
3331 3332 3333
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
3334
      // individual write by 0-1ms to reduce latency variance.  Also,
3335 3336
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
3337
      uint64_t slowdown =
3338 3339 3340
          SlowdownAmount(cfd->current()->NumLevelFiles(0),
                         cfd->options()->level0_slowdown_writes_trigger,
                         cfd->options()->level0_stop_writes_trigger);
3341
      mutex_.Unlock();
3342
      uint64_t delayed;
J
Jim Paton 已提交
3343
      {
3344
        StopWatch sw(env_, options_.statistics.get(), STALL_L0_SLOWDOWN_COUNT);
3345
        env_->SleepForMicroseconds(slowdown);
3346
        delayed = sw.ElapsedMicros();
J
Jim Paton 已提交
3347
      }
3348
      RecordTick(options_.statistics.get(), STALL_L0_SLOWDOWN_MICROS, delayed);
3349 3350
      cfd->internal_stats()->RecordWriteStall(InternalStats::LEVEL0_SLOWDOWN,
                                              delayed);
3351 3352
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
3353
      delayed_writes_++;
3354 3355
    } else if (!force && (cfd->mem()->ApproximateMemoryUsage() <=
                          cfd->options()->write_buffer_size)) {
3356
      // There is room in current memtable
3357 3358 3359
      if (allow_delay) {
        DelayLoggingAndReset();
      }
3360
      break;
3361 3362
    } else if (cfd->imm()->size() ==
               cfd->options()->max_write_buffer_number - 1) {
3363
      // We have filled up the current memtable, but the previous
3364 3365
      // ones are still being compacted, so we wait.
      DelayLoggingAndReset();
3366
      Log(options_.info_log, "wait for memtable compaction...\n");
3367
      uint64_t stall;
J
Jim Paton 已提交
3368
      {
3369
        StopWatch sw(env_, options_.statistics.get(),
3370
                     STALL_MEMTABLE_COMPACTION_COUNT);
J
Jim Paton 已提交
3371
        bg_cv_.Wait();
3372
        stall = sw.ElapsedMicros();
J
Jim Paton 已提交
3373
      }
3374 3375
      RecordTick(options_.statistics.get(),
                 STALL_MEMTABLE_COMPACTION_MICROS, stall);
3376 3377
      cfd->internal_stats()->RecordWriteStall(
          InternalStats::MEMTABLE_COMPACTION, stall);
3378 3379
    } else if (cfd->current()->NumLevelFiles(0) >=
               cfd->options()->level0_stop_writes_trigger) {
3380
      // There are too many level-0 files.
3381 3382
      DelayLoggingAndReset();
      Log(options_.info_log, "wait for fewer level0 files...\n");
3383
      uint64_t stall;
J
Jim Paton 已提交
3384
      {
3385 3386
        StopWatch sw(env_, options_.statistics.get(),
                     STALL_L0_NUM_FILES_COUNT);
J
Jim Paton 已提交
3387
        bg_cv_.Wait();
3388
        stall = sw.ElapsedMicros();
J
Jim Paton 已提交
3389
      }
3390
      RecordTick(options_.statistics.get(), STALL_L0_NUM_FILES_MICROS, stall);
3391 3392
      cfd->internal_stats()->RecordWriteStall(InternalStats::LEVEL0_NUM_FILES,
                                              stall);
3393
    } else if (allow_hard_rate_limit_delay && options_.hard_rate_limit > 1.0 &&
3394 3395
               (score = cfd->current()->MaxCompactionScore()) >
                   cfd->options()->hard_rate_limit) {
3396
      // Delay a write when the compaction score for any level is too large.
3397
      int max_level = cfd->current()->MaxCompactionScoreLevel();
3398
      mutex_.Unlock();
3399
      uint64_t delayed;
J
Jim Paton 已提交
3400
      {
3401 3402
        StopWatch sw(env_, options_.statistics.get(),
                     HARD_RATE_LIMIT_DELAY_COUNT);
J
Jim Paton 已提交
3403
        env_->SleepForMicroseconds(1000);
3404
        delayed = sw.ElapsedMicros();
J
Jim Paton 已提交
3405
      }
3406
      cfd->internal_stats()->RecordLevelNSlowdown(max_level, delayed);
3407
      // Make sure the following value doesn't round to zero.
3408 3409
      uint64_t rate_limit = std::max((delayed / 1000), (uint64_t) 1);
      rate_limit_delay_millis += rate_limit;
3410 3411
      RecordTick(options_.statistics.get(),
                 RATE_LIMIT_DELAY_MILLIS, rate_limit);
3412
      if (cfd->options()->rate_limit_delay_max_milliseconds > 0 &&
J
Jim Paton 已提交
3413
          rate_limit_delay_millis >=
3414
              (unsigned)cfd->options()->rate_limit_delay_max_milliseconds) {
J
Jim Paton 已提交
3415
        allow_hard_rate_limit_delay = false;
3416
      }
3417
      mutex_.Lock();
3418 3419 3420 3421
    } else if (allow_soft_rate_limit_delay &&
               cfd->options()->soft_rate_limit > 0.0 &&
               (score = cfd->current()->MaxCompactionScore()) >
                   cfd->options()->soft_rate_limit) {
J
Jim Paton 已提交
3422 3423 3424
      // Delay a write when the compaction score for any level is too large.
      // TODO: add statistics
      mutex_.Unlock();
J
Jim Paton 已提交
3425
      {
3426 3427
        StopWatch sw(env_, options_.statistics.get(),
                     SOFT_RATE_LIMIT_DELAY_COUNT);
3428 3429 3430
        env_->SleepForMicroseconds(
            SlowdownAmount(score, cfd->options()->soft_rate_limit,
                           cfd->options()->hard_rate_limit));
J
Jim Paton 已提交
3431 3432
        rate_limit_delay_millis += sw.ElapsedMicros();
      }
J
Jim Paton 已提交
3433 3434
      allow_soft_rate_limit_delay = false;
      mutex_.Lock();
3435

3436
    } else {
3437 3438 3439 3440 3441
      unique_ptr<WritableFile> lfile;
      MemTable* memtmp = nullptr;

      // Attempt to switch to a new memtable and trigger compaction of old.
      // Do this without holding the dbmutex lock.
3442 3443
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
I
Igor Canadi 已提交
3444
      SuperVersion* new_superversion = nullptr;
3445 3446 3447 3448 3449
      mutex_.Unlock();
      {
        EnvOptions soptions(storage_options_);
        soptions.use_mmap_writes = false;
        DelayLoggingAndReset();
3450 3451
        s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number),
                                  &lfile, soptions);
3452 3453 3454
        if (s.ok()) {
          // Our final size should be less than write_buffer_size
          // (compression, etc) but err on the side of caution.
3455 3456
          lfile->SetPreallocationBlockSize(1.1 *
                                           cfd->options()->write_buffer_size);
I
Igor Canadi 已提交
3457
          memtmp = new MemTable(cfd->internal_comparator(), *cfd->options());
3458
          new_superversion = new SuperVersion();
3459 3460 3461
        }
      }
      mutex_.Lock();
3462
      if (!s.ok()) {
H
heyongqiang 已提交
3463
        // Avoid chewing through file number space in a tight loop.
3464
        versions_->ReuseFileNumber(new_log_number);
3465
        assert (!memtmp);
3466 3467
        break;
      }
3468
      logfile_number_ = new_log_number;
3469
      log_.reset(new log::Writer(std::move(lfile)));
3470 3471
      cfd->mem()->SetNextLogNumber(logfile_number_);
      cfd->imm()->Add(cfd->mem());
3472
      if (force) {
3473
        cfd->imm()->FlushRequested();
3474
      }
3475 3476
      memtmp->Ref();
      memtmp->SetLogNumber(logfile_number_);
3477
      cfd->SetMemtable(memtmp);
3478
      Log(options_.info_log, "New memtable created with log file: #%lu\n",
K
Kai Liu 已提交
3479
          (unsigned long)logfile_number_);
3480
      force = false;   // Do not force another compaction if have room
3481
      MaybeScheduleFlushOrCompaction();
3482
      delete cfd->InstallSuperVersion(new_superversion);
3483 3484 3485 3486 3487
    }
  }
  return s;
}

I
Igor Canadi 已提交
3488 3489 3490 3491
const std::string& DBImpl::GetName() const {
  return dbname_;
}

3492 3493 3494 3495
// Returns the Env this DB uses (env_).
Env* DBImpl::GetEnv() const {
  Env* const environment = env_;
  return environment;
}

3496 3497
// Returns this DBImpl's options_.
// NOTE(review): column_family is not consulted yet, so every column family
// handle currently observes the same DB-wide options object.
const Options& DBImpl::GetOptions(
    const ColumnFamilyHandle& column_family) const {
  return options_;
}

3501 3502
bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
                         const Slice& property, std::string* value) {
3503
  value->clear();
J
jorlow@chromium.org 已提交
3504
  MutexLock l(&mutex_);
I
Igor Canadi 已提交
3505
  auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
3506
  assert(cfd != nullptr);
3507
  return cfd->internal_stats()->GetProperty(property, value, cfd);
J
jorlow@chromium.org 已提交
3508 3509
}

3510 3511
void DBImpl::GetApproximateSizes(const ColumnFamilyHandle& column_family,
                                 const Range* range, int n, uint64_t* sizes) {
J
jorlow@chromium.org 已提交
3512 3513 3514 3515
  // TODO(opt): better implementation
  Version* v;
  {
    MutexLock l(&mutex_);
3516 3517 3518 3519
    auto cfd =
        versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
    assert(cfd != nullptr);
    v = cfd->current();
3520
    v->Ref();
J
jorlow@chromium.org 已提交
3521 3522 3523 3524 3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535 3536 3537
  }

  for (int i = 0; i < n; i++) {
    // Convert user_key into a corresponding internal key.
    InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
    uint64_t start = versions_->ApproximateOffsetOf(v, k1);
    uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
    sizes[i] = (limit >= start ? limit - start : 0);
  }

  {
    MutexLock l(&mutex_);
    v->Unref();
  }
}

3538 3539 3540 3541 3542 3543 3544
// If any writes were delayed since the last call, log the count to
// options_.info_log and reset the counter to zero. Otherwise do nothing.
inline void DBImpl::DelayLoggingAndReset() {
  if (delayed_writes_ <= 0) {
    return;
  }
  Log(options_.info_log, "delayed %d write...\n", delayed_writes_);
  delayed_writes_ = 0;
}

3545 3546 3547
// Deletes a single user-named file.
//
// Accepted inputs:
//  * archived WAL files: removed directly from options_.wal_dir;
//  * table files: must sit at a level with no files in any deeper level of
//    their column family. They are removed from the current version through
//    a VersionEdit and then physically deleted by PurgeObsoleteFiles after
//    the DB mutex has been released.
// A table file that is currently being compacted is skipped and OK is
// returned. Any other file name is rejected with InvalidArgument.
Status DBImpl::DeleteFile(std::string name) {
  uint64_t number;
  FileType type;
  WalFileType log_type;
  const bool parsed = ParseFileName(name, &number, &type, &log_type);
  if (!parsed || (type != kTableFile && type != kLogFile)) {
    Log(options_.info_log, "DeleteFile %s failed.\n", name.c_str());
    return Status::InvalidArgument("Invalid file name");
  }

  Status status;
  if (type == kLogFile) {
    // WAL files may only be deleted once they have been archived.
    if (log_type != kArchivedLogFile) {
      Log(options_.info_log, "DeleteFile %s failed.\n", name.c_str());
      return Status::NotSupported("Delete only supported for archived logs");
    }
    status = env_->DeleteFile(options_.wal_dir + "/" + name.c_str());
    if (!status.ok()) {
      Log(options_.info_log, "DeleteFile %s failed.\n", name.c_str());
    }
    return status;
  }

  // Table file path. deletion_state outlives the locked section because the
  // actual file removal happens after the mutex is released.
  DeletionState deletion_state(0, true);
  {
    MutexLock l(&mutex_);
    int level;
    FileMetaData metadata;
    ColumnFamilyData* cfd;
    status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd);
    if (!status.ok()) {
      Log(options_.info_log, "DeleteFile %s failed. File not found\n",
          name.c_str());
      return Status::InvalidArgument("File not found");
    }
    assert((level > 0) && (level < cfd->NumberLevels()));

    // If the file is being compacted no need to delete.
    if (metadata.being_compacted) {
      Log(options_.info_log,
          "DeleteFile %s Skipped. File about to be compacted\n", name.c_str());
      return Status::OK();
    }

    // Only the files in the last level can be deleted externally.
    // This is to make sure that any deletion tombstones are not
    // lost. Check that every deeper level is empty.
    Version* const current = cfd->current();
    for (int deeper = level + 1; deeper < cfd->NumberLevels(); ++deeper) {
      if (current->NumLevelFiles(deeper) != 0) {
        Log(options_.info_log,
            "DeleteFile %s FAILED. File not in last level\n", name.c_str());
        return Status::InvalidArgument("File not in last level");
      }
    }

    VersionEdit edit;
    edit.DeleteFile(level, number);
    status = versions_->LogAndApply(cfd, &edit, &mutex_, db_directory_.get());
    if (status.ok()) {
      InstallSuperVersion(cfd, deletion_state);
    }
    FindObsoleteFiles(deletion_state, false);
  }  // mutex_ released here

  LogFlush(options_.info_log);
  // Remove the obsolete files outside the DB mutex.
  PurgeObsoleteFiles(deletion_state);
  return status;
}

3614
void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData> *metadata) {
3615 3616 3617 3618
  MutexLock l(&mutex_);
  return versions_->GetLiveFilesMetaData(metadata);
}

3619 3620 3621 3622 3623 3624 3625 3626 3627 3628 3629 3630 3631 3632 3633 3634 3635 3636 3637 3638 3639 3640 3641 3642 3643 3644 3645
// Reads the IDENTITY file of this database (IdentityFileName(dbname_)) into
// `identity`, stripping a single trailing '\n' if present.
//
// Returns the first non-OK status from opening, sizing or reading the file.
// `identity` is modified only after a successful read.
Status DBImpl::GetDbIdentity(std::string& identity) {
  std::string idfilename = IdentityFileName(dbname_);
  unique_ptr<SequentialFile> idfile;
  const EnvOptions soptions;
  Status s = env_->NewSequentialFile(idfilename, &idfile, soptions);
  if (!s.ok()) {
    return s;
  }
  uint64_t file_size;
  s = env_->GetFileSize(idfilename, &file_size);
  if (!s.ok()) {
    return s;
  }
  // Heap-allocated scratch space instead of a variable-length array.
  // VLAs are not standard C++, and a large or corrupted IDENTITY file
  // could overflow the stack.
  std::vector<char> buffer(static_cast<size_t>(file_size));
  Slice id;
  s = idfile->Read(static_cast<size_t>(file_size), &id, buffer.data());
  if (!s.ok()) {
    return s;
  }
  identity.assign(id.data(), id.size());
  // If last character is '\n' remove it from identity
  if (!identity.empty() && identity.back() == '\n') {
    identity.pop_back();
  }
  return s;
}

J
jorlow@chromium.org 已提交
3646 3647
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
3648 3649
Status DB::Put(const WriteOptions& opt, const ColumnFamilyHandle& column_family,
               const Slice& key, const Slice& value) {
3650 3651 3652 3653
  // Pre-allocate size of write batch conservatively.
  // 8 bytes are taken by header, 4 bytes for count, 1 byte for type,
  // and we allocate 11 extra bytes for key length, as well as value length.
  WriteBatch batch(key.size() + value.size() + 24);
3654
  batch.Put(column_family.id, key, value);
J
jorlow@chromium.org 已提交
3655 3656 3657
  return Write(opt, &batch);
}

3658 3659
// Wraps a single Delete of `key` in `column_family` in a WriteBatch and
// applies it through Write().
Status DB::Delete(const WriteOptions& opt,
                  const ColumnFamilyHandle& column_family, const Slice& key) {
  WriteBatch single_op;
  single_op.Delete(column_family.id, key);
  return Write(opt, &single_op);
}

3665 3666
// Wraps a single Merge of `value` into `key` (in `column_family`) in a
// WriteBatch and applies it through Write().
Status DB::Merge(const WriteOptions& opt,
                 const ColumnFamilyHandle& column_family, const Slice& key,
                 const Slice& value) {
  WriteBatch merge_batch;
  merge_batch.Merge(column_family.id, key, value);
  return Write(opt, &merge_batch);
}

3673 3674
// Base-class fallback: always reports that creating a column family is not
// supported. All arguments are ignored.
Status DB::CreateColumnFamily(const ColumnFamilyOptions& options,
                              const std::string& column_family_name,
                              ColumnFamilyHandle* handle) {
  return Status::NotSupported("");
}
// Base-class fallback: always reports that dropping a column family is not
// supported.
Status DB::DropColumnFamily(const ColumnFamilyHandle& column_family) {
  return Status::NotSupported("");
}

J
jorlow@chromium.org 已提交
3683 3684
// Out-of-line (empty) destructor for the DB interface.
DB::~DB() = default;

J
Jim Paton 已提交
3685
// Opens the database at `dbname` with a single column family
// (default_column_family_name) configured from `options`. This overload
// discards the column family handle; it forwards to OpenWithColumnFamilies.
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  const DBOptions db_options(options);
  ColumnFamilyOptions cf_options(options);
  std::vector<ColumnFamilyDescriptor> column_families(
      1, ColumnFamilyDescriptor(default_column_family_name, cf_options));
  std::vector<ColumnFamilyHandle> handles;
  return DB::OpenWithColumnFamilies(db_options, dbname, column_families,
                                    &handles, dbptr);
}

// Opens the database at `dbname` and returns one handle per entry of
// `column_families`, in the same order.
//
// The DBImpl is built from `db_options` merged with the options of the entry
// named default_column_family_name (or default ColumnFamilyOptions if no
// such entry exists). Recovery handles create_if_missing / error_if_exists.
// A fresh WAL file is then created and the next file number is persisted
// through LogAndApply.
//
// On success *dbptr owns the new DB and *handles is filled. On failure
// *dbptr stays nullptr and any DBImpl created here is deleted. Once recovery
// has been attempted, *handles is also cleared.
Status DB::OpenWithColumnFamilies(
    const DBOptions& db_options, const std::string& dbname,
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::vector<ColumnFamilyHandle>* handles, DB** dbptr) {
  *dbptr = nullptr;
  EnvOptions soptions;

  // TODO temporary until we change DBImpl to accept
  // DBOptions instead of Options
  ColumnFamilyOptions default_column_family_options;
  // Bind by const reference: each descriptor carries a name string and a
  // full ColumnFamilyOptions, which would otherwise be copied per iteration.
  for (const auto& cf : column_families) {
    if (cf.name == default_column_family_name) {
      default_column_family_options = cf.options;
      break;
    }
  }
  // default options
  Options options(db_options, default_column_family_options);

  if (options.block_cache != nullptr && options.no_block_cache) {
    return Status::InvalidArgument(
        "no_block_cache is true while block_cache is not nullptr");
  }

  DBImpl* impl = new DBImpl(options, dbname);
  Status s = impl->env_->CreateDirIfMissing(impl->options_.wal_dir);
  if (!s.ok()) {
    delete impl;
    return s;
  }

  s = impl->CreateArchivalDirectory();
  if (!s.ok()) {
    delete impl;
    return s;
  }

  impl->mutex_.Lock();
  // Handles create_if_missing, error_if_exists
  s = impl->Recover(column_families);
  if (s.ok()) {
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    unique_ptr<WritableFile> lfile;
    soptions.use_mmap_writes = false;
    s = impl->options_.env->NewWritableFile(
        LogFileName(impl->options_.wal_dir, new_log_number), &lfile,
        soptions);
    if (s.ok()) {
      lfile->SetPreallocationBlockSize(1.1 * impl->options_.write_buffer_size);
      VersionEdit edit;
      impl->logfile_number_ = new_log_number;
      impl->log_.reset(new log::Writer(std::move(lfile)));
      // We use this LogAndApply just to store the next file number, the one
      // that we used by calling impl->versions_->NewFileNumber()
      // The used log number are already written to manifest in RecoverLogFile()
      // method
      s = impl->versions_->LogAndApply(impl->default_cfd_, &edit, &impl->mutex_,
                                       impl->db_directory_.get());
    }
    if (s.ok()) {
      // set column family handles
      handles->clear();
      for (const auto& cf : column_families) {
        if (!impl->versions_->GetColumnFamilySet()->Exists(cf.name)) {
          s = Status::InvalidArgument("Column family not found: ", cf.name);
          handles->clear();
          break;
        }
        uint32_t id = impl->versions_->GetColumnFamilySet()->GetID(cf.name);
        handles->push_back(ColumnFamilyHandle(id));
      }
    }
    if (s.ok()) {
      for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
        delete cfd->InstallSuperVersion(new SuperVersion());
        cfd->mem()->SetLogNumber(impl->logfile_number_);
      }
      impl->DeleteObsoleteFiles();
      impl->MaybeScheduleFlushOrCompaction();
      impl->MaybeScheduleLogDBDeployStats();
      s = impl->db_directory_->Fsync();
    }
  }

  if (s.ok()) {
    // Universal compaction expects every file to be in level 0. Reject a DB
    // whose column family uses it but still has files in higher levels.
    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
      if (cfd->options()->compaction_style == kCompactionStyleUniversal) {
        Version* current = cfd->current();
        for (int i = 1; i < current->NumberLevels(); ++i) {
          int num_files = current->NumLevelFiles(i);
          if (num_files > 0) {
            s = Status::InvalidArgument("Not all files are at level 0. Cannot "
                "open with universal compaction style.");
            break;
          }
        }
      }
      if (!s.ok()) {
        break;
      }
    }
  }

  impl->mutex_.Unlock();

  if (s.ok()) {
    *dbptr = impl;
  } else {
    handles->clear();
    delete impl;
  }
  return s;
}

3810 3811 3812
// Fills *column_families with the column family names of the database at
// `name`. Delegates to VersionSet::ListColumnFamilies, using db_options.env
// for file access.
Status DB::ListColumnFamilies(const DBOptions& db_options,
                              const std::string& name,
                              std::vector<std::string>* column_families) {
  Env* const env = db_options.env;
  return VersionSet::ListColumnFamilies(column_families, name, env);
}

3816 3817 3818
// Out-of-line (empty) destructor for the Snapshot interface.
Snapshot::~Snapshot() = default;

J
jorlow@chromium.org 已提交
3819
// Removes every recognised file of the database `dbname`, then tries to
// remove the emptied directories. Directory removal errors are ignored.
//
// Files covered: the files in dbname, WAL files in options.wal_dir (after
// sanitisation, when it differs from dbname), and archived WAL files. Nested
// meta-databases are destroyed recursively.
//
// Returns OK when no files were found. Otherwise returns the lock
// acquisition error or the first per-file deletion error (OK if all
// succeeded).
Status DestroyDB(const std::string& dbname, const Options& options) {
  const InternalKeyComparator comparator(options.comparator);
  const InternalFilterPolicy filter_policy(options.filter_policy);
  const Options& soptions(SanitizeOptions(
    dbname, &comparator, &filter_policy, options));
  Env* env = soptions.env;

  std::vector<std::string> filenames;
  // Ignore error in case directory does not exist
  env->GetChildren(dbname, &filenames);

  std::string archivedir = ArchivalDirectory(dbname);
  if (dbname != soptions.wal_dir) {
    std::vector<std::string> logfilenames;
    env->GetChildren(soptions.wal_dir, &logfilenames);
    filenames.insert(filenames.end(), logfilenames.begin(), logfilenames.end());
    archivedir = ArchivalDirectory(soptions.wal_dir);
  }

  if (filenames.empty()) {
    return Status::OK();
  }

  FileLock* lock;
  const std::string lockname = LockFileName(dbname);
  Status result = env->LockFile(lockname, &lock);
  if (!result.ok()) {
    return result;
  }

  uint64_t number;
  FileType type;
  for (const std::string& fname : filenames) {
    if (!ParseFileName(fname, &number, &type) || type == kDBLockFile) {
      continue;  // Unknown file, or the lock file (deleted at the end).
    }
    Status del;
    if (type == kMetaDatabase) {
      del = DestroyDB(dbname + "/" + fname, options);
    } else if (type == kLogFile) {
      del = env->DeleteFile(soptions.wal_dir + "/" + fname);
    } else {
      del = env->DeleteFile(dbname + "/" + fname);
    }
    if (result.ok() && !del.ok()) {
      result = del;
    }
  }

  // Delete archival files.
  std::vector<std::string> archiveFiles;
  env->GetChildren(archivedir, &archiveFiles);
  for (const std::string& fname : archiveFiles) {
    if (ParseFileName(fname, &number, &type) && type == kLogFile) {
      Status del = env->DeleteFile(archivedir + "/" + fname);
      if (result.ok() && !del.ok()) {
        result = del;
      }
    }
  }
  // ignore case where no archival directory is present.
  env->DeleteDir(archivedir);

  env->UnlockFile(lock);  // Ignore error since state is already gone
  env->DeleteFile(lockname);
  env->DeleteDir(dbname);  // Ignore error in case dir contains other files
  env->DeleteDir(soptions.wal_dir);
  return result;
}

3888 3889
//
// A global method that can dump out the build version
3890
void dumpLeveldbBuildVersion(Logger * log) {
3891
  Log(log, "Git sha %s", rocksdb_build_git_sha);
3892
  Log(log, "Compile time %s %s",
3893
      rocksdb_build_compile_time, rocksdb_build_compile_date);
3894 3895
}

3896
}  // namespace rocksdb