db_impl.cc 119.0 KB
Newer Older
1 2 3 4 5
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
J
jorlow@chromium.org 已提交
6 7 8 9 10 11
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_impl.h"

L
liuhuahang 已提交
12
#ifndef __STDC_FORMAT_MACROS
I
Igor Canadi 已提交
13
#define __STDC_FORMAT_MACROS
L
liuhuahang 已提交
14 15
#endif

I
Igor Canadi 已提交
16
#include <inttypes.h>
J
jorlow@chromium.org 已提交
17
#include <algorithm>
18 19
#include <climits>
#include <cstdio>
J
jorlow@chromium.org 已提交
20
#include <set>
21
#include <stdexcept>
22 23
#include <stdint.h>
#include <string>
24
#include <unordered_set>
25
#include <unordered_map>
T
Tomislav Novak 已提交
26
#include <utility>
27
#include <vector>
28

J
jorlow@chromium.org 已提交
29
#include "db/builder.h"
I
Igor Canadi 已提交
30
#include "db/flush_job.h"
I
Igor Canadi 已提交
31
#include "db/compaction_job.h"
32
#include "db/db_iter.h"
K
kailiu 已提交
33
#include "db/dbformat.h"
J
jorlow@chromium.org 已提交
34 35 36 37
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
K
kailiu 已提交
38
#include "db/memtable_list.h"
39
#include "db/merge_context.h"
40
#include "db/merge_helper.h"
J
jorlow@chromium.org 已提交
41
#include "db/table_cache.h"
K
kailiu 已提交
42
#include "db/table_properties_collector.h"
L
Lei Jin 已提交
43
#include "db/forward_iterator.h"
44
#include "db/transaction_log_impl.h"
J
jorlow@chromium.org 已提交
45 46
#include "db/version_set.h"
#include "db/write_batch_internal.h"
47
#include "port/port.h"
I
Igor Canadi 已提交
48
#include "rocksdb/cache.h"
49
#include "port/likely.h"
50 51 52 53
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
I
Igor Canadi 已提交
54
#include "rocksdb/version.h"
55 56
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
S
Siying Dong 已提交
57
#include "rocksdb/table.h"
J
jorlow@chromium.org 已提交
58
#include "table/block.h"
59
#include "table/block_based_table_factory.h"
J
jorlow@chromium.org 已提交
60
#include "table/merger.h"
K
kailiu 已提交
61
#include "table/table_builder.h"
J
jorlow@chromium.org 已提交
62
#include "table/two_level_iterator.h"
63
#include "util/auto_roll_logger.h"
K
kailiu 已提交
64
#include "util/autovector.h"
65
#include "util/build_version.h"
J
jorlow@chromium.org 已提交
66
#include "util/coding.h"
I
Igor Canadi 已提交
67
#include "util/hash_skiplist_rep.h"
68
#include "util/hash_linklist_rep.h"
J
jorlow@chromium.org 已提交
69
#include "util/logging.h"
H
Haobo Xu 已提交
70
#include "util/log_buffer.h"
J
jorlow@chromium.org 已提交
71
#include "util/mutexlock.h"
72
#include "util/perf_context_imp.h"
73
#include "util/iostats_context_imp.h"
74
#include "util/stop_watch.h"
75
#include "util/sync_point.h"
J
jorlow@chromium.org 已提交
76

77
namespace rocksdb {
J
jorlow@chromium.org 已提交
78

79
const std::string kDefaultColumnFamilyName("default");
80

I
Igor Canadi 已提交
81
void DumpRocksDBBuildVersion(Logger * log);
82

S
Stanislau Hlebik 已提交
83 84 85 86 87 88 89 90 91 92 93 94 95 96
// Collects SuperVersion and log::Writer objects whose deletion has been
// deferred. Every pointer stored in either list is deleted when the
// WriteContext itself is destroyed.
struct DBImpl::WriteContext {
  autovector<SuperVersion*> superversions_to_free_;
  autovector<log::Writer*> logs_to_free_;

  ~WriteContext() {
    // Release the retired super versions first, then the retired log writers.
    for (size_t i = 0; i < superversions_to_free_.size(); ++i) {
      delete superversions_to_free_[i];
    }
    for (size_t i = 0; i < logs_to_free_.size(); ++i) {
      delete logs_to_free_[i];
    }
  }
};

J
jorlow@chromium.org 已提交
97 98 99
// Sanitizes a combined Options object by splitting it into its DB-wide and
// column-family parts, sanitizing each part with the matching overload and
// gluing the two results back together.
Options SanitizeOptions(const std::string& dbname,
                        const InternalKeyComparator* icmp,
                        const Options& src) {
  const DBOptions sanitized_db = SanitizeOptions(dbname, DBOptions(src));
  const auto sanitized_cf = SanitizeOptions(icmp, ColumnFamilyOptions(src));
  return Options(sanitized_db, sanitized_cf);
}

// Returns a copy of `src` in which unset or out-of-range fields have been
// replaced by usable values:
//   - max_open_files is clipped to [20, 1000000] unless it is -1,
//   - info_log is created from the options when none was supplied,
//   - bytes_per_sync defaults to 1MB when no rate limiter is configured,
//   - wal_dir defaults to `dbname` and loses a single trailing '/',
//   - db_paths defaults to a single entry pointing at `dbname`.
// Also asks the Env to grow its LOW/HIGH priority thread pools to the
// configured number of background compactions/flushes.
DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
  DBOptions result = src;

  // result.max_open_files == -1 means an "infinite" number of open files, so
  // only clip the value when a finite limit was requested.
  if (result.max_open_files != -1) {
    ClipToRange(&result.max_open_files, 20, 1000000);
  }

  if (result.info_log == nullptr) {
    Status s = CreateLoggerFromOptions(dbname, result.db_log_dir, src.env,
                                       result, &result.info_log);
    if (!s.ok()) {
      // No place suitable for logging
      result.info_log = nullptr;
    }
  }
  result.env->IncBackgroundThreadsIfNeeded(src.max_background_compactions,
                                           Env::Priority::LOW);
  result.env->IncBackgroundThreadsIfNeeded(src.max_background_flushes,
                                           Env::Priority::HIGH);

  if (!result.rate_limiter) {
    if (result.bytes_per_sync == 0) {
      result.bytes_per_sync = 1024 * 1024;
    }
  }

  if (result.wal_dir.empty()) {
    // Use dbname as default
    result.wal_dir = dbname;
  }
  // wal_dir is still empty here when dbname itself is empty; calling back()
  // on an empty string is undefined behaviour, so check for that first.
  if (!result.wal_dir.empty() && result.wal_dir.back() == '/') {
    result.wal_dir.pop_back();
  }

  if (result.db_paths.size() == 0) {
    result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
  }

  return result;
}

147 148
namespace {

149 150
// Asks the table factory of every column family to validate the combination
// of `db_opts` and that column family's options. Returns the first non-OK
// status reported by a factory, or OK when all of them accept the options.
Status SanitizeOptionsByTable(
    const DBOptions& db_opts,
    const std::vector<ColumnFamilyDescriptor>& column_families) {
  // Iterate by const reference: copying a descriptor duplicates its name and
  // its complete options object on every iteration for no benefit.
  for (const auto& cf : column_families) {
    Status s = cf.options.table_factory->SanitizeOptions(db_opts, cf.options);
    if (!s.ok()) {
      return s;
    }
  }
  return Status::OK();
}

162
// Picks the compression type to use for a memtable flush.
//
// Compressing memtable flushes might not help unless the sequential load
// optimization is used for leveled compaction. Otherwise the CPU and
// latency overhead is not offset by saving much space.
CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions) {
  // Universal compaction: compress only when compression_size_percent is
  // negative. Any other style: compress when level 0 is not explicitly
  // configured as uncompressed (no per-level list counts as "compress").
  const bool compress_flush_output =
      (ioptions.compaction_style == kCompactionStyleUniversal)
          ? (ioptions.compaction_options_universal.compression_size_percent <
             0)
          : (ioptions.compression_per_level.empty() ||
             ioptions.compression_per_level[0] != kNoCompression);

  return compress_flush_output ? ioptions.compression : kNoCompression;
}
184
}  // namespace
185

I
Igor Canadi 已提交
186
// Builds the in-memory skeleton of a database: sanitizes `options`, creates
// the table cache, the VersionSet and the column family memtable accessor,
// and writes the build version, a file summary and the effective options to
// the info log. Nothing is recovered from disk here.
DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
    : env_(options.env),
      dbname_(dbname),
      db_options_(SanitizeOptions(dbname, options)),
      stats_(db_options_.statistics.get()),
      db_lock_(nullptr),
      mutex_(options.use_adaptive_mutex),
      shutting_down_(false),
      bg_cv_(&mutex_),
      logfile_number_(0),
      log_empty_(true),
      default_cf_handle_(nullptr),
      total_log_size_(0),
      max_total_in_memory_state_(0),
      tmp_batch_(),
      bg_schedule_needed_(false),
      bg_compaction_scheduled_(0),
      bg_manual_only_(0),
      bg_flush_scheduled_(0),
      manual_compaction_(nullptr),
      disable_delete_obsolete_files_(0),
      delete_obsolete_files_last_run_(options.env->NowMicros()),
      last_stats_dump_time_microsec_(0),
      flush_on_destroy_(false),
      env_options_(options),
#ifndef ROCKSDB_LITE
      wal_manager_(db_options_, env_options_),
#endif  // ROCKSDB_LITE
      bg_work_gate_closed_(false),
      refitting_level_(false),
      opened_successfully_(false) {
  env_->GetAbsolutePath(dbname, &db_absolute_path_);

  // Reserve ten files or so for other uses and give the rest to TableCache.
  // When max_open_files is -1 ("infinite"), use a large fixed capacity.
  int table_cache_size = 4194304;
  if (db_options_.max_open_files != -1) {
    table_cache_size = db_options_.max_open_files - 10;
  }
  table_cache_ =
      NewLRUCache(table_cache_size, db_options_.table_cache_numshardbits,
                  db_options_.table_cache_remove_scan_count_limit);

  versions_.reset(new VersionSet(dbname_, &db_options_, env_options_,
                                 table_cache_.get(), &write_controller_));
  column_family_memtables_.reset(new ColumnFamilyMemTablesImpl(
      versions_->GetColumnFamilySet(), &flush_scheduler_));

  // Record what we are running and how we are configured in the info log.
  Logger* const info_logger = db_options_.info_log.get();
  DumpRocksDBBuildVersion(info_logger);
  DumpDBFileSummary(db_options_, dbname_);
  db_options_.Dump(info_logger);

  LogFlush(db_options_.info_log);
}

// Shuts the database down: optionally flushes non-empty memtables, waits for
// scheduled background compactions/flushes to drain, purges obsolete files
// (only after a successful open), destroys the VersionSet and finally
// releases the DB lock file.
DBImpl::~DBImpl() {
  mutex_.Lock();
  if (flush_on_destroy_) {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->mem()->IsEmpty()) {
        continue;
      }
      // Hold a reference on the column family while the mutex is dropped for
      // the duration of the flush.
      cfd->Ref();
      mutex_.Unlock();
      FlushMemTable(cfd, FlushOptions());
      mutex_.Lock();
      cfd->Unref();
    }
    versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
  }

  // Wait for background work to finish
  shutting_down_.store(true, std::memory_order_release);
  while (bg_compaction_scheduled_ || bg_flush_scheduled_) {
    bg_cv_.Wait();
  }

  flush_scheduler_.Clear();

  if (default_cf_handle_ != nullptr) {
    // we need to delete handle outside of lock because it does its own locking
    mutex_.Unlock();
    delete default_cf_handle_;
    mutex_.Lock();
  }

  // Clean up obsolete files due to SuperVersion release.
  // (1) Need to delete to obsolete files before closing because RepairDB()
  // scans all existing files in the file system and builds manifest file.
  // Keeping obsolete files confuses the repair process.
  // (2) Need to check if we Open()/Recover() the DB successfully before
  // deleting because if VersionSet recover fails (may be due to corrupted
  // manifest file), it is not able to identify live files correctly. As a
  // result, all "live" files can get deleted by accident. However, corrupted
  // manifest is recoverable by RepairDB().
  if (opened_successfully_) {
    JobContext purge_context;
    FindObsoleteFiles(&purge_context, true);
    // manifest number starting from 2
    purge_context.manifest_file_number = 1;
    if (purge_context.HaveSomethingToDelete()) {
      PurgeObsoleteFiles(purge_context);
    }
  }

  // versions need to be destroyed before table_cache since it can hold
  // references to table_cache.
  versions_.reset();
  mutex_.Unlock();

  if (db_lock_ != nullptr) {
    env_->UnlockFile(db_lock_);
  }

  LogFlush(db_options_.info_log);
}

// Writes the files of a brand-new database: manifest number 1 holding a
// single initial VersionEdit (log number 0, next file 2, last sequence 0) and
// a CURRENT file pointing at that manifest. Returns the first error
// encountered; the manifest is deleted again if the record cannot be written.
Status DBImpl::NewDB() {
  VersionEdit initial_edit;
  initial_edit.SetLogNumber(0);
  initial_edit.SetNextFile(2);
  initial_edit.SetLastSequence(0);

  Log(InfoLogLevel::INFO_LEVEL,
      db_options_.info_log, "Creating manifest 1 \n");
  const std::string manifest_path = DescriptorFileName(dbname_, 1);
  unique_ptr<WritableFile> manifest_file;
  Status s = env_->NewWritableFile(
      manifest_path, &manifest_file,
      env_->OptimizeForManifestWrite(env_options_));
  if (!s.ok()) {
    return s;
  }
  manifest_file->SetPreallocationBlockSize(
      db_options_.manifest_preallocation_size);

  {
    // The writer takes over the file and is destroyed at the end of this
    // scope, i.e. before CURRENT is pointed at the manifest.
    log::Writer manifest_writer(std::move(manifest_file));
    std::string encoded_edit;
    initial_edit.EncodeTo(&encoded_edit);
    s = manifest_writer.AddRecord(encoded_edit);
  }

  if (!s.ok()) {
    env_->DeleteFile(manifest_path);
    return s;
  }

  // Make "CURRENT" file that points to the new manifest file.
  return SetCurrentFile(env_, dbname_, 1, db_directory_.get());
}

// Downgrades an error to OK when paranoid checks are disabled. The swallowed
// error is written to the info log at WARN level. An OK status, or any status
// while paranoid_checks is on, is left untouched.
void DBImpl::MaybeIgnoreError(Status* s) const {
  const bool keep_status_as_is = s->ok() || db_options_.paranoid_checks;
  if (keep_status_as_is) {
    return;
  }

  Log(InfoLogLevel::WARN_LEVEL,
      db_options_.info_log, "Ignoring error %s", s->ToString().c_str());
  *s = Status::OK();
}

341
// Creates the WAL archive directory under wal_dir if it does not exist yet.
// This only happens when WAL archiving is enabled, i.e. WAL_ttl_seconds or
// WAL_size_limit_MB is non-zero; otherwise it returns OK without touching
// the file system.
const Status DBImpl::CreateArchivalDirectory() {
  const bool archiving_disabled =
      !(db_options_.WAL_ttl_seconds > 0 || db_options_.WAL_size_limit_MB > 0);
  if (archiving_disabled) {
    return Status::OK();
  }

  const std::string archive_dir = ArchivalDirectory(db_options_.wal_dir);
  return env_->CreateDirIfMissing(archive_dir);
}

349
void DBImpl::PrintStatistics() {
350
  auto dbstats = db_options_.statistics.get();
351
  if (dbstats) {
352
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
353 354
        "STATISTCS:\n %s",
        dbstats->ToString().c_str());
355 356 357
  }
}

358
void DBImpl::MaybeDumpStats() {
359
  if (db_options_.stats_dump_period_sec == 0) return;
H
Haobo Xu 已提交
360 361 362 363

  const uint64_t now_micros = env_->NowMicros();

  if (last_stats_dump_time_microsec_ +
364
      db_options_.stats_dump_period_sec * 1000000
H
Haobo Xu 已提交
365 366 367 368 369 370
      <= now_micros) {
    // Multiple threads could race in here simultaneously.
    // However, the last one will update last_stats_dump_time_microsec_
    // atomically. We could see more than one dump during one dump
    // period in rare cases.
    last_stats_dump_time_microsec_ = now_micros;
371

372 373 374 375 376 377
    bool tmp1 = false;
    bool tmp2 = false;
    DBPropertyType cf_property_type =
        GetPropertyType("rocksdb.cfstats", &tmp1, &tmp2);
    DBPropertyType db_property_type =
        GetPropertyType("rocksdb.dbstats", &tmp1, &tmp2);
H
Haobo Xu 已提交
378
    std::string stats;
379 380 381
    {
      MutexLock l(&mutex_);
      for (auto cfd : *versions_->GetColumnFamilySet()) {
382 383
        cfd->internal_stats()->GetStringProperty(cf_property_type,
                                                 "rocksdb.cfstats", &stats);
384
      }
385 386
      default_cf_internal_stats_->GetStringProperty(db_property_type,
                                                    "rocksdb.dbstats", &stats);
387
    }
388 389 390 391
    Log(InfoLogLevel::INFO_LEVEL,
        db_options_.info_log, "------- DUMPING STATS -------");
    Log(InfoLogLevel::INFO_LEVEL,
        db_options_.info_log, "%s", stats.c_str());
392

393
    PrintStatistics();
394 395 396
  }
}

