server.go 11.3 KB
Newer Older
Z
zelig 已提交
1 2 3 4
package p2p

import (
	"bytes"
5
	"errors"
Z
zelig 已提交
6 7 8 9 10
	"fmt"
	"net"
	"sync"
	"time"

11
	"github.com/ethereum/go-ethereum/logger"
Z
zelig 已提交
12 13 14
)

const (
	// outboundAddressPoolSize is the buffer size of the channel that
	// holds suggested peer addresses waiting to be dialed.
	outboundAddressPoolSize = 500
	// defaultDialTimeout is the timeout of the Dialer that Start
	// installs when Server.Dialer is nil.
	defaultDialTimeout = 10 * time.Second
	// portMappingUpdateInterval is how long natLoop waits between
	// two attempts to (re)establish the NAT port mapping.
	portMappingUpdateInterval = 15 * time.Minute
	// portMappingTimeout is the lifetime requested for each NAT port
	// mapping. It is longer than the update interval so a mapping is
	// refreshed before it expires.
	portMappingTimeout = 20 * time.Minute
)

21
// srvlog is the logger used by the server code in this file; it is
// created under the name "P2P Server".
var srvlog = logger.NewLogger("P2P Server")
Z
zelig 已提交
22

23 24 25 26 27
// Server manages all peer connections.
//
// The fields of Server are used as configuration parameters.
// You should set them before starting the Server. Fields may not be
// modified while the server is running.
Z
zelig 已提交
28
type Server struct {
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
	// This field must be set to a valid client identity.
	Identity ClientIdentity

	// MaxPeers is the maximum number of peers that can be
	// connected. It must be greater than zero.
	MaxPeers int

	// Protocols should contain the protocols supported
	// by the server. Matching protocols are launched for
	// each peer.
	Protocols []Protocol

	// If Blacklist is set to a non-nil value, the given Blacklist
	// is used to verify peer connections.
	Blacklist Blacklist

	// If ListenAddr is set to a non-nil address, the server
	// will listen for incoming connections.
	//
	// If the port is zero, the operating system will pick a port. The
	// ListenAddr field will be updated with the actual address when
	// the server is started.
	ListenAddr string

	// If set to a non-nil value, the given NAT port mapper
	// is used to make the listening port available to the
	// Internet.
	NAT NAT

	// If Dialer is set to a non-nil value, the given Dialer
	// is used to dial outbound peer connections.
	Dialer *net.Dialer

	// If NoDial is true, the server will not dial any peers.
	NoDial bool

	// Hook for testing. This is useful because we can inhibit
	// the whole protocol stack.
	newPeerFunc peerFunc
Z
zelig 已提交
68

69 70 71 72 73 74 75 76 77 78 79 80
	lock      sync.RWMutex
	running   bool
	listener  net.Listener
	laddr     *net.TCPAddr // real listen addr
	peers     []*Peer
	peerSlots chan int
	peerCount int

	quit           chan struct{}
	wg             sync.WaitGroup
	peerConnect    chan *peerAddr
	peerDisconnect chan *Peer
Z
zelig 已提交
81 82
}

83 84 85 86 87
// NAT is implemented by NAT traversal methods.
type NAT interface {
	// GetExternalAddress returns the externally visible IP address.
	// The server uses it as the IP of its advertised listen address
	// once a port mapping has been established.
	GetExternalAddress() (net.IP, error)

	// AddPortMapping requests a mapping from extport to intport for
	// the given protocol (the server passes "tcp"), labelled name and
	// valid for lifetime.
	AddPortMapping(protocol string, extport, intport int, name string, lifetime time.Duration) error

	// DeletePortMapping removes a mapping previously requested with
	// AddPortMapping. The server calls it when it shuts down.
	DeletePortMapping(protocol string, extport, intport int) error

	// Should return name of the method.
	String() string
}

