peer.go 9.3 KB
Newer Older
Z
zelig 已提交
1 2 3
package p2p

import (
F
Felix Lange 已提交
4
	"errors"
Z
zelig 已提交
5
	"fmt"
6 7
	"io"
	"io/ioutil"
Z
zelig 已提交
8
	"net"
9 10 11 12 13
	"sort"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/logger"
F
Felix Lange 已提交
14 15
	"github.com/ethereum/go-ethereum/p2p/discover"
	"github.com/ethereum/go-ethereum/rlp"
Z
zelig 已提交
16 17
)

F
Felix Lange 已提交
18 19 20 21
const (
	baseProtocolVersion    = 2
	baseProtocolLength     = uint64(16)
	baseProtocolMaxMsgSize = 10 * 1024 * 1024
F
Felix Lange 已提交
22 23

	disconnectGracePeriod = 2 * time.Second
F
Felix Lange 已提交
24
)
25

F
Felix Lange 已提交
26 27 28 29 30 31 32 33 34
const (
	// devp2p message codes
	handshakeMsg = 0x00
	discMsg      = 0x01
	pingMsg      = 0x02
	pongMsg      = 0x03
	getPeersMsg  = 0x04
	peersMsg     = 0x05
)
35

F
Felix Lange 已提交
36 37 38 39 40 41 42
// handshake is the RLP structure of the protocol handshake.
type handshake struct {
	Version    uint64
	Name       string
	Caps       []Cap
	ListenPort uint64
	NodeID     discover.NodeID
43 44
}

F
Felix Lange 已提交
45
// Peer represents a connected remote node.
Z
zelig 已提交
46
type Peer struct {
47 48 49 50
	// Peers have all the log methods.
	// Use them to display messages related to the peer.
	*logger.Logger

F
Felix Lange 已提交
51 52 53
	infoMu sync.Mutex
	name   string
	caps   []Cap
54

F
Felix Lange 已提交
55 56 57 58
	ourID, remoteID *discover.NodeID
	ourName         string

	rw *frameRW
59 60

	// These fields maintain the running protocols.
F
Felix Lange 已提交
61 62 63
	protocols []Protocol
	runlock   sync.RWMutex // protects running
	running   map[string]*proto
64

F
Felix Lange 已提交
65 66
	// disables protocol handshake, for testing
	noHandshake bool
67 68 69 70 71 72 73 74

	protoWG  sync.WaitGroup
	protoErr chan error
	closed   chan struct{}
	disc     chan DiscReason
}

// NewPeer returns a peer for testing purposes.
F
Felix Lange 已提交
75
func NewPeer(id discover.NodeID, name string, caps []Cap) *Peer {
76
	conn, _ := net.Pipe()
F
Felix Lange 已提交
77 78 79
	peer := newPeer(conn, nil, "", nil, &id)
	peer.setHandshakeInfo(name, caps)
	close(peer.closed) // ensures Disconnect doesn't block
Z
zelig 已提交
80 81 82
	return peer
}

F
Felix Lange 已提交
83 84 85
// ID returns the node's public key.
func (p *Peer) ID() discover.NodeID {
	return *p.remoteID
86 87
}

F
Felix Lange 已提交
88 89 90 91 92 93 94 95
// Name returns the node name that the remote node advertised.
func (p *Peer) Name() string {
	// this needs a lock because the information is part of the
	// protocol handshake.
	p.infoMu.Lock()
	name := p.name
	p.infoMu.Unlock()
	return name
96 97
}

98 99
// Caps returns the capabilities (supported subprotocols) of the remote peer.
func (p *Peer) Caps() []Cap {
F
Felix Lange 已提交
100 101 102 103 104 105
	// this needs a lock because the information is part of the
	// protocol handshake.
	p.infoMu.Lock()
	caps := p.caps
	p.infoMu.Unlock()
	return caps
106 107 108 109
}

