提交 c65448f9 编写于 作者: Y Yueh-Hsuan Chiang

Merge branch 'master' of github.com:facebook/rocksdb into HEAD

......@@ -13,6 +13,10 @@ build_config.mk
*_bench
*_stress
*.out
*.class
*.jar
*.*jnilib*
*.d-e
ldb
manifest_dump
......@@ -23,3 +27,5 @@ coverage/COVERAGE_REPORT
.gdbhistory
.phutil_module_cache
tags
java/*.log
java/include/org_rocksdb_*.h
# Rocksdb Change Log
## Unreleased
## Unreleased (will be released in 3.0)
* Column family support
### Public API changes
## 2.8.0 (04/04/2014)
* Removed arena.h from public header files.
* By default, checksums are verified on every read from database
* Change default value of several options, including: paranoid_checks=true, max_open_files=5000, level0_slowdown_writes_trigger=20, level0_stop_writes_trigger=24, disable_seek_compaction=true, max_background_flushes=1 and allow_mmap_writes=false
* Added is_manual_compaction to CompactionFilter::Context
* Added "virtual void WaitForJoin()" in class Env. Default operation is no-op.
* Removed BackupEngine::DeleteBackupsNewerThan() function
......@@ -15,11 +19,18 @@
* Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools
* Added a command "checkconsistency" in ldb tool, which checks
if file system state matches DB state (file existence and file sizes)
* Separate options related to block based table to a new struct BlockBasedTableOptions
* WriteBatch has a new function Count() to return total size in the batch, and Data() now returns a reference instead of a copy
* Add more counters to perf context.
* Supports several more DB properties: compaction-pending, background-errors and cur-size-active-mem-table.
### New Features
* If we find one truncated record at the end of the MANIFEST or WAL files,
we will ignore it. We assume that writers of these records were interrupted
and that we can safely ignore it.
* A new SST format "PlainTable" is added, which is optimized for memory-only workloads. It can be created through NewPlainTableFactory() or NewTotalOrderPlainTableFactory().
* A new mem table implementation, hash linked list, optimized for the case where there are only a few keys for each prefix. It can be created through NewHashLinkListRepFactory().
* Merge operator supports a new function PartialMergeMulti() to allow users to do partial merges against multiple operands.
* Now compaction filter has a V2 interface. It buffers the kv-pairs sharing the same key prefix, processes them in batches, and returns the batched results back to DB. The new interface uses a new structure CompactionFilterContext for the same purpose as CompactionFilter::Context in V1.
* Geo-spatial support for locations and radial-search.
......
......@@ -67,6 +67,9 @@ libraries. You are on your own.
* Please note that some of the optimizations/features are disabled in OSX.
We did not run any production workloads on it.
* **iOS**:
* Run: `TARGET_OS=IOS make static_lib`
## Compilation
`make clean; make` will compile librocksdb.a (RocksDB static library) and all
the unit tests. You can run all unit tests with `make check`.
......
......@@ -23,6 +23,14 @@ $(shell (export ROCKSDB_ROOT=$(CURDIR); $(CURDIR)/build_tools/build_detect_platf
# this file is generated by the previous line to set build flags and sources
include build_config.mk
# Emit debug information (-g) for C and C++ on every platform except iOS.
ifneq ($(PLATFORM), IOS)
CFLAGS += -g
CXXFLAGS += -g
else
# No debug info for iOS: it would make the library too big. NDEBUG is defined
# instead, which also compiles out assert() calls.
OPT += -DNDEBUG
endif
# ASAN doesn't work well with jemalloc. If we're compiling with ASAN, we should use regular malloc.
ifdef COMPILE_WITH_ASAN
# ASAN compile flags
......@@ -37,8 +45,8 @@ else
endif
WARNING_FLAGS = -Wall -Werror -Wno-sign-compare
CFLAGS += -g $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
CXXFLAGS += -g $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -Woverloaded-virtual
CFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
CXXFLAGS += $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -Woverloaded-virtual
LDFLAGS += $(PLATFORM_LDFLAGS)
......@@ -57,6 +65,7 @@ TESTS = \
db_test \
block_hash_index_test \
autovector_test \
column_family_test \
table_properties_collector_test \
arena_test \
auto_roll_logger_test \
......@@ -148,11 +157,15 @@ $(SHARED3):
endif # PLATFORM_SHARED_EXT
.PHONY: blackbox_crash_test check clean coverage crash_test ldb_tests \
release tags valgrind_check whitebox_crash_test format shared_lib all \
release tags valgrind_check whitebox_crash_test format static_lib shared_lib all \
dbg
all: $(LIBRARY) $(PROGRAMS)
static_lib: $(LIBRARY)
shared_lib: $(SHARED)
dbg: $(LIBRARY) $(PROGRAMS)
# Will also generate shared libraries.
......@@ -218,8 +231,6 @@ tags:
format:
build_tools/format-diff.sh
shared_lib: $(SHARED)
# ---------------------------------------------------------------------------
# Unit tests and tools
# ---------------------------------------------------------------------------
......@@ -260,6 +271,9 @@ arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS)
autovector_test: util/autovector_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) util/autovector_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
column_family_test: db/column_family_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/column_family_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
table_properties_collector_test: db/table_properties_collector_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/table_properties_collector_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
......@@ -404,7 +418,7 @@ ldb: tools/ldb.o $(LIBOBJECTS)
# ---------------------------------------------------------------------------
JNI_NATIVE_SOURCES = ./java/rocksjni/rocksjni.cc ./java/rocksjni/options.cc ./java/rocksjni/write_batch.cc
JAVA_INCLUDE = -I/usr/lib/jvm/java-openjdk/include/ -I/usr/lib/jvm/java-openjdk/include/linux
JAVA_INCLUDE = -I/usr/lib/jvm/java-openjdk/include/ -I/usr/lib/jvm/java-openjdk/include/linux
ROCKSDBJNILIB = ./java/librocksdbjni.so
ifeq ($(PLATFORM), OS_MACOSX)
......@@ -435,20 +449,20 @@ ifeq ($(PLATFORM), IOS)
PLATFORMSROOT=/Applications/Xcode.app/Contents/Developer/Platforms
SIMULATORROOT=$(PLATFORMSROOT)/iPhoneSimulator.platform/Developer
DEVICEROOT=$(PLATFORMSROOT)/iPhoneOS.platform/Developer
IOSVERSION=$(shell defaults read $(PLATFORMSROOT)/iPhoneOS.platform/versionCFBundleShortVersionString)
IOSVERSION=$(shell defaults read $(PLATFORMSROOT)/iPhoneOS.platform/version CFBundleShortVersionString)
.cc.o:
mkdir -p ios-x86/$(dir $@)
$(SIMULATORROOT)/usr/bin/$(CXX) $(CXXFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -c $< -o ios-x86/$@ $(COVERAGEFLAGS)
$(CXX) $(CXXFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -arch x86_64 -c $< -o ios-x86/$@
mkdir -p ios-arm/$(dir $@)
$(DEVICEROOT)/usr/bin/$(CXX) $(CXXFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -c $< -o ios-arm/$@ $(COVERAGEFLAGS)
xcrun -sdk iphoneos $(CXX) $(CXXFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -arch armv7s -arch arm64 -c $< -o ios-arm/$@
lipo ios-x86/$@ ios-arm/$@ -create -output $@
.c.o:
mkdir -p ios-x86/$(dir $@)
$(SIMULATORROOT)/usr/bin/$(CC) $(CFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -c $< -o ios-x86/$@
$(CC) $(CFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -arch x86_64 -c $< -o ios-x86/$@
mkdir -p ios-arm/$(dir $@)
$(DEVICEROOT)/usr/bin/$(CC) $(CFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -c $< -o ios-arm/$@
xcrun -sdk iphoneos $(CC) $(CFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 -arch armv7s -arch arm64 -c $< -o ios-arm/$@
lipo ios-x86/$@ ios-arm/$@ -create -output $@
else
......
......@@ -87,7 +87,7 @@ PLATFORM_SHARED_CFLAGS="-fPIC"
PLATFORM_SHARED_VERSIONED=false
# generic port files (working on all platform by #ifdef) go directly in /port
GENERIC_PORT_FILES=`find $ROCKSDB_ROOT/port -name '*.cc' | tr "\n" " "`
GENERIC_PORT_FILES=`cd $ROCKSDB_ROOT; find port -name '*.cc' | tr "\n" " "`
# On GCC, we pick libc's memcmp over GCC's memcmp via -fno-builtin-memcmp
case "$TARGET_OS" in
......@@ -98,6 +98,13 @@ case "$TARGET_OS" in
PLATFORM_SHARED_LDFLAGS="-dynamiclib -install_name "
# PORT_FILES=port/darwin/darwin_specific.cc
;;
IOS)
PLATFORM=IOS
COMMON_FLAGS="$COMMON_FLAGS -DOS_MACOSX -DIOS_CROSS_COMPILE"
PLATFORM_SHARED_EXT=dylib
PLATFORM_SHARED_LDFLAGS="-dynamiclib -install_name "
CROSS_COMPILE=true
;;
Linux)
PLATFORM=OS_LINUX
COMMON_FLAGS="$COMMON_FLAGS -DOS_LINUX"
......
......@@ -25,12 +25,14 @@
#include "rocksdb/universal_compaction.h"
#include "rocksdb/statistics.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
using rocksdb::Cache;
using rocksdb::Comparator;
using rocksdb::CompressionType;
using rocksdb::DB;
using rocksdb::Env;
using rocksdb::InfoLogLevel;
using rocksdb::FileLock;
using rocksdb::FilterPolicy;
using rocksdb::FlushOptions;
......@@ -656,6 +658,11 @@ void rocksdb_options_set_info_log(rocksdb_options_t* opt, rocksdb_logger_t* l) {
}
}
// Sets the info log level of `opt` from the integer `v`.
// `v` is cast straight to InfoLogLevel with no range check, so the caller is
// responsible for passing a value that maps to a valid enumerator.
void rocksdb_options_set_info_log_level(
rocksdb_options_t* opt, int v) {
opt->rep.info_log_level = static_cast<InfoLogLevel>(v);
}
void rocksdb_options_set_write_buffer_size(rocksdb_options_t* opt, size_t s) {
opt->rep.write_buffer_size = s;
}
......@@ -714,6 +721,14 @@ void rocksdb_options_set_max_grandparent_overlap_factor(
opt->rep.max_grandparent_overlap_factor = n;
}
// Replaces max_bytes_for_level_multiplier_additional in `opt` with a copy of
// the first `num_levels` entries of `level_values`. The caller keeps ownership
// of the `level_values` array.
void rocksdb_options_set_max_bytes_for_level_multiplier_additional(
    rocksdb_options_t* opt, int* level_values, size_t num_levels) {
  auto& multipliers = opt->rep.max_bytes_for_level_multiplier_additional;
  multipliers.assign(level_values, level_values + num_levels);
}
void rocksdb_options_enable_statistics(rocksdb_options_t* opt) {
opt->rep.statistics = rocksdb::CreateDBStatistics();
}
......@@ -857,6 +872,24 @@ void rocksdb_options_set_advise_random_on_open(
opt->rep.advise_random_on_open = v;
}
// Maps the integer `v` onto the access hint used when a compaction starts:
//   0 -> NONE, 1 -> NORMAL, 2 -> SEQUENTIAL, 3 -> WILLNEED.
// Any other value leaves the option unchanged.
void rocksdb_options_set_access_hint_on_compaction_start(
    rocksdb_options_t* opt, int v) {
  auto& hint = opt->rep.access_hint_on_compaction_start;
  if (v == 0) {
    hint = rocksdb::Options::NONE;
  } else if (v == 1) {
    hint = rocksdb::Options::NORMAL;
  } else if (v == 2) {
    hint = rocksdb::Options::SEQUENTIAL;
  } else if (v == 3) {
    hint = rocksdb::Options::WILLNEED;
  }
}
void rocksdb_options_set_use_adaptive_mutex(
rocksdb_options_t* opt, unsigned char v) {
opt->rep.use_adaptive_mutex = v;
......@@ -867,6 +900,11 @@ void rocksdb_options_set_bytes_per_sync(
opt->rep.bytes_per_sync = v;
}
// Stores `v` into verify_checksums_in_compaction of `opt`.
// `v` is presumably a C-style boolean (0 = off, non-zero = on) -- confirm
// against the field's declared type.
void rocksdb_options_set_verify_checksums_in_compaction(
rocksdb_options_t* opt, unsigned char v) {
opt->rep.verify_checksums_in_compaction = v;
}
void rocksdb_options_set_filter_deletes(
rocksdb_options_t* opt, unsigned char v) {
opt->rep.filter_deletes = v;
......@@ -1003,11 +1041,48 @@ void rocksdb_options_set_hash_link_list_rep(
opt->rep.memtable_factory.reset(factory);
}
// Installs a PlainTable factory, built from the given parameters, as the
// table factory of `opt`.
//
// A fresh factory is created on every call and ownership is handed to
// opt->rep.table_factory. The previous implementation cached one raw pointer
// in a function-local static and passed it to reset() on each call. That had
// two defects: every options object took independent ownership of the same
// raw pointer (so it was deleted more than once), and the arguments of every
// call after the first were silently ignored.
void rocksdb_options_set_plain_table_factory(
    rocksdb_options_t *opt, uint32_t user_key_len, int bloom_bits_per_key,
    double hash_table_ratio, size_t index_sparseness) {
  opt->rep.table_factory.reset(rocksdb::NewPlainTableFactory(
      user_key_len, bloom_bits_per_key, hash_table_ratio, index_sparseness));
}
// Stores `v` into max_successive_merges of `opt`.
void rocksdb_options_set_max_successive_merges(
rocksdb_options_t* opt, size_t v) {
opt->rep.max_successive_merges = v;
}
// Stores `v` into min_partial_merge_operands of `opt`.
void rocksdb_options_set_min_partial_merge_operands(
rocksdb_options_t* opt, uint32_t v) {
opt->rep.min_partial_merge_operands = v;
}
// Stores `v` into bloom_locality of `opt`.
void rocksdb_options_set_bloom_locality(
rocksdb_options_t* opt, uint32_t v) {
opt->rep.bloom_locality = v;
}
// Stores `v` into allow_thread_local of `opt`.
// `v` is presumably a C-style boolean (0 = off, non-zero = on) -- confirm
// against the field's declared type.
void rocksdb_options_set_allow_thread_local(
rocksdb_options_t* opt, unsigned char v) {
opt->rep.allow_thread_local = v;
}
// Stores `v` into inplace_update_support of `opt`.
// `v` is presumably a C-style boolean (0 = off, non-zero = on) -- confirm
// against the field's declared type.
void rocksdb_options_set_inplace_update_support(
rocksdb_options_t* opt, unsigned char v) {
opt->rep.inplace_update_support = v;
}
// Stores `v` into inplace_update_num_locks of `opt`.
void rocksdb_options_set_inplace_update_num_locks(
rocksdb_options_t* opt, size_t v) {
opt->rep.inplace_update_num_locks = v;
}
// Sets the compaction style of `opt` from the integer `style`. The value is
// cast straight to rocksdb::CompactionStyle with no range check.
void rocksdb_options_set_compaction_style(rocksdb_options_t *opt, int style) {
opt->rep.compaction_style = static_cast<rocksdb::CompactionStyle>(style);
}
......@@ -1022,21 +1097,14 @@ DB::OpenForReadOnly
DB::MultiGet
DB::KeyMayExist
DB::GetOptions
DB::GetLiveFiles
DB::GetSortedWalFiles
DB::GetLatestSequenceNumber
DB::GetUpdatesSince
DB::DeleteFile
DB::GetDbIdentity
DB::RunManualCompaction
custom cache
compaction_filter
max_bytes_for_level_multiplier_additional
access_hint_on_compaction_start
table_factory
table_properties_collectors
inplace_update_support
inplace_update_num_locks
*/
rocksdb_comparator_t* rocksdb_comparator_create(
......
......@@ -443,6 +443,7 @@ int main(int argc, char** argv) {
rocksdb_options_set_filter_policy(options, policy);
rocksdb_options_set_prefix_extractor(options, rocksdb_slicetransform_create_fixed_prefix(3));
rocksdb_options_set_hash_skip_list_rep(options, 50000, 4, 4);
rocksdb_options_set_plain_table_factory(options, 4, 10, 0.75, 16);
db = rocksdb_open(options, dbname, &err);
CheckNoError(err);
......
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/column_family.h"
#include <vector>
#include <string>
#include <algorithm>
#include "db/db_impl.h"
#include "db/version_set.h"
#include "db/internal_stats.h"
#include "db/compaction_picker.h"
#include "db/table_properties_collector.h"
#include "util/autovector.h"
#include "util/hash_skiplist_rep.h"
namespace rocksdb {
// Wraps `cfd` in a client-facing handle. A non-null `cfd` gains one reference
// here; the destructor gives it back. `db` and `mutex` are stored for use by
// the destructor.
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(ColumnFamilyData* cfd,
                                               DBImpl* db, port::Mutex* mutex)
    : cfd_(cfd), db_(db), mutex_(mutex) {
  if (cfd_) {
    cfd_->Ref();
  }
}
// Releases the reference taken in the constructor. When that was the last
// reference the ColumnFamilyData is deleted. Obsolete files are collected
// while holding the mutex and purged after it has been released.
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ == nullptr) {
    // Handles without a column family own nothing.
    return;
  }

  DBImpl::DeletionState deletion_state;
  mutex_->Lock();
  const bool was_last_reference = cfd_->Unref();
  if (was_last_reference) {
    delete cfd_;
  }
  db_->FindObsoleteFiles(deletion_state, false, true);
  mutex_->Unlock();

  const bool needs_purge = deletion_state.HaveSomethingToDelete();
  if (needs_purge) {
    db_->PurgeObsoleteFiles(deletion_state);
  }
}
// Returns the ID of the column family reported by cfd(). cfd() is virtual, so
// a subclass may supply a different ColumnFamilyData than cfd_.
uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }
namespace {
// Clamps the value behind `ptr` into [minvalue, maxvalue]. The upper bound is
// applied first and the lower bound second, so the lower bound wins if the
// two bounds are inverted.
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  T& value = *ptr;
  if (static_cast<V>(value) > maxvalue) {
    value = maxvalue;
  }
  if (static_cast<V>(value) < minvalue) {
    value = minvalue;
  }
}
}  // anonymous namespace
// Returns a copy of `src` with values adjusted so that they are usable by the
// column family code:
//  - comparator is replaced by `icmp`; filter_policy is replaced by `ipolicy`
//    when the user supplied one, otherwise it stays nullptr;
//  - numeric options are clipped or reset to consistent values;
//  - a default block cache is created unless no_block_cache is set;
//  - hash-based memtable factories fall back to SkipListFactory when no
//    prefix extractor is configured;
//  - user table-properties collectors are wrapped and an internal-key
//    collector is appended.
ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
const InternalFilterPolicy* ipolicy,
const ColumnFamilyOptions& src) {
ColumnFamilyOptions result = src;
result.comparator = icmp;
result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
// Keep write_buffer_size within [64 << 10, 64 << 30] bytes.
ClipToRange(&result.write_buffer_size,
((size_t)64) << 10, ((size_t)64) << 30);
// if user sets arena_block_size, we trust user to use this value. Otherwise,
// calculate a proper value from write_buffer_size;
if (result.arena_block_size <= 0) {
result.arena_block_size = result.write_buffer_size / 10;
}
// Never require more memtables to merge than max_write_buffer_number - 1.
result.min_write_buffer_number_to_merge =
std::min(result.min_write_buffer_number_to_merge,
result.max_write_buffer_number - 1);
// Default block cache of 8 << 20 bytes unless caching was explicitly disabled.
if (result.block_cache == nullptr && !result.no_block_cache) {
result.block_cache = NewLRUCache(8 << 20);
}
// NOTE(review): `result` was copy-initialized from `src`, so this assignment
// looks redundant -- confirm before removing.
result.compression_per_level = src.compression_per_level;
// An out-of-range deviation (outside 0..100) is reset to 0.
if (result.block_size_deviation < 0 || result.block_size_deviation > 100) {
result.block_size_deviation = 0;
}
if (result.max_mem_compaction_level >= result.num_levels) {
result.max_mem_compaction_level = result.num_levels - 1;
}
// The soft rate limit may not exceed the hard rate limit.
if (result.soft_rate_limit > result.hard_rate_limit) {
result.soft_rate_limit = result.hard_rate_limit;
}
// Without a prefix extractor the hash-based memtable factories are replaced
// by the plain skip list factory.
if (!result.prefix_extractor) {
assert(result.memtable_factory);
Slice name = result.memtable_factory->Name();
if (name.compare("HashSkipListRepFactory") == 0 ||
name.compare("HashLinkListRepFactory") == 0) {
result.memtable_factory = std::make_shared<SkipListFactory>();
}
}
// -- Sanitize the table properties collector
// All user defined properties collectors will be wrapped by
// UserKeyTablePropertiesCollector since for them they only have the
// knowledge of the user keys; internal keys are invisible to them.
auto& collectors = result.table_properties_collectors;
for (size_t i = 0; i < result.table_properties_collectors.size(); ++i) {
assert(collectors[i]);
collectors[i] =
std::make_shared<UserKeyTablePropertiesCollector>(collectors[i]);
}
// Add collector to collect internal key statistics
collectors.push_back(std::make_shared<InternalKeyPropertiesCollector>());
return result;
}
int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;
// Frees every memtable that was queued in to_delete.
SuperVersion::~SuperVersion() {
  for (MemTable* memtable : to_delete) {
    delete memtable;
  }
}
SuperVersion* SuperVersion::Ref() {
refs.fetch_add(1, std::memory_order_relaxed);
return this;
}
// Drops one reference. Returns true when the reference that was dropped was
// the last one, i.e. the count went from 1 to 0.
bool SuperVersion::Unref() {
// fetch_sub returns the previous value of ref
uint32_t previous_refs = refs.fetch_sub(1, std::memory_order_relaxed);
assert(previous_refs > 0);
return previous_refs == 1;
}
// Releases the references this SuperVersion holds on imm, mem and current.
// Must only be called once the reference count has reached zero. Memtables
// that become unreferenced are collected in to_delete rather than deleted
// here; the destructor deletes them.
void SuperVersion::Cleanup() {
assert(refs.load(std::memory_order_relaxed) == 0);
imm->Unref(&to_delete);
// mem->Unref() returns the memtable if it should now be freed, else nullptr.
MemTable* m = mem->Unref();
if (m != nullptr) {
to_delete.push_back(m);
}
current->Unref();
}
// Points this SuperVersion at the given memtable, immutable memtable list
// version and Version, takes one reference on each of them, and sets this
// object's own reference count to 1.
void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
Version* new_current) {
mem = new_mem;
imm = new_imm;
current = new_current;
mem->Ref();
imm->Ref();
current->Ref();
refs.store(1, std::memory_order_relaxed);
}
namespace {
// Handler registered with the ThreadLocalPtr that caches SuperVersions.
// Drops the reference held through `ptr`; if it was the last one, the
// SuperVersion is cleaned up under its db_mutex and then deleted.
void SuperVersionUnrefHandle(void* ptr) {
// UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
// destroyed. When former happens, the thread shouldn't see kSVInUse.
// When latter happens, we are in ~ColumnFamilyData(), no get should happen as
// well.
SuperVersion* sv = static_cast<SuperVersion*>(ptr);
if (sv->Unref()) {
sv->db_mutex->Lock();
sv->Cleanup();
sv->db_mutex->Unlock();
delete sv;
}
}
} // anonymous namespace
// Constructs the state of one column family. The new object starts with one
// reference (see the Ref() call below). When `dummy_versions` is nullptr the
// object is a dummy column family and no internal stats, table cache or
// compaction picker are created.
// NOTE(review): imm_ and internal_stats_ are initialized from the caller's
// `options`, not from the sanitized `options_` -- confirm this is intended.
ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
const std::string& name,
Version* dummy_versions, Cache* table_cache,
const ColumnFamilyOptions& options,
const DBOptions* db_options,
const EnvOptions& storage_options,
ColumnFamilySet* column_family_set)
: id_(id),
name_(name),
dummy_versions_(dummy_versions),
current_(nullptr),
refs_(0),
dropped_(false),
internal_comparator_(options.comparator),
internal_filter_policy_(options.filter_policy),
options_(*db_options, SanitizeOptions(&internal_comparator_,
&internal_filter_policy_, options)),
mem_(nullptr),
imm_(options.min_write_buffer_number_to_merge),
super_version_(nullptr),
super_version_number_(0),
local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
next_(nullptr),
prev_(nullptr),
log_number_(0),
need_slowdown_for_num_level0_files_(false),
column_family_set_(column_family_set) {
Ref();
// if dummy_versions is nullptr, then this is a dummy column family.
if (dummy_versions != nullptr) {
internal_stats_.reset(new InternalStats(options.num_levels, db_options->env,
db_options->statistics.get()));
table_cache_.reset(
new TableCache(dbname, &options_, storage_options, table_cache));
// The compaction picker implementation follows the configured style.
if (options_.compaction_style == kCompactionStyleUniversal) {
compaction_picker_.reset(
new UniversalCompactionPicker(&options_, &internal_comparator_));
} else {
compaction_picker_.reset(
new LevelCompactionPicker(&options_, &internal_comparator_));
}
// Record the effective (sanitized) options in the info log.
Log(options_.info_log, "Options for column family \"%s\":\n",
name.c_str());
const ColumnFamilyOptions* cf_options = &options_;
cf_options->Dump(options_.info_log.get());
}
}
// DB mutex held
// Unlinks this column family from the linked list and from its owning
// ColumnFamilySet, then releases the current version, the super version, the
// dummy version list head and all memtables. Requires the reference count to
// be zero.
ColumnFamilyData::~ColumnFamilyData() {
assert(refs_ == 0);
// remove from linked list
auto prev = prev_;
auto next = next_;
prev->next_ = next;
next->prev_ = prev;
// it's nullptr for dummy CFD
if (column_family_set_ != nullptr) {
// remove from column_family_set
column_family_set_->RemoveColumnFamily(this);
}
if (current_ != nullptr) {
current_->Unref();
}
if (super_version_ != nullptr) {
// Release SuperVersion reference kept in ThreadLocalPtr.
// This must be done outside of mutex_ since unref handler can lock mutex.
super_version_->db_mutex->Unlock();
local_sv_.reset();
super_version_->db_mutex->Lock();
// Declared separately from its assignment so the unused attribute applies
// when assert() is compiled out.
bool is_last_reference __attribute__((unused));
is_last_reference = super_version_->Unref();
assert(is_last_reference);
super_version_->Cleanup();
delete super_version_;
super_version_ = nullptr;
}
if (dummy_versions_ != nullptr) {
// List must be empty
assert(dummy_versions_->next_ == dummy_versions_);
delete dummy_versions_;
}
if (mem_ != nullptr) {
// Deletes whatever MemTable::Unref() returns.
delete mem_->Unref();
}
// Drop the immutable memtable list and free the memtables it hands back.
autovector<MemTable*> to_delete;
imm_.current()->Unref(&to_delete);
for (MemTable* m : to_delete) {
delete m;
}
}
// Makes `current` the current version and recomputes whether writes need to
// be slowed down: the trigger must be non-negative and the number of level-0
// files in the new version must have reached it.
void ColumnFamilyData::SetCurrent(Version* current) {
  current_ = current;
  const auto slowdown_trigger = options_.level0_slowdown_writes_trigger;
  need_slowdown_for_num_level0_files_ =
      slowdown_trigger >= 0 &&
      current_->NumLevelFiles(0) >= slowdown_trigger;
}
// Replaces the active memtable with a freshly allocated one that holds one
// reference. The reference on the previous memtable (if any) is dropped and
// whatever MemTable::Unref() returns is deleted. Requires a current version.
void ColumnFamilyData::CreateNewMemtable() {
assert(current_ != nullptr);
if (mem_ != nullptr) {
delete mem_->Unref();
}
mem_ = new MemTable(internal_comparator_, options_);
mem_->Ref();
}
Compaction* ColumnFamilyData::PickCompaction(LogBuffer* log_buffer) {
return compaction_picker_->PickCompaction(current_, log_buffer);
}
Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level,
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end) {
return compaction_picker_->CompactRange(current_, input_level, output_level,
begin, end, compaction_end);
}
// Initializes `new_superversion` from the current memtable, immutable
// memtable list and version, and installs it as the active super version with
// the next version number.
// Returns the previous super version if this call dropped its last reference
// (it has already been cleaned up and the caller must delete it); otherwise
// returns nullptr.
SuperVersion* ColumnFamilyData::InstallSuperVersion(
SuperVersion* new_superversion, port::Mutex* db_mutex) {
new_superversion->db_mutex = db_mutex;
new_superversion->Init(mem_, imm_.current(), current_);
SuperVersion* old_superversion = super_version_;
super_version_ = new_superversion;
++super_version_number_;
super_version_->version_number = super_version_number_;
if (old_superversion != nullptr && old_superversion->Unref()) {
old_superversion->Cleanup();
return old_superversion; // will let caller delete outside of mutex
}
return nullptr;
}
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
autovector<void*> sv_ptrs;
local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
for (auto ptr : sv_ptrs) {
assert(ptr);
if (ptr == SuperVersion::kSVInUse) {
continue;
}
auto sv = static_cast<SuperVersion*>(ptr);
if (sv->Unref()) {
sv->Cleanup();
delete sv;
}
}
}
// Creates an empty column family set. The only ColumnFamilyData created here
// is the dummy one (id 0, empty name, no versions, no owning set) that serves
// as the head of the circular linked list of column families.
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const DBOptions* db_options,
                                 const EnvOptions& storage_options,
                                 Cache* table_cache)
    : max_column_family_(0),
      // Pass the constructor argument, not the storage_options_ member: the
      // member appears after dummy_cfd_ in this initializer list, so it may
      // not have been constructed yet when the dummy CFD is created.
      dummy_cfd_(new ColumnFamilyData(dbname, 0, "", nullptr, nullptr,
                                      ColumnFamilyOptions(), db_options,
                                      storage_options, nullptr)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      storage_options_(storage_options),
      table_cache_(table_cache),
      spin_lock_(ATOMIC_FLAG_INIT) {
  // initialize linked list: the dummy node points at itself in both directions
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}
// Unrefs and deletes every remaining column family, then the dummy one.
// The loop terminates because each deleted ColumnFamilyData removes its own
// entry from column_family_data_.
ColumnFamilySet::~ColumnFamilySet() {
while (column_family_data_.size() > 0) {
// cfd destructor will delete itself from column_family_data_
auto cfd = column_family_data_.begin()->second;
cfd->Unref();
delete cfd;
}
dummy_cfd_->Unref();
delete dummy_cfd_;
}
// Returns the cached pointer to the default column family (the one created
// with id 0). It must already have been created.
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
assert(default_cfd_cache_ != nullptr);
return default_cfd_cache_;
}
// Looks up a column family by ID. Returns nullptr when the ID is unknown.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  auto entry = column_family_data_.find(id);
  return entry == column_family_data_.end() ? nullptr : entry->second;
}
// Looks up a column family by name. Returns nullptr when the name is unknown.
// A known name must always resolve to a live ColumnFamilyData.
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  auto name_entry = column_families_.find(name);
  if (name_entry == column_families_.end()) {
    return nullptr;
  }
  ColumnFamilyData* found = GetColumnFamily(name_entry->second);
  assert(found != nullptr);
  return found;
}
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
return ++max_column_family_;
}
uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }
void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
max_column_family_ = std::max(new_max_column_family, max_column_family_);
}
// under a DB mutex
// Creates a ColumnFamilyData for (`name`, `id`), registers it in both lookup
// maps, appends it to the tail of the linked list and returns it. `name` must
// not already be registered. A column family with id 0 is also remembered as
// the default column family.
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
const std::string& name, uint32_t id, Version* dummy_versions,
const ColumnFamilyOptions& options) {
assert(column_families_.find(name) == column_families_.end());
ColumnFamilyData* new_cfd =
new ColumnFamilyData(db_name_, id, name, dummy_versions, table_cache_,
options, db_options_, storage_options_, this);
// The maps are modified under the spin lock as well; Seek() reads them while
// holding only that lock.
Lock();
column_families_.insert({name, id});
column_family_data_.insert({id, new_cfd});
Unlock();
max_column_family_ = std::max(max_column_family_, id);
// add to linked list
new_cfd->next_ = dummy_cfd_;
auto prev = dummy_cfd_->prev_;
new_cfd->prev_ = prev;
prev->next_ = new_cfd;
dummy_cfd_->prev_ = new_cfd;
if (id == 0) {
default_cfd_cache_ = new_cfd;
}
return new_cfd;
}
// Acquires the spin lock, busy-waiting until the flag could be set.
void ColumnFamilySet::Lock() {
// spin lock
while (spin_lock_.test_and_set(std::memory_order_acquire)) {
}
}
// Releases the spin lock taken by Lock().
void ColumnFamilySet::Unlock() { spin_lock_.clear(std::memory_order_release); }
// REQUIRES: DB mutex held
// Deletes every column family in the linked list whose reference count is
// zero. Candidates are collected first because deleting a ColumnFamilyData
// unlinks it from the list being walked.
void ColumnFamilySet::FreeDeadColumnFamilies() {
autovector<ColumnFamilyData*> to_delete;
for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
if (cfd->refs_ == 0) {
to_delete.push_back(cfd);
}
}
for (auto cfd : to_delete) {
// this is very rare, so it's not a problem that we do it under a mutex
delete cfd;
}
}
// under a DB mutex
// Removes `cfd` from both lookup maps. `cfd` must be registered. The lookup
// happens before the spin lock is taken; only the erasures are done under it.
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
auto cfd_iter = column_family_data_.find(cfd->GetID());
assert(cfd_iter != column_family_data_.end());
Lock();
column_family_data_.erase(cfd_iter);
column_families_.erase(cfd->GetName());
Unlock();
}
// Positions this object on the column family with the given ID and points the
// internal handle at it. Returns false when no such column family exists.
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
  if (column_family_id != 0) {
    // maybe outside of db mutex, should lock
    column_family_set_->Lock();
    current_ = column_family_set_->GetColumnFamily(column_family_id);
    column_family_set_->Unlock();
  } else {
    // optimization for common case: the default column family is cached
    current_ = column_family_set_->GetDefault();
  }
  handle_.SetCFD(current_);
  return current_ != nullptr;
}
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
assert(current_ != nullptr);
return current_->GetLogNumber();
}
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
assert(current_ != nullptr);
return current_->mem();
}
const Options* ColumnFamilyMemTablesImpl::GetOptions() const {
assert(current_ != nullptr);
return current_->options();
}
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
assert(current_ != nullptr);
return &handle_;
}
} // namespace rocksdb
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <unordered_map>
#include <string>
#include <vector>
#include <atomic>
#include "rocksdb/options.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "db/memtable_list.h"
#include "db/write_batch_internal.h"
#include "db/table_cache.h"
#include "util/thread_local.h"
namespace rocksdb {
class Version;
class VersionSet;
class MemTable;
class MemTableListVersion;
class CompactionPicker;
class Compaction;
class InternalKey;
class InternalStats;
class ColumnFamilyData;
class DBImpl;
class LogBuffer;
// ColumnFamilyHandleImpl is the class that clients use to access different
// column families. It has a non-trivial destructor, which gets called when the
// client is done using the column family.
class ColumnFamilyHandleImpl : public ColumnFamilyHandle {
public:
// create while holding the mutex
ColumnFamilyHandleImpl(ColumnFamilyData* cfd, DBImpl* db, port::Mutex* mutex);
// destroy without mutex
virtual ~ColumnFamilyHandleImpl();
// Column family this handle refers to; virtual so subclasses can override it.
virtual ColumnFamilyData* cfd() const { return cfd_; }
// ID of the column family returned by cfd().
virtual uint32_t GetID() const;
private:
ColumnFamilyData* cfd_;
DBImpl* db_;
port::Mutex* mutex_;
};
// Does not ref-count ColumnFamilyData
// We use this dummy ColumnFamilyHandleImpl because sometimes MemTableInserter
// calls DBImpl methods. When this happens, MemTableInserter need access to
// ColumnFamilyHandle (same as the client would need). In that case, we feed
// MemTableInserter dummy ColumnFamilyHandle and enable it to call DBImpl
// methods
class ColumnFamilyHandleInternal : public ColumnFamilyHandleImpl {
public:
ColumnFamilyHandleInternal()
: ColumnFamilyHandleImpl(nullptr, nullptr, nullptr) {}
void SetCFD(ColumnFamilyData* cfd) { internal_cfd_ = cfd; }
virtual ColumnFamilyData* cfd() const override { return internal_cfd_; }
private:
ColumnFamilyData* internal_cfd_;
};
// holds references to memtable, all immutable memtables and version
struct SuperVersion {
MemTable* mem;
MemTableListVersion* imm;
Version* current;
// Reference count of this SuperVersion; set to 1 by Init().
std::atomic<uint32_t> refs;
// We need to_delete because during Cleanup(), imm->Unref() returns
// all memtables that we need to free through this vector. We then
// delete all those memtables outside of mutex, during destruction
autovector<MemTable*> to_delete;
// Version number of the current SuperVersion
uint64_t version_number;
// DB mutex associated with this SuperVersion; assigned by
// ColumnFamilyData::InstallSuperVersion().
port::Mutex* db_mutex;
// should be called outside the mutex
SuperVersion() = default;
~SuperVersion();
// Adds a reference and returns this.
SuperVersion* Ref();
// Drops a reference; returns true if it was the last one.
bool Unref();
// call these two methods with db mutex held
// Cleanup unrefs mem, imm and current. Also, it stores all memtables
// that needs to be deleted in to_delete vector. Unrefing those
// objects needs to be done in the mutex
void Cleanup();
void Init(MemTable* new_mem, MemTableListVersion* new_imm,
Version* new_current);
// The value of dummy is not actually used. kSVInUse takes its address as a
// mark in the thread local storage to indicate the SuperVersion is in use
// by thread. This way, the value of kSVInUse is guaranteed to have no
// conflict with SuperVersion object address and portable on different
// platform.
static int dummy;
static void* const kSVInUse;
static void* const kSVObsolete;
};
extern ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
const InternalFilterPolicy* ipolicy,
const ColumnFamilyOptions& src);
class ColumnFamilySet;
// This class keeps all the data that a column family needs. It's mostly dumb and
// used just to provide access to metadata.
// Most methods require DB mutex held, unless otherwise noted
class ColumnFamilyData {
public:
~ColumnFamilyData();
// thread-safe
uint32_t GetID() const { return id_; }
// thread-safe
const std::string& GetName() const { return name_; }
void Ref() { ++refs_; }
// will just decrease reference count to 0, but will not delete it. returns
// true if the ref count was decreased to zero. in that case, it can be
// deleted by the caller immediately, or later, by calling
// FreeDeadColumnFamilies()
bool Unref() {
assert(refs_ > 0);
return --refs_ == 0;
}
// This can only be called from single-threaded VersionSet::LogAndApply()
// After dropping column family no other operation on that column family
// will be executed. All the files and memory will be, however, kept around
// until client drops the column family handle. That way, client can still
// access data from dropped column family.
// Column family can be dropped and still alive. In that state:
// *) Column family is not included in the iteration.
// *) Compaction and flush is not executed on the dropped column family.
// *) Client can continue writing and reading from column family. However, all
// writes stay in the current memtable.
// When the dropped column family is unreferenced, then we:
// *) delete all memory associated with that column family
// *) delete all the files associated with that column family
void SetDropped() {
// can't drop default CF
assert(id_ != 0);
dropped_ = true;
}
bool IsDropped() const { return dropped_; }
// thread-safe
int NumberLevels() const { return options_.num_levels; }
void SetLogNumber(uint64_t log_number) { log_number_ = log_number; }
uint64_t GetLogNumber() const { return log_number_; }
// thread-safe
const Options* options() const { return &options_; }
InternalStats* internal_stats() { return internal_stats_.get(); }
MemTableList* imm() { return &imm_; }
MemTable* mem() { return mem_; }
Version* current() { return current_; }
Version* dummy_versions() { return dummy_versions_; }
void SetMemtable(MemTable* new_mem) { mem_ = new_mem; }
void SetCurrent(Version* current);
void CreateNewMemtable();
TableCache* table_cache() { return table_cache_.get(); }
// See documentation in compaction_picker.h
Compaction* PickCompaction(LogBuffer* log_buffer);
Compaction* CompactRange(int input_level, int output_level,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end);
CompactionPicker* compaction_picker() { return compaction_picker_.get(); }
// thread-safe
const Comparator* user_comparator() const {
return internal_comparator_.user_comparator();
}
// thread-safe
const InternalKeyComparator& internal_comparator() const {
return internal_comparator_;
}
SuperVersion* GetSuperVersion() { return super_version_; }
// thread-safe
ThreadLocalPtr* GetThreadLocalSuperVersion() const { return local_sv_.get(); }
// thread-safe
uint64_t GetSuperVersionNumber() const {
return super_version_number_.load();
}
// will return a pointer to SuperVersion* if previous SuperVersion
// if its reference count is zero and needs deletion or nullptr if not
// As argument takes a pointer to allocated SuperVersion to enable
// the clients to allocate SuperVersion outside of mutex.
SuperVersion* InstallSuperVersion(SuperVersion* new_superversion,
port::Mutex* db_mutex);
void ResetThreadLocalSuperVersions();
// A Flag indicating whether write needs to slowdown because of there are
// too many number of level0 files.
bool NeedSlowdownForNumLevel0Files() const {
return need_slowdown_for_num_level0_files_;
}
private:
friend class ColumnFamilySet;
ColumnFamilyData(const std::string& dbname, uint32_t id,
const std::string& name, Version* dummy_versions,
Cache* table_cache, const ColumnFamilyOptions& options,
const DBOptions* db_options,
const EnvOptions& storage_options,
ColumnFamilySet* column_family_set);
uint32_t id_;
const std::string name_;
Version* dummy_versions_; // Head of circular doubly-linked list of versions.
Version* current_; // == dummy_versions->prev_
int refs_; // outstanding references to ColumnFamilyData
bool dropped_; // true if client dropped it
const InternalKeyComparator internal_comparator_;
const InternalFilterPolicy internal_filter_policy_;
Options const options_;
std::unique_ptr<TableCache> table_cache_;
std::unique_ptr<InternalStats> internal_stats_;
MemTable* mem_;
MemTableList imm_;
SuperVersion* super_version_;
// An ordinal representing the current SuperVersion. Updated by
// InstallSuperVersion(), i.e. incremented every time super_version_
// changes.
std::atomic<uint64_t> super_version_number_;
// Thread's local copy of SuperVersion pointer
// This needs to be destructed before mutex_
std::unique_ptr<ThreadLocalPtr> local_sv_;
// pointers for a circular linked list. we use it to support iterations
// that can be concurrent with writes
ColumnFamilyData* next_;
ColumnFamilyData* prev_;
// This is the earliest log file number that contains data from this
// Column Family. All earlier log files must be ignored and not
// recovered from
uint64_t log_number_;
// A flag indicating whether we should delay writes because
// we have too many level 0 files
bool need_slowdown_for_num_level0_files_;
// An object that keeps all the compaction stats
// and picks the next compaction
std::unique_ptr<CompactionPicker> compaction_picker_;
ColumnFamilySet* column_family_set_;
};
// ColumnFamilySet has interesting thread-safety requirements
// * CreateColumnFamily() or RemoveColumnFamily() -- need to be protected by DB
// mutex. Inside, column_family_data_ and column_families_ will be protected
// by Lock() and Unlock(). CreateColumnFamily() should ONLY be called from
// VersionSet::LogAndApply() in the normal runtime. It is also called
// during Recovery and in DumpManifest(). RemoveColumnFamily() is called
// from ColumnFamilyData destructor
// * Iteration -- hold DB mutex, but you can release it in the body of
// iteration. If you release DB mutex in body, reference the column
// family before the mutex and unreference after you unlock, since the column
// family might get dropped when the DB mutex is released
// * GetDefault() -- thread safe
// * GetColumnFamily() -- either inside of DB mutex or call Lock() <-> Unlock()
// * GetNextColumnFamilyID(), GetMaxColumnFamily(), UpdateMaxColumnFamily() --
// inside of DB mutex
class ColumnFamilySet {
 public:
  // ColumnFamilySet supports iteration.
  // The iterator walks the circular list of ColumnFamilyData through next_,
  // skipping every entry whose reference count is zero or that is dropped.
  class iterator {
   public:
    explicit iterator(ColumnFamilyData* cfd)
        : current_(cfd) {}
    // Advances to the next column family that is referenced and not dropped.
    iterator& operator++() {
      // dummy is never dead or dropped, so this will never be infinite
      do {
        current_ = current_->next_;
      } while (current_->refs_ == 0 || current_->IsDropped());
      return *this;
    }
    // Comparison and dereference do not modify the iterator, so they are
    // const-qualified; this also lets const iterators be compared.
    bool operator!=(const iterator& other) const {
      return this->current_ != other.current_;
    }
    ColumnFamilyData* operator*() const { return current_; }