397
// Returns the list of live files in 'sst_live' and the list
K
kailiu 已提交
398
// of all files in the filesystem in 'candidate_files'.
I
Igor Canadi 已提交
399 400
// no_full_scan = true -- never do the full scan using GetChildren()
// force = false -- don't force the full scan, except every
401
//  db_options_.delete_obsolete_files_period_micros
I
Igor Canadi 已提交
402
// force = true -- force the full scan
I
Igor Canadi 已提交
403
void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
I
Igor Canadi 已提交
404
                               bool no_full_scan) {
D
Dhruba Borthakur 已提交
405 406
  mutex_.AssertHeld();

407
  // if deletion is disabled, do nothing
408
  if (disable_delete_obsolete_files_ > 0) {
409 410 411
    return;
  }

412 413 414 415 416
  bool doing_the_full_scan = false;

  // logic for figurint out if we're doing the full scan
  if (no_full_scan) {
    doing_the_full_scan = false;
417
  } else if (force || db_options_.delete_obsolete_files_period_micros == 0) {
418 419 420 421
    doing_the_full_scan = true;
  } else {
    const uint64_t now_micros = env_->NowMicros();
    if (delete_obsolete_files_last_run_ +
422
        db_options_.delete_obsolete_files_period_micros < now_micros) {
423 424 425 426 427
      doing_the_full_scan = true;
      delete_obsolete_files_last_run_ = now_micros;
    }
  }

I
Igor Canadi 已提交
428
  // get obsolete files
I
Igor Canadi 已提交
429
  versions_->GetObsoleteFiles(&job_context->sst_delete_files);
I
Igor Canadi 已提交
430

I
Igor Canadi 已提交
431
  // store the current filenum, lognum, etc
432
  job_context->manifest_file_number = versions_->manifest_file_number();
I
Igor Canadi 已提交
433
  job_context->pending_manifest_file_number =
434
      versions_->pending_manifest_file_number();
I
Igor Canadi 已提交
435
  job_context->log_number = versions_->MinLogNumber();
436
  job_context->prev_log_number = versions_->prev_log_number();
I
Igor Canadi 已提交
437

I
Igor Canadi 已提交
438
  if (!doing_the_full_scan && !job_context->HaveSomethingToDelete()) {
439 440 441 442 443 444 445
    // avoid filling up sst_live if we're sure that we
    // are not going to do the full scan and that we don't have
    // anything to delete at the moment
    return;
  }

  // don't delete live files
446
  for (auto pair : pending_outputs_) {
I
Igor Canadi 已提交
447
    job_context->sst_live.emplace_back(pair.first, pair.second, 0);
448
  }
I
Igor Canadi 已提交
449
  versions_->AddLiveFiles(&job_context->sst_live);
I
Igor Canadi 已提交
450

451
  if (doing_the_full_scan) {
452 453
    for (uint32_t path_id = 0;
         path_id < db_options_.db_paths.size(); path_id++) {
454 455 456
      // set of all files in the directory. We'll exclude files that are still
      // alive in the subsequent processings.
      std::vector<std::string> files;
457
      env_->GetChildren(db_options_.db_paths[path_id].path,
458
                        &files);  // Ignore errors
459
      for (std::string file : files) {
I
Igor Canadi 已提交
460
        // TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes
I
Igor Canadi 已提交
461
        job_context->candidate_files.emplace_back("/" + file, path_id);
462 463
      }
    }
464 465

    //Add log files in wal_dir
466
    if (db_options_.wal_dir != dbname_) {
467
      std::vector<std::string> log_files;
468
      env_->GetChildren(db_options_.wal_dir, &log_files);  // Ignore errors
469
      for (std::string log_file : log_files) {
I
Igor Canadi 已提交
470
        job_context->candidate_files.emplace_back(log_file, 0);
471 472
      }
    }
473
    // Add info log files in db_log_dir
474
    if (!db_options_.db_log_dir.empty() && db_options_.db_log_dir != dbname_) {
475
      std::vector<std::string> info_log_files;
476 477
      // Ignore errors
      env_->GetChildren(db_options_.db_log_dir, &info_log_files);
478
      for (std::string log_file : info_log_files) {
I
Igor Canadi 已提交
479
        job_context->candidate_files.emplace_back(log_file, 0);
480 481
      }
    }
482
  }
483 484
}

485
namespace {
I
Igor Canadi 已提交
486 487
bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
                          const JobContext::CandidateFileInfo& second) {
488 489 490 491 492
  if (first.file_name > second.file_name) {
    return true;
  } else if (first.file_name < second.file_name) {
    return false;
  } else {
493
    return (first.path_id > second.path_id);
494 495 496 497
  }
}
};  // namespace

D
Dhruba Borthakur 已提交
498
// Diffs the files listed in filenames and those that do not
I
Igor Canadi 已提交
499
// belong to live files are posibly removed. Also, removes all the
500
// files in sst_delete_files and log_delete_files.
501
// It is not necessary to hold the mutex when invoking this method.
I
Igor Canadi 已提交
502
void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
503 504
  // we'd better have sth to delete
  assert(state.HaveSomethingToDelete());
505

I
Igor Canadi 已提交
506 507 508 509
  // this checks if FindObsoleteFiles() was run before. If not, don't do
  // PurgeObsoleteFiles(). If FindObsoleteFiles() was run, we need to also
  // run PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true
  if (state.manifest_file_number == 0) {
I
Igor Canadi 已提交
510 511
    return;
  }
512

513
  // Now, convert live list to an unordered map, WITHOUT mutex held;
514
  // set is slow.
515
  std::unordered_map<uint64_t, const FileDescriptor*> sst_live_map;
I
Igor Canadi 已提交
516
  for (const FileDescriptor& fd : state.sst_live) {
517 518
    sst_live_map[fd.GetNumber()] = &fd;
  }
I
Igor Canadi 已提交
519

I
Igor Canadi 已提交
520 521 522 523
  auto candidate_files = state.candidate_files;
  candidate_files.reserve(candidate_files.size() +
                          state.sst_delete_files.size() +
                          state.log_delete_files.size());
K
kailiu 已提交
524 525
  // We may ignore the dbname when generating the file names.
  const char* kDumbDbName = "";
526
  for (auto file : state.sst_delete_files) {
527 528 529
    candidate_files.emplace_back(
        MakeTableFileName(kDumbDbName, file->fd.GetNumber()),
        file->fd.GetPathId());
530
    delete file;
I
Igor Canadi 已提交
531 532
  }

K
kailiu 已提交
533 534
  for (auto file_num : state.log_delete_files) {
    if (file_num > 0) {
535 536
      candidate_files.emplace_back(LogFileName(kDumbDbName, file_num).substr(1),
                                   0);
I
Igor Canadi 已提交
537 538
    }
  }
539

K
kailiu 已提交
540
  // dedup state.candidate_files so we don't try to delete the same
I
Igor Canadi 已提交
541
  // file twice
542
  sort(candidate_files.begin(), candidate_files.end(), CompareCandidateFile);
543 544
  candidate_files.erase(unique(candidate_files.begin(), candidate_files.end()),
                        candidate_files.end());
J
jorlow@chromium.org 已提交
545

546
  std::vector<std::string> old_info_log_files;
547
  InfoLogPrefix info_log_prefix(!db_options_.db_log_dir.empty(), dbname_);
548 549 550
  for (const auto& candidate_file : candidate_files) {
    std::string to_delete = candidate_file.file_name;
    uint32_t path_id = candidate_file.path_id;
K
kailiu 已提交
551 552 553
    uint64_t number;
    FileType type;
    // Ignore file if we cannot recognize it.
554
    if (!ParseFileName(to_delete, &number, info_log_prefix.prefix, &type)) {
K
kailiu 已提交
555 556
      continue;
    }
J
jorlow@chromium.org 已提交
557

K
kailiu 已提交
558 559 560 561 562 563 564 565
    bool keep = true;
    switch (type) {
      case kLogFile:
        keep = ((number >= state.log_number) ||
                (number == state.prev_log_number));
        break;
      case kDescriptorFile:
        // Keep my manifest file, and any newer incarnations'
566
        // (can happen during manifest roll)
K
kailiu 已提交
567 568 569
        keep = (number >= state.manifest_file_number);
        break;
      case kTableFile:
570
        keep = (sst_live_map.find(number) != sst_live_map.end());
K
kailiu 已提交
571 572 573
        break;
      case kTempFile:
        // Any temp files that are currently being written to must
574 575 576 577
        // be recorded in pending_outputs_, which is inserted into "live".
        // Also, SetCurrentFile creates a temp file when writing out new
        // manifest, which is equal to state.pending_manifest_file_number. We
        // should not delete that file
578
        keep = (sst_live_map.find(number) != sst_live_map.end()) ||
579
               (number == state.pending_manifest_file_number);
K
kailiu 已提交
580 581 582 583
        break;
      case kInfoLogFile:
        keep = true;
        if (number != 0) {
584
          old_info_log_files.push_back(to_delete);
J
jorlow@chromium.org 已提交
585
        }
K
kailiu 已提交
586 587 588 589 590 591 592 593 594 595 596 597 598
        break;
      case kCurrentFile:
      case kDBLockFile:
      case kIdentityFile:
      case kMetaDatabase:
        keep = true;
        break;
    }

    if (keep) {
      continue;
    }

599
    std::string fname;
K
kailiu 已提交
600 601
    if (type == kTableFile) {
      // evict from cache
602
      TableCache::Evict(table_cache_.get(), number);
603
      fname = TableFileName(db_options_.db_paths, number, path_id);
604
    } else {
605 606
      fname = ((type == kLogFile) ?
          db_options_.wal_dir : dbname_) + "/" + to_delete;
K
kailiu 已提交
607
    }
608

I
Igor Canadi 已提交
609 610
#ifdef ROCKSDB_LITE
    Status s = env_->DeleteFile(fname);
611 612
    Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
        "Delete %s type=%d #%" PRIu64 " -- %s\n",
I
Igor Canadi 已提交
613 614 615 616 617
        fname.c_str(), type, number, s.ToString().c_str());
#else   // not ROCKSDB_LITE
    if (type == kLogFile && (db_options_.WAL_ttl_seconds > 0 ||
                             db_options_.WAL_size_limit_MB > 0)) {
      wal_manager_.ArchiveWALFile(fname, number);
K
kailiu 已提交
618 619
    } else {
      Status s = env_->DeleteFile(fname);
620 621
      Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
          "Delete %s type=%d #%" PRIu64 " -- %s\n",
622
          fname.c_str(), type, number, s.ToString().c_str());
J
jorlow@chromium.org 已提交
623
    }
I
Igor Canadi 已提交
624
#endif  // ROCKSDB_LITE
J
jorlow@chromium.org 已提交
625
  }
H
heyongqiang 已提交
626

627
  // Delete old info log files.
628
  size_t old_info_log_file_count = old_info_log_files.size();
629
  if (old_info_log_file_count >= db_options_.keep_log_file_num) {
630
    std::sort(old_info_log_files.begin(), old_info_log_files.end());
631
    size_t end = old_info_log_file_count - db_options_.keep_log_file_num;
632
    for (unsigned int i = 0; i <= end; i++) {
633
      std::string& to_delete = old_info_log_files.at(i);
634 635
      std::string full_path_to_delete = (db_options_.db_log_dir.empty() ?
           dbname_ : db_options_.db_log_dir) + "/" + to_delete;
636 637
      Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
          "Delete info log file %s\n",
638 639
          full_path_to_delete.c_str());
      Status s = env_->DeleteFile(full_path_to_delete);
640
      if (!s.ok()) {
641 642
        Log(InfoLogLevel::ERROR_LEVEL,
            db_options_.info_log, "Delete info log file %s FAILED -- %s\n",
643 644
            to_delete.c_str(), s.ToString().c_str());
      }
H
heyongqiang 已提交
645 646
    }
  }
I
Igor Canadi 已提交
647 648 649
#ifndef ROCKSDB_LITE
  wal_manager_.PurgeObsoleteWALFiles();
#endif  // ROCKSDB_LITE
650
  LogFlush(db_options_.info_log);
D
Dhruba Borthakur 已提交
651 652 653 654
}

void DBImpl::DeleteObsoleteFiles() {
  mutex_.AssertHeld();
I
Igor Canadi 已提交
655 656 657 658
  JobContext job_context;
  FindObsoleteFiles(&job_context, true);
  if (job_context.HaveSomethingToDelete()) {
    PurgeObsoleteFiles(job_context);
659
  }
660 661
}

662
// Recovers the database state from disk.
//
// Unless `read_only` is set, this first creates the DB directories, takes the
// lock file, creates a new DB when CURRENT is missing (subject to
// create_if_missing / error_if_exists) and makes sure an IDENTITY file
// exists. It then recovers the VersionSet for `column_families`, optionally
// runs the consistency check, and replays every WAL file in wal_dir that is
// not older than the minimum log number (or equals prev_log_number).
//
// Returns Corruption when `error_if_log_file_exist` is set and such a log
// file is found, or when a freshly created DB already has log files in
// wal_dir. REQUIRES: mutex_ is held.
Status DBImpl::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
    bool error_if_log_file_exist) {
  mutex_.AssertHeld();

  bool is_new_db = false;
  assert(db_lock_ == nullptr);
  if (!read_only) {
    // We call CreateDirIfMissing() as the directory may already exist (if we
    // are reopening a DB), when this happens we don't want creating the
    // directory to cause an error. However, we need to check if creating the
    // directory fails or else we may get an obscure message about the lock
    // file not existing. One real-world example of this occurring is if
    // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
    // when dbname_ is "dir/db" but when "dir" doesn't exist.
    Status s = env_->CreateDirIfMissing(dbname_);
    if (!s.ok()) {
      return s;
    }

    for (auto& db_path : db_options_.db_paths) {
      s = env_->CreateDirIfMissing(db_path.path);
      if (!s.ok()) {
        return s;
      }
    }

    s = env_->NewDirectory(dbname_, &db_directory_);
    if (!s.ok()) {
      return s;
    }

    s = env_->LockFile(LockFileName(dbname_), &db_lock_);
    if (!s.ok()) {
      return s;
    }

    if (!env_->FileExists(CurrentFileName(dbname_))) {
      if (db_options_.create_if_missing) {
        s = NewDB();
        is_new_db = true;
        if (!s.ok()) {
          return s;
        }
      } else {
        return Status::InvalidArgument(
            dbname_, "does not exist (create_if_missing is false)");
      }
    } else {
      if (db_options_.error_if_exists) {
        return Status::InvalidArgument(
            dbname_, "exists (error_if_exists is true)");
      }
    }
    // Check for the IDENTITY file and create it if not there
    if (!env_->FileExists(IdentityFileName(dbname_))) {
      s = SetIdentityFile(env_, dbname_);
      if (!s.ok()) {
        return s;
      }
    }
  }

  Status s = versions_->Recover(column_families, read_only);
  if (db_options_.paranoid_checks && s.ok()) {
    s = CheckConsistency();
  }
  if (s.ok()) {
    SequenceNumber max_sequence(0);
    default_cf_handle_ = new ColumnFamilyHandleImpl(
        versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
    default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
    single_column_family_mode_ =
        versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;

    // Recover from all newer log files than the ones named in the
    // descriptor (new log files may have been added by the previous
    // incarnation without registering them in the descriptor).
    //
    // Note that prev_log_number() is no longer used, but we pay
    // attention to it in case we are recovering a database
    // produced by an older version of rocksdb.
    const uint64_t min_log = versions_->MinLogNumber();
    const uint64_t prev_log = versions_->prev_log_number();
    std::vector<std::string> filenames;
    s = env_->GetChildren(db_options_.wal_dir, &filenames);
    if (!s.ok()) {
      return s;
    }

    std::vector<uint64_t> logs;
    for (const std::string& filename : filenames) {
      uint64_t number;
      FileType type;
      if (ParseFileName(filename, &number, &type) && type == kLogFile) {
        if (is_new_db) {
          return Status::Corruption(
              "While creating a new Db, wal_dir contains "
              "existing log file: ",
              filename);
        } else if ((number >= min_log) || (number == prev_log)) {
          logs.push_back(number);
        }
      }
    }

    if (logs.size() > 0 && error_if_log_file_exist) {
      // Note the trailing space in the first literal: adjacent literals are
      // concatenated verbatim, and without it the message read
      // "...error_if_log_file_existflag but...".
      return Status::Corruption(
          "The db was opened in readonly mode with error_if_log_file_exist "
          "flag but a log file already exists");
    }

    if (!logs.empty()) {
      // Recover in the order in which the logs were generated
      std::sort(logs.begin(), logs.end());
      s = RecoverLogFiles(logs, &max_sequence, read_only);
      if (!s.ok()) {
        // Clear memtables if recovery failed
        for (auto cfd : *versions_->GetColumnFamilySet()) {
          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions());
        }
      }
    }
    SetTickerCount(stats_, SEQUENCE_NUMBER, versions_->LastSequence());
  }

  // Initial value: sum over all column families of
  // write_buffer_size * max_write_buffer_number.
  max_total_in_memory_state_ = 0;
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
    max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
                                  mutable_cf_options->max_write_buffer_number;
  }

  return s;
}

