column_family.cc 37.6 KB
Newer Older
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 3 4 5 6 7 8 9
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

I
Igor Canadi 已提交
10
#include "db/column_family.h"
11

12 13 14 15 16
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
17 18 19
#include <vector>
#include <string>
#include <algorithm>
I
Igor Canadi 已提交
20
#include <limits>
21

I
Igor Canadi 已提交
22
#include "db/compaction_picker.h"
23
#include "db/db_impl.h"
24
#include "db/internal_stats.h"
25
#include "db/job_context.h"
26
#include "db/table_properties_collector.h"
I
Igor Canadi 已提交
27
#include "db/version_set.h"
28
#include "db/write_controller.h"
29
#include "db/writebuffer.h"
30
#include "memtable/hash_skiplist_rep.h"
31
#include "util/autovector.h"
32
#include "util/compression.h"
33
#include "util/options_helper.h"
34
#include "util/thread_status_util.h"
35
#include "util/xfunc.h"
I
Igor Canadi 已提交
36 37 38

namespace rocksdb {

I
Igor Canadi 已提交
39
// Constructs a handle wrapping a ColumnFamilyData. Takes a reference on the
// CFD (if non-null) so it stays alive for the lifetime of this handle.
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ != nullptr) {
    cfd_->Ref();
  }
}

// Drops the reference taken in the constructor. If this was the last
// reference, the CFD is deleted under the DB mutex and any files made
// obsolete by the deletion are collected and purged afterwards.
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ != nullptr) {
    // Job id == 0 means that this is not our background process, but rather
    // user thread
    JobContext job_context(0);
    mutex_->Lock();
    if (cfd_->Unref()) {
      delete cfd_;
    }
    db_->FindObsoleteFiles(&job_context, false, true);
    mutex_->Unlock();
    // Purge outside the mutex; file deletion can be slow.
    if (job_context.HaveSomethingToDelete()) {
      db_->PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
  }
}

65 66
uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }

67 68 69 70
const std::string& ColumnFamilyHandleImpl::GetName() const {
  return cfd()->GetName();
}

71 72 73 74 75 76 77 78 79 80 81 82 83 84
Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
#ifndef ROCKSDB_LITE
  // accessing mutable cf-options requires db mutex.
  InstrumentedMutexLock l(mutex_);
  *desc = ColumnFamilyDescriptor(
      cfd()->GetName(),
      BuildColumnFamilyOptions(*cfd()->options(),
                               *cfd()->GetLatestMutableCFOptions()));
  return Status::OK();
#else
  return Status::NotSupported();
#endif  // !ROCKSDB_LITE
}

85 86 87 88
const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
  return cfd()->user_comparator();
}

89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
void GetIntTblPropCollectorFactory(
    const ColumnFamilyOptions& cf_options,
    std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
        int_tbl_prop_collector_factories) {
  auto& collector_factories = cf_options.table_properties_collector_factories;
  for (size_t i = 0; i < cf_options.table_properties_collector_factories.size();
       ++i) {
    assert(collector_factories[i]);
    int_tbl_prop_collector_factories->emplace_back(
        new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
  }
  // Add collector to collect internal key statistics
  int_tbl_prop_collector_factories->emplace_back(
      new InternalKeyPropertiesCollectorFactory);
}

105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
  if (!cf_options.compression_per_level.empty()) {
    for (size_t level = 0; level < cf_options.compression_per_level.size();
         ++level) {
      if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
        return Status::InvalidArgument(
            "Compression type " +
            CompressionTypeToString(cf_options.compression_per_level[level]) +
            " is not linked with the binary.");
      }
    }
  } else {
    if (!CompressionTypeSupported(cf_options.compression)) {
      return Status::InvalidArgument(
          "Compression type " +
          CompressionTypeToString(cf_options.compression) +
          " is not linked with the binary.");
    }
  }
  return Status::OK();
}

127 128 129 130 131 132 133 134 135 136 137
Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
  if (cf_options.inplace_update_support) {
    return Status::InvalidArgument(
        "In-place memtable updates (inplace_update_support) is not compatible "
        "with concurrent writes (allow_concurrent_memtable_write)");
  }
  if (cf_options.filter_deletes) {
    return Status::InvalidArgument(
        "Delete filtering (filter_deletes) is not compatible with concurrent "
        "memtable writes (allow_concurrent_memtable_writes)");
  }
