提交 151f9e14 编写于 作者: I Igor Canadi

Merge branch 'master' into columnfamilies

......@@ -50,7 +50,7 @@ make release
--num=$NUM \
--writes=$NUM \
--cache_size=6442450944 \
--cache_numshardbits=4 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--open_files=55000 \
--statistics=1 \
......@@ -68,7 +68,7 @@ make release
--num=$NUM \
--writes=$((NUM / 10)) \
--cache_size=6442450944 \
--cache_numshardbits=4 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--open_files=55000 \
--statistics=1 \
......@@ -87,7 +87,7 @@ make release
--num=$NUM \
--writes=$NUM \
--cache_size=6442450944 \
--cache_numshardbits=4 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--open_files=55000 \
--statistics=1 \
......@@ -106,7 +106,7 @@ make release
--num=$NUM \
--reads=$((NUM / 5)) \
--cache_size=6442450944 \
--cache_numshardbits=4 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--open_files=55000 \
--disable_seek_compaction=1 \
......@@ -126,7 +126,7 @@ make release
--num=$NUM \
--reads=$((NUM / 5)) \
--cache_size=104857600 \
--cache_numshardbits=4 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--open_files=55000 \
--disable_seek_compaction=1 \
......@@ -147,7 +147,7 @@ make release
--reads=$((NUM / 5)) \
--writes=512 \
--cache_size=6442450944 \
--cache_numshardbits=4 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--write_buffer_size=1000000000 \
--open_files=55000 \
......@@ -169,7 +169,7 @@ make release
--num=$((NUM / 4)) \
--writes=$((NUM / 4)) \
--cache_size=6442450944 \
--cache_numshardbits=4 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--open_files=55000 \
--statistics=1 \
......@@ -179,6 +179,25 @@ make release
--sync=0 \
--threads=1 > /dev/null
# dummy test just to compact the data
./db_bench \
--benchmarks=readrandom \
--db=$DATA_DIR \
--use_existing_db=1 \
--bloom_bits=10 \
--num=$((NUM / 1000)) \
--reads=$((NUM / 1000)) \
--cache_size=6442450944 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--open_files=55000 \
--statistics=1 \
--histogram=1 \
--disable_data_sync=1 \
--disable_wal=1 \
--sync=0 \
--threads=16 > /dev/null
# measure readrandom after load with filluniquerandom with 6GB block cache
./db_bench \
--benchmarks=readrandom \
......@@ -188,7 +207,7 @@ make release
--num=$((NUM / 4)) \
--reads=$((NUM / 4)) \
--cache_size=6442450944 \
--cache_numshardbits=4 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--open_files=55000 \
--disable_seek_compaction=1 \
......@@ -200,6 +219,28 @@ make release
--sync=0 \
--threads=16 > ${STAT_FILE}.readrandom_filluniquerandom
# measure readwhilewriting after load with filluniquerandom with 6GB block cache
./db_bench \
--benchmarks=readwhilewriting \
--db=$DATA_DIR \
--use_existing_db=1 \
--bloom_bits=10 \
--num=$((NUM / 4)) \
--reads=$((NUM / 4)) \
--writes_per_second=1000 \
--write_buffer_size=100000000 \
--cache_size=6442450944 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--open_files=55000 \
--disable_seek_compaction=1 \
--statistics=1 \
--histogram=1 \
--disable_data_sync=1 \
--disable_wal=1 \
--sync=0 \
--threads=16 > ${STAT_FILE}.readwhilewriting
# measure memtable performance -- none of the data gets flushed to disk
./db_bench \
--benchmarks=fillrandom,readrandom, \
......@@ -208,7 +249,7 @@ make release
--num=$((NUM / 10)) \
--reads=$NUM \
--cache_size=6442450944 \
--cache_numshardbits=4 \
--cache_numshardbits=6 \
--table_cache_numshardbits=4 \
--write_buffer_size=1000000000 \
--open_files=55000 \
......@@ -264,3 +305,4 @@ send_benchmark_to_ods readrandom readrandom_memtable_sst $STAT_FILE.readrandom_m
send_benchmark_to_ods readrandom readrandom_fillunique_random $STAT_FILE.readrandom_filluniquerandom
send_benchmark_to_ods fillrandom memtablefillrandom $STAT_FILE.memtablefillreadrandom
send_benchmark_to_ods readrandom memtablereadrandom $STAT_FILE.memtablefillreadrandom
send_benchmark_to_ods readwhilewriting readwhilewriting $STAT_FILE.readwhilewriting
......@@ -42,7 +42,7 @@ Status BuildTable(const std::string& dbname,
const Comparator* user_comparator,
const SequenceNumber newest_snapshot,
const SequenceNumber earliest_seqno_in_memtable,
const bool enable_compression) {
const CompressionType compression) {
Status s;
meta->file_size = 0;
meta->smallest_seqno = meta->largest_seqno = 0;
......@@ -65,7 +65,7 @@ Status BuildTable(const std::string& dbname,
}
TableBuilder* builder = GetTableBuilder(options, file.get(),
options.compression);
compression);
// the first key is the smallest key
Slice key = iter->key();
......
......@@ -43,6 +43,6 @@ extern Status BuildTable(const std::string& dbname,
const Comparator* user_comparator,
const SequenceNumber newest_snapshot,
const SequenceNumber earliest_seqno_in_memtable,
const bool enable_compression);
const CompressionType compression);
} // namespace rocksdb
......@@ -788,6 +788,10 @@ void rocksdb_env_set_background_threads(rocksdb_env_t* env, int n) {
env->rep->SetBackgroundThreads(n);
}
void rocksdb_env_set_high_priority_background_threads(rocksdb_env_t* env, int n) {
env->rep->SetBackgroundThreads(n, Env::HIGH);
}
void rocksdb_env_destroy(rocksdb_env_t* env) {
if (!env->is_default) delete env->rep;
delete env;
......
......@@ -94,6 +94,8 @@ DEFINE_string(benchmarks,
"\tmergerandom -- same as updaterandom/appendrandom using merge"
" operator. "
"Must be used with merge_operator\n"
"\treadrandommergerandom -- perform N random read-or-merge "
"operations. Must be used with merge_operator\n"
"\tseekrandom -- N random seeks\n"
"\tcrc32c -- repeated crc32c of 4K of data\n"
"\tacquireload -- load N*1000 times\n"
......@@ -112,6 +114,11 @@ DEFINE_int64(numdistinct, 1000,
"read/write on fewer keys so that gets are more likely to find the"
" key and puts are more likely to update the same key");
DEFINE_int64(merge_keys, -1,
"Number of distinct keys to use for MergeRandom and "
"ReadRandomMergeRandom. "
"If negative, there will be FLAGS_num keys.");
DEFINE_int64(reads, -1, "Number of read operations to do. "
"If negative, do FLAGS_num reads.");
......@@ -297,6 +304,11 @@ DEFINE_int32(readwritepercent, 90, "Ratio of reads to reads/writes (expressed"
"default value 90 means 90% operations out of all reads and writes"
" operations are reads. In other words, 9 gets for every 1 put.");
DEFINE_int32(mergereadpercent, 70, "Ratio of merges to merges&reads (expressed"
" as percentage) for the ReadRandomMergeRandom workload. The"
" default value 70 means 70% out of all read and merge operations"
" are merges. In other words, 7 merges for every 3 gets.");
DEFINE_int32(deletepercent, 2, "Percentage of deletes out of reads/writes/"
"deletes (used in RandomWithVerify only). RandomWithVerify "
"calculates writepercent as (100 - FLAGS_readwritepercent - "
......@@ -446,6 +458,9 @@ DEFINE_uint64(bytes_per_sync, rocksdb::Options().bytes_per_sync,
DEFINE_bool(filter_deletes, false, " On true, deletes use bloom-filter and drop"
" the delete if key not present");
DEFINE_int32(max_successive_merges, 0, "Maximum number of successive merge"
" operations on a key in the memtable");
static bool ValidatePrefixSize(const char* flagname, int32_t value) {
if (value < 0 || value>=2000000000) {
fprintf(stderr, "Invalid value for --%s: %d. 0<= PrefixSize <=2000000000\n",
......@@ -784,6 +799,7 @@ class Benchmark {
long long reads_;
long long writes_;
long long readwrites_;
long long merge_keys_;
int heap_counter_;
char keyFormat_[100]; // will contain the format of key. e.g "%016d"
void PrintHeader() {
......@@ -958,6 +974,7 @@ class Benchmark {
readwrites_((FLAGS_writes < 0 && FLAGS_reads < 0)? FLAGS_num :
((FLAGS_writes > FLAGS_reads) ? FLAGS_writes : FLAGS_reads)
),
merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys),
heap_counter_(0) {
std::vector<std::string> files;
FLAGS_env->GetChildren(FLAGS_db, &files);
......@@ -985,8 +1002,8 @@ class Benchmark {
}
unique_ptr<char []> GenerateKeyFromInt(long long v, const char* suffix = "") {
unique_ptr<char []> keyInStr(new char[kMaxKeySize]);
snprintf(keyInStr.get(), kMaxKeySize, keyFormat_, v, suffix);
unique_ptr<char []> keyInStr(new char[kMaxKeySize + 1]);
snprintf(keyInStr.get(), kMaxKeySize + 1, keyFormat_, v, suffix);
return keyInStr;
}
......@@ -1087,6 +1104,14 @@ class Benchmark {
method = &Benchmark::ReadWhileWriting;
} else if (name == Slice("readrandomwriterandom")) {
method = &Benchmark::ReadRandomWriteRandom;
} else if (name == Slice("readrandommergerandom")) {
if (FLAGS_merge_operator.empty()) {
fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
name.ToString().c_str());
method = nullptr;
} else {
method = &Benchmark::ReadRandomMergeRandom;
}
} else if (name == Slice("updaterandom")) {
method = &Benchmark::UpdateRandom;
} else if (name == Slice("appendrandom")) {
......@@ -1421,6 +1446,7 @@ class Benchmark {
FLAGS_merge_operator.c_str());
exit(1);
}
options.max_successive_merges = FLAGS_max_successive_merges;
// set universal style compaction configurations, if applicable
if (FLAGS_universal_size_ratio != 0) {
......@@ -2375,13 +2401,16 @@ class Benchmark {
//
// For example, use FLAGS_merge_operator="uint64add" and FLAGS_value_size=8
// to simulate random additions over 64-bit integers using merge.
//
// The number of merges on the same key can be controlled by adjusting
// FLAGS_merge_keys.
void MergeRandom(ThreadState* thread) {
RandomGenerator gen;
// The number of iterations is the larger of read_ or write_
Duration duration(FLAGS_duration, readwrites_);
while (!duration.Done(1)) {
const long long k = thread->rand.Next() % FLAGS_num;
const long long k = thread->rand.Next() % merge_keys_;
unique_ptr<char []> key = GenerateKeyFromInt(k);
Status s = db_->Merge(write_options_, key.get(),
......@@ -2400,6 +2429,68 @@ class Benchmark {
thread->stats.AddMessage(msg);
}
// Read and merge random keys. The amount of reads and merges are controlled
// by adjusting FLAGS_num and FLAGS_mergereadpercent. The number of distinct
// keys (and thus also the number of reads and merges on the same key) can be
// adjusted with FLAGS_merge_keys.
//
// As with MergeRandom, the merge operator to use should be defined by
// FLAGS_merge_operator.
void ReadRandomMergeRandom(ThreadState* thread) {
ReadOptions options(FLAGS_verify_checksum, true);
RandomGenerator gen;
std::string value;
long long num_hits = 0;
long long num_gets = 0;
long long num_merges = 0;
size_t max_length = 0;
// the number of iterations is the larger of read_ or write_
Duration duration(FLAGS_duration, readwrites_);
while (!duration.Done(1)) {
const long long k = thread->rand.Next() % merge_keys_;
unique_ptr<char []> key = GenerateKeyFromInt(k);
bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent;
if (do_merge) {
Status s = db_->Merge(write_options_, key.get(),
gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
exit(1);
}
num_merges++;
} else {
Status s = db_->Get(options, key.get(), &value);
if (value.length() > max_length)
max_length = value.length();
if (!s.ok() && !s.IsNotFound()) {
fprintf(stderr, "get error: %s\n", s.ToString().c_str());
// we continue after error rather than exiting so that we can
// find more errors if any
} else if (!s.IsNotFound()) {
num_hits++;
}
num_gets++;
}
thread->stats.FinishedSingleOp(db_);
}
char msg[100];
snprintf(msg, sizeof(msg),
"(reads:%lld merges:%lld total:%lld hits:%lld maxlength:%zu)",
num_gets, num_merges, readwrites_, num_hits, max_length);
thread->stats.AddMessage(msg);
}
void Compact(ThreadState* thread) {
db_->CompactRange(nullptr, nullptr);
}
......
......@@ -228,6 +228,28 @@ CompressionType GetCompressionType(const Options& options, int level,
}
}
CompressionType GetCompressionFlush(const Options& options) {
// Compressing memtable flushes might not help unless the sequential load
// optimization is used for leveled compaction. Otherwise the CPU and
// latency overhead is not offset by saving much space.
bool can_compress;
if (options.compaction_style == kCompactionStyleUniversal) {
can_compress =
(options.compaction_options_universal.compression_size_percent < 0);
} else {
// For leveled compress when min_level_to_compress == 0.
can_compress = (GetCompressionType(options, 0, true) != kNoCompression);
}
if (can_compress) {
return options.compression;
} else {
return kNoCompression;
}
}
DBImpl::DBImpl(const Options& options, const std::string& dbname)
: env_(options.env),
dbname_(dbname),
......@@ -1086,7 +1108,8 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) {
s = BuildTable(dbname_, env_, options_, storage_options_,
table_cache_.get(), iter, &meta,
user_comparator(), newest_snapshot,
earliest_seqno_in_memtable, true);
earliest_seqno_in_memtable,
GetCompressionFlush(options_));
LogFlush(options_.info_log);
mutex_.Lock();
}
......@@ -1147,15 +1170,11 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
Log(options_.info_log,
"Level-0 flush table #%lu: started",
(unsigned long)meta.number);
// We skip compression if universal compression is used and the size
// threshold is set for compression.
bool enable_compression = (options_.compaction_style
!= kCompactionStyleUniversal ||
options_.compaction_options_universal.compression_size_percent < 0);
s = BuildTable(dbname_, env_, options_, storage_options_,
table_cache_.get(), iter, &meta,
user_comparator(), newest_snapshot,
earliest_seqno_in_memtable, enable_compression);
earliest_seqno_in_memtable, GetCompressionFlush(options_));
LogFlush(options_.info_log);
delete iter;
Log(options_.info_log, "Level-0 flush table #%lu: %lu bytes %s",
......@@ -2092,11 +2111,11 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
if (s.ok() && !options_.disableDataSync) {
if (options_.use_fsync) {
StopWatch sw(env_, options_.statistics.get(),
COMPACTION_OUTFILE_SYNC_MICROS);
COMPACTION_OUTFILE_SYNC_MICROS, false);
s = compact->outfile->Fsync();
} else {
StopWatch sw(env_, options_.statistics.get(),
COMPACTION_OUTFILE_SYNC_MICROS);
COMPACTION_OUTFILE_SYNC_MICROS, false);
s = compact->outfile->Sync();
}
}
......@@ -2725,7 +2744,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
bool* value_found) {
Status s;
StopWatch sw(env_, options_.statistics.get(), DB_GET);
StopWatch sw(env_, options_.statistics.get(), DB_GET, false);
SequenceNumber snapshot;
if (options.snapshot != nullptr) {
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
......@@ -2795,7 +2814,7 @@ std::vector<Status> DBImpl::MultiGet(
const std::vector<ColumnFamilyHandle>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET);
StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET, false);
SequenceNumber snapshot;
std::vector<MemTable*> to_delete;
......@@ -3001,7 +3020,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
w.disableWAL = options.disableWAL;
w.done = false;
StopWatch sw(env_, options_.statistics.get(), DB_WRITE);
StopWatch sw(env_, options_.statistics.get(), DB_WRITE, false);
mutex_.Lock();
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
......
......@@ -628,4 +628,7 @@ extern Options SanitizeOptions(const std::string& db,
CompressionType GetCompressionType(const Options& options, int level,
const bool enable_compression);
// Determine compression type for L0 file written by memtable flush.
CompressionType GetCompressionFlush(const Options& options);
} // namespace rocksdb
......@@ -329,4 +329,37 @@ bool MemTable::Update(SequenceNumber seq, ValueType type,
// Key doesn't exist
return false;
}
size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
Slice memkey = key.memtable_key();
// A total ordered iterator is costly for some memtablerep (prefix aware
// reps). By passing in the user key, we allow efficient iterator creation.
// The iterator only needs to be ordered within the same user key.
std::shared_ptr<MemTableRep::Iterator> iter(
table_->GetIterator(key.user_key()));
iter->Seek(memkey.data());
size_t num_successive_merges = 0;
for (; iter->Valid(); iter->Next()) {
const char* entry = iter->key();
uint32_t key_length;
const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
if (!comparator_.comparator.user_comparator()->Compare(
Slice(iter_key_ptr, key_length - 8), key.user_key()) == 0) {
break;
}
const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8);
if (static_cast<ValueType>(tag & 0xff) != kTypeMerge) {
break;
}
++num_successive_merges;
}
return num_successive_merges;
}
} // namespace rocksdb
......@@ -107,6 +107,11 @@ class MemTable {
const Slice& key,
const Slice& value);
// Returns the number of successive merge entries starting from the newest
// entry for the key up to the last non-merge entry or last entry for the
// key in the memtable.
size_t CountSuccessiveMergeEntries(const LookupKey& key);
// Returns the edits area that is needed for flushing the memtable
VersionEdit* GetEdits() { return &edit_; }
......
......@@ -14,6 +14,7 @@
#include "rocksdb/merge_operator.h"
#include "db/dbformat.h"
#include "db/db_impl.h"
#include "db/write_batch_internal.h"
#include "utilities/merge_operators.h"
#include "util/testharness.h"
#include "utilities/utility_db.h"
......@@ -21,13 +22,52 @@
using namespace std;
using namespace rocksdb;
namespace {
int numMergeOperatorCalls;
std::shared_ptr<DB> OpenDb(const string& dbname, const bool ttl = false) {
void resetNumMergeOperatorCalls() {
numMergeOperatorCalls = 0;
}
}
class CountMergeOperator : public AssociativeMergeOperator {
public:
CountMergeOperator() {
mergeOperator_ = MergeOperators::CreateUInt64AddOperator();
}
virtual bool Merge(const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
Logger* logger) const override {
++numMergeOperatorCalls;
return mergeOperator_->PartialMerge(
key,
*existing_value,
value,
new_value,
logger);
}
virtual const char* Name() const override {
return "UInt64AddOperator";
}
private:
std::shared_ptr<MergeOperator> mergeOperator_;
};
std::shared_ptr<DB> OpenDb(
const string& dbname,
const bool ttl = false,
const unsigned max_successive_merges = 0) {
DB* db;
StackableDB* sdb;
Options options;
options.create_if_missing = true;
options.merge_operator = MergeOperators::CreateUInt64AddOperator();
options.merge_operator = std::make_shared<CountMergeOperator>();
options.max_successive_merges = max_successive_merges;
Status s;
DestroyDB(dbname, Options());
if (ttl) {
......@@ -243,6 +283,67 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) {
}
}
void testSuccessiveMerge(
Counters& counters, int max_num_merges, int num_merges) {
counters.assert_remove("z");
uint64_t sum = 0;
for (int i = 1; i <= num_merges; ++i) {
resetNumMergeOperatorCalls();
counters.assert_add("z", i);
sum += i;
if (i % (max_num_merges + 1) == 0) {
assert(numMergeOperatorCalls == max_num_merges + 1);
} else {
assert(numMergeOperatorCalls == 0);
}
resetNumMergeOperatorCalls();
assert(counters.assert_get("z") == sum);
assert(numMergeOperatorCalls == i % (max_num_merges + 1));
}
}
void testSingleBatchSuccessiveMerge(
DB* db,
int max_num_merges,
int num_merges) {
assert(num_merges > max_num_merges);
Slice key("BatchSuccessiveMerge");
uint64_t merge_value = 1;
Slice merge_value_slice((char *)&merge_value, sizeof(merge_value));
// Create the batch
WriteBatch batch;
for (int i = 0; i < num_merges; ++i) {
batch.Merge(key, merge_value_slice);
}
// Apply to memtable and count the number of merges
resetNumMergeOperatorCalls();
{
Status s = db->Write(WriteOptions(), &batch);
assert(s.ok());
}
assert(numMergeOperatorCalls ==
num_merges - (num_merges % (max_num_merges + 1)));
// Get the value
resetNumMergeOperatorCalls();
string get_value_str;
{
Status s = db->Get(ReadOptions(), key, &get_value_str);
assert(s.ok());
}
assert(get_value_str.size() == sizeof(uint64_t));
uint64_t get_value = DecodeFixed64(&get_value_str[0]);
ASSERT_EQ(get_value, num_merges * merge_value);
ASSERT_EQ(numMergeOperatorCalls, (num_merges % (max_num_merges + 1)));
}
void runTest(int argc, const string& dbname, const bool use_ttl = false) {
auto db = OpenDb(dbname, use_ttl);
......@@ -265,6 +366,19 @@ void runTest(int argc, const string& dbname, const bool use_ttl = false) {
}
DestroyDB(dbname, Options());
db.reset();
{
cout << "Test merge in memtable... \n";
unsigned maxMerge = 5;
auto db = OpenDb(dbname, use_ttl, maxMerge);
MergeBasedCounters counters(db, 0);
testCounters(counters, db.get(), compact);
testSuccessiveMerge(counters, maxMerge, maxMerge * 2);
testSingleBatchSuccessiveMerge(db.get(), 5, 7);
DestroyDB(dbname, Options());
}
}
int main(int argc, char *argv[]) {
......
......@@ -225,7 +225,8 @@ class Repairer {
Iterator* iter = mem->NewIterator();
status = BuildTable(dbname_, env_, options_, storage_options_,
table_cache_, iter, &meta,
icmp_.user_comparator(), 0, 0, true);
icmp_.user_comparator(), 0, 0,
kNoCompression);
delete iter;
delete mem->Unref();
mem = nullptr;
......
......@@ -24,6 +24,7 @@
#include "rocksdb/write_batch.h"
#include "rocksdb/options.h"
#include "rocksdb/merge_operator.h"
#include "db/dbformat.h"
#include "db/db_impl.h"
#include "db/memtable.h"
......@@ -258,7 +259,62 @@ class MemTableInserter : public WriteBatch::Handler {
}
virtual void MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
mem_->Add(sequence_, kTypeMerge, key, value);
bool perform_merge = false;
if (options_->max_successive_merges > 0 && db_ != nullptr) {
LookupKey lkey(key, sequence_);
// Count the number of successive merges at the head
// of the key in the memtable
size_t num_merges = mem_->CountSuccessiveMergeEntries(lkey);
if (num_merges >= options_->max_successive_merges) {
perform_merge = true;
}
}
if (perform_merge) {
// 1) Get the existing value
std::string get_value;
// Pass in the sequence number so that we also include previous merge
// operations in the same batch.
SnapshotImpl read_from_snapshot;
read_from_snapshot.number_ = sequence_;
ReadOptions read_options;
read_options.snapshot = &read_from_snapshot;
db_->Get(read_options, key, &get_value);
Slice get_value_slice = Slice(get_value);
// 2) Apply this merge
auto merge_operator = options_->merge_operator.get();
assert(merge_operator);
std::deque<std::string> operands;
operands.push_front(value.ToString());
std::string new_value;
if (!merge_operator->FullMerge(key,
&get_value_slice,
operands,
&new_value,
options_->info_log.get())) {
// Failed to merge!
RecordTick(options_->statistics.get(), NUMBER_MERGE_FAILURES);
// Store the delta in memtable
perform_merge = false;
} else {
// 3) Add value to memtable
mem_->Add(sequence_, kTypeValue, key, new_value);
}
}
if (!perform_merge) {
// Add merge operator to memtable
mem_->Add(sequence_, kTypeMerge, key, value);
}
sequence_++;
}
virtual void DeleteCF(uint32_t column_family_id, const Slice& key) {
......
......@@ -311,6 +311,7 @@ extern void rocksdb_cache_destroy(rocksdb_cache_t* cache);
extern rocksdb_env_t* rocksdb_create_default_env();
extern void rocksdb_env_set_background_threads(rocksdb_env_t* env, int n);
extern void rocksdb_env_set_high_priority_background_threads(rocksdb_env_t* env, int n);
extern void rocksdb_env_destroy(rocksdb_env_t*);
/* Universal Compaction options */
......
......@@ -44,15 +44,15 @@ using std::shared_ptr;
enum CompressionType : char {
// NOTE: do not change the values of existing entries, as these are
// part of the persistent format on disk.
kNoCompression = 0x0,
kNoCompression = 0x0,
kSnappyCompression = 0x1,
kZlibCompression = 0x2,
kBZip2Compression = 0x3
};
enum CompactionStyle : char {
kCompactionStyleLevel = 0x0, // level based compaction style
kCompactionStyleUniversal = 0x1 // Universal compaction style
kCompactionStyleLevel = 0x0, // level based compaction style
kCompactionStyleUniversal = 0x1 // Universal compaction style
};
// Compression options for different compression algorithms like Zlib
......@@ -60,12 +60,9 @@ struct CompressionOptions {
int window_bits;
int level;
int strategy;
CompressionOptions():window_bits(-14),
level(-1),
strategy(0){}
CompressionOptions(int wbits, int lev, int strategy):window_bits(wbits),
level(lev),
strategy(strategy){}
CompressionOptions() : window_bits(-14), level(-1), strategy(0) {}
CompressionOptions(int wbits, int lev, int strategy)
: window_bits(wbits), level(lev), strategy(strategy) {}
};
struct Options;
......@@ -180,7 +177,6 @@ struct ColumnFamilyOptions {
// Default: 16
int block_restart_interval;
// Compress blocks using the specified compression algorithm. This
// parameter can be changed dynamically.
//
......@@ -211,7 +207,7 @@ struct ColumnFamilyOptions {
// java/C api hard to construct.
std::vector<CompressionType> compression_per_level;
//different options for compression algorithms
// different options for compression algorithms
CompressionOptions compression_opts;
// If non-nullptr, use the specified filter policy to reduce disk reads.
......@@ -290,7 +286,6 @@ struct ColumnFamilyOptions {
// will be 20MB, total file size for level-2 will be 200MB,
// and total file size for level-3 will be 2GB.
// by default 'max_bytes_for_level_base' is 10MB.
uint64_t max_bytes_for_level_base;
// by default 'max_bytes_for_level_base' is 10.
......@@ -426,6 +421,17 @@ struct ColumnFamilyOptions {
// Default: 10000, if inplace_update_support = true, else 0.
size_t inplace_update_num_locks;
// Maximum number of successive merge operations on a key in the memtable.
//
// When a merge operation is added to the memtable and the maximum number of
// successive merges is reached, the value of the key will be calculated and
// inserted into the memtable instead of the merge operation. This will
// ensure that there are never more than max_successive_merges merge
// operations in the memtable.
//
// Default: 0 (disabled)
size_t max_successive_merges;
// Create ColumnFamilyOptions with default values for all fields
ColumnFamilyOptions();
// Create ColumnFamilyOptions from Options
......@@ -560,6 +566,14 @@ struct DBOptions {
// If <= 0, a proper value is automatically calculated (usually 1/10 of
// writer_buffer_size).
//
// There are two additonal restriction of the The specified size:
// (1) size should be in the range of [4096, 2 << 30] and
// (2) be the multiple of the CPU word (which helps with the memory
// alignment).
//
// We'll automatically check and adjust the size number to make sure it
// conforms to the restrictions.
//
// Default: 0
size_t arena_block_size;
......@@ -614,7 +628,12 @@ struct DBOptions {
// Specify the file access pattern once a compaction is started.
// It will be applied to all input files of a compaction.
// Default: NORMAL
enum { NONE, NORMAL, SEQUENTIAL, WILLNEED } access_hint_on_compaction_start;
enum {
NONE,
NORMAL,
SEQUENTIAL,
WILLNEED
} access_hint_on_compaction_start;
// Use adaptive mutex, which spins in the user space before resorting
// to kernel. This could reduce context switch when the mutex is not
......@@ -666,7 +685,7 @@ struct Options : public DBOptions, public ColumnFamilyOptions {
// the block cache. It will not page in data from the OS cache or data that
// resides in storage.
enum ReadTier {
kReadAllTier = 0x0, // data in memtable, block cache, OS cache or storage
kReadAllTier = 0x0, // data in memtable, block cache, OS cache or storage
kBlockCacheTier = 0x1 // data in memtable or block cache
};
......@@ -719,13 +738,14 @@ struct ReadOptions {
prefix_seek(false),
snapshot(nullptr),
prefix(nullptr),
read_tier(kReadAllTier) {
}
ReadOptions(bool cksum, bool cache) :
verify_checksums(cksum), fill_cache(cache),
prefix_seek(false), snapshot(nullptr), prefix(nullptr),
read_tier(kReadAllTier) {
}
read_tier(kReadAllTier) {}
ReadOptions(bool cksum, bool cache)
: verify_checksums(cksum),
fill_cache(cache),
prefix_seek(false),
snapshot(nullptr),
prefix(nullptr),
read_tier(kReadAllTier) {}
};
// Options that control write operations
......@@ -752,10 +772,7 @@ struct WriteOptions {
// and the write may got lost after a crash.
bool disableWAL;
WriteOptions()
: sync(false),
disableWAL(false) {
}
WriteOptions() : sync(false), disableWAL(false) {}
};
// Options that control flush operations
......@@ -764,9 +781,7 @@ struct FlushOptions {
// Default: true
bool wait;
FlushOptions()
: wait(true) {
}
FlushOptions() : wait(true) {}
};
} // namespace rocksdb
......
......@@ -31,6 +31,14 @@ struct BackupableDBOptions {
// Default: nullptr
Env* backup_env;
// If share_table_files == true, backup will assume that table files with
// same name have the same contents. This enables incremental backups and
// avoids unnecessary data copies.
// If share_table_files == false, each backup will be on its own and will
// not share any data with other backups.
// default: true
bool share_table_files;
// Backup info and error messages will be written to info_log
// if non-nullptr.
// Default: nullptr
......@@ -49,6 +57,7 @@ struct BackupableDBOptions {
explicit BackupableDBOptions(const std::string& _backup_dir,
Env* _backup_env = nullptr,
bool _share_table_files = true,
Logger* _info_log = nullptr,
bool _sync = true,
bool _destroy_old_data = false) :
......@@ -93,6 +102,14 @@ class BackupableDB : public StackableDB {
Status PurgeOldBackups(uint32_t num_backups_to_keep);
// deletes a specific backup
Status DeleteBackup(BackupID backup_id);
// Call this from another thread if you want to stop the backup
// that is currently happening. It will return immediatelly, will
// not wait for the backup to stop.
// The backup will stop ASAP and the call to CreateNewBackup will
// return Status::Incomplete(). It will not clean up after itself, but
// the state will remain consistent. The state will be cleaned up
// next time you create BackupableDB or RestoreBackupableDB.
void StopBackup();
private:
BackupEngine* backup_engine_;
......@@ -108,9 +125,10 @@ class RestoreBackupableDB {
void GetBackupInfo(std::vector<BackupInfo>* backup_info);
// restore from backup with backup_id
// IMPORTANT -- if you restore from some backup that is not the latest,
// and you start creating new backups from the new DB, all the backups
// that were newer than the backup you restored from will be deleted
// IMPORTANT -- if options_.share_table_files == true and you restore DB
// from some backup that is not the latest, and you start creating new
// backups from the new DB, all the backups that were newer than the
// backup you restored from will be deleted
//
// Example: Let's say you have backups 1, 2, 3, 4, 5 and you restore 3.
// If you try creating a new backup now, old backups 4 and 5 will be deleted
......
......@@ -8,71 +8,86 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/arena_impl.h"
#include <algorithm>
namespace rocksdb {
ArenaImpl::ArenaImpl(size_t block_size) {
if (block_size < kMinBlockSize) {
block_size_ = kMinBlockSize;
} else if (block_size > kMaxBlockSize) {
block_size_ = kMaxBlockSize;
} else {
block_size_ = block_size;
const size_t ArenaImpl::kMinBlockSize = 4096;
const size_t ArenaImpl::kMaxBlockSize = 2 << 30;
static const int kAlignUnit = sizeof(void*);
size_t OptimizeBlockSize(size_t block_size) {
// Make sure block_size is in optimal range
block_size = std::max(ArenaImpl::kMinBlockSize, block_size);
block_size = std::min(ArenaImpl::kMaxBlockSize, block_size);
// make sure block_size is the multiple of kAlignUnit
if (block_size % kAlignUnit != 0) {
block_size = (1 + block_size / kAlignUnit) * kAlignUnit;
}
blocks_memory_ = 0;
alloc_ptr_ = nullptr; // First allocation will allocate a block
alloc_bytes_remaining_ = 0;
return block_size;
}
ArenaImpl::ArenaImpl(size_t block_size)
: kBlockSize(OptimizeBlockSize(block_size)) {
assert(kBlockSize >= kMinBlockSize && kBlockSize <= kMaxBlockSize &&
kBlockSize % kAlignUnit == 0);
}
ArenaImpl::~ArenaImpl() {
for (size_t i = 0; i < blocks_.size(); i++) {
delete[] blocks_[i];
for (const auto& block : blocks_) {
delete[] block;
}
}
char* ArenaImpl::AllocateFallback(size_t bytes) {
if (bytes > block_size_ / 4) {
char* ArenaImpl::AllocateFallback(size_t bytes, bool aligned) {
if (bytes > kBlockSize / 4) {
// Object is more than a quarter of our block size. Allocate it separately
// to avoid wasting too much space in leftover bytes.
char* result = AllocateNewBlock(bytes);
return result;
return AllocateNewBlock(bytes);
}
// We waste the remaining space in the current block.
alloc_ptr_ = AllocateNewBlock(block_size_);
alloc_bytes_remaining_ = block_size_;
auto block_head = AllocateNewBlock(kBlockSize);
alloc_bytes_remaining_ = kBlockSize - bytes;
char* result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
if (aligned) {
aligned_alloc_ptr_ = block_head + bytes;
unaligned_alloc_ptr_ = block_head + kBlockSize;
return block_head;
} else {
aligned_alloc_ptr_ = block_head;
unaligned_alloc_ptr_ = block_head + kBlockSize - bytes;
return unaligned_alloc_ptr_;
}
}
char* ArenaImpl::AllocateAligned(size_t bytes) {
const int align = sizeof(void*); // We'll align to pointer size
assert((align & (align-1)) == 0); // Pointer size should be a power of 2
size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);
size_t slop = (current_mod == 0 ? 0 : align - current_mod);
assert((kAlignUnit & (kAlignUnit - 1)) ==
0); // Pointer size should be a power of 2
size_t current_mod =
reinterpret_cast<uintptr_t>(aligned_alloc_ptr_) & (kAlignUnit - 1);
size_t slop = (current_mod == 0 ? 0 : kAlignUnit - current_mod);
size_t needed = bytes + slop;
char* result;
if (needed <= alloc_bytes_remaining_) {
result = alloc_ptr_ + slop;
alloc_ptr_ += needed;
result = aligned_alloc_ptr_ + slop;
aligned_alloc_ptr_ += needed;
alloc_bytes_remaining_ -= needed;
} else {
// AllocateFallback always returned aligned memory
result = AllocateFallback(bytes);
result = AllocateFallback(bytes, true /* aligned */);
}
assert((reinterpret_cast<uintptr_t>(result) & (align-1)) == 0);
assert((reinterpret_cast<uintptr_t>(result) & (kAlignUnit - 1)) == 0);
return result;
}
char* ArenaImpl::AllocateNewBlock(size_t block_bytes) {
char* result = new char[block_bytes];
char* block = new char[block_bytes];
blocks_memory_ += block_bytes;
blocks_.push_back(result);
return result;
blocks_.push_back(block);
return block;
}
} // namespace rocksdb
......@@ -22,49 +22,54 @@ namespace rocksdb {
class ArenaImpl : public Arena {
public:
// No copying allowed
ArenaImpl(const ArenaImpl&) = delete;
void operator=(const ArenaImpl&) = delete;
static const size_t kMinBlockSize;
static const size_t kMaxBlockSize;
explicit ArenaImpl(size_t block_size = kMinBlockSize);
virtual ~ArenaImpl();
virtual char* Allocate(size_t bytes);
virtual char* Allocate(size_t bytes) override;
virtual char* AllocateAligned(size_t bytes);
virtual char* AllocateAligned(size_t bytes) override;
// Returns an estimate of the total memory usage of data allocated
// by the arena (including space allocated but not yet used for user
// by the arena (exclude the space allocated but not yet used for future
// allocations).
//
// TODO: Do we need to exclude space allocated but not used?
virtual const size_t ApproximateMemoryUsage() {
return blocks_memory_ + blocks_.capacity() * sizeof(char*);
return blocks_memory_ + blocks_.capacity() * sizeof(char*) -
alloc_bytes_remaining_;
}
virtual const size_t MemoryAllocatedBytes() {
virtual const size_t MemoryAllocatedBytes() override {
return blocks_memory_;
}
private:
char* AllocateFallback(size_t bytes);
char* AllocateNewBlock(size_t block_bytes);
static const size_t kMinBlockSize = 4096;
static const size_t kMaxBlockSize = 2 << 30;
// Number of bytes allocated in one block
size_t block_size_;
// Allocation state
char* alloc_ptr_;
size_t alloc_bytes_remaining_;
const size_t kBlockSize;
// Array of new[] allocated memory blocks
std::vector<char*> blocks_;
typedef std::vector<char*> Blocks;
Blocks blocks_;
// Stats for current active block.
// For each block, we allocate aligned memory chucks from one end and
// allocate unaligned memory chucks from the other end. Otherwise the
// memory waste for alignment will be higher if we allocate both types of
// memory from one direction.
char* unaligned_alloc_ptr_ = nullptr;
char* aligned_alloc_ptr_ = nullptr;
// How many bytes left in currently active block?
size_t alloc_bytes_remaining_ = 0;
char* AllocateFallback(size_t bytes, bool aligned);
char* AllocateNewBlock(size_t block_bytes);
// Bytes of memory in blocks allocated so far
size_t blocks_memory_;
// No copying allowed
ArenaImpl(const ArenaImpl&);
void operator=(const ArenaImpl&);
size_t blocks_memory_ = 0;
};
inline char* ArenaImpl::Allocate(size_t bytes) {
......@@ -73,12 +78,16 @@ inline char* ArenaImpl::Allocate(size_t bytes) {
// them for our internal use).
assert(bytes > 0);
if (bytes <= alloc_bytes_remaining_) {
char* result = alloc_ptr_;
alloc_ptr_ += bytes;
unaligned_alloc_ptr_ -= bytes;
alloc_bytes_remaining_ -= bytes;
return result;
return unaligned_alloc_ptr_;
}
return AllocateFallback(bytes);
return AllocateFallback(bytes, false /* unaligned */);
}
// check and adjust the block_size so that the return value is
// 1. in the range of [kMinBlockSize, kMaxBlockSize].
// 2. the multiple of align unit.
extern size_t OptimizeBlockSize(size_t block_size);
} // namespace rocksdb
......@@ -57,8 +57,34 @@ TEST(ArenaImplTest, MemoryAllocatedBytes) {
ASSERT_EQ(arena_impl.MemoryAllocatedBytes(), expected_memory_allocated);
}
// Make sure we didn't count the allocate but not used memory space in
// Arena::ApproximateMemoryUsage()
TEST(ArenaImplTest, ApproximateMemoryUsageTest) {
const size_t kBlockSize = 4096;
const size_t kEntrySize = kBlockSize / 8;
const size_t kZero = 0;
ArenaImpl arena(kBlockSize);
ASSERT_EQ(kZero, arena.ApproximateMemoryUsage());
auto num_blocks = kBlockSize / kEntrySize;
// first allocation
arena.AllocateAligned(kEntrySize);
auto mem_usage = arena.MemoryAllocatedBytes();
ASSERT_EQ(mem_usage, kBlockSize);
auto usage = arena.ApproximateMemoryUsage();
ASSERT_LT(usage, mem_usage);
for (size_t i = 1; i < num_blocks; ++i) {
arena.AllocateAligned(kEntrySize);
ASSERT_EQ(mem_usage, arena.MemoryAllocatedBytes());
ASSERT_EQ(arena.ApproximateMemoryUsage(), usage + kEntrySize);
usage = arena.ApproximateMemoryUsage();
}
ASSERT_GT(usage, mem_usage);
}
TEST(ArenaImplTest, Simple) {
std::vector<std::pair<size_t, char*> > allocated;
std::vector<std::pair<size_t, char*>> allocated;
ArenaImpl arena_impl;
const int N = 100000;
size_t bytes = 0;
......@@ -68,8 +94,9 @@ TEST(ArenaImplTest, Simple) {
if (i % (N / 10) == 0) {
s = i;
} else {
s = rnd.OneIn(4000) ? rnd.Uniform(6000) :
(rnd.OneIn(10) ? rnd.Uniform(100) : rnd.Uniform(20));
s = rnd.OneIn(4000)
? rnd.Uniform(6000)
: (rnd.OneIn(10) ? rnd.Uniform(100) : rnd.Uniform(20));
}
if (s == 0) {
// Our arena disallows size 0 allocations.
......@@ -89,7 +116,7 @@ TEST(ArenaImplTest, Simple) {
bytes += s;
allocated.push_back(std::make_pair(s, r));
ASSERT_GE(arena_impl.ApproximateMemoryUsage(), bytes);
if (i > N/10) {
if (i > N / 10) {
ASSERT_LE(arena_impl.ApproximateMemoryUsage(), bytes * 1.10);
}
}
......
......@@ -128,7 +128,8 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
table_factory(options.table_factory),
table_properties_collectors(options.table_properties_collectors),
inplace_update_support(options.inplace_update_support),
inplace_update_num_locks(options.inplace_update_num_locks) {
inplace_update_num_locks(options.inplace_update_num_locks),
max_successive_merges(0) {
assert(memtable_factory.get() != nullptr);
}
......@@ -389,6 +390,8 @@ Options::Dump(Logger* log) const
inplace_update_support);
Log(log, " Options.inplace_update_num_locks: %zd",
inplace_update_num_locks);
Log(log, " Options.max_successive_merges: %zd",
max_successive_merges);
} // Options::Dump
//
......
......@@ -15,9 +15,10 @@ class StopWatch {
explicit StopWatch(
Env * const env,
Statistics* statistics = nullptr,
const Histograms histogram_name = DB_GET) :
const Histograms histogram_name = DB_GET,
bool auto_start = true) :
env_(env),
start_time_(env->NowMicros()),
start_time_((!auto_start && !statistics) ? 0 : env->NowMicros()),
statistics_(statistics),
histogram_name_(histogram_name) {}
......
......@@ -20,6 +20,7 @@
#include <map>
#include <string>
#include <limits>
#include <atomic>
namespace rocksdb {
......@@ -31,6 +32,9 @@ class BackupEngine {
Status CreateNewBackup(DB* db, bool flush_before_backup = false);
Status PurgeOldBackups(uint32_t num_backups_to_keep);
Status DeleteBackup(BackupID backup_id);
void StopBackup() {
stop_backup_.store(true, std::memory_order_release);
}
void GetBackupInfo(std::vector<BackupInfo>* backup_info);
Status RestoreDBFromBackup(BackupID backup_id, const std::string &db_dir,
......@@ -106,13 +110,16 @@ class BackupEngine {
return "private";
}
inline std::string GetPrivateFileRel(BackupID backup_id,
const std::string &file = "") const {
bool tmp = false,
const std::string& file = "") const {
assert(file.size() == 0 || file[0] != '/');
return GetPrivateDirRel() + "/" + std::to_string(backup_id) + "/" + file;
return GetPrivateDirRel() + "/" + std::to_string(backup_id) +
(tmp ? ".tmp" : "") + "/" + file;
}
inline std::string GetSharedFileRel(const std::string& file = "") const {
inline std::string GetSharedFileRel(const std::string& file = "",
bool tmp = false) const {
assert(file.size() == 0 || file[0] != '/');
return "shared/" + file;
return "shared/" + file + (tmp ? ".tmp" : "");
}
inline std::string GetLatestBackupFile(bool tmp = false) const {
return GetAbsolutePath(std::string("LATEST_BACKUP") + (tmp ? ".tmp" : ""));
......@@ -151,6 +158,7 @@ class BackupEngine {
std::map<BackupID, BackupMeta> backups_;
std::unordered_map<std::string, int> backuped_file_refs_;
std::vector<BackupID> obsolete_backups_;
std::atomic<bool> stop_backup_;
// options data
BackupableDBOptions options_;
......@@ -161,13 +169,17 @@ class BackupEngine {
};
BackupEngine::BackupEngine(Env* db_env, const BackupableDBOptions& options)
: options_(options),
db_env_(db_env),
backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_) {
: stop_backup_(false),
options_(options),
db_env_(db_env),
backup_env_(options.backup_env != nullptr ? options.backup_env
: db_env_) {
// create all the dirs we need
backup_env_->CreateDirIfMissing(GetAbsolutePath());
backup_env_->CreateDirIfMissing(GetAbsolutePath(GetSharedFileRel()));
if (options_.share_table_files) {
backup_env_->CreateDirIfMissing(GetAbsolutePath(GetSharedFileRel()));
}
backup_env_->CreateDirIfMissing(GetAbsolutePath(GetPrivateDirRel()));
backup_env_->CreateDirIfMissing(GetBackupMetaDir());
......@@ -298,8 +310,9 @@ Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) {
Log(options_.info_log, "Started the backup process -- creating backup %u",
new_backup_id);
// create private dir
s = backup_env_->CreateDir(GetAbsolutePath(GetPrivateFileRel(new_backup_id)));
// create temporary private dir
s = backup_env_->CreateDir(
GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)));
// copy live_files
for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
......@@ -320,7 +333,7 @@ Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) {
// * if it's kDescriptorFile, limit the size to manifest_file_size
s = BackupFile(new_backup_id,
&new_backup,
type == kTableFile, /* shared */
options_.share_table_files && type == kTableFile,
db->GetName(), /* src_dir */
live_files[i], /* src_fname */
(type == kDescriptorFile) ? manifest_file_size : 0);
......@@ -342,6 +355,13 @@ Status BackupEngine::CreateNewBackup(DB* db, bool flush_before_backup) {
// we copied all the files, enable file deletions
db->EnableFileDeletions();
if (s.ok()) {
// move tmp private backup to real backup folder
s = backup_env_->RenameFile(
GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)), // tmp
GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)));
}
if (s.ok()) {
// persist the backup metadata on the disk
s = new_backup.StoreToFile(options_.sync);
......@@ -561,6 +581,9 @@ Status BackupEngine::CopyFile(const std::string& src,
Slice data;
do {
if (stop_backup_.load(std::memory_order_acquire)) {
return Status::Incomplete("Backup stopped");
}
size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
copy_file_buffer_size_ : size_limit;
s = src_file->Read(buffer_to_read, &data, buf.get());
......@@ -590,12 +613,16 @@ Status BackupEngine::BackupFile(BackupID backup_id,
assert(src_fname.size() > 0 && src_fname[0] == '/');
std::string dst_relative = src_fname.substr(1);
std::string dst_relative_tmp;
if (shared) {
dst_relative = GetSharedFileRel(dst_relative);
dst_relative_tmp = GetSharedFileRel(dst_relative, true);
dst_relative = GetSharedFileRel(dst_relative, false);
} else {
dst_relative = GetPrivateFileRel(backup_id, dst_relative);
dst_relative_tmp = GetPrivateFileRel(backup_id, true, dst_relative);
dst_relative = GetPrivateFileRel(backup_id, false, dst_relative);
}
std::string dst_path = GetAbsolutePath(dst_relative);
std::string dst_path_tmp = GetAbsolutePath(dst_relative_tmp);
Status s;
uint64_t size;
......@@ -607,12 +634,15 @@ Status BackupEngine::BackupFile(BackupID backup_id,
} else {
Log(options_.info_log, "Copying %s", src_fname.c_str());
s = CopyFile(src_dir + src_fname,
dst_path,
dst_path_tmp,
db_env_,
backup_env_,
options_.sync,
&size,
size_limit);
if (s.ok() && shared) {
s = backup_env_->RenameFile(dst_path_tmp, dst_path);
}
}
if (s.ok()) {
backup->AddFile(dst_relative, size);
......@@ -671,14 +701,16 @@ void BackupEngine::GarbageCollection(bool full_scan) {
&private_children);
for (auto& child : private_children) {
BackupID backup_id = 0;
bool tmp_dir = child.find(".tmp") != std::string::npos;
sscanf(child.c_str(), "%u", &backup_id);
if (backup_id == 0 || backups_.find(backup_id) != backups_.end()) {
if (!tmp_dir && // if it's tmp_dir, delete it
(backup_id == 0 || backups_.find(backup_id) != backups_.end())) {
// it's either not a number or it's still alive. continue
continue;
}
// here we have to delete the dir and all its children
std::string full_private_path =
GetAbsolutePath(GetPrivateFileRel(backup_id));
GetAbsolutePath(GetPrivateFileRel(backup_id, tmp_dir));
std::vector<std::string> subchildren;
backup_env_->GetChildren(full_private_path, &subchildren);
for (auto& subchild : subchildren) {
......@@ -813,7 +845,9 @@ Status BackupEngine::BackupMeta::StoreToFile(bool sync) {
BackupableDB::BackupableDB(DB* db, const BackupableDBOptions& options)
: StackableDB(db), backup_engine_(new BackupEngine(db->GetEnv(), options)) {
backup_engine_->DeleteBackupsNewerThan(GetLatestSequenceNumber());
if (options.share_table_files) {
backup_engine_->DeleteBackupsNewerThan(GetLatestSequenceNumber());
}
}
BackupableDB::~BackupableDB() {
......@@ -836,6 +870,10 @@ Status BackupableDB::DeleteBackup(BackupID backup_id) {
return backup_engine_->DeleteBackup(backup_id);
}
void BackupableDB::StopBackup() {
backup_engine_->StopBackup();
}
// --- RestoreBackupableDB methods ------
RestoreBackupableDB::RestoreBackupableDB(Env* db_env,
......
......@@ -307,7 +307,7 @@ class BackupableDBTest {
CreateLoggerFromOptions(dbname_, backupdir_, env_,
Options(), &logger_);
backupable_options_.reset(new BackupableDBOptions(
backupdir_, test_backup_env_.get(), logger_.get(), true));
backupdir_, test_backup_env_.get(), true, logger_.get(), true));
// delete old files in db
DestroyDB(dbname_, Options());
......@@ -319,7 +319,8 @@ class BackupableDBTest {
return db;
}
void OpenBackupableDB(bool destroy_old_data = false, bool dummy = false) {
void OpenBackupableDB(bool destroy_old_data = false, bool dummy = false,
bool share_table_files = true) {
// reset all the defaults
test_backup_env_->SetLimitWrittenFiles(1000000);
test_db_env_->SetLimitWrittenFiles(1000000);
......@@ -333,6 +334,7 @@ class BackupableDBTest {
ASSERT_OK(DB::Open(options_, dbname_, &db));
}
backupable_options_->destroy_old_data = destroy_old_data;
backupable_options_->share_table_files = share_table_files;
db_.reset(new BackupableDB(db, *backupable_options_));
}
......@@ -661,6 +663,38 @@ TEST(BackupableDBTest, DeleteNewerBackups) {
CloseRestoreDB();
}
TEST(BackupableDBTest, NoShareTableFiles) {
const int keys_iteration = 5000;
OpenBackupableDB(true, false, false);
for (int i = 0; i < 5; ++i) {
FillDB(db_.get(), keys_iteration * i, keys_iteration * (i + 1));
ASSERT_OK(db_->CreateNewBackup(!!(i % 2)));
}
CloseBackupableDB();
for (int i = 0; i < 5; ++i) {
AssertBackupConsistency(i + 1, 0, keys_iteration * (i + 1),
keys_iteration * 6);
}
}
TEST(BackupableDBTest, DeleteTmpFiles) {
OpenBackupableDB();
CloseBackupableDB();
std::string shared_tmp = backupdir_ + "/shared/00006.sst.tmp";
std::string private_tmp_dir = backupdir_ + "/private/10.tmp";
std::string private_tmp_file = private_tmp_dir + "/00003.sst";
file_manager_->WriteToFile(shared_tmp, "tmp");
file_manager_->CreateDir(private_tmp_dir);
file_manager_->WriteToFile(private_tmp_file, "tmp");
ASSERT_EQ(true, file_manager_->FileExists(private_tmp_dir));
OpenBackupableDB();
CloseBackupableDB();
ASSERT_EQ(false, file_manager_->FileExists(shared_tmp));
ASSERT_EQ(false, file_manager_->FileExists(private_tmp_file));
ASSERT_EQ(false, file_manager_->FileExists(private_tmp_dir));
}
} // anon namespace
} // namespace rocksdb
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册