// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package downloader contains the manual full chain synchronisation.
package downloader

import (
	"errors"
	"fmt"
	"math/big"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
	"github.com/ethereum/go-ethereum/params"
	"github.com/ethereum/go-ethereum/trie"
)

40
var (
41 42 43
	MaxHashFetch    = 512 // Amount of hashes to be fetched per retrieval request
	MaxBlockFetch   = 128 // Amount of blocks to be fetched per retrieval request
	MaxHeaderFetch  = 192 // Amount of block headers to be fetched per retrieval request
44
	MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly
45
	MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
46
	MaxStateFetch   = 384 // Amount of node state values to allow fetching per request
47

48 49 50 51 52
	rttMinEstimate   = 2 * time.Second  // Minimum round-trip time to target for download requests
	rttMaxEstimate   = 20 * time.Second // Maximum round-trip time to target for download requests
	rttMinConfidence = 0.1              // Worse confidence factor in our estimated RTT value
	ttlScaling       = 3                // Constant scaling factor for RTT -> TTL conversion
	ttlLimit         = time.Minute      // Maximum TTL allowance to prevent reaching crazy timeouts
53 54 55 56

	qosTuningPeers   = 5    // Number of peers to tune based on (best peers)
	qosConfidenceCap = 10   // Number of peers above which not to modify RTT confidence
	qosTuningImpact  = 0.25 // Impact that a new tuning target has on the previous value
57

58 59 60 61 62
	maxQueuedHeaders            = 32 * 1024                         // [eth/62] Maximum number of headers to queue for import (DOS protection)
	maxHeadersProcess           = 2048                              // Number of header download results to import at once into the chain
	maxResultsProcess           = 2048                              // Number of content download results to import at once into the chain
	fullMaxForkAncestry  uint64 = params.FullImmutabilityThreshold  // Maximum chain reorganisation (locally redeclared so tests can reduce it)
	lightMaxForkAncestry uint64 = params.LightImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it)
63

64 65 66
	reorgProtThreshold   = 48 // Threshold number of recent blocks to disable mini reorg protection
	reorgProtHeaderDelay = 2  // Number of headers to delay delivering to cover mini reorgs

67 68 69 70 71
	fsHeaderCheckFrequency = 100             // Verification frequency of the downloaded headers during fast sync
	fsHeaderSafetyNet      = 2048            // Number of headers to discard in case a chain violation is detected
	fsHeaderForceVerify    = 24              // Number of headers to verify before and after the pivot to accept it
	fsHeaderContCheck      = 3 * time.Second // Time interval to check for header continuations during state download
	fsMinFullBlocks        = 64              // Number of blocks to retrieve fully even in fast sync
72
)
73

// Sentinel errors returned by the downloader. Synchronise matches several of
// them with errors.Is to decide whether the remote peer should be dropped.
var (
	errBusy                    = errors.New("busy")
	errUnknownPeer             = errors.New("peer is unknown or unhealthy")
	errBadPeer                 = errors.New("action from bad peer ignored")
	errStallingPeer            = errors.New("peer is stalling")
	errUnsyncedPeer            = errors.New("unsynced peer")
	errNoPeers                 = errors.New("no peers to keep download active")
	errTimeout                 = errors.New("timeout")
	errEmptyHeaderSet          = errors.New("empty header set by peer")
	errPeersUnavailable        = errors.New("no peers available or all tried for download")
	errInvalidAncestor         = errors.New("retrieved ancestor is invalid")
	errInvalidChain            = errors.New("retrieved hash chain is invalid")
	errInvalidBody             = errors.New("retrieved block body is invalid")
	errInvalidReceipt          = errors.New("retrieved receipt is invalid")
	errCancelStateFetch        = errors.New("state data download canceled (requested)")
	errCancelContentProcessing = errors.New("content processing canceled (requested)")
	errCanceled                = errors.New("syncing canceled (requested)")
	errNoSyncActive            = errors.New("no sync active")
	errTooOld                  = errors.New("peer doesn't speak recent enough protocol version (need version >= 63)")
)

95
type Downloader struct {
96 97 98 99 100 101 102
	// WARNING: The `rttEstimate` and `rttConfidence` fields are accessed atomically.
	// On 32 bit platforms, only 64-bit aligned fields can be atomic. The struct is
	// guaranteed to be so aligned, so take advantage of that. For more information,
	// see https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
	rttEstimate   uint64 // Round trip time to target for download requests
	rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)

103
	mode uint32         // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
104
	mux  *event.TypeMux // Event multiplexer to announce sync operation events
105

106 107 108 109
	checkpoint uint64   // Checkpoint block number to enforce head against (e.g. fast sync)
	genesis    uint64   // Genesis block number to limit sync to (e.g. light client CHT)
	queue      *queue   // Scheduler for selecting the hashes to download
	peers      *peerSet // Set of active peers from which download can proceed
110 111

	stateDB    ethdb.Database  // Database to state sync into (and deduplicate via)
112
	stateBloom *trie.SyncBloom // Bloom filter for fast trie node and contract code existence checks
113

114
	// Statistics
115 116 117
	syncStatsChainOrigin uint64 // Origin block number where syncing started at
	syncStatsChainHeight uint64 // Highest block number known when syncing started
	syncStatsState       stateSyncStats
118
	syncStatsLock        sync.RWMutex // Lock protecting the sync stats fields
119

120
	lightchain LightChain
121
	blockchain BlockChain
122

123
	// Callbacks
124
	dropPeer peerDropFn // Drops a peer for misbehaving
125

126
	// Status
127 128 129
	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
	synchronising   int32
	notified        int32
130
	committed       int32
131
	ancientLimit    uint64 // The maximum block number which can be regarded as ancient data.
132 133

	// Channels
134 135 136 137 138 139
	headerCh      chan dataPack        // [eth/62] Channel receiving inbound block headers
	bodyCh        chan dataPack        // [eth/62] Channel receiving inbound block bodies
	receiptCh     chan dataPack        // [eth/63] Channel receiving inbound receipts
	bodyWakeCh    chan bool            // [eth/62] Channel to signal the block body fetcher of new tasks
	receiptWakeCh chan bool            // [eth/63] Channel to signal the receipt fetcher of new tasks
	headerProcCh  chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
140

141 142 143 144 145
	// for stateFetcher
	stateSyncStart chan *stateSync
	trackStateReq  chan *stateReq
	stateCh        chan dataPack // [eth/63] Channel receiving inbound node state data

146
	// Cancellation and termination
147 148 149 150
	cancelPeer string         // Identifier of the peer currently being used as the master (cancel on drop)
	cancelCh   chan struct{}  // Channel to cancel mid-flight syncs
	cancelLock sync.RWMutex   // Lock to protect the cancel channel and peer in delivers
	cancelWg   sync.WaitGroup // Make sure all fetcher goroutines have exited.
151

152 153 154
	quitCh   chan struct{} // Quit channel to signal termination
	quitLock sync.RWMutex  // Lock to prevent double closes

155
	// Testing hooks
156 157 158 159
	syncInitHook     func(uint64, uint64)  // Method to call upon initiating a new sync run
	bodyFetchHook    func([]*types.Header) // Method to call upon starting a block body fetch
	receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
	chainInsertHook  func([]*fetchResult)  // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
160 161
}

N
Nick Johnson 已提交
162
// LightChain encapsulates functions required to synchronise a light chain.
163 164
type LightChain interface {
	// HasHeader verifies a header's presence in the local chain.
165
	HasHeader(common.Hash, uint64) bool
166 167 168 169 170 171 172

	// GetHeaderByHash retrieves a header from the local chain.
	GetHeaderByHash(common.Hash) *types.Header

	// CurrentHeader retrieves the head header from the local chain.
	CurrentHeader() *types.Header

173 174
	// GetTd returns the total difficulty of a local block.
	GetTd(common.Hash, uint64) *big.Int
175 176 177 178

	// InsertHeaderChain inserts a batch of headers into the local chain.
	InsertHeaderChain([]*types.Header, int) (int, error)

179 180
	// SetHead rewinds the local chain to a new head.
	SetHead(uint64) error
181 182
}

N
Nick Johnson 已提交
183
// BlockChain encapsulates functions required to sync a (full or fast) blockchain.
184 185 186
type BlockChain interface {
	LightChain

187 188
	// HasBlock verifies a block's presence in the local chain.
	HasBlock(common.Hash, uint64) bool
189

190 191 192
	// HasFastBlock verifies a fast block's presence in the local chain.
	HasFastBlock(common.Hash, uint64) bool

193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
	// GetBlockByHash retrieves a block from the local chain.
	GetBlockByHash(common.Hash) *types.Block

	// CurrentBlock retrieves the head block from the local chain.
	CurrentBlock() *types.Block

	// CurrentFastBlock retrieves the head fast block from the local chain.
	CurrentFastBlock() *types.Block

	// FastSyncCommitHead directly commits the head block to a certain entity.
	FastSyncCommitHead(common.Hash) error

	// InsertChain inserts a batch of blocks into the local chain.
	InsertChain(types.Blocks) (int, error)

	// InsertReceiptChain inserts a batch of receipts into the local chain.
209
	InsertReceiptChain(types.Blocks, []types.Receipts, uint64) (int, error)
210 211
}

