column_family.cc 29.9 KB
Newer Older
1 2 3 4 5 6 7 8 9
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

I
Igor Canadi 已提交
10
#include "db/column_family.h"
11

12 13 14 15 16
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
17 18 19
#include <vector>
#include <string>
#include <algorithm>
I
Igor Canadi 已提交
20
#include <limits>
21

I
Igor Canadi 已提交
22
#include "db/compaction_picker.h"
23
#include "db/db_impl.h"
24 25 26
#include "db/job_context.h"
#include "db/version_set.h"
#include "db/writebuffer.h"
27
#include "db/internal_stats.h"
I
Igor Canadi 已提交
28
#include "db/job_context.h"
29
#include "db/table_properties_collector.h"
I
Igor Canadi 已提交
30
#include "db/version_set.h"
31
#include "db/write_controller.h"
32
#include "util/autovector.h"
33
#include "util/hash_skiplist_rep.h"
34
#include "util/options_helper.h"
I
Igor Canadi 已提交
35 36 37

namespace rocksdb {

38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
namespace {
// Computes the amount of time, in microseconds, by which a write should be
// delayed, given a stall metric `n` (e.g. the number of level-0 files or a
// compaction score), the threshold `bottom` at which slowdown starts and the
// threshold `top` at which the maximum delay applies:
//   if n >= top, return 1000;
//   if n < bottom, return 0;
//   otherwise, let r = (n - bottom) /
//                      (top - bottom)
//   and return max(r^2 * 1000, 100).
// The goal of this formula is to gradually increase the rate at which writes
// are slowed. We also tried linear delay (r * 1000), but it seemed to do
// slightly worse. There is no other particular reason for choosing quadratic.
//
// `n` is taken as a double so that fractional metrics are compared against
// the thresholds without first being truncated to an integer; integer
// arguments convert implicitly and produce the same result as before.
uint64_t SlowdownAmount(double n, double bottom, double top) {
  if (n >= top) {
    return 1000;
  }
  if (n < bottom) {
    return 0;
  }
  // Here bottom <= n < top, so the ratio lies in [0, 1).
  const double how_much = (n - bottom) / (top - bottom);
  // Once inside the slowdown range the delay never drops below 100us.
  const uint64_t delay =
      static_cast<uint64_t>(std::max(how_much * how_much * 1000, 100.0));
  assert(delay <= 1000);
  return delay;
}
}  // namespace

I
Igor Canadi 已提交
68
// Stores the column family, DB and mutex pointers. When a column family is
// supplied, takes a reference on it; the destructor drops that reference.
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ == nullptr) {
    return;
  }
  cfd_->Ref();
}

// Drops the reference taken by the constructor. If that was the last
// reference the column family data is deleted, and any files that became
// obsolete are purged after the mutex has been released.
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ == nullptr) {
    return;
  }
  // Job id == 0 means that this is not our background process, but rather
  // user thread
  JobContext job_context(0);
  mutex_->Lock();
  const bool was_last_reference = cfd_->Unref();
  if (was_last_reference) {
    delete cfd_;
  }
  db_->FindObsoleteFiles(&job_context, false, true);
  mutex_->Unlock();
  // The actual purge runs without the mutex held.
  if (job_context.HaveSomethingToDelete()) {
    db_->PurgeObsoleteFiles(job_context);
  }
}

93 94
// Forwards to the underlying column family data's id.
uint32_t ColumnFamilyHandleImpl::GetID() const {
  return cfd()->GetID();
}

95 96 97 98
// Forwards to the underlying column family data's name.
const std::string& ColumnFamilyHandleImpl::GetName() const {
  return cfd()->GetName();
}

99 100 101 102
// Forwards to the underlying column family data's user comparator.
const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
  return cfd()->user_comparator();
}

