column_family.cc 30.4 KB
Newer Older
1 2 3 4 5 6 7 8 9
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

I
Igor Canadi 已提交
10
#include "db/column_family.h"
11

12 13 14 15 16
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
17 18 19
#include <vector>
#include <string>
#include <algorithm>
I
Igor Canadi 已提交
20
#include <limits>
21

I
Igor Canadi 已提交
22
#include "db/compaction_picker.h"
23
#include "db/db_impl.h"
24 25 26
#include "db/job_context.h"
#include "db/version_set.h"
#include "db/writebuffer.h"
27
#include "db/internal_stats.h"
I
Igor Canadi 已提交
28
#include "db/job_context.h"
29
#include "db/table_properties_collector.h"
I
Igor Canadi 已提交
30
#include "db/version_set.h"
31
#include "db/write_controller.h"
32
#include "util/autovector.h"
33
#include "util/hash_skiplist_rep.h"
34
#include "util/options_helper.h"
I
Igor Canadi 已提交
35 36 37

namespace rocksdb {

38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
namespace {
// Computes the amount of time, in microseconds, by which a write should be
// delayed, given a stall metric `n` (e.g. the number of level-0 files or a
// compaction score) and the window [bottom, top) over which writes are
// gradually slowed:
//   if n >= top, return 1000;
//   if n < bottom, return 0;
//   otherwise, let r = (n - bottom) /
//                      (top - bottom)
//   and return max(r^2 * 1000, 100).
// The goal of this formula is to gradually increase the rate at which writes
// are slowed. We also tried linear delay (r * 1000), but it seemed to do
// slightly worse. There is no other particular reason for choosing quadratic.
//
// `n` is taken as a double so that fractional metrics are not truncated
// towards zero before being compared against `bottom` and `top`. Integer
// arguments convert implicitly and produce the same result as before.
uint64_t SlowdownAmount(double n, double bottom, double top) {
  uint64_t delay;
  if (n >= top) {
    delay = 1000;
  } else if (n < bottom) {
    delay = 0;
  } else {
    // If we are here, we know that bottom <= n < top, since the previous two
    // conditions are false. Hence top > bottom and the division is safe.
    const double how_much = (n - bottom) / (top - bottom);
    // Once inside the window, never delay by less than 100us.
    delay = static_cast<uint64_t>(std::max(how_much * how_much * 1000, 100.0));
  }
  assert(delay <= 1000);
  return delay;
}
}  // namespace

I
Igor Canadi 已提交
68
// Builds a handle around `column_family_data`. When the pointer is non-null
// the handle takes one reference on it.
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ == nullptr) {
    return;
  }
  cfd_->Ref();
}

// Drops the handle's reference on the column family data (deleting it when
// this was the last reference) and purges any files that became obsolete.
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ == nullptr) {
    return;
  }
  // A job id of 0 marks this as work done by a user thread rather than by
  // one of our background processes.
  JobContext job_context(0);
  mutex_->Lock();
  const bool was_last_reference = cfd_->Unref();
  if (was_last_reference) {
    delete cfd_;
  }
  db_->FindObsoleteFiles(&job_context, false, true);
  mutex_->Unlock();
  // The purge itself runs after the mutex has been released.
  if (job_context.HaveSomethingToDelete()) {
    db_->PurgeObsoleteFiles(job_context);
  }
}

93 94
// Returns the id reported by the underlying column family data.
uint32_t ColumnFamilyHandleImpl::GetID() const {
  auto* data = cfd();
  return data->GetID();
}

95 96 97 98
// Returns the name reported by the underlying column family data.
const std::string& ColumnFamilyHandleImpl::GetName() const {
  auto* data = cfd();
  return data->GetName();
}

99 100 101 102
// Returns the user comparator reported by the underlying column family data.
const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
  auto* data = cfd();
  return data->user_comparator();
}

