// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package downloader contains the manual full chain synchronisation.
18 19 20
package downloader

import (
21
	"errors"
22
	"fmt"
23
	"math/big"
24 25 26 27
	"sync"
	"sync/atomic"
	"time"

28
	"github.com/ethereum/go-ethereum"
29
	"github.com/ethereum/go-ethereum/common"
30
	"github.com/ethereum/go-ethereum/core/rawdb"
31
	"github.com/ethereum/go-ethereum/core/types"
32
	"github.com/ethereum/go-ethereum/eth/protocols/snap"
33
	"github.com/ethereum/go-ethereum/ethdb"
34
	"github.com/ethereum/go-ethereum/event"
35
	"github.com/ethereum/go-ethereum/log"
36
	"github.com/ethereum/go-ethereum/metrics"
37
	"github.com/ethereum/go-ethereum/params"
38
	"github.com/ethereum/go-ethereum/trie"
39 40
)

41
var (
42 43
	MaxBlockFetch   = 128 // Amount of blocks to be fetched per retrieval request
	MaxHeaderFetch  = 192 // Amount of block headers to be fetched per retrieval request
44
	MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly
45
	MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
46
	MaxStateFetch   = 384 // Amount of node state values to allow fetching per request
47

48 49 50 51 52
	rttMinEstimate   = 2 * time.Second  // Minimum round-trip time to target for download requests
	rttMaxEstimate   = 20 * time.Second // Maximum round-trip time to target for download requests
	rttMinConfidence = 0.1              // Worse confidence factor in our estimated RTT value
	ttlScaling       = 3                // Constant scaling factor for RTT -> TTL conversion
	ttlLimit         = time.Minute      // Maximum TTL allowance to prevent reaching crazy timeouts
53 54 55 56

	qosTuningPeers   = 5    // Number of peers to tune based on (best peers)
	qosConfidenceCap = 10   // Number of peers above which not to modify RTT confidence
	qosTuningImpact  = 0.25 // Impact that a new tuning target has on the previous value
57

58 59 60 61 62
	maxQueuedHeaders            = 32 * 1024                         // [eth/62] Maximum number of headers to queue for import (DOS protection)
	maxHeadersProcess           = 2048                              // Number of header download results to import at once into the chain
	maxResultsProcess           = 2048                              // Number of content download results to import at once into the chain
	fullMaxForkAncestry  uint64 = params.FullImmutabilityThreshold  // Maximum chain reorganisation (locally redeclared so tests can reduce it)
	lightMaxForkAncestry uint64 = params.LightImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it)
63

64 65 66
	reorgProtThreshold   = 48 // Threshold number of recent blocks to disable mini reorg protection
	reorgProtHeaderDelay = 2  // Number of headers to delay delivering to cover mini reorgs

67 68 69 70 71
	fsHeaderCheckFrequency = 100             // Verification frequency of the downloaded headers during fast sync
	fsHeaderSafetyNet      = 2048            // Number of headers to discard in case a chain violation is detected
	fsHeaderForceVerify    = 24              // Number of headers to verify before and after the pivot to accept it
	fsHeaderContCheck      = 3 * time.Second // Time interval to check for header continuations during state download
	fsMinFullBlocks        = 64              // Number of blocks to retrieve fully even in fast sync
72
)
73

74
// Sentinel errors returned by the downloader. Callers are expected to match
// them with errors.Is, since several are wrapped with extra context via %w.
var (
	errBusy                    = errors.New("busy")
	errUnknownPeer             = errors.New("peer is unknown or unhealthy")
	errBadPeer                 = errors.New("action from bad peer ignored")
	errStallingPeer            = errors.New("peer is stalling")
	errUnsyncedPeer            = errors.New("unsynced peer")
	errNoPeers                 = errors.New("no peers to keep download active")
	errTimeout                 = errors.New("timeout")
	errEmptyHeaderSet          = errors.New("empty header set by peer")
	errPeersUnavailable        = errors.New("no peers available or all tried for download")
	errInvalidAncestor         = errors.New("retrieved ancestor is invalid")
	errInvalidChain            = errors.New("retrieved hash chain is invalid")
	errInvalidBody             = errors.New("retrieved block body is invalid")
	errInvalidReceipt          = errors.New("retrieved receipt is invalid")
	errCancelStateFetch        = errors.New("state data download canceled (requested)")
	errCancelContentProcessing = errors.New("content processing canceled (requested)")
	errCanceled                = errors.New("syncing canceled (requested)")
	errNoSyncActive            = errors.New("no sync active")
	errTooOld                  = errors.New("peer's protocol version too old")
	errNoAncestorFound         = errors.New("no common ancestor found")
)

96
type Downloader struct {
97 98 99 100 101 102 103
	// WARNING: The `rttEstimate` and `rttConfidence` fields are accessed atomically.
	// On 32 bit platforms, only 64-bit aligned fields can be atomic. The struct is
	// guaranteed to be so aligned, so take advantage of that. For more information,
	// see https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
	rttEstimate   uint64 // Round trip time to target for download requests
	rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)

104
	mode uint32         // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
105
	mux  *event.TypeMux // Event multiplexer to announce sync operation events
106

107 108 109 110
	checkpoint uint64   // Checkpoint block number to enforce head against (e.g. fast sync)
	genesis    uint64   // Genesis block number to limit sync to (e.g. light client CHT)
	queue      *queue   // Scheduler for selecting the hashes to download
	peers      *peerSet // Set of active peers from which download can proceed
111 112

	stateDB    ethdb.Database  // Database to state sync into (and deduplicate via)
113
	stateBloom *trie.SyncBloom // Bloom filter for fast trie node and contract code existence checks
114

115
	// Statistics
116 117 118
	syncStatsChainOrigin uint64 // Origin block number where syncing started at
	syncStatsChainHeight uint64 // Highest block number known when syncing started
	syncStatsState       stateSyncStats
119
	syncStatsLock        sync.RWMutex // Lock protecting the sync stats fields
120

121
	lightchain LightChain
122
	blockchain BlockChain
123

124
	// Callbacks
125
	dropPeer peerDropFn // Drops a peer for misbehaving
126

127
	// Status
128 129 130
	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
	synchronising   int32
	notified        int32
131
	committed       int32
132
	ancientLimit    uint64 // The maximum block number which can be regarded as ancient data.
133 134

	// Channels
135 136 137 138 139 140
	headerCh      chan dataPack        // Channel receiving inbound block headers
	bodyCh        chan dataPack        // Channel receiving inbound block bodies
	receiptCh     chan dataPack        // Channel receiving inbound receipts
	bodyWakeCh    chan bool            // Channel to signal the block body fetcher of new tasks
	receiptWakeCh chan bool            // Channel to signal the receipt fetcher of new tasks
	headerProcCh  chan []*types.Header // Channel to feed the header processor new tasks
141

142 143 144 145
	// State sync
	pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root
	pivotLock   sync.RWMutex  // Lock protecting pivot header reads from updates

146 147
	snapSync       bool         // Whether to run state sync over the snap protocol
	SnapSyncer     *snap.Syncer // TODO(karalabe): make private! hack for now
148 149
	stateSyncStart chan *stateSync
	trackStateReq  chan *stateReq
150
	stateCh        chan dataPack // Channel receiving inbound node state data
151

152
	// Cancellation and termination
153 154 155 156
	cancelPeer string         // Identifier of the peer currently being used as the master (cancel on drop)
	cancelCh   chan struct{}  // Channel to cancel mid-flight syncs
	cancelLock sync.RWMutex   // Lock to protect the cancel channel and peer in delivers
	cancelWg   sync.WaitGroup // Make sure all fetcher goroutines have exited.
157

158
	quitCh   chan struct{} // Quit channel to signal termination
159
	quitLock sync.Mutex    // Lock to prevent double closes
160

161
	// Testing hooks
162 163 164 165
	syncInitHook     func(uint64, uint64)  // Method to call upon initiating a new sync run
	bodyFetchHook    func([]*types.Header) // Method to call upon starting a block body fetch
	receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
	chainInsertHook  func([]*fetchResult)  // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
166 167
}

N
Nick Johnson 已提交
168
// LightChain encapsulates functions required to synchronise a light chain.
169 170
type LightChain interface {
	// HasHeader verifies a header's presence in the local chain.
171
	HasHeader(common.Hash, uint64) bool
172 173 174 175 176 177 178

	// GetHeaderByHash retrieves a header from the local chain.
	GetHeaderByHash(common.Hash) *types.Header

	// CurrentHeader retrieves the head header from the local chain.
	CurrentHeader() *types.Header

179 180
	// GetTd returns the total difficulty of a local block.
	GetTd(common.Hash, uint64) *big.Int
181 182 183 184

	// InsertHeaderChain inserts a batch of headers into the local chain.
	InsertHeaderChain([]*types.Header, int) (int, error)

185 186
	// SetHead rewinds the local chain to a new head.
	SetHead(uint64) error
187 188
}

N
Nick Johnson 已提交
189
// BlockChain encapsulates functions required to sync a (full or fast) blockchain.
190 191 192
type BlockChain interface {
	LightChain

193 194
	// HasBlock verifies a block's presence in the local chain.
	HasBlock(common.Hash, uint64) bool
195

196 197 198
	// HasFastBlock verifies a fast block's presence in the local chain.
	HasFastBlock(common.Hash, uint64) bool

199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
	// GetBlockByHash retrieves a block from the local chain.
	GetBlockByHash(common.Hash) *types.Block

	// CurrentBlock retrieves the head block from the local chain.
	CurrentBlock() *types.Block

	// CurrentFastBlock retrieves the head fast block from the local chain.
	CurrentFastBlock() *types.Block

	// FastSyncCommitHead directly commits the head block to a certain entity.
	FastSyncCommitHead(common.Hash) error

	// InsertChain inserts a batch of blocks into the local chain.
	InsertChain(types.Blocks) (int, error)

	// InsertReceiptChain inserts a batch of receipts into the local chain.
215
	InsertReceiptChain(types.Blocks, []types.Receipts, uint64) (int, error)
216 217
}

