//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>

#include "db/builder.h"
17
#include "db/error_handler.h"
18
#include "db/event_helpers.h"
19 20 21 22
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
S
Siying Dong 已提交
23 24
#include "util/sst_file_manager_impl.h"
#include "util/sync_point.h"
S
Siying Dong 已提交
25 26

namespace rocksdb {
27

28 29 30 31 32 33 34 35 36 37 38 39 40 41
bool DBImpl::EnoughRoomForCompaction(
    const std::vector<CompactionInputFiles>& inputs,
    bool* sfm_reserved_compact_space, LogBuffer* log_buffer) {
  // Check if we have enough room to do the compaction
  bool enough_room = true;
#ifndef ROCKSDB_LITE
  auto sfm = static_cast<SstFileManagerImpl*>(
      immutable_db_options_.sst_file_manager.get());
  if (sfm) {
    enough_room = sfm->EnoughRoomForCompaction(inputs);
    if (enough_room) {
      *sfm_reserved_compact_space = true;
    }
  }
42 43 44
#else
  (void)inputs;
  (void)sfm_reserved_compact_space;
45 46 47 48 49 50 51 52 53 54 55 56
#endif  // ROCKSDB_LITE
  if (!enough_room) {
    // Just in case tests want to change the value of enough_room
    TEST_SYNC_POINT_CALLBACK(
        "DBImpl::BackgroundCompaction():CancelledCompaction", &enough_room);
    ROCKS_LOG_BUFFER(log_buffer,
                     "Cancelled compaction because not enough room");
    RecordTick(stats_, COMPACTION_CANCELLED, 1);
  }
  return enough_room;
}

S
Siying Dong 已提交
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
// Syncs every log in logs_ whose number is below the current logfile_number_,
// then fsyncs the WAL directory.
//
// Must be called with mutex_ held. The mutex is released while the files are
// being synced and re-acquired before returning.
//
// job_context: only its job_id is used, for logging.
//
// Returns the first sync/fsync failure, or OK. On failure the error is also
// reported to error_handler_ with reason kFlush.
Status DBImpl::SyncClosedLogs(JobContext* job_context) {
  TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
  mutex_.AssertHeld();
  autovector<log::Writer*, 1> logs_to_sync;
  uint64_t current_log_number = logfile_number_;
  // Wait while the front log is older than the current one and is already
  // marked as getting synced.
  while (logs_.front().number < current_log_number &&
         logs_.front().getting_synced) {
    log_sync_cv_.Wait();
  }
  // Claim every older log by flagging it before the mutex is dropped.
  for (auto it = logs_.begin();
       it != logs_.end() && it->number < current_log_number; ++it) {
    auto& log = *it;
    assert(!log.getting_synced);
    log.getting_synced = true;
    logs_to_sync.push_back(log.writer);
  }

  Status s;
  if (!logs_to_sync.empty()) {
    mutex_.Unlock();

    for (log::Writer* log : logs_to_sync) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
                     log->get_log_number());
      s = log->file()->Sync(immutable_db_options_.use_fsync);
      if (!s.ok()) {
        break;
      }
    }
    if (s.ok()) {
      s = directories_.GetWalDir()->Fsync();
    }

    mutex_.Lock();

    // "number <= current_log_number - 1" is equivalent to
    // "number < current_log_number".
    MarkLogsSynced(current_log_number - 1, true, s);
    if (!s.ok()) {
      error_handler_.SetBGError(s, BackgroundErrorReason::kFlush);
      TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
      return s;
    }
  }
  return s;
}

// Flushes the pending immutable memtables of `cfd` into a table file.
//
// Must be called with mutex_ held; several callees (the listener
// notifications, SyncClosedLogs, FlushJob::Run) release and re-acquire it.
//
// cfd: column family to flush; it must have a pending flush (asserted).
// mutable_cf_options: options forwarded to the flush job and the listeners.
// made_progress: optional; set to true when the flush succeeds.
// job_context: supplies the job id and the superversion context.
// log_buffer: receives the level summary after a successful flush.
//
// Returns the status of the WAL sync or of the flush job. Any failure other
// than ShutdownInProgress is reported to error_handler_ with reason kFlush.
Status DBImpl::FlushMemTableToOutputFile(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) {
  mutex_.AssertHeld();
  assert(cfd->imm()->NumNotFlushed() != 0);
  assert(cfd->imm()->IsFlushPending());

  SequenceNumber earliest_write_conflict_snapshot;
  std::vector<SequenceNumber> snapshot_seqs =
      snapshots_.GetAll(&earliest_write_conflict_snapshot);

  // With custom GC enabled and no checker installed, fall back to the
  // DisableGCSnapshotChecker singleton.
  auto snapshot_checker = snapshot_checker_.get();
  if (use_custom_gc_ && snapshot_checker == nullptr) {
    snapshot_checker = DisableGCSnapshotChecker::Instance();
  }
  FlushJob flush_job(
      dbname_, cfd, immutable_db_options_, mutable_cf_options,
      env_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,
      snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
      job_context, log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U),
      GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
      &event_logger_, mutable_cf_options.report_bg_io_stats);

  FileMetaData file_meta;

  flush_job.PickMemTable();

#ifndef ROCKSDB_LITE
  // may temporarily unlock and lock the mutex.
  NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id,
                     flush_job.GetTableProperties());
#endif  // ROCKSDB_LITE

  Status s;
  // NOTE(review): the comment below talks about "more than one" column
  // family, but the condition is `> 0`, so the sync also runs with a single
  // column family. Confirm which is intended before changing it.
  if (logfile_number_ > 0 &&
      versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 0) {
    // If there are more than one column families, we need to make sure that
    // all the log files except the most recent one are synced. Otherwise if
    // the host crashes after flushing and before WAL is persistent, the
    // flushed SST may contain data from write batches whose updates to
    // other column families are missing.
    // SyncClosedLogs() may unlock and re-lock the db_mutex.
    s = SyncClosedLogs(job_context);
  }

  // Within flush_job.Run, rocksdb may call event listener to notify
  // file creation and deletion.
  //
  // Note that flush_job.Run will unlock and lock the db_mutex,
  // and EventListener callback will be called when the db_mutex
  // is unlocked by the current thread.
  if (s.ok()) {
    s = flush_job.Run(&logs_with_prep_tracker_, &file_meta);
  } else {
    flush_job.Cancel();
  }

  if (s.ok()) {
    InstallSuperVersionAndScheduleWork(cfd, &job_context->superversion_context,
                                       mutable_cf_options);
    if (made_progress) {
      *made_progress = true;
    }
    VersionStorageInfo::LevelSummaryStorage tmp;
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
                     cfd->GetName().c_str(),
                     cfd->current()->storage_info()->LevelSummary(&tmp));
  }

  if (!s.ok() && !s.IsShutdownInProgress()) {
    Status new_bg_error = s;
    error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
  }
  if (s.ok()) {
#ifndef ROCKSDB_LITE
    // may temporarily unlock and lock the mutex.
    NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options,
                           job_context->job_id, flush_job.GetTableProperties());
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    if (sfm) {
      // Notify sst_file_manager that a new file was added
      std::string file_path = MakeTableFileName(
          cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber());
      sfm->OnAddFile(file_path);
      if (sfm->IsMaxAllowedSpaceReached()) {
        // The flush itself succeeded, so `s` stays OK; the space-limit
        // condition is only raised as a background error.
        Status new_bg_error = Status::SpaceLimit("Max allowed space was reached");
        TEST_SYNC_POINT_CALLBACK(
            "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
            &new_bg_error);
        error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
      }
    }
#endif  // ROCKSDB_LITE
  }
  return s;
}

Y
Yi Wu 已提交
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
// Invokes OnFlushBegin() on every registered listener.
//
// Must be called with mutex_ held; the mutex is released while the listeners
// run and re-acquired afterwards. Does nothing when there are no listeners,
// when the DB is shutting down, or in ROCKSDB_LITE builds.
//
// cfd: column family being flushed.
// file_meta: supplies the file number and sequence-number range reported to
//     the listeners.
// mutable_cf_options: supplies the L0 slowdown/stop triggers.
// job_id: id of the flush job.
// prop: table properties copied into the FlushJobInfo.
void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
                                const MutableCFOptions& mutable_cf_options,
                                int job_id, TableProperties prop) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  // Sample the L0 file count while the mutex is still held.
  bool triggered_writes_slowdown =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_slowdown_writes_trigger);
  bool triggered_writes_stop =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_stop_writes_trigger);
  // release lock while notifying events
  mutex_.Unlock();
  {
    FlushJobInfo info;
    info.cf_name = cfd->GetName();
    // TODO(yhchiang): make db_paths dynamic in case flush does not
    //                 go to L0 in the future.
    info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path,
                                       file_meta->fd.GetNumber());
    info.thread_id = env_->GetThreadID();
    info.job_id = job_id;
    info.triggered_writes_slowdown = triggered_writes_slowdown;
    info.triggered_writes_stop = triggered_writes_stop;
    info.smallest_seqno = file_meta->smallest_seqno;
    info.largest_seqno = file_meta->largest_seqno;
    info.table_properties = prop;
    info.flush_reason = cfd->GetFlushReason();
    // Iterate by reference to avoid copying each listener handle.
    for (const auto& listener : immutable_db_options_.listeners) {
      listener->OnFlushBegin(this, info);
    }
  }
  mutex_.Lock();
  // no need to signal bg_cv_ as it will be signaled at the end of the
  // flush process.
#else
  (void)cfd;
  (void)file_meta;
  (void)mutable_cf_options;
  (void)job_id;
  (void)prop;
#endif  // ROCKSDB_LITE
}

S
Siying Dong 已提交
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277
// Invokes OnFlushCompleted() on every registered listener.
//
// Must be called with mutex_ held; the mutex is released while the listeners
// run and re-acquired afterwards. Does nothing when there are no listeners,
// when the DB is shutting down, or in ROCKSDB_LITE builds.
//
// cfd: column family that was flushed.
// file_meta: supplies the file number and sequence-number range reported to
//     the listeners.
// mutable_cf_options: supplies the L0 slowdown/stop triggers.
// job_id: id of the flush job.
// prop: table properties copied into the FlushJobInfo.
void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
                                    FileMetaData* file_meta,
                                    const MutableCFOptions& mutable_cf_options,
                                    int job_id, TableProperties prop) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  // Sample the L0 file count while the mutex is still held.
  bool triggered_writes_slowdown =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_slowdown_writes_trigger);
  bool triggered_writes_stop =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_stop_writes_trigger);
  // release lock while notifying events
  mutex_.Unlock();
  {
    FlushJobInfo info;
    info.cf_name = cfd->GetName();
    // TODO(yhchiang): make db_paths dynamic in case flush does not
    //                 go to L0 in the future.
    info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path,
                                       file_meta->fd.GetNumber());
    info.thread_id = env_->GetThreadID();
    info.job_id = job_id;
    info.triggered_writes_slowdown = triggered_writes_slowdown;
    info.triggered_writes_stop = triggered_writes_stop;
    info.smallest_seqno = file_meta->smallest_seqno;
    info.largest_seqno = file_meta->largest_seqno;
    info.table_properties = prop;
    info.flush_reason = cfd->GetFlushReason();
    // Iterate by reference to avoid copying each listener handle.
    for (const auto& listener : immutable_db_options_.listeners) {
      listener->OnFlushCompleted(this, info);
    }
  }
  mutex_.Lock();
  // no need to signal bg_cv_ as it will be signaled at the end of the
  // flush process.