103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
// Appends to `int_tbl_prop_collector_factories` one internal factory per
// user-supplied table properties collector factory in `cf_options`, followed
// by a factory that collects internal key statistics.
void GetIntTblPropCollectorFactory(
    const ColumnFamilyOptions& cf_options,
    std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
        int_tbl_prop_collector_factories) {
  for (const auto& user_factory :
       cf_options.table_properties_collector_factories) {
    assert(user_factory);
    int_tbl_prop_collector_factories->emplace_back(
        new UserKeyTablePropertiesCollectorFactory(user_factory));
  }
  // Always collect internal key statistics as well.
  int_tbl_prop_collector_factories->emplace_back(
      new InternalKeyPropertiesCollectorFactory);
}

119 120 121
// Returns a copy of `src` in which the comparator is replaced by `icmp` and
// out-of-range or mutually inconsistent settings are adjusted. Adjustments to
// the level-0 triggers are reported through `db_options.info_log`.
ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
                                    const InternalKeyComparator* icmp,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  result.comparator = icmp;
#ifdef OS_MACOSX
  // TODO(icanadi) make write_buffer_size uint64_t instead of size_t
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, ((size_t)1) << 30);
#else
  ClipToRange(&result.write_buffer_size,
              ((size_t)64) << 10, ((size_t)64) << 30);
#endif
  // If the user sets arena_block_size, we trust the user to use this value.
  // Otherwise, calculate a proper value from write_buffer_size.
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 10;
  }
  // Raise max_write_buffer_number to its minimum BEFORE it is used as the
  // upper bound for min_write_buffer_number_to_merge. Doing it afterwards
  // would let a max of 0 or 1 push the merge threshold to -1 or 0.
  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  // At least one memtable is needed before a merge/flush can happen.
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }
  result.compression_per_level = src.compression_per_level;
  if (result.max_mem_compaction_level >= result.num_levels) {
    result.max_mem_compaction_level = result.num_levels - 1;
  }
  if (result.soft_rate_limit > result.hard_rate_limit) {
    result.soft_rate_limit = result.hard_rate_limit;
  }
  if (!result.prefix_extractor) {
    // Without a prefix extractor the hash-based memtable factories are
    // replaced by the plain skip list factory.
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_file_num_compaction_trigger = std::numeric_limits<int>::max();
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  // Enforce stop >= slowdown >= compaction trigger, raising the larger
  // triggers as needed and logging both the violation and the outcome.
  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    Warn(db_options.info_log.get(),
         "This condition must be satisfied: "
         "level0_stop_writes_trigger(%d) >= "
         "level0_slowdown_writes_trigger(%d) >= "
         "level0_file_num_compaction_trigger(%d)",
         result.level0_stop_writes_trigger,
         result.level0_slowdown_writes_trigger,
         result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    Warn(db_options.info_log.get(),
         "Adjust the value to "
         "level0_stop_writes_trigger(%d)"
         "level0_slowdown_writes_trigger(%d)"
         "level0_file_num_compaction_trigger(%d)",
         result.level0_stop_writes_trigger,
         result.level0_slowdown_writes_trigger,
         result.level0_file_num_compaction_trigger);
  }
  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel ||
        db_options.db_paths.size() > 1U) {
      // 1. level_compaction_dynamic_level_bytes only makes sense for
      //    level-based compaction.
      // 2. we don't yet know how to make both this feature and multiple
      //    DB paths work.
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

  return result;
}

211 212 213
// Storage whose address serves as the kSVInUse sentinel value.
int SuperVersion::dummy = 0;
// Sentinel stored in the thread-local slot while a thread is using its
// cached SuperVersion (installed by the Swap in GetThreadLocalSuperVersion).
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
// Sentinel installed in the thread-local slots by Scrape when the cached
// SuperVersions are reset.
void* const SuperVersion::kSVObsolete = nullptr;
214

215 216 217 218 219 220 221 222 223 224 225 226 227
// Frees every memtable that was queued in `to_delete`.
SuperVersion::~SuperVersion() {
  for (MemTable* memtable : to_delete) {
    delete memtable;
  }
}

// Increments the reference count and returns `this` so the call can be
// chained.
SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
228
  uint32_t previous_refs = refs.fetch_sub(1);
229 230
  assert(previous_refs > 0);
  return previous_refs == 1;
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
}

void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    to_delete.push_back(m);
  }
  current->Unref();
}