212
// New creates a new downloader to fetch hashes and blocks from remote peers.
213
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
214 215 216
	if lightchain == nil {
		lightchain = chain
	}
217
	dl := &Downloader{
218
		stateDB:        stateDb,
219
		stateBloom:     stateBloom,
220
		mux:            mux,
221
		checkpoint:     checkpoint,
222
		queue:          newQueue(blockCacheMaxItems, blockCacheInitialItems),
223 224 225
		peers:          newPeerSet(),
		rttEstimate:    uint64(rttMaxEstimate),
		rttConfidence:  uint64(1000000),
226
		blockchain:     chain,
227 228 229 230 231 232 233 234 235 236
		lightchain:     lightchain,
		dropPeer:       dropPeer,
		headerCh:       make(chan dataPack, 1),
		bodyCh:         make(chan dataPack, 1),
		receiptCh:      make(chan dataPack, 1),
		bodyWakeCh:     make(chan bool, 1),
		receiptWakeCh:  make(chan bool, 1),
		headerProcCh:   make(chan []*types.Header, 1),
		quitCh:         make(chan struct{}),
		stateCh:        make(chan dataPack),
237
		stateSyncStart: make(chan *stateSync),
238
		syncStatsState: stateSyncStats{
239
			processed: rawdb.ReadFastTrieProgress(stateDb),
240 241
		},
		trackStateReq: make(chan *stateReq),
242
	}
243
	go dl.qosTuner()
244
	go dl.stateFetcher()
245
	return dl
246 247
}

248 249 250
// Progress retrieves the synchronisation boundaries, specifically the origin
// block where synchronisation started at (may have failed/suspended); the block
// or header sync is currently at; and the latest known block which the sync targets.
251
//
L
Leif Jurvetson 已提交
252
// In addition, during the state download phase of fast synchronisation the number
253 254
// of processed and the total number of known states are also returned. Otherwise
// these are zero.
255
func (d *Downloader) Progress() ethereum.SyncProgress {
256
	// Lock the current stats and return the progress
257 258
	d.syncStatsLock.RLock()
	defer d.syncStatsLock.RUnlock()
259

260
	current := uint64(0)
261
	mode := d.getMode()
262
	switch {
263
	case d.blockchain != nil && mode == FullSync:
264
		current = d.blockchain.CurrentBlock().NumberU64()
265
	case d.blockchain != nil && mode == FastSync:
266
		current = d.blockchain.CurrentFastBlock().NumberU64()
267
	case d.lightchain != nil:
268
		current = d.lightchain.CurrentHeader().Number.Uint64()
269
	default:
270
		log.Error("Unknown downloader chain/mode combo", "light", d.lightchain != nil, "full", d.blockchain != nil, "mode", mode)
271
	}
272 273 274 275
	return ethereum.SyncProgress{
		StartingBlock: d.syncStatsChainOrigin,
		CurrentBlock:  current,
		HighestBlock:  d.syncStatsChainHeight,
276 277
		PulledStates:  d.syncStatsState.processed,
		KnownStates:   d.syncStatsState.processed + d.syncStatsState.pending,
278
	}
O
obscuren 已提交
279 280
}

281
// Synchronising returns whether the downloader is currently retrieving blocks.
282
func (d *Downloader) Synchronising() bool {
283
	return atomic.LoadInt32(&d.synchronising) > 0
284 285
}

286 287 288 289 290 291 292 293 294
// SyncBloomContains tests if the syncbloom filter contains the given hash:
//   - false: the bloom definitely does not contain hash
//   - true:  the bloom maybe contains hash
//
// While the bloom is being initialized (or is closed), all queries will return true.
func (d *Downloader) SyncBloomContains(hash []byte) bool {
	return d.stateBloom == nil || d.stateBloom.Contains(hash)
}

295 296
// RegisterPeer injects a new download peer into the set of block source to be
// used for fetching hashes and blocks from.
297
func (d *Downloader) RegisterPeer(id string, version int, peer Peer) error {
298 299
	logger := log.New("peer", id)
	logger.Trace("Registering sync peer")
300
	if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil {
301
		logger.Error("Failed to register sync peer", "err", err)
302 303
		return err
	}
304 305
	d.qosReduceConfidence()

306 307 308
	return nil
}

N
Nick Johnson 已提交
309
// RegisterLightPeer injects a light client peer, wrapping it so it appears as a regular peer.
310 311 312 313
func (d *Downloader) RegisterLightPeer(id string, version int, peer LightPeer) error {
	return d.RegisterPeer(id, version, &lightPeerWrapper{peer})
}

314
// UnregisterPeer remove a peer from the known list, preventing any action from
315 316
// the specified peer. An effort is also made to return any pending fetches into
// the queue.
317
func (d *Downloader) UnregisterPeer(id string) error {
318
	// Unregister the peer from the active peer set and revoke any fetch tasks
319 320
	logger := log.New("peer", id)
	logger.Trace("Unregistering sync peer")
321
	if err := d.peers.Unregister(id); err != nil {
322
		logger.Error("Failed to unregister sync peer", "err", err)
323 324
		return err
	}
325
	d.queue.Revoke(id)
326

327
	return nil
328 329
}

330 331
// Synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
332 333
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
	err := d.synchronise(id, head, td, mode)
334

335
	switch err {
336 337 338
	case nil, errBusy, errCanceled:
		return err
	}
339

340 341 342
	if errors.Is(err, errInvalidChain) || errors.Is(err, errBadPeer) || errors.Is(err, errTimeout) ||
		errors.Is(err, errStallingPeer) || errors.Is(err, errUnsyncedPeer) || errors.Is(err, errEmptyHeaderSet) ||
		errors.Is(err, errPeersUnavailable) || errors.Is(err, errTooOld) || errors.Is(err, errInvalidAncestor) {
343 344 345 346 347 348 349 350 351 352
		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
		if d.dropPeer == nil {
			// The dropPeer method is nil when `--copydb` is used for a local copy.
			// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
			log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
		} else {
			d.dropPeer(id)
		}
		return err
	}
353
	log.Warn("Synchronisation failed, retrying", "err", err)
354
	return err
355 356 357
}

// synchronise will select the peer and use it for synchronising. If an empty string is given
358
// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
359
// checks fail an error will be returned. This method is synchronous
360
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
L
Leif Jurvetson 已提交
361
	// Mock out the synchronisation if testing
362 363 364
	if d.synchroniseMock != nil {
		return d.synchroniseMock(id, hash)
	}
365
	// Make sure only one goroutine is ever allowed past this point at once
366
	if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
367
		return errBusy
368
	}
369
	defer atomic.StoreInt32(&d.synchronising, 0)
370

371 372
	// Post a user notification of the sync (only once per session)
	if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
373
		log.Info("Block synchronisation started")
374
	}
375
	// If we are already full syncing, but have a fast-sync bloom filter laying
376
	// around, make sure it doesn't use memory any more. This is a special case
377 378 379 380
	// when the user attempts to fast sync a new empty network.
	if mode == FullSync && d.stateBloom != nil {
		d.stateBloom.Close()
	}
381
	// Reset the queue, peer set and wake channels to clean any internal leftover state
382
	d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems)
383
	d.peers.Reset()
384

385
	for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
386 387 388 389
		select {
		case <-ch:
		default:
		}
390
	}
391
	for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} {
392 393 394 395 396 397 398 399
		for empty := false; !empty; {
			select {
			case <-ch:
			default:
				empty = true
			}
		}
	}
400 401 402 403 404 405 406
	for empty := false; !empty; {
		select {
		case <-d.headerProcCh:
		default:
			empty = true
		}
	}
407
	// Create cancel channel for aborting mid-flight and mark the master peer
408 409
	d.cancelLock.Lock()
	d.cancelCh = make(chan struct{})
410
	d.cancelPeer = id
411 412
	d.cancelLock.Unlock()

413
	defer d.Cancel() // No matter what, we can't leave the cancel channel open
414

415 416
	// Atomically set the requested sync mode
	atomic.StoreUint32(&d.mode, uint32(mode))
417

418
	// Retrieve the origin peer and initiate the downloading process
419
	p := d.peers.Peer(id)
420
	if p == nil {
421
		return errUnknownPeer
422
	}
423
	return d.syncWithPeer(p, hash, td)
424 425
}

426 427 428 429
func (d *Downloader) getMode() SyncMode {
	return SyncMode(atomic.LoadUint32(&d.mode))
}

430 431
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
432
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
O
obscuren 已提交
433
	d.mux.Post(StartEvent{})
434 435 436
	defer func() {
		// reset on error
		if err != nil {
437 438
			d.mux.Post(FailedEvent{err})
		} else {
439 440
			latest := d.lightchain.CurrentHeader()
			d.mux.Post(DoneEvent{latest})
441 442
		}
	}()
