server.go 15.5 KB
Newer Older
Z
zelig 已提交
1 2 3
package p2p

import (
F
Felix Lange 已提交
4
	"crypto/ecdsa"
F
Felix Lange 已提交
5
	"crypto/rand"
6
	"errors"
Z
zelig 已提交
7 8 9 10 11
	"fmt"
	"net"
	"sync"
	"time"

12
	"github.com/ethereum/go-ethereum/logger"
O
obscuren 已提交
13
	"github.com/ethereum/go-ethereum/logger/glog"
F
Felix Lange 已提交
14
	"github.com/ethereum/go-ethereum/p2p/discover"
15
	"github.com/ethereum/go-ethereum/p2p/nat"
Z
zelig 已提交
16 17 18
)

const (
	// Timeout of the default outbound dialer that Start installs when
	// Server.Dialer is nil.
	defaultDialTimeout = 15 * time.Second
	// Delay between discovery lookups performed by dialLoop.
	refreshPeersInterval = 30 * time.Second
	// Default interval at which staticNodesLoop checks that static
	// peers are connected.
	staticPeerCheckInterval = 15 * time.Second

	// Maximum number of concurrently handshaking inbound connections.
	maxAcceptConns = 50

	// Maximum number of concurrently dialing outbound connections.
	maxDialingConns = 10

	// total timeout for encryption handshake and protocol
	// handshake in both directions.
	handshakeTimeout = 5 * time.Second
	// maximum time allowed for reading a complete message.
	// this is effectively the amount of time a connection can be idle.
	frameReadTimeout = 1 * time.Minute
	// maximum amount of time allowed for writing a complete message.
	frameWriteTimeout = 5 * time.Second
)

39
// srvjslog is the JSON logger used by runPeer to record peer
// connect and disconnect events.
var srvjslog = logger.NewJsonLogger()
Z
zelig 已提交
40

41 42 43 44 45
// Server manages all peer connections.
//
// The fields of Server are used as configuration parameters.
// You should set them before starting the Server. Fields may not be
// modified while the server is running.
Z
zelig 已提交
46
type Server struct {
F
Felix Lange 已提交
47 48
	// This field must be set to a valid secp256k1 private key.
	PrivateKey *ecdsa.PrivateKey
49 50 51 52 53

	// MaxPeers is the maximum number of peers that can be
	// connected. It must be greater than zero.
	MaxPeers int

54 55 56 57 58
	// MaxPendingPeers is the maximum number of peers that can be pending in the
	// handshake phase, counted separately for inbound and outbound connections.
	// Zero defaults to preset values.
	MaxPendingPeers int

F
Felix Lange 已提交
59
	// Name sets the node name of this server.
O
obscuren 已提交
60
	// Use common.MakeName to create a name that follows existing conventions.
F
Felix Lange 已提交
61 62 63 64
	Name string

	// Bootstrap nodes are used to establish connectivity
	// with the rest of the network.
65
	BootstrapNodes []*discover.Node
F
Felix Lange 已提交
66

67 68 69
	// Static nodes are used as pre-configured connections which are always
	// maintained and re-connected on disconnects.
	StaticNodes []*discover.Node
70

71 72 73 74
	// Trusted nodes are used as pre-configured connections which are always
	// allowed to connect, even above the peer limit.
	TrustedNodes []*discover.Node

75 76 77
	// NodeDatabase is the path to the database containing the previously seen
	// live nodes in the network.
	NodeDatabase string
78

79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
	// Protocols should contain the protocols supported
	// by the server. Matching protocols are launched for
	// each peer.
	Protocols []Protocol

	// If ListenAddr is set to a non-nil address, the server
	// will listen for incoming connections.
	//
	// If the port is zero, the operating system will pick a port. The
	// ListenAddr field will be updated with the actual address when
	// the server is started.
	ListenAddr string

	// If set to a non-nil value, the given NAT port mapper
	// is used to make the listening port available to the
	// Internet.
95
	NAT nat.Interface
96 97 98 99 100 101 102 103

	// If Dialer is set to a non-nil value, the given Dialer
	// is used to dial outbound peer connections.
	Dialer *net.Dialer

	// If NoDial is true, the server will not dial any peers.
	NoDial bool

F
Felix Lange 已提交
104
	// Hooks for testing. These are useful because we can inhibit
105
	// the whole protocol stack.
F
Felix Lange 已提交
106
	setupFunc
F
Felix Lange 已提交
107
	newPeerHook
Z
zelig 已提交
108

F
Felix Lange 已提交
109 110
	ourHandshake *protoHandshake

111 112 113 114 115 116 117
	lock         sync.RWMutex // protects running, peers and the trust fields
	running      bool
	peers        map[discover.NodeID]*Peer
	staticNodes  map[discover.NodeID]*discover.Node // Map of currently maintained static remote nodes
	staticDial   chan *discover.Node                // Dial request channel reserved for the static nodes
	staticCycle  time.Duration                      // Overrides staticPeerCheckInterval, used for testing
	trustedNodes map[discover.NodeID]bool           // Set of currently trusted remote nodes
F
Felix Lange 已提交
118

119 120
	ntab     *discover.Table
	listener net.Listener
F
Felix Lange 已提交
121

122 123 124
	quit   chan struct{}
	loopWG sync.WaitGroup // {dial,listen,nat}Loop
	peerWG sync.WaitGroup // active peer goroutines
Z
zelig 已提交
125 126
}

