提交 f9f4d40f 编写于 作者: A Anand Ananthabhotla 提交者: Facebook Github Bot

Align SST file data blocks to avoid spanning multiple pages

Summary:
Provide a block_align option in BlockBasedTableOptions to allow
alignment of SST file data blocks. This will avoid higher
IOPS/throughput load due to < 4KB data blocks spanning 2 4KB pages.
When this option is set to true, the block alignment is set to lower of
block size and 4KB.
Closes https://github.com/facebook/rocksdb/pull/3502

Differential Revision: D7400897

Pulled By: anand1976

fbshipit-source-id: 04cc3bd144e88e3431a4f97604e63ad7a0f06d44
上级 0999e9b7
# Rocksdb Change Log
## Unreleased
### Public API Change
* Add a BlockBasedTableOption to align uncompressed data blocks on the smaller of block size or page size boundary, to reduce flash reads by avoiding reads spanning 4K pages.
### New Features
......
......@@ -222,6 +222,9 @@ struct BlockBasedTableOptions {
// false will avoid the overhead of decompression if index blocks are evicted
// and read back
bool enable_index_compression = true;
// Align data blocks on lesser of page size and block size
bool block_align = false;
};
// Table Properties that are specific to block-based table properties.
......
......@@ -151,7 +151,8 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
"format_version=1;"
"hash_index_allow_collision=false;"
"verify_compression=true;read_amp_bytes_per_bit=0;"
"enable_index_compression=false",
"enable_index_compression=false;"
"block_align=true",
new_bbto));
ASSERT_EQ(unset_bytes_base,
......
......@@ -249,6 +249,7 @@ struct BlockBasedTableBuilder::Rep {
WritableFileWriter* file;
uint64_t offset = 0;
Status status;
size_t alignment;
BlockBuilder data_block;
BlockBuilder range_del_block;
......@@ -294,6 +295,9 @@ struct BlockBasedTableBuilder::Rep {
table_options(table_opt),
internal_comparator(icomparator),
file(f),
alignment(table_options.block_align
? std::min(table_options.block_size, kDefaultPageSize)
: 0),
data_block(table_options.block_restart_interval,
table_options.use_delta_encoding),
range_del_block(1), // TODO(andrewkr): restart_interval unnecessary
......@@ -537,13 +541,14 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED);
}
WriteRawBlock(block_contents, type, handle);
WriteRawBlock(block_contents, type, handle, is_data_block);
r->compressed_output.clear();
}
void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
CompressionType type,
BlockHandle* handle) {
BlockHandle* handle,
bool is_data_block) {
Rep* r = rep_;
StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS);
handle->set_offset(r->offset);
......@@ -581,6 +586,16 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
}
if (r->status.ok()) {
r->offset += block_contents.size() + kBlockTrailerSize;
if (r->table_options.block_align && is_data_block) {
size_t pad_bytes =
(r->alignment - ((block_contents.size() + kBlockTrailerSize) &
(r->alignment - 1))) &
(r->alignment - 1);
r->status = r->file->Pad(pad_bytes);
if (r->status.ok()) {
r->offset += pad_bytes;
}
}
}
}
}
......
......@@ -96,7 +96,8 @@ class BlockBasedTableBuilder : public TableBuilder {
void WriteBlock(const Slice& block_contents, BlockHandle* handle,
bool is_data_block);
// Directly write data to the file.
void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle);
void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle,
bool is_data_block = false);
Status InsertBlockInCache(const Slice& block_contents,
const CompressionType type,
const BlockHandle* handle);
......
......@@ -114,6 +114,15 @@ Status BlockBasedTableFactory::SanitizeOptions(
"Unsupported BlockBasedTable format_version. Please check "
"include/rocksdb/table.h for more info");
}
if (table_options_.block_align && (cf_opts.compression != kNoCompression)) {
return Status::InvalidArgument("Enable block_align, but compression "
"enabled");
}
if (table_options_.block_align &&
(table_options_.block_size & (table_options_.block_size - 1))) {
return Status::InvalidArgument(
"Block alignment requested but block size is not a power of 2");
}
return Status::OK();
}
......@@ -225,6 +234,9 @@ std::string BlockBasedTableFactory::GetPrintableTableOptions() const {
snprintf(buffer, kBufferSize, " enable_index_compression: %d\n",
table_options_.enable_index_compression);
ret.append(buffer);
snprintf(buffer, kBufferSize, " block_align: %d\n",
table_options_.block_align);
ret.append(buffer);
return ret;
}
......
......@@ -155,6 +155,9 @@ static std::unordered_map<std::string, OptionTypeInfo>
OptionType::kSizeT, OptionVerificationType::kNormal, false, 0}},
{"enable_index_compression",
{offsetof(struct BlockBasedTableOptions, enable_index_compression),
OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}},
{"block_align",
{offsetof(struct BlockBasedTableOptions, block_align),
OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}};
#endif // !ROCKSDB_LITE
} // namespace rocksdb
......@@ -3,10 +3,11 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "rocksdb/options.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "table/block_builder.h"
#include "table/format.h"
#include <cassert>
......@@ -21,10 +22,12 @@ class FlushBlockBySizePolicy : public FlushBlockPolicy {
// reaches the configured
FlushBlockBySizePolicy(const uint64_t block_size,
const uint64_t block_size_deviation,
const bool align,
const BlockBuilder& data_block_builder)
: block_size_(block_size),
block_size_deviation_limit_(
((block_size * (100 - block_size_deviation)) + 99) / 100),
align_(align),
data_block_builder_(data_block_builder) {}
virtual bool Update(const Slice& key,
......@@ -51,8 +54,13 @@ class FlushBlockBySizePolicy : public FlushBlockPolicy {
}
const auto curr_size = data_block_builder_.CurrentSizeEstimate();
const auto estimated_size_after =
data_block_builder_.EstimateSizeAfterKV(key, value);
auto estimated_size_after =
data_block_builder_.EstimateSizeAfterKV(key, value);
if (align_) {
estimated_size_after += kBlockTrailerSize;
return estimated_size_after > block_size_;
}
return estimated_size_after > block_size_ &&
curr_size > block_size_deviation_limit_;
......@@ -60,6 +68,7 @@ class FlushBlockBySizePolicy : public FlushBlockPolicy {
const uint64_t block_size_;
const uint64_t block_size_deviation_limit_;
const bool align_;
const BlockBuilder& data_block_builder_;
};
......@@ -68,13 +77,13 @@ FlushBlockPolicy* FlushBlockBySizePolicyFactory::NewFlushBlockPolicy(
const BlockBuilder& data_block_builder) const {
return new FlushBlockBySizePolicy(
table_options.block_size, table_options.block_size_deviation,
data_block_builder);
table_options.block_align, data_block_builder);
}
FlushBlockPolicy* FlushBlockBySizePolicyFactory::NewFlushBlockPolicy(
const uint64_t size, const int deviation,
const BlockBuilder& data_block_builder) {
return new FlushBlockBySizePolicy(size, deviation, data_block_builder);
return new FlushBlockBySizePolicy(size, deviation, false, data_block_builder);
}
} // namespace rocksdb
......@@ -3197,6 +3197,110 @@ TEST_F(BlockBasedTableTest, TableWithGlobalSeqno) {
delete iter;
}
TEST_F(BlockBasedTableTest, BlockAlignTest) {
BlockBasedTableOptions bbto;
bbto.block_align = true;
test::StringSink* sink = new test::StringSink();
unique_ptr<WritableFileWriter> file_writer(test::GetWritableFileWriter(sink));
Options options;
options.compression = kNoCompression;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
const ImmutableCFOptions ioptions(options);
InternalKeyComparator ikc(options.comparator);
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories;
std::string column_family_name;
std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, ikc, &int_tbl_prop_collector_factories,
kNoCompression, CompressionOptions(),
nullptr /* compression_dict */,
false /* skip_filters */, column_family_name, -1),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));
for (int i = 1; i <= 10000; ++i) {
std::ostringstream ostr;
ostr << std::setfill('0') << std::setw(5) << i;
std::string key = ostr.str();
std::string value = "val";
InternalKey ik(key, 0, kTypeValue);
builder->Add(ik.Encode(), value);
}
ASSERT_OK(builder->Finish());
file_writer->Flush();
test::RandomRWStringSink ss_rw(sink);
unique_ptr<RandomAccessFileReader> file_reader(
test::GetRandomAccessFileReader(
new test::StringSource(ss_rw.contents(), 73342, true)));
// Helper function to get version, global_seqno, global_seqno_offset
std::function<void()> VerifyBlockAlignment = [&]() {
TableProperties* props = nullptr;
ASSERT_OK(ReadTableProperties(file_reader.get(), ss_rw.contents().size(),
kBlockBasedTableMagicNumber, ioptions,
&props));
uint64_t data_block_size = props->data_size / props->num_data_blocks;
ASSERT_EQ(data_block_size, 4096);
ASSERT_EQ(props->data_size, data_block_size * props->num_data_blocks);
delete props;
};
VerifyBlockAlignment();
// The below block of code verifies that we can read back the keys. Set
// block_align to false when creating the reader to ensure we can flip between
// the two modes without any issues
std::unique_ptr<TableReader> table_reader;
bbto.block_align = false;
Options options2;
options2.table_factory.reset(NewBlockBasedTableFactory(bbto));
ImmutableCFOptions ioptions2(options2);
ASSERT_OK(ioptions.table_factory->NewTableReader(
TableReaderOptions(ioptions2, EnvOptions(),
GetPlainInternalComparator(options2.comparator)),
std::move(file_reader), ss_rw.contents().size(), &table_reader));
std::unique_ptr<InternalIterator> db_iter(
table_reader->NewIterator(ReadOptions()));
int expected_key = 1;
for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
std::ostringstream ostr;
ostr << std::setfill('0') << std::setw(5) << expected_key++;
std::string key = ostr.str();
std::string value = "val";
ASSERT_OK(db_iter->status());
ASSERT_EQ(ExtractUserKey(db_iter->key()).ToString(), key);
ASSERT_EQ(db_iter->value().ToString(), value);
}
expected_key--;
ASSERT_EQ(expected_key, 10000);
table_reader.reset();
}
TEST_F(BlockBasedTableTest, BadOptions) {
rocksdb::Options options;
options.compression = kNoCompression;
rocksdb::BlockBasedTableOptions bbto;
bbto.block_size = 4000;
bbto.block_align = true;
const std::string kDBPath = test::TmpDir() + "/table_prefix_test";
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
DestroyDB(kDBPath, options);
rocksdb::DB* db;
ASSERT_NOK(rocksdb::DB::Open(options, kDBPath, &db));
bbto.block_size = 4096;
options.compression = kSnappyCompression;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
ASSERT_NOK(rocksdb::DB::Open(options, kDBPath, &db));
}
} // namespace rocksdb
int main(int argc, char** argv) {
......
......@@ -452,6 +452,9 @@ DEFINE_bool(enable_index_compression,
rocksdb::BlockBasedTableOptions().enable_index_compression,
"Compress the index block");
DEFINE_bool(block_align, rocksdb::BlockBasedTableOptions().block_align,
"Align data blocks on page size");
DEFINE_int64(compressed_cache_size, -1,
"Number of bytes to use as a cache of compressed data.");
......@@ -3140,6 +3143,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
block_based_options.read_amp_bytes_per_bit = FLAGS_read_amp_bytes_per_bit;
block_based_options.enable_index_compression =
FLAGS_enable_index_compression;
block_based_options.block_align = FLAGS_block_align;
if (FLAGS_read_cache_path != "") {
#ifndef ROCKSDB_LITE
Status rc_status;
......
......@@ -161,6 +161,12 @@ public:
}
}
void PadWith(size_t pad_size, int padding) {
assert((pad_size + cursize_) <= capacity_);
memset(bufstart_ + cursize_, padding, pad_size);
cursize_ += pad_size;
}
// After a partial flush move the tail to the beginning of the buffer
void RefitTail(size_t tail_offset, size_t tail_size) {
if (tail_size > 0) {
......
......@@ -219,6 +219,31 @@ Status WritableFileWriter::Append(const Slice& data) {
return s;
}
Status WritableFileWriter::Pad(const size_t pad_bytes) {
assert(pad_bytes < kDefaultPageSize);
size_t left = pad_bytes;
size_t cap = buf_.Capacity() - buf_.CurrentSize();
// Assume pad_bytes is small compared to buf_ capacity. So we always
// use buf_ rather than write directly to file in certain cases like
// Append() does.
while (left) {
size_t append_bytes = std::min(cap, left);
buf_.PadWith(append_bytes, 0);
left -= append_bytes;
if (left > 0) {
Status s = Flush();
if (!s.ok()) {
return s;
}
}
cap = buf_.Capacity() - buf_.CurrentSize();
}
pending_sync_ = true;
filesize_ += pad_bytes;
return Status::OK();
}
Status WritableFileWriter::Close() {
// Do not quit immediately on failure the file MUST be closed
......
......@@ -166,6 +166,8 @@ class WritableFileWriter {
Status Append(const Slice& data);
Status Pad(const size_t pad_bytes);
Status Flush();
Status Close();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册