103
// Returns a copy of `src` with inconsistent or out-of-range settings adjusted.
//
// icmp:     comparator installed as `comparator` in the returned options.
// src:      the user-supplied options; not modified.
// info_log: logger that receives warnings when the level-0 triggers have to
//           be adjusted.
//
// Adjustments made:
//  - write_buffer_size is clipped to a platform-dependent range;
//  - arena_block_size defaults to a tenth of write_buffer_size when unset;
//  - max_write_buffer_number is raised to at least 2, and
//    min_write_buffer_number_to_merge is kept within
//    [1, max_write_buffer_number - 1];
//  - max_mem_compaction_level and soft_rate_limit are capped by num_levels - 1
//    and hard_rate_limit respectively;
//  - hash-based memtable factories are replaced by a skip list when there is
//    no prefix extractor;
//  - user table properties collectors are wrapped and an internal-key
//    collector is appended;
//  - FIFO compaction forces a single level and disables level-0 triggers;
//  - the level-0 triggers are forced into the order
//    stop >= slowdown >= compaction trigger.
ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
                                    const ColumnFamilyOptions& src,
                                    Logger* info_log) {
  ColumnFamilyOptions result = src;
  result.comparator = icmp;
#ifdef OS_MACOSX
  // TODO(icanadi) make write_buffer_size uint64_t instead of size_t
  ClipToRange(&result.write_buffer_size, static_cast<size_t>(64) << 10,
              static_cast<size_t>(1) << 30);
#else
  ClipToRange(&result.write_buffer_size, static_cast<size_t>(64) << 10,
              static_cast<size_t>(64) << 30);
#endif
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // calculate a proper value from writer_buffer_size;
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 10;
  }
  // Raise max_write_buffer_number first: the merge threshold below is derived
  // from it, and deriving it from an unsanitized value could yield 0 or a
  // negative number.
  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }
  result.compression_per_level = src.compression_per_level;
  if (result.max_mem_compaction_level >= result.num_levels) {
    result.max_mem_compaction_level = result.num_levels - 1;
  }
  if (result.soft_rate_limit > result.hard_rate_limit) {
    result.soft_rate_limit = result.hard_rate_limit;
  }
  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    // Without a prefix extractor, fall back from the hash-based memtable
    // factories to a plain skip list.
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  // -- Sanitize the table properties collector
  // All user defined properties collectors will be wrapped by
  // UserKeyTablePropertiesCollector since for them they only have the
  // knowledge of the user keys; internal keys are invisible to them.
  auto& collector_factories = result.table_properties_collector_factories;
  for (auto& factory : collector_factories) {
    assert(factory);
    factory =
        std::make_shared<UserKeyTablePropertiesCollectorFactory>(factory);
  }
  // Add collector to collect internal key statistics
  collector_factories.push_back(
      std::make_shared<InternalKeyPropertiesCollectorFactory>());

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_file_num_compaction_trigger = std::numeric_limits<int>::max();
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    Warn(info_log,
         "This condition must be satisfied: "
         "level0_stop_writes_trigger(%d) >= "
         "level0_slowdown_writes_trigger(%d) >= "
         "level0_file_num_compaction_trigger(%d)",
         result.level0_stop_writes_trigger,
         result.level0_slowdown_writes_trigger,
         result.level0_file_num_compaction_trigger);
    // Fix the lower pair first so the upper comparison sees the final
    // slowdown trigger.
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    Warn(info_log,
         "Adjust the value to "
         "level0_stop_writes_trigger(%d), "
         "level0_slowdown_writes_trigger(%d), "
         "level0_file_num_compaction_trigger(%d)",
         result.level0_stop_writes_trigger,
         result.level0_slowdown_writes_trigger,
         result.level0_file_num_compaction_trigger);
  }

  return result;
}

201 202 203
// Storage whose address is used as the kSVInUse sentinel below.
int SuperVersion::dummy = 0;
// Sentinel swapped into the thread-local slot by GetThreadLocalSuperVersion()
// while a thread is using its cached SuperVersion.
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
// Sentinel installed in the thread-local slots by
// ResetThreadLocalSuperVersions() when cached SuperVersions are scraped.
void* const SuperVersion::kSVObsolete = nullptr;
204

205 206 207 208 209 210 211 212 213 214 215 216 217
// Deletes every memtable that was queued in `to_delete`.
SuperVersion::~SuperVersion() {
  for (auto* stale_memtable : to_delete) {
    delete stale_memtable;
  }
}

// Increments the reference count and returns `this` so that the call can be
// used inline where a referenced SuperVersion is needed.
SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
218
  uint32_t previous_refs = refs.fetch_sub(1);