// RemoteAddr returns the remote address of the network connection.
func (p *Peer) RemoteAddr() net.Addr {
F
Felix Lange 已提交
110
	return p.rw.RemoteAddr()
111 112 113 114
}

// LocalAddr returns the local address of the network connection.
func (p *Peer) LocalAddr() net.Addr {
F
Felix Lange 已提交
115
	return p.rw.LocalAddr()
116 117 118 119 120 121 122 123 124 125 126 127 128
}

// Disconnect terminates the peer connection with the given reason.
// It returns immediately and does not wait until the connection is closed.
func (p *Peer) Disconnect(reason DiscReason) {
	select {
	case p.disc <- reason:
	case <-p.closed:
	}
}

// String implements fmt.Stringer.
func (p *Peer) String() string {
F
Felix Lange 已提交
129
	return fmt.Sprintf("Peer %.8x %v", p.remoteID[:], p.RemoteAddr())
F
Felix Lange 已提交
130 131 132
}

func newPeer(conn net.Conn, protocols []Protocol, ourName string, ourID, remoteID *discover.NodeID) *Peer {
F
Felix Lange 已提交
133
	logtag := fmt.Sprintf("Peer %.8x %v", remoteID[:], conn.RemoteAddr())
F
Felix Lange 已提交
134 135 136 137 138 139 140 141 142 143 144
	return &Peer{
		Logger:    logger.NewLogger(logtag),
		rw:        newFrameRW(conn, msgWriteTimeout),
		ourID:     ourID,
		ourName:   ourName,
		remoteID:  remoteID,
		protocols: protocols,
		running:   make(map[string]*proto),
		disc:      make(chan DiscReason),
		protoErr:  make(chan error),
		closed:    make(chan struct{}),
Z
zelig 已提交
145
	}
146 147
}

F
Felix Lange 已提交
148 149 150 151 152 153
func (p *Peer) setHandshakeInfo(name string, caps []Cap) {
	p.infoMu.Lock()
	p.name = name
	p.caps = caps
	p.infoMu.Unlock()
}
154

F
Felix Lange 已提交
155 156
func (p *Peer) run() DiscReason {
	var readErr = make(chan error, 1)
157 158
	defer p.closeProtocols()
	defer close(p.closed)
F
Felix Lange 已提交
159 160

	go func() { readErr <- p.readLoop() }()
161

F
Felix Lange 已提交
162
	if !p.noHandshake {
F
Felix Lange 已提交
163 164
		if err := writeProtocolHandshake(p.rw, p.ourName, *p.ourID, p.protocols); err != nil {
			p.DebugDetailf("Protocol handshake error: %v\n", err)
F
Felix Lange 已提交
165
			p.rw.Close()
F
Felix Lange 已提交
166
			return DiscProtocolError
167 168 169
		}
	}

F
Felix Lange 已提交
170
	// Wait for an error or disconnect.
F
Felix Lange 已提交
171 172 173 174 175 176
	var reason DiscReason
	select {
	case err := <-readErr:
		// We rely on protocols to abort if there is a write error. It
		// might be more robust to handle them here as well.
		p.DebugDetailf("Read error: %v\n", err)
F
Felix Lange 已提交
177 178 179
		p.rw.Close()
		return DiscNetworkError

F
Felix Lange 已提交
180 181 182
	case err := <-p.protoErr:
		reason = discReasonForError(err)
	case reason = <-p.disc:
183
	}
F
Felix Lange 已提交
184 185 186 187
	p.politeDisconnect(reason)

	// Wait for readLoop. It will end because conn is now closed.
	<-readErr
F
Felix Lange 已提交
188 189 190
	p.Debugf("Disconnected: %v\n", reason)
	return reason
}
191