R
rene 已提交
443
	if p.version < 63 {
444 445
		return errTooOld
	}
446
	mode := d.getMode()
447

448
	log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode)
449
	defer func(start time.Time) {
450
		log.Debug("Synchronisation terminated", "elapsed", common.PrettyDuration(time.Since(start)))
451
	}(time.Now())
452

453 454 455 456 457 458
	// Look up the sync boundaries: the common ancestor and the target block
	latest, err := d.fetchHeight(p)
	if err != nil {
		return err
	}
	height := latest.Number.Uint64()
459

460
	origin, err := d.findAncestor(p, latest)
461 462 463 464 465 466 467 468 469
	if err != nil {
		return err
	}
	d.syncStatsLock.Lock()
	if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
		d.syncStatsChainOrigin = origin
	}
	d.syncStatsChainHeight = height
	d.syncStatsLock.Unlock()
470

471
	// Ensure our origin point is below any fast sync pivot point
472
	pivot := uint64(0)
473
	if mode == FastSync {
474 475
		if height <= uint64(fsMinFullBlocks) {
			origin = 0
476
		} else {
477 478
			pivot = height - uint64(fsMinFullBlocks)
			if pivot <= origin {
479 480
				origin = pivot - 1
			}
481 482 483
			// Write out the pivot into the database so a rollback beyond it will
			// reenable fast sync
			rawdb.WriteLastPivotNumber(d.stateDB, pivot)
484
		}
485
	}
486
	d.committed = 1
487
	if mode == FastSync && pivot != 0 {
488 489
		d.committed = 0
	}
490
	if mode == FastSync {
491
		// Set the ancient data limitation.
492 493 494
		// If we are running fast sync, all block data older than ancientLimit will be
		// written to the ancient store. More recent data will be written to the active
		// database and will wait for the freezer to migrate.
495
		//
496 497 498
		// If there is a checkpoint available, then calculate the ancientLimit through
		// that. Otherwise calculate the ancient limit through the advertised height
		// of the remote peer.
499
		//
500 501 502 503 504
		// The reason for picking checkpoint first is that a malicious peer can give us
		// a fake (very high) height, forcing the ancient limit to also be very high.
		// The peer would start to feed us valid blocks until head, resulting in all of
		// the blocks might be written into the ancient store. A following mini-reorg
		// could cause issues.
505
		if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 {
506
			d.ancientLimit = d.checkpoint
507 508
		} else if height > fullMaxForkAncestry+1 {
			d.ancientLimit = height - fullMaxForkAncestry - 1
509 510
		}
		frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.
511

512 513 514 515 516 517 518 519 520 521
		// If a part of blockchain data has already been written into active store,
		// disable the ancient style insertion explicitly.
		if origin >= frozen && frozen != 0 {
			d.ancientLimit = 0
			log.Info("Disabling direct-ancient mode", "origin", origin, "ancient", frozen-1)
		} else if d.ancientLimit > 0 {
			log.Debug("Enabling direct-ancient mode", "ancient", d.ancientLimit)
		}
		// Rewind the ancient store and blockchain if reorg happens.
		if origin+1 < frozen {
522 523
			if err := d.lightchain.SetHead(origin + 1); err != nil {
				return err
524 525 526
			}
		}
	}
527
	// Initiate the sync using a concurrent header and content retrieval algorithm
528
	d.queue.Prepare(origin+1, mode)
529 530
	if d.syncInitHook != nil {
		d.syncInitHook(origin, height)
531
	}
532
	fetchers := []func() error{
533 534 535 536
		func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved
		func() error { return d.fetchBodies(origin + 1) },          // Bodies are retrieved during normal and fast sync
		func() error { return d.fetchReceipts(origin + 1) },        // Receipts are retrieved during fast sync
		func() error { return d.processHeaders(origin+1, pivot, td) },
537
	}
538
	if mode == FastSync {
539
		fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
540
	} else if mode == FullSync {
541 542
		fetchers = append(fetchers, d.processFullSyncContent)
	}
543
	return d.spawnSync(fetchers)
544 545 546 547
}

// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
548 549
func (d *Downloader) spawnSync(fetchers []func() error) error {
	errc := make(chan error, len(fetchers))
550
	d.cancelWg.Add(len(fetchers))
551 552
	for _, fn := range fetchers {
		fn := fn
553
		go func() { defer d.cancelWg.Done(); errc <- fn() }()
554 555 556
	}
	// Wait for the first error, then terminate the others.
	var err error
557 558
	for i := 0; i < len(fetchers); i++ {
		if i == len(fetchers)-1 {
559 560 561 562 563
			// Close the queue when all fetchers have exited.
			// This will cause the block processor to end when
			// it has processed the queue.
			d.queue.Close()
		}
564
		if err = <-errc; err != nil && err != errCanceled {
565 566 567 568
			break
		}
	}
	d.queue.Close()
569
	d.Cancel()
570
	return err
571 572
}

573 574 575 576
// cancel aborts all of the operations and resets the queue. However, cancel does
// not wait for the running download goroutines to finish. This method should be
// used when cancelling the downloads from inside the downloader.
func (d *Downloader) cancel() {
577
	// Close the current cancel channel
578
	d.cancelLock.Lock()
579 580
	defer d.cancelLock.Unlock()

581 582 583 584 585 586 587
	if d.cancelCh != nil {
		select {
		case <-d.cancelCh:
			// Channel was already closed
		default:
			close(d.cancelCh)
		}
588
	}
589 590 591 592 593 594
}

// Cancel aborts all of the operations and waits for all download goroutines to
// finish before returning.
func (d *Downloader) Cancel() {
	d.cancel()
595
	d.cancelWg.Wait()
596 597 598

	d.ancientLimit = 0
	log.Debug("Reset ancient limit to zero")
599 600
}

601
// Terminate interrupts the downloader, canceling all pending operations.
602
// The downloader cannot be reused after calling Terminate.
603
func (d *Downloader) Terminate() {
604 605 606 607 608 609 610
	// Close the termination channel (make sure double close is allowed)
	d.quitLock.Lock()
	select {
	case <-d.quitCh:
	default:
		close(d.quitCh)
	}
611 612 613
	if d.stateBloom != nil {
		d.stateBloom.Close()
	}
614 615 616
	d.quitLock.Unlock()

	// Cancel any pending download requests
617
	d.Cancel()
618 619
}

620 621
// fetchHeight retrieves the head header of the remote peer to aid in estimating
// the total time a pending synchronisation would take.
622
func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
P
Péter Szilágyi 已提交
623
	p.log.Debug("Retrieving remote chain height")
624 625

	// Request the advertised remote head block and wait for the response
626 627
	head, _ := p.peer.Head()
	go p.peer.RequestHeadersByHash(head, 1, 0, false)
628

629 630
	ttl := d.requestTTL()
	timeout := time.After(ttl)
631
	mode := d.getMode()
632 633 634
	for {
		select {
		case <-d.cancelCh:
635
			return nil, errCanceled
636

637
		case packet := <-d.headerCh:
638
			// Discard anything not from the origin peer
639
			if packet.PeerId() != p.id {
640
				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
641 642 643
				break
			}
			// Make sure the peer actually gave something valid
644
			headers := packet.(*headerPack).headers
645
			if len(headers) != 1 {
646
				p.log.Warn("Multiple headers for single request", "headers", len(headers))
647
				return nil, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
648
			}
649
			head := headers[0]
650
			if (mode == FastSync || mode == LightSync) && head.Number.Uint64() < d.checkpoint {
651 652 653
				p.log.Warn("Remote head below checkpoint", "number", head.Number, "hash", head.Hash())
				return nil, errUnsyncedPeer
			}
P
Péter Szilágyi 已提交
654
			p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
655
			return head, nil
656

657
		case <-timeout:
P
Péter Szilágyi 已提交
658
			p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
659
			return nil, errTimeout
660

661
		case <-d.bodyCh:
662 663
		case <-d.receiptCh:
			// Out of bounds delivery, ignore
664 665 666 667
		}
	}
}

668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690
// calculateRequestSpan calculates what headers to request from a peer when trying to determine the
// common ancestor.
// It returns parameters to be used for peer.RequestHeadersByNumber:
//  from - starting block number
//  count - number of headers to request
//  skip - number of headers to skip
// and also returns 'max', the last block which is expected to be returned by the remote peers,
// given the (from,count,skip)
func calculateRequestSpan(remoteHeight, localHeight uint64) (int64, int, int, uint64) {
	var (
		from     int
		count    int
		MaxCount = MaxHeaderFetch / 16
	)
	// requestHead is the highest block that we will ask for. If requestHead is not offset,
	// the highest block that we will get is 16 blocks back from head, which means we
	// will fetch 14 or 15 blocks unnecessarily in the case the height difference
	// between us and the peer is 1-2 blocks, which is most common
	requestHead := int(remoteHeight) - 1
	if requestHead < 0 {
		requestHead = 0
	}
	// requestBottom is the lowest block we want included in the query
691
	// Ideally, we want to include the one just below our own head
692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719
	requestBottom := int(localHeight - 1)
	if requestBottom < 0 {
		requestBottom = 0
	}
	totalSpan := requestHead - requestBottom
	span := 1 + totalSpan/MaxCount
	if span < 2 {
		span = 2
	}
	if span > 16 {
		span = 16
	}

	count = 1 + totalSpan/span
	if count > MaxCount {
		count = MaxCount
	}
	if count < 2 {
		count = 2
	}
	from = requestHead - (count-1)*span
	if from < 0 {
		from = 0
	}
	max := from + (count-1)*span
	return int64(from), count, span - 1, uint64(max)
}

