db_impl_compaction_flush.cc 131.3 KB
Newer Older
S
Siying Dong 已提交
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
S
Siying Dong 已提交
2 3 4
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
S
Siying Dong 已提交
5 6 7 8
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
#include <cinttypes>
S
Siying Dong 已提交
10 11

#include "db/builder.h"
12
#include "db/db_impl/db_impl.h"
13
#include "db/error_handler.h"
14
#include "db/event_helpers.h"
15
#include "file/sst_file_manager_impl.h"
16 17 18 19
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
20
#include "test_util/sync_point.h"
21
#include "util/cast_util.h"
22
#include "util/concurrent_task_limiter_impl.h"
S
Siying Dong 已提交
23

24
namespace ROCKSDB_NAMESPACE {
25

26
bool DBImpl::EnoughRoomForCompaction(
27
    ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
28 29 30 31 32 33 34
    bool* sfm_reserved_compact_space, LogBuffer* log_buffer) {
  // Check if we have enough room to do the compaction
  bool enough_room = true;
#ifndef ROCKSDB_LITE
  auto sfm = static_cast<SstFileManagerImpl*>(
      immutable_db_options_.sst_file_manager.get());
  if (sfm) {
35 36 37
    // Pass the current bg_error_ to SFM so it can decide what checks to
    // perform. If this DB instance hasn't seen any error yet, the SFM can be
    // optimistic and not do disk space checks
38 39 40 41
    Status bg_error = error_handler_.GetBGError();
    enough_room = sfm->EnoughRoomForCompaction(cfd, inputs, bg_error);
    bg_error.PermitUncheckedError();  // bg_error is just a copy of the Status
                                      // from the error_handler_
42 43 44 45
    if (enough_room) {
      *sfm_reserved_compact_space = true;
    }
  }
46
#else
47
  (void)cfd;
48 49
  (void)inputs;
  (void)sfm_reserved_compact_space;
50 51 52 53 54 55 56 57 58 59 60 61
#endif  // ROCKSDB_LITE
  if (!enough_room) {
    // Just in case tests want to change the value of enough_room
    TEST_SYNC_POINT_CALLBACK(
        "DBImpl::BackgroundCompaction():CancelledCompaction", &enough_room);
    ROCKS_LOG_BUFFER(log_buffer,
                     "Cancelled compaction because not enough room");
    RecordTick(stats_, COMPACTION_CANCELLED, 1);
  }
  return enough_room;
}

Y
Yanqin Jin 已提交
62
bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
63 64 65 66 67 68 69 70 71 72 73
                                    std::unique_ptr<TaskLimiterToken>* token,
                                    LogBuffer* log_buffer) {
  assert(*token == nullptr);
  auto limiter = static_cast<ConcurrentTaskLimiterImpl*>(
      cfd->ioptions()->compaction_thread_limiter.get());
  if (limiter == nullptr) {
    return true;
  }
  *token = limiter->GetToken(force);
  if (*token != nullptr) {
    ROCKS_LOG_BUFFER(log_buffer,
Y
Yanqin Jin 已提交
74 75 76 77
                     "Thread limiter [%s] increase [%s] compaction task, "
                     "force: %s, tasks after: %d",
                     limiter->GetName().c_str(), cfd->GetName().c_str(),
                     force ? "true" : "false", limiter->GetOutstandingTask());
78 79 80 81 82
    return true;
  }
  return false;
}

83
IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
S
Siying Dong 已提交
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
  TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
  mutex_.AssertHeld();
  autovector<log::Writer*, 1> logs_to_sync;
  uint64_t current_log_number = logfile_number_;
  while (logs_.front().number < current_log_number &&
         logs_.front().getting_synced) {
    log_sync_cv_.Wait();
  }
  for (auto it = logs_.begin();
       it != logs_.end() && it->number < current_log_number; ++it) {
    auto& log = *it;
    assert(!log.getting_synced);
    log.getting_synced = true;
    logs_to_sync.push_back(log.writer);
  }

100
  IOStatus io_s;
S
Siying Dong 已提交
101 102 103 104 105 106 107
  if (!logs_to_sync.empty()) {
    mutex_.Unlock();

    for (log::Writer* log : logs_to_sync) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
                     log->get_log_number());
108 109
      io_s = log->file()->Sync(immutable_db_options_.use_fsync);
      if (!io_s.ok()) {
110 111
        break;
      }
112 113

      if (immutable_db_options_.recycle_log_file_num > 0) {
114 115
        io_s = log->Close();
        if (!io_s.ok()) {
116 117 118
          break;
        }
      }
S
Siying Dong 已提交
119
    }
120 121
    if (io_s.ok()) {
      io_s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
S
Siying Dong 已提交
122 123 124 125 126 127
    }

    mutex_.Lock();

    // "number <= current_log_number - 1" is equivalent to
    // "number < current_log_number".
128 129 130 131 132
    if (io_s.ok()) {
      io_s = status_to_io_status(MarkLogsSynced(current_log_number - 1, true));
    } else {
      MarkLogsNotSynced(current_log_number - 1);
    }
133
    if (!io_s.ok()) {
134
      if (total_log_size_ > 0) {
135
        error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
136 137
      } else {
        // If the WAL is empty, we use different error reason
138
        error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
139
      }
S
Siying Dong 已提交
140
      TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
141
      return io_s;
S
Siying Dong 已提交
142 143
    }
  }
144
  return io_s;
S
Siying Dong 已提交
145 146 147 148
}

Status DBImpl::FlushMemTableToOutputFile(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
149
    bool* made_progress, JobContext* job_context,
150 151 152
    SuperVersionContext* superversion_context,
    std::vector<SequenceNumber>& snapshot_seqs,
    SequenceNumber earliest_write_conflict_snapshot,
153 154
    SnapshotChecker* snapshot_checker, LogBuffer* log_buffer,
    Env::Priority thread_pri) {
S
Siying Dong 已提交
155
  mutex_.AssertHeld();
156
  assert(cfd);
S
Siying Dong 已提交
157 158 159 160
  assert(cfd->imm()->NumNotFlushed() != 0);
  assert(cfd->imm()->IsFlushPending());

  FlushJob flush_job(
161
      dbname_, cfd, immutable_db_options_, mutable_cf_options,
162 163 164 165
      port::kMaxUint64 /* memtable_id */, file_options_for_compaction_,
      versions_.get(), &mutex_, &shutting_down_, snapshot_seqs,
      earliest_write_conflict_snapshot, snapshot_checker, job_context,
      log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U),
S
Siying Dong 已提交
166
      GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
167
      &event_logger_, mutable_cf_options.report_bg_io_stats,
168
      true /* sync_output_directory */, true /* write_manifest */, thread_pri,
169 170
      io_tracer_, db_id_, db_session_id_, cfd->GetFullHistoryTsLow(),
      &blob_callback_);
S
Siying Dong 已提交
171 172
  FileMetaData file_meta;

173
  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
S
Siying Dong 已提交
174
  flush_job.PickMemTable();
175
  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables");
S
Siying Dong 已提交
176

Y
Yi Wu 已提交
177 178
#ifndef ROCKSDB_LITE
  // may temporarily unlock and lock the mutex.
179
  NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
Y
Yi Wu 已提交
180 181
#endif  // ROCKSDB_LITE

S
Siying Dong 已提交
182
  Status s;
183
  IOStatus io_s = IOStatus::OK();
S
Siying Dong 已提交
184
  if (logfile_number_ > 0 &&
185
      versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1) {
S
Siying Dong 已提交
186 187 188 189 190 191
    // If there are more than one column families, we need to make sure that
    // all the log files except the most recent one are synced. Otherwise if
    // the host crashes after flushing and before WAL is persistent, the
    // flushed SST may contain data from write batches whose updates to
    // other column families are missing.
    // SyncClosedLogs() may unlock and re-lock the db_mutex.
192 193
    io_s = SyncClosedLogs(job_context);
    s = io_s;
194 195
    if (!io_s.ok() && !io_s.IsShutdownInProgress() &&
        !io_s.IsColumnFamilyDropped()) {
196
      error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
197
    }
198 199
  } else {
    TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip");
S
Siying Dong 已提交
200 201 202 203 204 205 206 207 208
  }

  // Within flush_job.Run, rocksdb may call event listener to notify
  // file creation and deletion.
  //
  // Note that flush_job.Run will unlock and lock the db_mutex,
  // and EventListener callback will be called when the db_mutex
  // is unlocked by the current thread.
  if (s.ok()) {
S
Siying Dong 已提交
209
    s = flush_job.Run(&logs_with_prep_tracker_, &file_meta);
S
Siying Dong 已提交
210 211 212
  } else {
    flush_job.Cancel();
  }
213 214 215
  if (io_s.ok()) {
    io_s = flush_job.io_status();
  }
S
Siying Dong 已提交
216 217

  if (s.ok()) {
218 219
    InstallSuperVersionAndScheduleWork(cfd, superversion_context,
                                       mutable_cf_options);
S
Siying Dong 已提交
220
    if (made_progress) {
221
      *made_progress = true;
S
Siying Dong 已提交
222
    }
223 224 225 226 227 228 229 230 231

    const std::string& column_family_name = cfd->GetName();

    Version* const current = cfd->current();
    assert(current);

    const VersionStorageInfo* const storage_info = current->storage_info();
    assert(storage_info);

S
Siying Dong 已提交
232 233
    VersionStorageInfo::LevelSummaryStorage tmp;
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
234 235 236 237 238 239 240 241 242 243 244
                     column_family_name.c_str(),
                     storage_info->LevelSummary(&tmp));

    const auto& blob_files = storage_info->GetBlobFiles();
    if (!blob_files.empty()) {
      ROCKS_LOG_BUFFER(log_buffer,
                       "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64
                       "\n",
                       column_family_name.c_str(), blob_files.begin()->first,
                       blob_files.rbegin()->first);
    }
S
Siying Dong 已提交
245 246
  }

247
  if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
248 249
    if (!io_s.ok() && !io_s.IsShutdownInProgress() &&
        !io_s.IsColumnFamilyDropped()) {
250 251 252 253 254
      // Error while writing to MANIFEST.
      // In fact, versions_->io_status() can also be the result of renaming
      // CURRENT file. With current code, it's just difficult to tell. So just
      // be pessimistic and try write to a new MANIFEST.
      // TODO: distinguish between MANIFEST write and CURRENT renaming
255
      if (!versions_->io_status().ok()) {
256 257
        if (total_log_size_ > 0) {
          // If the WAL is empty, we use different error reason
258 259
          error_handler_.SetBGError(io_s,
                                    BackgroundErrorReason::kManifestWrite);
260
        } else {
261 262
          error_handler_.SetBGError(io_s,
                                    BackgroundErrorReason::kManifestWriteNoWAL);
263
        }
264
      } else if (total_log_size_ > 0) {
265
        error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
266 267
      } else {
        // If the WAL is empty, we use different error reason
268
        error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
269
      }
270 271
    } else {
      Status new_bg_error = s;
272
      error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
273
    }
274 275 276 277
  } else {
    // If we got here, then we decided not to care about the i_os status (either
    // from never needing it or ignoring the flush job status
    io_s.PermitUncheckedError();
S
Siying Dong 已提交
278 279 280 281
  }
  if (s.ok()) {
#ifndef ROCKSDB_LITE
    // may temporarily unlock and lock the mutex.
282 283
    NotifyOnFlushCompleted(cfd, mutable_cf_options,
                           flush_job.GetCommittedFlushJobsInfo());
S
Siying Dong 已提交
284 285 286 287 288
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    if (sfm) {
      // Notify sst_file_manager that a new file was added
      std::string file_path = MakeTableFileName(
289
          cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber());
290 291 292 293
      // TODO (PR7798).  We should only add the file to the FileManager if it
      // exists. Otherwise, some tests may fail.  Ignore the error in the
      // interim.
      sfm->OnAddFile(file_path).PermitUncheckedError();
294
      if (sfm->IsMaxAllowedSpaceReached()) {
295 296
        Status new_bg_error =
            Status::SpaceLimit("Max allowed space was reached");
S
Siying Dong 已提交
297 298
        TEST_SYNC_POINT_CALLBACK(
            "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
299
            &new_bg_error);
300
        error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
S
Siying Dong 已提交
301 302 303 304
      }
    }
#endif  // ROCKSDB_LITE
  }
305
  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:Finish");
S
Siying Dong 已提交
306 307 308
  return s;
}

309 310
Status DBImpl::FlushMemTablesToOutputFiles(
    const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
311
    JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
312
  if (immutable_db_options_.atomic_flush) {
313 314
    return AtomicFlushMemTablesToOutputFiles(
        bg_flush_args, made_progress, job_context, log_buffer, thread_pri);
Y
Yanqin Jin 已提交
315
  }
316
  assert(bg_flush_args.size() == 1);
317 318 319 320 321
  std::vector<SequenceNumber> snapshot_seqs;
  SequenceNumber earliest_write_conflict_snapshot;
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);
322 323 324 325 326 327 328 329 330 331
  const auto& bg_flush_arg = bg_flush_args[0];
  ColumnFamilyData* cfd = bg_flush_arg.cfd_;
  MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
  SuperVersionContext* superversion_context =
      bg_flush_arg.superversion_context_;
  Status s = FlushMemTableToOutputFile(
      cfd, mutable_cf_options, made_progress, job_context, superversion_context,
      snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
      log_buffer, thread_pri);
  return s;
332 333
}

334 335 336 337 338 339 340 341 342 343 344
/*
 * Atomically flushes multiple column families.
 *
 * For each column family, all memtables with ID smaller than or equal to the
 * ID specified in bg_flush_args will be flushed. Only after all column
 * families finish flush will this function commit to MANIFEST. If any of the
 * column families are not flushed successfully, this function does not have
 * any side-effect on the state of the database.
 */
Status DBImpl::AtomicFlushMemTablesToOutputFiles(
    const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
345
    JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
346 347 348 349 350 351 352 353 354 355 356 357 358 359
  mutex_.AssertHeld();

  autovector<ColumnFamilyData*> cfds;
  for (const auto& arg : bg_flush_args) {
    cfds.emplace_back(arg.cfd_);
  }

#ifndef NDEBUG
  for (const auto cfd : cfds) {
    assert(cfd->imm()->NumNotFlushed() != 0);
    assert(cfd->imm()->IsFlushPending());
  }
#endif /* !NDEBUG */

360
  std::vector<SequenceNumber> snapshot_seqs;
361
  SequenceNumber earliest_write_conflict_snapshot;
362 363 364
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);
365

366
  autovector<FSDirectory*> distinct_output_dirs;
367
  autovector<std::string> distinct_output_dir_paths;
368
  std::vector<std::unique_ptr<FlushJob>> jobs;
369
  std::vector<MutableCFOptions> all_mutable_cf_options;
370
  int num_cfs = static_cast<int>(cfds.size());
371
  all_mutable_cf_options.reserve(num_cfs);
372 373
  for (int i = 0; i < num_cfs; ++i) {
    auto cfd = cfds[i];
374
    FSDirectory* data_dir = GetDataDir(cfd, 0U);
375
    const std::string& curr_path = cfd->ioptions()->cf_paths[0].path;
376 377 378 379 380

    // Add to distinct output directories if eligible. Use linear search. Since
    // the number of elements in the vector is not large, performance should be
    // tolerable.
    bool found = false;
381 382
    for (const auto& path : distinct_output_dir_paths) {
      if (path == curr_path) {
383 384 385 386 387
        found = true;
        break;
      }
    }
    if (!found) {
388
      distinct_output_dir_paths.emplace_back(curr_path);
389 390 391
      distinct_output_dirs.emplace_back(data_dir);
    }

392 393
    all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions());
    const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
394
    uint64_t max_memtable_id = bg_flush_args[i].max_memtable_id_;
395
    jobs.emplace_back(new FlushJob(
396
        dbname_, cfd, immutable_db_options_, mutable_cf_options,
397
        max_memtable_id, file_options_for_compaction_, versions_.get(), &mutex_,
398 399 400 401
        &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
        snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
        data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
        stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
402
        false /* sync_output_directory */, false /* write_manifest */,
403 404
        thread_pri, io_tracer_, db_id_, db_session_id_,
        cfd->GetFullHistoryTsLow()));
405
    jobs.back()->PickMemTable();
406 407
  }

408
  std::vector<FileMetaData> file_meta(num_cfs);
409
  Status s;
410
  IOStatus io_s;
411 412 413
  assert(num_cfs == static_cast<int>(jobs.size()));

#ifndef ROCKSDB_LITE
414 415
  for (int i = 0; i != num_cfs; ++i) {
    const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
416 417
    // may temporarily unlock and lock the mutex.
    NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
418
                       job_context->job_id);
419
  }
420
#endif /* !ROCKSDB_LITE */
421 422 423 424

  if (logfile_number_ > 0) {
    // TODO (yanqin) investigate whether we should sync the closed logs for
    // single column family case.
425 426
    io_s = SyncClosedLogs(job_context);
    s = io_s;
427 428
  }

Y
Yanqin Jin 已提交
429 430 431
  // exec_status stores the execution status of flush_jobs as
  // <bool /* executed */, Status /* status code */>
  autovector<std::pair<bool, Status>> exec_status;
432
  autovector<IOStatus> io_status;
Y
Yanqin Jin 已提交
433 434
  for (int i = 0; i != num_cfs; ++i) {
    // Initially all jobs are not executed, with status OK.
435
    exec_status.emplace_back(false, Status::OK());
436
    io_status.emplace_back(IOStatus::OK());
Y
Yanqin Jin 已提交
437 438
  }

439 440
  if (s.ok()) {
    // TODO (yanqin): parallelize jobs with threads.
441
    for (int i = 1; i != num_cfs; ++i) {
Y
Yanqin Jin 已提交
442
      exec_status[i].second =
443
          jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]);
