未验证 提交 c52f08b0 编写于 作者: W wanderingbort 提交者: GitHub

Merge pull request #5725 from EOSIO/feature/transaction-trace-logging

Transaction trace logging for visibility of transactions on P2P
......@@ -49,6 +49,9 @@ namespace fc {
const fc::string logger_name("producer_plugin");
fc::logger _log;
const fc::string trx_trace_logger_name("transaction_tracing");
fc::logger _trx_trace_log;
namespace eosio {
static appbase::abstract_plugin& _producer_plugin = app().register_plugin<producer_plugin>();
......@@ -341,12 +344,32 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
auto block_time = chain.pending_block_state()->header.timestamp.to_time_point();
auto send_response = [this, &trx, &next](const fc::static_variant<fc::exception_ptr, transaction_trace_ptr>& response) {
auto send_response = [this, &trx, &chain, &next](const fc::static_variant<fc::exception_ptr, transaction_trace_ptr>& response) {
next(response);
if (response.contains<fc::exception_ptr>()) {
_transaction_ack_channel.publish(std::pair<fc::exception_ptr, packed_transaction_ptr>(response.get<fc::exception_ptr>(), trx));
if (_pending_block_mode == pending_block_mode::producing) {
fc_dlog(_trx_trace_log, "[TRX_TRACE] Block ${block num} for producer ${prod} is REJECTING tx: ${txid} : ${why} ",
("block_num", chain.head_block_num() + 1)
("prod", chain.pending_block_state()->header.producer)
("txid", trx->id())
("why",response.get<fc::exception_ptr>()->what()));
} else {
fc_dlog(_trx_trace_log, "[TRX_TRACE] Speculative execution is REJECTING tx: ${txid} : ${why} ",
("txid", trx->id())
("why",response.get<fc::exception_ptr>()->what()));
}
} else {
_transaction_ack_channel.publish(std::pair<fc::exception_ptr, packed_transaction_ptr>(nullptr, trx));
if (_pending_block_mode == pending_block_mode::producing) {
fc_dlog(_trx_trace_log, "[TRX_TRACE] Block ${block num} for producer ${prod} is ACCEPTING tx: ${txid}",
("block_num", chain.head_block_num() + 1)
("prod", chain.pending_block_state()->header.producer)
("txid", trx->id()));
} else {
fc_dlog(_trx_trace_log, "[TRX_TRACE] Speculative execution is ACCEPTING tx: ${txid}",
("txid", trx->id()));
}
}
};
......@@ -373,6 +396,15 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
if (trace->except) {
if (failure_is_subjective(*trace->except, deadline_is_subjective)) {
_pending_incoming_transactions.emplace_back(trx, persist_until_expired, next);
if (_pending_block_mode == pending_block_mode::producing) {
fc_dlog(_trx_trace_log, "[TRX_TRACE] Block ${block num} for producer ${prod} COULD NOT FIT, tx: ${txid} RETRYING ",
("block_num", chain.head_block_num() + 1)
("prod", chain.pending_block_state()->header.producer)
("txid", trx->id()));
} else {
fc_dlog(_trx_trace_log, "[TRX_TRACE] Speculative execution COULD NOT FIT tx: ${txid} RETRYING",
("txid", trx->id()));
}
} else {
auto e_ptr = trace->except->dynamic_copy_exception();
send_response(e_ptr);
......@@ -648,8 +680,13 @@ void producer_plugin::plugin_initialize(const boost::program_options::variables_
void producer_plugin::plugin_startup()
{ try {
if(fc::get_logger_map().find(logger_name) != fc::get_logger_map().end()) {
_log = fc::get_logger_map()[logger_name];
auto& logger_map = fc::get_logger_map();
if(logger_map.find(logger_name) != logger_map.end()) {
_log = logger_map[logger_name];
}
if( logger_map.find(trx_trace_logger_name) != logger_map.end()) {
_trx_trace_log = logger_map[trx_trace_logger_name];
}
ilog("producer plugin: plugin_startup() begin");
......@@ -983,8 +1020,29 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block(bool
// remove all persisted transactions that have now expired
auto& persisted_by_id = _persistent_transactions.get<by_id>();
auto& persisted_by_expiry = _persistent_transactions.get<by_expiry>();
if (!persisted_by_expiry.empty()) {
int num_expired_persistent = 0;
int orig_count = _persistent_transactions.size();
while(!persisted_by_expiry.empty() && persisted_by_expiry.begin()->expiry <= pbs->header.timestamp.to_time_point()) {
auto const& txid = persisted_by_expiry.begin()->trx_id;
if (_pending_block_mode == pending_block_mode::producing) {
fc_dlog(_trx_trace_log, "[TRX_TRACE] Block ${block num} for producer ${prod} is EXPIRING PERSISTED tx: ${txid}",
("block_num", chain.head_block_num() + 1)
("prod", chain.pending_block_state()->header.producer)
("txid", txid));
} else {
fc_dlog(_trx_trace_log, "[TRX_TRACE] Speculative execution is EXPIRING PERSISTED tx: ${txid}",
("txid", txid));
}
persisted_by_expiry.erase(persisted_by_expiry.begin());
num_expired_persistent++;
}
fc_dlog(_log, "Processed ${n} persisted transactions, Expired ${expired}",
("n", orig_count)
("expired", num_expired_persistent));
}
try {
......@@ -1015,6 +1073,10 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block(bool
for (auto& trx: unapplied_trxs) {
auto category = calculate_transaction_category(trx);
if (category == tx_category::EXPIRED || (category == tx_category::UNEXPIRED_UNPERSISTED && _producers.empty())) {
if (!_producers.empty()) {
fc_dlog(_trx_trace_log, "[TRX_TRACE] Node with producers configured is dropping an EXPIRED transaction that was PREVIOUSLY ACCEPTED : ${txid}",
("txid", trx->id));
}
chain.drop_unapplied_transaction(trx);
} else if (category == tx_category::PERSISTED || (category == tx_category::UNEXPIRED_UNPERSISTED && _pending_block_mode == pending_block_mode::producing)) {
apply_trxs.emplace_back(std::move(trx));
......@@ -1022,12 +1084,19 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block(bool
}
}
if (!apply_trxs.empty()) {
int num_applied = 0;
int num_failed = 0;
int num_processed = 0;
for (const auto& trx: apply_trxs) {
if (block_time <= fc::time_point::now()) exhausted = true;
if (exhausted) {
break;
}
num_processed++;
try {
auto deadline = fc::time_point::now() + fc::milliseconds(_max_transaction_time_ms);
bool deadline_is_subjective = false;
......@@ -1043,24 +1112,48 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block(bool
} else {
// this failed our configured maximum transaction time, we don't want to replay it
chain.drop_unapplied_transaction(trx);
num_failed++;
}
} else {
num_applied++;
}
} catch ( const guard_exception& e ) {
app().get_plugin<chain_plugin>().handle_guard_exception(e);
return start_block_result::failed;
} FC_LOG_AND_DROP();
}
fc_dlog(_log, "Processed ${m} of ${n} previously applied transactions, Applied ${applied}, Failed/Dropped ${failed}",
("m", num_processed)
("n", apply_trxs.size())
("applied", num_applied)
("failed", num_failed));
}
}
if (_pending_block_mode == pending_block_mode::producing) {
auto& blacklist_by_id = _blacklisted_transactions.get<by_id>();
auto& blacklist_by_expiry = _blacklisted_transactions.get<by_expiry>();
auto now = fc::time_point::now();
if(!blacklist_by_expiry.empty()) {
int num_expired = 0;
int orig_count = _blacklisted_transactions.size();
while (!blacklist_by_expiry.empty() && blacklist_by_expiry.begin()->expiry <= now) {
blacklist_by_expiry.erase(blacklist_by_expiry.begin());
num_expired++;
}
fc_dlog(_log, "Processed ${n} blacklisted transactions, Expired ${expired}",
("n", orig_count)
("expired", num_expired));
}
auto scheduled_trxs = chain.get_scheduled_transactions();
if (!scheduled_trxs.empty()) {
int num_applied = 0;
int num_failed = 0;
int num_processed = 0;
for (const auto& trx : scheduled_trxs) {
if (block_time <= fc::time_point::now()) exhausted = true;
......@@ -1068,6 +1161,8 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block(bool
break;
}
num_processed++;
// configurable ratio of incoming txns vs deferred txns
while (_incoming_trx_weight >= 1.0 && orig_pending_txn_size && _pending_incoming_transactions.size()) {
auto e = _pending_incoming_transactions.front();
......@@ -1102,7 +1197,10 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block(bool
auto expiration = fc::time_point::now() + fc::seconds(chain.get_global_properties().configuration.deferred_trx_expiration_window);
// this failed our configured maximum transaction time, we don't want to replay it add it to a blacklist
_blacklisted_transactions.insert(transaction_id_with_expiry{trx, expiration});
num_failed++;
}
} else {
num_applied++;
}
} catch ( const guard_exception& e ) {
app().get_plugin<chain_plugin>().handle_guard_exception(e);
......@@ -1112,6 +1210,14 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block(bool
_incoming_trx_weight += _incoming_defer_ratio;
if (!orig_pending_txn_size) _incoming_trx_weight = 0.0;
}
fc_dlog(_log, "Processed ${m} of ${n} scheduled transactions, Applied ${applied}, Failed/Dropped ${failed}",
("m", num_processed)
("n", scheduled_trxs.size())
("applied", num_applied)
("failed", num_failed));
}
}
if (exhausted || block_time <= fc::time_point::now()) {
......@@ -1119,6 +1225,9 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block(bool
} else {
// attempt to apply any pending incoming transactions
_incoming_trx_weight = 0.0;
if (!_pending_incoming_transactions.empty()) {
fc_dlog(_log, "Processing ${n} pending transactions");
while (orig_pending_txn_size && _pending_incoming_transactions.size()) {
auto e = _pending_incoming_transactions.front();
_pending_incoming_transactions.pop_front();
......@@ -1126,6 +1235,7 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block(bool
on_incoming_transaction_async(std::get<0>(e), std::get<1>(e), std::get<2>(e));
if (block_time <= fc::time_point::now()) return start_block_result::exhausted;
}
}
return start_block_result::succeeded;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册