column_family.cc 30.3 KB
Newer Older
1 2 3 4 5 6 7 8 9
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

I
Igor Canadi 已提交
10
#include "db/column_family.h"
11

12 13 14 15 16
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
17 18 19
#include <vector>
#include <string>
#include <algorithm>
I
Igor Canadi 已提交
20
#include <limits>
21

I
Igor Canadi 已提交
22
#include "db/compaction_picker.h"
23
#include "db/db_impl.h"
24 25 26
#include "db/job_context.h"
#include "db/version_set.h"
#include "db/writebuffer.h"
27
#include "db/internal_stats.h"
I
Igor Canadi 已提交
28
#include "db/job_context.h"
29
#include "db/table_properties_collector.h"
I
Igor Canadi 已提交
30
#include "db/version_set.h"
31
#include "db/write_controller.h"
32
#include "util/autovector.h"
33
#include "util/hash_skiplist_rep.h"
34
#include "util/options_helper.h"
I
Igor Canadi 已提交
35 36 37

namespace rocksdb {

38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
namespace {
// Computes the amount of time in microseconds by which a write should be
// delayed, given a pressure measurement `n` and the range [bottom, top) over
// which the slowdown ramps up:
//   if n < bottom, return 0;
//   if n >= top, return 1000;
//   otherwise, let r = (n - bottom) / (top - bottom)
//   and return max(r^2 * 1000, 100).
// The goal of this formula is to gradually increase the rate at which writes
// are slowed. We also tried linear delay (r * 1000), but it seemed to do
// slightly worse. There is no other particular reason for choosing quadratic.
//
// `n` is a double so that fractional measurements (the compaction score that
// is compared against soft_rate_limit/hard_rate_limit) are not truncated to an
// integer before being compared with the fractional bounds. Integer callers
// (level-0 file counts) convert implicitly and exactly.
uint64_t SlowdownAmount(double n, double bottom, double top) {
  if (n >= top) {
    return 1000;
  }
  if (n < bottom) {
    return 0;
  }
  // Here bottom <= n < top, so top - bottom > 0 and the division is safe.
  const double how_much = (n - bottom) / (top - bottom);
  const uint64_t delay =
      static_cast<uint64_t>(std::max(how_much * how_much * 1000, 100.0));
  assert(delay <= 1000);
  return delay;
}
}  // namespace

I
Igor Canadi 已提交
68
// A handle takes its own reference on the column family (if any) so the
// ColumnFamilyData stays alive for as long as the handle exists; the matching
// Unref happens in ~ColumnFamilyHandleImpl().
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ != nullptr) {
    cfd_->Ref();
  }
}

// Drops this handle's reference on the column family. Under the DB mutex the
// ColumnFamilyData is deleted if that was its last reference, and obsolete
// files are collected; the actual purge runs after the mutex is released.
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ != nullptr) {
    // Job id == 0 means that this is not our background process, but rather
    // user thread
    JobContext job_context(0);
    mutex_->Lock();
    if (cfd_->Unref()) {
      delete cfd_;
    }
    db_->FindObsoleteFiles(&job_context, false, true);
    mutex_->Unlock();
    // File deletion happens outside the DB mutex.
    if (job_context.HaveSomethingToDelete()) {
      db_->PurgeObsoleteFiles(job_context);
    }
  }
}

93 94
uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }

95 96 97 98
const std::string& ColumnFamilyHandleImpl::GetName() const {
  return cfd()->GetName();
}

99 100 101 102
const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
  return cfd()->user_comparator();
}

103 104 105
// Returns a copy of `src` whose values are clipped into supported ranges and
// made mutually consistent. `icmp` replaces the comparator. Warnings about
// inconsistent level-0 triggers are written to db_options.info_log.
ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
                                    const InternalKeyComparator* icmp,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  result.comparator = icmp;
#ifdef OS_MACOSX
  // TODO(icanadi) make write_buffer_size uint64_t instead of size_t
  ClipToRange(&result.write_buffer_size, static_cast<size_t>(64) << 10,
              static_cast<size_t>(1) << 30);