720
// findAncestor tries to locate the common ancestor link of the local chain and
721
// a remote peers blockchain. In the general case when our node was in sync and
722
// on the correct chain, checking the top N links should already get us a match.
723
// In the rare scenario when we ended up on a long reorganisation (i.e. none of
724
// the head links match), we do a binary search to find the common ancestor.
725
func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) (uint64, error) {
726
	// Figure out the valid ancestor range to prevent rewrite attacks
727 728 729 730 731
	var (
		floor        = int64(-1)
		localHeight  uint64
		remoteHeight = remoteHeader.Number.Uint64()
	)
732 733
	mode := d.getMode()
	switch mode {
734 735 736 737 738 739
	case FullSync:
		localHeight = d.blockchain.CurrentBlock().NumberU64()
	case FastSync:
		localHeight = d.blockchain.CurrentFastBlock().NumberU64()
	default:
		localHeight = d.lightchain.CurrentHeader().Number.Uint64()
740
	}
741
	p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight)
742 743

	// Recap floor value for binary search
744 745 746 747
	maxForkAncestry := fullMaxForkAncestry
	if d.getMode() == LightSync {
		maxForkAncestry = lightMaxForkAncestry
	}
748
	if localHeight >= maxForkAncestry {
749
		// We're above the max reorg threshold, find the earliest fork point
750
		floor = int64(localHeight - maxForkAncestry)
751 752 753
	}
	// If we're doing a light sync, ensure the floor doesn't go below the CHT, as
	// all headers before that point will be missing.
754
	if mode == LightSync {
755
		// If we don't know the current CHT position, find it
756 757 758 759 760 761
		if d.genesis == 0 {
			header := d.lightchain.CurrentHeader()
			for header != nil {
				d.genesis = header.Number.Uint64()
				if floor >= int64(d.genesis)-1 {
					break
762
				}
763
				header = d.lightchain.GetHeaderByHash(header.ParentHash)
764
			}
765 766 767 768
		}
		// We already know the "genesis" block number, cap floor to that
		if floor < int64(d.genesis)-1 {
			floor = int64(d.genesis) - 1
769
		}
770
	}
771

772
	from, count, skip, max := calculateRequestSpan(remoteHeight, localHeight)
773 774

	p.log.Trace("Span searching for common ancestor", "count", count, "from", from, "skip", skip)
775
	go p.peer.RequestHeadersByNumber(uint64(from), count, skip, false)
776 777 778

	// Wait for the remote response to the head fetch
	number, hash := uint64(0), common.Hash{}
779 780 781

	ttl := d.requestTTL()
	timeout := time.After(ttl)
782 783 784 785

	for finished := false; !finished; {
		select {
		case <-d.cancelCh:
786
			return 0, errCanceled
787

788
		case packet := <-d.headerCh:
789
			// Discard anything not from the origin peer
790
			if packet.PeerId() != p.id {
791
				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
792 793 794
				break
			}
			// Make sure the peer actually gave something valid
795
			headers := packet.(*headerPack).headers
796
			if len(headers) == 0 {
P
Péter Szilágyi 已提交
797
				p.log.Warn("Empty head header set")
798 799
				return 0, errEmptyHeaderSet
			}
800
			// Make sure the peer's reply conforms to the request
801
			for i, header := range headers {
802
				expectNumber := from + int64(i)*int64(skip+1)
803 804
				if number := header.Number.Int64(); number != expectNumber {
					p.log.Warn("Head headers broke chain ordering", "index", i, "requested", expectNumber, "received", number)
805
					return 0, fmt.Errorf("%w: %v", errInvalidChain, errors.New("head headers broke chain ordering"))
806 807
				}
			}
808 809 810
			// Check if a common ancestor was found
			finished = true
			for i := len(headers) - 1; i >= 0; i-- {
811
				// Skip any headers that underflow/overflow our requested set
812
				if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > max {
813 814 815
					continue
				}
				// Otherwise check if we already know the header or not
816 817
				h := headers[i].Hash()
				n := headers[i].Number.Uint64()
818 819

				var known bool
820
				switch mode {
821 822 823 824 825 826 827 828
				case FullSync:
					known = d.blockchain.HasBlock(h, n)
				case FastSync:
					known = d.blockchain.HasFastBlock(h, n)
				default:
					known = d.lightchain.HasHeader(h, n)
				}
				if known {
829
					number, hash = n, h
830 831 832 833
					break
				}
			}

834
		case <-timeout:
P
Péter Szilágyi 已提交
835
			p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
836 837
			return 0, errTimeout

838
		case <-d.bodyCh:
839 840
		case <-d.receiptCh:
			// Out of bounds delivery, ignore
841 842 843
		}
	}
	// If the head fetch already found an ancestor, return
844
	if hash != (common.Hash{}) {
845
		if int64(number) <= floor {
P
Péter Szilágyi 已提交
846
			p.log.Warn("Ancestor below allowance", "number", number, "hash", hash, "allowance", floor)
847 848
			return 0, errInvalidAncestor
		}
P
Péter Szilágyi 已提交
849
		p.log.Debug("Found common ancestor", "number", number, "hash", hash)
850 851 852
		return number, nil
	}
	// Ancestor not found, we need to binary search over our chain
853
	start, end := uint64(0), remoteHeight
854 855 856
	if floor > 0 {
		start = uint64(floor)
	}
857 858
	p.log.Trace("Binary searching for common ancestor", "start", start, "end", end)

859 860 861 862
	for start+1 < end {
		// Split our chain interval in two, and request the hash to cross check
		check := (start + end) / 2

863 864 865
		ttl := d.requestTTL()
		timeout := time.After(ttl)

866
		go p.peer.RequestHeadersByNumber(check, 1, 0, false)
867 868 869 870 871

		// Wait until a reply arrives to this request
		for arrived := false; !arrived; {
			select {
			case <-d.cancelCh:
872
				return 0, errCanceled
873

874
			case packer := <-d.headerCh:
875
				// Discard anything not from the origin peer
876
				if packer.PeerId() != p.id {
877
					log.Debug("Received headers from incorrect peer", "peer", packer.PeerId())
878 879 880
					break
				}
				// Make sure the peer actually gave something valid
881
				headers := packer.(*headerPack).headers
882
				if len(headers) != 1 {
883
					p.log.Warn("Multiple headers for single request", "headers", len(headers))
884
					return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
885 886 887 888
				}
				arrived = true

				// Modify the search interval based on the response
889 890
				h := headers[0].Hash()
				n := headers[0].Number.Uint64()
891 892

				var known bool
893
				switch mode {
894 895 896 897 898 899 900 901
				case FullSync:
					known = d.blockchain.HasBlock(h, n)
				case FastSync:
					known = d.blockchain.HasFastBlock(h, n)
				default:
					known = d.lightchain.HasHeader(h, n)
				}
				if !known {
902 903 904
					end = check
					break
				}
905
				header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
906
				if header.Number.Uint64() != check {
907
					p.log.Warn("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
908
					return 0, fmt.Errorf("%w: non-requested header (%d)", errBadPeer, header.Number)
909 910
				}
				start = check
911
				hash = h
912

913
			case <-timeout:
P
Péter Szilágyi 已提交
914
				p.log.Debug("Waiting for search header timed out", "elapsed", ttl)
915 916
				return 0, errTimeout

917
			case <-d.bodyCh:
918 919
			case <-d.receiptCh:
				// Out of bounds delivery, ignore
920 921 922
			}
		}
	}
923 924
	// Ensure valid ancestry and return
	if int64(start) <= floor {
P
Péter Szilágyi 已提交
925
		p.log.Warn("Ancestor below allowance", "number", start, "hash", hash, "allowance", floor)
926 927
		return 0, errInvalidAncestor
	}
P
Péter Szilágyi 已提交
928
	p.log.Debug("Found common ancestor", "number", start, "hash", hash)
929 930 931
	return start, nil
}

