//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/column_family.h"

#include <algorithm>
#include <cinttypes>
#include <limits>
#include <sstream>
#include <string>
#include <vector>

#include "db/compaction/compaction_picker.h"
#include "db/compaction/compaction_picker_fifo.h"
#include "db/compaction/compaction_picker_level.h"
#include "db/compaction/compaction_picker_universal.h"
#include "db/db_impl/db_impl.h"
#include "db/internal_stats.h"
#include "db/job_context.h"
#include "db/range_del_aggregator.h"
#include "db/table_properties_collector.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "file/sst_file_manager_impl.h"
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "rocksdb/table.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
#include "util/cast_util.h"
#include "util/compression.h"

namespace ROCKSDB_NAMESPACE {

ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ != nullptr) {
    cfd_->Ref();
  }
}

ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ != nullptr) {
#ifndef ROCKSDB_LITE
    for (auto& listener : cfd_->ioptions()->listeners) {
      listener->OnColumnFamilyHandleDeletionStarted(this);
    }
#endif  // ROCKSDB_LITE
    // Job id == 0 means that this is not our background process, but rather
    // a user thread.
    // Need to hold some shared pointers owned by the initial_cf_options
    // before the final cleanup finishes.
    ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
    JobContext job_context(0);
    mutex_->Lock();
    bool dropped = cfd_->IsDropped();
    if (cfd_->UnrefAndTryDelete()) {
      if (dropped) {
        db_->FindObsoleteFiles(&job_context, false, true);
      }
    }
    mutex_->Unlock();
    if (job_context.HaveSomethingToDelete()) {
      bool defer_purge =
          db_->immutable_db_options().avoid_unnecessary_blocking_io;
      db_->PurgeObsoleteFiles(job_context, defer_purge);
      if (defer_purge) {
        mutex_->Lock();
        db_->SchedulePurge();
        mutex_->Unlock();
      }
    }
    job_context.Clean();
  }
}

uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }

const std::string& ColumnFamilyHandleImpl::GetName() const {
  return cfd()->GetName();
}

Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
#ifndef ROCKSDB_LITE
  // accessing mutable cf-options requires db mutex.
  InstrumentedMutexLock l(mutex_);
  *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
  return Status::OK();
#else
  (void)desc;
  return Status::NotSupported();
#endif  // !ROCKSDB_LITE
}

const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
  return cfd()->user_comparator();
}

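// Wraps each user-defined table properties collector factory from the CF
// options in a UserKeyTablePropertiesCollectorFactory, so collectors see
// user keys rather than internal keys.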
void GetIntTblPropCollectorFactory(
    const ImmutableCFOptions& ioptions,
    std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
        int_tbl_prop_collector_factories) {
  auto& collector_factories = ioptions.table_properties_collector_factories;
  for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
       ++i) {
    assert(collector_factories[i]);
    int_tbl_prop_collector_factories->emplace_back(
        new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
  }
}

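// Verifies that every compression type referenced by the CF options
// (per-level, default, and blob) is compiled into this binary, and that the
// zstd dictionary trainer settings are usable.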
Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
  if (!cf_options.compression_per_level.empty()) {
    for (size_t level = 0; level < cf_options.compression_per_level.size();
         ++level) {
      if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
        return Status::InvalidArgument(
            "Compression type " +
            CompressionTypeToString(cf_options.compression_per_level[level]) +
            " is not linked with the binary.");
      }
    }
  } else {
    if (!CompressionTypeSupported(cf_options.compression)) {
      return Status::InvalidArgument(
          "Compression type " +
          CompressionTypeToString(cf_options.compression) +
          " is not linked with the binary.");
    }
  }
  if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
    if (!ZSTD_TrainDictionarySupported()) {
      return Status::InvalidArgument(
          "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
          "is not linked with the binary.");
    }
    if (cf_options.compression_opts.max_dict_bytes == 0) {
      return Status::InvalidArgument(
          "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
          "should be nonzero if we're using zstd's dictionary generator.");
    }
  }

  if (!CompressionTypeSupported(cf_options.blob_compression_type)) {
    std::ostringstream oss;
    oss << "The specified blob compression type "
        << CompressionTypeToString(cf_options.blob_compression_type)
        << " is not available.";

    return Status::InvalidArgument(oss.str());
  }

  return Status::OK();
}

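// Verifies that the CF options are compatible with concurrent memtable
// writes: in-place updates are not, and the memtable factory must support
// concurrent inserts.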
Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
  if (cf_options.inplace_update_support) {
    return Status::InvalidArgument(
        "In-place memtable updates (inplace_update_support) is not compatible "
        "with concurrent writes (allow_concurrent_memtable_write)");
  }
171 172 173 174
  if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
    return Status::InvalidArgument(
        "Memtable doesn't concurrent writes (allow_concurrent_memtable_write)");
  }
  return Status::OK();
}

Status CheckCFPathsSupported(const DBOptions& db_options,
                             const ColumnFamilyOptions& cf_options) {
  // More than one cf_paths are supported only in universal
  // and level compaction styles. This function also checks the case
  // in which cf_paths is not specified, which results in db_paths
  // being used.
  if ((cf_options.compaction_style != kCompactionStyleUniversal) &&
      (cf_options.compaction_style != kCompactionStyleLevel)) {
    if (cf_options.cf_paths.size() > 1) {
      return Status::NotSupported(
          "More than one CF paths are only supported in "
          "universal and level compaction styles. ");
    } else if (cf_options.cf_paths.empty() &&
               db_options.db_paths.size() > 1) {
      return Status::NotSupported(
          "More than one DB paths are only supported in "
          "universal and level compaction styles. ");
    }
  }
  return Status::OK();
}

namespace {
const uint64_t kDefaultTtl = 0xfffffffffffffffe;
const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe;
}  // namespace

ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  size_t clamp_max = std::conditional<
      sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
      std::integral_constant<uint64_t, 64ull << 30>>::type::value;
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
  // If the user sets arena_block_size, we trust the user to use this value.
  // Otherwise, calculate a proper value from write_buffer_size.
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 8;

    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }

  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }

  if (result.compaction_style == kCompactionStyleUniversal &&
      db_options.allow_ingest_behind && result.num_levels < 3) {
    result.num_levels = 3;
  }

  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  // Fall back to max_write_buffer_number_to_maintain if
  // max_write_buffer_size_to_maintain is not set.
  if (result.max_write_buffer_size_to_maintain < 0) {
    result.max_write_buffer_size_to_maintain =
        result.max_write_buffer_number *
        static_cast<int64_t>(result.write_buffer_size);
  } else if (result.max_write_buffer_size_to_maintain == 0 &&
             result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  // bloom filter size shouldn't exceed 1/4 of memtable size.
  if (result.memtable_prefix_bloom_size_ratio > 0.25) {
    result.memtable_prefix_bloom_size_ratio = 0.25;
  } else if (result.memtable_prefix_bloom_size_ratio < 0) {
    result.memtable_prefix_bloom_size_ratio = 0;
  }

  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // Since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything.
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  if (result.max_bytes_for_level_multiplier <= 0) {
    result.max_bytes_for_level_multiplier = 1;
  }

  if (result.level0_file_num_compaction_trigger == 0) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "level0_file_num_compaction_trigger cannot be 0");
    result.level0_file_num_compaction_trigger = 1;
  }

  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "This condition must be satisfied: "
                   "level0_stop_writes_trigger(%d) >= "
                   "level0_slowdown_writes_trigger(%d) >= "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "Adjust the value to "
                   "level0_stop_writes_trigger(%d)"
                   "level0_slowdown_writes_trigger(%d)"
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
  }

  if (result.soft_pending_compaction_bytes_limit == 0) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
             result.soft_pending_compaction_bytes_limit >
                 result.hard_pending_compaction_bytes_limit) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  }

#ifndef ROCKSDB_LITE
  // When the DB is stopped, it's possible that there are some .trash files
  // that were not deleted yet. When we open the DB we will find these .trash
  // files and schedule them to be deleted (or delete them immediately if
  // SstFileManager was not used).
  auto sfm =
      static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
  for (size_t i = 0; i < result.cf_paths.size(); i++) {
    DeleteScheduler::CleanupDirectory(db_options.env, sfm,
                                      result.cf_paths[i].path);
  }
#endif

  if (result.cf_paths.empty()) {
    result.cf_paths = db_options.db_paths;
  }

  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel ||
        result.cf_paths.size() > 1U) {
      // 1. level_compaction_dynamic_level_bytes only makes sense for
      //    level-based compaction.
      // 2. we don't yet know how to make this feature and multiple
      //    DB paths work together.
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

  if (result.max_compaction_bytes == 0) {
    result.max_compaction_bytes = result.target_file_size_base * 25;
  }

  bool is_block_based_table = (result.table_factory->IsInstanceOf(
      TableFactory::kBlockBasedTableName()));

  const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60;
  if (result.ttl == kDefaultTtl) {
    if (is_block_based_table &&
        result.compaction_style != kCompactionStyleFIFO) {
      result.ttl = kAdjustedTtl;
    } else {
      result.ttl = 0;
    }
  }

  const uint64_t kAdjustedPeriodicCompSecs = 30 * 24 * 60 * 60;

  // Turn on periodic compactions and set them to occur once every 30 days if
  // compaction filters are used and periodic_compaction_seconds is set to the
  // default value.
  if (result.compaction_style != kCompactionStyleFIFO) {
    if ((result.compaction_filter != nullptr ||
         result.compaction_filter_factory != nullptr) &&
        result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
        is_block_based_table) {
      result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
    }
  } else {
    // result.compaction_style == kCompactionStyleFIFO
    if (result.ttl == 0) {
      if (is_block_based_table) {
        if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
          result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
        }
        result.ttl = result.periodic_compaction_seconds;
      }
    } else if (result.periodic_compaction_seconds != 0) {
      result.ttl = std::min(result.ttl, result.periodic_compaction_seconds);
    }
  }

  // TTL compactions work similarly to periodic compactions in universal
  // compaction in most cases. So, if ttl is set, execute the periodic
  // compaction codepath.
  if (result.compaction_style == kCompactionStyleUniversal && result.ttl != 0) {
    if (result.periodic_compaction_seconds != 0) {
      result.periodic_compaction_seconds =
          std::min(result.ttl, result.periodic_compaction_seconds);
    } else {
      result.periodic_compaction_seconds = result.ttl;
    }
  }

  if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
    result.periodic_compaction_seconds = 0;
  }

  return result;
}

int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;

SuperVersion::~SuperVersion() {
  for (auto td : to_delete) {
    delete td;
  }
}

SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
  uint32_t previous_refs = refs.fetch_sub(1);
  assert(previous_refs > 0);
  return previous_refs == 1;
}

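// Drops the references held by this SuperVersion (memtables, immutable
// memtable list, current version, and cfd). Called after Unref() returned
// true; memtables that reached their last reference are stashed in
// to_delete to be freed by the destructor.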
void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
    assert(*memory_usage >= m->ApproximateMemoryUsage());
    *memory_usage -= m->ApproximateMemoryUsage();
    to_delete.push_back(m);
  }
  current->Unref();
  if (cfd->Unref()) {
    delete cfd;
  }
}

void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
                        MemTableListVersion* new_imm, Version* new_current) {
  cfd = new_cfd;
  mem = new_mem;
  imm = new_imm;
  current = new_current;
  cfd->Ref();
  mem->Ref();
  imm->Ref();
  current->Ref();
  refs.store(1, std::memory_order_relaxed);
}

namespace {
void SuperVersionUnrefHandle(void* ptr) {
  // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
  // destroyed. When the former happens, the thread shouldn't see kSVInUse.
  // When the latter happens, we are in ~ColumnFamilyData(), so no Get()
  // should happen either.
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  bool was_last_ref __attribute__((__unused__));
  was_last_ref = sv->Unref();
  // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_.
  // This is important because we can't do SuperVersion cleanup here.
  // That would require locking DB mutex, which would deadlock because
  // SuperVersionUnrefHandle is called with locked ThreadLocalPtr mutex.
  assert(!was_last_ref);
}
}  // anonymous namespace

std::vector<std::string> ColumnFamilyData::GetDbPaths() const {
  std::vector<std::string> paths;
  paths.reserve(ioptions_.cf_paths.size());
  for (const DbPath& db_path : ioptions_.cf_paths) {
    paths.emplace_back(db_path.path);
  }
  return paths;
}

const uint32_t ColumnFamilyData::kDummyColumnFamilyDataId = port::kMaxUint32;

ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBufferManager* write_buffer_manager,
    const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
    const FileOptions& file_options, ColumnFamilySet* column_family_set,
    BlockCacheTracer* const block_cache_tracer,
    const std::shared_ptr<IOTracer>& io_tracer)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      initialized_(false),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      initial_cf_options_(SanitizeOptions(db_options, cf_options)),
      ioptions_(db_options, initial_cf_options_),
      mutable_cf_options_(initial_cf_options_),
      is_delete_range_supported_(
          cf_options.table_factory->IsDeleteRangeSupported()),
      write_buffer_manager_(write_buffer_manager),
      mem_(nullptr),
      imm_(ioptions_.min_write_buffer_number_to_merge,
           ioptions_.max_write_buffer_number_to_maintain,
           ioptions_.max_write_buffer_size_to_maintain),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      flush_reason_(FlushReason::kOthers),
      column_family_set_(column_family_set),
      queued_for_flush_(false),
      queued_for_compaction_(false),
      prev_compaction_needed_bytes_(0),
      allow_2pc_(db_options.allow_2pc),
      last_memtable_id_(0),
      db_paths_registered_(false) {
  if (id_ != kDummyColumnFamilyDataId) {
    // TODO(cc): RegisterDbPaths can be expensive; consider moving it
    // outside of this constructor, which might be called with the db mutex
    // held.
    // TODO(cc): consider using ioptions_.fs; currently some tests rely on
    // EnvWrapper, which is the main reason why we use env here.
    Status s = ioptions_.env->RegisterDbPaths(GetDbPaths());
    if (s.ok()) {
      db_paths_registered_ = true;
    } else {
      ROCKS_LOG_ERROR(
          ioptions_.info_log,
          "Failed to register data paths of column family (id: %d, name: %s)",
          id_, name_.c_str());
    }
  }
  Ref();

  // Convert user-defined table properties collector factories to internal ones.
  GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);

  // if _dummy_versions is nullptr, then this is a dummy column family.
  if (_dummy_versions != nullptr) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options.env, this));
    table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
                                      block_cache_tracer, io_tracer));
    if (ioptions_.compaction_style == kCompactionStyleLevel) {
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
#ifndef ROCKSDB_LITE
    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(
          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
      compaction_picker_.reset(
          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
      compaction_picker_.reset(new NullCompactionPicker(
          ioptions_, &internal_comparator_));
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "Column family %s does not use any background compaction. "
                     "Compactions can only be done via CompactFiles\n",
                     GetName().c_str());
#endif  // !ROCKSDB_LITE
    } else {
      ROCKS_LOG_ERROR(ioptions_.info_log,
                      "Unable to recognize the specified compaction style %d. "
                      "Column family %s will use kCompactionStyleLevel.\n",
                      ioptions_.compaction_style, GetName().c_str());
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
    }

    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      ROCKS_LOG_INFO(ioptions_.info_log,
                     "--------------- Options for column family [%s]:\n",
                     name.c_str());
      initial_cf_options_.Dump(ioptions_.info_log);
    } else {
      ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}

// DB mutex held
ColumnFamilyData::~ColumnFamilyData() {
  assert(refs_.load(std::memory_order_relaxed) == 0);
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from the column family set.
    // If column_family_set_ == nullptr, this is a dummy CFD and not in the
    // ColumnFamilySet.
    column_family_set_->RemoveColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

  // It would be wrong to destroy this ColumnFamilyData while it is still in
  // flush_queue_ or compaction_queue_.
  assert(!queued_for_flush_);
  assert(!queued_for_compaction_);
  assert(super_version_ == nullptr);

  if (dummy_versions_ != nullptr) {
    // List must be empty
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((__unused__));
    deleted = dummy_versions_->Unref();
    assert(deleted);
  }

  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  autovector<MemTable*> to_delete;
  imm_.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }

  if (db_paths_registered_) {
    // TODO(cc): consider using ioptions_.fs; currently some tests rely on
    // EnvWrapper, which is the main reason why we use env here.
    Status s = ioptions_.env->UnregisterDbPaths(GetDbPaths());
    if (!s.ok()) {
      ROCKS_LOG_ERROR(
          ioptions_.info_log,
          "Failed to unregister data paths of column family (id: %d, name: %s)",
          id_, name_.c_str());
    }
  }
}

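// Drops one reference. Deletes this ColumnFamilyData and returns true if
// that was the last reference; also handles the case where only
// super_version_ still holds a reference, releasing it so the CFD can be
// freed.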
bool ColumnFamilyData::UnrefAndTryDelete() {
  int old_refs = refs_.fetch_sub(1);
  assert(old_refs > 0);

  if (old_refs == 1) {
    assert(super_version_ == nullptr);
    delete this;
    return true;
  }

  if (old_refs == 2 && super_version_ != nullptr) {
    // Only the super_version_ holds me
    SuperVersion* sv = super_version_;
    super_version_ = nullptr;
    // Release SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since unref handler can lock mutex.
    sv->db_mutex->Unlock();
    local_sv_.reset();
    sv->db_mutex->Lock();

    if (sv->Unref()) {
      // May delete this ColumnFamilyData after calling Cleanup()
      sv->Cleanup();
      delete sv;
      return true;
    }
  }
  return false;
}

void ColumnFamilyData::SetDropped() {
  // can't drop default CF
  assert(id_ != 0);
  dropped_ = true;
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}

ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
  return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
}

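// Returns the oldest log that must be kept. This is normally the CF's
// current log number; with two-phase commit (allow_2pc), it is lowered to
// the minimum log still containing a prepared section.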
uint64_t ColumnFamilyData::OldestLogToKeep() {
  auto current_log = GetLogNumber();

  if (allow_2pc_) {
    autovector<MemTable*> empty_list;
    auto imm_prep_log =
        imm()->PrecomputeMinLogContainingPrepSection(empty_list);
    auto mem_prep_log = mem()->GetMinLogContainingPrepSection();

    if (imm_prep_log > 0 && imm_prep_log < current_log) {
      current_log = imm_prep_log;
    }

    if (mem_prep_log > 0 && mem_prep_log < current_log) {
      current_log = mem_prep_log;
    }
  }

  return current_log;
}

const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
const double kNearStopSlowdownRatio = 0.6;
const double kDelayRecoverSlowdownRatio = 1.4;

namespace {
// If penalize_stop is true, we further reduce the slowdown rate.
std::unique_ptr<WriteControllerToken> SetupDelay(
    WriteController* write_controller, uint64_t compaction_needed_bytes,
    uint64_t prev_compaction_need_bytes, bool penalize_stop,
    bool auto_compactions_disabled) {
  const uint64_t kMinWriteRate = 16 * 1024u;  // Minimum write rate 16KB/s.

  uint64_t max_write_rate = write_controller->max_delayed_write_rate();
  uint64_t write_rate = write_controller->delayed_write_rate();

  if (auto_compactions_disabled) {
    // When auto compaction is disabled, always use the value the user gave.
    write_rate = max_write_rate;
  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
    // If the user gives a rate less than kMinWriteRate, don't adjust it.
    //
    // If already delayed, we need to adjust based on previous compaction debt.
    // When two or more column families require delay, we always increase or
    // reduce the write rate based on information for one single column
    // family. It is likely to be OK but we can improve if there is a problem.
    // Ignore the compaction_needed_bytes = 0 case because
    // compaction_needed_bytes is only available in level-based compaction.
    //
    // If the compaction debt stays the same as previously, we also further
    // slow down. It usually means a memtable is full. It's mainly for the
    // case where both flush and compaction are much slower than the rate at
    // which we insert into memtables, so we need to actively slow down before
    // we get a feedback signal from compactions and flushes, to avoid a full
    // stop caused by hitting the max write buffer number.
    //
    // If the DB just fell into the stop condition, we need to further reduce
    // the write rate to avoid the stop condition.
    if (penalize_stop) {
      // Penalize the near stop or stop condition by more aggressive slowdown.
      // This is to provide the long term slowdown increase signal.
      // The penalty is more than the reward of recovering to the normal
      // condition.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kNearStopSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > 0 &&
               prev_compaction_need_bytes <= compaction_needed_bytes) {
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kIncSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
      // We speed up by a ratio of kDecSlowdownRatio when we have paid down
      // compaction debt, but we'll never speed up to faster than the write
      // rate given by the user.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kDecSlowdownRatio);
      if (write_rate > max_write_rate) {
        write_rate = max_write_rate;
      }
    }
  }
  return write_controller->GetDelayToken(write_rate);
}

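// Returns the L0 file count at which compaction should be sped up; see the
// comments below for how the threshold is derived.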
int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
                                    int level0_slowdown_writes_trigger) {
  // SanitizeOptions() ensures it.
  assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);

  if (level0_file_num_compaction_trigger < 0) {
    return std::numeric_limits<int>::max();
  }

  const int64_t twice_level0_trigger =
      static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;

  const int64_t one_fourth_trigger_slowdown =
      static_cast<int64_t>(level0_file_num_compaction_trigger) +
      ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
       4);

  assert(twice_level0_trigger >= 0);
  assert(one_fourth_trigger_slowdown >= 0);

  // 1/4 of the way between the L0 compaction trigger threshold and the
  // slowdown condition, or twice the compaction trigger, whichever is
  // smaller.
  int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
  if (res >= port::kMaxInt32) {
    return port::kMaxInt32;
  } else {
    // res fits in int
    return static_cast<int>(res);
  }
}
}  // namespace

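// Maps the current number of unflushed memtables, L0 files, and estimated
// pending compaction bytes to a write stall condition (stopped, delayed, or
// normal) together with its cause.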
std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
ColumnFamilyData::GetWriteStallConditionAndCause(
    int num_unflushed_memtables, int num_l0_files,
    uint64_t num_compaction_needed_bytes,
    const MutableCFOptions& mutable_cf_options) {
  if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
    return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
    return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.hard_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kStopped,
            WriteStallCause::kPendingCompactionBytes};
  } else if (mutable_cf_options.max_write_buffer_number > 3 &&
             num_unflushed_memtables >=
                 mutable_cf_options.max_write_buffer_number - 1) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
             num_l0_files >=
                 mutable_cf_options.level0_slowdown_writes_trigger) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kDelayed,
            WriteStallCause::kPendingCompactionBytes};
  }
  return {WriteStallCondition::kNormal, WriteStallCause::kNone};
}

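// Re-evaluates the write stall state from the current LSM shape and
// installs the matching stop, delay, or compaction pressure token in the
// WriteController. Returns the new write stall condition.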
WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
      const MutableCFOptions& mutable_cf_options) {
  auto write_stall_condition = WriteStallCondition::kNormal;
  if (current_ != nullptr) {
    auto* vstorage = current_->storage_info();
    auto write_controller = column_family_set_->write_controller_;
    uint64_t compaction_needed_bytes =
        vstorage->estimated_compaction_needed_bytes();

    auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
        imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
        vstorage->estimated_compaction_needed_bytes(), mutable_cf_options);
    write_stall_condition = write_stall_condition_and_cause.first;
    auto write_stall_cause = write_stall_condition_and_cause.second;

    bool was_stopped = write_controller->IsStopped();
    bool needed_delay = write_controller->NeedsDelay();

    if (write_stall_condition == WriteStallCondition::kStopped &&
        write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stopping writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d",
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number);
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stopping writes because we have %d level-0 files",
                     name_.c_str(), vstorage->l0_delay_trigger_count());
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stopping writes because of estimated pending compaction "
          "bytes %" PRIu64,
          name_.c_str(), compaction_needed_bytes);
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stalling writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d "
          "rate %" PRIu64,
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number,
          write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      // L0 is the last two files from stopping.
      bool near_stop = vstorage->l0_delay_trigger_count() >=
                       mutable_cf_options.level0_stop_writes_trigger - 2;
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
                                  1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stalling writes because we have %d level-0 files "
                     "rate %" PRIu64,
                     name_.c_str(), vstorage->l0_delay_trigger_count(),
                     write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      // If the distance to the hard limit is less than 1/4 of the gap between
      // the soft and hard bytes limits, we think it is near the stop condition
      // and speed up the slowdown.
      bool near_stop =
          mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
          (compaction_needed_bytes -
           mutable_cf_options.soft_pending_compaction_bytes_limit) >
              3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
                   mutable_cf_options.soft_pending_compaction_bytes_limit) /
                  4;

      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stalling writes because of estimated pending compaction "
          "bytes %" PRIu64 " rate %" PRIu64,
          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
          write_controller->delayed_write_rate());
    } else {
      assert(write_stall_condition == WriteStallCondition::kNormal);
      if (vstorage->l0_delay_trigger_count() >=
          GetL0ThresholdSpeedupCompaction(
              mutable_cf_options.level0_file_num_compaction_trigger,
              mutable_cf_options.level0_slowdown_writes_trigger)) {
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        ROCKS_LOG_INFO(
            ioptions_.info_log,
            "[%s] Increasing compaction threads because we have %d level-0 "
            "files ",
            name_.c_str(), vstorage->l0_delay_trigger_count());
      } else if (vstorage->estimated_compaction_needed_bytes() >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
        // Increase compaction threads if bytes needed for compaction exceeds
        // 1/4 of threshold for slowing down.
        // If soft pending compaction byte limit is not set, always speed up
        // compaction.
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
          ROCKS_LOG_INFO(
              ioptions_.info_log,
              "[%s] Increasing compaction threads because of estimated pending "
              "compaction "
              "bytes %" PRIu64,
              name_.c_str(), vstorage->estimated_compaction_needed_bytes());
        }
      } else {
        write_controller_token_.reset();
      }
      // If the DB recovers from delay conditions, we reward with reducing
      // double the slowdown ratio. This is to balance the long term slowdown
      // increase signal.
      if (needed_delay) {
        uint64_t write_rate = write_controller->delayed_write_rate();
        write_controller->set_delayed_write_rate(static_cast<uint64_t>(
            static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
        // Set the low-pri limit to be 1/4 of the delayed write rate.
        // Note that we don't reset this value even after the delay condition
        // is released. The low-pri rate will continue to apply if there is
        // compaction pressure.
        write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
                                                                    4);
      }
    }
    prev_compaction_needed_bytes_ = compaction_needed_bytes;
  }
  return write_stall_condition;
}

const FileOptions* ColumnFamilyData::soptions() const {
  return &(column_family_set_->file_options_);
}

void ColumnFamilyData::SetCurrent(Version* current_version) {
  current_ = current_version;
}

uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  return VersionSet::GetNumLiveVersions(dummy_versions_);
}

uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
}

uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
  return current_->GetSstFilesSize();
}

MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
                      write_buffer_manager_, earliest_seq, id_);
}

void ColumnFamilyData::CreateNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
  mem_->Ref();
}

bool ColumnFamilyData::NeedsCompaction() const {
  return !mutable_cf_options_.disable_auto_compactions &&
         compaction_picker_->NeedsCompaction(current_->storage_info());
}

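// Asks the compaction picker for a new compaction. If one is picked, the
// current version is pinned as its input version.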
Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options,
    const MutableDBOptions& mutable_db_options, LogBuffer* log_buffer) {
  SequenceNumber earliest_mem_seqno =
      std::min(mem_->GetEarliestSequenceNumber(),
               imm_.current()->GetEarliestSequenceNumber(false));
  auto* result = compaction_picker_->PickCompaction(
      GetName(), mutable_options, mutable_db_options, current_->storage_info(),
      log_buffer, earliest_mem_seqno);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

bool ColumnFamilyData::RangeOverlapWithCompaction(
    const Slice& smallest_user_key, const Slice& largest_user_key,
    int level) const {
  return compaction_picker_->RangeOverlapWithCompaction(
      smallest_user_key, largest_user_key, level);
}

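// Checks whether any of `ranges` overlaps keys in the mutable or immutable
// memtables of `super_version`, taking range tombstones into account.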
Status ColumnFamilyData::RangesOverlapWithMemtables(
    const autovector<Range>& ranges, SuperVersion* super_version,
    bool* overlap) {
  assert(overlap != nullptr);
  *overlap = false;
  // Create an InternalIterator over all unflushed memtables
  Arena arena;
  ReadOptions read_opts;
  read_opts.total_order_seek = true;
  MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
  merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_opts, &arena));
  super_version->imm->AddIterators(read_opts, &merge_iter_builder);
  ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());

  auto read_seq = super_version->current->version_set()->LastSequence();
  ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
  auto* active_range_del_iter =
      super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
  range_del_agg.AddTombstones(
      std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
  super_version->imm->AddRangeTombstoneIterators(read_opts, nullptr /* arena */,
                                                 &range_del_agg);

  Status status;
  for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
    auto* vstorage = super_version->current->storage_info();
    auto* ucmp = vstorage->InternalComparator()->user_comparator();
    InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
                            kValueTypeForSeek);
    memtable_iter->Seek(range_start.Encode());
    status = memtable_iter->status();
    ParsedInternalKey seek_result;
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          !ParseInternalKey(memtable_iter->key(), &seek_result)) {
        status = Status::Corruption("DB has corrupted keys");
      }
    }
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
        *overlap = true;
      } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
                                                 ranges[i].limit)) {
        *overlap = true;
      }
    }
  }
  return status;
}

const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;

Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options,
    const MutableDBOptions& mutable_db_options, int input_level,
    int output_level, const CompactRangeOptions& compact_range_options,
    const InternalKey* begin, const InternalKey* end,
    InternalKey** compaction_end, bool* conflict,
    uint64_t max_file_num_to_ignore) {
  auto* result = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, mutable_db_options,
      current_->storage_info(), input_level, output_level,
      compact_range_options, begin, end, compaction_end, conflict,
      max_file_num_to_ignore);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db);
  sv->Ref();
  if (!ReturnThreadLocalSuperVersion(sv)) {
    // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
    // when the thread-local pointer was populated. So, the Ref() earlier in
    // this function still prevents the returned SuperVersion* from being
    // deleted out from under the caller.
    sv->Unref();
  }
  return sv;
}

SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
  // The SuperVersion is cached in thread local storage to avoid acquiring
  // mutex when SuperVersion does not change since the last use. When a new
  // SuperVersion is installed, the compaction or flush thread cleans up
  // cached SuperVersion in all existing thread local storage. To avoid
  // acquiring mutex for this operation, we use atomic Swap() on the thread
  // local pointer to guarantee exclusive access. If the thread local pointer
  // is being used while a new SuperVersion is installed, the cached
  // SuperVersion can become stale. In that case, the background thread would
  // have swapped in kSVObsolete. We re-check the value when returning the
  // SuperVersion back to thread local, with an atomic compare and swap.
  // The superversion will need to be released if detected to be stale.
  void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(ptr != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load()) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
    SuperVersion* sv_to_delete = nullptr;

    if (sv && sv->Unref()) {
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
      db->mutex()->Lock();
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
        db->AddSuperVersionsToFreeQueue(sv);
        db->SchedulePurge();
      } else {
        sv_to_delete = sv;
      }
    } else {
      db->mutex()->Lock();
    }
    sv = super_version_->Ref();
    db->mutex()->Unlock();

    delete sv_to_delete;
  }
  assert(sv != nullptr);
  return sv;
}

bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  // Put the SuperVersion back
  void* expected = SuperVersion::kSVInUse;
  if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
    // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
    // storage has not been altered and no Scrape has happened. The
    // SuperVersion is still current.
    return true;
  } else {
    // ThreadLocal scrape happened in the process of this GetImpl call (after
    // thread local Swap() at the beginning and before CompareAndSwap()).
    // This means the SuperVersion it holds is obsolete.
    assert(expected == SuperVersion::kSVObsolete);
  }
  return false;
}

void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
}

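// Installs the new SuperVersion from sv_context as the current one,
// invalidates SuperVersions cached in thread-local storage, and queues the
// old SuperVersion for cleanup. REQUIRES: DB mutex held.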
void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  SuperVersion* new_superversion = sv_context->new_superversion.release();
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(this, mem_, imm_.current(), current_);
  SuperVersion* old_superversion = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;
  super_version_->write_stall_condition =
      RecalculateWriteStallConditions(mutable_cf_options);

  if (old_superversion != nullptr) {
    // Reset SuperVersions cached in thread local storage.
    // This should be done before old_superversion->Unref(). That's to ensure
    // that local_sv_ never holds the last reference to SuperVersion, since
    // it has no means to safely do SuperVersion cleanup.
    ResetThreadLocalSuperVersions();

    if (old_superversion->mutable_cf_options.write_buffer_size !=
        mutable_cf_options.write_buffer_size) {
      mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
    }
1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278
    if (old_superversion->write_stall_condition !=
        new_superversion->write_stall_condition) {
      sv_context->PushWriteStallNotification(
          old_superversion->write_stall_condition,
          new_superversion->write_stall_condition, GetName(), ioptions());
    }
    if (old_superversion->Unref()) {
      old_superversion->Cleanup();
      sv_context->superversions_to_free.push_back(old_superversion);
    }
  }
I
Igor Canadi 已提交
1279 1280
}
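
// Illustrative call site for InstallSuperVersion (a sketch under
// assumptions; real callers live in DBImpl, e.g. after a flush, compaction
// or options change):
//
//   SuperVersionContext sv_context(/*create_superversion=*/true);
//   db_mutex->Lock();
//   cfd->InstallSuperVersion(&sv_context, db_mutex);
//   db_mutex->Unlock();
//   sv_context.Clean();  // free superversions_to_free outside the mutex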

void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
  for (auto ptr : sv_ptrs) {
    assert(ptr);
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
    auto sv = static_cast<SuperVersion*>(ptr);
    bool was_last_ref __attribute__((__unused__));
    was_last_ref = sv->Unref();
    // sv couldn't have been the last reference because
    // ResetThreadLocalSuperVersions() is called before
    // unref'ing super_version_.
    assert(!was_last_ref);
  }
}

Status ColumnFamilyData::ValidateOptions(
    const DBOptions& db_options, const ColumnFamilyOptions& cf_options) {
  Status s;
  s = CheckCompressionSupported(cf_options);
  if (s.ok() && db_options.allow_concurrent_memtable_write) {
    s = CheckConcurrentWritesSupported(cf_options);
  }
  if (s.ok() && db_options.unordered_write &&
      cf_options.max_successive_merges != 0) {
    s = Status::InvalidArgument(
        "max_successive_merges > 0 is incompatible with unordered_write");
  }
  if (s.ok()) {
    s = CheckCFPathsSupported(db_options, cf_options);
  }
  if (!s.ok()) {
    return s;
  }

  if (cf_options.ttl > 0 && cf_options.ttl != kDefaultTtl) {
    if (!cf_options.table_factory->IsInstanceOf(
            TableFactory::kBlockBasedTableName())) {
      return Status::NotSupported(
          "TTL is only supported in Block-Based Table format.");
    }
  }

  if (cf_options.periodic_compaction_seconds > 0 &&
      cf_options.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {
    if (!cf_options.table_factory->IsInstanceOf(
            TableFactory::kBlockBasedTableName())) {
      return Status::NotSupported(
          "Periodic Compaction is only supported in "
          "Block-Based Table format.");
    }
  }
  return s;
}
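
// Example of a combination rejected above (a sketch; the values shown are
// chosen purely for illustration):
//
//   DBOptions db_opts;
//   db_opts.unordered_write = true;
//   ColumnFamilyOptions cf_opts;
//   cf_opts.max_successive_merges = 8;  // > 0 conflicts with unordered_write
//   Status s = ColumnFamilyData::ValidateOptions(db_opts, cf_opts);
//   assert(s.IsInvalidArgument());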

#ifndef ROCKSDB_LITE
Status ColumnFamilyData::SetOptions(
    const DBOptions& db_options,
    const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions new_mutable_cf_options;
  Status s =
      GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
                                   ioptions_.info_log, &new_mutable_cf_options);
  if (s.ok()) {
    ColumnFamilyOptions cf_options =
        BuildColumnFamilyOptions(initial_cf_options_, new_mutable_cf_options);
    s = ValidateOptions(db_options, cf_options);
  }
  if (s.ok()) {
    mutable_cf_options_ = new_mutable_cf_options;
    mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  }
  return s;
}
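
// Typical entry point (sketch): DB::SetOptions() forwards a map of option
// name/value strings that ends up here under the DB mutex, e.g.:
//
//   db->SetOptions(handle, {{"write_buffer_size", "131072"},
//                           {"max_successive_merges", "0"}});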
#endif  // ROCKSDB_LITE

// REQUIRES: DB mutex held
Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
  if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
    return Env::WLTH_NOT_SET;
  }
  if (level == 0) {
    return Env::WLTH_MEDIUM;
  }
  int base_level = current_->storage_info()->base_level();

  // L1: medium, L2: long, ...
  if (level - base_level >= 2) {
    return Env::WLTH_EXTREME;
  } else if (level < base_level) {
    // There is no restriction which prevents level passed in to be smaller
    // than base_level.
    return Env::WLTH_MEDIUM;
  }
  return static_cast<Env::WriteLifeTimeHint>(
      level - base_level + static_cast<int>(Env::WLTH_MEDIUM));
}
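
// Worked example for CalculateSSTWriteHint (assuming base_level == 1):
//   L0 -> WLTH_MEDIUM, L1 -> WLTH_MEDIUM, L2 -> WLTH_LONG,
//   L3 and deeper -> WLTH_EXTREME.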

Status ColumnFamilyData::AddDirectories(
    std::map<std::string, std::shared_ptr<FSDirectory>>* created_dirs) {
  Status s;
  assert(created_dirs != nullptr);
  assert(data_dirs_.empty());
  for (auto& p : ioptions_.cf_paths) {
    auto existing_dir = created_dirs->find(p.path);

    if (existing_dir == created_dirs->end()) {
      std::unique_ptr<FSDirectory> path_directory;
      s = DBImpl::CreateAndNewDirectory(ioptions_.fs, p.path, &path_directory);
      if (!s.ok()) {
        return s;
      }
      assert(path_directory != nullptr);
      data_dirs_.emplace_back(path_directory.release());
      (*created_dirs)[p.path] = data_dirs_.back();
    } else {
      data_dirs_.emplace_back(existing_dir->second);
    }
  }
  assert(data_dirs_.size() == ioptions_.cf_paths.size());
  return s;
}
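
// Note: created_dirs is shared across column families during DB open, so
// two column families configured with the same cf_path reuse a single
// FSDirectory instead of opening the same directory twice.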

FSDirectory* ColumnFamilyData::GetDataDir(size_t path_id) const {
  if (data_dirs_.empty()) {
    return nullptr;
  }

  assert(path_id < data_dirs_.size());
  return data_dirs_[path_id].get();
}

ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const ImmutableDBOptions* db_options,
                                 const FileOptions& file_options,
                                 Cache* table_cache,
                                 WriteBufferManager* _write_buffer_manager,
                                 WriteController* _write_controller,
                                 BlockCacheTracer* const block_cache_tracer,
                                 const std::shared_ptr<IOTracer>& io_tracer)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(
          ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr,
          nullptr, ColumnFamilyOptions(), *db_options, file_options, nullptr,
          block_cache_tracer, io_tracer)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      file_options_(file_options),
      table_cache_(table_cache),
      write_buffer_manager_(_write_buffer_manager),
      write_controller_(_write_controller),
      block_cache_tracer_(block_cache_tracer),
      io_tracer_(io_tracer) {
  // initialize linked list
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}

ColumnFamilySet::~ColumnFamilySet() {
  while (column_family_data_.size() > 0) {
    // cfd destructor will delete itself from column_family_data_
    auto cfd = column_family_data_.begin()->second;
    bool last_ref __attribute__((__unused__));
    last_ref = cfd->UnrefAndTryDelete();
    assert(last_ref);
  }
  bool dummy_last_ref __attribute__((__unused__));
  dummy_last_ref = dummy_cfd_->UnrefAndTryDelete();
  assert(dummy_last_ref);
}

ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  assert(default_cfd_cache_ != nullptr);
  return default_cfd_cache_;
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  auto cfd_iter = column_family_data_.find(id);
  if (cfd_iter != column_family_data_.end()) {
    return cfd_iter->second;
  } else {
    return nullptr;
  }
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  auto cfd_iter = column_families_.find(name);
  if (cfd_iter != column_families_.end()) {
    auto cfd = GetColumnFamily(cfd_iter->second);
    assert(cfd != nullptr);
    return cfd;
  } else {
    return nullptr;
  }
}

uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  return ++max_column_family_;
}

uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }

void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  max_column_family_ = std::max(new_max_column_family, max_column_family_);
}

size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  return column_families_.size();
}

// under a DB mutex AND from a write thread
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd = new ColumnFamilyData(
      id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
      *db_options_, file_options_, this, block_cache_tracer_, io_tracer_);
  column_families_.insert({name, id});
  column_family_data_.insert({id, new_cfd});
  max_column_family_ = std::max(max_column_family_, id);
  // add to linked list
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
  return new_cfd;
}
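
// Sketch of the circular list maintained above: a new column family is
// linked in just before dummy_cfd_, so iterating from dummy_cfd_->next_
// visits column families in creation order:
//   dummy_cfd_ <-> cfd_1 <-> ... <-> cfd_n <-> new_cfd <-> dummy_cfd_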

// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

// under a DB mutex AND from a write thread
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
  auto cfd_iter = column_family_data_.find(cfd->GetID());
  assert(cfd_iter != column_family_data_.end());
  column_family_data_.erase(cfd_iter);
  column_families_.erase(cfd->GetName());
}

// under a DB mutex OR from a write thread
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
  handle_.SetCFD(current_);
  return current_ != nullptr;
}

uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  return current_->GetLogNumber();
}

MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  return current_->mem();
}

ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  return &handle_;
}

uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  uint32_t column_family_id = 0;
  if (column_family != nullptr) {
    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
    column_family_id = cfh->GetID();
  }
  return column_family_id;
}

const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family != nullptr) {
    return column_family->GetComparator();
  }
  return nullptr;
}

}  // namespace ROCKSDB_NAMESPACE