138 139 140 141
  if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
    return Status::InvalidArgument(
        "Memtable doesn't concurrent writes (allow_concurrent_memtable_write)");
  }
142 143 144
  return Status::OK();
}

145 146 147
ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
                                    const InternalKeyComparator* icmp,
                                    const ColumnFamilyOptions& src) {
148 149
  ColumnFamilyOptions result = src;
  result.comparator = icmp;
I
Igor Canadi 已提交
150 151 152 153
#ifdef OS_MACOSX
  // TODO(icanadi) make write_buffer_size uint64_t instead of size_t
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, ((size_t)1) << 30);
#else
154 155
  ClipToRange(&result.write_buffer_size,
              ((size_t)64) << 10, ((size_t)64) << 30);
I
Igor Canadi 已提交
156
#endif
157 158 159
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // calculate a proper value from writer_buffer_size;
  if (result.arena_block_size <= 0) {
A
agiardullo 已提交
160 161 162 163 164 165
    result.arena_block_size = result.write_buffer_size / 8;

    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
166 167 168 169
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
170 171 172 173 174 175 176
  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }
177 178 179
  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
180 181 182 183 184 185 186 187 188 189 190 191
  if (result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  XFUNC_TEST("memtablelist_history", "transaction_xftest_SanitizeOptions",
             xf_transaction_set_memtable_history1,
             xf_transaction_set_memtable_history,
             &result.max_write_buffer_number_to_maintain);
  XFUNC_TEST("memtablelist_history_clear", "transaction_xftest_SanitizeOptions",
             xf_transaction_clear_memtable_history1,
             xf_transaction_clear_memtable_history,
             &result.max_write_buffer_number_to_maintain);

192 193 194 195 196
  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
197 198 199 200
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

I
Igor Canadi 已提交
201 202 203 204 205 206 207 208 209
  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_file_num_compaction_trigger = std::numeric_limits<int>::max();
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

210 211 212 213 214 215
  if (result.level0_file_num_compaction_trigger == 0) {
    Warn(db_options.info_log.get(),
         "level0_file_num_compaction_trigger cannot be 0");
    result.level0_file_num_compaction_trigger = 1;
  }

216 217 218 219
  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
220
    Warn(db_options.info_log.get(),
221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
         "This condition must be satisfied: "
         "level0_stop_writes_trigger(%d) >= "
         "level0_slowdown_writes_trigger(%d) >= "
         "level0_file_num_compaction_trigger(%d)",
         result.level0_stop_writes_trigger,
         result.level0_slowdown_writes_trigger,
         result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
237
    Warn(db_options.info_log.get(),
238 239 240 241 242 243 244 245
         "Adjust the value to "
         "level0_stop_writes_trigger(%d)"
         "level0_slowdown_writes_trigger(%d)"
         "level0_file_num_compaction_trigger(%d)",
         result.level0_stop_writes_trigger,
         result.level0_slowdown_writes_trigger,
         result.level0_file_num_compaction_trigger);
  }
246 247 248 249 250 251 252 253 254 255 256

  if (result.soft_pending_compaction_bytes_limit == 0) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
             result.soft_pending_compaction_bytes_limit >
                 result.hard_pending_compaction_bytes_limit) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  }

257 258 259 260 261 262 263 264 265 266
  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel ||
        db_options.db_paths.size() > 1U) {
      // 1. level_compaction_dynamic_level_bytes only makes sense for
      //    level-based compaction.
      // 2. we don't yet know how to make both of this feature and multiple
      //    DB path work.
      result.level_compaction_dynamic_level_bytes = false;
    }
  }
267

268 269 270
  return result;
}

// Sentinel values stored in the thread-local SuperVersion slot:
// kSVInUse marks the slot as checked out by the current thread;
// kSVObsolete (nullptr) marks a stale slot that must be refreshed.
int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;
274

275 276 277 278 279 280 281 282 283 284 285 286 287
SuperVersion::~SuperVersion() {
  for (auto td : to_delete) {
    delete td;
  }
}

// Increments the reference count and returns this for chaining.
SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
288
  uint32_t previous_refs = refs.fetch_sub(1);
289 290
  assert(previous_refs > 0);
  return previous_refs == 1;
291 292 293 294 295 296 297
}

