//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
I
Igor Canadi 已提交
10
#include "db/column_family.h"
11 12

#include <algorithm>
13
#include <cinttypes>
I
Igor Canadi 已提交
14
#include <limits>
15
#include <sstream>
16 17
#include <string>
#include <vector>
18

19
#include "db/blob/blob_file_cache.h"
20 21 22 23 24
#include "db/compaction/compaction_picker.h"
#include "db/compaction/compaction_picker_fifo.h"
#include "db/compaction/compaction_picker_level.h"
#include "db/compaction/compaction_picker_universal.h"
#include "db/db_impl/db_impl.h"
25
#include "db/internal_stats.h"
26
#include "db/job_context.h"
27
#include "db/range_del_aggregator.h"
28
#include "db/table_properties_collector.h"
I
Igor Canadi 已提交
29
#include "db/version_set.h"
30
#include "db/write_controller.h"
31
#include "file/sst_file_manager_impl.h"
32
#include "logging/logging.h"
33 34
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
35
#include "port/port.h"
36
#include "rocksdb/convenience.h"
37
#include "rocksdb/table.h"
38
#include "table/merging_iterator.h"
39
#include "util/autovector.h"
40
#include "util/cast_util.h"
41
#include "util/compression.h"
I
Igor Canadi 已提交
42

43
namespace ROCKSDB_NAMESPACE {
I
Igor Canadi 已提交
44

I
Igor Canadi 已提交
45
// Handle wrapping a ColumnFamilyData for external callers. A non-null cfd
// receives an extra reference here, released in the destructor.
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_) {
    cfd_->Ref();
  }
}

// Releases the reference taken in the constructor. If that was the last
// reference to a dropped column family, obsolete files are discovered and
// purged here, on the user's thread.
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ != nullptr) {
#ifndef ROCKSDB_LITE
    // Notify listeners before any teardown starts.
    for (auto& listener : cfd_->ioptions()->listeners) {
      listener->OnColumnFamilyHandleDeletionStarted(this);
    }
#endif  // ROCKSDB_LITE
    // Job id == 0 means that this is not our background process, but rather
    // user thread
    // Need to hold some shared pointers owned by the initial_cf_options
    // before final cleaning up finishes.
    ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
    JobContext job_context(0);
    // DB mutex protects the unref/delete and the obsolete-file scan.
    mutex_->Lock();
    bool dropped = cfd_->IsDropped();
    if (cfd_->UnrefAndTryDelete()) {
      if (dropped) {
        db_->FindObsoleteFiles(&job_context, false, true);
      }
    }
    mutex_->Unlock();
    // Purge outside the mutex; purge may be deferred to a background thread
    // when avoid_unnecessary_blocking_io is set.
    if (job_context.HaveSomethingToDelete()) {
      bool defer_purge =
          db_->immutable_db_options().avoid_unnecessary_blocking_io;
      db_->PurgeObsoleteFiles(job_context, defer_purge);
    }
    job_context.Clean();
  }
}

83 84
// Forwards to the underlying ColumnFamilyData's numeric id.
uint32_t ColumnFamilyHandleImpl::GetID() const {
  return cfd()->GetID();
}

85 86 87 88
// Forwards to the underlying ColumnFamilyData's name.
const std::string& ColumnFamilyHandleImpl::GetName() const {
  const std::string& cf_name = cfd()->GetName();
  return cf_name;
}

89 90 91 92
// Fills *desc with the column family's name and its latest options snapshot.
// Not supported in LITE builds.
Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
#ifndef ROCKSDB_LITE
  // accessing mutable cf-options requires db mutex.
  InstrumentedMutexLock l(mutex_);
  *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
  return Status::OK();
#else
  (void)desc;  // silence unused-parameter warning in LITE builds
  return Status::NotSupported();
#endif  // !ROCKSDB_LITE
}

101
// Exposes the user-supplied key comparator of this column family.
const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
  const Comparator* user_cmp = cfd()->user_comparator();
  return user_cmp;
}

105
void GetIntTblPropCollectorFactory(
106
    const ImmutableCFOptions& ioptions,
107 108 109
    IntTblPropCollectorFactories* int_tbl_prop_collector_factories) {
  assert(int_tbl_prop_collector_factories);

110 111
  auto& collector_factories = ioptions.table_properties_collector_factories;
  for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
112 113 114 115 116 117 118
       ++i) {
    assert(collector_factories[i]);
    int_tbl_prop_collector_factories->emplace_back(
        new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
  }
}

119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
// Validates that every compression type the column family may use —
// per-level list (or the single default), zstd dictionary options, and the
// blob compression type — is actually compiled into this binary.
Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
  if (cf_options.compression_per_level.empty()) {
    // No per-level override: only the default type needs to be available.
    if (!CompressionTypeSupported(cf_options.compression)) {
      return Status::InvalidArgument(
          "Compression type " +
          CompressionTypeToString(cf_options.compression) +
          " is not linked with the binary.");
    }
  } else {
    for (const auto level_type : cf_options.compression_per_level) {
      if (!CompressionTypeSupported(level_type)) {
        return Status::InvalidArgument(
            "Compression type " + CompressionTypeToString(level_type) +
            " is not linked with the binary.");
      }
    }
  }

  // Dictionary training requires both ZDICT support in the linked ZSTD and a
  // nonzero dictionary size cap.
  if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
    if (!ZSTD_TrainDictionarySupported()) {
      return Status::InvalidArgument(
          "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
          "is not linked with the binary.");
    }
    if (cf_options.compression_opts.max_dict_bytes == 0) {
      return Status::InvalidArgument(
          "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
          "should be nonzero if we're using zstd's dictionary generator.");
    }
  }

  if (!CompressionTypeSupported(cf_options.blob_compression_type)) {
    std::ostringstream oss;
    oss << "The specified blob compression type "
        << CompressionTypeToString(cf_options.blob_compression_type)
        << " is not available.";
    return Status::InvalidArgument(oss.str());
  }

  return Status::OK();
}

163 164 165 166 167 168
// Validates that the column family's options are compatible with concurrent
// memtable writes (allow_concurrent_memtable_write). Returns InvalidArgument
// when in-place updates are enabled or the memtable implementation does not
// support concurrent inserts.
Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
  if (cf_options.inplace_update_support) {
    return Status::InvalidArgument(
        "In-place memtable updates (inplace_update_support) is not compatible "
        "with concurrent writes (allow_concurrent_memtable_write)");
  }
  if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
    // Fixed ungrammatical message ("doesn't concurrent writes").
    return Status::InvalidArgument(
        "Memtable doesn't support concurrent writes "
        "(allow_concurrent_memtable_write)");
  }
  return Status::OK();
}

176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
// More than one cf_paths entry is supported only by the universal and level
// compaction styles. When cf_paths is empty the db_paths setting is used
// instead, so multiple db_paths are rejected under the same condition.
Status CheckCFPathsSupported(const DBOptions& db_options,
                             const ColumnFamilyOptions& cf_options) {
  const bool multi_path_capable =
      (cf_options.compaction_style == kCompactionStyleUniversal) ||
      (cf_options.compaction_style == kCompactionStyleLevel);
  if (multi_path_capable) {
    return Status::OK();
  }
  if (cf_options.cf_paths.size() > 1) {
    return Status::NotSupported(
        "More than one CF paths are only supported in "
        "universal and level compaction styles. ");
  }
  if (cf_options.cf_paths.empty() && db_options.db_paths.size() > 1) {
    return Status::NotSupported(
        "More than one DB paths are only supported in "
        "universal and level compaction styles. ");
  }
  return Status::OK();
}