219 220
  assert(previous_refs > 0);
  return previous_refs == 1;
221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243
}

void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    to_delete.push_back(m);
  }
  current->Unref();
}

// Points this SuperVersion at the given memtable, immutable memtable list and
// version, takes a reference on each, and sets its own reference count to 1.
void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                        Version* new_current) {
  mem = new_mem;
  mem->Ref();

  imm = new_imm;
  imm->Ref();

  current = new_current;
  current->Ref();

  refs.store(1, std::memory_order_relaxed);
}

244 245
namespace {
void SuperVersionUnrefHandle(void* ptr) {
246 247 248 249
  // UnrefHandle is called when a thread exists or a ThreadLocalPtr gets
  // destroyed. When former happens, the thread shouldn't see kSVInUse.
  // When latter happens, we are in ~ColumnFamilyData(), no get should happen as
  // well.
250 251 252 253 254 255 256 257 258 259
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv->Unref()) {
    sv->db_mutex->Lock();
    sv->Cleanup();
    sv->db_mutex->Unlock();
    delete sv;
  }
}
}  // anonymous namespace

260 261 262 263 264
// Builds the state for one column family. Options are sanitized before being
// stored, and the object starts with a single reference (taken by the Ref()
// call in the body). When `_dummy_versions` is nullptr the object is a dummy
// column family and no stats, table cache or compaction picker are created.
ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBuffer* write_buffer,
    const ColumnFamilyOptions& cf_options, const DBOptions* db_options,
    const EnvOptions& env_options, ColumnFamilySet* column_family_set)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      options_(*db_options, SanitizeOptions(&internal_comparator_, cf_options,
                                            db_options->info_log.get())),
      ioptions_(options_),
      mutable_cf_options_(options_, ioptions_),
      write_buffer_(write_buffer),
      mem_(nullptr),
      imm_(options_.min_write_buffer_number_to_merge),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      column_family_set_(column_family_set),
      pending_flush_(false),
      pending_compaction_(false) {
  Ref();

  // if _dummy_versions is nullptr, then this is a dummy column family.
  const bool is_real_column_family = (_dummy_versions != nullptr);
  if (is_real_column_family) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options->env, this));
    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));

    // Choose the compaction picker that matches the configured style.
    switch (ioptions_.compaction_style) {
      case kCompactionStyleLevel:
        compaction_picker_.reset(
            new LevelCompactionPicker(ioptions_, &internal_comparator_));
        break;
#ifndef ROCKSDB_LITE
      case kCompactionStyleUniversal:
        compaction_picker_.reset(
            new UniversalCompactionPicker(ioptions_, &internal_comparator_));
        break;
      case kCompactionStyleFIFO:
        compaction_picker_.reset(
            new FIFOCompactionPicker(ioptions_, &internal_comparator_));
        break;
      case kCompactionStyleNone:
        compaction_picker_.reset(
            new NullCompactionPicker(ioptions_, &internal_comparator_));
        Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
            "Column family %s does not use any background compaction. "
            "Compactions can only be done via CompactFiles\n",
            GetName().c_str());
        break;
#endif  // !ROCKSDB_LITE
      default:
        // Unknown (or, in LITE builds, unsupported) style: report it and fall
        // back to level compaction.
        Log(InfoLogLevel::ERROR_LEVEL, ioptions_.info_log,
            "Unable to recognize the specified compaction style %d. "
            "Column family %s will use kCompactionStyleLevel.\n",
            ioptions_.compaction_style, GetName().c_str());
        compaction_picker_.reset(
            new LevelCompactionPicker(ioptions_, &internal_comparator_));
        break;
    }

    // Options are only dumped while the set holds fewer than 10 column
    // families.
    if (column_family_set_->NumberOfColumnFamilies() >= 10) {
      Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
          "\t(skipping printing options)\n");
    } else {
      Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
          "--------------- Options for column family [%s]:\n", name.c_str());
      options_.Dump(ioptions_.info_log);
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}
I
Igor Canadi 已提交
334

