column_family.cc 27.3 KB
Newer Older
1 2 3 4 5 6 7 8 9
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

I
Igor Canadi 已提交
10
#include "db/column_family.h"
11

12 13 14 15 16
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
17 18 19
#include <vector>
#include <string>
#include <algorithm>
I
Igor Canadi 已提交
20
#include <limits>
21

I
Igor Canadi 已提交
22
#include "db/compaction_picker.h"
23
#include "db/db_impl.h"
24 25 26
#include "db/job_context.h"
#include "db/version_set.h"
#include "db/writebuffer.h"
27
#include "db/internal_stats.h"
I
Igor Canadi 已提交
28
#include "db/job_context.h"
29
#include "db/table_properties_collector.h"
I
Igor Canadi 已提交
30
#include "db/version_set.h"
31
#include "db/write_controller.h"
32
#include "util/autovector.h"
33
#include "util/hash_skiplist_rep.h"
34
#include "util/options_helper.h"
I
Igor Canadi 已提交
35 36 37

namespace rocksdb {

38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
namespace {
// Computes the amount of time in microseconds by which a write should be
// delayed, given a pressure metric `n` (the number of level-0 files, or a
// compaction score) and the [bottom, top) range over which slowdown applies:
//   if n < bottom,  return 0;
//   if n >= top,    return 1000;
//   otherwise, let r = (n - bottom) / (top - bottom)
//              and return max(r^2 * 1000, 100).
// The goal of this formula is to gradually increase the rate at which writes
// are slowed. We also tried linear delay (r * 1000), but it seemed to do
// slightly worse. There is no other particular reason for choosing quadratic.
//
// `n` is a double so that fractional compaction scores are compared against
// the (also fractional) soft/hard rate limits without being truncated; integer
// level-0 file counts convert to double exactly, so their results are
// unchanged.
uint64_t SlowdownAmount(double n, double bottom, double top) {
  uint64_t delay;
  if (n >= top) {
    delay = 1000;
  } else if (n < bottom) {
    delay = 0;
  } else {
    // Here bottom <= n < top, so top - bottom > 0 and r is in [0, 1).
    double how_much = (n - bottom) / (top - bottom);
    delay = static_cast<uint64_t>(std::max(how_much * how_much * 1000, 100.0));
  }
  assert(delay <= 1000);
  return delay;
}
}  // namespace

I
Igor Canadi 已提交
68 69 70
// Wraps `column_family_data` in a handle and, when it is non-null, pins it
// with a reference that the destructor releases.
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, port::Mutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ == nullptr) {
    return;
  }
  cfd_->Ref();
}

// Releases the handle's reference on its column family (deleting the
// ColumnFamilyData if it was the last one). Then it collects obsolete files
// under the DB mutex and purges them after the mutex is released.
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ == nullptr) {
    return;
  }

  JobContext job_context;
  mutex_->Lock();
  if (cfd_->Unref()) {
    delete cfd_;
  }
  db_->FindObsoleteFiles(&job_context, false, true);
  mutex_->Unlock();

  // File deletion happens outside the DB mutex.
  if (job_context.HaveSomethingToDelete()) {
    db_->PurgeObsoleteFiles(job_context);
  }
}

91 92
// Returns the ID of the wrapped column family.
uint32_t ColumnFamilyHandleImpl::GetID() const {
  return cfd()->GetID();
}

93 94 95 96
const std::string& ColumnFamilyHandleImpl::GetName() const {
  return cfd()->GetName();
}

97 98 99 100
// Returns the user-supplied comparator of the wrapped column family.
const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
  const ColumnFamilyData* data = cfd();
  return data->user_comparator();
}

101 102 103 104
// Returns a copy of `src` with its options clamped/adjusted into the ranges
// and combinations the column family code relies on. `icmp` is installed as
// the comparator of the returned options.
ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  result.comparator = icmp;
#ifdef OS_MACOSX
  // TODO(icanadi) make write_buffer_size uint64_t instead of size_t
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, ((size_t)1) << 30);
#else
  ClipToRange(&result.write_buffer_size,
              ((size_t)64) << 10, ((size_t)64) << 30);
