提交 8f1ea8ae 编写于 作者: L leipeng

Add tools/multi_get.cc

上级 e437c952
......@@ -1768,6 +1768,9 @@ ldb: tools/ldb.o $(LIBOBJECTS) ${LIBNAME}.so
kvpipe: tools/kvpipe.o ${SHARED1}
$(AM_LINK)
multi_get: tools/multi_get.o ${SHARED1}
$(AM_LINK)
iostats_context_test: monitoring/iostats_context_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_V_CCLD)$(CXX) $^ $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(TerarkLDFLAGS)
......
......@@ -1387,8 +1387,10 @@ std::vector<Status> DBImpl::MultiGet(
// s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case.
size_t num_found = 0;
#if !defined(NDEBUG)
size_t counting = num_keys;
auto get_one = [&](int i) {
#endif
auto get_one = [&](size_t i) {
// Contain a list of merge operations if merge occurs.
MergeContext merge_context;
Status& s = stat_list[i];
......@@ -1430,17 +1432,26 @@ std::vector<Status> DBImpl::MultiGet(
bytes_read += value->size();
num_found++;
}
#if !defined(NDEBUG)
counting--;
#endif
};
static thread_local terark::RunOnceFiberPool fiber_pool(16);
// current calling fiber's list head, can be treated as a handle
int myhead = -1; // must be initialized to -1
for (int i = 0; i < (int)num_keys; ++i) {
fiber_pool.submit(myhead, get_one, i);
if (read_options.use_fiber) {
static thread_local terark::RunOnceFiberPool fiber_pool(16);
// current calling fiber's list head, can be treated as a handle
int myhead = -1; // must be initialized to -1
for (size_t i = 0; i < num_keys; ++i) {
fiber_pool.submit(myhead, get_one, i);
}
fiber_pool.reap(myhead);
assert(0 == counting);
}
else {
for (size_t i = 0; i < num_keys; ++i) {
get_one(i);
}
}
fiber_pool.reap(myhead);
assert(0 == counting);
// Post processing (decrement reference counts and record statistics)
PERF_TIMER_GUARD(get_post_process_time);
......
......@@ -1170,6 +1170,9 @@ struct ReadOptions {
// Default: false
bool ignore_range_deletions;
// now only used by MultiGet
bool use_fiber;
// A callback to determine whether relevant keys for this scan exist in a
// given table based on the table's properties. The callback is passed the
// properties of each table during iteration. If the callback returns false,
......
......@@ -563,6 +563,7 @@ ReadOptions::ReadOptions()
pin_data(false),
background_purge_on_iterator_cleanup(false),
ignore_range_deletions(false),
use_fiber(true),
iter_start_seqnum(0) {}
ReadOptions::ReadOptions(bool cksum, bool cache)
......@@ -581,6 +582,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache)
pin_data(false),
background_purge_on_iterator_cleanup(false),
ignore_range_deletions(false),
use_fiber(true),
iter_start_seqnum(0) {}
} // namespace rocksdb
......@@ -5,6 +5,7 @@ set(TOOLS
write_stress.cc
ldb.cc
kvpipe.cc
multi_get.cc
db_repl_stress.cc
dump/rocksdb_dump.cc
dump/rocksdb_undump.cc)
......
//
// Created by leipeng on 2019-08-09.
//
#include <rocksdb/db.h>
#include <stdio.h>
#include <getopt.h>
#include <terark/util/linebuf.hpp>
#include <terark/util/profiling.hpp>
static void usage(const char* prog) {
fprintf(stderr, R"EOS(usage: %s
-m number per multi_get
-f use_fiber for multi_get
-q
be quite
)EOS"
, prog);
}
inline void chomp(std::string& s) {
while (!s.empty()) {
const char c = s.back();
if ('\n' == c || '\r' == c) {
s.pop_back();
} else {
break;
}
}
}
int main(int argc, char* argv[]) {
size_t mget_num = 128;
size_t bench_report = 0;
size_t cnt1 = 0;
rocksdb::Options dopt;
rocksdb::ReadOptions ropt;
dopt.use_aio_reads = true;
dopt.use_direct_reads = true;
ropt.use_fiber = true;
bool quite = false;
for (int opt=0; -1 != opt && '?' != opt;) {
opt = getopt(argc, argv, "a:b:d:f:m:q");
switch (opt) {
default:
usage(argv[0]);
return 1;
case -1:
goto GetoptDone;
case 'a':
dopt.use_aio_reads = atoi(optarg) != 0;
break;
case 'b':
bench_report = atoi(optarg);
break;
case 'd':
dopt.use_direct_reads = atoi(optarg) != 0;
break;
case 'f':
ropt.use_fiber = atoi(optarg) != 0;
break;
case 'm':
mget_num = atoi(optarg);
break;
case 'q':
quite = true;
break;
}
}
GetoptDone:
if (optind >= argc) {
usage(argv[0]);
return 1;
}
rocksdb::DB* db = nullptr;
std::string path = argv[optind];
rocksdb::Status s = rocksdb::DB::OpenForReadOnly(dopt, path, &db);
if (!s.ok()) {
fprintf(stderr, "ERROR: Open(%s) = %s\n", path.c_str(), s.ToString().c_str());
return 1;
}
using namespace terark;
profiling pf;
std::vector<rocksdb::Slice> keys;
std::vector<std::string> values;
std::string keystore;
LineBuf line;
long long t0 = pf.now();
do {
keys.resize(0);
for (size_t i = 0; i < mget_num && line.getline(stdin) > 0; ++i) {
line.chomp();
keys.emplace_back(line.p, line.n);
}
auto sv = db->MultiGet(ropt, keys, &values);
if (bench_report) {
if (++cnt1 == bench_report) {
auto t1 = pf.now();
fprintf(stderr,
"mget(use_fiber=%d,direct_io=%d,aio=%d) qps = %f M/sec\n",
ropt.use_fiber,
dopt.use_aio_reads,
dopt.use_aio_reads,
cnt1/pf.uf(t0,t1));
t0 = t1;
cnt1 = 0;
}
}
if (!quite) {
for (size_t i = 0; i < keys.size(); ++i) {
if (sv[i].ok()) {
chomp(values[i]);
printf("OK\t%s\n", values[i].c_str());
} else {
printf("%s\t%s\n", sv[i].ToString().c_str(), values[i].c_str());
}
}
}
} while (keys.size() == mget_num);
delete db;
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册