#else
  ClipToRange(&result.write_buffer_size, static_cast<size_t>(64) << 10,
              static_cast<size_t>(64) << 30);
#endif
  // If the user sets arena_block_size, we trust the user to use this value.
  // Otherwise, calculate a proper value from write_buffer_size.
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 10;
  }

  // Clamp max_write_buffer_number BEFORE deriving the merge threshold from
  // it; deriving from an unclamped 0 or 1 would give a threshold of -1 or 0.
  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  // A flush must always merge at least one memtable.
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_file_num_compaction_trigger = std::numeric_limits<int>::max();
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  // Done after the FIFO override above, which may shrink num_levels.
  if (result.max_mem_compaction_level >= result.num_levels) {
    result.max_mem_compaction_level = result.num_levels - 1;
  }
  if (result.soft_rate_limit > result.hard_rate_limit) {
    result.soft_rate_limit = result.hard_rate_limit;
  }

  // Hash-based memtables need a prefix extractor; fall back to a skip list.
  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  // -- Sanitize the table properties collector
  // All user defined properties collectors will be wrapped by
  // UserKeyTablePropertiesCollector since for them they only have the
  // knowledge of the user keys; internal keys are invisible to them.
  auto& collector_factories = result.table_properties_collector_factories;
  for (auto& factory : collector_factories) {
    assert(factory);
    factory =
        std::make_shared<UserKeyTablePropertiesCollectorFactory>(factory);
  }
  // Add collector to collect internal key statistics
  collector_factories.push_back(
      std::make_shared<InternalKeyPropertiesCollectorFactory>());

  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    Warn(db_options.info_log.get(),
         "This condition must be satisfied: "
         "level0_stop_writes_trigger(%d) >= "
         "level0_slowdown_writes_trigger(%d) >= "
         "level0_file_num_compaction_trigger(%d)",
         result.level0_stop_writes_trigger,
         result.level0_slowdown_writes_trigger,
         result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    Warn(db_options.info_log.get(),
         "Adjust the value to "
         "level0_stop_writes_trigger(%d) >= "
         "level0_slowdown_writes_trigger(%d) >= "
         "level0_file_num_compaction_trigger(%d)",
         result.level0_stop_writes_trigger,
         result.level0_slowdown_writes_trigger,
         result.level0_file_num_compaction_trigger);
  }
  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel ||
        db_options.db_paths.size() > 1U) {
      // 1. level_compaction_dynamic_level_bytes only makes sense for
      //    level-based compaction.
      // 2. we don't yet know how to make both of this feature and multiple
      //    DB path work.
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

  return result;
}

211 212 213
// Sentinel values stored in the per-thread SuperVersion slot (local_sv_).
// kSVInUse is swapped in by GetThreadLocalSuperVersion() while a thread is
// using its cached SuperVersion; it points at `dummy`, presumably only to get
// a unique non-null address that can never alias a real SuperVersion.
// kSVObsolete (nullptr) is what ResetThreadLocalSuperVersions() scrapes into
// every slot when a new SuperVersion is installed.
int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;
214

215 216 217 218 219 220 221 222 223 224 225 226 227
// Deletes the memtables that Cleanup() collected into to_delete.
SuperVersion::~SuperVersion() {
  for (auto td : to_delete) {
    delete td;
  }
}

// Adds a reference and returns this, so callers can write `sv = x->Ref()`.
SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

// Drops a reference. Returns true when this call released the last one, in
// which case the caller is responsible for Cleanup() and delete.
bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
  uint32_t previous_refs = refs.fetch_sub(1);
  assert(previous_refs > 0);
  return previous_refs == 1;
}

// Releases the references held on imm, mem and current. Memtables freed by
// those Unrefs are queued in to_delete and deleted by the destructor.
void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    to_delete.push_back(m);
  }
  current->Unref();
}

