diff --git a/HISTORY.md b/HISTORY.md index 60c0ccebace73ad953d162b06caec79aa0bfc740..7069e3e964ac7f5253966da0b9564cb0b3a70e4f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,7 @@ * Add suppport to extend DB::VerifyFileChecksums API to also verify blob files checksum. * When using the new BlobDB, the amount of data written by flushes/compactions is now broken down into table files and blob files in the compaction statistics; namely, Write(GB) denotes the amount of data written to table files, while Wblob(GB) means the amount of data written to blob files. * Add new SetBufferSize API to WriteBufferManager to allow dynamic management of memory allotted to all write buffers. This allows user code to adjust memory monitoring provided by WriteBufferManager as process memory needs change datasets grow and shrink. +* For the new integrated BlobDB implementation, compaction statistics now include the amount of data read from blob files during compaction (due to garbage collection or compaction filters). Write amplification metrics have also been extended to account for data read from blob files. ### New Features * Support compaction filters for the new implementation of BlobDB. Add `FilterBlobByKey()` to `CompactionFilter`. Subclasses can override this method so that compaction filters can determine whether the actual blob value has to be read during compaction. Use a new `kUndetermined` in `CompactionFilter::Decision` to indicated that further action is necessary for compaction filter to make a decision. diff --git a/db/blob/blob_file_reader.cc b/db/blob/blob_file_reader.cc index bc4cede080218277aaf356e12ffeaeacbf6d1f18..f1414e8cb5e42f15a526e91269d7f989fde7d310 100644 --- a/db/blob/blob_file_reader.cc +++ b/db/blob/blob_file_reader.cc @@ -271,7 +271,8 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options, const Slice& user_key, uint64_t offset, uint64_t value_size, CompressionType compression_type, - PinnableSlice* value) const { + PinnableSlice* value, + uint64_t* bytes_read) const { assert(value); const uint64_t key_size = user_key.size(); @@ -294,6 +295,9 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options, : 0; assert(offset >= adjustment); + const uint64_t record_offset = offset - adjustment; + const uint64_t record_size = value_size + adjustment; + Slice record_slice; Buffer buf; AlignedBuf aligned_buf; @@ -301,9 +305,6 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options, { TEST_SYNC_POINT("BlobFileReader::GetBlob:ReadFromFile"); - const uint64_t record_offset = offset - adjustment; - const uint64_t record_size = value_size + adjustment; - const Status s = ReadFromFile(file_reader_.get(), record_offset, static_cast(record_size), &record_slice, &buf, &aligned_buf); @@ -332,6 +333,10 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options, } } + if (bytes_read) { + *bytes_read = record_size; + } + return Status::OK(); } diff --git a/db/blob/blob_file_reader.h b/db/blob/blob_file_reader.h index dee142bc12ed10320daafa859faaa8e4646d9e9e..0ebd89c74421bb71ebd4b44d4dc8a443c467f3ec 100644 --- a/db/blob/blob_file_reader.h +++ b/db/blob/blob_file_reader.h @@ -39,7 +39,8 @@ class BlobFileReader { Status GetBlob(const ReadOptions& read_options, const Slice& user_key, uint64_t offset, uint64_t value_size, - CompressionType compression_type, PinnableSlice* value) const; + CompressionType compression_type, PinnableSlice* value, + uint64_t* bytes_read) const; private: BlobFileReader(std::unique_ptr&& file_reader, diff --git a/db/blob/blob_file_reader_test.cc b/db/blob/blob_file_reader_test.cc index 1d0b78bacfc2c15a7cf6ebedd5136add4bbe483c..04fea58d671419c3853cdfa5b9ff1e293d57270a 100644 --- a/db/blob/blob_file_reader_test.cc +++ b/db/blob/blob_file_reader_test.cc @@ -152,83 +152,103 @@ TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) { { PinnableSlice value; + uint64_t bytes_read = 0; ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size, - kNoCompression, &value)); + kNoCompression, &value, &bytes_read)); ASSERT_EQ(value, blob); + ASSERT_EQ(bytes_read, blob_size); } read_options.verify_checksums = true; { PinnableSlice value; + uint64_t bytes_read = 0; ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size, - kNoCompression, &value)); + kNoCompression, &value, &bytes_read)); ASSERT_EQ(value, blob); + + constexpr uint64_t key_size = sizeof(key) - 1; + ASSERT_EQ(bytes_read, + BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) + + blob_size); } // Invalid offset (too close to start of file) { PinnableSlice value; + uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(read_options, key, blob_offset - 1, blob_size, - kNoCompression, &value) + kNoCompression, &value, &bytes_read) .IsCorruption()); + ASSERT_EQ(bytes_read, 0); } // Invalid offset (too close to end of file) { PinnableSlice value; + uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(read_options, key, blob_offset + 1, blob_size, - kNoCompression, &value) + kNoCompression, &value, &bytes_read) .IsCorruption()); + ASSERT_EQ(bytes_read, 0); } // Incorrect compression type { PinnableSlice value; + uint64_t bytes_read = 0; - ASSERT_TRUE( - reader - ->GetBlob(read_options, key, blob_offset, blob_size, kZSTD, &value) - .IsCorruption()); + ASSERT_TRUE(reader + ->GetBlob(read_options, key, blob_offset, blob_size, kZSTD, + &value, &bytes_read) + .IsCorruption()); + ASSERT_EQ(bytes_read, 0); } // Incorrect key size { constexpr char shorter_key[] = "k"; PinnableSlice value; + uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(read_options, shorter_key, blob_offset - (sizeof(key) - sizeof(shorter_key)), - blob_size, kNoCompression, &value) + blob_size, kNoCompression, &value, &bytes_read) .IsCorruption()); + ASSERT_EQ(bytes_read, 0); } // Incorrect key { constexpr char incorrect_key[] = "foo"; PinnableSlice value; + uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(read_options, incorrect_key, blob_offset, - blob_size, kNoCompression, &value) + blob_size, kNoCompression, &value, &bytes_read) .IsCorruption()); + ASSERT_EQ(bytes_read, 0); } // Incorrect value size { PinnableSlice value; + uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(read_options, key, blob_offset, blob_size + 1, - kNoCompression, &value) + kNoCompression, &value, &bytes_read) .IsCorruption()); + ASSERT_EQ(bytes_read, 0); } } @@ -479,11 +499,13 @@ TEST_F(BlobFileReaderTest, BlobCRCError) { SyncPoint::GetInstance()->EnableProcessing(); PinnableSlice value; + uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(ReadOptions(), key, blob_offset, blob_size, - kNoCompression, &value) + kNoCompression, &value, &bytes_read) .IsCorruption()); + ASSERT_EQ(bytes_read, 0); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); @@ -530,20 +552,28 @@ TEST_F(BlobFileReaderTest, Compression) { { PinnableSlice value; + uint64_t bytes_read = 0; ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size, - kSnappyCompression, &value)); + kSnappyCompression, &value, &bytes_read)); ASSERT_EQ(value, blob); + ASSERT_EQ(bytes_read, blob_size); } read_options.verify_checksums = true; { PinnableSlice value; + uint64_t bytes_read = 0; ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size, - kSnappyCompression, &value)); + kSnappyCompression, &value, &bytes_read)); ASSERT_EQ(value, blob); + + constexpr uint64_t key_size = sizeof(key) - 1; + ASSERT_EQ(bytes_read, + BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) + + blob_size); } } @@ -596,11 +626,13 @@ TEST_F(BlobFileReaderTest, UncompressionError) { SyncPoint::GetInstance()->EnableProcessing(); PinnableSlice value; + uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(ReadOptions(), key, blob_offset, blob_size, - kSnappyCompression, &value) + kSnappyCompression, &value, &bytes_read) .IsCorruption()); + ASSERT_EQ(bytes_read, 0); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); @@ -678,11 +710,13 @@ TEST_P(BlobFileReaderIOErrorTest, IOError) { ASSERT_OK(s); PinnableSlice value; + uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(ReadOptions(), key, blob_offset, blob_size, - kNoCompression, &value) + kNoCompression, &value, &bytes_read) .IsIOError()); + ASSERT_EQ(bytes_read, 0); } SyncPoint::GetInstance()->DisableProcessing(); @@ -758,11 +792,13 @@ TEST_P(BlobFileReaderDecodingErrorTest, DecodingError) { ASSERT_OK(s); PinnableSlice value; + uint64_t bytes_read = 0; ASSERT_TRUE(reader ->GetBlob(ReadOptions(), key, blob_offset, blob_size, - kNoCompression, &value) + kNoCompression, &value, &bytes_read) .IsCorruption()); + ASSERT_EQ(bytes_read, 0); } SyncPoint::GetInstance()->DisableProcessing(); diff --git a/db/blob/db_blob_compaction_test.cc b/db/blob/db_blob_compaction_test.cc index 6e699b09c5cbded79362f60aeb30ba74d5fa0341..8c02046f978fb8a8b0bc8d331eef27197335c6f3 100644 --- a/db/blob/db_blob_compaction_test.cc +++ b/db/blob/db_blob_compaction_test.cc @@ -41,6 +41,22 @@ class DBBlobCompactionTest : public DBTestBase { return result; } + +#ifndef ROCKSDB_LITE + const std::vector& GetCompactionStats() { + VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + assert(versions); + assert(versions->GetColumnFamilySet()); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + const InternalStats* const internal_stats = cfd->internal_stats(); + assert(internal_stats); + + return internal_stats->TEST_GetCompactionStats(); + } +#endif // ROCKSDB_LITE }; namespace { @@ -214,6 +230,17 @@ TEST_F(DBBlobCompactionTest, FilterByKeyLength) { value.clear(); ASSERT_OK(db_->Get(ReadOptions(), long_key, &value)); ASSERT_EQ("value", value); + +#ifndef ROCKSDB_LITE + const auto& compaction_stats = GetCompactionStats(); + ASSERT_GE(compaction_stats.size(), 2); + + // Filter decides between kKeep and kRemove solely based on key; + // this involves neither reading nor writing blobs + ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0); + ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0); +#endif // ROCKSDB_LITE + Close(); } @@ -239,6 +266,17 @@ TEST_F(DBBlobCompactionTest, BlindWriteFilter) { for (const auto& key : keys) { ASSERT_EQ(new_blob_value, Get(key)); } + +#ifndef ROCKSDB_LITE + const auto& compaction_stats = GetCompactionStats(); + ASSERT_GE(compaction_stats.size(), 2); + + // Filter unconditionally changes value in FilterBlobByKey; + // this involves writing but not reading blobs + ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0); + ASSERT_GT(compaction_stats[1].bytes_written_blob, 0); +#endif // ROCKSDB_LITE + Close(); } @@ -312,6 +350,17 @@ TEST_F(DBBlobCompactionTest, CompactionFilter) { for (const auto& kv : kvs) { ASSERT_EQ(kv.second + std::string(padding), Get(kv.first)); } + +#ifndef ROCKSDB_LITE + const auto& compaction_stats = GetCompactionStats(); + ASSERT_GE(compaction_stats.size(), 2); + + // Filter changes the value using the previous value in FilterV2; + // this involves reading and writing blobs + ASSERT_GT(compaction_stats[1].bytes_read_blob, 0); + ASSERT_GT(compaction_stats[1].bytes_written_blob, 0); +#endif // ROCKSDB_LITE + Close(); } @@ -354,6 +403,16 @@ TEST_F(DBBlobCompactionTest, CompactionFilterReadBlobAndKeep) { /*end=*/nullptr)); ASSERT_EQ(blob_files, GetBlobFileNumbers()); +#ifndef ROCKSDB_LITE + const auto& compaction_stats = GetCompactionStats(); + ASSERT_GE(compaction_stats.size(), 2); + + // Filter decides to keep the existing value in FilterV2; + // this involves reading but not writing blobs + ASSERT_GT(compaction_stats[1].bytes_read_blob, 0); + ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0); +#endif // ROCKSDB_LITE + Close(); } @@ -363,4 +422,4 @@ int main(int argc, char** argv) { ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); -} \ No newline at end of file +} diff --git a/db/compaction/compaction_iteration_stats.h b/db/compaction/compaction_iteration_stats.h index 963c1d8eb4900dd3473cdc1c732c833abd2831d3..cb7b82c65ab4db7d9be2c049ba57a3e9dc4be175 100644 --- a/db/compaction/compaction_iteration_stats.h +++ b/db/compaction/compaction_iteration_stats.h @@ -34,4 +34,8 @@ struct CompactionIterationStats { // Single-Delete diagnostics for exceptional situations uint64_t num_single_del_fallthru = 0; uint64_t num_single_del_mismatch = 0; + + // Blob related statistics + uint64_t num_blobs_read = 0; + uint64_t total_blob_bytes_read = 0; }; diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 8064474da992b3f2c967c5135c34fa05e6d50f89..2569514cd35aafaca2154c26d0105c7961e6f9a6 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -253,13 +253,19 @@ bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, } const Version* const version = compaction_->input_version(); assert(version); + + uint64_t bytes_read = 0; s = version->GetBlob(ReadOptions(), ikey_.user_key, blob_index, - &blob_value_); + &blob_value_, &bytes_read); if (!s.ok()) { status_ = s; valid_ = false; return false; } + + ++iter_stats_.num_blobs_read; + iter_stats_.total_blob_bytes_read += bytes_read; + value_type = CompactionFilter::ValueType::kValue; } } @@ -883,9 +889,11 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() { const Version* const version = compaction_->input_version(); assert(version); + uint64_t bytes_read = 0; + { - const Status s = - version->GetBlob(ReadOptions(), user_key(), blob_index, &blob_value_); + const Status s = version->GetBlob(ReadOptions(), user_key(), blob_index, + &blob_value_, &bytes_read); if (!s.ok()) { status_ = s; @@ -895,6 +903,9 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() { } } + ++iter_stats_.num_blobs_read; + iter_stats_.total_blob_bytes_read += bytes_read; + value_ = blob_value_; if (ExtractLargeValueIfNeededImpl()) { diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 38387f046c291f139d710d75abb4ce3fa9b20a96..cbfc996f24ec19dce7554c02f4a840df9a6bf5cb 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -794,20 +794,23 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { double bytes_read_per_sec = 0; double bytes_written_per_sec = 0; - if (stats.bytes_read_non_output_levels > 0) { - read_write_amp = - (stats.bytes_written + stats.bytes_written_blob + - stats.bytes_read_output_level + stats.bytes_read_non_output_levels) / - static_cast(stats.bytes_read_non_output_levels); - write_amp = (stats.bytes_written + stats.bytes_written_blob) / - static_cast(stats.bytes_read_non_output_levels); + const uint64_t bytes_read_non_output_and_blob = + stats.bytes_read_non_output_levels + stats.bytes_read_blob; + const uint64_t bytes_read_all = + stats.bytes_read_output_level + bytes_read_non_output_and_blob; + const uint64_t bytes_written_all = + stats.bytes_written + stats.bytes_written_blob; + + if (bytes_read_non_output_and_blob > 0) { + read_write_amp = (bytes_written_all + bytes_read_all) / + static_cast(bytes_read_non_output_and_blob); + write_amp = + bytes_written_all / static_cast(bytes_read_non_output_and_blob); } if (stats.micros > 0) { - bytes_read_per_sec = - (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) / - static_cast(stats.micros); - bytes_written_per_sec = (stats.bytes_written + stats.bytes_written_blob) / - static_cast(stats.micros); + bytes_read_per_sec = bytes_read_all / static_cast(stats.micros); + bytes_written_per_sec = + bytes_written_all / static_cast(stats.micros); } const std::string& column_family_name = cfd->GetName(); @@ -818,8 +821,8 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { log_buffer_, "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, " "files in(%d, %d) out(%d +%d blob) " - "MB in(%.1f, %.1f) out(%.1f +%.1f blob), read-write-amplify(%.1f) " - "write-amplify(%.1f) %s, records in: %" PRIu64 + "MB in(%.1f, %.1f +%.1f blob) out(%.1f +%.1f blob), " + "read-write-amplify(%.1f) write-amplify(%.1f) %s, records in: %" PRIu64 ", records dropped: %" PRIu64 " output_compression: %s\n", column_family_name.c_str(), vstorage->LevelSummary(&tmp), bytes_read_per_sec, bytes_written_per_sec, @@ -827,9 +830,9 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { stats.num_input_files_in_non_output_levels, stats.num_input_files_in_output_level, stats.num_output_files, stats.num_output_files_blob, stats.bytes_read_non_output_levels / kMB, - stats.bytes_read_output_level / kMB, stats.bytes_written / kMB, - stats.bytes_written_blob / kMB, read_write_amp, write_amp, - status.ToString().c_str(), stats.num_input_records, + stats.bytes_read_output_level / kMB, stats.bytes_read_blob / kMB, + stats.bytes_written / kMB, stats.bytes_written_blob / kMB, read_write_amp, + write_amp, status.ToString().c_str(), stats.num_input_records, stats.num_dropped_records, CompressionTypeToString(compact_->compaction->output_compression()) .c_str()); @@ -1124,6 +1127,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { } } + sub_compact->compaction_job_stats.num_blobs_read = + c_iter_stats.num_blobs_read; + sub_compact->compaction_job_stats.total_blob_bytes_read = + c_iter_stats.total_blob_bytes_read; sub_compact->compaction_job_stats.num_input_deletion_records = c_iter_stats.num_input_deletion_records; sub_compact->compaction_job_stats.num_corrupt_keys = @@ -1827,6 +1834,10 @@ void CompactionJob::UpdateCompactionStats() { } } + assert(compaction_job_stats_); + compaction_stats_.bytes_read_blob = + compaction_job_stats_->total_blob_bytes_read; + compaction_stats_.num_output_files = static_cast(compact_->num_output_files); compaction_stats_.num_output_files_blob = @@ -1871,11 +1882,11 @@ void CompactionJob::UpdateCompactionJobStats( stats.num_input_files_in_output_level; // output information - compaction_job_stats_->total_output_bytes = - stats.bytes_written + stats.bytes_written_blob; + compaction_job_stats_->total_output_bytes = stats.bytes_written; + compaction_job_stats_->total_output_bytes_blob = stats.bytes_written_blob; compaction_job_stats_->num_output_records = compact_->num_output_records; - compaction_job_stats_->num_output_files = - stats.num_output_files + stats.num_output_files_blob; + compaction_job_stats_->num_output_files = stats.num_output_files; + compaction_job_stats_->num_output_files_blob = stats.num_output_files_blob; if (stats.num_output_files > 0) { CopyPrefix(compact_->SmallestUserKey(), diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 8c81134105824537ada75b314114f5ad8adddb76..e034e3610ca780b9ace230413856fc263486527d 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -5948,6 +5948,7 @@ TEST_F(DBCompactionTest, CompactionWithBlob) { const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); ASSERT_GE(compaction_stats.size(), 2); + ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0); ASSERT_EQ(compaction_stats[1].bytes_written, table_file->fd.GetFileSize()); ASSERT_EQ(compaction_stats[1].bytes_written_blob, blob_file->GetTotalBlobBytes()); @@ -6039,12 +6040,14 @@ TEST_P(DBCompactionTestBlobError, CompactionError) { ASSERT_GE(compaction_stats.size(), 2); if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") { + ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0); ASSERT_EQ(compaction_stats[1].bytes_written, 0); ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0); ASSERT_EQ(compaction_stats[1].num_output_files, 0); ASSERT_EQ(compaction_stats[1].num_output_files_blob, 0); } else { // SST file writing succeeded; blob file writing failed (during Finish) + ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0); ASSERT_GT(compaction_stats[1].bytes_written, 0); ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0); ASSERT_EQ(compaction_stats[1].num_output_files, 1); @@ -6133,6 +6136,36 @@ TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGC) { for (size_t i = cutoff_index; i < original_blob_files.size(); ++i) { ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]); } + + VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + assert(versions); + assert(versions->GetColumnFamilySet()); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + const InternalStats* const internal_stats = cfd->internal_stats(); + assert(internal_stats); + + const auto& compaction_stats = internal_stats->TEST_GetCompactionStats(); + ASSERT_GE(compaction_stats.size(), 2); + + if (blob_gc_age_cutoff_ > 0.0) { + ASSERT_GT(compaction_stats[1].bytes_read_blob, 0); + + if (updated_enable_blob_files_) { + // GC relocated some blobs to new blob files + ASSERT_GT(compaction_stats[1].bytes_written_blob, 0); + ASSERT_EQ(compaction_stats[1].bytes_read_blob, + compaction_stats[1].bytes_written_blob); + } else { + // GC moved some blobs back to the LSM, no new blob files + ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0); + } + } else { + ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0); + ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0); + } } TEST_F(DBCompactionTest, CompactionWithBlobGCError_CorruptIndex) { diff --git a/db/db_iter.cc b/db/db_iter.cc index 853e800ecbfafe9ae89007d1d02dc30cb9935be1..6d4d9bab34d36e1753cfd46c475d88e4da64528d 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -193,8 +193,10 @@ bool DBIter::SetBlobValueIfNeeded(const Slice& user_key, read_options.read_tier = read_tier_; read_options.verify_checksums = verify_checksums_; - const Status s = - version_->GetBlob(read_options, user_key, blob_index, &blob_value_); + constexpr uint64_t* bytes_read = nullptr; + + const Status s = version_->GetBlob(read_options, user_key, blob_index, + &blob_value_, bytes_read); if (!s.ok()) { status_ = s; diff --git a/db/internal_stats.cc b/db/internal_stats.cc index 8ef719682d82b5a0e29ba605ce600d91e86b1570..030d1fab661e81d247ceec61872bc1d04eddaaac 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -50,6 +50,7 @@ const std::map InternalStats::compaction_level_stats = {LevelStatType::AVG_SEC, LevelStat{"AvgSec", "Avg(sec)"}}, {LevelStatType::KEY_IN, LevelStat{"KeyIn", "KeyIn"}}, {LevelStatType::KEY_DROP, LevelStat{"KeyDrop", "KeyDrop"}}, + {LevelStatType::R_BLOB_GB, LevelStat{"RblobGB", "Rblob(GB)"}}, {LevelStatType::W_BLOB_GB, LevelStat{"WblobGB", "Wblob(GB)"}}, }; @@ -68,7 +69,8 @@ void PrintLevelStatsHeader(char* buf, size_t len, const std::string& cf_name, }; int line_size = snprintf( buf + written_size, len - written_size, - "%s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s\n", + "%s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s " + "%s\n", // Note that we skip COMPACTED_FILES and merge it with Files column group_by.c_str(), hdr(LevelStatType::NUM_FILES), hdr(LevelStatType::SIZE_BYTES), hdr(LevelStatType::SCORE), @@ -79,7 +81,8 @@ void PrintLevelStatsHeader(char* buf, size_t len, const std::string& cf_name, hdr(LevelStatType::WRITE_MBPS), hdr(LevelStatType::COMP_SEC), hdr(LevelStatType::COMP_CPU_SEC), hdr(LevelStatType::COMP_COUNT), hdr(LevelStatType::AVG_SEC), hdr(LevelStatType::KEY_IN), - hdr(LevelStatType::KEY_DROP), hdr(LevelStatType::W_BLOB_GB)); + hdr(LevelStatType::KEY_DROP), hdr(LevelStatType::R_BLOB_GB), + hdr(LevelStatType::W_BLOB_GB)); written_size += line_size; written_size = std::min(written_size, static_cast(len)); @@ -91,8 +94,9 @@ void PrepareLevelStats(std::map* level_stats, int num_files, int being_compacted, double total_file_size, double score, double w_amp, const InternalStats::CompactionStats& stats) { - const uint64_t bytes_read = - stats.bytes_read_non_output_levels + stats.bytes_read_output_level; + const uint64_t bytes_read = stats.bytes_read_non_output_levels + + stats.bytes_read_output_level + + stats.bytes_read_blob; const uint64_t bytes_written = stats.bytes_written + stats.bytes_written_blob; const int64_t bytes_new = stats.bytes_written - stats.bytes_read_output_level; const double elapsed = (stats.micros + 1) / kMicrosInSec; @@ -120,6 +124,7 @@ void PrepareLevelStats(std::map* level_stats, static_cast(stats.num_input_records); (*level_stats)[LevelStatType::KEY_DROP] = static_cast(stats.num_dropped_records); + (*level_stats)[LevelStatType::R_BLOB_GB] = stats.bytes_read_blob / kGB; (*level_stats)[LevelStatType::W_BLOB_GB] = stats.bytes_written_blob / kGB; } @@ -146,6 +151,7 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name, "%8.3f " /* Avg(sec) */ "%7s " /* KeyIn */ "%6s " /* KeyDrop */ + "%9.1f " /* Rblob(GB) */ "%9.1f\n", /* Wblob(GB) */ name.c_str(), static_cast(stat_value.at(LevelStatType::NUM_FILES)), static_cast(stat_value.at(LevelStatType::COMPACTED_FILES)), @@ -172,6 +178,7 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name, NumberToHumanString( static_cast(stat_value.at(LevelStatType::KEY_DROP))) .c_str(), + stat_value.at(LevelStatType::R_BLOB_GB), stat_value.at(LevelStatType::W_BLOB_GB)); } @@ -1116,9 +1123,10 @@ void InternalStats::DumpDBStats(std::string* value) { */ void InternalStats::DumpCFMapStats( std::map* cf_stats) { + const VersionStorageInfo* vstorage = cfd_->current()->storage_info(); CompactionStats compaction_stats_sum; std::map> levels_stats; - DumpCFMapStats(&levels_stats, &compaction_stats_sum); + DumpCFMapStats(vstorage, &levels_stats, &compaction_stats_sum); for (auto const& level_ent : levels_stats) { auto level_str = level_ent.first == -1 ? "Sum" : "L" + ToString(level_ent.first); @@ -1135,9 +1143,10 @@ void InternalStats::DumpCFMapStats( } void InternalStats::DumpCFMapStats( + const VersionStorageInfo* vstorage, std::map>* levels_stats, CompactionStats* compaction_stats_sum) { - const VersionStorageInfo* vstorage = cfd_->current()->storage_info(); + assert(vstorage); int num_levels_to_check = (cfd_->ioptions()->compaction_style != kCompactionStyleFIFO) @@ -1178,7 +1187,8 @@ void InternalStats::DumpCFMapStats( if (level == 0) { input_bytes = curr_ingest; } else { - input_bytes = comp_stats_[level].bytes_read_non_output_levels; + input_bytes = comp_stats_[level].bytes_read_non_output_levels + + comp_stats_[level].bytes_read_blob; } double w_amp = (input_bytes == 0) @@ -1262,9 +1272,10 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) { value->append(buf); // Print stats for each level + const VersionStorageInfo* vstorage = cfd_->current()->storage_info(); std::map> levels_stats; CompactionStats compaction_stats_sum; - DumpCFMapStats(&levels_stats, &compaction_stats_sum); + DumpCFMapStats(vstorage, &levels_stats, &compaction_stats_sum); for (int l = 0; l < number_levels_; ++l) { if (levels_stats.find(l) != levels_stats.end()) { PrintLevelStats(buf, sizeof(buf), "L" + ToString(l), levels_stats[l]); @@ -1320,6 +1331,12 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) { } } + snprintf(buf, sizeof(buf), + "\nBlob file count: %" ROCKSDB_PRIszt ", total size: %.1f GB\n\n", + vstorage->GetBlobFiles().size(), + vstorage->GetTotalBlobFileSize() / kGB); + value->append(buf); + double seconds_up = (clock_->NowMicros() - started_at_ + 1) / kMicrosInSec; double interval_seconds_up = seconds_up - cf_stats_snapshot_.seconds_up; snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n", @@ -1360,7 +1377,8 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) { uint64_t compact_micros = 0; for (int level = 0; level < number_levels_; level++) { compact_bytes_read += comp_stats_[level].bytes_read_output_level + - comp_stats_[level].bytes_read_non_output_levels; + comp_stats_[level].bytes_read_non_output_levels + + comp_stats_[level].bytes_read_blob; compact_bytes_write += comp_stats_[level].bytes_written + comp_stats_[level].bytes_written_blob; compact_micros += comp_stats_[level].micros; diff --git a/db/internal_stats.h b/db/internal_stats.h index 1337779d31ae04ec359b58a8171a80d8d9d52f77..6f449aa3d681c011da1dc23da7ec044b463c6620 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -79,6 +79,7 @@ enum class LevelStatType { AVG_SEC, KEY_IN, KEY_DROP, + R_BLOB_GB, W_BLOB_GB, TOTAL // total number of types }; @@ -150,6 +151,9 @@ class InternalStats { // The number of bytes read from the compaction output level (table files) uint64_t bytes_read_output_level; + // The number of bytes read from blob files + uint64_t bytes_read_blob; + // Total number of bytes written to table files during compaction uint64_t bytes_written; @@ -190,6 +194,7 @@ class InternalStats { cpu_micros(0), bytes_read_non_output_levels(0), bytes_read_output_level(0), + bytes_read_blob(0), bytes_written(0), bytes_written_blob(0), bytes_moved(0), @@ -211,6 +216,7 @@ class InternalStats { cpu_micros(0), bytes_read_non_output_levels(0), bytes_read_output_level(0), + bytes_read_blob(0), bytes_written(0), bytes_written_blob(0), bytes_moved(0), @@ -238,6 +244,7 @@ class InternalStats { cpu_micros(c.cpu_micros), bytes_read_non_output_levels(c.bytes_read_non_output_levels), bytes_read_output_level(c.bytes_read_output_level), + bytes_read_blob(c.bytes_read_blob), bytes_written(c.bytes_written), bytes_written_blob(c.bytes_written_blob), bytes_moved(c.bytes_moved), @@ -260,6 +267,7 @@ class InternalStats { cpu_micros = c.cpu_micros; bytes_read_non_output_levels = c.bytes_read_non_output_levels; bytes_read_output_level = c.bytes_read_output_level; + bytes_read_blob = c.bytes_read_blob; bytes_written = c.bytes_written; bytes_written_blob = c.bytes_written_blob; bytes_moved = c.bytes_moved; @@ -284,6 +292,7 @@ class InternalStats { this->cpu_micros = 0; this->bytes_read_non_output_levels = 0; this->bytes_read_output_level = 0; + this->bytes_read_blob = 0; this->bytes_written = 0; this->bytes_written_blob = 0; this->bytes_moved = 0; @@ -305,6 +314,7 @@ class InternalStats { this->cpu_micros += c.cpu_micros; this->bytes_read_non_output_levels += c.bytes_read_non_output_levels; this->bytes_read_output_level += c.bytes_read_output_level; + this->bytes_read_blob += c.bytes_read_blob; this->bytes_written += c.bytes_written; this->bytes_written_blob += c.bytes_written_blob; this->bytes_moved += c.bytes_moved; @@ -328,6 +338,7 @@ class InternalStats { this->cpu_micros -= c.cpu_micros; this->bytes_read_non_output_levels -= c.bytes_read_non_output_levels; this->bytes_read_output_level -= c.bytes_read_output_level; + this->bytes_read_blob -= c.bytes_read_blob; this->bytes_written -= c.bytes_written; this->bytes_written_blob -= c.bytes_written_blob; this->bytes_moved -= c.bytes_moved; @@ -435,6 +446,7 @@ class InternalStats { void DumpDBStats(std::string* value); void DumpCFMapStats(std::map* cf_stats); void DumpCFMapStats( + const VersionStorageInfo* vstorage, std::map>* level_stats, CompactionStats* compaction_stats_sum); void DumpCFMapStatsByPriority( @@ -674,6 +686,7 @@ class InternalStats { uint64_t cpu_micros; uint64_t bytes_read_non_output_levels; uint64_t bytes_read_output_level; + uint64_t bytes_read_blob; uint64_t bytes_written; uint64_t bytes_written_blob; uint64_t bytes_moved; diff --git a/db/version_set.cc b/db/version_set.cc index 88cf623c332f7ae37042c1dc9cc004a66349ba5e..c4ff1a6aa5182642c15e9d66e7edcbefd493b789 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1793,8 +1793,8 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset, io_tracer_(io_tracer) {} Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, - const Slice& blob_index_slice, - PinnableSlice* value) const { + const Slice& blob_index_slice, PinnableSlice* value, + uint64_t* bytes_read) const { if (read_options.read_tier == kBlockCacheTier) { return Status::Incomplete("Cannot read blob: no disk I/O allowed"); } @@ -1808,12 +1808,12 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, } } - return GetBlob(read_options, user_key, blob_index, value); + return GetBlob(read_options, user_key, blob_index, value, bytes_read); } Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, - const BlobIndex& blob_index, - PinnableSlice* value) const { + const BlobIndex& blob_index, PinnableSlice* value, + uint64_t* bytes_read) const { assert(value); if (blob_index.HasTTL() || blob_index.IsInlined()) { @@ -1843,7 +1843,7 @@ Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key, assert(blob_file_reader.GetValue()); const Status s = blob_file_reader.GetValue()->GetBlob( read_options, user_key, blob_index.offset(), blob_index.size(), - blob_index.compression(), value); + blob_index.compression(), value, bytes_read); return s; } @@ -1953,7 +1953,10 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, if (is_blob_index) { if (do_merge && value) { - *status = GetBlob(read_options, user_key, *value, value); + constexpr uint64_t* bytes_read = nullptr; + + *status = + GetBlob(read_options, user_key, *value, value, bytes_read); if (!status->ok()) { if (status->IsIncomplete()) { get_context.MarkKeyMayExist(); @@ -2147,8 +2150,10 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, if (iter->is_blob_index) { if (iter->value) { + constexpr uint64_t* bytes_read = nullptr; + *status = GetBlob(read_options, iter->ukey_with_ts, *iter->value, - iter->value); + iter->value, bytes_read); if (!status->ok()) { if (status->IsIncomplete()) { get_context.MarkKeyMayExist(); diff --git a/db/version_set.h b/db/version_set.h index 98c15af5c1f562841c371386cc7d78883bf9e1e6..68263eb8571188155b2b1bb401e50c6e144d999b 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -344,6 +344,19 @@ class VersionStorageInfo { using BlobFiles = std::map>; const BlobFiles& GetBlobFiles() const { return blob_files_; } + uint64_t GetTotalBlobFileSize() const { + uint64_t total_blob_bytes = 0; + + for (const auto& pair : blob_files_) { + const auto& meta = pair.second; + assert(meta); + + total_blob_bytes += meta->GetTotalBlobBytes(); + } + + return total_blob_bytes; + } + const ROCKSDB_NAMESPACE::LevelFilesBrief& LevelFilesBrief(int level) const { assert(level < static_cast(level_files_brief_.size())); return level_files_brief_[level]; @@ -690,12 +703,14 @@ class Version { // saves it in *value. // REQUIRES: blob_index_slice stores an encoded blob reference Status GetBlob(const ReadOptions& read_options, const Slice& user_key, - const Slice& blob_index_slice, PinnableSlice* value) const; + const Slice& blob_index_slice, PinnableSlice* value, + uint64_t* bytes_read) const; // Retrieves a blob using a blob reference and saves it in *value, // assuming the corresponding blob file is part of this Version. Status GetBlob(const ReadOptions& read_options, const Slice& user_key, - const BlobIndex& blob_index, PinnableSlice* value) const; + const BlobIndex& blob_index, PinnableSlice* value, + uint64_t* bytes_read) const; // Loads some stats information from files. Call without mutex held. It needs // to be called before applying the version to the version set. diff --git a/include/rocksdb/compaction_job_stats.h b/include/rocksdb/compaction_job_stats.h index 0f9c8fcbb1f832690741c5648ecaf78852dfc142..626f3202ff93c0976028201216c1ff6112139f1c 100644 --- a/include/rocksdb/compaction_job_stats.h +++ b/include/rocksdb/compaction_job_stats.h @@ -25,25 +25,33 @@ struct CompactionJobStats { // the number of compaction input records. uint64_t num_input_records; - // the number of compaction input files. + // the number of blobs read from blob files + uint64_t num_blobs_read; + // the number of compaction input files (table files) size_t num_input_files; - // the number of compaction input files at the output level. + // the number of compaction input files at the output level (table files) size_t num_input_files_at_output_level; // the number of compaction output records. uint64_t num_output_records; - // the number of compaction output files. + // the number of compaction output files (table files) size_t num_output_files; + // the number of compaction output files (blob files) + size_t num_output_files_blob; // true if the compaction is a full compaction (all live SST files input) bool is_full_compaction; // true if the compaction is a manual compaction bool is_manual_compaction; - // the size of the compaction input in bytes. + // the total size of table files in the compaction input uint64_t total_input_bytes; - // the size of the compaction output in bytes. + // the total size of blobs read from blob files + uint64_t total_blob_bytes_read; + // the total size of table files in the compaction output uint64_t total_output_bytes; + // the total size of blob files in the compaction output + uint64_t total_output_bytes_blob; // number of records being replaced by newer record associated with same key. // this could be a new value or a deletion entry for that key so this field diff --git a/util/compaction_job_stats_impl.cc b/util/compaction_job_stats_impl.cc index e6380bb0481868adf9a8be7cb137fa463d5adea2..cfab2a4fefe30217917564757816ce2f00b22f10 100644 --- a/util/compaction_job_stats_impl.cc +++ b/util/compaction_job_stats_impl.cc @@ -14,17 +14,21 @@ void CompactionJobStats::Reset() { cpu_micros = 0; num_input_records = 0; + num_blobs_read = 0; num_input_files = 0; num_input_files_at_output_level = 0; num_output_records = 0; num_output_files = 0; + num_output_files_blob = 0; is_full_compaction = false; is_manual_compaction = false; total_input_bytes = 0; + total_blob_bytes_read = 0; total_output_bytes = 0; + total_output_bytes_blob = 0; num_records_replaced = 0; @@ -53,14 +57,18 @@ void CompactionJobStats::Add(const CompactionJobStats& stats) { cpu_micros += stats.cpu_micros; num_input_records += stats.num_input_records; + num_blobs_read += stats.num_blobs_read; num_input_files += stats.num_input_files; num_input_files_at_output_level += stats.num_input_files_at_output_level; num_output_records += stats.num_output_records; num_output_files += stats.num_output_files; + num_output_files_blob += stats.num_output_files_blob; total_input_bytes += stats.total_input_bytes; + total_blob_bytes_read += stats.total_blob_bytes_read; total_output_bytes += stats.total_output_bytes; + total_output_bytes_blob += stats.total_output_bytes_blob; num_records_replaced += stats.num_records_replaced;