// Points this SuperVersion at the given memtable, immutable memtable list and
// Version, takes a reference on each, and sets its own reference count to 1.
void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                        Version* new_current) {
  mem = new_mem;
  mem->Ref();

  imm = new_imm;
  imm->Ref();

  current = new_current;
  current->Ref();

  refs.store(1, std::memory_order_relaxed);
}

254 255
namespace {
void SuperVersionUnrefHandle(void* ptr) {
256 257 258 259
  // UnrefHandle is called when a thread exists or a ThreadLocalPtr gets
  // destroyed. When former happens, the thread shouldn't see kSVInUse.
  // When latter happens, we are in ~ColumnFamilyData(), no get should happen as
  // well.
260 261 262 263 264 265 266 267 268 269
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv->Unref()) {
    sv->db_mutex->Lock();
    sv->Cleanup();
    sv->db_mutex->Unlock();
    delete sv;
  }
}
}  // anonymous namespace

270 271 272 273 274
// Constructs the state for one column family. Passing a null
// `_dummy_versions` creates a dummy column family, for which no statistics,
// table cache or compaction picker is created.
ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBuffer* write_buffer,
    const ColumnFamilyOptions& cf_options, const DBOptions* db_options,
    const EnvOptions& env_options, ColumnFamilySet* column_family_set)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      options_(*db_options,
               SanitizeOptions(*db_options, &internal_comparator_, cf_options)),
      ioptions_(options_),
      mutable_cf_options_(options_, ioptions_),
      write_buffer_(write_buffer),
      mem_(nullptr),
      imm_(options_.min_write_buffer_number_to_merge),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      column_family_set_(column_family_set),
      pending_flush_(false),
      pending_compaction_(false) {
  Ref();

  // Turn the user-defined table properties collector factories into internal
  // ones.
  GetIntTblPropCollectorFactory(options_, &int_tbl_prop_collector_factories_);

  // A null _dummy_versions means this is a dummy column family.
  const bool is_dummy = (_dummy_versions == nullptr);
  if (!is_dummy) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options->env, this));
    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));

    // Choose the compaction picker that matches the configured style.
    switch (ioptions_.compaction_style) {
      case kCompactionStyleLevel:
        compaction_picker_.reset(
            new LevelCompactionPicker(ioptions_, &internal_comparator_));
        break;
#ifndef ROCKSDB_LITE
      case kCompactionStyleUniversal:
        compaction_picker_.reset(
            new UniversalCompactionPicker(ioptions_, &internal_comparator_));
        break;
      case kCompactionStyleFIFO:
        compaction_picker_.reset(
            new FIFOCompactionPicker(ioptions_, &internal_comparator_));
        break;
      case kCompactionStyleNone:
        compaction_picker_.reset(new NullCompactionPicker(
            ioptions_, &internal_comparator_));
        Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
            "Column family %s does not use any background compaction. "
            "Compactions can only be done via CompactFiles\n",
            GetName().c_str());
        break;
