//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/flush_job.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
17

I
Igor Canadi 已提交
18 19 20 21 22 23
#include <algorithm>
#include <vector>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
24
#include "db/event_helpers.h"
I
Igor Canadi 已提交
25 26 27 28 29 30 31 32
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/version_set.h"
#include "port/likely.h"
33
#include "port/port.h"
I
Igor Canadi 已提交
34 35 36 37 38 39 40 41 42 43 44
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merger.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
I
Igor Canadi 已提交
45
#include "util/event_logger.h"
46
#include "util/file_util.h"
47
#include "util/iostats_context_imp.h"
I
Igor Canadi 已提交
48
#include "util/log_buffer.h"
49
#include "util/logging.h"
I
Igor Canadi 已提交
50 51 52 53
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
#include "util/stop_watch.h"
#include "util/sync_point.h"
54
#include "util/thread_status_util.h"
I
Igor Canadi 已提交
55 56 57 58 59 60 61

namespace rocksdb {

// Constructs a flush job for the column family `cfd`.
//
// Reference and pointer arguments are stored in the corresponding members
// without copying the pointees. `existing_snapshots` is taken by value and
// moved into the job. As a side effect, construction announces the flush to
// the thread-status facility via ReportStartedFlush().
FlushJob::FlushJob(const std::string& dbname,
                   ColumnFamilyData* cfd,
                   const DBOptions& db_options,
                   const MutableCFOptions& mutable_cf_options,
                   const EnvOptions& env_options,
                   VersionSet* versions,
                   InstrumentedMutex* db_mutex,
                   std::atomic<bool>* shutting_down,
                   std::vector<SequenceNumber> existing_snapshots,
                   SequenceNumber earliest_write_conflict_snapshot,
                   JobContext* job_context,
                   LogBuffer* log_buffer,
                   Directory* db_directory,
                   Directory* output_file_directory,
                   CompressionType output_compression,
                   Statistics* stats,
                   EventLogger* event_logger)
    : dbname_(dbname),
      cfd_(cfd),
      db_options_(db_options),
      mutable_cf_options_(mutable_cf_options),
      env_options_(env_options),
      versions_(versions),
      db_mutex_(db_mutex),
      shutting_down_(shutting_down),
      existing_snapshots_(std::move(existing_snapshots)),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      job_context_(job_context),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_file_directory_(output_file_directory),
      output_compression_(output_compression),
      stats_(stats),
      event_logger_(event_logger) {
  // Publish "this thread is flushing" before anything else happens.
  ReportStartedFlush();
  TEST_SYNC_POINT("FlushJob::FlushJob()");
}

// Clears the thread status that ReportStartedFlush() set up for this job.
FlushJob::~FlushJob() { ThreadStatusUtil::ResetThreadStatus(); }
I
Igor Canadi 已提交
95

96
void FlushJob::ReportStartedFlush() {
97 98
  ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
                                    cfd_->options()->enable_thread_tracking);
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_JOB_ID,
      job_context_->job_id);
  IOSTATS_RESET(bytes_written);
}

void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
  uint64_t input_size = 0;
  for (auto* mem : mems) {
    input_size += mem->ApproximateMemoryUsage();
  }
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_MEMTABLES,
      input_size);
}

