Unverified commit a70eb218, authored by W WeijieSun, committed by GitHub

redis_proxy: several bug fixes & some refactoring (#105)

* redis_proxy: several bug fixes & some refactoring

1. fix the thread-safety problem when the socket is closed (see the sketch below the commit header)
2. on a parse error, close the socket instead of aborting
3. clarify the semantics of proxy_session & redis_parser
4. set allow_inline for some rpc task codes in source code instead of in the config files
Parent 416f79ae
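The diff below realizes items 1 and 2 with an atomic reset flag (is_session_reset) and by closing the rpc session's socket when parse() fails, instead of asserting. The following is a minimal standalone sketch of that pattern in plain C++; ProxySession, Socket, and the message strings are simplified stand-ins for illustration, not the dsn / pegasus API.

// Minimal standalone sketch of the session-reset pattern described above.
// ProxySession and Socket are simplified stand-ins, not the dsn types.
#include <atomic>
#include <cstdio>
#include <memory>
#include <string>

struct Socket {
    bool open = true;
    void close() { open = false; std::puts("socket closed"); }
    void write(const std::string &s) { if (open) std::printf("reply: %s\n", s.c_str()); }
};

class ProxySession {
public:
    explicit ProxySession(std::shared_ptr<Socket> s) : sock_(std::move(s)) {}

    // Called by the stub when the underlying connection goes away.
    // Only flips a flag; the reply path checks it, so no lock is needed here.
    void on_remove_session() { is_session_reset_.store(true); }

    // Called serially by the rpc engine; a parse error closes the socket
    // instead of aborting the whole process.
    void on_recv_request(const std::string &msg) {
        if (!parse(msg)) {
            std::puts("got invalid message, closing the session");
            sock_->close();
        }
    }

private:
    bool parse(const std::string &msg) {
        if (msg.empty())
            return false;  // simulated parse error
        reply("+OK");
        return true;
    }

    void reply(const std::string &resp) {
        // Skip the write if the session was reset concurrently.
        if (!is_session_reset_.load())
            sock_->write(resp);
    }

    std::shared_ptr<Socket> sock_;
    std::atomic<bool> is_session_reset_{false};
};

int main() {
    auto sock = std::make_shared<Socket>();
    ProxySession s(sock);
    s.on_recv_request("*1\r\n$4\r\nPING\r\n");  // ok, reply written
    s.on_remove_session();                      // connection dropped
    s.on_recv_request("*1\r\n$4\r\nPING\r\n");  // reply is skipped
    s.on_recv_request("");                      // parse error -> close
}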
@@ -10,7 +10,7 @@ name = proxy
type = proxy
arguments = dsn://redis_cluster/temp
ports = 6379
pools = THREAD_POOL_DEFAULT, THREAD_POOL_PROXY_SERVER
pools = THREAD_POOL_DEFAULT
run = true
[apps.mimic]
@@ -26,7 +26,7 @@ delay_seconds = 30
;tool = simulator
;tool = fastrun
tool = nativerun
toollets = tracer
toollets = profiler
;toollets = tracer, profiler, fault_injector
pause_on_start = false
cli_local = false
@@ -98,22 +98,6 @@ allow_inline = false
is_trace = false
allow_inline = false
[task.RPC_CALL_RAW_SESSION_DISCONNECT]
is_trace = false
allow_inline = true
[task.RPC_CALL_RAW_MESSAGE]
is_trace = false
allow_inline = true
[task.RPC_RRDB_RRDB_PUT_ACK]
is_trace = false
allow_inline = true
[task.RPC_RRDB_RRDB_GET_ACK]
is_trace = false
allow_inline = true
[uri-resolver.dsn://redis_cluster]
factory = partition_resolver_simple
arguments = localhost:34601
@@ -23,8 +23,8 @@ public:
{
if (args.size() < 2)
return ::dsn::ERR_INVALID_PARAMETERS;
proxy_session::factory f = [](proxy_stub *p, ::dsn::rpc_address remote) {
return std::make_shared<redis_parser>(p, remote);
proxy_session::factory f = [](proxy_stub *p, dsn_message_t m) {
return std::make_shared<redis_parser>(p, m);
};
_proxy.reset(new proxy_stub(f, args[1].c_str()));
return ::dsn::ERR_OK;
@@ -2,7 +2,10 @@
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
#include <dsn/tool-api/task_spec.h>
#include <dsn/tool-api/uri_address.h>
#include <rrdb/rrdb.code.definition.h>
#include "proxy_layer.h"
namespace pegasus {
@@ -11,6 +14,21 @@ namespace proxy {
proxy_stub::proxy_stub(const proxy_session::factory &factory, const char *uri)
: serverlet<proxy_stub>("proxy_stub"), _factory(factory)
{
dsn::task_spec::get(RPC_CALL_RAW_MESSAGE)->allow_inline = true;
dsn::task_spec::get(RPC_CALL_RAW_SESSION_DISCONNECT)->allow_inline = true;
dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_PUT_ACK)->allow_inline = true;
dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_MULTI_PUT_ACK)->allow_inline = true;
dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_REMOVE_ACK)->allow_inline = true;
dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE_ACK)->allow_inline = true;
dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_GET_ACK)->allow_inline = true;
dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_MULTI_GET_ACK)->allow_inline = true;
dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_SORTKEY_COUNT_ACK)->allow_inline = true;
dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_TTL_ACK)->allow_inline = true;
dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_GET_SCANNER_ACK)->allow_inline = true;
dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_SCAN_ACK)->allow_inline = true;
dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_CLEAR_SCANNER_ACK)->allow_inline = true;
_uri_address.assign_uri(uri);
open_service();
}
@@ -34,74 +52,79 @@ void proxy_stub::on_rpc_request(dsn_message_t request)
if (it != _sessions.end()) {
ps = it->second;
} else {
ps = _factory(this, source);
ps = _factory(this, request);
_sessions.emplace(source, ps);
}
}
// release in proxy_session
dsn_msg_add_ref(request);
ps->on_recv_request(ps, request);
ps->on_recv_request(request);
}
void proxy_stub::on_recv_remove_session_request(dsn_message_t request)
{
::dsn::rpc_address source = dsn_msg_from_address(request);
std::shared_ptr<proxy_session> ps;
{
::dsn::service::zauto_write_lock l(_lock);
auto iter = _sessions.find(source);
if (iter == _sessions.end())
return;
ps = iter->second;
_sessions.erase(iter);
std::shared_ptr<proxy_session> ps = remove_session(source);
if (ps != nullptr) {
ps->on_remove_session();
}
ddebug("proxy session %s removed", source.to_string());
::dsn::tasking::enqueue(LPC_RPC_CALL_RAW_SCATTER,
nullptr,
std::bind(&proxy_session::on_remove_session, ps.get(), ps),
ps->hash());
}
proxy_session::proxy_session(proxy_stub *op, ::dsn::rpc_address raddr)
: stub(op),
backup_one_request(nullptr),
remote_address(raddr),
hash_code(std::hash<::dsn::rpc_address>()(remote_address))
std::shared_ptr<proxy_session> proxy_stub::remove_session(dsn::rpc_address remote_address)
{
dassert(
raddr.type() == HOST_TYPE_IPV4, "invalid rpc_address type, type = %d", (int)raddr.type());
::dsn::service::zauto_write_lock l(_lock);
auto iter = _sessions.find(remote_address);
if (iter == _sessions.end()) {
dwarn("%s has been removed from proxy stub", remote_address.to_string());
return nullptr;
}
ddebug("remove %s from proxy stub", remote_address.to_string());
std::shared_ptr<proxy_session> result = std::move(iter->second);
_sessions.erase(iter);
return result;
}
proxy_session::~proxy_session()
proxy_session::proxy_session(proxy_stub *op, dsn_message_t first_msg)
: stub(op), is_session_reset(false), backup_one_request(first_msg)
{
if (backup_one_request) {
dsn_msg_release_ref(backup_one_request);
}
dinfo("proxy session %s destroyed", remote_address.to_string());
dassert(first_msg != nullptr, "null msg when create session");
dsn_msg_add_ref(backup_one_request);
remote_address = dsn_msg_from_address(backup_one_request);
dassert(remote_address.type() == HOST_TYPE_IPV4,
"invalid rpc_address type, type = %d",
(int)remote_address.type());
}
void proxy_session::on_recv_request(std::shared_ptr<proxy_session> _this, dsn_message_t msg)
proxy_session::~proxy_session()
{
::dsn::service::zauto_lock l(_lock);
if (backup_one_request == nullptr) {
backup_one_request = msg;
dsn_msg_add_ref(msg); // release in service session's dtor
}
dsn_msg_release_ref(backup_one_request);
ddebug("proxy session %s destroyed", remote_address.to_string());
}
void proxy_session::on_recv_request(dsn_message_t msg)
{
// NOTICE:
// 1. the implementation of "parse" may call add_ref & release_ref on the msg.
// so if the ref count of msg is 0 before calling "parse", the msg may already be released
// when "parse" returns. please take care if you want to
// use "msg" after "parse" returns
//
// 2. as "on_recv_request" won't be called concurrently, it's not necessary to call
// "parse" with a lock. a subclass may implement a lock inside "parse" if necessary
if (!parse(msg)) {
::dsn::rpc_address from = dsn_msg_from_address(msg);
derror("got invalid message from remote address %s", from.to_string());
derror("%s: got invalid message, try to remove proxy session from proxy stub",
remote_address.to_string());
stub->remove_session(remote_address);
// TODO: notify rpc engine to reset the socket, currently let's take the easiest way :)
dassert(false, "got invalid message");
derror("close the rpc session %s", remote_address.to_string());
((dsn::message_ex *)backup_one_request)->io_session->close();
}
}
void proxy_session::on_remove_session() { is_session_reset.store(true); }
dsn_message_t proxy_session::create_response()
{
if (backup_one_request == nullptr)
return nullptr;
return dsn_msg_create_response(backup_one_request);
}
}
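The NOTICE comment in proxy_session::on_recv_request above warns that "parse" may add and drop references on the message, so a caller that touches the message afterwards must hold its own reference (the stub does this with dsn_msg_add_ref before dispatching, released later by the session). A simplified standalone model of that reference-counting discipline, with RefCountedMsg standing in for dsn_message_t; this is an illustration of the rule, not the dsn API.

// Simplified standalone model of the reference-counting rule spelled out in
// the NOTICE comment above. RefCountedMsg stands in for dsn_message_t.
#include <atomic>
#include <cassert>
#include <cstdio>

struct RefCountedMsg {
    std::atomic<int> refs{0};
    bool alive = true;
};

void msg_add_ref(RefCountedMsg *m) { m->refs.fetch_add(1); }
void msg_release_ref(RefCountedMsg *m) {
    if (m->refs.fetch_sub(1) == 1) {
        m->alive = false;  // in the real engine the message is freed here
        std::puts("message released");
    }
}

// parse() is free to take and drop its own references, e.g. when it hands
// the message to an asynchronous call that completes immediately.
bool parse(RefCountedMsg *m) {
    msg_add_ref(m);
    // ... forward to the backend, which finishes and drops its ref ...
    msg_release_ref(m);
    return true;
}

int main() {
    RefCountedMsg msg;

    // The caller pins the message before dispatching (the
    // "dsn_msg_add_ref(request); // release in proxy_session" line above),
    // so it is still alive after parse() returns.
    msg_add_ref(&msg);
    parse(&msg);
    assert(msg.alive);        // safe to keep using msg here
    msg_release_ref(&msg);    // released once the session is done with it
    assert(!msg.alive);
}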
@@ -17,40 +17,41 @@ DEFINE_TASK_CODE_RPC(RPC_CALL_RAW_SESSION_DISCONNECT,
::dsn::THREAD_POOL_DEFAULT)
DEFINE_TASK_CODE_RPC(RPC_CALL_RAW_MESSAGE, TASK_PRIORITY_COMMON, ::dsn::THREAD_POOL_DEFAULT)
DEFINE_THREAD_POOL_CODE(THREAD_POOL_PROXY_SERVER)
DEFINE_TASK_CODE(LPC_RPC_CALL_RAW_SCATTER, TASK_PRIORITY_COMMON, THREAD_POOL_PROXY_SERVER)
class proxy_stub;
class proxy_session : public std::enable_shared_from_this<proxy_session>, public ::dsn::clientlet
class proxy_session : public std::enable_shared_from_this<proxy_session>
{
public:
typedef std::function<std::shared_ptr<proxy_session>(proxy_stub *p, ::dsn::rpc_address raddr)>
typedef std::function<std::shared_ptr<proxy_session>(proxy_stub *p, dsn_message_t first_msg)>
factory;
proxy_session(proxy_stub *p, ::dsn::rpc_address raddr);
proxy_session(proxy_stub *p, dsn_message_t first_msg);
virtual ~proxy_session();
void on_recv_request(std::shared_ptr<proxy_session> _this, dsn_message_t msg);
// called when proxy_stub remove this session
virtual void on_remove_session(std::shared_ptr<proxy_session> _this) = 0;
std::size_t hash() const { return hash_code; }
// on_recv_request & on_remove_session are called by proxy_stub when messages arrive from
// the underlying rpc engine.
//
// the rpc engine ensures that on_recv_request for one proxy_session
// won't be called concurrently; that is to say, another on_recv_request
// may happen only after the first one returns.
//
// however, while on_recv_request is running, "on_remove_session" may be called concurrently,
// so proxy_session and its derived classes may need to do some synchronization on this.
void on_recv_request(dsn_message_t msg);
void on_remove_session();
protected:
// return true if no parse error, else return false
// return if parse ok
virtual bool parse(dsn_message_t msg) = 0;
dsn_message_t create_response();
protected:
proxy_stub *stub;
std::atomic_bool is_session_reset;
private:
// when messages come from the raw parser, the request & response "dsn_message_t" objects are not paired.
// we need to back up one request in order to create response structs.
dsn_message_t backup_one_request;
// the client address for which this session served
::dsn::rpc_address remote_address;
std::size_t hash_code;
::dsn::service::zlock _lock;
protected:
::dsn::service::zlock _rlock; // reply lock
dsn::rpc_address remote_address;
};
class proxy_stub : public ::dsn::serverlet<proxy_stub>
@@ -72,6 +73,7 @@ public:
this->unregister_rpc_handler(RPC_CALL_RAW_MESSAGE);
this->unregister_rpc_handler(RPC_CALL_RAW_SESSION_DISCONNECT);
}
std::shared_ptr<proxy_session> remove_session(dsn::rpc_address remote_address);
private:
void on_rpc_request(dsn_message_t request);
@@ -28,7 +28,6 @@ protected:
start_bulk_string,
in_bulk_string_size,
start_bulk_string_data,
removed
};
struct redis_bulk_string
{
@@ -56,7 +55,7 @@ protected:
struct message_entry
{
redis_request request;
dsn_message_t response;
std::atomic<dsn_message_t> response;
int64_t sequence_id;
};
@@ -71,9 +70,11 @@ protected:
private:
// queue for pipelining the responses
dsn::service::zlock response_lock;
std::deque<std::unique_ptr<message_entry>> pending_response;
int64_t next_seqid;
// receiving message and parsing status
// [
// content for current parser
redis_bulk_string current_str;
std::unique_ptr<message_entry> current_msg;
@@ -86,6 +87,7 @@ private:
char *current_buffer;
size_t current_buffer_length;
size_t current_cursor;
// ]
// for rrdb
std::unique_ptr<::dsn::apps::rrdb_client> client;
@@ -97,7 +99,7 @@ private:
char peek();
bool eat(char c);
void eat_all(char *dest, size_t length);
void reset();
void reset_parser();
// function for parser
bool end_array_size();
@@ -121,30 +123,34 @@ private:
DECLARE_REDIS_HANDLER(default_handler)
// function for pipeline reply
void enqueue_pending_response(std::unique_ptr<message_entry> &&entry);
void fetch_and_dequeue_messages(std::vector<dsn_message_t> &msgs, bool only_ready_ones);
void clear_reply_queue();
void reply_all_ready();
template <typename T>
void reply_message(message_entry &entry, const T &value)
{
dsn_message_t resp = create_response();
::dsn::rpc_write_stream s(resp);
// release in reply_all_ready or reset
dsn_msg_add_ref(resp);
dsn::rpc_write_stream s(resp);
marshalling(s, value);
s.commit_buffer();
entry.response = resp;
// released when dequeued from the pending_response queue
dsn_msg_add_ref(entry.response);
entry.response.store(resp, std::memory_order_release);
reply_all_ready();
}
void reply_all_ready();
typedef void (*redis_call_handler)(redis_parser *, message_entry &);
static std::unordered_map<std::string, redis_call_handler> s_dispatcher;
static redis_call_handler get_handler(const char *command, unsigned int length);
static std::atomic_llong s_next_seqid;
public:
redis_parser(proxy_stub *op, ::dsn::rpc_address remote);
redis_parser(proxy_stub *op, dsn_message_t first_msg);
virtual ~redis_parser();
virtual void on_remove_session(std::shared_ptr<proxy_session> _this) override;
};
}
} // namespace
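The response_lock, pending_response, and std::atomic<dsn_message_t> members above exist because redis clients pipeline requests: backend acks may complete out of order, but replies must go back to the client in request order, so each entry's response slot is filled atomically and the front of the queue is drained only when it is ready. Below is a minimal standalone model of this queue in plain C++ with simplified types and names; it illustrates the idea, not the actual redis_parser code.

#include <atomic>
#include <cstdio>
#include <deque>
#include <memory>
#include <mutex>

// A pending request slot; "response" stays null until the backend ack arrives.
struct MessageEntry {
    int sequence_id = 0;
    std::atomic<const char *> response{nullptr};
};

class ReplyQueue {
public:
    // Called while parsing, in request order.
    MessageEntry *enqueue(int seq) {
        std::lock_guard<std::mutex> guard(lock_);
        pending_.push_back(std::make_unique<MessageEntry>());
        pending_.back()->sequence_id = seq;
        return pending_.back().get();
    }

    // Called from backend callbacks, possibly out of order: publish the
    // response, then flush whatever is ready at the front of the queue.
    void on_response(MessageEntry *entry, const char *resp) {
        entry->response.store(resp, std::memory_order_release);
        reply_all_ready();
    }

private:
    void reply_all_ready() {
        std::lock_guard<std::mutex> guard(lock_);
        while (!pending_.empty()) {
            const char *resp =
                pending_.front()->response.load(std::memory_order_acquire);
            if (resp == nullptr)
                break;  // head is still waiting; later replies must wait too
            std::printf("reply #%d: %s\n", pending_.front()->sequence_id, resp);
            pending_.pop_front();
        }
    }

    std::mutex lock_;
    std::deque<std::unique_ptr<MessageEntry>> pending_;
};

int main() {
    ReplyQueue q;
    MessageEntry *a = q.enqueue(1);
    MessageEntry *b = q.enqueue(2);
    q.on_response(b, "+OK");        // arrives first but is held back
    q.on_response(a, "$3\r\nfoo");  // both are now flushed, in order 1 then 2
}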
@@ -10,7 +10,7 @@ name = proxy
type = proxy
arguments = dsn://redis_cluster/temp
ports = 12345
pools = THREAD_POOL_DEFAULT, THREAD_POOL_PROXY_SERVER
pools = THREAD_POOL_DEFAULT
run = true
[apps.mimic]
@@ -98,14 +98,6 @@ allow_inline = false
is_trace = false
allow_inline = false
[task.RPC_CALL_RAW_SESSION_DISCONNECT]
is_trace = false
allow_inline = true
[task.RPC_CALL_RAW_MESSAGE]
is_trace = false
allow_inline = true
[uri-resolver.dsn://redis_cluster]
factory = partition_resolver_simple
arguments = localhost:34601, localhost:34602, localhost:34603
@@ -26,8 +26,8 @@ public:
{
if (args.size() < 2)
return ::dsn::ERR_INVALID_PARAMETERS;
proxy_session::factory f = [](proxy_stub *p, ::dsn::rpc_address remote) {
return std::make_shared<redis_parser>(p, remote);
proxy_session::factory f = [](proxy_stub *p, dsn_message_t m) {
return std::make_shared<redis_parser>(p, m);
};
_proxy.reset(new proxy_stub(f, args[1].c_str()));
return ::dsn::ERR_OK;
@@ -48,7 +48,7 @@ protected:
virtual void handle_command(std::unique_ptr<message_entry> &&entry)
{
redis_request &act_request = entry->request;
redis_request &exp_request = reserved_entry[entry_index].request;
redis_request &exp_request = reserved_entry[entry_index]->request;
ASSERT_TRUE(act_request.length > 0);
ASSERT_EQ(act_request.length, exp_request.length);
@@ -65,16 +65,18 @@ protected:
}
public:
redis_test_parser1(proxy_stub *stub, ::dsn::rpc_address addr) : redis_parser(stub, addr)
redis_test_parser1(proxy_stub *stub, dsn_message_t msg) : redis_parser(stub, msg)
{
reserved_entry.resize(20);
reserved_entry.reserve(20);
for (int i = 0; i < 20; ++i)
reserved_entry.emplace_back(new message_entry());
}
void test_fixed_cases()
{
std::cout << "test fixed cases" << std::endl;
redis_request &rr = reserved_entry[0].request;
redis_request &rr = reserved_entry[0]->request;
// simple case
{
rr.length = 3;
@@ -158,7 +160,7 @@ public:
// create several requests
for (entry_index = 0; entry_index < total_requests; ++entry_index) {
redis_request &ra = reserved_entry[entry_index].request;
redis_request &ra = reserved_entry[entry_index]->request;
ra.length = dsn_random32(1, 20);
ra.buffers.resize(ra.length);
for (unsigned int i = 0; i != ra.length; ++i) {
@@ -243,14 +245,12 @@ public:
{
dsn_message_t m = dsn_msg_create_received_request(
RPC_CALL_RAW_MESSAGE, DSF_THRIFT_BINARY, (void *)data, strlen(data));
dsn_msg_add_ref(m);
return m;
}
static dsn_message_t create_message(const char *data, int length)
{
dsn_message_t m = dsn_msg_create_received_request(
RPC_CALL_RAW_MESSAGE, DSF_THRIFT_BINARY, (void *)data, length);
dsn_msg_add_ref(m);
return m;
}
static dsn_message_t marshalling_array(const redis_request &ra)
@@ -273,17 +273,23 @@ public:
return result;
}
std::vector<message_entry> reserved_entry;
std::vector<std::unique_ptr<message_entry>> reserved_entry;
int entry_index;
bool got_a_message;
};
TEST(proxy, parser)
{
std::shared_ptr<redis_test_parser1> parser(
new redis_test_parser1(nullptr, ::dsn::rpc_address("127.0.0.1", 123)));
dsn_message_t m = nullptr;
{
m = dsn_msg_create_received_request(RPC_CALL_RAW_MESSAGE, DSF_THRIFT_BINARY, nullptr, 0);
dsn::message_ex *msg = (dsn::message_ex *)m;
msg->header->from_address = dsn::rpc_address("127.0.0.1", 123);
}
std::shared_ptr<redis_test_parser1> parser(new redis_test_parser1(nullptr, m));
parser->test_fixed_cases();
parser->test_random_cases();
dsn_msg_release_ref(m);
}
TEST(proxy, utils)