提交 27b22f13 编写于 作者: I Igor Canadi

Merge pull request #249 from tdfischer/decompression-refactoring

Decompression refactoring
...@@ -299,10 +299,7 @@ uint32_t Block::NumRestarts() const { ...@@ -299,10 +299,7 @@ uint32_t Block::NumRestarts() const {
Block::Block(const BlockContents& contents) Block::Block(const BlockContents& contents)
: data_(contents.data.data()), : data_(contents.data.data()),
size_(contents.data.size()), size_(contents.data.size()) {
owned_(contents.heap_allocated),
cachable_(contents.cachable),
compression_type_(contents.compression_type) {
if (size_ < sizeof(uint32_t)) { if (size_ < sizeof(uint32_t)) {
size_ = 0; // Error marker size_ = 0; // Error marker
} else { } else {
...@@ -315,10 +312,8 @@ Block::Block(const BlockContents& contents) ...@@ -315,10 +312,8 @@ Block::Block(const BlockContents& contents)
} }
} }
Block::~Block() { Block::Block(BlockContents&& contents) : Block(contents) {
if (owned_) { contents_ = std::move(contents);
delete[] data_;
}
} }
Iterator* Block::NewIterator( Iterator* Block::NewIterator(
......
...@@ -14,6 +14,10 @@ ...@@ -14,6 +14,10 @@
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "table/block_prefix_index.h"
#include "table/block_hash_index.h"
#include "format.h"
namespace rocksdb { namespace rocksdb {
...@@ -26,15 +30,16 @@ class BlockPrefixIndex; ...@@ -26,15 +30,16 @@ class BlockPrefixIndex;
class Block { class Block {
public: public:
// Initialize the block with the specified contents. // Initialize the block with the specified contents.
explicit Block(BlockContents&& contents);
explicit Block(const BlockContents& contents); explicit Block(const BlockContents& contents);
~Block(); ~Block() = default;
size_t size() const { return size_; } size_t size() const { return size_; }
const char* data() const { return data_; } const char* data() const { return data_; }
bool cachable() const { return cachable_; } bool cachable() const { return contents_.cachable; }
uint32_t NumRestarts() const; uint32_t NumRestarts() const;
CompressionType compression_type() const { return compression_type_; } CompressionType compression_type() const { return contents_.compression_type; }
// If hash index lookup is enabled and `use_hash_index` is true. This block // If hash index lookup is enabled and `use_hash_index` is true. This block
// will do hash lookup for the key prefix. // will do hash lookup for the key prefix.
...@@ -58,12 +63,10 @@ class Block { ...@@ -58,12 +63,10 @@ class Block {
size_t ApproximateMemoryUsage() const; size_t ApproximateMemoryUsage() const;
private: private:
BlockContents contents_;
const char* data_; const char* data_;
size_t size_; size_t size_;
uint32_t restart_offset_; // Offset in data_ of restart array uint32_t restart_offset_; // Offset in data_ of restart array
bool owned_; // Block owns data_[]
bool cachable_;
CompressionType compression_type_;
std::unique_ptr<BlockHashIndex> hash_index_; std::unique_ptr<BlockHashIndex> hash_index_;
std::unique_ptr<BlockPrefixIndex> prefix_index_; std::unique_ptr<BlockPrefixIndex> prefix_index_;
......
...@@ -138,7 +138,7 @@ void BlockBasedFilterBlockBuilder::GenerateFilter() { ...@@ -138,7 +138,7 @@ void BlockBasedFilterBlockBuilder::GenerateFilter() {
BlockBasedFilterBlockReader::BlockBasedFilterBlockReader( BlockBasedFilterBlockReader::BlockBasedFilterBlockReader(
const SliceTransform* prefix_extractor, const SliceTransform* prefix_extractor,
const BlockBasedTableOptions& table_opt, const BlockBasedTableOptions& table_opt,
const Slice& contents, bool delete_contents_after_use) const Slice& contents)
: policy_(table_opt.filter_policy.get()), : policy_(table_opt.filter_policy.get()),
prefix_extractor_(prefix_extractor), prefix_extractor_(prefix_extractor),
whole_key_filtering_(table_opt.whole_key_filtering), whole_key_filtering_(table_opt.whole_key_filtering),
...@@ -155,9 +155,14 @@ BlockBasedFilterBlockReader::BlockBasedFilterBlockReader( ...@@ -155,9 +155,14 @@ BlockBasedFilterBlockReader::BlockBasedFilterBlockReader(
data_ = contents.data(); data_ = contents.data();
offset_ = data_ + last_word; offset_ = data_ + last_word;
num_ = (n - 5 - last_word) / 4; num_ = (n - 5 - last_word) / 4;
if (delete_contents_after_use) { }
filter_data.reset(contents.data());
} BlockBasedFilterBlockReader::BlockBasedFilterBlockReader(
const SliceTransform* prefix_extractor,
const BlockBasedTableOptions& table_opt,
BlockContents &&contents)
: BlockBasedFilterBlockReader (prefix_extractor, table_opt, contents.data) {
contents_ = std::move(contents);
} }
bool BlockBasedFilterBlockReader::KeyMayMatch(const Slice& key, bool BlockBasedFilterBlockReader::KeyMayMatch(const Slice& key,
......
...@@ -74,8 +74,10 @@ class BlockBasedFilterBlockReader : public FilterBlockReader { ...@@ -74,8 +74,10 @@ class BlockBasedFilterBlockReader : public FilterBlockReader {
// REQUIRES: "contents" and *policy must stay live while *this is live. // REQUIRES: "contents" and *policy must stay live while *this is live.
BlockBasedFilterBlockReader(const SliceTransform* prefix_extractor, BlockBasedFilterBlockReader(const SliceTransform* prefix_extractor,
const BlockBasedTableOptions& table_opt, const BlockBasedTableOptions& table_opt,
const Slice& contents, const Slice& contents);
bool delete_contents_after_use = false); BlockBasedFilterBlockReader(const SliceTransform* prefix_extractor,
const BlockBasedTableOptions& table_opt,
BlockContents&& contents);
virtual bool IsBlockBased() override { return true; } virtual bool IsBlockBased() override { return true; }
virtual bool KeyMayMatch(const Slice& key, virtual bool KeyMayMatch(const Slice& key,
uint64_t block_offset = kNotValid) override; uint64_t block_offset = kNotValid) override;
...@@ -91,7 +93,7 @@ class BlockBasedFilterBlockReader : public FilterBlockReader { ...@@ -91,7 +93,7 @@ class BlockBasedFilterBlockReader : public FilterBlockReader {
const char* offset_; // Pointer to beginning of offset array (at block-end) const char* offset_; // Pointer to beginning of offset array (at block-end)
size_t num_; // Number of entries in offset array size_t num_; // Number of entries in offset array
size_t base_lg_; // Encoding parameter (see kFilterBaseLg in .cc file) size_t base_lg_; // Encoding parameter (see kFilterBaseLg in .cc file)
std::unique_ptr<const char[]> filter_data; BlockContents contents_;
bool MayMatch(const Slice& entry, uint64_t block_offset); bool MayMatch(const Slice& entry, uint64_t block_offset);
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include <utility>
#include "db/dbformat.h" #include "db/dbformat.h"
...@@ -634,18 +635,13 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents, ...@@ -634,18 +635,13 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
Cache::Handle* cache_handle = nullptr; Cache::Handle* cache_handle = nullptr;
size_t size = block_contents.size(); size_t size = block_contents.size();
char* ubuf = new char[size + 1]; // make a new copy std::unique_ptr<char[]> ubuf(new char[size+1]);
memcpy(ubuf, block_contents.data(), size); memcpy(ubuf.get(), block_contents.data(), size);
ubuf[size] = type; ubuf[size] = type;
BlockContents results; BlockContents results(std::move(ubuf), size, true, type);
Slice sl(ubuf, size);
results.data = sl;
results.cachable = true; // XXX
results.heap_allocated = true;
results.compression_type = type;
Block* block = new Block(results); Block* block = new Block(std::move(results));
// make cache key by appending the file offset to the cache prefix id // make cache key by appending the file offset to the cache prefix id
char* end = EncodeVarint64( char* end = EncodeVarint64(
......
...@@ -66,7 +66,7 @@ Status ReadBlockFromFile(RandomAccessFile* file, const Footer& footer, ...@@ -66,7 +66,7 @@ Status ReadBlockFromFile(RandomAccessFile* file, const Footer& footer,
Status s = ReadBlockContents(file, footer, options, handle, &contents, env, Status s = ReadBlockContents(file, footer, options, handle, &contents, env,
do_uncompress); do_uncompress);
if (s.ok()) { if (s.ok()) {
*result = new Block(contents); *result = new Block(std::move(contents));
} }
return s; return s;
...@@ -252,9 +252,6 @@ class HashIndexReader : public IndexReader { ...@@ -252,9 +252,6 @@ class HashIndexReader : public IndexReader {
&prefixes_meta_contents, env, &prefixes_meta_contents, env,
true /* do decompression */); true /* do decompression */);
if (!s.ok()) { if (!s.ok()) {
if (prefixes_contents.heap_allocated) {
delete[] prefixes_contents.data.data();
}
// TODO: log error // TODO: log error
return Status::OK(); return Status::OK();
} }
...@@ -269,7 +266,7 @@ class HashIndexReader : public IndexReader { ...@@ -269,7 +266,7 @@ class HashIndexReader : public IndexReader {
// TODO: log error // TODO: log error
if (s.ok()) { if (s.ok()) {
new_index_reader->index_block_->SetBlockHashIndex(hash_index); new_index_reader->index_block_->SetBlockHashIndex(hash_index);
new_index_reader->OwnPrefixesContents(prefixes_contents); new_index_reader->OwnPrefixesContents(std::move(prefixes_contents));
} }
} else { } else {
BlockPrefixIndex* prefix_index = nullptr; BlockPrefixIndex* prefix_index = nullptr;
...@@ -283,18 +280,6 @@ class HashIndexReader : public IndexReader { ...@@ -283,18 +280,6 @@ class HashIndexReader : public IndexReader {
} }
} }
// Always release prefix meta block
if (prefixes_meta_contents.heap_allocated) {
delete[] prefixes_meta_contents.data.data();
}
// Release prefix content block if we don't own it.
if (!new_index_reader->own_prefixes_contents_) {
if (prefixes_contents.heap_allocated) {
delete[] prefixes_contents.data.data();
}
}
return Status::OK(); return Status::OK();
} }
...@@ -314,24 +299,18 @@ class HashIndexReader : public IndexReader { ...@@ -314,24 +299,18 @@ class HashIndexReader : public IndexReader {
private: private:
HashIndexReader(const Comparator* comparator, Block* index_block) HashIndexReader(const Comparator* comparator, Block* index_block)
: IndexReader(comparator), : IndexReader(comparator),
index_block_(index_block), index_block_(index_block) {
own_prefixes_contents_(false) {
assert(index_block_ != nullptr); assert(index_block_ != nullptr);
} }
~HashIndexReader() { ~HashIndexReader() {
if (own_prefixes_contents_ && prefixes_contents_.heap_allocated) {
delete[] prefixes_contents_.data.data();
}
} }
void OwnPrefixesContents(const BlockContents& prefixes_contents) { void OwnPrefixesContents(BlockContents&& prefixes_contents) {
prefixes_contents_ = prefixes_contents; prefixes_contents_ = std::move(prefixes_contents);
own_prefixes_contents_ = true;
} }
std::unique_ptr<Block> index_block_; std::unique_ptr<Block> index_block_;
bool own_prefixes_contents_;
BlockContents prefixes_contents_; BlockContents prefixes_contents_;
}; };
...@@ -677,7 +656,7 @@ Status BlockBasedTable::GetDataBlockFromCache( ...@@ -677,7 +656,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
// Insert uncompressed block into block cache // Insert uncompressed block into block cache
if (s.ok()) { if (s.ok()) {
block->value = new Block(contents); // uncompressed block block->value = new Block(std::move(contents)); // uncompressed block
assert(block->value->compression_type() == kNoCompression); assert(block->value->compression_type() == kNoCompression);
if (block_cache != nullptr && block->value->cachable() && if (block_cache != nullptr && block->value->cachable() &&
read_options.fill_cache) { read_options.fill_cache) {
...@@ -715,7 +694,7 @@ Status BlockBasedTable::PutDataBlockToCache( ...@@ -715,7 +694,7 @@ Status BlockBasedTable::PutDataBlockToCache(
} }
if (raw_block->compression_type() != kNoCompression) { if (raw_block->compression_type() != kNoCompression) {
block->value = new Block(contents); // uncompressed block block->value = new Block(std::move(contents)); // uncompressed block
} else { } else {
block->value = raw_block; block->value = raw_block;
raw_block = nullptr; raw_block = nullptr;
...@@ -768,15 +747,14 @@ FilterBlockReader* BlockBasedTable::ReadFilter( ...@@ -768,15 +747,14 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
assert(rep->filter_policy); assert(rep->filter_policy);
if (kFilterBlockPrefix == filter_block_prefix) { if (kFilterBlockPrefix == filter_block_prefix) {
return new BlockBasedFilterBlockReader(rep->ioptions.prefix_extractor, return new BlockBasedFilterBlockReader(rep->ioptions.prefix_extractor,
rep->table_options, block.data, block.heap_allocated); rep->table_options, std::move(block));
} else if (kFullFilterBlockPrefix == filter_block_prefix) { } else if (kFullFilterBlockPrefix == filter_block_prefix) {
auto filter_bits_reader = rep->filter_policy-> auto filter_bits_reader = rep->filter_policy->
GetFilterBitsReader(block.data); GetFilterBitsReader(block.data);
if (filter_bits_reader != nullptr) { if (filter_bits_reader != nullptr) {
return new FullFilterBlockReader(rep->ioptions.prefix_extractor, return new FullFilterBlockReader(rep->ioptions.prefix_extractor,
rep->table_options, block.data, filter_bits_reader, rep->table_options, std::move(block), filter_bits_reader);
block.heap_allocated);
} }
} }
return nullptr; return nullptr;
......
...@@ -92,8 +92,7 @@ TEST(BlockTest, SimpleTest) { ...@@ -92,8 +92,7 @@ TEST(BlockTest, SimpleTest) {
BlockContents contents; BlockContents contents;
contents.data = rawblock; contents.data = rawblock;
contents.cachable = false; contents.cachable = false;
contents.heap_allocated = false; Block reader(std::move(contents));
Block reader(contents);
// read contents of block sequentially // read contents of block sequentially
int count = 0; int count = 0;
...@@ -143,12 +142,11 @@ BlockContents GetBlockContents(std::unique_ptr<BlockBuilder> *builder, ...@@ -143,12 +142,11 @@ BlockContents GetBlockContents(std::unique_ptr<BlockBuilder> *builder,
BlockContents contents; BlockContents contents;
contents.data = rawblock; contents.data = rawblock;
contents.cachable = false; contents.cachable = false;
contents.heap_allocated = false;
return contents; return contents;
} }
void CheckBlockContents(BlockContents contents, const int max_key, void CheckBlockContents(const BlockContents &contents, const int max_key,
const std::vector<std::string> &keys, const std::vector<std::string> &keys,
const std::vector<std::string> &values) { const std::vector<std::string> &values) {
const size_t prefix_size = 6; const size_t prefix_size = 6;
......
...@@ -18,17 +18,22 @@ ...@@ -18,17 +18,22 @@
#pragma once #pragma once
#include <memory>
#include <stddef.h> #include <stddef.h>
#include <stdint.h> #include <stdint.h>
#include <string> #include <string>
#include <vector> #include <vector>
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "util/hash.h"
#include "format.h"
namespace rocksdb { namespace rocksdb {
const uint64_t kNotValid = ULLONG_MAX; const uint64_t kNotValid = ULLONG_MAX;
class FilterPolicy;
// A FilterBlockBuilder is used to construct all of the filters for a // A FilterBlockBuilder is used to construct all of the filters for a
// particular Table. It generates a single string which is stored as // particular Table. It generates a single string which is stored as
......
...@@ -255,112 +255,53 @@ Status ReadBlock(RandomAccessFile* file, const Footer& footer, ...@@ -255,112 +255,53 @@ Status ReadBlock(RandomAccessFile* file, const Footer& footer,
return s; return s;
} }
// Decompress a block according to params Status ReadBlockContents(RandomAccessFile* file, const Footer& footer,
// May need to malloc a space for cache usage const ReadOptions& options, const BlockHandle& handle,
Status DecompressBlock(BlockContents* result, size_t block_size, BlockContents *contents, Env* env,
bool do_uncompress, const char* buf, bool decompression_requested) {
const Slice& contents, bool use_stack_buf) { Status status;
Status s; Slice slice;
size_t n = block_size; size_t n = static_cast<size_t>(handle.size());
const char* data = contents.data(); std::unique_ptr<char[]> heap_buf;
char stack_buf[DefaultStackBufferSize];
result->data = Slice(); char *used_buf = nullptr;
result->cachable = false; rocksdb::CompressionType compression_type;
result->heap_allocated = false;
if (decompression_requested && n + kBlockTrailerSize < DefaultStackBufferSize) {
PERF_TIMER_GUARD(block_decompress_time); //If we've got a small enough hunk of data, read it in to the
rocksdb::CompressionType compression_type = //trivially allocated stack buffer instead of needing a full malloc()
static_cast<rocksdb::CompressionType>(data[n]); used_buf = &stack_buf[0];
// If the caller has requested that the block not be uncompressed
if (!do_uncompress || compression_type == kNoCompression) {
if (data != buf) {
// File implementation gave us pointer to some other data.
// Use it directly under the assumption that it will be live
// while the file is open.
result->data = Slice(data, n);
result->heap_allocated = false;
result->cachable = false; // Do not double-cache
} else {
if (use_stack_buf) {
// Need to allocate space in heap for cache usage
char* new_buf = new char[n];
memcpy(new_buf, buf, n);
result->data = Slice(new_buf, n);
} else {
result->data = Slice(buf, n);
}
result->heap_allocated = true;
result->cachable = true;
}
result->compression_type = compression_type;
s = Status::OK();
} else { } else {
s = UncompressBlockContents(data, n, result); heap_buf = std::unique_ptr<char[]>(new char[n + kBlockTrailerSize]);
used_buf = heap_buf.get();
} }
return s;
}
// Read and Decompress block status = ReadBlock(file, footer, options, handle, &slice, used_buf);
// Use buf in stack as temp reading buffer
Status ReadAndDecompressFast(RandomAccessFile* file, const Footer& footer,
const ReadOptions& options,
const BlockHandle& handle, BlockContents* result,
Env* env, bool do_uncompress) {
Status s;
Slice contents;
size_t n = static_cast<size_t>(handle.size());
char buf[DefaultStackBufferSize];
s = ReadBlock(file, footer, options, handle, &contents, buf); if (!status.ok()) {
if (!s.ok()) { return status;
return s;
}
s = DecompressBlock(result, n, do_uncompress, buf, contents, true);
if (!s.ok()) {
return s;
} }
return s;
}
// Read and Decompress block PERF_TIMER_GUARD(block_decompress_time);
// Use buf in heap as temp reading buffer
Status ReadAndDecompress(RandomAccessFile* file, const Footer& footer,
const ReadOptions& options, const BlockHandle& handle,
BlockContents* result, Env* env, bool do_uncompress) {
Status s;
Slice contents;
size_t n = static_cast<size_t>(handle.size());
char* buf = new char[n + kBlockTrailerSize];
s = ReadBlock(file, footer, options, handle, &contents, buf); compression_type = static_cast<rocksdb::CompressionType>(slice.data()[n]);
if (!s.ok()) {
delete[] buf; if (decompression_requested && compression_type != kNoCompression) {
return s; return UncompressBlockContents(slice.data(), n, contents);
}
s = DecompressBlock(result, n, do_uncompress, buf, contents, false);
if (!s.ok()) {
delete[] buf;
return s;
} }
if (result->data.data() != buf) { if (slice.data() != used_buf) {
delete[] buf; *contents = BlockContents(Slice(slice.data(), n), false, compression_type);
return status;
} }
return s;
}
Status ReadBlockContents(RandomAccessFile* file, const Footer& footer, if (used_buf == &stack_buf[0]) {
const ReadOptions& options, const BlockHandle& handle, heap_buf = std::unique_ptr<char[]>(new char[n]);
BlockContents* result, Env* env, bool do_uncompress) { memcpy(heap_buf.get(), stack_buf, n);
size_t n = static_cast<size_t>(handle.size());
if (do_uncompress && n + kBlockTrailerSize < DefaultStackBufferSize) {
return ReadAndDecompressFast(file, footer, options, handle, result, env,
do_uncompress);
} else {
return ReadAndDecompress(file, footer, options, handle, result, env,
do_uncompress);
} }
*contents = BlockContents(std::move(heap_buf), n, true, compression_type);
return status;
} }
// //
...@@ -370,8 +311,8 @@ Status ReadBlockContents(RandomAccessFile* file, const Footer& footer, ...@@ -370,8 +311,8 @@ Status ReadBlockContents(RandomAccessFile* file, const Footer& footer,
// buffer is returned via 'result' and it is upto the caller to // buffer is returned via 'result' and it is upto the caller to
// free this buffer. // free this buffer.
Status UncompressBlockContents(const char* data, size_t n, Status UncompressBlockContents(const char* data, size_t n,
BlockContents* result) { BlockContents* contents) {
char* ubuf = nullptr; std::unique_ptr<char[]> ubuf;
int decompress_size = 0; int decompress_size = 0;
assert(data[n] != kNoCompression); assert(data[n] != kNoCompression);
switch (data[n]) { switch (data[n]) {
...@@ -382,64 +323,52 @@ Status UncompressBlockContents(const char* data, size_t n, ...@@ -382,64 +323,52 @@ Status UncompressBlockContents(const char* data, size_t n,
if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) { if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) {
return Status::Corruption(snappy_corrupt_msg); return Status::Corruption(snappy_corrupt_msg);
} }
ubuf = new char[ulength]; ubuf = std::unique_ptr<char[]>(new char[ulength]);
if (!port::Snappy_Uncompress(data, n, ubuf)) { if (!port::Snappy_Uncompress(data, n, ubuf.get())) {
delete[] ubuf;
return Status::Corruption(snappy_corrupt_msg); return Status::Corruption(snappy_corrupt_msg);
} }
result->data = Slice(ubuf, ulength); *contents = BlockContents(std::move(ubuf), ulength, true, kNoCompression);
result->heap_allocated = true;
result->cachable = true;
break; break;
} }
case kZlibCompression: case kZlibCompression:
ubuf = port::Zlib_Uncompress(data, n, &decompress_size); ubuf = std::unique_ptr<char[]>(port::Zlib_Uncompress(data, n, &decompress_size));
static char zlib_corrupt_msg[] = static char zlib_corrupt_msg[] =
"Zlib not supported or corrupted Zlib compressed block contents"; "Zlib not supported or corrupted Zlib compressed block contents";
if (!ubuf) { if (!ubuf) {
return Status::Corruption(zlib_corrupt_msg); return Status::Corruption(zlib_corrupt_msg);
} }
result->data = Slice(ubuf, decompress_size); *contents = BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
result->heap_allocated = true;
result->cachable = true;
break; break;
case kBZip2Compression: case kBZip2Compression:
ubuf = port::BZip2_Uncompress(data, n, &decompress_size); ubuf = std::unique_ptr<char[]>(port::BZip2_Uncompress(data, n, &decompress_size));
static char bzip2_corrupt_msg[] = static char bzip2_corrupt_msg[] =
"Bzip2 not supported or corrupted Bzip2 compressed block contents"; "Bzip2 not supported or corrupted Bzip2 compressed block contents";
if (!ubuf) { if (!ubuf) {
return Status::Corruption(bzip2_corrupt_msg); return Status::Corruption(bzip2_corrupt_msg);
} }
result->data = Slice(ubuf, decompress_size); *contents = BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
result->heap_allocated = true;
result->cachable = true;
break; break;
case kLZ4Compression: case kLZ4Compression:
ubuf = port::LZ4_Uncompress(data, n, &decompress_size); ubuf = std::unique_ptr<char[]>(port::LZ4_Uncompress(data, n, &decompress_size));
static char lz4_corrupt_msg[] = static char lz4_corrupt_msg[] =
"LZ4 not supported or corrupted LZ4 compressed block contents"; "LZ4 not supported or corrupted LZ4 compressed block contents";
if (!ubuf) { if (!ubuf) {
return Status::Corruption(lz4_corrupt_msg); return Status::Corruption(lz4_corrupt_msg);
} }
result->data = Slice(ubuf, decompress_size); *contents = BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
result->heap_allocated = true;
result->cachable = true;
break; break;
case kLZ4HCCompression: case kLZ4HCCompression:
ubuf = port::LZ4_Uncompress(data, n, &decompress_size); ubuf = std::unique_ptr<char[]>(port::LZ4_Uncompress(data, n, &decompress_size));
static char lz4hc_corrupt_msg[] = static char lz4hc_corrupt_msg[] =
"LZ4HC not supported or corrupted LZ4HC compressed block contents"; "LZ4HC not supported or corrupted LZ4HC compressed block contents";
if (!ubuf) { if (!ubuf) {
return Status::Corruption(lz4hc_corrupt_msg); return Status::Corruption(lz4hc_corrupt_msg);
} }
result->data = Slice(ubuf, decompress_size); *contents = BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
result->heap_allocated = true;
result->cachable = true;
break; break;
default: default:
return Status::Corruption("bad block type"); return Status::Corruption("bad block type");
} }
result->compression_type = kNoCompression; // not compressed any more
return Status::OK(); return Status::OK();
} }
......
...@@ -160,28 +160,39 @@ static const size_t kBlockTrailerSize = 5; ...@@ -160,28 +160,39 @@ static const size_t kBlockTrailerSize = 5;
struct BlockContents { struct BlockContents {
Slice data; // Actual contents of data Slice data; // Actual contents of data
bool cachable; // True iff data can be cached bool cachable; // True iff data can be cached
bool heap_allocated; // True iff caller should delete[] data.data()
CompressionType compression_type; CompressionType compression_type;
std::unique_ptr<char[]> allocation;
BlockContents()
: cachable(false),
compression_type(kNoCompression) {}
BlockContents(const Slice &_data, bool _cachable, CompressionType _compression_type)
: data(_data),
cachable(_cachable),
compression_type(_compression_type) {}
BlockContents(std::unique_ptr<char[]> &&_data, size_t _size, bool _cachable, CompressionType _compression_type)
: data(_data.get(), _size),
cachable(_cachable),
compression_type(_compression_type),
allocation(std::move(_data)) {}
}; };
// Read the block identified by "handle" from "file". On failure // Read the block identified by "handle" from "file". On failure
// return non-OK. On success fill *result and return OK. // return non-OK. On success fill *result and return OK.
extern Status ReadBlockContents(RandomAccessFile* file, extern Status ReadBlockContents(RandomAccessFile* file, const Footer& footer,
const Footer& footer,
const ReadOptions& options, const ReadOptions& options,
const BlockHandle& handle, const BlockHandle& handle, BlockContents* contents,
BlockContents* result, Env* env, bool do_uncompress);
Env* env,
bool do_uncompress);
// The 'data' points to the raw block contents read in from file. // The 'data' points to the raw block contents read in from file.
// This method allocates a new heap buffer and the raw block // This method allocates a new heap buffer and the raw block
// contents are uncompresed into this buffer. This buffer is // contents are uncompresed into this buffer. This buffer is
// returned via 'result' and it is upto the caller to // returned via 'result' and it is upto the caller to
// free this buffer. // free this buffer.
extern Status UncompressBlockContents(const char* data, extern Status UncompressBlockContents(const char* data, size_t n,
size_t n, BlockContents* contents);
BlockContents* result);
// Implementation details follow. Clients should ignore, // Implementation details follow. Clients should ignore,
......
...@@ -56,16 +56,21 @@ FullFilterBlockReader::FullFilterBlockReader( ...@@ -56,16 +56,21 @@ FullFilterBlockReader::FullFilterBlockReader(
const SliceTransform* prefix_extractor, const SliceTransform* prefix_extractor,
const BlockBasedTableOptions& table_opt, const BlockBasedTableOptions& table_opt,
const Slice& contents, const Slice& contents,
FilterBitsReader* filter_bits_reader, bool delete_contents_after_use) FilterBitsReader* filter_bits_reader)
: prefix_extractor_(prefix_extractor), : prefix_extractor_(prefix_extractor),
whole_key_filtering_(table_opt.whole_key_filtering), whole_key_filtering_(table_opt.whole_key_filtering),
contents_(contents) { contents_(contents) {
assert(filter_bits_reader != nullptr); assert(filter_bits_reader != nullptr);
filter_bits_reader_.reset(filter_bits_reader); filter_bits_reader_.reset(filter_bits_reader);
}
if (delete_contents_after_use) { FullFilterBlockReader::FullFilterBlockReader(
filter_data_.reset(contents.data()); const SliceTransform* prefix_extractor,
} const BlockBasedTableOptions& table_opt,
BlockContents&& contents,
FilterBitsReader* filter_bits_reader)
: FullFilterBlockReader(prefix_extractor, table_opt, contents.data, filter_bits_reader) {
block_contents_ = std::move(contents);
} }
bool FullFilterBlockReader::KeyMayMatch(const Slice& key, bool FullFilterBlockReader::KeyMayMatch(const Slice& key,
......
...@@ -75,8 +75,11 @@ class FullFilterBlockReader : public FilterBlockReader { ...@@ -75,8 +75,11 @@ class FullFilterBlockReader : public FilterBlockReader {
explicit FullFilterBlockReader(const SliceTransform* prefix_extractor, explicit FullFilterBlockReader(const SliceTransform* prefix_extractor,
const BlockBasedTableOptions& table_opt, const BlockBasedTableOptions& table_opt,
const Slice& contents, const Slice& contents,
FilterBitsReader* filter_bits_reader, FilterBitsReader* filter_bits_reader);
bool delete_contents_after_use = false); explicit FullFilterBlockReader(const SliceTransform* prefix_extractor,
const BlockBasedTableOptions& table_opt,
BlockContents&& contents,
FilterBitsReader* filter_bits_reader);
// bits_reader is created in filter_policy, it should be passed in here // bits_reader is created in filter_policy, it should be passed in here
// directly. and be deleted here // directly. and be deleted here
...@@ -95,6 +98,7 @@ class FullFilterBlockReader : public FilterBlockReader { ...@@ -95,6 +98,7 @@ class FullFilterBlockReader : public FilterBlockReader {
std::unique_ptr<FilterBitsReader> filter_bits_reader_; std::unique_ptr<FilterBitsReader> filter_bits_reader_;
Slice contents_; Slice contents_;
BlockContents block_contents_;
std::unique_ptr<const char[]> filter_data_; std::unique_ptr<const char[]> filter_data_;
bool MayMatch(const Slice& entry); bool MayMatch(const Slice& entry);
......
...@@ -141,14 +141,15 @@ Status ReadProperties(const Slice &handle_value, RandomAccessFile *file, ...@@ -141,14 +141,15 @@ Status ReadProperties(const Slice &handle_value, RandomAccessFile *file,
BlockContents block_contents; BlockContents block_contents;
ReadOptions read_options; ReadOptions read_options;
read_options.verify_checksums = false; read_options.verify_checksums = false;
Status s = ReadBlockContents(file, footer, read_options, handle, Status s;
&block_contents, env, false); s = ReadBlockContents(file, footer, read_options, handle, &block_contents,
env, false);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
Block properties_block(block_contents); Block properties_block(std::move(block_contents));
std::unique_ptr<Iterator> iter( std::unique_ptr<Iterator> iter(
properties_block.NewIterator(BytewiseComparator())); properties_block.NewIterator(BytewiseComparator()));
...@@ -228,12 +229,12 @@ Status ReadTableProperties(RandomAccessFile* file, uint64_t file_size, ...@@ -228,12 +229,12 @@ Status ReadTableProperties(RandomAccessFile* file, uint64_t file_size,
BlockContents metaindex_contents; BlockContents metaindex_contents;
ReadOptions read_options; ReadOptions read_options;
read_options.verify_checksums = false; read_options.verify_checksums = false;
s = ReadBlockContents(file, footer, read_options, metaindex_handle, s = ReadBlockContents(file, footer, read_options, metaindex_handle, &metaindex_contents,
&metaindex_contents, env, false); env, false);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
Block metaindex_block(metaindex_contents); Block metaindex_block(std::move(metaindex_contents));
std::unique_ptr<Iterator> meta_iter( std::unique_ptr<Iterator> meta_iter(
metaindex_block.NewIterator(BytewiseComparator())); metaindex_block.NewIterator(BytewiseComparator()));
...@@ -287,7 +288,7 @@ Status FindMetaBlock(RandomAccessFile* file, uint64_t file_size, ...@@ -287,7 +288,7 @@ Status FindMetaBlock(RandomAccessFile* file, uint64_t file_size,
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
Block metaindex_block(metaindex_contents); Block metaindex_block(std::move(metaindex_contents));
std::unique_ptr<Iterator> meta_iter; std::unique_ptr<Iterator> meta_iter;
meta_iter.reset(metaindex_block.NewIterator(BytewiseComparator())); meta_iter.reset(metaindex_block.NewIterator(BytewiseComparator()));
...@@ -299,41 +300,36 @@ Status ReadMetaBlock(RandomAccessFile* file, uint64_t file_size, ...@@ -299,41 +300,36 @@ Status ReadMetaBlock(RandomAccessFile* file, uint64_t file_size,
uint64_t table_magic_number, Env* env, uint64_t table_magic_number, Env* env,
const std::string& meta_block_name, const std::string& meta_block_name,
BlockContents* contents) { BlockContents* contents) {
Status status;
Footer footer(table_magic_number); Footer footer(table_magic_number);
auto s = ReadFooterFromFile(file, file_size, &footer); status = ReadFooterFromFile(file, file_size, &footer);
if (!s.ok()) { if (!status.ok()) return status;
return s;
}
// Reading metaindex block // Reading metaindex block
auto metaindex_handle = footer.metaindex_handle(); auto metaindex_handle = footer.metaindex_handle();
BlockContents metaindex_contents; BlockContents metaindex_contents;
ReadOptions read_options; ReadOptions read_options;
read_options.verify_checksums = false; read_options.verify_checksums = false;
s = ReadBlockContents(file, footer, read_options, metaindex_handle, status = ReadBlockContents(file, footer, read_options, metaindex_handle,
&metaindex_contents, env, false); &metaindex_contents, env, false);
if (!s.ok()) { if (!status.ok()) return status;
return s;
}
// Finding metablock // Finding metablock
Block metaindex_block(metaindex_contents); Block metaindex_block(std::move(metaindex_contents));
std::unique_ptr<Iterator> meta_iter; std::unique_ptr<Iterator> meta_iter;
meta_iter.reset(metaindex_block.NewIterator(BytewiseComparator())); meta_iter.reset(metaindex_block.NewIterator(BytewiseComparator()));
BlockHandle block_handle; BlockHandle block_handle;
s = FindMetaBlock(meta_iter.get(), meta_block_name, &block_handle); status = FindMetaBlock(meta_iter.get(), meta_block_name, &block_handle);
if (!s.ok()) { if (!status.ok()) {
return s; return status;
} }
// Reading metablock // Reading metablock
s = ReadBlockContents(file, footer, read_options, block_handle, contents, env, return ReadBlockContents(file, footer, read_options, block_handle, contents,
false); env, false);
return s;
} }
} // namespace rocksdb } // namespace rocksdb
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "table/block.h" #include "table/block.h"
#include "table/bloom_block.h" #include "table/bloom_block.h"
#include "table/filter_block.h"
#include "table/format.h" #include "table/format.h"
#include "table/meta_blocks.h" #include "table/meta_blocks.h"
#include "table/two_level_iterator.h" #include "table/two_level_iterator.h"
......
...@@ -265,8 +265,7 @@ class BlockConstructor: public Constructor { ...@@ -265,8 +265,7 @@ class BlockConstructor: public Constructor {
BlockContents contents; BlockContents contents;
contents.data = data_; contents.data = data_;
contents.cachable = false; contents.cachable = false;
contents.heap_allocated = false; block_ = new Block(std::move(contents));
block_ = new Block(contents);
return Status::OK(); return Status::OK();
} }
virtual Iterator* NewIterator() const { virtual Iterator* NewIterator() const {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册