server.go 10.6 KB
Newer Older
Z
zelig 已提交
1 2 3 4
package p2p

import (
	"bytes"
F
Felix Lange 已提交
5
	"crypto/ecdsa"
6
	"errors"
Z
zelig 已提交
7 8
	"fmt"
	"net"
F
Felix Lange 已提交
9
	"runtime"
Z
zelig 已提交
10 11 12
	"sync"
	"time"

13
	"github.com/ethereum/go-ethereum/ethutil"
14
	"github.com/ethereum/go-ethereum/logger"
F
Felix Lange 已提交
15
	"github.com/ethereum/go-ethereum/p2p/discover"
16
	"github.com/ethereum/go-ethereum/p2p/nat"
Z
zelig 已提交
17 18 19
)

const (
	// defaultDialTimeout is the timeout of the net.Dialer that Start
	// installs when Server.Dialer is nil.
	defaultDialTimeout = 10 * time.Second
	// refreshPeersInterval is the period of the ticker in dialLoop that
	// triggers a new findPeers run.
	refreshPeersInterval = 30 * time.Second

	// total timeout for encryption handshake and protocol
	// handshake in both directions.
	handshakeTimeout = 5 * time.Second
	// maximum time allowed for reading a complete message.
	// this is effectively the amount of time a connection can be idle.
	frameReadTimeout = 1 * time.Minute
	// maximum amount of time allowed for writing a complete message.
	frameWriteTimeout = 5 * time.Second
)

33
var srvlog = logger.NewLogger("P2P Server")
34
var srvjslog = logger.NewJsonLogger()
Z
zelig 已提交
35

F
Felix Lange 已提交
36 37 38 39 40 41 42
// MakeName builds a node name that follows the ethereum naming
// convention: "<name>/v<version>/<os>/<go version>". The operating
// system name and the Go runtime version are appended to the given
// name and version.
func MakeName(name, version string) string {
	platform := runtime.GOOS
	goVersion := runtime.Version()
	return fmt.Sprintf("%s/v%s/%s/%s", name, version, platform, goVersion)
}

43 44 45 46 47
// Server manages all peer connections.
//
// The fields of Server are used as configuration parameters.
// You should set them before starting the Server. Fields may not be
// modified while the server is running.
Z
zelig 已提交
48
type Server struct {
F
Felix Lange 已提交
49 50
	// This field must be set to a valid secp256k1 private key.
	PrivateKey *ecdsa.PrivateKey
51 52 53 54 55

	// MaxPeers is the maximum number of peers that can be
	// connected. It must be greater than zero.
	MaxPeers int

F
Felix Lange 已提交
56 57 58 59 60 61
	// Name sets the node name of this server.
	// Use MakeName to create a name that follows existing conventions.
	Name string

	// Bootstrap nodes are used to establish connectivity
	// with the rest of the network.
62
	BootstrapNodes []*discover.Node
F
Felix Lange 已提交
63

64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
	// Protocols should contain the protocols supported
	// by the server. Matching protocols are launched for
	// each peer.
	Protocols []Protocol

	// If ListenAddr is set to a non-nil address, the server
	// will listen for incoming connections.
	//
	// If the port is zero, the operating system will pick a port. The
	// ListenAddr field will be updated with the actual address when
	// the server is started.
	ListenAddr string

	// If set to a non-nil value, the given NAT port mapper
	// is used to make the listening port available to the
	// Internet.
80
	NAT nat.Interface
81 82 83 84 85 86 87 88

	// If Dialer is set to a non-nil value, the given Dialer
	// is used to dial outbound peer connections.
	Dialer *net.Dialer

	// If NoDial is true, the server will not dial any peers.
	NoDial bool

F
Felix Lange 已提交
89
	// Hooks for testing. These are useful because we can inhibit
90
	// the whole protocol stack.
F
Felix Lange 已提交
91
	setupFunc
F
Felix Lange 已提交
92
	newPeerHook
Z
zelig 已提交
93

F
Felix Lange 已提交
94 95
	ourHandshake *protoHandshake

F
Felix Lange 已提交
96 97 98 99 100 101 102 103 104 105 106
	lock     sync.RWMutex
	running  bool
	listener net.Listener
	peers    map[discover.NodeID]*Peer

	ntab *discover.Table

	quit        chan struct{}
	loopWG      sync.WaitGroup // {dial,listen,nat}Loop
	peerWG      sync.WaitGroup // active peer goroutines
	peerConnect chan *discover.Node
Z
zelig 已提交
107 108
}

