//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/flush_job.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
17

I
Igor Canadi 已提交
18 19 20 21 22 23
#include <algorithm>
#include <vector>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
24
#include "db/event_helpers.h"
I
Igor Canadi 已提交
25 26 27 28 29
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/version_set.h"
30 31 32
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
33
#include "port/port.h"
34
#include "db/memtable.h"
I
Igor Canadi 已提交
35 36 37 38 39 40 41
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
42
#include "table/merging_iterator.h"
I
Igor Canadi 已提交
43 44 45
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
I
Igor Canadi 已提交
46
#include "util/event_logger.h"
47
#include "util/file_util.h"
48
#include "util/filename.h"
I
Igor Canadi 已提交
49
#include "util/log_buffer.h"
50
#include "util/logging.h"
I
Igor Canadi 已提交
51 52 53 54 55 56 57
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "util/sync_point.h"

namespace rocksdb {

// Builds a flush job for the column family `cfd`.
//
// The constructor only stores its arguments (the snapshot list is moved in,
// everything else is copied or kept as a pointer/reference) and then announces
// the start of a flush through ReportStartedFlush(). `edit_` and `base_` start
// out null; PickMemTable() assigns them when there is something to flush.
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
                   const ImmutableDBOptions& db_options,
                   const MutableCFOptions& mutable_cf_options,
                   const EnvOptions env_options, VersionSet* versions,
                   InstrumentedMutex* db_mutex,
                   std::atomic<bool>* shutting_down,
                   std::vector<SequenceNumber> existing_snapshots,
                   SequenceNumber earliest_write_conflict_snapshot,
                   SnapshotChecker* snapshot_checker, JobContext* job_context,
                   LogBuffer* log_buffer, Directory* db_directory,
                   Directory* output_file_directory,
                   CompressionType output_compression, Statistics* stats,
                   EventLogger* event_logger, bool measure_io_stats)
    // Identity and configuration of the flush.
    : dbname_(dbname),
      cfd_(cfd),
      db_options_(db_options),
      mutable_cf_options_(mutable_cf_options),
      env_options_(env_options),
      // Shared database state.
      versions_(versions),
      db_mutex_(db_mutex),
      shutting_down_(shutting_down),
      // Snapshot information handed to the table builder.
      existing_snapshots_(std::move(existing_snapshots)),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      // Job bookkeeping, output locations and reporting sinks.
      job_context_(job_context),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_file_directory_(output_file_directory),
      output_compression_(output_compression),
      stats_(stats),
      event_logger_(event_logger),
      measure_io_stats_(measure_io_stats),
      // Filled in by PickMemTable().
      edit_(nullptr),
      base_(nullptr),
      pick_memtable_called(false) {
  // Mark the current thread as performing a flush.
  ReportStartedFlush();
  TEST_SYNC_POINT("FlushJob::FlushJob()");
}

// Resets the thread status that the constructor set via ReportStartedFlush().
FlushJob::~FlushJob() { ThreadStatusUtil::ResetThreadStatus(); }
I
Igor Canadi 已提交
100

101
void FlushJob::ReportStartedFlush() {
102
  ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
Y
Yi Wu 已提交
103
                                    db_options_.enable_thread_tracking);
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_JOB_ID,
      job_context_->job_id);
  IOSTATS_RESET(bytes_written);
}

void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
  uint64_t input_size = 0;
  for (auto* mem : mems) {
    input_size += mem->ApproximateMemoryUsage();
  }
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_MEMTABLES,
      input_size);
}

void FlushJob::RecordFlushIOStats() {
122 123
  RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
124
      ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
125
  IOSTATS_RESET(bytes_written);
126 127
}

128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
// Selects the immutable memtables this job will flush and prepares the
// version edit and output file number for them.
//
// Must be called exactly once, with the DB mutex held. When nothing is
// picked, mems_ stays empty and edit_ / base_ remain null.
void FlushJob::PickMemTable() {
  db_mutex_->AssertHeld();
  assert(!pick_memtable_called);
  pick_memtable_called = true;

  // Collect the memtables whose contents will become the new table.
  cfd_->imm()->PickMemtablesToFlush(&mems_);
  if (!mems_.empty()) {
    ReportFlushInputSize(mems_);

    // The picked memtables are (implicitly) ordered by ascending creation
    // time; the first one's edit carries the meta info for the whole flush.
    edit_ = mems_.front()->GetEdits();
    edit_->SetPrevLogNumber(0);
    // Logs numbered below the value given to SetLogNumber() will no longer be
    // picked up for recovery.
    edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
    edit_->SetColumnFamily(cfd_->GetID());

    // Level-0 output goes to path 0.
    meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);

    // This reference is likely unnecessary; it is dropped again in
    // WriteLevel0Table() or Cancel().
    base_ = cfd_->current();
    base_->Ref();
  }
}

