downloader.go
// Copyright 2015 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with go-ethereum.  If not, see <http://www.gnu.org/licenses/>.

// Package downloader contains the manual full chain synchronisation.
package downloader

import (
	"bytes"
	"errors"
	"math"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/logger"
	"github.com/ethereum/go-ethereum/logger/glog"
	"gopkg.in/fatih/set.v0"
)

const (
	eth60 = 60 // Constant to check for old protocol support
	eth61 = 61 // Constant to check for new protocol support
)

var (
	MinHashFetch  = 512 // Minimum amount of hashes to not consider a peer stalling
	MaxHashFetch  = 512 // Amount of hashes to be fetched per retrieval request
	MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request

	hashTTL         = 5 * time.Second  // Time it takes for a hash request to time out
	blockSoftTTL    = 3 * time.Second  // Request completion threshold for increasing or decreasing a peer's bandwidth
	blockHardTTL    = 3 * blockSoftTTL // Maximum time allowance before a block request is considered expired
	crossCheckCycle = time.Second      // Period after which to check for expired cross checks

	maxQueuedHashes = 256 * 1024 // Maximum number of hashes to queue for import (DOS protection)
	maxBannedHashes = 4096       // Number of bannable hashes before phasing old ones out
	maxBlockProcess = 256        // Number of blocks to import at once into the chain
)

var (
	errBusy             = errors.New("busy")
	errUnknownPeer      = errors.New("peer is unknown or unhealthy")
	errBadPeer          = errors.New("action from bad peer ignored")
	errStallingPeer     = errors.New("peer is stalling")
	errBannedHead       = errors.New("peer head hash already banned")
	errNoPeers          = errors.New("no peers to keep download active")
	errPendingQueue     = errors.New("pending items in queue")
	errTimeout          = errors.New("timeout")
	errEmptyHashSet     = errors.New("empty hash set by peer")
	errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
	errAlreadyInPool    = errors.New("hash already in pool")
	errInvalidChain     = errors.New("retrieved hash chain is invalid")
	errCrossCheckFailed = errors.New("block cross-check failed")
	errCancelHashFetch  = errors.New("hash fetching canceled (requested)")
	errCancelBlockFetch = errors.New("block downloading canceled (requested)")
	errNoSyncActive     = errors.New("no sync active")
)

// hashCheckFn is a callback type for verifying a hash's presence in the local chain.
type hashCheckFn func(common.Hash) bool

// blockRetrievalFn is a callback type for retrieving a block from the local chain.
type blockRetrievalFn func(common.Hash) *types.Block

// headRetrievalFn is a callback type for retrieving the head block from the local chain.
type headRetrievalFn func() *types.Block

// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
type chainInsertFn func(types.Blocks) (int, error)

// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)

type blockPack struct {
	peerId string
	blocks []*types.Block
}

type hashPack struct {
	peerId string
	hashes []common.Hash
}

type crossCheck struct {
	expire time.Time
	parent common.Hash
}

type Downloader struct {
	mux *event.TypeMux

	queue  *queue                      // Scheduler for selecting the hashes to download
	peers  *peerSet                    // Set of active peers from which download can proceed
	checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain
	banned *set.Set                    // Set of hashes we've received and banned

	interrupt int32 // Atomic boolean to signal termination

	// Statistics
	importStart time.Time // Instant when the last blocks were taken from the cache
	importQueue []*Block  // Previously taken blocks to check import progress
	importDone  int       // Number of taken blocks already imported from the last batch
	importLock  sync.Mutex

	// Callbacks
	hasBlock    hashCheckFn      // Checks if a block is present in the chain
	getBlock    blockRetrievalFn // Retrieves a block from the chain
	headBlock   headRetrievalFn  // Retrieves the head block from the chain
	insertChain chainInsertFn    // Injects a batch of blocks into the chain
	dropPeer    peerDropFn       // Drops a peer for misbehaving

	// Status
	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
	synchronising   int32
	processing      int32
	notified        int32

	// Channels
	newPeerCh chan *peer
	hashCh    chan hashPack  // Channel receiving inbound hashes
	blockCh   chan blockPack // Channel receiving inbound blocks
	processCh chan bool      // Channel to signal the block fetcher of new or finished work

	cancelCh   chan struct{} // Channel to cancel mid-flight syncs
	cancelLock sync.RWMutex  // Lock to protect the cancel channel in delivers
}

// Block is an origin-tagged blockchain block.
type Block struct {
	RawBlock   *types.Block
	OriginPeer string
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, headBlock headRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader {
	// Create the base downloader
	downloader := &Downloader{
		mux:         mux,
		queue:       newQueue(),
		peers:       newPeerSet(),
		hasBlock:    hasBlock,
		getBlock:    getBlock,
		headBlock:   headBlock,
		insertChain: insertChain,
		dropPeer:    dropPeer,
		newPeerCh:   make(chan *peer, 1),
		hashCh:      make(chan hashPack, 1),
		blockCh:     make(chan blockPack, 1),
		processCh:   make(chan bool, 1),
	}
	// Inject all the known bad hashes
	downloader.banned = set.New()
	for hash := range core.BadHashes {
		downloader.banned.Add(hash)
	}
	return downloader
}
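
// A minimal wiring sketch for New (hypothetical: it assumes the caller holds an
// event.TypeMux and a chain manager exposing HasBlock, GetBlock, CurrentBlock and
// InsertChain with matching signatures; removePeer may be any func(id string)):
//
//	mux := new(event.TypeMux)
//	dl := New(mux,
//		chain.HasBlock,     // hashCheckFn
//		chain.GetBlock,     // blockRetrievalFn
//		chain.CurrentBlock, // headRetrievalFn
//		chain.InsertChain,  // chainInsertFn
//		removePeer,         // peerDropFn
//	)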

// Stats retrieves the current status of the downloader.
func (d *Downloader) Stats() (pending int, cached int, importing int, estimate time.Duration) {
	// Fetch the download status
	pending, cached = d.queue.Size()

	// Figure out the import progress
	d.importLock.Lock()
	defer d.importLock.Unlock()

	for len(d.importQueue) > 0 && d.hasBlock(d.importQueue[0].RawBlock.Hash()) {
		d.importQueue = d.importQueue[1:]
		d.importDone++
	}
	importing = len(d.importQueue)

	// Make an estimate on the total sync
	estimate = 0
	if d.importDone > 0 {
		estimate = time.Since(d.importStart) / time.Duration(d.importDone) * time.Duration(pending+cached+importing)
	}
	return
}
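
// A caller could poll Stats for rough progress reporting; this is only an
// illustrative sketch (the interval and log wording are arbitrary choices):
//
//	for range time.Tick(10 * time.Second) {
//		pending, cached, importing, eta := dl.Stats()
//		glog.V(logger.Info).Infof("syncing: %d pending, %d cached, %d importing, eta %v",
//			pending, cached, importing, eta)
//	}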

// Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool {
	return atomic.LoadInt32(&d.synchronising) > 0
}

// RegisterPeer injects a new download peer into the set of block sources to be
// used for fetching hashes and blocks from.
func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn) error {
	// If the peer wants to send a banned hash, reject
	if d.banned.Has(head) {
		glog.V(logger.Debug).Infoln("Register rejected, head hash banned:", id)
		return errBannedHead
	}
	// Otherwise try to construct and register the peer
	glog.V(logger.Detail).Infoln("Registering peer", id)
	if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks)); err != nil {
		glog.V(logger.Error).Infoln("Register failed:", err)
		return err
	}
	return nil
}
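
// Registration is normally driven by the protocol handshake. A sketch, assuming
// a peer wrapper p that exposes three request methods with the expected fetcher
// signatures (the method names below are illustrative, not prescriptive):
//
//	err := dl.RegisterPeer(p.id, p.version, p.Head(),
//		p.requestHashes,           // relativeHashFetcherFn: hashes following a hash
//		p.requestHashesFromNumber, // absoluteHashFetcherFn: hashes from a block number
//		p.requestBlocks,           // blockFetcherFn: blocks by hash
//	)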

// UnregisterPeer removes a peer from the known list, preventing any action from
// the specified peer.
func (d *Downloader) UnregisterPeer(id string) error {
	glog.V(logger.Detail).Infoln("Unregistering peer", id)
	if err := d.peers.Unregister(id); err != nil {
		glog.V(logger.Error).Infoln("Unregister failed:", err)
		return err
	}
	return nil
}

// Synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
func (d *Downloader) Synchronise(id string, head common.Hash) {
	glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", id, head)

	switch err := d.synchronise(id, head); err {
	case nil:
		glog.V(logger.Detail).Infof("Synchronisation completed")

	case errBusy:
		glog.V(logger.Detail).Infof("Synchronisation already in progress")

	case errTimeout, errBadPeer, errStallingPeer, errBannedHead, errEmptyHashSet, errPeersUnavailable, errInvalidChain, errCrossCheckFailed:
		glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
		d.dropPeer(id)

	case errPendingQueue:
		glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)

	default:
		glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
	}
}
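
// Synchronise is typically fired from the protocol handler's sync loop against
// the best advertised peer. A rough sketch (bestPeer is a hypothetical helper
// returning the peer with the highest total difficulty, or nil):
//
//	if p := bestPeer(); p != nil {
//		go dl.Synchronise(p.id, p.Head())
//	}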

// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronise if its TD is higher than our own. If any of the
// checks fail, an error will be returned. This method is synchronous.
func (d *Downloader) synchronise(id string, hash common.Hash) error {
	// Mock out the synchronisation if testing
	if d.synchroniseMock != nil {
		return d.synchroniseMock(id, hash)
	}
	// Make sure only one goroutine is ever allowed past this point at once
	if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
		return errBusy
	}
	defer atomic.StoreInt32(&d.synchronising, 0)

	// If the head hash is banned, terminate immediately
	if d.banned.Has(hash) {
		return errBannedHead
	}
	// Post a user notification of the sync (only once per session)
	if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
		glog.V(logger.Info).Infoln("Block synchronisation started")
	}
	// Abort if the queue still contains some leftover data
	if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
		return errPendingQueue
	}
	// Reset the queue and peer set to clean any internal leftover state
	d.queue.Reset()
	d.peers.Reset()
	d.checks = make(map[common.Hash]*crossCheck)

	// Create cancel channel for aborting mid-flight
	d.cancelLock.Lock()
	d.cancelCh = make(chan struct{})
	d.cancelLock.Unlock()

	// Retrieve the origin peer and initiate the downloading process
	p := d.peers.Peer(id)
	if p == nil {
		return errUnknownPeer
	}
	return d.syncWithPeer(p, hash)
}

// Has checks if the downloader knows about a particular hash, meaning that it's
// either already downloaded or pending retrieval.
func (d *Downloader) Has(hash common.Hash) bool {
	return d.queue.Has(hash)
}

// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
	d.mux.Post(StartEvent{})
	defer func() {
		// reset on error
		if err != nil {
			d.cancel()
			d.mux.Post(FailedEvent{err})
		} else {
			d.mux.Post(DoneEvent{})
		}
	}()

	glog.V(logger.Debug).Infof("Synchronizing with the network using: %s, eth/%d", p.id, p.version)
	switch p.version {
	case eth60:
		// Old eth/60 version, use reverse hash retrieval algorithm
		if err = d.fetchHashes60(p, hash); err != nil {
			return err
		}
		if err = d.fetchBlocks60(); err != nil {
			return err
		}
	case eth61:
		// New eth/61, use forward, concurrent hash and block retrieval algorithm
		number, err := d.findAncestor(p)
		if err != nil {
			return err
		}
		errc := make(chan error, 2)
		go func() { errc <- d.fetchHashes(p, number+1) }()
		go func() { errc <- d.fetchBlocks(number + 1) }()

		// If any fetcher fails, cancel the other
		if err := <-errc; err != nil {
			d.cancel()
			<-errc
			return err
		}
		return <-errc

	default:
		// Something very wrong, stop right here
		glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version)
		return errBadPeer
	}
	glog.V(logger.Debug).Infoln("Synchronization completed")

	return nil
}

// cancel aborts all of the operations and resets the queue.
func (d *Downloader) cancel() {
	// Close the current cancel channel
	d.cancelLock.Lock()
	if d.cancelCh != nil {
		select {
		case <-d.cancelCh:
			// Channel was already closed
		default:
			close(d.cancelCh)
		}
	}
	d.cancelLock.Unlock()

	// Reset the queue
	d.queue.Reset()
}

// Terminate interrupts the downloader, canceling all pending operations.
func (d *Downloader) Terminate() {
	atomic.StoreInt32(&d.interrupt, 1)
	d.cancel()
}

// fetchHashes60 starts retrieving hashes backwards from a specific peer and hash,
// up until it finds a common ancestor. If the source peer times out, alternative
// ones are tried for continuation.
func (d *Downloader) fetchHashes60(p *peer, h common.Hash) error {
	var (
		start  = time.Now()
		active = p             // active peer will help determine the current active peer
		head   = common.Hash{} // common and last hash

		timeout     = time.NewTimer(0)                // timer to dump a non-responsive active peer
		attempted   = make(map[string]bool)           // attempted peers will help with retries
		crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks
	)
	defer crossTicker.Stop()
	defer timeout.Stop()

	glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
	<-timeout.C // timeout channel should be initially empty.

	getHashes := func(from common.Hash) {
		go active.getRelHashes(from)
		timeout.Reset(hashTTL)
	}

	// Add the hash to the queue, and start hash retrieval.
	d.queue.Insert([]common.Hash{h}, false)
	getHashes(h)

	attempted[p.id] = true
	for finished := false; !finished; {
		select {
		case <-d.cancelCh:
			return errCancelHashFetch

		case hashPack := <-d.hashCh:
			// Make sure the active peer is giving us the hashes
			if hashPack.peerId != active.id {
				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
				break
			}
			timeout.Stop()

			// Make sure the peer actually gave something valid
			if len(hashPack.hashes) == 0 {
				glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set", active.id)
				return errEmptyHashSet
			}
			for index, hash := range hashPack.hashes {
				if d.banned.Has(hash) {
					glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain", active.id)

					d.queue.Insert(hashPack.hashes[:index+1], false)
					if err := d.banBlocks(active.id, hash); err != nil {
						glog.V(logger.Debug).Infof("Failed to ban batch of blocks: %v", err)
					}
					return errInvalidChain
				}
			}
			// Determine if we're done fetching hashes (queue up all pending), and continue if not done
			done, index := false, 0
			for index, head = range hashPack.hashes {
				if d.hasBlock(head) || d.queue.GetBlock(head) != nil {
					glog.V(logger.Debug).Infof("Found common hash %x", head[:4])
					hashPack.hashes = hashPack.hashes[:index]
					done = true
					break
				}
			}
			// Insert all the new hashes, but only continue if we got something useful
			inserts := d.queue.Insert(hashPack.hashes, false)
			if len(inserts) == 0 && !done {
				glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id)
				return errBadPeer
			}
			if !done {
				// Check that the peer is not stalling the sync
				if len(inserts) < MinHashFetch {
					return errStallingPeer
				}
				// Try and fetch a random block to verify the hash batch
				// Skip the last hash as the cross check races with the next hash fetch
				cross := rand.Intn(len(inserts) - 1)
				origin, parent := inserts[cross], inserts[cross+1]
				glog.V(logger.Detail).Infof("Cross checking (%s) with %x/%x", active.id, origin, parent)

				d.checks[origin] = &crossCheck{
					expire: time.Now().Add(blockSoftTTL),
					parent: parent,
				}
				go active.getBlocks([]common.Hash{origin})

				// Also fetch a fresh batch of hashes
				getHashes(head)
				continue
			}
			// We're done, prepare the download cache and proceed pulling the blocks
			offset := uint64(0)
			if block := d.getBlock(head); block != nil {
				offset = block.NumberU64() + 1
			}
			d.queue.Prepare(offset)
			finished = true

		case blockPack := <-d.blockCh:
			// Cross check the block with the random verifications
			if blockPack.peerId != active.id || len(blockPack.blocks) != 1 {
				continue
			}
			block := blockPack.blocks[0]
			if check, ok := d.checks[block.Hash()]; ok {
				if block.ParentHash() != check.parent {
					return errCrossCheckFailed
				}
				delete(d.checks, block.Hash())
			}

		case <-crossTicker.C:
			// Iterate over all the cross checks and fail the hash chain if they're not verified
			for hash, check := range d.checks {
				if time.Now().After(check.expire) {
					glog.V(logger.Debug).Infof("Cross check timeout for %x", hash)
					return errCrossCheckFailed
				}
			}

		case <-timeout.C:
			glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request", p.id)

			var p *peer // p will be set if a peer can be found
			// Attempt to find a new peer by checking inclusion of peers' best hash in our
			// already fetched hash list. This can't guarantee 100% correctness, but it
			// does a fair job.
			for _, peer := range d.peers.AllPeers() {
				if d.queue.Has(peer.head) && !attempted[peer.id] {
					p = peer
					break
				}
			}
			// If all peers have been tried, or the head hash is the zero hash, abort
			// the process entirely.
			if p == nil || (head == common.Hash{}) {
				return errTimeout
			}
			// Set p as the active peer. This will invalidate any hashes that may be returned
			// by our previous (delayed) peer.
			active = p
			getHashes(head)
			glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)", p.id)
		}
	}
	glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v", d.queue.Pending(), time.Since(start))

	return nil
}

// fetchBlocks60 iteratively downloads the entire scheduled block chain, taking
// any available peers, reserving a chunk of blocks for each, waiting for delivery
// and periodically checking for timeouts.
func (d *Downloader) fetchBlocks60() error {
	glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)")
	start := time.Now()

	// Start a ticker to continue throttled downloads and check for bad peers
	ticker := time.NewTicker(20 * time.Millisecond)
	defer ticker.Stop()

out:
	for {
		select {
		case <-d.cancelCh:
			return errCancelBlockFetch

		case <-d.hashCh:
			// Out of bounds hashes received, ignore them

		case blockPack := <-d.blockCh:
			// Short circuit if it's a stale cross check
			if len(blockPack.blocks) == 1 {
				block := blockPack.blocks[0]
				if _, ok := d.checks[block.Hash()]; ok {
					delete(d.checks, block.Hash())
					break
				}
			}
			// If the peer was previously banned and failed to deliver its pack
			// in a reasonable time frame, ignore its message.
			if peer := d.peers.Peer(blockPack.peerId); peer != nil {
				// Deliver the received chunk of blocks, and demote in case of errors
				err := d.queue.Deliver(blockPack.peerId, blockPack.blocks)
				switch err {
				case nil:
					// If no blocks were delivered, demote the peer (need the delivery above)
					if len(blockPack.blocks) == 0 {
						peer.Demote()
						peer.SetIdle()
						glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
						break
					}
					// All was successful, promote the peer and potentially start processing
					peer.Promote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
					go d.process()

				case errInvalidChain:
					// The hash chain is invalid (blocks are not ordered properly), abort
					return err

				case errNoFetchesPending:
					// Peer probably timed out with its delivery but came through
					// in the end, demote, but allow it to pull from this peer.
					peer.Demote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)

				case errStaleDelivery:
					// Delivered something completely different than requested, usually
					// caused by a timeout and delivery during a new sync cycle.
					// Don't set it to idle as the original request should still be
					// in flight.
					peer.Demote()
					glog.V(logger.Detail).Infof("%s: stale delivery", peer)

				default:
					// Peer did something semi-useful, demote but keep it around
					peer.Demote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
					go d.process()
				}
			}

		case <-ticker.C:
			// Short circuit if we lost all our peers
			if d.peers.Len() == 0 {
				return errNoPeers
			}
			// Check for block request timeouts and demote the responsible peers
			badPeers := d.queue.Expire(blockHardTTL)
			for _, pid := range badPeers {
				if peer := d.peers.Peer(pid); peer != nil {
					peer.Demote()
					glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
				}
			}
			// If there are unrequested hashes left, start fetching from the available peers
			if d.queue.Pending() > 0 {
				// Throttle the download if the block cache is full and waiting for processing
				if d.queue.Throttle() {
					break
				}
				// Send a download request to all idle peers, until throttled
				idlePeers := d.peers.IdlePeers()
				for _, peer := range idlePeers {
					// Short circuit if throttling activated since above
					if d.queue.Throttle() {
						break
					}
					// Get a possible chunk. If nil is returned no chunk
					// could be returned due to no hashes available.
					request := d.queue.Reserve(peer, peer.Capacity())
					if request == nil {
						continue
					}
					if glog.V(logger.Detail) {
						glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes))
					}
					// Fetch the chunk and check for error. If the peer was somehow
					// already fetching a chunk due to a bug, it will be returned to
					// the queue
					if err := peer.Fetch(request); err != nil {
						glog.V(logger.Error).Infof("Peer %s received double work", peer.id)
						d.queue.Cancel(request)
					}
				}
				// Make sure that we have peers available for fetching. If all peers have been
				// tried and all failed, throw an error.
				if d.queue.InFlight() == 0 {
					return errPeersUnavailable
				}

			} else if d.queue.InFlight() == 0 {
				// When there is nothing more queued and nothing in flight, we can
				// safely assume we're done. Another part of the process will check
				// for parent errors and will re-request anything that's missing.
				break out
			}
		}
	}
	glog.V(logger.Detail).Infoln("Downloaded block(s) in", time.Since(start))
	return nil
}

// findAncestor tries to locate the common ancestor block of the local chain and
// a remote peer's blockchain. In the general case when our node was in sync and
// on the correct chain, checking the top N blocks should already get us a match.
// In the rare scenario when we ended up on a long soft fork (i.e. none of the
// head blocks match), we do a binary search to find the common ancestor.
func (d *Downloader) findAncestor(p *peer) (uint64, error) {
	glog.V(logger.Debug).Infof("%v: looking for common ancestor", p)

	// Request our head blocks to short circuit ancestor location
	head := d.headBlock().NumberU64()
	from := int64(head) - int64(MaxHashFetch)
	if from < 0 {
		from = 0
	}
	go p.getAbsHashes(uint64(from), MaxHashFetch)

	// Wait for the remote response to the head fetch
	number, hash := uint64(0), common.Hash{}
	timeout := time.After(hashTTL)

	for finished := false; !finished; {
		select {
		case <-d.cancelCh:
			return 0, errCancelHashFetch

		case hashPack := <-d.hashCh:
			// Discard anything not from the origin peer
			if hashPack.peerId != p.id {
				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
				break
			}
			// Make sure the peer actually gave something valid
			hashes := hashPack.hashes
			if len(hashes) == 0 {
				glog.V(logger.Debug).Infof("%v: empty head hash set", p)
				return 0, errEmptyHashSet
			}
			// Check if a common ancestor was found
			finished = true
			for i := len(hashes) - 1; i >= 0; i-- {
				if d.hasBlock(hashes[i]) {
					number, hash = uint64(from)+uint64(i), hashes[i]
					break
				}
			}

		case <-d.blockCh:
			// Out of bounds blocks received, ignore them

		case <-timeout:
			glog.V(logger.Debug).Infof("%v: head hash timeout", p)
			return 0, errTimeout
		}
	}
	// If the head fetch already found an ancestor, return
	if !common.EmptyHash(hash) {
		glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x]", p, number, hash[:4])
		return number, nil
	}
	// Ancestor not found, we need to binary search over our chain
	start, end := uint64(0), head
	for start+1 < end {
		// Split our chain interval in two, and request the hash to cross check
		check := (start + end) / 2

		timeout := time.After(hashTTL)
		go p.getAbsHashes(uint64(check), 1)

		// Wait until a reply arrives to this request
		for arrived := false; !arrived; {
			select {
			case <-d.cancelCh:
				return 0, errCancelHashFetch

			case hashPack := <-d.hashCh:
				// Discard anything not from the origin peer
				if hashPack.peerId != p.id {
					glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
					break
				}
				// Make sure the peer actually gave something valid
				hashes := hashPack.hashes
				if len(hashes) != 1 {
					glog.V(logger.Debug).Infof("%v: invalid search hash set (%d)", p, len(hashes))
					return 0, errBadPeer
				}
				arrived = true

				// Modify the search interval based on the response
				block := d.getBlock(hashes[0])
				if block == nil {
					end = check
					break
				}
				if block.NumberU64() != check {
					glog.V(logger.Debug).Infof("%v: non requested hash #%d [%x], instead of #%d", p, block.NumberU64(), block.Hash().Bytes()[:4], check)
					return 0, errBadPeer
				}
				start = check

			case <-d.blockCh:
				// Out of bounds blocks received, ignore them

			case <-timeout:
				glog.V(logger.Debug).Infof("%v: search hash timeout", p)
				return 0, errTimeout
			}
		}
	}
	return start, nil
}
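
// To illustrate the binary search above with made-up numbers: with a local head
// at #1024 and no match among the last MaxHashFetch hashes, the interval starts
// as (0, 1024]. Probing #512 and finding the remote hash locally moves start up
// to 512; not finding it moves end down to 512. The halving repeats until
// start+1 == end, at which point start is reported as the common ancestor.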

// fetchHashes keeps retrieving hashes from the requested number, until no more
// are returned, potentially throttling on the way.
func (d *Downloader) fetchHashes(p *peer, from uint64) error {
	glog.V(logger.Debug).Infof("%v: downloading hashes from #%d", p, from)

	// Create a timeout timer, and the associated hash fetcher
	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
	<-timeout.C                 // timeout channel should be initially empty
	defer timeout.Stop()

	getHashes := func(from uint64) {
		glog.V(logger.Detail).Infof("%v: fetching %d hashes from #%d", p, MaxHashFetch, from)

		go p.getAbsHashes(from, MaxHashFetch)
		timeout.Reset(hashTTL)
	}
	// Start pulling hashes, until all are exhausted
	getHashes(from)
	gotHashes := false

	for {
		select {
		case <-d.cancelCh:
			return errCancelHashFetch

		case hashPack := <-d.hashCh:
			// Make sure the active peer is giving us the hashes
			if hashPack.peerId != p.id {
				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
				break
			}
			timeout.Stop()

			// If no more hashes are inbound, notify the block fetcher and return
			if len(hashPack.hashes) == 0 {
				glog.V(logger.Debug).Infof("%v: no available hashes", p)

				select {
				case d.processCh <- false:
				case <-d.cancelCh:
				}
				// Error out if no hashes were retrieved at all
				if !gotHashes {
					return errStallingPeer
				}
				return nil
			}
			gotHashes = true

			// Otherwise insert all the new hashes, aborting in case of junk
			glog.V(logger.Detail).Infof("%v: inserting %d hashes from #%d", p, len(hashPack.hashes), from)

			inserts := d.queue.Insert(hashPack.hashes, true)
			if len(inserts) != len(hashPack.hashes) {
				glog.V(logger.Debug).Infof("%v: stale hashes", p)
				return errBadPeer
			}
			// Notify the block fetcher of new hashes, but stop if queue is full
			cont := d.queue.Pending() < maxQueuedHashes
			select {
			case d.processCh <- cont:
			default:
			}
			if !cont {
				return nil
			}
			// Queue not yet full, fetch the next batch
			from += uint64(len(hashPack.hashes))
			getHashes(from)

		case <-timeout.C:
			glog.V(logger.Debug).Infof("%v: hash request timed out", p)
			return errTimeout
		}
	}
}

// fetchBlocks iteratively downloads the scheduled hashes, taking any available
// peers, reserving a chunk of blocks for each, waiting for delivery and also
// periodically checking for timeouts.
func (d *Downloader) fetchBlocks(from uint64) error {
	glog.V(logger.Debug).Infof("Downloading blocks from #%d", from)
	defer glog.V(logger.Debug).Infof("Block download terminated")

	// Create a ticker for scheduling expiration tasks
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	update := make(chan struct{}, 1)

	// Prepare the queue and fetch blocks until the hash fetcher's done
	d.queue.Prepare(from)
	finished := false

	for {
		select {
		case <-d.cancelCh:
			return errCancelBlockFetch

		case blockPack := <-d.blockCh:
			// If the peer was previously banned and failed to deliver its pack
			// in a reasonable time frame, ignore its message.
			if peer := d.peers.Peer(blockPack.peerId); peer != nil {
				// Deliver the received chunk of blocks, and demote in case of errors
				err := d.queue.Deliver(blockPack.peerId, blockPack.blocks)
				switch err {
				case nil:
					// If no blocks were delivered, demote the peer (need the delivery above)
					if len(blockPack.blocks) == 0 {
						peer.Demote()
						peer.SetIdle()
						glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
						break
					}
					// All was successful, promote the peer and potentially start processing
					peer.Promote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
					go d.process()

				case errInvalidChain:
					// The hash chain is invalid (blocks are not ordered properly), abort
					return err

				case errNoFetchesPending:
					// Peer probably timed out with its delivery but came through
					// in the end, demote, but allow it to pull from this peer.
					peer.Demote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)

				case errStaleDelivery:
					// Delivered something completely different than requested, usually
					// caused by a timeout and delivery during a new sync cycle.
					// Don't set it to idle as the original request should still be
					// in flight.
					peer.Demote()
					glog.V(logger.Detail).Infof("%s: stale delivery", peer)

				default:
					// Peer did something semi-useful, demote but keep it around
					peer.Demote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
					go d.process()
				}
			}
			// Blocks arrived, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case cont := <-d.processCh:
			// The hash fetcher sent a continuation flag, check if it's done
			if !cont {
				finished = true
			}
			// Hashes arrive, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-ticker.C:
			// Sanity check: update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-update:
			// Short circuit if we lost all our peers
			if d.peers.Len() == 0 {
				return errNoPeers
			}
			// Check for block request timeouts and demote the responsible peers
			for _, pid := range d.queue.Expire(blockHardTTL) {
				if peer := d.peers.Peer(pid); peer != nil {
					peer.Demote()
					glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
				}
			}
			// If there's nothing more to fetch, wait or terminate
			if d.queue.Pending() == 0 {
				if d.queue.InFlight() == 0 && finished {
					glog.V(logger.Debug).Infof("Block fetching completed")
					return nil
				}
				break
			}
			// Send a download request to all idle peers, until throttled
			for _, peer := range d.peers.IdlePeers() {
				// Short circuit if throttling activated
				if d.queue.Throttle() {
					break
				}
				// Reserve a chunk of hashes for a peer. A nil can mean either that
				// no more hashes are available, or that the peer is known not to
				// have them.
				request := d.queue.Reserve(peer, peer.Capacity())
				if request == nil {
					continue
				}
				if glog.V(logger.Detail) {
					glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes))
				}
				// Fetch the chunk and make sure any errors return the hashes to the queue
				if err := peer.Fetch(request); err != nil {
					glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer)
					d.queue.Cancel(request)
				}
			}
			// Make sure that we have peers available for fetching. If all peers have been tried
			// and all failed, throw an error
			if !d.queue.Throttle() && d.queue.InFlight() == 0 {
				return errPeersUnavailable
			}
		}
	}
}

// banBlocks retrieves a batch of blocks from a peer feeding us invalid hashes,
// and bans the head of the retrieved batch.
//
// This method only fetches one single batch, as the goal is not to ban an entire
// (potentially long) invalid chain - wasting a lot of time in the meanwhile -
// but rather to gradually build up a blacklist if the peer keeps reconnecting.
func (d *Downloader) banBlocks(peerId string, head common.Hash) error {
	glog.V(logger.Debug).Infof("Banning a batch out of %d blocks from %s", d.queue.Pending(), peerId)

	// Ask the peer being banned for a batch of blocks from the banning point
	peer := d.peers.Peer(peerId)
	if peer == nil {
		return nil
	}
	request := d.queue.Reserve(peer, MaxBlockFetch)
	if request == nil {
		return nil
	}
	if err := peer.Fetch(request); err != nil {
		return err
	}
	// Wait a bit for the reply to arrive, and ban the head if it does
	timeout := time.After(blockHardTTL)
	for {
		select {
		case <-d.cancelCh:
			return errCancelBlockFetch

		case <-timeout:
			return errTimeout

		case <-d.hashCh:
			// Out of bounds hashes received, ignore them

		case blockPack := <-d.blockCh:
			blocks := blockPack.blocks

			// Short circuit if it's a stale cross check
			if len(blocks) == 1 {
				block := blocks[0]
				if _, ok := d.checks[block.Hash()]; ok {
					delete(d.checks, block.Hash())
					break
				}
			}
			// Short circuit if it's not from the peer being banned
			if blockPack.peerId != peerId {
				break
			}
			// Short circuit if no blocks were returned
			if len(blocks) == 0 {
				return errors.New("no blocks returned to ban")
			}
			// Reconstruct the original chain order and ensure we're banning the correct blocks
			types.BlockBy(types.Number).Sort(blocks)
			if bytes.Compare(blocks[0].Hash().Bytes(), head.Bytes()) != 0 {
				return errors.New("head block not the banned one")
			}
			index := 0
			for _, block := range blocks[1:] {
				if bytes.Compare(block.ParentHash().Bytes(), blocks[index].Hash().Bytes()) != 0 {
					break
				}
				index++
			}
			// Ban the head hash and phase out any excess
			d.banned.Add(blocks[index].Hash())
			for d.banned.Size() > maxBannedHashes {
				var evacuate common.Hash

				d.banned.Each(func(item interface{}) bool {
					// Skip any hard coded bans
					if core.BadHashes[item.(common.Hash)] {
						return true
					}
					evacuate = item.(common.Hash)
					return false
				})
				d.banned.Remove(evacuate)
			}
			glog.V(logger.Debug).Infof("Banned %d blocks from: %s", index+1, peerId)
			return nil
		}
	}
}

// process takes blocks from the queue and tries to import them into the chain.
//
// The algorithmic flow is as follows:
//  - The `processing` flag is swapped to 1 to ensure singleton access
//  - The current `cancel` channel is retrieved to detect sync abortions
//  - Blocks are iteratively taken from the cache and inserted into the chain
//  - When the cache becomes empty, insertion stops
//  - The `processing` flag is swapped back to 0
//  - A post-exit check is made whether new blocks became available
//     - This step is important: it handles a potential race condition between
//       checking for no more work, and releasing the processing "mutex". In
//       between these state changes, a block may have arrived, but a processing
//       attempt denied, so we need to re-enter to ensure the block isn't left
//       to idle in the cache.
func (d *Downloader) process() {
	// Make sure only one goroutine is ever allowed to process blocks at once
	if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
		return
	}
	// If the processor just exited, but there are freshly pending items, try to
	// reenter. This is needed because the goroutine spun up for processing
	// the fresh blocks might have been rejected entry due to this present thread
	// not yet releasing the `processing` state.
	defer func() {
		if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadBlock() != nil {
			d.process()
		}
	}()
	// Release the lock upon exit (note, before checking for reentry!), and set
	// the import statistics to zero.
	defer func() {
		d.importLock.Lock()
		d.importQueue = nil
		d.importDone = 0
		d.importLock.Unlock()

		atomic.StoreInt32(&d.processing, 0)
	}()
	// Repeat the processing as long as there are blocks to import
	for {
		// Fetch the next batch of blocks
		blocks := d.queue.TakeBlocks()
		if len(blocks) == 0 {
			return
		}
		// Reset the import statistics
		d.importLock.Lock()
		d.importStart = time.Now()
		d.importQueue = blocks
		d.importDone = 0
		d.importLock.Unlock()

		// Actually import the blocks
		glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
		for len(blocks) != 0 {
			// Check for any termination requests
			if atomic.LoadInt32(&d.interrupt) == 1 {
				return
			}
			// Retrieve the first batch of blocks to insert
			max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess)))
			raw := make(types.Blocks, 0, max)
			for _, block := range blocks[:max] {
				raw = append(raw, block.RawBlock)
			}
			// Try to insert the blocks, drop the originating peer if there's an error
			index, err := d.insertChain(raw)
			if err != nil {
				glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err)
				d.dropPeer(blocks[index].OriginPeer)
				d.cancel()
				return
			}
			blocks = blocks[max:]
		}
	}
}

// DeliverBlocks injects a new batch of blocks received from a remote node.
// This is usually invoked through the BlocksMsg by the protocol handler.
func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error {
	// Make sure the downloader is active
	if atomic.LoadInt32(&d.synchronising) == 0 {
		return errNoSyncActive
	}
	// Deliver or abort if the sync is canceled while queuing
	d.cancelLock.RLock()
	cancel := d.cancelCh
	d.cancelLock.RUnlock()

	select {
	case d.blockCh <- blockPack{id, blocks}:
		return nil

	case <-cancel:
		return errNoSyncActive
	}
}

// DeliverHashes injects a new batch of hashes received from a remote node into
// the download schedule. This is usually invoked through the BlockHashesMsg by
// the protocol handler.
func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) error {
	// Make sure the downloader is active
	if atomic.LoadInt32(&d.synchronising) == 0 {
		return errNoSyncActive
	}
	// Deliver or abort if the sync is canceled while queuing
	d.cancelLock.RLock()
	cancel := d.cancelCh
	d.cancelLock.RUnlock()

	select {
	case d.hashCh <- hashPack{id, hashes}:
		return nil

	case <-cancel:
		return errNoSyncActive
	}
}
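
// On the receiving side, the eth protocol handler is expected to feed inbound
// messages straight back into these delivery methods. A sketch of that wiring
// (message decoding is elided and the handler shape is hypothetical):
//
//	case BlockHashesMsg:
//		var hashes []common.Hash
//		// decode msg into hashes ...
//		d.DeliverHashes(p.id, hashes)
//
//	case BlocksMsg:
//		var blocks []*types.Block
//		// decode msg into blocks ...
//		d.DeliverBlocks(p.id, blocks)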