//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/column_family.h"

#include <algorithm>
#include <cinttypes>
#include <limits>
#include <string>
#include <vector>

#include "db/compaction/compaction_picker.h"
#include "db/compaction/compaction_picker_fifo.h"
#include "db/compaction/compaction_picker_level.h"
#include "db/compaction/compaction_picker_universal.h"
#include "db/db_impl/db_impl.h"
#include "db/internal_stats.h"
#include "db/job_context.h"
#include "db/range_del_aggregator.h"
#include "db/table_properties_collector.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "file/sst_file_manager_impl.h"
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
#include "util/compression.h"

namespace rocksdb {

ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ != nullptr) {
    cfd_->Ref();
  }
}

ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ != nullptr) {
#ifndef ROCKSDB_LITE
    for (auto& listener : cfd_->ioptions()->listeners) {
      listener->OnColumnFamilyHandleDeletionStarted(this);
    }
#endif  // ROCKSDB_LITE
    // Job id == 0 means that this is not our background process, but rather
    // a user thread.
    // Need to hold on to some shared pointers owned by the initial_cf_options
    // until the final cleanup finishes.
    ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
    JobContext job_context(0);
    mutex_->Lock();
    bool dropped = cfd_->IsDropped();
    if (cfd_->UnrefAndTryDelete()) {
      if (dropped) {
        db_->FindObsoleteFiles(&job_context, false, true);
      }
    }
    mutex_->Unlock();
    if (job_context.HaveSomethingToDelete()) {
      bool defer_purge =
          db_->immutable_db_options().avoid_unnecessary_blocking_io;
      db_->PurgeObsoleteFiles(job_context, defer_purge);
      if (defer_purge) {
        mutex_->Lock();
        db_->SchedulePurge();
        mutex_->Unlock();
      }
    }
    job_context.Clean();
  }
}

uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }

const std::string& ColumnFamilyHandleImpl::GetName() const {
  return cfd()->GetName();
}

Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
#ifndef ROCKSDB_LITE
  // accessing mutable cf-options requires db mutex.
  InstrumentedMutexLock l(mutex_);
  *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
  return Status::OK();
#else
  (void)desc;
  return Status::NotSupported();
#endif  // !ROCKSDB_LITE
}

const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
  return cfd()->user_comparator();
}

void GetIntTblPropCollectorFactory(
    const ImmutableCFOptions& ioptions,
    std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
        int_tbl_prop_collector_factories) {
  auto& collector_factories = ioptions.table_properties_collector_factories;
  for (size_t i = 0; i < collector_factories.size(); ++i) {
    assert(collector_factories[i]);
    int_tbl_prop_collector_factories->emplace_back(
        new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
  }
}

Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
  if (!cf_options.compression_per_level.empty()) {
    for (size_t level = 0; level < cf_options.compression_per_level.size();
         ++level) {
      if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
        return Status::InvalidArgument(
            "Compression type " +
            CompressionTypeToString(cf_options.compression_per_level[level]) +
            " is not linked with the binary.");
      }
    }
  } else {
    if (!CompressionTypeSupported(cf_options.compression)) {
      return Status::InvalidArgument(
          "Compression type " +
          CompressionTypeToString(cf_options.compression) +
          " is not linked with the binary.");
    }
  }
  if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
    if (!ZSTD_TrainDictionarySupported()) {
      return Status::InvalidArgument(
          "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
          "is not linked with the binary.");
    }
    if (cf_options.compression_opts.max_dict_bytes == 0) {
      return Status::InvalidArgument(
          "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
          "should be nonzero if we're using zstd's dictionary generator.");
    }
  }
  return Status::OK();
}

Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
  if (cf_options.inplace_update_support) {
    return Status::InvalidArgument(
        "In-place memtable updates (inplace_update_support) is not compatible "
        "with concurrent writes (allow_concurrent_memtable_write)");
  }
  if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
    return Status::InvalidArgument(
        "Memtable doesn't support concurrent writes "
        "(allow_concurrent_memtable_write)");
  }
  if (cf_options.max_successive_merges != 0) {
    return Status::InvalidArgument(
        "max_successive_merges > 0 is incompatible "
        "with concurrent writes (allow_concurrent_memtable_write)");
  }
  return Status::OK();
}

Status CheckCFPathsSupported(const DBOptions& db_options,
                             const ColumnFamilyOptions& cf_options) {
  // More than one cf_paths entry is supported only in universal
  // and level compaction styles. This function also checks the case
  // in which cf_paths is not specified, which results in db_paths
  // being used.
  if ((cf_options.compaction_style != kCompactionStyleUniversal) &&
      (cf_options.compaction_style != kCompactionStyleLevel)) {
    if (cf_options.cf_paths.size() > 1) {
      return Status::NotSupported(
          "More than one CF path is only supported in "
          "universal and level compaction styles. ");
    } else if (cf_options.cf_paths.empty() &&
               db_options.db_paths.size() > 1) {
      return Status::NotSupported(
          "More than one DB path is only supported in "
          "universal and level compaction styles. ");
    }
  }
  return Status::OK();
}
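
// Sentinel values (0xfffffffffffffffe == UINT64_MAX - 1) mark ttl and
// periodic_compaction_seconds as "left at their defaults"; SanitizeOptions()
// below replaces them with concrete values.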

namespace {
const uint64_t kDefaultTtl = 0xfffffffffffffffe;
const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe;
}  // namespace

ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  size_t clamp_max = std::conditional<
      sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
      std::integral_constant<uint64_t, 64ull << 30>>::type::value;
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
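  // The clamp above keeps write_buffer_size within [64 KB, 64 GB] on 64-bit
  // builds, and caps it at 4 GB - 1 on 32-bit builds where size_t is 4 bytes.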
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // calculate a proper value from write_buffer_size.
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 8;

    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
  }
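  // Example of the above: write_buffer_size = 64 MB yields
  // arena_block_size = 8 MB, which is already 4 KB-aligned.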
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }

  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }

  if (result.compaction_style == kCompactionStyleUniversal &&
      db_options.allow_ingest_behind && result.num_levels < 3) {
    result.num_levels = 3;
  }

  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  // fall back to max_write_buffer_number_to_maintain if
  // max_write_buffer_size_to_maintain is not set
  if (result.max_write_buffer_size_to_maintain < 0) {
    result.max_write_buffer_size_to_maintain =
        result.max_write_buffer_number *
        static_cast<int64_t>(result.write_buffer_size);
  } else if (result.max_write_buffer_size_to_maintain == 0 &&
             result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  // bloom filter size shouldn't exceed 1/4 of memtable size.
  if (result.memtable_prefix_bloom_size_ratio > 0.25) {
    result.memtable_prefix_bloom_size_ratio = 0.25;
  } else if (result.memtable_prefix_bloom_size_ratio < 0) {
    result.memtable_prefix_bloom_size_ratio = 0;
  }

  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  if (result.max_bytes_for_level_multiplier <= 0) {
    result.max_bytes_for_level_multiplier = 1;
  }

  if (result.level0_file_num_compaction_trigger == 0) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "level0_file_num_compaction_trigger cannot be 0");
    result.level0_file_num_compaction_trigger = 1;
  }

  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "This condition must be satisfied: "
                   "level0_stop_writes_trigger(%d) >= "
                   "level0_slowdown_writes_trigger(%d) >= "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "Adjusting the values to "
                   "level0_stop_writes_trigger(%d), "
                   "level0_slowdown_writes_trigger(%d), "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
  }

  if (result.soft_pending_compaction_bytes_limit == 0) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
             result.soft_pending_compaction_bytes_limit >
                 result.hard_pending_compaction_bytes_limit) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  }

#ifndef ROCKSDB_LITE
  // When the DB is stopped, it's possible that there are some .trash files
  // that were not deleted yet; when we open the DB we will find these .trash
  // files and schedule them for deletion (or delete them immediately if
  // SstFileManager was not used).
  auto sfm =
      static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
  for (size_t i = 0; i < result.cf_paths.size(); i++) {
    DeleteScheduler::CleanupDirectory(db_options.env, sfm,
                                      result.cf_paths[i].path);
  }
#endif

  if (result.cf_paths.empty()) {
    result.cf_paths = db_options.db_paths;
  }

  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel ||
        result.cf_paths.size() > 1U) {
      // 1. level_compaction_dynamic_level_bytes only makes sense for
      //    level-based compaction.
      // 2. we don't yet know how to make this feature and multiple DB paths
      //    work together.
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

  if (result.max_compaction_bytes == 0) {
    result.max_compaction_bytes = result.target_file_size_base * 25;
  }

  bool is_block_based_table =
      (result.table_factory->Name() == BlockBasedTableFactory().Name());

  const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60;
  if (result.ttl == kDefaultTtl) {
    if (is_block_based_table &&
        result.compaction_style != kCompactionStyleFIFO) {
      result.ttl = kAdjustedTtl;
    } else {
      result.ttl = 0;
    }
  }

  const uint64_t kAdjustedPeriodicCompSecs = 30 * 24 * 60 * 60;

  // Turn on periodic compactions and set them to occur once every 30 days if
  // compaction filters are used and periodic_compaction_seconds is set to the
  // default value.
  if (result.compaction_style != kCompactionStyleFIFO) {
    if ((result.compaction_filter != nullptr ||
         result.compaction_filter_factory != nullptr) &&
        result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
        is_block_based_table) {
      result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
    }
  } else {
    // result.compaction_style == kCompactionStyleFIFO
    if (result.ttl == 0) {
      if (is_block_based_table) {
        if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
          result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
        }
        result.ttl = result.periodic_compaction_seconds;
      }
    } else if (result.periodic_compaction_seconds != 0) {
      result.ttl = std::min(result.ttl, result.periodic_compaction_seconds);
    }
  }

  // TTL compactions would work similarly to periodic compactions in universal
  // compaction in most cases. So, if ttl is set, execute the periodic
  // compaction codepath.
  if (result.compaction_style == kCompactionStyleUniversal && result.ttl != 0) {
    if (result.periodic_compaction_seconds != 0) {
      result.periodic_compaction_seconds =
          std::min(result.ttl, result.periodic_compaction_seconds);
    } else {
      result.periodic_compaction_seconds = result.ttl;
    }
  }

  if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
    result.periodic_compaction_seconds = 0;
  }

  return result;
}
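
// A minimal sketch of the sanitizer's effect (illustrative values; "idbo" is
// assumed to be an ImmutableDBOptions built from the user's DBOptions):
//   ColumnFamilyOptions opts;
//   opts.max_write_buffer_number = 1;             // too low; raised to 2
//   opts.level0_file_num_compaction_trigger = 0;  // invalid; raised to 1
//   ColumnFamilyOptions fixed = SanitizeOptions(idbo, opts);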

int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;

SuperVersion::~SuperVersion() {
  for (auto td : to_delete) {
    delete td;
  }
}

SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
  uint32_t previous_refs = refs.fetch_sub(1);
  assert(previous_refs > 0);
  return previous_refs == 1;
}

void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
    assert(*memory_usage >= m->ApproximateMemoryUsage());
    *memory_usage -= m->ApproximateMemoryUsage();
    to_delete.push_back(m);
  }
  current->Unref();
  if (cfd->Unref()) {
    delete cfd;
  }
}

void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
                        MemTableListVersion* new_imm, Version* new_current) {
  cfd = new_cfd;
  mem = new_mem;
  imm = new_imm;
  current = new_current;
  cfd->Ref();
  mem->Ref();
  imm->Ref();
  current->Ref();
  refs.store(1, std::memory_order_relaxed);
}

namespace {
void SuperVersionUnrefHandle(void* ptr) {
  // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
  // destroyed. When the former happens, the thread shouldn't see kSVInUse.
  // When the latter happens, we are in ~ColumnFamilyData(), so no get should
  // happen either.
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  bool was_last_ref __attribute__((__unused__));
  was_last_ref = sv->Unref();
  // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_.
  // This is important because we can't do SuperVersion cleanup here.
  // That would require locking DB mutex, which would deadlock because
  // SuperVersionUnrefHandle is called with locked ThreadLocalPtr mutex.
  assert(!was_last_ref);
}
}  // anonymous namespace

ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBufferManager* write_buffer_manager,
    const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
    const FileOptions& file_options, ColumnFamilySet* column_family_set,
    BlockCacheTracer* const block_cache_tracer)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      initialized_(false),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      initial_cf_options_(SanitizeOptions(db_options, cf_options)),
      ioptions_(db_options, initial_cf_options_),
      mutable_cf_options_(initial_cf_options_),
      is_delete_range_supported_(
          cf_options.table_factory->IsDeleteRangeSupported()),
      write_buffer_manager_(write_buffer_manager),
      mem_(nullptr),
      imm_(ioptions_.min_write_buffer_number_to_merge,
           ioptions_.max_write_buffer_number_to_maintain,
           ioptions_.max_write_buffer_size_to_maintain),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      flush_reason_(FlushReason::kOthers),
      column_family_set_(column_family_set),
      queued_for_flush_(false),
      queued_for_compaction_(false),
      prev_compaction_needed_bytes_(0),
      allow_2pc_(db_options.allow_2pc),
      last_memtable_id_(0) {
  Ref();

  // Convert user-defined table properties collector factories to internal
  // ones.
  GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);

  // if _dummy_versions is nullptr, then this is a dummy column family.
  if (_dummy_versions != nullptr) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options.env, this));
    table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
                                      block_cache_tracer));
    if (ioptions_.compaction_style == kCompactionStyleLevel) {
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
#ifndef ROCKSDB_LITE
    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(
          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
      compaction_picker_.reset(
          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
      compaction_picker_.reset(
          new NullCompactionPicker(ioptions_, &internal_comparator_));
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "Column family %s does not use any background compaction. "
                     "Compactions can only be done via CompactFiles\n",
                     GetName().c_str());
#endif  // !ROCKSDB_LITE
    } else {
      ROCKS_LOG_ERROR(ioptions_.info_log,
                      "Unable to recognize the specified compaction style %d. "
                      "Column family %s will use kCompactionStyleLevel.\n",
                      ioptions_.compaction_style, GetName().c_str());
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
    }

    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      ROCKS_LOG_INFO(ioptions_.info_log,
                     "--------------- Options for column family [%s]:\n",
                     name.c_str());
      initial_cf_options_.Dump(ioptions_.info_log);
    } else {
      ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}

// DB mutex held
ColumnFamilyData::~ColumnFamilyData() {
  assert(refs_.load(std::memory_order_relaxed) == 0);
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from column family set
    // If column_family_set_ == nullptr, this is a dummy CFD and not in the
    // ColumnFamilySet
    column_family_set_->RemoveColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

  // It would be wrong to destroy this ColumnFamilyData while it is still in
  // flush_queue_ or compaction_queue_
  assert(!queued_for_flush_);
  assert(!queued_for_compaction_);
  assert(super_version_ == nullptr);

  if (dummy_versions_ != nullptr) {
    // List must be empty
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((__unused__));
    deleted = dummy_versions_->Unref();
    assert(deleted);
  }

  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  autovector<MemTable*> to_delete;
  imm_.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }
}

bool ColumnFamilyData::UnrefAndTryDelete() {
  int old_refs = refs_.fetch_sub(1);
  assert(old_refs > 0);

  if (old_refs == 1) {
    assert(super_version_ == nullptr);
    delete this;
    return true;
  }

  if (old_refs == 2 && super_version_ != nullptr) {
    // Only the super_version_ holds me
    SuperVersion* sv = super_version_;
    super_version_ = nullptr;
    // Release SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since unref handler can lock mutex.
    sv->db_mutex->Unlock();
    local_sv_.reset();
    sv->db_mutex->Lock();

    if (sv->Unref()) {
      // May delete this ColumnFamilyData after calling Cleanup()
      sv->Cleanup();
      delete sv;
      return true;
    }
  }
  return false;
}

void ColumnFamilyData::SetDropped() {
  // can't drop default CF
  assert(id_ != 0);
  dropped_ = true;
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}

ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
  return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
}
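
// With two-phase commit (allow_2pc), a WAL that still contains a prepared but
// not yet committed section must be kept alive, so the minimum prep-section
// log across the active and immutable memtables can hold back the oldest log
// to keep, as computed below.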

uint64_t ColumnFamilyData::OldestLogToKeep() {
  auto current_log = GetLogNumber();

  if (allow_2pc_) {
    autovector<MemTable*> empty_list;
    auto imm_prep_log =
        imm()->PrecomputeMinLogContainingPrepSection(empty_list);
    auto mem_prep_log = mem()->GetMinLogContainingPrepSection();

    if (imm_prep_log > 0 && imm_prep_log < current_log) {
      current_log = imm_prep_log;
    }

    if (mem_prep_log > 0 && mem_prep_log < current_log) {
      current_log = mem_prep_log;
    }
  }

  return current_log;
}

const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
const double kNearStopSlowdownRatio = 0.6;
const double kDelayRecoverSlowdownRatio = 1.4;
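
// Example of the resulting feedback loop (rates are illustrative; implemented
// in SetupDelay() below): starting from a delayed write rate of 16 MB/s,
// growing compaction debt multiplies the rate by kIncSlowdownRatio (0.8) to
// 12.8 MB/s, and a (near-)stop condition multiplies it by
// kNearStopSlowdownRatio (0.6) instead; once the debt shrinks, the rate
// recovers by kDecSlowdownRatio (1.25) per adjustment, capped at the
// user-configured max_delayed_write_rate.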

namespace {
// If penalize_stop is true, we further reduce slowdown rate.
std::unique_ptr<WriteControllerToken> SetupDelay(
    WriteController* write_controller, uint64_t compaction_needed_bytes,
    uint64_t prev_compaction_need_bytes, bool penalize_stop,
    bool auto_compactions_disabled) {
  const uint64_t kMinWriteRate = 16 * 1024u;  // Minimum write rate 16KB/s.

  uint64_t max_write_rate = write_controller->max_delayed_write_rate();
  uint64_t write_rate = write_controller->delayed_write_rate();

  if (auto_compactions_disabled) {
    // When auto compaction is disabled, always use the value user gave.
    write_rate = max_write_rate;
  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
    // If user gives rate less than kMinWriteRate, don't adjust it.
    //
    // If already delayed, need to adjust based on previous compaction debt.
    // When two or more column families require delay, we always increase or
    // reduce the write rate based on information for one single column
    // family. It is likely to be OK but we can improve if there is a problem.
    // Ignore the compaction_needed_bytes = 0 case because
    // compaction_needed_bytes is only available in level-based compaction.
    //
    // If the compaction debt stays the same as previously, we also further
    // slow down. It usually means a memtable is full. It's mainly for the
    // case where both flush and compaction are much slower than the speed we
    // insert to memtables, so we need to actively slow down before we get a
    // feedback signal from compactions and flushes to avoid the full stop
    // caused by hitting the max write buffer number.
    //
    // If the DB just fell into the stop condition, we need to further reduce
    // the write rate to avoid the stop condition.
    if (penalize_stop) {
      // Penalize the near-stop or stop condition by a more aggressive
      // slowdown. This is to provide the long-term slowdown increase signal.
      // The penalty is more than the reward of recovering to the normal
      // condition.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kNearStopSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > 0 &&
               prev_compaction_need_bytes <= compaction_needed_bytes) {
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kIncSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
      // We are speeding up by a ratio of kDecSlowdownRatio when we have paid
      // enough compaction debt. But we'll never speed up to faster than the
      // write rate given by users.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kDecSlowdownRatio);
      if (write_rate > max_write_rate) {
        write_rate = max_write_rate;
      }
    }
  }
  return write_controller->GetDelayToken(write_rate);
}
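
// A worked example: with level0_file_num_compaction_trigger = 4 and
// level0_slowdown_writes_trigger = 20, the speedup threshold below is
// min(2 * 4, 4 + (20 - 4) / 4) = min(8, 8) = 8 L0 files.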

int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
                                    int level0_slowdown_writes_trigger) {
  // SanitizeOptions() ensures it.
  assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);

  if (level0_file_num_compaction_trigger < 0) {
    return std::numeric_limits<int>::max();
  }

  const int64_t twice_level0_trigger =
      static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;

  const int64_t one_fourth_trigger_slowdown =
      static_cast<int64_t>(level0_file_num_compaction_trigger) +
      ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
       4);

  assert(twice_level0_trigger >= 0);
  assert(one_fourth_trigger_slowdown >= 0);

  // 1/4 of the way between L0 compaction trigger threshold and slowdown
  // condition.
  // Or twice the compaction trigger, if that is smaller.
  int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
  if (res >= port::kMaxInt32) {
    return port::kMaxInt32;
  } else {
    // res fits in int
    return static_cast<int>(res);
  }
}
}  // namespace

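// A worked example of the stall classification below: with
// max_write_buffer_number = 4, three unflushed memtables put the column
// family into (kDelayed, kMemtableLimit); a fourth memtable escalates that
// to (kStopped, kMemtableLimit).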
std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
ColumnFamilyData::GetWriteStallConditionAndCause(
    int num_unflushed_memtables, int num_l0_files,
    uint64_t num_compaction_needed_bytes,
    const MutableCFOptions& mutable_cf_options) {
  if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
    return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
    return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.hard_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kStopped,
            WriteStallCause::kPendingCompactionBytes};
  } else if (mutable_cf_options.max_write_buffer_number > 3 &&
             num_unflushed_memtables >=
                 mutable_cf_options.max_write_buffer_number - 1) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
             num_l0_files >=
                 mutable_cf_options.level0_slowdown_writes_trigger) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kDelayed,
            WriteStallCause::kPendingCompactionBytes};
  }
  return {WriteStallCondition::kNormal, WriteStallCause::kNone};
}

WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
    const MutableCFOptions& mutable_cf_options) {
  auto write_stall_condition = WriteStallCondition::kNormal;
  if (current_ != nullptr) {
    auto* vstorage = current_->storage_info();
    auto write_controller = column_family_set_->write_controller_;
    uint64_t compaction_needed_bytes =
        vstorage->estimated_compaction_needed_bytes();

    auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
        imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
        vstorage->estimated_compaction_needed_bytes(), mutable_cf_options);
    write_stall_condition = write_stall_condition_and_cause.first;
    auto write_stall_cause = write_stall_condition_and_cause.second;

    bool was_stopped = write_controller->IsStopped();
    bool needed_delay = write_controller->NeedsDelay();

    if (write_stall_condition == WriteStallCondition::kStopped &&
        write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stopping writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d",
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number);
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stopping writes because we have %d level-0 files",
                     name_.c_str(), vstorage->l0_delay_trigger_count());
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stopping writes because of estimated pending compaction "
          "bytes %" PRIu64,
          name_.c_str(), compaction_needed_bytes);
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stalling writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d "
          "rate %" PRIu64,
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number,
          write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      // L0 is within two files of the stop trigger.
      bool near_stop = vstorage->l0_delay_trigger_count() >=
                       mutable_cf_options.level0_stop_writes_trigger - 2;
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
                                  1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stalling writes because we have %d level-0 files "
                     "rate %" PRIu64,
                     name_.c_str(), vstorage->l0_delay_trigger_count(),
                     write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      // If the distance to the hard limit is less than 1/4 of the gap between
      // the soft and hard bytes limits, we consider it near-stop and speed up
      // the slowdown.
      bool near_stop =
          mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
          (compaction_needed_bytes -
           mutable_cf_options.soft_pending_compaction_bytes_limit) >
              3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
                   mutable_cf_options.soft_pending_compaction_bytes_limit) /
                  4;

      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stalling writes because of estimated pending compaction "
          "bytes %" PRIu64 " rate %" PRIu64,
          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
          write_controller->delayed_write_rate());
    } else {
      assert(write_stall_condition == WriteStallCondition::kNormal);
      if (vstorage->l0_delay_trigger_count() >=
          GetL0ThresholdSpeedupCompaction(
              mutable_cf_options.level0_file_num_compaction_trigger,
              mutable_cf_options.level0_slowdown_writes_trigger)) {
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        ROCKS_LOG_INFO(
            ioptions_.info_log,
            "[%s] Increasing compaction threads because we have %d level-0 "
            "files ",
            name_.c_str(), vstorage->l0_delay_trigger_count());
      } else if (vstorage->estimated_compaction_needed_bytes() >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
        // Increase compaction threads if bytes needed for compaction exceeds
        // 1/4 of threshold for slowing down.
        // If soft pending compaction byte limit is not set, always speed up
        // compaction.
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
          ROCKS_LOG_INFO(
              ioptions_.info_log,
              "[%s] Increasing compaction threads because of estimated "
              "pending compaction bytes %" PRIu64,
              name_.c_str(), vstorage->estimated_compaction_needed_bytes());
        }
      } else {
        write_controller_token_.reset();
      }
      // If the DB recovers from delay conditions, we reward it by increasing
      // the write rate by double the slowdown step (kDelayRecoverSlowdownRatio
      // = 1.4, vs. the 0.8 slowdown ratio). This is to balance the long-term
      // slowdown increase signal.
      if (needed_delay) {
        uint64_t write_rate = write_controller->delayed_write_rate();
        write_controller->set_delayed_write_rate(static_cast<uint64_t>(
            static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
        // Set the low-pri limit to be 1/4 of the delayed write rate.
        // Note we don't reset this value even after the delay condition is
        // released. The low-pri rate will continue to apply if there is
        // compaction pressure.
        write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
                                                                    4);
      }
    }
    prev_compaction_needed_bytes_ = compaction_needed_bytes;
  }
  return write_stall_condition;
}

const FileOptions* ColumnFamilyData::soptions() const {
  return &(column_family_set_->file_options_);
}

void ColumnFamilyData::SetCurrent(Version* current_version) {
  current_ = current_version;
}

uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  return VersionSet::GetNumLiveVersions(dummy_versions_);
}

uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
}

uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
  return current_->GetSstFilesSize();
}

MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
                      write_buffer_manager_, earliest_seq, id_);
}

void ColumnFamilyData::CreateNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
  mem_->Ref();
}

bool ColumnFamilyData::NeedsCompaction() const {
  return compaction_picker_->NeedsCompaction(current_->storage_info());
}

Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
  SequenceNumber earliest_mem_seqno =
      std::min(mem_->GetEarliestSequenceNumber(),
               imm_.current()->GetEarliestSequenceNumber(false));
  auto* result = compaction_picker_->PickCompaction(
      GetName(), mutable_options, current_->storage_info(), log_buffer,
      earliest_mem_seqno);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

bool ColumnFamilyData::RangeOverlapWithCompaction(
    const Slice& smallest_user_key, const Slice& largest_user_key,
    int level) const {
  return compaction_picker_->RangeOverlapWithCompaction(
      smallest_user_key, largest_user_key, level);
}

Status ColumnFamilyData::RangesOverlapWithMemtables(
    const autovector<Range>& ranges, SuperVersion* super_version,
    bool* overlap) {
  assert(overlap != nullptr);
  *overlap = false;
  // Create an InternalIterator over all unflushed memtables
  Arena arena;
  ReadOptions read_opts;
  read_opts.total_order_seek = true;
  MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
  merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_opts, &arena));
  super_version->imm->AddIterators(read_opts, &merge_iter_builder);
  ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());

  auto read_seq = super_version->current->version_set()->LastSequence();
  ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
  auto* active_range_del_iter =
      super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
  range_del_agg.AddTombstones(
      std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
  super_version->imm->AddRangeTombstoneIterators(read_opts, nullptr /* arena */,
                                                 &range_del_agg);

  Status status;
  for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
    auto* vstorage = super_version->current->storage_info();
    auto* ucmp = vstorage->InternalComparator()->user_comparator();
    InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
                            kValueTypeForSeek);
    memtable_iter->Seek(range_start.Encode());
    status = memtable_iter->status();
    ParsedInternalKey seek_result;
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          !ParseInternalKey(memtable_iter->key(), &seek_result)) {
        status = Status::Corruption("DB has corrupted keys");
      }
    }
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
        *overlap = true;
      } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
                                                 ranges[i].limit)) {
        *overlap = true;
      }
    }
  }
  return status;
}

const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;

Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options, int input_level,
    int output_level, const CompactRangeOptions& compact_range_options,
    const InternalKey* begin, const InternalKey* end,
    InternalKey** compaction_end, bool* conflict,
    uint64_t max_file_num_to_ignore) {
  auto* result = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, current_->storage_info(), input_level,
      output_level, compact_range_options, begin, end, compaction_end, conflict,
      max_file_num_to_ignore);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db);
  sv->Ref();
  if (!ReturnThreadLocalSuperVersion(sv)) {
    // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
    // when the thread-local pointer was populated. So, the Ref() earlier in
    // this function still prevents the returned SuperVersion* from being
    // deleted out from under the caller.
    sv->Unref();
  }
  return sv;
}

SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
  // The SuperVersion is cached in thread local storage to avoid acquiring
  // mutex when SuperVersion does not change since the last use. When a new
  // SuperVersion is installed, the compaction or flush thread cleans up
  // cached SuperVersion in all existing thread local storage. To avoid
  // acquiring mutex for this operation, we use atomic Swap() on the thread
  // local pointer to guarantee exclusive access. If the thread local pointer
  // is being used while a new SuperVersion is installed, the cached
  // SuperVersion can become stale. In that case, the background thread would
  // have swapped in kSVObsolete. We re-check the value when returning
  // SuperVersion back to thread local, with an atomic compare and swap.
  // The superversion will need to be released if detected to be stale.
  void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(ptr != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load()) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
    SuperVersion* sv_to_delete = nullptr;

    if (sv && sv->Unref()) {
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
      db->mutex()->Lock();
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
        db->AddSuperVersionsToFreeQueue(sv);
        db->SchedulePurge();
      } else {
        sv_to_delete = sv;
      }
    } else {
      db->mutex()->Lock();
    }
    sv = super_version_->Ref();
    db->mutex()->Unlock();

    delete sv_to_delete;
  }
  assert(sv != nullptr);
  return sv;
}

bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  // Put the SuperVersion back
  void* expected = SuperVersion::kSVInUse;
  if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
    // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
    // storage has not been altered and no Scrape has happened. The
    // SuperVersion is still current.
    return true;
  } else {
    // ThreadLocal scrape happened in the process of this GetImpl call (after
    // thread local Swap() at the beginning and before CompareAndSwap()).
    // This means the SuperVersion it holds is obsolete.
    assert(expected == SuperVersion::kSVObsolete);
  }
  return false;
}
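
// Typical caller pattern for the thread-local SuperVersion functions above (a
// sketch; DBImpl's real read paths wrap this in helpers that also handle
// cleanup):
//   SuperVersion* sv = cfd->GetThreadLocalSuperVersion(db);
//   // ... read from sv->mem, sv->imm, and sv->current ...
//   if (!cfd->ReturnThreadLocalSuperVersion(sv)) {
//     // Stale: drop the reference. If it was the last one, Cleanup() and
//     // deletion must happen under the DB mutex.
//     if (sv->Unref()) {
//       db->mutex()->Lock();
//       sv->Cleanup();
//       db->mutex()->Unlock();
//       delete sv;
//     }
//   }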

void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
}

void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  SuperVersion* new_superversion = sv_context->new_superversion.release();
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(this, mem_, imm_.current(), current_);
  SuperVersion* old_superversion = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;
  super_version_->write_stall_condition =
      RecalculateWriteStallConditions(mutable_cf_options);

  if (old_superversion != nullptr) {
    // Reset SuperVersions cached in thread local storage.
    // This should be done before old_superversion->Unref(). That's to ensure
    // that local_sv_ never holds the last reference to SuperVersion, since
    // it has no means to safely do SuperVersion cleanup.
    ResetThreadLocalSuperVersions();

    if (old_superversion->mutable_cf_options.write_buffer_size !=
        mutable_cf_options.write_buffer_size) {
      mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
    }
    if (old_superversion->write_stall_condition !=
        new_superversion->write_stall_condition) {
      sv_context->PushWriteStallNotification(
          old_superversion->write_stall_condition,
          new_superversion->write_stall_condition, GetName(), ioptions());
    }
    if (old_superversion->Unref()) {
      old_superversion->Cleanup();
      sv_context->superversions_to_free.push_back(old_superversion);
    }
  }
}

void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
  for (auto ptr : sv_ptrs) {
    assert(ptr);
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
    auto sv = static_cast<SuperVersion*>(ptr);
    bool was_last_ref __attribute__((__unused__));
    was_last_ref = sv->Unref();
    // sv couldn't have been the last reference because
    // ResetThreadLocalSuperVersions() is called before
    // unref'ing super_version_.
    assert(!was_last_ref);
  }
}

Status ColumnFamilyData::ValidateOptions(
    const DBOptions& db_options, const ColumnFamilyOptions& cf_options) {
  Status s;
  s = CheckCompressionSupported(cf_options);
  if (s.ok() && db_options.allow_concurrent_memtable_write) {
    s = CheckConcurrentWritesSupported(cf_options);
  }
  if (s.ok()) {
    s = CheckCFPathsSupported(db_options, cf_options);
  }
  if (!s.ok()) {
    return s;
  }

  if (cf_options.ttl > 0 && cf_options.ttl != kDefaultTtl) {
    if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
      return Status::NotSupported(
          "TTL is only supported in Block-Based Table format. ");
    }
  }

1269
  if (cf_options.periodic_compaction_seconds > 0 &&
1270
      cf_options.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {
1271 1272 1273 1274 1275 1276 1277 1278 1279
    if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
      return Status::NotSupported(
          "Periodic Compaction is only supported in "
          "Block-Based Table format. ");
    }
  }
  return s;
}
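
// Hypothetical example of an option combination this rejects: a plain-table
// column family with a non-default TTL (names below are illustrative only).
//
//   ColumnFamilyOptions cf_opts;
//   cf_opts.table_factory.reset(NewPlainTableFactory());
//   cf_opts.ttl = 60 * 60 * 24;  // 1 day
//   // ValidateOptions(db_opts, cf_opts) -> Status::NotSupported(...)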

#ifndef ROCKSDB_LITE
Status ColumnFamilyData::SetOptions(
    const DBOptions& db_options,
    const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions new_mutable_cf_options;
  Status s =
      GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
                                   ioptions_.info_log, &new_mutable_cf_options);
  if (s.ok()) {
    ColumnFamilyOptions cf_options =
        BuildColumnFamilyOptions(initial_cf_options_, new_mutable_cf_options);
    s = ValidateOptions(db_options, cf_options);
  }
  if (s.ok()) {
    mutable_cf_options_ = new_mutable_cf_options;
    mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  }
  return s;
}
#endif  // ROCKSDB_LITE
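
// SetOptions() is normally reached through DB::SetOptions(); a minimal
// sketch (hypothetical handle name):
//
//   db->SetOptions(cf_handle, {{"write_buffer_size", "131072"}});
//
// The string map is parsed against the current MutableCFOptions, validated,
// and only then swapped in under the DB mutex.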

// REQUIRES: DB mutex held
Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
  if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
    return Env::WLTH_NOT_SET;
  }
  if (level == 0) {
    return Env::WLTH_MEDIUM;
  }
  int base_level = current_->storage_info()->base_level();

  // L1: medium, L2: long, ...
  if (level - base_level >= 2) {
    return Env::WLTH_EXTREME;
  } else if (level < base_level) {
    // There is no restriction which prevents level passed in to be smaller
    // than base_level.
    return Env::WLTH_MEDIUM;
  }
  return static_cast<Env::WriteLifeTimeHint>(
      level - base_level + static_cast<int>(Env::WLTH_MEDIUM));
}
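
// Worked example, assuming base_level == 1: L0 and L1 map to WLTH_MEDIUM,
// L2 to WLTH_LONG, and L3 or deeper to WLTH_EXTREME. An Env that honors the
// hint (e.g. the POSIX Env via fcntl(F_SET_RW_HINT) on Linux) can then place
// short-lived upper-level SSTs apart from long-lived bottom-level ones.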

Status ColumnFamilyData::AddDirectories() {
  Status s;
  assert(data_dirs_.empty());
  for (auto& p : ioptions_.cf_paths) {
    std::unique_ptr<Directory> path_directory;
    s = DBImpl::CreateAndNewDirectory(ioptions_.env, p.path, &path_directory);
    if (!s.ok()) {
      return s;
    }
    assert(path_directory != nullptr);
    data_dirs_.emplace_back(path_directory.release());
  }
  assert(data_dirs_.size() == ioptions_.cf_paths.size());
  return s;
}

Directory* ColumnFamilyData::GetDataDir(size_t path_id) const {
  if (data_dirs_.empty()) {
    return nullptr;
  }

  assert(path_id < data_dirs_.size());
  return data_dirs_[path_id].get();
}
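
// path_id is the index into ioptions_.cf_paths recorded in a file's metadata;
// callers use GetDataDir() to locate (and, after file creation, fsync) the
// directory that actually holds a given SST file.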

ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const ImmutableDBOptions* db_options,
                                 const FileOptions& file_options,
                                 Cache* table_cache,
                                 WriteBufferManager* write_buffer_manager,
                                 WriteController* write_controller,
                                 BlockCacheTracer* const block_cache_tracer)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(
          0, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), *db_options,
          file_options, nullptr, block_cache_tracer)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      file_options_(file_options),
      table_cache_(table_cache),
      write_buffer_manager_(write_buffer_manager),
      write_controller_(write_controller),
      block_cache_tracer_(block_cache_tracer) {
  // initialize linked list
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}
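
// Design note: dummy_cfd_ is the sentinel node of a circular doubly-linked
// list of ColumnFamilyData. An empty set is represented by the sentinel
// pointing at itself, so insertion (CreateColumnFamily) and the sweep in
// FreeDeadColumnFamilies need no null checks.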

ColumnFamilySet::~ColumnFamilySet() {
  while (column_family_data_.size() > 0) {
    // cfd destructor will delete itself from column_family_data_
    auto cfd = column_family_data_.begin()->second;
    bool last_ref __attribute__((__unused__));
    last_ref = cfd->UnrefAndTryDelete();
    assert(last_ref);
  }
  bool dummy_last_ref __attribute__((__unused__));
  dummy_last_ref = dummy_cfd_->UnrefAndTryDelete();
  assert(dummy_last_ref);
}

ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  assert(default_cfd_cache_ != nullptr);
  return default_cfd_cache_;
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  auto cfd_iter = column_family_data_.find(id);
  if (cfd_iter != column_family_data_.end()) {
    return cfd_iter->second;
  } else {
    return nullptr;
  }
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  auto cfd_iter = column_families_.find(name);
  if (cfd_iter != column_families_.end()) {
    auto cfd = GetColumnFamily(cfd_iter->second);
    assert(cfd != nullptr);
    return cfd;
  } else {
    return nullptr;
  }
}

uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  return ++max_column_family_;
}

uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }

void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  max_column_family_ = std::max(new_max_column_family, max_column_family_);
}

size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  return column_families_.size();
}

// under a DB mutex AND write thread
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd = new ColumnFamilyData(
      id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
      *db_options_, file_options_, this, block_cache_tracer_);
  column_families_.insert({name, id});
  column_family_data_.insert({id, new_cfd});
  max_column_family_ = std::max(max_column_family_, id);
  // add to linked list
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
  return new_cfd;
}
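
// Note: the new column family is linked in just before the sentinel, i.e. at
// the tail of the circular list, so iterating from dummy_cfd_->next_ visits
// column families in creation order.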

// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

// under a DB mutex AND from a write thread
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
  auto cfd_iter = column_family_data_.find(cfd->GetID());
  assert(cfd_iter != column_family_data_.end());
  column_family_data_.erase(cfd_iter);
  column_families_.erase(cfd->GetName());
}

// under a DB mutex OR from a write thread
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
  handle_.SetCFD(current_);
  return current_ != nullptr;
}
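
// Seek() is driven by WriteBatch replay (MemTableInserter): each batch record
// carries a column family ID, and a Seek() that returns false here is how a
// write against a dropped or unknown column family surfaces as an error.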

uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  return current_->GetLogNumber();
}

MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  return current_->mem();
}

ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  return &handle_;
}

uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  uint32_t column_family_id = 0;
  if (column_family != nullptr) {
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
    column_family_id = cfh->GetID();
  }
  return column_family_id;
}
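
// A null handle deliberately resolves to ID 0, the default column family,
// which is what the DB convenience APIs without an explicit handle rely on.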

const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family != nullptr) {
    return column_family->GetComparator();
  }
  return nullptr;
}

}  // namespace rocksdb