提交 e8c6d063 编写于 作者: A Abhay Bothra 提交者: bors-libra

[network] Separate delivery of data and control notifications from PeerManager

Closes: #2130
Approved by: phlip9
上级 e435f362
......@@ -29,6 +29,7 @@ use libra_crypto::hash::CryptoHash;
use libra_types::crypto_proxies::{
LedgerInfoWithSignatures, ValidatorChangeProof, ValidatorSet, ValidatorVerifier,
};
use network::peer_manager::conn_status_channel;
use network::{
proto::ConsensusMsg_oneof,
validator_network::{ConsensusNetworkEvents, ConsensusNetworkSender},
......@@ -61,8 +62,9 @@ impl SMRNode {
let (consensus_tx, consensus_rx) =
libra_channel::new(QueueStyle::FIFO, NonZeroUsize::new(8).unwrap(), None);
let (conn_mgr_reqs_tx, conn_mgr_reqs_rx) = channel::new_test(8);
let (_, conn_status_rx) = conn_status_channel::new();
let network_sender = ConsensusNetworkSender::new(network_reqs_tx, conn_mgr_reqs_tx);
let network_events = ConsensusNetworkEvents::new(consensus_rx);
let network_events = ConsensusNetworkEvents::new(consensus_rx, conn_status_rx);
playground.add_node(author, consensus_tx, network_reqs_rx, conn_mgr_reqs_rx);
let (commit_cb_sender, commit_cb_receiver) = mpsc::unbounded::<LedgerInfoWithSignatures>();
......
......@@ -51,6 +51,7 @@ use libra_types::crypto_proxies::{
random_validator_verifier, EpochInfo, LedgerInfoWithSignatures, ValidatorSigner,
ValidatorVerifier,
};
use network::peer_manager::conn_status_channel;
use network::{
proto::{ConsensusMsg, ConsensusMsg_oneof},
validator_network::{ConsensusNetworkEvents, ConsensusNetworkSender},
......@@ -131,8 +132,9 @@ impl NodeSetup {
let (consensus_tx, consensus_rx) =
libra_channel::new(QueueStyle::FIFO, NonZeroUsize::new(8).unwrap(), None);
let (conn_mgr_reqs_tx, conn_mgr_reqs_rx) = channel::new_test(8);
let (_, conn_status_rx) = conn_status_channel::new();
let network_sender = ConsensusNetworkSender::new(network_reqs_tx, conn_mgr_reqs_tx);
let network_events = ConsensusNetworkEvents::new(consensus_rx);
let network_events = ConsensusNetworkEvents::new(consensus_rx, conn_status_rx);
let author = signer.author();
playground.add_node(author, consensus_tx, network_reqs_rx, conn_mgr_reqs_rx);
......
......@@ -16,7 +16,7 @@ use libra_prost_ext::MessageExt;
use libra_types::block_info::BlockInfo;
use libra_types::PeerId;
use network::{
peer_manager::{PeerManagerNotification, PeerManagerRequest},
peer_manager::{conn_status_channel, PeerManagerNotification, PeerManagerRequest},
proto::{ConsensusMsg, ConsensusMsg_oneof},
protocols::rpc::InboundRpcRequest,
validator_network::{
......@@ -389,8 +389,9 @@ fn test_network_api() {
let (consensus_tx, consensus_rx) =
libra_channel::new(QueueStyle::FIFO, NonZeroUsize::new(8).unwrap(), None);
let (conn_mgr_reqs_tx, conn_mgr_reqs_rx) = channel::new_test(8);
let (_, conn_status_rx) = conn_status_channel::new();
let network_sender = ConsensusNetworkSender::new(network_reqs_tx, conn_mgr_reqs_tx);
let network_events = ConsensusNetworkEvents::new(consensus_rx);
let network_events = ConsensusNetworkEvents::new(consensus_rx, conn_status_rx);
playground.add_node(*peer, consensus_tx, network_reqs_rx, conn_mgr_reqs_rx);
let (self_sender, self_receiver) = channel::new_test(8);
......@@ -460,8 +461,9 @@ fn test_rpc() {
let (consensus_tx, consensus_rx) =
libra_channel::new(QueueStyle::FIFO, NonZeroUsize::new(8).unwrap(), None);
let (conn_mgr_reqs_tx, conn_mgr_reqs_rx) = channel::new_test(8);
let (_, conn_status_rx) = conn_status_channel::new();
let network_sender = ConsensusNetworkSender::new(network_reqs_tx, conn_mgr_reqs_tx);
let network_events = ConsensusNetworkEvents::new(consensus_rx);
let network_events = ConsensusNetworkEvents::new(consensus_rx, conn_status_rx);
playground.add_node(*peer, consensus_tx, network_reqs_rx, conn_mgr_reqs_rx);
let (self_sender, self_receiver) = channel::new_test(8);
......
......@@ -14,7 +14,10 @@ use futures::{
use libra_config::config::{NetworkConfig, NodeConfig};
use libra_types::{transaction::SignedTransaction, PeerId};
use network::{
peer_manager::{PeerManagerNotification, PeerManagerRequest},
peer_manager::{
conn_status_channel, ConnectionStatusNotification, PeerManagerNotification,
PeerManagerRequest,
},
proto::MempoolSyncMsg,
validator_network::{MempoolNetworkEvents, MempoolNetworkSender},
DisconnectReason, ProtocolId,
......@@ -38,6 +41,7 @@ struct SharedMempoolNetwork {
HashMap<PeerId, libra_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>>,
network_notifs_txs:
HashMap<PeerId, libra_channel::Sender<(PeerId, ProtocolId), PeerManagerNotification>>,
network_conn_event_notifs_txs: HashMap<PeerId, conn_status_channel::Sender>,
runtimes: HashMap<PeerId, Runtime>,
subscribers: HashMap<PeerId, UnboundedReceiver<SharedMempoolNotification>>,
timers: HashMap<PeerId, UnboundedSender<SyncEvent>>,
......@@ -61,8 +65,9 @@ impl SharedMempoolNetwork {
libra_channel::new(QueueStyle::FIFO, NonZeroUsize::new(8).unwrap(), None);
let (network_notifs_tx, network_notifs_rx) =
libra_channel::new(QueueStyle::FIFO, NonZeroUsize::new(8).unwrap(), None);
let (conn_status_tx, conn_status_rx) = conn_status_channel::new();
let network_sender = MempoolNetworkSender::new(network_reqs_tx);
let network_events = MempoolNetworkEvents::new(network_notifs_rx);
let network_events = MempoolNetworkEvents::new(network_notifs_rx, conn_status_rx);
let (sender, subscriber) = unbounded();
let (timer_sender, timer_receiver) = unbounded();
let (_ac_endpoint_sender, ac_endpoint_receiver) = mpsc::channel(1_024);
......@@ -92,6 +97,8 @@ impl SharedMempoolNetwork {
smp.mempools.insert(peer_id, mempool);
smp.network_reqs_rxs.insert(peer_id, network_reqs_rx);
smp.network_notifs_txs.insert(peer_id, network_notifs_tx);
smp.network_conn_event_notifs_txs
.insert(peer_id, conn_status_tx);
smp.subscribers.insert(peer_id, subscriber);
smp.timers.insert(peer_id, timer_sender);
smp.runtimes.insert(peer_id, runtime);
......@@ -107,19 +114,9 @@ impl SharedMempoolNetwork {
}
}
fn send_event(&mut self, peer: &PeerId, notif: PeerManagerNotification) {
let network_notifs_tx = self.network_notifs_txs.get_mut(peer).unwrap();
network_notifs_tx
.push(
(
*peer,
ProtocolId::from_static(
network::validator_network::MEMPOOL_DIRECT_SEND_PROTOCOL,
),
),
notif,
)
.unwrap();
fn send_connection_event(&mut self, peer: &PeerId, notif: ConnectionStatusNotification) {
let conn_notifs_tx = self.network_conn_event_notifs_txs.get_mut(peer).unwrap();
conn_notifs_tx.push(*peer, notif).unwrap();
self.wait_for_event(peer, SharedMempoolNotification::PeerStateChange);
}
......@@ -201,9 +198,9 @@ fn test_basic_flow() {
);
// A discovers new peer B
smp.send_event(
smp.send_connection_event(
&peer_a,
PeerManagerNotification::NewPeer(*peer_b, Multiaddr::empty()),
ConnectionStatusNotification::NewPeer(*peer_b, Multiaddr::empty()),
);
for seq in 0..3 {
......@@ -233,9 +230,9 @@ fn test_metric_cache_ignore_shared_txns() {
assert_eq!(smp.exist_in_metrics_cache(&peer_a, &txns[2]), true);
// Let peer_a discover new peer_b.
smp.send_event(
smp.send_connection_event(
&peer_a,
PeerManagerNotification::NewPeer(*peer_b, Multiaddr::empty()),
ConnectionStatusNotification::NewPeer(*peer_b, Multiaddr::empty()),
);
for txn in txns.iter().take(3) {
// Let peer_a share txns with peer_b
......@@ -256,13 +253,13 @@ fn test_interruption_in_sync() {
smp.add_txns(&peer_a, vec![TestTransaction::new(1, 0, 1)]);
// A discovers 2 peers
smp.send_event(
smp.send_connection_event(
&peer_a,
PeerManagerNotification::NewPeer(*peer_b, Multiaddr::empty()),
ConnectionStatusNotification::NewPeer(*peer_b, Multiaddr::empty()),
);
smp.send_event(
smp.send_connection_event(
&peer_a,
PeerManagerNotification::NewPeer(*peer_c, Multiaddr::empty()),
ConnectionStatusNotification::NewPeer(*peer_c, Multiaddr::empty()),
);
// make sure it delivered first transaction to both nodes
......@@ -276,9 +273,9 @@ fn test_interruption_in_sync() {
assert_eq!(peers, expected_peers);
// A loses connection to B
smp.send_event(
smp.send_connection_event(
&peer_a,
PeerManagerNotification::LostPeer(
ConnectionStatusNotification::LostPeer(
*peer_b,
Multiaddr::empty(),
DisconnectReason::ConnectionLost,
......@@ -297,9 +294,9 @@ fn test_interruption_in_sync() {
assert_eq!(txn.sequence_number(), 2);
// A reconnects to B
smp.send_event(
smp.send_connection_event(
&peer_a,
PeerManagerNotification::NewPeer(*peer_b, Multiaddr::empty()),
ConnectionStatusNotification::NewPeer(*peer_b, Multiaddr::empty()),
);
// B should receive transaction 2
......@@ -317,9 +314,9 @@ fn test_ready_transactions() {
vec![TestTransaction::new(1, 0, 1), TestTransaction::new(1, 2, 1)],
);
// first message delivery
smp.send_event(
smp.send_connection_event(
&peer_a,
PeerManagerNotification::NewPeer(*peer_b, Multiaddr::empty()),
ConnectionStatusNotification::NewPeer(*peer_b, Multiaddr::empty()),
);
smp.deliver_message(&peer_a);
......@@ -339,13 +336,13 @@ fn test_broadcast_self_transactions() {
smp.add_txns(&peer_a, vec![TestTransaction::new(0, 0, 1)]);
// A and B discover each other
smp.send_event(
smp.send_connection_event(
&peer_a,
PeerManagerNotification::NewPeer(*peer_b, Multiaddr::empty()),
ConnectionStatusNotification::NewPeer(*peer_b, Multiaddr::empty()),
);
smp.send_event(
smp.send_connection_event(
&peer_b,
PeerManagerNotification::NewPeer(*peer_a, Multiaddr::empty()),
ConnectionStatusNotification::NewPeer(*peer_a, Multiaddr::empty()),
);
// A sends txn to B
......@@ -372,13 +369,13 @@ fn test_broadcast_dependencies() {
smp.add_txns(&peer_b, vec![TestTransaction::new(0, 1, 1)]);
// A and B discover each other
smp.send_event(
smp.send_connection_event(
&peer_a,
PeerManagerNotification::NewPeer(*peer_b, Multiaddr::empty()),
ConnectionStatusNotification::NewPeer(*peer_b, Multiaddr::empty()),
);
smp.send_event(
smp.send_connection_event(
&peer_b,
PeerManagerNotification::NewPeer(*peer_a, Multiaddr::empty()),
ConnectionStatusNotification::NewPeer(*peer_a, Multiaddr::empty()),
);
// B receives 0
......@@ -400,13 +397,13 @@ fn test_broadcast_updated_transaction() {
smp.add_txns(&peer_a, vec![TestTransaction::new(0, 0, 1)]);
// A and B discover each other
smp.send_event(
smp.send_connection_event(
&peer_a,
PeerManagerNotification::NewPeer(*peer_b, Multiaddr::empty()),
ConnectionStatusNotification::NewPeer(*peer_b, Multiaddr::empty()),
);
smp.send_event(
smp.send_connection_event(
&peer_b,
PeerManagerNotification::NewPeer(*peer_a, Multiaddr::empty()),
ConnectionStatusNotification::NewPeer(*peer_a, Multiaddr::empty()),
);
// B receives 0
......
......@@ -13,6 +13,7 @@ use futures::channel::{
};
use libra_config::config::{NetworkConfig, NodeConfig};
use libra_types::PeerId;
use network::peer_manager::conn_status_channel;
use network::validator_network::{MempoolNetworkEvents, MempoolNetworkSender};
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};
......@@ -48,8 +49,9 @@ pub fn mock_shared_mempool() -> (
libra_channel::new(QueueStyle::FIFO, NonZeroUsize::new(8).unwrap(), None);
let (_network_notifs_tx, network_notifs_rx) =
libra_channel::new(QueueStyle::FIFO, NonZeroUsize::new(8).unwrap(), None);
let (_, conn_notifs_rx) = conn_status_channel::new();
let network_sender = MempoolNetworkSender::new(network_reqs_tx);
let network_events = MempoolNetworkEvents::new(network_notifs_rx);
let network_events = MempoolNetworkEvents::new(network_notifs_rx, conn_notifs_rx);
let (sender, _subscriber) = unbounded();
let (ac_sender, client_events) = mpsc::channel(1_024);
let (_consensus_sender, consensus_events) = mpsc::channel(1_024);
......
......@@ -15,12 +15,9 @@
//! to the peer.
use crate::{
common::NetworkPublicKeys,
peer_manager::PeerManagerNotification,
peer_manager::{PeerManagerError, PeerManagerRequestSender},
ProtocolId,
peer_manager::{self, conn_status_channel, PeerManagerError, PeerManagerRequestSender},
};
use channel;
use channel::libra_channel;
use channel::{self};
use futures::{
channel::oneshot,
future::{BoxFuture, FutureExt},
......@@ -54,7 +51,7 @@ pub struct ConnectivityManager<TTicker, TBackoff> {
/// Channel to send requests to PeerManager.
peer_mgr_reqs_tx: PeerManagerRequestSender,
/// Channel to receive notifications from PeerManager.
peer_mgr_notifs_rx: libra_channel::Receiver<(PeerId, ProtocolId), PeerManagerNotification>,
control_notifs_rx: conn_status_channel::Receiver,
/// Channel over which we receive requests from other actors.
requests_rx: channel::Receiver<ConnectivityRequest>,
/// Peers queued to be dialed, potentially with some delay. The dial can be cancelled by
......@@ -111,7 +108,7 @@ where
eligible: Arc<RwLock<HashMap<PeerId, NetworkPublicKeys>>>,
ticker: TTicker,
peer_mgr_reqs_tx: PeerManagerRequestSender,
peer_mgr_notifs_rx: libra_channel::Receiver<(PeerId, ProtocolId), PeerManagerNotification>,
control_notifs_rx: conn_status_channel::Receiver,
requests_rx: channel::Receiver<ConnectivityRequest>,
backoff_strategy: TBackoff,
max_delay_ms: u64,
......@@ -122,7 +119,7 @@ where
peer_addresses: HashMap::new(),
ticker,
peer_mgr_reqs_tx,
peer_mgr_notifs_rx,
control_notifs_rx,
requests_rx,
dial_queue: HashMap::new(),
dial_states: HashMap::new(),
......@@ -153,9 +150,9 @@ where
trace!("Event Id: {}, type: ConnectivityRequest, req: {:?}", self.event_id, req);
self.handle_request(req);
},
notif = self.peer_mgr_notifs_rx.select_next_some() => {
trace!("Event Id: {}, type: PeerManagerNotification, notif: {:?}", self.event_id, notif);
self.handle_peer_mgr_notification(notif);
notif = self.control_notifs_rx.select_next_some() => {
trace!("Event Id: {}, type: peer_manager::ConnectionStatusNotification, notif: {:?}", self.event_id, notif);
self.handle_control_notification(notif);
},
peer_id = pending_dials.select_next_some() => {
trace!("Event Id: {}, type: Dial complete, peer: {}", self.event_id, peer_id.short_str());
......@@ -347,15 +344,15 @@ where
}
}
fn handle_peer_mgr_notification(&mut self, notif: PeerManagerNotification) {
fn handle_control_notification(&mut self, notif: peer_manager::ConnectionStatusNotification) {
match notif {
PeerManagerNotification::NewPeer(peer_id, addr) => {
peer_manager::ConnectionStatusNotification::NewPeer(peer_id, addr) => {
self.connected.insert(peer_id, addr);
// Cancel possible queued dial to this peer.
self.dial_states.remove(&peer_id);
self.dial_queue.remove(&peer_id);
}
PeerManagerNotification::LostPeer(peer_id, addr, _reason) => {
peer_manager::ConnectionStatusNotification::LostPeer(peer_id, addr, _reason) => {
match self.connected.get(&peer_id) {
Some(curr_addr) if *curr_addr == addr => {
// Remove node from connected peers list.
......@@ -370,9 +367,6 @@ where
}
}
}
_ => {
panic!("Received unexpected notification from peer manager");
}
}
}
}
......
......@@ -3,7 +3,9 @@
use super::*;
use crate::peer::DisconnectReason;
use crate::peer_manager::PeerManagerRequest;
use crate::peer_manager::{conn_status_channel, PeerManagerRequest};
use crate::ProtocolId;
use channel::libra_channel;
use channel::message_queues::QueueStyle;
use core::str::FromStr;
use futures::SinkExt;
......@@ -19,14 +21,13 @@ fn setup_conn_mgr(
seed_peer_id: PeerId,
) -> (
libra_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>,
libra_channel::Sender<(PeerId, ProtocolId), PeerManagerNotification>,
conn_status_channel::Sender,
channel::Sender<ConnectivityRequest>,
channel::Sender<()>,
) {
let (peer_mgr_reqs_tx, peer_mgr_reqs_rx) =
libra_channel::new(QueueStyle::FIFO, NonZeroUsize::new(1).unwrap(), None);
let (peer_mgr_notifs_tx, peer_mgr_notifs_rx) =
libra_channel::new(QueueStyle::FIFO, NonZeroUsize::new(1).unwrap(), None);
let (control_notifs_tx, control_notifs_rx) = conn_status_channel::new();
let (conn_mgr_reqs_tx, conn_mgr_reqs_rx) = channel::new_test(0);
let (ticker_tx, ticker_rx) = channel::new_test(0);
let mut rng = StdRng::from_seed(TEST_SEED);
......@@ -47,7 +48,7 @@ fn setup_conn_mgr(
)),
ticker_rx,
PeerManagerRequestSender::new(peer_mgr_reqs_tx),
peer_mgr_notifs_rx,
control_notifs_rx,
conn_mgr_reqs_rx,
FixedInterval::from_millis(100),
300, /* ms */
......@@ -56,7 +57,7 @@ fn setup_conn_mgr(
rt.spawn(conn_mgr.start());
(
peer_mgr_reqs_rx,
peer_mgr_notifs_tx,
control_notifs_tx,
conn_mgr_reqs_tx,
ticker_tx,
)
......@@ -86,20 +87,20 @@ async fn get_dial_queue_size(conn_mgr_reqs_tx: &mut channel::Sender<Connectivity
}
async fn send_notification_await_delivery(
peer_mgr_notifs_tx: &mut libra_channel::Sender<(PeerId, ProtocolId), PeerManagerNotification>,
control_notifs_tx: &mut conn_status_channel::Sender,
peer_id: PeerId,
notif: PeerManagerNotification,
notif: peer_manager::ConnectionStatusNotification,
) {
let (delivered_tx, delivered_rx) = oneshot::channel();
peer_mgr_notifs_tx
.push_with_feedback((peer_id, ProtocolId::default()), notif, Some(delivered_tx))
control_notifs_tx
.push_with_feedback(peer_id, notif, Some(delivered_tx))
.unwrap();
delivered_rx.await.unwrap();
}
async fn expect_disconnect_request(
peer_mgr_reqs_rx: &mut libra_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>,
peer_mgr_notifs_tx: &mut libra_channel::Sender<(PeerId, ProtocolId), PeerManagerNotification>,
control_notifs_tx: &mut conn_status_channel::Sender,
peer_id: PeerId,
address: Multiaddr,
result: Result<(), PeerManagerError>,
......@@ -116,9 +117,13 @@ async fn expect_disconnect_request(
}
if success {
send_notification_await_delivery(
peer_mgr_notifs_tx,
control_notifs_tx,
peer_id,
PeerManagerNotification::LostPeer(peer_id, address, DisconnectReason::Requested),
peer_manager::ConnectionStatusNotification::LostPeer(
peer_id,
address,
DisconnectReason::Requested,
),
)
.await;
}
......@@ -126,7 +131,7 @@ async fn expect_disconnect_request(
async fn expect_dial_request(
peer_mgr_reqs_rx: &mut libra_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>,
peer_mgr_notifs_tx: &mut libra_channel::Sender<(PeerId, ProtocolId), PeerManagerNotification>,
control_notifs_tx: &mut conn_status_channel::Sender,
conn_mgr_reqs_tx: &mut channel::Sender<ConnectivityRequest>,
peer_id: PeerId,
address: Multiaddr,
......@@ -149,9 +154,9 @@ async fn expect_dial_request(
peer_id.short_str()
);
send_notification_await_delivery(
peer_mgr_notifs_tx,
control_notifs_tx,
peer_id,
PeerManagerNotification::NewPeer(peer_id, address),
peer_manager::ConnectionStatusNotification::NewPeer(peer_id, address),
)
.await;
}
......@@ -174,7 +179,7 @@ fn addr_change() {
let mut rt = Runtime::new().unwrap();
let seed_peer_id = PeerId::random();
info!("Seed peer_id is {}", seed_peer_id.short_str());
let (mut peer_mgr_reqs_rx, mut peer_mgr_notifs_tx, mut conn_mgr_reqs_tx, mut ticker_tx) =
let (mut peer_mgr_reqs_rx, mut control_notifs_tx, mut conn_mgr_reqs_tx, mut ticker_tx) =
setup_conn_mgr(&mut rt, seed_peer_id);
// Fake peer manager and discovery.
......@@ -199,7 +204,7 @@ fn addr_change() {
info!("Waiting to receive dial request");
expect_dial_request(
&mut peer_mgr_reqs_rx,
&mut peer_mgr_notifs_tx,
&mut control_notifs_tx,
&mut conn_mgr_reqs_tx,
seed_peer_id,
seed_address.clone(),
......@@ -242,9 +247,9 @@ fn addr_change() {
// We expect the peer which changed its address to also disconnect.
info!("Sending lost peer notification for seed peer at old address");
send_notification_await_delivery(
&mut peer_mgr_notifs_tx,
&mut control_notifs_tx,
seed_peer_id,
PeerManagerNotification::LostPeer(
peer_manager::ConnectionStatusNotification::LostPeer(
seed_peer_id,
seed_address,
DisconnectReason::ConnectionLost,
......@@ -260,7 +265,7 @@ fn addr_change() {
info!("Waiting to receive dial request to seed peer at new address");
expect_dial_request(
&mut peer_mgr_reqs_rx,
&mut peer_mgr_notifs_tx,
&mut control_notifs_tx,
&mut conn_mgr_reqs_tx,
seed_peer_id,
seed_address_new,
......@@ -277,7 +282,7 @@ fn lost_connection() {
let mut rt = Runtime::new().unwrap();
let seed_peer_id = PeerId::random();
info!("Seed peer_id is {}", seed_peer_id.short_str());
let (mut peer_mgr_reqs_rx, mut peer_mgr_notifs_tx, mut conn_mgr_reqs_tx, mut ticker_tx) =
let (mut peer_mgr_reqs_rx, mut control_notifs_tx, mut conn_mgr_reqs_tx, mut ticker_tx) =
setup_conn_mgr(&mut rt, seed_peer_id);
// Fake peer manager and discovery.
......@@ -302,7 +307,7 @@ fn lost_connection() {
info!("Waiting to receive dial request");
expect_dial_request(
&mut peer_mgr_reqs_rx,
&mut peer_mgr_notifs_tx,
&mut control_notifs_tx,
&mut conn_mgr_reqs_tx,
seed_peer_id,
seed_address.clone(),
......@@ -313,9 +318,9 @@ fn lost_connection() {
// Notify connectivity actor of loss of connection to seed_peer.
info!("Sending LostPeer event to signal connection loss");
send_notification_await_delivery(
&mut peer_mgr_notifs_tx,
&mut control_notifs_tx,
seed_peer_id,
PeerManagerNotification::LostPeer(
peer_manager::ConnectionStatusNotification::LostPeer(
seed_peer_id,
seed_address.clone(),
DisconnectReason::ConnectionLost,
......@@ -332,7 +337,7 @@ fn lost_connection() {
info!("Waiting to receive dial request");
expect_dial_request(
&mut peer_mgr_reqs_rx,
&mut peer_mgr_notifs_tx,
&mut control_notifs_tx,
&mut conn_mgr_reqs_tx,
seed_peer_id,
seed_address.clone(),
......@@ -349,7 +354,7 @@ fn disconnect() {
let mut rt = Runtime::new().unwrap();
let seed_peer_id = PeerId::random();
info!("Seed peer_id is {}", seed_peer_id.short_str());
let (mut peer_mgr_reqs_rx, mut peer_mgr_notifs_tx, mut conn_mgr_reqs_tx, mut ticker_tx) =
let (mut peer_mgr_reqs_rx, mut control_notifs_tx, mut conn_mgr_reqs_tx, mut ticker_tx) =
setup_conn_mgr(&mut rt, seed_peer_id);
let events_f = async move {
......@@ -373,7 +378,7 @@ fn disconnect() {
info!("Waiting to receive dial request");
expect_dial_request(
&mut peer_mgr_reqs_rx,
&mut peer_mgr_notifs_tx,
&mut control_notifs_tx,
&mut conn_mgr_reqs_tx,
seed_peer_id,
seed_address.clone(),
......@@ -396,7 +401,7 @@ fn disconnect() {
info!("Waiting to receive disconnect request");
expect_disconnect_request(
&mut peer_mgr_reqs_rx,
&mut peer_mgr_notifs_tx,
&mut control_notifs_tx,
seed_peer_id,
seed_address.clone(),
Ok(()),
......@@ -413,7 +418,7 @@ fn retry_on_failure() {
let mut rt = Runtime::new().unwrap();
let seed_peer_id = PeerId::random();
info!("Seed peer_id is {}", seed_peer_id.short_str());
let (mut peer_mgr_reqs_rx, mut peer_mgr_notifs_tx, mut conn_mgr_reqs_tx, mut ticker_tx) =
let (mut peer_mgr_reqs_rx, mut control_notifs_tx, mut conn_mgr_reqs_tx, mut ticker_tx) =
setup_conn_mgr(&mut rt, seed_peer_id);
let events_f = async move {
......@@ -437,7 +442,7 @@ fn retry_on_failure() {
info!("Waiting to receive dial request");
expect_dial_request(
&mut peer_mgr_reqs_rx,
&mut peer_mgr_notifs_tx,
&mut control_notifs_tx,
&mut conn_mgr_reqs_tx,
seed_peer_id,
seed_address.clone(),
......@@ -455,7 +460,7 @@ fn retry_on_failure() {
info!("Waiting to receive dial request");
expect_dial_request(
&mut peer_mgr_reqs_rx,
&mut peer_mgr_notifs_tx,
&mut control_notifs_tx,
&mut conn_mgr_reqs_tx,
seed_peer_id,
seed_address.clone(),
......@@ -478,7 +483,7 @@ fn retry_on_failure() {
info!("Waiting to receive disconnect request");
expect_disconnect_request(
&mut peer_mgr_reqs_rx,
&mut peer_mgr_notifs_tx,
&mut control_notifs_tx,
seed_peer_id,
seed_address.clone(),
Err(PeerManagerError::IoError(io::Error::from(
......@@ -496,7 +501,7 @@ fn retry_on_failure() {
info!("Waiting to receive disconnect request");
expect_disconnect_request(
&mut peer_mgr_reqs_rx,
&mut peer_mgr_notifs_tx,
&mut control_notifs_tx,
seed_peer_id,
seed_address.clone(),
Ok(()),
......@@ -514,7 +519,7 @@ fn no_op_requests() {
let mut rt = Runtime::new().unwrap();
let seed_peer_id = PeerId::random();
info!("Seed peer_id is {}", seed_peer_id.short_str());
let (mut peer_mgr_reqs_rx, mut peer_mgr_notifs_tx, mut conn_mgr_reqs_tx, mut ticker_tx) =
let (mut peer_mgr_reqs_rx, mut control_notifs_tx, mut conn_mgr_reqs_tx, mut ticker_tx) =
setup_conn_mgr(&mut rt, seed_peer_id);