column_family.cc 30.4 KB
Newer Older
1 2 3 4 5 6 7 8 9
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

I
Igor Canadi 已提交
10
#include "db/column_family.h"
11

12 13 14 15 16
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>
17 18 19
#include <vector>
#include <string>
#include <algorithm>
I
Igor Canadi 已提交
20
#include <limits>
21

I
Igor Canadi 已提交
22
#include "db/compaction_picker.h"
23
#include "db/db_impl.h"
24
#include "db/internal_stats.h"
25
#include "db/job_context.h"
26
#include "db/table_properties_collector.h"
I
Igor Canadi 已提交
27
#include "db/version_set.h"
28
#include "db/write_controller.h"
29
#include "db/writebuffer.h"
30
#include "memtable/hash_skiplist_rep.h"
31
#include "util/autovector.h"
32
#include "util/compression.h"
33
#include "util/options_helper.h"
34
#include "util/thread_status_util.h"
35
#include "util/xfunc.h"
I
Igor Canadi 已提交
36 37 38

namespace rocksdb {

I
Igor Canadi 已提交
39
// Builds a handle around `column_family_data`. When the pointer is non-null
// the handle takes one reference on it.
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ == nullptr) {
    return;
  }
  cfd_->Ref();
}

// Drops the reference held on the column family data (deleting it when this
// was the last reference) and then purges any files that became obsolete.
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ == nullptr) {
    return;
  }
  // A job id of 0 marks this as a user thread rather than one of our
  // background processes.
  JobContext job_context(0);

  mutex_->Lock();
  const bool was_last_ref = cfd_->Unref();
  if (was_last_ref) {
    delete cfd_;
  }
  db_->FindObsoleteFiles(&job_context, false, true);
  mutex_->Unlock();

  // The purge itself runs after the mutex has been released.
  if (job_context.HaveSomethingToDelete()) {
    db_->PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();
}

65 66
// Returns the ID of the column family this handle refers to.
uint32_t ColumnFamilyHandleImpl::GetID() const {
  return cfd()->GetID();
}

67 68 69 70
// Returns the name of the column family this handle refers to.
const std::string& ColumnFamilyHandleImpl::GetName() const {
  auto* data = cfd();
  return data->GetName();
}

71 72 73 74
// Returns the user comparator of the column family this handle refers to.
const Comparator* ColumnFamilyHandleImpl::user_comparator() const {
  auto* data = cfd();
  return data->user_comparator();
}

75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
// Appends to `int_tbl_prop_collector_factories` one internal factory per
// user-supplied table properties collector factory in `cf_options`, followed
// by the factory that collects internal key statistics.
void GetIntTblPropCollectorFactory(
    const ColumnFamilyOptions& cf_options,
    std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
        int_tbl_prop_collector_factories) {
  for (const auto& user_factory :
       cf_options.table_properties_collector_factories) {
    assert(user_factory);
    int_tbl_prop_collector_factories->emplace_back(
        new UserKeyTablePropertiesCollectorFactory(user_factory));
  }
  // The internal key statistics collector always goes last.
  int_tbl_prop_collector_factories->emplace_back(
      new InternalKeyPropertiesCollectorFactory);
}

91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
// Verifies that every compression type requested by `cf_options` is supported.
// When compression_per_level is non-empty only its entries are checked;
// otherwise the single `compression` setting is checked. Returns
// InvalidArgument naming the first unsupported type, or OK.
Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
  const auto& per_level = cf_options.compression_per_level;
  if (per_level.empty()) {
    if (!CompressionTypeSupported(cf_options.compression)) {
      return Status::InvalidArgument(
          "Compression type " +
          CompressionTypeToString(cf_options.compression) +
          " is not linked with the binary.");
    }
    return Status::OK();
  }

  for (const auto level_type : per_level) {
    if (!CompressionTypeSupported(level_type)) {
      return Status::InvalidArgument(
          "Compression type " + CompressionTypeToString(level_type) +
          " is not linked with the binary.");
    }
  }
  return Status::OK();
}

