column_family.cc 44.9 KB
Newer Older
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
S
Siying Dong 已提交
2 3 4
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
5 6 7 8 9
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

I
Igor Canadi 已提交
10
#include "db/column_family.h"
11

12 13 14 15 16
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
17 18 19
#include <vector>
#include <string>
#include <algorithm>
I
Igor Canadi 已提交
20
#include <limits>
21

I
Igor Canadi 已提交
22
#include "db/compaction_picker.h"
S
Siying Dong 已提交
23
#include "db/compaction_picker_universal.h"
24
#include "db/db_impl.h"
25
#include "db/internal_stats.h"
26
#include "db/job_context.h"
27
#include "db/table_properties_collector.h"
I
Igor Canadi 已提交
28
#include "db/version_set.h"
29
#include "db/write_controller.h"
30
#include "memtable/hash_skiplist_rep.h"
31 32
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
33
#include "table/block_based_table_factory.h"
34
#include "util/autovector.h"
35
#include "util/compression.h"
I
Igor Canadi 已提交
36 37 38

namespace rocksdb {

I
Igor Canadi 已提交
39
// Handle wrapper over a ColumnFamilyData. Takes a reference on the CFD (when
// non-null) so it stays alive for as long as the caller holds this handle;
// the reference is dropped in the destructor under the DB mutex.
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ != nullptr) {
    cfd_->Ref();
  }
}

// Releases the handle's reference on the column family. If that was the last
// reference, the CFD is deleted and any files that became obsolete as a
// result are collected (under the DB mutex) and purged (outside the mutex).
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ != nullptr) {
#ifndef ROCKSDB_LITE
    // Notify listeners before we start tearing anything down.
    for (auto& listener : cfd_->ioptions()->listeners) {
      listener->OnColumnFamilyHandleDeletionStarted(this);
    }
#endif  // ROCKSDB_LITE
    // Job id == 0 means that this is not our background process, but rather
    // user thread
    JobContext job_context(0);
    mutex_->Lock();
    if (cfd_->Unref()) {
      delete cfd_;
    }
    // Collect obsolete files while still holding the mutex ...
    db_->FindObsoleteFiles(&job_context, false, true);
    mutex_->Unlock();
    // ... but do the actual (potentially slow) deletion without it.
    if (job_context.HaveSomethingToDelete()) {
      db_->PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
  }
}

70 71
// Returns the numeric ID of the wrapped column family.
uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }

72 73 74 75
// Returns the name of the wrapped column family.
const std::string& ColumnFamilyHandleImpl::GetName() const {
  return cfd()->GetName();
}

76 77 78 79
// Fills `desc` with the column family's name and its current (latest) full
// options. Not available in ROCKSDB_LITE builds.
Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
#ifndef ROCKSDB_LITE
  // accessing mutable cf-options requires db mutex.
  InstrumentedMutexLock l(mutex_);
  *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
  return Status::OK();
#else
  return Status::NotSupported();
#endif  // !ROCKSDB_LITE
}

87
// Returns the user key comparator configured for this column family.
const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
  return cfd()->user_comparator();
}

91
void GetIntTblPropCollectorFactory(
92
    const ImmutableCFOptions& ioptions,
93 94
    std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
        int_tbl_prop_collector_factories) {
95 96
  auto& collector_factories = ioptions.table_properties_collector_factories;
  for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
97 98 99 100 101 102 103 104 105 106
       ++i) {
    assert(collector_factories[i]);
    int_tbl_prop_collector_factories->emplace_back(
        new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
  }
  // Add collector to collect internal key statistics
  int_tbl_prop_collector_factories->emplace_back(
      new InternalKeyPropertiesCollectorFactory);
}

107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
// Verifies that every compression algorithm the column family is configured
// to use is actually compiled into this binary, and that the zstd dictionary
// trainer settings are self-consistent.
Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
  if (!cf_options.compression_per_level.empty()) {
    // Per-level settings take precedence; validate each level's choice.
    for (const auto per_level_type : cf_options.compression_per_level) {
      if (!CompressionTypeSupported(per_level_type)) {
        return Status::InvalidArgument(
            "Compression type " + CompressionTypeToString(per_level_type) +
            " is not linked with the binary.");
      }
    }
  } else if (!CompressionTypeSupported(cf_options.compression)) {
    return Status::InvalidArgument(
        "Compression type " + CompressionTypeToString(cf_options.compression) +
        " is not linked with the binary.");
  }
  if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
    if (!CompressionTypeSupported(CompressionType::kZSTD)) {
      // Dictionary trainer is available since v0.6.1, but ZSTD was marked
      // stable only since v0.8.0. For now we enable the feature in stable
      // versions only.
      return Status::InvalidArgument(
          "zstd dictionary trainer cannot be used because " +
          CompressionTypeToString(CompressionType::kZSTD) +
          " is not linked with the binary.");
    }
    // Training a dictionary makes no sense with a zero dictionary size cap.
    if (cf_options.compression_opts.max_dict_bytes == 0) {
      return Status::InvalidArgument(
          "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
          "should be nonzero if we're using zstd's dictionary generator.");
    }
  }
  return Status::OK();
}

