server.go 12.8 KB
Newer Older
Z
zelig 已提交
1 2 3 4
package p2p

import (
	"bytes"
F
Felix Lange 已提交
5
	"crypto/ecdsa"
F
Felix Lange 已提交
6
	"crypto/rand"
7
	"errors"
Z
zelig 已提交
8 9 10 11 12
	"fmt"
	"net"
	"sync"
	"time"

13
	"github.com/ethereum/go-ethereum/logger"
O
obscuren 已提交
14
	"github.com/ethereum/go-ethereum/logger/glog"
F
Felix Lange 已提交
15
	"github.com/ethereum/go-ethereum/p2p/discover"
16
	"github.com/ethereum/go-ethereum/p2p/nat"
17
	"github.com/ethereum/go-ethereum/rlp"
Z
zelig 已提交
18 19 20
)

// Dialing and peer-discovery timing.
const (
	// defaultDialTimeout is the timeout of the dialer Start installs when
	// Server.Dialer is nil.
	defaultDialTimeout = 10 * time.Second

	// refreshPeersInterval is how long dialLoop waits before checking again
	// whether it should look for more peers.
	refreshPeersInterval = 30 * time.Second
)

// Inbound connection and I/O limits.
const (
	// maxAcceptConns caps the number of inbound connections that may sit
	// between "accepted" and "added as peer" at the same time.
	maxAcceptConns = 50

	// handshakeTimeout bounds the encryption handshake plus the protocol
	// handshake, in both directions, as a single deadline.
	handshakeTimeout = 5 * time.Second

	// frameReadTimeout is the longest a complete message read may take,
	// i.e. effectively how long a connection may stay idle.
	frameReadTimeout = time.Minute

	// frameWriteTimeout is the longest a complete message write may take.
	frameWriteTimeout = 5 * time.Second
)

39
// srvjslog receives the structured P2PConnected / P2PDisconnected events
// emitted by runPeer.
var (
	srvjslog = logger.NewJsonLogger()
)
Z
zelig 已提交
40

41 42 43 44 45
// Server manages all peer connections.
//
// The fields of Server are used as configuration parameters.
// You should set them before starting the Server. Fields may not be
// modified while the server is running.
Z
zelig 已提交
46
type Server struct {
F
Felix Lange 已提交
47 48
	// This field must be set to a valid secp256k1 private key.
	PrivateKey *ecdsa.PrivateKey
49 50 51 52 53

	// MaxPeers is the maximum number of peers that can be
	// connected. It must be greater than zero.
	MaxPeers int

F
Felix Lange 已提交
54
	// Name sets the node name of this server.
O
obscuren 已提交
55
	// Use common.MakeName to create a name that follows existing conventions.
F
Felix Lange 已提交
56 57 58 59
	Name string

	// Bootstrap nodes are used to establish connectivity
	// with the rest of the network.
60
	BootstrapNodes []*discover.Node
F
Felix Lange 已提交
61

62 63 64
	// NodeDatabase is the path to the database containing the previously seen
	// live nodes in the network.
	NodeDatabase string
65

66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
	// Protocols should contain the protocols supported
	// by the server. Matching protocols are launched for
	// each peer.
	Protocols []Protocol

	// If ListenAddr is set to a non-nil address, the server
	// will listen for incoming connections.
	//
	// If the port is zero, the operating system will pick a port. The
	// ListenAddr field will be updated with the actual address when
	// the server is started.
	ListenAddr string

	// If set to a non-nil value, the given NAT port mapper
	// is used to make the listening port available to the
	// Internet.
82
	NAT nat.Interface
83 84 85 86 87 88 89 90

	// If Dialer is set to a non-nil value, the given Dialer
	// is used to dial outbound peer connections.
	Dialer *net.Dialer

	// If NoDial is true, the server will not dial any peers.
	NoDial bool

F
Felix Lange 已提交
91
	// Hooks for testing. These are useful because we can inhibit
92
	// the whole protocol stack.
F
Felix Lange 已提交
93
	setupFunc
F
Felix Lange 已提交
94
	newPeerHook
Z
zelig 已提交
95

F
Felix Lange 已提交
96 97
	ourHandshake *protoHandshake

98 99 100
	lock    sync.RWMutex // protects running and peers
	running bool
	peers   map[discover.NodeID]*Peer
F
Felix Lange 已提交
101

102 103
	ntab     *discover.Table
	listener net.Listener
F
Felix Lange 已提交
104 105 106 107 108

	quit        chan struct{}
	loopWG      sync.WaitGroup // {dial,listen,nat}Loop
	peerWG      sync.WaitGroup // active peer goroutines
	peerConnect chan *discover.Node
Z
zelig 已提交
109 110
}