113 114 115
// Returns a copy of `src` in which out-of-range or mutually inconsistent
// settings have been adjusted, and whose comparator is replaced by `icmp`.
// Adjustments to the level-0 triggers are reported through
// db_options.info_log.
ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
                                    const InternalKeyComparator* icmp,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  result.comparator = icmp;
#ifdef OS_MACOSX
  // TODO(icanadi) make write_buffer_size uint64_t instead of size_t
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, ((size_t)1) << 30);
#else
  ClipToRange(&result.write_buffer_size,
              ((size_t)64) << 10, ((size_t)64) << 30);
#endif
  // If the user sets arena_block_size, we trust the user to use this value.
  // Otherwise, calculate a proper value from write_buffer_size.
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 8;

    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
  }

  // Raise max_write_buffer_number to its minimum *before* deriving
  // min_write_buffer_number_to_merge from it. Capping against the unsanitized
  // value could leave min_write_buffer_number_to_merge at 0 or negative.
  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }

  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }

  // A negative value falls back to max_write_buffer_number.
  if (result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  XFUNC_TEST("memtablelist_history", "transaction_xftest_SanitizeOptions",
             xf_transaction_set_memtable_history1,
             xf_transaction_set_memtable_history,
             &result.max_write_buffer_number_to_maintain);
  XFUNC_TEST("memtablelist_history_clear", "transaction_xftest_SanitizeOptions",
             xf_transaction_clear_memtable_history1,
             xf_transaction_clear_memtable_history,
             &result.max_write_buffer_number_to_maintain);

  // Without a prefix extractor, the two hash-based memtable factories are
  // replaced by the plain skip list factory.
  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_file_num_compaction_trigger = std::numeric_limits<int>::max();
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  if (result.level0_file_num_compaction_trigger == 0) {
    Warn(db_options.info_log.get(),
         "level0_file_num_compaction_trigger cannot be 0");
    result.level0_file_num_compaction_trigger = 1;
  }

  // Enforce stop >= slowdown >= compaction trigger by raising the smaller
  // values, logging the values before and after the adjustment.
  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    Warn(db_options.info_log.get(),
         "This condition must be satisfied: "
         "level0_stop_writes_trigger(%d) >= "
         "level0_slowdown_writes_trigger(%d) >= "
         "level0_file_num_compaction_trigger(%d)",
         result.level0_stop_writes_trigger,
         result.level0_slowdown_writes_trigger,
         result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    Warn(db_options.info_log.get(),
         "Adjust the value to "
         "level0_stop_writes_trigger(%d)"
         "level0_slowdown_writes_trigger(%d)"
         "level0_file_num_compaction_trigger(%d)",
         result.level0_stop_writes_trigger,
         result.level0_slowdown_writes_trigger,
         result.level0_file_num_compaction_trigger);
  }
  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel ||
        db_options.db_paths.size() > 1U) {
      // 1. level_compaction_dynamic_level_bytes only makes sense for
      //    level-based compaction.
      // 2. we don't yet know how to make both of this feature and multiple
      //    DB path work.
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

  return result;
}

228 229 230
// Backing object whose address provides a unique, non-null value for kSVInUse.
int SuperVersion::dummy = 0;
// Sentinel swapped into thread-local storage while a thread is using its
// cached SuperVersion (see GetThreadLocalSuperVersion).
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
// Sentinel installed in thread-local storage when the cached SuperVersions
// are scraped (see ResetThreadLocalSuperVersions).
void* const SuperVersion::kSVObsolete = nullptr;
231

232 233 234 235 236 237 238 239 240 241 242 243 244
// Frees every memtable that was queued in `to_delete`.
SuperVersion::~SuperVersion() {
  for (auto it = to_delete.begin(); it != to_delete.end(); ++it) {
    delete *it;
  }
}

// Takes one additional reference and returns this object so that calls can be
// chained.
SuperVersion* SuperVersion::Ref() {
  SuperVersion* const self = this;
  self->refs.fetch_add(1, std::memory_order_relaxed);
  return self;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
245
  uint32_t previous_refs = refs.fetch_sub(1);
246 247
  assert(previous_refs > 0);
  return previous_refs == 1;
248 249 250 251 252 253 254
}