145 146 147 148 149 150
// Validates that this column family's options are compatible with
// allow_concurrent_memtable_write.
//
// Returns InvalidArgument if in-place updates are enabled (they cannot be
// performed concurrently) or if the configured memtable implementation does
// not support concurrent inserts; OK otherwise.
Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
  if (cf_options.inplace_update_support) {
    return Status::InvalidArgument(
        "In-place memtable updates (inplace_update_support) is not compatible "
        "with concurrent writes (allow_concurrent_memtable_write)");
  }
  if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
    return Status::InvalidArgument(
        "Memtable doesn't support concurrent writes "
        "(allow_concurrent_memtable_write)");
  }
  return Status::OK();
}

158
// Returns a copy of `src` with out-of-range or mutually inconsistent values
// clamped/adjusted so the rest of the system can rely on its invariants
// (e.g. level0_stop >= level0_slowdown >= level0_file_num_compaction trigger).
// Never fails; questionable inputs are logged and corrected.
ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  // Cap write_buffer_size at 4GB-1 on 32-bit builds, 64GB on 64-bit builds;
  // floor is 64KB.
  size_t clamp_max = std::conditional<
      sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
      std::integral_constant<uint64_t, 64ull << 30>>::type::value;
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // calculate a proper value from write_buffer_size;
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 8;

    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
  }
  // We can never merge more memtables than we are allowed to keep in memory.
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }

  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }

  // allow_ingest_behind reserves the bottommost level, so universal
  // compaction needs at least three levels in that mode.
  if (result.compaction_style == kCompactionStyleUniversal &&
      db_options.allow_ingest_behind && result.num_levels < 3) {
    result.num_levels = 3;
  }

  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  if (result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  // bloom filter size shouldn't exceed 1/4 of memtable size.
  if (result.memtable_prefix_bloom_size_ratio > 0.25) {
    result.memtable_prefix_bloom_size_ratio = 0.25;
  } else if (result.memtable_prefix_bloom_size_ratio < 0) {
    result.memtable_prefix_bloom_size_ratio = 0;
  }

  // Hash-based memtable implementations are only usable with a prefix
  // extractor; without one, fall back to the default skip list.
  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  if (result.max_bytes_for_level_multiplier <= 0) {
    result.max_bytes_for_level_multiplier = 1;
  }

  if (result.level0_file_num_compaction_trigger == 0) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "level0_file_num_compaction_trigger cannot be 0");
    result.level0_file_num_compaction_trigger = 1;
  }

  // Enforce stop >= slowdown >= compaction trigger by raising the out-of-order
  // values; both the violation and the adjusted values are logged.
  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "This condition must be satisfied: "
                   "level0_stop_writes_trigger(%d) >= "
                   "level0_slowdown_writes_trigger(%d) >= "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "Adjust the value to "
                   "level0_stop_writes_trigger(%d)"
                   "level0_slowdown_writes_trigger(%d)"
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
  }

  // The soft pending-compaction-bytes limit defaults to (and never exceeds)
  // the hard limit.
  if (result.soft_pending_compaction_bytes_limit == 0) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
             result.soft_pending_compaction_bytes_limit >
                 result.hard_pending_compaction_bytes_limit) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  }

  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel ||
        db_options.db_paths.size() > 1U) {
      // 1. level_compaction_dynamic_level_bytes only makes sense for
      //    level-based compaction.
      // 2. we don't yet know how to make both of this feature and multiple
      //    DB path work.
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

  if (result.max_compaction_bytes == 0) {
    result.max_compaction_bytes = result.target_file_size_base * 25;
  }

  return result;
}

294 295 296
// Sentinel pointer values for SuperVersion slots: kSVInUse points at a dummy
// int (distinct from any real SuperVersion); kSVObsolete is null.
int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;
297

298 299 300 301 302 303 304 305 306 307 308 309 310
// Frees the memtables that Cleanup() (and MemTableListVersion::Unref)
// queued into to_delete.
SuperVersion::~SuperVersion() {
  for (auto td : to_delete) {
    delete td;
  }
}

// Increments the reference count and returns `this` for convenient chaining.
SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

// Drops one reference. Returns true iff the caller released the last
// reference; the caller is then responsible for Cleanup() and deletion
// (see SuperVersionUnrefHandle and ~ColumnFamilyData).
bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
  uint32_t previous_refs = refs.fetch_sub(1);
  assert(previous_refs > 0);
  return previous_refs == 1;
}

// Releases the references this SuperVersion holds on the memtables and the
// current Version. Must only run once all references are gone; actual
// deletion of queued memtables is deferred to ~SuperVersion().
void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    // The mutable memtable is going away; subtract its footprint from the
    // immutable-list memory accounting before queuing it for deletion.
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
    assert(*memory_usage >= m->ApproximateMemoryUsage());
    *memory_usage -= m->ApproximateMemoryUsage();
    to_delete.push_back(m);
  }
  current->Unref();
}

// Installs the (memtable, immutable memtable list, version) triple, taking a
// reference on each, and initializes this SuperVersion's own refcount to 1.
void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                        Version* new_current) {
  mem = new_mem;
  imm = new_imm;
  current = new_current;
  mem->Ref();
  imm->Ref();
  current->Ref();
  refs.store(1, std::memory_order_relaxed);
}