932 933 934 935 936
// fetchHeaders keeps retrieving headers concurrently from the number
// requested, until no more are returned, potentially throttling on the way. To
// facilitate concurrency but still protect against malicious nodes sending bad
// headers, we construct a header chain skeleton using the "origin" peer we are
// syncing with, and fill in the missing headers using anyone else. Headers from
937
// other peers are only accepted if they map cleanly to the skeleton. If no one
938 939
// can fill in the skeleton - not even the origin peer - it's assumed invalid and
// the origin is dropped.
940
func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) error {
P
Péter Szilágyi 已提交
941 942
	p.log.Debug("Directing header downloads", "origin", from)
	defer p.log.Debug("Header download terminated")
943

944 945 946
	// Create a timeout timer, and the associated header fetcher
	skeleton := true            // Skeleton assembly phase or finishing up
	request := time.Now()       // time of the last skeleton fetch request
947 948 949 950
	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
	<-timeout.C                 // timeout channel should be initially empty
	defer timeout.Stop()

951
	var ttl time.Duration
952
	getHeaders := func(from uint64) {
953
		request = time.Now()
954 955 956

		ttl = d.requestTTL()
		timeout.Reset(ttl)
957

958
		if skeleton {
P
Péter Szilágyi 已提交
959
			p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
960
			go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
961
		} else {
P
Péter Szilágyi 已提交
962
			p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
963
			go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
964
		}
965
	}
966
	// Start pulling the header chain skeleton until all is done
967
	ancestor := from
968 969
	getHeaders(from)

970
	mode := d.getMode()
971 972 973
	for {
		select {
		case <-d.cancelCh:
974
			return errCanceled
975

976
		case packet := <-d.headerCh:
977
			// Make sure the active peer is giving us the skeleton headers
978
			if packet.PeerId() != p.id {
979
				log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId())
980 981
				break
			}
982
			headerReqTimer.UpdateSince(request)
983 984
			timeout.Stop()

985 986 987 988 989 990
			// If the skeleton's finished, pull any remaining head headers directly from the origin
			if packet.Items() == 0 && skeleton {
				skeleton = false
				getHeaders(from)
				continue
			}
991
			// If no more headers are inbound, notify the content fetchers and return
992
			if packet.Items() == 0 {
993 994 995 996 997 998 999 1000
				// Don't abort header fetches while the pivot is downloading
				if atomic.LoadInt32(&d.committed) == 0 && pivot <= from {
					p.log.Debug("No headers, waiting for pivot commit")
					select {
					case <-time.After(fsHeaderContCheck):
						getHeaders(from)
						continue
					case <-d.cancelCh:
1001
						return errCanceled
1002 1003 1004
					}
				}
				// Pivot done (or not in fast sync) and no more headers, terminate the process
P
Péter Szilágyi 已提交
1005
				p.log.Debug("No more headers available")
1006 1007 1008 1009
				select {
				case d.headerProcCh <- nil:
					return nil
				case <-d.cancelCh:
1010
					return errCanceled
1011
				}
1012
			}
1013
			headers := packet.(*headerPack).headers
1014

1015 1016
			// If we received a skeleton batch, resolve internals concurrently
			if skeleton {
1017
				filled, proced, err := d.fillHeaderSkeleton(from, headers)
1018
				if err != nil {
P
Péter Szilágyi 已提交
1019
					p.log.Debug("Skeleton chain invalid", "err", err)
1020
					return fmt.Errorf("%w: %v", errInvalidChain, err)
1021
				}
1022 1023
				headers = filled[proced:]
				from += uint64(proced)
1024 1025 1026 1027 1028 1029
			} else {
				// If we're closing in on the chain head, but haven't yet reached it, delay
				// the last few headers so mini reorgs on the head don't cause invalid hash
				// chain errors.
				if n := len(headers); n > 0 {
					// Retrieve the current head we're at
1030
					var head uint64
1031
					if mode == LightSync {
1032 1033 1034 1035 1036 1037 1038
						head = d.lightchain.CurrentHeader().Number.Uint64()
					} else {
						head = d.blockchain.CurrentFastBlock().NumberU64()
						if full := d.blockchain.CurrentBlock().NumberU64(); head < full {
							head = full
						}
					}
1039 1040 1041 1042 1043 1044
					// If the head is below the common ancestor, we're actually deduplicating
					// already existing chain segments, so use the ancestor as the fake head.
					// Otherwise we might end up delaying header deliveries pointlessly.
					if head < ancestor {
						head = ancestor
					}
1045 1046 1047 1048 1049 1050 1051 1052 1053
					// If the head is way older than this batch, delay the last few headers
					if head+uint64(reorgProtThreshold) < headers[n-1].Number.Uint64() {
						delay := reorgProtHeaderDelay
						if delay > n {
							delay = n
						}
						headers = headers[:n-delay]
					}
				}
1054
			}
1055
			// Insert all the new headers and fetch the next batch
1056
			if len(headers) > 0 {
P
Péter Szilágyi 已提交
1057
				p.log.Trace("Scheduling new headers", "count", len(headers), "from", from)
1058 1059 1060
				select {
				case d.headerProcCh <- headers:
				case <-d.cancelCh:
1061
					return errCanceled
1062 1063
				}
				from += uint64(len(headers))
1064 1065 1066 1067 1068 1069 1070 1071 1072
				getHeaders(from)
			} else {
				// No headers delivered, or all of them being delayed, sleep a bit and retry
				p.log.Trace("All headers delayed, waiting")
				select {
				case <-time.After(fsHeaderContCheck):
					getHeaders(from)
					continue
				case <-d.cancelCh:
1073
					return errCanceled
1074
				}
1075
			}
1076 1077

		case <-timeout.C:
1078 1079 1080 1081 1082 1083
			if d.dropPeer == nil {
				// The dropPeer method is nil when `--copydb` is used for a local copy.
				// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
				p.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", p.id)
				break
			}
1084
			// Header retrieval timed out, consider the peer bad and drop
P
Péter Szilágyi 已提交
1085
			p.log.Debug("Header request timed out", "elapsed", ttl)
1086
			headerTimeoutMeter.Mark(1)
1087 1088 1089
			d.dropPeer(p.id)

			// Finish the sync gracefully instead of dumping the gathered data though
1090
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
1091 1092 1093 1094
				select {
				case ch <- false:
				case <-d.cancelCh:
				}
1095
			}
1096 1097 1098 1099
			select {
			case d.headerProcCh <- nil:
			case <-d.cancelCh:
			}
1100
			return fmt.Errorf("%w: header request timed out", errBadPeer)
1101 1102 1103 1104
		}
	}
}

1105 1106
// fillHeaderSkeleton concurrently retrieves headers from all our available peers
// and maps them to the provided skeleton header chain.
1107 1108 1109 1110 1111
//
// Any partial results from the beginning of the skeleton is (if possible) forwarded
// immediately to the header processor to keep the rest of the pipeline full even
// in the case of header stalls.
//
Y
Yusup 已提交
1112
// The method returns the entire filled skeleton and also the number of headers
1113 1114
// already forwarded for processing.
func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
1115
	log.Debug("Filling up skeleton", "from", from)
1116 1117 1118 1119 1120
	d.queue.ScheduleSkeleton(from, skeleton)

	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*headerPack)
1121
			return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh)
1122
		}
1123 1124 1125
		expire  = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
		reserve = func(p *peerConnection, count int) (*fetchRequest, bool, bool) {
			return d.queue.ReserveHeaders(p, count), false, false
1126
		}
1127 1128
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
		capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
1129 1130 1131
		setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) {
			p.SetHeadersIdle(accepted, deliveryTime)
		}
1132
	)
1133
	err := d.fetchParts(d.headerCh, deliver, d.queue.headerContCh, expire,
1134
		d.queue.PendingHeaders, d.queue.InFlightHeaders, reserve,
1135
		nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")
1136

1137
	log.Debug("Skeleton fill terminated", "err", err)
1138 1139 1140

	filled, proced := d.queue.RetrieveHeaders()
	return filled, proced, err
1141 1142
}

1143 1144 1145 1146
// fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchBodies(from uint64) error {
1147
	log.Debug("Downloading block bodies", "origin", from)
1148

1149
	var (
1150
		deliver = func(packet dataPack) (int, error) {
1151
			pack := packet.(*bodyPack)
1152
			return d.queue.DeliverBodies(pack.peerID, pack.transactions, pack.uncles)
1153
		}
1154
		expire   = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
1155 1156
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
		capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
1157
		setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetBodiesIdle(accepted, deliveryTime) }
1158
	)
1159
	err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
1160
		d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ReserveBodies,
1161
		d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")
1162

1163
	log.Debug("Block body download terminated", "err", err)
1164 1165 1166 1167 1168 1169 1170
	return err
}

// fetchReceipts iteratively downloads the scheduled block receipts, taking any
// available peers, reserving a chunk of receipts for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchReceipts(from uint64) error {
1171
	log.Debug("Downloading transaction receipts", "origin", from)
1172 1173

	var (
1174
		deliver = func(packet dataPack) (int, error) {
1175
			pack := packet.(*receiptPack)
1176
			return d.queue.DeliverReceipts(pack.peerID, pack.receipts)
1177
		}
1178
		expire   = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
1179 1180
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
		capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
1181 1182 1183
		setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) {
			p.SetReceiptsIdle(accepted, deliveryTime)
		}