158
// Executes the flush prepared by PickMemTable().
//
// Writes the picked memtables to a level-0 table, then either installs the
// result or rolls the memtables back on failure. On success, and only when
// `file_meta` is non-null, the metadata of the new file is copied into
// `*file_meta`. A "flush_finished" event is logged to the log buffer in
// either case.
//
// Must be called with the DB mutex held; WriteLevel0Table() releases and
// re-acquires it. Returns OK without doing any work when no memtables were
// picked.
Status FlushJob::Run(FileMetaData* file_meta) {
  db_mutex_->AssertHeld();
  assert(pick_memtable_called);
  AutoThreadOperationStageUpdater stage_run(ThreadStatus::STAGE_FLUSH_RUN);

  if (mems_.empty()) {
    ROCKS_LOG_BUFFER(log_buffer_, "[%s] Nothing in memtable to flush",
                     cfd_->GetName().c_str());
    return Status::OK();
  }

  // Baselines for the optional I/O timing report at the end of the job.
  PerfLevel saved_perf_level = PerfLevel::kEnableTime;
  uint64_t write_nanos_before = 0;
  uint64_t fsync_nanos_before = 0;
  uint64_t range_sync_nanos_before = 0;
  uint64_t prepare_write_nanos_before = 0;
  if (measure_io_stats_) {
    saved_perf_level = GetPerfLevel();
    SetPerfLevel(PerfLevel::kEnableTime);
    write_nanos_before = IOSTATS(write_nanos);
    fsync_nanos_before = IOSTATS(fsync_nanos);
    range_sync_nanos_before = IOSTATS(range_sync_nanos);
    prepare_write_nanos_before = IOSTATS(prepare_write_nanos);
  }

  // The mutex is released and re-acquired inside this call.
  Status s = WriteLevel0Table();

  if (s.ok()) {
    // A successful write is still abandoned if the DB is shutting down or the
    // column family was dropped in the meantime.
    const bool abandoned =
        shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped();
    if (abandoned) {
      s = Status::ShutdownInProgress(
          "Database shutdown or Column family drop during flush");
    }
  }

  if (s.ok()) {
    TEST_SYNC_POINT("FlushJob::InstallResults");
    // Swap the immutable memtables for the table that was just generated.
    s = cfd_->imm()->InstallMemtableFlushResults(
        cfd_, mutable_cf_options_, mems_, versions_, db_mutex_,
        meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
        log_buffer_);
  } else {
    cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
  }

  if (s.ok() && file_meta != nullptr) {
    *file_meta = meta_;
  }
  RecordFlushIOStats();

  // Emit the "flush_finished" event with the per-level file counts.
  auto stream = event_logger_->LogToBuffer(log_buffer_);
  stream << "job" << job_context_->job_id << "event"
         << "flush_finished";
  stream << "lsm_state";
  stream.StartArray();
  auto storage = cfd_->current()->storage_info();
  for (int lvl = 0; lvl < storage->num_levels(); ++lvl) {
    stream << storage->NumLevelFiles(lvl);
  }
  stream.EndArray();
  stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();

  if (measure_io_stats_) {
    // Put the perf level back if it was changed above.
    if (saved_perf_level != PerfLevel::kEnableTime) {
      SetPerfLevel(saved_perf_level);
    }
    stream << "file_write_nanos" << (IOSTATS(write_nanos) - write_nanos_before);
    stream << "file_range_sync_nanos"
           << (IOSTATS(range_sync_nanos) - range_sync_nanos_before);
    stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - fsync_nanos_before);
    stream << "file_prepare_write_nanos"
           << (IOSTATS(prepare_write_nanos) - prepare_write_nanos_before);
  }

  return s;
}

236 237 238 239 240 241
// Abandons the job without running it: drops the reference that
// PickMemTable() took on the base version. Requires the DB mutex to be held
// and a non-null base_ (asserted in debug builds).
void FlushJob::Cancel() {
  db_mutex_->AssertHeld();
  assert(base_ != nullptr);
  base_->Unref();
}