340 341
namespace {
// ThreadLocalPtr cleanup callback: drops the thread-cached SuperVersion
// reference, performing full cleanup if it was the last one.
void SuperVersionUnrefHandle(void* ptr) {
  // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
  // destroyed. When former happens, the thread shouldn't see kSVInUse.
  // When latter happens, we are in ~ColumnFamilyData(), no get should happen as
  // well.
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv->Unref()) {
    // Last reference: Cleanup() must run under the DB mutex.
    sv->db_mutex->Lock();
    sv->Cleanup();
    sv->db_mutex->Unlock();
    delete sv;
  }
}
}  // anonymous namespace

356 357
// Constructs the per-column-family state. A null `_dummy_versions` marks a
// dummy CFD, for which stats, table cache and compaction picker are not set
// up. Options are sanitized once here (initial_cf_options_) and used to seed
// both the immutable and mutable option sets.
ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBufferManager* write_buffer_manager,
    const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
    const EnvOptions& env_options, ColumnFamilySet* column_family_set)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      initialized_(false),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      initial_cf_options_(SanitizeOptions(db_options, cf_options)),
      ioptions_(db_options, initial_cf_options_),
      mutable_cf_options_(initial_cf_options_),
      is_delete_range_supported_(
          cf_options.table_factory->IsDeleteRangeSupported()),
      write_buffer_manager_(write_buffer_manager),
      mem_(nullptr),
      imm_(ioptions_.min_write_buffer_number_to_merge,
           ioptions_.max_write_buffer_number_to_maintain),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      column_family_set_(column_family_set),
      pending_flush_(false),
      pending_compaction_(false),
      prev_compaction_needed_bytes_(0),
      allow_2pc_(db_options.allow_2pc),
      last_memtable_id_(0) {
  // The creator holds the first reference (refs_ starts at 0 above).
  Ref();

  // Convert user defined table properties collector factories to internal ones.
  GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);

  // if _dummy_versions is nullptr, then this is a dummy column family.
  if (_dummy_versions != nullptr) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options.env, this));
    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));
    // Pick the compaction picker matching the configured compaction style;
    // unknown styles fall back to level compaction with an error log.
    if (ioptions_.compaction_style == kCompactionStyleLevel) {
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
#ifndef ROCKSDB_LITE
    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(
          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
      compaction_picker_.reset(
          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
      compaction_picker_.reset(new NullCompactionPicker(
          ioptions_, &internal_comparator_));
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "Column family %s does not use any background compaction. "
                     "Compactions can only be done via CompactFiles\n",
                     GetName().c_str());
#endif  // !ROCKSDB_LITE
    } else {
      ROCKS_LOG_ERROR(ioptions_.info_log,
                      "Unable to recognize the specified compaction style %d. "
                      "Column family %s will use kCompactionStyleLevel.\n",
                      ioptions_.compaction_style, GetName().c_str());
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
    }

    // Only dump full options for the first few column families to keep the
    // info log readable.
    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      ROCKS_LOG_INFO(ioptions_.info_log,
                     "--------------- Options for column family [%s]:\n",
                     name.c_str());
      initial_cf_options_.Dump(ioptions_.info_log);
    } else {
      ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}
I
Igor Canadi 已提交
439

440
// DB mutex held
// Tears down all per-CF state. Requires that all external references are
// gone (refs_ == 0) and that this CFD is not queued for flush or compaction.
ColumnFamilyData::~ColumnFamilyData() {
  assert(refs_.load(std::memory_order_relaxed) == 0);
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from column family set
    // If column_family_set_ == nullptr, this is dummy CFD and not in
    // ColumnFamilySet
    column_family_set_->RemoveColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

  // It would be wrong if this ColumnFamilyData is in flush_queue_ or
  // compaction_queue_ and we destroyed it
  assert(!pending_flush_);
  assert(!pending_compaction_);

  if (super_version_ != nullptr) {
    // Release SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since unref handler can lock mutex.
    super_version_->db_mutex->Unlock();
    local_sv_.reset();
    super_version_->db_mutex->Lock();

    bool is_last_reference __attribute__((__unused__));
    is_last_reference = super_version_->Unref();
    assert(is_last_reference);
    super_version_->Cleanup();
    delete super_version_;
    super_version_ = nullptr;
  }

  if (dummy_versions_ != nullptr) {
    // List must be empty
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((__unused__));
    deleted = dummy_versions_->Unref();
    assert(deleted);
  }

  // Release and free the mutable memtable and all immutable memtables.
  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  autovector<MemTable*> to_delete;
  imm_.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }
}

I
Igor Canadi 已提交
498 499 500 501 502 503 504 505 506 507
// Marks this column family as dropped and detaches it from the
// ColumnFamilySet. The default column family (id 0) can never be dropped.
void ColumnFamilyData::SetDropped() {
  // can't drop default CF
  assert(id_ != 0);
  dropped_ = true;
  // A dropped CF no longer participates in write stalling/throttling.
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}

508
// Reassembles full ColumnFamilyOptions from the sanitized initial options
// and the current mutable options. Callers hold the DB mutex (see
// ColumnFamilyHandleImpl::GetDescriptor).
ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
  return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
}

