column_family.cc 37.7 KB
Newer Older
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 3 4 5 6 7 8 9
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

I
Igor Canadi 已提交
10
#include "db/column_family.h"
11

12 13 14 15 16
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
17 18 19
#include <vector>
#include <string>
#include <algorithm>
I
Igor Canadi 已提交
20
#include <limits>
21

I
Igor Canadi 已提交
22
#include "db/compaction_picker.h"
23
#include "db/db_impl.h"
24
#include "db/internal_stats.h"
25
#include "db/job_context.h"
26
#include "db/table_properties_collector.h"
I
Igor Canadi 已提交
27
#include "db/version_set.h"
28
#include "db/write_controller.h"
29
#include "db/writebuffer.h"
30
#include "memtable/hash_skiplist_rep.h"
31
#include "util/autovector.h"
32
#include "util/compression.h"
33
#include "util/options_helper.h"
34
#include "util/thread_status_util.h"
35
#include "util/xfunc.h"
I
Igor Canadi 已提交
36 37 38

namespace rocksdb {

I
Igor Canadi 已提交
39
// Handle wrapping a ColumnFamilyData for API users. Takes a reference on the
// wrapped cfd (if any) so it cannot be deleted while the handle is alive.
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ != nullptr) {
    cfd_->Ref();
  }
}

// Releases the handle's reference on the cfd. If that was the last reference,
// the cfd is deleted and any files it kept alive are found (under the DB
// mutex) and purged (outside the mutex).
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ != nullptr) {
    // Job id == 0 means that this is not our background process, but rather
    // user thread
    JobContext job_context(0);
    mutex_->Lock();
    if (cfd_->Unref()) {
      delete cfd_;
    }
    // Collect obsolete files while still holding the DB mutex...
    db_->FindObsoleteFiles(&job_context, false, true);
    mutex_->Unlock();
    // ...but perform the actual deletion outside of it.
    if (job_context.HaveSomethingToDelete()) {
      db_->PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
  }
}

65 66
uint32_t ColumnFamilyHandleImpl::GetID() const {
  // The id lives on the wrapped ColumnFamilyData.
  return cfd()->GetID();
}

67 68 69 70
const std::string& ColumnFamilyHandleImpl::GetName() const {
  // The name string is owned by the underlying ColumnFamilyData.
  auto* data = cfd();
  return data->GetName();
}

71 72 73 74 75 76 77 78 79 80 81 82 83 84
// Builds a ColumnFamilyDescriptor (name plus full, current options) for this
// column family. Not supported in ROCKSDB_LITE builds.
Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
#ifndef ROCKSDB_LITE
  // accessing mutable cf-options requires db mutex.
  InstrumentedMutexLock l(mutex_);
  *desc = ColumnFamilyDescriptor(
      cfd()->GetName(),
      BuildColumnFamilyOptions(*cfd()->options(),
                               *cfd()->GetLatestMutableCFOptions()));
  return Status::OK();
#else
  return Status::NotSupported();
#endif  // !ROCKSDB_LITE
}

85 86 87 88
const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
  // Forward to the wrapped ColumnFamilyData.
  auto* data = cfd();
  return data->user_comparator();
}

89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
// Converts the user-supplied table-properties collector factories from
// cf_options into internal (internal-key aware) factories, and appends the
// built-in internal key statistics collector at the end.
void GetIntTblPropCollectorFactory(
    const ColumnFamilyOptions& cf_options,
    std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
        int_tbl_prop_collector_factories) {
  // Wrap each user factory so it sees user keys instead of internal keys.
  for (const auto& user_factory :
       cf_options.table_properties_collector_factories) {
    assert(user_factory);
    int_tbl_prop_collector_factories->emplace_back(
        new UserKeyTablePropertiesCollectorFactory(user_factory));
  }
  // Add collector to collect internal key statistics
  int_tbl_prop_collector_factories->emplace_back(
      new InternalKeyPropertiesCollectorFactory);
}

105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
// Verifies that every compression type referenced by cf_options (either the
// per-level list, or the single default type when the list is empty) is
// compiled into this binary. Returns InvalidArgument on the first one that
// is not.
Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
  auto unsupported = [](CompressionType type) {
    return Status::InvalidArgument("Compression type " +
                                   CompressionTypeToString(type) +
                                   " is not linked with the binary.");
  };
  if (cf_options.compression_per_level.empty()) {
    if (!CompressionTypeSupported(cf_options.compression)) {
      return unsupported(cf_options.compression);
    }
  } else {
    for (const auto level_type : cf_options.compression_per_level) {
      if (!CompressionTypeSupported(level_type)) {
        return unsupported(level_type);
      }
    }
  }
  return Status::OK();
}