S
Stanislau Hlebik 已提交
799 800 801
// Replays the write-ahead logs named by log_numbers into the column family
// memtables, raising *max_sequence to the largest sequence number seen.
// Unless read_only is set, memtables are written out as level-0 tables and
// the resulting edits are logged to the MANIFEST.
//
// REQUIRES: log_numbers are sorted in ascending order
// REQUIRES: mutex_ is held
// NOTE: log_numbers.back() is read when !read_only, so the list must not be
// empty in that case.
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
                               SequenceNumber* max_sequence, bool read_only) {
  // Receives corruption reports from log::Reader: always logs them, and
  // stores the first one into *status when a status sink was supplied.
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;  // nullptr if db_options_.paranoid_checks==false or
                     //            db_options_.skip_log_error_on_recovery==true
    virtual void Corruption(size_t bytes, const Status& s) {
      const bool ignoring = (status == nullptr);
      Log(InfoLogLevel::WARN_LEVEL,
          info_log, "%s%s: dropping %d bytes; %s",
          (ignoring ? "(ignoring error) " : ""),
          fname, static_cast<int>(bytes), s.ToString().c_str());
      if (!ignoring && status->ok()) {
        *status = s;
      }
    }
  };

  mutex_.AssertHeld();
  Status status;

  // One pending VersionEdit per column family, keyed by column family ID.
  // No refcounting needed: the iteration happens under the mutex.
  std::unordered_map<int, VersionEdit> version_edits;
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    VersionEdit cf_edit;
    cf_edit.SetColumnFamily(cfd->GetID());
    version_edits.insert({cfd->GetID(), cf_edit});
  }

  for (auto log_number : log_numbers) {
    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number, so bump the file number
    // allocation counter in VersionSet by hand.
    versions_->MarkFileNumberUsed(log_number);

    // Open the log file.
    std::string fname = LogFileName(db_options_.wal_dir, log_number);
    unique_ptr<SequentialFile> file;
    status = env_->NewSequentialFile(fname, &file, env_options_);
    if (!status.ok()) {
      MaybeIgnoreError(&status);
      if (status.ok()) {
        // This log could not be opened but the error was ignored;
        // move on to the next one.
        continue;
      }
      return status;
    }

    // Set up the reporter that the log reader calls on corruption.
    LogReporter reporter;
    reporter.env = env_;
    reporter.info_log = db_options_.info_log.get();
    reporter.fname = fname.c_str();
    if (db_options_.paranoid_checks &&
        !db_options_.skip_log_error_on_recovery) {
      reporter.status = &status;
    } else {
      reporter.status = nullptr;
    }

    // log::Reader is intentionally asked to checksum even if
    // paranoid_checks==false so that corruptions cause entire commits
    // to be skipped instead of propagating bad information (like overly
    // large sequence numbers).
    log::Reader reader(std::move(file), &reporter, true /*checksum*/,
                       0 /*initial_offset*/);
    Log(InfoLogLevel::INFO_LEVEL,
        db_options_.info_log, "Recovering log #%" PRIu64 "", log_number);

    // Read every record and apply it to the memtables.
    std::string scratch;
    Slice record;
    WriteBatch batch;
    while (reader.ReadRecord(&record, &scratch)) {
      if (record.size() < 12) {
        reporter.Corruption(record.size(),
                            Status::Corruption("log record too small"));
        continue;
      }
      WriteBatchInternal::SetContents(&batch, record);

      // A missing column family may simply mean that the batch refers to a
      // column family that was dropped after the insert. The whole batch
      // should not fail for that, so "ignore missing column families" is
      // passed as true and such updates are skipped.
      status = WriteBatchInternal::InsertInto(
          &batch, column_family_memtables_.get(), true, log_number);

      MaybeIgnoreError(&status);
      if (!status.ok()) {
        return status;
      }

      const SequenceNumber batch_last_seq =
          WriteBatchInternal::Sequence(&batch) +
          WriteBatchInternal::Count(&batch) - 1;
      *max_sequence = std::max(*max_sequence, batch_last_seq);

      if (read_only) {
        continue;
      }

      // Safe because this runs before any client has access to the DB and
      // only a single thread is operating on it.
      for (ColumnFamilyData* cfd = flush_scheduler_.GetNextColumnFamily();
           cfd != nullptr; cfd = flush_scheduler_.GetNextColumnFamily()) {
        cfd->Unref();
        // If this asserts, InsertInto failed to filter out updates to
        // already-flushed column families.
        assert(cfd->GetLogNumber() <= log_number);
        auto edit_it = version_edits.find(cfd->GetID());
        assert(edit_it != version_edits.end());
        VersionEdit* cf_edit = &edit_it->second;
        status = WriteLevel0TableForRecovery(cfd, cfd->mem(), cf_edit);
        if (!status.ok()) {
          // Surface errors immediately so that conditions like a full
          // file-system cause DB::Open() to fail.
          return status;
        }
        cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions());
      }
    }

    flush_scheduler_.Clear();
    if (versions_->LastSequence() < *max_sequence) {
      versions_->SetLastSequence(*max_sequence);
    }
  }

  if (read_only) {
    return status;
  }

  // No refcounting needed: the client still has no access to the DB and
  // cannot drop column families while we iterate.
  const auto max_log_number = log_numbers.back();
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    auto edit_it = version_edits.find(cfd->GetID());
    assert(edit_it != version_edits.end());
    VersionEdit* cf_edit = &edit_it->second;

    if (cfd->GetLogNumber() > max_log_number) {
      // This column family already flushed the data from all logs. Its
      // memtable has to be empty because updates are filtered by
      // log_number (in WriteBatch::InsertInto).
      assert(cfd->mem()->GetFirstSequenceNumber() == 0);
      assert(cf_edit->NumEntries() == 0);
      continue;
    }

    // Flush the final memtable (if non-empty).
    if (cfd->mem()->GetFirstSequenceNumber() != 0) {
      status = WriteLevel0TableForRecovery(cfd, cfd->mem(), cf_edit);
      if (!status.ok()) {
        // Recovery failed
        return status;
      }
      cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions());
    }

    // Write the MANIFEST update. Recording log_number in the manifest means
    // that any log file numbered strictly below (log_number + 1) is already
    // recovered and should be ignored on the next incarnation. Since
    // max_log_number has been recovered, all logs numbered
    // `<= max_log_number` (including this one) are to be ignored.
    cf_edit->SetLogNumber(max_log_number + 1);
    // The next log number must be marked as used even though it is not
    // actually used, because VersionSet assumes
    // VersionSet::next_file_number_ is always strictly greater than any
    // log number.
    versions_->MarkFileNumberUsed(max_log_number + 1);
    status = versions_->LogAndApply(
        cfd, *cfd->GetLatestMutableCFOptions(), cf_edit, &mutex_);
    if (!status.ok()) {
      // Recovery failed
      return status;
    }
  }

  return status;
}

978 979
// Builds a level-0 table file from the contents of `mem`. When the build
// succeeds and the file is non-empty, the file is added to `edit`.
// Compaction/flush statistics are recorded regardless of the outcome.
//
// REQUIRES: mutex_ is held on entry. It is released while BuildTable() runs
// and re-acquired before this function continues.
Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
                                           VersionEdit* edit) {
  mutex_.AssertHeld();
  const uint64_t start_micros = env_->NowMicros();

  FileMetaData meta;
  meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
  pending_outputs_[meta.fd.GetNumber()] = 0;  // path 0 for level 0 file.

  ReadOptions read_options;
  read_options.total_order_seek = true;
  Arena arena;
  Status build_status;
  {
    // The iterator lives only for this scope, so it is gone before the
    // file number is dropped from pending_outputs_ below.
    ScopedArenaIterator mem_iter(mem->NewIterator(read_options, &arena));
    const SequenceNumber newest_snapshot = snapshots_.GetNewest();
    const SequenceNumber first_seqno_in_mem = mem->GetFirstSequenceNumber();
    Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
        "[%s] [WriteLevel0TableForRecovery]"
        " Level-0 table #%" PRIu64 ": started",
        cfd->GetName().c_str(), meta.fd.GetNumber());

    // Build the table without holding the DB mutex.
    mutex_.Unlock();
    build_status = BuildTable(
        dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(),
        mem_iter.get(), &meta, cfd->internal_comparator(), newest_snapshot,
        first_seqno_in_mem, GetCompressionFlush(*cfd->ioptions()),
        cfd->ioptions()->compression_opts, Env::IO_HIGH);
    LogFlush(db_options_.info_log);
    mutex_.Lock();

    Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
        "[%s] [WriteLevel0TableForRecovery]"
        " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
        cfd->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(),
        build_status.ToString().c_str());
  }
  pending_outputs_.erase(meta.fd.GetNumber());

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  const int level = 0;
  const uint64_t file_size = meta.fd.GetFileSize();
  if (build_status.ok() && file_size > 0) {
    edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), file_size,
                  meta.smallest, meta.largest, meta.smallest_seqno,
                  meta.largest_seqno);
  }

  InternalStats::CompactionStats stats(1);
  stats.micros = env_->NowMicros() - start_micros;
  stats.bytes_written = file_size;
  stats.files_out_levelnp1 = 1;

  auto* internal_stats = cfd->internal_stats();
  internal_stats->AddCompactionStats(level, stats);
  internal_stats->AddCFStats(InternalStats::BYTES_FLUSHED, file_size);
  RecordTick(stats_, COMPACT_WRITE_BYTES, file_size);
  return build_status;
}

1038 1039
// Runs a FlushJob for `cfd`. On success a new super version is installed,
// *madeProgress (if non-null) is set, and WAL files older than the minimum
// log number are queued in job_context for deletion. On a failure other than
// shutdown, bg_error_ is set when paranoid_checks is enabled and no earlier
// background error is recorded.
//
// REQUIRES: mutex_ is held and cfd has an immutable memtable pending flush.
Status DBImpl::FlushMemTableToOutputFile(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer) {
  mutex_.AssertHeld();
  assert(cfd->imm()->size() != 0);
  assert(cfd->imm()->IsFlushPending());

  FlushJob flush_job(dbname_, cfd, db_options_, mutable_cf_options,
                     env_options_, versions_.get(), &mutex_, &shutting_down_,
                     &pending_outputs_, snapshots_.GetNewest(), job_context,
                     log_buffer, db_directory_.get(),
                     GetCompressionFlush(*cfd->ioptions()), stats_);

  const Status flush_result = flush_job.Run();

  if (flush_result.ok()) {
    InstallSuperVersionBackground(cfd, job_context, mutable_cf_options);
    if (madeProgress != nullptr) {
      *madeProgress = true;
    }

    VersionStorageInfo::LevelSummaryStorage summary_storage;
    LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(),
                cfd->current()->storage_info()->LevelSummary(&summary_storage));

    if (disable_delete_obsolete_files_ == 0) {
      // Hand obsolete WAL files over to the job context for deletion.
      while (!alive_log_files_.empty() &&
             alive_log_files_.front().number < versions_->MinLogNumber()) {
        const auto& oldest_log = alive_log_files_.front();
        job_context->log_delete_files.push_back(oldest_log.number);
        total_log_size_ -= oldest_log.size;
        alive_log_files_.pop_front();
      }
    }
  } else if (!flush_result.IsShutdownInProgress() &&
             db_options_.paranoid_checks && bg_error_.ok()) {
    // if a bad error happened (not ShutdownInProgress) and paranoid_checks is
    // true, mark DB read-only
    bg_error_ = flush_result;
  }

  RecordFlushIOStats();
  return flush_result;
}

1084
// Manually compacts the key range [*begin, *end] of the given column family.
// The memtable is flushed first, then each level from 0 up to the deepest
// level overlapping the range is compacted in turn. If reduce_level is set,
// the result is afterwards refitted via ReFitLevel() using target_level.
// Returns InvalidArgument when target_path_id does not name a configured
// db path; otherwise the first failing step's status is returned.
Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
                            const Slice* begin, const Slice* end,
                            bool reduce_level, int target_level,
                            uint32_t target_path_id) {
  if (target_path_id >= db_options_.db_paths.size()) {
    return Status::InvalidArgument("Invalid target path ID");
  }

  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();

  Status result = FlushMemTable(cfd, FlushOptions());
  if (!result.ok()) {
    LogFlush(db_options_.info_log);
    return result;
  }

  // Find the deepest level (>= 1) that overlaps the requested range.
  int deepest_overlap = 0;
  {
    MutexLock l(&mutex_);
    Version* current_version = cfd->current();
    const int num_levels = cfd->NumberLevels();
    for (int lvl = 1; lvl < num_levels; ++lvl) {
      if (current_version->storage_info()->OverlapInLevel(lvl, begin, end)) {
        deepest_overlap = lvl;
      }
    }
  }

  for (int lvl = 0; lvl <= deepest_overlap; ++lvl) {
    // In case the compaction is universal (or FIFO), or if we're compacting
    // the bottom-most level, the output level is the same as the input one.
    // Level 0 can never be the bottommost level (i.e. if all files are in
    // level 0, we will compact to level 1).
    const auto style = cfd->ioptions()->compaction_style;
    const bool output_to_same_level =
        style == kCompactionStyleUniversal || style == kCompactionStyleFIFO ||
        (lvl == deepest_overlap && lvl > 0);
    const int output_level = output_to_same_level ? lvl : lvl + 1;

    result =
        RunManualCompaction(cfd, lvl, output_level, target_path_id, begin, end);
    if (!result.ok()) {
      LogFlush(db_options_.info_log);
      return result;
    }
  }

  if (reduce_level) {
    result = ReFitLevel(cfd, deepest_overlap, target_level);
  }
  LogFlush(db_options_.info_log);

  {
    MutexLock l(&mutex_);
    // an automatic compaction that has been scheduled might have been
    // preempted by the manual compactions. Need to schedule it back.
    MaybeScheduleFlushOrCompaction();
  }

  return result;
}

1145
// Applies the name/value pairs in options_map to the column family's mutable
// options (under mutex_) and logs the request together with its outcome.
// Returns InvalidArgument for an empty map; otherwise returns whatever
// ColumnFamilyData::SetOptions() returned.
Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
    const std::unordered_map<std::string, std::string>& options_map) {
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  if (options_map.empty()) {
    Log(InfoLogLevel::WARN_LEVEL,
        db_options_.info_log, "SetOptions() on column family [%s], empty input",
        cfd->GetName().c_str());
    return Status::InvalidArgument("empty input");
  }

  // Snapshot of the options taken under the lock so that it can be dumped
  // after the lock is released.
  MutableCFOptions applied_options;
  Status result;
  {
    MutexLock l(&mutex_);
    result = cfd->SetOptions(options_map);
    if (result.ok()) {
      applied_options = *cfd->GetLatestMutableCFOptions();
    }
  }

  Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
      "SetOptions() on column family [%s], inputs:",
      cfd->GetName().c_str());
  for (const auto& entry : options_map) {
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "%s: %s\n", entry.first.c_str(), entry.second.c_str());
  }

  if (!result.ok()) {
    Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
        "[%s] SetOptions failed", cfd->GetName().c_str());
    return result;
  }

  Log(InfoLogLevel::INFO_LEVEL,
      db_options_.info_log, "[%s] SetOptions succeeded",
      cfd->GetName().c_str());
  applied_options.Dump(db_options_.info_log.get());
  return result;
}

1184
// return the same level if it cannot be moved
1185 1186
int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
    const MutableCFOptions& mutable_cf_options, int level) {
1187
  mutex_.AssertHeld();
S
sdong 已提交
1188
  const auto* vstorage = cfd->current()->storage_info();
1189
  int minimum_level = level;
1190
  for (int i = level - 1; i > 0; --i) {
1191
    // stop if level i is not empty
S
sdong 已提交
1192
    if (vstorage->NumLevelFiles(i) > 0) break;
1193
    // stop if level i is too small (cannot fit the level files)
1194
    if (mutable_cf_options.MaxBytesForLevel(i) <
S
sdong 已提交
1195
        vstorage->NumLevelBytes(level)) {
1196 1197
      break;
    }
1198 1199 1200 1201 1202 1203

    minimum_level = i;
  }
  return minimum_level;
}

I
Igor Canadi 已提交
1204 1205
// Moves all files of `level` to a smaller level: `target_level` when it is
// non-negative, otherwise the level chosen by FindMinimumEmptyLevelFitting().
// Only one thread may refit at a time (NotSupported is returned to the
// others). Background work is gated off and waited for while the move is
// applied.
Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
  assert(level < cfd->NumberLevels());

  // Allocated before taking the mutex; whichever super version ends up
  // unused is deleted after the mutex is released.
  SuperVersion* fresh_superversion = new SuperVersion();
  SuperVersion* stale_superversion = nullptr;

  mutex_.Lock();

  // only allow one thread refitting
  if (refitting_level_) {
    mutex_.Unlock();
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "[ReFitLevel] another thread is refitting");
    delete fresh_superversion;
    return Status::NotSupported("another thread is refitting");
  }
  refitting_level_ = true;

  // wait for all background threads to stop
  bg_work_gate_closed_ = true;
  while (bg_compaction_scheduled_ > 0 || bg_flush_scheduled_) {
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "[RefitLevel] waiting for background threads to stop: %d %d",
        bg_compaction_scheduled_, bg_flush_scheduled_);
    bg_cv_.Wait();
  }

  const MutableCFOptions mutable_cf_options =
      *cfd->GetLatestMutableCFOptions();

  // move to a smaller level
  const int to_level =
      (target_level < 0)
          ? FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level)
          : target_level;
  assert(to_level <= level);

  Status status;
  if (to_level < level) {
    Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
        "[%s] Before refitting:\n%s",
        cfd->GetName().c_str(), cfd->current()->DebugString().data());

    // Re-home every file of `level` at `to_level` in a single edit.
    VersionEdit move_edit;
    move_edit.SetColumnFamily(cfd->GetID());
    for (const auto& file : cfd->current()->storage_info()->LevelFiles(level)) {
      move_edit.DeleteFile(level, file->fd.GetNumber());
      move_edit.AddFile(to_level, file->fd.GetNumber(), file->fd.GetPathId(),
                        file->fd.GetFileSize(), file->smallest, file->largest,
                        file->smallest_seqno, file->largest_seqno);
    }
    Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
        "[%s] Apply version edit:\n%s",
        cfd->GetName().c_str(), move_edit.DebugString().data());

    status = versions_->LogAndApply(cfd,
        mutable_cf_options, &move_edit, &mutex_, db_directory_.get());
    stale_superversion = InstallSuperVersion(
        cfd, fresh_superversion, mutable_cf_options);
    // The pointer was handed to InstallSuperVersion(); clear it so that it
    // is not deleted below.
    fresh_superversion = nullptr;

    Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
        "[%s] LogAndApply: %s\n", cfd->GetName().c_str(),
        status.ToString().data());

    if (status.ok()) {
      Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
          "[%s] After refitting:\n%s",
          cfd->GetName().c_str(), cfd->current()->DebugString().data());
    }
  }

  refitting_level_ = false;
  bg_work_gate_closed_ = false;

  mutex_.Unlock();
  delete stale_superversion;
  delete fresh_superversion;
  return status;
}

1285 1286 1287
// Returns NumberLevels() of the column family behind the handle.
int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  return cfd->NumberLevels();
}

1290 1291
// Returns max_mem_compaction_level from the mutable options of the column
// family's current super version, read while holding mutex_.
int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* column_family) {
  auto* handle = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  MutexLock l(&mutex_);
  const auto& cf_options =
      handle->cfd()->GetSuperVersion()->mutable_cf_options;
  return cf_options.max_mem_compaction_level;
}

1297 1298
// Returns level0_stop_writes_trigger from the mutable options of the column
// family's current super version, read while holding mutex_.
int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
  auto* handle = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  MutexLock l(&mutex_);
  const auto& cf_options =
      handle->cfd()->GetSuperVersion()->mutable_cf_options;
  return cf_options.level0_stop_writes_trigger;
}

L
Lei Jin 已提交
1304
// Public flush entry point: resolves the handle to its column family data
// and forwards to FlushMemTable().
Status DBImpl::Flush(const FlushOptions& flush_options,
                     ColumnFamilyHandle* column_family) {
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  return FlushMemTable(cfd, flush_options);
}