512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530
// Returns the number of the oldest WAL this column family still needs.
// Normally that is its own log number; under two-phase commit it is lowered
// to the oldest log that still contains a prepared (uncommitted) section in
// either the mutable or the immutable memtables.
uint64_t ColumnFamilyData::OldestLogToKeep() {
  uint64_t min_log = GetLogNumber();

  if (allow_2pc_) {
    const uint64_t imm_prep_log = imm()->GetMinLogContainingPrepSection();
    const uint64_t mem_prep_log = mem()->GetMinLogContainingPrepSection();

    // A value of 0 means "no prepared section" and is ignored.
    if (imm_prep_log > 0 && imm_prep_log < min_log) {
      min_log = imm_prep_log;
    }
    if (mem_prep_log > 0 && mem_prep_log < min_log) {
      min_log = mem_prep_log;
    }
  }

  return min_log;
}

S
Siying Dong 已提交
531 532 533 534
// Multipliers applied to the delayed write rate by SetupDelay() below:
// shrink the rate while compaction debt grows, restore it as debt is paid.
const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
// Harsher cut used when the DB is near or at a write-stop condition.
const double kNearStopSlowdownRatio = 0.6;
const double kDelayRecoverSlowdownRatio = 1.4;
535 536

namespace {
S
Siying Dong 已提交
537
// If penalize_stop is true, we further reduce slowdown rate.
538
std::unique_ptr<WriteControllerToken> SetupDelay(
S
Siying Dong 已提交
539 540
    WriteController* write_controller, uint64_t compaction_needed_bytes,
    uint64_t prev_compaction_need_bytes, bool penalize_stop,
541
    bool auto_comapctions_disabled) {
S
Siying Dong 已提交
542
  const uint64_t kMinWriteRate = 16 * 1024u;  // Minimum write rate 16KB/s.
543

544
  uint64_t max_write_rate = write_controller->max_delayed_write_rate();
545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566
  uint64_t write_rate = write_controller->delayed_write_rate();

  if (auto_comapctions_disabled) {
    // When auto compaction is disabled, always use the value user gave.
    write_rate = max_write_rate;
  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
    // If user gives rate less than kMinWriteRate, don't adjust it.
    //
    // If already delayed, need to adjust based on previous compaction debt.
    // When there are two or more column families require delay, we always
    // increase or reduce write rate based on information for one single
    // column family. It is likely to be OK but we can improve if there is a
    // problem.
    // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes
    // is only available in level-based compaction
    //
    // If the compaction debt stays the same as previously, we also further slow
    // down. It usually means a mem table is full. It's mainly for the case
    // where both of flush and compaction are much slower than the speed we
    // insert to mem tables, so we need to actively slow down before we get
    // feedback signal from compaction and flushes to avoid the full stop
    // because of hitting the max write buffer number.
S
Siying Dong 已提交
567 568 569 570
    //
    // If DB just falled into the stop condition, we need to further reduce
    // the write rate to avoid the stop condition.
    if (penalize_stop) {
H
hyunwoo 已提交
571
      // Penalize the near stop or stop condition by more aggressive slowdown.
S
Siying Dong 已提交
572 573 574 575 576
      // This is to provide the long term slowdown increase signal.
      // The penalty is more than the reward of recovering to the normal
      // condition.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kNearStopSlowdownRatio);
577 578 579
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
S
Siying Dong 已提交
580 581 582 583 584 585 586 587
    } else if (prev_compaction_need_bytes > 0 &&
               prev_compaction_need_bytes <= compaction_needed_bytes) {
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kIncSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
588 589 590
      // We are speeding up by ratio of kSlowdownRatio when we have paid
      // compaction debt. But we'll never speed up to faster than the write rate
      // given by users.
S
Siying Dong 已提交
591
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
S
Siying Dong 已提交
592
                                         kDecSlowdownRatio);
593 594 595 596 597 598 599
      if (write_rate > max_write_rate) {
        write_rate = max_write_rate;
      }
    }
  }
  return write_controller->GetDelayToken(write_rate);
}
600 601 602 603 604 605

// Returns the L0 file count at which compaction should be sped up (extra
// compaction pressure applied): 1/4 of the way from the compaction trigger
// to the slowdown trigger, or twice the compaction trigger, whichever is
// smaller. A negative compaction trigger disables the speedup (returns
// INT_MAX). Computation is done in int64_t and clamped to int32 range to
// avoid overflow.
int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
                                    int level0_slowdown_writes_trigger) {
  // SanitizeOptions() ensures it.
  assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);

  if (level0_file_num_compaction_trigger < 0) {
    return std::numeric_limits<int>::max();
  }

  const int64_t twice_level0_trigger =
      static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;

  const int64_t one_fourth_trigger_slowdown =
      static_cast<int64_t>(level0_file_num_compaction_trigger) +
      ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
       4);

  assert(twice_level0_trigger >= 0);
  assert(one_fourth_trigger_slowdown >= 0);

  // 1/4 of the way between L0 compaction trigger threshold and slowdown
  // condition.
  // Or twice as compaction trigger, if it is smaller.
  int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
  if (res >= std::numeric_limits<int32_t>::max()) {
    return std::numeric_limits<int32_t>::max();
  } else {
    // res fits in int
    return static_cast<int>(res);
  }
}
632 633
}  // namespace