// Captures the given memtable, immutable memtable list and version, taking a
// reference on each, and starts this SuperVersion with a ref count of 1.
void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                        Version* new_current) {
  mem = new_mem;
  imm = new_imm;
  current = new_current;
  mem->Ref();
  imm->Ref();
  current->Ref();
  refs.store(1, std::memory_order_relaxed);
}

254 255
namespace {
void SuperVersionUnrefHandle(void* ptr) {
256 257 258 259
  // UnrefHandle is called when a thread exists or a ThreadLocalPtr gets
  // destroyed. When former happens, the thread shouldn't see kSVInUse.
  // When latter happens, we are in ~ColumnFamilyData(), no get should happen as
  // well.
260 261 262 263 264 265 266 267 268 269
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv->Unref()) {
    sv->db_mutex->Lock();
    sv->Cleanup();
    sv->db_mutex->Unlock();
    delete sv;
  }
}
}  // anonymous namespace

270 271 272 273 274
// Builds the in-memory state of one column family. The options are sanitized
// via SanitizeOptions(). The object starts with one reference (Ref() below).
// When `_dummy_versions` is nullptr this is the dummy column family used as
// the list head of ColumnFamilySet, and no stats, table cache or compaction
// picker are created for it.
ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBuffer* write_buffer,
    const ColumnFamilyOptions& cf_options, const DBOptions* db_options,
    const EnvOptions& env_options, ColumnFamilySet* column_family_set)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      options_(*db_options,
               SanitizeOptions(*db_options, &internal_comparator_, cf_options)),
      ioptions_(options_),
      mutable_cf_options_(options_, ioptions_),
      write_buffer_(write_buffer),
      mem_(nullptr),
      imm_(options_.min_write_buffer_number_to_merge),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      column_family_set_(column_family_set),
      pending_flush_(false),
      pending_compaction_(false) {
  Ref();

  // if _dummy_versions is nullptr, then this is a dummy column family.
  if (_dummy_versions != nullptr) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options->env, this));
    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));
    if (ioptions_.compaction_style == kCompactionStyleLevel) {
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
#ifndef ROCKSDB_LITE
    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(
          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
      compaction_picker_.reset(
          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
      compaction_picker_.reset(new NullCompactionPicker(
          ioptions_, &internal_comparator_));
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
          "Column family %s does not use any background compaction. "
          "Compactions can only be done via CompactFiles\n",
          GetName().c_str());
#endif  // !ROCKSDB_LITE
    } else {
      // Unknown style: log it and fall back to level compaction.
      Log(InfoLogLevel::ERROR_LEVEL, ioptions_.info_log,
          "Unable to recognize the specified compaction style %d. "
          "Column family %s will use kCompactionStyleLevel.\n",
          ioptions_.compaction_style, GetName().c_str());
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
    }

    // Dump full options only while the number of column families is small,
    // to keep the info log readable.
    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
          "--------------- Options for column family [%s]:\n", name.c_str());
      options_.Dump(ioptions_.info_log);
    } else {
      Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
          "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}
I
Igor Canadi 已提交
344

345
// DB mutex held
I
Igor Canadi 已提交
346
ColumnFamilyData::~ColumnFamilyData() {
I
Igor Canadi 已提交
347
  assert(refs_.load(std::memory_order_relaxed) == 0);
348 349 350 351 352 353
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

I
Igor Canadi 已提交
354 355 356 357
  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from column family set
    // If column_family_set_ == nullptr, this is dummy CFD and not in
    // ColumnFamilySet
I
Igor Canadi 已提交
358
    column_family_set_->RemoveColumnFamily(this);
359 360 361 362 363 364
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

365 366 367 368 369
  // It would be wrong if this ColumnFamilyData is in flush_queue_ or
  // compaction_queue_ and we destroyed it
  assert(!pending_flush_);
  assert(!pending_compaction_);

I
Igor Canadi 已提交
370 371 372 373 374 375 376 377 378 379 380 381 382 383
  if (super_version_ != nullptr) {
    // Release SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since unref handler can lock mutex.
    super_version_->db_mutex->Unlock();
    local_sv_.reset();
    super_version_->db_mutex->Lock();

    bool is_last_reference __attribute__((unused));
    is_last_reference = super_version_->Unref();
    assert(is_last_reference);
    super_version_->Cleanup();
    delete super_version_;
    super_version_ = nullptr;
  }
384

385 386
  if (dummy_versions_ != nullptr) {
    // List must be empty
387 388 389
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((unused)) = dummy_versions_->Unref();
    assert(deleted);
390
  }
391

392 393
  if (mem_ != nullptr) {
    delete mem_->Unref();
394
  }
395
  autovector<MemTable*> to_delete;
396
  imm_.current()->Unref(&to_delete);
397 398 399 400 401
  for (MemTable* m : to_delete) {
    delete m;
  }
}

I
Igor Canadi 已提交
402 403 404 405 406 407 408 409 410 411
// Marks this column family as dropped. Any write-controller token it holds is
// released, and it is removed from the owning ColumnFamilySet so it can no
// longer be found by name or id. The default column family (id 0) must never
// be dropped.
void ColumnFamilyData::SetDropped() {
  assert(id_ != 0);  // the default CF cannot be dropped

  dropped_ = true;
  write_controller_token_.reset();  // stop stalling writes on behalf of this CF

  column_family_set_->RemoveColumnFamily(this);
}

412 413
void ColumnFamilyData::RecalculateWriteStallConditions(
      const MutableCFOptions& mutable_cf_options) {
414
  if (current_ != nullptr) {
S
sdong 已提交
415
    auto* vstorage = current_->storage_info();
416 417
    const double score = vstorage->max_compaction_score();
    const int max_level = vstorage->max_compaction_score_level();
418 419 420

    auto write_controller = column_family_set_->write_controller_;

L
Lei Jin 已提交
421
    if (imm()->size() >= mutable_cf_options.max_write_buffer_number) {
422 423
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
424
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
425
          "[%s] Stopping writes because we have %d immutable memtables "
L
Lei Jin 已提交
426 427 428
          "(waiting for flush), max_write_buffer_number is set to %d",
          name_.c_str(), imm()->size(),
          mutable_cf_options.max_write_buffer_number);
S
sdong 已提交
429
    } else if (vstorage->l0_delay_trigger_count() >=
430
               mutable_cf_options.level0_stop_writes_trigger) {
431 432
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES, 1);
433
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
434
          "[%s] Stopping writes because we have %d level-0 files",
S
sdong 已提交
435
          name_.c_str(), vstorage->l0_delay_trigger_count());
436
    } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