93
// peerFunc creates the Peer for connection c on behalf of srv.
// dialAddr is the address that was dialed for outbound connections;
// the listen loop passes nil for accepted inbound connections.
type peerFunc func(srv *Server, c net.Conn, dialAddr *peerAddr) *Peer
Z
zelig 已提交
94

95 96 97 98 99
// Peers returns all connected peers.
func (srv *Server) Peers() (peers []*Peer) {
	srv.lock.RLock()
	defer srv.lock.RUnlock()
	for _, peer := range srv.peers {
Z
zelig 已提交
100 101 102 103 104 105 106
		if peer != nil {
			peers = append(peers, peer)
		}
	}
	return
}

107 108 109 110 111
// PeerCount returns the number of connected peers.
func (srv *Server) PeerCount() int {
	srv.lock.RLock()
	defer srv.lock.RUnlock()
	return srv.peerCount
Z
zelig 已提交
112 113
}

114 115
// SuggestPeer injects an address into the outbound address pool.
func (srv *Server) SuggestPeer(ip net.IP, port int, nodeID []byte) {
O
Merge  
obscuren 已提交
116
	addr := &peerAddr{ip, uint64(port), nodeID}
Z
zelig 已提交
117
	select {
O
Merge  
obscuren 已提交
118
	case srv.peerConnect <- addr:
119
	default: // don't block
O
Merge  
obscuren 已提交
120
		srvlog.Warnf("peer suggestion %v ignored", addr)
Z
zelig 已提交
121 122 123
	}
}

124 125 126
// Broadcast sends an RLP-encoded message to all connected peers.
// This method is deprecated and will be removed later.
func (srv *Server) Broadcast(protocol string, code uint64, data ...interface{}) {
F
Felix Lange 已提交
127 128 129 130
	var payload []byte
	if data != nil {
		payload = encodePayload(data...)
	}
131 132 133
	srv.lock.RLock()
	defer srv.lock.RUnlock()
	for _, peer := range srv.peers {
Z
zelig 已提交
134
		if peer != nil {
F
Felix Lange 已提交
135 136 137 138 139
			var msg = Msg{Code: code}
			if data != nil {
				msg.Payload = bytes.NewReader(payload)
				msg.Size = uint32(len(payload))
			}
140
			peer.writeProtoMsg(protocol, msg)
Z
zelig 已提交
141 142 143 144
		}
	}
}

145 146 147 148 149 150 151 152 153 154 155 156 157
// Start starts running the server.
// Servers can be re-used and started again after stopping.
func (srv *Server) Start() (err error) {
	srv.lock.Lock()
	defer srv.lock.Unlock()
	if srv.running {
		return errors.New("server already running")
	}
	srvlog.Infoln("Starting Server")

	// initialize fields
	if srv.Identity == nil {
		return fmt.Errorf("Server.Identity must be set to a non-nil identity")
Z
zelig 已提交
158
	}
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
	if srv.MaxPeers <= 0 {
		return fmt.Errorf("Server.MaxPeers must be > 0")
	}
	srv.quit = make(chan struct{})
	srv.peers = make([]*Peer, srv.MaxPeers)
	srv.peerSlots = make(chan int, srv.MaxPeers)
	srv.peerConnect = make(chan *peerAddr, outboundAddressPoolSize)
	srv.peerDisconnect = make(chan *Peer)
	if srv.newPeerFunc == nil {
		srv.newPeerFunc = newServerPeer
	}
	if srv.Blacklist == nil {
		srv.Blacklist = NewBlacklist()
	}
	if srv.Dialer == nil {
		srv.Dialer = &net.Dialer{Timeout: defaultDialTimeout}
	}

	if srv.ListenAddr != "" {
		if err := srv.startListening(); err != nil {
			return err
Z
zelig 已提交
180 181
		}
	}
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
	if !srv.NoDial {
		srv.wg.Add(1)
		go srv.dialLoop()
	}
	if srv.NoDial && srv.ListenAddr == "" {
		srvlog.Warnln("I will be kind-of useless, neither dialing nor listening.")
	}

	// make all slots available
	for i := range srv.peers {
		srv.peerSlots <- i
	}
	// note: discLoop is not part of WaitGroup
	go srv.discLoop()
	srv.running = true
	return nil
Z
zelig 已提交
198 199
}