#endif  // !ROCKSDB_LITE
      default:
        // Unknown (or, in LITE builds, unsupported) style: report it and fall
        // back to level compaction.
        Log(InfoLogLevel::ERROR_LEVEL, ioptions_.info_log,
            "Unable to recognize the specified compaction style %d. "
            "Column family %s will use kCompactionStyleLevel.\n",
            ioptions_.compaction_style, GetName().c_str());
        compaction_picker_.reset(
            new LevelCompactionPicker(ioptions_, &internal_comparator_));
        break;
    }

    // Only dump the options while fewer than 10 column families exist.
    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
          "--------------- Options for column family [%s]:\n", name.c_str());
      options_.Dump(ioptions_.info_log);
    } else {
      Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
          "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}
I
Igor Canadi 已提交
347

348
// DB mutex held
I
Igor Canadi 已提交
349
ColumnFamilyData::~ColumnFamilyData() {
I
Igor Canadi 已提交
350
  assert(refs_.load(std::memory_order_relaxed) == 0);
351 352 353 354 355 356
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

I
Igor Canadi 已提交
357 358 359 360
  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from column family set
    // If column_family_set_ == nullptr, this is dummy CFD and not in
    // ColumnFamilySet
I
Igor Canadi 已提交
361
    column_family_set_->RemoveColumnFamily(this);
362 363 364 365 366 367
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

368 369 370 371 372
  // It would be wrong if this ColumnFamilyData is in flush_queue_ or
  // compaction_queue_ and we destroyed it
  assert(!pending_flush_);
  assert(!pending_compaction_);

I
Igor Canadi 已提交
373 374 375 376 377 378 379 380 381 382 383 384 385 386
  if (super_version_ != nullptr) {
    // Release SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since unref handler can lock mutex.
    super_version_->db_mutex->Unlock();
    local_sv_.reset();
    super_version_->db_mutex->Lock();

    bool is_last_reference __attribute__((unused));
    is_last_reference = super_version_->Unref();
    assert(is_last_reference);
    super_version_->Cleanup();
    delete super_version_;
    super_version_ = nullptr;
  }
387

388 389
  if (dummy_versions_ != nullptr) {
    // List must be empty
390 391 392
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((unused)) = dummy_versions_->Unref();
    assert(deleted);
393
  }
394

395 396
  if (mem_ != nullptr) {
    delete mem_->Unref();
397
  }
398
  autovector<MemTable*> to_delete;
399
  imm_.current()->Unref(&to_delete);
400 401 402 403 404
  for (MemTable* m : to_delete) {
    delete m;
  }
}

I
Igor Canadi 已提交
405 406 407 408 409 410 411 412 413 414
// Marks this column family as dropped, releases its write controller token
// and removes it from the owning ColumnFamilySet.
void ColumnFamilyData::SetDropped() {
  // can't drop default CF (id 0)
  assert(id_ != 0);
  dropped_ = true;
  // A dropped column family no longer holds a write controller token.
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}

415 416
void ColumnFamilyData::RecalculateWriteStallConditions(
      const MutableCFOptions& mutable_cf_options) {
417
  if (current_ != nullptr) {
S
sdong 已提交
418
    auto* vstorage = current_->storage_info();
419 420
    const double score = vstorage->max_compaction_score();
    const int max_level = vstorage->max_compaction_score_level();
421 422 423

    auto write_controller = column_family_set_->write_controller_;

L
Lei Jin 已提交
424
    if (imm()->size() >= mutable_cf_options.max_write_buffer_number) {
425 426
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
427
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
428
          "[%s] Stopping writes because we have %d immutable memtables "
L
Lei Jin 已提交
429 430 431
          "(waiting for flush), max_write_buffer_number is set to %d",
          name_.c_str(), imm()->size(),
          mutable_cf_options.max_write_buffer_number);
S
sdong 已提交
432
    } else if (vstorage->l0_delay_trigger_count() >=
433
               mutable_cf_options.level0_stop_writes_trigger) {
434 435
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES, 1);
436
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
437
          "[%s] Stopping writes because we have %d level-0 files",
S
sdong 已提交
438
          name_.c_str(), vstorage->l0_delay_trigger_count());
439
    } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
S
sdong 已提交
440
               vstorage->l0_delay_trigger_count() >=
441
                   mutable_cf_options.level0_slowdown_writes_trigger) {
S
sdong 已提交
442
      uint64_t slowdown =
S
sdong 已提交
443
          SlowdownAmount(vstorage->l0_delay_trigger_count(),
S
sdong 已提交
444 445
                         mutable_cf_options.level0_slowdown_writes_trigger,
                         mutable_cf_options.level0_stop_writes_trigger);
446 447
      write_controller_token_ = write_controller->GetDelayToken(slowdown);
      internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, slowdown);
448
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
449 450
          "[%s] Stalling writes because we have %d level-0 files (%" PRIu64
          "us)",
S
sdong 已提交
451
          name_.c_str(), vstorage->l0_delay_trigger_count(), slowdown);