127 128 129 130 131 132
// Validates that cf_options are compatible with concurrent memtable writes
// (allow_concurrent_memtable_write). Returns InvalidArgument when in-place
// updates are enabled or the memtable implementation cannot accept
// concurrent inserts.
Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
  if (cf_options.inplace_update_support) {
    return Status::InvalidArgument(
        "In-place memtable updates (inplace_update_support) is not compatible "
        "with concurrent writes (allow_concurrent_memtable_write)");
  }
  if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
    // Fixed error message: original read "Memtable doesn't concurrent
    // writes" (missing the verb "support").
    return Status::InvalidArgument(
        "Memtable doesn't support concurrent writes "
        "(allow_concurrent_memtable_write)");
  }
  return Status::OK();
}

140 141 142
// Fills in defaults and clamps user-supplied column family options into
// mutually-consistent, safe ranges. Returns the sanitized copy; src is not
// modified.
ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
                                    const InternalKeyComparator* icmp,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  result.comparator = icmp;
  // Clamp write_buffer_size to [64KB, 64GB]; on 32-bit platforms size_t
  // cannot hold 64GB, so cap at 0xffffffff instead.
  size_t clamp_max = std::conditional<
      sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
      std::integral_constant<uint64_t, 64ull << 30>>::type::value;
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // calculate a proper value from writer_buffer_size;
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 8;

    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
  }
  // Keep at least one buffer writable: never merge all of them.
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  // Level-style compaction needs L0 plus at least one target level.
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }
  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  // Negative means "not set": default to max_write_buffer_number.
  if (result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  // bloom filter size shouldn't exceed 1/4 of memtable size.
  if (result.memtable_prefix_bloom_size_ratio > 0.25) {
    result.memtable_prefix_bloom_size_ratio = 0.25;
  } else if (result.memtable_prefix_bloom_size_ratio < 0) {
    result.memtable_prefix_bloom_size_ratio = 0;
  }
  // Cross-functional test hooks (compile to no-ops in normal builds).
  XFUNC_TEST("memtablelist_history", "transaction_xftest_SanitizeOptions",
             xf_transaction_set_memtable_history1,
             xf_transaction_set_memtable_history,
             &result.max_write_buffer_number_to_maintain);
  XFUNC_TEST("memtablelist_history_clear", "transaction_xftest_SanitizeOptions",
             xf_transaction_clear_memtable_history1,
             xf_transaction_clear_memtable_history,
             &result.max_write_buffer_number_to_maintain);

  // Hash-based memtables are unusable without a prefix extractor; fall back
  // to the plain skip-list memtable in that case.
  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_file_num_compaction_trigger = std::numeric_limits<int>::max();
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  if (result.level0_file_num_compaction_trigger == 0) {
    Warn(db_options.info_log.get(),
         "level0_file_num_compaction_trigger cannot be 0");
    result.level0_file_num_compaction_trigger = 1;
  }

  // Enforce stop >= slowdown >= compaction trigger by raising the violating
  // values, warning the user about the adjustment.
  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    Warn(db_options.info_log.get(),
         "This condition must be satisfied: "
         "level0_stop_writes_trigger(%d) >= "
         "level0_slowdown_writes_trigger(%d) >= "
         "level0_file_num_compaction_trigger(%d)",
         result.level0_stop_writes_trigger,
         result.level0_slowdown_writes_trigger,
         result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    Warn(db_options.info_log.get(),
         "Adjust the value to "
         "level0_stop_writes_trigger(%d)"
         "level0_slowdown_writes_trigger(%d)"
         "level0_file_num_compaction_trigger(%d)",
         result.level0_stop_writes_trigger,
         result.level0_slowdown_writes_trigger,
         result.level0_file_num_compaction_trigger);
  }

  // The soft pending-compaction-bytes limit defaults to the hard limit and
  // may never exceed it.
  if (result.soft_pending_compaction_bytes_limit == 0) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
             result.soft_pending_compaction_bytes_limit >
                 result.hard_pending_compaction_bytes_limit) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  }

  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel ||
        db_options.db_paths.size() > 1U) {
      // 1. level_compaction_dynamic_level_bytes only makes sense for
      //    level-based compaction.
      // 2. we don't yet know how to make both of this feature and multiple
      //    DB path work.
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

  return result;
}

