handler.go 13.0 KB
Newer Older
1 2 3 4
package eth

import (
	"fmt"
5
	"math"
6
	"sync"
7
	"time"
8

9
	"github.com/ethereum/go-ethereum/pow"
10
	"github.com/ethereum/go-ethereum/common"
11
	"github.com/ethereum/go-ethereum/core"
12 13
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/eth/downloader"
14
	"github.com/ethereum/go-ethereum/eth/fetcher"
15
	"github.com/ethereum/go-ethereum/event"
16 17 18 19 20 21
	"github.com/ethereum/go-ethereum/logger"
	"github.com/ethereum/go-ethereum/logger/glog"
	"github.com/ethereum/go-ethereum/p2p"
	"github.com/ethereum/go-ethereum/rlp"
)

22 23 24 25 26
// This is the target maximum size of returned blocks for the
// getBlocks message. The reply message may exceed it
// if a single block is larger than the limit.
const maxBlockRespSize = 2 * 1024 * 1024

27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
func errResp(code errCode, format string, v ...interface{}) error {
	return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
}

type hashFetcherFn func(common.Hash) error
type blockFetcherFn func([]common.Hash) error

// extProt is an interface which is passed around so we can expose GetHashes and GetBlock without exposing it to the rest of the protocol
// extProt is passed around to peers which require to GetHashes and GetBlocks
type extProt struct {
	getHashes hashFetcherFn
	getBlocks blockFetcherFn
}

func (ep extProt) GetHashes(hash common.Hash) error    { return ep.getHashes(hash) }
func (ep extProt) GetBlock(hashes []common.Hash) error { return ep.getBlocks(hashes) }

44
type ProtocolManager struct {
45 46
	protVer, netId int
	txpool         txPool
47
	chainman       *core.ChainManager
48
	downloader     *downloader.Downloader
49
	fetcher        *fetcher.Fetcher
50
	peers          *peerSet
51 52

	SubProtocol p2p.Protocol
53 54 55 56

	eventMux      *event.TypeMux
	txSub         event.Subscription
	minedBlockSub event.Subscription
57

58
	// channels for fetcher, syncer, txsyncLoop
59 60 61
	newPeerCh chan *peer
	txsyncCh  chan *txsync
	quitSync  chan struct{}
62

63 64 65 66
	// wait group is used for graceful shutdowns during downloading
	// and processing
	wg   sync.WaitGroup
	quit bool
67 68
}

69 70
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
71
func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, chainman *core.ChainManager) *ProtocolManager {
72
	// Create the protocol manager and initialize peer handlers
73
	manager := &ProtocolManager{
74 75 76 77 78 79 80
		eventMux:  mux,
		txpool:    txpool,
		chainman:  chainman,
		peers:     newPeerSet(),
		newPeerCh: make(chan *peer, 1),
		txsyncCh:  make(chan *txsync),
		quitSync:  make(chan struct{}),
81
	}
82 83 84 85 86 87
	manager.SubProtocol = p2p.Protocol{
		Name:    "eth",
		Version: uint(protocolVersion),
		Length:  ProtocolLength,
		Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
			peer := manager.newPeer(protocolVersion, networkId, p, rw)
88 89
			manager.newPeerCh <- peer
			return manager.handle(peer)
90 91
		},
	}
92 93 94
	// Construct the different synchronisation mechanisms
	manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer)

95 96 97
	validator := func(block *types.Block, parent *types.Block) error {
		return core.ValidateHeader(pow, block.Header(), parent.Header(), true)
	}
98 99 100
	heighter := func() uint64 {
		return manager.chainman.CurrentBlock().NumberU64()
	}
101
	manager.fetcher = fetcher.New(manager.chainman.GetBlock, validator, manager.BroadcastBlock, heighter, manager.chainman.InsertChain, manager.removePeer)
102 103

	return manager
104 105
}