1310
// Returns the last sequence number recorded by the version set.
SequenceNumber DBImpl::GetLatestSequenceNumber() const {
  const SequenceNumber latest = versions_->LastSequence();
  return latest;
}

I
Igor Canadi 已提交
1314
// Schedules one manual compaction from input_level to output_level over
// [*begin, *end] and blocks until it is reported done. A null bound, or a
// universal/FIFO compaction style, makes that bound unbounded. Returns the
// status stored in the ManualCompaction record.
Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
                                   int output_level, uint32_t output_path_id,
                                   const Slice* begin, const Slice* end) {
  assert(input_level >= 0);

  // Backing storage for the internal keys that manual.begin / manual.end
  // may point at.
  InternalKey begin_storage, end_storage;

  ManualCompaction manual;
  manual.cfd = cfd;
  manual.input_level = input_level;
  manual.output_level = output_level;
  manual.output_path_id = output_path_id;
  manual.done = false;
  manual.in_progress = false;

  // For universal (and FIFO) compaction, every manual compaction is forced
  // to compact all files.
  const auto style = cfd->ioptions()->compaction_style;
  const bool compact_all_files =
      style == kCompactionStyleUniversal || style == kCompactionStyleFIFO;

  if (begin != nullptr && !compact_all_files) {
    begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
    manual.begin = &begin_storage;
  } else {
    manual.begin = nullptr;
  }

  if (end != nullptr && !compact_all_files) {
    end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
    manual.end = &end_storage;
  } else {
    manual.end = nullptr;
  }

  MutexLock l(&mutex_);

  // When a manual compaction arrives, temporarily disable scheduling of
  // non-manual compactions and wait until the number of scheduled compaction
  // jobs drops to zero. This is needed to ensure that this manual compaction
  // can compact any range of keys/files.
  //
  // bg_manual_only_ is non-zero when at least one thread is inside
  // RunManualCompaction(), i.e. during that time no other compaction will
  // get scheduled (see MaybeScheduleFlushOrCompaction).
  //
  // Note that the following loop doesn't stop more than one thread calling
  // RunManualCompaction() from getting to the second while loop below.
  // However, only one of them will actually schedule compaction, while
  // others will wait on a condition variable until it completes.
  ++bg_manual_only_;
  while (bg_compaction_scheduled_ > 0) {
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "[%s] Manual compaction waiting for all other scheduled background "
        "compactions to finish",
        cfd->GetName().c_str());
    bg_cv_.Wait();
  }

  Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
      "[%s] Manual compaction starting",
      cfd->GetName().c_str());

  // We don't check bg_error_ here, because if we get the error in compaction,
  // the compaction will set manual.status to bg_error_ and set manual.done to
  // true.
  while (!manual.done) {
    assert(bg_manual_only_ > 0);
    if (manual_compaction_ == nullptr) {
      // No manual compaction is registered: register ours and schedule it.
      manual_compaction_ = &manual;
      bg_compaction_scheduled_++;
      env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW);
    } else {
      // Running either this or some other manual compaction
      bg_cv_.Wait();
    }
  }

  assert(!manual.in_progress);
  assert(bg_manual_only_ > 0);
  --bg_manual_only_;
  return manual.status;
}

1397
// Switches `cfd` to a new memtable and log file, requests a flush of the
// immutable memtables and schedules background work. When flush_options.wait
// is set and the switch succeeded, blocks in WaitForFlushMemTable().
// Returns OK right away when there is nothing to flush.
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                             const FlushOptions& flush_options) {
  Status switch_status;
  {
    // Declared before the lock guard, so it is destroyed after the mutex
    // has been released.
    WriteContext context;
    MutexLock guard_lock(&mutex_);

    if (cfd->imm()->size() == 0 && cfd->mem()->IsEmpty()) {
      // Nothing to flush
      return Status::OK();
    }

    WriteThread::Writer writer(&mutex_);
    switch_status = write_thread_.EnterWriteThread(&writer, 0);
    // No timeout and nobody should do our job
    assert(switch_status.ok() && !writer.done);

    // SetNewMemtableAndNewLogFile() will release and reacquire mutex
    // during execution
    switch_status = SetNewMemtableAndNewLogFile(cfd, &context);
    cfd->imm()->FlushRequested();
    MaybeScheduleFlushOrCompaction();

    write_thread_.ExitWriteThread(&writer, &writer, switch_status);
  }

  if (!switch_status.ok() || !flush_options.wait) {
    return switch_status;
  }
  // Wait until the flush completes
  return WaitForFlushMemTable(cfd);
}

1429
// Blocks on bg_cv_ (holding mutex_) until `cfd` has no immutable memtables
// left or a background error has been recorded. Returns that background
// error if there is one, OK otherwise.
Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
  MutexLock l(&mutex_);
  while (cfd->imm()->size() > 0 && bg_error_.ok()) {
    bg_cv_.Wait();
  }
  if (bg_error_.ok()) {
    return Status();
  }
  return bg_error_;
}

1442
void DBImpl::MaybeScheduleFlushOrCompaction() {
J
jorlow@chromium.org 已提交
1443
  mutex_.AssertHeld();
1444
  bg_schedule_needed_ = false;
1445 1446
  if (bg_work_gate_closed_) {
    // gate closed for backgrond work
I
Igor Canadi 已提交
1447
  } else if (shutting_down_.load(std::memory_order_acquire)) {
J
jorlow@chromium.org 已提交
1448 1449
    // DB is being deleted; no more background compactions
  } else {
1450
    bool is_flush_pending = false;
1451
    // no need to refcount since we're under a mutex
1452
    for (auto cfd : *versions_->GetColumnFamilySet()) {
I
Igor Canadi 已提交
1453
      if (cfd->imm()->IsFlushPending()) {
1454 1455 1456
        is_flush_pending = true;
      }
    }
1457
    if (is_flush_pending) {
1458
      // memtable flush needed
1459
      if (bg_flush_scheduled_ < db_options_.max_background_flushes) {
1460 1461
        bg_flush_scheduled_++;
        env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH);
1462
      } else if (db_options_.max_background_flushes > 0) {
1463
        bg_schedule_needed_ = true;
1464
      }
1465
    }
1466
    bool is_compaction_needed = false;
1467
    // no need to refcount since we're under a mutex
1468
    for (auto cfd : *versions_->GetColumnFamilySet()) {
S
sdong 已提交
1469
      if (cfd->current()->storage_info()->NeedsCompaction()) {
1470 1471 1472 1473
        is_compaction_needed = true;
        break;
      }
    }
1474

1475 1476
    // Schedule BGWorkCompaction if there's a compaction pending (or a memtable
    // flush, but the HIGH pool is not enabled)
1477 1478 1479 1480
    // Do it only if max_background_compactions hasn't been reached and
    // bg_manual_only_ == 0
    if (!bg_manual_only_ &&
        (is_compaction_needed ||
1481 1482
         (is_flush_pending && db_options_.max_background_flushes == 0))) {
      if (bg_compaction_scheduled_ < db_options_.max_background_compactions) {
1483 1484 1485 1486 1487
        bg_compaction_scheduled_++;
        env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW);
      } else {
        bg_schedule_needed_ = true;
      }
1488 1489 1490 1491
    }
  }
}

1492
void DBImpl::RecordFlushIOStats() {
I
Igor Canadi 已提交
1493
  RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
1494 1495 1496
  IOSTATS_RESET(bytes_written);
}

1497
void DBImpl::BGWorkFlush(void* db) {
1498
  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
1499 1500 1501 1502
  reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush();
}

void DBImpl::BGWorkCompaction(void* db) {
1503
  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
1504 1505 1506
  reinterpret_cast<DBImpl*>(db)->BackgroundCallCompaction();
}

I
Igor Canadi 已提交
1507
// Flushes every column family that has a flush pending. Returns bg_error_
// immediately if a background error is already recorded. A failure in one
// column family does not stop the others from being flushed; the first
// failure seen is what gets returned.
// REQUIRES: mutex_ is held.
Status DBImpl::BackgroundFlush(bool* madeProgress, JobContext* job_context,
                               LogBuffer* log_buffer) {
  mutex_.AssertHeld();

  if (!bg_error_.ok()) {
    return bg_error_;
  }

  // Stays OK unless at least one column family fails to flush.
  Status first_failure;
  // refcounting in iteration
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    cfd->Ref();
    const MutableCFOptions cf_options = *cfd->GetLatestMutableCFOptions();
    Status cf_status;
    while (cf_status.ok() && cfd->imm()->IsFlushPending()) {
      LogToBuffer(
          log_buffer,
          "BackgroundCallFlush doing FlushMemTableToOutputFile with column "
          "family [%s], flush slots available %d",
          cfd->GetName().c_str(),
          db_options_.max_background_flushes - bg_flush_scheduled_);
      cf_status = FlushMemTableToOutputFile(
          cfd, cf_options, madeProgress, job_context, log_buffer);
    }
    if (!cf_status.ok() && first_failure.ok()) {
      first_failure = cf_status;
    }
    cfd->Unref();
  }
  versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
  return first_failure;
}

1544
void DBImpl::BackgroundCallFlush() {
1545
  bool madeProgress = false;
I
Igor Canadi 已提交
1546
  JobContext job_context(true);
1547 1548
  assert(bg_flush_scheduled_);

1549
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
H
Haobo Xu 已提交
1550 1551 1552 1553
  {
    MutexLock l(&mutex_);

    Status s;
I
Igor Canadi 已提交
1554
    if (!shutting_down_.load(std::memory_order_acquire)) {
I
Igor Canadi 已提交
1555
      s = BackgroundFlush(&madeProgress, &job_context, &log_buffer);
H
Haobo Xu 已提交
1556 1557 1558 1559 1560
      if (!s.ok()) {
        // Wait a little bit before retrying background compaction in
        // case this is an environmental problem and we do not want to
        // chew up resources for failed compactions for the duration of
        // the problem.
1561 1562
        uint64_t error_cnt =
          default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
H
Haobo Xu 已提交
1563 1564
        bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
        mutex_.Unlock();
1565
        Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
1566 1567 1568
            "Waiting after background flush error: %s"
            "Accumulated background error counts: %" PRIu64,
            s.ToString().c_str(), error_cnt);
H
Haobo Xu 已提交
1569
        log_buffer.FlushBufferToLog();
1570
        LogFlush(db_options_.info_log);
H
Haobo Xu 已提交
1571 1572 1573 1574 1575 1576 1577
        env_->SleepForMicroseconds(1000000);
        mutex_.Lock();
      }
    }

    // If !s.ok(), this means that Flush failed. In that case, we want
    // to delete all obsolete files and we force FindObsoleteFiles()
I
Igor Canadi 已提交
1578
    FindObsoleteFiles(&job_context, !s.ok());
H
Haobo Xu 已提交
1579
    // delete unnecessary files if any, this is done outside the mutex
I
Igor Canadi 已提交
1580
    if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
1581
      mutex_.Unlock();
1582 1583 1584 1585 1586
      // Have to flush the info logs before bg_flush_scheduled_--
      // because if bg_flush_scheduled_ becomes 0 and the lock is
      // released, the deconstructor of DB can kick in and destroy all the
      // states of DB so info_log might not be available after that point.
      // It also applies to access other states that DB owns.
H
Haobo Xu 已提交
1587
      log_buffer.FlushBufferToLog();
I
Igor Canadi 已提交
1588 1589
      if (job_context.HaveSomethingToDelete()) {
        PurgeObsoleteFiles(job_context);
1590
      }
1591 1592
      mutex_.Lock();
    }
I
Igor Canadi 已提交
1593

H
Haobo Xu 已提交
1594
    bg_flush_scheduled_--;
1595 1596 1597 1598 1599
    // Any time the mutex is released After finding the work to do, another
    // thread might execute MaybeScheduleFlushOrCompaction(). It is possible
    // that there is a pending job but it is not scheduled because of the
    // max thread limit.
    if (madeProgress || bg_schedule_needed_) {
H
Haobo Xu 已提交
1600 1601
      MaybeScheduleFlushOrCompaction();
    }
I
Igor Canadi 已提交
1602
    RecordFlushIOStats();
H
Haobo Xu 已提交
1603
    bg_cv_.SignalAll();
I
Igor Canadi 已提交
1604 1605 1606 1607
    // IMPORTANT: there should be no code after calling SignalAll. This call may
    // signal the DB destructor that it's OK to proceed with destruction. In
    // that case, all DB variables will be dealloacated and referencing them
    // will cause trouble.
1608
  }
J
jorlow@chromium.org 已提交
1609 1610
}

1611
void DBImpl::BackgroundCallCompaction() {
1612
  bool madeProgress = false;
I
Igor Canadi 已提交
1613
  JobContext job_context(true);
H
Haobo Xu 已提交
1614 1615

  MaybeDumpStats();
1616
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
1617 1618 1619 1620
  {
    MutexLock l(&mutex_);
    assert(bg_compaction_scheduled_);
    Status s;
I
Igor Canadi 已提交
1621
    if (!shutting_down_.load(std::memory_order_acquire)) {
I
Igor Canadi 已提交
1622
      s = BackgroundCompaction(&madeProgress, &job_context, &log_buffer);
1623 1624 1625 1626 1627
      if (!s.ok()) {
        // Wait a little bit before retrying background compaction in
        // case this is an environmental problem and we do not want to
        // chew up resources for failed compactions for the duration of
        // the problem.
1628 1629
        uint64_t error_cnt =
          default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
1630 1631
        bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
        mutex_.Unlock();
H
Haobo Xu 已提交
1632
        log_buffer.FlushBufferToLog();
1633
        Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
1634 1635 1636
            "Waiting after background compaction error: %s, "
            "Accumulated background error counts: %" PRIu64,
            s.ToString().c_str(), error_cnt);
1637
        LogFlush(db_options_.info_log);
1638 1639 1640 1641
        env_->SleepForMicroseconds(1000000);
        mutex_.Lock();
      }
    }
H
Haobo Xu 已提交
1642

1643 1644
    // If !s.ok(), this means that Compaction failed. In that case, we want
    // to delete all obsolete files we might have created and we force
I
Igor Canadi 已提交
1645 1646 1647
    // FindObsoleteFiles(). This is because job_context does not
    // catch all created files if compaction failed.
    FindObsoleteFiles(&job_context, !s.ok());
1648 1649

    // delete unnecessary files if any, this is done outside the mutex
I
Igor Canadi 已提交
1650
    if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
1651
      mutex_.Unlock();
1652 1653 1654 1655 1656
      // Have to flush the info logs before bg_compaction_scheduled_--
      // because if bg_flush_scheduled_ becomes 0 and the lock is
      // released, the deconstructor of DB can kick in and destroy all the
      // states of DB so info_log might not be available after that point.
      // It also applies to access other states that DB owns.
H
Haobo Xu 已提交
1657
      log_buffer.FlushBufferToLog();
I
Igor Canadi 已提交
1658 1659
      if (job_context.HaveSomethingToDelete()) {
        PurgeObsoleteFiles(job_context);
1660
      }
1661 1662
      mutex_.Lock();
    }
D
Dhruba Borthakur 已提交
1663

1664
    bg_compaction_scheduled_--;
J
jorlow@chromium.org 已提交
1665

1666 1667
    versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();

1668 1669 1670
    // Previous compaction may have produced too many files in a level,
    // So reschedule another compaction if we made progress in the
    // last compaction.
1671 1672 1673 1674 1675 1676
    //
    // Also, any time the mutex is released After finding the work to do,
    // another thread might execute MaybeScheduleFlushOrCompaction(). It is
    // possible  that there is a pending job but it is not scheduled because of
    // the max thread limit.
    if (madeProgress || bg_schedule_needed_) {
1677 1678
      MaybeScheduleFlushOrCompaction();
    }
1679 1680
    if (madeProgress || bg_compaction_scheduled_ == 0 || bg_manual_only_ > 0) {
      // signal if
I
Igor Canadi 已提交
1681
      // * madeProgress -- need to wakeup DelayWrite
1682 1683 1684 1685 1686 1687
      // * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
      // * bg_manual_only_ > 0 -- need to wakeup RunManualCompaction
      // If none of this is true, there is no need to signal since nobody is
      // waiting for it
      bg_cv_.SignalAll();
    }
I
Igor Canadi 已提交
1688 1689 1690 1691
    // IMPORTANT: there should be no code after calling SignalAll. This call may
    // signal the DB destructor that it's OK to proceed with destruction. In
    // that case, all DB variables will be dealloacated and referencing them
    // will cause trouble.
1692
  }
J
jorlow@chromium.org 已提交
1693 1694
}