269 270 271
// Sentinel pointers stored in the thread-local SuperVersion slot:
// kSVInUse marks a slot whose SuperVersion is currently borrowed by the
// owning thread; kSVObsolete (nullptr) marks a slot whose cached value has
// been invalidated. dummy exists only to provide a unique address.
int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;
272

273 274 275 276 277 278 279 280 281 282 283 284 285
SuperVersion::~SuperVersion() {
  // to_delete holds memtables whose ownership was handed over in Cleanup().
  for (auto td : to_delete) {
    delete td;
  }
}

// Adds a reference and returns this for call chaining.
SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

// Drops a reference. Returns true iff this was the last one, in which case
// the caller is responsible for Cleanup() and deletion.
bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
  uint32_t previous_refs = refs.fetch_sub(1);
  assert(previous_refs > 0);
  return previous_refs == 1;
}

// Releases the references this SuperVersion holds on mem/imm/current.
// Memtables that become unreferenced are stashed in to_delete so the
// destructor can free them (outside of whatever lock the caller holds).
void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    // The memtable is going away; subtract it from the immutable-memtable
    // memory accounting before queuing it for deletion.
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
    assert(*memory_usage >= m->ApproximateMemoryUsage());
    *memory_usage -= m->ApproximateMemoryUsage();
    to_delete.push_back(m);
  }
  current->Unref();
}

// Takes references on the given memtable, immutable memtable list version
// and current version, and starts this SuperVersion's refcount at 1.
void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                        Version* new_current) {
  mem = new_mem;
  imm = new_imm;
  current = new_current;
  mem->Ref();
  imm->Ref();
  current->Ref();
  refs.store(1, std::memory_order_relaxed);
}

315 316
namespace {
// ThreadLocalPtr destruction callback for cached SuperVersions.
void SuperVersionUnrefHandle(void* ptr) {
  // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
  // destroyed. When the former happens, the thread shouldn't see kSVInUse.
  // When the latter happens, we are in ~ColumnFamilyData(), no get should
  // happen as well.
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv->Unref()) {
    // Last reference: Cleanup() must run under the DB mutex.
    sv->db_mutex->Lock();
    sv->Cleanup();
    sv->db_mutex->Unlock();
    delete sv;
  }
}
}  // anonymous namespace

331 332 333 334 335
// Constructs a ColumnFamilyData. _dummy_versions == nullptr marks a dummy
// CFD (list head) that gets no stats/table cache/compaction picker.
// The new CFD starts with one reference held on behalf of its creator.
ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBuffer* write_buffer,
    const ColumnFamilyOptions& cf_options, const DBOptions* db_options,
    const EnvOptions& env_options, ColumnFamilySet* column_family_set)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      // Options are sanitized before being stored.
      options_(*db_options,
               SanitizeOptions(*db_options, &internal_comparator_, cf_options)),
      ioptions_(options_),
      mutable_cf_options_(options_, ioptions_),
      write_buffer_(write_buffer),
      mem_(nullptr),
      imm_(options_.min_write_buffer_number_to_merge,
           options_.max_write_buffer_number_to_maintain),
      super_version_(nullptr),
      super_version_number_(0),
      // Thread-local SuperVersion cache; entries are released through
      // SuperVersionUnrefHandle on thread exit / CFD destruction.
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      column_family_set_(column_family_set),
      pending_flush_(false),
      pending_compaction_(false),
      prev_compaction_needed_bytes_(0) {
  Ref();

  // Convert user defined table properties collector factories to internal ones.
  GetIntTblPropCollectorFactory(options_, &int_tbl_prop_collector_factories_);

  // if _dummy_versions is nullptr, then this is a dummy column family.
  if (_dummy_versions != nullptr) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options->env, this));
    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));
    // Pick the compaction picker implementation matching the configured
    // compaction style; unknown styles fall back to level compaction.
    if (ioptions_.compaction_style == kCompactionStyleLevel) {
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
#ifndef ROCKSDB_LITE
    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(
          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
      compaction_picker_.reset(
          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
      compaction_picker_.reset(new NullCompactionPicker(
          ioptions_, &internal_comparator_));
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
          "Column family %s does not use any background compaction. "
          "Compactions can only be done via CompactFiles\n",
          GetName().c_str());
#endif  // !ROCKSDB_LITE
    } else {
      Log(InfoLogLevel::ERROR_LEVEL, ioptions_.info_log,
          "Unable to recognize the specified compaction style %d. "
          "Column family %s will use kCompactionStyleLevel.\n",
          ioptions_.compaction_style, GetName().c_str());
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
    }

    // Only dump the full option set for the first few column families to
    // keep the info log small.
    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
          "--------------- Options for column family [%s]:\n", name.c_str());
      options_.DumpCFOptions(ioptions_.info_log);
    } else {
      Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
          "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}
I
Igor Canadi 已提交
410

411
// DB mutex held.
// Tears down the column family: unlinks it from the CFD list, releases the
// thread-local SuperVersion cache, the current/dummy versions and all
// memtables. Requires refcount 0 and no pending flush/compaction.
ColumnFamilyData::~ColumnFamilyData() {
  assert(refs_.load(std::memory_order_relaxed) == 0);
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from column family set
    // If column_family_set_ == nullptr, this is dummy CFD and not in
    // ColumnFamilySet
    column_family_set_->RemoveColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

  // It would be wrong if this ColumnFamilyData is in flush_queue_ or
  // compaction_queue_ and we destroyed it
  assert(!pending_flush_);
  assert(!pending_compaction_);

  if (super_version_ != nullptr) {
    // Release SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since unref handler can lock mutex.
    super_version_->db_mutex->Unlock();
    local_sv_.reset();
    super_version_->db_mutex->Lock();

    bool is_last_reference __attribute__((unused));
    is_last_reference = super_version_->Unref();
    assert(is_last_reference);
    super_version_->Cleanup();
    delete super_version_;
    super_version_ = nullptr;
  }

  if (dummy_versions_ != nullptr) {
    // List must be empty
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((unused)) = dummy_versions_->Unref();
    assert(deleted);
  }

  if (mem_ != nullptr) {
    // Unref() returns the memtable only when this was the last reference;
    // deleting nullptr otherwise is a no-op.
    delete mem_->Unref();
  }
  autovector<MemTable*> to_delete;
  imm_.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }
}

