// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package whisperv2
18 19 20 21 22 23

import (
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/p2p"
24
	"github.com/ethereum/go-ethereum/p2p/discover"
25 26 27 28 29 30 31 32 33 34
)

// testPeer bundles the pieces a test needs to drive a whisper client through
// a simulated peer connection.
type testPeer struct {
	client *Whisper       // whisper client under test
	stream *p2p.MsgPipeRW // tester's end of the message pipe connected to the client
	termed chan struct{}  // closed once the client's peer handler goroutine has returned
}

func startTestPeer() *testPeer {
	// Create a simulated P2P remote peer and data streams to it
35
	remote := p2p.NewPeer(discover.NodeID{}, "", nil)
36 37 38 39
	tester, tested := p2p.MsgPipe()

	// Create a whisper client and connect with it to the tester peer
	client := New()
40
	client.Start(nil)
41 42 43 44 45 46 47 48 49

	termed := make(chan struct{})
	go func() {
		defer client.Stop()
		defer close(termed)
		defer tested.Close()

		client.handlePeer(remote, tested)
	}()
50

51 52 53 54 55 56 57
	return &testPeer{
		client: client,
		stream: tester,
		termed: termed,
	}
}

58 59 60 61 62 63 64 65 66 67 68 69 70 71
// startTestPeerInited starts a test peer and completes the status handshake
// from the tester side: it first expects the client's status message and then
// answers with a matching one. If either step fails, the tester stream is
// closed and the error is returned together with a nil peer.
func startTestPeerInited() (*testPeer, error) {
	tp := startTestPeer()

	// Run the two handshake steps in order, stopping at the first failure
	err := p2p.ExpectMsg(tp.stream, statusCode, []uint64{protocolVersion})
	if err == nil {
		err = p2p.SendItems(tp.stream, statusCode, protocolVersion)
	}
	if err != nil {
		tp.stream.Close()
		return nil, err
	}
	return tp, nil
}

72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
// TestPeerStatusMessage verifies that a freshly connected client opens the
// conversation with a status message carrying the protocol version, and that
// closing the tester side of the stream makes the peer handler return within
// one second.
func TestPeerStatusMessage(t *testing.T) {
	tp := startTestPeer()

	// The first packet on the wire must be the handshake status
	want := []uint64{protocolVersion}
	err := p2p.ExpectMsg(tp.stream, statusCode, want)
	if err != nil {
		t.Fatalf("status message mismatch: %v", err)
	}
	// Drop the connection from the tester side and await handler shutdown
	tp.stream.Close()

	deadline := time.NewTimer(time.Second)
	defer deadline.Stop()

	select {
	case <-deadline.C:
		t.Fatalf("local close timed out")
	case <-tp.termed:
	}
}

// TestPeerHandshakeFail verifies that answering the client's status message
// with something other than a status (here an item-less messagesCode packet)
// makes the peer handler return within one second.
func TestPeerHandshakeFail(t *testing.T) {
	tp := startTestPeer()

	// Consume the client's own status message before answering
	want := []uint64{protocolVersion}
	err := p2p.ExpectMsg(tp.stream, statusCode, want)
	if err != nil {
		t.Fatalf("status message mismatch: %v", err)
	}
	// Answer with a packet that is not a valid status and await the disconnect
	err = p2p.SendItems(tp.stream, messagesCode)
	if err != nil {
		t.Fatalf("failed to send malformed status: %v", err)
	}
	deadline := time.NewTimer(time.Second)
	defer deadline.Stop()

	select {
	case <-deadline.C:
		t.Fatalf("remote close timed out")
	case <-tp.termed:
	}
}

// TestPeerHandshakeSuccess verifies that after a valid status exchange the
// peer handler keeps running (no termination within 100ms), and that it
// returns within one second once the tester closes its end of the stream.
func TestPeerHandshakeSuccess(t *testing.T) {
	tp := startTestPeer()

	// Consume the client's own status message before answering
	want := []uint64{protocolVersion}
	err := p2p.ExpectMsg(tp.stream, statusCode, want)
	if err != nil {
		t.Fatalf("status message mismatch: %v", err)
	}
	// Answer with a matching status; the connection must survive it
	err = p2p.SendItems(tp.stream, statusCode, protocolVersion)
	if err != nil {
		t.Fatalf("failed to send status: %v", err)
	}
	grace := time.NewTimer(100 * time.Millisecond)
	defer grace.Stop()

	select {
	case <-grace.C:
		// Still connected after the grace period, as expected
	case <-tp.termed:
		t.Fatalf("valid handshake disconnected")
	}
	// Tear the connection down from the tester side and await handler shutdown
	tp.stream.Close()

	deadline := time.NewTimer(time.Second)
	defer deadline.Stop()

	select {
	case <-deadline.C:
		t.Fatalf("local close timed out")
	case <-tp.termed:
	}
}

