column_family.cc 28.6 KB
Newer Older
1 2 3 4 5 6 7 8 9
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

I
Igor Canadi 已提交
10
#include "db/column_family.h"
11

12 13 14 15 16
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
17 18 19
#include <vector>
#include <string>
#include <algorithm>
I
Igor Canadi 已提交
20
#include <limits>
21

I
Igor Canadi 已提交
22
#include "db/compaction_picker.h"
23
#include "db/db_impl.h"
24 25 26
#include "db/job_context.h"
#include "db/version_set.h"
#include "db/writebuffer.h"
27
#include "db/internal_stats.h"
I
Igor Canadi 已提交
28
#include "db/job_context.h"
29
#include "db/table_properties_collector.h"
I
Igor Canadi 已提交
30
#include "db/version_set.h"
31
#include "db/write_controller.h"
32
#include "util/autovector.h"
33
#include "util/hash_skiplist_rep.h"
34
#include "util/options_helper.h"
I
Igor Canadi 已提交
35 36 37

namespace rocksdb {

38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
namespace {
// Computes the delay, in microseconds, to apply to a write, based on how far
// a measurement `n` (a level-0 file count or a compaction score) has advanced
// from a lower threshold `bottom` towards an upper threshold `top`:
//   if n >= top, return 1000;
//   if n < bottom, return 0;
//   otherwise, let r = (n - bottom) /
//                      (top - bottom)
//   and return max(r^2 * 1000, 100).
// The goal of this formula is to gradually increase the rate at which writes
// are slowed. We also tried linear delay (r * 1000), but it seemed to do
// slightly worse. There is no other particular reason for choosing quadratic.
//
// `n` is taken as a double so that fractional inputs (such as a compaction
// score) are not truncated to an integer before being compared against the
// thresholds. Integer arguments convert implicitly and give the same result
// as before.
uint64_t SlowdownAmount(double n, double bottom, double top) {
  if (n >= top) {
    return 1000;
  }
  if (n < bottom) {
    return 0;
  }
  // If we are here, we know that bottom <= n < top since the previous two
  // conditions are false, so (top - bottom) is strictly positive.
  const double how_much = (n - bottom) / (top - bottom);
  // Once the lower threshold is reached the delay is never below 100us.
  const uint64_t delay =
      static_cast<uint64_t>(std::max(how_much * how_much * 1000, 100.0));
  assert(delay <= 1000);
  return delay;
}
}  // namespace

I
Igor Canadi 已提交
68
// Builds a handle around `column_family_data`, remembering the owning DB and
// the mutex used later by the destructor. A non-null column family gets one
// extra reference for the lifetime of the handle.
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ == nullptr) {
    return;
  }
  cfd_->Ref();
}

// Drops the reference taken in the constructor. If that was the last
// reference the column family data is deleted, and any files that became
// obsolete are collected under the mutex and purged after it is released.
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ == nullptr) {
    return;
  }
  // Job id == 0 means that this is not our background process, but rather
  // user thread
  JobContext job_context(0);

  mutex_->Lock();
  const bool was_last_ref = cfd_->Unref();
  if (was_last_ref) {
    delete cfd_;
  }
  db_->FindObsoleteFiles(&job_context, false, true);
  mutex_->Unlock();

  // Purging happens after the mutex has been released.
  if (job_context.HaveSomethingToDelete()) {
    db_->PurgeObsoleteFiles(job_context);
  }
}

93 94
// Returns the ID of the column family this handle refers to.
uint32_t ColumnFamilyHandleImpl::GetID() const {
  return cfd()->GetID();
}

95 96 97 98
// Returns the name of the column family this handle refers to.
const std::string& ColumnFamilyHandleImpl::GetName() const {
  const std::string& cf_name = cfd()->GetName();
  return cf_name;
}

99 100 101 102
// Returns the user comparator of the column family this handle refers to.
const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
  const Comparator* const user_cmp = cfd()->user_comparator();
  return user_cmp;
}