#else
  (void)cfd;
  (void)file_meta;
  (void)mutable_cf_options;
  (void)job_id;
  (void)prop;
#endif  // ROCKSDB_LITE
}

// Manually compacts the key range [*begin, *end] of `column_family`.
//
// options: target path, exclusivity, write-stall policy, bottommost-level
//     policy, subcompaction count and optional level refit.
// column_family: handle of the column family to compact.
// begin, end: range bounds; a null pointer is passed through to the callees
//     as-is, and the memtable-overlap check is only done when both are set.
//
// Steps: (1) decide whether the memtables must be flushed, optionally waiting
// for write-stall conditions to clear first; (2) flush if needed; (3) run
// manual compactions level by level (or one all-level compaction for
// universal style with more than one level); (4) optionally refit the output
// to options.target_level; (5) reschedule background work.
//
// Returns InvalidArgument for an out-of-range target path id,
// ShutdownInProgress if the column family is dropped or the DB is shutting
// down while waiting, or the first failing flush/compaction/refit status.
Status DBImpl::CompactRange(const CompactRangeOptions& options,
                            ColumnFamilyHandle* column_family,
                            const Slice* begin, const Slice* end) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();

  if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) {
    return Status::InvalidArgument("Invalid target path ID");
  }

  bool exclusive = options.exclusive_manual_compaction;

  bool flush_needed = true;
  if (begin != nullptr && end != nullptr) {
    // TODO(ajkr): We could also optimize away the flush in certain cases where
    // one/both sides of the interval are unbounded. But it requires more
    // changes to RangesOverlapWithMemtables.
    Range range(*begin, *end);
    SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
    cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed);
    CleanupSuperVersion(super_version);
  }

  if (!options.allow_write_stall && flush_needed) {
    InstrumentedMutexLock l(&mutex_);
    uint64_t orig_active_memtable_id = cfd->mem()->GetID();
    WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
    do {
      // On every iteration after the first we got here because a stall would
      // be triggered; wait for background work to signal progress.
      if (write_stall_condition != WriteStallCondition::kNormal) {
        TEST_SYNC_POINT("DBImpl::CompactRange:StallWait");
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "[%s] CompactRange waiting on stall conditions to clear",
                       cfd->GetName().c_str());
        bg_cv_.Wait();
      }
      if (cfd->IsDropped() || shutting_down_.load(std::memory_order_acquire)) {
        return Status::ShutdownInProgress();
      }

      uint64_t earliest_memtable_id =
          std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
      if (earliest_memtable_id > orig_active_memtable_id) {
        // We waited so long that the memtable we were originally waiting on was
        // flushed.
        flush_needed = false;
        break;
      }

      const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
      const auto* vstorage = cfd->current()->storage_info();

      // Skip stalling check if we're below auto-flush and auto-compaction
      // triggers. If it stalled in these conditions, that'd mean the stall
      // triggers are so low that stalling is needed for any background work. In
      // that case we shouldn't wait since background work won't be scheduled.
      if (cfd->imm()->NumNotFlushed() <
              cfd->ioptions()->min_write_buffer_number_to_merge &&
          vstorage->l0_delay_trigger_count() <
              mutable_cf_options.level0_file_num_compaction_trigger) {
        break;
      }

      // check whether one extra immutable memtable or an extra L0 file would
      // cause write stalling mode to be entered. It could still enter stall
      // mode due to pending compaction bytes, but that's less common
      write_stall_condition =
          ColumnFamilyData::GetWriteStallConditionAndCause(
              cfd->imm()->NumNotFlushed() + 1,
              vstorage->l0_delay_trigger_count() + 1,
              vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
              .first;
    } while (write_stall_condition != WriteStallCondition::kNormal);
  }
  TEST_SYNC_POINT("DBImpl::CompactRange:StallWaitDone");
  Status s;
  if (flush_needed) {
    s = FlushMemTable(cfd, FlushOptions(), FlushReason::kManualCompaction);
    if (!s.ok()) {
      LogFlush(immutable_db_options_.info_log);
      return s;
    }
  }

  // Find the deepest non-empty level (>= 1) that overlaps the range; it
  // stays 0 when no such level overlaps.
  int max_level_with_files = 0;
  {
    InstrumentedMutexLock l(&mutex_);
    Version* base = cfd->current();
    for (int level = 1; level < base->storage_info()->num_non_empty_levels();
         level++) {
      if (base->storage_info()->OverlapInLevel(level, begin, end)) {
        max_level_with_files = level;
      }
    }
  }

  int final_output_level = 0;
  if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
      cfd->NumberLevels() > 1) {
    // Always compact all files together.
    final_output_level = cfd->NumberLevels() - 1;
    // if bottom most level is reserved
    if (immutable_db_options_.allow_ingest_behind) {
      final_output_level--;
    }
    s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
                            final_output_level, options.target_path_id,
                            options.max_subcompactions, begin, end, exclusive);
  } else {
    for (int level = 0; level <= max_level_with_files; level++) {
      int output_level;
      // in case the compaction is universal or if we're compacting the
      // bottom-most level, the output level will be the same as input one.
      // level 0 can never be the bottommost level (i.e. if all files are in
      // level 0, we will compact to level 1)
      if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
          cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
        output_level = level;
      } else if (level == max_level_with_files && level > 0) {
        if (options.bottommost_level_compaction ==
            BottommostLevelCompaction::kSkip) {
          // Skip bottommost level compaction
          continue;
        } else if (options.bottommost_level_compaction ==
                       BottommostLevelCompaction::kIfHaveCompactionFilter &&
                   cfd->ioptions()->compaction_filter == nullptr &&
                   cfd->ioptions()->compaction_filter_factory == nullptr) {
          // Skip bottommost level compaction since we don't have a compaction
          // filter
          continue;
        }
        output_level = level;
      } else {
        output_level = level + 1;
        if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
            cfd->ioptions()->level_compaction_dynamic_level_bytes &&
            level == 0) {
          output_level = ColumnFamilyData::kCompactToBaseLevel;
        }
      }
      s = RunManualCompaction(cfd, level, output_level, options.target_path_id,
                              options.max_subcompactions, begin, end, exclusive);
      if (!s.ok()) {
        break;
      }
      // Track the deepest level written so far; it is the source level for
      // the optional refit below.
      if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
        final_output_level = cfd->NumberLevels() - 1;
      } else if (output_level > final_output_level) {
        final_output_level = output_level;
      }
      TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
      TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
    }
  }
  if (!s.ok()) {
    LogFlush(immutable_db_options_.info_log);
    return s;
  }

  if (options.change_level) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[RefitLevel] waiting for background threads to stop");
    s = PauseBackgroundWork();
    if (s.ok()) {
      s = ReFitLevel(cfd, final_output_level, options.target_level);
    }
    ContinueBackgroundWork();
  }
  LogFlush(immutable_db_options_.info_log);

  {
    InstrumentedMutexLock l(&mutex_);
    // an automatic compaction that has been scheduled might have been
    // preempted by the manual compactions. Need to schedule it back.
    MaybeScheduleFlushOrCompaction();
  }

  return s;
}

483 484 485 486 487
// Compacts the named input files of `column_family` into `output_level`.
//
// compact_options: options forwarded to CompactFilesImpl().
// column_family: handle of the column family; must be non-null.
// input_file_names: names of the table files to compact.
// output_level: level that receives the compaction output.
// output_path_id: forwarded to CompactFilesImpl().
// output_file_names: optional out-parameter forwarded to CompactFilesImpl().
//
// Returns NotSupported in ROCKSDB_LITE builds, InvalidArgument for a null
// handle, otherwise the status of CompactFilesImpl(). Obsolete files are
// collected and purged before returning, including after a failure.
Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
                            ColumnFamilyHandle* column_family,
                            const std::vector<std::string>& input_file_names,
                            const int output_level, const int output_path_id,
                            std::vector<std::string>* const output_file_names) {
#ifdef ROCKSDB_LITE
  (void)compact_options;
  (void)column_family;
  (void)input_file_names;
  (void)output_level;
  (void)output_path_id;
  (void)output_file_names;
  // not supported in lite version
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  if (column_family == nullptr) {
    return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
  }

  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  assert(cfd);

  Status s;
  JobContext job_context(0, true);
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());

  // Perform CompactFiles
  SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
  {
    InstrumentedMutexLock l(&mutex_);

    // This call will unlock/lock the mutex to wait for current running
    // IngestExternalFile() calls to finish.
    WaitForIngestFile();

    s = CompactFilesImpl(compact_options, cfd, sv->current, input_file_names,
                         output_file_names, output_level, output_path_id,
                         &job_context, &log_buffer);
  }
  // Drop the reference taken above; when Unref() reports true, clean up
  // under the mutex and delete the super version.
  if (sv->Unref()) {
    mutex_.Lock();
    sv->Cleanup();
    mutex_.Unlock();
    delete sv;
  }

  // Find and delete obsolete files
  {
    InstrumentedMutexLock l(&mutex_);
    // If !s.ok(), this means that Compaction failed. In that case, we want
    // to delete all obsolete files we might have created and we force
    // FindObsoleteFiles(). This is because job_context does not
    // catch all created files if compaction failed.
    FindObsoleteFiles(&job_context, !s.ok());
  }  // release the mutex

  // delete unnecessary files if any, this is done outside the mutex
  if (job_context.HaveSomethingToClean() ||
      job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
    // Have to flush the info logs before bg_compaction_scheduled_--
    // because if bg_flush_scheduled_ becomes 0 and the lock is
    // released, the destructor of DB can kick in and destroy all the
    // states of DB so info_log might not be available after that point.
    // It also applies to access other states that DB owns.
    log_buffer.FlushBufferToLog();
    if (job_context.HaveSomethingToDelete()) {
      // no mutex is locked here.  No need to Unlock() and Lock() here.
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
  }

  return s;
#endif  // ROCKSDB_LITE
}