S
sdong 已提交
437
               vstorage->l0_delay_trigger_count() >=
438
                   mutable_cf_options.level0_slowdown_writes_trigger) {
S
sdong 已提交
439
      uint64_t slowdown =
S
sdong 已提交
440
          SlowdownAmount(vstorage->l0_delay_trigger_count(),
S
sdong 已提交
441 442
                         mutable_cf_options.level0_slowdown_writes_trigger,
                         mutable_cf_options.level0_stop_writes_trigger);
443 444
      write_controller_token_ = write_controller->GetDelayToken(slowdown);
      internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, slowdown);
445
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
446 447
          "[%s] Stalling writes because we have %d level-0 files (%" PRIu64
          "us)",
S
sdong 已提交
448
          name_.c_str(), vstorage->l0_delay_trigger_count(), slowdown);
449 450
    } else if (mutable_cf_options.hard_rate_limit > 1.0 &&
               score > mutable_cf_options.hard_rate_limit) {
451 452 453
      uint64_t kHardLimitSlowdown = 1000;
      write_controller_token_ =
          write_controller->GetDelayToken(kHardLimitSlowdown);
454
      internal_stats_->RecordLevelNSlowdown(max_level, false);
455
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
456 457 458
          "[%s] Stalling writes because we hit hard limit on level %d. "
          "(%" PRIu64 "us)",
          name_.c_str(), max_level, kHardLimitSlowdown);