func TestPeerSend(t *testing.T) {
	// Start a tester and execute the handshake
136 137 138 139
	tester, err := startTestPeerInited()
	if err != nil {
		t.Fatalf("failed to start initialized peer: %v", err)
	}
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
	defer tester.stream.Close()

	// Construct a message and inject into the tester
	message := NewMessage([]byte("peer broadcast test message"))
	envelope, err := message.Wrap(DefaultPoW, Options{
		TTL: DefaultTTL,
	})
	if err != nil {
		t.Fatalf("failed to wrap message: %v", err)
	}
	if err := tester.client.Send(envelope); err != nil {
		t.Fatalf("failed to send message: %v", err)
	}
	// Check that the message is eventually forwarded
	payload := []interface{}{envelope}
	if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
		t.Fatalf("message mismatch: %v", err)
	}
	// Make sure that even with a re-insert, an empty batch is received
	if err := tester.client.Send(envelope); err != nil {
		t.Fatalf("failed to send message: %v", err)
	}
	if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil {
		t.Fatalf("message mismatch: %v", err)
	}
}

func TestPeerDeliver(t *testing.T) {
	// Start a tester and execute the handshake
169 170 171 172
	tester, err := startTestPeerInited()
	if err != nil {
		t.Fatalf("failed to start initialized peer: %v", err)
	}
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
	defer tester.stream.Close()

	// Watch for all inbound messages
	arrived := make(chan struct{}, 1)
	tester.client.Watch(Filter{
		Fn: func(message *Message) {
			arrived <- struct{}{}
		},
	})
	// Construct a message and deliver it to the tester peer
	message := NewMessage([]byte("peer broadcast test message"))
	envelope, err := message.Wrap(DefaultPoW, Options{
		TTL: DefaultTTL,
	})
	if err != nil {
		t.Fatalf("failed to wrap message: %v", err)
	}
	if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil {
		t.Fatalf("failed to transfer message: %v", err)
	}
	// Check that the message is delivered upstream
	select {
	case <-arrived:
	case <-time.After(time.Second):
		t.Fatalf("message delivery timeout")
	}
	// Check that a resend is not delivered
	if err := p2p.Send(tester.stream, messagesCode, []*Envelope{envelope}); err != nil {
		t.Fatalf("failed to transfer message: %v", err)
	}
	select {
204
	case <-time.After(2 * transmissionCycle):
205 206 207 208
	case <-arrived:
		t.Fatalf("repeating message arrived")
	}
}
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241

func TestPeerMessageExpiration(t *testing.T) {
	// Start a tester and execute the handshake
	tester, err := startTestPeerInited()
	if err != nil {
		t.Fatalf("failed to start initialized peer: %v", err)
	}
	defer tester.stream.Close()

	// Fetch the peer instance for later inspection
	tester.client.peerMu.RLock()
	if peers := len(tester.client.peers); peers != 1 {
		t.Fatalf("peer pool size mismatch: have %v, want %v", peers, 1)
	}
	var peer *peer
	for peer, _ = range tester.client.peers {
		break
	}
	tester.client.peerMu.RUnlock()

	// Construct a message and pass it through the tester
	message := NewMessage([]byte("peer test message"))
	envelope, err := message.Wrap(DefaultPoW, Options{
		TTL: time.Second,
	})
	if err != nil {
		t.Fatalf("failed to wrap message: %v", err)
	}
	if err := tester.client.Send(envelope); err != nil {
		t.Fatalf("failed to send message: %v", err)
	}
	payload := []interface{}{envelope}
	if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
J
Jeffrey Wilcke 已提交
242 243 244 245
		// A premature empty message may have been broadcast, check the next too
		if err := p2p.ExpectMsg(tester.stream, messagesCode, payload); err != nil {
			t.Fatalf("message mismatch: %v", err)
		}
246 247 248 249 250 251
	}
	// Check that the message is inside the cache
	if !peer.known.Has(envelope.Hash()) {
		t.Fatalf("message not found in cache")
	}
	// Discard messages until expiration and check cache again
J
Jeffrey Wilcke 已提交
252
	exp := time.Now().Add(time.Second + 2*expirationCycle + 100*time.Millisecond)
253 254 255 256 257 258 259 260 261
	for time.Now().Before(exp) {
		if err := p2p.ExpectMsg(tester.stream, messagesCode, []interface{}{}); err != nil {
			t.Fatalf("message mismatch: %v", err)
		}
	}
	if peer.known.Has(envelope.Hash()) {
		t.Fatalf("message not expired from cache")
	}
}