106
func (pm *ProtocolManager) removePeer(id string) {
107 108 109 110 111 112
	// Short circuit if the peer was already removed
	peer := pm.peers.Peer(id)
	if peer == nil {
		return
	}
	glog.V(logger.Debug).Infoln("Removing peer", id)
113

114 115
	// Unregister the peer from the downloader and Ethereum peer set
	pm.downloader.UnregisterPeer(id)
116
	if err := pm.peers.Unregister(id); err != nil {
117 118
		glog.V(logger.Error).Infoln("Removal failed:", err)
	}
119 120 121 122
	// Hard disconnect at the networking layer
	if peer != nil {
		peer.Peer.Disconnect(p2p.DiscUselessPeer)
	}
123 124
}

125 126 127 128 129 130 131
func (pm *ProtocolManager) Start() {
	// broadcast transactions
	pm.txSub = pm.eventMux.Subscribe(core.TxPreEvent{})
	go pm.txBroadcastLoop()
	// broadcast mined blocks
	pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
	go pm.minedBroadcastLoop()
132

133
	// start sync handlers
134
	go pm.syncer()
135
	go pm.txsyncLoop()
136 137 138
}

func (pm *ProtocolManager) Stop() {
139 140 141 142 143
	// Showing a log message. During download / process this could actually
	// take between 5 to 10 seconds and therefor feedback is required.
	glog.V(logger.Info).Infoln("Stopping ethereum protocol handler...")

	pm.quit = true
144 145
	pm.txSub.Unsubscribe()         // quits txBroadcastLoop
	pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
146
	close(pm.quitSync)             // quits syncer, fetcher, txsyncLoop
147 148 149 150 151

	// Wait for any process action
	pm.wg.Wait()

	glog.V(logger.Info).Infoln("Ethereum protocol handler stopped")
152 153
}

154
func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
155 156
	td, current, genesis := pm.chainman.Status()

157
	return newPeer(pv, nv, genesis, current, td, p, rw)
158 159
}

160
func (pm *ProtocolManager) handle(p *peer) error {
161
	// Execute the Ethereum handshake.
162 163 164
	if err := p.handleStatus(); err != nil {
		return err
	}
165 166

	// Register the peer locally.
167 168 169 170 171
	glog.V(logger.Detail).Infoln("Adding peer", p.id)
	if err := pm.peers.Register(p); err != nil {
		glog.V(logger.Error).Infoln("Addition failed:", err)
		return err
	}
172
	defer pm.removePeer(p.id)
173

174 175
	// Register the peer in the downloader. If the downloader
	// considers it banned, we disconnect.
176
	if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil {
177 178
		return err
	}
179 180

	// Propagate existing transactions. new transactions appearing
181
	// after this will be sent via broadcasts.
182 183
	pm.syncTransactions(p)

184 185 186 187 188 189 190 191 192
	// main loop. handle incoming messages.
	for {
		if err := pm.handleMsg(p); err != nil {
			return err
		}
	}
	return nil
}