218
// New creates a new downloader to fetch hashes and blocks from remote peers.
219
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
220 221 222
	if lightchain == nil {
		lightchain = chain
	}
223
	dl := &Downloader{
224
		stateDB:        stateDb,
225
		stateBloom:     stateBloom,
226
		mux:            mux,
227
		checkpoint:     checkpoint,
228
		queue:          newQueue(blockCacheMaxItems, blockCacheInitialItems),
229 230 231
		peers:          newPeerSet(),
		rttEstimate:    uint64(rttMaxEstimate),
		rttConfidence:  uint64(1000000),
232
		blockchain:     chain,
233 234 235 236 237 238 239 240 241 242
		lightchain:     lightchain,
		dropPeer:       dropPeer,
		headerCh:       make(chan dataPack, 1),
		bodyCh:         make(chan dataPack, 1),
		receiptCh:      make(chan dataPack, 1),
		bodyWakeCh:     make(chan bool, 1),
		receiptWakeCh:  make(chan bool, 1),
		headerProcCh:   make(chan []*types.Header, 1),
		quitCh:         make(chan struct{}),
		stateCh:        make(chan dataPack),
243
		SnapSyncer:     snap.NewSyncer(stateDb, stateBloom),
244
		stateSyncStart: make(chan *stateSync),
245
		syncStatsState: stateSyncStats{
246
			processed: rawdb.ReadFastTrieProgress(stateDb),
247 248
		},
		trackStateReq: make(chan *stateReq),
249
	}
250
	go dl.qosTuner()
251
	go dl.stateFetcher()
252
	return dl
253 254
}

255 256 257
// Progress retrieves the synchronisation boundaries, specifically the origin
// block where synchronisation started at (may have failed/suspended); the block
// or header sync is currently at; and the latest known block which the sync targets.
258
//
L
Leif Jurvetson 已提交
259
// In addition, during the state download phase of fast synchronisation the number
260 261
// of processed and the total number of known states are also returned. Otherwise
// these are zero.
262
func (d *Downloader) Progress() ethereum.SyncProgress {
263
	// Lock the current stats and return the progress
264 265
	d.syncStatsLock.RLock()
	defer d.syncStatsLock.RUnlock()
266

267
	current := uint64(0)
268
	mode := d.getMode()
269
	switch {
270
	case d.blockchain != nil && mode == FullSync:
271
		current = d.blockchain.CurrentBlock().NumberU64()
272
	case d.blockchain != nil && mode == FastSync:
273
		current = d.blockchain.CurrentFastBlock().NumberU64()
274
	case d.lightchain != nil:
275
		current = d.lightchain.CurrentHeader().Number.Uint64()
276
	default:
277
		log.Error("Unknown downloader chain/mode combo", "light", d.lightchain != nil, "full", d.blockchain != nil, "mode", mode)
278
	}
279 280 281 282
	return ethereum.SyncProgress{
		StartingBlock: d.syncStatsChainOrigin,
		CurrentBlock:  current,
		HighestBlock:  d.syncStatsChainHeight,
283 284
		PulledStates:  d.syncStatsState.processed,
		KnownStates:   d.syncStatsState.processed + d.syncStatsState.pending,
285
	}
O
obscuren 已提交
286 287
}

288
// Synchronising returns whether the downloader is currently retrieving blocks.
289
func (d *Downloader) Synchronising() bool {
290
	return atomic.LoadInt32(&d.synchronising) > 0
291 292
}

293 294
// RegisterPeer injects a new download peer into the set of block source to be
// used for fetching hashes and blocks from.
295 296 297 298 299 300
func (d *Downloader) RegisterPeer(id string, version uint, peer Peer) error {
	var logger log.Logger
	if len(id) < 16 {
		// Tests use short IDs, don't choke on them
		logger = log.New("peer", id)
	} else {
301
		logger = log.New("peer", id[:8])
302
	}
303
	logger.Trace("Registering sync peer")
304
	if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil {
305
		logger.Error("Failed to register sync peer", "err", err)
306 307
		return err
	}
308 309
	d.qosReduceConfidence()

310 311 312
	return nil
}

N
Nick Johnson 已提交
313
// RegisterLightPeer injects a light client peer, wrapping it so it appears as a regular peer.
314
func (d *Downloader) RegisterLightPeer(id string, version uint, peer LightPeer) error {
315 316 317
	return d.RegisterPeer(id, version, &lightPeerWrapper{peer})
}

318
// UnregisterPeer remove a peer from the known list, preventing any action from
319 320
// the specified peer. An effort is also made to return any pending fetches into
// the queue.
321
func (d *Downloader) UnregisterPeer(id string) error {
322
	// Unregister the peer from the active peer set and revoke any fetch tasks
323 324 325 326 327
	var logger log.Logger
	if len(id) < 16 {
		// Tests use short IDs, don't choke on them
		logger = log.New("peer", id)
	} else {
328
		logger = log.New("peer", id[:8])
329
	}
330
	logger.Trace("Unregistering sync peer")
331
	if err := d.peers.Unregister(id); err != nil {
332
		logger.Error("Failed to unregister sync peer", "err", err)
333 334
		return err
	}
335
	d.queue.Revoke(id)
336

337
	return nil
338 339
}

340 341
// Synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
342 343
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
	err := d.synchronise(id, head, td, mode)
344

345
	switch err {
346 347 348
	case nil, errBusy, errCanceled:
		return err
	}
349

350 351 352
	if errors.Is(err, errInvalidChain) || errors.Is(err, errBadPeer) || errors.Is(err, errTimeout) ||
		errors.Is(err, errStallingPeer) || errors.Is(err, errUnsyncedPeer) || errors.Is(err, errEmptyHeaderSet) ||
		errors.Is(err, errPeersUnavailable) || errors.Is(err, errTooOld) || errors.Is(err, errInvalidAncestor) {
353 354 355 356 357 358 359 360 361 362
		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
		if d.dropPeer == nil {
			// The dropPeer method is nil when `--copydb` is used for a local copy.
			// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
			log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
		} else {
			d.dropPeer(id)
		}
		return err
	}
363
	log.Warn("Synchronisation failed, retrying", "err", err)
364
	return err
365 366 367
}

// synchronise will select the peer and use it for synchronising. If an empty string is given
368
// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
369
// checks fail an error will be returned. This method is synchronous
370
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
L
Leif Jurvetson 已提交
371
	// Mock out the synchronisation if testing
372 373 374
	if d.synchroniseMock != nil {
		return d.synchroniseMock(id, hash)
	}
375
	// Make sure only one goroutine is ever allowed past this point at once
376
	if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
377
		return errBusy
378
	}
379
	defer atomic.StoreInt32(&d.synchronising, 0)
380

381 382
	// Post a user notification of the sync (only once per session)
	if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
383
		log.Info("Block synchronisation started")
384
	}
385
	// If we are already full syncing, but have a fast-sync bloom filter laying
386
	// around, make sure it doesn't use memory any more. This is a special case
387 388 389 390
	// when the user attempts to fast sync a new empty network.
	if mode == FullSync && d.stateBloom != nil {
		d.stateBloom.Close()
	}
391 392 393 394 395 396 397 398 399 400
	// If snap sync was requested, create the snap scheduler and switch to fast
	// sync mode. Long term we could drop fast sync or merge the two together,
	// but until snap becomes prevalent, we should support both. TODO(karalabe).
	if mode == SnapSync {
		if !d.snapSync {
			log.Warn("Enabling snapshot sync prototype")
			d.snapSync = true
		}
		mode = FastSync
	}
401
	// Reset the queue, peer set and wake channels to clean any internal leftover state
402
	d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems)
403
	d.peers.Reset()
404

405
	for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
406 407 408 409
		select {
		case <-ch:
		default:
		}
410
	}
411
	for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} {
412 413 414 415 416 417 418 419
		for empty := false; !empty; {
			select {
			case <-ch:
			default:
				empty = true
			}
		}
	}
420 421 422 423 424 425 426
	for empty := false; !empty; {
		select {
		case <-d.headerProcCh:
		default:
			empty = true
		}
	}
427
	// Create cancel channel for aborting mid-flight and mark the master peer
428 429
	d.cancelLock.Lock()
	d.cancelCh = make(chan struct{})
430
	d.cancelPeer = id
431 432
	d.cancelLock.Unlock()

433
	defer d.Cancel() // No matter what, we can't leave the cancel channel open
434

435 436
	// Atomically set the requested sync mode
	atomic.StoreUint32(&d.mode, uint32(mode))
437

438
	// Retrieve the origin peer and initiate the downloading process
439
	p := d.peers.Peer(id)
440
	if p == nil {
441
		return errUnknownPeer
442
	}
443
	return d.syncWithPeer(p, hash, td)
444 445
}

446 447 448 449
// getMode atomically loads the sync mode stored for the current sync cycle.
func (d *Downloader) getMode() SyncMode {
	raw := atomic.LoadUint32(&d.mode)
	return SyncMode(raw)
}

450 451
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
452
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
O
obscuren 已提交
453
	d.mux.Post(StartEvent{})
454 455 456
	defer func() {
		// reset on error
		if err != nil {
457 458
			d.mux.Post(FailedEvent{err})
		} else {
459 460
			latest := d.lightchain.CurrentHeader()
			d.mux.Post(DoneEvent{latest})
461 462
		}
	}()
463
	if p.version < 64 {
464
		return fmt.Errorf("%w: advertized %d < required %d", errTooOld, p.version, 64)
465
	}
466
	mode := d.getMode()
467

468
	log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode)
469
	defer func(start time.Time) {
470
		log.Debug("Synchronisation terminated", "elapsed", common.PrettyDuration(time.Since(start)))
471
	}(time.Now())
472

473
	// Look up the sync boundaries: the common ancestor and the target block
474
	latest, pivot, err := d.fetchHead(p)
475 476 477
	if err != nil {
		return err
	}
