column_family.cc 13.5 KB
Newer Older
1 2 3 4 5 6 7 8 9
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

I
Igor Canadi 已提交
10
#include "db/column_family.h"
11 12 13 14 15

#include <vector>
#include <string>
#include <algorithm>

16
#include "db/db_impl.h"
I
Igor Canadi 已提交
17
#include "db/version_set.h"
18
#include "db/internal_stats.h"
19
#include "db/compaction_picker.h"
20
#include "db/table_properties_collector.h"
21
#include "util/autovector.h"
22
#include "util/hash_skiplist_rep.h"
I
Igor Canadi 已提交
23 24 25

namespace rocksdb {

26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
// Builds a handle around `cfd`. The `db` and `mutex` pointers are stored as-is
// (the destructor uses them). A reference is taken on `cfd` when it is
// non-null; a null `cfd` yields a handle that holds no reference.
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(ColumnFamilyData* cfd,
                                               DBImpl* db, port::Mutex* mutex)
    : cfd_(cfd), db_(db), mutex_(mutex) {
  if (cfd_ == nullptr) {
    return;
  }
  cfd_->Ref();
}

// Releases the reference taken by the constructor (if any).
// With the mutex locked: drops the reference, deletes the column family data
// when Unref() reports true, and collects obsolete files. The collected files
// are purged only after the mutex has been unlocked.
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ == nullptr) {
    // Nothing was referenced, so there is nothing to release.
    return;
  }

  DBImpl::DeletionState deletion_state;

  mutex_->Lock();
  const bool should_delete = cfd_->Unref();
  if (should_delete) {
    delete cfd_;
  }
  db_->FindObsoleteFiles(deletion_state, false, true);
  mutex_->Unlock();

  db_->PurgeObsoleteFiles(deletion_state);
}

47 48
// Returns the ID reported by the ColumnFamilyData obtained through cfd().
uint32_t ColumnFamilyHandleImpl::GetID() const {
  return cfd()->GetID();
}

49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
namespace {
// Forces *ptr into [minvalue, maxvalue] so user-supplied options stay
// reasonable. The upper bound is applied first and the lower bound second,
// so if the bounds are inverted (minvalue > maxvalue) the lower bound wins.
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  T& value = *ptr;
  if (static_cast<V>(value) > maxvalue) {
    value = maxvalue;
  }
  if (static_cast<V>(value) < minvalue) {
    value = minvalue;
  }
}
}  // anonymous namespace

// Returns a copy of `src` adjusted so the rest of the column family code can
// rely on it:
//  - the comparator is replaced by `icmp`, and the filter policy by `ipolicy`
//    (only when the user supplied a filter policy at all);
//  - numeric settings that are out of range are pulled back into range;
//  - a default LRU block cache is created when none was given and block
//    caching has not been disabled;
//  - user table properties collectors are wrapped, and the internal key
//    collector is appended.
ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
                                    const InternalFilterPolicy* ipolicy,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;

  result.comparator = icmp;
  if (src.filter_policy != nullptr) {
    result.filter_policy = ipolicy;
  } else {
    result.filter_policy = nullptr;
  }

  // Keep the write buffer between 64 << 10 and 64 << 30 bytes.
  const size_t min_write_buffer_size = static_cast<size_t>(64) << 10;
  const size_t max_write_buffer_size = static_cast<size_t>(64) << 30;
  ClipToRange(&result.write_buffer_size, min_write_buffer_size,
              max_write_buffer_size);

  // A user-provided arena_block_size is trusted as-is. Otherwise it is derived
  // from the (already clipped) write_buffer_size.
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 10;
  }

  // Never require merging more write buffers than max_write_buffer_number - 1.
  const auto merge_upper_bound = result.max_write_buffer_number - 1;
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge, merge_upper_bound);

  const bool needs_default_cache =
      !result.no_block_cache && result.block_cache == nullptr;
  if (needs_default_cache) {
    result.block_cache = NewLRUCache(8 << 20);
  }

  result.compression_per_level = src.compression_per_level;

  // Deviation outside [0, 100] is reset to 0.
  const bool deviation_in_range =
      result.block_size_deviation >= 0 && result.block_size_deviation <= 100;
  if (!deviation_in_range) {
    result.block_size_deviation = 0;
  }

  if (result.max_mem_compaction_level >= result.num_levels) {
    result.max_mem_compaction_level = result.num_levels - 1;
  }

  // The soft limit may not exceed the hard limit.
  if (result.soft_rate_limit > result.hard_rate_limit) {
    result.soft_rate_limit = result.hard_rate_limit;
  }

  if (result.prefix_extractor) {
    // When a prefix extractor is supplied together with a
    // HashSkipListRepFactory whose transform is a different object, the
    // memtable factory is swapped for a plain SkipListFactory.
    auto hash_factory =
        dynamic_cast<HashSkipListRepFactory*>(result.memtable_factory.get());
    if (hash_factory != nullptr &&
        hash_factory->GetTransform() != result.prefix_extractor) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  // -- Sanitize the table properties collectors
  // User-defined collectors only know about user keys (internal keys are
  // invisible to them), so each one is wrapped in a
  // UserKeyTablePropertiesCollector.
  auto& collectors = result.table_properties_collectors;
  for (auto& collector : collectors) {
    assert(collector);
    collector = std::make_shared<UserKeyTablePropertiesCollector>(collector);
  }
  // Finally add the collector that gathers internal key statistics.
  collectors.push_back(std::make_shared<InternalKeyPropertiesCollector>());

  return result;
}


