downloader.go 60.0 KB
Newer Older
F
Felix Lange 已提交
1
// Copyright 2015 The go-ethereum Authors
2
// This file is part of the go-ethereum library.
F
Felix Lange 已提交
3
//
4
// The go-ethereum library is free software: you can redistribute it and/or modify
F
Felix Lange 已提交
5 6 7 8
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
9
// The go-ethereum library is distributed in the hope that it will be useful,
F
Felix Lange 已提交
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
F
Felix Lange 已提交
12 13 14
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
15
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
F
Felix Lange 已提交
16

17
// Package downloader contains the manual full chain synchronisation.
18 19 20
package downloader

import (
21
	"crypto/rand"
22
	"errors"
23
	"fmt"
24
	"math"
25
	"math/big"
26
	"strings"
27 28 29 30
	"sync"
	"sync/atomic"
	"time"

31
	ethereum "github.com/ethereum/go-ethereum"
32 33
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
34
	"github.com/ethereum/go-ethereum/ethdb"
35
	"github.com/ethereum/go-ethereum/event"
36 37
	"github.com/ethereum/go-ethereum/logger"
	"github.com/ethereum/go-ethereum/logger/glog"
38
	"github.com/ethereum/go-ethereum/params"
39
	"github.com/ethereum/go-ethereum/trie"
40
	"github.com/rcrowley/go-metrics"
41 42
)

43
var (
44 45 46
	MaxHashFetch    = 512 // Amount of hashes to be fetched per retrieval request
	MaxBlockFetch   = 128 // Amount of blocks to be fetched per retrieval request
	MaxHeaderFetch  = 192 // Amount of block headers to be fetched per retrieval request
47
	MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly
48 49
	MaxBodyFetch    = 128 // Amount of block bodies to be fetched per retrieval request
	MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
50
	MaxStateFetch   = 384 // Amount of node state values to allow fetching per request
51

52 53 54 55 56 57
	MaxForkAncestry  = 3 * params.EpochDuration.Uint64() // Maximum chain reorganisation
	rttMinEstimate   = 2 * time.Second                   // Minimum round-trip time to target for download requests
	rttMaxEstimate   = 20 * time.Second                  // Maximum round-trip time to target for download requests
	rttMinConfidence = 0.1                               // Worst confidence factor in our estimated RTT value
	ttlScaling       = 3                                 // Constant scaling factor for RTT -> TTL conversion
	ttlLimit         = time.Minute                       // Maximum TTL allowance to prevent reaching crazy timeouts
58 59 60 61

	qosTuningPeers   = 5    // Number of peers to tune based on (best peers)
	qosConfidenceCap = 10   // Number of peers above which not to modify RTT confidence
	qosTuningImpact  = 0.25 // Impact that a new tuning target has on the previous value
62

63 64
	maxQueuedHeaders  = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
	maxHeadersProcess = 2048      // Number of header download results to import at once into the chain
65
	maxResultsProcess = 2048      // Number of content download results to import at once into the chain
66

67 68 69
	fsHeaderCheckFrequency = 100        // Verification frequency of the downloaded headers during fast sync
	fsHeaderSafetyNet      = 2048       // Number of headers to discard in case a chain violation is detected
	fsHeaderForceVerify    = 24         // Number of headers to verify before and after the pivot to accept it
70 71
	fsPivotInterval        = 256        // Number of headers out of which to randomize the pivot point
	fsMinFullBlocks        = 64         // Number of blocks to retrieve fully even in fast sync
72
	fsCriticalTrials       = uint32(32) // Number of times to retry in the critical section before bailing
73
)
74

75
var (
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
	errBusy                    = errors.New("busy")
	errUnknownPeer             = errors.New("peer is unknown or unhealthy")
	errBadPeer                 = errors.New("action from bad peer ignored")
	errStallingPeer            = errors.New("peer is stalling")
	errNoPeers                 = errors.New("no peers to keep download active")
	errTimeout                 = errors.New("timeout")
	errEmptyHeaderSet          = errors.New("empty header set by peer")
	errPeersUnavailable        = errors.New("no peers available or all tried for download")
	errInvalidAncestor         = errors.New("retrieved ancestor is invalid")
	errInvalidChain            = errors.New("retrieved hash chain is invalid")
	errInvalidBlock            = errors.New("retrieved block is invalid")
	errInvalidBody             = errors.New("retrieved block body is invalid")
	errInvalidReceipt          = errors.New("retrieved receipt is invalid")
	errCancelBlockFetch        = errors.New("block download canceled (requested)")
	errCancelHeaderFetch       = errors.New("block header download canceled (requested)")
	errCancelBodyFetch         = errors.New("block body download canceled (requested)")
	errCancelReceiptFetch      = errors.New("receipt download canceled (requested)")
	errCancelStateFetch        = errors.New("state data download canceled (requested)")
	errCancelHeaderProcessing  = errors.New("header processing canceled (requested)")
	errCancelContentProcessing = errors.New("content processing canceled (requested)")
	errNoSyncActive            = errors.New("no sync active")
97
	errTooOld                  = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)")
98 99
)

100
type Downloader struct {
101 102
	mode SyncMode       // Synchronisation mode defining the strategy used (per sync cycle)
	mux  *event.TypeMux // Event multiplexer to announce sync operation events
103

104 105
	queue *queue   // Scheduler for selecting the hashes to download
	peers *peerSet // Set of active peers from which download can proceed
106

107
	fsPivotLock  *types.Header // Pivot header on critical section entry (cannot change between retries)
108
	fsPivotFails uint32        // Number of subsequent fast sync failures in the critical section
109

110 111
	rttEstimate   uint64 // Round trip time to target for download requests
	rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
112

113
	// Statistics
114 115 116 117
	syncStatsChainOrigin uint64       // Origin block number where syncing started at
	syncStatsChainHeight uint64       // Highest block number known when syncing started
	syncStatsStateDone   uint64       // Number of state trie entries already pulled
	syncStatsLock        sync.RWMutex // Lock protecting the sync stats fields
118

119
	// Callbacks
120 121 122 123 124 125 126 127 128 129 130 131 132 133
	hasHeader        headerCheckFn            // Checks if a header is present in the chain
	hasBlockAndState blockAndStateCheckFn     // Checks if a block and associated state is present in the chain
	getHeader        headerRetrievalFn        // Retrieves a header from the chain
	getBlock         blockRetrievalFn         // Retrieves a block from the chain
	headHeader       headHeaderRetrievalFn    // Retrieves the head header from the chain
	headBlock        headBlockRetrievalFn     // Retrieves the head block from the chain
	headFastBlock    headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
	commitHeadBlock  headBlockCommitterFn     // Commits a manually assembled block as the chain head
	getTd            tdRetrievalFn            // Retrieves the TD of a block from the chain
	insertHeaders    headerChainInsertFn      // Injects a batch of headers into the chain
	insertBlocks     blockChainInsertFn       // Injects a batch of blocks into the chain
	insertReceipts   receiptChainInsertFn     // Injects a batch of blocks and their receipts into the chain
	rollback         chainRollbackFn          // Removes a batch of recently added chain links
	dropPeer         peerDropFn               // Drops a peer for misbehaving
134

135
	// Status
136 137 138
	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
	synchronising   int32                                   // Set to 1 (atomically) while a sync cycle is running, 0 otherwise
	notified        int32                                   // Set to 1 (atomically) once the "synchronisation started" message was logged
139 140

	// Channels
141
	newPeerCh     chan *peer
142 143 144 145 146 147 148 149
	headerCh      chan dataPack        // [eth/62] Channel receiving inbound block headers
	bodyCh        chan dataPack        // [eth/62] Channel receiving inbound block bodies
	receiptCh     chan dataPack        // [eth/63] Channel receiving inbound receipts
	stateCh       chan dataPack        // [eth/63] Channel receiving inbound node state data
	bodyWakeCh    chan bool            // [eth/62] Channel to signal the block body fetcher of new tasks
	receiptWakeCh chan bool            // [eth/63] Channel to signal the receipt fetcher of new tasks
	stateWakeCh   chan bool            // [eth/63] Channel to signal the state fetcher of new tasks
	headerProcCh  chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
150

151 152
	// Cancellation and termination
	cancelPeer string        // Identifier of the peer currently being used as the master (cancel on drop)
153
	cancelCh   chan struct{} // Channel to cancel mid-flight syncs
154
	cancelLock sync.RWMutex  // Lock to protect the cancel channel and peer in delivers
155

156 157 158
	quitCh   chan struct{} // Quit channel to signal termination
	quitLock sync.RWMutex  // Lock to prevent double closes

159
	// Testing hooks
160 161 162 163
	syncInitHook     func(uint64, uint64)  // Method to call upon initiating a new sync run
	bodyFetchHook    func([]*types.Header) // Method to call upon starting a block body fetch
	receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
	chainInsertHook  func([]*fetchResult)  // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
164 165
}

