package p2p

import (
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"sort"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/logger"
	"github.com/ethereum/go-ethereum/p2p/discover"
	"github.com/ethereum/go-ethereum/rlp"
)

const (
	// baseProtocolVersion is the base protocol version that we announce in
	// our handshake and require in the remote handshake.
	baseProtocolVersion = 3
	// baseProtocolLength is the number of message codes reserved for the
	// base protocol. Subprotocol message codes start at this offset.
	baseProtocolLength = uint64(16)
	// baseProtocolMaxMsgSize is the largest handshake message, in bytes,
	// that readProtocolHandshake accepts.
	baseProtocolMaxMsgSize = 10 * 1024 * 1024

	// disconnectGracePeriod is how long politeDisconnect waits for the
	// remote end to close the connection before closing it locally.
	disconnectGracePeriod = 2 * time.Second
)
// devp2p base protocol message codes. The values are consecutive and
// start at zero; all of them are below baseProtocolLength.
const (
	handshakeMsg = iota // 0x00
	discMsg             // 0x01
	pingMsg             // 0x02
	pongMsg             // 0x03
	getPeersMsg         // 0x04
	peersMsg            // 0x05
)
// handshake is the RLP structure of the protocol handshake.
type handshake struct {
	Version    uint64
	Name       string
	Caps       []Cap
	ListenPort uint64
	NodeID     discover.NodeID
43 44
}

// Peer represents a connected remote node.
Z
zelig 已提交
46
type Peer struct {
47 48 49 50
	// Peers have all the log methods.
	// Use them to display messages related to the peer.
	*logger.Logger

F
Felix Lange 已提交
51 52 53
	infoMu sync.Mutex
	name   string
	caps   []Cap
54

F
Felix Lange 已提交
55 56 57 58
	ourID, remoteID *discover.NodeID
	ourName         string

	rw *frameRW
59 60

	// These fields maintain the running protocols.
F
Felix Lange 已提交
61 62 63
	protocols []Protocol
	runlock   sync.RWMutex // protects running
	running   map[string]*proto
64

F
Felix Lange 已提交
65 66
	// disables protocol handshake, for testing
	noHandshake bool
67 68 69 70 71 72 73 74

	protoWG  sync.WaitGroup
	protoErr chan error
	closed   chan struct{}
	disc     chan DiscReason
}

// NewPeer returns a peer for testing purposes.
F
Felix Lange 已提交
75
func NewPeer(id discover.NodeID, name string, caps []Cap) *Peer {
76
	conn, _ := net.Pipe()
F
Felix Lange 已提交
77 78 79
	peer := newPeer(conn, nil, "", nil, &id)
	peer.setHandshakeInfo(name, caps)
	close(peer.closed) // ensures Disconnect doesn't block
Z
zelig 已提交
80 81 82
	return peer
}

// ID returns the node's public key.
func (p *Peer) ID() discover.NodeID {
	return *p.remoteID
86 87
}

// Name returns the node name that the remote node advertised.
func (p *Peer) Name() string {
	// this needs a lock because the information is part of the
	// protocol handshake.
	p.infoMu.Lock()
	name := p.name
	p.infoMu.Unlock()
	return name
96 97
}

// Caps returns the capabilities (supported subprotocols) of the remote peer.
func (p *Peer) Caps() []Cap {
F
Felix Lange 已提交
100 101 102 103 104 105
	// this needs a lock because the information is part of the
	// protocol handshake.
	p.infoMu.Lock()
	caps := p.caps
	p.infoMu.Unlock()
	return caps
106 107 108 109
}

// RemoteAddr returns the remote address of the network connection.
func (p *Peer) RemoteAddr() net.Addr {
F
Felix Lange 已提交
110
	return p.rw.RemoteAddr()
111 112 113 114
}

// LocalAddr returns the local address of the network connection.
func (p *Peer) LocalAddr() net.Addr {
F
Felix Lange 已提交
115
	return p.rw.LocalAddr()
116 117 118 119 120 121 122 123 124 125 126 127 128
}

