column_family.cc 35.0 KB
Newer Older
1 2 3 4 5 6 7 8 9
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

I
Igor Canadi 已提交
10
#include "db/column_family.h"
11

12 13 14 15 16
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
17 18 19
#include <vector>
#include <string>
#include <algorithm>
I
Igor Canadi 已提交
20
#include <limits>
21

I
Igor Canadi 已提交
22
#include "db/compaction_picker.h"
23
#include "db/db_impl.h"
24
#include "db/internal_stats.h"
25
#include "db/job_context.h"
26
#include "db/table_properties_collector.h"
I
Igor Canadi 已提交
27
#include "db/version_set.h"
28
#include "db/write_controller.h"
29
#include "db/writebuffer.h"
30
#include "memtable/hash_skiplist_rep.h"
31
#include "util/autovector.h"
32
#include "util/compression.h"
33
#include "util/options_helper.h"
34
#include "util/thread_status_util.h"
35
#include "util/xfunc.h"
I
Igor Canadi 已提交
36 37 38

namespace rocksdb {

I
Igor Canadi 已提交
39
// Builds a handle around |column_family_data|. When the column family is
// non-null, one reference is taken on it here; the destructor is the matching
// place where that reference is released.
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ == nullptr) {
    // Nothing to pin for a null column family.
    return;
  }
  cfd_->Ref();
}

// Releases the reference taken in the constructor. If that was the last
// reference the column family data is deleted, and any files that became
// obsolete are collected under the mutex and purged after it is released.
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ == nullptr) {
    return;
  }

  // A job id of 0 marks this as work done on a user thread rather than by one
  // of our background processes.
  JobContext job_context(0);

  mutex_->Lock();
  const bool was_last_reference = cfd_->Unref();
  if (was_last_reference) {
    delete cfd_;
  }
  db_->FindObsoleteFiles(&job_context, false, true);
  mutex_->Unlock();

  // The actual file purge happens after the mutex has been released.
  if (job_context.HaveSomethingToDelete()) {
    db_->PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();
}

65 66
// Returns the id reported by the column family this handle refers to.
uint32_t ColumnFamilyHandleImpl::GetID() const {
  const uint32_t family_id = cfd()->GetID();
  return family_id;
}

67 68 69 70
// Returns the name reported by the column family this handle refers to. The
// reference is forwarded as-is from the column family data.
const std::string& ColumnFamilyHandleImpl::GetName() const {
  const std::string& family_name = cfd()->GetName();
  return family_name;
}

71 72 73 74 75 76 77 78 79 80 81 82 83 84
// Fills |desc| with this column family's name and its current options.
// Returns Status::OK() on success; in ROCKSDB_LITE builds the operation is
// compiled out and Status::NotSupported() is returned instead.
Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
#ifdef ROCKSDB_LITE
  return Status::NotSupported();
#else
  // Reading the mutable cf-options requires holding the db mutex.
  InstrumentedMutexLock guard(mutex_);
  auto* const family = cfd();
  *desc = ColumnFamilyDescriptor(
      family->GetName(),
      BuildColumnFamilyOptions(*family->options(),
                               *family->GetLatestMutableCFOptions()));
  return Status::OK();
#endif  // ROCKSDB_LITE
}

85 86 87 88
// Returns the user comparator reported by the underlying column family.
const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
  const Comparator* const comparator = cfd()->user_comparator();
  return comparator;
}

89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
// Appends internal table-properties collector factories to
// |int_tbl_prop_collector_factories|: one wrapper per user-supplied factory in
// |cf_options| (in the same order), followed by the factory that collects
// internal key statistics.
void GetIntTblPropCollectorFactory(
    const ColumnFamilyOptions& cf_options,
    std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
        int_tbl_prop_collector_factories) {
  for (const auto& user_factory :
       cf_options.table_properties_collector_factories) {
    // Every user-provided entry is expected to be set.
    assert(user_factory);
    int_tbl_prop_collector_factories->emplace_back(
        new UserKeyTablePropertiesCollectorFactory(user_factory));
  }

  // Always finish with the collector for internal key statistics.
  int_tbl_prop_collector_factories->emplace_back(
      new InternalKeyPropertiesCollectorFactory);
}

105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
// Verifies that the compression types requested by |cf_options| are available
// in this binary. When compression_per_level is non-empty only its entries are
// checked; otherwise the single `compression` setting is checked. Returns
// Status::InvalidArgument naming the first unsupported type, or Status::OK().
Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
  const auto& per_level = cf_options.compression_per_level;

  if (per_level.empty()) {
    // No per-level override: only the column-family-wide setting matters.
    if (CompressionTypeSupported(cf_options.compression)) {
      return Status::OK();
    }
    return Status::InvalidArgument(
        "Compression type " +
        CompressionTypeToString(cf_options.compression) +
        " is not linked with the binary.");
  }

  for (const auto& level_compression : per_level) {
    if (CompressionTypeSupported(level_compression)) {
      continue;
    }
    return Status::InvalidArgument(
        "Compression type " +
        CompressionTypeToString(level_compression) +
        " is not linked with the binary.");
  }
  return Status::OK();
}