166
// New creates a new downloader to fetch hashes and blocks from remote peers.
167
func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlockAndState blockAndStateCheckFn,
168 169 170
	getHeader headerRetrievalFn, getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn,
	headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn,
	insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader {
171

172
	dl := &Downloader{
173
		mode:             mode,
174 175 176
		mux:              mux,
		queue:            newQueue(stateDb),
		peers:            newPeerSet(),
177 178
		rttEstimate:      uint64(rttMaxEstimate),
		rttConfidence:    uint64(1000000),
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
		hasHeader:        hasHeader,
		hasBlockAndState: hasBlockAndState,
		getHeader:        getHeader,
		getBlock:         getBlock,
		headHeader:       headHeader,
		headBlock:        headBlock,
		headFastBlock:    headFastBlock,
		commitHeadBlock:  commitHeadBlock,
		getTd:            getTd,
		insertHeaders:    insertHeaders,
		insertBlocks:     insertBlocks,
		insertReceipts:   insertReceipts,
		rollback:         rollback,
		dropPeer:         dropPeer,
		newPeerCh:        make(chan *peer, 1),
		headerCh:         make(chan dataPack, 1),
		bodyCh:           make(chan dataPack, 1),
		receiptCh:        make(chan dataPack, 1),
		stateCh:          make(chan dataPack, 1),
		bodyWakeCh:       make(chan bool, 1),
		receiptWakeCh:    make(chan bool, 1),
		stateWakeCh:      make(chan bool, 1),
201
		headerProcCh:     make(chan []*types.Header, 1),
202
		quitCh:           make(chan struct{}),
203
	}
204 205
	go dl.qosTuner()
	return dl
206 207
}

208 209 210
// Progress retrieves the synchronisation boundaries, specifically the origin
// block where synchronisation started at (may have failed/suspended); the block
// or header sync is currently at; and the latest known block which the sync targets.
211
//
L
Leif Jurvetson 已提交
212
// In addition, during the state download phase of fast synchronisation the number
213 214
// of processed and the total number of known states are also returned. Otherwise
// these are zero.
215
func (d *Downloader) Progress() ethereum.SyncProgress {
216 217 218 219
	// Fetch the pending state count outside of the lock to prevent unforeseen deadlocks
	pendingStates := uint64(d.queue.PendingNodeData())

	// Lock the current stats and return the progress
220 221
	d.syncStatsLock.RLock()
	defer d.syncStatsLock.RUnlock()
222

223 224 225 226 227 228 229 230 231
	current := uint64(0)
	switch d.mode {
	case FullSync:
		current = d.headBlock().NumberU64()
	case FastSync:
		current = d.headFastBlock().NumberU64()
	case LightSync:
		current = d.headHeader().Number.Uint64()
	}
232 233 234 235 236 237 238
	return ethereum.SyncProgress{
		StartingBlock: d.syncStatsChainOrigin,
		CurrentBlock:  current,
		HighestBlock:  d.syncStatsChainHeight,
		PulledStates:  d.syncStatsStateDone,
		KnownStates:   d.syncStatsStateDone + pendingStates,
	}
O
obscuren 已提交
239 240
}

241
// Synchronising returns whether the downloader is currently retrieving blocks.
242
func (d *Downloader) Synchronising() bool {
243
	return atomic.LoadInt32(&d.synchronising) > 0
244 245
}

246 247
// RegisterPeer injects a new download peer into the set of block source to be
// used for fetching hashes and blocks from.
248
func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHeadRetrievalFn,
249 250
	getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
	getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {
251

252
	glog.V(logger.Detail).Infoln("Registering peer", id)
253
	if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
254 255 256
		glog.V(logger.Error).Infoln("Register failed:", err)
		return err
	}
257 258
	d.qosReduceConfidence()

259 260 261
	return nil
}

262
// UnregisterPeer remove a peer from the known list, preventing any action from
263 264
// the specified peer. An effort is also made to return any pending fetches into
// the queue.
265
func (d *Downloader) UnregisterPeer(id string) error {
266
	// Unregister the peer from the active peer set and revoke any fetch tasks
267 268 269 270 271
	glog.V(logger.Detail).Infoln("Unregistering peer", id)
	if err := d.peers.Unregister(id); err != nil {
		glog.V(logger.Error).Infoln("Unregister failed:", err)
		return err
	}
272
	d.queue.Revoke(id)
273 274 275 276 277 278 279 280 281

	// If this peer was the master peer, abort sync immediately
	d.cancelLock.RLock()
	master := id == d.cancelPeer
	d.cancelLock.RUnlock()

	if master {
		d.cancel()
	}
282
	return nil
283 284
}

285 286
// Synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
287
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
288
	glog.V(logger.Detail).Infof("Attempting synchronisation: %v, head [%x…], TD %v", id, head[:4], td)
289

290 291
	err := d.synchronise(id, head, td, mode)
	switch err {
292 293 294 295 296 297
	case nil:
		glog.V(logger.Detail).Infof("Synchronisation completed")

	case errBusy:
		glog.V(logger.Detail).Infof("Synchronisation already in progress")

298 299 300
	case errTimeout, errBadPeer, errStallingPeer,
		errEmptyHeaderSet, errPeersUnavailable, errTooOld,
		errInvalidAncestor, errInvalidChain:
301 302 303 304 305 306
		glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
		d.dropPeer(id)

	default:
		glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
	}
307
	return err
308 309 310
}

// synchronise will select the peer and use it for synchronising. If an empty string is given
311
// it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
312
// checks fail an error will be returned. This method is synchronous
313
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
L
Leif Jurvetson 已提交
314
	// Mock out the synchronisation if testing
315 316 317
	if d.synchroniseMock != nil {
		return d.synchroniseMock(id, hash)
	}
318
	// Make sure only one goroutine is ever allowed past this point at once
319
	if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
320
		return errBusy
321
	}
322
	defer atomic.StoreInt32(&d.synchronising, 0)
323

