提交 9aa3b6f9 编写于 作者: C Changyu Bi 提交者: Facebook GitHub Bot

Support range deletion tombstones in `CreateColumnFamilyWithImport` (#11252)

Summary:
CreateColumnFamilyWithImport() did not support range tombstones for two reasons:
1. it uses point keys of a input file to determine its boundary (smallest and largest internal key), which means range tombstones outside of the point key range will be effectively dropped.
2. it does not handle files with no point keys.

Also included a fix in external_sst_file_ingestion_job.cc where the blocks read in `GetIngestedFileInfo()` can be added to block cache now (issue fixed in https://github.com/facebook/rocksdb/pull/6429).

This PR adds support for exporting and importing column family with range tombstones. The main change is to add smallest internal key and largest internal key to `SstFileMetaData` that will be part of the output of `ExportColumnFamily()`. Then during `CreateColumnFamilyWithImport(...,const ExportImportFilesMetaData& metadata,...)`, file boundaries can be set from `metadata` directly. This is needed since when file boundaries are extended by range tombstones, sometimes they cannot be deduced from a file's content alone.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11252

Test Plan:
- added unit tests that fails before this change

Closes https://github.com/facebook/rocksdb/issues/11245

Reviewed By: ajkr

Differential Revision: D43577443

Pulled By: cbi42

fbshipit-source-id: 6bff78e583cc50c44854994dea0a8dd519398f2f
上级 fbd603d0
......@@ -6,6 +6,7 @@
### Bug Fixes
* Fixed an issue for backward iteration when user defined timestamp is enabled in combination with BlobDB.
* Fixed a couple of cases where a Merge operand encountered during iteration wasn't reflected in the `internal_merge_count` PerfContext counter.
* Fixed a bug in CreateColumnFamilyWithImport()/ExportColumnFamily() which did not support range tombstones (#11252).
### New Features
* Add statistics rocksdb.secondary.cache.filter.hits, rocksdb.secondary.cache.index.hits, and rocksdb.secondary.cache.filter.hits
......
......@@ -746,12 +746,6 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
ParsedInternalKey key;
ReadOptions ro;
// During reading the external file we can cache blocks that we read into
// the block cache, if we later change the global seqno of this file, we will
// have block in cache that will include keys with wrong seqno.
// We need to disable fill_cache so that we read from the file without
// updating the block cache.
ro.fill_cache = false;
std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
/*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));
......
......@@ -34,8 +34,8 @@ Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number,
for (const auto& file_metadata : metadata_) {
const auto file_path = file_metadata.db_path + "/" + file_metadata.name;
IngestedFileInfo file_to_import;
status =
GetIngestedFileInfo(file_path, next_file_number++, &file_to_import, sv);
status = GetIngestedFileInfo(file_path, next_file_number++, sv,
file_metadata, &file_to_import);
if (!status.ok()) {
return status;
}
......@@ -212,16 +212,20 @@ void ImportColumnFamilyJob::Cleanup(const Status& status) {
Status ImportColumnFamilyJob::GetIngestedFileInfo(
const std::string& external_file, uint64_t new_file_number,
IngestedFileInfo* file_to_import, SuperVersion* sv) {
SuperVersion* sv, const LiveFileMetaData& file_meta,
IngestedFileInfo* file_to_import) {
file_to_import->external_file_path = external_file;
// Get external file size
Status status = fs_->GetFileSize(external_file, IOOptions(),
&file_to_import->file_size, nullptr);
if (!status.ok()) {
return status;
Status status;
if (file_meta.size > 0) {
file_to_import->file_size = file_meta.size;
} else {
// Get external file size
status = fs_->GetFileSize(external_file, IOOptions(),
&file_to_import->file_size, nullptr);
if (!status.ok()) {
return status;
}
}
// Assign FD with number
file_to_import->fd =
FileDescriptor(new_file_number, 0, file_to_import->file_size);
......@@ -262,37 +266,61 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo(
// Get number of entries in table
file_to_import->num_entries = props->num_entries;
ParsedInternalKey key;
ReadOptions ro;
// During reading the external file we can cache blocks that we read into
// the block cache, if we later change the global seqno of this file, we will
// have block in cache that will include keys with wrong seqno.
// We need to disable fill_cache so that we read from the file without
// updating the block cache.
ro.fill_cache = false;
std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
/*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));
// Get first (smallest) key from file
iter->SeekToFirst();
Status pik_status =
ParseInternalKey(iter->key(), &key, db_options_.allow_data_in_errors);
if (!pik_status.ok()) {
return Status::Corruption("Corrupted Key in external file. ",
pik_status.getState());
}
file_to_import->smallest_internal_key.SetFrom(key);
// Get last (largest) key from file
iter->SeekToLast();
pik_status =
ParseInternalKey(iter->key(), &key, db_options_.allow_data_in_errors);
if (!pik_status.ok()) {
return Status::Corruption("Corrupted Key in external file. ",
pik_status.getState());
// If the importing files were exported with Checkpoint::ExportColumnFamily(),
// we cannot simply recompute smallest and largest used to truncate range
// tombstones from file content, and we expect smallest and largest populated
// in file_meta.
if (file_meta.smallest.empty()) {
assert(file_meta.largest.empty());
ReadOptions ro;
std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
/*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));
// Get first (smallest) key from file
iter->SeekToFirst();
bool bound_set = false;
if (iter->Valid()) {
file_to_import->smallest_internal_key.DecodeFrom(iter->key());
iter->SeekToLast();
file_to_import->largest_internal_key.DecodeFrom(iter->key());
bound_set = true;
}
std::unique_ptr<InternalIterator> range_del_iter{
table_reader->NewRangeTombstoneIterator(ro)};
if (range_del_iter != nullptr) {
range_del_iter->SeekToFirst();
if (range_del_iter->Valid()) {
ParsedInternalKey key;
Status pik_status = ParseInternalKey(range_del_iter->key(), &key,
db_options_.allow_data_in_errors);
if (!pik_status.ok()) {
return Status::Corruption("Corrupted key in external file. ",
pik_status.getState());
}
RangeTombstone tombstone(key, range_del_iter->value());
InternalKey start_key = tombstone.SerializeKey();
const InternalKeyComparator* icmp = &cfd_->internal_comparator();
if (!bound_set ||
icmp->Compare(start_key, file_to_import->smallest_internal_key) <
0) {
file_to_import->smallest_internal_key = start_key;
}
InternalKey end_key = tombstone.SerializeEndKey();
if (!bound_set ||
icmp->Compare(end_key, file_to_import->largest_internal_key) > 0) {
file_to_import->largest_internal_key = end_key;
}
bound_set = true;
}
}
assert(bound_set);
} else {
assert(!file_meta.largest.empty());
file_to_import->smallest_internal_key.DecodeFrom(file_meta.smallest);
file_to_import->largest_internal_key.DecodeFrom(file_meta.largest);
}
file_to_import->largest_internal_key.SetFrom(key);
file_to_import->cf_id = static_cast<uint32_t>(props->column_family_id);
......
......@@ -62,9 +62,9 @@ class ImportColumnFamilyJob {
// Open the external file and populate `file_to_import` with all the
// external information we need to import this file.
Status GetIngestedFileInfo(const std::string& external_file,
uint64_t new_file_number,
IngestedFileInfo* file_to_import,
SuperVersion* sv);
uint64_t new_file_number, SuperVersion* sv,
const LiveFileMetaData& file_meta,
IngestedFileInfo* file_to_import);
SystemClock* clock_;
VersionSet* versions_;
......
......@@ -280,6 +280,57 @@ TEST_F(ImportColumnFamilyTest, ImportSSTFileWriterFilesWithOverlap) {
}
}
TEST_F(ImportColumnFamilyTest, ImportSSTFileWriterFilesWithRangeTombstone) {
// Test for a bug where import file's smallest and largest key did not
// consider range tombstone.
Options options = CurrentOptions();
CreateAndReopenWithCF({"koko"}, options);
SstFileWriter sfw_cf1(EnvOptions(), options, handles_[1]);
// cf1.sst
const std::string cf1_sst_name = "cf1.sst";
const std::string cf1_sst = sst_files_dir_ + cf1_sst_name;
ASSERT_OK(sfw_cf1.Open(cf1_sst));
ASSERT_OK(sfw_cf1.Put("K1", "V1"));
ASSERT_OK(sfw_cf1.Put("K2", "V2"));
ASSERT_OK(sfw_cf1.DeleteRange("K3", "K4"));
ASSERT_OK(sfw_cf1.Finish());
// Import sst file corresponding to cf1 onto a new cf and verify
ExportImportFilesMetaData metadata;
metadata.files.push_back(
LiveFileMetaDataInit(cf1_sst_name, sst_files_dir_, 0, 0, 19));
metadata.db_comparator_name = options.comparator->Name();
ASSERT_OK(db_->CreateColumnFamilyWithImport(
options, "toto", ImportColumnFamilyOptions(), metadata, &import_cfh_));
ASSERT_NE(import_cfh_, nullptr);
ColumnFamilyMetaData import_cf_meta;
db_->GetColumnFamilyMetaData(import_cfh_, &import_cf_meta);
ASSERT_EQ(import_cf_meta.file_count, 1);
const SstFileMetaData* file_meta = nullptr;
for (const auto& level_meta : import_cf_meta.levels) {
if (!level_meta.files.empty()) {
file_meta = &(level_meta.files[0]);
break;
}
}
ASSERT_TRUE(file_meta != nullptr);
InternalKey largest;
largest.DecodeFrom(file_meta->largest);
ASSERT_EQ(largest.user_key(), "K4");
std::string value;
ASSERT_OK(db_->Get(ReadOptions(), import_cfh_, "K1", &value));
ASSERT_EQ(value, "V1");
ASSERT_OK(db_->Get(ReadOptions(), import_cfh_, "K2", &value));
ASSERT_EQ(value, "V2");
ASSERT_OK(db_->DropColumnFamily(import_cfh_));
ASSERT_OK(db_->DestroyColumnFamilyHandle(import_cfh_));
import_cfh_ = nullptr;
}
TEST_F(ImportColumnFamilyTest, ImportExportedSSTFromAnotherCF) {
Options options = CurrentOptions();
CreateAndReopenWithCF({"koko"}, options);
......@@ -444,6 +495,70 @@ TEST_F(ImportColumnFamilyTest, ImportExportedSSTFromAnotherDB) {
ASSERT_OK(DestroyDir(env_, dbname_ + "/db_copy"));
}
TEST_F(ImportColumnFamilyTest,
ImportExportedSSTFromAnotherCFWithRangeTombstone) {
// Test for a bug where import file's smallest and largest key did not
// consider range tombstone.
Options options = CurrentOptions();
options.disable_auto_compactions = true;
CreateAndReopenWithCF({"koko"}, options);
for (int i = 10; i < 20; ++i) {
ASSERT_OK(Put(1, Key(i), Key(i) + "_val"));
}
ASSERT_OK(Flush(1 /* cf */));
MoveFilesToLevel(1 /* level */, 1 /* cf */);
const Snapshot* snapshot = db_->GetSnapshot();
ASSERT_OK(db_->DeleteRange(WriteOptions(), handles_[1], Key(0), Key(25)));
ASSERT_OK(Put(1, Key(1), "t"));
ASSERT_OK(Flush(1));
// Tests importing a range tombstone only file
ASSERT_OK(db_->DeleteRange(WriteOptions(), handles_[1], Key(0), Key(2)));
Checkpoint* checkpoint;
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
ASSERT_OK(checkpoint->ExportColumnFamily(handles_[1], export_files_dir_,
&metadata_ptr_));
ASSERT_NE(metadata_ptr_, nullptr);
delete checkpoint;
ImportColumnFamilyOptions import_options;
import_options.move_files = false;
ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "toto", import_options,
*metadata_ptr_, &import_cfh_));
ASSERT_NE(import_cfh_, nullptr);
import_options.move_files = true;
ASSERT_OK(db_->CreateColumnFamilyWithImport(options, "yoyo", import_options,
*metadata_ptr_, &import_cfh2_));
ASSERT_NE(import_cfh2_, nullptr);
delete metadata_ptr_;
metadata_ptr_ = nullptr;
std::string value1, value2;
ReadOptions ro_latest;
ReadOptions ro_snapshot;
ro_snapshot.snapshot = snapshot;
for (int i = 10; i < 20; ++i) {
ASSERT_TRUE(db_->Get(ro_latest, import_cfh_, Key(i), &value1).IsNotFound());
ASSERT_OK(db_->Get(ro_snapshot, import_cfh_, Key(i), &value1));
ASSERT_EQ(Get(1, Key(i), snapshot), value1);
}
ASSERT_TRUE(db_->Get(ro_latest, import_cfh_, Key(1), &value1).IsNotFound());
for (int i = 10; i < 20; ++i) {
ASSERT_TRUE(
db_->Get(ro_latest, import_cfh2_, Key(i), &value1).IsNotFound());
ASSERT_OK(db_->Get(ro_snapshot, import_cfh2_, Key(i), &value2));
ASSERT_EQ(Get(1, Key(i), snapshot), value2);
}
ASSERT_TRUE(db_->Get(ro_latest, import_cfh2_, Key(1), &value1).IsNotFound());
db_->ReleaseSnapshot(snapshot);
}
TEST_F(ImportColumnFamilyTest, LevelFilesOverlappingAtEndpoints) {
// Imports a column family containing a level where two files overlap at their
// endpoints. "Overlap" means the largest user key in one file is the same as
......
......@@ -1776,6 +1776,8 @@ void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
file->file_checksum, file->file_checksum_func_name);
files.back().num_entries = file->num_entries;
files.back().num_deletions = file->num_deletions;
files.back().smallest = file->smallest.Encode().ToString();
files.back().largest = file->largest.Encode().ToString();
level_size += file->fd.GetFileSize();
}
cf_meta->levels.emplace_back(level, level_size, std::move(files));
......
......@@ -1717,11 +1717,12 @@ class DB {
const std::vector<IngestExternalFileArg>& args) = 0;
// CreateColumnFamilyWithImport() will create a new column family with
// column_family_name and import external SST files specified in metadata into
// this column family.
// column_family_name and import external SST files specified in `metadata`
// into this column family.
// (1) External SST files can be created using SstFileWriter.
// (2) External SST files can be exported from a particular column family in
// an existing DB using Checkpoint::ExportColumnFamily.
// an existing DB using Checkpoint::ExportColumnFamily. `metadata` should
// be the output from Checkpoint::ExportColumnFamily.
// Option in import_options specifies whether the external files are copied or
// moved (default is copy). When option specifies copy, managing files at
// external_file_path is caller's responsibility. When option specifies a
......
......@@ -148,6 +148,13 @@ struct SstFileMetaData : public FileStorageInfo {
// For L0, larger `epoch_number` indicates newer L0 file.
// 0 if the information is not available.
uint64_t epoch_number = 0;
// These bounds define the effective key range for range tombstones
// in this file.
// Currently only used by CreateColumnFamilyWithImport().
std::string smallest{}; // Smallest internal key served by table
std::string largest{}; // Largest internal key served by table
// DEPRECATED: The name of the file within its directory with a
// leading slash (e.g. "/123456.sst"). Use relative_filename from base struct
// instead.
......
......@@ -372,17 +372,19 @@ Status CheckpointImpl::ExportColumnFamily(
for (const auto& file_metadata : level_metadata.files) {
LiveFileMetaData live_file_metadata;
live_file_metadata.size = file_metadata.size;
live_file_metadata.name = std::move(file_metadata.name);
live_file_metadata.name = file_metadata.name;
live_file_metadata.file_number = file_metadata.file_number;
live_file_metadata.db_path = export_dir;
live_file_metadata.smallest_seqno = file_metadata.smallest_seqno;
live_file_metadata.largest_seqno = file_metadata.largest_seqno;
live_file_metadata.smallestkey = std::move(file_metadata.smallestkey);
live_file_metadata.largestkey = std::move(file_metadata.largestkey);
live_file_metadata.smallestkey = file_metadata.smallestkey;
live_file_metadata.largestkey = file_metadata.largestkey;
live_file_metadata.oldest_blob_file_number =
file_metadata.oldest_blob_file_number;
live_file_metadata.epoch_number = file_metadata.epoch_number;
live_file_metadata.level = level_metadata.level;
live_file_metadata.smallest = file_metadata.smallest;
live_file_metadata.largest = file_metadata.largest;
result_metadata->files.push_back(live_file_metadata);
}
*metadata = result_metadata;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册