compaction_job.h 4.6 KB
Newer Older
I
Igor Canadi 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once

#include <atomic>
#include <deque>
#include <limits>
#include <set>
#include <utility>
#include <vector>
#include <string>
#include <functional>

#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/column_family.h"
#include "db/version_edit.h"
#include "db/memtable_list.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/transaction_log.h"
#include "util/autovector.h"
32
#include "util/event_logger.h"
I
Igor Canadi 已提交
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
#include "util/stop_watch.h"
#include "util/thread_local.h"
#include "util/scoped_arena_iterator.h"
#include "db/internal_stats.h"
#include "db/write_controller.h"
#include "db/flush_scheduler.h"
#include "db/write_thread.h"
#include "db/job_context.h"

namespace rocksdb {

class MemTable;
class TableCache;
class Version;
class VersionEdit;
class VersionSet;
class Arena;

class CompactionJob {
 public:
53
  CompactionJob(int job_id, Compaction* compaction, const DBOptions& db_options,
I
Igor Canadi 已提交
54
                const EnvOptions& env_options, VersionSet* versions,
55
                std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
56
                Directory* db_directory, Directory* output_directory,
I
Igor Canadi 已提交
57 58 59
                Statistics* stats,
                std::vector<SequenceNumber> existing_snapshots,
                std::shared_ptr<Cache> table_cache,
60
                std::function<uint64_t()> yield_callback,
61 62
                EventLogger* event_logger, bool paranoid_file_checks,
                const std::string& dbname);
I
Igor Canadi 已提交
63

64
  ~CompactionJob();
I
Igor Canadi 已提交
65 66 67 68 69 70 71 72 73 74 75 76

  // no copy/move
  CompactionJob(CompactionJob&& job) = delete;
  CompactionJob(const CompactionJob& job) = delete;
  CompactionJob& operator=(const CompactionJob& job) = delete;

  // REQUIRED: mutex held
  void Prepare();
  // REQUIRED mutex not held
  Status Run();
  // REQUIRED: mutex held
  // status is the return of Run()
I
Igor Canadi 已提交
77 78
  void Install(Status* status, const MutableCFOptions& mutable_cf_options,
               InstrumentedMutex* db_mutex);
I
Igor Canadi 已提交
79 80

 private:
81 82
  // update the thread status for starting a compaction.
  void ReportStartedCompaction(Compaction* compaction);
I
Igor Canadi 已提交
83 84 85 86
  void AllocateCompactionOutputFileNumbers();
  // Call compaction filter if is_compaction_v2 is not true. Then iterate
  // through input and compact the kv-pairs
  Status ProcessKeyValueCompaction(int64_t* imm_micros, Iterator* input,
S
sdong 已提交
87
                                   bool is_compaction_v2);
I
Igor Canadi 已提交
88
  // Call compaction_filter_v2->Filter() on kv-pairs in compact
89 90
  void CallCompactionFilterV2(CompactionFilterV2* compaction_filter_v2,
                              uint64_t* time);
I
Igor Canadi 已提交
91
  Status FinishCompactionOutputFile(Iterator* input);
I
Igor Canadi 已提交
92 93
  Status InstallCompactionResults(InstrumentedMutex* db_mutex,
                                  const MutableCFOptions& mutable_cf_options);
I
Igor Canadi 已提交
94 95 96 97 98
  SequenceNumber findEarliestVisibleSnapshot(
      SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
      SequenceNumber* prev_snapshot);
  void RecordCompactionIOStats();
  Status OpenCompactionOutputFile();
99
  void CleanupCompaction(const Status& status);
I
Igor Canadi 已提交
100

101 102
  int job_id_;

I
Igor Canadi 已提交
103 104 105 106 107 108 109 110 111 112 113 114
  // CompactionJob state
  struct CompactionState;
  CompactionState* compact_;

  bool bottommost_level_;
  SequenceNumber earliest_snapshot_;
  SequenceNumber visible_at_tip_;
  SequenceNumber latest_snapshot_;

  InternalStats::CompactionStats compaction_stats_;

  // DBImpl state
115
  const std::string& dbname_;
I
Igor Canadi 已提交
116 117 118 119 120 121 122
  const DBOptions& db_options_;
  const EnvOptions& env_options_;
  Env* env_;
  VersionSet* versions_;
  std::atomic<bool>* shutting_down_;
  LogBuffer* log_buffer_;
  Directory* db_directory_;
123
  Directory* output_directory_;
I
Igor Canadi 已提交
124
  Statistics* stats_;
I
Igor Canadi 已提交
125 126 127 128 129
  // If there were two snapshots with seq numbers s1 and
  // s2 and s1 < s2, and if we find two instances of a key k1 then lies
  // entirely within s1 and s2, then the earlier version of k1 can be safely
  // deleted because that version is not visible in any snapshot.
  std::vector<SequenceNumber> existing_snapshots_;
I
Igor Canadi 已提交
130 131 132 133
  std::shared_ptr<Cache> table_cache_;

  // yield callback
  std::function<uint64_t()> yield_callback_;
134 135

  EventLogger* event_logger_;
I
Igor Canadi 已提交
136 137

  bool paranoid_file_checks_;
I
Igor Canadi 已提交
138 139 140
};

}  // namespace rocksdb