column_family.cc 15.4 KB
Newer Older
1 2 3 4 5 6 7 8 9
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

I
Igor Canadi 已提交
10
#include "db/column_family.h"
11 12 13 14 15

#include <vector>
#include <string>
#include <algorithm>

16
#include "db/db_impl.h"
I
Igor Canadi 已提交
17
#include "db/version_set.h"
18
#include "db/internal_stats.h"
19
#include "db/compaction_picker.h"
20
#include "db/table_properties_collector.h"
21
#include "util/autovector.h"
22
#include "util/hash_skiplist_rep.h"
I
Igor Canadi 已提交
23 24 25

namespace rocksdb {

26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
// Builds a handle around `cfd`. When `cfd` is non-null the handle takes a
// reference on it; `db` and `mutex` are stored for use by the destructor.
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(ColumnFamilyData* cfd,
                                               DBImpl* db, port::Mutex* mutex)
    : cfd_(cfd), db_(db), mutex_(mutex) {
  if (cfd_ == nullptr) {
    return;  // nothing to pin
  }
  cfd_->Ref();
}

// Drops the handle's reference on the column family (deleting it when this
// was the last reference), then collects and purges obsolete files.
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ == nullptr) {
    return;
  }

  DBImpl::DeletionState deletion_state;

  // Unref and the obsolete-file scan both run with the mutex locked.
  mutex_->Lock();
  const bool was_last_reference = cfd_->Unref();
  if (was_last_reference) {
    delete cfd_;
  }
  db_->FindObsoleteFiles(deletion_state, false, true);
  mutex_->Unlock();

  // The purge itself runs after the mutex has been released.
  if (deletion_state.HaveSomethingToDelete()) {
    db_->PurgeObsoleteFiles(deletion_state);
  }
}

49 50
// Returns the ID of the column family this handle refers to.
uint32_t ColumnFamilyHandleImpl::GetID() const {
  return cfd()->GetID();
}

51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
namespace {
// Forces *ptr into [minvalue, maxvalue]. Comparisons are carried out in type
// V. The upper bound is applied before the lower bound, so if the bounds are
// inverted (minvalue > maxvalue) the result is minvalue.
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  T& target = *ptr;
  if (static_cast<V>(target) > maxvalue) {
    target = maxvalue;
  }
  if (static_cast<V>(target) < minvalue) {
    target = minvalue;
  }
}
}  // anonymous namespace

// Returns a copy of `src` with unreasonable or inconsistent values adjusted.
//
//  * comparator is replaced by `icmp`; filter_policy is replaced by `ipolicy`
//    when the caller supplied a policy, and stays nullptr otherwise.
//  * write_buffer_size is clipped to [64 << 10, 64 << 30].
//  * arena_block_size, when not set by the user, becomes a tenth of the
//    (clipped) write_buffer_size.
//  * min_write_buffer_number_to_merge is capped at
//    max_write_buffer_number - 1 and never drops below 1.
//  * A default block cache is created unless no_block_cache is set.
//  * block_size_deviation outside [0, 100] is reset to 0.
//  * max_mem_compaction_level is capped at num_levels - 1.
//  * soft_rate_limit is capped at hard_rate_limit.
//  * Hash-based memtable factories are swapped for SkipListFactory when no
//    prefix_extractor is configured.
//  * User table-properties collectors are wrapped so they see user keys, and
//    an internal-key collector is appended.
ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
                                    const InternalFilterPolicy* ipolicy,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  result.comparator = icmp;
  result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
  ClipToRange(&result.write_buffer_size,
              static_cast<size_t>(64) << 10, static_cast<size_t>(64) << 30);
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // calculate a proper value from write_buffer_size;
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 10;
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  // max_write_buffer_number <= 1 (or a non-positive user value) would leave
  // the merge threshold at zero or below; keep it at one buffer minimum.
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }
  if (result.block_cache == nullptr && !result.no_block_cache) {
    result.block_cache = NewLRUCache(8 << 20);
  }
  result.compression_per_level = src.compression_per_level;
  if (result.block_size_deviation < 0 || result.block_size_deviation > 100) {
    result.block_size_deviation = 0;
  }
  if (result.max_mem_compaction_level >= result.num_levels) {
    result.max_mem_compaction_level = result.num_levels - 1;
  }
  if (result.soft_rate_limit > result.hard_rate_limit) {
    result.soft_rate_limit = result.hard_rate_limit;
  }
  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    // Without a prefix extractor the hash-based factories are replaced by
    // the plain skip list factory.
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  // -- Sanitize the table properties collector
  // All user defined properties collectors will be wrapped by
  // UserKeyTablePropertiesCollector since for them they only have the
  // knowledge of the user keys; internal keys are invisible to them.
  auto& collectors = result.table_properties_collectors;
  for (auto& collector : collectors) {
    assert(collector);
    collector = std::make_shared<UserKeyTablePropertiesCollector>(collector);
  }
  // Add collector to collect internal key statistics
  collectors.push_back(std::make_shared<InternalKeyPropertiesCollector>());

  return result;
}