// Releases the references held on mem, imm and current. Must only be called
// after the last Unref(). A fully-unreferenced mutable memtable is queued on
// to_delete (freed in the destructor) and its size is subtracted from the
// immutable-list memory accounting.
void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
    assert(*memory_usage >= m->ApproximateMemoryUsage());
    *memory_usage -= m->ApproximateMemoryUsage();
    to_delete.push_back(m);
  }
  current->Unref();
}

// Installs the given memtable, immutable memtable list and version, taking a
// reference on each, and resets this SuperVersion's own refcount to 1.
void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                        Version* new_current) {
  mem = new_mem;
  imm = new_imm;
  current = new_current;
  mem->Ref();
  imm->Ref();
  current->Ref();
  refs.store(1, std::memory_order_relaxed);
}

317 318
namespace {
void SuperVersionUnrefHandle(void* ptr) {
319 320 321 322
  // UnrefHandle is called when a thread exists or a ThreadLocalPtr gets
  // destroyed. When former happens, the thread shouldn't see kSVInUse.
  // When latter happens, we are in ~ColumnFamilyData(), no get should happen as
  // well.
323 324 325 326 327 328 329 330 331 332
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv->Unref()) {
    sv->db_mutex->Lock();
    sv->Cleanup();
    sv->db_mutex->Unlock();
    delete sv;
  }
}
}  // anonymous namespace

333 334 335 336 337
ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBuffer* write_buffer,
    const ColumnFamilyOptions& cf_options, const DBOptions* db_options,
    const EnvOptions& env_options, ColumnFamilySet* column_family_set)
338 339
    : id_(id),
      name_(name),
I
Igor Canadi 已提交
340
      dummy_versions_(_dummy_versions),
341
      current_(nullptr),
342 343
      refs_(0),
      dropped_(false),
344
      internal_comparator_(cf_options.comparator),
345 346
      options_(*db_options,
               SanitizeOptions(*db_options, &internal_comparator_, cf_options)),
L
Lei Jin 已提交
347
      ioptions_(options_),
348
      mutable_cf_options_(options_, ioptions_),
349
      write_buffer_(write_buffer),
350
      mem_(nullptr),
351 352
      imm_(options_.min_write_buffer_number_to_merge,
           options_.max_write_buffer_number_to_maintain),
353 354
      super_version_(nullptr),
      super_version_number_(0),
355
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
356 357
      next_(nullptr),
      prev_(nullptr),
358
      log_number_(0),
359 360
      column_family_set_(column_family_set),
      pending_flush_(false),
361 362
      pending_compaction_(false),
      prev_compaction_needed_bytes_(0) {
363 364
  Ref();

365 366 367
  // Convert user defined table properties collector factories to internal ones.
  GetIntTblPropCollectorFactory(options_, &int_tbl_prop_collector_factories_);

I
Igor Canadi 已提交
368 369
  // if _dummy_versions is nullptr, then this is a dummy column family.
  if (_dummy_versions != nullptr) {
370
    internal_stats_.reset(
371
        new InternalStats(ioptions_.num_levels, db_options->env, this));
I
Igor Canadi 已提交
372
    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));
373
    if (ioptions_.compaction_style == kCompactionStyleLevel) {
I
Igor Canadi 已提交
374
      compaction_picker_.reset(
375
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
376 377 378 379
#ifndef ROCKSDB_LITE
    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(
          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
380
    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
I
Igor Canadi 已提交
381
      compaction_picker_.reset(
382
          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
383 384 385 386 387 388 389
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
      compaction_picker_.reset(new NullCompactionPicker(
          ioptions_, &internal_comparator_));
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
          "Column family %s does not use any background compaction. "
          "Compactions can only be done via CompactFiles\n",
          GetName().c_str());
390
#endif  // !ROCKSDB_LITE
391 392 393 394 395 396 397
    } else {
      Log(InfoLogLevel::ERROR_LEVEL, ioptions_.info_log,
          "Unable to recognize the specified compaction style %d. "
          "Column family %s will use kCompactionStyleLevel.\n",
          ioptions_.compaction_style, GetName().c_str());
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
398
    }
399

400 401 402
    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
          "--------------- Options for column family [%s]:\n", name.c_str());
403
      options_.DumpCFOptions(ioptions_.info_log);
404 405 406 407
    } else {
      Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
          "\t(skipping printing options)\n");
    }