127 128 129 130 131 132 133 134 135 136 137 138 139 140
// Checks whether |cf_options| can be combined with concurrent memtable writes.
// Returns Status::InvalidArgument when inplace_update_support or
// filter_deletes is enabled, and Status::OK() otherwise.
Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
  if (cf_options.inplace_update_support) {
    return Status::InvalidArgument(
        "In-place memtable updates (inplace_update_support) is not compatible "
        "with concurrent writes (allow_concurrent_memtable_write)");
  }
  if (cf_options.filter_deletes) {
    // The option name here matches the one used in the message above
    // (allow_concurrent_memtable_write, without a trailing 's').
    return Status::InvalidArgument(
        "Delete filtering (filter_deletes) is not compatible with concurrent "
        "memtable writes (allow_concurrent_memtable_write)");
  }
  return Status::OK();
}

141 142 143
// Returns a copy of |src| with out-of-range or mutually inconsistent settings
// adjusted to usable values.
//
//   db_options  DB-wide options; used for the info log and for db_paths.
//   icmp        internal key comparator installed as result.comparator.
//   src         user-supplied column family options (left untouched).
//
// Adjustments that change level0 triggers are reported through
// db_options.info_log.
ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
                                    const InternalKeyComparator* icmp,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  result.comparator = icmp;
#ifdef OS_MACOSX
  // TODO(icanadi) make write_buffer_size uint64_t instead of size_t
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, ((size_t)1) << 30);
#else
  ClipToRange(&result.write_buffer_size,
              ((size_t)64) << 10, ((size_t)64) << 30);
#endif
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // calculate a proper value from write_buffer_size;
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 8;

    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
  }

  // Clamp max_write_buffer_number first so that the bound derived from it
  // below is computed from the sanitized value.
  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  // Never let the merge threshold drop below one memtable; a zero or negative
  // user value would otherwise pass through unchanged.
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }

  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }
  // A negative value means "keep as many as max_write_buffer_number".
  if (result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  XFUNC_TEST("memtablelist_history", "transaction_xftest_SanitizeOptions",
             xf_transaction_set_memtable_history1,
             xf_transaction_set_memtable_history,
             &result.max_write_buffer_number_to_maintain);
  XFUNC_TEST("memtablelist_history_clear", "transaction_xftest_SanitizeOptions",
             xf_transaction_clear_memtable_history1,
             xf_transaction_clear_memtable_history,
             &result.max_write_buffer_number_to_maintain);

  // Without a prefix extractor the hash-based memtable factories are replaced
  // by the plain skip list factory.
  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_file_num_compaction_trigger = std::numeric_limits<int>::max();
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  if (result.level0_file_num_compaction_trigger == 0) {
    Warn(db_options.info_log.get(),
         "level0_file_num_compaction_trigger cannot be 0");
    result.level0_file_num_compaction_trigger = 1;
  }

  // Enforce stop >= slowdown >= compaction trigger, raising the larger
  // thresholds when the ordering is violated.
  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    Warn(db_options.info_log.get(),
         "This condition must be satisfied: "
         "level0_stop_writes_trigger(%d) >= "
         "level0_slowdown_writes_trigger(%d) >= "
         "level0_file_num_compaction_trigger(%d)",
         result.level0_stop_writes_trigger,
         result.level0_slowdown_writes_trigger,
         result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    Warn(db_options.info_log.get(),
         "Adjust the value to "
         "level0_stop_writes_trigger(%d)"
         "level0_slowdown_writes_trigger(%d)"
         "level0_file_num_compaction_trigger(%d)",
         result.level0_stop_writes_trigger,
         result.level0_slowdown_writes_trigger,
         result.level0_file_num_compaction_trigger);
  }
  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel ||
        db_options.db_paths.size() > 1U) {
      // 1. level_compaction_dynamic_level_bytes only makes sense for
      //    level-based compaction.
      // 2. we don't yet know how to make both of this feature and multiple
      //    DB path work.
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

  return result;
}

256 257 258
// Marker values that can sit in the thread-local SuperVersion slot instead of
// a real SuperVersion pointer.
//
// `dummy` only provides a non-null address for kSVInUse to point at.
int SuperVersion::dummy = 0;
// Swapped into the slot by GetThreadLocalSuperVersion() while the cached
// SuperVersion is checked out.
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
// Null marker; the comments in GetThreadLocalSuperVersion() describe it as the
// value left in the slot once the cached SuperVersion has been scraped.
void* const SuperVersion::kSVObsolete = nullptr;
259

