flush_job.cc 8.0 KB
Newer Older
I
Igor Canadi 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/flush_job.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <algorithm>
#include <vector>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/version_set.h"
#include "port/port.h"
#include "port/likely.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merger.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
43
#include "util/file_util.h"
I
Igor Canadi 已提交
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
#include "util/logging.h"
#include "util/log_buffer.h"
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
#include "util/iostats_context_imp.h"
#include "util/stop_watch.h"
#include "util/sync_point.h"

namespace rocksdb {

// Builds a flush job for column family `cfd`. Every argument is stored in the
// member of the same name (with a trailing underscore); the constructor does
// no other work.
// NOTE(review): several members are presumably references or non-owning
// pointers (declared in db/flush_job.h, not visible here). If so, the caller
// must keep the referenced objects alive for as long as the job runs. Confirm.
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
                   const DBOptions& db_options,
                   const MutableCFOptions& mutable_cf_options,
                   const EnvOptions& env_options, VersionSet* versions,
                   port::Mutex* db_mutex, std::atomic<bool>* shutting_down,
                   SequenceNumber newest_snapshot, JobContext* job_context,
                   LogBuffer* log_buffer, Directory* db_directory,
                   Directory* output_file_directory,
                   CompressionType output_compression, Statistics* stats)
    : dbname_(dbname), cfd_(cfd), db_options_(db_options),
      mutable_cf_options_(mutable_cf_options), env_options_(env_options),
      versions_(versions), db_mutex_(db_mutex), shutting_down_(shutting_down),
      newest_snapshot_(newest_snapshot), job_context_(job_context),
      log_buffer_(log_buffer), db_directory_(db_directory),
      output_file_directory_(output_file_directory),
      output_compression_(output_compression), stats_(stats) {}