void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
255 256 257
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
    assert(*memory_usage >= m->ApproximateMemoryUsage());
    *memory_usage -= m->ApproximateMemoryUsage();
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273
    to_delete.push_back(m);
  }
  current->Unref();
}

// Points this SuperVersion at the given memtable, immutable memtable list
// version and Version, taking a reference on each, and sets its own reference
// count to 1.
void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                        Version* new_current) {
  mem = new_mem;
  imm = new_imm;
  current = new_current;

  new_mem->Ref();
  new_imm->Ref();
  new_current->Ref();

  refs.store(1, std::memory_order_relaxed);
}

274 275
namespace {
void SuperVersionUnrefHandle(void* ptr) {
276 277 278 279
  // UnrefHandle is called when a thread exists or a ThreadLocalPtr gets
  // destroyed. When former happens, the thread shouldn't see kSVInUse.
  // When latter happens, we are in ~ColumnFamilyData(), no get should happen as
  // well.
280 281 282 283 284 285 286 287 288 289
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv->Unref()) {
    sv->db_mutex->Lock();
    sv->Cleanup();
    sv->db_mutex->Unlock();
    delete sv;
  }
}
}  // anonymous namespace

290 291 292 293 294
// Constructs the per-column-family state. A null `_dummy_versions` marks a
// dummy column family, for which no stats, table cache or compaction picker
// are created. The new object starts with one reference.
ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBuffer* write_buffer,
    const ColumnFamilyOptions& cf_options, const DBOptions* db_options,
    const EnvOptions& env_options, ColumnFamilySet* column_family_set)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      options_(*db_options,
               SanitizeOptions(*db_options, &internal_comparator_, cf_options)),
      ioptions_(options_),
      mutable_cf_options_(options_, ioptions_),
      write_buffer_(write_buffer),
      mem_(nullptr),
      imm_(options_.min_write_buffer_number_to_merge,
           options_.max_write_buffer_number_to_maintain),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      column_family_set_(column_family_set),
      pending_flush_(false),
      pending_compaction_(false) {
  Ref();

  // Turn the user-defined table properties collector factories into internal
  // ones.
  GetIntTblPropCollectorFactory(options_, &int_tbl_prop_collector_factories_);

  const bool is_dummy_cf = (_dummy_versions == nullptr);
  if (!is_dummy_cf) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options->env, this));
    table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));

    // Select the compaction picker matching the configured compaction style.
    // Unrecognized styles (and, in ROCKSDB_LITE builds, every style other
    // than level) fall back to the level picker.
    switch (ioptions_.compaction_style) {
      case kCompactionStyleLevel:
        compaction_picker_.reset(
            new LevelCompactionPicker(ioptions_, &internal_comparator_));
        break;
#ifndef ROCKSDB_LITE
      case kCompactionStyleUniversal:
        compaction_picker_.reset(
            new UniversalCompactionPicker(ioptions_, &internal_comparator_));
        break;
      case kCompactionStyleFIFO:
        compaction_picker_.reset(
            new FIFOCompactionPicker(ioptions_, &internal_comparator_));
        break;
      case kCompactionStyleNone:
        compaction_picker_.reset(
            new NullCompactionPicker(ioptions_, &internal_comparator_));
        Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
            "Column family %s does not use any background compaction. "
            "Compactions can only be done via CompactFiles\n",
            GetName().c_str());
        break;