260 261 262 263 264 265 266 267 268 269 270 271 272
// Deletes every memtable that was queued in to_delete.
SuperVersion::~SuperVersion() {
  for (auto* pending_memtable : to_delete) {
    delete pending_memtable;
  }
}

// Adds one reference (relaxed ordering) and returns this object so the call
// can be used inline in an expression.
SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  SuperVersion* const self = this;
  return self;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
273
  uint32_t previous_refs = refs.fetch_sub(1);
274 275
  assert(previous_refs > 0);
  return previous_refs == 1;
276 277 278 279 280 281 282
}

void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
283 284 285
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
    assert(*memory_usage >= m->ApproximateMemoryUsage());
    *memory_usage -= m->ApproximateMemoryUsage();
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301
    to_delete.push_back(m);
  }
  current->Unref();
}

// Points this SuperVersion at the given memtable, immutable memtable list and
// Version, takes one reference on each of them, and sets this object's own
// reference count to 1.
void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                        Version* new_current) {
  mem = new_mem;
  mem->Ref();

  imm = new_imm;
  imm->Ref();

  current = new_current;
  current->Ref();

  refs.store(1, std::memory_order_relaxed);
}

302 303
namespace {
void SuperVersionUnrefHandle(void* ptr) {
304 305 306 307
  // UnrefHandle is called when a thread exists or a ThreadLocalPtr gets
  // destroyed. When former happens, the thread shouldn't see kSVInUse.
  // When latter happens, we are in ~ColumnFamilyData(), no get should happen as
  // well.
308 309 310 311 312 313 314 315 316 317
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv->Unref()) {
    sv->db_mutex->Lock();
    sv->Cleanup();
    sv->db_mutex->Unlock();
    delete sv;
  }
}
}  // anonymous namespace

318 319 320 321 322
// Constructs the in-memory state of one column family. The options are
// sanitized on the way in. A null |_dummy_versions| marks a dummy column
// family, for which no stats, table cache or compaction picker are created.
// The constructor takes one reference on the new object.
ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBuffer* write_buffer,
    const ColumnFamilyOptions& cf_options, const DBOptions* db_options,
    const EnvOptions& env_options, ColumnFamilySet* column_family_set)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      options_(*db_options,
               SanitizeOptions(*db_options, &internal_comparator_, cf_options)),
      ioptions_(options_),
      mutable_cf_options_(options_, ioptions_),
      write_buffer_(write_buffer),
      mem_(nullptr),
      imm_(options_.min_write_buffer_number_to_merge,
           options_.max_write_buffer_number_to_maintain),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      column_family_set_(column_family_set),
      pending_flush_(false),
      pending_compaction_(false),
      prev_compaction_needed_bytes_(0) {
  Ref();

  // Wrap the user-defined table properties collector factories in their
  // internal counterparts.
  GetIntTblPropCollectorFactory(options_, &int_tbl_prop_collector_factories_);

  // A null _dummy_versions means this is a dummy column family.
  const bool is_real_column_family = (_dummy_versions != nullptr);
  if (is_real_column_family) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options->env, this));
    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));

    // Choose the compaction picker that matches the configured style.
    switch (ioptions_.compaction_style) {
      case kCompactionStyleLevel:
        compaction_picker_.reset(
            new LevelCompactionPicker(ioptions_, &internal_comparator_));
        break;
#ifndef ROCKSDB_LITE
      case kCompactionStyleUniversal:
        compaction_picker_.reset(
            new UniversalCompactionPicker(ioptions_, &internal_comparator_));
        break;
      case kCompactionStyleFIFO:
        compaction_picker_.reset(
            new FIFOCompactionPicker(ioptions_, &internal_comparator_));
        break;
      case kCompactionStyleNone:
        compaction_picker_.reset(new NullCompactionPicker(
            ioptions_, &internal_comparator_));
        Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
            "Column family %s does not use any background compaction. "
            "Compactions can only be done via CompactFiles\n",
            GetName().c_str());
        break;
