提交 b947b18c 编写于 作者: Z Zhongyi Xie 提交者: 奏之章

cherry-pick c4f5d0aa152a0f0d8d603c91cbc2999993562c17Thu Feb 21 07:52:54 CST...

cherry-pick c4f5d0aa152a0f0d8d603c91cbc2999993562c17Thu Feb 21 07:52:54 CST 2019Zhongyi Xie*add GetStatsHistory to retrieve stats snapshots
上级 5330567c
......@@ -558,6 +558,7 @@ set(SOURCES
db/flush_scheduler.cc
db/forward_iterator.cc
db/internal_stats.cc
db/in_memory_stats_history.cc
db/logs_with_prep_tracker.cc
db/log_reader.cc
db/log_writer.cc
......
此差异已折叠。
......@@ -113,6 +113,7 @@ cpp_library(
"db/flush_job.cc",
"db/flush_scheduler.cc",
"db/forward_iterator.cc",
"db/in_memory_stats_history.cc",
"db/internal_stats.cc",
"db/log_reader.cc",
"db/log_writer.cc",
......
......@@ -37,6 +37,7 @@
#include "db/external_sst_file_ingestion_job.h"
#include "db/flush_job.h"
#include "db/forward_iterator.h"
#include "db/in_memory_stats_history.h"
#include "db/job_context.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
......@@ -70,6 +71,7 @@
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/statistics.h"
#include "rocksdb/stats_history.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "rocksdb/write_buffer_manager.h"
......@@ -501,16 +503,15 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Shutdown: canceling all background work");
InstrumentedMutexLock l(&mutex_);
// To avoid deadlock, `thread_dump_stats_->cancel()` needs to be called
// before grabbing db mutex because the actual worker function
// `DBImpl::DumpStats()` also holds db mutex
if (thread_dump_stats_ != nullptr) {
mutex_.Unlock();
thread_dump_stats_->cancel();
mutex_.Lock();
thread_dump_stats_.reset();
}
if (thread_persist_stats_ != nullptr) {
thread_persist_stats_->cancel();
thread_persist_stats_.reset();
}
InstrumentedMutexLock l(&mutex_);
if (!shutting_down_.load(std::memory_order_acquire) &&
has_unpersisted_data_.load(std::memory_order_relaxed) &&
!mutable_db_options_.avoid_flush_during_shutdown) {
......@@ -760,6 +761,7 @@ void DBImpl::PrintStatistics() {
void DBImpl::StartTimedTasks() {
unsigned int stats_dump_period_sec = 0;
unsigned int stats_persist_period_sec = 0;
{
InstrumentedMutexLock l(&mutex_);
stats_dump_period_sec = mutable_db_options_.stats_dump_period_sec;
......@@ -770,8 +772,113 @@ void DBImpl::StartTimedTasks() {
stats_dump_period_sec * 1000000));
}
}
stats_persist_period_sec = mutable_db_options_.stats_persist_period_sec;
if (stats_persist_period_sec > 0) {
if (!thread_persist_stats_) {
thread_persist_stats_.reset(new rocksdb::RepeatableThread(
[this]() { DBImpl::PersistStats(); }, "pst_st", env_,
stats_persist_period_sec * 1000000));
}
}
}
}
// esitmate the total size of stats_history_
size_t DBImpl::EstiamteStatsHistorySize() const {
size_t size_total =
sizeof(std::map<uint64_t, std::map<std::string, uint64_t>>);
if (stats_history_.size() == 0) return size_total;
size_t size_per_slice =
sizeof(uint64_t) + sizeof(std::map<std::string, uint64_t>);
// non-empty map, stats_history_.begin() guaranteed to exist
std::map<std::string, uint64_t> sample_slice(stats_history_.begin()->second);
for (const auto& pairs : sample_slice) {
size_per_slice +=
pairs.first.capacity() + sizeof(pairs.first) + sizeof(pairs.second);
}
size_total = size_per_slice * stats_history_.size();
return size_total;
}
void DBImpl::PersistStats() {
TEST_SYNC_POINT("DBImpl::PersistStats:Entry");
#ifndef ROCKSDB_LITE
if (shutdown_initiated_) {
return;
}
uint64_t now_micros = env_->NowMicros();
Statistics* statistics = immutable_db_options_.statistics.get();
if (!statistics) {
return;
}
size_t stats_history_size_limit = 0;
{
InstrumentedMutexLock l(&mutex_);
stats_history_size_limit = mutable_db_options_.stats_history_buffer_size;
}
// TODO(Zhongyi): also persist immutable_db_options_.statistics
{
std::map<std::string, uint64_t> stats_map;
if (!statistics->getTickerMap(&stats_map)) {
return;
}
InstrumentedMutexLock l(&stats_history_mutex_);
// calculate the delta from last time
if (stats_slice_initialized_) {
std::map<std::string, uint64_t> stats_delta;
for (const auto& stat : stats_map) {
if (stats_slice_.find(stat.first) != stats_slice_.end()) {
stats_delta[stat.first] = stat.second - stats_slice_[stat.first];
}
}
stats_history_[now_micros] = stats_delta;
}
stats_slice_initialized_ = true;
std::swap(stats_slice_, stats_map);
TEST_SYNC_POINT("DBImpl::PersistStats:StatsCopied");
// delete older stats snapshots to control memory consumption
bool purge_needed = EstiamteStatsHistorySize() > stats_history_size_limit;
while (purge_needed && !stats_history_.empty()) {
stats_history_.erase(stats_history_.begin());
purge_needed = EstiamteStatsHistorySize() > stats_history_size_limit;
}
}
// TODO: persist stats to disk
#endif // !ROCKSDB_LITE
}
bool DBImpl::FindStatsByTime(uint64_t start_time, uint64_t end_time,
uint64_t* new_time,
std::map<std::string, uint64_t>* stats_map) {
assert(new_time);
assert(stats_map);
if (!new_time || !stats_map) return false;
// lock when search for start_time
{
InstrumentedMutexLock l(&stats_history_mutex_);
auto it = stats_history_.lower_bound(start_time);
if (it != stats_history_.end() && it->first < end_time) {
// make a copy for timestamp and stats_map
*new_time = it->first;
*stats_map = it->second;
return true;
} else {
return false;
}
}
}
Status DBImpl::GetStatsHistory(uint64_t start_time, uint64_t end_time,
std::unique_ptr<StatsHistoryIterator>* stats_iterator) {
if (!stats_iterator) {
return Status::InvalidArgument("stats_iterator not preallocated.");
}
stats_iterator->reset(
new InMemoryStatsHistoryIterator(start_time, end_time, this));
return (*stats_iterator)->status();
}
void DBImpl::ScheduleGCTTL() {
TEST_SYNC_POINT("DBImpl:ScheduleGCTTL");
uint64_t mark_count = 0;
......@@ -1031,6 +1138,21 @@ Status DBImpl::SetDBOptions(
thread_dump_stats_.reset();
}
}
if (new_options.stats_persist_period_sec !=
mutable_db_options_.stats_persist_period_sec) {
if (thread_persist_stats_) {
mutex_.Unlock();
thread_persist_stats_->cancel();
mutex_.Lock();
}
if (new_options.stats_persist_period_sec > 0) {
thread_persist_stats_.reset(new rocksdb::RepeatableThread(
[this]() { DBImpl::PersistStats(); }, "pst_st", env_,
new_options.stats_persist_period_sec * 1000000));
} else {
thread_persist_stats_.reset();
}
}
write_controller_.set_max_delayed_write_rate(
new_options.delayed_write_rate);
table_cache_.get()->SetCapacity(new_options.max_open_files == -1
......
......@@ -66,6 +66,7 @@ namespace TERARKDB_NAMESPACE {
class Arena;
class ArenaWrappedDBIter;
class InMemoryStatsHistoryIterator;
class MemTable;
class TableCache;
class Version;
......@@ -470,7 +471,10 @@ class DBImpl : public DB {
int TEST_BGGarbageCollectionAllowed() const;
int TEST_BGFlushesAllowed() const;
size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
void TEST_WaitForTimedTaskRun(std::function<void()> callback) const;
void TEST_WaitForDumpStatsRun(std::function<void()> callback) const;
void TEST_WaitForPersistStatsRun(std::function<void()> callback) const;
bool TEST_IsPersistentStatsEnabled() const;
size_t TEST_EstiamteStatsHistorySize() const;
#endif // NDEBUG
......@@ -737,6 +741,17 @@ class DBImpl : public DB {
static Status CreateAndNewDirectory(Env* env, const std::string& dirname,
std::unique_ptr<Directory>* directory);
// Given a time window, return an iterator for accessing stats history
Status GetStatsHistory(
uint64_t start_time, uint64_t end_time,
std::unique_ptr<StatsHistoryIterator>* stats_iterator) override;
// find stats map from stats_history_ with smallest timestamp in
// the range of [start_time, end_time)
bool FindStatsByTime(uint64_t start_time, uint64_t end_time,
uint64_t* new_time,
std::map<std::string, uint64_t>* stats_map);
protected:
Env* const env_;
const std::string dbname_;
......@@ -1148,6 +1163,11 @@ class DBImpl : public DB {
void PrintStatistics();
size_t EstiamteStatsHistorySize() const;
// persist stats to column family "_persistent_stats"
void PersistStats();
// dump rocksdb.stats to LOG
void DumpStats();
......@@ -1194,6 +1214,8 @@ class DBImpl : public DB {
// Lock over the persistent DB state. Non-nullptr iff successfully acquired.
FileLock* db_lock_;
// In addition to mutex_, log_write_mutex_ protected writes to stats_history_
InstrumentedMutex stats_history_mutex_;
// In addition to mutex_, log_write_mutex_ protected writes to logs_ and
// logfile_number_. With two_write_queues it also protects alive_log_files_,
// and log_empty_. Refer to the definition of each variable below for more
......@@ -1325,6 +1347,12 @@ class DBImpl : public DB {
bool is_snapshot_supported_;
std::map<uint64_t, std::map<std::string, uint64_t>> stats_history_;
std::map<std::string, uint64_t> stats_slice_;
bool stats_slice_initialized_ = false;
// Class to maintain directories for all database paths other than main one.
class Directories {
public:
......@@ -1588,10 +1616,14 @@ class DBImpl : public DB {
// Only to be set during initialization
std::unique_ptr<PreReleaseCallback> recoverable_state_pre_release_callback_;
// handle for scheduling jobs at fixed intervals
// handle for scheduling stats dumping at fixed intervals
// REQUIRES: mutex locked
std::unique_ptr<TERARKDB_NAMESPACE::RepeatableThread> thread_dump_stats_;
// handle for scheduling stats snapshoting at fixed intervals
// REQUIRES: mutex locked
std::unique_ptr<rocksdb::RepeatableThread> thread_persist_stats_;
// No copying allowed
DBImpl(const DBImpl&);
void operator=(const DBImpl&);
......
......@@ -12,8 +12,8 @@
#include "db/db_impl.h"
#include "db/error_handler.h"
#include "monitoring/thread_status_updater.h"
#include "rocksdb/terark_namespace.h"
namespace TERARKDB_NAMESPACE {
uint64_t DBImpl::TEST_GetLevel0TotalSize() {
......@@ -251,10 +251,24 @@ size_t DBImpl::TEST_GetWalPreallocateBlockSize(
return GetWalPreallocateBlockSize(write_buffer_size);
}
void DBImpl::TEST_WaitForTimedTaskRun(std::function<void()> callback) const {
void DBImpl::TEST_WaitForDumpStatsRun(std::function<void()> callback) const {
if (thread_dump_stats_ != nullptr) {
thread_dump_stats_->TEST_WaitForRun(callback);
}
}
void DBImpl::TEST_WaitForPersistStatsRun(std::function<void()> callback) const {
if (thread_persist_stats_ != nullptr) {
thread_persist_stats_->TEST_WaitForRun(callback);
}
}
bool DBImpl::TEST_IsPersistentStatsEnabled() const {
return thread_persist_stats_ && thread_persist_stats_->IsRunning();
}
size_t DBImpl::TEST_EstiamteStatsHistorySize() const {
return EstiamteStatsHistorySize();
}
} // namespace TERARKDB_NAMESPACE
#endif // NDEBUG
......@@ -18,6 +18,7 @@
#include "rocksdb/cache.h"
#include "rocksdb/convenience.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/stats_history.h"
#include "rocksdb/terark_namespace.h"
#include "util/random.h"
#include "util/sync_point.h"
......@@ -25,6 +26,8 @@
namespace TERARKDB_NAMESPACE {
const int kMicrosInSec = 1000000;
class DBOptionsTest : public DBTestBase {
public:
DBOptionsTest() : DBTestBase("/db_options_test") {}
......@@ -527,10 +530,11 @@ TEST_F(DBOptionsTest, SetStatsDumpPeriodSec) {
for (int i = 0; i < 20; i++) {
int num = rand() % 5000 + 1;
ASSERT_OK(dbfull()->SetDBOptions(
{{"stats_dump_period_sec", std::to_string(num)}}));
ASSERT_OK(
dbfull()->SetDBOptions({{"stats_dump_period_sec", ToString(num)}}));
ASSERT_EQ(num, dbfull()->GetDBOptions().stats_dump_period_sec);
}
Close();
}
TEST_F(DBOptionsTest, RunStatsDumpPeriodSec) {
......@@ -560,13 +564,15 @@ TEST_F(DBOptionsTest, RunStatsDumpPeriodSec) {
TERARKDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
ASSERT_EQ(5, dbfull()->GetDBOptions().stats_dump_period_sec);
dbfull()->TEST_WaitForTimedTaskRun([&] { mock_env->set_current_time(5); });
dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(5); });
ASSERT_GE(counter, 1);
// Test cacel job through SetOptions
ASSERT_OK(dbfull()->SetDBOptions({{"stats_dump_period_sec", "0"}}));
int old_val = counter;
env_->SleepForMicroseconds(10000000);
for (int i = 6; i < 20; ++i) {
dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(i); });
}
ASSERT_EQ(counter, old_val);
Close();
}
......@@ -605,7 +611,30 @@ TEST_F(DBOptionsTest, StatsPersistScheduling) {
ASSERT_FALSE(dbfull()->TEST_IsPersistentStatsEnabled());
Close();
}
// Test persistent stats background thread scheduling and cancelling
TEST_F(DBOptionsTest, StatsPersistScheduling) {
Options options;
options.create_if_missing = true;
options.stats_persist_period_sec = 5;
std::unique_ptr<rocksdb::MockTimeEnv> mock_env;
mock_env.reset(new rocksdb::MockTimeEnv(env_));
mock_env->set_current_time(0); // in seconds
options.env = mock_env.get();
int counter = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
ASSERT_EQ(5, dbfull()->GetDBOptions().stats_persist_period_sec);
dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); });
ASSERT_GE(counter, 1);
// Test cacel job through SetOptions
ASSERT_TRUE(dbfull()->TEST_IsPersistentStatsEnabled());
ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}}));
ASSERT_FALSE(dbfull()->TEST_IsPersistentStatsEnabled());
Close();
}
// Test enabling persistent stats for the first time
TEST_F(DBOptionsTest, PersistentStatsFreshInstall) {
Options options;
......@@ -673,7 +702,6 @@ TEST_F(DBOptionsTest, GetStatsHistory) {
});
TERARKDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
#endif // OS_MACOSX && !NDEBUG
CreateColumnFamilies({"pikachu"}, options);
ASSERT_OK(Put("foo", "bar"));
ReopenWithColumnFamilies({"default", "pikachu"}, options);
......@@ -728,7 +756,6 @@ TEST_F(DBOptionsTest, InMemoryStatsHistoryPurging) {
});
TERARKDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
#endif // OS_MACOSX && !NDEBUG
CreateColumnFamilies({"pikachu"}, options);
ASSERT_OK(Put("foo", "bar"));
ReopenWithColumnFamilies({"default", "pikachu"}, options);
......
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl.h"
#include "db/in_memory_stats_history.h"
namespace rocksdb {
InMemoryStatsHistoryIterator::~InMemoryStatsHistoryIterator() {}
bool InMemoryStatsHistoryIterator::Valid() const { return valid_; }
Status InMemoryStatsHistoryIterator::status() const { return status_; }
void InMemoryStatsHistoryIterator::Next() {
// increment start_time by 1 to avoid infinite loop
AdvanceIteratorByTime(GetStatsTime() + 1, end_time_);
}
uint64_t InMemoryStatsHistoryIterator::GetStatsTime() const { return time_; }
const std::map<std::string, uint64_t>&
InMemoryStatsHistoryIterator::GetStatsMap() const {
return stats_map_;
}
// advance the iterator to the next time between [start_time, end_time)
// if success, update time_ and stats_map_ with new_time and stats_map
void InMemoryStatsHistoryIterator::AdvanceIteratorByTime(uint64_t start_time,
uint64_t end_time) {
// try to find next entry in stats_history_ map
if (db_impl_ != nullptr) {
valid_ =
db_impl_->FindStatsByTime(start_time, end_time, &time_, &stats_map_);
} else {
valid_ = false;
}
}
} // namespace rocksdb
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "rocksdb/stats_history.h"
namespace rocksdb {
class InMemoryStatsHistoryIterator final : public StatsHistoryIterator {
public:
InMemoryStatsHistoryIterator(uint64_t start_time, uint64_t end_time,
DBImpl* db_impl)
: start_time_(start_time),
end_time_(end_time),
valid_(true),
db_impl_(db_impl) {
AdvanceIteratorByTime(start_time_, end_time_);
}
~InMemoryStatsHistoryIterator() override;
bool Valid() const override;
Status status() const override;
void Next() override;
uint64_t GetStatsTime() const override;
const std::map<std::string, uint64_t>& GetStatsMap() const override;
private:
// advance the iterator to the next stats history record with timestamp
// between [start_time, end_time)
void AdvanceIteratorByTime(uint64_t start_time, uint64_t end_time);
// No copying allowed
InMemoryStatsHistoryIterator(const InMemoryStatsHistoryIterator&) = delete;
void operator=(const InMemoryStatsHistoryIterator&) = delete;
InMemoryStatsHistoryIterator(InMemoryStatsHistoryIterator&&) = delete;
InMemoryStatsHistoryIterator& operator=(InMemoryStatsHistoryIterator&&) =
delete;
uint64_t time_;
uint64_t start_time_;
uint64_t end_time_;
std::map<std::string, uint64_t> stats_map_;
Status status_;
bool valid_;
DBImpl* db_impl_;
};
} // namespace rocksdb
......@@ -66,6 +66,7 @@ struct ExternalSstFileInfo;
class WriteBatch;
class Env;
class EventListener;
class StatsHistoryIterator;
class TraceWriter;
using std::unique_ptr;
......@@ -1274,6 +1275,14 @@ class DB {
// Needed for StackableDB
virtual DB* GetRootDB() { return this; }
// Given a time window, return an iterator for accessing stats history
// User is responsible for deleting StatsHistoryIterator after use
virtual Status GetStatsHistory(uint64_t /*start_time*/,
uint64_t /*end_time*/,
std::unique_ptr<StatsHistoryIterator>* /*stats_iterator*/) {
return Status::NotSupported("GetStatsHistory() is not implemented.");
}
private:
// No copying allowed
DB(const DB&);
......
......@@ -733,6 +733,15 @@ struct DBOptions {
// Dynamically changeable through SetDBOptions() API.
unsigned int stats_dump_period_sec = 600;
// if not zero, dump rocksdb.stats to RocksDB every stats_persist_period_sec
// Default: 600
unsigned int stats_persist_period_sec = 600;
// if not zero, periodically take stats snapshots and store in memory, the
// memory size for stats snapshots is capped at stats_history_buffer_size
// Default: 1MB
size_t stats_history_buffer_size = 1024 * 1024;
// If set true, will hint the underlying file system that the file
// access pattern is random, when a sst file is opened.
// Default: true
......
......@@ -8,6 +8,8 @@
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <vector>
......@@ -369,6 +371,11 @@ class Statistics {
return std::string("ToString(): not implemented");
}
virtual bool getTickerMap(std::map<std::string, uint64_t>*) const {
// Do nothing by default
return false;
};
// Override this function to disable particular histogram collection
virtual bool HistEnabledForType(uint32_t type) const {
return type < HISTOGRAM_ENUM_MAX;
......
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <map>
#include <string>
// #include "db/db_impl.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
namespace rocksdb {
class DBImpl;
class StatsHistoryIterator {
public:
StatsHistoryIterator() {}
virtual ~StatsHistoryIterator() {}
virtual bool Valid() const = 0;
// Moves to the next stats history record. After this call, Valid() is
// true iff the iterator was not positioned at the last entry in the source.
// REQUIRES: Valid()
virtual void Next() = 0;
// Return the time stamp (in microseconds) when stats history is recorded.
// REQUIRES: Valid()
virtual uint64_t GetStatsTime() const = 0;
// Return the current stats history as an std::map which specifies the
// mapping from stats name to stats value . The underlying storage
// for the returned map is valid only until the next modification of
// the iterator.
// REQUIRES: Valid()
virtual const std::map<std::string, uint64_t>& GetStatsMap() const = 0;
// If an error has occurred, return it. Else return an ok status.
virtual Status status() const = 0;
};
} // namespace rocksdb
......@@ -4305,7 +4305,7 @@ class JniUtil {
* if an OutOfMemoryError or ArrayIndexOutOfBoundsException
* exception occurs
*
* @return A std::vector<std:string> containing copies of the Java strings
* @return A std::vector<std::string> containing copies of the Java strings
*/
static std::vector<std::string> copyStrings(JNIEnv* env,
jobjectArray jss, jboolean* has_exception) {
......@@ -4323,7 +4323,7 @@ class JniUtil {
* if an OutOfMemoryError or ArrayIndexOutOfBoundsException
* exception occurs
*
* @return A std::vector<std:string> containing copies of the Java strings
* @return A std::vector<std::string> containing copies of the Java strings
*/
static std::vector<std::string> copyStrings(JNIEnv* env,
jobjectArray jss, const jsize jss_len, jboolean* has_exception) {
......@@ -4407,7 +4407,7 @@ class JniUtil {
* @param has_exception (OUT) will be set to JNI_TRUE
* if an OutOfMemoryError exception occurs
*
* @return A std:string copy of the jstring, or an
* @return A std::string copy of the jstring, or an
* empty std::string if has_exception == JNI_TRUE
*/
static std::string copyStdString(JNIEnv* env, jstring js,
......@@ -4438,8 +4438,8 @@ class JniUtil {
* @param bytes The bytes to copy
*
* @return the Java byte[] or nullptr if an exception occurs
*
* @throws RocksDBException thrown
*
* @throws RocksDBException thrown
* if memory size to copy exceeds general java specific array size limitation.
*/
static jbyteArray copyBytes(JNIEnv* env, std::string bytes) {
......@@ -4606,7 +4606,7 @@ class JniUtil {
return jbyte_strings;
}
/**
* Copies bytes to a new jByteArray with the check of java array size limitation.
*
......@@ -4614,29 +4614,29 @@ class JniUtil {
* @param size number of bytes to copy
*
* @return the Java byte[] or nullptr if an exception occurs
*
* @throws RocksDBException thrown
*
* @throws RocksDBException thrown
* if memory size to copy exceeds general java array size limitation to avoid overflow.
*/
static jbyteArray createJavaByteArrayWithSizeCheck(JNIEnv* env, const char* bytes, const size_t size) {
// Limitation for java array size is vm specific
// In general it cannot exceed Integer.MAX_VALUE (2^31 - 1)
// Current HotSpot VM limitation for array size is Integer.MAX_VALUE - 5 (2^31 - 1 - 5)
// It means that the next call to env->NewByteArray can still end with
// It means that the next call to env->NewByteArray can still end with
// OutOfMemoryError("Requested array size exceeds VM limit") coming from VM
static const size_t MAX_JARRAY_SIZE = (static_cast<size_t>(1)) << 31;
if(size > MAX_JARRAY_SIZE) {
TERARKDB_NAMESPACE::RocksDBExceptionJni::ThrowNew(env, "Requested array size exceeds VM limit");
return nullptr;
}
const jsize jlen = static_cast<jsize>(size);
jbyteArray jbytes = env->NewByteArray(jlen);
if(jbytes == nullptr) {
// exception thrown: OutOfMemoryError
// exception thrown: OutOfMemoryError
return nullptr;
}
env->SetByteArrayRegion(jbytes, 0, jlen,
const_cast<jbyte*>(reinterpret_cast<const jbyte*>(bytes)));
if(env->ExceptionCheck()) {
......@@ -4655,8 +4655,8 @@ class JniUtil {
* @param bytes The bytes to copy
*
* @return the Java byte[] or nullptr if an exception occurs
*
* @throws RocksDBException thrown
*
* @throws RocksDBException thrown
* if memory size to copy exceeds general java specific array size limitation.
*/
static jbyteArray copyBytes(JNIEnv* env, const Slice& bytes) {
......
......@@ -329,6 +329,19 @@ std::string StatisticsImpl::ToString() const {
return res;
}
bool StatisticsImpl::getTickerMap(
std::map<std::string, uint64_t>* stats_map) const {
assert(stats_map);
if (!stats_map) return false;
stats_map->clear();
MutexLock lock(&aggregate_lock_);
for (const auto& t : TickersNameMap) {
assert(t.first < TICKER_ENUM_MAX);
(*stats_map)[t.second.c_str()] = getTickerCountLocked(t.first);
}
return true;
}
bool StatisticsImpl::HistEnabledForType(uint32_t type) const {
return type < HISTOGRAM_ENUM_MAX;
}
......
......@@ -6,9 +6,10 @@
#pragma once
#include "rocksdb/statistics.h"
#include <vector>
#include <atomic>
#include <map>
#include <string>
#include <vector>
#include "monitoring/histogram.h"
#include "port/likely.h"
......@@ -57,6 +58,7 @@ class StatisticsImpl : public Statistics {
virtual Status Reset() override;
virtual std::string ToString() const override;
virtual bool getTickerMap(std::map<std::string, uint64_t>*) const override;
virtual bool HistEnabledForType(uint32_t type) const override;
private:
......
......@@ -261,6 +261,8 @@ MutableDBOptions::MutableDBOptions()
max_total_wal_size(0),
delete_obsolete_files_period_micros(6ULL * 60 * 60 * 1000000),
stats_dump_period_sec(600),
stats_persist_period_sec(600),
stats_history_buffer_size(1024 * 1024),
max_open_files(-1),
bytes_per_sync(0),
wal_bytes_per_sync(0),
......@@ -280,6 +282,8 @@ MutableDBOptions::MutableDBOptions(const DBOptions& options)
delete_obsolete_files_period_micros(
options.delete_obsolete_files_period_micros),
stats_dump_period_sec(options.stats_dump_period_sec),
stats_persist_period_sec(options.stats_persist_period_sec),
stats_history_buffer_size(options.stats_history_buffer_size),
max_open_files(options.max_open_files),
bytes_per_sync(options.bytes_per_sync),
wal_bytes_per_sync(options.wal_bytes_per_sync),
......@@ -308,6 +312,10 @@ void MutableDBOptions::Dump(Logger* log) const {
delete_obsolete_files_period_micros);
ROCKS_LOG_HEADER(log, " Options.stats_dump_period_sec: %u",
stats_dump_period_sec);
ROCKS_LOG_HEADER(log, " Options.stats_persist_period_sec: %d",
stats_persist_period_sec);
ROCKS_LOG_HEADER(log, " Options.stats_history_buffer_size: %d",
stats_history_buffer_size);
ROCKS_LOG_HEADER(log, " Options.max_open_files: %d",
max_open_files);
ROCKS_LOG_HEADER(log,
......
......@@ -107,6 +107,8 @@ struct MutableDBOptions {
uint64_t max_total_wal_size;
uint64_t delete_obsolete_files_period_micros;
unsigned int stats_dump_period_sec;
unsigned int stats_persist_period_sec;
size_t stats_history_buffer_size;
int max_open_files;
uint64_t bytes_per_sync;
uint64_t wal_bytes_per_sync;
......
......@@ -88,6 +88,10 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
options.allow_fallocate = immutable_db_options.allow_fallocate;
options.is_fd_close_on_exec = immutable_db_options.is_fd_close_on_exec;
options.stats_dump_period_sec = mutable_db_options.stats_dump_period_sec;
options.stats_persist_period_sec =
mutable_db_options.stats_persist_period_sec;
options.stats_history_buffer_size =
mutable_db_options.stats_history_buffer_size;
options.advise_random_on_open = immutable_db_options.advise_random_on_open;
options.allow_mmap_populate = immutable_db_options.allow_mmap_populate;
options.write_buffer_flush_pri = immutable_db_options.write_buffer_flush_pri;
......@@ -1618,6 +1622,14 @@ std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct DBOptions, stats_dump_period_sec), OptionType::kUInt,
OptionVerificationType::kNormal, true,
offsetof(struct MutableDBOptions, stats_dump_period_sec)}},
{"stats_persist_period_sec",
{offsetof(struct DBOptions, stats_persist_period_sec),
OptionType::kUInt, OptionVerificationType::kNormal, true,
offsetof(struct MutableDBOptions, stats_persist_period_sec)}},
{"stats_history_buffer_size",
{offsetof(struct DBOptions, stats_history_buffer_size),
OptionType::kSizeT, OptionVerificationType::kNormal, true,
offsetof(struct MutableDBOptions, stats_history_buffer_size)}},
{"fail_if_options_file_error",
{offsetof(struct DBOptions, fail_if_options_file_error),
OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}},
......
......@@ -274,6 +274,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
"manifest_preallocation_size=1222;"
"allow_mmap_writes=false;"
"stats_dump_period_sec=70127;"
"stats_persist_period_sec=54321;"
"stats_history_buffer_size=14159;"
"allow_fallocate=true;"
"allow_mmap_reads=false;"
"use_direct_reads=false;"
......
......@@ -134,6 +134,8 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"is_fd_close_on_exec", "true"},
{"skip_log_error_on_recovery", "false"},
{"stats_dump_period_sec", "46"},
{"stats_persist_period_sec", "57"},
{"stats_history_buffer_size", "69"},
{"advise_random_on_open", "true"},
{"allow_mmap_populate", "true"},
{"use_adaptive_mutex", "false"},
......@@ -272,6 +274,8 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_db_opt.is_fd_close_on_exec, true);
ASSERT_EQ(new_db_opt.skip_log_error_on_recovery, false);
ASSERT_EQ(new_db_opt.stats_dump_period_sec, 46U);
ASSERT_EQ(new_db_opt.stats_persist_period_sec, 57U);
ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U);
ASSERT_EQ(new_db_opt.advise_random_on_open, true);
ASSERT_EQ(new_db_opt.allow_mmap_populate, true);
ASSERT_EQ(new_db_opt.use_adaptive_mutex, false);
......
......@@ -36,6 +36,7 @@ LIB_SOURCES = \
db/flush_job.cc \
db/flush_scheduler.cc \
db/forward_iterator.cc \
db/in_memory_stats_history.cc \
db/internal_stats.cc \
db/logs_with_prep_tracker.cc \
db/log_reader.cc \
......
......@@ -1061,6 +1061,12 @@ DEFINE_bool(identity_as_first_hash, false,
DEFINE_bool(dump_malloc_stats, true, "Dump malloc stats in LOG ");
DEFINE_uint64(stats_dump_period_sec, TERARKDB_NAMESPACE::Options().stats_dump_period_sec,
"Gap between printing stats to log in seconds");
DEFINE_uint64(stats_persist_period_sec,
rocksdb::Options().stats_persist_period_sec,
"Gap between persisting stats in seconds");
DEFINE_uint64(stats_history_buffer_size,
rocksdb::Options().stats_history_buffer_size,
"Max number of stats snapshots to keep in memory");
enum RepFactory {
kSkipList,
......@@ -3562,6 +3568,10 @@ class Benchmark {
options.dump_malloc_stats = FLAGS_dump_malloc_stats;
options.stats_dump_period_sec =
static_cast<unsigned int>(FLAGS_stats_dump_period_sec);
options.stats_persist_period_sec =
static_cast<unsigned int>(FLAGS_stats_persist_period_sec);
options.stats_history_buffer_size =
static_cast<size_t>(FLAGS_stats_history_buffer_size);
options.compression_opts.level = FLAGS_compression_level;
options.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
......
......@@ -48,6 +48,8 @@ class RepeatableThread {
thread_.join();
}
bool IsRunning() { return running_; }
~RepeatableThread() { cancel(); }
#ifndef NDEBUG
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册