F
Felix Lange 已提交
127
// setupFunc establishes a connection on top of a raw net.Conn. startPeer
// calls it with the server's private key, the local handshake, the dialed
// node (nil for inbound connections) and a callback that decides whether a
// connection to the given remote ID should be kept.
type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node, func(discover.NodeID) bool) (*conn, error)
F
Felix Lange 已提交
128
// newPeerHook, when set, is invoked by runPeer for every peer that was
// added, before the peer's run loop starts.
type newPeerHook func(*Peer)
Z
zelig 已提交
129

130 131 132 133 134
// Peers returns all connected peers.
func (srv *Server) Peers() (peers []*Peer) {
	srv.lock.RLock()
	defer srv.lock.RUnlock()
	for _, peer := range srv.peers {
Z
zelig 已提交
135 136 137 138 139 140 141
		if peer != nil {
			peers = append(peers, peer)
		}
	}
	return
}

142 143 144
// PeerCount returns the number of connected peers.
func (srv *Server) PeerCount() int {
	srv.lock.RLock()
F
Felix Lange 已提交
145 146 147
	n := len(srv.peers)
	srv.lock.RUnlock()
	return n
Z
zelig 已提交
148 149
}

150 151 152 153
// AddPeer connects to the given node and maintains the connection until the
// server is shut down. If the connection fails for any reason, the server will
// attempt to reconnect the peer.
func (srv *Server) AddPeer(node *discover.Node) {
154 155 156
	srv.lock.Lock()
	defer srv.lock.Unlock()

157
	srv.staticNodes[node.ID] = node
Z
zelig 已提交
158 159
}

160 161 162 163 164 165 166 167
// Start starts running the server.
// Servers can be re-used and started again after stopping.
func (srv *Server) Start() (err error) {
	srv.lock.Lock()
	defer srv.lock.Unlock()
	if srv.running {
		return errors.New("server already running")
	}
O
obscuren 已提交
168
	glog.V(logger.Info).Infoln("Starting Server")
169

170
	// static fields
F
Felix Lange 已提交
171 172
	if srv.PrivateKey == nil {
		return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
Z
zelig 已提交
173
	}
174 175 176 177
	if srv.MaxPeers <= 0 {
		return fmt.Errorf("Server.MaxPeers must be > 0")
	}
	srv.quit = make(chan struct{})
F
Felix Lange 已提交
178
	srv.peers = make(map[discover.NodeID]*Peer)
179

180 181 182 183 184
	// Create the current trust maps, and the associated dialing channel
	srv.trustedNodes = make(map[discover.NodeID]bool)
	for _, node := range srv.TrustedNodes {
		srv.trustedNodes[node.ID] = true
	}
185
	srv.staticNodes = make(map[discover.NodeID]*discover.Node)
186
	for _, node := range srv.StaticNodes {
187
		srv.staticNodes[node.ID] = node
188
	}
189
	srv.staticDial = make(chan *discover.Node)
190

F
Felix Lange 已提交
191 192
	if srv.setupFunc == nil {
		srv.setupFunc = setupConn
193
	}
F
Felix Lange 已提交
194

195
	// node table
196
	ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT, srv.NodeDatabase)
