提交 50439606 编写于 作者: L Levi Tamasi 提交者: Facebook GitHub Bot

Add a blob file builder class that can be used in background jobs (#7306)

Summary:
The patch adds a class called `BlobFileBuilder` that can be used to build
and cut blob files in background jobs (flushes/compactions). The class
enforces a value size threshold (`min_blob_size`; smaller blobs will be inlined
in the LSM tree itself), and supports specifying a blob file size limit (`blob_file_size`),
as well as compression (`blob_compression_type`) and checksums for blob files.
It also keeps track of the generated blob files and their associated `BlobFileAddition`
metadata, which can be applied as part of the background job's `VersionEdit`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7306

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D23298817

Pulled By: ltamasi

fbshipit-source-id: 38f35d81dab1ba81f15236240612ec173d7f21b5
上级 8e0df905
......@@ -546,6 +546,7 @@ set(SOURCES
cache/sharded_cache.cc
db/arena_wrapped_db_iter.cc
db/blob/blob_file_addition.cc
db/blob/blob_file_builder.cc
db/blob/blob_file_garbage.cc
db/blob/blob_file_meta.cc
db/blob/blob_log_format.cc
......@@ -1012,6 +1013,7 @@ if(WITH_TESTS)
cache/cache_test.cc
cache/lru_cache_test.cc
db/blob/blob_file_addition_test.cc
db/blob/blob_file_builder_test.cc
db/blob/blob_file_garbage_test.cc
db/blob/db_blob_index_test.cc
db/column_family_test.cc
......
......@@ -566,6 +566,7 @@ ifdef ASSERT_STATUS_CHECKED
cache_test \
lru_cache_test \
blob_file_addition_test \
blob_file_builder_test \
blob_file_garbage_test \
bloom_test \
cassandra_format_test \
......@@ -1806,6 +1807,9 @@ defer_test: $(OBJ_DIR)/util/defer_test.o $(TEST_LIBRARY) $(LIBRARY)
blob_file_addition_test: $(OBJ_DIR)/db/blob/blob_file_addition_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
blob_file_builder_test: $(OBJ_DIR)/db/blob/blob_file_builder_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
blob_file_garbage_test: $(OBJ_DIR)/db/blob/blob_file_garbage_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
......
......@@ -122,6 +122,7 @@ cpp_library(
"cache/sharded_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/blob/blob_file_addition.cc",
"db/blob/blob_file_builder.cc",
"db/blob/blob_file_garbage.cc",
"db/blob/blob_file_meta.cc",
"db/blob/blob_log_format.cc",
......@@ -533,6 +534,13 @@ ROCKS_TESTS = [
[],
[],
],
[
"blob_file_builder_test",
"db/blob/blob_file_builder_test.cc",
"serial",
[],
[],
],
[
"blob_file_garbage_test",
"db/blob/blob_file_garbage_test.cc",
......
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/blob/blob_file_builder.h"
#include <cassert>
#include "db/blob/blob_file_addition.h"
#include "db/blob/blob_index.h"
#include "db/blob/blob_log_format.h"
#include "db/blob/blob_log_writer.h"
#include "db/version_set.h"
#include "file/filename.h"
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
#include "options/cf_options.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "test_util/sync_point.h"
#include "util/compression.h"
namespace ROCKSDB_NAMESPACE {
// Convenience constructor for production use: blob file numbers are obtained
// from `versions->NewFileNumber()`. All other arguments are forwarded
// unchanged to the generator-based constructor.
//
// `versions` must be non-null and must remain valid for as long as this
// builder may open new blob files, because the generator lambda captures the
// raw pointer and dereferences it each time a file number is needed.
BlobFileBuilder::BlobFileBuilder(
    VersionSet* versions, Env* env, FileSystem* fs,
    const ImmutableCFOptions* immutable_cf_options,
    const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
    uint32_t column_family_id, Env::IOPriority io_priority,
    Env::WriteLifeTimeHint write_hint,
    std::vector<BlobFileAddition>* blob_file_additions)
    : BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, env,
                      fs, immutable_cf_options, mutable_cf_options,
                      file_options, column_family_id, io_priority, write_hint,
                      blob_file_additions) {
  // The delegated-to constructor validates every other pointer argument but
  // cannot see `versions`, which is only dereferenced lazily inside the
  // lambda. Check it here so a null pointer is reported at construction time
  // instead of at the first blob file open.
  assert(versions);
}
// Primary constructor. `file_number_generator` is invoked once per blob file
// that gets opened. The blob-related settings (minimum blob size, blob file
// size limit, compression type) are copied out of `mutable_cf_options` here;
// the remaining pointer arguments are stored as-is and not owned.
// Finished files are reported by appending to `*blob_file_additions`.
BlobFileBuilder::BlobFileBuilder(
    std::function<uint64_t()> file_number_generator, Env* env, FileSystem* fs,
    const ImmutableCFOptions* immutable_cf_options,
    const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
    uint32_t column_family_id, Env::IOPriority io_priority,
    Env::WriteLifeTimeHint write_hint,
    std::vector<BlobFileAddition>* blob_file_additions)
    : file_number_generator_(std::move(file_number_generator)),
      env_(env),
      fs_(fs),
      immutable_cf_options_(immutable_cf_options),
      min_blob_size_(mutable_cf_options->min_blob_size),
      blob_file_size_(mutable_cf_options->blob_file_size),
      blob_compression_type_(mutable_cf_options->blob_compression_type),
      file_options_(file_options),
      column_family_id_(column_family_id),
      io_priority_(io_priority),
      write_hint_(write_hint),
      blob_file_additions_(blob_file_additions),
      blob_count_(0),
      blob_bytes_(0) {
  // Sanity-check everything that is stored and dereferenced later.
  assert(file_number_generator_ != nullptr);
  assert(env_ != nullptr);
  assert(fs_ != nullptr);
  assert(immutable_cf_options_ != nullptr);
  assert(file_options_ != nullptr);
  assert(blob_file_additions_ != nullptr);
}
// Defaulted here rather than in the header: this translation unit includes
// db/blob/blob_log_writer.h, whereas the header only forward-declares
// BlobLogWriter while holding it in a std::unique_ptr member (presumably the
// reason the destructor is defined out of line).
BlobFileBuilder::~BlobFileBuilder() = default;
// Processes one key/value pair.
//
// Values smaller than the configured minimum blob size are left alone: the
// function returns OK and `*blob_index` stays empty. Otherwise the value is
// (optionally) compressed and appended to the current blob file, opening a
// new file first if none is open and closing it afterwards if the size limit
// has been reached. On success `*blob_index` receives the encoded reference
// (file number, offset, stored size, compression type) to the blob.
//
// `blob_index` must be non-null and empty on entry. It is only written after
// every step has succeeded.
Status BlobFileBuilder::Add(const Slice& key, const Slice& value,
                            std::string* blob_index) {
  assert(blob_index);
  assert(blob_index->empty());

  // Small values are not extracted into a blob file.
  if (value.size() < min_blob_size_) {
    return Status::OK();
  }

  const Status open_status = OpenBlobFileIfNeeded();
  if (!open_status.ok()) {
    return open_status;
  }

  // `payload` initially aliases the caller's value; after compression it
  // points into `compressed_payload`, which must therefore outlive it.
  Slice payload = value;
  std::string compressed_payload;

  const Status compress_status =
      CompressBlobIfNeeded(&payload, &compressed_payload);
  if (!compress_status.ok()) {
    return compress_status;
  }

  uint64_t file_number = 0;
  uint64_t offset_in_file = 0;

  const Status write_status =
      WriteBlobToFile(key, payload, &file_number, &offset_in_file);
  if (!write_status.ok()) {
    return write_status;
  }

  const Status close_status = CloseBlobFileIfNeeded();
  if (!close_status.ok()) {
    return close_status;
  }

  BlobIndex::EncodeBlob(blob_index, file_number, offset_in_file,
                        payload.size(), blob_compression_type_);

  return Status::OK();
}
// Finalizes the builder: closes the blob file that is currently open, if
// any. Returns OK without doing anything when no file is open.
Status BlobFileBuilder::Finish() {
  return IsBlobFileOpen() ? CloseBlobFile() : Status::OK();
}
// A blob file counts as open exactly while a blob log writer is held.
bool BlobFileBuilder::IsBlobFileOpen() const { return writer_ != nullptr; }
// Ensures there is an open blob file to write to.
//
// Returns OK immediately if a file is already open. Otherwise obtains a new
// file number from the generator, creates the file under the first entry of
// cf_paths, applies the configured I/O priority and write lifetime hint,
// wraps it in a WritableFileWriter and a BlobLogWriter, and writes the blob
// log header. `writer_` is assigned only after the header has been written
// successfully, so on any error the builder remains in the "no file open"
// state and the error status is returned.
//
// NOTE(review): nothing in this function removes a file that was created
// before a subsequent header write failed; cleanup, if any, has to happen
// elsewhere.
Status BlobFileBuilder::OpenBlobFileIfNeeded() {
if (IsBlobFileOpen()) {
return Status::OK();
}
// The per-file counters must have been reset when the previous file (if
// any) was closed.
assert(!blob_count_);
assert(!blob_bytes_);
assert(file_number_generator_);
const uint64_t blob_file_number = file_number_generator_();
assert(immutable_cf_options_);
assert(!immutable_cf_options_->cf_paths.empty());
// Blob files are always placed in the first configured path.
const std::string blob_file_path = BlobFileName(
immutable_cf_options_->cf_paths.front().path, blob_file_number);
std::unique_ptr<FSWritableFile> file;
{
// Sync point located just before the file is created.
TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:NewWritableFile");
assert(file_options_);
const Status s =
NewWritableFile(fs_, blob_file_path, &file, *file_options_);
if (!s.ok()) {
return s;
}
}
assert(file);
file->SetIOPriority(io_priority_);
file->SetWriteLifeTimeHint(write_hint_);
Statistics* const statistics = immutable_cf_options_->statistics;
// Ownership chain: `file` is moved into the file writer, which in turn is
// moved into the blob log writer.
std::unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), blob_file_path, *file_options_,
env_, statistics, immutable_cf_options_->listeners,
immutable_cf_options_->file_checksum_gen_factory));
std::unique_ptr<BlobLogWriter> blob_log_writer(
new BlobLogWriter(std::move(file_writer), env_, statistics,
blob_file_number, immutable_cf_options_->use_fsync));
// The header is written with TTL disabled and a default-constructed
// expiration range.
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;
BlobLogHeader header(column_family_id_, blob_compression_type_, has_ttl,
expiration_range);
{
// Sync point located just before the header is written.
TEST_SYNC_POINT("BlobFileBuilder::OpenBlobFileIfNeeded:WriteHeader");
const Status s = blob_log_writer->WriteHeader(header);
if (!s.ok()) {
return s;
}
}
// Publish the writer only once the file is fully initialized.
writer_ = std::move(blob_log_writer);
assert(IsBlobFileOpen());
return Status::OK();
}
// Compresses `*blob` with the configured blob compression type.
//
// When compression is disabled (kNoCompression) this is a no-op that returns
// OK and leaves both arguments untouched. Otherwise the compressed bytes are
// stored in `*compressed_blob` and `*blob` is re-pointed at that buffer, so
// the caller must keep `*compressed_blob` alive while using `*blob`.
// Returns Corruption if the compression routine reports failure.
//
// `blob` and `compressed_blob` must be non-null; `*compressed_blob` must be
// empty on entry.
Status BlobFileBuilder::CompressBlobIfNeeded(
    Slice* blob, std::string* compressed_blob) const {
  assert(blob);
  assert(compressed_blob);
  assert(compressed_blob->empty());

  const bool compression_enabled = (blob_compression_type_ != kNoCompression);
  if (!compression_enabled) {
    return Status::OK();
  }

  // Default compression options, no dictionary, no sampling.
  CompressionOptions compression_opts;
  CompressionContext compression_ctx(blob_compression_type_);
  constexpr uint64_t sample_for_compression = 0;

  CompressionInfo compression_info(
      compression_opts, compression_ctx, CompressionDict::GetEmptyDict(),
      blob_compression_type_, sample_for_compression);

  constexpr uint32_t compression_format_version = 2;

  const bool compressed_ok =
      CompressData(*blob, compression_info, compression_format_version,
                   compressed_blob);
  if (!compressed_ok) {
    return Status::Corruption("Error compressing blob");
  }

  *blob = Slice(*compressed_blob);

  return Status::OK();
}
// Appends one key/blob record to the currently open blob file.
//
// On success `*blob_file_number` is set to the number of the file written to,
// `*blob_offset` to the offset reported by the writer for the blob, and the
// per-file blob count and byte total are updated. On failure the writer's
// status is returned and the counters are left unchanged.
//
// Requires an open blob file and non-null output pointers.
Status BlobFileBuilder::WriteBlobToFile(const Slice& key, const Slice& blob,
                                        uint64_t* blob_file_number,
                                        uint64_t* blob_offset) {
  assert(IsBlobFileOpen());
  assert(blob_file_number);
  assert(blob_offset);

  // AddRecord reports the key offset as well; it is not needed here.
  uint64_t key_offset = 0;

  TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AddRecord");

  const Status add_status =
      writer_->AddRecord(key, blob, &key_offset, blob_offset);
  if (!add_status.ok()) {
    return add_status;
  }

  *blob_file_number = writer_->get_log_number();

  // Account for the record just written: header plus key plus blob payload.
  blob_count_ += 1;
  blob_bytes_ += BlobLogRecord::kHeaderSize + key.size() + blob.size();

  return Status::OK();
}
// Finishes the currently open blob file.
//
// Appends a footer carrying the blob count, then records a BlobFileAddition
// (file number, blob count, total blob bytes, checksum method and value) in
// `*blob_file_additions_`, releases the writer and resets the per-file
// counters. If appending the footer fails, the error is returned and none of
// the bookkeeping is performed.
//
// Requires an open blob file.
Status BlobFileBuilder::CloseBlobFile() {
  assert(IsBlobFileOpen());

  BlobLogFooter footer;
  footer.blob_count = blob_count_;

  // Filled in by AppendFooter.
  std::string checksum_method;
  std::string checksum_value;

  // Note: the sync point name refers to WriteBlobToFile although it lives in
  // CloseBlobFile; it is a runtime string and is kept verbatim.
  TEST_SYNC_POINT("BlobFileBuilder::WriteBlobToFile:AppendFooter");

  const Status footer_status =
      writer_->AppendFooter(footer, &checksum_method, &checksum_value);
  if (!footer_status.ok()) {
    return footer_status;
  }

  // Read the file number before the writer is released below.
  const uint64_t closed_file_number = writer_->get_log_number();

  assert(blob_file_additions_);
  blob_file_additions_->emplace_back(closed_file_number, blob_count_,
                                     blob_bytes_, std::move(checksum_method),
                                     std::move(checksum_value));

  // Return to the "no file open" state.
  writer_.reset();
  blob_count_ = 0;
  blob_bytes_ = 0;

  return Status::OK();
}
// Closes the current blob file once its size, as reported by the underlying
// file writer, has reached the configured blob file size limit; otherwise
// returns OK and keeps the file open.
//
// Requires an open blob file.
Status BlobFileBuilder::CloseBlobFileIfNeeded() {
  assert(IsBlobFileOpen());

  const WritableFileWriter* const underlying_writer = writer_->file();
  assert(underlying_writer);

  const bool size_limit_reached =
      underlying_writer->GetFileSize() >= blob_file_size_;

  return size_limit_reached ? CloseBlobFile() : Status::OK();
}
} // namespace ROCKSDB_NAMESPACE
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <cinttypes>
#include <functional>
#include <memory>
#include <string>
#include <vector>
#include "rocksdb/compression_type.h"
#include "rocksdb/env.h"
#include "rocksdb/rocksdb_namespace.h"
namespace ROCKSDB_NAMESPACE {
class VersionSet;
class FileSystem;
struct ImmutableCFOptions;
struct MutableCFOptions;
struct FileOptions;
class BlobFileAddition;
class Status;
class Slice;
class BlobLogWriter;
// Builds blob files. Values passed to Add() that are at least `min_blob_size`
// bytes long are written (optionally compressed) to a blob file; a new file
// is opened on demand and the current one is closed once its size reaches
// `blob_file_size`. For every closed file a BlobFileAddition is appended to
// the vector supplied at construction time.
//
// The builder is neither copyable nor assignable. All pointer arguments are
// stored as raw, non-owning pointers.
class BlobFileBuilder {
public:
// Constructs a builder that obtains blob file numbers from
// `versions->NewFileNumber()`. The remaining arguments are forwarded to the
// generator-based constructor below.
BlobFileBuilder(VersionSet* versions, Env* env, FileSystem* fs,
const ImmutableCFOptions* immutable_cf_options,
const MutableCFOptions* mutable_cf_options,
const FileOptions* file_options, uint32_t column_family_id,
Env::IOPriority io_priority,
Env::WriteLifeTimeHint write_hint,
std::vector<BlobFileAddition>* blob_file_additions);
// Constructs a builder that calls `file_number_generator` each time a new
// blob file is opened. `min_blob_size`, `blob_file_size` and
// `blob_compression_type` are copied from `mutable_cf_options` at
// construction time.
BlobFileBuilder(std::function<uint64_t()> file_number_generator, Env* env,
FileSystem* fs,
const ImmutableCFOptions* immutable_cf_options,
const MutableCFOptions* mutable_cf_options,
const FileOptions* file_options, uint32_t column_family_id,
Env::IOPriority io_priority,
Env::WriteLifeTimeHint write_hint,
std::vector<BlobFileAddition>* blob_file_additions);
BlobFileBuilder(const BlobFileBuilder&) = delete;
BlobFileBuilder& operator=(const BlobFileBuilder&) = delete;
~BlobFileBuilder();
// Processes one key/value pair. `blob_index` must be non-null and empty.
// If the value is smaller than the minimum blob size, returns OK and leaves
// `*blob_index` empty. Otherwise writes the value to a blob file and, on
// success, stores the encoded blob reference in `*blob_index`.
Status Add(const Slice& key, const Slice& value, std::string* blob_index);
// Closes the currently open blob file, if any.
Status Finish();
private:
// True while a blob log writer (i.e. an open blob file) is held.
bool IsBlobFileOpen() const;
// Opens a new blob file and writes its header unless one is already open.
Status OpenBlobFileIfNeeded();
// Compresses `*blob` into `*compressed_blob` and re-points `*blob` at it,
// unless compression is disabled.
Status CompressBlobIfNeeded(Slice* blob, std::string* compressed_blob) const;
// Appends a record to the open blob file and updates the per-file counters.
Status WriteBlobToFile(const Slice& key, const Slice& blob,
uint64_t* blob_file_number, uint64_t* blob_offset);
// Writes the footer, records a BlobFileAddition and releases the writer.
Status CloseBlobFile();
// Closes the open blob file if it has reached `blob_file_size_`.
Status CloseBlobFileIfNeeded();
// Source of file numbers for newly opened blob files.
std::function<uint64_t()> file_number_generator_;
Env* env_;
FileSystem* fs_;
const ImmutableCFOptions* immutable_cf_options_;
// Values shorter than this are not written to blob files.
uint64_t min_blob_size_;
// A blob file is closed once its size reaches this limit.
uint64_t blob_file_size_;
CompressionType blob_compression_type_;
const FileOptions* file_options_;
uint32_t column_family_id_;
Env::IOPriority io_priority_;
Env::WriteLifeTimeHint write_hint_;
// Output: one entry is appended per successfully closed blob file.
std::vector<BlobFileAddition>* blob_file_additions_;
// Writer for the currently open blob file; null when no file is open.
std::unique_ptr<BlobLogWriter> writer_;
// Number of blobs and number of bytes (record header + key + blob) written
// to the currently open blob file; reset to zero when the file is closed.
uint64_t blob_count_;
uint64_t blob_bytes_;
};
} // namespace ROCKSDB_NAMESPACE
此差异已折叠。
......@@ -24,6 +24,8 @@ BlobLogReader::BlobLogReader(
next_byte_(0) {}
Status BlobLogReader::ReadSlice(uint64_t size, Slice* slice, char* buf) {
assert(file_);
StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
Status s = file_->Read(IOOptions(), next_byte_, static_cast<size_t>(size),
slice, buf, nullptr);
......@@ -39,8 +41,11 @@ Status BlobLogReader::ReadSlice(uint64_t size, Slice* slice, char* buf) {
}
Status BlobLogReader::ReadHeader(BlobLogHeader* header) {
assert(file_.get() != nullptr);
assert(next_byte_ == 0);
static_assert(BlobLogHeader::kSize <= sizeof(header_buf_),
"Buffer is smaller than BlobLogHeader::kSize");
Status s = ReadSlice(BlobLogHeader::kSize, &buffer_, header_buf_);
if (!s.ok()) {
return s;
......@@ -55,6 +60,9 @@ Status BlobLogReader::ReadHeader(BlobLogHeader* header) {
Status BlobLogReader::ReadRecord(BlobLogRecord* record, ReadLevel level,
uint64_t* blob_offset) {
static_assert(BlobLogRecord::kHeaderSize <= sizeof(header_buf_),
"Buffer is smaller than BlobLogRecord::kHeaderSize");
Status s = ReadSlice(BlobLogRecord::kHeaderSize, &buffer_, header_buf_);
if (!s.ok()) {
return s;
......@@ -100,4 +108,20 @@ Status BlobLogReader::ReadRecord(BlobLogRecord* record, ReadLevel level,
return s;
}
// Reads BlobLogFooter::kSize bytes at the reader's current position into the
// internal header buffer and decodes them into `*footer`. Returns the read
// error if the read fails, Corruption if fewer bytes than a full footer were
// obtained, and otherwise the result of decoding.
Status BlobLogReader::ReadFooter(BlobLogFooter* footer) {
  static_assert(sizeof(header_buf_) >= BlobLogFooter::kSize,
                "Buffer is smaller than BlobLogFooter::kSize");

  Status read_status = ReadSlice(BlobLogFooter::kSize, &buffer_, header_buf_);
  if (!read_status.ok()) {
    return read_status;
  }

  // Only a complete footer can be decoded; anything else means the file
  // ended early.
  if (buffer_.size() == BlobLogFooter::kSize) {
    return footer->DecodeFrom(buffer_);
  }

  return Status::Corruption("EOF reached before file footer");
}
} // namespace ROCKSDB_NAMESPACE
......@@ -56,6 +56,8 @@ class BlobLogReader {
Status ReadRecord(BlobLogRecord* record, ReadLevel level = kReadHeader,
uint64_t* blob_offset = nullptr);
Status ReadFooter(BlobLogFooter* footer);
void ResetNextByte() { next_byte_ = 0; }
uint64_t GetNextByte() const { return next_byte_; }
......
......@@ -29,6 +29,8 @@ BlobLogWriter::BlobLogWriter(std::unique_ptr<WritableFileWriter>&& dest,
use_fsync_(use_fs),
last_elem_type_(kEtNone) {}
BlobLogWriter::~BlobLogWriter() = default;
Status BlobLogWriter::Sync() {
TEST_SYNC_POINT("BlobLogWriter::Sync");
......@@ -55,7 +57,9 @@ Status BlobLogWriter::WriteHeader(BlobLogHeader& header) {
return s;
}
Status BlobLogWriter::AppendFooter(BlobLogFooter& footer) {
Status BlobLogWriter::AppendFooter(BlobLogFooter& footer,
std::string* checksum_method,
std::string* checksum_value) {
assert(block_offset_ != 0);
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
......@@ -65,10 +69,34 @@ Status BlobLogWriter::AppendFooter(BlobLogFooter& footer) {
Status s = dest_->Append(Slice(str));
if (s.ok()) {
block_offset_ += str.size();
s = Sync();
if (s.ok()) {
s = dest_->Close();
if (s.ok()) {
assert(!!checksum_method == !!checksum_value);
if (checksum_method) {
assert(checksum_method->empty());
std::string method = dest_->GetFileChecksumFuncName();
if (method != kUnknownFileChecksumFuncName) {
*checksum_method = std::move(method);
}
}
if (checksum_value) {
assert(checksum_value->empty());
std::string value = dest_->GetFileChecksum();
if (value != kUnknownFileChecksum) {
*checksum_value = std::move(value);
}
}
}
}
dest_.reset();
}
......
......@@ -39,7 +39,7 @@ class BlobLogWriter {
BlobLogWriter(const BlobLogWriter&) = delete;
BlobLogWriter& operator=(const BlobLogWriter&) = delete;
~BlobLogWriter() = default;
~BlobLogWriter();
static void ConstructBlobHeader(std::string* buf, const Slice& key,
const Slice& val, uint64_t expiration);
......@@ -54,7 +54,8 @@ class BlobLogWriter {
const Slice& val, uint64_t* key_offset,
uint64_t* blob_offset);
Status AppendFooter(BlobLogFooter& footer);
Status AppendFooter(BlobLogFooter& footer, std::string* checksum_method,
std::string* checksum_value);
Status WriteHeader(BlobLogHeader& header);
......
......@@ -6,6 +6,7 @@ LIB_SOURCES = \
cache/sharded_cache.cc \
db/arena_wrapped_db_iter.cc \
db/blob/blob_file_addition.cc \
db/blob/blob_file_builder.cc \
db/blob/blob_file_garbage.cc \
db/blob/blob_file_meta.cc \
db/blob/blob_log_format.cc \
......@@ -340,6 +341,7 @@ TEST_MAIN_SOURCES = \
cache/cache_test.cc \
cache/lru_cache_test.cc \
db/blob/blob_file_addition_test.cc \
db/blob/blob_file_builder_test.cc \
db/blob/blob_file_garbage_test.cc \
db/blob/db_blob_index_test.cc \
db/column_family_test.cc \
......
......@@ -111,7 +111,8 @@ Status BlobFile::WriteFooterAndCloseLocked(SequenceNumber sequence) {
}
// this will close the file and reset the Writable File Pointer.
Status s = log_writer_->AppendFooter(footer);
Status s = log_writer_->AppendFooter(footer, /* checksum_method */ nullptr,
/* checksum_value */ nullptr);
if (s.ok()) {
closed_ = true;
immutable_sequence_ = sequence;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册