#endif  // !ROCKSDB_LITE
      default:
        Log(InfoLogLevel::ERROR_LEVEL, ioptions_.info_log,
            "Unable to recognize the specified compaction style %d. "
            "Column family %s will use kCompactionStyleLevel.\n",
            ioptions_.compaction_style, GetName().c_str());
        compaction_picker_.reset(
            new LevelCompactionPicker(ioptions_, &internal_comparator_));
        break;
    }

    // Options are only dumped while the set holds fewer than 10 column
    // families.
    const bool dump_options = column_family_set_->NumberOfColumnFamilies() < 10;
    if (dump_options) {
      Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
          "--------------- Options for column family [%s]:\n", name.c_str());
      options_.DumpCFOptions(ioptions_.info_log);
    } else {
      Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log,
          "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}
I
Igor Canadi 已提交
368

369
// DB mutex held
I
Igor Canadi 已提交
370
// Tears down the column family: unlinks it from the linked list, removes it
// from the owning set when still registered, and releases the current
// Version, the SuperVersion, the dummy version list head and all memtables.
// Requires that no references remain.
ColumnFamilyData::~ColumnFamilyData() {
  assert(refs_.load(std::memory_order_relaxed) == 0);

  // Unlink this node from the doubly-linked list.
  auto* const before = prev_;
  auto* const after = next_;
  before->next_ = after;
  after->prev_ = before;

  // A dropped column family has already been removed from the set, and a
  // null column_family_set_ means this is the dummy CFD, which is never part
  // of a ColumnFamilySet.
  const bool still_registered = !dropped_ && column_family_set_ != nullptr;
  if (still_registered) {
    column_family_set_->RemoveColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

  // Destroying a ColumnFamilyData that is still sitting in flush_queue_ or
  // compaction_queue_ would be wrong.
  assert(!pending_flush_);
  assert(!pending_compaction_);

  if (super_version_ != nullptr) {
    // Drop the SuperVersion references cached in the ThreadLocalPtr. The
    // mutex is released around this because the unref handler may lock it.
    super_version_->db_mutex->Unlock();
    local_sv_.reset();
    super_version_->db_mutex->Lock();

    bool is_last_reference __attribute__((unused)) = super_version_->Unref();
    assert(is_last_reference);
    super_version_->Cleanup();
    delete super_version_;
    super_version_ = nullptr;
  }

  if (dummy_versions_ != nullptr) {
    // The version list has to be empty at this point.
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((unused)) = dummy_versions_->Unref();
    assert(deleted);
  }

  if (mem_ != nullptr) {
    delete mem_->Unref();
  }

  // Release the immutable memtable list and free whatever it hands back.
  autovector<MemTable*> unreferenced;
  imm_.current()->Unref(&unreferenced);
  for (auto it = unreferenced.begin(); it != unreferenced.end(); ++it) {
    delete *it;
  }
}

I
Igor Canadi 已提交
426 427 428 429 430 431 432 433 434 435
// Marks this column family as dropped, releases its write controller token
// and removes it from the owning ColumnFamilySet. The default column family
// (id 0) cannot be dropped.
void ColumnFamilyData::SetDropped() {
  assert(id_ != 0);
  dropped_ = true;
  write_controller_token_.reset();

  // Take this column family out of column_family_set.
  column_family_set_->RemoveColumnFamily(this);
}

436 437
void ColumnFamilyData::RecalculateWriteStallConditions(
      const MutableCFOptions& mutable_cf_options) {
438
  if (current_ != nullptr) {
S
sdong 已提交
439
    auto* vstorage = current_->storage_info();
440 441
    auto write_controller = column_family_set_->write_controller_;

442
    if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) {
443 444
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
445
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
446
          "[%s] Stopping writes because we have %d immutable memtables "
L
Lei Jin 已提交
447
          "(waiting for flush), max_write_buffer_number is set to %d",
448
          name_.c_str(), imm()->NumNotFlushed(),
L
Lei Jin 已提交
449
          mutable_cf_options.max_write_buffer_number);
S
sdong 已提交
450
    } else if (vstorage->l0_delay_trigger_count() >=
451
               mutable_cf_options.level0_stop_writes_trigger) {
452
      write_controller_token_ = write_controller->GetStopToken();
453 454 455 456 457
      internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES_TOTAL, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LEVEL0_NUM_FILES_WITH_COMPACTION, 1);
      }
458
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
459
          "[%s] Stopping writes because we have %d level-0 files",
S
sdong 已提交
460
          name_.c_str(), vstorage->l0_delay_trigger_count());
461 462 463 464 465 466 467
    } else if (mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
               vstorage->estimated_compaction_needed_bytes() >=
                   mutable_cf_options.hard_pending_compaction_bytes_limit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(
          InternalStats::HARD_PENDING_COMPACTION_BYTES_LIMIT, 1);
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
468 469
          "[%s] Stopping writes because of estimated pending compaction "
          "bytes %" PRIu64,
470
          name_.c_str(), vstorage->estimated_compaction_needed_bytes());
