//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/flush_job.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>

#include <algorithm>
#include <vector>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/event_helpers.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/version_set.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "port/port.h"
#include "db/memtable.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/event_logger.h"
#include "util/file_util.h"
#include "util/filename.h"
#include "util/log_buffer.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "util/sync_point.h"

namespace rocksdb {

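// Maps a FlushReason value to a human-readable label used in logs and events.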
const char* GetFlushReasonString(FlushReason flush_reason) {
  switch (flush_reason) {
    case FlushReason::kOthers:
      return "Other Reasons";
    case FlushReason::kGetLiveFiles:
      return "Get Live Files";
    case FlushReason::kShutDown:
      return "Shut down";
    case FlushReason::kExternalFileIngestion:
      return "External File Ingestion";
    case FlushReason::kManualCompaction:
      return "Manual Compaction";
    case FlushReason::kWriteBufferManager:
      return "Write Buffer Manager";
    case FlushReason::kWriteBufferFull:
      return "Write Buffer Full";
    case FlushReason::kTest:
      return "Test";
    case FlushReason::kDeleteFiles:
      return "Delete Files";
    case FlushReason::kAutoCompaction:
      return "Auto Compaction";
    case FlushReason::kManualFlush:
      return "Manual Flush";
    default:
      return "Invalid";
  }
}


FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
                   const ImmutableDBOptions& db_options,
                   const MutableCFOptions& mutable_cf_options,
                   const EnvOptions env_options, VersionSet* versions,
                   InstrumentedMutex* db_mutex,
                   std::atomic<bool>* shutting_down,
                   std::vector<SequenceNumber> existing_snapshots,
                   SequenceNumber earliest_write_conflict_snapshot,
                   SnapshotChecker* snapshot_checker, JobContext* job_context,
                   LogBuffer* log_buffer, Directory* db_directory,
                   Directory* output_file_directory,
                   CompressionType output_compression, Statistics* stats,
                   EventLogger* event_logger, bool measure_io_stats)
    : dbname_(dbname),
      cfd_(cfd),
      db_options_(db_options),
      mutable_cf_options_(mutable_cf_options),
      env_options_(env_options),
      versions_(versions),
      db_mutex_(db_mutex),
      shutting_down_(shutting_down),
      existing_snapshots_(std::move(existing_snapshots)),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      job_context_(job_context),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_file_directory_(output_file_directory),
      output_compression_(output_compression),
      stats_(stats),
      event_logger_(event_logger),
      measure_io_stats_(measure_io_stats),
      edit_(nullptr),
      base_(nullptr),
      pick_memtable_called(false) {
  // Update the thread status to indicate flush.
  ReportStartedFlush();
  TEST_SYNC_POINT("FlushJob::FlushJob()");
}

FlushJob::~FlushJob() {
  ThreadStatusUtil::ResetThreadStatus();
}

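// Tags the current thread with this flush's column family and job id so the
// operation is visible via ThreadStatus, and resets the written-bytes counter.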
void FlushJob::ReportStartedFlush() {
  ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
                                    db_options_.enable_thread_tracking);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_JOB_ID,
      job_context_->job_id);
  IOSTATS_RESET(bytes_written);
}

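// Records the approximate memory footprint of the memtables being flushed as
// the thread's FLUSH_BYTES_MEMTABLES property.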
void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
  uint64_t input_size = 0;
  for (auto* mem : mems) {
    input_size += mem->ApproximateMemoryUsage();
  }
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_MEMTABLES,
      input_size);
}

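// Accounts the bytes written so far to FLUSH_WRITE_BYTES and the thread's
// FLUSH_BYTES_WRITTEN property, then resets the counter.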
void FlushJob::RecordFlushIOStats() {
  RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
  IOSTATS_RESET(bytes_written);
}

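// Selects the immutable memtables that this job will flush and prepares the
// VersionEdit and output file number for the resulting level-0 file. Must be
// called under the DB mutex, before Run().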
void FlushJob::PickMemTable() {
  db_mutex_->AssertHeld();
  assert(!pick_memtable_called);
  pick_memtable_called = true;
  // Save the contents of the earliest memtable as a new Table
  cfd_->imm()->PickMemtablesToFlush(&mems_);
  if (mems_.empty()) {
    return;
  }

  ReportFlushInputSize(mems_);

  // entries mems are (implicitly) sorted in ascending order by their created
  // time. We will use the first memtable's `edit` to keep the meta info for
  // this flush.
  MemTable* m = mems_[0];
  edit_ = m->GetEdits();
  edit_->SetPrevLogNumber(0);
  // SetLogNumber(log_num) indicates logs with number smaller than log_num
  // will no longer be picked up for recovery.
  edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
  edit_->SetColumnFamily(cfd_->GetID());

  // path 0 for level 0 file.
  meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);

  base_ = cfd_->current();
  base_->Ref();  // it is likely that we do not need this reference
}

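// Writes the picked memtables to a level-0 table via WriteLevel0Table() and,
// on success, installs the result in the column family's MemTableList. Emits a
// "flush_finished" event and optional per-file I/O timings to the log buffer.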
Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
                     FileMetaData* file_meta) {
  TEST_SYNC_POINT("FlushJob::Start");
  db_mutex_->AssertHeld();
  assert(pick_memtable_called);
  AutoThreadOperationStageUpdater stage_run(
      ThreadStatus::STAGE_FLUSH_RUN);
  if (mems_.empty()) {
    ROCKS_LOG_BUFFER(log_buffer_, "[%s] Nothing in memtable to flush",
                     cfd_->GetName().c_str());
    return Status::OK();
  }

  // I/O measurement variables
  PerfLevel prev_perf_level = PerfLevel::kEnableTime;
  uint64_t prev_write_nanos = 0;
  uint64_t prev_fsync_nanos = 0;
  uint64_t prev_range_sync_nanos = 0;
  uint64_t prev_prepare_write_nanos = 0;
  if (measure_io_stats_) {
    prev_perf_level = GetPerfLevel();
    SetPerfLevel(PerfLevel::kEnableTime);
    prev_write_nanos = IOSTATS(write_nanos);
    prev_fsync_nanos = IOSTATS(fsync_nanos);
    prev_range_sync_nanos = IOSTATS(range_sync_nanos);
    prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
  }

  // This will release and re-acquire the mutex.
  Status s = WriteLevel0Table();

  if (s.ok() &&
      (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
    s = Status::ShutdownInProgress(
        "Database shutdown or Column family drop during flush");
  }

  if (!s.ok()) {
    cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
  } else {
    TEST_SYNC_POINT("FlushJob::InstallResults");
    // Replace immutable memtable with the generated Table
    s = cfd_->imm()->InstallMemtableFlushResults(
        cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
        meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
        log_buffer_);
  }

  if (s.ok() && file_meta != nullptr) {
    *file_meta = meta_;
  }
  RecordFlushIOStats();

  auto stream = event_logger_->LogToBuffer(log_buffer_);
  stream << "job" << job_context_->job_id << "event"
         << "flush_finished";
  stream << "output_compression"
         << CompressionTypeToString(output_compression_);
  stream << "lsm_state";
  stream.StartArray();
  auto vstorage = cfd_->current()->storage_info();
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    stream << vstorage->NumLevelFiles(level);
  }
  stream.EndArray();
  stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();

  if (measure_io_stats_) {
    if (prev_perf_level != PerfLevel::kEnableTime) {
      SetPerfLevel(prev_perf_level);
    }
    stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos);
    stream << "file_range_sync_nanos"
           << (IOSTATS(range_sync_nanos) - prev_range_sync_nanos);
    stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
    stream << "file_prepare_write_nanos"
           << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
  }

  return s;
}

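// Called when the flush is abandoned after PickMemTable(); releases the
// Version reference taken there.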
void FlushJob::Cancel() {
  db_mutex_->AssertHeld();
  assert(base_ != nullptr);
  base_->Unref();
}

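// Builds the level-0 SST from the picked memtables. The DB mutex is released
// while the table is built (merging the memtable and range-tombstone iterators
// through BuildTable) and re-acquired before the edit and stats are recorded.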
Status FlushJob::WriteLevel0Table() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_FLUSH_WRITE_L0);
  db_mutex_->AssertHeld();
  const uint64_t start_micros = db_options_.env->NowMicros();
  Status s;
  {
    auto write_hint = cfd_->CalculateSSTWriteHint(0);
    db_mutex_->Unlock();
    if (log_buffer_) {
      log_buffer_->FlushBufferToLog();
    }
    // memtables and range_del_iters store internal iterators over each data
    // memtable and its associated range deletion memtable, respectively, at
    // corresponding indexes.
    std::vector<InternalIterator*> memtables;
    std::vector<InternalIterator*> range_del_iters;
    ReadOptions ro;
    ro.total_order_seek = true;
    Arena arena;
    uint64_t total_num_entries = 0, total_num_deletes = 0;
    size_t total_memory_usage = 0;
    for (MemTable* m : mems_) {
      ROCKS_LOG_INFO(
          db_options_.info_log,
          "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
          cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
      memtables.push_back(m->NewIterator(ro, &arena));
      auto* range_del_iter = m->NewRangeTombstoneIterator(ro);
      if (range_del_iter != nullptr) {
        range_del_iters.push_back(range_del_iter);
      }
      total_num_entries += m->num_entries();
      total_num_deletes += m->num_deletes();
      total_memory_usage += m->ApproximateMemoryUsage();
    }

    event_logger_->Log()
        << "job" << job_context_->job_id << "event"
        << "flush_started"
        << "num_memtables" << mems_.size() << "num_entries" << total_num_entries
        << "num_deletes" << total_num_deletes << "memory_usage"
        << total_memory_usage << "flush_reason"
        << GetFlushReasonString(cfd_->GetFlushReason());

    {
      ScopedArenaIterator iter(
          NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
                             static_cast<int>(memtables.size()), &arena));
      std::unique_ptr<InternalIterator> range_del_iter(NewMergingIterator(
          &cfd_->internal_comparator(),
          range_del_iters.empty() ? nullptr : &range_del_iters[0],
          static_cast<int>(range_del_iters.size())));
      ROCKS_LOG_INFO(db_options_.info_log,
                     "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
                     cfd_->GetName().c_str(), job_context_->job_id,
                     meta_.fd.GetNumber());

      TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
                               &output_compression_);
      int64_t _current_time = 0;
      auto status = db_options_.env->GetCurrentTime(&_current_time);
      // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
      if (!status.ok()) {
        ROCKS_LOG_WARN(
            db_options_.info_log,
            "Failed to get current time to populate creation_time property. "
            "Status: %s",
            status.ToString().c_str());
      }
      const uint64_t current_time = static_cast<uint64_t>(_current_time);

      uint64_t oldest_key_time = mems_.front()->ApproximateOldestKeyTime();

      s = BuildTable(
          dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
          env_options_, cfd_->table_cache(), iter.get(),
          std::move(range_del_iter), &meta_, cfd_->internal_comparator(),
          cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
          cfd_->GetName(), existing_snapshots_,
          earliest_write_conflict_snapshot_, snapshot_checker_,
          output_compression_, cfd_->ioptions()->compression_opts,
          mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
          TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
          Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
          oldest_key_time, write_hint);
      LogFlush(db_options_.info_log);
    }
    ROCKS_LOG_INFO(db_options_.info_log,
                   "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
                   " bytes %s"
                   "%s",
                   cfd_->GetName().c_str(), job_context_->job_id,
                   meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
                   s.ToString().c_str(),
                   meta_.marked_for_compaction ? " (needs compaction)" : "");

    if (output_file_directory_ != nullptr) {
      output_file_directory_->Fsync();
    }
    TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
    db_mutex_->Lock();
  }
  base_->Unref();

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  if (s.ok() && meta_.fd.GetFileSize() > 0) {
    // if we have more than 1 background thread, then we cannot
    // insert files directly into higher levels because some other
    // threads could be concurrently producing compacted files for
    // that key range.
    // Add file to L0
    edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
                   meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
                   meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
                   meta_.marked_for_compaction);
  }

  // Note that here we treat flush as level 0 compaction in internal stats
  InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
  stats.micros = db_options_.env->NowMicros() - start_micros;
  stats.bytes_written = meta_.fd.GetFileSize();
  MeasureTime(stats_, FLUSH_TIME, stats.micros);
  cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
  cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
                                     meta_.fd.GetFileSize());
  RecordFlushIOStats();
  return s;
}

}  // namespace rocksdb