478 479 480 481 482 483 484
	if mode == FastSync && pivot == nil {
		// If no pivot block was returned, the head is below the min full block
		// threshold (i.e. new chian). In that case we won't really fast sync
		// anyway, but still need a valid pivot block to avoid some code hitting
		// nil panics on an access.
		pivot = d.blockchain.CurrentBlock().Header()
	}
485
	height := latest.Number.Uint64()
486

487
	origin, err := d.findAncestor(p, latest)
488 489 490 491 492 493 494 495 496
	if err != nil {
		return err
	}
	d.syncStatsLock.Lock()
	if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
		d.syncStatsChainOrigin = origin
	}
	d.syncStatsChainHeight = height
	d.syncStatsLock.Unlock()
497

498
	// Ensure our origin point is below any fast sync pivot point
499
	if mode == FastSync {
500 501
		if height <= uint64(fsMinFullBlocks) {
			origin = 0
502
		} else {
503 504 505
			pivotNumber := pivot.Number.Uint64()
			if pivotNumber <= origin {
				origin = pivotNumber - 1
506
			}
507 508
			// Write out the pivot into the database so a rollback beyond it will
			// reenable fast sync
509
			rawdb.WriteLastPivotNumber(d.stateDB, pivotNumber)
510
		}
511
	}
512
	d.committed = 1
513
	if mode == FastSync && pivot.Number.Uint64() != 0 {
514 515
		d.committed = 0
	}
516
	if mode == FastSync {
517
		// Set the ancient data limitation.
518 519 520
		// If we are running fast sync, all block data older than ancientLimit will be
		// written to the ancient store. More recent data will be written to the active
		// database and will wait for the freezer to migrate.
521
		//
522 523 524
		// If there is a checkpoint available, then calculate the ancientLimit through
		// that. Otherwise calculate the ancient limit through the advertised height
		// of the remote peer.
525
		//
526 527 528 529 530
		// The reason for picking checkpoint first is that a malicious peer can give us
		// a fake (very high) height, forcing the ancient limit to also be very high.
		// The peer would start to feed us valid blocks until head, resulting in all of
		// the blocks might be written into the ancient store. A following mini-reorg
		// could cause issues.
531
		if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 {
532
			d.ancientLimit = d.checkpoint
533 534
		} else if height > fullMaxForkAncestry+1 {
			d.ancientLimit = height - fullMaxForkAncestry - 1
535 536
		} else {
			d.ancientLimit = 0
537 538
		}
		frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.
539

540 541 542 543 544 545 546 547 548 549
		// If a part of blockchain data has already been written into active store,
		// disable the ancient style insertion explicitly.
		if origin >= frozen && frozen != 0 {
			d.ancientLimit = 0
			log.Info("Disabling direct-ancient mode", "origin", origin, "ancient", frozen-1)
		} else if d.ancientLimit > 0 {
			log.Debug("Enabling direct-ancient mode", "ancient", d.ancientLimit)
		}
		// Rewind the ancient store and blockchain if reorg happens.
		if origin+1 < frozen {
550 551
			if err := d.lightchain.SetHead(origin + 1); err != nil {
				return err
552 553 554
			}
		}
	}
555
	// Initiate the sync using a concurrent header and content retrieval algorithm
556
	d.queue.Prepare(origin+1, mode)
557 558
	if d.syncInitHook != nil {
		d.syncInitHook(origin, height)
559
	}
560
	fetchers := []func() error{
561 562 563 564
		func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
		func() error { return d.fetchBodies(origin + 1) },   // Bodies are retrieved during normal and fast sync
		func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
		func() error { return d.processHeaders(origin+1, td) },
565
	}
566
	if mode == FastSync {
567 568 569 570 571
		d.pivotLock.Lock()
		d.pivotHeader = pivot
		d.pivotLock.Unlock()

		fetchers = append(fetchers, func() error { return d.processFastSyncContent() })
572
	} else if mode == FullSync {
573 574
		fetchers = append(fetchers, d.processFullSyncContent)
	}
575
	return d.spawnSync(fetchers)
576 577 578 579
}

// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
580 581
func (d *Downloader) spawnSync(fetchers []func() error) error {
	errc := make(chan error, len(fetchers))
582
	d.cancelWg.Add(len(fetchers))
583 584
	for _, fn := range fetchers {
		fn := fn
585
		go func() { defer d.cancelWg.Done(); errc <- fn() }()
586 587 588
	}
	// Wait for the first error, then terminate the others.
	var err error
589 590
	for i := 0; i < len(fetchers); i++ {
		if i == len(fetchers)-1 {
591 592 593 594 595
			// Close the queue when all fetchers have exited.
			// This will cause the block processor to end when
			// it has processed the queue.
			d.queue.Close()
		}
596
		if err = <-errc; err != nil && err != errCanceled {
597 598 599 600
			break
		}
	}
	d.queue.Close()
601
	d.Cancel()
602
	return err
603 604
}

605 606 607 608
// cancel aborts all of the operations and resets the queue. However, cancel does
// not wait for the running download goroutines to finish. This method should be
// used when cancelling the downloads from inside the downloader.
func (d *Downloader) cancel() {
609
	// Close the current cancel channel
610
	d.cancelLock.Lock()
611 612
	defer d.cancelLock.Unlock()

613 614 615 616 617 618 619
	if d.cancelCh != nil {
		select {
		case <-d.cancelCh:
			// Channel was already closed
		default:
			close(d.cancelCh)
		}
620
	}
621 622 623 624 625 626
}

// Cancel aborts all of the operations and waits for all download goroutines to
// finish before returning.
func (d *Downloader) Cancel() {
	d.cancel()
627
	d.cancelWg.Wait()
628 629
}

630
// Terminate interrupts the downloader, canceling all pending operations.
631
// The downloader cannot be reused after calling Terminate.
632
func (d *Downloader) Terminate() {
633 634 635 636 637 638 639
	// Close the termination channel (make sure double close is allowed)
	d.quitLock.Lock()
	select {
	case <-d.quitCh:
	default:
		close(d.quitCh)
	}
640 641 642
	if d.stateBloom != nil {
		d.stateBloom.Close()
	}
643 644 645
	d.quitLock.Unlock()

	// Cancel any pending download requests
646
	d.Cancel()
647 648
}

649 650 651 652 653
// fetchHead retrieves the head header and prior pivot block (if available) from
// a remote peer.
func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *types.Header, err error) {
	p.log.Debug("Retrieving remote chain head")
	mode := d.getMode()
654 655

	// Request the advertised remote head block and wait for the response
656 657 658 659 660 661
	latest, _ := p.peer.Head()
	fetch := 1
	if mode == FastSync {
		fetch = 2 // head + pivot headers
	}
	go p.peer.RequestHeadersByHash(latest, fetch, fsMinFullBlocks-1, true)
662

663 664
	ttl := d.requestTTL()
	timeout := time.After(ttl)
665 666 667
	for {
		select {
		case <-d.cancelCh:
668
			return nil, nil, errCanceled
669

670
		case packet := <-d.headerCh:
671
			// Discard anything not from the origin peer
672
			if packet.PeerId() != p.id {
673
				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
674 675
				break
			}
676
			// Make sure the peer gave us at least one and at most the requested headers
677
			headers := packet.(*headerPack).headers
678 679
			if len(headers) == 0 || len(headers) > fetch {
				return nil, nil, fmt.Errorf("%w: returned headers %d != requested %d", errBadPeer, len(headers), fetch)
680
			}
681 682 683
			// The first header needs to be the head, validate against the checkpoint
			// and request. If only 1 header was returned, make sure there's no pivot
			// or there was not one requested.
684
			head := headers[0]
685
			if (mode == FastSync || mode == LightSync) && head.Number.Uint64() < d.checkpoint {
686 687 688 689 690 691 692 693 694 695 696 697 698 699
				return nil, nil, fmt.Errorf("%w: remote head %d below checkpoint %d", errUnsyncedPeer, head.Number, d.checkpoint)
			}
			if len(headers) == 1 {
				if mode == FastSync && head.Number.Uint64() > uint64(fsMinFullBlocks) {
					return nil, nil, fmt.Errorf("%w: no pivot included along head header", errBadPeer)
				}
				p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", head.Hash())
				return head, nil, nil
			}
			// At this point we have 2 headers in total and the first is the
			// validated head of the chian. Check the pivot number and return,
			pivot := headers[1]
			if pivot.Number.Uint64() != head.Number.Uint64()-uint64(fsMinFullBlocks) {
				return nil, nil, fmt.Errorf("%w: remote pivot %d != requested %d", errInvalidChain, pivot.Number, head.Number.Uint64()-uint64(fsMinFullBlocks))
700
			}
701
			return head, pivot, nil
702

703
		case <-timeout:
P
Péter Szilágyi 已提交
704
			p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
705
			return nil, nil, errTimeout
706

707
		case <-d.bodyCh:
708 709
		case <-d.receiptCh:
			// Out of bounds delivery, ignore
710 711 712 713
		}
	}
}

714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736
// calculateRequestSpan calculates what headers to request from a peer when trying to determine the
// common ancestor.
// It returns parameters to be used for peer.RequestHeadersByNumber:
//  from - starting block number
//  count - number of headers to request
//  skip - number of headers to skip
// and also returns 'max', the last block which is expected to be returned by the remote peers,
// given the (from,count,skip)
func calculateRequestSpan(remoteHeight, localHeight uint64) (int64, int, int, uint64) {
	var (
		from     int
		count    int
		MaxCount = MaxHeaderFetch / 16
	)
	// requestHead is the highest block that we will ask for. If requestHead is not offset,
	// the highest block that we will get is 16 blocks back from head, which means we
	// will fetch 14 or 15 blocks unnecessarily in the case the height difference
	// between us and the peer is 1-2 blocks, which is most common
	requestHead := int(remoteHeight) - 1
	if requestHead < 0 {
		requestHead = 0
	}
	// requestBottom is the lowest block we want included in the query
737
	// Ideally, we want to include the one just below our own head
738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765
	requestBottom := int(localHeight - 1)
	if requestBottom < 0 {
		requestBottom = 0
	}
	totalSpan := requestHead - requestBottom
	span := 1 + totalSpan/MaxCount
	if span < 2 {
		span = 2
	}
	if span > 16 {
		span = 16
	}

	count = 1 + totalSpan/span
	if count > MaxCount {
		count = MaxCount
	}
	if count < 2 {
		count = 2
	}
	from = requestHead - (count-1)*span
	if from < 0 {
		from = 0
	}
	max := from + (count-1)*span
	return int64(from), count, span - 1, uint64(max)
}

