From 4abc2b6ef17f3892f2950f01a89c49a3b20832e2 Mon Sep 17 00:00:00 2001 From: Phil Mesnier Date: Thu, 15 Feb 2018 17:53:44 -0600 Subject: [PATCH] #1329, dawn-567, bring back subsequent work done on the dawn 2.x branch, and add support for packed messages. block summary support to follow. --- libraries/chain/chain_controller.cpp | 55 +- .../include/eosio/chain/chain_controller.hpp | 8 +- .../include/eosio/net_plugin/protocol.hpp | 4 +- plugins/net_plugin/net_plugin.cpp | 533 +++++++++--------- 4 files changed, 284 insertions(+), 316 deletions(-) diff --git a/libraries/chain/chain_controller.cpp b/libraries/chain/chain_controller.cpp index b70a606e1..94bd99148 100644 --- a/libraries/chain/chain_controller.cpp +++ b/libraries/chain/chain_controller.cpp @@ -48,9 +48,9 @@ uint32_t chain_controller::blocks_per_round()const { } chain_controller::chain_controller( const chain_controller::controller_config& cfg ) -:_db( cfg.shared_memory_dir, - (cfg.read_only ? database::read_only : database::read_write), - cfg.shared_memory_size), +:_db( cfg.shared_memory_dir, + (cfg.read_only ? database::read_only : database::read_write), + cfg.shared_memory_size), _block_log(cfg.block_log_dir), _limits(cfg.limits) { @@ -154,7 +154,7 @@ std::vector chain_controller::get_block_ids_on_fork(block_id_type */ void chain_controller::push_block(const signed_block& new_block, uint32_t skip) { try { - with_skip_flags( skip, [&](){ + with_skip_flags( skip, [&](){ return without_pending_transactions( [&]() { return _db.with_write_lock( [&]() { return _push_block(new_block); @@ -255,11 +255,10 @@ transaction_trace chain_controller::push_transaction(const packed_transaction& t transaction_trace chain_controller::_push_transaction(const packed_transaction& trx) { transaction_metadata mtrx( trx, get_chain_id(), head_block_time()); check_transaction_authorization(mtrx.trx(), trx.signatures); - auto result = _push_transaction(std::move(mtrx)); // notify anyone listening to pending transactions - on_pending_transaction(trx); + on_pending_transaction(_pending_transaction_metas.back(), trx); _pending_block->input_transactions.emplace_back(trx); @@ -458,11 +457,11 @@ signed_block chain_controller::generate_block( }); } FC_CAPTURE_AND_RETHROW( (when) ) } -signed_block chain_controller::_generate_block( block_timestamp_type when, - account_name producer, +signed_block chain_controller::_generate_block( block_timestamp_type when, + account_name producer, const private_key_type& block_signing_key ) { try { - + try { uint32_t skip = _skip_flags; uint32_t slot_num = get_slot_at_time( when ); @@ -586,7 +585,7 @@ void chain_controller::__apply_block(const signed_block& next_block) uint32_t skip = _skip_flags; /* - FC_ASSERT((skip & skip_merkle_check) + FC_ASSERT((skip & skip_merkle_check) || next_block.transaction_merkle_root == next_block.calculate_merkle_root(), "", ("next_block.transaction_merkle_root", next_block.transaction_merkle_root) ("calc",next_block.calculate_merkle_root())("next_block",next_block)("id",next_block.id())); @@ -771,13 +770,13 @@ void chain_controller::check_authorization( const vector& actions, if (!allow_unused_signatures && (_skip_flags & skip_transaction_signatures) == false) EOS_ASSERT(checker.all_keys_used(), tx_irrelevant_sig, - "transaction bears irrelevant signatures from these keys: ${keys}", + "transaction bears irrelevant signatures from these keys: ${keys}", ("keys", checker.unused_keys())); } void chain_controller::check_transaction_authorization(const transaction& trx, const vector& signatures, - bool allow_unused_signatures)const + bool allow_unused_signatures)const { check_authorization( trx.actions, trx.get_signature_keys( signatures, chain_id_type{} ), allow_unused_signatures ); } @@ -819,7 +818,7 @@ void chain_controller::validate_uniqueness( const transaction& trx )const { void chain_controller::record_transaction(const transaction& trx) { //Insert transaction into unique transactions database. _db.create([&](transaction_object& transaction) { - transaction.trx_id = trx.id(); + transaction.trx_id = trx.id(); transaction.expiration = trx.expiration; }); } @@ -836,8 +835,8 @@ void chain_controller::validate_tapos(const transaction& trx)const { ("tapos_summary", tapos_block_summary)); } -void chain_controller::validate_referenced_accounts( const transaction& trx )const -{ try { +void chain_controller::validate_referenced_accounts( const transaction& trx )const +{ try { for( const auto& act : trx.actions ) { require_account(act.account); for (const auto& auth : act.authorization ) @@ -890,7 +889,7 @@ const producer_object& chain_controller::validate_block_header(uint32_t skip, co EOS_ASSERT(!next_block.new_producers, block_validate_exception, "Producer changes may only occur at the end of a round."); } - + const producer_object& producer = get_producer(get_scheduled_producer(get_slot_at_time(next_block.timestamp))); if(!(skip&skip_producer_signature)) @@ -904,7 +903,7 @@ const producer_object& chain_controller::validate_block_header(uint32_t skip, co ("block producer",next_block.producer)("scheduled producer",producer.owner)); } - + return producer; } @@ -917,7 +916,7 @@ void chain_controller::create_block_summary(const signed_block& next_block) { /** * Takes the top config::producer_count producers by total vote excluding any producer whose - * block_signing_key is null. + * block_signing_key is null. */ producer_schedule_type chain_controller::_calculate_producer_schedule()const { producer_schedule_type schedule = get_global_properties().new_active_producers; @@ -934,7 +933,7 @@ producer_schedule_type chain_controller::_calculate_producer_schedule()const { */ const shared_producer_schedule_type& chain_controller::_head_producer_schedule()const { const auto& gpo = get_global_properties(); - if( gpo.pending_active_producers.size() ) + if( gpo.pending_active_producers.size() ) return gpo.pending_active_producers.back().second; return gpo.active_producers; } @@ -973,13 +972,13 @@ void chain_controller::update_global_properties(const signed_block& b) { try { active_producers_authority.accounts.push_back({{name.producer_name, config::active_name}, 1}); } - auto& po = _db.get( boost::make_tuple(config::producers_account_name, + auto& po = _db.get( boost::make_tuple(config::producers_account_name, config::active_name ) ); _db.modify(po,[active_producers_authority] (permission_object& po) { po.auth = active_producers_authority; }); } -} FC_CAPTURE_AND_RETHROW() } +} FC_CAPTURE_AND_RETHROW() } void chain_controller::add_checkpoints( const flat_map& checkpts ) { for (const auto& i : checkpts) @@ -1019,12 +1018,12 @@ account_name chain_controller::head_block_producer() const { return {}; } -const producer_object& chain_controller::get_producer(const account_name& owner_name) const +const producer_object& chain_controller::get_producer(const account_name& owner_name) const { try { return _db.get(owner_name); } FC_CAPTURE_AND_RETHROW( (owner_name) ) } -const permission_object& chain_controller::get_permission( const permission_level& level )const +const permission_object& chain_controller::get_permission( const permission_level& level )const { try { return _db.get( boost::make_tuple(level.actor,level.permission) ); } FC_CAPTURE_AND_RETHROW( (level) ) } @@ -1167,7 +1166,7 @@ ProducerRound chain_controller::calculate_next_round(const signed_block& next_bl EOS_ASSERT(boost::range::equal(next_block.producer_changes, changes), block_validate_exception, "Unexpected round changes in new block header", ("expected changes", changes)("block changes", next_block.producer_changes)); - + fc::time_point tp = (fc::time_point)next_block.timestamp; utilities::rand::random rng(tp.sec_since_epoch()); rng.shuffle(schedule); @@ -1229,7 +1228,7 @@ void chain_controller::update_global_dynamic_data(const signed_block& b) { dgp.recent_slots_filled = uint64_t(-1); else dgp.recent_slots_filled = 0; - dgp.block_merkle_root.append( head_block_id() ); + dgp.block_merkle_root.append( head_block_id() ); }); _fork_db.set_max_size( _dgp.head_block_number - _dgp.last_irreversible_block_num + 1 ); @@ -1255,7 +1254,7 @@ void chain_controller::update_last_irreversible_block() vector producer_objs; producer_objs.reserve(gpo.active_producers.producers.size()); - std::transform(gpo.active_producers.producers.begin(), + std::transform(gpo.active_producers.producers.begin(), gpo.active_producers.producers.end(), std::back_inserter(producer_objs), [this](const producer_key& pk) { return &get_producer(pk.producer_name); }); @@ -1569,7 +1568,7 @@ void chain_controller::update_usage( transaction_metadata& meta, uint32_t act_us uint128_t used_uacts = buo.acts.value; uint128_t virtual_max_ubytes = dgpo.virtual_net_bandwidth * config::rate_limiting_precision; uint128_t virtual_max_uacts = dgpo.virtual_act_bandwidth * config::rate_limiting_precision; - + if( !(_skip_flags & genesis_setup) ) { #warning TODO: restore bandwidth checks /* setting of bandwidth currently not implemented @@ -1618,7 +1617,7 @@ const apply_handler* chain_controller::find_apply_handler( account_name receiver auto native_handler_scope = _apply_handlers.find( receiver ); if( native_handler_scope != _apply_handlers.end() ) { auto handler = native_handler_scope->second.find( make_pair( scope, act ) ); - if( handler != native_handler_scope->second.end() ) + if( handler != native_handler_scope->second.end() ) return &handler->second; } return nullptr; diff --git a/libraries/chain/include/eosio/chain/chain_controller.hpp b/libraries/chain/include/eosio/chain/chain_controller.hpp index 09b17509f..bce7ce374 100644 --- a/libraries/chain/include/eosio/chain/chain_controller.hpp +++ b/libraries/chain/include/eosio/chain/chain_controller.hpp @@ -110,7 +110,7 @@ namespace eosio { namespace chain { * This signal is emitted any time a new transaction is added to the pending * block state. */ - signal on_pending_transaction; + signal on_pending_transaction; @@ -128,7 +128,7 @@ namespace eosio { namespace chain { */ bool is_applying_block()const { return _currently_applying_block; } bool is_start_of_round( block_num_type n )const; - uint32_t blocks_per_round()const; + uint32_t blocks_per_round()const; chain_id_type get_chain_id()const { return chain_id_type(); } /// TODO: make this hash of constitution @@ -215,7 +215,7 @@ namespace eosio { namespace chain { clear_pending(); /** after applying f() push previously input transactions on top */ - auto on_exit = fc::make_scoped_exit( [&](){ + auto on_exit = fc::make_scoped_exit( [&](){ for( auto& t : old_input ) { try { if (!is_known_transaction(t.id)) @@ -366,7 +366,7 @@ namespace eosio { namespace chain { validate_tapos(trx); } FC_CAPTURE_AND_RETHROW( (trx) ) } - + /// Validate transaction helpers @{ void validate_uniqueness(const transaction& trx)const; void validate_tapos(const transaction& trx)const; diff --git a/plugins/net_plugin/include/eosio/net_plugin/protocol.hpp b/plugins/net_plugin/include/eosio/net_plugin/protocol.hpp index a929fc296..f5c5d3489 100644 --- a/plugins/net_plugin/include/eosio/net_plugin/protocol.hpp +++ b/plugins/net_plugin/include/eosio/net_plugin/protocol.hpp @@ -156,8 +156,8 @@ namespace eosio { request_message, sync_request_message, block_summary_message, - signed_transaction, - signed_block>; + signed_block, + packed_transaction>; } // namespace eosio diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index aa07ef833..7452dd023 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -61,29 +61,20 @@ namespace eosio { struct node_transaction_state { transaction_id_type id; - fc::time_point received; - fc::time_point_sec expires; - fc::time_point_sec true_expires; - vector packed_transaction; /// the received raw bundle - uint32_t block_num = -1; /// block transaction was included in - bool validated = false; /// whether or not our node has validated it - }; - - struct update_block_num { - uint16 new_bnum; - update_block_num(uint16 bnum) : new_bnum(bnum) {} - void operator() (node_transaction_state& nts) { - nts.block_num = static_cast(new_bnum); - } + time_point_sec expires; /// time after which this may be purged. + /// Expires increased while the txn is + /// "in flight" to anoher peer + vector packed_transaction; /// the received raw bundle + uint32_t block_num = 0; /// block transaction was included in + uint32_t true_block = 0; /// used to reset block_uum when request is 0 + uint16_t requests = 0; /// the number of "in flight" requests for this txn }; struct update_entry { - const signed_transaction &txn; - update_entry(const signed_transaction &msg) : txn(msg) {} + const packed_transaction &txn; + update_entry(const packed_transaction &msg) : txn(msg) {} void operator() (node_transaction_state& nts) { - nts.received = fc::time_point::now(); - nts.validated = true; net_message msg(txn); uint32_t packsiz = fc::raw::pack_size(msg); uint32_t bufsiz = packsiz + sizeof(packsiz); @@ -100,8 +91,16 @@ namespace eosio { void operator() (node_transaction_state& nts) { int32_t exp = nts.expires.sec_since_epoch(); nts.expires = fc::time_point_sec (exp + incr * 60); + if( nts.requests == 0 ) { + nts.true_block = nts.block_num; + nts.block_num = 0; + } + nts.requests += incr; + if( nts.requests == 0 ) { + nts.block_num = nts.true_block; + } } - }; + } incr_in_flight(1), decr_in_flight(-1); struct by_expiry; struct by_block_num; @@ -156,7 +155,7 @@ namespace eosio { std::set< connection_ptr > connections; bool done = false; unique_ptr< sync_manager > sync_master; - unique_ptr< big_msg_manager > bm_master; + unique_ptr< big_msg_manager > big_msg_master; unique_ptr connector_check; unique_ptr transaction_check; @@ -194,7 +193,7 @@ namespace eosio { template void send_all( const net_message &msg, VerifierFunc verify ); - static void transaction_ready( const packed_transaction& txn); + static void transaction_ready( const transaction_metadata&, const packed_transaction& txn); void broadcast_block_impl( const signed_block &sb); bool is_valid( const handshake_message &msg); @@ -220,7 +219,7 @@ namespace eosio { void handle_message( connection_ptr c, const request_message &msg); void handle_message( connection_ptr c, const sync_request_message &msg); void handle_message( connection_ptr c, const block_summary_message &msg); - void handle_message( connection_ptr c, const signed_transaction &msg); + void handle_message( connection_ptr c, const packed_transaction &msg); void handle_message( connection_ptr c, const signed_block &msg); void start_conn_timer( ); @@ -300,9 +299,24 @@ namespace eosio { transaction_id_type id; bool is_known_by_peer = false; ///< true if we sent or received this trx to this peer or received notice from peer bool is_noticed_to_peer = false; ///< have we sent peer notice we know it (true if we receive from this peer) - uint32_t block_num = -1; ///< the block number the transaction was included in - time_point validated_time; ///< infinity for unvalidated - time_point requested_time; /// incase we fetch large trx + uint32_t block_num = 0; ///< the block number the transaction was included in + time_point requested_time; /// in case we fetch large trx + }; + + struct update_block_num { + uint32_t new_bnum; + update_block_num(uint32_t bnum) : new_bnum(bnum) {} + void operator() (node_transaction_state& nts) { + if (nts.requests ) { + nts.true_block = new_bnum; + } + else { + nts.block_num = new_bnum; + } + } + void operator() (transaction_state& ts) { + ts.block_num = new_bnum; + } }; typedef multi_index_container< @@ -318,10 +332,19 @@ namespace eosio { struct block_state { block_id_type id; bool is_known; - bool is_noticed_to_peer; + bool is_noticed; time_point requested_time; }; + struct update_request_time { + void operator() (struct transaction_state &ts) { + ts.requested_time = time_point::now(); + } + void operator () (struct block_state &bs) { + bs.requested_time = time_point::now(); + } + } set_request_time; + typedef multi_index_container< block_state, indexed_by< @@ -330,11 +353,14 @@ namespace eosio { > block_state_index; - struct make_known { + struct update_known_by_peer { void operator() (block_state& bs) { bs.is_known = true; } - }; + void operator() (transaction_state& ts) { + ts.is_known_by_peer = true; + } + } set_is_known; /** * Index by start_block_num @@ -372,9 +398,6 @@ namespace eosio { message_buffer<1024*1024> pending_message_buffer; vector blk_buffer; - deque< transaction_id_type > txn_queue; - bool halt_txn_send; - struct queued_write { std::shared_ptr> buff; std::function cb; @@ -385,7 +408,6 @@ namespace eosio { handshake_message last_handshake_recv; handshake_message last_handshake_sent; int16_t sent_handshake_count; - deque out_queue; bool connecting; bool syncing; int write_depth; @@ -463,13 +485,12 @@ namespace eosio { void blk_send(const vector &txn_lis); void stop_send(); - void enqueue( const net_message &msg ); + void enqueue( transaction_id_type id ); + void enqueue( const net_message &msg, bool trigger_send = true ); void cancel_sync(go_away_reason); void cancel_fetch(); void flush_queues(); bool enqueue_sync_block(); - void send_next_message(); - void send_next_txn(); void cancel_wait(); void sync_wait(); @@ -477,7 +498,9 @@ namespace eosio { void sync_timeout(boost::system::error_code ec); void fetch_timeout(boost::system::error_code ec); - void queue_write(std::shared_ptr> buff, std::function cb); + void queue_write(std::shared_ptr> buff, + bool trigger_send, + std::function cb); void do_queue_write(); /** \brief Process the next message from the pending message buffer @@ -545,17 +568,18 @@ namespace eosio { class big_msg_manager { public: uint32_t just_send_it_max; + connection_ptr pending_txn_source; vector req_blks; vector req_txn; - bool requested_blk (block_id_type bid); - bool requested_txn (transaction_id_type tid); void bcast_summary (const block_summary_message& msg); - void bcast_block (const signed_block& msg,connection_ptr skip = connection_ptr() ); - void bcast_transaction (const signed_transaction& msg, connection_ptr skip = connection_ptr() ); - void bcast_transaction (const packed_transaction& msg, connection_ptr skip = connection_ptr() ); + void bcast_block (const signed_block& msg, connection_ptr skip = connection_ptr()); + void bcast_transaction (const transaction_id_type& id, + time_point_sec expiration, + const packed_transaction& msg); + void bcast_rejected_transaction (const packed_transaction& msg); void recv_block (connection_ptr conn, const signed_block& msg); - void recv_transaction (connection_ptr conn, const signed_transaction& msg); + void recv_transaction(connection_ptr c); void recv_notice (connection_ptr conn, const notice_message& msg); static const fc::string logger_name; @@ -582,7 +606,6 @@ namespace eosio { last_handshake_recv(), last_handshake_sent(), sent_handshake_count(0), - out_queue(), connecting(false), syncing(false), write_depth(0), @@ -606,7 +629,6 @@ namespace eosio { last_handshake_recv(), last_handshake_sent(), sent_handshake_count(0), - out_queue(), connecting(true), syncing(false), write_depth(0), @@ -649,13 +671,6 @@ namespace eosio { } void connection::flush_queues() { - if (write_depth > 1) { - while (out_queue.size() > 1) { - out_queue.pop_back(); - } - } else { - out_queue.clear(); - } if (write_depth > 0) { while (write_queue.size() > 1) { write_queue.pop_back(); @@ -685,17 +700,22 @@ namespace eosio { } void connection::txn_send_pending(const vector &ids) { - for(auto t : my_impl->local_txns){ - if(t.packed_transaction.size()) { + for(auto tx = my_impl->local_txns.begin(); tx != my_impl->local_txns.end(); ++tx ){ + if(tx->packed_transaction.size() && tx->block_num == 0) { bool found = false; - for(auto l : ids) { - if( l == t.id) { + for(auto known : ids) { + if( known == tx->id) { found = true; break; } } if(!found) { - txn_queue.push_back( t.id ); + my_impl->local_txns.modify(tx,incr_in_flight); + queue_write(std::make_shared>(tx->packed_transaction), + true, + [this, tx](boost::system::error_code ec, std::size_t ) { + my_impl->local_txns.modify(tx, decr_in_flight); + }); } } } @@ -703,10 +723,14 @@ namespace eosio { void connection::txn_send(const vector &ids) { for(auto t : ids) { - auto n = my_impl->local_txns.get().find(t); - if(n != my_impl->local_txns.end() && - n->packed_transaction.size()) { - txn_queue.push_back( t ); + auto tx = my_impl->local_txns.get().find(t); + if( tx != my_impl->local_txns.end() && tx->packed_transaction.size()) { + my_impl->local_txns.modify( tx,incr_in_flight); + queue_write(std::make_shared>(tx->packed_transaction), + true, + [this, tx](boost::system::error_code ec, std::size_t ) { + my_impl->local_txns.modify(tx, decr_in_flight); + }); } } } @@ -778,6 +802,7 @@ namespace eosio { try { optional b = cc.fetch_block_by_id(blkid); if(b) { + fc_dlog(logger,"found block for id at num ${n}",("n",b->block_num())); enqueue(*b); } else { @@ -797,11 +822,11 @@ namespace eosio { break; } } + } void connection::stop_send() { syncing = false; - txn_queue.clear(); } void connection::send_handshake( ) { @@ -838,16 +863,11 @@ namespace eosio { enqueue(xpkt); } - void connection::enqueue( const net_message &m ) { - out_queue.push_back( m ); - if( out_queue.size() == 1 ) { - send_next_message(); - } - } - - void connection::queue_write(std::shared_ptr> buff, std::function cb) { + void connection::queue_write(std::shared_ptr> buff, + bool trigger_send, + std::function cb) { write_queue.push_back({buff, cb}); - if(write_queue.size() == 1) + if(write_queue.size() == 1 && trigger_send) do_queue_write(); } @@ -879,6 +899,7 @@ namespace eosio { return; } conn->write_queue.pop_front(); + conn->enqueue_sync_block(); conn->do_queue_write(); } catch(const std::exception &ex) { @@ -900,8 +921,8 @@ namespace eosio { } void connection::cancel_sync(go_away_reason reason) { - fc_dlog(logger,"cancel sync reason = ${m}, out queue size ${o} peer ${p}", - ("m",reason_str(reason)) ("o", out_queue.size())("p", peer_name())); + fc_dlog(logger,"cancel sync reason = ${m}, write queue size ${o} peer ${p}", + ("m",reason_str(reason)) ("o", write_queue.size())("p", peer_name())); cancel_wait(); flush_queues(); switch (reason) { @@ -922,15 +943,17 @@ namespace eosio { bool connection::enqueue_sync_block() { chain_controller& cc = app().find_plugin()->chain(); + if (!sync_requested) + return false; uint32_t num = ++sync_requested->last; - + bool trigger_send = num == sync_requested->start_block; if(num == sync_requested->end_block) { sync_requested.reset(); } try { fc::optional sb = cc.fetch_block_by_number(num); if(sb) { - enqueue( *sb ); + enqueue( *sb, trigger_send); return true; } } catch ( ... ) { @@ -939,21 +962,18 @@ namespace eosio { return false; } - void connection::send_next_message() { - if( !out_queue.size() ) { - if( !sync_requested || !enqueue_sync_block( ) ) { - send_next_txn(); - } - return; - } - - auto& m = out_queue.front(); + void connection::enqueue( const net_message &m, bool trigger_send ) { + bool close_after_send = false; if(m.contains()) { sync_wait( ); - } else if(m.contains()) { + } + else if(m.contains()) { pending_fetch = m.get(); fetch_wait( ); } + else { + close_after_send = m.contains(); + } uint32_t payload_size = fc::raw::pack_size( m ); char * header = reinterpret_cast(&payload_size); @@ -965,57 +985,18 @@ namespace eosio { fc::datastream ds( send_buffer->data(), buffer_size); ds.write( header, header_size ); fc::raw::pack( ds, m ); - write_depth++; - queue_write(send_buffer, - [this](boost::system::error_code ec, std::size_t ) { + queue_write(send_buffer,trigger_send, + [this, close_after_send](boost::system::error_code ec, std::size_t ) { write_depth--; - if(out_queue.size()) { - if(out_queue.front().contains()) { - elog ("sent a go away message, closing connection to ${p}",("p",peer_name())); - my_impl->close(shared_from_this()); - return; - } - out_queue.pop_front(); + if(close_after_send) { + elog ("sent a go away message, closing connection to ${p}",("p",peer_name())); + my_impl->close(shared_from_this()); + return; } - send_next_message(); }); } - void connection::send_next_txn() { - if( !txn_queue.size() || halt_txn_send) { - return; - } - - size_t limit = min(txn_queue.size(),size_t(1000)); - // we'll make this fc_dlog later - elog("sending up to ${limit} pending transactions to ${p}",("limit",limit)("p",peer_name())); - - size_t count = 0; - connection_wptr c(shared_from_this()); - for(size_t i = 0; i < limit; i++) { - transaction_id_type id = txn_queue.front(); - const auto &tx = my_impl->local_txns.get( ).find( id ); - if( tx == my_impl->local_txns.end() || - tx->true_expires <= time_point::now() || - tx->packed_transaction.size() == 0 ) { - txn_queue.pop_front(); - continue; - } - my_impl->local_txns.modify( tx,update_in_flight(1)); - count++; - txn_queue.pop_front(); - - queue_write(std::make_shared>(tx->packed_transaction), - [this, tx](boost::system::error_code ec, std::size_t ) { - my_impl->local_txns.modify(tx, update_in_flight(-1)); - send_next_message(); - }); - } - // we'll make this fc_dlog later - elog("actually sent ${limit} pending transactions to ${p}",("limit",limit)("p",peer_name())); - } - void connection::cancel_wait() { if (response_expected) response_expected->cancel(); @@ -1399,16 +1380,17 @@ namespace eosio { } last_repeated = 0; + uint32_t blk_num = blk.block_num(); if (state == head_catchup) { fc_dlog (logger, "sync_manager in head_catchup state"); state = in_sync; + block_id_type null_id; for (auto cp : my_impl->connections) { - if (cp->fork_head == block_id_type()) { + if (cp->fork_head == null_id) { continue; } - if (cp->fork_head == blk.id() || - cp->fork_head_num < blk.block_num()) { - c->fork_head = block_id_type(); + if (cp->fork_head == blk.id() || cp->fork_head_num < blk_num) { + c->fork_head = null_id; c->fork_head_num = 0; } else { @@ -1417,14 +1399,13 @@ namespace eosio { } } else if (state == lib_catchup) { - uint32_t num = blk.block_num(); - if( num == sync_known_lib_num ) { + if( blk_num == sync_known_lib_num ) { fc_dlog( logger, "All caught up with last known last irreversible block resending handshake"); c->cancel_wait(); state = in_sync; send_handshakes(); } - else if (num == sync_last_requested_num) { + else if (blk_num == sync_last_requested_num) { request_next_chunk(); } else { @@ -1451,13 +1432,14 @@ namespace eosio { pending_notify.known_blocks.ids.push_back( bid ); pending_notify.known_trx.mode = none; if (msgsiz > just_send_it_max) { + fc_ilog(logger, "block size is ${ms}, sending notify",("ms", msgsiz)); my_impl->send_all(pending_notify, [skip, bid](connection_ptr c) -> bool { if (c == skip || !c->current()) return false; const auto& bs = c->blk_state.find(bid); bool unknown = bs == c->blk_state.end(); if (unknown) { - c->blk_state.insert(block_state({bid,false,true,fc::time_point() })); + c->blk_state.insert((block_state){bid,false,true,time_point()}); } else { elog("${p} already has knowledge of block ${b}", ("p",c->peer_name())("b",bid)); @@ -1466,56 +1448,74 @@ namespace eosio { }); } else { - block_id_type prev = sb.previous; for (auto cp : my_impl->connections) { if (cp == skip || !cp->current()) { continue; } - const auto& bs = cp->blk_state.find (prev); - if (bs != cp->blk_state.end() && !bs->is_known) { - cp->blk_state.insert(block_state({bid,false,true,fc::time_point() })); + const auto& prev = cp->blk_state.find (sb.previous); + if (prev != cp->blk_state.end() && !prev->is_known) { + cp->blk_state.insert((block_state){bid,false,true,time_point()}); cp->enqueue( pending_notify ); } else { + cp->blk_state.insert((block_state){bid,true,true,time_point()}); cp->enqueue( sb ); } } } } - void big_msg_manager::bcast_transaction (const signed_transaction& txn, connection_ptr skip) { - transaction_id_type txnid = txn.id(); + void big_msg_manager::bcast_rejected_transaction (const packed_transaction& txn) { + transaction_id_type tid = txn.get_transaction().id(); + fc_wlog(logger,"sending rejected transaction ${tid}",("tid",tid)); + bcast_transaction (tid, time_point_sec(), txn); + } + + void big_msg_manager::bcast_transaction (const transaction_id_type & txnid, + time_point_sec expiration, + const packed_transaction& txn) { + connection_ptr skip; + for (auto ref = req_txn.begin(); ref != req_txn.end(); ++ref) { + if (*ref == txnid) { + skip = pending_txn_source; + pending_txn_source.reset(); + req_txn.erase(ref); + break; + } + } + if( my_impl->local_txns.get().find( txnid ) != my_impl->local_txns.end( ) ) { //found fc_dlog(logger, "found txnid in local_txns" ); return; } - net_message msg(txn); - uint32_t packsiz = fc::raw::pack_size(msg); - uint32_t bufsiz = packsiz + sizeof(packsiz); - vector buff(bufsiz); - fc::datastream ds( buff.data(), bufsiz); - ds.write( reinterpret_cast(&packsiz), sizeof(packsiz) ); - fc::raw::pack( ds, msg ); - - uint16_t bn = static_cast(txn.ref_block_num); - node_transaction_state nts = {txnid,time_point::now(), - txn.expiration, - txn.expiration, - buff, - bn, true}; - my_impl->local_txns.insert(nts); - fc_dlog(logger, "bufsiz = ${bs} max = ${max}",("bs", bufsiz)("max", just_send_it_max)); - - if( bufsiz <= just_send_it_max) { - my_impl->send_all( txn, [skip, txnid](connection_ptr c) -> bool { - if( c == skip || c->syncing ) { + bool remember = expiration != time_point_sec(); + uint32_t packsiz = 0; + uint32_t bufsiz = 0; + if (remember) { + net_message msg(txn); + packsiz = fc::raw::pack_size(msg); + bufsiz = packsiz + sizeof(packsiz); + vector buff(bufsiz); + fc::datastream ds( buff.data(), bufsiz); + ds.write( reinterpret_cast(&packsiz), sizeof(packsiz) ); + fc::raw::pack( ds, msg ); + node_transaction_state nts = {txnid, + expiration, + buff, + 0, 0, 0}; + my_impl->local_txns.insert(nts); + } + if( !skip && bufsiz <= just_send_it_max) { + my_impl->send_all( txn, [remember,txnid](connection_ptr c) -> bool { + if( c->syncing ) { return false; } const auto& bs = c->trx_state.find(txnid); bool unknown = bs == c->trx_state.end(); if( unknown) { - c->trx_state.insert(transaction_state({txnid,true,true,(uint32_t)-1, - fc::time_point(),fc::time_point() })); + if (remember) { + c->trx_state.insert(transaction_state({txnid,true,true,0,time_point() })); + } fc_dlog(logger, "sending whole txn to ${n}", ("n",c->peer_name() ) ); } return unknown; @@ -1526,17 +1526,17 @@ namespace eosio { pending_notify.known_trx.mode = normal; pending_notify.known_trx.ids.push_back( txnid ); pending_notify.known_blocks.mode = none; - my_impl->send_all(pending_notify, [skip, txnid](connection_ptr c) -> bool { + my_impl->send_all(pending_notify, [skip, remember, txnid](connection_ptr c) -> bool { if (c == skip || c->syncing) { return false; } const auto& bs = c->trx_state.find(txnid); bool unknown = bs == c->trx_state.end(); if( unknown) { - fc_dlog(logger, "sending notice to ${n}", ("n",c->peer_name() ) ); - - c->trx_state.insert(transaction_state({txnid,false,true,(uint32_t)-1, - fc::time_point(),fc::time_point() })); + fc_ilog(logger, "sending notice to ${n}", ("n",c->peer_name() ) ); + if (remember) { + c->trx_state.insert(transaction_state({txnid,false,true,0, time_point() })); + } } return unknown; }); @@ -1544,53 +1544,31 @@ namespace eosio { } - void big_msg_manager::bcast_transaction (const packed_transaction& txn, connection_ptr skip) { - // TODO: avoid this reserialization by updating protocol to use packed_transactions directly - auto strx = signed_transaction(txn.get_transaction(), txn.signatures); - bcast_transaction( strx, skip ); - } - - bool big_msg_manager::requested_blk( block_id_type blk_id) { + void big_msg_manager::recv_block (connection_ptr c, const signed_block& msg) { + block_id_type blk_id = msg.id(); + uint32_t num = msg.block_num(); for (auto ref = req_blks.begin(); ref != req_blks.end(); ++ref) { if (*ref == blk_id) { req_blks.erase(ref); - return true; - } - } - return false; - } - - bool big_msg_manager::requested_txn( transaction_id_type txn_id) { - for (auto ref = req_txn.begin(); ref != req_txn.end(); ++ref) { - if (*ref == txn_id) { - req_txn.erase(ref); - return true; + fc_dlog(logger, "received a requested block"); + notice_message note; + note.known_blocks.mode = normal; + note.known_blocks.ids.push_back( blk_id ); + note.known_trx.mode = none; + my_impl->send_all(note, [blk_id](connection_ptr conn) -> bool { + const auto& bs = conn->blk_state.find(blk_id); + bool unknown = bs == conn->blk_state.end(); + if (unknown) { + conn->blk_state.insert(block_state({blk_id,false,true,fc::time_point() })); + } + return unknown; + }); + return; } } - return false; - } - - void big_msg_manager::recv_block (connection_ptr c, const signed_block& msg) { - block_id_type blk_id = msg.id(); - uint16_t num = msg.block_num(); - if( requested_blk(blk_id) ) { - notice_message note; - note.known_blocks.mode = normal; - note.known_blocks.ids.push_back( blk_id ); - note.known_trx.mode = none; - my_impl->send_all(note, [blk_id](connection_ptr conn) -> bool { - const auto& bs = conn->blk_state.find(blk_id); - bool unknown = bs == conn->blk_state.end(); - if (unknown) { - conn->blk_state.insert(block_state({blk_id,false,true,fc::time_point() })); - } - else { - } - return unknown; - }); - } - else if(!my_impl->sync_master->is_active(c)) { + if( !my_impl->sync_master->is_active(c) ) { + fc_dlog(logger,"got a block to forward"); net_message nmsg(msg); if(fc::raw::pack_size(nmsg) < just_send_it_max ) { fc_dlog(logger, "forwarding the signed block"); @@ -1602,42 +1580,39 @@ namespace eosio { conn->blk_state.insert( (block_state){blk_id,true,true,fc::time_point()}); sendit = true; } else if (!b->is_known) { - conn->blk_state.modify(b,make_known()); + conn->blk_state.modify(b,set_is_known); sendit = true; } } - fc_dlog(logger, "${action} block ${num} to ${c}",("action", sendit ? "sending " : "skipping ")("num",num)("c", conn->peer_name() )); + fc_dlog(logger, "${action} block ${num} to ${c}", + ("action", sendit ? "sending" : "skipping") + ("num",num) + ("c", conn->peer_name() )); return sendit; }); } + else { + notice_message note; + note.known_blocks.mode = normal; + note.known_blocks.ids.push_back( blk_id ); + note.known_trx.mode = none; + my_impl->send_all(note, [blk_id](connection_ptr conn) -> bool { + const auto& bs = conn->blk_state.find(blk_id); + bool unknown = bs == conn->blk_state.end(); + if (unknown) { + conn->blk_state.insert(block_state({blk_id,false,true,fc::time_point() })); + } + return unknown; + }); + } } else { fc_dlog(logger, "not forwarding from active syncing connection ${p}",("p",c->peer_name())); } } - void big_msg_manager::recv_transaction (connection_ptr c, const signed_transaction& msg) { - transaction_id_type txn_id = msg.id(); - if( requested_txn(txn_id) ) { - notice_message notify; - notify.known_trx.mode = normal; - notify.known_trx.ids.push_back( txn_id ); - notify.known_blocks.mode = none; - my_impl->send_all(notify, [txn_id](connection_ptr conn) -> bool { - const auto& bs = conn->trx_state.find(txn_id); - bool unknown = bs == conn->trx_state.end(); - if (unknown) { - conn->trx_state.insert(transaction_state({txn_id,false,true,(uint32_t)-1, fc::time_point(), fc::time_point() })); - } - else { - - } - return unknown; - }); - } - else { - bcast_transaction(msg, c); - } + void big_msg_manager::recv_transaction (connection_ptr c) { + pending_txn_source = c; } void big_msg_manager::recv_notice (connection_ptr c, const notice_message& msg) { @@ -1651,8 +1626,8 @@ namespace eosio { const auto &tx = my_impl->local_txns.get( ).find( t ); if( tx == my_impl->local_txns.end( ) ) { - c->trx_state.insert( ( transaction_state ){ t,true,true,( uint32_t ) - 1, - fc::time_point( ),fc::time_point( ) } ); + c->trx_state.insert( (transaction_state){t,true,true,0, + time_point()} ); req.req_trx.ids.push_back( t ); req_txn.push_back( t ); @@ -1676,10 +1651,8 @@ namespace eosio { } catch (...) { elog( "failed to retrieve block for id"); } - if(b) { - bool known=true; - bool noticed=true; - c->blk_state.insert((block_state){blkid,known,noticed,fc::time_point::now()}); + if (!b) { + c->blk_state.insert((block_state){blkid,true,true,time_point::now()}); send_req = true; req.req_blocks.ids.push_back( blkid ); req_blks.push_back( blkid ); @@ -2100,7 +2073,7 @@ namespace eosio { break; } case normal: { - bm_master->recv_notice (c, msg); + big_msg_master->recv_notice (c, msg); } } @@ -2119,7 +2092,7 @@ namespace eosio { break; } case normal : { - bm_master->recv_notice (c, msg); + big_msg_master->recv_notice (c, msg); break; } default: { @@ -2139,7 +2112,7 @@ namespace eosio { c->blk_send_branch( ); break; case normal : - fc_dlog(logger, "got a normal request_message from ${p}", ("p",c->peer_name())); + fc_dlog(logger, "got a normal block request_message from ${p}", ("p",c->peer_name())); c->blk_send(msg.req_blocks.ids); break; default:; @@ -2177,46 +2150,19 @@ namespace eosio { #endif } - - void net_plugin_impl::handle_message( connection_ptr c, const signed_transaction &msg) { + void net_plugin_impl::handle_message( connection_ptr c, const packed_transaction &msg) { fc_dlog(logger, "got a signed transaction from ${p}", ("p",c->peer_name())); - transaction_id_type txnid = msg.id(); - auto entry = local_txns.get().find( txnid ); - if(entry != local_txns.end( ) ) { - if( entry->validated ) { - fc_dlog(logger, "the txnid is known and validated, so short circuit" ); - return; - } - } if( sync_master->is_active(c) ) { fc_dlog(logger, "got a txn during sync - dropping"); return; } - if(entry != local_txns.end( ) ) { - local_txns.modify( entry, update_entry( msg ) ); - } - - auto tx = c->trx_state.find(txnid); - if( tx == c->trx_state.end()) { - c->trx_state.insert((transaction_state){txnid,true,true,(uint32_t)msg.ref_block_num, - fc::time_point(),fc::time_point()}); - } else { - struct trx_mod { - uint16 block; - trx_mod( uint16 bn) : block(bn) {} - void operator( )( transaction_state &t) { - t.is_known_by_peer = true; - t.block_num = static_cast(block); - } - }; - c->trx_state.modify(tx,trx_mod(msg.ref_block_num)); - } + big_msg_master->recv_transaction(c); try { - // TODO: avoid this reserialization by adjusting messages to deal with packed transactions. - chain_plug->accept_transaction( packed_transaction(msg) ); + chain_plug->accept_transaction( msg); fc_dlog(logger, "chain accepted transaction" ); + return; } catch( const fc::exception &ex) { // received a block due to out of sequence elog( "accept txn threw ${m}",("m",ex.to_detail_string())); @@ -2225,6 +2171,7 @@ namespace eosio { elog( " caught something attempting to accept transaction"); } + big_msg_master->bcast_rejected_transaction(msg); } void net_plugin_impl::handle_message( connection_ptr c, const signed_block &msg) { @@ -2264,8 +2211,25 @@ namespace eosio { elog( "handle sync block caught something else from ${p}",("num",blk_num)("p",c->peer_name())); } + update_block_num ubn(blk_num); if( reason == no_reason ) { - bm_master->recv_block (c, msg); + for (const auto ®ion : msg.regions) { + for (const auto &cycle_sum : region.cycles_summary) { + for (const auto &shard : cycle_sum) { + for (const auto &recpt : shard.transactions) { + auto ltx = local_txns.get().find(recpt.id); + if( ltx != local_txns.end()) { + local_txns.modify( ltx, ubn ); + } + auto ctx = c->trx_state.get().find(recpt.id); + if( ctx != c->trx_state.end()) { + c->trx_state.modify( ctx, ubn ); + } + } + } + } + } + big_msg_master->recv_block (c, msg); } sync_master->recv_block (c, msg, reason == no_reason); } @@ -2324,11 +2288,12 @@ namespace eosio { auto ex_up = old.upper_bound( time_point::now()); auto ex_lo = old.lower_bound( fc::time_point_sec( 0)); old.erase( ex_lo, ex_up); + auto &stale = local_txns.get(); chain_controller &cc = chain_plug->chain(); uint32_t bn = cc.last_irreversible_block_num(); auto bn_up = stale.upper_bound(bn); - auto bn_lo = stale.lower_bound(0); + auto bn_lo = stale.lower_bound(1); stale.erase( bn_lo, bn_up); } @@ -2368,12 +2333,16 @@ namespace eosio { /** * This one is necessary to hook into the boost notifier api **/ - void net_plugin_impl::transaction_ready( const packed_transaction& txn) { - my_impl->bm_master->bcast_transaction( txn ); + void net_plugin_impl::transaction_ready(const transaction_metadata& md, const packed_transaction& txn) { + time_point_sec expire; + if (md.decompressed_trx) { + expire = md.decompressed_trx->expiration; + } + my_impl->big_msg_master->bcast_transaction(md.id, expire, txn); } void net_plugin_impl::broadcast_block_impl( const chain::signed_block &sb) { - bm_master->bcast_block(sb); + big_msg_master->bcast_block(sb); } bool net_plugin_impl::authenticate_peer(const handshake_message& msg) const { @@ -2574,12 +2543,12 @@ namespace eosio { my->send_whole_blocks = def_send_whole_blocks; my->sync_master.reset( new sync_manager(options.at("sync-fetch-span").as() ) ); - my->bm_master.reset( new big_msg_manager ); + my->big_msg_master.reset( new big_msg_manager ); my->connector_period = std::chrono::seconds(options.at("connection-cleanup-period").as()); my->txn_exp_period = def_txn_expire_wait; my->resp_expected_period = def_resp_expected_wait; - my->bm_master->just_send_it_max = def_max_just_send; + my->big_msg_master->just_send_it_max = def_max_just_send; my->max_client_count = options.at("max-clients").as(); my->num_clients = 0; -- GitLab