//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/column_family.h"

#include <algorithm>
#include <cinttypes>
#include <limits>
#include <string>
#include <vector>

#include "db/compaction/compaction_picker.h"
#include "db/compaction/compaction_picker_fifo.h"
#include "db/compaction/compaction_picker_level.h"
#include "db/compaction/compaction_picker_universal.h"
#include "db/db_impl/db_impl.h"
#include "db/internal_stats.h"
#include "db/job_context.h"
#include "db/range_del_aggregator.h"
#include "db/table_properties_collector.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "file/sst_file_manager_impl.h"
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
#include "util/compression.h"

namespace rocksdb {

ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ != nullptr) {
    cfd_->Ref();
  }
}

ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ != nullptr) {
#ifndef ROCKSDB_LITE
    for (auto& listener : cfd_->ioptions()->listeners) {
      listener->OnColumnFamilyHandleDeletionStarted(this);
    }
#endif  // ROCKSDB_LITE
    // Job id == 0 means that this is not our background process, but rather
    // a user thread.
    // Need to hold some shared pointers owned by the initial_cf_options
    // until the final cleanup finishes.
    ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
    JobContext job_context(0);
    mutex_->Lock();
    if (cfd_->Unref()) {
      bool dropped = cfd_->IsDropped();

      delete cfd_;

      if (dropped) {
        db_->FindObsoleteFiles(&job_context, false, true);
      }
    }
    mutex_->Unlock();
    if (job_context.HaveSomethingToDelete()) {
      bool defer_purge =
          db_->immutable_db_options().avoid_unnecessary_blocking_io;
      db_->PurgeObsoleteFiles(job_context, defer_purge);
      if (defer_purge) {
        mutex_->Lock();
        db_->SchedulePurge();
        mutex_->Unlock();
      }
    }
    job_context.Clean();
  }
}

uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }

const std::string& ColumnFamilyHandleImpl::GetName() const {
  return cfd()->GetName();
}

Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
#ifndef ROCKSDB_LITE
  // accessing mutable cf-options requires db mutex.
  InstrumentedMutexLock l(mutex_);
  *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
  return Status::OK();
#else
  (void)desc;
  return Status::NotSupported();
#endif  // !ROCKSDB_LITE
}

const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
  return cfd()->user_comparator();
}

void GetIntTblPropCollectorFactory(
    const ImmutableCFOptions& ioptions,
    std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
        int_tbl_prop_collector_factories) {
  auto& collector_factories = ioptions.table_properties_collector_factories;
  for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
       ++i) {
    assert(collector_factories[i]);
    int_tbl_prop_collector_factories->emplace_back(
        new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
  }
}

Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
  if (!cf_options.compression_per_level.empty()) {
    for (size_t level = 0; level < cf_options.compression_per_level.size();
         ++level) {
      if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
        return Status::InvalidArgument(
            "Compression type " +
            CompressionTypeToString(cf_options.compression_per_level[level]) +
            " is not linked with the binary.");
      }
    }
  } else {
    if (!CompressionTypeSupported(cf_options.compression)) {
      return Status::InvalidArgument(
          "Compression type " +
          CompressionTypeToString(cf_options.compression) +
          " is not linked with the binary.");
    }
  }
  if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
    if (!ZSTD_TrainDictionarySupported()) {
      return Status::InvalidArgument(
          "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
          "is not linked with the binary.");
    }
    if (cf_options.compression_opts.max_dict_bytes == 0) {
      return Status::InvalidArgument(
          "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
          "should be nonzero if we're using zstd's dictionary generator.");
    }
  }
  return Status::OK();
}
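
// (Illustrative example for the zstd checks above, assuming ZSTD >= 1.1.3 is
// linked in: setting compression_opts.zstd_max_train_bytes = 1 << 20 requires
// a nonzero compression_opts.max_dict_bytes, e.g. 16 << 10; otherwise
// CheckCompressionSupported() returns Status::InvalidArgument.)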

Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
  if (cf_options.inplace_update_support) {
    return Status::InvalidArgument(
        "In-place memtable updates (inplace_update_support) is not compatible "
        "with concurrent writes (allow_concurrent_memtable_write)");
  }
  if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
    return Status::InvalidArgument(
        "Memtable doesn't support concurrent writes "
        "(allow_concurrent_memtable_write)");
  }
  return Status::OK();
}

Status CheckCFPathsSupported(const DBOptions& db_options,
                             const ColumnFamilyOptions& cf_options) {
  // More than one cf_path is supported only in the universal
  // and level compaction styles. This function also checks the case
  // in which cf_paths is not specified, which results in db_paths
  // being used.
  if ((cf_options.compaction_style != kCompactionStyleUniversal) &&
      (cf_options.compaction_style != kCompactionStyleLevel)) {
    if (cf_options.cf_paths.size() > 1) {
      return Status::NotSupported(
          "More than one CF path is only supported in "
          "universal and level compaction styles. ");
    } else if (cf_options.cf_paths.empty() &&
               db_options.db_paths.size() > 1) {
      return Status::NotSupported(
          "More than one DB path is only supported in "
          "universal and level compaction styles. ");
    }
  }
  return Status::OK();
}

ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  size_t clamp_max = std::conditional<
      sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
      std::integral_constant<uint64_t, 64ull << 30>>::type::value;
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
  // If the user sets arena_block_size, we trust the user's value. Otherwise,
  // calculate a proper value from write_buffer_size.
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 8;

    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
  }
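
  // (Worked example for the arena_block_size computation above: with
  // write_buffer_size = 100000, arena_block_size starts at 12500 and is
  // aligned up to the next 4KB multiple: ((12500 + 4095) / 4096) * 4096
  // = 16384.)
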
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }

  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }

  if (result.compaction_style == kCompactionStyleUniversal &&
      db_options.allow_ingest_behind && result.num_levels < 3) {
    result.num_levels = 3;
  }

  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  // Fall back to max_write_buffer_number_to_maintain if
  // max_write_buffer_size_to_maintain is not set.
  if (result.max_write_buffer_size_to_maintain < 0) {
    result.max_write_buffer_size_to_maintain =
        result.max_write_buffer_number *
        static_cast<int64_t>(result.write_buffer_size);
  } else if (result.max_write_buffer_size_to_maintain == 0 &&
             result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  // Bloom filter size shouldn't exceed 1/4 of memtable size.
  if (result.memtable_prefix_bloom_size_ratio > 0.25) {
    result.memtable_prefix_bloom_size_ratio = 0.25;
  } else if (result.memtable_prefix_bloom_size_ratio < 0) {
    result.memtable_prefix_bloom_size_ratio = 0;
  }

  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  if (result.max_bytes_for_level_multiplier <= 0) {
    result.max_bytes_for_level_multiplier = 1;
  }

  if (result.level0_file_num_compaction_trigger == 0) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "level0_file_num_compaction_trigger cannot be 0");
    result.level0_file_num_compaction_trigger = 1;
  }

  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "This condition must be satisfied: "
                   "level0_stop_writes_trigger(%d) >= "
                   "level0_slowdown_writes_trigger(%d) >= "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "Adjusting the values to "
                   "level0_stop_writes_trigger(%d), "
                   "level0_slowdown_writes_trigger(%d), "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
  }

  if (result.soft_pending_compaction_bytes_limit == 0) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
             result.soft_pending_compaction_bytes_limit >
                 result.hard_pending_compaction_bytes_limit) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  }

#ifndef ROCKSDB_LITE
  // When the DB is stopped, it's possible that there are some .trash files
  // that were not deleted yet. When we open the DB we will find these .trash
  // files and schedule them to be deleted (or delete them immediately if
  // SstFileManager was not used).
  auto sfm =
      static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
  for (size_t i = 0; i < result.cf_paths.size(); i++) {
    DeleteScheduler::CleanupDirectory(db_options.env, sfm,
                                      result.cf_paths[i].path);
  }
#endif

  if (result.cf_paths.empty()) {
    result.cf_paths = db_options.db_paths;
  }

  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel ||
        result.cf_paths.size() > 1U) {
      // 1. level_compaction_dynamic_level_bytes only makes sense for
      //    level-based compaction.
      // 2. we don't yet know how to make this feature and multiple
      //    DB paths work together.
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

  if (result.max_compaction_bytes == 0) {
    result.max_compaction_bytes = result.target_file_size_base * 25;
  }

  const uint64_t kDefaultPeriodicCompSecs = 0xffffffffffffffff;
  const uint64_t kDefaultTtlSecs = 30 * 24 * 60 * 60;

  // Turn on periodic compactions and set them to occur once every 30 days if
  // compaction filters are used and periodic_compaction_seconds is set to the
  // default value.
  if (result.compaction_style != kCompactionStyleFIFO) {
    if ((result.compaction_filter != nullptr ||
         result.compaction_filter_factory != nullptr) &&
        result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
      result.periodic_compaction_seconds = kDefaultTtlSecs;
    }
  } else {
    // result.compaction_style == kCompactionStyleFIFO
    if (result.ttl == 0) {
      if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
        result.periodic_compaction_seconds = kDefaultTtlSecs;
      }
      result.ttl = result.periodic_compaction_seconds;
    } else if (result.periodic_compaction_seconds != 0) {
      result.ttl = std::min(result.ttl, result.periodic_compaction_seconds);
    }
  }
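
  // (Example of the FIFO TTL interplay above: with ttl == 0 and
  // periodic_compaction_seconds left at the default, both become 30 days;
  // with ttl == 10 days and periodic_compaction_seconds == 5 days, ttl is
  // lowered to 5 days.)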

  return result;
}

int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;

SuperVersion::~SuperVersion() {
  for (auto td : to_delete) {
    delete td;
  }
}

SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of refs
  uint32_t previous_refs = refs.fetch_sub(1);
  assert(previous_refs > 0);
  return previous_refs == 1;
}

void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
    assert(*memory_usage >= m->ApproximateMemoryUsage());
    *memory_usage -= m->ApproximateMemoryUsage();
    to_delete.push_back(m);
  }
  current->Unref();
}

void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                        Version* new_current) {
  mem = new_mem;
  imm = new_imm;
  current = new_current;
  mem->Ref();
  imm->Ref();
  current->Ref();
  refs.store(1, std::memory_order_relaxed);
}
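
// (Reference-counting lifecycle sketch for SuperVersion, as implemented
// above: Init() stores refs = 1 and pins mem, imm, and current; readers pair
// Ref() with Unref(); whoever sees Unref() return true runs Cleanup() with
// the DB mutex held and then deletes the object.)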

namespace {
void SuperVersionUnrefHandle(void* ptr) {
  // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
  // destroyed. When the former happens, the thread shouldn't see kSVInUse.
  // When the latter happens, we are in ~ColumnFamilyData(), so no get should
  // happen either.
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  bool was_last_ref __attribute__((__unused__));
  was_last_ref = sv->Unref();
  // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_.
  // This is important because we can't do SuperVersion cleanup here.
  // That would require locking DB mutex, which would deadlock because
  // SuperVersionUnrefHandle is called with locked ThreadLocalPtr mutex.
  assert(!was_last_ref);
}
}  // anonymous namespace

ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBufferManager* write_buffer_manager,
    const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
    const EnvOptions& env_options, ColumnFamilySet* column_family_set,
    BlockCacheTracer* const block_cache_tracer)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      initialized_(false),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      initial_cf_options_(SanitizeOptions(db_options, cf_options)),
      ioptions_(db_options, initial_cf_options_),
      mutable_cf_options_(initial_cf_options_),
      is_delete_range_supported_(
          cf_options.table_factory->IsDeleteRangeSupported()),
      write_buffer_manager_(write_buffer_manager),
      mem_(nullptr),
      imm_(ioptions_.min_write_buffer_number_to_merge,
           ioptions_.max_write_buffer_number_to_maintain,
           ioptions_.max_write_buffer_size_to_maintain),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      flush_reason_(FlushReason::kOthers),
      column_family_set_(column_family_set),
      queued_for_flush_(false),
      queued_for_compaction_(false),
      prev_compaction_needed_bytes_(0),
      allow_2pc_(db_options.allow_2pc),
      last_memtable_id_(0) {
  Ref();

  // Convert user defined table properties collector factories to internal ones.
  GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);

  // if _dummy_versions is nullptr, then this is a dummy column family.
  if (_dummy_versions != nullptr) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options.env, this));
    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache,
                                      block_cache_tracer));
    if (ioptions_.compaction_style == kCompactionStyleLevel) {
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
#ifndef ROCKSDB_LITE
    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(
          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
      compaction_picker_.reset(
          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
      compaction_picker_.reset(new NullCompactionPicker(
          ioptions_, &internal_comparator_));
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "Column family %s does not use any background compaction. "
                     "Compactions can only be done via CompactFiles\n",
                     GetName().c_str());
#endif  // !ROCKSDB_LITE
    } else {
      ROCKS_LOG_ERROR(ioptions_.info_log,
                      "Unable to recognize the specified compaction style %d. "
                      "Column family %s will use kCompactionStyleLevel.\n",
                      ioptions_.compaction_style, GetName().c_str());
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
    }

    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      ROCKS_LOG_INFO(ioptions_.info_log,
                     "--------------- Options for column family [%s]:\n",
                     name.c_str());
      initial_cf_options_.Dump(ioptions_.info_log);
    } else {
      ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}

// DB mutex held
ColumnFamilyData::~ColumnFamilyData() {
  assert(refs_.load(std::memory_order_relaxed) == 0);
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from the column family set.
    // If column_family_set_ == nullptr, this is a dummy CFD and not in the
    // ColumnFamilySet.
    column_family_set_->RemoveColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

  // It would be wrong to destroy this ColumnFamilyData while it is still in
  // flush_queue_ or compaction_queue_.
  assert(!queued_for_flush_);
  assert(!queued_for_compaction_);

  if (super_version_ != nullptr) {
    // Release the SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since the unref handler can lock
    // the mutex.
    super_version_->db_mutex->Unlock();
    local_sv_.reset();
    super_version_->db_mutex->Lock();

    bool is_last_reference __attribute__((__unused__));
    is_last_reference = super_version_->Unref();
    assert(is_last_reference);
    super_version_->Cleanup();
    delete super_version_;
    super_version_ = nullptr;
  }

  if (dummy_versions_ != nullptr) {
    // List must be empty
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((__unused__));
    deleted = dummy_versions_->Unref();
    assert(deleted);
  }

  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  autovector<MemTable*> to_delete;
  imm_.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }
}

void ColumnFamilyData::SetDropped() {
  // can't drop default CF
  assert(id_ != 0);
  dropped_ = true;
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}

ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
  return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
}

uint64_t ColumnFamilyData::OldestLogToKeep() {
  auto current_log = GetLogNumber();

  if (allow_2pc_) {
    autovector<MemTable*> empty_list;
    auto imm_prep_log =
        imm()->PrecomputeMinLogContainingPrepSection(empty_list);
    auto mem_prep_log = mem()->GetMinLogContainingPrepSection();

    if (imm_prep_log > 0 && imm_prep_log < current_log) {
      current_log = imm_prep_log;
    }

    if (mem_prep_log > 0 && mem_prep_log < current_log) {
      current_log = mem_prep_log;
    }
  }

  return current_log;
}
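
// (Worked example for OldestLogToKeep(), assuming allow_2pc_ is true: with
// GetLogNumber() == 10, the minimum prep-section log of the immutable
// memtables == 7, and that of the mutable memtable == 12, the function
// returns 7, because log 7 still contains unflushed prepared sections.)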

const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
const double kNearStopSlowdownRatio = 0.6;
const double kDelayRecoverSlowdownRatio = 1.4;

namespace {
// If penalize_stop is true, we further reduce slowdown rate.
std::unique_ptr<WriteControllerToken> SetupDelay(
    WriteController* write_controller, uint64_t compaction_needed_bytes,
    uint64_t prev_compaction_need_bytes, bool penalize_stop,
    bool auto_compactions_disabled) {
  const uint64_t kMinWriteRate = 16 * 1024u;  // Minimum write rate 16KB/s.

  uint64_t max_write_rate = write_controller->max_delayed_write_rate();
  uint64_t write_rate = write_controller->delayed_write_rate();

  if (auto_compactions_disabled) {
    // When auto compaction is disabled, always use the value the user gave.
    write_rate = max_write_rate;
  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
    // If the user gives a rate less than kMinWriteRate, don't adjust it.
    //
    // If already delayed, need to adjust based on previous compaction debt.
    // When there are two or more column families that require delay, we
    // always increase or reduce the write rate based on information for one
    // single column family. It is likely to be OK, but we can improve it if
    // there is a problem.
    // Ignore the compaction_needed_bytes = 0 case because
    // compaction_needed_bytes is only available in level-based compaction.
    //
    // If the compaction debt stays the same as previously, we also further
    // slow down. It usually means a mem table is full. It's mainly for the
    // case where both flush and compaction are much slower than the speed we
    // insert to mem tables, so we need to actively slow down before we get
    // a feedback signal from compactions and flushes, to avoid the full stop
    // caused by hitting the max write buffer number.
    //
    // If the DB has just fallen into the stop condition, we need to further
    // reduce the write rate to avoid the stop condition.
    if (penalize_stop) {
      // Penalize the near stop or stop condition by more aggressive slowdown.
      // This is to provide the long term slowdown increase signal.
      // The penalty is more than the reward of recovering to the normal
      // condition.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kNearStopSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > 0 &&
               prev_compaction_need_bytes <= compaction_needed_bytes) {
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kIncSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
      // We are speeding up by a ratio of kDecSlowdownRatio when we have paid
      // compaction debt. But we'll never speed up to faster than the write
      // rate given by users.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kDecSlowdownRatio);
      if (write_rate > max_write_rate) {
        write_rate = max_write_rate;
      }
    }
  }
  return write_controller->GetDelayToken(write_rate);
}
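
// (Worked example for SetupDelay(): starting from a delayed rate of 16MB/s,
// a penalized step multiplies by kNearStopSlowdownRatio (0.6) to 9.6MB/s; a
// step with non-decreasing compaction debt multiplies by kIncSlowdownRatio
// (0.8) to 12.8MB/s; paying off debt multiplies by kDecSlowdownRatio (1.25),
// capped at max_delayed_write_rate().)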

int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
                                    int level0_slowdown_writes_trigger) {
  // SanitizeOptions() ensures it.
  assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);

  if (level0_file_num_compaction_trigger < 0) {
    return std::numeric_limits<int>::max();
  }

  const int64_t twice_level0_trigger =
      static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;

  const int64_t one_fourth_trigger_slowdown =
      static_cast<int64_t>(level0_file_num_compaction_trigger) +
      ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
       4);

  assert(twice_level0_trigger >= 0);
  assert(one_fourth_trigger_slowdown >= 0);

  // 1/4 of the way between the L0 compaction trigger threshold and the
  // slowdown condition, or twice the compaction trigger, whichever is smaller.
  int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
  if (res >= port::kMaxInt32) {
    return port::kMaxInt32;
  } else {
    // res fits in int
    return static_cast<int>(res);
  }
}
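
// (Worked example: level0_file_num_compaction_trigger = 4 and
// level0_slowdown_writes_trigger = 20 give min(2 * 4, 4 + (20 - 4) / 4)
// = min(8, 8) = 8 L0 files as the speedup threshold.)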
}  // namespace

std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
ColumnFamilyData::GetWriteStallConditionAndCause(
    int num_unflushed_memtables, int num_l0_files,
    uint64_t num_compaction_needed_bytes,
    const MutableCFOptions& mutable_cf_options) {
  if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
    return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
    return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.hard_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kStopped,
            WriteStallCause::kPendingCompactionBytes};
  } else if (mutable_cf_options.max_write_buffer_number > 3 &&
             num_unflushed_memtables >=
                 mutable_cf_options.max_write_buffer_number - 1) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
             num_l0_files >=
                 mutable_cf_options.level0_slowdown_writes_trigger) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kDelayed,
            WriteStallCause::kPendingCompactionBytes};
  }
  return {WriteStallCondition::kNormal, WriteStallCause::kNone};
}
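
// (Example of the precedence above, with max_write_buffer_number = 4,
// level0_slowdown_writes_trigger = 20, level0_stop_writes_trigger = 36:
// 4 unflushed memtables => kStopped/kMemtableLimit regardless of L0 size;
// 3 unflushed memtables plus 25 L0 files => kDelayed/kMemtableLimit, because
// the memtable check (3 >= 4 - 1) precedes the L0 slowdown check.)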

WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
    const MutableCFOptions& mutable_cf_options) {
  auto write_stall_condition = WriteStallCondition::kNormal;
  if (current_ != nullptr) {
    auto* vstorage = current_->storage_info();
    auto write_controller = column_family_set_->write_controller_;
    uint64_t compaction_needed_bytes =
        vstorage->estimated_compaction_needed_bytes();

    auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
        imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
        vstorage->estimated_compaction_needed_bytes(), mutable_cf_options);
    write_stall_condition = write_stall_condition_and_cause.first;
    auto write_stall_cause = write_stall_condition_and_cause.second;

    bool was_stopped = write_controller->IsStopped();
    bool needed_delay = write_controller->NeedsDelay();

    if (write_stall_condition == WriteStallCondition::kStopped &&
        write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stopping writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d",
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number);
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stopping writes because we have %d level-0 files",
                     name_.c_str(), vstorage->l0_delay_trigger_count());
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stopping writes because of estimated pending compaction "
          "bytes %" PRIu64,
          name_.c_str(), compaction_needed_bytes);
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stalling writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d "
          "rate %" PRIu64,
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number,
          write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      // L0 is within two files of the stop trigger.
      bool near_stop = vstorage->l0_delay_trigger_count() >=
                       mutable_cf_options.level0_stop_writes_trigger - 2;
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
                                  1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stalling writes because we have %d level-0 files "
                     "rate %" PRIu64,
                     name_.c_str(), vstorage->l0_delay_trigger_count(),
                     write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      // If the distance to the hard limit is less than 1/4 of the gap between
      // the soft and hard bytes limits, we think it is near stop and speed up
      // the slowdown.
      bool near_stop =
          mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
          (compaction_needed_bytes -
           mutable_cf_options.soft_pending_compaction_bytes_limit) >
              3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
                   mutable_cf_options.soft_pending_compaction_bytes_limit) /
                  4;

      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stalling writes because of estimated pending compaction "
          "bytes %" PRIu64 " rate %" PRIu64,
          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
          write_controller->delayed_write_rate());
    } else {
      assert(write_stall_condition == WriteStallCondition::kNormal);
      if (vstorage->l0_delay_trigger_count() >=
          GetL0ThresholdSpeedupCompaction(
              mutable_cf_options.level0_file_num_compaction_trigger,
              mutable_cf_options.level0_slowdown_writes_trigger)) {
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        ROCKS_LOG_INFO(
            ioptions_.info_log,
            "[%s] Increasing compaction threads because we have %d level-0 "
            "files ",
            name_.c_str(), vstorage->l0_delay_trigger_count());
      } else if (vstorage->estimated_compaction_needed_bytes() >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
        // Increase compaction threads if bytes needed for compaction exceeds
        // 1/4 of threshold for slowing down.
        // If soft pending compaction byte limit is not set, always speed up
        // compaction.
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
          ROCKS_LOG_INFO(
              ioptions_.info_log,
              "[%s] Increasing compaction threads because of estimated pending "
              "compaction "
              "bytes %" PRIu64,
              name_.c_str(), vstorage->estimated_compaction_needed_bytes());
        }
      } else {
        write_controller_token_.reset();
      }
      // If the DB recovers from delay conditions, we reward with reducing
      // double the slowdown ratio. This is to balance the long term slowdown
      // increase signal.
      if (needed_delay) {
        uint64_t write_rate = write_controller->delayed_write_rate();
        write_controller->set_delayed_write_rate(static_cast<uint64_t>(
            static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
        // Set the low-pri limit to be 1/4 of the delayed write rate.
        // Note we don't reset this value even after the delay condition is
        // released. The low-pri rate will continue to apply if there is
        // compaction pressure.
        write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
                                                                    4);
      }
    }
    prev_compaction_needed_bytes_ = compaction_needed_bytes;
  }
  return write_stall_condition;
}

const EnvOptions* ColumnFamilyData::soptions() const {
  return &(column_family_set_->env_options_);
}

void ColumnFamilyData::SetCurrent(Version* current_version) {
  current_ = current_version;
}

uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  return VersionSet::GetNumLiveVersions(dummy_versions_);
}

uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
}

uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
  return current_->GetSstFilesSize();
}

MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
                      write_buffer_manager_, earliest_seq, id_);
}

void ColumnFamilyData::CreateNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
  mem_->Ref();
}

bool ColumnFamilyData::NeedsCompaction() const {
  return compaction_picker_->NeedsCompaction(current_->storage_info());
}

Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
  auto* result = compaction_picker_->PickCompaction(
      GetName(), mutable_options, current_->storage_info(), log_buffer);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}
965 966 967 968 969 970 971
bool ColumnFamilyData::RangeOverlapWithCompaction(
    const Slice& smallest_user_key, const Slice& largest_user_key,
    int level) const {
  return compaction_picker_->RangeOverlapWithCompaction(
      smallest_user_key, largest_user_key, level);
}

Status ColumnFamilyData::RangesOverlapWithMemtables(
    const autovector<Range>& ranges, SuperVersion* super_version,
    bool* overlap) {
  assert(overlap != nullptr);
  *overlap = false;
  // Create an InternalIterator over all unflushed memtables
  Arena arena;
  ReadOptions read_opts;
  read_opts.total_order_seek = true;
  MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
  merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_opts, &arena));
  super_version->imm->AddIterators(read_opts, &merge_iter_builder);
  ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());

  auto read_seq = super_version->current->version_set()->LastSequence();
  ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
  auto* active_range_del_iter =
      super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
  range_del_agg.AddTombstones(
      std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
  super_version->imm->AddRangeTombstoneIterators(read_opts, nullptr /* arena */,
                                                 &range_del_agg);

  Status status;
  for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
    auto* vstorage = super_version->current->storage_info();
    auto* ucmp = vstorage->InternalComparator()->user_comparator();
    InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
                            kValueTypeForSeek);
    memtable_iter->Seek(range_start.Encode());
    status = memtable_iter->status();
    ParsedInternalKey seek_result;
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          !ParseInternalKey(memtable_iter->key(), &seek_result)) {
        status = Status::Corruption("DB has corrupted keys");
      }
    }
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
        *overlap = true;
      } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
                                                 ranges[i].limit)) {
        *overlap = true;
      }
    }
  }
  return status;
}

const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;

Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options, int input_level,
    int output_level, const CompactRangeOptions& compact_range_options,
    const InternalKey* begin, const InternalKey* end,
    InternalKey** compaction_end, bool* conflict,
    uint64_t max_file_num_to_ignore) {
  auto* result = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, current_->storage_info(), input_level,
      output_level, compact_range_options, begin, end, compaction_end, conflict,
      max_file_num_to_ignore);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
    InstrumentedMutex* db_mutex) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
  sv->Ref();
  if (!ReturnThreadLocalSuperVersion(sv)) {
    // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
    // when the thread-local pointer was populated. So, the Ref() earlier in
    // this function still prevents the returned SuperVersion* from being
    // deleted out from under the caller.
    sv->Unref();
  }
  return sv;
}

SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
    InstrumentedMutex* db_mutex) {
  // The SuperVersion is cached in thread local storage to avoid acquiring
  // mutex when SuperVersion does not change since the last use. When a new
  // SuperVersion is installed, the compaction or flush thread cleans up
  // cached SuperVersion in all existing thread local storage. To avoid
  // acquiring mutex for this operation, we use atomic Swap() on the thread
  // local pointer to guarantee exclusive access. If the thread local pointer
  // is being used while a new SuperVersion is installed, the cached
  // SuperVersion can become stale. In that case, the background thread would
  // have swapped in kSVObsolete. We re-check the value when returning the
  // SuperVersion back to thread local, with an atomic compare and swap.
  // The superversion will need to be released if detected to be stale.
  void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(ptr != SuperVersion::kSVInUse);
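  // (State sketch: the thread-local slot holds either a valid SuperVersion*,
  // kSVObsolete (set by Scrape() or before first use), or kSVInUse while the
  // owning thread is between the Swap() above and the CompareAndSwap() in
  // ReturnThreadLocalSuperVersion().)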
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load()) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
    SuperVersion* sv_to_delete = nullptr;

    if (sv && sv->Unref()) {
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
      db_mutex->Lock();
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      sv_to_delete = sv;
    } else {
      db_mutex->Lock();
    }
    sv = super_version_->Ref();
    db_mutex->Unlock();

    delete sv_to_delete;
  }
  assert(sv != nullptr);
  return sv;
}

bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  // Put the SuperVersion back
  void* expected = SuperVersion::kSVInUse;
  if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
    // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
    // storage has not been altered and no Scrape has happened. The
    // SuperVersion is still current.
    return true;
  } else {
    // ThreadLocal scrape happened in the process of this GetImpl call (after
    // thread local Swap() at the beginning and before CompareAndSwap()).
    // This means the SuperVersion it holds is obsolete.
    assert(expected == SuperVersion::kSVObsolete);
  }
  return false;
}

void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
}

1126 1127
void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
1128
    const MutableCFOptions& mutable_cf_options) {
1129
  SuperVersion* new_superversion = sv_context->new_superversion.release();
1130
  new_superversion->db_mutex = db_mutex;
1131
  new_superversion->mutable_cf_options = mutable_cf_options;
1132 1133 1134 1135
  new_superversion->Init(mem_, imm_.current(), current_);
  SuperVersion* old_superversion = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
1136
  super_version_->version_number = super_version_number_;
1137 1138 1139 1140
  super_version_->write_stall_condition =
      RecalculateWriteStallConditions(mutable_cf_options);

  if (old_superversion != nullptr) {
1141 1142 1143 1144 1145 1146
    // Reset SuperVersions cached in thread local storage.
    // This should be done before old_superversion->Unref(). That's to ensure
    // that local_sv_ never holds the last reference to SuperVersion, since
    // it has no means to safely do SuperVersion cleanup.
    ResetThreadLocalSuperVersions();

1147 1148 1149 1150
    if (old_superversion->mutable_cf_options.write_buffer_size !=
        mutable_cf_options.write_buffer_size) {
      mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
    }
1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161
    if (old_superversion->write_stall_condition !=
        new_superversion->write_stall_condition) {
      sv_context->PushWriteStallNotification(
          old_superversion->write_stall_condition,
          new_superversion->write_stall_condition, GetName(), ioptions());
    }
    if (old_superversion->Unref()) {
      old_superversion->Cleanup();
      sv_context->superversions_to_free.push_back(old_superversion);
    }
  }
I
Igor Canadi 已提交
1162 1163
}

1164 1165
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
1166
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
1167 1168
  for (auto ptr : sv_ptrs) {
    assert(ptr);
1169 1170 1171
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
1172
    auto sv = static_cast<SuperVersion*>(ptr);
1173 1174 1175 1176 1177 1178
    bool was_last_ref __attribute__((__unused__));
    was_last_ref = sv->Unref();
    // sv couldn't have been the last reference because
    // ResetThreadLocalSuperVersions() is called before
    // unref'ing super_version_.
    assert(!was_last_ref);
1179 1180 1181
  }
}

Status ColumnFamilyData::ValidateOptions(
    const DBOptions& db_options, const ColumnFamilyOptions& cf_options) {
  Status s;
  s = CheckCompressionSupported(cf_options);
  if (s.ok() && db_options.allow_concurrent_memtable_write) {
    s = CheckConcurrentWritesSupported(cf_options);
  }
  if (s.ok()) {
    s = CheckCFPathsSupported(db_options, cf_options);
  }
  if (!s.ok()) {
    return s;
  }

  if (cf_options.ttl > 0) {
    if (db_options.max_open_files != -1) {
      return Status::NotSupported(
          "TTL is only supported when files are always "
          "kept open (set max_open_files = -1). ");
    }
    if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
      return Status::NotSupported(
          "TTL is only supported in Block-Based Table format. ");
    }
  }

  if (cf_options.periodic_compaction_seconds > 0 &&
      cf_options.periodic_compaction_seconds < port::kMaxUint64) {
    if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
      return Status::NotSupported(
          "Periodic Compaction is only supported in "
          "Block-Based Table format. ");
    }
  }
  return s;
}
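
// (Illustrative inputs for ValidateOptions() above: cf_options.ttl = 3600
// passes only with db_options.max_open_files == -1 and the default
// block-based table factory; otherwise Status::NotSupported is returned.)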

#ifndef ROCKSDB_LITE
Status ColumnFamilyData::SetOptions(
    const DBOptions& db_options,
    const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions new_mutable_cf_options;
  Status s =
      GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
                                   ioptions_.info_log, &new_mutable_cf_options);
  if (s.ok()) {
    ColumnFamilyOptions cf_options =
        BuildColumnFamilyOptions(initial_cf_options_, new_mutable_cf_options);
    s = ValidateOptions(db_options, cf_options);
  }
  if (s.ok()) {
    mutable_cf_options_ = new_mutable_cf_options;
    mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  }
  return s;
}
#endif  // ROCKSDB_LITE

// REQUIRES: DB mutex held
Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
  if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
    return Env::WLTH_NOT_SET;
  }
  if (level == 0) {
    return Env::WLTH_MEDIUM;
  }
  int base_level = current_->storage_info()->base_level();

  // L1: medium, L2: long, ...
  if (level - base_level >= 2) {
    return Env::WLTH_EXTREME;
  }
  return static_cast<Env::WriteLifeTimeHint>(level - base_level +
                            static_cast<int>(Env::WLTH_MEDIUM));
}
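
// (Worked example for CalculateSSTWriteHint() with base_level == 2: L0 files
// get WLTH_MEDIUM, L2 maps to WLTH_MEDIUM, L3 to WLTH_LONG, and L4 or deeper
// to WLTH_EXTREME.)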

Status ColumnFamilyData::AddDirectories() {
  Status s;
  assert(data_dirs_.empty());
  for (auto& p : ioptions_.cf_paths) {
    std::unique_ptr<Directory> path_directory;
    s = DBImpl::CreateAndNewDirectory(ioptions_.env, p.path, &path_directory);
    if (!s.ok()) {
      return s;
    }
    assert(path_directory != nullptr);
    data_dirs_.emplace_back(path_directory.release());
  }
  assert(data_dirs_.size() == ioptions_.cf_paths.size());
  return s;
}

Directory* ColumnFamilyData::GetDataDir(size_t path_id) const {
  if (data_dirs_.empty()) {
    return nullptr;
  }

  assert(path_id < data_dirs_.size());
  return data_dirs_[path_id].get();
}
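
// Illustrative sketch (the paths below are hypothetical): with
//   cf_opts.cf_paths = {{"/fast_ssd", 64ull << 30}, {"/bulk_hdd", 1ull << 40}};
// AddDirectories() opens one Directory per entry, and GetDataDir(0) /
// GetDataDir(1) return the handles later used to fsync the directory
// containing newly created files for that path.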

ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const ImmutableDBOptions* db_options,
                                 const EnvOptions& env_options,
                                 Cache* table_cache,
                                 WriteBufferManager* write_buffer_manager,
                                 WriteController* write_controller,
                                 BlockCacheTracer* const block_cache_tracer)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(
          0, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), *db_options,
          env_options, nullptr, block_cache_tracer)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      env_options_(env_options),
      table_cache_(table_cache),
      write_buffer_manager_(write_buffer_manager),
      write_controller_(write_controller),
      block_cache_tracer_(block_cache_tracer) {
  // initialize linked list
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}

ColumnFamilySet::~ColumnFamilySet() {
  while (column_family_data_.size() > 0) {
    // cfd destructor will delete itself from column_family_data_
    auto cfd = column_family_data_.begin()->second;
    bool last_ref __attribute__((__unused__));
    last_ref = cfd->Unref();
    assert(last_ref);
    delete cfd;
  }
  bool dummy_last_ref __attribute__((__unused__));
  dummy_last_ref = dummy_cfd_->Unref();
  assert(dummy_last_ref);
  delete dummy_cfd_;
}

ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  assert(default_cfd_cache_ != nullptr);
  return default_cfd_cache_;
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  auto cfd_iter = column_family_data_.find(id);
  if (cfd_iter != column_family_data_.end()) {
    return cfd_iter->second;
  } else {
    return nullptr;
  }
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  auto cfd_iter = column_families_.find(name);
  if (cfd_iter != column_families_.end()) {
    auto cfd = GetColumnFamily(cfd_iter->second);
    assert(cfd != nullptr);
    return cfd;
  } else {
    return nullptr;
  }
}

uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  return ++max_column_family_;
}

uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }

void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  max_column_family_ = std::max(new_max_column_family, max_column_family_);
}

size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  return column_families_.size();
}

// under a DB mutex AND from a write thread
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd = new ColumnFamilyData(
      id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
      *db_options_, env_options_, this, block_cache_tracer_);
  column_families_.insert({name, id});
  column_family_data_.insert({id, new_cfd});
  max_column_family_ = std::max(max_column_family_, id);
  // add to linked list
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
  return new_cfd;
}
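
// Hedged sketch of the user-facing path that ends in CreateColumnFamily()
// above (via DBImpl, which also persists the manifest entry):
//
//   rocksdb::ColumnFamilyHandle* handle = nullptr;
//   rocksdb::Status s = db->CreateColumnFamily(
//       rocksdb::ColumnFamilyOptions(), "new_cf", &handle);
//   // ... use handle ...
//   s = db->DestroyColumnFamilyHandle(handle);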

// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

// under a DB mutex AND from a write thread
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
  auto cfd_iter = column_family_data_.find(cfd->GetID());
  assert(cfd_iter != column_family_data_.end());
  column_family_data_.erase(cfd_iter);
  column_families_.erase(cfd->GetName());
}

// under a DB mutex OR from a write thread
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
  handle_.SetCFD(current_);
  return current_ != nullptr;
}

uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  return current_->GetLogNumber();
}

MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  return current_->mem();
}

ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  return &handle_;
}

uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  uint32_t column_family_id = 0;
  if (column_family != nullptr) {
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
    column_family_id = cfh->GetID();
  }
  return column_family_id;
}
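
// Example behavior (a sketch, not a test from this codebase):
//   GetColumnFamilyID(nullptr) == 0;                     // default family
//   GetColumnFamilyID(cf_handle) == cf_handle->GetID();  // any other handle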

const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family != nullptr) {
    return column_family->GetComparator();
  }
  return nullptr;
}

}  // namespace rocksdb