335
// DB mutex held
I
Igor Canadi 已提交
336
ColumnFamilyData::~ColumnFamilyData() {
I
Igor Canadi 已提交
337
  assert(refs_.load(std::memory_order_relaxed) == 0);
338 339 340 341 342 343
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

I
Igor Canadi 已提交
344 345 346 347
  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from column family set
    // If column_family_set_ == nullptr, this is dummy CFD and not in
    // ColumnFamilySet
I
Igor Canadi 已提交
348
    column_family_set_->RemoveColumnFamily(this);
349 350 351 352 353 354
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

355 356 357 358 359
  // It would be wrong if this ColumnFamilyData is in flush_queue_ or
  // compaction_queue_ and we destroyed it
  assert(!pending_flush_);
  assert(!pending_compaction_);

I
Igor Canadi 已提交
360 361 362 363 364 365 366 367 368 369 370 371 372 373
  if (super_version_ != nullptr) {
    // Release SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since unref handler can lock mutex.
    super_version_->db_mutex->Unlock();
    local_sv_.reset();
    super_version_->db_mutex->Lock();

    bool is_last_reference __attribute__((unused));
    is_last_reference = super_version_->Unref();
    assert(is_last_reference);
    super_version_->Cleanup();
    delete super_version_;
    super_version_ = nullptr;
  }
374

375 376
  if (dummy_versions_ != nullptr) {
    // List must be empty
377 378 379
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((unused)) = dummy_versions_->Unref();
    assert(deleted);
380
  }
381

382 383
  if (mem_ != nullptr) {
    delete mem_->Unref();
384
  }
385
  autovector<MemTable*> to_delete;
386
  imm_.current()->Unref(&to_delete);
387 388 389 390 391
  for (MemTable* m : to_delete) {
    delete m;
  }
}

I
Igor Canadi 已提交
392 393 394 395 396 397 398 399 400 401
// Marks this column family as dropped, releases its write controller token
// and removes it from the owning ColumnFamilySet.
void ColumnFamilyData::SetDropped() {
  // can't drop default CF
  assert(id_ != 0);

  dropped_ = true;
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}

402 403
void ColumnFamilyData::RecalculateWriteStallConditions(
      const MutableCFOptions& mutable_cf_options) {
404
  if (current_ != nullptr) {
S
sdong 已提交
405
    auto* vstorage = current_->storage_info();
406 407
    const double score = vstorage->max_compaction_score();
    const int max_level = vstorage->max_compaction_score_level();
408 409 410

    auto write_controller = column_family_set_->write_controller_;

L
Lei Jin 已提交
411
    if (imm()->size() >= mutable_cf_options.max_write_buffer_number) {
412 413
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
414
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
415
          "[%s] Stopping writes because we have %d immutable memtables "
L
Lei Jin 已提交
416 417 418
          "(waiting for flush), max_write_buffer_number is set to %d",
          name_.c_str(), imm()->size(),
          mutable_cf_options.max_write_buffer_number);
S
sdong 已提交
419
    } else if (vstorage->NumLevelFiles(0) >=
420
               mutable_cf_options.level0_stop_writes_trigger) {
421 422
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES, 1);
423
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
424
          "[%s] Stopping writes because we have %d level-0 files",
S
sdong 已提交
425
          name_.c_str(), vstorage->NumLevelFiles(0));
426
    } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
S
sdong 已提交
427
               vstorage->NumLevelFiles(0) >=
428
                   mutable_cf_options.level0_slowdown_writes_trigger) {
S
sdong 已提交
429 430 431 432
      uint64_t slowdown =
          SlowdownAmount(vstorage->NumLevelFiles(0),
                         mutable_cf_options.level0_slowdown_writes_trigger,
                         mutable_cf_options.level0_stop_writes_trigger);
433 434
      write_controller_token_ = write_controller->GetDelayToken(slowdown);
      internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, slowdown);
435
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
436 437
          "[%s] Stalling writes because we have %d level-0 files (%" PRIu64
          "us)",
S
sdong 已提交
438
          name_.c_str(), vstorage->NumLevelFiles(0), slowdown);