471
    } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
S
sdong 已提交
472
               vstorage->l0_delay_trigger_count() >=
473
                   mutable_cf_options.level0_slowdown_writes_trigger) {
S
sdong 已提交
474
      write_controller_token_ = write_controller->GetDelayToken();
475 476 477 478 479
      internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN_TOTAL, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LEVEL0_SLOWDOWN_WITH_COMPACTION, 1);
      }
480
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
S
sdong 已提交
481 482
          "[%s] Stalling writes because we have %d level-0 files",
          name_.c_str(), vstorage->l0_delay_trigger_count());
483 484 485
    } else if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
               vstorage->estimated_compaction_needed_bytes() >=
                   mutable_cf_options.soft_pending_compaction_bytes_limit) {
S
sdong 已提交
486
      write_controller_token_ = write_controller->GetDelayToken();
487 488
      internal_stats_->AddCFStats(
          InternalStats::SOFT_PENDING_COMPACTION_BYTES_LIMIT, 1);
489
      Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
490 491 492
          "[%s] Stalling writes because of estimated pending compaction "
          "bytes %" PRIu64,
          name_.c_str(), vstorage->estimated_compaction_needed_bytes());
493 494 495
    } else {
      write_controller_token_.reset();
    }
496 497 498
  }
}

L
Lei Jin 已提交
499
// Returns a pointer to the EnvOptions stored in the owning ColumnFamilySet.
const EnvOptions* ColumnFamilyData::soptions() const {
  const EnvOptions& set_env_options = column_family_set_->env_options_;
  return &set_env_options;
}

I
Igor Canadi 已提交
503 504 505
// Replaces the current Version pointer. No reference counts are touched here.
void ColumnFamilyData::SetCurrent(Version* current_version) {
  this->current_ = current_version;
}
506

507 508 509 510
// Returns the live version count that VersionSet computes from this column
// family's dummy version list head.
uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  const uint64_t live_versions = VersionSet::GetNumLiveVersions(dummy_versions_);
  return live_versions;
}

511 512 513 514
// Returns the total SST file size that VersionSet computes from this column
// family's dummy version list head.
uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
  const uint64_t total_bytes = VersionSet::GetTotalSstFilesSize(dummy_versions_);
  return total_bytes;
}

515
// Allocates a new MemTable configured with this column family's comparator,
// immutable options and write buffer, plus the given mutable options and
// earliest sequence number. The caller receives the raw pointer.
MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  assert(current_ != nullptr);
  MemTable* fresh = new MemTable(internal_comparator_, ioptions_,
                                 mutable_cf_options, write_buffer_,
                                 earliest_seq);
  return fresh;
}

void ColumnFamilyData::CreateNewMemtable(
A
agiardullo 已提交
523
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
524 525
  if (mem_ != nullptr) {
    delete mem_->Unref();
526
  }
A
agiardullo 已提交
527
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
528 529 530
  mem_->Ref();
}

531 532 533 534
// Asks the compaction picker whether the current Version's storage needs
// compaction.
bool ColumnFamilyData::NeedsCompaction() const {
  auto* storage = current_->storage_info();
  return compaction_picker_->NeedsCompaction(storage);
}

535 536
// Lets the compaction picker choose a compaction over the current Version.
// When one is picked, the current Version is recorded as its input version.
// Returns nullptr when nothing was picked.
Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
  auto* picked = compaction_picker_->PickCompaction(
      GetName(), mutable_options, current_->storage_info(), log_buffer);
  if (picked == nullptr) {
    return picked;
  }
  picked->SetInputVersion(current_);
  return picked;
}

545
// Negative sentinel value. NOTE(review): presumably a special level argument
// for CompactRange() - confirm against callers.
const int ColumnFamilyData::kCompactAllLevels = -1;
546
// Negative sentinel value. NOTE(review): presumably a special level argument
// for CompactRange() - confirm against callers.
const int ColumnFamilyData::kCompactToBaseLevel = -2;
547