459 460 461 462 463
    } else if (mutable_cf_options.soft_rate_limit > 0.0 &&
               score > mutable_cf_options.soft_rate_limit) {
      uint64_t slowdown = SlowdownAmount(score,
          mutable_cf_options.soft_rate_limit,
          mutable_cf_options.hard_rate_limit);
464
      write_controller_token_ = write_controller->GetDelayToken(slowdown);
465
      internal_stats_->RecordLevelNSlowdown(max_level, true);
466
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
467 468 469 470 471 472
          "[%s] Stalling writes because we hit soft limit on level %d (%" PRIu64
          "us)",
          name_.c_str(), max_level, slowdown);
    } else {
      write_controller_token_.reset();
    }
473 474 475
  }
}

L
Lei Jin 已提交
476
// Returns the EnvOptions shared by all column families of the owning set.
const EnvOptions* ColumnFamilyData::soptions() const {
  return &(column_family_set_->env_options_);
}

I
Igor Canadi 已提交
480 481 482
// Stores `current_version` as this column family's current Version. Only the
// pointer is recorded here; no reference is taken.
void ColumnFamilyData::SetCurrent(Version* current_version) {
  this->current_ = current_version;
}
483

484 485 486 487
// Delegates to VersionSet::GetNumLiveVersions() using this column family's
// dummy_versions_ list head.
uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  const auto list_head = dummy_versions_;
  return VersionSet::GetNumLiveVersions(list_head);
}

488
// Allocates a new MemTable for this column family using the given mutable
// options. The caller owns the result. Requires a current version.
MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options) {
  assert(current_ != nullptr);
  return new MemTable(internal_comparator_, ioptions_,
                      mutable_cf_options, write_buffer_);
}

void ColumnFamilyData::CreateNewMemtable(
    const MutableCFOptions& mutable_cf_options) {
497 498
  if (mem_ != nullptr) {
    delete mem_->Unref();
499
  }
500
  SetMemtable(ConstructNewMemtable(mutable_cf_options));
501 502 503
  mem_->Ref();
}

504 505 506 507
// Asks the compaction picker whether the current version's storage state
// warrants a compaction.
bool ColumnFamilyData::NeedsCompaction() const {
  auto* const storage = current_->storage_info();
  return compaction_picker_->NeedsCompaction(storage);
}

508 509
// Lets the compaction picker choose a compaction for the current version. If
// one is returned, the current version is attached to it as input version.
// Returns nullptr when there is nothing to compact.
Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
  auto* result = compaction_picker_->PickCompaction(
      GetName(), mutable_options, current_->storage_info(), log_buffer);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

518 519 520 521 522
// Builds a manual compaction of [begin, end] from input_level into
// output_level via the compaction picker. The current version is attached as
// the input version of any returned compaction. Returns nullptr if the picker
// produced nothing.
Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options,
    int input_level, int output_level, uint32_t output_path_id,
    const InternalKey* begin, const InternalKey* end,
    InternalKey** compaction_end) {
  auto* result = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, current_->storage_info(), input_level,
      output_level, output_path_id, begin, end, compaction_end);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

532
// Returns the current SuperVersion with an extra reference owned by the
// caller, who must Unref() it when done. The thread-local cache is used and
// then refilled. If it was scraped in the meantime, the cache's reference is
// dropped here instead; the caller's own reference keeps sv alive.
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
    InstrumentedMutex* db_mutex) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
  sv->Ref();
  if (!ReturnThreadLocalSuperVersion(sv)) {
    sv->Unref();
  }
  return sv;
}