F
Felix Lange 已提交
197 198 199
	if err != nil {
		return err
	}
F
Felix Lange 已提交
200 201
	srv.ntab = ntab

202
	// handshake
203
	srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: ntab.Self().ID}
F
Felix Lange 已提交
204 205 206 207
	for _, p := range srv.Protocols {
		srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
	}

208 209 210 211 212 213
	// listen/dial
	if srv.ListenAddr != "" {
		if err := srv.startListening(); err != nil {
			return err
		}
	}
F
Felix Lange 已提交
214 215 216
	if srv.Dialer == nil {
		srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
	}
217
	if !srv.NoDial {
F
Felix Lange 已提交
218
		srv.loopWG.Add(1)
219 220 221
		go srv.dialLoop()
	}
	if srv.NoDial && srv.ListenAddr == "" {
O
obscuren 已提交
222
		glog.V(logger.Warn).Infoln("I will be kind-of useless, neither dialing nor listening.")
223
	}
224 225
	// maintain the static peers
	go srv.staticNodesLoop()
226 227 228

	srv.running = true
	return nil
Z
zelig 已提交
229 230
}

231 232 233 234 235
func (srv *Server) startListening() error {
	listener, err := net.Listen("tcp", srv.ListenAddr)
	if err != nil {
		return err
	}
236 237
	laddr := listener.Addr().(*net.TCPAddr)
	srv.ListenAddr = laddr.String()
238
	srv.listener = listener
F
Felix Lange 已提交
239
	srv.loopWG.Add(1)
240
	go srv.listenLoop()
241
	if !laddr.IP.IsLoopback() && srv.NAT != nil {
F
Felix Lange 已提交
242
		srv.loopWG.Add(1)
243 244 245 246
		go func() {
			nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
			srv.loopWG.Done()
		}()
247 248 249 250 251 252 253 254 255 256 257
	}
	return nil
}

// Stop terminates the server and all active peer connections.
// It blocks until all active connections have been closed.
func (srv *Server) Stop() {
	srv.lock.Lock()
	if !srv.running {
		srv.lock.Unlock()
		return
Z
zelig 已提交
258
	}
259 260 261
	srv.running = false
	srv.lock.Unlock()

O
obscuren 已提交
262
	glog.V(logger.Info).Infoln("Stopping Server")
F
Felix Lange 已提交
263
	srv.ntab.Close()
264 265 266 267 268
	if srv.listener != nil {
		// this unblocks listener Accept
		srv.listener.Close()
	}
	close(srv.quit)
F
Felix Lange 已提交
269
	srv.loopWG.Wait()
270

F
Felix Lange 已提交
271 272 273
	// No new peers can be added at this point because dialLoop and
	// listenLoop are down. It is safe to call peerWG.Wait because
	// peerWG.Add is not called outside of those loops.
274
	srv.lock.Lock()
F
Felix Lange 已提交
275 276
	for _, peer := range srv.peers {
		peer.Disconnect(DiscQuitting)
277
	}
278
	srv.lock.Unlock()
F
Felix Lange 已提交
279
	srv.peerWG.Wait()
280
}
Z
zelig 已提交
281

282 283
// Self returns the local node's endpoint information.
func (srv *Server) Self() *discover.Node {
284 285 286 287 288
	srv.lock.RLock()
	defer srv.lock.RUnlock()
	if !srv.running {
		return &discover.Node{IP: net.ParseIP("0.0.0.0")}
	}
289 290 291
	return srv.ntab.Self()
}