#endif
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // calculate a proper value from write_buffer_size.
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 10;
  }

  // Clamp max_write_buffer_number first: the merge threshold below is derived
  // from it. Deriving it from an unclamped value of 1 or less would leave the
  // threshold at 0 or below, even though max_write_buffer_number itself is
  // later raised to 2.
  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  // Merging fewer than one memtable is meaningless.
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }

  if (result.soft_rate_limit > result.hard_rate_limit) {
    result.soft_rate_limit = result.hard_rate_limit;
  }

  // Without a prefix_extractor, the hash-based memtable reps are replaced by a
  // plain skip list.
  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  // -- Sanitize the table properties collector
  // All user defined properties collectors will be wrapped by
  // UserKeyTablePropertiesCollector since for them they only have the
  // knowledge of the user keys; internal keys are invisible to them.
  auto& collector_factories = result.table_properties_collector_factories;
  for (auto& factory : collector_factories) {
    assert(factory);
    factory =
        std::make_shared<UserKeyTablePropertiesCollectorFactory>(factory);
  }
  // Add collector to collect internal key statistics
  collector_factories.push_back(
      std::make_shared<InternalKeyPropertiesCollectorFactory>());

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_file_num_compaction_trigger = std::numeric_limits<int>::max();
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  // Done after the FIFO override above so it is checked against the final
  // num_levels (keeps max_mem_compaction_level < num_levels in all styles).
  if (result.max_mem_compaction_level >= result.num_levels) {
    result.max_mem_compaction_level = result.num_levels - 1;
  }

  return result;
}

167 168 169
// `dummy` exists only so that its address can serve as kSVInUse: a unique,
// non-null marker stored in a thread-local slot while that thread is using its
// cached SuperVersion. No heap-allocated SuperVersion can share this address.
int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
// Stored into every thread-local slot by Scrape() (see
// ResetThreadLocalSuperVersions) to mark the cached SuperVersion as stale.
void* const SuperVersion::kSVObsolete = nullptr;
170

171 172 173 174 175 176 177 178 179 180 181 182 183
// Frees the memtables that Cleanup() collected in `to_delete`.
SuperVersion::~SuperVersion() {
  for (auto* memtable : to_delete) {
    delete memtable;
  }
}

// Adds one reference and returns `this`, so callers can write
// `sv = super_version_->Ref();`.
SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  SuperVersion* self = this;
  return self;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
184 185 186
  uint32_t previous_refs = refs.fetch_sub(1, std::memory_order_relaxed);
  assert(previous_refs > 0);
  return previous_refs == 1;
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
}

void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    to_delete.push_back(m);
  }
  current->Unref();
}

// Points this SuperVersion at the given memtable, immutable memtable list and
// Version, takes a reference on each, and starts it with a single reference.
void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                        Version* new_current) {
  mem = new_mem;
  mem->Ref();
  imm = new_imm;
  imm->Ref();
  current = new_current;
  current->Ref();
  refs.store(1, std::memory_order_relaxed);
}

210 211
namespace {
void SuperVersionUnrefHandle(void* ptr) {
212 213 214 215
  // UnrefHandle is called when a thread exists or a ThreadLocalPtr gets
  // destroyed. When former happens, the thread shouldn't see kSVInUse.
  // When latter happens, we are in ~ColumnFamilyData(), no get should happen as
  // well.
216 217 218 219 220 221 222 223 224 225
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv->Unref()) {
    sv->db_mutex->Lock();
    sv->Cleanup();
    sv->db_mutex->Unlock();
    delete sv;
  }
}
}  // anonymous namespace

226
// Builds a column family from sanitized `cf_options`. When `_dummy_versions`
// is null this is the dummy column family (the list head owned by
// ColumnFamilySet), which gets no stats, table cache or compaction picker.
ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
                                   Version* _dummy_versions,
                                   Cache* _table_cache,
                                   WriteBuffer* write_buffer,
                                   const ColumnFamilyOptions& cf_options,
                                   const DBOptions* db_options,
                                   const EnvOptions& env_options,
                                   ColumnFamilySet* column_family_set)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      options_(*db_options, SanitizeOptions(&internal_comparator_, cf_options)),
      ioptions_(options_),
      mutable_cf_options_(options_, ioptions_),
      write_buffer_(write_buffer),
      mem_(nullptr),
      imm_(options_.min_write_buffer_number_to_merge),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      column_family_set_(column_family_set) {
  Ref();

  const bool is_dummy = (_dummy_versions == nullptr);
  if (!is_dummy) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options->env, this));
    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));

    switch (ioptions_.compaction_style) {
      case kCompactionStyleLevel:
        compaction_picker_.reset(
            new LevelCompactionPicker(ioptions_, &internal_comparator_));
        break;
#ifndef ROCKSDB_LITE
      case kCompactionStyleUniversal:
        compaction_picker_.reset(
            new UniversalCompactionPicker(ioptions_, &internal_comparator_));
        break;
      case kCompactionStyleFIFO:
        compaction_picker_.reset(
            new FIFOCompactionPicker(ioptions_, &internal_comparator_));
        break;
      case kCompactionStyleNone:
        compaction_picker_.reset(new NullCompactionPicker(
            ioptions_, &internal_comparator_));
        Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
            "Column family %s does not use any background compaction. "
            "Compactions can only be done via CompactFiles\n",
            GetName().c_str());
        break;
