提交 61cd39f3 编写于 作者: P Phil Mesnier

start message handling

上级 9f0b1348
...@@ -3,7 +3,7 @@ add_library( net_plugin ...@@ -3,7 +3,7 @@ add_library( net_plugin
net_plugin.cpp net_plugin.cpp
${HEADERS} ) ${HEADERS} )
target_link_libraries( net_plugin chain_plugin appbase fc ) target_link_libraries( net_plugin chain_plugin egenesis appbase fc )
target_include_directories( net_plugin PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" ) target_include_directories( net_plugin PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" )
install( TARGETS install( TARGETS
......
...@@ -27,7 +27,7 @@ struct node_transaction_state { ...@@ -27,7 +27,7 @@ struct node_transaction_state {
fc::time_point received; fc::time_point received;
fc::time_point_sec expires; fc::time_point_sec expires;
vector<char> packed_transaction; vector<char> packed_transaction;
uint64_t block_num = -1; /// block transaction was included in uint32_t block_num = -1; /// block transaction was included in
bool validated = false; /// whether or not our node has validated it bool validated = false; /// whether or not our node has validated it
}; };
...@@ -43,7 +43,7 @@ struct transaction_state { ...@@ -43,7 +43,7 @@ struct transaction_state {
transaction_id_type id; transaction_id_type id;
bool is_known_by_peer = false; ///< true if we sent or received this trx to this peer or received notice from peer bool is_known_by_peer = false; ///< true if we sent or received this trx to this peer or received notice from peer
bool is_noticed_to_peer = false; ///< have we sent peer noitce we know it (true if we reeive from this peer) bool is_noticed_to_peer = false; ///< have we sent peer noitce we know it (true if we reeive from this peer)
uint64_t block_num = -1; ///< the block number the transaction was included in uint32_t block_num = -1; ///< the block number the transaction was included in
time_point validated_time; ///< infinity for unvalidated time_point validated_time; ///< infinity for unvalidated
time_point requested_time; /// incase we fetch large trx time_point requested_time; /// incase we fetch large trx
}; };
...@@ -76,9 +76,9 @@ typedef multi_index_container< ...@@ -76,9 +76,9 @@ typedef multi_index_container<
* Index by start_block * Index by start_block
*/ */
struct sync_state { struct sync_state {
uint64_t start_block = 0; uint32_t start_block = 0;
uint64_t end_block = 0; uint32_t end_block = 0;
uint64_t last = 0; ///< last sent or received uint32_t last = 0; ///< last sent or received
time_point start_time;; ///< time request made or received time_point start_time;; ///< time request made or received
}; };
...@@ -86,7 +86,7 @@ struct by_start_block; ...@@ -86,7 +86,7 @@ struct by_start_block;
typedef multi_index_container< typedef multi_index_container<
sync_state, sync_state,
indexed_by< indexed_by<
ordered_unique< tag<by_start_block>, member<sync_state, uint64_t, &sync_state::start_block > > ordered_unique< tag<by_start_block>, member<sync_state, uint32_t, &sync_state::start_block > >
> >
> sync_request_index; > sync_request_index;
...@@ -98,6 +98,7 @@ public: ...@@ -98,6 +98,7 @@ public:
wlog( "created connection" ); wlog( "created connection" );
pending_message_buffer.resize( 1024*1024*4 ); pending_message_buffer.resize( 1024*1024*4 );
} }
~connection() { ~connection() {
wlog( "released connection" ); wlog( "released connection" );
} }
...@@ -112,8 +113,7 @@ public: ...@@ -112,8 +113,7 @@ public:
vector<char> pending_message_buffer; vector<char> pending_message_buffer;
handshake_message last_handshake; handshake_message last_handshake;
std::deque<net_message> out_queue;
std::deque<net_message> out_queue;
void send( const net_message& m ) { void send( const net_message& m ) {
out_queue.push_back( m ); out_queue.push_back( m );
...@@ -122,8 +122,12 @@ public: ...@@ -122,8 +122,12 @@ public:
} }
void send_next_message() { void send_next_message() {
if( !out_queue.size() ) if( !out_queue.size() ) {
return; if (out_sync_state.size() > 0) {
write_block_backlog();
}
return;
}
auto& m = out_queue.front(); auto& m = out_queue.front();
...@@ -145,6 +149,29 @@ public: ...@@ -145,6 +149,29 @@ public:
} }
}); });
} }
void write_block_backlog ( ) {
try {
ilog ("write loop sending backlog ");
if (out_sync_state.size() > 0) {
chain_controller& cc = app().find_plugin<chain_plugin>()->chain();
auto ss = out_sync_state.begin();
for (uint32_t num = ss->last + 1;
num <= ss->end_block; num++) {
fc::optional<signed_block> sb = cc.fetch_block_by_number(num);
if (sb) {
send( *sb );
}
ss.get_node()->value().last = num;
}
out_sync_state.erase(0);
}
} catch ( ... ) {
wlog( "write loop exception" );
}
}
}; // class connection }; // class connection
...@@ -162,7 +189,10 @@ class net_plugin_impl { ...@@ -162,7 +189,10 @@ class net_plugin_impl {
std::set< connection* > connections; std::set< connection* > connections;
bool done = false; bool done = false;
fc::optional<handshake_message> hello; fc::optional<handshake_message> hello;
std::string user_agent_name;
chain_plugin* chain_plug;
void connect( const string& ep ) { void connect( const string& ep ) {
auto host = ep.substr( 0, ep.find(':') ); auto host = ep.substr( 0, ep.find(':') );
...@@ -229,15 +259,24 @@ class net_plugin_impl { ...@@ -229,15 +259,24 @@ class net_plugin_impl {
} }
hello->network_version = 0; hello->network_version = 0;
// hello->chain_id = chain->get_chain_id(); chain_plug->get_chain_id(hello->chain_id);
fc::rand_pseudo_bytes(hello->node_id.data(), hello->node_id.data_size()); fc::rand_pseudo_bytes(hello->node_id.data(), hello->node_id.data_size());
#if defined( __APPLE__ )
hello->os = "osx";
#elif defined( __linux__ )
hello->os = "linux";
#elif defined( _MSC_VER )
hello->os = "win32";
#else
hello->os = "other";
#endif
hello->agent = user_agent_name;
} }
void update_handshake () { void update_handshake () {
chain_plugin* cp = app().find_plugin<chain_plugin>(); hello->last_irreversible_block_id = chain_plug->chain().get_block_id_for_num
(hello->last_irreversible_block_num = chain_plug->chain().last_irreversible_block_num());
hello->last_irreversible_block_id = cp->chain().get_block_id_for_num
(hello->last_irreversible_block_num = cp->chain().last_irreversible_block_num());
} }
void start_session( connection* con ) { void start_session( connection* con ) {
...@@ -252,8 +291,8 @@ class net_plugin_impl { ...@@ -252,8 +291,8 @@ class net_plugin_impl {
con->send( *hello ); con->send( *hello );
// con->readloop_complete = bf::async( [=](){ read_loop( con ); } ); // con->readloop_complete = bf::async( [=](){ read_loop( con ); } );
// con->writeloop_complete = bf::async( [=](){ write_loop( con ); } ); // con->writeloop_complete = bf::async( [=](){ write_loop con ); } );
} }
void start_listen_loop() { void start_listen_loop() {
...@@ -261,7 +300,7 @@ class net_plugin_impl { ...@@ -261,7 +300,7 @@ class net_plugin_impl {
acceptor->async_accept( *socket, [socket,this]( boost::system::error_code ec ) { acceptor->async_accept( *socket, [socket,this]( boost::system::error_code ec ) {
if( !ec ) { if( !ec ) {
start_session( new connection( socket ) ); start_session( new connection( socket ) );
start_listen_loop(); start_listen_loop();
} else { } else {
elog( "Error accepting connection: ${m}", ("m", ec.message() ) ); elog( "Error accepting connection: ${m}", ("m", ec.message() ) );
} }
...@@ -288,32 +327,47 @@ class net_plugin_impl { ...@@ -288,32 +327,47 @@ class net_plugin_impl {
); );
} }
void handle_message (connection &c, handshake_message &msg) { void handle_message (connection &c, handshake_message &msg) {
ilog ("got a handshake message");
if (!hello) { if (!hello) {
init_handshake(); init_handshake();
} }
if (msg.node_id == hello->node_id) ilog ("got a handshake message");
{ if (msg.node_id == hello->node_id) {
dlog ("Self connection detected. Closing connection"); elog ("Self connection detected. Closing connection");
close(&c); close(&c);
return; return;
} }
if (msg.chain_id != hello->chain_id) if (msg.chain_id != hello->chain_id) {
{ elog ("Peer on a different chain. Closing connection");
dlog ("Peer on a different chain. Closing connection"); close (&c);
close (&c); return;
return; }
} if (msg.network_version != hello->network_version) {
if (msg.network_version != hello->network_version) elog ("Peer network id does not match ");
{ close (&c);
dlog ("Peer network id does not match "); return;
close (&c); }
return; chain_controller& cc = chain_plug->chain();
uint32_t head = cc.head_block_num ();
if ( msg.last_irreversible_block_num > head) {
uint32_t delta = msg.last_irreversible_block_num - head;
uint32_t count = connections.size();
uint32_t span = delta / count;
uint32_t lastSpan = delta - (span * (count-1));
ilog ("peer is ahead of head by ${d}, count = ${c}, span = ${s}, lastspan = ${ls} ",
("d",delta)("c",count)("s",span)("ls",lastSpan));
for (auto &cx: connections) {
if (--count == 0) {
span = lastSpan;
}
sync_state req = {head+1, head+span, 0, time_point::now() };
cx->in_sync_state.insert (req);
sync_request_message srm = {req.start_block, req.end_block };
cx->send (srm);
head += span;
} }
}
c.last_handshake = msg; c.last_handshake = msg;
} }
...@@ -328,7 +382,10 @@ class net_plugin_impl { ...@@ -328,7 +382,10 @@ class net_plugin_impl {
} }
void handle_message (connection &c, sync_request_message &msg) { void handle_message (connection &c, sync_request_message &msg) {
ilog ("got a sync request message"); ilog ("got a sync request message for blocks ${s} to ${e}", ("s",msg.start_block)("e", msg.end_block));
sync_state req = {msg.start_block,msg.end_block,0,time_point::now()};
c.out_sync_state.insert (req);
c.write_block_backlog ();
} }
void handle_message (connection &c, block_summary_message &msg) { void handle_message (connection &c, block_summary_message &msg) {
...@@ -340,9 +397,68 @@ class net_plugin_impl { ...@@ -340,9 +397,68 @@ class net_plugin_impl {
} }
void handle_message (connection &c, signed_block &msg) { void handle_message (connection &c, signed_block &msg) {
ilog ("got a signed_block"); uint32_t bn = msg.block_num();
ilog ("got a signed_block, num = ${n}", ("n", bn));
chain_controller &cc = chain_plug->chain();
if (cc.is_known_block(msg.id())) {
ilog ("block id ${id} is known", ("id", msg.id()) );
return;
}
uint32_t num = msg.block_num();
for (auto &ss: c.in_sync_state) {
if (num >= ss.end_block) {
continue;
}
const_cast<sync_state&>(ss).last = num;
break;
}
// TODO: add block to global state
} }
struct msgHandler : public fc::visitor<void> {
net_plugin_impl &impl;
connection &c;
msgHandler (net_plugin_impl &imp, connection &conn) : impl(imp), c(conn) {}
void operator()(handshake_message &msg)
{
impl.handle_message (c, msg);
}
void operator()(peer_message &msg)
{
impl.handle_message (c, msg);
}
void operator()(notice_message &msg)
{
impl.handle_message (c, msg);
}
void operator()(sync_request_message &msg)
{
impl.handle_message (c, msg);
}
void operator()(block_summary_message &msg)
{
impl.handle_message (c, msg);
}
void operator()(SignedTransaction &msg)
{
impl.handle_message (c, msg);
}
void operator()(signed_block &msg)
{
impl.handle_message (c, msg);
}
};
void start_reading_pending_buffer( connection& c ) { void start_reading_pending_buffer( connection& c ) {
boost::asio::async_read( *c.socket, boost::asio::buffer(c.pending_message_buffer.data(), c.pending_message_size ), boost::asio::async_read( *c.socket, boost::asio::buffer(c.pending_message_buffer.data(), c.pending_message_size ),
[&]( boost::system::error_code ec, std::size_t bytes_transferred ) { [&]( boost::system::error_code ec, std::size_t bytes_transferred ) {
...@@ -353,37 +469,8 @@ class net_plugin_impl { ...@@ -353,37 +469,8 @@ class net_plugin_impl {
ilog( "received message of size: ${s}", ("s",bytes_transferred) ); ilog( "received message of size: ${s}", ("s",bytes_transferred) );
start_read_message( c ); start_read_message( c );
switch (msg.which()) { msgHandler m(*this, c);
case 0: { msg.visit(m);
handle_message (c, msg.get<handshake_message> ());
break;
}
case 1: {
handle_message (c, msg.get<peer_message> ());
break;
}
case 2: {
break;
handle_message (c, msg.get<notice_message> ());
break;
}
case 3: {
handle_message (c, msg.get<sync_request_message> ());
break;
}
case 4: {
handle_message (c, msg.get<block_summary_message>());
break;
}
case 5: {
handle_message (c, msg.get<SignedTransaction>());
break;
}
case 6: {
handle_message (c, msg.get<signed_block>());
break;
}
}
return; return;
} catch ( const fc::exception& e ) { } catch ( const fc::exception& e ) {
edump((e.to_detail_string() )); edump((e.to_detail_string() ));
...@@ -397,14 +484,6 @@ class net_plugin_impl { ...@@ -397,14 +484,6 @@ class net_plugin_impl {
} }
void write_loop( connection* c ) {
try {
c->send( handshake_message{} );
} catch ( ... ) {
wlog( "write loop exception" );
}
}
void close( connection* c ) { void close( connection* c ) {
ilog( "close ${c}", ("c",int64_t(c))); ilog( "close ${c}", ("c",int64_t(c)));
if( c->socket ) if( c->socket )
...@@ -415,7 +494,6 @@ class net_plugin_impl { ...@@ -415,7 +494,6 @@ class net_plugin_impl {
}; // class net_plugin_impl }; // class net_plugin_impl
net_plugin::net_plugin() net_plugin::net_plugin()
:my( new net_plugin_impl ) { :my( new net_plugin_impl ) {
} }
...@@ -446,6 +524,9 @@ void net_plugin::plugin_initialize( const variables_map& options ) { ...@@ -446,6 +524,9 @@ void net_plugin::plugin_initialize( const variables_map& options ) {
if( options.count( "remote-endpoint" ) ) { if( options.count( "remote-endpoint" ) ) {
my->seed_nodes = options.at( "remote-endpoint" ).as< vector<string> >(); my->seed_nodes = options.at( "remote-endpoint" ).as< vector<string> >();
} }
my->user_agent_name = "EOS Test Agent";
my->chain_plug = app().find_plugin<chain_plugin>();
} }
void net_plugin::plugin_startup() { void net_plugin::plugin_startup() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册