634
WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
635
      const MutableCFOptions& mutable_cf_options) {
636
  auto write_stall_condition = WriteStallCondition::kNormal;
637
  if (current_ != nullptr) {
S
sdong 已提交
638
    auto* vstorage = current_->storage_info();
639
    auto write_controller = column_family_set_->write_controller_;
640 641
    uint64_t compaction_needed_bytes =
        vstorage->estimated_compaction_needed_bytes();
642

S
Siying Dong 已提交
643 644 645
    bool was_stopped = write_controller->IsStopped();
    bool needed_delay = write_controller->NeedsDelay();

646
    if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) {
647
      write_controller_token_ = write_controller->GetStopToken();
648
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
649
      write_stall_condition = WriteStallCondition::kStopped;
650 651
      ROCKS_LOG_WARN(
          ioptions_.info_log,
652
          "[%s] Stopping writes because we have %d immutable memtables "
L
Lei Jin 已提交
653
          "(waiting for flush), max_write_buffer_number is set to %d",
654
          name_.c_str(), imm()->NumNotFlushed(),
L
Lei Jin 已提交
655
          mutable_cf_options.max_write_buffer_number);
656 657 658
    } else if (!mutable_cf_options.disable_auto_compactions &&
               vstorage->l0_delay_trigger_count() >=
                   mutable_cf_options.level0_stop_writes_trigger) {
659
      write_controller_token_ = write_controller->GetStopToken();
660
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
661
      write_stall_condition = WriteStallCondition::kStopped;
662 663
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
664
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
665
      }
666 667 668
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stopping writes because we have %d level-0 files",
                     name_.c_str(), vstorage->l0_delay_trigger_count());
669 670
    } else if (!mutable_cf_options.disable_auto_compactions &&
               mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
671
               compaction_needed_bytes >=
672 673 674
                   mutable_cf_options.hard_pending_compaction_bytes_limit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(
675
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
676
      write_stall_condition = WriteStallCondition::kStopped;
677 678
      ROCKS_LOG_WARN(
          ioptions_.info_log,
679 680
          "[%s] Stopping writes because of estimated pending compaction "
          "bytes %" PRIu64,
681
          name_.c_str(), compaction_needed_bytes);
682 683 684 685
    } else if (mutable_cf_options.max_write_buffer_number > 3 &&
               imm()->NumNotFlushed() >=
                   mutable_cf_options.max_write_buffer_number - 1) {
      write_controller_token_ =
686
          SetupDelay(write_controller, compaction_needed_bytes,
S
Siying Dong 已提交
687
                     prev_compaction_needed_bytes_, was_stopped,
688
                     mutable_cf_options.disable_auto_compactions);
689
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
690
      write_stall_condition = WriteStallCondition::kDelayed;
691 692
      ROCKS_LOG_WARN(
          ioptions_.info_log,
693 694 695 696 697 698
          "[%s] Stalling writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d "
          "rate %" PRIu64,
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number,
          write_controller->delayed_write_rate());
699 700
    } else if (!mutable_cf_options.disable_auto_compactions &&
               mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
S
sdong 已提交
701
               vstorage->l0_delay_trigger_count() >=
702
                   mutable_cf_options.level0_slowdown_writes_trigger) {
S
Siying Dong 已提交
703 704 705
      // L0 is the last two files from stopping.
      bool near_stop = vstorage->l0_delay_trigger_count() >=
                       mutable_cf_options.level0_stop_writes_trigger - 2;
706
      write_controller_token_ =
707
          SetupDelay(write_controller, compaction_needed_bytes,
S
Siying Dong 已提交
708
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
709
                     mutable_cf_options.disable_auto_compactions);
710 711
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
                                  1);
712
      write_stall_condition = WriteStallCondition::kDelayed;
713 714
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
715
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
716
      }
717 718 719 720 721
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stalling writes because we have %d level-0 files "
                     "rate %" PRIu64,
                     name_.c_str(), vstorage->l0_delay_trigger_count(),
                     write_controller->delayed_write_rate());
722 723
    } else if (!mutable_cf_options.disable_auto_compactions &&
               mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
724 725
               vstorage->estimated_compaction_needed_bytes() >=
                   mutable_cf_options.soft_pending_compaction_bytes_limit) {
S
Siying Dong 已提交
726 727 728 729 730 731 732 733 734 735 736
      // If the distance to hard limit is less than 1/4 of the gap between soft
      // and
      // hard bytes limit, we think it is near stop and speed up the slowdown.
      bool near_stop =
          mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
          (compaction_needed_bytes -
           mutable_cf_options.soft_pending_compaction_bytes_limit) >
              3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
                   mutable_cf_options.soft_pending_compaction_bytes_limit) /
                  4;

737
      write_controller_token_ =
738
          SetupDelay(write_controller, compaction_needed_bytes,
S
Siying Dong 已提交
739
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
740
                     mutable_cf_options.disable_auto_compactions);