I
Igor Canadi 已提交
468 469 470 471 472 473 474 475 476 477
// Marks this column family as dropped and detaches it from the
// ColumnFamilySet. The write-stall token is released since a dropped CF no
// longer participates in write throttling.
void ColumnFamilyData::SetDropped() {
  // can't drop default CF
  assert(id_ != 0);
  dropped_ = true;
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}

478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510
// Multiplicative step used to speed up / slow down the delayed write rate.
const double kSlowdownRatio = 1.2;

namespace {
// Returns a delay token embodying the write rate to use while writes are
// being stalled.
//   max_write_rate             - user-configured ceiling for the write rate.
//   write_controller           - shared controller holding the current rate.
//   compaction_needed_bytes    - current estimated compaction debt.
//   prev_compaction_needed_bytes - debt observed at the previous evaluation.
//   auto_compactions_disabled  - if true, always use max_write_rate.
// NOTE: the two trailing parameters were previously misspelled
// ("prev_compaction_neeed_bytes", "auto_comapctions_disabled"); renamed —
// callers pass positionally, so behavior is unchanged.
std::unique_ptr<WriteControllerToken> SetupDelay(
    uint64_t max_write_rate, WriteController* write_controller,
    uint64_t compaction_needed_bytes, uint64_t prev_compaction_needed_bytes,
    bool auto_compactions_disabled) {
  const uint64_t kMinWriteRate = 1024u;  // Minimum write rate 1KB/s.

  uint64_t write_rate = write_controller->delayed_write_rate();

  if (auto_compactions_disabled) {
    // When auto compaction is disabled, always use the value user gave.
    write_rate = max_write_rate;
  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
    // If user gives rate less than kMinWriteRate, don't adjust it.
    //
    // If already delayed, need to adjust based on previous compaction debt.
    // When there are two or more column families require delay, we always
    // increase or reduce write rate based on information for one single
    // column family. It is likely to be OK but we can improve if there is a
    // problem.
    // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes
    // is only available in level-based compaction
    //
    // If the compaction debt stays the same as previously, we also further slow
    // down. It usually means a mem table is full. It's mainly for the case
    // where both of flush and compaction are much slower than the speed we
    // insert to mem tables, so we need to actively slow down before we get
    // feedback signal from compaction and flushes to avoid the full stop
    // because of hitting the max write buffer number.
    if (prev_compaction_needed_bytes > 0 &&
        prev_compaction_needed_bytes <= compaction_needed_bytes) {
      // Debt not shrinking: slow down further (but never below the floor).
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) /
                                         kSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_needed_bytes > compaction_needed_bytes) {
      // We are speeding up by ratio of kSlowdownRatio when we have paid
      // compaction debt. But we'll never speed up to faster than the write rate
      // given by users.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kSlowdownRatio);
      if (write_rate > max_write_rate) {
        write_rate = max_write_rate;
      }
    }
  }
  return write_controller->GetDelayToken(write_rate);
}
529 530 531 532 533 534 535 536 537 538 539 540 541 542 543