void FlushJob::RecordFlushIOStats() {
117
  ThreadStatusUtil::SetThreadOperationProperty(
118 119 120
      ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
}

121
// Flushes the memtables picked from cfd_'s immutable list into a new level-0
// table and installs the result.
//
// file_meta: optional out-parameter. When non-null and the flush succeeds, it
//            receives the metadata of the table that was written.
//
// Returns OK if there was nothing to flush or the result was installed,
// ShutdownInProgress if the table was written but the DB is shutting down or
// the column family was dropped, and otherwise the failing status from
// writing the table or installing the result.
Status FlushJob::Run(FileMetaData* file_meta) {
  AutoThreadOperationStageUpdater stage_run(ThreadStatus::STAGE_FLUSH_RUN);

  // Save the contents of the earliest memtable as a new Table
  autovector<MemTable*> mems;
  cfd_->imm()->PickMemtablesToFlush(&mems);
  if (mems.empty()) {
    LogToBuffer(log_buffer_, "[%s] Nothing in memtable to flush",
                cfd_->GetName().c_str());
    return Status::OK();
  }

  ReportFlushInputSize(mems);

  // The entries of `mems` are (implicitly) sorted in ascending order of
  // creation time. The first memtable's VersionEdit carries the meta info
  // for the whole flush.
  VersionEdit* edit = mems[0]->GetEdits();
  edit->SetPrevLogNumber(0);
  // SetLogNumber(log_num) indicates logs with number smaller than log_num
  // will no longer be picked up for recovery.
  edit->SetLogNumber(mems.back()->GetNextLogNumber());
  edit->SetColumnFamily(cfd_->GetID());

  // This will release and re-acquire the mutex.
  FileMetaData meta;
  Status s = WriteLevel0Table(mems, edit, &meta);

  if (s.ok()) {
    // A successfully written table is still discarded if the DB started
    // shutting down or the column family was dropped in the meantime.
    if (shutting_down_->load(std::memory_order_acquire) ||
        cfd_->IsDropped()) {
      s = Status::ShutdownInProgress(
          "Database shutdown or Column family drop during flush");
    }
  }

  if (s.ok()) {
    TEST_SYNC_POINT("FlushJob::InstallResults");
    // Replace immutable memtable with the generated Table
    s = cfd_->imm()->InstallMemtableFlushResults(
        cfd_, mutable_cf_options_, mems, versions_, db_mutex_,
        meta.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
        log_buffer_);
  } else {
    cfd_->imm()->RollbackMemtableFlush(mems, meta.fd.GetNumber());
  }

  if (file_meta != nullptr && s.ok()) {
    *file_meta = meta;
  }
  RecordFlushIOStats();

  // Emit the "flush_finished" event together with the per-level file counts
  // of the current version.
  auto stream = event_logger_->LogToBuffer(log_buffer_);
  stream << "job" << job_context_->job_id << "event"
         << "flush_finished"
         << "lsm_state";
  stream.StartArray();
  auto* storage = cfd_->current()->storage_info();
  for (int lvl = 0; lvl < storage->num_levels(); ++lvl) {
    stream << storage->NumLevelFiles(lvl);
  }
  stream.EndArray();

  return s;
}

// Builds a single level-0 table from the contents of `mems` and, on success,
// records the new file in `edit`.
//
// mems: memtables to flush; must be non-empty (the address of the first
//       iterator is passed to the merging iterator).
// edit: version edit that receives the AddFile() record when a non-empty
//       table was produced.
// meta: out-parameter; its file descriptor is assigned a fresh file number
//       here and the remaining fields are filled in by BuildTable().
//
// db_mutex_ must be held on entry. It is released while the table is built
// and synced, and is held again when the function returns.
//
// Returns the status of BuildTable(), or the status of syncing the output
// directory if the build succeeded but the sync failed.
Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
                                  VersionEdit* edit, FileMetaData* meta) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_FLUSH_WRITE_L0);
  db_mutex_->AssertHeld();
  const uint64_t start_micros = db_options_.env->NowMicros();
  // path 0 for level 0 file.
  meta->fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);

  Version* base = cfd_->current();
  base->Ref();  // it is likely that we do not need this reference
  Status s;
  {
    db_mutex_->Unlock();
    if (log_buffer_) {
      log_buffer_->FlushBufferToLog();
    }
    std::vector<InternalIterator*> memtables;
    memtables.reserve(mems.size());
    ReadOptions ro;
    ro.total_order_seek = true;
    Arena arena;
    uint64_t total_num_entries = 0, total_num_deletes = 0;
    size_t total_memory_usage = 0;
    for (MemTable* m : mems) {
      Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
          "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
          cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
      memtables.push_back(m->NewIterator(ro, &arena));
      total_num_entries += m->num_entries();
      total_num_deletes += m->num_deletes();
      total_memory_usage += m->ApproximateMemoryUsage();
    }

    event_logger_->Log() << "job" << job_context_->job_id << "event"
                         << "flush_started"
                         << "num_memtables" << mems.size() << "num_entries"
                         << total_num_entries << "num_deletes"
                         << total_num_deletes << "memory_usage"
                         << total_memory_usage;

    TableFileCreationInfo info;
    {
      // Merge all memtable iterators into one sorted stream for the builder.
      ScopedArenaIterator iter(
          NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
                             static_cast<int>(memtables.size()), &arena));
      Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
          "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
          cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber());

      TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
                               &output_compression_);
      s = BuildTable(
          dbname_, db_options_.env, *cfd_->ioptions(), env_options_,
          cfd_->table_cache(), iter.get(), meta, cfd_->internal_comparator(),
          cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
          cfd_->GetName(), existing_snapshots_,
          earliest_write_conflict_snapshot_, output_compression_,
          cfd_->ioptions()->compression_opts,
          mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
          Env::IO_HIGH, &table_properties_, 0 /* level */);
      info.table_properties = table_properties_;
      LogFlush(db_options_.info_log);
    }
    Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
        "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
        " bytes %s"
        "%s",
        cfd_->GetName().c_str(), job_context_->job_id, meta->fd.GetNumber(),
        meta->fd.GetFileSize(), s.ToString().c_str(),
        meta->marked_for_compaction ? " (needs compaction)" : "");

    // output to event logger
    if (s.ok()) {
      info.db_name = dbname_;
      info.cf_name = cfd_->GetName();
      info.file_path = TableFileName(db_options_.db_paths,
                                     meta->fd.GetNumber(),
                                     meta->fd.GetPathId());
      info.file_size = meta->fd.GetFileSize();
      info.job_id = job_context_->job_id;
      EventHelpers::LogAndNotifyTableFileCreation(
          event_logger_, db_options_.listeners,
          meta->fd, info);
      TEST_SYNC_POINT("FlushJob::LogAndNotifyTableFileCreation()");
    }

    // Sync the output directory and surface any failure to the caller: the
    // file must not be recorded in the manifest if the sync reported an
    // error. There is nothing to sync when the build itself failed.
    if (s.ok() && !db_options_.disableDataSync &&
        output_file_directory_ != nullptr) {
      s = output_file_directory_->Fsync();
    }
    TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
    db_mutex_->Lock();
  }
  base->Unref();

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  if (s.ok() && meta->fd.GetFileSize() > 0) {
    // if we have more than 1 background thread, then we cannot
    // insert files directly into higher levels because some other
    // threads could be concurrently producing compacted files for
    // that key range.
    // Add file to L0
    edit->AddFile(0 /* level */, meta->fd.GetNumber(), meta->fd.GetPathId(),
                  meta->fd.GetFileSize(), meta->smallest, meta->largest,
                  meta->smallest_seqno, meta->largest_seqno,
                  meta->marked_for_compaction);
  }

  // Account the elapsed time and bytes written against level 0, regardless
  // of whether the flush succeeded.
  InternalStats::CompactionStats stats(1);
  stats.micros = db_options_.env->NowMicros() - start_micros;
  stats.bytes_written = meta->fd.GetFileSize();
  cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
  cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
                                     meta->fd.GetFileSize());
  RecordTick(stats_, FLUSH_WRITE_BYTES, meta->fd.GetFileSize());
  return s;
}

}  // namespace rocksdb