103 104 105 106
// Returns a copy of `src` with the comparator replaced by `icmp` and with
// out-of-range or inconsistent values adjusted:
//   - write_buffer_size is clipped to a platform-dependent range;
//   - arena_block_size defaults to a tenth of write_buffer_size when unset;
//   - max_write_buffer_number is raised to at least 2;
//   - min_write_buffer_number_to_merge is capped at
//     max_write_buffer_number - 1;
//   - max_mem_compaction_level is capped at num_levels - 1;
//   - soft_rate_limit is capped at hard_rate_limit;
//   - a hash-based memtable factory is replaced by SkipListFactory when no
//     prefix extractor is configured;
//   - user table-properties collector factories are wrapped and an internal
//     key collector factory is appended;
//   - FIFO compaction forces a single level and disables the level-0 triggers.
ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  result.comparator = icmp;
#ifdef OS_MACOSX
  // TODO(icanadi) make write_buffer_size uint64_t instead of size_t
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, ((size_t)1) << 30);
#else
  ClipToRange(&result.write_buffer_size,
              ((size_t)64) << 10, ((size_t)64) << 30);
#endif
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // calculate a proper value from writer_buffer_size;
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 10;
  }
  // Raise max_write_buffer_number to its minimum BEFORE deriving the cap on
  // min_write_buffer_number_to_merge from it. Doing it in the other order
  // lets a too-small max_write_buffer_number push
  // min_write_buffer_number_to_merge down to 0 or below.
  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  result.compression_per_level = src.compression_per_level;
  if (result.max_mem_compaction_level >= result.num_levels) {
    result.max_mem_compaction_level = result.num_levels - 1;
  }
  if (result.soft_rate_limit > result.hard_rate_limit) {
    result.soft_rate_limit = result.hard_rate_limit;
  }
  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  // -- Sanitize the table properties collector
  // All user defined properties collectors will be wrapped by
  // UserKeyTablePropertiesCollector since for them they only have the
  // knowledge of the user keys; internal keys are invisible to them.
  auto& collector_factories = result.table_properties_collector_factories;
  for (auto& factory : collector_factories) {
    assert(factory);
    factory =
        std::make_shared<UserKeyTablePropertiesCollectorFactory>(factory);
  }
  // Add collector to collect internal key statistics
  collector_factories.push_back(
      std::make_shared<InternalKeyPropertiesCollectorFactory>());

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_file_num_compaction_trigger = std::numeric_limits<int>::max();
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  return result;
}

169 170 171
// Sentinel values stored in the thread-local SuperVersion slot.
// `dummy` exists only so that its address can serve as the unique, non-null
// kSVInUse marker; kSVObsolete is the null pointer.
int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;
172

173 174 175 176 177 178 179 180 181 182 183 184 185
// Frees every memtable that was queued in `to_delete`.
SuperVersion::~SuperVersion() {
  for (auto* pending_memtable : to_delete) {
    delete pending_memtable;
  }
}

// Increments the reference count and returns `this`, so the call can be
// chained with an assignment.
SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
186
  uint32_t previous_refs = refs.fetch_sub(1);
187 188
  assert(previous_refs > 0);
  return previous_refs == 1;
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
}

void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    to_delete.push_back(m);
  }
  current->Unref();
}

// Points this SuperVersion at the given memtable, immutable memtable list and
// Version, takes a reference on each, and sets its own reference count to 1.
void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                        Version* new_current) {
  mem = new_mem;
  mem->Ref();

  imm = new_imm;
  imm->Ref();

  current = new_current;
  current->Ref();

  refs.store(1, std::memory_order_relaxed);
}

212 213
namespace {
void SuperVersionUnrefHandle(void* ptr) {
214 215 216 217
  // UnrefHandle is called when a thread exists or a ThreadLocalPtr gets
  // destroyed. When former happens, the thread shouldn't see kSVInUse.
  // When latter happens, we are in ~ColumnFamilyData(), no get should happen as
  // well.
218 219 220 221 222 223 224 225 226 227
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv->Unref()) {
    sv->db_mutex->Lock();
    sv->Cleanup();
    sv->db_mutex->Unlock();
    delete sv;
  }
}
}  // anonymous namespace