324 325 326 327
	// Post a user notification of the sync (only once per session)
	if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
		glog.V(logger.Info).Infoln("Block synchronisation started")
	}
328
	// Reset the queue, peer set and wake channels to clean any internal leftover state
329
	d.queue.Reset()
330
	d.peers.Reset()
331

332
	for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
333 334 335 336
		select {
		case <-ch:
		default:
		}
337
	}
338
	for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
339 340 341 342 343 344 345 346
		for empty := false; !empty; {
			select {
			case <-ch:
			default:
				empty = true
			}
		}
	}
347 348 349 350 351 352 353
	for empty := false; !empty; {
		select {
		case <-d.headerProcCh:
		default:
			empty = true
		}
	}
354
	// Create cancel channel for aborting mid-flight and mark the master peer
355 356
	d.cancelLock.Lock()
	d.cancelCh = make(chan struct{})
357
	d.cancelPeer = id
358 359
	d.cancelLock.Unlock()

360 361
	defer d.cancel() // No matter what, we can't leave the cancel channel open

362 363
	// Set the requested sync mode, unless it's forbidden
	d.mode = mode
364
	if d.mode == FastSync && atomic.LoadUint32(&d.fsPivotFails) >= fsCriticalTrials {
365 366
		d.mode = FullSync
	}
367
	// Retrieve the origin peer and initiate the downloading process
368
	p := d.peers.Peer(id)
369
	if p == nil {
370
		return errUnknownPeer
371
	}
372
	return d.syncWithPeer(p, hash, td)
373 374
}

375 376
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
377
func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) {
O
obscuren 已提交
378
	d.mux.Post(StartEvent{})
379 380 381
	defer func() {
		// reset on error
		if err != nil {
382 383 384
			d.mux.Post(FailedEvent{err})
		} else {
			d.mux.Post(DoneEvent{})
385 386
		}
	}()
387 388 389
	if p.version < 62 {
		return errTooOld
	}
390

391
	glog.V(logger.Debug).Infof("Synchronising with the network using: %s [eth/%d]", p.id, p.version)
392 393 394
	defer func(start time.Time) {
		glog.V(logger.Debug).Infof("Synchronisation terminated after %v", time.Since(start))
	}(time.Now())
395

396 397 398 399 400 401
	// Look up the sync boundaries: the common ancestor and the target block
	latest, err := d.fetchHeight(p)
	if err != nil {
		return err
	}
	height := latest.Number.Uint64()
402

403 404 405 406 407 408 409 410 411 412
	origin, err := d.findAncestor(p, height)
	if err != nil {
		return err
	}
	d.syncStatsLock.Lock()
	if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
		d.syncStatsChainOrigin = origin
	}
	d.syncStatsChainHeight = height
	d.syncStatsLock.Unlock()
413

414 415 416 417 418 419 420 421 422 423 424
	// Initiate the sync using a concurrent header and content retrieval algorithm
	pivot := uint64(0)
	switch d.mode {
	case LightSync:
		pivot = height
	case FastSync:
		// Calculate the new fast/slow sync pivot point
		if d.fsPivotLock == nil {
			pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
			if err != nil {
				panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
425
			}
426 427
			if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
				pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
428
			}
429 430 431
		} else {
			// Pivot point locked in, use this and do not pick a new one!
			pivot = d.fsPivotLock.Number.Uint64()
432
		}
433 434 435 436 437 438 439
		// If the point is below the origin, move origin back to ensure state download
		if pivot < origin {
			if pivot > 0 {
				origin = pivot - 1
			} else {
				origin = 0
			}
440
		}
441 442 443 444 445
		glog.V(logger.Debug).Infof("Fast syncing until pivot block #%d", pivot)
	}
	d.queue.Prepare(origin+1, d.mode, pivot, latest)
	if d.syncInitHook != nil {
		d.syncInitHook(origin, height)
446
	}
447 448 449 450 451 452 453
	return d.spawnSync(origin+1,
		func() error { return d.fetchHeaders(p, origin+1) },    // Headers are always retrieved
		func() error { return d.processHeaders(origin+1, td) }, // Headers are always retrieved
		func() error { return d.fetchBodies(origin + 1) },      // Bodies are retrieved during normal and fast sync
		func() error { return d.fetchReceipts(origin + 1) },    // Receipts are retrieved during fast sync
		func() error { return d.fetchNodeData() },              // Node state data is retrieved during fast sync
	)
454 455 456 457
}

// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
458
func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error {
459 460 461
	var wg sync.WaitGroup
	errc := make(chan error, len(fetchers)+1)
	wg.Add(len(fetchers) + 1)
462
	go func() { defer wg.Done(); errc <- d.processContent() }()
463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482
	for _, fn := range fetchers {
		fn := fn
		go func() { defer wg.Done(); errc <- fn() }()
	}
	// Wait for the first error, then terminate the others.
	var err error
	for i := 0; i < len(fetchers)+1; i++ {
		if i == len(fetchers) {
			// Close the queue when all fetchers have exited.
			// This will cause the block processor to end when
			// it has processed the queue.
			d.queue.Close()
		}
		if err = <-errc; err != nil {
			break
		}
	}
	d.queue.Close()
	d.cancel()
	wg.Wait()
483 484 485 486 487

	// If sync failed in the critical section, bump the fail counter
	if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
		atomic.AddUint32(&d.fsPivotFails, 1)
	}
488
	return err
489 490
}

491
// cancel cancels all of the operations and resets the queue. It returns true
492
// if the cancel operation was completed.
493
func (d *Downloader) cancel() {
494
	// Close the current cancel channel
495
	d.cancelLock.Lock()
496 497 498 499 500 501 502
	if d.cancelCh != nil {
		select {
		case <-d.cancelCh:
			// Channel was already closed
		default:
			close(d.cancelCh)
		}
503 504
	}
	d.cancelLock.Unlock()
505 506
}

507
// Terminate interrupts the downloader, canceling all pending operations.
508
// The downloader cannot be reused after calling Terminate.
509
func (d *Downloader) Terminate() {
510 511 512 513 514 515 516 517 518 519
	// Close the termination channel (make sure double close is allowed)
	d.quitLock.Lock()
	select {
	case <-d.quitCh:
	default:
		close(d.quitCh)
	}
	d.quitLock.Unlock()

	// Cancel any pending download requests
520 521 522
	d.cancel()
}

523 524
// fetchHeight retrieves the head header of the remote peer to aid in estimating
// the total time a pending synchronisation would take.
525
func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
526 527 528
	glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)

	// Request the advertised remote head block and wait for the response
529 530
	head, _ := p.currentHead()
	go p.getRelHeaders(head, 1, 0, false)
531

532
	timeout := time.After(d.requestTTL())
533 534 535
	for {
		select {
		case <-d.cancelCh:
536
			return nil, errCancelBlockFetch
537

538
		case packet := <-d.headerCh:
539
			// Discard anything not from the origin peer
540 541
			if packet.PeerId() != p.id {
				glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
542 543 544
				break
			}
			// Make sure the peer actually gave something valid
545
			headers := packet.(*headerPack).headers
546 547
			if len(headers) != 1 {
				glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers))
548
				return nil, errBadPeer
549
			}
550
			return headers[0], nil
551

552 553
		case <-timeout:
			glog.V(logger.Debug).Infof("%v: head header timeout", p)
554
			return nil, errTimeout
555

556
		case <-d.bodyCh:
557 558 559
		case <-d.stateCh:
		case <-d.receiptCh:
			// Out of bounds delivery, ignore
560 561 562 563
		}
	}
}

564
// findAncestor tries to locate the common ancestor link of the local chain and
565
// a remote peers blockchain. In the general case when our node was in sync and
566
// on the correct chain, checking the top N links should already get us a match.
567
// In the rare scenario when we ended up on a long reorganisation (i.e. none of
568
// the head links match), we do a binary search to find the common ancestor.
569
func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
570
	glog.V(logger.Debug).Infof("%v: looking for common ancestor (remote height %d)", p, height)
571

572 573
	// Figure out the valid ancestor range to prevent rewrite attacks
	floor, ceil := int64(-1), d.headHeader().Number.Uint64()
574
	if d.mode == FullSync {
575
		ceil = d.headBlock().NumberU64()
576
	} else if d.mode == FastSync {
577 578 579 580 581 582 583 584 585
		ceil = d.headFastBlock().NumberU64()
	}
	if ceil >= MaxForkAncestry {
		floor = int64(ceil - MaxForkAncestry)
	}
	// Request the topmost blocks to short circuit binary ancestor lookup
	head := ceil
	if head > height {
		head = height
586
	}
587
	from := int64(head) - int64(MaxHeaderFetch)
588 589 590
	if from < 0 {
		from = 0
	}
591 592 593 594 595 596 597
	// Span out with 15 block gaps into the future to catch bad head reports
	limit := 2 * MaxHeaderFetch / 16
	count := 1 + int((int64(ceil)-from)/16)
	if count > limit {
		count = limit
	}
	go p.getAbsHeaders(uint64(from), count, 15, false)
598 599 600

	// Wait for the remote response to the head fetch
	number, hash := uint64(0), common.Hash{}
601
	timeout := time.After(d.requestTTL())
602 603 604 605

	for finished := false; !finished; {
		select {
		case <-d.cancelCh:
606
			return 0, errCancelHeaderFetch
607

608
		case packet := <-d.headerCh:
609
			// Discard anything not from the origin peer
610 611
			if packet.PeerId() != p.id {
				glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
612 613 614
				break
			}
			// Make sure the peer actually gave something valid
615
			headers := packet.(*headerPack).headers
616
			if len(headers) == 0 {
617
				glog.V(logger.Warn).Infof("%v: empty head header set", p)
618 619
				return 0, errEmptyHeaderSet
			}
620 621
			// Make sure the peer's reply conforms to the request
			for i := 0; i < len(headers); i++ {
622 623
				if number := headers[i].Number.Int64(); number != from+int64(i)*16 {
					glog.V(logger.Warn).Infof("%v: head header set (item %d) broke chain ordering: requested %d, got %d", p, i, from+int64(i)*16, number)
624 625 626
					return 0, errInvalidChain
				}
			}
627 628 629
			// Check if a common ancestor was found
			finished = true
			for i := len(headers) - 1; i >= 0; i-- {
630
				// Skip any headers that underflow/overflow our requested set
631
				if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > ceil {
632 633 634
					continue
				}
				// Otherwise check if we already know the header or not
635
				if (d.mode == FullSync && d.hasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.hasHeader(headers[i].Hash())) {
636
					number, hash = headers[i].Number.Uint64(), headers[i].Hash()
637 638 639 640 641 642

					// If every header is known, even future ones, the peer straight out lied about its head
					if number > height && i == limit-1 {
						glog.V(logger.Warn).Infof("%v: lied about chain head: reported %d, found above %d", p, height, number)
						return 0, errStallingPeer
					}
643 644 645 646
					break
				}
			}

647 648 649 650
		case <-timeout:
			glog.V(logger.Debug).Infof("%v: head header timeout", p)
			return 0, errTimeout

651
		case <-d.bodyCh:
652 653 654
		case <-d.stateCh:
		case <-d.receiptCh:
			// Out of bounds delivery, ignore
655 656 657 658
		}
	}
	// If the head fetch already found an ancestor, return
	if !common.EmptyHash(hash) {
659 660 661 662
		if int64(number) <= floor {
			glog.V(logger.Warn).Infof("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, number, hash[:4], floor)
			return 0, errInvalidAncestor
		}
663 664 665 666 667
		glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, number, hash[:4])
		return number, nil
	}
	// Ancestor not found, we need to binary search over our chain
	start, end := uint64(0), head
668 669 670
	if floor > 0 {
		start = uint64(floor)
	}
671 672 673 674
	for start+1 < end {
		// Split our chain interval in two, and request the hash to cross check
		check := (start + end) / 2

675
		timeout := time.After(d.requestTTL())
676 677 678 679 680 681
		go p.getAbsHeaders(uint64(check), 1, 0, false)

		// Wait until a reply arrives to this request
		for arrived := false; !arrived; {
			select {
			case <-d.cancelCh:
682
				return 0, errCancelHeaderFetch
683

684
			case packer := <-d.headerCh:
685
				// Discard anything not from the origin peer
686 687
				if packer.PeerId() != p.id {
					glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packer.PeerId())
688 689 690
					break
				}
				// Make sure the peer actually gave something valid
691
				headers := packer.(*headerPack).headers
692 693 694 695 696 697 698
				if len(headers) != 1 {
					glog.V(logger.Debug).Infof("%v: invalid search header set (%d)", p, len(headers))
					return 0, errBadPeer
				}
				arrived = true

				// Modify the search interval based on the response
699
				if (d.mode == FullSync && !d.hasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.hasHeader(headers[0].Hash())) {
700 701 702
					end = check
					break
				}
703 704 705
				header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists
				if header.Number.Uint64() != check {
					glog.V(logger.Debug).Infof("%v: non requested header #%d [%x…], instead of #%d", p, header.Number, header.Hash().Bytes()[:4], check)
706 707 708 709
					return 0, errBadPeer
				}
				start = check

710 711 712 713
			case <-timeout:
				glog.V(logger.Debug).Infof("%v: search header timeout", p)
				return 0, errTimeout

714
			case <-d.bodyCh:
715 716 717
			case <-d.stateCh:
			case <-d.receiptCh:
				// Out of bounds delivery, ignore
718 719 720
			}
		}
	}
721 722 723 724 725 726
	// Ensure valid ancestry and return
	if int64(start) <= floor {
		glog.V(logger.Warn).Infof("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, start, hash[:4], floor)
		return 0, errInvalidAncestor
	}
	glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, start, hash[:4])
727 728 729
	return start, nil
}

730 731 732 733 734
// fetchHeaders keeps retrieving headers concurrently from the number
// requested, until no more are returned, potentially throttling on the way. To
// facilitate concurrency but still protect against malicious nodes sending bad
// headers, we construct a header chain skeleton using the "origin" peer we are
// syncing with, and fill in the missing headers using anyone else. Headers from
735
// other peers are only accepted if they map cleanly to the skeleton. If no one
736 737 738 739
// can fill in the skeleton - not even the origin peer - it's assumed invalid and
// the origin is dropped.
func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
	glog.V(logger.Debug).Infof("%v: directing header downloads from #%d", p, from)
740 741
	defer glog.V(logger.Debug).Infof("%v: header download terminated", p)

742 743 744
	// Create a timeout timer, and the associated header fetcher
	skeleton := true            // Skeleton assembly phase or finishing up
	request := time.Now()       // time of the last skeleton fetch request
