// Package downloader contains the manual full chain synchronisation.
package downloader

import (
	"bytes"
	"errors"
	"math"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/logger"
	"github.com/ethereum/go-ethereum/logger/glog"
	"gopkg.in/fatih/set.v0"
)

const (
	eth60 = 60 // Constant to check for old protocol support
	eth61 = 61 // Constant to check for new protocol support
)

var (
	MinHashFetch  = 512 // Minimum amount of hashes to not consider a peer stalling
	MaxHashFetch  = 512 // Amount of hashes to be fetched per retrieval request
	MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request

	hashTTL         = 5 * time.Second  // Time it takes for a hash request to time out
	blockSoftTTL    = 3 * time.Second  // Request completion threshold for increasing or decreasing a peer's bandwidth
	blockHardTTL    = 3 * blockSoftTTL // Maximum time allowance before a block request is considered expired
	crossCheckCycle = time.Second      // Period after which to check for expired cross checks

	maxQueuedHashes = 256 * 1024 // Maximum number of hashes to queue for import (DOS protection)
	maxBannedHashes = 4096       // Number of bannable hashes before phasing old ones out
	maxBlockProcess = 256        // Number of blocks to import at once into the chain
)

var (
	errBusy             = errors.New("busy")
	errUnknownPeer      = errors.New("peer is unknown or unhealthy")
	errBadPeer          = errors.New("action from bad peer ignored")
	errStallingPeer     = errors.New("peer is stalling")
	errBannedHead       = errors.New("peer head hash already banned")
	errNoPeers          = errors.New("no peers to keep download active")
	errPendingQueue     = errors.New("pending items in queue")
	errTimeout          = errors.New("timeout")
	errEmptyHashSet     = errors.New("empty hash set by peer")
	errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
	errAlreadyInPool    = errors.New("hash already in pool")
	errInvalidChain     = errors.New("retrieved hash chain is invalid")
	errCrossCheckFailed = errors.New("block cross-check failed")
	errCancelHashFetch  = errors.New("hash fetching canceled (requested)")
	errCancelBlockFetch = errors.New("block downloading canceled (requested)")
	errNoSyncActive     = errors.New("no sync active")
)

// hashCheckFn is a callback type for verifying a hash's presence in the local chain.
type hashCheckFn func(common.Hash) bool

// blockRetrievalFn is a callback type for retrieving a block from the local chain.
type blockRetrievalFn func(common.Hash) *types.Block

// headRetrievalFn is a callback type for retrieving the head block from the local chain.
type headRetrievalFn func() *types.Block

// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
type chainInsertFn func(types.Blocks) (int, error)

// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)

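// blockPack is a batch of blocks tagged with the peer that delivered it.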
type blockPack struct {
	peerId string
	blocks []*types.Block
}

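// hashPack is a batch of block hashes tagged with the peer that delivered it.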
type hashPack struct {
	peerId string
	hashes []common.Hash
}

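// crossCheck tracks a single random block retrieval used to spot check a batch
// of hashes delivered by a peer: the referenced block must arrive before the
// expiration time and its parent must match the recorded hash.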
type crossCheck struct {
	expire time.Time
	parent common.Hash
}

type Downloader struct {
	mux *event.TypeMux

	queue  *queue                      // Scheduler for selecting the hashes to download
	peers  *peerSet                    // Set of active peers from which download can proceed
	checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain
	banned *set.Set                    // Set of hashes we've received and banned

	interrupt int32 // Atomic boolean to signal termination

	// Statistics
	importStart time.Time // Instance when the last blocks were taken from the cache
	importQueue []*Block  // Previously taken blocks to check import progress
	importDone  int       // Number of taken blocks already imported from the last batch
	importLock  sync.Mutex

	// Callbacks
	hasBlock    hashCheckFn      // Checks if a block is present in the chain
	getBlock    blockRetrievalFn // Retrieves a block from the chain
	headBlock   headRetrievalFn  // Retrieves the head block from the chain
	insertChain chainInsertFn    // Injects a batch of blocks into the chain
	dropPeer    peerDropFn       // Drops a peer for misbehaving

	// Status
	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
	synchronising   int32
	processing      int32
	notified        int32

	// Channels
	newPeerCh chan *peer
	hashCh    chan hashPack  // Channel receiving inbound hashes
	blockCh   chan blockPack // Channel receiving inbound blocks
	processCh chan bool      // Channel to signal the block fetcher of new or finished work

	cancelCh   chan struct{} // Channel to cancel mid-flight syncs
	cancelLock sync.RWMutex  // Lock to protect the cancel channel in delivers
}

// Block is an origin-tagged blockchain block.
type Block struct {
	RawBlock   *types.Block
	OriginPeer string
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
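//
// A minimal wiring sketch (the chain manager and peer-dropping callbacks shown
// here are illustrative assumptions, not part of this package):
//
//	dl := New(mux, chainman.HasBlock, chainman.GetBlock, chainman.CurrentBlock,
//		chainman.InsertChain, removePeer)
//	go dl.Synchronise(peerID, peerHeadHash)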
func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, headBlock headRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader {
	// Create the base downloader
	downloader := &Downloader{
		mux:         mux,
		queue:       newQueue(),
		peers:       newPeerSet(),
		hasBlock:    hasBlock,
		getBlock:    getBlock,
		headBlock:   headBlock,
		insertChain: insertChain,
		dropPeer:    dropPeer,
		newPeerCh:   make(chan *peer, 1),
		hashCh:      make(chan hashPack, 1),
		blockCh:     make(chan blockPack, 1),
		processCh:   make(chan bool, 1),
	}
	// Inject all the known bad hashes
	downloader.banned = set.New()
	for hash := range core.BadHashes {
		downloader.banned.Add(hash)
	}
	return downloader
}

// Stats retrieves the current status of the downloader.
func (d *Downloader) Stats() (pending int, cached int, importing int, estimate time.Duration) {
	// Fetch the download status
	pending, cached = d.queue.Size()

	// Figure out the import progress
	d.importLock.Lock()
	defer d.importLock.Unlock()

	for len(d.importQueue) > 0 && d.hasBlock(d.importQueue[0].RawBlock.Hash()) {
		d.importQueue = d.importQueue[1:]
		d.importDone++
	}
	importing = len(d.importQueue)

	// Make an estimate on the total sync
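	// (average import time per block so far, extrapolated over everything still
	// pending, cached or currently importing)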
	estimate = 0
	if d.importDone > 0 {
		estimate = time.Since(d.importStart) / time.Duration(d.importDone) * time.Duration(pending+cached+importing)
	}
	return
}

// Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool {
	return atomic.LoadInt32(&d.synchronising) > 0
}

// RegisterPeer injects a new download peer into the set of block sources to be
// used for fetching hashes and blocks from.
func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn) error {
	// If the peer wants to send a banned hash, reject
	if d.banned.Has(head) {
		glog.V(logger.Debug).Infoln("Register rejected, head hash banned:", id)
		return errBannedHead
	}
	// Otherwise try to construct and register the peer
	glog.V(logger.Detail).Infoln("Registering peer", id)
	if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks)); err != nil {
		glog.V(logger.Error).Infoln("Register failed:", err)
		return err
	}
	return nil
}

// UnregisterPeer removes a peer from the known list, preventing any action from
// the specified peer.
func (d *Downloader) UnregisterPeer(id string) error {
	glog.V(logger.Detail).Infoln("Unregistering peer", id)
	if err := d.peers.Unregister(id); err != nil {
		glog.V(logger.Error).Infoln("Unregister failed:", err)
		return err
	}
	return nil
}

// Synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
func (d *Downloader) Synchronise(id string, head common.Hash) {
	glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", id, head)

	switch err := d.synchronise(id, head); err {
	case nil:
		glog.V(logger.Detail).Infof("Synchronisation completed")

	case errBusy:
		glog.V(logger.Detail).Infof("Synchronisation already in progress")

	case errTimeout, errBadPeer, errStallingPeer, errBannedHead, errEmptyHashSet, errPeersUnavailable, errInvalidChain, errCrossCheckFailed:
		glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
		d.dropPeer(id)

	case errPendingQueue:
		glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)

	default:
		glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
	}
}

// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronise if its TD is higher than our own. If any of the
// checks fail, an error will be returned. This method is synchronous.
func (d *Downloader) synchronise(id string, hash common.Hash) error {
	// Mock out the synchronisation if testing
	if d.synchroniseMock != nil {
		return d.synchroniseMock(id, hash)
	}
	// Make sure only one goroutine is ever allowed past this point at once
	if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
		return errBusy
	}
	defer atomic.StoreInt32(&d.synchronising, 0)

	// If the head hash is banned, terminate immediately
	if d.banned.Has(hash) {
		return errBannedHead
	}
	// Post a user notification of the sync (only once per session)
	if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
		glog.V(logger.Info).Infoln("Block synchronisation started")
	}
	// Abort if the queue still contains some leftover data
	if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
		return errPendingQueue
	}
	// Reset the queue and peer set to clean any internal leftover state
	d.queue.Reset()
	d.peers.Reset()
	d.checks = make(map[common.Hash]*crossCheck)

	// Create cancel channel for aborting mid-flight
	d.cancelLock.Lock()
	d.cancelCh = make(chan struct{})
	d.cancelLock.Unlock()

	// Retrieve the origin peer and initiate the downloading process
	p := d.peers.Peer(id)
	if p == nil {
		return errUnknownPeer
	}
	return d.syncWithPeer(p, hash)
}

// Has checks if the downloader knows about a particular hash, meaning that it is
// either already downloaded or pending retrieval.
func (d *Downloader) Has(hash common.Hash) bool {
	return d.queue.Has(hash)
}

// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
	d.mux.Post(StartEvent{})
	defer func() {
		// reset on error
		if err != nil {
			d.cancel()
			d.mux.Post(FailedEvent{err})
		} else {
			d.mux.Post(DoneEvent{})
		}
	}()

	glog.V(logger.Debug).Infof("Synchronizing with the network using: %s, eth/%d", p.id, p.version)
	switch p.version {
	case eth60:
		// Old eth/60 version, use reverse hash retrieval algorithm
		if err = d.fetchHashes60(p, hash); err != nil {
			return err
		}
		if err = d.fetchBlocks60(); err != nil {
			return err
		}
	case eth61:
		// New eth/61, use forward, concurrent hash and block retrieval algorithm
		number, err := d.findAncestor(p)
		if err != nil {
			return err
		}
		errc := make(chan error, 2)
		go func() { errc <- d.fetchHashes(p, number+1) }()
		go func() { errc <- d.fetchBlocks(number + 1) }()

		// If any fetcher fails, cancel the other
		if err := <-errc; err != nil {
			d.cancel()
			<-errc
			return err
		}
		return <-errc

	default:
		// Something very wrong, stop right here
		glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version)
		return errBadPeer
	}
	glog.V(logger.Debug).Infoln("Synchronization completed")

	return nil
}

// cancel cancels all of the operations and resets the queue.
func (d *Downloader) cancel() {
	// Close the current cancel channel
	d.cancelLock.Lock()
	if d.cancelCh != nil {
		select {
		case <-d.cancelCh:
			// Channel was already closed
		default:
			close(d.cancelCh)
		}
	}
	d.cancelLock.Unlock()

	// Reset the queue
	d.queue.Reset()
}

// Terminate interrupts the downloader, canceling all pending operations.
func (d *Downloader) Terminate() {
	atomic.StoreInt32(&d.interrupt, 1)
	d.cancel()
}

// fetchHashes60 starts retrieving hashes backwards from a specific peer and hash,
// up until it finds a common ancestor. If the source peer times out, alternative
// ones are tried for continuation.
func (d *Downloader) fetchHashes60(p *peer, h common.Hash) error {
	var (
		start  = time.Now()
		active = p             // active peer will help determine the current active peer
		head   = common.Hash{} // common and last hash

		timeout     = time.NewTimer(0)                // timer to dump a non-responsive active peer
		attempted   = make(map[string]bool)           // attempted peers will help with retries
		crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks
	)
	defer crossTicker.Stop()
	defer timeout.Stop()

	glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
	<-timeout.C // timeout channel should be initially empty.

	getHashes := func(from common.Hash) {
		go active.getRelHashes(from)
		timeout.Reset(hashTTL)
	}

	// Add the hash to the queue, and start hash retrieval.
	d.queue.Insert([]common.Hash{h}, false)
	getHashes(h)

	attempted[p.id] = true
	for finished := false; !finished; {
		select {
		case <-d.cancelCh:
			return errCancelHashFetch

		case hashPack := <-d.hashCh:
			// Make sure the active peer is giving us the hashes
			if hashPack.peerId != active.id {
				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
				break
			}
			timeout.Stop()

			// Make sure the peer actually gave something valid
			if len(hashPack.hashes) == 0 {
				glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set", active.id)
				return errEmptyHashSet
			}
			for index, hash := range hashPack.hashes {
				if d.banned.Has(hash) {
					glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain", active.id)

					d.queue.Insert(hashPack.hashes[:index+1], false)
					if err := d.banBlocks(active.id, hash); err != nil {
						glog.V(logger.Debug).Infof("Failed to ban batch of blocks: %v", err)
					}
					return errInvalidChain
				}
			}
			// Determine if we're done fetching hashes (queue up all pending), and continue if not done
			done, index := false, 0
			for index, head = range hashPack.hashes {
				if d.hasBlock(head) || d.queue.GetBlock(head) != nil {
					glog.V(logger.Debug).Infof("Found common hash %x", head[:4])
					hashPack.hashes = hashPack.hashes[:index]
					done = true
					break
				}
			}
			// Insert all the new hashes, but only continue if we got something useful
			inserts := d.queue.Insert(hashPack.hashes, false)
			if len(inserts) == 0 && !done {
				glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id)
				return errBadPeer
			}
			if !done {
				// Check that the peer is not stalling the sync
				if len(inserts) < MinHashFetch {
					return errStallingPeer
				}
				// Try and fetch a random block to verify the hash batch
				// Skip the last hash as the cross check races with the next hash fetch
				cross := rand.Intn(len(inserts) - 1)
				origin, parent := inserts[cross], inserts[cross+1]
				glog.V(logger.Detail).Infof("Cross checking (%s) with %x/%x", active.id, origin, parent)

				d.checks[origin] = &crossCheck{
					expire: time.Now().Add(blockSoftTTL),
					parent: parent,
				}
				go active.getBlocks([]common.Hash{origin})

				// Also fetch a fresh batch of hashes
				getHashes(head)
				continue
			}
			// We're done, prepare the download cache and proceed pulling the blocks
			offset := uint64(0)
			if block := d.getBlock(head); block != nil {
				offset = block.NumberU64() + 1
			}
			d.queue.Prepare(offset)
			finished = true

		case blockPack := <-d.blockCh:
			// Cross check the block with the random verifications
			if blockPack.peerId != active.id || len(blockPack.blocks) != 1 {
				continue
			}
			block := blockPack.blocks[0]
			if check, ok := d.checks[block.Hash()]; ok {
				if block.ParentHash() != check.parent {
					return errCrossCheckFailed
				}
				delete(d.checks, block.Hash())
			}

		case <-crossTicker.C:
			// Iterate over all the cross checks and fail the hash chain if they're not verified
			for hash, check := range d.checks {
				if time.Now().After(check.expire) {
					glog.V(logger.Debug).Infof("Cross check timeout for %x", hash)
					return errCrossCheckFailed
				}
			}

		case <-timeout.C:
			glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request", p.id)

			var p *peer // p will be set if a peer can be found
			// Attempt to find a new peer by checking inclusion of the peer's best hash in our
			// already fetched hash list. This can't guarantee 100% correctness but does
			// a fair job: the selection is either correct, or at worst a false positive.
			for _, peer := range d.peers.AllPeers() {
				if d.queue.Has(peer.head) && !attempted[peer.id] {
					p = peer
					break
				}
			}
			// If all peers have been tried, or the hash is the zero hash, abort
			// the process entirely.
			if p == nil || (head == common.Hash{}) {
				return errTimeout
			}
			// set p to the active peer. this will invalidate any hashes that may be returned
			// by our previous (delayed) peer.
			active = p
			getHashes(head)
			glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)", p.id)
		}
	}
	glog.V(logger.Debug).Infof("Downloaded hashes (%d) in %v", d.queue.Pending(), time.Since(start))

	return nil
}

// fetchBlocks60 iteratively downloads the entire scheduled block chain, taking
// any available peers, reserving a chunk of blocks for each, waiting for delivery
// and periodically checking for timeouts.
func (d *Downloader) fetchBlocks60() error {
	glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)")
	start := time.Now()

	// Start a ticker to continue throttled downloads and check for bad peers
	ticker := time.NewTicker(20 * time.Millisecond)
	defer ticker.Stop()

out:
	for {
		select {
		case <-d.cancelCh:
			return errCancelBlockFetch

		case <-d.hashCh:
			// Out of bounds hashes received, ignore them

		case blockPack := <-d.blockCh:
			// Short circuit if it's a stale cross check
			if len(blockPack.blocks) == 1 {
				block := blockPack.blocks[0]
				if _, ok := d.checks[block.Hash()]; ok {
					delete(d.checks, block.Hash())
					break
				}
			}
			// If the peer was previously banned and failed to deliver its pack
			// in a reasonable time frame, ignore its message.
			if peer := d.peers.Peer(blockPack.peerId); peer != nil {
				// Deliver the received chunk of blocks, and demote in case of errors
				err := d.queue.Deliver(blockPack.peerId, blockPack.blocks)
				switch err {
				case nil:
					// If no blocks were delivered, demote the peer (need the delivery above)
					if len(blockPack.blocks) == 0 {
						peer.Demote()
						peer.SetIdle()
						glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
						break
					}
					// All was successful, promote the peer and potentially start processing
					peer.Promote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
					go d.process()

				case errInvalidChain:
					// The hash chain is invalid (blocks are not ordered properly), abort
					return err

				case errNoFetchesPending:
					// Peer probably timed out with its delivery but came through
					// in the end, demote, but allow it to pull from this peer.
					peer.Demote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)

				case errStaleDelivery:
					// Delivered something completely different than requested, usually
					// caused by a timeout and delivery during a new sync cycle.
					// Don't set it to idle as the original request should still be
					// in flight.
					peer.Demote()
					glog.V(logger.Detail).Infof("%s: stale delivery", peer)

				default:
					// Peer did something semi-useful, demote but keep it around
					peer.Demote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
					go d.process()
				}
			}

		case <-ticker.C:
			// Short circuit if we lost all our peers
			if d.peers.Len() == 0 {
				return errNoPeers
			}
			// Check for block request timeouts and demote the responsible peers
			badPeers := d.queue.Expire(blockHardTTL)
			for _, pid := range badPeers {
				if peer := d.peers.Peer(pid); peer != nil {
					peer.Demote()
					glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
				}
			}
			// If there are unrequested hashes left, start fetching from the available peers
			if d.queue.Pending() > 0 {
				// Throttle the download if block cache is full and waiting processing
				if d.queue.Throttle() {
					break
				}
				// Send a download request to all idle peers, until throttled
				idlePeers := d.peers.IdlePeers()
				for _, peer := range idlePeers {
					// Short circuit if throttling activated since above
					if d.queue.Throttle() {
						break
					}
					// Get a possible chunk. If nil is returned, no chunk
					// could be reserved because no hashes are available.
					request := d.queue.Reserve(peer, peer.Capacity())
					if request == nil {
						continue
					}
					if glog.V(logger.Detail) {
						glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes))
					}
					// Fetch the chunk and check for error. If the peer was somehow
					// already fetching a chunk due to a bug, it will be returned to
					// the queue
					if err := peer.Fetch(request); err != nil {
						glog.V(logger.Error).Infof("Peer %s received double work", peer.id)
						d.queue.Cancel(request)
					}
				}
				// Make sure that we have peers available for fetching. If all peers have been tried
				// and all failed, throw an error
				if d.queue.InFlight() == 0 {
					return errPeersUnavailable
				}

			} else if d.queue.InFlight() == 0 {
				// When there is nothing more queued and nothing in flight, we can
				// safely assume we're done. Another part of the process will check
				// for parent errors and will re-request anything that's missing
				break out
			}
		}
	}
	glog.V(logger.Detail).Infoln("Downloaded block(s) in", time.Since(start))
	return nil
}

// findAncestor tries to locate the common ancestor block of the local chain and
// a remote peers blockchain. In the general case when our node was in sync and
// on the correct chain, checking the top N blocks should already get us a match.
// In the rare scenario when we ended up on a long soft fork (i.e. none of the
// head blocks match), we do a binary search to find the common ancestor.
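//
// As a rough illustration (the numbers here are hypothetical): with a local head
// at block #1000 and MaxHashFetch = 512, the initial request asks for the hashes
// of blocks #488 and upwards; if none of those are known locally, the binary
// search narrows the interval [0, 1000] in roughly ten request/response rounds.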
func (d *Downloader) findAncestor(p *peer) (uint64, error) {
	glog.V(logger.Debug).Infof("%v: looking for common ancestor", p)

	// Request our head blocks to short circuit ancestor location
	head := d.headBlock().NumberU64()
	from := int64(head) - int64(MaxHashFetch)
	if from < 0 {
		from = 0
	}
	go p.getAbsHashes(uint64(from), MaxHashFetch)

	// Wait for the remote response to the head fetch
	number, hash := uint64(0), common.Hash{}
	timeout := time.After(hashTTL)

	for finished := false; !finished; {
		select {
		case <-d.cancelCh:
			return 0, errCancelHashFetch

		case hashPack := <-d.hashCh:
			// Discard anything not from the origin peer
			if hashPack.peerId != p.id {
				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
				break
			}
			// Make sure the peer actually gave something valid
			hashes := hashPack.hashes
			if len(hashes) == 0 {
				glog.V(logger.Debug).Infof("%v: empty head hash set", p)
				return 0, errEmptyHashSet
			}
			// Check if a common ancestor was found
			finished = true
			for i := len(hashes) - 1; i >= 0; i-- {
				if d.hasBlock(hashes[i]) {
					number, hash = uint64(from)+uint64(i), hashes[i]
					break
				}
			}

		case <-d.blockCh:
			// Out of bounds blocks received, ignore them

		case <-timeout:
			glog.V(logger.Debug).Infof("%v: head hash timeout", p)
			return 0, errTimeout
		}
	}
	// If the head fetch already found an ancestor, return
	if !common.EmptyHash(hash) {
		glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x]", p, number, hash[:4])
		return number, nil
	}
	// Ancestor not found, we need to binary search over our chain
	start, end := uint64(0), head
	for start+1 < end {
		// Split our chain interval in two, and request the hash to cross check
		check := (start + end) / 2

		timeout := time.After(hashTTL)
		go p.getAbsHashes(uint64(check), 1)

		// Wait until a reply arrives to this request
		for arrived := false; !arrived; {
			select {
			case <-d.cancelCh:
				return 0, errCancelHashFetch

			case hashPack := <-d.hashCh:
				// Discard anything not from the origin peer
				if hashPack.peerId != p.id {
					glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
					break
				}
				// Make sure the peer actually gave something valid
				hashes := hashPack.hashes
				if len(hashes) != 1 {
					glog.V(logger.Debug).Infof("%v: invalid search hash set (%d)", p, len(hashes))
					return 0, errBadPeer
				}
				arrived = true

				// Modify the search interval based on the response
				block := d.getBlock(hashes[0])
				if block == nil {
					end = check
					break
				}
				if block.NumberU64() != check {
					glog.V(logger.Debug).Infof("%v: non requested hash #%d [%x], instead of #%d", p, block.NumberU64(), block.Hash().Bytes()[:4], check)
					return 0, errBadPeer
				}
				start = check

			case <-d.blockCh:
				// Out of bounds blocks received, ignore them

			case <-timeout:
				glog.V(logger.Debug).Infof("%v: search hash timeout", p)
				return 0, errTimeout
			}
		}
	}
	return start, nil
}

// fetchHashes keeps retrieving hashes from the requested number, until no more
// are returned, potentially throttling on the way.
func (d *Downloader) fetchHashes(p *peer, from uint64) error {
	glog.V(logger.Debug).Infof("%v: downloading hashes from #%d", p, from)

	// Create a timeout timer, and the associated hash fetcher
	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
	<-timeout.C                 // timeout channel should be initially empty
	defer timeout.Stop()

	getHashes := func(from uint64) {
		glog.V(logger.Detail).Infof("%v: fetching %d hashes from #%d", p, MaxHashFetch, from)

		go p.getAbsHashes(from, MaxHashFetch)
		timeout.Reset(hashTTL)
	}
	// Start pulling hashes, until all are exhausted
	getHashes(from)
	for {
		select {
		case <-d.cancelCh:
			return errCancelHashFetch

		case hashPack := <-d.hashCh:
			// Make sure the active peer is giving us the hashes
			if hashPack.peerId != p.id {
				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
				break
			}
			timeout.Stop()

			// If no more hashes are inbound, notify the block fetcher and return
			if len(hashPack.hashes) == 0 {
				glog.V(logger.Debug).Infof("%v: no available hashes", p)

				select {
				case d.processCh <- false:
				case <-d.cancelCh:
				}
				return nil
			}
			// Otherwise insert all the new hashes, aborting in case of junk
			glog.V(logger.Detail).Infof("%v: inserting %d hashes from #%d", p, len(hashPack.hashes), from)

			inserts := d.queue.Insert(hashPack.hashes, true)
			if len(inserts) != len(hashPack.hashes) {
				glog.V(logger.Debug).Infof("%v: stale hashes", p)
				return errBadPeer
			}
			// Notify the block fetcher of new hashes, but stop if queue is full
			cont := d.queue.Pending() < maxQueuedHashes
			select {
			case d.processCh <- cont:
			default:
			}
			if !cont {
				return nil
			}
			// Queue not yet full, fetch the next batch
			from += uint64(len(hashPack.hashes))
			getHashes(from)

		case <-timeout.C:
			glog.V(logger.Debug).Infof("%v: hash request timed out", p)
			return errTimeout
		}
	}
}

// fetchBlocks iteratively downloads the scheduled hashes, taking any available
// peers, reserving a chunk of blocks for each, waiting for delivery and also
// periodically checking for timeouts.
func (d *Downloader) fetchBlocks(from uint64) error {
	glog.V(logger.Debug).Infof("Downloading blocks from #%d", from)
	defer glog.V(logger.Debug).Infof("Block download terminated")

	// Create a timeout timer for scheduling expiration tasks
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	update := make(chan struct{}, 1)

	// Prepare the queue and fetch blocks until the hash fetcher's done
	d.queue.Prepare(from)
	finished := false

	for {
		select {
		case <-d.cancelCh:
			return errCancelBlockFetch

		case blockPack := <-d.blockCh:
			// If the peer was previously banned and failed to deliver its pack
			// in a reasonable time frame, ignore its message.
			if peer := d.peers.Peer(blockPack.peerId); peer != nil {
				// Deliver the received chunk of blocks, and demote in case of errors
				err := d.queue.Deliver(blockPack.peerId, blockPack.blocks)
				switch err {
				case nil:
					// If no blocks were delivered, demote the peer (need the delivery above)
					if len(blockPack.blocks) == 0 {
						peer.Demote()
						peer.SetIdle()
						glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
						break
					}
					// All was successful, promote the peer and potentially start processing
					peer.Promote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
					go d.process()

				case errInvalidChain:
					// The hash chain is invalid (blocks are not ordered properly), abort
					return err

				case errNoFetchesPending:
					// Peer probably timed out with its delivery but came through
					// in the end, demote, but allow it to pull from this peer.
					peer.Demote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)

				case errStaleDelivery:
					// Delivered something completely different than requested, usually
					// caused by a timeout and delivery during a new sync cycle.
					// Don't set it to idle as the original request should still be
					// in flight.
					peer.Demote()
					glog.V(logger.Detail).Infof("%s: stale delivery", peer)

				default:
					// Peer did something semi-useful, demote but keep it around
					peer.Demote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
					go d.process()
				}
			}
			// Blocks arrived, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case cont := <-d.processCh:
			// The hash fetcher sent a continuation flag, check if it's done
			if !cont {
				finished = true
			}
			// Hashes arrive, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-ticker.C:
			// Sanity check update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-update:
			// Short circuit if we lost all our peers
			if d.peers.Len() == 0 {
				return errNoPeers
			}
			// Check for block request timeouts and demote the responsible peers
			for _, pid := range d.queue.Expire(blockHardTTL) {
				if peer := d.peers.Peer(pid); peer != nil {
					peer.Demote()
					glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
				}
			}
			// If there's nothing more to fetch, wait or terminate
			if d.queue.Pending() == 0 {
				if d.queue.InFlight() == 0 && finished {
					glog.V(logger.Debug).Infof("Block fetching completed")
					return nil
				}
				break
			}
			// Send a download request to all idle peers, until throttled
			for _, peer := range d.peers.IdlePeers() {
				// Short circuit if throttling activated
				if d.queue.Throttle() {
					break
				}
				// Reserve a chunk of hashes for a peer. A nil can mean either that
				// no more hashes are available, or that the peer is known not to
				// have them.
				request := d.queue.Reserve(peer, peer.Capacity())
				if request == nil {
					continue
				}
				if glog.V(logger.Detail) {
					glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes))
				}
				// Fetch the chunk and make sure any errors return the hashes to the queue
				if err := peer.Fetch(request); err != nil {
					glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer)
					d.queue.Cancel(request)
				}
			}
			// Make sure that we have peers available for fetching. If all peers have been tried
			// and all failed, throw an error
			if !d.queue.Throttle() && d.queue.InFlight() == 0 {
				return errPeersUnavailable
			}
		}
	}
}

// banBlocks retrieves a batch of blocks from a peer feeding us invalid hashes,
// and bans the head of the retrieved batch.
//
// This method only fetches one single batch as the goal is not to ban an entire
// (potentially long) invalid chain - wasting a lot of time in the meantime -
// but rather to gradually build up a blacklist if the peer keeps reconnecting.
func (d *Downloader) banBlocks(peerId string, head common.Hash) error {
	glog.V(logger.Debug).Infof("Banning a batch out of %d blocks from %s", d.queue.Pending(), peerId)

	// Ask the peer being banned for a batch of blocks from the banning point
	peer := d.peers.Peer(peerId)
	if peer == nil {
		return nil
	}
	request := d.queue.Reserve(peer, MaxBlockFetch)
	if request == nil {
		return nil
	}
	if err := peer.Fetch(request); err != nil {
		return err
	}
	// Wait a bit for the reply to arrive, and ban if done so
	timeout := time.After(blockHardTTL)
	for {
		select {
		case <-d.cancelCh:
			return errCancelBlockFetch

		case <-timeout:
			return errTimeout

		case <-d.hashCh:
			// Out of bounds hashes received, ignore them

		case blockPack := <-d.blockCh:
			blocks := blockPack.blocks

			// Short circuit if it's a stale cross check
			if len(blocks) == 1 {
				block := blocks[0]
				if _, ok := d.checks[block.Hash()]; ok {
					delete(d.checks, block.Hash())
					break
				}
			}
			// Short circuit if it's not from the peer being banned
			if blockPack.peerId != peerId {
				break
			}
			// Short circuit if no blocks were returned
			if len(blocks) == 0 {
				return errors.New("no blocks returned to ban")
			}
			// Reconstruct the original chain order and ensure we're banning the correct blocks
			types.BlockBy(types.Number).Sort(blocks)
			if bytes.Compare(blocks[0].Hash().Bytes(), head.Bytes()) != 0 {
				return errors.New("head block not the banned one")
			}
			index := 0
			for _, block := range blocks[1:] {
				if bytes.Compare(block.ParentHash().Bytes(), blocks[index].Hash().Bytes()) != 0 {
					break
				}
				index++
			}
			// Ban the head hash and phase out any excess
			d.banned.Add(blocks[index].Hash())
			for d.banned.Size() > maxBannedHashes {
				var evacuate common.Hash

				d.banned.Each(func(item interface{}) bool {
					// Skip any hard coded bans
					if core.BadHashes[item.(common.Hash)] {
						return true
					}
					evacuate = item.(common.Hash)
					return false
				})
				d.banned.Remove(evacuate)
			}
			glog.V(logger.Debug).Infof("Banned %d blocks from: %s", index+1, peerId)
			return nil
		}
	}
}

// process takes blocks from the queue and tries to import them into the chain.
//
// The algorithmic flow is as follows:
//  - The `processing` flag is swapped to 1 to ensure singleton access
//  - The current `cancel` channel is retrieved to detect sync abortions
//  - Blocks are iteratively taken from the cache and inserted into the chain
//  - When the cache becomes empty, insertion stops
//  - The `processing` flag is swapped back to 0
//  - A post-exit check is made whether new blocks became available
//     - This step is important: it handles a potential race condition between
//       checking for no more work, and releasing the processing "mutex". In
//       between these state changes, a block may have arrived, but a processing
//       attempt denied, so we need to re-enter to ensure the block isn't left
//       to idle in the cache.
func (d *Downloader) process() {
	// Make sure only one goroutine is ever allowed to process blocks at once
	if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
		return
	}
	// If the processor just exited, but there are freshly pending items, try to
	// reenter. This is needed because the goroutine spun up for processing
	// the fresh blocks might have been rejected entry due to this present thread
	// not yet having released the `processing` state.
	defer func() {
		if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadBlock() != nil {
			d.process()
		}
	}()
	// Release the lock upon exit (note, before checking for reentry!), and set
	// the import statistics to zero.
	defer func() {
		d.importLock.Lock()
		d.importQueue = nil
		d.importDone = 0
		d.importLock.Unlock()

		atomic.StoreInt32(&d.processing, 0)
	}()
	// Repeat the processing as long as there are blocks to import
	for {
		// Fetch the next batch of blocks
		blocks := d.queue.TakeBlocks()
		if len(blocks) == 0 {
			return
		}
		// Reset the import statistics
		d.importLock.Lock()
		d.importStart = time.Now()
		d.importQueue = blocks
		d.importDone = 0
		d.importLock.Unlock()

		// Actually import the blocks
		glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
		for len(blocks) != 0 {
			// Check for any termination requests
			if atomic.LoadInt32(&d.interrupt) == 1 {
				return
			}
			// Retrieve the first batch of blocks to insert
			max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess)))
			raw := make(types.Blocks, 0, max)
			for _, block := range blocks[:max] {
				raw = append(raw, block.RawBlock)
			}
			// Try to insert the blocks, drop the originating peer if there's an error
			index, err := d.insertChain(raw)
			if err != nil {
				glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err)
				d.dropPeer(blocks[index].OriginPeer)
				d.cancel()
				return
			}
			blocks = blocks[max:]
		}
	}
}

// DeliverBlocks injects a new batch of blocks received from a remote node.
// This is usually invoked through the BlocksMsg by the protocol handler.
func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error {
	// Make sure the downloader is active
	if atomic.LoadInt32(&d.synchronising) == 0 {
		return errNoSyncActive
	}
	// Deliver or abort if the sync is canceled while queuing
	d.cancelLock.RLock()
	cancel := d.cancelCh
	d.cancelLock.RUnlock()

	select {
	case d.blockCh <- blockPack{id, blocks}:
		return nil

	case <-cancel:
		return errNoSyncActive
	}
}

// DeliverHashes injects a new batch of hashes received from a remote node into
// the download schedule. This is usually invoked through the BlockHashesMsg by
// the protocol handler.
func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) error {
	// Make sure the downloader is active
	if atomic.LoadInt32(&d.synchronising) == 0 {
		return errNoSyncActive
	}
	// Deliver or abort if the sync is canceled while queuing
	d.cancelLock.RLock()
	cancel := d.cancelCh
	d.cancelLock.RUnlock()

	select {
	case d.hashCh <- hashPack{id, hashes}:
		return nil

	case <-cancel:
		return errNoSyncActive
	}
}
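
// The two Deliver* methods above are intended to be invoked by the eth protocol
// handler whenever hash or block packets arrive from the network. A rough sketch
// of that wiring (the handler, peer and message decoding are assumptions about
// the caller, not part of this package):
//
//	case BlockHashesMsg:
//		var hashes []common.Hash
//		if err := msg.Decode(&hashes); err != nil {
//			return err
//		}
//		d.DeliverHashes(p.id, hashes)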