Y
Yanqin Jin 已提交
444
      exec_status[i].first = true;
445
      io_status[i] = jobs[i]->io_status();
446
    }
447 448 449 450 451 452
    if (num_cfs > 1) {
      TEST_SYNC_POINT(
          "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1");
      TEST_SYNC_POINT(
          "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
    }
453 454
    assert(exec_status.size() > 0);
    assert(!file_meta.empty());
455
    exec_status[0].second =
456
        jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]);
457
    exec_status[0].first = true;
458
    io_status[0] = jobs[0]->io_status();
459 460 461 462 463

    Status error_status;
    for (const auto& e : exec_status) {
      if (!e.second.ok()) {
        s = e.second;
464 465
        if (!e.second.IsShutdownInProgress() &&
            !e.second.IsColumnFamilyDropped()) {
466 467 468 469 470
          // If a flush job did not return OK, and the CF is not dropped, and
          // the DB is not shutting down, then we have to return this result to
          // caller later.
          error_status = e.second;
        }
471 472
      }
    }
473 474

    s = error_status.ok() ? s : error_status;
475 476
  }

477 478 479 480 481 482 483 484 485 486 487 488 489 490
  if (io_s.ok()) {
    IOStatus io_error = IOStatus::OK();
    for (int i = 0; i != static_cast<int>(io_status.size()); i++) {
      if (!io_status[i].ok() && !io_status[i].IsShutdownInProgress() &&
          !io_status[i].IsColumnFamilyDropped()) {
        io_error = io_status[i];
      }
    }
    io_s = io_error;
    if (s.ok() && !io_s.ok()) {
      s = io_s;
    }
  }

491
  if (s.IsColumnFamilyDropped()) {
492 493 494
    s = Status::OK();
  }

495
  if (s.ok() || s.IsShutdownInProgress()) {
496 497 498
    // Sync on all distinct output directories.
    for (auto dir : distinct_output_dirs) {
      if (dir != nullptr) {
499
        Status error_status = dir->Fsync(IOOptions(), nullptr);
500 501
        if (!error_status.ok()) {
          s = error_status;
502 503 504 505
          break;
        }
      }
    }
506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522
  } else {
    // Need to undo atomic flush if something went wrong, i.e. s is not OK and
    // it is not because of CF drop.
    // Have to cancel the flush jobs that have NOT executed because we need to
    // unref the versions.
    for (int i = 0; i != num_cfs; ++i) {
      if (!exec_status[i].first) {
        jobs[i]->Cancel();
      }
    }
    for (int i = 0; i != num_cfs; ++i) {
      if (exec_status[i].first && exec_status[i].second.ok()) {
        auto& mems = jobs[i]->GetMemTables();
        cfds[i]->imm()->RollbackMemtableFlush(mems,
                                              file_meta[i].fd.GetNumber());
      }
    }
523
  }
524

525 526 527 528
  if (s.ok()) {
    auto wait_to_install_func = [&]() {
      bool ready = true;
      for (size_t i = 0; i != cfds.size(); ++i) {
529
        const auto& mems = jobs[i]->GetMemTables();
530 531
        if (cfds[i]->IsDropped()) {
          // If the column family is dropped, then do not wait.
532
          continue;
533 534 535 536 537 538 539 540 541 542 543 544 545 546 547
        } else if (!mems.empty() &&
                   cfds[i]->imm()->GetEarliestMemTableID() < mems[0]->GetID()) {
          // If a flush job needs to install the flush result for mems and
          // mems[0] is not the earliest memtable, it means another thread must
          // be installing flush results for the same column family, then the
          // current thread needs to wait.
          ready = false;
          break;
        } else if (mems.empty() && cfds[i]->imm()->GetEarliestMemTableID() <=
                                       bg_flush_args[i].max_memtable_id_) {
          // If a flush job does not need to install flush results, then it has
          // to wait until all memtables up to max_memtable_id_ (inclusive) are
          // installed.
          ready = false;
          break;
548
        }
549
      }
550 551 552 553 554 555 556 557 558 559 560 561 562
      return ready;
    };

    bool resuming_from_bg_err = error_handler_.IsDBStopped();
    while ((!error_handler_.IsDBStopped() ||
            error_handler_.GetRecoveryError().ok()) &&
           !wait_to_install_func()) {
      atomic_flush_install_cv_.Wait();
    }

    s = resuming_from_bg_err ? error_handler_.GetRecoveryError()
                             : error_handler_.GetBGError();
  }
563

564 565 566 567
  if (s.ok()) {
    autovector<ColumnFamilyData*> tmp_cfds;
    autovector<const autovector<MemTable*>*> mems_list;
    autovector<const MutableCFOptions*> mutable_cf_options_list;
568
    autovector<FileMetaData*> tmp_file_meta;
569
    for (int i = 0; i != num_cfs; ++i) {
570
      const auto& mems = jobs[i]->GetMemTables();
571 572 573
      if (!cfds[i]->IsDropped() && !mems.empty()) {
        tmp_cfds.emplace_back(cfds[i]);
        mems_list.emplace_back(&mems);
574
        mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
575
        tmp_file_meta.emplace_back(&file_meta[i]);
576
      }
577
    }
578 579 580

    s = InstallMemtableAtomicFlushResults(
        nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
581
        versions_.get(), &logs_with_prep_tracker_, &mutex_, tmp_file_meta,
582
        &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
583 584
  }

585
  if (s.ok()) {
586 587 588
    assert(num_cfs ==
           static_cast<int>(job_context->superversion_contexts.size()));
    for (int i = 0; i != num_cfs; ++i) {
589 590
      assert(cfds[i]);

591 592 593
      if (cfds[i]->IsDropped()) {
        continue;
      }
594 595
      InstallSuperVersionAndScheduleWork(cfds[i],
                                         &job_context->superversion_contexts[i],
596
                                         all_mutable_cf_options[i]);
597 598 599 600 601 602 603 604 605

      const std::string& column_family_name = cfds[i]->GetName();

      Version* const current = cfds[i]->current();
      assert(current);

      const VersionStorageInfo* const storage_info = current->storage_info();
      assert(storage_info);

606 607
      VersionStorageInfo::LevelSummaryStorage tmp;
      ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
608 609 610 611 612 613 614 615 616 617 618
                       column_family_name.c_str(),
                       storage_info->LevelSummary(&tmp));

      const auto& blob_files = storage_info->GetBlobFiles();
      if (!blob_files.empty()) {
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Blob file summary: head=%" PRIu64
                         ", tail=%" PRIu64 "\n",
                         column_family_name.c_str(), blob_files.begin()->first,
                         blob_files.rbegin()->first);
      }
619 620 621 622 623 624 625
    }
    if (made_progress) {
      *made_progress = true;
    }
#ifndef ROCKSDB_LITE
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
626
    assert(all_mutable_cf_options.size() == static_cast<size_t>(num_cfs));
M
mrambacher 已提交
627
    for (int i = 0; s.ok() && i != num_cfs; ++i) {
628 629 630
      if (cfds[i]->IsDropped()) {
        continue;
      }
631 632
      NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i],
                             jobs[i]->GetCommittedFlushJobsInfo());
633 634 635
      if (sfm) {
        std::string file_path = MakeTableFileName(
            cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber());
636 637 638 639
        // TODO (PR7798).  We should only add the file to the FileManager if it
        // exists. Otherwise, some tests may fail.  Ignore the error in the
        // interim.
        sfm->OnAddFile(file_path).PermitUncheckedError();
640 641 642 643
        if (sfm->IsMaxAllowedSpaceReached() &&
            error_handler_.GetBGError().ok()) {
          Status new_bg_error =
              Status::SpaceLimit("Max allowed space was reached");
644 645
          error_handler_.SetBGError(new_bg_error,
                                    BackgroundErrorReason::kFlush);
646 647 648 649 650 651
        }
      }
    }
#endif  // ROCKSDB_LITE
  }

652 653 654
  // Need to undo atomic flush if something went wrong, i.e. s is not OK and
  // it is not because of CF drop.
  if (!s.ok() && !s.IsColumnFamilyDropped()) {
655
    if (!io_s.ok() && !io_s.IsColumnFamilyDropped()) {
656 657 658 659 660
      // Error while writing to MANIFEST.
      // In fact, versions_->io_status() can also be the result of renaming
      // CURRENT file. With current code, it's just difficult to tell. So just
      // be pessimistic and try write to a new MANIFEST.
      // TODO: distinguish between MANIFEST write and CURRENT renaming
661
      if (!versions_->io_status().ok()) {
662 663
        if (total_log_size_ > 0) {
          // If the WAL is empty, we use different error reason
664 665
          error_handler_.SetBGError(io_s,
                                    BackgroundErrorReason::kManifestWrite);
666
        } else {
667 668
          error_handler_.SetBGError(io_s,
                                    BackgroundErrorReason::kManifestWriteNoWAL);
669
        }
670
      } else if (total_log_size_ > 0) {
671
        error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
672 673
      } else {
        // If the WAL is empty, we use different error reason
674
        error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
675
      }
676 677
    } else {
      Status new_bg_error = s;
678
      error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
679
    }
680 681 682 683 684
  }

  return s;
}

Y
Yi Wu 已提交
685 686
void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
                                const MutableCFOptions& mutable_cf_options,
687
                                int job_id) {
Y
Yi Wu 已提交
688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  bool triggered_writes_slowdown =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_slowdown_writes_trigger);
  bool triggered_writes_stop =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_stop_writes_trigger);
  // release lock while notifying events
  mutex_.Unlock();
  {
705
    FlushJobInfo info{};
706
    info.cf_id = cfd->GetID();
Y
Yi Wu 已提交
707 708 709
    info.cf_name = cfd->GetName();
    // TODO(yhchiang): make db_paths dynamic in case flush does not
    //                 go to L0 in the future.
710 711 712 713
    const uint64_t file_number = file_meta->fd.GetNumber();
    info.file_path =
        MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_number);
    info.file_number = file_number;
Y
Yi Wu 已提交
714 715 716 717
    info.thread_id = env_->GetThreadID();
    info.job_id = job_id;
    info.triggered_writes_slowdown = triggered_writes_slowdown;
    info.triggered_writes_stop = triggered_writes_stop;
718 719
    info.smallest_seqno = file_meta->fd.smallest_seqno;
    info.largest_seqno = file_meta->fd.largest_seqno;
720
    info.flush_reason = cfd->GetFlushReason();
Y
Yi Wu 已提交
721 722 723 724 725 726 727
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnFlushBegin(this, info);
    }
  }
  mutex_.Lock();
// no need to signal bg_cv_ as it will be signaled at the end of the
// flush process.
728 729 730 731 732
#else
  (void)cfd;
  (void)file_meta;
  (void)mutable_cf_options;
  (void)job_id;
Y
Yi Wu 已提交
733 734 735
#endif  // ROCKSDB_LITE
}

736 737 738
void DBImpl::NotifyOnFlushCompleted(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info) {
S
Siying Dong 已提交
739
#ifndef ROCKSDB_LITE
740
  assert(flush_jobs_info != nullptr);
S
Siying Dong 已提交
741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  bool triggered_writes_slowdown =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_slowdown_writes_trigger);
  bool triggered_writes_stop =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_stop_writes_trigger);
  // release lock while notifying events
  mutex_.Unlock();
  {
757 758 759 760 761 762
    for (auto& info : *flush_jobs_info) {
      info->triggered_writes_slowdown = triggered_writes_slowdown;
      info->triggered_writes_stop = triggered_writes_stop;
      for (auto listener : immutable_db_options_.listeners) {
        listener->OnFlushCompleted(this, *info);
      }
S
Siying Dong 已提交
763
    }
764
    flush_jobs_info->clear();
S
Siying Dong 已提交
765 766 767 768
  }
  mutex_.Lock();
  // no need to signal bg_cv_ as it will be signaled at the end of the
  // flush process.
769 770 771
#else
  (void)cfd;
  (void)mutable_cf_options;
772
  (void)flush_jobs_info;
S
Siying Dong 已提交
773 774 775 776 777
#endif  // ROCKSDB_LITE
}

Status DBImpl::CompactRange(const CompactRangeOptions& options,
                            ColumnFamilyHandle* column_family,
778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809
                            const Slice* begin_without_ts,
                            const Slice* end_without_ts) {
  const Comparator* const ucmp = column_family->GetComparator();
  assert(ucmp);
  size_t ts_sz = ucmp->timestamp_size();
  if (ts_sz == 0) {
    return CompactRangeInternal(options, column_family, begin_without_ts,
                                end_without_ts);
  }

  std::string begin_str;
  std::string end_str;

  // CompactRange compact all keys: [begin, end] inclusively. Add maximum
  // timestamp to include all `begin` keys, and add minimal timestamp to include
  // all `end` keys.
  if (begin_without_ts != nullptr) {
    AppendKeyWithMaxTimestamp(&begin_str, *begin_without_ts, ts_sz);
  }
  if (end_without_ts != nullptr) {
    AppendKeyWithMinTimestamp(&end_str, *end_without_ts, ts_sz);
  }
  Slice begin(begin_str);
  Slice end(end_str);

  Slice* begin_with_ts = begin_without_ts ? &begin : nullptr;
  Slice* end_with_ts = end_without_ts ? &end : nullptr;

  return CompactRangeInternal(options, column_family, begin_with_ts,
                              end_with_ts);
}

810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828
Status DBImpl::IncreaseFullHistoryTsLow(ColumnFamilyData* cfd,
                                        std::string ts_low) {
  VersionEdit edit;
  edit.SetColumnFamily(cfd->GetID());
  edit.SetFullHistoryTsLow(ts_low);

  InstrumentedMutexLock l(&mutex_);
  std::string current_ts_low = cfd->GetFullHistoryTsLow();
  const Comparator* ucmp = cfd->user_comparator();
  if (!current_ts_low.empty() &&
      ucmp->CompareTimestamp(ts_low, current_ts_low) < 0) {
    return Status::InvalidArgument(
        "Cannot decrease full_history_timestamp_low");
  }

  return versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit,
                                &mutex_);
}

829 830 831
Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
                                    ColumnFamilyHandle* column_family,
                                    const Slice* begin, const Slice* end) {
832
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
833 834 835
  auto cfd = cfh->cfd();

  if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) {
S
Siying Dong 已提交
836 837 838
    return Status::InvalidArgument("Invalid target path ID");
  }

839
  bool flush_needed = true;
840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855

  // Update full_history_ts_low if it's set
  if (options.full_history_ts_low != nullptr &&
      !options.full_history_ts_low->empty()) {
    std::string ts_low = options.full_history_ts_low->ToString();
    if (begin != nullptr || end != nullptr) {
      return Status::InvalidArgument(
          "Cannot specify compaction range with full_history_ts_low");
    }
    Status s = IncreaseFullHistoryTsLow(cfd, ts_low);
    if (!s.ok()) {
      LogFlush(immutable_db_options_.info_log);
      return s;
    }
  }

M
mrambacher 已提交
856
  Status s;
857 858 859 860 861
  if (begin != nullptr && end != nullptr) {
    // TODO(ajkr): We could also optimize away the flush in certain cases where
    // one/both sides of the interval are unbounded. But it requires more
    // changes to RangesOverlapWithMemtables.
    Range range(*begin, *end);
862
    SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
M
mrambacher 已提交
863 864 865
    s = cfd->RangesOverlapWithMemtables(
        {range}, super_version, immutable_db_options_.allow_data_in_errors,
        &flush_needed);
866 867 868
    CleanupSuperVersion(super_version);
  }

M
mrambacher 已提交
869
  if (s.ok() && flush_needed) {
870 871
    FlushOptions fo;
    fo.allow_write_stall = options.allow_write_stall;
872
    if (immutable_db_options_.atomic_flush) {
Y
Yanqin Jin 已提交
873
      autovector<ColumnFamilyData*> cfds;
874
      mutex_.Lock();
Y
Yanqin Jin 已提交
875
      SelectColumnFamiliesForAtomicFlush(&cfds);
876
      mutex_.Unlock();
Y
Yanqin Jin 已提交
877 878 879 880 881 882
      s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction,
                               false /* writes_stopped */);
    } else {
      s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
                        false /* writes_stopped*/);
    }
883 884 885 886
    if (!s.ok()) {
      LogFlush(immutable_db_options_.info_log);
      return s;
    }
S
Siying Dong 已提交
887 888
  }

889 890 891
  constexpr int kInvalidLevel = -1;
  int final_output_level = kInvalidLevel;
  bool exclusive = options.exclusive_manual_compaction;
S
Siying Dong 已提交
892 893 894
  if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
      cfd->NumberLevels() > 1) {
    // Always compact all files together.
895 896 897 898 899
    final_output_level = cfd->NumberLevels() - 1;
    // if bottom most level is reserved
    if (immutable_db_options_.allow_ingest_behind) {
      final_output_level--;
    }
S
Siying Dong 已提交
900
    s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
901
                            final_output_level, options, begin, end, exclusive,
902
                            false, port::kMaxUint64);