548
// Forwards a manual range compaction request to the compaction picker using
// the current Version's storage. When a compaction is produced, the current
// Version is recorded as its input version. Returns nullptr otherwise.
Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options, int input_level,
    int output_level, uint32_t output_path_id, const InternalKey* begin,
    const InternalKey* end, InternalKey** compaction_end, bool* conflict) {
  auto* planned = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, current_->storage_info(), input_level,
      output_level, output_path_id, begin, end, compaction_end, conflict);
  if (planned == nullptr) {
    return planned;
  }
  planned->SetInputVersion(current_);
  return planned;
}

561
// Returns the SuperVersion obtained through the thread-local cache with one
// extra reference taken for the caller. The cached pointer is handed back to
// thread-local storage; if that fails, the reference that belonged to the
// cache is dropped instead.
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
    InstrumentedMutex* db_mutex) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
  sv->Ref();
  const bool handed_back = ReturnThreadLocalSuperVersion(sv);
  if (!handed_back) {
    sv->Unref();
  }
  return sv;
}

// Fetches the SuperVersion cached in thread-local storage, refreshing it from
// super_version_ under `db_mutex` when the cached one is missing or stale.
//
// The SuperVersion is cached per thread so that the mutex is not needed while
// the SuperVersion has not changed since the last use. When a new
// SuperVersion is installed, the cached pointers in all threads are cleaned
// up. To avoid taking the mutex for that, an atomic Swap() on the
// thread-local pointer guarantees exclusive access. If the thread-local
// pointer is in use while a new SuperVersion is installed, the cached
// SuperVersion can become stale; the installer will then have swapped in
// kSVObsolete. The value is re-checked with an atomic compare-and-swap when
// the SuperVersion is returned to thread-local storage, and a stale
// SuperVersion has to be released at that point.
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
    InstrumentedMutex* db_mutex) {
  void* cached = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(cached != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(cached);

  const bool cache_is_current =
      sv != SuperVersion::kSVObsolete &&
      sv->version_number == super_version_number_.load();
  if (cache_is_current) {
    assert(sv != nullptr);
    return sv;
  }

  RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
  SuperVersion* sv_to_delete = nullptr;

  // Drop the stale cached SuperVersion, if there is one.
  const bool stale_was_last_ref = sv && sv->Unref();
  if (stale_was_last_ref) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
  }
  db_mutex->Lock();
  if (stale_was_last_ref) {
    // NOTE: underlying resources held by superversion (sst files) might
    // not be released until the next background job.
    sv->Cleanup();
    sv_to_delete = sv;
  }
  sv = super_version_->Ref();
  db_mutex->Unlock();

  // The stale SuperVersion is freed after the mutex has been released.
  delete sv_to_delete;

  assert(sv != nullptr);
  return sv;
}

// Tries to put `sv` back into thread-local storage. Returns true when the
// slot still held kSVInUse and the pointer was stored. Returns false when the
// slot had been changed in the meantime, in which case it is expected to hold
// kSVObsolete.
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  void* expected = SuperVersion::kSVInUse;
  const bool stored =
      local_sv_->CompareAndSwap(static_cast<void*>(sv), expected);
  if (!stored) {
    // A ThreadLocal scrape happened during this GetImpl call (after the
    // thread-local Swap() at the beginning and before CompareAndSwap()), so
    // the SuperVersion held is obsolete.
    assert(expected == SuperVersion::kSVObsolete);
    return false;
  }
  // Seeing kSVInUse means the ThreadLocal storage was not altered and no
  // Scrape has happened: the SuperVersion is still current.
  return true;
}

636
// Convenience overload that installs `new_superversion` using this column
// family's own mutable options. `db_mutex` must be held.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  SuperVersion* replaced =
      InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_);
  return replaced;
}