198 199 200
namespace {
// Sentinel values meaning "the user did not set this option";
// SanitizeOptions() below replaces them with concrete defaults.
const uint64_t kDefaultTtl = 0xfffffffffffffffe;
const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe;
}  // namespace
202

203
// Produces a copy of `src` with mutually-inconsistent or out-of-range option
// values clamped/derived so the rest of the engine can rely on them. The
// adjustments below are order-dependent: later clamps read values fixed up by
// earlier ones (e.g. cf_paths is filled from db_paths before the
// level_compaction_dynamic_level_bytes check inspects it).
ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  // Cap write_buffer_size at 4GB-1 on 32-bit platforms, 64GB on 64-bit.
  size_t clamp_max = std::conditional<
      sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
      std::integral_constant<uint64_t, 64ull << 30>>::type::value;
  ClipToRange(&result.write_buffer_size, (static_cast<size_t>(64)) << 10,
              clamp_max);
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // calculate a proper value from write_buffer_size;
  if (result.arena_block_size <= 0) {
    result.arena_block_size =
        std::min(size_t{1024 * 1024}, result.write_buffer_size / 8);

    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
  }
  // Merging can never involve more memtables than can exist minus the one
  // being written to, and must involve at least one.
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }

  // Level compaction needs at least L0 + one target level; ingest-behind
  // under universal compaction reserves an extra bottom level.
  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }

  if (result.compaction_style == kCompactionStyleUniversal &&
      db_options.allow_ingest_behind && result.num_levels < 3) {
    result.num_levels = 3;
  }

  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  // fall back max_write_buffer_number_to_maintain if
  // max_write_buffer_size_to_maintain is not set
  if (result.max_write_buffer_size_to_maintain < 0) {
    result.max_write_buffer_size_to_maintain =
        result.max_write_buffer_number *
        static_cast<int64_t>(result.write_buffer_size);
  } else if (result.max_write_buffer_size_to_maintain == 0 &&
             result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  // bloom filter size shouldn't exceed 1/4 of memtable size.
  if (result.memtable_prefix_bloom_size_ratio > 0.25) {
    result.memtable_prefix_bloom_size_ratio = 0.25;
  } else if (result.memtable_prefix_bloom_size_ratio < 0) {
    result.memtable_prefix_bloom_size_ratio = 0;
  }

  // Hash-based memtables require a prefix extractor; without one, fall back
  // to the default skip-list memtable.
  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  if (result.max_bytes_for_level_multiplier <= 0) {
    result.max_bytes_for_level_multiplier = 1;
  }

  if (result.level0_file_num_compaction_trigger == 0) {
    ROCKS_LOG_WARN(db_options.logger,
                   "level0_file_num_compaction_trigger cannot be 0");
    result.level0_file_num_compaction_trigger = 1;
  }

  // Enforce stop >= slowdown >= compaction trigger by raising the lower
  // thresholds; warn so users see both the violation and the final values.
  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    ROCKS_LOG_WARN(db_options.logger,
                   "This condition must be satisfied: "
                   "level0_stop_writes_trigger(%d) >= "
                   "level0_slowdown_writes_trigger(%d) >= "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    ROCKS_LOG_WARN(db_options.logger,
                   "Adjust the value to "
                   "level0_stop_writes_trigger(%d)"
                   "level0_slowdown_writes_trigger(%d)"
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
  }

  // Keep the soft pending-compaction limit at or below the hard limit.
  if (result.soft_pending_compaction_bytes_limit == 0) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
             result.soft_pending_compaction_bytes_limit >
                 result.hard_pending_compaction_bytes_limit) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  }

#ifndef ROCKSDB_LITE
  // When the DB is stopped, it's possible that there are some .trash files that
  // were not deleted yet, when we open the DB we will find these .trash files
  // and schedule them to be deleted (or delete immediately if SstFileManager
  // was not used)
  auto sfm = static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
  for (size_t i = 0; i < result.cf_paths.size(); i++) {
    DeleteScheduler::CleanupDirectory(db_options.env, sfm,
                                      result.cf_paths[i].path)
        .PermitUncheckedError();
  }
#endif

  if (result.cf_paths.empty()) {
    result.cf_paths = db_options.db_paths;
  }

  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel) {
      ROCKS_LOG_WARN(db_options.info_log.get(),
                     "level_compaction_dynamic_level_bytes only makes sense"
                     "for level-based compaction");
      result.level_compaction_dynamic_level_bytes = false;
    } else if (result.cf_paths.size() > 1U) {
      // we don't yet know how to make both of this feature and multiple
      // DB path work.
      ROCKS_LOG_WARN(db_options.info_log.get(),
                     "multiple cf_paths/db_paths and"
                     "level_compaction_dynamic_level_bytes"
                     "can't be used together");
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

  if (result.max_compaction_bytes == 0) {
    result.max_compaction_bytes = result.target_file_size_base * 25;
  }

  bool is_block_based_table = (result.table_factory->IsInstanceOf(
      TableFactory::kBlockBasedTableName()));

  // Default TTL: 30 days for block-based tables under non-FIFO compaction,
  // otherwise disabled.
  const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60;
  if (result.ttl == kDefaultTtl) {
    if (is_block_based_table &&
        result.compaction_style != kCompactionStyleFIFO) {
      result.ttl = kAdjustedTtl;
    } else {
      result.ttl = 0;
    }
  }

  const uint64_t kAdjustedPeriodicCompSecs = 30 * 24 * 60 * 60;

  // Turn on periodic compactions and set them to occur once every 30 days if
  // compaction filters are used and periodic_compaction_seconds is set to the
  // default value.
  if (result.compaction_style != kCompactionStyleFIFO) {
    if ((result.compaction_filter != nullptr ||
         result.compaction_filter_factory != nullptr) &&
        result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
        is_block_based_table) {
      result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
    }
  } else {
    // result.compaction_style == kCompactionStyleFIFO
    if (result.ttl == 0) {
      if (is_block_based_table) {
        if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
          result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
        }
        // FIFO reuses the periodic-compaction interval as its TTL.
        result.ttl = result.periodic_compaction_seconds;
      }
    } else if (result.periodic_compaction_seconds != 0) {
      result.ttl = std::min(result.ttl, result.periodic_compaction_seconds);
    }
  }

  // TTL compactions would work similar to Periodic Compactions in Universal in
  // most of the cases. So, if ttl is set, execute the periodic compaction
  // codepath.
  if (result.compaction_style == kCompactionStyleUniversal && result.ttl != 0) {
    if (result.periodic_compaction_seconds != 0) {
      result.periodic_compaction_seconds =
          std::min(result.ttl, result.periodic_compaction_seconds);
    } else {
      result.periodic_compaction_seconds = result.ttl;
    }
  }

  // Any remaining sentinel means periodic compaction stays disabled.
  if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
    result.periodic_compaction_seconds = 0;
  }

  return result;
}

426 427 428
// Sentinel pointers stored in the per-thread SuperVersion cache (local_sv_).
// kSVInUse marks a slot whose cached SuperVersion a thread has checked out;
// kSVObsolete marks an empty/invalidated slot. `dummy` only provides a unique
// address for kSVInUse.
int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;
429

430 431 432 433 434 435 436 437 438 439 440 441 442
// Frees the memtables whose ownership was handed to this SuperVersion
// (collected in to_delete during Cleanup()).
SuperVersion::~SuperVersion() {
  for (auto* memtable : to_delete) {
    delete memtable;
  }
}

// Bumps the reference count and returns this SuperVersion for chaining.
SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
443
  uint32_t previous_refs = refs.fetch_sub(1);
444 445
  assert(previous_refs > 0);
  return previous_refs == 1;