I
Igor Canadi 已提交
1695
// Picks and executes at most one compaction (manual or automatic).
//
// REQUIRES: mutex_ held. The mutex is released while a regular compaction job
// runs (and by helpers such as FlushMemTableToOutputFile / LogAndApply).
//
// Steps:
//   1. Bail out if a background error is already recorded.
//   2. Claim the pending manual compaction, if any; automatic compactions do
//      not run while a manual one exists.
//   3. Flush all pending immutable memtables first (flush preempts
//      compaction).
//   4. Pick a compaction and execute it as a deletion compaction, a trivial
//      move, or a full CompactionJob.
//   5. Record the error (if any) and update the manual compaction state.
//
// *madeProgress is set to true when a compaction was executed (the flush
// helper also receives the pointer). Returns the status of the flush or
// compaction that was attempted, or bg_error_ if one was already set.
Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
                                    LogBuffer* log_buffer) {
  *madeProgress = false;
  mutex_.AssertHeld();

  // A manual compaction is "ours" only if nobody has picked it up yet.
  const bool is_manual = (manual_compaction_ != nullptr) &&
                         !manual_compaction_->in_progress;

  // Finishes the pending manual compaction with status `st` and detaches it
  // from the DB. Only called when is_manual is true.
  auto abandon_manual = [this](const Status& st) {
    manual_compaction_->status = st;
    manual_compaction_->done = true;
    manual_compaction_->in_progress = false;
    manual_compaction_ = nullptr;
  };

  if (!bg_error_.ok()) {
    if (is_manual) {
      abandon_manual(bg_error_);
    }
    return bg_error_;
  }

  if (is_manual) {
    // another thread cannot pick up the same work
    manual_compaction_->in_progress = true;
  } else if (manual_compaction_ != nullptr) {
    // there should be no automatic compactions running when manual compaction
    // is running
    return Status::OK();
  }

  // FLUSH preempts compaction
  Status flush_stat;
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    const MutableCFOptions mutable_cf_options =
        *cfd->GetLatestMutableCFOptions();
    while (cfd->imm()->IsFlushPending()) {
      LogToBuffer(
          log_buffer,
          "BackgroundCompaction doing FlushMemTableToOutputFile, "
          "compaction slots available %d",
          db_options_.max_background_compactions - bg_compaction_scheduled_);
      // Keep the column family referenced for the duration of the flush.
      cfd->Ref();
      flush_stat = FlushMemTableToOutputFile(cfd, mutable_cf_options,
                                             madeProgress, job_context,
                                             log_buffer);
      cfd->Unref();
      if (!flush_stat.ok()) {
        if (is_manual) {
          abandon_manual(flush_stat);
        }
        return flush_stat;
      }
    }
  }

  // Compaction makes a copy of the latest MutableCFOptions. It should be used
  // throughout the compaction procedure to ensure consistency. It will
  // eventually be installed into SuperVersion
  unique_ptr<Compaction> c;
  InternalKey manual_end_storage;
  // CompactRange() receives the address of this pointer and may reset it;
  // nullptr afterwards means the whole requested range was compacted.
  InternalKey* manual_end = &manual_end_storage;
  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    assert(m->in_progress);
    c.reset(m->cfd->CompactRange(
        *m->cfd->GetLatestMutableCFOptions(), m->input_level, m->output_level,
        m->output_path_id, m->begin, m->end, &manual_end));
    if (!c) {
      m->done = true;
    }
    LogToBuffer(log_buffer,
                "[%s] Manual compaction from level-%d to level-%d from %s .. "
                "%s; will stop at %s\n",
                m->cfd->GetName().c_str(), m->input_level, m->output_level,
                (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
                (m->end ? m->end->DebugString().c_str() : "(end)"),
                ((m->done || manual_end == nullptr)
                     ? "(end)"
                     : manual_end->DebugString().c_str()));
  } else {
    // no need to refcount in iteration since it's always under a mutex
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      // Pick up latest mutable CF Options and use it throughout the
      // compaction job
      auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
      if (mutable_cf_options->disable_auto_compactions) {
        continue;
      }
      // NOTE: try to avoid unnecessary copy of MutableCFOptions if
      // compaction is not necessary. Need to make sure mutex is held
      // until we make a copy in the following code
      c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
      if (c != nullptr) {
        // update statistics
        MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
                    c->inputs(0)->size());
        break;
      }
    }
  }

  Status status;
  if (!c) {
    // Nothing to do
    LogToBuffer(log_buffer, "Compaction nothing to do");
  } else if (c->IsDeletionCompaction()) {
    // Deletion compaction: the input files are simply dropped from the
    // version.
    // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old
    // file if there is alive snapshot pointing to it
    assert(c->num_input_files(1) == 0);
    assert(c->level() == 0);
    assert(c->column_family_data()->ioptions()->compaction_style ==
           kCompactionStyleFIFO);
    for (const auto& f : *c->inputs(0)) {
      c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
    }
    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, db_directory_.get());
    InstallSuperVersionBackground(c->column_family_data(), job_context,
                                  *c->mutable_cf_options());
    LogToBuffer(log_buffer, "[%s] Deleted %d files\n",
                c->column_family_data()->GetName().c_str(),
                c->num_input_files(0));
    c->ReleaseCompactionFiles(status);
    *madeProgress = true;
  } else if (!is_manual && c->IsTrivialMove()) {
    // Trivial move: re-register the single input file one level down.
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
    c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetPathId(),
                       f->fd.GetFileSize(), f->smallest, f->largest,
                       f->smallest_seqno, f->largest_seqno);
    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, db_directory_.get());
    // Use latest MutableCFOptions
    InstallSuperVersionBackground(c->column_family_data(), job_context,
                                  *c->mutable_cf_options());

    VersionStorageInfo::LevelSummaryStorage tmp;
    LogToBuffer(log_buffer, "[%s] Moved #%" PRIu64 " to level-%d %" PRIu64
                            " bytes %s: %s\n",
                c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
                c->level() + 1, f->fd.GetFileSize(), status.ToString().c_str(),
                c->input_version()->storage_info()->LevelSummary(&tmp));
    c->ReleaseCompactionFiles(status);
    *madeProgress = true;
  } else {
    // Regular compaction.
    MaybeScheduleFlushOrCompaction();  // do more compaction work in parallel.

    // Handed to the compaction job, which may invoke it to give pending
    // flushes a chance to run.
    auto yield_callback = [&]() {
      return CallFlushDuringCompaction(c->column_family_data(),
                                       *c->mutable_cf_options(), job_context,
                                       log_buffer);
    };
    CompactionJob compaction_job(
        c.get(), db_options_, *c->mutable_cf_options(), env_options_,
        versions_.get(), &mutex_, &shutting_down_, &pending_outputs_,
        log_buffer, db_directory_.get(), stats_, &snapshots_,
        IsSnapshotSupported(), table_cache_, std::move(yield_callback));
    compaction_job.Prepare();
    // Run() executes with the DB mutex released.
    mutex_.Unlock();
    status = compaction_job.Run();
    mutex_.Lock();
    status = compaction_job.Install(status);
    if (status.ok()) {
      InstallSuperVersionBackground(c->column_family_data(), job_context,
                                    *c->mutable_cf_options());
    }
    c->ReleaseCompactionFiles(status);
    c->ReleaseInputs();
    *madeProgress = true;
  }
  c.reset();

  // Errors raised because of shutdown are ignored; any other error is logged
  // and, under paranoid_checks, becomes the sticky background error.
  if (!status.ok() && !status.IsShutdownInProgress()) {
    Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, "Compaction error: %s",
        status.ToString().c_str());
    if (db_options_.paranoid_checks && bg_error_.ok()) {
      bg_error_ = status;
    }
  }

  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    if (!status.ok()) {
      m->status = status;
      m->done = true;
    }
    // For universal compaction:
    //   Because universal compaction always happens at level 0, so one
    //   compaction will pick up all overlapped files. No files will be
    //   filtered out due to size limit and left for a successive compaction.
    //   So we can safely conclude the current compaction.
    //
    //   Also note that, if we don't stop here, then the current compaction
    //   writes a new file back to level 0, which will be used in successive
    //   compaction. Hence the manual compaction will never finish.
    //
    // Stop the compaction if manual_end points to nullptr -- this means
    // that we compacted the whole range. manual_end should always point
    // to nullptr in case of universal compaction
    if (manual_end == nullptr) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range.  Update *m
      // to the range that is left to be compacted.
      // Universal and FIFO compactions should always compact the whole range
      assert(m->cfd->ioptions()->compaction_style != kCompactionStyleUniversal);
      assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
      m->tmp_storage = *manual_end;
      m->begin = &m->tmp_storage;
    }
    m->in_progress = false;  // not being processed anymore
    manual_compaction_ = nullptr;
  }
  return status;
}

I
Igor Canadi 已提交
1918 1919 1920
// Gives a pending memtable flush of `cfd` a chance to run from within a
// compaction. Acquires mutex_ itself, so the caller must not be holding it.
//
// Does nothing when max_background_flushes > 0 (flushes are handled
// elsewhere in that configuration) or when the column family's
// imm_flush_needed flag is not set.
//
// Returns the number of microseconds spent here, or 0 when nothing was done.
uint64_t DBImpl::CallFlushDuringCompaction(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    JobContext* job_context, LogBuffer* log_buffer) {
  if (db_options_.max_background_flushes > 0) {
    // flush thread will take care of this
    return 0;
  }
  // Cheap check without the mutex; re-validated below once it is held.
  if (!cfd->imm()->imm_flush_needed.load(std::memory_order_relaxed)) {
    return 0;
  }

  const uint64_t start_micros = env_->NowMicros();
  mutex_.Lock();
  if (cfd->imm()->IsFlushPending()) {
    cfd->Ref();
    FlushMemTableToOutputFile(cfd, mutable_cf_options, nullptr, job_context,
                              log_buffer);
    cfd->Unref();
    bg_cv_.SignalAll();  // Wakeup DelayWrite() if necessary
  }
  mutex_.Unlock();
  log_buffer->FlushBufferToLog();
  return env_->NowMicros() - start_micros;
}

1942 1943
namespace {
struct IterState {
I
Igor Canadi 已提交
1944 1945
  IterState(DBImpl* _db, port::Mutex* _mu, SuperVersion* _super_version)
      : db(_db), mu(_mu), super_version(_super_version) {}
1946 1947

  DBImpl* db;
1948
  port::Mutex* mu;
1949
  SuperVersion* super_version;
1950 1951 1952 1953
};

static void CleanupIteratorState(void* arg1, void* arg2) {
  IterState* state = reinterpret_cast<IterState*>(arg1);
1954

1955
  if (state->super_version->Unref()) {
I
Igor Canadi 已提交
1956
    JobContext job_context;
1957

1958 1959
    state->mu->Lock();
    state->super_version->Cleanup();
I
Igor Canadi 已提交
1960
    state->db->FindObsoleteFiles(&job_context, false, true);
1961 1962 1963
    state->mu->Unlock();

    delete state->super_version;
I
Igor Canadi 已提交
1964 1965
    if (job_context.HaveSomethingToDelete()) {
      state->db->PurgeObsoleteFiles(job_context);
1966
    }
I
Igor Canadi 已提交
1967
  }
T
Tomislav Novak 已提交
1968

1969 1970
  delete state;
}
H
Hans Wennborg 已提交
1971
}  // namespace
1972

L
Lei Jin 已提交
1973
// Builds a merging iterator over all data sources of `super_version`: the
// mutable memtable, the immutable memtables, and the files of the current
// version. The iterator is created via `arena`, which must not be null.
//
// A cleanup callback is registered on the returned iterator that releases
// the reference on `super_version` when the iterator is destroyed.
Iterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
                                      ColumnFamilyData* cfd,
                                      SuperVersion* super_version,
                                      Arena* arena) {
  assert(arena != nullptr);
  // Need to create internal iterator from the arena.
  MergeIteratorBuilder builder(&cfd->internal_comparator(), arena);

  // Child iterator for the mutable memtable.
  builder.AddIterator(super_version->mem->NewIterator(read_options, arena));
  // Child iterators for the immutable memtables.
  super_version->imm->AddIterators(read_options, &builder);
  // Child iterators for files in L0 - Ln.
  super_version->current->AddIterators(read_options, env_options_, &builder);

  Iterator* const merged_iter = builder.Finish();

  // The state object is deleted by CleanupIteratorState().
  IterState* const cleanup_state = new IterState(this, &mutex_, super_version);
  merged_iter->RegisterCleanup(CleanupIteratorState, cleanup_state, nullptr);

  return merged_iter;
}

1996 1997 1998 1999
// Returns the stored handle for the default column family
// (default_cf_handle_). No ownership is transferred by this accessor.
ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
  return default_cf_handle_;
}

L
Lei Jin 已提交
2000
// Point lookup of `key` in `column_family`. Thin wrapper that forwards to
// GetImpl() without requesting the value_found output.
Status DBImpl::Get(const ReadOptions& read_options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   std::string* value) {
  Status result = GetImpl(read_options, column_family, key, value);
  return result;
}

I
Igor Canadi 已提交
2006 2007
// JobContext gets created and destructed outside of the lock --
// we
I
Igor Canadi 已提交
2008 2009
// use this convinently to:
// * malloc one SuperVersion() outside of the lock -- new_superversion
2010
// * delete SuperVersion()s outside of the lock -- superversions_to_free
I
Igor Canadi 已提交
2011
//
I
Igor Canadi 已提交
2012 2013 2014 2015
// However, if InstallSuperVersion() gets called twice with the same
// job_context, we can't reuse the SuperVersion() that got
// malloced
// because
I
Igor Canadi 已提交
2016 2017 2018
// first call already used it. In that rare case, we take a hit and create a
// new SuperVersion() inside of the mutex. We do similar thing
// for superversion_to_free
I
Igor Canadi 已提交
2019 2020
void DBImpl::InstallSuperVersionBackground(
    ColumnFamilyData* cfd, JobContext* job_context,
2021
    const MutableCFOptions& mutable_cf_options) {
2022
  mutex_.AssertHeld();
I
Igor Canadi 已提交
2023 2024 2025 2026
  SuperVersion* old_superversion = InstallSuperVersion(
      cfd, job_context->new_superversion, mutable_cf_options);
  job_context->new_superversion = nullptr;
  job_context->superversions_to_free.push_back(old_superversion);
I
Igor Canadi 已提交
2027 2028
}

L
Lei Jin 已提交
2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054
// Installs a new SuperVersion for `cfd` and updates the DB-wide memtable
// budget (max_total_in_memory_state_) to reflect `mutable_cf_options`.
//
// REQUIRES: mutex_ held.
//
// new_sv: SuperVersion object to install; if nullptr, one is allocated here
//         (inside the mutex).
// Returns whatever ColumnFamilyData::InstallSuperVersion() returns as the
// previous SuperVersion (may be null, see the check below).
SuperVersion* DBImpl::InstallSuperVersion(
    ColumnFamilyData* cfd, SuperVersion* new_sv,
    const MutableCFOptions& mutable_cf_options) {
  mutex_.AssertHeld();
  auto* old = cfd->InstallSuperVersion(
      new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options);

  // We want to schedule potential flush or compactions since new options may
  // have been picked up in this new version. New options may cause flush
  // compaction trigger condition to change.
  MaybeScheduleFlushOrCompaction();

  // Update max_total_in_memory_state_.
  // NOTE: this must not be `auto ... = 0`, which deduces `int` and narrows
  // the product below, corrupting the accounting for large write buffers
  // (e.g. 512MB * 5 does not fit in an int).
  size_t old_memtable_size = 0;
  if (old) {
    old_memtable_size = old->mutable_cf_options.write_buffer_size *
                        old->mutable_cf_options.max_write_buffer_number;
  }
  max_total_in_memory_state_ =
      max_total_in_memory_state_ - old_memtable_size +
      mutable_cf_options.write_buffer_size *
      mutable_cf_options.max_write_buffer_number;
  return old;
}

// Shared implementation of point lookups.
//
// Looks `key` up in `column_family` as of read_options.snapshot (or the
// latest sequence number when no snapshot is given), consulting in order the
// mutable memtable, the immutable memtables, and finally the current
// version's files. `value_found` is passed through to the file lookup only.
//
// Returns the status produced by whichever source answered the lookup.
Status DBImpl::GetImpl(const ReadOptions& read_options,
                       ColumnFamilyHandle* column_family, const Slice& key,
                       std::string* value, bool* value_found) {
  StopWatch sw(env_, stats_, DB_GET);
  PERF_TIMER_GUARD(get_snapshot_time);

  auto* handle_impl = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto* cfd = handle_impl->cfd();

  // Sequence number that bounds what this read is allowed to see.
  const SequenceNumber snapshot =
      (read_options.snapshot != nullptr)
          ? reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
                ->number_
          : versions_->LastSequence();

  // Acquire SuperVersion
  SuperVersion* sv = GetAndRefSuperVersion(cfd);

  // Prepare to store a list of merge operations if merge occurs.
  MergeContext merge_context;

  // s is both in/out for the lookups below. When in, s could either be OK or
  // MergeInProgress; merge_context will contain the sequence of merges in the
  // latter case.
  Status s;
  LookupKey lkey(key, snapshot);
  PERF_TIMER_STOP(get_snapshot_time);

  const bool found_in_memtables =
      sv->mem->Get(lkey, value, &s, &merge_context) ||
      sv->imm->Get(lkey, value, &s, &merge_context);
  if (found_in_memtables) {
    // Done
    RecordTick(stats_, MEMTABLE_HIT);
  } else {
    PERF_TIMER_GUARD(get_from_output_files_time);
    sv->current->Get(read_options, lkey, value, &s, &merge_context,
                     value_found);
    RecordTick(stats_, MEMTABLE_MISS);
  }

  {
    PERF_TIMER_GUARD(get_post_process_time);

    ReturnAndCleanupSuperVersion(cfd, sv);

    RecordTick(stats_, NUMBER_KEYS_READ);
    RecordTick(stats_, BYTES_READ, value->size());
  }
  return s;
}

2108
std::vector<Status> DBImpl::MultiGet(
L
Lei Jin 已提交
2109
    const ReadOptions& read_options,
2110
    const std::vector<ColumnFamilyHandle*>& column_family,
2111
    const std::vector<Slice>& keys, std::vector<std::string>* values) {
2112

L
Lei Jin 已提交
2113
  StopWatch sw(env_, stats_, DB_MULTIGET);
2114
  PERF_TIMER_GUARD(get_snapshot_time);
K
Kai Liu 已提交
2115

2116
  SequenceNumber snapshot;
2117

2118
  struct MultiGetColumnFamilyData {
I
Igor Canadi 已提交
2119
    ColumnFamilyData* cfd;
2120 2121 2122 2123 2124
    SuperVersion* super_version;
  };
  std::unordered_map<uint32_t, MultiGetColumnFamilyData*> multiget_cf_data;
  // fill up and allocate outside of mutex
  for (auto cf : column_family) {
2125 2126 2127 2128 2129 2130
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(cf);
    auto cfd = cfh->cfd();
    if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) {
      auto mgcfd = new MultiGetColumnFamilyData();
      mgcfd->cfd = cfd;
      multiget_cf_data.insert({cfd->GetID(), mgcfd});
2131 2132 2133
    }
  }

2134
  mutex_.Lock();
L
Lei Jin 已提交
2135 2136 2137
  if (read_options.snapshot != nullptr) {
    snapshot = reinterpret_cast<const SnapshotImpl*>(
        read_options.snapshot)->number_;
2138 2139 2140
  } else {
    snapshot = versions_->LastSequence();
  }
2141
  for (auto mgd_iter : multiget_cf_data) {
2142 2143
    mgd_iter.second->super_version =
        mgd_iter.second->cfd->GetSuperVersion()->Ref();
2144
  }
2145
  mutex_.Unlock();
2146

2147 2148
  // Contain a list of merge operations if merge occurs.
  MergeContext merge_context;
2149

2150
  // Note: this always resizes the values array