745 746 747 748 749
	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
	<-timeout.C                 // timeout channel should be initially empty
	defer timeout.Stop()

	getHeaders := func(from uint64) {
750
		request = time.Now()
751
		timeout.Reset(d.requestTTL())
752

753 754 755 756 757 758 759
		if skeleton {
			glog.V(logger.Detail).Infof("%v: fetching %d skeleton headers from #%d", p, MaxHeaderFetch, from)
			go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
		} else {
			glog.V(logger.Detail).Infof("%v: fetching %d full headers from #%d", p, MaxHeaderFetch, from)
			go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
		}
760
	}
761
	// Start pulling the header chain skeleton until all is done
762 763 764 765 766 767 768
	getHeaders(from)

	for {
		select {
		case <-d.cancelCh:
			return errCancelHeaderFetch

769
		case packet := <-d.headerCh:
770
			// Make sure the active peer is giving us the skeleton headers
771
			if packet.PeerId() != p.id {
772
				glog.V(logger.Debug).Infof("Received skeleton headers from incorrect peer (%s)", packet.PeerId())
773 774
				break
			}
775
			headerReqTimer.UpdateSince(request)
776 777
			timeout.Stop()

778 779 780 781 782 783
			// If the skeleton's finished, pull any remaining head headers directly from the origin
			if packet.Items() == 0 && skeleton {
				skeleton = false
				getHeaders(from)
				continue
			}
784
			// If no more headers are inbound, notify the content fetchers and return
785
			if packet.Items() == 0 {
786
				glog.V(logger.Debug).Infof("%v: no available headers", p)
787 788 789 790 791 792
				select {
				case d.headerProcCh <- nil:
					return nil
				case <-d.cancelCh:
					return errCancelHeaderFetch
				}
793
			}
794
			headers := packet.(*headerPack).headers
795

796 797
			// If we received a skeleton batch, resolve internals concurrently
			if skeleton {
798
				filled, proced, err := d.fillHeaderSkeleton(from, headers)
799 800
				if err != nil {
					glog.V(logger.Debug).Infof("%v: skeleton chain invalid: %v", p, err)
801 802
					return errInvalidChain
				}
803 804
				headers = filled[proced:]
				from += uint64(proced)
805
			}
806
			// Insert all the new headers and fetch the next batch
807 808 809 810 811 812 813 814
			if len(headers) > 0 {
				glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)
				select {
				case d.headerProcCh <- headers:
				case <-d.cancelCh:
					return errCancelHeaderFetch
				}
				from += uint64(len(headers))
815
			}
816 817 818 819 820
			getHeaders(from)

		case <-timeout.C:
			// Header retrieval timed out, consider the peer bad and drop
			glog.V(logger.Debug).Infof("%v: header request timed out", p)
821
			headerTimeoutMeter.Mark(1)
822 823 824
			d.dropPeer(p.id)

			// Finish the sync gracefully instead of dumping the gathered data though
825
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
826 827 828 829
				select {
				case ch <- false:
				case <-d.cancelCh:
				}
830
			}
831 832 833 834 835
			select {
			case d.headerProcCh <- nil:
			case <-d.cancelCh:
			}
			return errBadPeer
836 837 838 839
		}
	}
}

840 841
// fillHeaderSkeleton concurrently retrieves headers from all our available peers
// and maps them to the provided skeleton header chain.
842 843 844 845 846 847 848 849
//
// Any partial results from the beginning of the skeleton is (if possible) forwarded
// immediately to the header processor to keep the rest of the pipeline full even
// in the case of header stalls.
//
// The method returs the entire filled skeleton and also the number of headers
// already forwarded for processing.
func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
850 851 852 853 854 855
	glog.V(logger.Debug).Infof("Filling up skeleton from #%d", from)
	d.queue.ScheduleSkeleton(from, skeleton)

	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*headerPack)
856
			return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh)
857
		}
858
		expire   = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
859 860 861 862 863
		throttle = func() bool { return false }
		reserve  = func(p *peer, count int) (*fetchRequest, bool, error) {
			return d.queue.ReserveHeaders(p, count), false, nil
		}
		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
864
		capacity = func(p *peer) int { return p.HeaderCapacity(d.requestRTT()) }
865 866 867 868 869 870 871
		setIdle  = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) }
	)
	err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
		d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
		nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "Header")

	glog.V(logger.Debug).Infof("Skeleton fill terminated: %v", err)
872 873 874

	filled, proced := d.queue.RetrieveHeaders()
	return filled, proced, err
875 876
}

877 878 879 880 881 882
// fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchBodies(from uint64) error {
	glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)

883
	var (
884
		deliver = func(packet dataPack) (int, error) {
885
			pack := packet.(*bodyPack)
886
			return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
887
		}
888
		expire   = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
889
		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
890
		capacity = func(p *peer) int { return p.BlockCapacity(d.requestRTT()) }
891
		setIdle  = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) }
892
	)
893
	err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
894 895
		d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
		d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "Body")
896 897 898 899 900 901 902 903 904 905 906 907

	glog.V(logger.Debug).Infof("Block body download terminated: %v", err)
	return err
}

// fetchReceipts iteratively downloads the scheduled block receipts, taking any
// available peers, reserving a chunk of receipts for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchReceipts(from uint64) error {
	glog.V(logger.Debug).Infof("Downloading receipts from #%d", from)

	var (
908
		deliver = func(packet dataPack) (int, error) {
909 910 911
			pack := packet.(*receiptPack)
			return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
		}
912
		expire   = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
913
		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
914
		capacity = func(p *peer) int { return p.ReceiptCapacity(d.requestRTT()) }
915
		setIdle  = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) }
916
	)
917
	err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
918
		d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
919
		d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt")
920 921 922 923 924

	glog.V(logger.Debug).Infof("Receipt download terminated: %v", err)
	return err
}

925 926 927 928 929 930 931
// fetchNodeData iteratively downloads the scheduled state trie nodes, taking any
// available peers, reserving a chunk of nodes for each, waiting for delivery and
// also periodically checking for timeouts.
func (d *Downloader) fetchNodeData() error {
	glog.V(logger.Debug).Infof("Downloading node state data")

	var (
932
		deliver = func(packet dataPack) (int, error) {
933
			start := time.Now()
934
			return d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states, func(delivered int, progressed bool, err error) {
935 936
				// If the peer returned old-requested data, forgive
				if err == trie.ErrNotRequested {
937
					glog.V(logger.Debug).Infof("peer %s: replied to stale state request, forgiving", packet.PeerId())
938
					return
939
				}
940 941 942 943 944 945
				if err != nil {
					// If the node data processing failed, the root hash is very wrong, abort
					glog.V(logger.Error).Infof("peer %d: state processing failed: %v", packet.PeerId(), err)
					d.cancel()
					return
				}
946
				// Processing succeeded, notify state fetcher of continuation
947 948
				pending := d.queue.PendingNodeData()
				if pending > 0 {
949 950 951 952 953 954 955
					select {
					case d.stateWakeCh <- true:
					default:
					}
				}
				d.syncStatsLock.Lock()
				d.syncStatsStateDone += uint64(delivered)
956
				syncStatsStateDone := d.syncStatsStateDone // Thread safe copy for the log below
957 958
				d.syncStatsLock.Unlock()

959 960 961 962 963
				// If real database progress was made, reset any fast-sync pivot failure
				if progressed && atomic.LoadUint32(&d.fsPivotFails) > 1 {
					glog.V(logger.Debug).Infof("fast-sync progressed, resetting fail counter from %d", atomic.LoadUint32(&d.fsPivotFails))
					atomic.StoreUint32(&d.fsPivotFails, 1) // Don't ever reset to 0, as that will unlock the pivot block
				}
964 965
				// Log a message to the user and return
				if delivered > 0 {
966
					glog.V(logger.Info).Infof("imported %3d state entries in %9v: processed %d, pending at least %d", delivered, common.PrettyDuration(time.Since(start)), syncStatsStateDone, pending)
967
				}
968
			})
969
		}
970
		expire   = func() map[string]int { return d.queue.ExpireNodeData(d.requestTTL()) }
971 972 973 974 975
		throttle = func() bool { return false }
		reserve  = func(p *peer, count int) (*fetchRequest, bool, error) {
			return d.queue.ReserveNodeData(p, count), false, nil
		}
		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
976
		capacity = func(p *peer) int { return p.NodeDataCapacity(d.requestRTT()) }
977
		setIdle  = func(p *peer, accepted int) { p.SetNodeDataIdle(accepted) }
978
	)
