diff --git a/db/db_bench.cc b/db/db_bench.cc index 096b1837728305239924fa49c9948e360b8fbfc9..1bc1caa73a3e9ad5a817032cda71f617fcf93d3d 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -13,6 +13,7 @@ #include "rocksdb/cache.h" #include "rocksdb/db.h" #include "rocksdb/env.h" +#include "rocksdb/memtablerep.h" #include "rocksdb/write_batch.h" #include "rocksdb/statistics.h" #include "port/port.h" @@ -79,6 +80,7 @@ static const char* FLAGS_benchmarks = "snappycomp," "snappyuncomp," "acquireload," + "fillfromstdin," ; // the maximum size of key in bytes static const int MAX_KEY_SIZE = 128; @@ -906,6 +908,9 @@ class Benchmark { } else if (name == Slice("fillrandom")) { fresh_db = true; method = &Benchmark::WriteRandom; + } else if (name == Slice("fillfromstdin")) { + fresh_db = true; + method = &Benchmark::WriteFromStdin; } else if (name == Slice("filluniquerandom")) { fresh_db = true; if (num_threads > 1) { @@ -1342,6 +1347,54 @@ class Benchmark { DoWrite(thread, UNIQUE_RANDOM); } + void writeOrFail(WriteBatch& batch) { + Status s = db_->Write(write_options_, &batch); + if (!s.ok()) { + fprintf(stderr, "put error: %s\n", s.ToString().c_str()); + exit(1); + } + } + + void WriteFromStdin(ThreadState* thread) { + size_t count = 0; + WriteBatch batch; + const size_t bufferLen = 32 << 20; + unique_ptr line = unique_ptr(new char[bufferLen]); + char* linep = line.get(); + const int batchSize = 100 << 10; + const char columnSeparator = '\t'; + const char lineSeparator = '\n'; + + while (fgets(linep, bufferLen, stdin) != nullptr) { + ++count; + char* tab = std::find(linep, linep + bufferLen, columnSeparator); + if (tab == linep + bufferLen) { + fprintf(stderr, "[Error] No Key delimiter TAB at line %ld\n", count); + continue; + } + Slice key(linep, tab - linep); + tab++; + char* endLine = std::find(tab, linep + bufferLen, lineSeparator); + if (endLine == linep + bufferLen) { + fprintf(stderr, "[Error] No ENTER at end of line # %ld\n", count); + continue; + } + Slice value(tab, endLine - tab); + thread->stats.FinishedSingleOp(db_); + thread->stats.AddBytes(endLine - linep - 1); + + if (batch.Count() < batchSize) { + batch.Put(key, value); + continue; + } + writeOrFail(batch); + batch.Clear(); + } + if (batch.Count() > 0) { + writeOrFail(batch); + } + } + void DoWrite(ThreadState* thread, WriteMode write_mode) { const int test_duration = write_mode == RANDOM ? FLAGS_duration : 0; const int num_ops = writes_ == 0 ? num_ : writes_ ; @@ -2320,8 +2373,8 @@ int main(int argc, char** argv) { } else { FLAGS_key_size = n; } - } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) { - FLAGS_write_buffer_size = n; + } else if (sscanf(argv[i], "--write_buffer_size=%lld%c", &ll, &junk) == 1) { + FLAGS_write_buffer_size = ll; } else if (sscanf(argv[i], "--max_write_buffer_number=%d%c", &n, &junk) == 1) { FLAGS_max_write_buffer_number = n; } else if (sscanf(argv[i], "--min_write_buffer_number_to_merge=%d%c", diff --git a/util/vectorrep.cc b/util/vectorrep.cc index 14443c7314435922cb02fbf7642be12c9fc091a4..8770f60b98aaad5e81dd225a585e39c6abbed90f 100644 --- a/util/vectorrep.cc +++ b/util/vectorrep.cc @@ -36,11 +36,15 @@ class VectorRep : public MemTableRep { virtual ~VectorRep() override { } class Iterator : public MemTableRep::Iterator { + class VectorRep* vrep_; std::shared_ptr> bucket_; - typename std::vector::const_iterator cit_; + typename std::vector::const_iterator mutable cit_; const KeyComparator& compare_; + bool mutable sorted_; + void DoSort() const; public: - explicit Iterator(std::shared_ptr> bucket, + explicit Iterator(class VectorRep* vrep, + std::shared_ptr> bucket, const KeyComparator& compare); // Initialize an iterator over the specified collection. @@ -82,11 +86,12 @@ class VectorRep : public MemTableRep { virtual std::shared_ptr GetIterator() override; private: + friend class Iterator; typedef std::vector Bucket; std::shared_ptr bucket_; mutable port::RWMutex rwlock_; - bool immutable_ = false; - bool sorted_ = false; + bool immutable_; + bool sorted_; const KeyComparator& compare_; }; @@ -119,16 +124,42 @@ size_t VectorRep::ApproximateMemoryUsage() { VectorRep::VectorRep(const KeyComparator& compare, Arena* arena, size_t count) : bucket_(new Bucket(count)), + immutable_(false), + sorted_(false), compare_(compare) { } -VectorRep::Iterator::Iterator(std::shared_ptr> bucket, +VectorRep::Iterator::Iterator(class VectorRep* vrep, + std::shared_ptr> bucket, const KeyComparator& compare) -: bucket_(bucket), - cit_(bucket_->begin()), - compare_(compare) { } +: vrep_(vrep), + bucket_(bucket), + cit_(nullptr), + compare_(compare), + sorted_(false) { } + +void VectorRep::Iterator::DoSort() const { + // vrep is non-null means that we are working on an immutable memtable + if (!sorted_ && vrep_ != nullptr) { + WriteLock l(&vrep_->rwlock_); + if (!vrep_->sorted_) { + std::sort(bucket_->begin(), bucket_->end(), Compare(compare_)); + cit_ = bucket_->begin(); + vrep_->sorted_ = true; + } + sorted_ = true; + } + if (!sorted_) { + std::sort(bucket_->begin(), bucket_->end(), Compare(compare_)); + cit_ = bucket_->begin(); + sorted_ = true; + } + assert(sorted_); + assert(vrep_ == nullptr || vrep_->sorted_); +} // Returns true iff the iterator is positioned at a valid node. bool VectorRep::Iterator::Valid() const { + DoSort(); return cit_ != bucket_->end(); } @@ -165,6 +196,7 @@ void VectorRep::Iterator::Prev() { // Advance to the first entry with a key >= target void VectorRep::Iterator::Seek(const char* target) { + DoSort(); // Do binary search to find first value not less than the target cit_ = std::equal_range(bucket_->begin(), bucket_->end(), @@ -177,12 +209,14 @@ void VectorRep::Iterator::Seek(const char* target) { // Position at the first entry in collection. // Final state of iterator is Valid() iff collection is not empty. void VectorRep::Iterator::SeekToFirst() { + DoSort(); cit_ = bucket_->begin(); } // Position at the last entry in collection. // Final state of iterator is Valid() iff collection is not empty. void VectorRep::Iterator::SeekToLast() { + DoSort(); cit_ = bucket_->end(); if (bucket_->size() != 0) { --cit_; @@ -190,21 +224,16 @@ void VectorRep::Iterator::SeekToLast() { } std::shared_ptr VectorRep::GetIterator() { - std::shared_ptr tmp; ReadLock l(&rwlock_); + // Do not sort here. The sorting would be done the first time + // a Seek is performed on the iterator. if (immutable_) { - rwlock_.Unlock(); - rwlock_.WriteLock(); - tmp = bucket_; - if (!sorted_) { - std::sort(tmp->begin(), tmp->end(), Compare(compare_)); - sorted_ = true; - } + return std::make_shared(this, bucket_, compare_); } else { + std::shared_ptr tmp; tmp.reset(new Bucket(*bucket_)); // make a copy - std::sort(tmp->begin(), tmp->end(), Compare(compare_)); + return std::make_shared(nullptr, tmp, compare_); } - return std::make_shared(tmp, compare_); } } // anon namespace