// Returns the L0 file count at which compaction should be sped up: the
// smaller of (a) twice the compaction trigger and (b) one quarter of the way
// from the compaction trigger to the slowdown trigger.
int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
                                    int level0_slowdown_writes_trigger) {
  // SanitizeOptions() ensures it.
  assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);

  const int quarter_way =
      level0_file_num_compaction_trigger +
      (level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
          4;
  const int doubled_trigger = level0_file_num_compaction_trigger * 2;
  return std::min(doubled_trigger, quarter_way);
}
544 545
}  // namespace

546 547
// Re-evaluates the write stall state for this column family and installs the
// matching WriteControllerToken: stop, delayed-rate, compaction-pressure, or
// none. Branches are ordered by severity: stop conditions first, then
// slowdowns, then compaction speed-ups.
void ColumnFamilyData::RecalculateWriteStallConditions(
      const MutableCFOptions& mutable_cf_options) {
  if (current_ != nullptr) {
    auto* vstorage = current_->storage_info();
    auto write_controller = column_family_set_->write_controller_;
    uint64_t compaction_needed_bytes =
        vstorage->estimated_compaction_needed_bytes();

    if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) {
      // Stop: every write buffer is full and waiting for flush.
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
          "[%s] Stopping writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d",
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number);
    } else if (vstorage->l0_delay_trigger_count() >=
               mutable_cf_options.level0_stop_writes_trigger) {
      // Stop: level-0 file count reached the stop trigger.
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES_TOTAL, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LEVEL0_NUM_FILES_WITH_COMPACTION, 1);
      }
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
          "[%s] Stopping writes because we have %d level-0 files",
          name_.c_str(), vstorage->l0_delay_trigger_count());
    } else if (mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
               compaction_needed_bytes >=
                   mutable_cf_options.hard_pending_compaction_bytes_limit) {
      // Stop: estimated compaction debt passed the hard limit.
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(
          InternalStats::HARD_PENDING_COMPACTION_BYTES_LIMIT, 1);
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
          "[%s] Stopping writes because of estimated pending compaction "
          "bytes %" PRIu64,
          name_.c_str(), compaction_needed_bytes);
    } else if (mutable_cf_options.max_write_buffer_number > 3 &&
               imm()->NumNotFlushed() >=
                   mutable_cf_options.max_write_buffer_number - 1) {
      // Slow down: one write buffer away from a full stop.
      write_controller_token_ =
          SetupDelay(ioptions_.delayed_write_rate, write_controller,
                     compaction_needed_bytes, prev_compaction_needed_bytes_,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1);
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
          "[%s] Stalling writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d "
          "rate %" PRIu64,
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number,
          write_controller->delayed_write_rate());
    } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
               vstorage->l0_delay_trigger_count() >=
                   mutable_cf_options.level0_slowdown_writes_trigger) {
      // Slow down: level-0 file count reached the slowdown trigger.
      write_controller_token_ =
          SetupDelay(ioptions_.delayed_write_rate, write_controller,
                     compaction_needed_bytes, prev_compaction_needed_bytes_,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN_TOTAL, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LEVEL0_SLOWDOWN_WITH_COMPACTION, 1);
      }
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
          "[%s] Stalling writes because we have %d level-0 files "
          "rate %" PRIu64,
          name_.c_str(), vstorage->l0_delay_trigger_count(),
          write_controller->delayed_write_rate());
    } else if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
               vstorage->estimated_compaction_needed_bytes() >=
                   mutable_cf_options.soft_pending_compaction_bytes_limit) {
      // Slow down: compaction debt passed the soft limit.
      write_controller_token_ =
          SetupDelay(ioptions_.delayed_write_rate, write_controller,
                     compaction_needed_bytes, prev_compaction_needed_bytes_,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(
          InternalStats::SOFT_PENDING_COMPACTION_BYTES_LIMIT, 1);
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
          "[%s] Stalling writes because of estimated pending compaction "
          "bytes %" PRIu64 " rate %" PRIu64,
          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
          write_controller->delayed_write_rate());
    } else if (vstorage->l0_delay_trigger_count() >=
               GetL0ThresholdSpeedupCompaction(
                   mutable_cf_options.level0_file_num_compaction_trigger,
                   mutable_cf_options.level0_slowdown_writes_trigger)) {
      // No stall yet, but request more compaction threads before the
      // slowdown trigger is reached.
      write_controller_token_ = write_controller->GetCompactionPressureToken();
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
          "[%s] Increasing compaction threads because we have %d level-0 "
          "files ",
          name_.c_str(), vstorage->l0_delay_trigger_count());
    } else if (vstorage->estimated_compaction_needed_bytes() >=
               mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
      // Increase compaction threads if bytes needed for compaction exceeds
      // 1/4 of threshold for slowing down.
      // If soft pending compaction byte limit is not set, always speed up
      // compaction.
      write_controller_token_ = write_controller->GetCompactionPressureToken();
      if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
        Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
            "[%s] Increasing compaction threads because of estimated pending "
            "compaction "
            "bytes %" PRIu64,
            name_.c_str(), vstorage->estimated_compaction_needed_bytes());
      }
    } else {
      // No condition met: drop whatever token was previously installed.
      write_controller_token_.reset();
    }
    // Remember the debt so the next SetupDelay() call can tell whether it is
    // shrinking or growing.
    prev_compaction_needed_bytes_ = compaction_needed_bytes;
  }
}