446 447
}

448
// Releases everything this SuperVersion pinned: the immutable memtable list
// version, the mutable memtable, the current Version, and the owning CFD.
// Must only run once the refcount has reached zero.
void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  // Since this SuperVersion object is being deleted,
  // decrement reference to the immutable MemtableList
  // this SV object was pointing to.
  imm->Unref(&to_delete);
  // mem->Unref() returns the memtable itself when ours was the last
  // reference; it is then queued for deletion in the destructor.
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    // Keep the CFD's immutable-memtable memory accounting in sync.
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
    assert(*memory_usage >= m->ApproximateMemoryUsage());
    *memory_usage -= m->ApproximateMemoryUsage();
    to_delete.push_back(m);
  }
  current->Unref();
  // May delete the ColumnFamilyData if this was its final reference.
  cfd->UnrefAndTryDelete();
}

465 466 467
// Points this SuperVersion at the given CFD/memtable/imm-list/version, taking
// a reference on each, and initializes its own refcount to 1.
void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
                        MemTableListVersion* new_imm, Version* new_current) {
  cfd = new_cfd;
  mem = new_mem;
  imm = new_imm;
  current = new_current;
  cfd->Ref();
  mem->Ref();
  imm->Ref();
  current->Ref();
  refs.store(1, std::memory_order_relaxed);
}

478 479
namespace {
// ThreadLocalPtr destructor hook for per-thread cached SuperVersions: drops
// the thread's reference without performing cleanup (see assert below).
void SuperVersionUnrefHandle(void* ptr) {
  // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
  // destroyed. When the former happens, the thread shouldn't see kSVInUse.
  // When the latter happens, only super_version_ holds a reference
  // to ColumnFamilyData, so no further queries are possible.
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  bool was_last_ref __attribute__((__unused__));
  was_last_ref = sv->Unref();
  // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_.
  // This is important because we can't do SuperVersion cleanup here.
  // That would require locking DB mutex, which would deadlock because
  // SuperVersionUnrefHandle is called with locked ThreadLocalPtr mutex.
  assert(!was_last_ref);
}
}  // anonymous namespace

495 496 497 498 499 500 501 502 503 504 505
// Collects this column family's configured cf_paths into a plain list of
// directory names (used for Env path registration below).
std::vector<std::string> ColumnFamilyData::GetDbPaths() const {
  std::vector<std::string> paths;
  paths.reserve(ioptions_.cf_paths.size());
  for (const DbPath& db_path : ioptions_.cf_paths) {
    paths.emplace_back(db_path.path);
  }
  return paths;
}

// Reserved id for dummy ColumnFamilyData instances; the constructor skips
// data-path registration for this id.
const uint32_t ColumnFamilyData::kDummyColumnFamilyDataId = port::kMaxUint32;