2151 2152 2153
  size_t num_keys = keys.size();
  std::vector<Status> stat_list(num_keys);
  values->resize(num_keys);
2154 2155

  // Keep track of bytes that we read for statistics-recording later
2156
  uint64_t bytes_read = 0;
L
Lei Jin 已提交
2157
  PERF_TIMER_STOP(get_snapshot_time);
2158 2159 2160 2161

  // For each of the given keys, apply the entire "get" process as follows:
  // First look in the memtable, then in the immutable memtable (if any).
  // s is both in/out. When in, s could either be OK or MergeInProgress.
2162
  // merge_operands will contain the sequence of merges in the latter case.
2163
  for (size_t i = 0; i < num_keys; ++i) {
2164
    merge_context.Clear();
2165
    Status& s = stat_list[i];
2166 2167 2168
    std::string* value = &(*values)[i];

    LookupKey lkey(keys[i], snapshot);
2169 2170
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
    auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
2171 2172 2173
    assert(mgd_iter != multiget_cf_data.end());
    auto mgd = mgd_iter->second;
    auto super_version = mgd->super_version;
2174
    if (super_version->mem->Get(lkey, value, &s, &merge_context)) {
2175
      // Done
2176
    } else if (super_version->imm->Get(lkey, value, &s, &merge_context)) {
2177 2178
      // Done
    } else {
2179
      PERF_TIMER_GUARD(get_from_output_files_time);
L
Lei Jin 已提交
2180 2181
      super_version->current->Get(read_options, lkey, value, &s,
                                  &merge_context);
2182 2183 2184
    }

    if (s.ok()) {
2185
      bytes_read += value->size();
2186 2187 2188 2189
    }
  }

  // Post processing (decrement reference counts and record statistics)
2190
  PERF_TIMER_GUARD(get_post_process_time);
2191 2192
  autovector<SuperVersion*> superversions_to_delete;

I
Igor Canadi 已提交
2193
  // TODO(icanadi) do we need lock here or just around Cleanup()?
2194 2195 2196 2197 2198 2199
  mutex_.Lock();
  for (auto mgd_iter : multiget_cf_data) {
    auto mgd = mgd_iter.second;
    if (mgd->super_version->Unref()) {
      mgd->super_version->Cleanup();
      superversions_to_delete.push_back(mgd->super_version);
2200 2201
    }
  }
2202 2203 2204 2205 2206 2207 2208
  mutex_.Unlock();

  for (auto td : superversions_to_delete) {
    delete td;
  }
  for (auto mgd : multiget_cf_data) {
    delete mgd.second;
2209
  }
2210

L
Lei Jin 已提交
2211 2212 2213
  RecordTick(stats_, NUMBER_MULTIGET_CALLS);
  RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
  RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
L
Lei Jin 已提交
2214
  PERF_TIMER_STOP(get_post_process_time);
2215

2216
  return stat_list;
2217 2218
}

L
Lei Jin 已提交
2219
// Creates a new column family named `column_family_name` with options
// `cf_options`.
//
// On success *handle points to a newly allocated ColumnFamilyHandleImpl; on
// failure *handle is nullptr. Returns InvalidArgument if a column family with
// that name already exists, otherwise the status of the MANIFEST update.
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
                                  const std::string& column_family_name,
                                  ColumnFamilyHandle** handle) {
  *handle = nullptr;
  MutexLock l(&mutex_);

  auto* cf_set = versions_->GetColumnFamilySet();
  if (cf_set->GetColumnFamily(column_family_name) != nullptr) {
    return Status::InvalidArgument("Column family already exists");
  }

  // Describe the new column family in a version edit.
  VersionEdit edit;
  edit.AddColumnFamily(column_family_name);
  const uint32_t new_id = cf_set->GetNextColumnFamilyID();
  edit.SetColumnFamily(new_id);
  edit.SetLogNumber(logfile_number_);
  edit.SetComparatorName(cf_options.comparator->Name());

  // LogAndApply will both write the creation in MANIFEST and create
  // ColumnFamilyData object
  Options opt(db_options_, cf_options);
  Status s = versions_->LogAndApply(
      nullptr, MutableCFOptions(opt, ImmutableCFOptions(opt)), &edit, &mutex_,
      db_directory_.get(), false, &cf_options);

  if (!s.ok()) {
    Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
        "Creating column family [%s] FAILED -- %s",
        column_family_name.c_str(), s.ToString().c_str());
    return s;
  }

  single_column_family_mode_ = false;
  auto* cfd =
      versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
  assert(cfd != nullptr);
  // Install the first SuperVersion for the new column family; whatever
  // InstallSuperVersion() returns as the previous one is deleted right away.
  delete InstallSuperVersion(cfd, nullptr, *cfd->GetLatestMutableCFOptions());
  *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
  Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
      "Created column family [%s] (ID %u)",
      column_family_name.c_str(), (unsigned)cfd->GetID());
  return s;
}

2260 2261 2262 2263
// Drops the column family referred to by `column_family`.
//
// Returns InvalidArgument when asked to drop the default column family
// (ID 0) or one that is already dropped; otherwise the status of recording
// the drop via LogAndApply().
Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();
  if (cfd->GetID() == 0) {
    return Status::InvalidArgument("Can't drop default column family");
  }

  VersionEdit edit;
  edit.DropColumnFamily();
  edit.SetColumnFamily(cfd->GetID());

  Status s;
  {
    MutexLock l(&mutex_);
    if (cfd->IsDropped()) {
      s = Status::InvalidArgument("Column family already dropped!\n");
    }
    if (s.ok()) {
      // we drop column family from a single write thread
      WriteThread::Writer w(&mutex_);
      s = write_thread_.EnterWriteThread(&w, 0);
      assert(s.ok() && !w.done);  // No timeout and nobody should do our job
      s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                 &edit, &mutex_);
      write_thread_.ExitWriteThread(&w, &w, s);
    }

    if (s.ok()) {
      assert(cfd->IsDropped());
      // Give back this column family's share of the memtable budget.
      // max_total_in_memory_state_ is also updated by InstallSuperVersion()
      // with mutex_ held, so this read-modify-write must happen under the
      // mutex too.
      auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
      max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size *
                                    mutable_cf_options->max_write_buffer_number;
    }
  }

  // Logging is done after the mutex has been released.
  if (s.ok()) {
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "Dropped column family with id %u\n",
        cfd->GetID());
  } else {
    Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
        "Dropping column family with id %u FAILED -- %s\n",
        cfd->GetID(), s.ToString().c_str());
  }

  return s;
}

L
Lei Jin 已提交
2305
bool DBImpl::KeyMayExist(const ReadOptions& read_options,
2306 2307
                         ColumnFamilyHandle* column_family, const Slice& key,
                         std::string* value, bool* value_found) {
2308
  if (value_found != nullptr) {
K
Kai Liu 已提交
2309 2310
    // falsify later if key-may-exist but can't fetch value
    *value_found = true;
2311
  }
L
Lei Jin 已提交
2312
  ReadOptions roptions = read_options;
2313
  roptions.read_tier = kBlockCacheTier; // read from block cache only
2314
  auto s = GetImpl(roptions, column_family, key, value, value_found);
K
Kai Liu 已提交
2315

2316
  // If block_cache is enabled and the index block of the table didn't
K
Kai Liu 已提交
2317 2318 2319
  // not present in block_cache, the return value will be Status::Incomplete.
  // In this case, key may still exist in the table.
  return s.ok() || s.IsIncomplete();
2320 2321
}

2322
// Creates an iterator over the column family referred to by `column_family`.
//
// With read_options.tailing set, a ForwardIterator wrapped in a DB iterator
// is returned (ROCKSDB_LITE builds return nullptr instead). Otherwise an
// ArenaWrappedDBIter is built that reads at read_options.snapshot if one is
// given, or at the last sequence number of the version set.
Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
                              ColumnFamilyHandle* column_family) {
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();

  if (read_options.tailing) {
#ifdef ROCKSDB_LITE
    // not supported in lite version
    return nullptr;
#else
    SuperVersion* tailing_sv = cfd->GetReferencedSuperVersion(&mutex_);
    auto forward_iter =
        new ForwardIterator(this, read_options, cfd, tailing_sv);
    return NewDBIterator(
        env_, *cfd->ioptions(), cfd->user_comparator(), forward_iter,
        kMaxSequenceNumber,
        tailing_sv->mutable_cf_options.max_sequential_skip_in_iterations,
        read_options.iterate_upper_bound);
#endif
  }

  // Non-tailing iterator. The last sequence number is read before the
  // SuperVersion is referenced, exactly as in the original code.
  SequenceNumber latest_snapshot = versions_->LastSequence();
  SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);

  // An explicit snapshot in the read options wins over the latest sequence.
  SequenceNumber snapshot = latest_snapshot;
  if (read_options.snapshot != nullptr) {
    snapshot =
        reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
  }

  // Try to generate a DB iterator tree in continuous memory area to be
  // cache friendly. Here is an example of result:
  // +-------------------------------+
  // |                               |
  // | ArenaWrappedDBIter            |
  // |  +                            |
  // |  +---> Inner Iterator   ------------+
  // |  |                            |     |
  // |  |    +-- -- -- -- -- -- -- --+     |
  // |  +--- | Arena                 |     |
  // |       |                       |     |
  // |          Allocated Memory:    |     |
  // |       |   +-------------------+     |
  // |       |   | DBIter            | <---+
  // |           |  +                |
  // |       |   |  +-> iter_  ------------+
  // |       |   |                   |     |
  // |       |   +-------------------+     |
  // |       |   | MergingIterator   | <---+
  // |           |  +                |
  // |       |   |  +->child iter1  ------------+
  // |       |   |  |                |          |
  // |           |  +->child iter2  ----------+ |
  // |       |   |  |                |        | |
  // |       |   |  +->child iter3  --------+ | |
  // |           |                   |      | | |
  // |       |   +-------------------+      | | |
  // |       |   | Iterator1         | <--------+
  // |       |   +-------------------+      | |
  // |       |   | Iterator2         | <------+
  // |       |   +-------------------+      |
  // |       |   | Iterator3         | <----+
  // |       |   +-------------------+
  // |       |                       |
  // +-------+-----------------------+
  //
  // ArenaWrappedDBIter inlines an arena area where all the iterators in the
  // iterator tree are allocated in the order of being accessed when
  // querying.
  // Laying out the iterators in the order of being accessed makes it more
  // likely that any iterator pointer is close to the iterator it points to so
  // that they are likely to be in the same cache line and/or page.
  ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
      env_, *cfd->ioptions(), cfd->user_comparator(), snapshot,
      sv->mutable_cf_options.max_sequential_skip_in_iterations,
      read_options.iterate_upper_bound);

  // The internal iterator is allocated from the wrapper's own arena.
  Iterator* internal_iter =
      NewInternalIterator(read_options, cfd, sv, db_iter->GetArena());
  db_iter->SetIterUnderDBIter(internal_iter);

  return db_iter;
}

2406
// Creates one iterator per entry of `column_families` and stores them, in
// the same order, in `*iterators` (which is cleared first).
//
// With read_options.tailing set, ForwardIterator-based iterators are created
// (ROCKSDB_LITE builds return InvalidArgument instead). Otherwise every
// iterator reads at read_options.snapshot if one is given, or at the last
// sequence number of the version set, read once before the loop.
//
// read_options.iterate_upper_bound is forwarded to every iterator, matching
// what NewIterator() does for a single column family.
Status DBImpl::NewIterators(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  iterators->clear();
  iterators->reserve(column_families.size());

  if (read_options.tailing) {
#ifdef ROCKSDB_LITE
    return Status::InvalidArgument(
        "Tailing interator not supported in RocksDB lite");
#else
    for (auto cfh : column_families) {
      auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
      SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
      auto iter = new ForwardIterator(this, read_options, cfd, sv);
      iterators->push_back(
          NewDBIterator(env_, *cfd->ioptions(), cfd->user_comparator(), iter,
              kMaxSequenceNumber,
              sv->mutable_cf_options.max_sequential_skip_in_iterations,
              read_options.iterate_upper_bound));
    }
#endif
  } else {
    SequenceNumber latest_snapshot = versions_->LastSequence();

    for (auto cfh : column_families) {
      auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
      SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);

      // An explicit snapshot in the read options wins over the latest
      // sequence number.
      auto snapshot =
          read_options.snapshot != nullptr
              ? reinterpret_cast<const SnapshotImpl*>(
                  read_options.snapshot)->number_
              : latest_snapshot;

      ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
          env_, *cfd->ioptions(), cfd->user_comparator(), snapshot,
          sv->mutable_cf_options.max_sequential_skip_in_iterations,
          read_options.iterate_upper_bound);
      // The internal iterator is allocated from the wrapper's own arena.
      Iterator* internal_iter = NewInternalIterator(
          read_options, cfd, sv, db_iter->GetArena());
      db_iter->SetIterUnderDBIter(internal_iter);
      iterators->push_back(db_iter);
    }
  }

  return Status::OK();
}

2455 2456 2457 2458 2459 2460 2461 2462 2463
// Returns true only if the current memtable of every column family in the
// version set reports snapshot support.
bool DBImpl::IsSnapshotSupported() const {
  bool all_supported = true;
  for (auto column_family_data : *versions_->GetColumnFamilySet()) {
    if (!column_family_data->mem()->IsSnapshotSupported()) {
      all_supported = false;
      break;  // one unsupported memtable is enough to answer
    }
  }
  return all_supported;
}

J
jorlow@chromium.org 已提交
2464
// Registers and returns a new snapshot at the last sequence number of the
// version set, or returns nullptr if the underlying memtable does not
// support snapshots. Runs with mutex_ held.
const Snapshot* DBImpl::GetSnapshot() {
  MutexLock l(&mutex_);
  if (IsSnapshotSupported()) {
    return snapshots_.New(versions_->LastSequence());
  }
  return nullptr;
}

void DBImpl::ReleaseSnapshot(const Snapshot* s) {
  MutexLock l(&mutex_);
2473
  snapshots_.Delete(reinterpret_cast<const SnapshotImpl*>(s));
J
jorlow@chromium.org 已提交
2474 2475 2476
}

// Convenience methods
2477 2478
// Forwards directly to the base-class DB::Put implementation.
Status DBImpl::Put(const WriteOptions& o,
                   ColumnFamilyHandle* column_family,
                   const Slice& key,
                   const Slice& val) {
  return DB::Put(o, column_family, key, val);
}

2482 2483 2484
// Forwards to DB::Merge, but only if the target column family was opened
// with a merge operator; otherwise returns NotSupported.
Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
                     const Slice& key, const Slice& val) {
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  if (!cfd->ioptions()->merge_operator) {
    return Status::NotSupported("Provide a merge_operator when opening DB");
  }
  return DB::Merge(o, column_family, key, val);
}

L
Lei Jin 已提交
2492
// Forwards directly to the base-class DB::Delete implementation.
Status DBImpl::Delete(const WriteOptions& write_options,
                      ColumnFamilyHandle* column_family,
                      const Slice& key) {
  return DB::Delete(write_options, column_family, key);
}