L
Lei Jin 已提交
659
const EnvOptions* ColumnFamilyData::soptions() const {
  // EnvOptions are shared by every column family in the set.
  return &column_family_set_->env_options_;
}

I
Igor Canadi 已提交
663 664 665
// Installs a new current version (reference management is the caller's job).
void ColumnFamilyData::SetCurrent(Version* current_version) {
  current_ = current_version;
}
666

667 668 669 670
uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  // Delegates the count to VersionSet using this CF's dummy version list.
  return VersionSet::GetNumLiveVersions(dummy_versions_);
}

671 672 673 674
uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
  // Delegates the size computation to VersionSet.
  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
}

675
// Allocates (but does not install) a new memtable built from the current
// mutable options; earliest_seq is forwarded to the MemTable constructor.
// Ownership of the returned memtable passes to the caller.
MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  assert(current_ != nullptr);
  return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
                      write_buffer_, earliest_seq);
}

// Replaces the active memtable with a freshly constructed one and takes a
// reference on it. The previous memtable is deleted if this CFD held its
// last reference (Unref() returns nullptr otherwise, and delete nullptr is
// a no-op).
void ColumnFamilyData::CreateNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
  mem_->Ref();
}

691 692 693 694
bool ColumnFamilyData::NeedsCompaction() const {
  // The compaction picker decides based on the current version's storage.
  auto* storage = current_->storage_info();
  return compaction_picker_->NeedsCompaction(storage);
}

695 696
// Asks the compaction picker for a compaction over the current version.
// Returns nullptr when nothing is picked; otherwise the returned compaction
// pins the current version as its input.
Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
  Compaction* picked = compaction_picker_->PickCompaction(
      GetName(), mutable_options, current_->storage_info(), log_buffer);
  if (picked != nullptr) {
    picked->SetInputVersion(current_);
  }
  return picked;
}

705
// Sentinel output_level values accepted by CompactRange() (see its callers
// for exact semantics — TODO(review): confirm against db_impl usage).
const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;
707

708
Compaction* ColumnFamilyData::CompactRange(
709 710 711
    const MutableCFOptions& mutable_cf_options, int input_level,
    int output_level, uint32_t output_path_id, const InternalKey* begin,
    const InternalKey* end, InternalKey** compaction_end, bool* conflict) {
S
sdong 已提交
712
  auto* result = compaction_picker_->CompactRange(
S
sdong 已提交
713
      GetName(), mutable_cf_options, current_->storage_info(), input_level,
714
      output_level, output_path_id, begin, end, compaction_end, conflict);
S
sdong 已提交
715 716 717 718
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
719 720
}