111
type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node, bool) (*conn, error)
F
Felix Lange 已提交
112
type newPeerHook func(*Peer)
Z
zelig 已提交
113

114 115 116 117 118
// Peers returns all connected peers.
func (srv *Server) Peers() (peers []*Peer) {
	srv.lock.RLock()
	defer srv.lock.RUnlock()
	for _, peer := range srv.peers {
Z
zelig 已提交
119 120 121 122 123 124 125
		if peer != nil {
			peers = append(peers, peer)
		}
	}
	return
}

126 127 128
// PeerCount returns the number of connected peers.
func (srv *Server) PeerCount() int {
	srv.lock.RLock()
F
Felix Lange 已提交
129 130 131
	n := len(srv.peers)
	srv.lock.RUnlock()
	return n
Z
zelig 已提交
132 133
}

F
Felix Lange 已提交
134 135
// SuggestPeer creates a connection to the given Node if it
// is not already connected.
136 137
func (srv *Server) SuggestPeer(n *discover.Node) {
	srv.peerConnect <- n
Z
zelig 已提交
138 139
}

140 141
// Broadcast sends an RLP-encoded message to all connected peers.
// This method is deprecated and will be removed later.
142
func (srv *Server) Broadcast(protocol string, code uint64, data interface{}) error {
143 144 145 146 147 148
	return srv.BroadcastLimited(protocol, code, func(i float64) float64 { return i }, data)
}

// BroadcastsRange an RLP-encoded message to a random set of peers using the limit function to limit the amount
// of peers.
func (srv *Server) BroadcastLimited(protocol string, code uint64, limit func(float64) float64, data interface{}) error {
F
Felix Lange 已提交
149 150
	var payload []byte
	if data != nil {
151 152 153 154 155
		var err error
		payload, err = rlp.EncodeToBytes(data)
		if err != nil {
			return err
		}
F
Felix Lange 已提交
156
	}
157 158
	srv.lock.RLock()
	defer srv.lock.RUnlock()
159 160

	i, max := 0, int(limit(float64(len(srv.peers))))
161
	for _, peer := range srv.peers {
162 163 164 165
		if i >= max {
			break
		}

Z
zelig 已提交
166
		if peer != nil {
F
Felix Lange 已提交
167 168 169 170 171
			var msg = Msg{Code: code}
			if data != nil {
				msg.Payload = bytes.NewReader(payload)
				msg.Size = uint32(len(payload))
			}
172
			peer.writeProtoMsg(protocol, msg)
173
			i++
Z
zelig 已提交
174 175
		}
	}
176
	return nil
Z
zelig 已提交
177 178
}

179 180 181 182 183 184 185 186
// Start starts running the server.
// Servers can be re-used and started again after stopping.
func (srv *Server) Start() (err error) {
	srv.lock.Lock()
	defer srv.lock.Unlock()
	if srv.running {
		return errors.New("server already running")
	}
O
obscuren 已提交
187
	glog.V(logger.Info).Infoln("Starting Server")
188

189
	// static fields
F
Felix Lange 已提交
190 191
	if srv.PrivateKey == nil {
		return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
Z
zelig 已提交
192
	}
193 194 195 196
	if srv.MaxPeers <= 0 {
		return fmt.Errorf("Server.MaxPeers must be > 0")
	}
	srv.quit = make(chan struct{})
F
Felix Lange 已提交
197 198
	srv.peers = make(map[discover.NodeID]*Peer)
	srv.peerConnect = make(chan *discover.Node)
F
Felix Lange 已提交
199 200
	if srv.setupFunc == nil {
		srv.setupFunc = setupConn
201
	}
F
Felix Lange 已提交
202

203
	// node table
204
	ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT, srv.NodeDatabase)