115 116 117 118 119 120 121 122 123 124 125 126 127
// Deletes every memtable that was queued in to_delete.
SuperVersion::~SuperVersion() {
  for (auto it = to_delete.begin(); it != to_delete.end(); ++it) {
    delete *it;
  }
}

// Adds one reference to this SuperVersion and returns `this`, so the call can
// be used inline where a SuperVersion* is expected.
SuperVersion* SuperVersion::Ref() {
  // The increment is done with relaxed memory ordering.
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
128 129 130
  uint32_t previous_refs = refs.fetch_sub(1, std::memory_order_relaxed);
  assert(previous_refs > 0);
  return previous_refs == 1;
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
}

void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    to_delete.push_back(m);
  }
  current->Unref();
}

// Points this SuperVersion at the given memtable, immutable memtable list and
// Version, takes a reference on each of them, and sets its own reference
// count to 1.
void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                        Version* new_current) {
  mem = new_mem;
  mem->Ref();

  imm = new_imm;
  imm->Ref();

  current = new_current;
  current->Ref();

  refs.store(1, std::memory_order_relaxed);
}

I
Igor Canadi 已提交
154 155 156
// Builds the in-memory state for one column family.
//
//  dbname            - passed on to the TableCache.
//  id, name          - identity of this column family.
//  dummy_versions    - head of the version list; nullptr marks a dummy column
//                      family, for which no stats, table cache or compaction
//                      picker are created.
//  table_cache       - cache handed to the TableCache.
//  options           - user-supplied options; a sanitized copy is stored in
//                      options_.
//  db_options        - DB-wide options, combined with options_ to form
//                      full_options_.
//  storage_options   - passed on to the TableCache.
//  column_family_set - owning set (nullptr for the dummy column family).
//
// The object takes one reference on itself via Ref() before returning.
ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
                                   const std::string& name,
                                   Version* dummy_versions, Cache* table_cache,
                                   const ColumnFamilyOptions& options,
                                   const DBOptions* db_options,
                                   const EnvOptions& storage_options,
                                   ColumnFamilySet* column_family_set)
    : id_(id),
      name_(name),
      dummy_versions_(dummy_versions),
      current_(nullptr),
      refs_(0),
      dropped_(false),
      // These two wrap the *user's* comparator / filter policy, so they must
      // read from the raw `options` (options_ has them replaced).
      internal_comparator_(options.comparator),
      internal_filter_policy_(options.filter_policy),
      options_(SanitizeOptions(&internal_comparator_, &internal_filter_policy_,
                               options)),
      full_options_(*db_options, options_),
      mem_(nullptr),
      // Use the sanitized value: SanitizeOptions caps
      // min_write_buffer_number_to_merge at max_write_buffer_number - 1, and
      // the raw `options` value would bypass that cap.
      // NOTE(review): relies on options_ being declared before imm_, which is
      // the order this initializer list follows -- confirm against the header.
      imm_(options_.min_write_buffer_number_to_merge),
      super_version_(nullptr),
      super_version_number_(0),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      need_slowdown_for_num_level0_files_(false),
      column_family_set_(column_family_set) {
  Ref();

  // if dummy_versions is nullptr, then this is a dummy column family.
  if (dummy_versions != nullptr) {
    // Read from the sanitized options_ for consistency with the rest of the
    // per-column-family state created below.
    internal_stats_.reset(new InternalStats(options_.num_levels,
                                            db_options->env,
                                            db_options->statistics.get()));
    table_cache_.reset(
        new TableCache(dbname, &full_options_, storage_options, table_cache));
    if (options_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(new UniversalCompactionPicker(
          &options_, &internal_comparator_, db_options->info_log.get()));
    } else {
      compaction_picker_.reset(new LevelCompactionPicker(
          &options_, &internal_comparator_, db_options->info_log.get()));
    }

    Log(full_options_.info_log, "Options for column family \"%s\":\n",
        name.c_str());
    options_.Dump(full_options_.info_log.get());
  }
}
I
Igor Canadi 已提交
202

