//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_impl.h"

#include <algorithm>
#include <climits>
#include <cstdio>
#include <set>
#include <stdexcept>
#include <stdint.h>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/prefix_filter_iterator.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/tailing_iter.h"
#include "db/transaction_log_impl.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "port/port.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merger.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/auto_roll_logger.h"
#include "util/autovector.h"
#include "util/build_version.h"
#include "util/coding.h"
#include "util/hash_skiplist_rep.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
#include "util/stop_watch.h"

namespace rocksdb {

void DumpLeveldbBuildVersion(Logger * log);

// Information kept for every waiting writer
struct DBImpl::Writer {
  Status status;
  WriteBatch* batch;
  bool sync;
  bool disableWAL;
  bool done;
  port::CondVar cv;

  explicit Writer(port::Mutex* mu) : cv(mu) { }
};

struct DBImpl::CompactionState {
  Compaction* const compaction;

  // If there were two snapshots with seq numbers s1 and
  // s2 and s1 < s2, and if we find two instances of a key k1 that lies
  // entirely within s1 and s2, then the earlier version of k1 can be safely
  // deleted because that version is not visible in any snapshot.
  std::vector<SequenceNumber> existing_snapshots;

  // Files produced by compaction
  struct Output {
    uint64_t number;
    uint64_t file_size;
    InternalKey smallest, largest;
    SequenceNumber smallest_seqno, largest_seqno;
  };
  std::vector<Output> outputs;
  std::list<uint64_t> allocated_file_numbers;

  // State kept for output being generated
  unique_ptr<WritableFile> outfile;
  unique_ptr<TableBuilder> builder;

  uint64_t total_bytes;

  Output* current_output() { return &outputs[outputs.size()-1]; }

  explicit CompactionState(Compaction* c)
      : compaction(c),
        total_bytes(0) {
  }

  // Create a client visible context of this compaction
  CompactionFilter::Context GetFilterContext() {
    CompactionFilter::Context context;
    context.is_full_compaction = compaction->IsFullCompaction();
    context.is_manual_compaction = compaction->IsManualCompaction();
    return context;
  }
};

// Fix user-supplied options to be reasonable
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
}
Options SanitizeOptions(const std::string& dbname,
                        const InternalKeyComparator* icmp,
                        const InternalFilterPolicy* ipolicy,
                        const Options& src) {
  Options result = src;
  result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
  // A max_open_files value of -1 means an "infinite" number of open files.
  if (result.max_open_files != -1) {
    ClipToRange(&result.max_open_files,            20,     1000000);
  }
  ClipToRange(&result.write_buffer_size,         ((size_t)64)<<10,
                                                 ((size_t)64)<<30);
  ClipToRange(&result.block_size,                1<<10,  4<<20);

  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // calculate a proper value from write_buffer_size;
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 10;
  }

  result.min_write_buffer_number_to_merge = std::min(
    result.min_write_buffer_number_to_merge, result.max_write_buffer_number-1);
  if (result.info_log == nullptr) {
    Status s = CreateLoggerFromOptions(dbname, result.db_log_dir, src.env,
                                       result, &result.info_log);
    if (!s.ok()) {
      // No place suitable for logging
      result.info_log = nullptr;
    }
  }
  if (result.block_cache == nullptr && !result.no_block_cache) {
    result.block_cache = NewLRUCache(8 << 20);
  }
  result.compression_per_level = src.compression_per_level;
  if (result.block_size_deviation < 0 || result.block_size_deviation > 100) {
    result.block_size_deviation = 0;
  }
  if (result.max_mem_compaction_level >= result.num_levels) {
    result.max_mem_compaction_level = result.num_levels - 1;
  }
  if (result.soft_rate_limit > result.hard_rate_limit) {
    result.soft_rate_limit = result.hard_rate_limit;
  }
  if (result.compaction_filter) {
    Log(result.info_log, "Compaction filter specified, ignore factory");
  }
  if (result.prefix_extractor) {
    // If a prefix extractor has been supplied and a HashSkipListRepFactory is
    // being used, make sure that the latter uses the former as its transform
    // function.
    auto factory = dynamic_cast<HashSkipListRepFactory*>(
      result.memtable_factory.get());
    if (factory &&
        factory->GetTransform() != result.prefix_extractor) {
      Log(result.info_log, "A prefix hash representation factory was supplied "
          "whose prefix extractor does not match options.prefix_extractor. "
          "Falling back to skip list representation factory");
      result.memtable_factory = std::make_shared<SkipListFactory>();
    } else if (factory) {
      Log(result.info_log, "Prefix hash memtable rep is in use.");
    }
  }

  if (result.wal_dir.empty()) {
    // Use dbname as default
    result.wal_dir = dbname;
  }

  // -- Sanitize the table properties collector
  // All user defined properties collectors will be wrapped by
  // UserKeyTablePropertiesCollector since they only have knowledge
  // of the user keys; internal keys are invisible to them.
  auto& collectors = result.table_properties_collectors;
  for (size_t i = 0; i < result.table_properties_collectors.size(); ++i) {
    assert(collectors[i]);
    collectors[i] =
      std::make_shared<UserKeyTablePropertiesCollector>(collectors[i]);
  }

  // Add collector to collect internal key statistics
  collectors.push_back(
      std::make_shared<InternalKeyPropertiesCollector>()
  );

  return result;
}

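// Pick the compression type for an output file at the given level, honoring
// options.compression_per_level when it is set (clamped to the nearest
// configured level) and falling back to options.compression otherwise.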
CompressionType GetCompressionType(const Options& options, int level,
                                   const bool enable_compression) {
  if (!enable_compression) {
    // disable compression
    return kNoCompression;
  }
  // If the user has specified a different compression for each level,
  // then pick the compression for that level.
  if (!options.compression_per_level.empty()) {
    const int n = options.compression_per_level.size() - 1;
    // It is possible for level_ to be -1; in that case, we use level
    // 0's compression.  This occurs mostly in backwards compatibility
    // situations when the builder doesn't know what level the file
    // belongs to.  Likewise, if level_ is beyond the end of the
    // specified compression levels, use the last value.
    return options.compression_per_level[std::max(0, std::min(level, n))];
  } else {
    return options.compression;
  }
}

CompressionType GetCompressionFlush(const Options& options) {
  // Compressing memtable flushes might not help unless the sequential load
  // optimization is used for leveled compaction. Otherwise the CPU and
  // latency overhead is not offset by saving much space.

  bool can_compress;

  if  (options.compaction_style == kCompactionStyleUniversal) {
    can_compress =
        (options.compaction_options_universal.compression_size_percent < 0);
  } else {
    // For leveled compaction, compress when min_level_to_compress == 0.
    can_compress = (GetCompressionType(options, 0, true) != kNoCompression);
  }

  if (can_compress) {
    return options.compression;
  } else {
    return kNoCompression;
  }
}

DBImpl::DBImpl(const Options& options, const std::string& dbname)
    : env_(options.env),
      dbname_(dbname),
      internal_comparator_(options.comparator),
      options_(SanitizeOptions(dbname, &internal_comparator_,
                               &internal_filter_policy_, options)),
      internal_filter_policy_(options.filter_policy),
      owns_info_log_(options_.info_log != options.info_log),
      db_lock_(nullptr),
      mutex_(options.use_adaptive_mutex),
      shutting_down_(nullptr),
      bg_cv_(&mutex_),
      mem_(new MemTable(internal_comparator_, options_)),
      imm_(options_.min_write_buffer_number_to_merge),
      logfile_number_(0),
      super_version_(nullptr),
      super_version_number_(0),
      tmp_batch_(),
      bg_compaction_scheduled_(0),
      bg_manual_only_(0),
      bg_flush_scheduled_(0),
      bg_logstats_scheduled_(false),
      manual_compaction_(nullptr),
      logger_(nullptr),
      disable_delete_obsolete_files_(0),
      delete_obsolete_files_last_run_(options.env->NowMicros()),
      purge_wal_files_last_run_(0),
      last_stats_dump_time_microsec_(0),
      default_interval_to_delete_obsolete_WAL_(600),
      flush_on_destroy_(false),
      internal_stats_(options.num_levels, options.env,
                      options.statistics.get()),
      delayed_writes_(0),
      storage_options_(options),
      bg_work_gate_closed_(false),
      refitting_level_(false) {
  mem_->Ref();
  env_->GetAbsolutePath(dbname, &db_absolute_path_);

  // Reserve ten files or so for other uses and give the rest to TableCache.
  // Give a large number for setting of "infinite" open files.
  const int table_cache_size =
      (options_.max_open_files == -1) ?
          4194304 : options_.max_open_files - 10;
  table_cache_.reset(new TableCache(dbname_, &options_,
                                    storage_options_, table_cache_size));
  versions_.reset(new VersionSet(dbname_, &options_, storage_options_,
                                 table_cache_.get(), &internal_comparator_));

  DumpLeveldbBuildVersion(options_.info_log.get());
  options_.Dump(options_.info_log.get());

  char name[100];
  Status s = env_->GetHostName(name, 100L);
  if (s.ok()) {
    host_name_ = name;
  } else {
    Log(options_.info_log, "Can't get hostname, use localhost as host name.");
    host_name_ = "localhost";
  }
  last_log_ts = 0;

  LogFlush(options_.info_log);
}

DBImpl::~DBImpl() {
  autovector<MemTable*> to_delete;

  // Wait for background work to finish
  if (flush_on_destroy_ && mem_->GetFirstSequenceNumber() != 0) {
    FlushMemTable(FlushOptions());
  }
  mutex_.Lock();
  shutting_down_.Release_Store(this);  // Any non-nullptr value is ok
  while (bg_compaction_scheduled_ ||
         bg_flush_scheduled_ ||
         bg_logstats_scheduled_) {
    bg_cv_.Wait();
  }
  if (super_version_ != nullptr) {
    bool is_last_reference __attribute__((unused));
    is_last_reference = super_version_->Unref();
    assert(is_last_reference);
    super_version_->Cleanup();
    delete super_version_;
  }
  mutex_.Unlock();

  if (db_lock_ != nullptr) {
    env_->UnlockFile(db_lock_);
  }

  if (mem_ != nullptr) {
    delete mem_->Unref();
  }

  imm_.current()->Unref(&to_delete);
  for (MemTable* m: to_delete) {
    delete m;
  }
  // versions need to be destroyed before table_cache since it can hold
  // references to table_cache.
  versions_.reset();
  LogFlush(options_.info_log);
}

// Do not flush and close the database elegantly. Simulate a crash.
void DBImpl::TEST_Destroy_DBImpl() {
  // ensure that no new memtable flushes can occur
  flush_on_destroy_ = false;

  // wait till all background compactions are done.
  mutex_.Lock();
  while (bg_compaction_scheduled_ ||
         bg_flush_scheduled_ ||
         bg_logstats_scheduled_) {
    bg_cv_.Wait();
  }
  if (super_version_ != nullptr) {
    bool is_last_reference __attribute__((unused));
    is_last_reference = super_version_->Unref();
    assert(is_last_reference);
    super_version_->Cleanup();
    delete super_version_;
  }

  // Prevent new compactions from occurring.
  bg_work_gate_closed_ = true;
  const int LargeNumber = 10000000;
  bg_compaction_scheduled_ += LargeNumber;

  mutex_.Unlock();
  LogFlush(options_.info_log);

  // force release the lock file.
  if (db_lock_ != nullptr) {
    env_->UnlockFile(db_lock_);
  }

  log_.reset();
  versions_.reset();
  table_cache_.reset();
}

uint64_t DBImpl::TEST_Current_Manifest_FileNo() {
  return versions_->ManifestFileNumber();
}

Status DBImpl::NewDB() {
  VersionEdit new_db;
  new_db.SetComparatorName(user_comparator()->Name());
  new_db.SetLogNumber(0);
  new_db.SetNextFile(2);
  new_db.SetLastSequence(0);

  const std::string manifest = DescriptorFileName(dbname_, 1);
  unique_ptr<WritableFile> file;
  Status s = env_->NewWritableFile(manifest, &file, storage_options_);
  if (!s.ok()) {
    return s;
  }
  file->SetPreallocationBlockSize(options_.manifest_preallocation_size);
  {
    log::Writer log(std::move(file));
    std::string record;
    new_db.EncodeTo(&record);
    s = log.AddRecord(record);
  }
  if (s.ok()) {
    // Make "CURRENT" file that points to the new manifest file.
    s = SetCurrentFile(env_, dbname_, 1);
  } else {
    env_->DeleteFile(manifest);
  }
  return s;
}

void DBImpl::MaybeIgnoreError(Status* s) const {
  if (s->ok() || options_.paranoid_checks) {
    // No change needed
  } else {
    Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
    *s = Status::OK();
  }
}

const Status DBImpl::CreateArchivalDirectory() {
  if (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0) {
    std::string archivalPath = ArchivalDirectory(options_.wal_dir);
    return env_->CreateDirIfMissing(archivalPath);
  }
  return Status::OK();
}

void DBImpl::PrintStatistics() {
  auto dbstats = options_.statistics.get();
  if (dbstats) {
    Log(options_.info_log,
        "STATISTICS:\n %s",
        dbstats->ToString().c_str());
  }
}

void DBImpl::MaybeDumpStats() {
  if (options_.stats_dump_period_sec == 0) return;

  const uint64_t now_micros = env_->NowMicros();

  if (last_stats_dump_time_microsec_ +
      options_.stats_dump_period_sec * 1000000
      <= now_micros) {
    // Multiple threads could race in here simultaneously.
    // However, the last one will update last_stats_dump_time_microsec_
    // atomically. We could see more than one dump during one dump
    // period in rare cases.
    last_stats_dump_time_microsec_ = now_micros;
    std::string stats;
    GetProperty("rocksdb.stats", &stats);
    Log(options_.info_log, "%s", stats.c_str());
    PrintStatistics();
  }
}

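// A SuperVersion bundles the active memtable, the immutable memtable list and
// the current Version under one reference count, so readers can pin a
// consistent snapshot of all three with a single Ref()/Unref() pair.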
// DBImpl::SuperVersion methods
DBImpl::SuperVersion::~SuperVersion() {
  for (auto td : to_delete) {
    delete td;
  }
}

DBImpl::SuperVersion* DBImpl::SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

bool DBImpl::SuperVersion::Unref() {
  assert(refs > 0);
  // fetch_sub returns the previous value of refs
  return refs.fetch_sub(1, std::memory_order_relaxed) == 1;
}

void DBImpl::SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    to_delete.push_back(m);
  }
  current->Unref();
}

void DBImpl::SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                                Version* new_current) {
  mem = new_mem;
  imm = new_imm;
  current = new_current;
  mem->Ref();
  imm->Ref();
  current->Ref();
  refs.store(1, std::memory_order_relaxed);
}

// Returns the list of live files in 'sst_live' and the list
// of all files in the filesystem in 'candidate_files'.
// no_full_scan = true -- never do the full scan using GetChildren()
// force = false -- don't force the full scan, except every
//  options_.delete_obsolete_files_period_micros
// force = true -- force the full scan
void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
                               bool force,
                               bool no_full_scan) {
  mutex_.AssertHeld();

  // if deletion is disabled, do nothing
  if (disable_delete_obsolete_files_ > 0) {
    return;
  }

  bool doing_the_full_scan = false;

  // logic for figuring out if we're doing the full scan
  if (no_full_scan) {
    doing_the_full_scan = false;
  } else if (force || options_.delete_obsolete_files_period_micros == 0) {
    doing_the_full_scan = true;
  } else {
    const uint64_t now_micros = env_->NowMicros();
    if (delete_obsolete_files_last_run_ +
        options_.delete_obsolete_files_period_micros < now_micros) {
      doing_the_full_scan = true;
      delete_obsolete_files_last_run_ = now_micros;
    }
  }

  // get obsolete files
  versions_->GetObsoleteFiles(&deletion_state.sst_delete_files);

  // store the current filenum, lognum, etc
  deletion_state.manifest_file_number = versions_->ManifestFileNumber();
  deletion_state.log_number = versions_->LogNumber();
  deletion_state.prev_log_number = versions_->PrevLogNumber();

  if (!doing_the_full_scan && !deletion_state.HaveSomethingToDelete()) {
    // avoid filling up sst_live if we're sure that we
    // are not going to do the full scan and that we don't have
    // anything to delete at the moment
    return;
  }

  // don't delete live files
  deletion_state.sst_live.assign(pending_outputs_.begin(),
                                 pending_outputs_.end());
  versions_->AddLiveFiles(&deletion_state.sst_live);

  if (doing_the_full_scan) {
    // set of all files in the directory. We'll exclude files that are still
    // alive in the subsequent processing.
    env_->GetChildren(
        dbname_, &deletion_state.candidate_files
    ); // Ignore errors

    // Add log files in wal_dir
    if (options_.wal_dir != dbname_) {
      std::vector<std::string> log_files;
      env_->GetChildren(options_.wal_dir, &log_files); // Ignore errors
      deletion_state.candidate_files.insert(
        deletion_state.candidate_files.end(),
        log_files.begin(),
        log_files.end()
      );
    }
  }
}

// Diffs the files listed in filenames and those that do not
// belong to live files are possibly removed. Also, removes all the
// files in sst_delete_files and log_delete_files.
// It is not necessary to hold the mutex when invoking this method.
void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
  // check if there is anything to do
  if (state.candidate_files.empty() &&
      state.sst_delete_files.empty() &&
      state.log_delete_files.empty()) {
    return;
  }

  // this checks if FindObsoleteFiles() was run before. If not, don't do
  // PurgeObsoleteFiles(). If FindObsoleteFiles() was run, we need to also
  // run PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true
  if (state.manifest_file_number == 0) {
    return;
  }
  std::vector<std::string> old_log_files;

  // Now, convert live list to an unordered set, WITHOUT mutex held;
  // set is slow.
  std::unordered_set<uint64_t> sst_live(
      state.sst_live.begin(), state.sst_live.end()
  );

  auto& candidate_files = state.candidate_files;
  candidate_files.reserve(
      candidate_files.size() +
      state.sst_delete_files.size() +
      state.log_delete_files.size());
  // We may ignore the dbname when generating the file names.
  const char* kDumbDbName = "";
  for (auto file : state.sst_delete_files) {
    candidate_files.push_back(
        TableFileName(kDumbDbName, file->number).substr(1)
    );
    delete file;
  }

  for (auto file_num : state.log_delete_files) {
    if (file_num > 0) {
      candidate_files.push_back(
          LogFileName(kDumbDbName, file_num).substr(1)
      );
    }
  }

  // dedup state.candidate_files so we don't try to delete the same
  // file twice
  sort(candidate_files.begin(), candidate_files.end());
  candidate_files.erase(
      unique(candidate_files.begin(), candidate_files.end()),
      candidate_files.end()
  );

  for (const auto& to_delete : candidate_files) {
    uint64_t number;
    FileType type;
    // Ignore file if we cannot recognize it.
    if (!ParseFileName(to_delete, &number, &type)) {
      continue;
    }

    bool keep = true;
    switch (type) {
      case kLogFile:
        keep = ((number >= state.log_number) ||
                (number == state.prev_log_number));
        break;
      case kDescriptorFile:
        // Keep my manifest file, and any newer incarnations'
        // (in case there is a race that allows other incarnations)
        keep = (number >= state.manifest_file_number);
        break;
      case kTableFile:
        keep = (sst_live.find(number) != sst_live.end());
        break;
      case kTempFile:
        // Any temp files that are currently being written to must
        // be recorded in pending_outputs_, which is inserted into "live"
        keep = (sst_live.find(number) != sst_live.end());
        break;
      case kInfoLogFile:
        keep = true;
        if (number != 0) {
          old_log_files.push_back(to_delete);
        }
        break;
      case kCurrentFile:
      case kDBLockFile:
      case kIdentityFile:
      case kMetaDatabase:
        keep = true;
        break;
    }

    if (keep) {
      continue;
    }

    if (type == kTableFile) {
      // evict from cache
      table_cache_->Evict(number);
    }
    std::string fname = ((type == kLogFile) ? options_.wal_dir : dbname_) +
        "/" + to_delete;
    Log(options_.info_log,
        "Delete type=%d #%lu",
        int(type),
        (unsigned long)number);

    if (type == kLogFile &&
        (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0)) {
      Status s = env_->RenameFile(fname,
          ArchivedLogFileName(options_.wal_dir, number));
      if (!s.ok()) {
        Log(options_.info_log,
            "RenameFile logfile #%lu FAILED -- %s\n",
            (unsigned long)number, s.ToString().c_str());
      }
    } else {
      Status s = env_->DeleteFile(fname);
      if (!s.ok()) {
        Log(options_.info_log, "Delete type=%d #%lu FAILED -- %s\n",
            int(type), (unsigned long)number, s.ToString().c_str());
      }
    }
  }

  // Delete old info log files.
  size_t old_log_file_count = old_log_files.size();
  // NOTE: Currently we only support log purge when options_.db_log_dir is
  // located in `dbname` directory.
  if (old_log_file_count >= options_.keep_log_file_num &&
      options_.db_log_dir.empty()) {
    std::sort(old_log_files.begin(), old_log_files.end());
    size_t end = old_log_file_count - options_.keep_log_file_num;
    for (unsigned int i = 0; i <= end; i++) {
      std::string& to_delete = old_log_files.at(i);
      // Log(options_.info_log, "Delete type=%d %s\n",
      //     int(kInfoLogFile), to_delete.c_str());
      env_->DeleteFile(dbname_ + "/" + to_delete);
    }
  }
  PurgeObsoleteWALFiles();
  LogFlush(options_.info_log);
}

void DBImpl::DeleteObsoleteFiles() {
  mutex_.AssertHeld();
  DeletionState deletion_state;
  FindObsoleteFiles(deletion_state, true);
  PurgeObsoleteFiles(deletion_state);
}

// 1. Go through all archived files and
//    a. if ttl is enabled, delete outdated files
//    b. if archive size limit is enabled, delete empty files,
//        compute file number and size.
// 2. If size limit is enabled:
//    a. compute how many files should be deleted
//    b. get sorted non-empty archived logs
//    c. delete what should be deleted
void DBImpl::PurgeObsoleteWALFiles() {
  bool const ttl_enabled = options_.WAL_ttl_seconds > 0;
  bool const size_limit_enabled = options_.WAL_size_limit_MB > 0;
  if (!ttl_enabled && !size_limit_enabled) {
    return;
  }

  int64_t current_time;
  Status s = env_->GetCurrentTime(&current_time);
  if (!s.ok()) {
    Log(options_.info_log, "Can't get current time: %s", s.ToString().c_str());
    assert(false);
    return;
  }
  uint64_t const now_seconds = static_cast<uint64_t>(current_time);
  uint64_t const time_to_check = (ttl_enabled && !size_limit_enabled) ?
    options_.WAL_ttl_seconds / 2 : default_interval_to_delete_obsolete_WAL_;

  if (purge_wal_files_last_run_ + time_to_check > now_seconds) {
    return;
  }

  purge_wal_files_last_run_ = now_seconds;

  std::string archival_dir = ArchivalDirectory(options_.wal_dir);
  std::vector<std::string> files;
  s = env_->GetChildren(archival_dir, &files);
  if (!s.ok()) {
    Log(options_.info_log, "Can't get archive files: %s", s.ToString().c_str());
    assert(false);
    return;
  }

  size_t log_files_num = 0;
  uint64_t log_file_size = 0;

  for (auto& f : files) {
    uint64_t number;
    FileType type;
    if (ParseFileName(f, &number, &type) && type == kLogFile) {
      std::string const file_path = archival_dir + "/" + f;
      if (ttl_enabled) {
        uint64_t file_m_time;
        Status const s = env_->GetFileModificationTime(file_path,
          &file_m_time);
        if (!s.ok()) {
          Log(options_.info_log, "Can't get file mod time: %s: %s",
              file_path.c_str(), s.ToString().c_str());
          continue;
        }
        if (now_seconds - file_m_time > options_.WAL_ttl_seconds) {
          Status const s = env_->DeleteFile(file_path);
          if (!s.ok()) {
            Log(options_.info_log, "Can't delete file: %s: %s",
                file_path.c_str(), s.ToString().c_str());
            continue;
          }
          continue;
        }
      }

      if (size_limit_enabled) {
        uint64_t file_size;
        Status const s = env_->GetFileSize(file_path, &file_size);
        if (!s.ok()) {
          Log(options_.info_log, "Can't get file size: %s: %s",
              file_path.c_str(), s.ToString().c_str());
          return;
        } else {
          if (file_size > 0) {
            log_file_size = std::max(log_file_size, file_size);
            ++log_files_num;
          } else {
            Status s = env_->DeleteFile(file_path);
            if (!s.ok()) {
              Log(options_.info_log, "Can't delete file: %s: %s",
                  file_path.c_str(), s.ToString().c_str());
              continue;
            }
          }
        }
      }
    }
  }

  if (0 == log_files_num || !size_limit_enabled) {
    return;
  }

  size_t const files_keep_num = options_.WAL_size_limit_MB *
    1024 * 1024 / log_file_size;
  if (log_files_num <= files_keep_num) {
    return;
  }

  size_t files_del_num = log_files_num - files_keep_num;
  VectorLogPtr archived_logs;
  AppendSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile);

  if (files_del_num > archived_logs.size()) {
    Log(options_.info_log, "Trying to delete more archived log files than "
        "exist. Deleting all");
    files_del_num = archived_logs.size();
  }

  for (size_t i = 0; i < files_del_num; ++i) {
    std::string const file_path = archived_logs[i]->PathName();
    Status const s = DeleteFile(file_path);
    if (!s.ok()) {
      Log(options_.info_log, "Can't delete file: %s: %s",
          file_path.c_str(), s.ToString().c_str());
      continue;
    }
  }
}

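// Recover the database state from the MANIFEST and from any write-ahead logs
// that were not yet registered in it. When opened read_only, recovered data
// is kept in memory instead of being flushed to new level-0 files.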
870
Status DBImpl::Recover(bool read_only, bool error_if_log_file_exist) {
J
jorlow@chromium.org 已提交
871 872
  mutex_.AssertHeld();

873
  assert(db_lock_ == nullptr);
I
Igor Canadi 已提交
874
  if (!read_only) {
875 876 877 878 879 880 881 882 883 884 885 886
    // We call CreateDirIfMissing() as the directory may already exist (if we
    // are reopening a DB), when this happens we don't want creating the
    // directory to cause an error. However, we need to check if creating the
    // directory fails or else we may get an obscure message about the lock
    // file not existing. One real-world example of this occurring is if
    // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
    // when dbname_ is "dir/db" but when "dir" doesn't exist.
    Status s = env_->CreateDirIfMissing(dbname_);
    if (!s.ok()) {
      return s;
    }

887 888 889 890 891
    s = env_->NewDirectory(dbname_, &db_directory_);
    if (!s.ok()) {
      return s;
    }

892
    s = env_->LockFile(LockFileName(dbname_), &db_lock_);
893 894 895
    if (!s.ok()) {
      return s;
    }
J
jorlow@chromium.org 已提交
896

897 898
    if (!env_->FileExists(CurrentFileName(dbname_))) {
      if (options_.create_if_missing) {
899
        // TODO: add merge_operator name check
900 901 902 903 904 905 906
        s = NewDB();
        if (!s.ok()) {
          return s;
        }
      } else {
        return Status::InvalidArgument(
            dbname_, "does not exist (create_if_missing is false)");
J
jorlow@chromium.org 已提交
907 908
      }
    } else {
909 910 911 912
      if (options_.error_if_exists) {
        return Status::InvalidArgument(
            dbname_, "exists (error_if_exists is true)");
      }
J
jorlow@chromium.org 已提交
913
    }
M
Mayank Agarwal 已提交
914 915 916 917 918 919 920
    // Check for the IDENTITY file and create it if not there
    if (!env_->FileExists(IdentityFileName(dbname_))) {
      s = SetIdentityFile(env_, dbname_);
      if (!s.ok()) {
        return s;
      }
    }
J
jorlow@chromium.org 已提交
921 922
  }

923
  Status s = versions_->Recover();
J
jorlow@chromium.org 已提交
924 925
  if (s.ok()) {
    SequenceNumber max_sequence(0);
926 927 928 929 930 931 932

    // Recover from all newer log files than the ones named in the
    // descriptor (new log files may have been added by the previous
    // incarnation without registering them in the descriptor).
    //
    // Note that PrevLogNumber() is no longer used, but we pay
    // attention to it in case we are recovering a database
933
    // produced by an older version of rocksdb.
934 935 936
    const uint64_t min_log = versions_->LogNumber();
    const uint64_t prev_log = versions_->PrevLogNumber();
    std::vector<std::string> filenames;
937
    s = env_->GetChildren(options_.wal_dir, &filenames);
938 939
    if (!s.ok()) {
      return s;
940
    }
K
kailiu 已提交
941

942 943
    std::vector<uint64_t> logs;
    for (size_t i = 0; i < filenames.size(); i++) {
K
kailiu 已提交
944 945
      uint64_t number;
      FileType type;
946 947 948 949 950
      if (ParseFileName(filenames[i], &number, &type)
          && type == kLogFile
          && ((number >= min_log) || (number == prev_log))) {
        logs.push_back(number);
      }
J
jorlow@chromium.org 已提交
951
    }
952

H
heyongqiang 已提交
953 954 955 956 957 958
    if (logs.size() > 0 && error_if_log_file_exist) {
      return Status::Corruption(""
          "The db was opened in readonly mode with error_if_log_file_exist"
          "flag but a log file already exists");
    }

959 960
    // Recover in the order in which the logs were generated
    std::sort(logs.begin(), logs.end());
K
kailiu 已提交
961
    for (const auto& log : logs) {
962 963 964
      // The previous incarnation may not have written any MANIFEST
      // records after allocating this log number.  So we manually
      // update the file number allocation counter in VersionSet.
K
kailiu 已提交
965
      versions_->MarkFileNumberUsed(log);
K
Kai Liu 已提交
966
      s = RecoverLogFile(log, &max_sequence, read_only);
967 968
    }

J
jorlow@chromium.org 已提交
969
    if (s.ok()) {
970 971 972
      if (versions_->LastSequence() < max_sequence) {
        versions_->SetLastSequence(max_sequence);
      }
973
      SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER,
974
                     versions_->LastSequence());
J
jorlow@chromium.org 已提交
975 976 977 978 979 980
    }
  }

  return s;
}

I
Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
                              bool read_only) {
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;  // nullptr if options_.paranoid_checks==false or
                     //            options_.skip_log_error_on_recovery==true
    virtual void Corruption(size_t bytes, const Status& s) {
      Log(info_log, "%s%s: dropping %d bytes; %s",
          (this->status == nullptr ? "(ignoring error) " : ""),
          fname, static_cast<int>(bytes), s.ToString().c_str());
      if (this->status != nullptr && this->status->ok()) *this->status = s;
    }
  };

  mutex_.AssertHeld();

  VersionEdit edit;

  // Open the log file
  std::string fname = LogFileName(options_.wal_dir, log_number);
  unique_ptr<SequentialFile> file;
  Status status = env_->NewSequentialFile(fname, &file, storage_options_);
  if (!status.ok()) {
    MaybeIgnoreError(&status);
    return status;
  }

  // Create the log reader.
  LogReporter reporter;
  reporter.env = env_;
  reporter.info_log = options_.info_log.get();
  reporter.fname = fname.c_str();
  reporter.status = (options_.paranoid_checks &&
                     !options_.skip_log_error_on_recovery ? &status : nullptr);
  // We intentionally make log::Reader do checksumming even if
  // paranoid_checks==false so that corruptions cause entire commits
  // to be skipped instead of propagating bad information (like overly
  // large sequence numbers).
  log::Reader reader(std::move(file), &reporter, true/*checksum*/,
                     0/*initial_offset*/);
  Log(options_.info_log, "Recovering log #%lu",
      (unsigned long) log_number);

  // Read all the records and add to a memtable
  std::string scratch;
  Slice record;
  WriteBatch batch;
  bool memtable_empty = true;
  while (reader.ReadRecord(&record, &scratch)) {
    if (record.size() < 12) {
      reporter.Corruption(
          record.size(), Status::Corruption("log record too small"));
      continue;
    }
    WriteBatchInternal::SetContents(&batch, record);

    status = WriteBatchInternal::InsertInto(&batch, mem_, &options_);
    memtable_empty = false;
    MaybeIgnoreError(&status);
    if (!status.ok()) {
      return status;
    }
    const SequenceNumber last_seq =
        WriteBatchInternal::Sequence(&batch) +
        WriteBatchInternal::Count(&batch) - 1;
    if (last_seq > *max_sequence) {
      *max_sequence = last_seq;
    }

    if (!read_only &&
        mem_->ApproximateMemoryUsage() > options_.write_buffer_size) {
      status = WriteLevel0TableForRecovery(mem_, &edit);
      // we still want to clear memtable, even if the recovery failed
      delete mem_->Unref();
      mem_ = new MemTable(internal_comparator_, options_);
      mem_->Ref();
      memtable_empty = true;
      if (!status.ok()) {
        // Reflect errors immediately so that conditions like full
        // file-systems cause the DB::Open() to fail.
        return status;
      }
    }
  }

  if (!memtable_empty && !read_only) {
    status = WriteLevel0TableForRecovery(mem_, &edit);
    delete mem_->Unref();
    mem_ = new MemTable(internal_comparator_, options_);
    mem_->Ref();
    if (!status.ok()) {
      return status;
    }
  }

  if (edit.NumEntries() > 0) {
    // if read_only, NumEntries() will be 0
    assert(!read_only);
    // writing log number in the manifest means that any log file
    // with number strictly less than (log_number + 1) is already
    // recovered and should be ignored on next reincarnation.
    // Since we already recovered log_number, we want all logs
    // with numbers `<= log_number` (includes this one) to be ignored
    edit.SetLogNumber(log_number + 1);
    // we must mark the next log number as used, even though it's
    // not actually used. that is because VersionSet assumes
    // VersionSet::next_file_number_ always to be strictly greater than any log
    // number
    versions_->MarkFileNumberUsed(log_number + 1);
    status = versions_->LogAndApply(&edit, &mutex_);
  }

  return status;
}

Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) {
J
jorlow@chromium.org 已提交
1099
  mutex_.AssertHeld();
1100
  const uint64_t start_micros = env_->NowMicros();
J
jorlow@chromium.org 已提交
1101 1102 1103 1104
  FileMetaData meta;
  meta.number = versions_->NewFileNumber();
  pending_outputs_.insert(meta.number);
  Iterator* iter = mem->NewIterator();
1105 1106 1107
  const SequenceNumber newest_snapshot = snapshots_.GetNewest();
  const SequenceNumber earliest_seqno_in_memtable =
    mem->GetFirstSequenceNumber();
K
Kai Liu 已提交
1108 1109
  Log(options_.info_log, "Level-0 table #%lu: started",
      (unsigned long) meta.number);
1110 1111 1112 1113

  Status s;
  {
    mutex_.Unlock();
1114
    s = BuildTable(dbname_, env_, options_, storage_options_,
1115 1116
                   table_cache_.get(), iter, &meta, internal_comparator_,
                   newest_snapshot, earliest_seqno_in_memtable,
1117
                   GetCompressionFlush(options_));
I
Igor Canadi 已提交
1118
    LogFlush(options_.info_log);
1119 1120 1121
    mutex_.Lock();
  }

K
Kai Liu 已提交
1122 1123 1124
  Log(options_.info_log, "Level-0 table #%lu: %lu bytes %s",
      (unsigned long) meta.number,
      (unsigned long) meta.file_size,
J
jorlow@chromium.org 已提交
1125 1126
      s.ToString().c_str());
  delete iter;
1127

1128
  pending_outputs_.erase(meta.number);
1129 1130 1131 1132 1133 1134

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  int level = 0;
  if (s.ok() && meta.file_size > 0) {
    edit->AddFile(level, meta.number, meta.file_size,
1135 1136
                  meta.smallest, meta.largest,
                  meta.smallest_seqno, meta.largest_seqno);
1137 1138
  }

I
Igor Canadi 已提交
1139
  InternalStats::CompactionStats stats;
1140 1141
  stats.micros = env_->NowMicros() - start_micros;
  stats.bytes_written = meta.file_size;
M
Mark Callaghan 已提交
1142
  stats.files_out_levelnp1 = 1;
I
Igor Canadi 已提交
1143
  internal_stats_.AddCompactionStats(level, stats);
1144
  RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, meta.file_size);
J
jorlow@chromium.org 已提交
1145 1146 1147
  return s;
}

1148

K
Kai Liu 已提交
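// Build a single table file from the given immutable memtables and record it
// in *edit. The output normally goes to level 0, but may be pushed to a
// higher level when it is safe to do so (see PickLevelForMemTableOutput).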
1149
Status DBImpl::WriteLevel0Table(autovector<MemTable*>& mems, VersionEdit* edit,
1150
                                uint64_t* filenumber) {
J
jorlow@chromium.org 已提交
1151
  mutex_.AssertHeld();
1152 1153 1154 1155 1156
  const uint64_t start_micros = env_->NowMicros();
  FileMetaData meta;
  meta.number = versions_->NewFileNumber();
  *filenumber = meta.number;
  pending_outputs_.insert(meta.number);
1157

1158 1159
  const SequenceNumber newest_snapshot = snapshots_.GetNewest();
  const SequenceNumber earliest_seqno_in_memtable =
1160
    mems[0]->GetFirstSequenceNumber();
1161
  Version* base = versions_->current();
1162
  base->Ref();          // it is likely that we do not need this reference
1163 1164 1165
  Status s;
  {
    mutex_.Unlock();
K
Kai Liu 已提交
1166
    std::vector<Iterator*> memtables;
1167 1168 1169 1170
    for (MemTable* m : mems) {
      Log(options_.info_log,
          "Flushing memtable with log file: %lu\n",
          (unsigned long)m->GetLogNumber());
K
Kai Liu 已提交
1171
      memtables.push_back(m->NewIterator());
1172
    }
K
Kai Liu 已提交
1173 1174
    Iterator* iter = NewMergingIterator(
        env_, &internal_comparator_, &memtables[0], memtables.size());
1175 1176 1177
    Log(options_.info_log,
        "Level-0 flush table #%lu: started",
        (unsigned long)meta.number);
1178

1179
    s = BuildTable(dbname_, env_, options_, storage_options_,
1180 1181 1182
                   table_cache_.get(), iter, &meta, internal_comparator_,
                   newest_snapshot, earliest_seqno_in_memtable,
                   GetCompressionFlush(options_));
I
Igor Canadi 已提交
1183
    LogFlush(options_.info_log);
1184 1185 1186 1187 1188
    delete iter;
    Log(options_.info_log, "Level-0 flush table #%lu: %lu bytes %s",
        (unsigned long) meta.number,
        (unsigned long) meta.file_size,
        s.ToString().c_str());
1189 1190 1191
    if (!options_.disableDataSync) {
      db_directory_->Fsync();
    }
1192 1193
    mutex_.Lock();
  }
1194 1195
  base->Unref();

1196 1197 1198 1199 1200 1201 1202 1203
  // re-acquire the most current version
  base = versions_->current();

  // There could be multiple threads writing to its own level-0 file.
  // The pending_outputs cannot be cleared here, otherwise this newly
  // created file might not be considered as a live-file by another
  // compaction thread that is concurrently deleting obselete files.
  // The pending_outputs can be cleared only after the new version is
A
Abhishek Kona 已提交
1204
  // committed so that other threads can recognize this file as a
1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217
  // valid one.
  // pending_outputs_.erase(meta.number);

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  int level = 0;
  if (s.ok() && meta.file_size > 0) {
    const Slice min_user_key = meta.smallest.user_key();
    const Slice max_user_key = meta.largest.user_key();
    // if we have more than 1 background thread, then we cannot
    // insert files directly into higher levels because some other
    // threads could be concurrently producing compacted files for
    // that key range.
1218
    if (base != nullptr && options_.max_background_compactions <= 1 &&
1219
        options_.compaction_style == kCompactionStyleLevel) {
1220 1221 1222
      level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
    }
    edit->AddFile(level, meta.number, meta.file_size,
1223 1224
                  meta.smallest, meta.largest,
                  meta.smallest_seqno, meta.largest_seqno);
1225 1226
  }

I
Igor Canadi 已提交
1227
  InternalStats::CompactionStats stats;
1228 1229
  stats.micros = env_->NowMicros() - start_micros;
  stats.bytes_written = meta.file_size;
I
Igor Canadi 已提交
1230
  internal_stats_.AddCompactionStats(level, stats);
1231
  RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES, meta.file_size);
1232 1233 1234
  return s;
}

I
Igor Canadi 已提交
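// Flush the earliest immutable memtable(s) to a table file, install the
// result in the current version and, unless file deletions are disabled,
// queue the corresponding WAL files for deletion.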
1235 1236
Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
                                         DeletionState& deletion_state) {
1237 1238
  mutex_.AssertHeld();
  assert(imm_.size() != 0);
L
Lei Jin 已提交
1239
  assert(imm_.IsFlushPending());
1240 1241 1242

  // Save the contents of the earliest memtable as a new Table
  uint64_t file_number;
K
Kai Liu 已提交
1243
  autovector<MemTable*> mems;
1244 1245
  imm_.PickMemtablesToFlush(&mems);
  if (mems.empty()) {
1246
    Log(options_.info_log, "Nothing in memstore to flush");
L
Lei Jin 已提交
1247
    return Status::OK();
1248 1249 1250
  }

  // record the logfile_number_ before we release the mutex
1251 1252 1253
  // entries mems are (implicitly) sorted in ascending order by their created
  // time. We will use the first memtable's `edit` to keep the meta info for
  // this flush.
1254
  MemTable* m = mems[0];
1255 1256
  VersionEdit* edit = m->GetEdits();
  edit->SetPrevLogNumber(0);
1257 1258 1259 1260 1261 1262 1263 1264 1265 1266
  // SetLogNumber(log_num) indicates logs with number smaller than log_num
  // will no longer be picked up for recovery.
  edit->SetLogNumber(
      mems.back()->GetNextLogNumber()
  );

  std::vector<uint64_t> logs_to_delete;
  for (auto mem : mems) {
    logs_to_delete.push_back(mem->GetLogNumber());
  }
1267

1268
  // This will release and re-acquire the mutex.
1269
  Status s = WriteLevel0Table(mems, edit, &file_number);
1270

1271
  if (s.ok() && shutting_down_.Acquire_Load()) {
L
Lei Jin 已提交
1272
    s = Status::ShutdownInProgress(
1273 1274
      "Database shutdown started during memtable compaction"
    );
1275
  }
J
jorlow@chromium.org 已提交
1276

L
Lei Jin 已提交
1277 1278 1279 1280 1281
  if (!s.ok()) {
    imm_.RollbackMemtableFlush(mems, file_number, &pending_outputs_);
    return s;
  }

1282
  // Replace immutable memtable with the generated Table
1283
  s = imm_.InstallMemtableFlushResults(
L
Lei Jin 已提交
1284
      mems, versions_.get(), &mutex_, options_.info_log.get(), file_number,
1285
      pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get());
J
jorlow@chromium.org 已提交
1286 1287

  if (s.ok()) {
I
Igor Canadi 已提交
1288
    InstallSuperVersion(deletion_state);
1289 1290 1291
    if (madeProgress) {
      *madeProgress = 1;
    }
1292

1293
    MaybeScheduleLogDBDeployStats();
I
Igor Canadi 已提交
1294

1295
    if (disable_delete_obsolete_files_ == 0) {
I
Igor Canadi 已提交
1296
      // add to deletion state
1297 1298 1299 1300
      deletion_state.log_delete_files.insert(
          deletion_state.log_delete_files.end(),
          logs_to_delete.begin(),
          logs_to_delete.end());
1301
    }
J
jorlow@chromium.org 已提交
1302 1303 1304 1305
  }
  return s;
}

L
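// Manual compaction entry point: flush the memtable, then compact every level
// that overlaps the requested range, top down; optionally refit the result
// into a smaller level afterwards (reduce_level/target_level).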
Status DBImpl::CompactRange(const Slice* begin,
                            const Slice* end,
                            bool reduce_level,
                            int target_level) {
  Status s = FlushMemTable(FlushOptions());
  if (!s.ok()) {
    LogFlush(options_.info_log);
    return s;
  }

  int max_level_with_files = 1;
  {
    MutexLock l(&mutex_);
    Version* base = versions_->current();
    for (int level = 1; level < NumberLevels(); level++) {
      if (base->OverlapInLevel(level, begin, end)) {
        max_level_with_files = level;
      }
    }
  }
  for (int level = 0; level <= max_level_with_files; level++) {
    // in case the compaction is universal or if we're compacting the
    // bottom-most level, the output level will be the same as the input one
    if (options_.compaction_style == kCompactionStyleUniversal ||
        level == max_level_with_files) {
      s = RunManualCompaction(level, level, begin, end);
    } else {
      s = RunManualCompaction(level, level + 1, begin, end);
    }
    if (!s.ok()) {
      LogFlush(options_.info_log);
      return s;
    }
  }

  if (reduce_level) {
    s = ReFitLevel(max_level_with_files, target_level);
  }
  LogFlush(options_.info_log);

  return s;
}

// return the same level if it cannot be moved
int DBImpl::FindMinimumEmptyLevelFitting(int level) {
  mutex_.AssertHeld();
  Version* current = versions_->current();
  int minimum_level = level;
  for (int i = level - 1; i > 0; --i) {
    // stop if level i is not empty
    if (current->NumLevelFiles(i) > 0) break;
    // stop if level i is too small (cannot fit the level files)
    if (versions_->MaxBytesForLevel(i) < current->NumLevelBytes(level)) break;

    minimum_level = i;
  }
  return minimum_level;
}

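// Move every file in 'level' to 'target_level' (or, when target_level < 0, to
// the smallest-numbered empty level that can hold them) via a single version
// edit, with background compactions and flushes paused while the edit is
// applied.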
Lei Jin 已提交
1365
Status DBImpl::ReFitLevel(int level, int target_level) {
1366 1367
  assert(level < NumberLevels());

I
Igor Canadi 已提交
1368
  SuperVersion* superversion_to_free = nullptr;
K
Kai Liu 已提交
1369
  SuperVersion* new_superversion = new SuperVersion();
I
Igor Canadi 已提交
1370 1371

  mutex_.Lock();
1372 1373 1374

  // only allow one thread refitting
  if (refitting_level_) {
I
Igor Canadi 已提交
1375
    mutex_.Unlock();
1376
    Log(options_.info_log, "ReFitLevel: another thread is refitting");
I
Igor Canadi 已提交
1377
    delete new_superversion;
L
Lei Jin 已提交
1378
    return Status::NotSupported("another thread is refitting");
1379 1380 1381 1382 1383
  }
  refitting_level_ = true;

  // wait for all background threads to stop
  bg_work_gate_closed_ = true;
1384
  while (bg_compaction_scheduled_ > 0 || bg_flush_scheduled_) {
1385
    Log(options_.info_log,
1386 1387
        "RefitLevel: waiting for background threads to stop: %d %d",
        bg_compaction_scheduled_, bg_flush_scheduled_);
1388 1389 1390 1391
    bg_cv_.Wait();
  }

  // move to a smaller level
1392 1393 1394 1395
  int to_level = target_level;
  if (target_level < 0) {
    to_level = FindMinimumEmptyLevelFitting(level);
  }
1396 1397 1398

  assert(to_level <= level);

L
Lei Jin 已提交
1399
  Status status;
1400 1401 1402 1403
  if (to_level < level) {
    Log(options_.info_log, "Before refitting:\n%s",
        versions_->current()->DebugString().data());

1404
    VersionEdit edit;
1405 1406
    for (const auto& f : versions_->current()->files_[level]) {
      edit.DeleteFile(level, f->number);
1407 1408
      edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest,
                   f->smallest_seqno, f->largest_seqno);
    }
    Log(options_.info_log, "Apply version edit:\n%s",
        edit.DebugString().data());

    status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get());
    superversion_to_free = InstallSuperVersion(new_superversion);
    new_superversion = nullptr;

    Log(options_.info_log, "LogAndApply: %s\n", status.ToString().data());

    if (status.ok()) {
      Log(options_.info_log, "After refitting:\n%s",
          versions_->current()->DebugString().data());
    }
  }

  refitting_level_ = false;
  bg_work_gate_closed_ = false;

  mutex_.Unlock();
  delete superversion_to_free;
  delete new_superversion;
  return status;
}

int DBImpl::NumberLevels() {
  return options_.num_levels;
}

int DBImpl::MaxMemCompactionLevel() {
  return options_.max_mem_compaction_level;
}

int DBImpl::Level0StopWriteTrigger() {
  return options_.level0_stop_writes_trigger;
}

uint64_t DBImpl::CurrentVersionNumber() const {
  return super_version_number_.load();
}

Status DBImpl::Flush(const FlushOptions& options) {
  return FlushMemTable(options);
}

SequenceNumber DBImpl::GetLatestSequenceNumber() const {
  return versions_->LastSequence();
}

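// Returns a TransactionLogIterator positioned at the WAL file that may
// contain 'seq'. Fails with NotFound if 'seq' is newer than the last
// sequence number in the database.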
Status DBImpl::GetUpdatesSince(SequenceNumber seq,
                               unique_ptr<TransactionLogIterator>* iter) {

  RecordTick(options_.statistics.get(), GET_UPDATES_SINCE_CALLS);
  if (seq > versions_->LastSequence()) {
    return Status::NotFound(
        "Requested sequence not yet written in the db");
  }
  //  Get all sorted Wal Files.
  //  Do binary search and open files and find the seq number.

  std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr);
  Status s = GetSortedWalFiles(*wal_files);
  if (!s.ok()) {
    return s;
  }

  s = RetainProbableWalFiles(*wal_files, seq);
  if (!s.ok()) {
    return s;
  }
  iter->reset(
    new TransactionLogIteratorImpl(options_.wal_dir,
                                   &options_,
                                   storage_options_,
                                   seq,
                                   std::move(wal_files),
                                   this));
  return (*iter)->status();
}

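// Trims 'all_logs' so that only files that may contain sequence numbers
// >= 'target' remain: a binary search over each file's start sequence keeps
// the last file starting at or before 'target' and every file after it.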
Status DBImpl::RetainProbableWalFiles(VectorLogPtr& all_logs,
                                      const SequenceNumber target) {
  long start = 0; // signed to avoid overflow when target is < first file.
  long end = static_cast<long>(all_logs.size()) - 1;
  // Binary Search. avoid opening all files.
  while (end >= start) {
    long mid = start + (end - start) / 2;  // Avoid overflow.
    SequenceNumber current_seq_num = all_logs.at(mid)->StartSequence();
    if (current_seq_num == target) {
      end = mid;
      break;
    } else if (current_seq_num < target) {
      start = mid + 1;
    } else {
      end = mid - 1;
    }
  }
  size_t start_index = std::max(0l, end); // end could be negative.
  // The last wal file is always included
  all_logs.erase(all_logs.begin(), all_logs.begin() + start_index);
  return Status::OK();
}

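// Returns true only if the WAL file (live or archived) exists and has size 0.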
bool DBImpl::CheckWalFileExistsAndEmpty(const WalFileType type,
                                        const uint64_t number) {
  const std::string fname = (type == kAliveLogFile) ?
    LogFileName(options_.wal_dir, number) :
    ArchivedLogFileName(options_.wal_dir, number);
  uint64_t file_size;
  Status s = env_->GetFileSize(fname, &file_size);
  return (s.ok() && (file_size == 0));
}

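// Reads the first record of the given WAL file into 'result'. For a live
// file that has already been moved to the archive, the archived copy is
// tried before reporting NotFound.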
Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number,
                               WriteBatch* const result) {

  if (type == kAliveLogFile) {
    std::string fname = LogFileName(options_.wal_dir, number);
    Status status = ReadFirstLine(fname, result);
    if (status.ok() || env_->FileExists(fname)) {
      // return OK or any error that is not caused by a non-existing file
      return status;
    }

    //  check if the file got moved to archive.
    std::string archived_file =
      ArchivedLogFileName(options_.wal_dir, number);
    Status s = ReadFirstLine(archived_file, result);
    if (s.ok() || env_->FileExists(archived_file)) {
      return s;
    }
    return Status::NotFound("Log File has been deleted: " + archived_file);
  } else if (type == kArchivedLogFile) {
    std::string fname = ArchivedLogFileName(options_.wal_dir, number);
    Status status = ReadFirstLine(fname, result);
    return status;
  }
  return Status::NotSupported("File Type Not Known: " + std::to_string(type));
}

Status DBImpl::ReadFirstLine(const std::string& fname,
                             WriteBatch* const batch) {
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;

    Status* status;
    bool ignore_error;  // true if options_.paranoid_checks==false
    virtual void Corruption(size_t bytes, const Status& s) {
      Log(info_log, "%s%s: dropping %d bytes; %s",
          (this->ignore_error ? "(ignoring error) " : ""),
          fname, static_cast<int>(bytes), s.ToString().c_str());
      if (this->status->ok()) {
        // only keep the first error
        *this->status = s;
      }
    }
  };

  unique_ptr<SequentialFile> file;
  Status status = env_->NewSequentialFile(fname, &file, storage_options_);

  if (!status.ok()) {
    return status;
  }


  LogReporter reporter;
  reporter.env = env_;
  reporter.info_log = options_.info_log.get();
  reporter.fname = fname.c_str();
  reporter.status = &status;
  reporter.ignore_error = !options_.paranoid_checks;
  log::Reader reader(std::move(file), &reporter, true/*checksum*/,
                     0/*initial_offset*/);
  std::string scratch;
  Slice record;

  if (reader.ReadRecord(&record, &scratch) &&
      (status.ok() || !options_.paranoid_checks)) {
    if (record.size() < 12) {
      reporter.Corruption(
          record.size(), Status::Corruption("log record too small"));
      //  TODO: read records till the first non-corrupt entry?
    } else {
      WriteBatchInternal::SetContents(batch, record);
      return Status::OK();
    }
  }

  // ReadRecord returns false on EOF, which is deemed as OK() by Reader
  if (status.ok()) {
    status = Status::Corruption("eof reached");
  }
  return status;
}

struct CompareLogByPointer {
  bool operator() (const unique_ptr<LogFile>& a,
                   const unique_ptr<LogFile>& b) {
    LogFileImpl* a_impl = dynamic_cast<LogFileImpl*>(a.get());
    LogFileImpl* b_impl = dynamic_cast<LogFileImpl*>(b.get());
    return *a_impl < *b_impl;
  }
};

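// Scans 'path' for WAL files of 'log_type', reads the first record of each
// file to determine its start sequence, skips files that exist but are
// empty, and appends the results to 'log_files', sorting the newly added
// entries.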
Status DBImpl::AppendSortedWalsOfType(const std::string& path,
    VectorLogPtr& log_files, WalFileType log_type) {
  std::vector<std::string> all_files;
  const Status status = env_->GetChildren(path, &all_files);
  if (!status.ok()) {
    return status;
  }
  log_files.reserve(log_files.size() + all_files.size());
  VectorLogPtr::iterator pos_start;
  if (!log_files.empty()) {
    pos_start = log_files.end() - 1;
  } else {
    pos_start = log_files.begin();
  }
  for (const auto& f : all_files) {
    uint64_t number;
    FileType type;
    if (ParseFileName(f, &number, &type) && type == kLogFile) {

      WriteBatch batch;
      Status s = ReadFirstRecord(log_type, number, &batch);
      if (!s.ok()) {
        if (CheckWalFileExistsAndEmpty(log_type, number)) {
          continue;
        }
        return s;
      }

      uint64_t size_bytes;
      s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
      if (!s.ok()) {
        return s;
      }

      log_files.push_back(std::move(unique_ptr<LogFile>(new LogFileImpl(
        number, log_type, WriteBatchInternal::Sequence(&batch), size_bytes))));
    }
  }
  CompareLogByPointer compare_log_files;
  std::sort(pos_start, log_files.end(), compare_log_files);
  return status;
}

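// Runs a manual compaction of the key range [begin, end] from 'input_level'
// into 'output_level', blocking until it finishes, the DB starts shutting
// down, or a background error is set.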
Status DBImpl::RunManualCompaction(int input_level,
                                   int output_level,
                                   const Slice* begin,
                                   const Slice* end) {
  assert(input_level >= 0);

  InternalKey begin_storage, end_storage;

  ManualCompaction manual;
  manual.input_level = input_level;
  manual.output_level = output_level;
  manual.done = false;
  manual.in_progress = false;
  // For universal compaction, we enforce every manual compaction to compact
  // all files.
  if (begin == nullptr ||
      options_.compaction_style == kCompactionStyleUniversal) {
    manual.begin = nullptr;
  } else {
    begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
    manual.begin = &begin_storage;
  }
  if (end == nullptr ||
      options_.compaction_style == kCompactionStyleUniversal) {
    manual.end = nullptr;
  } else {
    end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
    manual.end = &end_storage;
  }

  MutexLock l(&mutex_);

  // When a manual compaction arrives, temporarily disable scheduling of
  // non-manual compactions and wait until the number of scheduled compaction
  // jobs drops to zero. This is needed to ensure that this manual compaction
  // can compact any range of keys/files.
  //
  // bg_manual_only_ is non-zero when at least one thread is inside
  // RunManualCompaction(), i.e. during that time no other compaction will
  // get scheduled (see MaybeScheduleFlushOrCompaction).
  //
  // Note that the following loop doesn't stop more than one thread calling
  // RunManualCompaction() from getting to the second while loop below.
  // However, only one of them will actually schedule compaction, while
  // others will wait on a condition variable until it completes.

  ++bg_manual_only_;
  while (bg_compaction_scheduled_ > 0) {
    Log(options_.info_log,
        "Manual compaction waiting for all other scheduled background "
        "compactions to finish");
    bg_cv_.Wait();
  }

  Log(options_.info_log, "Manual compaction starting");

  while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) {
    assert(bg_manual_only_ > 0);
    if (manual_compaction_ != nullptr) {
      // Running either this or some other manual compaction
      bg_cv_.Wait();
    } else {
      manual_compaction_ = &manual;
      MaybeScheduleFlushOrCompaction();
    }
  }

  assert(!manual.in_progress);
  assert(bg_manual_only_ > 0);
  --bg_manual_only_;
  return manual.status;
}

Status DBImpl::TEST_CompactRange(int level,
                                 const Slice* begin,
                                 const Slice* end) {
  int output_level = (options_.compaction_style == kCompactionStyleUniversal)
                         ? level
                         : level + 1;
  return RunManualCompaction(level, output_level, begin, end);
}

Status DBImpl::FlushMemTable(const FlushOptions& options) {
  // nullptr batch means just wait for earlier writes to be done
  Status s = Write(WriteOptions(), nullptr);
  if (s.ok() && options.wait) {
    // Wait until the flush completes
    s = WaitForFlushMemTable();
  }
  return s;
}

Status DBImpl::WaitForFlushMemTable() {
  Status s;
  // Wait until the memtable flush completes
  MutexLock l(&mutex_);
  while (imm_.size() > 0 && bg_error_.ok()) {
    bg_cv_.Wait();
  }
  if (imm_.size() != 0) {
    s = bg_error_;
  }
  return s;
}

Status DBImpl::TEST_FlushMemTable() {
  return FlushMemTable(FlushOptions());
}

Status DBImpl::TEST_WaitForFlushMemTable() {
  return WaitForFlushMemTable();
}

Status DBImpl::TEST_WaitForCompact() {
  // Wait until the compaction completes

  // TODO: a bug here. This function actually does not necessarily
  // wait for a compaction. It waits for any scheduled compaction
  // OR flush to finish.

  MutexLock l(&mutex_);
  while ((bg_compaction_scheduled_ || bg_flush_scheduled_) &&
         bg_error_.ok()) {
    bg_cv_.Wait();
  }
  return bg_error_;
}

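// Must be called while holding mutex_. Schedules a memtable flush on the
// HIGH priority thread pool when one is pending, and a compaction on the
// LOW priority pool when one is needed (or when a flush is pending but the
// HIGH pool is disabled), subject to the configured background limits and
// the manual-compaction gate.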
void DBImpl::MaybeScheduleFlushOrCompaction() {
  mutex_.AssertHeld();
  if (bg_work_gate_closed_) {
    // gate closed for background work
  } else if (shutting_down_.Acquire_Load()) {
    // DB is being deleted; no more background compactions
  } else {
    bool is_flush_pending = imm_.IsFlushPending();
    if (is_flush_pending &&
        (bg_flush_scheduled_ < options_.max_background_flushes)) {
      // memtable flush needed
      bg_flush_scheduled_++;
      env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH);
    }

    // Schedule BGWorkCompaction if there's a compaction pending (or a memtable
    // flush, but the HIGH pool is not enabled). Do it only if
    // max_background_compactions hasn't been reached and, in case
    // bg_manual_only_ > 0, if it's a manual compaction.
    if ((manual_compaction_ ||
         versions_->current()->NeedsCompaction() ||
         (is_flush_pending && (options_.max_background_flushes <= 0))) &&
        bg_compaction_scheduled_ < options_.max_background_compactions &&
        (!bg_manual_only_ || manual_compaction_)) {

      bg_compaction_scheduled_++;
      env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW);
    }
  }
}

void DBImpl::BGWorkFlush(void* db) {
  reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush();
}

void DBImpl::BGWorkCompaction(void* db) {
  reinterpret_cast<DBImpl*>(db)->BackgroundCallCompaction();
}

Status DBImpl::BackgroundFlush(bool* madeProgress,
                               DeletionState& deletion_state) {
  Status stat;
  while (stat.ok() && imm_.IsFlushPending()) {
    Log(options_.info_log,
        "BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d",
        options_.max_background_flushes - bg_flush_scheduled_);
    stat = FlushMemTableToOutputFile(madeProgress, deletion_state);
  }
  return stat;
}

void DBImpl::BackgroundCallFlush() {
  bool madeProgress = false;
  DeletionState deletion_state(true);
  assert(bg_flush_scheduled_);
  MutexLock l(&mutex_);

  Status s;
  if (!shutting_down_.Acquire_Load()) {
    s = BackgroundFlush(&madeProgress, deletion_state);
    if (!s.ok()) {
      // Wait a little bit before retrying background flush in
      // case this is an environmental problem and we do not want to
      // chew up resources for failed flushes for the duration of
      // the problem.
      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
      Log(options_.info_log, "Waiting after background flush error: %s",
          s.ToString().c_str());
      mutex_.Unlock();
      LogFlush(options_.info_log);
      env_->SleepForMicroseconds(1000000);
      mutex_.Lock();
    }
  }

  // If !s.ok(), this means that Flush failed. In that case, we want
  // to delete all obsolete files and we force FindObsoleteFiles()
  FindObsoleteFiles(deletion_state, !s.ok());
  // delete unnecessary files if any, this is done outside the mutex
  if (deletion_state.HaveSomethingToDelete()) {
    mutex_.Unlock();
    PurgeObsoleteFiles(deletion_state);
    mutex_.Lock();
  }

  bg_flush_scheduled_--;
  if (madeProgress) {
    MaybeScheduleFlushOrCompaction();
  }
  bg_cv_.SignalAll();
}

void DBImpl::TEST_PurgeObsoleteteWAL() {
  PurgeObsoleteWALFiles();
}

uint64_t DBImpl::TEST_GetLevel0TotalSize() {
  MutexLock l(&mutex_);
  return versions_->current()->NumLevelBytes(0);
}

void DBImpl::BackgroundCallCompaction() {
  bool madeProgress = false;
  DeletionState deletion_state(true);

  MaybeDumpStats();

  MutexLock l(&mutex_);
  // Log(options_.info_log, "XXX BG Thread %llx process new work item", pthread_self());
  assert(bg_compaction_scheduled_);
  Status s;
  if (!shutting_down_.Acquire_Load()) {
    s = BackgroundCompaction(&madeProgress, deletion_state);
    if (!s.ok()) {
      // Wait a little bit before retrying background compaction in
      // case this is an environmental problem and we do not want to
      // chew up resources for failed compactions for the duration of
      // the problem.
      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
      Log(options_.info_log, "Waiting after background compaction error: %s",
          s.ToString().c_str());
      mutex_.Unlock();
      LogFlush(options_.info_log);
      env_->SleepForMicroseconds(1000000);
      mutex_.Lock();
    }
  }

  // If !s.ok(), this means that Compaction failed. In that case, we want
  // to delete all obsolete files we might have created and we force
  // FindObsoleteFiles(). This is because deletion_state does not catch
  // all created files if compaction failed.
  FindObsoleteFiles(deletion_state, !s.ok());

  // delete unnecessary files if any, this is done outside the mutex
  if (deletion_state.HaveSomethingToDelete()) {
    mutex_.Unlock();
    PurgeObsoleteFiles(deletion_state);
    mutex_.Lock();
  }

  bg_compaction_scheduled_--;

  MaybeScheduleLogDBDeployStats();

  // Previous compaction may have produced too many files in a level,
  // so reschedule another compaction if we made progress in the
  // last compaction.
  if (madeProgress) {
    MaybeScheduleFlushOrCompaction();
  }
  bg_cv_.SignalAll();

}

Status DBImpl::BackgroundCompaction(bool* madeProgress,
                                    DeletionState& deletion_state) {
  *madeProgress = false;
  mutex_.AssertHeld();

  bool is_manual = (manual_compaction_ != nullptr) &&
                   (manual_compaction_->in_progress == false);
  if (is_manual) {
    // another thread cannot pick up the same work
    manual_compaction_->in_progress = true;
  }

  // TODO: remove memtable flush from formal compaction
  while (imm_.IsFlushPending()) {
    Log(options_.info_log,
        "BackgroundCompaction doing FlushMemTableToOutputFile, compaction slots "
        "available %d",
        options_.max_background_compactions - bg_compaction_scheduled_);
    Status stat = FlushMemTableToOutputFile(madeProgress, deletion_state);
    if (!stat.ok()) {
      if (is_manual) {
        manual_compaction_->status = stat;
        manual_compaction_->done = true;
        manual_compaction_->in_progress = false;
        manual_compaction_ = nullptr;
      }
      return stat;
    }
  }

  unique_ptr<Compaction> c;
  InternalKey manual_end_storage;
  InternalKey* manual_end = &manual_end_storage;
  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    assert(m->in_progress);
    c.reset(versions_->CompactRange(
        m->input_level, m->output_level, m->begin, m->end, &manual_end));
    if (!c) {
      m->done = true;
    }
    Log(options_.info_log,
        "Manual compaction from level-%d to level-%d from %s .. %s; will stop "
        "at %s\n",
        m->input_level,
        m->output_level,
        (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
        (m->end ? m->end->DebugString().c_str() : "(end)"),
        ((m->done || manual_end == nullptr)
             ? "(end)"
             : manual_end->DebugString().c_str()));
  } else if (!options_.disable_auto_compactions) {
    c.reset(versions_->PickCompaction());
  }

  Status status;
  if (!c) {
    // Nothing to do
    Log(options_.info_log, "Compaction nothing to do");
  } else if (!is_manual && c->IsTrivialMove()) {
    // Move file to next level
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    c->edit()->DeleteFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
                       f->smallest, f->largest,
                       f->smallest_seqno, f->largest_seqno);
    status = versions_->LogAndApply(c->edit(), &mutex_, db_directory_.get());
    InstallSuperVersion(deletion_state);
    Version::LevelSummaryStorage tmp;
    Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
        static_cast<unsigned long long>(f->number), c->level() + 1,
        static_cast<unsigned long long>(f->file_size),
        status.ToString().c_str(), versions_->current()->LevelSummary(&tmp));
    versions_->ReleaseCompactionFiles(c.get(), status);
    *madeProgress = true;
  } else {
    MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel.
    CompactionState* compact = new CompactionState(c.get());
    status = DoCompactionWork(compact, deletion_state);
    CleanupCompaction(compact, status);
    versions_->ReleaseCompactionFiles(c.get(), status);
    c->ReleaseInputs();
    *madeProgress = true;
  }
  c.reset();

  if (status.ok()) {
    // Done
  } else if (shutting_down_.Acquire_Load()) {
    // Ignore compaction errors found during shutting down
  } else {
    Log(options_.info_log,
        "Compaction error: %s", status.ToString().c_str());
    if (options_.paranoid_checks && bg_error_.ok()) {
      bg_error_ = status;
    }
  }

  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    if (!status.ok()) {
      m->status = status;
      m->done = true;
    }
    // For universal compaction:
    //   Because universal compaction always happens at level 0, so one
    //   compaction will pick up all overlapped files. No files will be
    //   filtered out due to size limit and left for a successive compaction.
    //   So we can safely conclude the current compaction.
    //
    //   Also note that, if we don't stop here, then the current compaction
    //   writes a new file back to level 0, which will be used in successive
    //   compaction. Hence the manual compaction will never finish.
    //
    // Stop the compaction if manual_end points to nullptr -- this means
    // that we compacted the whole range. manual_end should always point
    // to nullptr in case of universal compaction
    if (manual_end == nullptr) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range.  Update *m
      // to the range that is left to be compacted.
      // Universal compaction should always compact the whole range
      assert(options_.compaction_style != kCompactionStyleUniversal);
      m->tmp_storage = *manual_end;
      m->begin = &m->tmp_storage;
    }
    m->in_progress = false; // not being processed anymore
    manual_compaction_ = nullptr;
  }
  return status;
}

void DBImpl::CleanupCompaction(CompactionState* compact, Status status) {
  mutex_.AssertHeld();
  if (compact->builder != nullptr) {
    // May happen if we get a shutdown call in the middle of compaction
    compact->builder->Abandon();
    compact->builder.reset();
  } else {
    assert(compact->outfile == nullptr);
  }
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    const CompactionState::Output& out = compact->outputs[i];
    pending_outputs_.erase(out.number);

    // If this file was inserted into the table cache then remove
    // them here because this compaction was not committed.
    if (!status.ok()) {
      table_cache_->Evict(out.number);
    }
  }
  delete compact;
}

// Allocate the file numbers for the output files. We allocate as
// many output file numbers as there are files in level+1 (at least one).
// Insert them into pending_outputs so that they do not get deleted.
void DBImpl::AllocateCompactionOutputFileNumbers(CompactionState* compact) {
  mutex_.AssertHeld();
  assert(compact != nullptr);
  assert(compact->builder == nullptr);
  int filesNeeded = compact->compaction->num_input_files(1);
  for (int i = 0; i < std::max(filesNeeded, 1); i++) {
    uint64_t file_number = versions_->NewFileNumber();
    pending_outputs_.insert(file_number);
    compact->allocated_file_numbers.push_back(file_number);
  }
}

// Frees up unused file number.
void DBImpl::ReleaseCompactionUnusedFileNumbers(CompactionState* compact) {
  mutex_.AssertHeld();
  for (const auto file_number : compact->allocated_file_numbers) {
    pending_outputs_.erase(file_number);
    // Log(options_.info_log, "XXX releasing unused file num %d", file_number);
  }
}

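// Creates the next compaction output file, reusing a pre-allocated file
// number when one is available, and sets up its table builder with the
// compression type configured for the output level.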
Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
  assert(compact != nullptr);
  assert(compact->builder == nullptr);
  uint64_t file_number;
  // If we have not yet exhausted the pre-allocated file numbers,
  // then use the one from the front. Otherwise, we have to acquire
  // the heavyweight lock and allocate a new file number.
  if (!compact->allocated_file_numbers.empty()) {
    file_number = compact->allocated_file_numbers.front();
    compact->allocated_file_numbers.pop_front();
  } else {
    mutex_.Lock();
    file_number = versions_->NewFileNumber();
    pending_outputs_.insert(file_number);
    mutex_.Unlock();
  }
  CompactionState::Output out;
  out.number = file_number;
  out.smallest.Clear();
  out.largest.Clear();
  out.smallest_seqno = out.largest_seqno = 0;
  compact->outputs.push_back(out);

  // Make the output file
  std::string fname = TableFileName(dbname_, file_number);
  Status s = env_->NewWritableFile(fname, &compact->outfile, storage_options_);

  if (s.ok()) {
    // Over-estimate slightly so we don't end up just barely crossing
    // the threshold.
    compact->outfile->SetPreallocationBlockSize(
      1.1 * versions_->MaxFileSizeForLevel(compact->compaction->output_level()));

    CompressionType compression_type = GetCompressionType(
        options_, compact->compaction->output_level(),
        compact->compaction->enable_compression());

    compact->builder.reset(NewTableBuilder(options_, internal_comparator_,
                                           compact->outfile.get(),
                                           compression_type));
  }
  LogFlush(options_.info_log);
  return s;
}

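// Finalizes the current output table: finishes (or abandons) the builder,
// syncs and closes the file, and verifies that the new table is readable
// through the table cache.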
Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
                                          Iterator* input) {
  assert(compact != nullptr);
  assert(compact->outfile);
  assert(compact->builder != nullptr);

  const uint64_t output_number = compact->current_output()->number;
  assert(output_number != 0);

  // Check for iterator errors
  Status s = input->status();
  const uint64_t current_entries = compact->builder->NumEntries();
  if (s.ok()) {
    s = compact->builder->Finish();
  } else {
    compact->builder->Abandon();
  }
  const uint64_t current_bytes = compact->builder->FileSize();
  compact->current_output()->file_size = current_bytes;
  compact->total_bytes += current_bytes;
  compact->builder.reset();

  // Finish and check for file errors
  if (s.ok() && !options_.disableDataSync) {
    if (options_.use_fsync) {
      StopWatch sw(env_, options_.statistics.get(),
                   COMPACTION_OUTFILE_SYNC_MICROS, false);
      s = compact->outfile->Fsync();
    } else {
      StopWatch sw(env_, options_.statistics.get(),
                   COMPACTION_OUTFILE_SYNC_MICROS, false);
      s = compact->outfile->Sync();
    }
  }
  if (s.ok()) {
    s = compact->outfile->Close();
  }
  compact->outfile.reset();

  if (s.ok() && current_entries > 0) {
    // Verify that the table is usable
    FileMetaData meta(output_number, current_bytes);
    Iterator* iter = table_cache_->NewIterator(ReadOptions(), storage_options_,
                                               internal_comparator_, meta);
    s = iter->status();
    delete iter;
    if (s.ok()) {
      Log(options_.info_log,
          "Generated table #%lu: %lu keys, %lu bytes",
          (unsigned long) output_number,
          (unsigned long) current_entries,
          (unsigned long) current_bytes);
    }
  }
  return s;
}


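// Applies the compaction result to the version set: the edit records the
// deletion of all input files and the addition of the new output files.
// Fails if a concurrent compaction has already touched the same inputs.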
Status DBImpl::InstallCompactionResults(CompactionState* compact) {
  mutex_.AssertHeld();

  // paranoia: verify that the files that we started with
  // still exist in the current version and in the same original level.
  // This ensures that a concurrent compaction did not erroneously
  // pick the same files to compact.
  if (!versions_->VerifyCompactionFileConsistency(compact->compaction)) {
    Log(options_.info_log,  "Compaction %d@%d + %d@%d files aborted",
      compact->compaction->num_input_files(0),
      compact->compaction->level(),
      compact->compaction->num_input_files(1),
      compact->compaction->level() + 1);
    return Status::Corruption("Compaction input files inconsistent");
  }

  Log(options_.info_log,  "Compacted %d@%d + %d@%d files => %lld bytes",
      compact->compaction->num_input_files(0),
      compact->compaction->level(),
      compact->compaction->num_input_files(1),
      compact->compaction->level() + 1,
      static_cast<long long>(compact->total_bytes));

  // Add compaction outputs
  compact->compaction->AddInputDeletions(compact->compaction->edit());
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    const CompactionState::Output& out = compact->outputs[i];
    compact->compaction->edit()->AddFile(
        compact->compaction->output_level(), out.number, out.file_size,
        out.smallest, out.largest, out.smallest_seqno, out.largest_seqno);
  }
  return versions_->LogAndApply(compact->compaction->edit(), &mutex_,
                                db_directory_.get());
}

//
// Given a sequence number, return the sequence number of the
// earliest snapshot that this sequence number is visible in.
// The snapshots themselves are arranged in ascending order of
// sequence numbers.
// Employ a sequential search because the total number of
// snapshots is typically small.
inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
  SequenceNumber in, std::vector<SequenceNumber>& snapshots,
  SequenceNumber* prev_snapshot) {
  SequenceNumber prev __attribute__((unused)) = 0;
  for (const auto cur : snapshots) {
    assert(prev <= cur);
    if (cur >= in) {
      *prev_snapshot = prev;
      return cur;
    }
    prev = cur; // assignment
    assert(prev);
  }
  Log(options_.info_log,
      "Looking for seqid %lu but maxseqid is %lu",
      (unsigned long)in,
      (unsigned long)snapshots[snapshots.size()-1]);
  assert(0);
  return 0;
}

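// Performs the actual compaction: merges the input iterators, drops entries
// that are shadowed or deleted and not visible in any snapshot, applies the
// compaction filter and merge operator, and writes the surviving key/value
// pairs into new output files.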
Status DBImpl::DoCompactionWork(CompactionState* compact,
                                DeletionState& deletion_state) {
  assert(compact);
  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
  Log(options_.info_log,
      "Compacting %d@%d + %d@%d files, score %.2f slots available %d",
      compact->compaction->num_input_files(0),
      compact->compaction->level(),
      compact->compaction->num_input_files(1),
      compact->compaction->output_level(),
      compact->compaction->score(),
      options_.max_background_compactions - bg_compaction_scheduled_);
  char scratch[256];
  compact->compaction->Summary(scratch, sizeof(scratch));
  Log(options_.info_log, "Compaction start summary: %s\n", scratch);

  assert(versions_->current()->NumLevelFiles(compact->compaction->level()) > 0);
  assert(compact->builder == nullptr);
  assert(!compact->outfile);

  SequenceNumber visible_at_tip = 0;
  SequenceNumber earliest_snapshot;
  SequenceNumber latest_snapshot = 0;
  snapshots_.getAll(compact->existing_snapshots);
  if (compact->existing_snapshots.size() == 0) {
    // optimize for fast path if there are no snapshots
    visible_at_tip = versions_->LastSequence();
    earliest_snapshot = visible_at_tip;
  } else {
    latest_snapshot = compact->existing_snapshots.back();
    // Add the current seqno as the 'latest' virtual
    // snapshot to the end of this list.
    compact->existing_snapshots.push_back(versions_->LastSequence());
    earliest_snapshot = compact->existing_snapshots[0];
  }

  // Is this compaction producing files at the bottommost level?
  bool bottommost_level = compact->compaction->BottomMostLevel();

  // Allocate the output file numbers before we release the lock
  AllocateCompactionOutputFileNumbers(compact);

  // Release mutex while we're actually doing the compaction work
  mutex_.Unlock();

  const uint64_t start_micros = env_->NowMicros();
  unique_ptr<Iterator> input(versions_->MakeInputIterator(compact->compaction));
  input->SeekToFirst();
  Status status;
  ParsedInternalKey ikey;
  std::string current_user_key;
  bool has_current_user_key = false;
  SequenceNumber last_sequence_for_key __attribute__((unused)) =
    kMaxSequenceNumber;
  SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
  std::string compaction_filter_value;
  std::vector<char> delete_key; // for compaction filter
  MergeHelper merge(user_comparator(), options_.merge_operator.get(),
                    options_.info_log.get(),
                    false /* internal key corruption is expected */);
  auto compaction_filter = options_.compaction_filter;
  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
  if (!compaction_filter) {
    auto context = compact->GetFilterContext();
    compaction_filter_from_factory =
      options_.compaction_filter_factory->CreateCompactionFilter(context);
    compaction_filter = compaction_filter_from_factory.get();
  }

  for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
    // Prioritize immutable compaction work
    // TODO: remove memtable flush from normal compaction work
    if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) {
      const uint64_t imm_start = env_->NowMicros();
      LogFlush(options_.info_log);
      mutex_.Lock();
      if (imm_.IsFlushPending()) {
        FlushMemTableToOutputFile(nullptr, deletion_state);
        bg_cv_.SignalAll();  // Wakeup MakeRoomForWrite() if necessary
      }
      mutex_.Unlock();
      imm_micros += (env_->NowMicros() - imm_start);
    }

    Slice key = input->key();
    Slice value = input->value();

    if (compact->compaction->ShouldStopBefore(key) &&
        compact->builder != nullptr) {
      status = FinishCompactionOutputFile(compact, input.get());
      if (!status.ok()) {
        break;
      }
    }

    // Handle key/value, add to state, etc.
    bool drop = false;
    bool current_entry_is_merging = false;
    if (!ParseInternalKey(key, &ikey)) {
      // Do not hide error keys
      // TODO: error key stays in db forever? Figure out the intention/rationale
      // v10 error v8 : we cannot hide v8 even though it's pretty obvious.
      current_user_key.clear();
      has_current_user_key = false;
      last_sequence_for_key = kMaxSequenceNumber;
      visible_in_snapshot = kMaxSequenceNumber;
    } else {
      if (!has_current_user_key ||
          user_comparator()->Compare(ikey.user_key,
                                     Slice(current_user_key)) != 0) {
        // First occurrence of this user key
        current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
        visible_in_snapshot = kMaxSequenceNumber;

        // apply the compaction filter to the first occurrence of the user key
        if (compaction_filter &&
            ikey.type == kTypeValue &&
            (visible_at_tip || ikey.sequence > latest_snapshot)) {
          // If the user has specified a compaction filter and the sequence
          // number is greater than any external snapshot, then invoke the
          // filter.
          // If the return value of the compaction filter is true, replace
          // the entry with a delete marker.
          bool value_changed = false;
          compaction_filter_value.clear();
          bool to_delete =
            compaction_filter->Filter(compact->compaction->level(),
                                               ikey.user_key, value,
                                               &compaction_filter_value,
                                               &value_changed);
          if (to_delete) {
            // make a copy of the original key
            delete_key.assign(key.data(), key.data() + key.size());
            // convert it to a delete
            UpdateInternalKey(&delete_key[0], delete_key.size(),
                              ikey.sequence, kTypeDeletion);
            // anchor the key again
            key = Slice(&delete_key[0], delete_key.size());
            // needed because ikey is backed by key
            ParseInternalKey(key, &ikey);
            // no value associated with delete
            value.clear();
            RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_USER);
          } else if (value_changed) {
            value = compaction_filter_value;
          }
        }

      }

      // If there are no snapshots, then this kv affects visibility at tip.
      // Otherwise, search through all existing snapshots to find
      // the earliest snapshot that is affected by this kv.
      SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot
      SequenceNumber visible = visible_at_tip ?
        visible_at_tip :
        findEarliestVisibleSnapshot(ikey.sequence,
                                    compact->existing_snapshots,
                                    &prev_snapshot);

      if (visible_in_snapshot == visible) {
        // If the earliest snapshot in which this key is visible
        // is the same as the visibility of a previous instance of the
        // same key, then this kv is not visible in any snapshot.
        // Hidden by a newer entry for the same user key
        // TODO: why not > ?
        assert(last_sequence_for_key >= ikey.sequence);
        drop = true;    // (A)
        RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_NEWER_ENTRY);
      } else if (ikey.type == kTypeDeletion &&
                 ikey.sequence <= earliest_snapshot &&
                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
        drop = true;
        RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_OBSOLETE);
      } else if (ikey.type == kTypeMerge) {
        // We know the merge type entry is not hidden, otherwise we would
        // have hit (A)
        // We encapsulate the merge related state machine in a different
        // object to minimize change to the existing flow. Turn out this
        // logic could also be nicely re-used for memtable flush purge
        // optimization in BuildTable.
        merge.MergeUntil(input.get(), prev_snapshot, bottommost_level,
                         options_.statistics.get());
        current_entry_is_merging = true;
        if (merge.IsSuccess()) {
          // Successfully found Put/Delete/(end-of-key-range) while merging
          // Get the merge result
          key = merge.key();
          ParseInternalKey(key, &ikey);
          value = merge.value();
        } else {
          // Did not find a Put/Delete/(end-of-key-range) while merging
          // We now have some stack of merge operands to write out.
          // NOTE: key,value, and ikey are now referring to old entries.
          //       These will be correctly set below.
          assert(!merge.keys().empty());
          assert(merge.keys().size() == merge.values().size());

          // Hack to make sure last_sequence_for_key is correct
          ParseInternalKey(merge.keys().front(), &ikey);
        }
      }

      last_sequence_for_key = ikey.sequence;
      visible_in_snapshot = visible;
    }
#if 0
    Log(options_.info_log,
        "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
        "%d smallest_snapshot: %d level: %d bottommost %d",
        ikey.user_key.ToString().c_str(),
        (int)ikey.sequence, ikey.type, kTypeValue, drop,
        compact->compaction->IsBaseLevelForKey(ikey.user_key),
        (int)last_sequence_for_key, (int)earliest_snapshot,
        compact->compaction->level(), bottommost_level);
#endif

    if (!drop) {
      // We may write a single key (e.g.: for Put/Delete or successful merge).
      // Or we may instead have to write a sequence/list of keys.
      // We have to write a sequence iff we have an unsuccessful merge
      bool has_merge_list = current_entry_is_merging && !merge.IsSuccess();
      const std::deque<std::string>* keys = nullptr;
      const std::deque<std::string>* values = nullptr;
      std::deque<std::string>::const_reverse_iterator key_iter;
      std::deque<std::string>::const_reverse_iterator value_iter;
      if (has_merge_list) {
        keys = &merge.keys();
        values = &merge.values();
        key_iter = keys->rbegin();    // The back (*rbegin()) is the first key
        value_iter = values->rbegin();

        key = Slice(*key_iter);
        value = Slice(*value_iter);
      }

      // If we have a list of keys to write, traverse the list.
      // If we have a single key to write, simply write that key.
      while (true) {
        // Invariant: key,value,ikey will always be the next entry to write
        char* kptr = (char*)key.data();
        std::string kstr;

        // Zeroing out the sequence number leads to better compression.
        // If this is the bottommost level (no files in lower levels)
        // and the earliest snapshot is larger than this seqno
        // then we can squash the seqno to zero.
        if (bottommost_level && ikey.sequence < earliest_snapshot &&
            ikey.type != kTypeMerge) {
          assert(ikey.type != kTypeDeletion);
          // make a copy because updating in place would cause problems
          // with the priority queue that is managing the input key iterator
          kstr.assign(key.data(), key.size());
          kptr = (char *)kstr.c_str();
          UpdateInternalKey(kptr, key.size(), (uint64_t)0, ikey.type);
        }

        Slice newkey(kptr, key.size());
        assert((key.clear(), 1)); // we do not need 'key' anymore

        // Open output file if necessary
        if (compact->builder == nullptr) {
          status = OpenCompactionOutputFile(compact);
          if (!status.ok()) {
            break;
          }
        }

        SequenceNumber seqno = GetInternalKeySeqno(newkey);
        if (compact->builder->NumEntries() == 0) {
          compact->current_output()->smallest.DecodeFrom(newkey);
          compact->current_output()->smallest_seqno = seqno;
        } else {
          compact->current_output()->smallest_seqno =
            std::min(compact->current_output()->smallest_seqno, seqno);
        }
        compact->current_output()->largest.DecodeFrom(newkey);
        compact->builder->Add(newkey, value);
        compact->current_output()->largest_seqno =
          std::max(compact->current_output()->largest_seqno, seqno);

        // Close output file if it is big enough
        if (compact->builder->FileSize() >=
            compact->compaction->MaxOutputFileSize()) {
          status = FinishCompactionOutputFile(compact, input.get());
          if (!status.ok()) {
            break;
          }
        }

        // If we have a list of entries, move to next element
        // If we only had one entry, then break the loop.
        if (has_merge_list) {
          ++key_iter;
          ++value_iter;

          // If at end of list
          if (key_iter == keys->rend() || value_iter == values->rend()) {
            // Sanity Check: if one ends, then both end
            assert(key_iter == keys->rend() && value_iter == values->rend());
            break;
          }

          // Otherwise not at end of list. Update key, value, and ikey.
          key = Slice(*key_iter);
          value = Slice(*value_iter);
          ParseInternalKey(key, &ikey);

        } else{
          // Only had one item to begin with (Put/Delete)
          break;
        }
      }
    }

    // MergeUntil has moved input to the next entry
    if (!current_entry_is_merging) {
      input->Next();
    }
  }

  if (status.ok() && shutting_down_.Acquire_Load()) {
    status = Status::ShutdownInProgress(
        "Database shutdown started during compaction");
  }
  if (status.ok() && compact->builder != nullptr) {
    status = FinishCompactionOutputFile(compact, input.get());
  }
  if (status.ok()) {
    status = input->status();
  }
  input.reset();

  if (!options_.disableDataSync) {
    db_directory_->Fsync();
  }

  InternalStats::CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros - imm_micros;
  MeasureTime(options_.statistics.get(), COMPACTION_TIME, stats.micros);
  stats.files_in_leveln = compact->compaction->num_input_files(0);
  stats.files_in_levelnp1 = compact->compaction->num_input_files(1);

  int num_output_files = compact->outputs.size();
  if (compact->builder != nullptr) {
    // An error occurred so ignore the last output.
    assert(num_output_files > 0);
    --num_output_files;
  }
  stats.files_out_levelnp1 = num_output_files;

  for (int i = 0; i < compact->compaction->num_input_files(0); i++) {
    stats.bytes_readn += compact->compaction->input(0, i)->file_size;
    RecordTick(options_.statistics.get(), COMPACT_READ_BYTES,
               compact->compaction->input(0, i)->file_size);
  }

  for (int i = 0; i < compact->compaction->num_input_files(1); i++) {
    stats.bytes_readnp1 += compact->compaction->input(1, i)->file_size;
    RecordTick(options_.statistics.get(), COMPACT_READ_BYTES,
               compact->compaction->input(1, i)->file_size);
  }

  for (int i = 0; i < num_output_files; i++) {
    stats.bytes_written += compact->outputs[i].file_size;
    RecordTick(options_.statistics.get(), COMPACT_WRITE_BYTES,
               compact->outputs[i].file_size);
  }

  LogFlush(options_.info_log);
  mutex_.Lock();
  internal_stats_.AddCompactionStats(compact->compaction->output_level(),
                                     stats);

  // if there were any unused file numbers (mostly in case of
  // compaction error), free up the entries from pending_outputs_
  ReleaseCompactionUnusedFileNumbers(compact);

  if (status.ok()) {
    status = InstallCompactionResults(compact);
    InstallSuperVersion(deletion_state);
  }
  Version::LevelSummaryStorage tmp;
  Log(options_.info_log,
      "compacted to: %s, %.1f MB/sec, level %d, files in(%d, %d) out(%d) "
      "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
      "write-amplify(%.1f) %s\n",
      versions_->current()->LevelSummary(&tmp),
      (stats.bytes_readn + stats.bytes_readnp1 + stats.bytes_written) /
          (double)stats.micros,
      compact->compaction->output_level(), stats.files_in_leveln,
      stats.files_in_levelnp1, stats.files_out_levelnp1,
      stats.bytes_readn / 1048576.0, stats.bytes_readnp1 / 1048576.0,
      stats.bytes_written / 1048576.0,
2693
      (stats.bytes_written + stats.bytes_readnp1 + stats.bytes_readn) /
2694 2695
          (double)stats.bytes_readn,
      stats.bytes_written / (double)stats.bytes_readn,
2696
      status.ToString().c_str());
M
Mark Callaghan 已提交
2697

J
jorlow@chromium.org 已提交
2698 2699 2700
  return status;
}

namespace {
struct IterState {
  IterState(DBImpl* db, port::Mutex* mu, DBImpl::SuperVersion* super_version)
    : db(db), mu(mu), super_version(super_version) {}

  DBImpl* db;
  port::Mutex* mu;
  DBImpl::SuperVersion* super_version;
};

static void CleanupIteratorState(void* arg1, void* arg2) {
  IterState* state = reinterpret_cast<IterState*>(arg1);
  DBImpl::DeletionState deletion_state(state->db->GetOptions().
                                       max_write_buffer_number);

  bool need_cleanup = state->super_version->Unref();
  if (need_cleanup) {
    state->mu->Lock();
    state->super_version->Cleanup();
    state->db->FindObsoleteFiles(deletion_state, false, true);
    state->mu->Unlock();

    delete state->super_version;
    state->db->PurgeObsoleteFiles(deletion_state);
  }

  delete state;
}
}  // namespace

Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
                                      SequenceNumber* latest_snapshot) {
  mutex_.Lock();
  *latest_snapshot = versions_->LastSequence();
  SuperVersion* super_version = super_version_->Ref();
  mutex_.Unlock();

  std::vector<Iterator*> iterator_list;
  // Collect iterator for mutable mem
  iterator_list.push_back(super_version->mem->NewIterator(options));
  // Collect all needed child iterators for immutable memtables
  super_version->imm->AddIterators(options, &iterator_list);
  // Collect iterators for files in L0 - Ln
  super_version->current->AddIterators(options, storage_options_,
                                       &iterator_list);
  Iterator* internal_iter = NewMergingIterator(
      env_, &internal_comparator_, &iterator_list[0], iterator_list.size());

  IterState* cleanup = new IterState(this, &mutex_, super_version);
  internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);

  return internal_iter;
}

Iterator* DBImpl::TEST_NewInternalIterator() {
  SequenceNumber ignored;
  ReadOptions read_options;
  // Use prefix_seek to make the test function more useful.
  read_options.prefix_seek = true;
  return NewInternalIterator(read_options, &ignored);
}

std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
    const ReadOptions& options,
    uint64_t* superversion_number) {

  mutex_.Lock();
  SuperVersion* super_version = super_version_->Ref();
  if (superversion_number != nullptr) {
    *superversion_number = CurrentVersionNumber();
  }
  mutex_.Unlock();

  Iterator* mutable_iter = super_version->mem->NewIterator(options);
  // create a DBIter that only uses memtable content; see NewIterator()
  mutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(),
                               mutable_iter, kMaxSequenceNumber);

  std::vector<Iterator*> list;
  super_version->imm->AddIterators(options, &list);
  super_version->current->AddIterators(options, storage_options_, &list);
  Iterator* immutable_iter =
    NewMergingIterator(env_, &internal_comparator_, &list[0], list.size());

  // create a DBIter over the immutable memtables and SST files; see NewIterator()
  immutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(),
                                 immutable_iter, kMaxSequenceNumber);

  // register cleanups
  mutable_iter->RegisterCleanup(CleanupIteratorState,
    new IterState(this, &mutex_, super_version), nullptr);

  // bump the ref one more time since it will be Unref'ed twice
  immutable_iter->RegisterCleanup(CleanupIteratorState,
    new IterState(this, &mutex_, super_version->Ref()), nullptr);

  return std::make_pair(mutable_iter, immutable_iter);
}

int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
  MutexLock l(&mutex_);
  return versions_->current()->MaxNextLevelOverlappingBytes();
}

Status DBImpl::Get(const ReadOptions& options,
                   const Slice& key,
                   std::string* value) {
  return GetImpl(options, key, value);
}

// DeletionState gets created and destructed outside of the lock -- we
// use this conveniently to:
// * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete one SuperVersion() outside of the lock -- superversion_to_free
//
// However, if InstallSuperVersion() gets called twice with the same
// deletion_state, we can't reuse the SuperVersion() that got malloced because
// the first call already used it. In that rare case, we take a hit and create
// a new SuperVersion() inside of the mutex. We do a similar thing
// for superversion_to_free.
void DBImpl::InstallSuperVersion(DeletionState& deletion_state) {
  // if new_superversion == nullptr, it means somebody already used it
  SuperVersion* new_superversion =
    (deletion_state.new_superversion != nullptr) ?
    deletion_state.new_superversion : new SuperVersion();
  SuperVersion* old_superversion = InstallSuperVersion(new_superversion);
  deletion_state.new_superversion = nullptr;
  if (deletion_state.superversion_to_free != nullptr) {
    // somebody already put it there
    delete old_superversion;
  } else {
    deletion_state.superversion_to_free = old_superversion;
  }
}

DBImpl::SuperVersion* DBImpl::InstallSuperVersion(
    SuperVersion* new_superversion) {
  mutex_.AssertHeld();
  new_superversion->Init(mem_, imm_.current(), versions_->current());
  SuperVersion* old_superversion = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  if (old_superversion != nullptr && old_superversion->Unref()) {
    old_superversion->Cleanup();
    return old_superversion; // will let caller delete outside of mutex
  }
  return nullptr;
}

Status DBImpl::GetImpl(const ReadOptions& options,
                       const Slice& key,
                       std::string* value,
                       bool* value_found) {
  Status s;

  StopWatch sw(env_, options_.statistics.get(), DB_GET, false);
  StopWatchNano snapshot_timer(env_, false);
  StartPerfTimer(&snapshot_timer);
  SequenceNumber snapshot;

  if (options.snapshot != nullptr) {
    snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
  } else {
    snapshot = versions_->LastSequence();
  }

  // This can be replaced by using atomics and spinlock instead of big mutex
  mutex_.Lock();
  SuperVersion* get_version = super_version_->Ref();
  mutex_.Unlock();

  bool have_stat_update = false;
  Version::GetStats stats;

  // Prepare to store a list of merge operations if merge occurs.
  MergeContext merge_context;

  // First look in the memtable, then in the immutable memtable (if any).
  // s is both in/out. When in, s could either be OK or MergeInProgress.
  // merge_operands will contain the sequence of merges in the latter case.
  LookupKey lkey(key, snapshot);
  BumpPerfTime(&perf_context.get_snapshot_time, &snapshot_timer);
  if (get_version->mem->Get(lkey, value, &s, merge_context, options_)) {
    // Done
    RecordTick(options_.statistics.get(), MEMTABLE_HIT);
  } else if (get_version->imm->Get(lkey, value, &s, merge_context, options_)) {
    // Done
    RecordTick(options_.statistics.get(), MEMTABLE_HIT);
  } else {
    StopWatchNano from_files_timer(env_, false);
    StartPerfTimer(&from_files_timer);

    get_version->current->Get(options, lkey, value, &s, &merge_context, &stats,
                              options_, value_found);
    have_stat_update = true;
    BumpPerfTime(&perf_context.get_from_output_files_time, &from_files_timer);
    RecordTick(options_.statistics.get(), MEMTABLE_MISS);
  }

  StopWatchNano post_process_timer(env_, false);
  StartPerfTimer(&post_process_timer);

  bool delete_get_version = false;
  if (!options_.disable_seek_compaction && have_stat_update) {
    mutex_.Lock();
    if (get_version->current->UpdateStats(stats)) {
      MaybeScheduleFlushOrCompaction();
    }
    if (get_version->Unref()) {
      get_version->Cleanup();
      delete_get_version = true;
    }
    mutex_.Unlock();
  } else {
    if (get_version->Unref()) {
      mutex_.Lock();
      get_version->Cleanup();
      mutex_.Unlock();
      delete_get_version = true;
    }
  }
  if (delete_get_version) {
    delete get_version;
  }

  // Note, tickers are atomic now - no lock protection needed any more.

  RecordTick(options_.statistics.get(), NUMBER_KEYS_READ);
  RecordTick(options_.statistics.get(), BYTES_READ, value->size());
  BumpPerfTime(&perf_context.get_post_process_time, &post_process_timer);
  return s;
}

std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
                                     const std::vector<Slice>& keys,
                                     std::vector<std::string>* values) {
  StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET, false);
  StopWatchNano snapshot_timer(env_, false);
  StartPerfTimer(&snapshot_timer);

  SequenceNumber snapshot;

  mutex_.Lock();
  if (options.snapshot != nullptr) {
    snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
  } else {
    snapshot = versions_->LastSequence();
  }

  SuperVersion* get_version = super_version_->Ref();
  mutex_.Unlock();

  bool have_stat_update = false;
  Version::GetStats stats;

  // Contains a list of merge operations if merge occurs.
  MergeContext merge_context;

  // Note: this always resizes the values array
  int numKeys = keys.size();
  std::vector<Status> statList(numKeys);
  values->resize(numKeys);

  // Keep track of bytes that we read for statistics-recording later
  uint64_t bytesRead = 0;
  BumpPerfTime(&perf_context.get_snapshot_time, &snapshot_timer);

  // For each of the given keys, apply the entire "get" process as follows:
  // First look in the memtable, then in the immutable memtable (if any).
  // s is both in/out. When in, s could either be OK or MergeInProgress.
  // merge_operands will contain the sequence of merges in the latter case.
  for (int i = 0; i < numKeys; ++i) {
    merge_context.Clear();
    Status& s = statList[i];
    std::string* value = &(*values)[i];

    LookupKey lkey(keys[i], snapshot);
    if (get_version->mem->Get(lkey, value, &s, merge_context, options_)) {
      // Done
    } else if (get_version->imm->Get(lkey, value, &s, merge_context,
                                     options_)) {
      // Done
    } else {
      get_version->current->Get(options, lkey, value, &s, &merge_context,
                                &stats, options_);
      have_stat_update = true;
    }

    if (s.ok()) {
      bytesRead += value->size();
    }
  }

  // Post processing (decrement reference counts and record statistics)
  StopWatchNano post_process_timer(env_, false);
  StartPerfTimer(&post_process_timer);
  bool delete_get_version = false;
  if (!options_.disable_seek_compaction && have_stat_update) {
    mutex_.Lock();
    if (get_version->current->UpdateStats(stats)) {
      MaybeScheduleFlushOrCompaction();
    }
    if (get_version->Unref()) {
      get_version->Cleanup();
      delete_get_version = true;
    }
    mutex_.Unlock();
  } else {
    if (get_version->Unref()) {
      mutex_.Lock();
      get_version->Cleanup();
      mutex_.Unlock();
      delete_get_version = true;
    }
  }
  if (delete_get_version) {
    delete get_version;
  }

  RecordTick(options_.statistics.get(), NUMBER_MULTIGET_CALLS);
  RecordTick(options_.statistics.get(), NUMBER_MULTIGET_KEYS_READ, numKeys);
  RecordTick(options_.statistics.get(), NUMBER_MULTIGET_BYTES_READ, bytesRead);
  BumpPerfTime(&perf_context.get_post_process_time, &post_process_timer);

  return statList;
}

bool DBImpl::KeyMayExist(const ReadOptions& options,
                         const Slice& key,
                         std::string* value,
                         bool* value_found) {
  if (value_found != nullptr) {
    // falsify later if key-may-exist but can't fetch value
    *value_found = true;
  }
  ReadOptions roptions = options;
  roptions.read_tier = kBlockCacheTier; // read from block cache only
  auto s = GetImpl(roptions, key, value, value_found);

  // If options.block_cache != nullptr and the index block of the table is
  // not present in block_cache, the return value will be Status::Incomplete.
  // In this case, the key may still exist in the table.
  return s.ok() || s.IsIncomplete();
}

Iterator* DBImpl::NewIterator(const ReadOptions& options) {
  Iterator* iter;

  if (options.tailing) {
    iter = new TailingIterator(this, options, user_comparator());
  } else {
    SequenceNumber latest_snapshot;
    iter = NewInternalIterator(options, &latest_snapshot);

    iter = NewDBIterator(
      &dbname_, env_, options_, user_comparator(), iter,
      (options.snapshot != nullptr
       ? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
       : latest_snapshot));
  }

  if (options.prefix) {
    // use extra wrapper to exclude any keys from the results which
    // don't begin with the prefix
    iter = new PrefixFilterIterator(iter, *options.prefix,
                                    options_.prefix_extractor);
  }
  return iter;
}

const Snapshot* DBImpl::GetSnapshot() {
  MutexLock l(&mutex_);
  return snapshots_.New(versions_->LastSequence());
}

void DBImpl::ReleaseSnapshot(const Snapshot* s) {
  MutexLock l(&mutex_);
  snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s));
}

// Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
  return DB::Put(o, key, val);
}

Status DBImpl::Merge(const WriteOptions& o, const Slice& key,
                     const Slice& val) {
  if (!options_.merge_operator) {
    return Status::NotSupported("Provide a merge_operator when opening DB");
  } else {
    return DB::Merge(o, key, val);
  }
}

Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
  return DB::Delete(options, key);
}

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  StopWatchNano pre_post_process_timer(env_, false);
  StartPerfTimer(&pre_post_process_timer);
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.disableWAL = options.disableWAL;
  w.done = false;

  StopWatch sw(env_, options_.statistics.get(), DB_WRITE, false);
  mutex_.Lock();
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }

  if (!options.disableWAL) {
    RecordTick(options_.statistics.get(), WRITE_WITH_WAL, 1);
  }

  if (w.done) {
    mutex_.Unlock();
    RecordTick(options_.statistics.get(), WRITE_DONE_BY_OTHER, 1);
    return w.status;
  } else {
    RecordTick(options_.statistics.get(), WRITE_DONE_BY_SELF, 1);
  }

  // May temporarily unlock and wait.
  SuperVersion* superversion_to_free = nullptr;
  Status status = MakeRoomForWrite(my_batch == nullptr, &superversion_to_free);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != nullptr) {  // nullptr batch is for compactions
    autovector<WriteBatch*> write_batch_group;
    BuildBatchGroup(&last_writer, &write_batch_group);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      WriteBatch* updates = nullptr;
      if (write_batch_group.size() == 1) {
        updates = write_batch_group[0];
      } else {
        updates = &tmp_batch_;
        for (size_t i = 0; i < write_batch_group.size(); ++i) {
          WriteBatchInternal::Append(updates, write_batch_group[i]);
        }
      }

      const SequenceNumber current_sequence = last_sequence + 1;
      WriteBatchInternal::SetSequence(updates, current_sequence);
      int my_batch_count = WriteBatchInternal::Count(updates);
      last_sequence += my_batch_count;
      // Record statistics
      RecordTick(options_.statistics.get(),
                 NUMBER_KEYS_WRITTEN, my_batch_count);
      RecordTick(options_.statistics.get(),
                 BYTES_WRITTEN,
                 WriteBatchInternal::ByteSize(updates));
      if (options.disableWAL) {
        flush_on_destroy_ = true;
      }
      BumpPerfTime(&perf_context.write_pre_and_post_process_time,
                   &pre_post_process_timer);

      if (!options.disableWAL) {
        StopWatchNano timer(env_);
        StartPerfTimer(&timer);
        Slice log_entry = WriteBatchInternal::Contents(updates);
        status = log_->AddRecord(log_entry);
        RecordTick(options_.statistics.get(), WAL_FILE_SYNCED, 1);
        RecordTick(options_.statistics.get(), WAL_FILE_BYTES, log_entry.size());
        if (status.ok() && options.sync) {
          if (options_.use_fsync) {
            StopWatch(env_, options_.statistics.get(), WAL_FILE_SYNC_MICROS);
            status = log_->file()->Fsync();
          } else {
            StopWatch(env_, options_.statistics.get(), WAL_FILE_SYNC_MICROS);
            status = log_->file()->Sync();
          }
        }
        BumpPerfTime(&perf_context.write_wal_time, &timer);
      }
      if (status.ok()) {
        StopWatchNano write_memtable_timer(env_, false);
        StartPerfTimer(&write_memtable_timer);
        status = WriteBatchInternal::InsertInto(updates, mem_, &options_, this,
                                                options_.filter_deletes);
        BumpPerfTime(&perf_context.write_memtable_time, &write_memtable_timer);
        if (!status.ok()) {
          // Panic for in-memory corruptions
          // Note that existing logic was not sound. Any partial failure writing
          // into the memtable would result in a state that some write ops might
          // have succeeded in memtable but Status reports error for all writes.
          throw std::runtime_error("In memory WriteBatch corruption!");
        }
        SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER,
                       last_sequence);
      }
      StartPerfTimer(&pre_post_process_timer);
      if (updates == &tmp_batch_) tmp_batch_.Clear();
      mutex_.Lock();
      if (status.ok()) {
        versions_->SetLastSequence(last_sequence);
      }
    }
  }
  if (options_.paranoid_checks && !status.ok() && bg_error_.ok()) {
    bg_error_ = status; // stop compaction & fail any further writes
  }

  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }
  mutex_.Unlock();
  delete superversion_to_free;
  BumpPerfTime(&perf_context.write_pre_and_post_process_time,
               &pre_post_process_timer);
  return status;
}

// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-nullptr batch
void DBImpl::BuildBatchGroup(Writer** last_writer,
                             autovector<WriteBatch*>* write_batch_group) {
  assert(!writers_.empty());
  Writer* first = writers_.front();
  assert(first->batch != nullptr);

  size_t size = WriteBatchInternal::ByteSize(first->batch);
  write_batch_group->push_back(first->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128<<10)) {
    max_size = size + (128<<10);
  }
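  // For example (illustrative sizes only): a 4 KB leading batch caps the
  // group at 4 KB + 128 KB = 132 KB, while a 512 KB leading batch keeps the
  // full 1 MB (1 << 20) cap.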

  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  ++iter;  // Advance past "first"
  for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (!w->disableWAL && first->disableWAL) {
      // Do not include a write that needs WAL into a batch that has
      // WAL disabled.
      break;
    }

    if (w->batch != nullptr) {
      size += WriteBatchInternal::ByteSize(w->batch);
      if (size > max_size) {
        // Do not make batch too big
        break;
      }

      write_batch_group->push_back(w->batch);
    }
    *last_writer = w;
  }
}

// This function computes the amount of time in microseconds by which a write
// should be delayed based on the number of level-0 files according to the
// following formula:
// if n < bottom, return 0;
// if n >= top, return 1000;
// otherwise, let r = (n - bottom) /
//                    (top - bottom)
//  and return r^2 * 1000.
// The goal of this formula is to gradually increase the rate at which writes
// are slowed. We also tried linear delay (r * 1000), but it seemed to do
// slightly worse. There is no other particular reason for choosing quadratic.
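// Worked example (hypothetical values): with bottom = 8, top = 20 and n = 12,
// r = (12 - 8) / (20 - 8) = 1/3, so the delay is (1/3)^2 * 1000 ~= 111
// microseconds (the implementation below also applies a floor of 100).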
uint64_t DBImpl::SlowdownAmount(int n, double bottom, double top) {
  uint64_t delay;
  if (n >= top) {
    delay = 1000;
  }
  else if (n < bottom) {
    delay = 0;
  }
  else {
    // If we are here, we know that:
    //   level0_start_slowdown <= n < level0_slowdown
    // since the previous two conditions are false.
    double how_much =
      (double) (n - bottom) /
              (top - bottom);
    delay = std::max(how_much * how_much * 1000, 100.0);
  }
  assert(delay <= 1000);
  return delay;
}

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force,
                                SuperVersion** superversion_to_free) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  bool allow_delay = !force;
  bool allow_hard_rate_limit_delay = !force;
  bool allow_soft_rate_limit_delay = !force;
  uint64_t rate_limit_delay_millis = 0;
  Status s;
  double score;
  *superversion_to_free = nullptr;

  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (allow_delay && versions_->NeedSlowdownForNumLevel0Files()) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 0-1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      uint64_t slowdown =
          SlowdownAmount(versions_->current()->NumLevelFiles(0),
                         options_.level0_slowdown_writes_trigger,
                         options_.level0_stop_writes_trigger);
      mutex_.Unlock();
      uint64_t delayed;
      {
        StopWatch sw(env_, options_.statistics.get(), STALL_L0_SLOWDOWN_COUNT);
        env_->SleepForMicroseconds(slowdown);
        delayed = sw.ElapsedMicros();
      }
      RecordTick(options_.statistics.get(), STALL_L0_SLOWDOWN_MICROS, delayed);
      internal_stats_.RecordWriteStall(InternalStats::LEVEL0_SLOWDOWN, delayed);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
      delayed_writes_++;
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      if (allow_delay) {
        DelayLoggingAndReset();
      }
      break;
    } else if (imm_.size() == options_.max_write_buffer_number - 1) {
      // We have filled up the current memtable, but the previous
      // ones are still being compacted, so we wait.
      DelayLoggingAndReset();
      Log(options_.info_log, "wait for memtable compaction...\n");
      uint64_t stall;
      {
        StopWatch sw(env_, options_.statistics.get(),
          STALL_MEMTABLE_COMPACTION_COUNT);
        bg_cv_.Wait();
        stall = sw.ElapsedMicros();
      }
      RecordTick(options_.statistics.get(),
                 STALL_MEMTABLE_COMPACTION_MICROS, stall);
      internal_stats_.RecordWriteStall(InternalStats::MEMTABLE_COMPACTION,
                                       stall);
    } else if (versions_->current()->NumLevelFiles(0) >=
               options_.level0_stop_writes_trigger) {
      // There are too many level-0 files.
      DelayLoggingAndReset();
      Log(options_.info_log, "wait for fewer level0 files...\n");
      uint64_t stall;
      {
        StopWatch sw(env_, options_.statistics.get(),
                     STALL_L0_NUM_FILES_COUNT);
        bg_cv_.Wait();
        stall = sw.ElapsedMicros();
      }
      RecordTick(options_.statistics.get(), STALL_L0_NUM_FILES_MICROS, stall);
      internal_stats_.RecordWriteStall(InternalStats::LEVEL0_NUM_FILES, stall);
    } else if (allow_hard_rate_limit_delay && options_.hard_rate_limit > 1.0 &&
               (score = versions_->current()->MaxCompactionScore()) >
                   options_.hard_rate_limit) {
      // Delay a write when the compaction score for any level is too large.
      int max_level = versions_->current()->MaxCompactionScoreLevel();
      mutex_.Unlock();
      uint64_t delayed;
      {
        StopWatch sw(env_, options_.statistics.get(),
                     HARD_RATE_LIMIT_DELAY_COUNT);
        env_->SleepForMicroseconds(1000);
        delayed = sw.ElapsedMicros();
      }
      internal_stats_.RecordLevelNSlowdown(max_level, delayed);
      // Make sure the following value doesn't round to zero.
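      // (delayed is in microseconds; e.g. a 1500 us stall adds 1 ms below,
      // and even a sub-millisecond stall is counted as 1 ms.)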
      uint64_t rate_limit = std::max((delayed / 1000), (uint64_t) 1);
      rate_limit_delay_millis += rate_limit;
      RecordTick(options_.statistics.get(),
                 RATE_LIMIT_DELAY_MILLIS, rate_limit);
      if (options_.rate_limit_delay_max_milliseconds > 0 &&
          rate_limit_delay_millis >=
          (unsigned)options_.rate_limit_delay_max_milliseconds) {
        allow_hard_rate_limit_delay = false;
      }
      mutex_.Lock();
    } else if (allow_soft_rate_limit_delay && options_.soft_rate_limit > 0.0 &&
               (score = versions_->current()->MaxCompactionScore()) >
                   options_.soft_rate_limit) {
      // Delay a write when the compaction score for any level is too large.
      // TODO: add statistics
      mutex_.Unlock();
      {
        StopWatch sw(env_, options_.statistics.get(),
                     SOFT_RATE_LIMIT_DELAY_COUNT);
        env_->SleepForMicroseconds(SlowdownAmount(
          score,
          options_.soft_rate_limit,
          options_.hard_rate_limit)
        );
        rate_limit_delay_millis += sw.ElapsedMicros();
      }
      allow_soft_rate_limit_delay = false;
      mutex_.Lock();

    } else {
      unique_ptr<WritableFile> lfile;
      MemTable* new_mem = nullptr;

      // Attempt to switch to a new memtable and trigger compaction of old.
      // Do this without holding the dbmutex lock.
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      SuperVersion* new_superversion = nullptr;
      mutex_.Unlock();
      {
        EnvOptions soptions(storage_options_);
        soptions.use_mmap_writes = false;
        DelayLoggingAndReset();
        s = env_->NewWritableFile(LogFileName(options_.wal_dir, new_log_number),
                                  &lfile, soptions);
        if (s.ok()) {
          // Our final size should be less than write_buffer_size
          // (compression, etc) but err on the side of caution.
          lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size);
          new_mem = new MemTable(internal_comparator_, options_);
          new_superversion = new SuperVersion();
        }
      }
      mutex_.Lock();
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        assert(!new_mem);
        break;
      }
      logfile_number_ = new_log_number;
      log_.reset(new log::Writer(std::move(lfile)));
      mem_->SetNextLogNumber(logfile_number_);
      imm_.Add(mem_);
      if (force) {
        imm_.FlushRequested();
      }
      mem_ = new_mem;
      mem_->Ref();
      Log(options_.info_log,
          "New memtable created with log file: #%lu\n",
          (unsigned long)logfile_number_);
      mem_->SetLogNumber(logfile_number_);
      force = false;   // Do not force another compaction if have room
      MaybeScheduleFlushOrCompaction();
      *superversion_to_free = InstallSuperVersion(new_superversion);
    }
  }
  return s;
}

const std::string& DBImpl::GetName() const {
  return dbname_;
}

Env* DBImpl::GetEnv() const {
  return env_;
}

const Options& DBImpl::GetOptions() const {
  return options_;
}

bool DBImpl::GetProperty(const Slice& property, std::string* value) {
  value->clear();
  MutexLock l(&mutex_);
  return internal_stats_.GetProperty(property, value, versions_.get(),
                                     imm_.size());
}

void DBImpl::GetApproximateSizes(
    const Range* range, int n,
    uint64_t* sizes) {
  // TODO(opt): better implementation
  Version* v;
  {
    MutexLock l(&mutex_);
    versions_->current()->Ref();
    v = versions_->current();
  }

  for (int i = 0; i < n; i++) {
    // Convert user_key into a corresponding internal key.
    InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
    uint64_t start = versions_->ApproximateOffsetOf(v, k1);
    uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
    sizes[i] = (limit >= start ? limit - start : 0);
  }

  {
    MutexLock l(&mutex_);
    v->Unref();
  }
}

inline void DBImpl::DelayLoggingAndReset() {
  if (delayed_writes_ > 0) {
    Log(options_.info_log, "delayed %d write...\n", delayed_writes_);
    delayed_writes_ = 0;
  }
}

Status DBImpl::DeleteFile(std::string name) {
  uint64_t number;
  FileType type;
  WalFileType log_type;
  if (!ParseFileName(name, &number, &type, &log_type) ||
      (type != kTableFile && type != kLogFile)) {
    Log(options_.info_log, "DeleteFile %s failed.\n", name.c_str());
    return Status::InvalidArgument("Invalid file name");
  }

  Status status;
  if (type == kLogFile) {
    // Only allow deleting archived log files
    if (log_type != kArchivedLogFile) {
      Log(options_.info_log, "DeleteFile %s failed.\n", name.c_str());
      return Status::NotSupported("Delete only supported for archived logs");
    }
    status = env_->DeleteFile(options_.wal_dir + "/" + name.c_str());
    if (!status.ok()) {
      Log(options_.info_log, "DeleteFile %s failed.\n", name.c_str());
    }
    return status;
  }

  int level;
  FileMetaData* metadata;
  int maxlevel = NumberLevels();
  VersionEdit edit;
  DeletionState deletion_state(true);
  {
    MutexLock l(&mutex_);
    status = versions_->GetMetadataForFile(number, &level, &metadata);
    if (!status.ok()) {
      Log(options_.info_log, "DeleteFile %s failed. File not found\n",
                             name.c_str());
      return Status::InvalidArgument("File not found");
    }
    assert((level > 0) && (level < maxlevel));

    // If the file is being compacted no need to delete.
    if (metadata->being_compacted) {
      Log(options_.info_log,
          "DeleteFile %s Skipped. File about to be compacted\n", name.c_str());
      return Status::OK();
    }

    // Only the files in the last level can be deleted externally.
    // This is to make sure that any deletion tombstones are not
    // lost. Check that the level passed is the last level.
    for (int i = level + 1; i < maxlevel; i++) {
      if (versions_->current()->NumLevelFiles(i) != 0) {
        Log(options_.info_log,
            "DeleteFile %s FAILED. File not in last level\n", name.c_str());
        return Status::InvalidArgument("File not in last level");
      }
    }
    edit.DeleteFile(level, number);
    status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get());
    if (status.ok()) {
      InstallSuperVersion(deletion_state);
    }
    FindObsoleteFiles(deletion_state, false);
  } // lock released here
  LogFlush(options_.info_log);
  // remove files outside the db-lock
  PurgeObsoleteFiles(deletion_state);
  return status;
}

void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData> *metadata) {
  MutexLock l(&mutex_);
  return versions_->GetLiveFilesMetaData(metadata);
}

void DBImpl::TEST_GetFilesMetaData(
    std::vector<std::vector<FileMetaData>>* metadata) {
  MutexLock l(&mutex_);
  metadata->resize(NumberLevels());
  for (int level = 0; level < NumberLevels(); level++) {
    const std::vector<FileMetaData*>& files =
      versions_->current()->files_[level];

    (*metadata)[level].clear();
    for (const auto& f : files) {
      (*metadata)[level].push_back(*f);
    }
  }
}

Status DBImpl::GetDbIdentity(std::string& identity) {
  std::string idfilename = IdentityFileName(dbname_);
  unique_ptr<SequentialFile> idfile;
  const EnvOptions soptions;
  Status s = env_->NewSequentialFile(idfilename, &idfile, soptions);
  if (!s.ok()) {
    return s;
  }
  uint64_t file_size;
  s = env_->GetFileSize(idfilename, &file_size);
  if (!s.ok()) {
    return s;
  }
  char buffer[file_size];
  Slice id;
  s = idfile->Read(file_size, &id, buffer);
  if (!s.ok()) {
    return s;
  }
  identity.assign(id.ToString());
  // If last character is '\n' remove it from identity
  if (identity.size() > 0 && identity.back() == '\n') {
    identity.pop_back();
  }
  return s;
}

// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  // Pre-allocate size of write batch conservatively.
  // 8 bytes are taken by header, 4 bytes for count, 1 byte for type,
  // and we allocate 11 extra bytes for key length, as well as value length.
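  // (8 + 4 + 1 + 11 = 24, which is the constant added below.)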
  WriteBatch batch(key.size() + value.size() + 24);
  batch.Put(key, value);
  return Write(opt, &batch);
}

Status DB::Delete(const WriteOptions& opt, const Slice& key) {
  WriteBatch batch;
  batch.Delete(key);
  return Write(opt, &batch);
}

Status DB::Merge(const WriteOptions& opt, const Slice& key,
                 const Slice& value) {
  WriteBatch batch;
  batch.Merge(key, value);
  return Write(opt, &batch);
}

DB::~DB() { }

Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  *dbptr = nullptr;
  EnvOptions soptions;

  if (options.block_cache != nullptr && options.no_block_cache) {
    return Status::InvalidArgument(
        "no_block_cache is true while block_cache is not nullptr");
  }

  DBImpl* impl = new DBImpl(options, dbname);
  Status s = impl->env_->CreateDirIfMissing(impl->options_.wal_dir);
  if (!s.ok()) {
    delete impl;
    return s;
  }

  s = impl->CreateArchivalDirectory();
  if (!s.ok()) {
    delete impl;
    return s;
  }
  impl->mutex_.Lock();
  s = impl->Recover(); // Handles create_if_missing, error_if_exists
  if (s.ok()) {
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    unique_ptr<WritableFile> lfile;
    soptions.use_mmap_writes = false;
    s = impl->options_.env->NewWritableFile(
      LogFileName(impl->options_.wal_dir, new_log_number),
      &lfile,
      soptions
    );
    if (s.ok()) {
      lfile->SetPreallocationBlockSize(1.1 * impl->options_.write_buffer_size);
      VersionEdit edit;
      edit.SetLogNumber(new_log_number);
      impl->logfile_number_ = new_log_number;
      impl->log_.reset(new log::Writer(std::move(lfile)));
      s = impl->versions_->LogAndApply(&edit, &impl->mutex_,
                                       impl->db_directory_.get());
    }
    if (s.ok()) {
      delete impl->InstallSuperVersion(new DBImpl::SuperVersion());
      impl->mem_->SetLogNumber(impl->logfile_number_);
      impl->DeleteObsoleteFiles();
      impl->MaybeScheduleFlushOrCompaction();
      impl->MaybeScheduleLogDBDeployStats();
      s = impl->db_directory_->Fsync();
    }
  }

  if (s.ok() && impl->options_.compaction_style == kCompactionStyleUniversal) {
    Version* current = impl->versions_->current();
    for (int i = 1; i < impl->NumberLevels(); i++) {
      int num_files = current->NumLevelFiles(i);
      if (num_files > 0) {
        s = Status::InvalidArgument("Not all files are at level 0. Cannot "
          "open with universal compaction style.");
        break;
      }
    }
  }

  impl->mutex_.Unlock();

  if (s.ok()) {
    *dbptr = impl;
  } else {
    delete impl;
  }
  return s;
}

Snapshot::~Snapshot() {
}

Status DestroyDB(const std::string& dbname, const Options& options) {
  const InternalKeyComparator comparator(options.comparator);
  const InternalFilterPolicy filter_policy(options.filter_policy);
  const Options& soptions(SanitizeOptions(
    dbname, &comparator, &filter_policy, options));
  Env* env = soptions.env;
  std::vector<std::string> filenames;
  std::vector<std::string> archiveFiles;

  std::string archivedir = ArchivalDirectory(dbname);
  // Ignore error in case directory does not exist
  env->GetChildren(dbname, &filenames);

  if (dbname != soptions.wal_dir) {
    std::vector<std::string> logfilenames;
    env->GetChildren(soptions.wal_dir, &logfilenames);
    filenames.insert(filenames.end(), logfilenames.begin(), logfilenames.end());
    archivedir = ArchivalDirectory(soptions.wal_dir);
  }

  if (filenames.empty()) {
    return Status::OK();
  }

  FileLock* lock;
  const std::string lockname = LockFileName(dbname);
  Status result = env->LockFile(lockname, &lock);
  if (result.ok()) {
    uint64_t number;
    FileType type;
    for (size_t i = 0; i < filenames.size(); i++) {
      if (ParseFileName(filenames[i], &number, &type) &&
          type != kDBLockFile) {  // Lock file will be deleted at end
        Status del;
        if (type == kMetaDatabase) {
          del = DestroyDB(dbname + "/" + filenames[i], options);
        } else if (type == kLogFile) {
          del = env->DeleteFile(soptions.wal_dir + "/" + filenames[i]);
        } else {
          del = env->DeleteFile(dbname + "/" + filenames[i]);
        }
        if (result.ok() && !del.ok()) {
          result = del;
        }
      }
    }

    env->GetChildren(archivedir, &archiveFiles);
    // Delete archival files.
    for (size_t i = 0; i < archiveFiles.size(); ++i) {
      if (ParseFileName(archiveFiles[i], &number, &type) &&
          type == kLogFile) {
        Status del = env->DeleteFile(archivedir + "/" + archiveFiles[i]);
        if (result.ok() && !del.ok()) {
          result = del;
        }
      }
    }
    // ignore case where no archival directory is present.
    env->DeleteDir(archivedir);

    env->UnlockFile(lock);  // Ignore error since state is already gone
    env->DeleteFile(lockname);
    env->DeleteDir(dbname);  // Ignore error in case dir contains other files
    env->DeleteDir(soptions.wal_dir);
  }
  return result;
}

//
// A global method that can dump out the build version
void DumpLeveldbBuildVersion(Logger * log) {
  Log(log, "Git sha %s", rocksdb_build_git_sha);
  Log(log, "Compile time %s %s",
      rocksdb_build_compile_time, rocksdb_build_compile_date);
}

}  // namespace rocksdb