F
Felix Lange 已提交
205 206 207
	if err != nil {
		return err
	}
F
Felix Lange 已提交
208 209
	srv.ntab = ntab

210
	// handshake
211
	srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: ntab.Self().ID}
F
Felix Lange 已提交
212 213 214 215
	for _, p := range srv.Protocols {
		srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
	}

216 217 218 219 220 221
	// listen/dial
	if srv.ListenAddr != "" {
		if err := srv.startListening(); err != nil {
			return err
		}
	}
F
Felix Lange 已提交
222 223 224
	if srv.Dialer == nil {
		srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
	}
225
	if !srv.NoDial {
F
Felix Lange 已提交
226
		srv.loopWG.Add(1)
227 228 229
		go srv.dialLoop()
	}
	if srv.NoDial && srv.ListenAddr == "" {
O
obscuren 已提交
230
		glog.V(logger.Warn).Infoln("I will be kind-of useless, neither dialing nor listening.")
231 232 233 234
	}

	srv.running = true
	return nil
Z
zelig 已提交
235 236
}

237 238 239 240 241
func (srv *Server) startListening() error {
	listener, err := net.Listen("tcp", srv.ListenAddr)
	if err != nil {
		return err
	}
242 243
	laddr := listener.Addr().(*net.TCPAddr)
	srv.ListenAddr = laddr.String()
244
	srv.listener = listener
F
Felix Lange 已提交
245
	srv.loopWG.Add(1)
246
	go srv.listenLoop()
247
	if !laddr.IP.IsLoopback() && srv.NAT != nil {
F
Felix Lange 已提交
248
		srv.loopWG.Add(1)
249 250 251 252
		go func() {
			nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
			srv.loopWG.Done()
		}()
253 254 255 256 257 258 259 260 261 262 263
	}
	return nil
}

// Stop terminates the server and all active peer connections.
// It blocks until all active connections have been closed.
func (srv *Server) Stop() {
	srv.lock.Lock()
	if !srv.running {
		srv.lock.Unlock()
		return
Z
zelig 已提交
264
	}
265 266 267
	srv.running = false
	srv.lock.Unlock()

O
obscuren 已提交
268
	glog.V(logger.Info).Infoln("Stopping Server")
F
Felix Lange 已提交
269
	srv.ntab.Close()
270 271 272 273 274
	if srv.listener != nil {
		// this unblocks listener Accept
		srv.listener.Close()
	}
	close(srv.quit)
F
Felix Lange 已提交
275
	srv.loopWG.Wait()
276

F
Felix Lange 已提交
277 278 279
	// No new peers can be added at this point because dialLoop and
	// listenLoop are down. It is safe to call peerWG.Wait because
	// peerWG.Add is not called outside of those loops.
280
	srv.lock.Lock()
F
Felix Lange 已提交
281 282
	for _, peer := range srv.peers {
		peer.Disconnect(DiscQuitting)
283
	}
284
	srv.lock.Unlock()
F
Felix Lange 已提交
285
	srv.peerWG.Wait()
286
}
Z
zelig 已提交
287

288 289
// Self returns the local node's endpoint information.
func (srv *Server) Self() *discover.Node {
290 291 292 293 294
	srv.lock.RLock()
	defer srv.lock.RUnlock()
	if !srv.running {
		return &discover.Node{IP: net.ParseIP("0.0.0.0")}
	}
295 296 297
	return srv.ntab.Self()
}

298 299
// main loop for adding connections via listening
func (srv *Server) listenLoop() {
F
Felix Lange 已提交
300
	defer srv.loopWG.Done()
301 302 303 304 305 306 307 308 309

	// This channel acts as a semaphore limiting
	// active inbound connections that are lingering pre-handshake.
	// If all slots are taken, no further connections are accepted.
	slots := make(chan struct{}, maxAcceptConns)
	for i := 0; i < maxAcceptConns; i++ {
		slots <- struct{}{}
	}

O
obscuren 已提交
310
	glog.V(logger.Info).Infoln("Listening on", srv.listener.Addr())
Z
zelig 已提交
311
	for {
312
		<-slots
F
Felix Lange 已提交
313 314
		conn, err := srv.listener.Accept()
		if err != nil {
315
			return
Z
zelig 已提交
316
		}
O
obscuren 已提交
317
		glog.V(logger.Debug).Infof("Accepted conn %v\n", conn.RemoteAddr())
F
Felix Lange 已提交
318
		srv.peerWG.Add(1)
319 320 321 322
		go func() {
			srv.startPeer(conn, nil)
			slots <- struct{}{}
		}()
Z
zelig 已提交
323 324 325
	}
}