#endif  // !ROCKSDB_LITE
      default:
        // Unknown style (or a style not compiled into ROCKSDB_LITE): fall
        // back to level compaction.
        Log(InfoLogLevel::ERROR_LEVEL, ioptions_.info_log,
            "Unable to recognize the specified compaction style %d. "
            "Column family %s will use kCompactionStyleLevel.\n",
            ioptions_.compaction_style, GetName().c_str());
        compaction_picker_.reset(
            new LevelCompactionPicker(ioptions_, &internal_comparator_));
        break;
    }

    Log(InfoLogLevel::INFO_LEVEL,
        ioptions_.info_log, "Options for column family \"%s\":\n",
        name.c_str());
    options_.Dump(ioptions_.info_log);
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}
I
Igor Canadi 已提交
296

297
// DB mutex held
I
Igor Canadi 已提交
298
ColumnFamilyData::~ColumnFamilyData() {
299 300 301 302 303 304 305 306 307 308
  assert(refs_ == 0);
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

  // it's nullptr for dummy CFD
  if (column_family_set_ != nullptr) {
    // remove from column_family_set
I
Igor Canadi 已提交
309
    column_family_set_->RemoveColumnFamily(this);
310 311 312 313 314 315
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

I
Igor Canadi 已提交
316 317 318 319 320 321 322 323 324 325 326 327 328 329
  if (super_version_ != nullptr) {
    // Release SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since unref handler can lock mutex.
    super_version_->db_mutex->Unlock();
    local_sv_.reset();
    super_version_->db_mutex->Lock();

    bool is_last_reference __attribute__((unused));
    is_last_reference = super_version_->Unref();
    assert(is_last_reference);
    super_version_->Cleanup();
    delete super_version_;
    super_version_ = nullptr;
  }
330

331 332
  if (dummy_versions_ != nullptr) {
    // List must be empty
333 334 335
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((unused)) = dummy_versions_->Unref();
    assert(deleted);
336
  }
337

338 339
  if (mem_ != nullptr) {
    delete mem_->Unref();
340
  }
341
  autovector<MemTable*> to_delete;
342
  imm_.current()->Unref(&to_delete);
343 344 345 346 347
  for (MemTable* m : to_delete) {
    delete m;
  }
}

348 349
void ColumnFamilyData::RecalculateWriteStallConditions(
      const MutableCFOptions& mutable_cf_options) {
350
  if (current_ != nullptr) {
S
sdong 已提交
351
    auto* vstorage = current_->storage_info();
352 353
    const double score = vstorage->max_compaction_score();
    const int max_level = vstorage->max_compaction_score_level();
354 355 356

    auto write_controller = column_family_set_->write_controller_;

L
Lei Jin 已提交
357
    if (imm()->size() >= mutable_cf_options.max_write_buffer_number) {
358 359
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
360
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
361
          "[%s] Stopping writes because we have %d immutable memtables "
L
Lei Jin 已提交
362 363 364
          "(waiting for flush), max_write_buffer_number is set to %d",
          name_.c_str(), imm()->size(),
          mutable_cf_options.max_write_buffer_number);
S
sdong 已提交
365
    } else if (vstorage->NumLevelFiles(0) >=
366
               mutable_cf_options.level0_stop_writes_trigger) {
367 368
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES, 1);
369
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
370
          "[%s] Stopping writes because we have %d level-0 files",
S
sdong 已提交
371
          name_.c_str(), vstorage->NumLevelFiles(0));
372
    } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