#endif  // !ROCKSDB_LITE
      default:
        // Anything else (including the styles compiled out above) falls back
        // to level compaction after logging an error.
        Log(InfoLogLevel::ERROR_LEVEL, ioptions_.info_log,
            "Unable to recognize the specified compaction style %d. "
            "Column family %s will use kCompactionStyleLevel.\n",
            ioptions_.compaction_style, GetName().c_str());
        compaction_picker_.reset(
            new LevelCompactionPicker(ioptions_, &internal_comparator_));
        break;
    }

    // Only dump the options while the set holds fewer than 10 column
    // families.
    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
          "--------------- Options for column family [%s]:\n", name.c_str());
      options_.DumpCFOptions(ioptions_.info_log);
    } else {
      Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
          "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}
I
Igor Canadi 已提交
397

398
// DB mutex held
I
Igor Canadi 已提交
399
// Tears down the column family once its reference count has dropped to zero:
// unlinks it from the list of column families, releases the current Version,
// the installed SuperVersion, the dummy version list head and all memtables.
ColumnFamilyData::~ColumnFamilyData() {
  assert(refs_.load(std::memory_order_relaxed) == 0);

  // Unlink this node from the doubly linked list.
  auto* const before = prev_;
  auto* const after = next_;
  before->next_ = after;
  after->prev_ = before;

  // A dropped column family has already been removed from the set, and a null
  // column_family_set_ means this is a dummy CFD that never was in one.
  const bool still_registered = !dropped_ && column_family_set_ != nullptr;
  if (still_registered) {
    column_family_set_->RemoveColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

  // Destroying a ColumnFamilyData that is still sitting in flush_queue_ or
  // compaction_queue_ would be wrong.
  assert(!pending_flush_);
  assert(!pending_compaction_);

  if (super_version_ != nullptr) {
    // Release SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since unref handler can lock mutex.
    super_version_->db_mutex->Unlock();
    local_sv_.reset();
    super_version_->db_mutex->Lock();

    bool is_last_reference __attribute__((unused)) = super_version_->Unref();
    assert(is_last_reference);
    super_version_->Cleanup();
    delete super_version_;
    super_version_ = nullptr;
  }

  if (dummy_versions_ != nullptr) {
    // The version list must be empty at this point.
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((unused)) = dummy_versions_->Unref();
    assert(deleted);
  }

  if (mem_ != nullptr) {
    delete mem_->Unref();
  }

  // Drop the immutable memtable list and free whatever it hands back.
  autovector<MemTable*> unreferenced;
  imm_.current()->Unref(&unreferenced);
  for (MemTable* memtable : unreferenced) {
    delete memtable;
  }
}

I
Igor Canadi 已提交
455 456 457 458 459 460 461 462 463 464
// Marks this column family as dropped, releases its write controller token
// and removes it from the owning column family set.
void ColumnFamilyData::SetDropped() {
  // The default column family (id 0) can never be dropped.
  assert(id_ != 0);

  dropped_ = true;
  write_controller_token_.reset();

  // Take this column family out of column_family_set_.
  column_family_set_->RemoveColumnFamily(this);
}

465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497
// Factor used by SetupDelay(): the delayed write rate is divided by it when
// slowing writes down and multiplied by it when speeding them back up.
const double kSlowdownRatio = 1.2;

namespace {
std::unique_ptr<WriteControllerToken> SetupDelay(
    uint64_t max_write_rate, WriteController* write_controller,
    uint64_t compaction_needed_bytes, uint64_t prev_compaction_neeed_bytes,
    bool auto_comapctions_disabled) {
  const uint64_t kMinWriteRate = 1024u;  // Minimum write rate 1KB/s.

  uint64_t write_rate = write_controller->delayed_write_rate();

  if (auto_comapctions_disabled) {
    // When auto compaction is disabled, always use the value user gave.
    write_rate = max_write_rate;
  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
    // If user gives rate less than kMinWriteRate, don't adjust it.
    //
    // If already delayed, need to adjust based on previous compaction debt.
    // When there are two or more column families require delay, we always
    // increase or reduce write rate based on information for one single
    // column family. It is likely to be OK but we can improve if there is a
    // problem.
    // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes
    // is only available in level-based compaction
    //
    // If the compaction debt stays the same as previously, we also further slow
    // down. It usually means a mem table is full. It's mainly for the case
    // where both of flush and compaction are much slower than the speed we
    // insert to mem tables, so we need to actively slow down before we get
    // feedback signal from compaction and flushes to avoid the full stop
    // because of hitting the max write buffer number.
    if (prev_compaction_neeed_bytes > 0 &&
        prev_compaction_neeed_bytes <= compaction_needed_bytes) {
S
Siying Dong 已提交
498 499
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) /
                                         kSlowdownRatio);
500 501 502 503 504 505 506
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_neeed_bytes > compaction_needed_bytes) {
      // We are speeding up by ratio of kSlowdownRatio when we have paid
      // compaction debt. But we'll never speed up to faster than the write rate
      // given by users.
S
Siying Dong 已提交
507 508
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kSlowdownRatio);
509 510 511 512 513 514 515 516 517
      if (write_rate > max_write_rate) {
        write_rate = max_write_rate;
      }
    }
  }
  return write_controller->GetDelayToken(write_rate);
}
}  // namespace