S
Siying Dong 已提交
903
  } else {
904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925
    int first_overlapped_level = kInvalidLevel;
    int max_overlapped_level = kInvalidLevel;
    {
      SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
      Version* current_version = super_version->current;
      ReadOptions ro;
      ro.total_order_seek = true;
      bool overlap;
      for (int level = 0;
           level < current_version->storage_info()->num_non_empty_levels();
           level++) {
        overlap = true;
        if (begin != nullptr && end != nullptr) {
          Status status = current_version->OverlapWithLevelIterator(
              ro, file_options_, *begin, *end, level, &overlap);
          if (!status.ok()) {
            overlap = current_version->storage_info()->OverlapInLevel(
                level, begin, end);
          }
        } else {
          overlap = current_version->storage_info()->OverlapInLevel(level,
                                                                    begin, end);
S
Siying Dong 已提交
926
        }
927 928 929 930 931
        if (overlap) {
          if (first_overlapped_level == kInvalidLevel) {
            first_overlapped_level = level;
          }
          max_overlapped_level = level;
S
Siying Dong 已提交
932 933
        }
      }
934 935 936 937 938 939 940 941 942 943 944
      CleanupSuperVersion(super_version);
    }
    if (s.ok() && first_overlapped_level != kInvalidLevel) {
      // max_file_num_to_ignore can be used to filter out newly created SST
      // files, useful for bottom level compaction in a manual compaction
      uint64_t max_file_num_to_ignore = port::kMaxUint64;
      uint64_t next_file_number = versions_->current_next_file_number();
      final_output_level = max_overlapped_level;
      int output_level;
      for (int level = first_overlapped_level; level <= max_overlapped_level;
           level++) {
945
        bool disallow_trivial_move = false;
946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977
        // in case the compaction is universal or if we're compacting the
        // bottom-most level, the output level will be the same as input one.
        // level 0 can never be the bottommost level (i.e. if all files are in
        // level 0, we will compact to level 1)
        if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
            cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
          output_level = level;
        } else if (level == max_overlapped_level && level > 0) {
          if (options.bottommost_level_compaction ==
              BottommostLevelCompaction::kSkip) {
            // Skip bottommost level compaction
            continue;
          } else if (options.bottommost_level_compaction ==
                         BottommostLevelCompaction::kIfHaveCompactionFilter &&
                     cfd->ioptions()->compaction_filter == nullptr &&
                     cfd->ioptions()->compaction_filter_factory == nullptr) {
            // Skip bottommost level compaction since we don't have a compaction
            // filter
            continue;
          }
          output_level = level;
          // update max_file_num_to_ignore only for bottom level compaction
          // because data in newly compacted files in middle levels may still
          // need to be pushed down
          max_file_num_to_ignore = next_file_number;
        } else {
          output_level = level + 1;
          if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
              cfd->ioptions()->level_compaction_dynamic_level_bytes &&
              level == 0) {
            output_level = ColumnFamilyData::kCompactToBaseLevel;
          }
978 979 980 981 982 983 984 985 986
          // if it's a BottommostLevel compaction and `kForce*` compaction is
          // set, disallow trivial move
          if (level == max_overlapped_level &&
              (options.bottommost_level_compaction ==
                   BottommostLevelCompaction::kForce ||
               options.bottommost_level_compaction ==
                   BottommostLevelCompaction::kForceOptimized)) {
            disallow_trivial_move = true;
          }
987 988
        }
        s = RunManualCompaction(cfd, level, output_level, options, begin, end,
989 990
                                exclusive, disallow_trivial_move,
                                max_file_num_to_ignore);
991 992 993 994 995 996 997 998 999 1000
        if (!s.ok()) {
          break;
        }
        if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
          final_output_level = cfd->NumberLevels() - 1;
        } else if (output_level > final_output_level) {
          final_output_level = output_level;
        }
        TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
        TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
S
Siying Dong 已提交
1001 1002 1003
      }
    }
  }
1004
  if (!s.ok() || final_output_level == kInvalidLevel) {
S
Siying Dong 已提交
1005 1006 1007 1008 1009
    LogFlush(immutable_db_options_.info_log);
    return s;
  }

  if (options.change_level) {
1010 1011 1012
    TEST_SYNC_POINT("DBImpl::CompactRange:BeforeRefit:1");
    TEST_SYNC_POINT("DBImpl::CompactRange:BeforeRefit:2");

S
Siying Dong 已提交
1013 1014
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[RefitLevel] waiting for background threads to stop");
1015
    DisableManualCompaction();
S
Siying Dong 已提交
1016 1017
    s = PauseBackgroundWork();
    if (s.ok()) {
1018
      TEST_SYNC_POINT("DBImpl::CompactRange:PreRefitLevel");
S
Siying Dong 已提交
1019
      s = ReFitLevel(cfd, final_output_level, options.target_level);
1020
      TEST_SYNC_POINT("DBImpl::CompactRange:PostRefitLevel");
1021
      // ContinueBackgroundWork always return Status::OK().
1022 1023
      Status temp_s = ContinueBackgroundWork();
      assert(temp_s.ok());
S
Siying Dong 已提交
1024
    }
1025
    EnableManualCompaction();
S
Siying Dong 已提交
1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038
  }
  LogFlush(immutable_db_options_.info_log);

  {
    InstrumentedMutexLock l(&mutex_);
    // an automatic compaction that has been scheduled might have been
    // preempted by the manual compactions. Need to schedule it back.
    MaybeScheduleFlushOrCompaction();
  }

  return s;
}

1039 1040 1041 1042
Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
                            ColumnFamilyHandle* column_family,
                            const std::vector<std::string>& input_file_names,
                            const int output_level, const int output_path_id,
1043 1044
                            std::vector<std::string>* const output_file_names,
                            CompactionJobInfo* compaction_job_info) {
S
Siying Dong 已提交
1045
#ifdef ROCKSDB_LITE
1046 1047 1048 1049 1050 1051
  (void)compact_options;
  (void)column_family;
  (void)input_file_names;
  (void)output_level;
  (void)output_path_id;
  (void)output_file_names;
1052
  (void)compaction_job_info;
1053
  // not supported in lite version
S
Siying Dong 已提交
1054 1055 1056 1057 1058 1059
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  if (column_family == nullptr) {
    return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
  }

1060 1061
  auto cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
S
Siying Dong 已提交
1062 1063 1064 1065 1066 1067 1068 1069
  assert(cfd);

  Status s;
  JobContext job_context(0, true);
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());

  // Perform CompactFiles
D
DorianZheng 已提交
1070
  TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
S
Siying Dong 已提交
1071 1072 1073 1074 1075 1076 1077
  {
    InstrumentedMutexLock l(&mutex_);

    // This call will unlock/lock the mutex to wait for current running
    // IngestExternalFile() calls to finish.
    WaitForIngestFile();

D
DorianZheng 已提交
1078 1079 1080 1081 1082 1083
    // We need to get current after `WaitForIngestFile`, because
    // `IngestExternalFile` may add files that overlap with `input_file_names`
    auto* current = cfd->current();
    current->Ref();

    s = CompactFilesImpl(compact_options, cfd, current, input_file_names,
1084
                         output_file_names, output_level, output_path_id,
1085
                         &job_context, &log_buffer, compaction_job_info);
D
DorianZheng 已提交
1086 1087

    current->Unref();
S
Siying Dong 已提交
1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100
  }

  // Find and delete obsolete files
  {
    InstrumentedMutexLock l(&mutex_);
    // If !s.ok(), this means that Compaction failed. In that case, we want
    // to delete all obsolete files we might have created and we force
    // FindObsoleteFiles(). This is because job_context does not
    // catch all created files if compaction failed.
    FindObsoleteFiles(&job_context, !s.ok());
  }  // release the mutex

  // delete unnecessary files if any, this is done outside the mutex
1101 1102
  if (job_context.HaveSomethingToClean() ||
      job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
S
Siying Dong 已提交
1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123
    // Have to flush the info logs before bg_compaction_scheduled_--
    // because if bg_flush_scheduled_ becomes 0 and the lock is
    // released, the deconstructor of DB can kick in and destroy all the
    // states of DB so info_log might not be available after that point.
    // It also applies to access other states that DB owns.
    log_buffer.FlushBufferToLog();
    if (job_context.HaveSomethingToDelete()) {
      // no mutex is locked here.  No need to Unlock() and Lock() here.
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
  }

  return s;
#endif  // ROCKSDB_LITE
}

#ifndef ROCKSDB_LITE
Status DBImpl::CompactFilesImpl(
    const CompactionOptions& compact_options, ColumnFamilyData* cfd,
    Version* version, const std::vector<std::string>& input_file_names,
1124
    std::vector<std::string>* const output_file_names, const int output_level,
1125 1126
    int output_path_id, JobContext* job_context, LogBuffer* log_buffer,
    CompactionJobInfo* compaction_job_info) {
S
Siying Dong 已提交
1127 1128 1129 1130 1131
  mutex_.AssertHeld();

  if (shutting_down_.load(std::memory_order_acquire)) {
    return Status::ShutdownInProgress();
  }
1132
  if (manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
1133 1134
    return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }
S
Siying Dong 已提交
1135 1136

  std::unordered_set<uint64_t> input_set;
1137
  for (const auto& file_name : input_file_names) {
S
Siying Dong 已提交
1138 1139 1140 1141 1142 1143 1144 1145 1146
    input_set.insert(TableFileNameToNumber(file_name));
  }

  ColumnFamilyMetaData cf_meta;
  // TODO(yhchiang): can directly use version here if none of the
  // following functions call is pluggable to external developers.
  version->GetColumnFamilyMetaData(&cf_meta);

  if (output_path_id < 0) {
1147
    if (cfd->ioptions()->cf_paths.size() == 1U) {
S
Siying Dong 已提交
1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168
      output_path_id = 0;
    } else {
      return Status::NotSupported(
          "Automatic output path selection is not "
          "yet supported in CompactFiles()");
    }
  }

  Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
      &input_set, cf_meta, output_level);
  if (!s.ok()) {
    return s;
  }

  std::vector<CompactionInputFiles> input_files;
  s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
      &input_files, &input_set, version->storage_info(), compact_options);
  if (!s.ok()) {
    return s;
  }

1169
  for (const auto& inputs : input_files) {
S
Siying Dong 已提交
1170
    if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) {
S
Siying Dong 已提交
1171 1172 1173 1174 1175
      return Status::Aborted(
          "Some of the necessary compaction input "
          "files are already being compacted");
    }
  }
1176 1177 1178
  bool sfm_reserved_compact_space = false;
  // First check if we have enough room to do the compaction
  bool enough_room = EnoughRoomForCompaction(
1179
      cfd, input_files, &sfm_reserved_compact_space, log_buffer);
1180 1181 1182 1183 1184 1185

  if (!enough_room) {
    // m's vars will get set properly at the end of this function,
    // as long as status == CompactionTooLarge
    return Status::CompactionTooLarge();
  }
S
Siying Dong 已提交
1186 1187 1188 1189

  // At this point, CompactFiles will be run.
  bg_compaction_scheduled_++;

1190
  std::unique_ptr<Compaction> c;
S
Siying Dong 已提交
1191
  assert(cfd->compaction_picker());
S
Siying Dong 已提交
1192
  c.reset(cfd->compaction_picker()->CompactFiles(
S
Siying Dong 已提交
1193
      compact_options, input_files, output_level, version->storage_info(),
1194
      *cfd->GetLatestMutableCFOptions(), mutable_db_options_, output_path_id));
1195 1196 1197 1198
  // we already sanitized the set of input files and checked for conflicts
  // without releasing the lock, so we're guaranteed a compaction can be formed.
  assert(c != nullptr);

S
Siying Dong 已提交
1199 1200 1201 1202
  c->SetInputVersion(version);
  // deletion compaction currently not allowed in CompactFiles.
  assert(!c->deletion_compaction());

1203
  std::vector<SequenceNumber> snapshot_seqs;
S
Siying Dong 已提交
1204
  SequenceNumber earliest_write_conflict_snapshot;
1205 1206 1207
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);
S
Siying Dong 已提交
1208

1209 1210 1211
  std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
      new std::list<uint64_t>::iterator(
          CaptureCurrentFileNumberInPendingOutputs()));
S
Siying Dong 已提交
1212 1213

  assert(is_snapshot_supported_ || snapshots_.empty());
1214
  CompactionJobStats compaction_job_stats;
S
Siying Dong 已提交
1215
  CompactionJob compaction_job(
1216
      job_context->job_id, c.get(), immutable_db_options_,
1217
      file_options_for_compaction_, versions_.get(), &shutting_down_,
1218
      preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
1219 1220 1221 1222
      GetDataDir(c->column_family_data(), c->output_path_id()),
      GetDataDir(c->column_family_data(), 0), stats_, &mutex_, &error_handler_,
      snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
      table_cache_, &event_logger_,
1223
      c->mutable_cf_options()->paranoid_file_checks,
S
Siying Dong 已提交
1224
      c->mutable_cf_options()->report_bg_io_stats, dbname_,
1225
      &compaction_job_stats, Env::Priority::USER, io_tracer_,
1226 1227
      &manual_compaction_paused_, db_id_, db_session_id_,
      c->column_family_data()->GetFullHistoryTsLow());
S
Siying Dong 已提交
1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240

  // Creating a compaction influences the compaction score because the score
  // takes running compactions into account (by skipping files that are already
  // being compacted). Since we just changed compaction score, we recalculate it
  // here.
  version->storage_info()->ComputeCompactionScore(*cfd->ioptions(),
                                                  *c->mutable_cf_options());

  compaction_job.Prepare();

  mutex_.Unlock();
  TEST_SYNC_POINT("CompactFilesImpl:0");
  TEST_SYNC_POINT("CompactFilesImpl:1");
M
mrambacher 已提交
1241 1242
  // Ignore the status here, as it will be checked in the Install down below...
  compaction_job.Run().PermitUncheckedError();
S
Siying Dong 已提交
1243 1244 1245 1246 1247 1248
  TEST_SYNC_POINT("CompactFilesImpl:2");
  TEST_SYNC_POINT("CompactFilesImpl:3");
  mutex_.Lock();

  Status status = compaction_job.Install(*c->mutable_cf_options());
  if (status.ok()) {
1249
    assert(compaction_job.io_status().ok());
Y
Yanqin Jin 已提交
1250 1251 1252
    InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                       &job_context->superversion_contexts[0],
                                       *c->mutable_cf_options());
S
Siying Dong 已提交
1253
  }
1254 1255 1256 1257
  // status above captures any error during compaction_job.Install, so its ok
  // not check compaction_job.io_status() explicitly if we're not calling
  // SetBGError
  compaction_job.io_status().PermitUncheckedError();
S
Siying Dong 已提交
1258
  c->ReleaseCompactionFiles(s);
1259 1260 1261 1262 1263 1264 1265 1266
#ifndef ROCKSDB_LITE
  // Need to make sure SstFileManager does its bookkeeping
  auto sfm = static_cast<SstFileManagerImpl*>(
      immutable_db_options_.sst_file_manager.get());
  if (sfm && sfm_reserved_compact_space) {
    sfm->OnCompactionCompletion(c.get());
  }
#endif  // ROCKSDB_LITE
S
Siying Dong 已提交
1267 1268 1269

  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

1270 1271 1272 1273 1274
  if (compaction_job_info != nullptr) {
    BuildCompactionJobInfo(cfd, c.get(), s, compaction_job_stats,
                           job_context->job_id, version, compaction_job_info);
  }

S
Siying Dong 已提交
1275 1276
  if (status.ok()) {
    // Done
1277
  } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
S
Siying Dong 已提交
1278
    // Ignore compaction errors found during shutting down
1279 1280 1281 1282 1283 1284
  } else if (status.IsManualCompactionPaused()) {
    // Don't report stopping manual compaction as error
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[%s] [JOB %d] Stopping manual compaction",
                   c->column_family_data()->GetName().c_str(),
                   job_context->job_id);
S
Siying Dong 已提交
1285 1286 1287 1288 1289
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "[%s] [JOB %d] Compaction error: %s",
                   c->column_family_data()->GetName().c_str(),
                   job_context->job_id, status.ToString().c_str());
1290 1291
    IOStatus io_s = compaction_job.io_status();
    if (!io_s.ok()) {
1292
      error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction);
1293
    } else {
1294
      error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
1295
    }
S
Siying Dong 已提交
1296 1297
  }

1298
  if (output_file_names != nullptr) {
1299
    for (const auto& newf : c->edit()->GetNewFiles()) {
1300 1301 1302 1303
      (*output_file_names)
          .push_back(TableFileName(c->immutable_cf_options()->cf_paths,
                                   newf.second.fd.GetNumber(),
                                   newf.second.fd.GetPathId()));
1304 1305 1306
    }
  }

S
Siying Dong 已提交
1307 1308 1309 1310 1311 1312
  c.reset();

  bg_compaction_scheduled_--;
  if (bg_compaction_scheduled_ == 0) {
    bg_cv_.SignalAll();
  }
1313
  MaybeScheduleFlushOrCompaction();
1314
  TEST_SYNC_POINT("CompactFilesImpl:End");
S
Siying Dong 已提交
1315 1316 1317 1318 1319 1320 1321 1322

  return status;
}
#endif  // ROCKSDB_LITE

Status DBImpl::PauseBackgroundWork() {
  InstrumentedMutexLock guard_lock(&mutex_);
  bg_compaction_paused_++;
1323 1324
  while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
         bg_flush_scheduled_ > 0) {
S
Siying Dong 已提交
1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347
    bg_cv_.Wait();
  }
  bg_work_paused_++;
  return Status::OK();
}

Status DBImpl::ContinueBackgroundWork() {
  InstrumentedMutexLock guard_lock(&mutex_);
  if (bg_work_paused_ == 0) {
    return Status::InvalidArgument();
  }
  assert(bg_work_paused_ > 0);
  assert(bg_compaction_paused_ > 0);
  bg_compaction_paused_--;
  bg_work_paused_--;
  // It's sufficient to check just bg_work_paused_ here since
  // bg_work_paused_ is always no greater than bg_compaction_paused_
  if (bg_work_paused_ == 0) {
    MaybeScheduleFlushOrCompaction();
  }
  return Status::OK();
}

1348 1349
void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
                                     const Status& st,
P
Peter Pei 已提交
1350 1351 1352 1353 1354 1355 1356 1357 1358 1359
                                     const CompactionJobStats& job_stats,
                                     int job_id) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.empty()) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
1360
  if (c->is_manual_compaction() &&
1361
      manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
1362 1363
    return;
  }