452 453
    } else if (mutable_cf_options.hard_rate_limit > 1.0 &&
               score > mutable_cf_options.hard_rate_limit) {
454 455 456
      uint64_t kHardLimitSlowdown = 1000;
      write_controller_token_ =
          write_controller->GetDelayToken(kHardLimitSlowdown);
457
      internal_stats_->RecordLevelNSlowdown(max_level, false);
458
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
459 460 461
          "[%s] Stalling writes because we hit hard limit on level %d. "
          "(%" PRIu64 "us)",
          name_.c_str(), max_level, kHardLimitSlowdown);
462 463 464 465 466
    } else if (mutable_cf_options.soft_rate_limit > 0.0 &&
               score > mutable_cf_options.soft_rate_limit) {
      uint64_t slowdown = SlowdownAmount(score,
          mutable_cf_options.soft_rate_limit,
          mutable_cf_options.hard_rate_limit);
467
      write_controller_token_ = write_controller->GetDelayToken(slowdown);
468
      internal_stats_->RecordLevelNSlowdown(max_level, true);
469
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
470 471 472 473 474 475
          "[%s] Stalling writes because we hit soft limit on level %d (%" PRIu64
          "us)",
          name_.c_str(), max_level, slowdown);
    } else {
      write_controller_token_.reset();
    }
476 477 478
  }
}

L
Lei Jin 已提交
479
// Returns a pointer to the EnvOptions stored in the owning ColumnFamilySet.
const EnvOptions* ColumnFamilyData::soptions() const {
  const EnvOptions& shared_env_options = column_family_set_->env_options_;
  return &shared_env_options;
}

I
Igor Canadi 已提交
483 484 485
// Replaces the current Version pointer with `current_version`. No reference
// counting is done here.
void ColumnFamilyData::SetCurrent(Version* current_version) {
  current_ = current_version;
}
486

487 488 489 490
// Returns the live version count that VersionSet reports for this column
// family's version list.
uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  const uint64_t live_versions =
      VersionSet::GetNumLiveVersions(dummy_versions_);
  return live_versions;
}

491
// Allocates a new MemTable configured with this column family's comparator,
// immutable options and write buffer plus the given mutable options. The
// caller receives the raw pointer. Requires a current Version to be set.
MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options) {
  assert(current_ != nullptr);
  MemTable* fresh = new MemTable(internal_comparator_, ioptions_,
                                 mutable_cf_options, write_buffer_);
  return fresh;
}

void ColumnFamilyData::CreateNewMemtable(
    const MutableCFOptions& mutable_cf_options) {
500 501
  if (mem_ != nullptr) {
    delete mem_->Unref();
502
  }
503
  SetMemtable(ConstructNewMemtable(mutable_cf_options));
504 505 506
  mem_->Ref();
}

507 508 509 510
// Asks the compaction picker whether the current Version's storage info
// calls for a compaction.
bool ColumnFamilyData::NeedsCompaction() const {
  auto* vstorage = current_->storage_info();
  return compaction_picker_->NeedsCompaction(vstorage);
}

511 512
// Lets the compaction picker choose a compaction for the current Version.
// Returns nullptr when the picker returns none. Otherwise the current Version
// is recorded as the compaction's input version.
Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
  auto* picked = compaction_picker_->PickCompaction(
      GetName(), mutable_options, current_->storage_info(), log_buffer);
  if (picked == nullptr) {
    return nullptr;
  }
  picked->SetInputVersion(current_);
  return picked;
}

521 522 523 524 525
// Forwards a range compaction request to the compaction picker, using the
// current Version's storage info. Returns nullptr when the picker returns
// none. Otherwise the current Version is recorded as the compaction's input
// version.
Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options,
    int input_level, int output_level, uint32_t output_path_id,
    const InternalKey* begin, const InternalKey* end,
    InternalKey** compaction_end) {
  auto* compaction = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, current_->storage_info(), input_level,
      output_level, output_path_id, begin, end, compaction_end);
  if (compaction == nullptr) {
    return nullptr;
  }
  compaction->SetInputVersion(current_);
  return compaction;
}

535
// Returns a SuperVersion on which the caller holds a reference. The
// SuperVersion is obtained through the thread-local cache and handed back to
// it; if the hand-back fails, the extra reference taken here is dropped
// again, so the caller is left with exactly one.
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
    InstrumentedMutex* db_mutex) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
  sv->Ref();
  const bool returned_to_cache = ReturnThreadLocalSuperVersion(sv);
  if (!returned_to_cache) {
    sv->Unref();
  }
  return sv;
}