#ifndef ROCKSDB_LITE
// Does the actual work of CompactFiles(): validates the inputs, builds a
// Compaction, runs it on the calling thread and installs the result.
//
// Must be called with mutex_ held; the mutex is released while the
// compaction job runs and re-acquired before the result is installed.
//
// compact_options: options forwarded to the compaction picker.
// cfd: column family whose files are compacted.
// version: version the input files are resolved against.
// input_file_names: table file names to compact.
// output_file_names: optional; receives the paths of the new files.
// output_level: level that receives the output.
// output_path_id: target path; a negative value selects path 0 when the
//     column family has exactly one path and is NotSupported otherwise.
// job_context: supplies the job id and the superversion context.
// log_buffer: buffered logger passed to the job and the room check.
//
// Returns ShutdownInProgress, NotSupported, Aborted (inputs already being
// compacted), CompactionTooLarge (not enough room), a picker validation
// error, or the status of CompactionJob::Install().
Status DBImpl::CompactFilesImpl(
    const CompactionOptions& compact_options, ColumnFamilyData* cfd,
    Version* version, const std::vector<std::string>& input_file_names,
    std::vector<std::string>* const output_file_names, const int output_level,
    int output_path_id, JobContext* job_context, LogBuffer* log_buffer) {
  mutex_.AssertHeld();

  if (shutting_down_.load(std::memory_order_acquire)) {
    return Status::ShutdownInProgress();
  }

  // Translate file names into file numbers (by reference: no string copies).
  std::unordered_set<uint64_t> input_set;
  for (const auto& file_name : input_file_names) {
    input_set.insert(TableFileNameToNumber(file_name));
  }

  ColumnFamilyMetaData cf_meta;
  // TODO(yhchiang): can directly use version here if none of the
  // following functions call is pluggable to external developers.
  version->GetColumnFamilyMetaData(&cf_meta);

  if (output_path_id < 0) {
    if (cfd->ioptions()->cf_paths.size() == 1U) {
      output_path_id = 0;
    } else {
      return Status::NotSupported(
          "Automatic output path selection is not "
          "yet supported in CompactFiles()");
    }
  }

  Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
      &input_set, cf_meta, output_level);
  if (!s.ok()) {
    return s;
  }

  std::vector<CompactionInputFiles> input_files;
  s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
      &input_files, &input_set, version->storage_info(), compact_options);
  if (!s.ok()) {
    return s;
  }

  for (const auto& inputs : input_files) {
    if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) {
      return Status::Aborted(
          "Some of the necessary compaction input "
          "files are already being compacted");
    }
  }
  bool sfm_reserved_compact_space = false;
  // First check if we have enough room to do the compaction
  bool enough_room = EnoughRoomForCompaction(
      input_files, &sfm_reserved_compact_space, log_buffer);

  if (!enough_room) {
    // The scheduled-compaction counter has not been bumped and no Compaction
    // has been created yet, so we can return directly.
    return Status::CompactionTooLarge();
  }

  // At this point, CompactFiles will be run.
  bg_compaction_scheduled_++;

  unique_ptr<Compaction> c;
  assert(cfd->compaction_picker());
  c.reset(cfd->compaction_picker()->CompactFiles(
      compact_options, input_files, output_level, version->storage_info(),
      *cfd->GetLatestMutableCFOptions(), output_path_id));
  // we already sanitized the set of input files and checked for conflicts
  // without releasing the lock, so we're guaranteed a compaction can be formed.
  assert(c != nullptr);

  c->SetInputVersion(version);
  // deletion compaction currently not allowed in CompactFiles.
  assert(!c->deletion_compaction());

  SequenceNumber earliest_write_conflict_snapshot;
  std::vector<SequenceNumber> snapshot_seqs =
      snapshots_.GetAll(&earliest_write_conflict_snapshot);

  auto pending_outputs_inserted_elem =
      CaptureCurrentFileNumberInPendingOutputs();

  // With custom GC enabled and no checker installed, fall back to the
  // DisableGCSnapshotChecker singleton.
  auto snapshot_checker = snapshot_checker_.get();
  if (use_custom_gc_ && snapshot_checker == nullptr) {
    snapshot_checker = DisableGCSnapshotChecker::Instance();
  }
  assert(is_snapshot_supported_ || snapshots_.empty());
  CompactionJob compaction_job(
      job_context->job_id, c.get(), immutable_db_options_,
      env_options_for_compaction_, versions_.get(), &shutting_down_,
      preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
      GetDataDir(c->column_family_data(), c->output_path_id()), stats_, &mutex_,
      &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
      snapshot_checker, table_cache_, &event_logger_,
      c->mutable_cf_options()->paranoid_file_checks,
      c->mutable_cf_options()->report_bg_io_stats, dbname_,
      nullptr);  // Here we pass a nullptr for CompactionJobStats because
                 // CompactFiles does not trigger OnCompactionCompleted(),
                 // which is the only place where CompactionJobStats is
                 // returned.  The idea of not triggering
                 // OnCompactionCompleted() is that CompactFiles runs in the
                 // caller thread, so the user should always know when it
                 // completes.  As a result, it makes less sense to notify the
                 // users something they should already know.
                 //
                 // In the future, if we would like to add CompactionJobStats
                 // support for CompactFiles, we should have CompactFiles API
                 // pass a pointer of CompactionJobStats as the out-value
                 // instead of using EventListener.

  // Creating a compaction influences the compaction score because the score
  // takes running compactions into account (by skipping files that are already
  // being compacted). Since we just changed compaction score, we recalculate it
  // here.
  version->storage_info()->ComputeCompactionScore(*cfd->ioptions(),
                                                  *c->mutable_cf_options());

  compaction_job.Prepare();

  mutex_.Unlock();
  TEST_SYNC_POINT("CompactFilesImpl:0");
  TEST_SYNC_POINT("CompactFilesImpl:1");
  compaction_job.Run();
  TEST_SYNC_POINT("CompactFilesImpl:2");
  TEST_SYNC_POINT("CompactFilesImpl:3");
  mutex_.Lock();

  Status status = compaction_job.Install(*c->mutable_cf_options());
  if (status.ok()) {
    InstallSuperVersionAndScheduleWork(
        c->column_family_data(), &job_context->superversion_context,
        *c->mutable_cf_options(), FlushReason::kManualCompaction);
  }
  // NOTE(review): `s` is always OK here (every earlier failure returned), so
  // the install result in `status` is not what gets passed on. Confirm
  // whether `status` was intended.
  c->ReleaseCompactionFiles(s);
#ifndef ROCKSDB_LITE
  // Need to make sure SstFileManager does its bookkeeping
  auto sfm = static_cast<SstFileManagerImpl*>(
      immutable_db_options_.sst_file_manager.get());
  if (sfm && sfm_reserved_compact_space) {
    sfm->OnCompactionCompletion(c.get());
  }
#endif  // ROCKSDB_LITE

  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

  if (status.ok()) {
    // Done
  } else if (status.IsShutdownInProgress()) {
    // Ignore compaction errors found during shutting down
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "[%s] [JOB %d] Compaction error: %s",
                   c->column_family_data()->GetName().c_str(),
                   job_context->job_id, status.ToString().c_str());
    error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
  }

  if (output_file_names != nullptr) {
    // Iterate by reference to avoid copying each new-file entry.
    for (const auto& newf : c->edit()->GetNewFiles()) {
      (*output_file_names)
          .push_back(TableFileName(c->immutable_cf_options()->cf_paths,
                                   newf.second.fd.GetNumber(),
                                   newf.second.fd.GetPathId()));
    }
  }

  c.reset();

  bg_compaction_scheduled_--;
  if (bg_compaction_scheduled_ == 0) {
    bg_cv_.SignalAll();
  }
  TEST_SYNC_POINT("CompactFilesImpl:End");

  return status;
}
#endif  // ROCKSDB_LITE

// Pauses background work. Under mutex_, bumps the compaction-pause counter,
// waits on bg_cv_ until no bottom-priority compaction, compaction or flush
// job is scheduled, then bumps the work-pause counter. Always returns OK.
Status DBImpl::PauseBackgroundWork() {
  InstrumentedMutexLock lock_holder(&mutex_);
  ++bg_compaction_paused_;
  for (;;) {
    const bool jobs_outstanding = bg_bottom_compaction_scheduled_ > 0 ||
                                  bg_compaction_scheduled_ > 0 ||
                                  bg_flush_scheduled_ > 0;
    if (!jobs_outstanding) {
      break;
    }
    bg_cv_.Wait();
  }
  ++bg_work_paused_;
  return Status::OK();
}

// Undoes one PauseBackgroundWork() call. Returns InvalidArgument when no
// pause is outstanding; otherwise decrements both pause counters and, once
// the work-pause counter drops to zero, tries to schedule background jobs.
Status DBImpl::ContinueBackgroundWork() {
  InstrumentedMutexLock lock_holder(&mutex_);
  if (bg_work_paused_ == 0) {
    return Status::InvalidArgument();
  }
  assert(bg_work_paused_ > 0);
  assert(bg_compaction_paused_ > 0);
  --bg_compaction_paused_;
  --bg_work_paused_;
  // Looking at bg_work_paused_ alone is enough because it is never greater
  // than bg_compaction_paused_.
  const bool fully_resumed = (bg_work_paused_ == 0);
  if (fully_resumed) {
    MaybeScheduleFlushOrCompaction();
  }
  return Status::OK();
}

void DBImpl::NotifyOnCompactionCompleted(
771 772
    ColumnFamilyData* cfd, Compaction* c, const Status& st,
    const CompactionJobStats& compaction_job_stats, const int job_id) {
S
Siying Dong 已提交
773 774 775 776 777 778 779 780
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
781 782
  Version* current = cfd->current();
  current->Ref();
S
Siying Dong 已提交
783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799
  // release lock while notifying events
  mutex_.Unlock();
  TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
  {
    CompactionJobInfo info;
    info.cf_name = cfd->GetName();
    info.status = st;
    info.thread_id = env_->GetThreadID();
    info.job_id = job_id;
    info.base_input_level = c->start_level();
    info.output_level = c->output_level();
    info.stats = compaction_job_stats;
    info.table_properties = c->GetOutputTableProperties();
    info.compaction_reason = c->compaction_reason();
    info.compression = c->output_compression();
    for (size_t i = 0; i < c->num_input_levels(); ++i) {
      for (const auto fmd : *c->inputs(i)) {
800
        auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
S
Siying Dong 已提交
801 802 803 804
                                fmd->fd.GetNumber(), fmd->fd.GetPathId());
        info.input_files.push_back(fn);
        if (info.table_properties.count(fn) == 0) {
          std::shared_ptr<const TableProperties> tp;
805
          auto s = current->GetTableProperties(&tp, fmd, &fn);
S
Siying Dong 已提交
806 807 808 809 810 811 812
          if (s.ok()) {
            info.table_properties[fn] = tp;
          }
        }
      }
    }
    for (const auto newf : c->edit()->GetNewFiles()) {
813
      info.output_files.push_back(TableFileName(
814 815
          c->immutable_cf_options()->cf_paths, newf.second.fd.GetNumber(),
          newf.second.fd.GetPathId()));
S
Siying Dong 已提交
816 817 818 819 820 821
    }
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnCompactionCompleted(this, info);
    }
  }
  mutex_.Lock();