228 229 230 231 232
// Constructs the per-column-family state. The options are sanitized before
// being stored, and the object takes one reference on itself. When
// `_dummy_versions` is non-null (i.e. this is not the dummy column family) it
// also creates the internal stats, the table cache and the compaction picker
// matching the configured compaction style, and logs the options.
ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBuffer* write_buffer,
    const ColumnFamilyOptions& cf_options, const DBOptions* db_options,
    const EnvOptions& env_options, ColumnFamilySet* column_family_set)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      options_(*db_options, SanitizeOptions(&internal_comparator_, cf_options)),
      ioptions_(options_),
      mutable_cf_options_(options_, ioptions_),
      write_buffer_(write_buffer),
      mem_(nullptr),
      imm_(options_.min_write_buffer_number_to_merge),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      column_family_set_(column_family_set),
      pending_flush_(false),
      pending_compaction_(false) {
  Ref();

  // if _dummy_versions is nullptr, then this is a dummy column family.
  if (_dummy_versions != nullptr) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options->env, this));
    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));

    // Choose the compaction picker for the configured style. Styles that are
    // not recognized (or compiled out under ROCKSDB_LITE) fall back to level
    // compaction.
    switch (ioptions_.compaction_style) {
      case kCompactionStyleLevel:
        compaction_picker_.reset(
            new LevelCompactionPicker(ioptions_, &internal_comparator_));
        break;
#ifndef ROCKSDB_LITE
      case kCompactionStyleUniversal:
        compaction_picker_.reset(
            new UniversalCompactionPicker(ioptions_, &internal_comparator_));
        break;
      case kCompactionStyleFIFO:
        compaction_picker_.reset(
            new FIFOCompactionPicker(ioptions_, &internal_comparator_));
        break;
      case kCompactionStyleNone:
        compaction_picker_.reset(new NullCompactionPicker(
            ioptions_, &internal_comparator_));
        Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
            "Column family %s does not use any background compaction. "
            "Compactions can only be done via CompactFiles\n",
            GetName().c_str());
        break;
