//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/flush_job.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>

#include <algorithm>
#include <atomic>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/event_helpers.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/version_set.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/event_logger.h"
#include "util/file_util.h"
#include "util/filename.h"
#include "util/log_buffer.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "util/sync_point.h"

namespace rocksdb {

// Constructs a flush job for column family `cfd`. No memtables are chosen
// here: PickMemTable() must be called (with db_mutex held) before Run() or
// Cancel(). `existing_snapshots` is taken by value and moved into the job.
// Construction also publishes this thread's status as a running flush (see
// ReportStartedFlush()); the destructor resets that status.
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
                   const ImmutableDBOptions& db_options,
                   const MutableCFOptions& mutable_cf_options,
                   const EnvOptions env_options, VersionSet* versions,
                   InstrumentedMutex* db_mutex,
                   std::atomic<bool>* shutting_down,
                   std::vector<SequenceNumber> existing_snapshots,
                   SequenceNumber earliest_write_conflict_snapshot,
                   SnapshotChecker* snapshot_checker, JobContext* job_context,
                   LogBuffer* log_buffer, Directory* db_directory,
                   Directory* output_file_directory,
                   CompressionType output_compression, Statistics* stats,
                   EventLogger* event_logger, bool measure_io_stats)
    : dbname_(dbname),
      cfd_(cfd),
      db_options_(db_options),
      mutable_cf_options_(mutable_cf_options),
      env_options_(env_options),
      versions_(versions),
      db_mutex_(db_mutex),
      shutting_down_(shutting_down),
      existing_snapshots_(std::move(existing_snapshots)),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      job_context_(job_context),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_file_directory_(output_file_directory),
      output_compression_(output_compression),
      stats_(stats),
      event_logger_(event_logger),
      measure_io_stats_(measure_io_stats),
      edit_(nullptr),
      base_(nullptr),
      pick_memtable_called(false) {
  // Update the thread status to indicate flush.
  ReportStartedFlush();
  TEST_SYNC_POINT("FlushJob::FlushJob()");
}

// Clears the thread status that the constructor set via ReportStartedFlush().
FlushJob::~FlushJob() { ThreadStatusUtil::ResetThreadStatus(); }
void FlushJob::ReportStartedFlush() {
102
  ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
Y
Yi Wu 已提交
103
                                    db_options_.enable_thread_tracking);
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_JOB_ID,
      job_context_->job_id);
  IOSTATS_RESET(bytes_written);
}

void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
  uint64_t input_size = 0;
  for (auto* mem : mems) {
    input_size += mem->ApproximateMemoryUsage();
  }
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_MEMTABLES,
      input_size);
}

void FlushJob::RecordFlushIOStats() {
122 123
  RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
124
      ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
125
  IOSTATS_RESET(bytes_written);
126 127
}

128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
void FlushJob::PickMemTable() {
  db_mutex_->AssertHeld();
  assert(!pick_memtable_called);
  pick_memtable_called = true;
  // Save the contents of the earliest memtable as a new Table
  cfd_->imm()->PickMemtablesToFlush(&mems_);
  if (mems_.empty()) {
    return;
  }

  ReportFlushInputSize(mems_);

  // entries mems are (implicitly) sorted in ascending order by their created
  // time. We will use the first memtable's `edit` to keep the meta info for
  // this flush.
  MemTable* m = mems_[0];
  edit_ = m->GetEdits();
  edit_->SetPrevLogNumber(0);
  // SetLogNumber(log_num) indicates logs with number smaller than log_num
  // will no longer be picked up for recovery.
  edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
  edit_->SetColumnFamily(cfd_->GetID());

  // path 0 for level 0 file.
  meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);

  base_ = cfd_->current();
  base_->Ref();  // it is likely that we do not need this reference
}