766
// findAncestor tries to locate the common ancestor link of the local chain and
767
// a remote peers blockchain. In the general case when our node was in sync and
768
// on the correct chain, checking the top N links should already get us a match.
769
// In the rare scenario when we ended up on a long reorganisation (i.e. none of
770
// the head links match), we do a binary search to find the common ancestor.
771
func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) (uint64, error) {
772
	// Figure out the valid ancestor range to prevent rewrite attacks
773 774 775 776 777
	var (
		floor        = int64(-1)
		localHeight  uint64
		remoteHeight = remoteHeader.Number.Uint64()
	)
778 779
	mode := d.getMode()
	switch mode {
780 781 782 783 784 785
	case FullSync:
		localHeight = d.blockchain.CurrentBlock().NumberU64()
	case FastSync:
		localHeight = d.blockchain.CurrentFastBlock().NumberU64()
	default:
		localHeight = d.lightchain.CurrentHeader().Number.Uint64()
786
	}
787
	p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight)
788 789

	// Recap floor value for binary search
790 791 792 793
	maxForkAncestry := fullMaxForkAncestry
	if d.getMode() == LightSync {
		maxForkAncestry = lightMaxForkAncestry
	}
794
	if localHeight >= maxForkAncestry {
795
		// We're above the max reorg threshold, find the earliest fork point
796
		floor = int64(localHeight - maxForkAncestry)
797 798 799
	}
	// If we're doing a light sync, ensure the floor doesn't go below the CHT, as
	// all headers before that point will be missing.
800
	if mode == LightSync {
801
		// If we don't know the current CHT position, find it
802 803 804 805 806 807
		if d.genesis == 0 {
			header := d.lightchain.CurrentHeader()
			for header != nil {
				d.genesis = header.Number.Uint64()
				if floor >= int64(d.genesis)-1 {
					break
808
				}
809
				header = d.lightchain.GetHeaderByHash(header.ParentHash)
810
			}
811 812 813 814
		}
		// We already know the "genesis" block number, cap floor to that
		if floor < int64(d.genesis)-1 {
			floor = int64(d.genesis) - 1
815
		}
816
	}
817

818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837
	ancestor, err := d.findAncestorSpanSearch(p, mode, remoteHeight, localHeight, floor)
	if err == nil {
		return ancestor, nil
	}
	// The returned error was not nil.
	// If the error returned does not reflect that a common ancestor was not found, return it.
	// If the error reflects that a common ancestor was not found, continue to binary search,
	// where the error value will be reassigned.
	if !errors.Is(err, errNoAncestorFound) {
		return 0, err
	}

	ancestor, err = d.findAncestorBinarySearch(p, mode, remoteHeight, floor)
	if err != nil {
		return 0, err
	}
	return ancestor, nil
}

func (d *Downloader) findAncestorSpanSearch(p *peerConnection, mode SyncMode, remoteHeight, localHeight uint64, floor int64) (commonAncestor uint64, err error) {
838
	from, count, skip, max := calculateRequestSpan(remoteHeight, localHeight)
839 840

	p.log.Trace("Span searching for common ancestor", "count", count, "from", from, "skip", skip)
841
	go p.peer.RequestHeadersByNumber(uint64(from), count, skip, false)
842 843 844

	// Wait for the remote response to the head fetch
	number, hash := uint64(0), common.Hash{}
845 846 847

	ttl := d.requestTTL()
	timeout := time.After(ttl)
848 849 850 851

	for finished := false; !finished; {
		select {
		case <-d.cancelCh:
852
			return 0, errCanceled
853

854
		case packet := <-d.headerCh:
855
			// Discard anything not from the origin peer
856
			if packet.PeerId() != p.id {
857
				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
858 859 860
				break
			}
			// Make sure the peer actually gave something valid
861
			headers := packet.(*headerPack).headers
862
			if len(headers) == 0 {
P
Péter Szilágyi 已提交
863
				p.log.Warn("Empty head header set")
864 865
				return 0, errEmptyHeaderSet
			}
866
			// Make sure the peer's reply conforms to the request
867
			for i, header := range headers {
868
				expectNumber := from + int64(i)*int64(skip+1)
869 870
				if number := header.Number.Int64(); number != expectNumber {
					p.log.Warn("Head headers broke chain ordering", "index", i, "requested", expectNumber, "received", number)
871
					return 0, fmt.Errorf("%w: %v", errInvalidChain, errors.New("head headers broke chain ordering"))
872 873
				}
			}
874 875 876
			// Check if a common ancestor was found
			finished = true
			for i := len(headers) - 1; i >= 0; i-- {
877
				// Skip any headers that underflow/overflow our requested set
878
				if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > max {
879 880 881
					continue
				}
				// Otherwise check if we already know the header or not
882 883
				h := headers[i].Hash()
				n := headers[i].Number.Uint64()
884 885

				var known bool
886
				switch mode {
887 888 889 890 891 892 893 894
				case FullSync:
					known = d.blockchain.HasBlock(h, n)
				case FastSync:
					known = d.blockchain.HasFastBlock(h, n)
				default:
					known = d.lightchain.HasHeader(h, n)
				}
				if known {
895
					number, hash = n, h
896 897 898 899
					break
				}
			}

900
		case <-timeout:
P
Péter Szilágyi 已提交
901
			p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
902 903
			return 0, errTimeout

904
		case <-d.bodyCh:
905 906
		case <-d.receiptCh:
			// Out of bounds delivery, ignore
907 908 909
		}
	}
	// If the head fetch already found an ancestor, return
910
	if hash != (common.Hash{}) {
911
		if int64(number) <= floor {
P
Péter Szilágyi 已提交
912
			p.log.Warn("Ancestor below allowance", "number", number, "hash", hash, "allowance", floor)
913 914
			return 0, errInvalidAncestor
		}
P
Péter Szilágyi 已提交
915
		p.log.Debug("Found common ancestor", "number", number, "hash", hash)
916 917
		return number, nil
	}
918 919 920 921 922 923
	return 0, errNoAncestorFound
}

