提交 c917e3e0 编写于 作者: D Daniel Larimer 提交者: GitHub

Merge pull request #44 from pmesnier/master

Net_plugin first feature
......@@ -23,7 +23,7 @@ add_library( eos_chain
target_link_libraries( eos_chain fc chainbase eos_types Logging IR WAST WASM Runtime )
target_include_directories( eos_chain
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include"
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include"
"${CMAKE_CURRENT_SOURCE_DIR}/../wasm-jit/Include"
)
......
......@@ -20,6 +20,7 @@ using chain::fork_database;
using chain::block_log;
using chain::type_index;
using chain::by_scope_name;
using chain::chain_id_type;
class chain_plugin_impl {
public:
......@@ -31,6 +32,7 @@ public:
fc::optional<fork_database> fork_db;
fc::optional<block_log> block_logger;
fc::optional<chain_controller> chain;
chain_id_type chain_id;
};
......@@ -100,6 +102,7 @@ void chain_plugin::plugin_startup() {
my->fork_db = fork_database();
my->block_logger = block_log(my->block_log_dir);
my->chain_id = genesis.compute_chain_id();
my->chain = chain_controller(db, *my->fork_db, *my->block_logger,
initializer, native_contract::make_administrator());
......@@ -140,6 +143,10 @@ bool chain_plugin::block_is_on_preferred_chain(const chain::block_id_type& block
chain_controller& chain_plugin::chain() { return *my->chain; }
const chain::chain_controller& chain_plugin::chain() const { return *my->chain; }
void chain_plugin::get_chain_id (chain_id_type &cid)const {
memcpy (cid.data(), my->chain_id.data(), cid.data_size());
}
namespace chain_apis {
read_only::get_info_results read_only::get_info(const read_only::get_info_params&) const {
......
......@@ -86,6 +86,8 @@ public:
// Only call this after plugin_startup()!
const chain_controller& chain() const;
void get_chain_id (chain::chain_id_type &cid) const;
private:
unique_ptr<class chain_plugin_impl> my;
};
......@@ -97,4 +99,4 @@ FC_REFLECT(eos::chain_apis::read_only::get_info_results,
(head_block_num)(head_block_id)(head_block_time)(head_block_producer)
(recent_slots)(participation_rate))
FC_REFLECT(eos::chain_apis::read_only::get_block_params, (block_num_or_id))
FC_REFLECT(eos::chain_apis::read_only::get_types_params, (account_name))
\ No newline at end of file
FC_REFLECT(eos::chain_apis::read_only::get_types_params, (account_name))
......@@ -8,9 +8,9 @@ namespace eos {
struct handshake_message {
int16_t network_version = 0;
fc::sha256 chain_id; ///< used to identify chain
chain_id_type chain_id; ///< used to identify chain
fc::sha256 node_id; ///< used to identify peers and prevent self-connect
uint64_t last_irreversible_block_num = 0;
uint32_t last_irreversible_block_num = 0;
block_id_type last_irreversible_block_id;
string os;
string agent;
......@@ -27,8 +27,8 @@ namespace eos {
};
struct sync_request_message {
uint64_t start_block;
uint64_t end_block;
uint32_t start_block;
uint32_t end_block;
};
struct peer_message {
......@@ -56,13 +56,13 @@ FC_REFLECT( eos::notice_message, (known_trx)(known_blocks) )
FC_REFLECT( eos::sync_request_message, (start_block)(end_block) )
FC_REFLECT( eos::peer_message, (peers) )
/**
/**
*
Goals of Network Code
1. low latency to minimize missed blocks and potentially reduce block interval
2. minimize redundant data between blocks and transactions.
3. enable rapid sync of a new node
4. update to new boost / fc
4. update to new boost / fc
......@@ -90,23 +90,23 @@ State:
wait for new validated block, transaction, or peer signal from network fiber
} else {
we assume peer is in sync mode in which case it is operating on a
request / response basis
request / response basis
wait for notice of sync from the read loop
}
read loop
read loop
if hello message
verify that peers Last Ir Block is in our state or disconnect, they are on fork
verify peer network protocol
if notice message update list of transactions known by remote peer
if trx message then insert into global state as unvalidated
if trx message then insert into global state as unvalidated
if blk summary message then insert into global state *if* we know of all dependent transactions
else close connection
if my head block < the LIB of a peer and my head block age > block interval * round_size/2 then
enter sync mode...
divide the block numbers you need to fetch among peers and send fetch request
......
......@@ -2,11 +2,13 @@
#include <eos/net_plugin/net_plugin.hpp>
#include <eos/net_plugin/protocol.hpp>
#include <eos/chain/chain_controller.hpp>
#include <fc/network/ip.hpp>
#include <fc/io/raw.hpp>
#include <fc/container/flat.hpp>
#include <fc/reflect/variant.hpp>
#include <fc/crypto/rand.hpp>
#include <boost/asio/ip/tcp.hpp>
......@@ -25,7 +27,7 @@ struct node_transaction_state {
fc::time_point received;
fc::time_point_sec expires;
vector<char> packed_transaction;
uint64_t block_num = -1; /// block transaction was included in
uint32_t block_num = -1; /// block transaction was included in
bool validated = false; /// whether or not our node has validated it
};
......@@ -35,13 +37,13 @@ struct node_transaction_state {
* Index by is_known, block_num, validated_time, this is the order we will broadcast
* to peer.
* Index by is_noticed, validated_time
*
*
*/
struct transaction_state {
transaction_id_type id;
bool is_known_by_peer = false; ///< true if we sent or received this trx to this peer or received notice from peer
bool is_noticed_to_peer = false; ///< have we sent peer noitce we know it (true if we reeive from this peer)
uint64_t block_num = -1; ///< the block number the transaction was included in
uint32_t block_num = -1; ///< the block number the transaction was included in
time_point validated_time; ///< infinity for unvalidated
time_point requested_time; /// incase we fetch large trx
};
......@@ -54,7 +56,7 @@ typedef multi_index_container<
> transaction_state_index;
/**
*
*
*/
struct block_state {
block_id_type id;
......@@ -74,9 +76,9 @@ typedef multi_index_container<
* Index by start_block
*/
struct sync_state {
uint64_t start_block = 0;
uint64_t end_block = 0;
uint64_t last = 0; ///< last sent or received
uint32_t start_block = 0;
uint32_t end_block = 0;
uint32_t last = 0; ///< last sent or received
time_point start_time;; ///< time request made or received
};
......@@ -84,16 +86,19 @@ struct by_start_block;
typedef multi_index_container<
sync_state,
indexed_by<
ordered_unique< tag<by_start_block>, member<sync_state, uint64_t, &sync_state::start_block > >
ordered_unique< tag<by_start_block>, member<sync_state, uint32_t, &sync_state::start_block > >
>
> sync_request_index;
class connection {
public:
connection( socket_ptr s ):socket(s){
public:
connection( socket_ptr s )
: socket(s)
{
wlog( "created connection" );
pending_message_buffer.resize( 1024*1024*4 );
}
~connection() {
wlog( "released connection" );
}
......@@ -108,8 +113,7 @@ class connection {
vector<char> pending_message_buffer;
handshake_message last_handshake;
std::deque<net_message> out_queue;
std::deque<net_message> out_queue;
void send( const net_message& m ) {
out_queue.push_back( m );
......@@ -118,8 +122,12 @@ class connection {
}
void send_next_message() {
if( !out_queue.size() )
return;
if( !out_queue.size() ) {
if (out_sync_state.size() > 0) {
write_block_backlog();
}
return;
}
auto& m = out_queue.front();
......@@ -141,7 +149,30 @@ class connection {
}
});
}
};
void write_block_backlog ( ) {
try {
ilog ("write loop sending backlog ");
if (out_sync_state.size() > 0) {
chain_controller& cc = app().find_plugin<chain_plugin>()->chain();
auto ss = out_sync_state.begin();
for (uint32_t num = ss->last + 1;
num <= ss->end_block; num++) {
fc::optional<signed_block> sb = cc.fetch_block_by_number(num);
if (sb) {
send( *sb );
}
ss.get_node()->value().last = num;
}
out_sync_state.erase(0);
}
} catch ( ... ) {
wlog( "write loop exception" );
}
}
}; // class connection
......@@ -158,14 +189,19 @@ class net_plugin_impl {
std::set< connection* > connections;
bool done = false;
fc::optional<handshake_message> hello;
std::string user_agent_name;
chain_plugin* chain_plug;
void connect( const string& ep ) {
auto host = ep.substr( 0, ep.find(':') );
auto port = ep.substr( host.size()+1, host.size() );
idump((host)(port));
auto resolver = std::make_shared<tcp::resolver>( std::ref( app().get_io_service() ) );
auto resolver = std::make_shared<tcp::resolver>( std::ref( app().get_io_service() ) );
tcp::resolver::query query( tcp::v4(), host.c_str(), port.c_str() );
resolver->async_resolve( query,
resolver->async_resolve( query,
[resolver,ep,this]( const boost::system::error_code& err, tcp::resolver::iterator endpoint_itr ){
if( !err ) {
connect( resolver, endpoint_itr );
......@@ -216,20 +252,55 @@ class net_plugin_impl {
ilog("network loop done");
} FC_CAPTURE_AND_RETHROW() }
void init_handshake () {
if (!hello) {
hello = handshake_message();
}
hello->network_version = 0;
chain_plug->get_chain_id(hello->chain_id);
fc::rand_pseudo_bytes(hello->node_id.data(), hello->node_id.data_size());
#if defined( __APPLE__ )
hello->os = "osx";
#elif defined( __linux__ )
hello->os = "linux";
#elif defined( _MSC_VER )
hello->os = "win32";
#else
hello->os = "other";
#endif
hello->agent = user_agent_name;
}
void update_handshake () {
hello->last_irreversible_block_id = chain_plug->chain().get_block_id_for_num
(hello->last_irreversible_block_num = chain_plug->chain().last_irreversible_block_num());
}
void start_session( connection* con ) {
connections.insert( con );
start_read_message( *con );
con->send( handshake_message{} );
// con->readloop_complete = bf::async( [=](){ read_loop( con ); } );
// con->writeloop_complete = bf::async( [=](){ write_loop( con ); } );
if (hello.valid()) {
update_handshake ();
} else {
init_handshake ();
}
con->send( *hello );
// con->readloop_complete = bf::async( [=](){ read_loop( con ); } );
// con->writeloop_complete = bf::async( [=](){ write_loop con ); } );
}
void start_listen_loop() {
auto socket = std::make_shared<tcp::socket>( std::ref( app().get_io_service() ) );
acceptor->async_accept( *socket, [socket,this]( boost::system::error_code ec ) {
if( !ec ) {
start_session( new connection( socket ) );
start_listen_loop();
start_session( new connection( socket ) );
start_listen_loop();
} else {
elog( "Error accepting connection: ${m}", ("m", ec.message() ) );
}
......@@ -238,7 +309,7 @@ class net_plugin_impl {
void start_read_message( connection& c ) {
c.pending_message_size = 0;
boost::asio::async_read( *c.socket, boost::asio::buffer((char*)&c.pending_message_size,sizeof(c.pending_message_size)),
boost::asio::async_read( *c.socket, boost::asio::buffer((char*)&c.pending_message_size,sizeof(c.pending_message_size)),
[&]( boost::system::error_code ec, std::size_t bytes_transferred ) {
ilog( "read size handler..." );
if( !ec ) {
......@@ -255,8 +326,141 @@ class net_plugin_impl {
}
);
}
void start_reading_pending_buffer( connection& c ) {
boost::asio::async_read( *c.socket, boost::asio::buffer(c.pending_message_buffer.data(), c.pending_message_size ),
void handle_message (connection &c, handshake_message &msg) {
if (!hello) {
init_handshake();
}
ilog ("got a handshake message");
if (msg.node_id == hello->node_id) {
elog ("Self connection detected. Closing connection");
close(&c);
return;
}
if (msg.chain_id != hello->chain_id) {
elog ("Peer on a different chain. Closing connection");
close (&c);
return;
}
if (msg.network_version != hello->network_version) {
elog ("Peer network id does not match ");
close (&c);
return;
}
chain_controller& cc = chain_plug->chain();
uint32_t head = cc.head_block_num ();
if ( msg.last_irreversible_block_num > head) {
uint32_t delta = msg.last_irreversible_block_num - head;
uint32_t count = connections.size();
uint32_t span = delta / count;
uint32_t lastSpan = delta - (span * (count-1));
ilog ("peer is ahead of head by ${d}, count = ${c}, span = ${s}, lastspan = ${ls} ",
("d",delta)("c",count)("s",span)("ls",lastSpan));
for (auto &cx: connections) {
if (--count == 0) {
span = lastSpan;
}
sync_state req = {head+1, head+span, 0, time_point::now() };
cx->in_sync_state.insert (req);
sync_request_message srm = {req.start_block, req.end_block };
cx->send (srm);
head += span;
}
}
c.last_handshake = msg;
}
void handle_message (connection &c, peer_message &msg) {
ilog ("got a peer message");
}
void handle_message (connection &c, notice_message &msg) {
ilog ("got a notice message");
}
void handle_message (connection &c, sync_request_message &msg) {
ilog ("got a sync request message for blocks ${s} to ${e}", ("s",msg.start_block)("e", msg.end_block));
sync_state req = {msg.start_block,msg.end_block,0,time_point::now()};
c.out_sync_state.insert (req);
c.write_block_backlog ();
}
void handle_message (connection &c, block_summary_message &msg) {
ilog ("got a block summary message");
}
void handle_message (connection &c, SignedTransaction &msg) {
ilog ("got a SignedTransacton");
}
void handle_message (connection &c, signed_block &msg) {
uint32_t bn = msg.block_num();
ilog ("got a signed_block, num = ${n}", ("n", bn));
chain_controller &cc = chain_plug->chain();
if (cc.is_known_block(msg.id())) {
ilog ("block id ${id} is known", ("id", msg.id()) );
return;
}
uint32_t num = msg.block_num();
for (auto &ss: c.in_sync_state) {
if (num >= ss.end_block) {
continue;
}
const_cast<sync_state&>(ss).last = num;
break;
}
// TODO: add block to global state
}
struct msgHandler : public fc::visitor<void> {
net_plugin_impl &impl;
connection &c;
msgHandler (net_plugin_impl &imp, connection &conn) : impl(imp), c(conn) {}
void operator()(handshake_message &msg)
{
impl.handle_message (c, msg);
}
void operator()(peer_message &msg)
{
impl.handle_message (c, msg);
}
void operator()(notice_message &msg)
{
impl.handle_message (c, msg);
}
void operator()(sync_request_message &msg)
{
impl.handle_message (c, msg);
}
void operator()(block_summary_message &msg)
{
impl.handle_message (c, msg);
}
void operator()(SignedTransaction &msg)
{
impl.handle_message (c, msg);
}
void operator()(signed_block &msg)
{
impl.handle_message (c, msg);
}
};
void start_reading_pending_buffer( connection& c ) {
boost::asio::async_read( *c.socket, boost::asio::buffer(c.pending_message_buffer.data(), c.pending_message_size ),
[&]( boost::system::error_code ec, std::size_t bytes_transferred ) {
ilog( "read buffer handler..." );
if( !ec ) {
......@@ -264,6 +468,9 @@ class net_plugin_impl {
auto msg = fc::raw::unpack<net_message>( c.pending_message_buffer );
ilog( "received message of size: ${s}", ("s",bytes_transferred) );
start_read_message( c );
msgHandler m(*this, c);
msg.visit(m);
return;
} catch ( const fc::exception& e ) {
edump((e.to_detail_string() ));
......@@ -277,14 +484,6 @@ class net_plugin_impl {
}
void write_loop( connection* c ) {
try {
c->send( handshake_message{} );
} catch ( ... ) {
wlog( "write loop exception" );
}
}
void close( connection* c ) {
ilog( "close ${c}", ("c",int64_t(c)));
if( c->socket )
......@@ -293,7 +492,7 @@ class net_plugin_impl {
delete c;
}
};
}; // class net_plugin_impl
net_plugin::net_plugin()
:my( new net_plugin_impl ) {
......@@ -302,7 +501,7 @@ net_plugin::net_plugin()
net_plugin::~net_plugin() {
}
void net_plugin::set_program_options( options_description& cli, options_description& cfg )
void net_plugin::set_program_options( options_description& cli, options_description& cfg )
{
cfg.add_options()
("listen-endpoint", bpo::value<string>()->default_value( "127.0.0.1:9876" ), "The local IP address and port to listen for incoming connections.")
......@@ -317,7 +516,7 @@ void net_plugin::plugin_initialize( const variables_map& options ) {
auto lipstr = options.at("listen-endpoint").as< string >();
auto fcep = fc::ip::endpoint::from_string( lipstr );
my->listen_endpoint = tcp::endpoint( boost::asio::ip::address_v4::from_string( (string)fcep.get_address() ), fcep.port() );
ilog( "configured net to listen on ${ep}", ("ep", fcep) );
my->acceptor.reset( new tcp::acceptor( app().get_io_service() ) );
......@@ -325,9 +524,12 @@ void net_plugin::plugin_initialize( const variables_map& options ) {
if( options.count( "remote-endpoint" ) ) {
my->seed_nodes = options.at( "remote-endpoint" ).as< vector<string> >();
}
my->user_agent_name = "EOS Test Agent";
my->chain_plug = app().find_plugin<chain_plugin>();
}
void net_plugin::plugin_startup() {
void net_plugin::plugin_startup() {
// boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), port);
if( my->acceptor ) {
my->acceptor->open(my->listen_endpoint.protocol());
......@@ -353,7 +555,7 @@ try {
ilog( "close connections ${s}", ("s",my->connections.size()) );
auto cons = my->connections;
for( auto con : cons )
for( auto con : cons )
con->socket->close();
while( my->connections.size() ) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册