L
Lei Jin 已提交
2497
// Applies `my_batch` to the database.
//
// The caller joins the write thread queue. It either finds that another
// writer already performed its write (w.done), or becomes responsible for
// writing a group of batches: appending them to the WAL (unless disableWAL
// is set), optionally syncing the WAL, and inserting them into the
// memtables.
//
// Returns:
//   - Corruption if `my_batch` is nullptr.
//   - TimedOut if write_options.timeout_hint_us is non-zero and the deadline
//     passes while waiting to enter the write thread or while stalled.
//   - bg_error_ if a background error is pending.
//   - Otherwise the status of the log/memtable write.
// With paranoid_checks enabled, any failure other than a timeout is also
// recorded in bg_error_.
Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
  if (my_batch == nullptr) {
    return Status::Corruption("Batch is nullptr!");
  }
  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  WriteThread::Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = write_options.sync;
  w.disableWAL = write_options.disableWAL;
  w.in_batch_group = false;
  w.done = false;
  w.timeout_hint_us = write_options.timeout_hint_us;

  // A zero timeout hint means "no timeout"; otherwise compute the absolute
  // deadline in microseconds.
  uint64_t expiration_time = 0;
  bool has_timeout = false;
  if (w.timeout_hint_us == 0) {
    w.timeout_hint_us = WriteThread::kNoTimeOut;
  } else {
    expiration_time = env_->NowMicros() + w.timeout_hint_us;
    has_timeout = true;
  }

  if (!write_options.disableWAL) {
    RecordTick(stats_, WRITE_WITH_WAL);
    default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, 1);
  }

  WriteContext context;
  mutex_.Lock();
  Status status = write_thread_.EnterWriteThread(&w, expiration_time);
  assert(status.ok() || status.IsTimedOut());
  if (status.IsTimedOut()) {
    mutex_.Unlock();
    RecordTick(stats_, WRITE_TIMEDOUT);
    return Status::TimedOut();
  }
  if (w.done) {  // write was done by someone else
    default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER,
                                           1);
    mutex_.Unlock();
    RecordTick(stats_, WRITE_DONE_BY_OTHER);
    return w.status;
  }

  RecordTick(stats_, WRITE_DONE_BY_SELF);
  default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);

  // Once it reaches this point, the current writer "w" will try to do its
  // write job.  It may also pick up some of the remaining writers in the
  // "writers_" queue when it finds them suitable, and finish them in the same
  // write batch.  This is how a write job could be done by the other writer.
  assert(!single_column_family_mode_ ||
         versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);

  // A max_total_wal_size of 0 means "derive the limit from the total
  // in-memory state".
  uint64_t max_total_wal_size = (db_options_.max_total_wal_size == 0)
                                    ? 4 * max_total_in_memory_state_
                                    : db_options_.max_total_wal_size;
  if (UNLIKELY(!single_column_family_mode_) &&
      alive_log_files_.begin()->getting_flushed == false &&
      total_log_size_ > max_total_wal_size) {
    uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number;
    alive_log_files_.begin()->getting_flushed = true;
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "Flushing all column families with data in WAL number %" PRIu64
        ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
        flush_column_family_if_log_file, total_log_size_, max_total_wal_size);
    // no need to refcount because drop is happening in write thread, so can't
    // happen while we're in the write thread
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->GetLogNumber() <= flush_column_family_if_log_file) {
        status = SetNewMemtableAndNewLogFile(cfd, &context);
        if (!status.ok()) {
          break;
        }
        cfd->imm()->FlushRequested();
      }
    }
    MaybeScheduleFlushOrCompaction();
  }

  if (UNLIKELY(status.ok() && !bg_error_.ok())) {
    status = bg_error_;
  }

  if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
    status = ScheduleFlushes(&context);
  }

  // The branch hint covers the whole stall condition: status.ok() alone is
  // the common case and must not be marked unlikely.
  if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
                               write_controller_.GetDelay() > 0))) {
    status = DelayWrite(expiration_time);
  }

  if (UNLIKELY(status.ok() && has_timeout &&
               env_->NowMicros() > expiration_time)) {
    status = Status::TimedOut();
  }

  uint64_t last_sequence = versions_->LastSequence();
  WriteThread::Writer* last_writer = &w;
  if (status.ok()) {
    autovector<WriteBatch*> write_batch_group;
    write_thread_.BuildBatchGroup(&last_writer, &write_batch_group);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into memtables
    {
      mutex_.Unlock();
      // A single batch is written as-is; several batches are merged into
      // tmp_batch_ first.
      WriteBatch* updates = nullptr;
      if (write_batch_group.size() == 1) {
        updates = write_batch_group[0];
      } else {
        updates = &tmp_batch_;
        for (size_t i = 0; i < write_batch_group.size(); ++i) {
          WriteBatchInternal::Append(updates, write_batch_group[i]);
        }
      }

      const SequenceNumber current_sequence = last_sequence + 1;
      WriteBatchInternal::SetSequence(updates, current_sequence);
      int my_batch_count = WriteBatchInternal::Count(updates);
      last_sequence += my_batch_count;
      const uint64_t batch_size = WriteBatchInternal::ByteSize(updates);
      // Record statistics
      RecordTick(stats_, NUMBER_KEYS_WRITTEN, my_batch_count);
      RecordTick(stats_, BYTES_WRITTEN, batch_size);
      if (write_options.disableWAL) {
        flush_on_destroy_ = true;
      }
      PERF_TIMER_STOP(write_pre_and_post_process_time);

      uint64_t log_size = 0;
      if (!write_options.disableWAL) {
        PERF_TIMER_GUARD(write_wal_time);
        Slice log_entry = WriteBatchInternal::Contents(updates);
        status = log_->AddRecord(log_entry);
        total_log_size_ += log_entry.size();
        alive_log_files_.back().AddSize(log_entry.size());
        log_empty_ = false;
        log_size = log_entry.size();
        RecordTick(stats_, WAL_FILE_BYTES, log_size);
        if (status.ok() && write_options.sync) {
          RecordTick(stats_, WAL_FILE_SYNCED);
          StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
          if (db_options_.use_fsync) {
            status = log_->file()->Fsync();
          } else {
            status = log_->file()->Sync();
          }
        }
      }
      if (status.ok()) {
        PERF_TIMER_GUARD(write_memtable_time);

        status = WriteBatchInternal::InsertInto(
            updates, column_family_memtables_.get(),
            write_options.ignore_missing_column_families, 0, this, false);
        // A non-OK status here indicates iteration failure (either in-memory
        // writebatch corruption (very bad), or the client specified invalid
        // column family).  This will later on trigger bg_error_.
        //
        // Note that existing logic was not sound. Any partial failure writing
        // into the memtable would result in a state that some write ops might
        // have succeeded in memtable but Status reports error for all writes.

        SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence);
      }
      PERF_TIMER_START(write_pre_and_post_process_time);
      if (updates == &tmp_batch_) {
        tmp_batch_.Clear();
      }
      mutex_.Lock();
      // internal stats
      default_cf_internal_stats_->AddDBStats(
          InternalStats::BYTES_WRITTEN, batch_size);
      default_cf_internal_stats_->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN,
                                             my_batch_count);
      if (!write_options.disableWAL) {
        // Only count a WAL sync when one was actually requested, so this
        // agrees with the WAL_FILE_SYNCED ticker recorded above.
        if (write_options.sync) {
          default_cf_internal_stats_->AddDBStats(
              InternalStats::WAL_FILE_SYNCED, 1);
        }
        default_cf_internal_stats_->AddDBStats(
            InternalStats::WAL_FILE_BYTES, log_size);
      }
      if (status.ok()) {
        versions_->SetLastSequence(last_sequence);
      }
    }
  }
  if (db_options_.paranoid_checks && !status.ok() &&
      !status.IsTimedOut() && bg_error_.ok()) {
    bg_error_ = status; // stop compaction & fail any further writes
  }

  write_thread_.ExitWriteThread(&w, last_writer, status);
  mutex_.Unlock();

  if (status.IsTimedOut()) {
    RecordTick(stats_, WRITE_TIMEDOUT);
  }

  return status;
}

2702 2703
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
I
Igor Canadi 已提交
2704
// Stalls the calling writer according to the write controller.
//
// If writes are delayed but not stopped, sleeps for the controller's delay
// with mutex_ released. While writes are stopped and no background error is
// set, waits on bg_cv_. `expiration_time` is an absolute deadline in
// microseconds; 0 means no deadline.
//
// Returns TimedOut if the deadline passes while waiting, otherwise bg_error_.
Status DBImpl::DelayWrite(uint64_t expiration_time) {
  StopWatch sw(env_, stats_, WRITE_STALL);
  const bool has_timeout = expiration_time > 0;

  auto delay = write_controller_.GetDelay();
  if (!write_controller_.IsStopped() && delay > 0) {
    // Sleep without holding the mutex.
    mutex_.Unlock();
    env_->SleepForMicroseconds(delay);
    mutex_.Lock();
  }

  while (bg_error_.ok() && write_controller_.IsStopped()) {
    if (!has_timeout) {
      bg_cv_.Wait();
      continue;
    }
    bg_cv_.TimedWait(expiration_time);
    if (env_->NowMicros() > expiration_time) {
      return Status::TimedOut();
    }
  }

  return bg_error_;
}

I
Igor Canadi 已提交
2728 2729 2730 2731 2732 2733 2734 2735 2736 2737 2738
// Drains the flush scheduler: for every column family it hands out, switches
// to a new memtable (and possibly a new log file), then schedules background
// work if at least one column family was processed.
//
// Stops at, and returns, the first non-OK status from the memtable switch;
// in that case no background work is scheduled by this call.
Status DBImpl::ScheduleFlushes(WriteContext* context) {
  bool processed_any = false;
  for (ColumnFamilyData* cfd = flush_scheduler_.GetNextColumnFamily();
       cfd != nullptr; cfd = flush_scheduler_.GetNextColumnFamily()) {
    processed_any = true;
    Status switch_status = SetNewMemtableAndNewLogFile(cfd, context);
    // Drop our reference; delete the column family data if it was the last.
    if (cfd->Unref()) {
      delete cfd;
    }
    if (!switch_status.ok()) {
      return switch_status;
    }
  }

  if (processed_any) {
    MaybeScheduleFlushOrCompaction();
  }
  return Status::OK();
}

// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
// Replaces the active memtable of `cfd` with a fresh one, moving the old one
// to the immutable list, and starts a new WAL file unless the current log is
// still empty.
//
// mutex_ must be held on entry; it is released while the new file and
// memtable are created and is held again on return. Replaced log writers and
// superversions are handed to `context` for the caller to free.
//
// Returns the error from creating the new log file, if any; on that path no
// state is switched and the reserved file number is given back.
Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
                                           WriteContext* context) {
  mutex_.AssertHeld();
  unique_ptr<WritableFile> wal_file;
  log::Writer* fresh_log = nullptr;
  MemTable* fresh_mem = nullptr;
  SuperVersion* fresh_superversion = nullptr;

  // Attempt to switch to a new memtable and trigger flush of old.
  // Do this without holding the dbmutex lock.
  assert(versions_->prev_log_number() == 0);
  // An empty current log is reused instead of rolling to a new file.
  const bool creating_new_log = !log_empty_;
  uint64_t new_log_number = logfile_number_;
  if (creating_new_log) {
    new_log_number = versions_->NewFileNumber();
  }
  // Copy the options while the mutex is still held.
  const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();

  mutex_.Unlock();
  Status s;
  if (creating_new_log) {
    s = env_->NewWritableFile(
        LogFileName(db_options_.wal_dir, new_log_number),
        &wal_file, env_->OptimizeForLogWrite(env_options_));
    if (s.ok()) {
      // Our final size should be less than write_buffer_size
      // (compression, etc) but err on the side of caution.
      wal_file->SetPreallocationBlockSize(
          1.1 * mutable_cf_options.write_buffer_size);
      fresh_log = new log::Writer(std::move(wal_file));
    }
  }
  if (s.ok()) {
    fresh_mem = new MemTable(cfd->internal_comparator(), *cfd->ioptions(),
                             mutable_cf_options);
    fresh_superversion = new SuperVersion();
  }
  Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
      "[%s] New memtable created with log file: #%" PRIu64 "\n",
      cfd->GetName().c_str(), new_log_number);
  mutex_.Lock();

  if (!s.ok()) {
    // how do we fail if we're not creating new log?
    assert(creating_new_log);
    // Avoid chewing through file number space in a tight loop.
    versions_->ReuseLogFileNumber(new_log_number);
    assert(!fresh_mem);
    assert(!fresh_log);
    return s;
  }

  if (creating_new_log) {
    logfile_number_ = new_log_number;
    assert(fresh_log != nullptr);
    // The previous writer is freed later by the owner of `context`.
    context->logs_to_free_.push_back(log_.release());
    log_.reset(fresh_log);
    log_empty_ = true;
    alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
    for (auto other_cfd : *versions_->GetColumnFamilySet()) {
      // all this is just optimization to delete logs that
      // are no longer needed -- if CF is empty, that means it
      // doesn't need that particular log to stay alive, so we just
      // advance the log number. no need to persist this in the manifest
      const bool cf_is_empty =
          other_cfd->mem()->GetFirstSequenceNumber() == 0 &&
          other_cfd->imm()->size() == 0;
      if (cf_is_empty) {
        other_cfd->SetLogNumber(logfile_number_);
      }
    }
  }

  // Retire the current memtable to the immutable list and install the new
  // memtable together with a new superversion.
  cfd->mem()->SetNextLogNumber(logfile_number_);
  cfd->imm()->Add(cfd->mem());
  fresh_mem->Ref();
  cfd->SetMemtable(fresh_mem);
  context->superversions_to_free_.push_back(
      InstallSuperVersion(cfd, fresh_superversion, mutable_cf_options));
  return s;
}

I
Igor Canadi 已提交
2826
#ifndef ROCKSDB_LITE
I
Igor Canadi 已提交
2827 2828 2829 2830 2831
// Collects the table properties of the column family's current version into
// `*props`. The version is referenced under mutex_, queried with the mutex
// released, and unreferenced under mutex_ again.
Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
                                        TablePropertiesCollection* props) {
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();

  // Increment the ref count
  Version* version = nullptr;
  {
    MutexLock l(&mutex_);
    version = cfd->current();
    version->Ref();
  }

  auto s = version->GetPropertiesOfAllTables(props);

  // Decrement the ref count
  {
    MutexLock l(&mutex_);
    version->Unref();
  }

  return s;
}
I
Igor Canadi 已提交
2847
#endif  // ROCKSDB_LITE
2848

I
Igor Canadi 已提交
2849 2850 2851 2852
// Returns a reference to the stored database name (dbname_).
const std::string& DBImpl::GetName() const { return dbname_; }

2853 2854 2855 2856
// Returns the Env pointer this DB instance holds (env_).
Env* DBImpl::GetEnv() const { return env_; }

2857 2858
// Returns the Options object of the column family behind `column_family`.
const Options& DBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  return *cfd->options();
}

2862
bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
2863
                         const Slice& property, std::string* value) {
2864 2865
  bool is_int_property = false;
  bool need_out_of_mutex = false;
2866 2867 2868
  DBPropertyType property_type =
      GetPropertyType(property, &is_int_property, &need_out_of_mutex);

2869
  value->clear();
2870 2871 2872 2873 2874 2875 2876 2877 2878 2879 2880 2881 2882 2883 2884
  if (is_int_property) {
    uint64_t int_value;
    bool ret_value = GetIntPropertyInternal(column_family, property_type,
                                            need_out_of_mutex, &int_value);
    if (ret_value) {
      *value = std::to_string(int_value);
    }
    return ret_value;
  } else {
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
    auto cfd = cfh->cfd();
    MutexLock l(&mutex_);
    return cfd->internal_stats()->GetStringProperty(property_type, property,
                                                    value);
  }
J
jorlow@chromium.org 已提交
2885 2886
}

2887 2888
// Fetches an integer-valued property into `*value`.
// Returns false if `property` is not an integer property, otherwise the
// result of GetIntPropertyInternal.
bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
                            const Slice& property, uint64_t* value) {
  bool is_int_property = false;
  bool need_out_of_mutex = false;
  DBPropertyType property_type =
      GetPropertyType(property, &is_int_property, &need_out_of_mutex);
  if (is_int_property) {
    return GetIntPropertyInternal(column_family, property_type,
                                  need_out_of_mutex, value);
  }
  return false;
}

// Reads an integer property from the column family's internal stats.
//
// When `need_out_of_mutex` is set, the value is computed against a
// referenced SuperVersion without taking mutex_ here; otherwise it is read
// while holding mutex_.
bool DBImpl::GetIntPropertyInternal(ColumnFamilyHandle* column_family,
                                    DBPropertyType property_type,
                                    bool need_out_of_mutex, uint64_t* value) {
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();

  if (need_out_of_mutex) {
    SuperVersion* sv = GetAndRefSuperVersion(cfd);
    bool found = cfd->internal_stats()->GetIntPropertyOutOfMutex(
        property_type, sv->current, value);
    ReturnAndCleanupSuperVersion(cfd, sv);
    return found;
  }

  MutexLock l(&mutex_);
  return cfd->internal_stats()->GetIntProperty(property_type, value, this);
}

// Returns the thread-local SuperVersion of `cfd`. Callers in this file pair
// it with ReturnAndCleanupSuperVersion().
SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
  // TODO(ljin): consider using GetReferencedSuperVersion() directly
  SuperVersion* thread_local_sv = cfd->GetThreadLocalSuperVersion(&mutex_);
  return thread_local_sv;
}

// Hands `sv` back to the column family's thread-local storage. If it is not
// accepted there, releases this reference and, when it was the last one,
// cleans up (under mutex_) and deletes the SuperVersion.
void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
                                          SuperVersion* sv) {
  if (cfd->ReturnThreadLocalSuperVersion(sv)) {
    // Accepted back into thread-local storage; nothing to release.
    return;
  }

  // Release SuperVersion
  if (sv->Unref()) {
    {
      MutexLock l(&mutex_);
      sv->Cleanup();
    }
    delete sv;
    RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
  }
  RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
}

2944
void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family,
2945
                                 const Range* range, int n, uint64_t* sizes) {
J
jorlow@chromium.org 已提交
2946 2947
  // TODO(opt): better implementation
  Version* v;
2948 2949
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();
J
jorlow@chromium.org 已提交
2950 2951
  {
    MutexLock l(&mutex_);
2952
    v = cfd->current();
2953
    v->Ref();
J
jorlow@chromium.org 已提交
2954 2955 2956 2957 2958 2959 2960 2961 2962 2963 2964 2965 2966 2967 2968 2969 2970
  }

  for (int i = 0; i < n; i++) {
    // Convert user_key into a corresponding internal key.
    InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
    uint64_t start = versions_->ApproximateOffsetOf(v, k1);
    uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
    sizes[i] = (limit >= start ? limit - start : 0);
  }

  {
    MutexLock l(&mutex_);
    v->Unref();
  }
}

I
Igor Canadi 已提交
2971 2972 2973 2974 2975
#ifndef ROCKSDB_LITE
// Counts the call in the statistics, rejects sequence numbers beyond
// versions_->LastSequence() with NotFound, and otherwise forwards the request
// to the WAL manager, which fills in `iter`.
Status DBImpl::GetUpdatesSince(
    SequenceNumber seq, unique_ptr<TransactionLogIterator>* iter,
    const TransactionLogIterator::ReadOptions& read_options) {
  RecordTick(stats_, GET_UPDATES_SINCE_CALLS);

  const bool seq_already_written = seq <= versions_->LastSequence();
  if (seq_already_written) {
    return wal_manager_.GetUpdatesSince(seq, iter, read_options,
                                        versions_.get());
  }
  return Status::NotFound("Requested sequence not yet written in the db");
}