822
  current->Unref();
S
Siying Dong 已提交
823 824
  // no need to signal bg_cv_ as it will be signaled at the end of the
  // flush process.
825 826 827 828 829 830
#else
  (void)cfd;
  (void)c;
  (void)st;
  (void)compaction_job_stats;
  (void)job_id;
S
Siying Dong 已提交
831 832 833 834 835 836 837 838 839 840 841
#endif  // ROCKSDB_LITE
}

// REQUIREMENT: block all background work by calling PauseBackgroundWork()
// before calling this function
//
// Moves every file of `level` in `cfd` to another level by applying a single
// VersionEdit (delete from `level`, add to the destination).
//
//   level         source level; must be < cfd->NumberLevels()
//   target_level  destination level; when negative, the destination is
//                 chosen by FindMinimumEmptyLevelFitting()
//
// Returns:
//   InvalidArgument  target_level >= number of levels
//   NotSupported     another thread is already refitting, the move would go
//                    from level 0 to a deeper level, or a level between the
//                    source and a deeper destination is not empty
//   otherwise        the status of LogAndApply (OK when nothing had to move)
//
// Only one refit may run at a time; this is tracked by refitting_level_,
// which is cleared again on every path that set it.
Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
  assert(level < cfd->NumberLevels());
  if (target_level >= cfd->NumberLevels()) {
    return Status::InvalidArgument("Target level exceeds number of levels");
  }

  SuperVersionContext sv_context(/* create_superversion */ true);

  Status status;

  InstrumentedMutexLock guard_lock(&mutex_);

  // only allow one thread refitting
  if (refitting_level_) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[ReFitLevel] another thread is refitting");
    return Status::NotSupported("another thread is refitting");
  }
  refitting_level_ = true;

  const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
  // move to a smaller level
  int to_level = target_level;
  if (target_level < 0) {
    to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
  }

  auto* vstorage = cfd->current()->storage_info();
  if (to_level > level) {
    if (level == 0) {
      // Release the single-refitter flag before bailing out; otherwise every
      // later ReFitLevel() call would be rejected as "another thread is
      // refitting".
      refitting_level_ = false;
      return Status::NotSupported(
          "Cannot change from level 0 to other levels.");
    }
    // Check levels are empty for a trivial move
    for (int l = level + 1; l <= to_level; l++) {
      if (vstorage->NumLevelFiles(l) > 0) {
        refitting_level_ = false;
        return Status::NotSupported(
            "Levels between source and target are not empty for a move.");
      }
    }
  }
  if (to_level != level) {
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] Before refitting:\n%s", cfd->GetName().c_str(),
                    cfd->current()->DebugString().data());

    // One edit that removes each file from `level` and re-adds it, with the
    // same metadata, at `to_level`.
    VersionEdit edit;
    edit.SetColumnFamily(cfd->GetID());
    for (const auto& f : vstorage->LevelFiles(level)) {
      edit.DeleteFile(level, f->fd.GetNumber());
      edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
                   f->fd.GetFileSize(), f->smallest, f->largest,
                   f->smallest_seqno, f->largest_seqno,
                   f->marked_for_compaction);
    }
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
                    edit.DebugString().data());

    status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
                                    directories_.GetDbDir());
    InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);

    ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
                    cfd->GetName().c_str(), status.ToString().data());

    if (status.ok()) {
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                      "[%s] After refitting:\n%s", cfd->GetName().c_str(),
                      cfd->current()->DebugString().data());
    }
  }

  sv_context.Clean();
  refitting_level_ = false;

  return status;
}

// Returns the number of levels reported by the column family data behind
// `column_family`.
int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
  auto* const handle_impl =
      reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto* const family_data = handle_impl->cfd();
  return family_data->NumberLevels();
}

A
Andrew Kryczka 已提交
920
// The column family argument is ignored; the result is always 0.
int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* /*column_family*/) {
  const int fixed_level = 0;
  return fixed_level;
}

// Returns level0_stop_writes_trigger from the mutable options stored in the
// column family's super version. The read happens with mutex_ held.
int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
  auto* const handle_impl =
      reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  InstrumentedMutexLock lock_holder(&mutex_);
  auto* const super_version = handle_impl->cfd()->GetSuperVersion();
  const auto& cf_options = super_version->mutable_cf_options;
  return cf_options.level0_stop_writes_trigger;
}

// Manual flush entry point: logs the start, delegates to FlushMemTable()
// with FlushReason::kManualFlush, logs the resulting status and returns it.
Status DBImpl::Flush(const FlushOptions& flush_options,
                     ColumnFamilyHandle* column_family) {
  auto* const handle_impl =
      reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
                 handle_impl->GetName().c_str());

  Status flush_status = FlushMemTable(handle_impl->cfd(), flush_options,
                                      FlushReason::kManualFlush);

  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "[%s] Manual flush finished, status: %s\n",
                 handle_impl->GetName().c_str(),
                 flush_status.ToString().c_str());
  return flush_status;
}

// Runs a manual compaction on `cfd` and blocks until it is done; returns
// manual.status.
//
//   input_level / output_level / output_path_id  forwarded to CompactRange()
//   max_subcompactions                           forwarded to CompactRange()
//   begin, end             user-key bounds; nullptr means unbounded. Both are
//                          ignored for universal and FIFO compaction styles.
//   exclusive              when true, first wait until no compaction job is
//                          scheduled
//   disallow_trivial_move  recorded in the manual compaction state
Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
                                   int output_level, uint32_t output_path_id,
                                   uint32_t max_subcompactions,
                                   const Slice* begin, const Slice* end,
                                   bool exclusive, bool disallow_trivial_move) {
  assert(input_level == ColumnFamilyData::kCompactAllLevels ||
         input_level >= 0);

  InternalKey begin_storage, end_storage;

  bool scheduled = false;
  bool manual_conflict = false;

  ManualCompactionState manual;
  manual.cfd = cfd;
  manual.input_level = input_level;
  manual.output_level = output_level;
  manual.output_path_id = output_path_id;
  manual.done = false;
  manual.in_progress = false;
  manual.incomplete = false;
  manual.exclusive = exclusive;
  manual.disallow_trivial_move = disallow_trivial_move;

  // For universal compaction, we enforce every manual compaction to compact
  // all files (the same is applied to FIFO), so the bounds are dropped.
  const auto compaction_style = cfd->ioptions()->compaction_style;
  const bool bounds_ignored = compaction_style == kCompactionStyleUniversal ||
                              compaction_style == kCompactionStyleFIFO;
  if (begin != nullptr && !bounds_ignored) {
    begin_storage.SetMinPossibleForUserKey(*begin);
    manual.begin = &begin_storage;
  } else {
    manual.begin = nullptr;
  }
  if (end != nullptr && !bounds_ignored) {
    end_storage.SetMaxPossibleForUserKey(*end);
    manual.end = &end_storage;
  } else {
    manual.end = nullptr;
  }

  TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
  TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
  InstrumentedMutexLock l(&mutex_);

  // When a manual compaction arrives, temporarily disable scheduling of
  // non-manual compactions and wait until the number of scheduled compaction
  // jobs drops to zero. This is needed to ensure that this manual compaction
  // can compact any range of keys/files.
  //
  // HasPendingManualCompaction() is true when at least one thread is inside
  // RunManualCompaction(), i.e. during that time no other compaction will
  // get scheduled (see MaybeScheduleFlushOrCompaction).
  //
  // Note that the following loop doesn't stop more than one thread calling
  // RunManualCompaction() from getting to the second while loop below.
  // However, only one of them will actually schedule compaction, while
  // others will wait on a condition variable until it completes.

  AddManualCompaction(&manual);
  TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
  if (exclusive) {
    while (bg_bottom_compaction_scheduled_ > 0 ||
           bg_compaction_scheduled_ > 0) {
      TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
      ROCKS_LOG_INFO(
          immutable_db_options_.info_log,
          "[%s] Manual compaction waiting for all other scheduled background "
          "compactions to finish",
          cfd->GetName().c_str());
      bg_cv_.Wait();
    }
  }

  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "[%s] Manual compaction starting", cfd->GetName().c_str());

  // We don't check bg_error_ here, because if we get the error in compaction,
  // the compaction will set manual.status to bg_error_ and set manual.done to
  // true.
  while (!manual.done) {
    assert(HasPendingManualCompaction());
    manual_conflict = false;
    Compaction* compaction = nullptr;

    // Decide whether this iteration has to wait. A compaction is only picked
    // when nothing else forces a wait; a failed pick that reported a conflict
    // also means waiting.
    bool must_wait = ShouldntRunManualCompaction(&manual) ||
                     manual.in_progress || scheduled;
    if (!must_wait) {
      manual.manual_end = &manual.tmp_storage1;
      compaction = manual.cfd->CompactRange(
          *manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
          manual.output_level, manual.output_path_id, max_subcompactions,
          manual.begin, manual.end, &manual.manual_end, &manual_conflict);
      must_wait = (compaction == nullptr) && manual_conflict;
    }

    if (must_wait) {
      // exclusive manual compactions should not see a conflict during
      // CompactRange
      assert(!exclusive || !manual_conflict);
      // Running either this or some other manual compaction
      bg_cv_.Wait();
      if (scheduled && manual.incomplete == true) {
        assert(!manual.in_progress);
        scheduled = false;
        manual.incomplete = false;
      }
      continue;
    }

    // Reaching this point implies `scheduled` is false, since a true value
    // forces the wait branch above.
    if (compaction == nullptr) {
      // Nothing to compact and no conflict reported: the work is finished.
      manual.done = true;
      bg_cv_.SignalAll();
      continue;
    }

    // Hand the picked compaction to the low-priority pool. The argument
    // objects are heap-allocated and passed to the scheduled callbacks.
    CompactionArg* ca = new CompactionArg;
    ca->db = this;
    ca->prepicked_compaction = new PrepickedCompaction;
    ca->prepicked_compaction->manual_compaction_state = &manual;
    ca->prepicked_compaction->compaction = compaction;
    manual.incomplete = false;
    bg_compaction_scheduled_++;
    env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
                   &DBImpl::UnscheduleCallback);
    scheduled = true;
  }

  assert(!manual.in_progress);
  assert(HasPendingManualCompaction());
  RemoveManualCompaction(&manual);
  bg_cv_.SignalAll();
  return manual.status;
}