326
func (srv *Server) dialLoop() {
F
Felix Lange 已提交
327 328 329 330 331 332
	var (
		dialed      = make(chan *discover.Node)
		dialing     = make(map[discover.NodeID]bool)
		findresults = make(chan []*discover.Node)
		refresh     = time.NewTimer(0)
	)
F
Felix Lange 已提交
333 334 335
	defer srv.loopWG.Done()
	defer refresh.Stop()

F
Felix Lange 已提交
336 337 338 339 340 341
	// TODO: maybe limit number of active dials
	dial := func(dest *discover.Node) {
		// Don't dial nodes that would fail the checks in addPeer.
		// This is important because the connection handshake is a lot
		// of work and we'd rather avoid doing that work for peers
		// that can't be added.
342 343 344
		srv.lock.RLock()
		ok, _ := srv.checkPeer(dest.ID)
		srv.lock.RUnlock()
F
Felix Lange 已提交
345 346 347
		if !ok || dialing[dest.ID] {
			return
		}
348

F
Felix Lange 已提交
349 350 351 352 353 354 355
		dialing[dest.ID] = true
		srv.peerWG.Add(1)
		go func() {
			srv.dialNode(dest)
			dialed <- dest
		}()
	}
F
Felix Lange 已提交
356

F
Felix Lange 已提交
357
	srv.ntab.Bootstrap(srv.BootstrapNodes)
Z
zelig 已提交
358 359
	for {
		select {
F
Felix Lange 已提交
360
		case <-refresh.C:
361 362
			// Grab some nodes to connect to if we're not at capacity.
			srv.lock.RLock()
363
			needpeers := len(srv.peers) < srv.MaxPeers/2
364
			srv.lock.RUnlock()
F
Felix Lange 已提交
365 366 367 368 369 370
			if needpeers {
				go func() {
					var target discover.NodeID
					rand.Read(target[:])
					findresults <- srv.ntab.Lookup(target)
				}()
F
Felix Lange 已提交
371 372 373 374
			} else {
				// Make sure we check again if the peer count falls
				// below MaxPeers.
				refresh.Reset(refreshPeersInterval)
Z
zelig 已提交
375
			}
F
Felix Lange 已提交
376 377 378 379 380 381 382
		case dest := <-srv.peerConnect:
			dial(dest)
		case dests := <-findresults:
			for _, dest := range dests {
				dial(dest)
			}
			refresh.Reset(refreshPeersInterval)
F
Felix Lange 已提交
383 384
		case dest := <-dialed:
			delete(dialing, dest.ID)
F
Felix Lange 已提交
385 386 387 388
			if len(dialing) == 0 {
				// Check again immediately after dialing all current candidates.
				refresh.Reset(0)
			}
F
Felix Lange 已提交
389 390
		case <-srv.quit:
			// TODO: maybe wait for active dials
Z
zelig 已提交
391 392 393 394 395
			return
		}
	}
}

F
Felix Lange 已提交
396
func (srv *Server) dialNode(dest *discover.Node) {
F
Felix Lange 已提交
397
	addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)}
O
obscuren 已提交
398
	glog.V(logger.Debug).Infof("Dialing %v\n", dest)
399
	conn, err := srv.Dialer.Dial("tcp", addr.String())
Z
zelig 已提交
400
	if err != nil {
401 402 403 404
		// dialLoop adds to the wait group counter when launching
		// dialNode, so we need to count it down again. startPeer also
		// does that when an error occurs.
		srv.peerWG.Done()
O
obscuren 已提交
405
		glog.V(logger.Detail).Infof("dial error: %v", err)
F
Felix Lange 已提交
406
		return
Z
zelig 已提交
407
	}
F
Felix Lange 已提交
408
	srv.startPeer(conn, dest)
Z
zelig 已提交
409 410
}

F
Felix Lange 已提交
411
func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
412
	// TODO: handle/store session token
