diff --git a/HISTORY.md b/HISTORY.md index ab576e5071adeabf6c8d74c5af21acdfc1cb8f13..c909f0b9ef0ed358858f3192b490e8d182dd12b2 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -9,6 +9,7 @@ ### Bug Fixes * Fix JEMALLOC_CXX_THROW macro missing from older Jemalloc versions, causing build failures on some platforms. +* Fix SstFileReader not able to open file ingested with write_glbal_seqno=true. ## Unreleased diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index c3bc90f049d33e6ba7cbb0ac04d6221f23df2125..b7953fbd2502e6c868a1a9ff915f7be2ddbb3871 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -727,17 +727,24 @@ Status GetGlobalSequenceNumber(const TableProperties& table_properties, if (seqno_pos != props.end()) { global_seqno = DecodeFixed64(seqno_pos->second.c_str()); } - if (global_seqno != 0 && global_seqno != largest_seqno) { - std::array msg_buf; - snprintf(msg_buf.data(), msg_buf.max_size(), - "An external sst file with version %u have global seqno property " - "with value %s, while largest seqno in the file is %llu", - version, seqno_pos->second.c_str(), - static_cast(largest_seqno)); - return Status::Corruption(msg_buf.data()); + // SstTableReader open table reader with kMaxSequenceNumber as largest_seqno + // to denote it is unknown. + if (largest_seqno < kMaxSequenceNumber) { + if (global_seqno == 0) { + global_seqno = largest_seqno; + } + if (global_seqno != largest_seqno) { + std::array msg_buf; + snprintf( + msg_buf.data(), msg_buf.max_size(), + "An external sst file with version %u have global seqno property " + "with value %s, while largest seqno in the file is %llu", + version, seqno_pos->second.c_str(), + static_cast(largest_seqno)); + return Status::Corruption(msg_buf.data()); + } } - global_seqno = largest_seqno; - *seqno = largest_seqno; + *seqno = global_seqno; if (global_seqno > kMaxSequenceNumber) { std::array msg_buf; @@ -942,6 +949,41 @@ Status VerifyChecksum(const ChecksumType type, const char* buf, size_t len, return s; } +Status BlockBasedTable::TryReadPropertiesWithGlobalSeqno( + Rep* rep, FilePrefetchBuffer* prefetch_buffer, const Slice& handle_value, + TableProperties** table_properties) { + assert(table_properties != nullptr); + // If this is an external SST file ingested with write_global_seqno set to + // true, then we expect the checksum mismatch because checksum was written + // by SstFileWriter, but its global seqno in the properties block may have + // been changed during ingestion. In this case, we read the properties + // block, copy it to a memory buffer, change the global seqno to its + // original value, i.e. 0, and verify the checksum again. + BlockHandle props_block_handle; + CacheAllocationPtr tmp_buf; + Status s = ReadProperties(handle_value, rep->file.get(), prefetch_buffer, + rep->footer, rep->ioptions, table_properties, + false /* verify_checksum */, &props_block_handle, + &tmp_buf, false /* compression_type_missing */, + nullptr /* memory_allocator */); + if (s.ok() && tmp_buf) { + const auto seqno_pos_iter = + (*table_properties) + ->properties_offsets.find( + ExternalSstFilePropertyNames::kGlobalSeqno); + size_t block_size = props_block_handle.size(); + if (seqno_pos_iter != (*table_properties)->properties_offsets.end()) { + uint64_t global_seqno_offset = seqno_pos_iter->second; + EncodeFixed64( + tmp_buf.get() + global_seqno_offset - props_block_handle.offset(), 0); + } + uint32_t value = DecodeFixed32(tmp_buf.get() + block_size + 1); + s = rocksdb::VerifyChecksum(rep->footer.checksum(), tmp_buf.get(), + block_size + 1, value); + } + return s; +} + Status BlockBasedTable::ReadPropertiesBlock( Rep* rep, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, const SequenceNumber largest_seqno) { @@ -965,33 +1007,8 @@ Status BlockBasedTable::ReadPropertiesBlock( } if (s.IsCorruption()) { - // If this is an external SST file ingested with write_global_seqno set to - // true, then we expect the checksum mismatch because checksum was written - // by SstFileWriter, but its global seqno in the properties block may have - // been changed during ingestion. In this case, we read the properties - // block, copy it to a memory buffer, change the global seqno to its - // original value, i.e. 0, and verify the checksum again. - BlockHandle props_block_handle; - CacheAllocationPtr tmp_buf; - s = ReadProperties(meta_iter->value(), rep->file.get(), prefetch_buffer, - rep->footer, rep->ioptions, &table_properties, - false /* verify_checksum */, &props_block_handle, - &tmp_buf, false /* compression_type_missing */, - nullptr /* memory_allocator */); - if (s.ok() && tmp_buf) { - const auto seqno_pos_iter = table_properties->properties_offsets.find( - ExternalSstFilePropertyNames::kGlobalSeqno); - size_t block_size = props_block_handle.size(); - if (seqno_pos_iter != table_properties->properties_offsets.end()) { - uint64_t global_seqno_offset = seqno_pos_iter->second; - EncodeFixed64( - tmp_buf.get() + global_seqno_offset - props_block_handle.offset(), - 0); - } - uint32_t value = DecodeFixed32(tmp_buf.get() + block_size + 1); - s = rocksdb::VerifyChecksum(rep->footer.checksum(), tmp_buf.get(), - block_size + 1, value); - } + s = TryReadPropertiesWithGlobalSeqno( + rep, prefetch_buffer, meta_iter->value(), &table_properties); } std::unique_ptr props_guard; if (table_properties != nullptr) { @@ -2801,7 +2818,7 @@ Status BlockBasedTable::VerifyChecksum() { std::unique_ptr meta_iter; s = ReadMetaBlock(rep_, nullptr /* prefetch buffer */, &meta, &meta_iter); if (s.ok()) { - s = VerifyChecksumInBlocks(meta_iter.get()); + s = VerifyChecksumInMetaBlocks(meta_iter.get()); if (!s.ok()) { return s; } @@ -2848,7 +2865,7 @@ Status BlockBasedTable::VerifyChecksumInBlocks( return s; } -Status BlockBasedTable::VerifyChecksumInBlocks( +Status BlockBasedTable::VerifyChecksumInMetaBlocks( InternalIteratorBase* index_iter) { Status s; for (index_iter->SeekToFirst(); index_iter->Valid(); index_iter->Next()) { @@ -2866,6 +2883,13 @@ Status BlockBasedTable::VerifyChecksumInBlocks( false /* decompress */, false /*maybe_compressed*/, UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options); s = block_fetcher.ReadBlockContents(); + if (s.IsCorruption() && index_iter->key() == kPropertiesBlock) { + TableProperties* table_properties; + s = TryReadPropertiesWithGlobalSeqno(rep_, nullptr /* prefetch_buffer */, + index_iter->value(), + &table_properties); + delete table_properties; + } if (!s.ok()) { break; } diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 510ca31635cac1978484a6401283a9d393751e80..8ccb42bbb70dafcd023a8ded1731142fa4ca4424 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -364,6 +364,9 @@ class BlockBasedTable : public TableReader { static Status ReadMetaBlock(Rep* rep, FilePrefetchBuffer* prefetch_buffer, std::unique_ptr* meta_block, std::unique_ptr* iter); + static Status TryReadPropertiesWithGlobalSeqno( + Rep* rep, FilePrefetchBuffer* prefetch_buffer, const Slice& handle_value, + TableProperties** table_properties); static Status ReadPropertiesBlock(Rep* rep, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, @@ -382,7 +385,7 @@ class BlockBasedTable : public TableReader { const BlockBasedTableOptions& table_options, const int level, const bool prefetch_index_and_filter_in_cache); - Status VerifyChecksumInBlocks(InternalIteratorBase* index_iter); + Status VerifyChecksumInMetaBlocks(InternalIteratorBase* index_iter); Status VerifyChecksumInBlocks(InternalIteratorBase* index_iter); // Create the filter from the filter block. diff --git a/table/sst_file_reader.cc b/table/sst_file_reader.cc index e8b7173e7d9dab06034c8f89f7304e7a20bc6baa..54408bb50e9612dc50f2a44ab7de8ac820f7561a 100644 --- a/table/sst_file_reader.cc +++ b/table/sst_file_reader.cc @@ -8,6 +8,7 @@ #include "rocksdb/sst_file_reader.h" #include "db/db_iter.h" +#include "db/dbformat.h" #include "options/cf_options.h" #include "table/get_context.h" #include "table/table_builder.h" @@ -49,10 +50,12 @@ Status SstFileReader::Open(const std::string& file_path) { file_reader.reset(new RandomAccessFileReader(std::move(file), file_path)); } if (s.ok()) { - s = r->options.table_factory->NewTableReader( - TableReaderOptions(r->ioptions, r->moptions.prefix_extractor.get(), - r->soptions, r->ioptions.internal_comparator), - std::move(file_reader), file_size, &r->table_reader); + TableReaderOptions t_opt(r->ioptions, r->moptions.prefix_extractor.get(), + r->soptions, r->ioptions.internal_comparator); + // Allow open file with global sequence number for backward compatibility. + t_opt.largest_seqno = kMaxSequenceNumber; + s = r->options.table_factory->NewTableReader(t_opt, std::move(file_reader), + file_size, &r->table_reader); } return s; } diff --git a/table/sst_file_reader_test.cc b/table/sst_file_reader_test.cc index 3a95584fc0ee6ec8c05d8ea3479dc5a5cfc73368..51bc975af0025bdf2c133a50abd5f4f8d04afa5f 100644 --- a/table/sst_file_reader_test.cc +++ b/table/sst_file_reader_test.cc @@ -7,8 +7,10 @@ #include +#include "rocksdb/db.h" #include "rocksdb/sst_file_reader.h" #include "rocksdb/sst_file_writer.h" +#include "table/sst_file_writer_collectors.h" #include "util/testharness.h" #include "util/testutil.h" #include "utilities/merge_operators.h" @@ -34,19 +36,29 @@ class SstFileReaderTest : public testing::Test { sst_name_ = test::PerThreadDBPath("sst_file"); } - void CreateFileAndCheck(const std::vector& keys) { + ~SstFileReaderTest() { + Status s = Env::Default()->DeleteFile(sst_name_); + assert(s.ok()); + } + + void CreateFile(const std::string& file_name, + const std::vector& keys) { SstFileWriter writer(soptions_, options_); - ASSERT_OK(writer.Open(sst_name_)); + ASSERT_OK(writer.Open(file_name)); for (size_t i = 0; i + 2 < keys.size(); i += 3) { ASSERT_OK(writer.Put(keys[i], keys[i])); ASSERT_OK(writer.Merge(keys[i + 1], EncodeAsUint64(i + 1))); ASSERT_OK(writer.Delete(keys[i + 2])); } ASSERT_OK(writer.Finish()); + } + void CheckFile(const std::string& file_name, + const std::vector& keys, + bool check_global_seqno = false) { ReadOptions ropts; SstFileReader reader(options_); - ASSERT_OK(reader.Open(sst_name_)); + ASSERT_OK(reader.Open(file_name)); ASSERT_OK(reader.VerifyChecksum()); std::unique_ptr iter(reader.NewIterator(ropts)); iter->SeekToFirst(); @@ -61,6 +73,18 @@ class SstFileReaderTest : public testing::Test { iter->Next(); } ASSERT_FALSE(iter->Valid()); + if (check_global_seqno) { + auto properties = reader.GetTableProperties(); + ASSERT_TRUE(properties); + auto& user_properties = properties->user_collected_properties; + ASSERT_TRUE( + user_properties.count(ExternalSstFilePropertyNames::kGlobalSeqno)); + } + } + + void CreateFileAndCheck(const std::vector& keys) { + CreateFile(sst_name_, keys); + CheckFile(sst_name_, keys); } protected: @@ -88,6 +112,49 @@ TEST_F(SstFileReaderTest, Uint64Comparator) { CreateFileAndCheck(keys); } +TEST_F(SstFileReaderTest, ReadFileWithGlobalSeqno) { + std::vector keys; + for (uint64_t i = 0; i < kNumKeys; i++) { + keys.emplace_back(EncodeAsString(i)); + } + // Generate a SST file. + CreateFile(sst_name_, keys); + + // Ingest the file into a db, to assign it a global sequence number. + Options options; + options.create_if_missing = true; + std::string db_name = test::PerThreadDBPath("test_db"); + DB* db; + ASSERT_OK(DB::Open(options, db_name, &db)); + // Bump sequence number. + ASSERT_OK(db->Put(WriteOptions(), keys[0], "foo")); + ASSERT_OK(db->Flush(FlushOptions())); + // Ingest the file. + IngestExternalFileOptions ingest_options; + ingest_options.write_global_seqno = true; + ASSERT_OK(db->IngestExternalFile({sst_name_}, ingest_options)); + std::vector live_files; + uint64_t manifest_file_size = 0; + ASSERT_OK(db->GetLiveFiles(live_files, &manifest_file_size)); + // Get the ingested file. + std::string ingested_file; + for (auto& live_file : live_files) { + if (live_file.substr(live_file.size() - 4, std::string::npos) == ".sst") { + if (ingested_file.empty() || ingested_file < live_file) { + ingested_file = live_file; + } + } + } + ASSERT_FALSE(ingested_file.empty()); + delete db; + + // Verify the file can be open and read by SstFileReader. + CheckFile(db_name + ingested_file, keys, true /* check_global_seqno */); + + // Cleanup. + ASSERT_OK(DestroyDB(db_name, options)); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/table/sst_file_writer_collectors.h b/table/sst_file_writer_collectors.h index b3970151ddeeace4fae7bf603ee4f4b68cd868a2..e1827939f2cf0a3c7ee8f0eaea6ee27c25992583 100644 --- a/table/sst_file_writer_collectors.h +++ b/table/sst_file_writer_collectors.h @@ -5,6 +5,8 @@ #pragma once #include +#include "db/dbformat.h" +#include "db/table_properties_collector.h" #include "rocksdb/types.h" #include "util/string_util.h"