// Takes this thread's cached SuperVersion out of local_sv_, leaving kSVInUse
// in the slot. The cached one is replaced by a freshly referenced
// super_version_ if it is obsolete or stale. The result must be handed back
// with ReturnThreadLocalSuperVersion().
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
    InstrumentedMutex* db_mutex) {
  // The SuperVersion is cached in thread local storage to avoid acquiring
  // mutex when SuperVersion does not change since the last use. When a new
  // SuperVersion is installed, the compaction or flush thread cleans up
  // cached SuperVersion in all existing thread local storage. To avoid
  // acquiring mutex for this operation, we use atomic Swap() on the thread
  // local pointer to guarantee exclusive access. If the thread local pointer
  // is being used while a new SuperVersion is installed, the cached
  // SuperVersion can become stale. In that case, the background thread would
  // have swapped in kSVObsolete. We re-check the value when returning the
  // SuperVersion back to thread local, with an atomic compare and swap.
  // The superversion will need to be released if detected to be stale.
  void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(ptr != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load()) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
    SuperVersion* sv_to_delete = nullptr;

    if (sv && sv->Unref()) {
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
      db_mutex->Lock();
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      sv_to_delete = sv;
    } else {
      db_mutex->Lock();
    }
    sv = super_version_->Ref();
    db_mutex->Unlock();

    // Deleting outside the mutex keeps the critical section short.
    delete sv_to_delete;
  }
  assert(sv != nullptr);
  return sv;
}

// Attempts to put `sv` back into this thread's local_sv_ slot. Returns true on
// success. Returns false if a Scrape() replaced the kSVInUse marker in the
// meantime; then the slot holds kSVObsolete and `sv` was not stored.
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  void* expected = SuperVersion::kSVInUse;
  const bool put_back =
      local_sv_->CompareAndSwap(static_cast<void*>(sv), expected);
  // A failed CAS means the ThreadLocal was scraped after the Swap() in
  // GetThreadLocalSuperVersion(), so the only value it may hold now is the
  // obsolete marker and the SuperVersion held by the caller is stale.
  assert(put_back || expected == SuperVersion::kSVObsolete);
  return put_back;
}

O
Ori Bernstein 已提交
607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632
// Builds a CompactionJobInfo for compaction `c` and passes it to every
// registered EventListener. No-op in ROCKSDB_LITE builds.
void ColumnFamilyData::NotifyOnCompactionCompleted(
    DB* db, Compaction* c, const Status& status) {
#ifndef ROCKSDB_LITE
  // Bind by reference: copying the vector would bump every shared_ptr count.
  const auto& listeners = ioptions()->listeners;
  CompactionJobInfo info;
  info.cf_name = c->column_family_data()->GetName();
  info.status = status;
  info.output_level = c->output_level();
  // NOTE(review): only the files of c->level() are reported as inputs; confirm
  // whether files from the compaction's other input level(s) belong here too.
  for (const auto fmd : *c->inputs(c->level())) {
    info.input_files.push_back(
        TableFileName(options_.db_paths,
                      fmd->fd.GetNumber(),
                      fmd->fd.GetPathId()));
  }
  // Files added by the compaction's edit are its outputs, not its inputs.
  for (const auto& newf : c->edit()->GetNewFiles()) {
    info.output_files.push_back(
        TableFileName(options_.db_paths,
                      newf.second.fd.GetNumber(),
                      newf.second.fd.GetPathId()));
  }
  for (const auto& listener : listeners) {
    listener->OnCompactionCompleted(db, info);
  }
#endif  // ROCKSDB_LITE
}

633 634 635 636
// Tells every registered EventListener that a flush of this column family
// produced `file_path`. The flags report whether write slowdown/stop was
// triggered. No-op in ROCKSDB_LITE builds.
void ColumnFamilyData::NotifyOnFlushCompleted(
    DB* db, const std::string& file_path,
    bool triggered_flush_slowdown,
    bool triggered_flush_stop) {
#ifndef ROCKSDB_LITE
  // Full memtables are first flushed into path 0, so file_path is expected to
  // live there.
  const auto& listeners = ioptions()->listeners;
  for (const auto& listener : listeners) {
    listener->OnFlushCompleted(
        db, GetName(), file_path,
        triggered_flush_slowdown, triggered_flush_stop);
  }
#endif  // ROCKSDB_LITE
}