#endif  // !ROCKSDB_LITE
      default:
        Log(InfoLogLevel::ERROR_LEVEL, ioptions_.info_log,
            "Unable to recognize the specified compaction style %d. "
            "Column family %s will use kCompactionStyleLevel.\n",
            ioptions_.compaction_style, GetName().c_str());
        compaction_picker_.reset(
            new LevelCompactionPicker(ioptions_, &internal_comparator_));
        break;
    }

    // Only dump the options while the number of column families is small.
    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
          "--------------- Options for column family [%s]:\n", name.c_str());
      options_.Dump(ioptions_.info_log);
    } else {
      Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
          "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}
I
Igor Canadi 已提交
301

302
// DB mutex held
I
Igor Canadi 已提交
303
// Tears down the column family: unlinks it from the circular list, removes it
// from the owning set if still registered, and releases the current Version,
// the SuperVersion, the dummy version list head and all memtables.
// Expects the reference count to be zero.
ColumnFamilyData::~ColumnFamilyData() {
  assert(refs_.load(std::memory_order_relaxed) == 0);

  // remove from linked list
  auto* const left_neighbor = prev_;
  auto* const right_neighbor = next_;
  left_neighbor->next_ = right_neighbor;
  right_neighbor->prev_ = left_neighbor;

  // If it's dropped, it's already removed from column family set
  // If column_family_set_ == nullptr, this is dummy CFD and not in
  // ColumnFamilySet
  const bool still_registered = !dropped_ && column_family_set_ != nullptr;
  if (still_registered) {
    column_family_set_->RemoveColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

  // It would be wrong if this ColumnFamilyData is in flush_queue_ or
  // compaction_queue_ and we destroyed it
  assert(!pending_flush_);
  assert(!pending_compaction_);

  if (super_version_ != nullptr) {
    // Release SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since unref handler can lock mutex.
    super_version_->db_mutex->Unlock();
    local_sv_.reset();
    super_version_->db_mutex->Lock();

    bool is_last_reference __attribute__((unused)) = super_version_->Unref();
    assert(is_last_reference);
    super_version_->Cleanup();
    delete super_version_;
    super_version_ = nullptr;
  }

  if (dummy_versions_ != nullptr) {
    // List must be empty
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((unused)) = dummy_versions_->Unref();
    assert(deleted);
  }

  if (mem_ != nullptr) {
    delete mem_->Unref();
  }

  // Drop the immutable memtable list and free whatever it hands back.
  autovector<MemTable*> dead_memtables;
  imm_.current()->Unref(&dead_memtables);
  for (MemTable* dead : dead_memtables) {
    delete dead;
  }
}

I
Igor Canadi 已提交
359 360 361 362 363 364 365 366 367 368
// Marks this column family as dropped, releases any write-controller token it
// holds, and unregisters it from the owning ColumnFamilySet.
void ColumnFamilyData::SetDropped() {
  // can't drop default CF
  assert(id_ != 0);
  dropped_ = true;
  // A dropped column family must no longer stall or stop writes.
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}

369 370
void ColumnFamilyData::RecalculateWriteStallConditions(
      const MutableCFOptions& mutable_cf_options) {
371
  if (current_ != nullptr) {
S
sdong 已提交
372
    auto* vstorage = current_->storage_info();
373 374
    const double score = vstorage->max_compaction_score();
    const int max_level = vstorage->max_compaction_score_level();
375 376 377

    auto write_controller = column_family_set_->write_controller_;

L
Lei Jin 已提交
378
    if (imm()->size() >= mutable_cf_options.max_write_buffer_number) {
379 380
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
381
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
382
          "[%s] Stopping writes because we have %d immutable memtables "
L
Lei Jin 已提交
383 384 385
          "(waiting for flush), max_write_buffer_number is set to %d",
          name_.c_str(), imm()->size(),
          mutable_cf_options.max_write_buffer_number);
S
sdong 已提交
386
    } else if (vstorage->NumLevelFiles(0) >=
387
               mutable_cf_options.level0_stop_writes_trigger) {
388 389
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES, 1);
390
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
391
          "[%s] Stopping writes because we have %d level-0 files",
S
sdong 已提交
392
          name_.c_str(), vstorage->NumLevelFiles(0));
393
    } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
S
sdong 已提交
394
               vstorage->NumLevelFiles(0) >=
395
                   mutable_cf_options.level0_slowdown_writes_trigger) {
S
sdong 已提交
396 397 398 399
      uint64_t slowdown =
          SlowdownAmount(vstorage->NumLevelFiles(0),
                         mutable_cf_options.level0_slowdown_writes_trigger,
                         mutable_cf_options.level0_stop_writes_trigger);
400 401
      write_controller_token_ = write_controller->GetDelayToken(slowdown);
      internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, slowdown);
402
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
403 404
          "[%s] Stalling writes because we have %d level-0 files (%" PRIu64
          "us)",
S
sdong 已提交
405
          name_.c_str(), vstorage->NumLevelFiles(0), slowdown);
406 407
    } else if (mutable_cf_options.hard_rate_limit > 1.0 &&
               score > mutable_cf_options.hard_rate_limit) {
408 409 410 411 412
      uint64_t kHardLimitSlowdown = 1000;
      write_controller_token_ =
          write_controller->GetDelayToken(kHardLimitSlowdown);
      internal_stats_->RecordLevelNSlowdown(max_level, kHardLimitSlowdown,
                                            false);
413
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
414 415 416
          "[%s] Stalling writes because we hit hard limit on level %d. "
          "(%" PRIu64 "us)",
          name_.c_str(), max_level, kHardLimitSlowdown);
417 418 419 420 421
    } else if (mutable_cf_options.soft_rate_limit > 0.0 &&
               score > mutable_cf_options.soft_rate_limit) {
      uint64_t slowdown = SlowdownAmount(score,
          mutable_cf_options.soft_rate_limit,
          mutable_cf_options.hard_rate_limit);
422 423
      write_controller_token_ = write_controller->GetDelayToken(slowdown);
      internal_stats_->RecordLevelNSlowdown(max_level, slowdown, true);
424
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
425 426 427 428 429 430
          "[%s] Stalling writes because we hit soft limit on level %d (%" PRIu64
          "us)",
          name_.c_str(), max_level, slowdown);
    } else {
      write_controller_token_.reset();
    }
