column_family.cc 12.2 KB
Newer Older
1 2 3 4 5 6 7 8 9
//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under the BSD-style license found in the
//  LICENSE file in the root directory of this source tree. An additional grant
//  of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

I
Igor Canadi 已提交
10
#include "db/column_family.h"
11 12 13 14 15

#include <vector>
#include <string>
#include <algorithm>

I
Igor Canadi 已提交
16
#include "db/version_set.h"
17
#include "db/internal_stats.h"
18
#include "db/compaction_picker.h"
19 20
#include "db/table_properties_collector.h"
#include "util/hash_skiplist_rep.h"
I
Igor Canadi 已提交
21 22 23

namespace rocksdb {

24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
namespace {
// Clamps the value stored at `ptr` into the range [minvalue, maxvalue].
//
// The stored value is converted to V before each comparison. The upper bound
// is applied first and the lower bound second, so if the bounds are inverted
// (minvalue > maxvalue) the value ends up at minvalue.
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  if (maxvalue < static_cast<V>(*ptr)) {
    *ptr = maxvalue;
  }
  if (minvalue > static_cast<V>(*ptr)) {
    *ptr = minvalue;
  }
}
}  // anonymous namespace

// Returns a copy of `src` with user-supplied values adjusted to be usable.
//
// icmp:    comparator installed as result.comparator.
// ipolicy: filter policy installed as result.filter_policy, but only when
//          `src` has a filter policy; otherwise the result keeps nullptr.
// src:     the user-supplied options; never modified.
//
// Besides clamping individual fields, the returned options always carry one
// extra table properties collector (InternalKeyPropertiesCollector) appended
// after the wrapped user-supplied collectors.
ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
                                    const InternalFilterPolicy* ipolicy,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  result.comparator = icmp;
  result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
  ClipToRange(&result.write_buffer_size, static_cast<size_t>(64) << 10,
              static_cast<size_t>(64) << 30);
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // calculate a proper value from write_buffer_size;
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 10;
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  // The std::min above yields 0 (or less) whenever max_write_buffer_number is
  // 1 (or less). A merge threshold below one memtable is meaningless, so
  // enforce a floor of 1.
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }
  if (result.block_cache == nullptr && !result.no_block_cache) {
    result.block_cache = NewLRUCache(8 << 20);
  }
  // compression_per_level needs no handling here: `result` was copy-constructed
  // from `src` and nothing above touches that field.
  if (result.block_size_deviation < 0 || result.block_size_deviation > 100) {
    result.block_size_deviation = 0;
  }
  if (result.max_mem_compaction_level >= result.num_levels) {
    result.max_mem_compaction_level = result.num_levels - 1;
  }
  if (result.soft_rate_limit > result.hard_rate_limit) {
    result.soft_rate_limit = result.hard_rate_limit;
  }
  if (result.prefix_extractor) {
    // If a prefix extractor has been supplied and a HashSkipListRepFactory is
    // being used whose transform is not that extractor, the two disagree.
    // Fall back to a plain SkipListFactory in that case.
    auto factory =
        dynamic_cast<HashSkipListRepFactory*>(result.memtable_factory.get());
    if (factory && factory->GetTransform() != result.prefix_extractor) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  // -- Sanitize the table properties collector
  // All user defined properties collectors will be wrapped by
  // UserKeyTablePropertiesCollector since for them they only have the
  // knowledge of the user keys; internal keys are invisible to them.
  auto& collectors = result.table_properties_collectors;
  for (auto& collector : collectors) {
    assert(collector);
    collector = std::make_shared<UserKeyTablePropertiesCollector>(collector);
  }
  // Add collector to collect internal key statistics
  collectors.push_back(std::make_shared<InternalKeyPropertiesCollector>());

  return result;
}


90
// Default construction performs no explicit member setup; Init() is the
// function in this file that assigns mem/imm/current and the reference count.
SuperVersion::SuperVersion() = default;
91 92 93 94 95 96 97 98 99 100 101 102 103 104

// Frees every object that was queued in to_delete (see Cleanup()).
SuperVersion::~SuperVersion() {
  for (auto pending : to_delete) {
    delete pending;
  }
}