114 115 116
// Storage whose address is used as the kSVInUse marker value.
int SuperVersion::dummy = 0;
// Marker pointer equal to the address of `dummy`.
// ColumnFamilyData::ResetThreadLocalSuperVersions() skips scraped
// thread-local entries that hold this value.
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
// Marker pointer equal to nullptr.
// ColumnFamilyData::ResetThreadLocalSuperVersions() passes it as the second
// argument to ThreadLocalPtr::Scrape().
// NOTE(review): presumably the value left behind in each scraped slot --
// confirm against ThreadLocalPtr.
void* const SuperVersion::kSVObsolete = nullptr;
117

118 119 120 121 122 123 124 125 126 127 128 129 130
// Frees every memtable that Cleanup() queued in to_delete.
SuperVersion::~SuperVersion() {
  for (auto* queued_memtable : to_delete) {
    delete queued_memtable;
  }
}

// Adds one reference and returns `this` so calls can be chained.
SuperVersion* SuperVersion::Ref() {
  // The previous count is of no interest here, so it is discarded.
  (void)refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
131 132 133
  uint32_t previous_refs = refs.fetch_sub(1, std::memory_order_relaxed);
  assert(previous_refs > 0);
  return previous_refs == 1;
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
}

void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    to_delete.push_back(m);
  }
  current->Unref();
}

// Points this SuperVersion at the given memtable, immutable-memtable list
// version and Version, takes a reference on each, and sets its own reference
// count to 1.
void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                        Version* new_current) {
  mem = new_mem;
  mem->Ref();

  imm = new_imm;
  imm->Ref();

  current = new_current;
  current->Ref();

  refs.store(1, std::memory_order_relaxed);
}

157 158
namespace {
void SuperVersionUnrefHandle(void* ptr) {
159 160 161 162
  // UnrefHandle is called when a thread exists or a ThreadLocalPtr gets
  // destroyed. When former happens, the thread shouldn't see kSVInUse.
  // When latter happens, we are in ~ColumnFamilyData(), no get should happen as
  // well.
163 164 165 166 167 168 169 170 171 172
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv->Unref()) {
    sv->db_mutex->Lock();
    sv->Cleanup();
    sv->db_mutex->Unlock();
    delete sv;
  }
}
}  // anonymous namespace