// Takes the SuperVersion cached in this thread's local storage, refreshing it
// from super_version_ (under `db_mutex`) when the cached one is missing or
// out of date. The thread-local slot holds kSVInUse when this returns.
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
    InstrumentedMutex* db_mutex) {
  // The SuperVersion is cached in thread local storage so that no mutex has
  // to be acquired while the SuperVersion has not changed since its last use.
  // When a new SuperVersion is installed, the compaction or flush thread
  // cleans up the cached SuperVersion in all existing thread local storage.
  // To avoid acquiring the mutex for that, an atomic Swap() on the thread
  // local pointer guarantees exclusive access. If the thread local pointer is
  // in use while a new SuperVersion is installed, the cached SuperVersion can
  // become stale; the background thread will then have swapped in
  // kSVObsolete. The value is re-checked with an atomic compare and swap when
  // the SuperVersion is returned to thread local storage, and a SuperVersion
  // found to be stale has to be released.
  void* cached = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse; ThreadLocal storage
  // should only keep kSVInUse before the ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(cached != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(cached);
  const bool up_to_date =
      sv != SuperVersion::kSVObsolete &&
      sv->version_number == super_version_number_.load();
  if (!up_to_date) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
    SuperVersion* stale_to_delete = nullptr;

    if (sv && sv->Unref()) {
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
      db_mutex->Lock();
      // NOTE: underlying resources held by the superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      stale_to_delete = sv;
    } else {
      db_mutex->Lock();
    }
    sv = super_version_->Ref();
    db_mutex->Unlock();

    // Free the stale SuperVersion only after the mutex has been released.
    delete stale_to_delete;
  }
  assert(sv != nullptr);
  return sv;
}

// Tries to put `sv` back into this thread's local storage. Returns true when
// the slot still held kSVInUse and `sv` was stored; returns false when the
// slot had been changed in the meantime.
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  void* expected = SuperVersion::kSVInUse;
  const bool stored =
      local_sv_->CompareAndSwap(static_cast<void*>(sv), expected);
  if (!stored) {
    // A ThreadLocal scrape happened during this GetImpl call (after the
    // thread local Swap() at the beginning and before CompareAndSwap()), so
    // the SuperVersion being returned is obsolete.
    assert(expected == SuperVersion::kSVObsolete);
    return false;
  }
  // Seeing kSVInUse in the ThreadLocal means the storage has not been altered
  // and no Scrape has happened; the SuperVersion is still current.
  return true;
}

O
Ori Bernstein 已提交
610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635
// Builds a CompactionJobInfo describing compaction `c` and its `status`, and
// passes it to every registered listener's OnCompactionCompleted callback.
// Compiled to a no-op in ROCKSDB_LITE builds.
//
// input_files lists the files of the compaction's start level (c->level());
// output_files lists the files added by the compaction's version edit.
void ColumnFamilyData::NotifyOnCompactionCompleted(
    DB* db, Compaction* c, const Status& status) {
#ifndef ROCKSDB_LITE
  // Iterate over a copy of the listener list.
  auto listeners = ioptions()->listeners;
  CompactionJobInfo info;
  info.cf_name = c->column_family_data()->GetName();
  info.status = status;
  info.output_level = c->output_level();
  for (const auto& fmd : *c->inputs(c->level())) {
    info.input_files.push_back(
        TableFileName(options_.db_paths,
                      fmd->fd.GetNumber(),
                      fmd->fd.GetPathId()));
  }
  // Files created by the compaction are outputs, not inputs. Bind by
  // reference to avoid copying each (level, file metadata) pair.
  for (const auto& newf : c->edit()->GetNewFiles()) {
    info.output_files.push_back(
        TableFileName(options_.db_paths,
                      newf.second.fd.GetNumber(),
                      newf.second.fd.GetPathId()));
  }
  for (const auto& listener : listeners) {
    listener->OnCompactionCompleted(db, info);
  }
#endif  // ROCKSDB_LITE
}