413 414 415 416 417

	// Run setupFunc, which should create an authenticated connection
	// and run the capability exchange. Note that any early error
	// returns during that exchange need to call peerWG.Done because
	// the callers of startPeer added the peer to the wait group already.
F
Felix Lange 已提交
418
	fd.SetDeadline(time.Now().Add(handshakeTimeout))
419 420 421 422
	srv.lock.RLock()
	atcap := len(srv.peers) == srv.MaxPeers
	srv.lock.RUnlock()
	conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest, atcap)
F
Felix Lange 已提交
423
	if err != nil {
F
Felix Lange 已提交
424
		fd.Close()
O
obscuren 已提交
425
		glog.V(logger.Debug).Infof("Handshake with %v failed: %v", fd.RemoteAddr(), err)
426
		srv.peerWG.Done()
F
Felix Lange 已提交
427 428
		return
	}
F
Felix Lange 已提交
429 430 431 432
	conn.MsgReadWriter = &netWrapper{
		wrapped: conn.MsgReadWriter,
		conn:    fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout,
	}
F
Felix Lange 已提交
433
	p := newPeer(fd, conn, srv.Protocols)
F
Felix Lange 已提交
434
	if ok, reason := srv.addPeer(conn.ID, p); !ok {
O
obscuren 已提交
435
		glog.V(logger.Detail).Infof("Not adding %v (%v)\n", p, reason)
436
		p.politeDisconnect(reason)
437
		srv.peerWG.Done()
F
Felix Lange 已提交
438
		return
Z
zelig 已提交
439
	}
F
Felix Lange 已提交
440 441 442 443
	// The handshakes are done and it passed all checks.
	// Spawn the Peer loops.
	go srv.runPeer(p)
}
444

F
Felix Lange 已提交
445
func (srv *Server) runPeer(p *Peer) {
O
obscuren 已提交
446
	glog.V(logger.Debug).Infof("Added %v\n", p)
447
	srvjslog.LogJson(&logger.P2PConnected{
F
Felix Lange 已提交
448 449 450
		RemoteId:            p.ID().String(),
		RemoteAddress:       p.RemoteAddr().String(),
		RemoteVersionString: p.Name(),
451 452
		NumConnections:      srv.PeerCount(),
	})
F
Felix Lange 已提交
453 454 455
	if srv.newPeerHook != nil {
		srv.newPeerHook(p)
	}
456
	discreason := p.run()
F
Felix Lange 已提交
457
	srv.removePeer(p)
O
obscuren 已提交
458
	glog.V(logger.Debug).Infof("Removed %v (%v)\n", p, discreason)
459
	srvjslog.LogJson(&logger.P2PDisconnected{
F
Felix Lange 已提交
460
		RemoteId:       p.ID().String(),
461 462
		NumConnections: srv.PeerCount(),
	})
Z
zelig 已提交
463 464
}

F
Felix Lange 已提交
465
func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
466 467
	srv.lock.Lock()
	defer srv.lock.Unlock()
F
Felix Lange 已提交
468 469 470 471 472 473 474 475
	if ok, reason := srv.checkPeer(id); !ok {
		return false, reason
	}
	srv.peers[id] = p
	return true, 0
}

func (srv *Server) checkPeer(id discover.NodeID) (bool, DiscReason) {
F
Felix Lange 已提交
476 477 478 479 480 481 482
	switch {
	case !srv.running:
		return false, DiscQuitting
	case len(srv.peers) >= srv.MaxPeers:
		return false, DiscTooManyPeers
	case srv.peers[id] != nil:
		return false, DiscAlreadyConnected
483
	case id == srv.ntab.Self().ID:
F
Felix Lange 已提交
484
		return false, DiscSelf
F
Felix Lange 已提交
485 486
	default:
		return true, 0
F
Felix Lange 已提交
487
	}
Z
zelig 已提交
488 489
}

F
Felix Lange 已提交
490 491
func (srv *Server) removePeer(p *Peer) {
	srv.lock.Lock()
F
Felix Lange 已提交
492
	delete(srv.peers, p.ID())
F
Felix Lange 已提交
493 494
	srv.lock.Unlock()
	srv.peerWG.Done()
F
Felix Lange 已提交
495
}