未验证 提交 280792e4 编写于 作者: W Wu Tao 提交者: GitHub

server: refactor pegasus server write procedure (#39)

上级 ce9b95fc
......@@ -206,7 +206,7 @@ function run_test()
done
if [ "$test_modules" == "" ]; then
test_modules="pegasus_rproxy_test pegasus_function_test pegasus_unit_test"
test_modules="pegasus_function_test pegasus_unit_test"
fi
./run.sh clear_onebox #clear the onebox before test
......
......@@ -11,6 +11,7 @@
#include <dsn/utility/blob.h>
#include <dsn/utility/utils.h>
#include <dsn/utility/crc.h>
#include <dsn/c/api_utilities.h>
namespace pegasus {
......
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#include <dsn/cpp/rpc_holder.h>
#include <rrdb/rrdb_types.h>
#include <rrdb/rrdb.client.h>
namespace pegasus {
using multi_put_rpc = dsn::rpc_holder<dsn::apps::multi_put_request, dsn::apps::update_response>;
using put_rpc = dsn::rpc_holder<dsn::apps::update_request, dsn::apps::update_response>;
using multi_remove_rpc =
dsn::rpc_holder<dsn::apps::multi_remove_request, dsn::apps::multi_remove_response>;
using remove_rpc = dsn::rpc_holder<dsn::blob, dsn::apps::update_response>;
} // namespace pegasus
......@@ -9,6 +9,8 @@
#include <cstring>
#include <boost/lexical_cast.hpp>
#include <dsn/tool-api/rpc_address.h>
#include <dsn/utility/string_view.h>
#include <rocksdb/slice.h>
namespace pegasus {
namespace utils {
......@@ -76,5 +78,10 @@ std::string c_escape_string(const T &src, bool always_escape = false)
// or (-n) if unescape failed, where n is the failure position.
// ----------------------------------------------------------------------
int c_unescape_string(const std::string &src, std::string &dest);
inline dsn::string_view to_string_view(rocksdb::Slice s) { return {s.data(), s.size()}; }
inline rocksdb::Slice to_rocksdb_slice(dsn::string_view s) { return {s.data(), s.size()}; }
}
} // namespace
......@@ -8,9 +8,11 @@
#include <string.h>
#include <string>
#include <vector>
#include <dsn/utility/ports.h>
#include <dsn/utility/utils.h>
#include <dsn/utility/blob.h>
#include <dsn/utility/smart_pointers.h>
#include <dsn/utility/endians.h>
#include <dsn/service_api_c.h>
#include <rocksdb/slice.h>
......@@ -18,95 +20,101 @@ namespace pegasus {
#define PEGASUS_VALUE_SCHEMA_MAX_VERSION 0
// =====================================================================================
// rocksdb value (version 1) = [expire_ts(uint32_t)] [user_data(bytes)]
// generate rocksdb value
// T may be std::string or ::dsn::blob or rocksdb::slice.
// 'user_data' will not be copied, so 'user_data' may be referenced by 'slices'.
// some data may be put in 'buf', so 'buf' may be referenced by 'slices'.
template <typename T>
void pegasus_generate_value(uint32_t version,
uint32_t expire_ts,
T &user_data,
std::string &buf,
std::vector<rocksdb::Slice> &slices)
/// Extracts expire_ts from rocksdb value with given version.
/// The value schema must be in v0.
/// \return expire_ts in host endian
inline uint32_t pegasus_extract_expire_ts(int version, dsn::string_view value)
{
if (version == 0) {
buf.resize(4);
slices.resize(2);
// expire_ts is in big endian
*((int32_t *)(&buf[0])) = htobe32((int32_t)expire_ts);
slices[0] = rocksdb::Slice(buf.data(), 4);
dassert(version <= PEGASUS_VALUE_SCHEMA_MAX_VERSION,
"value schema version(%d) must be <= %d",
version,
PEGASUS_VALUE_SCHEMA_MAX_VERSION);
if (user_data.length() > 0) {
slices[1] = rocksdb::Slice(user_data.data(), user_data.length());
} else {
slices.resize(1);
}
} else {
dassert(false, "unsupported version %" PRIu32, version);
}
return dsn::data_input(value).read_u32();
}
// extract expire timestamp from rocksdb value
// T may be std::string or ::dsn::blob or rocksdb::slice.
template <typename T>
inline uint32_t pegasus_extract_expire_ts(uint32_t version, const T &value)
/// Extracts user value from a raw rocksdb value.
/// In order to avoid data copy, the ownership of `raw_value` will be transferred
/// into `user_data`.
/// \param user_data: the result.
inline void pegasus_extract_user_data(int version, std::string &&raw_value, ::dsn::blob &user_data)
{
if (version == 0) {
dassert(value.length() >= 4, "value length must be no less than 4");
dassert(version <= PEGASUS_VALUE_SCHEMA_MAX_VERSION,
"value schema version(%d) must be <= %d",
version,
PEGASUS_VALUE_SCHEMA_MAX_VERSION);
// expire_ts is in big endian
uint32_t expire_ts = be32toh(*(int32_t *)(value.data()));
dsn::data_input input(raw_value);
input.skip(sizeof(uint32_t));
return expire_ts;
} else {
dassert(false, "unsupported version %" PRIu32, version);
return 0;
}
dsn::string_view view = input.read_str();
// tricky code to avoid memory copy
auto ptr = const_cast<char *>(view.data());
auto deleter = [s = new std::string(std::move(raw_value))](char *) { delete s; };
std::shared_ptr<char> buf(ptr, deleter);
user_data.assign(std::move(buf), 0, static_cast<unsigned int>(view.length()));
}
// extract data from rocksdb value.
// T may be std::string or ::dsn::blob or rocksdb::slice.
template <typename T>
inline void pegasus_extract_user_data(uint32_t version, const T &value, std::string &user_data)
/// \return true if expired
inline bool check_if_record_expired(uint32_t epoch_now, uint32_t expire_ts)
{
if (version == 0) {
dassert(value.length() >= 4, "value length must be no less than 4");
return expire_ts > 0 && expire_ts <= epoch_now;
}
if (value.length() > 4) {
user_data.assign(value.data() + 4, value.length() - 4);
} else {
user_data.clear();
}
} else {
dassert(false, "unsupported version %" PRIu32, version);
}
/// \return true if expired
inline bool check_if_record_expired(uint32_t value_schema_version,
uint32_t epoch_now,
dsn::string_view raw_value)
{
uint32_t expire_ts = pegasus_extract_expire_ts(value_schema_version, raw_value);
return check_if_record_expired(epoch_now, expire_ts);
}
// extract data from rocksdb value (special implementation to avoid memory copy).
// the ownership of 'value' is tranfered into this function.
inline void pegasus_extract_user_data(uint32_t version,
std::unique_ptr<std::string> value,
::dsn::blob &user_data)
/// Helper class for generating value.
/// NOTE that the instance of pegasus_value_generator must be alive
/// while the returned SliceParts is.
class pegasus_value_generator
{
if (version == 0) {
dassert(value->length() >= 4, "value length must be no less than 4");
if (value->length() > 4) {
// tricky code to avoid memory copy
char *ptr = &value->front();
unsigned int len = value->length();
std::shared_ptr<char> buf(ptr, [s = value.release()](char *) { delete s; });
user_data.assign(std::move(buf), 4, len - 4);
public:
/// A higher level utility for generating value with given version.
/// The value schema must be in v0.
rocksdb::SliceParts
generate_value(int value_schema_version, dsn::string_view user_data, uint32_t expire_ts)
{
if (value_schema_version == 0) {
return generate_value_v0(expire_ts, user_data);
} else {
user_data = ::dsn::blob();
dfatal("unsupported value schema version: %d", value_schema_version);
__builtin_unreachable();
}
} else {
dassert(false, "unsupported version %" PRIu32, version);
}
}
} // namespace
/// The heading expire_ts is encoded to support TTL, and the record will be
/// automatically cleared (by \see pegasus::server::KeyWithTTLCompactionFilter)
/// after expiration reached. The expired record will be invisible even though
/// they are not yet compacted.
///
/// rocksdb value (ver 0) = [expire_ts(uint32_t)] [user_data(bytes)]
/// \internal
rocksdb::SliceParts generate_value_v0(uint32_t expire_ts, dsn::string_view user_data)
{
_write_buf.resize(sizeof(uint32_t));
_write_slices.clear();
dsn::data_output(_write_buf).write_u32(expire_ts);
_write_slices.emplace_back(_write_buf.data(), _write_buf.size());
if (user_data.length() > 0) {
_write_slices.emplace_back(user_data.data(), user_data.length());
}
return rocksdb::SliceParts(&_write_slices[0], static_cast<int>(_write_slices.size()));
}
private:
std::string _write_buf;
std::vector<rocksdb::Slice> _write_slices;
};
} // namespace pegasus
......@@ -27,10 +27,8 @@ public:
{
if (!_enabled.load(std::memory_order_acquire))
return false;
uint32_t expire_ts = pegasus_extract_expire_ts(_value_schema_version, existing_value);
if (expire_ts == 0) // 0 means no expire
return false;
return expire_ts <= ::pegasus::utils::epoch_now();
return check_if_record_expired(
_value_schema_version, utils::epoch_now(), utils::to_string_view(existing_value));
}
virtual const char *Name() const override { return "KeyWithTTLCompactionFilter"; }
void SetValueSchemaVersion(uint32_t version) { _value_schema_version = version; }
......
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#pragma once
#include <dsn/dist/fmt_logging.h>
/// Utilities for logging the operation on rocksdb.
#define derror_rocksdb(op, error, ...) \
derror_f("{}: rocksdb {} failed: error = {} [{}]", \
replica_name(), \
op, \
error, \
fmt::format(__VA_ARGS__))
#define ddebug_rocksdb(op, ...) \
ddebug_f("{}: rocksdb {}: [{}]", replica_name(), op, fmt::format(__VA_ARGS__))
#define dwarn_rocksdb(op, ...) \
dwarn_f("{}: rocksdb {}: [{}]", replica_name(), op, fmt::format(__VA_ARGS__))
此差异已折叠。
......@@ -15,10 +15,13 @@
#include "key_ttl_compaction_filter.h"
#include "pegasus_scan_context.h"
#include "pagasus_manual_compact_service.h"
#include "pegasus_write_service.h"
namespace pegasus {
namespace server {
class pegasus_server_write;
class pegasus_server_impl : public ::dsn::apps::rrdb_service
{
public:
......@@ -29,18 +32,10 @@ public:
register_rpc_handlers();
}
explicit pegasus_server_impl(dsn::replication::replica *r);
virtual ~pegasus_server_impl() {}
virtual ~pegasus_server_impl() override;
// the following methods may set physical error if internal error occurs
virtual void on_put(const ::dsn::apps::update_request &update,
::dsn::rpc_replier<::dsn::apps::update_response> &reply) override;
virtual void on_multi_put(const ::dsn::apps::multi_put_request &args,
::dsn::rpc_replier<::dsn::apps::update_response> &reply) override;
virtual void on_remove(const ::dsn::blob &key,
::dsn::rpc_replier<::dsn::apps::update_response> &reply) override;
virtual void
on_multi_remove(const ::dsn::apps::multi_remove_request &args,
::dsn::rpc_replier<::dsn::apps::multi_remove_response> &reply) override;
virtual void on_get(const ::dsn::blob &key,
::dsn::rpc_replier<::dsn::apps::read_response> &reply) override;
virtual void on_multi_get(const ::dsn::apps::multi_get_request &args,
......@@ -71,6 +66,13 @@ public:
// - ERR_FILE_OPERATION_FAILED
virtual ::dsn::error_code stop(bool clear_state) override;
/// Each of the write request (specifically, the rpc that's configured as write, see
/// option `rpc_request_is_write_operation` in rDSN `task_spec`) will first be
/// replicated to the replicas through the underlying PacificA protocol in rDSN, and
/// after being committed, the mutation will be applied into rocksdb by this function.
///
/// \see dsn::replication::replication_app_base::apply_mutation
/// \inherit dsn::replication::replication_app_base
virtual int on_batched_write_requests(int64_t decree,
uint64_t timestamp,
dsn_message_t *requests,
......@@ -141,9 +143,16 @@ public:
virtual int64_t last_flushed_decree() const override { return _db->GetLastFlushedDecree(); }
inline bool check_if_record_expired(uint32_t epoch_now, rocksdb::Slice raw_value)
{
return pegasus::check_if_record_expired(
_value_schema_version, epoch_now, utils::to_string_view(raw_value));
}
private:
friend class pagasus_manual_compact_service;
friend class manual_compact_service_test;
friend class pegasus_write_service;
// parse checkpoint directories in the data dir
// checkpoint directory format is: "checkpoint.{decree}"
......@@ -236,13 +245,7 @@ private:
uint32_t _value_schema_version;
std::atomic<int64_t> _last_durable_decree;
rocksdb::WriteBatch _batch;
std::vector<::dsn::rpc_replier<::dsn::apps::update_response>> _batch_repliers;
std::vector<::dsn::perf_counter *> _batch_perfcounters;
std::string _write_buf;
std::vector<rocksdb::Slice> _write_slices;
int _physical_error;
std::unique_ptr<pegasus_server_write> _server_write;
uint32_t _checkpoint_reserve_min_count;
uint32_t _checkpoint_reserve_time_seconds;
......@@ -263,18 +266,10 @@ private:
::dsn::perf_counter_wrapper _pfc_get_qps;
::dsn::perf_counter_wrapper _pfc_multi_get_qps;
::dsn::perf_counter_wrapper _pfc_scan_qps;
::dsn::perf_counter_wrapper _pfc_put_qps;
::dsn::perf_counter_wrapper _pfc_multi_put_qps;
::dsn::perf_counter_wrapper _pfc_remove_qps;
::dsn::perf_counter_wrapper _pfc_multi_remove_qps;
::dsn::perf_counter_wrapper _pfc_get_latency;
::dsn::perf_counter_wrapper _pfc_multi_get_latency;
::dsn::perf_counter_wrapper _pfc_scan_latency;
::dsn::perf_counter_wrapper _pfc_put_latency;
::dsn::perf_counter_wrapper _pfc_multi_put_latency;
::dsn::perf_counter_wrapper _pfc_remove_latency;
::dsn::perf_counter_wrapper _pfc_multi_remove_latency;
::dsn::perf_counter_wrapper _pfc_recent_expire_count;
::dsn::perf_counter_wrapper _pfc_recent_filter_count;
......
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#include <dsn/cpp/message_utils.h>
#include "base/pegasus_utils.h"
#include "base/pegasus_key_schema.h"
#include "pegasus_server_write.h"
#include "pegasus_server_impl.h"
#include "logging_utils.h"
namespace pegasus {
namespace server {
pegasus_server_write::pegasus_server_write(pegasus_server_impl *server, bool verbose_log)
: replica_base(*server), _verbose_log(verbose_log)
{
_write_svc = dsn::make_unique<pegasus_write_service>(server);
}
int pegasus_server_write::on_batched_write_requests(dsn_message_t *requests,
int count,
int64_t decree,
uint64_t timestamp)
{
_decree = decree;
// Write down empty record (RPC_REPLICATION_WRITE_EMPTY) to update
// rocksdb's `last_flushed_decree` (see rocksdb::DB::GetLastFlushedDecree())
// TODO(wutao1): remove it when shared log is removed.
if (count == 0) {
return _write_svc->empty_put(decree);
}
dsn::task_code rpc_code(dsn_msg_task_code(requests[0]));
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) {
dassert(count == 1, "");
auto rpc = multi_put_rpc::auto_reply(requests[0]);
on_multi_put(rpc);
return rpc.response().error;
}
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) {
dassert(count == 1, "");
auto rpc = multi_remove_rpc::auto_reply(requests[0]);
on_multi_remove(rpc);
return rpc.response().error;
}
return on_batched_writes(requests, count, decree);
}
int pegasus_server_write::on_batched_writes(dsn_message_t *requests, int count, int64_t decree)
{
int err;
{
_write_svc->batch_prepare();
for (int i = 0; i < count; ++i) {
dassert(requests[i] != nullptr, "");
dsn::task_code rpc_code(dsn_msg_task_code(requests[i]));
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
auto rpc = put_rpc::auto_reply(requests[i]);
on_single_put_in_batch(rpc);
_put_rpc_batch.emplace_back(std::move(rpc));
} else if (rpc_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
auto rpc = remove_rpc::auto_reply(requests[i]);
on_single_remove_in_batch(rpc);
_remove_rpc_batch.emplace_back(std::move(rpc));
} else {
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT ||
rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) {
dfatal("rpc code not allow batch: %s", rpc_code.to_string());
} else {
dfatal("rpc code not handled: %s", rpc_code.to_string());
}
}
}
err = _write_svc->batch_commit(decree);
}
// reply the batched RPCs
_put_rpc_batch.clear();
_remove_rpc_batch.clear();
return err;
}
void pegasus_server_write::request_key_check(int64_t decree, dsn_message_t m, const dsn::blob &key)
{
auto msg = (dsn::message_ex *)m;
if (msg->header->client.partition_hash != 0) {
uint64_t partition_hash = pegasus_key_hash(key);
dassert(msg->header->client.partition_hash == partition_hash,
"inconsistent partition hash");
int thread_hash = get_gpid().thread_hash();
dassert(msg->header->client.thread_hash == thread_hash, "inconsistent thread hash");
}
if (_verbose_log) {
::dsn::blob hash_key, sort_key;
pegasus_restore_key(key, hash_key, sort_key);
ddebug_rocksdb("write",
"decree={}, code={}, hash_key={}, sort_key={}",
decree,
msg->local_rpc_code.to_string(),
utils::c_escape_string(hash_key),
utils::c_escape_string(sort_key));
}
}
} // namespace server
} // namespace pegasus
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#pragma once
#include <dsn/dist/replication/replica_base.h>
#include "base/pegasus_rpc_types.h"
#include "pegasus_write_service.h"
namespace pegasus {
namespace server {
/// This class implements the interface of `pegasus_sever_impl::on_batched_write_requests`.
class pegasus_server_write : public dsn::replication::replica_base
{
public:
pegasus_server_write(pegasus_server_impl *server, bool verbose_log);
int on_batched_write_requests(dsn_message_t *requests,
int count,
int64_t decree,
uint64_t timestamp);
private:
void on_multi_put(multi_put_rpc &rpc)
{
_write_svc->multi_put(_decree, rpc.request(), rpc.response());
}
void on_multi_remove(multi_remove_rpc &rpc)
{
_write_svc->multi_remove(_decree, rpc.request(), rpc.response());
}
/// Delay replying for the batched requests until all of them complete.
int on_batched_writes(dsn_message_t *requests, int count, int64_t decree);
void on_single_put_in_batch(put_rpc &rpc)
{
_write_svc->batch_put(rpc.request(), rpc.response());
request_key_check(_decree, rpc.dsn_request(), rpc.request().key);
}
void on_single_remove_in_batch(remove_rpc &rpc)
{
_write_svc->batch_remove(rpc.request(), rpc.response());
request_key_check(_decree, rpc.dsn_request(), rpc.request());
}
// Ensure that the write request is directed to the right partition.
// In verbose mode it will log for every request.
void request_key_check(int64_t decree, dsn_message_t m, const dsn::blob &key);
private:
friend class pegasus_server_write_test;
friend class pegasus_write_service_test;
std::unique_ptr<pegasus_write_service> _write_svc;
std::vector<put_rpc> _put_rpc_batch;
std::vector<remove_rpc> _remove_rpc_batch;
int64_t _decree;
const bool _verbose_log;
};
} // namespace server
} // namespace pegasus
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#include "pegasus_write_service.h"
#include "pegasus_write_service_impl.h"
namespace pegasus {
namespace server {
pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
: _impl(new impl(server)), _batch_start_time(0)
{
std::string str_gpid = fmt::format("{}", server->get_gpid());
std::string name;
name = fmt::format("put_qps@{}", str_gpid);
_pfc_put_qps.init_app_counter(
"app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of PUT request");
name = fmt::format("multi_put_qps@{}", str_gpid);
_pfc_multi_put_qps.init_app_counter(
"app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of MULTI_PUT request");
name = fmt::format("remove_qps@{}", str_gpid);
_pfc_remove_qps.init_app_counter(
"app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of REMOVE request");
name = fmt::format("multi_remove_qps@{}", str_gpid);
_pfc_multi_remove_qps.init_app_counter("app.pegasus",
name.c_str(),
COUNTER_TYPE_RATE,
"statistic the qps of MULTI_REMOVE request");
name = fmt::format("put_latency@{}", str_gpid);
_pfc_put_latency.init_app_counter("app.pegasus",
name.c_str(),
COUNTER_TYPE_NUMBER_PERCENTILES,
"statistic the latency of PUT request");
name = fmt::format("multi_put_latency@{}", str_gpid);
_pfc_multi_put_latency.init_app_counter("app.pegasus",
name.c_str(),
COUNTER_TYPE_NUMBER_PERCENTILES,
"statistic the latency of MULTI_PUT request");
name = fmt::format("remove_latency@{}", str_gpid);
_pfc_remove_latency.init_app_counter("app.pegasus",
name.c_str(),
COUNTER_TYPE_NUMBER_PERCENTILES,
"statistic the latency of REMOVE request");
name = fmt::format("multi_remove_latency@{}", str_gpid);
_pfc_multi_remove_latency.init_app_counter("app.pegasus",
name.c_str(),
COUNTER_TYPE_NUMBER_PERCENTILES,
"statistic the latency of MULTI_REMOVE request");
}
pegasus_write_service::~pegasus_write_service() = default;
void pegasus_write_service::multi_put(int64_t decree,
const dsn::apps::multi_put_request &update,
dsn::apps::update_response &resp)
{
uint64_t start_time = dsn_now_ns();
_pfc_multi_put_qps->increment();
_impl->multi_put(decree, update, resp);
_pfc_multi_put_latency->set(dsn_now_ns() - start_time);
}
void pegasus_write_service::multi_remove(int64_t decree,
const dsn::apps::multi_remove_request &update,
dsn::apps::multi_remove_response &resp)
{
uint64_t start_time = dsn_now_ns();
_pfc_multi_remove_qps->increment();
_impl->multi_remove(decree, update, resp);
_pfc_multi_remove_latency->set(dsn_now_ns() - start_time);
}
void pegasus_write_service::batch_put(const dsn::apps::update_request &update,
dsn::apps::update_response &resp)
{
_pfc_put_qps->increment();
_batch_perfcounters.push_back(_pfc_put_latency.get());
_impl->batch_put(update, resp);
}
void pegasus_write_service::batch_remove(const dsn::blob &key, dsn::apps::update_response &resp)
{
_pfc_remove_qps->increment();
_batch_perfcounters.push_back(_pfc_remove_latency.get());
_impl->batch_remove(key, resp);
}
int pegasus_write_service::batch_commit(int64_t decree)
{
dassert(_batch_start_time != 0, "batch_commit and batch_prepare must be called in pair");
int ret = _impl->batch_commit(decree);
uint64_t latency = dsn_now_ns() - _batch_start_time;
for (dsn::perf_counter *pfc : _batch_perfcounters) {
pfc->set(latency);
}
_batch_perfcounters.clear();
_batch_start_time = 0;
return ret;
}
void pegasus_write_service::batch_prepare()
{
dassert(_batch_start_time == 0, "batch_commit and batch_prepare must be called in pair");
_batch_start_time = dsn_now_ns();
}
int pegasus_write_service::empty_put(int64_t decree)
{
std::string empty_key, empty_value;
_impl->db_write_batch_put(empty_key, empty_value, 0);
return _impl->db_write(decree);
}
} // namespace server
} // namespace pegasus
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#pragma once
#include <dsn/cpp/perf_counter_wrapper.h>
#include "base/pegasus_value_schema.h"
#include "base/pegasus_utils.h"
#include "rrdb/rrdb_types.h"
namespace pegasus {
namespace server {
class pegasus_server_impl;
/// Handle the write requests.
/// As the signatures imply, this class is not responsible for replying the rpc,
/// the caller(pegasus_server_write) should do.
/// \see pegasus::server::pegasus_server_write::on_batched_write_requests
class pegasus_write_service
{
public:
explicit pegasus_write_service(pegasus_server_impl *server);
~pegasus_write_service();
void multi_put(int64_t decree,
const dsn::apps::multi_put_request &update,
dsn::apps::update_response &resp);
void multi_remove(int64_t decree,
const dsn::apps::multi_remove_request &update,
dsn::apps::multi_remove_response &resp);
/// Prepare for batch write.
void batch_prepare();
// NOTE: A batch write may incur a database read for consistency check of timetag.
// (see pegasus::pegasus_value_generator::generate_value_v1 for more info about timetag)
// To disable the consistency check, unset `verify_timetag` under `pegasus.server` section
// in configuration.
/// NOTE that `resp` should not be moved or freed while
/// the batch is not committed.
void batch_put(const dsn::apps::update_request &update, dsn::apps::update_response &resp);
void batch_remove(const dsn::blob &key, dsn::apps::update_response &resp);
/// \returns 0 if success, non-0 if failure.
/// If the batch contains no updates, 0 is returned.
int batch_commit(int64_t decree);
/// Write empty record.
/// See this document (https://github.com/XiaoMi/pegasus/wiki/last_flushed_decree)
/// to know why we must have empty write.
int empty_put(int64_t decree);
private:
friend class pegasus_write_service_test;
class impl;
std::unique_ptr<impl> _impl;
uint64_t _batch_start_time;
::dsn::perf_counter_wrapper _pfc_put_qps;
::dsn::perf_counter_wrapper _pfc_multi_put_qps;
::dsn::perf_counter_wrapper _pfc_remove_qps;
::dsn::perf_counter_wrapper _pfc_multi_remove_qps;
::dsn::perf_counter_wrapper _pfc_put_latency;
::dsn::perf_counter_wrapper _pfc_multi_put_latency;
::dsn::perf_counter_wrapper _pfc_remove_latency;
::dsn::perf_counter_wrapper _pfc_multi_remove_latency;
std::vector<::dsn::perf_counter *> _batch_perfcounters;
};
} // namespace server
} // namespace pegasus
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#pragma once
#include "pegasus_write_service.h"
#include "pegasus_server_impl.h"
#include "logging_utils.h"
#include "base/pegasus_key_schema.h"
namespace pegasus {
namespace server {
static inline dsn::blob composite_raw_key(dsn::string_view hash_key, dsn::string_view sort_key)
{
dsn::blob raw_key;
pegasus_generate_key(raw_key, hash_key, sort_key);
return raw_key;
}
class pegasus_write_service::impl : public dsn::replication::replica_base
{
public:
explicit impl(pegasus_server_impl *server)
: replica_base(*server),
_primary_address(server->_primary_address),
_value_schema_version(server->_value_schema_version),
_db(server->_db),
_wt_opts(&server->_wt_opts),
_rd_opts(&server->_rd_opts)
{
}
void multi_put(int64_t decree,
const dsn::apps::multi_put_request &update,
dsn::apps::update_response &resp)
{
resp.app_id = get_gpid().get_app_id();
resp.partition_index = get_gpid().get_partition_index();
resp.decree = decree;
resp.server = _primary_address;
if (update.kvs.empty()) {
// invalid argument
derror_replica("invalid argument for multi_put: decree = {}, error = empty kvs",
decree);
// an invalid operation shouldn't be added to latency calculation
resp.error = rocksdb::Status::kInvalidArgument;
return;
}
for (auto &kv : update.kvs) {
resp.error = db_write_batch_put(composite_raw_key(update.hash_key, kv.key),
kv.value,
static_cast<uint32_t>(update.expire_ts_seconds));
if (resp.error != 0) {
return;
}
}
resp.error = db_write(decree);
}
void multi_remove(int64_t decree,
const dsn::apps::multi_remove_request &update,
dsn::apps::multi_remove_response &resp)
{
resp.app_id = get_gpid().get_app_id();
resp.partition_index = get_gpid().get_partition_index();
resp.decree = decree;
resp.server = _primary_address;
if (update.sort_keys.empty()) {
// invalid argument
derror_replica(
"invalid argument for multi_remove: decree = {}, error = empty sort keys", decree);
// an invalid operation shouldn't be added to latency calculation
resp.error = rocksdb::Status::kInvalidArgument;
resp.count = 0;
return;
}
for (auto &sort_key : update.sort_keys) {
// TODO(wutao1): check returned error
db_write_batch_delete(composite_raw_key(update.hash_key, sort_key));
}
resp.error = db_write(decree);
if (resp.error != 0) {
resp.count = 0;
} else {
resp.count = update.sort_keys.size();
}
}
inline void batch_put(const dsn::apps::update_request &update, dsn::apps::update_response &resp)
{
resp.error = db_write_batch_put(
update.key, update.value, static_cast<uint32_t>(update.expire_ts_seconds));
_update_responses.emplace_back(&resp);
}
inline void batch_remove(const dsn::blob &key, dsn::apps::update_response &resp)
{
resp.error = db_write_batch_delete(key);
_update_responses.emplace_back(&resp);
}
int batch_commit(int64_t decree)
{
int err = db_write(decree);
dsn::apps::update_response resp;
resp.app_id = get_gpid().get_app_id();
resp.partition_index = get_gpid().get_partition_index();
resp.decree = decree;
resp.server = _primary_address;
for (dsn::apps::update_response *uresp : _update_responses) {
*uresp = resp;
}
_update_responses.clear();
return err;
}
int db_write_batch_put(dsn::string_view raw_key, dsn::string_view value, uint32_t expire_sec)
{
rocksdb::Slice skey = utils::to_rocksdb_slice(raw_key);
rocksdb::SliceParts skey_parts(&skey, 1);
rocksdb::SliceParts svalue =
_value_generator.generate_value(_value_schema_version, value, expire_sec);
_batch.Put(skey_parts, svalue);
return 0;
}
int db_write_batch_delete(dsn::string_view raw_key)
{
_batch.Delete(utils::to_rocksdb_slice(raw_key));
return 0;
}
// Apply the write batch into rocksdb.
int db_write(int64_t decree)
{
if (_batch.Count() == 0) {
return 0;
}
_wt_opts->given_decree = static_cast<uint64_t>(decree);
auto status = _db->Write(*_wt_opts, &_batch);
if (!status.ok()) {
derror_rocksdb("write", status.ToString(), "decree: {}", decree);
}
_batch.Clear();
return status.code();
}
private:
friend class pegasus_write_service_test;
const std::string _primary_address;
const int _value_schema_version;
rocksdb::WriteBatch _batch;
rocksdb::DB *_db;
rocksdb::WriteOptions *_wt_opts;
const rocksdb::ReadOptions *_rd_opts;
pegasus_value_generator _value_generator;
// for setting update_response.error after committed.
std::vector<dsn::apps::update_response *> _update_responses;
};
} // namespace server
} // namespace pegasus
......@@ -5,6 +5,8 @@ set(MY_PROJ_SRC "../pegasus_server_impl.cpp"
"../pegasus_counter_updater.cpp"
"../pagasus_manual_compact_service.cpp"
"../pegasus_event_listener.cpp"
"../pegasus_write_service.cpp"
"../pegasus_server_write.cpp"
)
set(MY_SRC_SEARCH_MODE "GLOB")
......
......@@ -11,17 +11,14 @@
std::atomic_bool gtest_done{false};
class gtest_app : public ::dsn::replication::replication_service_app
class gtest_app : public dsn::service_app
{
public:
explicit gtest_app(const dsn::service_app_info *info)
: dsn::replication::replication_service_app::replication_service_app(info)
{
}
explicit gtest_app(const dsn::service_app_info *info) : dsn::service_app(info) {}
dsn::error_code start(const std::vector<std::string> &args) override
{
dsn::replication::replication_service_app::start(args);
dsn::service_app::start(args);
RUN_ALL_TESTS();
gtest_done = true;
return dsn::ERR_OK;
......
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#pragma once
#include "rrdb/rrdb_types.h"
#include "rrdb/rrdb.code.definition.h"
#include <dsn/cpp/message_utils.h>
namespace pegasus {
inline dsn_message_t create_multi_put_request(const dsn::apps::multi_put_request &request)
{
return dsn::from_thrift_request_to_received_message(request,
dsn::apps::RPC_RRDB_RRDB_MULTI_PUT);
}
inline dsn_message_t create_multi_remove_request(const dsn::apps::multi_remove_request &request)
{
return dsn::from_thrift_request_to_received_message(request,
dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE);
}
inline dsn_message_t create_put_request(const dsn::apps::update_request &request)
{
return dsn::from_thrift_request_to_received_message(request, dsn::apps::RPC_RRDB_RRDB_PUT);
}
inline dsn_message_t create_remove_request(const dsn::blob &key)
{
return dsn_msg_create_received_request(
dsn::apps::RPC_RRDB_RRDB_REMOVE, DSF_THRIFT_BINARY, (void *)key.data(), key.length());
}
} // namespace pegasus
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#include "base/pegasus_value_schema.h"
#include <gtest/gtest.h>
using namespace pegasus;
TEST(value_schema, generate_and_extract_v1_v0)
{
struct test_case
{
int value_schema_version;
uint32_t expire_ts;
std::string user_data;
} tests[] = {
{0, 1000, ""},
{0, std::numeric_limits<uint32_t>::max(), "pegasus"},
{0, std::numeric_limits<uint32_t>::max(), ""},
};
for (auto &t : tests) {
pegasus_value_generator gen;
rocksdb::SliceParts sparts =
gen.generate_value(t.value_schema_version, t.user_data, t.expire_ts);
std::string raw_value;
for (int i = 0; i < sparts.num_parts; i++) {
raw_value += sparts.parts[i].ToString();
}
ASSERT_EQ(t.expire_ts, pegasus_extract_expire_ts(t.value_schema_version, raw_value));
dsn::blob user_data;
pegasus_extract_user_data(t.value_schema_version, std::move(raw_value), user_data);
ASSERT_EQ(t.user_data, user_data.to_string());
}
}
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#include "base/pegasus_key_schema.h"
#include "pegasus_server_test_base.h"
#include "server/pegasus_server_write.h"
#include "server/pegasus_write_service_impl.h"
namespace pegasus {
namespace server {
class pegasus_write_service_test : public pegasus_server_test_base
{
protected:
pegasus_write_service *_write_svc;
std::unique_ptr<pegasus_server_write> _server_write;
public:
pegasus_write_service_test() : pegasus_server_test_base()
{
_server_write = dsn::make_unique<pegasus_server_write>(_server.get(), true);
_write_svc = _server_write->_write_svc.get();
}
void test_multi_put()
{
dsn::apps::multi_put_request request;
dsn::apps::update_response response;
int64_t decree = 10;
std::string hash_key = "hash_key";
// alarm for empty request
request.hash_key = dsn::blob(hash_key.data(), 0, hash_key.size());
_write_svc->multi_put(decree, request, response);
ASSERT_EQ(response.error, rocksdb::Status::kInvalidArgument);
constexpr int kv_num = 100;
std::string sort_key[kv_num];
std::string value[kv_num];
for (int i = 0; i < 100; i++) {
sort_key[i] = "sort_key_" + std::to_string(i);
value[i] = "value_" + std::to_string(i);
}
for (int i = 0; i < 100; i++) {
request.kvs.emplace_back();
request.kvs.back().key.assign(sort_key[i].data(), 0, sort_key[i].size());
request.kvs.back().value.assign(value[i].data(), 0, value[i].size());
}
_write_svc->multi_put(decree, request, response);
ASSERT_EQ(response.error, 0);
ASSERT_EQ(response.app_id, _gpid.get_app_id());
ASSERT_EQ(response.partition_index, _gpid.get_partition_index());
ASSERT_EQ(response.decree, decree);
}
void test_multi_remove()
{
dsn::apps::multi_remove_request request;
dsn::apps::multi_remove_response response;
int64_t decree = 10;
std::string hash_key = "hash_key";
// alarm for empty request
request.hash_key = dsn::blob(hash_key.data(), 0, hash_key.size());
_write_svc->multi_remove(decree, request, response);
ASSERT_EQ(response.error, rocksdb::Status::kInvalidArgument);
constexpr int kv_num = 100;
std::string sort_key[kv_num];
for (int i = 0; i < kv_num; i++) {
sort_key[i] = "sort_key_" + std::to_string(i);
}
for (int i = 0; i < kv_num; i++) {
request.sort_keys.emplace_back();
request.sort_keys.back().assign(sort_key[i].data(), 0, sort_key[i].size());
}
_write_svc->multi_remove(decree, request, response);
ASSERT_EQ(response.error, 0);
ASSERT_EQ(response.app_id, _gpid.get_app_id());
ASSERT_EQ(response.partition_index, _gpid.get_partition_index());
ASSERT_EQ(response.decree, decree);
}
void test_batched_writes()
{
int64_t decree = 10;
std::string hash_key = "hash_key";
constexpr int kv_num = 100;
dsn::blob key[kv_num];
std::string value[kv_num];
for (int i = 0; i < kv_num; i++) {
std::string sort_key = "sort_key_" + std::to_string(i);
pegasus::pegasus_generate_key(key[i], hash_key, sort_key);
value[i] = "value_" + std::to_string(i);
}
// It's dangerous to use std::vector<> here, since the address
// of response may be changed due to capacity increase.
std::array<dsn::apps::update_response, kv_num> responses;
{
_write_svc->batch_prepare();
for (int i = 0; i < kv_num; i++) {
dsn::apps::update_request req;
req.key = key[i];
_write_svc->batch_put(req, responses[i]);
}
for (int i = 0; i < kv_num; i++) {
_write_svc->batch_remove(key[i], responses[i]);
}
_write_svc->batch_commit(decree);
}
for (const dsn::apps::update_response &resp : responses) {
ASSERT_EQ(resp.error, 0);
ASSERT_EQ(resp.app_id, _gpid.get_app_id());
ASSERT_EQ(resp.partition_index, _gpid.get_partition_index());
ASSERT_EQ(resp.decree, decree);
}
}
};
TEST_F(pegasus_write_service_test, multi_put) { test_multi_put(); }
TEST_F(pegasus_write_service_test, multi_remove) { test_multi_remove(); }
TEST_F(pegasus_write_service_test, batched_writes) { test_batched_writes(); }
} // namespace server
} // namespace pegasus
......@@ -2424,8 +2424,8 @@ inline bool local_get(command_executor *e, shell_context *sc, arguments args)
fprintf(stderr, "ERROR: get failed: %s\n", status.ToString().c_str());
} else {
uint32_t expire_ts = pegasus::pegasus_extract_expire_ts(0, value);
std::string user_data;
pegasus::pegasus_extract_user_data(0, value, user_data);
dsn::blob user_data;
pegasus::pegasus_extract_user_data(0, std::move(value), user_data);
fprintf(stderr,
"%u : \"%s\"\n",
expire_ts,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册