200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
// startListening opens the TCP listener on ListenAddr, records the
// address that was actually bound (in ListenAddr and laddr) and starts
// the listen loop. When a NAT mapper is configured and the bound
// address is not a loopback address, the NAT loop is started as well.
// Both loops are registered with the server's WaitGroup.
func (srv *Server) startListening() error {
	ln, err := net.Listen("tcp", srv.ListenAddr)
	if err != nil {
		return err
	}

	bound := ln.Addr()
	srv.ListenAddr = bound.String()
	srv.laddr = bound.(*net.TCPAddr)
	srv.listener = ln

	srv.wg.Add(1)
	go srv.listenLoop()

	if srv.NAT != nil && !srv.laddr.IP.IsLoopback() {
		srv.wg.Add(1)
		go srv.natLoop(srv.laddr.Port)
	}
	return nil
}

// Stop terminates the server and all active peer connections.
// It blocks until all active connections have been closed.
func (srv *Server) Stop() {
	srv.lock.Lock()
	if !srv.running {
		srv.lock.Unlock()
		return
Z
zelig 已提交
224
	}
225 226 227 228 229 230 231 232 233 234 235
	srv.running = false
	srv.lock.Unlock()

	srvlog.Infoln("Stopping server")
	if srv.listener != nil {
		// this unblocks listener Accept
		srv.listener.Close()
	}
	close(srv.quit)
	for _, peer := range srv.Peers() {
		peer.Disconnect(DiscQuitting)
Z
zelig 已提交
236
	}
237 238
	srv.wg.Wait()

Z
zelig 已提交
239
	// wait till they actually disconnect
240 241 242 243 244 245 246 247 248 249 250
	// this is checked by claiming all peerSlots.
	// slots become available as the peers disconnect.
	for i := 0; i < cap(srv.peerSlots); i++ {
		<-srv.peerSlots
	}
	// terminate discLoop
	close(srv.peerDisconnect)
}

func (srv *Server) discLoop() {
	for peer := range srv.peerDisconnect {
251
		srv.removePeer(peer)
252 253
	}
}
Z
zelig 已提交
254

255 256 257 258 259
// main loop for adding connections via listening
func (srv *Server) listenLoop() {
	defer srv.wg.Done()

	srvlog.Infoln("Listening on", srv.listener.Addr())
Z
zelig 已提交
260 261
	for {
		select {
262
		case slot := <-srv.peerSlots:
O
Merge  
obscuren 已提交
263
			srvlog.Debugf("grabbed slot %v for listening", slot)
264 265 266 267
			conn, err := srv.listener.Accept()
			if err != nil {
				srv.peerSlots <- slot
				return
Z
zelig 已提交
268
			}
269 270 271 272
			srvlog.Debugf("Accepted conn %v (slot %d)\n", conn.RemoteAddr(), slot)
			srv.addPeer(conn, nil, slot)
		case <-srv.quit:
			return
Z
zelig 已提交
273 274 275 276
		}
	}
}

277 278
func (srv *Server) natLoop(port int) {
	defer srv.wg.Done()
Z
zelig 已提交
279
	for {
280
		srv.updatePortMapping(port)
Z
zelig 已提交
281
		select {
282 283 284 285
		case <-time.After(portMappingUpdateInterval):
			// one more round
		case <-srv.quit:
			srv.removePortMapping(port)
Z
zelig 已提交
286 287 288 289 290
			return
		}
	}
}