636 637 638 639
// Passes the flush result (column family name, output file path and the two
// write-stall flags) to every registered listener's OnFlushCompleted
// callback. Compiled to a no-op in ROCKSDB_LITE builds.
void ColumnFamilyData::NotifyOnFlushCompleted(
    DB* db, const std::string& file_path,
    bool triggered_flush_slowdown,
    bool triggered_flush_stop) {
#ifndef ROCKSDB_LITE
  // Iterate over a copy of the listener list.
  auto registered = ioptions()->listeners;
  for (auto one_listener : registered) {
    one_listener->OnFlushCompleted(db, GetName(), file_path,
                                   triggered_flush_slowdown,
                                   triggered_flush_stop);
  }
#endif  // ROCKSDB_LITE
}

652
// Convenience overload that installs `new_superversion` using this column
// family's own mutable options. The DB mutex must be held.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  SuperVersion* to_free =
      InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_);
  return to_free;
}

// Initializes `new_superversion` from the current memtable, immutable
// memtable list and Version, makes it the active SuperVersion, bumps the
// SuperVersion number, resets the thread-local caches and recalculates the
// write stall conditions. Returns the previous SuperVersion when this call
// released its last reference (the caller should delete it outside of the
// mutex); otherwise returns nullptr.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(mem_, imm_.current(), current_);

  SuperVersion* previous = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;

  // Reset SuperVersions cached in thread local storage.
  ResetThreadLocalSuperVersions();

  RecalculateWriteStallConditions(mutable_cf_options);

  if (previous == nullptr || !previous->Unref()) {
    return nullptr;
  }
  previous->Cleanup();
  return previous;  // will let caller delete outside of mutex
}

680 681
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
682
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
683 684
  for (auto ptr : sv_ptrs) {
    assert(ptr);
685 686 687
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
688 689 690 691 692 693 694 695
    auto sv = static_cast<SuperVersion*>(ptr);
    if (sv->Unref()) {
      sv->Cleanup();
      delete sv;
    }
  }
}

I
Igor Canadi 已提交
696
#ifndef ROCKSDB_LITE
697
// Parses `options_map` on top of the current mutable options. On success the
// result replaces mutable_cf_options_ and its derived options are refreshed;
// on failure nothing is changed. Returns the parse status either way.
Status ColumnFamilyData::SetOptions(
    const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions parsed_options;
  Status s = GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
                                          &parsed_options);
  if (!s.ok()) {
    return s;
  }
  mutable_cf_options_ = parsed_options;
  mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  return s;
}
I
Igor Canadi 已提交
708
#endif  // ROCKSDB_LITE
709

I
Igor Canadi 已提交
710
// Creates an empty set. A dummy ColumnFamilyData is allocated to act as the
// head of the circular doubly-linked list of column families.
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const DBOptions* db_options,
                                 const EnvOptions& env_options,
                                 Cache* table_cache,
                                 WriteBuffer* write_buffer,
                                 WriteController* write_controller)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
                                      ColumnFamilyOptions(), db_options,
                                      env_options, nullptr)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      env_options_(env_options),
      table_cache_(table_cache),
      write_buffer_(write_buffer),
      write_controller_(write_controller) {
  // An empty list is the dummy node linked to itself in both directions.
  dummy_cfd_->next_ = dummy_cfd_;
  dummy_cfd_->prev_ = dummy_cfd_;
}
I
Igor Canadi 已提交
731 732

// Unreferences and deletes every column family still registered, then the
// dummy list head.
ColumnFamilySet::~ColumnFamilySet() {
  while (!column_family_data_.empty()) {
    // The cfd destructor removes the entry from column_family_data_, so the
    // map shrinks on every iteration.
    auto remaining = column_family_data_.begin()->second;
    remaining->Unref();
    delete remaining;
  }
  dummy_cfd_->Unref();
  delete dummy_cfd_;
}

// Returns the cached pointer to the default column family (id 0). The cache
// must already have been populated.
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  ColumnFamilyData* default_cfd = default_cfd_cache_;
  assert(default_cfd != nullptr);
  return default_cfd;
}

