From 3503f289820b8b199db6ffddb4da2565822948c8 Mon Sep 17 00:00:00 2001 From: Jay Zhuang Date: Tue, 29 Jun 2021 10:41:22 -0700 Subject: [PATCH] Add sub-compaction support for RemoteCompaction (#8364) Summary: Change the job_id for remote compaction interface, which will include both internal compaction job_id, also a sub_compaction_job_id. It is not a backward compatible change. The user needs to update interface during upgrade. (We will avoid backward incompatible change after the feature is not experimental.) Pull Request resolved: https://github.com/facebook/rocksdb/pull/8364 Reviewed By: ajkr Differential Revision: D28917301 Pulled By: jay-zhuang fbshipit-source-id: 6d72a21f652bb517ad6954d0387b496797fc4e11 --- db/compaction/compaction_job.cc | 33 +++++++++++++++++------- db/compaction/compaction_job.h | 4 ++- db/compaction/compaction_service_test.cc | 21 ++++++++++----- include/rocksdb/options.h | 4 +-- 4 files changed, 44 insertions(+), 18 deletions(-) diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index d764265dd..bb1d1fef2 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -180,9 +180,17 @@ struct CompactionJob::SubcompactionState { uint64_t overlapped_bytes = 0; // A flag determine whether the key has been seen in ShouldStopBefore() bool seen_key = false; - - SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size) - : compaction(c), start(_start), end(_end), approx_size(size) { + // sub compaction job id, which is used to identify different sub-compaction + // within the same compaction job. + const uint32_t sub_job_id; + + SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size, + uint32_t _sub_job_id) + : compaction(c), + start(_start), + end(_end), + approx_size(size), + sub_job_id(_sub_job_id) { assert(compaction != nullptr); } @@ -449,7 +457,8 @@ void CompactionJob::Prepare() { for (size_t i = 0; i <= boundaries_.size(); i++) { Slice* start = i == 0 ? nullptr : &boundaries_[i - 1]; Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i]; - compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]); + compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i], + static_cast(i)); } RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED, compact_->sub_compact_states.size()); @@ -458,7 +467,8 @@ void CompactionJob::Prepare() { constexpr Slice* end = nullptr; constexpr uint64_t size = 0; - compact_->sub_compact_states.emplace_back(c, start, end, size); + compact_->sub_compact_states.emplace_back(c, start, end, size, + /*sub_job_id*/ 0); } } @@ -973,7 +983,8 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService( compaction_input.column_family.name.c_str(), job_id_, compaction_input.output_level, input_files_oss.str().c_str()); CompactionServiceJobStatus compaction_status = - db_options_.compaction_service->Start(compaction_input_binary, job_id_); + db_options_.compaction_service->Start(compaction_input_binary, + GetCompactionId(sub_compact)); if (compaction_status != CompactionServiceJobStatus::kSuccess) { sub_compact->status = Status::Incomplete("CompactionService failed to start compaction job."); @@ -982,7 +993,7 @@ void CompactionJob::ProcessKeyValueCompactionWithCompactionService( std::string compaction_result_binary; compaction_status = db_options_.compaction_service->WaitForComplete( - job_id_, &compaction_result_binary); + GetCompactionId(sub_compact), &compaction_result_binary); CompactionServiceResult compaction_result; s = CompactionServiceResult::Read(compaction_result_binary, @@ -1446,6 +1457,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { sub_compact->status = status; } +uint64_t CompactionJob::GetCompactionId(SubcompactionState* sub_compact) { + return (uint64_t)job_id_ << 32 | sub_compact->sub_job_id; +} + void CompactionJob::RecordDroppedKeys( const CompactionIterationStats& c_iter_stats, CompactionJobStats* compaction_job_stats) { @@ -2217,8 +2232,8 @@ Status CompactionServiceCompactionJob::Run() { Slice end = compaction_input_.end; compact_->sub_compact_states.emplace_back( c, compaction_input_.has_begin ? &begin : nullptr, - compaction_input_.has_end ? &end : nullptr, - compaction_input_.approx_size); + compaction_input_.has_end ? &end : nullptr, compaction_input_.approx_size, + /*sub_job_id*/ 0); log_buffer_->FlushBufferToLog(); LogCompaction(); diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 0f71fd57b..586bf86de 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -168,7 +168,7 @@ class CompactionJob { void UpdateCompactionInputStatsHelper( int* num_files, uint64_t* bytes_read, int input_level); - int job_id_; + uint32_t job_id_; CompactionJobStats* compaction_job_stats_; @@ -219,6 +219,8 @@ class CompactionJob { std::string full_history_ts_low_; BlobFileCompletionCallback* blob_callback_; + uint64_t GetCompactionId(SubcompactionState* sub_compact); + // Get table file name in where it's outputting to, which should also be in // `output_directory_`. virtual std::string GetTableFileName(uint64_t file_number); diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index 1b9afab89..03e626181 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -21,7 +21,7 @@ class MyTestCompactionService : public CompactionService { const char* Name() const override { return kClassName(); } CompactionServiceJobStatus Start(const std::string& compaction_service_input, - int job_id) override { + uint64_t job_id) override { InstrumentedMutexLock l(&mutex_); jobs_.emplace(job_id, compaction_service_input); CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess; @@ -30,7 +30,7 @@ class MyTestCompactionService : public CompactionService { } CompactionServiceJobStatus WaitForComplete( - int job_id, std::string* compaction_service_result) override { + uint64_t job_id, std::string* compaction_service_result) override { std::string compaction_input; { InstrumentedMutexLock l(&mutex_); @@ -73,7 +73,7 @@ class MyTestCompactionService : public CompactionService { private: InstrumentedMutex mutex_; std::atomic_int compaction_num_{0}; - std::map jobs_; + std::map jobs_; const std::string db_path_; std::shared_ptr fs_; Options options_; @@ -282,8 +282,7 @@ TEST_F(CompactionServiceTest, InvalidResult) { ASSERT_FALSE(s.ok()); } -// TODO: support sub-compaction -TEST_F(CompactionServiceTest, DISABLED_SubCompaction) { +TEST_F(CompactionServiceTest, SubCompaction) { Options options = CurrentOptions(); options.env = env_; options.max_subcompactions = 10; @@ -294,10 +293,20 @@ TEST_F(CompactionServiceTest, DISABLED_SubCompaction) { DestroyAndReopen(options); GenerateTestData(); + VerifyTestData(); + + auto my_cs = + dynamic_cast(options.compaction_service.get()); + int compaction_num_before = my_cs->GetCompactionNum(); auto cro = CompactRangeOptions(); cro.max_subcompactions = 10; - db_->CompactRange(cro, nullptr, nullptr); + Status s = db_->CompactRange(cro, nullptr, nullptr); + ASSERT_OK(s); + VerifyTestData(); + int compaction_num = my_cs->GetCompactionNum() - compaction_num_before; + // make sure there's sub-compaction by checking the compaction number + ASSERT_GE(compaction_num, 2); } class PartialDeleteCompactionFilter : public CompactionFilter { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 4b46a75c2..7b8d238f7 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -384,12 +384,12 @@ class CompactionService : public Customizable { // TODO: sub-compaction is not supported, as they will have the same job_id, a // sub-compaction id might be added virtual CompactionServiceJobStatus Start( - const std::string& compaction_service_input, int job_id) = 0; + const std::string& compaction_service_input, uint64_t job_id) = 0; // Wait compaction to be finish. // TODO: Add output path override virtual CompactionServiceJobStatus WaitForComplete( - int job_id, std::string* compaction_service_result) = 0; + uint64_t job_id, std::string* compaction_service_result) = 0; virtual ~CompactionService() {} }; -- GitLab