193
func (pm *ProtocolManager) handleMsg(p *peer) error {
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
	msg, err := p.rw.ReadMsg()
	if err != nil {
		return err
	}
	if msg.Size > ProtocolMaxMsgSize {
		return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
	}
	// make sure that the payload has been fully consumed
	defer msg.Discard()

	switch msg.Code {
	case StatusMsg:
		return errResp(ErrExtraStatusMsg, "uncontrolled status message")

	case TxMsg:
		// TODO: rework using lazy RLP stream
		var txs []*types.Transaction
		if err := msg.Decode(&txs); err != nil {
			return errResp(ErrDecode, "msg %v: %v", msg, err)
		}
		for i, tx := range txs {
			if tx == nil {
				return errResp(ErrDecode, "transaction %d is nil", i)
			}
			jsonlogger.LogJson(&logger.EthTxReceived{
				TxHash:   tx.Hash().Hex(),
				RemoteId: p.ID().String(),
			})
		}
223
		pm.txpool.AddTransactions(txs)
224 225 226 227 228 229 230

	case GetBlockHashesMsg:
		var request getBlockHashesMsgData
		if err := msg.Decode(&request); err != nil {
			return errResp(ErrDecode, "->msg %v: %v", msg, err)
		}

231 232
		if request.Amount > uint64(downloader.MaxHashFetch) {
			request.Amount = uint64(downloader.MaxHashFetch)
233
		}
234

235
		hashes := pm.chainman.GetBlockHashesFromHash(request.Hash, request.Amount)
236 237 238 239 240 241 242 243

		if glog.V(logger.Debug) {
			if len(hashes) == 0 {
				glog.Infof("invalid block hash %x", request.Hash.Bytes()[:4])
			}
		}

		// returns either requested hashes or nothing (i.e. not found)
244
		return p.sendBlockHashes(hashes)
245

246
	case BlockHashesMsg:
247
		msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
248 249 250 251 252

		var hashes []common.Hash
		if err := msgStream.Decode(&hashes); err != nil {
			break
		}
253
		err := pm.downloader.DeliverHashes(p.id, hashes)
254 255 256
		if err != nil {
			glog.V(logger.Debug).Infoln(err)
		}
257 258

	case GetBlocksMsg:
259
		// Decode the retrieval message
260
		msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
261 262 263
		if _, err := msgStream.List(); err != nil {
			return err
		}
264
		// Gather blocks until the fetch or network limits is reached
265
		var (
266 267
			hash   common.Hash
			bytes  common.StorageSize
268
			hashes []common.Hash
269
			blocks []*types.Block
270
		)
271 272 273 274 275 276 277
		for {
			err := msgStream.Decode(&hash)
			if err == rlp.EOL {
				break
			} else if err != nil {
				return errResp(ErrDecode, "msg %v: %v", msg, err)
			}
278 279
			hashes = append(hashes, hash)

280 281
			// Retrieve the requested block, stopping if enough was found
			if block := pm.chainman.GetBlock(hash); block != nil {
282
				blocks = append(blocks, block)
283 284 285 286
				bytes += block.Size()
				if len(blocks) >= downloader.MaxBlockFetch || bytes > maxBlockRespSize {
					break
				}
287 288
			}
		}
289 290 291 292 293 294 295 296 297
		if glog.V(logger.Detail) && len(blocks) == 0 && len(hashes) > 0 {
			list := "["
			for _, hash := range hashes {
				list += fmt.Sprintf("%x, ", hash[:4])
			}
			list = list[:len(list)-2] + "]"

			glog.Infof("Peer %s: no blocks found for requested hashes %s", p.id, list)
		}
298
		return p.sendBlocks(blocks)
299

300
	case BlocksMsg:
301
		// Decode the arrived block message
302
		msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
303 304

		var blocks []*types.Block
305 306 307 308
		if err := msgStream.Decode(&blocks); err != nil {
			glog.V(logger.Detail).Infoln("Decode error", err)
			blocks = nil
		}
309 310 311 312
		// Update the receive timestamp of each block
		for i:=0; i<len(blocks); i++ {
			blocks[i].ReceivedAt = msg.ReceivedAt
		}
313
		// Filter out any explicitly requested blocks, deliver the rest to the downloader
314 315
		if blocks := pm.fetcher.Filter(blocks); len(blocks) > 0 {
			pm.downloader.DeliverBlocks(p.id, blocks)
316 317 318 319 320 321 322 323 324 325 326 327 328
		}

	case NewBlockHashesMsg:
		// Retrieve and deseralize the remote new block hashes notification
		msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))

		var hashes []common.Hash
		if err := msgStream.Decode(&hashes); err != nil {
			break
		}
		// Mark the hashes as present at the remote node
		for _, hash := range hashes {
			p.blockHashes.Add(hash)
329
			p.SetHead(hash)
330
		}
331 332 333
		// Schedule all the unknown hashes for retrieval
		unknown := make([]common.Hash, 0, len(hashes))
		for _, hash := range hashes {
334
			if !pm.chainman.HasBlock(hash) {
335
				unknown = append(unknown, hash)
336
			}
337
		}
