提交 77f47995 编写于 作者: M mpoeter 提交者: Facebook GitHub Bot

Fix potential leak when reusing PinnableSlice instances. (#10166)

Summary:
`PinnableSlice` may hold a handle to a cache value which must be released to correctly decrement the ref-counter. However, when `PinnableSlice` variables are reused, e.g. like this:
```
PinnableSlice pin_slice;
db.Get("foo", &pin_slice);
db.Get("foo", &pin_slice);
```
then the second `Get` simply overwrites the old value in `pin_slice` and the handle returned by the first `Get` is _not_ released.

This PR adds `Reset` calls to the `Get`/`MultiGet` calls that accept `PinnableSlice` arguments to ensure proper cleanup of old values.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10166

Reviewed By: hx235

Differential Revision: D37151632

Pulled By: ajkr

fbshipit-source-id: 9dd3c3288300f560531b843f67db11aeb569a9ff
上级 b550fc0b
...@@ -1742,6 +1742,8 @@ Status DBImpl::Get(const ReadOptions& read_options, ...@@ -1742,6 +1742,8 @@ Status DBImpl::Get(const ReadOptions& read_options,
Status DBImpl::Get(const ReadOptions& read_options, Status DBImpl::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value, std::string* timestamp) { PinnableSlice* value, std::string* timestamp) {
assert(value != nullptr);
value->Reset();
GetImplOptions get_impl_options; GetImplOptions get_impl_options;
get_impl_options.column_family = column_family; get_impl_options.column_family = column_family;
get_impl_options.value = value; get_impl_options.value = value;
...@@ -2349,6 +2351,7 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys, ...@@ -2349,6 +2351,7 @@ void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys; autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
sorted_keys.resize(num_keys); sorted_keys.resize(num_keys);
for (size_t i = 0; i < num_keys; ++i) { for (size_t i = 0; i < num_keys; ++i) {
values[i].Reset();
key_context.emplace_back(column_families[i], keys[i], &values[i], key_context.emplace_back(column_families[i], keys[i], &values[i],
timestamps ? &timestamps[i] : nullptr, timestamps ? &timestamps[i] : nullptr,
&statuses[i]); &statuses[i]);
...@@ -2495,6 +2498,7 @@ void DBImpl::MultiGet(const ReadOptions& read_options, ...@@ -2495,6 +2498,7 @@ void DBImpl::MultiGet(const ReadOptions& read_options,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys; autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
sorted_keys.resize(num_keys); sorted_keys.resize(num_keys);
for (size_t i = 0; i < num_keys; ++i) { for (size_t i = 0; i < num_keys; ++i) {
values[i].Reset();
key_context.emplace_back(column_family, keys[i], &values[i], key_context.emplace_back(column_family, keys[i], &values[i],
timestamps ? &timestamps[i] : nullptr, timestamps ? &timestamps[i] : nullptr,
&statuses[i]); &statuses[i]);
......
...@@ -321,7 +321,7 @@ TEST_F(DBTest, MixedSlowdownOptions) { ...@@ -321,7 +321,7 @@ TEST_F(DBTest, MixedSlowdownOptions) {
// We need the 2nd write to trigger delay. This is because delay is // We need the 2nd write to trigger delay. This is because delay is
// estimated based on the last write size which is 0 for the first write. // estimated based on the last write size which is 0 for the first write.
ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2")); ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
token.reset(); token.reset();
for (auto& t : threads) { for (auto& t : threads) {
t.join(); t.join();
...@@ -379,7 +379,7 @@ TEST_F(DBTest, MixedSlowdownOptionsInQueue) { ...@@ -379,7 +379,7 @@ TEST_F(DBTest, MixedSlowdownOptionsInQueue) {
// We need the 2nd write to trigger delay. This is because delay is // We need the 2nd write to trigger delay. This is because delay is
// estimated based on the last write size which is 0 for the first write. // estimated based on the last write size which is 0 for the first write.
ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2")); ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
token.reset(); token.reset();
for (auto& t : threads) { for (auto& t : threads) {
t.join(); t.join();
...@@ -448,7 +448,7 @@ TEST_F(DBTest, MixedSlowdownOptionsStop) { ...@@ -448,7 +448,7 @@ TEST_F(DBTest, MixedSlowdownOptionsStop) {
// We need the 2nd write to trigger delay. This is because delay is // We need the 2nd write to trigger delay. This is because delay is
// estimated based on the last write size which is 0 for the first write. // estimated based on the last write size which is 0 for the first write.
ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2")); ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
token.reset(); token.reset();
for (auto& t : threads) { for (auto& t : threads) {
t.join(); t.join();
...@@ -483,7 +483,6 @@ TEST_F(DBTest, LevelLimitReopen) { ...@@ -483,7 +483,6 @@ TEST_F(DBTest, LevelLimitReopen) {
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
TEST_F(DBTest, PutSingleDeleteGet) { TEST_F(DBTest, PutSingleDeleteGet) {
do { do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
...@@ -782,7 +781,6 @@ TEST_F(DBTest, GetFromImmutableLayer) { ...@@ -782,7 +781,6 @@ TEST_F(DBTest, GetFromImmutableLayer) {
} while (ChangeOptions()); } while (ChangeOptions());
} }
TEST_F(DBTest, GetLevel0Ordering) { TEST_F(DBTest, GetLevel0Ordering) {
do { do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
...@@ -3807,7 +3805,7 @@ TEST_F(DBTest, FIFOCompactionWithTTLTest) { ...@@ -3807,7 +3805,7 @@ TEST_F(DBTest, FIFOCompactionWithTTLTest) {
options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB
options.compaction_options_fifo.allow_compaction = false; options.compaction_options_fifo.allow_compaction = false;
options.ttl = 1 * 60 * 60 ; // 1 hour options.ttl = 1 * 60 * 60; // 1 hour
options = CurrentOptions(options); options = CurrentOptions(options);
DestroyAndReopen(options); DestroyAndReopen(options);
...@@ -3881,7 +3879,7 @@ TEST_F(DBTest, FIFOCompactionWithTTLTest) { ...@@ -3881,7 +3879,7 @@ TEST_F(DBTest, FIFOCompactionWithTTLTest) {
options.write_buffer_size = 10 << 10; // 10KB options.write_buffer_size = 10 << 10; // 10KB
options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB
options.compaction_options_fifo.allow_compaction = false; options.compaction_options_fifo.allow_compaction = false;
options.ttl = 1 * 60 * 60; // 1 hour options.ttl = 1 * 60 * 60; // 1 hour
options = CurrentOptions(options); options = CurrentOptions(options);
DestroyAndReopen(options); DestroyAndReopen(options);
...@@ -6070,7 +6068,6 @@ TEST_F(DBTest, DISABLED_SuggestCompactRangeTest) { ...@@ -6070,7 +6068,6 @@ TEST_F(DBTest, DISABLED_SuggestCompactRangeTest) {
ASSERT_EQ(1, NumTableFilesAtLevel(1)); ASSERT_EQ(1, NumTableFilesAtLevel(1));
} }
TEST_F(DBTest, PromoteL0) { TEST_F(DBTest, PromoteL0) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
...@@ -6251,13 +6248,12 @@ TEST_F(DBTest, CompactFilesShouldTriggerAutoCompaction) { ...@@ -6251,13 +6248,12 @@ TEST_F(DBTest, CompactFilesShouldTriggerAutoCompaction) {
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
port::Thread manual_compaction_thread([&]() { port::Thread manual_compaction_thread([&]() {
auto s = db_->CompactFiles(CompactionOptions(), auto s = db_->CompactFiles(CompactionOptions(), db_->DefaultColumnFamily(),
db_->DefaultColumnFamily(), input_files, 0); input_files, 0);
ASSERT_OK(s); ASSERT_OK(s);
}); });
TEST_SYNC_POINT( TEST_SYNC_POINT("DBTest::CompactFilesShouldTriggerAutoCompaction:Begin");
"DBTest::CompactFilesShouldTriggerAutoCompaction:Begin");
// generate enough files to trigger compaction // generate enough files to trigger compaction
for (int i = 0; i < 20; ++i) { for (int i = 0; i < 20; ++i) {
for (int j = 0; j < 2; ++j) { for (int j = 0; j < 2; ++j) {
...@@ -6267,16 +6263,15 @@ TEST_F(DBTest, CompactFilesShouldTriggerAutoCompaction) { ...@@ -6267,16 +6263,15 @@ TEST_F(DBTest, CompactFilesShouldTriggerAutoCompaction) {
} }
db_->GetColumnFamilyMetaData(db_->DefaultColumnFamily(), &cf_meta_data); db_->GetColumnFamilyMetaData(db_->DefaultColumnFamily(), &cf_meta_data);
ASSERT_GT(cf_meta_data.levels[0].files.size(), ASSERT_GT(cf_meta_data.levels[0].files.size(),
options.level0_file_num_compaction_trigger); options.level0_file_num_compaction_trigger);
TEST_SYNC_POINT( TEST_SYNC_POINT("DBTest::CompactFilesShouldTriggerAutoCompaction:End");
"DBTest::CompactFilesShouldTriggerAutoCompaction:End");
manual_compaction_thread.join(); manual_compaction_thread.join();
ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_OK(dbfull()->TEST_WaitForCompact());
db_->GetColumnFamilyMetaData(db_->DefaultColumnFamily(), &cf_meta_data); db_->GetColumnFamilyMetaData(db_->DefaultColumnFamily(), &cf_meta_data);
ASSERT_LE(cf_meta_data.levels[0].files.size(), ASSERT_LE(cf_meta_data.levels[0].files.size(),
options.level0_file_num_compaction_trigger); options.level0_file_num_compaction_trigger);
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
...@@ -6501,8 +6496,9 @@ class WriteStallListener : public EventListener { ...@@ -6501,8 +6496,9 @@ class WriteStallListener : public EventListener {
MutexLock l(&mutex_); MutexLock l(&mutex_);
return expected == condition_; return expected == condition_;
} }
private: private:
port::Mutex mutex_; port::Mutex mutex_;
WriteStallCondition condition_; WriteStallCondition condition_;
}; };
...@@ -6730,7 +6726,8 @@ TEST_F(DBTest, LastWriteBufferDelay) { ...@@ -6730,7 +6726,8 @@ TEST_F(DBTest, LastWriteBufferDelay) {
sleeping_task.WakeUp(); sleeping_task.WakeUp();
sleeping_task.WaitUntilDone(); sleeping_task.WaitUntilDone();
} }
#endif // !defined(ROCKSDB_LITE) && !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION) #endif // !defined(ROCKSDB_LITE) &&
// !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
TEST_F(DBTest, FailWhenCompressionNotSupportedTest) { TEST_F(DBTest, FailWhenCompressionNotSupportedTest) {
CompressionType compressions[] = {kZlibCompression, kBZip2Compression, CompressionType compressions[] = {kZlibCompression, kBZip2Compression,
...@@ -6815,6 +6812,89 @@ TEST_F(DBTest, PinnableSliceAndRowCache) { ...@@ -6815,6 +6812,89 @@ TEST_F(DBTest, PinnableSliceAndRowCache) {
1); 1);
} }
TEST_F(DBTest, ReusePinnableSlice) {
Options options = CurrentOptions();
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
options.row_cache = NewLRUCache(8192);
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush());
ASSERT_EQ(Get("foo"), "bar");
ASSERT_EQ(
reinterpret_cast<LRUCache*>(options.row_cache.get())->TEST_GetLRUSize(),
1);
{
PinnableSlice pin_slice;
ASSERT_EQ(Get("foo", &pin_slice), Status::OK());
ASSERT_EQ(Get("foo", &pin_slice), Status::OK());
ASSERT_EQ(pin_slice.ToString(), "bar");
// Entry is already in cache, lookup will remove the element from lru
ASSERT_EQ(
reinterpret_cast<LRUCache*>(options.row_cache.get())->TEST_GetLRUSize(),
0);
}
// After PinnableSlice destruction element is added back in LRU
ASSERT_EQ(
reinterpret_cast<LRUCache*>(options.row_cache.get())->TEST_GetLRUSize(),
1);
{
std::vector<Slice> multiget_keys;
multiget_keys.push_back("foo");
std::vector<PinnableSlice> multiget_values(1);
std::vector<Status> statuses({Status::NotFound()});
ReadOptions ropt;
dbfull()->MultiGet(ropt, dbfull()->DefaultColumnFamily(),
multiget_keys.size(), multiget_keys.data(),
multiget_values.data(), statuses.data());
ASSERT_EQ(Status::OK(), statuses[0]);
dbfull()->MultiGet(ropt, dbfull()->DefaultColumnFamily(),
multiget_keys.size(), multiget_keys.data(),
multiget_values.data(), statuses.data());
ASSERT_EQ(Status::OK(), statuses[0]);
// Entry is already in cache, lookup will remove the element from lru
ASSERT_EQ(
reinterpret_cast<LRUCache*>(options.row_cache.get())->TEST_GetLRUSize(),
0);
}
// After PinnableSlice destruction element is added back in LRU
ASSERT_EQ(
reinterpret_cast<LRUCache*>(options.row_cache.get())->TEST_GetLRUSize(),
1);
{
std::vector<ColumnFamilyHandle*> multiget_cfs;
multiget_cfs.push_back(dbfull()->DefaultColumnFamily());
std::vector<Slice> multiget_keys;
multiget_keys.push_back("foo");
std::vector<PinnableSlice> multiget_values(1);
std::vector<Status> statuses({Status::NotFound()});
ReadOptions ropt;
dbfull()->MultiGet(ropt, multiget_keys.size(), multiget_cfs.data(),
multiget_keys.data(), multiget_values.data(),
statuses.data());
ASSERT_EQ(Status::OK(), statuses[0]);
dbfull()->MultiGet(ropt, multiget_keys.size(), multiget_cfs.data(),
multiget_keys.data(), multiget_values.data(),
statuses.data());
ASSERT_EQ(Status::OK(), statuses[0]);
// Entry is already in cache, lookup will remove the element from lru
ASSERT_EQ(
reinterpret_cast<LRUCache*>(options.row_cache.get())->TEST_GetLRUSize(),
0);
}
// After PinnableSlice destruction element is added back in LRU
ASSERT_EQ(
reinterpret_cast<LRUCache*>(options.row_cache.get())->TEST_GetLRUSize(),
1);
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
TEST_F(DBTest, DeletingOldWalAfterDrop) { TEST_F(DBTest, DeletingOldWalAfterDrop) {
...@@ -6894,9 +6974,7 @@ TEST_F(DBTest, PauseBackgroundWorkTest) { ...@@ -6894,9 +6974,7 @@ TEST_F(DBTest, PauseBackgroundWorkTest) {
TEST_F(DBTest, ThreadLocalPtrDeadlock) { TEST_F(DBTest, ThreadLocalPtrDeadlock) {
std::atomic<int> flushes_done{0}; std::atomic<int> flushes_done{0};
std::atomic<int> threads_destroyed{0}; std::atomic<int> threads_destroyed{0};
auto done = [&] { auto done = [&] { return flushes_done.load() > 10; };
return flushes_done.load() > 10;
};
port::Thread flushing_thread([&] { port::Thread flushing_thread([&] {
for (int i = 0; !done(); ++i) { for (int i = 0; !done(); ++i) {
...@@ -6909,7 +6987,7 @@ TEST_F(DBTest, ThreadLocalPtrDeadlock) { ...@@ -6909,7 +6987,7 @@ TEST_F(DBTest, ThreadLocalPtrDeadlock) {
}); });
std::vector<port::Thread> thread_spawning_threads(10); std::vector<port::Thread> thread_spawning_threads(10);
for (auto& t: thread_spawning_threads) { for (auto& t : thread_spawning_threads) {
t = port::Thread([&] { t = port::Thread([&] {
while (!done()) { while (!done()) {
{ {
...@@ -6925,7 +7003,7 @@ TEST_F(DBTest, ThreadLocalPtrDeadlock) { ...@@ -6925,7 +7003,7 @@ TEST_F(DBTest, ThreadLocalPtrDeadlock) {
}); });
} }
for (auto& t: thread_spawning_threads) { for (auto& t : thread_spawning_threads) {
t.join(); t.join();
} }
flushing_thread.join(); flushing_thread.join();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册