79
Status FlushJob::Run(uint64_t* file_number) {
I
Igor Canadi 已提交
80
  // Save the contents of the earliest memtable as a new Table
81
  uint64_t fn;
I
Igor Canadi 已提交
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
  autovector<MemTable*> mems;
  cfd_->imm()->PickMemtablesToFlush(&mems);
  if (mems.empty()) {
    LogToBuffer(log_buffer_, "[%s] Nothing in memtable to flush",
                cfd_->GetName().c_str());
    return Status::OK();
  }

  // entries mems are (implicitly) sorted in ascending order by their created
  // time. We will use the first memtable's `edit` to keep the meta info for
  // this flush.
  MemTable* m = mems[0];
  VersionEdit* edit = m->GetEdits();
  edit->SetPrevLogNumber(0);
  // SetLogNumber(log_num) indicates logs with number smaller than log_num
  // will no longer be picked up for recovery.
  edit->SetLogNumber(mems.back()->GetNextLogNumber());
  edit->SetColumnFamily(cfd_->GetID());

  // This will release and re-acquire the mutex.
102
  Status s = WriteLevel0Table(mems, edit, &fn);
I
Igor Canadi 已提交
103 104 105 106 107 108 109 110

  if (s.ok() &&
      (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
    s = Status::ShutdownInProgress(
        "Database shutdown or Column family drop during flush");
  }

  if (!s.ok()) {
111
    cfd_->imm()->RollbackMemtableFlush(mems, fn);
I
Igor Canadi 已提交
112 113 114
  } else {
    // Replace immutable memtable with the generated Table
    s = cfd_->imm()->InstallMemtableFlushResults(
115
        cfd_, mutable_cf_options_, mems, versions_, db_mutex_, fn,
I
Igor Canadi 已提交
116
        &job_context_->memtables_to_free, db_directory_, log_buffer_);
I
Igor Canadi 已提交
117 118
  }

119 120 121
  if (s.ok() && file_number != nullptr) {
    *file_number = fn;
  }
I
Igor Canadi 已提交
122 123 124 125 126 127 128 129
  return s;
}

// Writes the merged contents of `mems` into one new table file and records
// that file in `edit`.
//
// `mems` must be non-empty: its first memtable supplies the earliest sequence
// number, and its iterators are merged through &memtables[0].
// `*filenumber` receives the number allocated for the new file before any
// table is built, so the caller has it even when this function fails.
//
// The file normally goes to level 0. It is placed deeper only when
// PickLevelForMemTableOutput chooses a deeper level, the options allow it
// (at most one background compaction, no dedicated flush threads, level-style
// compaction), and that level maps to path id 0.
//
// db_mutex_ must be held on entry. It is released while the table is built
// and the output directory is synced, and re-acquired before returning.
//
// Returns the BuildTable status. If the build succeeded but the output
// directory fsync failed, returns the fsync error instead. Flush statistics
// are recorded either way.
Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
                                  VersionEdit* edit, uint64_t* filenumber) {
  db_mutex_->AssertHeld();
  const uint64_t start_micros = db_options_.env->NowMicros();
  FileMetaData meta;
  // path 0 for level 0 file.
  meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
  *filenumber = meta.fd.GetNumber();

  const SequenceNumber earliest_seqno_in_memtable =
      mems[0]->GetFirstSequenceNumber();
  Version* base = cfd_->current();
  base->Ref();  // it is likely that we do not need this reference
  Status s;
  {
    // The table is built without holding the DB mutex.
    db_mutex_->Unlock();
    if (log_buffer_) {
      log_buffer_->FlushBufferToLog();
    }
    std::vector<Iterator*> memtables;
    memtables.reserve(mems.size());
    ReadOptions ro;
    ro.total_order_seek = true;
    // The memtable iterators and the merging iterator are all allocated in
    // `arena`, which outlives the ScopedArenaIterator scope below.
    Arena arena;
    for (MemTable* m : mems) {
      Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
          "[%s] Flushing memtable with next log file: %" PRIu64 "\n",
          cfd_->GetName().c_str(), m->GetNextLogNumber());
      memtables.push_back(m->NewIterator(ro, &arena));
    }
    {
      ScopedArenaIterator iter(
          NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
                             static_cast<int>(memtables.size()), &arena));
      Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
          "[%s] Level-0 flush table #%" PRIu64 ": started",
          cfd_->GetName().c_str(), meta.fd.GetNumber());

      s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_,
                     cfd_->table_cache(), iter.get(), &meta,
                     cfd_->internal_comparator(), newest_snapshot_,
                     earliest_seqno_in_memtable, output_compression_,
                     cfd_->ioptions()->compression_opts, Env::IO_HIGH);
      LogFlush(db_options_.info_log);
    }
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "[%s] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s",
        cfd_->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(),
        s.ToString().c_str());
    // Make the new file's directory entry durable. This is pointless when the
    // build already failed. A failed sync must not be ignored: reporting
    // success would let the caller install a file that may not survive a
    // crash.
    if (s.ok() && !db_options_.disableDataSync &&
        output_file_directory_ != nullptr) {
      s = output_file_directory_->Fsync();
    }
    db_mutex_->Lock();
  }
  base->Unref();

  // re-acquire the most current version
  base = cfd_->current();

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  int level = 0;
  if (s.ok() && meta.fd.GetFileSize() > 0) {
    const Slice min_user_key = meta.smallest.user_key();
    const Slice max_user_key = meta.largest.user_key();
    // if we have more than 1 background thread, then we cannot
    // insert files directly into higher levels because some other
    // threads could be concurrently producing compacted files for
    // that key range.
    if (base != nullptr && db_options_.max_background_compactions <= 1 &&
        db_options_.max_background_flushes == 0 &&
        cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
      level = base->storage_info()->PickLevelForMemTableOutput(
          mutable_cf_options_, min_user_key, max_user_key);
      // If level does not match path id, reset level back to 0
      uint32_t fdpath = LevelCompactionPicker::GetPathId(
          *cfd_->ioptions(), mutable_cf_options_, level);
      if (fdpath != 0) {
        level = 0;
      }
    }
    edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
                  meta.fd.GetFileSize(), meta.smallest, meta.largest,
                  meta.smallest_seqno, meta.largest_seqno);
  }

  // Flush statistics are recorded whether or not the flush succeeded.
  InternalStats::CompactionStats stats(1);
  stats.micros = db_options_.env->NowMicros() - start_micros;
  stats.bytes_written = meta.fd.GetFileSize();
  cfd_->internal_stats()->AddCompactionStats(level, stats);
  cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
                                     meta.fd.GetFileSize());
  RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
  return s;
}

}  // namespace rocksdb