P
Peter Pei 已提交
1364 1365 1366 1367 1368 1369
  Version* current = cfd->current();
  current->Ref();
  // release lock while notifying events
  mutex_.Unlock();
  TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex");
  {
1370
    CompactionJobInfo info{};
1371
    BuildCompactionJobInfo(cfd, c, st, job_stats, job_id, current, &info);
P
Peter Pei 已提交
1372 1373 1374
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnCompactionBegin(this, info);
    }
1375
    info.status.PermitUncheckedError();
P
Peter Pei 已提交
1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387
  }
  mutex_.Lock();
  current->Unref();
#else
  (void)cfd;
  (void)c;
  (void)st;
  (void)job_stats;
  (void)job_id;
#endif  // ROCKSDB_LITE
}

S
Siying Dong 已提交
1388
void DBImpl::NotifyOnCompactionCompleted(
1389 1390
    ColumnFamilyData* cfd, Compaction* c, const Status& st,
    const CompactionJobStats& compaction_job_stats, const int job_id) {
S
Siying Dong 已提交
1391 1392 1393 1394 1395 1396 1397 1398
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
1399
  if (c->is_manual_compaction() &&
1400
      manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
1401 1402
    return;
  }
1403 1404
  Version* current = cfd->current();
  current->Ref();
S
Siying Dong 已提交
1405 1406 1407 1408
  // release lock while notifying events
  mutex_.Unlock();
  TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
  {
1409
    CompactionJobInfo info{};
1410 1411
    BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current,
                           &info);
S
Siying Dong 已提交
1412 1413 1414 1415 1416
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnCompactionCompleted(this, info);
    }
  }
  mutex_.Lock();
1417
  current->Unref();
S
Siying Dong 已提交
1418 1419
  // no need to signal bg_cv_ as it will be signaled at the end of the
  // flush process.
1420 1421 1422 1423 1424 1425
#else
  (void)cfd;
  (void)c;
  (void)st;
  (void)compaction_job_stats;
  (void)job_id;
S
Siying Dong 已提交
1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436
#endif  // ROCKSDB_LITE
}

// REQUIREMENT: block all background work by calling PauseBackgroundWork()
// before calling this function
Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
  assert(level < cfd->NumberLevels());
  if (target_level >= cfd->NumberLevels()) {
    return Status::InvalidArgument("Target level exceeds number of levels");
  }

1437
  SuperVersionContext sv_context(/* create_superversion */ true);
S
Siying Dong 已提交
1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456

  InstrumentedMutexLock guard_lock(&mutex_);

  // only allow one thread refitting
  if (refitting_level_) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[ReFitLevel] another thread is refitting");
    return Status::NotSupported("another thread is refitting");
  }
  refitting_level_ = true;

  const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
  // move to a smaller level
  int to_level = target_level;
  if (target_level < 0) {
    to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
  }

  auto* vstorage = cfd->current()->storage_info();
1457 1458 1459
  if (to_level != level) {
    if (to_level > level) {
      if (level == 0) {
1460
        refitting_level_ = false;
S
Siying Dong 已提交
1461
        return Status::NotSupported(
1462 1463 1464 1465 1466
            "Cannot change from level 0 to other levels.");
      }
      // Check levels are empty for a trivial move
      for (int l = level + 1; l <= to_level; l++) {
        if (vstorage->NumLevelFiles(l) > 0) {
1467
          refitting_level_ = false;
1468 1469 1470 1471 1472 1473 1474 1475 1476
          return Status::NotSupported(
              "Levels between source and target are not empty for a move.");
        }
      }
    } else {
      // to_level < level
      // Check levels are empty for a trivial move
      for (int l = to_level; l < level; l++) {
        if (vstorage->NumLevelFiles(l) > 0) {
1477
          refitting_level_ = false;
1478 1479 1480
          return Status::NotSupported(
              "Levels between source and target are not empty for a move.");
        }
S
Siying Dong 已提交
1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492
      }
    }
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] Before refitting:\n%s", cfd->GetName().c_str(),
                    cfd->current()->DebugString().data());

    VersionEdit edit;
    edit.SetColumnFamily(cfd->GetID());
    for (const auto& f : vstorage->LevelFiles(level)) {
      edit.DeleteFile(level, f->fd.GetNumber());
      edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
                   f->fd.GetFileSize(), f->smallest, f->largest,
1493
                   f->fd.smallest_seqno, f->fd.largest_seqno,
1494
                   f->marked_for_compaction, f->oldest_blob_file_number,
1495 1496
                   f->oldest_ancester_time, f->file_creation_time,
                   f->file_checksum, f->file_checksum_func_name);
S
Siying Dong 已提交
1497 1498 1499 1500 1501
    }
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
                    edit.DebugString().data());

1502 1503 1504
    Status status = versions_->LogAndApply(cfd, mutable_cf_options, &edit,
                                           &mutex_, directories_.GetDbDir());

1505
    InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);
S
Siying Dong 已提交
1506 1507 1508 1509 1510 1511 1512 1513 1514

    ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
                    cfd->GetName().c_str(), status.ToString().data());

    if (status.ok()) {
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                      "[%s] After refitting:\n%s", cfd->GetName().c_str(),
                      cfd->current()->DebugString().data());
    }
1515 1516 1517 1518
    sv_context.Clean();
    refitting_level_ = false;

    return status;
S
Siying Dong 已提交
1519 1520 1521
  }

  refitting_level_ = false;
1522
  return Status::OK();
S
Siying Dong 已提交
1523 1524 1525
}

int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
1526
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
S
Siying Dong 已提交
1527 1528 1529
  return cfh->cfd()->NumberLevels();
}

A
Andrew Kryczka 已提交
1530
int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* /*column_family*/) {
S
Siying Dong 已提交
1531 1532 1533 1534
  return 0;
}

int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
1535
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
S
Siying Dong 已提交
1536
  InstrumentedMutexLock l(&mutex_);
1537 1538 1539
  return cfh->cfd()
      ->GetSuperVersion()
      ->mutable_cf_options.level0_stop_writes_trigger;
S
Siying Dong 已提交
1540 1541 1542 1543
}

Status DBImpl::Flush(const FlushOptions& flush_options,
                     ColumnFamilyHandle* column_family) {
1544
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
1545 1546
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
                 cfh->GetName().c_str());
Y
Yanqin Jin 已提交
1547
  Status s;
1548
  if (immutable_db_options_.atomic_flush) {
Y
Yanqin Jin 已提交
1549 1550 1551 1552 1553 1554
    s = AtomicFlushMemTables({cfh->cfd()}, flush_options,
                             FlushReason::kManualFlush);
  } else {
    s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);
  }

1555 1556 1557 1558
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "[%s] Manual flush finished, status: %s\n",
                 cfh->GetName().c_str(), s.ToString().c_str());
  return s;
S
Siying Dong 已提交
1559 1560
}

Y
Yanqin Jin 已提交
1561 1562
Status DBImpl::Flush(const FlushOptions& flush_options,
                     const std::vector<ColumnFamilyHandle*>& column_families) {
1563
  Status s;
1564
  if (!immutable_db_options_.atomic_flush) {
Y
Yanqin Jin 已提交
1565 1566 1567 1568 1569
    for (auto cfh : column_families) {
      s = Flush(flush_options, cfh);
      if (!s.ok()) {
        break;
      }
1570
    }
Y
Yanqin Jin 已提交
1571 1572 1573 1574 1575 1576 1577 1578
  } else {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Manual atomic flush start.\n"
                   "=====Column families:=====");
    for (auto cfh : column_families) {
      auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
      ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
                     cfhi->GetName().c_str());
1579
    }
Y
Yanqin Jin 已提交
1580 1581 1582 1583 1584 1585 1586 1587 1588 1589
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "=====End of column families list=====");
    autovector<ColumnFamilyData*> cfds;
    std::for_each(column_families.begin(), column_families.end(),
                  [&cfds](ColumnFamilyHandle* elem) {
                    auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem);
                    cfds.emplace_back(cfh->cfd());
                  });
    s = AtomicFlushMemTables(cfds, flush_options, FlushReason::kManualFlush);
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
1590 1591 1592
                   "Manual atomic flush finished, status: %s\n"
                   "=====Column families:=====",
                   s.ToString().c_str());
Y
Yanqin Jin 已提交
1593 1594 1595 1596
    for (auto cfh : column_families) {
      auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
      ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
                     cfhi->GetName().c_str());
1597
    }
Y
Yanqin Jin 已提交
1598 1599
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "=====End of column families list=====");
1600 1601 1602 1603
  }
  return s;
}

1604 1605 1606 1607 1608
Status DBImpl::RunManualCompaction(
    ColumnFamilyData* cfd, int input_level, int output_level,
    const CompactRangeOptions& compact_range_options, const Slice* begin,
    const Slice* end, bool exclusive, bool disallow_trivial_move,
    uint64_t max_file_num_to_ignore) {
S
Siying Dong 已提交
1609 1610 1611 1612 1613 1614 1615 1616
  assert(input_level == ColumnFamilyData::kCompactAllLevels ||
         input_level >= 0);

  InternalKey begin_storage, end_storage;
  CompactionArg* ca;

  bool scheduled = false;
  bool manual_conflict = false;
1617
  ManualCompactionState manual;
S
Siying Dong 已提交
1618 1619 1620
  manual.cfd = cfd;
  manual.input_level = input_level;
  manual.output_level = output_level;
1621
  manual.output_path_id = compact_range_options.target_path_id;
S
Siying Dong 已提交
1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633
  manual.done = false;
  manual.in_progress = false;
  manual.incomplete = false;
  manual.exclusive = exclusive;
  manual.disallow_trivial_move = disallow_trivial_move;
  // For universal compaction, we enforce every manual compaction to compact
  // all files.
  if (begin == nullptr ||
      cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
      cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
    manual.begin = nullptr;
  } else {
A
Amy Xu 已提交
1634
    begin_storage.SetMinPossibleForUserKey(*begin);
S
Siying Dong 已提交
1635 1636 1637 1638 1639 1640 1641
    manual.begin = &begin_storage;
  }
  if (end == nullptr ||
      cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
      cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
    manual.end = nullptr;
  } else {
A
Amy Xu 已提交
1642
    end_storage.SetMaxPossibleForUserKey(*end);
S
Siying Dong 已提交
1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666
    manual.end = &end_storage;
  }

  TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
  TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
  InstrumentedMutexLock l(&mutex_);

  // When a manual compaction arrives, temporarily disable scheduling of
  // non-manual compactions and wait until the number of scheduled compaction
  // jobs drops to zero. This is needed to ensure that this manual compaction
  // can compact any range of keys/files.
  //
  // HasPendingManualCompaction() is true when at least one thread is inside
  // RunManualCompaction(), i.e. during that time no other compaction will
  // get scheduled (see MaybeScheduleFlushOrCompaction).
  //
  // Note that the following loop doesn't stop more that one thread calling
  // RunManualCompaction() from getting to the second while loop below.
  // However, only one of them will actually schedule compaction, while
  // others will wait on a condition variable until it completes.

  AddManualCompaction(&manual);
  TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
  if (exclusive) {
1667 1668
    while (bg_bottom_compaction_scheduled_ > 0 ||
           bg_compaction_scheduled_ > 0) {
1669
      TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
S
Siying Dong 已提交
1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681
      ROCKS_LOG_INFO(
          immutable_db_options_.info_log,
          "[%s] Manual compaction waiting for all other scheduled background "
          "compactions to finish",
          cfd->GetName().c_str());
      bg_cv_.Wait();
    }
  }

  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "[%s] Manual compaction starting", cfd->GetName().c_str());

1682 1683
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());
S
Siying Dong 已提交
1684 1685 1686 1687 1688 1689
  // We don't check bg_error_ here, because if we get the error in compaction,
  // the compaction will set manual.status to bg_error_ and set manual.done to
  // true.
  while (!manual.done) {
    assert(HasPendingManualCompaction());
    manual_conflict = false;
1690
    Compaction* compaction = nullptr;
S
Siying Dong 已提交
1691 1692
    if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
        scheduled ||
1693
        (((manual.manual_end = &manual.tmp_storage1) != nullptr) &&
1694
         ((compaction = manual.cfd->CompactRange(
1695 1696 1697
               *manual.cfd->GetLatestMutableCFOptions(), mutable_db_options_,
               manual.input_level, manual.output_level, compact_range_options,
               manual.begin, manual.end, &manual.manual_end, &manual_conflict,
1698
               max_file_num_to_ignore)) == nullptr &&
1699
          manual_conflict))) {
S
Siying Dong 已提交
1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710
      // exclusive manual compactions should not see a conflict during
      // CompactRange
      assert(!exclusive || !manual_conflict);
      // Running either this or some other manual compaction
      bg_cv_.Wait();
      if (scheduled && manual.incomplete == true) {
        assert(!manual.in_progress);
        scheduled = false;
        manual.incomplete = false;
      }
    } else if (!scheduled) {
1711
      if (compaction == nullptr) {
S
Siying Dong 已提交
1712 1713 1714 1715 1716 1717
        manual.done = true;
        bg_cv_.SignalAll();
        continue;
      }
      ca = new CompactionArg;
      ca->db = this;
1718 1719 1720
      ca->prepicked_compaction = new PrepickedCompaction;
      ca->prepicked_compaction->manual_compaction_state = &manual;
      ca->prepicked_compaction->compaction = compaction;
Y
Yanqin Jin 已提交
1721 1722 1723 1724
      if (!RequestCompactionToken(
              cfd, true, &ca->prepicked_compaction->task_token, &log_buffer)) {
        // Don't throttle manual compaction, only count outstanding tasks.
        assert(false);
1725
      }
S
Siying Dong 已提交
1726 1727
      manual.incomplete = false;
      bg_compaction_scheduled_++;
1728 1729 1730 1731 1732 1733
      Env::Priority thread_pool_pri = Env::Priority::LOW;
      if (compaction->bottommost_level() &&
          env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
        thread_pool_pri = Env::Priority::BOTTOM;
      }
      env_->Schedule(&DBImpl::BGWorkCompaction, ca, thread_pool_pri, this,
1734
                     &DBImpl::UnscheduleCompactionCallback);
S
Siying Dong 已提交
1735 1736 1737 1738
      scheduled = true;
    }
  }

1739
  log_buffer.FlushBufferToLog();
S
Siying Dong 已提交
1740 1741 1742 1743 1744 1745 1746
  assert(!manual.in_progress);
  assert(HasPendingManualCompaction());
  RemoveManualCompaction(&manual);
  bg_cv_.SignalAll();
  return manual.status;
}

Y
Yanqin Jin 已提交
1747 1748 1749
void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
                                  FlushRequest* req) {
  assert(req != nullptr);
1750
  req->reserve(cfds.size());
Y
Yanqin Jin 已提交
1751 1752 1753 1754 1755 1756 1757 1758 1759 1760
  for (const auto cfd : cfds) {
    if (nullptr == cfd) {
      // cfd may be null, see DBImpl::ScheduleFlushes
      continue;
    }
    uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID();
    req->emplace_back(cfd, max_memtable_id);
  }
}

S
Siying Dong 已提交
1761 1762
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                             const FlushOptions& flush_options,
1763
                             FlushReason flush_reason, bool writes_stopped) {
1764 1765
  // This method should not be called if atomic_flush is true.
  assert(!immutable_db_options_.atomic_flush);
S
Siying Dong 已提交
1766
  Status s;
1767 1768 1769 1770 1771 1772 1773 1774
  if (!flush_options.allow_write_stall) {
    bool flush_needed = true;
    s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
    TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
    if (!s.ok() || !flush_needed) {
      return s;
    }
  }
1775

1776 1777
  autovector<FlushRequest> flush_reqs;
  autovector<uint64_t> memtable_ids_to_wait;
S
Siying Dong 已提交
1778 1779 1780 1781 1782
  {
    WriteContext context;
    InstrumentedMutexLock guard_lock(&mutex_);

    WriteThread::Writer w;
1783
    WriteThread::Writer nonmem_w;
S
Siying Dong 已提交
1784 1785
    if (!writes_stopped) {
      write_thread_.EnterUnbatched(&w, &mutex_);
1786 1787 1788
      if (two_write_queues_) {
        nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
      }
S
Siying Dong 已提交
1789
    }
1790
    WaitForPendingWrites();
S
Siying Dong 已提交
1791

1792 1793 1794 1795 1796 1797 1798 1799 1800 1801
    if (flush_reason != FlushReason::kErrorRecoveryRetryFlush &&
        (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load())) {
      // Note that, when flush reason is kErrorRecoveryRetryFlush, during the
      // auto retry resume, we want to avoid creating new small memtables.
      // Therefore, SwitchMemtable will not be called. Also, since ResumeImpl
      // will iterate through all the CFs and call FlushMemtable during auto
      // retry resume, it is possible that in some CFs,
      // cfd->imm()->NumNotFlushed() = 0. In this case, so no flush request will
      // be created and scheduled, status::OK() will be returned.
      s = SwitchMemtable(cfd, &context);
1802
    }
1803
    const uint64_t flush_memtable_id = port::kMaxUint64;
1804 1805 1806
    if (s.ok()) {
      if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
          !cached_recoverable_state_empty_.load()) {
1807 1808 1809
        FlushRequest req{{cfd, flush_memtable_id}};
        flush_reqs.emplace_back(std::move(req));
        memtable_ids_to_wait.emplace_back(cfd->imm()->GetLatestMemTableID());
1810
      }
1811 1812
      if (immutable_db_options_.persist_stats_to_disk &&
          flush_reason != FlushReason::kErrorRecoveryRetryFlush) {
1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831
        ColumnFamilyData* cfd_stats =
            versions_->GetColumnFamilySet()->GetColumnFamily(
                kPersistentStatsColumnFamilyName);
        if (cfd_stats != nullptr && cfd_stats != cfd &&
            !cfd_stats->mem()->IsEmpty()) {
          // only force flush stats CF when it will be the only CF lagging
          // behind after the current flush
          bool stats_cf_flush_needed = true;
          for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
            if (loop_cfd == cfd_stats || loop_cfd == cfd) {
              continue;
            }
            if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
              stats_cf_flush_needed = false;
            }
          }
          if (stats_cf_flush_needed) {
            ROCKS_LOG_INFO(immutable_db_options_.info_log,
                           "Force flushing stats CF with manual flush of %s "
1832 1833
                           "to avoid holding old logs",
                           cfd->GetName().c_str());
1834
            s = SwitchMemtable(cfd_stats, &context);
1835 1836 1837 1838
            FlushRequest req{{cfd_stats, flush_memtable_id}};
            flush_reqs.emplace_back(std::move(req));
            memtable_ids_to_wait.emplace_back(
                cfd->imm()->GetLatestMemTableID());
1839 1840 1841
          }
        }
      }
