//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/flush_job.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
17

I
Igor Canadi 已提交
18 19 20 21 22 23
#include <algorithm>
#include <vector>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
24
#include "db/event_helpers.h"
I
Igor Canadi 已提交
25 26 27 28 29 30 31 32
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/version_set.h"
#include "port/likely.h"
33
#include "port/port.h"
I
Igor Canadi 已提交
34 35 36 37 38 39 40
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
41
#include "table/merging_iterator.h"
I
Igor Canadi 已提交
42 43 44
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
I
Igor Canadi 已提交
45
#include "util/event_logger.h"
46
#include "util/file_util.h"
47
#include "util/iostats_context_imp.h"
I
Igor Canadi 已提交
48
#include "util/log_buffer.h"
49
#include "util/logging.h"
I
Igor Canadi 已提交
50 51 52 53
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
#include "util/stop_watch.h"
#include "util/sync_point.h"
54
#include "util/thread_status_util.h"
I
Igor Canadi 已提交
55 56 57 58

namespace rocksdb {

// Creates a flush job for column family `cfd`.
//
// The arguments are stored in the corresponding members
// (`existing_snapshots` is taken by value and moved into the job). No
// memtable is selected here; PickMemTable() must be called before Run().
// Construction also reports the start of a flush through
// ReportStartedFlush().
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
                   const ImmutableDBOptions& db_options,
                   const MutableCFOptions& mutable_cf_options,
                   const EnvOptions& env_options, VersionSet* versions,
                   InstrumentedMutex* db_mutex,
                   std::atomic<bool>* shutting_down,
                   std::vector<SequenceNumber> existing_snapshots,
                   SequenceNumber earliest_write_conflict_snapshot,
                   JobContext* job_context, LogBuffer* log_buffer,
                   Directory* db_directory, Directory* output_file_directory,
                   CompressionType output_compression, Statistics* stats,
                   EventLogger* event_logger, bool measure_io_stats)
    : dbname_(dbname),
      cfd_(cfd),
      db_options_(db_options),
      mutable_cf_options_(mutable_cf_options),
      env_options_(env_options),
      versions_(versions),
      db_mutex_(db_mutex),
      shutting_down_(shutting_down),
      existing_snapshots_(std::move(existing_snapshots)),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      job_context_(job_context),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_file_directory_(output_file_directory),
      output_compression_(output_compression),
      stats_(stats),
      event_logger_(event_logger),
      measure_io_stats_(measure_io_stats),
      pick_memtable_called(false) {
  // edit_ and base_ are only assigned by PickMemTable(), and only when it
  // actually picked something. Null them here so that the
  // `assert(base_ != nullptr)` in Cancel() checks a defined value instead of
  // an indeterminate pointer. Assigned in the body (rather than the
  // initializer list) so this does not depend on member declaration order.
  edit_ = nullptr;
  base_ = nullptr;

  // Update the thread status to indicate flush.
  ReportStartedFlush();
  TEST_SYNC_POINT("FlushJob::FlushJob()");
}

// Resets the thread status that was populated when the job was constructed.
FlushJob::~FlushJob() { ThreadStatusUtil::ResetThreadStatus(); }
I
Igor Canadi 已提交
97

98
void FlushJob::ReportStartedFlush() {
99
  ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
Y
Yi Wu 已提交
100
                                    db_options_.enable_thread_tracking);
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_JOB_ID,
      job_context_->job_id);
  IOSTATS_RESET(bytes_written);
}

void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
  uint64_t input_size = 0;
  for (auto* mem : mems) {
    input_size += mem->ApproximateMemoryUsage();
  }
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_MEMTABLES,
      input_size);
}

void FlushJob::RecordFlushIOStats() {
119 120
  RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
121
      ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
122
  IOSTATS_RESET(bytes_written);
123 124
}

125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
// Selects the immutable memtables this job will flush and prepares the
// version edit and output file descriptor for them.
//
// Requires db_mutex_ to be held and must be called at most once per job
// (asserted). If nothing is picked, the function returns with only mems_
// (empty) and pick_memtable_called updated.
void FlushJob::PickMemTable() {
  db_mutex_->AssertHeld();
  assert(!pick_memtable_called);
  pick_memtable_called = true;

  // Ask the immutable memtable list which memtables should be flushed.
  cfd_->imm()->PickMemtablesToFlush(&mems_);
  if (mems_.empty()) {
    return;
  }

  ReportFlushInputSize(mems_);

  // The picked memtables are (implicitly) ordered by ascending creation
  // time. The edit owned by the first one carries the metadata for the
  // whole flush.
  edit_ = mems_[0]->GetEdits();
  edit_->SetPrevLogNumber(0);

  // SetLogNumber(n) marks logs numbered below n as no longer needed for
  // recovery, so use the next-log number of the newest picked memtable.
  const uint64_t next_log_number = mems_.back()->GetNextLogNumber();
  edit_->SetLogNumber(next_log_number);
  edit_->SetColumnFamily(cfd_->GetID());

  // Level-0 output goes to path 0; the file size is not known yet.
  meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);

  // Hold a reference on the current version; it is likely that we do not
  // need this reference.
  base_ = cfd_->current();
  base_->Ref();
}