S
sdong 已提交
373
               vstorage->NumLevelFiles(0) >=
374
                   mutable_cf_options.level0_slowdown_writes_trigger) {
S
sdong 已提交
375 376 377 378
      uint64_t slowdown =
          SlowdownAmount(vstorage->NumLevelFiles(0),
                         mutable_cf_options.level0_slowdown_writes_trigger,
                         mutable_cf_options.level0_stop_writes_trigger);
379 380
      write_controller_token_ = write_controller->GetDelayToken(slowdown);
      internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, slowdown);
381
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
382 383
          "[%s] Stalling writes because we have %d level-0 files (%" PRIu64
          "us)",
S
sdong 已提交
384
          name_.c_str(), vstorage->NumLevelFiles(0), slowdown);
385 386
    } else if (mutable_cf_options.hard_rate_limit > 1.0 &&
               score > mutable_cf_options.hard_rate_limit) {
387 388 389 390 391
      uint64_t kHardLimitSlowdown = 1000;
      write_controller_token_ =
          write_controller->GetDelayToken(kHardLimitSlowdown);
      internal_stats_->RecordLevelNSlowdown(max_level, kHardLimitSlowdown,
                                            false);
392
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
393 394 395
          "[%s] Stalling writes because we hit hard limit on level %d. "
          "(%" PRIu64 "us)",
          name_.c_str(), max_level, kHardLimitSlowdown);
396 397 398 399 400
    } else if (mutable_cf_options.soft_rate_limit > 0.0 &&
               score > mutable_cf_options.soft_rate_limit) {
      uint64_t slowdown = SlowdownAmount(score,
          mutable_cf_options.soft_rate_limit,
          mutable_cf_options.hard_rate_limit);
401 402
      write_controller_token_ = write_controller->GetDelayToken(slowdown);
      internal_stats_->RecordLevelNSlowdown(max_level, slowdown, true);
403
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
404 405 406 407 408 409
          "[%s] Stalling writes because we hit soft limit on level %d (%" PRIu64
          "us)",
          name_.c_str(), max_level, slowdown);
    } else {
      write_controller_token_.reset();
    }
410 411 412
  }
}

L
Lei Jin 已提交
413
// Returns the EnvOptions held by the owning ColumnFamilySet (the same object
// that set passes to every column family it creates).
const EnvOptions* ColumnFamilyData::soptions() const {
  const EnvOptions& shared_env_options = column_family_set_->env_options_;
  return &shared_env_options;
}

I
Igor Canadi 已提交
417 418 419
// Records `current_version` as this column family's current Version. This
// only stores the pointer; no reference is taken or released here.
void ColumnFamilyData::SetCurrent(Version* current_version) {
  this->current_ = current_version;
}
420

421
// Allocates a fresh MemTable configured for this column family. The caller
// owns the result. Requires that a current Version has been installed.
MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options) {
  assert(current_ != nullptr);
  MemTable* created = new MemTable(internal_comparator_, ioptions_,
                                   mutable_cf_options, write_buffer_);
  return created;
}

void ColumnFamilyData::CreateNewMemtable(
    const MutableCFOptions& mutable_cf_options) {
430 431
  if (mem_ != nullptr) {
    delete mem_->Unref();
432
  }
433
  SetMemtable(ConstructNewMemtable(mutable_cf_options));
434 435 436
  mem_->Ref();
}

437 438
// Asks the compaction picker for the next automatic compaction over the
// current Version. Returns nullptr when there is nothing to do; otherwise the
// compaction is bound to current_ as its input Version.
Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
  Compaction* picked = compaction_picker_->PickCompaction(
      GetName(), mutable_options, current_->storage_info(), log_buffer);
  if (picked == nullptr) {
    return nullptr;
  }
  picked->SetInputVersion(current_);
  return picked;
}

447 448 449 450 451
// Builds a manual compaction of [begin, end] from input_level into
// output_level via the compaction picker. A non-null result is bound to
// current_ as its input Version.
Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options,
    int input_level, int output_level, uint32_t output_path_id,
    const InternalKey* begin, const InternalKey* end,
    InternalKey** compaction_end) {
  Compaction* manual = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, current_->storage_info(), input_level,
      output_level, output_path_id, begin, end, compaction_end);
  if (manual == nullptr) {
    return nullptr;
  }
  manual->SetInputVersion(current_);
  return manual;
}

