diff --git a/libraries/chain/CMakeLists.txt b/libraries/chain/CMakeLists.txt index dfed91fb57a792c3d486a255d8ba8cd406d35f7a..00c20e7999a28454d484fd5fe7b413b9e5b201ce 100644 --- a/libraries/chain/CMakeLists.txt +++ b/libraries/chain/CMakeLists.txt @@ -23,7 +23,7 @@ add_library( eos_chain target_link_libraries( eos_chain fc chainbase eos_types Logging IR WAST WASM Runtime ) target_include_directories( eos_chain - PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include" + PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include" "${CMAKE_CURRENT_SOURCE_DIR}/../wasm-jit/Include" ) diff --git a/plugins/chain_plugin/chain_plugin.cpp b/plugins/chain_plugin/chain_plugin.cpp index 5f40cd8659dfcb9c3f2094215cdf3b93163523b7..201896c2fd92066db4fa573220193c85ac0732a8 100644 --- a/plugins/chain_plugin/chain_plugin.cpp +++ b/plugins/chain_plugin/chain_plugin.cpp @@ -20,6 +20,7 @@ using chain::fork_database; using chain::block_log; using chain::type_index; using chain::by_scope_name; +using chain::chain_id_type; class chain_plugin_impl { public: @@ -31,6 +32,7 @@ public: fc::optional fork_db; fc::optional block_logger; fc::optional chain; + chain_id_type chain_id; }; @@ -100,6 +102,7 @@ void chain_plugin::plugin_startup() { my->fork_db = fork_database(); my->block_logger = block_log(my->block_log_dir); + my->chain_id = genesis.compute_chain_id(); my->chain = chain_controller(db, *my->fork_db, *my->block_logger, initializer, native_contract::make_administrator()); @@ -140,6 +143,10 @@ bool chain_plugin::block_is_on_preferred_chain(const chain::block_id_type& block chain_controller& chain_plugin::chain() { return *my->chain; } const chain::chain_controller& chain_plugin::chain() const { return *my->chain; } + void chain_plugin::get_chain_id (chain_id_type &cid)const { + memcpy (cid.data(), my->chain_id.data(), cid.data_size()); + } + namespace chain_apis { read_only::get_info_results read_only::get_info(const read_only::get_info_params&) const { diff --git a/plugins/chain_plugin/include/eos/chain_plugin/chain_plugin.hpp b/plugins/chain_plugin/include/eos/chain_plugin/chain_plugin.hpp index c9edb532a292a2c89db788cd96fac3e74de355eb..0352f7ce8ecd1873ec79722407fb74312f5931a1 100644 --- a/plugins/chain_plugin/include/eos/chain_plugin/chain_plugin.hpp +++ b/plugins/chain_plugin/include/eos/chain_plugin/chain_plugin.hpp @@ -86,6 +86,8 @@ public: // Only call this after plugin_startup()! const chain_controller& chain() const; + void get_chain_id (chain::chain_id_type &cid) const; + private: unique_ptr my; }; @@ -97,4 +99,4 @@ FC_REFLECT(eos::chain_apis::read_only::get_info_results, (head_block_num)(head_block_id)(head_block_time)(head_block_producer) (recent_slots)(participation_rate)) FC_REFLECT(eos::chain_apis::read_only::get_block_params, (block_num_or_id)) -FC_REFLECT(eos::chain_apis::read_only::get_types_params, (account_name)) \ No newline at end of file +FC_REFLECT(eos::chain_apis::read_only::get_types_params, (account_name)) diff --git a/plugins/net_plugin/include/eos/net_plugin/protocol.hpp b/plugins/net_plugin/include/eos/net_plugin/protocol.hpp index b8026f2c68495ceca652f6bcfe40c48a1bd3ce94..5651b096aa80ba02cfa2e78d8346bc9cdc36f9bb 100644 --- a/plugins/net_plugin/include/eos/net_plugin/protocol.hpp +++ b/plugins/net_plugin/include/eos/net_plugin/protocol.hpp @@ -8,9 +8,9 @@ namespace eos { struct handshake_message { int16_t network_version = 0; - fc::sha256 chain_id; ///< used to identify chain + chain_id_type chain_id; ///< used to identify chain fc::sha256 node_id; ///< used to identify peers and prevent self-connect - uint64_t last_irreversible_block_num = 0; + uint32_t last_irreversible_block_num = 0; block_id_type last_irreversible_block_id; string os; string agent; @@ -27,8 +27,8 @@ namespace eos { }; struct sync_request_message { - uint64_t start_block; - uint64_t end_block; + uint32_t start_block; + uint32_t end_block; }; struct peer_message { @@ -56,13 +56,13 @@ FC_REFLECT( eos::notice_message, (known_trx)(known_blocks) ) FC_REFLECT( eos::sync_request_message, (start_block)(end_block) ) FC_REFLECT( eos::peer_message, (peers) ) -/** +/** * Goals of Network Code 1. low latency to minimize missed blocks and potentially reduce block interval 2. minimize redundant data between blocks and transactions. 3. enable rapid sync of a new node -4. update to new boost / fc +4. update to new boost / fc @@ -90,23 +90,23 @@ State: wait for new validated block, transaction, or peer signal from network fiber } else { we assume peer is in sync mode in which case it is operating on a - request / response basis + request / response basis wait for notice of sync from the read loop } - read loop + read loop if hello message verify that peers Last Ir Block is in our state or disconnect, they are on fork verify peer network protocol if notice message update list of transactions known by remote peer - if trx message then insert into global state as unvalidated + if trx message then insert into global state as unvalidated if blk summary message then insert into global state *if* we know of all dependent transactions else close connection - + if my head block < the LIB of a peer and my head block age > block interval * round_size/2 then enter sync mode... divide the block numbers you need to fetch among peers and send fetch request diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index f067832e834f60c8b5f60fbd7a398c84efde1355..9c401f6eb38ee948b6db665e8b47fa8983618cfa 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -2,11 +2,13 @@ #include #include +#include #include #include #include #include +#include #include @@ -25,7 +27,7 @@ struct node_transaction_state { fc::time_point received; fc::time_point_sec expires; vector packed_transaction; - uint64_t block_num = -1; /// block transaction was included in + uint32_t block_num = -1; /// block transaction was included in bool validated = false; /// whether or not our node has validated it }; @@ -35,13 +37,13 @@ struct node_transaction_state { * Index by is_known, block_num, validated_time, this is the order we will broadcast * to peer. * Index by is_noticed, validated_time - * + * */ struct transaction_state { transaction_id_type id; bool is_known_by_peer = false; ///< true if we sent or received this trx to this peer or received notice from peer bool is_noticed_to_peer = false; ///< have we sent peer noitce we know it (true if we reeive from this peer) - uint64_t block_num = -1; ///< the block number the transaction was included in + uint32_t block_num = -1; ///< the block number the transaction was included in time_point validated_time; ///< infinity for unvalidated time_point requested_time; /// incase we fetch large trx }; @@ -54,7 +56,7 @@ typedef multi_index_container< > transaction_state_index; /** - * + * */ struct block_state { block_id_type id; @@ -74,9 +76,9 @@ typedef multi_index_container< * Index by start_block */ struct sync_state { - uint64_t start_block = 0; - uint64_t end_block = 0; - uint64_t last = 0; ///< last sent or received + uint32_t start_block = 0; + uint32_t end_block = 0; + uint32_t last = 0; ///< last sent or received time_point start_time;; ///< time request made or received }; @@ -84,16 +86,19 @@ struct by_start_block; typedef multi_index_container< sync_state, indexed_by< - ordered_unique< tag, member > + ordered_unique< tag, member > > > sync_request_index; class connection { - public: - connection( socket_ptr s ):socket(s){ +public: + connection( socket_ptr s ) + : socket(s) + { wlog( "created connection" ); pending_message_buffer.resize( 1024*1024*4 ); } + ~connection() { wlog( "released connection" ); } @@ -108,8 +113,7 @@ class connection { vector pending_message_buffer; handshake_message last_handshake; - - std::deque out_queue; + std::deque out_queue; void send( const net_message& m ) { out_queue.push_back( m ); @@ -118,8 +122,12 @@ class connection { } void send_next_message() { - if( !out_queue.size() ) - return; + if( !out_queue.size() ) { + if (out_sync_state.size() > 0) { + write_block_backlog(); + } + return; + } auto& m = out_queue.front(); @@ -141,7 +149,30 @@ class connection { } }); } -}; + + void write_block_backlog ( ) { + try { + ilog ("write loop sending backlog "); + if (out_sync_state.size() > 0) { + chain_controller& cc = app().find_plugin()->chain(); + auto ss = out_sync_state.begin(); + for (uint32_t num = ss->last + 1; + num <= ss->end_block; num++) { + fc::optional sb = cc.fetch_block_by_number(num); + if (sb) { + send( *sb ); + } + ss.get_node()->value().last = num; + } + out_sync_state.erase(0); + } + } catch ( ... ) { + wlog( "write loop exception" ); + } + } + + +}; // class connection @@ -158,14 +189,19 @@ class net_plugin_impl { std::set< connection* > connections; bool done = false; + fc::optional hello; + std::string user_agent_name; + chain_plugin* chain_plug; + + void connect( const string& ep ) { auto host = ep.substr( 0, ep.find(':') ); auto port = ep.substr( host.size()+1, host.size() ); idump((host)(port)); - auto resolver = std::make_shared( std::ref( app().get_io_service() ) ); + auto resolver = std::make_shared( std::ref( app().get_io_service() ) ); tcp::resolver::query query( tcp::v4(), host.c_str(), port.c_str() ); - resolver->async_resolve( query, + resolver->async_resolve( query, [resolver,ep,this]( const boost::system::error_code& err, tcp::resolver::iterator endpoint_itr ){ if( !err ) { connect( resolver, endpoint_itr ); @@ -216,20 +252,55 @@ class net_plugin_impl { ilog("network loop done"); } FC_CAPTURE_AND_RETHROW() } + + void init_handshake () { + if (!hello) { + hello = handshake_message(); + } + + hello->network_version = 0; + chain_plug->get_chain_id(hello->chain_id); + fc::rand_pseudo_bytes(hello->node_id.data(), hello->node_id.data_size()); +#if defined( __APPLE__ ) + hello->os = "osx"; +#elif defined( __linux__ ) + hello->os = "linux"; +#elif defined( _MSC_VER ) + hello->os = "win32"; +#else + hello->os = "other"; +#endif + hello->agent = user_agent_name; + + } + + void update_handshake () { + hello->last_irreversible_block_id = chain_plug->chain().get_block_id_for_num + (hello->last_irreversible_block_num = chain_plug->chain().last_irreversible_block_num()); + } + void start_session( connection* con ) { connections.insert( con ); start_read_message( *con ); - con->send( handshake_message{} ); - // con->readloop_complete = bf::async( [=](){ read_loop( con ); } ); - // con->writeloop_complete = bf::async( [=](){ write_loop( con ); } ); + + if (hello.valid()) { + update_handshake (); + } else { + init_handshake (); + } + + con->send( *hello ); + + // con->readloop_complete = bf::async( [=](){ read_loop( con ); } ); + // con->writeloop_complete = bf::async( [=](){ write_loop con ); } ); } void start_listen_loop() { auto socket = std::make_shared( std::ref( app().get_io_service() ) ); acceptor->async_accept( *socket, [socket,this]( boost::system::error_code ec ) { if( !ec ) { - start_session( new connection( socket ) ); - start_listen_loop(); + start_session( new connection( socket ) ); + start_listen_loop(); } else { elog( "Error accepting connection: ${m}", ("m", ec.message() ) ); } @@ -238,7 +309,7 @@ class net_plugin_impl { void start_read_message( connection& c ) { c.pending_message_size = 0; - boost::asio::async_read( *c.socket, boost::asio::buffer((char*)&c.pending_message_size,sizeof(c.pending_message_size)), + boost::asio::async_read( *c.socket, boost::asio::buffer((char*)&c.pending_message_size,sizeof(c.pending_message_size)), [&]( boost::system::error_code ec, std::size_t bytes_transferred ) { ilog( "read size handler..." ); if( !ec ) { @@ -255,8 +326,141 @@ class net_plugin_impl { } ); } - void start_reading_pending_buffer( connection& c ) { - boost::asio::async_read( *c.socket, boost::asio::buffer(c.pending_message_buffer.data(), c.pending_message_size ), + + void handle_message (connection &c, handshake_message &msg) { + if (!hello) { + init_handshake(); + } + ilog ("got a handshake message"); + if (msg.node_id == hello->node_id) { + elog ("Self connection detected. Closing connection"); + close(&c); + return; + } + if (msg.chain_id != hello->chain_id) { + elog ("Peer on a different chain. Closing connection"); + close (&c); + return; + } + if (msg.network_version != hello->network_version) { + elog ("Peer network id does not match "); + close (&c); + return; + } + chain_controller& cc = chain_plug->chain(); + uint32_t head = cc.head_block_num (); + if ( msg.last_irreversible_block_num > head) { + uint32_t delta = msg.last_irreversible_block_num - head; + uint32_t count = connections.size(); + uint32_t span = delta / count; + uint32_t lastSpan = delta - (span * (count-1)); + ilog ("peer is ahead of head by ${d}, count = ${c}, span = ${s}, lastspan = ${ls} ", + ("d",delta)("c",count)("s",span)("ls",lastSpan)); + for (auto &cx: connections) { + if (--count == 0) { + span = lastSpan; + } + sync_state req = {head+1, head+span, 0, time_point::now() }; + cx->in_sync_state.insert (req); + sync_request_message srm = {req.start_block, req.end_block }; + cx->send (srm); + head += span; + } + + } + + c.last_handshake = msg; + } + + + void handle_message (connection &c, peer_message &msg) { + ilog ("got a peer message"); + } + + void handle_message (connection &c, notice_message &msg) { + ilog ("got a notice message"); + } + + void handle_message (connection &c, sync_request_message &msg) { + ilog ("got a sync request message for blocks ${s} to ${e}", ("s",msg.start_block)("e", msg.end_block)); + sync_state req = {msg.start_block,msg.end_block,0,time_point::now()}; + c.out_sync_state.insert (req); + c.write_block_backlog (); + } + + void handle_message (connection &c, block_summary_message &msg) { + ilog ("got a block summary message"); + } + + void handle_message (connection &c, SignedTransaction &msg) { + ilog ("got a SignedTransacton"); + } + + void handle_message (connection &c, signed_block &msg) { + uint32_t bn = msg.block_num(); + ilog ("got a signed_block, num = ${n}", ("n", bn)); + chain_controller &cc = chain_plug->chain(); + + if (cc.is_known_block(msg.id())) { + ilog ("block id ${id} is known", ("id", msg.id()) ); + return; + } + uint32_t num = msg.block_num(); + for (auto &ss: c.in_sync_state) { + if (num >= ss.end_block) { + continue; + } + const_cast(ss).last = num; + break; + } + // TODO: add block to global state + } + + + struct msgHandler : public fc::visitor { + net_plugin_impl &impl; + connection &c; + msgHandler (net_plugin_impl &imp, connection &conn) : impl(imp), c(conn) {} + + void operator()(handshake_message &msg) + { + impl.handle_message (c, msg); + } + + void operator()(peer_message &msg) + { + impl.handle_message (c, msg); + } + + void operator()(notice_message &msg) + { + impl.handle_message (c, msg); + } + + void operator()(sync_request_message &msg) + { + impl.handle_message (c, msg); + } + + void operator()(block_summary_message &msg) + { + impl.handle_message (c, msg); + } + + void operator()(SignedTransaction &msg) + { + impl.handle_message (c, msg); + } + + void operator()(signed_block &msg) + { + impl.handle_message (c, msg); + } + }; + + + void start_reading_pending_buffer( connection& c ) { + boost::asio::async_read( *c.socket, boost::asio::buffer(c.pending_message_buffer.data(), c.pending_message_size ), [&]( boost::system::error_code ec, std::size_t bytes_transferred ) { ilog( "read buffer handler..." ); if( !ec ) { @@ -264,6 +468,9 @@ class net_plugin_impl { auto msg = fc::raw::unpack( c.pending_message_buffer ); ilog( "received message of size: ${s}", ("s",bytes_transferred) ); start_read_message( c ); + + msgHandler m(*this, c); + msg.visit(m); return; } catch ( const fc::exception& e ) { edump((e.to_detail_string() )); @@ -277,14 +484,6 @@ class net_plugin_impl { } - void write_loop( connection* c ) { - try { - c->send( handshake_message{} ); - } catch ( ... ) { - wlog( "write loop exception" ); - } - } - void close( connection* c ) { ilog( "close ${c}", ("c",int64_t(c))); if( c->socket ) @@ -293,7 +492,7 @@ class net_plugin_impl { delete c; } -}; +}; // class net_plugin_impl net_plugin::net_plugin() :my( new net_plugin_impl ) { @@ -302,7 +501,7 @@ net_plugin::net_plugin() net_plugin::~net_plugin() { } -void net_plugin::set_program_options( options_description& cli, options_description& cfg ) +void net_plugin::set_program_options( options_description& cli, options_description& cfg ) { cfg.add_options() ("listen-endpoint", bpo::value()->default_value( "127.0.0.1:9876" ), "The local IP address and port to listen for incoming connections.") @@ -317,7 +516,7 @@ void net_plugin::plugin_initialize( const variables_map& options ) { auto lipstr = options.at("listen-endpoint").as< string >(); auto fcep = fc::ip::endpoint::from_string( lipstr ); my->listen_endpoint = tcp::endpoint( boost::asio::ip::address_v4::from_string( (string)fcep.get_address() ), fcep.port() ); - + ilog( "configured net to listen on ${ep}", ("ep", fcep) ); my->acceptor.reset( new tcp::acceptor( app().get_io_service() ) ); @@ -325,9 +524,12 @@ void net_plugin::plugin_initialize( const variables_map& options ) { if( options.count( "remote-endpoint" ) ) { my->seed_nodes = options.at( "remote-endpoint" ).as< vector >(); } + + my->user_agent_name = "EOS Test Agent"; + my->chain_plug = app().find_plugin(); } -void net_plugin::plugin_startup() { + void net_plugin::plugin_startup() { // boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), port); if( my->acceptor ) { my->acceptor->open(my->listen_endpoint.protocol()); @@ -353,7 +555,7 @@ try { ilog( "close connections ${s}", ("s",my->connections.size()) ); auto cons = my->connections; - for( auto con : cons ) + for( auto con : cons ) con->socket->close(); while( my->connections.size() ) {