431 432 433
  }
}

L
Lei Jin 已提交
434
// Returns a pointer to the EnvOptions stored in the owning ColumnFamilySet.
const EnvOptions* ColumnFamilyData::soptions() const {
  const EnvOptions& shared_env_options = column_family_set_->env_options_;
  return &shared_env_options;
}

I
Igor Canadi 已提交
438 439 440
// Replaces the current Version pointer. No reference counts are adjusted
// here.
void ColumnFamilyData::SetCurrent(Version* current_version) {
  current_ = current_version;
}
441

442 443 444 445
// Returns the number of live versions, as counted by VersionSet starting from
// this column family's dummy version list head.
uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  const uint64_t live_versions =
      VersionSet::GetNumLiveVersions(dummy_versions_);
  return live_versions;
}

446
// Allocates a new MemTable using this column family's comparator, immutable
// options and write buffer together with the supplied mutable options.
// The caller owns the returned object.
MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options) {
  assert(current_ != nullptr);
  MemTable* const fresh_memtable = new MemTable(
      internal_comparator_, ioptions_, mutable_cf_options, write_buffer_);
  return fresh_memtable;
}

void ColumnFamilyData::CreateNewMemtable(
    const MutableCFOptions& mutable_cf_options) {
455 456
  if (mem_ != nullptr) {
    delete mem_->Unref();
457
  }
458
  SetMemtable(ConstructNewMemtable(mutable_cf_options));
459 460 461
  mem_->Ref();
}

462 463 464 465
// Asks the compaction picker whether the current version's storage info
// calls for a compaction.
bool ColumnFamilyData::NeedsCompaction() const {
  auto* const vstorage = current_->storage_info();
  return compaction_picker_->NeedsCompaction(vstorage);
}

466 467
// Lets the compaction picker choose a compaction for the current version.
// Returns nullptr when the picker returns none; otherwise the returned
// compaction has the current version set as its input version.
Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
  Compaction* const picked = compaction_picker_->PickCompaction(
      GetName(), mutable_options, current_->storage_info(), log_buffer);
  if (picked == nullptr) {
    return nullptr;
  }
  picked->SetInputVersion(current_);
  return picked;
}

476 477 478 479 480
// Forwards a manual range-compaction request to the compaction picker.
// Returns nullptr when the picker returns none; otherwise the returned
// compaction has the current version set as its input version.
Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options,
    int input_level, int output_level, uint32_t output_path_id,
    const InternalKey* begin, const InternalKey* end,
    InternalKey** compaction_end) {
  Compaction* const range_compaction = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, current_->storage_info(), input_level,
      output_level, output_path_id, begin, end, compaction_end);
  if (range_compaction == nullptr) {
    return nullptr;
  }
  range_compaction->SetInputVersion(current_);
  return range_compaction;
}

490
// Returns a SuperVersion on which the caller holds a reference. The
// SuperVersion is taken from thread-local storage, given an extra reference
// for the caller, and handed back to thread-local storage. If handing it back
// fails, the reference that thread-local storage was holding is dropped.
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
    InstrumentedMutex* db_mutex) {
  SuperVersion* const referenced = GetThreadLocalSuperVersion(db_mutex);
  referenced->Ref();
  const bool returned_to_tls = ReturnThreadLocalSuperVersion(referenced);
  if (!returned_to_tls) {
    referenced->Unref();
  }
  return referenced;
}

