提交 78279350 编写于 作者: Y Yi Wu 提交者: Facebook Github Bot

Blob DB: Add statistics

Summary:
Adding a list of blob db counters.

Also remove WaStats() which doesn't expose the stats and can be substitute by (BLOB_DB_BYTES_WRITTEN / BLOB_DB_BLOB_FILE_BYTES_WRITTEN).
Closes https://github.com/facebook/rocksdb/pull/3193

Differential Revision: D6394216

Pulled By: yiwu-arbug

fbshipit-source-id: 017508c8ff3fcd7ea7403c64d0f9834b24816803
上级 3cf562be
......@@ -226,6 +226,73 @@ enum Tickers : uint32_t {
// Number of internal keys skipped by Iterator
NUMBER_ITER_SKIP,
// BlobDB specific stats
// # of Put/PutTTL/PutUntil to BlobDB.
BLOB_DB_NUM_PUT,
// # of Write to BlobDB.
BLOB_DB_NUM_WRITE,
// # of Get to BlobDB.
BLOB_DB_NUM_GET,
// # of MultiGet to BlobDB.
BLOB_DB_NUM_MULTIGET,
// # of Seek/SeekToFirst/SeekToLast/SeekForPrev to BlobDB iterator.
BLOB_DB_NUM_SEEK,
// # of Next to BlobDB iterator.
BLOB_DB_NUM_NEXT,
// # of Prev to BlobDB iterator.
BLOB_DB_NUM_PREV,
// # of keys written to BlobDB.
BLOB_DB_NUM_KEYS_WRITTEN,
// # of keys read from BlobDB.
BLOB_DB_NUM_KEYS_READ,
// # of bytes (key + value) written to BlobDB.
BLOB_DB_BYTES_WRITTEN,
// # of bytes (keys + value) read from BlobDB.
BLOB_DB_BYTES_READ,
// # of keys written by BlobDB as non-TTL inlined value.
BLOB_DB_WRITE_INLINED,
// # of keys written by BlobDB as TTL inlined value.
BLOB_DB_WRITE_INLINED_TTL,
// # of keys written by BlobDB as non-TTL blob value.
BLOB_DB_WRITE_BLOB,
// # of keys written by BlobDB as TTL blob value.
BLOB_DB_WRITE_BLOB_TTL,
// # of bytes written to blob file.
BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
// # of bytes read from blob file.
BLOB_DB_BLOB_FILE_BYTES_READ,
// # of times a blob files being synced.
BLOB_DB_BLOB_FILE_SYNCED,
// # of blob index evicted from base DB by BlobDB compaction filter because
// of expiration.
BLOB_DB_BLOB_INDEX_EXPIRED,
// # of blob files being garbage collected.
BLOB_DB_GC_NUM_FILES,
// # of blob files generated by garbage collection.
BLOB_DB_GC_NUM_NEW_FILES,
// # of BlobDB garbage collection failures.
BLOB_DB_GC_FAILURES,
// # of keys drop by BlobDB garbage collection because they had been
// overwritten.
BLOB_DB_GC_NUM_KEYS_OVERWRITTEN,
// # of keys drop by BlobDB garbage collection because of expiration.
BLOB_DB_GC_NUM_KEYS_EXPIRED,
// # of keys relocated to new blob file by garbage collection.
BLOB_DB_GC_NUM_KEYS_RELOCATED,
// # of bytes drop by BlobDB garbage collection because they had been
// overwritten.
BLOB_DB_GC_BYTES_OVERWRITTEN,
// # of bytes drop by BlobDB garbage collection because of expiration.
BLOB_DB_GC_BYTES_EXPIRED,
// # of bytes relocated to new blob file by garbage collection.
BLOB_DB_GC_BYTES_RELOCATED,
// # of blob files evicted because of BlobDB is full.
BLOB_DB_FIFO_NUM_FILES_EVICTED,
// # of keys in the blob files evicted because of BlobDB is full.
BLOB_DB_FIFO_NUM_KEYS_EVICTED,
// # of bytes in the blob files evicted because of BlobDB is full.
BLOB_DB_FIFO_BYTES_EVICTED,
TICKER_ENUM_MAX
};
......@@ -268,9 +335,9 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{COMPACTION_KEY_DROP_RANGE_DEL, "rocksdb.compaction.key.drop.range_del"},
{COMPACTION_KEY_DROP_USER, "rocksdb.compaction.key.drop.user"},
{COMPACTION_RANGE_DEL_DROP_OBSOLETE,
"rocksdb.compaction.range_del.drop.obsolete"},
"rocksdb.compaction.range_del.drop.obsolete"},
{COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE,
"rocksdb.compaction.optimized.del.drop.obsolete"},
"rocksdb.compaction.optimized.del.drop.obsolete"},
{NUMBER_KEYS_WRITTEN, "rocksdb.number.keys.written"},
{NUMBER_KEYS_READ, "rocksdb.number.keys.read"},
{NUMBER_KEYS_UPDATED, "rocksdb.number.keys.updated"},
......@@ -332,6 +399,37 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{READ_AMP_TOTAL_READ_BYTES, "rocksdb.read.amp.total.read.bytes"},
{NUMBER_RATE_LIMITER_DRAINS, "rocksdb.number.rate_limiter.drains"},
{NUMBER_ITER_SKIP, "rocksdb.number.iter.skip"},
{BLOB_DB_NUM_PUT, "rocksdb.blobdb.num.put"},
{BLOB_DB_NUM_WRITE, "rocksdb.blobdb.num.write"},
{BLOB_DB_NUM_GET, "rocksdb.blobdb.num.get"},
{BLOB_DB_NUM_MULTIGET, "rocksdb.blobdb.num.multiget"},
{BLOB_DB_NUM_SEEK, "rocksdb.blobdb.num.seek"},
{BLOB_DB_NUM_NEXT, "rocksdb.blobdb.num.next"},
{BLOB_DB_NUM_PREV, "rocksdb.blobdb.num.prev"},
{BLOB_DB_NUM_KEYS_WRITTEN, "rocksdb.blobdb.num.keys.written"},
{BLOB_DB_NUM_KEYS_READ, "rocksdb.blobdb.num.keys.read"},
{BLOB_DB_BYTES_WRITTEN, "rocksdb.blobdb.bytes.written"},
{BLOB_DB_BYTES_READ, "rocksdb.blobdb.bytes.read"},
{BLOB_DB_WRITE_INLINED, "rocksdb.blobdb.write.inlined"},
{BLOB_DB_WRITE_INLINED_TTL, "rocksdb.blobdb.write.inlined.ttl"},
{BLOB_DB_WRITE_BLOB, "rocksdb.blobdb.write.blob"},
{BLOB_DB_WRITE_BLOB_TTL, "rocksdb.blobdb.write.blob.ttl"},
{BLOB_DB_BLOB_FILE_BYTES_WRITTEN, "rocksdb.blobdb.blob.file.bytes.written"},
{BLOB_DB_BLOB_FILE_BYTES_READ, "rocksdb.blobdb.blob.file,bytes.read"},
{BLOB_DB_BLOB_FILE_SYNCED, "rocksdb.blobdb.blob.file.synced"},
{BLOB_DB_BLOB_INDEX_EXPIRED, "rocksdb.blobdb.blob.index.expired"},
{BLOB_DB_GC_NUM_FILES, "rocksdb.blobdb.gc.num.files"},
{BLOB_DB_GC_NUM_NEW_FILES, "rocksdb.blobdb.gc.num.new.files"},
{BLOB_DB_GC_FAILURES, "rocksdb.blobdb.gc.failures"},
{BLOB_DB_GC_NUM_KEYS_OVERWRITTEN, "rocksdb.blobdb.gc.num.keys.overwritten"},
{BLOB_DB_GC_NUM_KEYS_EXPIRED, "rocksdb.blobdb.gc.num.keys.expired"},
{BLOB_DB_GC_NUM_KEYS_RELOCATED, "rocksdb.blobdb.gc.num.keys.relocated"},
{BLOB_DB_GC_BYTES_OVERWRITTEN, "rocksdb.blobdb.gc.bytes.overwritten"},
{BLOB_DB_GC_BYTES_EXPIRED, "rocksdb.blobdb.gc.bytes.expired"},
{BLOB_DB_GC_BYTES_RELOCATED, "rocksdb.blobdb.gc.bytes.relocated"},
{BLOB_DB_FIFO_NUM_FILES_EVICTED, "rocksdb.blobdb.fifo.num.files.evicted"},
{BLOB_DB_FIFO_NUM_KEYS_EVICTED, "rocksdb.blobdb.fifo.num.keys.evicted"},
{BLOB_DB_FIFO_BYTES_EVICTED, "rocksdb.blobdb.fifo.bytes.evicted"},
};
/**
......@@ -383,6 +481,36 @@ enum Histograms : uint32_t {
// requests.
READ_NUM_MERGE_OPERANDS,
// BlobDB specific stats
// Size of keys written to BlobDB.
BLOB_DB_KEY_SIZE,
// Size of values written to BlobDB.
BLOB_DB_VALUE_SIZE,
// BlobDB Put/PutWithTTL/PutUntil/Write latency.
BLOB_DB_WRITE_MICROS,
// BlobDB Get lagency.
BLOB_DB_GET_MICROS,
// BlobDB MultiGet latency.
BLOB_DB_MULTIGET_MICROS,
// BlobDB Seek/SeekToFirst/SeekToLast/SeekForPrev latency.
BLOB_DB_SEEK_MICROS,
// BlobDB Next latency.
BLOB_DB_NEXT_MICROS,
// BlobDB Prev latency.
BLOB_DB_PREV_MICROS,
// Blob file write latency.
BLOB_DB_BLOB_FILE_WRITE_MICROS,
// Blob file read latency.
BLOB_DB_BLOB_FILE_READ_MICROS,
// Blob file sync latency.
BLOB_DB_BLOB_FILE_SYNC_MICROS,
// BlobDB garbage collection time.
BLOB_DB_GC_MICROS,
// BlobDB compression time.
BLOB_DB_COMPRESSION_MICROS,
// BlobDB decompression time.
BLOB_DB_DECOMPRESSION_MICROS,
HISTOGRAM_ENUM_MAX, // TODO(ldemailly): enforce HistogramsNameMap match
};
......@@ -418,6 +546,20 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
{COMPRESSION_TIMES_NANOS, "rocksdb.compression.times.nanos"},
{DECOMPRESSION_TIMES_NANOS, "rocksdb.decompression.times.nanos"},
{READ_NUM_MERGE_OPERANDS, "rocksdb.read.num.merge_operands"},
{BLOB_DB_KEY_SIZE, "rocksdb.blobdb.key.size"},
{BLOB_DB_VALUE_SIZE, "rocksdb.blobdb.value.size"},
{BLOB_DB_WRITE_MICROS, "rocksdb.blobdb.write.micros"},
{BLOB_DB_GET_MICROS, "rocksdb.blobdb.get.micros"},
{BLOB_DB_MULTIGET_MICROS, "rocksdb.blobdb.multiget.micros"},
{BLOB_DB_SEEK_MICROS, "rocksdb.blobdb.seek.micros"},
{BLOB_DB_NEXT_MICROS, "rocksdb.blobdb.next.micros"},
{BLOB_DB_PREV_MICROS, "rocksdb.blobdb.prev.micros"},
{BLOB_DB_BLOB_FILE_WRITE_MICROS, "rocksdb.blobdb.blob.file.write.micros"},
{BLOB_DB_BLOB_FILE_READ_MICROS, "rocksdb.blobdb.blob.file.read.micros"},
{BLOB_DB_BLOB_FILE_SYNC_MICROS, "rocksdb.blobdb.blob.file.sync.micros"},
{BLOB_DB_GC_MICROS, "rocksdb.blobdb.gc.micros"},
{BLOB_DB_COMPRESSION_MICROS, "rocksdb.blobdb.compression.micros"},
{BLOB_DB_DECOMPRESSION_MICROS, "rocksdb.blobdb.decompression.micros"},
};
struct HistogramData {
......
......@@ -5,6 +5,7 @@
#pragma once
#ifndef ROCKSDB_LITE
#include "monitoring/statistics.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/env.h"
#include "utilities/blob_db/blob_index.h"
......@@ -15,8 +16,12 @@ namespace blob_db {
// CompactionFilter to delete expired blob index from base DB.
class BlobIndexCompactionFilter : public CompactionFilter {
public:
explicit BlobIndexCompactionFilter(uint64_t current_time)
: current_time_(current_time) {}
BlobIndexCompactionFilter(uint64_t current_time, Statistics* statistics)
: current_time_(current_time), statistics_(statistics) {}
virtual ~BlobIndexCompactionFilter() {
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED, expired_count_);
}
virtual const char* Name() const override {
return "BlobIndexCompactionFilter";
......@@ -40,6 +45,7 @@ class BlobIndexCompactionFilter : public CompactionFilter {
}
if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) {
// Expired
expired_count_++;
return Decision::kRemove;
}
return Decision::kKeep;
......@@ -47,11 +53,16 @@ class BlobIndexCompactionFilter : public CompactionFilter {
private:
const uint64_t current_time_;
Statistics* statistics_;
// It is safe to not using std::atomic since the compaction filter, created
// from a compaction filter factroy, will not be called from multiple threads.
mutable uint64_t expired_count_ = 0;
};
class BlobIndexCompactionFilterFactory : public CompactionFilterFactory {
public:
explicit BlobIndexCompactionFilterFactory(Env* env) : env_(env) {}
BlobIndexCompactionFilterFactory(Env* env, Statistics* statistics)
: env_(env), statistics_(statistics) {}
virtual const char* Name() const override {
return "BlobIndexCompactionFilterFactory";
......@@ -65,12 +76,13 @@ class BlobIndexCompactionFilterFactory : public CompactionFilterFactory {
return nullptr;
}
assert(current_time >= 0);
return std::unique_ptr<CompactionFilter>(
new BlobIndexCompactionFilter(static_cast<uint64_t>(current_time)));
return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilter(
static_cast<uint64_t>(current_time), statistics_));
}
private:
Env* env_;
Statistics* statistics_;
};
} // namespace blob_db
......
......@@ -70,7 +70,8 @@ Status BlobDB::OpenAndLoad(const Options& options,
}
changed_options->compaction_filter_factory.reset(
new BlobIndexCompactionFilterFactory(options.env));
new BlobIndexCompactionFilterFactory(options.env,
options.statistics.get()));
changed_options->listeners.emplace_back(fblistener);
if (bdb_options.enable_garbage_collection) {
changed_options->listeners.emplace_back(ce_listener);
......@@ -163,7 +164,8 @@ Status BlobDB::Open(const DBOptions& db_options_input,
return Status::NotSupported("Blob DB doesn't support compaction filter.");
}
cf_options.compaction_filter_factory.reset(
new BlobIndexCompactionFilterFactory(db_options.env));
new BlobIndexCompactionFilterFactory(db_options.env,
db_options.statistics.get()));
ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options);
// we need to open blob db first so that recovery can happen
......
......@@ -14,6 +14,7 @@
#include "db/db_impl.h"
#include "db/write_batch_internal.h"
#include "monitoring/instrumented_mutex.h"
#include "monitoring/statistics.h"
#include "rocksdb/convenience.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
......@@ -30,6 +31,7 @@
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/stop_watch.h"
#include "util/sync_point.h"
#include "util/timer_queue.h"
#include "utilities/blob_db/blob_db_iterator.h"
......@@ -106,16 +108,13 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
bdb_options_(blob_db_options),
db_options_(db_options),
env_options_(db_options),
statistics_(db_options_.statistics.get()),
dir_change_(false),
next_file_number_(1),
epoch_of_(0),
shutdown_(false),
current_epoch_(0),
open_file_count_(0),
last_period_write_(0),
last_period_ampl_(0),
total_periods_write_(0),
total_periods_ampl_(0),
total_blob_space_(0),
open_p1_done_(false),
debug_level_(0),
......@@ -163,16 +162,13 @@ BlobDBImpl::BlobDBImpl(DB* db, const BlobDBOptions& blob_db_options)
bdb_options_(blob_db_options),
db_options_(db->GetOptions()),
env_options_(db_->GetOptions()),
statistics_(db_options_.statistics.get()),
dir_change_(false),
next_file_number_(1),
epoch_of_(0),
shutdown_(false),
current_epoch_(0),
open_file_count_(0),
last_period_write_(0),
last_period_ampl_(0),
total_periods_write_(0),
total_periods_ampl_(0),
total_blob_space_(0),
oldest_file_evicted_(false) {
if (!bdb_options_.blob_dir.empty())
......@@ -227,8 +223,6 @@ void BlobDBImpl::StartBackgroundTasks() {
std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1));
tqueue_.add(kSanityCheckPeriodMillisecs,
std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
tqueue_.add(kWriteAmplificationStatsPeriodMillisecs,
std::bind(&BlobDBImpl::WaStats, this, std::placeholders::_1));
tqueue_.add(kFSyncFilesPeriodMillisecs,
std::bind(&BlobDBImpl::FsyncFiles, this, std::placeholders::_1));
tqueue_.add(
......@@ -490,8 +484,8 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
}
bfile->log_writer_ = std::make_shared<Writer>(
std::move(fwriter), bfile->file_number_, bdb_options_.bytes_per_sync,
db_options_.use_fsync, boffset);
std::move(fwriter), env_, statistics_, bfile->file_number_,
bdb_options_.bytes_per_sync, db_options_.use_fsync, boffset);
bfile->log_writer_->last_elem_type_ = et;
return s;
......@@ -745,7 +739,8 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
};
Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_WRITE);
uint32_t default_cf_id =
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
// TODO(yiwu): In case there are multiple writers the latest sequence would
......@@ -856,6 +851,8 @@ Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t expiration) {
StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_PUT);
TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start");
Status s;
WriteBatch batch;
......@@ -888,11 +885,13 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key,
if (expiration == kNoExpiration) {
// Put as normal value
s = batch->Put(key, value);
RecordTick(statistics_, BLOB_DB_WRITE_INLINED);
} else {
// Inlined with TTL
BlobIndex::EncodeInlinedTTL(&index_entry, expiration, value);
s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
index_entry);
RecordTick(statistics_, BLOB_DB_WRITE_INLINED_TTL);
}
} else {
std::shared_ptr<BlobFile> bfile = (expiration != kNoExpiration)
......@@ -911,6 +910,11 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key,
s = AppendBlob(bfile, headerbuf, key, value_compressed, expiration,
&index_entry);
if (expiration == kNoExpiration) {
RecordTick(statistics_, BLOB_DB_WRITE_BLOB);
} else {
RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL);
}
if (s.ok()) {
bfile->ExtendSequenceRange(sequence);
......@@ -932,6 +936,11 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key,
}
}
RecordTick(statistics_, BLOB_DB_NUM_KEYS_WRITTEN);
RecordTick(statistics_, BLOB_DB_BYTES_WRITTEN, key.size() + value.size());
MeasureTime(statistics_, BLOB_DB_KEY_SIZE, key.size());
MeasureTime(statistics_, BLOB_DB_VALUE_SIZE, value.size());
return s;
}
......@@ -940,6 +949,7 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
if (bdb_options_.compression == kNoCompression) {
return raw;
}
StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS);
CompressionType ct = bdb_options_.compression;
CompressionOptions compression_opts;
CompressBlock(raw, compression_opts, &ct, kBlockBasedTableVersionFormat,
......@@ -991,6 +1001,11 @@ bool BlobDBImpl::EvictOldestBlobFile() {
oldest_file->MarkObsolete(oldest_file->GetSequenceRange().second);
obsolete_files_.push_back(oldest_file);
oldest_file_evicted_.store(true);
RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED);
RecordTick(statistics_, BLOB_DB_FIFO_NUM_KEYS_EVICTED,
oldest_file->BlobCount());
RecordTick(statistics_, BLOB_DB_FIFO_BYTES_EVICTED,
oldest_file->GetFileSize());
return true;
}
......@@ -1048,7 +1063,6 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
bfile->blob_count_++;
bfile->file_size_ += size_put;
last_period_write_ += size_put;
total_blob_space_ += size_put;
if (expiration == kNoExpiration) {
......@@ -1066,6 +1080,8 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
std::vector<Status> BlobDBImpl::MultiGet(
const ReadOptions& read_options,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
StopWatch multiget_sw(env_, statistics_, BLOB_DB_MULTIGET_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_MULTIGET);
// Get a snapshot to avoid blob file get deleted between we
// fetch and index entry and reading from the file.
ReadOptions ro(read_options);
......@@ -1169,7 +1185,12 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
char* buffer = &(*valueptr)[0];
Slice blob_value;
s = reader->Read(blob_index.offset(), blob_index.size(), &blob_value, buffer);
{
StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
s = reader->Read(blob_index.offset(), blob_index.size(), &blob_value,
buffer);
RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_value.size());
}
if (!s.ok() || blob_value.size() != blob_index.size()) {
if (debug_level_ >= 2) {
ROCKS_LOG_ERROR(db_options_.info_log,
......@@ -1218,10 +1239,14 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
if (bfile->compression() != kNoCompression) {
BlockContents contents;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
s = UncompressBlockContentsForCompressionType(
blob_value.data(), blob_value.size(), &contents,
kBlockBasedTableVersionFormat, Slice(), bfile->compression(),
*(cfh->cfd()->ioptions()));
{
StopWatch decompression_sw(env_, statistics_,
BLOB_DB_DECOMPRESSION_MICROS);
s = UncompressBlockContentsForCompressionType(
blob_value.data(), blob_value.size(), &contents,
kBlockBasedTableVersionFormat, Slice(), bfile->compression(),
*(cfh->cfd()->ioptions()));
}
*(value->GetSelf()) = contents.data.ToString();
}
......@@ -1233,6 +1258,14 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
Status BlobDBImpl::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) {
StopWatch get_sw(env_, statistics_, BLOB_DB_GET_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_GET);
return GetImpl(read_options, column_family, key, value);
}
Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) {
if (column_family != DefaultColumnFamily()) {
return Status::NotSupported(
"Blob DB doesn't support non-default column family.");
......@@ -1258,6 +1291,8 @@ Status BlobDBImpl::Get(const ReadOptions& read_options,
if (snapshot_created) {
db_->ReleaseSnapshot(ro.snapshot);
}
RecordTick(statistics_, BLOB_DB_NUM_KEYS_READ);
RecordTick(statistics_, BLOB_DB_BYTES_READ, value->size());
return s;
}
......@@ -1546,35 +1581,6 @@ std::pair<bool, int64_t> BlobDBImpl::ReclaimOpenFiles(bool aborted) {
return std::make_pair(true, -1);
}
// TODO(yiwu): correct the stats and expose it.
std::pair<bool, int64_t> BlobDBImpl::WaStats(bool aborted) {
if (aborted) return std::make_pair(false, -1);
WriteLock wl(&mutex_);
if (all_periods_write_.size() >= kWriteAmplificationStatsPeriods) {
total_periods_write_ -= (*all_periods_write_.begin());
total_periods_ampl_ = (*all_periods_ampl_.begin());
all_periods_write_.pop_front();
all_periods_ampl_.pop_front();
}
uint64_t val1 = last_period_write_.load();
uint64_t val2 = last_period_ampl_.load();
all_periods_write_.push_back(val1);
all_periods_ampl_.push_back(val2);
last_period_write_ = 0;
last_period_ampl_ = 0;
total_periods_write_ += val1;
total_periods_ampl_ += val2;
return std::make_pair(true, -1);
}
// Write callback for garbage collection to check if key has been updated
// since last read. Similar to how OptimisticTransaction works. See inline
// comment in GCFileAndUpdateLSM().
......@@ -1635,6 +1641,7 @@ class BlobDBImpl::GarbageCollectionWriteCallback : public WriteCallback {
// DELETED in the LSM
Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
GCStats* gc_stats) {
StopWatch gc_sw(env_, statistics_, BLOB_DB_GC_MICROS);
uint64_t now = EpochNow();
std::shared_ptr<Reader> reader =
......@@ -1727,6 +1734,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
if (get_status.IsNotFound() || !is_blob_index) {
// Either the key is deleted or updated with a newer version whish is
// inlined in LSM.
gc_stats->num_keys_overwritten++;
gc_stats->bytes_overwritten += record.record_size();
continue;
}
......@@ -1742,6 +1751,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
blob_index.file_number() != bfptr->BlobFileNumber() ||
blob_index.offset() != blob_offset) {
// Key has been overwritten. Drop the blob record.
gc_stats->num_keys_overwritten++;
gc_stats->bytes_overwritten += record.record_size();
continue;
}
......@@ -1751,8 +1762,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
// TODO(yiwu): Blob indexes will be remove by BlobIndexCompactionFilter.
// We can just drop the blob record.
if (no_relocation_ttl || (has_ttl && now >= record.expiration)) {
gc_stats->num_deletes++;
gc_stats->deleted_size += record.value_size;
gc_stats->num_keys_expired++;
gc_stats->bytes_expired += record.record_size();
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete");
WriteBatch delete_batch;
Status delete_status = delete_batch.Delete(record.key);
......@@ -1760,12 +1771,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
delete_status = db_impl_->WriteWithCallback(WriteOptions(),
&delete_batch, &callback);
}
if (delete_status.ok()) {
gc_stats->delete_succeeded++;
} else if (delete_status.IsBusy()) {
// The key is overwritten in the meanwhile. Drop the blob record.
gc_stats->overwritten_while_delete++;
} else {
if (!delete_status.ok() && !delete_status.IsBusy()) {
// We hit an error.
s = delete_status;
ROCKS_LOG_ERROR(db_options_.info_log,
......@@ -1788,7 +1794,6 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
std::string reason("GC of ");
reason += bfptr->PathName();
newfile = NewBlobFile(reason);
gc_stats->newfile = newfile;
new_writer = CheckOrCreateWriterLocked(newfile);
newfile->header_ = std::move(header);
......@@ -1810,9 +1815,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile));
}
gc_stats->num_relocate++;
std::string new_index_entry;
uint64_t new_blob_offset = 0;
uint64_t new_key_offset = 0;
// write the blob to the blob log.
......@@ -1838,10 +1841,12 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
if (rewrite_status.ok()) {
newfile->ExtendSequenceRange(
WriteBatchInternal::Sequence(&rewrite_batch));
gc_stats->relocate_succeeded++;
gc_stats->num_keys_relocated++;
gc_stats->bytes_relocated += record.record_size();
} else if (rewrite_status.IsBusy()) {
// The key is overwritten in the meanwhile. Drop the blob record.
gc_stats->overwritten_while_relocate++;
gc_stats->num_keys_overwritten++;
gc_stats->bytes_overwritten += record.record_size();
} else {
// We hit an error.
s = rewrite_status;
......@@ -1864,17 +1869,34 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
ROCKS_LOG_INFO(
db_options_.info_log,
"%s blob file %" PRIu64
". Total blob records: %" PRIu64 ", Deletes: %" PRIu64 "/%" PRIu64
" succeeded, Relocates: %" PRIu64 "/%" PRIu64 " succeeded.",
"%s blob file %" PRIu64 ". Total blob records: %" PRIu64
", Expired: %" PRIu64 " keys/%" PRIu64 " bytes, Overwritten: %" PRIu64
" keys/%" PRIu64 " bytes.",
s.ok() ? "Successfully garbage collected" : "Failed to garbage collect",
bfptr->BlobFileNumber(), gc_stats->blob_count, gc_stats->delete_succeeded,
gc_stats->num_deletes, gc_stats->relocate_succeeded,
gc_stats->num_relocate);
bfptr->BlobFileNumber(), gc_stats->blob_count, gc_stats->num_keys_expired,
gc_stats->bytes_expired, gc_stats->num_keys_overwritten,
gc_stats->bytes_overwritten, gc_stats->num_keys_relocated,
gc_stats->bytes_relocated);
RecordTick(statistics_, BLOB_DB_GC_NUM_FILES);
RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_OVERWRITTEN,
gc_stats->num_keys_overwritten);
RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_EXPIRED,
gc_stats->num_keys_expired);
RecordTick(statistics_, BLOB_DB_GC_BYTES_OVERWRITTEN,
gc_stats->bytes_overwritten);
RecordTick(statistics_, BLOB_DB_GC_BYTES_EXPIRED, gc_stats->bytes_expired);
if (newfile != nullptr) {
total_blob_space_ += newfile->file_size_;
ROCKS_LOG_INFO(db_options_.info_log, "New blob file %" PRIu64 ".",
newfile->BlobFileNumber());
RecordTick(statistics_, BLOB_DB_GC_NUM_NEW_FILES);
RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_RELOCATED,
gc_stats->num_keys_relocated);
RecordTick(statistics_, BLOB_DB_GC_BYTES_RELOCATED,
gc_stats->bytes_relocated);
}
if (!s.ok()) {
RecordTick(statistics_, BLOB_DB_GC_FAILURES);
}
return s;
}
......@@ -2120,8 +2142,10 @@ std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {
if (bfile->gc_once_after_open_.load()) {
WriteLock lockbfile_w(&bfile->mutex_);
bfile->deleted_size_ = gc_stats.deleted_size;
bfile->deleted_count_ = gc_stats.num_deletes;
bfile->deleted_size_ =
gc_stats.bytes_overwritten + gc_stats.bytes_expired;
bfile->deleted_count_ =
gc_stats.num_keys_overwritten + gc_stats.num_keys_expired;
bfile->gc_once_after_open_ = false;
}
}
......@@ -2144,7 +2168,7 @@ Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
auto* iter = db_impl_->NewIteratorImpl(
read_options, cfd, snapshot->GetSequenceNumber(),
nullptr /*read_callback*/, true /*allow_blob*/);
return new BlobDBIterator(own_snapshot, iter, this);
return new BlobDBIterator(own_snapshot, iter, this, env_, statistics_);
}
Status DestroyBlobDB(const std::string& dbname, const Options& options,
......
......@@ -24,6 +24,7 @@
#include "rocksdb/db.h"
#include "rocksdb/listener.h"
#include "rocksdb/options.h"
#include "rocksdb/statistics.h"
#include "rocksdb/wal_filter.h"
#include "util/mpsc.h"
#include "util/mutexlock.h"
......@@ -135,16 +136,12 @@ struct blobf_compare_ttl {
struct GCStats {
uint64_t blob_count = 0;
uint64_t num_deletes = 0;
uint64_t deleted_size = 0;
uint64_t retry_delete = 0;
uint64_t delete_succeeded = 0;
uint64_t overwritten_while_delete = 0;
uint64_t num_relocate = 0;
uint64_t retry_relocate = 0;
uint64_t relocate_succeeded = 0;
uint64_t overwritten_while_relocate = 0;
std::shared_ptr<BlobFile> newfile = nullptr;
uint64_t num_keys_overwritten = 0;
uint64_t num_keys_expired = 0;
uint64_t num_keys_relocated = 0;
uint64_t bytes_overwritten = 0;
uint64_t bytes_expired = 0;
uint64_t bytes_relocated = 0;
};
/**
......@@ -178,10 +175,6 @@ class BlobDBImpl : public BlobDB {
// how many periods of stats do we keep.
static constexpr uint32_t kWriteAmplificationStatsPeriods = 24;
// what is the length of any period
static constexpr uint32_t kWriteAmplificationStatsPeriodMillisecs =
3600 * 1000;
// we will garbage collect blob files in
// which entire files have expired. However if the
// ttl_range of files is very large say a day, we
......@@ -292,6 +285,10 @@ class BlobDBImpl : public BlobDB {
// Return true if a snapshot is created.
bool SetSnapshotIfNeeded(ReadOptions* read_options);
Status GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value);
Status GetBlobValue(const Slice& key, const Slice& index_entry,
PinnableSlice* value);
......@@ -364,9 +361,6 @@ class BlobDBImpl : public BlobDB {
// efficiency
std::pair<bool, int64_t> ReclaimOpenFiles(bool aborted);
// periodically print write amplification statistics
std::pair<bool, int64_t> WaStats(bool aborted);
// background task to do book-keeping of deleted keys
std::pair<bool, int64_t> EvictDeletions(bool aborted);
......@@ -444,6 +438,9 @@ class BlobDBImpl : public BlobDB {
DBOptions db_options_;
EnvOptions env_options_;
// Raw pointer of statistic. db_options_ has a shared_ptr to hold ownership.
Statistics* statistics_;
// name of the database directory
std::string dbname_;
......@@ -519,18 +516,6 @@ class BlobDBImpl : public BlobDB {
// counter is used to monitor and close excess RA files.
std::atomic<uint32_t> open_file_count_;
// should hold mutex to modify
// STATISTICS for WA of Blob Files due to GC
// collect by default 24 hourly periods
std::list<uint64_t> all_periods_write_;
std::list<uint64_t> all_periods_ampl_;
std::atomic<uint64_t> last_period_write_;
std::atomic<uint64_t> last_period_ampl_;
uint64_t total_periods_write_;
uint64_t total_periods_ampl_;
// total size of all blob files at a given time
std::atomic<uint64_t> total_blob_space_;
std::list<std::shared_ptr<BlobFile>> obsolete_files_;
......
......@@ -6,7 +6,9 @@
#pragma once
#ifndef ROCKSDB_LITE
#include "monitoring/statistics.h"
#include "rocksdb/iterator.h"
#include "util/stop_watch.h"
#include "utilities/blob_db/blob_db_impl.h"
namespace rocksdb {
......@@ -17,8 +19,12 @@ using rocksdb::ManagedSnapshot;
class BlobDBIterator : public Iterator {
public:
BlobDBIterator(ManagedSnapshot* snapshot, ArenaWrappedDBIter* iter,
BlobDBImpl* blob_db)
: snapshot_(snapshot), iter_(iter), blob_db_(blob_db) {}
BlobDBImpl* blob_db, Env* env, Statistics* statistics)
: snapshot_(snapshot),
iter_(iter),
blob_db_(blob_db),
env_(env),
statistics_(statistics) {}
virtual ~BlobDBIterator() = default;
......@@ -37,33 +43,45 @@ class BlobDBIterator : public Iterator {
}
void SeekToFirst() override {
StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_SEEK);
iter_->SeekToFirst();
UpdateBlobValue();
}
void SeekToLast() override {
StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_SEEK);
iter_->SeekToLast();
UpdateBlobValue();
}
void Seek(const Slice& target) override {
StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_SEEK);
iter_->Seek(target);
UpdateBlobValue();
}
void SeekForPrev(const Slice& target) override {
StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_SEEK);
iter_->SeekForPrev(target);
UpdateBlobValue();
}
void Next() override {
assert(Valid());
StopWatch next_sw(env_, statistics_, BLOB_DB_NEXT_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_NEXT);
iter_->Next();
UpdateBlobValue();
}
void Prev() override {
assert(Valid());
StopWatch prev_sw(env_, statistics_, BLOB_DB_PREV_MICROS);
RecordTick(statistics_, BLOB_DB_NUM_PREV);
iter_->Prev();
UpdateBlobValue();
}
......@@ -96,6 +114,8 @@ class BlobDBIterator : public Iterator {
std::unique_ptr<ManagedSnapshot> snapshot_;
std::unique_ptr<ArenaWrappedDBIter> iter_;
BlobDBImpl* blob_db_;
Env* env_;
Statistics* statistics_;
Status status_;
PinnableSlice value_;
};
......
......@@ -260,8 +260,8 @@ TEST_F(BlobDBTest, PutWithTTL) {
ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
ASSERT_EQ(data.size(), gc_stats.num_relocate);
ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired);
ASSERT_EQ(data.size(), gc_stats.num_keys_relocated);
VerifyDB(data);
}
......@@ -290,8 +290,8 @@ TEST_F(BlobDBTest, PutUntil) {
ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
ASSERT_EQ(data.size(), gc_stats.num_relocate);
ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired);
ASSERT_EQ(data.size(), gc_stats.num_keys_relocated);
VerifyDB(data);
}
......@@ -323,8 +323,8 @@ TEST_F(BlobDBTest, TTLExtrator_NoTTL) {
ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(0, gc_stats.num_deletes);
ASSERT_EQ(100, gc_stats.num_relocate);
ASSERT_EQ(0, gc_stats.num_keys_expired);
ASSERT_EQ(100, gc_stats.num_keys_relocated);
VerifyDB(data);
}
......@@ -370,8 +370,8 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) {
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
ASSERT_EQ(data.size(), gc_stats.num_relocate);
ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired);
ASSERT_EQ(data.size(), gc_stats.num_keys_relocated);
VerifyDB(data);
}
......@@ -418,8 +418,8 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) {
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
ASSERT_EQ(data.size(), gc_stats.num_relocate);
ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired);
ASSERT_EQ(data.size(), gc_stats.num_keys_relocated);
VerifyDB(data);
}
......@@ -475,8 +475,8 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) {
ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
ASSERT_EQ(data.size(), gc_stats.num_relocate);
ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired);
ASSERT_EQ(data.size(), gc_stats.num_keys_relocated);
VerifyDB(data);
}
......@@ -675,8 +675,8 @@ TEST_F(BlobDBTest, GCAfterOverwriteKeys) {
GCStats gc_stats;
ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(200, gc_stats.blob_count);
ASSERT_EQ(0, gc_stats.num_deletes);
ASSERT_EQ(200 - new_keys, gc_stats.num_relocate);
ASSERT_EQ(0, gc_stats.num_keys_expired);
ASSERT_EQ(200 - new_keys, gc_stats.num_keys_relocated);
VerifyDB(data);
}
......@@ -704,10 +704,9 @@ TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) {
GCStats gc_stats;
ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(1, gc_stats.blob_count);
ASSERT_EQ(0, gc_stats.num_deletes);
ASSERT_EQ(1, gc_stats.num_relocate);
ASSERT_EQ(0, gc_stats.relocate_succeeded);
ASSERT_EQ(1, gc_stats.overwritten_while_relocate);
ASSERT_EQ(0, gc_stats.num_keys_expired);
ASSERT_EQ(1, gc_stats.num_keys_overwritten);
ASSERT_EQ(0, gc_stats.num_keys_relocated);
writer.join();
VerifyDB({{"foo", "v2"}});
}
......@@ -741,10 +740,8 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
GCStats gc_stats;
ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(1, gc_stats.blob_count);
ASSERT_EQ(1, gc_stats.num_deletes);
ASSERT_EQ(0, gc_stats.delete_succeeded);
ASSERT_EQ(1, gc_stats.overwritten_while_delete);
ASSERT_EQ(0, gc_stats.num_relocate);
ASSERT_EQ(1, gc_stats.num_keys_expired);
ASSERT_EQ(0, gc_stats.num_keys_relocated);
writer.join();
VerifyDB({{"foo", "v2"}});
}
......@@ -838,8 +835,7 @@ TEST_F(BlobDBTest, ReadWhileGC) {
GCStats gc_stats;
ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
ASSERT_EQ(1, gc_stats.blob_count);
ASSERT_EQ(1, gc_stats.num_relocate);
ASSERT_EQ(1, gc_stats.relocate_succeeded);
ASSERT_EQ(1, gc_stats.num_keys_relocated);
blob_db_impl()->TEST_DeleteObsoleteFiles();
// The file shouln't be deleted
blob_files = blob_db_impl()->TEST_GetBlobFiles();
......@@ -904,11 +900,11 @@ TEST_F(BlobDBTest, SnapshotAndGarbageCollection) {
ASSERT_TRUE(bfile->Obsolete());
ASSERT_EQ(1, gc_stats.blob_count);
if (delete_key) {
ASSERT_EQ(0, gc_stats.num_relocate);
ASSERT_EQ(0, gc_stats.num_keys_relocated);
ASSERT_EQ(bfile->GetSequenceRange().second + 1,
bfile->GetObsoleteSequence());
} else {
ASSERT_EQ(1, gc_stats.num_relocate);
ASSERT_EQ(1, gc_stats.num_keys_relocated);
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
bfile->GetObsoleteSequence());
}
......
......@@ -100,8 +100,8 @@ std::shared_ptr<Reader> BlobFile::OpenSequentialReader(
std::unique_ptr<SequentialFileReader> sfile_reader;
sfile_reader.reset(new SequentialFileReader(std::move(sfile)));
std::shared_ptr<Reader> log_reader =
std::make_shared<Reader>(db_options.info_log, std::move(sfile_reader));
std::shared_ptr<Reader> log_reader = std::make_shared<Reader>(
std::move(sfile_reader), db_options.env, db_options.statistics.get());
return log_reader;
}
......
......@@ -111,6 +111,8 @@ struct BlobLogRecord {
std::string key_buf;
std::string value_buf;
uint64_t record_size() const { return kHeaderSize + key_size + value_size; }
void EncodeHeaderTo(std::string* dst);
Status DecodeHeaderFrom(Slice src);
......
......@@ -9,22 +9,30 @@
#include <algorithm>
#include "monitoring/statistics.h"
#include "util/file_reader_writer.h"
#include "util/stop_watch.h"
namespace rocksdb {
namespace blob_db {
Reader::Reader(std::shared_ptr<Logger> info_log,
unique_ptr<SequentialFileReader>&& _file)
: info_log_(info_log), file_(std::move(_file)), buffer_(), next_byte_(0) {}
Reader::Reader(unique_ptr<SequentialFileReader>&& file_reader, Env* env,
Statistics* statistics)
: file_(std::move(file_reader)),
env_(env),
statistics_(statistics),
buffer_(),
next_byte_(0) {}
Status Reader::ReadSlice(uint64_t size, Slice* slice, std::string* buf) {
StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
buf->reserve(size);
Status s = file_->Read(size, slice, &(*buf)[0]);
next_byte_ += size;
if (!s.ok()) {
return s;
}
RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, slice->size());
if (slice->size() != size) {
return Status::Corruption("EOF reached while reading record");
}
......
......@@ -10,7 +10,9 @@
#include <memory>
#include <string>
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "utilities/blob_db/blob_log_format.h"
......@@ -37,17 +39,8 @@ class Reader {
// Create a reader that will return log records from "*file".
// "*file" must remain live while this Reader is in use.
//
// If "reporter" is non-nullptr, it is notified whenever some data is
// dropped due to a detected corruption. "*reporter" must remain
// live while this Reader is in use.
//
// If "checksum" is true, verify checksums if available.
//
// The Reader will start reading at the first record located at physical
// position >= initial_offset within the file.
Reader(std::shared_ptr<Logger> info_log,
std::unique_ptr<SequentialFileReader>&& file);
Reader(std::unique_ptr<SequentialFileReader>&& file_reader, Env* env,
Statistics* statistics);
~Reader() = default;
......@@ -68,17 +61,14 @@ class Reader {
Status ReadSlice(uint64_t size, Slice* slice, std::string* buf);
SequentialFileReader* file() { return file_.get(); }
void ResetNextByte() { next_byte_ = 0; }
uint64_t GetNextByte() const { return next_byte_; }
const SequentialFileReader* file_reader() const { return file_.get(); }
private:
std::shared_ptr<Logger> info_log_;
const std::unique_ptr<SequentialFileReader> file_;
Env* env_;
Statistics* statistics_;
std::string backing_store_;
Slice buffer_;
......
......@@ -8,17 +8,23 @@
#include <cstdint>
#include <string>
#include "monitoring/statistics.h"
#include "rocksdb/env.h"
#include "util/coding.h"
#include "util/file_reader_writer.h"
#include "util/stop_watch.h"
#include "utilities/blob_db/blob_log_format.h"
namespace rocksdb {
namespace blob_db {
Writer::Writer(unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
uint64_t bpsync, bool use_fs, uint64_t boffset)
Writer::Writer(unique_ptr<WritableFileWriter>&& dest, Env* env,
Statistics* statistics, uint64_t log_number, uint64_t bpsync,
bool use_fs, uint64_t boffset)
: dest_(std::move(dest)),
env_(env),
statistics_(statistics),
log_number_(log_number),
block_offset_(boffset),
bytes_per_sync_(bpsync),
......@@ -26,7 +32,11 @@ Writer::Writer(unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
use_fsync_(use_fs),
last_elem_type_(kEtNone) {}
void Writer::Sync() { dest_->Sync(use_fsync_); }
void Writer::Sync() {
StopWatch sync_sw(env_, statistics_, BLOB_DB_BLOB_FILE_SYNC_MICROS);
dest_->Sync(use_fsync_);
RecordTick(statistics_, BLOB_DB_BLOB_FILE_SYNCED);
}
Status Writer::WriteHeader(BlobLogHeader& header) {
assert(block_offset_ == 0);
......@@ -40,6 +50,8 @@ Status Writer::WriteHeader(BlobLogHeader& header) {
s = dest_->Flush();
}
last_elem_type_ = kEtFileHdr;
RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
BlobLogHeader::kSize);
return s;
}
......@@ -58,6 +70,8 @@ Status Writer::AppendFooter(BlobLogFooter& footer) {
}
last_elem_type_ = kEtFileFooter;
RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
BlobLogFooter::kSize);
return s;
}
......@@ -98,6 +112,7 @@ void Writer::ConstructBlobHeader(std::string* buf, const Slice& key,
Status Writer::EmitPhysicalRecord(const std::string& headerbuf,
const Slice& key, const Slice& val,
uint64_t* key_offset, uint64_t* blob_offset) {
StopWatch write_sw(env_, statistics_, BLOB_DB_BLOB_FILE_WRITE_MICROS);
Status s = dest_->Append(Slice(headerbuf));
if (s.ok()) {
s = dest_->Append(key);
......@@ -113,6 +128,8 @@ Status Writer::EmitPhysicalRecord(const std::string& headerbuf,
*blob_offset = *key_offset + key.size();
block_offset_ = *blob_offset + val.size();
last_elem_type_ = kEtRecord;
RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
BlobLogRecord::kHeaderSize + key.size() + val.size());
return s;
}
......
......@@ -10,7 +10,9 @@
#include <memory>
#include <string>
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "utilities/blob_db/blob_log_format.h"
......@@ -34,9 +36,9 @@ class Writer {
// Create a writer that will append data to "*dest".
// "*dest" must be initially empty.
// "*dest" must remain live while this Writer is in use.
explicit Writer(std::unique_ptr<WritableFileWriter>&& dest,
uint64_t log_number, uint64_t bpsync, bool use_fsync,
uint64_t boffset = 0);
Writer(std::unique_ptr<WritableFileWriter>&& dest, Env* env,
Statistics* statistics, uint64_t log_number, uint64_t bpsync,
bool use_fsync, uint64_t boffset = 0);
~Writer() = default;
......@@ -75,6 +77,8 @@ class Writer {
private:
std::unique_ptr<WritableFileWriter> dest_;
Env* env_;
Statistics* statistics_;
uint64_t log_number_;
uint64_t block_offset_; // Current offset in block
uint64_t bytes_per_sync_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册