408
  }
409

410
  RecalculateWriteStallConditions(mutable_cf_options_);
411
}
I
Igor Canadi 已提交
412

413
// DB mutex held
I
Igor Canadi 已提交
414
ColumnFamilyData::~ColumnFamilyData() {
I
Igor Canadi 已提交
415
  assert(refs_.load(std::memory_order_relaxed) == 0);
416 417 418 419 420 421
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

I
Igor Canadi 已提交
422 423 424 425
  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from column family set
    // If column_family_set_ == nullptr, this is dummy CFD and not in
    // ColumnFamilySet
I
Igor Canadi 已提交
426
    column_family_set_->RemoveColumnFamily(this);
427 428 429 430 431 432
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

433 434 435 436 437
  // It would be wrong if this ColumnFamilyData is in flush_queue_ or
  // compaction_queue_ and we destroyed it
  assert(!pending_flush_);
  assert(!pending_compaction_);

I
Igor Canadi 已提交
438 439 440 441 442 443 444 445 446 447 448 449 450 451
  if (super_version_ != nullptr) {
    // Release SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since unref handler can lock mutex.
    super_version_->db_mutex->Unlock();
    local_sv_.reset();
    super_version_->db_mutex->Lock();

    bool is_last_reference __attribute__((unused));
    is_last_reference = super_version_->Unref();
    assert(is_last_reference);
    super_version_->Cleanup();
    delete super_version_;
    super_version_ = nullptr;
  }
452

453 454
  if (dummy_versions_ != nullptr) {
    // List must be empty
455 456 457
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((unused)) = dummy_versions_->Unref();
    assert(deleted);
458
  }
459

460 461
  if (mem_ != nullptr) {
    delete mem_->Unref();
462
  }
463
  autovector<MemTable*> to_delete;
464
  imm_.current()->Unref(&to_delete);
465 466 467 468 469
  for (MemTable* m : to_delete) {
    delete m;
  }
}

I
Igor Canadi 已提交
470 471 472 473 474 475 476 477 478 479
void ColumnFamilyData::SetDropped() {
  // can't drop default CF
  assert(id_ != 0);
  dropped_ = true;
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}

// Multiplicative factor used to slow down / speed up the delayed write rate.
const double kSlowdownRatio = 1.2;

namespace {
// Computes a new delayed-write-rate token. When auto compactions are
// disabled the user-supplied max_write_rate is used verbatim; otherwise the
// current rate is decreased (compaction debt not shrinking) or increased
// (debt paid off) by kSlowdownRatio, clamped to [kMinWriteRate,
// max_write_rate].
std::unique_ptr<WriteControllerToken> SetupDelay(
    uint64_t max_write_rate, WriteController* write_controller,
    uint64_t compaction_needed_bytes, uint64_t prev_compaction_neeed_bytes,
    bool auto_comapctions_disabled) {
  const uint64_t kMinWriteRate = 1024u;  // Minimum write rate 1KB/s.

  uint64_t write_rate = write_controller->delayed_write_rate();

  if (auto_comapctions_disabled) {
    // When auto compaction is disabled, always use the value user gave.
    write_rate = max_write_rate;
  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
    // If user gives rate less than kMinWriteRate, don't adjust it.
    //
    // If already delayed, need to adjust based on previous compaction debt.
    // When there are two or more column families require delay, we always
    // increase or reduce write rate based on information for one single
    // column family. It is likely to be OK but we can improve if there is a
    // problem.
    // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes
    // is only available in level-based compaction
    //
    // If the compaction debt stays the same as previously, we also further slow
    // down. It usually means a mem table is full. It's mainly for the case
    // where both of flush and compaction are much slower than the speed we
    // insert to mem tables, so we need to actively slow down before we get
    // feedback signal from compaction and flushes to avoid the full stop
    // because of hitting the max write buffer number.
    if (prev_compaction_neeed_bytes > 0 &&
        prev_compaction_neeed_bytes <= compaction_needed_bytes) {
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) /
                                         kSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_neeed_bytes > compaction_needed_bytes) {
      // We are speeding up by ratio of kSlowdownRatio when we have paid
      // compaction debt. But we'll never speed up to faster than the write rate
      // given by users.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kSlowdownRatio);
      if (write_rate > max_write_rate) {
        write_rate = max_write_rate;
      }
    }
  }
  return write_controller->GetDelayToken(write_rate);
}

