//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/column_family.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <vector>
#include <string>
#include <algorithm>
#include <limits>

#include "db/compaction_picker.h"
#include "db/compaction_picker_fifo.h"
#include "db/compaction_picker_universal.h"
#include "db/db_impl.h"
#include "db/internal_stats.h"
#include "db/job_context.h"
#include "db/range_del_aggregator.h"
#include "db/table_properties_collector.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "table/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
#include "util/compression.h"
#include "util/sst_file_manager_impl.h"

namespace rocksdb {

ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ != nullptr) {
    cfd_->Ref();
  }
}

ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ != nullptr) {
#ifndef ROCKSDB_LITE
    for (auto& listener : cfd_->ioptions()->listeners) {
      listener->OnColumnFamilyHandleDeletionStarted(this);
    }
#endif  // ROCKSDB_LITE
    // Job id == 0 means that this is not our background process, but rather
    // a user thread
    // Need to hold some shared pointers owned by the initial_cf_options
    // before final cleaning up finishes.
    ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
    JobContext job_context(0);
    mutex_->Lock();
    if (cfd_->Unref()) {
      delete cfd_;
    }
    db_->FindObsoleteFiles(&job_context, false, true);
    mutex_->Unlock();
    if (job_context.HaveSomethingToDelete()) {
      bool defer_purge =
          db_->immutable_db_options().avoid_unnecessary_blocking_io;
      db_->PurgeObsoleteFiles(job_context, defer_purge);
      if (defer_purge) {
        mutex_->Lock();
        db_->SchedulePurge();
        mutex_->Unlock();
      }
    }
    job_context.Clean();
  }
}

uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }

const std::string& ColumnFamilyHandleImpl::GetName() const {
  return cfd()->GetName();
}

Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
#ifndef ROCKSDB_LITE
  // accessing mutable cf-options requires db mutex.
  InstrumentedMutexLock l(mutex_);
  *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
  return Status::OK();
#else
  (void)desc;
  return Status::NotSupported();
#endif  // !ROCKSDB_LITE
}

const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
  return cfd()->user_comparator();
}

void GetIntTblPropCollectorFactory(
    const ImmutableCFOptions& ioptions,
    std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
        int_tbl_prop_collector_factories) {
  auto& collector_factories = ioptions.table_properties_collector_factories;
  for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
       ++i) {
    assert(collector_factories[i]);
    int_tbl_prop_collector_factories->emplace_back(
        new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
  }
}
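
// Note: the wrapper above adapts user-facing collectors, which expect user
// keys, to the internal collector interface, which is fed internal keys; each
// internal key is stripped down to its user key before the user-defined
// collector sees it.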

Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
  if (!cf_options.compression_per_level.empty()) {
    for (size_t level = 0; level < cf_options.compression_per_level.size();
         ++level) {
      if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
        return Status::InvalidArgument(
            "Compression type " +
            CompressionTypeToString(cf_options.compression_per_level[level]) +
            " is not linked with the binary.");
      }
    }
  } else {
    if (!CompressionTypeSupported(cf_options.compression)) {
      return Status::InvalidArgument(
          "Compression type " +
          CompressionTypeToString(cf_options.compression) +
          " is not linked with the binary.");
    }
  }
  if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
    if (!ZSTD_TrainDictionarySupported()) {
      return Status::InvalidArgument(
          "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
          "is not linked with the binary.");
    }
    if (cf_options.compression_opts.max_dict_bytes == 0) {
      return Status::InvalidArgument(
          "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
          "should be nonzero if we're using zstd's dictionary generator.");
    }
  }
  return Status::OK();
}

Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
  if (cf_options.inplace_update_support) {
    return Status::InvalidArgument(
        "In-place memtable updates (inplace_update_support) is not compatible "
        "with concurrent writes (allow_concurrent_memtable_write)");
  }
  if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
    return Status::InvalidArgument(
        "Memtable doesn't support concurrent writes "
        "(allow_concurrent_memtable_write)");
  }
  return Status::OK();
}

Status CheckCFPathsSupported(const DBOptions& db_options,
                             const ColumnFamilyOptions& cf_options) {
  // Multiple cf_paths are supported only in universal and level compaction
  // styles. This function also checks the case in which cf_paths is not
  // specified, which results in db_paths being used.
  if ((cf_options.compaction_style != kCompactionStyleUniversal) &&
      (cf_options.compaction_style != kCompactionStyleLevel)) {
    if (cf_options.cf_paths.size() > 1) {
      return Status::NotSupported(
          "More than one CF path is only supported in "
          "universal and level compaction styles.");
    } else if (cf_options.cf_paths.empty() &&
               db_options.db_paths.size() > 1) {
      return Status::NotSupported(
          "More than one DB path is only supported in "
          "universal and level compaction styles.");
    }
  }
  return Status::OK();
}

ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  size_t clamp_max = std::conditional<
      sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
      std::integral_constant<uint64_t, 64ull << 30>>::type::value;
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
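  // The clamp above keeps write_buffer_size within [64KB, 64GB]; on 32-bit
  // builds (sizeof(size_t) == 4) the upper bound is 0xffffffff so the value
  // still fits in size_t.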
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // we calculate a proper value from write_buffer_size;
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 8;

    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }

  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }

  if (result.compaction_style == kCompactionStyleUniversal &&
      db_options.allow_ingest_behind && result.num_levels < 3) {
    result.num_levels = 3;
  }

  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  if (result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  // bloom filter size shouldn't exceed 1/4 of memtable size.
  if (result.memtable_prefix_bloom_size_ratio > 0.25) {
    result.memtable_prefix_bloom_size_ratio = 0.25;
  } else if (result.memtable_prefix_bloom_size_ratio < 0) {
    result.memtable_prefix_bloom_size_ratio = 0;
  }

  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  if (result.max_bytes_for_level_multiplier <= 0) {
    result.max_bytes_for_level_multiplier = 1;
  }

  if (result.level0_file_num_compaction_trigger == 0) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "level0_file_num_compaction_trigger cannot be 0");
    result.level0_file_num_compaction_trigger = 1;
  }

  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "This condition must be satisfied: "
                   "level0_stop_writes_trigger(%d) >= "
                   "level0_slowdown_writes_trigger(%d) >= "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "Adjusting the values to "
                   "level0_stop_writes_trigger(%d), "
                   "level0_slowdown_writes_trigger(%d), "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
  }

  if (result.soft_pending_compaction_bytes_limit == 0) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
             result.soft_pending_compaction_bytes_limit >
                 result.hard_pending_compaction_bytes_limit) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  }

#ifndef ROCKSDB_LITE
  // When the DB is stopped, it's possible that there are some .trash files
  // that were not deleted yet. When we open the DB we will find these .trash
  // files and schedule them to be deleted (or delete them immediately if
  // SstFileManager was not used).
  auto sfm =
      static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
  for (size_t i = 0; i < result.cf_paths.size(); i++) {
    DeleteScheduler::CleanupDirectory(db_options.env, sfm,
                                      result.cf_paths[i].path);
  }
#endif

  if (result.cf_paths.empty()) {
    result.cf_paths = db_options.db_paths;
  }

  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel ||
        result.cf_paths.size() > 1U) {
      // 1. level_compaction_dynamic_level_bytes only makes sense for
      //    level-based compaction.
      // 2. we don't yet know how to make both this feature and multiple
      //    DB paths work.
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

  if (result.max_compaction_bytes == 0) {
    result.max_compaction_bytes = result.target_file_size_base * 25;
  }

  return result;
}

int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;
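// kSVInUse points at `dummy`, an address that can never collide with a real
// SuperVersion*, so it marks a thread-local slot currently checked out by its
// owning thread; kSVObsolete (nullptr) marks a slot whose cached SuperVersion
// has been invalidated by a background scrape.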

SuperVersion::~SuperVersion() {
  for (auto td : to_delete) {
    delete td;
  }
}

SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
  uint32_t previous_refs = refs.fetch_sub(1);
  assert(previous_refs > 0);
  return previous_refs == 1;
}

void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
    assert(*memory_usage >= m->ApproximateMemoryUsage());
    *memory_usage -= m->ApproximateMemoryUsage();
    to_delete.push_back(m);
  }
  current->Unref();
}

void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                        Version* new_current) {
  mem = new_mem;
  imm = new_imm;
  current = new_current;
  mem->Ref();
  imm->Ref();
  current->Ref();
  refs.store(1, std::memory_order_relaxed);
}
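
// Taken together, the methods above let a SuperVersion bundle the active
// memtable, the immutable memtable list, and the current Version under one
// reference count, so a reader can pin a consistent view of all three with a
// single atomic increment.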

namespace {
void SuperVersionUnrefHandle(void* ptr) {
  // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
  // destroyed. When the former happens, the thread shouldn't see kSVInUse.
  // When the latter happens, we are in ~ColumnFamilyData(), so no Get should
  // happen either.
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  bool was_last_ref __attribute__((__unused__));
  was_last_ref = sv->Unref();
  // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_.
  // This is important because we can't do SuperVersion cleanup here.
  // That would require locking DB mutex, which would deadlock because
  // SuperVersionUnrefHandle is called with locked ThreadLocalPtr mutex.
  assert(!was_last_ref);
}
}  // anonymous namespace

ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBufferManager* write_buffer_manager,
    const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
    const EnvOptions& env_options, ColumnFamilySet* column_family_set)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      initialized_(false),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      initial_cf_options_(SanitizeOptions(db_options, cf_options)),
      ioptions_(db_options, initial_cf_options_),
      mutable_cf_options_(initial_cf_options_),
      is_delete_range_supported_(
          cf_options.table_factory->IsDeleteRangeSupported()),
      write_buffer_manager_(write_buffer_manager),
      mem_(nullptr),
      imm_(ioptions_.min_write_buffer_number_to_merge,
           ioptions_.max_write_buffer_number_to_maintain),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      flush_reason_(FlushReason::kOthers),
      column_family_set_(column_family_set),
      queued_for_flush_(false),
      queued_for_compaction_(false),
      prev_compaction_needed_bytes_(0),
      allow_2pc_(db_options.allow_2pc),
      last_memtable_id_(0) {
  Ref();

  // Convert user defined table properties collector factories to internal ones.
  GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);

  // if _dummy_versions is nullptr, then this is a dummy column family.
  if (_dummy_versions != nullptr) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options.env, this));
    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));
    if (ioptions_.compaction_style == kCompactionStyleLevel) {
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
#ifndef ROCKSDB_LITE
    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(
          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
      compaction_picker_.reset(
          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
      compaction_picker_.reset(new NullCompactionPicker(
          ioptions_, &internal_comparator_));
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "Column family %s does not use any background compaction. "
                     "Compactions can only be done via CompactFiles\n",
                     GetName().c_str());
#endif  // !ROCKSDB_LITE
    } else {
      ROCKS_LOG_ERROR(ioptions_.info_log,
                      "Unable to recognize the specified compaction style %d. "
                      "Column family %s will use kCompactionStyleLevel.\n",
                      ioptions_.compaction_style, GetName().c_str());
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
    }

    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      ROCKS_LOG_INFO(ioptions_.info_log,
                     "--------------- Options for column family [%s]:\n",
                     name.c_str());
      initial_cf_options_.Dump(ioptions_.info_log);
    } else {
      ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}

// DB mutex held
ColumnFamilyData::~ColumnFamilyData() {
  assert(refs_.load(std::memory_order_relaxed) == 0);
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from column family set
    // If column_family_set_ == nullptr, this is dummy CFD and not in
    // ColumnFamilySet
    column_family_set_->RemoveColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

  // It would be wrong if this ColumnFamilyData is in flush_queue_ or
  // compaction_queue_ and we destroyed it
  assert(!queued_for_flush_);
  assert(!queued_for_compaction_);

  if (super_version_ != nullptr) {
    // Release SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since unref handler can lock mutex.
    super_version_->db_mutex->Unlock();
    local_sv_.reset();
    super_version_->db_mutex->Lock();

    bool is_last_reference __attribute__((__unused__));
    is_last_reference = super_version_->Unref();
    assert(is_last_reference);
    super_version_->Cleanup();
    delete super_version_;
    super_version_ = nullptr;
  }

  if (dummy_versions_ != nullptr) {
    // List must be empty
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((__unused__));
    deleted = dummy_versions_->Unref();
    assert(deleted);
  }

  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  autovector<MemTable*> to_delete;
  imm_.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }
}

void ColumnFamilyData::SetDropped() {
  // can't drop default CF
  assert(id_ != 0);
  dropped_ = true;
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}

ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
  return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
}

uint64_t ColumnFamilyData::OldestLogToKeep() {
  auto current_log = GetLogNumber();

  if (allow_2pc_) {
    autovector<MemTable*> empty_list;
    auto imm_prep_log =
        imm()->PrecomputeMinLogContainingPrepSection(empty_list);
    auto mem_prep_log = mem()->GetMinLogContainingPrepSection();

    if (imm_prep_log > 0 && imm_prep_log < current_log) {
      current_log = imm_prep_log;
    }

    if (mem_prep_log > 0 && mem_prep_log < current_log) {
      current_log = mem_prep_log;
    }
  }

  return current_log;
}
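
// With 2PC, a WAL that still holds the prepare section of an unresolved
// transaction must be retained, so the computation above lowers the result to
// the minimum prep-section log of the active and immutable memtables.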

const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
const double kNearStopSlowdownRatio = 0.6;
const double kDelayRecoverSlowdownRatio = 1.4;
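
// For example: one slowdown step multiplies the delayed write rate by 0.8
// (16MB/s becomes 12.8MB/s), paying off compaction debt speeds it back up by
// 1/0.8 = 1.25x, a near-stop condition cuts the rate to 0.6x, and recovering
// from the delay condition rewards writers with a 1.4x increase.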

namespace {
// If penalize_stop is true, we further reduce slowdown rate.
std::unique_ptr<WriteControllerToken> SetupDelay(
    WriteController* write_controller, uint64_t compaction_needed_bytes,
    uint64_t prev_compaction_need_bytes, bool penalize_stop,
    bool auto_compactions_disabled) {
  const uint64_t kMinWriteRate = 16 * 1024u;  // Minimum write rate 16KB/s.

  uint64_t max_write_rate = write_controller->max_delayed_write_rate();
  uint64_t write_rate = write_controller->delayed_write_rate();

  if (auto_compactions_disabled) {
    // When auto compaction is disabled, always use the value user gave.
    write_rate = max_write_rate;
  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
    // If user gives rate less than kMinWriteRate, don't adjust it.
    //
    // If already delayed, need to adjust based on previous compaction debt.
    // When there are two or more column families require delay, we always
    // increase or reduce write rate based on information for one single
    // column family. It is likely to be OK but we can improve if there is a
    // problem.
    // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes
    // is only available in level-based compaction
    //
    // If the compaction debt stays the same as previously, we also further slow
    // down. It usually means a mem table is full. It's mainly for the case
    // where both of flush and compaction are much slower than the speed we
    // insert to mem tables, so we need to actively slow down before we get
    // feedback signal from compaction and flushes to avoid the full stop
    // because of hitting the max write buffer number.
    //
    // If the DB has just fallen into the stop condition, we need to further
    // reduce the write rate to avoid the stop condition.
    if (penalize_stop) {
      // Penalize the near-stop or stop condition with a more aggressive
      // slowdown.
      // This is to provide the long term slowdown increase signal.
      // The penalty is more than the reward of recovering to the normal
      // condition.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kNearStopSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > 0 &&
               prev_compaction_need_bytes <= compaction_needed_bytes) {
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kIncSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
      // We are speeding up by a ratio of kDecSlowdownRatio when we have paid
      // compaction debt. But we'll never speed up to faster than the write rate
      // given by users.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kDecSlowdownRatio);
      if (write_rate > max_write_rate) {
        write_rate = max_write_rate;
      }
    }
  }
  return write_controller->GetDelayToken(write_rate);
}

int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
                                    int level0_slowdown_writes_trigger) {
  // SanitizeOptions() ensures it.
  assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);

  if (level0_file_num_compaction_trigger < 0) {
    return std::numeric_limits<int>::max();
  }

  const int64_t twice_level0_trigger =
      static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;

  const int64_t one_fourth_trigger_slowdown =
      static_cast<int64_t>(level0_file_num_compaction_trigger) +
      ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
       4);

  assert(twice_level0_trigger >= 0);
  assert(one_fourth_trigger_slowdown >= 0);

  // 1/4 of the way between L0 compaction trigger threshold and slowdown
  // condition.
  // Or twice the compaction trigger, if that is smaller.
  int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
  if (res >= port::kMaxInt32) {
    return port::kMaxInt32;
  } else {
    // res fits in int
    return static_cast<int>(res);
  }
}
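
// Worked example: with level0_file_num_compaction_trigger = 4 and
// level0_slowdown_writes_trigger = 20, twice_level0_trigger = 8 and
// one_fourth_trigger_slowdown = 4 + (20 - 4) / 4 = 8, so compaction pressure
// is signaled once L0 reaches 8 files, well before the slowdown threshold.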
}  // namespace

std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
ColumnFamilyData::GetWriteStallConditionAndCause(
    int num_unflushed_memtables, int num_l0_files,
    uint64_t num_compaction_needed_bytes,
    const MutableCFOptions& mutable_cf_options) {
  if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
    return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
    return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.hard_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kStopped,
            WriteStallCause::kPendingCompactionBytes};
  } else if (mutable_cf_options.max_write_buffer_number > 3 &&
             num_unflushed_memtables >=
                 mutable_cf_options.max_write_buffer_number - 1) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
             num_l0_files >=
                 mutable_cf_options.level0_slowdown_writes_trigger) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kDelayed,
            WriteStallCause::kPendingCompactionBytes};
  }
  return {WriteStallCondition::kNormal, WriteStallCause::kNone};
}
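
// Note the ordering above: stop conditions take precedence over delay
// conditions, and within each tier the memtable count is checked first, then
// the L0 file count, then the estimated pending compaction bytes. The
// memtable-count delay also requires max_write_buffer_number > 3.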

WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
      const MutableCFOptions& mutable_cf_options) {
  auto write_stall_condition = WriteStallCondition::kNormal;
  if (current_ != nullptr) {
    auto* vstorage = current_->storage_info();
    auto write_controller = column_family_set_->write_controller_;
    uint64_t compaction_needed_bytes =
        vstorage->estimated_compaction_needed_bytes();

    auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
        imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
        vstorage->estimated_compaction_needed_bytes(), mutable_cf_options);
    write_stall_condition = write_stall_condition_and_cause.first;
    auto write_stall_cause = write_stall_condition_and_cause.second;

    bool was_stopped = write_controller->IsStopped();
    bool needed_delay = write_controller->NeedsDelay();

    if (write_stall_condition == WriteStallCondition::kStopped &&
        write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stopping writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d",
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number);
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stopping writes because we have %d level-0 files",
                     name_.c_str(), vstorage->l0_delay_trigger_count());
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stopping writes because of estimated pending compaction "
          "bytes %" PRIu64,
          name_.c_str(), compaction_needed_bytes);
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stalling writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d "
          "rate %" PRIu64,
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number,
          write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      // L0 is the last two files from stopping.
      bool near_stop = vstorage->l0_delay_trigger_count() >=
                       mutable_cf_options.level0_stop_writes_trigger - 2;
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
                                  1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stalling writes because we have %d level-0 files "
                     "rate %" PRIu64,
                     name_.c_str(), vstorage->l0_delay_trigger_count(),
                     write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      // If the distance to the hard limit is less than 1/4 of the gap between
      // the soft and hard bytes limits, we think it is near stop and speed up
      // the slowdown.
      bool near_stop =
          mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
          (compaction_needed_bytes -
           mutable_cf_options.soft_pending_compaction_bytes_limit) >
              3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
                   mutable_cf_options.soft_pending_compaction_bytes_limit) /
                  4;

      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stalling writes because of estimated pending compaction "
          "bytes %" PRIu64 " rate %" PRIu64,
          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
          write_controller->delayed_write_rate());
    } else {
      assert(write_stall_condition == WriteStallCondition::kNormal);
      if (vstorage->l0_delay_trigger_count() >=
          GetL0ThresholdSpeedupCompaction(
              mutable_cf_options.level0_file_num_compaction_trigger,
              mutable_cf_options.level0_slowdown_writes_trigger)) {
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        ROCKS_LOG_INFO(
            ioptions_.info_log,
            "[%s] Increasing compaction threads because we have %d level-0 "
            "files ",
            name_.c_str(), vstorage->l0_delay_trigger_count());
      } else if (vstorage->estimated_compaction_needed_bytes() >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
        // Increase compaction threads if bytes needed for compaction exceeds
        // 1/4 of threshold for slowing down.
        // If soft pending compaction byte limit is not set, always speed up
        // compaction.
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
          ROCKS_LOG_INFO(
              ioptions_.info_log,
              "[%s] Increasing compaction threads because of estimated pending "
              "compaction "
              "bytes %" PRIu64,
              name_.c_str(), vstorage->estimated_compaction_needed_bytes());
        }
      } else {
        write_controller_token_.reset();
      }
      // If the DB recovers from delay conditions, we reward it with a write
      // rate increase (kDelayRecoverSlowdownRatio) larger than a single
      // slowdown step, to balance the long term slowdown increase signal.
      if (needed_delay) {
        uint64_t write_rate = write_controller->delayed_write_rate();
        write_controller->set_delayed_write_rate(static_cast<uint64_t>(
            static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
866 867 868 869 870 871
        // Set the low pri limit to be 1/4 the delayed write rate.
        // Note we don't reset this value even after delay condition is relased.
        // Low-pri rate will continue to apply if there is a compaction
        // pressure.
        write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
                                                                    4);
872
      }
873
    }
874
    prev_compaction_needed_bytes_ = compaction_needed_bytes;
875
  }
876
  return write_stall_condition;
877 878
}

const EnvOptions* ColumnFamilyData::soptions() const {
  return &(column_family_set_->env_options_);
}

void ColumnFamilyData::SetCurrent(Version* current_version) {
  current_ = current_version;
}

uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  return VersionSet::GetNumLiveVersions(dummy_versions_);
}

uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
}

uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
  return current_->GetSstFilesSize();
}

MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
                      write_buffer_manager_, earliest_seq, id_);
}

void ColumnFamilyData::CreateNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
  mem_->Ref();
}

bool ColumnFamilyData::NeedsCompaction() const {
  return compaction_picker_->NeedsCompaction(current_->storage_info());
}

Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
  auto* result = compaction_picker_->PickCompaction(
      GetName(), mutable_options, current_->storage_info(), log_buffer);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

bool ColumnFamilyData::RangeOverlapWithCompaction(
    const Slice& smallest_user_key, const Slice& largest_user_key,
    int level) const {
  return compaction_picker_->RangeOverlapWithCompaction(
      smallest_user_key, largest_user_key, level);
}

Status ColumnFamilyData::RangesOverlapWithMemtables(
    const autovector<Range>& ranges, SuperVersion* super_version,
    bool* overlap) {
  assert(overlap != nullptr);
  *overlap = false;
  // Create an InternalIterator over all unflushed memtables
  Arena arena;
  ReadOptions read_opts;
  read_opts.total_order_seek = true;
  MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
  merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_opts, &arena));
  super_version->imm->AddIterators(read_opts, &merge_iter_builder);
  ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());

  auto read_seq = super_version->current->version_set()->LastSequence();
  ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
  auto* active_range_del_iter =
      super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
  range_del_agg.AddTombstones(
      std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
  super_version->imm->AddRangeTombstoneIterators(read_opts, nullptr /* arena */,
                                                 &range_del_agg);

  Status status;
  for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
    auto* vstorage = super_version->current->storage_info();
    auto* ucmp = vstorage->InternalComparator()->user_comparator();
    InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
                            kValueTypeForSeek);
    memtable_iter->Seek(range_start.Encode());
    status = memtable_iter->status();
    ParsedInternalKey seek_result;
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          !ParseInternalKey(memtable_iter->key(), &seek_result)) {
        status = Status::Corruption("DB has corrupted keys");
      }
    }
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
        *overlap = true;
      } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
                                                 ranges[i].limit)) {
        *overlap = true;
      }
    }
  }
  return status;
}
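
// The loop above reports overlap if, for any range, the first memtable key at
// or after the range start still compares <= the range limit, or if a range
// tombstone in the memtables overlaps the range.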

const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;

Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options, int input_level,
    int output_level, uint32_t output_path_id, uint32_t max_subcompactions,
    const InternalKey* begin, const InternalKey* end,
    InternalKey** compaction_end, bool* conflict) {
  auto* result = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, current_->storage_info(), input_level,
      output_level, output_path_id, max_subcompactions, begin, end,
      compaction_end, conflict);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
    InstrumentedMutex* db_mutex) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
  sv->Ref();
  if (!ReturnThreadLocalSuperVersion(sv)) {
    // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
    // when the thread-local pointer was populated. So, the Ref() earlier in
    // this function still prevents the returned SuperVersion* from being
    // deleted out from under the caller.
    sv->Unref();
  }
  return sv;
}

SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
    InstrumentedMutex* db_mutex) {
  // The SuperVersion is cached in thread local storage to avoid acquiring
  // mutex when SuperVersion does not change since the last use. When a new
  // SuperVersion is installed, the compaction or flush thread cleans up
  // cached SuperVersion in all existing thread local storage. To avoid
  // acquiring mutex for this operation, we use atomic Swap() on the thread
  // local pointer to guarantee exclusive access. If the thread local pointer
  // is being used while a new SuperVersion is installed, the cached
  // SuperVersion can become stale. In that case, the background thread would
  // have swapped in kSVObsolete. We re-check the value when returning
  // SuperVersion back to thread local, with an atomic compare and swap.
  // The superversion will need to be released if detected to be stale.
  void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(ptr != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load()) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
    SuperVersion* sv_to_delete = nullptr;

    if (sv && sv->Unref()) {
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
      db_mutex->Lock();
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      sv_to_delete = sv;
    } else {
      db_mutex->Lock();
    }
    sv = super_version_->Ref();
    db_mutex->Unlock();

    delete sv_to_delete;
  }
  assert(sv != nullptr);
  return sv;
}

bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  // Put the SuperVersion back
  void* expected = SuperVersion::kSVInUse;
  if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
    // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
    // storage has not been altered and no Scrape has happened. The
    // SuperVersion is still current.
    return true;
  } else {
    // ThreadLocal scrape happened in the process of this GetImpl call (after
    // thread local Swap() at the beginning and before CompareAndSwap()).
    // This means the SuperVersion it holds is obsolete.
    assert(expected == SuperVersion::kSVObsolete);
  }
  return false;
}

void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
}

void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  SuperVersion* new_superversion = sv_context->new_superversion.release();
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(mem_, imm_.current(), current_);
  SuperVersion* old_superversion = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;
  super_version_->write_stall_condition =
      RecalculateWriteStallConditions(mutable_cf_options);

  if (old_superversion != nullptr) {
    // Reset SuperVersions cached in thread local storage.
    // This should be done before old_superversion->Unref(). That's to ensure
    // that local_sv_ never holds the last reference to SuperVersion, since
    // it has no means to safely do SuperVersion cleanup.
    ResetThreadLocalSuperVersions();

    if (old_superversion->mutable_cf_options.write_buffer_size !=
        mutable_cf_options.write_buffer_size) {
      mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
    }
    if (old_superversion->write_stall_condition !=
        new_superversion->write_stall_condition) {
      sv_context->PushWriteStallNotification(
          old_superversion->write_stall_condition,
          new_superversion->write_stall_condition, GetName(), ioptions());
    }
    if (old_superversion->Unref()) {
      old_superversion->Cleanup();
      sv_context->superversions_to_free.push_back(old_superversion);
    }
  }
}

void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
  for (auto ptr : sv_ptrs) {
    assert(ptr);
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
    auto sv = static_cast<SuperVersion*>(ptr);
    bool was_last_ref __attribute__((__unused__));
    was_last_ref = sv->Unref();
    // sv couldn't have been the last reference because
    // ResetThreadLocalSuperVersions() is called before
    // unref'ing super_version_.
    assert(!was_last_ref);
  }
}

#ifndef ROCKSDB_LITE
Status ColumnFamilyData::SetOptions(
      const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions new_mutable_cf_options;
  Status s =
      GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
                                   ioptions_.info_log, &new_mutable_cf_options);
  if (s.ok()) {
    mutable_cf_options_ = new_mutable_cf_options;
    mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  }
  return s;
}
#endif  // ROCKSDB_LITE

// REQUIRES: DB mutex held
Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
  if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
    return Env::WLTH_NOT_SET;
  }
  if (level == 0) {
    return Env::WLTH_MEDIUM;
  }
  int base_level = current_->storage_info()->base_level();

  // L1: medium, L2: long, ...
  if (level - base_level >= 2) {
    return Env::WLTH_EXTREME;
  }
  return static_cast<Env::WriteLifeTimeHint>(level - base_level +
                            static_cast<int>(Env::WLTH_MEDIUM));
}
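
// Worked example: with base_level == 1, L0 and L1 files get WLTH_MEDIUM, L2
// files get WLTH_LONG, and L3 and deeper get WLTH_EXTREME, so data expected
// to live longer receives a longer write-life hint.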

Status ColumnFamilyData::AddDirectories() {
  Status s;
  assert(data_dirs_.empty());
  for (auto& p : ioptions_.cf_paths) {
    std::unique_ptr<Directory> path_directory;
    s = DBImpl::CreateAndNewDirectory(ioptions_.env, p.path, &path_directory);
    if (!s.ok()) {
      return s;
    }
    assert(path_directory != nullptr);
    data_dirs_.emplace_back(path_directory.release());
  }
  assert(data_dirs_.size() == ioptions_.cf_paths.size());
  return s;
}

Directory* ColumnFamilyData::GetDataDir(size_t path_id) const {
  if (data_dirs_.empty()) {
    return nullptr;
  }

  assert(path_id < data_dirs_.size());
  return data_dirs_[path_id].get();
}

ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const ImmutableDBOptions* db_options,
                                 const EnvOptions& env_options,
                                 Cache* table_cache,
                                 WriteBufferManager* write_buffer_manager,
                                 WriteController* write_controller)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
                                      ColumnFamilyOptions(), *db_options,
                                      env_options, nullptr)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      env_options_(env_options),
      table_cache_(table_cache),
      write_buffer_manager_(write_buffer_manager),
      write_controller_(write_controller) {
  // initialize linked list
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}

ColumnFamilySet::~ColumnFamilySet() {
  while (column_family_data_.size() > 0) {
    // cfd destructor will delete itself from column_family_data_
    auto cfd = column_family_data_.begin()->second;
    bool last_ref __attribute__((__unused__));
    last_ref = cfd->Unref();
    assert(last_ref);
    delete cfd;
  }
  bool dummy_last_ref __attribute__((__unused__));
  dummy_last_ref = dummy_cfd_->Unref();
  assert(dummy_last_ref);
  delete dummy_cfd_;
}

ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  assert(default_cfd_cache_ != nullptr);
  return default_cfd_cache_;
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  auto cfd_iter = column_family_data_.find(id);
  if (cfd_iter != column_family_data_.end()) {
    return cfd_iter->second;
  } else {
    return nullptr;
  }
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  auto cfd_iter = column_families_.find(name);
  if (cfd_iter != column_families_.end()) {
    auto cfd = GetColumnFamily(cfd_iter->second);
    assert(cfd != nullptr);
    return cfd;
  } else {
    return nullptr;
  }
}

uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  return ++max_column_family_;
}

uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }

void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  max_column_family_ = std::max(new_max_column_family, max_column_family_);
}

size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  return column_families_.size();
}

// under a DB mutex AND write thread
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd = new ColumnFamilyData(
      id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
      *db_options_, env_options_, this);
  column_families_.insert({name, id});
  column_family_data_.insert({id, new_cfd});
  max_column_family_ = std::max(max_column_family_, id);
  // add to linked list
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
  return new_cfd;
}

// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

// under a DB mutex AND from a write thread
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
  auto cfd_iter = column_family_data_.find(cfd->GetID());
  assert(cfd_iter != column_family_data_.end());
  column_family_data_.erase(cfd_iter);
  column_families_.erase(cfd->GetName());
}

// under a DB mutex OR from a write thread
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
  handle_.SetCFD(current_);
  return current_ != nullptr;
}

uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  return current_->GetLogNumber();
}

MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  return current_->mem();
}

ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  return &handle_;
}

uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  uint32_t column_family_id = 0;
  if (column_family != nullptr) {
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
    column_family_id = cfh->GetID();
  }
  return column_family_id;
}

const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family != nullptr) {
    return column_family->GetComparator();
  }
  return nullptr;
}

}  // namespace rocksdb