506 507
// Constructs all per-column-family state. A null `_dummy_versions` marks a
// dummy column family, for which stats, caches, and the compaction picker are
// left unset. NOTE: initializer order matters — ioptions_ and
// mutable_cf_options_ depend on initial_cf_options_ having been sanitized
// first.
ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBufferManager* write_buffer_manager,
    const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
    const FileOptions* file_options, ColumnFamilySet* column_family_set,
    BlockCacheTracer* const block_cache_tracer,
    const std::shared_ptr<IOTracer>& io_tracer,
    const std::string& db_session_id)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      initialized_(false),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      initial_cf_options_(SanitizeOptions(db_options, cf_options)),
      ioptions_(db_options, initial_cf_options_),
      mutable_cf_options_(initial_cf_options_),
      is_delete_range_supported_(
          cf_options.table_factory->IsDeleteRangeSupported()),
      write_buffer_manager_(write_buffer_manager),
      mem_(nullptr),
      imm_(ioptions_.min_write_buffer_number_to_merge,
           ioptions_.max_write_buffer_number_to_maintain,
           ioptions_.max_write_buffer_size_to_maintain),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      flush_reason_(FlushReason::kOthers),
      column_family_set_(column_family_set),
      queued_for_flush_(false),
      queued_for_compaction_(false),
      prev_compaction_needed_bytes_(0),
      allow_2pc_(db_options.allow_2pc),
      last_memtable_id_(0),
      db_paths_registered_(false) {
  // Register data paths with the Env, except for the dummy CFD. Failure is
  // logged but not fatal.
  if (id_ != kDummyColumnFamilyDataId) {
    // TODO(cc): RegisterDbPaths can be expensive, considering moving it
    // outside of this constructor which might be called with db mutex held.
    // TODO(cc): considering using ioptions_.fs, currently some tests rely on
    // EnvWrapper, that's the main reason why we use env here.
    Status s = ioptions_.env->RegisterDbPaths(GetDbPaths());
    if (s.ok()) {
      db_paths_registered_ = true;
    } else {
      ROCKS_LOG_ERROR(
          ioptions_.logger,
          "Failed to register data paths of column family (id: %d, name: %s)",
          id_, name_.c_str());
    }
  }
  Ref();

  // Convert user defined table properties collector factories to internal ones.
  GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);

  // if _dummy_versions is nullptr, then this is a dummy column family.
  if (_dummy_versions != nullptr) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, ioptions_.clock, this));
    table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
                                      block_cache_tracer, io_tracer,
                                      db_session_id));
    blob_file_cache_.reset(
        new BlobFileCache(_table_cache, ioptions(), soptions(), id_,
                          internal_stats_->GetBlobFileReadHist(), io_tracer));

    // Select the compaction picker for the configured style; unknown styles
    // fall back to level compaction with an error log.
    if (ioptions_.compaction_style == kCompactionStyleLevel) {
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
#ifndef ROCKSDB_LITE
    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(
          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
      compaction_picker_.reset(
          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
      compaction_picker_.reset(new NullCompactionPicker(
          ioptions_, &internal_comparator_));
      ROCKS_LOG_WARN(ioptions_.logger,
                     "Column family %s does not use any background compaction. "
                     "Compactions can only be done via CompactFiles\n",
                     GetName().c_str());
#endif  // !ROCKSDB_LITE
    } else {
      ROCKS_LOG_ERROR(ioptions_.logger,
                      "Unable to recognize the specified compaction style %d. "
                      "Column family %s will use kCompactionStyleLevel.\n",
                      ioptions_.compaction_style, GetName().c_str());
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
    }

    // Dump full options only while the DB has few column families, to keep
    // the log readable.
    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      ROCKS_LOG_INFO(ioptions_.logger,
                     "--------------- Options for column family [%s]:\n",
                     name.c_str());
      initial_cf_options_.Dump(ioptions_.logger);
    } else {
      ROCKS_LOG_INFO(ioptions_.logger, "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}
I
Igor Canadi 已提交
616

617
// DB mutex held
// Tears down per-CF state: unlinks from the CFD list, removes itself from the
// ColumnFamilySet (unless dropped or dummy), releases versions and memtables,
// and unregisters data paths. Requires all references to be gone already.
ColumnFamilyData::~ColumnFamilyData() {
  assert(refs_.load(std::memory_order_relaxed) == 0);
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from column family set
    // If column_family_set_ == nullptr, this is dummy CFD and not in
    // ColumnFamilySet
    column_family_set_->RemoveColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

  // It would be wrong if this ColumnFamilyData is in flush_queue_ or
  // compaction_queue_ and we destroyed it
  assert(!queued_for_flush_);
  assert(!queued_for_compaction_);
  assert(super_version_ == nullptr);

  if (dummy_versions_ != nullptr) {
    // List must be empty
    assert(dummy_versions_->Next() == dummy_versions_);
    bool deleted __attribute__((__unused__));
    deleted = dummy_versions_->Unref();
    assert(deleted);
  }

  // Free the mutable memtable (if last ref) and any remaining immutable ones.
  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  autovector<MemTable*> to_delete;
  imm_.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }

  if (db_paths_registered_) {
    // TODO(cc): considering using ioptions_.fs, currently some tests rely on
    // EnvWrapper, that's the main reason why we use env here.
    Status s = ioptions_.env->UnregisterDbPaths(GetDbPaths());
    if (!s.ok()) {
      ROCKS_LOG_ERROR(
          ioptions_.logger,
          "Failed to unregister data paths of column family (id: %d, name: %s)",
          id_, name_.c_str());
    }
  }
}

673
// Drops one reference to this CFD and deletes it if possible. Returns true
// iff `this` was deleted. Special case: when only the installed
// super_version_ still holds a reference, detach and unref it so the whole
// chain (SuperVersion -> CFD) can be torn down.
bool ColumnFamilyData::UnrefAndTryDelete() {
  int old_refs = refs_.fetch_sub(1);
  assert(old_refs > 0);

  if (old_refs == 1) {
    assert(super_version_ == nullptr);
    delete this;
    return true;
  }

  if (old_refs == 2 && super_version_ != nullptr) {
    // Only the super_version_ holds me
    SuperVersion* sv = super_version_;
    super_version_ = nullptr;

    // Release SuperVersion references kept in ThreadLocalPtr.
    local_sv_.reset();

    if (sv->Unref()) {
      // Note: sv will delete this ColumnFamilyData during Cleanup()
      assert(sv->cfd == this);
      sv->Cleanup();
      delete sv;
      return true;
    }
  }
  return false;
}

I
Igor Canadi 已提交
702 703 704 705 706 707 708 709 710 711
// Marks this column family as dropped and removes it from the
// ColumnFamilySet. The default column family (id 0) may never be dropped.
void ColumnFamilyData::SetDropped() {
  // can't drop default CF
  assert(id_ != 0);
  dropped_ = true;
  // Release our write-controller token so this CF no longer contributes to
  // write throttling.
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}

712
// Builds a full options snapshot by layering the current mutable options on
// top of the options this column family was opened with.
ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
  ColumnFamilyOptions latest =
      BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
  return latest;
}

716 717 718 719
uint64_t ColumnFamilyData::OldestLogToKeep() {
  auto current_log = GetLogNumber();

  if (allow_2pc_) {
720
    auto imm_prep_log = imm()->PrecomputeMinLogContainingPrepSection();
721 722 723 724 725 726 727 728 729 730 731 732 733 734
    auto mem_prep_log = mem()->GetMinLogContainingPrepSection();

    if (imm_prep_log > 0 && imm_prep_log < current_log) {
      current_log = imm_prep_log;
    }

    if (mem_prep_log > 0 && mem_prep_log < current_log) {
      current_log = mem_prep_log;
    }
  }

  return current_log;
}

S
Siying Dong 已提交
735 736 737 738
// Ratios applied to the delayed write rate when adjusting write throttling
// (used by SetupDelay below). Values below 1 slow writes further; values
// above 1 let the rate recover. NOTE(review): exact usage is in SetupDelay,
// whose body continues beyond this excerpt — confirm there.
const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
const double kNearStopSlowdownRatio = 0.6;
const double kDelayRecoverSlowdownRatio = 1.4;
739 740

namespace {
S
Siying Dong 已提交
741
// If penalize_stop is true, we further reduce slowdown rate.
742
std::unique_ptr<WriteControllerToken> SetupDelay(
S
Siying Dong 已提交
743 744
    WriteController* write_controller, uint64_t compaction_needed_bytes,
    uint64_t prev_compaction_need_bytes, bool penalize_stop,
745
    bool auto_comapctions_disabled) {
S
Siying Dong 已提交
746
  const uint64_t kMinWriteRate = 16 * 1024u;  // Minimum write rate 16KB/s.
747

748
  uint64_t max_write_rate = write_controller->max_delayed_write_rate();
749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770
  uint64_t write_rate = write_controller->delayed_write_rate();

  if (auto_comapctions_disabled) {
    // When auto compaction is disabled, always use the value user gave.
    write_rate = max_write_rate;
  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
    // If user gives rate less than kMinWriteRate, don't adjust it.
    //
    // If already delayed, need to adjust based on previous compaction debt.
    // When there are two or more column families require delay, we always
    // increase or reduce write rate based on information for one single
    // column family. It is likely to be OK but we can improve if there is a
    // problem.
    // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes
    // is only available in level-based compaction
    //
    // If the compaction debt stays the same as previously, we also further slow
    // down. It usually means a mem table is full. It's mainly for the case
    // where both of flush and compaction are much slower than the speed we
    // insert to mem tables, so we need to actively slow down before we get
    // feedback signal from compaction and flushes to avoid the full stop
    // because of hitting the max write buffer number.
S
Siying Dong 已提交
771 772 773 774
    //
    // If DB just falled into the stop condition, we need to further reduce
    // the write rate to avoid the stop condition.
    if (penalize_stop) {
H
hyunwoo 已提交
775
      // Penalize the near stop or stop condition by more aggressive slowdown.
S
Siying Dong 已提交
776 777 778 779 780
      // This is to provide the long term slowdown increase signal.
      // The penalty is more than the reward of recovering to the normal
      // condition.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kNearStopSlowdownRatio);
781 782 783
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
S
Siying Dong 已提交
784 785 786 787 788 789 790 791
    } else if (prev_compaction_need_bytes > 0 &&
               prev_compaction_need_bytes <= compaction_needed_bytes) {
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kIncSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
792 793 794
      // We are speeding up by ratio of kSlowdownRatio when we have paid
      // compaction debt. But we'll never speed up to faster than the write rate
      // given by users.
S
Siying Dong 已提交
795
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
S
Siying Dong 已提交
796
                                         kDecSlowdownRatio);
797 798 799 800 801 802 803
      if (write_rate > max_write_rate) {
        write_rate = max_write_rate;
      }
    }
  }
  return write_controller->GetDelayToken(write_rate);
}
804 805 806 807 808 809

// Computes the L0 file count at which compaction should be proactively
// sped up (via a compaction pressure token), before the write-slowdown
// trigger is reached. The threshold is 1/4 of the way between the L0
// compaction trigger and the slowdown trigger, capped at twice the
// compaction trigger. Returns INT_MAX when the trigger is negative
// (i.e. the feature is effectively disabled).
int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
                                    int level0_slowdown_writes_trigger) {
  // SanitizeOptions() ensures it.
  assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);

  if (level0_file_num_compaction_trigger < 0) {
    return std::numeric_limits<int>::max();
  }

  // Do the arithmetic in 64 bits to avoid int overflow for large triggers.
  const int64_t twice_level0_trigger =
      static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;

  const int64_t one_fourth_trigger_slowdown =
      static_cast<int64_t>(level0_file_num_compaction_trigger) +
      ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
       4);

  assert(twice_level0_trigger >= 0);
  assert(one_fourth_trigger_slowdown >= 0);

  // 1/4 of the way between L0 compaction trigger threshold and slowdown
  // condition.
  // Or twice as compaction trigger, if it is smaller.
  int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
  if (res >= std::numeric_limits<int>::max()) {
    return std::numeric_limits<int>::max();
  } else {
    // res fits in int
    return static_cast<int>(res);
  }
}
836 837
}  // namespace