func (d *Downloader) findAncestorBinarySearch(p *peerConnection, mode SyncMode, remoteHeight uint64, floor int64) (commonAncestor uint64, err error) {
	hash := common.Hash{}

924
	// Ancestor not found, we need to binary search over our chain
925
	start, end := uint64(0), remoteHeight
926 927 928
	if floor > 0 {
		start = uint64(floor)
	}
929 930
	p.log.Trace("Binary searching for common ancestor", "start", start, "end", end)

931 932 933 934
	for start+1 < end {
		// Split our chain interval in two, and request the hash to cross check
		check := (start + end) / 2

935 936 937
		ttl := d.requestTTL()
		timeout := time.After(ttl)

938
		go p.peer.RequestHeadersByNumber(check, 1, 0, false)
939 940 941 942 943

		// Wait until a reply arrives to this request
		for arrived := false; !arrived; {
			select {
			case <-d.cancelCh:
944
				return 0, errCanceled
945

946
			case packet := <-d.headerCh:
947
				// Discard anything not from the origin peer
948 949
				if packet.PeerId() != p.id {
					log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
950 951 952
					break
				}
				// Make sure the peer actually gave something valid
953
				headers := packet.(*headerPack).headers
954
				if len(headers) != 1 {
955
					p.log.Warn("Multiple headers for single request", "headers", len(headers))
956
					return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
957 958 959 960
				}
				arrived = true

				// Modify the search interval based on the response
961 962
				h := headers[0].Hash()
				n := headers[0].Number.Uint64()
963 964

				var known bool
965
				switch mode {
966 967 968 969 970 971 972 973
				case FullSync:
					known = d.blockchain.HasBlock(h, n)
				case FastSync:
					known = d.blockchain.HasFastBlock(h, n)
				default:
					known = d.lightchain.HasHeader(h, n)
				}
				if !known {
974 975 976
					end = check
					break
				}
977
				header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
978
				if header.Number.Uint64() != check {
979
					p.log.Warn("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
980
					return 0, fmt.Errorf("%w: non-requested header (%d)", errBadPeer, header.Number)
981 982
				}
				start = check
983
				hash = h
984

985
			case <-timeout:
P
Péter Szilágyi 已提交
986
				p.log.Debug("Waiting for search header timed out", "elapsed", ttl)
987 988
				return 0, errTimeout

989
			case <-d.bodyCh:
990 991
			case <-d.receiptCh:
				// Out of bounds delivery, ignore
992 993 994
			}
		}
	}
995 996
	// Ensure valid ancestry and return
	if int64(start) <= floor {
P
Péter Szilágyi 已提交
997
		p.log.Warn("Ancestor below allowance", "number", start, "hash", hash, "allowance", floor)
998 999
		return 0, errInvalidAncestor
	}
P
Péter Szilágyi 已提交
1000
	p.log.Debug("Found common ancestor", "number", start, "hash", hash)
1001 1002 1003
	return start, nil
}

1004 1005 1006 1007 1008
// fetchHeaders keeps retrieving headers concurrently from the number
// requested, until no more are returned, potentially throttling on the way. To
// facilitate concurrency but still protect against malicious nodes sending bad
// headers, we construct a header chain skeleton using the "origin" peer we are
// syncing with, and fill in the missing headers using anyone else. Headers from
1009
// other peers are only accepted if they map cleanly to the skeleton. If no one
1010 1011
// can fill in the skeleton - not even the origin peer - it's assumed invalid and
// the origin is dropped.
1012
func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
P
Péter Szilágyi 已提交
1013 1014
	p.log.Debug("Directing header downloads", "origin", from)
	defer p.log.Debug("Header download terminated")
1015

1016 1017
	// Create a timeout timer, and the associated header fetcher
	skeleton := true            // Skeleton assembly phase or finishing up
1018
	pivoting := false           // Whether the next request is pivot verification
1019
	request := time.Now()       // time of the last skeleton fetch request
1020 1021 1022 1023
	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
	<-timeout.C                 // timeout channel should be initially empty
	defer timeout.Stop()

1024
	var ttl time.Duration
1025
	getHeaders := func(from uint64) {
1026
		request = time.Now()
1027 1028 1029

		ttl = d.requestTTL()
		timeout.Reset(ttl)
1030

1031
		if skeleton {
P
Péter Szilágyi 已提交
1032
			p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
1033
			go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
1034
		} else {
P
Péter Szilágyi 已提交
1035
			p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
1036
			go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
1037
		}
1038
	}
1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052
	getNextPivot := func() {
		pivoting = true
		request = time.Now()

		ttl = d.requestTTL()
		timeout.Reset(ttl)

		d.pivotLock.RLock()
		pivot := d.pivotHeader.Number.Uint64()
		d.pivotLock.RUnlock()

		p.log.Trace("Fetching next pivot header", "number", pivot+uint64(fsMinFullBlocks))
		go p.peer.RequestHeadersByNumber(pivot+uint64(fsMinFullBlocks), 2, fsMinFullBlocks-9, false) // move +64 when it's 2x64-8 deep
	}
1053
	// Start pulling the header chain skeleton until all is done
1054
	ancestor := from
1055 1056
	getHeaders(from)

1057
	mode := d.getMode()
1058 1059 1060
	for {
		select {
		case <-d.cancelCh:
1061
			return errCanceled
1062

1063
		case packet := <-d.headerCh:
1064
			// Make sure the active peer is giving us the skeleton headers
1065
			if packet.PeerId() != p.id {
1066
				log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId())
1067 1068
				break
			}
1069
			headerReqTimer.UpdateSince(request)
1070 1071
			timeout.Stop()

1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109
			// If the pivot is being checked, move if it became stale and run the real retrieval
			var pivot uint64

			d.pivotLock.RLock()
			if d.pivotHeader != nil {
				pivot = d.pivotHeader.Number.Uint64()
			}
			d.pivotLock.RUnlock()

			if pivoting {
				if packet.Items() == 2 {
					// Retrieve the headers and do some sanity checks, just in case
					headers := packet.(*headerPack).headers

					if have, want := headers[0].Number.Uint64(), pivot+uint64(fsMinFullBlocks); have != want {
						log.Warn("Peer sent invalid next pivot", "have", have, "want", want)
						return fmt.Errorf("%w: next pivot number %d != requested %d", errInvalidChain, have, want)
					}
					if have, want := headers[1].Number.Uint64(), pivot+2*uint64(fsMinFullBlocks)-8; have != want {
						log.Warn("Peer sent invalid pivot confirmer", "have", have, "want", want)
						return fmt.Errorf("%w: next pivot confirmer number %d != requested %d", errInvalidChain, have, want)
					}
					log.Warn("Pivot seemingly stale, moving", "old", pivot, "new", headers[0].Number)
					pivot = headers[0].Number.Uint64()

					d.pivotLock.Lock()
					d.pivotHeader = headers[0]
					d.pivotLock.Unlock()

					// Write out the pivot into the database so a rollback beyond
					// it will reenable fast sync and update the state root that
					// the state syncer will be downloading.
					rawdb.WriteLastPivotNumber(d.stateDB, pivot)
				}
				pivoting = false
				getHeaders(from)
				continue
			}
1110
			// If the skeleton's finished, pull any remaining head headers directly from the origin
1111
			if skeleton && packet.Items() == 0 {
1112 1113 1114 1115
				skeleton = false
				getHeaders(from)
				continue
			}
1116
			// If no more headers are inbound, notify the content fetchers and return
1117
			if packet.Items() == 0 {
1118 1119 1120 1121 1122 1123 1124 1125
				// Don't abort header fetches while the pivot is downloading
				if atomic.LoadInt32(&d.committed) == 0 && pivot <= from {
					p.log.Debug("No headers, waiting for pivot commit")
					select {
					case <-time.After(fsHeaderContCheck):
						getHeaders(from)
						continue
					case <-d.cancelCh:
1126
						return errCanceled
1127 1128 1129
					}
				}
				// Pivot done (or not in fast sync) and no more headers, terminate the process
P
Péter Szilágyi 已提交
1130
				p.log.Debug("No more headers available")
1131 1132 1133 1134
				select {
				case d.headerProcCh <- nil:
					return nil
				case <-d.cancelCh:
1135
					return errCanceled
1136
				}
1137
			}
1138
			headers := packet.(*headerPack).headers
1139

1140 1141
			// If we received a skeleton batch, resolve internals concurrently
			if skeleton {
1142
				filled, proced, err := d.fillHeaderSkeleton(from, headers)
1143
				if err != nil {
P
Péter Szilágyi 已提交
1144
					p.log.Debug("Skeleton chain invalid", "err", err)
1145
					return fmt.Errorf("%w: %v", errInvalidChain, err)
1146
				}
1147 1148
				headers = filled[proced:]
				from += uint64(proced)
1149 1150 1151 1152 1153 1154
			} else {
				// If we're closing in on the chain head, but haven't yet reached it, delay
				// the last few headers so mini reorgs on the head don't cause invalid hash
				// chain errors.
				if n := len(headers); n > 0 {
					// Retrieve the current head we're at
1155
					var head uint64
1156
					if mode == LightSync {
1157 1158 1159 1160 1161 1162 1163
						head = d.lightchain.CurrentHeader().Number.Uint64()
					} else {
						head = d.blockchain.CurrentFastBlock().NumberU64()
						if full := d.blockchain.CurrentBlock().NumberU64(); head < full {
							head = full
						}
					}
1164 1165 1166 1167 1168 1169
					// If the head is below the common ancestor, we're actually deduplicating
					// already existing chain segments, so use the ancestor as the fake head.
					// Otherwise we might end up delaying header deliveries pointlessly.
					if head < ancestor {
						head = ancestor
					}
1170 1171 1172 1173 1174 1175 1176 1177 1178
					// If the head is way older than this batch, delay the last few headers
					if head+uint64(reorgProtThreshold) < headers[n-1].Number.Uint64() {
						delay := reorgProtHeaderDelay
						if delay > n {
							delay = n
						}
						headers = headers[:n-delay]
					}
				}
1179
			}
1180
			// Insert all the new headers and fetch the next batch
1181
			if len(headers) > 0 {
P
Péter Szilágyi 已提交
1182
				p.log.Trace("Scheduling new headers", "count", len(headers), "from", from)
1183 1184 1185
				select {
				case d.headerProcCh <- headers:
				case <-d.cancelCh:
1186
					return errCanceled
1187 1188
				}
				from += uint64(len(headers))
1189 1190 1191 1192 1193 1194 1195 1196

				// If we're still skeleton filling fast sync, check pivot staleness
				// before continuing to the next skeleton filling
				if skeleton && pivot > 0 {
					getNextPivot()
				} else {
					getHeaders(from)
				}
1197 1198 1199 1200 1201 1202 1203 1204
			} else {
				// No headers delivered, or all of them being delayed, sleep a bit and retry
				p.log.Trace("All headers delayed, waiting")
				select {
				case <-time.After(fsHeaderContCheck):
					getHeaders(from)
					continue
				case <-d.cancelCh:
1205
					return errCanceled
1206
				}
1207
			}
1208 1209

		case <-timeout.C:
1210 1211 1212 1213 1214 1215
			if d.dropPeer == nil {
				// The dropPeer method is nil when `--copydb` is used for a local copy.
				// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
				p.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", p.id)
				break
			}
1216
			// Header retrieval timed out, consider the peer bad and drop
P
Péter Szilágyi 已提交
1217
			p.log.Debug("Header request timed out", "elapsed", ttl)
1218
			headerTimeoutMeter.Mark(1)
1219 1220 1221
			d.dropPeer(p.id)

			// Finish the sync gracefully instead of dumping the gathered data though
1222
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
1223 1224 1225 1226
				select {
				case ch <- false:
				case <-d.cancelCh:
				}
1227
			}
1228 1229 1230 1231
			select {
			case d.headerProcCh <- nil:
			case <-d.cancelCh:
			}
1232
			return fmt.Errorf("%w: header request timed out", errBadPeer)
1233 1234 1235 1236
		}
	}
}

1237 1238
// fillHeaderSkeleton concurrently retrieves headers from all our available peers
// and maps them to the provided skeleton header chain.
1239 1240 1241 1242 1243
//
// Any partial results from the beginning of the skeleton is (if possible) forwarded
// immediately to the header processor to keep the rest of the pipeline full even
// in the case of header stalls.
//
Y
Yusup 已提交
1244
// The method returns the entire filled skeleton and also the number of headers
1245 1246
// already forwarded for processing.
func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
1247
	log.Debug("Filling up skeleton", "from", from)
1248 1249 1250 1251 1252
	d.queue.ScheduleSkeleton(from, skeleton)

	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*headerPack)
1253
			return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh)
1254
		}
1255 1256 1257
		expire  = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
		reserve = func(p *peerConnection, count int) (*fetchRequest, bool, bool) {
			return d.queue.ReserveHeaders(p, count), false, false
1258
		}
1259 1260
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
		capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
1261 1262 1263
		setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) {
			p.SetHeadersIdle(accepted, deliveryTime)
		}
1264
	)
1265
	err := d.fetchParts(d.headerCh, deliver, d.queue.headerContCh, expire,
1266
		d.queue.PendingHeaders, d.queue.InFlightHeaders, reserve,
1267
		nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")
1268

1269
	log.Debug("Skeleton fill terminated", "err", err)
1270 1271 1272

	filled, proced := d.queue.RetrieveHeaders()
	return filled, proced, err
1273 1274
}

1275 1276 1277 1278
// fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchBodies(from uint64) error {
1279
	log.Debug("Downloading block bodies", "origin", from)
1280

