//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/flush_job.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <algorithm>
#include <vector>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/event_helpers.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/version_set.h"
#include "port/port.h"
#include "port/likely.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merger.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/event_logger.h"
#include "util/file_util.h"
#include "util/logging.h"
#include "util/log_buffer.h"
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
#include "util/iostats_context_imp.h"
#include "util/stop_watch.h"
#include "util/sync_point.h"
#include "util/thread_status_util.h"

namespace rocksdb {

// Creates a flush job for the column family `cfd`.
//
// The constructor only records its arguments in the corresponding members.
// Pointer arguments are kept as raw pointers, and ~FlushJob() does not delete
// any of them. It then marks the calling thread as running a flush via
// ReportStartedFlush(). The actual flush work is done by Run().
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
                   const DBOptions& db_options,
                   const MutableCFOptions& mutable_cf_options,
                   const EnvOptions& env_options, VersionSet* versions,
                   InstrumentedMutex* db_mutex,
                   std::atomic<bool>* shutting_down,
                   SequenceNumber newest_snapshot, JobContext* job_context,
                   LogBuffer* log_buffer, Directory* db_directory,
                   Directory* output_file_directory,
                   CompressionType output_compression, Statistics* stats,
                   EventLogger* event_logger)
    : dbname_(dbname),
      cfd_(cfd),
      db_options_(db_options),
      mutable_cf_options_(mutable_cf_options),
      env_options_(env_options),
      versions_(versions),
      db_mutex_(db_mutex),
      shutting_down_(shutting_down),
      newest_snapshot_(newest_snapshot),
      job_context_(job_context),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_file_directory_(output_file_directory),
      output_compression_(output_compression),
      stats_(stats),
      event_logger_(event_logger) {
  // Update the thread status to indicate flush.
  ReportStartedFlush();
  TEST_SYNC_POINT("FlushJob::FlushJob()");
}

// Destroys the flush job. It first fires the "FlushJob::~FlushJob()" test
// sync point. It then clears the calling thread's status, which the
// constructor set up through ReportStartedFlush(), via
// ThreadStatusUtil::ResetThreadStatus().
FlushJob::~FlushJob() {
  TEST_SYNC_POINT("FlushJob::~FlushJob()");
  ThreadStatusUtil::ResetThreadStatus();
}
// Publishes the calling thread's status as "flushing column family cfd_",
// tagged with this job's id. It also resets the bytes_written I/O counter,
// so that a later RecordFlushIOStats() reports only bytes written after
// this point.
void FlushJob::ReportStartedFlush() {
  const auto job_id = job_context_->job_id;

  ThreadStatusUtil::SetColumnFamily(cfd_);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
  ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
                                               job_id);

  IOSTATS_RESET(bytes_written);
}

void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
  uint64_t input_size = 0;
  for (auto* mem : mems) {
    input_size += mem->ApproximateMemoryUsage();
  }
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_MEMTABLES,
      input_size);
}

// Adds the bytes written since the last reset of the bytes_written I/O
// counter to the FLUSH_BYTES_WRITTEN thread-status property. It then zeroes
// the counter so the same bytes are not reported a second time.
void FlushJob::RecordFlushIOStats() {
  const auto bytes_since_reset = IOSTATS(bytes_written);
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_WRITTEN, bytes_since_reset);
  IOSTATS_RESET(bytes_written);
}