838 839 840 841
// Maps raw per-CF pressure metrics to a (stall condition, cause) pair.
// Checks are ordered by severity: all stop conditions are evaluated before
// any delay condition, so the most severe applicable state wins.
std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
ColumnFamilyData::GetWriteStallConditionAndCause(
    int num_unflushed_memtables, int num_l0_files,
    uint64_t num_compaction_needed_bytes,
    const MutableCFOptions& mutable_cf_options,
    const ImmutableCFOptions& immutable_cf_options) {
  // Stop: all write buffers are full.
  if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
    return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
    // Stop: too many L0 files.
    return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.hard_pending_compaction_bytes_limit) {
    // Stop: compaction debt over the hard limit.
    return {WriteStallCondition::kStopped,
            WriteStallCause::kPendingCompactionBytes};
  } else if (mutable_cf_options.max_write_buffer_number > 3 &&
             num_unflushed_memtables >=
                 mutable_cf_options.max_write_buffer_number - 1 &&
             num_unflushed_memtables - 1 >=
                 immutable_cf_options.min_write_buffer_number_to_merge) {
    // Delay: one memtable away from the memtable-count stop condition.
    return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
             num_l0_files >=
                 mutable_cf_options.level0_slowdown_writes_trigger) {
    // Delay: L0 file count over the slowdown trigger.
    return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit) {
    // Delay: compaction debt over the soft limit.
    return {WriteStallCondition::kDelayed,
            WriteStallCause::kPendingCompactionBytes};
  }
  return {WriteStallCondition::kNormal, WriteStallCause::kNone};
}

876
// Re-evaluates this CF's write-stall state from the current Version's
// statistics and installs the matching write controller token (stop token,
// delay token, compaction pressure token, or none). Also updates internal
// stall counters, logs the transition, and remembers the current compaction
// debt for the next evaluation. Returns the new stall condition.
// NOTE(review): callers appear to invoke this with the DB mutex held (see
// InstallSuperVersion) — confirm before calling from elsewhere.
WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
      const MutableCFOptions& mutable_cf_options) {
  auto write_stall_condition = WriteStallCondition::kNormal;
  if (current_ != nullptr) {
    auto* vstorage = current_->storage_info();
    auto write_controller = column_family_set_->write_controller_;
    uint64_t compaction_needed_bytes =
        vstorage->estimated_compaction_needed_bytes();

    auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
        imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
        vstorage->estimated_compaction_needed_bytes(), mutable_cf_options,
        *ioptions());
    write_stall_condition = write_stall_condition_and_cause.first;
    auto write_stall_cause = write_stall_condition_and_cause.second;

    // Snapshot the controller state before we replace our token below.
    bool was_stopped = write_controller->IsStopped();
    bool needed_delay = write_controller->NeedsDelay();

    if (write_stall_condition == WriteStallCondition::kStopped &&
        write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.logger,
          "[%s] Stopping writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d",
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number);
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        // Track separately the stops that happen while an L0 compaction is
        // already running (i.e. the stop is not for lack of scheduling).
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.logger,
                     "[%s] Stopping writes because we have %d level-0 files",
                     name_.c_str(), vstorage->l0_delay_trigger_count());
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.logger,
          "[%s] Stopping writes because of estimated pending compaction "
          "bytes %" PRIu64,
          name_.c_str(), compaction_needed_bytes);
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.logger,
          "[%s] Stalling writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d "
          "rate %" PRIu64,
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number,
          write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      // L0 is the last two files from stopping.
      bool near_stop = vstorage->l0_delay_trigger_count() >=
                       mutable_cf_options.level0_stop_writes_trigger - 2;
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
                                  1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.logger,
                     "[%s] Stalling writes because we have %d level-0 files "
                     "rate %" PRIu64,
                     name_.c_str(), vstorage->l0_delay_trigger_count(),
                     write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      // If the distance to hard limit is less than 1/4 of the gap between
      // soft and hard bytes limit, we think it is near stop and speed up
      // the slowdown.
      bool near_stop =
          mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
          (compaction_needed_bytes -
           mutable_cf_options.soft_pending_compaction_bytes_limit) >
              3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
                   mutable_cf_options.soft_pending_compaction_bytes_limit) /
                  4;

      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.logger,
          "[%s] Stalling writes because of estimated pending compaction "
          "bytes %" PRIu64 " rate %" PRIu64,
          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
          write_controller->delayed_write_rate());
    } else {
      assert(write_stall_condition == WriteStallCondition::kNormal);
      // Not stalled; decide whether to request extra compaction pressure.
      if (vstorage->l0_delay_trigger_count() >=
          GetL0ThresholdSpeedupCompaction(
              mutable_cf_options.level0_file_num_compaction_trigger,
              mutable_cf_options.level0_slowdown_writes_trigger)) {
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        ROCKS_LOG_INFO(
            ioptions_.logger,
            "[%s] Increasing compaction threads because we have %d level-0 "
            "files ",
            name_.c_str(), vstorage->l0_delay_trigger_count());
      } else if (vstorage->estimated_compaction_needed_bytes() >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
        // Increase compaction threads if bytes needed for compaction exceeds
        // 1/4 of threshold for slowing down.
        // If soft pending compaction byte limit is not set, always speed up
        // compaction.
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
          ROCKS_LOG_INFO(
              ioptions_.logger,
              "[%s] Increasing compaction threads because of estimated pending "
              "compaction "
              "bytes %" PRIu64,
              name_.c_str(), vstorage->estimated_compaction_needed_bytes());
        }
      } else {
        write_controller_token_.reset();
      }
      // If the DB recovers from delay conditions, we reward with reducing
      // double the slowdown ratio. This is to balance the long term slowdown
      // increase signal.
      if (needed_delay) {
        uint64_t write_rate = write_controller->delayed_write_rate();
        write_controller->set_delayed_write_rate(static_cast<uint64_t>(
            static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
        // Set the low pri limit to be 1/4 the delayed write rate.
        // Note we don't reset this value even after delay condition is
        // released. Low-pri rate will continue to apply if there is a
        // compaction pressure.
        write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
                                                                    4);
      }
    }
    // Remember the debt so the next call can tell whether it is shrinking.
    prev_compaction_needed_bytes_ = compaction_needed_bytes;
  }
  return write_stall_condition;
}

1038 1039
// Returns the FileOptions shared by all column families in the set.
const FileOptions* ColumnFamilyData::soptions() const {
  return &(column_family_set_->file_options_);
}

I
Igor Canadi 已提交
1042 1043 1044
// Installs the given Version as this CF's current version.
void ColumnFamilyData::SetCurrent(Version* current_version) {
  current_ = current_version;
}
1045

1046 1047 1048 1049
// Number of live Versions in this CF's version list.
uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  return VersionSet::GetNumLiveVersions(dummy_versions_);
}

1050 1051 1052 1053
// Total size of all SST files referenced by any live Version of this CF.
uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
}

1054 1055 1056 1057
// Total size of all blob files referenced by any live Version of this CF.
uint64_t ColumnFamilyData::GetTotalBlobFileSize() const {
  return VersionSet::GetTotalBlobFileSize(dummy_versions_);
}

1058 1059 1060 1061
// Total size of the SST files in the current Version only.
uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
  return current_->GetSstFilesSize();
}

1062
// Allocates a fresh MemTable for this CF without installing it. The caller
// owns the returned pointer (see CreateNewMemtable for install-and-ref).
MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
                      write_buffer_manager_, earliest_seq, id_);
}

// Replaces the active memtable with a newly constructed one: unrefs (and
// possibly deletes) the old memtable, installs the new one, and takes a
// reference on it.
void ColumnFamilyData::CreateNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  if (mem_ != nullptr) {
    // Unref() returns the memtable pointer when the count hits zero,
    // otherwise nullptr; `delete nullptr` is a no-op.
    delete mem_->Unref();
  }
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
  mem_->Ref();
}