291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322
// updatePortMapping asks the NAT mapper to map port to itself for TCP
// and, on success, replaces laddr with the listener address whose IP
// is the external IP reported by the mapper. Failures are logged and
// leave laddr untouched.
func (srv *Server) updatePortMapping(port int) {
	srvlog.Infoln("Attempting to map port", port, "with", srv.NAT)

	if err := srv.NAT.AddPortMapping("tcp", port, port, "ethereum p2p", portMappingTimeout); err != nil {
		srvlog.Errorln("Port mapping error:", err)
		return
	}

	external, err := srv.NAT.GetExternalAddress()
	if err != nil {
		srvlog.Errorln("Error getting external IP:", err)
		return
	}

	srv.lock.Lock()
	defer srv.lock.Unlock()

	// copy the listener address so that only the copy gets the new IP
	mapped := *(srv.listener.Addr().(*net.TCPAddr))
	mapped.IP = external
	srvlog.Infoln("Mapped port, external addr is", &mapped)
	srv.laddr = &mapped
}

// removePortMapping asks the NAT mapper to delete the TCP mapping for
// port. A failure is logged; there is nothing else to do about it
// because the server is shutting down when this is called.
func (srv *Server) removePortMapping(port int) {
	srvlog.Infoln("Removing port mapping for", port, "with", srv.NAT)
	if err := srv.NAT.DeletePortMapping("tcp", port, port); err != nil {
		srvlog.Errorln("Error removing port mapping:", err)
	}
}

func (srv *Server) dialLoop() {
	defer srv.wg.Done()
	var (
		suggest chan *peerAddr
		slot    *int
		slots   = srv.peerSlots
	)
Z
zelig 已提交
323 324 325 326 327 328
	for {
		select {
		case i := <-slots:
			// we need a peer in slot i, slot reserved
			slot = &i
			// now we can watch for candidate peers in the next loop
329
			suggest = srv.peerConnect
Z
zelig 已提交
330 331
			// do not consume more until candidate peer is found
			slots = nil
332 333

		case desc := <-suggest:
Z
zelig 已提交
334 335
			// candidate peer found, will dial out asyncronously
			// if connection fails slot will be released
O
Merge  
obscuren 已提交
336
			srvlog.Infof("dial %v (%v)", desc, *slot)
337
			go srv.dialPeer(desc, *slot)
Z
zelig 已提交
338
			// we can watch if more peers needed in the next loop
339
			slots = srv.peerSlots
Z
zelig 已提交
340
			// until then we dont care about candidate peers
341 342 343 344 345 346
			suggest = nil

		case <-srv.quit:
			// give back the currently reserved slot
			if slot != nil {
				srv.peerSlots <- *slot
Z
zelig 已提交
347 348 349 350 351 352 353
			}
			return
		}
	}
}

// connect to peer via dial out
354 355 356
func (srv *Server) dialPeer(desc *peerAddr, slot int) {
	srvlog.Debugf("Dialing %v (slot %d)\n", desc, slot)
	conn, err := srv.Dialer.Dial(desc.Network(), desc.String())
Z
zelig 已提交
357
	if err != nil {
358 359
		srvlog.Errorf("Dial error: %v", err)
		srv.peerSlots <- slot
F
Felix Lange 已提交
360
		return
Z
zelig 已提交
361
	}
362
	go srv.addPeer(conn, desc, slot)
Z
zelig 已提交
363 364 365
}

// creates the new peer object and inserts it into its slot
366 367 368 369 370 371
func (srv *Server) addPeer(conn net.Conn, desc *peerAddr, slot int) *Peer {
	srv.lock.Lock()
	defer srv.lock.Unlock()
	if !srv.running {
		conn.Close()
		srv.peerSlots <- slot // release slot
F
Felix Lange 已提交
372
		return nil
Z
zelig 已提交
373
	}
374 375 376 377 378
	peer := srv.newPeerFunc(srv, conn, desc)
	peer.slot = slot
	srv.peers[slot] = peer
	srv.peerCount++
	go func() { peer.loop(); srv.peerDisconnect <- peer }()
F
Felix Lange 已提交
379
	return peer
Z
zelig 已提交
380 381 382
}