439 440
    } else if (mutable_cf_options.hard_rate_limit > 1.0 &&
               score > mutable_cf_options.hard_rate_limit) {
441 442 443 444 445
      uint64_t kHardLimitSlowdown = 1000;
      write_controller_token_ =
          write_controller->GetDelayToken(kHardLimitSlowdown);
      internal_stats_->RecordLevelNSlowdown(max_level, kHardLimitSlowdown,
                                            false);
446
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
447 448 449
          "[%s] Stalling writes because we hit hard limit on level %d. "
          "(%" PRIu64 "us)",
          name_.c_str(), max_level, kHardLimitSlowdown);
450 451 452 453 454
    } else if (mutable_cf_options.soft_rate_limit > 0.0 &&
               score > mutable_cf_options.soft_rate_limit) {
      uint64_t slowdown = SlowdownAmount(score,
          mutable_cf_options.soft_rate_limit,
          mutable_cf_options.hard_rate_limit);
455 456
      write_controller_token_ = write_controller->GetDelayToken(slowdown);
      internal_stats_->RecordLevelNSlowdown(max_level, slowdown, true);
457
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
458 459 460 461 462 463
          "[%s] Stalling writes because we hit soft limit on level %d (%" PRIu64
          "us)",
          name_.c_str(), max_level, slowdown);
    } else {
      write_controller_token_.reset();
    }
464 465 466
  }
}

L
Lei Jin 已提交
467
// Returns a pointer to the EnvOptions stored in the owning ColumnFamilySet.
const EnvOptions* ColumnFamilyData::soptions() const {
  const EnvOptions& shared_env_options = column_family_set_->env_options_;
  return &shared_env_options;
}

I
Igor Canadi 已提交
471 472 473
// Replaces the current version pointer. No reference counts are touched here.
void ColumnFamilyData::SetCurrent(Version* current_version) {
  current_ = current_version;
}
474

475 476 477 478
// Returns the number of live versions, as counted by VersionSet starting from
// this column family's dummy version list head.
uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  return VersionSet::GetNumLiveVersions(dummy_versions_);
}

479
// Allocates a new MemTable configured with this column family's comparator,
// immutable options and write buffer plus the given mutable options. The
// caller receives the raw pointer; no reference is taken here.
MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options) {
  assert(current_ != nullptr);
  MemTable* fresh_memtable = new MemTable(internal_comparator_, ioptions_,
                                          mutable_cf_options, write_buffer_);
  return fresh_memtable;
}

void ColumnFamilyData::CreateNewMemtable(
    const MutableCFOptions& mutable_cf_options) {
488 489
  if (mem_ != nullptr) {
    delete mem_->Unref();
490
  }
491
  SetMemtable(ConstructNewMemtable(mutable_cf_options));
492 493 494
  mem_->Ref();
}

495 496 497 498
// Asks the compaction picker whether the current version's storage needs
// compaction.
bool ColumnFamilyData::NeedsCompaction() const {
  auto* storage = current_->storage_info();
  return compaction_picker_->NeedsCompaction(storage);
}

499 500
// Lets the compaction picker choose a compaction for the current version.
// Returns nullptr when the picker returns none; otherwise the compaction has
// the current version set as its input version.
Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
  auto* picked = compaction_picker_->PickCompaction(
      GetName(), mutable_options, current_->storage_info(), log_buffer);
  if (picked == nullptr) {
    return picked;
  }
  picked->SetInputVersion(current_);
  return picked;
}

509 510 511 512 513
// Forwards a manual range compaction request to the compaction picker for the
// current version. Returns nullptr when the picker returns none; otherwise
// the compaction has the current version set as its input version.
Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options,
    int input_level, int output_level, uint32_t output_path_id,
    const InternalKey* begin, const InternalKey* end,
    InternalKey** compaction_end) {
  auto* range_compaction = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, current_->storage_info(), input_level,
      output_level, output_path_id, begin, end, compaction_end);
  if (range_compaction == nullptr) {
    return range_compaction;
  }
  range_compaction->SetInputVersion(current_);
  return range_compaction;
}

523
// Returns a SuperVersion on which the caller holds a reference. The
// SuperVersion is taken from thread-local storage and an attempt is made to
// put it back there.
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
    InstrumentedMutex* db_mutex) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
  // Extra reference for the caller, on top of the one that came with the
  // thread-local copy.
  sv->Ref();
  const bool returned_to_thread_local = ReturnThreadLocalSuperVersion(sv);
  if (!returned_to_thread_local) {
    // The thread-local slot did not take it back, so drop the reference that
    // belonged to the slot.
    sv->Unref();
  }
  return sv;
}

