diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 7f70d1c8e863b9a08f8c7387da3647c9ea5c5f9b..018f3832a559380d1bc9d178b6b3bf1395ce7660 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -104,4 +104,11 @@ class EventListener { } // namespace rocksdb +#else + +namespace rocksdb { +class EventListener { +}; +} + #endif // ROCKSDB_LITE diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 10c36019eea96478d19e312114a2bd9396935317..66fe9e2b63e9bdfecfc6ee6972b6cd595796ccb5 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1018,11 +1018,9 @@ struct DBOptions { // Default: 0, turned off uint64_t wal_bytes_per_sync; -#ifndef ROCKSDB_LITE // A vector of EventListeners which call-back functions will be called // when specific RocksDB event happens. std::vector> listeners; -#endif // ROCKSDB_LITE // If true, then the status of the threads involved in this DB will // be tracked and available via GetThreadList() API. diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 9e1d7ea6fd412fa1b02f4e3b2386286edfd1e899..8fe18433d9f9bf461fcff39a9262b119267a7902 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -30,10 +30,13 @@ int main() { #define __STDC_FORMAT_MACROS #include -#include #include #include +#include +#include #include +#include + #include #include "db/db_impl.h" #include "db/version_set.h" @@ -777,6 +780,110 @@ struct ThreadState { : tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {} }; +class DbStressListener : public EventListener { + public: + DbStressListener( + const std::string& db_name, + const std::vector& db_paths, + const std::vector& cf_descs) : + db_name_(db_name), + db_paths_(db_paths), + cf_descs_(cf_descs), + rand_(301) {} + virtual ~DbStressListener() {} +#ifndef ROCKSDB_LITE + virtual void OnFlushCompleted( + DB* db, const std::string& column_family_name, + const std::string& file_path, + bool triggered_writes_slowdown, + bool triggered_writes_stop) override { + assert(db); + assert(db->GetName() == db_name_); + assert(IsValidColumnFamilyName(column_family_name)); + VerifyFilePath(file_path); + // pretending doing some work here + std::this_thread::sleep_for( + std::chrono::microseconds(rand_.Uniform(5000))); + } + + virtual void OnCompactionCompleted( + DB *db, const CompactionJobInfo& ci) { + assert(db); + assert(db->GetName() == db_name_); + assert(IsValidColumnFamilyName(ci.cf_name)); + assert(ci.input_files.size() + ci.output_files.size() > 0U); + for (const auto& file_path : ci.input_files) { + VerifyFilePath(file_path); + } + for (const auto& file_path : ci.output_files) { + VerifyFilePath(file_path); + } + // pretending doing some work here + std::this_thread::sleep_for( + std::chrono::microseconds(rand_.Uniform(5000))); + } + + protected: + bool IsValidColumnFamilyName(const std::string& cf_name) const { + if (cf_name == kDefaultColumnFamilyName) { + return true; + } + for (const auto& cf_desc : cf_descs_) { + if (cf_desc.name == cf_name) { + return true; + } + } + fprintf(stderr, + "Unable to find the matched column family name " + "for CF: %s. Existing CF names are:\n", + cf_name.c_str()); + for (const auto& cf_desc : cf_descs_) { + fprintf(stderr, " %s\n", cf_desc.name.c_str()); + } + fflush(stderr); + return false; + } + + void VerifyFileDir(const std::string& file_dir) { + if (db_name_ == file_dir) { + return; + } + for (const auto& db_path : db_paths_) { + if (db_path.path == file_dir) { + return; + } + } + assert(false); + } + + void VerifyFileName(const std::string& file_name) { + uint64_t file_number; + FileType file_type; + bool result = ParseFileName(file_name, &file_number, &file_type); + assert(result); + assert(file_type == kTableFile); + } + + void VerifyFilePath(const std::string& file_path) { + size_t pos = file_path.find_last_of("/"); + if (pos == std::string::npos) { + VerifyFileName(file_path); + } else { + if (pos > 0) { + VerifyFileDir(file_path.substr(0, pos)); + } + VerifyFileName(file_path.substr(pos)); + } + } +#endif // !ROCKSDB_LITE + + private: + std::string db_name_; + std::vector db_paths_; + std::vector cf_descs_; + Random rand_; +}; + } // namespace class StressTest { @@ -1913,6 +2020,9 @@ class StressTest { cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_)); column_family_names_.push_back(name); } + options_.listeners.clear(); + options_.listeners.emplace_back( + new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors)); options_.create_missing_column_families = true; s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors, &column_families_, &db_); diff --git a/util/options.cc b/util/options.cc index 279cd5a001d0a3739a9b27fe2223a51d35c0e44a..3e3650dd7b93324c4f05e25814e867c2f35b4616 100644 --- a/util/options.cc +++ b/util/options.cc @@ -70,15 +70,9 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options) options.level_compaction_dynamic_level_bytes), access_hint_on_compaction_start(options.access_hint_on_compaction_start), num_levels(options.num_levels), - optimize_filters_for_hits(options.optimize_filters_for_hits) -#ifndef ROCKSDB_LITE - , + optimize_filters_for_hits(options.optimize_filters_for_hits), listeners(options.listeners) { } -#else // ROCKSDB_LITE -{ -} -#endif // ROCKSDB_LITE ColumnFamilyOptions::ColumnFamilyOptions() : comparator(BytewiseComparator()), @@ -247,9 +241,7 @@ DBOptions::DBOptions() use_adaptive_mutex(false), bytes_per_sync(0), wal_bytes_per_sync(0), -#ifndef ROCKSDB_LITE listeners(), -#endif enable_thread_tracking(false) { } @@ -294,9 +286,7 @@ DBOptions::DBOptions(const Options& options) use_adaptive_mutex(options.use_adaptive_mutex), bytes_per_sync(options.bytes_per_sync), wal_bytes_per_sync(options.wal_bytes_per_sync), -#ifndef ROCKSDB_LITE listeners(options.listeners), -#endif enable_thread_tracking(options.enable_thread_tracking) {} static const char* const access_hints[] = {