461 462 463
// Returns a SuperVersion carrying a reference owned by the caller, while
// trying to keep the thread-local cache populated.
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
    port::Mutex* db_mutex) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
  // The caller's own reference.
  sv->Ref();
  const bool cached_again = ReturnThreadLocalSuperVersion(sv);
  if (!cached_again) {
    // The thread-local slot was scraped meanwhile, so it no longer owns the
    // reference it came with; drop it. The Ref() above still keeps sv alive
    // for the caller.
    sv->Unref();
  }
  return sv;
}

// Takes this thread's cached SuperVersion out of thread-local storage, leaving
// kSVInUse in the slot. If the cached one is missing or stale, it is released
// and a freshly referenced super_version_ is returned instead.
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
    port::Mutex* db_mutex) {
  // The SuperVersion is cached in thread local storage to avoid acquiring
  // mutex when SuperVersion does not change since the last use. When a new
  // SuperVersion is installed, the compaction or flush thread cleans up
  // cached SuperVersion in all existing thread local storage. To avoid
  // acquiring mutex for this operation, we use atomic Swap() on the thread
  // local pointer to guarantee exclusive access. If the thread local pointer
  // is being used while a new SuperVersion is installed, the cached
  // SuperVersion can become stale. In that case, the background thread would
  // have swapped in kSVObsolete. We re-check the value when returning the
  // SuperVersion back to thread local, with an atomic compare and swap.
  // The superversion will need to be released if detected to be stale.
  void* slot_value = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(slot_value != SuperVersion::kSVInUse);
  SuperVersion* cached = static_cast<SuperVersion*>(slot_value);

  const bool usable = cached != SuperVersion::kSVObsolete &&
                      cached->version_number == super_version_number_.load();
  if (usable) {
    assert(cached != nullptr);
    return cached;
  }

  RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
  SuperVersion* sv_to_delete = nullptr;

  if (cached != nullptr && cached->Unref()) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
    db_mutex->Lock();
    // NOTE: underlying resources held by superversion (sst files) might
    // not be released until the next background job.
    cached->Cleanup();
    sv_to_delete = cached;
  } else {
    db_mutex->Lock();
  }
  SuperVersion* fresh = super_version_->Ref();
  db_mutex->Unlock();

  // Freed outside the DB mutex.
  delete sv_to_delete;
  assert(fresh != nullptr);
  return fresh;
}

// Tries to put `sv` back into this thread's slot. Returns true on success.
// Returns false if the slot was scraped since GetThreadLocalSuperVersion(),
// meaning `sv` is obsolete.
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  // Only succeeds while the slot still holds kSVInUse, i.e. no Scrape has
  // happened and the SuperVersion is still current.
  void* expected = SuperVersion::kSVInUse;
  const bool put_back =
      local_sv_->CompareAndSwap(static_cast<void*>(sv), expected);
  if (!put_back) {
    // ThreadLocal scrape happened in the process of this GetImpl call (after
    // thread local Swap() at the beginning and before CompareAndSwap()).
    assert(expected == SuperVersion::kSVObsolete);
  }
  return put_back;
}

536 537 538 539
// Notifies every listener registered in this column family's options that a
// flush produced `file_path`, forwarding whether the flush had triggered a
// write slowdown or stop. Compiled out under ROCKSDB_LITE.
void ColumnFamilyData::NotifyOnFlushCompleted(
    DB* db, const std::string& file_path,
    bool triggered_flush_slowdown,
    bool triggered_flush_stop) {
#ifndef ROCKSDB_LITE
  // Iterate by const reference. The previous code copied the whole listener
  // vector and then each shared_ptr, and every copy costs an atomic refcount
  // update.
  for (const auto& listener : ioptions()->listeners) {
    listener->OnFlushCompleted(db, GetName(), file_path,
                               triggered_flush_slowdown, triggered_flush_stop);
  }
#endif  // ROCKSDB_LITE
}

552
// Convenience overload that installs `new_superversion` using this column
// family's current mutable options. The DB mutex must be held.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, port::Mutex* db_mutex) {
  db_mutex->AssertHeld();
  const MutableCFOptions& current_options = mutable_cf_options_;
  return InstallSuperVersion(new_superversion, db_mutex, current_options);
}