1077
// True iff auto compaction is enabled and the picker finds work to do on
// the current version.
bool ColumnFamilyData::NeedsCompaction() const {
  return !mutable_cf_options_.disable_auto_compactions &&
         compaction_picker_->NeedsCompaction(current_->storage_info());
}

1082
// Asks the compaction picker for the next compaction on the current
// version. Returns nullptr when there is nothing to do; otherwise the
// returned Compaction has its input version set to `current_`.
Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options,
    const MutableDBOptions& mutable_db_options, LogBuffer* log_buffer) {
  // Earliest sequence number still present in any (active or immutable)
  // memtable; the picker uses it to bound what can be compacted safely.
  SequenceNumber earliest_mem_seqno =
      std::min(mem_->GetEarliestSequenceNumber(),
               imm_.current()->GetEarliestSequenceNumber(false));
  auto* result = compaction_picker_->PickCompaction(
      GetName(), mutable_options, mutable_db_options, current_->storage_info(),
      log_buffer, earliest_mem_seqno);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

1097 1098 1099 1100 1101 1102 1103
// True iff [smallest_user_key, largest_user_key] overlaps a running
// compaction's output range at the given level. Delegates to the picker.
bool ColumnFamilyData::RangeOverlapWithCompaction(
    const Slice& smallest_user_key, const Slice& largest_user_key,
    int level) const {
  return compaction_picker_->RangeOverlapWithCompaction(
      smallest_user_key, largest_user_key, level);
}

1104 1105
// Checks whether any of `ranges` overlaps data in the unflushed memtables
// (point keys or range tombstones) of `super_version`. Sets *overlap and
// returns the first non-OK status encountered while iterating, if any.
Status ColumnFamilyData::RangesOverlapWithMemtables(
    const autovector<Range>& ranges, SuperVersion* super_version,
    bool allow_data_in_errors, bool* overlap) {
  assert(overlap != nullptr);
  *overlap = false;
  // Create an InternalIterator over all unflushed memtables
  Arena arena;
  ReadOptions read_opts;
  read_opts.total_order_seek = true;
  MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
  merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_opts, &arena));
  super_version->imm->AddIterators(read_opts, &merge_iter_builder);
  ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());

  // Collect range tombstones from the active and immutable memtables so
  // deleted ranges also count as overlap.
  auto read_seq = super_version->current->version_set()->LastSequence();
  ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
  auto* active_range_del_iter =
      super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
  range_del_agg.AddTombstones(
      std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
  Status status;
  status = super_version->imm->AddRangeTombstoneIterators(
      read_opts, nullptr /* arena */, &range_del_agg);
  // AddRangeTombstoneIterators always return Status::OK.
  assert(status.ok());

  for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
    auto* vstorage = super_version->current->storage_info();
    auto* ucmp = vstorage->InternalComparator()->user_comparator();
    // Seek to the first internal key >= ranges[i].start.
    InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
                            kValueTypeForSeek);
    memtable_iter->Seek(range_start.Encode());
    status = memtable_iter->status();
    ParsedInternalKey seek_result;

    if (status.ok() && memtable_iter->Valid()) {
      status = ParseInternalKey(memtable_iter->key(), &seek_result,
                                allow_data_in_errors);
    }

    if (status.ok()) {
      // Overlap if the found key is within the range's limit, or if a range
      // tombstone covers any part of the range.
      if (memtable_iter->Valid() &&
          ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
        *overlap = true;
      } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
                                                 ranges[i].limit)) {
        *overlap = true;
      }
    }
  }
  return status;
}

1158
// Sentinel output-level values for CompactRange():
// compact every level (universal/FIFO style request).
const int ColumnFamilyData::kCompactAllLevels = -1;
// compact up to the base level of the LSM tree.
const int ColumnFamilyData::kCompactToBaseLevel = -2;
1160

1161
// Builds a manual compaction over [begin, end] from input_level to
// output_level via the compaction picker. Returns nullptr when no
// compaction is needed/possible; otherwise the returned Compaction has its
// input version set to `current_`. *compaction_end and *conflict are output
// parameters filled by the picker.
Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options,
    const MutableDBOptions& mutable_db_options, int input_level,
    int output_level, const CompactRangeOptions& compact_range_options,
    const InternalKey* begin, const InternalKey* end,
    InternalKey** compaction_end, bool* conflict,
    uint64_t max_file_num_to_ignore, const std::string& trim_ts) {
  auto* result = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, mutable_db_options,
      current_->storage_info(), input_level, output_level,
      compact_range_options, begin, end, compaction_end, conflict,
      max_file_num_to_ignore, trim_ts);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

1179 1180
// Returns the current SuperVersion with one reference taken for the caller.
// The thread-local cached reference is returned to the cache when possible;
// if the cache was invalidated meanwhile, the cache's reference is dropped
// while the caller's reference keeps the SuperVersion alive.
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db);
  sv->Ref();
  if (!ReturnThreadLocalSuperVersion(sv)) {
    // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
    // when the thread-local pointer was populated. So, the Ref() earlier in
    // this function still prevents the returned SuperVersion* from being
    // deleted out from under the caller.
    sv->Unref();
  }
  return sv;
}

1192
// Returns an up-to-date SuperVersion, preferring the thread-local cached
// one and falling back to super_version_ (under the DB mutex) when the
// cached copy is absent or stale. The returned pointer carries the
// reference that was stored in (or re-acquired for) the thread-local slot.
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
  // The SuperVersion is cached in thread local storage to avoid acquiring
  // mutex when SuperVersion does not change since the last use. When a new
  // SuperVersion is installed, the compaction or flush thread cleans up
  // cached SuperVersion in all existing thread local storage. To avoid
  // acquiring mutex for this operation, we use atomic Swap() on the thread
  // local pointer to guarantee exclusive access. If the thread local pointer
  // is being used while a new SuperVersion is installed, the cached
  // SuperVersion can become stale. In that case, the background thread would
  // have swapped in kSVObsolete. We re-check the value at when returning
  // SuperVersion back to thread local, with an atomic compare and swap.
  // The superversion will need to be released if detected to be stale.
  void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(ptr != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load()) {
    // Cached copy missing or stale: drop it and re-acquire under the mutex.
    RecordTick(ioptions_.stats, NUMBER_SUPERVERSION_ACQUIRES);
    SuperVersion* sv_to_delete = nullptr;

    if (sv && sv->Unref()) {
      RecordTick(ioptions_.stats, NUMBER_SUPERVERSION_CLEANUPS);
      db->mutex()->Lock();
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
        // Defer the (potentially blocking) deletion to a background purge.
        db->AddSuperVersionsToFreeQueue(sv);
        db->SchedulePurge();
      } else {
        sv_to_delete = sv;
      }
    } else {
      db->mutex()->Lock();
    }
    sv = super_version_->Ref();
    db->mutex()->Unlock();

    // Deleted outside the mutex to keep the critical section short.
    delete sv_to_delete;
  }
  assert(sv != nullptr);
  return sv;
}

// Attempts to put `sv` (obtained from GetThreadLocalSuperVersion) back into
// the thread-local cache. Returns true if the cache accepted it; false if a
// concurrent Scrape marked the slot obsolete, in which case the caller owns
// the reference and must release it.
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  // Put the SuperVersion back
  void* expected = SuperVersion::kSVInUse;
  if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
    // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
    // storage has not been altered and no Scrape has happened. The
    // SuperVersion is still current.
    return true;
  } else {
    // ThreadLocal scrape happened in the process of this GetImpl call (after
    // thread local Swap() at the beginning and before CompareAndSwap()).
    // This means the SuperVersion it holds is obsolete.
    assert(expected == SuperVersion::kSVObsolete);
  }
  return false;
}

