//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/compaction_job.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <algorithm>
#include <vector>
#include <memory>
#include <list>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/merge_helper.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/version_set.h"
#include "port/port.h"
#include "port/likely.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merger.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/log_buffer.h"
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
#include "util/iostats_context_imp.h"
#include "util/stop_watch.h"
#include "util/sync_point.h"
#include "util/thread_status_util.h"

namespace rocksdb {

struct CompactionJob::CompactionState {
  Compaction* const compaction;

  // If there were two snapshots with seq numbers s1 and
  // s2, where s1 < s2, and if we find two instances of a key k1 whose
  // sequence numbers both lie between s1 and s2, then the earlier version
  // of k1 can be safely deleted because that version is not visible in any
  // snapshot.
  std::vector<SequenceNumber> existing_snapshots;

  // Files produced by compaction
  struct Output {
    uint64_t number;
    uint32_t path_id;
    uint64_t file_size;
    InternalKey smallest, largest;
    SequenceNumber smallest_seqno, largest_seqno;
  };
  std::vector<Output> outputs;

  // State kept for output being generated
  std::unique_ptr<WritableFile> outfile;
  std::unique_ptr<TableBuilder> builder;

  uint64_t total_bytes;

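  // Returns the output file currently being built, i.e. the entry most
  // recently pushed onto outputs by OpenCompactionOutputFile().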
  Output* current_output() { return &outputs[outputs.size() - 1]; }

  explicit CompactionState(Compaction* c)
      : compaction(c),
        total_bytes(0),
        num_input_records(0),
        num_output_records(0) {}

  // Create a client visible context of this compaction
  CompactionFilter::Context GetFilterContextV1() {
    CompactionFilter::Context context;
    context.is_full_compaction = compaction->IsFullCompaction();
    context.is_manual_compaction = compaction->IsManualCompaction();
    return context;
  }

  // Create a client visible context of this compaction
  CompactionFilterContext GetFilterContext() {
    CompactionFilterContext context;
    context.is_full_compaction = compaction->IsFullCompaction();
    context.is_manual_compaction = compaction->IsManualCompaction();
    return context;
  }

  std::vector<std::string> key_str_buf_;
  std::vector<std::string> existing_value_str_buf_;
  // new_value_buf_ is only appended to when a value changes
  std::vector<std::string> new_value_buf_;
  // if value_changed_buf_[i] is true,
  // new_value_buf_ holds a new entry with the changed value
  std::vector<bool> value_changed_buf_;
  // to_delete_buf_[i] is true iff key_str_buf_[i] is deleted
  std::vector<bool> to_delete_buf_;

  std::vector<std::string> other_key_str_buf_;
  std::vector<std::string> other_value_str_buf_;

  std::vector<Slice> combined_key_buf_;
  std::vector<Slice> combined_value_buf_;

  std::string cur_prefix_;

  uint64_t num_input_records;
  uint64_t num_output_records;

  // Buffers the kv-pair that will be run through compaction filter V2
  // in the future.
  void BufferKeyValueSlices(const Slice& key, const Slice& value) {
    key_str_buf_.emplace_back(key.ToString());
    existing_value_str_buf_.emplace_back(value.ToString());
  }

  // Buffers the kv-pair that will not be run through compaction filter V2
  // in the future.
  void BufferOtherKeyValueSlices(const Slice& key, const Slice& value) {
    other_key_str_buf_.emplace_back(key.ToString());
    other_value_str_buf_.emplace_back(value.ToString());
  }

  // Add a kv-pair to the combined buffer
  void AddToCombinedKeyValueSlices(const Slice& key, const Slice& value) {
    // The real strings are stored in the batch buffers
    combined_key_buf_.emplace_back(key);
    combined_value_buf_.emplace_back(value);
  }

  // Merging the two buffers
  void MergeKeyValueSliceBuffer(const InternalKeyComparator* comparator) {
    size_t i = 0;
    size_t j = 0;
    size_t total_size = key_str_buf_.size() + other_key_str_buf_.size();
    combined_key_buf_.reserve(total_size);
    combined_value_buf_.reserve(total_size);

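    // Two-way merge by internal key: each step emits the smaller head of
    // the two buffers. When one buffer is exhausted, comp_res is forced so
    // the other drains. Internal keys are unique within the input, so the
    // comparator should never return 0 here.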
    while (i + j < total_size) {
      int comp_res = 0;
      if (i < key_str_buf_.size() && j < other_key_str_buf_.size()) {
        comp_res = comparator->Compare(key_str_buf_[i], other_key_str_buf_[j]);
      } else if (i >= key_str_buf_.size() && j < other_key_str_buf_.size()) {
        comp_res = 1;
      } else if (j >= other_key_str_buf_.size() && i < key_str_buf_.size()) {
        comp_res = -1;
      }
      if (comp_res > 0) {
        AddToCombinedKeyValueSlices(other_key_str_buf_[j],
                                    other_value_str_buf_[j]);
        j++;
      } else if (comp_res < 0) {
        AddToCombinedKeyValueSlices(key_str_buf_[i],
                                    existing_value_str_buf_[i]);
        i++;
      }
    }
  }

  void CleanupBatchBuffer() {
    to_delete_buf_.clear();
    key_str_buf_.clear();
    existing_value_str_buf_.clear();
    new_value_buf_.clear();
    value_changed_buf_.clear();

    to_delete_buf_.shrink_to_fit();
    key_str_buf_.shrink_to_fit();
    existing_value_str_buf_.shrink_to_fit();
    new_value_buf_.shrink_to_fit();
    value_changed_buf_.shrink_to_fit();

    other_key_str_buf_.clear();
    other_value_str_buf_.clear();
    other_key_str_buf_.shrink_to_fit();
    other_value_str_buf_.shrink_to_fit();
  }

  void CleanupMergedBuffer() {
    combined_key_buf_.clear();
    combined_value_buf_.clear();
    combined_key_buf_.shrink_to_fit();
    combined_value_buf_.shrink_to_fit();
  }
};

CompactionJob::CompactionJob(
    int job_id, Compaction* compaction, const DBOptions& db_options,
    const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
    VersionSet* versions, std::atomic<bool>* shutting_down,
    LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory,
    Statistics* stats, SnapshotList* snapshots, bool is_snapshot_supported,
    std::shared_ptr<Cache> table_cache,
    std::function<uint64_t()> yield_callback)
    : job_id_(job_id),
      compact_(new CompactionState(compaction)),
      compaction_stats_(1),
      db_options_(db_options),
      mutable_cf_options_(mutable_cf_options),
      env_options_(env_options),
      env_(db_options.env),
      versions_(versions),
      shutting_down_(shutting_down),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_directory_(output_directory),
      stats_(stats),
      snapshots_(snapshots),
      is_snapshot_supported_(is_snapshot_supported),
      table_cache_(std::move(table_cache)),
      yield_callback_(std::move(yield_callback)) {}

void CompactionJob::Prepare() {
  compact_->CleanupBatchBuffer();
  compact_->CleanupMergedBuffer();

  auto* compaction = compact_->compaction;

  // Generate file_levels_ for compaction before making the iterator
  compaction->GenerateFileLevels();
  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  assert(cfd != nullptr);
  {
    Compaction::InputLevelSummaryBuffer inputs_summary;
    LogToBuffer(log_buffer_, "[%s] [JOB %d] Compacting %s, score %.2f",
                cfd->GetName().c_str(), job_id_,
                compaction->InputLevelSummary(&inputs_summary),
                compaction->score());
  }
  char scratch[2345];
  compact_->compaction->Summary(scratch, sizeof(scratch));
  LogToBuffer(log_buffer_, "[%s] Compaction start summary: %s\n",
              cfd->GetName().c_str(), scratch);

  assert(cfd->current()->storage_info()->NumLevelFiles(
             compact_->compaction->level()) > 0);
  assert(compact_->builder == nullptr);
  assert(!compact_->outfile);

  visible_at_tip_ = 0;
  latest_snapshot_ = 0;
  // TODO(icanadi) move snapshots_ out of CompactionJob
  snapshots_->getAll(compact_->existing_snapshots);
  if (compact_->existing_snapshots.size() == 0) {
    // optimize for fast path if there are no snapshots
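    // (visible_at_tip_ doubles as a flag and a sequence number: it holds
    // the current last sequence when there are no snapshots, and stays 0
    // otherwise)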
    visible_at_tip_ = versions_->LastSequence();
    earliest_snapshot_ = visible_at_tip_;
  } else {
    latest_snapshot_ = compact_->existing_snapshots.back();
    // Add the current seqno as the 'latest' virtual
    // snapshot to the end of this list.
    compact_->existing_snapshots.push_back(versions_->LastSequence());
    earliest_snapshot_ = compact_->existing_snapshots[0];
  }

  // Is this compaction producing files at the bottommost level?
  bottommost_level_ = compact_->compaction->BottomMostLevel();
}

Status CompactionJob::Run() {
  log_buffer_->FlushBufferToLog();
  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  ThreadStatusUtil::SetColumnFamily(cfd);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
  TEST_SYNC_POINT("CompactionJob::Run:Start");

  const uint64_t start_micros = env_->NowMicros();
  std::unique_ptr<Iterator> input(
      versions_->MakeInputIterator(compact_->compaction));
  input->SeekToFirst();

  Status status;
  ParsedInternalKey ikey;
  std::unique_ptr<CompactionFilterV2> compaction_filter_from_factory_v2 =
      nullptr;
  auto context = compact_->GetFilterContext();
  compaction_filter_from_factory_v2 =
      cfd->ioptions()->compaction_filter_factory_v2->CreateCompactionFilterV2(
          context);
  auto compaction_filter_v2 = compaction_filter_from_factory_v2.get();

  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
  if (!compaction_filter_v2) {
    status = ProcessKeyValueCompaction(&imm_micros, input.get(), false);
  } else {
    // temp_backup_input always points to the start of the current buffer
    // temp_backup_input = backup_input;
    // iterate through input:
    // 1) buffer ineligible keys and value keys into 2 separate buffers;
    // 2) send value_buffer to the compaction filter and alter the values;
    // 3) merge value_buffer with ineligible_value_buffer;
    // 4) run the modified "compaction" using the old for loop.
    bool prefix_initialized = false;
    shared_ptr<Iterator> backup_input(
        versions_->MakeInputIterator(compact_->compaction));
    backup_input->SeekToFirst();
    while (backup_input->Valid() &&
           !shutting_down_->load(std::memory_order_acquire) &&
           !cfd->IsDropped()) {
      // FLUSH preempts compaction
      // TODO(icanadi) this currently only checks if flush is necessary on
      // compacting column family. we should also check if flush is necessary on
      // other column families, too

      imm_micros += yield_callback_();

      Slice key = backup_input->key();
      Slice value = backup_input->value();

      if (!ParseInternalKey(key, &ikey)) {
        // log error
        Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
            "[%s] [JOB %d] Failed to parse key: %s", cfd->GetName().c_str(),
            job_id_, key.ToString().c_str());
        continue;
      } else {
        const SliceTransform* transformer =
            cfd->ioptions()->compaction_filter_factory_v2->GetPrefixExtractor();
        const auto key_prefix = transformer->Transform(ikey.user_key);
        if (!prefix_initialized) {
          compact_->cur_prefix_ = key_prefix.ToString();
          prefix_initialized = true;
        }
        // If the prefix remains the same, keep buffering
        if (key_prefix.compare(Slice(compact_->cur_prefix_)) == 0) {
          // Apply the compaction filter V2 to all the kv pairs sharing
          // the same prefix
          if (ikey.type == kTypeValue &&
              (visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
            // Buffer all keys sharing the same prefix for CompactionFilterV2
            // Iterate through keys to check prefix
            compact_->BufferKeyValueSlices(key, value);
          } else {
            // buffer ineligible keys
            compact_->BufferOtherKeyValueSlices(key, value);
          }
          backup_input->Next();
          continue;
          // finish changing values for eligible keys
        } else {
          // Now prefix changes, this batch is done.
          // Call compaction filter on the buffered values to change the value
          if (compact_->key_str_buf_.size() > 0) {
            CallCompactionFilterV2(compaction_filter_v2);
          }
          compact_->cur_prefix_ = key_prefix.ToString();
        }
      }

      // Merge this batch of data (values + ineligible keys)
      compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());

      // Done buffering for the current prefix. Spit it out to disk
      // Now just iterate through all the kv-pairs
      status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);

      if (!status.ok()) {
        break;
      }

      // After writing the kv-pairs, we can safely remove the reference
      // to the string buffer and clean them up
      compact_->CleanupBatchBuffer();
      compact_->CleanupMergedBuffer();
      // Buffer the key that triggers the mismatch in prefix
      if (ikey.type == kTypeValue &&
          (visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
        compact_->BufferKeyValueSlices(key, value);
      } else {
        compact_->BufferOtherKeyValueSlices(key, value);
      }
      backup_input->Next();
      if (!backup_input->Valid()) {
        // If this is the single last value, we need to merge it.
        if (compact_->key_str_buf_.size() > 0) {
          CallCompactionFilterV2(compaction_filter_v2);
        }
        compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());

        status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);
        if (!status.ok()) {
          break;
        }

        compact_->CleanupBatchBuffer();
        compact_->CleanupMergedBuffer();
      }
    }  // done processing all prefix batches
    // finish the last batch
    if (status.ok()) {
      if (compact_->key_str_buf_.size() > 0) {
        CallCompactionFilterV2(compaction_filter_v2);
      }
      compact_->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
      status = ProcessKeyValueCompaction(&imm_micros, input.get(), true);
    }
  }  // checking for compaction filter v2

  if (status.ok() &&
      (shutting_down_->load(std::memory_order_acquire) || cfd->IsDropped())) {
    status = Status::ShutdownInProgress(
        "Database shutdown or Column family drop during compaction");
  }
  if (status.ok() && compact_->builder != nullptr) {
    status = FinishCompactionOutputFile(input.get());
  }
  if (status.ok()) {
    status = input->status();
  }
  input.reset();

  if (output_directory_ && !db_options_.disableDataSync) {
    output_directory_->Fsync();
  }

  compaction_stats_.micros = env_->NowMicros() - start_micros - imm_micros;
  compaction_stats_.files_in_leveln =
      static_cast<int>(compact_->compaction->num_input_files(0));
  compaction_stats_.files_in_levelnp1 =
      static_cast<int>(compact_->compaction->num_input_files(1));
  MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);

  size_t num_output_files = compact_->outputs.size();
  if (compact_->builder != nullptr) {
    // An error occurred so ignore the last output.
    assert(num_output_files > 0);
    --num_output_files;
  }
  compaction_stats_.files_out_levelnp1 = static_cast<int>(num_output_files);

  for (size_t i = 0; i < compact_->compaction->num_input_files(0); i++) {
    compaction_stats_.bytes_readn +=
        compact_->compaction->input(0, i)->fd.GetFileSize();
    compaction_stats_.num_input_records +=
        static_cast<uint64_t>(compact_->compaction->input(0, i)->num_entries);
  }

  for (size_t i = 0; i < compact_->compaction->num_input_files(1); i++) {
    compaction_stats_.bytes_readnp1 +=
        compact_->compaction->input(1, i)->fd.GetFileSize();
  }

  for (size_t i = 0; i < num_output_files; i++) {
    compaction_stats_.bytes_written += compact_->outputs[i].file_size;
  }
  if (compact_->num_input_records > compact_->num_output_records) {
    compaction_stats_.num_dropped_records +=
        compact_->num_input_records - compact_->num_output_records;
    compact_->num_input_records = compact_->num_output_records = 0;
  }

  RecordCompactionIOStats();

  LogFlush(db_options_.info_log);
  TEST_SYNC_POINT("CompactionJob::Run:End");
  ThreadStatusUtil::ResetThreadStatus();
  return status;
}

void CompactionJob::Install(Status* status, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  cfd->internal_stats()->AddCompactionStats(
      compact_->compaction->output_level(), compaction_stats_);

  if (status->ok()) {
    *status = InstallCompactionResults(db_mutex);
  }
  VersionStorageInfo::LevelSummaryStorage tmp;
  const auto& stats = compaction_stats_;
  LogToBuffer(log_buffer_,
              "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
              "files in(%d, %d) out(%d) "
              "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
              "write-amplify(%.1f) %s, records in: %d, records dropped: %d\n",
              cfd->GetName().c_str(),
              cfd->current()->storage_info()->LevelSummary(&tmp),
              (stats.bytes_readn + stats.bytes_readnp1) /
                  static_cast<double>(stats.micros),
              stats.bytes_written / static_cast<double>(stats.micros),
              compact_->compaction->output_level(), stats.files_in_leveln,
              stats.files_in_levelnp1, stats.files_out_levelnp1,
              stats.bytes_readn / 1048576.0, stats.bytes_readnp1 / 1048576.0,
              stats.bytes_written / 1048576.0,
              (stats.bytes_written + stats.bytes_readnp1 + stats.bytes_readn) /
                  static_cast<double>(stats.bytes_readn),
              stats.bytes_written / static_cast<double>(stats.bytes_readn),
              status->ToString().c_str(), stats.num_input_records,
              stats.num_dropped_records);

  CleanupCompaction(*status);
}

Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
                                                Iterator* input,
                                                bool is_compaction_v2) {
  size_t combined_idx = 0;
  Status status;
  std::string compaction_filter_value;
  ParsedInternalKey ikey;
  IterKey current_user_key;
  bool has_current_user_key = false;
  IterKey delete_key;
  SequenceNumber last_sequence_for_key __attribute__((unused)) =
      kMaxSequenceNumber;
  SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  MergeHelper merge(cfd->user_comparator(), cfd->ioptions()->merge_operator,
                    db_options_.info_log.get(),
                    cfd->ioptions()->min_partial_merge_operands,
                    false /* internal key corruption is expected */);
  auto compaction_filter = cfd->ioptions()->compaction_filter;
  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
  if (!compaction_filter) {
    auto context = compact_->GetFilterContextV1();
    compaction_filter_from_factory =
        cfd->ioptions()->compaction_filter_factory->CreateCompactionFilter(
            context);
    compaction_filter = compaction_filter_from_factory.get();
  }

  int64_t key_drop_user = 0;
  int64_t key_drop_newer_entry = 0;
  int64_t key_drop_obsolete = 0;
  int64_t loop_cnt = 0;
  while (input->Valid() && !shutting_down_->load(std::memory_order_acquire) &&
         !cfd->IsDropped() && status.ok()) {
    compact_->num_input_records++;
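    // Flush the accumulated key-drop counters and IO stats into the
    // statistics object every 1000 keys, so long compactions report
    // progress without calling RecordTick on every key.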
    if (++loop_cnt > 1000) {
      if (key_drop_user > 0) {
        RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
        key_drop_user = 0;
      }
      if (key_drop_newer_entry > 0) {
        RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
                   key_drop_newer_entry);
        key_drop_newer_entry = 0;
      }
      if (key_drop_obsolete > 0) {
        RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, key_drop_obsolete);
        key_drop_obsolete = 0;
      }
      RecordCompactionIOStats();
      loop_cnt = 0;
    }
    // FLUSH preempts compaction
    // TODO(icanadi) this currently only checks if flush is necessary on
    // compacting column family. we should also check if flush is necessary on
    // other column families, too
    (*imm_micros) += yield_callback_();

    Slice key;
    Slice value;
    // If is_compaction_v2 is on, kv-pairs are read from the current prefix
    // batch. This prefix batch should contain the results of calling
    // compaction_filter_v2.
    //
    // If is_compaction_v2 is off, this function will go through all the
    // kv-pairs in input.
    if (!is_compaction_v2) {
      key = input->key();
      value = input->value();
    } else {
      if (combined_idx >= compact_->combined_key_buf_.size()) {
        break;
      }
      assert(combined_idx < compact_->combined_key_buf_.size());
      key = compact_->combined_key_buf_[combined_idx];
      value = compact_->combined_value_buf_[combined_idx];

      ++combined_idx;
    }

    if (compact_->compaction->ShouldStopBefore(key) &&
        compact_->builder != nullptr) {
      status = FinishCompactionOutputFile(input);
      if (!status.ok()) {
        break;
      }
    }

    // Handle key/value, add to state, etc.
    bool drop = false;
    bool current_entry_is_merging = false;
    if (!ParseInternalKey(key, &ikey)) {
      // Do not hide error keys
      // TODO: error key stays in db forever? Figure out the intention/rationale
      // v10 error v8 : we cannot hide v8 even though it's pretty obvious.
      current_user_key.Clear();
      has_current_user_key = false;
      last_sequence_for_key = kMaxSequenceNumber;
      visible_in_snapshot = kMaxSequenceNumber;
    } else {
      if (!has_current_user_key ||
          cfd->user_comparator()->Compare(ikey.user_key,
                                          current_user_key.GetKey()) != 0) {
        // First occurrence of this user key
        current_user_key.SetKey(ikey.user_key);
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
        visible_in_snapshot = kMaxSequenceNumber;
        // apply the compaction filter to the first occurrence of the user key
        if (compaction_filter && !is_compaction_v2 && ikey.type == kTypeValue &&
            (visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
          // If the user has specified a compaction filter and the sequence
          // number is greater than any external snapshot, then invoke the
          // filter.
          // If the return value of the compaction filter is true, replace
          // the entry with a delete marker.
          bool value_changed = false;
          compaction_filter_value.clear();
          bool to_delete = compaction_filter->Filter(
              compact_->compaction->level(), ikey.user_key, value,
              &compaction_filter_value, &value_changed);
          if (to_delete) {
            // make a copy of the original key and convert it to a delete
            delete_key.SetInternalKey(ExtractUserKey(key), ikey.sequence,
                                      kTypeDeletion);
            // anchor the key again
            key = delete_key.GetKey();
            // needed because ikey is backed by key
            ParseInternalKey(key, &ikey);
            // no value associated with delete
            value.clear();
            ++key_drop_user;
          } else if (value_changed) {
            value = compaction_filter_value;
          }
        }
      }

      // If there are no snapshots, then this kv affects visibility at tip.
      // Otherwise, search through all existing snapshots to find
      // the earliest snapshot that is affected by this kv.
      SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
      SequenceNumber visible =
          visible_at_tip_
              ? visible_at_tip_
              : is_snapshot_supported_
                    ? findEarliestVisibleSnapshot(ikey.sequence,
                                                  compact_->existing_snapshots,
                                                  &prev_snapshot)
                    : 0;

      if (visible_in_snapshot == visible) {
        // If the earliest snapshot in which this key is visible
        // is the same as the visibility of a previous instance of the
        // same key, then this kv is not visible in any snapshot.
        // Hidden by a newer entry for the same user key
        // TODO: why not > ?
        assert(last_sequence_for_key >= ikey.sequence);
        drop = true;  // (A)
        ++key_drop_newer_entry;
      } else if (ikey.type == kTypeDeletion &&
                 ikey.sequence <= earliest_snapshot_ &&
                 compact_->compaction->KeyNotExistsBeyondOutputLevel(
                     ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
        drop = true;
        ++key_drop_obsolete;
      } else if (ikey.type == kTypeMerge) {
        if (!merge.HasOperator()) {
          LogToBuffer(log_buffer_, "Options::merge_operator is null.");
          status = Status::InvalidArgument(
              "merge_operator is not properly initialized.");
          break;
        }
        // We know the merge type entry is not hidden, otherwise we would
        // have hit (A).
        // We encapsulate the merge-related state machine in a different
        // object to minimize change to the existing flow. It turns out this
        // logic could also be nicely re-used for the memtable flush purge
        // optimization in BuildTable.
        int steps = 0;
        merge.MergeUntil(input, prev_snapshot, bottommost_level_,
                         db_options_.statistics.get(), &steps);
        // Skip the Merge ops
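        // (in V2 mode combined_idx was already advanced past this entry;
        // rewind by one and skip the operands MergeUntil consumed so the
        // buffer cursor stays in step with the merge)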
        combined_idx = combined_idx - 1 + steps;

        current_entry_is_merging = true;
        if (merge.IsSuccess()) {
          // Successfully found Put/Delete/(end-of-key-range) while merging
          // Get the merge result
          key = merge.key();
          ParseInternalKey(key, &ikey);
          value = merge.value();
        } else {
          // Did not find a Put/Delete/(end-of-key-range) while merging
          // We now have some stack of merge operands to write out.
          // NOTE: key,value, and ikey are now referring to old entries.
          //       These will be correctly set below.
          assert(!merge.keys().empty());
          assert(merge.keys().size() == merge.values().size());

          // Hack to make sure last_sequence_for_key is correct
          ParseInternalKey(merge.keys().front(), &ikey);
        }
      }

      last_sequence_for_key = ikey.sequence;
      visible_in_snapshot = visible;
    }

    if (!drop) {
      // We may write a single key (e.g.: for Put/Delete or successful merge).
      // Or we may instead have to write a sequence/list of keys.
      // We have to write a sequence iff we have an unsuccessful merge
      bool has_merge_list = current_entry_is_merging && !merge.IsSuccess();
      const std::deque<std::string>* keys = nullptr;
      const std::deque<std::string>* values = nullptr;
      std::deque<std::string>::const_reverse_iterator key_iter;
      std::deque<std::string>::const_reverse_iterator value_iter;
      if (has_merge_list) {
        keys = &merge.keys();
        values = &merge.values();
        key_iter = keys->rbegin();  // The back (*rbegin()) is the first key
        value_iter = values->rbegin();

        key = Slice(*key_iter);
        value = Slice(*value_iter);
      }

      // If we have a list of keys to write, traverse the list.
      // If we have a single key to write, simply write that key.
      while (true) {
        // Invariant: key,value,ikey will always be the next entry to write
        char* kptr = (char*)key.data();
        std::string kstr;

        // Zeroing out the sequence number leads to better compression.
        // If this is the bottommost level (no files in lower levels)
        // and the earliest snapshot is larger than this seqno
        // then we can squash the seqno to zero.
        if (bottommost_level_ && ikey.sequence < earliest_snapshot_ &&
            ikey.type != kTypeMerge) {
          assert(ikey.type != kTypeDeletion);
          // make a copy because updating in place would cause problems
          // with the priority queue that is managing the input key iterator
          kstr.assign(key.data(), key.size());
          kptr = (char*)kstr.c_str();
          UpdateInternalKey(kptr, key.size(), (uint64_t)0, ikey.type);
        }

        Slice newkey(kptr, key.size());
        assert((key.clear(), 1));  // we do not need 'key' anymore
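        // (the comma expression clears 'key' only when asserts are
        // compiled in, so accidental reuse of the stale slice is caught
        // in debug builds at no release-build cost)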

        // Open output file if necessary
        if (compact_->builder == nullptr) {
          status = OpenCompactionOutputFile();
          if (!status.ok()) {
            break;
          }
        }

        SequenceNumber seqno = GetInternalKeySeqno(newkey);
        if (compact_->builder->NumEntries() == 0) {
          compact_->current_output()->smallest.DecodeFrom(newkey);
          compact_->current_output()->smallest_seqno = seqno;
        } else {
          compact_->current_output()->smallest_seqno =
              std::min(compact_->current_output()->smallest_seqno, seqno);
        }
        compact_->current_output()->largest.DecodeFrom(newkey);
        compact_->builder->Add(newkey, value);
        compact_->num_output_records++;
        compact_->current_output()->largest_seqno =
            std::max(compact_->current_output()->largest_seqno, seqno);

        // Close output file if it is big enough
        if (compact_->builder->FileSize() >=
            compact_->compaction->MaxOutputFileSize()) {
          status = FinishCompactionOutputFile(input);
          if (!status.ok()) {
            break;
          }
        }

        // If we have a list of entries, move to next element
        // If we only had one entry, then break the loop.
        if (has_merge_list) {
          ++key_iter;
          ++value_iter;

          // If at end of list
          if (key_iter == keys->rend() || value_iter == values->rend()) {
            // Sanity Check: if one ends, then both end
            assert(key_iter == keys->rend() && value_iter == values->rend());
            break;
          }

          // Otherwise not at end of list. Update key, value, and ikey.
          key = Slice(*key_iter);
          value = Slice(*value_iter);
          ParseInternalKey(key, &ikey);

        } else {
          // Only had one item to begin with (Put/Delete)
          break;
        }
      }  // while (true)
    }    // if (!drop)

    // MergeUntil has moved input to the next entry
    if (!current_entry_is_merging) {
      input->Next();
    }
  }
  if (key_drop_user > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_USER, key_drop_user);
  }
  if (key_drop_newer_entry > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, key_drop_newer_entry);
  }
  if (key_drop_obsolete > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, key_drop_obsolete);
  }
  RecordCompactionIOStats();

  return status;
}

void CompactionJob::CallCompactionFilterV2(
    CompactionFilterV2* compaction_filter_v2) {
  if (compact_ == nullptr || compaction_filter_v2 == nullptr) {
    return;
  }

  // Assemble slice vectors for user keys and existing values.
  // We also keep track of our parsed internal key structs because
  // we may need to access the sequence number in the event that
  // keys are garbage collected during the filter process.
  std::vector<ParsedInternalKey> ikey_buf;
  std::vector<Slice> user_key_buf;
  std::vector<Slice> existing_value_buf;

  for (const auto& key : compact_->key_str_buf_) {
    ParsedInternalKey ikey;
    ParseInternalKey(Slice(key), &ikey);
    ikey_buf.emplace_back(ikey);
    user_key_buf.emplace_back(ikey.user_key);
  }
  for (const auto& value : compact_->existing_value_str_buf_) {
    existing_value_buf.emplace_back(Slice(value));
  }

  // If the user has specified a compaction filter and the sequence
  // number is greater than any external snapshot, then invoke the
  // filter.
  // If the return value of the compaction filter is true, replace
  // the entry with a delete marker.
  compact_->to_delete_buf_ = compaction_filter_v2->Filter(
      compact_->compaction->level(), user_key_buf, existing_value_buf,
      &compact_->new_value_buf_, &compact_->value_changed_buf_);

  // new_value_buf_.size() <= to_delete_buf_.size(). "=" iff all
  // kv-pairs in this compaction run had their values changed.
  assert(compact_->to_delete_buf_.size() == compact_->key_str_buf_.size());
  assert(compact_->to_delete_buf_.size() ==
         compact_->existing_value_str_buf_.size());
  assert(compact_->value_changed_buf_.empty() ||
         compact_->to_delete_buf_.size() ==
         compact_->value_changed_buf_.size());

  int new_value_idx = 0;
  for (unsigned int i = 0; i < compact_->to_delete_buf_.size(); ++i) {
    if (compact_->to_delete_buf_[i]) {
      // update the string buffer directly
      // the Slice buffer points to the updated buffer
      UpdateInternalKey(&compact_->key_str_buf_[i][0],
                        compact_->key_str_buf_[i].size(), ikey_buf[i].sequence,
                        kTypeDeletion);

      // no value associated with delete
      compact_->existing_value_str_buf_[i].clear();
      RecordTick(stats_, COMPACTION_KEY_DROP_USER);
    } else if (!compact_->value_changed_buf_.empty() &&
        compact_->value_changed_buf_[i]) {
      compact_->existing_value_str_buf_[i] =
          compact_->new_value_buf_[new_value_idx++];
    }
  }  // for
}

Status CompactionJob::FinishCompactionOutputFile(Iterator* input) {
  assert(compact_ != nullptr);
  assert(compact_->outfile);
  assert(compact_->builder != nullptr);

  const uint64_t output_number = compact_->current_output()->number;
  const uint32_t output_path_id = compact_->current_output()->path_id;
  assert(output_number != 0);

  // Check for iterator errors
  Status s = input->status();
  const uint64_t current_entries = compact_->builder->NumEntries();
  if (s.ok()) {
    s = compact_->builder->Finish();
  } else {
    compact_->builder->Abandon();
  }
  const uint64_t current_bytes = compact_->builder->FileSize();
  compact_->current_output()->file_size = current_bytes;
  compact_->total_bytes += current_bytes;
  compact_->builder.reset();

  // Finish and check for file errors
  if (s.ok() && !db_options_.disableDataSync) {
    if (db_options_.use_fsync) {
      StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
      s = compact_->outfile->Fsync();
    } else {
      StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
      s = compact_->outfile->Sync();
    }
  }
  if (s.ok()) {
    s = compact_->outfile->Close();
  }
  compact_->outfile.reset();

  if (s.ok() && current_entries > 0) {
    // Verify that the table is usable
    ColumnFamilyData* cfd = compact_->compaction->column_family_data();
    FileDescriptor fd(output_number, output_path_id, current_bytes);
    Iterator* iter = cfd->table_cache()->NewIterator(
        ReadOptions(), env_options_, cfd->internal_comparator(), fd);
    s = iter->status();
    delete iter;
    if (s.ok()) {
      Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
          "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
          " keys, %" PRIu64 " bytes",
          cfd->GetName().c_str(), job_id_, output_number, current_entries,
          current_bytes);
    }
  }
  return s;
}

Status CompactionJob::InstallCompactionResults(InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();

  auto* compaction = compact_->compaction;
  // paranoia: verify that the files that we started with
  // still exist in the current version and in the same original level.
  // This ensures that a concurrent compaction did not erroneously
  // pick the same files to compact.
  if (!versions_->VerifyCompactionFileConsistency(compaction)) {
    Compaction::InputLevelSummaryBuffer inputs_summary;

    Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] Compaction %s aborted",
        compaction->column_family_data()->GetName().c_str(), job_id_,
        compaction->InputLevelSummary(&inputs_summary));
    return Status::Corruption("Compaction input files inconsistent");
  }

  {
    Compaction::InputLevelSummaryBuffer inputs_summary;
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
        compaction->column_family_data()->GetName().c_str(), job_id_,
        compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes);
  }

  // Add compaction outputs
  compaction->AddInputDeletions(compact_->compaction->edit());
  for (size_t i = 0; i < compact_->outputs.size(); i++) {
    const CompactionState::Output& out = compact_->outputs[i];
    compaction->edit()->AddFile(
        compaction->output_level(), out.number, out.path_id, out.file_size,
        out.smallest, out.largest, out.smallest_seqno, out.largest_seqno);
  }
  return versions_->LogAndApply(compaction->column_family_data(),
                                mutable_cf_options_, compaction->edit(),
                                db_mutex, db_directory_);
}

// Given a sequence number, return the sequence number of the
// earliest snapshot that this sequence number is visible in.
// The snapshots themselves are arranged in ascending order of
// sequence numbers.
// Employ a sequential search because the total number of
// snapshots is typically small.
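// For example, with existing snapshots {5, 10, 20} and in == 7, this
// returns 10 and sets *prev_snapshot to 5.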
inline SequenceNumber CompactionJob::findEarliestVisibleSnapshot(
    SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
    SequenceNumber* prev_snapshot) {
  assert(snapshots.size());
  SequenceNumber prev __attribute__((unused)) = 0;
  for (const auto cur : snapshots) {
    assert(prev <= cur);
    if (cur >= in) {
      *prev_snapshot = prev;
      return cur;
    }
    prev = cur;  // assignment
    assert(prev);
  }
  Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
      "CompactionJob is not able to find snapshot"
      " with SeqId later than %" PRIu64
      ": current MaxSeqId is %" PRIu64 "",
      in, snapshots[snapshots.size() - 1]);
  assert(0);
  return 0;
}

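// Flushes the thread-local IO counters accumulated so far into the global
// statistics object and resets them, so successive calls report deltas
// rather than running totals.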
void CompactionJob::RecordCompactionIOStats() {
  RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
  IOSTATS_RESET(bytes_read);
  RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
  IOSTATS_RESET(bytes_written);
}

Status CompactionJob::OpenCompactionOutputFile() {
  assert(compact_ != nullptr);
  assert(compact_->builder == nullptr);
  // no need to lock because VersionSet::next_file_number_ is atomic
  uint64_t file_number = versions_->NewFileNumber();
  // Make the output file
  std::string fname = TableFileName(db_options_.db_paths, file_number,
                                    compact_->compaction->GetOutputPathId());
  Status s = env_->NewWritableFile(fname, &compact_->outfile, env_options_);

  if (!s.ok()) {
    Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
        " fails at NewWritableFile with status %s",
        compact_->compaction->column_family_data()->GetName().c_str(), job_id_,
        file_number, s.ToString().c_str());
    LogFlush(db_options_.info_log);
    return s;
  }
  CompactionState::Output out;
  out.number = file_number;
  out.path_id = compact_->compaction->GetOutputPathId();
  out.smallest.Clear();
  out.largest.Clear();
  out.smallest_seqno = out.largest_seqno = 0;

  compact_->outputs.push_back(out);
  compact_->outfile->SetIOPriority(Env::IO_LOW);
  compact_->outfile->SetPreallocationBlockSize(static_cast<size_t>(
      compact_->compaction->OutputFilePreallocationSize(mutable_cf_options_)));

  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  bool skip_filters = false;

  // If the column family is configured to only optimize filters for hits,
  // we can skip creating filters for the bottommost level, since that is
  // where the data of a successful lookup is going to be found.
  if (cfd->ioptions()->optimize_filters_for_hits && bottommost_level_) {
    skip_filters = true;
  }

I
Igor Canadi 已提交
1080 1081 1082
  compact_->builder.reset(NewTableBuilder(
      *cfd->ioptions(), cfd->internal_comparator(), compact_->outfile.get(),
      compact_->compaction->OutputCompressionType(),
1083
      cfd->ioptions()->compression_opts, skip_filters));
I
Igor Canadi 已提交
1084 1085 1086 1087
  LogFlush(db_options_.info_log);
  return s;
}

void CompactionJob::CleanupCompaction(const Status& status) {
  if (compact_->builder != nullptr) {
    // May happen if we get a shutdown call in the middle of compaction
    compact_->builder->Abandon();
    compact_->builder.reset();
  } else {
    assert(!status.ok() || compact_->outfile == nullptr);
  }
  for (size_t i = 0; i < compact_->outputs.size(); i++) {
    const CompactionState::Output& out = compact_->outputs[i];

    // If this file was inserted into the table cache then remove
    // them here because this compaction was not committed.
    if (!status.ok()) {
      TableCache::Evict(table_cache_.get(), out.number);
    }
  }
  delete compact_;
  compact_ = nullptr;
}

}  // namespace rocksdb