// Disconnect asks the peer to terminate the connection with the given
// reason. It does not wait until the connection is closed: the call
// returns as soon as the request has been handed to the peer's run loop,
// or right away if the peer has already shut down.
func (p *Peer) Disconnect(reason DiscReason) {
	select {
	case <-p.closed:
		// The peer is gone, nothing left to disconnect.
	case p.disc <- reason:
		// The run loop took the request and performs the disconnect.
	}
}

// String implements fmt.Stringer.
func (p *Peer) String() string {
F
Felix Lange 已提交
129
	return fmt.Sprintf("Peer %.8x %v", p.remoteID[:], p.RemoteAddr())
F
Felix Lange 已提交
130 131 132
}

func newPeer(conn net.Conn, protocols []Protocol, ourName string, ourID, remoteID *discover.NodeID) *Peer {
F
Felix Lange 已提交
133
	logtag := fmt.Sprintf("Peer %.8x %v", remoteID[:], conn.RemoteAddr())
F
Felix Lange 已提交
134 135 136 137 138 139 140 141 142 143 144
	return &Peer{
		Logger:    logger.NewLogger(logtag),
		rw:        newFrameRW(conn, msgWriteTimeout),
		ourID:     ourID,
		ourName:   ourName,
		remoteID:  remoteID,
		protocols: protocols,
		running:   make(map[string]*proto),
		disc:      make(chan DiscReason),
		protoErr:  make(chan error),
		closed:    make(chan struct{}),
Z
zelig 已提交
145
	}
146 147
}

// setHandshakeInfo stores the name and capabilities advertised by the
// remote node. The values are written under infoMu, the lock that Name
// and Caps take when reading them.
func (p *Peer) setHandshakeInfo(name string, caps []Cap) {
	p.infoMu.Lock()
	defer p.infoMu.Unlock()
	p.name, p.caps = name, caps
}
func (p *Peer) run() DiscReason {
	var readErr = make(chan error, 1)
157 158
	defer p.closeProtocols()
	defer close(p.closed)
F
Felix Lange 已提交
159 160

	go func() { readErr <- p.readLoop() }()
161

F
Felix Lange 已提交
162
	if !p.noHandshake {
F
Felix Lange 已提交
163 164
		if err := writeProtocolHandshake(p.rw, p.ourName, *p.ourID, p.protocols); err != nil {
			p.DebugDetailf("Protocol handshake error: %v\n", err)
F
Felix Lange 已提交
165
			p.rw.Close()
F
Felix Lange 已提交
166
			return DiscProtocolError
167 168 169
		}
	}

F
Felix Lange 已提交
170
	// Wait for an error or disconnect.
F
Felix Lange 已提交
171 172 173 174 175 176
	var reason DiscReason
	select {
	case err := <-readErr:
		// We rely on protocols to abort if there is a write error. It
		// might be more robust to handle them here as well.
		p.DebugDetailf("Read error: %v\n", err)
F
Felix Lange 已提交
177 178 179
		p.rw.Close()
		return DiscNetworkError

F
Felix Lange 已提交
180 181 182
	case err := <-p.protoErr:
		reason = discReasonForError(err)
	case reason = <-p.disc:
183
	}
F
Felix Lange 已提交
184 185 186 187
	p.politeDisconnect(reason)

	// Wait for readLoop. It will end because conn is now closed.
	<-readErr
F
Felix Lange 已提交
188 189 190
	p.Debugf("Disconnected: %v\n", reason)
	return reason
}
func (p *Peer) politeDisconnect(reason DiscReason) {
193 194
	done := make(chan struct{})
	go func() {
F
Felix Lange 已提交
195
		EncodeMsg(p.rw, discMsg, uint(reason))
F
Felix Lange 已提交
196 197
		// Wait for the other side to close the connection.
		// Discard any data that they send until then.
F
Felix Lange 已提交
198
		io.Copy(ioutil.Discard, p.rw)
199 200 201 202 203 204
		close(done)
	}()
	select {
	case <-done:
	case <-time.After(disconnectGracePeriod):
	}
F
Felix Lange 已提交
205
	p.rw.Close()
206 207
}