1259 1260
// Convenience overload: installs a new SuperVersion using the CF's current
// mutable options. Requires the DB mutex to be held (asserted).
void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  return InstallSuperVersion(sv_context, mutable_cf_options_);
}

1265
// Installs the SuperVersion prepared in `sv_context` as this CF's current
// one: initializes it from the current memtables/version, bumps the
// superversion number, recalculates the write-stall state, invalidates
// thread-local caches, and queues the old SuperVersion for cleanup.
void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context,
    const MutableCFOptions& mutable_cf_options) {
  SuperVersion* new_superversion = sv_context->new_superversion.release();
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(this, mem_, imm_.current(), current_);
  SuperVersion* old_superversion = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;
  super_version_->write_stall_condition =
      RecalculateWriteStallConditions(mutable_cf_options);

  if (old_superversion != nullptr) {
    // Reset SuperVersions cached in thread local storage.
    // This should be done before old_superversion->Unref(). That's to ensure
    // that local_sv_ never holds the last reference to SuperVersion, since
    // it has no means to safely do SuperVersion cleanup.
    ResetThreadLocalSuperVersions();

    if (old_superversion->mutable_cf_options.write_buffer_size !=
        mutable_cf_options.write_buffer_size) {
      mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
    }
    if (old_superversion->write_stall_condition !=
        new_superversion->write_stall_condition) {
      // Notify listeners about the stall-state transition.
      sv_context->PushWriteStallNotification(
          old_superversion->write_stall_condition,
          new_superversion->write_stall_condition, GetName(), ioptions());
    }
    if (old_superversion->Unref()) {
      old_superversion->Cleanup();
      sv_context->superversions_to_free.push_back(old_superversion);
    }
  }
}

1302 1303
// Invalidates all thread-local SuperVersion caches by swapping in the
// kSVObsolete marker and unref'ing every cached SuperVersion collected.
// Slots currently marked kSVInUse belong to an in-flight reader and are
// left alone; that reader detects the obsolescence on return.
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
  for (auto ptr : sv_ptrs) {
    assert(ptr);
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
    auto sv = static_cast<SuperVersion*>(ptr);
    bool was_last_ref __attribute__((__unused__));
    was_last_ref = sv->Unref();
    // sv couldn't have been the last reference because
    // ResetThreadLocalSuperVersions() is called before
    // unref'ing super_version_.
    assert(!was_last_ref);
  }
}

1320 1321 1322 1323 1324 1325 1326
// Validates a ColumnFamilyOptions against the DB options, returning the
// first violation found (checks are ordered, so earlier failures mask later
// ones): compression support, concurrent-memtable-write support,
// unordered_write vs max_successive_merges, cf_paths support, TTL and
// periodic compaction table-format requirements, blob GC parameter ranges,
// and the FIFO/max_open_files restriction.
Status ColumnFamilyData::ValidateOptions(
    const DBOptions& db_options, const ColumnFamilyOptions& cf_options) {
  Status s;
  s = CheckCompressionSupported(cf_options);
  if (s.ok() && db_options.allow_concurrent_memtable_write) {
    s = CheckConcurrentWritesSupported(cf_options);
  }
  if (s.ok() && db_options.unordered_write &&
      cf_options.max_successive_merges != 0) {
    s = Status::InvalidArgument(
        "max_successive_merges > 0 is incompatible with unordered_write");
  }
  if (s.ok()) {
    s = CheckCFPathsSupported(db_options, cf_options);
  }
  if (!s.ok()) {
    return s;
  }

  // kDefaultTtl is a sentinel meaning "not explicitly set by the user".
  if (cf_options.ttl > 0 && cf_options.ttl != kDefaultTtl) {
    if (!cf_options.table_factory->IsInstanceOf(
            TableFactory::kBlockBasedTableName())) {
      return Status::NotSupported(
          "TTL is only supported in Block-Based Table format. ");
    }
  }

  if (cf_options.periodic_compaction_seconds > 0 &&
      cf_options.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {
    if (!cf_options.table_factory->IsInstanceOf(
            TableFactory::kBlockBasedTableName())) {
      return Status::NotSupported(
          "Periodic Compaction is only supported in "
          "Block-Based Table format. ");
    }
  }

  if (cf_options.enable_blob_garbage_collection) {
    if (cf_options.blob_garbage_collection_age_cutoff < 0.0 ||
        cf_options.blob_garbage_collection_age_cutoff > 1.0) {
      return Status::InvalidArgument(
          "The age cutoff for blob garbage collection should be in the range "
          "[0.0, 1.0].");
    }
    if (cf_options.blob_garbage_collection_force_threshold < 0.0 ||
        cf_options.blob_garbage_collection_force_threshold > 1.0) {
      return Status::InvalidArgument(
          "The garbage ratio threshold for forcing blob garbage collection "
          "should be in the range [0.0, 1.0].");
    }
  }

  if (cf_options.compaction_style == kCompactionStyleFIFO &&
      db_options.max_open_files != -1 && cf_options.ttl > 0) {
    return Status::NotSupported(
        "FIFO compaction only supported with max_open_files = -1.");
  }

  return s;
}

I
Igor Canadi 已提交
1381
#ifndef ROCKSDB_LITE
1382
// Applies the (string-form) option overrides in `options_map` on top of this
// column family's current configuration. Only mutable options may be changed
// (enforced via ConfigOptions::mutable_options_only); the merged option set is
// validated against `db_opts` before being installed. On any parse or
// validation failure a non-OK status is returned and the live options are
// left untouched.
Status ColumnFamilyData::SetOptions(
    const DBOptions& db_opts,
    const std::unordered_map<std::string, std::string>& options_map) {
  ColumnFamilyOptions new_cf_opts =
      BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
  ConfigOptions parse_opts;
  parse_opts.mutable_options_only = true;
  Status s = GetColumnFamilyOptionsFromMap(parse_opts, new_cf_opts,
                                           options_map, &new_cf_opts);
  if (s.ok()) {
    s = ValidateOptions(db_opts, new_cf_opts);
  }
  if (s.ok()) {
    // Install the new options and refresh anything derived from them.
    mutable_cf_options_ = MutableCFOptions(new_cf_opts);
    mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  }
  return s;
}
I
Igor Canadi 已提交
1400
#endif  // ROCKSDB_LITE
1401

S
Stream  
Shaohua Li 已提交
1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414
// REQUIRES: DB mutex held
// Maps an output level to a write-lifetime hint for the file system: deeper
// levels hold colder, longer-lived data under level compaction.
Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
  // Lifetime hints are only meaningful for level-style compaction.
  if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
    return Env::WLTH_NOT_SET;
  }
  if (level == 0) {
    return Env::WLTH_MEDIUM;
  }
  const int base_level = current_->storage_info()->base_level();
  if (level < base_level) {
    // Nothing prevents a caller from passing a level below base_level;
    // treat it like the base level.
    return Env::WLTH_MEDIUM;
  }
  const int depth = level - base_level;
  if (depth >= 2) {
    return Env::WLTH_EXTREME;
  }
  // depth == 0 -> WLTH_MEDIUM, depth == 1 -> WLTH_LONG
  return static_cast<Env::WriteLifeTimeHint>(
      depth + static_cast<int>(Env::WLTH_MEDIUM));
}