F
Felix Lange 已提交
192
func (p *Peer) politeDisconnect(reason DiscReason) {
193 194
	done := make(chan struct{})
	go func() {
F
Felix Lange 已提交
195
		EncodeMsg(p.rw, discMsg, uint(reason))
F
Felix Lange 已提交
196 197
		// Wait for the other side to close the connection.
		// Discard any data that they send until then.
F
Felix Lange 已提交
198
		io.Copy(ioutil.Discard, p.rw)
199 200 201 202 203 204
		close(done)
	}()
	select {
	case <-done:
	case <-time.After(disconnectGracePeriod):
	}
F
Felix Lange 已提交
205
	p.rw.Close()
206 207
}

F
Felix Lange 已提交
208
func (p *Peer) readLoop() error {
F
Felix Lange 已提交
209
	if !p.noHandshake {
F
Felix Lange 已提交
210 211
		if err := readProtocolHandshake(p, p.rw); err != nil {
			return err
212 213
		}
	}
F
Felix Lange 已提交
214 215 216 217 218 219 220 221 222 223
	for {
		msg, err := p.rw.ReadMsg()
		if err != nil {
			return err
		}
		if err = p.handle(msg); err != nil {
			return err
		}
	}
	return nil
224 225
}

F
Felix Lange 已提交
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243
func (p *Peer) handle(msg Msg) error {
	switch {
	case msg.Code == pingMsg:
		msg.Discard()
		go EncodeMsg(p.rw, pongMsg)
	case msg.Code == discMsg:
		var reason DiscReason
		// no need to discard or for error checking, we'll close the
		// connection after this.
		rlp.Decode(msg.Payload, &reason)
		p.Disconnect(DiscRequested)
		return discRequestedError(reason)
	case msg.Code < baseProtocolLength:
		// ignore other base protocol messages
		return msg.Discard()
	default:
		// it's a subprotocol message
		proto, err := p.getProto(msg.Code)
244
		if err != nil {
F
Felix Lange 已提交
245
			return fmt.Errorf("msg code out of range: %v", msg.Code)
246 247 248
		}
		proto.in <- msg
	}
F
Felix Lange 已提交
249
	return nil
250 251
}

F
Felix Lange 已提交
252 253 254 255 256
func readProtocolHandshake(p *Peer, rw MsgReadWriter) error {
	// read and handle remote handshake
	msg, err := rw.ReadMsg()
	if err != nil {
		return err
257
	}
F
Felix Lange 已提交
258 259
	if msg.Code != handshakeMsg {
		return newPeerError(errProtocolBreach, "expected handshake, got %x", msg.Code)
260
	}
F
Felix Lange 已提交
261
	if msg.Size > baseProtocolMaxMsgSize {
F
Felix Lange 已提交
262
		return newPeerError(errInvalidMsg, "message too big")
263
	}
F
Felix Lange 已提交
264 265 266
	var hs handshake
	if err := msg.Decode(&hs); err != nil {
		return err
267
	}
F
Felix Lange 已提交
268 269 270 271
	// validate handshake info
	if hs.Version != baseProtocolVersion {
		return newPeerError(errP2PVersionMismatch, "required version %d, received %d\n",
			baseProtocolVersion, hs.Version)
272
	}
F
Felix Lange 已提交
273 274
	if hs.NodeID == *p.remoteID {
		return newPeerError(errPubkeyForbidden, "node ID mismatch")
275
	}
F
Felix Lange 已提交
276 277 278 279
	// TODO: remove Caps with empty name
	p.setHandshakeInfo(hs.Name, hs.Caps)
	p.startSubprotocols(hs.Caps)
	return nil
280 281
}

F
Felix Lange 已提交
282 283 284 285 286 287
func writeProtocolHandshake(w MsgWriter, name string, id discover.NodeID, ps []Protocol) error {
	var caps []interface{}
	for _, proto := range ps {
		caps = append(caps, proto.cap())
	}
	return EncodeMsg(w, handshakeMsg, baseProtocolVersion, name, caps, 0, id)
288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
}