// Switches the active memtable of `cfd`, requests and schedules a flush,
// and, when flush_options.wait is set and the switch succeeded, waits for
// the flush via WaitForFlushMemTable().
//
//   flush_reason    recorded when the column family is queued for flush
//   writes_stopped  when false, this function enters/exits the write thread
//                   in unbatched mode around the memtable switch itself
//
// Returns OK immediately if there is nothing to flush.
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                             const FlushOptions& flush_options,
                             FlushReason flush_reason, bool writes_stopped) {
  Status s;
  uint64_t flush_memtable_id = 0;
  {
    WriteContext context;
    InstrumentedMutexLock guard_lock(&mutex_);

    const bool has_work = cfd->imm()->NumNotFlushed() != 0 ||
                          !cfd->mem()->IsEmpty() ||
                          !cached_recoverable_state_empty_.load();
    if (!has_work) {
      // Nothing to flush
      return Status::OK();
    }

    WriteThread::Writer w;
    const bool manage_write_thread = !writes_stopped;
    if (manage_write_thread) {
      write_thread_.EnterUnbatched(&w, &mutex_);
    }

    // SwitchMemtable() will release and reacquire mutex during execution
    s = SwitchMemtable(cfd, &context);
    flush_memtable_id = cfd->imm()->GetLatestMemTableID();

    if (manage_write_thread) {
      write_thread_.ExitUnbatched(&w);
    }

    cfd->imm()->FlushRequested();

    // schedule flush
    SchedulePendingFlush(cfd, flush_reason);
    MaybeScheduleFlushOrCompaction();
  }

  const bool should_wait = s.ok() && flush_options.wait;
  if (should_wait) {
    // Wait until the flush completes
    s = WaitForFlushMemTable(cfd, &flush_memtable_id);
  }
  TEST_SYNC_POINT("FlushMemTableFinished");
  return s;
}

1118 1119
// Blocks on bg_cv_ (with mutex_ held) while `cfd` still has unflushed
// immutable memtables, the DB is not stopped, and -- when flush_memtable_id
// is given -- the earliest immutable memtable id is still <= that id.
//
// Returns ShutdownInProgress if shutdown is observed while waiting,
// InvalidArgument if the column family is dropped, the background error if
// the DB is stopped, and OK otherwise.
Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd,
                                    const uint64_t* flush_memtable_id) {
  Status result;
  // Wait until the flush completes
  InstrumentedMutexLock lock_holder(&mutex_);

  auto flush_still_pending = [this, cfd, flush_memtable_id]() {
    if (cfd->imm()->NumNotFlushed() <= 0) {
      return false;
    }
    if (error_handler_.IsDBStopped()) {
      return false;
    }
    return flush_memtable_id == nullptr ||
           cfd->imm()->GetEarliestMemTableID() <= *flush_memtable_id;
  };

  while (flush_still_pending()) {
    if (shutting_down_.load(std::memory_order_acquire)) {
      return Status::ShutdownInProgress();
    }
    if (cfd->IsDropped()) {
      // FlushJob cannot flush a dropped CF, if we did not break here
      // we will loop forever since cfd->imm()->NumNotFlushed() will never
      // drop to zero
      return Status::InvalidArgument("Cannot flush a dropped CF");
    }
    bg_cv_.Wait();
  }

  if (error_handler_.IsDBStopped()) {
    result = error_handler_.GetBGError();
  }
  return result;
}

// Sets "disable_auto_compactions" to "false" on every given column family.
// All handles are processed even if one fails; the returned status is that
// of the last failing SetOptions() call, or OK if none failed.
Status DBImpl::EnableAutoCompaction(
    const std::vector<ColumnFamilyHandle*>& column_family_handles) {
  Status last_failure;
  for (ColumnFamilyHandle* handle : column_family_handles) {
    Status set_result =
        this->SetOptions(handle, {{"disable_auto_compactions", "false"}});
    if (set_result.ok()) {
      continue;
    }
    last_failure = set_result;
  }
  return last_failure;
}

void DBImpl::MaybeScheduleFlushOrCompaction() {
  mutex_.AssertHeld();
  if (!opened_successfully_) {
    // Compaction may introduce data race to DB open
    return;
  }
  if (bg_work_paused_ > 0) {
    // we paused the background work
    return;
  } else if (shutting_down_.load(std::memory_order_acquire)) {
    // DB is being deleted; no more background compactions
    return;
  }
1170
  auto bg_job_limits = GetBGJobLimits();
1171
  bool is_flush_pool_empty =
1172
      env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
1173
  while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
1174
         bg_flush_scheduled_ < bg_job_limits.max_flushes) {
S
Siying Dong 已提交
1175 1176 1177 1178 1179
    unscheduled_flushes_--;
    bg_flush_scheduled_++;
    env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
  }

1180 1181 1182
  // special case -- if high-pri (flush) thread pool is empty, then schedule
  // flushes in low-pri (compaction) thread pool.
  if (is_flush_pool_empty) {
S
Siying Dong 已提交
1183 1184
    while (unscheduled_flushes_ > 0 &&
           bg_flush_scheduled_ + bg_compaction_scheduled_ <
1185
               bg_job_limits.max_flushes) {
S
Siying Dong 已提交
1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199
      unscheduled_flushes_--;
      bg_flush_scheduled_++;
      env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this);
    }
  }

  if (bg_compaction_paused_ > 0) {
    // we paused the background compaction
    return;
  }

  if (HasExclusiveManualCompaction()) {
    // only manual compactions are allowed to run. don't schedule automatic
    // compactions
1200
    TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Conflict");
S
Siying Dong 已提交
1201 1202 1203
    return;
  }

1204
  while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
S
Siying Dong 已提交
1205 1206 1207
         unscheduled_compactions_ > 0) {
    CompactionArg* ca = new CompactionArg;
    ca->db = this;
1208
    ca->prepicked_compaction = nullptr;
S
Siying Dong 已提交
1209 1210 1211 1212 1213 1214 1215
    bg_compaction_scheduled_++;
    unscheduled_compactions_--;
    env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
                   &DBImpl::UnscheduleCallback);
  }
}

1216
// Computes the current flush/compaction job limits from the DB options and
// the write controller's speed-up hint. Requires mutex_ to be held.
DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
  mutex_.AssertHeld();
  const auto configured_flushes = immutable_db_options_.max_background_flushes;
  const auto configured_compactions =
      mutable_db_options_.max_background_compactions;
  const auto configured_jobs = mutable_db_options_.max_background_jobs;
  const bool speed_up = write_controller_.NeedSpeedupCompaction();
  return GetBGJobLimits(configured_flushes, configured_compactions,
                        configured_jobs, speed_up);
}

// Derives the flush and compaction limits.
//
// When both max_background_flushes and max_background_compactions are -1,
// a quarter of max_background_jobs (at least 1) goes to flushes and the
// remainder (at least 1) to compactions. Otherwise the two explicit values
// are used, each clamped to at least 1. When parallelize_compactions is
// false the compaction limit is forced to 1.
DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
                                           int max_background_compactions,
                                           int max_background_jobs,
                                           bool parallelize_compactions) {
  const bool derive_from_jobs =
      (max_background_flushes == -1) && (max_background_compactions == -1);

  BGJobLimits limits;
  // for our first stab implementing max_background_jobs, simply allocate a
  // quarter of the threads to flushes; the explicit values are compatibility
  // code in case users haven't migrated to max_background_jobs.
  limits.max_flushes = std::max(
      1, derive_from_jobs ? max_background_jobs / 4 : max_background_flushes);
  limits.max_compactions =
      std::max(1, derive_from_jobs ? max_background_jobs - limits.max_flushes
                                   : max_background_compactions);

  if (!parallelize_compactions) {
    // throttle background compactions until we deem necessary
    limits.max_compactions = 1;
  }
  return limits;
}

// Appends `cfd` to the compaction queue. The column family must not already
// be queued; a reference is taken on it and its queued flag is set.
void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
  assert(!cfd->queued_for_compaction());
  // The queue holds its own reference on the column family.
  cfd->Ref();
  compaction_queue_.emplace_back(cfd);
  cfd->set_queued_for_compaction(true);
}

// Removes and returns the head of the compaction queue (which must not be
// empty) and clears its queued flag. The reference taken when the entry was
// queued is not released here.
ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
  assert(!compaction_queue_.empty());
  ColumnFamilyData* const head = compaction_queue_.front();
  compaction_queue_.pop_front();
  assert(head->queued_for_compaction());
  head->set_queued_for_compaction(false);
  return head;
}

1263
// Appends `cfd` to the flush queue. The column family must not already be
// queued; a reference is taken on it, its queued flag is set and
// `flush_reason` is recorded on it.
void DBImpl::AddToFlushQueue(ColumnFamilyData* cfd, FlushReason flush_reason) {
  assert(!cfd->queued_for_flush());
  // The queue holds its own reference on the column family.
  cfd->Ref();
  flush_queue_.emplace_back(cfd);
  cfd->set_queued_for_flush(true);
  cfd->SetFlushReason(flush_reason);
}

// Removes and returns the head of the flush queue (which must not be empty)
// and clears its queued flag. The reference taken when the entry was queued
// is not released here.
ColumnFamilyData* DBImpl::PopFirstFromFlushQueue() {
  assert(!flush_queue_.empty());
  ColumnFamilyData* const head = flush_queue_.front();
  flush_queue_.pop_front();
  assert(head->queued_for_flush());
  head->set_queued_for_flush(false);
  // TODO: need to unset flush reason?
  return head;
}

1281 1282
// Queues `cfd` for flush and counts one more unscheduled flush, unless it
// is already queued or has no flush pending.
void DBImpl::SchedulePendingFlush(ColumnFamilyData* cfd,
                                  FlushReason flush_reason) {
  if (cfd->queued_for_flush() || !cfd->imm()->IsFlushPending()) {
    return;
  }
  AddToFlushQueue(cfd, flush_reason);
  ++unscheduled_flushes_;
}

// Queues `cfd` for compaction and counts one more unscheduled compaction,
// unless it is already queued or does not need compaction.
void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
  if (cfd->queued_for_compaction() || !cfd->NeedsCompaction()) {
    return;
  }
  AddToCompactionQueue(cfd);
  ++unscheduled_compactions_;
}

1296 1297
void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
                                  FileType type, uint64_t number, int job_id) {
S
Siying Dong 已提交
1298
  mutex_.AssertHeld();
1299
  PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id);
S
Siying Dong 已提交
1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314
  purge_queue_.push_back(std::move(file_info));
}

void DBImpl::BGWorkFlush(void* db) {
  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
  TEST_SYNC_POINT("DBImpl::BGWorkFlush");
  reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush();
  TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
}