649
// Installs `new_superversion` using this column family's current mutable
// options. The DB mutex must be held. Returns the previous SuperVersion if
// it needs to be deleted by the caller, else nullptr.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  return InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_);
}

// Makes `new_superversion` (built from mem_, imm_ and current_) the active
// SuperVersion and bumps super_version_number_. Invalidates every
// thread-local cached SuperVersion and recomputes write stall conditions. If
// this dropped the last reference on the previous SuperVersion, that one is
// cleaned up and returned so the caller can delete it outside the mutex;
// otherwise nullptr is returned.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(mem_, imm_.current(), current_);
  SuperVersion* old_superversion = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;
  // Reset SuperVersions cached in thread local storage
  ResetThreadLocalSuperVersions();

  RecalculateWriteStallConditions(mutable_cf_options);

  if (old_superversion != nullptr && old_superversion->Unref()) {
    old_superversion->Cleanup();
    return old_superversion;  // will let caller delete outside of mutex
  }
  return nullptr;
}

677 678
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
679
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
680 681
  for (auto ptr : sv_ptrs) {
    assert(ptr);
682 683 684
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
685 686 687 688 689 690 691 692
    auto sv = static_cast<SuperVersion*>(ptr);
    if (sv->Unref()) {
      sv->Cleanup();
      delete sv;
    }
  }
}

I
Igor Canadi 已提交
693
#ifndef ROCKSDB_LITE
694
Status ColumnFamilyData::SetOptions(
695 696
      const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions new_mutable_cf_options;
697 698 699
  Status s = GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
                                          &new_mutable_cf_options);
  if (s.ok()) {
700
    mutable_cf_options_ = new_mutable_cf_options;
701
    mutable_cf_options_.RefreshDerivedOptions(ioptions_);
702
  }
703
  return s;
704
}
I
Igor Canadi 已提交
705
#endif  // ROCKSDB_LITE
706

I
Igor Canadi 已提交
707
// Creates an empty set. dummy_cfd_ is a placeholder column family (no
// versions, no owning set) that serves as the head of the circular
// doubly-linked list of column families. It starts out linked to itself.
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const DBOptions* db_options,
                                 const EnvOptions& env_options,
                                 Cache* table_cache,
                                 WriteBuffer* write_buffer,
                                 WriteController* write_controller)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
                                      ColumnFamilyOptions(), db_options,
                                      env_options, nullptr)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      env_options_(env_options),
      table_cache_(table_cache),
      write_buffer_(write_buffer),
      write_controller_(write_controller) {
  // initialize linked list
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}
I
Igor Canadi 已提交
728 729

// Drops the set's reference on every remaining column family and deletes it.
// The dummy list head is released last.
ColumnFamilySet::~ColumnFamilySet() {
  while (!column_family_data_.empty()) {
    // cfd destructor will delete itself from column_family_data_
    auto cfd = column_family_data_.begin()->second;
    cfd->Unref();
    delete cfd;
  }
  dummy_cfd_->Unref();
  delete dummy_cfd_;
}

// Returns the default column family (id 0), cached when it was created.
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  assert(default_cfd_cache_ != nullptr);
  return default_cfd_cache_;
}

// Looks up a column family by id; yields nullptr when the id is unknown.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  const auto found = column_family_data_.find(id);
  return found == column_family_data_.end() ? nullptr : found->second;
}

754 755 756
// Looks up a column family by name; returns nullptr when the name is unknown.
// A known name must always map to an id present in column_family_data_.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  auto cfd_iter = column_families_.find(name);
  if (cfd_iter != column_families_.end()) {
    auto cfd = GetColumnFamily(cfd_iter->second);
    assert(cfd != nullptr);
    return cfd;
  } else {
    return nullptr;
  }
}