// Returns the SuperVersion cached in thread-local storage when it is still
// the installed one; otherwise releases the cached one (if any) and
// references the installed SuperVersion under `db_mutex`.
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
    InstrumentedMutex* db_mutex) {
  // The SuperVersion is cached in thread local storage to avoid acquiring
  // mutex when SuperVersion does not change since the last use. When a new
  // SuperVersion is installed, the compaction or flush thread cleans up
  // cached SuperVersion in all existing thread local storage. To avoid
  // acquiring mutex for this operation, we use atomic Swap() on the thread
  // local pointer to guarantee exclusive access. If the thread local pointer
  // is being used while a new SuperVersion is installed, the cached
  // SuperVersion can become stale. In that case, the background thread would
  // have swapped in kSVObsolete. We re-check the value when returning the
  // SuperVersion back to thread local, with an atomic compare and swap.
  // The superversion will need to be released if detected to be stale.
  void* cached = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(cached != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(cached);
  if (sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load()) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
    SuperVersion* sv_to_delete = nullptr;

    // Drop the stale cached reference, if there is one.
    const bool released_last_ref = sv && sv->Unref();
    if (released_last_ref) {
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
    }
    db_mutex->Lock();
    if (released_last_ref) {
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      sv_to_delete = sv;
    }
    sv = super_version_->Ref();
    db_mutex->Unlock();

    // Deleting happens after the mutex has been released.
    delete sv_to_delete;
  }
  assert(sv != nullptr);
  return sv;
}

// Tries to store `sv` back into the thread-local slot. Returns true when the
// slot still held kSVInUse and now holds `sv`; returns false when the
// compare-and-swap failed, in which case the slot is expected to hold
// kSVObsolete.
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  // Put the SuperVersion back
  void* expected = SuperVersion::kSVInUse;
  const bool put_back =
      local_sv_->CompareAndSwap(static_cast<void*>(sv), expected);
  if (!put_back) {
    // ThreadLocal scrape happened in the process of this GetImpl call (after
    // thread local Swap() at the beginning and before CompareAndSwap()).
    // This means the SuperVersion it holds is obsolete.
    assert(expected == SuperVersion::kSVObsolete);
  }
  // On success we saw kSVInUse in the ThreadLocal, so the storage has not
  // been altered, no Scrape has happened, and the SuperVersion is still
  // current.
  return put_back;
}

O
Ori Bernstein 已提交
598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623
// Builds a CompactionJobInfo for the finished compaction `c` and passes it to
// every registered listener. Compiled to a no-op under ROCKSDB_LITE.
//
// db:     forwarded unchanged to each listener.
// c:      the compaction that completed; supplies the column family name,
//         output level, input files and newly created files.
// status: completion status reported to the listeners.
//
// Only the input files at c->level() are reported as inputs. Files created by
// the compaction (the new files recorded in its version edit) are reported in
// `output_files`.
void ColumnFamilyData::NotifyOnCompactionCompleted(
    DB* db, Compaction* c, const Status& status) {
#ifndef ROCKSDB_LITE
  CompactionJobInfo info;
  info.cf_name = c->column_family_data()->GetName();
  info.status = status;
  info.output_level = c->output_level();
  for (const auto fmd : *c->inputs(c->level())) {
    info.input_files.push_back(
        TableFileName(options_.db_paths,
                      fmd->fd.GetNumber(),
                      fmd->fd.GetPathId()));
  }
  // Bind by reference: each element carries a full file metadata record.
  for (const auto& newf : c->edit()->GetNewFiles()) {
    info.output_files.push_back(
        TableFileName(options_.db_paths,
                      newf.second.fd.GetNumber(),
                      newf.second.fd.GetPathId()));
  }
  for (const auto& listener : ioptions()->listeners) {
    listener->OnCompactionCompleted(db, info);
  }
#endif  // ROCKSDB_LITE
}

