提交 687119ae 编写于 作者: A anand76 提交者: Facebook Github Bot

Variable key length in db_stress (#6273)

Summary:
Undo https://github.com/facebook/rocksdb/issues/6243 and fix the crash test failures.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6273

Test Plan: Run make ubsan_crash_test

Differential Revision: D19331472

Pulled By: anand1976

fbshipit-source-id: 30aa4a36c1b0f77a97159d82bbfd1cd767878e28
上级 6a998938
...@@ -247,7 +247,9 @@ class CfConsistencyStressTest : public StressTest { ...@@ -247,7 +247,9 @@ class CfConsistencyStressTest : public StressTest {
std::string upper_bound; std::string upper_bound;
Slice ub_slice; Slice ub_slice;
ReadOptions ro_copy = readoptions; ReadOptions ro_copy = readoptions;
if (thread->rand.OneIn(2) && GetNextPrefix(prefix, &upper_bound)) { // Get the next prefix first and then see if we want to set upper bound.
// We'll use the next prefix in an assertion later on
if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
ub_slice = Slice(upper_bound); ub_slice = Slice(upper_bound);
ro_copy.iterate_upper_bound = &ub_slice; ro_copy.iterate_upper_bound = &ub_slice;
} }
...@@ -255,13 +257,13 @@ class CfConsistencyStressTest : public StressTest { ...@@ -255,13 +257,13 @@ class CfConsistencyStressTest : public StressTest {
column_families_[rand_column_families[thread->rand.Next() % column_families_[rand_column_families[thread->rand.Next() %
rand_column_families.size()]]; rand_column_families.size()]];
Iterator* iter = db_->NewIterator(ro_copy, cfh); Iterator* iter = db_->NewIterator(ro_copy, cfh);
long count = 0; unsigned long count = 0;
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
iter->Next()) { iter->Next()) {
++count; ++count;
} }
assert(prefix_to_use == 0 || assert(prefix_to_use == 0 ||
count <= (static_cast<long>(1) << ((8 - prefix_to_use) * 8))); count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
Status s = iter->status(); Status s = iter->status();
if (s.ok()) { if (s.ok()) {
thread->stats.AddPrefixes(1, count); thread->stats.AddPrefixes(1, count);
......
...@@ -82,6 +82,9 @@ DECLARE_uint64(seed); ...@@ -82,6 +82,9 @@ DECLARE_uint64(seed);
DECLARE_bool(read_only); DECLARE_bool(read_only);
DECLARE_int64(max_key); DECLARE_int64(max_key);
DECLARE_double(hot_key_alpha); DECLARE_double(hot_key_alpha);
DECLARE_int32(max_key_len);
DECLARE_string(key_len_percent_dist);
DECLARE_int32(key_window_scale_factor);
DECLARE_int32(column_families); DECLARE_int32(column_families);
DECLARE_string(options_file); DECLARE_string(options_file);
DECLARE_int64(active_width); DECLARE_int64(active_width);
...@@ -352,7 +355,7 @@ inline bool GetNextPrefix(const rocksdb::Slice& src, std::string* v) { ...@@ -352,7 +355,7 @@ inline bool GetNextPrefix(const rocksdb::Slice& src, std::string* v) {
#endif #endif
// convert long to a big-endian slice key // convert long to a big-endian slice key
extern inline std::string Key(int64_t val) { extern inline std::string GetStringFromInt(int64_t val) {
std::string little_endian_key; std::string little_endian_key;
std::string big_endian_key; std::string big_endian_key;
PutFixed64(&little_endian_key, val); PutFixed64(&little_endian_key, val);
...@@ -364,16 +367,107 @@ extern inline std::string Key(int64_t val) { ...@@ -364,16 +367,107 @@ extern inline std::string Key(int64_t val) {
return big_endian_key; return big_endian_key;
} }
// A struct for maintaining the parameters for generating variable length keys
struct KeyGenContext {
// Number of adjacent keys in one cycle of key lengths
uint64_t window;
// Number of keys of each possible length in a given window
std::vector<uint64_t> weights;
};
extern KeyGenContext key_gen_ctx;
// Generate a variable length key string from the given int64 val. The
// order of the keys is preserved. The key could be anywhere from 8 to
// max_key_len * 8 bytes.
// The algorithm picks the length based on the
// offset of the val within a configured window and the distribution of the
// number of keys of various lengths in that window. For example, if x, y, x are
// the weights assigned to each possible key length, the keys generated would be
// - {0}...{x-1}
// {(x-1),0}..{(x-1),(y-1)},{(x-1),(y-1),0}..{(x-1),(y-1),(z-1)} and so on.
// Additionally, a trailer of 0-7 bytes could be appended.
extern inline std::string Key(int64_t val) {
uint64_t window = key_gen_ctx.window;
size_t levels = key_gen_ctx.weights.size();
std::string key;
for (size_t level = 0; level < levels; ++level) {
uint64_t weight = key_gen_ctx.weights[level];
uint64_t offset = static_cast<uint64_t>(val) % window;
uint64_t mult = static_cast<uint64_t>(val) / window;
uint64_t pfx = mult * weight + (offset >= weight ? weight - 1 : offset);
key.append(GetStringFromInt(pfx));
if (offset < weight) {
// Use the bottom 3 bits of offset as the number of trailing 'x's in the
// key. If the next key is going to be of the next level, then skip the
// trailer as it would break ordering. If the key length is already at max,
// skip the trailer.
if (offset < weight - 1 && level < levels - 1) {
size_t trailer_len = offset & 0x7;
key.append(trailer_len, 'x');
}
break;
}
val = offset - weight;
window -= weight;
}
return key;
}
// Given a string key, map it to an index into the expected values buffer
extern inline bool GetIntVal(std::string big_endian_key, uint64_t* key_p) { extern inline bool GetIntVal(std::string big_endian_key, uint64_t* key_p) {
unsigned int size_key = sizeof(*key_p); size_t size_key = big_endian_key.size();
assert(big_endian_key.size() == size_key); std::vector<uint64_t> prefixes;
assert(size_key <= key_gen_ctx.weights.size() * sizeof(uint64_t));
// Pad with zeros to make it a multiple of 8. This function may be called
// with a prefix, in which case we return the first index that falls
// inside or outside that prefix, dependeing on whether the prefix is
// the start of upper bound of a scan
unsigned int pad = sizeof(uint64_t) - (size_key % sizeof(uint64_t));
if (pad < sizeof(uint64_t)) {
big_endian_key.append(pad, '\0');
size_key += pad;
}
std::string little_endian_key; std::string little_endian_key;
little_endian_key.resize(size_key); little_endian_key.resize(size_key);
for (size_t i = 0; i < size_key; ++i) { for (size_t start = 0; start < size_key; start += sizeof(uint64_t)) {
little_endian_key[i] = big_endian_key[size_key - 1 - i]; size_t end = start + sizeof(uint64_t);
for (size_t i = 0; i < sizeof(uint64_t); ++i) {
little_endian_key[start + i] = big_endian_key[end - 1 - i];
} }
Slice little_endian_slice = Slice(little_endian_key); Slice little_endian_slice =
return GetFixed64(&little_endian_slice, key_p); Slice(&little_endian_key[start], sizeof(uint64_t));
uint64_t pfx;
if (!GetFixed64(&little_endian_slice, &pfx)) {
return false;
}
prefixes.emplace_back(pfx);
}
uint64_t key = 0;
for (size_t i = 0; i < prefixes.size(); ++i) {
uint64_t pfx = prefixes[i];
key += (pfx / key_gen_ctx.weights[i]) * key_gen_ctx.window +
pfx % key_gen_ctx.weights[i];
}
*key_p = key;
return true;
}
extern inline uint64_t GetPrefixKeyCount(const std::string& prefix,
const std::string& ub) {
uint64_t start = 0;
uint64_t end = 0;
if (!GetIntVal(prefix, &start) || !GetIntVal(ub, &end)) {
return 0;
}
return end - start;
} }
extern inline std::string StringToHex(const std::string& str) { extern inline std::string StringToHex(const std::string& str) {
......
...@@ -28,6 +28,17 @@ DEFINE_bool(read_only, false, "True if open DB in read-only mode during tests"); ...@@ -28,6 +28,17 @@ DEFINE_bool(read_only, false, "True if open DB in read-only mode during tests");
DEFINE_int64(max_key, 1 * KB * KB, DEFINE_int64(max_key, 1 * KB * KB,
"Max number of key/values to place in database"); "Max number of key/values to place in database");
DEFINE_int32(max_key_len, 3, "Maximum length of a key in 8-byte units");
DEFINE_string(key_len_percent_dist, "",
"Percentages of keys of various lengths. For example, 1,30,69 "
"means 1% of keys are 8 bytes, 30% are 16 bytes, and 69% are "
"24 bytes. If not specified, it will be evenly distributed");
DEFINE_int32(key_window_scale_factor, 10,
"This value will be multiplied by 100 to come up with a window "
"size for varying the key length");
DEFINE_int32(column_families, 10, "Number of column families"); DEFINE_int32(column_families, 10, "Number of column families");
DEFINE_double( DEFINE_double(
......
...@@ -30,6 +30,8 @@ static std::shared_ptr<rocksdb::Env> env_guard; ...@@ -30,6 +30,8 @@ static std::shared_ptr<rocksdb::Env> env_guard;
static std::shared_ptr<rocksdb::DbStressEnvWrapper> env_wrapper_guard; static std::shared_ptr<rocksdb::DbStressEnvWrapper> env_wrapper_guard;
} // namespace } // namespace
KeyGenContext key_gen_ctx;
int db_stress_tool(int argc, char** argv) { int db_stress_tool(int argc, char** argv) {
SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) + SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) +
" [OPTIONS]..."); " [OPTIONS]...");
...@@ -197,6 +199,38 @@ int db_stress_tool(int argc, char** argv) { ...@@ -197,6 +199,38 @@ int db_stress_tool(int argc, char** argv) {
rocksdb_kill_odds = FLAGS_kill_random_test; rocksdb_kill_odds = FLAGS_kill_random_test;
rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist); rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist);
unsigned int levels = FLAGS_max_key_len;
std::vector<std::string> weights;
uint64_t scale_factor = FLAGS_key_window_scale_factor;
key_gen_ctx.window = scale_factor * 100;
if (!FLAGS_key_len_percent_dist.empty()) {
weights = SplitString(FLAGS_key_len_percent_dist);
if (weights.size() != levels) {
fprintf(stderr,
"Number of weights in key_len_dist should be equal to"
" max_key_len");
exit(1);
}
uint64_t total_weight = 0;
for (std::string& weight : weights) {
uint64_t val = std::stoull(weight);
key_gen_ctx.weights.emplace_back(val * scale_factor);
total_weight += val;
}
if (total_weight != 100) {
fprintf(stderr, "Sum of all weights in key_len_dist should be 100");
exit(1);
}
} else {
uint64_t keys_per_level = key_gen_ctx.window / levels;
for (unsigned int level = 0; level < levels - 1; ++level) {
key_gen_ctx.weights.emplace_back(keys_per_level);
}
key_gen_ctx.weights.emplace_back(key_gen_ctx.window -
keys_per_level * (levels - 1));
}
std::unique_ptr<rocksdb::StressTest> stress; std::unique_ptr<rocksdb::StressTest> stress;
if (FLAGS_test_cf_consistency) { if (FLAGS_test_cf_consistency) {
stress.reset(CreateCfConsistencyStressTest()); stress.reset(CreateCfConsistencyStressTest());
......
...@@ -35,29 +35,35 @@ class NonBatchedOpsStressTest : public StressTest { ...@@ -35,29 +35,35 @@ class NonBatchedOpsStressTest : public StressTest {
} }
if (!thread->rand.OneIn(2)) { if (!thread->rand.OneIn(2)) {
// Use iterator to verify this range // Use iterator to verify this range
Slice prefix;
std::string seek_key = Key(start);
std::unique_ptr<Iterator> iter( std::unique_ptr<Iterator> iter(
db_->NewIterator(options, column_families_[cf])); db_->NewIterator(options, column_families_[cf]));
iter->Seek(Key(start)); iter->Seek(seek_key);
prefix = Slice(seek_key.data(), prefix_to_use);
for (auto i = start; i < end; i++) { for (auto i = start; i < end; i++) {
if (thread->shared->HasVerificationFailedYet()) { if (thread->shared->HasVerificationFailedYet()) {
break; break;
} }
// Reseek when the prefix changes
if (prefix_to_use > 0 &&
i % (static_cast<uint64_t>(1) << 8 * (8 - prefix_to_use)) == 0) {
iter->Seek(Key(i));
}
std::string from_db; std::string from_db;
std::string keystr = Key(i); std::string keystr = Key(i);
Slice k = keystr; Slice k = keystr;
Slice pfx = Slice(keystr.data(), prefix_to_use);
// Reseek when the prefix changes
if (prefix_to_use > 0 && prefix.compare(pfx) != 0) {
iter->Seek(k);
seek_key = keystr;
prefix = Slice(seek_key.data(), prefix_to_use);
}
Status s = iter->status(); Status s = iter->status();
if (iter->Valid()) { if (iter->Valid()) {
Slice iter_key = iter->key();
if (iter->key().compare(k) > 0) { if (iter->key().compare(k) > 0) {
s = Status::NotFound(Slice()); s = Status::NotFound(Slice());
} else if (iter->key().compare(k) == 0) { } else if (iter->key().compare(k) == 0) {
from_db = iter->value().ToString(); from_db = iter->value().ToString();
iter->Next(); iter->Next();
} else if (iter->key().compare(k) < 0) { } else if (iter_key.compare(k) < 0) {
VerificationAbort(shared, "An out of range key was found", VerificationAbort(shared, "An out of range key was found",
static_cast<int>(cf), i); static_cast<int>(cf), i);
} }
...@@ -265,19 +271,23 @@ class NonBatchedOpsStressTest : public StressTest { ...@@ -265,19 +271,23 @@ class NonBatchedOpsStressTest : public StressTest {
std::string upper_bound; std::string upper_bound;
Slice ub_slice; Slice ub_slice;
ReadOptions ro_copy = read_opts; ReadOptions ro_copy = read_opts;
if (thread->rand.OneIn(2) && GetNextPrefix(prefix, &upper_bound)) { // Get the next prefix first and then see if we want to set upper bound.
// We'll use the next prefix in an assertion later on
if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
// For half of the time, set the upper bound to the next prefix // For half of the time, set the upper bound to the next prefix
ub_slice = Slice(upper_bound); ub_slice = Slice(upper_bound);
ro_copy.iterate_upper_bound = &ub_slice; ro_copy.iterate_upper_bound = &ub_slice;
} }
Iterator* iter = db_->NewIterator(ro_copy, cfh); Iterator* iter = db_->NewIterator(ro_copy, cfh);
long count = 0; unsigned long count = 0;
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
iter->Next()) { iter->Next()) {
++count; ++count;
} }
assert(count <= (static_cast<long>(1) << ((8 - FLAGS_prefix_size) * 8)));
assert(count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
Status s = iter->status(); Status s = iter->status();
if (iter->status().ok()) { if (iter->status().ok()) {
thread->stats.AddPrefixes(1, count); thread->stats.AddPrefixes(1, count);
......
...@@ -105,7 +105,9 @@ default_params = { ...@@ -105,7 +105,9 @@ default_params = {
"level_compaction_dynamic_level_bytes" : True, "level_compaction_dynamic_level_bytes" : True,
"verify_checksum_one_in": 1000000, "verify_checksum_one_in": 1000000,
"verify_db_one_in": 100000, "verify_db_one_in": 100000,
"continuous_verification_interval" : 0 "continuous_verification_interval" : 0,
"max_key_len": 3,
"key_len_percent_dist": "1,30,69"
} }
_TEST_DIR_ENV_VAR = 'TEST_TMPDIR' _TEST_DIR_ENV_VAR = 'TEST_TMPDIR'
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册