// Returns the L0 file count at which compaction parallelism should be
// increased: min(2 * trigger, trigger + (slowdown - trigger) / 4).
int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
                                    int level0_slowdown_writes_trigger) {
  // SanitizeOptions() ensures it.
  assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);

  // 1/4 of the way between L0 compaction trigger threshold and slowdown
  // condition.
  // Or twice as compaction trigger, if it is smaller.
  return std::min(level0_file_num_compaction_trigger * 2,
                  level0_file_num_compaction_trigger +
                      (level0_slowdown_writes_trigger -
                       level0_file_num_compaction_trigger) /
                          4);
}
}  // namespace

548 549
void ColumnFamilyData::RecalculateWriteStallConditions(
      const MutableCFOptions& mutable_cf_options) {
550
  if (current_ != nullptr) {
S
sdong 已提交
551
    auto* vstorage = current_->storage_info();
552
    auto write_controller = column_family_set_->write_controller_;
553 554
    uint64_t compaction_needed_bytes =
        vstorage->estimated_compaction_needed_bytes();
555

556
    if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) {
557 558
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
559
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
560
          "[%s] Stopping writes because we have %d immutable memtables "
L
Lei Jin 已提交
561
          "(waiting for flush), max_write_buffer_number is set to %d",
562
          name_.c_str(), imm()->NumNotFlushed(),
L
Lei Jin 已提交
563
          mutable_cf_options.max_write_buffer_number);
S
sdong 已提交
564
    } else if (vstorage->l0_delay_trigger_count() >=
565
               mutable_cf_options.level0_stop_writes_trigger) {
566
      write_controller_token_ = write_controller->GetStopToken();
567 568 569 570 571
      internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES_TOTAL, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LEVEL0_NUM_FILES_WITH_COMPACTION, 1);
      }
572
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
573
          "[%s] Stopping writes because we have %d level-0 files",
S
sdong 已提交
574
          name_.c_str(), vstorage->l0_delay_trigger_count());
575
    } else if (mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
576
               compaction_needed_bytes >=
577 578 579 580 581
                   mutable_cf_options.hard_pending_compaction_bytes_limit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(
          InternalStats::HARD_PENDING_COMPACTION_BYTES_LIMIT, 1);
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
582 583
          "[%s] Stopping writes because of estimated pending compaction "
          "bytes %" PRIu64,
584
          name_.c_str(), compaction_needed_bytes);
585 586 587 588 589 590 591 592 593 594 595 596 597 598 599
    } else if (mutable_cf_options.max_write_buffer_number > 3 &&
               imm()->NumNotFlushed() >=
                   mutable_cf_options.max_write_buffer_number - 1) {
      write_controller_token_ =
          SetupDelay(ioptions_.delayed_write_rate, write_controller,
                     compaction_needed_bytes, prev_compaction_needed_bytes_,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1);
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
          "[%s] Stalling writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d "
          "rate %" PRIu64,
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number,
          write_controller->delayed_write_rate());
600
    } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
S
sdong 已提交
601
               vstorage->l0_delay_trigger_count() >=
602
                   mutable_cf_options.level0_slowdown_writes_trigger) {
603 604 605 606
      write_controller_token_ =
          SetupDelay(ioptions_.delayed_write_rate, write_controller,
                     compaction_needed_bytes, prev_compaction_needed_bytes_,
                     mutable_cf_options.disable_auto_compactions);
607 608 609 610 611
      internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN_TOTAL, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LEVEL0_SLOWDOWN_WITH_COMPACTION, 1);
      }
612
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
613 614 615 616
          "[%s] Stalling writes because we have %d level-0 files "
          "rate %" PRIu64,
          name_.c_str(), vstorage->l0_delay_trigger_count(),
          write_controller->delayed_write_rate());