518 519
void ColumnFamilyData::RecalculateWriteStallConditions(
      const MutableCFOptions& mutable_cf_options) {
520
  if (current_ != nullptr) {
S
sdong 已提交
521
    auto* vstorage = current_->storage_info();
522
    auto write_controller = column_family_set_->write_controller_;
523 524
    uint64_t compaction_needed_bytes =
        vstorage->estimated_compaction_needed_bytes();
525

526
    if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) {
527 528
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
529
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
530
          "[%s] Stopping writes because we have %d immutable memtables "
L
Lei Jin 已提交
531
          "(waiting for flush), max_write_buffer_number is set to %d",
532
          name_.c_str(), imm()->NumNotFlushed(),
L
Lei Jin 已提交
533
          mutable_cf_options.max_write_buffer_number);
534 535 536
    } else if (mutable_cf_options.max_write_buffer_number > 3 &&
               imm()->NumNotFlushed() >=
                   mutable_cf_options.max_write_buffer_number - 1) {
537 538 539 540
      write_controller_token_ =
          SetupDelay(ioptions_.delayed_write_rate, write_controller,
                     compaction_needed_bytes, prev_compaction_needed_bytes_,
                     mutable_cf_options.disable_auto_compactions);
541 542 543
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1);
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
          "[%s] Stalling writes because we have %d immutable memtables "
544 545
          "(waiting for flush), max_write_buffer_number is set to %d "
          "rate %" PRIu64,
546
          name_.c_str(), imm()->NumNotFlushed(),
547 548
          mutable_cf_options.max_write_buffer_number,
          write_controller->delayed_write_rate());
S
sdong 已提交
549
    } else if (vstorage->l0_delay_trigger_count() >=
550
               mutable_cf_options.level0_stop_writes_trigger) {
551
      write_controller_token_ = write_controller->GetStopToken();
552 553 554 555 556
      internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES_TOTAL, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LEVEL0_NUM_FILES_WITH_COMPACTION, 1);
      }
557
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
558
          "[%s] Stopping writes because we have %d level-0 files",
S
sdong 已提交
559
          name_.c_str(), vstorage->l0_delay_trigger_count());
560
    } else if (mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
561
               compaction_needed_bytes >=
562 563 564 565 566
                   mutable_cf_options.hard_pending_compaction_bytes_limit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(
          InternalStats::HARD_PENDING_COMPACTION_BYTES_LIMIT, 1);
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
567 568
          "[%s] Stopping writes because of estimated pending compaction "
          "bytes %" PRIu64,
569
          name_.c_str(), compaction_needed_bytes);
570
    } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
S
sdong 已提交
571
               vstorage->l0_delay_trigger_count() >=
572
                   mutable_cf_options.level0_slowdown_writes_trigger) {
573 574 575 576
      write_controller_token_ =
          SetupDelay(ioptions_.delayed_write_rate, write_controller,
                     compaction_needed_bytes, prev_compaction_needed_bytes_,
                     mutable_cf_options.disable_auto_compactions);
577 578 579 580 581
      internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN_TOTAL, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LEVEL0_SLOWDOWN_WITH_COMPACTION, 1);
      }
582
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
583 584 585 586
          "[%s] Stalling writes because we have %d level-0 files "
          "rate %" PRIu64,
          name_.c_str(), vstorage->l0_delay_trigger_count(),
          write_controller->delayed_write_rate());
587 588 589
    } else if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
               vstorage->estimated_compaction_needed_bytes() >=
                   mutable_cf_options.soft_pending_compaction_bytes_limit) {
590 591 592 593
      write_controller_token_ =
          SetupDelay(ioptions_.delayed_write_rate, write_controller,
                     compaction_needed_bytes, prev_compaction_needed_bytes_,
                     mutable_cf_options.disable_auto_compactions);
594 595
      internal_stats_->AddCFStats(
          InternalStats::SOFT_PENDING_COMPACTION_BYTES_LIMIT, 1);
596
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
597
          "[%s] Stalling writes because of estimated pending compaction "
598 599 600
          "bytes %" PRIu64 " rate %" PRIu64,
          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
          write_controller->delayed_write_rate());
601 602 603
    } else {
      write_controller_token_.reset();
    }
604
    prev_compaction_needed_bytes_ = compaction_needed_bytes;
605 606 607
  }
}

L
Lei Jin 已提交
608
// Returns a pointer to the EnvOptions stored in the owning column family set.
const EnvOptions* ColumnFamilyData::soptions() const {
  const EnvOptions& set_env_options = column_family_set_->env_options_;
  return &set_env_options;
}

I
Igor Canadi 已提交
612 613 614
// Records |current_version| as this column family's current Version. No
// reference counting is done here.
void ColumnFamilyData::SetCurrent(Version* current_version) {
  Version* const installed = current_version;
  current_ = installed;
}
615

616 617 618 619
// Forwards to VersionSet::GetNumLiveVersions() for this column family's
// version list head.
uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  const uint64_t live_versions =
      VersionSet::GetNumLiveVersions(dummy_versions_);
  return live_versions;
}

620 621 622 623
// Forwards to VersionSet::GetTotalSstFilesSize() for this column family's
// version list head.
uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
  const uint64_t total_size =
      VersionSet::GetTotalSstFilesSize(dummy_versions_);
  return total_size;
}

