//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/compaction_job.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <algorithm>
#include <vector>
#include <memory>
#include <list>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/event_helpers.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/merge_helper.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/version_set.h"
#include "port/port.h"
#include "port/likely.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merger.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/log_buffer.h"
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
#include "util/iostats_context_imp.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/thread_status_util.h"

namespace rocksdb {

struct CompactionJob::CompactionState {
  Compaction* const compaction;

  // Files produced by compaction
  struct Output {
    uint64_t number;
    uint32_t path_id;
    uint64_t file_size;
    InternalKey smallest, largest;
    SequenceNumber smallest_seqno, largest_seqno;
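    // Set from TableBuilder::NeedCompact() when the file is finished and
    // propagated through the VersionEdit, so the new file can be marked
    // for compaction again.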
    bool need_compaction;
  };
  std::vector<Output> outputs;

  // State kept for output being generated
  std::unique_ptr<WritableFile> outfile;
  std::unique_ptr<TableBuilder> builder;

  uint64_t total_bytes;

  Output* current_output() { return &outputs[outputs.size() - 1]; }

  explicit CompactionState(Compaction* c)
      : compaction(c),
        total_bytes(0),
        num_input_records(0),
        num_output_records(0) {}

  uint64_t num_input_records;
  uint64_t num_output_records;
};

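// A CompactionJob is driven by its caller in three phases: Prepare() sets up
// snapshot bookkeeping, Run() executes the compaction loop, and Install()
// applies the results to the current Version (it must be called with the DB
// mutex held; see the AssertHeld() calls below).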
CompactionJob::CompactionJob(
    int job_id, Compaction* compaction, const DBOptions& db_options,
    const EnvOptions& env_options, VersionSet* versions,
    std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
    Directory* db_directory, Directory* output_directory, Statistics* stats,
    std::vector<SequenceNumber> existing_snapshots,
    std::shared_ptr<Cache> table_cache,
    std::function<uint64_t()> yield_callback, EventLogger* event_logger,
    bool paranoid_file_checks, const std::string& dbname,
    CompactionJobStats* compaction_job_stats)
    : job_id_(job_id),
      compact_(new CompactionState(compaction)),
      compaction_job_stats_(compaction_job_stats),
      compaction_stats_(1),
      dbname_(dbname),
      db_options_(db_options),
      env_options_(env_options),
      env_(db_options.env),
      versions_(versions),
      shutting_down_(shutting_down),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_directory_(output_directory),
      stats_(stats),
      existing_snapshots_(std::move(existing_snapshots)),
      table_cache_(std::move(table_cache)),
      yield_callback_(std::move(yield_callback)),
      event_logger_(event_logger),
      paranoid_file_checks_(paranoid_file_checks) {
  assert(log_buffer_ != nullptr);
  ThreadStatusUtil::SetColumnFamily(compact_->compaction->column_family_data());
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
  ReportStartedCompaction(compaction);
}

CompactionJob::~CompactionJob() {
  assert(compact_ == nullptr);
  ThreadStatusUtil::ResetThreadStatus();
}

void CompactionJob::ReportStartedCompaction(
    Compaction* compaction) {
  ThreadStatusUtil::SetColumnFamily(
      compact_->compaction->column_family_data());

  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_JOB_ID,
      job_id_);

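  // Pack the start level into the high 32 bits and the output level
  // into the low 32 bits of a single uint64_t property.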
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL,
      (static_cast<uint64_t>(compact_->compaction->start_level()) << 32) +
          compact_->compaction->output_level());

  // In the current design, a CompactionJob is always created
  // for non-trivial compaction.
  assert(compaction->IsTrivialMove() == false ||
         compaction->is_manual_compaction() == true);

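  // Bit 0 of the property flags marks a manual compaction, bit 1 a
  // deletion compaction.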
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_PROP_FLAGS,
      compaction->is_manual_compaction() +
          (compaction->deletion_compaction() << 1));

  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES,
      compaction->CalculateTotalInputSize());

  IOSTATS_RESET(bytes_written);
  IOSTATS_RESET(bytes_read);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_WRITTEN, 0);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_READ, 0);

  // Set the thread operation after operation properties
  // to ensure GetThreadList() can always show them all together.
  ThreadStatusUtil::SetThreadOperation(
      ThreadStatus::OP_COMPACTION);

  if (compaction_job_stats_) {
    compaction_job_stats_->is_manual_compaction =
        compaction->is_manual_compaction();
  }
}

void CompactionJob::Prepare() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_PREPARE);

  // Generate file_levels_ for compaction before making Iterator
  ColumnFamilyData* cfd __attribute__((unused)) =
      compact_->compaction->column_family_data();
  assert(cfd != nullptr);

  assert(cfd->current()->storage_info()->NumLevelFiles(
             compact_->compaction->level()) > 0);
  assert(compact_->builder == nullptr);
  assert(!compact_->outfile);

  visible_at_tip_ = 0;
  latest_snapshot_ = 0;
  if (existing_snapshots_.size() == 0) {
    // optimize for fast path if there are no snapshots
    visible_at_tip_ = versions_->LastSequence();
    earliest_snapshot_ = visible_at_tip_;
  } else {
    latest_snapshot_ = existing_snapshots_.back();
    // Add the current seqno as the 'latest' virtual
    // snapshot to the end of this list.
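    // (This virtual snapshot guarantees that findEarliestVisibleSnapshot()
    // always finds a snapshot at or above any key's sequence number, so
    // keys newer than every real snapshot are handled uniformly.)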
    existing_snapshots_.push_back(versions_->LastSequence());
    earliest_snapshot_ = existing_snapshots_[0];
  }

  // Is this compaction producing files at the bottommost level?
  bottommost_level_ = compact_->compaction->bottommost_level();
}

Status CompactionJob::Run() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_RUN);
  TEST_SYNC_POINT("CompactionJob::Run():Start");
  log_buffer_->FlushBufferToLog();
  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  auto* compaction = compact_->compaction;
  LogCompaction(cfd, compaction);

  const uint64_t start_micros = env_->NowMicros();
  std::unique_ptr<Iterator> input(
      versions_->MakeInputIterator(compact_->compaction));
  input->SeekToFirst();

  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
  auto status = ProcessKeyValueCompaction(&imm_micros, input.get());

  if (status.ok() &&
      (shutting_down_->load(std::memory_order_acquire) || cfd->IsDropped())) {
    status = Status::ShutdownInProgress(
        "Database shutdown or Column family drop during compaction");
  }
  if (status.ok() && compact_->builder != nullptr) {
    status = FinishCompactionOutputFile(input->status());
  }
  if (status.ok()) {
    status = input->status();
  }
  input.reset();

  if (output_directory_ && !db_options_.disableDataSync) {
    output_directory_->Fsync();
  }

  compaction_stats_.micros = env_->NowMicros() - start_micros - imm_micros;
  MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);
  UpdateCompactionStats();

  RecordCompactionIOStats();

  LogFlush(db_options_.info_log);
  TEST_SYNC_POINT("CompactionJob::Run():End");
  return status;
}

void CompactionJob::Install(Status* status,
                            const MutableCFOptions& mutable_cf_options,
                            InstrumentedMutex* db_mutex) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_INSTALL);
  db_mutex->AssertHeld();
  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  cfd->internal_stats()->AddCompactionStats(
      compact_->compaction->output_level(), compaction_stats_);

  if (status->ok()) {
    *status = InstallCompactionResults(db_mutex, mutable_cf_options);
  }
  VersionStorageInfo::LevelSummaryStorage tmp;
  auto vstorage = cfd->current()->storage_info();
  const auto& stats = compaction_stats_;
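  // In the summary line below, read-write-amplify is (all input bytes +
  // bytes written) / bytes read from the non-output input levels, and
  // write-amplify is bytes written / bytes read from those same levels.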
  LogToBuffer(
      log_buffer_,
      "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
      "files in(%d, %d) out(%d) "
      "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
      "write-amplify(%.1f) %s, records in: %" PRIu64
      ", records dropped: %" PRIu64 "\n",
      cfd->GetName().c_str(), vstorage->LevelSummary(&tmp),
      (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) /
          static_cast<double>(stats.micros),
      stats.bytes_written / static_cast<double>(stats.micros),
      compact_->compaction->output_level(),
      stats.num_input_files_in_non_output_levels,
      stats.num_input_files_in_output_level,
      stats.num_output_files,
      stats.bytes_read_non_output_levels / 1048576.0,
      stats.bytes_read_output_level / 1048576.0,
      stats.bytes_written / 1048576.0,
      (stats.bytes_written + stats.bytes_read_output_level +
       stats.bytes_read_non_output_levels) /
          static_cast<double>(stats.bytes_read_non_output_levels),
      stats.bytes_written /
          static_cast<double>(stats.bytes_read_non_output_levels),
      status->ToString().c_str(), stats.num_input_records,
      stats.num_dropped_records);

  UpdateCompactionJobStats(stats);

  auto stream = event_logger_->LogToBuffer(log_buffer_);
  stream << "job" << job_id_ << "event"
         << "compaction_finished"
         << "output_level" << compact_->compaction->output_level()
         << "num_output_files" << compact_->outputs.size()
         << "total_output_size" << compact_->total_bytes
         << "num_input_records" << compact_->num_input_records
         << "num_output_records" << compact_->num_output_records;
  stream << "lsm_state";
  stream.StartArray();
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    stream << vstorage->NumLevelFiles(level);
  }
  stream.EndArray();

  CleanupCompaction(*status);
}

Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
                                                Iterator* input) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
  Status status;
  std::string compaction_filter_value;
  ParsedInternalKey ikey;
  IterKey current_user_key;
  bool has_current_user_key = false;
  IterKey delete_key;
  SequenceNumber last_sequence_for_key __attribute__((unused)) =
      kMaxSequenceNumber;
  SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  MergeHelper merge(cfd->user_comparator(), cfd->ioptions()->merge_operator,
                    db_options_.info_log.get(),
                    cfd->ioptions()->min_partial_merge_operands,
                    false /* internal key corruption is expected */);
  auto compaction_filter = cfd->ioptions()->compaction_filter;
  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
  if (!compaction_filter) {
    compaction_filter_from_factory =
        compact_->compaction->CreateCompactionFilter();
    compaction_filter = compaction_filter_from_factory.get();
  }

  TEST_SYNC_POINT("CompactionJob::Run():Inprogress");

  int64_t key_drop_user = 0;
  int64_t key_drop_newer_entry = 0;
  int64_t key_drop_obsolete = 0;
  int64_t loop_cnt = 0;

  StopWatchNano timer(env_, stats_ != nullptr);
  uint64_t total_filter_time = 0;
  while (input->Valid() && !shutting_down_->load(std::memory_order_acquire) &&
         !cfd->IsDropped() && status.ok()) {
    compact_->num_input_records++;
    if (++loop_cnt > 1000) {
      RecordDroppedKeys(
          &key_drop_user, &key_drop_newer_entry, &key_drop_obsolete);
      RecordCompactionIOStats();
      loop_cnt = 0;
    }
    // FLUSH preempts compaction
    // TODO(icanadi) this currently only checks if flush is necessary on
    // compacting column family. we should also check if flush is necessary
    // on other column families, too
    (*imm_micros) += yield_callback_();

    Slice key = input->key();
    Slice value = input->value();

    if (compaction_job_stats_ != nullptr) {
      compaction_job_stats_->total_input_raw_key_bytes +=
          input->key().size();
      compaction_job_stats_->total_input_raw_value_bytes +=
          input->value().size();
    }

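    // ShouldStopBefore() is true when the current output file would overlap
    // too much data in the grandparent level; cut the file here so that
    // future compactions of this output stay cheap.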
    if (compact_->compaction->ShouldStopBefore(key) &&
        compact_->builder != nullptr) {
      status = FinishCompactionOutputFile(input->status());
      if (!status.ok()) {
        break;
      }
    }

    // Handle key/value, add to state, etc.
    bool drop = false;
    bool current_entry_is_merging = false;
    if (!ParseInternalKey(key, &ikey)) {
      // Do not hide error keys
      // TODO: error key stays in db forever? Figure out the intention/rationale
      // v10 error v8 : we cannot hide v8 even though it's pretty obvious.
      current_user_key.Clear();
      has_current_user_key = false;
      last_sequence_for_key = kMaxSequenceNumber;
      visible_in_snapshot = kMaxSequenceNumber;
    } else {
      if (compaction_job_stats_ != nullptr && ikey.type == kTypeDeletion) {
        compaction_job_stats_->num_input_deletion_records++;
      }

      if (!has_current_user_key ||
          cfd->user_comparator()->Compare(ikey.user_key,
                                          current_user_key.GetKey()) != 0) {
        // First occurrence of this user key
        current_user_key.SetKey(ikey.user_key);
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
        visible_in_snapshot = kMaxSequenceNumber;
        // apply the compaction filter to the first occurrence of the user key
        if (compaction_filter && ikey.type == kTypeValue &&
            (visible_at_tip_ || ikey.sequence > latest_snapshot_)) {
          // If the user has specified a compaction filter and the sequence
          // number is greater than any external snapshot, then invoke the
          // filter.
          // If the return value of the compaction filter is true, replace
          // the entry with a delete marker.
          bool value_changed = false;
          compaction_filter_value.clear();
          if (stats_ != nullptr) {
            timer.Start();
          }
          bool to_delete = compaction_filter->Filter(
              compact_->compaction->level(), ikey.user_key, value,
              &compaction_filter_value, &value_changed);
          total_filter_time += timer.ElapsedNanos();
          if (to_delete) {
            // make a copy of the original key and convert it to a delete
            delete_key.SetInternalKey(ExtractUserKey(key), ikey.sequence,
                                      kTypeDeletion);
            // anchor the key again
            key = delete_key.GetKey();
            // needed because ikey is backed by key
            ParseInternalKey(key, &ikey);
            // no value associated with delete
            value.clear();
            ++key_drop_user;
          } else if (value_changed) {
            value = compaction_filter_value;
          }
        }
      }

      // If there are no snapshots, then this kv affects visibility at tip.
      // Otherwise, search through all existing snapshots to find
      // the earliest snapshot that is affected by this kv.
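      // Example: with snapshots {s1 < s2}, a version of a key with
      // seq <= s1 is visible at s1 while a newer version with
      // s1 < seq <= s2 is visible at s2; they differ, so both are kept.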
      SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
      SequenceNumber visible =
          visible_at_tip_
              ? visible_at_tip_
              : findEarliestVisibleSnapshot(ikey.sequence, existing_snapshots_,
                                            &prev_snapshot);

      if (visible_in_snapshot == visible) {
        // If the earliest snapshot in which this key is visible is the
        // same as the visibility of a previous instance of the same key,
        // then this kv is not visible in any snapshot: it is hidden by a
        // newer entry for the same user key.
        // TODO: why not > ?
        assert(last_sequence_for_key >= ikey.sequence);
        drop = true;  // (A)
        ++key_drop_newer_entry;
      } else if (ikey.type == kTypeDeletion &&
                 ikey.sequence <= earliest_snapshot_ &&
                 compact_->compaction->KeyNotExistsBeyondOutputLevel(
                     ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
        drop = true;
        ++key_drop_obsolete;
      } else if (ikey.type == kTypeMerge) {
        if (!merge.HasOperator()) {
          LogToBuffer(log_buffer_, "Options::merge_operator is null.");
          status = Status::InvalidArgument(
              "merge_operator is not properly initialized.");
          break;
        }
        // We know the merge type entry is not hidden, otherwise we would
        // have hit (A)
        // We encapsulate the merge-related state machine in a separate
        // object to minimize change to the existing flow. It turns out this
        // logic could also be nicely re-used for the memtable flush purge
        // optimization in BuildTable.
        merge.MergeUntil(input, prev_snapshot, bottommost_level_,
                         db_options_.statistics.get(), nullptr, env_);

        current_entry_is_merging = true;
        if (merge.IsSuccess()) {
          // Successfully found Put/Delete/(end-of-key-range) while merging
          // Get the merge result
          key = merge.key();
          ParseInternalKey(key, &ikey);
          value = merge.value();
        } else {
          // Did not find a Put/Delete/(end-of-key-range) while merging
          // We now have some stack of merge operands to write out.
          // NOTE: key,value, and ikey are now referring to old entries.
          //       These will be correctly set below.
          assert(!merge.keys().empty());
          assert(merge.keys().size() == merge.values().size());

          // Hack to make sure last_sequence_for_key is correct
          ParseInternalKey(merge.keys().front(), &ikey);
        }
      }

      last_sequence_for_key = ikey.sequence;
      visible_in_snapshot = visible;
    }

    if (!drop) {
      // We may write a single key (e.g.: for Put/Delete or successful merge).
      // Or we may instead have to write a sequence/list of keys.
      // We have to write a sequence iff we have an unsuccessful merge
      if (current_entry_is_merging && !merge.IsSuccess()) {
        const auto& keys = merge.keys();
        const auto& values = merge.values();
        std::deque<std::string>::const_reverse_iterator key_iter =
          keys.rbegin();  // The back (*rbegin()) is the first key
        std::deque<std::string>::const_reverse_iterator value_iter =
          values.rbegin();

        key = Slice(*key_iter);
        value = Slice(*value_iter);

        // We have a list of keys to write, traverse the list.
        while (true) {
          status = WriteKeyValue(key, value, ikey, input->status());
          if (!status.ok()) {
            break;
          }

          ++key_iter;
          ++value_iter;

          // If at end of list
          if (key_iter == keys.rend() || value_iter == values.rend()) {
            // Sanity Check: if one ends, then both end
            assert(key_iter == keys.rend() && value_iter == values.rend());
            break;
          }

          // Otherwise not at end of list. Update key, value, and ikey.
          key = Slice(*key_iter);
          value = Slice(*value_iter);
          ParseInternalKey(key, &ikey);
        }
      } else {
        // There is only one item to be written out
        status = WriteKeyValue(key, value, ikey, input->status());
      }
    }    // if (!drop)

    // MergeUntil has moved input to the next entry
    if (!current_entry_is_merging) {
      input->Next();
    }
  }
  RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, total_filter_time);
  RecordDroppedKeys(&key_drop_user, &key_drop_newer_entry, &key_drop_obsolete);
  RecordCompactionIOStats();

  return status;
}

Status CompactionJob::WriteKeyValue(const Slice& key, const Slice& value,
    const ParsedInternalKey& ikey, const Status& input_status) {
  Slice newkey(key.data(), key.size());
  std::string kstr;

  // Zeroing out the sequence number leads to better compression.
  // If this is the bottommost level (no files in lower levels)
  // and the earliest snapshot is larger than this seqno
  // then we can squash the seqno to zero.
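  // For example, (user_key, seq=42, kTypeValue) is rewritten as
  // (user_key, seq=0, kTypeValue); runs of zeroed sequence numbers in
  // adjacent internal keys compress considerably better.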
  if (bottommost_level_ && ikey.sequence < earliest_snapshot_ &&
      ikey.type != kTypeMerge) {
    assert(ikey.type != kTypeDeletion);
    // make a copy because updating in place would cause problems
    // with the priority queue that is managing the input key iterator
    kstr.assign(key.data(), key.size());
    UpdateInternalKey(&kstr, (uint64_t)0, ikey.type);
    newkey = Slice(kstr);
  }

  // Open output file if necessary
  if (compact_->builder == nullptr) {
    Status status = OpenCompactionOutputFile();
    if (!status.ok()) {
      return status;
    }
  }

  SequenceNumber seqno = GetInternalKeySeqno(newkey);
  if (compact_->builder->NumEntries() == 0) {
    compact_->current_output()->smallest.DecodeFrom(newkey);
    compact_->current_output()->smallest_seqno = seqno;
  } else {
    compact_->current_output()->smallest_seqno =
        std::min(compact_->current_output()->smallest_seqno, seqno);
  }
  compact_->current_output()->largest.DecodeFrom(newkey);
  compact_->builder->Add(newkey, value);
  compact_->num_output_records++;
  compact_->current_output()->largest_seqno =
    std::max(compact_->current_output()->largest_seqno, seqno);

  // Close output file if it is big enough
  Status status;
  if (compact_->builder->FileSize() >=
      compact_->compaction->max_output_file_size()) {
    status = FinishCompactionOutputFile(input_status);
  }

  return status;
}

void CompactionJob::RecordDroppedKeys(
    int64_t* key_drop_user,
    int64_t* key_drop_newer_entry,
    int64_t* key_drop_obsolete) {
  if (*key_drop_user > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_USER, *key_drop_user);
    *key_drop_user = 0;
  }
  if (*key_drop_newer_entry > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, *key_drop_newer_entry);
    if (compaction_job_stats_) {
      compaction_job_stats_->num_records_replaced += *key_drop_newer_entry;
    }
    *key_drop_newer_entry = 0;
  }
  if (*key_drop_obsolete > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, *key_drop_obsolete);
    if (compaction_job_stats_) {
      compaction_job_stats_->num_expired_deletion_records
          += *key_drop_obsolete;
    }
    *key_drop_obsolete = 0;
  }
}

Status CompactionJob::FinishCompactionOutputFile(const Status& input_status) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
  assert(compact_ != nullptr);
  assert(compact_->outfile);
  assert(compact_->builder != nullptr);

  const uint64_t output_number = compact_->current_output()->number;
  const uint32_t output_path_id = compact_->current_output()->path_id;
  assert(output_number != 0);

  TableProperties table_properties;
  // Check for iterator errors
  Status s = input_status;
  const uint64_t current_entries = compact_->builder->NumEntries();
  compact_->current_output()->need_compaction =
      compact_->builder->NeedCompact();
  if (s.ok()) {
    s = compact_->builder->Finish();
  } else {
    compact_->builder->Abandon();
  }
  const uint64_t current_bytes = compact_->builder->FileSize();
  compact_->current_output()->file_size = current_bytes;
  compact_->total_bytes += current_bytes;

  // Finish and check for file errors
  if (s.ok() && !db_options_.disableDataSync) {
    if (db_options_.use_fsync) {
      StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
      s = compact_->outfile->Fsync();
    } else {
      StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
      s = compact_->outfile->Sync();
    }
  }
  if (s.ok()) {
    s = compact_->outfile->Close();
  }
  compact_->outfile.reset();

  if (s.ok() && current_entries > 0) {
    // Verify that the table is usable
    ColumnFamilyData* cfd = compact_->compaction->column_family_data();
    FileDescriptor fd(output_number, output_path_id, current_bytes);
    Iterator* iter = cfd->table_cache()->NewIterator(
        ReadOptions(), env_options_, cfd->internal_comparator(), fd);
    s = iter->status();

    if (s.ok() && paranoid_file_checks_) {
      for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
      s = iter->status();
    }

    delete iter;
    if (s.ok()) {
      TableFileCreationInfo info(compact_->builder->GetTableProperties());
      info.db_name = dbname_;
      info.cf_name = cfd->GetName();
      info.file_path = TableFileName(cfd->ioptions()->db_paths,
                                     fd.GetNumber(), fd.GetPathId());
      info.file_size = fd.GetFileSize();
      info.job_id = job_id_;
      Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
          "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
          " keys, %" PRIu64 " bytes%s",
          cfd->GetName().c_str(), job_id_, output_number, current_entries,
          current_bytes,
          compact_->current_output()->need_compaction ? " (need compaction)"
                                                      : "");
      EventHelpers::LogAndNotifyTableFileCreation(
          event_logger_, cfd->ioptions()->listeners, fd, info);
    }
  }
  compact_->builder.reset();
  return s;
}

Status CompactionJob::InstallCompactionResults(
    InstrumentedMutex* db_mutex, const MutableCFOptions& mutable_cf_options) {
  db_mutex->AssertHeld();

  auto* compaction = compact_->compaction;
  // paranoia: verify that the files that we started with
  // still exist in the current version and in the same original level.
  // This ensures that a concurrent compaction did not erroneously
  // pick the same files to compact.
  if (!versions_->VerifyCompactionFileConsistency(compaction)) {
    Compaction::InputLevelSummaryBuffer inputs_summary;

    Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] Compaction %s aborted",
        compaction->column_family_data()->GetName().c_str(), job_id_,
        compaction->InputLevelSummary(&inputs_summary));
    return Status::Corruption("Compaction input files inconsistent");
  }

  {
    Compaction::InputLevelSummaryBuffer inputs_summary;
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
        compaction->column_family_data()->GetName().c_str(), job_id_,
        compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes);
  }

  // Add compaction outputs
  compaction->AddInputDeletions(compact_->compaction->edit());
  for (size_t i = 0; i < compact_->outputs.size(); i++) {
    const CompactionState::Output& out = compact_->outputs[i];
    compaction->edit()->AddFile(compaction->output_level(), out.number,
                                out.path_id, out.file_size, out.smallest,
                                out.largest, out.smallest_seqno,
                                out.largest_seqno, out.need_compaction);
  }
  return versions_->LogAndApply(compaction->column_family_data(),
                                mutable_cf_options, compaction->edit(),
                                db_mutex, db_directory_);
}

// Given a sequence number, return the sequence number of the
// earliest snapshot that this sequence number is visible in.
// The snapshots themselves are arranged in ascending order of
// sequence numbers.
// Employ a sequential search because the total number of
// snapshots is typically small.
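// Example: with snapshots {5, 10, 20}, in == 7 returns 10 and sets
// *prev_snapshot to 5, while in == 3 returns 5 and sets *prev_snapshot to 0.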
inline SequenceNumber CompactionJob::findEarliestVisibleSnapshot(
    SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
    SequenceNumber* prev_snapshot) {
  assert(snapshots.size());
  SequenceNumber prev __attribute__((unused)) = 0;
  for (const auto cur : snapshots) {
    assert(prev <= cur);
    if (cur >= in) {
      *prev_snapshot = prev;
      return cur;
    }
    prev = cur;  // assignment
    assert(prev);
  }
  Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
      "CompactionJob is not able to find snapshot"
      " with SeqId later than %" PRIu64
      ": current MaxSeqId is %" PRIu64 "",
      in, snapshots[snapshots.size() - 1]);
  assert(0);
  return 0;
}

void CompactionJob::RecordCompactionIOStats() {
  RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read));
  IOSTATS_RESET(bytes_read);
  RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written));
  IOSTATS_RESET(bytes_written);
}

Status CompactionJob::OpenCompactionOutputFile() {
  assert(compact_ != nullptr);
  assert(compact_->builder == nullptr);
  // no need to lock because VersionSet::next_file_number_ is atomic
  uint64_t file_number = versions_->NewFileNumber();
  // Make the output file
  std::string fname = TableFileName(db_options_.db_paths, file_number,
                                    compact_->compaction->output_path_id());
  Status s = env_->NewWritableFile(fname, &compact_->outfile, env_options_);

  if (!s.ok()) {
    Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
        " fails at NewWritableFile with status %s",
        compact_->compaction->column_family_data()->GetName().c_str(), job_id_,
        file_number, s.ToString().c_str());
    LogFlush(db_options_.info_log);
    return s;
  }
  CompactionState::Output out;
  out.number = file_number;
  out.path_id = compact_->compaction->output_path_id();
  out.smallest.Clear();
  out.largest.Clear();
  out.smallest_seqno = out.largest_seqno = 0;

  compact_->outputs.push_back(out);
  compact_->outfile->SetIOPriority(Env::IO_LOW);
  compact_->outfile->SetPreallocationBlockSize(
      static_cast<size_t>(compact_->compaction->OutputFilePreallocationSize()));

  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  bool skip_filters = false;

  // If the column family is set to optimize filters only for hits, we can
  // skip creating filters for the bottommost level: lookups that reach this
  // level are expected to find their data, so a filter would rarely help.
  if (cfd->ioptions()->optimize_filters_for_hits && bottommost_level_) {
    skip_filters = true;
  }

  compact_->builder.reset(NewTableBuilder(
      *cfd->ioptions(), cfd->internal_comparator(),
      cfd->int_tbl_prop_collector_factories(), compact_->outfile.get(),
      compact_->compaction->output_compression(),
      cfd->ioptions()->compression_opts, skip_filters));
  LogFlush(db_options_.info_log);
  return s;
}

void CompactionJob::CleanupCompaction(const Status& status) {
  if (compact_->builder != nullptr) {
    // May happen if we get a shutdown call in the middle of compaction
    compact_->builder->Abandon();
    compact_->builder.reset();
  } else {
    assert(!status.ok() || compact_->outfile == nullptr);
  }
  for (size_t i = 0; i < compact_->outputs.size(); i++) {
    const CompactionState::Output& out = compact_->outputs[i];

    // If this file was inserted into the table cache then remove
    // it here because this compaction was not committed.
    if (!status.ok()) {
      TableCache::Evict(table_cache_.get(), out.number);
    }
  }
  delete compact_;
  compact_ = nullptr;
}

#ifndef ROCKSDB_LITE
namespace {
void CopyPrefix(
    const Slice& src, size_t prefix_length, std::string* dst) {
  assert(prefix_length > 0);
  size_t length = src.size() > prefix_length ? prefix_length : src.size();
  dst->assign(src.data(), length);
}
}  // namespace

#endif  // !ROCKSDB_LITE

void CompactionJob::UpdateCompactionStats() {
  size_t num_output_files = compact_->outputs.size();
  if (compact_->builder != nullptr) {
    // An error occurred so ignore the last output.
    assert(num_output_files > 0);
    --num_output_files;
  }
  compaction_stats_.num_output_files = static_cast<int>(num_output_files);

  Compaction* compaction = compact_->compaction;
  compaction_stats_.num_input_files_in_non_output_levels = 0;
  compaction_stats_.num_input_files_in_output_level = 0;
  for (int input_level = 0;
       input_level < static_cast<int>(compaction->num_input_levels());
       ++input_level) {
    if (compaction->start_level() + input_level
        != compaction->output_level()) {
      UpdateCompactionInputStatsHelper(
          &compaction_stats_.num_input_files_in_non_output_levels,
          &compaction_stats_.bytes_read_non_output_levels,
          input_level);
    } else {
      UpdateCompactionInputStatsHelper(
          &compaction_stats_.num_input_files_in_output_level,
          &compaction_stats_.bytes_read_output_level,
          input_level);
    }
  }

  for (size_t i = 0; i < num_output_files; i++) {
    compaction_stats_.bytes_written += compact_->outputs[i].file_size;
  }
  if (compact_->num_input_records > compact_->num_output_records) {
    compaction_stats_.num_dropped_records +=
        compact_->num_input_records - compact_->num_output_records;
  }
}

void CompactionJob::UpdateCompactionInputStatsHelper(
    int* num_files, uint64_t* bytes_read, int input_level) {
  const Compaction* compaction = compact_->compaction;
  auto num_input_files = compaction->num_input_files(input_level);
  *num_files += static_cast<int>(num_input_files);

  for (size_t i = 0; i < num_input_files; ++i) {
    const auto* file_meta = compaction->input(input_level, i);
    *bytes_read += file_meta->fd.GetFileSize();
    compaction_stats_.num_input_records +=
        static_cast<uint64_t>(file_meta->num_entries);
  }
}

void CompactionJob::UpdateCompactionJobStats(
    const InternalStats::CompactionStats& stats) const {
#ifndef ROCKSDB_LITE
  if (compaction_job_stats_) {
    compaction_job_stats_->elapsed_micros = stats.micros;

    // input information
    compaction_job_stats_->total_input_bytes =
        stats.bytes_read_non_output_levels +
        stats.bytes_read_output_level;
    compaction_job_stats_->num_input_records =
        compact_->num_input_records;
    compaction_job_stats_->num_input_files =
        stats.num_input_files_in_non_output_levels +
        stats.num_input_files_in_output_level;
    compaction_job_stats_->num_input_files_at_output_level =
        stats.num_input_files_in_output_level;

    // output information
    compaction_job_stats_->total_output_bytes = stats.bytes_written;
    compaction_job_stats_->num_output_records =
        compact_->num_output_records;
    compaction_job_stats_->num_output_files = stats.num_output_files;

    if (compact_->outputs.size() > 0U) {
      CopyPrefix(
          compact_->outputs[0].smallest.user_key(),
          CompactionJobStats::kMaxPrefixLength,
          &compaction_job_stats_->smallest_output_key_prefix);
      CopyPrefix(
          compact_->current_output()->largest.user_key(),
          CompactionJobStats::kMaxPrefixLength,
          &compaction_job_stats_->largest_output_key_prefix);
    }
  }
#endif  // !ROCKSDB_LITE
}

void CompactionJob::LogCompaction(
    ColumnFamilyData* cfd, Compaction* compaction) {
  // Let's check if anything will get logged. Don't prepare all the info if
  // we're not logging
  if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
    Compaction::InputLevelSummaryBuffer inputs_summary;
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] Compacting %s, score %.2f", cfd->GetName().c_str(),
        job_id_, compaction->InputLevelSummary(&inputs_summary),
        compaction->score());
    char scratch[2345];
    compaction->Summary(scratch, sizeof(scratch));
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "[%s] Compaction start summary: %s\n", cfd->GetName().c_str(), scratch);
    // build event logger report
    auto stream = event_logger_->Log();
    stream << "job" << job_id_ << "event"
           << "compaction_started";
    for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
      stream << ("files_L" + ToString(compaction->level(i)));
      stream.StartArray();
      for (auto f : *compaction->inputs(i)) {
        stream << f->fd.GetNumber();
      }
      stream.EndArray();
    }
    stream << "score" << compaction->score() << "input_data_size"
           << compaction->CalculateTotalInputSize();
  }
}

}  // namespace rocksdb