提交 740d8b3d 编写于 作者: D dgrogan@chromium.org

Update from upstream @21551990

* Patch LevelDB to build for OSX and iOS
* Fix race condition in memtable iterator deletion.
* Other small fixes.

git-svn-id: https://leveldb.googlecode.com/svn/trunk@29 62dab493-f737-651d-591e-8d6aee1b9529
上级 da799095
......@@ -8,7 +8,21 @@ CC = g++
#OPT = -O2 -DNDEBUG
OPT = -g2
CFLAGS = -c -DLEVELDB_PLATFORM_POSIX -I. -I./include -std=c++0x $(OPT)
UNAME := $(shell uname)
ifeq ($(UNAME), Darwin)
# To build for iOS, set PLATFORM=IOS.
ifndef PLATFORM
PLATFORM=OSX
endif # PLATFORM
PLATFORM_CFLAGS = -DLEVELDB_PLATFORM_OSX
PORT_MODULE = port_osx.o
else # UNAME
PLATFORM_CFLAGS = -DLEVELDB_PLATFORM_POSIX -std=c++0x
PORT_MODULE = port_posix.o
endif # UNAME
CFLAGS = -c -I. -I./include $(PLATFORM_CFLAGS) $(OPT)
LDFLAGS=-lpthread
......@@ -26,7 +40,7 @@ LIBOBJECTS = \
./db/version_edit.o \
./db/version_set.o \
./db/write_batch.o \
./port/port_posix.o \
./port/$(PORT_MODULE) \
./table/block.o \
./table/block_builder.o \
./table/format.o \
......@@ -69,13 +83,25 @@ TESTS = \
PROGRAMS = db_bench $(TESTS)
all: $(PROGRAMS)
LIBRARY = libleveldb.a
ifeq ($(PLATFORM), IOS)
# Only XCode can build executable applications for iOS.
all: $(LIBRARY)
else
all: $(PROGRAMS) $(LIBRARY)
endif
check: $(TESTS)
for t in $(TESTS); do echo "***** Running $$t"; ./$$t || exit 1; done
clean:
rm -f $(PROGRAMS) */*.o
-rm -f $(PROGRAMS) $(LIBRARY) */*.o ios-x86/*/*.o ios-arm/*/*.o
-rmdir -p ios-x86/* ios-arm/*
$(LIBRARY): $(LIBOBJECTS)
rm -f $@
$(AR) -rs $@ $(LIBOBJECTS)
db_bench: db/db_bench.o $(LIBOBJECTS) $(TESTUTIL)
$(CC) $(LDFLAGS) db/db_bench.o $(LIBOBJECTS) $(TESTUTIL) -o $@
......@@ -122,8 +148,19 @@ version_edit_test: db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS)
write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CC) $(LDFLAGS) db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@
ifeq ($(PLATFORM), IOS)
# For iOS, create universal object files to be used on both the simulator and
# a device.
.cc.o:
mkdir -p ios-x86/$(dir $@)
$(CC) $(CFLAGS) -isysroot /Developer/Platforms/iPhoneSimulator.platform/Developer/SDKs/iPhoneSimulator4.3.sdk -arch i686 $< -o ios-x86/$@
mkdir -p ios-arm/$(dir $@)
$(CC) $(CFLAGS) -isysroot /Developer/Platforms/iPhoneOS.platform/Developer/SDKs/iPhoneOS4.3.sdk -arch armv6 -arch armv7 $< -o ios-arm/$@
lipo ios-x86/$@ ios-arm/$@ -create -output $@
else
.cc.o:
$(CC) $(CFLAGS) $< -o $@
endif
# TODO(gabor): dependencies for .o files
# TODO(gabor): Build library
......@@ -29,6 +29,7 @@
// readrandom -- read N times in random order
// readhot -- read N times in random order from 1% section of DB
// crc32c -- repeated crc32c of 4K of data
// acquireload -- load N*1000 times
// Meta operations:
// compact -- Compact the entire DB
// stats -- Print DB stats
......@@ -50,6 +51,7 @@ static const char* FLAGS_benchmarks =
"crc32c,"
"snappycomp,"
"snappyuncomp,"
"acquireload,"
;
// Number of key/values to place in database
......@@ -382,6 +384,8 @@ class Benchmark {
Compact();
} else if (name == Slice("crc32c")) {
Crc32c(4096, "(4K per op)");
} else if (name == Slice("acquireload")) {
AcquireLoad();
} else if (name == Slice("snappycomp")) {
SnappyCompress();
} else if (name == Slice("snappyuncomp")) {
......@@ -420,6 +424,22 @@ class Benchmark {
message_ = label;
}
void AcquireLoad() {
int dummy;
port::AtomicPointer ap(&dummy);
int count = 0;
void *ptr = NULL;
message_ = "(each op is 1000 loads)";
while (count < 100000) {
for (int i = 0; i < 1000; i++) {
ptr = ap.Acquire_Load();
}
count++;
FinishedSingleOp();
}
if (ptr == NULL) exit(1); // Disable unused variable warning.
}
void SnappyCompress() {
Slice input = gen_.Generate(Options().block_size);
int64_t bytes = 0;
......
......@@ -875,22 +875,49 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
return status;
}
namespace {
struct IterState {
port::Mutex* mu;
Version* version;
MemTable* mem;
MemTable* imm;
};
static void CleanupIteratorState(void* arg1, void* arg2) {
IterState* state = reinterpret_cast<IterState*>(arg1);
state->mu->Lock();
state->mem->Unref();
if (state->imm != NULL) state->imm->Unref();
state->version->Unref();
state->mu->Unlock();
delete state;
}
}
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
SequenceNumber* latest_snapshot) {
IterState* cleanup = new IterState;
mutex_.Lock();
*latest_snapshot = versions_->LastSequence();
// Collect together all needed child iterators
std::vector<Iterator*> list;
list.push_back(mem_->NewIterator());
mem_->Ref();
if (imm_ != NULL) {
list.push_back(imm_->NewIterator());
imm_->Ref();
}
versions_->current()->AddIterators(options, &list);
Iterator* internal_iter =
NewMergingIterator(&internal_comparator_, &list[0], list.size());
versions_->current()->Ref();
internal_iter->RegisterCleanup(&DBImpl::Unref, this, versions_->current());
cleanup->mu = &mutex_;
cleanup->mem = mem_;
cleanup->imm = imm_;
cleanup->version = versions_->current();
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
mutex_.Unlock();
return internal_iter;
......@@ -937,13 +964,6 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) {
: latest_snapshot));
}
void DBImpl::Unref(void* arg1, void* arg2) {
DBImpl* impl = reinterpret_cast<DBImpl*>(arg1);
Version* v = reinterpret_cast<Version*>(arg2);
MutexLock l(&impl->mutex_);
v->Unref();
}
const Snapshot* DBImpl::GetSnapshot() {
MutexLock l(&mutex_);
return snapshots_.New(versions_->LastSequence());
......
......@@ -77,10 +77,6 @@ class DBImpl : public DB {
// Delete any unneeded files and stale in-memory entries.
void DeleteObsoleteFiles();
// Called when an iterator over a particular version of the
// descriptor goes away.
static void Unref(void* arg1, void* arg2);
// Compact the in-memory write buffer to disk. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
Status CompactMemTable();
......
......@@ -801,6 +801,101 @@ TEST(DBTest, DBOpen_Options) {
db = NULL;
}
// Multi-threaded test:
namespace {
static const int kNumThreads = 4;
static const int kTestSeconds = 10;
static const int kNumKeys = 1000;
struct MTState {
DBTest* test;
port::AtomicPointer stop;
port::AtomicPointer counter[kNumThreads];
port::AtomicPointer thread_done[kNumThreads];
};
struct MTThread {
MTState* state;
int id;
};
static void MTThreadBody(void* arg) {
MTThread* t = reinterpret_cast<MTThread*>(arg);
DB* db = t->state->test->db_;
uintptr_t counter = 0;
fprintf(stderr, "... starting thread %d\n", t->id);
Random rnd(1000 + t->id);
std::string value;
char valbuf[1500];
while (t->state->stop.Acquire_Load() == NULL) {
t->state->counter[t->id].Release_Store(reinterpret_cast<void*>(counter));
int key = rnd.Uniform(kNumKeys);
char keybuf[20];
snprintf(keybuf, sizeof(keybuf), "%016d", key);
if (rnd.OneIn(2)) {
// Write values of the form <key, my id, counter>.
// We add some padding for force compactions.
snprintf(valbuf, sizeof(valbuf), "%d.%d.%-1000d",
key, t->id, static_cast<int>(counter));
ASSERT_OK(db->Put(WriteOptions(), Slice(keybuf), Slice(valbuf)));
} else {
// Read a value and verify that it matches the pattern written above.
Status s = db->Get(ReadOptions(), Slice(keybuf), &value);
if (s.IsNotFound()) {
// Key has not yet been written
} else {
// Check that the writer thread counter is >= the counter in the value
ASSERT_OK(s);
int k, w, c;
ASSERT_EQ(3, sscanf(value.c_str(), "%d.%d.%d", &k, &w, &c)) << value;
ASSERT_EQ(k, key);
ASSERT_GE(w, 0);
ASSERT_LT(w, kNumThreads);
ASSERT_LE(c, reinterpret_cast<uintptr_t>(
t->state->counter[w].Acquire_Load()));
}
}
counter++;
}
t->state->thread_done[t->id].Release_Store(t);
fprintf(stderr, "... stopping thread %d after %d ops\n", t->id, int(counter));
}
}
TEST(DBTest, MultiThreaded) {
// Initialize state
MTState mt;
mt.test = this;
mt.stop.Release_Store(0);
for (int id = 0; id < kNumThreads; id++) {
mt.counter[id].Release_Store(0);
mt.thread_done[id].Release_Store(0);
}
// Start threads
MTThread thread[kNumThreads];
for (int id = 0; id < kNumThreads; id++) {
thread[id].state = &mt;
thread[id].id = id;
env_->StartThread(MTThreadBody, &thread[id]);
}
// Let them run for a while
env_->SleepForMicroseconds(kTestSeconds * 1000000);
// Stop the threads and wait for them to finish
mt.stop.Release_Store(&mt);
for (int id = 0; id < kNumThreads; id++) {
while (mt.thread_done[id].Acquire_Load() == NULL) {
env_->SleepForMicroseconds(100000);
}
}
}
namespace {
typedef std::map<std::string, std::string> KVMap;
}
......
......@@ -76,7 +76,7 @@ class LogTest {
return Status::OK();
}
virtual Status Skip(size_t n) {
virtual Status Skip(uint64_t n) {
if (n > contents_.size()) {
contents_.clear();
return Status::NotFound("in-memory file skipepd past end");
......
......@@ -50,33 +50,24 @@ static const char* EncodeKey(std::string* scratch, const Slice& target) {
class MemTableIterator: public Iterator {
public:
explicit MemTableIterator(MemTable* mem, MemTable::Table* table) {
mem_ = mem;
iter_ = new MemTable::Table::Iterator(table);
mem->Ref();
}
virtual ~MemTableIterator() {
delete iter_;
mem_->Unref();
}
virtual bool Valid() const { return iter_->Valid(); }
virtual void Seek(const Slice& k) { iter_->Seek(EncodeKey(&tmp_, k)); }
virtual void SeekToFirst() { iter_->SeekToFirst(); }
virtual void SeekToLast() { iter_->SeekToLast(); }
virtual void Next() { iter_->Next(); }
virtual void Prev() { iter_->Prev(); }
virtual Slice key() const { return GetLengthPrefixedSlice(iter_->key()); }
explicit MemTableIterator(MemTable::Table* table) : iter_(table) { }
virtual bool Valid() const { return iter_.Valid(); }
virtual void Seek(const Slice& k) { iter_.Seek(EncodeKey(&tmp_, k)); }
virtual void SeekToFirst() { iter_.SeekToFirst(); }
virtual void SeekToLast() { iter_.SeekToLast(); }
virtual void Next() { iter_.Next(); }
virtual void Prev() { iter_.Prev(); }
virtual Slice key() const { return GetLengthPrefixedSlice(iter_.key()); }
virtual Slice value() const {
Slice key_slice = GetLengthPrefixedSlice(iter_->key());
Slice key_slice = GetLengthPrefixedSlice(iter_.key());
return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
}
virtual Status status() const { return Status::OK(); }
private:
MemTable* mem_;
MemTable::Table::Iterator* iter_;
MemTable::Table::Iterator iter_;
std::string tmp_; // For passing to EncodeKey
// No copying allowed
......@@ -85,7 +76,7 @@ class MemTableIterator: public Iterator {
};
Iterator* MemTable::NewIterator() {
return new MemTableIterator(this, &table_);
return new MemTableIterator(&table_);
}
void MemTable::Add(SequenceNumber s, ValueType type,
......
......@@ -16,6 +16,8 @@
# include "port/port_chromium.h"
#elif defined(LEVELDB_PLATFORM_ANDROID)
# include "port/port_android.h"
#elif defined(LEVELDB_PLATFORM_OSX)
# include "port/port_osx.h"
#endif
#endif // STORAGE_LEVELDB_PORT_PORT_H_
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "port_osx.h"
#include <cstdlib>
#include <stdio.h>
#include <string.h>
#include "util/logging.h"
namespace leveldb {
namespace port {
static void PthreadCall(const char* label, int result) {
if (result != 0) {
fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
abort();
}
}
Mutex::Mutex() { PthreadCall("init mutex", pthread_mutex_init(&mu_, NULL)); }
Mutex::~Mutex() { PthreadCall("destroy mutex", pthread_mutex_destroy(&mu_)); }
void Mutex::Lock() { PthreadCall("lock", pthread_mutex_lock(&mu_)); }
void Mutex::Unlock() { PthreadCall("unlock", pthread_mutex_unlock(&mu_)); }
CondVar::CondVar(Mutex* mu)
: mu_(mu) {
PthreadCall("init cv", pthread_cond_init(&cv_, NULL));
}
CondVar::~CondVar() { PthreadCall("destroy cv", pthread_cond_destroy(&cv_)); }
void CondVar::Wait() {
PthreadCall("wait", pthread_cond_wait(&cv_, &mu_->mu_));
}
void CondVar::Signal() {
PthreadCall("signal", pthread_cond_signal(&cv_));
}
void CondVar::SignalAll() {
PthreadCall("broadcast", pthread_cond_broadcast(&cv_));
}
}
}
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// See port_example.h for documentation for the following types/functions.
#ifndef STORAGE_LEVELDB_PORT_PORT_OSX_H_
#define STORAGE_LEVELDB_PORT_PORT_OSX_H_
#include <libkern/OSAtomic.h>
#include <machine/endian.h>
#include <pthread.h>
#include <stdint.h>
#include <string>
namespace leveldb {
// The following 4 methods implemented here for the benefit of env_posix.cc.
inline size_t fread_unlocked(void *a, size_t b, size_t c, FILE *d) {
return fread(a, b, c, d);
}
inline size_t fwrite_unlocked(const void *a, size_t b, size_t c, FILE *d) {
return fwrite(a, b, c, d);
}
inline int fflush_unlocked(FILE *f) {
return fflush(f);
}
inline int fdatasync(int fd) {
return fsync(fd);
}
namespace port {
static const bool kLittleEndian = (__DARWIN_BYTE_ORDER == __DARWIN_LITTLE_ENDIAN);
// ------------------ Threading -------------------
// A Mutex represents an exclusive lock.
class Mutex {
public:
Mutex();
~Mutex();
void Lock();
void Unlock();
void AssertHeld() { }
private:
friend class CondVar;
pthread_mutex_t mu_;
// No copying
Mutex(const Mutex&);
void operator=(const Mutex&);
};
class CondVar {
public:
explicit CondVar(Mutex* mu);
~CondVar();
void Wait();
void Signal();
void SignalAll();
private:
pthread_cond_t cv_;
Mutex* mu_;
};
inline void MemoryBarrier() {
#if defined(__GNUC__) && (defined(__i386__) || defined(__x86_64__))
// See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on
// this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering.
__asm__ __volatile__("" : : : "memory");
#else
OSMemoryBarrier();
#endif
}
class AtomicPointer {
private:
void* ptr_;
public:
AtomicPointer() { }
explicit AtomicPointer(void* p) : ptr_(p) {}
inline void* Acquire_Load() const {
void* ptr = ptr_;
MemoryBarrier();
return ptr;
}
inline void Release_Store(void* v) {
MemoryBarrier();
ptr_ = v;
}
inline void* NoBarrier_Load() const {
return ptr_;
}
inline void NoBarrier_Store(void* v) {
ptr_ = v;
}
};
inline bool Snappy_Compress(const char* input, size_t input_length,
std::string* output) {
return false;
}
inline bool Snappy_Uncompress(const char* input_data, size_t input_length,
std::string* output) {
return false;
}
inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) {
return false;
}
}
}
#endif // STORAGE_LEVELDB_PORT_PORT_OSX_H_
......@@ -3,7 +3,6 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "leveldb/iterator.h"
#include "util/logging.h"
namespace leveldb {
......
......@@ -12,10 +12,6 @@ namespace leveldb {
// This can help avoid virtual function calls and also gives better
// cache locality.
class IteratorWrapper {
private:
Iterator* iter_;
bool valid_;
Slice key_;
public:
IteratorWrapper(): iter_(NULL), valid_(false) { }
explicit IteratorWrapper(Iterator* iter): iter_(NULL) {
......@@ -56,9 +52,12 @@ class IteratorWrapper {
key_ = iter_->key();
}
}
};
}
Iterator* iter_;
bool valid_;
Slice key_;
};
} // namespace leveldb
#endif // STORAGE_LEVELDB_TABLE_ITERATOR_WRAPPER_H_
......@@ -4,6 +4,8 @@
#if defined(LEVELDB_PLATFORM_POSIX) || defined(LEVELDB_PLATFORM_ANDROID)
#include <unordered_set>
#elif defined(LEVELDB_PLATFORM_OSX)
#include <ext/hash_set>
#elif defined(LEVELDB_PLATFORM_CHROMIUM)
#include "base/hash_tables.h"
#else
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册