未验证 提交 c7d08d47 编写于 作者: M Matt Witherspoon 提交者: GitHub

Merge branch 'master' into feature/more-bad-wasms

......@@ -1055,26 +1055,22 @@ namespace eosio {
class bnet_plugin_impl : public std::enable_shared_from_this<bnet_plugin_impl> {
public:
bnet_plugin_impl() {
_peer_pk = fc::crypto::private_key::generate();
_peer_id = _peer_pk.get_public_key();
}
string _bnet_endpoint_address = "0.0.0.0";
uint16_t _bnet_endpoint_port = 4321;
bool _request_trx = true;
public_key_type _peer_id;
private_key_type _peer_pk; /// one time random key to identify this process
bnet_plugin_impl() = default;
const private_key_type _peer_pk = fc::crypto::private_key::generate(); /// one time random key to identify this process
public_key_type _peer_id = _peer_pk.get_public_key();
string _bnet_endpoint_address = "0.0.0.0";
uint16_t _bnet_endpoint_port = 4321;
bool _request_trx = true;
std::vector<std::string> _connect_to_peers; /// list of peers to connect to
std::vector<std::thread> _socket_threads;
int32_t _num_threads = 1;
std::unique_ptr<boost::asio::io_context> _ioc; // lifetime guarded by shared_ptr of bnet_plugin_impl
std::shared_ptr<listener> _listener;
std::shared_ptr<boost::asio::deadline_timer> _timer;
std::vector<std::string> _connect_to_peers; /// list of peers to connect to
std::vector<std::thread> _socket_threads;
int32_t _num_threads = 1;
std::map<const session*, std::weak_ptr<session> > _sessions;
std::unique_ptr<boost::asio::io_context> _ioc; // lifetime guarded by shared_ptr of bnet_plugin_impl
std::shared_ptr<listener> _listener;
std::shared_ptr<boost::asio::deadline_timer> _timer; // only access on app io_service
std::map<const session*, std::weak_ptr<session> > _sessions; // only access on app io_service
channels::irreversible_block::channel_type::handle _on_irb_handle;
channels::accepted_block::channel_type::handle _on_accepted_block_handle;
......@@ -1091,6 +1087,7 @@ namespace eosio {
}
void on_session_close( const session* s ) {
if( !app().get_io_service().get_executor().running_in_this_thread() ) { elog( "wrong strand"); }
auto itr = _sessions.find(s);
if( _sessions.end() != itr )
_sessions.erase(itr);
......@@ -1098,14 +1095,16 @@ namespace eosio {
template<typename Call>
void for_each_session( Call callback ) {
for( const auto& item : _sessions ) {
if( auto ses = item.second.lock() ) {
ses->_ios.post( boost::asio::bind_executor(
ses->_strand,
[ses,cb=callback](){ cb(ses); }
));
app().get_io_service().post([this, callback = callback] {
for (const auto& item : _sessions) {
if (auto ses = item.second.lock()) {
ses->_ios.post(boost::asio::bind_executor(
ses->_strand,
[ses, cb = callback]() { cb(ses); }
));
}
}
}
});
}
void on_accepted_transaction( transaction_metadata_ptr trx ) {
......@@ -1152,6 +1151,7 @@ namespace eosio {
};
void on_reconnect_peers() {
if( !app().get_io_service().get_executor().running_in_this_thread() ) { elog( "wrong strand"); }
for( const auto& peer : _connect_to_peers ) {
bool found = false;
for( const auto& con : _sessions ) {
......
......@@ -56,6 +56,7 @@ add_test(NAME distributed-transactions-test COMMAND tests/distributed-transactio
add_test(NAME distributed-transactions-remote-test COMMAND tests/distributed-transactions-remote-test.py -v --dump-error-detail WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
# TODO removed on slim: add_test(NAME restart-scenarios-test_resync COMMAND tests/restart-scenarios-test.py -c resync -p4 -v --dump-error-details WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
# add_test(NAME restart-scenarios-test_replay COMMAND tests/restart-scenarios-test.py -c replay -p4 -v --dump-error-details WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
add_test(NAME restart-scenarios-test_hard_replay COMMAND tests/restart-scenarios-test.py -c hardReplay -p4 -v --dump-error-details WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
# TODO: add_test(NAME consensus-validation-malicious-producers COMMAND tests/consensus-validation-malicious-producers.py -w 80 --dump-error-details WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
if(ENABLE_COVERAGE_TESTING)
......
......@@ -4,7 +4,6 @@ import testUtils
import argparse
import random
import signal
###############################################################
# Test for different nodes restart scenarios.
......@@ -65,8 +64,8 @@ testSuccessful=False
random.seed(seed) # Use a fixed seed for repeatability.
cluster=testUtils.Cluster()
walletMgr=testUtils.WalletMgr(False)
cluster=testUtils.Cluster(walletd=True)
walletMgr=testUtils.WalletMgr(True)
try:
cluster.setChainStrategy(chainSyncStrategyStr)
......@@ -87,6 +86,12 @@ try:
if not cluster.waitOnClusterBlockNumSync(3):
errorExit("Cluster never stabilized")
Print("Stand up EOS wallet keosd")
walletMgr.killall()
walletMgr.cleanup()
if walletMgr.launch() is False:
errorExit("Failed to stand up keosd.")
accountsCount=total_nodes
walletName="MyWallet"
Print("Creating wallet %s if one doesn't already exist." % walletName)
......@@ -99,14 +104,15 @@ try:
errorExit("Wallet initialization failed.")
defproduceraAccount=cluster.defproduceraAccount
eosioAccount=cluster.eosioAccount
Print("Importing keys for account %s into wallet %s." % (defproduceraAccount.name, wallet.name))
if not walletMgr.importKey(defproduceraAccount, wallet):
errorExit("Failed to import key for account %s" % (defproduceraAccount.name))
Print("Create accounts.")
#if not cluster.createAccounts(wallet):
if not cluster.createAccounts(defproduceraAccount):
#if not cluster.createAccounts(defproduceraAccount):
if not cluster.createAccounts(eosioAccount):
errorExit("Accounts creation failed.")
Print("Wait on cluster sync.")
......@@ -115,9 +121,9 @@ try:
# TBD: Known issue (Issue 2043) that 'get currency0000 balance' doesn't return balance.
# Uncomment when functional
# Print("Spread funds and validate")
# if not cluster.spreadFundsAndValidate(10):
# errorExit("Failed to spread and validate funds.")
Print("Spread funds and validate")
if not cluster.spreadFundsAndValidate(10):
errorExit("Failed to spread and validate funds.")
Print("Wait on cluster sync.")
if not cluster.waitOnClusterSync():
......@@ -130,9 +136,9 @@ try:
# TBD: Known issue (Issue 2043) that 'get currency0000 balance' doesn't return balance.
# Uncomment when functional
# Print("Spread funds and validate")
# if not cluster.spreadFundsAndValidate(10):
# errorExit("Failed to spread and validate funds.")
Print("Spread funds and validate")
if not cluster.spreadFundsAndValidate(10):
errorExit("Failed to spread and validate funds.")
Print("Wait on cluster sync.")
if not cluster.waitOnClusterSync():
......@@ -150,9 +156,9 @@ try:
# TBD: Known issue (Issue 2043) that 'get currency0000 balance' doesn't return balance.
# Uncomment when functional
# Print("Spread funds and validate")
# if not cluster.spreadFundsAndValidate(10):
# errorExit("Failed to spread and validate funds.")
Print("Spread funds and validate")
if not cluster.spreadFundsAndValidate(10):
errorExit("Failed to spread and validate funds.")
Print("Wait on cluster sync.")
if not cluster.waitOnClusterSync():
......@@ -173,6 +179,5 @@ finally:
Print("Cleanup cluster and wallet data.")
cluster.cleanup()
walletMgr.cleanup()
pass
exit(0)
......@@ -16,6 +16,7 @@ import sys
import random
import json
import shlex
from sys import stdout
from core_symbol import CORE_SYMBOL
......@@ -39,7 +40,7 @@ class Utils:
def Print(*args, **kwargs):
stackDepth=len(inspect.stack())-2
s=' '*stackDepth
sys.stdout.write(s)
stdout.write(s)
print(*args, **kwargs)
SyncStrategy=namedtuple("ChainSyncStrategy", "name id arg")
......@@ -47,6 +48,7 @@ class Utils:
SyncNoneTag="none"
SyncReplayTag="replay"
SyncResyncTag="resync"
SyncHardReplayTag="hardReplay"
SigKillTag="kill"
SigTermTag="term"
......@@ -78,6 +80,9 @@ class Utils:
chainSyncStrategy=Utils.SyncStrategy(Utils.SyncResyncTag, 2, "--delete-all-blocks")
chainSyncStrategies[chainSyncStrategy.name]=chainSyncStrategy
chainSyncStrategy=Utils.SyncStrategy(Utils.SyncHardReplayTag, 3, "--hard-replay-blockchain")
chainSyncStrategies[chainSyncStrategy.name]=chainSyncStrategy
return chainSyncStrategies
@staticmethod
......@@ -98,14 +103,24 @@ class Utils:
timeout=60
endTime=time.time()+timeout
while endTime > time.time():
ret=lam()
if ret is not None:
return ret
sleepTime=3
Utils.Print("cmd: sleep %d seconds, remaining time: %d seconds" %
(sleepTime, endTime - time.time()))
time.sleep(sleepTime)
needsNewLine=False
try:
while endTime > time.time():
ret=lam()
if ret is not None:
return ret
sleepTime=3
if Utils.Debug:
Utils.Print("cmd: sleep %d seconds, remaining time: %d seconds" %
(sleepTime, endTime - time.time()))
else:
stdout.write('.')
stdout.flush()
needsNewLine=True
time.sleep(sleepTime)
finally:
if needsNewLine:
Utils.Print()
return None
......@@ -170,16 +185,17 @@ class Node(object):
assert trans["processed"]["receipt"]["status"] == "executed", printTrans(trans)
@staticmethod
def runCmdReturnJson(cmd, trace=False):
def runCmdReturnJson(cmd, trace=False, silentErrors=False):
cmdArr=shlex.split(cmd)
retStr=Utils.checkOutput(cmdArr)
jStr=Node.filterJsonObject(retStr)
if trace: Utils.Print ("RAW > %s"% (retStr))
if trace: Utils.Print ("JSON> %s"% (jStr))
if not jStr:
msg="Expected JSON response"
Utils.Print ("ERROR: "+ msg)
Utils.Print ("RAW > %s"% retStr)
msg="Received empty JSON response"
if not silentErrors:
Utils.Print ("ERROR: "+ msg)
Utils.Print ("RAW > %s"% retStr)
raise TypeError(msg)
try:
......@@ -446,7 +462,7 @@ class Node(object):
refBlockNum=int(refBlockNum)+1
except (TypeError, ValueError, KeyError) as _:
Utils.Print("transaction parsing failed. Transaction: %s" % (trans))
raise
return None
headBlockNum=self.getHeadBlockNum()
assert(headBlockNum)
......@@ -1000,7 +1016,7 @@ class Node(object):
cmd="%s %s get info" % (Utils.EosClientPath, self.endpointArgs)
if Utils.Debug: Utils.Print("cmd: %s" % (cmd))
try:
trans=Node.runCmdReturnJson(cmd)
trans=Node.runCmdReturnJson(cmd, silentErrors=silentErrors)
return trans
except subprocess.CalledProcessError as ex:
if not silentErrors:
......@@ -1070,7 +1086,7 @@ class Node(object):
return False
if not Utils.waitForBool(myFunc):
Utils.Print("ERROR: Failed to kill node (%s)." % (self.cmd), ex)
Utils.Print("ERROR: Failed to validate node shutdown.")
return False
# mark node as killed
......@@ -1079,30 +1095,40 @@ class Node(object):
return True
# TBD: make nodeId an internal property
def relaunch(self, nodeId, chainArg):
def relaunch(self, nodeId, chainArg, timeout=Utils.systemWaitTimeout):
assert(self.pid is None)
assert(self.killed)
if Utils.Debug: Utils.Print("Launching node process, Id: %d" % (nodeId))
dataDir="var/lib/node_%02d" % (nodeId)
dt = datetime.datetime.now()
dateStr="%d_%02d_%02d_%02d_%02d_%02d" % (
dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second)
stdoutFile="%s/stdout.%s.txt" % (dataDir, dateStr)
stderrFile="%s/stderr.%s.txt" % (dataDir, dateStr)
with open(stdoutFile, 'w') as sout, open(stderrFile, 'w') as serr:
cmd=self.cmd + ("" if chainArg is None else (" " + chainArg))
Utils.Print("cmd: %s" % (cmd))
popen=subprocess.Popen(cmd.split(), stdout=sout, stderr=serr)
self.pid=popen.pid
running=True
try:
os.kill(self.pid, 0) #check if process with pid is running
except OSError as _:
running=False
def isNodeAlive():
"""wait for node to be responsive."""
try:
return True if self.checkPulse() else False
except (TypeError) as _:
pass
return False
if running:
Utils.Print("WARNING: A process with pid (%d) is already running." % (self.pid))
isAlive=Utils.waitForBool(isNodeAlive, timeout)
if isAlive:
Utils.Print("Node relaunch was successfull.")
else:
if Utils.Debug: Utils.Print("Launching node process, Id: %d" % (nodeId))
dataDir="var/lib/node_%02d" % (nodeId)
dt = datetime.datetime.now()
dateStr="%d_%02d_%02d_%02d_%02d_%02d" % (
dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second)
stdoutFile="%s/stdout.%s.txt" % (dataDir, dateStr)
stderrFile="%s/stderr.%s.txt" % (dataDir, dateStr)
with open(stdoutFile, 'w') as sout, open(stderrFile, 'w') as serr:
cmd=self.cmd + ("" if chainArg is None else (" " + chainArg))
Utils.Print("cmd: %s" % (cmd))
popen=subprocess.Popen(cmd.split(), stdout=sout, stderr=serr)
self.pid=popen.pid
Utils.Print("ERROR: Node relaunch Failed.")
self.pid=None
return False
self.killed=False
return True
......@@ -1726,7 +1752,7 @@ class Cluster(object):
Utils.Print("ERROR: Failed to spread funds across nodes.")
return False
Utils.Print("Funds spread across all accounts. Noew validate funds")
Utils.Print("Funds spread across all accounts. Now validate funds")
if False == self.validateSpreadFunds(initialBalances, transferAmount, self.defproduceraAccount, self.accounts):
Utils.Print("ERROR: Failed to validate funds transfer across nodes.")
......@@ -2157,7 +2183,7 @@ class Cluster(object):
for i in range(0, len(self.nodes)):
node=self.nodes[i]
if not node.relaunch(i, chainArg):
if node.killed and not node.relaunch(i, chainArg):
return False
return True
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册