提交 48842ab3 编写于 作者: I Igor Canadi

Deprecate AtomicPointer

Summary: RocksDB already depends on C++11, so we might as well all the goodness that C++11 provides. This means that we don't need AtomicPointer anymore. The less things in port/, the easier it will be to port to other platforms.

Test Plan: make check + careful visual review verifying that NoBarried got memory_order_relaxed, while Acquire/Release methods got memory_order_acquire and memory_order_release

Reviewers: rven, yhchiang, ljin, sdong

Reviewed By: ljin

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D27543
上级 f37048ad
......@@ -188,15 +188,6 @@ if [ "$CROSS_COMPILE" = "true" -o "$FBCODE_BUILD" = "true" ]; then
# Also don't need any compilation tests if compiling on fbcode
true
else
# If -std=c++0x works, use <atomic>. Otherwise use port_posix.h.
$CXX $CFLAGS -std=c++0x -x c++ - -o /dev/null 2>/dev/null <<EOF
#include <atomic>
int main() {}
EOF
if [ "$?" = 0 ]; then
COMMON_FLAGS="$COMMON_FLAGS -DROCKSDB_ATOMIC_PRESENT"
fi
# Test whether fallocate is available
$CXX $CFLAGS -x c++ - -o /dev/null 2>/dev/null <<EOF
#include <fcntl.h>
......
......@@ -54,7 +54,7 @@ RANLIB=$TOOLCHAIN_EXECUTABLES/binutils/binutils-2.21.1/da39a3e/bin/ranlib
CFLAGS="-B$TOOLCHAIN_EXECUTABLES/binutils/binutils-2.21.1/da39a3e/bin/gold -m64 -mtune=generic"
CFLAGS+=" -I $TOOLCHAIN_LIB_BASE/jemalloc/$TOOL_JEMALLOC/include -DHAVE_JEMALLOC"
CFLAGS+=" $LIBGCC_INCLUDE $GLIBC_INCLUDE"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_ATOMIC_PRESENT -DROCKSDB_FALLOCATE_PRESENT"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_FALLOCATE_PRESENT"
CFLAGS+=" -DSNAPPY -DGFLAGS=google -DZLIB -DBZIP2"
EXEC_LDFLAGS=" -Wl,--whole-archive $TOOLCHAIN_LIB_BASE/jemalloc/$TOOL_JEMALLOC/lib/libjemalloc.a"
......
......@@ -70,7 +70,7 @@ RANLIB=$TOOLCHAIN_EXECUTABLES/binutils/binutils-2.21.1/da39a3e/bin/ranlib
CFLAGS="-B$TOOLCHAIN_EXECUTABLES/binutils/binutils-2.21.1/da39a3e/bin/gold -m64 -mtune=generic"
CFLAGS+=" $LIBGCC_INCLUDE $GLIBC_INCLUDE"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_ATOMIC_PRESENT -DROCKSDB_FALLOCATE_PRESENT"
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_FALLOCATE_PRESENT"
CFLAGS+=" -DSNAPPY -DGFLAGS=google -DZLIB -DBZIP2 -DLZ4 -DNUMA"
EXEC_LDFLAGS="-Wl,--dynamic-linker,/usr/local/fbcode/gcc-4.8.1-glibc-2.17/lib/ld.so"
......
......@@ -1533,13 +1533,13 @@ class Benchmark {
void AcquireLoad(ThreadState* thread) {
int dummy;
port::AtomicPointer ap(&dummy);
std::atomic<void*> ap;
int count = 0;
void *ptr = nullptr;
thread->stats.AddMessage("(each op is 1000 loads)");
while (count < 100000) {
for (int i = 0; i < 1000; i++) {
ptr = ap.Acquire_Load();
ptr = ap.load(std::memory_order_acquire);
}
count++;
thread->stats.FinishedOps(nullptr, nullptr, 1);
......
......@@ -326,7 +326,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
stats_(db_options_.statistics.get()),
db_lock_(nullptr),
mutex_(options.use_adaptive_mutex),
shutting_down_(nullptr),
shutting_down_(false),
bg_cv_(&mutex_),
logfile_number_(0),
log_empty_(true),
......@@ -388,7 +388,7 @@ DBImpl::~DBImpl() {
}
// Wait for background work to finish
shutting_down_.Release_Store(this); // Any non-nullptr value is ok
shutting_down_.store(true, std::memory_order_release);
while (bg_compaction_scheduled_ || bg_flush_scheduled_) {
bg_cv_.Wait();
}
......@@ -1615,7 +1615,8 @@ Status DBImpl::FlushMemTableToOutputFile(
Status s = WriteLevel0Table(cfd, mutable_cf_options, mems, edit,
&file_number, log_buffer);
if (s.ok() && (shutting_down_.Acquire_Load() || cfd->IsDropped())) {
if (s.ok() &&
(shutting_down_.load(std::memory_order_acquire) || cfd->IsDropped())) {
s = Status::ShutdownInProgress(
"Database shutdown or Column family drop during flush");
}
......@@ -2014,7 +2015,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
bg_schedule_needed_ = false;
if (bg_work_gate_closed_) {
// gate closed for backgrond work
} else if (shutting_down_.Acquire_Load()) {
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
} else {
bool is_flush_pending = false;
......@@ -2129,7 +2130,7 @@ void DBImpl::BackgroundCallFlush() {
MutexLock l(&mutex_);
Status s;
if (!shutting_down_.Acquire_Load()) {
if (!shutting_down_.load(std::memory_order_acquire)) {
s = BackgroundFlush(&madeProgress, deletion_state, &log_buffer);
if (!s.ok()) {
// Wait a little bit before retrying background compaction in
......@@ -2196,7 +2197,7 @@ void DBImpl::BackgroundCallCompaction() {
MutexLock l(&mutex_);
assert(bg_compaction_scheduled_);
Status s;
if (!shutting_down_.Acquire_Load()) {
if (!shutting_down_.load(std::memory_order_acquire)) {
s = BackgroundCompaction(&madeProgress, deletion_state, &log_buffer);
if (!s.ok()) {
// Wait a little bit before retrying background compaction in
......@@ -2700,7 +2701,7 @@ uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd,
// flush thread will take care of this
return 0;
}
if (cfd->imm()->imm_flush_needed.NoBarrier_Load() != nullptr) {
if (cfd->imm()->imm_flush_needed.load(std::memory_order_relaxed)) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (cfd->imm()->IsFlushPending()) {
......@@ -2762,7 +2763,7 @@ Status DBImpl::ProcessKeyValueCompaction(
int64_t key_drop_newer_entry = 0;
int64_t key_drop_obsolete = 0;
int64_t loop_cnt = 0;
while (input->Valid() && !shutting_down_.Acquire_Load() &&
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire) &&
!cfd->IsDropped()) {
if (++loop_cnt > 1000) {
if (key_drop_user > 0) {
......@@ -3222,7 +3223,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
shared_ptr<Iterator> backup_input(
versions_->MakeInputIterator(compact->compaction));
backup_input->SeekToFirst();
while (backup_input->Valid() && !shutting_down_.Acquire_Load() &&
while (backup_input->Valid() &&
!shutting_down_.load(std::memory_order_acquire) &&
!cfd->IsDropped()) {
// FLUSH preempts compaction
// TODO(icanadi) this currently only checks if flush is necessary on
......@@ -3356,7 +3358,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
log_buffer);
} // checking for compaction filter v2
if (status.ok() && (shutting_down_.Acquire_Load() || cfd->IsDropped())) {
if (status.ok() &&
(shutting_down_.load(std::memory_order_acquire) || cfd->IsDropped())) {
status = Status::ShutdownInProgress(
"Database shutdown or Column family drop during compaction");
}
......
......@@ -488,7 +488,7 @@ class DBImpl : public DB {
// State below is protected by mutex_
port::Mutex mutex_;
port::AtomicPointer shutting_down_;
std::atomic<bool> shutting_down_;
// This condition variable is signaled on these conditions:
// * whenever bg_compaction_scheduled_ goes down to 0
// * if bg_manual_only_ > 0, whenever a compaction finishes, even if it hasn't
......
......@@ -121,25 +121,25 @@ static std::string Key(int i) {
class SpecialEnv : public EnvWrapper {
public:
// sstable Sync() calls are blocked while this pointer is non-nullptr.
port::AtomicPointer delay_sstable_sync_;
std::atomic<bool> delay_sstable_sync_;
// Drop writes on the floor while this pointer is non-nullptr.
port::AtomicPointer drop_writes_;
std::atomic<bool> drop_writes_;
// Simulate no-space errors while this pointer is non-nullptr.
port::AtomicPointer no_space_;
std::atomic<bool> no_space_;
// Simulate non-writable file system while this pointer is non-nullptr
port::AtomicPointer non_writable_;
std::atomic<bool> non_writable_;
// Force sync of manifest files to fail while this pointer is non-nullptr
port::AtomicPointer manifest_sync_error_;
std::atomic<bool> manifest_sync_error_;
// Force write to manifest files to fail while this pointer is non-nullptr
port::AtomicPointer manifest_write_error_;
std::atomic<bool> manifest_write_error_;
// Force write to log files to fail while this pointer is non-nullptr
port::AtomicPointer log_write_error_;
std::atomic<bool> log_write_error_;
bool count_random_reads_;
anon::AtomicCounter random_read_counter_;
......@@ -154,15 +154,15 @@ class SpecialEnv : public EnvWrapper {
std::atomic<int> sync_counter_;
explicit SpecialEnv(Env* base) : EnvWrapper(base) {
delay_sstable_sync_.Release_Store(nullptr);
drop_writes_.Release_Store(nullptr);
no_space_.Release_Store(nullptr);
non_writable_.Release_Store(nullptr);
delay_sstable_sync_.store(false, std::memory_order_release);
drop_writes_.store(false, std::memory_order_release);
no_space_.store(false, std::memory_order_release);
non_writable_.store(false, std::memory_order_release);
count_random_reads_ = false;
count_sequential_reads_ = false;
manifest_sync_error_.Release_Store(nullptr);
manifest_write_error_.Release_Store(nullptr);
log_write_error_.Release_Store(nullptr);
manifest_sync_error_.store(false, std::memory_order_release);
manifest_write_error_.store(false, std::memory_order_release);
log_write_error_.store(false, std::memory_order_release);
bytes_written_ = 0;
sync_counter_ = 0;
}
......@@ -180,10 +180,10 @@ class SpecialEnv : public EnvWrapper {
base_(std::move(base)) {
}
Status Append(const Slice& data) {
if (env_->drop_writes_.Acquire_Load() != nullptr) {
if (env_->drop_writes_.load(std::memory_order_acquire)) {
// Drop writes on the floor
return Status::OK();
} else if (env_->no_space_.Acquire_Load() != nullptr) {
} else if (env_->no_space_.load(std::memory_order_acquire)) {
return Status::IOError("No space left on device");
} else {
env_->bytes_written_ += data.size();
......@@ -194,7 +194,7 @@ class SpecialEnv : public EnvWrapper {
Status Flush() { return base_->Flush(); }
Status Sync() {
++env_->sync_counter_;
while (env_->delay_sstable_sync_.Acquire_Load() != nullptr) {
while (env_->delay_sstable_sync_.load(std::memory_order_acquire)) {
env_->SleepForMicroseconds(100000);
}
return base_->Sync();
......@@ -211,7 +211,7 @@ class SpecialEnv : public EnvWrapper {
ManifestFile(SpecialEnv* env, unique_ptr<WritableFile>&& b)
: env_(env), base_(std::move(b)) { }
Status Append(const Slice& data) {
if (env_->manifest_write_error_.Acquire_Load() != nullptr) {
if (env_->manifest_write_error_.load(std::memory_order_acquire)) {
return Status::IOError("simulated writer error");
} else {
return base_->Append(data);
......@@ -221,7 +221,7 @@ class SpecialEnv : public EnvWrapper {
Status Flush() { return base_->Flush(); }
Status Sync() {
++env_->sync_counter_;
if (env_->manifest_sync_error_.Acquire_Load() != nullptr) {
if (env_->manifest_sync_error_.load(std::memory_order_acquire)) {
return Status::IOError("simulated sync error");
} else {
return base_->Sync();
......@@ -236,7 +236,7 @@ class SpecialEnv : public EnvWrapper {
LogFile(SpecialEnv* env, unique_ptr<WritableFile>&& b)
: env_(env), base_(std::move(b)) { }
Status Append(const Slice& data) {
if (env_->log_write_error_.Acquire_Load() != nullptr) {
if (env_->log_write_error_.load(std::memory_order_acquire)) {
return Status::IOError("simulated writer error");
} else {
return base_->Append(data);
......@@ -250,7 +250,7 @@ class SpecialEnv : public EnvWrapper {
}
};
if (non_writable_.Acquire_Load() != nullptr) {
if (non_writable_.load(std::memory_order_acquire)) {
return Status::IOError("simulated write error");
}
......@@ -1211,7 +1211,8 @@ TEST(DBTest, Empty) {
handles_[1], "rocksdb.num-entries-active-mem-table", &num));
ASSERT_EQ("1", num);
env_->delay_sstable_sync_.Release_Store(env_); // Block sync calls
// Block sync calls
env_->delay_sstable_sync_.store(true, std::memory_order_release);
Put(1, "k1", std::string(100000, 'x')); // Fill memtable
ASSERT_TRUE(dbfull()->GetProperty(
handles_[1], "rocksdb.num-entries-active-mem-table", &num));
......@@ -1223,7 +1224,8 @@ TEST(DBTest, Empty) {
ASSERT_EQ("1", num);
ASSERT_EQ("v1", Get(1, "foo"));
env_->delay_sstable_sync_.Release_Store(nullptr); // Release sync calls
// Release sync calls
env_->delay_sstable_sync_.store(false, std::memory_order_release);
ASSERT_OK(db_->DisableFileDeletions());
ASSERT_TRUE(
......@@ -1539,12 +1541,14 @@ TEST(DBTest, GetFromImmutableLayer) {
ASSERT_OK(Put(1, "foo", "v1"));
ASSERT_EQ("v1", Get(1, "foo"));
env_->delay_sstable_sync_.Release_Store(env_); // Block sync calls
// Block sync calls
env_->delay_sstable_sync_.store(true, std::memory_order_release);
Put(1, "k1", std::string(100000, 'x')); // Fill memtable
Put(1, "k2", std::string(100000, 'y')); // Trigger flush
ASSERT_EQ("v1", Get(1, "foo"));
ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
env_->delay_sstable_sync_.Release_Store(nullptr); // Release sync calls
// Release sync calls
env_->delay_sstable_sync_.store(false, std::memory_order_release);
} while (ChangeOptions());
}
......@@ -5776,7 +5780,8 @@ TEST(DBTest, DropWrites) {
ASSERT_EQ("v1", Get("foo"));
Compact("a", "z");
const int num_files = CountFiles();
env_->drop_writes_.Release_Store(env_); // Force out-of-space errors
// Force out-of-space errors
env_->drop_writes_.store(true, std::memory_order_release);
env_->sleep_counter_.Reset();
for (int i = 0; i < 5; i++) {
for (int level = 0; level < dbfull()->NumberLevels()-1; level++) {
......@@ -5788,7 +5793,7 @@ TEST(DBTest, DropWrites) {
ASSERT_TRUE(db_->GetProperty("rocksdb.background-errors", &property_value));
ASSERT_EQ("5", property_value);
env_->drop_writes_.Release_Store(nullptr);
env_->drop_writes_.store(false, std::memory_order_release);
ASSERT_LT(CountFiles(), num_files + 3);
// Check that compaction attempts slept after errors
......@@ -5805,7 +5810,8 @@ TEST(DBTest, DropWritesFlush) {
Reopen(&options);
ASSERT_OK(Put("foo", "v1"));
env_->drop_writes_.Release_Store(env_); // Force out-of-space errors
// Force out-of-space errors
env_->drop_writes_.store(true, std::memory_order_release);
std::string property_value;
// Background error count is 0 now.
......@@ -5829,7 +5835,7 @@ TEST(DBTest, DropWritesFlush) {
}
ASSERT_EQ("1", property_value);
env_->drop_writes_.Release_Store(nullptr);
env_->drop_writes_.store(false, std::memory_order_release);
} while (ChangeCompactOptions());
}
......@@ -5848,12 +5854,13 @@ TEST(DBTest, NoSpaceCompactRange) {
ASSERT_OK(Flush());
}
env_->no_space_.Release_Store(env_); // Force out-of-space errors
// Force out-of-space errors
env_->no_space_.store(true, std::memory_order_release);
Status s = db_->CompactRange(nullptr, nullptr);
ASSERT_TRUE(s.IsIOError());
env_->no_space_.Release_Store(nullptr);
env_->no_space_.store(false, std::memory_order_release);
} while (ChangeCompactOptions());
}
......@@ -5864,7 +5871,8 @@ TEST(DBTest, NonWritableFileSystem) {
options.env = env_;
Reopen(&options);
ASSERT_OK(Put("foo", "v1"));
env_->non_writable_.Release_Store(env_); // Force errors for new files
// Force errors for new files
env_->non_writable_.store(true, std::memory_order_release);
std::string big(100000, 'x');
int errors = 0;
for (int i = 0; i < 20; i++) {
......@@ -5874,7 +5882,7 @@ TEST(DBTest, NonWritableFileSystem) {
}
}
ASSERT_GT(errors, 0);
env_->non_writable_.Release_Store(nullptr);
env_->non_writable_.store(false, std::memory_order_release);
} while (ChangeCompactOptions());
}
......@@ -5888,7 +5896,7 @@ TEST(DBTest, ManifestWriteError) {
// We iterate twice. In the second iteration, everything is the
// same except the log record never makes it to the MANIFEST file.
for (int iter = 0; iter < 2; iter++) {
port::AtomicPointer* error_type = (iter == 0)
std::atomic<bool>* error_type = (iter == 0)
? &env_->manifest_sync_error_
: &env_->manifest_write_error_;
......@@ -5909,12 +5917,12 @@ TEST(DBTest, ManifestWriteError) {
ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo=>bar is now in last level
// Merging compaction (will fail)
error_type->Release_Store(env_);
error_type->store(true, std::memory_order_release);
dbfull()->TEST_CompactRange(last, nullptr, nullptr); // Should fail
ASSERT_EQ("bar", Get("foo"));
// Recovery: should not lose data
error_type->Release_Store(nullptr);
error_type->store(false, std::memory_order_release);
Reopen(&options);
ASSERT_EQ("bar", Get("foo"));
}
......@@ -5938,10 +5946,10 @@ TEST(DBTest, PutFailsParanoid) {
ASSERT_OK(Put(1, "foo", "bar"));
ASSERT_OK(Put(1, "foo1", "bar1"));
// simulate error
env_->log_write_error_.Release_Store(env_);
env_->log_write_error_.store(true, std::memory_order_release);
s = Put(1, "foo2", "bar2");
ASSERT_TRUE(!s.ok());
env_->log_write_error_.Release_Store(nullptr);
env_->log_write_error_.store(false, std::memory_order_release);
s = Put(1, "foo3", "bar3");
// the next put should fail, too
ASSERT_TRUE(!s.ok());
......@@ -5956,10 +5964,10 @@ TEST(DBTest, PutFailsParanoid) {
ASSERT_OK(Put(1, "foo", "bar"));
ASSERT_OK(Put(1, "foo1", "bar1"));
// simulate error
env_->log_write_error_.Release_Store(env_);
env_->log_write_error_.store(true, std::memory_order_release);
s = Put(1, "foo2", "bar2");
ASSERT_TRUE(!s.ok());
env_->log_write_error_.Release_Store(nullptr);
env_->log_write_error_.store(false, std::memory_order_release);
s = Put(1, "foo3", "bar3");
// the next put should NOT fail
ASSERT_TRUE(s.ok());
......@@ -6005,7 +6013,7 @@ TEST(DBTest, BloomFilter) {
Flush(1);
// Prevent auto compactions triggered by seeks
env_->delay_sstable_sync_.Release_Store(env_);
env_->delay_sstable_sync_.store(true, std::memory_order_release);
// Lookup present keys. Should rarely read from small sstable.
env_->random_read_counter_.Reset();
......@@ -6026,7 +6034,7 @@ TEST(DBTest, BloomFilter) {
fprintf(stderr, "%d missing => %d reads\n", N, reads);
ASSERT_LE(reads, 3*N/100);
env_->delay_sstable_sync_.Release_Store(nullptr);
env_->delay_sstable_sync_.store(false, std::memory_order_release);
Close();
} while (ChangeCompactOptions());
}
......@@ -7047,9 +7055,9 @@ static const int kNumKeys = 1000;
struct MTState {
DBTest* test;
port::AtomicPointer stop;
port::AtomicPointer counter[kNumThreads];
port::AtomicPointer thread_done[kNumThreads];
std::atomic<bool> stop;
std::atomic<int> counter[kNumThreads];
std::atomic<bool> thread_done[kNumThreads];
};
struct MTThread {
......@@ -7061,12 +7069,12 @@ static void MTThreadBody(void* arg) {
MTThread* t = reinterpret_cast<MTThread*>(arg);
int id = t->id;
DB* db = t->state->test->db_;
uintptr_t counter = 0;
int counter = 0;
fprintf(stderr, "... starting thread %d\n", id);
Random rnd(1000 + id);
char valbuf[1500];
while (t->state->stop.Acquire_Load() == nullptr) {
t->state->counter[id].Release_Store(reinterpret_cast<void*>(counter));
while (t->state->stop.load(std::memory_order_acquire) == false) {
t->state->counter[id].store(counter, std::memory_order_release);
int key = rnd.Uniform(kNumKeys);
char keybuf[20];
......@@ -7126,8 +7134,7 @@ static void MTThreadBody(void* arg) {
ASSERT_EQ(k, key);
ASSERT_GE(w, 0);
ASSERT_LT(w, kNumThreads);
ASSERT_LE((unsigned int)c, reinterpret_cast<uintptr_t>(
t->state->counter[w].Acquire_Load()));
ASSERT_LE(c, t->state->counter[w].load(std::memory_order_acquire));
ASSERT_EQ(cf, i);
if (i == 0) {
unique_id = u;
......@@ -7141,7 +7148,7 @@ static void MTThreadBody(void* arg) {
}
counter++;
}
t->state->thread_done[id].Release_Store(t);
t->state->thread_done[id].store(true, std::memory_order_release);
fprintf(stderr, "... stopping thread %d after %d ops\n", id, int(counter));
}
......@@ -7157,10 +7164,10 @@ TEST(DBTest, MultiThreaded) {
// Initialize state
MTState mt;
mt.test = this;
mt.stop.Release_Store(0);
mt.stop.store(false, std::memory_order_release);
for (int id = 0; id < kNumThreads; id++) {
mt.counter[id].Release_Store(0);
mt.thread_done[id].Release_Store(0);
mt.counter[id].store(0, std::memory_order_release);
mt.thread_done[id].store(false, std::memory_order_release);
}
// Start threads
......@@ -7175,9 +7182,9 @@ TEST(DBTest, MultiThreaded) {
env_->SleepForMicroseconds(kTestSeconds * 1000000);
// Stop the threads and wait for them to finish
mt.stop.Release_Store(&mt);
mt.stop.store(true, std::memory_order_release);
for (int id = 0; id < kNumThreads; id++) {
while (mt.thread_done[id].Acquire_Load() == nullptr) {
while (mt.thread_done[id].load(std::memory_order_acquire) == false) {
env_->SleepForMicroseconds(100000);
}
}
......
......@@ -114,7 +114,7 @@ void MemTableListVersion::Remove(MemTable* m) {
bool MemTableList::IsFlushPending() const {
if ((flush_requested_ && num_flush_not_started_ >= 1) ||
(num_flush_not_started_ >= min_write_buffer_number_to_merge_)) {
assert(imm_flush_needed.NoBarrier_Load() != nullptr);
assert(imm_flush_needed.load(std::memory_order_relaxed));
return true;
}
return false;
......@@ -129,7 +129,7 @@ void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) {
assert(!m->flush_completed_);
num_flush_not_started_--;
if (num_flush_not_started_ == 0) {
imm_flush_needed.Release_Store(nullptr);
imm_flush_needed.store(false, std::memory_order_release);
}
m->flush_in_progress_ = true; // flushing will start very soon
ret->push_back(m);
......@@ -155,7 +155,7 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
num_flush_not_started_++;
}
pending_outputs->erase(file_number);
imm_flush_needed.Release_Store(reinterpret_cast<void *>(1));
imm_flush_needed.store(true, std::memory_order_release);
}
// Record a successful flush in the manifest file
......@@ -236,7 +236,7 @@ Status MemTableList::InstallMemtableFlushResults(
num_flush_not_started_++;
pending_outputs->erase(m->file_number_);
m->file_number_ = 0;
imm_flush_needed.Release_Store((void *)1);
imm_flush_needed.store(true, std::memory_order_release);
}
++mem_id;
} while (!current_->memlist_.empty() && (m = current_->memlist_.back()) &&
......@@ -259,7 +259,7 @@ void MemTableList::Add(MemTable* m) {
m->MarkImmutable();
num_flush_not_started_++;
if (num_flush_not_started_ == 1) {
imm_flush_needed.Release_Store((void *)1);
imm_flush_needed.store(true, std::memory_order_release);
}
}
......
......@@ -78,12 +78,12 @@ class MemTableList {
public:
// A list of memtables.
explicit MemTableList(int min_write_buffer_number_to_merge)
: min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge),
: imm_flush_needed(false),
min_write_buffer_number_to_merge_(min_write_buffer_number_to_merge),
current_(new MemTableListVersion()),
num_flush_not_started_(0),
commit_in_progress_(false),
flush_requested_(false) {
imm_flush_needed.Release_Store(nullptr);
current_->Ref();
}
~MemTableList() {}
......@@ -92,7 +92,7 @@ class MemTableList {
// so that background threads can detect non-nullptr pointer to
// determine whether there is anything more to start flushing.
port::AtomicPointer imm_flush_needed;
std::atomic<bool> imm_flush_needed;
// Returns the total number of memtables in the list
int size() const;
......
......@@ -115,15 +115,14 @@ class SkipList {
// Modified only by Insert(). Read racily by readers, but stale
// values are ok.
port::AtomicPointer max_height_; // Height of the entire list
std::atomic<int> max_height_; // Height of the entire list
// Used for optimizing sequential insert patterns
Node** prev_;
int32_t prev_height_;
inline int GetMaxHeight() const {
return static_cast<int>(
reinterpret_cast<intptr_t>(max_height_.NoBarrier_Load()));
return max_height_.load(std::memory_order_relaxed);
}
// Read/written only by Insert().
......@@ -169,35 +168,35 @@ struct SkipList<Key, Comparator>::Node {
assert(n >= 0);
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
return reinterpret_cast<Node*>(next_[n].Acquire_Load());
return (next_[n].load(std::memory_order_acquire));
}
void SetNext(int n, Node* x) {
assert(n >= 0);
// Use a 'release store' so that anybody who reads through this
// pointer observes a fully initialized version of the inserted node.
next_[n].Release_Store(x);
next_[n].store(x, std::memory_order_release);
}
// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
assert(n >= 0);
return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
return next_[n].load(std::memory_order_relaxed);
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].NoBarrier_Store(x);
next_[n].store(x, std::memory_order_relaxed);
}
private:
// Array of length equal to the node height. next_[0] is lowest level link.
port::AtomicPointer next_[1];
std::atomic<Node*> next_[1];
};
template<typename Key, class Comparator>
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::NewNode(const Key& key, int height) {
char* mem = arena_->AllocateAligned(
sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));
sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1));
return new (mem) Node(key);
}
......@@ -364,7 +363,7 @@ SkipList<Key, Comparator>::SkipList(const Comparator cmp, Arena* arena,
compare_(cmp),
arena_(arena),
head_(NewNode(0 /* any key will do */, max_height)),
max_height_(reinterpret_cast<void*>(1)),
max_height_(1),
prev_height_(1),
rnd_(0xdeadbeef) {
assert(kMaxHeight_ > 0);
......@@ -402,7 +401,7 @@ void SkipList<Key, Comparator>::Insert(const Key& key) {
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
max_height_.store(height, std::memory_order_relaxed);
}
x = NewNode(key, height);
......
......@@ -191,13 +191,11 @@ class ConcurrentTest {
// Per-key generation
struct State {
port::AtomicPointer generation[K];
void Set(int k, intptr_t v) {
generation[k].Release_Store(reinterpret_cast<void*>(v));
}
intptr_t Get(int k) {
return reinterpret_cast<intptr_t>(generation[k].Acquire_Load());
std::atomic<int> generation[K];
void Set(int k, int v) {
generation[k].store(v, std::memory_order_release);
}
int Get(int k) { return generation[k].load(std::memory_order_acquire); }
State() {
for (unsigned int k = 0; k < K; k++) {
......@@ -221,7 +219,7 @@ class ConcurrentTest {
// REQUIRES: External synchronization
void WriteStep(Random* rnd) {
const uint32_t k = rnd->Next() % K;
const intptr_t g = current_.Get(k) + 1;
const int g = current_.Get(k) + 1;
const Key key = MakeKey(k, g);
list_.Insert(key);
current_.Set(k, g);
......@@ -303,7 +301,7 @@ class TestState {
public:
ConcurrentTest t_;
int seed_;
port::AtomicPointer quit_flag_;
std::atomic<bool> quit_flag_;
enum ReaderState {
STARTING,
......@@ -312,10 +310,7 @@ class TestState {
};
explicit TestState(int s)
: seed_(s),
quit_flag_(nullptr),
state_(STARTING),
state_cv_(&mu_) {}
: seed_(s), quit_flag_(false), state_(STARTING), state_cv_(&mu_) {}
void Wait(ReaderState s) {
mu_.Lock();
......@@ -343,7 +338,7 @@ static void ConcurrentReader(void* arg) {
Random rnd(state->seed_);
int64_t reads = 0;
state->Change(TestState::RUNNING);
while (!state->quit_flag_.Acquire_Load()) {
while (!state->quit_flag_.load(std::memory_order_acquire)) {
state->t_.ReadStep(&rnd);
++reads;
}
......@@ -365,7 +360,7 @@ static void RunConcurrent(int run) {
for (int i = 0; i < kSize; i++) {
state.t_.WriteStep(&rnd);
}
state.quit_flag_.Release_Store(&state); // Any non-nullptr arg will do
state.quit_flag_.store(true, std::memory_order_release);
state.Wait(TestState::DONE);
}
}
......
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
// AtomicPointer provides storage for a lock-free pointer.
// Platform-dependent implementation of AtomicPointer:
// - If the platform provides a cheap barrier, we use it with raw pointers
// - If cstdatomic is present (on newer versions of gcc, it is), we use
// a cstdatomic-based AtomicPointer. However we prefer the memory
// barrier based version, because at least on a gcc 4.4 32-bit build
// on linux, we have encountered a buggy <cstdatomic>
// implementation. Also, some <cstdatomic> implementations are much
// slower than a memory-barrier based implementation (~16ns for
// <cstdatomic> based acquire-load vs. ~1ns for a barrier based
// acquire-load).
// This code is based on atomicops-internals-* in Google's perftools:
// http://code.google.com/p/google-perftools/source/browse/#svn%2Ftrunk%2Fsrc%2Fbase
#ifndef PORT_ATOMIC_POINTER_H_
#define PORT_ATOMIC_POINTER_H_
#include <stdint.h>
#ifdef ROCKSDB_ATOMIC_PRESENT
#include <atomic>
#endif
#ifdef OS_WIN
#include <windows.h>
#endif
#ifdef OS_MACOSX
#include <libkern/OSAtomic.h>
#endif
#if defined(_M_X64) || defined(__x86_64__)
#define ARCH_CPU_X86_FAMILY 1
#elif defined(_M_IX86) || defined(__i386__) || defined(__i386)
#define ARCH_CPU_X86_FAMILY 1
#elif defined(__ARMEL__)
#define ARCH_CPU_ARM_FAMILY 1
#endif
namespace rocksdb {
namespace port {
// Define MemoryBarrier() if available
// Windows on x86
#if defined(OS_WIN) && defined(COMPILER_MSVC) && defined(ARCH_CPU_X86_FAMILY)
// windows.h already provides a MemoryBarrier(void) macro
// http://msdn.microsoft.com/en-us/library/ms684208(v=vs.85).aspx
#define ROCKSDB_HAVE_MEMORY_BARRIER
// Gcc on x86
#elif defined(ARCH_CPU_X86_FAMILY) && defined(__GNUC__)
inline void MemoryBarrier() {
// See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on
// this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering.
__asm__ __volatile__("" : : : "memory");
}
#define ROCKSDB_HAVE_MEMORY_BARRIER
// Sun Studio
#elif defined(ARCH_CPU_X86_FAMILY) && defined(__SUNPRO_CC)
inline void MemoryBarrier() {
// See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on
// this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering.
asm volatile("" : : : "memory");
}
#define ROCKSDB_HAVE_MEMORY_BARRIER
// Mac OS
#elif defined(OS_MACOSX)
inline void MemoryBarrier() {
OSMemoryBarrier();
}
#define ROCKSDB_HAVE_MEMORY_BARRIER
// ARM Linux
#elif defined(ARCH_CPU_ARM_FAMILY) && defined(__linux__)
typedef void (*LinuxKernelMemoryBarrierFunc)(void);
// The Linux ARM kernel provides a highly optimized device-specific memory
// barrier function at a fixed memory address that is mapped in every
// user-level process.
//
// This beats using CPU-specific instructions which are, on single-core
// devices, un-necessary and very costly (e.g. ARMv7-A "dmb" takes more
// than 180ns on a Cortex-A8 like the one on a Nexus One). Benchmarking
// shows that the extra function call cost is completely negligible on
// multi-core devices.
//
inline void MemoryBarrier() {
(*(LinuxKernelMemoryBarrierFunc)0xffff0fa0)();
}
#define ROCKSDB_HAVE_MEMORY_BARRIER
#endif
// AtomicPointer built using platform-specific MemoryBarrier()
#if defined(ROCKSDB_HAVE_MEMORY_BARRIER)
class AtomicPointer {
private:
void* rep_;
public:
AtomicPointer() { }
explicit AtomicPointer(void* p) : rep_(p) {}
inline void* NoBarrier_Load() const { return rep_; }
inline void NoBarrier_Store(void* v) { rep_ = v; }
inline void* Acquire_Load() const {
void* result = rep_;
MemoryBarrier();
return result;
}
inline void Release_Store(void* v) {
MemoryBarrier();
rep_ = v;
}
};
// AtomicPointer based on <atomic>
#elif defined(ROCKSDB_ATOMIC_PRESENT)
class AtomicPointer {
private:
std::atomic<void*> rep_;
public:
AtomicPointer() { }
explicit AtomicPointer(void* v) : rep_(v) { }
inline void* Acquire_Load() const {
return rep_.load(std::memory_order_acquire);
}
inline void Release_Store(void* v) {
rep_.store(v, std::memory_order_release);
}
inline void* NoBarrier_Load() const {
return rep_.load(std::memory_order_relaxed);
}
inline void NoBarrier_Store(void* v) {
rep_.store(v, std::memory_order_relaxed);
}
};
// We have neither MemoryBarrier(), nor <cstdatomic>
#else
#error Please implement AtomicPointer for this platform.
#endif
#undef ROCKSDB_HAVE_MEMORY_BARRIER
#undef ARCH_CPU_X86_FAMILY
#undef ARCH_CPU_ARM_FAMILY
} // namespace port
} // namespace rocksdb
#endif // PORT_ATOMIC_POINTER_H_
......@@ -75,35 +75,6 @@ typedef intptr_t OnceType;
#define LEVELDB_ONCE_INIT 0
extern void InitOnce(port::OnceType*, void (*initializer)());
// A type that holds a pointer that can be read or written atomically
// (i.e., without word-tearing.)
class AtomicPointer {
private:
intptr_t rep_;
public:
// Initialize to arbitrary value
AtomicPointer();
// Initialize to hold v
explicit AtomicPointer(void* v) : rep_(v) { }
// Read and return the stored pointer with the guarantee that no
// later memory access (read or write) by this thread can be
// reordered ahead of this read.
void* Acquire_Load() const;
// Set v as the stored pointer with the guarantee that no earlier
// memory access (read or write) by this thread can be reordered
// after this store.
void Release_Store(void* v);
// Read the stored pointer with no ordering guarantees.
void* NoBarrier_Load() const;
// Set va as the stored pointer with no ordering guarantees.
void NoBarrier_Store(void* v);
};
// ------------------ Compression -------------------
// Store the snappy compression of "input[0,input_length-1]" in *output.
......
......@@ -55,7 +55,6 @@
#include <string>
#include <string.h>
#include "rocksdb/options.h"
#include "port/atomic_pointer.h"
#ifndef PLATFORM_IS_LITTLE_ENDIAN
#define PLATFORM_IS_LITTLE_ENDIAN (__BYTE_ORDER == __LITTLE_ENDIAN)
......
......@@ -58,7 +58,7 @@ static void DataPumpThreadBody(void* arg) {
}
struct ReplicationThread {
port::AtomicPointer stop;
std::atomic<bool> stop;
DB* db;
volatile size_t no_read;
};
......@@ -68,11 +68,11 @@ static void ReplicationThreadBody(void* arg) {
DB* db = t->db;
unique_ptr<TransactionLogIterator> iter;
SequenceNumber currentSeqNum = 1;
while (t->stop.Acquire_Load() != nullptr) {
while (!t->stop.load(std::memory_order_acquire)) {
iter.reset();
Status s;
while(!db->GetUpdatesSince(currentSeqNum, &iter).ok()) {
if (t->stop.Acquire_Load() == nullptr) {
if (t->stop.load(std::memory_order_acquire)) {
return;
}
}
......@@ -129,11 +129,11 @@ int main(int argc, const char** argv) {
ReplicationThread replThread;
replThread.db = db;
replThread.no_read = 0;
replThread.stop.Release_Store(env); // store something to make it non-null.
replThread.stop.store(false, std::memory_order_release);
env->StartThread(ReplicationThreadBody, &replThread);
while(replThread.no_read < FLAGS_num_inserts);
replThread.stop.Release_Store(nullptr);
replThread.stop.store(true, std::memory_order_release);
if (replThread.no_read < dataPump.no_records) {
// no. read should be => than inserted.
fprintf(stderr, "No. of Record's written and read not same\nRead : %zu"
......
......@@ -44,30 +44,31 @@ class EnvPosixTest {
};
static void SetBool(void* ptr) {
reinterpret_cast<port::AtomicPointer*>(ptr)->NoBarrier_Store(ptr);
reinterpret_cast<std::atomic<bool>*>(ptr)
->store(true, std::memory_order_relaxed);
}
TEST(EnvPosixTest, RunImmediately) {
port::AtomicPointer called (nullptr);
std::atomic<bool> called(false);
env_->Schedule(&SetBool, &called);
Env::Default()->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(called.NoBarrier_Load() != nullptr);
ASSERT_TRUE(called.load(std::memory_order_relaxed));
}
TEST(EnvPosixTest, RunMany) {
port::AtomicPointer last_id (nullptr);
std::atomic<int> last_id(0);
struct CB {
port::AtomicPointer* last_id_ptr; // Pointer to shared slot
uintptr_t id; // Order# for the execution of this callback
std::atomic<int>* last_id_ptr; // Pointer to shared slot
int id; // Order# for the execution of this callback
CB(port::AtomicPointer* p, int i) : last_id_ptr(p), id(i) { }
CB(std::atomic<int>* p, int i) : last_id_ptr(p), id(i) {}
static void Run(void* v) {
CB* cb = reinterpret_cast<CB*>(v);
void* cur = cb->last_id_ptr->NoBarrier_Load();
ASSERT_EQ(cb->id-1, reinterpret_cast<uintptr_t>(cur));
cb->last_id_ptr->Release_Store(reinterpret_cast<void*>(cb->id));
int cur = cb->last_id_ptr->load(std::memory_order_relaxed);
ASSERT_EQ(cb->id - 1, cur);
cb->last_id_ptr->store(cb->id, std::memory_order_release);
}
};
......@@ -82,8 +83,8 @@ TEST(EnvPosixTest, RunMany) {
env_->Schedule(&CB::Run, &cb4);
Env::Default()->SleepForMicroseconds(kDelayMicros);
void* cur = last_id.Acquire_Load();
ASSERT_EQ(4U, reinterpret_cast<uintptr_t>(cur));
int cur = last_id.load(std::memory_order_acquire);
ASSERT_EQ(4, cur);
}
struct State {
......
......@@ -8,12 +8,12 @@
#include "util/hash_linklist_rep.h"
#include <algorithm>
#include <atomic>
#include "rocksdb/memtablerep.h"
#include "util/arena.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "port/port.h"
#include "port/atomic_pointer.h"
#include "util/histogram.h"
#include "util/murmurhash.h"
#include "db/memtable.h"
......@@ -24,7 +24,7 @@ namespace {
typedef const char* Key;
typedef SkipList<Key, const MemTableRep::KeyComparator&> MemtableSkipList;
typedef port::AtomicPointer Pointer;
typedef std::atomic<void*> Pointer;
// A data structure used as the header of a link list of a hash bucket.
struct BucketHeader {
......@@ -34,7 +34,9 @@ struct BucketHeader {
explicit BucketHeader(void* n, uint32_t count)
: next(n), num_entries(count) {}
bool IsSkipListBucket() { return next.NoBarrier_Load() == this; }
bool IsSkipListBucket() {
return next.load(std::memory_order_relaxed) == this;
}
};
// A data structure used as the header of a skip list of a hash bucket.
......@@ -55,24 +57,23 @@ struct Node {
Node* Next() {
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
return reinterpret_cast<Node*>(next_.Acquire_Load());
return next_.load(std::memory_order_acquire);
}
void SetNext(Node* x) {
// Use a 'release store' so that anybody who reads through this
// pointer observes a fully initialized version of the inserted node.
next_.Release_Store(x);
next_.store(x, std::memory_order_release);
}
// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next() {
return reinterpret_cast<Node*>(next_.NoBarrier_Load());
return next_.load(std::memory_order_relaxed);
}
void NoBarrier_SetNext(Node* x) {
next_.NoBarrier_Store(x);
}
void NoBarrier_SetNext(Node* x) { next_.store(x, std::memory_order_relaxed); }
private:
port::AtomicPointer next_;
std::atomic<Node*> next_;
public:
char key[0];
};
......@@ -174,7 +175,7 @@ class HashLinkListRep : public MemTableRep {
// Maps slices (which are transformed user keys) to buckets of keys sharing
// the same transform.
port::AtomicPointer* buckets_;
Pointer* buckets_;
const uint32_t threshold_use_skiplist_;
......@@ -203,7 +204,7 @@ class HashLinkListRep : public MemTableRep {
}
Pointer* GetBucket(size_t i) const {
return static_cast<Pointer*>(buckets_[i].Acquire_Load());
return static_cast<Pointer*>(buckets_[i].load(std::memory_order_acquire));
}
Pointer* GetBucket(const Slice& slice) const {
......@@ -467,13 +468,13 @@ HashLinkListRep::HashLinkListRep(const MemTableRep::KeyComparator& compare,
logger_(logger),
bucket_entries_logging_threshold_(bucket_entries_logging_threshold),
if_log_bucket_dist_when_flash_(if_log_bucket_dist_when_flash) {
char* mem = arena_->AllocateAligned(sizeof(port::AtomicPointer) * bucket_size,
char* mem = arena_->AllocateAligned(sizeof(Pointer) * bucket_size,
huge_page_tlb_size, logger);
buckets_ = new (mem) port::AtomicPointer[bucket_size];
buckets_ = new (mem) Pointer[bucket_size];
for (size_t i = 0; i < bucket_size_; ++i) {
buckets_[i].NoBarrier_Store(nullptr);
buckets_[i].store(nullptr, std::memory_order_relaxed);
}
}
......@@ -492,7 +493,7 @@ SkipListBucketHeader* HashLinkListRep::GetSkipListBucketHeader(
if (first_next_pointer == nullptr) {
return nullptr;
}
if (first_next_pointer->NoBarrier_Load() == nullptr) {
if (first_next_pointer->load(std::memory_order_relaxed) == nullptr) {
// Single entry bucket
return nullptr;
}
......@@ -502,8 +503,8 @@ SkipListBucketHeader* HashLinkListRep::GetSkipListBucketHeader(
assert(header->num_entries > threshold_use_skiplist_);
auto* skip_list_bucket_header =
reinterpret_cast<SkipListBucketHeader*>(header);
assert(skip_list_bucket_header->Counting_header.next.NoBarrier_Load() ==
header);
assert(skip_list_bucket_header->Counting_header.next.load(
std::memory_order_relaxed) == header);
return skip_list_bucket_header;
}
assert(header->num_entries <= threshold_use_skiplist_);
......@@ -514,7 +515,7 @@ Node* HashLinkListRep::GetLinkListFirstNode(Pointer* first_next_pointer) const {
if (first_next_pointer == nullptr) {
return nullptr;
}
if (first_next_pointer->NoBarrier_Load() == nullptr) {
if (first_next_pointer->load(std::memory_order_relaxed) == nullptr) {
// Single entry bucket
return reinterpret_cast<Node*>(first_next_pointer);
}
......@@ -522,7 +523,8 @@ Node* HashLinkListRep::GetLinkListFirstNode(Pointer* first_next_pointer) const {
BucketHeader* header = reinterpret_cast<BucketHeader*>(first_next_pointer);
if (!header->IsSkipListBucket()) {
assert(header->num_entries <= threshold_use_skiplist_);
return reinterpret_cast<Node*>(header->next.NoBarrier_Load());
return reinterpret_cast<Node*>(
header->next.load(std::memory_order_relaxed));
}
assert(header->num_entries > threshold_use_skiplist_);
return nullptr;
......@@ -534,19 +536,20 @@ void HashLinkListRep::Insert(KeyHandle handle) {
Slice internal_key = GetLengthPrefixedSlice(x->key);
auto transformed = GetPrefix(internal_key);
auto& bucket = buckets_[GetHash(transformed)];
Pointer* first_next_pointer = static_cast<Pointer*>(bucket.NoBarrier_Load());
Pointer* first_next_pointer =
static_cast<Pointer*>(bucket.load(std::memory_order_relaxed));
if (first_next_pointer == nullptr) {
// Case 1. empty bucket
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(nullptr);
bucket.Release_Store(x);
bucket.store(x, std::memory_order_release);
return;
}
BucketHeader* header = nullptr;
if (first_next_pointer->NoBarrier_Load() == nullptr) {
if (first_next_pointer->load(std::memory_order_relaxed) == nullptr) {
// Case 2. only one entry in the bucket
// Need to convert to a Counting bucket and turn to case 4.
Node* first = reinterpret_cast<Node*>(first_next_pointer);
......@@ -557,7 +560,7 @@ void HashLinkListRep::Insert(KeyHandle handle) {
// think the node is a bucket header.
auto* mem = arena_->AllocateAligned(sizeof(BucketHeader));
header = new (mem) BucketHeader(first, 1);
bucket.Release_Store(header);
bucket.store(header, std::memory_order_release);
} else {
header = reinterpret_cast<BucketHeader*>(first_next_pointer);
if (header->IsSkipListBucket()) {
......@@ -585,7 +588,8 @@ void HashLinkListRep::Insert(KeyHandle handle) {
// Case 3. number of entries reaches the threshold so need to convert to
// skip list.
LinkListIterator bucket_iter(
this, reinterpret_cast<Node*>(first_next_pointer->NoBarrier_Load()));
this, reinterpret_cast<Node*>(
first_next_pointer->load(std::memory_order_relaxed)));
auto mem = arena_->AllocateAligned(sizeof(SkipListBucketHeader));
SkipListBucketHeader* new_skip_list_header = new (mem)
SkipListBucketHeader(compare_, arena_, header->num_entries + 1);
......@@ -599,11 +603,12 @@ void HashLinkListRep::Insert(KeyHandle handle) {
// insert the new entry
skip_list.Insert(x->key);
// Set the bucket
bucket.Release_Store(new_skip_list_header);
bucket.store(new_skip_list_header, std::memory_order_release);
} else {
// Case 5. Need to insert to the sorted linked list without changing the
// header.
Node* first = reinterpret_cast<Node*>(header->next.NoBarrier_Load());
Node* first =
reinterpret_cast<Node*>(header->next.load(std::memory_order_relaxed));
assert(first != nullptr);
// Advance counter unless the bucket needs to be advanced to skip list.
// In that case, we need to make sure the previous count never exceeds
......@@ -640,7 +645,7 @@ void HashLinkListRep::Insert(KeyHandle handle) {
if (prev) {
prev->SetNext(x);
} else {
header->next.Release_Store(static_cast<void*>(x));
header->next.store(static_cast<void*>(x), std::memory_order_release);
}
}
}
......
......@@ -7,12 +7,13 @@
#ifndef ROCKSDB_LITE
#include "util/hash_skiplist_rep.h"
#include <atomic>
#include "rocksdb/memtablerep.h"
#include "util/arena.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "port/port.h"
#include "port/atomic_pointer.h"
#include "util/murmurhash.h"
#include "db/memtable.h"
#include "db/skiplist.h"
......@@ -54,7 +55,7 @@ class HashSkipListRep : public MemTableRep {
// Maps slices (which are transformed user keys) to buckets of keys sharing
// the same transform.
port::AtomicPointer* buckets_;
std::atomic<Bucket*>* buckets_;
// The user-supplied transform whose domain is the user keys.
const SliceTransform* transform_;
......@@ -67,7 +68,7 @@ class HashSkipListRep : public MemTableRep {
return MurmurHash(slice.data(), slice.size(), 0) % bucket_size_;
}
inline Bucket* GetBucket(size_t i) const {
return static_cast<Bucket*>(buckets_[i].Acquire_Load());
return buckets_[i].load(std::memory_order_acquire);
}
inline Bucket* GetBucket(const Slice& slice) const {
return GetBucket(GetHash(slice));
......@@ -229,12 +230,11 @@ HashSkipListRep::HashSkipListRep(const MemTableRep::KeyComparator& compare,
transform_(transform),
compare_(compare),
arena_(arena) {
auto mem =
arena->AllocateAligned(sizeof(port::AtomicPointer) * bucket_size);
buckets_ = new (mem) port::AtomicPointer[bucket_size];
auto mem = arena->AllocateAligned(sizeof(std::atomic<void*>) * bucket_size);
buckets_ = new (mem) std::atomic<Bucket*>[bucket_size];
for (size_t i = 0; i < bucket_size_; ++i) {
buckets_[i].NoBarrier_Store(nullptr);
buckets_[i].store(nullptr, std::memory_order_relaxed);
}
}
......@@ -249,7 +249,7 @@ HashSkipListRep::Bucket* HashSkipListRep::GetInitializedBucket(
auto addr = arena_->AllocateAligned(sizeof(Bucket));
bucket = new (addr) Bucket(compare_, arena_, skiplist_height_,
skiplist_branching_factor_);
buckets_[hash].Release_Store(static_cast<void*>(bucket));
buckets_[hash].store(bucket, std::memory_order_release);
}
return bucket;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册