741
      internal_stats_->AddCFStats(
742
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
743
      write_stall_condition = WriteStallCondition::kDelayed;
744 745
      ROCKS_LOG_WARN(
          ioptions_.info_log,
746
          "[%s] Stalling writes because of estimated pending compaction "
747 748 749
          "bytes %" PRIu64 " rate %" PRIu64,
          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
          write_controller->delayed_write_rate());
S
Siying Dong 已提交
750 751 752 753 754 755 756
    } else {
      if (vstorage->l0_delay_trigger_count() >=
          GetL0ThresholdSpeedupCompaction(
              mutable_cf_options.level0_file_num_compaction_trigger,
              mutable_cf_options.level0_slowdown_writes_trigger)) {
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
757
        ROCKS_LOG_INFO(
758
            ioptions_.info_log,
S
Siying Dong 已提交
759 760 761 762 763 764 765 766 767 768 769 770
            "[%s] Increasing compaction threads because we have %d level-0 "
            "files ",
            name_.c_str(), vstorage->l0_delay_trigger_count());
      } else if (vstorage->estimated_compaction_needed_bytes() >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
        // Increase compaction threads if bytes needed for compaction exceeds
        // 1/4 of threshold for slowing down.
        // If soft pending compaction byte limit is not set, always speed up
        // compaction.
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
771
          ROCKS_LOG_INFO(
772
              ioptions_.info_log,
S
Siying Dong 已提交
773 774 775 776 777 778 779 780 781 782 783 784 785 786 787
              "[%s] Increasing compaction threads because of estimated pending "
              "compaction "
              "bytes %" PRIu64,
              name_.c_str(), vstorage->estimated_compaction_needed_bytes());
        }
      } else {
        write_controller_token_.reset();
      }
      // If the DB recovers from delay conditions, we reward with reducing
      // double the slowdown ratio. This is to balance the long term slowdown
      // increase signal.
      if (needed_delay) {
        uint64_t write_rate = write_controller->delayed_write_rate();
        write_controller->set_delayed_write_rate(static_cast<uint64_t>(
            static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
788 789 790 791 792 793
        // Set the low pri limit to be 1/4 the delayed write rate.
        // Note we don't reset this value even after delay condition is relased.
        // Low-pri rate will continue to apply if there is a compaction
        // pressure.
        write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
                                                                    4);
794
      }
795
    }
796
    prev_compaction_needed_bytes_ = compaction_needed_bytes;
797
  }
798
  return write_stall_condition;
799 800
}

L
Lei Jin 已提交
801
// Returns the EnvOptions shared by all column families of this DB
// (owned by the enclosing ColumnFamilySet).
const EnvOptions* ColumnFamilyData::soptions() const {
  return &column_family_set_->env_options_;
}

I
Igor Canadi 已提交
805 806 807
// Replaces the Version pointer this column family treats as current.
// NOTE(review): no ref-count manipulation happens here — presumably the
// caller manages Version lifetime; confirm against VersionSet usage.
void ColumnFamilyData::SetCurrent(Version* current_version) {
  current_ = current_version;
}
808

809 810 811 812
// Number of live Versions reachable from this column family's dummy
// version list; delegates the walk to VersionSet.
uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  return VersionSet::GetNumLiveVersions(dummy_versions_);
}

813 814 815 816
// Total size of SST files referenced by this column family's versions;
// delegates the accounting to VersionSet.
uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
}

817
// Allocates a fresh MemTable configured with the given mutable options and
// earliest sequence number. Ownership passes to the caller; this column
// family's own memtable pointers are not touched.
MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
                      write_buffer_manager_, earliest_seq, id_);
}

void ColumnFamilyData::CreateNewMemtable(
A
agiardullo 已提交
824
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
825 826
  if (mem_ != nullptr) {
    delete mem_->Unref();
827
  }
A
agiardullo 已提交
828
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
829 830 831
  mem_->Ref();
}

832 833 834 835
// Asks the compaction picker whether the current version's storage state
// warrants scheduling a compaction.
bool ColumnFamilyData::NeedsCompaction() const {
  return compaction_picker_->NeedsCompaction(current_->storage_info());
}

836 837
// Lets the compaction picker choose the next compaction for this column
// family. On success the current version is pinned as the compaction's
// input version. Returns nullptr when nothing was picked.
Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
  Compaction* compaction = compaction_picker_->PickCompaction(
      GetName(), mutable_options, current_->storage_info(), log_buffer);
  if (compaction != nullptr) {
    compaction->SetInputVersion(current_);
  }
  return compaction;
}

846 847 848 849 850 851 852
// True iff the user-key range [smallest_user_key, largest_user_key]
// overlaps a compaction at `level`; delegates to the compaction picker.
bool ColumnFamilyData::RangeOverlapWithCompaction(
    const Slice& smallest_user_key, const Slice& largest_user_key,
    int level) const {
  return compaction_picker_->RangeOverlapWithCompaction(
      smallest_user_key, largest_user_key, level);
}

853
// Sentinel output-level values understood by CompactRange(): compact all
// levels, or compact into the base level.
const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;
855

856
// Builds a manual compaction over [begin, end] via the compaction picker.
// On success the current version is pinned as the compaction's input.
// `compaction_end` and `conflict` are out-parameters filled by the picker.
Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options, int input_level,
    int output_level, uint32_t output_path_id, const InternalKey* begin,
    const InternalKey* end, InternalKey** compaction_end, bool* conflict) {
  Compaction* compaction = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, current_->storage_info(), input_level,
      output_level, output_path_id, begin, end, compaction_end, conflict);
  if (compaction != nullptr) {
    compaction->SetInputVersion(current_);
  }
  return compaction;
}

869
// Returns the thread-local SuperVersion with one extra reference held on
// behalf of the caller, who must Unref() it when done.
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
    InstrumentedMutex* db_mutex) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
  sv->Ref();
  if (!ReturnThreadLocalSuperVersion(sv)) {
    // The slot was scraped, so drop the reference that was taken when the
    // thread-local pointer was originally populated. The explicit Ref()
    // above still keeps the returned SuperVersion alive for the caller.
    sv->Unref();
  }
  return sv;
}

SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
    InstrumentedMutex* db_mutex) {
  // The SuperVersion is cached in thread-local storage so that the mutex
  // is only needed when the SuperVersion actually changed since last use.
  // Swap() atomically takes exclusive ownership of the slot by leaving
  // kSVInUse behind; a concurrent install would have scraped the slot to
  // kSVObsolete, which we detect below and handle by re-reading
  // super_version_ under the mutex.
  void* raw = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant: Scrape() only ever installs kSVObsolete, and the Swap()
  // above is the only writer of kSVInUse before the matching
  // ReturnThreadLocalSuperVersion() call, so we can never observe kSVInUse.
  assert(raw != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(raw);
  if (sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load()) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
    SuperVersion* defunct = nullptr;
    if (sv && sv->Unref()) {
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
      db_mutex->Lock();
      // NOTE: underlying resources held by the superversion (sst files)
      // might not be released until the next background job.
      sv->Cleanup();
      defunct = sv;
    } else {
      db_mutex->Lock();
    }
    sv = super_version_->Ref();
    db_mutex->Unlock();

    delete defunct;
  }
  assert(sv != nullptr);
  return sv;
}

// Puts `sv` back into the thread-local slot. Returns true when the slot
// still held kSVInUse (no scrape happened, SuperVersion still current);
// returns false when a scrape intervened and `sv` is obsolete — the
// caller must then drop the cached reference.
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  void* expected = SuperVersion::kSVInUse;
  if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
    // kSVInUse was untouched since our Swap(), so thread-local storage was
    // not altered and the SuperVersion remains current.
    return true;
  }
  // A scrape happened between the initial Swap() and this CompareAndSwap(),
  // which means the SuperVersion we hold is obsolete.
  assert(expected == SuperVersion::kSVObsolete);
  return false;
}

948 949
// Convenience overload: installs a new SuperVersion using the currently
// active mutable options. REQUIRES: DB mutex held.
void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
}

954 955
void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
956
    const MutableCFOptions& mutable_cf_options) {
957
  SuperVersion* new_superversion = sv_context->new_superversion.release();
958
  new_superversion->db_mutex = db_mutex;
959
  new_superversion->mutable_cf_options = mutable_cf_options;
960 961 962 963
  new_superversion->Init(mem_, imm_.current(), current_);
  SuperVersion* old_superversion = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
964
  super_version_->version_number = super_version_number_;
965 966 967 968
  super_version_->write_stall_condition =
      RecalculateWriteStallConditions(mutable_cf_options);

  if (old_superversion != nullptr) {
969 970 971 972
    if (old_superversion->mutable_cf_options.write_buffer_size !=
        mutable_cf_options.write_buffer_size) {
      mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
    }
973 974 975 976 977 978 979 980 981 982 983 984
    if (old_superversion->write_stall_condition !=
        new_superversion->write_stall_condition) {
      sv_context->PushWriteStallNotification(
          old_superversion->write_stall_condition,
          new_superversion->write_stall_condition, GetName(), ioptions());
    }
    if (old_superversion->Unref()) {
      old_superversion->Cleanup();
      sv_context->superversions_to_free.push_back(old_superversion);
    }
  }

985
  // Reset SuperVersions cached in thread local storage
I
Igor Canadi 已提交
986
  ResetThreadLocalSuperVersions();
I
Igor Canadi 已提交
987 988
}

989 990
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
991
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
992 993
  for (auto ptr : sv_ptrs) {
    assert(ptr);
994 995 996
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
997 998 999 1000 1001 1002 1003 1004
    auto sv = static_cast<SuperVersion*>(ptr);
    if (sv->Unref()) {
      sv->Cleanup();
      delete sv;
    }
  }
}

I
Igor Canadi 已提交
1005
#ifndef ROCKSDB_LITE
1006
// Parses `options_map` on top of the current mutable options. The new
// options are committed (and derived options refreshed) only when parsing
// succeeds; on failure the current options are left untouched.
Status ColumnFamilyData::SetOptions(
      const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions parsed;
  Status s =
      GetMutableOptionsFromStrings(mutable_cf_options_, options_map, &parsed);
  if (s.ok()) {
    mutable_cf_options_ = parsed;
    mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  }
  return s;
}
I
Igor Canadi 已提交
1017
#endif  // ROCKSDB_LITE
1018

S
Stream  
Shaohua Li 已提交
1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036
// REQUIRES: DB mutex held
// Maps an output level to a write-lifetime hint for the env: L0 is medium,
// and lifetime grows with distance above the base level. Only meaningful
// for level-style compaction; otherwise no hint is given.
Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
  if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
    return Env::WLTH_NOT_SET;
  }
  if (level == 0) {
    return Env::WLTH_MEDIUM;
  }
  int base_level = current_->storage_info()->base_level();

  // L1: medium, L2: long, ...
  if (level - base_level >= 2) {
    return Env::WLTH_EXTREME;
  } else if (level < base_level) {
    // Nothing prevents a caller from passing a level below base_level;
    // without this guard the cast below would produce a negative, invalid
    // WriteLifeTimeHint value.
    return Env::WLTH_MEDIUM;
  }
  return static_cast<Env::WriteLifeTimeHint>(
      level - base_level + static_cast<int>(Env::WLTH_MEDIUM));
}