F
Felix Lange 已提交
109
type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node) (*conn, error)
F
Felix Lange 已提交
110
type newPeerHook func(*Peer)
Z
zelig 已提交
111

112 113 114 115 116
// Peers returns all connected peers.
func (srv *Server) Peers() (peers []*Peer) {
	srv.lock.RLock()
	defer srv.lock.RUnlock()
	for _, peer := range srv.peers {
Z
zelig 已提交
117 118 119 120 121 122 123
		if peer != nil {
			peers = append(peers, peer)
		}
	}
	return
}

124 125 126
// PeerCount returns the number of connected peers.
func (srv *Server) PeerCount() int {
	srv.lock.RLock()
F
Felix Lange 已提交
127 128 129
	n := len(srv.peers)
	srv.lock.RUnlock()
	return n
Z
zelig 已提交
130 131
}

F
Felix Lange 已提交
132 133
// SuggestPeer creates a connection to the given Node if it
// is not already connected.
134 135
func (srv *Server) SuggestPeer(n *discover.Node) {
	srv.peerConnect <- n
Z
zelig 已提交
136 137
}

138 139 140
// Broadcast sends an RLP-encoded message to all connected peers.
// This method is deprecated and will be removed later.
func (srv *Server) Broadcast(protocol string, code uint64, data ...interface{}) {
F
Felix Lange 已提交
141 142
	var payload []byte
	if data != nil {
143
		payload = ethutil.Encode(data)
F
Felix Lange 已提交
144
	}
145 146 147
	srv.lock.RLock()
	defer srv.lock.RUnlock()
	for _, peer := range srv.peers {
Z
zelig 已提交
148
		if peer != nil {
F
Felix Lange 已提交
149 150 151 152 153
			var msg = Msg{Code: code}
			if data != nil {
				msg.Payload = bytes.NewReader(payload)
				msg.Size = uint32(len(payload))
			}
154
			peer.writeProtoMsg(protocol, msg)
Z
zelig 已提交
155 156 157 158
		}
	}
}

159 160 161 162 163 164 165 166 167 168
// Start starts running the server.
// Servers can be re-used and started again after stopping.
func (srv *Server) Start() (err error) {
	srv.lock.Lock()
	defer srv.lock.Unlock()
	if srv.running {
		return errors.New("server already running")
	}
	srvlog.Infoln("Starting Server")

169
	// static fields
F
Felix Lange 已提交
170 171
	if srv.PrivateKey == nil {
		return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
Z
zelig 已提交
172
	}
173 174 175 176
	if srv.MaxPeers <= 0 {
		return fmt.Errorf("Server.MaxPeers must be > 0")
	}
	srv.quit = make(chan struct{})
F
Felix Lange 已提交
177 178
	srv.peers = make(map[discover.NodeID]*Peer)
	srv.peerConnect = make(chan *discover.Node)
F
Felix Lange 已提交
179 180
	if srv.setupFunc == nil {
		srv.setupFunc = setupConn
181
	}
F
Felix Lange 已提交
182

183
	// node table
F
Felix Lange 已提交
184
	ntab, err := discover.ListenUDP(srv.PrivateKey, srv.ListenAddr, srv.NAT)
F
Felix Lange 已提交
185 186 187
	if err != nil {
		return err
	}
F
Felix Lange 已提交
188 189
	srv.ntab = ntab

190
	// handshake
F
Felix Lange 已提交
191 192 193 194 195
	srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: ntab.Self()}
	for _, p := range srv.Protocols {
		srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
	}

196 197 198 199 200 201
	// listen/dial
	if srv.ListenAddr != "" {
		if err := srv.startListening(); err != nil {
			return err
		}
	}
F
Felix Lange 已提交
202 203 204
	if srv.Dialer == nil {
		srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
	}
205
	if !srv.NoDial {
F
Felix Lange 已提交
206
		srv.loopWG.Add(1)
207 208 209 210 211 212 213 214
		go srv.dialLoop()
	}
	if srv.NoDial && srv.ListenAddr == "" {
		srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.")
	}

	srv.running = true
	return nil
Z
zelig 已提交
215 216
}

217 218 219 220 221
func (srv *Server) startListening() error {
	listener, err := net.Listen("tcp", srv.ListenAddr)
	if err != nil {
		return err
	}
222 223
	laddr := listener.Addr().(*net.TCPAddr)
	srv.ListenAddr = laddr.String()
224
	srv.listener = listener
F
Felix Lange 已提交
225
	srv.loopWG.Add(1)
226
	go srv.listenLoop()
227
	if !laddr.IP.IsLoopback() && srv.NAT != nil {
F
Felix Lange 已提交
228
		srv.loopWG.Add(1)
229 230 231 232
		go func() {
			nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
			srv.loopWG.Done()
		}()
233 234 235 236 237 238 239 240 241 242 243
	}
	return nil
}