617 618 619
    } else if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
               vstorage->estimated_compaction_needed_bytes() >=
                   mutable_cf_options.soft_pending_compaction_bytes_limit) {
620 621 622 623
      write_controller_token_ =
          SetupDelay(ioptions_.delayed_write_rate, write_controller,
                     compaction_needed_bytes, prev_compaction_needed_bytes_,
                     mutable_cf_options.disable_auto_compactions);
624 625
      internal_stats_->AddCFStats(
          InternalStats::SOFT_PENDING_COMPACTION_BYTES_LIMIT, 1);
626
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
627
          "[%s] Stalling writes because of estimated pending compaction "
628 629 630
          "bytes %" PRIu64 " rate %" PRIu64,
          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
          write_controller->delayed_write_rate());
631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653
    } else if (vstorage->l0_delay_trigger_count() >=
               GetL0ThresholdSpeedupCompaction(
                   mutable_cf_options.level0_file_num_compaction_trigger,
                   mutable_cf_options.level0_slowdown_writes_trigger)) {
      write_controller_token_ = write_controller->GetCompactionPressureToken();
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
          "[%s] Increasing compaction threads because we have %d level-0 "
          "files ",
          name_.c_str(), vstorage->l0_delay_trigger_count());
    } else if (vstorage->estimated_compaction_needed_bytes() >=
               mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
      // Increase compaction threads if bytes needed for compaction exceeds
      // 1/4 of threshold for slowing down.
      // If soft pending compaction byte limit is not set, always speed up
      // compaction.
      write_controller_token_ = write_controller->GetCompactionPressureToken();
      if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
        Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
            "[%s] Increasing compaction threads because of estimated pending "
            "compaction "
            "bytes %" PRIu64,
            name_.c_str(), vstorage->estimated_compaction_needed_bytes());
      }
654 655 656
    } else {
      write_controller_token_.reset();
    }
657
    prev_compaction_needed_bytes_ = compaction_needed_bytes;
658 659 660
  }
}

L
Lei Jin 已提交
661
const EnvOptions* ColumnFamilyData::soptions() const {
L
Lei Jin 已提交
662
  return &(column_family_set_->env_options_);
L
Lei Jin 已提交
663 664
}

I
Igor Canadi 已提交
665 666 667
void ColumnFamilyData::SetCurrent(Version* current_version) {
  current_ = current_version;
}
668

669 670 671 672
uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  return VersionSet::GetNumLiveVersions(dummy_versions_);
}

673 674 675 676
uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
}

677
MemTable* ColumnFamilyData::ConstructNewMemtable(
A
agiardullo 已提交
678
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
679
  assert(current_ != nullptr);
A
agiardullo 已提交
680 681
  return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
                      write_buffer_, earliest_seq);
682 683 684
}

void ColumnFamilyData::CreateNewMemtable(
A
agiardullo 已提交
685
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
686 687
  if (mem_ != nullptr) {
    delete mem_->Unref();
688
  }
A
agiardullo 已提交
689
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
690 691 692
  mem_->Ref();
}

693 694 695 696
bool ColumnFamilyData::NeedsCompaction() const {
  return compaction_picker_->NeedsCompaction(current_->storage_info());
}

697 698
Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
S
sdong 已提交
699
  auto* result = compaction_picker_->PickCompaction(
S
sdong 已提交
700
      GetName(), mutable_options, current_->storage_info(), log_buffer);
S
sdong 已提交
701 702 703
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
704
  return result;
705 706
}

707
// Sentinel level values accepted by ColumnFamilyData::CompactRange().
const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;
709

710
// Builds a manual-compaction job over the key range [begin, end] from
// `input_level` into `output_level`. `compaction_end` reports how far the
// picker actually got; `conflict` is set when an ongoing compaction blocks
// the request. Returns nullptr when no compaction can be formed.
Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options, int input_level,
    int output_level, uint32_t output_path_id, const InternalKey* begin,
    const InternalKey* end, InternalKey** compaction_end, bool* conflict) {
  Compaction* compaction = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, current_->storage_info(), input_level,
      output_level, output_path_id, begin, end, compaction_end, conflict);
  if (compaction != nullptr) {
    // Pin the Version the inputs were selected from.
    compaction->SetInputVersion(current_);
  }
  return compaction;
}

723
// Returns the current SuperVersion with one extra reference owned by the
// caller. Uses the thread-local cache and only falls back to the DB mutex
// when the cached copy is stale.
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
    InstrumentedMutex* db_mutex) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
  sv->Ref();  // the reference handed to the caller
  if (!ReturnThreadLocalSuperVersion(sv)) {
    // The thread-local slot was scraped meanwhile; drop the slot's reference
    // (the caller's reference taken above stays valid).
    sv->Unref();
  }
  return sv;
}