// Makes `new_superversion` (initialized from mem_, imm_ and current_) the
// active SuperVersion, invalidates thread-local caches and recomputes write
// stalls. Returns the previous SuperVersion if this dropped its last
// reference (already cleaned up, for the caller to delete outside the mutex),
// otherwise nullptr.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, port::Mutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(mem_, imm_.current(), current_);

  SuperVersion* const previous = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;

  // Reset SuperVersions cached in thread local storage
  ResetThreadLocalSuperVersions();

  RecalculateWriteStallConditions(mutable_cf_options);

  const bool previous_released = previous != nullptr && previous->Unref();
  if (!previous_released) {
    return nullptr;
  }
  previous->Cleanup();
  return previous;  // will let caller delete outside of mutex
}

580 581
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
582
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
583 584
  for (auto ptr : sv_ptrs) {
    assert(ptr);
585 586 587
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
588 589 590 591 592 593 594 595
    auto sv = static_cast<SuperVersion*>(ptr);
    if (sv->Unref()) {
      sv->Cleanup();
      delete sv;
    }
  }
}

I
Igor Canadi 已提交
596
#ifndef ROCKSDB_LITE
597
Status ColumnFamilyData::SetOptions(
598 599
      const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions new_mutable_cf_options;
600 601 602
  Status s = GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
                                          &new_mutable_cf_options);
  if (s.ok()) {
603
    mutable_cf_options_ = new_mutable_cf_options;
604
    mutable_cf_options_.RefreshDerivedOptions(ioptions_);
605
  }
606
  return s;
607
}
I
Igor Canadi 已提交
608
#endif  // ROCKSDB_LITE
609

I
Igor Canadi 已提交
610
// Creates an empty set whose only member is the dummy column family that
// serves as the head of the circular list of column families.
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const DBOptions* db_options,
                                 const EnvOptions& env_options,
                                 Cache* table_cache,
                                 WriteBuffer* write_buffer,
                                 WriteController* write_controller)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
                                      ColumnFamilyOptions(), db_options,
                                      env_options, nullptr)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      env_options_(env_options),
      table_cache_(table_cache),
      write_buffer_(write_buffer),
      write_controller_(write_controller),
      spin_lock_(ATOMIC_FLAG_INIT) {
  // An empty circular list: the head points at itself in both directions.
  dummy_cfd_->next_ = dummy_cfd_;
  dummy_cfd_->prev_ = dummy_cfd_;
}
I
Igor Canadi 已提交
632 633

// Drops the set's reference on every column family and deletes them, then
// does the same for the dummy head.
ColumnFamilySet::~ColumnFamilySet() {
  while (!column_family_data_.empty()) {
    // Each cfd's destructor removes it from column_family_data_, so the map
    // shrinks on every iteration.
    ColumnFamilyData* victim = column_family_data_.begin()->second;
    victim->Unref();
    delete victim;
  }
  dummy_cfd_->Unref();
  delete dummy_cfd_;
}

// Returns the cached default column family (ID 0). It must already exist.
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  ColumnFamilyData* const default_cfd = default_cfd_cache_;
  assert(default_cfd != nullptr);
  return default_cfd;
}

// Looks up a column family by ID; returns nullptr if none is registered.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  const auto found = column_family_data_.find(id);
  return found == column_family_data_.end() ? nullptr : found->second;
}

658 659 660
// Looks up a column family by name; returns nullptr if none is registered.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  const auto by_name = column_families_.find(name);
  if (by_name == column_families_.end()) {
    return nullptr;
  }
  // The name->id and id->cfd maps are updated together, so the ID must
  // resolve.
  ColumnFamilyData* const cfd = GetColumnFamily(by_name->second);
  assert(cfd != nullptr);
  return cfd;
}

// Reserves and returns the next unused column family ID.
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  max_column_family_ += 1;
  return max_column_family_;
}

674 675 676 677 678 679
// Returns the largest column family ID seen so far.
uint32_t ColumnFamilySet::GetMaxColumnFamily() {
  return max_column_family_;
}

// Raises the recorded maximum column family ID to `new_max_column_family`
// if it is larger; never lowers it.
void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  if (new_max_column_family > max_column_family_) {
    max_column_family_ = new_max_column_family;
  }
}

680 681 682 683
// Returns how many named column families are registered (the dummy head is
// not counted).
size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  const size_t count = column_families_.size();
  return count;
}