// Makes `new_superversion` the current SuperVersion: initializes it from the
// current memtable, immutable memtable list and Version, bumps the
// SuperVersion number, invalidates all thread-local cached SuperVersions and
// recalculates the write stall conditions.
//
// Returns the previous SuperVersion when its last reference was dropped here
// so that the caller can delete it outside of the mutex; returns nullptr
// otherwise.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, InstrumentedMutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(mem_, imm_.current(), current_);

  SuperVersion* const previous = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;

  // Invalidate the SuperVersions cached in thread-local storage.
  ResetThreadLocalSuperVersions();

  RecalculateWriteStallConditions(mutable_cf_options);

  if (previous == nullptr || !previous->Unref()) {
    return nullptr;
  }
  previous->Cleanup();
  return previous;  // will let caller delete outside of mutex
}

664 665
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
666
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
667 668
  for (auto ptr : sv_ptrs) {
    assert(ptr);
669 670 671
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
672 673 674 675 676 677 678 679
    auto sv = static_cast<SuperVersion*>(ptr);
    if (sv->Unref()) {
      sv->Cleanup();
      delete sv;
    }
  }
}

I
Igor Canadi 已提交
680
#ifndef ROCKSDB_LITE
681
// Applies the string-encoded option changes in `options_map` on top of the
// current mutable options. On success the new options replace the current
// ones and their derived values are refreshed; on failure the current options
// are left untouched. Returns the parse status.
Status ColumnFamilyData::SetOptions(
    const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions updated_options;
  Status parse_status = GetMutableOptionsFromStrings(
      mutable_cf_options_, options_map, &updated_options);
  if (!parse_status.ok()) {
    return parse_status;
  }
  mutable_cf_options_ = updated_options;
  mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  return parse_status;
}
I
Igor Canadi 已提交
692
#endif  // ROCKSDB_LITE
693

I
Igor Canadi 已提交
694
// Creates an empty column family set. A dummy ColumnFamilyData is allocated
// to serve as the head of the circular linked list of column families, and it
// starts out linked to itself.
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const DBOptions* db_options,
                                 const EnvOptions& env_options,
                                 Cache* table_cache,
                                 WriteBuffer* write_buffer,
                                 WriteController* write_controller)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
                                      ColumnFamilyOptions(), db_options,
                                      env_options, nullptr)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      env_options_(env_options),
      table_cache_(table_cache),
      write_buffer_(write_buffer),
      write_controller_(write_controller) {
  // An empty list is represented by the head pointing at itself both ways.
  dummy_cfd_->next_ = dummy_cfd_;
  dummy_cfd_->prev_ = dummy_cfd_;
}
I
Igor Canadi 已提交
715 716

// Unreferences and deletes every remaining column family, then the dummy list
// head.
ColumnFamilySet::~ColumnFamilySet() {
  while (!column_family_data_.empty()) {
    // The cfd destructor removes its own entry from column_family_data_, so
    // the map shrinks on every iteration.
    ColumnFamilyData* victim = column_family_data_.begin()->second;
    victim->Unref();
    delete victim;
  }
  dummy_cfd_->Unref();
  delete dummy_cfd_;
}

// Returns the cached pointer to the default column family (id 0). The cache
// must already have been populated.
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  ColumnFamilyData* const default_cfd = default_cfd_cache_;
  assert(default_cfd != nullptr);
  return default_cfd;
}

// Looks up a column family by ID. Returns nullptr when the ID is unknown.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  const auto found = column_family_data_.find(id);
  return (found == column_family_data_.end()) ? nullptr : found->second;
}

741 742 743
// Looks up a column family by name: the name is resolved to an ID, which is
// then resolved to the data object. Returns nullptr when the name is unknown.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  const auto name_entry = column_families_.find(name);
  if (name_entry == column_families_.end()) {
    return nullptr;
  }
  // A known name must always map to an existing column family.
  ColumnFamilyData* resolved = GetColumnFamily(name_entry->second);
  assert(resolved != nullptr);
  return resolved;
}

// Increments the highest column family ID seen so far and returns the new
// value.
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  max_column_family_ += 1;
  return max_column_family_;
}

757 758 759 760 761 762
// Returns the highest column family ID recorded so far.
uint32_t ColumnFamilySet::GetMaxColumnFamily() {
  return max_column_family_;
}