func (p *Peer) readLoop() error {
F
Felix Lange 已提交
209
	if !p.noHandshake {
F
Felix Lange 已提交
210 211
		if err := readProtocolHandshake(p, p.rw); err != nil {
			return err
212 213
		}
	}
F
Felix Lange 已提交
214 215 216 217 218 219 220 221 222 223
	for {
		msg, err := p.rw.ReadMsg()
		if err != nil {
			return err
		}
		if err = p.handle(msg); err != nil {
			return err
		}
	}
	return nil
224 225
}

func (p *Peer) handle(msg Msg) error {
	switch {
	case msg.Code == pingMsg:
		msg.Discard()
		go EncodeMsg(p.rw, pongMsg)
	case msg.Code == discMsg:
		var reason DiscReason
		// no need to discard or for error checking, we'll close the
		// connection after this.
		rlp.Decode(msg.Payload, &reason)
		p.Disconnect(DiscRequested)
		return discRequestedError(reason)
	case msg.Code < baseProtocolLength:
		// ignore other base protocol messages
		return msg.Discard()
	default:
		// it's a subprotocol message
		proto, err := p.getProto(msg.Code)
244
		if err != nil {
F
Felix Lange 已提交
245
			return fmt.Errorf("msg code out of range: %v", msg.Code)
246 247 248
		}
		proto.in <- msg
	}
F
Felix Lange 已提交
249
	return nil
250 251
}

func readProtocolHandshake(p *Peer, rw MsgReadWriter) error {
	// read and handle remote handshake
	msg, err := rw.ReadMsg()
	if err != nil {
		return err
257
	}
258 259 260 261 262 263 264
	if msg.Code == discMsg {
		// disconnect before protocol handshake is valid according to the
		// spec and we send it ourself if Server.addPeer fails.
		var reason DiscReason
		rlp.Decode(msg.Payload, &reason)
		return discRequestedError(reason)
	}
F
Felix Lange 已提交
265 266
	if msg.Code != handshakeMsg {
		return newPeerError(errProtocolBreach, "expected handshake, got %x", msg.Code)
267
	}
F
Felix Lange 已提交
268
	if msg.Size > baseProtocolMaxMsgSize {
F
Felix Lange 已提交
269
		return newPeerError(errInvalidMsg, "message too big")
270
	}
F
Felix Lange 已提交
271 272 273
	var hs handshake
	if err := msg.Decode(&hs); err != nil {
		return err
274
	}
F
Felix Lange 已提交
275 276 277 278
	// validate handshake info
	if hs.Version != baseProtocolVersion {
		return newPeerError(errP2PVersionMismatch, "required version %d, received %d\n",
			baseProtocolVersion, hs.Version)
279
	}
F
Felix Lange 已提交
280 281
	if hs.NodeID == *p.remoteID {
		return newPeerError(errPubkeyForbidden, "node ID mismatch")
282
	}
F
Felix Lange 已提交
283 284 285 286
	// TODO: remove Caps with empty name
	p.setHandshakeInfo(hs.Name, hs.Caps)
	p.startSubprotocols(hs.Caps)
	return nil
287 288
}

func writeProtocolHandshake(w MsgWriter, name string, id discover.NodeID, ps []Protocol) error {
	var caps []interface{}
	for _, proto := range ps {
		caps = append(caps, proto.cap())
	}
	return EncodeMsg(w, handshakeMsg, baseProtocolVersion, name, caps, 0, id)
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317
}

