Commit 1304d8c8 authored by kailiu

Merge branch 'master' into performance

Conflicts:
	Makefile
	db/db_impl.cc
	db/db_impl.h
	db/db_test.cc
	db/memtable.cc
	db/memtable.h
	db/version_edit.h
	db/version_set.cc
	include/rocksdb/options.h
	util/hash_skiplist_rep.cc
	util/options.cc
@@ -2,46 +2,4 @@
# http://clang.llvm.org/docs/ClangFormatStyleOptions.html
---
BasedOnStyle: Google
AccessModifierOffset: -1
ConstructorInitializerIndentWidth: 4
AlignEscapedNewlinesLeft: true
AlignTrailingComments: true
AllowAllParametersOfDeclarationOnNextLine: true
AllowShortIfStatementsOnASingleLine: false
AllowShortLoopsOnASingleLine: false
AlwaysBreakTemplateDeclarations: true
AlwaysBreakBeforeMultilineStrings: true
BreakBeforeBinaryOperators: false
BreakConstructorInitializersBeforeComma: false
BinPackParameters: false
ColumnLimit: 80
ConstructorInitializerAllOnOneLineOrOnePerLine: true
DerivePointerBinding: true
ExperimentalAutoDetectBinPacking: true
IndentCaseLabels: false
MaxEmptyLinesToKeep: 1
NamespaceIndentation: None
ObjCSpaceBeforeProtocolList: false
PenaltyBreakBeforeFirstCallParameter: 10
PenaltyBreakComment: 60
PenaltyBreakString: 1000
PenaltyBreakFirstLessLess: 20
PenaltyExcessCharacter: 1000000
PenaltyReturnTypeOnItsOwnLine: 200
PointerBindsToType: true
SpacesBeforeTrailingComments: 2
Cpp11BracedListStyle: true
Standard: Cpp11
IndentWidth: 2
TabWidth: 8
UseTab: Never
BreakBeforeBraces: Attach
IndentFunctionDeclarationAfterType: false
SpacesInParentheses: false
SpacesInAngles: false
SpaceInEmptyParentheses: false
SpacesInCStyleCastParentheses: false
SpaceAfterControlStatementKeyword: true
SpaceBeforeAssignmentOperators: true
ContinuationIndentWidth: 4
...
@@ -126,19 +126,22 @@ $(SHARED2): $(SHARED3)
ln -fs $(SHARED3) $(SHARED2)
endif
$(SHARED3):
$(CXX) $(PLATFORM_SHARED_LDFLAGS)$(SHARED2) $(CXXFLAGS) $(COVERAGEFLAGS) $(PLATFORM_SHARED_CFLAGS) $(SOURCES) -o $@ $(LDFLAGS)
$(SHARED3): $(LIBOBJECTS)
$(CXX) $(PLATFORM_SHARED_LDFLAGS)$(SHARED2) $(CXXFLAGS) $(PLATFORM_SHARED_CFLAGS) $(LDFLAGS) $(SOURCES) -o $@
endif # PLATFORM_SHARED_EXT
all: $(LIBRARY) $(PROGRAMS)
.PHONY: blackbox_crash_test check clean coverage crash_test ldb_tests \
release tags valgrind_check whitebox_crash_test
release tags valgrind_check whitebox_crash_test format
# Will also generate shared libraries.
release:
$(MAKE) clean
OPT="-DNDEBUG -O2" $(MAKE) -j32
OPT="-DNDEBUG -O2" $(MAKE) all -j32
OPT="-DNDEBUG -O2" $(MAKE) $(SHARED) -j32
coverage:
$(MAKE) clean
@@ -195,6 +198,9 @@ tags:
ctags * -R
cscope -b `find . -name '*.cc'` `find . -name '*.h'`
format:
build_tools/format-diff.sh
# ---------------------------------------------------------------------------
# Unit tests and tools
# ---------------------------------------------------------------------------
@@ -416,6 +422,12 @@ DEPFILES = $(filter-out util/build_version.d,$(SOURCES:.cc=.d))
depend: $(DEPFILES)
# if the make goal is either "clean" or "format", we shouldn't
# try to import the *.d files.
# TODO(kailiu) Unfamiliarity with Make's conditionals led to this ugly but
# working solution.
ifneq ($(MAKECMDGOALS),clean)
ifneq ($(MAKECMDGOALS),format)
-include $(DEPFILES)
endif
endif
@@ -81,9 +81,9 @@ PLATFORM_CCFLAGS=
PLATFORM_CXXFLAGS="$PLATFORM_CXXFLAGS ${CXXFLAGS}"
PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS"
PLATFORM_SHARED_EXT="so"
PLATFORM_SHARED_LDFLAGS="${EXEC_LDFLAGS_SHARED} -shared -Wl,-soname -Wl,"
PLATFORM_SHARED_LDFLAGS="-shared -Wl,-soname -Wl,"
PLATFORM_SHARED_CFLAGS="-fPIC"
PLATFORM_SHARED_VERSIONED=true
PLATFORM_SHARED_VERSIONED=false
# generic port files (working on all platform by #ifdef) go directly in /port
GENERIC_PORT_FILES=`find $ROCKSDB_ROOT/port -name '*.cc' | tr "\n" " "`
......
@@ -60,7 +60,7 @@ AR=$TOOLCHAIN_EXECUTABLES/binutils/binutils-2.21.1/da39a3e/bin/ar
RANLIB=$TOOLCHAIN_EXECUTABLES/binutils/binutils-2.21.1/da39a3e/bin/ranlib
CFLAGS="-B$TOOLCHAIN_EXECUTABLES/binutils/binutils-2.21.1/da39a3e/bin/gold -m64 -mtune=generic"
CFLAGS+=" -nostdlib $LIBGCC_INCLUDE $GLIBC_INCLUDE"
CFLAGS+=" $LIBGCC_INCLUDE $GLIBC_INCLUDE"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_ATOMIC_PRESENT -DROCKSDB_FALLOCATE_PRESENT"
CFLAGS+=" -DSNAPPY -DGFLAGS -DZLIB -DBZIP2"
......
#!/bin/bash
set -e
# If the clang-format-diff.py command is not specified, we assume it is
# directly accessible on the PATH.
if [ -z "$CLANG_FORMAT_DIFF" ]
then
CLANG_FORMAT_DIFF="clang-format-diff.py"
fi
# Check clang-format-diff.py
if ! which $CLANG_FORMAT_DIFF &> /dev/null
then
echo "You didn't have clang-format-diff.py available in your computer!"
echo "You can download it by running: "
echo " curl https://fburl.com/clang-format-diff"
exit 128
fi
# Check argparse, a library that clang-format-diff.py requires.
python 2>/dev/null << EOF
import argparse
EOF
if [ "$?" != 0 ]
then
echo "To run clang-format-diff.py, we'll need the library "argparse" to be"
echo "installed. You can try either of the follow ways to install it:"
echo " 1. Manually download argparse: https://pypi.python.org/pypi/argparse"
echo " 2. easy_install argparse (if you have easy_install)"
echo " 3. pip install argparse (if you have pip)"
exit 129
fi
# TODO(kailiu) The following work is incomplete: we still need to figure out
# how to add the files modified by the pre-commit hook to git's commit index.
#
# Check if this script has already been added to pre-commit hook.
# Suggest that the user add this script to the pre-commit hook if their
# pre-commit hook is empty.
# PRE_COMMIT_SCRIPT_PATH="`git rev-parse --show-toplevel`/.git/hooks/pre-commit"
# if ! ls $PRE_COMMIT_SCRIPT_PATH &> /dev/null
# then
# echo "Would you like to add this script to pre-commit hook, which will do "
# echo -n "the format check for all the affected lines before you check in (y/n):"
# read add_to_hook
# if [ "$add_to_hook" == "y" ]
# then
# ln -s `git rev-parse --show-toplevel`/build_tools/format-diff.sh $PRE_COMMIT_SCRIPT_PATH
# fi
# fi
# Check the format of recently changed lines.
diffs=$(git diff -U0 HEAD^ | $CLANG_FORMAT_DIFF -p 1)
if [ -z "$diffs" ]
then
echo "Nothing needs to be reformatted!"
exit 0
fi
# Highlight the insertions/deletions in clang-format-diff.py's output
COLOR_END="\033[0m"
COLOR_RED="\033[0;31m"
COLOR_GREEN="\033[0;32m"
echo -e "Detect lines that doesn't follow the format rules:\r"
# Add the color to the diff. lines added will be green; lines removed will be red.
echo "$diffs" |
sed -e "s/\(^-.*$\)/`echo -e \"$COLOR_RED\1$COLOR_END\"`/" |
sed -e "s/\(^+.*$\)/`echo -e \"$COLOR_GREEN\1$COLOR_END\"`/"
echo -e "Would you like to fix the format automatically (y/n): \c"
# Make sure we can read user input regardless of how the script is invoked.
exec < /dev/tty
read to_fix
if [ "$to_fix" != "y" ]
then
exit 1
fi
# Do in-place format adjustment.
git diff -U0 HEAD^ | $CLANG_FORMAT_DIFF -i -p 1
@@ -50,7 +50,7 @@ make release
--num=$NUM \
--writes=$NUM \
--cache_size=6442450944 \
--cache_numshardbits=4 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--open_files=55000 \
--statistics=1 \
@@ -68,7 +68,7 @@ make release
--num=$NUM \
--writes=$((NUM / 10)) \
--cache_size=6442450944 \
--cache_numshardbits=4 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--open_files=55000 \
--statistics=1 \
@@ -87,7 +87,7 @@ make release
--num=$NUM \
--writes=$NUM \
--cache_size=6442450944 \
--cache_numshardbits=4 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--open_files=55000 \
--statistics=1 \
@@ -106,7 +106,7 @@ make release
--num=$NUM \
--reads=$((NUM / 5)) \
--cache_size=6442450944 \
--cache_numshardbits=4 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--open_files=55000 \
--disable_seek_compaction=1 \
@@ -126,7 +126,7 @@ make release
--num=$NUM \
--reads=$((NUM / 5)) \
--cache_size=104857600 \
--cache_numshardbits=4 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--open_files=55000 \
--disable_seek_compaction=1 \
@@ -147,7 +147,7 @@ make release
--reads=$((NUM / 5)) \
--writes=512 \
--cache_size=6442450944 \
--cache_numshardbits=4 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--write_buffer_size=1000000000 \
--open_files=55000 \
@@ -169,7 +169,7 @@ make release
--num=$((NUM / 4)) \
--writes=$((NUM / 4)) \
--cache_size=6442450944 \
--cache_numshardbits=4 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--open_files=55000 \
--statistics=1 \
@@ -179,6 +179,25 @@ make release
--sync=0 \
--threads=1 > /dev/null
# dummy test just to compact the data
./db_bench \
--benchmarks=readrandom \
--db=$DATA_DIR \
--use_existing_db=1 \
--bloom_bits=10 \
--num=$((NUM / 1000)) \
--reads=$((NUM / 1000)) \
--cache_size=6442450944 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--open_files=55000 \
--statistics=1 \
--histogram=1 \
--disable_data_sync=1 \
--disable_wal=1 \
--sync=0 \
--threads=16 > /dev/null
# measure readrandom after load with filluniquerandom with 6GB block cache
./db_bench \
--benchmarks=readrandom \
@@ -188,7 +207,7 @@ make release
--num=$((NUM / 4)) \
--reads=$((NUM / 4)) \
--cache_size=6442450944 \
--cache_numshardbits=4 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--open_files=55000 \
--disable_seek_compaction=1 \
@@ -200,6 +219,28 @@ make release
--sync=0 \
--threads=16 > ${STAT_FILE}.readrandom_filluniquerandom
# measure readwhilewriting after load with filluniquerandom with 6GB block cache
./db_bench \
--benchmarks=readwhilewriting \
--db=$DATA_DIR \
--use_existing_db=1 \
--bloom_bits=10 \
--num=$((NUM / 4)) \
--reads=$((NUM / 4)) \
--writes_per_second=1000 \
--write_buffer_size=100000000 \
--cache_size=6442450944 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--open_files=55000 \
--disable_seek_compaction=1 \
--statistics=1 \
--histogram=1 \
--disable_data_sync=1 \
--disable_wal=1 \
--sync=0 \
--threads=16 > ${STAT_FILE}.readwhilewriting
# measure memtable performance -- none of the data gets flushed to disk
./db_bench \
--benchmarks=fillrandom,readrandom, \
@@ -208,7 +249,7 @@ make release
--num=$((NUM / 10)) \
--reads=$NUM \
--cache_size=6442450944 \
--cache_numshardbits=4 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--write_buffer_size=1000000000 \
--open_files=55000 \
@@ -264,3 +305,4 @@ send_benchmark_to_ods readrandom readrandom_memtable_sst $STAT_FILE.readrandom_m
send_benchmark_to_ods readrandom readrandom_fillunique_random $STAT_FILE.readrandom_filluniquerandom
send_benchmark_to_ods fillrandom memtablefillrandom $STAT_FILE.memtablefillreadrandom
send_benchmark_to_ods readrandom memtablereadrandom $STAT_FILE.memtablefillreadrandom
send_benchmark_to_ods readwhilewriting readwhilewriting $STAT_FILE.readwhilewriting
@@ -42,7 +42,7 @@ Status BuildTable(const std::string& dbname,
const Comparator* user_comparator,
const SequenceNumber newest_snapshot,
const SequenceNumber earliest_seqno_in_memtable,
const bool enable_compression) {
const CompressionType compression) {
Status s;
meta->file_size = 0;
meta->smallest_seqno = meta->largest_seqno = 0;
@@ -65,7 +65,7 @@ Status BuildTable(const std::string& dbname,
}
TableBuilder* builder = GetTableBuilder(options, file.get(),
options.compression);
compression);
// the first key is the smallest key
Slice key = iter->key();
......
@@ -43,6 +43,6 @@ extern Status BuildTable(const std::string& dbname,
const Comparator* user_comparator,
const SequenceNumber newest_snapshot,
const SequenceNumber earliest_seqno_in_memtable,
const bool enable_compression);
const CompressionType compression);
} // namespace rocksdb
@@ -788,6 +788,10 @@ void rocksdb_env_set_background_threads(rocksdb_env_t* env, int n) {
env->rep->SetBackgroundThreads(n);
}
void rocksdb_env_set_high_priority_background_threads(rocksdb_env_t* env, int n) {
env->rep->SetBackgroundThreads(n, Env::HIGH);
}
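/* [Editor's sketch, hypothetical, not part of this commit] Minimal usage of
   the new C-API call, assuming only functions already in rocksdb/c.h such as
   rocksdb_create_default_env():

     rocksdb_env_t* env = rocksdb_create_default_env();
     rocksdb_env_set_background_threads(env, 4);                // LOW pool
     rocksdb_env_set_high_priority_background_threads(env, 1);  // HIGH pool
     rocksdb_env_destroy(env);
*/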
void rocksdb_env_destroy(rocksdb_env_t* env) {
if (!env->is_default) delete env->rep;
delete env;
......
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/compaction.h"
namespace rocksdb {
static uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
uint64_t sum = 0;
for (size_t i = 0; i < files.size() && files[i]; i++) {
sum += files[i]->file_size;
}
return sum;
}
Compaction::Compaction(Version* input_version, int level, int out_level,
uint64_t target_file_size,
uint64_t max_grandparent_overlap_bytes,
bool seek_compaction, bool enable_compression)
: level_(level),
out_level_(out_level),
max_output_file_size_(target_file_size),
maxGrandParentOverlapBytes_(max_grandparent_overlap_bytes),
input_version_(input_version),
number_levels_(input_version_->NumberLevels()),
seek_compaction_(seek_compaction),
enable_compression_(enable_compression),
grandparent_index_(0),
seen_key_(false),
overlapped_bytes_(0),
base_index_(-1),
parent_index_(-1),
score_(0),
bottommost_level_(false),
is_full_compaction_(false),
level_ptrs_(std::vector<size_t>(number_levels_)) {
input_version_->Ref();
edit_ = new VersionEdit();
for (int i = 0; i < number_levels_; i++) {
level_ptrs_[i] = 0;
}
}
Compaction::~Compaction() {
delete edit_;
if (input_version_ != nullptr) {
input_version_->Unref();
}
}
bool Compaction::IsTrivialMove() const {
// Avoid a move if there is lots of overlapping grandparent data.
// Otherwise, the move could create a parent file that will require
// a very expensive merge later on.
// If level_ == out_level_, the purpose is to force the compaction filter to
// be applied to that level, so this cannot be a trivial move.
return (level_ != out_level_ &&
num_input_files(0) == 1 &&
num_input_files(1) == 0 &&
TotalFileSize(grandparents_) <= maxGrandParentOverlapBytes_);
}
void Compaction::AddInputDeletions(VersionEdit* edit) {
for (int which = 0; which < 2; which++) {
for (size_t i = 0; i < inputs_[which].size(); i++) {
edit->DeleteFile(level_ + which, inputs_[which][i]->number);
}
}
}
bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
if (input_version_->vset_->options_->compaction_style ==
kCompactionStyleUniversal) {
return bottommost_level_;
}
// Maybe use binary search to find right entry instead of linear search?
const Comparator* user_cmp = input_version_->vset_->icmp_.user_comparator();
for (int lvl = level_ + 2; lvl < number_levels_; lvl++) {
const std::vector<FileMetaData*>& files = input_version_->files_[lvl];
for (; level_ptrs_[lvl] < files.size(); ) {
FileMetaData* f = files[level_ptrs_[lvl]];
if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) {
// We've advanced far enough
if (user_cmp->Compare(user_key, f->smallest.user_key()) >= 0) {
// Key falls in this file's range, so definitely not base level
return false;
}
break;
}
level_ptrs_[lvl]++;
}
}
return true;
}
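// [Editor's sketch, hypothetical, not part of this commit] The TODO above
// asks whether binary search could replace the linear scan. One way to do it
// (requires <algorithm>), relying on files in levels >= 1 being sorted by
// key and non-overlapping:
static bool KeyOverlapsLevel(const std::vector<FileMetaData*>& files,
                             const Comparator* user_cmp,
                             const Slice& user_key) {
  // Find the first file whose largest user key is >= user_key.
  auto it = std::lower_bound(
      files.begin(), files.end(), user_key,
      [user_cmp](const FileMetaData* f, const Slice& key) {
        return user_cmp->Compare(f->largest.user_key(), key) < 0;
      });
  // The key overlaps this level iff it is also >= that file's smallest key.
  return it != files.end() &&
         user_cmp->Compare(user_key, (*it)->smallest.user_key()) >= 0;
}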
bool Compaction::ShouldStopBefore(const Slice& internal_key) {
// Scan to find earliest grandparent file that contains key.
const InternalKeyComparator* icmp = &input_version_->vset_->icmp_;
while (grandparent_index_ < grandparents_.size() &&
icmp->Compare(internal_key,
grandparents_[grandparent_index_]->largest.Encode()) > 0) {
if (seen_key_) {
overlapped_bytes_ += grandparents_[grandparent_index_]->file_size;
}
assert(grandparent_index_ + 1 >= grandparents_.size() ||
icmp->Compare(grandparents_[grandparent_index_]->largest.Encode(),
grandparents_[grandparent_index_+1]->smallest.Encode())
< 0);
grandparent_index_++;
}
seen_key_ = true;
if (overlapped_bytes_ > maxGrandParentOverlapBytes_) {
// Too much overlap for current output; start new output
overlapped_bytes_ = 0;
return true;
} else {
return false;
}
}
// Mark (or clear) each file that is being compacted
void Compaction::MarkFilesBeingCompacted(bool value) {
for (int i = 0; i < 2; i++) {
std::vector<FileMetaData*> v = inputs_[i];
for (unsigned int j = 0; j < inputs_[i].size(); j++) {
assert(value ? !inputs_[i][j]->being_compacted :
inputs_[i][j]->being_compacted);
inputs_[i][j]->being_compacted = value;
}
}
}
// Is this compaction producing files at the bottommost level?
void Compaction::SetupBottomMostLevel(bool isManual) {
if (input_version_->vset_->options_->compaction_style ==
kCompactionStyleUniversal) {
// If universal compaction style is used and manual
// compaction is occurring, then we are guaranteed that
// all files will be picked in a single compaction
// run. We can safely set bottommost_level_ = true.
// If it is not manual compaction, then bottommost_level_
// is already set when the Compaction was created.
if (isManual) {
bottommost_level_ = true;
}
return;
}
bottommost_level_ = true;
int num_levels = input_version_->vset_->NumberLevels();
for (int i = output_level() + 1; i < num_levels; i++) {
if (input_version_->NumLevelFiles(i) > 0) {
bottommost_level_ = false;
break;
}
}
}
void Compaction::ReleaseInputs() {
if (input_version_ != nullptr) {
input_version_->Unref();
input_version_ = nullptr;
}
}
void Compaction::ResetNextCompactionIndex() {
input_version_->ResetNextCompactionIndex(level_);
}
static void InputSummary(std::vector<FileMetaData*>& files, char* output,
int len) {
int write = 0;
for (unsigned int i = 0; i < files.size(); i++) {
int sz = len - write;
int ret = snprintf(output + write, sz, "%lu(%lu) ",
(unsigned long)files.at(i)->number,
(unsigned long)files.at(i)->file_size);
if (ret < 0 || ret >= sz)
break;
write += ret;
}
}
void Compaction::Summary(char* output, int len) {
int write = snprintf(output, len,
"Base version %lu Base level %d, seek compaction:%d, inputs:",
(unsigned long)input_version_->GetVersionNumber(),
level_,
seek_compaction_);
if (write < 0 || write > len) {
return;
}
char level_low_summary[100];
InputSummary(inputs_[0], level_low_summary, sizeof(level_low_summary));
char level_up_summary[100];
if (inputs_[1].size()) {
InputSummary(inputs_[1], level_up_summary, sizeof(level_up_summary));
} else {
level_up_summary[0] = '\0';
}
snprintf(output + write, len - write, "[%s],[%s]",
level_low_summary, level_up_summary);
}
} // namespace rocksdb
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "db/version_set.h"
namespace rocksdb {
class Version;
// A Compaction encapsulates information about a compaction.
class Compaction {
public:
~Compaction();
// Return the level that is being compacted. Inputs from "level"
// will be merged.
int level() const { return level_; }
// Outputs will go to this level
int output_level() const { return out_level_; }
// Return the object that holds the edits to the descriptor done
// by this compaction.
VersionEdit* edit() { return edit_; }
// "which" must be either 0 or 1
int num_input_files(int which) const { return inputs_[which].size(); }
// Return the ith input file at "level()+which" ("which" must be 0 or 1).
FileMetaData* input(int which, int i) const { return inputs_[which][i]; }
// Maximum size of files to build during this compaction.
uint64_t MaxOutputFileSize() const { return max_output_file_size_; }
// Whether compression will be enabled for compaction outputs
bool enable_compression() const { return enable_compression_; }
// Is this a trivial compaction that can be implemented by just
// moving a single input file to the next level (no merging or splitting)
bool IsTrivialMove() const;
// Add all inputs to this compaction as delete operations to *edit.
void AddInputDeletions(VersionEdit* edit);
// Returns true if the information we have available guarantees that
// the compaction is producing data in "level+1" for which no data exists
// in levels greater than "level+1".
bool IsBaseLevelForKey(const Slice& user_key);
// Returns true iff we should stop building the current output
// before processing "internal_key".
bool ShouldStopBefore(const Slice& internal_key);
// Release the input version for the compaction, once the compaction
// is successful.
void ReleaseInputs();
void Summary(char* output, int len);
// Return the score that was used to pick this compaction run.
double score() const { return score_; }
// Is this compaction creating a file in the bottom most level?
bool BottomMostLevel() { return bottommost_level_; }
// Does this compaction include all sst files?
bool IsFullCompaction() { return is_full_compaction_; }
private:
friend class Version;
friend class VersionSet;
Compaction(Version* input_version, int level, int out_level,
uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes,
bool seek_compaction = false, bool enable_compression = true);
int level_;
int out_level_;     // level to which output files are written
uint64_t max_output_file_size_;
uint64_t maxGrandParentOverlapBytes_;
Version* input_version_;
VersionEdit* edit_;
int number_levels_;
bool seek_compaction_;
bool enable_compression_;
// Each compaction reads inputs from "level_" and "level_+1"
std::vector<FileMetaData*> inputs_[2]; // The two sets of inputs
// State used to check for number of overlapping grandparent files
// (parent == level_ + 1, grandparent == level_ + 2)
std::vector<FileMetaData*> grandparents_;
size_t grandparent_index_; // Index in grandparent_starts_
bool seen_key_; // Some output key has been seen
uint64_t overlapped_bytes_; // Bytes of overlap between current output
// and grandparent files
int base_index_; // index of the file in files_[level_]
int parent_index_; // index of some file with same range in files_[level_+1]
double score_; // score that was used to pick this compaction.
// Is this compaction creating a file in the bottom most level?
bool bottommost_level_;
// Does this compaction include all sst files?
bool is_full_compaction_;
// level_ptrs_ holds indices into input_version_->levels_: our state
// is that we are positioned at one of the file ranges for each
// higher level than the ones involved in this compaction (i.e. for
// all L >= level_ + 2).
std::vector<size_t> level_ptrs_;
// mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool);
// Initialize whether compaction producing files at the bottommost level
void SetupBottomMostLevel(bool isManual);
// In case of compaction error, reset the nextIndex that is used
// to pick up the next file to be compacted from files_by_size_
void ResetNextCompactionIndex();
};
} // namespace rocksdb
@@ -95,6 +95,8 @@ DEFINE_string(benchmarks,
"\tmergerandom -- same as updaterandom/appendrandom using merge"
" operator. "
"Must be used with merge_operator\n"
"\treadrandommergerandom -- perform N random read-or-merge "
"operations. Must be used with merge_operator\n"
"\tseekrandom -- N random seeks\n"
"\tcrc32c -- repeated crc32c of 4K of data\n"
"\tacquireload -- load N*1000 times\n"
@@ -113,6 +115,11 @@ DEFINE_int64(numdistinct, 1000,
"read/write on fewer keys so that gets are more likely to find the"
" key and puts are more likely to update the same key");
DEFINE_int64(merge_keys, -1,
"Number of distinct keys to use for MergeRandom and "
"ReadRandomMergeRandom. "
"If negative, there will be FLAGS_num keys.");
DEFINE_int64(reads, -1, "Number of read operations to do. "
"If negative, do FLAGS_num reads.");
@@ -298,6 +305,11 @@ DEFINE_int32(readwritepercent, 90, "Ratio of reads to reads/writes (expressed"
"default value 90 means 90% operations out of all reads and writes"
" operations are reads. In other words, 9 gets for every 1 put.");
DEFINE_int32(mergereadpercent, 70, "Ratio of merges to merges&reads (expressed"
" as percentage) for the ReadRandomMergeRandom workload. The"
" default value 70 means 70% out of all read and merge operations"
" are merges. In other words, 7 merges for every 3 gets.");
DEFINE_int32(deletepercent, 2, "Percentage of deletes out of reads/writes/"
"deletes (used in RandomWithVerify only). RandomWithVerify "
"calculates writepercent as (100 - FLAGS_readwritepercent - "
@@ -449,6 +461,9 @@ DEFINE_uint64(bytes_per_sync, rocksdb::Options().bytes_per_sync,
DEFINE_bool(filter_deletes, false, " On true, deletes use bloom-filter and drop"
" the delete if key not present");
DEFINE_int32(max_successive_merges, 0, "Maximum number of successive merge"
" operations on a key in the memtable");
static bool ValidatePrefixSize(const char* flagname, int32_t value) {
if (value < 0 || value>=2000000000) {
fprintf(stderr, "Invalid value for --%s: %d. 0<= PrefixSize <=2000000000\n",
@@ -788,6 +803,7 @@ class Benchmark {
long long reads_;
long long writes_;
long long readwrites_;
long long merge_keys_;
int heap_counter_;
char keyFormat_[100]; // will contain the format of key. e.g "%016d"
void PrintHeader() {
@@ -963,6 +979,7 @@ class Benchmark {
readwrites_((FLAGS_writes < 0 && FLAGS_reads < 0)? FLAGS_num :
((FLAGS_writes > FLAGS_reads) ? FLAGS_writes : FLAGS_reads)
),
merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys),
heap_counter_(0) {
std::vector<std::string> files;
FLAGS_env->GetChildren(FLAGS_db, &files);
@@ -990,8 +1007,8 @@ }
}
unique_ptr<char []> GenerateKeyFromInt(long long v, const char* suffix = "") {
unique_ptr<char []> keyInStr(new char[kMaxKeySize]);
snprintf(keyInStr.get(), kMaxKeySize, keyFormat_, v, suffix);
unique_ptr<char []> keyInStr(new char[kMaxKeySize + 1]);
snprintf(keyInStr.get(), kMaxKeySize + 1, keyFormat_, v, suffix);
return keyInStr;
}
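// [Editor's note, not in this commit] The +1 above matters because snprintf
// always reserves one byte for the terminating NUL, e.g.:
//   char buf[4 + 1];
//   int n = snprintf(buf, sizeof(buf), "%04d", 123);  // n == 4, buf == "0123"
// With a buffer of exactly kMaxKeySize bytes, a full-width key would be
// silently truncated by one character.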
@@ -1092,6 +1109,14 @@ class Benchmark {
method = &Benchmark::ReadWhileWriting;
} else if (name == Slice("readrandomwriterandom")) {
method = &Benchmark::ReadRandomWriteRandom;
} else if (name == Slice("readrandommergerandom")) {
if (FLAGS_merge_operator.empty()) {
fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
name.ToString().c_str());
method = nullptr;
} else {
method = &Benchmark::ReadRandomMergeRandom;
}
} else if (name == Slice("updaterandom")) {
method = &Benchmark::UpdateRandom;
} else if (name == Slice("appendrandom")) {
@@ -1427,6 +1452,7 @@ class Benchmark {
FLAGS_merge_operator.c_str());
exit(1);
}
options.max_successive_merges = FLAGS_max_successive_merges;
// set universal style compaction configurations, if applicable
if (FLAGS_universal_size_ratio != 0) {
@@ -2381,13 +2407,16 @@ class Benchmark {
//
// For example, use FLAGS_merge_operator="uint64add" and FLAGS_value_size=8
// to simulate random additions over 64-bit integers using merge.
//
// The number of merges on the same key can be controlled by adjusting
// FLAGS_merge_keys.
void MergeRandom(ThreadState* thread) {
RandomGenerator gen;
// The number of iterations is the larger of read_ or write_
Duration duration(FLAGS_duration, readwrites_);
while (!duration.Done(1)) {
const long long k = thread->rand.Next() % FLAGS_num;
const long long k = thread->rand.Next() % merge_keys_;
unique_ptr<char []> key = GenerateKeyFromInt(k);
Status s = db_->Merge(write_options_, key.get(),
@@ -2406,6 +2435,68 @@ class Benchmark {
thread->stats.AddMessage(msg);
}
// Read and merge random keys. The number of reads and merges is controlled
// by adjusting FLAGS_num and FLAGS_mergereadpercent. The number of distinct
// keys (and thus also the number of reads and merges on the same key) can be
// adjusted with FLAGS_merge_keys.
//
// As with MergeRandom, the merge operator to use should be defined by
// FLAGS_merge_operator.
void ReadRandomMergeRandom(ThreadState* thread) {
ReadOptions options(FLAGS_verify_checksum, true);
RandomGenerator gen;
std::string value;
long long num_hits = 0;
long long num_gets = 0;
long long num_merges = 0;
size_t max_length = 0;
// the number of iterations is the larger of read_ or write_
Duration duration(FLAGS_duration, readwrites_);
while (!duration.Done(1)) {
const long long k = thread->rand.Next() % merge_keys_;
unique_ptr<char []> key = GenerateKeyFromInt(k);
bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent;
if (do_merge) {
Status s = db_->Merge(write_options_, key.get(),
gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
exit(1);
}
num_merges++;
} else {
Status s = db_->Get(options, key.get(), &value);
if (value.length() > max_length)
max_length = value.length();
if (!s.ok() && !s.IsNotFound()) {
fprintf(stderr, "get error: %s\n", s.ToString().c_str());
// we continue after error rather than exiting so that we can
// find more errors if any
} else if (!s.IsNotFound()) {
num_hits++;
}
num_gets++;
}
thread->stats.FinishedSingleOp(db_);
}
char msg[100];
snprintf(msg, sizeof(msg),
"(reads:%lld merges:%lld total:%lld hits:%lld maxlength:%zu)",
num_gets, num_merges, readwrites_, num_hits, max_length);
thread->stats.AddMessage(msg);
}
void Compact(ThreadState* thread) {
db_->CompactRange(nullptr, nullptr);
}
......
@@ -74,7 +74,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
// Make a set of all of the live *.sst files
std::set<uint64_t> live;
versions_->AddLiveFilesCurrentVersion(&live);
versions_->current()->AddLiveFiles(&live);
ret.clear();
ret.reserve(live.size() + 2); //*.sst + CURRENT + MANIFEST
......
This diff has been collapsed.
@@ -91,10 +91,17 @@ class DBImpl : public DB {
virtual Status GetDbIdentity(std::string& identity);
void RunManualCompaction(int input_level,
int output_level,
const Slice* begin,
const Slice* end);
// Extra methods (for testing) that are not in the public DB interface
// Compact any files in the named level that overlap [*begin, *end]
void TEST_CompactRange(int level, const Slice* begin, const Slice* end);
void TEST_CompactRange(int level,
const Slice* begin,
const Slice* end);
// Force current memtable contents to be flushed.
Status TEST_FlushMemTable();
@@ -124,7 +131,7 @@ class DBImpl : public DB {
void TEST_PurgeObsoleteteWAL();
// get total level0 file size. Only for testing.
uint64_t TEST_GetLevel0TotalSize() { return versions_->NumLevelBytes(0);}
uint64_t TEST_GetLevel0TotalSize();
void TEST_SetDefaultTimeToCheck(uint64_t default_interval_to_delete_obsolete_WAL)
{
@@ -292,7 +299,8 @@ class DBImpl : public DB {
// the superversion outside of mutex
Status MakeRoomForWrite(bool force /* compact even if there is room? */,
SuperVersion** superversion_to_free);
WriteBatch* BuildBatchGroup(Writer** last_writer);
void BuildBatchGroup(Writer** last_writer,
autovector<WriteBatch*>* write_batch_group);
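// [Editor's sketch, hypothetical, not part of this commit] The group-commit
// flow this signature supports: the lead writer gathers the batches queued
// behind it, writes them as one unit, then wakes every follower up to
// last_writer. Roughly:
//   autovector<WriteBatch*> write_batch_group;
//   Writer* last_writer = &w;  // `w` is this thread's queued writer
//   BuildBatchGroup(&last_writer, &write_batch_group);
//   // append each batch to the WAL and memtable, then signal all writers
//   // up to and including last_writer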
// Force current memtable contents to be flushed.
Status FlushMemTable(const FlushOptions& options);
@@ -405,7 +413,8 @@ class DBImpl : public DB {
// Information for a manual compaction
struct ManualCompaction {
int level;
int input_level;
int output_level;
bool done;
bool in_progress; // compaction request being processed?
const InternalKey* begin; // nullptr means beginning of key range
@@ -590,4 +599,7 @@ extern Options SanitizeOptions(const std::string& db,
CompressionType GetCompressionType(const Options& options, int level,
const bool enable_compression);
// Determine compression type for L0 file written by memtable flush.
CompressionType GetCompressionFlush(const Options& options);
} // namespace rocksdb
@@ -86,7 +86,7 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
DBImplReadOnly* impl = new DBImplReadOnly(options, dbname);
impl->mutex_.Lock();
VersionEdit edit(impl->NumberLevels());
VersionEdit edit;
Status s = impl->Recover(&edit, impl->GetMemTable(),
error_if_log_file_exist);
impl->mutex_.Unlock();
......
@@ -65,13 +65,14 @@ void DBImpl::LogDBDeployStats() {
uint64_t file_total_size = 0;
uint32_t file_total_num = 0;
for (int i = 0; i < versions_->NumberLevels(); i++) {
file_total_num += versions_->NumLevelFiles(i);
file_total_size += versions_->NumLevelBytes(i);
Version* current = versions_->current();
for (int i = 0; i < current->NumberLevels(); i++) {
file_total_num += current->NumLevelFiles(i);
file_total_size += current->NumLevelBytes(i);
}
VersionSet::LevelSummaryStorage scratch;
const char* file_num_summary = versions_->LevelSummary(&scratch);
Version::LevelSummaryStorage scratch;
const char* file_num_summary = current->LevelSummary(&scratch);
std::string file_num_per_level(file_num_summary);
std::string data_size_per_level(file_num_summary);
......
@@ -832,6 +832,156 @@ TEST(DBTest, IndexAndFilterBlocksOfNewTableAddedToCache) {
options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_HIT));
}
TEST(DBTest, LevelLimitReopen) {
Options options = CurrentOptions();
Reopen(&options);
const std::string value(1024 * 1024, ' ');
int i = 0;
while (NumTableFilesAtLevel(2) == 0) {
ASSERT_OK(Put(Key(i++), value));
}
options.num_levels = 1;
options.max_bytes_for_level_multiplier_additional.resize(1, 1);
Status s = TryReopen(&options);
ASSERT_EQ(s.IsInvalidArgument(), true);
ASSERT_EQ(s.ToString(),
"Invalid argument: db has more levels than options.num_levels");
options.num_levels = 10;
options.max_bytes_for_level_multiplier_additional.resize(10, 1);
ASSERT_OK(TryReopen(&options));
}
TEST(DBTest, Preallocation) {
const std::string src = dbname_ + "/alloc_test";
unique_ptr<WritableFile> srcfile;
const EnvOptions soptions;
ASSERT_OK(env_->NewWritableFile(src, &srcfile, soptions));
srcfile->SetPreallocationBlockSize(1024 * 1024);
// No writes should mean no preallocation
size_t block_size, last_allocated_block;
srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
ASSERT_EQ(last_allocated_block, 0UL);
// Small write should preallocate one block
srcfile->Append("test");
srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
ASSERT_EQ(last_allocated_block, 1UL);
// Write an entire preallocation block, make sure we increased by two.
std::string buf(block_size, ' ');
srcfile->Append(buf);
srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
ASSERT_EQ(last_allocated_block, 2UL);
// Write five more blocks at once, ensure we're where we need to be.
buf = std::string(block_size * 5, ' ');
srcfile->Append(buf);
srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
ASSERT_EQ(last_allocated_block, 7UL);
}
TEST(DBTest, PutDeleteGet) {
do {
ASSERT_OK(db_->Put(WriteOptions(), "foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
ASSERT_OK(db_->Put(WriteOptions(), "foo", "v2"));
ASSERT_EQ("v2", Get("foo"));
ASSERT_OK(db_->Delete(WriteOptions(), "foo"));
ASSERT_EQ("NOT_FOUND", Get("foo"));
} while (ChangeOptions());
}
TEST(DBTest, GetFromImmutableLayer) {
do {
Options options = CurrentOptions();
options.env = env_;
options.write_buffer_size = 100000; // Small write buffer
Reopen(&options);
ASSERT_OK(Put("foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
env_->delay_sstable_sync_.Release_Store(env_); // Block sync calls
Put("k1", std::string(100000, 'x')); // Fill memtable
Put("k2", std::string(100000, 'y')); // Trigger compaction
ASSERT_EQ("v1", Get("foo"));
env_->delay_sstable_sync_.Release_Store(nullptr); // Release sync calls
} while (ChangeOptions());
}
TEST(DBTest, GetFromVersions) {
do {
ASSERT_OK(Put("foo", "v1"));
dbfull()->TEST_FlushMemTable();
ASSERT_EQ("v1", Get("foo"));
} while (ChangeOptions());
}
TEST(DBTest, GetSnapshot) {
do {
// Try with both a short key and a long key
for (int i = 0; i < 2; i++) {
std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
ASSERT_OK(Put(key, "v1"));
const Snapshot* s1 = db_->GetSnapshot();
ASSERT_OK(Put(key, "v2"));
ASSERT_EQ("v2", Get(key));
ASSERT_EQ("v1", Get(key, s1));
dbfull()->TEST_FlushMemTable();
ASSERT_EQ("v2", Get(key));
ASSERT_EQ("v1", Get(key, s1));
db_->ReleaseSnapshot(s1);
}
} while (ChangeOptions());
}
TEST(DBTest, GetLevel0Ordering) {
do {
// Check that we process level-0 files in correct order. The code
// below generates two level-0 files where the earlier one comes
// before the later one in the level-0 file list since the earlier
// one has a smaller "smallest" key.
ASSERT_OK(Put("bar", "b"));
ASSERT_OK(Put("foo", "v1"));
dbfull()->TEST_FlushMemTable();
ASSERT_OK(Put("foo", "v2"));
dbfull()->TEST_FlushMemTable();
ASSERT_EQ("v2", Get("foo"));
} while (ChangeOptions());
}
TEST(DBTest, GetOrderedByLevels) {
do {
ASSERT_OK(Put("foo", "v1"));
Compact("a", "z");
ASSERT_EQ("v1", Get("foo"));
ASSERT_OK(Put("foo", "v2"));
ASSERT_EQ("v2", Get("foo"));
dbfull()->TEST_FlushMemTable();
ASSERT_EQ("v2", Get("foo"));
} while (ChangeOptions());
}
TEST(DBTest, GetPicksCorrectFile) {
do {
// Arrange to have multiple files in a non-level-0 level.
ASSERT_OK(Put("a", "va"));
Compact("a", "b");
ASSERT_OK(Put("x", "vx"));
Compact("x", "y");
ASSERT_OK(Put("f", "vf"));
Compact("f", "g");
ASSERT_EQ("va", Get("a"));
ASSERT_EQ("vf", Get("f"));
ASSERT_EQ("vx", Get("x"));
} while (ChangeOptions());
}
TEST(DBTest, GetEncountersEmptyLevel) {
do {
// Arrange for the following to happen:
@@ -3325,34 +3475,46 @@ TEST(DBTest, ManualCompaction) {
ASSERT_EQ(dbfull()->MaxMemCompactionLevel(), 2)
<< "Need to update this test to match kMaxMemCompactLevel";
MakeTables(3, "p", "q");
ASSERT_EQ("1,1,1", FilesPerLevel());
// iter - 0 with 7 levels
// iter - 1 with 3 levels
for (int iter = 0; iter < 2; ++iter) {
MakeTables(3, "p", "q");
ASSERT_EQ("1,1,1", FilesPerLevel());
// Compaction range falls before files
Compact("", "c");
ASSERT_EQ("1,1,1", FilesPerLevel());
// Compaction range falls before files
Compact("", "c");
ASSERT_EQ("1,1,1", FilesPerLevel());
// Compaction range falls after files
Compact("r", "z");
ASSERT_EQ("1,1,1", FilesPerLevel());
// Compaction range falls after files
Compact("r", "z");
ASSERT_EQ("1,1,1", FilesPerLevel());
// Compaction range overlaps files
Compact("p1", "p9");
ASSERT_EQ("0,0,1", FilesPerLevel());
// Compaction range overlaps files
Compact("p1", "p9");
ASSERT_EQ("0,0,1", FilesPerLevel());
// Populate a different range
MakeTables(3, "c", "e");
ASSERT_EQ("1,1,2", FilesPerLevel());
// Populate a different range
MakeTables(3, "c", "e");
ASSERT_EQ("1,1,2", FilesPerLevel());
// Compact just the new range
Compact("b", "f");
ASSERT_EQ("0,0,2", FilesPerLevel());
// Compact just the new range
Compact("b", "f");
ASSERT_EQ("0,0,2", FilesPerLevel());
// Compact all
MakeTables(1, "a", "z");
ASSERT_EQ("0,1,2", FilesPerLevel());
db_->CompactRange(nullptr, nullptr);
ASSERT_EQ("0,0,1", FilesPerLevel());
if (iter == 0) {
Options options = CurrentOptions();
options.num_levels = 3;
options.create_if_missing = true;
DestroyAndReopen(&options);
}
}
// Compact all
MakeTables(1, "a", "z");
ASSERT_EQ("0,1,2", FilesPerLevel());
db_->CompactRange(nullptr, nullptr);
ASSERT_EQ("0,0,1", FilesPerLevel());
}
TEST(DBTest, DBOpen_Options) {
@@ -3413,7 +3575,7 @@ TEST(DBTest, DBOpen_Change_NumLevels) {
opts.create_if_missing = false;
opts.num_levels = 2;
s = DB::Open(opts, dbname, &db);
ASSERT_TRUE(strstr(s.ToString().c_str(), "Corruption") != nullptr);
ASSERT_TRUE(strstr(s.ToString().c_str(), "Invalid argument") != nullptr);
ASSERT_TRUE(db == nullptr);
}
@@ -4348,6 +4510,70 @@ TEST(DBTest, MultiThreaded) {
} while (ChangeOptions());
}
// Group commit test:
namespace {
static const int kGCNumThreads = 4;
static const int kGCNumKeys = 1000;
struct GCThread {
DB* db;
int id;
std::atomic<bool> done;
};
static void GCThreadBody(void* arg) {
GCThread* t = reinterpret_cast<GCThread*>(arg);
int id = t->id;
DB* db = t->db;
WriteOptions wo;
for (int i = 0; i < kGCNumKeys; ++i) {
std::string kv(std::to_string(i + id * kGCNumKeys));
ASSERT_OK(db->Put(wo, kv, kv));
}
t->done = true;
}
} // namespace
TEST(DBTest, GroupCommitTest) {
do {
// Start threads
GCThread thread[kGCNumThreads];
for (int id = 0; id < kGCNumThreads; id++) {
thread[id].id = id;
thread[id].db = db_;
thread[id].done = false;
env_->StartThread(GCThreadBody, &thread[id]);
}
for (int id = 0; id < kGCNumThreads; id++) {
while (thread[id].done == false) {
env_->SleepForMicroseconds(100000);
}
}
std::vector<std::string> expected_db;
for (int i = 0; i < kGCNumThreads * kGCNumKeys; ++i) {
expected_db.push_back(std::to_string(i));
}
sort(expected_db.begin(), expected_db.end());
Iterator* itr = db_->NewIterator(ReadOptions());
itr->SeekToFirst();
for (auto x : expected_db) {
ASSERT_TRUE(itr->Valid());
ASSERT_EQ(itr->key().ToString(), x);
ASSERT_EQ(itr->value().ToString(), x);
itr->Next();
}
ASSERT_TRUE(!itr->Valid());
delete itr;
} while (ChangeOptions());
}
namespace {
typedef std::map<std::string, std::string> KVMap;
}
@@ -4892,7 +5118,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
EnvOptions sopt;
VersionSet vset(dbname, &options, sopt, nullptr, &cmp);
ASSERT_OK(vset.Recover());
VersionEdit vbase(vset.NumberLevels());
VersionEdit vbase;
uint64_t fnum = 1;
for (int i = 0; i < num_base_files; i++) {
InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
@@ -4904,7 +5130,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
uint64_t start_micros = env->NowMicros();
for (int i = 0; i < iters; i++) {
VersionEdit vedit(vset.NumberLevels());
VersionEdit vedit;
vedit.DeleteFile(2, fnum);
InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);
......
@@ -35,27 +35,22 @@ struct hash<rocksdb::Slice> {
namespace rocksdb {
MemTable::MemTable(const InternalKeyComparator& cmp,
MemTableRepFactory* table_factory,
int numlevel,
const Options& options)
MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options)
: comparator_(cmp),
refs_(0),
arena_impl_(options.arena_block_size),
table_(table_factory->CreateMemTableRep(comparator_, &arena_impl_)),
table_(options.memtable_factory->CreateMemTableRep(comparator_,
&arena_impl_)),
flush_in_progress_(false),
flush_completed_(false),
file_number_(0),
edit_(numlevel),
first_seqno_(0),
mem_next_logfile_number_(0),
mem_logfile_number_(0),
locks_(options.inplace_update_support ? options.inplace_update_num_locks
                                      : 0),
prefix_extractor_(options.prefix_extractor) {
if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) {
prefix_bloom_.reset(new DynamicBloom(options.memtable_prefix_bloom_bits,
options.memtable_prefix_bloom_probes));
}
@@ -67,7 +62,7 @@ MemTable::~MemTable() {
size_t MemTable::ApproximateMemoryUsage() {
return arena_impl_.ApproximateMemoryUsage() +
       table_->ApproximateMemoryUsage();
}
int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr)
@@ -98,12 +93,11 @@ class MemTableIterator: public Iterator {
MemTableIterator(const MemTable& mem, const ReadOptions& options)
: mem_(mem), iter_(), dynamic_prefix_seek_(false), valid_(false) {
if (options.prefix) {
iter_ = mem_.table_->GetPrefixIterator(*options.prefix);
iter_.reset(mem_.table_->GetPrefixIterator(*options.prefix));
} else if (options.prefix_seek) {
dynamic_prefix_seek_ = true;
iter_ = mem_.table_->GetDynamicPrefixIterator();
iter_.reset(mem_.table_->GetDynamicPrefixIterator());
} else {
iter_ = mem_.table_->GetIterator();
iter_.reset(mem_.table_->GetIterator());
}
}
@@ -212,12 +206,12 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
Slice mem_key = key.memtable_key();
Slice user_key = key.user_key();
std::shared_ptr<MemTableRep::Iterator> iter;
std::unique_ptr<MemTableRep::Iterator> iter;
if (prefix_bloom_ &&
!prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key))) {
// iter is null if prefix bloom says the key does not exist
} else {
iter = table_->GetIterator(user_key);
iter.reset(table_->GetIterator(user_key));
iter->Seek(user_key, mem_key.data());
}
@@ -328,11 +322,10 @@ void MemTable::Update(SequenceNumber seq,
LookupKey lkey(key, seq);
Slice mem_key = lkey.memtable_key();
std::shared_ptr<MemTableRep::Iterator> iter(
std::unique_ptr<MemTableRep::Iterator> iter(
table_->GetIterator(lkey.user_key()));
iter->Seek(key, mem_key.data());
if (iter->Valid()) {
// entry format is:
// key_length varint32
@@ -462,4 +455,37 @@ bool MemTable::UpdateCallback(SequenceNumber seq,
// or key doesn't exist
return false;
}
size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
Slice memkey = key.memtable_key();
// A total ordered iterator is costly for some memtablerep (prefix aware
// reps). By passing in the user key, we allow efficient iterator creation.
// The iterator only needs to be ordered within the same user key.
std::unique_ptr<MemTableRep::Iterator> iter(
table_->GetIterator(key.user_key()));
iter->Seek(key.user_key(), memkey.data());
size_t num_successive_merges = 0;
for (; iter->Valid(); iter->Next()) {
const char* entry = iter->key();
uint32_t key_length;
const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
if (!comparator_.comparator.user_comparator()->Compare(
Slice(iter_key_ptr, key_length - 8), key.user_key()) == 0) {
break;
}
const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8);
if (static_cast<ValueType>(tag & 0xff) != kTypeMerge) {
break;
}
++num_successive_merges;
}
return num_successive_merges;
}
} // namespace rocksdb
@@ -35,11 +35,8 @@ class MemTable {
// MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once.
explicit MemTable(
const InternalKeyComparator& comparator,
MemTableRepFactory* table_factory,
int numlevel = 7,
const Options& options = Options());
explicit MemTable(const InternalKeyComparator& comparator,
const Options& options = Options());
~MemTable();
@@ -123,6 +120,11 @@ class MemTable {
const Slice& delta,
const Options& options);
// Returns the number of successive merge entries starting from the newest
// entry for the key up to the last non-merge entry or last entry for the
// key in the memtable.
size_t CountSuccessiveMergeEntries(const LookupKey& key);
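// [Editor's sketch, hypothetical, not part of this commit] How the write
// path can use this counter together with options.max_successive_merges:
//   LookupKey lkey(key, sequence);
//   if (options.max_successive_merges > 0 &&
//       mem->CountSuccessiveMergeEntries(lkey) >=
//           options.max_successive_merges) {
//     // read the current value, apply the merge operator once, and write
//     // the result back as a plain Put instead of another Merge entry
//   }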
// Returns the edits area that is needed for flushing the memtable
VersionEdit* GetEdits() { return &edit_; }
@@ -157,7 +159,7 @@ class MemTable {
KeyComparator comparator_;
int refs_;
ArenaImpl arena_impl_;
shared_ptr<MemTableRep> table_;
unique_ptr<MemTableRep> table_;
// These are used to manage memtable flushes to storage
bool flush_in_progress_; // started the flush
......
@@ -14,6 +14,7 @@
#include "rocksdb/merge_operator.h"
#include "db/dbformat.h"
#include "db/db_impl.h"
#include "db/write_batch_internal.h"
#include "utilities/merge_operators.h"
#include "util/testharness.h"
#include "utilities/utility_db.h"
@@ -21,13 +22,52 @@
using namespace std;
using namespace rocksdb;
namespace {
int numMergeOperatorCalls;
void resetNumMergeOperatorCalls() {
  numMergeOperatorCalls = 0;
}
}
class CountMergeOperator : public AssociativeMergeOperator {
public:
CountMergeOperator() {
mergeOperator_ = MergeOperators::CreateUInt64AddOperator();
}
virtual bool Merge(const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
Logger* logger) const override {
++numMergeOperatorCalls;
return mergeOperator_->PartialMerge(
key,
*existing_value,
value,
new_value,
logger);
}
virtual const char* Name() const override {
return "UInt64AddOperator";
}
private:
std::shared_ptr<MergeOperator> mergeOperator_;
};
std::shared_ptr<DB> OpenDb(
const string& dbname,
const bool ttl = false,
const unsigned max_successive_merges = 0) {
DB* db;
StackableDB* sdb;
Options options;
options.create_if_missing = true;
options.merge_operator = MergeOperators::CreateUInt64AddOperator();
options.merge_operator = std::make_shared<CountMergeOperator>();
options.max_successive_merges = max_successive_merges;
Status s;
DestroyDB(dbname, Options());
if (ttl) {
@@ -243,6 +283,67 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) {
}
}
void testSuccessiveMerge(
Counters& counters, int max_num_merges, int num_merges) {
counters.assert_remove("z");
uint64_t sum = 0;
for (int i = 1; i <= num_merges; ++i) {
resetNumMergeOperatorCalls();
counters.assert_add("z", i);
sum += i;
if (i % (max_num_merges + 1) == 0) {
assert(numMergeOperatorCalls == max_num_merges + 1);
} else {
assert(numMergeOperatorCalls == 0);
}
resetNumMergeOperatorCalls();
assert(counters.assert_get("z") == sum);
assert(numMergeOperatorCalls == i % (max_num_merges + 1));
}
}
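// [Editor's note, not in this commit] A worked trace of the assertions above
// for max_num_merges = 5, i.e. the chain collapses on every 6th operand:
//   merges 1..5 -> buffered as raw memtable entries, 0 operator calls each
//   merge 6     -> chain collapses in the memtable, 5 + 1 = 6 operator calls
//   a Get after i buffered operands replays them -> i % 6 operator calls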
void testSingleBatchSuccessiveMerge(
DB* db,
int max_num_merges,
int num_merges) {
assert(num_merges > max_num_merges);
Slice key("BatchSuccessiveMerge");
uint64_t merge_value = 1;
Slice merge_value_slice((char *)&merge_value, sizeof(merge_value));
// Create the batch
WriteBatch batch;
for (int i = 0; i < num_merges; ++i) {
batch.Merge(key, merge_value_slice);
}
// Apply to memtable and count the number of merges
resetNumMergeOperatorCalls();
{
Status s = db->Write(WriteOptions(), &batch);
assert(s.ok());
}
assert(numMergeOperatorCalls ==
num_merges - (num_merges % (max_num_merges + 1)));
// Get the value
resetNumMergeOperatorCalls();
string get_value_str;
{
Status s = db->Get(ReadOptions(), key, &get_value_str);
assert(s.ok());
}
assert(get_value_str.size() == sizeof(uint64_t));
uint64_t get_value = DecodeFixed64(&get_value_str[0]);
ASSERT_EQ(get_value, num_merges * merge_value);
ASSERT_EQ(numMergeOperatorCalls, (num_merges % (max_num_merges + 1)));
}
void runTest(int argc, const string& dbname, const bool use_ttl = false) {
auto db = OpenDb(dbname, use_ttl);
@@ -265,6 +366,19 @@ void runTest(int argc, const string& dbname, const bool use_ttl = false) {
}
DestroyDB(dbname, Options());
db.reset();
{
cout << "Test merge in memtable... \n";
unsigned maxMerge = 5;
auto db = OpenDb(dbname, use_ttl, maxMerge);
MergeBasedCounters counters(db, 0);
testCounters(counters, db.get(), compact);
testSuccessiveMerge(counters, maxMerge, maxMerge * 2);
testSingleBatchSuccessiveMerge(db.get(), 5, 7);
DestroyDB(dbname, Options());
}
}
int main(int argc, char *argv[]) {
......
@@ -58,7 +58,7 @@ class Repairer {
next_file_number_(1) {
// TableCache can be small since we expect each table to be opened once.
table_cache_ = new TableCache(dbname_, &options_, storage_options_, 10);
edit_ = new VersionEdit(options.num_levels);
edit_ = new VersionEdit();
}
~Repairer() {
@@ -196,8 +196,7 @@ class Repairer {
std::string scratch;
Slice record;
WriteBatch batch;
MemTable* mem = new MemTable(icmp_, options_.memtable_factory.get(),
options_.num_levels);
MemTable* mem = new MemTable(icmp_, options_);
mem->Ref();
int counter = 0;
while (reader.ReadRecord(&record, &scratch)) {
@@ -225,7 +224,8 @@ class Repairer {
Iterator* iter = mem->NewIterator();
status = BuildTable(dbname_, env_, options_, storage_options_,
table_cache_, iter, &meta,
icmp_.user_comparator(), 0, 0, true);
icmp_.user_comparator(), 0, 0,
kNoCompression);
delete iter;
delete mem->Unref();
mem = nullptr;
......
@@ -33,6 +33,7 @@ enum Tag {
void VersionEdit::Clear() {
comparator_.clear();
max_level_ = 0;
log_number_ = 0;
prev_log_number_ = 0;
last_sequence_ = 0;
@@ -105,14 +106,13 @@ static bool GetInternalKey(Slice* input, InternalKey* dst) {
bool VersionEdit::GetLevel(Slice* input, int* level, const char** msg) {
uint32_t v;
if (GetVarint32(input, &v) &&
(int)v < number_levels_) {
if (GetVarint32(input, &v)) {
*level = v;
if (max_level_ < *level) {
max_level_ = *level;
}
return true;
} else {
if ((int)v >= number_levels_) {
*msg = "db already has more levels than options.num_levels";
}
return false;
}
}
......
@@ -41,10 +41,7 @@ struct FileMetaData {
class VersionEdit {
public:
explicit VersionEdit(int number_levels) :
number_levels_(number_levels) {
Clear();
}
VersionEdit() { Clear(); }
~VersionEdit() { }
void Clear();
@@ -115,7 +112,7 @@ class VersionEdit {
bool GetLevel(Slice* input, int* level, const char** msg);
int number_levels_;
int max_level_;
std::string comparator_;
uint64_t log_number_;
uint64_t prev_log_number_;
@@ -127,9 +124,9 @@ class VersionEdit {
bool has_next_file_number_;
bool has_last_sequence_;
std::vector<std::pair<int, InternalKey>> compact_pointers_;
std::vector<std::pair<int, InternalKey> > compact_pointers_;
DeletedFileSet deleted_files_;
std::vector<std::pair<int, FileMetaData>> new_files_;
std::vector<std::pair<int, FileMetaData> > new_files_;
};
} // namespace rocksdb
@@ -15,7 +15,7 @@ namespace rocksdb {
static void TestEncodeDecode(const VersionEdit& edit) {
std::string encoded, encoded2;
edit.EncodeTo(&encoded);
VersionEdit parsed(7);
VersionEdit parsed;
Status s = parsed.DecodeFrom(encoded);
ASSERT_TRUE(s.ok()) << s.ToString();
parsed.EncodeTo(&encoded2);
@@ -27,7 +27,7 @@ class VersionEditTest { };
TEST(VersionEditTest, EncodeDecode) {
static const uint64_t kBig = 1ull << 50;
VersionEdit edit(7);
VersionEdit edit;
for (int i = 0; i < 4; i++) {
TestEncodeDecode(edit);
edit.AddFile(3, kBig + 300 + i, kBig + 400 + i,
......
This diff has been collapsed.
@@ -27,6 +27,7 @@
#include "db/version_edit.h"
#include "port/port.h"
#include "db/table_cache.h"
#include "db/compaction.h"
namespace rocksdb {
@@ -86,6 +87,11 @@ class Version {
// REQUIRES: lock is held
bool UpdateStats(const GetStats& stats);
// Updates internal structures that keep track of compaction scores
// We use compaction scores to figure out which compaction to do next
// Also pre-sorts level0 files for Get()
void Finalize(std::vector<uint64_t>& size_being_compacted);
// Reference count management (so Versions do not disappear out from
// under live iterators)
void Ref();
@@ -137,15 +143,45 @@ class Version {
int PickLevelForMemTableOutput(const Slice& smallest_user_key,
const Slice& largest_user_key);
int NumFiles(int level) const { return files_[level].size(); }
int NumberLevels() const { return num_levels_; }
// REQUIRES: lock is held
int NumLevelFiles(int level) const { return files_[level].size(); }
// Return the combined file size of all files at the specified level.
int64_t NumLevelBytes(int level) const;
// Return a human-readable short (single-line) summary of the number
// of files per level. Uses *scratch as backing store.
struct LevelSummaryStorage {
char buffer[100];
};
struct FileSummaryStorage {
char buffer[1000];
};
const char* LevelSummary(LevelSummaryStorage* scratch) const;
// Return a human-readable short (single-line) summary of files
// in a specified level. Uses *scratch as backing store.
const char* LevelFileSummary(FileSummaryStorage* scratch, int level) const;
// Return the maximum overlapping data (in bytes) at next level for any
// file at a level >= 1.
int64_t MaxNextLevelOverlappingBytes();
// Add all files listed in the current version to *live.
void AddLiveFiles(std::set<uint64_t>* live);
// Return a human readable string that describes this version's contents.
std::string DebugString(bool hex = false) const;
// Returns the version number of this version
uint64_t GetVersionNumber() {
return version_number_;
}
uint64_t GetVersionNumber() const { return version_number_; }
// used to sort files by size
struct Fsize {
int index;
FileMetaData* file;
};
private:
friend class Compaction;
@@ -159,10 +195,15 @@ class Version {
bool PrefixMayMatch(const ReadOptions& options, const EnvOptions& soptions,
const Slice& internal_prefix, Iterator* level_iter) const;
// Sort all files for this version based on their file size and
// record results in files_by_size_. The largest files are listed first.
void UpdateFilesBySize();
VersionSet* vset_; // VersionSet to which this Version belongs
Version* next_; // Next version in linked list
Version* prev_; // Previous version in linked list
int refs_; // Number of live refs to this version
int num_levels_; // Number of levels
// List of files per level, files in each level are arranged
// in increasing order of keys
@@ -199,9 +240,6 @@ class Version {
double max_compaction_score_; // max score in l1 to ln-1
int max_compaction_score_level_; // level on which max score occurs
// The offset in the manifest file where this version is stored.
uint64_t offset_manifest_file_;
// A version number that uniquely represents this version. This is
// used for debugging and logging purposes only.
uint64_t version_number_;
@@ -223,10 +261,8 @@ class Version {
class VersionSet {
public:
VersionSet(const std::string& dbname,
const Options* options,
const EnvOptions& storage_options,
TableCache* table_cache,
VersionSet(const std::string& dbname, const Options* options,
const EnvOptions& storage_options, TableCache* table_cache,
const InternalKeyComparator*);
~VersionSet();
@@ -236,7 +272,7 @@ class VersionSet {
// REQUIRES: *mu is held on entry.
// REQUIRES: no other thread concurrently calls LogAndApply()
Status LogAndApply(VersionEdit* edit, port::Mutex* mu,
bool new_descriptor_log = false);
bool new_descriptor_log = false);
// Recover the last saved descriptor from persistent storage.
Status Recover();
......@@ -252,6 +288,12 @@ class VersionSet {
// Return the current version.
Version* current() const { return current_; }
// A flag indicating whether writes need to slow down because there are
// too many level0 files.
bool NeedSlowdownForNumLevel0Files() const {
return need_slowdown_for_num_level0_files_;
}
// Return the current manifest file number
uint64_t ManifestFileNumber() const { return manifest_file_number_; }
......@@ -267,12 +309,6 @@ class VersionSet {
}
}
// Return the number of Table files at the specified level.
int NumLevelFiles(int level) const;
// Return the combined file size of all files at the specified level.
int64_t NumLevelBytes(int level) const;
// Return the last sequence number.
uint64_t LastSequence() const {
return last_sequence_.load(std::memory_order_acquire);
......@@ -306,14 +342,18 @@ class VersionSet {
// the specified level. Returns nullptr if there is nothing in that
// level that overlaps the specified range. Caller should delete
// the result.
Compaction* CompactRange(
int level,
const InternalKey* begin,
const InternalKey* end);
// Return the maximum overlapping data (in bytes) at next level for any
// file at a level >= 1.
int64_t MaxNextLevelOverlappingBytes();
//
// The returned Compaction might not include the whole requested range.
// In that case, compaction_end will be set to the next key that needs
// compacting. If the compaction covers the whole range,
// compaction_end will be set to nullptr.
// The client is responsible for the compaction_end storage -- when called,
// *compaction_end should point to a valid InternalKey!
Compaction* CompactRange(int input_level,
int output_level,
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end);
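  // Illustrative caller loop (a sketch, not part of this change; names other
  // than CompactRange are assumptions):
  //   InternalKey end_key;
  //   InternalKey* compaction_end = &end_key;  // must point to a valid key
  //   Compaction* c = versions_->CompactRange(input_level, output_level,
  //                                           begin, end, &compaction_end);
  //   if (c != nullptr && compaction_end != nullptr) {
  //     // Only part of the range was picked; resume from *compaction_end.
  //   }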
// Create an iterator that reads over the compaction inputs for "*c".
// The caller should delete the iterator when no longer needed.
......@@ -358,37 +398,16 @@ class VersionSet {
// Add all files listed in any live version to *live.
void AddLiveFiles(std::vector<uint64_t>* live_list);
// Add all files listed in the current version to *live.
void AddLiveFilesCurrentVersion(std::set<uint64_t>* live);
// Return the approximate offset in the database of the data for
// "key" as of version "v".
uint64_t ApproximateOffsetOf(Version* v, const InternalKey& key);
// Return a human-readable short (single-line) summary of the number
// of files per level. Uses *scratch as backing store.
struct LevelSummaryStorage {
char buffer[100];
};
struct FileSummaryStorage {
char buffer[1000];
};
const char* LevelSummary(LevelSummaryStorage* scratch) const;
// printf contents (for debugging)
Status DumpManifest(Options& options, std::string& manifestFileName,
bool verbose, bool hex = false);
// Return a human-readable short (single-line) summary of the data size
// of files per level. Uses *scratch as backing store.
const char* LevelDataSizeSummary(LevelSummaryStorage* scratch) const;
// Return a human-readable short (single-line) summary of files
// in a specified level. Uses *scratch as backing store.
const char* LevelFileSummary(FileSummaryStorage* scratch, int level) const;
// Return the size of the current manifest file
const uint64_t ManifestFileSize() { return current_->offset_manifest_file_; }
uint64_t ManifestFileSize() const { return manifest_file_size_; }
// For the specified level, pick a compaction.
// Returns nullptr if there is no compaction to be done.
......@@ -415,16 +434,6 @@ class VersionSet {
// pick the same files to compact.
bool VerifyCompactionFileConsistency(Compaction* c);
// used to sort files by size
typedef struct fsize {
int index;
FileMetaData* file;
} Fsize;
// Sort all files for this version based on their file size and
// record results in files_by_size_. The largest files are listed first.
void UpdateFilesBySize(Version *v);
// Get the max file size in a given level.
uint64_t MaxFileSizeForLevel(int level);
......@@ -447,8 +456,6 @@ class VersionSet {
void Init(int num_levels);
void Finalize(Version* v, std::vector<uint64_t>&);
void GetRange(const std::vector<FileMetaData*>& inputs,
InternalKey* smallest,
InternalKey* largest);
......@@ -491,6 +498,10 @@ class VersionSet {
Version dummy_versions_; // Head of circular doubly-linked list of versions.
Version* current_; // == dummy_versions_.prev_
// A flag indicating whether we should delay writes because
// we have too many level 0 files
bool need_slowdown_for_num_level0_files_;
// Per-level key at which the next compaction at that level should start.
// Either an empty string, or a valid InternalKey.
std::string* compact_pointer_;
......@@ -510,9 +521,8 @@ class VersionSet {
// Queue of writers to the manifest file
std::deque<ManifestWriter*> manifest_writers_;
// Store the manifest file size when it is checked.
// Save us the cost of checking file size twice in LogAndApply
uint64_t last_observed_manifest_size_;
// Current size of manifest file
uint64_t manifest_file_size_;
std::vector<FileMetaData*> obsolete_files_;
......@@ -542,118 +552,4 @@ class VersionSet {
VersionEdit* edit, port::Mutex* mu);
};
// A Compaction encapsulates information about a compaction.
class Compaction {
public:
~Compaction();
// Return the level that is being compacted. Inputs from "level"
// will be merged.
int level() const { return level_; }
// Outputs will go to this level
int output_level() const { return out_level_; }
// Return the object that holds the edits to the descriptor done
// by this compaction.
VersionEdit* edit() { return edit_; }
// "which" must be either 0 or 1
int num_input_files(int which) const { return inputs_[which].size(); }
// Return the ith input file at "level()+which" ("which" must be 0 or 1).
FileMetaData* input(int which, int i) const { return inputs_[which][i]; }
// Maximum size of files to build during this compaction.
uint64_t MaxOutputFileSize() const { return max_output_file_size_; }
// Whether compression will be enabled for compaction outputs
bool enable_compression() const { return enable_compression_; }
// Is this a trivial compaction that can be implemented by just
// moving a single input file to the next level (no merging or splitting)
bool IsTrivialMove() const;
// Add all inputs to this compaction as delete operations to *edit.
void AddInputDeletions(VersionEdit* edit);
// Returns true if the information we have available guarantees that
// the compaction is producing data in "level+1" for which no data exists
// in levels greater than "level+1".
bool IsBaseLevelForKey(const Slice& user_key);
// Returns true iff we should stop building the current output
// before processing "internal_key".
bool ShouldStopBefore(const Slice& internal_key);
// Release the input version for the compaction, once the compaction
// is successful.
void ReleaseInputs();
void Summary(char* output, int len);
// Return the score that was used to pick this compaction run.
double score() const { return score_; }
// Is this compaction creating a file in the bottom most level?
bool BottomMostLevel() { return bottommost_level_; }
// Does this compaction include all sst files?
bool IsFullCompaction() { return is_full_compaction_; }
private:
friend class Version;
friend class VersionSet;
explicit Compaction(int level, int out_level, uint64_t target_file_size,
uint64_t max_grandparent_overlap_bytes, int number_levels,
bool seek_compaction = false, bool enable_compression = true);
int level_;
int out_level_; // level to which output files are stored
uint64_t max_output_file_size_;
uint64_t maxGrandParentOverlapBytes_;
Version* input_version_;
VersionEdit* edit_;
int number_levels_;
bool seek_compaction_;
bool enable_compression_;
// Each compaction reads inputs from "level_" and "level_+1"
std::vector<FileMetaData*> inputs_[2]; // The two sets of inputs
// State used to check for the number of overlapping grandparent files
// (parent == level_ + 1, grandparent == level_ + 2)
std::vector<FileMetaData*> grandparents_;
size_t grandparent_index_; // Index in grandparent_starts_
bool seen_key_; // Some output key has been seen
uint64_t overlapped_bytes_; // Bytes of overlap between current output
// and grandparent files
int base_index_; // index of the file in files_[level_]
int parent_index_; // index of some file with same range in files_[level_+1]
double score_; // score that was used to pick this compaction.
// Is this compaction creating a file in the bottom most level?
bool bottommost_level_;
// Does this compaction include all sst files?
bool is_full_compaction_;
// level_ptrs_ holds indices into input_version_->levels_: our state
// is that we are positioned at one of the file ranges for each
// higher level than the ones involved in this compaction (i.e. for
// all L >= level_ + 2).
std::vector<size_t> level_ptrs_;
// mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool);
// Initialize whether the compaction is producing files at the bottommost level
void SetupBottomMostLevel(bool isManual);
// In case of compaction error, reset the nextIndex that is used
// to pick up the next file to be compacted from files_by_size_
void ResetNextCompactionIndex();
};
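// Illustrative sketch of how a caller can exploit IsTrivialMove() (assumed
// DBImpl-side logic, not part of this change):
//   if (c->IsTrivialMove()) {
//     // Move the single input file to the output level without merging.
//     FileMetaData* f = c->input(0, 0);
//     c->edit()->DeleteFile(c->level(), f->number);
//     c->edit()->AddFile(c->output_level(), ...);  // same file metadata
//     status = versions_->LogAndApply(c->edit(), &mutex_);
//   } else {
//     // Fall back to a full merging compaction.
//   }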
} // namespace rocksdb
......@@ -25,7 +25,7 @@ Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) {
}
Version* current_version = current_;
int current_levels = NumberLevels();
int current_levels = current_version->NumberLevels();
if (current_levels <= new_levels) {
return Status::OK();
......@@ -36,7 +36,7 @@ Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) {
int first_nonempty_level = -1;
int first_nonempty_level_filenum = 0;
for (int i = new_levels - 1; i < current_levels; i++) {
int file_num = NumLevelFiles(i);
int file_num = current_version->NumLevelFiles(i);
if (file_num != 0) {
if (first_nonempty_level < 0) {
first_nonempty_level = i;
......@@ -65,6 +65,7 @@ Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) {
delete[] current_version->files_;
current_version->files_ = new_files_list;
current_version->num_levels_ = new_levels;
delete[] compact_pointer_;
delete[] max_file_size_;
......@@ -72,8 +73,8 @@ Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) {
num_levels_ = new_levels;
compact_pointer_ = new std::string[new_levels];
Init(new_levels);
VersionEdit ve(new_levels);
st = LogAndApply(&ve , mu, true);
VersionEdit ve;
st = LogAndApply(&ve, mu, true);
return st;
}
......
......@@ -21,6 +21,7 @@
#include "rocksdb/write_batch.h"
#include "rocksdb/options.h"
#include "rocksdb/merge_operator.h"
#include "db/dbformat.h"
#include "db/db_impl.h"
#include "db/memtable.h"
......@@ -234,7 +235,62 @@ class MemTableInserter : public WriteBatch::Handler {
}
virtual void Merge(const Slice& key, const Slice& value) {
mem_->Add(sequence_, kTypeMerge, key, value);
bool perform_merge = false;
if (options_->max_successive_merges > 0 && db_ != nullptr) {
LookupKey lkey(key, sequence_);
// Count the number of successive merges at the head
// of the key in the memtable
size_t num_merges = mem_->CountSuccessiveMergeEntries(lkey);
if (num_merges >= options_->max_successive_merges) {
perform_merge = true;
}
}
if (perform_merge) {
// 1) Get the existing value
std::string get_value;
// Pass in the sequence number so that we also include previous merge
// operations in the same batch.
SnapshotImpl read_from_snapshot;
read_from_snapshot.number_ = sequence_;
ReadOptions read_options;
read_options.snapshot = &read_from_snapshot;
db_->Get(read_options, key, &get_value);
Slice get_value_slice = Slice(get_value);
// 2) Apply this merge
auto merge_operator = options_->merge_operator.get();
assert(merge_operator);
std::deque<std::string> operands;
operands.push_front(value.ToString());
std::string new_value;
if (!merge_operator->FullMerge(key,
&get_value_slice,
operands,
&new_value,
options_->info_log.get())) {
// Failed to merge!
RecordTick(options_->statistics.get(), NUMBER_MERGE_FAILURES);
// Store the delta in memtable
perform_merge = false;
} else {
// 3) Add value to memtable
mem_->Add(sequence_, kTypeValue, key, new_value);
}
}
if (!perform_merge) {
// Add merge operator to memtable
mem_->Add(sequence_, kTypeMerge, key, value);
}
sequence_++;
}
virtual void Delete(const Slice& key) {
......
......@@ -22,10 +22,11 @@ namespace rocksdb {
static std::string PrintContents(WriteBatch* b) {
InternalKeyComparator cmp(BytewiseComparator());
auto factory = std::make_shared<SkipListFactory>();
MemTable* mem = new MemTable(cmp, factory.get());
Options options;
options.memtable_factory = factory;
MemTable* mem = new MemTable(cmp, options);
mem->Ref();
std::string state;
Options options;
Status s = WriteBatchInternal::InsertInto(b, mem, &options);
int count = 0;
Iterator* iter = mem->NewIterator();
......
......@@ -311,6 +311,7 @@ extern void rocksdb_cache_destroy(rocksdb_cache_t* cache);
extern rocksdb_env_t* rocksdb_create_default_env();
extern void rocksdb_env_set_background_threads(rocksdb_env_t* env, int n);
extern void rocksdb_env_set_high_priority_background_threads(rocksdb_env_t* env, int n);
extern void rocksdb_env_destroy(rocksdb_env_t*);
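/* Illustrative usage (a sketch; the env calls match the declarations above,
   and rocksdb_options_set_env is assumed to be declared elsewhere in this
   header):
     rocksdb_env_t* env = rocksdb_create_default_env();
     rocksdb_env_set_background_threads(env, 4);
     rocksdb_options_set_env(options, env);
     ...
     rocksdb_env_destroy(env);
*/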
/* Universal Compaction options */
......
......@@ -199,6 +199,7 @@ class DB {
uint64_t* sizes) = 0;
// Compact the underlying storage for the key range [*begin,*end].
// The actual compaction interval might be a superset of [*begin, *end].
// In particular, deleted and overwritten versions are discarded,
// and the data is rearranged to reduce the cost of operations
// needed to access the data. This operation should typically only
......
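// Illustrative usage of the interface above (a sketch; assuming the
// Slice-pointer overload of CompactRange):
//
//   // Compact the whole key space; deleted and overwritten versions are
//   // discarded, possibly over a superset of the requested range.
//   db->CompactRange(nullptr, nullptr);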
......@@ -111,27 +111,23 @@ class MemTableRep {
};
// Return an iterator over the keys in this representation.
virtual std::shared_ptr<Iterator> GetIterator() = 0;
virtual Iterator* GetIterator() = 0;
// Return an iterator over at least the keys with the specified user key. The
// iterator may also allow access to other keys, but doesn't have to. Default:
// GetIterator().
virtual std::shared_ptr<Iterator> GetIterator(const Slice& user_key) {
return GetIterator();
}
virtual Iterator* GetIterator(const Slice& user_key) { return GetIterator(); }
// Return an iterator over at least the keys with the specified prefix. The
// iterator may also allow access to other keys, but doesn't have to. Default:
// GetIterator().
virtual std::shared_ptr<Iterator> GetPrefixIterator(const Slice& prefix) {
virtual Iterator* GetPrefixIterator(const Slice& prefix) {
return GetIterator();
}
// Return an iterator that has a special Seek semantics. The result of
// a Seek might only include keys with the same prefix as the target key.
virtual std::shared_ptr<Iterator> GetDynamicPrefixIterator() {
return GetIterator();
}
virtual Iterator* GetDynamicPrefixIterator() { return GetIterator(); }
protected:
// When *key is an internal key concatenated with the value, returns the
......@@ -144,8 +140,8 @@ class MemTableRep {
class MemTableRepFactory {
public:
virtual ~MemTableRepFactory() { };
virtual std::shared_ptr<MemTableRep> CreateMemTableRep(
MemTableRep::KeyComparator&, Arena*) = 0;
virtual MemTableRep* CreateMemTableRep(MemTableRep::KeyComparator&,
Arena*) = 0;
virtual const char* Name() const = 0;
};
......@@ -161,8 +157,8 @@ class VectorRepFactory : public MemTableRepFactory {
const size_t count_;
public:
explicit VectorRepFactory(size_t count = 0) : count_(count) { }
virtual std::shared_ptr<MemTableRep> CreateMemTableRep(
MemTableRep::KeyComparator&, Arena*) override;
virtual MemTableRep* CreateMemTableRep(MemTableRep::KeyComparator&,
Arena*) override;
virtual const char* Name() const override {
return "VectorRepFactory";
}
......@@ -171,8 +167,8 @@ public:
// This uses a skip list to store keys. It is the default.
class SkipListFactory : public MemTableRepFactory {
public:
virtual std::shared_ptr<MemTableRep> CreateMemTableRep(
MemTableRep::KeyComparator&, Arena*) override;
virtual MemTableRep* CreateMemTableRep(MemTableRep::KeyComparator&,
Arena*) override;
virtual const char* Name() const override {
return "SkipListFactory";
}
......@@ -206,8 +202,8 @@ class TransformRepFactory : public MemTableRepFactory {
virtual ~TransformRepFactory() { delete transform_; }
virtual std::shared_ptr<MemTableRep> CreateMemTableRep(
MemTableRep::KeyComparator&, Arena*) override;
virtual MemTableRep* CreateMemTableRep(MemTableRep::KeyComparator&,
Arena*) override;
virtual const char* Name() const override {
return "TransformRepFactory";
......@@ -247,8 +243,8 @@ public:
: TransformRepFactory(prefix_extractor, bucket_count, num_locks)
{ }
virtual std::shared_ptr<MemTableRep> CreateMemTableRep(
MemTableRep::KeyComparator&, Arena*) override;
virtual MemTableRep* CreateMemTableRep(MemTableRep::KeyComparator&,
Arena*) override;
virtual const char* Name() const override {
return "PrefixHashRepFactory";
......
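// Illustrative sketch of a factory under the new raw-pointer interface
// (MyMemTableRep and MyRepFactory are hypothetical, not part of this change):
//
//   class MyRepFactory : public MemTableRepFactory {
//    public:
//     virtual MemTableRep* CreateMemTableRep(MemTableRep::KeyComparator& cmp,
//                                            Arena* arena) override {
//       // The caller takes ownership of the returned rep; no shared_ptr
//       // bookkeeping is needed any more.
//       return new MyMemTableRep(cmp, arena);
//     }
//     virtual const char* Name() const override { return "MyRepFactory"; }
//   };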
......@@ -44,15 +44,15 @@ using std::shared_ptr;
enum CompressionType : char {
// NOTE: do not change the values of existing entries, as these are
// part of the persistent format on disk.
kNoCompression = 0x0,
kNoCompression = 0x0,
kSnappyCompression = 0x1,
kZlibCompression = 0x2,
kBZip2Compression = 0x3
};
enum CompactionStyle : char {
kCompactionStyleLevel = 0x0, // level based compaction style
kCompactionStyleUniversal = 0x1 // Universal compaction style
kCompactionStyleLevel = 0x0, // level based compaction style
kCompactionStyleUniversal = 0x1 // Universal compaction style
};
// Compression options for different compression algorithms like Zlib
......@@ -60,12 +60,9 @@ struct CompressionOptions {
int window_bits;
int level;
int strategy;
CompressionOptions():window_bits(-14),
level(-1),
strategy(0){}
CompressionOptions(int wbits, int lev, int strategy):window_bits(wbits),
level(lev),
strategy(strategy){}
CompressionOptions() : window_bits(-14), level(-1), strategy(0) {}
CompressionOptions(int wbits, int lev, int strategy)
: window_bits(wbits), level(lev), strategy(strategy) {}
};
// Options to control the behavior of a database (passed to DB::Open)
......@@ -218,7 +215,6 @@ struct Options {
// Default: 16
int block_restart_interval;
// Compress blocks using the specified compression algorithm. This
// parameter can be changed dynamically.
//
......@@ -249,7 +245,7 @@ struct Options {
// java/C api hard to construct.
std::vector<CompressionType> compression_per_level;
//different options for compression algorithms
// different options for compression algorithms
CompressionOptions compression_opts;
// If non-nullptr, use the specified filter policy to reduce disk reads.
......@@ -328,7 +324,6 @@ struct Options {
// will be 20MB, total file size for level-2 will be 200MB,
// and total file size for level-3 will be 2GB.
// by default 'max_bytes_for_level_base' is 10MB.
uint64_t max_bytes_for_level_base;
// by default 'max_bytes_for_level_base' is 10.
......@@ -486,10 +481,19 @@ struct Options {
// order.
int table_cache_remove_scan_count_limit;
// size of one block in arena memory allocation.
// If <= 0, a proper value is automatically calculated (usually 1/10 of
// Size of one block in arena memory allocation.
//
// If <= 0, a proper value is automatically calculated (usually about 1/10 of
// write_buffer_size).
//
// There are two additional restrictions on the specified size:
// (1) the size should be in the range of [4096, 2 << 30], and
// (2) it should be a multiple of the CPU word size (which helps with
// memory alignment).
//
// We'll automatically check and adjust the size to make sure it
// conforms to these restrictions.
//
// Default: 0
size_t arena_block_size;
......@@ -574,7 +578,12 @@ struct Options {
// Specify the file access pattern once a compaction is started.
// It will be applied to all input files of a compaction.
// Default: NORMAL
enum { NONE, NORMAL, SEQUENTIAL, WILLNEED } access_hint_on_compaction_start;
enum {
NONE,
NORMAL,
SEQUENTIAL,
WILLNEED
} access_hint_on_compaction_start;
// Use adaptive mutex, which spins in the user space before resorting
// to kernel. This could reduce context switch when the mutex is not
......@@ -641,7 +650,6 @@ struct Options {
// Default: 10000, if inplace_update_support = true, else 0.
size_t inplace_update_num_locks;
// * existing_value - pointer to previous value (from both memtable and sst).
// nullptr if key doesn't exist
// * existing_value_size - sizeof(existing_value). 0 if key doesn't exist
......@@ -681,6 +689,17 @@ struct Options {
// number of hash probes per key
uint32_t memtable_prefix_bloom_probes;
// Maximum number of successive merge operations on a key in the memtable.
//
// When a merge operation is added to the memtable and the maximum number of
// successive merges is reached, the value of the key will be calculated and
// inserted into the memtable instead of the merge operation. This will
// ensure that there are never more than max_successive_merges merge
// operations in the memtable.
//
// Default: 0 (disabled)
size_t max_successive_merges;
};
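// Illustrative usage of max_successive_merges (a sketch;
// MyCounterMergeOperator is a hypothetical merge operator):
//
//   Options options;
//   options.merge_operator = std::make_shared<MyCounterMergeOperator>();
//   options.max_successive_merges = 8;  // fold the chain after 8 merges
//   DB* db;
//   Status s = DB::Open(options, "/tmp/testdb", &db);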
//
......@@ -691,7 +710,7 @@ struct Options {
// the block cache. It will not page in data from the OS cache or data that
// resides in storage.
enum ReadTier {
kReadAllTier = 0x0, // data in memtable, block cache, OS cache or storage
kReadAllTier = 0x0, // data in memtable, block cache, OS cache or storage
kBlockCacheTier = 0x1 // data in memtable or block cache
};
......@@ -744,13 +763,14 @@ struct ReadOptions {
prefix_seek(false),
snapshot(nullptr),
prefix(nullptr),
read_tier(kReadAllTier) {
}
ReadOptions(bool cksum, bool cache) :
verify_checksums(cksum), fill_cache(cache),
prefix_seek(false), snapshot(nullptr), prefix(nullptr),
read_tier(kReadAllTier) {
}
read_tier(kReadAllTier) {}
ReadOptions(bool cksum, bool cache)
: verify_checksums(cksum),
fill_cache(cache),
prefix_seek(false),
snapshot(nullptr),
prefix(nullptr),
read_tier(kReadAllTier) {}
};
// Options that control write operations
......@@ -777,10 +797,7 @@ struct WriteOptions {
// and the write may get lost after a crash.
bool disableWAL;
WriteOptions()
: sync(false),
disableWAL(false) {
}
WriteOptions() : sync(false), disableWAL(false) {}
};
// Options that control flush operations
......@@ -789,9 +806,7 @@ struct FlushOptions {
// Default: true
bool wait;
FlushOptions()
: wait(true) {
}
FlushOptions() : wait(true) {}
};
} // namespace rocksdb
......
......@@ -88,7 +88,10 @@ class WriteBatch {
Status Iterate(Handler* handler) const;
// Retrieve the serialized version of this batch.
std::string Data() { return rep_; }
const std::string& Data() const { return rep_; }
// Retrieve data size of the batch.
size_t GetDataSize() const { return rep_.size(); }
// Returns the number of updates in the batch
int Count() const;
......
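// Illustrative usage (a sketch): the const accessor avoids copying the
// serialized representation:
//
//   WriteBatch batch;
//   batch.Put("key", "value");
//   const std::string& rep = batch.Data();  // no copy of rep_
//   size_t bytes = batch.GetDataSize();     // same as rep.size()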
......@@ -31,6 +31,14 @@ struct BackupableDBOptions {
// Default: nullptr
Env* backup_env;
// If share_table_files == true, backup will assume that table files with
// same name have the same contents. This enables incremental backups and
// avoids unnecessary data copies.
// If share_table_files == false, each backup will be on its own and will
// not share any data with other backups.
// default: true
bool share_table_files;
// Backup info and error messages will be written to info_log
// if non-nullptr.
// Default: nullptr
......@@ -49,6 +57,7 @@ struct BackupableDBOptions {
explicit BackupableDBOptions(const std::string& _backup_dir,
Env* _backup_env = nullptr,
bool _share_table_files = true,
Logger* _info_log = nullptr,
bool _sync = true,
bool _destroy_old_data = false) :
......@@ -93,6 +102,14 @@ class BackupableDB : public StackableDB {
Status PurgeOldBackups(uint32_t num_backups_to_keep);
// deletes a specific backup
Status DeleteBackup(BackupID backup_id);
// Call this from another thread if you want to stop the backup
// that is currently happening. It will return immediatelly, will
// not wait for the backup to stop.
// The backup will stop ASAP and the call to CreateNewBackup will
// return Status::Incomplete(). It will not clean up after itself, but
// the state will remain consistent. The state will be cleaned up
// next time you create BackupableDB or RestoreBackupableDB.
void StopBackup();
private:
BackupEngine* backup_engine_;
......@@ -108,9 +125,10 @@ class RestoreBackupableDB {
void GetBackupInfo(std::vector<BackupInfo>* backup_info);
// restore from backup with backup_id
// IMPORTANT -- if you restore from some backup that is not the latest,
// and you start creating new backups from the new DB, all the backups
// that were newer than the backup you restored from will be deleted
// IMPORTANT -- if options_.share_table_files == true and you restore DB
// from some backup that is not the latest, and you start creating new
// backups from the new DB, all the backups that were newer than the
// backup you restored from will be deleted
//
// Example: Let's say you have backups 1, 2, 3, 4, 5 and you restore 3.
// If you try creating a new backup now, old backups 4 and 5 will be deleted
......
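// Illustrative usage (a sketch; db is an already-open DB*):
//
//   BackupableDBOptions backup_options("/path/to/backup/dir");
//   backup_options.share_table_files = true;  // enables incremental backups
//   BackupableDB backup_db(db, backup_options);
//   Status s = backup_db.CreateNewBackup();
//   // From another thread, backup_db.StopBackup() aborts the backup; the
//   // pending CreateNewBackup() then returns Status::Incomplete().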
......@@ -394,7 +394,9 @@ class MemTableConstructor: public Constructor {
: Constructor(cmp),
internal_comparator_(cmp),
table_factory_(new SkipListFactory) {
memtable_ = new MemTable(internal_comparator_, table_factory_.get());
Options options;
options.memtable_factory = table_factory_;
memtable_ = new MemTable(internal_comparator_, options);
memtable_->Ref();
}
~MemTableConstructor() {
......@@ -402,7 +404,9 @@ class MemTableConstructor: public Constructor {
}
virtual Status FinishImpl(const Options& options, const KVMap& data) {
delete memtable_->Unref();
memtable_ = new MemTable(internal_comparator_, table_factory_.get());
Options memtable_options;
memtable_options.memtable_factory = table_factory_;
memtable_ = new MemTable(internal_comparator_, memtable_options);
memtable_->Ref();
int seq = 1;
for (KVMap::const_iterator it = data.begin();
......@@ -1381,10 +1385,11 @@ class MemTableTest { };
TEST(MemTableTest, Simple) {
InternalKeyComparator cmp(BytewiseComparator());
auto table_factory = std::make_shared<SkipListFactory>();
MemTable* memtable = new MemTable(cmp, table_factory.get());
Options options;
options.memtable_factory = table_factory;
MemTable* memtable = new MemTable(cmp, options);
memtable->Ref();
WriteBatch batch;
Options options;
WriteBatchInternal::SetSequence(&batch, 100);
batch.Put(std::string("k1"), std::string("v1"));
batch.Put(std::string("k2"), std::string("v2"));
......
......@@ -8,71 +8,86 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/arena_impl.h"
#include <algorithm>
namespace rocksdb {
ArenaImpl::ArenaImpl(size_t block_size) {
if (block_size < kMinBlockSize) {
block_size_ = kMinBlockSize;
} else if (block_size > kMaxBlockSize) {
block_size_ = kMaxBlockSize;
} else {
block_size_ = block_size;
const size_t ArenaImpl::kMinBlockSize = 4096;
const size_t ArenaImpl::kMaxBlockSize = 2 << 30;
static const int kAlignUnit = sizeof(void*);
size_t OptimizeBlockSize(size_t block_size) {
// Make sure block_size is in the optimal range
block_size = std::max(ArenaImpl::kMinBlockSize, block_size);
block_size = std::min(ArenaImpl::kMaxBlockSize, block_size);
// make sure block_size is a multiple of kAlignUnit
if (block_size % kAlignUnit != 0) {
block_size = (1 + block_size / kAlignUnit) * kAlignUnit;
}
blocks_memory_ = 0;
alloc_ptr_ = nullptr; // First allocation will allocate a block
alloc_bytes_remaining_ = 0;
return block_size;
}
ArenaImpl::ArenaImpl(size_t block_size)
: kBlockSize(OptimizeBlockSize(block_size)) {
assert(kBlockSize >= kMinBlockSize && kBlockSize <= kMaxBlockSize &&
kBlockSize % kAlignUnit == 0);
}
ArenaImpl::~ArenaImpl() {
for (size_t i = 0; i < blocks_.size(); i++) {
delete[] blocks_[i];
for (const auto& block : blocks_) {
delete[] block;
}
}
char* ArenaImpl::AllocateFallback(size_t bytes) {
if (bytes > block_size_ / 4) {
char* ArenaImpl::AllocateFallback(size_t bytes, bool aligned) {
if (bytes > kBlockSize / 4) {
// Object is more than a quarter of our block size. Allocate it separately
// to avoid wasting too much space in leftover bytes.
char* result = AllocateNewBlock(bytes);
return result;
return AllocateNewBlock(bytes);
}
// We waste the remaining space in the current block.
alloc_ptr_ = AllocateNewBlock(block_size_);
alloc_bytes_remaining_ = block_size_;
auto block_head = AllocateNewBlock(kBlockSize);
alloc_bytes_remaining_ = kBlockSize - bytes;
char* result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
if (aligned) {
aligned_alloc_ptr_ = block_head + bytes;
unaligned_alloc_ptr_ = block_head + kBlockSize;
return block_head;
} else {
aligned_alloc_ptr_ = block_head;
unaligned_alloc_ptr_ = block_head + kBlockSize - bytes;
return unaligned_alloc_ptr_;
}
}
char* ArenaImpl::AllocateAligned(size_t bytes) {
const int align = sizeof(void*); // We'll align to pointer size
assert((align & (align-1)) == 0); // Pointer size should be a power of 2
size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);
size_t slop = (current_mod == 0 ? 0 : align - current_mod);
assert((kAlignUnit & (kAlignUnit - 1)) ==
0); // Pointer size should be a power of 2
size_t current_mod =
reinterpret_cast<uintptr_t>(aligned_alloc_ptr_) & (kAlignUnit - 1);
size_t slop = (current_mod == 0 ? 0 : kAlignUnit - current_mod);
size_t needed = bytes + slop;
char* result;
if (needed <= alloc_bytes_remaining_) {
result = alloc_ptr_ + slop;
alloc_ptr_ += needed;
result = aligned_alloc_ptr_ + slop;
aligned_alloc_ptr_ += needed;
alloc_bytes_remaining_ -= needed;
} else {
// AllocateFallback always returned aligned memory
result = AllocateFallback(bytes);
result = AllocateFallback(bytes, true /* aligned */);
}
assert((reinterpret_cast<uintptr_t>(result) & (align-1)) == 0);
assert((reinterpret_cast<uintptr_t>(result) & (kAlignUnit - 1)) == 0);
return result;
}
char* ArenaImpl::AllocateNewBlock(size_t block_bytes) {
char* result = new char[block_bytes];
char* block = new char[block_bytes];
blocks_memory_ += block_bytes;
blocks_.push_back(result);
return result;
blocks_.push_back(block);
return block;
}
} // namespace rocksdb
......@@ -22,49 +22,54 @@ namespace rocksdb {
class ArenaImpl : public Arena {
public:
// No copying allowed
ArenaImpl(const ArenaImpl&) = delete;
void operator=(const ArenaImpl&) = delete;
static const size_t kMinBlockSize;
static const size_t kMaxBlockSize;
explicit ArenaImpl(size_t block_size = kMinBlockSize);
virtual ~ArenaImpl();
virtual char* Allocate(size_t bytes);
virtual char* Allocate(size_t bytes) override;
virtual char* AllocateAligned(size_t bytes);
virtual char* AllocateAligned(size_t bytes) override;
// Returns an estimate of the total memory usage of data allocated
// by the arena (including space allocated but not yet used for user
// by the arena (excluding the space allocated but not yet used for future
// allocations).
//
// TODO: Do we need to exclude space allocated but not used?
virtual const size_t ApproximateMemoryUsage() {
return blocks_memory_ + blocks_.capacity() * sizeof(char*);
return blocks_memory_ + blocks_.capacity() * sizeof(char*) -
alloc_bytes_remaining_;
}
virtual const size_t MemoryAllocatedBytes() {
virtual const size_t MemoryAllocatedBytes() override {
return blocks_memory_;
}
private:
char* AllocateFallback(size_t bytes);
char* AllocateNewBlock(size_t block_bytes);
static const size_t kMinBlockSize = 4096;
static const size_t kMaxBlockSize = 2 << 30;
// Number of bytes allocated in one block
size_t block_size_;
// Allocation state
char* alloc_ptr_;
size_t alloc_bytes_remaining_;
const size_t kBlockSize;
// Array of new[] allocated memory blocks
std::vector<char*> blocks_;
typedef std::vector<char*> Blocks;
Blocks blocks_;
// Stats for current active block.
// For each block, we allocate aligned memory chunks from one end and
// unaligned memory chunks from the other end. Otherwise, the
// memory waste for alignment will be higher if we allocate both types of
// memory from the same end.
char* unaligned_alloc_ptr_ = nullptr;
char* aligned_alloc_ptr_ = nullptr;
// How many bytes left in currently active block?
size_t alloc_bytes_remaining_ = 0;
char* AllocateFallback(size_t bytes, bool aligned);
char* AllocateNewBlock(size_t block_bytes);
// Bytes of memory in blocks allocated so far
size_t blocks_memory_;
// No copying allowed
ArenaImpl(const ArenaImpl&);
void operator=(const ArenaImpl&);
size_t blocks_memory_ = 0;
};
inline char* ArenaImpl::Allocate(size_t bytes) {
......@@ -73,12 +78,16 @@ inline char* ArenaImpl::Allocate(size_t bytes) {
// them for our internal use).
assert(bytes > 0);
if (bytes <= alloc_bytes_remaining_) {
char* result = alloc_ptr_;
alloc_ptr_ += bytes;
unaligned_alloc_ptr_ -= bytes;
alloc_bytes_remaining_ -= bytes;
return result;
return unaligned_alloc_ptr_;
}
return AllocateFallback(bytes);
return AllocateFallback(bytes, false /* unaligned */);
}
// check and adjust the block_size so that the return value is
// 1. in the range of [kMinBlockSize, kMaxBlockSize], and
// 2. a multiple of the align unit.
extern size_t OptimizeBlockSize(size_t block_size);
} // namespace rocksdb
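// Illustrative behavior of OptimizeBlockSize() (a sketch; assuming 64-bit
// pointers, so the align unit is 8):
//
//   OptimizeBlockSize(1000);  // -> 4096, clamped up to kMinBlockSize
//   OptimizeBlockSize(5001);  // -> 5008, rounded up to a multiple of 8
//   ArenaImpl arena(5001);    // kBlockSize becomes the adjusted value, 5008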
......@@ -57,8 +57,34 @@ TEST(ArenaImplTest, MemoryAllocatedBytes) {
ASSERT_EQ(arena_impl.MemoryAllocatedBytes(), expected_memory_allocated);
}
// Make sure we don't count the allocated-but-not-yet-used memory in
// Arena::ApproximateMemoryUsage()
TEST(ArenaImplTest, ApproximateMemoryUsageTest) {
const size_t kBlockSize = 4096;
const size_t kEntrySize = kBlockSize / 8;
const size_t kZero = 0;
ArenaImpl arena(kBlockSize);
ASSERT_EQ(kZero, arena.ApproximateMemoryUsage());
auto num_blocks = kBlockSize / kEntrySize;  // allocations that fit in one block
// first allocation
arena.AllocateAligned(kEntrySize);
auto mem_usage = arena.MemoryAllocatedBytes();
ASSERT_EQ(mem_usage, kBlockSize);
auto usage = arena.ApproximateMemoryUsage();
ASSERT_LT(usage, mem_usage);
for (size_t i = 1; i < num_blocks; ++i) {
arena.AllocateAligned(kEntrySize);
ASSERT_EQ(mem_usage, arena.MemoryAllocatedBytes());
ASSERT_EQ(arena.ApproximateMemoryUsage(), usage + kEntrySize);
usage = arena.ApproximateMemoryUsage();
}
ASSERT_GT(usage, mem_usage);
}
TEST(ArenaImplTest, Simple) {
std::vector<std::pair<size_t, char*> > allocated;
std::vector<std::pair<size_t, char*>> allocated;
ArenaImpl arena_impl;
const int N = 100000;
size_t bytes = 0;
......@@ -68,8 +94,9 @@ TEST(ArenaImplTest, Simple) {
if (i % (N / 10) == 0) {
s = i;
} else {
s = rnd.OneIn(4000) ? rnd.Uniform(6000) :
(rnd.OneIn(10) ? rnd.Uniform(100) : rnd.Uniform(20));
s = rnd.OneIn(4000)
? rnd.Uniform(6000)
: (rnd.OneIn(10) ? rnd.Uniform(100) : rnd.Uniform(20));
}
if (s == 0) {
// Our arena disallows size 0 allocations.
......@@ -89,7 +116,7 @@ TEST(ArenaImplTest, Simple) {
bytes += s;
allocated.push_back(std::make_pair(s, r));
ASSERT_GE(arena_impl.ApproximateMemoryUsage(), bytes);
if (i > N/10) {
if (i > N / 10) {
ASSERT_LE(arena_impl.ApproximateMemoryUsage(), bytes * 1.10);
}
}
......
......@@ -66,17 +66,15 @@ class HashLinkListRep : public MemTableRep {
virtual ~HashLinkListRep();
virtual std::shared_ptr<MemTableRep::Iterator> GetIterator() override;
virtual MemTableRep::Iterator* GetIterator() override;
virtual std::shared_ptr<MemTableRep::Iterator> GetIterator(
const Slice& slice) override;
virtual MemTableRep::Iterator* GetIterator(const Slice& slice) override;
virtual std::shared_ptr<MemTableRep::Iterator> GetPrefixIterator(
const Slice& prefix) override;
virtual std::shared_ptr<MemTableRep::Iterator> GetDynamicPrefixIterator()
virtual MemTableRep::Iterator* GetPrefixIterator(const Slice& prefix)
override;
virtual MemTableRep::Iterator* GetDynamicPrefixIterator() override;
private:
friend class DynamicIterator;
typedef SkipList<const char*, MemTableRep::KeyComparator&> FullList;
......@@ -298,8 +296,6 @@ class HashLinkListRep : public MemTableRep {
virtual void SeekToLast() { }
private:
};
std::shared_ptr<EmptyIterator> empty_iterator_;
};
HashLinkListRep::HashLinkListRep(MemTableRep::KeyComparator& compare,
......@@ -308,9 +304,7 @@ HashLinkListRep::HashLinkListRep(MemTableRep::KeyComparator& compare,
: bucket_size_(bucket_size),
transform_(transform),
compare_(compare),
arena_(arena),
empty_iterator_(std::make_shared<EmptyIterator>()) {
arena_(arena) {
char* mem = arena_->AllocateAligned(
sizeof(port::AtomicPointer) * bucket_size);
......@@ -389,7 +383,7 @@ size_t HashLinkListRep::ApproximateMemoryUsage() {
return 0;
}
std::shared_ptr<MemTableRep::Iterator> HashLinkListRep::GetIterator() {
MemTableRep::Iterator* HashLinkListRep::GetIterator() {
auto list = new FullList(compare_, arena_);
for (size_t i = 0; i < bucket_size_; ++i) {
auto bucket = GetBucket(i);
......@@ -400,26 +394,24 @@ std::shared_ptr<MemTableRep::Iterator> HashLinkListRep::GetIterator() {
}
}
}
return std::make_shared<FullListIterator>(list);
return new FullListIterator(list);
}
std::shared_ptr<MemTableRep::Iterator> HashLinkListRep::GetPrefixIterator(
MemTableRep::Iterator* HashLinkListRep::GetPrefixIterator(
const Slice& prefix) {
auto bucket = GetBucket(prefix);
if (bucket == nullptr) {
return empty_iterator_;
return new EmptyIterator();
}
return std::make_shared<Iterator>(this, bucket);
return new Iterator(this, bucket);
}
std::shared_ptr<MemTableRep::Iterator> HashLinkListRep::GetIterator(
const Slice& slice) {
MemTableRep::Iterator* HashLinkListRep::GetIterator(const Slice& slice) {
return GetPrefixIterator(transform_->Transform(slice));
}
std::shared_ptr<MemTableRep::Iterator>
HashLinkListRep::GetDynamicPrefixIterator() {
return std::make_shared<DynamicIterator>(*this);
MemTableRep::Iterator* HashLinkListRep::GetDynamicPrefixIterator() {
return new DynamicIterator(*this);
}
bool HashLinkListRep::BucketContains(Node* head, const Key& key) const {
......@@ -450,10 +442,9 @@ Node* HashLinkListRep::FindGreaterOrEqualInBucket(Node* head,
} // anon namespace
std::shared_ptr<MemTableRep> HashLinkListRepFactory::CreateMemTableRep(
MemTableRep* HashLinkListRepFactory::CreateMemTableRep(
MemTableRep::KeyComparator& compare, Arena* arena) {
return std::make_shared<HashLinkListRep>(compare, arena, transform_,
bucket_count_);
return new HashLinkListRep(compare, arena, transform_, bucket_count_);
}
MemTableRepFactory* NewHashLinkListRepFactory(
......
......@@ -22,8 +22,8 @@ class HashLinkListRepFactory : public MemTableRepFactory {
virtual ~HashLinkListRepFactory() { delete transform_; }
virtual std::shared_ptr<MemTableRep> CreateMemTableRep(
MemTableRep::KeyComparator& compare, Arena* arena) override;
virtual MemTableRep* CreateMemTableRep(MemTableRep::KeyComparator& compare,
Arena* arena) override;
virtual const char* Name() const override {
return "HashLinkListRepFactory";
......
......@@ -33,17 +33,15 @@ class HashSkipListRep : public MemTableRep {
virtual ~HashSkipListRep();
virtual std::shared_ptr<MemTableRep::Iterator> GetIterator() override;
virtual MemTableRep::Iterator* GetIterator() override;
virtual std::shared_ptr<MemTableRep::Iterator> GetIterator(
const Slice& slice) override;
virtual MemTableRep::Iterator* GetIterator(const Slice& slice) override;
virtual std::shared_ptr<MemTableRep::Iterator> GetPrefixIterator(
const Slice& prefix) override;
virtual std::shared_ptr<MemTableRep::Iterator> GetDynamicPrefixIterator()
virtual MemTableRep::Iterator* GetPrefixIterator(const Slice& prefix)
override;
virtual MemTableRep::Iterator* GetDynamicPrefixIterator() override;
private:
friend class DynamicIterator;
typedef SkipList<const char*, MemTableRep::KeyComparator&> Bucket;
......@@ -216,22 +214,18 @@ class HashSkipListRep : public MemTableRep {
virtual void SeekToLast() { }
private:
};
std::shared_ptr<EmptyIterator> empty_iterator_;
};
HashSkipListRep::HashSkipListRep(MemTableRep::KeyComparator& compare,
Arena* arena, const SliceTransform* transform,
size_t bucket_size, int32_t skiplist_height,
int32_t skiplist_branching_factor)
: bucket_size_(bucket_size),
skiplist_height_(skiplist_height),
skiplist_branching_factor_(skiplist_branching_factor),
transform_(transform),
compare_(compare),
arena_(arena),
empty_iterator_(std::make_shared<EmptyIterator>()) {
: bucket_size_(bucket_size),
skiplist_height_(skiplist_height),
skiplist_branching_factor_(skiplist_branching_factor),
transform_(transform),
compare_(compare),
arena_(arena) {
buckets_ = new port::AtomicPointer[bucket_size];
for (size_t i = 0; i < bucket_size_; ++i) {
......@@ -276,7 +270,7 @@ size_t HashSkipListRep::ApproximateMemoryUsage() {
return sizeof(buckets_);
}
std::shared_ptr<MemTableRep::Iterator> HashSkipListRep::GetIterator() {
MemTableRep::Iterator* HashSkipListRep::GetIterator() {
auto list = new Bucket(compare_, arena_);
for (size_t i = 0; i < bucket_size_; ++i) {
auto bucket = GetBucket(i);
......@@ -287,35 +281,31 @@ std::shared_ptr<MemTableRep::Iterator> HashSkipListRep::GetIterator() {
}
}
}
return std::make_shared<Iterator>(list);
return new Iterator(list);
}
std::shared_ptr<MemTableRep::Iterator> HashSkipListRep::GetPrefixIterator(
const Slice& prefix) {
MemTableRep::Iterator* HashSkipListRep::GetPrefixIterator(const Slice& prefix) {
auto bucket = GetBucket(prefix);
if (bucket == nullptr) {
return empty_iterator_;
return new EmptyIterator();
}
return std::make_shared<Iterator>(bucket, false);
return new Iterator(bucket, false);
}
std::shared_ptr<MemTableRep::Iterator> HashSkipListRep::GetIterator(
const Slice& slice) {
MemTableRep::Iterator* HashSkipListRep::GetIterator(const Slice& slice) {
return GetPrefixIterator(transform_->Transform(slice));
}
std::shared_ptr<MemTableRep::Iterator>
HashSkipListRep::GetDynamicPrefixIterator() {
return std::make_shared<DynamicIterator>(*this);
MemTableRep::Iterator* HashSkipListRep::GetDynamicPrefixIterator() {
return new DynamicIterator(*this);
}
} // anon namespace
std::shared_ptr<MemTableRep> HashSkipListRepFactory::CreateMemTableRep(
MemTableRep* HashSkipListRepFactory::CreateMemTableRep(
MemTableRep::KeyComparator& compare, Arena* arena) {
return std::make_shared<HashSkipListRep>(compare, arena, transform_,
bucket_count_, skiplist_height_,
skiplist_branching_factor_);
return new HashSkipListRep(compare, arena, transform_, bucket_count_,
skiplist_height_, skiplist_branching_factor_);
}
MemTableRepFactory* NewHashSkipListRepFactory(
......
......@@ -26,8 +26,8 @@ class HashSkipListRepFactory : public MemTableRepFactory {
virtual ~HashSkipListRepFactory() { delete transform_; }
virtual std::shared_ptr<MemTableRep> CreateMemTableRep(
MemTableRep::KeyComparator& compare, Arena* arena) override;
virtual MemTableRep* CreateMemTableRep(MemTableRep::KeyComparator& compare,
Arena* arena) override;
virtual const char* Name() const override {
return "HashSkipListRepFactory";
......
......@@ -1024,7 +1024,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt,
}
int max = -1;
for (int i = 0; i < versions.NumberLevels(); i++) {
if (versions.NumLevelFiles(i)) {
if (versions.current()->NumLevelFiles(i)) {
max = i;
}
}
......