// Looks up a column family by id. Returns nullptr when the id is unknown.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  const auto found = column_family_data_.find(id);
  return (found != column_family_data_.end()) ? found->second : nullptr;
}

757 758 759
// Looks up a column family by name. Returns nullptr when the name is unknown.
// A known name is expected to always resolve to a registered id.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  const auto name_entry = column_families_.find(name);
  if (name_entry == column_families_.end()) {
    return nullptr;
  }
  auto cfd = GetColumnFamily(name_entry->second);
  assert(cfd != nullptr);
  return cfd;
}

// Increments the largest known column family id and returns the new value.
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  return ++max_column_family_;
}

773 774 775 776 777 778
// Returns the largest column family id recorded so far.
uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }

// Raises the largest recorded column family id to `new_max_column_family`
// when that value is larger; never lowers it.
void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  if (max_column_family_ < new_max_column_family) {
    max_column_family_ = new_max_column_family;
  }
}

779 780 781 782
// Returns the number of entries in the name-to-id map.
size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  const size_t registered = column_families_.size();
  return registered;
}

I
Igor Canadi 已提交
783
// under a DB mutex AND write thread
I
Igor Canadi 已提交
784 785 786 787 788
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd =
789 790 791
      new ColumnFamilyData(id, name, dummy_versions, table_cache_,
                           write_buffer_, options, db_options_,
                           env_options_, this);
792
  column_families_.insert({name, id});
I
Igor Canadi 已提交
793 794
  column_family_data_.insert({id, new_cfd});
  max_column_family_ = std::max(max_column_family_, id);
795
  // add to linked list
796 797 798 799 800
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
I
Igor Canadi 已提交
801 802 803
  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
I
Igor Canadi 已提交
804 805 806
  return new_cfd;
}

807 808 809 810
// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
I
Igor Canadi 已提交
811
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
812 813 814 815 816 817 818 819 820
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

I
Igor Canadi 已提交
821
// under a DB mutex AND from a write thread
I
Igor Canadi 已提交
822
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
823
  auto cfd_iter = column_family_data_.find(cfd->GetID());
824 825
  assert(cfd_iter != column_family_data_.end());
  column_family_data_.erase(cfd_iter);
826
  column_families_.erase(cfd->GetName());
I
Igor Canadi 已提交
827 828
}

I
Igor Canadi 已提交
829
// under a DB mutex OR from a write thread
830
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
I
Igor Canadi 已提交
831 832 833 834 835 836
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
837
  handle_.SetCFD(current_);
838 839
  return current_ != nullptr;
}
840

841 842 843 844 845 846 847 848 849 850
// Returns the log number of the current column family. A current column
// family must have been selected.
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  const uint64_t log_number = current_->GetLogNumber();
  return log_number;
}

// Returns the memtable of the current column family. A current column family
// must have been selected.
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  MemTable* active = current_->mem();
  return active;
}

851
// Returns the address of the internal handle, which Seek() points at the
// current column family. A current column family must have been selected.
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  ColumnFamilyHandle* handle = &handle_;
  return handle;
}

I
Igor Canadi 已提交
856 857 858 859 860 861 862
// Schedules a flush for the current column family when its memtable reports
// that one should be scheduled, and marks that memtable accordingly. Does
// nothing when no column family is selected.
void ColumnFamilyMemTablesImpl::CheckMemtableFull() {
  if (current_ == nullptr) {
    return;
  }
  if (!current_->mem()->ShouldScheduleFlush()) {
    return;
  }
  flush_scheduler_->ScheduleFlush(current_);
  current_->mem()->MarkFlushScheduled();
}

863 864 865 866 867 868 869 870 871
// Returns the id behind `column_family`, or 0 when the handle is null.
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return 0;
  }
  auto impl = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  return impl->GetID();
}

872 873 874 875 876 877 878 879 880
// Returns the user comparator behind `column_family`, or nullptr when the
// handle is null.
const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return nullptr;
  }
  auto impl = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  return impl->user_comparator();
}

I
Igor Canadi 已提交
881
}  // namespace rocksdb