624
// Allocates a new MemTable configured from this column family's comparator,
// immutable options and write buffer, plus the given mutable options and
// earliest sequence number. The caller receives the raw pointer; the memtable
// is not installed here. Requires a current Version to be set.
MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  assert(current_ != nullptr);
  MemTable* const fresh_memtable =
      new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
                   write_buffer_, earliest_seq);
  return fresh_memtable;
}

void ColumnFamilyData::CreateNewMemtable(
A
agiardullo 已提交
632
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
633 634
  if (mem_ != nullptr) {
    delete mem_->Unref();
635
  }
A
agiardullo 已提交
636
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
637 638 639
  mem_->Ref();
}

640 641 642 643
// Asks the compaction picker whether the current Version's storage needs
// compaction.
bool ColumnFamilyData::NeedsCompaction() const {
  auto* const storage = current_->storage_info();
  return compaction_picker_->NeedsCompaction(storage);
}

644 645
// Lets the compaction picker choose a compaction for the current Version.
// Returns nullptr when the picker returns none; otherwise the compaction is
// tagged with the current Version as its input version before being returned.
Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
  auto* const picked = compaction_picker_->PickCompaction(
      GetName(), mutable_options, current_->storage_info(), log_buffer);
  if (picked == nullptr) {
    return nullptr;
  }
  picked->SetInputVersion(current_);
  return picked;
}

654
// Special negative level value (-1).
// NOTE(review): presumably passed as a level argument to CompactRange() to
// mean "all levels" — confirm against the header and callers.
const int ColumnFamilyData::kCompactAllLevels = -1;
655
// Special negative level value (-2).
// NOTE(review): presumably passed as an output level to CompactRange() to
// mean "the base level" — confirm against the header and callers.
const int ColumnFamilyData::kCompactToBaseLevel = -2;
656

657
// Forwards a manual range-compaction request to the compaction picker, using
// the current Version's storage info. All arguments are passed through
// unchanged. Returns nullptr when the picker returns none; otherwise the
// compaction is tagged with the current Version as its input version.
Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options, int input_level,
    int output_level, uint32_t output_path_id, const InternalKey* begin,
    const InternalKey* end, InternalKey** compaction_end, bool* conflict) {
  auto* const picked = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, current_->storage_info(), input_level,
      output_level, output_path_id, begin, end, compaction_end, conflict);
  if (picked == nullptr) {
    return nullptr;
  }
  picked->SetInputVersion(current_);
  return picked;
}

670
// Returns a SuperVersion on which an extra reference has been taken for the
// caller. The SuperVersion is obtained through the thread-local cache and
// handed back to it afterwards; if the cache does not accept it back, one
// reference is dropped again.
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
    InstrumentedMutex* db_mutex) {
  SuperVersion* const sv = GetThreadLocalSuperVersion(db_mutex);
  sv->Ref();

  const bool returned_to_cache = ReturnThreadLocalSuperVersion(sv);
  if (!returned_to_cache) {
    sv->Unref();
  }
  return sv;
}

// Checks the SuperVersion out of this thread's cache slot, leaving kSVInUse
// behind. If the slot held an obsolete marker or a SuperVersion with a stale
// version number, the stale one is released and a fresh reference to the
// installed SuperVersion is taken under |db_mutex|. Never returns nullptr.
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
    InstrumentedMutex* db_mutex) {
  // The SuperVersion is cached in thread local storage so that the mutex does
  // not have to be acquired while the SuperVersion is unchanged since the last
  // use. When a new SuperVersion is installed, the compaction or flush thread
  // cleans up the cached SuperVersion in all existing thread local storage.
  // To avoid acquiring the mutex for that, an atomic Swap() on the thread
  // local pointer guarantees exclusive access. If the thread local pointer is
  // in use while a new SuperVersion is installed, the cached SuperVersion can
  // become stale; in that case the background thread would have swapped in
  // kSVObsolete. The value is re-checked when the SuperVersion is returned to
  // thread local storage, with an atomic compare and swap, and the
  // SuperVersion needs to be released if it is detected to be stale.
  void* const slot_value = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(slot_value != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(slot_value);

  const bool cache_is_current =
      sv != SuperVersion::kSVObsolete &&
      sv->version_number == super_version_number_.load();
  if (cache_is_current) {
    // Fast path: the cached SuperVersion is still the installed one.
    return sv;
  }

  RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);

  // Give up the stale cached SuperVersion, if there was one.
  SuperVersion* stale_to_delete = nullptr;
  const bool released_last_ref = sv && sv->Unref();
  if (released_last_ref) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
  }

  db_mutex->Lock();
  if (released_last_ref) {
    // NOTE: underlying resources held by superversion (sst files) might
    // not be released until the next background job.
    sv->Cleanup();
    stale_to_delete = sv;
  }
  sv = super_version_->Ref();
  db_mutex->Unlock();

  // Free the stale SuperVersion only after the mutex has been released.
  delete stale_to_delete;

  assert(sv != nullptr);
  return sv;
}

