diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index f74307e191b4ecd8109e806369cbeec481825ac7..1afdc94a06e84a66148553d99a604ea2d3c45364 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -28,6 +28,7 @@ #include "util/file_reader_writer.h" #include "util/filename.h" #include "util/logging.h" +#include "util/mutexlock.h" #include "util/random.h" #include "util/timer_queue.h" #include "utilities/transactions/optimistic_transaction_db_impl.h" @@ -878,6 +879,8 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { } }; + MutexLock l(&write_mutex_); + SequenceNumber sequence = db_impl_->GetLatestSequenceNumber() + 1; BlobInserter blob_inserter(this, sequence); updates->Iterate(&blob_inserter); @@ -953,6 +956,7 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw, Status BlobDBImpl::PutUntil(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value_unc, int32_t expiration) { + MutexLock l(&write_mutex_); UpdateWriteOptions(options); std::shared_ptr bfile = diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 95a387afe37512c34d9680517d78d214a6db6313..5105c8c17b1e0f00210d6b2637831213adc16f42 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -447,6 +447,9 @@ class BlobDBImpl : public BlobDB { // HEAVILY TRAFFICKED port::RWMutex mutex_; + // Writers has to hold write_mutex_ before writing. + mutable port::Mutex write_mutex_; + // counter for blob file number std::atomic next_file_number_; diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index a3873729cf52b4c66e398efa6d88f39780aed3c7..199a9e0750a0692df37c960e28febff1654bfaec 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -511,18 +511,35 @@ TEST_F(BlobDBTest, Compression) { } #endif -TEST_F(BlobDBTest, DISABLED_MultipleWriters) { - Open(); +TEST_F(BlobDBTest, MultipleWriters) { + Open(BlobDBOptions()); std::vector workers; - for (size_t ii = 0; ii < 10; ii++) - workers.push_back(port::Thread(&BlobDBTest::InsertBlobs, this)); - - for (auto& t : workers) { - if (t.joinable()) { - t.join(); + std::vector> data_set(10); + for (uint32_t i = 0; i < 10; i++) + workers.push_back(port::Thread( + [&](uint32_t id) { + Random rnd(301 + id); + for (int j = 0; j < 100; j++) { + std::string key = "key" + ToString(id) + "_" + ToString(j); + if (id < 5) { + PutRandom(key, &rnd, &data_set[id]); + } else { + WriteBatch batch; + PutRandomToWriteBatch(key, &rnd, &batch, &data_set[id]); + blob_db_->Write(WriteOptions(), &batch); + } + } + }, + i)); + std::map data; + for (size_t i = 0; i < 10; i++) { + if (workers[i].joinable()) { + workers[i].join(); } + data.insert(data_set[i].begin(), data_set[i].end()); } + VerifyDB(data); } // Test sequence number store in blob file is correct.