//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/flush_job.h"

#include <cinttypes>

#include <algorithm>
#include <vector>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/event_helpers.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/version_set.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "logging/event_logger.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"

namespace rocksdb {

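// Maps a FlushReason to a human-readable label for the "flush_started" event
// log.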
const char* GetFlushReasonString(FlushReason flush_reason) {
  switch (flush_reason) {
    case FlushReason::kOthers:
      return "Other Reasons";
    case FlushReason::kGetLiveFiles:
      return "Get Live Files";
    case FlushReason::kShutDown:
      return "Shut down";
    case FlushReason::kExternalFileIngestion:
      return "External File Ingestion";
    case FlushReason::kManualCompaction:
      return "Manual Compaction";
    case FlushReason::kWriteBufferManager:
      return "Write Buffer Manager";
    case FlushReason::kWriteBufferFull:
      return "Write Buffer Full";
    case FlushReason::kTest:
      return "Test";
    case FlushReason::kDeleteFiles:
      return "Delete Files";
    case FlushReason::kAutoCompaction:
      return "Auto Compaction";
    case FlushReason::kManualFlush:
      return "Manual Flush";
    case FlushReason::kErrorRecovery:
      return "Error Recovery";
    default:
      return "Invalid";
  }
}

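// A FlushJob converts the picked immutable memtables of one column family into
// a single level-0 SST file. When max_memtable_id is non-null it bounds which
// memtables may be picked; write_manifest controls whether Run() installs the
// flush result in the MANIFEST, and sync_output_directory controls whether the
// output directory is fsynced after the table file is written.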
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
                   const ImmutableDBOptions& db_options,
                   const MutableCFOptions& mutable_cf_options,
                   const uint64_t* max_memtable_id,
                   const EnvOptions& env_options, VersionSet* versions,
                   InstrumentedMutex* db_mutex,
                   std::atomic<bool>* shutting_down,
                   std::vector<SequenceNumber> existing_snapshots,
                   SequenceNumber earliest_write_conflict_snapshot,
                   SnapshotChecker* snapshot_checker, JobContext* job_context,
                   LogBuffer* log_buffer, Directory* db_directory,
                   Directory* output_file_directory,
                   CompressionType output_compression, Statistics* stats,
                   EventLogger* event_logger, bool measure_io_stats,
                   const bool sync_output_directory, const bool write_manifest,
                   Env::Priority thread_pri)
    : dbname_(dbname),
      cfd_(cfd),
      db_options_(db_options),
      mutable_cf_options_(mutable_cf_options),
      max_memtable_id_(max_memtable_id),
      env_options_(env_options),
      versions_(versions),
      db_mutex_(db_mutex),
      shutting_down_(shutting_down),
      existing_snapshots_(std::move(existing_snapshots)),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      job_context_(job_context),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_file_directory_(output_file_directory),
      output_compression_(output_compression),
      stats_(stats),
      event_logger_(event_logger),
      measure_io_stats_(measure_io_stats),
      sync_output_directory_(sync_output_directory),
      write_manifest_(write_manifest),
      edit_(nullptr),
      base_(nullptr),
      pick_memtable_called(false),
      thread_pri_(thread_pri) {
  // Update the thread status to indicate flush.
  ReportStartedFlush();
  TEST_SYNC_POINT("FlushJob::FlushJob()");
}

FlushJob::~FlushJob() {
  ThreadStatusUtil::ResetThreadStatus();
}

void FlushJob::ReportStartedFlush() {
  ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
                                    db_options_.enable_thread_tracking);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_JOB_ID,
      job_context_->job_id);
  IOSTATS_RESET(bytes_written);
}

void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
  uint64_t input_size = 0;
  for (auto* mem : mems) {
    input_size += mem->ApproximateMemoryUsage();
  }
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_MEMTABLES,
      input_size);
}

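// Folds the bytes written since the last call into the FLUSH_WRITE_BYTES
// ticker and the per-thread flush-bytes-written property, then resets the
// iostats byte counter.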
void FlushJob::RecordFlushIOStats() {
  RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
  IOSTATS_RESET(bytes_written);
}

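// Picks the memtables to flush from the column family's immutable memtable
// list (bounded by max_memtable_id_ when it is set) and prepares the
// VersionEdit and the output file number. Must be called once, with the DB
// mutex held, before Run().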
void FlushJob::PickMemTable() {
  db_mutex_->AssertHeld();
  assert(!pick_memtable_called);
  pick_memtable_called = true;
  // Save the contents of the earliest memtable as a new Table
  cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_);
  if (mems_.empty()) {
    return;
  }

  ReportFlushInputSize(mems_);

  // The memtables in mems_ are (implicitly) sorted in ascending order by their
  // creation time. We use the first memtable's `edit` to keep the metadata for
  // this flush.
  MemTable* m = mems_[0];
  edit_ = m->GetEdits();
  edit_->SetPrevLogNumber(0);
  // SetLogNumber(log_num) indicates logs with number smaller than log_num
  // will no longer be picked up for recovery.
  edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
  edit_->SetColumnFamily(cfd_->GetID());

  // path 0 for level 0 file.
  meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);

  base_ = cfd_->current();
  base_->Ref();  // it is likely that we do not need this reference
}

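// Executes the flush: writes the level-0 table and, if the column family has
// not been dropped and the DB is not shutting down, installs the result in the
// MANIFEST when write_manifest_ is set. Also emits a "flush_finished" event
// and, optionally, detailed I/O timing statistics.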
Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
                     FileMetaData* file_meta) {
  TEST_SYNC_POINT("FlushJob::Start");
  db_mutex_->AssertHeld();
  assert(pick_memtable_called);
  AutoThreadOperationStageUpdater stage_run(
      ThreadStatus::STAGE_FLUSH_RUN);
  if (mems_.empty()) {
    ROCKS_LOG_BUFFER(log_buffer_, "[%s] Nothing in memtable to flush",
                     cfd_->GetName().c_str());
    return Status::OK();
  }

  // I/O measurement variables
  PerfLevel prev_perf_level = PerfLevel::kEnableTime;
  uint64_t prev_write_nanos = 0;
  uint64_t prev_fsync_nanos = 0;
  uint64_t prev_range_sync_nanos = 0;
  uint64_t prev_prepare_write_nanos = 0;
  uint64_t prev_cpu_write_nanos = 0;
  uint64_t prev_cpu_read_nanos = 0;
  if (measure_io_stats_) {
    prev_perf_level = GetPerfLevel();
    SetPerfLevel(PerfLevel::kEnableTime);
    prev_write_nanos = IOSTATS(write_nanos);
    prev_fsync_nanos = IOSTATS(fsync_nanos);
    prev_range_sync_nanos = IOSTATS(range_sync_nanos);
    prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
    prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
    prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
  }

  // This will release and re-acquire the mutex.
  Status s = WriteLevel0Table();

  if (s.ok() && cfd_->IsDropped()) {
    s = Status::ColumnFamilyDropped("Column family dropped during flush");
  }
  if ((s.ok() || s.IsColumnFamilyDropped()) &&
      shutting_down_->load(std::memory_order_acquire)) {
    s = Status::ShutdownInProgress("Database shutdown");
  }

  if (!s.ok()) {
    cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
  } else if (write_manifest_) {
    TEST_SYNC_POINT("FlushJob::InstallResults");
    // Replace immutable memtable with the generated Table
    s = cfd_->imm()->TryInstallMemtableFlushResults(
        cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
        meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
        log_buffer_, &committed_flush_jobs_info_);
  }

  if (s.ok() && file_meta != nullptr) {
    *file_meta = meta_;
  }
  RecordFlushIOStats();

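  // Log a structured "flush_finished" event, including the resulting number of
  // files on each LSM level and the count of remaining immutable memtables.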
  auto stream = event_logger_->LogToBuffer(log_buffer_);
  stream << "job" << job_context_->job_id << "event"
         << "flush_finished";
  stream << "output_compression"
         << CompressionTypeToString(output_compression_);
  stream << "lsm_state";
  stream.StartArray();
  auto vstorage = cfd_->current()->storage_info();
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    stream << vstorage->NumLevelFiles(level);
  }
  stream.EndArray();
  stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();

  if (measure_io_stats_) {
    if (prev_perf_level != PerfLevel::kEnableTime) {
      SetPerfLevel(prev_perf_level);
    }
    stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos);
    stream << "file_range_sync_nanos"
           << (IOSTATS(range_sync_nanos) - prev_range_sync_nanos);
    stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
    stream << "file_prepare_write_nanos"
           << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
    stream << "file_cpu_write_nanos"
           << (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos);
    stream << "file_cpu_read_nanos"
           << (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos);
  }

  return s;
}

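// Called when the flush is abandoned after PickMemTable(); releases the
// Version reference taken there.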
void FlushJob::Cancel() {
  db_mutex_->AssertHeld();
  assert(base_ != nullptr);
  base_->Unref();
}

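// Builds the level-0 SST file from the picked memtables. The DB mutex is
// released while the file is written and re-acquired before returning; on
// success the new file is recorded in edit_ and flush statistics are updated.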
Status FlushJob::WriteLevel0Table() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_FLUSH_WRITE_L0);
  db_mutex_->AssertHeld();
  const uint64_t start_micros = db_options_.env->NowMicros();
  const uint64_t start_cpu_micros = db_options_.env->NowCPUNanos() / 1000;
  Status s;
  {
    auto write_hint = cfd_->CalculateSSTWriteHint(0);
    db_mutex_->Unlock();
    if (log_buffer_) {
      log_buffer_->FlushBufferToLog();
    }
    // memtables and range_del_iters store internal iterators over each data
    // memtable and its associated range deletion memtable, respectively, at
    // corresponding indexes.
    std::vector<InternalIterator*> memtables;
    std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
        range_del_iters;
    ReadOptions ro;
    ro.total_order_seek = true;
    Arena arena;
    uint64_t total_num_entries = 0, total_num_deletes = 0;
    uint64_t total_data_size = 0;
    size_t total_memory_usage = 0;
    for (MemTable* m : mems_) {
      ROCKS_LOG_INFO(
          db_options_.info_log,
          "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
          cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
      memtables.push_back(m->NewIterator(ro, &arena));
      auto* range_del_iter =
          m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
      if (range_del_iter != nullptr) {
        range_del_iters.emplace_back(range_del_iter);
      }
      total_num_entries += m->num_entries();
      total_num_deletes += m->num_deletes();
      total_data_size += m->get_data_size();
      total_memory_usage += m->ApproximateMemoryUsage();
    }

    event_logger_->Log() << "job" << job_context_->job_id << "event"
                         << "flush_started"
                         << "num_memtables" << mems_.size() << "num_entries"
                         << total_num_entries << "num_deletes"
                         << total_num_deletes << "total_data_size"
                         << total_data_size << "memory_usage"
                         << total_memory_usage << "flush_reason"
                         << GetFlushReasonString(cfd_->GetFlushReason());

    {
      ScopedArenaIterator iter(
          NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
                             static_cast<int>(memtables.size()), &arena));
      ROCKS_LOG_INFO(db_options_.info_log,
                     "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
                     cfd_->GetName().c_str(), job_context_->job_id,
                     meta_.fd.GetNumber());

      TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
                               &output_compression_);
      int64_t _current_time = 0;
      auto status = db_options_.env->GetCurrentTime(&_current_time);
      // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
      if (!status.ok()) {
        ROCKS_LOG_WARN(
            db_options_.info_log,
            "Failed to get current time to populate creation_time property. "
            "Status: %s",
            status.ToString().c_str());
      }
      const uint64_t current_time = static_cast<uint64_t>(_current_time);

      uint64_t oldest_key_time =
          mems_.front()->ApproximateOldestKeyTime();

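      // Build the L0 file from a merging iterator over all picked memtables,
      // passing along their fragmented range-tombstone iterators.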
      s = BuildTable(
          dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
          env_options_, cfd_->table_cache(), iter.get(),
          std::move(range_del_iters), &meta_, cfd_->internal_comparator(),
          cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
          cfd_->GetName(), existing_snapshots_,
          earliest_write_conflict_snapshot_, snapshot_checker_,
          output_compression_, mutable_cf_options_.sample_for_compression,
          cfd_->ioptions()->compression_opts,
          mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
          TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
          Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
          oldest_key_time, write_hint, current_time);
      LogFlush(db_options_.info_log);
    }
    ROCKS_LOG_INFO(db_options_.info_log,
                   "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
                   " bytes %s"
                   "%s",
                   cfd_->GetName().c_str(), job_context_->job_id,
                   meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
                   s.ToString().c_str(),
                   meta_.marked_for_compaction ? " (needs compaction)" : "");

    if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
      s = output_file_directory_->Fsync();
    }
    TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
    db_mutex_->Lock();
  }
  base_->Unref();

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  if (s.ok() && meta_.fd.GetFileSize() > 0) {
    // if we have more than 1 background thread, then we cannot
    // insert files directly into higher levels because some other
    // threads could be concurrently producing compacted files for
    // that key range.
    // Add file to L0
    edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
                   meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
                   meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
                   meta_.marked_for_compaction, meta_.oldest_blob_file_number);
  }
#ifndef ROCKSDB_LITE
  // Piggyback FlushJobInfo on the first flushed memtable.
  mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
#endif  // !ROCKSDB_LITE

  // Note that here we treat flush as level 0 compaction in internal stats
  InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
  stats.micros = db_options_.env->NowMicros() - start_micros;
  stats.cpu_micros = db_options_.env->NowCPUNanos() / 1000 - start_cpu_micros;
  stats.bytes_written = meta_.fd.GetFileSize();
  RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
  cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
  cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
                                     meta_.fd.GetFileSize());
  RecordFlushIOStats();
  return s;
}

#ifndef ROCKSDB_LITE
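// Assembles the FlushJobInfo reported to event listeners for the newly created
// level-0 file. Requires the DB mutex to be held.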
std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
  db_mutex_->AssertHeld();
  std::unique_ptr<FlushJobInfo> info(new FlushJobInfo{});
  info->cf_id = cfd_->GetID();
  info->cf_name = cfd_->GetName();

  const uint64_t file_number = meta_.fd.GetNumber();
  info->file_path =
      MakeTableFileName(cfd_->ioptions()->cf_paths[0].path, file_number);
  info->file_number = file_number;
  info->oldest_blob_file_number = meta_.oldest_blob_file_number;
  info->thread_id = db_options_.env->GetThreadID();
  info->job_id = job_context_->job_id;
  info->smallest_seqno = meta_.fd.smallest_seqno;
  info->largest_seqno = meta_.fd.largest_seqno;
  info->table_properties = table_properties_;
  info->flush_reason = cfd_->GetFlushReason();
  return info;
}
#endif  // !ROCKSDB_LITE

}  // namespace rocksdb