// startProtocols starts matching named subprotocols.
func (p *Peer) startSubprotocols(caps []Cap) {
	sort.Sort(capsByName(caps))
	p.runlock.Lock()
	defer p.runlock.Unlock()
	offset := baseProtocolLength
outer:
	for _, cap := range caps {
		for _, proto := range p.protocols {
			if proto.Name == cap.Name &&
				proto.Version == cap.Version &&
				p.running[cap.Name] == nil {
				p.running[cap.Name] = p.startProto(offset, proto)
				offset += proto.Length
				continue outer
			}
		}
	}
}

func (p *Peer) startProto(offset uint64, impl Protocol) *proto {
F
Felix Lange 已提交
311
	p.DebugDetailf("Starting protocol %s/%d\n", impl.Name, impl.Version)
312
	rw := &proto{
F
Felix Lange 已提交
313
		name:    impl.Name,
314 315 316
		in:      make(chan Msg),
		offset:  offset,
		maxcode: impl.Length,
F
Felix Lange 已提交
317
		w:       p.rw,
318 319 320 321 322
	}
	p.protoWG.Add(1)
	go func() {
		err := impl.Run(p, rw)
		if err == nil {
F
Felix Lange 已提交
323
			p.DebugDetailf("Protocol %s/%d returned\n", impl.Name, impl.Version)
F
Felix Lange 已提交
324
			err = errors.New("protocol returned")
325
		} else {
F
Felix Lange 已提交
326
			p.DebugDetailf("Protocol %s/%d error: %v\n", impl.Name, impl.Version, err)
327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359
		}
		select {
		case p.protoErr <- err:
		case <-p.closed:
		}
		p.protoWG.Done()
	}()
	return rw
}

// getProto finds the protocol responsible for handling
// the given message code.
func (p *Peer) getProto(code uint64) (*proto, error) {
	p.runlock.RLock()
	defer p.runlock.RUnlock()
	for _, proto := range p.running {
		if code >= proto.offset && code < proto.offset+proto.maxcode {
			return proto, nil
		}
	}
	return nil, newPeerError(errInvalidMsgCode, "%d", code)
}

func (p *Peer) closeProtocols() {
	p.runlock.RLock()
	for _, p := range p.running {
		close(p.in)
	}
	p.runlock.RUnlock()
	p.protoWG.Wait()
}

// writeProtoMsg sends the given message on behalf of the given named protocol.
F
Felix Lange 已提交
360
// this exists because of Server.Broadcast.
361 362 363 364 365 366 367 368 369 370 371
func (p *Peer) writeProtoMsg(protoName string, msg Msg) error {
	p.runlock.RLock()
	proto, ok := p.running[protoName]
	p.runlock.RUnlock()
	if !ok {
		return fmt.Errorf("protocol %s not handled by peer", protoName)
	}
	if msg.Code >= proto.maxcode {
		return newPeerError(errInvalidMsgCode, "code %x is out of range for protocol %q", msg.Code, protoName)
	}
	msg.Code += proto.offset
F
Felix Lange 已提交
372
	return p.rw.WriteMsg(msg)
373 374 375 376 377 378
}

type proto struct {
	name            string
	in              chan Msg
	maxcode, offset uint64
F
Felix Lange 已提交
379
	w               MsgWriter
380 381 382 383 384 385 386
}

func (rw *proto) WriteMsg(msg Msg) error {
	if msg.Code >= rw.maxcode {
		return newPeerError(errInvalidMsgCode, "not handled")
	}
	msg.Code += rw.offset
F
Felix Lange 已提交
387
	return rw.w.WriteMsg(msg)
Z
zelig 已提交
388 389
}

390 391 392 393 394 395 396
func (rw *proto) ReadMsg() (Msg, error) {
	msg, ok := <-rw.in
	if !ok {
		return msg, io.EOF
	}
	msg.Code -= rw.offset
	return msg, nil
Z
zelig 已提交
397
}