// Raises the recorded maximum column family ID to `new_max_column_family`
// when that is larger. The recorded maximum never decreases.
void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  if (max_column_family_ < new_max_column_family) {
    max_column_family_ = new_max_column_family;
  }
}

763 764 765 766
// Returns the number of entries in the name-to-ID map.
size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  const size_t count = column_families_.size();
  return count;
}

I
Igor Canadi 已提交
767
// under a DB mutex AND write thread
I
Igor Canadi 已提交
768 769 770 771 772
// Creates a column family with the given name and ID, registers it in both
// lookup maps, appends it to the tail of the linked list and raises the
// recorded maximum ID if needed. The name must not already be in use.
// Returns the new ColumnFamilyData.
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* created = new ColumnFamilyData(
      id, name, dummy_versions, table_cache_, write_buffer_, options,
      db_options_, env_options_, this);
  column_families_.insert({name, id});
  column_family_data_.insert({id, created});
  max_column_family_ = std::max(max_column_family_, id);

  // Splice the new node in just before the dummy head, i.e. at the tail.
  auto* const old_tail = dummy_cfd_->prev_;
  created->next_ = dummy_cfd_;
  created->prev_ = old_tail;
  old_tail->next_ = created;
  dummy_cfd_->prev_ = created;

  // ID 0 is the default column family; cache it for fast lookup.
  if (id == 0) {
    default_cfd_cache_ = created;
  }
  return created;
}

791 792 793 794
// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
I
Igor Canadi 已提交
795
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
796 797 798 799 800 801 802 803 804
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

I
Igor Canadi 已提交
805
// under a DB mutex AND from a write thread
I
Igor Canadi 已提交
806
// Removes `cfd` from both lookup maps (by ID and by name). The column family
// must currently be registered by ID.
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
  const auto by_id = column_family_data_.find(cfd->GetID());
  assert(by_id != column_family_data_.end());
  column_family_data_.erase(by_id);

  const std::string& cf_name = cfd->GetName();
  column_families_.erase(cf_name);
}

I
Igor Canadi 已提交
813
// under a DB mutex OR from a write thread
814
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
I
Igor Canadi 已提交
815 816 817 818 819 820
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
821
  handle_.SetCFD(current_);
822 823
  return current_ != nullptr;
}
824

825 826 827 828 829 830 831 832 833 834
// Returns the log number of the currently selected column family. A column
// family must have been selected.
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  const uint64_t log_number = current_->GetLogNumber();
  return log_number;
}

// Returns the memtable of the currently selected column family. A column
// family must have been selected.
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  MemTable* active = current_->mem();
  return active;
}

835
// Returns the address of the internal handle member, which Seek() points at
// the selected column family. A column family must have been selected.
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  ColumnFamilyHandle* handle = &handle_;
  return handle;
}

I
Igor Canadi 已提交
840 841 842 843 844 845 846
// When the selected column family's memtable reports that a flush should be
// scheduled, hands the column family to the flush scheduler and marks the
// memtable accordingly. Does nothing when no column family is selected.
void ColumnFamilyMemTablesImpl::CheckMemtableFull() {
  if (current_ == nullptr) {
    return;
  }
  if (!current_->mem()->ShouldScheduleFlush()) {
    return;
  }
  flush_scheduler_->ScheduleFlush(current_);
  current_->mem()->MarkFlushScheduled();
}

847 848 849 850 851 852 853 854 855
// Returns the ID of the column family behind `column_family`, or 0 when the
// handle is null.
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return 0;
  }
  // Downcast with static_cast rather than reinterpret_cast: it is the
  // type-checked cast for a pointer conversion within a class hierarchy.
  auto* cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
  return cfh->GetID();
}

856 857 858 859 860 861 862 863 864
// Returns the user comparator of the column family behind `column_family`,
// or nullptr when the handle is null.
const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family == nullptr) {
    return nullptr;
  }
  // Downcast with static_cast rather than reinterpret_cast: it is the
  // type-checked cast for a pointer conversion within a class hierarchy.
  auto* cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
  return cfh->user_comparator();
}

I
Igor Canadi 已提交
865
}  // namespace rocksdb