// startSubprotocols starts every local protocol whose name and version
// match one of the given remote capabilities. At most one protocol is
// started per name.
//
// caps is sorted in place before matching (ordering is defined by
// capsByName). Message code offsets are assigned in that order, starting
// at baseProtocolLength and advancing by the Length of each started
// protocol.
func (p *Peer) startSubprotocols(caps []Cap) {
	sort.Sort(capsByName(caps))

	p.runlock.Lock()
	defer p.runlock.Unlock()

	nextOffset := baseProtocolLength
	for _, remote := range caps {
		for _, local := range p.protocols {
			if local.Name != remote.Name || local.Version != remote.Version {
				continue
			}
			if p.running[remote.Name] != nil {
				// A protocol with this name is already running.
				continue
			}
			p.running[remote.Name] = p.startProto(nextOffset, local)
			nextOffset += local.Length
			// Done with this capability, move on to the next one.
			break
		}
	}
}

func (p *Peer) startProto(offset uint64, impl Protocol) *proto {
F
Felix Lange 已提交
318
	p.DebugDetailf("Starting protocol %s/%d\n", impl.Name, impl.Version)
319
	rw := &proto{
F
Felix Lange 已提交
320
		name:    impl.Name,
321 322 323
		in:      make(chan Msg),
		offset:  offset,
		maxcode: impl.Length,
F
Felix Lange 已提交
324
		w:       p.rw,
325 326 327 328 329
	}
	p.protoWG.Add(1)
	go func() {
		err := impl.Run(p, rw)
		if err == nil {
F
Felix Lange 已提交
330
			p.DebugDetailf("Protocol %s/%d returned\n", impl.Name, impl.Version)
F
Felix Lange 已提交
331
			err = errors.New("protocol returned")
332
		} else {
F
Felix Lange 已提交
333
			p.DebugDetailf("Protocol %s/%d error: %v\n", impl.Name, impl.Version, err)
334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366
		}
		select {
		case p.protoErr <- err:
		case <-p.closed:
		}
		p.protoWG.Done()
	}()
	return rw
}

// getProto looks up the running protocol whose message code range,
// [offset, offset+maxcode), contains code. It returns an
// errInvalidMsgCode peer error if no running protocol claims the code.
func (p *Peer) getProto(code uint64) (*proto, error) {
	p.runlock.RLock()
	defer p.runlock.RUnlock()

	for _, candidate := range p.running {
		first, end := candidate.offset, candidate.offset+candidate.maxcode
		if first <= code && code < end {
			return candidate, nil
		}
	}
	return nil, newPeerError(errInvalidMsgCode, "%d", code)
}

// closeProtocols closes the input channel of every running protocol and
// then waits until all protocol goroutines have finished.
func (p *Peer) closeProtocols() {
	p.runlock.RLock()
	for _, running := range p.running {
		close(running.in)
	}
	p.runlock.RUnlock()

	p.protoWG.Wait()
}

// writeProtoMsg sends the given message on behalf of the given named protocol.
F
Felix Lange 已提交
367
// this exists because of Server.Broadcast.
368 369 370 371 372 373 374 375 376 377 378
func (p *Peer) writeProtoMsg(protoName string, msg Msg) error {
	p.runlock.RLock()
	proto, ok := p.running[protoName]
	p.runlock.RUnlock()
	if !ok {
		return fmt.Errorf("protocol %s not handled by peer", protoName)
	}
	if msg.Code >= proto.maxcode {
		return newPeerError(errInvalidMsgCode, "code %x is out of range for protocol %q", msg.Code, protoName)
	}
	msg.Code += proto.offset
F
Felix Lange 已提交
379
	return p.rw.WriteMsg(msg)
380 381 382 383 384 385
}

type proto struct {
	name            string
	in              chan Msg
	maxcode, offset uint64
F
Felix Lange 已提交
386
	w               MsgWriter
387 388 389 390 391 392 393
}

func (rw *proto) WriteMsg(msg Msg) error {
	if msg.Code >= rw.maxcode {
		return newPeerError(errInvalidMsgCode, "not handled")
	}
	msg.Code += rw.offset
F
Felix Lange 已提交
394
	return rw.w.WriteMsg(msg)
Z
zelig 已提交
395 396
}

func (rw *proto) ReadMsg() (Msg, error) {
	msg, ok := <-rw.in
	if !ok {
		return msg, io.EOF
	}
	msg.Code -= rw.offset
	return msg, nil
Z
zelig 已提交
404
}