1281
	var (
1282
		deliver = func(packet dataPack) (int, error) {
1283
			pack := packet.(*bodyPack)
1284
			return d.queue.DeliverBodies(pack.peerID, pack.transactions, pack.uncles)
1285
		}
1286
		expire   = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
1287 1288
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
		capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
1289
		setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetBodiesIdle(accepted, deliveryTime) }
1290
	)
1291
	err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
1292
		d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ReserveBodies,
1293
		d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")
1294

1295
	log.Debug("Block body download terminated", "err", err)
1296 1297 1298 1299 1300 1301 1302
	return err
}

// fetchReceipts iteratively downloads the scheduled block receipts, taking any
// available peers, reserving a chunk of receipts for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchReceipts(from uint64) error {
1303
	log.Debug("Downloading transaction receipts", "origin", from)
1304 1305

	var (
1306
		deliver = func(packet dataPack) (int, error) {
1307
			pack := packet.(*receiptPack)
1308
			return d.queue.DeliverReceipts(pack.peerID, pack.receipts)
1309
		}
1310
		expire   = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
1311 1312
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
		capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
1313 1314 1315
		setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) {
			p.SetReceiptsIdle(accepted, deliveryTime)
		}
1316
	)
1317
	err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire,
1318
		d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ReserveReceipts,
1319
		d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")
1320

1321
	log.Debug("Transaction receipt download terminated", "err", err)
1322 1323 1324 1325 1326 1327
	return err
}

// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345
//
// As the scheduling/timeout logic mostly is the same for all downloaded data
// types, this method is used by each for data gathering and is instrumented with
// various callbacks to handle the slight differences between processing them.
//
// The instrumentation parameters:
//  - errCancel:   error type to return if the fetch operation is cancelled (mostly makes logging nicer)
//  - deliveryCh:  channel from which to retrieve downloaded data packets (merged from all concurrent peers)
//  - deliver:     processing callback to deliver data packets into type specific download queues (usually within `queue`)
//  - wakeCh:      notification channel for waking the fetcher when new tasks are available (or sync completed)
//  - expire:      task callback method to abort requests that took too long and return the faulty peers (traffic shaping)
//  - pending:     task callback for the number of requests still needing download (detect completion/non-completability)
//  - inFlight:    task callback for the number of in-progress requests (wait for all active downloads to finish)
//  - throttle:    task callback to check if the processing queue is full and activate throttling (bound memory use)
//  - reserve:     task callback to reserve new download tasks to a particular peer (also signals partial completions)
//  - fetchHook:   tester callback to notify of new tasks being initiated (allows testing the scheduling logic)
//  - fetch:       network callback to actually send a particular download request to a physical remote peer
//  - cancel:      task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer)
P
Péter Szilágyi 已提交
1346
//  - capacity:    network callback to retrieve the estimated type-specific bandwidth capacity of a peer (traffic shaping)
1347 1348
//  - idle:        network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
//  - setIdle:     network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
1349
//  - kind:        textual label of the type being downloaded to display in log messages
1350
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
1351
	expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
1352
	fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
1353
	idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {
1354

1355
	// Create a ticker to detect expired retrieval tasks
1356 1357 1358 1359 1360
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	update := make(chan struct{}, 1)

1361
	// Prepare the queue and fetch block parts until the block header fetcher's done
1362 1363 1364 1365
	finished := false
	for {
		select {
		case <-d.cancelCh:
1366
			return errCanceled
1367

1368
		case packet := <-deliveryCh:
1369
			deliveryTime := time.Now()
1370 1371
			// If the peer was previously banned and failed to deliver its pack
			// in a reasonable time frame, ignore its message.
1372
			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
1373 1374
				// Deliver the received chunk of data and check chain validity
				accepted, err := deliver(packet)
1375
				if errors.Is(err, errInvalidChain) {
1376
					return err
1377 1378 1379 1380
				}
				// Unless a peer delivered something completely else than requested (usually
				// caused by a timed out request which came through in the end), set it to
				// idle. If the delivery's stale, the peer should have already been idled.
1381
				if !errors.Is(err, errStaleDelivery) {
1382
					setIdle(peer, accepted, deliveryTime)
1383 1384 1385 1386
				}
				// Issue a log to the user to see what's going on
				switch {
				case err == nil && packet.Items() == 0:
P
Péter Szilágyi 已提交
1387
					peer.log.Trace("Requested data not delivered", "type", kind)
1388
				case err == nil:
P
Péter Szilágyi 已提交
1389
					peer.log.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats())
1390
				default:
P
Péter Szilágyi 已提交
1391
					peer.log.Trace("Failed to deliver retrieved data", "type", kind, "err", err)
1392 1393 1394 1395 1396 1397 1398 1399
				}
			}
			// Blocks assembled, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

1400
		case cont := <-wakeCh:
1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422
			// The header fetcher sent a continuation flag, check if it's done
			if !cont {
				finished = true
			}
			// Headers arrive, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-ticker.C:
			// Sanity check update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-update:
			// Short circuit if we lost all our peers
			if d.peers.Len() == 0 {
				return errNoPeers
			}
1423
			// Check for fetch request timeouts and demote the responsible peers
1424
			for pid, fails := range expire() {
1425
				if peer := d.peers.Peer(pid); peer != nil {
1426 1427 1428 1429 1430 1431 1432 1433
					// If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps
					// ourselves. Only reset to minimal throughput but don't drop just yet. If even the minimal times
					// out that sync wise we need to get rid of the peer.
					//
					// The reason the minimum threshold is 2 is because the downloader tries to estimate the bandwidth
					// and latency of a peer separately, which requires pushing the measures capacity a bit and seeing
					// how response times reacts, to it always requests one more than the minimum (i.e. min 2).
					if fails > 2 {
P
Péter Szilágyi 已提交
1434
						peer.log.Trace("Data delivery timed out", "type", kind)
1435
						setIdle(peer, 0, time.Now())
1436
					} else {
P
Péter Szilágyi 已提交
1437
						peer.log.Debug("Stalling delivery, dropping", "type", kind)
1438

1439 1440 1441 1442 1443 1444
						if d.dropPeer == nil {
							// The dropPeer method is nil when `--copydb` is used for a local copy.
							// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
							peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid)
						} else {
							d.dropPeer(pid)
1445 1446 1447 1448 1449 1450 1451 1452 1453 1454

							// If this peer was the master peer, abort sync immediately
							d.cancelLock.RLock()
							master := pid == d.cancelPeer
							d.cancelLock.RUnlock()

							if master {
								d.cancel()
								return errTimeout
							}
1455
						}
1456
					}
1457 1458
				}
			}
1459 1460
			// If there's nothing more to fetch, wait or terminate
			if pending() == 0 {
1461
				if !inFlight() && finished {
1462
					log.Debug("Data fetching completed", "type", kind)
1463 1464 1465 1466 1467
					return nil
				}
				break
			}
			// Send a download request to all idle peers, until throttled
1468
			progressed, throttled, running := false, false, inFlight()
1469
			idles, total := idle()
1470
			pendCount := pending()
1471
			for _, peer := range idles {
1472
				// Short circuit if throttling activated
1473
				if throttled {
1474 1475
					break
				}
1476
				// Short circuit if there is no more available task.
1477
				if pendCount = pending(); pendCount == 0 {
1478 1479
					break
				}
1480 1481
				// Reserve a chunk of fetches for a peer. A nil can mean either that
				// no more headers are available, or that the peer is known not to
1482
				// have them.
1483
				request, progress, throttle := reserve(peer, capacity(peer))
1484 1485
				if progress {
					progressed = true
1486
				}
1487 1488 1489 1490
				if throttle {
					throttled = true
					throttleCounter.Inc(1)
				}
1491 1492 1493
				if request == nil {
					continue
				}
1494
				if request.From > 0 {
P
Péter Szilágyi 已提交
1495
					peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From)
1496
				} else {
1497
					peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
1498
				}
1499
				// Fetch the chunk and make sure any errors return the hashes to the queue
1500 1501
				if fetchHook != nil {
					fetchHook(request.Headers)
1502
				}
1503
				if err := fetch(peer, request); err != nil {
1504 1505 1506 1507 1508
					// Although we could try and make an attempt to fix this, this error really
					// means that we've double allocated a fetch task to a peer. If that is the
					// case, the internal state of the downloader and the queue is very wrong so
					// better hard crash and note the error instead of silently accumulating into
					// a much bigger issue.
1509
					panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, kind))
1510
				}
1511
				running = true
1512 1513 1514
			}
			// Make sure that we have peers available for fetching. If all peers have been tried
			// and all failed throw an error
1515
			if !progressed && !throttled && !running && len(idles) == total && pendCount > 0 {
1516 1517 1518 1519 1520 1521
				return errPeersUnavailable
			}
		}
	}
}

1522 1523 1524
// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
1525
func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
1526
	// Keep a count of uncertain headers to roll back
1527
	var (
1528
		rollback    uint64 // Zero means no rollback (fine as you can't unroll the genesis)
1529 1530 1531
		rollbackErr error
		mode        = d.getMode()
	)