// Adds one reference to this SuperVersion and returns `this`, so the call can
// be used inline where a referenced pointer is needed.
SuperVersion* SuperVersion::Ref() {
  // The previous count is not needed by the caller.
  static_cast<void>(refs.fetch_add(1, std::memory_order_relaxed));
  return this;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
105 106 107
  uint32_t previous_refs = refs.fetch_sub(1, std::memory_order_relaxed);
  assert(previous_refs > 0);
  return previous_refs == 1;
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
}

void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    to_delete.push_back(m);
  }
  current->Unref();
}

// Points this SuperVersion at the given memtable, immutable memtable list
// version and Version, taking one reference on each, and sets this object's
// own reference count to 1.
void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
                        Version* new_current) {
  mem = new_mem;
  mem->Ref();

  imm = new_imm;
  imm->Ref();

  current = new_current;
  current->Ref();

  refs.store(1, std::memory_order_relaxed);
}

I
Igor Canadi 已提交
131 132 133
// Builds the state for one column family.
//
// dbname:          forwarded to the TableCache.
// id, name:        identity of the column family.
// dummy_versions:  head of this family's version list; nullptr marks a dummy
//                  column family, for which no stats, table cache or
//                  compaction picker are created.
// table_cache:     cache handed to the TableCache.
// options:         user-supplied options; they are run through
//                  SanitizeOptions() and the result is stored in options_.
// db_options:      DB-wide options; must not be null (it is dereferenced).
// storage_options: forwarded to the TableCache.
ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
                                   const std::string& name,
                                   Version* dummy_versions, Cache* table_cache,
                                   const ColumnFamilyOptions& options,
                                   const DBOptions* db_options,
                                   const EnvOptions& storage_options)
    : id_(id),
      name_(name),
      dummy_versions_(dummy_versions),
      current_(nullptr),
      internal_comparator_(options.comparator),
      internal_filter_policy_(options.filter_policy),
      options_(SanitizeOptions(&internal_comparator_, &internal_filter_policy_,
                               options)),
      full_options_(*db_options, options_),
      mem_(nullptr),
      // Use the sanitized threshold (options_), not the raw user value:
      // SanitizeOptions caps it at max_write_buffer_number - 1, and the
      // immutable memtable list must agree with that cap. The floor of 1
      // guards against the cap producing 0 when max_write_buffer_number is 1.
      imm_(std::max(1, options_.min_write_buffer_number_to_merge)),
      super_version_(nullptr),
      super_version_number_(0),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      need_slowdown_for_num_level0_files_(false) {
  // if dummy_versions is nullptr, then this is a dummy column family.
  if (dummy_versions != nullptr) {
    internal_stats_.reset(new InternalStats(options.num_levels, db_options->env,
                                            db_options->statistics.get()));
    table_cache_.reset(
        new TableCache(dbname, &full_options_, storage_options, table_cache));
    if (options_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(new UniversalCompactionPicker(
          &options_, &internal_comparator_, db_options->info_log.get()));
    } else {
      compaction_picker_.reset(new LevelCompactionPicker(
          &options_, &internal_comparator_, db_options->info_log.get()));
    }
  }
}
I
Igor Canadi 已提交
169 170

// Tears down everything this column family still holds: the installed
// SuperVersion, the dummy version list head, the active memtable and the
// immutable memtable list.
ColumnFamilyData::~ColumnFamilyData() {
  if (super_version_ != nullptr) {
    // Our reference is expected to be the only one left at this point.
    bool released_last_ref __attribute__((unused)) = super_version_->Unref();
    assert(released_last_ref);
    super_version_->Cleanup();
    delete super_version_;
  }

  if (dummy_versions_ != nullptr) {
    // List must be empty
    assert(dummy_versions_->next_ == dummy_versions_);
    delete dummy_versions_;
  }

  if (mem_ != nullptr) {
    delete mem_->Unref();
  }

  // Collect the memtables released by the immutable list, then free them.
  std::vector<MemTable*> obsolete;
  imm_.current()->Unref(&obsolete);
  for (MemTable* table : obsolete) {
    delete table;
  }
}

194 195 196 197
// Returns the InternalStats object owned by this column family as a
// non-owning pointer. It is only created when the column family was
// constructed with a non-null dummy_versions, so the result may be null.
InternalStats* ColumnFamilyData::internal_stats() {
  InternalStats* const stats = internal_stats_.get();
  return stats;
}

