提交 9674c11d 编写于 作者: R Radheshyam Balasundaram

Integrating Cuckoo Hash SST Table format into RocksDB

Summary:
Contains the following changes:
- Implementation of cuckoo_table_factory
- Adding cuckoo table into AdaptiveTableFactory
- Adding cuckoo_table_db_test, similar to lines of plain_table_db_test
- Minor fixes to Reader: When a key is found in the table, return the key found instead of the search key.
- Minor fixes to Builder: Add table properties that are required by Version::UpdateTemporaryStats() during Get operation. Don't define curr_node as a reference variable as the memory locations may get reassigned during tree.push_back operation, leading to invalid memory access.

Test Plan:
cuckoo_table_reader_test --enable_perf
cuckoo_table_builder_test
cuckoo_table_db_test
make check all
make valgrind_check
make asan_check

Reviewers: sdong, igor, yhchiang, ljin

Reviewed By: ljin

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D21219
上级 37c6740c
......@@ -117,9 +117,10 @@ TESTS = \
thread_local_test \
geodb_test \
rate_limiter_test \
cuckoo_table_builder_test \
options_test \
cuckoo_table_reader_test
cuckoo_table_builder_test \
cuckoo_table_reader_test \
cuckoo_table_db_test
TOOLS = \
sst_dump \
......@@ -430,6 +431,9 @@ cuckoo_table_builder_test: table/cuckoo_table_builder_test.o $(LIBOBJECTS) $(TES
cuckoo_table_reader_test: table/cuckoo_table_reader_test.o $(LIBOBJECTS) $(TESTHARNESS) $(BENCHHARNESS)
$(CXX) table/cuckoo_table_reader_test.o $(LIBOBJECTS) $(TESTHARNESS) $(BENCHHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
cuckoo_table_db_test: db/cuckoo_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/cuckoo_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
options_test: util/options_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) util/options_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
......
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/db_impl.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "table/meta_blocks.h"
#include "table/cuckoo_table_factory.h"
#include "table/cuckoo_table_reader.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace rocksdb {
class CuckooTableDBTest {
private:
std::string dbname_;
Env* env_;
DB* db_;
public:
CuckooTableDBTest() : env_(Env::Default()) {
dbname_ = test::TmpDir() + "/cuckoo_table_db_test";
ASSERT_OK(DestroyDB(dbname_, Options()));
db_ = nullptr;
Reopen();
}
~CuckooTableDBTest() {
delete db_;
ASSERT_OK(DestroyDB(dbname_, Options()));
}
Options CurrentOptions() {
Options options;
options.table_factory.reset(NewCuckooTableFactory());
options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true));
options.allow_mmap_reads = true;
options.create_if_missing = true;
options.max_mem_compaction_level = 0;
return options;
}
DBImpl* dbfull() {
return reinterpret_cast<DBImpl*>(db_);
}
// The following util methods are copied from plain_table_db_test.
void Reopen(Options* options = nullptr) {
delete db_;
db_ = nullptr;
Options opts;
if (options != nullptr) {
opts = *options;
} else {
opts = CurrentOptions();
opts.create_if_missing = true;
}
ASSERT_OK(DB::Open(opts, dbname_, &db_));
}
Status Put(const Slice& k, const Slice& v) {
return db_->Put(WriteOptions(), k, v);
}
Status Delete(const std::string& k) {
return db_->Delete(WriteOptions(), k);
}
std::string Get(const std::string& k) {
ReadOptions options;
std::string result;
Status s = db_->Get(options, k, &result);
if (s.IsNotFound()) {
result = "NOT_FOUND";
} else if (!s.ok()) {
result = s.ToString();
}
return result;
}
int NumTableFilesAtLevel(int level) {
std::string property;
ASSERT_TRUE(
db_->GetProperty("rocksdb.num-files-at-level" + NumberToString(level),
&property));
return atoi(property.c_str());
}
// Return spread of files per level
std::string FilesPerLevel() {
std::string result;
int last_non_zero_offset = 0;
for (int level = 0; level < db_->NumberLevels(); level++) {
int f = NumTableFilesAtLevel(level);
char buf[100];
snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
result += buf;
if (f > 0) {
last_non_zero_offset = result.size();
}
}
result.resize(last_non_zero_offset);
return result;
}
};
TEST(CuckooTableDBTest, Flush) {
// Try with empty DB first.
ASSERT_TRUE(dbfull() != nullptr);
ASSERT_EQ("NOT_FOUND", Get("key2"));
// Add some values to db.
Options options = CurrentOptions();
Reopen(&options);
ASSERT_OK(Put("key1", "v1"));
ASSERT_OK(Put("key2", "v2"));
ASSERT_OK(Put("key3", "v3"));
dbfull()->TEST_FlushMemTable();
TablePropertiesCollection ptc;
reinterpret_cast<DB*>(dbfull())->GetPropertiesOfAllTables(&ptc);
ASSERT_EQ(1U, ptc.size());
ASSERT_EQ(3, ptc.begin()->second->num_entries);
ASSERT_EQ("1", FilesPerLevel());
ASSERT_EQ("v1", Get("key1"));
ASSERT_EQ("v2", Get("key2"));
ASSERT_EQ("v3", Get("key3"));
ASSERT_EQ("NOT_FOUND", Get("key4"));
ASSERT_EQ("Invalid argument: Length of key is invalid.", Get("somelongkey"));
ASSERT_EQ("Invalid argument: Length of key is invalid.", Get("s"));
// Now add more keys and flush.
ASSERT_OK(Put("key4", "v4"));
ASSERT_OK(Put("key5", "v5"));
ASSERT_OK(Put("key6", "v6"));
dbfull()->TEST_FlushMemTable();
reinterpret_cast<DB*>(dbfull())->GetPropertiesOfAllTables(&ptc);
ASSERT_EQ(2U, ptc.size());
auto row = ptc.begin();
ASSERT_EQ(3, row->second->num_entries);
ASSERT_EQ(3, (++row)->second->num_entries);
ASSERT_EQ("2", FilesPerLevel());
ASSERT_EQ("v1", Get("key1"));
ASSERT_EQ("v2", Get("key2"));
ASSERT_EQ("v3", Get("key3"));
ASSERT_EQ("v4", Get("key4"));
ASSERT_EQ("v5", Get("key5"));
ASSERT_EQ("v6", Get("key6"));
ASSERT_OK(Delete("key6"));
ASSERT_OK(Delete("key5"));
ASSERT_OK(Delete("key4"));
dbfull()->TEST_FlushMemTable();
reinterpret_cast<DB*>(dbfull())->GetPropertiesOfAllTables(&ptc);
ASSERT_EQ(3U, ptc.size());
row = ptc.begin();
ASSERT_EQ(3, row->second->num_entries);
ASSERT_EQ(3, (++row)->second->num_entries);
ASSERT_EQ(3, (++row)->second->num_entries);
ASSERT_EQ("3", FilesPerLevel());
ASSERT_EQ("v1", Get("key1"));
ASSERT_EQ("v2", Get("key2"));
ASSERT_EQ("v3", Get("key3"));
ASSERT_EQ("NOT_FOUND", Get("key4"));
ASSERT_EQ("NOT_FOUND", Get("key5"));
ASSERT_EQ("NOT_FOUND", Get("key6"));
}
TEST(CuckooTableDBTest, FlushWithDuplicateKeys) {
Options options = CurrentOptions();
Reopen(&options);
ASSERT_OK(Put("key1", "v1"));
ASSERT_OK(Put("key2", "v2"));
ASSERT_OK(Put("key1", "v3")); // Duplicate
dbfull()->TEST_FlushMemTable();
TablePropertiesCollection ptc;
reinterpret_cast<DB*>(dbfull())->GetPropertiesOfAllTables(&ptc);
ASSERT_EQ(1U, ptc.size());
ASSERT_EQ(2, ptc.begin()->second->num_entries);
ASSERT_EQ("1", FilesPerLevel());
ASSERT_EQ("v3", Get("key1"));
ASSERT_EQ("v2", Get("key2"));
}
namespace {
static std::string Key(int i) {
char buf[100];
snprintf(buf, sizeof(buf), "key_______%06d", i);
return std::string(buf);
}
}
TEST(CuckooTableDBTest, CompactionTrigger) {
Options options = CurrentOptions();
options.write_buffer_size = 100 << 10; // 100KB
options.level0_file_num_compaction_trigger = 2;
Reopen(&options);
// Write 11 values, each 10016 B
for (int idx = 0; idx < 11; ++idx) {
ASSERT_OK(Put(Key(idx), std::string(10000, 'a' + idx)));
}
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ("1", FilesPerLevel());
// Generate one more file in level-0, and should trigger level-0 compaction
for (int idx = 11; idx < 22; ++idx) {
ASSERT_OK(Put(Key(idx), std::string(10000, 'a' + idx)));
}
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_CompactRange(0, nullptr, nullptr);
ASSERT_EQ("0,2", FilesPerLevel());
for (int idx = 0; idx < 22; ++idx) {
ASSERT_EQ(std::string(10000, 'a' + idx), Get(Key(idx)));
}
}
TEST(CuckooTableDBTest, SameKeyInsertedInTwoDifferentFilesAndCompacted) {
// Insert same key twice so that they go to different SST files. Then wait for
// compaction and check if the latest value is stored and old value removed.
Options options = CurrentOptions();
options.write_buffer_size = 100 << 10; // 100KB
options.level0_file_num_compaction_trigger = 2;
Reopen(&options);
// Write 11 values, each 10016 B
for (int idx = 0; idx < 11; ++idx) {
ASSERT_OK(Put(Key(idx), std::string(10000, 'a')));
}
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ("1", FilesPerLevel());
// Generate one more file in level-0, and should trigger level-0 compaction
for (int idx = 0; idx < 11; ++idx) {
ASSERT_OK(Put(Key(idx), std::string(10000, 'a' + idx)));
}
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_CompactRange(0, nullptr, nullptr);
ASSERT_EQ("0,1", FilesPerLevel());
for (int idx = 0; idx < 11; ++idx) {
ASSERT_EQ(std::string(10000, 'a' + idx), Get(Key(idx)));
}
}
TEST(CuckooTableDBTest, AdaptiveTable) {
Options options = CurrentOptions();
// Write some keys using cuckoo table.
options.table_factory.reset(NewCuckooTableFactory());
Reopen(&options);
ASSERT_OK(Put("key1", "v1"));
ASSERT_OK(Put("key2", "v2"));
ASSERT_OK(Put("key3", "v3"));
dbfull()->TEST_FlushMemTable();
// Write some keys using plain table.
options.create_if_missing = false;
options.table_factory.reset(NewPlainTableFactory());
Reopen(&options);
ASSERT_OK(Put("key4", "v4"));
ASSERT_OK(Put("key1", "v5"));
dbfull()->TEST_FlushMemTable();
// Write some keys using block based table.
std::shared_ptr<TableFactory> block_based_factory(
NewBlockBasedTableFactory());
options.table_factory.reset(NewAdaptiveTableFactory(block_based_factory));
Reopen(&options);
ASSERT_OK(Put("key5", "v6"));
ASSERT_OK(Put("key2", "v7"));
dbfull()->TEST_FlushMemTable();
ASSERT_EQ("v5", Get("key1"));
ASSERT_EQ("v7", Get("key2"));
ASSERT_EQ("v3", Get("key3"));
ASSERT_EQ("v4", Get("key4"));
ASSERT_EQ("v6", Get("key5"));
}
} // namespace rocksdb
int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }
......@@ -192,6 +192,9 @@ struct CuckooTablePropertyNames {
static const std::string kIsLastLevel;
};
extern TableFactory* NewCuckooTableFactory(double hash_table_ratio = 0.9,
uint32_t max_search_depth = 100);
#endif // ROCKSDB_LITE
// A base class for table factories.
......@@ -263,7 +266,8 @@ class TableFactory {
extern TableFactory* NewAdaptiveTableFactory(
std::shared_ptr<TableFactory> table_factory_to_write = nullptr,
std::shared_ptr<TableFactory> block_based_table_factory = nullptr,
std::shared_ptr<TableFactory> plain_table_factory = nullptr);
std::shared_ptr<TableFactory> plain_table_factory = nullptr,
std::shared_ptr<TableFactory> cuckoo_table_factory = nullptr);
#endif // ROCKSDB_LITE
......
......@@ -12,10 +12,12 @@ namespace rocksdb {
AdaptiveTableFactory::AdaptiveTableFactory(
std::shared_ptr<TableFactory> table_factory_to_write,
std::shared_ptr<TableFactory> block_based_table_factory,
std::shared_ptr<TableFactory> plain_table_factory)
std::shared_ptr<TableFactory> plain_table_factory,
std::shared_ptr<TableFactory> cuckoo_table_factory)
: table_factory_to_write_(table_factory_to_write),
block_based_table_factory_(block_based_table_factory),
plain_table_factory_(plain_table_factory) {
plain_table_factory_(plain_table_factory),
cuckoo_table_factory_(cuckoo_table_factory) {
if (!table_factory_to_write_) {
table_factory_to_write_ = block_based_table_factory_;
}
......@@ -25,12 +27,16 @@ AdaptiveTableFactory::AdaptiveTableFactory(
if (!block_based_table_factory_) {
block_based_table_factory_.reset(NewBlockBasedTableFactory());
}
if (!cuckoo_table_factory_) {
cuckoo_table_factory_.reset(NewCuckooTableFactory());
}
}
extern const uint64_t kPlainTableMagicNumber;
extern const uint64_t kLegacyPlainTableMagicNumber;
extern const uint64_t kBlockBasedTableMagicNumber;
extern const uint64_t kLegacyBlockBasedTableMagicNumber;
extern const uint64_t kCuckooTableMagicNumber;
Status AdaptiveTableFactory::NewTableReader(
const Options& options, const EnvOptions& soptions,
......@@ -49,6 +55,9 @@ Status AdaptiveTableFactory::NewTableReader(
footer.table_magic_number() == kLegacyBlockBasedTableMagicNumber) {
return block_based_table_factory_->NewTableReader(
options, soptions, icomp, std::move(file), file_size, table);
} else if (footer.table_magic_number() == kCuckooTableMagicNumber) {
return cuckoo_table_factory_->NewTableReader(
options, soptions, icomp, std::move(file), file_size, table);
} else {
return Status::NotSupported("Unidentified table format");
}
......@@ -64,9 +73,10 @@ TableBuilder* AdaptiveTableFactory::NewTableBuilder(
extern TableFactory* NewAdaptiveTableFactory(
std::shared_ptr<TableFactory> table_factory_to_write,
std::shared_ptr<TableFactory> block_based_table_factory,
std::shared_ptr<TableFactory> plain_table_factory) {
return new AdaptiveTableFactory(
table_factory_to_write, block_based_table_factory, plain_table_factory);
std::shared_ptr<TableFactory> plain_table_factory,
std::shared_ptr<TableFactory> cuckoo_table_factory) {
return new AdaptiveTableFactory(table_factory_to_write,
block_based_table_factory, plain_table_factory, cuckoo_table_factory);
}
} // namespace rocksdb
......
......@@ -28,7 +28,8 @@ class AdaptiveTableFactory : public TableFactory {
explicit AdaptiveTableFactory(
std::shared_ptr<TableFactory> table_factory_to_write,
std::shared_ptr<TableFactory> block_based_table_factory,
std::shared_ptr<TableFactory> plain_table_factory);
std::shared_ptr<TableFactory> plain_table_factory,
std::shared_ptr<TableFactory> cuckoo_table_factory);
const char* Name() const override { return "AdaptiveTableFactory"; }
Status NewTableReader(const Options& options, const EnvOptions& soptions,
const InternalKeyComparator& internal_comparator,
......@@ -44,6 +45,7 @@ class AdaptiveTableFactory : public TableFactory {
std::shared_ptr<TableFactory> table_factory_to_write_;
std::shared_ptr<TableFactory> block_based_table_factory_;
std::shared_ptr<TableFactory> plain_table_factory_;
std::shared_ptr<TableFactory> cuckoo_table_factory_;
};
} // namespace rocksdb
......
......@@ -213,8 +213,11 @@ Status CuckooTableBuilder::Finish() {
}
}
assert(num_added == NumEntries());
properties_.raw_key_size = num_added * properties_.fixed_key_len;
properties_.raw_value_size = num_added * value_length;
uint64_t offset = buckets.size() * bucket_size;
properties_.data_size = offset;
unused_bucket.resize(properties_.fixed_key_len);
properties_.user_collected_properties[
CuckooTablePropertyNames::kEmptyKey] = unused_bucket;
......@@ -330,7 +333,8 @@ bool CuckooTableBuilder::MakeSpaceForKey(
uint32_t curr_pos = 0;
while (!null_found && curr_pos < tree.size()) {
CuckooNode& curr_node = tree[curr_pos];
if (curr_node.depth >= max_search_depth_) {
uint32_t curr_depth = curr_node.depth;
if (curr_depth >= max_search_depth_) {
break;
}
CuckooBucket& curr_bucket = (*buckets)[curr_node.bucket_id];
......@@ -345,7 +349,7 @@ bool CuckooTableBuilder::MakeSpaceForKey(
}
(*buckets)[child_bucket_id].make_space_for_key_call_id =
make_space_for_key_call_id;
tree.push_back(CuckooNode(child_bucket_id, curr_node.depth + 1,
tree.push_back(CuckooNode(child_bucket_id, curr_depth + 1,
curr_pos));
if ((*buckets)[child_bucket_id].vector_idx == kMaxVectorIdx) {
null_found = true;
......
......@@ -51,6 +51,8 @@ class CuckooBuilderTest {
kCuckooTableMagicNumber, env_, nullptr, &props));
ASSERT_EQ(props->num_entries, keys.size());
ASSERT_EQ(props->fixed_key_len, keys.empty() ? 0 : keys[0].size());
ASSERT_EQ(props->data_size, keys.size()*expected_unused_bucket.size());
ASSERT_EQ(props->raw_key_size, keys.size()*props->fixed_key_len);
// Check unused bucket.
std::string unused_key = props->user_collected_properties[
......@@ -62,6 +64,7 @@ class CuckooBuilderTest {
*reinterpret_cast<const uint32_t*>(props->user_collected_properties[
CuckooTablePropertyNames::kValueLength].data());
ASSERT_EQ(values.empty() ? 0 : values[0].size(), value_len_found);
ASSERT_EQ(props->raw_value_size, values.size()*value_len_found);
const uint64_t max_buckets =
*reinterpret_cast<const uint64_t*>(props->user_collected_properties[
CuckooTablePropertyNames::kMaxNumBuckets].data());
......
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#include "table/cuckoo_table_factory.h"
#include "db/dbformat.h"
#include "table/cuckoo_table_builder.h"
#include "table/cuckoo_table_reader.h"
#include "util/murmurhash.h"
namespace rocksdb {
extern const uint32_t kMaxNumHashTable = 64;
extern uint64_t GetSliceMurmurHash(const Slice& s, uint32_t index,
uint64_t max_num_buckets) {
static constexpr uint32_t seeds[kMaxNumHashTable] = {
816922183, 506425713, 949485004, 22513986, 421427259, 500437285,
888981693, 847587269, 511007211, 722295391, 934013645, 566947683,
193618736, 428277388, 770956674, 819994962, 755946528, 40807421,
263144466, 241420041, 444294464, 731606396, 304158902, 563235655,
968740453, 336996831, 462831574, 407970157, 985877240, 637708754,
736932700, 205026023, 755371467, 729648411, 807744117, 46482135,
847092855, 620960699, 102476362, 314094354, 625838942, 550889395,
639071379, 834567510, 397667304, 151945969, 443634243, 196618243,
421986347, 407218337, 964502417, 327741231, 493359459, 452453139,
692216398, 108161624, 816246924, 234779764, 618949448, 496133787,
156374056, 316589799, 982915425, 553105889 };
return MurmurHash(s.data(), s.size(), seeds[index]) % max_num_buckets;
}
Status CuckooTableFactory::NewTableReader(const Options& options,
const EnvOptions& soptions, const InternalKeyComparator& icomp,
std::unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
std::unique_ptr<TableReader>* table) const {
std::unique_ptr<CuckooTableReader> new_reader(new CuckooTableReader(options,
std::move(file), file_size, GetSliceMurmurHash));
Status s = new_reader->status();
if (s.ok()) {
*table = std::move(new_reader);
}
return s;
}
TableBuilder* CuckooTableFactory::NewTableBuilder(
const Options& options, const InternalKeyComparator& internal_comparator,
WritableFile* file, CompressionType compression_type) const {
return new CuckooTableBuilder(file, hash_table_ratio_, kMaxNumHashTable,
max_search_depth_, GetSliceMurmurHash);
}
TableFactory* NewCuckooTableFactory(double hash_table_ratio,
uint32_t max_search_depth) {
return new CuckooTableFactory(hash_table_ratio, max_search_depth);
}
} // namespace rocksdb
#endif // ROCKSDB_LITE
......@@ -6,27 +6,44 @@
#pragma once
#ifndef ROCKSDB_LITE
#include "util/murmurhash.h"
#include "rocksdb/table.h"
namespace rocksdb {
static const uint32_t kMaxNumHashTable = 64;
uint64_t GetSliceMurmurHash(const Slice& s, uint32_t index,
uint64_t max_num_buckets) {
static constexpr uint32_t seeds[kMaxNumHashTable] = {
816922183, 506425713, 949485004, 22513986, 421427259, 500437285,
888981693, 847587269, 511007211, 722295391, 934013645, 566947683,
193618736, 428277388, 770956674, 819994962, 755946528, 40807421,
263144466, 241420041, 444294464, 731606396, 304158902, 563235655,
968740453, 336996831, 462831574, 407970157, 985877240, 637708754,
736932700, 205026023, 755371467, 729648411, 807744117, 46482135,
847092855, 620960699, 102476362, 314094354, 625838942, 550889395,
639071379, 834567510, 397667304, 151945969, 443634243, 196618243,
421986347, 407218337, 964502417, 327741231, 493359459, 452453139,
692216398, 108161624, 816246924, 234779764, 618949448, 496133787,
156374056, 316589799, 982915425, 553105889 };
return MurmurHash(s.data(), s.size(), seeds[index]) % max_num_buckets;
}
extern uint64_t GetSliceMurmurHash(const Slice& s, uint32_t index,
uint64_t max_num_buckets);
// Cuckoo Table is designed for applications that require fast point lookups
// but not fast range scans.
//
// Some assumptions:
// - Key length and Value length are fixed.
// - Does not support Snapshot.
// - Does not support Merge operations.
// - Only supports Bytewise comparators.
class CuckooTableFactory : public TableFactory {
public:
CuckooTableFactory(double hash_table_ratio, uint32_t max_search_depth)
: hash_table_ratio_(hash_table_ratio),
max_search_depth_(max_search_depth) {}
~CuckooTableFactory() {}
const char* Name() const override { return "CuckooTable"; }
Status NewTableReader(
const Options& options, const EnvOptions& soptions,
const InternalKeyComparator& internal_comparator,
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table) const override;
TableBuilder* NewTableBuilder(const Options& options,
const InternalKeyComparator& icomparator, WritableFile* file,
CompressionType compression_type) const override;
private:
const double hash_table_ratio_;
const uint32_t max_search_depth_;
};
} // namespace rocksdb
#endif // ROCKSDB_LITE
......@@ -89,6 +89,10 @@ Status CuckooTableReader::Get(
if (!ParseInternalKey(key, &ikey)) {
return Status::Corruption("Unable to parse key into inernal key.");
}
if ((is_last_level_ && key.size() != key_length_ + 8) ||
(!is_last_level_ && key.size() != key_length_)) {
return Status::InvalidArgument("Length of key is invalid.");
}
for (uint32_t hash_cnt = 0; hash_cnt < num_hash_fun_; ++hash_cnt) {
uint64_t hash_val = get_slice_hash_(ikey.user_key, hash_cnt, num_buckets_);
assert(hash_val < num_buckets_);
......@@ -101,7 +105,15 @@ Status CuckooTableReader::Get(
// per user key and we don't support sanpshot.
if (ikey.user_key.compare(Slice(bucket, ikey.user_key.size())) == 0) {
Slice value = Slice(&bucket[key_length_], value_length_);
result_handler(handle_context, ikey, value);
if (is_last_level_) {
ParsedInternalKey found_ikey(Slice(bucket, key_length_), 0, kTypeValue);
result_handler(handle_context, found_ikey, value);
} else {
Slice full_key(bucket, key_length_);
ParsedInternalKey found_ikey;
ParseInternalKey(full_key, &found_ikey);
result_handler(handle_context, found_ikey, value);
}
// We don't support merge operations. So, we return here.
return Status::OK();
}
......
......@@ -37,6 +37,7 @@ DEFINE_bool(enable_perf, false, "Run Benchmark Tests too.");
namespace rocksdb {
extern const uint64_t kCuckooTableMagicNumber;
extern const uint64_t kMaxNumHashTable;
namespace {
const uint32_t kNumHashFunc = 10;
......@@ -311,6 +312,22 @@ TEST(CuckooReaderTest, WhenKeyNotFound) {
ASSERT_EQ(0, v.call_count);
ASSERT_OK(reader.status());
// Test read with key of invalid length.
IterKey k;
k.SetInternalKey("very_long_key", 0, kTypeValue);
ASSERT_TRUE(reader.Get(
ReadOptions(), k.GetKey(), &v,
AssertValues, nullptr).IsInvalidArgument());
ASSERT_EQ(0, v.call_count);
ASSERT_OK(reader.status());
k.Clear();
k.SetInternalKey("s", 0, kTypeValue);
ASSERT_TRUE(reader.Get(
ReadOptions(), k.GetKey(), &v,
AssertValues, nullptr).IsInvalidArgument());
ASSERT_EQ(0, v.call_count);
ASSERT_OK(reader.status());
// Test read when key is unused key.
std::string unused_key =
reader.GetTableProperties()->user_collected_properties.at(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册