292 293
// main loop for adding connections via listening
func (srv *Server) listenLoop() {
F
Felix Lange 已提交
294
	defer srv.loopWG.Done()
295 296 297 298

	// This channel acts as a semaphore limiting
	// active inbound connections that are lingering pre-handshake.
	// If all slots are taken, no further connections are accepted.
299 300 301 302 303 304
	tokens := maxAcceptConns
	if srv.MaxPendingPeers > 0 {
		tokens = srv.MaxPendingPeers
	}
	slots := make(chan struct{}, tokens)
	for i := 0; i < tokens; i++ {
305 306 307
		slots <- struct{}{}
	}

O
obscuren 已提交
308
	glog.V(logger.Info).Infoln("Listening on", srv.listener.Addr())
Z
zelig 已提交
309
	for {
310
		<-slots
F
Felix Lange 已提交
311 312
		conn, err := srv.listener.Accept()
		if err != nil {
313
			return
Z
zelig 已提交
314
		}
O
obscuren 已提交
315
		glog.V(logger.Debug).Infof("Accepted conn %v\n", conn.RemoteAddr())
F
Felix Lange 已提交
316
		srv.peerWG.Add(1)
317 318 319 320
		go func() {
			srv.startPeer(conn, nil)
			slots <- struct{}{}
		}()
Z
zelig 已提交
321 322 323
	}
}

324
// staticNodesLoop is responsible for periodically checking that static
325
// connections are actually live, and requests dialing if not.
326
func (srv *Server) staticNodesLoop() {
327 328 329 330 331 332 333
	// Create a default maintenance ticker, but override it requested
	cycle := staticPeerCheckInterval
	if srv.staticCycle != 0 {
		cycle = srv.staticCycle
	}
	tick := time.NewTicker(cycle)

334 335 336 337 338
	for {
		select {
		case <-srv.quit:
			return

339
		case <-tick.C:
340
			// Collect all the non-connected static nodes
341 342
			needed := []*discover.Node{}
			srv.lock.RLock()
343
			for id, node := range srv.staticNodes {
344 345 346 347 348 349 350 351
				if _, ok := srv.peers[id]; !ok {
					needed = append(needed, node)
				}
			}
			srv.lock.RUnlock()

			// Try to dial each of them (don't hang if server terminates)
			for _, node := range needed {
352
				glog.V(logger.Debug).Infof("Dialing static peer %v", node)
353
				select {
354
				case srv.staticDial <- node:
355 356 357 358 359 360 361 362
				case <-srv.quit:
					return
				}
			}
		}
	}
}

363
func (srv *Server) dialLoop() {
F
Felix Lange 已提交
364 365 366 367 368 369
	var (
		dialed      = make(chan *discover.Node)
		dialing     = make(map[discover.NodeID]bool)
		findresults = make(chan []*discover.Node)
		refresh     = time.NewTimer(0)
	)
F
Felix Lange 已提交
370 371 372
	defer srv.loopWG.Done()
	defer refresh.Stop()

373
	// Limit the number of concurrent dials
374
	tokens := maxDialingConns
375 376 377 378 379
	if srv.MaxPendingPeers > 0 {
		tokens = srv.MaxPendingPeers
	}
	slots := make(chan struct{}, tokens)
	for i := 0; i < tokens; i++ {
380 381
		slots <- struct{}{}
	}
F
Felix Lange 已提交
382 383 384 385 386
	dial := func(dest *discover.Node) {
		// Don't dial nodes that would fail the checks in addPeer.
		// This is important because the connection handshake is a lot
		// of work and we'd rather avoid doing that work for peers
		// that can't be added.
387 388 389
		srv.lock.RLock()
		ok, _ := srv.checkPeer(dest.ID)
		srv.lock.RUnlock()
F
Felix Lange 已提交
390 391 392
		if !ok || dialing[dest.ID] {
			return
		}
393 394
		// Request a dial slot to prevent CPU exhaustion
		<-slots
395

F
Felix Lange 已提交
396 397 398 399
		dialing[dest.ID] = true
		srv.peerWG.Add(1)
		go func() {
			srv.dialNode(dest)
400
			slots <- struct{}{}
401
			dialed <- dest
F
Felix Lange 已提交
402 403
		}()
	}
F
Felix Lange 已提交
404

F
Felix Lange 已提交
405
	srv.ntab.Bootstrap(srv.BootstrapNodes)
Z
zelig 已提交
406 407
	for {
		select {
F
Felix Lange 已提交
408
		case <-refresh.C:
409 410
			// Grab some nodes to connect to if we're not at capacity.
			srv.lock.RLock()
411
			needpeers := len(srv.peers) < srv.MaxPeers/2
412
			srv.lock.RUnlock()
F
Felix Lange 已提交
413 414 415 416 417 418
			if needpeers {
				go func() {
					var target discover.NodeID
					rand.Read(target[:])
					findresults <- srv.ntab.Lookup(target)
				}()
F
Felix Lange 已提交
419 420 421 422
			} else {
				// Make sure we check again if the peer count falls
				// below MaxPeers.
				refresh.Reset(refreshPeersInterval)
Z
zelig 已提交
423
			}
424
		case dest := <-srv.staticDial:
F
Felix Lange 已提交
425 426 427 428 429 430
			dial(dest)
		case dests := <-findresults:
			for _, dest := range dests {
				dial(dest)
			}
			refresh.Reset(refreshPeersInterval)
F
Felix Lange 已提交
431 432
		case dest := <-dialed:
			delete(dialing, dest.ID)
F
Felix Lange 已提交
433 434 435 436
			if len(dialing) == 0 {
				// Check again immediately after dialing all current candidates.
				refresh.Reset(0)
			}
F
Felix Lange 已提交
437 438
		case <-srv.quit:
			// TODO: maybe wait for active dials
Z
zelig 已提交
439 440 441 442 443
			return
		}
	}
}