1842
    }
1843 1844 1845 1846 1847

    if (s.ok() && !flush_reqs.empty()) {
      for (const auto& req : flush_reqs) {
        assert(req.size() == 1);
        ColumnFamilyData* loop_cfd = req[0].first;
1848 1849
        loop_cfd->imm()->FlushRequested();
      }
1850 1851 1852 1853 1854
      // If the caller wants to wait for this flush to complete, it indicates
      // that the caller expects the ColumnFamilyData not to be free'ed by
      // other threads which may drop the column family concurrently.
      // Therefore, we increase the cfd's ref count.
      if (flush_options.wait) {
1855 1856 1857
        for (const auto& req : flush_reqs) {
          assert(req.size() == 1);
          ColumnFamilyData* loop_cfd = req[0].first;
1858 1859 1860
          loop_cfd->Ref();
        }
      }
1861 1862 1863
      for (const auto& req : flush_reqs) {
        SchedulePendingFlush(req, flush_reason);
      }
1864 1865
      MaybeScheduleFlushOrCompaction();
    }
S
Siying Dong 已提交
1866 1867 1868

    if (!writes_stopped) {
      write_thread_.ExitUnbatched(&w);
1869 1870 1871
      if (two_write_queues_) {
        nonmem_write_thread_.ExitUnbatched(&nonmem_w);
      }
S
Siying Dong 已提交
1872 1873
    }
  }
1874 1875
  TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
  TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");
S
Siying Dong 已提交
1876
  if (s.ok() && flush_options.wait) {
1877 1878
    autovector<ColumnFamilyData*> cfds;
    autovector<const uint64_t*> flush_memtable_ids;
1879 1880 1881 1882 1883
    assert(flush_reqs.size() == memtable_ids_to_wait.size());
    for (size_t i = 0; i < flush_reqs.size(); ++i) {
      assert(flush_reqs[i].size() == 1);
      cfds.push_back(flush_reqs[i][0].first);
      flush_memtable_ids.push_back(&(memtable_ids_to_wait[i]));
1884
    }
1885 1886 1887 1888
    s = WaitForFlushMemTables(
        cfds, flush_memtable_ids,
        (flush_reason == FlushReason::kErrorRecovery ||
         flush_reason == FlushReason::kErrorRecoveryRetryFlush));
1889
    InstrumentedMutexLock lock_guard(&mutex_);
1890
    for (auto* tmp_cfd : cfds) {
1891
      tmp_cfd->UnrefAndTryDelete();
1892
    }
S
Siying Dong 已提交
1893
  }
1894
  TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished");
S
Siying Dong 已提交
1895 1896 1897
  return s;
}

1898
// Flush all elements in 'column_family_datas'
Y
Yanqin Jin 已提交
1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926
// and atomically record the result to the MANIFEST.
Status DBImpl::AtomicFlushMemTables(
    const autovector<ColumnFamilyData*>& column_family_datas,
    const FlushOptions& flush_options, FlushReason flush_reason,
    bool writes_stopped) {
  Status s;
  if (!flush_options.allow_write_stall) {
    int num_cfs_to_flush = 0;
    for (auto cfd : column_family_datas) {
      bool flush_needed = true;
      s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
      if (!s.ok()) {
        return s;
      } else if (flush_needed) {
        ++num_cfs_to_flush;
      }
    }
    if (0 == num_cfs_to_flush) {
      return s;
    }
  }
  FlushRequest flush_req;
  autovector<ColumnFamilyData*> cfds;
  {
    WriteContext context;
    InstrumentedMutexLock guard_lock(&mutex_);

    WriteThread::Writer w;
1927
    WriteThread::Writer nonmem_w;
Y
Yanqin Jin 已提交
1928 1929
    if (!writes_stopped) {
      write_thread_.EnterUnbatched(&w, &mutex_);
1930 1931 1932
      if (two_write_queues_) {
        nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
      }
Y
Yanqin Jin 已提交
1933
    }
1934
    WaitForPendingWrites();
Y
Yanqin Jin 已提交
1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945

    for (auto cfd : column_family_datas) {
      if (cfd->IsDropped()) {
        continue;
      }
      if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
          !cached_recoverable_state_empty_.load()) {
        cfds.emplace_back(cfd);
      }
    }
    for (auto cfd : cfds) {
1946 1947
      if ((cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) ||
          flush_reason == FlushReason::kErrorRecoveryRetryFlush) {
1948 1949
        continue;
      }
Y
Yanqin Jin 已提交
1950 1951
      cfd->Ref();
      s = SwitchMemtable(cfd, &context);
1952
      cfd->UnrefAndTryDelete();
Y
Yanqin Jin 已提交
1953 1954 1955 1956 1957 1958 1959 1960 1961
      if (!s.ok()) {
        break;
      }
    }
    if (s.ok()) {
      AssignAtomicFlushSeq(cfds);
      for (auto cfd : cfds) {
        cfd->imm()->FlushRequested();
      }
1962 1963 1964 1965 1966 1967 1968 1969 1970
      // If the caller wants to wait for this flush to complete, it indicates
      // that the caller expects the ColumnFamilyData not to be free'ed by
      // other threads which may drop the column family concurrently.
      // Therefore, we increase the cfd's ref count.
      if (flush_options.wait) {
        for (auto cfd : cfds) {
          cfd->Ref();
        }
      }
Y
Yanqin Jin 已提交
1971 1972 1973 1974 1975 1976 1977
      GenerateFlushRequest(cfds, &flush_req);
      SchedulePendingFlush(flush_req, flush_reason);
      MaybeScheduleFlushOrCompaction();
    }

    if (!writes_stopped) {
      write_thread_.ExitUnbatched(&w);
1978 1979 1980
      if (two_write_queues_) {
        nonmem_write_thread_.ExitUnbatched(&nonmem_w);
      }
Y
Yanqin Jin 已提交
1981 1982
    }
  }
1983
  TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
1984
  TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
Y
Yanqin Jin 已提交
1985 1986 1987 1988 1989
  if (s.ok() && flush_options.wait) {
    autovector<const uint64_t*> flush_memtable_ids;
    for (auto& iter : flush_req) {
      flush_memtable_ids.push_back(&(iter.second));
    }
1990 1991 1992 1993
    s = WaitForFlushMemTables(
        cfds, flush_memtable_ids,
        (flush_reason == FlushReason::kErrorRecovery ||
         flush_reason == FlushReason::kErrorRecoveryRetryFlush));
1994
    InstrumentedMutexLock lock_guard(&mutex_);
1995
    for (auto* cfd : cfds) {
1996
      cfd->UnrefAndTryDelete();
1997
    }
Y
Yanqin Jin 已提交
1998 1999 2000 2001
  }
  return s;
}

2002 2003 2004 2005 2006 2007 2008
// Calling FlushMemTable(), whether from DB::Flush() or from Backup Engine, can
// cause write stall, for example if one memtable is being flushed already.
// This method tries to avoid write stall (similar to CompactRange() behavior)
// it emulates how the SuperVersion / LSM would change if flush happens, checks
// it against various constrains and delays flush if it'd cause write stall.
// Called should check status and flush_needed to see if flush already happened.
Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
2009
                                                 bool* flush_needed) {
2010 2011 2012 2013 2014 2015 2016
  {
    *flush_needed = true;
    InstrumentedMutexLock l(&mutex_);
    uint64_t orig_active_memtable_id = cfd->mem()->GetID();
    WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
    do {
      if (write_stall_condition != WriteStallCondition::kNormal) {
2017 2018 2019 2020 2021 2022 2023 2024
        // Same error handling as user writes: Don't wait if there's a
        // background error, even if it's a soft error. We might wait here
        // indefinitely as the pending flushes/compactions may never finish
        // successfully, resulting in the stall condition lasting indefinitely
        if (error_handler_.IsBGWorkStopped()) {
          return error_handler_.GetBGError();
        }

2025 2026 2027 2028 2029 2030 2031
        TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "[%s] WaitUntilFlushWouldNotStallWrites"
                       " waiting on stall conditions to clear",
                       cfd->GetName().c_str());
        bg_cv_.Wait();
      }
2032 2033 2034 2035
      if (cfd->IsDropped()) {
        return Status::ColumnFamilyDropped();
      }
      if (shutting_down_.load(std::memory_order_acquire)) {
2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064
        return Status::ShutdownInProgress();
      }

      uint64_t earliest_memtable_id =
          std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
      if (earliest_memtable_id > orig_active_memtable_id) {
        // We waited so long that the memtable we were originally waiting on was
        // flushed.
        *flush_needed = false;
        return Status::OK();
      }

      const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
      const auto* vstorage = cfd->current()->storage_info();

      // Skip stalling check if we're below auto-flush and auto-compaction
      // triggers. If it stalled in these conditions, that'd mean the stall
      // triggers are so low that stalling is needed for any background work. In
      // that case we shouldn't wait since background work won't be scheduled.
      if (cfd->imm()->NumNotFlushed() <
              cfd->ioptions()->min_write_buffer_number_to_merge &&
          vstorage->l0_delay_trigger_count() <
              mutable_cf_options.level0_file_num_compaction_trigger) {
        break;
      }

      // check whether one extra immutable memtable or an extra L0 file would
      // cause write stalling mode to be entered. It could still enter stall
      // mode due to pending compaction bytes, but that's less common
F
fanrui03 已提交
2065 2066 2067 2068 2069 2070
      write_stall_condition = ColumnFamilyData::GetWriteStallConditionAndCause(
                                  cfd->imm()->NumNotFlushed() + 1,
                                  vstorage->l0_delay_trigger_count() + 1,
                                  vstorage->estimated_compaction_needed_bytes(),
                                  mutable_cf_options, *cfd->ioptions())
                                  .first;
2071 2072 2073 2074 2075
    } while (write_stall_condition != WriteStallCondition::kNormal);
  }
  return Status::OK();
}

2076 2077 2078 2079 2080 2081 2082 2083
// Wait for memtables to be flushed for multiple column families.
// let N = cfds.size()
// for i in [0, N),
//  1) if flush_memtable_ids[i] is not null, then the memtables with lower IDs
//     have to be flushed for THIS column family;
//  2) if flush_memtable_ids[i] is null, then all memtables in THIS column
//     family have to be flushed.
// Finish waiting when ALL column families finish flushing memtables.
Y
Yanqin Jin 已提交
2084 2085
// resuming_from_bg_err indicates whether the caller is trying to resume from
// background error or in normal processing.
2086 2087
Status DBImpl::WaitForFlushMemTables(
    const autovector<ColumnFamilyData*>& cfds,
Y
Yanqin Jin 已提交
2088 2089
    const autovector<const uint64_t*>& flush_memtable_ids,
    bool resuming_from_bg_err) {
2090
  int num = static_cast<int>(cfds.size());
S
Siying Dong 已提交
2091 2092
  // Wait until the compaction completes
  InstrumentedMutexLock l(&mutex_);
Y
Yanqin Jin 已提交
2093 2094 2095
  // If the caller is trying to resume from bg error, then
  // error_handler_.IsDBStopped() is true.
  while (resuming_from_bg_err || !error_handler_.IsDBStopped()) {
S
Siying Dong 已提交
2096 2097 2098
    if (shutting_down_.load(std::memory_order_acquire)) {
      return Status::ShutdownInProgress();
    }
Y
Yanqin Jin 已提交
2099 2100 2101 2102
    // If an error has occurred during resumption, then no need to wait.
    if (!error_handler_.GetRecoveryError().ok()) {
      break;
    }
2103 2104 2105 2106 2107 2108 2109
    // If BGWorkStopped, which indicate that there is a BG error and
    // 1) soft error but requires no BG work, 2) no in auto_recovery_
    if (!resuming_from_bg_err && error_handler_.IsBGWorkStopped() &&
        error_handler_.GetBGError().severity() < Status::Severity::kHardError) {
      return error_handler_.GetBGError();
    }

2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124
    // Number of column families that have been dropped.
    int num_dropped = 0;
    // Number of column families that have finished flush.
    int num_finished = 0;
    for (int i = 0; i < num; ++i) {
      if (cfds[i]->IsDropped()) {
        ++num_dropped;
      } else if (cfds[i]->imm()->NumNotFlushed() == 0 ||
                 (flush_memtable_ids[i] != nullptr &&
                  cfds[i]->imm()->GetEarliestMemTableID() >
                      *flush_memtable_ids[i])) {
        ++num_finished;
      }
    }
    if (1 == num_dropped && 1 == num) {
2125
      return Status::ColumnFamilyDropped();
S
Siying Dong 已提交
2126
    }
2127 2128 2129 2130 2131
    // Column families involved in this flush request have either been dropped
    // or finished flush. Then it's time to finish waiting.
    if (num_dropped + num_finished == num) {
      break;
    }
S
Siying Dong 已提交
2132 2133
    bg_cv_.Wait();
  }
2134
  Status s;
Y
Yanqin Jin 已提交
2135 2136 2137
  // If not resuming from bg error, and an error has caused the DB to stop,
  // then report the bg error to caller.
  if (!resuming_from_bg_err && error_handler_.IsDBStopped()) {
2138
    s = error_handler_.GetBGError();
S
Siying Dong 已提交
2139 2140 2141 2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156
  }
  return s;
}

Status DBImpl::EnableAutoCompaction(
    const std::vector<ColumnFamilyHandle*>& column_family_handles) {
  Status s;
  for (auto cf_ptr : column_family_handles) {
    Status status =
        this->SetOptions(cf_ptr, {{"disable_auto_compactions", "false"}});
    if (!status.ok()) {
      s = status;
    }
  }

  return s;
}

2157
void DBImpl::DisableManualCompaction() {
2158 2159 2160 2161 2162 2163 2164 2165 2166
  InstrumentedMutexLock l(&mutex_);
  manual_compaction_paused_.fetch_add(1, std::memory_order_release);
  // Wait for any pending manual compactions to finish (typically through
  // failing with `Status::Incomplete`) prior to returning. This way we are
  // guaranteed no pending manual compaction will commit while manual
  // compactions are "disabled".
  while (HasPendingManualCompaction()) {
    bg_cv_.Wait();
  }
2167 2168 2169
}

void DBImpl::EnableManualCompaction() {
2170 2171 2172
  InstrumentedMutexLock l(&mutex_);
  assert(manual_compaction_paused_ > 0);
  manual_compaction_paused_.fetch_sub(1, std::memory_order_release);
2173 2174
}

S
Siying Dong 已提交
2175 2176 2177 2178 2179 2180 2181 2182 2183
void DBImpl::MaybeScheduleFlushOrCompaction() {
  mutex_.AssertHeld();
  if (!opened_successfully_) {
    // Compaction may introduce data race to DB open
    return;
  }
  if (bg_work_paused_ > 0) {
    // we paused the background work
    return;
2184
  } else if (error_handler_.IsBGWorkStopped() &&
2185
             !error_handler_.IsRecoveryInProgress()) {
2186 2187 2188 2189
    // There has been a hard error and this call is not part of the recovery
    // sequence. Bail out here so we don't get into an endless loop of
    // scheduling BG work which will again call this function
    return;
S
Siying Dong 已提交
2190 2191 2192 2193
  } else if (shutting_down_.load(std::memory_order_acquire)) {
    // DB is being deleted; no more background compactions
    return;
  }
2194
  auto bg_job_limits = GetBGJobLimits();
2195
  bool is_flush_pool_empty =
2196
      env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
2197
  while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
2198
         bg_flush_scheduled_ < bg_job_limits.max_flushes) {
S
Siying Dong 已提交
2199
    bg_flush_scheduled_++;
2200 2201 2202 2203 2204
    FlushThreadArg* fta = new FlushThreadArg;
    fta->db_ = this;
    fta->thread_pri_ = Env::Priority::HIGH;
    env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this,
                   &DBImpl::UnscheduleFlushCallback);
2205 2206 2207 2208
    --unscheduled_flushes_;
    TEST_SYNC_POINT_CALLBACK(
        "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0",
        &unscheduled_flushes_);
S
Siying Dong 已提交
2209 2210
  }

2211 2212 2213
  // special case -- if high-pri (flush) thread pool is empty, then schedule
  // flushes in low-pri (compaction) thread pool.
  if (is_flush_pool_empty) {
S
Siying Dong 已提交
2214 2215
    while (unscheduled_flushes_ > 0 &&
           bg_flush_scheduled_ + bg_compaction_scheduled_ <
2216
               bg_job_limits.max_flushes) {
S
Siying Dong 已提交
2217
      bg_flush_scheduled_++;
2218 2219 2220 2221 2222
      FlushThreadArg* fta = new FlushThreadArg;
      fta->db_ = this;
      fta->thread_pri_ = Env::Priority::LOW;
      env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this,
                     &DBImpl::UnscheduleFlushCallback);
2223
      --unscheduled_flushes_;
S
Siying Dong 已提交
2224 2225 2226 2227 2228 2229
    }
  }

  if (bg_compaction_paused_ > 0) {
    // we paused the background compaction
    return;
2230 2231 2232 2233 2234 2235
  } else if (error_handler_.IsBGWorkStopped()) {
    // Compaction is not part of the recovery sequence from a hard error. We
    // might get here because recovery might do a flush and install a new
    // super version, which will try to schedule pending compactions. Bail
    // out here and let the higher level recovery handle compactions
    return;
S
Siying Dong 已提交
2236 2237 2238 2239 2240
  }

  if (HasExclusiveManualCompaction()) {
    // only manual compactions are allowed to run. don't schedule automatic
    // compactions
2241
    TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Conflict");
S
Siying Dong 已提交
2242 2243 2244
    return;
  }

