//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/column_family.h"

#include <algorithm>
#include <cinttypes>
#include <limits>
#include <string>
#include <vector>

#include "db/compaction/compaction_picker.h"
#include "db/compaction/compaction_picker_fifo.h"
#include "db/compaction/compaction_picker_level.h"
#include "db/compaction/compaction_picker_universal.h"
#include "db/db_impl/db_impl.h"
#include "db/internal_stats.h"
#include "db/job_context.h"
#include "db/range_del_aggregator.h"
#include "db/table_properties_collector.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "file/sst_file_manager_impl.h"
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
#include "util/compression.h"

namespace rocksdb {

ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ != nullptr) {
    cfd_->Ref();
  }
}

ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ != nullptr) {
#ifndef ROCKSDB_LITE
    for (auto& listener : cfd_->ioptions()->listeners) {
      listener->OnColumnFamilyHandleDeletionStarted(this);
    }
#endif  // ROCKSDB_LITE
    // Job id == 0 means that this is not our background process, but rather
    // a user thread
    // Need to hold some shared pointers owned by the initial_cf_options
    // before final cleaning up finishes.
    ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
    JobContext job_context(0);
    mutex_->Lock();
    bool dropped = cfd_->IsDropped();
    if (cfd_->UnrefAndTryDelete()) {
      if (dropped) {
        db_->FindObsoleteFiles(&job_context, false, true);
      }
    }
    mutex_->Unlock();
    if (job_context.HaveSomethingToDelete()) {
      bool defer_purge =
          db_->immutable_db_options().avoid_unnecessary_blocking_io;
      db_->PurgeObsoleteFiles(job_context, defer_purge);
      if (defer_purge) {
        mutex_->Lock();
        db_->SchedulePurge();
        mutex_->Unlock();
      }
    }
    job_context.Clean();
  }
}

uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }

const std::string& ColumnFamilyHandleImpl::GetName() const {
  return cfd()->GetName();
}

Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
#ifndef ROCKSDB_LITE
  // accessing mutable cf-options requires db mutex.
  InstrumentedMutexLock l(mutex_);
  *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
  return Status::OK();
#else
  (void)desc;
  return Status::NotSupported();
#endif  // !ROCKSDB_LITE
}

const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
  return cfd()->user_comparator();
}

void GetIntTblPropCollectorFactory(
    const ImmutableCFOptions& ioptions,
    std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
        int_tbl_prop_collector_factories) {
  auto& collector_factories = ioptions.table_properties_collector_factories;
  for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
       ++i) {
    assert(collector_factories[i]);
    int_tbl_prop_collector_factories->emplace_back(
        new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
  }
}

Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
  if (!cf_options.compression_per_level.empty()) {
    for (size_t level = 0; level < cf_options.compression_per_level.size();
         ++level) {
      if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
        return Status::InvalidArgument(
            "Compression type " +
            CompressionTypeToString(cf_options.compression_per_level[level]) +
            " is not linked with the binary.");
      }
    }
  } else {
    if (!CompressionTypeSupported(cf_options.compression)) {
      return Status::InvalidArgument(
          "Compression type " +
          CompressionTypeToString(cf_options.compression) +
          " is not linked with the binary.");
    }
  }
  if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
    if (!ZSTD_TrainDictionarySupported()) {
      return Status::InvalidArgument(
          "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
          "is not linked with the binary.");
    }
    if (cf_options.compression_opts.max_dict_bytes == 0) {
      return Status::InvalidArgument(
          "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
          "should be nonzero if we're using zstd's dictionary generator.");
    }
  }
  return Status::OK();
}

Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
  if (cf_options.inplace_update_support) {
    return Status::InvalidArgument(
        "In-place memtable updates (inplace_update_support) is not compatible "
        "with concurrent writes (allow_concurrent_memtable_write)");
  }
  if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
    return Status::InvalidArgument(
        "Memtable doesn't concurrent writes (allow_concurrent_memtable_write)");
  }
  return Status::OK();
}

Status CheckCFPathsSupported(const DBOptions& db_options,
                             const ColumnFamilyOptions& cf_options) {
  // More than one cf_path is supported only in the universal
  // and level compaction styles. This function also checks the case
  // in which cf_paths is not specified, which results in db_paths
  // being used.
  if ((cf_options.compaction_style != kCompactionStyleUniversal) &&
      (cf_options.compaction_style != kCompactionStyleLevel)) {
    if (cf_options.cf_paths.size() > 1) {
      return Status::NotSupported(
          "More than one CF paths are only supported in "
          "universal and level compaction styles. ");
    } else if (cf_options.cf_paths.empty() &&
               db_options.db_paths.size() > 1) {
      return Status::NotSupported(
          "More than one DB paths are only supported in "
          "universal and level compaction styles. ");
    }
  }
  return Status::OK();
}

namespace {
const uint64_t kDefaultTtl = 0xfffffffffffffffe;
const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe;
}  // namespace

ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
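  // Clamp write_buffer_size to [64KB, 4GB] on 32-bit builds and
  // [64KB, 64GB] on 64-bit builds; clamp_max is chosen at compile time
  // from sizeof(size_t).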
  size_t clamp_max = std::conditional<
      sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
      std::integral_constant<uint64_t, 64ull << 30>>::type::value;
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
  // If the user sets arena_block_size, we trust them to use that value.
  // Otherwise, calculate a proper value from write_buffer_size.
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 8;

    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }

  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }

  if (result.compaction_style == kCompactionStyleUniversal &&
      db_options.allow_ingest_behind && result.num_levels < 3) {
    result.num_levels = 3;
  }

  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  // Fall back to max_write_buffer_number_to_maintain if
  // max_write_buffer_size_to_maintain is not set
  if (result.max_write_buffer_size_to_maintain < 0) {
    result.max_write_buffer_size_to_maintain =
        result.max_write_buffer_number *
        static_cast<int64_t>(result.write_buffer_size);
  } else if (result.max_write_buffer_size_to_maintain == 0 &&
             result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  // bloom filter size shouldn't exceed 1/4 of memtable size.
  if (result.memtable_prefix_bloom_size_ratio > 0.25) {
    result.memtable_prefix_bloom_size_ratio = 0.25;
  } else if (result.memtable_prefix_bloom_size_ratio < 0) {
    result.memtable_prefix_bloom_size_ratio = 0;
  }

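  // Hash-based memtable factories need a prefix extractor to bucket keys.
  // Without one, fall back to the default skip-list memtable.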
  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  if (result.max_bytes_for_level_multiplier <= 0) {
    result.max_bytes_for_level_multiplier = 1;
  }

  if (result.level0_file_num_compaction_trigger == 0) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "level0_file_num_compaction_trigger cannot be 0");
    result.level0_file_num_compaction_trigger = 1;
  }

  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "This condition must be satisfied: "
                   "level0_stop_writes_trigger(%d) >= "
                   "level0_slowdown_writes_trigger(%d) >= "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "Adjust the value to "
                   "level0_stop_writes_trigger(%d)"
                   "level0_slowdown_writes_trigger(%d)"
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
  }

  if (result.soft_pending_compaction_bytes_limit == 0) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
             result.soft_pending_compaction_bytes_limit >
                 result.hard_pending_compaction_bytes_limit) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  }

#ifndef ROCKSDB_LITE
  // When the DB is stopped, it's possible that there are some .trash files
  // that were not deleted yet. When we open the DB we will find these .trash
  // files and schedule them to be deleted (or delete them immediately if
  // SstFileManager was not used).
  auto sfm = static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
  for (size_t i = 0; i < result.cf_paths.size(); i++) {
    DeleteScheduler::CleanupDirectory(db_options.env, sfm,
                                      result.cf_paths[i].path);
  }
#endif

  if (result.cf_paths.empty()) {
    result.cf_paths = db_options.db_paths;
  }

  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel ||
        result.cf_paths.size() > 1U) {
      // 1. level_compaction_dynamic_level_bytes only makes sense for
      //    level-based compaction.
      // 2. we don't yet know how to make this feature work with multiple
      //    DB paths.
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

  if (result.max_compaction_bytes == 0) {
    result.max_compaction_bytes = result.target_file_size_base * 25;
  }

  bool is_block_based_table =
      (result.table_factory->Name() == BlockBasedTableFactory().Name());

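  // 30 days, in seconds.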
  const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60;
  if (result.ttl == kDefaultTtl) {
    if (is_block_based_table &&
        result.compaction_style != kCompactionStyleFIFO) {
      result.ttl = kAdjustedTtl;
    } else {
      result.ttl = 0;
    }
  }

  const uint64_t kAdjustedPeriodicCompSecs = 30 * 24 * 60 * 60;

  // Turn on periodic compactions and set them to occur once every 30 days if
  // compaction filters are used and periodic_compaction_seconds is set to the
  // default value.
  if (result.compaction_style != kCompactionStyleFIFO) {
    if ((result.compaction_filter != nullptr ||
         result.compaction_filter_factory != nullptr) &&
        result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
        is_block_based_table) {
      result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
    }
  } else {
    // result.compaction_style == kCompactionStyleFIFO
    if (result.ttl == 0) {
      if (is_block_based_table) {
        if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
          result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
        }
        result.ttl = result.periodic_compaction_seconds;
      }
    } else if (result.periodic_compaction_seconds != 0) {
      result.ttl = std::min(result.ttl, result.periodic_compaction_seconds);
    }
  }

  // TTL compactions would work similarly to Periodic Compactions in Universal
  // in most of the cases. So, if ttl is set, execute the periodic compaction
  // codepath.
  if (result.compaction_style == kCompactionStyleUniversal && result.ttl != 0) {
    if (result.periodic_compaction_seconds != 0) {
      result.periodic_compaction_seconds =
          std::min(result.ttl, result.periodic_compaction_seconds);
    } else {
      result.periodic_compaction_seconds = result.ttl;
    }
  }

  if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
    result.periodic_compaction_seconds = 0;
  }

  return result;
}

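// Sentinel values for the per-thread cached SuperVersion slot: kSVInUse marks
// a slot whose cached SuperVersion is currently borrowed by its owning
// thread; kSVObsolete (nullptr) marks a slot whose cached value has been
// invalidated and must be refreshed from super_version_.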
int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;

SuperVersion::~SuperVersion() {
  for (auto td : to_delete) {
    delete td;
  }
}

SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
  uint32_t previous_refs = refs.fetch_sub(1);
  assert(previous_refs > 0);
  return previous_refs == 1;
}

void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
    assert(*memory_usage >= m->ApproximateMemoryUsage());
    *memory_usage -= m->ApproximateMemoryUsage();
    to_delete.push_back(m);
  }
  current->Unref();
  if (cfd->Unref()) {
    delete cfd;
  }
}

void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
                        MemTableListVersion* new_imm, Version* new_current) {
  cfd = new_cfd;
  mem = new_mem;
  imm = new_imm;
  current = new_current;
  cfd->Ref();
  mem->Ref();
  imm->Ref();
  current->Ref();
  refs.store(1, std::memory_order_relaxed);
}

namespace {
void SuperVersionUnrefHandle(void* ptr) {
  // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
  // destroyed. When the former happens, the thread shouldn't see kSVInUse.
  // When the latter happens, we are in ~ColumnFamilyData(), so no get should
  // happen either.
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  bool was_last_ref __attribute__((__unused__));
  was_last_ref = sv->Unref();
  // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_.
  // This is important because we can't do SuperVersion cleanup here.
  // That would require locking DB mutex, which would deadlock because
  // SuperVersionUnrefHandle is called with locked ThreadLocalPtr mutex.
  assert(!was_last_ref);
}
}  // anonymous namespace

ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBufferManager* write_buffer_manager,
    const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
    const EnvOptions& env_options, ColumnFamilySet* column_family_set,
    BlockCacheTracer* const block_cache_tracer)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      initialized_(false),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      initial_cf_options_(SanitizeOptions(db_options, cf_options)),
      ioptions_(db_options, initial_cf_options_),
      mutable_cf_options_(initial_cf_options_),
      is_delete_range_supported_(
          cf_options.table_factory->IsDeleteRangeSupported()),
      write_buffer_manager_(write_buffer_manager),
      mem_(nullptr),
      imm_(ioptions_.min_write_buffer_number_to_merge,
           ioptions_.max_write_buffer_number_to_maintain,
           ioptions_.max_write_buffer_size_to_maintain),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      flush_reason_(FlushReason::kOthers),
      column_family_set_(column_family_set),
      queued_for_flush_(false),
      queued_for_compaction_(false),
      prev_compaction_needed_bytes_(0),
      allow_2pc_(db_options.allow_2pc),
      last_memtable_id_(0) {
  Ref();

  // Convert user-defined table properties collector factories to internal
  // ones.
  GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);

  // if _dummy_versions is nullptr, then this is a dummy column family.
  if (_dummy_versions != nullptr) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options.env, this));
    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache,
                                      block_cache_tracer));
    if (ioptions_.compaction_style == kCompactionStyleLevel) {
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
#ifndef ROCKSDB_LITE
    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(
          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
      compaction_picker_.reset(
          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
      compaction_picker_.reset(new NullCompactionPicker(
          ioptions_, &internal_comparator_));
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "Column family %s does not use any background compaction. "
                     "Compactions can only be done via CompactFiles\n",
                     GetName().c_str());
#endif  // !ROCKSDB_LITE
    } else {
      ROCKS_LOG_ERROR(ioptions_.info_log,
                      "Unable to recognize the specified compaction style %d. "
                      "Column family %s will use kCompactionStyleLevel.\n",
                      ioptions_.compaction_style, GetName().c_str());
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
    }

    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      ROCKS_LOG_INFO(ioptions_.info_log,
                     "--------------- Options for column family [%s]:\n",
                     name.c_str());
      initial_cf_options_.Dump(ioptions_.info_log);
    } else {
      ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}

// DB mutex held
ColumnFamilyData::~ColumnFamilyData() {
  assert(refs_.load(std::memory_order_relaxed) == 0);
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from column family set
    // If column_family_set_ == nullptr, this is dummy CFD and not in
    // ColumnFamilySet
    column_family_set_->RemoveColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

  // It would be wrong if this ColumnFamilyData were still in flush_queue_ or
  // compaction_queue_ when we destroy it
  assert(!queued_for_flush_);
  assert(!queued_for_compaction_);
  assert(super_version_ == nullptr);

  if (dummy_versions_ != nullptr) {
    // List must be empty
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((__unused__));
    deleted = dummy_versions_->Unref();
    assert(deleted);
  }

  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  autovector<MemTable*> to_delete;
  imm_.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }
}

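// Drops one reference. If that was the last reference, deletes this
// ColumnFamilyData and returns true. If afterwards only super_version_ still
// holds a reference, releases the SuperVersion (briefly dropping the DB mutex
// to clear thread-local caches), which may in turn delete this object.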
bool ColumnFamilyData::UnrefAndTryDelete() {
  int old_refs = refs_.fetch_sub(1);
  assert(old_refs > 0);

  if (old_refs == 1) {
    assert(super_version_ == nullptr);
    delete this;
    return true;
  }

  if (old_refs == 2 && super_version_ != nullptr) {
    // Only the super_version_ holds me
    SuperVersion* sv = super_version_;
    super_version_ = nullptr;
    // Release SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since unref handler can lock mutex.
    sv->db_mutex->Unlock();
    local_sv_.reset();
    sv->db_mutex->Lock();

    if (sv->Unref()) {
      // May delete this ColumnFamilyData after calling Cleanup()
      sv->Cleanup();
      delete sv;
      return true;
    }
  }
  return false;
}

void ColumnFamilyData::SetDropped() {
  // can't drop default CF
  assert(id_ != 0);
  dropped_ = true;
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}

ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
  return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
}

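// Returns the number of the oldest WAL that must be kept for this column
// family. With two-phase commit (allow_2pc_), unflushed prepare sections in
// the memtables can pin a log older than GetLogNumber().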
uint64_t ColumnFamilyData::OldestLogToKeep() {
  auto current_log = GetLogNumber();

  if (allow_2pc_) {
    autovector<MemTable*> empty_list;
    auto imm_prep_log =
        imm()->PrecomputeMinLogContainingPrepSection(empty_list);
    auto mem_prep_log = mem()->GetMinLogContainingPrepSection();

    if (imm_prep_log > 0 && imm_prep_log < current_log) {
      current_log = imm_prep_log;
    }

    if (mem_prep_log > 0 && mem_prep_log < current_log) {
      current_log = mem_prep_log;
    }
  }

  return current_log;
}

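// Multipliers applied to the delayed write rate while stalling: values below
// 1 slow writes down further, values above 1 let the rate recover.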
const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
const double kNearStopSlowdownRatio = 0.6;
const double kDelayRecoverSlowdownRatio = 1.4;

namespace {
// If penalize_stop is true, we further reduce slowdown rate.
std::unique_ptr<WriteControllerToken> SetupDelay(
    WriteController* write_controller, uint64_t compaction_needed_bytes,
    uint64_t prev_compaction_need_bytes, bool penalize_stop,
    bool auto_compactions_disabled) {
  const uint64_t kMinWriteRate = 16 * 1024u;  // Minimum write rate 16KB/s.

  uint64_t max_write_rate = write_controller->max_delayed_write_rate();
  uint64_t write_rate = write_controller->delayed_write_rate();

  if (auto_compactions_disabled) {
    // When auto compaction is disabled, always use the value user gave.
    write_rate = max_write_rate;
  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
    // If user gives rate less than kMinWriteRate, don't adjust it.
    //
    // If already delayed, need to adjust based on previous compaction debt.
    // When two or more column families require a delay, we always
    // increase or reduce write rate based on information for one single
    // column family. It is likely to be OK but we can improve if there is a
    // problem.
    // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes
    // is only available in level-based compaction
    //
    // If the compaction debt stays the same as previously, we also further slow
    // down. It usually means a mem table is full. It's mainly for the case
    // where both of flush and compaction are much slower than the speed we
    // insert to mem tables, so we need to actively slow down before we get
    // feedback signal from compaction and flushes to avoid the full stop
    // because of hitting the max write buffer number.
    //
    // If the DB just fell into the stop condition, we need to further reduce
    // the write rate to avoid the stop condition.
    if (penalize_stop) {
      // Penalize the near stop or stop condition by more aggressive slowdown.
      // This is to provide the long term slowdown increase signal.
      // The penalty is more than the reward of recovering to the normal
      // condition.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kNearStopSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > 0 &&
               prev_compaction_need_bytes <= compaction_needed_bytes) {
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kIncSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
      // We are speeding up by a ratio of kDecSlowdownRatio when we have paid
      // off the compaction debt. But we'll never speed up to faster than the
      // write rate given by users.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kDecSlowdownRatio);
      if (write_rate > max_write_rate) {
        write_rate = max_write_rate;
      }
    }
  }
  return write_controller->GetDelayToken(write_rate);
}

int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
                                    int level0_slowdown_writes_trigger) {
  // SanitizeOptions() ensures it.
  assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);

  if (level0_file_num_compaction_trigger < 0) {
    return std::numeric_limits<int>::max();
  }

  const int64_t twice_level0_trigger =
      static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;

  const int64_t one_fourth_trigger_slowdown =
      static_cast<int64_t>(level0_file_num_compaction_trigger) +
      ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
       4);

  assert(twice_level0_trigger >= 0);
  assert(one_fourth_trigger_slowdown >= 0);

  // 1/4 of the way between L0 compaction trigger threshold and slowdown
  // condition.
  // Or twice the compaction trigger, if that is smaller.
  int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
  if (res >= port::kMaxInt32) {
    return port::kMaxInt32;
  } else {
    // res fits in int
    return static_cast<int>(res);
  }
}
}  // namespace

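// Maps the number of unflushed memtables, the L0 file count and the estimated
// compaction debt to a (condition, cause) pair. Stop conditions are checked
// before delay conditions, so the most severe state wins.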
std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
ColumnFamilyData::GetWriteStallConditionAndCause(
    int num_unflushed_memtables, int num_l0_files,
    uint64_t num_compaction_needed_bytes,
    const MutableCFOptions& mutable_cf_options) {
  if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
    return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
    return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.hard_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kStopped,
            WriteStallCause::kPendingCompactionBytes};
  } else if (mutable_cf_options.max_write_buffer_number > 3 &&
             num_unflushed_memtables >=
                 mutable_cf_options.max_write_buffer_number - 1) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
             num_l0_files >=
                 mutable_cf_options.level0_slowdown_writes_trigger) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kDelayed,
            WriteStallCause::kPendingCompactionBytes};
  }
  return {WriteStallCondition::kNormal, WriteStallCause::kNone};
}

WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
      const MutableCFOptions& mutable_cf_options) {
  auto write_stall_condition = WriteStallCondition::kNormal;
  if (current_ != nullptr) {
    auto* vstorage = current_->storage_info();
    auto write_controller = column_family_set_->write_controller_;
    uint64_t compaction_needed_bytes =
        vstorage->estimated_compaction_needed_bytes();

    auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
        imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
        vstorage->estimated_compaction_needed_bytes(), mutable_cf_options);
    write_stall_condition = write_stall_condition_and_cause.first;
    auto write_stall_cause = write_stall_condition_and_cause.second;

    bool was_stopped = write_controller->IsStopped();
    bool needed_delay = write_controller->NeedsDelay();

    if (write_stall_condition == WriteStallCondition::kStopped &&
        write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stopping writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d",
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number);
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stopping writes because we have %d level-0 files",
                     name_.c_str(), vstorage->l0_delay_trigger_count());
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stopping writes because of estimated pending compaction "
          "bytes %" PRIu64,
          name_.c_str(), compaction_needed_bytes);
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stalling writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d "
          "rate %" PRIu64,
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number,
          write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      // L0 is the last two files from stopping.
      bool near_stop = vstorage->l0_delay_trigger_count() >=
                       mutable_cf_options.level0_stop_writes_trigger - 2;
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
                                  1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stalling writes because we have %d level-0 files "
                     "rate %" PRIu64,
                     name_.c_str(), vstorage->l0_delay_trigger_count(),
                     write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      // If the distance to the hard limit is less than 1/4 of the gap between
      // the soft and hard bytes limits, we think it is near stop and speed up
      // the slowdown.
      bool near_stop =
          mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
          (compaction_needed_bytes -
           mutable_cf_options.soft_pending_compaction_bytes_limit) >
              3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
                   mutable_cf_options.soft_pending_compaction_bytes_limit) /
                  4;

      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stalling writes because of estimated pending compaction "
          "bytes %" PRIu64 " rate %" PRIu64,
          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
          write_controller->delayed_write_rate());
    } else {
      assert(write_stall_condition == WriteStallCondition::kNormal);
      if (vstorage->l0_delay_trigger_count() >=
          GetL0ThresholdSpeedupCompaction(
              mutable_cf_options.level0_file_num_compaction_trigger,
              mutable_cf_options.level0_slowdown_writes_trigger)) {
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        ROCKS_LOG_INFO(
            ioptions_.info_log,
            "[%s] Increasing compaction threads because we have %d level-0 "
            "files ",
            name_.c_str(), vstorage->l0_delay_trigger_count());
      } else if (vstorage->estimated_compaction_needed_bytes() >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
        // Increase compaction threads if bytes needed for compaction exceeds
        // 1/4 of threshold for slowing down.
        // If soft pending compaction byte limit is not set, always speed up
        // compaction.
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
          ROCKS_LOG_INFO(
              ioptions_.info_log,
              "[%s] Increasing compaction threads because of estimated pending "
              "compaction "
              "bytes %" PRIu64,
              name_.c_str(), vstorage->estimated_compaction_needed_bytes());
        }
      } else {
        write_controller_token_.reset();
      }
      // If the DB recovers from delay conditions, we reward with reducing
      // double the slowdown ratio. This is to balance the long term slowdown
      // increase signal.
      if (needed_delay) {
        uint64_t write_rate = write_controller->delayed_write_rate();
        write_controller->set_delayed_write_rate(static_cast<uint64_t>(
            static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
        // Set the low-pri limit to be 1/4 of the delayed write rate.
        // Note we don't reset this value even after the delay condition is
        // released. Low-pri rate will continue to apply if there is
        // compaction pressure.
        write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
                                                                    4);
      }
    }
    prev_compaction_needed_bytes_ = compaction_needed_bytes;
  }
  return write_stall_condition;
}

const EnvOptions* ColumnFamilyData::soptions() const {
  return &(column_family_set_->env_options_);
}

void ColumnFamilyData::SetCurrent(Version* current_version) {
  current_ = current_version;
}

uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  return VersionSet::GetNumLiveVersions(dummy_versions_);
}

uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
}

uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
  return current_->GetSstFilesSize();
}

MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
                      write_buffer_manager_, earliest_seq, id_);
}

void ColumnFamilyData::CreateNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
  mem_->Ref();
}

bool ColumnFamilyData::NeedsCompaction() const {
  return compaction_picker_->NeedsCompaction(current_->storage_info());
}

Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
  SequenceNumber earliest_mem_seqno =
      std::min(mem_->GetEarliestSequenceNumber(),
               imm_.current()->GetEarliestSequenceNumber(false));
  auto* result = compaction_picker_->PickCompaction(
      GetName(), mutable_options, current_->storage_info(), log_buffer,
      earliest_mem_seqno);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

bool ColumnFamilyData::RangeOverlapWithCompaction(
    const Slice& smallest_user_key, const Slice& largest_user_key,
    int level) const {
  return compaction_picker_->RangeOverlapWithCompaction(
      smallest_user_key, largest_user_key, level);
}

Status ColumnFamilyData::RangesOverlapWithMemtables(
    const autovector<Range>& ranges, SuperVersion* super_version,
    bool* overlap) {
  assert(overlap != nullptr);
  *overlap = false;
  // Create an InternalIterator over all unflushed memtables
  Arena arena;
  ReadOptions read_opts;
  read_opts.total_order_seek = true;
  MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
  merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_opts, &arena));
  super_version->imm->AddIterators(read_opts, &merge_iter_builder);
  ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());

  auto read_seq = super_version->current->version_set()->LastSequence();
  ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
  auto* active_range_del_iter =
      super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
  range_del_agg.AddTombstones(
      std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
  super_version->imm->AddRangeTombstoneIterators(read_opts, nullptr /* arena */,
                                                 &range_del_agg);

  Status status;
  for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
    auto* vstorage = super_version->current->storage_info();
    auto* ucmp = vstorage->InternalComparator()->user_comparator();
    InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
                            kValueTypeForSeek);
    memtable_iter->Seek(range_start.Encode());
    status = memtable_iter->status();
    ParsedInternalKey seek_result;
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          !ParseInternalKey(memtable_iter->key(), &seek_result)) {
        status = Status::Corruption("DB has corrupted keys");
      }
    }
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
        *overlap = true;
      } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
                                                 ranges[i].limit)) {
        *overlap = true;
      }
    }
  }
  return status;
}

const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;

Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options, int input_level,
    int output_level, const CompactRangeOptions& compact_range_options,
    const InternalKey* begin, const InternalKey* end,
    InternalKey** compaction_end, bool* conflict,
    uint64_t max_file_num_to_ignore) {
  auto* result = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, current_->storage_info(), input_level,
      output_level, compact_range_options, begin, end, compaction_end, conflict,
      max_file_num_to_ignore);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
    InstrumentedMutex* db_mutex) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
  sv->Ref();
  if (!ReturnThreadLocalSuperVersion(sv)) {
    // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
    // when the thread-local pointer was populated. So, the Ref() earlier in
    // this function still prevents the returned SuperVersion* from being
    // deleted out from under the caller.
    sv->Unref();
  }
  return sv;
}

SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
    InstrumentedMutex* db_mutex) {
  // The SuperVersion is cached in thread local storage to avoid acquiring
  // mutex when SuperVersion does not change since the last use. When a new
  // SuperVersion is installed, the compaction or flush thread cleans up
  // cached SuperVersion in all existing thread local storage. To avoid
  // acquiring mutex for this operation, we use atomic Swap() on the thread
  // local pointer to guarantee exclusive access. If the thread local pointer
  // is being used while a new SuperVersion is installed, the cached
  // SuperVersion can become stale. In that case, the background thread would
  // have swapped in kSVObsolete. We re-check the value when returning the
  // SuperVersion back to thread local, with an atomic compare-and-swap.
  // The superversion will need to be released if detected to be stale.
  void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(ptr != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load()) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
    SuperVersion* sv_to_delete = nullptr;

    if (sv && sv->Unref()) {
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
      db_mutex->Lock();
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      sv_to_delete = sv;
    } else {
      db_mutex->Lock();
    }
    sv = super_version_->Ref();
    db_mutex->Unlock();

    delete sv_to_delete;
  }
  assert(sv != nullptr);
  return sv;
}

bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  // Put the SuperVersion back
  void* expected = SuperVersion::kSVInUse;
  if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
    // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
    // storage has not been altered and no Scrape has happened. The
    // SuperVersion is still current.
    return true;
  } else {
    // ThreadLocal scrape happened in the process of this GetImpl call (after
    // thread local Swap() at the beginning and before CompareAndSwap()).
    // This means the SuperVersion it holds is obsolete.
    assert(expected == SuperVersion::kSVObsolete);
  }
  return false;
}

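// REQUIRES: DB mutex held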
void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
}

void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  SuperVersion* new_superversion = sv_context->new_superversion.release();
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(this, mem_, imm_.current(), current_);
  SuperVersion* old_superversion = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;
  super_version_->write_stall_condition =
      RecalculateWriteStallConditions(mutable_cf_options);

  if (old_superversion != nullptr) {
    // Reset SuperVersions cached in thread local storage.
    // This should be done before old_superversion->Unref(). That's to ensure
    // that local_sv_ never holds the last reference to SuperVersion, since
    // it has no means to safely do SuperVersion cleanup.
    ResetThreadLocalSuperVersions();

    if (old_superversion->mutable_cf_options.write_buffer_size !=
        mutable_cf_options.write_buffer_size) {
      mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
    }
    if (old_superversion->write_stall_condition !=
        new_superversion->write_stall_condition) {
      sv_context->PushWriteStallNotification(
          old_superversion->write_stall_condition,
          new_superversion->write_stall_condition, GetName(), ioptions());
    }
    if (old_superversion->Unref()) {
      old_superversion->Cleanup();
      sv_context->superversions_to_free.push_back(old_superversion);
    }
  }
}

void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
  for (auto ptr : sv_ptrs) {
    assert(ptr);
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
    auto sv = static_cast<SuperVersion*>(ptr);
    bool was_last_ref __attribute__((__unused__));
    was_last_ref = sv->Unref();
    // sv couldn't have been the last reference because
    // ResetThreadLocalSuperVersions() is called before
    // unref'ing super_version_.
    assert(!was_last_ref);
  }
}

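// Checks compression, concurrent memtable write and cf_paths support, and
// verifies that TTL and periodic compaction are only enabled with the
// block-based table format.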
Status ColumnFamilyData::ValidateOptions(
    const DBOptions& db_options, const ColumnFamilyOptions& cf_options) {
  Status s;
  s = CheckCompressionSupported(cf_options);
  if (s.ok() && db_options.allow_concurrent_memtable_write) {
    s = CheckConcurrentWritesSupported(cf_options);
  }
  if (s.ok()) {
    s = CheckCFPathsSupported(db_options, cf_options);
  }
  if (!s.ok()) {
    return s;
  }

  if (cf_options.ttl > 0 && cf_options.ttl != kDefaultTtl) {
    if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
      return Status::NotSupported(
          "TTL is only supported in Block-Based Table format. ");
    }
  }

  if (cf_options.periodic_compaction_seconds > 0 &&
      cf_options.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {
    if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
      return Status::NotSupported(
          "Periodic Compaction is only supported in "
          "Block-Based Table format. ");
    }
  }
  return s;
}

#ifndef ROCKSDB_LITE
Status ColumnFamilyData::SetOptions(
    const DBOptions& db_options,
    const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions new_mutable_cf_options;
  Status s =
      GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
                                   ioptions_.info_log, &new_mutable_cf_options);
  if (s.ok()) {
    ColumnFamilyOptions cf_options =
        BuildColumnFamilyOptions(initial_cf_options_, new_mutable_cf_options);
    s = ValidateOptions(db_options, cf_options);
  }
  if (s.ok()) {
    mutable_cf_options_ = new_mutable_cf_options;
    mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  }
  return s;
}
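
// Example (illustrative): this is the path exercised when a user tunes a
// column family at runtime through DB::SetOptions(), e.g.
//
//   db->SetOptions(cf_handle, {{"write_buffer_size", "131072"},
//                              {"level0_file_num_compaction_trigger", "4"}});
//
// Only mutable options may appear in the map; anything else fails
// GetMutableOptionsFromStrings().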
#endif  // ROCKSDB_LITE

// REQUIRES: DB mutex held
Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
  if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
    return Env::WLTH_NOT_SET;
  }
  if (level == 0) {
    return Env::WLTH_MEDIUM;
  }
  int base_level = current_->storage_info()->base_level();

  // L1: medium, L2: long, ...
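  // e.g. with base_level == 1: L1 -> WLTH_MEDIUM, L2 -> WLTH_LONG,
  // L3 and beyond -> WLTH_EXTREME.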
  if (level - base_level >= 2) {
    return Env::WLTH_EXTREME;
  }
  return static_cast<Env::WriteLifeTimeHint>(
      level - base_level + static_cast<int>(Env::WLTH_MEDIUM));
}

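// Opens (creating them if necessary) a Directory handle for every configured
// cf_path. REQUIRES: called at most once per column family (data_dirs_ must
// still be empty).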
Status ColumnFamilyData::AddDirectories() {
  Status s;
  assert(data_dirs_.empty());
  for (auto& p : ioptions_.cf_paths) {
    std::unique_ptr<Directory> path_directory;
    s = DBImpl::CreateAndNewDirectory(ioptions_.env, p.path, &path_directory);
    if (!s.ok()) {
      return s;
    }
    assert(path_directory != nullptr);
    data_dirs_.emplace_back(path_directory.release());
  }
  assert(data_dirs_.size() == ioptions_.cf_paths.size());
  return s;
}

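// Returns nullptr when no cf_paths were configured; callers are expected to
// fall back to the DB-level data directory in that case.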
Directory* ColumnFamilyData::GetDataDir(size_t path_id) const {
  if (data_dirs_.empty()) {
    return nullptr;
  }

  assert(path_id < data_dirs_.size());
  return data_dirs_[path_id].get();
}

ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const ImmutableDBOptions* db_options,
                                 const EnvOptions& env_options,
                                 Cache* table_cache,
                                 WriteBufferManager* write_buffer_manager,
                                 WriteController* write_controller,
                                 BlockCacheTracer* const block_cache_tracer)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(
          0, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), *db_options,
          env_options, nullptr, block_cache_tracer)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      env_options_(env_options),
      table_cache_(table_cache),
      write_buffer_manager_(write_buffer_manager),
      write_controller_(write_controller),
      block_cache_tracer_(block_cache_tracer) {
  // initialize linked list
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}

ColumnFamilySet::~ColumnFamilySet() {
  while (column_family_data_.size() > 0) {
    // cfd destructor will delete itself from column_family_data_
    auto cfd = column_family_data_.begin()->second;
    bool last_ref __attribute__((__unused__));
    last_ref = cfd->UnrefAndTryDelete();
    assert(last_ref);
  }
  bool dummy_last_ref __attribute__((__unused__));
  dummy_last_ref = dummy_cfd_->UnrefAndTryDelete();
  assert(dummy_last_ref);
}

ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  assert(default_cfd_cache_ != nullptr);
  return default_cfd_cache_;
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  auto cfd_iter = column_family_data_.find(id);
  if (cfd_iter != column_family_data_.end()) {
    return cfd_iter->second;
  } else {
    return nullptr;
  }
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  auto cfd_iter = column_families_.find(name);
  if (cfd_iter != column_families_.end()) {
    auto cfd = GetColumnFamily(cfd_iter->second);
    assert(cfd != nullptr);
    return cfd;
  } else {
    return nullptr;
  }
}

uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  return ++max_column_family_;
}

uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }

void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  max_column_family_ = std::max(new_max_column_family, max_column_family_);
}

size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  return column_families_.size();
}

// under a DB mutex AND write thread
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd = new ColumnFamilyData(
      id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
      *db_options_, env_options_, this, block_cache_tracer_);
  column_families_.insert({name, id});
  column_family_data_.insert({id, new_cfd});
  max_column_family_ = std::max(max_column_family_, id);
  // add to linked list
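  // (the new node goes just before the dummy sentinel, i.e. at the tail of
  // the circular list)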
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
  return new_cfd;
}

// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
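  // Collect first, then delete: ~ColumnFamilyData unlinks itself from this
  // list, so deleting while iterating would invalidate the traversal.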
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

// under a DB mutex AND from a write thread
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
  auto cfd_iter = column_family_data_.find(cfd->GetID());
  assert(cfd_iter != column_family_data_.end());
  column_family_data_.erase(cfd_iter);
  column_families_.erase(cfd->GetName());
}

// under a DB mutex OR from a write thread
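// Positions this object at the memtable of the given column family; used by
// the write path (e.g. the WriteBatch memtable inserter) to route each batch
// entry to the right memtable.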
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
  handle_.SetCFD(current_);
  return current_ != nullptr;
}

uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  return current_->GetLogNumber();
}

MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  return current_->mem();
}

ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  return &handle_;
}

uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  uint32_t column_family_id = 0;
  if (column_family != nullptr) {
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
    column_family_id = cfh->GetID();
  }
  return column_family_id;
}

const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family != nullptr) {
    return column_family->GetComparator();
  }
  return nullptr;
}

}  // namespace rocksdb