Integrate blob file additions into WAL-recovery flushes.

WriteLevel0TableForRecovery now hands BuildTable a blob_file_additions vector
instead of nullptr, stores those additions in the VersionEdit when an output
table file was produced, and counts blob bytes/files in the flush statistics.
db_wal_test.cc gains the RecoverWithBlob and RecoverWithBlobError tests.

NOTE(review): the collapsed copy of this patch had lost its line breaks, its
leading indentation and its template argument lists (std::vector<...>,
std::list<...>, static_cast<...>, WithParamInterface<...>). Line breaks were
restored so that every hunk matches its @@ line counts; indentation and
template arguments were re-derived from usage -- confirm against the target
tree, or apply with whitespace tolerance.

diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc
index ea446037d3c66450c2e804b42ac9026ea014e60b..659ac1b7ea668f3a58e8fdbf58679fa60c0779c4 100644
--- a/db/db_impl/db_impl_open.cc
+++ b/db/db_impl/db_impl_open.cc
@@ -1274,6 +1274,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
   const uint64_t start_micros = env_->NowMicros();
 
   FileMetaData meta;
+  std::vector<BlobFileAddition> blob_file_additions;
 
   std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
       new std::list<uint64_t>::iterator(
@@ -1326,10 +1327,10 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
       s = BuildTable(
           dbname_, versions_.get(), env_, fs_.get(), *cfd->ioptions(),
           mutable_cf_options, file_options_for_compaction_, cfd->table_cache(),
-          iter.get(), std::move(range_del_iters), &meta,
-          nullptr /* blob_file_additions */, cfd->internal_comparator(),
-          cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
-          snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
+          iter.get(), std::move(range_del_iters), &meta, &blob_file_additions,
+          cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
+          cfd->GetID(), cfd->GetName(), snapshot_seqs,
+          earliest_write_conflict_snapshot, snapshot_checker,
           GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
           mutable_cf_options.sample_for_compression,
           mutable_cf_options.compression_opts, paranoid_file_checks,
@@ -1351,23 +1352,39 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
 
   // Note that if file_size is zero, the file has been deleted and
   // should not be added to the manifest.
-  int level = 0;
-  if (s.ok() && meta.fd.GetFileSize() > 0) {
+  const bool has_output = meta.fd.GetFileSize() > 0;
+  assert(has_output || blob_file_additions.empty());
+
+  constexpr int level = 0;
+
+  if (s.ok() && has_output) {
     edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
                   meta.fd.GetFileSize(), meta.smallest, meta.largest,
                   meta.fd.smallest_seqno, meta.fd.largest_seqno,
                   meta.marked_for_compaction, meta.oldest_blob_file_number,
                   meta.oldest_ancester_time, meta.file_creation_time,
                   meta.file_checksum, meta.file_checksum_func_name);
+
+    edit->SetBlobFileAdditions(std::move(blob_file_additions));
   }
 
   InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
   stats.micros = env_->NowMicros() - start_micros;
-  stats.bytes_written = meta.fd.GetFileSize();
-  stats.num_output_files = 1;
+
+  if (has_output) {
+    stats.bytes_written = meta.fd.GetFileSize();
+
+    const auto& blobs = edit->GetBlobFileAdditions();
+    for (const auto& blob : blobs) {
+      stats.bytes_written += blob.GetTotalBlobBytes();
+    }
+
+    stats.num_output_files = static_cast<int>(blobs.size()) + 1;
+  }
+
   cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
   cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
-                                    meta.fd.GetFileSize());
+                                    stats.bytes_written);
   RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
   return s;
 }
diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc
index 1501d89383ebf5e36d51971637851fee69296628..9d1613222383bc06da81484516d7c7688bb309d3 100644
--- a/db/db_wal_test.cc
+++ b/db/db_wal_test.cc
@@ -338,6 +338,163 @@ TEST_F(DBWALTest, RecoverWithTableHandle) {
   } while (ChangeWalOptions());
 }
 