   private:
    ColumnFamilyData* current_;
  };

  ColumnFamilySet(const std::string& dbname, const DBOptions* db_options,
                  const EnvOptions& storage_options, Cache* table_cache);
  ~ColumnFamilySet();

  ColumnFamilyData* GetDefault() const;
  // GetColumnFamily() calls return nullptr if column family is not found
  ColumnFamilyData* GetColumnFamily(uint32_t id) const;
  ColumnFamilyData* GetColumnFamily(const std::string& name) const;
  // this call will return the next available column family ID. it guarantees
  // that there is no column family with id greater than or equal to the
  // returned value in the current running instance or anytime in RocksDB
  // instance history.
  uint32_t GetNextColumnFamilyID();
  uint32_t GetMaxColumnFamily();
  void UpdateMaxColumnFamily(uint32_t new_max_column_family);

  ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id,
                                       Version* dummy_version,
                                       const ColumnFamilyOptions& options);

  // begin() starts at the element following the dummy head; end() is the
  // dummy head itself.
  // NOTE(review): unlike operator++, begin() does not skip a first element
  // that is dropped or has zero references -- confirm the first element can
  // never be in that state.
  iterator begin() { return iterator(dummy_cfd_->next_); }
  iterator end() { return iterator(dummy_cfd_); }

  // Presumably acquire/release spin_lock_ (see the locking rules documented
  // on column_families_ below); the implementation is not visible here.
  void Lock();
  void Unlock();

  // REQUIRES: DB mutex held
  // Don't call while iterating over ColumnFamilySet
  void FreeDeadColumnFamilies();

 private:
  friend class ColumnFamilyData;
  // helper function that gets called from cfd destructor
  // REQUIRES: DB mutex held
  void RemoveColumnFamily(ColumnFamilyData* cfd);

  // column_families_ and column_family_data_ need to be protected:
  // * when mutating: 1. DB mutex locked first, 2. spinlock locked second
  // * when reading, either: 1. lock DB mutex, or 2. lock spinlock
  //  (if both, respect the ordering to avoid deadlock!)
  std::unordered_map<std::string, uint32_t> column_families_;
  std::unordered_map<uint32_t, ColumnFamilyData*> column_family_data_;

  uint32_t max_column_family_;
  // Head of the circular list that iteration walks.
  ColumnFamilyData* dummy_cfd_;
  // We don't hold the refcount here, since default column family always exists
  // We are also not responsible for cleaning up default_cfd_cache_. This is
  // just a cache that makes common case (accessing default column family)
  // faster
  ColumnFamilyData* default_cfd_cache_;

  const std::string db_name_;
  const DBOptions* const db_options_;
  const EnvOptions storage_options_;
  Cache* table_cache_;
  std::atomic_flag spin_lock_;
};
// We use ColumnFamilyMemTablesImpl to provide WriteBatch a way to access
// memtables of different column families (specified by ID in the write batch)
class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
 public:
  // Starts with no column family selected (current_ is nullptr).
  explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set)
      : column_family_set_(column_family_set),
        current_(nullptr) {
  }

  // Selects the ColumnFamilyData identified by column_family_id and stores it
  // in current_. Returns false if that column family doesn't exist.
  bool Seek(uint32_t column_family_id) override;

  // Log number of the selected column family.
  uint64_t GetLogNumber() const override;

  // Memtable of the selected column family.
  // REQUIRES: Seek() called first
  MemTable* GetMemTable() const override;

  // Options of the selected column family.
  // REQUIRES: Seek() called first
  const Options* GetOptions() const override;

  // Column family handle for the selected column family.
  ColumnFamilyHandle* GetColumnFamilyHandle() override;

 private:
  ColumnFamilySet* column_family_set_;
  ColumnFamilyData* current_;
  ColumnFamilyHandleInternal handle_;
};
} // namespace rocksdb
此差异已折叠。
......@@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/compaction.h"
#include "db/column_family.h"
namespace rocksdb {
......@@ -29,6 +30,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level,
max_grandparent_overlap_bytes_(max_grandparent_overlap_bytes),
input_version_(input_version),
number_levels_(input_version_->NumberLevels()),
cfd_(input_version_->cfd_),
seek_compaction_(seek_compaction),
enable_compression_(enable_compression),
grandparent_index_(0),
......@@ -42,8 +44,10 @@ Compaction::Compaction(Version* input_version, int level, int out_level,
is_manual_compaction_(false),
level_ptrs_(std::vector<size_t>(number_levels_)) {
cfd_->Ref();
input_version_->Ref();
edit_ = new VersionEdit();
edit_->SetColumnFamily(cfd_->GetID());
for (int i = 0; i < number_levels_; i++) {
level_ptrs_[i] = 0;
}
......@@ -54,6 +58,11 @@ Compaction::~Compaction() {
if (input_version_ != nullptr) {
input_version_->Unref();
}
if (cfd_ != nullptr) {
if (cfd_->Unref()) {
delete cfd_;
}
}
}
bool Compaction::IsTrivialMove() const {
......@@ -77,12 +86,11 @@ void Compaction::AddInputDeletions(VersionEdit* edit) {
}
bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
if (input_version_->vset_->options_->compaction_style ==
kCompactionStyleUniversal) {
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
return bottommost_level_;
}
// Maybe use binary search to find right entry instead of linear search?
const Comparator* user_cmp = input_version_->vset_->icmp_.user_comparator();
const Comparator* user_cmp = cfd_->user_comparator();
for (int lvl = level_ + 2; lvl < number_levels_; lvl++) {
const std::vector<FileMetaData*>& files = input_version_->files_[lvl];
for (; level_ptrs_[lvl] < files.size(); ) {
......@@ -103,7 +111,7 @@ bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
bool Compaction::ShouldStopBefore(const Slice& internal_key) {
// Scan to find earliest grandparent file that contains key.
const InternalKeyComparator* icmp = &input_version_->vset_->icmp_;
const InternalKeyComparator* icmp = &cfd_->internal_comparator();
while (grandparent_index_ < grandparents_.size() &&
icmp->Compare(internal_key,
grandparents_[grandparent_index_]->largest.Encode()) > 0) {
......@@ -141,8 +149,7 @@ void Compaction::MarkFilesBeingCompacted(bool value) {
// Is this compaction producing files at the bottommost level?
void Compaction::SetupBottomMostLevel(bool isManual) {
if (input_version_->vset_->options_->compaction_style ==
kCompactionStyleUniversal) {
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
// If universal compaction style is used and manual
// compaction is occurring, then we are guaranteed that
// all files will be picked in a single compaction
......@@ -155,8 +162,7 @@ void Compaction::SetupBottomMostLevel(bool isManual) {
return;
}
bottommost_level_ = true;
int num_levels = input_version_->vset_->NumberLevels();
for (int i = output_level() + 1; i < num_levels; i++) {
for (int i = output_level() + 1; i < number_levels_; i++) {
if (input_version_->NumLevelFiles(i) > 0) {
bottommost_level_ = false;
break;
......@@ -169,6 +175,16 @@ void Compaction::ReleaseInputs() {
input_version_->Unref();
input_version_ = nullptr;
}
if (cfd_ != nullptr) {
if (cfd_->Unref()) {
delete cfd_;
}
cfd_ = nullptr;
}
}
// Hands this compaction back to the compaction picker of its column family,
// forwarding the given status. cfd_ must still be set when this is called:
// ReleaseInputs() resets it to nullptr.
void Compaction::ReleaseCompactionFiles(Status status) {
  auto* picker = cfd_->compaction_picker();
  picker->ReleaseCompactionFiles(this, status);
}
void Compaction::ResetNextCompactionIndex() {
......
......@@ -13,6 +13,7 @@
namespace rocksdb {
class Version;
class ColumnFamilyData;
// A Compaction encapsulates information about a compaction.
class Compaction {
......@@ -36,6 +37,8 @@ class Compaction {
// Returns input version of the compaction
Version* input_version() const { return input_version_; }
ColumnFamilyData* column_family_data() const { return cfd_; }
// Return the ith input file at "level()+which" ("which" must be 0 or 1).
FileMetaData* input(int which, int i) const { return inputs_[which][i]; }
......@@ -67,6 +70,10 @@ class Compaction {
// is successful.
void ReleaseInputs();
// Clear all files to indicate that they are not being compacted
// Delete this compaction from the list of running compactions.
void ReleaseCompactionFiles(Status status);
void Summary(char* output, int len);
// Return the score that was used to pick this compaction run.
......@@ -97,6 +104,7 @@ class Compaction {
Version* input_version_;
VersionEdit* edit_;
int number_levels_;
ColumnFamilyData* cfd_;
bool seek_compaction_;
bool enable_compression_;
......
......@@ -277,14 +277,10 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) {
Log(options_->info_log,
"Expanding@%lu %lu+%lu (%lu+%lu bytes) to %lu+%lu (%lu+%lu bytes)"
"\n",
(unsigned long)level,
(unsigned long)(c->inputs_[0].size()),
(unsigned long)(c->inputs_[1].size()),
(unsigned long)inputs0_size,
(unsigned long)inputs1_size,
(unsigned long)(expanded0.size()),
(unsigned long)(expanded1.size()),
(unsigned long)expanded0_size,
(unsigned long)level, (unsigned long)(c->inputs_[0].size()),
(unsigned long)(c->inputs_[1].size()), (unsigned long)inputs0_size,
(unsigned long)inputs1_size, (unsigned long)(expanded0.size()),
(unsigned long)(expanded1.size()), (unsigned long)expanded0_size,
(unsigned long)inputs1_size);
smallest = new_start;
largest = new_limit;
......@@ -587,7 +583,7 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
options_->level0_file_num_compaction_trigger;
if ((c = PickCompactionUniversalReadAmp(
version, score, UINT_MAX, num_files, log_buffer)) != nullptr) {
Log(options_->info_log, "Universal: compacting for file num\n");
LogToBuffer(log_buffer, "Universal: compacting for file num\n");
}
}
}
......@@ -653,7 +649,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
FileMetaData* f = nullptr;
bool done = false;
int start_index = 0;
unsigned int candidate_count;
unsigned int candidate_count = 0;
assert(file_by_time.size() == version->files_[level].size());
unsigned int max_files_to_compact = std::min(max_merge_width,
......
......@@ -12,6 +12,7 @@
#include "db/compaction.h"
#include "rocksdb/status.h"
#include "rocksdb/options.h"
#include "rocksdb/env.h"
#include <vector>
#include <memory>
......@@ -118,6 +119,7 @@ class CompactionPicker {
std::unique_ptr<uint64_t[]> level_max_bytes_;
const Options* const options_;
private:
int num_levels_;
......
此差异已折叠。
......@@ -7,6 +7,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <algorithm>
#include <string>
#include <stdint.h>
......@@ -17,6 +19,7 @@
#include "rocksdb/env.h"
#include "port/port.h"
#include "util/mutexlock.h"
#include "util/sync_point.h"
namespace rocksdb {
......@@ -60,21 +63,36 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
*manifest_file_size = 0;
mutex_.Lock();
if (flush_memtable) {
// flush all dirty data to disk.
Status status = Flush(FlushOptions());
Status status;
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->Ref();
mutex_.Unlock();
status = FlushMemTable(cfd, FlushOptions());
mutex_.Lock();
cfd->Unref();
if (!status.ok()) {
break;
}
}
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
if (!status.ok()) {
mutex_.Unlock();
Log(options_.info_log, "Cannot Flush data %s\n",
status.ToString().c_str());
return status;
}
}
MutexLock l(&mutex_);
// Make a set of all of the live *.sst files
std::set<uint64_t> live;
versions_->current()->AddLiveFiles(&live);
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->current()->AddLiveFiles(&live);
}
ret.clear();
ret.reserve(live.size() + 2); //*.sst + CURRENT + MANIFEST
......@@ -91,24 +109,60 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
// find length of manifest file while holding the mutex lock
*manifest_file_size = versions_->ManifestFileSize();
mutex_.Unlock();
return Status::OK();
}
Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) {
// First get sorted files in archive dir, then append sorted files from main
// dir to maintain sorted order
// First get sorted files in db dir, then get sorted files from archived
// dir, to avoid a race condition where a log file is moved to archived
// dir in between.
Status s;
// list wal files in main db dir.
VectorLogPtr logs;
s = GetSortedWalsOfType(options_.wal_dir, logs, kAliveLogFile);
if (!s.ok()) {
return s;
}
// Reproduce the race condition where a log file is moved
// to archived dir, between these two sync points, used in
// (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("DBImpl::GetSortedWalFiles:1");
TEST_SYNC_POINT("DBImpl::GetSortedWalFiles:2");
files.clear();
// list wal files in archive dir.
Status s;
std::string archivedir = ArchivalDirectory(options_.wal_dir);
if (env_->FileExists(archivedir)) {
s = AppendSortedWalsOfType(archivedir, files, kArchivedLogFile);
s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile);
if (!s.ok()) {
return s;
}
}
// list wal files in main db dir.
return AppendSortedWalsOfType(options_.wal_dir, files, kAliveLogFile);
uint64_t latest_archived_log_number = 0;
if (!files.empty()) {
latest_archived_log_number = files.back()->LogNumber();
Log(options_.info_log, "Latest Archived log: %" PRIu64,
latest_archived_log_number);
}
files.reserve(files.size() + logs.size());
for (auto& log : logs) {
if (log->LogNumber() > latest_archived_log_number) {
files.push_back(std::move(log));
} else {
// When the race condition happens, we could see the
// same log in both db dir and archived dir. Simply
// ignore the one in db dir. Note that, if we read
// archived dir first, we would have missed the log file.
Log(options_.info_log, "%s already moved to archive",
log->PathName().c_str());
}
}
return s;
}
}
此差异已折叠。
此差异已折叠。
......@@ -42,8 +42,8 @@
namespace rocksdb {
DBImplReadOnly::DBImplReadOnly(const Options& options,
const std::string& dbname)
DBImplReadOnly::DBImplReadOnly(const DBOptions& options,
const std::string& dbname)
: DBImpl(options, dbname) {
Log(options_.info_log, "Opening the db in read only mode");
}
......@@ -53,42 +53,57 @@ DBImplReadOnly::~DBImplReadOnly() {
// Implementations of the DB interface
Status DBImplReadOnly::Get(const ReadOptions& options,
const Slice& key,
std::string* value) {
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) {
Status s;
SequenceNumber snapshot = versions_->LastSequence();
SuperVersion* super_version = GetSuperVersion();
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
SuperVersion* super_version = cfd->GetSuperVersion();
MergeContext merge_context;
LookupKey lkey(key, snapshot);
if (super_version->mem->Get(lkey, value, &s, merge_context, options_)) {
if (super_version->mem->Get(lkey, value, &s, merge_context,
*cfd->options())) {
} else {
Version::GetStats stats;
super_version->current->Get(options, lkey, value, &s, &merge_context,
&stats, options_);
&stats, *cfd->options());
}
return s;
}
Iterator* DBImplReadOnly::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot);
Iterator* DBImplReadOnly::NewIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
SequenceNumber latest_snapshot = versions_->LastSequence();
Iterator* internal_iter = NewInternalIterator(options, cfd, super_version);
return NewDBIterator(
&dbname_, env_, options_, user_comparator(),internal_iter,
&dbname_, env_, *cfd->options(), cfd->user_comparator(), internal_iter,
(options.snapshot != nullptr
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot));
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
: latest_snapshot));
}
Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
DB** dbptr, bool error_if_log_file_exist) {
DB** dbptr, bool error_if_log_file_exist) {
*dbptr = nullptr;
DBImplReadOnly* impl = new DBImplReadOnly(options, dbname);
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(
ColumnFamilyDescriptor(default_column_family_name, cf_options));
DBImplReadOnly* impl = new DBImplReadOnly(db_options, dbname);
impl->mutex_.Lock();
Status s = impl->Recover(true /* read only */, error_if_log_file_exist);
Status s = impl->Recover(column_families, true /* read only */,
error_if_log_file_exist);
if (s.ok()) {
delete impl->InstallSuperVersion(new DBImpl::SuperVersion());
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_);
}
}
impl->mutex_.Unlock();
if (s.ok()) {
......
......@@ -12,6 +12,8 @@
#include <deque>
#include <set>
#include <vector>
#include <string>
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
......@@ -23,57 +25,79 @@
namespace rocksdb {
class DBImplReadOnly : public DBImpl {
public:
DBImplReadOnly(const Options& options, const std::string& dbname);
virtual ~DBImplReadOnly();
public:
DBImplReadOnly(const DBOptions& options, const std::string& dbname);
virtual ~DBImplReadOnly();
// Implementations of the DB interface
virtual Status Get(const ReadOptions& options,
const Slice& key,
std::string* value);
// Implementations of the DB interface
using DB::Get;
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value);
// TODO: Implement ReadOnly MultiGet?
// TODO: Implement ReadOnly MultiGet?
virtual Iterator* NewIterator(const ReadOptions&);
using DBImpl::NewIterator;
virtual Iterator* NewIterator(const ReadOptions&,
ColumnFamilyHandle* column_family);
virtual Status Put(const WriteOptions&, const Slice& key, const Slice& value) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status Merge(const WriteOptions&, const Slice& key,
const Slice& value) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status Delete(const WriteOptions&, const Slice& key) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status Write(const WriteOptions& options, WriteBatch* updates) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false, int target_level = -1) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status DisableFileDeletions() {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status EnableFileDeletions(bool force) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size,
bool flush_memtable = true) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status Flush(const FlushOptions& options) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status NewIterators(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
std::vector<Iterator*>* iterators) {
// TODO
return Status::NotSupported("Not supported yet.");
}
private:
friend class DB;
using DBImpl::Put;
virtual Status Put(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DBImpl::Merge;
virtual Status Merge(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DBImpl::Delete;
virtual Status Delete(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status Write(const WriteOptions& options, WriteBatch* updates) {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DBImpl::CompactRange;
virtual Status CompactRange(ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end,
bool reduce_level = false,
int target_level = -1) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status DisableFileDeletions() {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status EnableFileDeletions(bool force) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size,
bool flush_memtable = true) {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DBImpl::Flush;
virtual Status Flush(const FlushOptions& options,
ColumnFamilyHandle* column_family) {
return Status::NotSupported("Not supported operation in read only mode.");
}
// No copying allowed
DBImplReadOnly(const DBImplReadOnly&);
void operator=(const DBImplReadOnly&);
};
private:
friend class DB;
// No copying allowed
DBImplReadOnly(const DBImplReadOnly&);
void operator=(const DBImplReadOnly&);
};
}
......@@ -39,71 +39,6 @@ static void DumpInternalIter(Iterator* iter) {
namespace {
// Reusable key buffer used by the iterator. Short keys are stored in an
// inline buffer (space_); longer keys use a heap buffer that is reallocated
// only when a larger key is requested.
class IterLookupKey {
 public:
  IterLookupKey() : key_(space_), buf_size_(sizeof(space_)), key_size_(0) {}

  ~IterLookupKey() { Clear(); }

  // Returns the stored key bytes, or an empty Slice if there is no buffer.
  Slice GetKey() const {
    if (key_ != nullptr) {
      return Slice(key_, key_size_);
    } else {
      return Slice();
    }
  }

  bool Valid() const { return key_ != nullptr; }

  // Frees the heap buffer (if any) and falls back to the inline buffer.
  // NOTE(review): key_size_ is not reset here, so GetKey() called right after
  // Clear() still reports the previous size -- confirm no caller does that.
  void Clear() {
    if (key_ != nullptr && key_ != space_) {
      delete[] key_;
    }
    key_ = space_;
    // The capacity is the size of the inline buffer. The previous code used
    // sizeof(buf_size_), i.e. the size of a size_t, which under-reported the
    // capacity and forced needless heap allocations for short keys.
    buf_size_ = sizeof(space_);
  }

  // Enlarge the buffer size if needed based on key_size.
  // By default, static allocated buffer is used. Once there is a key
  // larger than the static allocated buffer, another buffer is dynamically
  // allocated, until a larger key buffer is requested. In that case, we
  // reallocate buffer and delete the old one.
  void EnlargeBufferIfNeeded(size_t key_size) {
    // If size is smaller than buffer size, continue using current buffer,
    // or the static allocated one, as default
    if (key_size > buf_size_) {
      // Need to enlarge the buffer.
      Clear();
      key_ = new char[key_size];
      buf_size_ = key_size;
    }
    key_size_ = key_size;
  }

  // Stores a copy of user_key.
  void SetUserKey(const Slice& user_key) {
    size_t size = user_key.size();
    EnlargeBufferIfNeeded(size);
    memcpy(key_, user_key.data(), size);
  }

  // Stores user_key followed by a fixed 64-bit encoding of
  // PackSequenceAndType(s, kValueTypeForSeek).
  void SetInternalKey(const Slice& user_key, SequenceNumber s) {
    size_t usize = user_key.size();
    EnlargeBufferIfNeeded(usize + sizeof(uint64_t));
    memcpy(key_, user_key.data(), usize);
    EncodeFixed64(key_ + usize, PackSequenceAndType(s, kValueTypeForSeek));
  }

 private:
  char* key_;
  size_t buf_size_;
  size_t key_size_;
  char space_[32];  // Avoid allocation for short keys

  // No copying allowed. The assignment operator must take IterLookupKey; the
  // previous declaration took LookupKey, which left the implicit copy
  // assignment available and allowed a shallow copy of key_.
  IterLookupKey(const IterLookupKey&) = delete;
  void operator=(const IterLookupKey&) = delete;
};
// Memtables and sstables that make the DB representation contain
// (userkey,seq,type) => uservalue entries. DBIter
// combines multiple entries for the same userkey found in the DB
......@@ -191,7 +126,7 @@ class DBIter: public Iterator {
SequenceNumber const sequence_;
Status status_;
IterLookupKey saved_key_; // == current key when direction_==kReverse
IterKey saved_key_; // == current key when direction_==kReverse
std::string saved_value_; // == current raw value when direction_==kReverse
std::string skip_key_;
Direction direction_;
......@@ -254,10 +189,9 @@ void DBIter::Next() {
// NOTE: In between, saved_key_ can point to a user key that has
// a delete marker
inline void DBIter::FindNextUserEntry(bool skipping) {
StopWatchNano timer(env_, false);
StartPerfTimer(&timer);
PERF_TIMER_AUTO(find_next_user_entry_time);
FindNextUserEntryInternal(skipping);
BumpPerfTime(&perf_context.find_next_user_entry_time, &timer);
PERF_TIMER_STOP(find_next_user_entry_time);
}
// Actual implementation of DBIter::FindNextUserEntry()
......@@ -273,7 +207,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping) {
if (skipping &&
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
num_skipped++; // skip this entry
BumpPerfCount(&perf_context.internal_key_skipped_count);
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
} else {
skipping = false;
switch (ikey.type) {
......@@ -283,7 +217,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping) {
saved_key_.SetUserKey(ikey.user_key);
skipping = true;
num_skipped = 0;
BumpPerfCount(&perf_context.internal_delete_skipped_count);
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
break;
case kTypeValue:
valid_ = true;
......@@ -488,10 +422,9 @@ void DBIter::Seek(const Slice& target) {
saved_key_.Clear();
// now saved_key_ is used to store internal key.
saved_key_.SetInternalKey(target, sequence_);
StopWatchNano internal_seek_timer(env_, false);
StartPerfTimer(&internal_seek_timer);
PERF_TIMER_AUTO(seek_internal_seek_time);
iter_->Seek(saved_key_.GetKey());
BumpPerfTime(&perf_context.seek_internal_seek_time, &internal_seek_timer);
PERF_TIMER_STOP(seek_internal_seek_time);
if (iter_->Valid()) {
direction_ = kForward;
ClearSavedValue();
......@@ -504,10 +437,9 @@ void DBIter::Seek(const Slice& target) {
void DBIter::SeekToFirst() {
direction_ = kForward;
ClearSavedValue();
StopWatchNano internal_seek_timer(env_, false);
StartPerfTimer(&internal_seek_timer);
PERF_TIMER_AUTO(seek_internal_seek_time);
iter_->SeekToFirst();
BumpPerfTime(&perf_context.seek_internal_seek_time, &internal_seek_timer);
PERF_TIMER_STOP(seek_internal_seek_time);
if (iter_->Valid()) {
FindNextUserEntry(false /* not skipping */);
} else {
......@@ -526,10 +458,9 @@ void DBIter::SeekToLast() {
direction_ = kReverse;
ClearSavedValue();
StopWatchNano internal_seek_timer(env_, false);
StartPerfTimer(&internal_seek_timer);
PERF_TIMER_AUTO(seek_internal_seek_time);
iter_->SeekToLast();
BumpPerfTime(&perf_context.seek_internal_seek_time, &internal_seek_timer);
PERF_TIMER_STOP(seek_internal_seek_time);
FindPrevUserEntry();
}
......
......@@ -65,7 +65,7 @@ void DBImpl::LogDBDeployStats() {
uint64_t file_total_size = 0;
uint32_t file_total_num = 0;
Version* current = versions_->current();
Version* current = default_cf_handle_->cfd()->current();
for (int i = 0; i < current->NumberLevels(); i++) {
file_total_num += current->NumLevelFiles(i);
file_total_size += current->NumLevelBytes(i);
......
此差异已折叠。
......@@ -59,7 +59,7 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
// decreasing sequence number
// decreasing type (though sequence# should be enough to disambiguate)
int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
BumpPerfCount(&perf_context.user_key_comparison_count);
PERF_COUNTER_ADD(user_key_comparison_count, 1);
if (r == 0) {
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
......@@ -79,7 +79,7 @@ int InternalKeyComparator::Compare(const ParsedInternalKey& a,
// decreasing sequence number
// decreasing type (though sequence# should be enough to disambiguate)
int r = user_comparator_->Compare(a.user_key, b.user_key);
BumpPerfCount(&perf_context.user_key_comparison_count);
PERF_COUNTER_ADD(user_key_comparison_count, 1);
if (r == 0) {
if (a.sequence > b.sequence) {
r = -1;
......
......@@ -32,6 +32,9 @@ enum ValueType : unsigned char {
kTypeValue = 0x1,
kTypeMerge = 0x2,
kTypeLogData = 0x3,
kTypeColumnFamilyDeletion = 0x4,
kTypeColumnFamilyValue = 0x5,
kTypeColumnFamilyMerge = 0x6,
kMaxValue = 0x7F
};
......@@ -235,4 +238,74 @@ inline LookupKey::~LookupKey() {
if (start_ != space_) delete[] start_;
}
// IterKey holds a single key in a reusable buffer. Short keys live in the
// inline array space_; a key larger than the current buffer causes a heap
// buffer of exactly the requested size to be allocated. The object is
// non-copyable because it may own that heap buffer.
class IterKey {
 public:
  IterKey() : key_(space_), buf_size_(sizeof(space_)), key_size_(0) {}

  ~IterKey() { Clear(); }

  // Returns a Slice referring to the stored key bytes (no copy is made).
  Slice GetKey() const {
    if (key_ != nullptr) {
      return Slice(key_, key_size_);
    } else {
      return Slice();
    }
  }

  // True when key_ is non-null. In the code of this class key_ always points
  // either at space_ or at a heap buffer, so this is expected to be true.
  bool Valid() const { return key_ != nullptr; }

  // Releases any heap buffer and returns to the empty, inline-buffer state.
  void Clear() {
    if (key_ != nullptr && key_ != space_) {
      delete[] key_;
    }
    key_ = space_;
    // The capacity of the inline buffer is sizeof(space_), not the size of
    // the buf_size_ field itself; using the latter shrank the usable inline
    // capacity and forced heap allocations for keys that fit in space_.
    buf_size_ = sizeof(space_);
    // Drop the stale length so GetKey() cannot return a Slice that extends
    // past space_ after a larger heap-backed key has been cleared.
    key_size_ = 0;
  }

  // Enlarge the buffer size if needed based on key_size.
  // By default, static allocated buffer is used. Once there is a key
  // larger than the static allocated buffer, another buffer is dynamically
  // allocated, until a larger key buffer is requested. In that case, we
  // reallocate buffer and delete the old one.
  // Note: when the buffer grows, the previous contents are discarded, not
  // copied. key_size_ is always updated to the requested size.
  void EnlargeBufferIfNeeded(size_t key_size) {
    // If size is smaller than buffer size, continue using current buffer,
    // or the static allocated one, as default
    if (key_size > buf_size_) {
      // Need to enlarge the buffer.
      Clear();
      key_ = new char[key_size];
      buf_size_ = key_size;
    }
    key_size_ = key_size;
  }

  // Stores a copy of user_key as the current key.
  void SetUserKey(const Slice& user_key) {
    size_t size = user_key.size();
    EnlargeBufferIfNeeded(size);
    memcpy(key_, user_key.data(), size);
  }

  // Stores user_key followed by sizeof(uint64_t) bytes written by
  // EncodeFixed64 from PackSequenceAndType(s, value_type).
  void SetInternalKey(const Slice& user_key, SequenceNumber s,
                      ValueType value_type = kValueTypeForSeek) {
    size_t usize = user_key.size();
    EnlargeBufferIfNeeded(usize + sizeof(uint64_t));
    memcpy(key_, user_key.data(), usize);
    EncodeFixed64(key_ + usize, PackSequenceAndType(s, value_type));
  }

  // Convenience overload taking the three components from a parsed key.
  void SetInternalKey(const ParsedInternalKey& parsed_key) {
    SetInternalKey(parsed_key.user_key, parsed_key.sequence, parsed_key.type);
  }

 private:
  char* key_;         // Points at space_ or at a heap buffer owned by this
  size_t buf_size_;   // Capacity in bytes of the buffer key_ points at
  size_t key_size_;   // Length in bytes of the key currently stored
  char space_[32];    // Avoid allocation for short keys

  // No copying allowed
  IterKey(const IterKey&) = delete;
  void operator=(const IterKey&) = delete;
};
} // namespace rocksdb
......@@ -7,8 +7,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/internal_stats.h"
#include "db/db_impl.h"
#include "db/memtable_list.h"
#include "db/column_family.h"
#include <vector>
......@@ -44,10 +43,8 @@ DBPropertyType GetPropertyType(const Slice& property) {
bool InternalStats::GetProperty(DBPropertyType property_type,
const Slice& property, std::string* value,
DBImpl* db) {
VersionSet* version_set = db->versions_.get();
Version* current = version_set->current();
const MemTableList& imm = db->imm_;
ColumnFamilyData* cfd) {
Version* current = cfd->current();
Slice in = property;
switch (property_type) {
......@@ -110,7 +107,6 @@ bool InternalStats::GetProperty(DBPropertyType property_type,
write_with_wal = statistics_->getTickerCount(WRITE_WITH_WAL);
}
// Pardon the long line but I think it is easier to read this way.
snprintf(
buf, sizeof(buf),
" Compactions\n"
......@@ -159,7 +155,7 @@ bool InternalStats::GetProperty(DBPropertyType property_type,
"%9lu\n",
level, files, current->NumLevelBytes(level) / 1048576.0,
current->NumLevelBytes(level) /
version_set->MaxBytesForLevel(level),
cfd->compaction_picker()->MaxBytesForLevel(level),
compaction_stats_[level].micros / 1e6,
bytes_read / 1048576.0,
compaction_stats_[level].bytes_written / 1048576.0,
......@@ -334,11 +330,11 @@ bool InternalStats::GetProperty(DBPropertyType property_type,
*value = current->DebugString();
return true;
case kNumImmutableMemTable:
*value = std::to_string(imm.size());
*value = std::to_string(cfd->imm()->size());
return true;
case kMemtableFlushPending:
// Return number of mem tables that are ready to flush (made immutable)
*value = std::to_string(imm.IsFlushPending() ? 1 : 0);
*value = std::to_string(cfd->imm()->IsFlushPending() ? 1 : 0);
return true;
case kCompactionPending:
// 1 if the system already determines at least one compaction is needed.
......@@ -351,7 +347,7 @@ bool InternalStats::GetProperty(DBPropertyType property_type,
return true;
case kCurSizeActiveMemTable:
// Current size of the active memtable
*value = std::to_string(db->mem_->ApproximateMemoryUsage());
*value = std::to_string(cfd->mem()->ApproximateMemoryUsage());
return true;
default:
return false;
......
......@@ -16,6 +16,8 @@
#include <vector>
#include <string>
class ColumnFamilyData;
namespace rocksdb {
class MemTableList;
......@@ -126,7 +128,7 @@ class InternalStats {
uint64_t BumpAndGetBackgroundErrorCount() { return ++bg_error_count_; }
bool GetProperty(DBPropertyType property_type, const Slice& property,
std::string* value, DBImpl* db);
std::string* value, ColumnFamilyData* cfd);
private:
std::vector<CompactionStats> compaction_stats_;
......
......@@ -29,7 +29,8 @@
namespace rocksdb {
MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options)
MemTable::MemTable(const InternalKeyComparator& cmp,
const Options& options)
: comparator_(cmp),
refs_(0),
kArenaBlockSize(OptimizeBlockSize(options.arena_block_size)),
......@@ -42,7 +43,6 @@ MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options)
file_number_(0),
first_seqno_(0),
mem_next_logfile_number_(0),
mem_logfile_number_(0),
locks_(options.inplace_update_support ? options.inplace_update_num_locks
: 0),
prefix_extractor_(options.prefix_extractor.get()),
......@@ -142,6 +142,11 @@ Slice MemTableRep::UserKey(const char* key) const {
return Slice(slice.data(), slice.size() - 8);
}
// Obtains `len` bytes from arena_, stores the address in *buf, and returns
// that same address converted to a KeyHandle.
KeyHandle MemTableRep::Allocate(const size_t len, char** buf) {
  char* const mem = arena_->Allocate(len);
  *buf = mem;
  return static_cast<KeyHandle>(mem);
}
// Encode a suitable internal key target for "target" and return it.
// Uses *scratch as scratch space, and the returned pointer will point
// into this scratch space.
......@@ -243,7 +248,9 @@ void MemTable::Add(SequenceNumber s, ValueType type,
const size_t encoded_len =
VarintLength(internal_key_size) + internal_key_size +
VarintLength(val_size) + val_size;
char* buf = arena_.Allocate(encoded_len);
char* buf = nullptr;
KeyHandle handle = table_->Allocate(encoded_len, &buf);
assert(buf != nullptr);
char* p = EncodeVarint32(buf, internal_key_size);
memcpy(p, key.data(), key_size);
p += key_size;
......@@ -252,7 +259,7 @@ void MemTable::Add(SequenceNumber s, ValueType type,
p = EncodeVarint32(p, val_size);
memcpy(p, value.data(), val_size);
assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
table_->Insert(buf);
table_->Insert(handle);
if (prefix_bloom_) {
assert(prefix_extractor_);
......@@ -370,8 +377,7 @@ static bool SaveValue(void* arg, const char* entry) {
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
MergeContext& merge_context, const Options& options) {
StopWatchNano memtable_get_timer(options.env, false);
StartPerfTimer(&memtable_get_timer);
PERF_TIMER_AUTO(get_from_memtable_time);
Slice user_key = key.user_key();
bool found_final_value = false;
......@@ -401,8 +407,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
if (!found_final_value && merge_in_progress) {
*s = Status::MergeInProgress("");
}
BumpPerfTime(&perf_context.get_from_memtable_time, &memtable_get_timer);
BumpPerfCount(&perf_context.get_from_memtable_count);
PERF_TIMER_STOP(get_from_memtable_time);
PERF_COUNTER_ADD(get_from_memtable_count, 1);
return found_final_value;
}
......
......@@ -13,7 +13,7 @@
#include <deque>
#include "db/dbformat.h"
#include "db/skiplist.h"
#include "db/version_set.h"
#include "db/version_edit.h"
#include "rocksdb/db.h"
#include "rocksdb/memtablerep.h"
#include "util/arena.h"
......@@ -39,7 +39,7 @@ class MemTable {
// MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once.
explicit MemTable(const InternalKeyComparator& comparator,
const Options& options = Options());
const Options& options);
~MemTable();
......@@ -147,14 +147,6 @@ class MemTable {
// be flushed to storage
void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; }
// Returns the logfile number that can be safely deleted when this
// memstore is flushed to storage
uint64_t GetLogNumber() { return mem_logfile_number_; }
// Sets the logfile number that can be safely deleted when this
// memstore is flushed to storage
void SetLogNumber(uint64_t num) { mem_logfile_number_ = num; }
// Notify the underlying storage that no more items will be added
void MarkImmutable() { table_->MarkReadOnly(); }
......@@ -197,10 +189,6 @@ class MemTable {
// The log files earlier than this number can be deleted.
uint64_t mem_next_logfile_number_;
// The log file that backs this memtable (to be deleted when
// memtable flush is done)
uint64_t mem_logfile_number_;
// rw locks for inplace updates
std::vector<port::RWMutex> locks_;
......
......@@ -8,9 +8,11 @@
#include <string>
#include "rocksdb/db.h"
#include "db/memtable.h"
#include "db/version_set.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "util/coding.h"
#include "util/log_buffer.h"
namespace rocksdb {
......@@ -120,7 +122,8 @@ void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) {
}
void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
uint64_t file_number, std::set<uint64_t>* pending_outputs) {
uint64_t file_number,
std::set<uint64_t>* pending_outputs) {
assert(!mems.empty());
// If the flush was not successful, then just reset state.
......@@ -140,10 +143,10 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
// Record a successful flush in the manifest file
Status MemTableList::InstallMemtableFlushResults(
const autovector<MemTable*>& mems, VersionSet* vset,
ColumnFamilyData* cfd, const autovector<MemTable*>& mems, VersionSet* vset,
port::Mutex* mu, Logger* info_log, uint64_t file_number,
std::set<uint64_t>& pending_outputs, autovector<MemTable*>* to_delete,
Directory* db_directory) {
Directory* db_directory, LogBuffer* log_buffer) {
mu->AssertHeld();
// flush was successful
......@@ -173,12 +176,11 @@ Status MemTableList::InstallMemtableFlushResults(
break;
}
Log(info_log,
"Level-0 commit table #%lu started",
(unsigned long)m->file_number_);
LogToBuffer(log_buffer, "Level-0 commit table #%lu started",
(unsigned long)m->file_number_);
// this can release and reacquire the mutex.
s = vset->LogAndApply(&m->edit_, mu, db_directory);
s = vset->LogAndApply(cfd, &m->edit_, mu, db_directory);
// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable
......@@ -189,10 +191,8 @@ Status MemTableList::InstallMemtableFlushResults(
uint64_t mem_id = 1; // how many memtables has been flushed.
do {
if (s.ok()) { // commit new state
Log(info_log,
"Level-0 commit table #%lu: memtable #%lu done",
(unsigned long)m->file_number_,
(unsigned long)mem_id);
LogToBuffer(log_buffer, "Level-0 commit table #%lu: memtable #%lu done",
(unsigned long)m->file_number_, (unsigned long)mem_id);
current_->Remove(m);
assert(m->file_number_ > 0);
......
......@@ -7,19 +7,25 @@
#include <string>
#include <list>
#include <vector>
#include <set>
#include <deque>
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/iterator.h"
#include "db/dbformat.h"
#include "db/memtable.h"
#include "db/skiplist.h"
#include "rocksdb/db.h"
#include "db/memtable.h"
#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "util/autovector.h"
#include "util/log_buffer.h"
namespace rocksdb {
class ColumnFamilyData;
class InternalKeyComparator;
class Mutex;
......@@ -99,12 +105,14 @@ class MemTableList {
std::set<uint64_t>* pending_outputs);
// Commit a successful flush in the manifest file
Status InstallMemtableFlushResults(const autovector<MemTable*>& m,
Status InstallMemtableFlushResults(ColumnFamilyData* cfd,
const autovector<MemTable*>& m,
VersionSet* vset, port::Mutex* mu,
Logger* info_log, uint64_t file_number,
std::set<uint64_t>& pending_outputs,
autovector<MemTable*>* to_delete,
Directory* db_directory);
Directory* db_directory,
LogBuffer* log_buffer);
// New memtables are inserted at the front of the list.
// Takes ownership of the reference held on *m by the caller of Add().
......
......@@ -429,6 +429,48 @@ TEST(PlainTableDBTest, Iterator) {
}
}
// Builds a key consisting of `length` copies of the character `c`.
std::string MakeLongKey(size_t length, char c) {
  std::string key;
  key.assign(length, c);
  return key;
}
// Writes keys of several different lengths (16 to 90 bytes) to a
// total-order plain table, flushes the memtable, and checks that an iterator
// seeked to the first key yields every key/value pair in the order listed
// and then becomes invalid.
TEST(PlainTableDBTest, IteratorLargeKeys) {
  Options options = CurrentOptions();
  options.table_factory.reset(NewTotalOrderPlainTableFactory(0, 0, 16));
  options.create_if_missing = true;
  options.prefix_extractor.reset();
  DestroyAndReopen(&options);

  // Each key uses a distinct, increasing fill character; the assertions
  // below expect iteration to return them in exactly this order.
  const std::string key_list[] = {
      MakeLongKey(30, '0'),
      MakeLongKey(16, '1'),
      MakeLongKey(32, '2'),
      MakeLongKey(60, '3'),
      MakeLongKey(90, '4'),
      MakeLongKey(50, '5'),
      MakeLongKey(26, '6')
  };
  const size_t kNumKeys = sizeof(key_list) / sizeof(key_list[0]);

  // The value stored for each key is its index rendered as a decimal string.
  for (size_t idx = 0; idx != kNumKeys; ++idx) {
    ASSERT_OK(Put(key_list[idx], std::to_string(idx)));
  }

  dbfull()->TEST_FlushMemTable();

  Iterator* iter = dbfull()->NewIterator(ro_);
  iter->Seek(key_list[0]);

  for (size_t idx = 0; idx != kNumKeys; ++idx) {
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(key_list[idx], iter->key().ToString());
    ASSERT_EQ(std::to_string(idx), iter->value().ToString());
    iter->Next();
  }

  // After the last key the iterator must be exhausted.
  ASSERT_TRUE(!iter->Valid());

  delete iter;
}
// A test comparator which compare two strings in this way:
// (1) first compare prefix of 8 bytes in alphabet order,
// (2) if two strings share the same prefix, sort the other part of the string
......
......@@ -55,14 +55,20 @@ class Repairer {
icmp_(options.comparator),
ipolicy_(options.filter_policy),
options_(SanitizeOptions(dbname, &icmp_, &ipolicy_, options)),
raw_table_cache_(
// TableCache can be small since we expect each table to be opened
// once.
NewLRUCache(10, options_.table_cache_numshardbits,
options_.table_cache_remove_scan_count_limit)),
next_file_number_(1) {
// TableCache can be small since we expect each table to be opened once.
table_cache_ = new TableCache(dbname_, &options_, storage_options_, 10);
table_cache_ = new TableCache(dbname_, &options_, storage_options_,
raw_table_cache_.get());
edit_ = new VersionEdit();
}
~Repairer() {
delete table_cache_;
raw_table_cache_.reset();
delete edit_;
}
......@@ -102,6 +108,7 @@ class Repairer {
InternalKeyComparator const icmp_;
InternalFilterPolicy const ipolicy_;
Options const options_;
std::shared_ptr<Cache> raw_table_cache_;
TableCache* table_cache_;
VersionEdit* edit_;
......@@ -197,6 +204,7 @@ class Repairer {
Slice record;
WriteBatch batch;
MemTable* mem = new MemTable(icmp_, options_);
auto cf_mems_default = new ColumnFamilyMemTablesDefault(mem, &options_);
mem->Ref();
int counter = 0;
while (reader.ReadRecord(&record, &scratch)) {
......@@ -206,7 +214,7 @@ class Repairer {
continue;
}
WriteBatchInternal::SetContents(&batch, record);
status = WriteBatchInternal::InsertInto(&batch, mem, &options_);
status = WriteBatchInternal::InsertInto(&batch, cf_mems_default);
if (status.ok()) {
counter += WriteBatchInternal::Count(&batch);
} else {
......@@ -226,6 +234,7 @@ class Repairer {
iter, &meta, icmp_, 0, 0, kNoCompression);
delete iter;
delete mem->Unref();
delete cf_mems_default;
mem = nullptr;
if (status.ok()) {
if (meta.file_size > 0) {
......
......@@ -35,18 +35,13 @@ static Slice GetSliceForFileNumber(uint64_t* file_number) {
sizeof(*file_number));
}
TableCache::TableCache(const std::string& dbname,
const Options* options,
const EnvOptions& storage_options,
int entries)
TableCache::TableCache(const std::string& dbname, const Options* options,
const EnvOptions& storage_options, Cache* const cache)
: env_(options->env),
dbname_(dbname),
options_(options),
storage_options_(storage_options),
cache_(
NewLRUCache(entries, options->table_cache_numshardbits,
options->table_cache_remove_scan_count_limit)) {
}
cache_(cache) {}
TableCache::~TableCache() {
}
......@@ -124,7 +119,7 @@ Iterator* TableCache::NewIterator(const ReadOptions& options,
TableReader* table_reader = GetTableReaderFromHandle(handle);
Iterator* result = table_reader->NewIterator(options);
if (!file_meta.table_reader_handle) {
result->RegisterCleanup(&UnrefEntry, cache_.get(), handle);
result->RegisterCleanup(&UnrefEntry, cache_, handle);
}
if (table_reader_ptr != nullptr) {
*table_reader_ptr = table_reader;
......@@ -216,8 +211,8 @@ bool TableCache::PrefixMayMatch(const ReadOptions& options,
return may_match;
}
void TableCache::Evict(uint64_t file_number) {
cache_->Erase(GetSliceForFileNumber(&file_number));
void TableCache::Evict(Cache* cache, uint64_t file_number) {
cache->Erase(GetSliceForFileNumber(&file_number));
}
} // namespace rocksdb
......@@ -30,7 +30,7 @@ struct FileMetaData;
class TableCache {
public:
TableCache(const std::string& dbname, const Options* options,
const EnvOptions& storage_options, int entries);
const EnvOptions& storage_options, Cache* cache);
~TableCache();
// Return an iterator for the specified file number (the corresponding
......@@ -64,7 +64,7 @@ class TableCache {
const Slice& internal_prefix, bool* table_io);
// Evict any entry for the specified file number
void Evict(uint64_t file_number);
static void Evict(Cache* cache, uint64_t file_number);
// Find table reader
Status FindTable(const EnvOptions& toptions,
......@@ -95,7 +95,7 @@ class TableCache {
const std::string dbname_;
const Options* options_;
const EnvOptions& storage_options_;
std::shared_ptr<Cache> cache_;
Cache* const cache_;
};
} // namespace rocksdb
......@@ -8,15 +8,19 @@
#include <string>
#include <utility>
#include "db/db_impl.h"
#include "db/column_family.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
namespace rocksdb {
TailingIterator::TailingIterator(DBImpl* db, const ReadOptions& options,
const Comparator* comparator)
: db_(db), options_(options), comparator_(comparator),
version_number_(0), current_(nullptr),
ColumnFamilyData* cfd)
: db_(db),
options_(options),
cfd_(cfd),
version_number_(0),
current_(nullptr),
status_(Status::InvalidArgument("Seek() not called on this iterator")) {}
bool TailingIterator::Valid() const {
......@@ -53,10 +57,9 @@ void TailingIterator::Seek(const Slice& target) {
// 'target' -- in this case, prev_key_ is included in the interval, so
// prev_inclusive_ has to be set.
if (!is_prev_set_ ||
comparator_->Compare(prev_key_, target) >= !is_prev_inclusive_ ||
(immutable_->Valid() &&
comparator_->Compare(target, immutable_->key()) > 0) ||
const Comparator* cmp = cfd_->user_comparator();
if (!is_prev_set_ || cmp->Compare(prev_key_, target) >= !is_prev_inclusive_ ||
(immutable_->Valid() && cmp->Compare(target, immutable_->key()) > 0) ||
(options_.prefix_seek && !IsSamePrefix(target))) {
SeekImmutable(target);
}
......@@ -121,7 +124,7 @@ void TailingIterator::SeekToLast() {
void TailingIterator::CreateIterators() {
std::pair<Iterator*, Iterator*> iters =
db_->GetTailingIteratorPair(options_, &version_number_);
db_->GetTailingIteratorPair(options_, cfd_, &version_number_);
assert(iters.first && iters.second);
......@@ -137,9 +140,10 @@ void TailingIterator::UpdateCurrent() {
if (mutable_->Valid()) {
current_ = mutable_.get();
}
const Comparator* cmp = cfd_->user_comparator();
if (immutable_->Valid() &&
(current_ == nullptr ||
comparator_->Compare(immutable_->key(), current_->key()) < 0)) {
cmp->Compare(immutable_->key(), current_->key()) < 0)) {
current_ = immutable_.get();
}
......@@ -151,11 +155,11 @@ void TailingIterator::UpdateCurrent() {
bool TailingIterator::IsCurrentVersion() const {
return mutable_ != nullptr && immutable_ != nullptr &&
version_number_ == db_->CurrentVersionNumber();
version_number_ == cfd_->GetSuperVersionNumber();
}
bool TailingIterator::IsSamePrefix(const Slice& target) const {
const SliceTransform* extractor = db_->options_.prefix_extractor.get();
const SliceTransform* extractor = cfd_->options()->prefix_extractor.get();
assert(extractor);
assert(is_prev_set_);
......
......@@ -13,6 +13,7 @@
namespace rocksdb {
class DBImpl;
class ColumnFamilyData;
/**
* TailingIterator is a special type of iterator that doesn't use an (implicit)
......@@ -25,7 +26,7 @@ class DBImpl;
class TailingIterator : public Iterator {
public:
TailingIterator(DBImpl* db, const ReadOptions& options,
const Comparator* comparator);
ColumnFamilyData* cfd);
virtual ~TailingIterator() {}
virtual bool Valid() const override;
......@@ -41,7 +42,7 @@ class TailingIterator : public Iterator {
private:
DBImpl* const db_;
const ReadOptions options_;
const Comparator* const comparator_;
ColumnFamilyData* const cfd_;
uint64_t version_number_;
// TailingIterator merges the contents of the two iterators below (one using
......
......@@ -9,7 +9,7 @@
namespace rocksdb {
TransactionLogIteratorImpl::TransactionLogIteratorImpl(
const std::string& dir, const Options* options,
const std::string& dir, const DBOptions* options,
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seq,
std::unique_ptr<VectorLogPtr> files, DBImpl const* const dbimpl)
......
......@@ -67,7 +67,7 @@ class LogFileImpl : public LogFile {
class TransactionLogIteratorImpl : public TransactionLogIterator {
public:
TransactionLogIteratorImpl(
const std::string& dir, const Options* options,
const std::string& dir, const DBOptions* options,
const TransactionLogIterator::ReadOptions& read_options,
const EnvOptions& soptions, const SequenceNumber seqNum,
std::unique_ptr<VectorLogPtr> files, DBImpl const* const dbimpl);
......@@ -82,7 +82,7 @@ class TransactionLogIteratorImpl : public TransactionLogIterator {
private:
const std::string& dir_;
const Options* options_;
const DBOptions* options_;
const TransactionLogIterator::ReadOptions read_options_;
const EnvOptions& soptions_;
SequenceNumber startingSequenceNumber_;
......
......@@ -11,6 +11,7 @@
#include "db/version_set.h"
#include "util/coding.h"
#include "rocksdb/slice.h"
namespace rocksdb {
......@@ -29,6 +30,11 @@ enum Tag {
// these are new formats divergent from open source leveldb
kNewFile2 = 100, // store smallest & largest seqno
kColumnFamily = 200, // specify column family for version edit
kColumnFamilyAdd = 201,
kColumnFamilyDrop = 202,
kMaxColumnFamily = 203,
};
void VersionEdit::Clear() {
......@@ -38,13 +44,19 @@ void VersionEdit::Clear() {
prev_log_number_ = 0;
last_sequence_ = 0;
next_file_number_ = 0;
max_column_family_ = 0;
has_comparator_ = false;
has_log_number_ = false;
has_prev_log_number_ = false;
has_next_file_number_ = false;
has_last_sequence_ = false;
has_max_column_family_ = false;
deleted_files_.clear();
new_files_.clear();
column_family_ = 0;
is_column_family_add_ = 0;
is_column_family_drop_ = 0;
column_family_name_.clear();
}
void VersionEdit::EncodeTo(std::string* dst) const {
......@@ -68,6 +80,10 @@ void VersionEdit::EncodeTo(std::string* dst) const {
PutVarint32(dst, kLastSequence);
PutVarint64(dst, last_sequence_);
}
if (has_max_column_family_) {
PutVarint32(dst, kMaxColumnFamily);
PutVarint32(dst, max_column_family_);
}
for (const auto& deleted : deleted_files_) {
PutVarint32(dst, kDeletedFile);
......@@ -86,6 +102,21 @@ void VersionEdit::EncodeTo(std::string* dst) const {
PutVarint64(dst, f.smallest_seqno);
PutVarint64(dst, f.largest_seqno);
}
// 0 is default and does not need to be explicitly written
if (column_family_ != 0) {
PutVarint32(dst, kColumnFamily);
PutVarint32(dst, column_family_);
}
if (is_column_family_add_) {
PutVarint32(dst, kColumnFamilyAdd);
PutLengthPrefixedSlice(dst, Slice(column_family_name_));
}
if (is_column_family_drop_) {
PutVarint32(dst, kColumnFamilyDrop);
}
}
static bool GetInternalKey(Slice* input, InternalKey* dst) {
......@@ -167,6 +198,14 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
}
break;
case kMaxColumnFamily:
if (GetVarint32(&input, &max_column_family_)) {
has_max_column_family_ = true;
} else {
msg = "max column family";
}
break;
case kCompactPointer:
if (GetLevel(&input, &level, &msg) &&
GetInternalKey(&input, &key)) {
......@@ -221,6 +260,29 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
}
break;
case kColumnFamily:
if (!GetVarint32(&input, &column_family_)) {
if (!msg) {
msg = "set column family id";
}
}
break;
case kColumnFamilyAdd:
if (GetLengthPrefixedSlice(&input, &str)) {
is_column_family_add_ = true;
column_family_name_ = str.ToString();
} else {
if (!msg) {
msg = "column family add";
}
}
break;
case kColumnFamilyDrop:
is_column_family_drop_ = true;
break;
default:
msg = "unknown tag";
break;
......@@ -282,6 +344,19 @@ std::string VersionEdit::DebugString(bool hex_key) const {
r.append(" .. ");
r.append(f.largest.DebugString(hex_key));
}
r.append("\n ColumnFamily: ");
AppendNumberTo(&r, column_family_);
if (is_column_family_add_) {
r.append("\n ColumnFamilyAdd: ");
r.append(column_family_name_);
}
if (is_column_family_drop_) {
r.append("\n ColumnFamilyDrop");
}
if (has_max_column_family_) {
r.append("\n MaxColumnFamily: ");
AppendNumberTo(&r, max_column_family_);
}
r.append("\n}\n");
return r;
}
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册