979
	err := d.fetchParts(errCancelStateFetch, d.stateCh, deliver, d.stateWakeCh, expire,
980
		d.queue.PendingNodeData, d.queue.InFlightNodeData, throttle, reserve, nil, fetch,
981
		d.queue.CancelNodeData, capacity, d.peers.NodeDataIdlePeers, setIdle, "State")
982 983 984 985 986

	glog.V(logger.Debug).Infof("Node state data download terminated: %v", err)
	return err
}

987 988 989
// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007
//
// As the scheduling/timeout logic mostly is the same for all downloaded data
// types, this method is used by each for data gathering and is instrumented with
// various callbacks to handle the slight differences between processing them.
//
// The instrumentation parameters:
//  - errCancel:   error type to return if the fetch operation is cancelled (mostly makes logging nicer)
//  - deliveryCh:  channel from which to retrieve downloaded data packets (merged from all concurrent peers)
//  - deliver:     processing callback to deliver data packets into type specific download queues (usually within `queue`)
//  - wakeCh:      notification channel for waking the fetcher when new tasks are available (or sync completed)
//  - expire:      task callback method to abort requests that took too long and return the faulty peers (traffic shaping)
//  - pending:     task callback for the number of requests still needing download (detect completion/non-completability)
//  - inFlight:    task callback for the number of in-progress requests (wait for all active downloads to finish)
//  - throttle:    task callback to check if the processing queue is full and activate throttling (bound memory use)
//  - reserve:     task callback to reserve new download tasks to a particular peer (also signals partial completions)
//  - fetchHook:   tester callback to notify of new tasks being initiated (allows testing the scheduling logic)
//  - fetch:       network callback to actually send a particular download request to a physical remote peer
//  - cancel:      task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer)
P
Péter Szilágyi 已提交
1008
//  - capacity:    network callback to retrieve the estimated type-specific bandwidth capacity of a peer (traffic shaping)
1009 1010 1011
//  - idle:        network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
//  - setIdle:     network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
//  - kind:        textual label of the type being downloaded to display in log mesages
1012 1013
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
	expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error),
1014
	fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int,
1015
	idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error {
1016

1017
	// Create a ticker to detect expired retrieval tasks
1018 1019 1020 1021 1022
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	update := make(chan struct{}, 1)

1023
	// Prepare the queue and fetch block parts until the block header fetcher's done
1024 1025 1026 1027
	finished := false
	for {
		select {
		case <-d.cancelCh:
1028
			return errCancel
1029

1030
		case packet := <-deliveryCh:
1031 1032
			// If the peer was previously banned and failed to deliver it's pack
			// in a reasonable time frame, ignore it's message.
1033
			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
1034 1035 1036
				// Deliver the received chunk of data and check chain validity
				accepted, err := deliver(packet)
				if err == errInvalidChain {
1037
					return err
1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050
				}
				// Unless a peer delivered something completely else than requested (usually
				// caused by a timed out request which came through in the end), set it to
				// idle. If the delivery's stale, the peer should have already been idled.
				if err != errStaleDelivery {
					setIdle(peer, accepted)
				}
				// Issue a log to the user to see what's going on
				switch {
				case err == nil && packet.Items() == 0:
					glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
				case err == nil:
					glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
1051
				default:
1052
					glog.V(logger.Detail).Infof("%s: %s delivery failed: %v", peer, strings.ToLower(kind), err)
1053 1054 1055 1056 1057 1058 1059 1060
				}
			}
			// Blocks assembled, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

1061
		case cont := <-wakeCh:
1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083
			// The header fetcher sent a continuation flag, check if it's done
			if !cont {
				finished = true
			}
			// Headers arrive, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-ticker.C:
			// Sanity check update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-update:
			// Short circuit if we lost all our peers
			if d.peers.Len() == 0 {
				return errNoPeers
			}
1084
			// Check for fetch request timeouts and demote the responsible peers
1085
			for pid, fails := range expire() {
1086
				if peer := d.peers.Peer(pid); peer != nil {
1087 1088 1089 1090 1091 1092 1093 1094
					// If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps
					// ourselves. Only reset to minimal throughput but don't drop just yet. If even the minimal times
					// out that sync wise we need to get rid of the peer.
					//
					// The reason the minimum threshold is 2 is because the downloader tries to estimate the bandwidth
					// and latency of a peer separately, which requires pushing the measures capacity a bit and seeing
					// how response times reacts, to it always requests one more than the minimum (i.e. min 2).
					if fails > 2 {
1095 1096 1097 1098 1099 1100
						glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
						setIdle(peer, 0)
					} else {
						glog.V(logger.Debug).Infof("%s: stalling %s delivery, dropping", peer, strings.ToLower(kind))
						d.dropPeer(pid)
					}
1101 1102
				}
			}
1103 1104
			// If there's nothing more to fetch, wait or terminate
			if pending() == 0 {
1105
				if !inFlight() && finished {
1106
					glog.V(logger.Debug).Infof("%s fetching completed", kind)
1107 1108 1109 1110 1111
					return nil
				}
				break
			}
			// Send a download request to all idle peers, until throttled
1112
			progressed, throttled, running := false, false, inFlight()
1113 1114 1115
			idles, total := idle()

			for _, peer := range idles {
1116
				// Short circuit if throttling activated
1117
				if throttle() {
1118 1119 1120
					throttled = true
					break
				}
1121 1122
				// Reserve a chunk of fetches for a peer. A nil can mean either that
				// no more headers are available, or that the peer is known not to
1123
				// have them.
1124
				request, progress, err := reserve(peer, capacity(peer))
1125 1126 1127
				if err != nil {
					return err
				}
1128 1129
				if progress {
					progressed = true
1130 1131 1132 1133 1134
				}
				if request == nil {
					continue
				}
				if glog.V(logger.Detail) {
1135 1136 1137
					if request.From > 0 {
						glog.Infof("%s: requesting %s(s) from #%d", peer, strings.ToLower(kind), request.From)
					} else if len(request.Headers) > 0 {
1138 1139 1140 1141
						glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
					} else {
						glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind))
					}
1142 1143
				}
				// Fetch the chunk and make sure any errors return the hashes to the queue
1144 1145
				if fetchHook != nil {
					fetchHook(request.Headers)
1146
				}
1147
				if err := fetch(peer, request); err != nil {
1148 1149 1150 1151 1152
					// Although we could try and make an attempt to fix this, this error really
					// means that we've double allocated a fetch task to a peer. If that is the
					// case, the internal state of the downloader and the queue is very wrong so
					// better hard crash and note the error instead of silently accumulating into
					// a much bigger issue.
F
Felix Lange 已提交
1153
					panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, strings.ToLower(kind)))