I
Igor Canadi 已提交
1037
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const ImmutableDBOptions* db_options,
                                 const EnvOptions& env_options,
                                 Cache* table_cache,
                                 WriteBufferManager* write_buffer_manager,
                                 WriteController* write_controller)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
                                      ColumnFamilyOptions(), *db_options,
                                      env_options, nullptr)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      env_options_(env_options),
      table_cache_(table_cache),
      write_buffer_manager_(write_buffer_manager),
      write_controller_(write_controller) {
  // The dummy CFD is the sentinel of the circular doubly-linked list of
  // all column families; initially it points only at itself.
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}
I
Igor Canadi 已提交
1058 1059

ColumnFamilySet::~ColumnFamilySet() {
  while (!column_family_data_.empty()) {
    // Each ColumnFamilyData removes itself from column_family_data_ in its
    // destructor, shrinking the map as we go.
    ColumnFamilyData* cfd = column_family_data_.begin()->second;
    bool last_ref __attribute__((__unused__));
    last_ref = cfd->Unref();
    assert(last_ref);
    delete cfd;
  }
  bool dummy_last_ref __attribute__((__unused__));
  dummy_last_ref = dummy_cfd_->Unref();
  assert(dummy_last_ref);
  delete dummy_cfd_;
}

// Returns the cached default column family (ID 0), which must already
// have been created.
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  assert(default_cfd_cache_ != nullptr);
  return default_cfd_cache_;
}

// Looks up a column family by numeric ID; nullptr when absent.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  auto it = column_family_data_.find(id);
  return it != column_family_data_.end() ? it->second : nullptr;
}

1088 1089 1090
// Looks up a column family by name via the name->ID index; nullptr when
// the name is unknown.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  auto it = column_families_.find(name);
  if (it == column_families_.end()) {
    return nullptr;
  }
  // The name index and the ID map are kept in sync, so the second lookup
  // must succeed.
  auto cfd = GetColumnFamily(it->second);
  assert(cfd != nullptr);
  return cfd;
}

// Allocates a fresh column family ID by bumping the running maximum.
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  return ++max_column_family_;
}

1104 1105 1106 1107 1108 1109
// Largest column family ID handed out so far.
uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }

// Raises the max-column-family watermark; never lowers it.
void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  max_column_family_ = std::max(new_max_column_family, max_column_family_);
}

1110 1111 1112 1113
// Number of column families currently registered (by name).
size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  return column_families_.size();
}

I
Igor Canadi 已提交
1114
// under a DB mutex AND write thread
I
Igor Canadi 已提交
1115 1116 1117 1118
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
1119 1120
  ColumnFamilyData* new_cfd = new ColumnFamilyData(
      id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
1121
      *db_options_, env_options_, this);
1122
  column_families_.insert({name, id});
I
Igor Canadi 已提交
1123 1124
  column_family_data_.insert({id, new_cfd});
  max_column_family_ = std::max(max_column_family_, id);
1125
  // add to linked list
1126 1127 1128 1129 1130
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
I
Igor Canadi 已提交
1131 1132 1133
  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
I
Igor Canadi 已提交
1134 1135 1136
  return new_cfd;
}

1137 1138 1139 1140
// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
I
Igor Canadi 已提交
1141
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
1142 1143 1144 1145 1146 1147 1148 1149 1150
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

I
Igor Canadi 已提交
1151
// under a DB mutex AND from a write thread
I
Igor Canadi 已提交
1152
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
1153
  auto cfd_iter = column_family_data_.find(cfd->GetID());
1154 1155
  assert(cfd_iter != column_family_data_.end());
  column_family_data_.erase(cfd_iter);
1156
  column_families_.erase(cfd->GetName());
I
Igor Canadi 已提交
1157 1158
}

I
Igor Canadi 已提交
1159
// under a DB mutex OR from a write thread
1160
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
I
Igor Canadi 已提交
1161 1162 1163 1164 1165 1166
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
1167
  handle_.SetCFD(current_);
1168 1169
  return current_ != nullptr;
}
1170

1171 1172 1173 1174 1175 1176 1177 1178 1179 1180
// Log number of the column family selected by the last successful Seek().
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  return current_->GetLogNumber();
}

// Active memtable of the column family selected by the last Seek().
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  return current_->mem();
}

1181
// Handle wrapping the column family selected by the last Seek(); the
// handle is owned by this object and reused across Seek() calls.
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  return &handle_;
}

1186 1187 1188 1189 1190 1191 1192 1193 1194
// Extracts the numeric ID from a handle; a null handle maps to the
// default column family (ID 0).
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  uint32_t column_family_id = 0;
  if (column_family != nullptr) {
    // This is a downcast within a known class hierarchy, so static_cast is
    // the correct (well-defined) named cast; reinterpret_cast here only
    // obscures intent.
    auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
    column_family_id = cfh->GetID();
  }
  return column_family_id;
}

1195 1196 1197
// User comparator of the given handle, or nullptr for a null handle.
const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return nullptr;
  }
  return column_family->GetComparator();
}

I
Igor Canadi 已提交
1203
}  // namespace rocksdb