1184
	)
1185
	err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire,
1186
		d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ReserveReceipts,
1187
		d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")
1188

1189
	log.Debug("Transaction receipt download terminated", "err", err)
1190 1191 1192 1193 1194 1195
	return err
}

// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213
//
// As the scheduling/timeout logic mostly is the same for all downloaded data
// types, this method is used by each for data gathering and is instrumented with
// various callbacks to handle the slight differences between processing them.
//
// The instrumentation parameters:
//  - errCancel:   error type to return if the fetch operation is cancelled (mostly makes logging nicer)
//  - deliveryCh:  channel from which to retrieve downloaded data packets (merged from all concurrent peers)
//  - deliver:     processing callback to deliver data packets into type specific download queues (usually within `queue`)
//  - wakeCh:      notification channel for waking the fetcher when new tasks are available (or sync completed)
//  - expire:      task callback method to abort requests that took too long and return the faulty peers (traffic shaping)
//  - pending:     task callback for the number of requests still needing download (detect completion/non-completability)
//  - inFlight:    task callback for the number of in-progress requests (wait for all active downloads to finish)
//  - throttle:    task callback to check if the processing queue is full and activate throttling (bound memory use)
//  - reserve:     task callback to reserve new download tasks to a particular peer (also signals partial completions)
//  - fetchHook:   tester callback to notify of new tasks being initiated (allows testing the scheduling logic)
//  - fetch:       network callback to actually send a particular download request to a physical remote peer
//  - cancel:      task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer)
P
Péter Szilágyi 已提交
1214
//  - capacity:    network callback to retrieve the estimated type-specific bandwidth capacity of a peer (traffic shaping)
1215 1216
//  - idle:        network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
//  - setIdle:     network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
1217
//  - kind:        textual label of the type being downloaded to display in log messages
1218
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
1219
	expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
1220
	fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
1221
	idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {
1222

1223
	// Create a ticker to detect expired retrieval tasks
1224 1225 1226 1227 1228
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	update := make(chan struct{}, 1)

1229
	// Prepare the queue and fetch block parts until the block header fetcher's done
1230 1231 1232 1233
	finished := false
	for {
		select {
		case <-d.cancelCh:
1234
			return errCanceled
1235

1236
		case packet := <-deliveryCh:
1237
			deliveryTime := time.Now()
1238 1239
			// If the peer was previously banned and failed to deliver its pack
			// in a reasonable time frame, ignore its message.
1240
			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
1241 1242
				// Deliver the received chunk of data and check chain validity
				accepted, err := deliver(packet)
1243
				if errors.Is(err, errInvalidChain) {
1244
					return err
1245 1246 1247 1248
				}
				// Unless a peer delivered something completely else than requested (usually
				// caused by a timed out request which came through in the end), set it to
				// idle. If the delivery's stale, the peer should have already been idled.
1249
				if !errors.Is(err, errStaleDelivery) {
1250
					setIdle(peer, accepted, deliveryTime)
1251 1252 1253 1254
				}
				// Issue a log to the user to see what's going on
				switch {
				case err == nil && packet.Items() == 0:
P
Péter Szilágyi 已提交
1255
					peer.log.Trace("Requested data not delivered", "type", kind)
1256
				case err == nil:
P
Péter Szilágyi 已提交
1257
					peer.log.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats())
1258
				default:
P
Péter Szilágyi 已提交
1259
					peer.log.Trace("Failed to deliver retrieved data", "type", kind, "err", err)
1260 1261 1262 1263 1264 1265 1266 1267
				}
			}
			// Blocks assembled, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

1268
		case cont := <-wakeCh:
1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290
			// The header fetcher sent a continuation flag, check if it's done
			if !cont {
				finished = true
			}
			// Headers arrive, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-ticker.C:
			// Sanity check update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-update:
			// Short circuit if we lost all our peers
			if d.peers.Len() == 0 {
				return errNoPeers
			}
1291
			// Check for fetch request timeouts and demote the responsible peers
1292
			for pid, fails := range expire() {
1293
				if peer := d.peers.Peer(pid); peer != nil {
1294 1295 1296 1297 1298 1299 1300 1301
					// If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps
					// ourselves. Only reset to minimal throughput but don't drop just yet. If even the minimal times
					// out that sync wise we need to get rid of the peer.
					//
					// The reason the minimum threshold is 2 is because the downloader tries to estimate the bandwidth
					// and latency of a peer separately, which requires pushing the measures capacity a bit and seeing
					// how response times reacts, to it always requests one more than the minimum (i.e. min 2).
					if fails > 2 {
P
Péter Szilágyi 已提交
1302
						peer.log.Trace("Data delivery timed out", "type", kind)
1303
						setIdle(peer, 0, time.Now())
1304
					} else {
P
Péter Szilágyi 已提交
1305
						peer.log.Debug("Stalling delivery, dropping", "type", kind)
1306

1307 1308 1309 1310 1311 1312
						if d.dropPeer == nil {
							// The dropPeer method is nil when `--copydb` is used for a local copy.
							// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
							peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid)
						} else {
							d.dropPeer(pid)
1313 1314 1315 1316 1317 1318 1319 1320 1321 1322

							// If this peer was the master peer, abort sync immediately
							d.cancelLock.RLock()
							master := pid == d.cancelPeer
							d.cancelLock.RUnlock()

							if master {
								d.cancel()
								return errTimeout
							}
1323
						}
1324
					}
1325 1326
				}
			}
1327 1328
			// If there's nothing more to fetch, wait or terminate
			if pending() == 0 {
1329
				if !inFlight() && finished {
1330
					log.Debug("Data fetching completed", "type", kind)
1331 1332 1333 1334 1335
					return nil
				}
				break
			}
			// Send a download request to all idle peers, until throttled
1336
			progressed, throttled, running := false, false, inFlight()
1337
			idles, total := idle()
1338
			pendCount := pending()
1339
			for _, peer := range idles {
1340
				// Short circuit if throttling activated
1341
				if throttled {
1342 1343
					break
				}
1344
				// Short circuit if there is no more available task.
1345
				if pendCount = pending(); pendCount == 0 {
1346 1347
					break
				}
1348 1349
				// Reserve a chunk of fetches for a peer. A nil can mean either that
				// no more headers are available, or that the peer is known not to
1350
				// have them.
1351
				request, progress, throttle := reserve(peer, capacity(peer))
1352 1353
				if progress {
					progressed = true
1354
				}
1355 1356 1357 1358
				if throttle {
					throttled = true
					throttleCounter.Inc(1)
				}
1359 1360 1361
				if request == nil {
					continue
				}
1362
				if request.From > 0 {
P
Péter Szilágyi 已提交
1363
					peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From)
1364
				} else {
1365
					peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
1366
				}
1367
				// Fetch the chunk and make sure any errors return the hashes to the queue
1368 1369
				if fetchHook != nil {
					fetchHook(request.Headers)
1370
				}
1371
				if err := fetch(peer, request); err != nil {
1372 1373 1374 1375 1376
					// Although we could try and make an attempt to fix this, this error really
					// means that we've double allocated a fetch task to a peer. If that is the
					// case, the internal state of the downloader and the queue is very wrong so
					// better hard crash and note the error instead of silently accumulating into
					// a much bigger issue.
1377
					panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, kind))
1378
				}
1379
				running = true
1380 1381 1382
			}
			// Make sure that we have peers available for fetching. If all peers have been tried
			// and all failed throw an error
1383
			if !progressed && !throttled && !running && len(idles) == total && pendCount > 0 {
1384 1385 1386 1387 1388 1389
				return errPeersUnavailable
			}
		}
	}
}

1390 1391 1392
// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
1393
func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
1394
	// Keep a count of uncertain headers to roll back
1395
	var (
1396
		rollback    uint64 // Zero means no rollback (fine as you can't unroll the genesis)
1397 1398 1399
		rollbackErr error
		mode        = d.getMode()
	)