// Tries to put `sv` back into this thread's local slot.
//
// The slot is expected to still hold kSVInUse. Returns true when the
// compare-and-swap stored `sv` back. Returns false when the swap failed; in
// that case the slot is asserted to hold kSVObsolete and `sv` is not stored.
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  void* expected = SuperVersion::kSVInUse;
  if (!local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
    // A thread-local scrape ran between the Swap() that took the pointer out
    // and this CompareAndSwap(), so the SuperVersion the caller holds is
    // obsolete.
    assert(expected == SuperVersion::kSVObsolete);
    return false;
  }
  // The slot still held kSVInUse: thread-local storage was not altered and no
  // scrape happened, so the SuperVersion is still current.
  return true;
}

745
// Convenience overload: installs `new_superversion` using this column
// family's own mutable_cf_options_. Asserts that `db_mutex` is held and
// forwards to the three-argument overload, returning whatever it returns.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  SuperVersion* const displaced =
      InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_);
  return displaced;
}

// Makes `new_superversion` the current SuperVersion of this column family.
//
// The new SuperVersion is initialised from mem_, imm_.current() and current_,
// stamped with the incremented super_version_number_, and published through
// super_version_. Thread-local cached SuperVersions are then reset and the
// write-stall conditions are recalculated from `mutable_cf_options`.
//
// Returns the previous SuperVersion (already cleaned up) when this call
// dropped its last reference, so that the caller can delete it outside of the
// mutex; returns nullptr otherwise.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(mem_, imm_.current(), current_);

  SuperVersion* const previous = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;

  // Reset SuperVersions cached in thread local storage
  ResetThreadLocalSuperVersions();

  RecalculateWriteStallConditions(mutable_cf_options);

  if (previous == nullptr || !previous->Unref()) {
    return nullptr;
  }
  previous->Cleanup();
  return previous;  // will let caller delete outside of mutex
}

773 774
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
775
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
776 777
  for (auto ptr : sv_ptrs) {
    assert(ptr);
778 779 780
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
781 782 783 784 785 786 787 788
    auto sv = static_cast<SuperVersion*>(ptr);
    if (sv->Unref()) {
      sv->Cleanup();
      delete sv;
    }
  }
}

I
Igor Canadi 已提交
789
#ifndef ROCKSDB_LITE
790
// Applies the string-encoded option overrides in `options_map` on top of the
// current mutable_cf_options_.
//
// When parsing succeeds, the parsed options replace mutable_cf_options_ and
// the derived options are refreshed against ioptions_. When parsing fails,
// mutable_cf_options_ is left untouched. The parse status is returned either
// way.
Status ColumnFamilyData::SetOptions(
      const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions parsed_options;
  Status status = GetMutableOptionsFromStrings(mutable_cf_options_,
                                               options_map, &parsed_options);
  if (!status.ok()) {
    return status;
  }
  mutable_cf_options_ = parsed_options;
  mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  return status;
}
I
Igor Canadi 已提交
801
#endif  // ROCKSDB_LITE
802

I
Igor Canadi 已提交
803
// Builds an empty column family set.
//
// Stores the supplied name, options, table cache, write buffer and write
// controller, and allocates dummy_cfd_, the sentinel node of the circular
// doubly linked list of column families. The sentinel is created with id 0,
// an empty name and null versions / table cache / write buffer / owning set.
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const DBOptions* db_options,
                                 const EnvOptions& env_options,
                                 Cache* table_cache,
                                 WriteBuffer* write_buffer,
                                 WriteController* write_controller)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
                                      ColumnFamilyOptions(), db_options,
                                      env_options, nullptr)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      env_options_(env_options),
      table_cache_(table_cache),
      write_buffer_(write_buffer),
      write_controller_(write_controller) {
  // An empty list is a sentinel that points at itself in both directions.
  dummy_cfd_->next_ = dummy_cfd_;
  dummy_cfd_->prev_ = dummy_cfd_;
}
I
Igor Canadi 已提交
824 825

// Drops a reference on, and deletes, every column family still registered in
// column_family_data_, then does the same for the sentinel dummy_cfd_.
ColumnFamilySet::~ColumnFamilySet() {
  while (!column_family_data_.empty()) {
    // cfd destructor will delete itself from column_family_data_, which is
    // what makes this loop terminate.
    ColumnFamilyData* cfd = column_family_data_.begin()->second;
    cfd->Unref();
    delete cfd;
  }
  dummy_cfd_->Unref();
  delete dummy_cfd_;
}

// Returns the cached pointer to the default column family. The cache is
// asserted to be non-null.
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  ColumnFamilyData* const default_cfd = default_cfd_cache_;
  assert(default_cfd != nullptr);
  return default_cfd;
}