1154
				}
1155
				running = true
1156 1157 1158
			}
			// Make sure that we have peers available for fetching. If all peers have been tried
			// and all failed throw an error
1159
			if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
1160 1161 1162 1163 1164 1165
				return errPeersUnavailable
			}
		}
	}
}

1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181
// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
	// Calculate the pivoting point for switching from fast to slow sync
	pivot := d.queue.FastSyncPivot()

	// Keep a count of uncertain headers to roll back
	rollback := []*types.Header{}
	defer func() {
		if len(rollback) > 0 {
			// Flatten the headers and roll them back
			hashes := make([]common.Hash, len(rollback))
			for i, header := range rollback {
				hashes[i] = header.Hash()
			}
1182 1183 1184 1185 1186 1187 1188
			lastHeader, lastFastBlock, lastBlock := d.headHeader().Number, common.Big0, common.Big0
			if d.headFastBlock != nil {
				lastFastBlock = d.headFastBlock().Number()
			}
			if d.headBlock != nil {
				lastBlock = d.headBlock().Number()
			}
1189
			d.rollback(hashes)
1190 1191 1192 1193 1194 1195 1196
			curFastBlock, curBlock := common.Big0, common.Big0
			if d.headFastBlock != nil {
				curFastBlock = d.headFastBlock().Number()
			}
			if d.headBlock != nil {
				curBlock = d.headBlock().Number()
			}
1197
			glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)",
1198
				len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, curFastBlock, lastBlock, curBlock)
1199

1200
			// If we're already past the pivot point, this could be an attack, thread carefully
1201
			if rollback[len(rollback)-1].Number.Uint64() > pivot {
1202
				// If we didn't ever fail, lock in te pivot header (must! not! change!)
1203
				if atomic.LoadUint32(&d.fsPivotFails) == 0 {
1204 1205 1206 1207 1208 1209 1210
					for _, header := range rollback {
						if header.Number.Uint64() == pivot {
							glog.V(logger.Warn).Infof("Fast-sync critical section failure, locked pivot to header #%d [%x…]", pivot, header.Hash().Bytes()[:4])
							d.fsPivotLock = header
						}
					}
				}
1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244
			}
		}
	}()

	// Wait for batches of headers to process
	gotHeaders := false

	for {
		select {
		case <-d.cancelCh:
			return errCancelHeaderProcessing

		case headers := <-d.headerProcCh:
			// Terminate header processing if we synced up
			if len(headers) == 0 {
				// Notify everyone that headers are fully processed
				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
					select {
					case ch <- false:
					case <-d.cancelCh:
					}
				}
				// If no headers were retrieved at all, the peer violated it's TD promise that it had a
				// better chain compared to ours. The only exception is if it's promised blocks were
				// already imported by other means (e.g. fecher):
				//
				// R <remote peer>, L <local node>: Both at block 10
				// R: Mine block 11, and propagate it to L
				// L: Queue block 11 for import
				// L: Notice that R's head and TD increased compared to ours, start sync
				// L: Import of block 11 finishes
				// L: Sync begins, and finds common ancestor at 11
				// L: Request new headers up from 11 (R's TD was higher, it must have something)
				// R: Nothing to give
1245 1246 1247 1248
				if d.mode != LightSync {
					if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
						return errStallingPeer
					}
1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310
				}
				// If fast or light syncing, ensure promised headers are indeed delivered. This is
				// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
				// of delivering the post-pivot blocks that would flag the invalid content.
				//
				// This check cannot be executed "as is" for full imports, since blocks may still be
				// queued for processing when the header download completes. However, as long as the
				// peer gave us something useful, we're already happy/progressed (above check).
				if d.mode == FastSync || d.mode == LightSync {
					if td.Cmp(d.getTd(d.headHeader().Hash())) > 0 {
						return errStallingPeer
					}
				}
				// Disable any rollback and return
				rollback = nil
				return nil
			}
			// Otherwise split the chunk of headers into batches and process them
			gotHeaders = true

			for len(headers) > 0 {
				// Terminate if something failed in between processing chunks
				select {
				case <-d.cancelCh:
					return errCancelHeaderProcessing
				default:
				}
				// Select the next chunk of headers to import
				limit := maxHeadersProcess
				if limit > len(headers) {
					limit = len(headers)
				}
				chunk := headers[:limit]

				// In case of header only syncing, validate the chunk immediately
				if d.mode == FastSync || d.mode == LightSync {
					// Collect the yet unknown headers to mark them as uncertain
					unknown := make([]*types.Header, 0, len(headers))
					for _, header := range chunk {
						if !d.hasHeader(header.Hash()) {
							unknown = append(unknown, header)
						}
					}
					// If we're importing pure headers, verify based on their recentness
					frequency := fsHeaderCheckFrequency
					if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
						frequency = 1
					}
					if n, err := d.insertHeaders(chunk, frequency); err != nil {
						// If some headers were inserted, add them too to the rollback list
						if n > 0 {
							rollback = append(rollback, chunk[:n]...)
						}
						glog.V(logger.Debug).Infof("invalid header #%d [%x…]: %v", chunk[n].Number, chunk[n].Hash().Bytes()[:4], err)
						return errInvalidChain
					}
					// All verifications passed, store newly found uncertain headers
					rollback = append(rollback, unknown...)
					if len(rollback) > fsHeaderSafetyNet {
						rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
					}
				}
1311 1312 1313 1314 1315 1316 1317
				// If we're fast syncing and just pulled in the pivot, make sure it's the one locked in
				if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot {
					if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() {
						glog.V(logger.Warn).Infof("Pivot doesn't match locked in version: have #%v [%x…], want #%v [%x…]", pivot.Number, pivot.Hash().Bytes()[:4], d.fsPivotLock.Number, d.fsPivotLock.Hash().Bytes()[:4])
						return errInvalidChain
					}
				}
1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351
				// Unless we're doing light chains, schedule the headers for associated content retrieval
				if d.mode == FullSync || d.mode == FastSync {
					// If we've reached the allowed number of pending headers, stall a bit
					for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
						select {
						case <-d.cancelCh:
							return errCancelHeaderProcessing
						case <-time.After(time.Second):
						}
					}
					// Otherwise insert the headers for content retrieval
					inserts := d.queue.Schedule(chunk, origin)
					if len(inserts) != len(chunk) {
						glog.V(logger.Debug).Infof("stale headers")
						return errBadPeer
					}
				}
				headers = headers[limit:]
				origin += uint64(limit)
			}
			// Signal the content downloaders of the availablility of new tasks
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
				select {
				case ch <- true:
				default:
				}
			}
		}
	}
}

