提交 ad6aaf4f 编写于 作者: S Siying Dong

Merge pull request #848 from SherlockNoMad/db_bench

Split histogram per OperationType in db_bench
......@@ -39,6 +39,7 @@ int main() {
#include <condition_variable>
#include <mutex>
#include <thread>
#include <unordered_map>
#include "db/db_impl.h"
#include "db/version_set.h"
......@@ -741,11 +742,6 @@ DEFINE_bool(identity_as_first_hash, false, "the first hash function of cuckoo "
"table becomes an identity function. This is only valid when key "
"is 8 bytes");
enum PutOrMerge {
kPut,
kMerge
};
enum RepFactory {
kSkipList,
kPrefixHash,
......@@ -1163,6 +1159,35 @@ class ReporterAgent {
bool stop_;
};
enum OperationType : unsigned char {
kRead = 0,
kWrite,
kDelete,
kSeek,
kMerge,
kUpdate,
kCompress,
kUncompress,
kCrc,
kHash,
kOthers
};
static std::unordered_map<OperationType, std::string, std::hash<unsigned char>>
OperationTypeString = {
{kRead, "read"},
{kWrite, "write"},
{kDelete, "delete"},
{kSeek, "seek"},
{kMerge, "merge"},
{kUpdate, "update"},
{kCompress, "compress"},
{kCompress, "uncompress"},
{kCrc, "crc"},
{kHash, "hash"},
{kOthers, "op"}
};
class Stats {
private:
int id_;
......@@ -1175,7 +1200,8 @@ class Stats {
int64_t bytes_;
double last_op_finish_;
double last_report_finish_;
HistogramImpl hist_;
std::unordered_map<OperationType, HistogramImpl,
std::hash<unsigned char>> hist_;
std::string message_;
bool exclude_from_merge_;
ReporterAgent* reporter_agent_; // does not own
......@@ -1191,7 +1217,7 @@ class Stats {
id_ = id;
next_report_ = FLAGS_stats_interval ? FLAGS_stats_interval : 100;
last_op_finish_ = start_;
hist_.Clear();
hist_.clear();
done_ = 0;
last_report_done_ = 0;
bytes_ = 0;
......@@ -1208,7 +1234,15 @@ class Stats {
if (other.exclude_from_merge_)
return;
hist_.Merge(other.hist_);
for (auto it = other.hist_.begin(); it != other.hist_.end(); ++it) {
auto this_it = hist_.find(it->first);
if (this_it != hist_.end()) {
this_it->second.Merge(other.hist_.at(it->first));
} else {
hist_.insert({ it->first, it->second });
}
}
done_ += other.done_;
bytes_ += other.bytes_;
seconds_ += other.seconds_;
......@@ -1261,14 +1295,22 @@ class Stats {
}
}
void FinishedOps(DBWithColumnFamilies* db_with_cfh, DB* db, int64_t num_ops) {
void FinishedOps(DBWithColumnFamilies* db_with_cfh, DB* db, int64_t num_ops,
enum OperationType op_type = kOthers) {
if (reporter_agent_) {
reporter_agent_->ReportFinishedOps(num_ops);
}
if (FLAGS_histogram) {
double now = FLAGS_env->NowMicros();
double micros = now - last_op_finish_;
hist_.Add(micros);
if (hist_.find(op_type) == hist_.end())
{
HistogramImpl hist_temp;
hist_.insert({op_type, hist_temp});
}
hist_[op_type].Add(micros);
if (micros > 20000 && !FLAGS_stats_interval) {
fprintf(stderr, "long op: %.1f micros%30s\r", micros, "");
fflush(stderr);
......@@ -1397,7 +1439,11 @@ class Stats {
(extra.empty() ? "" : " "),
extra.c_str());
if (FLAGS_histogram) {
fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
for (auto it = hist_.begin(); it != hist_.end(); ++it) {
fprintf(stdout, "Microseconds per %s:\n%s\n",
OperationTypeString[it->first].c_str(),
it->second.ToString().c_str());
}
}
if (FLAGS_report_file_operations) {
ReportFileOpEnv* env = static_cast<ReportFileOpEnv*>(FLAGS_env);
......@@ -2138,7 +2184,7 @@ class Benchmark {
uint32_t crc = 0;
while (bytes < 500 * 1048576) {
crc = crc32c::Value(data.data(), size);
thread->stats.FinishedOps(nullptr, nullptr, 1);
thread->stats.FinishedOps(nullptr, nullptr, 1, kCrc);
bytes += size;
}
// Print so result is not dead
......@@ -2157,7 +2203,7 @@ class Benchmark {
unsigned int xxh32 = 0;
while (bytes < 500 * 1048576) {
xxh32 = XXH32(data.data(), size, 0);
thread->stats.FinishedOps(nullptr, nullptr, 1);
thread->stats.FinishedOps(nullptr, nullptr, 1, kHash);
bytes += size;
}
// Print so result is not dead
......@@ -2178,7 +2224,7 @@ class Benchmark {
ptr = ap.load(std::memory_order_acquire);
}
count++;
thread->stats.FinishedOps(nullptr, nullptr, 1);
thread->stats.FinishedOps(nullptr, nullptr, 1, kOthers);
}
if (ptr == nullptr) exit(1); // Disable unused variable warning.
}
......@@ -2196,7 +2242,7 @@ class Benchmark {
ok = CompressSlice(input, &compressed);
produced += compressed.size();
bytes += input.size();
thread->stats.FinishedOps(nullptr, nullptr, 1);
thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress);
}
if (!ok) {
......@@ -2257,7 +2303,7 @@ class Benchmark {
}
delete[] uncompressed;
bytes += input.size();
thread->stats.FinishedOps(nullptr, nullptr, 1);
thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
}
if (!ok) {
......@@ -2788,7 +2834,7 @@ class Benchmark {
}
s = db_with_cfh->db->Write(write_options_, &batch);
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db,
entries_per_batch_);
entries_per_batch_, kWrite);
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
......@@ -2816,7 +2862,7 @@ class Benchmark {
int64_t bytes = 0;
for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
bytes += iter->key().size() + iter->value().size();
thread->stats.FinishedOps(nullptr, db, 1);
thread->stats.FinishedOps(nullptr, db, 1, kRead);
++i;
}
delete iter;
......@@ -2839,7 +2885,7 @@ class Benchmark {
int64_t bytes = 0;
for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
bytes += iter->key().size() + iter->value().size();
thread->stats.FinishedOps(nullptr, db, 1);
thread->stats.FinishedOps(nullptr, db, 1, kRead);
++i;
}
delete iter;
......@@ -2879,7 +2925,7 @@ class Benchmark {
++nonexist;
}
}
thread->stats.FinishedOps(nullptr, db, 100);
thread->stats.FinishedOps(nullptr, db, 100, kRead);
} while (!duration.Done(100));
char msg[100];
......@@ -2947,7 +2993,7 @@ class Benchmark {
fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str());
abort();
}
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1);
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
}
char msg[100];
......@@ -2995,7 +3041,7 @@ class Benchmark {
abort();
}
}
thread->stats.FinishedOps(nullptr, db, entries_per_batch_);
thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kRead);
}
char msg[100];
......@@ -3011,7 +3057,7 @@ class Benchmark {
DB* db = SelectDB(thread);
Iterator* iter = db->NewIterator(options);
delete iter;
thread->stats.FinishedOps(nullptr, db, 1);
thread->stats.FinishedOps(nullptr, db, 1, kOthers);
}
}
......@@ -3019,7 +3065,7 @@ class Benchmark {
if (thread->tid > 0) {
IteratorCreation(thread);
} else {
BGWriter(thread, kPut);
BGWriter(thread, kWrite);
}
}
......@@ -3088,7 +3134,7 @@ class Benchmark {
assert(iter_to_use->status().ok());
}
thread->stats.FinishedOps(&db_, db_.db, 1);
thread->stats.FinishedOps(&db_, db_.db, 1, kSeek);
}
delete single_iter;
for (auto iter : multi_iters) {
......@@ -3109,7 +3155,7 @@ class Benchmark {
if (thread->tid > 0) {
SeekRandom(thread);
} else {
BGWriter(thread, kPut);
BGWriter(thread, kWrite);
}
}
......@@ -3137,7 +3183,7 @@ class Benchmark {
batch.Delete(key);
}
auto s = db->Write(write_options_, &batch);
thread->stats.FinishedOps(nullptr, db, entries_per_batch_);
thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kDelete);
if (!s.ok()) {
fprintf(stderr, "del error: %s\n", s.ToString().c_str());
exit(1);
......@@ -3158,7 +3204,7 @@ class Benchmark {
if (thread->tid > 0) {
ReadRandom(thread);
} else {
BGWriter(thread, kPut);
BGWriter(thread, kWrite);
}
}
......@@ -3170,7 +3216,7 @@ class Benchmark {
}
}
void BGWriter(ThreadState* thread, enum PutOrMerge write_merge) {
void BGWriter(ThreadState* thread, enum OperationType write_merge) {
// Special thread that keeps writing until other threads are done.
RandomGenerator gen;
double last = FLAGS_env->NowMicros();
......@@ -3203,7 +3249,7 @@ class Benchmark {
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
Status s;
if (write_merge == kPut) {
if (write_merge == kWrite) {
s = db->Put(write_options_, key, gen.Generate(value_size_));
} else {
s = db->Merge(write_options_, key, gen.Generate(value_size_));
......@@ -3214,7 +3260,7 @@ class Benchmark {
exit(1);
}
bytes += key.size() + value_size_;
thread->stats.FinishedOps(&db_, db_.db, 1);
thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
++num_writes;
if (writes_per_second_by_10 && num_writes >= writes_per_second_by_10) {
......@@ -3355,6 +3401,7 @@ class Benchmark {
}
get_weight--;
gets_done++;
thread->stats.FinishedOps(&db_, db_.db, 1, kRead);
} else if (put_weight > 0) {
// then do all the corresponding number of puts
// for all the gets we have done earlier
......@@ -3365,6 +3412,7 @@ class Benchmark {
}
put_weight--;
puts_done++;
thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
} else if (delete_weight > 0) {
Status s = DeleteMany(db, write_options_, key);
if (!s.ok()) {
......@@ -3373,9 +3421,8 @@ class Benchmark {
}
delete_weight--;
deletes_done++;
thread->stats.FinishedOps(&db_, db_.db, 1, kDelete);
}
thread->stats.FinishedOps(&db_, db_.db, 1);
}
char msg[100];
snprintf(msg, sizeof(msg),
......@@ -3422,6 +3469,7 @@ class Benchmark {
}
get_weight--;
reads_done++;
thread->stats.FinishedOps(nullptr, db, 1, kRead);
} else if (put_weight > 0) {
// then do all the corresponding number of puts
// for all the gets we have done earlier
......@@ -3432,8 +3480,8 @@ class Benchmark {
}
put_weight--;
writes_done++;
thread->stats.FinishedOps(nullptr, db, 1, kWrite);
}
thread->stats.FinishedOps(nullptr, db, 1);
}
char msg[100];
snprintf(msg, sizeof(msg), "( reads:%" PRIu64 " writes:%" PRIu64 \
......@@ -3475,7 +3523,7 @@ class Benchmark {
exit(1);
}
bytes += key.size() + value_size_;
thread->stats.FinishedOps(nullptr, db, 1);
thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
}
char msg[100];
snprintf(msg, sizeof(msg),
......@@ -3530,7 +3578,7 @@ class Benchmark {
exit(1);
}
bytes += key.size() + value.size();
thread->stats.FinishedOps(nullptr, db, 1);
thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
}
char msg[100];
......@@ -3568,7 +3616,7 @@ class Benchmark {
exit(1);
}
bytes += key.size() + value_size_;
thread->stats.FinishedOps(nullptr, db, 1);
thread->stats.FinishedOps(nullptr, db, 1, kMerge);
}
// Print some statistics
......@@ -3610,9 +3658,8 @@ class Benchmark {
fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
exit(1);
}
num_merges++;
thread->stats.FinishedOps(nullptr, db, 1, kMerge);
} else {
Status s = db->Get(options, key, &value);
if (value.length() > max_length)
......@@ -3625,12 +3672,9 @@ class Benchmark {
} else if (!s.IsNotFound()) {
num_hits++;
}
num_gets++;
thread->stats.FinishedOps(nullptr, db, 1, kRead);
}
thread->stats.FinishedOps(nullptr, db, 1);
}
char msg[100];
......@@ -3657,7 +3701,7 @@ class Benchmark {
GenerateKeyFromInt(i, FLAGS_num, &key);
iter->Seek(key);
assert(iter->Valid() && iter->key() == key);
thread->stats.FinishedOps(nullptr, db, 1);
thread->stats.FinishedOps(nullptr, db, 1, kSeek);
for (int j = 0; j < FLAGS_seek_nexts && i + 1 < FLAGS_num; ++j) {
if (!FLAGS_reverse_iterator) {
......@@ -3667,12 +3711,12 @@ class Benchmark {
}
GenerateKeyFromInt(++i, FLAGS_num, &key);
assert(iter->Valid() && iter->key() == key);
thread->stats.FinishedOps(nullptr, db, 1);
thread->stats.FinishedOps(nullptr, db, 1, kSeek);
}
iter->Seek(key);
assert(iter->Valid() && iter->key() == key);
thread->stats.FinishedOps(nullptr, db, 1);
thread->stats.FinishedOps(nullptr, db, 1, kSeek);
}
}
......@@ -3834,7 +3878,7 @@ class Benchmark {
}
if (!failed) {
thread->stats.FinishedOps(nullptr, db, 1);
thread->stats.FinishedOps(nullptr, db, 1, kOthers);
}
transactions_done++;
......@@ -3960,7 +4004,7 @@ class Benchmark {
exit(1);
}
thread->stats.FinishedOps(nullptr, db, 1);
thread->stats.FinishedOps(nullptr, db, 1, kOthers);
}
char msg[200];
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册