void DBImpl::BGWorkCompaction(void* arg) {
  CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
  delete reinterpret_cast<CompactionArg*>(arg);
  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
  TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331
  auto prepicked_compaction =
      static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
  reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction(
      prepicked_compaction, Env::Priority::LOW);
  delete prepicked_compaction;
}

void DBImpl::BGWorkBottomCompaction(void* arg) {
  CompactionArg ca = *(static_cast<CompactionArg*>(arg));
  delete static_cast<CompactionArg*>(arg);
  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
  TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
  auto* prepicked_compaction = ca.prepicked_compaction;
  assert(prepicked_compaction && prepicked_compaction->compaction &&
         !prepicked_compaction->manual_compaction_state);
  ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
  delete prepicked_compaction;
S
Siying Dong 已提交
1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343
}

void DBImpl::BGWorkPurge(void* db) {
  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
  TEST_SYNC_POINT("DBImpl::BGWorkPurge:start");
  reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge();
  TEST_SYNC_POINT("DBImpl::BGWorkPurge:end");
}

void DBImpl::UnscheduleCallback(void* arg) {
  CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
  delete reinterpret_cast<CompactionArg*>(arg);
1344 1345 1346 1347 1348
  if (ca.prepicked_compaction != nullptr) {
    if (ca.prepicked_compaction->compaction != nullptr) {
      delete ca.prepicked_compaction->compaction;
    }
    delete ca.prepicked_compaction;
S
Siying Dong 已提交
1349 1350 1351 1352 1353 1354 1355 1356
  }
  TEST_SYNC_POINT("DBImpl::UnscheduleCallback");
}

// Picks the first flushable column family from the flush queue and flushes
// it via FlushMemTableToOutputFile(). Requires mutex_ to be held.
//
// Returns the background error if background work is stopped,
// ShutdownInProgress if the DB is shutting down, OK if no queued column
// family could be flushed, and otherwise the status of the flush.
// Queue entries that are dropped or have no flush pending are discarded,
// releasing the reference the queue held on them.
Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
                               LogBuffer* log_buffer) {
  mutex_.AssertHeld();

  Status status;
  if (error_handler_.IsBGWorkStopped()) {
    status = error_handler_.GetBGError();
  } else if (shutting_down_.load(std::memory_order_acquire)) {
    status = Status::ShutdownInProgress();
  }

  if (!status.ok()) {
    return status;
  }

  ColumnFamilyData* cfd = nullptr;
  while (cfd == nullptr && !flush_queue_.empty()) {
    // This cfd is already referenced
    auto* const candidate = PopFirstFromFlushQueue();

    const bool flushable =
        !candidate->IsDropped() && candidate->imm()->IsFlushPending();
    if (flushable) {
      // found a flush!
      cfd = candidate;
    } else if (candidate->Unref()) {
      // can't flush this CF, try next one
      delete candidate;
    }
  }

  if (cfd == nullptr) {
    return status;
  }

  const MutableCFOptions mutable_cf_options =
      *cfd->GetLatestMutableCFOptions();
  auto bg_job_limits = GetBGJobLimits();
  ROCKS_LOG_BUFFER(
      log_buffer,
      "Calling FlushMemTableToOutputFile with column "
      "family [%s], flush slots available %d, compaction slots available %d, "
      "flush slots scheduled %d, compaction slots scheduled %d",
      cfd->GetName().c_str(), bg_job_limits.max_flushes,
      bg_job_limits.max_compactions, bg_flush_scheduled_,
      bg_compaction_scheduled_);
  status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
                                     job_context, log_buffer);
  // Drop the reference that the flush queue took on this column family.
  if (cfd->Unref()) {
    delete cfd;
  }
  return status;
}

void DBImpl::BackgroundCallFlush() {
  bool made_progress = false;
  JobContext job_context(next_job_id_.fetch_add(1), true);

  TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");

  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());
  {
    InstrumentedMutexLock l(&mutex_);
A
Andrew Kryczka 已提交
1419
    assert(bg_flush_scheduled_);
S
Siying Dong 已提交
1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431
    num_running_flushes_++;

    auto pending_outputs_inserted_elem =
        CaptureCurrentFileNumberInPendingOutputs();

    Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer);
    if (!s.ok() && !s.IsShutdownInProgress()) {
      // Wait a little bit before retrying background flush in
      // case this is an environmental problem and we do not want to
      // chew up resources for failed flushes for the duration of
      // the problem.
      uint64_t error_cnt =
1432
          default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
S
Siying Dong 已提交
1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450
      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
      mutex_.Unlock();
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Waiting after background flush error: %s"
                      "Accumulated background error counts: %" PRIu64,
                      s.ToString().c_str(), error_cnt);
      log_buffer.FlushBufferToLog();
      LogFlush(immutable_db_options_.info_log);
      env_->SleepForMicroseconds(1000000);
      mutex_.Lock();
    }

    ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

    // If flush failed, we want to delete all temporary files that we might have
    // created. Thus, we force full scan in FindObsoleteFiles()
    FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
    // delete unnecessary files if any, this is done outside the mutex
1451 1452
    if (job_context.HaveSomethingToClean() ||
        job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
S
Siying Dong 已提交
1453
      mutex_.Unlock();
1454
      TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
S
Siying Dong 已提交
1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480
      // Have to flush the info logs before bg_flush_scheduled_--
      // because if bg_flush_scheduled_ becomes 0 and the lock is
      // released, the deconstructor of DB can kick in and destroy all the
      // states of DB so info_log might not be available after that point.
      // It also applies to access other states that DB owns.
      log_buffer.FlushBufferToLog();
      if (job_context.HaveSomethingToDelete()) {
        PurgeObsoleteFiles(job_context);
      }
      job_context.Clean();
      mutex_.Lock();
    }

    assert(num_running_flushes_ > 0);
    num_running_flushes_--;
    bg_flush_scheduled_--;
    // See if there's more work to be done
    MaybeScheduleFlushOrCompaction();
    bg_cv_.SignalAll();
    // IMPORTANT: there should be no code after calling SignalAll. This call may
    // signal the DB destructor that it's OK to proceed with destruction. In
    // that case, all DB variables will be dealloacated and referencing them
    // will cause trouble.
  }
}

1481 1482
// Runs one background compaction attempt and performs the bookkeeping
// around it.
//
// prepicked_compaction is forwarded unchanged to BackgroundCompaction().
// bg_thread_pri selects which scheduled-counter is decremented when the
// attempt is over: bg_compaction_scheduled_ for LOW, otherwise
// bg_bottom_compaction_scheduled_ (asserted to be BOTTOM).
//
// mutex_ is acquired here and temporarily released while sleeping after a
// failure and while flushing logs / purging obsolete files.
void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
                                      Env::Priority bg_thread_pri) {
  bool made_progress = false;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  TEST_SYNC_POINT("BackgroundCallCompaction:0");
  MaybeDumpStats();
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());
  {
    InstrumentedMutexLock lock(&mutex_);

    // Unlocks/relocks the mutex internally while waiting for any running
    // IngestExternalFile() calls to finish.
    WaitForIngestFile();

    ++num_running_compactions_;

    auto pending_output_it = CaptureCurrentFileNumberInPendingOutputs();

    assert((bg_thread_pri == Env::Priority::BOTTOM &&
            bg_bottom_compaction_scheduled_) ||
           (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
    const Status status = BackgroundCompaction(
        &made_progress, &job_context, &log_buffer, prepicked_compaction);
    TEST_SYNC_POINT("BackgroundCallCompaction:1");

    // A shutdown-in-progress status is not treated as a failure.
    const bool compaction_failed =
        !status.ok() && !status.IsShutdownInProgress();
    if (compaction_failed) {
      // Back off for a second before this thread continues: if the failure is
      // environmental we do not want to burn resources on compactions that
      // keep failing for the duration of the problem.
      uint64_t bg_error_count =
          default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
      mutex_.Unlock();
      log_buffer.FlushBufferToLog();
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Waiting after background compaction error: %s, "
                      "Accumulated background error counts: %" PRIu64,
                      status.ToString().c_str(), bg_error_count);
      LogFlush(immutable_db_options_.info_log);
      env_->SleepForMicroseconds(1000000);
      mutex_.Lock();
    }

    ReleaseFileNumberFromPendingOutputs(pending_output_it);

    // After a failed compaction, temporary files may exist that were never
    // recorded in job_context, so request a full scan in that case.
    FindObsoleteFiles(&job_context, compaction_failed);
    TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");

    // Cleanup, file deletion and log flushing are done outside the mutex.
    const bool has_unlocked_work = job_context.HaveSomethingToClean() ||
                                   job_context.HaveSomethingToDelete() ||
                                   !log_buffer.IsEmpty();
    if (has_unlocked_work) {
      mutex_.Unlock();
      // The info log must be flushed before the scheduled-compaction counter
      // is decremented below: once the counters reach 0 and the lock is
      // released, the DB destructor may run and destroy all DB state, so
      // info_log (and any other DB-owned state) might no longer be available.
      log_buffer.FlushBufferToLog();
      if (job_context.HaveSomethingToDelete()) {
        PurgeObsoleteFiles(job_context);
        TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
      }
      job_context.Clean();
      mutex_.Lock();
    }

    assert(num_running_compactions_ > 0);
    --num_running_compactions_;
    if (bg_thread_pri != Env::Priority::LOW) {
      assert(bg_thread_pri == Env::Priority::BOTTOM);
      --bg_bottom_compaction_scheduled_;
    } else {
      --bg_compaction_scheduled_;
    }

    versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();

    // See if there's more work to be done
    MaybeScheduleFlushOrCompaction();

    // Signal waiters when any of the following holds:
    // * made_progress -- need to wake up DelayWrite
    // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wake up ~DBImpl
    // * HasPendingManualCompaction -- need to wake up RunManualCompaction
    // * unscheduled_compactions_ == 0
    // Otherwise the signal is skipped.
    const bool no_compaction_scheduled =
        bg_compaction_scheduled_ == 0 && bg_bottom_compaction_scheduled_ == 0;
    if (made_progress || no_compaction_scheduled ||
        HasPendingManualCompaction() || unscheduled_compactions_ == 0) {
      bg_cv_.SignalAll();
    }
    // IMPORTANT: there should be no code after calling SignalAll. This call may
    // signal the DB destructor that it's OK to proceed with destruction. In
    // that case, all DB variables will be deallocated and referencing them
    // will cause trouble.
  }
}

