提交 fb2346fc 编写于 作者: I Igor Canadi

[CF] Code cleanup part 1

Summary:
I'm cleaning up some code preparing for the big diff review tomorrow. This is the first part of the cleanup.

Changes are mostly cosmetic. The goal is to decrease amount of code difference between columnfamilies and master branch.

This diff also fixes race condition when dropping column family.

Test Plan: Ran db_stress with variety of parameters

Reviewers: dhruba, haobo

Differential Revision: https://reviews.facebook.net/D16833
上级 56ca8338
......@@ -4,6 +4,9 @@
* By default, max_background_flushes is 1 and flush process is
removed from background compaction process. Flush process is now always
executed in high priority thread pool.
* Column family support
* If you write something to the non-default column family with disableWAL = true,
you need to Flush the column family before exiting if you want your data to be persistent
## Unreleased (will be relased in 2.8)
......
......@@ -185,9 +185,8 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
dropped_(false),
internal_comparator_(options.comparator),
internal_filter_policy_(options.filter_policy),
options_(SanitizeOptions(&internal_comparator_, &internal_filter_policy_,
options)),
full_options_(*db_options, options_),
options_(*db_options, SanitizeOptions(&internal_comparator_,
&internal_filter_policy_, options)),
mem_(nullptr),
imm_(options.min_write_buffer_number_to_merge),
super_version_(nullptr),
......@@ -205,18 +204,19 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
internal_stats_.reset(new InternalStats(options.num_levels, db_options->env,
db_options->statistics.get()));
table_cache_.reset(
new TableCache(dbname, &full_options_, storage_options, table_cache));
new TableCache(dbname, &options_, storage_options, table_cache));
if (options_.compaction_style == kCompactionStyleUniversal) {
compaction_picker_.reset(new UniversalCompactionPicker(
&options_, &internal_comparator_, db_options->info_log.get()));
compaction_picker_.reset(
new UniversalCompactionPicker(&options_, &internal_comparator_));
} else {
compaction_picker_.reset(new LevelCompactionPicker(
&options_, &internal_comparator_, db_options->info_log.get()));
compaction_picker_.reset(
new LevelCompactionPicker(&options_, &internal_comparator_));
}
Log(full_options_.info_log, "Options for column family \"%s\":\n",
Log(options_.info_log, "Options for column family \"%s\":\n",
name.c_str());
options_.Dump(full_options_.info_log.get());
const ColumnFamilyOptions* cf_options = &options_;
cf_options->Dump(options_.info_log.get());
}
}
......@@ -232,14 +232,27 @@ ColumnFamilyData::~ColumnFamilyData() {
// it's nullptr for dummy CFD
if (column_family_set_ != nullptr) {
// remove from column_family_set
column_family_set_->DropColumnFamily(this);
column_family_set_->RemoveColumnFamily(this);
}
if (current_ != nullptr) {
current_->Unref();
}
DeleteSuperVersion();
if (super_version_ != nullptr) {
// Release SuperVersion reference kept in ThreadLocalPtr.
// This must be done outside of mutex_ since unref handler can lock mutex.
super_version_->db_mutex->Unlock();
local_sv_.reset();
super_version_->db_mutex->Lock();
bool is_last_reference __attribute__((unused));
is_last_reference = super_version_->Unref();
assert(is_last_reference);
super_version_->Cleanup();
delete super_version_;
super_version_ = nullptr;
}
if (dummy_versions_ != nullptr) {
// List must be empty
......@@ -257,10 +270,6 @@ ColumnFamilyData::~ColumnFamilyData() {
}
}
InternalStats* ColumnFamilyData::internal_stats() {
return internal_stats_.get();
}
void ColumnFamilyData::SetCurrent(Version* current) {
current_ = current;
need_slowdown_for_num_level0_files_ =
......@@ -320,23 +329,6 @@ void ColumnFamilyData::ResetThreadLocalSuperVersions() {
}
}
void ColumnFamilyData::DeleteSuperVersion() {
if (super_version_ != nullptr) {
// Release SuperVersion reference kept in ThreadLocalPtr.
// This must be done outside of mutex_ since unref handler can lock mutex.
super_version_->db_mutex->Unlock();
local_sv_.reset();
super_version_->db_mutex->Lock();
bool is_last_reference __attribute__((unused));
is_last_reference = super_version_->Unref();
assert(is_last_reference);
super_version_->Cleanup();
delete super_version_;
super_version_ = nullptr;
}
}
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
const DBOptions* db_options,
const EnvOptions& storage_options,
......@@ -345,6 +337,7 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
dummy_cfd_(new ColumnFamilyData(dbname, 0, "", nullptr, nullptr,
ColumnFamilyOptions(), db_options,
storage_options_, nullptr)),
default_cfd_cache_(nullptr),
db_name_(dbname),
db_options_(db_options),
storage_options_(storage_options),
......@@ -367,10 +360,8 @@ ColumnFamilySet::~ColumnFamilySet() {
}
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
auto cfd = GetColumnFamily(0);
// default column family should always exist
assert(cfd != nullptr);
return cfd;
assert(default_cfd_cache_ != nullptr);
return default_cfd_cache_;
}
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
......@@ -385,24 +376,13 @@ ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
const {
auto cfd_iter = column_families_.find(name);
if (cfd_iter == column_families_.end()) {
if (cfd_iter != column_families_.end()) {
auto cfd = GetColumnFamily(cfd_iter->second);
assert(cfd != nullptr);
return cfd;
} else {
return nullptr;
}
return GetColumnFamily(cfd_iter->second);
}
bool ColumnFamilySet::Exists(uint32_t id) {
return column_family_data_.find(id) != column_family_data_.end();
}
bool ColumnFamilySet::Exists(const std::string& name) {
return column_families_.find(name) != column_families_.end();
}
uint32_t ColumnFamilySet::GetID(const std::string& name) {
auto cfd_iter = column_families_.find(name);
assert(cfd_iter != column_families_.end());
return cfd_iter->second;
}
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
......@@ -434,11 +414,22 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
new_cfd->prev_ = prev;
prev->next_ = new_cfd;
dummy_cfd_->prev_ = new_cfd;
if (id == 0) {
default_cfd_cache_ = new_cfd;
}
return new_cfd;
}
void ColumnFamilySet::Lock() {
// spin lock
while (spin_lock_.test_and_set(std::memory_order_acquire)) {
}
}
void ColumnFamilySet::Unlock() { spin_lock_.clear(std::memory_order_release); }
// under a DB mutex
void ColumnFamilySet::DropColumnFamily(ColumnFamilyData* cfd) {
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
auto cfd_iter = column_family_data_.find(cfd->GetID());
assert(cfd_iter != column_family_data_.end());
Lock();
......@@ -447,19 +438,16 @@ void ColumnFamilySet::DropColumnFamily(ColumnFamilyData* cfd) {
Unlock();
}
void ColumnFamilySet::Lock() {
// spin lock
while (spin_lock_.test_and_set(std::memory_order_acquire)) {
}
}
void ColumnFamilySet::Unlock() { spin_lock_.clear(std::memory_order_release); }
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
// maybe outside of db mutex, should lock
column_family_set_->Lock();
current_ = column_family_set_->GetColumnFamily(column_family_id);
column_family_set_->Unlock();
if (column_family_id == 0) {
// optimization for common case
current_ = column_family_set_->GetDefault();
} else {
// maybe outside of db mutex, should lock
column_family_set_->Lock();
current_ = column_family_set_->GetColumnFamily(column_family_id);
column_family_set_->Unlock();
}
handle_.SetCFD(current_);
return current_ != nullptr;
}
......@@ -474,9 +462,9 @@ MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
return current_->mem();
}
const Options* ColumnFamilyMemTablesImpl::GetFullOptions() const {
const Options* ColumnFamilyMemTablesImpl::GetOptions() const {
assert(current_ != nullptr);
return current_->full_options();
return current_->options();
}
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
......
......@@ -15,6 +15,7 @@
#include <atomic>
#include "rocksdb/options.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "db/memtable_list.h"
#include "db/write_batch_internal.h"
......@@ -35,6 +36,9 @@ class ColumnFamilyData;
class DBImpl;
class LogBuffer;
// ColumnFamilyHandleImpl is the class that clients use to access different
// column families. It has non-trivial destructor, which gets called when client
// is done using the column family
class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
public:
// create while holding the mutex
......@@ -51,7 +55,12 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
port::Mutex* mutex_;
};
// does not ref-count cfd_
// Does not ref-count ColumnFamilyData
// We use this dummy ColumnFamilyHandleImpl because sometimes MemTableInserter
// calls DBImpl methods. When this happens, MemTableInserter need access to
// ColumnFamilyHandle (same as the client would need). In that case, we feed
// MemTableInserter dummy ColumnFamilyHandle and enable it to call DBImpl
// methods
class ColumnFamilyHandleInternal : public ColumnFamilyHandleImpl {
public:
ColumnFamilyHandleInternal()
......@@ -110,15 +119,18 @@ extern ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
class ColumnFamilySet;
// column family metadata. not thread-safe. should be protected by db_mutex
// This class keeps all the data that a column family needs. It's mosly dumb and
// used just to provide access to metadata.
// Most methods require DB mutex held, unless otherwise noted
class ColumnFamilyData {
public:
~ColumnFamilyData();
// thread-safe
uint32_t GetID() const { return id_; }
const std::string& GetName() { return name_; }
// thread-safe
const std::string& GetName() const { return name_; }
// DB mutex held for all these
void Ref() { ++refs_; }
// will just decrease reference count to 0, but will not delete it. returns
// true if the ref count was decreased to zero and needs to be cleaned up by
......@@ -127,24 +139,37 @@ class ColumnFamilyData {
assert(refs_ > 0);
return --refs_ == 0;
}
bool Dead() { return refs_ == 0; }
// SetDropped() and IsDropped() are thread-safe
// This can only be called from single-threaded VersionSet::LogAndApply()
// After dropping column family no other operation on that column family
// will be executed. All the files and memory will be, however, kept around
// until client drops the column family handle. That way, client can still
// access data from dropped column family.
// Column family can be dropped and still alive. In that state:
// *) Column family is not included in the iteration.
// *) Compaction and flush is not executed on the dropped column family.
// *) Client can continue writing and reading from column family. However, all
// writes stay in the current memtable.
// When the dropped column family is unreferenced, then we:
// *) delete all memory associated with that column family
// *) delete all the files associated with that column family
void SetDropped() {
// can't drop default CF
assert(id_ != 0);
dropped_.store(true);
dropped_ = true;
}
bool IsDropped() const { return dropped_.load(); }
bool IsDropped() const { return dropped_; }
// thread-safe
int NumberLevels() const { return options_.num_levels; }
void SetLogNumber(uint64_t log_number) { log_number_ = log_number; }
uint64_t GetLogNumber() const { return log_number_; }
const ColumnFamilyOptions* options() const { return &options_; }
const Options* full_options() const { return &full_options_; }
InternalStats* internal_stats();
// thread-safe
const Options* options() const { return &options_; }
InternalStats* internal_stats() { return internal_stats_.get(); }
MemTableList* imm() { return &imm_; }
MemTable* mem() { return mem_; }
......@@ -154,7 +179,7 @@ class ColumnFamilyData {
void SetCurrent(Version* current);
void CreateNewMemtable();
TableCache* table_cache() const { return table_cache_.get(); }
TableCache* table_cache() { return table_cache_.get(); }
// See documentation in compaction_picker.h
Compaction* PickCompaction(LogBuffer* log_buffer);
......@@ -162,18 +187,20 @@ class ColumnFamilyData {
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end);
CompactionPicker* compaction_picker() const {
return compaction_picker_.get();
}
CompactionPicker* compaction_picker() { return compaction_picker_.get(); }
// thread-safe
const Comparator* user_comparator() const {
return internal_comparator_.user_comparator();
}
// thread-safe
const InternalKeyComparator& internal_comparator() const {
return internal_comparator_;
}
SuperVersion* GetSuperVersion() const { return super_version_; }
SuperVersion* GetSuperVersion() { return super_version_; }
// thread-safe
ThreadLocalPtr* GetThreadLocalSuperVersion() const { return local_sv_.get(); }
// thread-safe
uint64_t GetSuperVersionNumber() const {
return super_version_number_.load();
}
......@@ -185,9 +212,6 @@ class ColumnFamilyData {
port::Mutex* db_mutex);
void ResetThreadLocalSuperVersions();
// REQUIRED: db mutex held
// Do not access column family after calling this method
void DeleteSuperVersion();
// A Flag indicating whether write needs to slowdown because of there are
// too many number of level0 files.
......@@ -204,21 +228,18 @@ class ColumnFamilyData {
const EnvOptions& storage_options,
ColumnFamilySet* column_family_set);
ColumnFamilyData* next() { return next_; }
uint32_t id_;
const std::string name_;
Version* dummy_versions_; // Head of circular doubly-linked list of versions.
Version* current_; // == dummy_versions->prev_
int refs_; // outstanding references to ColumnFamilyData
std::atomic<bool> dropped_; // true if client dropped it
bool dropped_; // true if client dropped it
const InternalKeyComparator internal_comparator_;
const InternalFilterPolicy internal_filter_policy_;
ColumnFamilyOptions const options_;
Options const full_options_;
Options const options_;
std::unique_ptr<TableCache> table_cache_;
......@@ -258,19 +279,33 @@ class ColumnFamilyData {
ColumnFamilySet* column_family_set_;
};
// Thread safe only for reading without a writer. All access should be
// locked when adding or dropping column family
// ColumnFamilySet has interesting thread-safety requirements
// * CreateColumnFamily() or RemoveColumnFamily() -- need to protect by DB
// mutex. Inside, column_family_data_ and column_families_ will be protected
// by Lock() and Unlock(). CreateColumnFamily() should ONLY be called from
// VersionSet::LogAndApply() in the normal runtime. It is also called
// during Recovery and in DumpManifest(). RemoveColumnFamily() is called
// from ColumnFamilyData destructor
// * Iteration -- hold DB mutex, but you can release it in the body of
// iteration. If you release DB mutex in body, reference the column
// family before the mutex and unreference after you unlock, since the column
// family might get dropped when the DB mutex is released
// * GetDefault() -- thread safe
// * GetColumnFamily() -- either inside of DB mutex or call Lock() <-> Unlock()
// * GetNextColumnFamilyID(), GetMaxColumnFamily(), UpdateMaxColumnFamily() --
// inside of DB mutex
class ColumnFamilySet {
public:
// ColumnFamilySet supports iteration
class iterator {
public:
explicit iterator(ColumnFamilyData* cfd)
: current_(cfd) {}
iterator& operator++() {
// dummy is never dead, so this will never be infinite
// dummy is never dead or dropped, so this will never be infinite
do {
current_ = current_->next();
} while (current_->Dead());
current_ = current_->next_;
} while (current_->refs_ == 0 || current_->IsDropped());
return *this;
}
bool operator!=(const iterator& other) {
......@@ -290,9 +325,6 @@ class ColumnFamilySet {
// GetColumnFamily() calls return nullptr if column family is not found
ColumnFamilyData* GetColumnFamily(uint32_t id) const;
ColumnFamilyData* GetColumnFamily(const std::string& name) const;
bool Exists(uint32_t id);
bool Exists(const std::string& name);
uint32_t GetID(const std::string& name);
// this call will return the next available column family ID. it guarantees
// that there is no column family with id greater than or equal to the
// returned value in the current running instance or anytime in RocksDB
......@@ -304,34 +336,33 @@ class ColumnFamilySet {
ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id,
Version* dummy_version,
const ColumnFamilyOptions& options);
void DropColumnFamily(ColumnFamilyData* cfd);
iterator begin() { return iterator(dummy_cfd_->next()); }
iterator begin() { return iterator(dummy_cfd_->next_); }
iterator end() { return iterator(dummy_cfd_); }
// ColumnFamilySet has interesting thread-safety requirements
// * CreateColumnFamily() or DropColumnFamily() -- need to protect by DB
// mutex. Inside, column_family_data_ and column_families_ will be protected
// by Lock() and Unlock()
// * Iterate -- hold DB mutex, but you can release it in the body of
// iteration. If you release DB mutex in body, reference the column
// family before the mutex and unreference after you unlock, since the column
// family might get dropped when you release the DB mutex.
// * GetDefault(), GetColumnFamily(), Exists(), GetID() -- either inside of DB
// mutex or call Lock()
// * GetNextColumnFamilyID(), GetMaxColumnFamily(), UpdateMaxColumnFamily() --
// inside of DB mutex
void Lock();
void Unlock();
private:
// when mutating: 1. DB mutex locked first, 2. spinlock locked second
// when reading, either: 1. lock DB mutex, or 2. lock spinlock
friend class ColumnFamilyData;
// helper function that gets called from cfd destructor
// REQUIRES: DB mutex held
void RemoveColumnFamily(ColumnFamilyData* cfd);
// column_families_ and column_family_data_ need to be protected:
// * when mutating: 1. DB mutex locked first, 2. spinlock locked second
// * when reading, either: 1. lock DB mutex, or 2. lock spinlock
// (if both, respect the ordering to avoid deadlock!)
std::unordered_map<std::string, uint32_t> column_families_;
std::unordered_map<uint32_t, ColumnFamilyData*> column_family_data_;
uint32_t max_column_family_;
ColumnFamilyData* dummy_cfd_;
// We don't hold the refcount here, since default column family always exists
// We are also not responsible for cleaning up default_cfd_cache_. This is
// just a cache that makes common case (accessing default column family)
// faster
ColumnFamilyData* default_cfd_cache_;
const std::string db_name_;
const DBOptions* const db_options_;
......@@ -340,6 +371,8 @@ class ColumnFamilySet {
std::atomic_flag spin_lock_;
};
// We use ColumnFamilyMemTablesImpl to provide WriteBatch a way to access
// memtables of different column families (specified by ID in the write batch)
class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
public:
explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set)
......@@ -357,7 +390,7 @@ class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
// Returns options for selected column family
// REQUIRES: Seek() called first
virtual const Options* GetFullOptions() const override;
virtual const Options* GetOptions() const override;
// Returns column family handle for the selected column family
virtual ColumnFamilyHandle* GetColumnFamilyHandle() override;
......
......@@ -797,7 +797,7 @@ std::string IterStatus(Iterator* iter) {
}
return result;
}
} // namespace anonymous
} // anonymous namespace
TEST(ColumnFamilyTest, NewIteratorsTest) {
// iter == 0 -- no tailing
......
......@@ -42,11 +42,9 @@ uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) {
} // anonymous namespace
CompactionPicker::CompactionPicker(const ColumnFamilyOptions* options,
const InternalKeyComparator* icmp,
Logger* logger)
CompactionPicker::CompactionPicker(const Options* options,
const InternalKeyComparator* icmp)
: compactions_in_progress_(options->num_levels),
logger_(logger),
options_(options),
num_levels_(options->num_levels),
icmp_(icmp) {
......@@ -272,7 +270,7 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) {
&c->parent_index_);
if (expanded1.size() == c->inputs_[1].size() &&
!FilesInCompaction(expanded1)) {
Log(logger_,
Log(options_->info_log,
"Expanding@%lu %lu+%lu (%lu+%lu bytes) to %lu+%lu (%lu+%lu bytes)"
"\n",
(unsigned long)level, (unsigned long)(c->inputs_[0].size()),
......@@ -343,7 +341,7 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
c->inputs_[0] = inputs;
if (ExpandWhileOverlapping(c) == false) {
delete c;
Log(logger_, "Could not compact due to expansion failure.\n");
Log(options_->info_log, "Could not compact due to expansion failure.\n");
return nullptr;
}
......@@ -514,7 +512,7 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
}
//if (i > Version::number_of_files_to_sort_) {
// Log(logger_, "XXX Looking at index %d", i);
// Log(options_->info_log, "XXX Looking at index %d", i);
//}
// Do not pick this file if its parents at level+1 are being compacted.
......@@ -610,6 +608,10 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
c->bottommost_level_ = true;
}
// update statistics
MeasureTime(options_->statistics.get(), NUM_FILES_IN_SINGLE_COMPACTION,
c->inputs_[0].size());
// mark all the files that are being compacted
c->MarkFilesBeingCompacted(true);
......
......@@ -26,8 +26,7 @@ class Version;
class CompactionPicker {
public:
CompactionPicker(const ColumnFamilyOptions* options,
const InternalKeyComparator* icmp, Logger* logger);
CompactionPicker(const Options* options, const InternalKeyComparator* icmp);
virtual ~CompactionPicker();
// Pick level and inputs for a new compaction.
......@@ -119,8 +118,7 @@ class CompactionPicker {
// Per-level max bytes
std::unique_ptr<uint64_t[]> level_max_bytes_;
Logger* logger_;
const ColumnFamilyOptions* const options_;
const Options* const options_;
private:
int num_levels_;
......@@ -130,9 +128,9 @@ class CompactionPicker {
class UniversalCompactionPicker : public CompactionPicker {
public:
UniversalCompactionPicker(const ColumnFamilyOptions* options,
const InternalKeyComparator* icmp, Logger* logger)
: CompactionPicker(options, icmp, logger) {}
UniversalCompactionPicker(const Options* options,
const InternalKeyComparator* icmp)
: CompactionPicker(options, icmp) {}
virtual Compaction* PickCompaction(Version* version,
LogBuffer* log_buffer) override;
......@@ -150,9 +148,9 @@ class UniversalCompactionPicker : public CompactionPicker {
class LevelCompactionPicker : public CompactionPicker {
public:
LevelCompactionPicker(const ColumnFamilyOptions* options,
const InternalKeyComparator* icmp, Logger* logger)
: CompactionPicker(options, icmp, logger) {}
LevelCompactionPicker(const Options* options,
const InternalKeyComparator* icmp)
: CompactionPicker(options, icmp) {}
virtual Compaction* PickCompaction(Version* version,
LogBuffer* log_buffer) override;
......
......@@ -276,26 +276,17 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
}
DBImpl::~DBImpl() {
// Wait for background work to finish
mutex_.Lock();
if (flush_on_destroy_) {
autovector<ColumnFamilyData*> to_delete;
for (auto cfd : *versions_->GetColumnFamilySet()) {
// TODO(icanadi) do this in ColumnFamilyData destructor
if (!cfd->IsDropped() && cfd->mem()->GetFirstSequenceNumber() != 0) {
cfd->Ref();
mutex_.Unlock();
FlushMemTable(cfd, FlushOptions());
mutex_.Lock();
if (cfd->Unref()) {
to_delete.push_back(cfd);
}
}
}
for (auto cfd : to_delete) {
delete cfd;
// only the default CFD is alive at this point
if (default_cf_handle_ != nullptr) {
auto default_cfd = default_cf_handle_->cfd();
if (flush_on_destroy_ &&
default_cfd->mem()->GetFirstSequenceNumber() != 0) {
FlushMemTable(default_cfd, FlushOptions());
}
}
mutex_.Lock();
// Wait for background work to finish
shutting_down_.Release_Store(this); // Any non-nullptr value is ok
while (bg_compaction_scheduled_ ||
bg_flush_scheduled_ ||
......@@ -303,8 +294,11 @@ DBImpl::~DBImpl() {
bg_cv_.Wait();
}
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->DeleteSuperVersion();
if (default_cf_handle_ != nullptr) {
// we need to delete handle outside of lock because it does its own locking
mutex_.Unlock();
delete default_cf_handle_;
mutex_.Lock();
}
if (options_.allow_thread_local) {
......@@ -328,21 +322,13 @@ DBImpl::~DBImpl() {
}
}
mutex_.Unlock();
if (default_cf_handle_ != nullptr) {
// we need to delete handle outside of lock because it does its own locking
delete default_cf_handle_;
}
if (db_lock_ != nullptr) {
env_->UnlockFile(db_lock_);
}
mutex_.Lock();
// versions need to be destroyed before table_cache since it can hold
// references to table_cache.
versions_.reset();
mutex_.Unlock();
if (db_lock_ != nullptr) {
env_->UnlockFile(db_lock_);
}
LogFlush(options_.info_log);
}
......@@ -876,14 +862,8 @@ Status DBImpl::Recover(
versions_->MarkFileNumberUsed(log);
s = RecoverLogFile(log, &max_sequence, read_only);
}
if (s.ok()) {
if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence);
}
SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER,
versions_->LastSequence());
}
SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER,
versions_->LastSequence());
}
return s;
......@@ -1029,9 +1009,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
// we must mark the next log number as used, even though it's
// not actually used. that is because VersionSet assumes
// VersionSet::next_file_number_ always to be strictly greater than any
// log
// number
// log number
versions_->MarkFileNumberUsed(log_number + 1);
if (versions_->LastSequence() < *max_sequence) {
versions_->SetLastSequence(*max_sequence);
}
status = versions_->LogAndApply(cfd, edit, &mutex_);
if (!status.ok()) {
return status;
......@@ -1059,10 +1041,10 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
Status s;
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, *cfd->full_options(), storage_options_,
s = BuildTable(dbname_, env_, *cfd->options(), storage_options_,
cfd->table_cache(), iter, &meta, cfd->internal_comparator(),
newest_snapshot, earliest_seqno_in_memtable,
GetCompressionFlush(*cfd->full_options()));
GetCompressionFlush(*cfd->options()));
LogFlush(options_.info_log);
mutex_.Lock();
}
......@@ -1124,10 +1106,10 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
Log(options_.info_log, "Level-0 flush table #%lu: started",
(unsigned long)meta.number);
s = BuildTable(dbname_, env_, *cfd->full_options(), storage_options_,
s = BuildTable(dbname_, env_, *cfd->options(), storage_options_,
cfd->table_cache(), iter, &meta, cfd->internal_comparator(),
newest_snapshot, earliest_seqno_in_memtable,
GetCompressionFlush(*cfd->full_options()));
GetCompressionFlush(*cfd->options()));
LogFlush(options_.info_log);
delete iter;
Log(options_.info_log, "Level-0 flush table #%lu: %lu bytes %s",
......@@ -1758,7 +1740,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
bool is_flush_pending = false;
// no need to refcount since we're under a mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->IsDropped() && cfd->imm()->IsFlushPending()) {
if (cfd->imm()->IsFlushPending()) {
is_flush_pending = true;
}
}
......@@ -1775,7 +1757,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
bool is_compaction_needed = false;
// no need to refcount since we're under a mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->IsDropped() && cfd->current()->NeedsCompaction()) {
if (cfd->current()->NeedsCompaction()) {
is_compaction_needed = true;
break;
}
......@@ -1813,9 +1795,6 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
autovector<ColumnFamilyData*> to_delete;
// refcounting in iteration
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
cfd->Ref();
Status flush_status;
while (flush_status.ok() && cfd->imm()->IsFlushPending()) {
......@@ -1988,7 +1967,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
} else {
// no need to refcount in iteration since it's always under a mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->options()->disable_auto_compactions && !cfd->IsDropped()) {
if (!cfd->options()->disable_auto_compactions) {
c.reset(cfd->PickCompaction(log_buffer));
if (c != nullptr) {
// update statistics
......@@ -2166,12 +2145,12 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
1.1 * cfd->compaction_picker()->MaxFileSizeForLevel(
compact->compaction->output_level()));
CompressionType compression_type = GetCompressionType(
*cfd->full_options(), compact->compaction->output_level(),
compact->compaction->enable_compression());
CompressionType compression_type =
GetCompressionType(*cfd->options(), compact->compaction->output_level(),
compact->compaction->enable_compression());
compact->builder.reset(
NewTableBuilder(*cfd->full_options(), cfd->internal_comparator(),
NewTableBuilder(*cfd->options(), cfd->internal_comparator(),
compact->outfile.get(), compression_type));
}
LogFlush(options_.info_log);
......@@ -2788,8 +2767,8 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
Iterator* mutable_iter = super_version->mem->NewIterator(options);
// create a DBIter that only uses memtable content; see NewIterator()
mutable_iter =
NewDBIterator(&dbname_, env_, *cfd->full_options(),
cfd->user_comparator(), mutable_iter, kMaxSequenceNumber);
NewDBIterator(&dbname_, env_, *cfd->options(), cfd->user_comparator(),
mutable_iter, kMaxSequenceNumber);
std::vector<Iterator*> list;
super_version->imm->AddIterators(options, &list);
......@@ -2799,8 +2778,8 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
// create a DBIter that only uses memtable content; see NewIterator()
immutable_iter =
NewDBIterator(&dbname_, env_, *cfd->full_options(),
cfd->user_comparator(), immutable_iter, kMaxSequenceNumber);
NewDBIterator(&dbname_, env_, *cfd->options(), cfd->user_comparator(),
immutable_iter, kMaxSequenceNumber);
// register cleanups
mutable_iter->RegisterCleanup(CleanupIteratorState,
......@@ -2937,11 +2916,10 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// merge_operands will contain the sequence of merges in the latter case.
LookupKey lkey(key, snapshot);
BumpPerfTime(&perf_context.get_snapshot_time, &snapshot_timer);
if (sv->mem->Get(lkey, value, &s, merge_context, *cfd->full_options())) {
if (sv->mem->Get(lkey, value, &s, merge_context, *cfd->options())) {
// Done
RecordTick(options_.statistics.get(), MEMTABLE_HIT);
} else if (sv->imm->Get(lkey, value, &s, merge_context,
*cfd->full_options())) {
} else if (sv->imm->Get(lkey, value, &s, merge_context, *cfd->options())) {
// Done
RecordTick(options_.statistics.get(), MEMTABLE_HIT);
} else {
......@@ -2950,7 +2928,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
StartPerfTimer(&from_files_timer);
sv->current->Get(options, lkey, value, &s, &merge_context, &stats,
*cfd->full_options(), value_found);
*cfd->options(), value_found);
have_stat_update = true;
BumpPerfTime(&perf_context.get_from_output_files_time, &from_files_timer);
RecordTick(options_.statistics.get(), MEMTABLE_MISS);
......@@ -3072,14 +3050,14 @@ std::vector<Status> DBImpl::MultiGet(
auto super_version = mgd->super_version;
auto cfd = mgd->cfd;
if (super_version->mem->Get(lkey, value, &s, merge_context,
*cfd->full_options())) {
*cfd->options())) {
// Done
} else if (super_version->imm->Get(lkey, value, &s, merge_context,
*cfd->full_options())) {
*cfd->options())) {
// Done
} else {
super_version->current->Get(options, lkey, value, &s, &merge_context,
&mgd->stats, *cfd->full_options());
&mgd->stats, *cfd->options());
mgd->have_stat_update = true;
}
......@@ -3134,7 +3112,8 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
*handle = nullptr;
MutexLock l(&mutex_);
if (versions_->GetColumnFamilySet()->Exists(column_family_name)) {
if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) !=
nullptr) {
return Status::InvalidArgument("Column family already exists");
}
VersionEdit edit;
......@@ -3144,6 +3123,8 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
edit.SetLogNumber(logfile_number_);
edit.SetComparatorName(options.comparator->Name());
// LogAndApply will both write the creation in MANIFEST and create
// ColumnFamilyData object
Status s = versions_->LogAndApply(nullptr, &edit, &mutex_,
db_directory_.get(), false, &options);
if (s.ok()) {
......@@ -3184,9 +3165,10 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
}
if (s.ok()) {
assert(cfd->IsDropped());
Log(options_.info_log, "Dropped column family with id %u\n", cfd->GetID());
// Flush the memtables. This will make all WAL files referencing dropped
// column family to be obsolete. They will be deleted when user deletes
// column family to be obsolete. They will be deleted once user deletes
// column family handle
Write(WriteOptions(), nullptr); // ignore error
} else {
......@@ -3237,7 +3219,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot;
iter = NewDBIterator(&dbname_, env_, *cfd->full_options(),
iter = NewDBIterator(&dbname_, env_, *cfd->options(),
cfd->user_comparator(), iter, snapshot);
}
......@@ -3292,7 +3274,7 @@ Status DBImpl::NewIterators(
: latest_snapshot;
auto iter = NewInternalIterator(options, cfd, super_versions[i]);
iter = NewDBIterator(&dbname_, env_, *cfd->full_options(),
iter = NewDBIterator(&dbname_, env_, *cfd->options(),
cfd->user_comparator(), iter, snapshot);
iterators->push_back(iter);
}
......@@ -3364,9 +3346,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
autovector<ColumnFamilyData*> to_delete;
// refcounting cfd in iteration
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
cfd->Ref();
// May temporarily unlock and wait.
status = MakeRoomForWrite(cfd, my_batch == nullptr);
......@@ -3586,6 +3565,8 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
// Yield previous error
s = bg_error_;
break;
} else if (cfd->IsDropped()) {
break;
} else if (allow_delay && cfd->NeedSlowdownForNumLevel0Files()) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
......@@ -3786,7 +3767,7 @@ Env* DBImpl::GetEnv() const {
const Options& DBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
return *cfh->cfd()->full_options();
return *cfh->cfd()->options();
}
bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
......@@ -4058,24 +4039,15 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
VersionEdit edit;
impl->logfile_number_ = new_log_number;
impl->log_.reset(new log::Writer(std::move(lfile)));
// We use this LogAndApply just to store the next file number, the one
// that we used by calling impl->versions_->NewFileNumber()
// The used log number are already written to manifest in RecoverLogFile()
// method
s = impl->versions_->LogAndApply(impl->default_cf_handle_->cfd(), &edit,
&impl->mutex_,
impl->db_directory_.get());
}
if (s.ok()) {
// set column family handles
for (auto cf : column_families) {
if (!impl->versions_->GetColumnFamilySet()->Exists(cf.name)) {
auto cfd =
impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
if (cfd == nullptr) {
s = Status::InvalidArgument("Column family not found: ", cf.name);
break;
}
uint32_t id = impl->versions_->GetColumnFamilySet()->GetID(cf.name);
auto cfd = impl->versions_->GetColumnFamilySet()->GetColumnFamily(id);
assert(cfd != nullptr);
handles->push_back(
new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
}
......
......@@ -63,11 +63,11 @@ Status DBImplReadOnly::Get(const ReadOptions& options,
MergeContext merge_context;
LookupKey lkey(key, snapshot);
if (super_version->mem->Get(lkey, value, &s, merge_context,
*cfd->full_options())) {
*cfd->options())) {
} else {
Version::GetStats stats;
super_version->current->Get(options, lkey, value, &s, &merge_context,
&stats, *cfd->full_options());
&stats, *cfd->options());
}
return s;
}
......@@ -80,8 +80,7 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& options,
SequenceNumber latest_snapshot = versions_->LastSequence();
Iterator* internal_iter = NewInternalIterator(options, cfd, super_version);
return NewDBIterator(
&dbname_, env_, *cfd->full_options(), cfd->user_comparator(),
internal_iter,
&dbname_, env_, *cfd->options(), cfd->user_comparator(), internal_iter,
(options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot));
......
......@@ -29,7 +29,7 @@
namespace rocksdb {
MemTable::MemTable(const InternalKeyComparator& cmp,
const ColumnFamilyOptions& options)
const Options& options)
: comparator_(cmp),
refs_(0),
arena_(options.arena_block_size),
......
......@@ -39,7 +39,7 @@ class MemTable {
// MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once.
explicit MemTable(const InternalKeyComparator& comparator,
const ColumnFamilyOptions& options);
const Options& options);
~MemTable();
......
......@@ -108,7 +108,6 @@ class Repairer {
InternalKeyComparator const icmp_;
InternalFilterPolicy const ipolicy_;
Options const options_;
ColumnFamilyOptions const cf_options_;
std::shared_ptr<Cache> raw_table_cache_;
TableCache* table_cache_;
VersionEdit* edit_;
......
......@@ -249,7 +249,7 @@ bool Version::PrefixMayMatch(const ReadOptions& options,
Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
auto table_cache = cfd_->table_cache();
auto options = cfd_->full_options();
auto options = cfd_->options();
for (int level = 0; level < num_levels_; level++) {
for (const auto& file_meta : files_[level]) {
auto fname = TableFileName(vset_->dbname_, file_meta->number);
......@@ -1491,11 +1491,6 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
assert(column_family_data != nullptr || edit->is_column_family_add_);
if (column_family_data != nullptr && column_family_data->IsDropped()) {
// if column family is dropped no need to write anything to the manifest
// (unless, of course, thit is the drop column family write)
return Status::OK();
}
if (edit->is_column_family_drop_) {
// if we drop column family, we have to make sure to save max column family,
// so that we don't reuse existing ID
......@@ -1511,6 +1506,16 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
if (w.done) {
return w.status;
}
if (column_family_data != nullptr && column_family_data->IsDropped()) {
// if column family is dropped by the time we get here, no need to write
// anything to the manifest
manifest_writers_.pop_front();
// Notify new head of write queue
if (!manifest_writers_.empty()) {
manifest_writers_.front()->cv.Signal();
}
return Status::OK();
}
std::vector<VersionEdit*> batch_edits;
Version* v = nullptr;
......@@ -2353,9 +2358,6 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
// LogAndApply. Column family manipulations can only happen within LogAndApply
// (the same single thread), so we're safe
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
continue;
}
{
// Store column family info
VersionEdit edit;
......@@ -2401,19 +2403,18 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
}
}
// save max column family to avoid reusing the same column
// family ID for two different column families
if (column_family_set_->GetMaxColumnFamily() > 0) {
{
// persist max column family, last sequence and next file
VersionEdit edit;
edit.SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
if (column_family_set_->GetMaxColumnFamily() > 0) {
edit.SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
}
edit.SetLastSequence(last_sequence_);
edit.SetNextFile(next_file_number_);
std::string record;
edit.EncodeTo(&record);
Status s = log->AddRecord(record);
if (!s.ok()) {
return s;
}
return log->AddRecord(record);
}
return Status::OK();
}
// Opens the mainfest file and reads all records
......
......@@ -289,7 +289,7 @@ class MemTableInserter : public WriteBatch::Handler {
return seek_status;
}
MemTable* mem = cf_mems_->GetMemTable();
const Options* options = cf_mems_->GetFullOptions();
const Options* options = cf_mems_->GetOptions();
if (!options->inplace_update_support) {
mem->Add(sequence_, kTypeValue, key, value);
} else if (options->inplace_callback == nullptr) {
......@@ -344,7 +344,7 @@ class MemTableInserter : public WriteBatch::Handler {
return seek_status;
}
MemTable* mem = cf_mems_->GetMemTable();
const Options* options = cf_mems_->GetFullOptions();
const Options* options = cf_mems_->GetOptions();
bool perform_merge = false;
if (options->max_successive_merges > 0 && db_ != nullptr) {
......@@ -413,7 +413,7 @@ class MemTableInserter : public WriteBatch::Handler {
return seek_status;
}
MemTable* mem = cf_mems_->GetMemTable();
const Options* options = cf_mems_->GetFullOptions();
const Options* options = cf_mems_->GetOptions();
if (!dont_filter_deletes_ && options->filter_deletes) {
SnapshotImpl read_from_snapshot;
read_from_snapshot.number_ = sequence_;
......
......@@ -26,7 +26,7 @@ class ColumnFamilyMemTables {
// been processed)
virtual uint64_t GetLogNumber() const = 0;
virtual MemTable* GetMemTable() const = 0;
virtual const Options* GetFullOptions() const = 0;
virtual const Options* GetOptions() const = 0;
virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0;
};
......@@ -47,7 +47,7 @@ class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables {
return mem_;
}
const Options* GetFullOptions() const override {
const Options* GetOptions() const override {
assert(ok_);
return options_;
}
......
......@@ -24,7 +24,7 @@ static std::string PrintContents(WriteBatch* b) {
auto factory = std::make_shared<SkipListFactory>();
Options options;
options.memtable_factory = factory;
MemTable* mem = new MemTable(cmp, ColumnFamilyOptions(options));
MemTable* mem = new MemTable(cmp, options);
mem->Ref();
std::string state;
ColumnFamilyMemTablesDefault cf_mems_default(mem, &options);
......
......@@ -410,8 +410,7 @@ class MemTableConstructor: public Constructor {
table_factory_(new SkipListFactory) {
Options options;
options.memtable_factory = table_factory_;
memtable_ =
new MemTable(internal_comparator_, ColumnFamilyOptions(options));
memtable_ = new MemTable(internal_comparator_, options);
memtable_->Ref();
}
~MemTableConstructor() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册