2245
  while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
S
Siying Dong 已提交
2246 2247 2248
         unscheduled_compactions_ > 0) {
    CompactionArg* ca = new CompactionArg;
    ca->db = this;
2249
    ca->prepicked_compaction = nullptr;
S
Siying Dong 已提交
2250 2251 2252
    bg_compaction_scheduled_++;
    unscheduled_compactions_--;
    env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
2253
                   &DBImpl::UnscheduleCompactionCallback);
S
Siying Dong 已提交
2254 2255 2256
  }
}

2257
DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
S
Siying Dong 已提交
2258
  mutex_.AssertHeld();
2259
  return GetBGJobLimits(mutable_db_options_.max_background_flushes,
2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274
                        mutable_db_options_.max_background_compactions,
                        mutable_db_options_.max_background_jobs,
                        write_controller_.NeedSpeedupCompaction());
}

DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
                                           int max_background_compactions,
                                           int max_background_jobs,
                                           bool parallelize_compactions) {
  BGJobLimits res;
  if (max_background_flushes == -1 && max_background_compactions == -1) {
    // for our first stab implementing max_background_jobs, simply allocate a
    // quarter of the threads to flushes.
    res.max_flushes = std::max(1, max_background_jobs / 4);
    res.max_compactions = std::max(1, max_background_jobs - res.max_flushes);
S
Siying Dong 已提交
2275
  } else {
2276 2277 2278 2279 2280 2281 2282 2283
    // compatibility code in case users haven't migrated to max_background_jobs,
    // which automatically computes flush/compaction limits
    res.max_flushes = std::max(1, max_background_flushes);
    res.max_compactions = std::max(1, max_background_compactions);
  }
  if (!parallelize_compactions) {
    // throttle background compactions until we deem necessary
    res.max_compactions = 1;
S
Siying Dong 已提交
2284
  }
2285
  return res;
S
Siying Dong 已提交
2286 2287 2288
}

void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
2289
  assert(!cfd->queued_for_compaction());
S
Siying Dong 已提交
2290 2291
  cfd->Ref();
  compaction_queue_.push_back(cfd);
2292
  cfd->set_queued_for_compaction(true);
S
Siying Dong 已提交
2293 2294 2295 2296 2297 2298
}

ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
  assert(!compaction_queue_.empty());
  auto cfd = *compaction_queue_.begin();
  compaction_queue_.pop_front();
2299 2300
  assert(cfd->queued_for_compaction());
  cfd->set_queued_for_compaction(false);
S
Siying Dong 已提交
2301 2302 2303
  return cfd;
}

2304
DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
S
Siying Dong 已提交
2305
  assert(!flush_queue_.empty());
2306
  FlushRequest flush_req = flush_queue_.front();
S
Siying Dong 已提交
2307
  flush_queue_.pop_front();
2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318
  if (!immutable_db_options_.atomic_flush) {
    assert(flush_req.size() == 1);
  }
  for (const auto& elem : flush_req) {
    if (!immutable_db_options_.atomic_flush) {
      ColumnFamilyData* cfd = elem.first;
      assert(cfd);
      assert(cfd->queued_for_flush());
      cfd->set_queued_for_flush(false);
    }
  }
2319
  // TODO: need to unset flush reason?
2320
  return flush_req;
S
Siying Dong 已提交
2321 2322
}

2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341
ColumnFamilyData* DBImpl::PickCompactionFromQueue(
    std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer) {
  assert(!compaction_queue_.empty());
  assert(*token == nullptr);
  autovector<ColumnFamilyData*> throttled_candidates;
  ColumnFamilyData* cfd = nullptr;
  while (!compaction_queue_.empty()) {
    auto first_cfd = *compaction_queue_.begin();
    compaction_queue_.pop_front();
    assert(first_cfd->queued_for_compaction());
    if (!RequestCompactionToken(first_cfd, false, token, log_buffer)) {
      throttled_candidates.push_back(first_cfd);
      continue;
    }
    cfd = first_cfd;
    cfd->set_queued_for_compaction(false);
    break;
  }
  // Add throttled compaction candidates back to queue in the original order.
Y
Yanqin Jin 已提交
2342 2343
  for (auto iter = throttled_candidates.rbegin();
       iter != throttled_candidates.rend(); ++iter) {
2344 2345 2346 2347 2348
    compaction_queue_.push_front(*iter);
  }
  return cfd;
}

2349
void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
2350
                                  FlushReason flush_reason) {
2351
  mutex_.AssertHeld();
2352 2353 2354
  if (flush_req.empty()) {
    return;
  }
2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375
  if (!immutable_db_options_.atomic_flush) {
    // For the non-atomic flush case, we never schedule multiple column
    // families in the same flush request.
    assert(flush_req.size() == 1);
    ColumnFamilyData* cfd = flush_req[0].first;
    assert(cfd);
    if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) {
      cfd->Ref();
      cfd->set_queued_for_flush(true);
      cfd->SetFlushReason(flush_reason);
      ++unscheduled_flushes_;
      flush_queue_.push_back(flush_req);
    }
  } else {
    for (auto& iter : flush_req) {
      ColumnFamilyData* cfd = iter.first;
      cfd->Ref();
      cfd->SetFlushReason(flush_reason);
    }
    ++unscheduled_flushes_;
    flush_queue_.push_back(flush_req);
S
Siying Dong 已提交
2376 2377 2378 2379
  }
}

void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
2380
  mutex_.AssertHeld();
2381
  if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
S
Siying Dong 已提交
2382 2383 2384 2385 2386
    AddToCompactionQueue(cfd);
    ++unscheduled_compactions_;
  }
}

2387 2388
void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
                                  FileType type, uint64_t number, int job_id) {
S
Siying Dong 已提交
2389
  mutex_.AssertHeld();
2390
  PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id);
Y
Yi Wu 已提交
2391
  purge_files_.insert({{number, std::move(file_info)}});
S
Siying Dong 已提交
2392 2393
}

2394 2395 2396 2397 2398
void DBImpl::BGWorkFlush(void* arg) {
  FlushThreadArg fta = *(reinterpret_cast<FlushThreadArg*>(arg));
  delete reinterpret_cast<FlushThreadArg*>(arg);

  IOSTATS_SET_THREAD_POOL_ID(fta.thread_pri_);
S
Siying Dong 已提交
2399
  TEST_SYNC_POINT("DBImpl::BGWorkFlush");
2400
  static_cast_with_check<DBImpl>(fta.db_)->BackgroundCallFlush(fta.thread_pri_);
S
Siying Dong 已提交
2401 2402 2403 2404 2405 2406 2407 2408
  TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
}

void DBImpl::BGWorkCompaction(void* arg) {
  CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
  delete reinterpret_cast<CompactionArg*>(arg);
  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
  TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
2409 2410
  auto prepicked_compaction =
      static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
2411
  static_cast_with_check<DBImpl>(ca.db)->BackgroundCallCompaction(
2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422 2423 2424 2425
      prepicked_compaction, Env::Priority::LOW);
  delete prepicked_compaction;
}

void DBImpl::BGWorkBottomCompaction(void* arg) {
  CompactionArg ca = *(static_cast<CompactionArg*>(arg));
  delete static_cast<CompactionArg*>(arg);
  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
  TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
  auto* prepicked_compaction = ca.prepicked_compaction;
  assert(prepicked_compaction && prepicked_compaction->compaction &&
         !prepicked_compaction->manual_compaction_state);
  ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
  delete prepicked_compaction;
S
Siying Dong 已提交
2426 2427 2428 2429 2430 2431 2432 2433 2434
}

void DBImpl::BGWorkPurge(void* db) {
  IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
  TEST_SYNC_POINT("DBImpl::BGWorkPurge:start");
  reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge();
  TEST_SYNC_POINT("DBImpl::BGWorkPurge:end");
}

2435
void DBImpl::UnscheduleCompactionCallback(void* arg) {
S
Siying Dong 已提交
2436 2437
  CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
  delete reinterpret_cast<CompactionArg*>(arg);
2438 2439 2440 2441 2442
  if (ca.prepicked_compaction != nullptr) {
    if (ca.prepicked_compaction->compaction != nullptr) {
      delete ca.prepicked_compaction->compaction;
    }
    delete ca.prepicked_compaction;
S
Siying Dong 已提交
2443
  }
2444 2445 2446 2447 2448 2449
  TEST_SYNC_POINT("DBImpl::UnscheduleCompactionCallback");
}

void DBImpl::UnscheduleFlushCallback(void* arg) {
  delete reinterpret_cast<FlushThreadArg*>(arg);
  TEST_SYNC_POINT("DBImpl::UnscheduleFlushCallback");
S
Siying Dong 已提交
2450 2451 2452
}

Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
2453 2454
                               LogBuffer* log_buffer, FlushReason* reason,
                               Env::Priority thread_pri) {
S
Siying Dong 已提交
2455 2456
  mutex_.AssertHeld();

2457
  Status status;
2458 2459 2460
  *reason = FlushReason::kOthers;
  // If BG work is stopped due to an error, but a recovery is in progress,
  // that means this flush is part of the recovery. So allow it to go through
2461 2462 2463 2464
  if (!error_handler_.IsBGWorkStopped()) {
    if (shutting_down_.load(std::memory_order_acquire)) {
      status = Status::ShutdownInProgress();
    }
2465
  } else if (!error_handler_.IsRecoveryInProgress()) {
2466
    status = error_handler_.GetBGError();
S
Siying Dong 已提交
2467 2468 2469 2470 2471 2472
  }

  if (!status.ok()) {
    return status;
  }

2473 2474 2475
  autovector<BGFlushArg> bg_flush_args;
  std::vector<SuperVersionContext>& superversion_contexts =
      job_context->superversion_contexts;
2476
  autovector<ColumnFamilyData*> column_families_not_to_flush;
S
Siying Dong 已提交
2477 2478
  while (!flush_queue_.empty()) {
    // This cfd is already referenced
2479 2480 2481 2482 2483 2484 2485 2486
    const FlushRequest& flush_req = PopFirstFromFlushQueue();
    superversion_contexts.clear();
    superversion_contexts.reserve(flush_req.size());

    for (const auto& iter : flush_req) {
      ColumnFamilyData* cfd = iter.first;
      if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
        // can't flush this CF, try next one
2487
        column_families_not_to_flush.push_back(cfd);
2488
        continue;
S
Siying Dong 已提交
2489
      }
2490 2491 2492 2493 2494 2495
      superversion_contexts.emplace_back(SuperVersionContext(true));
      bg_flush_args.emplace_back(cfd, iter.second,
                                 &(superversion_contexts.back()));
    }
    if (!bg_flush_args.empty()) {
      break;
S
Siying Dong 已提交
2496 2497 2498
    }
  }

2499
  if (!bg_flush_args.empty()) {
2500
    auto bg_job_limits = GetBGJobLimits();
2501 2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513
    for (const auto& arg : bg_flush_args) {
      ColumnFamilyData* cfd = arg.cfd_;
      ROCKS_LOG_BUFFER(
          log_buffer,
          "Calling FlushMemTableToOutputFile with column "
          "family [%s], flush slots available %d, compaction slots available "
          "%d, "
          "flush slots scheduled %d, compaction slots scheduled %d",
          cfd->GetName().c_str(), bg_job_limits.max_flushes,
          bg_job_limits.max_compactions, bg_flush_scheduled_,
          bg_compaction_scheduled_);
    }
    status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
2514
                                         job_context, log_buffer, thread_pri);
2515
    TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush");
2516 2517 2518
    // All the CFDs in the FlushReq must have the same flush reason, so just
    // grab the first one
    *reason = bg_flush_args[0].cfd_->GetFlushReason();
2519 2520
    for (auto& arg : bg_flush_args) {
      ColumnFamilyData* cfd = arg.cfd_;
2521
      if (cfd->UnrefAndTryDelete()) {
2522 2523
        arg.cfd_ = nullptr;
      }
S
Siying Dong 已提交
2524 2525
    }
  }
2526
  for (auto cfd : column_families_not_to_flush) {
2527
    cfd->UnrefAndTryDelete();
2528
  }
S
Siying Dong 已提交
2529 2530 2531
  return status;
}

2532
void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
S
Siying Dong 已提交
2533 2534 2535 2536 2537 2538 2539 2540 2541
  bool made_progress = false;
  JobContext job_context(next_job_id_.fetch_add(1), true);

  TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");

  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());
  {
    InstrumentedMutexLock l(&mutex_);
A
Andrew Kryczka 已提交
2542
    assert(bg_flush_scheduled_);
S
Siying Dong 已提交
2543 2544
    num_running_flushes_++;

2545 2546 2547
    std::unique_ptr<std::list<uint64_t>::iterator>
        pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
            CaptureCurrentFileNumberInPendingOutputs()));
2548
    FlushReason reason;
S
Siying Dong 已提交
2549

2550 2551
    Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
                               &reason, thread_pri);
2552
    if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() &&
2553
        reason != FlushReason::kErrorRecovery) {
S
Siying Dong 已提交
2554 2555 2556 2557 2558
      // Wait a little bit before retrying background flush in
      // case this is an environmental problem and we do not want to
      // chew up resources for failed flushes for the duration of
      // the problem.
      uint64_t error_cnt =
2559
          default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
S
Siying Dong 已提交
2560 2561 2562 2563 2564 2565 2566 2567
      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
      mutex_.Unlock();
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Waiting after background flush error: %s"
                      "Accumulated background error counts: %" PRIu64,
                      s.ToString().c_str(), error_cnt);
      log_buffer.FlushBufferToLog();
      LogFlush(immutable_db_options_.info_log);
2568
      immutable_db_options_.clock->SleepForMicroseconds(1000000);
S
Siying Dong 已提交
2569 2570 2571
      mutex_.Lock();
    }

Y
Yanqin Jin 已提交
2572
    TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FlushFinish:0");
S
Siying Dong 已提交
2573 2574 2575 2576
    ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

    // If flush failed, we want to delete all temporary files that we might have
    // created. Thus, we force full scan in FindObsoleteFiles()
2577 2578
    FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
                                        !s.IsColumnFamilyDropped());
S
Siying Dong 已提交
2579
    // delete unnecessary files if any, this is done outside the mutex
2580 2581
    if (job_context.HaveSomethingToClean() ||
        job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
S
Siying Dong 已提交
2582
      mutex_.Unlock();
2583
      TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
S
Siying Dong 已提交
2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595
      // Have to flush the info logs before bg_flush_scheduled_--
      // because if bg_flush_scheduled_ becomes 0 and the lock is
      // released, the deconstructor of DB can kick in and destroy all the
      // states of DB so info_log might not be available after that point.
      // It also applies to access other states that DB owns.
      log_buffer.FlushBufferToLog();
      if (job_context.HaveSomethingToDelete()) {
        PurgeObsoleteFiles(job_context);
      }
      job_context.Clean();
      mutex_.Lock();
    }
Y
Yi Wu 已提交
2596
    TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp");
S
Siying Dong 已提交
2597 2598 2599 2600 2601 2602

    assert(num_running_flushes_ > 0);
    num_running_flushes_--;
    bg_flush_scheduled_--;
    // See if there's more work to be done
    MaybeScheduleFlushOrCompaction();
2603
    atomic_flush_install_cv_.SignalAll();
S
Siying Dong 已提交
2604 2605 2606 2607 2608 2609 2610 2611
    bg_cv_.SignalAll();
    // IMPORTANT: there should be no code after calling SignalAll. This call may
    // signal the DB destructor that it's OK to proceed with destruction. In
    // that case, all DB variables will be dealloacated and referencing them
    // will cause trouble.
  }
}

2612 2613
void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
                                      Env::Priority bg_thread_pri) {
S
Siying Dong 已提交
2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627
  bool made_progress = false;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  TEST_SYNC_POINT("BackgroundCallCompaction:0");
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());
  {
    InstrumentedMutexLock l(&mutex_);

    // This call will unlock/lock the mutex to wait for current running
    // IngestExternalFile() calls to finish.
    WaitForIngestFile();

    num_running_compactions_++;

2628 2629 2630
    std::unique_ptr<std::list<uint64_t>::iterator>
        pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
            CaptureCurrentFileNumberInPendingOutputs()));
S
Siying Dong 已提交
2631

2632 2633 2634 2635
    assert((bg_thread_pri == Env::Priority::BOTTOM &&
            bg_bottom_compaction_scheduled_) ||
           (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
    Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
2636
                                    prepicked_compaction, bg_thread_pri);
S
Siying Dong 已提交
2637
    TEST_SYNC_POINT("BackgroundCallCompaction:1");
2638 2639 2640
    if (s.IsBusy()) {
      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
      mutex_.Unlock();
2641 2642
      immutable_db_options_.clock->SleepForMicroseconds(
          10000);  // prevent hot loop
2643
      mutex_.Lock();
2644
    } else if (!s.ok() && !s.IsShutdownInProgress() &&
2645
               !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) {
S
Siying Dong 已提交
2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659
      // Wait a little bit before retrying background compaction in
      // case this is an environmental problem and we do not want to
      // chew up resources for failed compactions for the duration of
      // the problem.
      uint64_t error_cnt =
          default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
      mutex_.Unlock();
      log_buffer.FlushBufferToLog();
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Waiting after background compaction error: %s, "
                      "Accumulated background error counts: %" PRIu64,
                      s.ToString().c_str(), error_cnt);
      LogFlush(immutable_db_options_.info_log);
2660
      immutable_db_options_.clock->SleepForMicroseconds(1000000);
S
Siying Dong 已提交
2661
      mutex_.Lock();
2662
    } else if (s.IsManualCompactionPaused()) {
2663
      ManualCompactionState* m = prepicked_compaction->manual_compaction_state;
2664 2665
      assert(m);
      ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused",
2666
                       m->cfd->GetName().c_str(), job_context.job_id);
