提交 ccdb93e7 编写于 作者: I Igor Canadi

Merge branch 'master' into columnfamilies

Conflicts:
	db/db_impl.cc
	db/db_impl.h
	db/memtable_list.cc
	db/memtable_list.h
	db/version_set.cc
	db/version_set.h
......@@ -12,6 +12,7 @@
* Removed arena.h from public header files.
* By default, checksums are verified on every read from database
* Added is_manual_compaction to CompactionFilter::Context
## 2.7.0 (01/28/2014)
......
......@@ -145,7 +145,7 @@ endif # PLATFORM_SHARED_EXT
all: $(LIBRARY) $(PROGRAMS)
dbg: $(PROGRAMS)
dbg: $(LIBRARY) $(PROGRAMS)
# Will also generate shared libraries.
release:
......
......@@ -19,7 +19,8 @@
#
# -DLEVELDB_PLATFORM_POSIX if cstdatomic is present
# -DLEVELDB_PLATFORM_NOATOMIC if it is not
# -DSNAPPY if the Snappy library is present
# -DSNAPPY if the Snappy library is present
# -DLZ4 if the LZ4 library is present
#
# Using gflags in rocksdb:
# Our project depends on gflags, which requires users to take some extra steps
......@@ -38,7 +39,7 @@ if test -z "$OUTPUT"; then
fi
# we depend on C++11
PLATFORM_CXXFLAGS="-std=gnu++11"
PLATFORM_CXXFLAGS="-std=c++11"
# we currently depend on POSIX platform
COMMON_FLAGS="-DROCKSDB_PLATFORM_POSIX"
......@@ -244,6 +245,17 @@ EOF
PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -lbz2"
fi
# Test whether lz4 library is installed
$CXX $CFLAGS $COMMON_FLAGS -x c++ - -o /dev/null 2>/dev/null <<EOF
#include <lz4.h>
#include <lz4hc.h>
int main() {}
EOF
if [ "$?" = 0 ]; then
COMMON_FLAGS="$COMMON_FLAGS -DLZ4"
PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -llz4"
fi
# Test whether tcmalloc is available
$CXX $CFLAGS -x c++ - -o /dev/null -ltcmalloc 2>/dev/null <<EOF
int main() {}
......
......@@ -110,8 +110,6 @@ ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
}
SuperVersion::SuperVersion() {}
SuperVersion::~SuperVersion() {
for (auto td : to_delete) {
delete td;
......
......@@ -72,7 +72,7 @@ struct SuperVersion {
autovector<MemTable*> to_delete;
// should be called outside the mutex
SuperVersion();
SuperVersion() = default;
~SuperVersion();
SuperVersion* Ref();
// Returns true if this was the last reference and caller should
......
......@@ -41,6 +41,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level,
score_(0),
bottommost_level_(false),
is_full_compaction_(false),
is_manual_compaction_(false),
level_ptrs_(std::vector<size_t>(number_levels_)) {
cfd_->Ref();
......
......@@ -85,6 +85,9 @@ class Compaction {
// Does this compaction include all sst files?
bool IsFullCompaction() { return is_full_compaction_; }
// Was this compaction triggered manually by the client?
bool IsManualCompaction() { return is_manual_compaction_; }
private:
friend class CompactionPicker;
friend class UniversalCompactionPicker;
......@@ -125,6 +128,9 @@ class Compaction {
// Does this compaction include all sst files?
bool is_full_compaction_;
// Is this compaction requested by the client?
bool is_manual_compaction_;
// level_ptrs_ holds indices into input_version_->levels_: our state
// is that we are positioned at one of the file ranges for each
// higher level than the ones involved in this compaction (i.e. for
......
......@@ -358,6 +358,9 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
// Is this compaction creating a file at the bottommost level
c->SetupBottomMostLevel(true);
c->is_manual_compaction_ = true;
return c;
}
......
......@@ -60,8 +60,8 @@ DEFINE_string(benchmarks,
"randomwithverify,"
"fill100K,"
"crc32c,"
"snappycomp,"
"snappyuncomp,"
"compress,"
"uncompress,"
"acquireload,"
"fillfromstdin,",
......@@ -338,6 +338,10 @@ enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
return rocksdb::kZlibCompression;
else if (!strcasecmp(ctype, "bzip2"))
return rocksdb::kBZip2Compression;
else if (!strcasecmp(ctype, "lz4"))
return rocksdb::kLZ4Compression;
else if (!strcasecmp(ctype, "lz4hc"))
return rocksdb::kLZ4HCCompression;
fprintf(stdout, "Cannot parse compression type '%s'\n", ctype);
return rocksdb::kSnappyCompression; //default value
......@@ -479,7 +483,8 @@ static bool ValidatePrefixSize(const char* flagname, int32_t value) {
}
return true;
}
DEFINE_int32(prefix_size, 0, "Control the prefix size for HashSkipList");
DEFINE_int32(prefix_size, 0, "control the prefix size for HashSkipList and "
"plain table");
enum RepFactory {
kSkipList,
......@@ -501,6 +506,8 @@ enum RepFactory StringToRepFactory(const char* ctype) {
}
static enum RepFactory FLAGS_rep_factory;
DEFINE_string(memtablerep, "skip_list", "");
DEFINE_bool(use_plain_table, false, "if use plain table "
"instead of block-based table format");
DEFINE_string(merge_operator, "", "The merge operator to use with the database."
"If a new merge operator is specified, be sure to use fresh"
......@@ -841,7 +848,13 @@ class Benchmark {
case rocksdb::kBZip2Compression:
fprintf(stdout, "Compression: bzip2\n");
break;
}
case rocksdb::kLZ4Compression:
fprintf(stdout, "Compression: lz4\n");
break;
case rocksdb::kLZ4HCCompression:
fprintf(stdout, "Compression: lz4hc\n");
break;
}
switch (FLAGS_rep_factory) {
case kPrefixHash:
......@@ -896,6 +909,16 @@ class Benchmark {
strlen(text), &compressed);
name = "BZip2";
break;
case kLZ4Compression:
result = port::LZ4_Compress(Options().compression_opts, text,
strlen(text), &compressed);
name = "LZ4";
break;
case kLZ4HCCompression:
result = port::LZ4HC_Compress(Options().compression_opts, text,
strlen(text), &compressed);
name = "LZ4HC";
break;
case kNoCompression:
assert(false); // cannot happen
break;
......@@ -975,7 +998,8 @@ class Benchmark {
filter_policy_(FLAGS_bloom_bits >= 0
? NewBloomFilterPolicy(FLAGS_bloom_bits)
: nullptr),
prefix_extractor_(NewFixedPrefixTransform(FLAGS_key_size-1)),
prefix_extractor_(NewFixedPrefixTransform(FLAGS_use_plain_table ?
FLAGS_prefix_size : FLAGS_key_size-1)),
db_(nullptr),
num_(FLAGS_num),
value_size_(FLAGS_value_size),
......@@ -1146,10 +1170,10 @@ class Benchmark {
method = &Benchmark::Crc32c;
} else if (name == Slice("acquireload")) {
method = &Benchmark::AcquireLoad;
} else if (name == Slice("snappycomp")) {
method = &Benchmark::SnappyCompress;
} else if (name == Slice("snappyuncomp")) {
method = &Benchmark::SnappyUncompress;
} else if (name == Slice("compress")) {
method = &Benchmark::Compress;
} else if (name == Slice("uncompress")) {
method = &Benchmark::Uncompress;
} else if (name == Slice("heapprofile")) {
HeapProfile();
} else if (name == Slice("stats")) {
......@@ -1302,23 +1326,47 @@ class Benchmark {
if (ptr == nullptr) exit(1); // Disable unused variable warning.
}
void SnappyCompress(ThreadState* thread) {
void Compress(ThreadState *thread) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
int64_t bytes = 0;
int64_t produced = 0;
bool ok = true;
std::string compressed;
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = port::Snappy_Compress(Options().compression_opts, input.data(),
// Compress 1G
while (ok && bytes < int64_t(1) << 30) {
switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression:
ok = port::Snappy_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kZlibCompression:
ok = port::Zlib_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kBZip2Compression:
ok = port::BZip2_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4Compression:
ok = port::LZ4_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4HCCompression:
ok = port::LZ4HC_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
default:
ok = false;
}
produced += compressed.size();
bytes += input.size();
thread->stats.FinishedSingleOp(nullptr);
}
if (!ok) {
thread->stats.AddMessage("(snappy failure)");
thread->stats.AddMessage("(compression failure)");
} else {
char buf[100];
snprintf(buf, sizeof(buf), "(output: %.1f%%)",
......@@ -1328,24 +1376,78 @@ class Benchmark {
}
}
void SnappyUncompress(ThreadState* thread) {
void Uncompress(ThreadState *thread) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
std::string compressed;
bool ok = port::Snappy_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
bool ok;
switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression:
ok = port::Snappy_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kZlibCompression:
ok = port::Zlib_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kBZip2Compression:
ok = port::BZip2_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4Compression:
ok = port::LZ4_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
case rocksdb::kLZ4HCCompression:
ok = port::LZ4HC_Compress(Options().compression_opts, input.data(),
input.size(), &compressed);
break;
default:
ok = false;
}
int64_t bytes = 0;
char* uncompressed = new char[input.size()];
while (ok && bytes < 1024 * 1048576) { // Compress 1G
ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
uncompressed);
int decompress_size;
while (ok && bytes < 1024 * 1048576) {
char *uncompressed = nullptr;
switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression:
// allocate here to make comparison fair
uncompressed = new char[input.size()];
ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
uncompressed);
break;
case rocksdb::kZlibCompression:
uncompressed = port::Zlib_Uncompress(
compressed.data(), compressed.size(), &decompress_size);
ok = uncompressed != nullptr;
break;
case rocksdb::kBZip2Compression:
uncompressed = port::BZip2_Uncompress(
compressed.data(), compressed.size(), &decompress_size);
ok = uncompressed != nullptr;
break;
case rocksdb::kLZ4Compression:
uncompressed = port::LZ4_Uncompress(
compressed.data(), compressed.size(), &decompress_size);
ok = uncompressed != nullptr;
break;
case rocksdb::kLZ4HCCompression:
uncompressed = port::LZ4_Uncompress(
compressed.data(), compressed.size(), &decompress_size);
ok = uncompressed != nullptr;
break;
default:
ok = false;
}
delete[] uncompressed;
bytes += input.size();
thread->stats.FinishedSingleOp(nullptr);
}
delete[] uncompressed;
if (!ok) {
thread->stats.AddMessage("(snappy failure)");
thread->stats.AddMessage("(compression failure)");
} else {
thread->stats.AddBytes(bytes);
}
......@@ -1368,8 +1470,9 @@ class Benchmark {
options.compaction_style = FLAGS_compaction_style_e;
options.block_size = FLAGS_block_size;
options.filter_policy = filter_policy_;
options.prefix_extractor = FLAGS_use_prefix_blooms ? prefix_extractor_
: nullptr;
options.prefix_extractor =
(FLAGS_use_plain_table || FLAGS_use_prefix_blooms) ? prefix_extractor_
: nullptr;
options.max_open_files = FLAGS_open_files;
options.statistics = dbstats;
options.env = FLAGS_env;
......@@ -1383,8 +1486,8 @@ class Benchmark {
FLAGS_max_bytes_for_level_multiplier;
options.filter_deletes = FLAGS_filter_deletes;
if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kPrefixHash)) {
fprintf(stderr,
"prefix_size should be non-zero iff memtablerep == prefix_hash\n");
fprintf(stderr, "prefix_size should be non-zero iff memtablerep "
"== prefix_hash\n");
exit(1);
}
switch (FLAGS_rep_factory) {
......@@ -1401,6 +1504,22 @@ class Benchmark {
);
break;
}
if (FLAGS_use_plain_table) {
if (FLAGS_rep_factory != kPrefixHash) {
fprintf(stderr, "Waring: plain table is used with skipList\n");
}
if (!FLAGS_mmap_read && !FLAGS_mmap_write) {
fprintf(stderr, "plain table format requires mmap to operate\n");
exit(1);
}
int bloom_bits_per_key = FLAGS_bloom_bits;
if (bloom_bits_per_key < 0) {
bloom_bits_per_key = 0;
}
options.table_factory = std::shared_ptr<TableFactory>(
NewPlainTableFactory(FLAGS_key_size, bloom_bits_per_key, 0.75));
}
if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() > 0) {
if (FLAGS_max_bytes_for_level_multiplier_additional_v.size() !=
(unsigned int)FLAGS_num_levels) {
......
......@@ -116,6 +116,7 @@ struct DBImpl::CompactionState {
CompactionFilter::Context GetFilterContext() {
CompactionFilter::Context context;
context.is_full_compaction = compaction->IsFullCompaction();
context.is_manual_compaction = compaction->IsManualCompaction();
return context;
}
};
......@@ -1182,11 +1183,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
DeletionState& deletion_state) {
mutex_.AssertHeld();
assert(cfd->imm()->size() != 0);
if (!cfd->imm()->IsFlushPending()) {
Log(options_.info_log, "FlushMemTableToOutputFile already in progress");
return Status::IOError("FlushMemTableToOutputFile already in progress");
}
assert(cfd->imm()->IsFlushPending());
// Save the contents of the earliest memtable as a new Table
uint64_t file_number;
......@@ -1194,7 +1191,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
cfd->imm()->PickMemtablesToFlush(&mems);
if (mems.empty()) {
Log(options_.info_log, "Nothing in memstore to flush");
return Status::IOError("Nothing in memstore to flush");
return Status::OK();
}
// record the logfile_number_ before we release the mutex
......@@ -1217,15 +1214,20 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
// This will release and re-acquire the mutex.
Status s = WriteLevel0Table(cfd, mems, edit, &file_number);
if (s.ok() && (shutting_down_.Acquire_Load() || cfd->IsDropped())) {
s = Status::IOError("Column family closed during memtable flush");
if (s.ok() && shutting_down_.Acquire_Load() && cfd->IsDropped()) {
s = Status::ShutdownInProgress(
"Column family closed during memtable flush");
}
if (!s.ok()) {
cfd->imm()->RollbackMemtableFlush(mems, file_number, &pending_outputs_);
return s;
}
// Replace immutable memtable with the generated Table
s = cfd->imm()->InstallMemtableFlushResults(
cfd, mems, versions_.get(), s, &mutex_, options_.info_log.get(),
file_number, pending_outputs_, &deletion_state.memtables_to_free,
db_directory_.get());
cfd, mems, versions_.get(), &mutex_, options_.info_log.get(), file_number,
pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get());
if (s.ok()) {
InstallSuperVersion(cfd, deletion_state);
......@@ -1410,7 +1412,8 @@ Status DBImpl::GetUpdatesSince(SequenceNumber seq,
RecordTick(options_.statistics.get(), GET_UPDATES_SINCE_CALLS);
if (seq > versions_->LastSequence()) {
return Status::IOError("Requested sequence not yet written in the db");
return Status::NotFound(
"Requested sequence not yet written in the db");
}
// Get all sorted Wal Files.
// Do binary search and open files and find the seq number.
......@@ -1474,16 +1477,19 @@ Status DBImpl::ReadFirstRecord(const WalFileType type, const uint64_t number,
if (type == kAliveLogFile) {
std::string fname = LogFileName(options_.wal_dir, number);
Status status = ReadFirstLine(fname, result);
if (!status.ok()) {
// check if the file got moved to archive.
std::string archived_file =
ArchivedLogFileName(options_.wal_dir, number);
Status s = ReadFirstLine(archived_file, result);
if (!s.ok()) {
return Status::IOError("Log File has been deleted: " + archived_file);
}
if (status.ok() || env_->FileExists(fname)) {
// return OK or any error that is not caused non-existing file
return status;
}
return Status::OK();
// check if the file got moved to archive.
std::string archived_file =
ArchivedLogFileName(options_.wal_dir, number);
Status s = ReadFirstLine(archived_file, result);
if (s.ok() || env_->FileExists(archived_file)) {
return s;
}
return Status::NotFound("Log File has been deleted: " + archived_file);
} else if (type == kArchivedLogFile) {
std::string fname = ArchivedLogFileName(options_.wal_dir, number);
Status status = ReadFirstLine(fname, result);
......@@ -1498,12 +1504,17 @@ Status DBImpl::ReadFirstLine(const std::string& fname,
Env* env;
Logger* info_log;
const char* fname;
Status* status; // nullptr if options_.paranoid_checks==false
Status* status;
bool ignore_error; // true if options_.paranoid_checks==false
virtual void Corruption(size_t bytes, const Status& s) {
Log(info_log, "%s%s: dropping %d bytes; %s",
(this->status == nullptr ? "(ignoring error) " : ""),
(this->ignore_error ? "(ignoring error) " : ""),
fname, static_cast<int>(bytes), s.ToString().c_str());
if (this->status != nullptr && this->status->ok()) *this->status = s;
if (this->status->ok()) {
// only keep the first error
*this->status = s;
}
}
};
......@@ -1519,23 +1530,30 @@ Status DBImpl::ReadFirstLine(const std::string& fname,
reporter.env = env_;
reporter.info_log = options_.info_log.get();
reporter.fname = fname.c_str();
reporter.status = (options_.paranoid_checks ? &status : nullptr);
reporter.status = &status;
reporter.ignore_error = !options_.paranoid_checks;
log::Reader reader(std::move(file), &reporter, true/*checksum*/,
0/*initial_offset*/);
std::string scratch;
Slice record;
if (reader.ReadRecord(&record, &scratch) && status.ok()) {
if (reader.ReadRecord(&record, &scratch) &&
(status.ok() || !options_.paranoid_checks)) {
if (record.size() < 12) {
reporter.Corruption(
record.size(), Status::Corruption("log record too small"));
return Status::IOError("Corruption noted");
// TODO read record's till the first no corrupt entry?
} else {
WriteBatchInternal::SetContents(batch, record);
return Status::OK();
}
WriteBatchInternal::SetContents(batch, record);
return Status::OK();
}
return Status::IOError("Error reading from file " + fname);
// ReadRecord returns false on EOF, which is deemed as OK() by Reader
if (status.ok()) {
status = Status::Corruption("eof reached");
}
return status;
}
struct CompareLogByPointer {
......@@ -2206,7 +2224,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
compact->compaction->level(),
compact->compaction->num_input_files(1),
compact->compaction->level() + 1);
return Status::IOError("Compaction input files inconsistent");
return Status::Corruption("Compaction input files inconsistent");
}
Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
......@@ -2573,7 +2591,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
}
if (status.ok() && (shutting_down_.Acquire_Load() || cfd->IsDropped())) {
status = Status::IOError("Column family closing started during compaction");
status = Status::ShutdownInProgress(
"Database shutdown started during compaction");
}
if (status.ok() && compact->builder != nullptr) {
status = FinishCompactionOutputFile(compact, input.get());
......@@ -3687,6 +3706,21 @@ void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
versions_->GetLiveFilesMetaData(metadata);
}
void DBImpl::TEST_GetFilesMetaData(
std::vector<std::vector<FileMetaData>>* metadata) {
MutexLock l(&mutex_);
metadata->resize(NumberLevels());
for (int level = 0; level < NumberLevels(); level++) {
const std::vector<FileMetaData*>& files =
default_cf_handle_->cfd()->current()->files_[level];
(*metadata)[level].clear();
for (const auto& f : files) {
(*metadata)[level].push_back(*f);
}
}
}
Status DBImpl::GetDbIdentity(std::string& identity) {
std::string idfilename = IdentityFileName(dbname_);
unique_ptr<SequentialFile> idfile;
......
......@@ -174,6 +174,8 @@ class DBImpl : public DB {
default_interval_to_delete_obsolete_WAL_ = default_interval_to_delete_obsolete_WAL;
}
void TEST_GetFilesMetaData(std::vector<std::vector<FileMetaData>>* metadata);
// needed for CleanupIteratorState
struct DeletionState {
inline bool HaveSomethingToDelete() const {
......
......@@ -56,7 +56,19 @@ static bool BZip2CompressionSupported(const CompressionOptions& options) {
return port::BZip2_Compress(options, in.data(), in.size(), &out);
}
static std::string RandomString(Random* rnd, int len) {
static bool LZ4CompressionSupported(const CompressionOptions &options) {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::LZ4_Compress(options, in.data(), in.size(), &out);
}
static bool LZ4HCCompressionSupported(const CompressionOptions &options) {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::LZ4HC_Compress(options, in.data(), in.size(), &out);
}
static std::string RandomString(Random *rnd, int len) {
std::string r;
test::RandomString(rnd, len, &r);
return r;
......@@ -1649,6 +1661,42 @@ TEST(DBTest, Recover) {
} while (ChangeOptions());
}
TEST(DBTest, RecoverWithTableHandle) {
do {
Options options = CurrentOptions();
options.create_if_missing = true;
options.write_buffer_size = 100;
options.disable_auto_compactions = true;
DestroyAndReopen(&options);
ASSERT_OK(Put("foo", "v1"));
ASSERT_OK(Put("bar", "v2"));
dbfull()->TEST_FlushMemTable();
ASSERT_OK(Put("foo", "v3"));
ASSERT_OK(Put("bar", "v4"));
dbfull()->TEST_FlushMemTable();
ASSERT_OK(Put("big", std::string(100, 'a')));
Reopen();
std::vector<std::vector<FileMetaData>> files;
dbfull()->TEST_GetFilesMetaData(&files);
int total_files = 0;
for (const auto& level : files) {
total_files += level.size();
}
ASSERT_EQ(total_files, 3);
for (const auto& level : files) {
for (const auto& file : level) {
if (kInfiniteMaxOpenFiles == option_config_) {
ASSERT_TRUE(file.table_reader_handle != nullptr);
} else {
ASSERT_TRUE(file.table_reader_handle == nullptr);
}
}
}
} while (ChangeOptions());
}
TEST(DBTest, IgnoreRecoveredLog) {
std::string backup_logs = dbname_ + "/backup_logs";
......@@ -2624,6 +2672,14 @@ bool MinLevelToCompress(CompressionType& type, Options& options, int wbits,
CompressionOptions(wbits, lev, strategy))) {
type = kBZip2Compression;
fprintf(stderr, "using bzip2\n");
} else if (LZ4CompressionSupported(
CompressionOptions(wbits, lev, strategy))) {
type = kLZ4Compression;
fprintf(stderr, "using lz4\n");
} else if (LZ4HCCompressionSupported(
CompressionOptions(wbits, lev, strategy))) {
type = kLZ4HCCompression;
fprintf(stderr, "using lz4hc\n");
} else {
fprintf(stderr, "skipping test, compression disabled\n");
return false;
......@@ -2917,7 +2973,11 @@ class DeleteFilterFactory : public CompactionFilterFactory {
public:
virtual std::unique_ptr<CompactionFilter>
CreateCompactionFilter(const CompactionFilter::Context& context) override {
return std::unique_ptr<CompactionFilter>(new DeleteFilter());
if (context.is_manual_compaction) {
return std::unique_ptr<CompactionFilter>(new DeleteFilter());
} else {
return std::unique_ptr<CompactionFilter>(nullptr);
}
}
virtual const char* Name() const override {
......
......@@ -208,116 +208,147 @@ void MemTable::Add(SequenceNumber s, ValueType type,
}
}
// Callback from MemTable::Get()
namespace {
struct Saver {
Status* status;
const LookupKey* key;
bool* found_final_value; // Is value set correctly? Used by KeyMayExist
bool* merge_in_progress;
std::string* value;
const MergeOperator* merge_operator;
// the merge operations encountered;
MergeContext* merge_context;
MemTable* mem;
Logger* logger;
Statistics* statistics;
bool inplace_update_support;
};
} // namespace
static bool SaveValue(void* arg, const char* entry) {
Saver* s = reinterpret_cast<Saver*>(arg);
MergeContext* merge_context = s->merge_context;
const MergeOperator* merge_operator = s->merge_operator;
assert(s != nullptr && merge_context != nullptr);
// entry format is:
// klength varint32
// userkey char[klength-8]
// tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
if (s->mem->GetInternalKeyComparator().user_comparator()->Compare(
Slice(key_ptr, key_length - 8), s->key->user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadLock();
}
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*(s->status) = Status::OK();
if (*(s->merge_in_progress)) {
assert(merge_operator);
if (!merge_operator->FullMerge(s->key->user_key(), &v,
merge_context->GetOperands(), s->value,
s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
*(s->status) =
Status::Corruption("Error: Could not perform merge.");
}
} else {
s->value->assign(v.data(), v.size());
}
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->Unlock();
}
*(s->found_final_value) = true;
return false;
}
case kTypeDeletion: {
if (*(s->merge_in_progress)) {
assert(merge_operator);
*(s->status) = Status::OK();
if (!merge_operator->FullMerge(s->key->user_key(), nullptr,
merge_context->GetOperands(), s->value,
s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
*(s->status) =
Status::Corruption("Error: Could not perform merge.");
}
} else {
*(s->status) = Status::NotFound();
}
*(s->found_final_value) = true;
return false;
}
case kTypeMerge: {
std::string merge_result; // temporary area for merge results later
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*(s->merge_in_progress) = true;
merge_context->PushOperand(v);
while (merge_context->GetNumOperands() >= 2) {
// Attempt to associative merge. (Returns true if successful)
if (merge_operator->PartialMerge(
s->key->user_key(), merge_context->GetOperand(0),
merge_context->GetOperand(1), &merge_result, s->logger)) {
merge_context->PushPartialMergeResult(merge_result);
} else {
// Stack them because user can't associative merge
break;
}
}
return true;
}
default:
assert(false);
return true;
}
}
// s->state could be Corrupt, merge or notfound
return false;
}
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
MergeContext& merge_context, const Options& options) {
StopWatchNano memtable_get_timer(options.env, false);
StartPerfTimer(&memtable_get_timer);
Slice mem_key = key.memtable_key();
Slice user_key = key.user_key();
bool found_final_value = false;
bool merge_in_progress = s->IsMergeInProgress();
std::unique_ptr<MemTableRep::Iterator> iter;
if (prefix_bloom_ &&
!prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key))) {
// iter is null if prefix bloom says the key does not exist
} else {
iter.reset(table_->GetIterator(user_key));
iter->Seek(key.internal_key(), mem_key.data());
}
bool merge_in_progress = s->IsMergeInProgress();
auto merge_operator = options.merge_operator.get();
auto logger = options.info_log;
std::string merge_result;
bool found_final_value = false;
for (; !found_final_value && iter && iter->Valid(); iter->Next()) {
// entry format is:
// klength varint32
// userkey char[klength-8]
// tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
const char* entry = iter->key();
uint32_t key_length = 0;
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
if (options.inplace_update_support) {
GetLock(key.user_key())->ReadLock();
}
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*s = Status::OK();
if (merge_in_progress) {
assert(merge_operator);
if (!merge_operator->FullMerge(key.user_key(), &v,
merge_context.GetOperands(), value,
logger.get())) {
RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES);
*s = Status::Corruption("Error: Could not perform merge.");
}
} else {
value->assign(v.data(), v.size());
}
if (options.inplace_update_support) {
GetLock(key.user_key())->Unlock();
}
found_final_value = true;
break;
}
case kTypeDeletion: {
if (merge_in_progress) {
assert(merge_operator);
*s = Status::OK();
if (!merge_operator->FullMerge(key.user_key(), nullptr,
merge_context.GetOperands(), value,
logger.get())) {
RecordTick(options.statistics.get(), NUMBER_MERGE_FAILURES);
*s = Status::Corruption("Error: Could not perform merge.");
}
} else {
*s = Status::NotFound();
}
found_final_value = true;
break;
}
case kTypeMerge: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
merge_in_progress = true;
merge_context.PushOperand(v);
while(merge_context.GetNumOperands() >= 2) {
// Attempt to associative merge. (Returns true if successful)
if (merge_operator->PartialMerge(key.user_key(),
merge_context.GetOperand(0),
merge_context.GetOperand(1),
&merge_result, logger.get())) {
merge_context.PushPartialMergeResult(merge_result);
} else {
// Stack them because user can't associative merge
break;
}
}
break;
}
default:
assert(false);
break;
}
} else {
// exit loop if user key does not match
break;
}
Saver saver;
saver.status = s;
saver.found_final_value = &found_final_value;
saver.merge_in_progress = &merge_in_progress;
saver.key = &key;
saver.value = value;
saver.status = s;
saver.mem = this;
saver.merge_context = &merge_context;
saver.merge_operator = options.merge_operator.get();
saver.logger = options.info_log.get();
saver.inplace_update_support = options.inplace_update_support;
saver.statistics = options.statistics.get();
table_->Get(key, &saver, SaveValue);
}
// No change to value, since we have not yet found a Put/Delete
if (!found_final_value && merge_in_progress) {
*s = Status::MergeInProgress("");
}
......@@ -489,4 +520,13 @@ size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
return num_successive_merges;
}
void MemTableRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
auto iter = GetIterator(k.user_key());
for (iter->Seek(k.internal_key(), k.memtable_key().data());
iter->Valid() && callback_func(callback_args, iter->key());
iter->Next()) {
}
}
} // namespace rocksdb
......@@ -154,6 +154,13 @@ class MemTable {
// Notify the underlying storage that no more items will be added
void MarkImmutable() { table_->MarkReadOnly(); }
// Get the lock associated for the key
port::RWMutex* GetLock(const Slice& key);
const InternalKeyComparator& GetInternalKeyComparator() const {
return comparator_.comparator;
}
private:
friend class MemTableIterator;
friend class MemTableBackwardIterator;
......@@ -190,9 +197,6 @@ class MemTable {
MemTable(const MemTable&);
void operator=(const MemTable&);
// Get the lock associated for the key
port::RWMutex* GetLock(const Slice& key);
const SliceTransform* const prefix_extractor_;
std::unique_ptr<DynamicBloom> prefix_bloom_;
};
......
......@@ -120,31 +120,34 @@ void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) {
flush_requested_ = false; // start-flush request is complete
}
void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
uint64_t file_number,
std::set<uint64_t>* pending_outputs) {
assert(!mems.empty());
// If the flush was not successful, then just reset state.
// Maybe a suceeding attempt to flush will be successful.
for (MemTable* m : mems) {
assert(m->flush_in_progress_);
assert(m->file_number_ == 0);
m->flush_in_progress_ = false;
m->flush_completed_ = false;
m->edit_.Clear();
num_flush_not_started_++;
}
pending_outputs->erase(file_number);
imm_flush_needed.Release_Store(reinterpret_cast<void *>(1));
}
// Record a successful flush in the manifest file
Status MemTableList::InstallMemtableFlushResults(
ColumnFamilyData* cfd, const autovector<MemTable*>& mems, VersionSet* vset,
Status flushStatus, port::Mutex* mu, Logger* info_log, uint64_t file_number,
port::Mutex* mu, Logger* info_log, uint64_t file_number,
std::set<uint64_t>& pending_outputs, autovector<MemTable*>* to_delete,
Directory* db_directory) {
mu->AssertHeld();
// If the flush was not successful, then just reset state.
// Maybe a suceeding attempt to flush will be successful.
if (!flushStatus.ok()) {
for (MemTable* m : mems) {
assert(m->flush_in_progress_);
assert(m->file_number_ == 0);
m->flush_in_progress_ = false;
m->flush_completed_ = false;
m->edit_.Clear();
num_flush_not_started_++;
imm_flush_needed.Release_Store((void *)1);
pending_outputs.erase(file_number);
}
return flushStatus;
}
// flush was sucessful
for (size_t i = 0; i < mems.size(); ++i) {
// All the edits are associated with the first memtable of this batch.
......@@ -216,7 +219,6 @@ Status MemTableList::InstallMemtableFlushResults(
pending_outputs.erase(m->file_number_);
m->file_number_ = 0;
imm_flush_needed.Release_Store((void *)1);
s = Status::IOError("Unable to commit flushed memtable");
}
++mem_id;
} while (!current_->memlist_.empty() && (m = current_->memlist_.back()) &&
......
......@@ -83,8 +83,8 @@ class MemTableList {
MemTableListVersion* current() { return current_; }
// so that backgrund threads can detect non-nullptr pointer to
// determine whether this is anything more to start flushing.
// so that background threads can detect non-nullptr pointer to
// determine whether there is anything more to start flushing.
port::AtomicPointer imm_flush_needed;
// Returns the total number of memtables in the list
......@@ -98,12 +98,20 @@ class MemTableList {
// memtables are guaranteed to be in the ascending order of created time.
void PickMemtablesToFlush(autovector<MemTable*>* mems);
// Reset status of the given memtable list back to pending state so that
// they can get picked up again on the next round of flush.
void RollbackMemtableFlush(const autovector<MemTable*>& mems,
uint64_t file_number,
std::set<uint64_t>* pending_outputs);
// Commit a successful flush in the manifest file
Status InstallMemtableFlushResults(
ColumnFamilyData* cfd, const autovector<MemTable*>& m, VersionSet* vset,
Status flushStatus, port::Mutex* mu, Logger* info_log,
uint64_t file_number, std::set<uint64_t>& pending_outputs,
autovector<MemTable*>* to_delete, Directory* db_directory);
Status InstallMemtableFlushResults(ColumnFamilyData* cfd,
const autovector<MemTable*>& m,
VersionSet* vset, port::Mutex* mu,
Logger* info_log, uint64_t file_number,
std::set<uint64_t>& pending_outputs,
autovector<MemTable*>* to_delete,
Directory* db_directory);
// New memtables are inserted at the front of the list.
// Takes ownership of the referenced held on *m by the caller of Add().
......
......@@ -127,7 +127,7 @@ class Repairer {
return status;
}
if (filenames.empty()) {
return Status::IOError(dbname_, "repair found no files");
return Status::Corruption(dbname_, "repair found no files");
}
uint64_t number;
......
......@@ -96,7 +96,7 @@ public:
void SetupForCompaction() override;
TableProperties& GetTableProperties() override;
std::shared_ptr<const TableProperties> GetTableProperties() const override;
~SimpleTableReader();
......@@ -172,7 +172,7 @@ struct SimpleTableReader::Rep {
unique_ptr<RandomAccessFile> file;
uint64_t index_start_offset;
int num_entries;
TableProperties table_properties;
std::shared_ptr<TableProperties> table_properties;
const static int user_key_size = 16;
const static int offset_length = 8;
......@@ -215,7 +215,8 @@ Status SimpleTableReader::Open(const Options& options,
void SimpleTableReader::SetupForCompaction() {
}
TableProperties& SimpleTableReader::GetTableProperties() {
std::shared_ptr<const TableProperties> SimpleTableReader::GetTableProperties()
const {
return rep_->table_properties;
}
......
......@@ -33,6 +33,7 @@
#pragma once
#include <assert.h>
#include <stdlib.h>
#include "util/arena.h"
#include "port/port.h"
#include "util/arena.h"
#include "util/random.h"
......
......@@ -116,7 +116,7 @@ class RegularKeysStartWithA: public TablePropertiesCollector {
}
virtual UserCollectedProperties GetReadableProperties() const {
return {};
return UserCollectedProperties{};
}
......@@ -157,7 +157,7 @@ void TestCustomizedTablePropertiesCollector(
// -- Step 2: Read properties
FakeRandomeAccessFile readable(writable->contents());
TableProperties props;
TableProperties* props;
Status s = ReadTableProperties(
&readable,
writable->contents().size(),
......@@ -166,9 +166,10 @@ void TestCustomizedTablePropertiesCollector(
nullptr,
&props
);
std::unique_ptr<TableProperties> props_guard(props);
ASSERT_OK(s);
auto user_collected = props.user_collected_properties;
auto user_collected = props->user_collected_properties;
ASSERT_EQ("Rocksdb", user_collected.at("TablePropertiesTest"));
......@@ -256,7 +257,7 @@ void TestInternalKeyPropertiesCollector(
ASSERT_OK(builder->Finish());
FakeRandomeAccessFile readable(writable->contents());
TableProperties props;
TableProperties* props;
Status s = ReadTableProperties(
&readable,
writable->contents().size(),
......@@ -267,7 +268,8 @@ void TestInternalKeyPropertiesCollector(
);
ASSERT_OK(s);
auto user_collected = props.user_collected_properties;
std::unique_ptr<TableProperties> props_guard(props);
auto user_collected = props->user_collected_properties;
uint64_t deleted = GetDeletedKeys(user_collected);
ASSERT_EQ(4u, deleted);
......
......@@ -46,9 +46,6 @@ Status TransactionLogIteratorImpl::OpenLogFile(
// Try the archive dir, as it could have moved in the meanwhile.
fname = ArchivedLogFileName(dir_, logFile->LogNumber());
status = env->NewSequentialFile(fname, file, soptions_);
if (!status.ok()) {
return Status::IOError("Requested file not present in the dir");
}
}
return status;
}
......@@ -187,7 +184,7 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) {
if (currentLastSeq_ == dbimpl_->GetLatestSequenceNumber()) {
currentStatus_ = Status::OK();
} else {
currentStatus_ = Status::IOError("NO MORE DATA LEFT");
currentStatus_ = Status::Corruption("NO MORE DATA LEFT");
}
return;
}
......
......@@ -1855,8 +1855,18 @@ Status VersionSet::Recover(
if (s.ok()) {
for (auto cfd : *column_family_set_) {
auto builders_iter = builders.find(cfd->GetID());
assert(builders_iter != builders.end());
auto builder = builders_iter->second;
if (options_->max_open_files == -1) {
// unlimited table cache. Pre-load table handle now.
// Need to do it out of the mutex.
builder->LoadTableHandlers();
}
Version* v = new Version(cfd, this, current_version_number_++);
builders[cfd->GetID()]->SaveTo(v);
builder->SaveTo(v);
// Install recovered version
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
......
......@@ -46,6 +46,7 @@ class VersionSet;
class MergeContext;
struct ColumnFamilyData;
class ColumnFamilySet;
class LookupKey;
// Return the smallest index i such that files[i]->largest >= key.
// Return files.size() if there is no such file.
......
......@@ -238,8 +238,10 @@ extern void rocksdb_options_set_memtable_vector_rep(rocksdb_options_t*);
enum {
rocksdb_no_compression = 0,
rocksdb_snappy_compression = 1,
rocksdb_zlib_compression = 1,
rocksdb_bz2_compression = 1
rocksdb_zlib_compression = 2,
rocksdb_bz2_compression = 3,
rocksdb_lz4_compression = 4,
rocksdb_lz4hc_compression = 5
};
extern void rocksdb_options_set_compression(rocksdb_options_t*, int);
......
......@@ -25,6 +25,9 @@ class CompactionFilter {
struct Context {
// Does this compaction run include all data files
bool is_full_compaction;
// Is this compaction requested by the client (true),
// or is it occurring as an automatic compaction process
bool is_manual_compaction;
};
virtual ~CompactionFilter() {}
......
......@@ -52,6 +52,7 @@ struct ReadOptions;
struct WriteOptions;
struct FlushOptions;
class WriteBatch;
class Env;
// Metadata associated with each SST file.
struct LiveFileMetaData {
......@@ -379,7 +380,7 @@ class DB {
// GetLiveFiles followed by GetSortedWalFiles can generate a lossless backup
// THIS METHOD IS DEPRECATED. Use the GetTableMetaData to get more
// THIS METHOD IS DEPRECATED. Use the GetLiveFilesMetaData to get more
// detailed information on the live files.
// Retrieve the list of all files in the database. The files are
// relative to the dbname and are not absolute paths. The valid size of the
......
......@@ -36,10 +36,12 @@
#pragma once
#include <memory>
#include <stdint.h>
namespace rocksdb {
class Arena;
class LookupKey;
class Slice;
class SliceTransform;
......@@ -73,6 +75,20 @@ class MemTableRep {
// nothing.
virtual void MarkReadOnly() { }
// Look up key from the mem table, since the first key in the mem table whose
// user_key matches the one given k, call the function callback_func(), with
// callback_args directly forwarded as the first parameter, and the mem table
// key as the second parameter. If the return value is false, then terminates.
// Otherwise, go through the next key.
// It's safe for Get() to terminate after having finished all the potential
// key for the k.user_key(), or not.
//
// Default:
// Get() function with a default value of dynamically construct an iterator,
// seek and call the call back function.
virtual void Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry));
// Report an approximation of how much memory has been used other than memory
// that was allocated through the arena.
virtual size_t ApproximateMemoryUsage() = 0;
......
......@@ -45,10 +45,8 @@ using std::shared_ptr;
enum CompressionType : char {
// NOTE: do not change the values of existing entries, as these are
// part of the persistent format on disk.
kNoCompression = 0x0,
kSnappyCompression = 0x1,
kZlibCompression = 0x2,
kBZip2Compression = 0x3
kNoCompression = 0x0, kSnappyCompression = 0x1, kZlibCompression = 0x2,
kBZip2Compression = 0x3, kLZ4Compression = 0x4, kLZ4HCCompression = 0x5
};
enum CompactionStyle : char {
......
......@@ -61,6 +61,10 @@ class Status {
static Status Incomplete(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kIncomplete, msg, msg2);
}
static Status ShutdownInProgress(const Slice& msg,
const Slice& msg2 = Slice()) {
return Status(kShutdownInProgress, msg, msg2);
}
// Returns true iff the status indicates success.
bool ok() const { return code() == kOk; }
......@@ -86,6 +90,9 @@ class Status {
// Returns true iff the status indicates Incomplete
bool IsIncomplete() const { return code() == kIncomplete; }
// Returns true iff the status indicates Incomplete
bool IsShutdownInProgress() const { return code() == kShutdownInProgress; }
// Return a string representation of this status suitable for printing.
// Returns the string "OK" for success.
std::string ToString() const;
......@@ -99,7 +106,8 @@ class Status {
kInvalidArgument = 4,
kIOError = 5,
kMergeInProgress = 6,
kIncomplete = 7
kIncomplete = 7,
kShutdownInProgress = 8
};
// A nullptr state_ (which is always the case for OK) means the message
......
......@@ -4,7 +4,7 @@
#pragma once
#include <string>
#include <unordered_map>
#include <map>
#include "rocksdb/status.h"
namespace rocksdb {
......@@ -14,7 +14,16 @@ namespace rocksdb {
// collected properties.
// The value of the user-collected properties are encoded as raw bytes --
// users have to interprete these values by themselves.
typedef std::unordered_map<std::string, std::string> UserCollectedProperties;
// Note: To do prefix seek/scan in `UserCollectedProperties`, you can do
// something similar to:
//
// UserCollectedProperties props = ...;
// for (auto pos = props.lower_bound(prefix);
// pos != props.end() && pos->first.compare(0, prefix.size(), prefix) == 0;
// ++pos) {
// ...
// }
typedef std::map<std::string, std::string> UserCollectedProperties;
// TableProperties contains a bunch of read-only properties of its associated
// table.
......
......@@ -46,6 +46,11 @@
#include <bzlib.h>
#endif
#if defined(LZ4)
#include <lz4.h>
#include <lz4hc.h>
#endif
#include <stdint.h>
#include <string>
#include <string.h>
......@@ -353,8 +358,8 @@ inline bool BZip2_Compress(const CompressionOptions& opts, const char* input,
return false;
}
inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
int* decompress_size) {
inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
int* decompress_size) {
#ifdef BZIP2
bz_stream _stream;
memset(&_stream, 0, sizeof(bz_stream));
......@@ -409,7 +414,64 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
return nullptr;
}
inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) {
inline bool LZ4_Compress(const CompressionOptions &opts, const char *input,
size_t length, ::std::string* output) {
#ifdef LZ4
int compressBound = LZ4_compressBound(length);
output->resize(8 + compressBound);
char *p = const_cast<char *>(output->c_str());
memcpy(p, &length, sizeof(length));
size_t outlen;
outlen = LZ4_compress_limitedOutput(input, p + 8, length, compressBound);
if (outlen == 0) {
return false;
}
output->resize(8 + outlen);
return true;
#endif
return false;
}
inline char* LZ4_Uncompress(const char* input_data, size_t input_length,
int* decompress_size) {
#ifdef LZ4
if (input_length < 8) {
return nullptr;
}
int output_len;
memcpy(&output_len, input_data, sizeof(output_len));
char *output = new char[output_len];
*decompress_size = LZ4_decompress_safe_partial(
input_data + 8, output, input_length - 8, output_len, output_len);
if (*decompress_size < 0) {
delete[] output;
return nullptr;
}
return output;
#endif
return nullptr;
}
inline bool LZ4HC_Compress(const CompressionOptions &opts, const char* input,
size_t length, ::std::string* output) {
#ifdef LZ4
int compressBound = LZ4_compressBound(length);
output->resize(8 + compressBound);
char *p = const_cast<char *>(output->c_str());
memcpy(p, &length, sizeof(length));
size_t outlen;
outlen = LZ4_compressHC2_limitedOutput(input, p + 8, length, compressBound,
opts.level);
if (outlen == 0) {
return false;
}
output->resize(8 + outlen);
return true;
#endif
return false;
}
inline bool GetHeapProfile(void (*func)(void *, const char *, int), void *arg) {
return false;
}
......
......@@ -233,6 +233,30 @@ void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
type = kNoCompression;
}
break;
case kLZ4Compression:
if (port::LZ4_Compress(r->options.compression_opts, raw.data(),
raw.size(), compressed) &&
GoodCompressionRatio(compressed->size(), raw.size())) {
block_contents = *compressed;
} else {
// LZ4 not supported, or not good compression ratio, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
case kLZ4HCCompression:
if (port::LZ4HC_Compress(r->options.compression_opts, raw.data(),
raw.size(), compressed) &&
GoodCompressionRatio(compressed->size(), raw.size())) {
block_contents = *compressed;
} else {
// LZ4 not supported, or not good compression ratio, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
WriteRawBlock(block_contents, type, handle);
r->compressed_output.clear();
......
......@@ -62,7 +62,7 @@ struct BlockBasedTable::Rep {
unique_ptr<Block> index_block;
unique_ptr<FilterBlockReader> filter;
TableProperties table_properties;
std::shared_ptr<const TableProperties> table_properties;
};
BlockBasedTable::~BlockBasedTable() {
......@@ -255,9 +255,10 @@ Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions,
meta_iter->Seek(kPropertiesBlock);
if (meta_iter->Valid() && meta_iter->key() == kPropertiesBlock) {
s = meta_iter->status();
TableProperties* table_properties = nullptr;
if (s.ok()) {
s = ReadProperties(meta_iter->value(), rep->file.get(), rep->options.env,
rep->options.info_log.get(), &rep->table_properties);
rep->options.info_log.get(), &table_properties);
}
if (!s.ok()) {
......@@ -265,6 +266,8 @@ Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions,
"[Warning] Encountered error while reading data from properties "
"block " + s.ToString();
Log(rep->options.info_log, "%s", err_msg.c_str());
} else {
rep->table_properties.reset(table_properties);
}
}
......@@ -339,7 +342,8 @@ void BlockBasedTable::SetupForCompaction() {
compaction_optimized_ = true;
}
const TableProperties& BlockBasedTable::GetTableProperties() {
std::shared_ptr<const TableProperties> BlockBasedTable::GetTableProperties()
const {
return rep_->table_properties;
}
......
......@@ -86,7 +86,7 @@ class BlockBasedTable : public TableReader {
// posix_fadvise
void SetupForCompaction() override;
const TableProperties& GetTableProperties() override;
std::shared_ptr<const TableProperties> GetTableProperties() const override;
~BlockBasedTable();
......
......@@ -10,6 +10,7 @@
#include "table/format.h"
#include <string>
#include <inttypes.h>
#include "port/port.h"
#include "rocksdb/env.h"
......@@ -64,7 +65,8 @@ Status Footer::DecodeFrom(Slice* input) {
if (magic != table_magic_number()) {
char buffer[80];
snprintf(buffer, sizeof(buffer) - 1,
"not an sstable (bad magic number --- %lx)", magic);
"not an sstable (bad magic number --- %lx)",
(long)magic);
return Status::InvalidArgument(buffer);
}
} else {
......@@ -228,6 +230,28 @@ Status UncompressBlockContents(const char* data, size_t n,
result->heap_allocated = true;
result->cachable = true;
break;
case kLZ4Compression:
ubuf = port::LZ4_Uncompress(data, n, &decompress_size);
static char lz4_corrupt_msg[] =
"LZ4 not supported or corrupted LZ4 compressed block contents";
if (!ubuf) {
return Status::Corruption(lz4_corrupt_msg);
}
result->data = Slice(ubuf, decompress_size);
result->heap_allocated = true;
result->cachable = true;
break;
case kLZ4HCCompression:
ubuf = port::LZ4_Uncompress(data, n, &decompress_size);
static char lz4hc_corrupt_msg[] =
"LZ4HC not supported or corrupted LZ4HC compressed block contents";
if (!ubuf) {
return Status::Corruption(lz4hc_corrupt_msg);
}
result->data = Slice(ubuf, decompress_size);
result->heap_allocated = true;
result->cachable = true;
break;
default:
return Status::Corruption("bad block type");
}
......
......@@ -109,7 +109,7 @@ class Footer {
kEncodedLength = 2 * BlockHandle::kMaxEncodedLength + 8
};
const uint64_t kInvalidTableMagicNumber = 0;
static const uint64_t kInvalidTableMagicNumber = 0;
private:
// Set the table_magic_number only when it was not previously
......
......@@ -133,12 +133,9 @@ bool NotifyCollectTableCollectorsOnFinish(
return all_succeeded;
}
Status ReadProperties(
const Slice& handle_value,
RandomAccessFile* file,
Env* env,
Logger* logger,
TableProperties* table_properties) {
Status ReadProperties(const Slice& handle_value, RandomAccessFile* file,
Env* env, Logger* logger,
TableProperties** table_properties) {
assert(table_properties);
Slice v = handle_value;
......@@ -161,18 +158,22 @@ Status ReadProperties(
std::unique_ptr<Iterator> iter(
properties_block.NewIterator(BytewiseComparator()));
auto new_table_properties = new TableProperties();
// All pre-defined properties of type uint64_t
std::unordered_map<std::string, uint64_t*> predefined_uint64_properties = {
{TablePropertiesNames::kDataSize, &table_properties->data_size},
{TablePropertiesNames::kIndexSize, &table_properties->index_size},
{TablePropertiesNames::kFilterSize, &table_properties->filter_size},
{TablePropertiesNames::kRawKeySize, &table_properties->raw_key_size},
{TablePropertiesNames::kRawValueSize, &table_properties->raw_value_size},
{TablePropertiesNames::kDataSize, &new_table_properties->data_size},
{TablePropertiesNames::kIndexSize, &new_table_properties->index_size},
{TablePropertiesNames::kFilterSize, &new_table_properties->filter_size},
{TablePropertiesNames::kRawKeySize, &new_table_properties->raw_key_size},
{TablePropertiesNames::kRawValueSize,
&new_table_properties->raw_value_size},
{TablePropertiesNames::kNumDataBlocks,
&table_properties->num_data_blocks},
{TablePropertiesNames::kNumEntries, &table_properties->num_entries},
{TablePropertiesNames::kFormatVersion, &table_properties->format_version},
{TablePropertiesNames::kFixedKeyLen, &table_properties->fixed_key_len}};
&new_table_properties->num_data_blocks},
{TablePropertiesNames::kNumEntries, &new_table_properties->num_entries},
{TablePropertiesNames::kFormatVersion,
&new_table_properties->format_version},
{TablePropertiesNames::kFixedKeyLen,
&new_table_properties->fixed_key_len}, };
std::string last_key;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
......@@ -203,24 +204,25 @@ Status ReadProperties(
}
*(pos->second) = val;
} else if (key == TablePropertiesNames::kFilterPolicy) {
table_properties->filter_policy_name = raw_val.ToString();
new_table_properties->filter_policy_name = raw_val.ToString();
} else {
// handle user-collected properties
table_properties->user_collected_properties.insert(
new_table_properties->user_collected_properties.insert(
{key, raw_val.ToString()});
}
}
if (s.ok()) {
*table_properties = new_table_properties;
} else {
delete new_table_properties;
}
return s;
}
Status ReadTableProperties(
RandomAccessFile* file,
uint64_t file_size,
uint64_t table_magic_number,
Env* env,
Logger* info_log,
TableProperties* properties) {
Status ReadTableProperties(RandomAccessFile* file, uint64_t file_size,
uint64_t table_magic_number, Env* env,
Logger* info_log, TableProperties** properties) {
// -- Read metaindex block
Footer footer(table_magic_number);
auto s = ReadFooterFromFile(file, file_size, &footer);
......
......@@ -103,21 +103,20 @@ bool NotifyCollectTableCollectorsOnFinish(
PropertyBlockBuilder* builder);
// Read the properties from the table.
Status ReadProperties(
const Slice& handle_value,
RandomAccessFile* file,
Env* env,
Logger* logger,
TableProperties* table_properties);
// @returns a status to indicate if the operation succeeded. On success,
// *table_properties will point to a heap-allocated TableProperties
// object, otherwise value of `table_properties` will not be modified.
Status ReadProperties(const Slice& handle_value, RandomAccessFile* file,
Env* env, Logger* logger,
TableProperties** table_properties);
// Directly read the properties from the properties block of a plain table.
Status ReadTableProperties(
RandomAccessFile* file,
uint64_t file_size,
uint64_t table_magic_number,
Env* env,
Logger* info_log,
TableProperties* properties);
// @returns a status to indicate if the operation succeeded. On success,
// *table_properties will point to a heap-allocated TableProperties
// object, otherwise value of `table_properties` will not be modified.
Status ReadTableProperties(RandomAccessFile* file, uint64_t file_size,
uint64_t table_magic_number, Env* env,
Logger* info_log, TableProperties** properties);
// Read the magic number of the specified file directly. The magic number
// of a valid sst table the last 8-byte of the file.
......
......@@ -87,15 +87,15 @@ PlainTableReader::PlainTableReader(const EnvOptions& storage_options,
const InternalKeyComparator& icomparator,
uint64_t file_size, int bloom_bits_per_key,
double hash_table_ratio,
const TableProperties& table_properties)
const TableProperties* table_properties)
: soptions_(storage_options),
internal_comparator_(icomparator),
file_size_(file_size),
kHashTableRatio(hash_table_ratio),
kBloomBitsPerKey(bloom_bits_per_key),
table_properties_(table_properties),
data_end_offset_(table_properties_.data_size),
user_key_len_(table_properties.fixed_key_len) {}
data_end_offset_(table_properties_->data_size),
user_key_len_(table_properties->fixed_key_len) {}
PlainTableReader::~PlainTableReader() {
delete[] hash_table_;
......@@ -117,17 +117,16 @@ Status PlainTableReader::Open(const Options& options,
return Status::NotSupported("File is too large for PlainTableReader!");
}
TableProperties table_properties;
TableProperties* props = nullptr;
auto s = ReadTableProperties(file.get(), file_size, kPlainTableMagicNumber,
options.env, options.info_log.get(),
&table_properties);
options.env, options.info_log.get(), &props);
if (!s.ok()) {
return s;
}
std::unique_ptr<PlainTableReader> new_reader(new PlainTableReader(
soptions, internal_comparator, file_size, bloom_bits_per_key,
hash_table_ratio, table_properties));
std::unique_ptr<PlainTableReader> new_reader(
new PlainTableReader(soptions, internal_comparator, file_size,
bloom_bits_per_key, hash_table_ratio, props));
new_reader->file_ = std::move(file);
new_reader->options_ = options;
......
......@@ -64,13 +64,15 @@ class PlainTableReader: public TableReader {
void SetupForCompaction();
const TableProperties& GetTableProperties() { return table_properties_; }
std::shared_ptr<const TableProperties> GetTableProperties() const {
return table_properties_;
}
PlainTableReader(const EnvOptions& storage_options,
const InternalKeyComparator& internal_comparator,
uint64_t file_size, int bloom_num_bits,
double hash_table_ratio,
const TableProperties& table_properties);
const TableProperties* table_properties);
~PlainTableReader();
private:
......@@ -95,7 +97,7 @@ class PlainTableReader: public TableReader {
const int kBloomBitsPerKey;
DynamicBloom* bloom_ = nullptr;
TableProperties table_properties_;
std::shared_ptr<const TableProperties> table_properties_;
const uint32_t data_start_offset_ = 0;
const uint32_t data_end_offset_;
const size_t user_key_len_;
......
......@@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <memory>
namespace rocksdb {
......@@ -47,7 +48,7 @@ class TableReader {
// posix_fadvise
virtual void SetupForCompaction() = 0;
virtual const TableProperties& GetTableProperties() = 0;
virtual std::shared_ptr<const TableProperties> GetTableProperties() const = 0;
// Calls (*result_handler)(handle_context, ...) repeatedly, starting with
// the entry found after a call to Seek(key), until result_handler returns
......
......@@ -488,30 +488,62 @@ class DBConstructor: public Constructor {
};
static bool SnappyCompressionSupported() {
#ifdef SNAPPY
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Snappy_Compress(Options().compression_opts,
in.data(), in.size(),
&out);
#else
return false;
#endif
}
static bool ZlibCompressionSupported() {
#ifdef ZLIB
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::Zlib_Compress(Options().compression_opts,
in.data(), in.size(),
&out);
#else
return false;
#endif
}
#ifdef BZIP2
static bool BZip2CompressionSupported() {
#ifdef BZIP2
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::BZip2_Compress(Options().compression_opts,
in.data(), in.size(),
&out);
#else
return false;
#endif
}
static bool LZ4CompressionSupported() {
#ifdef LZ4
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::LZ4_Compress(Options().compression_opts, in.data(), in.size(),
&out);
#else
return false;
#endif
}
static bool LZ4HCCompressionSupported() {
#ifdef LZ4
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
return port::LZ4HC_Compress(Options().compression_opts, in.data(), in.size(),
&out);
#else
return false;
#endif
}
enum TestType {
BLOCK_BASED_TABLE_TEST,
......@@ -539,24 +571,23 @@ static std::vector<TestArgs> GenerateArgList() {
std::vector<int> restart_intervals = {16, 1, 1024};
// Only add compression if it is supported
std::vector<CompressionType> compression_types = {kNoCompression};
#ifdef SNAPPY
std::vector<CompressionType> compression_types;
compression_types.push_back(kNoCompression);
if (SnappyCompressionSupported()) {
compression_types.push_back(kSnappyCompression);
}
#endif
#ifdef ZLIB
if (ZlibCompressionSupported()) {
compression_types.push_back(kZlibCompression);
}
#endif
#ifdef BZIP2
if (BZip2CompressionSupported()) {
compression_types.push_back(kBZip2Compression);
}
#endif
if (LZ4CompressionSupported()) {
compression_types.push_back(kLZ4Compression);
}
if (LZ4HCCompressionSupported()) {
compression_types.push_back(kLZ4HCCompression);
}
for (auto test_type : test_types) {
for (auto reverse_compare : reverse_compare_types) {
......@@ -908,6 +939,44 @@ class TableTest {
class GeneralTableTest : public TableTest {};
class BlockBasedTableTest : public TableTest {};
class PlainTableTest : public TableTest {};
class TablePropertyTest {};
// This test serves as the living tutorial for the prefix scan of user collected
// properties.
TEST(TablePropertyTest, PrefixScanTest) {
UserCollectedProperties props{{"num.111.1", "1"},
{"num.111.2", "2"},
{"num.111.3", "3"},
{"num.333.1", "1"},
{"num.333.2", "2"},
{"num.333.3", "3"},
{"num.555.1", "1"},
{"num.555.2", "2"},
{"num.555.3", "3"}, };
// prefixes that exist
for (const std::string& prefix : {"num.111", "num.333", "num.555"}) {
int num = 0;
for (auto pos = props.lower_bound(prefix);
pos != props.end() &&
pos->first.compare(0, prefix.size(), prefix) == 0;
++pos) {
++num;
auto key = prefix + "." + std::to_string(num);
ASSERT_EQ(key, pos->first);
ASSERT_EQ(std::to_string(num), pos->second);
}
ASSERT_EQ(3, num);
}
// prefixes that don't exist
for (const std::string& prefix :
{"num.000", "num.222", "num.444", "num.666"}) {
auto pos = props.lower_bound(prefix);
ASSERT_TRUE(pos == props.end() ||
pos->first.compare(0, prefix.size(), prefix) != 0);
}
}
// This test include all the basic checks except those for index size and block
// size, which will be conducted in separated unit tests.
......@@ -933,7 +1002,7 @@ TEST(BlockBasedTableTest, BasicBlockBasedTableProperties) {
c.Finish(options, GetPlainInternalComparator(options.comparator), &keys,
&kvmap);
auto& props = c.table_reader()->GetTableProperties();
auto& props = *c.table_reader()->GetTableProperties();
ASSERT_EQ(kvmap.size(), props.num_entries);
auto raw_key_size = kvmap.size() * 2ul;
......@@ -964,7 +1033,7 @@ TEST(BlockBasedTableTest, FilterPolicyNameProperties) {
c.Finish(options, GetPlainInternalComparator(options.comparator), &keys,
&kvmap);
auto& props = c.table_reader()->GetTableProperties();
auto& props = *c.table_reader()->GetTableProperties();
ASSERT_EQ("rocksdb.BuiltinBloomFilter", props.filter_policy_name);
}
......@@ -1006,8 +1075,7 @@ TEST(BlockBasedTableTest, IndexSizeStat) {
c.Finish(options, GetPlainInternalComparator(options.comparator), &ks,
&kvmap);
auto index_size =
c.table_reader()->GetTableProperties().index_size;
auto index_size = c.table_reader()->GetTableProperties()->index_size;
ASSERT_GT(index_size, last_index_size);
last_index_size = index_size;
}
......@@ -1032,7 +1100,7 @@ TEST(BlockBasedTableTest, NumBlockStat) {
c.Finish(options, GetPlainInternalComparator(options.comparator), &ks,
&kvmap);
ASSERT_EQ(kvmap.size(),
c.table_reader()->GetTableProperties().num_data_blocks);
c.table_reader()->GetTableProperties()->num_data_blocks);
}
class BlockCacheProperties {
......@@ -1238,18 +1306,19 @@ TEST(PlainTableTest, BasicPlainTableProperties) {
StringSource source(sink.contents(), 72242, true);
TableProperties props;
TableProperties* props = nullptr;
auto s = ReadTableProperties(&source, sink.contents().size(),
kPlainTableMagicNumber, Env::Default(), nullptr,
&props);
std::unique_ptr<TableProperties> props_guard(props);
ASSERT_OK(s);
ASSERT_EQ(0ul, props.index_size);
ASSERT_EQ(0ul, props.filter_size);
ASSERT_EQ(16ul * 26, props.raw_key_size);
ASSERT_EQ(28ul * 26, props.raw_value_size);
ASSERT_EQ(26ul, props.num_entries);
ASSERT_EQ(1ul, props.num_data_blocks);
ASSERT_EQ(0ul, props->index_size);
ASSERT_EQ(0ul, props->filter_size);
ASSERT_EQ(16ul * 26, props->raw_key_size);
ASSERT_EQ(28ul * 26, props->raw_value_size);
ASSERT_EQ(26ul, props->num_entries);
ASSERT_EQ(1ul, props->num_data_blocks);
}
TEST(GeneralTableTest, ApproximateOffsetOfPlain) {
......@@ -1307,24 +1376,42 @@ static void DoCompressionTest(CompressionType comp) {
}
TEST(GeneralTableTest, ApproximateOffsetOfCompressed) {
CompressionType compression_state[2];
int valid = 0;
std::vector<CompressionType> compression_state;
if (!SnappyCompressionSupported()) {
fprintf(stderr, "skipping snappy compression tests\n");
} else {
compression_state[valid] = kSnappyCompression;
valid++;
compression_state.push_back(kSnappyCompression);
}
if (!ZlibCompressionSupported()) {
fprintf(stderr, "skipping zlib compression tests\n");
} else {
compression_state[valid] = kZlibCompression;
valid++;
compression_state.push_back(kZlibCompression);
}
// TODO(kailiu) DoCompressionTest() doesn't work with BZip2.
/*
if (!BZip2CompressionSupported()) {
fprintf(stderr, "skipping bzip2 compression tests\n");
} else {
compression_state.push_back(kBZip2Compression);
}
*/
if (!LZ4CompressionSupported()) {
fprintf(stderr, "skipping lz4 compression tests\n");
} else {
compression_state.push_back(kLZ4Compression);
}
if (!LZ4HCCompressionSupported()) {
fprintf(stderr, "skipping lz4hc compression tests\n");
} else {
compression_state.push_back(kLZ4HCCompression);
}
for (int i = 0; i < valid; i++) {
DoCompressionTest(compression_state[i]);
for (auto state : compression_state) {
DoCompressionTest(state);
}
}
......
......@@ -273,6 +273,10 @@ enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
return rocksdb::kZlibCompression;
else if (!strcasecmp(ctype, "bzip2"))
return rocksdb::kBZip2Compression;
else if (!strcasecmp(ctype, "lz4"))
return rocksdb::kLZ4Compression;
else if (!strcasecmp(ctype, "lz4hc"))
return rocksdb::kLZ4HCCompression;
fprintf(stdout, "Cannot parse compression type '%s'\n", ctype);
return rocksdb::kSnappyCompression; //default value
......@@ -1328,7 +1332,12 @@ class StressTest {
case rocksdb::kBZip2Compression:
compression = "bzip2";
break;
}
case rocksdb::kLZ4Compression:
compression = "lz4";
case rocksdb::kLZ4HCCompression:
compression = "lz4hc";
break;
}
fprintf(stdout, "Compression : %s\n", compression);
......
......@@ -7,6 +7,7 @@
#include <map>
#include <string>
#include <vector>
#include <inttypes.h>
#include "db/dbformat.h"
#include "db/memtable.h"
......@@ -43,7 +44,8 @@ class SstFileReader {
bool has_to,
const std::string& to_key);
Status ReadTableProperties(TableProperties* table_properties);
Status ReadTableProperties(
std::shared_ptr<const TableProperties>* table_properties);
uint64_t GetReadNumber() { return read_num_; }
private:
......@@ -112,10 +114,11 @@ Status SstFileReader::NewTableReader(const std::string& file_path) {
Status SstFileReader::SetTableOptionsByMagicNumber(uint64_t table_magic_number,
RandomAccessFile* file,
uint64_t file_size) {
TableProperties table_properties;
TableProperties* table_properties;
Status s = rocksdb::ReadTableProperties(file, file_size, table_magic_number,
options_.env, options_.info_log.get(),
&table_properties);
std::unique_ptr<TableProperties> props_guard(table_properties);
if (!s.ok()) {
return s;
}
......@@ -126,13 +129,14 @@ Status SstFileReader::SetTableOptionsByMagicNumber(uint64_t table_magic_number,
} else if (table_magic_number == kPlainTableMagicNumber) {
options_.allow_mmap_reads = true;
options_.table_factory = std::make_shared<PlainTableFactory>(
table_properties.fixed_key_len, 2, 0.8);
table_properties->fixed_key_len, 2, 0.8);
options_.prefix_extractor = NewNoopTransform();
fprintf(stdout, "Sst file format: plain table\n");
} else {
char error_msg_buffer[80];
snprintf(error_msg_buffer, sizeof(error_msg_buffer) - 1,
"Unsupported table magic number --- %lx)", table_magic_number);
"Unsupported table magic number --- %lx",
(long)table_magic_number);
return Status::InvalidArgument(error_msg_buffer);
}
......@@ -192,7 +196,8 @@ Status SstFileReader::ReadSequential(bool print_kv,
return ret;
}
Status SstFileReader::ReadTableProperties(TableProperties* table_properties) {
Status SstFileReader::ReadTableProperties(
std::shared_ptr<const TableProperties>* table_properties) {
if (!table_reader_) {
return init_result_;
}
......@@ -335,18 +340,19 @@ int main(int argc, char** argv) {
}
}
if (show_properties) {
rocksdb::TableProperties table_properties;
std::shared_ptr<const rocksdb::TableProperties> table_properties;
st = reader.ReadTableProperties(&table_properties);
if (!st.ok()) {
fprintf(stderr, "%s: %s\n", filename.c_str(), st.ToString().c_str());
} else {
fprintf(stdout,
"Table Properties:\n"
"------------------------------\n"
" %s", table_properties.ToString("\n ", ": ").c_str());
"Table Properties:\n"
"------------------------------\n"
" %s",
table_properties->ToString("\n ", ": ").c_str());
fprintf(stdout, "# deleted keys: %zd\n",
rocksdb::GetDeletedKeys(
table_properties.user_collected_properties));
table_properties->user_collected_properties));
}
}
}
......
......@@ -161,7 +161,7 @@ Status BlobStore::Put(const Slice& value, Blob* blob) {
if (size_left > 0) {
Delete(*blob);
return Status::IOError("Tried to write more data than fits in the blob");
return Status::Corruption("Tried to write more data than fits in the blob");
}
return Status::OK();
......@@ -187,9 +187,13 @@ Status BlobStore::Get(const Blob& blob,
chunk.size * block_size_,
&result,
&value->at(offset));
if (!s.ok() || result.size() < chunk.size * block_size_) {
if (!s.ok()) {
value->clear();
return s;
}
if (result.size() < chunk.size * block_size_) {
value->clear();
return Status::IOError("Could not read in from file");
return Status::Corruption("Could not read in from file");
}
offset += chunk.size * block_size_;
}
......@@ -236,7 +240,7 @@ Status BlobStore::CreateNewBucket() {
MutexLock l(&buckets_mutex_);
if (buckets_size_ >= max_buckets_) {
return Status::IOError("Max size exceeded\n");
return Status::NotSupported("Max size exceeded\n");
}
int new_bucket_id = buckets_size_;
......
......@@ -64,6 +64,10 @@ class HashLinkListRep : public MemTableRep {
virtual size_t ApproximateMemoryUsage() override;
virtual void Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg,
const char* entry)) override;
virtual ~HashLinkListRep();
virtual MemTableRep::Iterator* GetIterator() override;
......@@ -398,6 +402,19 @@ size_t HashLinkListRep::ApproximateMemoryUsage() {
return 0;
}
void HashLinkListRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
auto transformed = transform_->Transform(k.user_key());
auto bucket = GetBucket(transformed);
if (bucket != nullptr) {
Iterator iter(this, bucket);
for (iter.Seek(k.internal_key(), nullptr);
iter.Valid() && callback_func(callback_args, iter.key());
iter.Next()) {
}
}
}
MemTableRep::Iterator* HashLinkListRep::GetIterator() {
auto list = new FullList(compare_, arena_);
for (size_t i = 0; i < bucket_size_; ++i) {
......
......@@ -31,6 +31,10 @@ class HashSkipListRep : public MemTableRep {
virtual size_t ApproximateMemoryUsage() override;
virtual void Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg,
const char* entry)) override;
virtual ~HashSkipListRep();
virtual MemTableRep::Iterator* GetIterator() override;
......@@ -271,6 +275,19 @@ size_t HashSkipListRep::ApproximateMemoryUsage() {
return sizeof(buckets_);
}
void HashSkipListRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
auto transformed = transform_->Transform(k.user_key());
auto bucket = GetBucket(transformed);
if (bucket != nullptr) {
Bucket::Iterator iter(bucket);
for (iter.Seek(k.memtable_key().data());
iter.Valid() && callback_func(callback_args, iter.key());
iter.Next()) {
}
}
}
MemTableRep::Iterator* HashSkipListRep::GetIterator() {
auto list = new Bucket(compare_, arena_);
for (size_t i = 0; i < bucket_size_; ++i) {
......
......@@ -245,6 +245,10 @@ Options LDBCommand::PrepareOptionsForOpenDB() {
opt.compression = kZlibCompression;
} else if (comp == "bzip2") {
opt.compression = kBZip2Compression;
} else if (comp == "lz4") {
opt.compression = kLZ4Compression;
} else if (comp == "lz4hc") {
opt.compression = kLZ4HCCompression;
} else {
// Unknown compression.
exec_state_ = LDBCommandExecuteResult::FAILED(
......
......@@ -32,6 +32,17 @@ public:
return 0;
}
virtual void Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg,
const char* entry)) override {
SkipListRep::Iterator iter(&skip_list_);
Slice dummy_slice;
for (iter.Seek(dummy_slice, k.memtable_key().data());
iter.Valid() && callback_func(callback_args, iter.key());
iter.Next()) {
}
}
virtual ~SkipListRep() override { }
// Iteration over the contents of a skip list
......
......@@ -60,7 +60,13 @@ std::string Status::ToString() const {
type = "IO error: ";
break;
case kMergeInProgress:
type = "Merge In Progress: ";
type = "Merge in progress: ";
break;
case kIncomplete:
type = "Result incomplete: ";
break;
case kShutdownInProgress:
type = "Shutdown in progress: ";
break;
default:
snprintf(tmp, sizeof(tmp), "Unknown code(%d): ",
......
......@@ -39,6 +39,10 @@ class VectorRep : public MemTableRep {
virtual size_t ApproximateMemoryUsage() override;
virtual void Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg,
const char* entry)) override;
virtual ~VectorRep() override { }
class Iterator : public MemTableRep::Iterator {
......@@ -233,6 +237,25 @@ void VectorRep::Iterator::SeekToLast() {
}
}
void VectorRep::Get(const LookupKey& k, void* callback_args,
bool (*callback_func)(void* arg, const char* entry)) {
rwlock_.ReadLock();
VectorRep* vector_rep;
std::shared_ptr<Bucket> bucket;
if (immutable_) {
vector_rep = this;
} else {
vector_rep = nullptr;
bucket.reset(new Bucket(*bucket_)); // make a copy
}
VectorRep::Iterator iter(vector_rep, immutable_ ? bucket_ : bucket, compare_);
rwlock_.Unlock();
for (iter.Seek(k.user_key(), k.memtable_key().data());
iter.Valid() && callback_func(callback_args, iter.key()); iter.Next()) {
}
}
MemTableRep::Iterator* VectorRep::GetIterator() {
ReadLock l(&rwlock_);
// Do not sort here. The sorting would be done the first time
......
......@@ -857,7 +857,6 @@ void BackupEngineImpl::BackupMeta::Delete() {
// <file1> <crc32(literal string)> <crc32_value>
// <file2> <crc32(literal string)> <crc32_value>
// ...
// TODO: maybe add checksum?
Status BackupEngineImpl::BackupMeta::LoadFromFile(
const std::string& backup_dir) {
assert(Empty());
......@@ -873,7 +872,7 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
s = backup_meta_file->Read(max_backup_meta_file_size_, &data, buf.get());
if (!s.ok() || data.size() == max_backup_meta_file_size_) {
return s.ok() ? Status::IOError("File size too big") : s;
return s.ok() ? Status::Corruption("File size too big") : s;
}
buf[data.size()] = 0;
......
......@@ -172,7 +172,7 @@ class TestEnv : public EnvWrapper {
const EnvOptions& options) {
written_files_.push_back(f);
if (limit_written_files_ <= 0) {
return Status::IOError("Sorry, can't do this");
return Status::NotSupported("Sorry, can't do this");
}
limit_written_files_--;
return EnvWrapper::NewWritableFile(f, r, options);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册