提交 79dddaee 编写于 作者: A Andrew Kryczka 提交者: 奏之章

[cherry-pick] finish period_schedule cherry-pick, todo update unit test

上级 917c8a8b
......@@ -568,6 +568,7 @@ set(SOURCES
db/memtable_list.cc
db/merge_helper.cc
db/merge_operator.cc
db/periodic_work_scheduler.cc
db/range_del_aggregator.cc
db/range_tombstone_fragmenter.cc
db/repair.cc
......@@ -605,7 +606,6 @@ set(SOURCES
monitoring/perf_level.cc
monitoring/persistent_stats_history.cc
monitoring/statistics.cc
monitoring/stats_dump_scheduler.cc
monitoring/thread_status_impl.cc
monitoring/thread_status_updater.cc
monitoring/thread_status_util.cc
......@@ -922,7 +922,7 @@ IF(WITH_TESTS OR WITH_TOOLS)
add_library(testharness OBJECT util/testharness.cc)
ENDIF()
# db/periodic_work_scheduler_test.cc
if(WITH_TESTS)
set(TESTS
cache/cache_test.cc
......
......@@ -1965,7 +1965,7 @@ blob_file_garbage_test: $(OBJ_DIR)/db/blob/blob_file_garbage_test.o $(TEST_LIBRA
timer_test: $(OBJ_DIR)/util/timer_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
stats_dump_scheduler_test: $(OBJ_DIR)/monitoring/stats_dump_scheduler_test.o $(TEST_LIBRARY) $(LIBRARY)
periodic_work_scheduler_test: $(OBJ_DIR)/db/periodic_work_scheduler_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
testutil_test: $(OBJ_DIR)/test_util/testutil_test.o $(TEST_LIBRARY) $(LIBRARY)
......
......@@ -78,6 +78,297 @@ if sanitizer == "":
cpp_library(
name = "rocksdb_lib",
srcs = [
<<<<<<< HEAD
=======
"cache/cache.cc",
"cache/clock_cache.cc",
"cache/lru_cache.cc",
"cache/sharded_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/blob/blob_file_addition.cc",
"db/blob/blob_file_builder.cc",
"db/blob/blob_file_garbage.cc",
"db/blob/blob_file_meta.cc",
"db/blob/blob_log_format.cc",
"db/blob/blob_log_reader.cc",
"db/blob/blob_log_writer.cc",
"db/builder.cc",
"db/c.cc",
"db/column_family.cc",
"db/compacted_db_impl.cc",
"db/compaction/compaction.cc",
"db/compaction/compaction_iterator.cc",
"db/compaction/compaction_job.cc",
"db/compaction/compaction_picker.cc",
"db/compaction/compaction_picker_fifo.cc",
"db/compaction/compaction_picker_level.cc",
"db/compaction/compaction_picker_universal.cc",
"db/compaction/sst_partitioner.cc",
"db/convenience.cc",
"db/db_filesnapshot.cc",
"db/db_impl/db_impl.cc",
"db/db_impl/db_impl_compaction_flush.cc",
"db/db_impl/db_impl_debug.cc",
"db/db_impl/db_impl_experimental.cc",
"db/db_impl/db_impl_files.cc",
"db/db_impl/db_impl_open.cc",
"db/db_impl/db_impl_readonly.cc",
"db/db_impl/db_impl_secondary.cc",
"db/db_impl/db_impl_write.cc",
"db/db_info_dumper.cc",
"db/db_iter.cc",
"db/dbformat.cc",
"db/error_handler.cc",
"db/event_helpers.cc",
"db/experimental.cc",
"db/external_sst_file_ingestion_job.cc",
"db/file_indexer.cc",
"db/flush_job.cc",
"db/flush_scheduler.cc",
"db/forward_iterator.cc",
"db/import_column_family_job.cc",
"db/internal_stats.cc",
"db/log_reader.cc",
"db/log_writer.cc",
"db/logs_with_prep_tracker.cc",
"db/malloc_stats.cc",
"db/memtable.cc",
"db/memtable_list.cc",
"db/merge_helper.cc",
"db/merge_operator.cc",
"db/output_validator.cc",
"db/periodic_work_scheduler.cc",
"db/range_del_aggregator.cc",
"db/range_tombstone_fragmenter.cc",
"db/repair.cc",
"db/snapshot_impl.cc",
"db/table_cache.cc",
"db/table_properties_collector.cc",
"db/transaction_log_impl.cc",
"db/trim_history_scheduler.cc",
"db/version_builder.cc",
"db/version_edit.cc",
"db/version_edit_handler.cc",
"db/version_set.cc",
"db/wal_edit.cc",
"db/wal_manager.cc",
"db/write_batch.cc",
"db/write_batch_base.cc",
"db/write_controller.cc",
"db/write_thread.cc",
"env/env.cc",
"env/env_chroot.cc",
"env/env_encryption.cc",
"env/env_hdfs.cc",
"env/env_posix.cc",
"env/file_system.cc",
"env/file_system_tracer.cc",
"env/fs_posix.cc",
"env/io_posix.cc",
"env/mock_env.cc",
"file/delete_scheduler.cc",
"file/file_prefetch_buffer.cc",
"file/file_util.cc",
"file/filename.cc",
"file/random_access_file_reader.cc",
"file/read_write_util.cc",
"file/readahead_raf.cc",
"file/sequence_file_reader.cc",
"file/sst_file_manager_impl.cc",
"file/writable_file_writer.cc",
"logging/auto_roll_logger.cc",
"logging/event_logger.cc",
"logging/log_buffer.cc",
"memory/arena.cc",
"memory/concurrent_arena.cc",
"memory/jemalloc_nodump_allocator.cc",
"memory/memkind_kmem_allocator.cc",
"memtable/alloc_tracker.cc",
"memtable/hash_linklist_rep.cc",
"memtable/hash_skiplist_rep.cc",
"memtable/skiplistrep.cc",
"memtable/vectorrep.cc",
"memtable/write_buffer_manager.cc",
"monitoring/histogram.cc",
"monitoring/histogram_windowing.cc",
"monitoring/in_memory_stats_history.cc",
"monitoring/instrumented_mutex.cc",
"monitoring/iostats_context.cc",
"monitoring/perf_context.cc",
"monitoring/perf_level.cc",
"monitoring/persistent_stats_history.cc",
"monitoring/statistics.cc",
"monitoring/thread_status_impl.cc",
"monitoring/thread_status_updater.cc",
"monitoring/thread_status_updater_debug.cc",
"monitoring/thread_status_util.cc",
"monitoring/thread_status_util_debug.cc",
"options/cf_options.cc",
"options/configurable.cc",
"options/db_options.cc",
"options/options.cc",
"options/options_helper.cc",
"options/options_parser.cc",
"port/port_posix.cc",
"port/stack_trace.cc",
"port/win/env_default.cc",
"port/win/env_win.cc",
"port/win/io_win.cc",
"port/win/port_win.cc",
"port/win/win_logger.cc",
"port/win/win_thread.cc",
"table/adaptive/adaptive_table_factory.cc",
"table/block_based/binary_search_index_reader.cc",
"table/block_based/block.cc",
"table/block_based/block_based_filter_block.cc",
"table/block_based/block_based_table_builder.cc",
"table/block_based/block_based_table_factory.cc",
"table/block_based/block_based_table_iterator.cc",
"table/block_based/block_based_table_reader.cc",
"table/block_based/block_builder.cc",
"table/block_based/block_prefetcher.cc",
"table/block_based/block_prefix_index.cc",
"table/block_based/data_block_footer.cc",
"table/block_based/data_block_hash_index.cc",
"table/block_based/filter_block_reader_common.cc",
"table/block_based/filter_policy.cc",
"table/block_based/flush_block_policy.cc",
"table/block_based/full_filter_block.cc",
"table/block_based/hash_index_reader.cc",
"table/block_based/index_builder.cc",
"table/block_based/index_reader_common.cc",
"table/block_based/parsed_full_filter_block.cc",
"table/block_based/partitioned_filter_block.cc",
"table/block_based/partitioned_index_iterator.cc",
"table/block_based/partitioned_index_reader.cc",
"table/block_based/reader_common.cc",
"table/block_based/uncompression_dict_reader.cc",
"table/block_fetcher.cc",
"table/cuckoo/cuckoo_table_builder.cc",
"table/cuckoo/cuckoo_table_factory.cc",
"table/cuckoo/cuckoo_table_reader.cc",
"table/format.cc",
"table/get_context.cc",
"table/iterator.cc",
"table/merging_iterator.cc",
"table/meta_blocks.cc",
"table/persistent_cache_helper.cc",
"table/plain/plain_table_bloom.cc",
"table/plain/plain_table_builder.cc",
"table/plain/plain_table_factory.cc",
"table/plain/plain_table_index.cc",
"table/plain/plain_table_key_coding.cc",
"table/plain/plain_table_reader.cc",
"table/sst_file_dumper.cc",
"table/sst_file_reader.cc",
"table/sst_file_writer.cc",
"table/table_factory.cc",
"table/table_properties.cc",
"table/two_level_iterator.cc",
"test_util/sync_point.cc",
"test_util/sync_point_impl.cc",
"test_util/transaction_test_util.cc",
"tools/dump/db_dump_tool.cc",
"tools/io_tracer_parser_tool.cc",
"tools/ldb_cmd.cc",
"tools/ldb_tool.cc",
"tools/sst_dump_tool.cc",
"trace_replay/block_cache_tracer.cc",
"trace_replay/io_tracer.cc",
"trace_replay/trace_replay.cc",
"util/build_version.cc",
"util/coding.cc",
"util/compaction_job_stats_impl.cc",
"util/comparator.cc",
"util/compression_context_cache.cc",
"util/concurrent_task_limiter_impl.cc",
"util/crc32c.cc",
"util/dynamic_bloom.cc",
"util/file_checksum_helper.cc",
"util/hash.cc",
"util/murmurhash.cc",
"util/random.cc",
"util/rate_limiter.cc",
"util/slice.cc",
"util/status.cc",
"util/string_util.cc",
"util/thread_local.cc",
"util/threadpool_imp.cc",
"util/xxhash.cc",
"utilities/backupable/backupable_db.cc",
"utilities/blob_db/blob_compaction_filter.cc",
"utilities/blob_db/blob_db.cc",
"utilities/blob_db/blob_db_impl.cc",
"utilities/blob_db/blob_db_impl_filesnapshot.cc",
"utilities/blob_db/blob_dump_tool.cc",
"utilities/blob_db/blob_file.cc",
"utilities/cassandra/cassandra_compaction_filter.cc",
"utilities/cassandra/format.cc",
"utilities/cassandra/merge_operator.cc",
"utilities/checkpoint/checkpoint_impl.cc",
"utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
"utilities/convenience/info_log_finder.cc",
"utilities/debug.cc",
"utilities/env_mirror.cc",
"utilities/env_timed.cc",
"utilities/fault_injection_env.cc",
"utilities/fault_injection_fs.cc",
"utilities/leveldb_options/leveldb_options.cc",
"utilities/memory/memory_util.cc",
"utilities/merge_operators/bytesxor.cc",
"utilities/merge_operators/max.cc",
"utilities/merge_operators/put.cc",
"utilities/merge_operators/sortlist.cc",
"utilities/merge_operators/string_append/stringappend.cc",
"utilities/merge_operators/string_append/stringappend2.cc",
"utilities/merge_operators/uint64add.cc",
"utilities/object_registry.cc",
"utilities/option_change_migration/option_change_migration.cc",
"utilities/options/options_util.cc",
"utilities/persistent_cache/block_cache_tier.cc",
"utilities/persistent_cache/block_cache_tier_file.cc",
"utilities/persistent_cache/block_cache_tier_metadata.cc",
"utilities/persistent_cache/persistent_cache_tier.cc",
"utilities/persistent_cache/volatile_tier_impl.cc",
"utilities/simulator_cache/cache_simulator.cc",
"utilities/simulator_cache/sim_cache.cc",
"utilities/table_properties_collectors/compact_on_deletion_collector.cc",
"utilities/trace/file_trace_reader_writer.cc",
"utilities/transactions/lock/lock_tracker.cc",
"utilities/transactions/lock/point_lock_tracker.cc",
"utilities/transactions/optimistic_transaction.cc",
"utilities/transactions/optimistic_transaction_db_impl.cc",
"utilities/transactions/pessimistic_transaction.cc",
"utilities/transactions/pessimistic_transaction_db.cc",
"utilities/transactions/snapshot_checker.cc",
"utilities/transactions/transaction_base.cc",
"utilities/transactions/transaction_db_mutex_impl.cc",
"utilities/transactions/transaction_lock_mgr.cc",
"utilities/transactions/transaction_util.cc",
"utilities/transactions/write_prepared_txn.cc",
"utilities/transactions/write_prepared_txn_db.cc",
"utilities/transactions/write_unprepared_txn.cc",
"utilities/transactions/write_unprepared_txn_db.cc",
"utilities/ttl/db_ttl_impl.cc",
"utilities/write_batch_with_index/write_batch_with_index.cc",
"utilities/write_batch_with_index/write_batch_with_index_internal.cc",
],
auto_headers = AutoHeaders.RECURSIVE_GLOB,
arch_preprocessor_flags = ROCKSDB_ARCH_PREPROCESSOR_FLAGS,
compiler_flags = ROCKSDB_COMPILER_FLAGS,
os_deps = ROCKSDB_OS_DEPS,
os_preprocessor_flags = ROCKSDB_OS_PREPROCESSOR_FLAGS,
preprocessor_flags = ROCKSDB_PREPROCESSOR_FLAGS,
deps = [],
external_deps = ROCKSDB_EXTERNAL_DEPS,
link_whole = False,
)
cpp_library(
name = "rocksdb_whole_archive_lib",
srcs = [
"cache/cache.cc",
>>>>>>> 1e0090973... Periodically flush info log out of application buffer (#7488)
"cache/clock_cache.cc",
"cache/lru_cache.cc",
"cache/sharded_cache.cc",
......@@ -122,6 +413,11 @@ cpp_library(
"db/memtable_list.cc",
"db/merge_helper.cc",
"db/merge_operator.cc",
<<<<<<< HEAD
=======
"db/output_validator.cc",
"db/periodic_work_scheduler.cc",
>>>>>>> 1e0090973... Periodically flush info log out of application buffer (#7488)
"db/range_del_aggregator.cc",
"db/range_tombstone_fragmenter.cc",
"db/repair.cc",
......@@ -160,7 +456,6 @@ cpp_library(
"monitoring/perf_level.cc",
"monitoring/persistent_stats_history.cc",
"monitoring/statistics.cc",
"monitoring/stats_dump_scheduler.cc",
"monitoring/thread_status_impl.cc",
"monitoring/thread_status_updater.cc",
"monitoring/thread_status_updater_debug.cc",
......@@ -905,6 +1200,13 @@ ROCKS_TESTS = [
"db/perf_context_test.cc",
"serial",
],
[
"periodic_work_scheduler_test",
"db/periodic_work_scheduler_test.cc",
"serial",
[],
[],
],
[
"persistent_cache_test",
"utilities/persistent_cache/persistent_cache_test.cc",
......@@ -987,13 +1289,6 @@ ROCKS_TESTS = [
[],
[],
],
[
"stats_dump_scheduler_test",
"monitoring/stats_dump_scheduler_test.cc",
"serial",
[],
[],
],
[
"stats_history_test",
"monitoring/stats_history_test.cc",
......
......@@ -208,7 +208,7 @@ Status CompactedDBImpl::Open(const Options& options, const std::string& dbname,
std::unique_ptr<CompactedDBImpl> db(new CompactedDBImpl(db_options, dbname));
Status s = db->Init(options);
if (s.ok()) {
db->StartStatsDumpScheduler();
db->StartPeriodicWorkScheduler();
ROCKS_LOG_INFO(db->immutable_db_options_.info_log,
"Opened the db as fully compacted mode");
LogFlush(db->immutable_db_options_.info_log);
......
......@@ -46,6 +46,7 @@
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/periodic_work_scheduler.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
......@@ -58,7 +59,6 @@
#include "monitoring/in_memory_stats_history.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/stats_dump_scheduler.h"
#include "monitoring/persistent_stats_history.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
......@@ -262,7 +262,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
refitting_level_(false),
opened_successfully_(false),
#ifndef ROCKSDB_LITE
stats_dump_scheduler_(nullptr),
periodic_work_scheduler_(nullptr),
#endif // ROCKSDB_LITE
two_write_queues_(options.two_write_queues),
manual_wal_flush_(options.manual_wal_flush),
......@@ -513,8 +513,8 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
"Shutdown: canceling all background work");
#ifndef ROCKSDB_LITE
if (stats_dump_scheduler_ != nullptr) {
stats_dump_scheduler_->Unregister(this);
if (periodic_work_scheduler_ != nullptr) {
periodic_work_scheduler_->Unregister(this);
}
#endif // !ROCKSDB_LITE
......@@ -773,18 +773,18 @@ void DBImpl::PrintStatistics() {
}
}
void DBImpl::StartStatsDumpScheduler() {
void DBImpl::StartPeriodicWorkScheduler() {
#ifndef ROCKSDB_LITE
{
InstrumentedMutexLock l(&mutex_);
stats_dump_scheduler_ = StatsDumpScheduler::Default();
TEST_SYNC_POINT_CALLBACK("DBImpl::StartStatsDumpScheduler:Init",
&stats_dump_scheduler_);
periodic_work_scheduler_ = PeriodicWorkScheduler::Default();
TEST_SYNC_POINT_CALLBACK("DBImpl::StartPeriodicWorkScheduler:Init",
&periodic_work_scheduler_);
}
stats_dump_scheduler_->Register(this,
mutable_db_options_.stats_dump_period_sec,
mutable_db_options_.stats_persist_period_sec);
periodic_work_scheduler_->Register(
this, mutable_db_options_.stats_dump_period_sec,
mutable_db_options_.stats_persist_period_sec);
#endif // !ROCKSDB_LITE
}
......@@ -1045,6 +1045,13 @@ void DBImpl::ScheduleBgFree(JobContext* job_context, SuperVersion* sv) {
SchedulePurge();
}
}
void DBImpl::FlushInfoLog() {
if (shutdown_initiated_) {
return;
}
TEST_SYNC_POINT("DBImpl::FlushInfoLog:StartRunning");
LogFlush(immutable_db_options_.info_log);
}
Directory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const {
assert(cfd);
......@@ -1170,19 +1177,12 @@ Status DBImpl::SetDBOptions(
mutable_db_options_.stats_dump_period_sec ||
new_options.stats_persist_period_sec !=
mutable_db_options_.stats_persist_period_sec) {
if (stats_dump_scheduler_) {
mutex_.Unlock();
stats_dump_scheduler_->Unregister(this);
mutex_.Lock();
}
}
if (new_options.stats_persist_period_sec !=
mutable_db_options_.stats_persist_period_sec) {
mutex_.Unlock();
stats_dump_scheduler_->Register(this,
new_options.stats_dump_period_sec,
new_options.stats_persist_period_sec);
mutex_.Lock();
mutex_.Unlock();
periodic_work_scheduler_->Unregister(this);
periodic_work_scheduler_->Register(
this, new_options.stats_dump_period_sec,
new_options.stats_persist_period_sec);
mutex_.Lock();
}
write_controller_.set_max_delayed_write_rate(
new_options.delayed_write_rate);
......
......@@ -69,9 +69,9 @@ class ArenaWrappedDBIter;
class InMemoryStatsHistoryIterator;
class MemTable;
class PersistentStatsHistoryIterator;
class StatsDumpScheduler;
class PeriodicWorkScheduler;
#ifndef NDEBUG
class StatsDumpTestScheduler;
class PeriodicWorkTestScheduler;
#endif // !NDEBUG
class TableCache;
......@@ -398,14 +398,6 @@ class DBImpl : public DB {
LogsWithPrepTracker* logs_with_prep_tracker() {
return &logs_with_prep_tracker_;
}
// persist stats to column family "_persistent_stats"
void PersistStats();
// dump rocksdb.stats to LOG
void DumpStats();
//
void ScheduleGCTTL();
#ifndef NDEBUG
// Extra methods (for testing) that are not in the public DB interface
......@@ -790,10 +782,21 @@ class DBImpl : public DB {
VersionSet* TEST_GetVersionSet() const { return versions_.get(); }
#ifndef ROCKSDB_LITE
StatsDumpTestScheduler* TEST_GetStatsDumpScheduler() const;
PeriodicWorkTestScheduler* TEST_GetPeriodicWorkScheduler() const;
#endif // !ROCKSDB_LITE
#endif // NDEBUG
// persist stats to column family "_persistent_stats"
void PersistStats();
// dump rocksdb.stats to LOG
void DumpStats();
// flush LOG out of application buffer
void FlushInfoLog();
void ScheduleGCTTL();
protected:
Env* const env_;
const std::string dbname_;
......@@ -1239,7 +1242,7 @@ class DBImpl : public DB {
bool* sfm_bookkeeping, LogBuffer* log_buffer);
// Schedule background tasks
void StartStatsDumpScheduler();
void StartPeriodicWorkScheduler();
void PrintStatistics();
......@@ -1733,10 +1736,11 @@ class DBImpl : public DB {
size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
Env::WriteLifeTimeHint CalculateWALWriteHint() { return Env::WLTH_SHORT; }
#ifndef ROCKSDB_LITE
// Scheduler to run DumpStats() and PersistStats(). Currently, it always use
// a global instance from StatsDumpScheduler::Default(). Only in unittest, it
// can be overrided by StatsDumpTestSchduler.
StatsDumpScheduler* stats_dump_scheduler_;
// Scheduler to run DumpStats(), PersistStats(), and FlushInfoLog().
// Currently, it always use a global instance from
// PeriodicWorkScheduler::Default(). Only in unittest, it can be overrided by
// PeriodicWorkTestScheduler.
PeriodicWorkScheduler* periodic_work_scheduler_;
#endif
// When set, we use a separate queue for writes that dont write to memtable.
......
......@@ -11,7 +11,7 @@
#include "db/db_impl.h"
#include "db/error_handler.h"
#include "monitoring/stats_dump_scheduler.h"
#include "db/periodic_work_scheduler.h"
#include "monitoring/thread_status_updater.h"
#include "rocksdb/terark_namespace.h"
......@@ -254,14 +254,14 @@ size_t DBImpl::TEST_GetWalPreallocateBlockSize(
#ifndef ROCKSDB_LITE
void DBImpl::TEST_WaitForStatsDumpRun(std::function<void()> callback) const {
if (stats_dump_scheduler_ != nullptr) {
static_cast<StatsDumpTestScheduler*>(stats_dump_scheduler_)
if (periodic_work_scheduler_ != nullptr) {
static_cast<PeriodicWorkTestScheduler*>(periodic_work_scheduler_)
->TEST_WaitForRun(callback);
}
}
StatsDumpTestScheduler* DBImpl::TEST_GetStatsDumpScheduler() const {
return static_cast<StatsDumpTestScheduler*>(stats_dump_scheduler_);
PeriodicWorkTestScheduler* DBImpl::TEST_GetPeriodicWorkScheduler() const {
return static_cast<PeriodicWorkTestScheduler*>(periodic_work_scheduler_);
}
#endif // !ROCKSDB_LITE
......
......@@ -2,11 +2,10 @@
// This source code is licensed under Apache 2.0 License.
#include "util/testharness.h"
#include "util/testutil.h"
#include "db/db_test_util.h"
#include "util/string_util.h"
#include "db/db_impl.h"
#include "util/sync_point.h"
#include "db/db_test_util.h"
namespace rocksdb {
......@@ -67,7 +66,7 @@ TEST_F(DBImplGCTTL_Test, L0FileExpiredTest) {
}
dbfull()->Flush(FlushOptions());
}
// dbfull()->StartPeriodicWorkScheduler();
ASSERT_EQ(10, dbfull()->GetDBOptions().stats_dump_period_sec);
dbfull()->TEST_WaitForStatsDumpRun([&] { mock_env->set_current_time(ttl); });
ASSERT_TRUE(flag);
......
......@@ -16,6 +16,7 @@
#include "db/builder.h"
#include "db/error_handler.h"
#include "db/map_builder.h"
#include "monitoring/persistent_stats_history.h"
#include "options/options_helper.h"
#include "rocksdb/wal_filter.h"
......@@ -1513,7 +1514,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
}
if (s.ok()) {
impl->StartStatsDumpScheduler();
impl->StartPeriodicWorkScheduler();
} else {
for (auto* h : *handles) {
delete h;
......
......@@ -39,6 +39,7 @@
#include "rocksdb/sst_file_writer.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table.h"
#include "rocksdb/terark_namespace.h"
#include "rocksdb/utilities/checkpoint.h"
#include "table/block_based_table_factory.h"
#include "table/mock_table.h"
......@@ -48,14 +49,12 @@
#include "util/filename.h"
#include "util/mock_time_env.h"
#include "util/mutexlock.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "utilities/merge_operators.h"
#include "rocksdb/terark_namespace.h"
namespace TERARKDB_NAMESPACE {
namespace anon {
......@@ -290,9 +289,7 @@ class SpecialEnv : public EnvWrapper {
Env::IOPriority GetIOPriority() override {
return base_->GetIOPriority();
}
bool use_direct_io() const override {
return base_->use_direct_io();
}
bool use_direct_io() const override { return base_->use_direct_io(); }
Status Allocate(uint64_t offset, uint64_t len) override {
return base_->Allocate(offset, len);
}
......@@ -981,28 +978,4 @@ class DBTestBase : public testing::Test {
}
};
class SafeMockTimeEnv : public MockTimeEnv {
public:
explicit SafeMockTimeEnv(Env* base) : MockTimeEnv(base) {
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
#if defined(OS_MACOSX) && !defined(NDEBUG)
// This is an alternate way (vs. SpecialEnv) of dealing with the fact
// that on some platforms, pthread_cond_timedwait does not appear to
// release the lock for other threads to operate if the deadline time
// is already passed. (TimedWait calls are currently a bad abstraction
// because the deadline parameter is usually computed from Env time,
// but is interpreted in real clock time.)
SyncPoint::GetInstance()->SetCallBack(
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
if (time_us < this->RealNowMicros()) {
*reinterpret_cast<uint64_t*>(arg) = this->RealNowMicros() + 1000;
}
});
#endif // OS_MACOSX && !NDEBUG
SyncPoint::GetInstance()->EnableProcessing();
}
};
} // namespace TERARKDB_NAMESPACE
......@@ -3,7 +3,7 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "monitoring/stats_dump_scheduler.h"
#include "db/periodic_work_scheduler.h"
#include "db/db_impl.h"
#include "util/cast_util.h"
......@@ -11,16 +11,16 @@
#ifndef ROCKSDB_LITE
namespace rocksdb {
StatsDumpScheduler::StatsDumpScheduler(Env* env) {
PeriodicWorkScheduler::PeriodicWorkScheduler(Env* env) {
timer = std::unique_ptr<Timer>(new Timer(env));
}
void StatsDumpScheduler::Register(DBImpl* dbi,
unsigned int stats_dump_period_sec,
unsigned int stats_persist_period_sec) {
void PeriodicWorkScheduler::Register(DBImpl* dbi,
unsigned int stats_dump_period_sec,
unsigned int stats_persist_period_sec) {
static std::atomic<uint64_t> initial_delay(0);
timer->Start();
if (stats_dump_period_sec > 0) {
timer->Start();
timer->Add([dbi]() { dbi->DumpStats(); }, GetTaskName(dbi, "dump_st"),
initial_delay.fetch_add(1) %
static_cast<uint64_t>(stats_dump_period_sec) *
......@@ -28,33 +28,38 @@ void StatsDumpScheduler::Register(DBImpl* dbi,
static_cast<uint64_t>(stats_dump_period_sec) * kMicrosInSecond);
}
if (stats_persist_period_sec > 0) {
timer->Start();
timer->Add(
[dbi]() { dbi->PersistStats(); }, GetTaskName(dbi, "pst_st"),
initial_delay.fetch_add(1) %
static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond,
static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond);
}
timer->Add([dbi]() { dbi->FlushInfoLog(); },
GetTaskName(dbi, "flush_info_log"),
initial_delay.fetch_add(1) % kDefaultFlushInfoLogPeriodSec *
kMicrosInSecond,
kDefaultFlushInfoLogPeriodSec * kMicrosInSecond);
}
void StatsDumpScheduler::Unregister(DBImpl* dbi) {
void PeriodicWorkScheduler::Unregister(DBImpl* dbi) {
timer->Cancel(GetTaskName(dbi, "dump_st"));
timer->Cancel(GetTaskName(dbi, "pst_st"));
timer->Cancel(GetTaskName(dbi, "flush_info_log"));
if (!timer->HasPendingTask()) {
timer->Shutdown();
}
}
StatsDumpScheduler* StatsDumpScheduler::Default() {
PeriodicWorkScheduler* PeriodicWorkScheduler::Default() {
// Always use the default Env for the scheduler, as we only use the NowMicros
// which is the same for all env.
// The Env could only be overridden in test.
static StatsDumpScheduler scheduler(Env::Default());
static PeriodicWorkScheduler scheduler(Env::Default());
return &scheduler;
}
std::string StatsDumpScheduler::GetTaskName(DBImpl* dbi,
const std::string& func_name) {
std::string PeriodicWorkScheduler::GetTaskName(DBImpl* dbi,
const std::string& func_name) {
std::string db_session_id;
dbi->GetDbSessionId(db_session_id);
return db_session_id + ":" + func_name;
......@@ -66,8 +71,8 @@ std::string StatsDumpScheduler::GetTaskName(DBImpl* dbi,
// timer, so only re-create it when there's no running task. Otherwise, return
// the existing scheduler. Which means if the unittest needs to update MockEnv,
// Close all db instances and then re-open them.
StatsDumpTestScheduler* StatsDumpTestScheduler::Default(Env* env) {
static StatsDumpTestScheduler scheduler(env);
PeriodicWorkTestScheduler* PeriodicWorkTestScheduler::Default(Env* env) {
static PeriodicWorkTestScheduler scheduler(env);
static port::Mutex mutex;
{
MutexLock l(&mutex);
......@@ -80,22 +85,22 @@ StatsDumpTestScheduler* StatsDumpTestScheduler::Default(Env* env) {
return &scheduler;
}
void StatsDumpTestScheduler::TEST_WaitForRun(
void PeriodicWorkTestScheduler::TEST_WaitForRun(
std::function<void()> callback) const {
if (timer != nullptr) {
timer->TEST_WaitForRun(callback);
}
}
size_t StatsDumpTestScheduler::TEST_GetValidTaskNum() const {
size_t PeriodicWorkTestScheduler::TEST_GetValidTaskNum() const {
if (timer != nullptr) {
return timer->TEST_GetPendingTaskNum();
}
return 0;
}
StatsDumpTestScheduler::StatsDumpTestScheduler(Env* env)
: StatsDumpScheduler(env) {}
PeriodicWorkTestScheduler::PeriodicWorkTestScheduler(Env* env)
: PeriodicWorkScheduler(env) {}
#endif // !NDEBUG
} // namespace ROCKSDB_NAMESPACE
......
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include "db/db_impl.h"
#include "util/timer.h"
namespace rocksdb {
// PeriodicWorkScheduler is a singleton object, which is scheduling/running
// DumpStats(), PersistStats(), and FlushInfoLog() for all DB instances. All DB
// instances use the same object from `Default()`.
//
// Internally, it uses a single threaded timer object to run the periodic work
// functions. Timer thread will always be started since the info log flushing
// cannot be disabled.
class PeriodicWorkScheduler {
public:
static PeriodicWorkScheduler* Default();
PeriodicWorkScheduler() = delete;
PeriodicWorkScheduler(const PeriodicWorkScheduler&) = delete;
PeriodicWorkScheduler(PeriodicWorkScheduler&&) = delete;
PeriodicWorkScheduler& operator=(const PeriodicWorkScheduler&) = delete;
PeriodicWorkScheduler& operator=(PeriodicWorkScheduler&&) = delete;
void Register(DBImpl* dbi, unsigned int stats_dump_period_sec,
unsigned int stats_persist_period_sec);
void Unregister(DBImpl* dbi);
// Periodically flush info log out of application buffer at a low frequency.
// This improves debuggability in case of RocksDB hanging since it ensures the
// log messages leading up to the hang will eventually become visible in the
// log.
static const uint64_t kDefaultFlushInfoLogPeriodSec = 10;
protected:
std::unique_ptr<Timer> timer;
explicit PeriodicWorkScheduler(Env* env);
private:
std::string GetTaskName(DBImpl* dbi, const std::string& func_name);
};
#ifndef NDEBUG
// PeriodicWorkTestScheduler is for unittest, which can specify the Env like
// SafeMockTimeEnv. It also contains functions for unittest.
class PeriodicWorkTestScheduler : public PeriodicWorkScheduler {
public:
static PeriodicWorkTestScheduler* Default(Env* env);
void TEST_WaitForRun(std::function<void()> callback) const;
size_t TEST_GetValidTaskNum() const;
private:
explicit PeriodicWorkTestScheduler(Env* env);
};
#endif // !NDEBUG
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE
......@@ -3,35 +3,36 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "monitoring/stats_dump_scheduler.h"
#include "db/periodic_work_scheduler.h"
#include "db/db_test_util.h"
namespace ROCKSDB_NAMESPACE {
namespace rocksdb {
#ifndef ROCKSDB_LITE
class StatsDumpSchedulerTest : public DBTestBase {
class PeriodicWorkSchedulerTest : public DBTestBase {
public:
StatsDumpSchedulerTest()
: DBTestBase("/stats_dump_scheduler_test"),
mock_env_(new SafeMockTimeEnv(Env::Default())) {}
PeriodicWorkSchedulerTest()
: DBTestBase("/periodic_work_scheduler_test"),
mock_env_(new MockTimeEnv(Env::Default())) {}
protected:
std::unique_ptr<SafeMockTimeEnv> mock_env_;
void SetUp() override {
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::StartStatsDumpScheduler:Init", [&](void* arg) {
auto* stats_dump_scheduler_ptr =
reinterpret_cast<StatsDumpScheduler**>(arg);
*stats_dump_scheduler_ptr =
StatsDumpTestScheduler::Default(mock_env_.get());
"DBImpl::StartPeriodicWorkScheduler:Init", [&](void* arg) {
auto* periodic_work_scheduler_ptr =
reinterpret_cast<PeriodicWorkScheduler**>(arg);
*periodic_work_scheduler_ptr =
PeriodicWorkTestScheduler::Default(mock_env_.get());
});
}
};
TEST_F(StatsDumpSchedulerTest, Basic) {
constexpr int kPeriodSec = 5;
TEST_F(PeriodicWorkSchedulerTest, Basic) {
constexpr unsigned int kPeriodSec =
PeriodicWorkScheduler::kDefaultFlushInfoLogPeriodSec;
Close();
Options options;
options.stats_dump_period_sec = kPeriodSec;
......@@ -48,37 +49,46 @@ TEST_F(StatsDumpSchedulerTest, Basic) {
int pst_st_counter = 0;
SyncPoint::GetInstance()->SetCallBack("DBImpl::PersistStats:StartRunning",
[&](void*) { pst_st_counter++; });
int flush_info_log_counter = 0;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushInfoLog:StartRunning",
[&](void*) { flush_info_log_counter++; });
SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec);
ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_persist_period_sec);
ASSERT_EQ(kPeriodSec, dbfull()->GetDBOptions().stats_dump_period_sec);
ASSERT_EQ(kPeriodSec, dbfull()->GetDBOptions().stats_persist_period_sec);
mock_time_sec += kPeriodSec - 1;
dbfull()->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_GT(kPeriodSec, 1u);
dbfull()->TEST_WaitForStatsDumpRun([&] {
mock_env_->MockSleepForSeconds(static_cast<int>(kPeriodSec) - 1);
});
auto scheduler = dbfull()->TEST_GetStatsDumpScheduler();
auto scheduler = dbfull()->TEST_GetPeriodicWorkScheduler();
ASSERT_NE(nullptr, scheduler);
ASSERT_EQ(2, scheduler->TEST_GetValidTaskNum());
ASSERT_EQ(3, scheduler->TEST_GetValidTaskNum());
ASSERT_EQ(1, dump_st_counter);
ASSERT_EQ(1, pst_st_counter);
ASSERT_EQ(1, flush_info_log_counter);
mock_time_sec += kPeriodSec;
dbfull()->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
[&] { mock_env_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
ASSERT_EQ(2, dump_st_counter);
ASSERT_EQ(2, pst_st_counter);
ASSERT_EQ(2, flush_info_log_counter);
mock_time_sec += kPeriodSec;
dbfull()->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
[&] { mock_env_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
ASSERT_EQ(3, dump_st_counter);
ASSERT_EQ(3, pst_st_counter);
ASSERT_EQ(3, flush_info_log_counter);
// Disable scheduler with SetOption
ASSERT_OK(dbfull()->SetDBOptions(
......@@ -86,28 +96,34 @@ TEST_F(StatsDumpSchedulerTest, Basic) {
ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_dump_period_sec);
ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec);
scheduler = dbfull()->TEST_GetStatsDumpScheduler();
ASSERT_EQ(0u, scheduler->TEST_GetValidTaskNum());
// Info log flush should still run.
dbfull()->TEST_WaitForStatsDumpRun(
[&] { mock_env_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
ASSERT_EQ(3, dump_st_counter);
ASSERT_EQ(3, pst_st_counter);
ASSERT_EQ(4, flush_info_log_counter);
scheduler = dbfull()->TEST_GetPeriodicWorkScheduler();
ASSERT_EQ(1u, scheduler->TEST_GetValidTaskNum());
// Re-enable one task
ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "5"}}));
ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec);
ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec);
scheduler = dbfull()->TEST_GetStatsDumpScheduler();
scheduler = dbfull()->TEST_GetPeriodicWorkScheduler();
ASSERT_NE(nullptr, scheduler);
ASSERT_EQ(1, scheduler->TEST_GetValidTaskNum());
dump_st_counter = 0;
mock_time_sec += kPeriodSec;
ASSERT_EQ(2, scheduler->TEST_GetValidTaskNum());
dbfull()->TEST_WaitForStatsDumpRun(
[&] { mock_env_->set_current_time(mock_time_sec); });
ASSERT_EQ(1, dump_st_counter);
[&] { mock_env_->MockSleepForSeconds(static_cast<int>(kPeriodSec)); });
ASSERT_EQ(4, dump_st_counter);
ASSERT_EQ(3, pst_st_counter);
ASSERT_EQ(5, flush_info_log_counter);
Close();
}
TEST_F(StatsDumpSchedulerTest, MultiInstances) {
TEST_F(PeriodicWorkSchedulerTest, MultiInstances) {
constexpr int kPeriodSec = 5;
const int kInstanceNum = 10;
......@@ -136,8 +152,8 @@ TEST_F(StatsDumpSchedulerTest, MultiInstances) {
}
auto dbi = static_cast_with_check<DBImpl>(dbs[kInstanceNum - 1]);
auto scheduler = dbi->TEST_GetStatsDumpScheduler();
ASSERT_EQ(kInstanceNum * 2, scheduler->TEST_GetValidTaskNum());
auto scheduler = dbi->TEST_GetPeriodicWorkScheduler();
ASSERT_EQ(kInstanceNum * 3, scheduler->TEST_GetValidTaskNum());
int expected_run = kInstanceNum;
mock_time_sec += kPeriodSec - 1;
......@@ -182,7 +198,7 @@ TEST_F(StatsDumpSchedulerTest, MultiInstances) {
}
}
TEST_F(StatsDumpSchedulerTest, MultiEnv) {
TEST_F(PeriodicWorkSchedulerTest, MultiEnv) {
constexpr int kDumpPeriodSec = 5;
constexpr int kPersistPeriodSec = 10;
Close();
......@@ -208,8 +224,8 @@ TEST_F(StatsDumpSchedulerTest, MultiEnv) {
ASSERT_OK(DB::Open(options2, dbname, &db));
DBImpl* dbi = static_cast_with_check<DBImpl>(db);
ASSERT_EQ(dbi->TEST_GetStatsDumpScheduler(),
dbfull()->TEST_GetStatsDumpScheduler());
ASSERT_EQ(dbi->TEST_GetPeriodicWorkScheduler(),
dbfull()->TEST_GetPeriodicWorkScheduler());
db->Close();
delete db;
......
......@@ -22,11 +22,11 @@
#include "rocksdb/cache.h"
#include "rocksdb/convenience.h"
#include "rocksdb/rate_limiter.h"
#include "test_util/sync_point.h"
#include "test_util/testutil.h"
#include "util/sync_point.h"
#include "util/testutil.h"
#include "util/random.h"
namespace ROCKSDB_NAMESPACE {
namespace rocksdb {
#ifndef ROCKSDB_LITE
class StatsHistoryTest : public DBTestBase {
......
......@@ -47,6 +47,11 @@ LIB_SOURCES = \
db/memtable_list.cc \
db/merge_helper.cc \
db/merge_operator.cc \
<<<<<<< HEAD
=======
db/output_validator.cc \
db/periodic_work_scheduler.cc \
>>>>>>> 1e0090973... Periodically flush info log out of application buffer (#7488)
db/range_del_aggregator.cc \
db/range_tombstone_fragmenter.cc \
db/repair.cc \
......@@ -88,7 +93,6 @@ LIB_SOURCES = \
monitoring/perf_level.cc \
monitoring/persistent_stats_history.cc \
monitoring/statistics.cc \
monitoring/stats_dump_scheduler.cc \
monitoring/thread_status_impl.cc \
monitoring/thread_status_updater.cc \
monitoring/thread_status_updater_debug.cc \
......@@ -381,7 +385,11 @@ MAIN_SOURCES = \
db/options_file_test.cc \
db/partitioned_filter_block_test.cc \
db/perf_context_test.cc \
<<<<<<< HEAD
db/persistent_cache_test.cc \
=======
db/periodic_work_scheduler_test.cc \
>>>>>>> 1e0090973... Periodically flush info log out of application buffer (#7488)
db/plain_table_db_test.cc \
db/prefix_test.cc \
db/redis_test.cc \
......@@ -410,8 +418,11 @@ MAIN_SOURCES = \
monitoring/histogram_test.cc \
monitoring/iostats_context_test.cc \
monitoring/statistics_test.cc \
<<<<<<< HEAD
<<<<<<< HEAD
monitoring/stats_dump_scheduler_test.cc \
=======
>>>>>>> 1e0090973... Periodically flush info log out of application buffer (#7488)
monitoring/stats_history_test.cc \
options/options_settable_test.cc
options/options_test.cc \
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册