155
// Executes the flush prepared by PickMemTable().
//
// Requires db_mutex_ to be held; WriteLevel0Table() releases and re-acquires
// it internally. When no memtable was picked, logs that fact and returns OK.
// Otherwise the level-0 table is written and, on success, installed via
// InstallMemtableFlushResults(); on failure (including shutdown or a dropped
// column family) the memtable flush is rolled back.
//
// file_meta: optional output; receives a copy of the new file's metadata
//            when the final status is OK.
// Returns the final status of the flush.
Status FlushJob::Run(FileMetaData* file_meta) {
  db_mutex_->AssertHeld();
  assert(pick_memtable_called);
  AutoThreadOperationStageUpdater stage_run(ThreadStatus::STAGE_FLUSH_RUN);

  if (mems_.empty()) {
    LogToBuffer(log_buffer_, "[%s] Nothing in memtable to flush",
                cfd_->GetName().c_str());
    return Status::OK();
  }

  // Baseline values for the optional I/O measurement; the deltas against
  // them are logged once the flush has finished.
  PerfLevel saved_perf_level = PerfLevel::kEnableTime;
  uint64_t write_nanos_before = 0;
  uint64_t fsync_nanos_before = 0;
  uint64_t range_sync_nanos_before = 0;
  uint64_t prepare_write_nanos_before = 0;
  if (measure_io_stats_) {
    saved_perf_level = GetPerfLevel();
    SetPerfLevel(PerfLevel::kEnableTime);
    write_nanos_before = IOSTATS(write_nanos);
    fsync_nanos_before = IOSTATS(fsync_nanos);
    range_sync_nanos_before = IOSTATS(range_sync_nanos);
    prepare_write_nanos_before = IOSTATS(prepare_write_nanos);
  }

  // This will release and re-acquire the mutex.
  Status s = WriteLevel0Table();

  // A table written while the DB is shutting down or after the column family
  // was dropped must not be installed.
  if (s.ok()) {
    const bool db_shutting_down =
        shutting_down_->load(std::memory_order_acquire);
    if (db_shutting_down || cfd_->IsDropped()) {
      s = Status::ShutdownInProgress(
          "Database shutdown or Column family drop during flush");
    }
  }

  if (s.ok()) {
    TEST_SYNC_POINT("FlushJob::InstallResults");
    // Replace immutable memtable with the generated Table
    s = cfd_->imm()->InstallMemtableFlushResults(
        cfd_, mutable_cf_options_, mems_, versions_, db_mutex_,
        meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
        log_buffer_);
  } else {
    cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
  }

  if (s.ok() && file_meta != nullptr) {
    *file_meta = meta_;
  }
  RecordFlushIOStats();

  // Emit the "flush_finished" event: per-level file counts followed by the
  // number of immutable memtables that are still not flushed.
  auto stream = event_logger_->LogToBuffer(log_buffer_);
  stream << "job" << job_context_->job_id << "event"
         << "flush_finished";
  stream << "lsm_state";
  stream.StartArray();
  auto storage = cfd_->current()->storage_info();
  for (int lvl = 0; lvl < storage->num_levels(); ++lvl) {
    stream << storage->NumLevelFiles(lvl);
  }
  stream.EndArray();
  stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();

  if (measure_io_stats_) {
    // Restore the caller's perf level if it was changed above.
    if (saved_perf_level != PerfLevel::kEnableTime) {
      SetPerfLevel(saved_perf_level);
    }
    stream << "file_write_nanos" << (IOSTATS(write_nanos) - write_nanos_before);
    stream << "file_range_sync_nanos"
           << (IOSTATS(range_sync_nanos) - range_sync_nanos_before);
    stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - fsync_nanos_before);
    stream << "file_prepare_write_nanos"
           << (IOSTATS(prepare_write_nanos) - prepare_write_nanos_before);
  }

  return s;
}

233 234 235 236 237 238
// Releases the reference on base_ that PickMemTable() acquired.
// Requires db_mutex_ to be held and base_ to be set (both asserted).
void FlushJob::Cancel() {
  db_mutex_->AssertHeld();
  assert(base_ != nullptr);
  // Balances the base_->Ref() call made in PickMemTable().
  base_->Unref();
}