// Takes the SuperVersion cached in this thread's local storage, leaving
// kSVInUse in its place. If the cached value is obsolete or its version number
// no longer matches, the stale one is released and a reference to the current
// SuperVersion is acquired under `db_mutex`.
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
    InstrumentedMutex* db_mutex) {
  // The SuperVersion is cached in thread local storage to avoid acquiring
  // mutex when SuperVersion does not change since the last use. When a new
  // SuperVersion is installed, the compaction or flush thread cleans up
  // cached SuperVersion in all existing thread local storage. To avoid
  // acquiring mutex for this operation, we use atomic Swap() on the thread
  // local pointer to guarantee exclusive access. If the thread local pointer
  // is being used while a new SuperVersion is installed, the cached
  // SuperVersion can become stale. In that case, the background thread would
  // have swapped in kSVObsolete. We re-check the value when returning
  // SuperVersion back to thread local, with an atomic compare and swap.
  // The superversion will need to be released if detected to be stale.
  void* const cached = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(cached != SuperVersion::kSVInUse);

  SuperVersion* sv = static_cast<SuperVersion*>(cached);
  const bool cache_is_current =
      sv != SuperVersion::kSVObsolete &&
      sv->version_number == super_version_number_.load();
  if (!cache_is_current) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);

    // Drop the stale cached reference, if there is one.
    const bool stale_needs_cleanup = sv && sv->Unref();
    if (stale_needs_cleanup) {
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
    }

    SuperVersion* sv_to_delete = nullptr;
    db_mutex->Lock();
    if (stale_needs_cleanup) {
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      sv_to_delete = sv;
    }
    sv = super_version_->Ref();
    db_mutex->Unlock();

    // The stale SuperVersion is deleted after the mutex is released.
    delete sv_to_delete;
  }
  assert(sv != nullptr);
  return sv;
}

// Tries to put `sv` back into thread-local storage. Returns true when the
// slot still held kSVInUse and `sv` was stored; returns false otherwise.
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  // Put the SuperVersion back
  void* expected = SuperVersion::kSVInUse;
  const bool put_back =
      local_sv_->CompareAndSwap(static_cast<void*>(sv), expected);
  // On success: when we see kSVInUse in the ThreadLocal, we are sure
  // ThreadLocal storage has not been altered and no Scrape has happened. The
  // SuperVersion is still current.
  //
  // On failure: ThreadLocal scrape happened in the process of this GetImpl
  // call (after thread local Swap() at the beginning and before
  // CompareAndSwap()). This means the SuperVersion it holds is obsolete.
  assert(put_back || expected == SuperVersion::kSVObsolete);
  return put_back;
}

O
Ori Bernstein 已提交
565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590
// Notifies every registered listener that compaction `c` has finished with
// `status`. The CompactionJobInfo passed to listeners carries the column
// family name, the status, the output level, the paths of the input files at
// the compaction's start level, and the paths of the files the compaction
// created. Compiled to a no-op under ROCKSDB_LITE.
void ColumnFamilyData::NotifyOnCompactionCompleted(
    DB* db, Compaction* c, const Status& status) {
#ifndef ROCKSDB_LITE
  auto listeners = ioptions()->listeners;
  CompactionJobInfo info;
  info.cf_name = c->column_family_data()->GetName();
  info.status = status;
  info.output_level = c->output_level();
  for (const auto fmd : *c->inputs(c->level())) {
    info.input_files.push_back(
        TableFileName(options_.db_paths,
                      fmd->fd.GetNumber(),
                      fmd->fd.GetPathId()));
  }
  // Files added by the compaction's version edit are its outputs; they must
  // be reported separately from the inputs.
  for (const auto& newf : c->edit()->GetNewFiles()) {
    info.output_files.push_back(
        TableFileName(options_.db_paths,
                      newf.second.fd.GetNumber(),
                      newf.second.fd.GetPathId()));
  }
  for (const auto& listener : listeners) {
    listener->OnCompactionCompleted(db, info);
  }
#endif  // ROCKSDB_LITE
}

591 592 593 594
// Notifies every registered listener that a flush of this column family has
// produced `file_path`, passing along whether the flush triggered a write
// slowdown or a write stop. Compiled to a no-op under ROCKSDB_LITE.
void ColumnFamilyData::NotifyOnFlushCompleted(
    DB* db, const std::string& file_path,
    bool triggered_flush_slowdown,
    bool triggered_flush_stop) {
#ifndef ROCKSDB_LITE
  auto registered_listeners = ioptions()->listeners;
  for (auto event_listener : registered_listeners) {
    // Use path 0 as full memtables are first flushed into path 0.
    event_listener->OnFlushCompleted(db, GetName(), file_path,
                                     triggered_flush_slowdown,
                                     triggered_flush_stop);
  }
#endif  // ROCKSDB_LITE
}