// Stop terminates the server and all active peer connections.
// It blocks until all active connections have been closed.
func (srv *Server) Stop() {
	srv.lock.Lock()
	if !srv.running {
		srv.lock.Unlock()
		return
Z
zelig 已提交
244
	}
245 246 247
	srv.running = false
	srv.lock.Unlock()

F
Felix Lange 已提交
248 249
	srvlog.Infoln("Stopping Server")
	srv.ntab.Close()
250 251 252 253 254
	if srv.listener != nil {
		// this unblocks listener Accept
		srv.listener.Close()
	}
	close(srv.quit)
F
Felix Lange 已提交
255
	srv.loopWG.Wait()
256

F
Felix Lange 已提交
257 258 259 260 261
	// No new peers can be added at this point because dialLoop and
	// listenLoop are down. It is safe to call peerWG.Wait because
	// peerWG.Add is not called outside of those loops.
	for _, peer := range srv.peers {
		peer.Disconnect(DiscQuitting)
262
	}
F
Felix Lange 已提交
263
	srv.peerWG.Wait()
264
}
Z
zelig 已提交
265

266 267
// main loop for adding connections via listening
func (srv *Server) listenLoop() {
F
Felix Lange 已提交
268
	defer srv.loopWG.Done()
269
	srvlog.Infoln("Listening on", srv.listener.Addr())
Z
zelig 已提交
270
	for {
F
Felix Lange 已提交
271 272
		conn, err := srv.listener.Accept()
		if err != nil {
273
			return
Z
zelig 已提交
274
		}
F
Felix Lange 已提交
275 276 277
		srvlog.Debugf("Accepted conn %v\n", conn.RemoteAddr())
		srv.peerWG.Add(1)
		go srv.startPeer(conn, nil)
Z
zelig 已提交
278 279 280
	}
}

281
func (srv *Server) dialLoop() {
F
Felix Lange 已提交
282 283 284 285 286 287 288 289 290 291 292 293 294 295
	defer srv.loopWG.Done()
	refresh := time.NewTicker(refreshPeersInterval)
	defer refresh.Stop()

	srv.ntab.Bootstrap(srv.BootstrapNodes)
	go srv.findPeers()

	dialed := make(chan *discover.Node)
	dialing := make(map[discover.NodeID]bool)

	// TODO: limit number of active dials
	// TODO: ensure only one findPeers goroutine is running
	// TODO: pause findPeers when we're at capacity

Z
zelig 已提交
296 297
	for {
		select {
F
Felix Lange 已提交
298
		case <-refresh.C:
299

F
Felix Lange 已提交
300 301 302
			go srv.findPeers()

		case dest := <-srv.peerConnect:
F
Felix Lange 已提交
303 304 305
			// avoid dialing nodes that are already connected.
			// there is another check for this in addPeer,
			// which runs after the handshake.
F
Felix Lange 已提交
306 307 308
			srv.lock.Lock()
			_, isconnected := srv.peers[dest.ID]
			srv.lock.Unlock()
F
Felix Lange 已提交
309
			if isconnected || dialing[dest.ID] || dest.ID == srv.ntab.Self() {
F
Felix Lange 已提交
310
				continue
Z
zelig 已提交
311
			}
F
Felix Lange 已提交
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326

			dialing[dest.ID] = true
			srv.peerWG.Add(1)
			go func() {
				srv.dialNode(dest)
				// at this point, the peer has been added
				// or discarded. either way, we're not dialing it anymore.
				dialed <- dest
			}()

		case dest := <-dialed:
			delete(dialing, dest.ID)

		case <-srv.quit:
			// TODO: maybe wait for active dials
Z
zelig 已提交
327 328 329 330 331
			return
		}
	}
}

F
Felix Lange 已提交
332
func (srv *Server) dialNode(dest *discover.Node) {
333 334 335
	addr := &net.TCPAddr{IP: dest.IP, Port: dest.TCPPort}
	srvlog.Debugf("Dialing %v\n", dest)
	conn, err := srv.Dialer.Dial("tcp", addr.String())
Z
zelig 已提交
336
	if err != nil {
337
		srvlog.DebugDetailf("dial error: %v", err)
F
Felix Lange 已提交
338
		return
Z
zelig 已提交
339
	}
F
Felix Lange 已提交
340
	srv.startPeer(conn, dest)
Z
zelig 已提交
341 342
}

