提交 6de1b5b8 编写于 作者: I Igor Canadi

Merge branch 'master' into columnfamilies

......@@ -44,5 +44,7 @@ libraries. You are on your own.
`make clean; make` will compile librocksdb.a (RocskDB static library) and all
the unit tests. You can run all unit tests with `make check`.
For shared library builds, exec `make librocksdb.so` instead.
If you followed the above steps and your compile or unit tests fail,
please submit an issue: (https://github.com/facebook/rocksdb/issues)
......@@ -50,6 +50,7 @@ VALGRIND_OPTS = --error-exitcode=$(VALGRIND_ERROR) --leak-check=full
TESTS = \
db_test \
autovector_test \
table_properties_collector_test \
arena_test \
auto_roll_logger_test \
......@@ -226,6 +227,9 @@ signal_test: util/signal_test.o $(LIBOBJECTS)
arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
autovector_test: util/autovector_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) util/autovector_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
table_properties_collector_test: db/table_properties_collector_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/table_properties_collector_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
......
......@@ -79,4 +79,4 @@ include/rocksdb/statistics.h
include/rocksdb/transaction_log.h
An API to retrieve transaction logs from a database.
Design discussions are conducted in https://www.facebook.com/groups/rocksdb.dev/
* Detailed instructions on how to compile using fbcode and jemalloc
* Latest release is 2.5.fb
* Latest release is 2.7.fb
......@@ -189,6 +189,18 @@ EOF
COMMON_FLAGS="$COMMON_FLAGS -DROCKSDB_ATOMIC_PRESENT"
fi
# Test whether fallocate is available
$CXX $CFLAGS -x c++ - -o /dev/null 2>/dev/null <<EOF
#include <fcntl.h>
int main() {
int fd = open("/dev/null", 0);
fallocate(fd, 0, 0, 1024);
}
EOF
if [ "$?" = 0 ]; then
COMMON_FLAGS="$COMMON_FLAGS -DROCKSDB_FALLOCATE_PRESENT"
fi
# Test whether Snappy library is installed
# http://code.google.com/p/snappy/
$CXX $CFLAGS -x c++ - -o /dev/null 2>/dev/null <<EOF
......
......@@ -8,7 +8,7 @@
#
# create git version file
VFILE=$ROCKSDB_ROOT/util/build_version.cc.tmp
VFILE=$PWD/util/build_version.cc.tmp
trap "rm $VFILE" EXIT
# check to see if git is in the path
......@@ -36,7 +36,7 @@ echo "const char* rocksdb_build_git_datetime = \"rocksdb_build_git_datetime:$(da
echo "const char* rocksdb_build_compile_date = __DATE__;" >> ${VFILE}
echo "const char* rocksdb_build_compile_time = __TIME__;" >> ${VFILE}
OUTFILE=$ROCKSDB_ROOT/util/build_version.cc
OUTFILE=$PWD/util/build_version.cc
if [ ! -e $OUTFILE ] || ! cmp -s $VFILE $OUTFILE; then
cp $VFILE $OUTFILE
fi
......@@ -54,7 +54,7 @@ RANLIB=$TOOLCHAIN_EXECUTABLES/binutils/binutils-2.21.1/da39a3e/bin/ranlib
CFLAGS="-B$TOOLCHAIN_EXECUTABLES/binutils/binutils-2.21.1/da39a3e/bin/gold -m64 -mtune=generic"
CFLAGS+=" -I $TOOLCHAIN_LIB_BASE/jemalloc/$TOOL_JEMALLOC/include -DHAVE_JEMALLOC"
CFLAGS+=" $LIBGCC_INCLUDE $GLIBC_INCLUDE"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_ATOMIC_PRESENT"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_ATOMIC_PRESENT -DROCKSDB_FALLOCATE_PRESENT"
CFLAGS+=" -DSNAPPY -DGFLAGS -DZLIB -DBZIP2"
EXEC_LDFLAGS=" -Wl,--whole-archive $TOOLCHAIN_LIB_BASE/jemalloc/$TOOL_JEMALLOC/lib/libjemalloc.a"
......
......@@ -61,7 +61,7 @@ RANLIB=$TOOLCHAIN_EXECUTABLES/binutils/binutils-2.21.1/da39a3e/bin/ranlib
CFLAGS="-B$TOOLCHAIN_EXECUTABLES/binutils/binutils-2.21.1/da39a3e/bin/gold -m64 -mtune=generic"
CFLAGS+=" -nostdlib $LIBGCC_INCLUDE $GLIBC_INCLUDE"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_ATOMIC_PRESENT"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_ATOMIC_PRESENT -DROCKSDB_FALLOCATE_PRESENT"
CFLAGS+=" -DSNAPPY -DGFLAGS -DZLIB -DBZIP2"
EXEC_LDFLAGS="-Wl,--dynamic-linker,/usr/local/fbcode/gcc-4.8.1-glibc-2.17/lib/ld.so"
......
......@@ -48,6 +48,7 @@ DEFINE_string(benchmarks,
"compact,"
"readrandom,"
"readseq,"
"readtocache,"
"readreverse,"
"readwhilewriting,"
"readrandomwriterandom,"
......@@ -75,6 +76,7 @@ DEFINE_string(benchmarks,
"\tdeleteseq -- delete N keys in sequential order\n"
"\tdeleterandom -- delete N keys in random order\n"
"\treadseq -- read N times sequentially\n"
"\treadtocache -- 1 thread reading database sequentially\n"
"\treadreverse -- read N times in reverse order\n"
"\treadrandom -- read N times in random order\n"
"\treadmissing -- read N missing keys in random order\n"
......@@ -1057,6 +1059,10 @@ class Benchmark {
method = &Benchmark::WriteRandom;
} else if (name == Slice("readseq")) {
method = &Benchmark::ReadSequential;
} else if (name == Slice("readtocache")) {
method = &Benchmark::ReadSequential;
num_threads = 1;
reads_ = num_;
} else if (name == Slice("readreverse")) {
method = &Benchmark::ReadReverse;
} else if (name == Slice("readrandom")) {
......
......@@ -22,20 +22,34 @@ namespace rocksdb {
Status DBImpl::DisableFileDeletions() {
MutexLock l(&mutex_);
disable_delete_obsolete_files_ = true;
Log(options_.info_log, "File Deletions Disabled");
++disable_delete_obsolete_files_;
if (disable_delete_obsolete_files_ == 1) {
// if not, it has already been disabled, so don't log anything
Log(options_.info_log, "File Deletions Disabled");
}
return Status::OK();
}
Status DBImpl::EnableFileDeletions() {
Status DBImpl::EnableFileDeletions(bool force) {
DeletionState deletion_state;
bool should_purge_files = false;
{
MutexLock l(&mutex_);
disable_delete_obsolete_files_ = false;
Log(options_.info_log, "File Deletions Enabled");
FindObsoleteFiles(deletion_state, true);
if (force) {
// if force, we need to enable file deletions right away
disable_delete_obsolete_files_ = 0;
} else if (disable_delete_obsolete_files_ > 0) {
--disable_delete_obsolete_files_;
}
if (disable_delete_obsolete_files_ == 0) {
Log(options_.info_log, "File Deletions Enabled");
should_purge_files = true;
FindObsoleteFiles(deletion_state, true);
}
}
if (should_purge_files) {
PurgeObsoleteFiles(deletion_state);
}
PurgeObsoleteFiles(deletion_state);
LogFlush(options_.info_log);
return Status::OK();
}
......
......@@ -244,13 +244,14 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
mem_(new MemTable(internal_comparator_, mem_rep_factory_,
NumberLevels(), options_)),
logfile_number_(0),
super_version_(nullptr),
tmp_batch_(),
bg_compaction_scheduled_(0),
bg_flush_scheduled_(0),
bg_logstats_scheduled_(false),
manual_compaction_(nullptr),
logger_(nullptr),
disable_delete_obsolete_files_(false),
disable_delete_obsolete_files_(0),
delete_obsolete_files_last_run_(options.env->NowMicros()),
purge_wal_files_last_run_(0),
last_stats_dump_time_microsec_(0),
......@@ -319,6 +320,13 @@ DBImpl::~DBImpl() {
bg_logstats_scheduled_) {
bg_cv_.Wait();
}
if (super_version_ != nullptr) {
bool is_last_reference __attribute__((unused));
is_last_reference = super_version_->Unref();
assert(is_last_reference);
super_version_->Cleanup();
delete super_version_;
}
mutex_.Unlock();
if (db_lock_ != nullptr) {
......@@ -348,6 +356,13 @@ void DBImpl::TEST_Destroy_DBImpl() {
bg_logstats_scheduled_) {
bg_cv_.Wait();
}
if (super_version_ != nullptr) {
bool is_last_reference __attribute__((unused));
is_last_reference = super_version_->Unref();
assert(is_last_reference);
super_version_->Cleanup();
delete super_version_;
}
// Prevent new compactions from occuring.
bg_work_gate_closed_ = true;
......@@ -446,6 +461,49 @@ void DBImpl::MaybeDumpStats() {
}
}
// DBImpl::SuperVersion methods
DBImpl::SuperVersion::SuperVersion(const int num_memtables) {
to_delete.resize(num_memtables);
}
DBImpl::SuperVersion::~SuperVersion() {
for (auto td : to_delete) {
delete td;
}
}
DBImpl::SuperVersion* DBImpl::SuperVersion::Ref() {
refs.fetch_add(1, std::memory_order_relaxed);
return this;
}
bool DBImpl::SuperVersion::Unref() {
assert(refs > 0);
// fetch_sub returns the previous value of ref
return refs.fetch_sub(1, std::memory_order_relaxed) == 1;
}
void DBImpl::SuperVersion::Cleanup() {
assert(refs.load(std::memory_order_relaxed) == 0);
imm.UnrefAll(&to_delete);
MemTable* m = mem->Unref();
if (m != nullptr) {
to_delete.push_back(m);
}
current->Unref();
}
void DBImpl::SuperVersion::Init(MemTable* new_mem, const MemTableList& new_imm,
Version* new_current) {
mem = new_mem;
imm = new_imm;
current = new_current;
mem->Ref();
imm.RefAll();
current->Ref();
refs.store(1, std::memory_order_relaxed);
}
// Returns the list of live files in 'sst_live' and the list
// of all files in the filesystem in 'all_files'.
// no_full_scan = true -- never do the full scan using GetChildren()
......@@ -458,7 +516,7 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
mutex_.AssertHeld();
// if deletion is disabled, do nothing
if (disable_delete_obsolete_files_) {
if (disable_delete_obsolete_files_ > 0) {
return;
}
......@@ -521,11 +579,6 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
// It is not necessary to hold the mutex when invoking this method.
void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
// free pending memtables
for (auto m : state.memtables_to_free) {
delete m;
}
// check if there is anything to do
if (!state.all_files.size() &&
!state.sst_delete_files.size() &&
......@@ -1191,13 +1244,14 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
file_number, pending_outputs_, &deletion_state.memtables_to_free);
if (s.ok()) {
InstallSuperVersion(deletion_state);
if (madeProgress) {
*madeProgress = 1;
}
MaybeScheduleLogDBDeployStats();
if (!disable_delete_obsolete_files_) {
if (disable_delete_obsolete_files_ == 0) {
// add to deletion state
deletion_state.log_delete_files.insert(
deletion_state.log_delete_files.end(),
......@@ -1251,11 +1305,17 @@ int DBImpl::FindMinimumEmptyLevelFitting(int level) {
void DBImpl::ReFitLevel(int level, int target_level) {
assert(level < NumberLevels());
MutexLock l(&mutex_);
SuperVersion* superversion_to_free = nullptr;
SuperVersion* new_superversion =
new SuperVersion(options_.max_write_buffer_number);
mutex_.Lock();
// only allow one thread refitting
if (refitting_level_) {
mutex_.Unlock();
Log(options_.info_log, "ReFitLevel: another thread is refitting");
delete new_superversion;
return;
}
refitting_level_ = true;
......@@ -1291,6 +1351,8 @@ void DBImpl::ReFitLevel(int level, int target_level) {
edit.DebugString().data());
auto status = versions_->LogAndApply(&edit, &mutex_);
superversion_to_free = InstallSuperVersion(new_superversion);
new_superversion = nullptr;
Log(options_.info_log, "LogAndApply: %s\n", status.ToString().data());
......@@ -1302,6 +1364,10 @@ void DBImpl::ReFitLevel(int level, int target_level) {
refitting_level_ = false;
bg_work_gate_closed_ = false;
mutex_.Unlock();
delete superversion_to_free;
delete new_superversion;
}
int DBImpl::NumberLevels(const ColumnFamilyHandle& column_family) {
......@@ -1676,7 +1742,7 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
void DBImpl::BackgroundCallFlush() {
bool madeProgress = false;
DeletionState deletion_state(options_.max_write_buffer_number);
DeletionState deletion_state(options_.max_write_buffer_number, true);
assert(bg_flush_scheduled_);
MutexLock l(&mutex_);
......@@ -1722,7 +1788,7 @@ void DBImpl::TEST_PurgeObsoleteteWAL() {
void DBImpl::BackgroundCallCompaction() {
bool madeProgress = false;
DeletionState deletion_state(options_.max_write_buffer_number);
DeletionState deletion_state(options_.max_write_buffer_number, true);
MaybeDumpStats();
......@@ -1775,7 +1841,7 @@ void DBImpl::BackgroundCallCompaction() {
}
Status DBImpl::BackgroundCompaction(bool* madeProgress,
DeletionState& deletion_state) {
DeletionState& deletion_state) {
*madeProgress = false;
mutex_.AssertHeld();
......@@ -1828,6 +1894,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
status = versions_->LogAndApply(c->edit(), &mutex_);
InstallSuperVersion(deletion_state);
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number),
......@@ -2489,6 +2556,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
if (status.ok()) {
status = InstallCompactionResults(compact);
InstallSuperVersion(deletion_state);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log,
......@@ -2593,6 +2661,44 @@ Status DBImpl::Get(const ReadOptions& options,
return GetImpl(options, key, value);
}
// DeletionState gets created and destructed outside of the lock -- we
// use this convinently to:
// * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete one SuperVersion() outside of the lock -- superversion_to_free
//
// However, if InstallSuperVersion() gets called twice with the same,
// deletion_state, we can't reuse the SuperVersion() that got malloced because
// first call already used it. In that rare case, we take a hit and create a
// new SuperVersion() inside of the mutex. We do similar thing
// for superversion_to_free
void DBImpl::InstallSuperVersion(DeletionState& deletion_state) {
// if new_superversion == nullptr, it means somebody already used it
SuperVersion* new_superversion =
(deletion_state.new_superversion != nullptr) ?
deletion_state.new_superversion : new SuperVersion();
SuperVersion* old_superversion = InstallSuperVersion(new_superversion);
deletion_state.new_superversion = nullptr;
if (deletion_state.superversion_to_free != nullptr) {
// somebody already put it there
delete old_superversion;
} else {
deletion_state.superversion_to_free = old_superversion;
}
}
DBImpl::SuperVersion* DBImpl::InstallSuperVersion(
SuperVersion* new_superversion) {
mutex_.AssertHeld();
new_superversion->Init(mem_, imm_, versions_->current());
SuperVersion* old_superversion = super_version_;
super_version_ = new_superversion;
if (old_superversion != nullptr && old_superversion->Unref()) {
old_superversion->Cleanup();
return old_superversion; // will let caller delete outside of mutex
}
return nullptr;
}
Status DBImpl::GetImpl(const ReadOptions& options,
const Slice& key,
std::string* value,
......@@ -2601,27 +2707,20 @@ Status DBImpl::GetImpl(const ReadOptions& options,
StopWatch sw(env_, options_.statistics.get(), DB_GET);
SequenceNumber snapshot;
std::vector<MemTable*> to_delete;
mutex_.Lock();
if (options.snapshot != nullptr) {
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
} else {
snapshot = versions_->LastSequence();
}
MemTable* mem = mem_;
MemTableList imm = imm_;
Version* current = versions_->current();
mem->Ref();
imm.RefAll();
current->Ref();
// Unlock while reading from files and memtables
// This can be replaced by using atomics and spinlock instead of big mutex
mutex_.Lock();
SuperVersion* get_version = super_version_->Ref();
mutex_.Unlock();
bool have_stat_update = false;
Version::GetStats stats;
// Prepare to store a list of merge operations if merge occurs.
MergeContext merge_context;
......@@ -2629,32 +2728,41 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case.
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s, merge_context, options_)) {
if (get_version->mem->Get(lkey, value, &s, merge_context, options_)) {
// Done
RecordTick(options_.statistics.get(), MEMTABLE_HIT);
} else if (imm.Get(lkey, value, &s, merge_context, options_)) {
} else if (get_version->imm.Get(lkey, value, &s, merge_context, options_)) {
// Done
RecordTick(options_.statistics.get(), MEMTABLE_HIT);
} else {
current->Get(options, lkey, value, &s, &merge_context, &stats,
options_, value_found);
get_version->current->Get(options, lkey, value, &s, &merge_context, &stats,
options_, value_found);
have_stat_update = true;
RecordTick(options_.statistics.get(), MEMTABLE_MISS);
}
mutex_.Lock();
if (!options_.disable_seek_compaction &&
have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleFlushOrCompaction();
bool delete_get_version = false;
if (!options_.disable_seek_compaction && have_stat_update) {
mutex_.Lock();
if (get_version->current->UpdateStats(stats)) {
MaybeScheduleFlushOrCompaction();
}
if (get_version->Unref()) {
get_version->Cleanup();
delete_get_version = true;
}
mutex_.Unlock();
} else {
if (get_version->Unref()) {
mutex_.Lock();
get_version->Cleanup();
mutex_.Unlock();
delete_get_version = true;
}
}
if (delete_get_version) {
delete get_version;
}
MemTable* m = mem->Unref();
imm.UnrefAll(&to_delete);
current->Unref();
mutex_.Unlock();
// free up all obsolete memtables outside the mutex
delete m;
for (MemTable* v: to_delete) delete v;
// Note, tickers are atomic now - no lock protection needed any more.
RecordTick(options_.statistics.get(), NUMBER_KEYS_READ);
......@@ -2833,7 +2941,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
w.done = false;
StopWatch sw(env_, options_.statistics.get(), DB_WRITE);
MutexLock l(&mutex_);
mutex_.Lock();
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
......@@ -2844,6 +2952,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
}
if (w.done) {
mutex_.Unlock();
RecordTick(options_.statistics.get(), WRITE_DONE_BY_OTHER, 1);
return w.status;
} else {
......@@ -2851,7 +2960,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
}
// May temporarily unlock and wait.
Status status = MakeRoomForWrite(my_batch == nullptr);
SuperVersion* superversion_to_free = nullptr;
Status status = MakeRoomForWrite(my_batch == nullptr, &superversion_to_free);
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions
......@@ -2939,6 +3049,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
mutex_.Unlock();
delete superversion_to_free;
return status;
}
......@@ -3031,7 +3143,8 @@ uint64_t DBImpl::SlowdownAmount(int n, int top, int bottom) {
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
Status DBImpl::MakeRoomForWrite(bool force,
SuperVersion** superversion_to_free) {
mutex_.AssertHeld();
assert(!writers_.empty());
bool allow_delay = !force;
......@@ -3040,6 +3153,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
uint64_t rate_limit_delay_millis = 0;
Status s;
double score;
*superversion_to_free = nullptr;
while (true) {
if (!bg_error_.ok()) {
......@@ -3166,6 +3280,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
// Do this without holding the dbmutex lock.
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
SuperVersion* new_superversion = nullptr;
mutex_.Unlock();
{
EnvOptions soptions(storage_options_);
......@@ -3182,6 +3297,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size);
memtmp = new MemTable(
internal_comparator_, mem_rep_factory_, NumberLevels(), options_);
new_superversion = new SuperVersion(options_.max_write_buffer_number);
}
}
mutex_.Lock();
......@@ -3206,6 +3322,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
mem_->SetLogNumber(logfile_number_);
force = false; // Do not force another compaction if have room
MaybeScheduleFlushOrCompaction();
*superversion_to_free = InstallSuperVersion(new_superversion);
}
}
return s;
......@@ -3562,7 +3679,7 @@ Status DBImpl::DeleteFile(std::string name) {
FileMetaData metadata;
int maxlevel = NumberLevels();
VersionEdit edit(maxlevel);
DeletionState deletion_state;
DeletionState deletion_state(0, true);
{
MutexLock l(&mutex_);
status = versions_->GetMetadataForFile(number, &level, &metadata);
......@@ -3592,14 +3709,14 @@ Status DBImpl::DeleteFile(std::string name) {
}
edit.DeleteFile(level, number);
status = versions_->LogAndApply(&edit, &mutex_);
if (status.ok()) {
InstallSuperVersion(deletion_state);
}
FindObsoleteFiles(deletion_state, false);
} // lock released here
LogFlush(options_.info_log);
if (status.ok()) {
// remove files outside the db-lock
PurgeObsoleteFiles(deletion_state);
}
// remove files outside the db-lock
PurgeObsoleteFiles(deletion_state);
return status;
}
......@@ -3712,6 +3829,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
if (s.ok()) {
delete impl->InstallSuperVersion(new DBImpl::SuperVersion());
impl->mem_->SetLogNumber(impl->logfile_number_);
impl->DeleteObsoleteFiles();
impl->MaybeScheduleFlushOrCompaction();
......
......@@ -105,7 +105,7 @@ class DBImpl : public DB {
virtual Status Flush(const FlushOptions& options,
const ColumnFamilyHandle& column_family);
virtual Status DisableFileDeletions();
virtual Status EnableFileDeletions();
virtual Status EnableFileDeletions(bool force);
// All the returned filenames start with "/"
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size,
......@@ -160,12 +160,38 @@ class DBImpl : public DB {
default_interval_to_delete_obsolete_WAL_ = default_interval_to_delete_obsolete_WAL;
}
// needed for CleanupIteratorState
// holds references to memtable, all immutable memtables and version
struct SuperVersion {
MemTable* mem;
MemTableList imm;
Version* current;
std::atomic<uint32_t> refs;
// We need to_delete because during Cleanup(), imm.UnrefAll() returns
// all memtables that we need to free through this vector. We then
// delete all those memtables outside of mutex, during destruction
std::vector<MemTable*> to_delete;
// should be called outside the mutex
explicit SuperVersion(const int num_memtables = 0);
~SuperVersion();
SuperVersion* Ref();
// Returns true if this was the last reference and caller should
// call Clenaup() and delete the object
bool Unref();
// call these two methods with db mutex held
// Cleanup unrefs mem, imm and current. Also, it stores all memtables
// that needs to be deleted in to_delete vector. Unrefing those
// objects needs to be done in the mutex
void Cleanup();
void Init(MemTable* new_mem, const MemTableList& new_imm,
Version* new_current);
};
// needed for CleanupIteratorState
struct DeletionState {
inline bool HaveSomethingToDelete() const {
return memtables_to_free.size() ||
all_files.size() ||
return all_files.size() ||
sst_delete_files.size() ||
log_delete_files.size();
}
......@@ -187,15 +213,35 @@ class DBImpl : public DB {
// a list of memtables to be free
std::vector<MemTable *> memtables_to_free;
SuperVersion* superversion_to_free; // if nullptr nothing to free
SuperVersion* new_superversion; // if nullptr no new superversion
// the current manifest_file_number, log_number and prev_log_number
// that corresponds to the set of files in 'live'.
uint64_t manifest_file_number, log_number, prev_log_number;
explicit DeletionState(const int num_memtables = 0) {
explicit DeletionState(const int num_memtables = 0,
bool create_superversion = false) {
manifest_file_number = 0;
log_number = 0;
prev_log_number = 0;
memtables_to_free.reserve(num_memtables);
superversion_to_free = nullptr;
new_superversion =
create_superversion ? new SuperVersion(num_memtables) : nullptr;
}
~DeletionState() {
// free pending memtables
for (auto m : memtables_to_free) {
delete m;
}
// free superversion. if nullptr, this will be noop
delete superversion_to_free;
// if new_superversion was not used, it will be non-nullptr and needs
// to be freed here
delete new_superversion;
}
};
......@@ -272,7 +318,11 @@ class DBImpl : public DB {
uint64_t* filenumber);
uint64_t SlowdownAmount(int n, int top, int bottom);
Status MakeRoomForWrite(bool force /* compact even if there is room? */);
// MakeRoomForWrite will return superversion_to_free through an arugment,
// which the caller needs to delete. We do it because caller can delete
// the superversion outside of mutex
Status MakeRoomForWrite(bool force /* compact even if there is room? */,
SuperVersion** superversion_to_free);
WriteBatch* BuildBatchGroup(Writer** last_writer);
// Force current memtable contents to be flushed.
......@@ -356,6 +406,8 @@ class DBImpl : public DB {
uint64_t logfile_number_;
unique_ptr<log::Writer> log_;
SuperVersion* super_version_;
std::string host_name_;
// Queue of writers.
......@@ -396,7 +448,12 @@ class DBImpl : public DB {
int64_t volatile last_log_ts;
// shall we disable deletion of obsolete files
bool disable_delete_obsolete_files_;
// if 0 the deletion is enabled.
// if non-zero, files will not be getting deleted
// This enables two different threads to call
// EnableFileDeletions() and DisableFileDeletions()
// without any synchronization
int disable_delete_obsolete_files_;
// last time when DeleteObsoleteFiles was invoked
uint64_t delete_obsolete_files_last_run_;
......@@ -523,6 +580,18 @@ class DBImpl : public DB {
std::vector<SequenceNumber>& snapshots,
SequenceNumber* prev_snapshot);
// will return a pointer to SuperVersion* if previous SuperVersion
// if its reference count is zero and needs deletion or nullptr if not
// As argument takes a pointer to allocated SuperVersion
// Foreground threads call this function directly (they don't carry
// deletion state and have to handle their own creation and deletion
// of SuperVersion)
SuperVersion* InstallSuperVersion(SuperVersion* new_superversion);
// Background threads call this function, which is just a wrapper around
// the InstallSuperVersion() function above. Background threads carry
// deletion_state which can have new_superversion already allocated.
void InstallSuperVersion(DeletionState& deletion_state);
// Function that Get and KeyMayExist call with no_io true or false
// Note: 'value_found' from KeyMayExist propagates here
Status GetImpl(const ReadOptions& options,
......
......@@ -74,7 +74,7 @@ public:
virtual Status DisableFileDeletions() {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status EnableFileDeletions() {
virtual Status EnableFileDeletions(bool force) {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status GetLiveFiles(std::vector<std::string>&,
......
......@@ -2549,12 +2549,13 @@ class DeleteFilter : public CompactionFilter {
class ChangeFilter : public CompactionFilter {
public:
explicit ChangeFilter(int argv) : argv_(argv) {}
explicit ChangeFilter(int argv) {
assert(argv == 100);
}
virtual bool Filter(int level, const Slice& key,
const Slice& value, std::string* new_value,
bool* value_changed) const override {
assert(argv_ == 100);
assert(new_value != nullptr);
*new_value = NEW_VALUE;
*value_changed = true;
......@@ -2564,9 +2565,6 @@ class ChangeFilter : public CompactionFilter {
virtual const char* Name() const override {
return "ChangeFilter";
}
private:
const int argv_;
};
class KeepFilterFactory : public CompactionFilterFactory {
......@@ -4491,7 +4489,7 @@ class ModelDB: public DB {
virtual Status DisableFileDeletions() {
return Status::OK();
}
virtual Status EnableFileDeletions() {
virtual Status EnableFileDeletions(bool force) {
return Status::OK();
}
virtual Status GetLiveFiles(std::vector<std::string>&, uint64_t* size,
......
......@@ -225,7 +225,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
*s = Status::Corruption("Error: Could not perform merge.");
}
} else {
*s = Status::NotFound(Slice());
*s = Status::NotFound();
}
return true;
}
......
......@@ -29,6 +29,11 @@ static void UnrefEntry(void* arg1, void* arg2) {
cache->Release(h);
}
static Slice GetSliceForFileNumber(uint64_t file_number) {
return Slice(reinterpret_cast<const char*>(&file_number),
sizeof(file_number));
}
TableCache::TableCache(const std::string& dbname,
const Options* options,
const EnvOptions& storage_options,
......@@ -50,9 +55,7 @@ Status TableCache::FindTable(const EnvOptions& toptions,
Cache::Handle** handle, bool* table_io,
const bool no_io) {
Status s;
char buf[sizeof(file_number)];
EncodeFixed64(buf, file_number);
Slice key(buf, sizeof(buf));
Slice key = GetSliceForFileNumber(file_number);
*handle = cache_->Lookup(key);
if (*handle == nullptr) {
if (no_io) { // Dont do IO and return a not-found status
......@@ -165,9 +168,7 @@ bool TableCache::PrefixMayMatch(const ReadOptions& options,
}
void TableCache::Evict(uint64_t file_number) {
char buf[sizeof(file_number)];
EncodeFixed64(buf, file_number);
cache_->Erase(Slice(buf, sizeof(buf)));
cache_->Erase(GetSliceForFileNumber(file_number));
}
} // namespace rocksdb
......@@ -545,7 +545,7 @@ void Version::Get(const ReadOptions& options,
case kFound:
return;
case kDeleted:
*status = Status::NotFound(Slice()); // Use empty error message for speed
*status = Status::NotFound(); // Use empty error message for speed
return;
case kCorrupt:
*status = Status::Corruption("corrupted key for ", user_key);
......@@ -570,7 +570,7 @@ void Version::Get(const ReadOptions& options,
user_key);
}
} else {
*status = Status::NotFound(Slice()); // Use an empty error message for speed
*status = Status::NotFound(); // Use an empty error message for speed
}
}
......
......@@ -272,12 +272,14 @@ class VersionSet {
int64_t NumLevelBytes(int level) const;
// Return the last sequence number.
uint64_t LastSequence() const { return last_sequence_; }
uint64_t LastSequence() const {
return last_sequence_.load(std::memory_order_acquire);
}
// Set the last sequence number to s.
void SetLastSequence(uint64_t s) {
assert(s >= last_sequence_);
last_sequence_ = s;
last_sequence_.store(s, std::memory_order_release);
}
// Mark the specified file number as used.
......@@ -476,7 +478,7 @@ class VersionSet {
const InternalKeyComparator icmp_;
uint64_t next_file_number_;
uint64_t manifest_file_number_;
uint64_t last_sequence_;
std::atomic<uint64_t> last_sequence_;
uint64_t log_number_;
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted
......
......@@ -363,7 +363,15 @@ class DB {
virtual Status DisableFileDeletions() = 0;
// Allow compactions to delete obselete files.
virtual Status EnableFileDeletions() = 0;
// If force == true, the call to EnableFileDeletions() will guarantee that
// file deletions are enabled after the call, even if DisableFileDeletions()
// was called multiple times before.
// If force == false, EnableFileDeletions will only enable file deletion
// after it's been called at least as many times as DisableFileDeletions(),
// enabling the two methods to be called by two threads concurrently without
// synchronization -- i.e., file deletions will be enabled only after both
// threads call EnableFileDeletions()
virtual Status EnableFileDeletions(bool force = true) = 0;
// GetLiveFiles followed by GetSortedWalFiles can generate a lossless backup
......
......@@ -25,7 +25,7 @@ namespace rocksdb {
class Status {
public:
// Create a success status.
Status() : state_(nullptr) { }
Status() : code_(kOk), state_(nullptr) { }
~Status() { delete[] state_; }
// Copy the specified status.
......@@ -39,6 +39,10 @@ class Status {
static Status NotFound(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kNotFound, msg, msg2);
}
// Fast path for not found without malloc;
static Status NotFound() {
return Status(kNotFound);
}
static Status Corruption(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kCorruption, msg, msg2);
}
......@@ -59,7 +63,7 @@ class Status {
}
// Returns true iff the status indicates success.
bool ok() const { return (state_ == nullptr); }
bool ok() const { return code() == kOk; }
// Returns true iff the status indicates a NotFound error.
bool IsNotFound() const { return code() == kNotFound; }
......@@ -87,13 +91,6 @@ class Status {
std::string ToString() const;
private:
// OK status has a nullptr state_. Otherwise, state_ is a new[] array
// of the following form:
// state_[0..3] == length of message
// state_[4] == code
// state_[5..] == message
const char* state_;
enum Code {
kOk = 0,
kNotFound = 1,
......@@ -105,20 +102,30 @@ class Status {
kIncomplete = 7
};
// A nullptr state_ (which is always the case for OK) means the message
// is empty.
// of the following form:
// state_[0..3] == length of message
// state_[4..] == message
Code code_;
const char* state_;
Code code() const {
return (state_ == nullptr) ? kOk : static_cast<Code>(state_[4]);
return code_;
}
explicit Status(Code code) : code_(code), state_(nullptr) { }
Status(Code code, const Slice& msg, const Slice& msg2);
static const char* CopyState(const char* s);
};
inline Status::Status(const Status& s) {
code_ = s.code_;
state_ = (s.state_ == nullptr) ? nullptr : CopyState(s.state_);
}
inline void Status::operator=(const Status& s) {
// The following condition catches both aliasing (when this == &s),
// and the common case where both s and *this are ok.
code_ = s.code_;
if (state_ != s.state_) {
delete[] state_;
state_ = (s.state_ == nullptr) ? nullptr : CopyState(s.state_);
......
......@@ -56,7 +56,7 @@ class LogFile {
};
struct BatchResult {
SequenceNumber sequence = SequenceNumber();
SequenceNumber sequence = 0;
std::unique_ptr<WriteBatch> writeBatchPtr;
};
......
......@@ -158,8 +158,8 @@ class StackableDB : public DB {
return db_->DisableFileDeletions();
}
virtual Status EnableFileDeletions() override {
return db_->EnableFileDeletions();
virtual Status EnableFileDeletions(bool force) override {
return db_->EnableFileDeletions(force);
}
virtual Status GetLiveFiles(std::vector<std::string>& vec, uint64_t* mfs,
......
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <algorithm>
#include <cassert>
#include <stdexcept>
#include <iterator>
#include <vector>
namespace rocksdb {
// A vector that leverages pre-allocated stack-based array to achieve better
// performance for array with small amount of items.
//
// The interface resembles that of vector, but with less features since we aim
// to solve the problem that we have in hand, rather than implementing a
// full-fledged generic container.
//
// Currently we don't support:
// * reserve()/shrink_to_fit()/resize()
// If used correctly, in most cases, people should not touch the
// underlying vector at all.
// * random insert()/erase(), please only use push_back()/pop_back().
// * No move/swap operations. Each autovector instance has a
// stack-allocated array and if we want support move/swap operations, we
// need to copy the arrays other than just swapping the pointers. In this
// case we'll just explicitly forbid these operations since they may
// lead users to make false assumption by thinking they are inexpensive
// operations.
//
// Naming style of public methods almost follows that of the STL's.
template <class T, size_t kSize = 8>
class autovector {
public:
// General STL-style container member types.
typedef T value_type;
typedef typename std::vector<T>::difference_type difference_type;
typedef typename std::vector<T>::size_type size_type;
typedef value_type& reference;
typedef const value_type& const_reference;
typedef value_type* pointer;
typedef const value_type* const_pointer;
// This class is the base for regular/const iterator
template <class TAutoVector, class TValueType>
class iterator_impl {
public:
// -- iterator traits
typedef iterator_impl<TAutoVector, TValueType> self_type;
typedef TValueType value_type;
typedef TValueType& reference;
typedef TValueType* pointer;
typedef typename TAutoVector::difference_type difference_type;
typedef std::random_access_iterator_tag iterator_category;
iterator_impl(TAutoVector* vect, size_t index)
: vect_(vect)
, index_(index) {
};
iterator_impl(const iterator_impl&) = default;
~iterator_impl() { }
iterator_impl& operator=(const iterator_impl&) = default;
// -- Advancement
// iterator++
self_type& operator++() {
++index_;
return *this;
}
// ++iterator
self_type operator++(int) {
auto old = *this;
++index_;
return old;
}
// iterator--
self_type& operator--() {
--index_;
return *this;
}
// --iterator
self_type operator--(int) {
auto old = *this;
--index_;
return old;
}
self_type operator-(difference_type len) {
return self_type(vect_, index_ - len);
}
difference_type operator-(const self_type& other) {
assert(vect_ == other.vect_);
return index_ - other.index_;
}
self_type operator+(difference_type len) {
return self_type(vect_, index_ + len);
}
self_type& operator+=(difference_type len) {
index_ += len;
return *this;
}
self_type& operator-=(difference_type len) {
index_ -= len;
return *this;
}
// -- Reference
reference operator*() {
assert(vect_->size() >= index_);
return (*vect_)[index_];
}
pointer operator->() {
assert(vect_->size() >= index_);
return &(*vect_)[index_];
}
// -- Logical Operators
bool operator==(const self_type& other) const {
assert(vect_ == other.vect_);
return index_ == other.index_;
}
bool operator!=(const self_type& other) const {
return !(*this == other);
}
bool operator>(const self_type& other) const {
assert(vect_ == other.vect_);
return index_ > other.index_;
}
bool operator<(const self_type& other) const {
assert(vect_ == other.vect_);
return index_ < other.index_;
}
bool operator>=(const self_type& other) const {
assert(vect_ == other.vect_);
return index_ >= other.index_;
}
bool operator<=(const self_type& other) const {
assert(vect_ == other.vect_);
return index_ <= other.index_;
}
private:
TAutoVector* vect_ = nullptr;
size_t index_ = 0;
};
typedef iterator_impl<autovector, value_type> iterator;
typedef iterator_impl<const autovector, const value_type> const_iterator;
typedef std::reverse_iterator<iterator> reverse_iterator;
typedef std::reverse_iterator<const_iterator> const_reverse_iterator;
autovector() = default;
~autovector() = default;
// -- Immutable operations
// Indicate if all data resides in in-stack data structure.
bool only_in_stack() const {
// If no element was inserted at all, the vector's capacity will be `0`.
return vect_.capacity() == 0;
}
size_type size() const {
return num_stack_items_ + vect_.size();
}
bool empty() const {
return size() == 0;
}
// will not check boundry
const_reference operator[](size_type n) const {
return n < kSize ? values_[n] : vect_[n - kSize];
}
reference operator[](size_type n) {
return n < kSize ? values_[n] : vect_[n - kSize];
}
// will check boundry
const_reference at(size_type n) const {
if (n >= size()) {
throw std::out_of_range("autovector: index out of range");
}
return (*this)[n];
}
reference at(size_type n) {
if (n >= size()) {
throw std::out_of_range("autovector: index out of range");
}
return (*this)[n];
}
reference front() {
assert(!empty());
return *begin();
}
const_reference front() const {
assert(!empty());
return *begin();
}
reference back() {
assert(!empty());
return *(end() - 1);
}
const_reference back() const {
assert(!empty());
return *(end() - 1);
}
// -- Mutable Operations
void push_back(T&& item) {
if (num_stack_items_ < kSize) {
values_[num_stack_items_++] = std::move(item);
} else {
vect_.push_back(item);
}
}
void push_back(const T& item) {
push_back(value_type(item));
}
template<class... Args>
void emplace_back(Args&&... args) {
push_back(value_type(args...));
}
void pop_back() {
assert(!empty());
if (!vect_.empty()) {
vect_.pop_back();
} else {
--num_stack_items_;
}
}
void clear() {
num_stack_items_ = 0;
vect_.clear();
}
// -- Copy and Assignment
autovector& assign(const autovector& other);
autovector(const autovector& other) {
assign(other);
}
autovector& operator=(const autovector& other) {
return assign(other);
}
// move operation are disallowed since it is very hard to make sure both
// autovectors are allocated from the same function stack.
autovector& operator=(autovector&& other) = delete;
autovector(autovector&& other) = delete;
// -- Iterator Operations
iterator begin() {
return iterator(this, 0);
}
const_iterator begin() const {
return const_iterator(this, 0);
}
iterator end() {
return iterator(this, this->size());
}
const_iterator end() const {
return const_iterator(this, this->size());
}
reverse_iterator rbegin() {
return reverse_iterator(end());
}
const_reverse_iterator rbegin() const {
return const_reverse_iterator(end());
}
reverse_iterator rend() {
return reverse_iterator(begin());
}
const_reverse_iterator rend() const {
return const_reverse_iterator(begin());
}
private:
size_type num_stack_items_ = 0; // current number of items
value_type values_[kSize]; // the first `kSize` items
// used only if there are more than `kSize` items.
std::vector<T> vect_;
};
template <class T, size_t kSize>
autovector<T, kSize>& autovector<T, kSize>::assign(const autovector& other) {
// copy the internal vector
vect_.assign(other.vect_.begin(), other.vect_.end());
// copy array
num_stack_items_ = other.num_stack_items_;
std::copy(other.values_, other.values_ + num_stack_items_, values_);
return *this;
}
} // rocksdb
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <atomic>
#include <iostream>
#include "rocksdb/env.h"
#include "util/autovector.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace rocksdb {
using namespace std;
class AutoVectorTest { };
const size_t kSize = 8;
TEST(AutoVectorTest, PushBackAndPopBack) {
autovector<size_t, kSize> vec;
ASSERT_TRUE(vec.empty());
ASSERT_EQ(0ul, vec.size());
for (size_t i = 0; i < 1000 * kSize; ++i) {
vec.push_back(i);
ASSERT_TRUE(!vec.empty());
if (i < kSize) {
ASSERT_TRUE(vec.only_in_stack());
} else {
ASSERT_TRUE(!vec.only_in_stack());
}
ASSERT_EQ(i + 1, vec.size());
ASSERT_EQ(i, vec[i]);
ASSERT_EQ(i, vec.at(i));
}
size_t size = vec.size();
while (size != 0) {
vec.pop_back();
// will always be in heap
ASSERT_TRUE(!vec.only_in_stack());
ASSERT_EQ(--size, vec.size());
}
ASSERT_TRUE(vec.empty());
}
TEST(AutoVectorTest, EmplaceBack) {
typedef std::pair<size_t, std::string> ValueType;
autovector<ValueType, kSize> vec;
for (size_t i = 0; i < 1000 * kSize; ++i) {
vec.emplace_back(i, std::to_string(i + 123));
ASSERT_TRUE(!vec.empty());
if (i < kSize) {
ASSERT_TRUE(vec.only_in_stack());
} else {
ASSERT_TRUE(!vec.only_in_stack());
}
ASSERT_EQ(i + 1, vec.size());
ASSERT_EQ(i, vec[i].first);
ASSERT_EQ(std::to_string(i + 123), vec[i].second);
}
vec.clear();
ASSERT_TRUE(vec.empty());
ASSERT_TRUE(!vec.only_in_stack());
}
void AssertEqual(
const autovector<size_t, kSize>& a, const autovector<size_t, kSize>& b) {
ASSERT_EQ(a.size(), b.size());
ASSERT_EQ(a.empty(), b.empty());
ASSERT_EQ(a.only_in_stack(), b.only_in_stack());
for (size_t i = 0; i < a.size(); ++i) {
ASSERT_EQ(a[i], b[i]);
}
}
TEST(AutoVectorTest, CopyAndAssignment) {
// Test both heap-allocated and stack-allocated cases.
for (auto size : { kSize / 2, kSize * 1000 }) {
autovector<size_t, kSize> vec;
for (size_t i = 0; i < size; ++i) {
vec.push_back(i);
}
{
autovector<size_t, kSize> other;
other = vec;
AssertEqual(other, vec);
}
{
autovector<size_t, kSize> other(vec);
AssertEqual(other, vec);
}
}
}
TEST(AutoVectorTest, Iterators) {
autovector<std::string, kSize> vec;
for (size_t i = 0; i < kSize * 1000; ++i) {
vec.push_back(std::to_string(i));
}
// basic operator test
ASSERT_EQ(vec.front(), *vec.begin());
ASSERT_EQ(vec.back(), *(vec.end() - 1));
ASSERT_TRUE(vec.begin() < vec.end());
// non-const iterator
size_t index = 0;
for (const auto& item : vec) {
ASSERT_EQ(vec[index++], item);
}
index = vec.size() - 1;
for (auto pos = vec.rbegin(); pos != vec.rend(); ++pos) {
ASSERT_EQ(vec[index--], *pos);
}
// const iterator
const auto& cvec = vec;
index = 0;
for (const auto& item : cvec) {
ASSERT_EQ(cvec[index++], item);
}
index = vec.size() - 1;
for (auto pos = cvec.rbegin(); pos != cvec.rend(); ++pos) {
ASSERT_EQ(cvec[index--], *pos);
}
// forward and backward
auto pos = vec.begin();
while (pos != vec.end()) {
auto old_val = *pos;
auto old = pos++;
// HACK: make sure -> works
ASSERT_TRUE(!old->empty());
ASSERT_EQ(old_val, *old);
ASSERT_TRUE(pos == vec.end() || old_val != *pos);
}
pos = vec.begin();
for (size_t i = 0; i < vec.size(); i += 2) {
// Cannot use ASSERT_EQ since that macro depends on iostream serialization
ASSERT_TRUE(pos + 2 - 2 == pos);
pos += 2;
ASSERT_TRUE(pos >= vec.begin());
ASSERT_TRUE(pos <= vec.end());
size_t diff = static_cast<size_t>(pos - vec.begin());
ASSERT_EQ(i + 2, diff);
}
}
vector<string> GetTestKeys(size_t size) {
vector<string> keys;
keys.resize(size);
int index = 0;
for (auto& key : keys) {
key = "item-" + to_string(index++);
}
return keys;
}
template<class TVector>
void BenchmarkVectorCreationAndInsertion(
string name, size_t ops, size_t item_size,
const std::vector<typename TVector::value_type>& items) {
auto env = Env::Default();
int index = 0;
auto start_time = env->NowNanos();
auto ops_remaining = ops;
while(ops_remaining--) {
TVector v;
for (size_t i = 0; i < item_size; ++i) {
v.push_back(items[index++]);
}
}
auto elapsed = env->NowNanos() - start_time;
cout << "created " << ops << " " << name << " instances:\n\t"
<< "each was inserted with " << item_size << " elements\n\t"
<< "total time elapsed: " << elapsed << " (ns)" << endl;
}
template <class TVector>
size_t BenchmarkSequenceAccess(string name, size_t ops, size_t elem_size) {
TVector v;
for (const auto& item : GetTestKeys(elem_size)) {
v.push_back(item);
}
auto env = Env::Default();
auto ops_remaining = ops;
auto start_time = env->NowNanos();
size_t total = 0;
while (ops_remaining--) {
auto end = v.end();
for (auto pos = v.begin(); pos != end; ++pos) {
total += pos->size();
}
}
auto elapsed = env->NowNanos() - start_time;
cout << "performed " << ops << " sequence access against " << name << "\n\t"
<< "size: " << elem_size << "\n\t"
<< "total time elapsed: " << elapsed << " (ns)" << endl;
// HACK avoid compiler's optimization to ignore total
return total;
}
// This test case only reports the performance between std::vector<string>
// and autovector<string>. We chose string for comparison because in most
// o our use cases we used std::vector<string>.
TEST(AutoVectorTest, PerfBench) {
// We run same operations for kOps times in order to get a more fair result.
size_t kOps = 100000;
// Creation and insertion test
// Test the case when there is:
// * no element inserted: internal array of std::vector may not really get
// initialize.
// * one element inserted: internal array of std::vector must have
// initialized.
// * kSize elements inserted. This shows the most time we'll spend if we
// keep everything in stack.
// * 2 * kSize elements inserted. The internal vector of
// autovector must have been initialized.
cout << "=====================================================" << endl;
cout << "Creation and Insertion Test (value type: std::string)" << endl;
cout << "=====================================================" << endl;
// pre-generated unique keys
auto string_keys = GetTestKeys(kOps * 2 * kSize);
for (auto insertions : { 0ul, 1ul, kSize / 2, kSize, 2 * kSize }) {
BenchmarkVectorCreationAndInsertion<vector<string>>(
"vector<string>", kOps, insertions, string_keys
);
BenchmarkVectorCreationAndInsertion<autovector<string, kSize>>(
"autovector<string>", kOps, insertions, string_keys
);
cout << "-----------------------------------" << endl;
}
cout << "=====================================================" << endl;
cout << "Creation and Insertion Test (value type: uint64_t)" << endl;
cout << "=====================================================" << endl;
// pre-generated unique keys
vector<uint64_t> int_keys(kOps * 2 * kSize);
for (size_t i = 0; i < kOps * 2 * kSize; ++i) {
int_keys[i] = i;
}
for (auto insertions : { 0ul, 1ul, kSize / 2, kSize, 2 * kSize }) {
BenchmarkVectorCreationAndInsertion<vector<uint64_t>>(
"vector<uint64_t>", kOps, insertions, int_keys
);
BenchmarkVectorCreationAndInsertion<autovector<uint64_t, kSize>>(
"autovector<uint64_t>", kOps, insertions, int_keys
);
cout << "-----------------------------------" << endl;
}
// Sequence Access Test
cout << "=====================================================" << endl;
cout << "Sequence Access Test" << endl;
cout << "=====================================================" << endl;
for (auto elem_size : { kSize / 2, kSize, 2 * kSize }) {
BenchmarkSequenceAccess<vector<string>>(
"vector", kOps, elem_size
);
BenchmarkSequenceAccess<autovector<string, kSize>>(
"autovector", kOps, elem_size
);
cout << "-----------------------------------" << endl;
}
}
} // namespace rocksdb
int main(int argc, char** argv) {
return rocksdb::test::RunAllTests();
}
......@@ -389,7 +389,7 @@ class PosixMmapFile : public WritableFile {
}
Status MapNewRegion() {
#ifdef OS_LINUX
#ifdef ROCKSDB_FALLOCATE_PRESENT
assert(base_ == nullptr);
TEST_KILL_RANDOM(rocksdb_kill_odds);
......@@ -575,7 +575,7 @@ class PosixMmapFile : public WritableFile {
#endif
}
#ifdef OS_LINUX
#ifdef ROCKSDB_FALLOCATE_PRESENT
virtual Status Allocate(off_t offset, off_t len) {
TEST_KILL_RANDOM(rocksdb_kill_odds);
if (!fallocate(fd_, FALLOC_FL_KEEP_SIZE, offset, len)) {
......@@ -752,7 +752,7 @@ class PosixWritableFile : public WritableFile {
#endif
}
#ifdef OS_LINUX
#ifdef ROCKSDB_FALLOCATE_PRESENT
virtual Status Allocate(off_t offset, off_t len) {
TEST_KILL_RANDOM(rocksdb_kill_odds);
if (!fallocate(fd_, FALLOC_FL_KEEP_SIZE, offset, len)) {
......@@ -856,7 +856,7 @@ class PosixRandomRWFile : public RandomRWFile {
return Status::OK();
}
#ifdef OS_LINUX
#ifdef ROCKSDB_FALLOCATE_PRESENT
virtual Status Allocate(off_t offset, off_t len) {
if (!fallocate(fd_, FALLOC_FL_KEEP_SIZE, offset, len)) {
return Status::OK();
......@@ -1297,7 +1297,7 @@ class PosixEnv : public Env {
}
bool SupportsFastAllocate(const std::string& path) {
#ifdef OS_LINUX
#ifdef ROCKSDB_FALLOCATE_PRESENT
struct statfs s;
if (statfs(path.c_str(), &s)){
return false;
......
......@@ -111,7 +111,7 @@ class PosixLogger : public Logger {
assert(p <= limit);
const size_t write_size = p - base;
#ifdef OS_LINUX
#ifdef ROCKSDB_FALLOCATE_PRESENT
// If this write would cross a boundary of kDebugLogChunkSize
// space, pre-allocate more space to avoid overly large
// allocations from filesystem allocsize options.
......
......@@ -16,68 +16,65 @@ namespace rocksdb {
const char* Status::CopyState(const char* state) {
uint32_t size;
memcpy(&size, state, sizeof(size));
char* result = new char[size + 5];
memcpy(result, state, size + 5);
char* result = new char[size + 4];
memcpy(result, state, size + 4);
return result;
}
Status::Status(Code code, const Slice& msg, const Slice& msg2) {
Status::Status(Code code, const Slice& msg, const Slice& msg2) :
code_(code) {
assert(code != kOk);
const uint32_t len1 = msg.size();
const uint32_t len2 = msg2.size();
const uint32_t size = len1 + (len2 ? (2 + len2) : 0);
char* result = new char[size + 5];
char* result = new char[size + 4];
memcpy(result, &size, sizeof(size));
result[4] = static_cast<char>(code);
memcpy(result + 5, msg.data(), len1);
memcpy(result + 4, msg.data(), len1);
if (len2) {
result[5 + len1] = ':';
result[6 + len1] = ' ';
memcpy(result + 7 + len1, msg2.data(), len2);
result[4 + len1] = ':';
result[5 + len1] = ' ';
memcpy(result + 6 + len1, msg2.data(), len2);
}
state_ = result;
}
std::string Status::ToString() const {
if (state_ == nullptr) {
return "OK";
} else {
char tmp[30];
const char* type;
switch (code()) {
case kOk:
type = "OK";
break;
case kNotFound:
type = "NotFound: ";
break;
case kCorruption:
type = "Corruption: ";
break;
case kNotSupported:
type = "Not implemented: ";
break;
case kInvalidArgument:
type = "Invalid argument: ";
break;
case kIOError:
type = "IO error: ";
break;
case kMergeInProgress:
type = "Merge In Progress: ";
break;
default:
snprintf(tmp, sizeof(tmp), "Unknown code(%d): ",
static_cast<int>(code()));
type = tmp;
break;
}
std::string result(type);
char tmp[30];
const char* type;
switch (code_) {
case kOk:
return "OK";
case kNotFound:
type = "NotFound: ";
break;
case kCorruption:
type = "Corruption: ";
break;
case kNotSupported:
type = "Not implemented: ";
break;
case kInvalidArgument:
type = "Invalid argument: ";
break;
case kIOError:
type = "IO error: ";
break;
case kMergeInProgress:
type = "Merge In Progress: ";
break;
default:
snprintf(tmp, sizeof(tmp), "Unknown code(%d): ",
static_cast<int>(code()));
type = tmp;
break;
}
std::string result(type);
if (state_ != nullptr) {
uint32_t length;
memcpy(&length, state_, sizeof(length));
result.append(state_ + 5, length);
return result;
result.append(state_ + 4, length);
}
return result;
}
} // namespace rocksdb
......@@ -50,7 +50,7 @@ class DummyDB : public StackableDB {
return options_;
}
virtual Status EnableFileDeletions() override {
virtual Status EnableFileDeletions(bool force) override {
ASSERT_TRUE(!deletions_enabled_);
deletions_enabled_ = true;
return Status::OK();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册