203
// DB mutex held
I
Igor Canadi 已提交
204
ColumnFamilyData::~ColumnFamilyData() {
205 206 207 208 209 210 211
  assert(refs_ == 0);
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

212
  if (super_version_ != nullptr) {
213
    bool is_last_reference __attribute__((unused));
214
    is_last_reference = super_version_->Unref();
215
    assert(is_last_reference);
216 217
    super_version_->Cleanup();
    delete super_version_;
218
  }
219 220 221 222 223 224 225 226 227 228 229

  // it's nullptr for dummy CFD
  if (column_family_set_ != nullptr) {
    // remove from column_family_set
    column_family_set_->DropColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

230 231 232 233 234
  if (dummy_versions_ != nullptr) {
    // List must be empty
    assert(dummy_versions_->next_ == dummy_versions_);
    delete dummy_versions_;
  }
235

236 237
  if (mem_ != nullptr) {
    delete mem_->Unref();
238
  }
239
  autovector<MemTable*> to_delete;
240
  imm_.current()->Unref(&to_delete);
241 242 243 244 245
  for (MemTable* m : to_delete) {
    delete m;
  }
}

246 247 248 249
// Returns the raw pointer held by internal_stats_ (ownership is not
// transferred).
InternalStats* ColumnFamilyData::internal_stats() {
  InternalStats* stats = internal_stats_.get();
  return stats;
}

250 251 252 253 254 255 256
// Installs `current` as the current Version and recomputes whether writes
// need to be slowed down because of the number of level-0 files. The flag is
// set when the slowdown trigger is non-negative and the level-0 file count
// has reached it.
void ColumnFamilyData::SetCurrent(Version* current) {
  current_ = current;

  const auto slowdown_trigger = options_.level0_slowdown_writes_trigger;
  need_slowdown_for_num_level0_files_ =
      (slowdown_trigger >= 0 &&
       current_->NumLevelFiles(0) >= slowdown_trigger);
}

257
void ColumnFamilyData::CreateNewMemtable() {
258 259 260
  assert(current_ != nullptr);
  if (mem_ != nullptr) {
    delete mem_->Unref();
261
  }
262
  mem_ = new MemTable(internal_comparator_, options_);
263 264 265
  mem_->Ref();
}

266 267 268 269 270 271 272 273 274 275 276 277
// Asks the compaction picker to choose a compaction for the current Version
// and returns its result unchanged.
Compaction* ColumnFamilyData::PickCompaction() {
  Compaction* picked = compaction_picker_->PickCompaction(current_);
  return picked;
}

// Forwards a manual range-compaction request to the compaction picker, using
// the current Version, and returns whatever the picker returns. All arguments
// are passed through unchanged.
Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level,
                                           const InternalKey* begin,
                                           const InternalKey* end,
                                           InternalKey** compaction_end) {
  Compaction* compaction = compaction_picker_->CompactRange(
      current_, input_level, output_level, begin, end, compaction_end);
  return compaction;
}

278 279 280 281 282 283 284 285 286 287 288
// Initializes `new_superversion` from the current memtable, immutable
// memtable list and Version, installs it, and bumps the super version number.
//
// Returns the previously installed SuperVersion when this call released its
// last reference (it has already been cleaned up; the caller is expected to
// delete it outside of the mutex). Returns nullptr otherwise.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion) {
  new_superversion->Init(mem_, imm_.current(), current_);

  SuperVersion* const replaced = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;

  // No previous super version, or somebody else still references it.
  if (replaced == nullptr || !replaced->Unref()) {
    return nullptr;
  }

  replaced->Cleanup();
  return replaced;  // will let caller delete outside of mutex
}

I
Igor Canadi 已提交
291
// Creates an empty column family set.
//
//  dbname          - database name, stored and used when creating column
//                    families.
//  db_options      - DB-wide options pointer, stored as-is.
//  storage_options - environment options, copied into storage_options_.
//  table_cache     - cache pointer, stored as-is.
//
// A dummy ColumnFamilyData (id 0, empty name, no versions, no owning set) is
// created to act as the head of the circular list of column families.
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const DBOptions* db_options,
                                 const EnvOptions& storage_options,
                                 Cache* table_cache)
    : max_column_family_(0),
      // Pass the `storage_options` argument here, not the storage_options_
      // member: that member is initialized further down this list, so at this
      // point it would (assuming the list mirrors declaration order) not have
      // been constructed yet.
      dummy_cfd_(new ColumnFamilyData(dbname, 0, "", nullptr, nullptr,
                                      ColumnFamilyOptions(), db_options,
                                      storage_options, nullptr)),
      db_name_(dbname),
      db_options_(db_options),
      storage_options_(storage_options),
      table_cache_(table_cache),
      spin_lock_(ATOMIC_FLAG_INIT) {
  // initialize linked list: the dummy node points at itself in both
  // directions, which represents an empty list.
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}
I
Igor Canadi 已提交
308 309