684
// under a DB mutex
I
Igor Canadi 已提交
685 686 687 688 689
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd =
690 691 692
      new ColumnFamilyData(id, name, dummy_versions, table_cache_,
                           write_buffer_, options, db_options_,
                           env_options_, this);
693 694
  Lock();
  column_families_.insert({name, id});
I
Igor Canadi 已提交
695
  column_family_data_.insert({id, new_cfd});
696
  Unlock();
I
Igor Canadi 已提交
697
  max_column_family_ = std::max(max_column_family_, id);
698
  // add to linked list
699 700 701 702 703
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
I
Igor Canadi 已提交
704 705 706
  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
I
Igor Canadi 已提交
707 708 709
  return new_cfd;
}

I
Igor Canadi 已提交
710 711 712 713 714 715 716 717
// Acquires the set's spin lock by busy-waiting until test_and_set observes
// the flag previously clear.
void ColumnFamilySet::Lock() {
  for (;;) {
    if (!spin_lock_.test_and_set(std::memory_order_acquire)) {
      break;
    }
  }
}

// Releases the spin lock taken by Lock().
void ColumnFamilySet::Unlock() {
  spin_lock_.clear(std::memory_order_release);
}

718 719 720 721 722 723 724 725 726 727 728 729 730 731
// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
    if (cfd->refs_ == 0) {
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

732
// under a DB mutex
I
Igor Canadi 已提交
733
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
734
  auto cfd_iter = column_family_data_.find(cfd->GetID());
735
  assert(cfd_iter != column_family_data_.end());
736
  Lock();
737
  column_family_data_.erase(cfd_iter);
738 739
  column_families_.erase(cfd->GetName());
  Unlock();
I
Igor Canadi 已提交
740 741
}

742
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
I
Igor Canadi 已提交
743 744 745 746 747 748 749 750
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    // maybe outside of db mutex, should lock
    column_family_set_->Lock();
    current_ = column_family_set_->GetColumnFamily(column_family_id);
    column_family_set_->Unlock();
I
Igor Canadi 已提交
751 752 753 754 755
    // TODO(icanadi) Maybe remove column family from the hash table when it's
    // dropped?
    if (current_ != nullptr && current_->IsDropped()) {
      current_ = nullptr;
    }
I
Igor Canadi 已提交
756
  }
757
  handle_.SetCFD(current_);
758 759
  return current_ != nullptr;
}
760

761 762 763 764 765 766 767 768 769 770
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  return current_->GetLogNumber();
}

// Returns the active memtable of the column family selected by the last
// successful Seek().
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  ColumnFamilyData* selected = current_;
  assert(selected != nullptr);
  return selected->mem();
}

771
// Returns the handle that Seek() pointed at the selected column family. Only
// valid after a successful Seek().
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  ColumnFamilyHandle* const handle = &handle_;
  return handle;
}

I
Igor Canadi 已提交
776 777 778 779 780 781 782
// If the selected column family's memtable asks for a flush, hands the
// column family to the flush scheduler and marks the memtable as scheduled.
void ColumnFamilyMemTablesImpl::CheckMemtableFull() {
  if (current_ == nullptr) {
    return;
  }
  if (!current_->mem()->ShouldScheduleFlush()) {
    return;
  }
  flush_scheduler_->ScheduleFlush(current_);
  current_->mem()->MarkFlushScheduled();
}

783 784 785 786 787 788 789 790 791
// Returns the ID of the column family behind `column_family`, or 0 (the ID
// used for the default column family) when `column_family` is null.
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  uint32_t column_family_id = 0;
  if (column_family != nullptr) {
    // Downcast from the public handle type to its implementation. static_cast
    // is the correct, compile-time-checked cast along an inheritance
    // relationship; reinterpret_cast would skip any base-to-derived pointer
    // adjustment.
    auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
    column_family_id = cfh->GetID();
  }
  return column_family_id;
}

792 793 794 795 796 797 798 799 800
// Returns the user comparator of the column family behind `column_family`,
// or nullptr when `column_family` is null.
const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family != nullptr) {
    // Base-to-derived downcast: static_cast, not reinterpret_cast.
    auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
    return cfh->user_comparator();
  }
  return nullptr;
}

I
Igor Canadi 已提交
801
}  // namespace rocksdb