// Flushes the immutable memtables chosen by PickMemtablesToFlush() into one
// new table file and installs the result.
//
// Must be called with db_mutex_ held. WriteLevel0Table() asserts this, and it
// temporarily releases the mutex while the file is being written.
//
// file_number: optional out-parameter (may be nullptr). It receives the new
//              file's number only when the flush fully succeeds.
//
// Returns:
//   - OK when there was nothing to flush, or when the flush was installed.
//   - ShutdownInProgress when the DB started shutting down, or the column
//     family was dropped, while the file was being written.
//   - Otherwise, the error from writing or installing the file.
//
// If writing fails, or shutdown/drop is detected, the picked memtables are
// handed back via RollbackMemtableFlush().
Status FlushJob::Run(uint64_t* file_number) {
  AutoThreadOperationStageUpdater stage_run(
      ThreadStatus::STAGE_FLUSH_RUN);
  // Save the contents of the earliest memtable as a new Table.
  // WriteLevel0Table() always assigns `fn`; it is zero-initialized defensively.
  uint64_t fn = 0;
  autovector<MemTable*> mems;
  cfd_->imm()->PickMemtablesToFlush(&mems);
  if (mems.empty()) {
    LogToBuffer(log_buffer_, "[%s] Nothing in memtable to flush",
                cfd_->GetName().c_str());
    return Status::OK();
  }

  ReportFlushInputSize(mems);

  // The memtables in mems are (implicitly) sorted in ascending order of
  // creation time. The first memtable's `edit` carries the meta info for
  // this flush.
  MemTable* m = mems[0];
  VersionEdit* edit = m->GetEdits();
  edit->SetPrevLogNumber(0);
  // SetLogNumber(log_num) indicates logs with number smaller than log_num
  // will no longer be picked up for recovery.
  edit->SetLogNumber(mems.back()->GetNextLogNumber());
  edit->SetColumnFamily(cfd_->GetID());

  // This will release and re-acquire the mutex.
  Status s = WriteLevel0Table(mems, edit, &fn);

  if (s.ok() &&
      (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
    s = Status::ShutdownInProgress(
        "Database shutdown or Column family drop during flush");
  }

  if (!s.ok()) {
    cfd_->imm()->RollbackMemtableFlush(mems, fn);
  } else {
    // Replace immutable memtable with the generated Table
    s = cfd_->imm()->InstallMemtableFlushResults(
        cfd_, mutable_cf_options_, mems, versions_, db_mutex_, fn,
        &job_context_->memtables_to_free, db_directory_, log_buffer_);
  }

  if (s.ok() && file_number != nullptr) {
    *file_number = fn;
  }
  RecordFlushIOStats();

  // Emit a "flush_finished" event listing the file count of every level of
  // the current version.
  auto stream = event_logger_->LogToBuffer(log_buffer_);
  stream << "job" << job_context_->job_id << "event"
         << "flush_finished";
  stream << "lsm_state";
  stream.StartArray();
  auto vstorage = cfd_->current()->storage_info();
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    stream << vstorage->NumLevelFiles(level);
  }
  stream.EndArray();

  return s;
}

