//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/column_family.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
#include <vector>
#include <string>
#include <algorithm>
#include <limits>

#include "db/compaction_picker.h"
#include "db/compaction_picker_universal.h"
#include "db/db_impl.h"
#include "db/internal_stats.h"
#include "db/job_context.h"
#include "db/table_properties_collector.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "table/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
#include "util/compression.h"
#include "util/sst_file_manager_impl.h"

namespace rocksdb {

ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ != nullptr) {
    cfd_->Ref();
  }
}

ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ != nullptr) {
#ifndef ROCKSDB_LITE
    for (auto& listener : cfd_->ioptions()->listeners) {
      listener->OnColumnFamilyHandleDeletionStarted(this);
    }
#endif  // ROCKSDB_LITE
    // Job id == 0 means that this is not our background process, but rather
    // a user thread.
    // Need to hold some shared pointers owned by the initial_cf_options
    // before the final cleanup finishes.
    ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
    JobContext job_context(0);
    mutex_->Lock();
    if (cfd_->Unref()) {
      delete cfd_;
    }
    db_->FindObsoleteFiles(&job_context, false, true);
    mutex_->Unlock();
    if (job_context.HaveSomethingToDelete()) {
      db_->PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
  }
}

uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }

const std::string& ColumnFamilyHandleImpl::GetName() const {
  return cfd()->GetName();
}

Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
#ifndef ROCKSDB_LITE
  // accessing mutable cf-options requires db mutex.
  InstrumentedMutexLock l(mutex_);
  *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
  return Status::OK();
#else
  (void)desc;
  return Status::NotSupported();
#endif  // !ROCKSDB_LITE
}

const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
  return cfd()->user_comparator();
}

void GetIntTblPropCollectorFactory(
    const ImmutableCFOptions& ioptions,
    std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
        int_tbl_prop_collector_factories) {
  auto& collector_factories = ioptions.table_properties_collector_factories;
  for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
       ++i) {
    assert(collector_factories[i]);
    int_tbl_prop_collector_factories->emplace_back(
        new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
  }
  // Add collector to collect internal key statistics
  int_tbl_prop_collector_factories->emplace_back(
      new InternalKeyPropertiesCollectorFactory);
}

Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
  if (!cf_options.compression_per_level.empty()) {
    for (size_t level = 0; level < cf_options.compression_per_level.size();
         ++level) {
      if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
        return Status::InvalidArgument(
            "Compression type " +
            CompressionTypeToString(cf_options.compression_per_level[level]) +
            " is not linked with the binary.");
      }
    }
  } else {
    if (!CompressionTypeSupported(cf_options.compression)) {
      return Status::InvalidArgument(
          "Compression type " +
          CompressionTypeToString(cf_options.compression) +
          " is not linked with the binary.");
    }
  }
  if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
    if (!CompressionTypeSupported(CompressionType::kZSTD)) {
      // Dictionary trainer is available since v0.6.1, but ZSTD was marked
      // stable only since v0.8.0. For now we enable the feature in stable
      // versions only.
      return Status::InvalidArgument(
          "zstd dictionary trainer cannot be used because " +
          CompressionTypeToString(CompressionType::kZSTD) +
          " is not linked with the binary.");
    }
    if (cf_options.compression_opts.max_dict_bytes == 0) {
      return Status::InvalidArgument(
          "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
          "should be nonzero if we're using zstd's dictionary generator.");
    }
  }
  return Status::OK();
}

Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
  if (cf_options.inplace_update_support) {
    return Status::InvalidArgument(
        "In-place memtable updates (inplace_update_support) is not compatible "
        "with concurrent writes (allow_concurrent_memtable_write)");
  }
  if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
    return Status::InvalidArgument(
        "Memtable doesn't support concurrent writes "
        "(allow_concurrent_memtable_write)");
  }
  return Status::OK();
}

Status CheckCFPathsSupported(const DBOptions& db_options,
                             const ColumnFamilyOptions& cf_options) {
  // More than one cf_paths entry is supported only in the universal and
  // level compaction styles. This function also checks the case in which
  // cf_paths is not specified, which results in db_paths being used.
  if ((cf_options.compaction_style != kCompactionStyleUniversal) &&
      (cf_options.compaction_style != kCompactionStyleLevel)) {
    if (cf_options.cf_paths.size() > 1) {
      return Status::NotSupported(
          "More than one CF path is only supported in "
          "universal and level compaction styles.");
    } else if (cf_options.cf_paths.empty() &&
               db_options.db_paths.size() > 1) {
      return Status::NotSupported(
          "More than one DB path is only supported in "
          "universal and level compaction styles.");
    }
  }
  return Status::OK();
}

ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  size_t clamp_max = std::conditional<
      sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
      std::integral_constant<uint64_t, 64ull << 30>>::type::value;
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
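  // Illustrative effect of the clamp above: write_buffer_size ends up in
  // [64 KB, 64 GB] on 64-bit platforms, and in [64 KB, 4 GB - 1] on 32-bit
  // platforms where size_t is four bytes.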
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // calculate a proper value from write_buffer_size.
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 8;

    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }

  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }

  if (result.compaction_style == kCompactionStyleUniversal &&
      db_options.allow_ingest_behind && result.num_levels < 3) {
    result.num_levels = 3;
  }

  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  if (result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  // bloom filter size shouldn't exceed 1/4 of memtable size.
  if (result.memtable_prefix_bloom_size_ratio > 0.25) {
    result.memtable_prefix_bloom_size_ratio = 0.25;
  } else if (result.memtable_prefix_bloom_size_ratio < 0) {
    result.memtable_prefix_bloom_size_ratio = 0;
  }

  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  if (result.max_bytes_for_level_multiplier <= 0) {
    result.max_bytes_for_level_multiplier = 1;
  }

  if (result.level0_file_num_compaction_trigger == 0) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "level0_file_num_compaction_trigger cannot be 0");
    result.level0_file_num_compaction_trigger = 1;
  }

  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "This condition must be satisfied: "
                   "level0_stop_writes_trigger(%d) >= "
                   "level0_slowdown_writes_trigger(%d) >= "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "Adjusting the values to "
                   "level0_stop_writes_trigger(%d), "
                   "level0_slowdown_writes_trigger(%d), "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
  }

  if (result.soft_pending_compaction_bytes_limit == 0) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
             result.soft_pending_compaction_bytes_limit >
                 result.hard_pending_compaction_bytes_limit) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  }

#ifndef ROCKSDB_LITE
  // When the DB is stopped, it's possible that there are some .trash files
  // that were not deleted yet. When we open the DB we will find these .trash
  // files and schedule them to be deleted (or delete them immediately if an
  // SstFileManager was not used).
  auto sfm =
      static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
  for (size_t i = 0; i < result.cf_paths.size(); i++) {
    DeleteScheduler::CleanupDirectory(db_options.env, sfm,
                                      result.cf_paths[i].path);
  }
#endif

  if (result.cf_paths.empty()) {
    result.cf_paths = db_options.db_paths;
  }

  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel ||
        result.cf_paths.size() > 1U) {
      // 1. level_compaction_dynamic_level_bytes only makes sense for
      //    level-based compaction.
      // 2. we don't yet know how to make this feature work with multiple
      //    DB paths.
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

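  // Illustrative note (assuming a 64 MB target_file_size_base, a common
  // setting): the default below caps a single compaction's total input at
  // 25 files' worth of data, i.e. about 1.6 GB.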
  if (result.max_compaction_bytes == 0) {
    result.max_compaction_bytes = result.target_file_size_base * 25;
  }

  return result;
}

int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;

SuperVersion::~SuperVersion() {
  for (auto td : to_delete) {
    delete td;
  }
}

SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
  uint32_t previous_refs = refs.fetch_sub(1);
  assert(previous_refs > 0);
  return previous_refs == 1;
}

void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
    assert(*memory_usage >= m->ApproximateMemoryUsage());
    *memory_usage -= m->ApproximateMemoryUsage();
    to_delete.push_back(m);
  }
  current->Unref();
}

void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                        Version* new_current) {
  mem = new_mem;
  imm = new_imm;
  current = new_current;
  mem->Ref();
  imm->Ref();
  current->Ref();
  refs.store(1, std::memory_order_relaxed);
}

namespace {
void SuperVersionUnrefHandle(void* ptr) {
  // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
  // destroyed. When the former happens, the thread shouldn't see kSVInUse.
  // When the latter happens, we are in ~ColumnFamilyData(), so no get should
  // happen, either.
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  bool was_last_ref __attribute__((__unused__));
  was_last_ref = sv->Unref();
  // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_.
  // This is important because we can't do SuperVersion cleanup here.
  // That would require locking DB mutex, which would deadlock because
  // SuperVersionUnrefHandle is called with locked ThreadLocalPtr mutex.
  assert(!was_last_ref);
}
}  // anonymous namespace

ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBufferManager* write_buffer_manager,
    const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
    const EnvOptions& env_options, ColumnFamilySet* column_family_set)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      initialized_(false),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      initial_cf_options_(SanitizeOptions(db_options, cf_options)),
      ioptions_(db_options, initial_cf_options_),
      mutable_cf_options_(initial_cf_options_),
      is_delete_range_supported_(
          cf_options.table_factory->IsDeleteRangeSupported()),
      write_buffer_manager_(write_buffer_manager),
      mem_(nullptr),
      imm_(ioptions_.min_write_buffer_number_to_merge,
           ioptions_.max_write_buffer_number_to_maintain),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      flush_reason_(FlushReason::kOthers),
      column_family_set_(column_family_set),
      queued_for_flush_(false),
      queued_for_compaction_(false),
      prev_compaction_needed_bytes_(0),
      allow_2pc_(db_options.allow_2pc),
      last_memtable_id_(0) {
  Ref();

  // Convert user defined table properties collector factories to internal ones.
  GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);

  // if _dummy_versions is nullptr, then this is a dummy column family.
  if (_dummy_versions != nullptr) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options.env, this));
    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));
    if (ioptions_.compaction_style == kCompactionStyleLevel) {
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
#ifndef ROCKSDB_LITE
    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(
          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
      compaction_picker_.reset(
          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
      compaction_picker_.reset(new NullCompactionPicker(
          ioptions_, &internal_comparator_));
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "Column family %s does not use any background compaction. "
                     "Compactions can only be done via CompactFiles\n",
                     GetName().c_str());
#endif  // !ROCKSDB_LITE
    } else {
      ROCKS_LOG_ERROR(ioptions_.info_log,
                      "Unable to recognize the specified compaction style %d. "
                      "Column family %s will use kCompactionStyleLevel.\n",
                      ioptions_.compaction_style, GetName().c_str());
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
    }

    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      ROCKS_LOG_INFO(ioptions_.info_log,
                     "--------------- Options for column family [%s]:\n",
                     name.c_str());
      initial_cf_options_.Dump(ioptions_.info_log);
    } else {
      ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}
// DB mutex held
ColumnFamilyData::~ColumnFamilyData() {
  assert(refs_.load(std::memory_order_relaxed) == 0);
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from column family set
    // If column_family_set_ == nullptr, this is dummy CFD and not in
    // ColumnFamilySet
    column_family_set_->RemoveColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

  // It would be wrong if this ColumnFamilyData were still in flush_queue_ or
  // compaction_queue_ when we destroy it.
  assert(!queued_for_flush_);
  assert(!queued_for_compaction_);

  if (super_version_ != nullptr) {
    // Release SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since unref handler can lock mutex.
    super_version_->db_mutex->Unlock();
    local_sv_.reset();
    super_version_->db_mutex->Lock();

    bool is_last_reference __attribute__((__unused__));
    is_last_reference = super_version_->Unref();
    assert(is_last_reference);
    super_version_->Cleanup();
    delete super_version_;
    super_version_ = nullptr;
  }

  if (dummy_versions_ != nullptr) {
    // List must be empty
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((__unused__));
    deleted = dummy_versions_->Unref();
    assert(deleted);
  }

  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  autovector<MemTable*> to_delete;
  imm_.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }
}

void ColumnFamilyData::SetDropped() {
  // can't drop default CF
  assert(id_ != 0);
  dropped_ = true;
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}

ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
  return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
}

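// Under 2PC (allow_2pc), a WAL that still contains a prepared but
// uncommitted section cannot be freed yet, so the oldest log to keep is the
// minimum of this CF's current log number and the oldest prep-section log
// numbers tracked by the mem/imm memtables (computed below).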
uint64_t ColumnFamilyData::OldestLogToKeep() {
  auto current_log = GetLogNumber();

  if (allow_2pc_) {
    auto imm_prep_log = imm()->GetMinLogContainingPrepSection();
    auto mem_prep_log = mem()->GetMinLogContainingPrepSection();

    if (imm_prep_log > 0 && imm_prep_log < current_log) {
      current_log = imm_prep_log;
    }

    if (mem_prep_log > 0 && mem_prep_log < current_log) {
      current_log = mem_prep_log;
    }
  }

  return current_log;
}

const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
const double kNearStopSlowdownRatio = 0.6;
const double kDelayRecoverSlowdownRatio = 1.4;
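// Illustrative composition of these ratios (numbers only, no extra logic):
// from a delayed write rate of 16 MB/s, a further-slowdown step gives
// 16 * 0.8 = 12.8 MB/s, a near-stop penalty gives 16 * 0.6 = 9.6 MB/s, a
// speed-up after paying off compaction debt gives 16 * 1.25 = 20 MB/s, and
// a recovery from the delay condition rewards with 16 * 1.4 = 22.4 MB/s,
// capped at the user-configured maximum delayed write rate.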

namespace {
// If penalize_stop is true, we further reduce the slowdown rate.
std::unique_ptr<WriteControllerToken> SetupDelay(
    WriteController* write_controller, uint64_t compaction_needed_bytes,
    uint64_t prev_compaction_need_bytes, bool penalize_stop,
    bool auto_compactions_disabled) {
  const uint64_t kMinWriteRate = 16 * 1024u;  // Minimum write rate 16KB/s.

  uint64_t max_write_rate = write_controller->max_delayed_write_rate();
  uint64_t write_rate = write_controller->delayed_write_rate();

  if (auto_compactions_disabled) {
    // When auto compaction is disabled, always use the value the user gave.
    write_rate = max_write_rate;
  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
    // If the user gives a rate less than kMinWriteRate, don't adjust it.
    //
    // If already delayed, we need to adjust based on previous compaction debt.
    // When two or more column families require delay, we always increase or
    // reduce the write rate based on information for one single column
    // family. It is likely to be OK, but we can improve if there is a
    // problem.
    // Ignore the compaction_needed_bytes = 0 case because
    // compaction_needed_bytes is only available in level-based compaction.
    //
    // If the compaction debt stays the same as before, we also further slow
    // down. It usually means a mem table is full. It's mainly for the case
    // where both flush and compaction are much slower than the speed at which
    // we insert to mem tables, so we need to actively slow down before we get
    // a feedback signal from compactions and flushes, to avoid a full stop
    // caused by hitting the max write buffer number.
    //
    // If the DB just fell into the stop condition, we need to further reduce
    // the write rate to avoid the stop condition.
    if (penalize_stop) {
      // Penalize the near stop or stop condition by more aggressive slowdown.
      // This is to provide the long term slowdown increase signal.
      // The penalty is more than the reward of recovering to the normal
      // condition.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kNearStopSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > 0 &&
               prev_compaction_need_bytes <= compaction_needed_bytes) {
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kIncSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
      // We are speeding up by a ratio of kDecSlowdownRatio when we have paid
      // compaction debt. But we'll never speed up to faster than the write
      // rate given by users.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kDecSlowdownRatio);
      if (write_rate > max_write_rate) {
        write_rate = max_write_rate;
      }
    }
  }
  return write_controller->GetDelayToken(write_rate);
}

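// Worked example (illustrative values): with
// level0_file_num_compaction_trigger = 4 and
// level0_slowdown_writes_trigger = 20, the function below returns
// min(2 * 4, 4 + (20 - 4) / 4) = min(8, 8) = 8, so compaction is sped up
// once L0 reaches 8 files.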
int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
                                    int level0_slowdown_writes_trigger) {
  // SanitizeOptions() ensures it.
  assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);

  if (level0_file_num_compaction_trigger < 0) {
    return std::numeric_limits<int>::max();
  }

  const int64_t twice_level0_trigger =
      static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;

  const int64_t one_fourth_trigger_slowdown =
      static_cast<int64_t>(level0_file_num_compaction_trigger) +
      ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
       4);

  assert(twice_level0_trigger >= 0);
  assert(one_fourth_trigger_slowdown >= 0);

  // 1/4 of the way between the L0 compaction trigger threshold and the
  // slowdown condition.
  // Or twice the compaction trigger, if that is smaller.
  int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
  if (res >= port::kMaxInt32) {
    return port::kMaxInt32;
  } else {
    // res fits in int
    return static_cast<int>(res);
  }
}
}  // namespace

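// Summary of the checks below, in priority order (first match wins):
//   stop:  memtables >= max_write_buffer_number
//   stop:  L0 files >= level0_stop_writes_trigger      (auto compactions on)
//   stop:  pending bytes >= hard pending limit         (auto compactions on)
//   delay: memtables >= max_write_buffer_number - 1    (if the max is > 3)
//   delay: L0 files >= level0_slowdown_writes_trigger  (auto compactions on)
//   delay: pending bytes >= soft pending limit         (auto compactions on)
// where the pending limits are the {hard,soft}_pending_compaction_bytes_limit
// options, each applying only when set to a nonzero value.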
std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
ColumnFamilyData::GetWriteStallConditionAndCause(
    int num_unflushed_memtables, int num_l0_files,
    uint64_t num_compaction_needed_bytes,
    const MutableCFOptions& mutable_cf_options) {
  if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
    return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
    return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.hard_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kStopped,
            WriteStallCause::kPendingCompactionBytes};
  } else if (mutable_cf_options.max_write_buffer_number > 3 &&
             num_unflushed_memtables >=
                 mutable_cf_options.max_write_buffer_number - 1) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
             num_l0_files >=
                 mutable_cf_options.level0_slowdown_writes_trigger) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kDelayed,
            WriteStallCause::kPendingCompactionBytes};
  }
  return {WriteStallCondition::kNormal, WriteStallCause::kNone};
}

WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
      const MutableCFOptions& mutable_cf_options) {
  auto write_stall_condition = WriteStallCondition::kNormal;
  if (current_ != nullptr) {
    auto* vstorage = current_->storage_info();
    auto write_controller = column_family_set_->write_controller_;
    uint64_t compaction_needed_bytes =
        vstorage->estimated_compaction_needed_bytes();

    auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
        imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
        vstorage->estimated_compaction_needed_bytes(), mutable_cf_options);
    write_stall_condition = write_stall_condition_and_cause.first;
    auto write_stall_cause = write_stall_condition_and_cause.second;

    bool was_stopped = write_controller->IsStopped();
    bool needed_delay = write_controller->NeedsDelay();

    if (write_stall_condition == WriteStallCondition::kStopped &&
        write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stopping writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d",
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number);
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stopping writes because we have %d level-0 files",
                     name_.c_str(), vstorage->l0_delay_trigger_count());
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stopping writes because of estimated pending compaction "
          "bytes %" PRIu64,
          name_.c_str(), compaction_needed_bytes);
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stalling writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d "
          "rate %" PRIu64,
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number,
          write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      // L0 is within two files of the stop trigger.
      bool near_stop = vstorage->l0_delay_trigger_count() >=
                       mutable_cf_options.level0_stop_writes_trigger - 2;
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
                                  1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stalling writes because we have %d level-0 files "
                     "rate %" PRIu64,
                     name_.c_str(), vstorage->l0_delay_trigger_count(),
                     write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      // If the distance to the hard limit is less than 1/4 of the gap between
      // the soft and hard bytes limits, we consider it near-stop and speed up
      // the slowdown.
      bool near_stop =
          mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
          (compaction_needed_bytes -
           mutable_cf_options.soft_pending_compaction_bytes_limit) >
              3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
                   mutable_cf_options.soft_pending_compaction_bytes_limit) /
                  4;

      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stalling writes because of estimated pending compaction "
          "bytes %" PRIu64 " rate %" PRIu64,
          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
          write_controller->delayed_write_rate());
    } else {
      assert(write_stall_condition == WriteStallCondition::kNormal);
      if (vstorage->l0_delay_trigger_count() >=
          GetL0ThresholdSpeedupCompaction(
              mutable_cf_options.level0_file_num_compaction_trigger,
              mutable_cf_options.level0_slowdown_writes_trigger)) {
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        ROCKS_LOG_INFO(
            ioptions_.info_log,
            "[%s] Increasing compaction threads because we have %d level-0 "
            "files ",
            name_.c_str(), vstorage->l0_delay_trigger_count());
      } else if (vstorage->estimated_compaction_needed_bytes() >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
        // Increase compaction threads if bytes needed for compaction exceeds
        // 1/4 of the threshold for slowing down.
        // If the soft pending compaction byte limit is not set, always speed
        // up compaction.
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
          ROCKS_LOG_INFO(
              ioptions_.info_log,
              "[%s] Increasing compaction threads because of estimated pending "
              "compaction "
              "bytes %" PRIu64,
              name_.c_str(), vstorage->estimated_compaction_needed_bytes());
        }
      } else {
        write_controller_token_.reset();
      }
      // If the DB recovers from delay conditions, we reward it by undoing
      // roughly double the slowdown ratio. This is to balance the long-term
      // slowdown increase signal.
      if (needed_delay) {
        uint64_t write_rate = write_controller->delayed_write_rate();
        write_controller->set_delayed_write_rate(static_cast<uint64_t>(
            static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
        // Set the low-pri limit to be 1/4 of the delayed write rate.
        // Note we don't reset this value even after the delay condition is
        // released. The low-pri rate will continue to apply if there is
        // compaction pressure.
        write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
                                                                    4);
      }
    }
    prev_compaction_needed_bytes_ = compaction_needed_bytes;
  }
  return write_stall_condition;
}

const EnvOptions* ColumnFamilyData::soptions() const {
  return &(column_family_set_->env_options_);
}

void ColumnFamilyData::SetCurrent(Version* current_version) {
  current_ = current_version;
}

uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  return VersionSet::GetNumLiveVersions(dummy_versions_);
}

uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
}

uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
  return current_->GetSstFilesSize();
}

MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
                      write_buffer_manager_, earliest_seq, id_);
}

void ColumnFamilyData::CreateNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
  mem_->Ref();
}

bool ColumnFamilyData::NeedsCompaction() const {
  return compaction_picker_->NeedsCompaction(current_->storage_info());
}

Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
  auto* result = compaction_picker_->PickCompaction(
      GetName(), mutable_options, current_->storage_info(), log_buffer);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

bool ColumnFamilyData::RangeOverlapWithCompaction(
    const Slice& smallest_user_key, const Slice& largest_user_key,
    int level) const {
  return compaction_picker_->RangeOverlapWithCompaction(
      smallest_user_key, largest_user_key, level);
}

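// Checks whether any of the given key ranges overlaps data in the unflushed
// memtables. Two sources are consulted below: a merged point-key iterator
// over the active and immutable memtables, and a RangeDelAggregator built
// from their range tombstones.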
Status ColumnFamilyData::RangesOverlapWithMemtables(
    const autovector<Range>& ranges, SuperVersion* super_version,
    bool* overlap) {
  assert(overlap != nullptr);
  *overlap = false;
  // Create an InternalIterator over all unflushed memtables
  Arena arena;
  ReadOptions read_opts;
  read_opts.total_order_seek = true;
  MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
  merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_opts, &arena));
  super_version->imm->AddIterators(read_opts, &merge_iter_builder);
  ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());

  std::vector<InternalIterator*> memtable_range_del_iters;
  auto* active_range_del_iter =
      super_version->mem->NewRangeTombstoneIterator(read_opts);
  if (active_range_del_iter != nullptr) {
    memtable_range_del_iters.push_back(active_range_del_iter);
  }
  super_version->imm->AddRangeTombstoneIterators(read_opts,
                                                 &memtable_range_del_iters);
  RangeDelAggregator range_del_agg(internal_comparator_, {} /* snapshots */,
                                   false /* collapse_deletions */);
  Status status;
  {
    std::unique_ptr<InternalIterator> memtable_range_del_iter(
        NewMergingIterator(&internal_comparator_,
                           memtable_range_del_iters.empty()
                               ? nullptr
                               : &memtable_range_del_iters[0],
                           static_cast<int>(memtable_range_del_iters.size())));
    status = range_del_agg.AddTombstones(std::move(memtable_range_del_iter));
  }
  for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
    auto* vstorage = super_version->current->storage_info();
    auto* ucmp = vstorage->InternalComparator()->user_comparator();
    InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
                            kValueTypeForSeek);
    memtable_iter->Seek(range_start.Encode());
    status = memtable_iter->status();
    ParsedInternalKey seek_result;
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          !ParseInternalKey(memtable_iter->key(), &seek_result)) {
        status = Status::Corruption("DB has corrupted keys");
      }
    }
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
        *overlap = true;
      } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
                                                 ranges[i].limit)) {
        *overlap = true;
      }
    }
  }
  return status;
}

const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;

Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options, int input_level,
    int output_level, uint32_t output_path_id, uint32_t max_subcompactions,
    const InternalKey* begin, const InternalKey* end,
    InternalKey** compaction_end, bool* conflict) {
  auto* result = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, current_->storage_info(), input_level,
      output_level, output_path_id, max_subcompactions, begin, end,
      compaction_end, conflict);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
    InstrumentedMutex* db_mutex) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
  sv->Ref();
  if (!ReturnThreadLocalSuperVersion(sv)) {
    // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
    // when the thread-local pointer was populated. So, the Ref() earlier in
    // this function still prevents the returned SuperVersion* from being
    // deleted out from under the caller.
    sv->Unref();
  }
  return sv;
}

SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
    InstrumentedMutex* db_mutex) {
  // The SuperVersion is cached in thread local storage to avoid acquiring
  // mutex when SuperVersion does not change since the last use. When a new
  // SuperVersion is installed, the compaction or flush thread cleans up
  // cached SuperVersion in all existing thread local storage. To avoid
  // acquiring mutex for this operation, we use atomic Swap() on the thread
  // local pointer to guarantee exclusive access. If the thread local pointer
  // is being used while a new SuperVersion is installed, the cached
  // SuperVersion can become stale. In that case, the background thread would
  // have swapped in kSVObsolete. We re-check the value when returning the
  // SuperVersion back to thread local, with an atomic compare-and-swap.
  // The superversion will need to be released if detected to be stale.
  void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(ptr != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load()) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
    SuperVersion* sv_to_delete = nullptr;

    if (sv && sv->Unref()) {
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
      db_mutex->Lock();
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      sv_to_delete = sv;
    } else {
      db_mutex->Lock();
    }
    sv = super_version_->Ref();
    db_mutex->Unlock();

    delete sv_to_delete;
  }
  assert(sv != nullptr);
  return sv;
}

bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  // Put the SuperVersion back
  void* expected = SuperVersion::kSVInUse;
  if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
    // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
    // storage has not been altered and no Scrape has happened. The
    // SuperVersion is still current.
    return true;
  } else {
    // ThreadLocal scrape happened in the process of this GetImpl call (after
    // thread local Swap() at the beginning and before CompareAndSwap()).
    // This means the SuperVersion it holds is obsolete.
    assert(expected == SuperVersion::kSVObsolete);
  }
  return false;
}

void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
}

void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  SuperVersion* new_superversion = sv_context->new_superversion.release();
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(mem_, imm_.current(), current_);
  SuperVersion* old_superversion = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;
  super_version_->write_stall_condition =
      RecalculateWriteStallConditions(mutable_cf_options);

  if (old_superversion != nullptr) {
    // Reset SuperVersions cached in thread local storage.
    // This should be done before old_superversion->Unref(). That's to ensure
    // that local_sv_ never holds the last reference to SuperVersion, since
    // it has no means to safely do SuperVersion cleanup.
    ResetThreadLocalSuperVersions();

    if (old_superversion->mutable_cf_options.write_buffer_size !=
        mutable_cf_options.write_buffer_size) {
      mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
    }
    if (old_superversion->write_stall_condition !=
        new_superversion->write_stall_condition) {
      sv_context->PushWriteStallNotification(
          old_superversion->write_stall_condition,
          new_superversion->write_stall_condition, GetName(), ioptions());
    }
    if (old_superversion->Unref()) {
      old_superversion->Cleanup();
      sv_context->superversions_to_free.push_back(old_superversion);
    }
  }
}

void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
  for (auto ptr : sv_ptrs) {
    assert(ptr);
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
    auto sv = static_cast<SuperVersion*>(ptr);
    bool was_last_ref __attribute__((__unused__));
    was_last_ref = sv->Unref();
    // sv couldn't have been the last reference because
    // ResetThreadLocalSuperVersions() is called before
    // unref'ing super_version_.
    assert(!was_last_ref);
  }
}

#ifndef ROCKSDB_LITE
Status ColumnFamilyData::SetOptions(
      const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions new_mutable_cf_options;
  Status s =
      GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
                                   ioptions_.info_log, &new_mutable_cf_options);
  if (s.ok()) {
    mutable_cf_options_ = new_mutable_cf_options;
    mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  }
  return s;
}
#endif  // ROCKSDB_LITE

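// Illustrative mapping (assuming base_level == 1): L0 and L1 files get
// WLTH_MEDIUM, L2 gets WLTH_LONG, and L3 and deeper get WLTH_EXTREME.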
// REQUIRES: DB mutex held
Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
  if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
    return Env::WLTH_NOT_SET;
  }
  if (level == 0) {
    return Env::WLTH_MEDIUM;
  }
  int base_level = current_->storage_info()->base_level();

  // L1: medium, L2: long, ...
  if (level - base_level >= 2) {
    return Env::WLTH_EXTREME;
  }
  return static_cast<Env::WriteLifeTimeHint>(level - base_level +
                            static_cast<int>(Env::WLTH_MEDIUM));
}

Status ColumnFamilyData::AddDirectories() {
  Status s;
  assert(data_dirs_.empty());
  for (auto& p : ioptions_.cf_paths) {
    std::unique_ptr<Directory> path_directory;
    s = DBImpl::CreateAndNewDirectory(ioptions_.env, p.path, &path_directory);
    if (!s.ok()) {
      return s;
    }
    assert(path_directory != nullptr);
    data_dirs_.emplace_back(path_directory.release());
  }
  assert(data_dirs_.size() == ioptions_.cf_paths.size());
  return s;
}

Directory* ColumnFamilyData::GetDataDir(size_t path_id) const {
  if (data_dirs_.empty()) {
    return nullptr;
  }

  assert(path_id < data_dirs_.size());
  return data_dirs_[path_id].get();
}

ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const ImmutableDBOptions* db_options,
                                 const EnvOptions& env_options,
                                 Cache* table_cache,
                                 WriteBufferManager* write_buffer_manager,
                                 WriteController* write_controller)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
                                      ColumnFamilyOptions(), *db_options,
                                      env_options, nullptr)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      env_options_(env_options),
      table_cache_(table_cache),
      write_buffer_manager_(write_buffer_manager),
      write_controller_(write_controller) {
  // initialize linked list
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}

ColumnFamilySet::~ColumnFamilySet() {
  while (column_family_data_.size() > 0) {
    // cfd destructor will delete itself from column_family_data_
    auto cfd = column_family_data_.begin()->second;
    bool last_ref __attribute__((__unused__));
    last_ref = cfd->Unref();
    assert(last_ref);
    delete cfd;
  }
  bool dummy_last_ref __attribute__((__unused__));
  dummy_last_ref = dummy_cfd_->Unref();
  assert(dummy_last_ref);
  delete dummy_cfd_;
}

ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  assert(default_cfd_cache_ != nullptr);
  return default_cfd_cache_;
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  auto cfd_iter = column_family_data_.find(id);
  if (cfd_iter != column_family_data_.end()) {
    return cfd_iter->second;
  } else {
    return nullptr;
  }
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  auto cfd_iter = column_families_.find(name);
  if (cfd_iter != column_families_.end()) {
    auto cfd = GetColumnFamily(cfd_iter->second);
    assert(cfd != nullptr);
    return cfd;
  } else {
    return nullptr;
  }
}

uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  return ++max_column_family_;
}

uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }

void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  max_column_family_ = std::max(new_max_column_family, max_column_family_);
}

size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  return column_families_.size();
}

// under a DB mutex AND write thread
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd = new ColumnFamilyData(
      id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
      *db_options_, env_options_, this);
  column_families_.insert({name, id});
  column_family_data_.insert({id, new_cfd});
  max_column_family_ = std::max(max_column_family_, id);
  // add to linked list
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
  return new_cfd;
}

// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

// under a DB mutex AND from a write thread
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
  auto cfd_iter = column_family_data_.find(cfd->GetID());
  assert(cfd_iter != column_family_data_.end());
  column_family_data_.erase(cfd_iter);
  column_families_.erase(cfd->GetName());
}

// under a DB mutex OR from a write thread
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
  handle_.SetCFD(current_);
  return current_ != nullptr;
}
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  return current_->GetLogNumber();
}

MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  return current_->mem();
}

ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  return &handle_;
}

uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  uint32_t column_family_id = 0;
  if (column_family != nullptr) {
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
    column_family_id = cfh->GetID();
  }
  return column_family_id;
}

const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family != nullptr) {
    return column_family->GetComparator();
  }
  return nullptr;
}

}  // namespace rocksdb