// Fetches a SuperVersion via the thread-local cache. The cached pointer is
// atomically Swap()ed out and replaced with kSVInUse, so a concurrent
// installer that Scrape()s thread-local storage can tell the slot is busy.
// If the cached copy is missing (kSVObsolete) or its version number no
// longer matches super_version_number_, the stale copy is released and a
// fresh reference is taken from super_version_ under the DB mutex.
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
    InstrumentedMutex* db_mutex) {
  void* raw = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant: Scrape() installs kSVObsolete, and kSVInUse only lives in the
  // slot between this Swap and ReturnThreadLocalSuperVersion. So we can
  // never read kSVInUse back here.
  assert(raw != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(raw);
  if (sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load()) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
    SuperVersion* stale_sv = nullptr;

    if (sv && sv->Unref()) {
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
      db_mutex->Lock();
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      stale_sv = sv;
    } else {
      db_mutex->Lock();
    }
    sv = super_version_->Ref();
    db_mutex->Unlock();

    delete stale_sv;  // deleted outside the mutex; nullptr-safe
  }
  assert(sv != nullptr);
  return sv;
}

// Attempts to hand `sv` back to the thread-local slot. Succeeds only if the
// slot still contains kSVInUse, i.e. no Scrape() ran since the matching
// GetThreadLocalSuperVersion(). Returns false when the SuperVersion has
// become obsolete in the meantime (caller must then release it).
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  void* expected = SuperVersion::kSVInUse;
  if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
    // Slot untouched since our Swap(); the SuperVersion is still current.
    return true;
  }
  // A scrape happened between the Swap() and this CAS, so the slot now holds
  // kSVObsolete and the cached SuperVersion must not be reused.
  assert(expected == SuperVersion::kSVObsolete);
  return false;
}

798
// Convenience overload: installs `new_superversion` using the currently
// active mutable options. REQUIRES: db_mutex held.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  return InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_);
}

// Makes `new_superversion` the column family's current SuperVersion,
// snapshotting the active/immutable memtables and current Version.
// Invalidates thread-local caches and re-evaluates write stall state.
// Returns the previous SuperVersion when this call dropped its last
// reference (caller deletes it outside the mutex), nullptr otherwise.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(mem_, imm_.current(), current_);
  SuperVersion* previous = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;
  // Force every thread to re-fetch the SuperVersion on next access.
  ResetThreadLocalSuperVersions();

  RecalculateWriteStallConditions(mutable_cf_options);

  if (previous != nullptr && previous->Unref()) {
    previous->Cleanup();
    return previous;  // caller deletes it outside of the mutex
  }
  return nullptr;
}

826 827
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
828
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
829 830
  for (auto ptr : sv_ptrs) {
    assert(ptr);
831 832 833
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
834 835 836 837 838 839 840 841
    auto sv = static_cast<SuperVersion*>(ptr);
    if (sv->Unref()) {
      sv->Cleanup();
      delete sv;
    }
  }
}

I
Igor Canadi 已提交
842
#ifndef ROCKSDB_LITE
843
Status ColumnFamilyData::SetOptions(
844 845
      const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions new_mutable_cf_options;
846 847 848
  Status s = GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
                                          &new_mutable_cf_options);
  if (s.ok()) {
849
    mutable_cf_options_ = new_mutable_cf_options;
850
    mutable_cf_options_.RefreshDerivedOptions(ioptions_);
851
  }
852
  return s;
853
}
I
Igor Canadi 已提交
854
#endif  // ROCKSDB_LITE
855

I
Igor Canadi 已提交
856
// Constructs the set with a dummy ColumnFamilyData that serves as the head
// of the circular doubly-linked list of all column families.
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const DBOptions* db_options,
                                 const EnvOptions& env_options,
                                 Cache* table_cache,
                                 WriteBuffer* write_buffer,
                                 WriteController* write_controller)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
                                      ColumnFamilyOptions(), db_options,
                                      env_options, nullptr)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      env_options_(env_options),
      table_cache_(table_cache),
      write_buffer_(write_buffer),
      write_controller_(write_controller) {
  // An empty list is the dummy node pointing at itself in both directions.
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}
I
Igor Canadi 已提交
877 878