I
Igor Canadi 已提交
173 174 175
// Constructs the state for one column family.
//
// `options` is sanitized into options_, and full_options_ combines it with
// `db_options`. The constructor takes one reference on the new object.
// A nullptr `dummy_versions` marks a dummy column family: for those the
// stats, table cache and compaction picker are not created and nothing is
// logged.
ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
                                   const std::string& name,
                                   Version* dummy_versions, Cache* table_cache,
                                   const ColumnFamilyOptions& options,
                                   const DBOptions* db_options,
                                   const EnvOptions& storage_options,
                                   ColumnFamilySet* column_family_set)
    : id_(id),
      name_(name),
      dummy_versions_(dummy_versions),
      current_(nullptr),
      refs_(0),
      dropped_(false),
      // These two must read the raw user options: options_ holds the internal
      // wrappers instead of the user's comparator / filter policy.
      internal_comparator_(options.comparator),
      internal_filter_policy_(options.filter_policy),
      options_(SanitizeOptions(&internal_comparator_, &internal_filter_policy_,
                               options)),
      full_options_(*db_options, options_),
      mem_(nullptr),
      // Use the sanitized value; the raw one may exceed
      // max_write_buffer_number - 1.
      // NOTE(review): relies on options_ being declared before imm_, as the
      // initializer order here suggests -- confirm in the header.
      imm_(options_.min_write_buffer_number_to_merge),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      need_slowdown_for_num_level0_files_(false),
      column_family_set_(column_family_set) {
  Ref();

  // if dummy_versions is nullptr, then this is a dummy column family.
  if (dummy_versions != nullptr) {
    internal_stats_.reset(new InternalStats(options.num_levels, db_options->env,
                                            db_options->statistics.get()));
    table_cache_.reset(
        new TableCache(dbname, &full_options_, storage_options, table_cache));
    if (options_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(new UniversalCompactionPicker(
          &options_, &internal_comparator_, db_options->info_log.get()));
    } else {
      compaction_picker_.reset(new LevelCompactionPicker(
          &options_, &internal_comparator_, db_options->info_log.get()));
    }

    Log(full_options_.info_log, "Options for column family \"%s\":\n",
        name.c_str());
    options_.Dump(full_options_.info_log.get());
  }
}
I
Igor Canadi 已提交
222

223
// DB mutex held
I
Igor Canadi 已提交
224
ColumnFamilyData::~ColumnFamilyData() {
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
  assert(refs_ == 0);
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

  // it's nullptr for dummy CFD
  if (column_family_set_ != nullptr) {
    // remove from column_family_set
    column_family_set_->DropColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

242 243
  DeleteSuperVersion();

244 245 246 247 248
  if (dummy_versions_ != nullptr) {
    // List must be empty
    assert(dummy_versions_->next_ == dummy_versions_);
    delete dummy_versions_;
  }
249

250 251
  if (mem_ != nullptr) {
    delete mem_->Unref();
252
  }
253
  autovector<MemTable*> to_delete;
254
  imm_.current()->Unref(&to_delete);
255 256 257 258 259
  for (MemTable* m : to_delete) {
    delete m;
  }
}

260 261 262 263
// Returns the statistics object, or nullptr if none was created (the
// constructor only creates one for non-dummy column families).
InternalStats* ColumnFamilyData::internal_stats() {
  InternalStats* const stats = internal_stats_.get();
  return stats;
}

264 265 266 267 268 269 270
// Installs `current` as the current version and recomputes whether writes
// should slow down because of the number of level-0 files. A negative
// level0_slowdown_writes_trigger disables the slowdown.
void ColumnFamilyData::SetCurrent(Version* current) {
  current_ = current;
  const auto slowdown_trigger = options_.level0_slowdown_writes_trigger;
  need_slowdown_for_num_level0_files_ =
      (slowdown_trigger >= 0 &&
       current_->NumLevelFiles(0) >= slowdown_trigger);
}

271
void ColumnFamilyData::CreateNewMemtable() {
272 273 274
  assert(current_ != nullptr);
  if (mem_ != nullptr) {
    delete mem_->Unref();
275
  }
276
  mem_ = new MemTable(internal_comparator_, options_);
277 278 279
  mem_->Ref();
}

280 281
// Asks the compaction picker for a compaction against the current version
// and returns whatever it selects.
Compaction* ColumnFamilyData::PickCompaction(LogBuffer* log_buffer) {
  Compaction* picked =
      compaction_picker_->PickCompaction(current_, log_buffer);
  return picked;
}

// Forwards a manual range-compaction request to the compaction picker,
// supplying the current version, and returns its result.
Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level,
                                           const InternalKey* begin,
                                           const InternalKey* end,
                                           InternalKey** compaction_end) {
  Compaction* range_compaction = compaction_picker_->CompactRange(
      current_, input_level, output_level, begin, end, compaction_end);
  return range_compaction;
}