1532
	defer func() {
1533
		if rollback > 0 {
1534
			lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
1535
			if mode != LightSync {
1536 1537
				lastFastBlock = d.blockchain.CurrentFastBlock().Number()
				lastBlock = d.blockchain.CurrentBlock().Number()
1538
			}
1539 1540 1541 1542
			if err := d.lightchain.SetHead(rollback - 1); err != nil { // -1 to target the parent of the first uncertain block
				// We're already unwinding the stack, only print the error to make it more visible
				log.Error("Failed to roll back chain segment", "head", rollback-1, "err", err)
			}
1543
			curFastBlock, curBlock := common.Big0, common.Big0
1544
			if mode != LightSync {
1545 1546
				curFastBlock = d.blockchain.CurrentFastBlock().Number()
				curBlock = d.blockchain.CurrentBlock().Number()
1547
			}
1548
			log.Warn("Rolled back chain segment",
1549
				"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
1550
				"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
1551
				"block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
1552 1553 1554 1555 1556 1557 1558 1559
		}
	}()
	// Wait for batches of headers to process
	gotHeaders := false

	for {
		select {
		case <-d.cancelCh:
1560
			rollbackErr = errCanceled
1561
			return errCanceled
1562 1563 1564 1565 1566

		case headers := <-d.headerProcCh:
			// Terminate header processing if we synced up
			if len(headers) == 0 {
				// Notify everyone that headers are fully processed
1567
				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
1568 1569 1570 1571 1572
					select {
					case ch <- false:
					case <-d.cancelCh:
					}
				}
1573 1574
				// If no headers were retrieved at all, the peer violated its TD promise that it had a
				// better chain compared to ours. The only exception is if its promised blocks were
1575
				// already imported by other means (e.g. fetcher):
1576 1577 1578 1579 1580 1581 1582 1583 1584
				//
				// R <remote peer>, L <local node>: Both at block 10
				// R: Mine block 11, and propagate it to L
				// L: Queue block 11 for import
				// L: Notice that R's head and TD increased compared to ours, start sync
				// L: Import of block 11 finishes
				// L: Sync begins, and finds common ancestor at 11
				// L: Request new headers up from 11 (R's TD was higher, it must have something)
				// R: Nothing to give
1585
				if mode != LightSync {
1586 1587
					head := d.blockchain.CurrentBlock()
					if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {
1588 1589
						return errStallingPeer
					}
1590 1591 1592 1593 1594 1595 1596 1597
				}
				// If fast or light syncing, ensure promised headers are indeed delivered. This is
				// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
				// of delivering the post-pivot blocks that would flag the invalid content.
				//
				// This check cannot be executed "as is" for full imports, since blocks may still be
				// queued for processing when the header download completes. However, as long as the
				// peer gave us something useful, we're already happy/progressed (above check).
1598
				if mode == FastSync || mode == LightSync {
1599 1600
					head := d.lightchain.CurrentHeader()
					if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
1601 1602 1603 1604
						return errStallingPeer
					}
				}
				// Disable any rollback and return
1605
				rollback = 0
1606 1607 1608 1609 1610 1611 1612 1613
				return nil
			}
			// Otherwise split the chunk of headers into batches and process them
			gotHeaders = true
			for len(headers) > 0 {
				// Terminate if something failed in between processing chunks
				select {
				case <-d.cancelCh:
1614
					rollbackErr = errCanceled
1615
					return errCanceled
1616 1617 1618 1619 1620 1621 1622 1623
				default:
				}
				// Select the next chunk of headers to import
				limit := maxHeadersProcess
				if limit > len(headers) {
					limit = len(headers)
				}
				chunk := headers[:limit]
1624

1625
				// In case of header only syncing, validate the chunk immediately
1626
				if mode == FastSync || mode == LightSync {
1627
					// If we're importing pure headers, verify based on their recentness
1628 1629 1630 1631 1632 1633 1634 1635
					var pivot uint64

					d.pivotLock.RLock()
					if d.pivotHeader != nil {
						pivot = d.pivotHeader.Number.Uint64()
					}
					d.pivotLock.RUnlock()

1636 1637 1638 1639
					frequency := fsHeaderCheckFrequency
					if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
						frequency = 1
					}
1640
					if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
1641
						rollbackErr = err
1642 1643

						// If some headers were inserted, track them as uncertain
1644
						if (mode == FastSync || frequency > 1) && n > 0 && rollback == 0 {
1645
							rollback = chunk[0].Number.Uint64()
1646
						}
1647
						log.Warn("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "parent", chunk[n].ParentHash, "err", err)
1648
						return fmt.Errorf("%w: %v", errInvalidChain, err)
1649
					}
1650
					// All verifications passed, track all headers within the alloted limits
1651 1652 1653 1654 1655 1656 1657
					if mode == FastSync {
						head := chunk[len(chunk)-1].Number.Uint64()
						if head-rollback > uint64(fsHeaderSafetyNet) {
							rollback = head - uint64(fsHeaderSafetyNet)
						} else {
							rollback = 1
						}
1658 1659 1660
					}
				}
				// Unless we're doing light chains, schedule the headers for associated content retrieval
1661
				if mode == FullSync || mode == FastSync {
1662 1663 1664 1665
					// If we've reached the allowed number of pending headers, stall a bit
					for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
						select {
						case <-d.cancelCh:
1666
							rollbackErr = errCanceled
1667
							return errCanceled
1668 1669 1670 1671 1672 1673
						case <-time.After(time.Second):
						}
					}
					// Otherwise insert the headers for content retrieval
					inserts := d.queue.Schedule(chunk, origin)
					if len(inserts) != len(chunk) {
1674
						rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunk))
1675
						return fmt.Errorf("%w: stale headers", errBadPeer)
1676 1677 1678 1679 1680
					}
				}
				headers = headers[limit:]
				origin += uint64(limit)
			}
1681 1682 1683 1684 1685 1686 1687
			// Update the highest block number we know if a higher one is found.
			d.syncStatsLock.Lock()
			if d.syncStatsChainHeight < origin {
				d.syncStatsChainHeight = origin - 1
			}
			d.syncStatsLock.Unlock()

1688
			// Signal the content downloaders of the availablility of new tasks
1689
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
1690 1691 1692 1693 1694 1695 1696 1697 1698
				select {
				case ch <- true:
				default:
				}
			}
		}
	}
}

1699 1700
// processFullSyncContent takes fetch results from the queue and imports them into the chain.
func (d *Downloader) processFullSyncContent() error {
1701
	for {
1702
		results := d.queue.Results(true)
1703
		if len(results) == 0 {
1704
			return nil
1705
		}
1706
		if d.chainInsertHook != nil {
1707
			d.chainInsertHook(results)
1708
		}
1709 1710 1711 1712 1713 1714 1715
		if err := d.importBlockResults(results); err != nil {
			return err
		}
	}
}

func (d *Downloader) importBlockResults(results []*fetchResult) error {
1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735
	// Check for any early termination requests
	if len(results) == 0 {
		return nil
	}
	select {
	case <-d.quitCh:
		return errCancelContentProcessing
	default:
	}
	// Retrieve the a batch of results to import
	first, last := results[0].Header, results[len(results)-1].Header
	log.Debug("Inserting downloaded chain", "items", len(results),
		"firstnum", first.Number, "firsthash", first.Hash(),
		"lastnum", last.Number, "lasthash", last.Hash(),
	)
	blocks := make([]*types.Block, len(results))
	for i, result := range results {
		blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
	}
	if index, err := d.blockchain.InsertChain(blocks); err != nil {
1736 1737 1738 1739 1740 1741 1742 1743 1744
		if index < len(results) {
			log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
		} else {
			// The InsertChain method in blockchain.go will sometimes return an out-of-bounds index,
			// when it needs to preprocess blocks to import a sidechain.
			// The importer will put together a new list of blocks to import, which is a superset
			// of the blocks delivered from the downloader, and the indexing will be off.
			log.Debug("Downloaded item processing failed on sidechain import", "index", index, "err", err)
		}
1745
		return fmt.Errorf("%w: %v", errInvalidChain, err)
1746 1747 1748 1749 1750 1751
	}
	return nil
}

// processFastSyncContent takes fetch results from the queue and writes them to the
// database. It also controls the synchronisation of state nodes of the pivot block.
1752
func (d *Downloader) processFastSyncContent() error {
1753 1754
	// Start syncing state of the reported head block. This should get us most of
	// the state of the pivot block.
1755 1756 1757 1758
	d.pivotLock.RLock()
	sync := d.syncState(d.pivotHeader.Root)
	d.pivotLock.RUnlock()

1759 1760 1761 1762 1763 1764 1765
	defer func() {
		// The `sync` object is replaced every time the pivot moves. We need to
		// defer close the very last active one, hence the lazy evaluation vs.
		// calling defer sync.Cancel() !!!
		sync.Cancel()
	}()

1766 1767
	closeOnErr := func(s *stateSync) {
		if err := s.Wait(); err != nil && err != errCancelStateFetch && err != errCanceled {
1768
			d.queue.Close() // wake up Results
1769
		}
1770 1771
	}
	go closeOnErr(sync)
1772

1773
	// To cater for moving pivot points, track the pivot block and subsequently
Y
Yusup 已提交
1774
	// accumulated download results separately.
1775 1776 1777 1778
	var (
		oldPivot *fetchResult   // Locked in pivot block, might change eventually
		oldTail  []*fetchResult // Downloaded content after the pivot
	)
1779
	for {
1780 1781 1782
		// Wait for the next batch of downloaded data to be available, and if the pivot
		// block became stale, move the goalpost
		results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness
1783
		if len(results) == 0 {
1784 1785
			// If pivot sync is done, stop
			if oldPivot == nil {
1786
				return sync.Cancel()
1787 1788 1789 1790
			}
			// If sync failed, stop
			select {
			case <-d.cancelCh:
1791
				sync.Cancel()
1792
				return errCanceled
1793 1794
			default:
			}
1795 1796 1797 1798
		}
		if d.chainInsertHook != nil {
			d.chainInsertHook(results)
		}
1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812
		// If we haven't downloaded the pivot block yet, check pivot staleness
		// notifications from the header downloader
		d.pivotLock.RLock()
		pivot := d.pivotHeader
		d.pivotLock.RUnlock()

		if oldPivot == nil {
			if pivot.Root != sync.root {
				sync.Cancel()
				sync = d.syncState(pivot.Root)

				go closeOnErr(sync)
			}
		} else {
1813 1814 1815 1816
			results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
		}
		// Split around the pivot block and process the two sides via fast/full sync
		if atomic.LoadInt32(&d.committed) == 0 {
1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831
			latest := results[len(results)-1].Header
			// If the height is above the pivot block by 2 sets, it means the pivot
			// become stale in the network and it was garbage collected, move to a
			// new pivot.
			//
			// Note, we have `reorgProtHeaderDelay` number of blocks withheld, Those
			// need to be taken into account, otherwise we're detecting the pivot move
			// late and will drop peers due to unavailable state!!!
			if height := latest.Number.Uint64(); height >= pivot.Number.Uint64()+2*uint64(fsMinFullBlocks)-uint64(reorgProtHeaderDelay) {
				log.Warn("Pivot became stale, moving", "old", pivot.Number.Uint64(), "new", height-uint64(fsMinFullBlocks)+uint64(reorgProtHeaderDelay))
				pivot = results[len(results)-1-fsMinFullBlocks+reorgProtHeaderDelay].Header // must exist as lower old pivot is uncommitted

				d.pivotLock.Lock()
				d.pivotHeader = pivot
				d.pivotLock.Unlock()
1832 1833 1834

				// Write out the pivot into the database so a rollback beyond it will
				// reenable fast sync
1835
				rawdb.WriteLastPivotNumber(d.stateDB, pivot.Number.Uint64())
1836 1837
			}
		}
1838
		P, beforeP, afterP := splitAroundPivot(pivot.Number.Uint64(), results)
1839
		if err := d.commitFastSyncData(beforeP, sync); err != nil {
1840 1841 1842
			return err
		}
		if P != nil {
1843 1844
			// If new pivot block found, cancel old state retrieval and restart
			if oldPivot != P {
1845 1846
				sync.Cancel()
				sync = d.syncState(P.Header.Root)
1847

1848
				go closeOnErr(sync)
1849 1850 1851 1852
				oldPivot = P
			}
			// Wait for completion, occasionally checking for pivot staleness
			select {
1853 1854 1855
			case <-sync.done:
				if sync.err != nil {
					return sync.err
1856 1857 1858 1859 1860 1861 1862 1863 1864
				}
				if err := d.commitPivotBlock(P); err != nil {
					return err
				}
				oldPivot = nil

			case <-time.After(time.Second):
				oldTail = afterP
				continue
1865
			}
1866
		}
1867
		// Fast sync done, pivot commit done, full import
1868 1869 1870 1871 1872 1873 1874
		if err := d.importBlockResults(afterP); err != nil {
			return err
		}
	}
}