// Returns a SuperVersion carrying one reference owned by the caller, who
// must Unref() it when done. The thread-local cached copy is consulted
// first; if it cannot be put back (a Scrape happened in between), the
// thread-local reference is dropped so only the caller's reference remains.
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
    InstrumentedMutex* db_mutex) {
  SuperVersion* sv = nullptr;
  sv = GetThreadLocalSuperVersion(db_mutex);
  sv->Ref();
  if (!ReturnThreadLocalSuperVersion(sv)) {
    sv->Unref();
  }
  return sv;
}

SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
733
    InstrumentedMutex* db_mutex) {
734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755
  SuperVersion* sv = nullptr;
  // The SuperVersion is cached in thread local storage to avoid acquiring
  // mutex when SuperVersion does not change since the last use. When a new
  // SuperVersion is installed, the compaction or flush thread cleans up
  // cached SuperVersion in all existing thread local storage. To avoid
  // acquiring mutex for this operation, we use atomic Swap() on the thread
  // local pointer to guarantee exclusive access. If the thread local pointer
  // is being used while a new SuperVersion is installed, the cached
  // SuperVersion can become stale. In that case, the background thread would
  // have swapped in kSVObsolete. We re-check the value at when returning
  // SuperVersion back to thread local, with an atomic compare and swap.
  // The superversion will need to be released if detected to be stale.
  void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(ptr != SuperVersion::kSVInUse);
  sv = static_cast<SuperVersion*>(ptr);
  if (sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load()) {
756
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
757 758 759
    SuperVersion* sv_to_delete = nullptr;

    if (sv && sv->Unref()) {
760
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783
      db_mutex->Lock();
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      sv_to_delete = sv;
    } else {
      db_mutex->Lock();
    }
    sv = super_version_->Ref();
    db_mutex->Unlock();

    delete sv_to_delete;
  }
  assert(sv != nullptr);
  return sv;
}

// Tries to stash sv back into this thread's local slot. Returns true if the
// slot still held kSVInUse (sv remains cached and keeps its reference);
// returns false if a Scrape intervened, in which case the caller must drop
// the thread-local reference itself.
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  // Put the SuperVersion back
  void* expected = SuperVersion::kSVInUse;
  if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
    // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
    // storage has not been altered and no Scrape has happened. The
    // SuperVersion is still current.
    return true;
  } else {
    // ThreadLocal scrape happened in the process of this GetImpl call (after
    // thread local Swap() at the beginning and before CompareAndSwap()).
    // This means the SuperVersion it holds is obsolete.
    assert(expected == SuperVersion::kSVObsolete);
  }
  return false;
}

// Convenience overload: installs new_superversion under the already-held DB
// mutex using this column family's current mutable options.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  return InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_);
}

// Makes new_superversion the current SuperVersion (snapshot of mem_, imm_,
// current_), bumps the version counter, invalidates all thread-local caches,
// and recomputes write-stall state. Returns the old SuperVersion if this
// call dropped its last reference — the caller must delete it outside the
// mutex — otherwise returns nullptr.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(mem_, imm_.current(), current_);
  SuperVersion* old_superversion = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;
  // Reset SuperVersions cached in thread local storage
  ResetThreadLocalSuperVersions();

  RecalculateWriteStallConditions(mutable_cf_options);

  if (old_superversion != nullptr && old_superversion->Unref()) {
    old_superversion->Cleanup();
    return old_superversion;  // will let caller delete outside of mutex
  }
  return nullptr;
}

// Invalidates every thread's cached SuperVersion pointer and drops the
// references those caches held.
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
  // Atomically replace each thread-local slot with kSVObsolete, collecting
  // the previous contents.
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
  for (auto ptr : sv_ptrs) {
    assert(ptr);
    // kSVInUse means the owner thread is currently borrowing its slot; it
    // will observe the obsolescence when it returns the SuperVersion.
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
    auto sv = static_cast<SuperVersion*>(ptr);
    if (sv->Unref()) {
      sv->Cleanup();
      delete sv;
    }
  }
}

I
Igor Canadi 已提交
840
#ifndef ROCKSDB_LITE
841
Status ColumnFamilyData::SetOptions(
842 843
      const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions new_mutable_cf_options;
844 845 846
  Status s = GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
                                          &new_mutable_cf_options);
  if (s.ok()) {
847
    mutable_cf_options_ = new_mutable_cf_options;
848
    mutable_cf_options_.RefreshDerivedOptions(ioptions_);
849
  }
850
  return s;
851
}
I
Igor Canadi 已提交
852
#endif  // ROCKSDB_LITE
853