292
// Initializes `new_superversion` from the current memtable, immutable
// memtable list and version, and makes it the installed SuperVersion.
// Returns the previous SuperVersion when this call dropped its last
// reference (already cleaned up, to be deleted by the caller); returns
// nullptr otherwise.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion, port::Mutex* db_mutex) {
  new_superversion->Init(mem_, imm_.current(), current_);

  SuperVersion* const replaced = super_version_;
  super_version_ = new_superversion;

  ++super_version_number_;
  super_version_->version_number = super_version_number_;
  super_version_->db_mutex = db_mutex;

  if (replaced == nullptr || !replaced->Unref()) {
    return nullptr;
  }
  replaced->Cleanup();
  return replaced;  // will let caller delete outside of mutex
}

307 308
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
309
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
310 311
  for (auto ptr : sv_ptrs) {
    assert(ptr);
312 313 314
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
315 316 317 318 319 320 321 322
    auto sv = static_cast<SuperVersion*>(ptr);
    if (sv->Unref()) {
      sv->Cleanup();
      delete sv;
    }
  }
}

323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
// Destroys the installed SuperVersion, if any: resets the thread-local
// storage, drops this column family's own reference (asserted to be the last
// one), then cleans up and deletes the SuperVersion. No-op when no
// SuperVersion is installed.
// NOTE(review): the DB mutex appears to be held on entry, since it is
// unlocked and re-locked below -- confirm against callers.
void ColumnFamilyData::DeleteSuperVersion() {
  if (super_version_ != nullptr) {
    // Release SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since unref handler can lock mutex.
    super_version_->db_mutex->Unlock();
    local_sv_.reset();
    super_version_->db_mutex->Lock();

    // Declared separately and marked unused so that builds in which assert()
    // compiles away do not warn about an unused variable.
    bool is_last_reference __attribute__((unused));
    is_last_reference = super_version_->Unref();
    assert(is_last_reference);
    super_version_->Cleanup();
    delete super_version_;
    super_version_ = nullptr;
  }
}

I
Igor Canadi 已提交
340
// Creates an empty set of column families. A dummy ColumnFamilyData (ID 0,
// empty name, no versions, no owning set) is allocated to act as the head of
// the linked list and is initially linked to itself.
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const DBOptions* db_options,
                                 const EnvOptions& storage_options,
                                 Cache* table_cache)
    : max_column_family_(0),
      // Pass the constructor argument rather than the storage_options_
      // member: the member is initialized further down this list and is not
      // yet constructed at this point.
      dummy_cfd_(new ColumnFamilyData(dbname, 0, "", nullptr, nullptr,
                                      ColumnFamilyOptions(), db_options,
                                      storage_options, nullptr)),
      db_name_(dbname),
      db_options_(db_options),
      storage_options_(storage_options),
      table_cache_(table_cache),
      spin_lock_(ATOMIC_FLAG_INIT) {
  // initialize linked list
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}
I
Igor Canadi 已提交
357 358

// Unreferences and deletes every remaining column family, then the dummy
// list head.
ColumnFamilySet::~ColumnFamilySet() {
  // Each ColumnFamilyData destructor removes its own entry from
  // column_family_data_, so the map shrinks on every iteration.
  while (!column_family_data_.empty()) {
    auto* remaining = column_family_data_.begin()->second;
    remaining->Unref();
    delete remaining;
  }
  dummy_cfd_->Unref();
  delete dummy_cfd_;
}

// Returns the column family with ID 0, which is expected to always exist.
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  ColumnFamilyData* const default_cfd = GetColumnFamily(0);
  assert(default_cfd != nullptr);
  return default_cfd;
}

// Looks up a column family by ID; returns nullptr when the ID is unknown.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  auto entry = column_family_data_.find(id);
  if (entry == column_family_data_.end()) {
    return nullptr;
  }
  return entry->second;
}

385 386 387 388 389 390 391 392 393
// Looks up a column family by name: the name is mapped to an ID, which is
// then resolved through the ID overload. Returns nullptr for an unknown name.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  auto name_entry = column_families_.find(name);
  if (name_entry != column_families_.end()) {
    return GetColumnFamily(name_entry->second);
  }
  return nullptr;
}