158
Status FlushJob::Run(FileMetaData* file_meta) {
159 160
  db_mutex_->AssertHeld();
  assert(pick_memtable_called);
161 162
  AutoThreadOperationStageUpdater stage_run(
      ThreadStatus::STAGE_FLUSH_RUN);
163
  if (mems_.empty()) {
164 165
    ROCKS_LOG_BUFFER(log_buffer_, "[%s] Nothing in memtable to flush",
                     cfd_->GetName().c_str());
166 167 168
    return Status::OK();
  }

169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
  // I/O measurement variables
  PerfLevel prev_perf_level = PerfLevel::kEnableTime;
  uint64_t prev_write_nanos = 0;
  uint64_t prev_fsync_nanos = 0;
  uint64_t prev_range_sync_nanos = 0;
  uint64_t prev_prepare_write_nanos = 0;
  if (measure_io_stats_) {
    prev_perf_level = GetPerfLevel();
    SetPerfLevel(PerfLevel::kEnableTime);
    prev_write_nanos = IOSTATS(write_nanos);
    prev_fsync_nanos = IOSTATS(fsync_nanos);
    prev_range_sync_nanos = IOSTATS(range_sync_nanos);
    prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
  }

I
Igor Canadi 已提交
184
  // This will release and re-acquire the mutex.
185
  Status s = WriteLevel0Table();
I
Igor Canadi 已提交
186 187 188 189 190 191 192 193

  if (s.ok() &&
      (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
    s = Status::ShutdownInProgress(
        "Database shutdown or Column family drop during flush");
  }

  if (!s.ok()) {
194
    cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
I
Igor Canadi 已提交
195
  } else {
196
    TEST_SYNC_POINT("FlushJob::InstallResults");
I
Igor Canadi 已提交
197 198
    // Replace immutable memtable with the generated Table
    s = cfd_->imm()->InstallMemtableFlushResults(
199 200
        cfd_, mutable_cf_options_, mems_, versions_, db_mutex_,
        meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
201
        log_buffer_);
I
Igor Canadi 已提交
202 203
  }

204
  if (s.ok() && file_meta != nullptr) {
205
    *file_meta = meta_;
206
  }
207
  RecordFlushIOStats();
208

209 210 211 212 213 214 215 216 217 218
  auto stream = event_logger_->LogToBuffer(log_buffer_);
  stream << "job" << job_context_->job_id << "event"
         << "flush_finished";
  stream << "lsm_state";
  stream.StartArray();
  auto vstorage = cfd_->current()->storage_info();
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    stream << vstorage->NumLevelFiles(level);
  }
  stream.EndArray();
219
  stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();
220

221 222 223 224 225 226 227 228 229 230 231 232
  if (measure_io_stats_) {
    if (prev_perf_level != PerfLevel::kEnableTime) {
      SetPerfLevel(prev_perf_level);
    }
    stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos);
    stream << "file_range_sync_nanos"
           << (IOSTATS(range_sync_nanos) - prev_range_sync_nanos);
    stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
    stream << "file_prepare_write_nanos"
           << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
  }

I
Igor Canadi 已提交
233 234 235
  return s;
}

236 237 238 239 240 241
// Releases the Version reference that PickMemTable() stored in base_.
// Requires db_mutex_ held, and base_ must already be set (i.e. PickMemTable()
// picked at least one memtable).
void FlushJob::Cancel() {
  db_mutex_->AssertHeld();
  assert(base_);
  base_->Unref();
}