1424
// Opens (or reuses) one FSDirectory per configured cf_path and records it in
// data_dirs_. `created_dirs` is a path -> directory cache shared across
// column families so the same path is only opened once per DB; newly opened
// directories are added to it. Returns the first failure from directory
// creation, leaving data_dirs_ partially filled in that case.
Status ColumnFamilyData::AddDirectories(
    std::map<std::string, std::shared_ptr<FSDirectory>>* created_dirs) {
  assert(created_dirs != nullptr);
  assert(data_dirs_.empty());
  Status s;
  for (const auto& cf_path : ioptions_.cf_paths) {
    auto dir_iter = created_dirs->find(cf_path.path);
    if (dir_iter != created_dirs->end()) {
      // Another column family already opened this path; share the object.
      data_dirs_.emplace_back(dir_iter->second);
    } else {
      std::unique_ptr<FSDirectory> new_dir;
      s = DBImpl::CreateAndNewDirectory(ioptions_.fs.get(), cf_path.path,
                                        &new_dir);
      if (!s.ok()) {
        return s;
      }
      assert(new_dir != nullptr);
      data_dirs_.emplace_back(new_dir.release());
      (*created_dirs)[cf_path.path] = data_dirs_.back();
    }
  }
  assert(data_dirs_.size() == ioptions_.cf_paths.size());
  return s;
}

1450
// Returns the directory handle for the given path id, or nullptr when this
// column family opened no per-CF directories (data_dirs_ is empty).
FSDirectory* ColumnFamilyData::GetDataDir(size_t path_id) const {
  if (!data_dirs_.empty()) {
    assert(path_id < data_dirs_.size());
    return data_dirs_[path_id].get();
  }
  return nullptr;
}

I
Igor Canadi 已提交
1459
// Constructs the container that owns all ColumnFamilyData for one DB.
// `dummy_cfd_` is a sentinel node (id kDummyColumnFamilyDataId, no name,
// no options) that anchors the circular doubly-linked list of CFDs; it is
// never registered in the lookup maps. All other members are borrowed
// pointers/configuration supplied by the owning DBImpl.
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const ImmutableDBOptions* db_options,
                                 const FileOptions& file_options,
                                 Cache* table_cache,
                                 WriteBufferManager* _write_buffer_manager,
                                 WriteController* _write_controller,
                                 BlockCacheTracer* const block_cache_tracer,
                                 const std::shared_ptr<IOTracer>& io_tracer,
                                 const std::string& db_session_id)
    : max_column_family_(0),
      file_options_(file_options),
      dummy_cfd_(new ColumnFamilyData(
          ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr,
          nullptr, ColumnFamilyOptions(), *db_options, &file_options_, nullptr,
          block_cache_tracer, io_tracer, db_session_id)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      table_cache_(table_cache),
      write_buffer_manager_(_write_buffer_manager),
      write_controller_(_write_controller),
      block_cache_tracer_(block_cache_tracer),
      io_tracer_(io_tracer),
      db_session_id_(db_session_id) {
  // initialize linked list: the sentinel points at itself when empty
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}
I
Igor Canadi 已提交
1487 1488

// Drops this set's reference on every column family, then on the sentinel.
// Each ColumnFamilyData erases itself from column_family_data_ when it is
// destroyed, which is why the loop re-reads begin() every iteration.
ColumnFamilySet::~ColumnFamilySet() {
  while (!column_family_data_.empty()) {
    auto* cfd = column_family_data_.begin()->second;
    bool last_ref __attribute__((__unused__)) = cfd->UnrefAndTryDelete();
    // The set holds the final reference at this point.
    assert(last_ref);
  }
  bool dummy_last_ref __attribute__((__unused__)) =
      dummy_cfd_->UnrefAndTryDelete();
  assert(dummy_last_ref);
}

// Returns the cached default column family (id 0). The cache is filled by
// CreateColumnFamily when the default CF is created, so it must be non-null
// by the time anyone calls this.
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  assert(default_cfd_cache_);
  return default_cfd_cache_;
}

// Looks up a column family by numeric id; returns nullptr when no live
// column family has that id.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  const auto cfd_iter = column_family_data_.find(id);
  return cfd_iter == column_family_data_.end() ? nullptr : cfd_iter->second;
}

1515 1516 1517
// Looks up a column family by name: resolves the name to an id first, then
// the id to its ColumnFamilyData. Returns nullptr for unknown names.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  const auto name_iter = column_families_.find(name);
  if (name_iter == column_families_.end()) {
    return nullptr;
  }
  ColumnFamilyData* cfd = GetColumnFamily(name_iter->second);
  // The two maps are kept in sync, so the id lookup cannot miss.
  assert(cfd != nullptr);
  return cfd;
}

// Issues a fresh column family id; max_column_family_ tracks the highest
// id handed out so far.
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  max_column_family_ += 1;
  return max_column_family_;
}

1531 1532 1533 1534 1535 1536
// Highest column family id issued so far (not the count of live CFs).
uint32_t ColumnFamilySet::GetMaxColumnFamily() {
  return max_column_family_;
}

// Raises the id watermark to at least `new_max_column_family`; never
// lowers it.
void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  if (new_max_column_family > max_column_family_) {
    max_column_family_ = new_max_column_family;
  }
}

1537 1538 1539 1540
// Number of live column families (entries in the name -> id map; the dummy
// sentinel is never registered there).
size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  return column_families_.size();
}

I
Igor Canadi 已提交
1541
// under a DB mutex AND write thread
I
Igor Canadi 已提交
1542 1543 1544 1545
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
1546 1547
  ColumnFamilyData* new_cfd = new ColumnFamilyData(
      id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
1548
      *db_options_, &file_options_, this, block_cache_tracer_, io_tracer_,
1549
      db_session_id_);
1550
  column_families_.insert({name, id});
I
Igor Canadi 已提交
1551 1552
  column_family_data_.insert({id, new_cfd});
  max_column_family_ = std::max(max_column_family_, id);
1553
  // add to linked list
1554 1555 1556 1557 1558
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
I
Igor Canadi 已提交
1559 1560 1561
  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
I
Igor Canadi 已提交
1562 1563 1564
  return new_cfd;
}

I
Igor Canadi 已提交
1565
// under a DB mutex AND from a write thread
I
Igor Canadi 已提交
1566
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
1567
  auto cfd_iter = column_family_data_.find(cfd->GetID());
1568 1569
  assert(cfd_iter != column_family_data_.end());
  column_family_data_.erase(cfd_iter);
1570
  column_families_.erase(cfd->GetName());
I
Igor Canadi 已提交
1571 1572
}

I
Igor Canadi 已提交
1573
// under a DB mutex OR from a write thread
1574
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
I
Igor Canadi 已提交
1575 1576 1577 1578 1579 1580
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
1581
  handle_.SetCFD(current_);
1582 1583
  return current_ != nullptr;
}
1584

1585 1586 1587 1588 1589 1590 1591 1592 1593 1594
// Log number of the currently seeked column family.
// REQUIRES: a successful Seek() beforehand.
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_);
  return current_->GetLogNumber();
}

// Active memtable of the currently seeked column family.
// REQUIRES: a successful Seek() beforehand.
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_);
  return current_->mem();
}

1595
// Returns the internal reusable handle, which wraps whatever column family
// the last successful Seek() selected.
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_);
  return &handle_;
}

1600 1601 1602
// Extracts the numeric id from a handle; a null handle maps to id 0
// (the default column family).
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return 0;
  }
  auto* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  return cfh->GetID();
}

1609 1610 1611
// Returns the user comparator of the given column family, or nullptr when
// no handle is supplied.
const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  return column_family != nullptr ? column_family->GetComparator() : nullptr;
}

1617
}  // namespace ROCKSDB_NAMESPACE