239
// Writes the picked memtables (mems_) out as one level-0 table file.
//
// Must be entered with db_mutex_ held. The mutex is released while the table
// is built and the output directory is synced, and is re-acquired before the
// version edit and the internal statistics are updated.
//
// On success with a non-empty output file, the file is added to edit_ at
// level 0. The reference on base_ taken by PickMemTable() is always released.
//
// Returns the status of BuildTable(); if the build succeeded, a failure to
// fsync the output directory is returned instead of being ignored.
Status FlushJob::WriteLevel0Table() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_FLUSH_WRITE_L0);
  db_mutex_->AssertHeld();
  const uint64_t start_micros = db_options_.env->NowMicros();
  Status s;
  {
    db_mutex_->Unlock();
    if (log_buffer_) {
      log_buffer_->FlushBufferToLog();
    }
    // memtables holds one internal iterator per picked memtable.
    // range_del_iters holds the range-tombstone iterators of those memtables
    // that returned one; memtables without one are skipped, so the two
    // vectors are not necessarily index-aligned.
    std::vector<InternalIterator*> memtables;
    std::vector<InternalIterator*> range_del_iters;
    ReadOptions ro;
    ro.total_order_seek = true;
    Arena arena;
    uint64_t total_num_entries = 0;
    uint64_t total_num_deletes = 0;
    size_t total_memory_usage = 0;
    for (MemTable* m : mems_) {
      Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
          "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
          cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
      memtables.push_back(m->NewIterator(ro, &arena));
      auto* range_del_iter = m->NewRangeTombstoneIterator(ro);
      if (range_del_iter != nullptr) {
        range_del_iters.push_back(range_del_iter);
      }
      total_num_entries += m->num_entries();
      total_num_deletes += m->num_deletes();
      total_memory_usage += m->ApproximateMemoryUsage();
    }

    event_logger_->Log() << "job" << job_context_->job_id << "event"
                         << "flush_started"
                         << "num_memtables" << mems_.size() << "num_entries"
                         << total_num_entries << "num_deletes"
                         << total_num_deletes << "memory_usage"
                         << total_memory_usage;

    {
      // Merge all memtable iterators into a single sorted input for the
      // table builder; the range-tombstone iterators are merged separately.
      ScopedArenaIterator iter(
          NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
                             static_cast<int>(memtables.size()), &arena));
      std::unique_ptr<InternalIterator> range_del_iter(NewMergingIterator(
          &cfd_->internal_comparator(),
          range_del_iters.empty() ? nullptr : &range_del_iters[0],
          static_cast<int>(range_del_iters.size())));
      Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
          "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
          cfd_->GetName().c_str(), job_context_->job_id, meta_.fd.GetNumber());

      TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
                               &output_compression_);
      s = BuildTable(
          dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
          env_options_, cfd_->table_cache(), iter.get(),
          std::move(range_del_iter), &meta_, cfd_->internal_comparator(),
          cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
          cfd_->GetName(), existing_snapshots_,
          earliest_write_conflict_snapshot_, output_compression_,
          cfd_->ioptions()->compression_opts,
          mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
          TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
          Env::IO_HIGH, &table_properties_, 0 /* level */);
      LogFlush(db_options_.info_log);
    }
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
        " bytes %s"
        "%s",
        cfd_->GetName().c_str(), job_context_->job_id, meta_.fd.GetNumber(),
        meta_.fd.GetFileSize(), s.ToString().c_str(),
        meta_.marked_for_compaction ? " (needs compaction)" : "");

    // Fsync the output directory after a successful build. The returned
    // status is propagated so that a failed directory sync fails the flush
    // (and the caller rolls it back) instead of being silently dropped.
    if (s.ok() && output_file_directory_ != nullptr) {
      s = output_file_directory_->Fsync();
    }
    TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
    db_mutex_->Lock();
  }
  // Release the reference taken on the base version in PickMemTable().
  base_->Unref();

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  if (s.ok() && meta_.fd.GetFileSize() > 0) {
    // if we have more than 1 background thread, then we cannot
    // insert files directly into higher levels because some other
    // threads could be concurrently producing compacted files for
    // that key range.
    // Add file to L0
    edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
                   meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
                   meta_.smallest_seqno, meta_.largest_seqno,
                   meta_.marked_for_compaction);
  }

  // Note that here we treat flush as level 0 compaction in internal stats
  InternalStats::CompactionStats stats(1);
  stats.micros = db_options_.env->NowMicros() - start_micros;
  stats.bytes_written = meta_.fd.GetFileSize();
  cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
  cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
                                     meta_.fd.GetFileSize());
  RecordFlushIOStats();
  return s;
}

}  // namespace rocksdb