338
		for _, hash := range unknown {
339
			pm.fetcher.Notify(p.id, hash, time.Now(), p.requestBlocks)
340
		}
341 342

	case NewBlockMsg:
343
		// Retrieve and decode the propagated block
344 345 346 347 348 349 350
		var request newBlockMsgData
		if err := msg.Decode(&request); err != nil {
			return errResp(ErrDecode, "%v: %v", msg, err)
		}
		if err := request.Block.ValidateFields(); err != nil {
			return errResp(ErrDecode, "block validation %v: %v", msg, err)
		}
351
		request.Block.ReceivedAt = msg.ReceivedAt
352

353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370
		// Mark the block's arrival for whatever reason
		_, chainHead, _ := pm.chainman.Status()
		jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{
			BlockHash:     request.Block.Hash().Hex(),
			BlockNumber:   request.Block.Number(),
			ChainHeadHash: chainHead.Hex(),
			BlockPrevHash: request.Block.ParentHash().Hex(),
			RemoteId:      p.ID().String(),
		})
		// Mark the peer as owning the block and schedule it for import
		p.blockHashes.Add(request.Block.Hash())
		p.SetHead(request.Block.Hash())

		pm.fetcher.Enqueue(p.id, request.Block)

		// TODO: Schedule a sync to cover potential gaps (this needs proto update)
		p.SetTd(request.TD)
		go pm.synchronise(p)
O
obscuren 已提交
371

372 373 374 375 376
	default:
		return errResp(ErrInvalidMsgCode, "%v", msg.Code)
	}
	return nil
}
377

378 379 380
// BroadcastBlock will either propagate a block to a subset of it's peers, or
// will only announce it's availability (depending what's requested).
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
381 382
	hash := block.Hash()
	peers := pm.peers.PeersWithoutBlock(hash)
383

384 385 386 387 388 389 390
	// If propagation is requested, send to a subset of the peer
	if propagate {
		transfer := peers[:int(math.Sqrt(float64(len(peers))))]
		for _, peer := range transfer {
			peer.sendNewBlock(block)
		}
		glog.V(logger.Detail).Infof("propagated block %x to %d peers in %v", hash[:4], len(transfer), time.Since(block.ReceivedAt))
391
	}
392 393 394 395 396 397
	// Otherwise if the block is indeed in out own chain, announce it
	if pm.chainman.HasBlock(hash) {
		for _, peer := range peers {
			peer.sendNewBlockHashes([]common.Hash{hash})
		}
		glog.V(logger.Detail).Infof("announced block %x to %d peers in %v", hash[:4], len(peers), time.Since(block.ReceivedAt))
398 399
	}
}
400 401 402 403 404

// BroadcastTx will propagate the block to its connected peers. It will sort
// out which peers do not contain the block in their block set and will do a
// sqrt(peers) to determine the amount of peers we broadcast to.
func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) {
405
	// Broadcast transaction to a batch of peers not knowing about it
406
	peers := pm.peers.PeersWithoutTx(hash)
O
obscuren 已提交
407
	//FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
408 409 410 411 412 413 414 415 416 417 418 419
	for _, peer := range peers {
		peer.sendTransaction(tx)
	}
	glog.V(logger.Detail).Infoln("broadcast tx to", len(peers), "peers")
}

// Mined broadcast loop
func (self *ProtocolManager) minedBroadcastLoop() {
	// automatically stops if unsubscribe
	for obj := range self.minedBlockSub.Chan() {
		switch ev := obj.(type) {
		case core.NewMinedBlockEvent:
420 421
			self.BroadcastBlock(ev.Block, false)
			self.BroadcastBlock(ev.Block, true)
422 423 424 425 426 427 428 429 430 431 432
		}
	}
}

func (self *ProtocolManager) txBroadcastLoop() {
	// automatically stops if unsubscribe
	for obj := range self.txSub.Chan() {
		event := obj.(core.TxPreEvent)
		self.BroadcastTx(event.Tx.Hash(), event.Tx)
	}
}