func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
1875 1876 1877 1878 1879 1880 1881 1882
	if len(results) == 0 {
		return nil, nil, nil
	}
	if lastNum := results[len(results)-1].Header.Number.Uint64(); lastNum < pivot {
		// the pivot is somewhere in the future
		return nil, results, nil
	}
	// This can also be optimized, but only happens very seldom
1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897
	for _, result := range results {
		num := result.Header.Number.Uint64()
		switch {
		case num < pivot:
			before = append(before, result)
		case num == pivot:
			p = result
		default:
			after = append(after, result)
		}
	}
	return p, before, after
}

func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
1898 1899 1900 1901 1902 1903 1904 1905 1906 1907
	// Check for any early termination requests
	if len(results) == 0 {
		return nil
	}
	select {
	case <-d.quitCh:
		return errCancelContentProcessing
	case <-stateSync.done:
		if err := stateSync.Wait(); err != nil {
			return err
1908
		}
1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922
	default:
	}
	// Retrieve the a batch of results to import
	first, last := results[0].Header, results[len(results)-1].Header
	log.Debug("Inserting fast-sync blocks", "items", len(results),
		"firstnum", first.Number, "firsthash", first.Hash(),
		"lastnumn", last.Number, "lasthash", last.Hash(),
	)
	blocks := make([]*types.Block, len(results))
	receipts := make([]types.Receipts, len(results))
	for i, result := range results {
		blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
		receipts[i] = result.Receipts
	}
1923
	if index, err := d.blockchain.InsertReceiptChain(blocks, receipts, d.ancientLimit); err != nil {
1924
		log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
1925
		return fmt.Errorf("%w: %v", errInvalidChain, err)
1926 1927 1928 1929 1930
	}
	return nil
}

func (d *Downloader) commitPivotBlock(result *fetchResult) error {
1931 1932
	block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
	log.Debug("Committing fast sync pivot as new head", "number", block.Number(), "hash", block.Hash())
1933 1934

	// Commit the pivot block as the new head, will require full sync from here on
1935
	if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}, d.ancientLimit); err != nil {
1936 1937
		return err
	}
1938
	if err := d.blockchain.FastSyncCommitHead(block.Hash()); err != nil {
1939
		return err
1940
	}
1941
	atomic.StoreInt32(&d.committed, 1)
1942 1943 1944 1945 1946 1947 1948 1949 1950

	// If we had a bloom filter for the state sync, deallocate it now. Note, we only
	// deallocate internally, but keep the empty wrapper. This ensures that if we do
	// a rollback after committing the pivot and restarting fast sync, we don't end
	// up using a nil bloom. Empty bloom is fine, it just returns that it does not
	// have the info we need, so reach down to the database instead.
	if d.stateBloom != nil {
		d.stateBloom.Close()
	}
1951
	return nil
1952 1953
}

L
Leif Jurvetson 已提交
1954
// DeliverHeaders injects a new batch of block headers received from a remote
1955
// node into the download schedule.
1956 1957
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) error {
	return d.deliver(d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
1958 1959 1960
}

// DeliverBodies injects a new batch of block bodies received from a remote node.
1961 1962
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) error {
	return d.deliver(d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
1963 1964 1965
}

// DeliverReceipts injects a new batch of receipts received from a remote node.
1966 1967
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) error {
	return d.deliver(d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
1968 1969 1970
}

// DeliverNodeData injects a new batch of node state data received from a remote node.
1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998
func (d *Downloader) DeliverNodeData(id string, data [][]byte) error {
	return d.deliver(d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter)
}

// DeliverSnapPacket is invoked from a peer's message handler when it transmits a
// data packet for the local node to consume.
func (d *Downloader) DeliverSnapPacket(peer *snap.Peer, packet snap.Packet) error {
	switch packet := packet.(type) {
	case *snap.AccountRangePacket:
		hashes, accounts, err := packet.Unpack()
		if err != nil {
			return err
		}
		return d.SnapSyncer.OnAccounts(peer, packet.ID, hashes, accounts, packet.Proof)

	case *snap.StorageRangesPacket:
		hashset, slotset := packet.Unpack()
		return d.SnapSyncer.OnStorage(peer, packet.ID, hashset, slotset, packet.Proof)

	case *snap.ByteCodesPacket:
		return d.SnapSyncer.OnByteCodes(peer, packet.ID, packet.Codes)

	case *snap.TrieNodesPacket:
		return d.SnapSyncer.OnTrieNodes(peer, packet.ID, packet.Nodes)

	default:
		return fmt.Errorf("unexpected snap packet type: %T", packet)
	}
1999 2000 2001
}

// deliver injects a new batch of data received from a remote node.
2002
func (d *Downloader) deliver(destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
2003
	// Update the delivery metrics for both good and failed deliveries
2004
	inMeter.Mark(int64(packet.Items()))
2005 2006
	defer func() {
		if err != nil {
2007
			dropMeter.Mark(int64(packet.Items()))
2008 2009 2010 2011 2012 2013
		}
	}()
	// Deliver or abort if the sync is canceled while queuing
	d.cancelLock.RLock()
	cancel := d.cancelCh
	d.cancelLock.RUnlock()
2014 2015 2016
	if cancel == nil {
		return errNoSyncActive
	}
2017
	select {
2018
	case destCh <- packet:
2019 2020 2021 2022
		return nil
	case <-cancel:
		return errNoSyncActive
	}
2023
}
2024 2025 2026 2027 2028 2029

// qosTuner is the quality of service tuning loop that occasionally gathers the
// peer latency statistics and updates the estimated request round trip time.
func (d *Downloader) qosTuner() {
	for {
		// Retrieve the current median RTT and integrate into the previoust target RTT
2030
		rtt := time.Duration((1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
2031 2032 2033 2034 2035 2036 2037 2038
		atomic.StoreUint64(&d.rttEstimate, uint64(rtt))

		// A new RTT cycle passed, increase our confidence in the estimated RTT
		conf := atomic.LoadUint64(&d.rttConfidence)
		conf = conf + (1000000-conf)/2
		atomic.StoreUint64(&d.rttConfidence, conf)

		// Log the new QoS values and sleep until the next RTT
2039
		log.Debug("Recalculated downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052
		select {
		case <-d.quitCh:
			return
		case <-time.After(rtt):
		}
	}
}

// qosReduceConfidence is meant to be called when a new peer joins the downloader's
// peer set, needing to reduce the confidence we have in out QoS estimates.
func (d *Downloader) qosReduceConfidence() {
	// If we have a single peer, confidence is always 1
	peers := uint64(d.peers.Len())
2053 2054 2055 2056
	if peers == 0 {
		// Ensure peer connectivity races don't catch us off guard
		return
	}
2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072
	if peers == 1 {
		atomic.StoreUint64(&d.rttConfidence, 1000000)
		return
	}
	// If we have a ton of peers, don't drop confidence)
	if peers >= uint64(qosConfidenceCap) {
		return
	}
	// Otherwise drop the confidence factor
	conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
	if float64(conf)/1000000 < rttMinConfidence {
		conf = uint64(rttMinConfidence * 1000000)
	}
	atomic.StoreUint64(&d.rttConfidence, conf)

	rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
2073
	log.Debug("Relaxed downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
2074 2075 2076 2077 2078 2079 2080
}

// requestRTT returns the current target round trip time for a download request
// to complete in.
//
// Note, the returned RTT is .9 of the actually estimated RTT. The reason is that
// the downloader tries to adapt queries to the RTT, so multiple RTT values can
Y
Yusup 已提交
2081
// be adapted to, but smaller ones are preferred (stabler download stream).
2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098
func (d *Downloader) requestRTT() time.Duration {
	return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10
}

// requestTTL returns the timeout allowance within which a single download
// request has to finish: the estimated RTT divided by the confidence in it,
// scaled by ttlScaling and capped at ttlLimit.
func (d *Downloader) requestTTL() time.Duration {
	estimate := time.Duration(atomic.LoadUint64(&d.rttEstimate))
	confidence := float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0

	if ttl := time.Duration(ttlScaling) * time.Duration(float64(estimate)/confidence); ttl <= ttlLimit {
		return ttl
	}
	return ttlLimit
}