diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index cee43dff643fad4ee323af86820138bc71a083b9..7721ee476a4de4ed36eae672bd818d2fb6520e6a 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -798,7 +798,6 @@ DEFINE_string(trace_file, "", "Trace workload to a file. ");
 
 DEFINE_int32(trace_replay_fast_forward, 1,
              "Fast forward trace replay, must >= 1. ");
-
 DEFINE_int32(block_cache_trace_sampling_frequency, 1,
              "Block cache trace sampling frequency, termed s. It uses spatial "
              "downsampling and samples accesses to one out of s blocks.");
@@ -809,6 +808,8 @@ DEFINE_int64(
     "will not be logged if the trace file size exceeds this threshold. Default "
     "is 64 GB.");
 DEFINE_string(block_cache_trace_file, "", "Block cache trace file path.");
+DEFINE_int32(trace_replay_threads, 1,
+             "The number of threads used to replay the trace, must >= 1.");
 
 static enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
   assert(ctype);
@@ -6529,7 +6530,8 @@ class Benchmark {
                       std::move(trace_reader));
     replayer.SetFastForward(
         static_cast<uint32_t>(FLAGS_trace_replay_fast_forward));
-    s = replayer.Replay();
+    s = replayer.MultiThreadReplay(
+        static_cast<uint32_t>(FLAGS_trace_replay_threads));
     if (s.ok()) {
       fprintf(stdout, "Replay started from trace_file: %s\n",
               FLAGS_trace_file.c_str());
diff --git a/trace_replay/trace_replay.cc b/trace_replay/trace_replay.cc
index b444ab371d93bef873ede9d1a54fc29eb3befc01..ef1ae1785a47499d28b7599d8f7c15641619c4d4 100644
--- a/trace_replay/trace_replay.cc
+++ b/trace_replay/trace_replay.cc
@@ -13,6 +13,7 @@
 #include "rocksdb/write_batch.h"
 #include "util/coding.h"
 #include "util/string_util.h"
+#include "util/threadpool_imp.h"
 
 namespace rocksdb {
 
@@ -173,6 +174,7 @@ Replayer::Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles,
     : trace_reader_(std::move(reader)) {
   assert(db != nullptr);
   db_ = static_cast<DBImpl*>(db->GetRootDB());
+  env_ = Env::Default();
   for (ColumnFamilyHandle* cfh : handles) {
     cf_map_[cfh->GetID()] = cfh;
   }
@@ -285,6 +287,79 @@ Status Replayer::Replay() {
   return s;
 }
 
+// The trace can be replayed with multiple threads by configuring the number
+// of threads in the thread pool. Trace records are read from the trace file
+// sequentially, and the corresponding queries are scheduled in the task queue
+// based on their timestamps. Currently, we support WriteBatch (Put, Delete,
+// SingleDelete, DeleteRange), Get, and Iterator (Seek and SeekForPrev).
+Status Replayer::MultiThreadReplay(uint32_t threads_num) {
+  Status s;
+  Trace header;
+  s = ReadHeader(&header);
+  if (!s.ok()) {
+    return s;
+  }
+
+  ThreadPoolImpl thread_pool;
+  thread_pool.SetHostEnv(env_);
+
+  if (threads_num > 1) {
+    thread_pool.SetBackgroundThreads(static_cast<int>(threads_num));
+  } else {
+    thread_pool.SetBackgroundThreads(1);
+  }
+
+  std::chrono::system_clock::time_point replay_epoch =
+      std::chrono::system_clock::now();
+  WriteOptions woptions;
+  ReadOptions roptions;
+  ReplayerWorkerArg* ra;
+  uint64_t ops = 0;
+  while (s.ok()) {
+    ra = new ReplayerWorkerArg;
+    ra->db = db_;
+    s = ReadTrace(&(ra->trace_entry));
+    if (!s.ok()) {
+      break;
+    }
+    ra->woptions = woptions;
+    ra->roptions = roptions;
+    ra->cf_map = &cf_map_;
+
+    std::this_thread::sleep_until(
+        replay_epoch + std::chrono::microseconds(
+                           (ra->trace_entry.ts - header.ts) / fast_forward_));
+    if (ra->trace_entry.type == kTraceWrite) {
+      thread_pool.Schedule(&Replayer::BGWorkWriteBatch, ra, nullptr, nullptr);
+      ops++;
+    } else if (ra->trace_entry.type == kTraceGet) {
+      thread_pool.Schedule(&Replayer::BGWorkGet, ra, nullptr, nullptr);
+      ops++;
+    } else if (ra->trace_entry.type == kTraceIteratorSeek) {
+      thread_pool.Schedule(&Replayer::BGWorkIterSeek, ra, nullptr, nullptr);
+      ops++;
+    } else if (ra->trace_entry.type == kTraceIteratorSeekForPrev) {
+      thread_pool.Schedule(&Replayer::BGWorkIterSeekForPrev, ra, nullptr,
+                           nullptr);
+      ops++;
+    } else if (ra->trace_entry.type == kTraceEnd) {
+      // Do nothing for now.
+      // TODO: Add some validations later.
+      delete ra;
+      break;
+    }
+  }
+
+  if (s.IsIncomplete()) {
+    // Reaching eof returns Incomplete status at the moment.
+    // Could happen when killing a process without calling EndTrace() API.
+    // TODO: Add better error handling.
+    s = Status::OK();
+  }
+  thread_pool.JoinAllThreads();
+  return s;
+}
+
 Status Replayer::ReadHeader(Trace* header) {
   assert(header != nullptr);
   Status s = ReadTrace(header);
@@ -325,4 +400,82 @@ Status Replayer::ReadTrace(Trace* trace) {
   return TracerHelper::DecodeTrace(encoded_trace, trace);
 }
 
+void Replayer::BGWorkGet(void* arg) {
+  std::unique_ptr<ReplayerWorkerArg> ra(
+      reinterpret_cast<ReplayerWorkerArg*>(arg));
+  auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
+      ra->cf_map);
+  uint32_t cf_id = 0;
+  Slice key;
+  DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
+  if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
+    return;
+  }
+
+  std::string value;
+  if (cf_id == 0) {
+    ra->db->Get(ra->roptions, key, &value);
+  } else {
+    ra->db->Get(ra->roptions, (*cf_map)[cf_id], key, &value);
+  }
+
+  return;
+}
+
+void Replayer::BGWorkWriteBatch(void* arg) {
+  std::unique_ptr<ReplayerWorkerArg> ra(
+      reinterpret_cast<ReplayerWorkerArg*>(arg));
+  WriteBatch batch(ra->trace_entry.payload);
+  ra->db->Write(ra->woptions, &batch);
+  return;
+}
+
+void Replayer::BGWorkIterSeek(void* arg) {
+  std::unique_ptr<ReplayerWorkerArg> ra(
+      reinterpret_cast<ReplayerWorkerArg*>(arg));
+  auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
+      ra->cf_map);
+  uint32_t cf_id = 0;
+  Slice key;
+  DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
+  if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
+    return;
+  }
+
+  std::string value;
+  Iterator* single_iter = nullptr;
+  if (cf_id == 0) {
+    single_iter = ra->db->NewIterator(ra->roptions);
+  } else {
+    single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
+  }
+  single_iter->Seek(key);
+  delete single_iter;
+  return;
+}
+
+void Replayer::BGWorkIterSeekForPrev(void* arg) {
+  std::unique_ptr<ReplayerWorkerArg> ra(
+      reinterpret_cast<ReplayerWorkerArg*>(arg));
+  auto cf_map = static_cast<std::unordered_map<uint32_t, ColumnFamilyHandle*>*>(
+      ra->cf_map);
+  uint32_t cf_id = 0;
+  Slice key;
+  DecodeCFAndKey(ra->trace_entry.payload, &cf_id, &key);
+  if (cf_id > 0 && cf_map->find(cf_id) == cf_map->end()) {
+    return;
+  }
+
+  std::string value;
+  Iterator* single_iter = nullptr;
+  if (cf_id == 0) {
+    single_iter = ra->db->NewIterator(ra->roptions);
+  } else {
+    single_iter = ra->db->NewIterator(ra->roptions, (*cf_map)[cf_id]);
+  }
+  single_iter->SeekForPrev(key);
+  delete single_iter;
+  return;
+}
+
 }  // namespace rocksdb
diff --git a/trace_replay/trace_replay.h b/trace_replay/trace_replay.h
index d6956317096d3940dcbfc6ab102254ae297cafcb..776a1e0ca1894fbaa30d8b25ba2e0a2c88a14849 100644
--- a/trace_replay/trace_replay.h
+++ b/trace_replay/trace_replay.h
@@ -137,6 +137,11 @@ class Replayer {
   // between the traces into consideration.
   Status Replay();
 
+  // Replay the provided trace stream, which is the same as Replay(), with
+  // multiple threads. Queries are scheduled in the thread pool job queue.
+  // The user can set the number of threads in the thread pool.
+  Status MultiThreadReplay(uint32_t threads_num);
+
   // Enables fast forwarding a replay by reducing the delay between the
   // ingested traces.
   // fast_forward : Rate of replay speedup.
@@ -149,10 +154,36 @@
   Status ReadFooter(Trace* footer);
   Status ReadTrace(Trace* trace);
 
+  // The background function for MultiThreadReplay to execute a Get query
+  // based on the trace record.
+  static void BGWorkGet(void* arg);
+
+  // The background function for MultiThreadReplay to execute a WriteBatch
+  // (Put, Delete, SingleDelete, DeleteRange) based on the trace record.
+  static void BGWorkWriteBatch(void* arg);
+
+  // The background function for MultiThreadReplay to execute an Iterator
+  // (Seek) based on the trace record.
+  static void BGWorkIterSeek(void* arg);
+
+  // The background function for MultiThreadReplay to execute an Iterator
+  // (SeekForPrev) based on the trace record.
+  static void BGWorkIterSeekForPrev(void* arg);
+
   DBImpl* db_;
+  Env* env_;
   std::unique_ptr<TraceReader> trace_reader_;
   std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_;
   uint32_t fast_forward_;
 };
 
+// The pass-in arg of MultiThreadReplay for each trace record.
+struct ReplayerWorkerArg {
+  DB* db;
+  Trace trace_entry;
+  std::unordered_map<uint32_t, ColumnFamilyHandle*>* cf_map;
+  WriteOptions woptions;
+  ReadOptions roptions;
+};
+
 }  // namespace rocksdb
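
// Usage sketch (not part of the patch): how a caller might drive the new
// MultiThreadReplay API, mirroring the db_bench hunk above. ReplayTraceFile
// is a hypothetical helper, assuming an already-open DB and its column family
// handles; NewFileTraceReader comes from rocksdb/trace_reader_writer.h.
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/trace_reader_writer.h"
#include "trace_replay/trace_replay.h"

rocksdb::Status ReplayTraceFile(
    rocksdb::DB* db, const std::vector<rocksdb::ColumnFamilyHandle*>& handles,
    const std::string& trace_file, uint32_t fast_forward, uint32_t threads) {
  // Open the trace file for sequential reading.
  std::unique_ptr<rocksdb::TraceReader> trace_reader;
  rocksdb::Status s = rocksdb::NewFileTraceReader(
      rocksdb::Env::Default(), rocksdb::EnvOptions(), trace_file,
      &trace_reader);
  if (!s.ok()) {
    return s;
  }
  rocksdb::Replayer replayer(db, handles, std::move(trace_reader));
  // fast_forward must be >= 1; it divides the inter-query delays.
  replayer.SetFastForward(fast_forward);
  // With threads == 1 this degenerates to a single background worker. With
  // more threads, cross-query ordering is only approximate: queries are
  // handed to the pool in timestamp order but complete independently.
  return replayer.MultiThreadReplay(threads);
}

// From db_bench, the equivalent path would be exercised with flags along the
// lines of: --benchmarks=replay --trace_file=<path> --trace_replay_threads=4
// --trace_replay_fast_forward=2 (assuming the existing replay benchmark).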