F
Felix Lange 已提交
343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360
// findPeers looks up nodes close to the local node ID and nodes close
// to its bitwise complement (i.e. far from the local ID) in the
// discovery table, and feeds the results to dialLoop through
// peerConnect, alternating between the two result lists.
//
// It stops early when the server's quit channel is closed, because
// nothing receives from peerConnect once dialLoop has returned.
func (srv *Server) findPeers() {
	far := srv.ntab.Self()
	for i := range far {
		far[i] = ^far[i]
	}
	closeToSelf := srv.ntab.Lookup(srv.ntab.Self())
	farFromSelf := srv.ntab.Lookup(far)

	// suggest hands a single node to dialLoop and reports whether the
	// server is still running.
	suggest := func(n *discover.Node) bool {
		select {
		case srv.peerConnect <- n:
			return true
		case <-srv.quit:
			return false
		}
	}

	for i := 0; i < len(closeToSelf) || i < len(farFromSelf); i++ {
		if i < len(closeToSelf) && !suggest(closeToSelf[i]) {
			return
		}
		if i < len(farFromSelf) && !suggest(farFromSelf[i]) {
			return
		}
	}
}

F
Felix Lange 已提交
361
func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) {
362
	// TODO: handle/store session token
F
Felix Lange 已提交
363
	fd.SetDeadline(time.Now().Add(handshakeTimeout))
F
Felix Lange 已提交
364
	conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest)
F
Felix Lange 已提交
365
	if err != nil {
F
Felix Lange 已提交
366 367
		fd.Close()
		srvlog.Debugf("Handshake with %v failed: %v", fd.RemoteAddr(), err)
F
Felix Lange 已提交
368 369
		return
	}
F
Felix Lange 已提交
370 371 372 373 374

	conn.MsgReadWriter = &netWrapper{
		wrapped: conn.MsgReadWriter,
		conn:    fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout,
	}
F
Felix Lange 已提交
375
	p := newPeer(fd, conn, srv.Protocols)
F
Felix Lange 已提交
376
	if ok, reason := srv.addPeer(conn.ID, p); !ok {
377 378
		srvlog.DebugDetailf("Not adding %v (%v)\n", p, reason)
		p.politeDisconnect(reason)
F
Felix Lange 已提交
379
		return
Z
zelig 已提交
380
	}
381

382
	srvlog.Debugf("Added %v\n", p)
383 384
	srvjslog.LogJson(&logger.P2PConnected{
		RemoteId:            fmt.Sprintf("%x", conn.ID[:]),
F
Felix Lange 已提交
385
		RemoteAddress:       fd.RemoteAddr().String(),
386 387 388
		RemoteVersionString: conn.Name,
		NumConnections:      srv.PeerCount(),
	})
F
Felix Lange 已提交
389

F
Felix Lange 已提交
390 391 392
	if srv.newPeerHook != nil {
		srv.newPeerHook(p)
	}
393
	discreason := p.run()
F
Felix Lange 已提交
394
	srv.removePeer(p)
395

396
	srvlog.Debugf("Removed %v (%v)\n", p, discreason)
397 398 399 400
	srvjslog.LogJson(&logger.P2PDisconnected{
		RemoteId:       fmt.Sprintf("%x", conn.ID[:]),
		NumConnections: srv.PeerCount(),
	})
Z
zelig 已提交
401 402
}

F
Felix Lange 已提交
403
func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) {
404 405
	srv.lock.Lock()
	defer srv.lock.Unlock()
F
Felix Lange 已提交
406 407 408 409 410 411 412 413 414 415 416 417
	switch {
	case !srv.running:
		return false, DiscQuitting
	case len(srv.peers) >= srv.MaxPeers:
		return false, DiscTooManyPeers
	case srv.peers[id] != nil:
		return false, DiscAlreadyConnected
	case id == srv.ntab.Self():
		return false, DiscSelf
	}
	srv.peers[id] = p
	return true, 0
Z
zelig 已提交
418 419
}

F
Felix Lange 已提交
420 421
func (srv *Server) removePeer(p *Peer) {
	srv.lock.Lock()
F
Felix Lange 已提交
422
	delete(srv.peers, p.ID())
F
Felix Lange 已提交
423 424
	srv.lock.Unlock()
	srv.peerWG.Done()
F
Felix Lange 已提交
425
}