// Executes at most one compaction. Must be called with mutex_ held (asserted);
// the mutex is released only while a non-trivial CompactionJob runs.
//
// The compaction to execute is chosen as follows:
//   * if prepicked_compaction carries a Compaction, that one is used (this
//     function takes ownership of it);
//   * if prepicked_compaction carries a manual compaction state but no
//     Compaction, the manual compaction is marked done ("nothing to do");
//   * otherwise, if the compaction queue is non-empty, a column family is
//     popped from it and asked to pick a compaction.
//
// Parameters:
//   made_progress        out; reset to false on entry and set to true when a
//                        compaction was carried out or released here.
//   job_context          supplies the job id and the SuperVersionContext used
//                        when installing a new SuperVersion.
//   log_buffer           receives the buffered log messages.
//   prepicked_compaction may be nullptr; see above.
//
// Returns the resulting status. Status::CompactionTooLarge() is returned when
// EnoughRoomForCompaction() rejected the picked compaction. Any status other
// than OK, CompactionTooLarge or ShutdownInProgress produced after the initial
// checks is also reported to error_handler_.
Status DBImpl::BackgroundCompaction(bool* made_progress,
                                    JobContext* job_context,
                                    LogBuffer* log_buffer,
                                    PrepickedCompaction* prepicked_compaction) {
  ManualCompactionState* manual_compaction =
      prepicked_compaction == nullptr
          ? nullptr
          : prepicked_compaction->manual_compaction_state;
  *made_progress = false;
  mutex_.AssertHeld();
  TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");

  bool is_manual = (manual_compaction != nullptr);
  std::unique_ptr<Compaction> c;
  if (prepicked_compaction != nullptr &&
      prepicked_compaction->compaction != nullptr) {
    c.reset(prepicked_compaction->compaction);
  }
  bool is_prepicked = is_manual || c;

  bool trivial_move_disallowed =
      is_manual && manual_compaction->disallow_trivial_move;

  CompactionJobStats compaction_job_stats;
  Status status;
  if (!error_handler_.IsBGWorkStopped()) {
    if (shutting_down_.load(std::memory_order_acquire)) {
      status = Status::ShutdownInProgress();
    }
  } else {
    status = error_handler_.GetBGError();
  }

  if (!status.ok()) {
    if (is_manual) {
      manual_compaction->status = status;
      manual_compaction->done = true;
      manual_compaction->in_progress = false;
    }
    if (c) {
      // A prepicked compaction is abandoned here. Release its input files
      // before destroying it, exactly like every other path in this function
      // that gives up on a picked compaction.
      c->ReleaseCompactionFiles(status);
      c.reset();
    }
    return status;
  }

  if (is_manual) {
    // another thread cannot pick up the same work
    manual_compaction->in_progress = true;
  }

  // Set by EnoughRoomForCompaction(); when true, the SstFileManager is
  // notified once the compaction completes (see the end of this function).
  bool sfm_reserved_compact_space = false;
  if (is_manual) {
    ManualCompactionState* m = manual_compaction;
    assert(m->in_progress);
    if (!c) {
      m->done = true;
      m->manual_end = nullptr;
      ROCKS_LOG_BUFFER(log_buffer,
                       "[%s] Manual compaction from level-%d from %s .. "
                       "%s; nothing to do\n",
                       m->cfd->GetName().c_str(), m->input_level,
                       (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
                       (m->end ? m->end->DebugString().c_str() : "(end)"));
    } else {
      // First check if we have enough room to do the compaction
      bool enough_room = EnoughRoomForCompaction(
          *(c->inputs()), &sfm_reserved_compact_space, log_buffer);

      if (!enough_room) {
        // Then don't do the compaction
        c->ReleaseCompactionFiles(status);
        c.reset();
        // m's vars will get set properly at the end of this function,
        // as long as status == CompactionTooLarge
        status = Status::CompactionTooLarge();
      } else {
        ROCKS_LOG_BUFFER(
            log_buffer,
            "[%s] Manual compaction from level-%d to level-%d from %s .. "
            "%s; will stop at %s\n",
            m->cfd->GetName().c_str(), m->input_level, c->output_level(),
            (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
            (m->end ? m->end->DebugString().c_str() : "(end)"),
            ((m->done || m->manual_end == nullptr)
                 ? "(end)"
                 : m->manual_end->DebugString().c_str()));
      }
    }
  } else if (!is_prepicked && !compaction_queue_.empty()) {
    if (HasExclusiveManualCompaction()) {
      // Can't compact right now, but try again later
      TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");

      // Stay in the compaction queue.
      unscheduled_compactions_++;

      return Status::OK();
    }

    // cfd is referenced here
    auto cfd = PopFirstFromCompactionQueue();
    // We unreference here because the following code will take a Ref() on
    // this cfd if it is going to use it (Compaction class holds a
    // reference).
    // This will all happen under a mutex so we don't have to be afraid of
    // somebody else deleting it.
    if (cfd->Unref()) {
      delete cfd;
      // This was the last reference of the column family, so no need to
      // compact.
      return Status::OK();
    }

    // Pick up latest mutable CF Options and use it throughout the
    // compaction job
    // Compaction makes a copy of the latest MutableCFOptions. It should be used
    // throughout the compaction procedure to make sure consistency. It will
    // eventually be installed into SuperVersion
    auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
    if (!mutable_cf_options->disable_auto_compactions && !cfd->IsDropped()) {
      // NOTE: try to avoid unnecessary copy of MutableCFOptions if
      // compaction is not necessary. Need to make sure mutex is held
      // until we make a copy in the following code
      TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
      c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
      TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");

      if (c != nullptr) {
        bool enough_room = EnoughRoomForCompaction(
            *(c->inputs()), &sfm_reserved_compact_space, log_buffer);

        if (!enough_room) {
          // Then don't do the compaction
          c->ReleaseCompactionFiles(status);
          // The released files count again, so recompute the score and put
          // the column family back into the queue for a later attempt.
          c->column_family_data()
              ->current()
              ->storage_info()
              ->ComputeCompactionScore(*(c->immutable_cf_options()),
                                       *(c->mutable_cf_options()));
          AddToCompactionQueue(cfd);
          ++unscheduled_compactions_;

          c.reset();
          // Don't need to sleep here, because BackgroundCallCompaction
          // will sleep if !s.ok()
          status = Status::CompactionTooLarge();
        } else {
          // update statistics
          MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
                      c->inputs(0)->size());
          // There are three things that can change compaction score:
          // 1) When flush or compaction finish. This case is covered by
          // InstallSuperVersionAndScheduleWork
          // 2) When MutableCFOptions changes. This case is also covered by
          // InstallSuperVersionAndScheduleWork, because this is when the new
          // options take effect.
          // 3) When we Pick a new compaction, we "remove" those files being
          // compacted from the calculation, which then influences compaction
          // score. Here we check if we need the new compaction even without the
          // files that are currently being compacted. If we need another
          // compaction, we might be able to execute it in parallel, so we add
          // it to the queue and schedule a new thread.
          if (cfd->NeedsCompaction()) {
            // Yes, we need more compactions!
            AddToCompactionQueue(cfd);
            ++unscheduled_compactions_;
            MaybeScheduleFlushOrCompaction();
          }
        }
      }
    }
  }

  if (!c) {
    // Nothing to do
    ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do");
  } else if (c->deletion_compaction()) {
    // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old
    // file if there is alive snapshot pointing to it
    assert(c->num_input_files(1) == 0);
    assert(c->level() == 0);
    assert(c->column_family_data()->ioptions()->compaction_style ==
           kCompactionStyleFIFO);

    compaction_job_stats.num_input_files = c->num_input_files(0);

    for (const auto& f : *c->inputs(0)) {
      c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
    }
    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, directories_.GetDbDir());
    InstallSuperVersionAndScheduleWork(
        c->column_family_data(), &job_context->superversion_context,
        *c->mutable_cf_options(), FlushReason::kAutoCompaction);
    // The file count is compared against size_t indices elsewhere in this
    // function; convert explicitly so the argument matches "%d".
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
                     c->column_family_data()->GetName().c_str(),
                     static_cast<int>(c->num_input_files(0)));
    *made_progress = true;
  } else if (!trivial_move_disallowed && c->IsTrivialMove()) {
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
    // Instrument for event update
    // TODO(yhchiang): add op details for showing trivial-move.
    ThreadStatusUtil::SetColumnFamily(
        c->column_family_data(), c->column_family_data()->ioptions()->env,
        immutable_db_options_.enable_thread_tracking);
    ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);

    compaction_job_stats.num_input_files = c->num_input_files(0);

    // Move files to next level
    int32_t moved_files = 0;
    int64_t moved_bytes = 0;
    for (unsigned int l = 0; l < c->num_input_levels(); l++) {
      if (c->level(l) == c->output_level()) {
        continue;
      }
      for (size_t i = 0; i < c->num_input_files(l); i++) {
        FileMetaData* f = c->input(l, i);
        c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
        c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
                           f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
                           f->largest, f->smallest_seqno, f->largest_seqno,
                           f->marked_for_compaction);

        ROCKS_LOG_BUFFER(
            log_buffer,
            "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
            c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
            c->output_level(), f->fd.GetFileSize());
        ++moved_files;
        moved_bytes += f->fd.GetFileSize();
      }
    }

    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, directories_.GetDbDir());
    // Use latest MutableCFOptions
    InstallSuperVersionAndScheduleWork(
        c->column_family_data(), &job_context->superversion_context,
        *c->mutable_cf_options(), FlushReason::kAutoCompaction);

    VersionStorageInfo::LevelSummaryStorage tmp;
    c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
                                                             moved_bytes);
    {
      event_logger_.LogToBuffer(log_buffer)
          << "job" << job_context->job_id << "event"
          << "trivial_move"
          << "destination_level" << c->output_level() << "files" << moved_files
          << "total_files_size" << moved_bytes;
    }
    // moved_bytes is signed; convert explicitly so the argument matches the
    // unsigned PRIu64 conversion in the format string.
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] Moved #%d files to level-%d %" PRIu64 " bytes %s: %s\n",
        c->column_family_data()->GetName().c_str(), moved_files,
        c->output_level(), static_cast<uint64_t>(moved_bytes),
        status.ToString().c_str(),
        c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
    *made_progress = true;

    // Clear Instrument
    ThreadStatusUtil::ResetThreadStatus();
  } else if (!is_prepicked && c->output_level() > 0 &&
             c->output_level() ==
                 c->column_family_data()
                     ->current()
                     ->storage_info()
                     ->MaxOutputLevel(
                         immutable_db_options_.allow_ingest_behind) &&
             env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
    // Forward compactions involving last level to the bottom pool if it exists,
    // such that compactions unlikely to contribute to write stalls can be
    // delayed or deprioritized.
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
    CompactionArg* ca = new CompactionArg;
    ca->db = this;
    ca->prepicked_compaction = new PrepickedCompaction;
    // Ownership of the compaction moves to the scheduled task; c is empty
    // from here on, so the completion handling below is skipped.
    ca->prepicked_compaction->compaction = c.release();
    ca->prepicked_compaction->manual_compaction_state = nullptr;
    ++bg_bottom_compaction_scheduled_;
    env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
                   this, &DBImpl::UnscheduleCallback);
  } else {
    int output_level __attribute__((__unused__));
    output_level = c->output_level();
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
                             &output_level);
    SequenceNumber earliest_write_conflict_snapshot;
    std::vector<SequenceNumber> snapshot_seqs =
        snapshots_.GetAll(&earliest_write_conflict_snapshot);

    auto snapshot_checker = snapshot_checker_.get();
    if (use_custom_gc_ && snapshot_checker == nullptr) {
      snapshot_checker = DisableGCSnapshotChecker::Instance();
    }
    assert(is_snapshot_supported_ || snapshots_.empty());
    CompactionJob compaction_job(
        job_context->job_id, c.get(), immutable_db_options_,
        env_options_for_compaction_, versions_.get(), &shutting_down_,
        preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
        GetDataDir(c->column_family_data(), c->output_path_id()), stats_,
        &mutex_, &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
        snapshot_checker, table_cache_, &event_logger_,
        c->mutable_cf_options()->paranoid_file_checks,
        c->mutable_cf_options()->report_bg_io_stats, dbname_,
        &compaction_job_stats);
    compaction_job.Prepare();

    // The job itself runs without the DB mutex.
    mutex_.Unlock();
    compaction_job.Run();
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
    mutex_.Lock();

    status = compaction_job.Install(*c->mutable_cf_options());
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(
          c->column_family_data(), &job_context->superversion_context,
          *c->mutable_cf_options(), FlushReason::kAutoCompaction);
    }
    *made_progress = true;
  }
  if (c != nullptr) {
    c->ReleaseCompactionFiles(status);
    *made_progress = true;

#ifndef ROCKSDB_LITE
    // Need to make sure SstFileManager does its bookkeeping
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    if (sfm && sfm_reserved_compact_space) {
      sfm->OnCompactionCompletion(c.get());
    }
#endif  // ROCKSDB_LITE

    NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status,
                                compaction_job_stats, job_context->job_id);
  }
  // this will unref its input_version and column_family_data
  c.reset();

  if (status.ok() || status.IsCompactionTooLarge()) {
    // Done
  } else if (status.IsShutdownInProgress()) {
    // Ignore compaction errors found during shutting down
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
                   status.ToString().c_str());
    error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
  }

  if (is_manual) {
    ManualCompactionState* m = manual_compaction;
    if (!status.ok()) {
      m->status = status;
      m->done = true;
    }
    // For universal compaction:
    //   Because universal compaction always happens at level 0, so one
    //   compaction will pick up all overlapped files. No files will be
    //   filtered out due to size limit and left for a successive compaction.
    //   So we can safely conclude the current compaction.
    //
    //   Also note that, if we don't stop here, then the current compaction
    //   writes a new file back to level 0, which will be used in successive
    //   compaction. Hence the manual compaction will never finish.
    //
    // Stop the compaction if manual_end points to nullptr -- this means
    // that we compacted the whole range. manual_end should always point
    // to nullptr in case of universal compaction
    if (m->manual_end == nullptr) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range.  Update *m
      // to the range that is left to be compacted.
      // Universal and FIFO compactions should always compact the whole range
      assert(m->cfd->ioptions()->compaction_style !=
                 kCompactionStyleUniversal ||
             m->cfd->ioptions()->num_levels > 1);
      assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
      m->tmp_storage = *m->manual_end;
      m->begin = &m->tmp_storage;
      m->incomplete = true;
    }
    m->in_progress = false;  // not being processed anymore
  }
  TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish");
  return status;
}

