提交 4ee3b983 编写于 作者: D Daniel Larimer

start implementing some scheduling of cycles

上级 38c74a2b
#include <eosio/chain/apply_context.hpp>
#include <eosio/chain/chain_controller.hpp>
#include <eosio/chain/wasm_interface.hpp>
#include <eosio/chain/generated_transaction_object.hpp>
namespace eosio { namespace chain {
void apply_context::exec()
......@@ -109,7 +110,7 @@ void apply_context::deferred_transaction_start( uint32_t id,
trx.expiration = execute_before;
trx.execute_after = execute_after;
trx.sender = receiver; ///< sender is the receiver of the current action
trx.id = id;
trx.sender_id = id;
controller.validate_scope( trx );
......@@ -134,6 +135,18 @@ void apply_context::deferred_transaction_send( uint32_t id ) {
FC_ASSERT( dt.actions.size(), "transaction must contain at least one action" );
controller.check_authorization( dt, flat_set<public_key_type>(), false, {receiver} );
auto itr = _pending_deferred_transactions.find( id );
mutable_db.create<generated_transaction_object>( [&]( auto& obj ) {
obj.trx_id = dt.id();
obj.sender = receiver;
obj.sender_id = dt.sender_id;
obj.expiration = dt.expiration;
obj.delay_until = dt.execute_after;
obj.packed_trx.resize( fc::raw::pack_size( dt ) );
fc::datastream<char*> ds( obj.packed_trx.data(), obj.packed_trx.size() );
fc::raw::pack( ds, dt );
});
_pending_deferred_transactions.erase(itr);
}
......
......@@ -265,9 +265,19 @@ void chain_controller::_push_transaction(const signed_transaction& trx) {
validate_referenced_accounts(trx);
check_transaction_authorization(trx);
auto shardnum = _pending_cycle.schedule( trx );
if( shardnum == -1 ) { /// schedule conflict start new cycle
_start_cycle();
}
auto& bcycle = _pending_block->cycles_summary.back();
if( shardnum >= bcycle.size() ) {
bcycle.resize( bcycle.size()+1 );
bcycle.back().emplace_back( tid );
}
_apply_transaction(trx);
/** for now we will just shove everything into the first shard */
_pending_block->cycles_summary[0][0].emplace_back( tid );
_pending_block->input_transactions.push_back(trx);
// The transaction applied successfully. Merge its changes into the pending block session.
......@@ -277,6 +287,18 @@ void chain_controller::_push_transaction(const signed_transaction& trx) {
on_pending_transaction(trx);
}
/**
* Wraps up all work for current shards, starts a new cycle, and
* executes any pending transactions
*/
void chain_controller::_start_cycle() {
_pending_block->cycles_summary.resize( _pending_block->cycles_summary.size() + 1 );
_pending_cycle = pending_cycle_state();
/// TODO: check for deferred transactions and schedule them
} // _start_cycle
signed_block chain_controller::generate_block(
block_timestamp_type when,
account_name producer,
......@@ -614,86 +636,6 @@ void chain_controller::validate_expiration( const transaction& trx ) const
} FC_CAPTURE_AND_RETHROW((trx)) }
/*
void chain_controller::process_message( const transaction& trx, account_name code,
const action& message, actionOutput& output, apply_context* parent_context) {
apply_context apply_ctx(*this, _db, trx, message, code);
apply_message(apply_ctx);
output.notify.reserve( apply_ctx.notified.size() );
for( uint32_t i = 0; i < apply_ctx.notified.size(); ++i ) {
try {
auto notify_code = apply_ctx.notified[i];
output.notify.push_back( {notify_code} );
process_message(trx, notify_code, message, output.notify.back().output, &apply_ctx);
} FC_CAPTURE_AND_RETHROW((apply_ctx.notified[i]))
}
// combine inline messages and process
if (apply_ctx.inline_messages.size() > 0) {
output.inline_transaction = Inlinetransaction(trx);
(*output.inline_transaction).messages = std::move(apply_ctx.inline_messages);
}
for( auto& asynctrx : apply_ctx.deferred_transactions ) {
digest_type::encoder enc; fc::raw::pack( enc, trx ); fc::raw::pack( enc, asynctrx );
auto id = enc.result();
auto gtrx = Generatedtransaction(id, asynctrx);
_db.create<generated_transaction_object>([&](generated_transaction_object& transaction) {
transaction.trx = gtrx;
transaction.status = generated_transaction_object::PENDING;
});
output.deferred_transactions.emplace_back( gtrx );
}
// propagate used_authorizations up the context chain
if (parent_context != nullptr)
for (int i = 0; i < apply_ctx.used_authorizations.size(); ++i)
if (apply_ctx.used_authorizations[i])
parent_context->used_authorizations[i] = true;
// process_message recurses for each notified account, but we only want to run this check at the top level
if (parent_context == nullptr && (_skip_flags & skip_authority_check) == false)
EOS_ASSERT(apply_ctx.all_authorizations_used(), tx_irrelevant_auth,
"action declared authorities it did not need: ${unused}",
("unused", apply_ctx.unused_authorizations())("message", message));
}
*/
/*
void chain_controller::apply_message(apply_context& context)
{ try {
/// context.code => the execution namespace
/// message.code / message.type => Event
const auto& m = context.msg;
auto contract_handlers_itr = apply_handlers.find(context.code);
if (contract_handlers_itr != apply_handlers.end()) {
auto message_handler_itr = contract_handlers_itr->second.find({m.code, m.type});
if (message_handler_itr != contract_handlers_itr->second.end()) {
message_handler_itr->second(context);
return;
}
}
const auto& recipient = _db.get<account_object,by_name>(context.code);
if (recipient.code.size()) {
//idump((context.code)(context.msg.type));
const uint32_t execution_time =
_skip_flags | received_block
? _rcvd_block_txn_execution_time
: _skip_flags | created_block
? _create_block_txn_execution_time
: _txn_execution_time;
const bool is_received_block = _skip_flags & received_block;
wasm_interface::get().apply(context, execution_time, is_received_block);
}
} FC_CAPTURE_AND_RETHROW((context.msg)) }
*/
void chain_controller::require_scope( const scope_name& scope )const {
switch( uint64_t(scope) ) {
case config::eosio_all_scope:
......@@ -802,26 +744,17 @@ void chain_controller::update_global_properties(const signed_block& b) { try {
});
/*
auto schedule = _admin->get_next_round(_db);
auto config = _admin->get_blockchain_configuration(_db, schedule);
_db.modify(gpo, [schedule = std::move(schedule), config = std::move(config)] (global_property_object& gpo) {
gpo.active_producers = std::move(schedule);
gpo.configuration = std::move(config);
});
auto active_producers_authority = types::Authority(config::ProducersAuthorityThreshold, {}, {});
auto active_producers_authority = authority(config::producers_authority_threshold, {}, {});
for(auto& name : gpo.active_producers) {
active_producers_authority.accounts.push_back({{name, config::ActiveName}, 1});
active_producers_authority.accounts.push_back({{name.producer_name, config::active_name}, 1});
}
auto& po = _db.get<permission_object, by_owner>( boost::make_tuple(config::producers_account_name,
config::active_level_name ) );
config::active_name ) );
_db.modify(po,[active_producers_authority] (permission_object& po) {
po.auth = active_producers_authority;
});
*/
}
} FC_CAPTURE_AND_RETHROW() }
......
......@@ -19,6 +19,7 @@
#include <eosio/chain/exceptions.hpp>
#include <eosio/chain/contracts/genesis_state.hpp>
#include <eosio/chain/wasm_interface.hpp>
#include <eosio/chain/pending_cycle_state.hpp>
#include <fc/log/logger.hpp>
......@@ -392,6 +393,7 @@ namespace eosio { namespace chain {
void _spinup_db();
void _spinup_fork_db();
void _start_cycle();
// producer_schedule_type calculate_next_round( const signed_block& next_block );
......@@ -402,6 +404,7 @@ namespace eosio { namespace chain {
optional<database::session> _pending_block_session;
optional<signed_block> _pending_block;
uint32_t _pending_transaction_count = 0;
pending_cycle_state _pending_cycle;
bool _currently_applying_block = false;
bool _currently_replaying_blocks = false;
......
......@@ -29,7 +29,7 @@ namespace eosio { namespace chain {
id_type id;
transaction_id_type trx_id;
account_name sender;
uint64_t sender_id = 0; /// ID given this transaction by the sender
uint32_t sender_id = 0; /// ID given this transaction by the sender
time_point delay_until; /// this generated transaction will not be applied until the specified time
time_point expiration; /// this generated transaction will not be applied after this time
shared_vector<char> packed_trx;
......@@ -53,7 +53,7 @@ namespace eosio { namespace chain {
>,
ordered_unique< tag<by_delay>,
composite_key< generated_transaction_object,
BOOST_MULTI_INDEX_MEMBER( generated_transaction_object, time_point, expiration),
BOOST_MULTI_INDEX_MEMBER( generated_transaction_object, time_point, delay_until),
BOOST_MULTI_INDEX_MEMBER( generated_transaction_object, generated_transaction_object::id_type, id)
>
>
......@@ -61,7 +61,7 @@ namespace eosio { namespace chain {
>;
typedef chainbase::generic_index<generated_transaction_multi_index> generated_transaction_index;
} }
} } // eosio::chain
CHAINBASE_SET_INDEX_TYPE(eosio::chain::generated_transaction_object, eosio::chain::generated_transaction_multi_index)
#pragma once
#include <eosio/chain/transaction.hpp>
namespace eosio { namespace chain {
struct pending_cycle_state {
set<scope_name> read_scopes;
map<scope_name,uint32_t> write_scope_to_shard;
/**
* @return the shard number this transation goes in or (-1) if it
* cannot go on any shard due to conflict
*/
uint32_t schedule( const transaction& trx ) {
uint32_t current_shard = -1;
for( const auto& ws : trx.write_scope ) {
if( read_scopes.find(ws) != read_scopes.end() )
return -1;
auto itr = write_scope_to_shard.find(ws);
if( itr != write_scope_to_shard.end() ) {
if( current_shard == -1 ) {
current_shard = itr->second;
continue;
}
if( current_shard != itr->second )
return -1; /// conflict detected
}
}
for( const auto& rs : trx.read_scope )
{
auto itr = write_scope_to_shard.find(rs);
if( itr != write_scope_to_shard.end() ) {
if( current_shard == -1 ) {
current_shard = itr->second;
continue;
}
if( current_shard != itr->second )
return -1; /// schedule conflict
current_shard = itr->second;
}
}
if( current_shard == -1 ) {
shards.resize( shards.size()+1 );
current_shard = shards.size() - 1;
for( auto ws : trx.write_scope )
{
shards.back().write_scopes.insert( ws );
write_scope_to_shard[ws] = current_shard;
}
for( auto rs : trx.read_scope )
read_scopes.insert(rs);
}
return current_shard;
} /// schedule
struct pending_shard {
set<scope_name> write_scopes;
};
vector<pending_shard> shards;
};
} } /// eosio::chain
......@@ -147,7 +147,7 @@ namespace eosio { namespace chain {
*/
struct deferred_transaction : public transaction
{
uint32_t id; /// ID assigned by sender of generated, accessible via WASM api when executing normal or error
uint32_t sender_id; /// ID assigned by sender of generated, accessible via WASM api when executing normal or error
account_name sender; /// receives error handler callback
time_point_sec execute_after; /// delayed exeuction
};
......@@ -159,6 +159,6 @@ FC_REFLECT( eosio::chain::action, (scope)(name)(authorization)(data) )
FC_REFLECT( eosio::chain::transaction_header, (expiration)(region)(ref_block_num)(ref_block_prefix) )
FC_REFLECT_DERIVED( eosio::chain::transaction, (eosio::chain::transaction_header), (read_scope)(write_scope)(actions) )
FC_REFLECT_DERIVED( eosio::chain::signed_transaction, (eosio::chain::transaction), (signatures) )
FC_REFLECT_DERIVED( eosio::chain::deferred_transaction, (eosio::chain::transaction), (id)(sender)(execute_after) )
FC_REFLECT_DERIVED( eosio::chain::deferred_transaction, (eosio::chain::transaction), (sender_id)(sender)(execute_after) )
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册