1400
	defer func() {
1401
		if rollback > 0 {
1402
			lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
1403
			if mode != LightSync {
1404 1405
				lastFastBlock = d.blockchain.CurrentFastBlock().Number()
				lastBlock = d.blockchain.CurrentBlock().Number()
1406
			}
1407 1408 1409 1410
			if err := d.lightchain.SetHead(rollback - 1); err != nil { // -1 to target the parent of the first uncertain block
				// We're already unwinding the stack, only print the error to make it more visible
				log.Error("Failed to roll back chain segment", "head", rollback-1, "err", err)
			}
1411
			curFastBlock, curBlock := common.Big0, common.Big0
1412
			if mode != LightSync {
1413 1414
				curFastBlock = d.blockchain.CurrentFastBlock().Number()
				curBlock = d.blockchain.CurrentBlock().Number()
1415
			}
1416
			log.Warn("Rolled back chain segment",
1417
				"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
1418
				"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
1419
				"block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
1420 1421 1422 1423 1424 1425 1426 1427
		}
	}()
	// Wait for batches of headers to process
	gotHeaders := false

	for {
		select {
		case <-d.cancelCh:
1428
			rollbackErr = errCanceled
1429
			return errCanceled
1430 1431 1432 1433 1434

		case headers := <-d.headerProcCh:
			// Terminate header processing if we synced up
			if len(headers) == 0 {
				// Notify everyone that headers are fully processed
1435
				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
1436 1437 1438 1439 1440
					select {
					case ch <- false:
					case <-d.cancelCh:
					}
				}
1441 1442
				// If no headers were retrieved at all, the peer violated its TD promise that it had a
				// better chain compared to ours. The only exception is if its promised blocks were
1443
				// already imported by other means (e.g. fetcher):
1444 1445 1446 1447 1448 1449 1450 1451 1452
				//
				// R <remote peer>, L <local node>: Both at block 10
				// R: Mine block 11, and propagate it to L
				// L: Queue block 11 for import
				// L: Notice that R's head and TD increased compared to ours, start sync
				// L: Import of block 11 finishes
				// L: Sync begins, and finds common ancestor at 11
				// L: Request new headers up from 11 (R's TD was higher, it must have something)
				// R: Nothing to give
1453
				if mode != LightSync {
1454 1455
					head := d.blockchain.CurrentBlock()
					if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {
1456 1457
						return errStallingPeer
					}
1458 1459 1460 1461 1462 1463 1464 1465
				}
				// If fast or light syncing, ensure promised headers are indeed delivered. This is
				// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
				// of delivering the post-pivot blocks that would flag the invalid content.
				//
				// This check cannot be executed "as is" for full imports, since blocks may still be
				// queued for processing when the header download completes. However, as long as the
				// peer gave us something useful, we're already happy/progressed (above check).
1466
				if mode == FastSync || mode == LightSync {
1467 1468
					head := d.lightchain.CurrentHeader()
					if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
1469 1470 1471 1472
						return errStallingPeer
					}
				}
				// Disable any rollback and return
1473
				rollback = 0
1474 1475 1476 1477 1478 1479 1480 1481
				return nil
			}
			// Otherwise split the chunk of headers into batches and process them
			gotHeaders = true
			for len(headers) > 0 {
				// Terminate if something failed in between processing chunks
				select {
				case <-d.cancelCh:
1482
					rollbackErr = errCanceled
1483
					return errCanceled
1484 1485 1486 1487 1488 1489 1490 1491
				default:
				}
				// Select the next chunk of headers to import
				limit := maxHeadersProcess
				if limit > len(headers) {
					limit = len(headers)
				}
				chunk := headers[:limit]
1492

1493
				// In case of header only syncing, validate the chunk immediately
1494
				if mode == FastSync || mode == LightSync {
1495 1496 1497 1498 1499
					// If we're importing pure headers, verify based on their recentness
					frequency := fsHeaderCheckFrequency
					if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
						frequency = 1
					}
1500
					if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
1501
						rollbackErr = err
1502 1503 1504 1505

						// If some headers were inserted, track them as uncertain
						if n > 0 && rollback == 0 {
							rollback = chunk[0].Number.Uint64()
1506
						}
1507
						log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "parent", chunk[n].ParentHash, "err", err)
1508
						return fmt.Errorf("%w: %v", errInvalidChain, err)
1509
					}
1510 1511 1512 1513
					// All verifications passed, track all headers within the alloted limits
					head := chunk[len(chunk)-1].Number.Uint64()
					if head-rollback > uint64(fsHeaderSafetyNet) {
						rollback = head - uint64(fsHeaderSafetyNet)
1514 1515
					} else {
						rollback = 1
1516 1517 1518
					}
				}
				// Unless we're doing light chains, schedule the headers for associated content retrieval
1519
				if mode == FullSync || mode == FastSync {
1520 1521 1522 1523
					// If we've reached the allowed number of pending headers, stall a bit
					for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
						select {
						case <-d.cancelCh:
1524
							rollbackErr = errCanceled
1525
							return errCanceled
1526 1527 1528 1529 1530 1531
						case <-time.After(time.Second):
						}
					}
					// Otherwise insert the headers for content retrieval
					inserts := d.queue.Schedule(chunk, origin)
					if len(inserts) != len(chunk) {
1532
						rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunk))
1533
						return fmt.Errorf("%w: stale headers", errBadPeer)
1534 1535 1536 1537 1538
					}
				}
				headers = headers[limit:]
				origin += uint64(limit)
			}
1539 1540 1541 1542 1543 1544 1545
			// Update the highest block number we know if a higher one is found.
			d.syncStatsLock.Lock()
			if d.syncStatsChainHeight < origin {
				d.syncStatsChainHeight = origin - 1
			}
			d.syncStatsLock.Unlock()

1546
			// Signal the content downloaders of the availablility of new tasks
1547
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
1548 1549 1550 1551 1552 1553 1554 1555 1556
				select {
				case ch <- true:
				default:
				}
			}
		}
	}
}

1557 1558
// processFullSyncContent takes fetch results from the queue and imports them into the chain.
func (d *Downloader) processFullSyncContent() error {
1559
	for {
1560
		results := d.queue.Results(true)
1561
		if len(results) == 0 {
1562
			return nil
1563
		}
1564
		if d.chainInsertHook != nil {
1565
			d.chainInsertHook(results)
1566
		}
1567 1568 1569 1570 1571 1572 1573
		if err := d.importBlockResults(results); err != nil {
			return err
		}
	}
}

func (d *Downloader) importBlockResults(results []*fetchResult) error {
1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593
	// Check for any early termination requests
	if len(results) == 0 {
		return nil
	}
	select {
	case <-d.quitCh:
		return errCancelContentProcessing
	default:
	}
	// Retrieve the a batch of results to import
	first, last := results[0].Header, results[len(results)-1].Header
	log.Debug("Inserting downloaded chain", "items", len(results),
		"firstnum", first.Number, "firsthash", first.Hash(),
		"lastnum", last.Number, "lasthash", last.Hash(),
	)
	blocks := make([]*types.Block, len(results))
	for i, result := range results {
		blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
	}
	if index, err := d.blockchain.InsertChain(blocks); err != nil {
1594 1595 1596 1597 1598 1599 1600 1601 1602
		if index < len(results) {
			log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
		} else {
			// The InsertChain method in blockchain.go will sometimes return an out-of-bounds index,
			// when it needs to preprocess blocks to import a sidechain.
			// The importer will put together a new list of blocks to import, which is a superset
			// of the blocks delivered from the downloader, and the indexing will be off.
			log.Debug("Downloaded item processing failed on sidechain import", "index", index, "err", err)
		}
1603
		return fmt.Errorf("%w: %v", errInvalidChain, err)
1604 1605 1606 1607 1608 1609 1610
	}
	return nil
}

// processFastSyncContent takes fetch results from the queue and writes them to the
// database. It also controls the synchronisation of state nodes of the pivot block.
func (d *Downloader) processFastSyncContent(latest *types.Header) error {
1611 1612
	// Start syncing state of the reported head block. This should get us most of
	// the state of the pivot block.
1613
	sync := d.syncState(latest.Root)
1614 1615 1616 1617 1618 1619 1620
	defer func() {
		// The `sync` object is replaced every time the pivot moves. We need to
		// defer close the very last active one, hence the lazy evaluation vs.
		// calling defer sync.Cancel() !!!
		sync.Cancel()
	}()

1621 1622
	closeOnErr := func(s *stateSync) {
		if err := s.Wait(); err != nil && err != errCancelStateFetch && err != errCanceled {
1623
			d.queue.Close() // wake up Results
1624
		}
1625 1626
	}
	go closeOnErr(sync)
1627

1628 1629 1630 1631 1632 1633 1634
	// Figure out the ideal pivot block. Note, that this goalpost may move if the
	// sync takes long enough for the chain head to move significantly.
	pivot := uint64(0)
	if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) {
		pivot = height - uint64(fsMinFullBlocks)
	}
	// To cater for moving pivot points, track the pivot block and subsequently
Y
Yusup 已提交
1635
	// accumulated download results separately.
1636 1637 1638 1639
	var (
		oldPivot *fetchResult   // Locked in pivot block, might change eventually
		oldTail  []*fetchResult // Downloaded content after the pivot
	)
1640
	for {
1641 1642 1643
		// Wait for the next batch of downloaded data to be available, and if the pivot
		// block became stale, move the goalpost
		results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness
1644
		if len(results) == 0 {
1645 1646
			// If pivot sync is done, stop
			if oldPivot == nil {
1647
				return sync.Cancel()
1648 1649 1650 1651
			}
			// If sync failed, stop
			select {
			case <-d.cancelCh:
1652
				sync.Cancel()
1653
				return errCanceled
1654 1655
			default:
			}
1656 1657 1658 1659
		}
		if d.chainInsertHook != nil {
			d.chainInsertHook(results)
		}
1660 1661 1662 1663 1664 1665 1666 1667 1668
		if oldPivot != nil {
			results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
		}
		// Split around the pivot block and process the two sides via fast/full sync
		if atomic.LoadInt32(&d.committed) == 0 {
			latest = results[len(results)-1].Header
			if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) {
				log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks))
				pivot = height - uint64(fsMinFullBlocks)
