diff --git a/Makefile b/Makefile
index 556d7620730068dba4a9ae78d879e0cf73fa24ce..cd259fb9152980a4be0dd8dfc3eed44e6a4613e7 100644
--- a/Makefile
+++ b/Makefile
@@ -421,7 +421,7 @@ TEST_LIBS = \
 	librocksdb_env_basic_test.a
 
 # TODO: add back forward_iterator_bench, after making it build in all environemnts.
-BENCHMARKS = db_bench table_reader_bench cache_bench memtablerep_bench column_aware_encoding_exp
+BENCHMARKS = db_bench table_reader_bench cache_bench memtablerep_bench column_aware_encoding_exp persistent_cache_bench
 
 # if user didn't config LIBNAME, set the default
 ifeq ($(LIBNAME),)
@@ -849,6 +849,9 @@ db_bench: tools/db_bench.o $(BENCHTOOLOBJECTS)
 cache_bench: util/cache_bench.o $(LIBOBJECTS) $(TESTUTIL)
 	$(AM_LINK)
 
+persistent_cache_bench: utilities/persistent_cache/persistent_cache_bench.o $(LIBOBJECTS) $(TESTUTIL)
+	$(AM_LINK)
+
 memtablerep_bench: db/memtablerep_bench.o $(LIBOBJECTS) $(TESTUTIL)
 	$(AM_LINK)
 
diff --git a/utilities/persistent_cache/block_cache_tier.cc b/utilities/persistent_cache/block_cache_tier.cc
index 57b08c1a22ac84fc7f2ad3bc9d9824791648ff2f..02b90e68df83dd81db2b73407a838f92e893957a 100644
--- a/utilities/persistent_cache/block_cache_tier.cc
+++ b/utilities/persistent_cache/block_cache_tier.cc
@@ -58,7 +58,7 @@ Status BlockCacheTier::Open() {
   NewCacheFile();
   assert(cache_file_);
 
-  if (opt_.pipeline_writes_) {
+  if (opt_.pipeline_writes) {
     assert(!insert_th_.joinable());
     insert_th_ = std::thread(&BlockCacheTier::InsertMain, this);
   }
@@ -108,7 +108,7 @@ Status BlockCacheTier::CleanupCacheFolder(const std::string& folder) {
 
 Status BlockCacheTier::Close() {
   // stop the insert thread
-  if (opt_.pipeline_writes_ && insert_th_.joinable()) {
+  if (opt_.pipeline_writes && insert_th_.joinable()) {
     InsertOp op(/*quit=*/true);
     insert_ops_.Push(std::move(op));
     insert_th_.join();
@@ -158,14 +158,14 @@ Status BlockCacheTier::Insert(const Slice& key, const char* data,
   // update stats
   stats_.bytes_pipelined_.Add(size);
 
-  if (opt_.pipeline_writes_) {
+  if (opt_.pipeline_writes) {
     // off load the write to the write thread
     insert_ops_.Push(
         InsertOp(key.ToString(), std::move(std::string(data, size))));
     return Status::OK();
   }
 
-  assert(!opt_.pipeline_writes_);
+  assert(!opt_.pipeline_writes);
   return InsertImpl(key, Slice(data, size));
 }
 
diff --git a/utilities/persistent_cache/persistent_cache_bench.cc b/utilities/persistent_cache/persistent_cache_bench.cc
new file mode 100644
index 0000000000000000000000000000000000000000..3d7fbf0b8abd03f7df79aaf54f407c0dec0b4fee
--- /dev/null
+++ b/utilities/persistent_cache/persistent_cache_bench.cc
@@ -0,0 +1,354 @@
+// Copyright (c) 2013, Facebook, Inc.  All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
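+//
+// persistent_cache_bench: a stand-alone micro-benchmark for the persistent
+// cache tiers. It drives concurrent insert/lookup threads against a block
+// cache, a volatile cache, or a tiered RAM+block cache, and reports latency
+// and throughput histograms on exit.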
+//
+#ifndef ROCKSDB_LITE
+#include <gflags/gflags.h>
+#include <unistd.h>
+
+#include <atomic>
+#include <cstdlib>
+#include <cstring>
+#include <functional>
+#include <limits>
+#include <list>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <thread>
+
+#include "rocksdb/env.h"
+
+#include "utilities/persistent_cache/block_cache_tier.h"
+#include "utilities/persistent_cache/persistent_cache_tier.h"
+#include "utilities/persistent_cache/volatile_tier_impl.h"
+
+#include "port/port_posix.h"
+#include "table/block_builder.h"
+#include "util/histogram.h"
+#include "util/mutexlock.h"
+#include "util/stop_watch.h"
+
+DEFINE_int32(nsec, 10, "nsec");
+DEFINE_int32(nthread_write, 1, "Insert threads");
+DEFINE_int32(nthread_read, 1, "Lookup threads");
+DEFINE_string(path, "/tmp/microbench/blkcache", "Path for cachefile");
+DEFINE_string(log_path, "/tmp/log", "Path for the log file");
+DEFINE_uint64(cache_size, std::numeric_limits<uint64_t>::max(), "Cache size");
+DEFINE_int32(iosize, 4 * 1024, "Read IO size");
+DEFINE_int32(writer_iosize, 4 * 1024, "File writer IO size");
+DEFINE_int32(writer_qdepth, 1, "File writer qdepth");
+DEFINE_bool(enable_pipelined_writes, false, "Enable async writes");
+DEFINE_string(cache_type, "block_cache",
+              "Cache type. (block_cache, volatile, tiered)");
+DEFINE_bool(benchmark, false, "Benchmark mode");
+DEFINE_int32(volatile_cache_pct, 10, "Percentage of cache in memory tier.");
+
+namespace rocksdb {
+
+std::unique_ptr<PersistentCacheTier> NewVolatileCache() {
+  assert(FLAGS_cache_size != std::numeric_limits<uint64_t>::max());
+  std::unique_ptr<PersistentCacheTier> pcache(
+      new VolatileCacheTier(FLAGS_cache_size));
+  return pcache;
+}
+
+std::unique_ptr<PersistentCacheTier> NewBlockCache() {
+  std::shared_ptr<Logger> log;
+  if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) {
+    fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str());
+    return nullptr;
+  }
+
+  PersistentCacheConfig opt(Env::Default(), FLAGS_path, FLAGS_cache_size, log);
+  opt.writer_dispatch_size = FLAGS_writer_iosize;
+  opt.writer_qdepth = FLAGS_writer_qdepth;
+  opt.pipeline_writes = FLAGS_enable_pipelined_writes;
+  opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
+  std::unique_ptr<PersistentCacheTier> cache(new BlockCacheTier(opt));
+  Status status = cache->Open();
+  assert(status.ok());
+  return cache;
+}
+
+// create a new cache tier
+// construct a tiered RAM+Block cache
+std::unique_ptr<PersistentTieredCache> NewTieredCache(
+    const size_t mem_size, const PersistentCacheConfig& opt) {
+  std::unique_ptr<PersistentTieredCache> tcache(new PersistentTieredCache());
+  // create primary tier
+  assert(mem_size);
+  auto pcache =
+      std::shared_ptr<PersistentCacheTier>(new VolatileCacheTier(mem_size));
+  tcache->AddTier(pcache);
+  // create secondary tier
+  auto scache = std::shared_ptr<PersistentCacheTier>(new BlockCacheTier(opt));
+  tcache->AddTier(scache);
+
+  Status s = tcache->Open();
+  assert(s.ok());
+  return tcache;
+}
+
+std::unique_ptr<PersistentTieredCache> NewTieredCache() {
+  std::shared_ptr<Logger> log;
+  if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) {
+    fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str());
+    abort();
+  }
+
+  auto pct = FLAGS_volatile_cache_pct / static_cast<double>(100);
+  PersistentCacheConfig opt(Env::Default(), FLAGS_path,
+                            (1 - pct) * FLAGS_cache_size, log);
+  opt.writer_dispatch_size = FLAGS_writer_iosize;
+  opt.writer_qdepth = FLAGS_writer_qdepth;
+  opt.pipeline_writes = FLAGS_enable_pipelined_writes;
+  opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
+  return NewTieredCache(FLAGS_cache_size * pct, opt);
+}
+
+//
+// Benchmark driver
+//
+class CacheTierBenchmark {
+ public:
+  explicit CacheTierBenchmark(std::shared_ptr<PersistentCacheTier>&& cache)
+      : cache_(cache) {
+    if (FLAGS_nthread_read) {
+      fprintf(stdout, "Pre-populating\n");
+      Prepop();
+      fprintf(stdout, "Pre-population completed\n");
+    }
+
+    stats_.Clear();
+
+    // Start IO threads
+    std::list<std::thread> threads;
+    Spawn(FLAGS_nthread_write, &threads,
+          std::bind(&CacheTierBenchmark::Write, this));
+    Spawn(FLAGS_nthread_read, &threads,
+          std::bind(&CacheTierBenchmark::Read, this));
+
+    // Wait till FLAGS_nsec and then signal to quit
+    StopWatchNano t(Env::Default(), /*auto_start=*/true);
+    size_t sec = t.ElapsedNanos() / 1000000000ULL;
+    while (!quit_) {
+      sec = t.ElapsedNanos() / 1000000000ULL;
+      quit_ = sec > size_t(FLAGS_nsec);
+      /* sleep override */ sleep(1);
+    }
+
+    // Wait for threads to exit
+    Join(&threads);
+    // Print stats
+    PrintStats(sec);
+    // Close the cache
+    cache_->TEST_Flush();
+    cache_->Close();
+  }
+
+ private:
+  void PrintStats(const size_t sec) {
+    std::ostringstream msg;
+    msg << "Test stats" << std::endl
+        << "* Elapsed: " << sec << " s" << std::endl
+        << "* Write Latency:" << std::endl
+        << stats_.write_latency_.ToString() << std::endl
+        << "* Read Latency:" << std::endl
+        << stats_.read_latency_.ToString() << std::endl
+        << "* Bytes written:" << std::endl
+        << stats_.bytes_written_.ToString() << std::endl
+        << "* Bytes read:" << std::endl
+        << stats_.bytes_read_.ToString() << std::endl
+        << "Cache stats:" << std::endl
+        << cache_->PrintStats() << std::endl;
+    fprintf(stderr, "%s\n", msg.str().c_str());
+  }
+
+  //
+  // Insert implementation and corresponding helper functions
+  //
+  void Prepop() {
+    for (uint64_t i = 0; i < 1024 * 1024; ++i) {
+      InsertKey(i);
+      insert_key_limit_++;
+      read_key_limit_++;
+    }
+
+    // Wait until data is flushed
+    cache_->TEST_Flush();
+    // warmup the cache
+    for (uint64_t i = 0; i < 1024 * 1024; ReadKey(i++)) {
+    }
+  }
+
+  void Write() {
+    while (!quit_) {
+      InsertKey(insert_key_limit_++);
+    }
+  }
+
+  void InsertKey(const uint64_t key) {
+    // construct key
+    uint64_t k[3];
+    Slice block_key = FillKey(k, key);
+
+    // construct value
+    auto block = NewBlock(key);
+
+    // insert
+    StopWatchNano timer(Env::Default(), /*auto_start=*/true);
+    while (true) {
+      Status status = cache_->Insert(block_key, block.get(), FLAGS_iosize);
+      if (status.ok()) {
+        break;
+      }
+
+      // transient error is possible if we run without pipelining
+      assert(!FLAGS_enable_pipelined_writes);
+    }
+
+    // adjust stats
+    const size_t elapsed_micro = timer.ElapsedNanos() / 1000;
+    stats_.write_latency_.Add(elapsed_micro);
+    stats_.bytes_written_.Add(FLAGS_iosize);
+  }
+
+  //
+  // Read implementation
+  //
+  void Read() {
+    while (!quit_) {
+      ReadKey(random() % read_key_limit_);
+    }
+  }
+
+  void ReadKey(const uint64_t val) {
+    // construct key
+    uint64_t k[3];
+    Slice key = FillKey(k, val);
+
+    // Lookup in cache
+    StopWatchNano timer(Env::Default(), /*auto_start=*/true);
+    std::unique_ptr<char[]> block;
+    uint64_t size;
+    Status status = cache_->Lookup(key, &block, &size);
+    if (!status.ok()) {
+      fprintf(stderr, "%s\n", status.ToString().c_str());
+    }
+    assert(status.ok());
+    assert(size == (uint64_t)FLAGS_iosize);
+
+    // adjust stats
+    const size_t elapsed_micro = timer.ElapsedNanos() / 1000;
+    stats_.read_latency_.Add(elapsed_micro);
+    stats_.bytes_read_.Add(FLAGS_iosize);
+
+    // verify content
+    if (!FLAGS_benchmark) {
+      auto expected_block = NewBlock(val);
+      assert(memcmp(block.get(), expected_block.get(), FLAGS_iosize) == 0);
+    }
+  }
+
+  // create data for a key by filling with a certain pattern
+  std::unique_ptr<char[]> NewBlock(const uint64_t val) {
+    std::unique_ptr<char[]> data(new char[FLAGS_iosize]);
+    memset(data.get(), val % 255, FLAGS_iosize);
+    return data;
+  }
+
+  // spawn threads
+  void Spawn(const size_t n, std::list<std::thread>* threads,
+             const std::function<void()>& fn) {
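+    // start n copies of fn; Join() below waits for all of them to exit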
+    for (size_t i = 0; i < n; ++i) {
+      threads->emplace_back(fn);
+    }
+  }
+
+  // join threads
+  void Join(std::list<std::thread>* threads) {
+    for (auto& th : *threads) {
+      th.join();
+    }
+  }
+
+  // construct key
+  Slice FillKey(uint64_t (&k)[3], const uint64_t val) {
+    k[0] = k[1] = 0;
+    k[2] = val;
+    void* p = static_cast<void*>(&k);
+    return Slice(static_cast<char*>(p), sizeof(k));
+  }
+
+  // benchmark stats
+  struct Stats {
+    void Clear() {
+      bytes_written_.Clear();
+      bytes_read_.Clear();
+      read_latency_.Clear();
+      write_latency_.Clear();
+    }
+
+    HistogramImpl bytes_written_;
+    HistogramImpl bytes_read_;
+    HistogramImpl read_latency_;
+    HistogramImpl write_latency_;
+  };
+
+  std::shared_ptr<PersistentCacheTier> cache_;  // cache implementation
+  std::atomic<uint64_t> insert_key_limit_{0};   // data inserted upto
+  std::atomic<uint64_t> read_key_limit_{0};     // data can be read safely upto
+  bool quit_ = false;                           // Quit thread ?
+  mutable Stats stats_;                         // Stats
+};
+
+}  // namespace rocksdb
+
+//
+// main
+//
+int main(int argc, char** argv) {
+  google::SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
+                          " [OPTIONS]...");
+  google::ParseCommandLineFlags(&argc, &argv, false);
+
+  std::ostringstream msg;
+  msg << "Config" << std::endl
+      << "======" << std::endl
+      << "* nsec=" << FLAGS_nsec << std::endl
+      << "* nthread_write=" << FLAGS_nthread_write << std::endl
+      << "* path=" << FLAGS_path << std::endl
+      << "* cache_size=" << FLAGS_cache_size << std::endl
+      << "* iosize=" << FLAGS_iosize << std::endl
+      << "* writer_iosize=" << FLAGS_writer_iosize << std::endl
+      << "* writer_qdepth=" << FLAGS_writer_qdepth << std::endl
+      << "* enable_pipelined_writes=" << FLAGS_enable_pipelined_writes
+      << std::endl
+      << "* cache_type=" << FLAGS_cache_type << std::endl
+      << "* benchmark=" << FLAGS_benchmark << std::endl
+      << "* volatile_cache_pct=" << FLAGS_volatile_cache_pct << std::endl;
+
+  fprintf(stderr, "%s\n", msg.str().c_str());
+
+  std::shared_ptr<rocksdb::PersistentCacheTier> cache;
+  if (FLAGS_cache_type == "block_cache") {
+    fprintf(stderr, "Using block cache implementation\n");
+    cache = rocksdb::NewBlockCache();
+  } else if (FLAGS_cache_type == "volatile") {
+    fprintf(stderr, "Using volatile cache implementation\n");
+    cache = rocksdb::NewVolatileCache();
+  } else if (FLAGS_cache_type == "tiered") {
+    fprintf(stderr, "Using tiered cache implementation\n");
+    cache = rocksdb::NewTieredCache();
+  } else {
+    fprintf(stderr, "Unknown option for cache\n");
+  }
+
+  assert(cache);
+  if (!cache) {
+    fprintf(stderr, "Error creating cache\n");
+    abort();
+  }
+
+  std::unique_ptr<rocksdb::CacheTierBenchmark> benchmark(
+      new rocksdb::CacheTierBenchmark(std::move(cache)));
+
+  return 0;
+}
+#else
+int main(int, char**) { return 0; }
+#endif
diff --git a/utilities/persistent_cache/persistent_cache_tier.h b/utilities/persistent_cache/persistent_cache_tier.h
index 2717742a2f2f519d96bdb1bf7a30dcc54ac68524..64ec3ae67c68e1990815422b2821e6776a93873f 100644
--- a/utilities/persistent_cache/persistent_cache_tier.h
+++ b/utilities/persistent_cache/persistent_cache_tier.h
@@ -177,7 +177,7 @@ struct PersistentCacheConfig {
   // parameter defines if pipelining is enabled or disabled
   //
   // default: true
-  bool pipeline_writes_ = true;
+  bool pipeline_writes = true;
 
   // max-write-pipeline-backlog-size
   //
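For reference, once the new target is built (make persistent_cache_bench), a
typical run might look like the following; the flag values here are
illustrative, not the defaults:

  ./persistent_cache_bench -cache_type=block_cache \
      -path=/tmp/microbench/blkcache -cache_size=1073741824 \
      -nsec=10 -nthread_write=1 -nthread_read=4 -benchmark=true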