提交 11592d05 编写于 作者: P Phil Mesnier 提交者: GitHub

Merge pull request #77 from pmesnier/master

net plugin and genesis-timestamp
......@@ -3,6 +3,7 @@
#include <eos/chain/block_log.hpp>
#include <eos/chain/exceptions.hpp>
#include <eos/chain/producer_object.hpp>
#include <eos/chain/config.hpp>
#include <eos/native_contract/native_contract_chain_initializer.hpp>
#include <eos/native_contract/native_contract_chain_administrator.hpp>
......@@ -24,6 +25,7 @@ class chain_plugin_impl {
public:
bfs::path block_log_dir;
bfs::path genesis_file;
chain::Time genesis_timestamp;
bool readonly = false;
flat_map<uint32_t,block_id_type> loaded_checkpoints;
......@@ -44,6 +46,7 @@ void chain_plugin::set_program_options(options_description& cli, options_descrip
{
cfg.add_options()
("genesis-json", bpo::value<boost::filesystem::path>(), "File to read Genesis State from")
("genesis-timestamp", bpo::value<string>(), "override the initial timestamp in the Genesis State file")
("block-log-dir", bpo::value<bfs::path>()->default_value("blocks"),
"the location of the block log (absolute path or relative to application data dir)")
("checkpoint,c", bpo::value<vector<string>>()->composing(), "Pairs of [BLOCK_NUM,BLOCK_ID] that should be enforced as checkpoints.")
......@@ -62,6 +65,21 @@ void chain_plugin::plugin_initialize(const variables_map& options) {
if(options.count("genesis-json")) {
my->genesis_file = options.at("genesis-json").as<bfs::path>();
}
if(options.count("genesis-timestamp")) {
string tstr = options.at("genesis-timestamp").as<string>();
if (strcasecmp (tstr.c_str(), "now") == 0) {
my->genesis_timestamp = fc::time_point::now();
auto diff = my->genesis_timestamp.sec_since_epoch() % config::BlockIntervalSeconds;
if (diff > 0) {
auto delay = (config::BlockIntervalSeconds - diff);
my->genesis_timestamp += delay;
dlog ("pausing ${s} seconds to the next interval",("s",delay));
}
}
else {
my->genesis_timestamp = chain::Time::from_iso_string (tstr);
}
}
if (options.count("block-log-dir")) {
auto bld = options.at("block-log-dir").as<bfs::path>();
if(bld.is_relative())
......@@ -96,6 +114,11 @@ void chain_plugin::plugin_startup() {
auto& db = app().get_plugin<database_plugin>().db();
auto genesis = fc::json::from_file(my->genesis_file).as<native_contract::genesis_state_type>();
if (my->genesis_timestamp.sec_since_epoch() > 0) {
genesis.initial_timestamp = my->genesis_timestamp;
}
native_contract::native_contract_chain_initializer initializer(genesis);
my->fork_db = fork_database();
......@@ -109,7 +132,8 @@ void chain_plugin::plugin_startup() {
my->chain->add_checkpoints(my->loaded_checkpoints);
}
ilog("Blockchain started; head block is #${num}", ("num", my->chain->head_block_num()));
ilog("Blockchain started; head block is #${num}, genesis timestamp is ${ts}",
("num", my->chain->head_block_num())("ts", genesis.initial_timestamp.to_iso_string()));
}
void chain_plugin::plugin_shutdown() {
......
......@@ -16,7 +16,7 @@ namespace eos {
block_id_type head_id;
string os;
string agent;
};
};
struct notice_message {
vector<transaction_id_type> known_trx;
......
......@@ -4,6 +4,7 @@
#include <eos/net_plugin/protocol.hpp>
#include <eos/chain/chain_controller.hpp>
#include <eos/chain/exceptions.hpp>
#include <eos/chain/block.hpp>
#include <fc/network/ip.hpp>
#include <fc/io/raw.hpp>
......@@ -95,6 +96,12 @@ namespace eos {
>
> sync_request_index;
struct handshake_initializer {
static void populate (handshake_message &hello);
static net_plugin_impl* info;
};
class connection : public std::enable_shared_from_this<connection> {
public:
connection( socket_ptr s )
......@@ -114,38 +121,19 @@ namespace eos {
sync_request_index in_sync_state;
sync_request_index out_sync_state;
socket_ptr socket;
vector<fc::ip::endpoint> shared_peers;
std::set<fc::ip::endpoint> shared_peers;
uint32_t pending_message_size;
vector<char> pending_message_buffer;
fc::optional<handshake_message> hello;
handshake_message last_handshake;
std::deque<net_message> out_queue;
connection_ptr self;
void send_handshake ( ) {
chain_controller& cc = app().find_plugin<chain_plugin>()->chain();
try {
hello->last_irreversible_block_id = cc.get_block_id_for_num
(hello->last_irreversible_block_num = cc.last_irreversible_block_num());
}
catch (const unknown_block_exception &ex) {
hello->last_irreversible_block_id = fc::sha256::hash(0);
hello->last_irreversible_block_num = 0;
}
try {
hello->head_id = cc.get_block_id_for_num
(hello->head_num = cc.head_block_num());
}
catch (const unknown_block_exception &ex) {
hello->head_id = fc::sha256::hash(0);
hello->head_num = 0;
}
ilog ("send_handshake my libnum = ${n} head = ${h}",
("n",hello->last_irreversible_block_num)("h",hello->head_num));
send (*hello);
handshake_message hello;
handshake_initializer::populate(hello);
send (hello);
}
void send( const net_message& m ) {
......@@ -189,7 +177,7 @@ namespace eos {
});
}
void write_block_backlog ( ) {
void write_block_backlog ( ) {
chain_controller& cc = app().find_plugin<chain_plugin>()->chain();
auto ss = out_sync_state.begin();
uint32_t num = ++ss.get_node()->value().last;
......@@ -201,7 +189,7 @@ namespace eos {
try {
fc::optional<signed_block> sb = cc.fetch_block_by_number(num);
if (sb) {
dlog("write backlog, block #${num}",("num",num));
// dlog("write backlog, block #${num}",("num",num));
send( *sb );
}
} catch ( ... ) {
......@@ -216,7 +204,6 @@ namespace eos {
}; // class connection
class net_plugin_impl {
public:
unique_ptr<tcp::acceptor> acceptor;
......@@ -230,7 +217,10 @@ namespace eos {
std::set< connection_ptr > connections;
bool done = false;
fc::optional<handshake_message> hello;
int16_t network_version = 0;
chain_id_type chain_id; ///< used to identify chain
fc::sha256 node_id; ///< used to identify peers and prevent self-connect
std::string user_agent_name;
chain_plugin* chain_plug;
vector<node_transaction_state> local_txns;
......@@ -311,36 +301,13 @@ namespace eos {
} FC_CAPTURE_AND_RETHROW() }
#endif
void init_handshake () {
if (!hello) {
hello = handshake_message();
}
hello->network_version = 0;
chain_plug->get_chain_id(hello->chain_id);
fc::rand_pseudo_bytes(hello->node_id.data(), hello->node_id.data_size());
#if defined( __APPLE__ )
hello->os = "osx";
#elif defined( __linux__ )
hello->os = "linux";
#elif defined( _MSC_VER )
hello->os = "win32";
#else
hello->os = "other";
#endif
hello->agent = user_agent_name;
}
void start_session( connection_ptr con ) {
connections.insert (con);
start_read_message( con );
if (!hello.valid()) {
init_handshake ();
}
con->hello = hello;
con->send_handshake( );
con->send_handshake();
send_peer_message(*con);
// con->readloop_complete = bf::async( [=](){ read_loop( con ); } );
// con->writeloop_complete = bf::async( [=](){ write_loop con ); } );
......@@ -366,7 +333,7 @@ namespace eos {
boost::asio::async_read( *c->socket,
boost::asio::buffer((char *)buff, sizeof(c->pending_message_size)),
[this,c]( boost::system::error_code ec, std::size_t bytes_transferred ) {
ilog( "read size handler..." );
//ilog( "read size handler..." );
if( !ec ) {
if( c->pending_message_size <= c->pending_message_buffer.size() ) {
start_reading_pending_buffer( c );
......@@ -393,18 +360,20 @@ namespace eos {
return fc::ip::endpoint (addr,ep.port());
}
void send_peer_list () {
void send_peer_message (connection &conn) {
peer_message pm;
pm.peers.resize(connections.size());
for (auto &c : connections) {
pm.peers.push_back (asio_to_fc(c->socket->remote_endpoint()));
fc::ip::endpoint remote = asio_to_fc(c->socket->remote_endpoint());
if (conn.shared_peers.find(remote) == conn.shared_peers.end()) {
pm.peers.push_back(remote);
}
}
for (auto c : connections) {
c->send (pm);
if (!pm.peers.empty()) {
conn.send (pm);
}
}
template<typename T>
void send_all (const T &msg) {
for (auto &c : connections) {
......@@ -434,21 +403,19 @@ namespace eos {
}
void handle_message (connection_ptr c, const handshake_message &msg) {
if (!hello) {
init_handshake();
}
dlog ("got a handshake message");
if (msg.node_id == hello->node_id) {
if (msg.node_id == node_id) {
elog ("Self connection detected. Closing connection");
close(c);
return;
}
if (msg.chain_id != hello->chain_id) {
if (msg.chain_id != chain_id) {
elog ("Peer on a different chain. Closing connection");
close (c);
return;
}
if (msg.network_version != hello->network_version) {
if (msg.network_version != network_version) {
elog ("Peer network id does not match ");
close (c);
return;
......@@ -465,6 +432,7 @@ namespace eos {
void handle_message (connection_ptr c, const peer_message &msg) {
dlog ("got a peer message");
for (auto fcep : msg.peers) {
c->shared_peers.insert (fcep);
tcp::endpoint ep = fc_to_asio (fcep);
if (ep == listen_endpoint || ep == public_endpoint) {
continue;
......@@ -486,21 +454,38 @@ namespace eos {
void handle_message (connection_ptr c, const notice_message &msg) {
dlog ("got a notice message");
chain_controller &cc = chain_plug->chain();
for (const auto& b : msg.known_blocks) {
if (! cc.is_known_block (b)) {
c->block_state.insert((block_state){b,true,true,fc::time_point()});
}
}
for (const auto& t : msg.known_trx) {
if (!cc.is_known_transaction (t)) {
c->trx_state.insert((transaction_state){t,true,true,(uint32_t)-1,
fc::time_point(),fc::time_point()});
}
}
}
void handle_message (connection_ptr c, const sync_request_message &msg) {
ilog ("got a sync request message for blocks ${s} to ${e}", ("s",msg.start_block)("e", msg.end_block));
dlog ("got a sync request message for blocks ${s} to ${e}",
("s",msg.start_block)("e", msg.end_block));
sync_state req = {msg.start_block,msg.end_block,msg.start_block-1,time_point::now()};
c->out_sync_state.insert (req);
c->write_block_backlog ();
}
void handle_message (connection_ptr c, const block_summary_message &msg) {
ilog ("got a block summary message");
// TODO: reconstruct actual block from cached transactions
dlog ("got a block summary message");
#warning TODO: reconstruct actual block from cached transactions
chain_controller &cc = chain_plug->chain();
if (cc.is_known_block(msg.block.id())) {
dlog ("block id ${id} is known", ("id", msg.block.id()) );
const auto& itr = c->block_state.get<by_id>();
block_state value = *itr.find(msg.block.id());
value.is_known=true;
c->block_state.insert (std::move(value));
// dlog ("block id ${id} is known", ("id", msg.block.id()) );
return;
}
try {
......@@ -516,17 +501,17 @@ namespace eos {
}
void handle_message (connection_ptr c, const SignedTransaction &msg) {
ilog ("got a SignedTransacton");
dlog ("got a SignedTransacton");
chain_plug->accept_transaction (msg);
}
void handle_message (connection_ptr c, const signed_block &msg) {
uint32_t bn = msg.block_num();
ilog ("got a signed_block, num = ${n}", ("n", bn));
dlog ("got a signed_block, num = ${n}", ("n", bn));
chain_controller &cc = chain_plug->chain();
if (cc.is_known_block(msg.id())) {
dlog ("block id ${id} is known", ("id", msg.id()) );
// dlog ("block id ${id} is known", ("id", msg.id()) );
return;
}
uint32_t num = 0;
......@@ -573,11 +558,11 @@ namespace eos {
boost::asio::buffer(c->pending_message_buffer.data(),
c->pending_message_size ),
[this,c]( boost::system::error_code ec, std::size_t bytes_transferred ) {
ilog( "read buffer handler..." );
// ilog( "read buffer handler..." );
if( !ec ) {
try {
auto msg = fc::raw::unpack<net_message>( c->pending_message_buffer );
ilog( "received message of size: ${s}", ("s",bytes_transferred) );
// ilog( "received message of size: ${s}", ("s",bytes_transferred) );
start_read_message( c );
msgHandler m(*this, c);
......@@ -601,10 +586,55 @@ namespace eos {
c.reset ();
}
static void pending_txn (const SignedTransaction& txn) {
// dlog ("got signaled of txn!");
}
}; // class net_plugin_impl
net_plugin_impl* handshake_initializer::info;
void
handshake_initializer::populate (handshake_message &hello) {
hello.network_version = 0;
hello.chain_id = info->chain_id;
hello.node_id = info->node_id;
#if defined( __APPLE__ )
hello.os = "osx";
#elif defined( __linux__ )
hello.os = "linux";
#elif defined( _MSC_VER )
hello.os = "win32";
#else
hello.os = "other";
#endif
hello.agent = info->user_agent_name;
chain_controller& cc = info->chain_plug->chain();
try {
hello.last_irreversible_block_id = cc.get_block_id_for_num
(hello.last_irreversible_block_num = cc.last_irreversible_block_num());
}
catch (const unknown_block_exception &ex) {
hello.last_irreversible_block_id = fc::sha256::hash(0);
hello.last_irreversible_block_num = 0;
}
try {
hello.head_id = cc.get_block_id_for_num
(hello.head_num = cc.head_block_num());
}
catch (const unknown_block_exception &ex) {
hello.head_id = fc::sha256::hash(0);
hello.head_num = 0;
}
}
net_plugin::net_plugin()
:my( new net_plugin_impl ) {
handshake_initializer::info = my.get();
}
net_plugin::~net_plugin() {
......@@ -642,6 +672,9 @@ namespace eos {
my->user_agent_name = options.at ("agent-name").as< string > ();
}
my->chain_plug = app().find_plugin<chain_plugin>();
my->chain_plug->get_chain_id(my->chain_id);
fc::rand_pseudo_bytes(my->node_id.data(), my->node_id.data_size());
}
void net_plugin::plugin_startup() {
......@@ -652,6 +685,7 @@ namespace eos {
my->acceptor->set_option(tcp::acceptor::reuse_address(true));
my->acceptor->bind(my->listen_endpoint);
my->acceptor->listen();
my->chain_plug->chain().on_pending_transaction.connect (&net_plugin_impl::pending_txn);
my->start_listen_loop();
}
......@@ -687,12 +721,18 @@ namespace eos {
void net_plugin::broadcast_block (const chain::signed_block &sb) {
vector<transaction_id_type> trxs;
if (!sb.cycles.empty()) {
for (const auto& cyc : sb.cycles) {
for (const auto& thr : cyc) {
for (auto ui : thr.user_input) {
trxs.push_back (ui.id());
}
}
}
}
block_summary_message bsm = {sb, trxs};
my->send_all (bsm);
}
void net_plugin::broadcast_transaction (const chain::SignedTransaction &txn) {
my->send_all (txn);
}
}
#include <appbase/application.hpp>
#include <eos/producer_plugin/producer_plugin.hpp>
#include <eos/chain_plugin/chain_plugin.hpp>
#include <eos/http_plugin/http_plugin.hpp>
#include <eos/chain_api_plugin/chain_api_plugin.hpp>
#include <eos/net_plugin/net_plugin.hpp>
......@@ -17,8 +19,10 @@ int main(int argc, char** argv)
try {
app().register_plugin<net_plugin>();
app().register_plugin<chain_api_plugin>();
app().register_plugin<http_plugin>();
app().register_plugin<producer_plugin>();
if(!app().initialize<net_plugin, chain_plugin>(argc, argv))
app().register_plugin<chain_plugin>();
if(!app().initialize<chain_plugin, net_plugin, producer_plugin>(argc, argv))
return -1;
app().startup();
......
#!/usr/bin/perl
use strict;
use Getopt::Long;
use Env;
use File::Basename;
use File::copy;
use File::Spec;
use File::Path;
use Cwd;
my $eos_home = defined $ENV{EOS_HOME} ? $ENV{EOS_HOME} : getcwd;
my $eosd = $eos_home . "/programs/eosd/eosd";
my $nodes = defined $ENV{EOS_TEST_RING} ? $ENV{EOS_TEST_RING} : "1";
my $only_one = defined $ENV{EOS_TEST_ONE_PRODUCER} ? "1" : "";
my $prods = 21;
my $genesis = "$eos_home/genesis.json";
my $http_port_base = 8888;
my $p2p_port_base = 9876;
my $data_dir_base = './test-dir-node';
my $http_port_base = 8888;
my $first_pause = 45;
my $launch_pause = 5;
my $run_duration = 60;
if (!GetOptions("nodes=i" => \$nodes,
"first-pause=i" => \$first_pause,
"launch-pause=i" => \$launch_pause,
"duration=i" => \$run_duratiion,
"one-producer" => \$only_one)) {
print "usage: $ARGV[0] [--nodes=<n>] [--first-pause=<n>] [--launch-pause=<n>] [--duration=<n>] [--one-producer]\n";
print "where:\n";
print "--nodes=n (default = 1) sets the number of eosd instances to launch\n";
print "--first-pause=n (default = 45) sets the seconds delay after starting the first instance\n";
print "--launch-pause=n (default = 5) sets the seconds delay after starting subsequent nodes\n";
print "--duration=n (default = 60) sets the seconds delay after starting the last node before shutting down the test\n";
print "--one-producer (default = no) if set concentrates all producers into the first node\n";
print "\nproducer count currently fixed at $prods\n";
exit
}
my $per_node = int ($prods / ($only_one ? 1 : $nodes));
my $extra = $prods - ($per_node * $nodes);
my @pid;
my $prod_ndx = ord('a');
for (my $i = 0; $i < $nodes; $i++) {
my $p2p_port = $p2p_port_base + $i;
my $forward = $p2p_port + 1;
my $backward = $p2p_port - 1;
my $http_port = $http_port_base + $i;
my $data_dir = "$data_dir_base-$i";
if ($nodes > 1) {
if ($i == 0) {
$backward = $p2p_port_base + $nodes -1;
}
elsif ($i == $nodes - 1) {
$forward = $p2p_port_base;
}
}
print "purging directory $data_dir\n";
rmtree ($data_dir);
mkdir ($data_dir);
open (my $cfg, '>', "$data_dir/config.ini") ;
print $cfg "genesis-json = \"$genesis\"\n";
print $cfg "block-log-dir = \"blocks\"\n";
print $cfg "readonly = 0\n";
print $cfg "shared-file-dir = \"blockchain\"\n";
print $cfg "shared-file-size = 64\n";
print $cfg "http-server-endpoint = 127.0.0.1:$http_port\n";
print $cfg "listen-endpoint = 127.0.0.1:$p2p_port\n";
print $cfg "remote-endpoint = 127.0.0.1:$forward\n" if ($nodes > 1);
print $cfg "remote-endpoint = 127.0.0.1:$backward\n" if ($nodes > 2);
print $cfg "public-endpoint = 0.0.0.0:$p2p_port\n";
print $cfg "enable-stale-production = true\n";
print $cfg "required-participation = true\n";
print $cfg "private-key = [\"EOS6MRyAjQq8ud7hVNYcfnVPJqcVpscN5So8BhtHuGYqET5GDW5CV\",\"5KQwrPbwdL6PhXujxW37FSSQZ1JiwsST4cqQzDeyXtP79zkvFD3\"]\n";
if ($i == 0 || $only_one != "1") {
print $cfg "plugin = eos::producer_plugin\n";
$per_node += $extra if ($i == $nodes - 1);
for (my $p = 0; $p < $per_node; $p++) {
my $pname = "init" . chr($prod_ndx++);
print $cfg "producer-name = $pname\n";
}
}
close $cfg;
my @cmdline = ($eosd,
# "--genesis-timestamp=now",
"--data-dir=$data_dir");
$pid[$i] = fork;
if ($pid[$i] > 0) {
my $pause = $i == 0 ? $first_pause : $launch_pause;
print "parent process looping, child pid = $pid[$i]";
if ($i < $nodes - 1) {
print ", pausing $pause seconds\n";
sleep ($pause);
}
else {
print "\n";
}
}
elsif (defined ($pid[$i])) {
print "child execing now, pid = $$\n";
open OUTPUT, '>', "$data_dir/stdout.txt" or die $!;
open ERROR, '>', "$data_dir/stderr.txt" or die $!;
STDOUT->fdopen ( \*OUTPUT, 'w') or die $!;
STDERR->fdopen ( \*ERROR, 'w') or die $!;
exec @cmdline;
print "child terminating now\n";
exit;
}
else {
print "fork failed\n";
exit;
}
}
print "all nodes launched, network running for $run_duration seconds\n";
sleep ($run_duration);
foreach my $pp (@pid) {
print "killing $pp\n";
my $res = kill 2, $pp;
print "kill returned $res"
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册