提交 d0623289 编写于 作者: I Islam AbdelRahman

Revert "Support SST files with Global sequence numbers"

This reverts commit ab01da54.
上级 5cd28833
...@@ -11,12 +11,10 @@ ...@@ -11,12 +11,10 @@
#include <inttypes.h> #include <inttypes.h>
#include "db/builder.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/sst_file_writer.h" #include "rocksdb/sst_file_writer.h"
#include "db/builder.h"
#include "table/sst_file_writer_collectors.h"
#include "table/table_builder.h" #include "table/table_builder.h"
#include "util/file_reader_writer.h" #include "util/file_reader_writer.h"
#include "util/file_util.h" #include "util/file_util.h"
...@@ -69,12 +67,7 @@ Status DBImpl::ReadExternalSstFileInfo(ColumnFamilyHandle* column_family, ...@@ -69,12 +67,7 @@ Status DBImpl::ReadExternalSstFileInfo(ColumnFamilyHandle* column_family,
file_info->version = file_info->version =
DecodeFixed32(external_sst_file_version_iter->second.c_str()); DecodeFixed32(external_sst_file_version_iter->second.c_str());
if (file_info->version == 2) { if (file_info->version == 1) {
// version 2 imply that we have global sequence number
// TODO(tec): Implement version 2 ingestion
file_info->sequence_number = 0;
} else if (file_info->version == 1) {
// version 1 imply that all sequence numbers in table equal 0 // version 1 imply that all sequence numbers in table equal 0
file_info->sequence_number = 0; file_info->sequence_number = 0;
} else { } else {
...@@ -172,20 +165,12 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family, ...@@ -172,20 +165,12 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
if (file_info_list[i].num_entries == 0) { if (file_info_list[i].num_entries == 0) {
return Status::InvalidArgument("File contain no entries"); return Status::InvalidArgument("File contain no entries");
} }
if (file_info_list[i].version != 1) {
if (file_info_list[i].version == 2) {
// version 2 imply that file have only Put Operations
// with global Sequence Number
// TODO(tec): Implement changing file global sequence number
} else if (file_info_list[i].version == 1) {
// version 1 imply that file have only Put Operations
// with Sequence Number = 0
} else {
// Unknown version !
return Status::InvalidArgument( return Status::InvalidArgument(
"Generated table version is not supported"); "Generated table version is not supported");
} }
// version 1 imply that file have only Put Operations with Sequence Number =
// 0
meta_list[i].smallest = meta_list[i].smallest =
InternalKey(file_info_list[i].smallest_key, InternalKey(file_info_list[i].smallest_key,
...@@ -279,7 +264,7 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family, ...@@ -279,7 +264,7 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
for (size_t i = 0; i < num_files; i++) { for (size_t i = 0; i < num_files; i++) {
StopWatch sw(env_, nullptr, 0, &micro_list[i], false); StopWatch sw(env_, nullptr, 0, &micro_list[i], false);
InternalKey range_start(file_info_list[i].smallest_key, InternalKey range_start(file_info_list[i].smallest_key,
kMaxSequenceNumber, kValueTypeForSeek); kMaxSequenceNumber, kTypeValue);
iter->Seek(range_start.Encode()); iter->Seek(range_start.Encode());
status = iter->status(); status = iter->status();
......
...@@ -71,8 +71,6 @@ inline bool IsExtendedValueType(ValueType t) { ...@@ -71,8 +71,6 @@ inline bool IsExtendedValueType(ValueType t) {
static const SequenceNumber kMaxSequenceNumber = static const SequenceNumber kMaxSequenceNumber =
((0x1ull << 56) - 1); ((0x1ull << 56) - 1);
static const SequenceNumber kDisableGlobalSequenceNumber = port::kMaxUint64;
struct ParsedInternalKey { struct ParsedInternalKey {
Slice user_key; Slice user_key;
SequenceNumber sequence; SequenceNumber sequence;
...@@ -359,15 +357,6 @@ class IterKey { ...@@ -359,15 +357,6 @@ class IterKey {
return Slice(key_, key_n); return Slice(key_, key_n);
} }
// Copy the key into IterKey own buf_
void OwnKey() {
assert(IsKeyPinned() == true);
EnlargeBufferIfNeeded(key_size_);
memcpy(buf_, key_, key_size_);
key_ = buf_;
}
// Update the sequence number in the internal key. Guarantees not to // Update the sequence number in the internal key. Guarantees not to
// invalidate slices to the key (and the user key). // invalidate slices to the key (and the user key).
void UpdateInternalKey(uint64_t seq, ValueType t) { void UpdateInternalKey(uint64_t seq, ValueType t) {
......
...@@ -1091,7 +1091,7 @@ TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) { ...@@ -1091,7 +1091,7 @@ TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) {
std::vector<std::thread> threads; std::vector<std::thread> threads;
while (range_id < 5000) { while (range_id < 5000) {
int range_start = range_id * 10; int range_start = (range_id * 20);
int range_end = range_start + 10; int range_end = range_start + 10;
file_keys.clear(); file_keys.clear();
...@@ -1114,18 +1114,6 @@ TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) { ...@@ -1114,18 +1114,6 @@ TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) {
range_id++; range_id++;
} }
for (int rid = 0; rid < 5000; rid++) {
int range_start = rid * 10;
int range_end = range_start + 10;
ASSERT_EQ(Get(Key(range_start)), Key(range_start)) << rid;
ASSERT_EQ(Get(Key(range_end)), Key(range_end)) << rid;
for (int k = range_start + 1; k < range_end; k++) {
std::string v = Key(k) + ToString(rid);
ASSERT_EQ(Get(Key(k)), v) << rid;
}
}
} }
TEST_F(ExternalSSTFileTest, PickedLevelDynamic) { TEST_F(ExternalSSTFileTest, PickedLevelDynamic) {
......
...@@ -13,6 +13,12 @@ namespace rocksdb { ...@@ -13,6 +13,12 @@ namespace rocksdb {
class Comparator; class Comparator;
// Table Properties that are specific to tables created by SstFileWriter.
struct ExternalSstFilePropertyNames {
// value of this property is a fixed int32 number.
static const std::string kVersion;
};
// ExternalSstFileInfo include information about sst files created // ExternalSstFileInfo include information about sst files created
// using SstFileWriter // using SstFileWriter
struct ExternalSstFileInfo { struct ExternalSstFileInfo {
...@@ -62,6 +68,8 @@ class SstFileWriter { ...@@ -62,6 +68,8 @@ class SstFileWriter {
Status Finish(ExternalSstFileInfo* file_info = nullptr); Status Finish(ExternalSstFileInfo* file_info = nullptr);
private: private:
class SstFileWriterPropertiesCollectorFactory;
class SstFileWriterPropertiesCollector;
struct Rep; struct Rep;
Rep* rep_; Rep* rep_;
}; };
......
...@@ -184,9 +184,6 @@ struct TableProperties { ...@@ -184,9 +184,6 @@ struct TableProperties {
UserCollectedProperties user_collected_properties; UserCollectedProperties user_collected_properties;
UserCollectedProperties readable_properties; UserCollectedProperties readable_properties;
// The offset of the value of each property in the file.
std::map<std::string, uint64_t> properties_offsets;
// convert this object to a human readable form // convert this object to a human readable form
// @prop_delim: delimiter for each property. // @prop_delim: delimiter for each property.
std::string ToString(const std::string& prop_delim = "; ", std::string ToString(const std::string& prop_delim = "; ",
......
...@@ -241,27 +241,6 @@ bool BlockIter::ParseNextKey() { ...@@ -241,27 +241,6 @@ bool BlockIter::ParseNextKey() {
key_.TrimAppend(shared, p, non_shared); key_.TrimAppend(shared, p, non_shared);
key_pinned_ = false; key_pinned_ = false;
} }
if (global_seqno_ != kDisableGlobalSequenceNumber) {
// If we are reading a file with a global sequence number we should
// expect that all encoded sequence numbers are zeros and all value
// types are kTypeValue
assert(GetInternalKeySeqno(key_.GetKey()) == 0);
assert(ExtractValueType(key_.GetKey()) == ValueType::kTypeValue);
if (key_pinned_) {
// TODO(tec): Investigate updating the seqno in the loaded block
// directly instead of doing a copy and update.
// We cannot use the key address in the block directly because
// we have a global_seqno_ that will overwrite the encoded one.
key_.OwnKey();
key_pinned_ = false;
}
key_.UpdateInternalKey(global_seqno_, ValueType::kTypeValue);
}
value_ = Slice(p + non_shared, value_length); value_ = Slice(p + non_shared, value_length);
while (restart_index_ + 1 < num_restarts_ && while (restart_index_ + 1 < num_restarts_ &&
GetRestartPoint(restart_index_ + 1) < current_) { GetRestartPoint(restart_index_ + 1) < current_) {
...@@ -393,12 +372,11 @@ uint32_t Block::NumRestarts() const { ...@@ -393,12 +372,11 @@ uint32_t Block::NumRestarts() const {
return DecodeFixed32(data_ + size_ - sizeof(uint32_t)); return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
} }
Block::Block(BlockContents&& contents, SequenceNumber _global_seqno, Block::Block(BlockContents&& contents, size_t read_amp_bytes_per_bit,
size_t read_amp_bytes_per_bit, Statistics* statistics) Statistics* statistics)
: contents_(std::move(contents)), : contents_(std::move(contents)),
data_(contents_.data.data()), data_(contents_.data.data()),
size_(contents_.data.size()), size_(contents_.data.size()) {
global_seqno_(_global_seqno) {
if (size_ < sizeof(uint32_t)) { if (size_ < sizeof(uint32_t)) {
size_ = 0; // Error marker size_ = 0; // Error marker
} else { } else {
...@@ -440,11 +418,10 @@ InternalIterator* Block::NewIterator(const Comparator* cmp, BlockIter* iter, ...@@ -440,11 +418,10 @@ InternalIterator* Block::NewIterator(const Comparator* cmp, BlockIter* iter,
if (iter != nullptr) { if (iter != nullptr) {
iter->Initialize(cmp, data_, restart_offset_, num_restarts, iter->Initialize(cmp, data_, restart_offset_, num_restarts,
prefix_index_ptr, global_seqno_, read_amp_bitmap_.get()); prefix_index_ptr, read_amp_bitmap_.get());
} else { } else {
iter = new BlockIter(cmp, data_, restart_offset_, num_restarts, iter = new BlockIter(cmp, data_, restart_offset_, num_restarts,
prefix_index_ptr, global_seqno_, prefix_index_ptr, read_amp_bitmap_.get());
read_amp_bitmap_.get());
} }
if (read_amp_bitmap_) { if (read_amp_bitmap_) {
......
...@@ -147,8 +147,7 @@ class BlockReadAmpBitmap { ...@@ -147,8 +147,7 @@ class BlockReadAmpBitmap {
class Block { class Block {
public: public:
// Initialize the block with the specified contents. // Initialize the block with the specified contents.
explicit Block(BlockContents&& contents, SequenceNumber _global_seqno, explicit Block(BlockContents&& contents, size_t read_amp_bytes_per_bit = 0,
size_t read_amp_bytes_per_bit = 0,
Statistics* statistics = nullptr); Statistics* statistics = nullptr);
~Block() = default; ~Block() = default;
...@@ -191,8 +190,6 @@ class Block { ...@@ -191,8 +190,6 @@ class Block {
// Report an approximation of how much memory has been used. // Report an approximation of how much memory has been used.
size_t ApproximateMemoryUsage() const; size_t ApproximateMemoryUsage() const;
SequenceNumber global_seqno() const { return global_seqno_; }
private: private:
BlockContents contents_; BlockContents contents_;
const char* data_; // contents_.data.data() const char* data_; // contents_.data.data()
...@@ -200,9 +197,6 @@ class Block { ...@@ -200,9 +197,6 @@ class Block {
uint32_t restart_offset_; // Offset in data_ of restart array uint32_t restart_offset_; // Offset in data_ of restart array
std::unique_ptr<BlockPrefixIndex> prefix_index_; std::unique_ptr<BlockPrefixIndex> prefix_index_;
std::unique_ptr<BlockReadAmpBitmap> read_amp_bitmap_; std::unique_ptr<BlockReadAmpBitmap> read_amp_bitmap_;
// All keys in the block will have seqno = global_seqno_, regardless of
// the encoded value (kDisableGlobalSequenceNumber means disabled)
const SequenceNumber global_seqno_;
// No copying allowed // No copying allowed
Block(const Block&); Block(const Block&);
...@@ -221,21 +215,20 @@ class BlockIter : public InternalIterator { ...@@ -221,21 +215,20 @@ class BlockIter : public InternalIterator {
status_(Status::OK()), status_(Status::OK()),
prefix_index_(nullptr), prefix_index_(nullptr),
key_pinned_(false), key_pinned_(false),
global_seqno_(kDisableGlobalSequenceNumber),
read_amp_bitmap_(nullptr), read_amp_bitmap_(nullptr),
last_bitmap_offset_(0) {} last_bitmap_offset_(0) {}
BlockIter(const Comparator* comparator, const char* data, uint32_t restarts, BlockIter(const Comparator* comparator, const char* data, uint32_t restarts,
uint32_t num_restarts, BlockPrefixIndex* prefix_index, uint32_t num_restarts, BlockPrefixIndex* prefix_index,
SequenceNumber global_seqno, BlockReadAmpBitmap* read_amp_bitmap) BlockReadAmpBitmap* read_amp_bitmap)
: BlockIter() { : BlockIter() {
Initialize(comparator, data, restarts, num_restarts, prefix_index, Initialize(comparator, data, restarts, num_restarts, prefix_index,
global_seqno, read_amp_bitmap); read_amp_bitmap);
} }
void Initialize(const Comparator* comparator, const char* data, void Initialize(const Comparator* comparator, const char* data,
uint32_t restarts, uint32_t num_restarts, uint32_t restarts, uint32_t num_restarts,
BlockPrefixIndex* prefix_index, SequenceNumber global_seqno, BlockPrefixIndex* prefix_index,
BlockReadAmpBitmap* read_amp_bitmap) { BlockReadAmpBitmap* read_amp_bitmap) {
assert(data_ == nullptr); // Ensure it is called only once assert(data_ == nullptr); // Ensure it is called only once
assert(num_restarts > 0); // Ensure the param is valid assert(num_restarts > 0); // Ensure the param is valid
...@@ -247,7 +240,6 @@ class BlockIter : public InternalIterator { ...@@ -247,7 +240,6 @@ class BlockIter : public InternalIterator {
current_ = restarts_; current_ = restarts_;
restart_index_ = num_restarts_; restart_index_ = num_restarts_;
prefix_index_ = prefix_index; prefix_index_ = prefix_index;
global_seqno_ = global_seqno;
read_amp_bitmap_ = read_amp_bitmap; read_amp_bitmap_ = read_amp_bitmap;
last_bitmap_offset_ = current_ + 1; last_bitmap_offset_ = current_ + 1;
} }
...@@ -304,10 +296,6 @@ class BlockIter : public InternalIterator { ...@@ -304,10 +296,6 @@ class BlockIter : public InternalIterator {
size_t TEST_CurrentEntrySize() { return NextEntryOffset() - current_; } size_t TEST_CurrentEntrySize() { return NextEntryOffset() - current_; }
uint32_t ValueOffset() const {
return static_cast<uint32_t>(value_.data() - data_);
}
private: private:
const Comparator* comparator_; const Comparator* comparator_;
const char* data_; // underlying block contents const char* data_; // underlying block contents
...@@ -322,7 +310,6 @@ class BlockIter : public InternalIterator { ...@@ -322,7 +310,6 @@ class BlockIter : public InternalIterator {
Status status_; Status status_;
BlockPrefixIndex* prefix_index_; BlockPrefixIndex* prefix_index_;
bool key_pinned_; bool key_pinned_;
SequenceNumber global_seqno_;
// read-amp bitmap // read-amp bitmap
BlockReadAmpBitmap* read_amp_bitmap_; BlockReadAmpBitmap* read_amp_bitmap_;
......
...@@ -806,7 +806,7 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents, ...@@ -806,7 +806,7 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
BlockContents results(std::move(ubuf), size, true, type); BlockContents results(std::move(ubuf), size, true, type);
Block* block = new Block(std::move(results), kDisableGlobalSequenceNumber); Block* block = new Block(std::move(results));
// make cache key by appending the file offset to the cache prefix id // make cache key by appending the file offset to the cache prefix id
char* end = EncodeVarint64( char* end = EncodeVarint64(
......
...@@ -38,7 +38,6 @@ ...@@ -38,7 +38,6 @@
#include "table/internal_iterator.h" #include "table/internal_iterator.h"
#include "table/meta_blocks.h" #include "table/meta_blocks.h"
#include "table/persistent_cache_helper.h" #include "table/persistent_cache_helper.h"
#include "table/sst_file_writer_collectors.h"
#include "table/two_level_iterator.h" #include "table/two_level_iterator.h"
#include "util/coding.h" #include "util/coding.h"
...@@ -70,14 +69,13 @@ Status ReadBlockFromFile(RandomAccessFileReader* file, const Footer& footer, ...@@ -70,14 +69,13 @@ Status ReadBlockFromFile(RandomAccessFileReader* file, const Footer& footer,
const ImmutableCFOptions& ioptions, bool do_uncompress, const ImmutableCFOptions& ioptions, bool do_uncompress,
const Slice& compression_dict, const Slice& compression_dict,
const PersistentCacheOptions& cache_options, const PersistentCacheOptions& cache_options,
SequenceNumber global_seqno,
size_t read_amp_bytes_per_bit) { size_t read_amp_bytes_per_bit) {
BlockContents contents; BlockContents contents;
Status s = ReadBlockContents(file, footer, options, handle, &contents, ioptions, Status s = ReadBlockContents(file, footer, options, handle, &contents, ioptions,
do_uncompress, compression_dict, cache_options); do_uncompress, compression_dict, cache_options);
if (s.ok()) { if (s.ok()) {
result->reset(new Block(std::move(contents), global_seqno, result->reset(new Block(std::move(contents), read_amp_bytes_per_bit,
read_amp_bytes_per_bit, ioptions.statistics)); ioptions.statistics));
} }
return s; return s;
...@@ -190,10 +188,10 @@ class BinarySearchIndexReader : public IndexReader { ...@@ -190,10 +188,10 @@ class BinarySearchIndexReader : public IndexReader {
const Comparator* comparator, IndexReader** index_reader, const Comparator* comparator, IndexReader** index_reader,
const PersistentCacheOptions& cache_options) { const PersistentCacheOptions& cache_options) {
std::unique_ptr<Block> index_block; std::unique_ptr<Block> index_block;
auto s = ReadBlockFromFile( auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle,
file, footer, ReadOptions(), index_handle, &index_block, ioptions, &index_block, ioptions, true /* decompress */,
true /* decompress */, Slice() /*compression dict*/, cache_options, Slice() /*compression dict*/, cache_options,
kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */); 0 /* read_amp_bytes_per_bit */);
if (s.ok()) { if (s.ok()) {
*index_reader = new BinarySearchIndexReader( *index_reader = new BinarySearchIndexReader(
...@@ -242,10 +240,10 @@ class HashIndexReader : public IndexReader { ...@@ -242,10 +240,10 @@ class HashIndexReader : public IndexReader {
bool hash_index_allow_collision, bool hash_index_allow_collision,
const PersistentCacheOptions& cache_options) { const PersistentCacheOptions& cache_options) {
std::unique_ptr<Block> index_block; std::unique_ptr<Block> index_block;
auto s = ReadBlockFromFile( auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle,
file, footer, ReadOptions(), index_handle, &index_block, ioptions, &index_block, ioptions, true /* decompress */,
true /* decompress */, Slice() /*compression dict*/, cache_options, Slice() /*compression dict*/, cache_options,
kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */); 0 /* read_amp_bytes_per_bit */);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
...@@ -371,8 +369,7 @@ struct BlockBasedTable::Rep { ...@@ -371,8 +369,7 @@ struct BlockBasedTable::Rep {
filter_type(FilterType::kNoFilter), filter_type(FilterType::kNoFilter),
whole_key_filtering(_table_opt.whole_key_filtering), whole_key_filtering(_table_opt.whole_key_filtering),
prefix_filtering(true), prefix_filtering(true),
range_del_block(nullptr), range_del_block(nullptr) {}
global_seqno(kDisableGlobalSequenceNumber) {}
const ImmutableCFOptions& ioptions; const ImmutableCFOptions& ioptions;
const EnvOptions& env_options; const EnvOptions& env_options;
...@@ -431,13 +428,6 @@ struct BlockBasedTable::Rep { ...@@ -431,13 +428,6 @@ struct BlockBasedTable::Rep {
CachableEntry<FilterBlockReader> filter_entry; CachableEntry<FilterBlockReader> filter_entry;
CachableEntry<IndexReader> index_entry; CachableEntry<IndexReader> index_entry;
unique_ptr<Block> range_del_block; unique_ptr<Block> range_del_block;
// If global_seqno is used, all Keys in this file will have the same
// seqno with value `global_seqno`.
//
// A value of kDisableGlobalSequenceNumber means that this feature is disabled
// and every key have it's own seqno.
SequenceNumber global_seqno;
}; };
BlockBasedTable::~BlockBasedTable() { BlockBasedTable::~BlockBasedTable() {
...@@ -516,50 +506,6 @@ bool IsFeatureSupported(const TableProperties& table_properties, ...@@ -516,50 +506,6 @@ bool IsFeatureSupported(const TableProperties& table_properties,
} }
return true; return true;
} }
SequenceNumber GetGlobalSequenceNumber(const TableProperties& table_properties,
Logger* info_log) {
auto& props = table_properties.user_collected_properties;
auto version_pos = props.find(ExternalSstFilePropertyNames::kVersion);
auto seqno_pos = props.find(ExternalSstFilePropertyNames::kGlobalSeqno);
if (version_pos == props.end()) {
if (seqno_pos != props.end()) {
// This is not an external sst file, global_seqno is not supported.
assert(false);
Log(InfoLogLevel::ERROR_LEVEL, info_log,
"A non-external sst file have global seqno property with value %s",
seqno_pos->second.c_str());
}
return kDisableGlobalSequenceNumber;
}
uint32_t version = DecodeFixed32(version_pos->second.c_str());
if (version < 2) {
if (seqno_pos != props.end() || version != 1) {
// This is a v1 external sst file, global_seqno is not supported.
assert(false);
Log(InfoLogLevel::ERROR_LEVEL, info_log,
"An external sst file with version %u have global seqno property "
"with value %s",
version, seqno_pos->second.c_str());
}
return kDisableGlobalSequenceNumber;
}
SequenceNumber global_seqno = DecodeFixed64(seqno_pos->second.c_str());
if (global_seqno > kMaxSequenceNumber) {
assert(false);
Log(InfoLogLevel::ERROR_LEVEL, info_log,
"An external sst file with version %u have global seqno property "
"with value %llu, which is greater than kMaxSequenceNumber",
version, global_seqno);
}
return global_seqno;
}
} // namespace } // namespace
Slice BlockBasedTable::GetCacheKey(const char* cache_key_prefix, Slice BlockBasedTable::GetCacheKey(const char* cache_key_prefix,
...@@ -723,8 +669,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, ...@@ -723,8 +669,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
"Encountered error while reading data from range del block %s", "Encountered error while reading data from range del block %s",
s.ToString().c_str()); s.ToString().c_str());
} else { } else {
rep->range_del_block.reset(new Block( rep->range_del_block.reset(
std::move(range_del_block_contents), kDisableGlobalSequenceNumber)); new Block(std::move(range_del_block_contents)));
} }
} }
} }
...@@ -738,9 +684,6 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, ...@@ -738,9 +684,6 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
rep->prefix_filtering &= IsFeatureSupported( rep->prefix_filtering &= IsFeatureSupported(
*(rep->table_properties), *(rep->table_properties),
BlockBasedTablePropertyNames::kPrefixFiltering, rep->ioptions.info_log); BlockBasedTablePropertyNames::kPrefixFiltering, rep->ioptions.info_log);
rep->global_seqno = GetGlobalSequenceNumber(*(rep->table_properties),
rep->ioptions.info_log);
} }
// pre-fetching of blocks is turned on // pre-fetching of blocks is turned on
...@@ -854,8 +797,7 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep, ...@@ -854,8 +797,7 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
rep->file.get(), rep->footer, ReadOptions(), rep->file.get(), rep->footer, ReadOptions(),
rep->footer.metaindex_handle(), &meta, rep->ioptions, rep->footer.metaindex_handle(), &meta, rep->ioptions,
true /* decompress */, Slice() /*compression dict*/, true /* decompress */, Slice() /*compression dict*/,
rep->persistent_cache_options, kDisableGlobalSequenceNumber, rep->persistent_cache_options, 0 /* read_amp_bytes_per_bit */);
0 /* read_amp_bytes_per_bit */);
if (!s.ok()) { if (!s.ok()) {
Log(InfoLogLevel::ERROR_LEVEL, rep->ioptions.info_log, Log(InfoLogLevel::ERROR_LEVEL, rep->ioptions.info_log,
...@@ -925,10 +867,8 @@ Status BlockBasedTable::GetDataBlockFromCache( ...@@ -925,10 +867,8 @@ Status BlockBasedTable::GetDataBlockFromCache(
// Insert uncompressed block into block cache // Insert uncompressed block into block cache
if (s.ok()) { if (s.ok()) {
block->value = block->value = new Block(std::move(contents), read_amp_bytes_per_bit,
new Block(std::move(contents), compressed_block->global_seqno(), statistics); // uncompressed block
read_amp_bytes_per_bit,
statistics); // uncompressed block
assert(block->value->compression_type() == kNoCompression); assert(block->value->compression_type() == kNoCompression);
if (block_cache != nullptr && block->value->cachable() && if (block_cache != nullptr && block->value->cachable() &&
read_options.fill_cache) { read_options.fill_cache) {
...@@ -973,9 +913,8 @@ Status BlockBasedTable::PutDataBlockToCache( ...@@ -973,9 +913,8 @@ Status BlockBasedTable::PutDataBlockToCache(
} }
if (raw_block->compression_type() != kNoCompression) { if (raw_block->compression_type() != kNoCompression) {
block->value = new Block(std::move(contents), raw_block->global_seqno(), block->value = new Block(std::move(contents), read_amp_bytes_per_bit,
read_amp_bytes_per_bit, statistics); // compressed block
statistics); // uncompressed block
} else { } else {
block->value = raw_block; block->value = raw_block;
raw_block = nullptr; raw_block = nullptr;
...@@ -1283,11 +1222,11 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator( ...@@ -1283,11 +1222,11 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
std::unique_ptr<Block> raw_block; std::unique_ptr<Block> raw_block;
{ {
StopWatch sw(rep->ioptions.env, statistics, READ_BLOCK_GET_MICROS); StopWatch sw(rep->ioptions.env, statistics, READ_BLOCK_GET_MICROS);
s = ReadBlockFromFile( s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle,
rep->file.get(), rep->footer, ro, handle, &raw_block, rep->ioptions, &raw_block, rep->ioptions,
block_cache_compressed == nullptr, compression_dict, block_cache_compressed == nullptr,
rep->persistent_cache_options, rep->global_seqno, compression_dict, rep->persistent_cache_options,
rep->table_options.read_amp_bytes_per_bit); rep->table_options.read_amp_bytes_per_bit);
} }
if (s.ok()) { if (s.ok()) {
...@@ -1311,10 +1250,10 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator( ...@@ -1311,10 +1250,10 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
} }
} }
std::unique_ptr<Block> block_value; std::unique_ptr<Block> block_value;
s = ReadBlockFromFile( s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle,
rep->file.get(), rep->footer, ro, handle, &block_value, rep->ioptions, &block_value, rep->ioptions, true /* compress */,
true /* compress */, compression_dict, rep->persistent_cache_options, compression_dict, rep->persistent_cache_options,
rep->global_seqno, rep->table_options.read_amp_bytes_per_bit); rep->table_options.read_amp_bytes_per_bit);
if (s.ok()) { if (s.ok()) {
block.value = block_value.release(); block.value = block_value.release();
} }
......
...@@ -95,7 +95,7 @@ TEST_F(BlockTest, SimpleTest) { ...@@ -95,7 +95,7 @@ TEST_F(BlockTest, SimpleTest) {
BlockContents contents; BlockContents contents;
contents.data = rawblock; contents.data = rawblock;
contents.cachable = false; contents.cachable = false;
Block reader(std::move(contents), kDisableGlobalSequenceNumber); Block reader(std::move(contents));
// read contents of block sequentially // read contents of block sequentially
int count = 0; int count = 0;
...@@ -156,8 +156,8 @@ void CheckBlockContents(BlockContents contents, const int max_key, ...@@ -156,8 +156,8 @@ void CheckBlockContents(BlockContents contents, const int max_key,
// create block reader // create block reader
BlockContents contents_ref(contents.data, contents.cachable, BlockContents contents_ref(contents.data, contents.cachable,
contents.compression_type); contents.compression_type);
Block reader1(std::move(contents), kDisableGlobalSequenceNumber); Block reader1(std::move(contents));
Block reader2(std::move(contents_ref), kDisableGlobalSequenceNumber); Block reader2(std::move(contents_ref));
std::unique_ptr<const SliceTransform> prefix_extractor( std::unique_ptr<const SliceTransform> prefix_extractor(
NewFixedPrefixTransform(prefix_size)); NewFixedPrefixTransform(prefix_size));
...@@ -358,8 +358,7 @@ TEST_F(BlockTest, BlockWithReadAmpBitmap) { ...@@ -358,8 +358,7 @@ TEST_F(BlockTest, BlockWithReadAmpBitmap) {
BlockContents contents; BlockContents contents;
contents.data = rawblock; contents.data = rawblock;
contents.cachable = true; contents.cachable = true;
Block reader(std::move(contents), kDisableGlobalSequenceNumber, Block reader(std::move(contents), kBytesPerBit, stats.get());
kBytesPerBit, stats.get());
// read contents of block sequentially // read contents of block sequentially
size_t read_bytes = 0; size_t read_bytes = 0;
...@@ -392,8 +391,7 @@ TEST_F(BlockTest, BlockWithReadAmpBitmap) { ...@@ -392,8 +391,7 @@ TEST_F(BlockTest, BlockWithReadAmpBitmap) {
BlockContents contents; BlockContents contents;
contents.data = rawblock; contents.data = rawblock;
contents.cachable = true; contents.cachable = true;
Block reader(std::move(contents), kDisableGlobalSequenceNumber, Block reader(std::move(contents), kBytesPerBit, stats.get());
kBytesPerBit, stats.get());
size_t read_bytes = 0; size_t read_bytes = 0;
BlockIter *iter = static_cast<BlockIter *>( BlockIter *iter = static_cast<BlockIter *>(
...@@ -428,8 +426,7 @@ TEST_F(BlockTest, BlockWithReadAmpBitmap) { ...@@ -428,8 +426,7 @@ TEST_F(BlockTest, BlockWithReadAmpBitmap) {
BlockContents contents; BlockContents contents;
contents.data = rawblock; contents.data = rawblock;
contents.cachable = true; contents.cachable = true;
Block reader(std::move(contents), kDisableGlobalSequenceNumber, Block reader(std::move(contents), kBytesPerBit, stats.get());
kBytesPerBit, stats.get());
size_t read_bytes = 0; size_t read_bytes = 0;
BlockIter *iter = static_cast<BlockIter *>( BlockIter *iter = static_cast<BlockIter *>(
......
...@@ -175,10 +175,9 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, ...@@ -175,10 +175,9 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
return s; return s;
} }
Block properties_block(std::move(block_contents), Block properties_block(std::move(block_contents));
kDisableGlobalSequenceNumber); std::unique_ptr<InternalIterator> iter(
BlockIter iter; properties_block.NewIterator(BytewiseComparator()));
properties_block.NewIterator(BytewiseComparator(), &iter);
auto new_table_properties = new TableProperties(); auto new_table_properties = new TableProperties();
// All pre-defined properties of type uint64_t // All pre-defined properties of type uint64_t
...@@ -201,24 +200,21 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, ...@@ -201,24 +200,21 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
}; };
std::string last_key; std::string last_key;
for (iter.SeekToFirst(); iter.Valid(); iter.Next()) { for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
s = iter.status(); s = iter->status();
if (!s.ok()) { if (!s.ok()) {
break; break;
} }
auto key = iter.key().ToString(); auto key = iter->key().ToString();
// properties block is strictly sorted with no duplicate key. // properties block is strictly sorted with no duplicate key.
assert(last_key.empty() || assert(last_key.empty() ||
BytewiseComparator()->Compare(key, last_key) > 0); BytewiseComparator()->Compare(key, last_key) > 0);
last_key = key; last_key = key;
auto raw_val = iter.value(); auto raw_val = iter->value();
auto pos = predefined_uint64_properties.find(key); auto pos = predefined_uint64_properties.find(key);
new_table_properties->properties_offsets.insert(
{key, handle.offset() + iter.ValueOffset()});
if (pos != predefined_uint64_properties.end()) { if (pos != predefined_uint64_properties.end()) {
// handle predefined rocksdb properties // handle predefined rocksdb properties
uint64_t val; uint64_t val;
...@@ -281,8 +277,7 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size, ...@@ -281,8 +277,7 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
Block metaindex_block(std::move(metaindex_contents), Block metaindex_block(std::move(metaindex_contents));
kDisableGlobalSequenceNumber);
std::unique_ptr<InternalIterator> meta_iter( std::unique_ptr<InternalIterator> meta_iter(
metaindex_block.NewIterator(BytewiseComparator())); metaindex_block.NewIterator(BytewiseComparator()));
...@@ -336,8 +331,7 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size, ...@@ -336,8 +331,7 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
Block metaindex_block(std::move(metaindex_contents), Block metaindex_block(std::move(metaindex_contents));
kDisableGlobalSequenceNumber);
std::unique_ptr<InternalIterator> meta_iter; std::unique_ptr<InternalIterator> meta_iter;
meta_iter.reset(metaindex_block.NewIterator(BytewiseComparator())); meta_iter.reset(metaindex_block.NewIterator(BytewiseComparator()));
...@@ -370,8 +364,7 @@ Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size, ...@@ -370,8 +364,7 @@ Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
} }
// Finding metablock // Finding metablock
Block metaindex_block(std::move(metaindex_contents), Block metaindex_block(std::move(metaindex_contents));
kDisableGlobalSequenceNumber);
std::unique_ptr<InternalIterator> meta_iter; std::unique_ptr<InternalIterator> meta_iter;
meta_iter.reset(metaindex_block.NewIterator(BytewiseComparator())); meta_iter.reset(metaindex_block.NewIterator(BytewiseComparator()));
......
...@@ -9,15 +9,66 @@ ...@@ -9,15 +9,66 @@
#include "db/dbformat.h" #include "db/dbformat.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "table/block_based_table_builder.h" #include "table/block_based_table_builder.h"
#include "table/sst_file_writer_collectors.h"
#include "util/file_reader_writer.h" #include "util/file_reader_writer.h"
#include "util/string_util.h"
namespace rocksdb { namespace rocksdb {
const std::string ExternalSstFilePropertyNames::kVersion = const std::string ExternalSstFilePropertyNames::kVersion =
"rocksdb.external_sst_file.version"; "rocksdb.external_sst_file.version";
const std::string ExternalSstFilePropertyNames::kGlobalSeqno =
"rocksdb.external_sst_file.global_seqno"; // PropertiesCollector used to add properties specific to tables
// generated by SstFileWriter
class SstFileWriter::SstFileWriterPropertiesCollector
: public IntTblPropCollector {
public:
explicit SstFileWriterPropertiesCollector(int32_t version)
: version_(version) {}
virtual Status InternalAdd(const Slice& key, const Slice& value,
uint64_t file_size) override {
// Intentionally left blank. Have no interest in collecting stats for
// individual key/value pairs.
return Status::OK();
}
virtual Status Finish(UserCollectedProperties* properties) override {
std::string version_val;
PutFixed32(&version_val, static_cast<int32_t>(version_));
properties->insert({ExternalSstFilePropertyNames::kVersion, version_val});
return Status::OK();
}
virtual const char* Name() const override {
return "SstFileWriterPropertiesCollector";
}
virtual UserCollectedProperties GetReadableProperties() const override {
return {{ExternalSstFilePropertyNames::kVersion, ToString(version_)}};
}
private:
int32_t version_;
};
class SstFileWriter::SstFileWriterPropertiesCollectorFactory
: public IntTblPropCollectorFactory {
public:
explicit SstFileWriterPropertiesCollectorFactory(int32_t version)
: version_(version) {}
virtual IntTblPropCollector* CreateIntTblPropCollector(
uint32_t column_family_id) override {
return new SstFileWriterPropertiesCollector(version_);
}
virtual const char* Name() const override {
return "SstFileWriterPropertiesCollector";
}
private:
int32_t version_;
};
struct SstFileWriter::Rep { struct SstFileWriter::Rep {
Rep(const EnvOptions& _env_options, const Options& options, Rep(const EnvOptions& _env_options, const Options& options,
...@@ -69,8 +120,7 @@ Status SstFileWriter::Open(const std::string& file_path) { ...@@ -69,8 +120,7 @@ Status SstFileWriter::Open(const std::string& file_path) {
// SstFileWriter properties collector to add SstFileWriter version. // SstFileWriter properties collector to add SstFileWriter version.
int_tbl_prop_collector_factories.emplace_back( int_tbl_prop_collector_factories.emplace_back(
new SstFileWriterPropertiesCollectorFactory(2 /* version */, new SstFileWriterPropertiesCollectorFactory(1 /* version */));
0 /* global_seqno*/));
// User collector factories // User collector factories
auto user_collector_factories = auto user_collector_factories =
...@@ -88,9 +138,6 @@ Status SstFileWriter::Open(const std::string& file_path) { ...@@ -88,9 +138,6 @@ Status SstFileWriter::Open(const std::string& file_path) {
r->column_family_name, unknown_level); r->column_family_name, unknown_level);
r->file_writer.reset( r->file_writer.reset(
new WritableFileWriter(std::move(sst_file), r->env_options)); new WritableFileWriter(std::move(sst_file), r->env_options));
// TODO(tec) : If table_factory is using compressed block cache, we will
// be adding the external sst file blocks into it, which is wasteful.
r->builder.reset(r->ioptions.table_factory->NewTableBuilder( r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
table_builder_options, table_builder_options,
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
...@@ -100,7 +147,7 @@ Status SstFileWriter::Open(const std::string& file_path) { ...@@ -100,7 +147,7 @@ Status SstFileWriter::Open(const std::string& file_path) {
r->file_info.file_size = 0; r->file_info.file_size = 0;
r->file_info.num_entries = 0; r->file_info.num_entries = 0;
r->file_info.sequence_number = 0; r->file_info.sequence_number = 0;
r->file_info.version = 2; r->file_info.version = 1;
return s; return s;
} }
...@@ -125,7 +172,6 @@ Status SstFileWriter::Add(const Slice& user_key, const Slice& value) { ...@@ -125,7 +172,6 @@ Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
r->file_info.largest_key.assign(user_key.data(), user_key.size()); r->file_info.largest_key.assign(user_key.data(), user_key.size());
r->file_info.file_size = r->builder->FileSize(); r->file_info.file_size = r->builder->FileSize();
// TODO(tec) : For external SST files we could omit the seqno and type.
r->ikey.Set(user_key, 0 /* Sequence Number */, r->ikey.Set(user_key, 0 /* Sequence Number */,
ValueType::kTypeValue /* Put */); ValueType::kTypeValue /* Put */);
r->builder->Add(r->ikey.Encode(), value); r->builder->Add(r->ikey.Encode(), value);
......
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <string>
#include "rocksdb/types.h"
#include "util/string_util.h"
namespace rocksdb {
// Table Properties that are specific to tables created by SstFileWriter.
struct ExternalSstFilePropertyNames {
// value of this property is a fixed uint32 number.
static const std::string kVersion;
// value of this property is a fixed uint64 number.
static const std::string kGlobalSeqno;
};
// PropertiesCollector used to add properties specific to tables
// generated by SstFileWriter
class SstFileWriterPropertiesCollector : public IntTblPropCollector {
public:
explicit SstFileWriterPropertiesCollector(int32_t version,
SequenceNumber global_seqno)
: version_(version), global_seqno_(global_seqno) {}
virtual Status InternalAdd(const Slice& key, const Slice& value,
uint64_t file_size) override {
// Intentionally left blank. Have no interest in collecting stats for
// individual key/value pairs.
return Status::OK();
}
virtual Status Finish(UserCollectedProperties* properties) override {
// File version
std::string version_val;
PutFixed32(&version_val, static_cast<uint32_t>(version_));
properties->insert({ExternalSstFilePropertyNames::kVersion, version_val});
// Global Sequence number
std::string seqno_val;
PutFixed64(&seqno_val, static_cast<uint64_t>(global_seqno_));
properties->insert({ExternalSstFilePropertyNames::kGlobalSeqno, seqno_val});
return Status::OK();
}
virtual const char* Name() const override {
return "SstFileWriterPropertiesCollector";
}
virtual UserCollectedProperties GetReadableProperties() const override {
return {{ExternalSstFilePropertyNames::kVersion, ToString(version_)}};
}
private:
int32_t version_;
SequenceNumber global_seqno_;
};
class SstFileWriterPropertiesCollectorFactory
: public IntTblPropCollectorFactory {
public:
explicit SstFileWriterPropertiesCollectorFactory(int32_t version,
SequenceNumber global_seqno)
: version_(version), global_seqno_(global_seqno) {}
virtual IntTblPropCollector* CreateIntTblPropCollector(
uint32_t column_family_id) override {
return new SstFileWriterPropertiesCollector(version_, global_seqno_);
}
virtual const char* Name() const override {
return "SstFileWriterPropertiesCollector";
}
private:
int32_t version_;
SequenceNumber global_seqno_;
};
} // namespace rocksdb
...@@ -41,7 +41,6 @@ ...@@ -41,7 +41,6 @@
#include "table/meta_blocks.h" #include "table/meta_blocks.h"
#include "table/plain_table_factory.h" #include "table/plain_table_factory.h"
#include "table/scoped_arena_iterator.h" #include "table/scoped_arena_iterator.h"
#include "table/sst_file_writer_collectors.h"
#include "util/compression.h" #include "util/compression.h"
#include "util/random.h" #include "util/random.h"
#include "util/statistics.h" #include "util/statistics.h"
...@@ -223,7 +222,7 @@ class BlockConstructor: public Constructor { ...@@ -223,7 +222,7 @@ class BlockConstructor: public Constructor {
BlockContents contents; BlockContents contents;
contents.data = data_; contents.data = data_;
contents.cachable = false; contents.cachable = false;
block_ = new Block(std::move(contents), kDisableGlobalSequenceNumber); block_ = new Block(std::move(contents));
return Status::OK(); return Status::OK();
} }
virtual InternalIterator* NewIterator() const override { virtual InternalIterator* NewIterator() const override {
...@@ -2746,183 +2745,6 @@ TEST_F(PrefixTest, PrefixAndWholeKeyTest) { ...@@ -2746,183 +2745,6 @@ TEST_F(PrefixTest, PrefixAndWholeKeyTest) {
// rocksdb still works. // rocksdb still works.
} }
TEST_F(BlockBasedTableTest, TableWithGlobalSeqno) {
BlockBasedTableOptions bbto;
test::StringSink* sink = new test::StringSink();
unique_ptr<WritableFileWriter> file_writer(test::GetWritableFileWriter(sink));
Options options;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
const ImmutableCFOptions ioptions(options);
InternalKeyComparator ikc(options.comparator);
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories;
int_tbl_prop_collector_factories.emplace_back(
new SstFileWriterPropertiesCollectorFactory(2 /* version */,
0 /* global_seqno*/));
std::string column_family_name;
std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, ikc, &int_tbl_prop_collector_factories,
kNoCompression, CompressionOptions(),
nullptr /* compression_dict */,
false /* skip_filters */, column_family_name, -1),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));
for (char c = 'a'; c <= 'z'; ++c) {
std::string key(8, c);
std::string value = key;
InternalKey ik(key, 0, kTypeValue);
builder->Add(ik.Encode(), value);
}
ASSERT_OK(builder->Finish());
file_writer->Flush();
test::RandomRWStringSink ss_rw(sink);
uint32_t version;
uint64_t global_seqno;
uint64_t global_seqno_offset;
// Helper function to get version, global_seqno, global_seqno_offset
std::function<void()> GetVersionAndGlobalSeqno = [&]() {
unique_ptr<RandomAccessFileReader> file_reader(
test::GetRandomAccessFileReader(
new test::StringSource(ss_rw.contents(), 73342, true)));
TableProperties* props = nullptr;
ASSERT_OK(ReadTableProperties(file_reader.get(), ss_rw.contents().size(),
kBlockBasedTableMagicNumber, ioptions,
&props));
UserCollectedProperties user_props = props->user_collected_properties;
version = DecodeFixed32(
user_props[ExternalSstFilePropertyNames::kVersion].c_str());
global_seqno = DecodeFixed64(
user_props[ExternalSstFilePropertyNames::kGlobalSeqno].c_str());
global_seqno_offset =
props->properties_offsets[ExternalSstFilePropertyNames::kGlobalSeqno];
delete props;
};
// Helper function to update the value of the global seqno in the file
std::function<void(uint64_t)> SetGlobalSeqno = [&](uint64_t val) {
std::string new_global_seqno;
PutFixed64(&new_global_seqno, val);
ASSERT_OK(ss_rw.Write(global_seqno_offset, new_global_seqno));
};
// Helper function to get the contents of the table InternalIterator
unique_ptr<TableReader> table_reader;
std::function<InternalIterator*()> GetTableInternalIter = [&]() {
unique_ptr<RandomAccessFileReader> file_reader(
test::GetRandomAccessFileReader(
new test::StringSource(ss_rw.contents(), 73342, true)));
options.table_factory->NewTableReader(
TableReaderOptions(ioptions, EnvOptions(), ikc), std::move(file_reader),
ss_rw.contents().size(), &table_reader);
return table_reader->NewIterator(ReadOptions());
};
GetVersionAndGlobalSeqno();
ASSERT_EQ(2, version);
ASSERT_EQ(0, global_seqno);
InternalIterator* iter = GetTableInternalIter();
char current_c = 'a';
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ParsedInternalKey pik;
ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
ASSERT_EQ(pik.type, ValueType::kTypeValue);
ASSERT_EQ(pik.sequence, 0);
ASSERT_EQ(pik.user_key, iter->value());
ASSERT_EQ(pik.user_key.ToString(), std::string(8, current_c));
current_c++;
}
ASSERT_EQ(current_c, 'z' + 1);
delete iter;
// Update global sequence number to 10
SetGlobalSeqno(10);
GetVersionAndGlobalSeqno();
ASSERT_EQ(2, version);
ASSERT_EQ(10, global_seqno);
iter = GetTableInternalIter();
current_c = 'a';
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ParsedInternalKey pik;
ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
ASSERT_EQ(pik.type, ValueType::kTypeValue);
ASSERT_EQ(pik.sequence, 10);
ASSERT_EQ(pik.user_key, iter->value());
ASSERT_EQ(pik.user_key.ToString(), std::string(8, current_c));
current_c++;
}
ASSERT_EQ(current_c, 'z' + 1);
// Verify Seek
for (char c = 'a'; c <= 'z'; c++) {
std::string k = std::string(8, c);
InternalKey ik(k, 10, kValueTypeForSeek);
iter->Seek(ik.Encode());
ASSERT_TRUE(iter->Valid());
ParsedInternalKey pik;
ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
ASSERT_EQ(pik.type, ValueType::kTypeValue);
ASSERT_EQ(pik.sequence, 10);
ASSERT_EQ(pik.user_key.ToString(), k);
ASSERT_EQ(iter->value().ToString(), k);
}
delete iter;
// Update global sequence number to 3
SetGlobalSeqno(3);
GetVersionAndGlobalSeqno();
ASSERT_EQ(2, version);
ASSERT_EQ(3, global_seqno);
iter = GetTableInternalIter();
current_c = 'a';
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ParsedInternalKey pik;
ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
ASSERT_EQ(pik.type, ValueType::kTypeValue);
ASSERT_EQ(pik.sequence, 3);
ASSERT_EQ(pik.user_key, iter->value());
ASSERT_EQ(pik.user_key.ToString(), std::string(8, current_c));
current_c++;
}
ASSERT_EQ(current_c, 'z' + 1);
// Verify Seek
for (char c = 'a'; c <= 'z'; c++) {
std::string k = std::string(8, c);
// seqno=4 is less than 3 so we still should get our key
InternalKey ik(k, 4, kValueTypeForSeek);
iter->Seek(ik.Encode());
ASSERT_TRUE(iter->Valid());
ParsedInternalKey pik;
ASSERT_TRUE(ParseInternalKey(iter->key(), &pik));
ASSERT_EQ(pik.type, ValueType::kTypeValue);
ASSERT_EQ(pik.sequence, 3);
ASSERT_EQ(pik.user_key.ToString(), k);
ASSERT_EQ(iter->value().ToString(), k);
}
delete iter;
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
#include "util/string_util.h" #include "util/string_util.h"
#include "util/sync_point.h" #include "util/sync_point.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "util/testutil.h"
namespace rocksdb { namespace rocksdb {
...@@ -35,11 +34,26 @@ class DeleteSchedulerTest : public testing::Test { ...@@ -35,11 +34,26 @@ class DeleteSchedulerTest : public testing::Test {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->LoadDependency({}); rocksdb::SyncPoint::GetInstance()->LoadDependency({});
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
test::DestroyDir(env_, dummy_files_dir_); DestroyDir(dummy_files_dir_);
}
void DestroyDir(const std::string& dir) {
if (env_->FileExists(dir).IsNotFound()) {
return;
}
std::vector<std::string> files_in_dir;
EXPECT_OK(env_->GetChildren(dir, &files_in_dir));
for (auto& file_in_dir : files_in_dir) {
if (file_in_dir == "." || file_in_dir == "..") {
continue;
}
EXPECT_OK(env_->DeleteFile(dir + "/" + file_in_dir));
}
EXPECT_OK(env_->DeleteDir(dir));
} }
void DestroyAndCreateDir(const std::string& dir) { void DestroyAndCreateDir(const std::string& dir) {
ASSERT_OK(test::DestroyDir(env_, dir)); DestroyDir(dir);
EXPECT_OK(env_->CreateDir(dir)); EXPECT_OK(env_->CreateDir(dir));
} }
...@@ -409,7 +423,7 @@ TEST_F(DeleteSchedulerTest, MoveToTrashError) { ...@@ -409,7 +423,7 @@ TEST_F(DeleteSchedulerTest, MoveToTrashError) {
// We will delete the trash directory, that mean that DeleteScheduler wont // We will delete the trash directory, that mean that DeleteScheduler wont
// be able to move files to trash and will delete files them immediately. // be able to move files to trash and will delete files them immediately.
ASSERT_OK(test::DestroyDir(env_, trash_dir_)); DestroyDir(trash_dir_);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
std::string file_name = "data_" + ToString(i) + ".data"; std::string file_name = "data_" + ToString(i) + ".data";
ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name))); ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name)));
......
...@@ -240,43 +240,6 @@ class StringSink: public WritableFile { ...@@ -240,43 +240,6 @@ class StringSink: public WritableFile {
size_t last_flush_; size_t last_flush_;
}; };
// A wrapper around a StringSink to give it a RandomRWFile interface
class RandomRWStringSink : public RandomRWFile {
public:
explicit RandomRWStringSink(StringSink* ss) : ss_(ss) {}
Status Write(uint64_t offset, const Slice& data) {
if (offset + data.size() > ss_->contents_.size()) {
ss_->contents_.resize(offset + data.size(), '\0');
}
char* pos = const_cast<char*>(ss_->contents_.data() + offset);
memcpy(pos, data.data(), data.size());
return Status::OK();
}
Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const {
*result = Slice(nullptr, 0);
if (offset < ss_->contents_.size()) {
size_t str_res_sz =
std::min(static_cast<size_t>(ss_->contents_.size() - offset), n);
*result = Slice(ss_->contents_.data() + offset, str_res_sz);
}
return Status::OK();
}
Status Flush() { return Status::OK(); }
Status Sync() { return Status::OK(); }
Status Close() { return Status::OK(); }
const std::string& contents() const { return ss_->contents(); }
private:
StringSink* ss_;
};
// Like StringSink, this writes into a string. Unlink StringSink, it // Like StringSink, this writes into a string. Unlink StringSink, it
// has some initial content and overwrites it, just like a recycled // has some initial content and overwrites it, just like a recycled
// log file. // log file.
......
...@@ -190,7 +190,7 @@ Status BlobDB::Get(const ReadOptions& options, const Slice& key, ...@@ -190,7 +190,7 @@ Status BlobDB::Get(const ReadOptions& options, const Slice& key,
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
Block block(std::move(contents), kDisableGlobalSequenceNumber); Block block(std::move(contents));
BlockIter bit; BlockIter bit;
InternalIterator* it = block.NewIterator(nullptr, &bit); InternalIterator* it = block.NewIterator(nullptr, &bit);
it->SeekToFirst(); it->SeekToFirst();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册