// removes peer: sending disconnect msg, stop peer, remove rom list/table, release slot
383 384 385
func (srv *Server) removePeer(peer *Peer) {
	srv.lock.Lock()
	defer srv.lock.Unlock()
386
	srvlog.Debugf("Removing %v (slot %v)\n", peer, peer.slot)
387 388
	if srv.peers[peer.slot] != peer {
		srvlog.Warnln("Invalid peer to remove:", peer)
Z
zelig 已提交
389 390 391
		return
	}
	// remove from list and index
392 393
	srv.peerCount--
	srv.peers[peer.slot] = nil
Z
zelig 已提交
394
	// release slot to signal need for a new peer, last!
395
	srv.peerSlots <- peer.slot
Z
zelig 已提交
396 397
}

398 399 400 401 402 403 404 405 406 407 408 409 410 411 412
func (srv *Server) verifyPeer(addr *peerAddr) error {
	if srv.Blacklist.Exists(addr.Pubkey) {
		return errors.New("blacklisted")
	}
	if bytes.Equal(srv.Identity.Pubkey()[1:], addr.Pubkey) {
		return newPeerError(errPubkeyForbidden, "not allowed to connect to srv")
	}
	srv.lock.RLock()
	defer srv.lock.RUnlock()
	for _, peer := range srv.peers {
		if peer != nil {
			id := peer.Identity()
			if id != nil && bytes.Equal(id.Pubkey(), addr.Pubkey) {
				return errors.New("already connected")
			}
F
Felix Lange 已提交
413 414
		}
	}
415
	return nil
F
Felix Lange 已提交
416 417
}

418
// Blacklist is a set of public keys that the server refuses to
// connect to.
//
// TODO replace with "Set"
type Blacklist interface {
	// Get returns the value stored for the given public key, or an
	// error if the key is not present.
	Get([]byte) (bool, error)
	// Put adds the given public key.
	Put([]byte) error
	// Delete removes the given public key.
	Delete([]byte) error
	// Exists reports whether pubkey is present.
	Exists(pubkey []byte) (ok bool)
}

// BlacklistMap is a Blacklist kept in memory. Keys are the raw bytes
// of the public key. All methods take the internal lock, so a
// BlacklistMap may be used from several goroutines.
type BlacklistMap struct {
	blacklist map[string]bool
	lock      sync.RWMutex
}

// NewBlacklist returns an empty BlacklistMap that is ready for use.
func NewBlacklist() *BlacklistMap {
	return &BlacklistMap{
		blacklist: make(map[string]bool),
	}
}

// Get returns the value stored for pubkey. It returns false and a
// "not found" error when pubkey is not in the blacklist.
func (b *BlacklistMap) Get(pubkey []byte) (bool, error) {
	b.lock.RLock()
	defer b.lock.RUnlock()
	v, ok := b.blacklist[string(pubkey)]
	if !ok {
		return false, fmt.Errorf("not found")
	}
	return v, nil
}

// Exists reports whether pubkey is in the blacklist.
func (b *BlacklistMap) Exists(pubkey []byte) (ok bool) {
	b.lock.RLock()
	defer b.lock.RUnlock()
	_, ok = b.blacklist[string(pubkey)]
	return ok
}

// Put adds pubkey to the blacklist. It always returns nil.
func (b *BlacklistMap) Put(pubkey []byte) error {
	// Writing to the map needs the exclusive lock; a read lock would
	// let several writers in at the same time.
	b.lock.Lock()
	defer b.lock.Unlock()
	b.blacklist[string(pubkey)] = true
	return nil
}

// Delete removes pubkey from the blacklist. Removing a key that is
// not present is not an error. It always returns nil.
func (b *BlacklistMap) Delete(pubkey []byte) error {
	// Exclusive lock: delete mutates the map.
	b.lock.Lock()
	defer b.lock.Unlock()
	delete(b.blacklist, string(pubkey))
	return nil
}