S
Siying Dong 已提交
2667 2668 2669 2670 2671 2672 2673
    }

    ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

    // If compaction failed, we want to delete all temporary files that we might
    // have created (they might not be all recorded in job_context in case of a
    // failure). Thus, we force full scan in FindObsoleteFiles()
2674
    FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
2675
                                        !s.IsManualCompactionPaused() &&
2676 2677
                                        !s.IsColumnFamilyDropped() &&
                                        !s.IsBusy());
2678
    TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");
S
Siying Dong 已提交
2679 2680

    // delete unnecessary files if any, this is done outside the mutex
2681 2682
    if (job_context.HaveSomethingToClean() ||
        job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
S
Siying Dong 已提交
2683 2684 2685 2686 2687 2688 2689 2690 2691
      mutex_.Unlock();
      // Have to flush the info logs before bg_compaction_scheduled_--
      // because if bg_flush_scheduled_ becomes 0 and the lock is
      // released, the deconstructor of DB can kick in and destroy all the
      // states of DB so info_log might not be available after that point.
      // It also applies to access other states that DB owns.
      log_buffer.FlushBufferToLog();
      if (job_context.HaveSomethingToDelete()) {
        PurgeObsoleteFiles(job_context);
2692
        TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
S
Siying Dong 已提交
2693 2694 2695 2696 2697 2698 2699
      }
      job_context.Clean();
      mutex_.Lock();
    }

    assert(num_running_compactions_ > 0);
    num_running_compactions_--;
2700 2701 2702 2703 2704 2705
    if (bg_thread_pri == Env::Priority::LOW) {
      bg_compaction_scheduled_--;
    } else {
      assert(bg_thread_pri == Env::Priority::BOTTOM);
      bg_bottom_compaction_scheduled_--;
    }
S
Siying Dong 已提交
2706 2707 2708 2709 2710

    versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();

    // See if there's more work to be done
    MaybeScheduleFlushOrCompaction();
2711 2712 2713
    if (made_progress ||
        (bg_compaction_scheduled_ == 0 &&
         bg_bottom_compaction_scheduled_ == 0) ||
2714
        HasPendingManualCompaction() || unscheduled_compactions_ == 0) {
S
Siying Dong 已提交
2715 2716
      // signal if
      // * made_progress -- need to wakeup DelayWrite
2717
      // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
S
Siying Dong 已提交
2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731
      // * HasPendingManualCompaction -- need to wakeup RunManualCompaction
      // If none of this is true, there is no need to signal since nobody is
      // waiting for it
      bg_cv_.SignalAll();
    }
    // IMPORTANT: there should be no code after calling SignalAll. This call may
    // signal the DB destructor that it's OK to proceed with destruction. In
    // that case, all DB variables will be dealloacated and referencing them
    // will cause trouble.
  }
}

Status DBImpl::BackgroundCompaction(bool* made_progress,
                                    JobContext* job_context,
2732
                                    LogBuffer* log_buffer,
2733 2734
                                    PrepickedCompaction* prepicked_compaction,
                                    Env::Priority thread_pri) {
2735 2736 2737 2738
  ManualCompactionState* manual_compaction =
      prepicked_compaction == nullptr
          ? nullptr
          : prepicked_compaction->manual_compaction_state;
S
Siying Dong 已提交
2739 2740 2741 2742 2743
  *made_progress = false;
  mutex_.AssertHeld();
  TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");

  bool is_manual = (manual_compaction != nullptr);
2744
  std::unique_ptr<Compaction> c;
2745 2746 2747 2748 2749
  if (prepicked_compaction != nullptr &&
      prepicked_compaction->compaction != nullptr) {
    c.reset(prepicked_compaction->compaction);
  }
  bool is_prepicked = is_manual || c;
S
Siying Dong 已提交
2750 2751 2752 2753 2754 2755

  // (manual_compaction->in_progress == false);
  bool trivial_move_disallowed =
      is_manual && manual_compaction->disallow_trivial_move;

  CompactionJobStats compaction_job_stats;
2756 2757 2758 2759
  Status status;
  if (!error_handler_.IsBGWorkStopped()) {
    if (shutting_down_.load(std::memory_order_acquire)) {
      status = Status::ShutdownInProgress();
2760
    } else if (is_manual &&
2761
               manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
2762
      status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
2763 2764 2765
    }
  } else {
    status = error_handler_.GetBGError();
2766 2767 2768 2769 2770
    // If we get here, it means a hard error happened after this compaction
    // was scheduled by MaybeScheduleFlushOrCompaction(), but before it got
    // a chance to execute. Since we didn't pop a cfd from the compaction
    // queue, increment unscheduled_compactions_
    unscheduled_compactions_++;
S
Siying Dong 已提交
2771 2772 2773 2774 2775 2776 2777 2778 2779
  }

  if (!status.ok()) {
    if (is_manual) {
      manual_compaction->status = status;
      manual_compaction->done = true;
      manual_compaction->in_progress = false;
      manual_compaction = nullptr;
    }
2780 2781 2782 2783
    if (c) {
      c->ReleaseCompactionFiles(status);
      c.reset();
    }
S
Siying Dong 已提交
2784 2785 2786 2787 2788 2789 2790 2791
    return status;
  }

  if (is_manual) {
    // another thread cannot pick up the same work
    manual_compaction->in_progress = true;
  }

2792 2793
  std::unique_ptr<TaskLimiterToken> task_token;

S
Siying Dong 已提交
2794 2795
  // InternalKey manual_end_storage;
  // InternalKey* manual_end = &manual_end_storage;
2796
  bool sfm_reserved_compact_space = false;
S
Siying Dong 已提交
2797
  if (is_manual) {
2798
    ManualCompactionState* m = manual_compaction;
S
Siying Dong 已提交
2799 2800 2801 2802
    assert(m->in_progress);
    if (!c) {
      m->done = true;
      m->manual_end = nullptr;
2803 2804 2805 2806 2807 2808 2809
      ROCKS_LOG_BUFFER(
          log_buffer,
          "[%s] Manual compaction from level-%d from %s .. "
          "%s; nothing to do\n",
          m->cfd->GetName().c_str(), m->input_level,
          (m->begin ? m->begin->DebugString(true).c_str() : "(begin)"),
          (m->end ? m->end->DebugString(true).c_str() : "(end)"));
S
Siying Dong 已提交
2810
    } else {
2811 2812
      // First check if we have enough room to do the compaction
      bool enough_room = EnoughRoomForCompaction(
2813
          m->cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
2814 2815 2816 2817 2818 2819 2820 2821 2822 2823 2824 2825 2826 2827

      if (!enough_room) {
        // Then don't do the compaction
        c->ReleaseCompactionFiles(status);
        c.reset();
        // m's vars will get set properly at the end of this function,
        // as long as status == CompactionTooLarge
        status = Status::CompactionTooLarge();
      } else {
        ROCKS_LOG_BUFFER(
            log_buffer,
            "[%s] Manual compaction from level-%d to level-%d from %s .. "
            "%s; will stop at %s\n",
            m->cfd->GetName().c_str(), m->input_level, c->output_level(),
2828 2829
            (m->begin ? m->begin->DebugString(true).c_str() : "(begin)"),
            (m->end ? m->end->DebugString(true).c_str() : "(end)"),
2830 2831
            ((m->done || m->manual_end == nullptr)
                 ? "(end)"
2832
                 : m->manual_end->DebugString(true).c_str()));
2833
      }
S
Siying Dong 已提交
2834
    }
2835
  } else if (!is_prepicked && !compaction_queue_.empty()) {
2836
    if (HasExclusiveManualCompaction()) {
2837 2838 2839
      // Can't compact right now, but try again later
      TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");

Y
yizhu.sun 已提交
2840
      // Stay in the compaction queue.
2841 2842 2843 2844 2845
      unscheduled_compactions_++;

      return Status::OK();
    }

2846 2847 2848 2849 2850 2851 2852 2853
    auto cfd = PickCompactionFromQueue(&task_token, log_buffer);
    if (cfd == nullptr) {
      // Can't find any executable task from the compaction queue.
      // All tasks have been throttled by compaction thread limiter.
      ++unscheduled_compactions_;
      return Status::Busy();
    }

S
Siying Dong 已提交
2854 2855 2856 2857 2858
    // We unreference here because the following code will take a Ref() on
    // this cfd if it is going to use it (Compaction class holds a
    // reference).
    // This will all happen under a mutex so we don't have to be afraid of
    // somebody else deleting it.
2859
    if (cfd->UnrefAndTryDelete()) {
S
Siying Dong 已提交
2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870 2871 2872 2873 2874 2875
      // This was the last reference of the column family, so no need to
      // compact.
      return Status::OK();
    }

    // Pick up latest mutable CF Options and use it throughout the
    // compaction job
    // Compaction makes a copy of the latest MutableCFOptions. It should be used
    // throughout the compaction procedure to make sure consistency. It will
    // eventually be installed into SuperVersion
    auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
    if (!mutable_cf_options->disable_auto_compactions && !cfd->IsDropped()) {
      // NOTE: try to avoid unnecessary copy of MutableCFOptions if
      // compaction is not necessary. Need to make sure mutex is held
      // until we make a copy in the following code
      TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
2876 2877
      c.reset(cfd->PickCompaction(*mutable_cf_options, mutable_db_options_,
                                  log_buffer));
S
Siying Dong 已提交
2878
      TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
2879

S
Siying Dong 已提交
2880
      if (c != nullptr) {
2881
        bool enough_room = EnoughRoomForCompaction(
2882
            cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
2883

2884 2885 2886 2887 2888 2889 2890 2891
        if (!enough_room) {
          // Then don't do the compaction
          c->ReleaseCompactionFiles(status);
          c->column_family_data()
              ->current()
              ->storage_info()
              ->ComputeCompactionScore(*(c->immutable_cf_options()),
                                       *(c->mutable_cf_options()));
S
Siying Dong 已提交
2892 2893
          AddToCompactionQueue(cfd);
          ++unscheduled_compactions_;
2894 2895 2896 2897 2898 2899 2900

          c.reset();
          // Don't need to sleep here, because BackgroundCallCompaction
          // will sleep if !s.ok()
          status = Status::CompactionTooLarge();
        } else {
          // update statistics
S
Siying Dong 已提交
2901 2902
          RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
                            c->inputs(0)->size());
2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914 2915 2916 2917 2918 2919 2920
          // There are three things that can change compaction score:
          // 1) When flush or compaction finish. This case is covered by
          // InstallSuperVersionAndScheduleWork
          // 2) When MutableCFOptions changes. This case is also covered by
          // InstallSuperVersionAndScheduleWork, because this is when the new
          // options take effect.
          // 3) When we Pick a new compaction, we "remove" those files being
          // compacted from the calculation, which then influences compaction
          // score. Here we check if we need the new compaction even without the
          // files that are currently being compacted. If we need another
          // compaction, we might be able to execute it in parallel, so we add
          // it to the queue and schedule a new thread.
          if (cfd->NeedsCompaction()) {
            // Yes, we need more compactions!
            AddToCompactionQueue(cfd);
            ++unscheduled_compactions_;
            MaybeScheduleFlushOrCompaction();
          }
S
Siying Dong 已提交
2921 2922 2923 2924 2925
        }
      }
    }
  }

2926
  IOStatus io_s;
S
Siying Dong 已提交
2927 2928 2929 2930 2931 2932
  if (!c) {
    // Nothing to do
    ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do");
  } else if (c->deletion_compaction()) {
    // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old
    // file if there is alive snapshot pointing to it
2933 2934
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
                             c->column_family_data());
S
Siying Dong 已提交
2935 2936 2937 2938 2939 2940 2941
    assert(c->num_input_files(1) == 0);
    assert(c->level() == 0);
    assert(c->column_family_data()->ioptions()->compaction_style ==
           kCompactionStyleFIFO);

    compaction_job_stats.num_input_files = c->num_input_files(0);

P
Peter Pei 已提交
2942 2943 2944
    NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
                            compaction_job_stats, job_context->job_id);

S
Siying Dong 已提交
2945 2946 2947 2948 2949 2950
    for (const auto& f : *c->inputs(0)) {
      c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
    }
    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, directories_.GetDbDir());
2951
    io_s = versions_->io_status();
Y
Yanqin Jin 已提交
2952 2953 2954
    InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                       &job_context->superversion_contexts[0],
                                       *c->mutable_cf_options());
S
Siying Dong 已提交
2955 2956 2957 2958
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
                     c->column_family_data()->GetName().c_str(),
                     c->num_input_files(0));
    *made_progress = true;
2959 2960
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
                             c->column_family_data());
S
Siying Dong 已提交
2961 2962
  } else if (!trivial_move_disallowed && c->IsTrivialMove()) {
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
2963 2964
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
                             c->column_family_data());
S
Siying Dong 已提交
2965 2966 2967 2968 2969 2970 2971 2972 2973
    // Instrument for event update
    // TODO(yhchiang): add op details for showing trivial-move.
    ThreadStatusUtil::SetColumnFamily(
        c->column_family_data(), c->column_family_data()->ioptions()->env,
        immutable_db_options_.enable_thread_tracking);
    ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);

    compaction_job_stats.num_input_files = c->num_input_files(0);

P
Peter Pei 已提交
2974 2975 2976
    NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
                            compaction_job_stats, job_context->job_id);

S
Siying Dong 已提交
2977 2978 2979 2980 2981 2982 2983 2984 2985 2986 2987 2988
    // Move files to next level
    int32_t moved_files = 0;
    int64_t moved_bytes = 0;
    for (unsigned int l = 0; l < c->num_input_levels(); l++) {
      if (c->level(l) == c->output_level()) {
        continue;
      }
      for (size_t i = 0; i < c->num_input_files(l); i++) {
        FileMetaData* f = c->input(l, i);
        c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
        c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
                           f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
2989
                           f->largest, f->fd.smallest_seqno,
2990
                           f->fd.largest_seqno, f->marked_for_compaction,
2991
                           f->oldest_blob_file_number, f->oldest_ancester_time,
2992 2993
                           f->file_creation_time, f->file_checksum,
                           f->file_checksum_func_name);
S
Siying Dong 已提交
2994

2995 2996 2997 2998 2999
        ROCKS_LOG_BUFFER(
            log_buffer,
            "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
            c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
            c->output_level(), f->fd.GetFileSize());
S
Siying Dong 已提交
3000 3001 3002 3003 3004 3005 3006 3007
        ++moved_files;
        moved_bytes += f->fd.GetFileSize();
      }
    }

    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, directories_.GetDbDir());
3008
    io_s = versions_->io_status();
S
Siying Dong 已提交
3009
    // Use latest MutableCFOptions
Y
Yanqin Jin 已提交
3010 3011 3012
    InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                       &job_context->superversion_contexts[0],
                                       *c->mutable_cf_options());
S
Siying Dong 已提交
3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033

    VersionStorageInfo::LevelSummaryStorage tmp;
    c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
                                                             moved_bytes);
    {
      event_logger_.LogToBuffer(log_buffer)
          << "job" << job_context->job_id << "event"
          << "trivial_move"
          << "destination_level" << c->output_level() << "files" << moved_files
          << "total_files_size" << moved_bytes;
    }
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] Moved #%d files to level-%d %" PRIu64 " bytes %s: %s\n",
        c->column_family_data()->GetName().c_str(), moved_files,
        c->output_level(), moved_bytes, status.ToString().c_str(),
        c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
    *made_progress = true;

    // Clear Instrument
    ThreadStatusUtil::ResetThreadStatus();
3034 3035
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
                             c->column_family_data());
3036
  } else if (!is_prepicked && c->output_level() > 0 &&
3037 3038 3039 3040 3041 3042 3043
             c->output_level() ==
                 c->column_family_data()
                     ->current()
                     ->storage_info()
                     ->MaxOutputLevel(
                         immutable_db_options_.allow_ingest_behind) &&
             env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
3044 3045 3046
    // Forward compactions involving last level to the bottom pool if it exists,
    // such that compactions unlikely to contribute to write stalls can be
    // delayed or deprioritized.
3047 3048 3049 3050 3051 3052
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
    CompactionArg* ca = new CompactionArg;
    ca->db = this;
    ca->prepicked_compaction = new PrepickedCompaction;
    ca->prepicked_compaction->compaction = c.release();
    ca->prepicked_compaction->manual_compaction_state = nullptr;
3053 3054
    // Transfer requested token, so it doesn't need to do it again.
    ca->prepicked_compaction->task_token = std::move(task_token);
3055 3056
    ++bg_bottom_compaction_scheduled_;
    env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
3057
                   this, &DBImpl::UnscheduleCompactionCallback);
S
Siying Dong 已提交
3058
  } else {
3059 3060
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
                             c->column_family_data());
3061
    int output_level __attribute__((__unused__));
3062
    output_level = c->output_level();
S
Siying Dong 已提交
3063 3064
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
                             &output_level);
3065
    std::vector<SequenceNumber> snapshot_seqs;
S
Siying Dong 已提交
3066
    SequenceNumber earliest_write_conflict_snapshot;
3067 3068 3069
    SnapshotChecker* snapshot_checker;
    GetSnapshotContext(job_context, &snapshot_seqs,
                       &earliest_write_conflict_snapshot, &snapshot_checker);