ColumnFamilySet::~ColumnFamilySet() {
  while (!column_family_data_.empty()) {
    // Each cfd erases itself from column_family_data_ in its destructor,
    // which is why this loop always re-reads begin().
    auto cfd = column_family_data_.begin()->second;
    cfd->Unref();
    delete cfd;
  }
  dummy_cfd_->Unref();
  delete dummy_cfd_;
}

// Returns the default column family (id 0), which is cached when it is
// created in CreateColumnFamily().
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  assert(default_cfd_cache_ != nullptr);
  return default_cfd_cache_;
}

// Looks up a column family by numeric id; nullptr when unknown.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  auto it = column_family_data_.find(id);
  if (it == column_family_data_.end()) {
    return nullptr;
  }
  return it->second;
}

903 904 905
// Looks up a column family by name; nullptr when unknown. Two-step lookup:
// name -> id via column_families_, then id -> cfd via column_family_data_.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  auto it = column_families_.find(name);
  if (it == column_families_.end()) {
    return nullptr;
  }
  auto cfd = GetColumnFamily(it->second);
  assert(cfd != nullptr);  // the two maps must stay in sync
  return cfd;
}

// Hands out a fresh column family ID by bumping the running maximum.
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  return ++max_column_family_;
}

919 920 921 922 923 924
// Largest column family ID handed out or observed so far.
uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }

// Raises the max-column-family watermark; never lowers it.
void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  max_column_family_ = std::max(new_max_column_family, max_column_family_);
}

925 926 927 928
// Number of named column families currently tracked (excludes the dummy).
size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  return column_families_.size();
}

I
Igor Canadi 已提交
929
// under a DB mutex AND write thread
I
Igor Canadi 已提交
930 931 932 933 934
// Creates and registers a new column family. REQUIRES: the name is not yet
// in use (see the comment above: DB mutex AND write thread).
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd =
      new ColumnFamilyData(id, name, dummy_versions, table_cache_,
                           write_buffer_, options, db_options_,
                           env_options_, this);
  // Register in both lookup maps and keep the id watermark current.
  column_families_.emplace(name, id);
  column_family_data_.emplace(id, new_cfd);
  max_column_family_ = std::max(max_column_family_, id);
  // Splice the new cfd in just before the dummy node, i.e. at the tail of
  // the circular doubly-linked list.
  ColumnFamilyData* tail = dummy_cfd_->prev_;
  new_cfd->next_ = dummy_cfd_;
  new_cfd->prev_ = tail;
  tail->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
  if (id == 0) {
    default_cfd_cache_ = new_cfd;  // fast-path cache for the default CF
  }
  return new_cfd;
}

953 954 955 956
// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
I
Igor Canadi 已提交
957
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
958 959 960 961 962 963 964 965 966
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

I
Igor Canadi 已提交
967
// under a DB mutex AND from a write thread
I
Igor Canadi 已提交
968
// Unregisters `cfd` from both lookup maps (see the comment above: DB mutex
// AND write thread). The intrusive linked list is not modified here.
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
  auto it = column_family_data_.find(cfd->GetID());
  assert(it != column_family_data_.end());
  column_family_data_.erase(it);
  column_families_.erase(cfd->GetName());
}

I
Igor Canadi 已提交
975
// under a DB mutex OR from a write thread
976
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
I
Igor Canadi 已提交
977 978 979 980 981 982
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
983
  handle_.SetCFD(current_);
984 985
  return current_ != nullptr;
}
986

987 988 989 990 991 992 993 994 995 996
// Log number of the column family selected by the last successful Seek().
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  return current_->GetLogNumber();
}

// Active memtable of the column family selected by the last successful
// Seek().
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  return current_->mem();
}

997
// Handle for the column family selected by the last successful Seek().
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);  // only valid after a successful Seek()
  return &handle_;
}

1002 1003 1004 1005 1006 1007 1008 1009 1010
// Extracts the numeric id from a handle; a null handle yields 0.
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return 0;
  }
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  return cfh->GetID();
}

1011 1012 1013 1014 1015 1016 1017 1018 1019
// User comparator of the handle's column family, or nullptr for a null
// handle.
const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return nullptr;
  }
  return reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)
      ->user_comparator();
}

I
Igor Canadi 已提交
1020
}  // namespace rocksdb