242
// Writes the picked memtables (mems_) into a single level-0 table file.
//
// Must be entered with the DB mutex held. The mutex is released while the
// table is built and the output directory is synced, and is re-acquired
// before returning. On success with a non-empty output file, the file is
// added to edit_ at level 0. Flush statistics are recorded regardless of the
// outcome.
//
// Returns the status of building the table. If the build succeeded but
// syncing the output directory failed, the sync error is returned instead, so
// the caller does not install a file whose directory entry was not synced.
Status FlushJob::WriteLevel0Table() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_FLUSH_WRITE_L0);
  db_mutex_->AssertHeld();
  const uint64_t start_micros = db_options_.env->NowMicros();
  Status s;
  {
    auto write_hint = cfd_->CalculateSSTWriteHint(0);
    db_mutex_->Unlock();
    if (log_buffer_) {
      log_buffer_->FlushBufferToLog();
    }
    // memtables and range_del_iters store internal iterators over each data
    // memtable and its associated range deletion memtable, respectively, at
    // corresponding indexes.
    std::vector<InternalIterator*> memtables;
    std::vector<InternalIterator*> range_del_iters;
    ReadOptions ro;
    ro.total_order_seek = true;
    Arena arena;
    uint64_t total_num_entries = 0, total_num_deletes = 0;
    size_t total_memory_usage = 0;
    for (MemTable* m : mems_) {
      ROCKS_LOG_INFO(
          db_options_.info_log,
          "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
          cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
      memtables.push_back(m->NewIterator(ro, &arena));
      auto* range_del_iter = m->NewRangeTombstoneIterator(ro);
      if (range_del_iter != nullptr) {
        range_del_iters.push_back(range_del_iter);
      }
      total_num_entries += m->num_entries();
      total_num_deletes += m->num_deletes();
      total_memory_usage += m->ApproximateMemoryUsage();
    }

    event_logger_->Log() << "job" << job_context_->job_id << "event"
                         << "flush_started"
                         << "num_memtables" << mems_.size() << "num_entries"
                         << total_num_entries << "num_deletes"
                         << total_num_deletes << "memory_usage"
                         << total_memory_usage;

    {
      // Merge all memtable iterators (and, separately, all range tombstone
      // iterators) into single input streams for the table builder.
      ScopedArenaIterator iter(
          NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
                             static_cast<int>(memtables.size()), &arena));
      std::unique_ptr<InternalIterator> range_del_iter(NewMergingIterator(
          &cfd_->internal_comparator(),
          range_del_iters.empty() ? nullptr : &range_del_iters[0],
          static_cast<int>(range_del_iters.size())));
      ROCKS_LOG_INFO(db_options_.info_log,
                     "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
                     cfd_->GetName().c_str(), job_context_->job_id,
                     meta_.fd.GetNumber());

      TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
                               &output_compression_);
      int64_t _current_time = 0;
      db_options_.env->GetCurrentTime(&_current_time);  // ignore error
      const uint64_t current_time = static_cast<uint64_t>(_current_time);

      uint64_t oldest_key_time =
          mems_.front()->ApproximateOldestKeyTime();

      s = BuildTable(
          dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
          env_options_, cfd_->table_cache(), iter.get(),
          std::move(range_del_iter), &meta_, cfd_->internal_comparator(),
          cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
          cfd_->GetName(), existing_snapshots_,
          earliest_write_conflict_snapshot_, snapshot_checker_,
          output_compression_, cfd_->ioptions()->compression_opts,
          mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
          TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
          Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
          oldest_key_time, write_hint);
      LogFlush(db_options_.info_log);
    }
    ROCKS_LOG_INFO(db_options_.info_log,
                   "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
                   " bytes %s"
                   "%s",
                   cfd_->GetName().c_str(), job_context_->job_id,
                   meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
                   s.ToString().c_str(),
                   meta_.marked_for_compaction ? " (needs compaction)" : "");

    if (output_file_directory_ != nullptr) {
      // The directory is still synced unconditionally, as before, but a
      // failure is no longer dropped: it becomes the result of the flush
      // unless the table build already failed, in which case the original
      // error is kept.
      Status fsync_status = output_file_directory_->Fsync();
      if (s.ok()) {
        s = fsync_status;
      }
    }
    TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
    db_mutex_->Lock();
  }
  // Drop the reference taken on the base version in PickMemTable().
  base_->Unref();

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  if (s.ok() && meta_.fd.GetFileSize() > 0) {
    // if we have more than 1 background thread, then we cannot
    // insert files directly into higher levels because some other
    // threads could be concurrently producing compacted files for
    // that key range.
    // Add file to L0
    edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
                   meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
                   meta_.smallest_seqno, meta_.largest_seqno,
                   meta_.marked_for_compaction);
  }

  // Note that here we treat flush as level 0 compaction in internal stats
  InternalStats::CompactionStats stats(1);
  stats.micros = db_options_.env->NowMicros() - start_micros;
  stats.bytes_written = meta_.fd.GetFileSize();
  cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
  cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
                                     meta_.fd.GetFileSize());
  RecordFlushIOStats();
  return s;
}

}  // namespace rocksdb