提交 c4507ada 编写于 作者: W WeijieSun 提交者: QinZuoyan

rpc message: remove dsn_message_t (#168)

上级 b3a941cb
Subproject commit b613aad324ac95532de4c795a76f43064d6b4b17
Subproject commit 184e7beae89db9eb94fa9f31cf9c6518635f460b
......@@ -91,4 +91,3 @@ inline rocksdb::Slice to_rocksdb_slice(dsn::string_view s) { return {s.data(), s
} // namespace utils
} // namespace pegasus
......@@ -110,7 +110,7 @@ void pegasus_client_impl::async_set(const std::string &hash_key,
// wrap the user defined callback function, generate a new callback function.
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
......@@ -199,7 +199,7 @@ void pegasus_client_impl::async_multi_set(const std::string &hash_key,
auto partition_hash = pegasus_key_hash(tmp_key);
// wrap the user-defined-callback-function, generate a new callback function.
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
......@@ -261,7 +261,7 @@ void pegasus_client_impl::async_get(const std::string &hash_key,
pegasus_generate_key(req, hash_key, sort_key);
auto partition_hash = pegasus_key_hash(req);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
......@@ -352,7 +352,7 @@ void pegasus_client_impl::async_multi_get(const std::string &hash_key,
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
......@@ -453,7 +453,7 @@ void pegasus_client_impl::async_multi_get(const std::string &hash_key,
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
......@@ -533,7 +533,7 @@ void pegasus_client_impl::async_multi_get_sortkeys(const std::string &hash_key,
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
......@@ -648,7 +648,7 @@ void pegasus_client_impl::async_del(const std::string &hash_key,
auto partition_hash = pegasus_key_hash(req);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
......@@ -730,7 +730,7 @@ void pegasus_client_impl::async_multi_del(const std::string &hash_key,
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
......@@ -799,7 +799,7 @@ void pegasus_client_impl::async_incr(const std::string &hash_key,
auto partition_hash = pegasus_key_hash(req.key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
......@@ -904,7 +904,7 @@ void pegasus_client_impl::async_check_and_set(const std::string &hash_key,
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
......@@ -1021,7 +1021,7 @@ void pegasus_client_impl::async_check_and_mutate(const std::string &hash_key,
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
if (user_callback == nullptr) {
return;
......@@ -1192,7 +1192,7 @@ void pegasus_client_impl::async_get_unordered_scanners(
}
auto new_callback = [ user_callback = std::move(callback), max_split_count, options, this ](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp)
{
std::vector<pegasus_scanner *> scanners;
configuration_query_by_index_response response;
......
......@@ -267,7 +267,7 @@ public:
void _async_next_internal();
void _start_scan();
void _next_batch();
void _on_scan_response(::dsn::error_code, dsn_message_t, dsn_message_t);
void _on_scan_response(::dsn::error_code, dsn::message_ex *, dsn::message_ex *);
void _split_reset();
private:
......
......@@ -169,9 +169,9 @@ void pegasus_client_impl::pegasus_scanner_impl::_next_batch()
dassert(!_rpc_started, "");
_rpc_started = true;
_client->scan(req,
[this](::dsn::error_code err, dsn_message_t req, dsn_message_t resp) mutable {
_on_scan_response(err, req, resp);
},
[this](::dsn::error_code err,
dsn::message_ex *req,
dsn::message_ex *resp) mutable { _on_scan_response(err, req, resp); },
std::chrono::milliseconds(_options.timeout_ms),
0,
_hash);
......@@ -200,18 +200,19 @@ void pegasus_client_impl::pegasus_scanner_impl::_start_scan()
dassert(!_rpc_started, "");
_rpc_started = true;
_client->get_scanner(req,
[this](::dsn::error_code err,
dsn_message_t req,
dsn_message_t resp) mutable { _on_scan_response(err, req, resp); },
std::chrono::milliseconds(_options.timeout_ms),
0,
_hash);
_client->get_scanner(
req,
[this](::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp) mutable {
_on_scan_response(err, req, resp);
},
std::chrono::milliseconds(_options.timeout_ms),
0,
_hash);
}
void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_code err,
dsn_message_t req,
dsn_message_t resp)
dsn::message_ex *req,
dsn::message_ex *resp)
{
dassert(_rpc_started, "");
_rpc_started = false;
......
......@@ -26,7 +26,9 @@ PEGASUS_ERR_CODE(PERR_INVALID_VALUE, -203, "value can't be empty");
PEGASUS_ERR_CODE(PERR_INVALID_PAR_COUNT, -204, "partition count must be a power of 2");
PEGASUS_ERR_CODE(PERR_INVALID_REP_COUNT, -205, "replication count must be 3");
PEGASUS_ERR_CODE(PERR_INVALID_SPLIT_COUNT, -206, "split count must be greater than 0");
PEGASUS_ERR_CODE(PERR_GEO_DECODE_VALUE_ERROR, -207, "decode latitude and longitude from value error");
PEGASUS_ERR_CODE(PERR_GEO_DECODE_VALUE_ERROR,
-207,
"decode latitude and longitude from value error");
PEGASUS_ERR_CODE(PERR_GEO_INVALID_LATLNG_ERROR, -208, "latitude or longitude is invalid");
// SERVER ERROR
......
......@@ -12,7 +12,7 @@ class rrdb_service : public replication::replication_app_base,
public:
rrdb_service(replication::replica *r) : replication::replication_app_base(r) {}
virtual ~rrdb_service() {}
virtual int on_request(dsn_message_t request) override
virtual int on_request(dsn::message_ex *request) override
{
handle_request(request);
return 0;
......
......@@ -26,7 +26,7 @@ public:
return ::dsn::ERR_INVALID_PARAMETERS;
}
proxy_session::factory f = [](proxy_stub *p, dsn_message_t m) {
proxy_session::factory f = [](proxy_stub *p, dsn::message_ex *m) {
return std::make_shared<redis_parser>(p, m);
};
_proxy = dsn::make_unique<proxy_stub>(
......
......@@ -43,9 +43,9 @@ proxy_stub::proxy_stub(const proxy_session::factory &f,
open_service();
}
void proxy_stub::on_rpc_request(dsn_message_t request)
void proxy_stub::on_rpc_request(dsn::message_ex *request)
{
::dsn::rpc_address source = dsn_msg_from_address(request);
::dsn::rpc_address source = request->header->from_address;
std::shared_ptr<proxy_session> ps;
{
::dsn::service::zauto_read_lock l(_lock);
......@@ -68,9 +68,9 @@ void proxy_stub::on_rpc_request(dsn_message_t request)
ps->on_recv_request(request);
}
void proxy_stub::on_recv_remove_session_request(dsn_message_t request)
void proxy_stub::on_recv_remove_session_request(dsn::message_ex *request)
{
::dsn::rpc_address source = dsn_msg_from_address(request);
::dsn::rpc_address source = request->header->from_address;
std::shared_ptr<proxy_session> ps = remove_session(source);
if (ps != nullptr) {
ps->on_remove_session();
......@@ -91,13 +91,13 @@ std::shared_ptr<proxy_session> proxy_stub::remove_session(dsn::rpc_address remot
return result;
}
proxy_session::proxy_session(proxy_stub *op, dsn_message_t first_msg)
proxy_session::proxy_session(proxy_stub *op, dsn::message_ex *first_msg)
: stub(op), is_session_reset(false), backup_one_request(first_msg)
{
dassert(first_msg != nullptr, "null msg when create session");
dsn_msg_add_ref(backup_one_request);
backup_one_request->add_ref();
remote_address = dsn_msg_from_address(backup_one_request);
remote_address = backup_one_request->header->from_address;
dassert(remote_address.type() == HOST_TYPE_IPV4,
"invalid rpc_address type, type = %d",
(int)remote_address.type());
......@@ -105,11 +105,11 @@ proxy_session::proxy_session(proxy_stub *op, dsn_message_t first_msg)
proxy_session::~proxy_session()
{
dsn_msg_release_ref(backup_one_request);
backup_one_request->release_ref();
ddebug("proxy session %s destroyed", remote_address.to_string());
}
void proxy_session::on_recv_request(dsn_message_t msg)
void proxy_session::on_recv_request(dsn::message_ex *msg)
{
// NOTICE:
// 1. in the implementation of "parse", the msg may add_ref & release_ref.
......@@ -131,9 +131,6 @@ void proxy_session::on_recv_request(dsn_message_t msg)
void proxy_session::on_remove_session() { is_session_reset.store(true); }
dsn_message_t proxy_session::create_response()
{
return dsn_msg_create_response(backup_one_request);
}
dsn::message_ex *proxy_session::create_response() { return backup_one_request->create_response(); }
}
} // namespace
......@@ -21,9 +21,9 @@ class proxy_stub;
class proxy_session : public std::enable_shared_from_this<proxy_session>
{
public:
typedef std::function<std::shared_ptr<proxy_session>(proxy_stub *p, dsn_message_t first_msg)>
typedef std::function<std::shared_ptr<proxy_session>(proxy_stub *p, dsn::message_ex *first_msg)>
factory;
proxy_session(proxy_stub *p, dsn_message_t first_msg);
proxy_session(proxy_stub *p, dsn::message_ex *first_msg);
virtual ~proxy_session();
// on_recv_request & on_remove_session are called by proxy_stub when messages are got from
......@@ -35,21 +35,21 @@ public:
//
// however, during the running of on_recv_request, an "on_remove_session" may be called,
// the proxy_session and its derived class may need to do some synchronization on this.
void on_recv_request(dsn_message_t msg);
void on_recv_request(dsn::message_ex *msg);
void on_remove_session();
protected:
// return if parse ok
virtual bool parse(dsn_message_t msg) = 0;
dsn_message_t create_response();
virtual bool parse(dsn::message_ex *msg) = 0;
dsn::message_ex *create_response();
protected:
proxy_stub *stub;
std::atomic_bool is_session_reset;
// when get message from raw parser, request & response of "dsn_message_t" are not in couple.
// when get message from raw parser, request & response of "dsn::message_ex*" are not in couple.
// we need to backup one request to create a response struct.
dsn_message_t backup_one_request;
dsn::message_ex *backup_one_request;
// the client address for which this session served
dsn::rpc_address remote_address;
};
......@@ -81,8 +81,8 @@ public:
std::shared_ptr<proxy_session> remove_session(dsn::rpc_address remote_address);
private:
void on_rpc_request(dsn_message_t request);
void on_recv_remove_session_request(dsn_message_t);
void on_rpc_request(dsn::message_ex *request);
void on_recv_remove_session_request(dsn::message_ex *);
::dsn::service::zrwlock_nr _lock;
std::unordered_map<::dsn::rpc_address, std::shared_ptr<proxy_session>> _sessions;
......
......@@ -47,7 +47,7 @@ redis_parser::redis_call_handler redis_parser::get_handler(const char *command,
return iter->second;
}
redis_parser::redis_parser(proxy_stub *op, dsn_message_t first_msg)
redis_parser::redis_parser(proxy_stub *op, dsn::message_ex *first_msg)
: proxy_session(op, first_msg),
current_msg(new message_entry()),
status(start_array),
......@@ -84,25 +84,26 @@ void redis_parser::prepare_current_buffer()
{
void *msg_buffer;
if (current_buffer == nullptr) {
dsn_message_t first_msg = recv_buffers.front();
dassert(dsn_msg_read_next(first_msg, &msg_buffer, &current_buffer_length),
"read dsn_message_t failed, msg from_address = %s, to_address = %s, rpc_name = %s",
((::dsn::message_ex *)first_msg)->header->from_address.to_string(),
((::dsn::message_ex *)first_msg)->to_address.to_string(),
((::dsn::message_ex *)first_msg)->header->rpc_name);
dsn::message_ex *first_msg = recv_buffers.front();
dassert(
first_msg->read_next(&msg_buffer, &current_buffer_length),
"read dsn::message_ex* failed, msg from_address = %s, to_address = %s, rpc_name = %s",
first_msg->header->from_address.to_string(),
first_msg->to_address.to_string(),
first_msg->header->rpc_name);
current_buffer = reinterpret_cast<char *>(msg_buffer);
current_cursor = 0;
} else if (current_cursor >= current_buffer_length) {
dsn_message_t first_msg = recv_buffers.front();
dsn_msg_read_commit(first_msg, current_buffer_length);
if (dsn_msg_read_next(first_msg, &msg_buffer, &current_buffer_length)) {
dsn::message_ex *first_msg = recv_buffers.front();
first_msg->read_commit(current_buffer_length);
if (first_msg->read_next(&msg_buffer, &current_buffer_length)) {
current_cursor = 0;
current_buffer = reinterpret_cast<char *>(msg_buffer);
return;
} else {
// we have consume this message all over
// reference is added in append message
dsn_msg_release_ref(first_msg);
first_msg->release_ref();
recv_buffers.pop();
current_buffer = nullptr;
prepare_current_buffer();
......@@ -122,13 +123,13 @@ void redis_parser::reset_parser()
// clear the data stream
total_length = 0;
if (current_buffer) {
dsn_msg_read_commit(recv_buffers.front(), current_buffer_length);
recv_buffers.front()->read_commit(current_buffer_length);
}
current_buffer = nullptr;
current_buffer_length = 0;
current_cursor = 0;
while (!recv_buffers.empty()) {
dsn_msg_release_ref(recv_buffers.front());
recv_buffers.front()->release_ref();
recv_buffers.pop();
}
}
......@@ -231,11 +232,11 @@ bool redis_parser::end_bulk_string_size()
return true;
}
void redis_parser::append_message(dsn_message_t msg)
void redis_parser::append_message(dsn::message_ex *msg)
{
dsn_msg_add_ref(msg);
msg->add_ref();
recv_buffers.push(msg);
total_length += dsn_msg_body_size(msg);
total_length += msg->body_size();
dinfo("%s: recv message, currently total length:%d", remote_address.to_string(), total_length);
}
......@@ -296,7 +297,7 @@ bool redis_parser::parse_stream()
return true;
}
bool redis_parser::parse(dsn_message_t msg)
bool redis_parser::parse(dsn::message_ex *msg)
{
append_message(msg);
if (parse_stream())
......@@ -315,13 +316,13 @@ void redis_parser::enqueue_pending_response(std::unique_ptr<message_entry> &&ent
pending_response.emplace_back(std::move(entry));
}
void redis_parser::fetch_and_dequeue_messages(std::vector<dsn_message_t> &msgs,
void redis_parser::fetch_and_dequeue_messages(std::vector<dsn::message_ex *> &msgs,
bool only_ready_ones)
{
dsn::service::zauto_lock l(response_lock);
while (!pending_response.empty()) {
message_entry *entry = pending_response.front().get();
dsn_message_t r = entry->response.load(std::memory_order_acquire);
dsn::message_ex *r = entry->response.load(std::memory_order_acquire);
if (only_ready_ones && r == nullptr) {
break;
} else {
......@@ -334,24 +335,24 @@ void redis_parser::fetch_and_dequeue_messages(std::vector<dsn_message_t> &msgs,
void redis_parser::clear_reply_queue()
{
// clear the response pipeline
std::vector<dsn_message_t> all_responses;
std::vector<dsn::message_ex *> all_responses;
fetch_and_dequeue_messages(all_responses, false);
for (const dsn_message_t &m : all_responses) {
for (dsn::message_ex *m : all_responses) {
if (m != nullptr) {
dsn_msg_release_ref(m);
m->release_ref();
}
}
}
void redis_parser::reply_all_ready()
{
std::vector<dsn_message_t> ready_responses;
std::vector<dsn::message_ex *> ready_responses;
fetch_and_dequeue_messages(ready_responses, true);
for (const dsn_message_t &m : ready_responses) {
for (dsn::message_ex *m : ready_responses) {
dassert(m != nullptr, "");
dsn_rpc_reply(m, ::dsn::ERR_OK);
// added when message is created
dsn_msg_release_ref(m);
m->release_ref();
}
}
......@@ -395,51 +396,51 @@ void redis_parser::set_internal(redis_parser::message_entry &entry)
// with a reference to prevent the object from being destroyed
std::shared_ptr<proxy_session> ref_this = shared_from_this();
dinfo("%s: send set command(%" PRId64 ")", remote_address.to_string(), entry.sequence_id);
auto on_set_reply =
[ref_this, this, &entry](::dsn::error_code ec, dsn_message_t, dsn_message_t response) {
// when the "is_session_reset" flag is set, the socket may be broken.
// so continue to reply the message is not necessary
if (is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: set command seqid(%" PRId64 ") got reply, but session has reset",
remote_address.to_string(),
entry.sequence_id);
return;
}
auto on_set_reply = [ref_this, this, &entry](
::dsn::error_code ec, dsn::message_ex *, dsn::message_ex *response) {
// when the "is_session_reset" flag is set, the socket may be broken.
// so continue to reply the message is not necessary
if (is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: set command seqid(%" PRId64 ") got reply, but session has reset",
remote_address.to_string(),
entry.sequence_id);
return;
}
// the message_enry "entry" is stored in the queue "pending_response".
// please ensure that "entry" hasn't been released right now.
//
// currently we only clear an entry when it is replied or
// in the redis_parser's destructor
dinfo("%s: set command seqid(%" PRId64 ") got reply",
remote_address.to_string(),
entry.sequence_id);
if (::dsn::ERR_OK != ec) {
ddebug("%s: set command seqid(%" PRId64 ") got reply with error = %s",
remote_address.to_string(),
entry.sequence_id,
ec.to_string());
// the message_enry "entry" is stored in the queue "pending_response".
// please ensure that "entry" hasn't been released right now.
//
// currently we only clear an entry when it is replied or
// in the redis_parser's destructor
dinfo("%s: set command seqid(%" PRId64 ") got reply",
remote_address.to_string(),
entry.sequence_id);
if (::dsn::ERR_OK != ec) {
ddebug("%s: set command seqid(%" PRId64 ") got reply with error = %s",
remote_address.to_string(),
entry.sequence_id,
ec.to_string());
redis_simple_string result;
result.is_error = true;
result.message = std::string("ERR ") + ec.to_string();
reply_message(entry, result);
} else {
::dsn::apps::update_response rrdb_response;
::dsn::unmarshall(response, rrdb_response);
if (rrdb_response.error != 0) {
redis_simple_string result;
result.is_error = true;
result.message = std::string("ERR ") + ec.to_string();
result.message = "ERR internal error " +
boost::lexical_cast<std::string>(rrdb_response.error);
reply_message(entry, result);
} else {
::dsn::apps::update_response rrdb_response;
::dsn::unmarshall(response, rrdb_response);
if (rrdb_response.error != 0) {
redis_simple_string result;
result.is_error = true;
result.message = "ERR internal error " +
boost::lexical_cast<std::string>(rrdb_response.error);
reply_message(entry, result);
} else {
redis_simple_string result;
result.is_error = false;
result.message = "OK";
reply_message(entry, result);
}
redis_simple_string result;
result.is_error = false;
result.message = "OK";
reply_message(entry, result);
}
};
}
};
::dsn::apps::update_request req;
::dsn::blob null_blob;
......@@ -532,7 +533,7 @@ void redis_parser::setex(message_entry &entry)
std::shared_ptr<proxy_session> ref_this = shared_from_this();
auto on_setex_reply = [ref_this, this, &entry](
::dsn::error_code ec, dsn_message_t, dsn_message_t response) {
::dsn::error_code ec, dsn::message_ex *, dsn::message_ex *response) {
if (is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: setex command seqid(%" PRId64 ") got reply, but session has reset",
remote_address.to_string(),
......@@ -600,48 +601,48 @@ void redis_parser::get(message_entry &entry)
remote_address.to_string(),
entry.sequence_id);
std::shared_ptr<proxy_session> ref_this = shared_from_this();
auto on_get_reply =
[ref_this, this, &entry](::dsn::error_code ec, dsn_message_t, dsn_message_t response) {
if (is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: get command(%" PRId64 ") got reply, but session has reset",
remote_address.to_string(),
entry.sequence_id);
return;
}
auto on_get_reply = [ref_this, this, &entry](
::dsn::error_code ec, dsn::message_ex *, dsn::message_ex *response) {
if (is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: get command(%" PRId64 ") got reply, but session has reset",
remote_address.to_string(),
entry.sequence_id);
return;
}
dinfo("%s: get command seqid(%" PRId64 ") got reply",
remote_address.to_string(),
entry.sequence_id);
if (::dsn::ERR_OK != ec) {
ddebug("%s: get command seqid(%" PRId64 ") got reply with error = %s",
remote_address.to_string(),
entry.sequence_id,
ec.to_string());
redis_simple_string result;
result.is_error = true;
result.message = std::string("ERR ") + ec.to_string();
reply_message(entry, result);
} else {
::dsn::apps::read_response rrdb_response;
::dsn::unmarshall(response, rrdb_response);
if (rrdb_response.error != 0) {
if (rrdb_response.error == rocksdb::Status::kNotFound) {
redis_bulk_string result;
result.length = -1;
reply_message(entry, result);
} else {
redis_simple_string result;
result.is_error = true;
result.message = "ERR internal error " +
boost::lexical_cast<std::string>(rrdb_response.error);
reply_message(entry, result);
}
dinfo("%s: get command seqid(%" PRId64 ") got reply",
remote_address.to_string(),
entry.sequence_id);
if (::dsn::ERR_OK != ec) {
ddebug("%s: get command seqid(%" PRId64 ") got reply with error = %s",
remote_address.to_string(),
entry.sequence_id,
ec.to_string());
redis_simple_string result;
result.is_error = true;
result.message = std::string("ERR ") + ec.to_string();
reply_message(entry, result);
} else {
::dsn::apps::read_response rrdb_response;
::dsn::unmarshall(response, rrdb_response);
if (rrdb_response.error != 0) {
if (rrdb_response.error == rocksdb::Status::kNotFound) {
redis_bulk_string result;
result.length = -1;
reply_message(entry, result);
} else {
redis_bulk_string result(rrdb_response.value);
redis_simple_string result;
result.is_error = true;
result.message = "ERR internal error " +
boost::lexical_cast<std::string>(rrdb_response.error);
reply_message(entry, result);
}
} else {
redis_bulk_string result(rrdb_response.value);
reply_message(entry, result);
}
};
}
};
::dsn::blob req;
::dsn::blob null_blob;
pegasus_generate_key(req, redis_req.buffers[1].data, null_blob);
......@@ -676,43 +677,43 @@ void redis_parser::del_internal(message_entry &entry)
remote_address.to_string(),
entry.sequence_id);
std::shared_ptr<proxy_session> ref_this = shared_from_this();
auto on_del_reply =
[ref_this, this, &entry](::dsn::error_code ec, dsn_message_t, dsn_message_t response) {
if (is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: del command seqid(%" PRId64 ") got reply, but session has reset",
remote_address.to_string(),
entry.sequence_id);
return;
}
auto on_del_reply = [ref_this, this, &entry](
::dsn::error_code ec, dsn::message_ex *, dsn::message_ex *response) {
if (is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: del command seqid(%" PRId64 ") got reply, but session has reset",
remote_address.to_string(),
entry.sequence_id);
return;
}
dinfo("%s: del command seqid(%" PRId64 ") got reply",
remote_address.to_string(),
entry.sequence_id);
if (::dsn::ERR_OK != ec) {
ddebug("%s: del command seqid(%" PRId64 ") got reply with error = %s",
remote_address.to_string(),
entry.sequence_id,
ec.to_string());
dinfo("%s: del command seqid(%" PRId64 ") got reply",
remote_address.to_string(),
entry.sequence_id);
if (::dsn::ERR_OK != ec) {
ddebug("%s: del command seqid(%" PRId64 ") got reply with error = %s",
remote_address.to_string(),
entry.sequence_id,
ec.to_string());
redis_simple_string result;
result.is_error = true;
result.message = std::string("ERR ") + ec.to_string();
reply_message(entry, result);
} else {
::dsn::apps::read_response rrdb_response;
::dsn::unmarshall(response, rrdb_response);
if (rrdb_response.error != 0) {
redis_simple_string result;
result.is_error = true;
result.message = std::string("ERR ") + ec.to_string();
result.message = "ERR internal error " +
boost::lexical_cast<std::string>(rrdb_response.error);
reply_message(entry, result);
} else {
::dsn::apps::read_response rrdb_response;
::dsn::unmarshall(response, rrdb_response);
if (rrdb_response.error != 0) {
redis_simple_string result;
result.is_error = true;
result.message = "ERR internal error " +
boost::lexical_cast<std::string>(rrdb_response.error);
reply_message(entry, result);
} else {
redis_integer result;
result.value = 1;
reply_message(entry, result);
}
redis_integer result;
result.value = 1;
reply_message(entry, result);
}
};
}
};
::dsn::blob req;
::dsn::blob null_blob;
pegasus_generate_key(req, redis_req.buffers[1].data, null_blob);
......@@ -786,7 +787,7 @@ void redis_parser::ttl(message_entry &entry)
entry.sequence_id);
std::shared_ptr<proxy_session> ref_this = shared_from_this();
auto on_ttl_reply = [ref_this, this, &entry, is_ttl](
::dsn::error_code ec, dsn_message_t, dsn_message_t response) {
::dsn::error_code ec, dsn::message_ex *, dsn::message_ex *response) {
if (is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: ttl/pttl command seqid(%" PRId64 ") got reply, but session has reset",
remote_address.to_string(),
......@@ -1004,7 +1005,7 @@ void redis_parser::counter_internal(message_entry &entry)
std::shared_ptr<proxy_session> ref_this = shared_from_this();
auto on_incr_reply = [ref_this, this, command, &entry](
::dsn::error_code ec, dsn_message_t, dsn_message_t response) {
::dsn::error_code ec, dsn::message_ex *, dsn::message_ex *response) {
if (is_session_reset.load(std::memory_order_acquire)) {
dwarn_f("{}: command {} seqid({}) got reply, but session has reset",
remote_address.to_string(),
......
......@@ -83,11 +83,11 @@ protected:
struct message_entry
{
redis_request request;
std::atomic<dsn_message_t> response;
std::atomic<dsn::message_ex *> response;
int64_t sequence_id = 0;
};
bool parse(dsn_message_t msg) override;
bool parse(dsn::message_ex *msg) override;
// this is virtual only because we can override and test other modules
virtual void handle_command(std::unique_ptr<message_entry> &&entry);
......@@ -106,7 +106,7 @@ private:
std::string current_size;
// data stream content
std::queue<dsn_message_t> recv_buffers;
std::queue<dsn::message_ex *> recv_buffers;
size_t total_length;
char *current_buffer;
size_t current_buffer_length;
......@@ -119,7 +119,7 @@ private:
protected:
// function for data stream
void append_message(dsn_message_t msg);
void append_message(dsn::message_ex *msg);
void prepare_current_buffer();
char peek();
bool eat(char c);
......@@ -179,16 +179,16 @@ protected:
// function for pipeline reply
void enqueue_pending_response(std::unique_ptr<message_entry> &&entry);
void fetch_and_dequeue_messages(std::vector<dsn_message_t> &msgs, bool only_ready_ones);
void fetch_and_dequeue_messages(std::vector<dsn::message_ex *> &msgs, bool only_ready_ones);
void clear_reply_queue();
void reply_all_ready();
template <typename T>
void reply_message(message_entry &entry, const T &value)
{
dsn_message_t resp = create_response();
dsn::message_ex *resp = create_response();
// release in reply_all_ready or reset
dsn_msg_add_ref(resp);
resp->add_ref();
dsn::rpc_write_stream s(resp);
value.marshalling(s);
......@@ -204,7 +204,7 @@ protected:
static std::atomic_llong s_next_seqid;
public:
redis_parser(proxy_stub *op, dsn_message_t first_msg);
redis_parser(proxy_stub *op, dsn::message_ex *first_msg);
~redis_parser() override;
};
}
......
......@@ -30,7 +30,7 @@ public:
return ::dsn::ERR_INVALID_PARAMETERS;
}
proxy_session::factory f = [](proxy_stub *p, dsn_message_t m) {
proxy_session::factory f = [](proxy_stub *p, dsn::message_ex *m) {
return std::make_shared<redis_parser>(p, m);
};
_proxy = dsn::make_unique<proxy_stub>(f, args[1].c_str(), args[2].c_str());
......@@ -69,7 +69,7 @@ protected:
}
public:
redis_test_parser(proxy_stub *stub, dsn_message_t msg) : redis_parser(stub, msg)
redis_test_parser(proxy_stub *stub, dsn::message_ex *msg) : redis_parser(stub, msg)
{
reserved_entry.reserve(20);
for (int i = 0; i < 20; ++i) {
......@@ -191,7 +191,7 @@ public:
std::cout << "test random cases" << std::endl;
int total_requests = 10;
std::vector<dsn_message_t> fake_requests;
std::vector<dsn::message_ex *> fake_requests;
int total_body_size = 0;
// create several requests
......@@ -214,14 +214,14 @@ public:
bs.data.assign(std::move(raw_buf), 0, bs.length);
}
}
dsn_message_t fake_response = marshalling_array(ra);
dsn_message_t fake_request = dsn_msg_copy(fake_response, true, true);
dsn::message_ex *fake_response = marshalling_array(ra);
dsn::message_ex *fake_request = fake_response->copy(true, true);
dsn_msg_add_ref(fake_response);
dsn_msg_release_ref(fake_response);
fake_response->add_ref();
fake_response->release_ref();
fake_requests.push_back(fake_request);
total_body_size += dsn_msg_body_size(fake_request);
total_body_size += fake_request->body_size();
}
// let's copy the messages
......@@ -229,16 +229,16 @@ public:
std::default_delete<char[]>());
char *raw_msg_buffer = msg_buffer.get();
for (dsn_message_t r : fake_requests) {
for (dsn::message_ex *r : fake_requests) {
void *rw_ptr;
size_t length;
while (dsn_msg_read_next(r, &rw_ptr, &length)) {
while (r->read_next(&rw_ptr, &length)) {
memcpy(raw_msg_buffer, rw_ptr, length);
raw_msg_buffer += length;
dsn_msg_read_commit(r, length);
r->read_commit(length);
}
dsn_msg_add_ref(r);
dsn_msg_release_ref(r);
r->add_ref();
r->release_ref();
}
*raw_msg_buffer = 0;
......@@ -247,7 +247,7 @@ public:
raw_msg_buffer = msg_buffer.get();
// first create a big message, test the pipeline
{
dsn_message_t msg = create_message(raw_msg_buffer, total_body_size);
dsn::message_ex *msg = create_message(raw_msg_buffer, total_body_size);
entry_index = 0;
ASSERT_TRUE(parse(msg));
ASSERT_EQ(entry_index, total_requests);
......@@ -268,7 +268,7 @@ public:
for (unsigned i = 0; i < start_pos.size() - 1; ++i) {
if (start_pos[i] != start_pos[i + 1]) {
int length = start_pos[i + 1] - start_pos[i];
dsn_message_t msg = create_message(raw_msg_buffer + start_pos[i], length);
dsn::message_ex *msg = create_message(raw_msg_buffer + start_pos[i], length);
ASSERT_TRUE(parse(msg));
}
}
......@@ -410,34 +410,30 @@ public:
{
int ttl_seconds = 0;
std::vector<redis_bulk_string> opts({{"SET"},
{"KK"},
{"vv"},
{"EX"},
{"123"}});
std::vector<redis_bulk_string> opts({{"SET"}, {"KK"}, {"vv"}, {"EX"}, {"123"}});
parse_set_parameters(opts, ttl_seconds);
ASSERT_EQ(ttl_seconds, 123);
}
}
public:
static dsn_message_t create_message(const char *data)
static dsn::message_ex *create_message(const char *data)
{
dsn_message_t m = dsn_msg_create_received_request(
RPC_CALL_RAW_MESSAGE, DSF_THRIFT_BINARY, (void *)data, strlen(data));
dsn::message_ex *m = dsn::message_ex::create_received_request(
RPC_CALL_RAW_MESSAGE, dsn::DSF_THRIFT_BINARY, (void *)data, strlen(data));
return m;
}
static dsn_message_t create_message(const char *data, int length)
static dsn::message_ex *create_message(const char *data, int length)
{
dsn_message_t m = dsn_msg_create_received_request(
RPC_CALL_RAW_MESSAGE, DSF_THRIFT_BINARY, (void *)data, length);
dsn::message_ex *m = dsn::message_ex::create_received_request(
RPC_CALL_RAW_MESSAGE, dsn::DSF_THRIFT_BINARY, (void *)data, length);
return m;
}
static dsn_message_t marshalling_array(const redis_request &ra)
static dsn::message_ex *marshalling_array(const redis_request &ra)
{
dsn_message_t m = create_message("dummy");
dsn::message_ex *m = create_message("dummy");
dsn_message_t result = dsn_msg_create_response(m);
dsn::message_ex *result = m->create_response();
::dsn::rpc_write_stream stream(result);
stream.write_pod('*');
......@@ -450,7 +446,7 @@ public:
ra.buffers[i].marshalling(stream);
}
dsn_msg_release_ref(m);
m->release_ref();
return result;
}
......@@ -461,9 +457,10 @@ public:
TEST(proxy, parser)
{
dsn_message_t m = nullptr;
dsn::message_ex *m = nullptr;
{
m = dsn_msg_create_received_request(RPC_CALL_RAW_MESSAGE, DSF_THRIFT_BINARY, nullptr, 0);
m = dsn::message_ex::create_received_request(
RPC_CALL_RAW_MESSAGE, dsn::DSF_THRIFT_BINARY, nullptr, 0);
dsn::message_ex *msg = (dsn::message_ex *)m;
msg->header->from_address = dsn::rpc_address("127.0.0.1", 123);
}
......@@ -471,7 +468,7 @@ TEST(proxy, parser)
parser->test_fixed_cases();
parser->test_random_cases();
parser->test_parse_parameters();
dsn_msg_release_ref(m);
m->release_ref();
}
TEST(proxy, utils)
......
......@@ -135,7 +135,7 @@ void pegasus_counter_reporter::stop()
}
void pegasus_counter_reporter::update_counters_to_falcon(const std::string &result,
int64_t timestamp)
int64_t timestamp)
{
ddebug("update counters to falcon with timestamp = %" PRId64, timestamp);
http_post_request(
......@@ -187,10 +187,10 @@ void pegasus_counter_reporter::update()
}
void pegasus_counter_reporter::http_post_request(const std::string &host,
int32_t port,
const std::string &path,
const std::string &contentType,
const std::string &data)
int32_t port,
const std::string &path,
const std::string &contentType,
const std::string &data)
{
dinfo("start update_request, %s", data.c_str());
struct event_base *base = event_base_new();
......@@ -242,7 +242,7 @@ void pegasus_counter_reporter::http_request_done(struct evhttp_request *req, voi
}
void pegasus_counter_reporter::on_report_timer(std::shared_ptr<boost::asio::deadline_timer> timer,
const boost::system::error_code &ec)
const boost::system::error_code &ec)
{
// NOTICE: the following code is running out of rDSN's tls_context
if (!ec) {
......
......@@ -489,7 +489,7 @@ void pegasus_server_impl::gc_checkpoints()
int pegasus_server_impl::on_batched_write_requests(int64_t decree,
uint64_t timestamp,
dsn_message_t *requests,
dsn::message_ex **requests,
int count)
{
dassert(_is_open, "");
......
......@@ -75,7 +75,7 @@ public:
/// \inherit dsn::replication::replication_app_base
virtual int on_batched_write_requests(int64_t decree,
uint64_t timestamp,
dsn_message_t *requests,
dsn::message_ex **requests,
int count) override;
virtual ::dsn::error_code prepare_get_checkpoint(dsn::blob &learn_req) override
......
......@@ -19,7 +19,7 @@ pegasus_server_write::pegasus_server_write(pegasus_server_impl *server, bool ver
_write_svc = dsn::make_unique<pegasus_write_service>(server);
}
int pegasus_server_write::on_batched_write_requests(dsn_message_t *requests,
int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
int count,
int64_t decree,
uint64_t timestamp)
......@@ -33,7 +33,7 @@ int pegasus_server_write::on_batched_write_requests(dsn_message_t *requests,
return _write_svc->empty_put(_decree);
}
dsn::task_code rpc_code(dsn_msg_task_code(requests[0]));
dsn::task_code rpc_code(requests[0]->rpc_code());
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) {
dassert(count == 1, "count = %d", count);
auto rpc = multi_put_rpc::auto_reply(requests[0]);
......@@ -63,7 +63,7 @@ int pegasus_server_write::on_batched_write_requests(dsn_message_t *requests,
return on_batched_writes(requests, count);
}
int pegasus_server_write::on_batched_writes(dsn_message_t *requests, int count)
int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int count)
{
int err = 0;
{
......@@ -77,7 +77,7 @@ int pegasus_server_write::on_batched_writes(dsn_message_t *requests, int count)
// and respond for all RPCs regardless of their result.
int local_err = 0;
dsn::task_code rpc_code(dsn_msg_task_code(requests[i]));
dsn::task_code rpc_code(requests[i]->rpc_code());
if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
auto rpc = put_rpc::auto_reply(requests[i]);
local_err = on_single_put_in_batch(rpc);
......@@ -114,7 +114,9 @@ int pegasus_server_write::on_batched_writes(dsn_message_t *requests, int count)
return err;
}
void pegasus_server_write::request_key_check(int64_t decree, dsn_message_t m, const dsn::blob &key)
void pegasus_server_write::request_key_check(int64_t decree,
dsn::message_ex *m,
const dsn::blob &key)
{
auto msg = (dsn::message_ex *)m;
if (msg->header->client.partition_hash != 0) {
......
......@@ -26,14 +26,14 @@ public:
/// As long as the returned error is 0, the operation is guaranteed to be
/// successfully applied into rocksdb, which means an empty_put will be called
/// even if there's no write.
int on_batched_write_requests(dsn_message_t *requests,
int on_batched_write_requests(dsn::message_ex **requests,
int count,
int64_t decree,
uint64_t timestamp);
private:
/// Delay replying for the batched requests until all of them complete.
int on_batched_writes(dsn_message_t *requests, int count);
int on_batched_writes(dsn::message_ex **requests, int count);
int on_single_put_in_batch(put_rpc &rpc)
{
......@@ -51,7 +51,7 @@ private:
// Ensure that the write request is directed to the right partition.
// In verbose mode it will log for every request.
void request_key_check(int64_t decree, dsn_message_t m, const dsn::blob &key);
void request_key_check(int64_t decree, dsn::message_ex *m, const dsn::blob &key);
private:
friend class pegasus_server_write_test;
......
......@@ -11,29 +11,29 @@
namespace pegasus {
inline dsn_message_t create_multi_put_request(const dsn::apps::multi_put_request &request)
inline dsn::message_ex *create_multi_put_request(const dsn::apps::multi_put_request &request)
{
return dsn::from_thrift_request_to_received_message(request,
dsn::apps::RPC_RRDB_RRDB_MULTI_PUT);
}
inline dsn_message_t create_multi_remove_request(const dsn::apps::multi_remove_request &request)
inline dsn::message_ex *create_multi_remove_request(const dsn::apps::multi_remove_request &request)
{
return dsn::from_thrift_request_to_received_message(request,
dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE);
}
inline dsn_message_t create_put_request(const dsn::apps::update_request &request)
inline dsn::message_ex *create_put_request(const dsn::apps::update_request &request)
{
return dsn::from_thrift_request_to_received_message(request, dsn::apps::RPC_RRDB_RRDB_PUT);
}
inline dsn_message_t create_remove_request(const dsn::blob &key)
inline dsn::message_ex *create_remove_request(const dsn::blob &key)
{
return dsn::from_thrift_request_to_received_message(key, dsn::apps::RPC_RRDB_RRDB_REMOVE);
}
inline dsn_message_t create_incr_request(const dsn::apps::incr_request &request)
inline dsn::message_ex *create_incr_request(const dsn::apps::incr_request &request)
{
return dsn::from_thrift_request_to_received_message(request, dsn::apps::RPC_RRDB_RRDB_INCR);
}
......
......@@ -44,7 +44,7 @@ public:
int put_rpc_cnt = dsn_random32(1, 10);
int remove_rpc_cnt = dsn_random32(1, 10);
int total_rpc_cnt = put_rpc_cnt + remove_rpc_cnt;
auto writes = new dsn_message_t[total_rpc_cnt];
auto writes = new dsn::message_ex *[total_rpc_cnt];
for (int i = 0; i < put_rpc_cnt; i++) {
writes[i] = pegasus::create_put_request(req);
}
......@@ -53,7 +53,7 @@ public:
}
auto cleanup = dsn::defer([=]() {
for (int i = 0; i < total_rpc_cnt; i++) {
dsn_msg_release_ref(writes[i]);
writes[i]->release_ref();
}
delete[] writes;
});
......
......@@ -330,7 +330,7 @@ inline void call_remote_command(shell_context *sc,
results.resize(nodes.size());
for (int i = 0; i < nodes.size(); ++i) {
auto callback = [&results,
i](::dsn::error_code err, dsn_message_t req, dsn_message_t resp) {
i](::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp) {
if (err == ::dsn::ERR_OK) {
results[i].first = true;
::dsn::unmarshall(resp, results[i].second);
......
......@@ -2975,13 +2975,13 @@ inline bool mlog_dump(command_executor *e, shell_context *sc, arguments args)
}
std::ostream &os = *os_ptr;
std::function<void(int64_t decree, int64_t timestamp, dsn_message_t * requests, int count)>
std::function<void(int64_t decree, int64_t timestamp, dsn::message_ex * *requests, int count)>
callback;
if (detailed) {
callback = [&os, sc](
int64_t decree, int64_t timestamp, dsn_message_t *requests, int count) mutable {
int64_t decree, int64_t timestamp, dsn::message_ex **requests, int count) mutable {
for (int i = 0; i < count; ++i) {
dsn_message_t request = requests[i];
dsn::message_ex *request = requests[i];
dassert(request != nullptr, "");
::dsn::message_ex *msg = (::dsn::message_ex *)request;
if (msg->local_rpc_code == RPC_REPLICATION_WRITE_EMPTY) {
......
......@@ -46,10 +46,12 @@ protected:
// 2. initialize the clients
std::vector<dsn::rpc_address> meta_list;
replica_helper::load_meta_servers(meta_list, "uri-resolver.dsn://single_master_cluster", "arguments");
replica_helper::load_meta_servers(
meta_list, "uri-resolver.dsn://single_master_cluster", "arguments");
ddl_client = std::make_shared<replication_ddl_client>(meta_list);
pg_client = pegasus::pegasus_client_factory::get_client("single_master_cluster", table_name.c_str());
pg_client = pegasus::pegasus_client_factory::get_client("single_master_cluster",
table_name.c_str());
// 3. write some data to the app
dsn::error_code err;
......
......@@ -32,7 +32,7 @@ protected:
// generate cnt number belong to [a, b],
// if cnt > (b - a + 1), then just return the numbers between a ~ b
void generate_random(std::vector<int> &res, int cnt, int a, int b);
// generate one number belong to [a, b]
int generate_one_number(int a, int b);
......
......@@ -12,16 +12,17 @@
int main(int argc, const char **argv)
{
if (argc < 3) {
printf("invalid arguments: pegasus_kill_test configfile worker_type(verifier|process_killer|partition_killer)\n");
printf("invalid arguments: pegasus_kill_test configfile "
"worker_type(verifier|process_killer|partition_killer)\n");
return -1;
} else if (strcmp(argv[2], "verifier") == 0) {
verifier_initialize(argv[1]);
verifier_start();
} else if (strcmp(argv[2], "process_killer") == 0) {
pegasus::test::kill_testor * killtestor = new pegasus::test::process_kill_testor(argv[1]);
pegasus::test::kill_testor *killtestor = new pegasus::test::process_kill_testor(argv[1]);
killtestor->Run();
} else if (strcmp(argv[2], "partition_killer") == 0) {
pegasus::test::kill_testor * killtestor = new pegasus::test::partition_kill_testor(argv[1]);
pegasus::test::kill_testor *killtestor = new pegasus::test::partition_kill_testor(argv[1]);
killtestor->Run();
} else {
printf("invalid worker_type: %s\n", argv[2]);
......
......@@ -61,7 +61,7 @@ void partition_kill_testor::run()
cmd.arguments[1] = to_string(p.pid.get_partition_index());
auto callback = [&results,
i](::dsn::error_code err, dsn_message_t req, dsn_message_t resp) {
i](::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp) {
if (err == ::dsn::ERR_OK) {
results[i].first = true;
::dsn::unmarshall(resp, results[i].second);
......
......@@ -91,16 +91,17 @@ void do_set(int thread_id)
client->set(hash_key, sort_key, value, set_and_get_timeout_milliseconds, 0, &info);
if (ret == PERR_OK) {
long cur_time = get_time();
// ddebug("SetThread[%d]: set succeed: id=%lld, try=%d, time=%ld (gpid=%d.%d, "
// "decree=%lld, server=%s)",
// thread_id,
// id,
// try_count,
// (cur_time - last_time),
// info.app_id,
// info.partition_index,
// info.decree,
// info.server.c_str());
// ddebug("SetThread[%d]: set succeed: id=%lld, try=%d, time=%ld (gpid=%d.%d,
// "
// "decree=%lld, server=%s)",
// thread_id,
// id,
// try_count,
// (cur_time - last_time),
// info.app_id,
// info.partition_index,
// info.decree,
// info.server.c_str());
stat_time[stat_count++] = cur_time - last_time;
if (stat_count == stat_batch) {
std::sort(stat_time.begin(), stat_time.end());
......
......@@ -11,7 +11,8 @@
int main(int argc, const char **argv)
{
if (argc < 3) {
printf("invalid arguments: pegasus_upgrade_test configfile worker_type(verifier|upgrader)\n");
printf(
"invalid arguments: pegasus_upgrade_test configfile worker_type(verifier|upgrader)\n");
return -1;
} else if (strcmp(argv[2], "verifier") == 0) {
verifier_initialize(argv[1]);
......
......@@ -127,7 +127,8 @@ void upgrade_testor::run()
ddebug("************************");
ddebug("Round [%d]", upgrade_round);
ddebug("start upgrade...");
ddebug("upgrade meta number=%d, replica number=%d, zk number=%d", meta_cnt, replica_cnt, zk_cnt);
ddebug(
"upgrade meta number=%d, replica number=%d, zk number=%d", meta_cnt, replica_cnt, zk_cnt);
if (!upgrade(replica_cnt)) {
stop_verifier_and_exit("upgrade jobs failed");
......@@ -151,11 +152,14 @@ bool upgrade_testor::upgrade(int replica_cnt)
std::vector<int> total_count = {
_total_meta_count, _total_replica_count, _total_zookeeper_count};
std::vector<int> random_idxs;
generate_random(random_idxs, 1 /*REPLICA - REPLICA + 1*/, REPLICA, REPLICA); // 生成type列表
generate_random(random_idxs, 1 /*REPLICA - REPLICA + 1*/, REPLICA, REPLICA); // 生成type列表
for (auto id : random_idxs) {
std::vector<int> &job_index_to_upgrade = _job_index_to_upgrade[_job_types[id]];
job_index_to_upgrade.clear();
generate_random(job_index_to_upgrade, upgrade_counts[id], 1, total_count[id]); // 生成该type需要upgrade的index列表
generate_random(job_index_to_upgrade,
upgrade_counts[id],
1,
total_count[id]); // 生成该type需要upgrade的index列表
for (auto index : job_index_to_upgrade) {
ddebug("start to upgrade %s@%d", job_type_str(_job_types[id]), index);
if (!upgrade_job_by_index(_job_types[id], index)) {
......
......@@ -39,7 +39,8 @@ public:
private:
// action = upgrade | downgrade.
std::list<std::string> generate_cmd(int index, const std::string &job, const std::string &action);
std::list<std::string>
generate_cmd(int index, const std::string &job, const std::string &action);
// check whether the command execute success.
bool check(const std::string &job, int index, const std::string &type);
......
......@@ -4,4 +4,7 @@
using namespace pegasus::test;
void register_upgrade_handlers() { upgrader_handler::register_factory<upgrader_handler_shell>("shell"); }
void register_upgrade_handlers()
{
upgrader_handler::register_factory<upgrader_handler_shell>("shell");
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册