// Returns true while at least one manual compaction is registered in
// manual_compaction_dequeue_.
bool DBImpl::HasPendingManualCompaction() {
  const bool queue_is_empty = manual_compaction_dequeue_.empty();
  return !queue_is_empty;
}

1980
// Appends m to the back of manual_compaction_dequeue_. Only the pointer is
// stored; the queue does not take ownership here.
void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
  manual_compaction_dequeue_.emplace_back(m);
}

1984
// Erases the first occurrence of m from manual_compaction_dequeue_.
// Asserts (in debug builds) if m is not in the queue.
void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
  for (auto pos = manual_compaction_dequeue_.begin();
       pos != manual_compaction_dequeue_.end(); ++pos) {
    if (*pos == m) {
      manual_compaction_dequeue_.erase(pos);
      return;
    }
  }
  // Reaching this point means m was never found in the queue.
  assert(false);
}

1999
// Decides whether the manual compaction m has to keep waiting.
//
// Returns true when
//   * an IngestExternalFile() call is running, or
//   * m is exclusive and any background compaction (LOW or BOTTOM) is
//     scheduled, or
//   * m is not exclusive and some manual compaction queued ahead of m
//     overlaps with it (see MCOverlap) and has not started yet.
// Returns false otherwise.
bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
  if (num_running_ingest_file_ > 0) {
    // We need to wait for other IngestExternalFile() calls to finish
    // before running a manual compaction.
    return true;
  }
  if (m->exclusive) {
    const bool any_compaction_scheduled =
        bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0;
    return any_compaction_scheduled;
  }

  // Becomes true once the scan has passed m itself; entries after that point
  // are behind m in the queue and can never block it.
  bool passed_self = false;
  for (ManualCompactionState* other : manual_compaction_dequeue_) {
    if (other == m) {
      passed_self = true;
      continue;
    }
    // `other` conflicts with m if it is ahead of m in the queue, is not yet
    // in progress, and overlaps with m.
    if (!passed_self && !other->in_progress && MCOverlap(m, other)) {
      return true;
    }
  }
  return false;
}

// Scans the manual compaction queue (without modifying it) and returns true
// if
//   * any queued manual compaction is exclusive, or
//   * a queued manual compaction targets cfd and is neither in progress nor
//     done.
// A manual compaction on cfd that is already in progress does not count, so
// automatic compaction is allowed alongside it.
bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
  for (const auto* queued : manual_compaction_dequeue_) {
    if (queued->exclusive) {
      return true;
    }
    const bool still_waiting = !queued->in_progress && !queued->done;
    if (queued->cfd == cfd && still_waiting) {
      return true;
    }
  }
  return false;
}

// Returns true if any entry of the manual compaction queue is marked
// exclusive. The queue itself is left untouched.
bool DBImpl::HasExclusiveManualCompaction() {
  for (const auto* queued : manual_compaction_dequeue_) {
    if (queued->exclusive) {
      return true;
    }
  }
  return false;
}

2059
// Two manual compactions are considered overlapping when either one is
// exclusive or when both operate on the same column family.
bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
  const bool either_exclusive = m->exclusive || m1->exclusive;
  return either_exclusive || m->cfd == m1->cfd;
}

2069
// SuperVersionContext gets created and destructed outside of the lock --
Y
Yanqin Jin 已提交
2070
// we use this conveniently to:
S
Siying Dong 已提交
2071 2072 2073 2074
// * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete SuperVersion()s outside of the lock -- superversions_to_free
//
// However, if InstallSuperVersionAndScheduleWork() gets called twice with the
2075
// same sv_context, we can't reuse the SuperVersion() that got
S
Siying Dong 已提交
2076 2077 2078 2079 2080
// malloced because
// first call already used it. In that rare case, we take a hit and create a
// new SuperVersion() inside of the mutex. We do similar thing
// for superversion_to_free

2081 2082
void DBImpl::InstallSuperVersionAndScheduleWork(
    ColumnFamilyData* cfd, SuperVersionContext* sv_context,
2083
    const MutableCFOptions& mutable_cf_options, FlushReason flush_reason) {
S
Siying Dong 已提交
2084 2085 2086 2087 2088 2089 2090 2091 2092 2093
  mutex_.AssertHeld();

  // Update max_total_in_memory_state_
  size_t old_memtable_size = 0;
  auto* old_sv = cfd->GetSuperVersion();
  if (old_sv) {
    old_memtable_size = old_sv->mutable_cf_options.write_buffer_size *
                        old_sv->mutable_cf_options.max_write_buffer_number;
  }

2094 2095
  // this branch is unlikely to step in
  if (UNLIKELY(sv_context->new_superversion == nullptr)) {
2096 2097
    sv_context->NewSuperVersion();
  }
2098
  cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);
S
Siying Dong 已提交
2099 2100 2101

  // Whenever we install new SuperVersion, we might need to issue new flushes or
  // compactions.
Z
Zhongyi Xie 已提交
2102
  SchedulePendingFlush(cfd, flush_reason);
S
Siying Dong 已提交
2103 2104 2105 2106
  SchedulePendingCompaction(cfd);
  MaybeScheduleFlushOrCompaction();

  // Update max_total_in_memory_state_
2107 2108 2109
  max_total_in_memory_state_ = max_total_in_memory_state_ - old_memtable_size +
                               mutable_cf_options.write_buffer_size *
                                   mutable_cf_options.max_write_buffer_number;
S
Siying Dong 已提交
2110
}
Y
Yi Wu 已提交
2111

2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142
// ShouldPurge is called by FindObsoleteFiles when doing a full scan, with the
// db mutex (mutex_) already held. It returns false if file_number is already
// present in files_grabbed_for_purge_ or in purge_queue_, and true otherwise.
//
// Both lookups are linear scans. FindObsoleteFiles with full scan is expected
// to occur once every 10 hours by default, and the containers are expected to
// be small, so the cost is affordable even with the mutex held. (The current
// FindObsoleteFiles implementation with full_scan=true already issues I/O to
// list directories, e.g. env_->getChildren, while holding the db mutex.)
// If the search cost ever needs to be reduced, keeping the vector sorted is
// one option.
bool DBImpl::ShouldPurge(uint64_t file_number) const {
  bool already_claimed = false;
  for (const auto grabbed_number : files_grabbed_for_purge_) {
    if (grabbed_number == file_number) {
      already_claimed = true;
      break;
    }
  }
  if (!already_claimed) {
    for (const auto& queued_file : purge_queue_) {
      if (queued_file.number == file_number) {
        already_claimed = true;
        break;
      }
    }
  }
  return !already_claimed;
}

// Records file_number in files_grabbed_for_purge_ so that a later
// ShouldPurge() call for the same number returns false.
// Called by FindObsoleteFiles with the db mutex (mutex_) already held.
void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) {
  files_grabbed_for_purge_.push_back(file_number);
}

Y
Yi Wu 已提交
2143 2144 2145 2146 2147 2148 2149 2150
// Hands snapshot_checker over to snapshot_checker_ while holding mutex_.
//
// This must be called at most once (asserted). Replacing an existing checker
// would require making sure the old one is not deleted while a compaction job
// is still using it.
void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) {
  InstrumentedMutexLock guard(&mutex_);
  assert(snapshot_checker_.get() == nullptr);
  snapshot_checker_.reset(snapshot_checker);
}
S
Siying Dong 已提交
2151
}  // namespace rocksdb