624 625 626 627
// Passes a completed flush (this column family's name, the flushed file path
// and the two stall flags) to every registered listener. Compiled to a no-op
// under ROCKSDB_LITE.
void ColumnFamilyData::NotifyOnFlushCompleted(
    DB* db, const std::string& file_path,
    bool triggered_flush_slowdown,
    bool triggered_flush_stop) {

#ifndef ROCKSDB_LITE
  auto registered_listeners = ioptions()->listeners;
  for (auto event_listener : registered_listeners) {
    event_listener->OnFlushCompleted(db, GetName(), file_path,
                                     triggered_flush_slowdown,
                                     triggered_flush_stop);
  }
#endif  // ROCKSDB_LITE
}

640
// Convenience overload: installs `new_superversion` using this column
// family's stored mutable options. Asserts that `db_mutex` is held.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  SuperVersion* replaced =
      InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_);
  return replaced;
}

// Initializes `new_superversion` from the current memtable, immutable
// memtables and version, makes it the installed SuperVersion, bumps the
// version number, invalidates thread-local caches and recalculates write
// stall conditions. Returns the previous SuperVersion if this call released
// its last reference (already cleaned up, to be deleted by the caller);
// otherwise returns nullptr.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(mem_, imm_.current(), current_);

  SuperVersion* previous = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;

  // Reset SuperVersions cached in thread local storage
  ResetThreadLocalSuperVersions();

  RecalculateWriteStallConditions(mutable_cf_options);

  if (previous == nullptr || !previous->Unref()) {
    return nullptr;
  }
  previous->Cleanup();
  return previous;  // will let caller delete outside of mutex
}

668 669
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
670
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
671 672
  for (auto ptr : sv_ptrs) {
    assert(ptr);
673 674 675
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
676 677 678 679 680 681 682 683
    auto sv = static_cast<SuperVersion*>(ptr);
    if (sv->Unref()) {
      sv->Cleanup();
      delete sv;
    }
  }
}

I
Igor Canadi 已提交
684
#ifndef ROCKSDB_LITE
685
// Parses `options_map` on top of the current mutable options. On success the
// result replaces the stored mutable options and derived options are
// refreshed; on failure the stored options are left untouched. Returns the
// parse status either way.
Status ColumnFamilyData::SetOptions(
      const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions parsed_options;
  Status s = GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
                                          &parsed_options);
  if (!s.ok()) {
    return s;
  }
  mutable_cf_options_ = parsed_options;
  mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  return s;
}
I
Igor Canadi 已提交
696
#endif  // ROCKSDB_LITE
697

I
Igor Canadi 已提交
698
// Stores the shared DB-wide settings and creates the dummy column family
// that serves as the head of the circular linked list of column families.
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const DBOptions* db_options,
                                 const EnvOptions& env_options,
                                 Cache* table_cache,
                                 WriteBuffer* write_buffer,
                                 WriteController* write_controller)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
                                      ColumnFamilyOptions(), db_options,
                                      env_options, nullptr)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      env_options_(env_options),
      table_cache_(table_cache),
      write_buffer_(write_buffer),
      write_controller_(write_controller) {
  // initialize linked list: an empty list is the dummy node linked to itself
  dummy_cfd_->next_ = dummy_cfd_;
  dummy_cfd_->prev_ = dummy_cfd_;
}
I
Igor Canadi 已提交
719 720

// Unreferences and deletes every remaining column family, then the dummy
// list head.
ColumnFamilySet::~ColumnFamilySet() {
  while (!column_family_data_.empty()) {
    // cfd destructor will delete itself from column_family_data_
    auto remaining_cfd = column_family_data_.begin()->second;
    remaining_cfd->Unref();
    delete remaining_cfd;
  }
  dummy_cfd_->Unref();
  delete dummy_cfd_;
}

// Returns the cached pointer to the column family with id 0, which is set by
// CreateColumnFamily(). Must not be called before that column family exists.
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  ColumnFamilyData* default_cfd = default_cfd_cache_;
  assert(default_cfd != nullptr);
  return default_cfd;
}

// Looks up a column family by id; returns nullptr when the id is unknown.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  const auto found = column_family_data_.find(id);
  if (found == column_family_data_.end()) {
    return nullptr;
  }
  return found->second;
}