607
// Convenience overload: installs `new_superversion` using this column
// family's own mutable options. Requires `db_mutex` to be held.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  SuperVersion* const replaced =
      InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_);
  return replaced;
}

// Makes `new_superversion` the current SuperVersion: initializes it from the
// current memtable, immutable memtable list and Version, bumps the
// SuperVersion number, invalidates thread-local caches and recomputes the
// write-stall state. Returns the previous SuperVersion if this call released
// its last reference (already cleaned up, for the caller to delete), or
// nullptr otherwise.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(mem_, imm_.current(), current_);

  SuperVersion* const previous = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;

  // Reset SuperVersions cached in thread local storage
  ResetThreadLocalSuperVersions();

  RecalculateWriteStallConditions(mutable_cf_options);

  if (previous == nullptr || !previous->Unref()) {
    return nullptr;
  }
  previous->Cleanup();
  return previous;  // will let caller delete outside of mutex
}

635 636
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
637
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
638 639
  for (auto ptr : sv_ptrs) {
    assert(ptr);
640 641 642
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
643 644 645 646 647 648 649 650
    auto sv = static_cast<SuperVersion*>(ptr);
    if (sv->Unref()) {
      sv->Cleanup();
      delete sv;
    }
  }
}

I
Igor Canadi 已提交
651
#ifndef ROCKSDB_LITE
652
// Parses `options_map` on top of the current mutable options. On success the
// mutable options are replaced and their derived values refreshed; on failure
// they are left untouched. Returns the parse status.
Status ColumnFamilyData::SetOptions(
    const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions parsed_options;
  Status parse_status = GetMutableOptionsFromStrings(
      mutable_cf_options_, options_map, &parsed_options);
  if (!parse_status.ok()) {
    return parse_status;
  }
  mutable_cf_options_ = parsed_options;
  mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  return parse_status;
}
I
Igor Canadi 已提交
663
#endif  // ROCKSDB_LITE
664

I
Igor Canadi 已提交
665
// Creates an empty set. A dummy ColumnFamilyData is allocated to act as the
// head of the circular doubly-linked list of column families; initially it
// links to itself in both directions.
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const DBOptions* db_options,
                                 const EnvOptions& env_options,
                                 Cache* table_cache,
                                 WriteBuffer* write_buffer,
                                 WriteController* write_controller)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
                                      ColumnFamilyOptions(), db_options,
                                      env_options, nullptr)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      env_options_(env_options),
      table_cache_(table_cache),
      write_buffer_(write_buffer),
      write_controller_(write_controller) {
  // An empty circular list: the head is its own neighbor on both sides.
  dummy_cfd_->next_ = dummy_cfd_;
  dummy_cfd_->prev_ = dummy_cfd_;
}
I
Igor Canadi 已提交
686 687

// Releases and deletes every remaining column family, then the dummy list
// head.
ColumnFamilySet::~ColumnFamilySet() {
  while (!column_family_data_.empty()) {
    // cfd destructor will delete itself from column_family_data_
    ColumnFamilyData* const remaining = column_family_data_.begin()->second;
    remaining->Unref();
    delete remaining;
  }
  dummy_cfd_->Unref();
  delete dummy_cfd_;
}

// Returns the cached pointer to the default column family (the one created
// with id 0). It must already have been created.
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  ColumnFamilyData* const default_cfd = default_cfd_cache_;
  assert(default_cfd != nullptr);
  return default_cfd;
}

// Looks up a column family by ID. Returns nullptr when no such ID is
// registered.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  const auto found = column_family_data_.find(id);
  return found == column_family_data_.end() ? nullptr : found->second;
}

712 713 714
// Looks up a column family by name. Returns nullptr when the name is not
// registered; a registered name is expected to map to an existing ID.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  const auto name_entry = column_families_.find(name);
  if (name_entry == column_families_.end()) {
    return nullptr;
  }
  ColumnFamilyData* const by_id = GetColumnFamily(name_entry->second);
  assert(by_id != nullptr);
  return by_id;
}