198 199 200 201 202 203 204
// Installs `current` as this column family's current Version and recomputes
// whether writes should be slowed down because of the level-0 file count.
void ColumnFamilyData::SetCurrent(Version* current) {
  current_ = current;
  // A negative trigger disables the slowdown check; in that case the file
  // count is not queried at all.
  const auto slowdown_trigger = options_.level0_slowdown_writes_trigger;
  need_slowdown_for_num_level0_files_ =
      slowdown_trigger >= 0 &&
      current_->NumLevelFiles(0) >= slowdown_trigger;
}

205
void ColumnFamilyData::CreateNewMemtable() {
206 207 208
  assert(current_ != nullptr);
  if (mem_ != nullptr) {
    delete mem_->Unref();
209
  }
210
  mem_ = new MemTable(internal_comparator_, options_);
211 212 213
  mem_->Ref();
}

214 215 216 217 218 219 220 221 222 223 224 225
// Asks the compaction picker to choose a compaction for the current Version
// and returns whatever it produces.
Compaction* ColumnFamilyData::PickCompaction() {
  Compaction* const picked = compaction_picker_->PickCompaction(current_);
  return picked;
}

// Forwards a manual compaction request for [begin, end] from input_level to
// output_level to the compaction picker, operating on the current Version.
// `compaction_end` is passed through to the picker unchanged.
Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level,
                                           const InternalKey* begin,
                                           const InternalKey* end,
                                           InternalKey** compaction_end) {
  Compaction* const compaction = compaction_picker_->CompactRange(
      current_, input_level, output_level, begin, end, compaction_end);
  return compaction;
}

226 227 228 229 230 231 232 233 234 235 236
// Initializes `new_superversion` from the current memtable, immutable list
// and Version, makes it the installed SuperVersion and bumps the version
// number.
//
// Returns the previously installed SuperVersion when this call dropped its
// last reference (it has already been cleaned up; the caller deletes it,
// which lets the delete happen outside of the mutex). Returns nullptr
// otherwise.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
    SuperVersion* new_superversion) {
  new_superversion->Init(mem_, imm_.current(), current_);

  SuperVersion* const previous = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;

  if (previous == nullptr || !previous->Unref()) {
    return nullptr;
  }
  previous->Cleanup();
  return previous;
}

I
Igor Canadi 已提交
239
// Creates an empty set of column families.
//
// dbname, db_options, storage_options and table_cache are stored and later
// handed to every ColumnFamilyData created through CreateColumnFamily().
// A dummy ColumnFamilyData (null dummy_versions) is allocated to act as the
// head of the circular doubly-linked list of column families.
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const DBOptions* db_options,
                                 const EnvOptions& storage_options,
                                 Cache* table_cache)
    : max_column_family_(0),
      // Pass the constructor parameter, not the storage_options_ member: the
      // member is initialized further down this list, so it is not a valid
      // object yet at this point.
      dummy_cfd_(new ColumnFamilyData(dbname, 0, "", nullptr, nullptr,
                                      ColumnFamilyOptions(), db_options,
                                      storage_options)),
      db_name_(dbname),
      db_options_(db_options),
      storage_options_(storage_options),
      table_cache_(table_cache),
      spin_lock_(ATOMIC_FLAG_INIT) {
  // initialize linked list: the dummy head points at itself in both
  // directions while the list is empty
  dummy_cfd_->prev_.store(dummy_cfd_);
  dummy_cfd_->next_.store(dummy_cfd_);
}
I
Igor Canadi 已提交
256 257 258 259 260 261 262 263

// Deletes every ColumnFamilyData owned by the set: the live ones, the ones
// parked by DropColumnFamily(), and finally the dummy list head.
ColumnFamilySet::~ColumnFamilySet() {
  for (auto& id_and_cfd : column_family_data_) {
    delete id_and_cfd.second;
  }
  for (auto& dropped : droppped_column_families_) {
    delete dropped;
  }
  delete dummy_cfd_;
}

// Returns the column family registered under id 0.
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  ColumnFamilyData* const default_cfd = GetColumnFamily(0);
  // default column family should always exist
  assert(default_cfd != nullptr);
  return default_cfd;
}

// Looks up a column family by id. Returns nullptr when no column family with
// that id is registered.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  const auto found = column_family_data_.find(id);
  return (found == column_family_data_.end()) ? nullptr : found->second;
}