2983 2984 2985
// Deletes a single file identified by `name`.
//
// Only two kinds of file are accepted:
//  * log files, and only when ParseFileName() classifies them as archived;
//    these are removed directly from db_options_.wal_dir;
//  * table files, which are removed by applying a VersionEdit, and only when
//    no deeper level holds any file and, for level 0, the file is the last
//    entry of LevelFiles(0).
//
// Returns InvalidArgument for unparsable names, unknown files or files that
// violate the level rules, NotSupported for non-archived logs, and OK without
// doing anything when the table file is flagged as being compacted.
Status DBImpl::DeleteFile(std::string name) {
  uint64_t file_number;
  FileType file_type;
  WalFileType wal_type;
  const bool parsed = ParseFileName(name, &file_number, &file_type, &wal_type);
  const bool deletable_kind =
      parsed && (file_type == kTableFile || file_type == kLogFile);
  if (!deletable_kind) {
    Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
        "DeleteFile %s failed.\n", name.c_str());
    return Status::InvalidArgument("Invalid file name");
  }

  if (file_type == kLogFile) {
    // Only allow deleting archived log files
    if (wal_type != kArchivedLogFile) {
      Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
          "DeleteFile %s failed - not archived log.\n", name.c_str());
      return Status::NotSupported("Delete only supported for archived logs");
    }
    Status wal_status =
        env_->DeleteFile(db_options_.wal_dir + "/" + name.c_str());
    if (!wal_status.ok()) {
      Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
          "DeleteFile %s failed -- %s.\n", name.c_str(),
          wal_status.ToString().c_str());
    }
    return wal_status;
  }

  // From here on the target is a table file.
  Status status;
  int level;
  FileMetaData* file_meta;
  ColumnFamilyData* cfd;
  VersionEdit edit;
  JobContext job_context(true);
  {
    MutexLock db_lock(&mutex_);
    status =
        versions_->GetMetadataForFile(file_number, &level, &file_meta, &cfd);
    if (!status.ok()) {
      Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
          "DeleteFile %s failed. File not found\n", name.c_str());
      return Status::InvalidArgument("File not found");
    }
    assert(level < cfd->NumberLevels());

    // If the file is being compacted no need to delete.
    if (file_meta->being_compacted) {
      Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
          "DeleteFile %s Skipped. File about to be compacted\n", name.c_str());
      return Status::OK();
    }

    // Only the files in the last level can be deleted externally.
    // This is to make sure that any deletion tombstones are not
    // lost. Check that the level passed is the last level.
    auto* storage = cfd->current()->storage_info();
    for (int deeper = level + 1; deeper < cfd->NumberLevels(); ++deeper) {
      if (storage->NumLevelFiles(deeper) != 0) {
        Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
            "DeleteFile %s FAILED. File not in last level\n", name.c_str());
        return Status::InvalidArgument("File not in last level");
      }
    }

    // if level == 0, it has to be the oldest file
    if (level == 0 &&
        storage->LevelFiles(0).back()->fd.GetNumber() != file_number) {
      Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
          "DeleteFile %s failed ---"
          " target file in level 0 must be the oldest.", name.c_str());
      return Status::InvalidArgument("File in level 0, but not oldest");
    }

    // Record the removal in the manifest and, on success, install a new
    // SuperVersion for the affected column family.
    edit.SetColumnFamily(cfd->GetID());
    edit.DeleteFile(level, file_number);
    status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                    &edit, &mutex_, db_directory_.get());
    if (status.ok()) {
      InstallSuperVersionBackground(cfd, &job_context,
                                    *cfd->GetLatestMutableCFOptions());
    }
    FindObsoleteFiles(&job_context, false);
  }  // lock released here

  LogFlush(db_options_.info_log);

  // remove files outside the db-lock
  if (job_context.HaveSomethingToDelete()) {
    PurgeObsoleteFiles(job_context);
  }

  {
    MutexLock db_lock(&mutex_);
    // schedule flush if file deletion means we freed the space for flushes to
    // continue
    MaybeScheduleFlushOrCompaction();
  }
  return status;
}

3077
void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
3078
  MutexLock l(&mutex_);
3079
  versions_->GetLiveFilesMetaData(metadata);
3080
}
I
Igor Canadi 已提交
3081
#endif  // ROCKSDB_LITE
3082

I
Igor Canadi 已提交
3083 3084 3085 3086 3087 3088 3089
// Compares the size recorded for every live file against the size reported by
// the Env. Must be called with the DB mutex held (asserted below).
//
// Returns OK when every file is accessible and matches; otherwise returns
// Corruption carrying one line per problem found.
Status DBImpl::CheckConsistency() {
  mutex_.AssertHeld();

  std::vector<LiveFileMetaData> live_files;
  versions_->GetLiveFilesMetaData(&live_files);

  std::string problems;
  for (const auto& file : live_files) {
    const std::string full_path = file.db_path + "/" + file.name;

    uint64_t size_on_disk = 0;
    Status stat_status = env_->GetFileSize(full_path, &size_on_disk);
    if (!stat_status.ok()) {
      problems +=
          "Can't access " + file.name + ": " + stat_status.ToString() + "\n";
      continue;
    }
    if (size_on_disk != file.size) {
      problems += "Sst file size mismatch: " + full_path +
                  ". Size recorded in manifest " +
                  std::to_string(file.size) + ", actual size " +
                  std::to_string(size_on_disk) + "\n";
    }
  }

  if (problems.empty()) {
    return Status::OK();
  }
  return Status::Corruption(problems);
}

3111 3112 3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137
// Reads the database identity from the file named by
// IdentityFileName(dbname_) into `identity`.
//
// The whole file is read in one call; a single trailing '\n', if present, is
// stripped. On any Env error the failing Status is returned and `identity` is
// left untouched.
Status DBImpl::GetDbIdentity(std::string& identity) {
  const std::string idfilename = IdentityFileName(dbname_);
  unique_ptr<SequentialFile> idfile;
  const EnvOptions soptions;
  Status s = env_->NewSequentialFile(idfilename, &idfile, soptions);
  if (!s.ok()) {
    return s;
  }

  uint64_t file_size = 0;
  s = env_->GetFileSize(idfilename, &file_size);
  if (!s.ok()) {
    return s;
  }

  // Heap-backed scratch space. A variable-length stack array is not standard
  // C++ and would let the on-disk file size dictate stack usage. std::string
  // also gives a valid pointer from &scratch[0] when the file is empty.
  std::string scratch(static_cast<size_t>(file_size), '\0');
  Slice id;
  s = idfile->Read(scratch.size(), &id, &scratch[0]);
  if (!s.ok()) {
    return s;
  }

  // Copy out of the Slice rather than out of `scratch`: Read() reports the
  // bytes it produced through `id`.
  identity.assign(id.data(), id.size());
  // If last character is '\n' remove it from identity
  if (identity.size() > 0 && identity.back() == '\n') {
    identity.pop_back();
  }
  return s;
}

J
jorlow@chromium.org 已提交
3138 3139
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
3140
// Convenience wrapper: builds a one-entry WriteBatch holding the Put and
// submits it through Write().
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
               const Slice& key, const Slice& value) {
  // Pre-allocate size of write batch conservatively.
  // 8 bytes are taken by header, 4 bytes for count, 1 byte for type,
  // and we allocate 11 extra bytes for key length, as well as value length.
  const size_t reserved_bytes = key.size() + value.size() + 24;
  WriteBatch single_put(reserved_bytes);
  single_put.Put(column_family, key, value);
  return Write(opt, &single_put);
}

3150 3151
// Convenience wrapper: builds a one-entry WriteBatch holding the Delete and
// submits it through Write().
Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                  const Slice& key) {
  WriteBatch single_delete;
  single_delete.Delete(column_family, key);
  Status write_status = Write(opt, &single_delete);
  return write_status;
}

3157 3158
// Convenience wrapper: builds a one-entry WriteBatch holding the Merge and
// submits it through Write().
Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                 const Slice& key, const Slice& value) {
  WriteBatch single_merge;
  single_merge.Merge(column_family, key, value);
  Status write_status = Write(opt, &single_merge);
  return write_status;
}

3164
// Default implementation -- returns not supported status
L
Lei Jin 已提交
3165
// Base-class fallback: ignores all of its arguments and reports NotSupported
// with an empty message.
Status DB::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
                              const std::string& column_family_name,
                              ColumnFamilyHandle** handle) {
  Status not_supported = Status::NotSupported("");
  return not_supported;
}
3170
// Base-class fallback: ignores its argument and reports NotSupported with an
// empty message.
Status DB::DropColumnFamily(ColumnFamilyHandle* column_family) {
  Status not_supported = Status::NotSupported("");
  return not_supported;
}

J
jorlow@chromium.org 已提交
3174 3175
// Out-of-line destructor with an empty body.
DB::~DB() {}

J
Jim Paton 已提交
3176
// Opens `dbname` with only the default column family, by splitting `options`
// into DBOptions and ColumnFamilyOptions and delegating to the
// multi-column-family overload of DB::Open(). The single handle that overload
// returns is deleted before returning.
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  DBOptions db_options(options);
  ColumnFamilyOptions cf_options(options);

  std::vector<ColumnFamilyDescriptor> column_families;
  column_families.emplace_back(kDefaultColumnFamilyName, cf_options);

  std::vector<ColumnFamilyHandle*> handles;
  Status open_status =
      DB::Open(db_options, dbname, column_families, &handles, dbptr);
  if (!open_status.ok()) {
    return open_status;
  }

  assert(handles.size() == 1);
  // i can delete the handle since DBImpl is always holding a reference to
  // default column family
  delete handles[0];
  return open_status;
}

3193 3194
// Opens (and, depending on the options, creates) the database at `dbname`
// with the given column families.
//
// On success *dbptr points at the new DBImpl and `handles` holds one handle
// per entry of `column_families`, in the same order. On failure after the
// initial option validation, *dbptr is nullptr, every handle created so far is
// deleted, `handles` is empty and the DBImpl is destroyed.
//
// Steps, in order:
//  1. validate options (table options; multiple db_paths only with universal
//     compaction and at most four paths);
//  2. create the WAL directory, every db_path and the archival directory;
//  3. with the DB mutex held: Recover(), create a fresh WAL file, build the
//     column family handles (creating missing families when
//     create_missing_column_families is set), install SuperVersions, delete
//     obsolete files, schedule background work and fsync the DB directory;
//  4. still under the mutex: reject universal/FIFO column families that have
//     files above level 0, and column families whose memtable does not
//     support the configured merge operator.
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
                const std::vector<ColumnFamilyDescriptor>& column_families,
                std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
  Status s = SanitizeOptionsByTable(db_options, column_families);
  if (!s.ok()) {
    return s;
  }
  if (db_options.db_paths.size() > 1) {
    for (auto& cfd : column_families) {
      if (cfd.options.compaction_style != kCompactionStyleUniversal) {
        return Status::NotSupported(
            "More than one DB paths are only supported in "
            "universal compaction style. ");
      }
    }

    if (db_options.db_paths.size() > 4) {
      return Status::NotSupported(
        "More than four DB paths are not supported yet. ");
    }
  }

  *dbptr = nullptr;
  handles->clear();

  // The largest write buffer across all column families sizes the WAL
  // preallocation below. Bind by reference: copying a descriptor copies its
  // whole options struct.
  size_t max_write_buffer_size = 0;
  for (const auto& cf : column_families) {
    max_write_buffer_size =
        std::max(max_write_buffer_size, cf.options.write_buffer_size);
  }

  DBImpl* impl = new DBImpl(db_options, dbname);
  s = impl->env_->CreateDirIfMissing(impl->db_options_.wal_dir);
  if (s.ok()) {
    for (const auto& db_path : impl->db_options_.db_paths) {
      s = impl->env_->CreateDirIfMissing(db_path.path);
      if (!s.ok()) {
        break;
      }
    }
  }

  if (!s.ok()) {
    delete impl;
    return s;
  }

  s = impl->CreateArchivalDirectory();
  if (!s.ok()) {
    delete impl;
    return s;
  }

  // The mutex is locked and unlocked by hand here; every path below falls
  // through to the single Unlock() further down.
  impl->mutex_.Lock();
  // Handles create_if_missing, error_if_exists
  s = impl->Recover(column_families);
  if (s.ok()) {
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    unique_ptr<WritableFile> lfile;
    EnvOptions soptions(db_options);
    s = impl->db_options_.env->NewWritableFile(
        LogFileName(impl->db_options_.wal_dir, new_log_number), &lfile,
        impl->db_options_.env->OptimizeForLogWrite(soptions));
    if (s.ok()) {
      lfile->SetPreallocationBlockSize(1.1 * max_write_buffer_size);
      impl->logfile_number_ = new_log_number;
      impl->log_.reset(new log::Writer(std::move(lfile)));

      // set column family handles
      for (const auto& cf : column_families) {
        auto cfd =
            impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
        if (cfd != nullptr) {
          handles->push_back(
              new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
        } else {
          if (db_options.create_missing_column_families) {
            // missing column family, create it
            ColumnFamilyHandle* handle;
            // CreateColumnFamily() is called with the mutex released.
            impl->mutex_.Unlock();
            s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
            impl->mutex_.Lock();
            if (s.ok()) {
              handles->push_back(handle);
            } else {
              break;
            }
          } else {
            s = Status::InvalidArgument("Column family not found: ", cf.name);
            break;
          }
        }
      }
    }
    if (s.ok()) {
      for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
        delete impl->InstallSuperVersion(
            cfd, nullptr, *cfd->GetLatestMutableCFOptions());
      }
      impl->alive_log_files_.push_back(
          DBImpl::LogFileNumberSize(impl->logfile_number_));
      impl->DeleteObsoleteFiles();
      impl->MaybeScheduleFlushOrCompaction();
      s = impl->db_directory_->Fsync();
    }
  }

  if (s.ok()) {
    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
      if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
          cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
        auto* vstorage = cfd->current()->storage_info();
        for (int i = 1; i < vstorage->num_levels(); ++i) {
          int num_files = vstorage->NumLevelFiles(i);
          if (num_files > 0) {
            s = Status::InvalidArgument(
                "Not all files are at level 0. Cannot "
                "open with universal or FIFO compaction style.");
            break;
          }
        }
      }
      // Only run this check while `s` is still OK so that it cannot replace
      // the level-0 error set just above for the same column family.
      if (s.ok() && cfd->ioptions()->merge_operator != nullptr &&
          !cfd->mem()->IsMergeOperatorSupported()) {
        // Status does not interpret printf-style format strings, so the
        // column family name is spliced into the message here.
        s = Status::InvalidArgument(
            std::string("The memtable of column family ") +
            cfd->GetName().c_str() +
            " does not support merge operator "
            "its options.merge_operator is non-null");
      }
      if (!s.ok()) {
        break;
      }
    }
  }

  impl->mutex_.Unlock();

  if (s.ok()) {
    impl->opened_successfully_ = true;
    *dbptr = impl;
  } else {
    for (auto h : *handles) {
      delete h;
    }
    handles->clear();
    delete impl;
  }
  return s;
}

3341 3342 3343
// Forwards to VersionSet::ListColumnFamilies(), using the Env taken from
// `db_options`, to fill `column_families` for the database at `name`.
Status DB::ListColumnFamilies(const DBOptions& db_options,
                              const std::string& name,
                              std::vector<std::string>* column_families) {
  Env* const env = db_options.env;
  return VersionSet::ListColumnFamilies(column_families, name, env);
}

3347 3348 3349
// Out-of-line destructor with an empty body.
Snapshot::~Snapshot() {}

J
jorlow@chromium.org 已提交
3350
// Removes the files belonging to the database at `dbname`: everything
// ParseFileName() recognises in the DB directory and WAL directory, table
// files in every configured db_path, archived log files, and finally the lock
// file and the directories themselves.
//
// Returns OK immediately when neither directory lists any file. Otherwise the
// DB lock file is taken first; if that fails its Status is returned and
// nothing is deleted. The first deletion error encountered is returned, but
// deletion carries on past it. Errors from listing directories, unlocking and
// removing directories are ignored.
Status DestroyDB(const std::string& dbname, const Options& options) {
  const InternalKeyComparator comparator(options.comparator);
  const Options& soptions(SanitizeOptions(dbname, &comparator, options));
  Env* env = soptions.env;
  std::vector<std::string> filenames;
  std::vector<std::string> archiveFiles;

  std::string archivedir = ArchivalDirectory(dbname);
  // Ignore error in case directory does not exist
  env->GetChildren(dbname, &filenames);

  // A separate WAL directory contributes its own listing and hosts the
  // archive directory.
  if (dbname != soptions.wal_dir) {
    std::vector<std::string> logfilenames;
    env->GetChildren(soptions.wal_dir, &logfilenames);
    filenames.insert(filenames.end(), logfilenames.begin(), logfilenames.end());
    archivedir = ArchivalDirectory(soptions.wal_dir);
  }

  if (filenames.empty()) {
    return Status::OK();
  }

  FileLock* lock;
  const std::string lockname = LockFileName(dbname);
  Status result = env->LockFile(lockname, &lock);
  if (!result.ok()) {
    return result;
  }

  // Keeps the first failure in `result` and ignores later ones.
  auto keep_first_error = [&result](const Status& outcome) {
    if (result.ok() && !outcome.ok()) {
      result = outcome;
    }
  };

  uint64_t number;
  FileType type;
  InfoLogPrefix info_log_prefix(!options.db_log_dir.empty(), dbname);
  for (const std::string& fname : filenames) {
    const bool recognised =
        ParseFileName(fname, &number, info_log_prefix.prefix, &type);
    if (!recognised || type == kDBLockFile) {
      // Lock file will be deleted at end
      continue;
    }
    Status del;
    if (type == kMetaDatabase) {
      del = DestroyDB(dbname + "/" + fname, options);
    } else if (type == kLogFile) {
      del = env->DeleteFile(soptions.wal_dir + "/" + fname);
    } else {
      del = env->DeleteFile(dbname + "/" + fname);
    }
    keep_first_error(del);
  }

  for (auto& db_path : options.db_paths) {
    // `filenames` is reused as the listing buffer for each extra path.
    env->GetChildren(db_path.path, &filenames);
    for (const std::string& fname : filenames) {
      // Only table files are removed from the additional data paths.
      if (ParseFileName(fname, &number, &type) && type == kTableFile) {
        keep_first_error(env->DeleteFile(db_path.path + "/" + fname));
      }
    }
  }

  env->GetChildren(archivedir, &archiveFiles);
  // Delete archival files.
  for (const std::string& archived : archiveFiles) {
    if (ParseFileName(archived, &number, &type) && type == kLogFile) {
      keep_first_error(env->DeleteFile(archivedir + "/" + archived));
    }
  }
  // ignore case where no archival directory is present.
  env->DeleteDir(archivedir);

  env->UnlockFile(lock);  // Ignore error since state is already gone
  env->DeleteFile(lockname);
  env->DeleteDir(dbname);  // Ignore error in case dir contains other files
  env->DeleteDir(soptions.wal_dir);

  return result;
}

3431 3432
//
// A global method that can dump out the build version
I
Igor Canadi 已提交
3433
// Writes the RocksDB version triple, the git sha and the compile time/date to
// `log` at INFO level. Compiles to an empty function when IOS_CROSS_COMPILE is
// defined.
void DumpRocksDBBuildVersion(Logger* log) {
#if !defined(IOS_CROSS_COMPILE)
  // if we compile with Xcode, we don't run build_detect_version, so we don't
  // generate util/build_version.cc
  Log(InfoLogLevel::INFO_LEVEL, log, "RocksDB version: %d.%d.%d\n",
      ROCKSDB_MAJOR, ROCKSDB_MINOR, ROCKSDB_PATCH);
  Log(InfoLogLevel::INFO_LEVEL, log, "Git sha %s", rocksdb_build_git_sha);
  Log(InfoLogLevel::INFO_LEVEL, log, "Compile time %s %s",
      rocksdb_build_compile_time, rocksdb_build_compile_date);
#endif
}

3446
}  // namespace rocksdb