F
Felix Lange 已提交
444
func (srv *Server) dialNode(dest *discover.Node) {
F
Felix Lange 已提交
445
	addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)}
O
obscuren 已提交
446
	glog.V(logger.Debug).Infof("Dialing %v\n", dest)
447
	conn, err := srv.Dialer.Dial("tcp", addr.String())
Z
zelig 已提交
448
	if err != nil {
449 450 451 452
		// dialLoop adds to the wait group counter when launching
		// dialNode, so we need to count it down again. startPeer also
		// does that when an error occurs.
		srv.peerWG.Done()
O
obscuren 已提交
453
		glog.V(logger.Detail).Infof("dial error: %v", err)
F
Felix Lange 已提交
454
		return
Z
zelig 已提交
455
	}
F
Felix Lange 已提交
456
	srv.startPeer(conn, dest)
Z
zelig 已提交
457 458
}

F
Felix Lange 已提交
459
func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
460
	// TODO: handle/store session token
461 462 463 464 465

	// Run setupFunc, which should create an authenticated connection
	// and run the capability exchange. Note that any early error
	// returns during that exchange need to call peerWG.Done because
	// the callers of startPeer added the peer to the wait group already.
F
Felix Lange 已提交
466
	fd.SetDeadline(time.Now().Add(handshakeTimeout))
467

F
Felix Lange 已提交
468
	conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest, srv.keepconn)
F
Felix Lange 已提交
469
	if err != nil {
F
Felix Lange 已提交
470
		fd.Close()
O
obscuren 已提交
471
		glog.V(logger.Debug).Infof("Handshake with %v failed: %v", fd.RemoteAddr(), err)
472
		srv.peerWG.Done()
F
Felix Lange 已提交
473 474
		return
	}
F
Felix Lange 已提交
475 476 477 478
	conn.MsgReadWriter = &netWrapper{
		wrapped: conn.MsgReadWriter,
		conn:    fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout,
	}
F
Felix Lange 已提交
479
	p := newPeer(fd, conn, srv.Protocols)
480
	if ok, reason := srv.addPeer(conn, p); !ok {
O
obscuren 已提交
481
		glog.V(logger.Detail).Infof("Not adding %v (%v)\n", p, reason)
482
		p.politeDisconnect(reason)
483
		srv.peerWG.Done()
F
Felix Lange 已提交
484
		return
Z
zelig 已提交
485
	}
F
Felix Lange 已提交
486 487 488 489
	// The handshakes are done and it passed all checks.
	// Spawn the Peer loops.
	go srv.runPeer(p)
}
490

