//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/column_family.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <vector>
#include <string>
#include <algorithm>
#include <limits>

#include "db/compaction_picker.h"
#include "db/compaction_picker_universal.h"
#include "db/db_impl.h"
#include "db/internal_stats.h"
#include "db/job_context.h"
#include "db/table_properties_collector.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "table/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
#include "util/compression.h"

namespace rocksdb {

ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ != nullptr) {
    cfd_->Ref();
  }
}

ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ != nullptr) {
#ifndef ROCKSDB_LITE
    for (auto& listener : cfd_->ioptions()->listeners) {
      listener->OnColumnFamilyHandleDeletionStarted(this);
    }
#endif  // ROCKSDB_LITE
    // Job id == 0 means that this is not our background process, but rather
    // a user thread.
    JobContext job_context(0);
    mutex_->Lock();
    if (cfd_->Unref()) {
      delete cfd_;
    }
    db_->FindObsoleteFiles(&job_context, false, true);
    mutex_->Unlock();
    if (job_context.HaveSomethingToDelete()) {
      db_->PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
  }
}

uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }

const std::string& ColumnFamilyHandleImpl::GetName() const {
  return cfd()->GetName();
}

Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
#ifndef ROCKSDB_LITE
  // Accessing mutable cf-options requires the DB mutex.
  InstrumentedMutexLock l(mutex_);
  *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
  return Status::OK();
#else
  return Status::NotSupported();
#endif  // !ROCKSDB_LITE
}

const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
  return cfd()->user_comparator();
}

void GetIntTblPropCollectorFactory(
    const ImmutableCFOptions& ioptions,
    std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
        int_tbl_prop_collector_factories) {
  auto& collector_factories = ioptions.table_properties_collector_factories;
  for (size_t i = 0; i < collector_factories.size(); ++i) {
    assert(collector_factories[i]);
    int_tbl_prop_collector_factories->emplace_back(
        new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
  }
  // Add a collector to collect internal-key statistics.
  int_tbl_prop_collector_factories->emplace_back(
      new InternalKeyPropertiesCollectorFactory);
}

Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
  if (!cf_options.compression_per_level.empty()) {
    for (size_t level = 0; level < cf_options.compression_per_level.size();
         ++level) {
      if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
        return Status::InvalidArgument(
            "Compression type " +
            CompressionTypeToString(cf_options.compression_per_level[level]) +
            " is not linked with the binary.");
      }
    }
  } else {
    if (!CompressionTypeSupported(cf_options.compression)) {
      return Status::InvalidArgument(
          "Compression type " +
          CompressionTypeToString(cf_options.compression) +
          " is not linked with the binary.");
    }
  }
  if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
    if (!CompressionTypeSupported(CompressionType::kZSTD)) {
      // Dictionary trainer is available since v0.6.1, but ZSTD was marked
      // stable only since v0.8.0. For now we enable the feature in stable
      // versions only.
      return Status::InvalidArgument(
          "zstd dictionary trainer cannot be used because " +
          CompressionTypeToString(CompressionType::kZSTD) +
          " is not linked with the binary.");
    }
    if (cf_options.compression_opts.max_dict_bytes == 0) {
      return Status::InvalidArgument(
          "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
          "should be nonzero if we're using zstd's dictionary generator.");
    }
  }
  return Status::OK();
}

Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
  if (cf_options.inplace_update_support) {
    return Status::InvalidArgument(
        "In-place memtable updates (inplace_update_support) are not "
        "compatible with concurrent writes "
        "(allow_concurrent_memtable_write)");
  }
  if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
    return Status::InvalidArgument(
        "Memtable doesn't support concurrent writes "
        "(allow_concurrent_memtable_write)");
  }
  return Status::OK();
}

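// Clamps and repairs user-supplied options so that internal invariants hold.
// For instance (per the code below): write_buffer_size is clipped to
// [64 KB, 64 GB] (4 GB - 1 on 32-bit builds), level-style compaction gets at
// least two levels, and the level0 trigger thresholds are forced into the
// order stop >= slowdown >= file_num_compaction_trigger.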
ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  size_t clamp_max = std::conditional<
      sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
      std::integral_constant<uint64_t, 64ull << 30>>::type::value;
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
  // If the user sets arena_block_size, we trust them to use that value.
  // Otherwise, calculate a proper value from write_buffer_size.
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 8;

    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }

  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }

  if (result.compaction_style == kCompactionStyleUniversal &&
      db_options.allow_ingest_behind && result.num_levels < 3) {
    result.num_levels = 3;
  }

  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  if (result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  // Bloom filter size shouldn't exceed 1/4 of memtable size.
  if (result.memtable_prefix_bloom_size_ratio > 0.25) {
    result.memtable_prefix_bloom_size_ratio = 0.25;
  } else if (result.memtable_prefix_bloom_size_ratio < 0) {
    result.memtable_prefix_bloom_size_ratio = 0;
  }

  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // Since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything.
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  if (result.max_bytes_for_level_multiplier <= 0) {
    result.max_bytes_for_level_multiplier = 1;
  }

  if (result.level0_file_num_compaction_trigger == 0) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "level0_file_num_compaction_trigger cannot be 0");
    result.level0_file_num_compaction_trigger = 1;
  }

  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "This condition must be satisfied: "
                   "level0_stop_writes_trigger(%d) >= "
                   "level0_slowdown_writes_trigger(%d) >= "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "Adjusting the values to "
                   "level0_stop_writes_trigger(%d) "
                   "level0_slowdown_writes_trigger(%d) "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
  }

  if (result.soft_pending_compaction_bytes_limit == 0) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
             result.soft_pending_compaction_bytes_limit >
                 result.hard_pending_compaction_bytes_limit) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  }

  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel ||
        db_options.db_paths.size() > 1U) {
      // 1. level_compaction_dynamic_level_bytes only makes sense for
      //    level-based compaction.
      // 2. we don't yet know how to make both this feature and multiple
      //    DB paths work.
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

  if (result.max_compaction_bytes == 0) {
    result.max_compaction_bytes = result.target_file_size_base * 25;
  }

  return result;
}

int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;

SuperVersion::~SuperVersion() {
  for (auto td : to_delete) {
    delete td;
  }
}

SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of refs.
  uint32_t previous_refs = refs.fetch_sub(1);
  assert(previous_refs > 0);
  return previous_refs == 1;
}

void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
    assert(*memory_usage >= m->ApproximateMemoryUsage());
    *memory_usage -= m->ApproximateMemoryUsage();
    to_delete.push_back(m);
  }
  current->Unref();
}

void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                        Version* new_current) {
  mem = new_mem;
  imm = new_imm;
  current = new_current;
  mem->Ref();
  imm->Ref();
  current->Ref();
  refs.store(1, std::memory_order_relaxed);
}

namespace {
void SuperVersionUnrefHandle(void* ptr) {
  // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
  // destroyed. When the former happens, the thread shouldn't see kSVInUse.
  // When the latter happens, we are in ~ColumnFamilyData(), so no Get()
  // should happen either.
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  bool was_last_ref __attribute__((__unused__));
  was_last_ref = sv->Unref();
  // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_.
  // This is important because we can't do SuperVersion cleanup here.
  // That would require locking DB mutex, which would deadlock because
  // SuperVersionUnrefHandle is called with locked ThreadLocalPtr mutex.
  assert(!was_last_ref);
}
}  // anonymous namespace

ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBufferManager* write_buffer_manager,
    const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
    const EnvOptions& env_options, ColumnFamilySet* column_family_set)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      initialized_(false),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      initial_cf_options_(SanitizeOptions(db_options, cf_options)),
      ioptions_(db_options, initial_cf_options_),
      mutable_cf_options_(initial_cf_options_),
      is_delete_range_supported_(
          cf_options.table_factory->IsDeleteRangeSupported()),
      write_buffer_manager_(write_buffer_manager),
      mem_(nullptr),
      imm_(ioptions_.min_write_buffer_number_to_merge,
           ioptions_.max_write_buffer_number_to_maintain),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      flush_reason_(FlushReason::kUnknown),
      column_family_set_(column_family_set),
      pending_flush_(false),
      pending_compaction_(false),
      prev_compaction_needed_bytes_(0),
      allow_2pc_(db_options.allow_2pc),
      last_memtable_id_(0) {
  Ref();

  // Convert user-defined table properties collector factories to internal
  // ones.
  GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);

  // If _dummy_versions is nullptr, then this is a dummy column family.
  if (_dummy_versions != nullptr) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options.env, this));
    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));
    if (ioptions_.compaction_style == kCompactionStyleLevel) {
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
#ifndef ROCKSDB_LITE
    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(
          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
      compaction_picker_.reset(
          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
      compaction_picker_.reset(new NullCompactionPicker(
          ioptions_, &internal_comparator_));
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "Column family %s does not use any background compaction. "
                     "Compactions can only be done via CompactFiles\n",
                     GetName().c_str());
#endif  // !ROCKSDB_LITE
    } else {
      ROCKS_LOG_ERROR(ioptions_.info_log,
                      "Unable to recognize the specified compaction style %d. "
                      "Column family %s will use kCompactionStyleLevel.\n",
                      ioptions_.compaction_style, GetName().c_str());
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
    }

    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      ROCKS_LOG_INFO(ioptions_.info_log,
                     "--------------- Options for column family [%s]:\n",
                     name.c_str());
      initial_cf_options_.Dump(ioptions_.info_log);
    } else {
      ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}

// DB mutex held
ColumnFamilyData::~ColumnFamilyData() {
  assert(refs_.load(std::memory_order_relaxed) == 0);
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from the column family set.
    // If column_family_set_ == nullptr, this is a dummy CFD and not in the
    // ColumnFamilySet.
    column_family_set_->RemoveColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

  // It would be wrong if this ColumnFamilyData were still in flush_queue_ or
  // compaction_queue_ when we destroy it.
  assert(!pending_flush_);
  assert(!pending_compaction_);

  if (super_version_ != nullptr) {
    // Release the SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since the unref handler can lock it.
    super_version_->db_mutex->Unlock();
    local_sv_.reset();
    super_version_->db_mutex->Lock();

    bool is_last_reference __attribute__((__unused__));
    is_last_reference = super_version_->Unref();
    assert(is_last_reference);
    super_version_->Cleanup();
    delete super_version_;
    super_version_ = nullptr;
  }

  if (dummy_versions_ != nullptr) {
    // List must be empty
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((__unused__));
    deleted = dummy_versions_->Unref();
    assert(deleted);
  }

  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  autovector<MemTable*> to_delete;
  imm_.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }
}

void ColumnFamilyData::SetDropped() {
  // can't drop default CF
  assert(id_ != 0);
  dropped_ = true;
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}

ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
  return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
}

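// Returns the number of the oldest WAL that must still be kept for this CF.
// Under two-phase commit (allow_2pc), a prepared-but-uncommitted section in
// the mutable or immutable memtables can pin a log older than the CF's own
// log number, so the minimum of the three is returned.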
uint64_t ColumnFamilyData::OldestLogToKeep() {
  auto current_log = GetLogNumber();

  if (allow_2pc_) {
    auto imm_prep_log = imm()->GetMinLogContainingPrepSection();
    auto mem_prep_log = mem()->GetMinLogContainingPrepSection();

    if (imm_prep_log > 0 && imm_prep_log < current_log) {
      current_log = imm_prep_log;
    }

    if (mem_prep_log > 0 && mem_prep_log < current_log) {
      current_log = mem_prep_log;
    }
  }

  return current_log;
}

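// Multiplicative write-throttling constants. As an illustration, a delayed
// write rate of 16 MB/s becomes ~12.8 MB/s after one slowdown step
// (* kIncSlowdownRatio) and ~9.6 MB/s after one near-stop penalty step
// (* kNearStopSlowdownRatio); paying off compaction debt scales it back up
// by kDecSlowdownRatio.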
const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
const double kNearStopSlowdownRatio = 0.6;
const double kDelayRecoverSlowdownRatio = 1.4;

namespace {
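// SetupDelay() recomputes the delayed write rate from compaction debt:
// if the debt grew or stayed flat, the rate is multiplied by
// kIncSlowdownRatio; if the debt shrank, it is multiplied by
// kDecSlowdownRatio, capped at the user-configured max_delayed_write_rate.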
// If penalize_stop is true, we further reduce slowdown rate.
std::unique_ptr<WriteControllerToken> SetupDelay(
    WriteController* write_controller, uint64_t compaction_needed_bytes,
    uint64_t prev_compaction_need_bytes, bool penalize_stop,
    bool auto_compactions_disabled) {
  const uint64_t kMinWriteRate = 16 * 1024u;  // Minimum write rate 16KB/s.

  uint64_t max_write_rate = write_controller->max_delayed_write_rate();
  uint64_t write_rate = write_controller->delayed_write_rate();

  if (auto_compactions_disabled) {
    // When auto compaction is disabled, always use the value the user gave.
    write_rate = max_write_rate;
  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
    // If the user gives a rate less than kMinWriteRate, don't adjust it.
    //
    // If already delayed, need to adjust based on previous compaction debt.
    // When two or more column families require delay, we always increase or
    // reduce the write rate based on information for one single column
    // family. It is likely to be OK, but we can improve it if this becomes a
    // problem.
    // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes
    // is only available in level-based compaction
    //
    // If the compaction debt stays the same as before, we also slow down
    // further. It usually means a memtable is full. This mainly covers the
    // case where both flush and compaction are much slower than the rate at
    // which we insert into memtables, so we need to slow down proactively,
    // before compactions and flushes give us a feedback signal, to avoid a
    // full stop caused by hitting the max write buffer number.
    //
    // If the DB just fell into the stop condition, we need to further reduce
    // the write rate to avoid the stop condition.
    if (penalize_stop) {
      // Penalize the near stop or stop condition by more aggressive slowdown.
      // This is to provide the long term slowdown increase signal.
      // The penalty is more than the reward of recovering to the normal
      // condition.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kNearStopSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > 0 &&
               prev_compaction_need_bytes <= compaction_needed_bytes) {
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kIncSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
      // We speed up by a ratio of kDecSlowdownRatio when we have paid off
      // compaction debt. But we'll never speed up to faster than the write
      // rate given by users.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kDecSlowdownRatio);
      if (write_rate > max_write_rate) {
        write_rate = max_write_rate;
      }
    }
  }
  return write_controller->GetDelayToken(write_rate);
}

int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
                                    int level0_slowdown_writes_trigger) {
  // SanitizeOptions() ensures it.
  assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);
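  // Example: with trigger = 4 and slowdown = 20, compaction is sped up once
  // L0 reaches min(2 * 4, 4 + (20 - 4) / 4) = 8 files.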

  if (level0_file_num_compaction_trigger < 0) {
    return std::numeric_limits<int>::max();
  }

  const int64_t twice_level0_trigger =
      static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;

  const int64_t one_fourth_trigger_slowdown =
      static_cast<int64_t>(level0_file_num_compaction_trigger) +
      ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
       4);

  assert(twice_level0_trigger >= 0);
  assert(one_fourth_trigger_slowdown >= 0);

  // 1/4 of the way between L0 compaction trigger threshold and slowdown
  // condition.
  // Or twice the compaction trigger, if that is smaller.
  int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
  if (res >= port::kMaxInt32) {
    return port::kMaxInt32;
  } else {
    // res fits in int
    return static_cast<int>(res);
  }
}
}  // namespace

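// Maps the current LSM state to a (condition, cause) pair. Stop conditions
// are checked before delay conditions, and within each tier the precedence
// is: memtable count, then L0 file count, then pending compaction bytes.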
std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
ColumnFamilyData::GetWriteStallConditionAndCause(
    int num_unflushed_memtables, int num_l0_files,
    uint64_t num_compaction_needed_bytes,
    const MutableCFOptions& mutable_cf_options) {
  if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
    return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
    return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.hard_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kStopped,
            WriteStallCause::kPendingCompactionBytes};
  } else if (mutable_cf_options.max_write_buffer_number > 3 &&
             num_unflushed_memtables >=
                 mutable_cf_options.max_write_buffer_number - 1) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
             num_l0_files >=
                 mutable_cf_options.level0_slowdown_writes_trigger) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kDelayed,
            WriteStallCause::kPendingCompactionBytes};
  }
  return {WriteStallCondition::kNormal, WriteStallCause::kNone};
}

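// Re-evaluates the stall state against the current Version and installs the
// matching WriteControllerToken: a stop token, a delay token with a rate
// computed by SetupDelay(), a compaction-pressure token, or none. Returns
// the resulting condition so InstallSuperVersion() can publish write stall
// notifications when it changes.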
WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
      const MutableCFOptions& mutable_cf_options) {
  auto write_stall_condition = WriteStallCondition::kNormal;
  if (current_ != nullptr) {
    auto* vstorage = current_->storage_info();
    auto write_controller = column_family_set_->write_controller_;
    uint64_t compaction_needed_bytes =
        vstorage->estimated_compaction_needed_bytes();

    auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
        imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
        vstorage->estimated_compaction_needed_bytes(), mutable_cf_options);
    write_stall_condition = write_stall_condition_and_cause.first;
    auto write_stall_cause = write_stall_condition_and_cause.second;

    bool was_stopped = write_controller->IsStopped();
    bool needed_delay = write_controller->NeedsDelay();

    if (write_stall_condition == WriteStallCondition::kStopped &&
        write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stopping writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d",
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number);
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stopping writes because we have %d level-0 files",
                     name_.c_str(), vstorage->l0_delay_trigger_count());
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stopping writes because of estimated pending compaction "
          "bytes %" PRIu64,
          name_.c_str(), compaction_needed_bytes);
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stalling writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d "
          "rate %" PRIu64,
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number,
          write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      // L0 is within two files of the stop trigger.
      bool near_stop = vstorage->l0_delay_trigger_count() >=
                       mutable_cf_options.level0_stop_writes_trigger - 2;
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
                                  1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stalling writes because we have %d level-0 files "
                     "rate %" PRIu64,
                     name_.c_str(), vstorage->l0_delay_trigger_count(),
                     write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      // If the distance to the hard limit is less than 1/4 of the gap between
      // the soft and hard bytes limits, we consider it near-stop and speed up
      // the slowdown.
      bool near_stop =
          mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
          (compaction_needed_bytes -
           mutable_cf_options.soft_pending_compaction_bytes_limit) >
              3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
                   mutable_cf_options.soft_pending_compaction_bytes_limit) /
                  4;

      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stalling writes because of estimated pending compaction "
          "bytes %" PRIu64 " rate %" PRIu64,
          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
          write_controller->delayed_write_rate());
    } else {
      assert(write_stall_condition == WriteStallCondition::kNormal);
      if (vstorage->l0_delay_trigger_count() >=
          GetL0ThresholdSpeedupCompaction(
              mutable_cf_options.level0_file_num_compaction_trigger,
              mutable_cf_options.level0_slowdown_writes_trigger)) {
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        ROCKS_LOG_INFO(
            ioptions_.info_log,
            "[%s] Increasing compaction threads because we have %d level-0 "
            "files ",
            name_.c_str(), vstorage->l0_delay_trigger_count());
      } else if (vstorage->estimated_compaction_needed_bytes() >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
        // Increase compaction threads if bytes needed for compaction exceeds
        // 1/4 of threshold for slowing down.
        // If soft pending compaction byte limit is not set, always speed up
        // compaction.
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
          ROCKS_LOG_INFO(
              ioptions_.info_log,
              "[%s] Increasing compaction threads because of estimated pending "
              "compaction "
              "bytes %" PRIu64,
              name_.c_str(), vstorage->estimated_compaction_needed_bytes());
        }
      } else {
        write_controller_token_.reset();
      }
      // If the DB recovers from delay conditions, we reward with reducing
      // double the slowdown ratio. This is to balance the long term slowdown
      // increase signal.
      if (needed_delay) {
        uint64_t write_rate = write_controller->delayed_write_rate();
        write_controller->set_delayed_write_rate(static_cast<uint64_t>(
            static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
        // Set the low-pri limit to 1/4 of the delayed write rate.
        // Note we don't reset this value even after the delay condition is
        // released. The low-pri rate will continue to apply if there is
        // compaction pressure.
        write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
                                                                    4);
      }
    }
    prev_compaction_needed_bytes_ = compaction_needed_bytes;
  }
  return write_stall_condition;
}

const EnvOptions* ColumnFamilyData::soptions() const {
  return &(column_family_set_->env_options_);
}

void ColumnFamilyData::SetCurrent(Version* current_version) {
  current_ = current_version;
}

uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  return VersionSet::GetNumLiveVersions(dummy_versions_);
}

uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
}

MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
                      write_buffer_manager_, earliest_seq, id_);
}

void ColumnFamilyData::CreateNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
  mem_->Ref();
}

bool ColumnFamilyData::NeedsCompaction() const {
  return compaction_picker_->NeedsCompaction(current_->storage_info());
}

Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
  auto* result = compaction_picker_->PickCompaction(
      GetName(), mutable_options, current_->storage_info(), log_buffer);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

bool ColumnFamilyData::RangeOverlapWithCompaction(
    const Slice& smallest_user_key, const Slice& largest_user_key,
    int level) const {
  return compaction_picker_->RangeOverlapWithCompaction(
      smallest_user_key, largest_user_key, level);
}

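// Checks whether any of `ranges` overlaps data in the unflushed memtables
// (point keys or range tombstones). A memtable key at or below a range's
// limit, or an overlapping range tombstone, counts as overlap.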
Status ColumnFamilyData::RangesOverlapWithMemtables(
    const autovector<Range>& ranges, SuperVersion* super_version,
    bool* overlap) {
  assert(overlap != nullptr);
  *overlap = false;
  // Create an InternalIterator over all unflushed memtables
  Arena arena;
  ReadOptions read_opts;
  read_opts.total_order_seek = true;
  MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
  merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_opts, &arena));
  super_version->imm->AddIterators(read_opts, &merge_iter_builder);
  ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());

  std::vector<InternalIterator*> memtable_range_del_iters;
  auto* active_range_del_iter =
      super_version->mem->NewRangeTombstoneIterator(read_opts);
  if (active_range_del_iter != nullptr) {
    memtable_range_del_iters.push_back(active_range_del_iter);
  }
  super_version->imm->AddRangeTombstoneIterators(read_opts,
                                                 &memtable_range_del_iters);
  RangeDelAggregator range_del_agg(internal_comparator_, {} /* snapshots */,
                                   false /* collapse_deletions */);
  Status status;
  {
    std::unique_ptr<InternalIterator> memtable_range_del_iter(
        NewMergingIterator(&internal_comparator_,
                           memtable_range_del_iters.empty()
                               ? nullptr
                               : &memtable_range_del_iters[0],
                           static_cast<int>(memtable_range_del_iters.size())));
    status = range_del_agg.AddTombstones(std::move(memtable_range_del_iter));
  }
  for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
    auto* vstorage = super_version->current->storage_info();
    auto* ucmp = vstorage->InternalComparator()->user_comparator();
    InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
                            kValueTypeForSeek);
    memtable_iter->Seek(range_start.Encode());
    status = memtable_iter->status();
    ParsedInternalKey seek_result;
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          !ParseInternalKey(memtable_iter->key(), &seek_result)) {
        status = Status::Corruption("DB has corrupted keys");
      }
    }
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
        *overlap = true;
      } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
                                                 ranges[i].limit)) {
        *overlap = true;
      }
    }
  }
  return status;
}

const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;

Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options, int input_level,
    int output_level, uint32_t output_path_id, const InternalKey* begin,
    const InternalKey* end, InternalKey** compaction_end, bool* conflict) {
  auto* result = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, current_->storage_info(), input_level,
      output_level, output_path_id, begin, end, compaction_end, conflict);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

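// A sketch of the intended caller pattern (the real call sites live in
// DBImpl):
//   SuperVersion* sv = cfd->GetThreadLocalSuperVersion(db_mutex);
//   // ... read through sv->mem, sv->imm, and sv->current ...
//   if (!cfd->ReturnThreadLocalSuperVersion(sv)) {
//     // The cached entry was scraped meanwhile; the caller must drop its
//     // own reference instead (see GetReferencedSuperVersion() below).
//   }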
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
    InstrumentedMutex* db_mutex) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
  sv->Ref();
  if (!ReturnThreadLocalSuperVersion(sv)) {
    // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
    // when the thread-local pointer was populated. So, the Ref() earlier in
    // this function still prevents the returned SuperVersion* from being
    // deleted out from under the caller.
    sv->Unref();
  }
  return sv;
}

SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
    InstrumentedMutex* db_mutex) {
  // The SuperVersion is cached in thread local storage to avoid acquiring
  // mutex when SuperVersion does not change since the last use. When a new
  // SuperVersion is installed, the compaction or flush thread cleans up
  // cached SuperVersion in all existing thread local storage. To avoid
  // acquiring mutex for this operation, we use atomic Swap() on the thread
  // local pointer to guarantee exclusive access. If the thread local pointer
  // is being used while a new SuperVersion is installed, the cached
  // SuperVersion can become stale. In that case, the background thread would
  // have swapped in kSVObsolete. We re-check the value when returning the
  // SuperVersion back to thread local, with an atomic compare-and-swap.
  // The SuperVersion will need to be released if detected to be stale.
  void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(ptr != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load()) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
    SuperVersion* sv_to_delete = nullptr;

    if (sv && sv->Unref()) {
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
      db_mutex->Lock();
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      sv_to_delete = sv;
    } else {
      db_mutex->Lock();
    }
    sv = super_version_->Ref();
    db_mutex->Unlock();

    delete sv_to_delete;
  }
  assert(sv != nullptr);
  return sv;
}

bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  // Put the SuperVersion back
  void* expected = SuperVersion::kSVInUse;
  if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
    // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
    // storage has not been altered and no Scrape has happened. The
    // SuperVersion is still current.
    return true;
  } else {
    // ThreadLocal scrape happened in the process of this GetImpl call (after
    // thread local Swap() at the beginning and before CompareAndSwap()).
    // This means the SuperVersion it holds is obsolete.
    assert(expected == SuperVersion::kSVObsolete);
  }
  return false;
}

void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
}

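// Publishes a freshly built SuperVersion under the DB mutex. The ordering
// below is deliberate: thread-local caches are invalidated via
// ResetThreadLocalSuperVersions() before the old SuperVersion is unref'd,
// so local_sv_ can never hold the last reference.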
void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  SuperVersion* new_superversion = sv_context->new_superversion.release();
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(mem_, imm_.current(), current_);
  SuperVersion* old_superversion = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;
  super_version_->write_stall_condition =
      RecalculateWriteStallConditions(mutable_cf_options);

  if (old_superversion != nullptr) {
    // Reset SuperVersions cached in thread local storage.
    // This should be done before old_superversion->Unref(). That's to ensure
    // that local_sv_ never holds the last reference to SuperVersion, since
    // it has no means to safely do SuperVersion cleanup.
    ResetThreadLocalSuperVersions();

    if (old_superversion->mutable_cf_options.write_buffer_size !=
        mutable_cf_options.write_buffer_size) {
      mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
    }
    if (old_superversion->write_stall_condition !=
        new_superversion->write_stall_condition) {
      sv_context->PushWriteStallNotification(
          old_superversion->write_stall_condition,
          new_superversion->write_stall_condition, GetName(), ioptions());
    }
    if (old_superversion->Unref()) {
      old_superversion->Cleanup();
      sv_context->superversions_to_free.push_back(old_superversion);
    }
  }
}

void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
  for (auto ptr : sv_ptrs) {
    assert(ptr);
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
    auto sv = static_cast<SuperVersion*>(ptr);
    bool was_last_ref __attribute__((__unused__));
    was_last_ref = sv->Unref();
    // sv couldn't have been the last reference because
    // ResetThreadLocalSuperVersions() is called before
    // unref'ing super_version_.
    assert(!was_last_ref);
  }
}

#ifndef ROCKSDB_LITE
Status ColumnFamilyData::SetOptions(
      const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions new_mutable_cf_options;
  Status s = GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
                                          &new_mutable_cf_options);
  if (s.ok()) {
    mutable_cf_options_ = new_mutable_cf_options;
    mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  }
  return s;
}
#endif  // ROCKSDB_LITE

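// Example: with base_level == 1, L0 and L1 files are written with
// WLTH_MEDIUM, L2 with WLTH_LONG, and L3 and below with WLTH_EXTREME,
// hinting longer expected lifetimes for colder levels.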
// REQUIRES: DB mutex held
Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
  if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
    return Env::WLTH_NOT_SET;
  }
  if (level == 0) {
    return Env::WLTH_MEDIUM;
  }
  int base_level = current_->storage_info()->base_level();

  // L1: medium, L2: long, ...
  if (level - base_level >= 2) {
    return Env::WLTH_EXTREME;
  }
  return static_cast<Env::WriteLifeTimeHint>(
      level - base_level + static_cast<int>(Env::WLTH_MEDIUM));
}

ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const ImmutableDBOptions* db_options,
                                 const EnvOptions& env_options,
                                 Cache* table_cache,
                                 WriteBufferManager* write_buffer_manager,
                                 WriteController* write_controller)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
                                      ColumnFamilyOptions(), *db_options,
                                      env_options, nullptr)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      env_options_(env_options),
      table_cache_(table_cache),
      write_buffer_manager_(write_buffer_manager),
      write_controller_(write_controller) {
  // initialize linked list
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}

ColumnFamilySet::~ColumnFamilySet() {
  while (column_family_data_.size() > 0) {
    // cfd destructor will delete itself from column_family_data_
    auto cfd = column_family_data_.begin()->second;
    bool last_ref __attribute__((__unused__));
    last_ref = cfd->Unref();
    assert(last_ref);
    delete cfd;
  }
  bool dummy_last_ref __attribute__((__unused__));
  dummy_last_ref = dummy_cfd_->Unref();
  assert(dummy_last_ref);
  delete dummy_cfd_;
}

ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  assert(default_cfd_cache_ != nullptr);
  return default_cfd_cache_;
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  auto cfd_iter = column_family_data_.find(id);
  if (cfd_iter != column_family_data_.end()) {
    return cfd_iter->second;
  } else {
    return nullptr;
  }
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  auto cfd_iter = column_families_.find(name);
  if (cfd_iter != column_families_.end()) {
    auto cfd = GetColumnFamily(cfd_iter->second);
    assert(cfd != nullptr);
    return cfd;
  } else {
    return nullptr;
  }
}

uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  return ++max_column_family_;
}

uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }

void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  max_column_family_ = std::max(new_max_column_family, max_column_family_);
}

size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  return column_families_.size();
}

// under a DB mutex AND write thread
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd = new ColumnFamilyData(
      id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
      *db_options_, env_options_, this);
  column_families_.insert({name, id});
  column_family_data_.insert({id, new_cfd});
  max_column_family_ = std::max(max_column_family_, id);
  // add to linked list
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
  return new_cfd;
}

// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

// under a DB mutex AND from a write thread
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
  auto cfd_iter = column_family_data_.find(cfd->GetID());
  assert(cfd_iter != column_family_data_.end());
  column_family_data_.erase(cfd_iter);
  column_families_.erase(cfd->GetName());
}

// under a DB mutex OR from a write thread
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
  handle_.SetCFD(current_);
  return current_ != nullptr;
}

uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  return current_->GetLogNumber();
}

MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  return current_->mem();
}

ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  return &handle_;
}

uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  uint32_t column_family_id = 0;
  if (column_family != nullptr) {
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
    column_family_id = cfh->GetID();
  }
  return column_family_id;
}

const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family != nullptr) {
    return column_family->GetComparator();
  }
  return nullptr;
}

}  // namespace rocksdb