// Writes the merged contents of `mems` into one new table file and, on
// success, records that file in `edit`.
//
// Locking: must be called with db_mutex_ held (asserted below). The mutex is
// released while the table is built and the output directory is synced. It
// is re-acquired before the version/edit bookkeeping at the end.
//
// mems:       memtables to flush. Must be non-empty, because mems[0] and
//             memtables[0] are dereferenced.
// edit:       receives AddFile() for the new file, but only when the status
//             is OK and the produced file is non-empty.
// filenumber: set to the newly allocated file number before any step that
//             can fail, so the caller can roll back using it on error.
//
// Returns the status of BuildTable() or, if that succeeded, of syncing the
// output directory. Compaction/CF stats and COMPACT_WRITE_BYTES are recorded
// regardless of the outcome.
Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
                                  VersionEdit* edit, uint64_t* filenumber) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_FLUSH_WRITE_L0);
  db_mutex_->AssertHeld();
  const uint64_t start_micros = db_options_.env->NowMicros();
  FileMetaData meta;
  // path 0 for level 0 file.
  meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
  *filenumber = meta.fd.GetNumber();

  const SequenceNumber earliest_seqno_in_memtable =
      mems[0]->GetFirstSequenceNumber();
  Version* base = cfd_->current();
  base->Ref();  // it is likely that we do not need this reference
  Status s;
  {
    db_mutex_->Unlock();
    if (log_buffer_) {
      log_buffer_->FlushBufferToLog();
    }
    std::vector<Iterator*> memtables;
    ReadOptions ro;
    ro.total_order_seek = true;
    Arena arena;
    uint64_t total_num_entries = 0, total_num_deletes = 0;
    size_t total_memory_usage = 0;
    for (MemTable* m : mems) {
      Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
          "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
          cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
      memtables.push_back(m->NewIterator(ro, &arena));
      total_num_entries += m->num_entries();
      total_num_deletes += m->num_deletes();
      total_memory_usage += m->ApproximateMemoryUsage();
    }

    event_logger_->Log() << "job" << job_context_->job_id << "event"
                         << "flush_started"
                         << "num_memtables" << mems.size() << "num_entries"
                         << total_num_entries << "num_deletes"
                         << total_num_deletes << "memory_usage"
                         << total_memory_usage;
    TableFileCreationInfo info;
    {
      ScopedArenaIterator iter(
          NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
                             static_cast<int>(memtables.size()), &arena));
      Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
          "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
          cfd_->GetName().c_str(), job_context_->job_id, meta.fd.GetNumber());

      TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
                               &output_compression_);
      s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_,
                     cfd_->table_cache(), iter.get(), &meta,
                     cfd_->internal_comparator(),
                     cfd_->int_tbl_prop_collector_factories(), newest_snapshot_,
                     earliest_seqno_in_memtable, output_compression_,
                     cfd_->ioptions()->compression_opts,
                     mutable_cf_options_.paranoid_file_checks, Env::IO_HIGH,
                     &info.table_properties);
      LogFlush(db_options_.info_log);
    }
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s",
        cfd_->GetName().c_str(), job_context_->job_id, meta.fd.GetNumber(),
        meta.fd.GetFileSize(), s.ToString().c_str());

    // output to event logger
    if (s.ok()) {
      info.db_name = dbname_;
      info.cf_name = cfd_->GetName();
      info.file_path = TableFileName(db_options_.db_paths,
                                     meta.fd.GetNumber(),
                                     meta.fd.GetPathId());
      info.file_size = meta.fd.GetFileSize();
      info.job_id = job_context_->job_id;
      EventHelpers::LogAndNotifyTableFileCreation(
          event_logger_, db_options_.listeners,
          meta.fd, info);
    }

    // Sync the directory holding the new file. This is only worth doing if
    // the table was built. A failure here must fail the flush; otherwise the
    // file would be added to `edit` (below) and installed even though its
    // directory entry was not confirmed durable.
    if (s.ok() && !db_options_.disableDataSync &&
        output_file_directory_ != nullptr) {
      s = output_file_directory_->Fsync();
      if (!s.ok()) {
        Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log,
            "[%s] [JOB %d] Level-0 flush table #%" PRIu64
            ": output directory fsync failed: %s",
            cfd_->GetName().c_str(), job_context_->job_id,
            meta.fd.GetNumber(), s.ToString().c_str());
      }
    }
    db_mutex_->Lock();
  }
  base->Unref();

  // re-acquire the most current version
  base = cfd_->current();

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  int level = 0;
  if (s.ok() && meta.fd.GetFileSize() > 0) {
    const Slice min_user_key = meta.smallest.user_key();
    const Slice max_user_key = meta.largest.user_key();
    // if we have more than 1 background thread, then we cannot
    // insert files directly into higher levels because some other
    // threads could be concurrently producing compacted files for
    // that key range.
    if (base != nullptr && db_options_.max_background_compactions <= 1 &&
        db_options_.max_background_flushes == 0 &&
        cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
      level = base->storage_info()->PickLevelForMemTableOutput(
          mutable_cf_options_, min_user_key, max_user_key);
      // If level does not match path id, reset level back to 0
      uint32_t fdpath = LevelCompactionPicker::GetPathId(
          *cfd_->ioptions(), mutable_cf_options_, level);
      if (fdpath != 0) {
        level = 0;
      }
    }
    edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
                  meta.fd.GetFileSize(), meta.smallest, meta.largest,
                  meta.smallest_seqno, meta.largest_seqno);
  }

  InternalStats::CompactionStats stats(1);
  stats.micros = db_options_.env->NowMicros() - start_micros;
  stats.bytes_written = meta.fd.GetFileSize();
  cfd_->internal_stats()->AddCompactionStats(level, stats);
  cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
                                     meta.fd.GetFileSize());
  RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
  return s;
}

}  // namespace rocksdb