I
Igor Canadi 已提交
394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411
// True when a column family with this ID is registered.
bool ColumnFamilySet::Exists(uint32_t id) {
  auto entry = column_family_data_.find(id);
  return entry != column_family_data_.end();
}

// True when a column family with this name is registered.
bool ColumnFamilySet::Exists(const std::string& name) {
  auto entry = column_families_.find(name);
  return entry != column_families_.end();
}

// Returns the ID registered for `name`. The name must exist; this is only
// checked by an assert.
uint32_t ColumnFamilySet::GetID(const std::string& name) {
  auto name_entry = column_families_.find(name);
  assert(name_entry != column_families_.end());
  const uint32_t id = name_entry->second;
  return id;
}

// Advances the highest known column family ID by one and returns the new
// value.
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  max_column_family_ += 1;
  return max_column_family_;
}

412 413 414 415 416 417
// Returns the highest column family ID recorded so far.
uint32_t ColumnFamilySet::GetMaxColumnFamily() {
  return max_column_family_;
}

// Raises the highest recorded column family ID to `new_max_column_family`
// if that is larger; the recorded value never decreases.
void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  if (max_column_family_ < new_max_column_family) {
    max_column_family_ = new_max_column_family;
  }
}

418
// under a DB mutex
I
Igor Canadi 已提交
419 420 421 422 423
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd =
I
Igor Canadi 已提交
424
      new ColumnFamilyData(db_name_, id, name, dummy_versions, table_cache_,
425 426 427
                           options, db_options_, storage_options_, this);
  Lock();
  column_families_.insert({name, id});
I
Igor Canadi 已提交
428
  column_family_data_.insert({id, new_cfd});
429
  Unlock();
I
Igor Canadi 已提交
430
  max_column_family_ = std::max(max_column_family_, id);
431
  // add to linked list
432 433 434 435 436
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
I
Igor Canadi 已提交
437 438 439
  return new_cfd;
}

440 441 442
// under a DB mutex
// Removes `cfd` from both lookup maps. The column family must be registered;
// this is only checked by an assert.
void ColumnFamilySet::DropColumnFamily(ColumnFamilyData* cfd) {
  auto entry = column_family_data_.find(cfd->GetID());
  assert(entry != column_family_data_.end());

  // Both maps are modified with the set's spin lock held.
  Lock();
  column_family_data_.erase(entry);
  column_families_.erase(cfd->GetName());
  Unlock();
}

450 451 452 453 454 455 456 457
// Acquires the spin lock, busy-waiting until the flag is obtained.
void ColumnFamilySet::Lock() {
  for (;;) {
    const bool was_already_set =
        spin_lock_.test_and_set(std::memory_order_acquire);
    if (!was_already_set) {
      break;
    }
  }
}

// Releases the spin lock taken by Lock().
void ColumnFamilySet::Unlock() {
  spin_lock_.clear(std::memory_order_release);
}

458
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
459 460
  // maybe outside of db mutex, should lock
  column_family_set_->Lock();
461
  current_ = column_family_set_->GetColumnFamily(column_family_id);
462 463
  column_family_set_->Unlock();
  handle_.SetCFD(current_);
464 465
  return current_ != nullptr;
}
466

467 468 469 470 471 472 473 474 475 476 477 478 479 480 481
// Returns the log number of the current column family. Requires a prior
// successful Seek().
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  const uint64_t log_number = current_->GetLogNumber();
  return log_number;
}

// Returns the memtable of the current column family. Requires a prior
// successful Seek().
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  MemTable* active_memtable = current_->mem();
  return active_memtable;
}

// Returns the full options of the current column family. Requires a prior
// successful Seek().
const Options* ColumnFamilyMemTablesImpl::GetFullOptions() const {
  assert(current_ != nullptr);
  const Options* full = current_->full_options();
  return full;
}

482
// Returns the embedded handle, which Seek() points at the current column
// family. Requires a prior successful Seek().
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  ColumnFamilyHandle* const handle = &handle_;
  return handle;
}

I
Igor Canadi 已提交
487
}  // namespace rocksdb