F
Felix Lange 已提交
491 492 493 494 495 496 497 498 499 500 501 502 503 504 505
// keepconn is the preflight check deciding whether a connection should
// be kept. It is passed to setupFunc and is meant to run after the
// encryption handshake, as soon as the remote identity is known.
//
// Static and trusted nodes are always allowed. Any other node is only
// allowed while the peer count is below MaxPeers.
func (srv *Server) keepconn(id discover.NodeID) bool {
	srv.lock.RLock()
	defer srv.lock.RUnlock()

	_, isStatic := srv.staticNodes[id]
	_, isTrusted := srv.trustedNodes[id]
	if isStatic || isTrusted {
		return true
	}
	return len(srv.peers) < srv.MaxPeers
}

F
Felix Lange 已提交
506
func (srv *Server) runPeer(p *Peer) {
O
obscuren 已提交
507
	glog.V(logger.Debug).Infof("Added %v\n", p)
508
	srvjslog.LogJson(&logger.P2PConnected{
F
Felix Lange 已提交
509 510 511
		RemoteId:            p.ID().String(),
		RemoteAddress:       p.RemoteAddr().String(),
		RemoteVersionString: p.Name(),
512 513
		NumConnections:      srv.PeerCount(),
	})
F
Felix Lange 已提交
514 515 516
	if srv.newPeerHook != nil {
		srv.newPeerHook(p)
	}
517
	discreason := p.run()
F
Felix Lange 已提交
518
	srv.removePeer(p)
O
obscuren 已提交
519
	glog.V(logger.Debug).Infof("Removed %v (%v)\n", p, discreason)
520
	srvjslog.LogJson(&logger.P2PDisconnected{
F
Felix Lange 已提交
521
		RemoteId:       p.ID().String(),
522 523
		NumConnections: srv.PeerCount(),
	})
Z
zelig 已提交
524 525
}

526 527 528 529 530 531
func (srv *Server) addPeer(conn *conn, p *Peer) (bool, DiscReason) {
	// drop connections with no matching protocols.
	if len(srv.Protocols) > 0 && countMatchingProtocols(srv.Protocols, conn.protoHandshake.Caps) == 0 {
		return false, DiscUselessPeer
	}
	// add the peer if it passes the other checks.
532 533
	srv.lock.Lock()
	defer srv.lock.Unlock()
534
	if ok, reason := srv.checkPeer(conn.ID); !ok {
F
Felix Lange 已提交
535 536
		return false, reason
	}
537
	srv.peers[conn.ID] = p
F
Felix Lange 已提交
538 539 540
	return true, 0
}

541 542
// checkPeer verifies whether a peer looks promising and should be allowed/kept
// in the pool, or if it's of no use.
F
Felix Lange 已提交
543
func (srv *Server) checkPeer(id discover.NodeID) (bool, DiscReason) {
544
	// First up, figure out if the peer is static or trusted
545
	_, static := srv.staticNodes[id]
546
	trusted := srv.trustedNodes[id]
547 548

	// Make sure the peer passes all required checks
F
Felix Lange 已提交
549 550 551
	switch {
	case !srv.running:
		return false, DiscQuitting
552
	case !static && !trusted && len(srv.peers) >= srv.MaxPeers:
F
Felix Lange 已提交
553 554 555
		return false, DiscTooManyPeers
	case srv.peers[id] != nil:
		return false, DiscAlreadyConnected
556
	case id == srv.ntab.Self().ID:
F
Felix Lange 已提交
557
		return false, DiscSelf
F
Felix Lange 已提交
558 559
	default:
		return true, 0
F
Felix Lange 已提交
560
	}
Z
zelig 已提交
561 562
}

F
Felix Lange 已提交
563 564
func (srv *Server) removePeer(p *Peer) {
	srv.lock.Lock()
F
Felix Lange 已提交
565
	delete(srv.peers, p.ID())
F
Felix Lange 已提交
566 567
	srv.lock.Unlock()
	srv.peerWG.Done()
F
Felix Lange 已提交
568
}