1669 1670 1671 1672

				// Write out the pivot into the database so a rollback beyond it will
				// reenable fast sync
				rawdb.WriteLastPivotNumber(d.stateDB, pivot)
1673 1674
			}
		}
1675
		P, beforeP, afterP := splitAroundPivot(pivot, results)
1676
		if err := d.commitFastSyncData(beforeP, sync); err != nil {
1677 1678 1679
			return err
		}
		if P != nil {
1680 1681
			// If new pivot block found, cancel old state retrieval and restart
			if oldPivot != P {
1682 1683
				sync.Cancel()
				sync = d.syncState(P.Header.Root)
1684

1685
				go closeOnErr(sync)
1686 1687 1688 1689
				oldPivot = P
			}
			// Wait for completion, occasionally checking for pivot staleness
			select {
1690 1691 1692
			case <-sync.done:
				if sync.err != nil {
					return sync.err
1693 1694 1695 1696 1697 1698 1699 1700 1701
				}
				if err := d.commitPivotBlock(P); err != nil {
					return err
				}
				oldPivot = nil

			case <-time.After(time.Second):
				oldTail = afterP
				continue
1702
			}
1703
		}
1704
		// Fast sync done, pivot commit done, full import
1705 1706 1707 1708 1709 1710 1711
		if err := d.importBlockResults(afterP); err != nil {
			return err
		}
	}
}

func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
1712 1713 1714 1715 1716 1717 1718 1719
	if len(results) == 0 {
		return nil, nil, nil
	}
	if lastNum := results[len(results)-1].Header.Number.Uint64(); lastNum < pivot {
		// the pivot is somewhere in the future
		return nil, results, nil
	}
	// This can also be optimized, but only happens very seldom
1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734
	for _, result := range results {
		num := result.Header.Number.Uint64()
		switch {
		case num < pivot:
			before = append(before, result)
		case num == pivot:
			p = result
		default:
			after = append(after, result)
		}
	}
	return p, before, after
}

func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
1735 1736 1737 1738 1739 1740 1741 1742 1743 1744
	// Check for any early termination requests
	if len(results) == 0 {
		return nil
	}
	select {
	case <-d.quitCh:
		return errCancelContentProcessing
	case <-stateSync.done:
		if err := stateSync.Wait(); err != nil {
			return err
1745
		}
1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759
	default:
	}
	// Retrieve the a batch of results to import
	first, last := results[0].Header, results[len(results)-1].Header
	log.Debug("Inserting fast-sync blocks", "items", len(results),
		"firstnum", first.Number, "firsthash", first.Hash(),
		"lastnumn", last.Number, "lasthash", last.Hash(),
	)
	blocks := make([]*types.Block, len(results))
	receipts := make([]types.Receipts, len(results))
	for i, result := range results {
		blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
		receipts[i] = result.Receipts
	}
1760
	if index, err := d.blockchain.InsertReceiptChain(blocks, receipts, d.ancientLimit); err != nil {
1761
		log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
1762
		return fmt.Errorf("%w: %v", errInvalidChain, err)
1763 1764 1765 1766 1767
	}
	return nil
}

func (d *Downloader) commitPivotBlock(result *fetchResult) error {
1768 1769
	block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
	log.Debug("Committing fast sync pivot as new head", "number", block.Number(), "hash", block.Hash())
1770 1771

	// Commit the pivot block as the new head, will require full sync from here on
1772
	if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}, d.ancientLimit); err != nil {
1773 1774
		return err
	}
1775
	if err := d.blockchain.FastSyncCommitHead(block.Hash()); err != nil {
1776
		return err
1777
	}
1778
	atomic.StoreInt32(&d.committed, 1)
1779 1780 1781 1782 1783 1784 1785 1786 1787

	// If we had a bloom filter for the state sync, deallocate it now. Note, we only
	// deallocate internally, but keep the empty wrapper. This ensures that if we do
	// a rollback after committing the pivot and restarting fast sync, we don't end
	// up using a nil bloom. Empty bloom is fine, it just returns that it does not
	// have the info we need, so reach down to the database instead.
	if d.stateBloom != nil {
		d.stateBloom.Close()
	}
1788
	return nil
1789 1790
}

L
Leif Jurvetson 已提交
1791
// DeliverHeaders injects a new batch of block headers received from a remote
1792
// node into the download schedule.
1793
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
1794
	return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
1795 1796 1797
}

// DeliverBodies injects a new batch of block bodies received from a remote node.
1798
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
1799
	return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
1800 1801 1802 1803
}

// DeliverReceipts injects a new batch of receipts received from a remote node.
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) {
1804 1805 1806 1807 1808 1809 1810 1811 1812 1813
	return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
}

// DeliverNodeData hands a batch of node state data, received from the remote
// node identified by id, to the state channel through the shared deliver path.
func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) {
	pack := &statePack{id, data}
	return d.deliver(id, d.stateCh, pack, stateInMeter, stateDropMeter)
}

// deliver injects a new batch of data received from a remote node.
func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
1814
	// Update the delivery metrics for both good and failed deliveries
1815
	inMeter.Mark(int64(packet.Items()))
1816 1817
	defer func() {
		if err != nil {
1818
			dropMeter.Mark(int64(packet.Items()))
1819 1820 1821 1822 1823 1824
		}
	}()
	// Deliver or abort if the sync is canceled while queuing
	d.cancelLock.RLock()
	cancel := d.cancelCh
	d.cancelLock.RUnlock()
1825 1826 1827
	if cancel == nil {
		return errNoSyncActive
	}
1828
	select {
1829
	case destCh <- packet:
1830 1831 1832 1833
		return nil
	case <-cancel:
		return errNoSyncActive
	}
1834
}
1835 1836 1837 1838 1839 1840

// qosTuner is the quality of service tuning loop that occasionally gathers the
// peer latency statistics and updates the estimated request round trip time.
func (d *Downloader) qosTuner() {
	for {
		// Retrieve the current median RTT and integrate into the previoust target RTT
1841
		rtt := time.Duration((1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
1842 1843 1844 1845 1846 1847 1848 1849
		atomic.StoreUint64(&d.rttEstimate, uint64(rtt))

		// A new RTT cycle passed, increase our confidence in the estimated RTT
		conf := atomic.LoadUint64(&d.rttConfidence)
		conf = conf + (1000000-conf)/2
		atomic.StoreUint64(&d.rttConfidence, conf)

		// Log the new QoS values and sleep until the next RTT
1850
		log.Debug("Recalculated downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863
		select {
		case <-d.quitCh:
			return
		case <-time.After(rtt):
		}
	}
}

// qosReduceConfidence is meant to be called when a new peer joins the downloader's
// peer set, needing to reduce the confidence we have in out QoS estimates.
func (d *Downloader) qosReduceConfidence() {
	// If we have a single peer, confidence is always 1
	peers := uint64(d.peers.Len())
1864 1865 1866 1867
	if peers == 0 {
		// Ensure peer connectivity races don't catch us off guard
		return
	}
1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883
	if peers == 1 {
		atomic.StoreUint64(&d.rttConfidence, 1000000)
		return
	}
	// If we have a ton of peers, don't drop confidence)
	if peers >= uint64(qosConfidenceCap) {
		return
	}
	// Otherwise drop the confidence factor
	conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
	if float64(conf)/1000000 < rttMinConfidence {
		conf = uint64(rttMinConfidence * 1000000)
	}
	atomic.StoreUint64(&d.rttConfidence, conf)

	rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
1884
	log.Debug("Relaxed downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
1885 1886 1887 1888 1889 1890 1891
}

// requestRTT returns the current target round trip time for a download request
// to complete in.
//
// Note, the returned RTT is .9 of the actually estimated RTT. The reason is that
// the downloader tries to adapt queries to the RTT, so multiple RTT values can
Y
Yusup 已提交
1892
// be adapted to, but smaller ones are preferred (stabler download stream).
1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909
func (d *Downloader) requestRTT() time.Duration {
	return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10
}

// requestTTL reports how long a single download request may take before it is
// considered timed out. The estimated RTT is divided by the current confidence
// (stored as parts per million), so lower confidence yields a longer allowance.
// The result is multiplied by ttlScaling and never exceeds ttlLimit.
func (d *Downloader) requestTTL() time.Duration {
	estimate := time.Duration(atomic.LoadUint64(&d.rttEstimate))
	confidence := float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0

	if allowance := time.Duration(ttlScaling) * time.Duration(float64(estimate)/confidence); allowance <= ttlLimit {
		return allowance
	}
	return ttlLimit
}