+TEST_F(DBWALTest, RecoverWithBlob) {
+  // Write a value that's below the prospective size limit for blobs and another
+  // one that's above. Note that blob files are not actually enabled at this
+  // point.
+  constexpr uint64_t min_blob_size = 10;
+
+  constexpr char short_value[] = "short";
+  static_assert(sizeof(short_value) - 1 < min_blob_size,
+                "short_value too long");
+
+  constexpr char long_value[] = "long_value";
+  static_assert(sizeof(long_value) - 1 >= min_blob_size,
+                "long_value too short");
+
+  ASSERT_OK(Put("key1", short_value));
+  ASSERT_OK(Put("key2", long_value));
+
+  // There should be no files just yet since we haven't flushed.
+  {
+    VersionSet* const versions = dbfull()->TEST_GetVersionSet();
+    assert(versions);
+
+    ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+    assert(cfd);
+
+    Version* const current = cfd->current();
+    assert(current);
+
+    const VersionStorageInfo* const storage_info = current->storage_info();
+    assert(storage_info);
+
+    ASSERT_EQ(storage_info->num_non_empty_levels(), 0);
+    ASSERT_TRUE(storage_info->GetBlobFiles().empty());
+  }
+
+  // Reopen the database with blob files enabled. A new table file/blob file
+  // pair should be written during recovery.
+  Options options;
+  options.enable_blob_files = true;
+  options.min_blob_size = min_blob_size;
+  options.avoid_flush_during_recovery = false;
+  options.disable_auto_compactions = true;
+
+  Reopen(options);
+
+  ASSERT_EQ(Get("key1"), short_value);
+
+  // TODO: enable once Get support is implemented for blobs
+  // ASSERT_EQ(Get("key2"), long_value);
+
+  VersionSet* const versions = dbfull()->TEST_GetVersionSet();
+  assert(versions);
+
+  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+  assert(cfd);
+
+  Version* const current = cfd->current();
+  assert(current);
+
+  const VersionStorageInfo* const storage_info = current->storage_info();
+  assert(storage_info);
+
+  const auto& l0_files = storage_info->LevelFiles(0);
+  ASSERT_EQ(l0_files.size(), 1);
+
+  const FileMetaData* const table_file = l0_files[0];
+  assert(table_file);
+
+  const auto& blob_files = storage_info->GetBlobFiles();
+  ASSERT_EQ(blob_files.size(), 1);
+
+  const auto& blob_file = blob_files.begin()->second;
+  assert(blob_file);
+
+  ASSERT_EQ(table_file->smallest.user_key(), "key1");
+  ASSERT_EQ(table_file->largest.user_key(), "key2");
+  ASSERT_EQ(table_file->fd.smallest_seqno, 1);
+  ASSERT_EQ(table_file->fd.largest_seqno, 2);
+  ASSERT_EQ(table_file->oldest_blob_file_number,
+            blob_file->GetBlobFileNumber());
+
+  ASSERT_EQ(blob_file->GetTotalBlobCount(), 1);
+
+#ifndef ROCKSDB_LITE
+  const InternalStats* const internal_stats = cfd->internal_stats();
+  assert(internal_stats);
+
+  const uint64_t expected_bytes =
+      table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes();
+
+  const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
+  ASSERT_FALSE(compaction_stats.empty());
+  ASSERT_EQ(compaction_stats[0].bytes_written, expected_bytes);
+  ASSERT_EQ(compaction_stats[0].num_output_files, 2);
+
+  const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
+  ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], expected_bytes);
+#endif  // ROCKSDB_LITE
+}
+
+class DBRecoveryTestBlobError
+    : public DBWALTest,
+      public testing::WithParamInterface<std::string> {
+ public:
+  DBRecoveryTestBlobError() : fault_injection_env_(env_) {}
+  ~DBRecoveryTestBlobError() { Close(); }
+
+  FaultInjectionTestEnv fault_injection_env_;
+};
+
+INSTANTIATE_TEST_CASE_P(DBRecoveryTestBlobError, DBRecoveryTestBlobError,
+                        ::testing::ValuesIn(std::vector<std::string>{
+                            "BlobFileBuilder::WriteBlobToFile:AddRecord",
+                            "BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
+
+TEST_P(DBRecoveryTestBlobError, RecoverWithBlobError) {
+  // Write a value. Note that blob files are not actually enabled at this point.
+  ASSERT_OK(Put("key", "blob"));
+
+  // Reopen with blob files enabled but make blob file writing fail during
+  // recovery.
+  SyncPoint::GetInstance()->SetCallBack(GetParam(), [this](void* /* arg */) {
+    fault_injection_env_.SetFilesystemActive(false, Status::IOError());
+  });
+  SyncPoint::GetInstance()->SetCallBack(
+      "BuildTable:BeforeFinishBuildTable", [this](void* /* arg */) {
+        fault_injection_env_.SetFilesystemActive(true);
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  Options options;
+  options.enable_blob_files = true;
+  options.avoid_flush_during_recovery = false;
+  options.disable_auto_compactions = true;
+  options.env = &fault_injection_env_;
+
+  ASSERT_NOK(TryReopen(options));
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+
+  // Make sure the files generated by the failed recovery have been deleted.
+  std::vector<std::string> files;
+  ASSERT_OK(env_->GetChildren(dbname_, &files));
+  for (const auto& file : files) {
+    uint64_t number = 0;
+    FileType type = kTableFile;
+
+    if (!ParseFileName(file, &number, &type)) {
+      continue;
+    }
+
+    ASSERT_NE(type, kTableFile);
+    ASSERT_NE(type, kBlobFile);
+  }
+}
+
 TEST_F(DBWALTest, IgnoreRecoveredLog) {
   std::string backup_logs = dbname_ + "/backup_logs";
 