S
Siying Dong 已提交
3070 3071
    assert(is_snapshot_supported_ || snapshots_.empty());
    CompactionJob compaction_job(
3072
        job_context->job_id, c.get(), immutable_db_options_,
3073
        file_options_for_compaction_, versions_.get(), &shutting_down_,
3074
        preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
3075 3076 3077 3078 3079
        GetDataDir(c->column_family_data(), c->output_path_id()),
        GetDataDir(c->column_family_data(), 0), stats_, &mutex_,
        &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
        snapshot_checker, table_cache_, &event_logger_,
        c->mutable_cf_options()->paranoid_file_checks,
S
Siying Dong 已提交
3080
        c->mutable_cf_options()->report_bg_io_stats, dbname_,
3081
        &compaction_job_stats, thread_pri, io_tracer_,
3082
        is_manual ? &manual_compaction_paused_ : nullptr, db_id_,
3083
        db_session_id_, c->column_family_data()->GetFullHistoryTsLow());
S
Siying Dong 已提交
3084 3085
    compaction_job.Prepare();

P
Peter Pei 已提交
3086 3087
    NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
                            compaction_job_stats, job_context->job_id);
S
Siying Dong 已提交
3088
    mutex_.Unlock();
3089 3090
    TEST_SYNC_POINT_CALLBACK(
        "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun", nullptr);
3091 3092
    // Should handle erorr?
    compaction_job.Run().PermitUncheckedError();
S
Siying Dong 已提交
3093 3094 3095 3096
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
    mutex_.Lock();

    status = compaction_job.Install(*c->mutable_cf_options());
3097
    io_s = compaction_job.io_status();
S
Siying Dong 已提交
3098
    if (status.ok()) {
Y
Yanqin Jin 已提交
3099 3100 3101
      InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                         &job_context->superversion_contexts[0],
                                         *c->mutable_cf_options());
S
Siying Dong 已提交
3102 3103
    }
    *made_progress = true;
3104 3105
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
                             c->column_family_data());
S
Siying Dong 已提交
3106
  }
3107 3108 3109

  if (status.ok() && !io_s.ok()) {
    status = io_s;
3110 3111
  } else {
    io_s.PermitUncheckedError();
3112 3113
  }

S
Siying Dong 已提交
3114 3115 3116
  if (c != nullptr) {
    c->ReleaseCompactionFiles(status);
    *made_progress = true;
3117 3118 3119 3120 3121

#ifndef ROCKSDB_LITE
    // Need to make sure SstFileManager does its bookkeeping
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
3122
    if (sfm && sfm_reserved_compact_space) {
3123 3124 3125 3126
      sfm->OnCompactionCompletion(c.get());
    }
#endif  // ROCKSDB_LITE

3127 3128
    NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status,
                                compaction_job_stats, job_context->job_id);
S
Siying Dong 已提交
3129 3130
  }

3131 3132
  if (status.ok() || status.IsCompactionTooLarge() ||
      status.IsManualCompactionPaused()) {
S
Siying Dong 已提交
3133
    // Done
3134
  } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
S
Siying Dong 已提交
3135 3136 3137 3138
    // Ignore compaction errors found during shutting down
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
                   status.ToString().c_str());
3139
    if (!io_s.ok()) {
3140 3141 3142 3143 3144 3145 3146 3147
      // Error while writing to MANIFEST.
      // In fact, versions_->io_status() can also be the result of renaming
      // CURRENT file. With current code, it's just difficult to tell. So just
      // be pessimistic and try write to a new MANIFEST.
      // TODO: distinguish between MANIFEST write and CURRENT renaming
      auto err_reason = versions_->io_status().ok()
                            ? BackgroundErrorReason::kCompaction
                            : BackgroundErrorReason::kManifestWrite;
3148
      error_handler_.SetBGError(io_s, err_reason);
3149
    } else {
3150
      error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
3151
    }
3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 3167 3168
    if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) {
      // Put this cfd back in the compaction queue so we can retry after some
      // time
      auto cfd = c->column_family_data();
      assert(cfd != nullptr);
      // Since this compaction failed, we need to recompute the score so it
      // takes the original input files into account
      c->column_family_data()
          ->current()
          ->storage_info()
          ->ComputeCompactionScore(*(c->immutable_cf_options()),
                                   *(c->mutable_cf_options()));
      if (!cfd->queued_for_compaction()) {
        AddToCompactionQueue(cfd);
        ++unscheduled_compactions_;
      }
    }
S
Siying Dong 已提交
3169
  }
3170 3171
  // this will unref its input_version and column_family_data
  c.reset();
S
Siying Dong 已提交
3172 3173

  if (is_manual) {
3174
    ManualCompactionState* m = manual_compaction;
S
Siying Dong 已提交
3175 3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194 3195 3196 3197 3198 3199 3200 3201 3202 3203 3204 3205 3206
    if (!status.ok()) {
      m->status = status;
      m->done = true;
    }
    // For universal compaction:
    //   Because universal compaction always happens at level 0, so one
    //   compaction will pick up all overlapped files. No files will be
    //   filtered out due to size limit and left for a successive compaction.
    //   So we can safely conclude the current compaction.
    //
    //   Also note that, if we don't stop here, then the current compaction
    //   writes a new file back to level 0, which will be used in successive
    //   compaction. Hence the manual compaction will never finish.
    //
    // Stop the compaction if manual_end points to nullptr -- this means
    // that we compacted the whole range. manual_end should always point
    // to nullptr in case of universal compaction
    if (m->manual_end == nullptr) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range.  Update *m
      // to the range that is left to be compacted.
      // Universal and FIFO compactions should always compact the whole range
      assert(m->cfd->ioptions()->compaction_style !=
                 kCompactionStyleUniversal ||
             m->cfd->ioptions()->num_levels > 1);
      assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
      m->tmp_storage = *m->manual_end;
      m->begin = &m->tmp_storage;
      m->incomplete = true;
    }
3207
    m->in_progress = false;  // not being processed anymore
S
Siying Dong 已提交
3208 3209 3210 3211 3212 3213 3214 3215 3216
  }
  TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish");
  return status;
}

bool DBImpl::HasPendingManualCompaction() {
  return (!manual_compaction_dequeue_.empty());
}

3217
void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
S
Siying Dong 已提交
3218 3219 3220
  manual_compaction_dequeue_.push_back(m);
}

3221
void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
S
Siying Dong 已提交
3222
  // Remove from queue
3223
  std::deque<ManualCompactionState*>::iterator it =
S
Siying Dong 已提交
3224 3225 3226 3227 3228 3229
      manual_compaction_dequeue_.begin();
  while (it != manual_compaction_dequeue_.end()) {
    if (m == (*it)) {
      it = manual_compaction_dequeue_.erase(it);
      return;
    }
3230
    ++it;
S
Siying Dong 已提交
3231 3232 3233 3234 3235
  }
  assert(false);
  return;
}

3236
bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
S
Siying Dong 已提交
3237 3238 3239 3240 3241 3242
  if (num_running_ingest_file_ > 0) {
    // We need to wait for other IngestExternalFile() calls to finish
    // before running a manual compaction.
    return true;
  }
  if (m->exclusive) {
3243 3244
    return (bg_bottom_compaction_scheduled_ > 0 ||
            bg_compaction_scheduled_ > 0);
S
Siying Dong 已提交
3245
  }
3246
  std::deque<ManualCompactionState*>::iterator it =
S
Siying Dong 已提交
3247 3248 3249 3250
      manual_compaction_dequeue_.begin();
  bool seen = false;
  while (it != manual_compaction_dequeue_.end()) {
    if (m == (*it)) {
3251
      ++it;
S
Siying Dong 已提交
3252 3253 3254 3255 3256 3257 3258 3259
      seen = true;
      continue;
    } else if (MCOverlap(m, (*it)) && (!seen && !(*it)->in_progress)) {
      // Consider the other manual compaction *it, conflicts if:
      // overlaps with m
      // and (*it) is ahead in the queue and is not yet in progress
      return true;
    }
3260
    ++it;
S
Siying Dong 已提交
3261 3262 3263 3264 3265 3266
  }
  return false;
}

bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
  // Remove from priority queue
3267
  std::deque<ManualCompactionState*>::iterator it =
S
Siying Dong 已提交
3268 3269 3270 3271 3272 3273 3274
      manual_compaction_dequeue_.begin();
  while (it != manual_compaction_dequeue_.end()) {
    if ((*it)->exclusive) {
      return true;
    }
    if ((cfd == (*it)->cfd) && (!((*it)->in_progress || (*it)->done))) {
      // Allow automatic compaction if manual compaction is
H
hyunwoo 已提交
3275
      // in progress
S
Siying Dong 已提交
3276 3277
      return true;
    }
3278
    ++it;
S
Siying Dong 已提交
3279 3280 3281 3282 3283 3284
  }
  return false;
}

bool DBImpl::HasExclusiveManualCompaction() {
  // Remove from priority queue
3285
  std::deque<ManualCompactionState*>::iterator it =
S
Siying Dong 已提交
3286 3287 3288 3289 3290
      manual_compaction_dequeue_.begin();
  while (it != manual_compaction_dequeue_.end()) {
    if ((*it)->exclusive) {
      return true;
    }
3291
    ++it;
S
Siying Dong 已提交
3292 3293 3294 3295
  }
  return false;
}

3296
bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
S
Siying Dong 已提交
3297 3298 3299 3300 3301 3302 3303 3304 3305
  if ((m->exclusive) || (m1->exclusive)) {
    return true;
  }
  if (m->cfd != m1->cfd) {
    return false;
  }
  return true;
}

3306 3307 3308 3309 3310 3311 3312 3313 3314 3315 3316 3317 3318 3319 3320 3321 3322 3323 3324
#ifndef ROCKSDB_LITE
void DBImpl::BuildCompactionJobInfo(
    const ColumnFamilyData* cfd, Compaction* c, const Status& st,
    const CompactionJobStats& compaction_job_stats, const int job_id,
    const Version* current, CompactionJobInfo* compaction_job_info) const {
  assert(compaction_job_info != nullptr);
  compaction_job_info->cf_id = cfd->GetID();
  compaction_job_info->cf_name = cfd->GetName();
  compaction_job_info->status = st;
  compaction_job_info->thread_id = env_->GetThreadID();
  compaction_job_info->job_id = job_id;
  compaction_job_info->base_input_level = c->start_level();
  compaction_job_info->output_level = c->output_level();
  compaction_job_info->stats = compaction_job_stats;
  compaction_job_info->table_properties = c->GetOutputTableProperties();
  compaction_job_info->compaction_reason = c->compaction_reason();
  compaction_job_info->compression = c->output_compression();
  for (size_t i = 0; i < c->num_input_levels(); ++i) {
    for (const auto fmd : *c->inputs(i)) {
3325 3326 3327 3328
      const FileDescriptor& desc = fmd->fd;
      const uint64_t file_number = desc.GetNumber();
      auto fn = TableFileName(c->immutable_cf_options()->cf_paths, file_number,
                              desc.GetPathId());
3329
      compaction_job_info->input_files.push_back(fn);
3330 3331
      compaction_job_info->input_file_infos.push_back(CompactionFileInfo{
          static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
3332
      if (compaction_job_info->table_properties.count(fn) == 0) {
3333
        std::shared_ptr<const TableProperties> tp;
3334 3335 3336 3337 3338 3339 3340 3341
        auto s = current->GetTableProperties(&tp, fmd, &fn);
        if (s.ok()) {
          compaction_job_info->table_properties[fn] = tp;
        }
      }
    }
  }
  for (const auto& newf : c->edit()->GetNewFiles()) {
3342 3343 3344 3345 3346 3347 3348
    const FileMetaData& meta = newf.second;
    const FileDescriptor& desc = meta.fd;
    const uint64_t file_number = desc.GetNumber();
    compaction_job_info->output_files.push_back(TableFileName(
        c->immutable_cf_options()->cf_paths, file_number, desc.GetPathId()));
    compaction_job_info->output_file_infos.push_back(CompactionFileInfo{
        newf.first, file_number, meta.oldest_blob_file_number});
3349 3350 3351 3352
  }
}
#endif

3353
// SuperVersionContext gets created and destructed outside of the lock --
Y
Yanqin Jin 已提交
3354
// we use this conveniently to:
S
Siying Dong 已提交
3355 3356 3357 3358
// * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete SuperVersion()s outside of the lock -- superversions_to_free
//
// However, if InstallSuperVersionAndScheduleWork() gets called twice with the
3359
// same sv_context, we can't reuse the SuperVersion() that got
S
Siying Dong 已提交
3360 3361 3362 3363 3364
// malloced because
// first call already used it. In that rare case, we take a hit and create a
// new SuperVersion() inside of the mutex. We do similar thing
// for superversion_to_free

3365 3366
void DBImpl::InstallSuperVersionAndScheduleWork(
    ColumnFamilyData* cfd, SuperVersionContext* sv_context,
Y
Yanqin Jin 已提交
3367
    const MutableCFOptions& mutable_cf_options) {
S
Siying Dong 已提交
3368 3369 3370 3371 3372 3373 3374 3375 3376 3377
  mutex_.AssertHeld();

  // Update max_total_in_memory_state_
  size_t old_memtable_size = 0;
  auto* old_sv = cfd->GetSuperVersion();
  if (old_sv) {
    old_memtable_size = old_sv->mutable_cf_options.write_buffer_size *
                        old_sv->mutable_cf_options.max_write_buffer_number;
  }

3378 3379
  // this branch is unlikely to step in
  if (UNLIKELY(sv_context->new_superversion == nullptr)) {
3380 3381
    sv_context->NewSuperVersion();
  }
3382
  cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);
S
Siying Dong 已提交
3383

3384 3385 3386 3387 3388 3389 3390 3391 3392 3393 3394
  // There may be a small data race here. The snapshot tricking bottommost
  // compaction may already be released here. But assuming there will always be
  // newer snapshot created and released frequently, the compaction will be
  // triggered soon anyway.
  bottommost_files_mark_threshold_ = kMaxSequenceNumber;
  for (auto* my_cfd : *versions_->GetColumnFamilySet()) {
    bottommost_files_mark_threshold_ = std::min(
        bottommost_files_mark_threshold_,
        my_cfd->current()->storage_info()->bottommost_files_mark_threshold());
  }

S
Siying Dong 已提交
3395 3396 3397 3398 3399 3400
  // Whenever we install new SuperVersion, we might need to issue new flushes or
  // compactions.
  SchedulePendingCompaction(cfd);
  MaybeScheduleFlushOrCompaction();

  // Update max_total_in_memory_state_
3401 3402 3403
  max_total_in_memory_state_ = max_total_in_memory_state_ - old_memtable_size +
                               mutable_cf_options.write_buffer_size *
                                   mutable_cf_options.max_write_buffer_number;
S
Siying Dong 已提交
3404
}
Y
Yi Wu 已提交
3405

3406
// ShouldPurge is called by FindObsoleteFiles when doing a full scan,
Y
Yi Wu 已提交
3407
// and db mutex (mutex_) should already be held.
3408 3409 3410 3411
// Actually, the current implementation of FindObsoleteFiles with
// full_scan=true can issue I/O requests to obtain list of files in
// directories, e.g. env_->getChildren while holding db mutex.
bool DBImpl::ShouldPurge(uint64_t file_number) const {
Y
Yi Wu 已提交
3412 3413 3414
  return files_grabbed_for_purge_.find(file_number) ==
             files_grabbed_for_purge_.end() &&
         purge_files_.find(file_number) == purge_files_.end();
3415 3416 3417 3418 3419
}

// MarkAsGrabbedForPurge is called by FindObsoleteFiles, and db mutex
// (mutex_) should already be held.
void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) {
Y
Yi Wu 已提交
3420
  files_grabbed_for_purge_.insert(file_number);
3421 3422
}

Y
Yi Wu 已提交
3423 3424 3425 3426 3427 3428 3429 3430
void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) {
  InstrumentedMutexLock l(&mutex_);
  // snapshot_checker_ should only set once. If we need to set it multiple
  // times, we need to make sure the old one is not deleted while it is still
  // using by a compaction job.
  assert(!snapshot_checker_);
  snapshot_checker_.reset(snapshot_checker);
}
3431 3432 3433 3434 3435 3436 3437 3438 3439 3440 3441 3442 3443 3444 3445 3446 3447 3448 3449 3450 3451 3452 3453 3454 3455 3456 3457

void DBImpl::GetSnapshotContext(
    JobContext* job_context, std::vector<SequenceNumber>* snapshot_seqs,
    SequenceNumber* earliest_write_conflict_snapshot,
    SnapshotChecker** snapshot_checker_ptr) {
  mutex_.AssertHeld();
  assert(job_context != nullptr);
  assert(snapshot_seqs != nullptr);
  assert(earliest_write_conflict_snapshot != nullptr);
  assert(snapshot_checker_ptr != nullptr);

  *snapshot_checker_ptr = snapshot_checker_.get();
  if (use_custom_gc_ && *snapshot_checker_ptr == nullptr) {
    *snapshot_checker_ptr = DisableGCSnapshotChecker::Instance();
  }
  if (*snapshot_checker_ptr != nullptr) {
    // If snapshot_checker is used, that means the flush/compaction may
    // contain values not visible to snapshot taken after
    // flush/compaction job starts. Take a snapshot and it will appear
    // in snapshot_seqs and force compaction iterator to consider such
    // snapshots.
    const Snapshot* job_snapshot =
        GetSnapshotImpl(false /*write_conflict_boundary*/, false /*lock*/);
    job_context->job_snapshot.reset(new ManagedSnapshot(this, job_snapshot));
  }
  *snapshot_seqs = snapshots_.GetAll(earliest_write_conflict_snapshot);
}
3458
}  // namespace ROCKSDB_NAMESPACE