// Unreferences and deletes every remaining column family, then the dummy one.
ColumnFamilySet::~ColumnFamilySet() {
  // Each ColumnFamilyData destructor removes its own entry from
  // column_family_data_, so the map shrinks on every iteration.
  while (!column_family_data_.empty()) {
    ColumnFamilyData* cfd = column_family_data_.begin()->second;
    cfd->Unref();
    delete cfd;
  }

  dummy_cfd_->Unref();
  delete dummy_cfd_;
}

// Returns the column family registered under ID 0, which is treated as the
// default column family and is expected to always exist.
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  ColumnFamilyData* default_cfd = GetColumnFamily(0);
  assert(default_cfd != nullptr);
  return default_cfd;
}

// Looks up a column family by ID. Returns nullptr when no column family with
// that ID is registered.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  const auto found = column_family_data_.find(id);
  if (found == column_family_data_.end()) {
    return nullptr;
  }
  return found->second;
}

// Returns true when a column family with the given ID is registered.
bool ColumnFamilySet::Exists(uint32_t id) {
  const auto found = column_family_data_.find(id);
  return found != column_family_data_.end();
}

// Returns true when a column family with the given name is registered.
bool ColumnFamilySet::Exists(const std::string& name) {
  const auto found = column_families_.find(name);
  return found != column_families_.end();
}

// Returns the ID registered for the column family called `name`.
// The name must be registered; this is only checked by an assert.
uint32_t ColumnFamilySet::GetID(const std::string& name) {
  const auto entry = column_families_.find(name);
  assert(entry != column_families_.end());
  return entry->second;
}

// Advances the largest known column family ID by one and returns the new
// value.
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  ++max_column_family_;
  return max_column_family_;
}

354
// under a DB mutex
I
Igor Canadi 已提交
355 356 357 358 359
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd =
I
Igor Canadi 已提交
360
      new ColumnFamilyData(db_name_, id, name, dummy_versions, table_cache_,
361 362 363
                           options, db_options_, storage_options_, this);
  Lock();
  column_families_.insert({name, id});
I
Igor Canadi 已提交
364
  column_family_data_.insert({id, new_cfd});
365
  Unlock();
I
Igor Canadi 已提交
366
  max_column_family_ = std::max(max_column_family_, id);
367
  // add to linked list
368 369 370 371 372
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
I
Igor Canadi 已提交
373 374 375
  return new_cfd;
}

376 377 378
// under a DB mutex
//
// Removes `cfd` from both lookup maps. The column family must currently be
// registered (checked by an assert only). The object itself is not deleted.
void ColumnFamilySet::DropColumnFamily(ColumnFamilyData* cfd) {
  auto entry = column_family_data_.find(cfd->GetID());
  assert(entry != column_family_data_.end());

  // Both erasures happen under the spin lock.
  Lock();
  column_family_data_.erase(entry);
  column_families_.erase(cfd->GetName());
  Unlock();
}

386 387 388 389 390 391 392 393
// Acquires the set's spin lock, busy-waiting until it becomes available.
void ColumnFamilySet::Lock() {
  // spin lock: test_and_set returns the previous flag value, so the loop
  // keeps retrying for as long as the flag was already set.
  while (spin_lock_.test_and_set(std::memory_order_acquire)) {
  }
}

// Releases the set's spin lock by clearing the flag (release ordering).
void ColumnFamilySet::Unlock() { spin_lock_.clear(std::memory_order_release); }

394
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
395 396
  // maybe outside of db mutex, should lock
  column_family_set_->Lock();
397
  current_ = column_family_set_->GetColumnFamily(column_family_id);
398 399
  column_family_set_->Unlock();
  handle_.SetCFD(current_);
400 401
  return current_ != nullptr;
}
402

403 404 405 406 407 408 409 410 411 412 413 414 415 416 417
// Returns the log number of the current column family. A current column
// family must have been selected (checked by an assert only).
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  const uint64_t log_number = current_->GetLogNumber();
  return log_number;
}

// Returns the memtable of the current column family. A current column family
// must have been selected (checked by an assert only).
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  MemTable* memtable = current_->mem();
  return memtable;
}

// Returns the full options of the current column family. A current column
// family must have been selected (checked by an assert only).
const Options* ColumnFamilyMemTablesImpl::GetFullOptions() const {
  assert(current_ != nullptr);
  const Options* full_options = current_->full_options();
  return full_options;
}

418
// Returns a pointer to the handle embedded in this object. A current column
// family must have been selected (checked by an assert only). The pointer
// refers to a member, so the caller does not own it.
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  ColumnFamilyHandle* handle = &handle_;
  return handle;
}

I
Igor Canadi 已提交
423
}  // namespace rocksdb