// Reserves and returns the next unused column family id (one past the
// current maximum).
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  max_column_family_ += 1;
  return max_column_family_;
}

770 771 772 773 774 775
// Returns the largest column family id seen so far.
uint32_t ColumnFamilySet::GetMaxColumnFamily() {
  return max_column_family_;
}

// Raises the tracked maximum id to `new_max_column_family` if it is larger;
// never lowers it.
void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  if (new_max_column_family > max_column_family_) {
    max_column_family_ = new_max_column_family;
  }
}

776 777 778 779
// Number of column families registered by name (the dummy head is not
// counted because it is never inserted into column_families_).
size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  const size_t count = column_families_.size();
  return count;
}

I
Igor Canadi 已提交
780
// under a DB mutex AND write thread
I
Igor Canadi 已提交
781 782 783 784 785
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd =
786 787 788
      new ColumnFamilyData(id, name, dummy_versions, table_cache_,
                           write_buffer_, options, db_options_,
                           env_options_, this);
789
  column_families_.insert({name, id});
I
Igor Canadi 已提交
790 791
  column_family_data_.insert({id, new_cfd});
  max_column_family_ = std::max(max_column_family_, id);
792
  // add to linked list
793 794 795 796 797
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
I
Igor Canadi 已提交
798 799 800
  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
I
Igor Canadi 已提交
801 802 803
  return new_cfd;
}

804 805 806 807
// REQUIRES: DB mutex held
// Deletes every column family in the linked list whose reference count has
// dropped to zero. Candidates are collected first because each destructor
// unlinks itself from the list being walked.
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

I
Igor Canadi 已提交
818
// under a DB mutex AND from a write thread
I
Igor Canadi 已提交
819
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
820
  auto cfd_iter = column_family_data_.find(cfd->GetID());
821 822
  assert(cfd_iter != column_family_data_.end());
  column_family_data_.erase(cfd_iter);
823
  column_families_.erase(cfd->GetName());
I
Igor Canadi 已提交
824 825
}

I
Igor Canadi 已提交
826
// under a DB mutex OR from a write thread
827
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
I
Igor Canadi 已提交
828 829 830 831 832 833
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
834
  handle_.SetCFD(current_);
835 836
  return current_ != nullptr;
}
837

838 839 840 841 842 843 844 845 846 847
// Log number of the column family selected by the last successful Seek().
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  const uint64_t log_number = current_->GetLogNumber();
  return log_number;
}

// Active memtable of the column family selected by the last successful Seek().
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  MemTable* const active = current_->mem();
  return active;
}

848
// Returns the handle bound (by Seek()) to the currently selected column
// family. A column family must be selected.
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  return &handle_;
}

I
Igor Canadi 已提交
853 854 855 856 857 858 859
// If a column family is selected and its active memtable reports that a flush
// should be scheduled, hands the column family to the flush scheduler and
// marks the memtable as flush-scheduled.
void ColumnFamilyMemTablesImpl::CheckMemtableFull() {
  if (current_ == nullptr) {
    return;
  }
  if (!current_->mem()->ShouldScheduleFlush()) {
    return;
  }
  flush_scheduler_->ScheduleFlush(current_);
  current_->mem()->MarkFlushScheduled();
}

860 861 862 863 864 865 866 867 868
// Returns the id of the column family behind `column_family`, or 0 (the
// default column family's id) when the handle is null.
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return 0;
  }
  // ColumnFamilyHandleImpl derives from ColumnFamilyHandle, so a checked-at-
  // compile-time static_cast is the correct downcast (not reinterpret_cast).
  auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
  return cfh->GetID();
}

869 870 871 872 873 874 875 876 877
// Returns the user comparator of the column family behind `column_family`,
// or nullptr when the handle is null.
const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family != nullptr) {
    // Downcast along the inheritance hierarchy: static_cast, not
    // reinterpret_cast.
    auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
    return cfh->user_comparator();
  }
  return nullptr;
}

I
Igor Canadi 已提交
878
}  // namespace rocksdb