// Advances the maximum column family ID by one and returns the new value.
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  max_column_family_ += 1;
  return max_column_family_;
}

728 729 730 731 732 733
// Returns the largest column family ID recorded so far.
uint32_t ColumnFamilySet::GetMaxColumnFamily() {
  return max_column_family_;
}

// Raises the recorded maximum column family ID to `new_max_column_family` if
// that is larger; never lowers it.
void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  if (max_column_family_ < new_max_column_family) {
    max_column_family_ = new_max_column_family;
  }
}

734 735 736 737
// Returns how many column family names are currently registered.
size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  const size_t registered = column_families_.size();
  return registered;
}

I
Igor Canadi 已提交
738
// under a DB mutex AND write thread
I
Igor Canadi 已提交
739 740 741 742 743
// Creates a column family named `name` with the given `id`, registers it in
// the name and ID maps, appends it to the tail of the circular list and
// returns it. `name` must not already be registered. A column family created
// with id 0 is also remembered as the default one.
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* const created = new ColumnFamilyData(
      id, name, dummy_versions, table_cache_, write_buffer_, options,
      db_options_, env_options_, this);

  column_families_.insert({name, id});
  column_family_data_.insert({id, created});
  if (max_column_family_ < id) {
    max_column_family_ = id;
  }

  // add to linked list, just before the dummy head (i.e. at the tail)
  ColumnFamilyData* const old_tail = dummy_cfd_->prev_;
  created->next_ = dummy_cfd_;
  created->prev_ = old_tail;
  old_tail->next_ = created;
  dummy_cfd_->prev_ = created;

  if (id == 0) {
    default_cfd_cache_ = created;
  }
  return created;
}

762 763 764 765
// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
I
Igor Canadi 已提交
766
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
767 768 769 770 771 772 773 774 775
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

I
Igor Canadi 已提交
776
// under a DB mutex AND from a write thread
I
Igor Canadi 已提交
777
// Unregisters `cfd` from both the ID map and the name map. The column family
// must currently be registered by ID.
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
  const auto id_entry = column_family_data_.find(cfd->GetID());
  assert(id_entry != column_family_data_.end());
  column_family_data_.erase(id_entry);

  const std::string& cf_name = cfd->GetName();
  column_families_.erase(cf_name);
}

I
Igor Canadi 已提交
784
// under a DB mutex OR from a write thread
785
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
I
Igor Canadi 已提交
786 787 788 789 790 791
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
792
  handle_.SetCFD(current_);
793 794
  return current_ != nullptr;
}
795

796 797 798 799 800 801 802 803 804 805
// Returns the log number of the column family selected by the last Seek().
// Requires a column family to be selected.
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  const uint64_t log_number = current_->GetLogNumber();
  return log_number;
}

// Returns the memtable of the column family selected by the last Seek().
// Requires a column family to be selected.
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  MemTable* const active_memtable = current_->mem();
  return active_memtable;
}

806
// Returns the internal handle, which Seek() pointed at the selected column
// family. Requires a column family to be selected.
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  ColumnFamilyHandle* const selected_handle = &handle_;
  return selected_handle;
}

I
Igor Canadi 已提交
811 812 813 814 815 816 817
// If a column family is selected and its memtable reports that a flush should
// be scheduled, hands the column family to the flush scheduler and marks the
// memtable as having a flush scheduled.
void ColumnFamilyMemTablesImpl::CheckMemtableFull() {
  if (current_ == nullptr || !current_->mem()->ShouldScheduleFlush()) {
    return;
  }
  flush_scheduler_->ScheduleFlush(current_);
  current_->mem()->MarkFlushScheduled();
}

818 819 820 821 822 823 824 825 826
// Returns the ID of the column family behind `column_family`, or 0 when the
// handle is null.
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return 0;
  }
  auto* impl = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  return impl->GetID();
}

827 828 829 830 831 832 833 834 835
// Returns the user comparator of the column family behind `column_family`, or
// nullptr when the handle is null.
const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return nullptr;
  }
  auto* impl = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  return impl->user_comparator();
}

I
Igor Canadi 已提交
836
}  // namespace rocksdb