// Constructs the set with a sentinel ColumnFamilyData (dummy_cfd_, id 0,
// empty name, no table cache / write buffer / owner) that serves as the
// head of the circular doubly-linked list of column families.
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const DBOptions* db_options,
                                 const EnvOptions& env_options,
                                 Cache* table_cache,
                                 WriteBuffer* write_buffer,
                                 WriteController* write_controller)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
                                      ColumnFamilyOptions(), db_options,
                                      env_options, nullptr)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      env_options_(env_options),
      table_cache_(table_cache),
      write_buffer_(write_buffer),
      write_controller_(write_controller) {
  // initialize linked list — sentinel points at itself when the set is empty
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}

// Tears down every remaining column family, then the sentinel.
ColumnFamilySet::~ColumnFamilySet() {
  while (!column_family_data_.empty()) {
    // cfd destructor will delete itself from column_family_data_
    ColumnFamilyData* cfd = column_family_data_.begin()->second;
    cfd->Unref();
    delete cfd;
  }
  dummy_cfd_->Unref();
  delete dummy_cfd_;
}

// Returns the default column family. default_cfd_cache_ is populated when
// the column family with id 0 is created (see CreateColumnFamily).
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  assert(default_cfd_cache_ != nullptr);
  return default_cfd_cache_;
}

// Looks up a column family by numeric id; nullptr when absent.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  auto it = column_family_data_.find(id);
  return it == column_family_data_.end() ? nullptr : it->second;
}

901 902 903
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  auto cfd_iter = column_families_.find(name);
I
Igor Canadi 已提交
904 905 906 907 908
  if (cfd_iter != column_families_.end()) {
    auto cfd = GetColumnFamily(cfd_iter->second);
    assert(cfd != nullptr);
    return cfd;
  } else {
909 910
    return nullptr;
  }
I
Igor Canadi 已提交
911 912 913 914 915 916
}

// Allocates the next column family id by bumping the running maximum.
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  return ++max_column_family_;
}

uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }

// Raises the max column family id watermark; never lowers it.
void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  max_column_family_ = std::max(new_max_column_family, max_column_family_);
}

// Count of registered (named) column families.
size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  return column_families_.size();
}

// under a DB mutex AND write thread
// Creates a new ColumnFamilyData, registers it in the name->id and
// id->cfd maps, splices it into the circular linked list in front of the
// sentinel, and caches it as the default when id == 0.
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd =
      new ColumnFamilyData(id, name, dummy_versions, table_cache_,
                           write_buffer_, options, db_options_,
                           env_options_, this);
  column_families_.insert({name, id});
  column_family_data_.insert({id, new_cfd});
  max_column_family_ = std::max(max_column_family_, id);
  // add to linked list
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
  if (id == 0) {
    // Keep a direct pointer for the fast path in GetDefault().
    default_cfd_cache_ = new_cfd;
  }
  return new_cfd;
}

951 952 953 954
// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
I
Igor Canadi 已提交
955
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
956 957 958 959 960 961 962 963 964
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

I
Igor Canadi 已提交
965
// under a DB mutex AND from a write thread
I
Igor Canadi 已提交
966
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
967
  auto cfd_iter = column_family_data_.find(cfd->GetID());
968 969
  assert(cfd_iter != column_family_data_.end());
  column_family_data_.erase(cfd_iter);
970
  column_families_.erase(cfd->GetName());
I
Igor Canadi 已提交
971 972
}

I
Igor Canadi 已提交
973
// under a DB mutex OR from a write thread
974
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
I
Igor Canadi 已提交
975 976 977 978 979 980
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
981
  handle_.SetCFD(current_);
982 983
  return current_ != nullptr;
}
984

// Log number of the column family positioned by the last successful Seek().
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  return current_->GetLogNumber();
}

// Active memtable of the column family positioned by the last Seek().
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  return current_->mem();
}

// Handle for the column family positioned by the last Seek(). The handle
// is a member of this object, so it is only valid while this object lives.
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  return &handle_;
}

1000 1001 1002 1003 1004 1005 1006 1007 1008
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  uint32_t column_family_id = 0;
  if (column_family != nullptr) {
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
    column_family_id = cfh->GetID();
  }
  return column_family_id;
}

1009 1010 1011 1012 1013 1014 1015 1016 1017
const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family != nullptr) {
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
    return cfh->user_comparator();
  }
  return nullptr;
}

I
Igor Canadi 已提交
1018
}  // namespace rocksdb