// processContent takes fetch results from the queue and tries to import them
// into the chain. The type of import operation will depend on the result contents.
func (d *Downloader) processContent() error {
1352
	pivot := d.queue.FastSyncPivot()
1353
	for {
1354
		results := d.queue.WaitResults()
1355
		if len(results) == 0 {
1356
			return nil // queue empty
1357
		}
1358
		if d.chainInsertHook != nil {
1359
			d.chainInsertHook(results)
1360
		}
1361
		// Actually import the blocks
1362 1363
		if glog.V(logger.Debug) {
			first, last := results[0].Header, results[len(results)-1].Header
1364
			glog.Infof("Inserting chain with %d items (#%d [%x…] - #%d [%x…])", len(results), first.Number, first.Hash().Bytes()[:4], last.Number, last.Hash().Bytes()[:4])
1365 1366
		}
		for len(results) != 0 {
1367
			// Check for any termination requests
1368 1369
			select {
			case <-d.quitCh:
1370
				return errCancelContentProcessing
1371
			default:
1372
			}
1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384
			// Retrieve the a batch of results to import
			var (
				blocks   = make([]*types.Block, 0, maxResultsProcess)
				receipts = make([]types.Receipts, 0, maxResultsProcess)
			)
			items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
			for _, result := range results[:items] {
				switch {
				case d.mode == FullSync:
					blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
				case d.mode == FastSync:
					blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
1385
					if result.Header.Number.Uint64() <= pivot {
1386 1387
						receipts = append(receipts, result.Receipts)
					}
1388 1389 1390 1391 1392 1393 1394 1395
				}
			}
			// Try to process the results, aborting if there's an error
			var (
				err   error
				index int
			)
			switch {
1396 1397
			case len(receipts) > 0:
				index, err = d.insertReceipts(blocks, receipts)
1398 1399
				if err == nil && blocks[len(blocks)-1].NumberU64() == pivot {
					glog.V(logger.Debug).Infof("Committing block #%d [%x…] as the new head", blocks[len(blocks)-1].Number(), blocks[len(blocks)-1].Hash().Bytes()[:4])
1400
					index, err = len(blocks)-1, d.commitHeadBlock(blocks[len(blocks)-1].Hash())
1401 1402 1403
				}
			default:
				index, err = d.insertBlocks(blocks)
1404 1405
			}
			if err != nil {
1406
				glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
1407
				return errInvalidChain
1408
			}
1409 1410
			// Shift the results to the next batch
			results = results[items:]
1411 1412 1413 1414
		}
	}
}

L
Leif Jurvetson 已提交
1415
// DeliverHeaders injects a new batch of block headers received from a remote
1416
// node into the download schedule.
1417
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
1418
	return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
1419 1420 1421
}

// DeliverBodies injects a new batch of block bodies received from a remote node.
1422
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
1423
	return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
1424 1425 1426 1427
}

// DeliverReceipts injects a new batch of receipts received from a remote node.
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) {
1428 1429 1430 1431 1432 1433 1434 1435 1436 1437
	return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
}

// DeliverNodeData wraps a batch of node state data received from a remote node
// into a state packet and queues it on the downloader's state channel. The
// returned error is whatever the shared deliver routine reports.
func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) {
	packet := &statePack{id, data}
	return d.deliver(id, d.stateCh, packet, stateInMeter, stateDropMeter)
}

// deliver injects a new batch of data received from a remote node.
func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
1438
	// Update the delivery metrics for both good and failed deliveries
1439
	inMeter.Mark(int64(packet.Items()))
1440 1441
	defer func() {
		if err != nil {
1442
			dropMeter.Mark(int64(packet.Items()))
1443 1444 1445 1446 1447 1448
		}
	}()
	// Deliver or abort if the sync is canceled while queuing
	d.cancelLock.RLock()
	cancel := d.cancelCh
	d.cancelLock.RUnlock()
1449 1450 1451
	if cancel == nil {
		return errNoSyncActive
	}
1452
	select {
1453
	case destCh <- packet:
1454 1455 1456 1457
		return nil
	case <-cancel:
		return errNoSyncActive
	}
1458
}
1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529

// qosTuner is the quality of service tuning loop. On every iteration it blends
// the peers' current median RTT into the stored RTT estimate, moves the stored
// confidence halfway towards its maximum (1000000, i.e. 1.0), logs the new
// values and then waits for one estimated RTT before repeating. The loop ends
// as soon as a receive on d.quitCh succeeds.
func (d *Downloader) qosTuner() {
	for {
		// Retrieve the current median RTT and integrate it into the previous target RTT
		var (
			previous = float64(atomic.LoadUint64(&d.rttEstimate))
			median   = float64(d.peers.medianRTT())
		)
		rtt := time.Duration(float64(1-qosTuningImpact)*previous + qosTuningImpact*median)
		atomic.StoreUint64(&d.rttEstimate, uint64(rtt))

		// Another RTT cycle has passed, so raise the confidence in the estimate
		conf := atomic.LoadUint64(&d.rttConfidence)
		conf += (1000000 - conf) / 2
		atomic.StoreUint64(&d.rttConfidence, conf)

		// Report the new QoS values, then wait out the next RTT (or quit)
		glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())

		select {
		case <-time.After(rtt):
		case <-d.quitCh:
			return
		}
	}
}

// qosReduceConfidence is meant to be called when a new peer joins the downloader's
// peer set, needing to reduce the confidence we have in our QoS estimates.
//
// The confidence is stored as a fixed point integer where 1000000 represents
// 1.0. Depending on the current number of peers the method:
//   - does nothing if the peer set is empty;
//   - resets the confidence to 1.0 if there is exactly one peer;
//   - does nothing if there are at least qosConfidenceCap peers;
//   - otherwise scales the confidence by (peers-1)/peers, never letting it
//     drop below rttMinConfidence, and logs the resulting QoS values.
func (d *Downloader) qosReduceConfidence() {
	peers := uint64(d.peers.Len())

	// The peer may already have been dropped again by the time we get here.
	// With zero peers the scaling below would wrap around on peers-1 and
	// panic on the integer division by zero, so bail out early.
	if peers == 0 {
		return
	}
	// If we have a single peer, confidence is always 1
	if peers == 1 {
		atomic.StoreUint64(&d.rttConfidence, 1000000)
		return
	}
	// If we have a ton of peers, don't drop confidence
	if peers >= uint64(qosConfidenceCap) {
		return
	}
	// Otherwise drop the confidence factor, clamping it to the allowed minimum
	conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
	if float64(conf)/1000000 < rttMinConfidence {
		conf = uint64(rttMinConfidence * 1000000)
	}
	atomic.StoreUint64(&d.rttConfidence, conf)

	rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
	glog.V(logger.Debug).Infof("Quality of service: rtt %v, conf %.3f, ttl %v", rtt, float64(conf)/1000000.0, d.requestTTL())
}

// requestRTT returns the current target round trip time for a download request
// to complete in.
//
// Note, the returned value is only 9/10 of the stored RTT estimate. The
// downloader tries to adapt its queries to the RTT, so several RTT values can
// be adapted to, but smaller ones are preferred (stabler download stream).
func (d *Downloader) requestRTT() time.Duration {
	estimate := time.Duration(atomic.LoadUint64(&d.rttEstimate))
	return estimate * 9 / 10
}

// requestTTL returns the current timeout allowance for a single download request
// to finish under. It is the stored RTT estimate divided by the confidence
// (stored as a fixed point integer, 1000000 meaning 1.0) and multiplied by
// ttlScaling, capped at ttlLimit.
func (d *Downloader) requestTTL() time.Duration {
	estimate := time.Duration(atomic.LoadUint64(&d.rttEstimate))
	confidence := float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0

	// A lower confidence widens the allowance, up to the hard limit
	ttl := time.Duration(ttlScaling) * time.Duration(float64(estimate)/confidence)
	if ttl <= ttlLimit {
		return ttl
	}
	return ttlLimit
}