// Looks up a column family by numeric id. Returns nullptr when no column
// family with that id is registered.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  const auto found = column_family_data_.find(id);
  return found == column_family_data_.end() ? nullptr : found->second;
}

850 851 852
// Looks up a column family by name. The name is first mapped to an id via
// column_families_, then resolved through the id-based overload. Returns
// nullptr when the name is unknown; a known name is asserted to resolve to a
// non-null column family.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  const auto name_entry = column_families_.find(name);
  if (name_entry == column_families_.end()) {
    return nullptr;
  }
  ColumnFamilyData* resolved = GetColumnFamily(name_entry->second);
  assert(resolved != nullptr);
  return resolved;
}

// Advances max_column_family_ by one and returns the new value.
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  max_column_family_ += 1;
  return max_column_family_;
}

866 867 868 869 870 871
// Returns the current value of max_column_family_ without modifying it.
uint32_t ColumnFamilySet::GetMaxColumnFamily() {
  return max_column_family_;
}

// Raises max_column_family_ to `new_max_column_family` when the latter is
// larger; never lowers it.
void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  if (new_max_column_family > max_column_family_) {
    max_column_family_ = new_max_column_family;
  }
}

872 873 874 875
// Returns the number of entries in the name -> id map column_families_.
size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  const size_t registered = column_families_.size();
  return registered;
}

I
Igor Canadi 已提交
876
// under a DB mutex AND write thread
I
Igor Canadi 已提交
877 878 879 880 881
// Allocates a new ColumnFamilyData for (`name`, `id`) and registers it.
//
// `name` is asserted not to be registered already. The new column family is
// recorded in both lookup maps, max_column_family_ is raised to `id` if
// needed, and the node is appended at the tail of the circular list (just
// before the dummy_cfd_ sentinel). Id 0 is additionally cached as the default
// column family. Returns the newly created object.
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd = new ColumnFamilyData(
      id, name, dummy_versions, table_cache_, write_buffer_, options,
      db_options_, env_options_, this);

  column_families_.insert({name, id});
  column_family_data_.insert({id, new_cfd});
  if (id > max_column_family_) {
    max_column_family_ = id;
  }

  // Splice the node in between the current tail and the sentinel.
  ColumnFamilyData* const tail = dummy_cfd_->prev_;
  new_cfd->prev_ = tail;
  new_cfd->next_ = dummy_cfd_;
  tail->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;

  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
  return new_cfd;
}

900 901 902 903
// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
I
Igor Canadi 已提交
904
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
905 906 907 908 909 910 911 912 913
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

I
Igor Canadi 已提交
914
// under a DB mutex AND from a write thread
I
Igor Canadi 已提交
915
// Unregisters `cfd` from both lookup maps (id -> data and name -> id). The id
// is asserted to be present in column_family_data_. The object itself is not
// deleted here.
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
  auto by_id = column_family_data_.find(cfd->GetID());
  assert(by_id != column_family_data_.end());
  column_family_data_.erase(by_id);
  column_families_.erase(cfd->GetName());
}

I
Igor Canadi 已提交
922
// under a DB mutex OR from a write thread
923
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
I
Igor Canadi 已提交
924 925 926 927 928 929
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
930
  handle_.SetCFD(current_);
931 932
  return current_ != nullptr;
}
933

934 935 936 937 938 939 940 941 942 943
// Returns the log number of the column family selected by the last Seek().
// current_ is asserted to be non-null.
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  const uint64_t log_number = current_->GetLogNumber();
  return log_number;
}

// Returns the memtable (mem()) of the column family selected by the last
// Seek(). current_ is asserted to be non-null.
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  MemTable* const memtable = current_->mem();
  return memtable;
}

944
// Returns a pointer to the internal handle_ member, which Seek() points at
// the selected column family. current_ is asserted to be non-null. The
// returned pointer refers to a member of this object, not a fresh allocation.
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  ColumnFamilyHandle* const handle = &handle_;
  return handle;
}

949 950 951 952 953 954 955 956 957
// Returns the numeric id of `column_family`, or 0 when the handle is null.
//
// The handle is downcast to ColumnFamilyHandleImpl. static_cast is used
// rather than reinterpret_cast so that the compiler verifies the
// base/derived relationship and performs any pointer adjustment required.
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return 0;
  }
  auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
  return cfh->GetID();
}

958 959 960 961 962 963 964 965 966
// Returns the user comparator of `column_family`, or nullptr when the handle
// is null.
//
// The handle is downcast to ColumnFamilyHandleImpl. static_cast is used
// rather than reinterpret_cast so that the compiler verifies the
// base/derived relationship and performs any pointer adjustment required.
const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return nullptr;
  }
  auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
  return cfh->user_comparator();
}

I
Igor Canadi 已提交
967
}  // namespace rocksdb