peer_test.go 5.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
package whisper

import (
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/p2p"
)

// testPeer bundles the pieces a test needs to drive a whisper client through
// a simulated peer connection.
type testPeer struct {
	// client is the whisper instance whose peer handling is under test.
	client *Whisper
	// stream is the tester's end of the in-memory message pipe; whatever the
	// test writes here is read by the client's peer handler and vice versa.
	stream *p2p.MsgPipeRW
	// termed is closed once the goroutine running the peer handler returns.
	termed chan struct{}
}

// startTestPeer starts a fresh whisper client and runs its peer handler
// against one end of an in-memory message pipe. The returned testPeer exposes
// the client, the opposite end of the pipe, and a channel that is closed as
// soon as the peer handler returns.
func startTestPeer() *testPeer {
	// Simulated remote endpoint plus the two ends of the message pipe
	peer := p2p.NewPeer(randomNodeID(), randomNodeName(), whisperCaps())
	near, far := p2p.MsgPipe()

	// Boot the whisper client that will serve the simulated peer
	node := New()
	node.Start()

	// Put the harness together up front so the goroutine can share its channel
	harness := &testPeer{
		client: node,
		stream: near,
		termed: make(chan struct{}),
	}
	go func() {
		// Deferred calls run in reverse order once the handler returns:
		// close the client's pipe end, signal termination, stop the client.
		defer node.Stop()
		defer close(harness.termed)
		defer far.Close()

		node.handlePeer(peer, far)
	}()
	return harness
}

// TestPeerStatusMessage verifies that the client opens the connection with a
// status message carrying the protocol version, and that closing the tester's
// end of the stream makes the peer handler return within a second.
func TestPeerStatusMessage(t *testing.T) {
	peer := startTestPeer()

	// The first thing on the wire must be the handshake status
	err := p2p.ExpectMsg(peer.stream, statusCode, []uint64{protocolVersion})
	if err != nil {
		t.Fatalf("status message mismatch: %v", err)
	}
	// Drop the connection from the tester side
	peer.stream.Close()

	// The peer handler is expected to notice the close and return promptly
	deadline := time.After(time.Second)
	select {
	case <-deadline:
		t.Fatalf("local close timed out")
	case <-peer.termed:
	}
}

// TestPeerHandshakeFail verifies that answering the client's status message
// with something other than a valid status makes the peer handler return
// within a second.
func TestPeerHandshakeFail(t *testing.T) {
	peer := startTestPeer()

	// Consume the client's own handshake status first
	err := p2p.ExpectMsg(peer.stream, statusCode, []uint64{protocolVersion})
	if err != nil {
		t.Fatalf("status message mismatch: %v", err)
	}
	// Reply with a messages packet where a status packet is expected
	err = p2p.SendItems(peer.stream, messagesCode)
	if err != nil {
		t.Fatalf("failed to send malformed status: %v", err)
	}
	// The bad handshake should cause the remote side to hang up
	deadline := time.After(time.Second)
	select {
	case <-deadline:
		t.Fatalf("remote close timed out")
	case <-peer.termed:
	}
}

// TestPeerHandshakeSuccess verifies that a correct status reply keeps the
// connection alive (no termination within 100ms), and that a subsequent close
// of the tester's stream makes the peer handler return within a second.
func TestPeerHandshakeSuccess(t *testing.T) {
	peer := startTestPeer()

	// Consume the client's own handshake status first
	err := p2p.ExpectMsg(peer.stream, statusCode, []uint64{protocolVersion})
	if err != nil {
		t.Fatalf("status message mismatch: %v", err)
	}
	// Answer with a well-formed status of our own
	err = p2p.SendItems(peer.stream, statusCode, protocolVersion)
	if err != nil {
		t.Fatalf("failed to send status: %v", err)
	}
	// Give the client a short window in which it must not disconnect
	grace := time.After(100 * time.Millisecond)
	select {
	case <-grace:
	case <-peer.termed:
		t.Fatalf("valid handshake disconnected")
	}
	// Tear the connection down and wait for the handler to finish
	peer.stream.Close()

	deadline := time.After(time.Second)
	select {
	case <-deadline:
		t.Fatalf("local close timed out")
	case <-peer.termed:
	}
}

// TestPeerSend verifies that an envelope injected into the client is forwarded
// to the connected peer, and that injecting the very same envelope again
// results in an empty batch on the wire.
func TestPeerSend(t *testing.T) {
	// Bring up the peer and complete the status handshake in both directions
	peer := startTestPeer()
	defer peer.stream.Close()

	err := p2p.ExpectMsg(peer.stream, statusCode, []uint64{protocolVersion})
	if err != nil {
		t.Fatalf("status message mismatch: %v", err)
	}
	err = p2p.SendItems(peer.stream, statusCode, protocolVersion)
	if err != nil {
		t.Fatalf("failed to send status: %v", err)
	}
	// Wrap a test payload into an envelope
	opts := Options{TTL: DefaultTTL}
	envelope, err := NewMessage([]byte("peer broadcast test message")).Wrap(DefaultPoW, opts)
	if err != nil {
		t.Fatalf("failed to wrap message: %v", err)
	}
	// Inject the envelope twice; each round lists the batch expected on the wire
	rounds := [][]interface{}{
		{envelope}, // first insert: the envelope itself is forwarded
		{},         // re-insert: only an empty batch is received
	}
	for _, want := range rounds {
		if err := peer.client.Send(envelope); err != nil {
			t.Fatalf("failed to send message: %v", err)
		}
		if err := p2p.ExpectMsg(peer.stream, messagesCode, want); err != nil {
			t.Fatalf("message mismatch: %v", err)
		}
	}
}

// TestPeerDeliver verifies that an envelope sent by the remote peer reaches a
// watcher installed on the client, and that sending the same envelope a second
// time does not trigger the watcher again.
func TestPeerDeliver(t *testing.T) {
	// Bring up the peer and complete the status handshake in both directions
	peer := startTestPeer()
	defer peer.stream.Close()

	err := p2p.ExpectMsg(peer.stream, statusCode, []uint64{protocolVersion})
	if err != nil {
		t.Fatalf("status message mismatch: %v", err)
	}
	err = p2p.SendItems(peer.stream, statusCode, protocolVersion)
	if err != nil {
		t.Fatalf("failed to send status: %v", err)
	}
	// Install a catch-all watcher that signals every delivered message
	arrived := make(chan struct{}, 1)
	notify := func(*Message) {
		arrived <- struct{}{}
	}
	peer.client.Watch(Filter{Fn: notify})

	// Wrap a test payload into an envelope
	opts := Options{TTL: DefaultTTL}
	envelope, err := NewMessage([]byte("peer broadcast test message")).Wrap(DefaultPoW, opts)
	if err != nil {
		t.Fatalf("failed to wrap message: %v", err)
	}
	// transfer pushes the envelope over the wire as a single-element batch
	transfer := func() {
		if err := p2p.Send(peer.stream, messagesCode, []*Envelope{envelope}); err != nil {
			t.Fatalf("failed to transfer message: %v", err)
		}
	}
	// First transfer: the watcher must fire within a second
	transfer()

	select {
	case <-time.After(time.Second):
		t.Fatalf("message delivery timeout")
	case <-arrived:
	}
	// Second transfer of the same envelope: the watcher must stay silent
	transfer()

	select {
	case <-arrived:
		t.Fatalf("repeating message arrived")
	case <-time.After(2 * transmissionTicks):
	}
}