745 746 747
// Looks up a column family by name; returns nullptr when the name is unknown.
// A known name must map to an id that resolves to a column family.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  const auto name_entry = column_families_.find(name);
  if (name_entry == column_families_.end()) {
    return nullptr;
  }
  auto cfd = GetColumnFamily(name_entry->second);
  assert(cfd != nullptr);
  return cfd;
}

// Advances the maximum column family id by one and returns the new value.
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  ++max_column_family_;
  return max_column_family_;
}

761 762 763 764 765 766
// Returns the largest column family id recorded so far.
uint32_t ColumnFamilySet::GetMaxColumnFamily() {
  return max_column_family_;
}

// Raises the recorded maximum column family id to `new_max_column_family` if
// that is larger; never lowers it.
void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  if (max_column_family_ < new_max_column_family) {
    max_column_family_ = new_max_column_family;
  }
}

767 768 769 770
// Returns the number of entries in the name-to-id map.
size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  const size_t registered_names = column_families_.size();
  return registered_names;
}

I
Igor Canadi 已提交
771
// under a DB mutex AND write thread
I
Igor Canadi 已提交
772 773 774 775 776
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd =
777 778 779
      new ColumnFamilyData(id, name, dummy_versions, table_cache_,
                           write_buffer_, options, db_options_,
                           env_options_, this);
780
  column_families_.insert({name, id});
I
Igor Canadi 已提交
781 782
  column_family_data_.insert({id, new_cfd});
  max_column_family_ = std::max(max_column_family_, id);
783
  // add to linked list
784 785 786 787 788
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
I
Igor Canadi 已提交
789 790 791
  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
I
Igor Canadi 已提交
792 793 794
  return new_cfd;
}

795 796 797 798
// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
I
Igor Canadi 已提交
799
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
800 801 802 803 804 805 806 807 808
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

I
Igor Canadi 已提交
809
// under a DB mutex AND from a write thread
I
Igor Canadi 已提交
810
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
811
  auto cfd_iter = column_family_data_.find(cfd->GetID());
812 813
  assert(cfd_iter != column_family_data_.end());
  column_family_data_.erase(cfd_iter);
814
  column_families_.erase(cfd->GetName());
I
Igor Canadi 已提交
815 816
}

I
Igor Canadi 已提交
817
// under a DB mutex OR from a write thread
818
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
I
Igor Canadi 已提交
819 820 821 822 823 824
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
825
  handle_.SetCFD(current_);
826 827
  return current_ != nullptr;
}
828

829 830 831 832 833 834 835 836 837 838
// Returns the log number of the column family selected by the last Seek().
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  const uint64_t log_number = current_->GetLogNumber();
  return log_number;
}

// Returns the memtable of the column family selected by the last Seek().
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  MemTable* active_memtable = current_->mem();
  return active_memtable;
}

839
// Returns the embedded handle, which Seek() points at the selected column
// family. The pointer refers to a member of this object.
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  ColumnFamilyHandle* embedded_handle = &handle_;
  return embedded_handle;
}

I
Igor Canadi 已提交
844 845 846 847 848 849 850
// If a column family is selected and its memtable reports that a flush should
// be scheduled, hands the column family to the flush scheduler and marks the
// memtable as having a flush scheduled.
void ColumnFamilyMemTablesImpl::CheckMemtableFull() {
  if (current_ == nullptr || !current_->mem()->ShouldScheduleFlush()) {
    return;
  }
  flush_scheduler_->ScheduleFlush(current_);
  current_->mem()->MarkFlushScheduled();
}

851 852 853 854 855 856 857 858 859
// Returns the id behind `column_family`, or 0 when the handle is nullptr.
// The handle is treated as a ColumnFamilyHandleImpl.
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return 0;
  }
  auto handle_impl = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  return handle_impl->GetID();
}

860 861 862 863 864 865 866 867 868
// Returns the user comparator behind `column_family`, or nullptr when the
// handle is nullptr. The handle is treated as a ColumnFamilyHandleImpl.
const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return nullptr;
  }
  auto handle_impl = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  return handle_impl->user_comparator();
}

I
Igor Canadi 已提交
869
}  // namespace rocksdb