// Reports whether a column family with the given id is registered.
bool ColumnFamilySet::Exists(uint32_t id) {
  const auto position = column_family_data_.find(id);
  return position != column_family_data_.end();
}

// Reports whether a column family with the given name is registered.
bool ColumnFamilySet::Exists(const std::string& name) {
  const auto position = column_families_.find(name);
  return position != column_families_.end();
}

// Returns the id registered for `name`. The name must exist; this is only
// checked by an assertion.
uint32_t ColumnFamilySet::GetID(const std::string& name) {
  const auto entry = column_families_.find(name);
  assert(entry != column_families_.end());
  return entry->second;
}

// Advances the highest-known column family id by one and returns the new
// value.
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  max_column_family_ += 1;
  return max_column_family_;
}

// Creates a ColumnFamilyData for (name, id), registers it in both lookup
// tables, raises max_column_family_ if needed and appends it to the tail of
// the circular list headed by dummy_cfd_. `name` must not be registered yet
// (checked by assertion only). Returns the new object; the set keeps
// ownership.
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  column_families_.insert({name, id});

  ColumnFamilyData* const created =
      new ColumnFamilyData(db_name_, id, name, dummy_versions, table_cache_,
                           options, db_options_, storage_options_);
  column_family_data_.insert({id, created});
  if (max_column_family_ < id) {
    max_column_family_ = id;
  }

  // add to linked list: splice the new node in between the current tail and
  // the dummy head
  created->next_.store(dummy_cfd_);
  auto tail = dummy_cfd_->prev_.load();
  created->prev_.store(tail);
  tail->next_.store(created);
  dummy_cfd_->prev_.store(created);
  return created;
}

void ColumnFamilySet::DropColumnFamily(uint32_t id) {
321 322 323 324 325 326 327 328 329 330 331 332 333
  assert(id != 0);
  auto cfd_iter = column_family_data_.find(id);
  assert(cfd_iter != column_family_data_.end());
  auto cfd = cfd_iter->second;
  column_families_.erase(cfd->GetName());
  cfd->current()->Unref();
  droppped_column_families_.push_back(cfd);
  column_family_data_.erase(cfd_iter);
  // remove from linked list
  auto prev = cfd->prev_.load();
  auto next = cfd->next_.load();
  prev->next_.store(next);
  next->prev_.store(prev);
I
Igor Canadi 已提交
334 335
}

336 337 338 339 340 341 342 343
// Acquires the set's spin lock by busy-waiting on the atomic flag.
void ColumnFamilySet::Lock() {
  // test_and_set returns the previous flag value: false means the flag was
  // clear and is now ours, true means it is already set and we must retry.
  for (;;) {
    if (!spin_lock_.test_and_set(std::memory_order_acquire)) {
      return;
    }
  }
}

// Releases the spin lock taken by Lock() by clearing the atomic flag.
void ColumnFamilySet::Unlock() {
  spin_lock_.clear(std::memory_order_release);
}

344 345 346 347 348
// Selects the column family that the accessors below operate on. The handle
// id is updated even when the lookup fails. Returns true if a column family
// with that id exists in the set.
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
  handle_.id = column_family_id;
  current_ = column_family_set_->GetColumnFamily(column_family_id);
  return current_ != nullptr;
}
349

350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368
// Returns the log number of the column family selected by the last Seek().
// A column family must be selected (checked by assertion only).
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  const uint64_t log_number = current_->GetLogNumber();
  return log_number;
}

// Returns the memtable of the column family selected by the last Seek().
// A column family must be selected (checked by assertion only).
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  MemTable* const table = current_->mem();
  return table;
}

// Returns the full options of the column family selected by the last Seek().
// A column family must be selected (checked by assertion only).
const Options* ColumnFamilyMemTablesImpl::GetFullOptions() const {
  assert(current_ != nullptr);
  const Options* const full = current_->full_options();
  return full;
}

// Returns the handle whose id was set by the last Seek(). The reference
// points at a member of this object. A column family must be selected
// (checked by assertion only).
const ColumnFamilyHandle& ColumnFamilyMemTablesImpl::GetColumnFamilyHandle()
    const {
  assert(current_ != nullptr);
  const ColumnFamilyHandle& selected = handle_;
  return selected;
}

I
Igor Canadi 已提交
371
}  // namespace rocksdb