242
Status FlushJob::WriteLevel0Table() {
243 244
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_FLUSH_WRITE_L0);
I
Igor Canadi 已提交
245 246 247 248 249 250 251 252
  db_mutex_->AssertHeld();
  const uint64_t start_micros = db_options_.env->NowMicros();
  Status s;
  {
    db_mutex_->Unlock();
    if (log_buffer_) {
      log_buffer_->FlushBufferToLog();
    }
A
Andrew Kryczka 已提交
253 254 255
    // memtables and range_del_iters store internal iterators over each data
    // memtable and its associated range deletion memtable, respectively, at
    // corresponding indexes.
S
sdong 已提交
256
    std::vector<InternalIterator*> memtables;
A
Andrew Kryczka 已提交
257
    std::vector<InternalIterator*> range_del_iters;
I
Igor Canadi 已提交
258 259 260
    ReadOptions ro;
    ro.total_order_seek = true;
    Arena arena;
261 262
    uint64_t total_num_entries = 0, total_num_deletes = 0;
    size_t total_memory_usage = 0;
263
    for (MemTable* m : mems_) {
264 265
      ROCKS_LOG_INFO(
          db_options_.info_log,
266 267
          "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
          cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
I
Igor Canadi 已提交
268
      memtables.push_back(m->NewIterator(ro, &arena));
A
Andrew Kryczka 已提交
269 270 271 272
      auto* range_del_iter = m->NewRangeTombstoneIterator(ro);
      if (range_del_iter != nullptr) {
        range_del_iters.push_back(range_del_iter);
      }
273 274 275
      total_num_entries += m->num_entries();
      total_num_deletes += m->num_deletes();
      total_memory_usage += m->ApproximateMemoryUsage();
I
Igor Canadi 已提交
276
    }
277 278 279

    event_logger_->Log() << "job" << job_context_->job_id << "event"
                         << "flush_started"
280
                         << "num_memtables" << mems_.size() << "num_entries"
281 282 283
                         << total_num_entries << "num_deletes"
                         << total_num_deletes << "memory_usage"
                         << total_memory_usage;
284

I
Igor Canadi 已提交
285
    {
286 287 288
      ScopedArenaIterator iter(
          NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
                             static_cast<int>(memtables.size()), &arena));
289
      std::unique_ptr<InternalIterator> range_del_iter(NewMergingIterator(
290 291
          &cfd_->internal_comparator(),
          range_del_iters.empty() ? nullptr : &range_del_iters[0],
292
          static_cast<int>(range_del_iters.size())));
293 294 295 296
      ROCKS_LOG_INFO(db_options_.info_log,
                     "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
                     cfd_->GetName().c_str(), job_context_->job_id,
                     meta_.fd.GetNumber());
I
Igor Canadi 已提交
297

298 299
      TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
                               &output_compression_);
300 301
      int64_t _current_time = 0;
      db_options_.env->GetCurrentTime(&_current_time);  // ignore error
S
Sagar Vemuri 已提交
302 303
      const uint64_t current_time = static_cast<uint64_t>(_current_time);

Y
Yi Wu 已提交
304 305 306
      uint64_t oldest_key_time =
          mems_.front()->ApproximateOldestKeyTime();

307
      s = BuildTable(
A
Aaron Gao 已提交
308
          dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
309
          env_options_, cfd_->table_cache(), iter.get(),
A
Andrew Kryczka 已提交
310 311 312
          std::move(range_del_iter), &meta_, cfd_->internal_comparator(),
          cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
          cfd_->GetName(), existing_snapshots_,
Y
Yi Wu 已提交
313 314
          earliest_write_conflict_snapshot_, snapshot_checker_,
          output_compression_, cfd_->ioptions()->compression_opts,
315
          mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
316
          TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
Y
Yi Wu 已提交
317 318
          Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
          oldest_key_time);
I
Igor Canadi 已提交
319 320
      LogFlush(db_options_.info_log);
    }
321 322 323 324 325 326 327 328
    ROCKS_LOG_INFO(db_options_.info_log,
                   "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
                   " bytes %s"
                   "%s",
                   cfd_->GetName().c_str(), job_context_->job_id,
                   meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
                   s.ToString().c_str(),
                   meta_.marked_for_compaction ? " (needs compaction)" : "");
329

S
Sagar Vemuri 已提交
330
    if (output_file_directory_ != nullptr) {
331
      output_file_directory_->Fsync();
I
Igor Canadi 已提交
332
    }
333
    TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
I
Igor Canadi 已提交
334 335
    db_mutex_->Lock();
  }
336
  base_->Unref();
I
Igor Canadi 已提交
337 338 339

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
340
  if (s.ok() && meta_.fd.GetFileSize() > 0) {
I
Igor Canadi 已提交
341 342 343 344
    // if we have more than 1 background thread, then we cannot
    // insert files directly into higher levels because some other
    // threads could be concurrently producing compacted files for
    // that key range.
345
    // Add file to L0
346 347 348 349
    edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
                   meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
                   meta_.smallest_seqno, meta_.largest_seqno,
                   meta_.marked_for_compaction);
I
Igor Canadi 已提交
350 351
  }

352
  // Note that here we treat flush as level 0 compaction in internal stats
I
Igor Canadi 已提交
353 354
  InternalStats::CompactionStats stats(1);
  stats.micros = db_options_.env->NowMicros() - start_micros;
355
  stats.bytes_written = meta_.fd.GetFileSize();
356
  cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
I
Igor Canadi 已提交
357
  cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
358
                                     meta_.fd.GetFileSize());
359
  RecordFlushIOStats();
I
Igor Canadi 已提交
360 361 362 363
  return s;
}

}  // namespace rocksdb