提交 465b9103 编写于 作者: V Vamsi Ponnekanti

[Add a second kind of verification to db_stress

Summary:
Currently the test tracks all writes in memory and
uses it for verification at the end. This has 4 problems:
(a) It needs mutex for each write to ensure in-memory update
and leveldb update are done atomically. This slows down the
benchmark.
(b) Verification phase at the end is time consuming as well
(c) Does not test batch writes or snapshots
(d) We cannot kill the test and restart multiple times in a
loop because in-memory state will be lost.

I am adding a FLAGS_multi that does MultiGet/MultiPut/MultiDelete
instead of get/put/delete to get/put/delete a group of related
keys with same values atomically. Every get retrieves the group
of keys and checks that their values are same. This does not have
the above problems but the downside is that it does less amount
of validation than the other approach.

Test Plan:
This whole this is a test! Here is a small run. I am doing larger run now.

[nponnekanti@dev902 /data/users/nponnekanti/rocksdb] ./db_stress --ops_per_thread=10000 --multi=1 --ops_per_key=25
LevelDB version     : 1.5
Number of threads   : 32
Ops per thread      : 10000
Read percentage     : 10
Delete percentage   : 30
Max key             : 2147483648
Num times DB reopens: 10
Num keys per lock   : 4
Compression         : snappy
------------------------------------------------
Creating 536870912 locks
2013/02/20-16:59:32  Starting database operations
Created bg thread 0x7f9ebcfff700
2013/02/20-16:59:37  Reopening database for the 1th time
2013/02/20-16:59:46  Reopening database for the 2th time
2013/02/20-16:59:57  Reopening database for the 3th time
2013/02/20-17:00:11  Reopening database for the 4th time
2013/02/20-17:00:25  Reopening database for the 5th time
2013/02/20-17:00:36  Reopening database for the 6th time
2013/02/20-17:00:47  Reopening database for the 7th time
2013/02/20-17:00:59  Reopening database for the 8th time
2013/02/20-17:01:10  Reopening database for the 9th time
2013/02/20-17:01:20  Reopening database for the 10th time
2013/02/20-17:01:31  Reopening database for the 11th time
2013/02/20-17:01:31  Starting verification
Stress Test : 109.125 micros/op 22191 ops/sec
            : Wrote 0.00 MB (0.23 MB/sec) (59% of 32 ops)
            : Deleted 10 times
2013/02/20-17:01:31  Verification successful

Revert Plan: OK

Task ID: #

Reviewers: dhruba, emayanke

Reviewed By: emayanke

CC: leveldb

Differential Revision: https://reviews.facebook.net/D8733
上级 959337ed
......@@ -39,6 +39,16 @@ static uint32_t FLAGS_seed = 2341234;
// Max number of key/values to place in database
static long FLAGS_max_key = 2 * KB * KB * KB;
// If set, the test uses MultiGet, MultiPut and MultiDelete that
// do a different kind of validation during the test itself,
// rather than at the end. This is meant to solve the following
// problems at the expense of doing less degree of validation.
// (a) No need to acquire mutexes during writes (less cache flushes
// in multi-core leading to speed up)
// (b) No long validation at the end (more speed up)
// (c) Also test snapshot and atomicity of batch writes
static bool FLAGS_test_batches_snapshots = false;
// Number of concurrent threads to run.
static int FLAGS_threads = 32;
......@@ -165,6 +175,8 @@ class Stats {
long done_;
long writes_;
long deletes_;
long founds_;
long errors_;
int next_report_;
size_t bytes_;
double last_op_finish_;
......@@ -179,6 +191,8 @@ class Stats {
done_ = 0;
writes_ = 0;
deletes_ = 0;
founds_ = 0;
errors_ = 0;
bytes_ = 0;
seconds_ = 0;
start_ = FLAGS_env->NowMicros();
......@@ -228,13 +242,21 @@ class Stats {
}
}
void AddBytesForOneWrite(size_t n) {
writes_ ++;
bytes_ += n;
void AddBytesForWrites(int nwrites, size_t nbytes) {
writes_ += nwrites;
bytes_ += nbytes;
}
void AddDeletes(int n) {
deletes_ += n;
}
void AddOneDelete() {
deletes_ ++;
void AddFounds(int n) {
founds_ += n;
}
void AddErrors(int n) {
errors_ += n;
}
void Report(const char* name) {
......@@ -255,6 +277,8 @@ class Stats {
fprintf(stdout, "%-12s: Wrote %.2f MB (%.2f MB/sec) (%ld%% of %ld ops)\n",
"", bytes_mb, rate, (100*writes_)/done_, done_);
fprintf(stdout, "%-12s: Deleted %ld times\n", "", deletes_);
fprintf(stdout, "%-12s: Found lookups %ld times\n", "", founds_);
fprintf(stdout, "%-12s: Got errors %ld times\n", "", errors_);
if (FLAGS_histogram) {
fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
......@@ -281,10 +305,17 @@ class SharedState {
start_(false),
start_verify_(false),
stress_test_(stress_test) {
if (FLAGS_test_batches_snapshots) {
key_locks_ = nullptr;
values_ = nullptr;
fprintf(stdout, "No lock creation because test_batches_snapshots set\n");
return;
}
values_ = new uint32_t[max_key_];
for (long i = 0; i < max_key_; i++) {
values_[i] = SENTINEL;
}
long num_locks = (max_key_ >> log2_keys_per_lock_);
if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) {
num_locks ++;
......@@ -533,7 +564,10 @@ class StressTest {
}
}
thread->shared->GetStressTest()->VerifyDb(*(thread->shared), thread->tid);
if (!FLAGS_test_batches_snapshots) {
thread->shared->GetStressTest()->VerifyDb(*(thread->shared),
thread->tid);
}
{
MutexLock l(shared->GetMutex());
......@@ -545,6 +579,119 @@ class StressTest {
}
// Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ...
// ("9"+K, "9"+V) in DB atomically i.e in a single batch.
// Also refer MultiGet.
Status MultiPut(ThreadState* thread,
const WriteOptions& writeoptions,
const Slice& key, const Slice& value, size_t sz) {
std::string keys[10] = {"9", "8", "7", "6", "5",
"4", "3", "2", "1", "0"};
std::string values[10] = {"9", "8", "7", "6", "5",
"4", "3", "2", "1", "0"};
Slice value_slices[10];
WriteBatch batch;
Status s;
for (int i = 0; i < 10; i++) {
keys[i] += key.ToString();
values[i] += value.ToString();
value_slices[i] = values[i];
batch.Put(keys[i], value_slices[i]);
}
s = db_->Write(writeoptions, &batch);
if (!s.ok()) {
fprintf(stderr, "multiput error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
} else {
// we did 10 writes each of size sz + 1
thread->stats.AddBytesForWrites(10, (sz + 1) * 10);
}
return s;
}
// Given a key K, this deletes ("0"+K), ("1"+K),... ("9"+K)
// in DB atomically i.e in a single batch. Also refer MultiGet.
Status MultiDelete(ThreadState* thread,
const WriteOptions& writeoptions,
const Slice& key) {
std::string keys[10] = {"9", "7", "5", "3", "1",
"8", "6", "4", "2", "0"};
WriteBatch batch;
Status s;
for (int i = 0; i < 10; i++) {
keys[i] += key.ToString();
batch.Delete(keys[i]);
}
s = db_->Write(writeoptions, &batch);
if (!s.ok()) {
fprintf(stderr, "multidelete error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
} else {
thread->stats.AddDeletes(10);
}
return s;
}
// Given a key K, this gets values for "0"+K, "1"+K,..."9"+K
// in the same snapshot, and verifies that all the values are of the form
// "0"+V, "1"+V,..."9"+V.
// ASSUMES that MultiPut was used to put (K, V) into the DB.
Status MultiGet(ThreadState* thread,
const ReadOptions& readoptions,
const Slice& key, std::string* value) {
std::string keys[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
Slice key_slices[10];
std::string values[10];
ReadOptions readoptionscopy = readoptions;
readoptionscopy.snapshot = db_->GetSnapshot();
Status s;
for (int i = 0; i < 10; i++) {
keys[i] += key.ToString();
key_slices[i] = keys[i];
s = db_->Get(readoptionscopy, key_slices[i], value);
if (!s.ok() && !s.IsNotFound()) {
fprintf(stderr, "get error: %s\n", s.ToString().c_str());
values[i] = "";
thread->stats.AddErrors(1);
// we continue after error rather than exiting so that we can
// find more errors if any
} else if (s.IsNotFound()) {
values[i] = "";
} else {
values[i] = *value;
char expected_prefix = (keys[i])[0];
char actual_prefix = (values[i])[0];
if (actual_prefix != expected_prefix) {
fprintf(stderr, "expected prefix = %c actual = %c\n",
expected_prefix, actual_prefix);
}
(values[i])[0] = ' '; // blank out the differing character
thread->stats.AddFounds(1);
}
}
db_->ReleaseSnapshot(readoptionscopy.snapshot);
// Now that we retrieved all values, check that they all match
for (int i = 1; i < 10; i++) {
if (values[i] != values[0]) {
fprintf(stderr, "inconsistent values for key %s: %s, %s\n",
key.ToString().c_str(), values[0].c_str(),
values[i].c_str());
// we continue after error rather than exiting so that we can
// find more errors if any
}
}
return s;
}
void OperateDb(ThreadState* thread) {
ReadOptions read_opts(FLAGS_verify_checksum, true);
WriteOptions write_opts;
......@@ -573,36 +720,45 @@ class StressTest {
thread->stats.Start();
}
}
long rand_key = thread->rand.Next() % max_key;
Slice key((char*)&rand_key, sizeof(rand_key));
//Read:10%;Delete:30%;Write:60%
unsigned int probability_operation = thread->rand.Uniform(100);
if (probability_operation < FLAGS_readpercent) {
// read load
db_->Get(read_opts, key, &from_db);
FLAGS_test_batches_snapshots ?
MultiGet(thread, read_opts, key, &from_db) :
db_->Get(read_opts, key, &from_db);
} else if (probability_operation < FLAGS_delpercent + FLAGS_readpercent) {
//introduce delete load
{
if (!FLAGS_test_batches_snapshots) {
MutexLock l(thread->shared->GetMutexForKey(rand_key));
thread->shared->Delete(rand_key);
db_->Delete(write_opts, key);
thread->stats.AddDeletes(1);
} else {
MultiDelete(thread, write_opts, key);
}
thread->stats.AddOneDelete();
} else {
// write load
uint32_t value_base = thread->rand.Next();
size_t sz = GenerateValue(value_base, value, sizeof(value));
Slice v(value, sz);
{
if (!FLAGS_test_batches_snapshots) {
MutexLock l(thread->shared->GetMutexForKey(rand_key));
if (FLAGS_verify_before_write) {
VerifyValue(rand_key, read_opts, *(thread->shared), &from_db, true);
}
thread->shared->Put(rand_key, value_base);
db_->Put(write_opts, key, v);
thread->stats.AddBytesForWrites(1, sz);
} else {
MultiPut(thread, write_opts, key, v, sz);
}
PrintKeyValue(rand_key, value, sz);
thread->stats.AddBytesForOneWrite(sz);
}
thread->stats.FinishedSingleOp();
}
......@@ -682,7 +838,11 @@ class StressTest {
fprintf(stdout, "Read percentage : %d\n", FLAGS_readpercent);
fprintf(stdout, "Delete percentage : %d\n", FLAGS_delpercent);
fprintf(stdout, "Max key : %ld\n", FLAGS_max_key);
fprintf(stdout, "Ratio #ops/#keys : %ld\n",
(FLAGS_ops_per_thread * FLAGS_threads)/FLAGS_max_key);
fprintf(stdout, "Num times DB reopens: %d\n", FLAGS_reopen);
fprintf(stdout, "Batches/snapshots : %d\n",
FLAGS_test_batches_snapshots);
fprintf(stdout, "Num keys per lock : %d\n",
1 << FLAGS_log2_keys_per_lock);
......@@ -807,6 +967,9 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--verify_before_write=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
FLAGS_verify_before_write = n;
} else if (sscanf(argv[i], "--test_batches_snapshots=%d%c", &n, &junk) == 1
&& (n == 0 || n == 1)) {
FLAGS_test_batches_snapshots = n;
} else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) {
FLAGS_threads = n;
} else if (sscanf(argv[i], "--value_size_mult=%d%c", &n, &junk) == 1) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册