// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

17
// Package downloader contains the manual full chain synchronisation.
18 19 20
package downloader

import (
21
	"errors"
22
	"fmt"
23
	"math/big"
24 25 26 27
	"sync"
	"sync/atomic"
	"time"

28
	ethereum "github.com/ethereum/go-ethereum"
29
	"github.com/ethereum/go-ethereum/common"
30
	"github.com/ethereum/go-ethereum/core/rawdb"
31
	"github.com/ethereum/go-ethereum/core/types"
32
	"github.com/ethereum/go-ethereum/ethdb"
33
	"github.com/ethereum/go-ethereum/event"
34
	"github.com/ethereum/go-ethereum/log"
35
	"github.com/ethereum/go-ethereum/metrics"
36
	"github.com/ethereum/go-ethereum/params"
37 38
)

39
var (
40 41 42
	MaxHashFetch    = 512 // Amount of hashes to be fetched per retrieval request
	MaxBlockFetch   = 128 // Amount of blocks to be fetched per retrieval request
	MaxHeaderFetch  = 192 // Amount of block headers to be fetched per retrieval request
43
	MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly
44 45
	MaxBodyFetch    = 128 // Amount of block bodies to be fetched per retrieval request
	MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
46
	MaxStateFetch   = 384 // Amount of node state values to allow fetching per request
47

48 49
	MaxForkAncestry  = 3 * params.EpochDuration // Maximum chain reorganisation
	rttMinEstimate   = 2 * time.Second          // Minimum round-trip time to target for download requests
Y
Yusup 已提交
50
	rttMaxEstimate   = 20 * time.Second         // Maximum round-trip time to target for download requests
51 52 53
	rttMinConfidence = 0.1                      // Worse confidence factor in our estimated RTT value
	ttlScaling       = 3                        // Constant scaling factor for RTT -> TTL conversion
	ttlLimit         = time.Minute              // Maximum TTL allowance to prevent reaching crazy timeouts
54 55 56 57

	qosTuningPeers   = 5    // Number of peers to tune based on (best peers)
	qosConfidenceCap = 10   // Number of peers above which not to modify RTT confidence
	qosTuningImpact  = 0.25 // Impact that a new tuning target has on the previous value
58

59 60
	maxQueuedHeaders  = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
	maxHeadersProcess = 2048      // Number of header download results to import at once into the chain
61
	maxResultsProcess = 2048      // Number of content download results to import at once into the chain
62

63 64 65 66 67
	fsHeaderCheckFrequency = 100             // Verification frequency of the downloaded headers during fast sync
	fsHeaderSafetyNet      = 2048            // Number of headers to discard in case a chain violation is detected
	fsHeaderForceVerify    = 24              // Number of headers to verify before and after the pivot to accept it
	fsHeaderContCheck      = 3 * time.Second // Time interval to check for header continuations during state download
	fsMinFullBlocks        = 64              // Number of blocks to retrieve fully even in fast sync
68
)
69

70
// Errors describing the state of the downloader or of the peers it works with.
var (
	errBusy             = errors.New("busy")
	errNoSyncActive     = errors.New("no sync active")
	errUnknownPeer      = errors.New("peer is unknown or unhealthy")
	errBadPeer          = errors.New("action from bad peer ignored")
	errStallingPeer     = errors.New("peer is stalling")
	errNoPeers          = errors.New("no peers to keep download active")
	errPeersUnavailable = errors.New("no peers available or all tried for download")
	errTimeout          = errors.New("timeout")
	errTooOld           = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)")
)

// Errors describing invalid or missing data delivered by a peer.
var (
	errEmptyHeaderSet  = errors.New("empty header set by peer")
	errInvalidAncestor = errors.New("retrieved ancestor is invalid")
	errInvalidChain    = errors.New("retrieved hash chain is invalid")
	errInvalidBlock    = errors.New("retrieved block is invalid")
	errInvalidBody     = errors.New("retrieved block body is invalid")
	errInvalidReceipt  = errors.New("retrieved receipt is invalid")
)

// Errors reported when an operation stops because cancellation was requested.
var (
	errCancelBlockFetch        = errors.New("block download canceled (requested)")
	errCancelHeaderFetch       = errors.New("block header download canceled (requested)")
	errCancelBodyFetch         = errors.New("block body download canceled (requested)")
	errCancelReceiptFetch      = errors.New("receipt download canceled (requested)")
	errCancelStateFetch        = errors.New("state data download canceled (requested)")
	errCancelHeaderProcessing  = errors.New("header processing canceled (requested)")
	errCancelContentProcessing = errors.New("content processing canceled (requested)")
)

95
type Downloader struct {
96 97
	mode SyncMode       // Synchronisation mode defining the strategy used (per sync cycle)
	mux  *event.TypeMux // Event multiplexer to announce sync operation events
98

99 100 101
	queue   *queue   // Scheduler for selecting the hashes to download
	peers   *peerSet // Set of active peers from which download can proceed
	stateDB ethdb.Database
102

103 104
	rttEstimate   uint64 // Round trip time to target for download requests
	rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
105

106
	// Statistics
107 108 109
	syncStatsChainOrigin uint64 // Origin block number where syncing started at
	syncStatsChainHeight uint64 // Highest block number known when syncing started
	syncStatsState       stateSyncStats
110
	syncStatsLock        sync.RWMutex // Lock protecting the sync stats fields
111

112
	lightchain LightChain
113
	blockchain BlockChain
114

115
	// Callbacks
116
	dropPeer peerDropFn // Drops a peer for misbehaving
117

118
	// Status
119 120 121
	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
	synchronising   int32
	notified        int32
122
	committed       int32
123 124

	// Channels
125 126 127 128 129 130
	headerCh      chan dataPack        // [eth/62] Channel receiving inbound block headers
	bodyCh        chan dataPack        // [eth/62] Channel receiving inbound block bodies
	receiptCh     chan dataPack        // [eth/63] Channel receiving inbound receipts
	bodyWakeCh    chan bool            // [eth/62] Channel to signal the block body fetcher of new tasks
	receiptWakeCh chan bool            // [eth/63] Channel to signal the receipt fetcher of new tasks
	headerProcCh  chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
131

132 133 134 135 136
	// for stateFetcher
	stateSyncStart chan *stateSync
	trackStateReq  chan *stateReq
	stateCh        chan dataPack // [eth/63] Channel receiving inbound node state data

137
	// Cancellation and termination
138 139 140 141
	cancelPeer string         // Identifier of the peer currently being used as the master (cancel on drop)
	cancelCh   chan struct{}  // Channel to cancel mid-flight syncs
	cancelLock sync.RWMutex   // Lock to protect the cancel channel and peer in delivers
	cancelWg   sync.WaitGroup // Make sure all fetcher goroutines have exited.
142

143 144 145
	quitCh   chan struct{} // Quit channel to signal termination
	quitLock sync.RWMutex  // Lock to prevent double closes

146
	// Testing hooks
147 148 149 150
	syncInitHook     func(uint64, uint64)  // Method to call upon initiating a new sync run
	bodyFetchHook    func([]*types.Header) // Method to call upon starting a block body fetch
	receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
	chainInsertHook  func([]*fetchResult)  // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
151 152
}

N
Nick Johnson 已提交
153
// LightChain encapsulates functions required to synchronise a light chain.
154 155
type LightChain interface {
	// HasHeader verifies a header's presence in the local chain.
156
	HasHeader(common.Hash, uint64) bool
157 158 159 160 161 162 163

	// GetHeaderByHash retrieves a header from the local chain.
	GetHeaderByHash(common.Hash) *types.Header

	// CurrentHeader retrieves the head header from the local chain.
	CurrentHeader() *types.Header

164 165
	// GetTd returns the total difficulty of a local block.
	GetTd(common.Hash, uint64) *big.Int
166 167 168 169 170 171 172 173

	// InsertHeaderChain inserts a batch of headers into the local chain.
	InsertHeaderChain([]*types.Header, int) (int, error)

	// Rollback removes a few recently added elements from the local chain.
	Rollback([]common.Hash)
}

N
Nick Johnson 已提交
174
// BlockChain encapsulates functions required to sync a (full or fast) blockchain.
175 176 177
type BlockChain interface {
	LightChain

178 179
	// HasBlock verifies a block's presence in the local chain.
	HasBlock(common.Hash, uint64) bool
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199

	// GetBlockByHash retrieves a block from the local chain.
	GetBlockByHash(common.Hash) *types.Block

	// CurrentBlock retrieves the head block from the local chain.
	CurrentBlock() *types.Block

	// CurrentFastBlock retrieves the head fast block from the local chain.
	CurrentFastBlock() *types.Block

	// FastSyncCommitHead directly commits the head block to a certain entity.
	FastSyncCommitHead(common.Hash) error

	// InsertChain inserts a batch of blocks into the local chain.
	InsertChain(types.Blocks) (int, error)

	// InsertReceiptChain inserts a batch of receipts into the local chain.
	InsertReceiptChain(types.Blocks, []types.Receipts) (int, error)
}

200
// New creates a new downloader to fetch hashes and blocks from remote peers.
201 202 203 204
func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
	if lightchain == nil {
		lightchain = chain
	}
205

206
	dl := &Downloader{
207 208 209 210 211 212 213
		mode:           mode,
		stateDB:        stateDb,
		mux:            mux,
		queue:          newQueue(),
		peers:          newPeerSet(),
		rttEstimate:    uint64(rttMaxEstimate),
		rttConfidence:  uint64(1000000),
214
		blockchain:     chain,
215 216 217 218 219 220 221 222 223 224
		lightchain:     lightchain,
		dropPeer:       dropPeer,
		headerCh:       make(chan dataPack, 1),
		bodyCh:         make(chan dataPack, 1),
		receiptCh:      make(chan dataPack, 1),
		bodyWakeCh:     make(chan bool, 1),
		receiptWakeCh:  make(chan bool, 1),
		headerProcCh:   make(chan []*types.Header, 1),
		quitCh:         make(chan struct{}),
		stateCh:        make(chan dataPack),
225
		stateSyncStart: make(chan *stateSync),
226
		syncStatsState: stateSyncStats{
227
			processed: rawdb.ReadFastTrieProgress(stateDb),
228 229
		},
		trackStateReq: make(chan *stateReq),
230
	}
231
	go dl.qosTuner()
232
	go dl.stateFetcher()
233
	return dl
234 235
}

236 237 238
// Progress retrieves the synchronisation boundaries, specifically the origin
// block where synchronisation started at (may have failed/suspended); the block
// or header sync is currently at; and the latest known block which the sync targets.
239
//
L
Leif Jurvetson 已提交
240
// In addition, during the state download phase of fast synchronisation the number
241 242
// of processed and the total number of known states are also returned. Otherwise
// these are zero.
243
func (d *Downloader) Progress() ethereum.SyncProgress {
244
	// Lock the current stats and return the progress
245 246
	d.syncStatsLock.RLock()
	defer d.syncStatsLock.RUnlock()
247

248 249 250
	current := uint64(0)
	switch d.mode {
	case FullSync:
251
		current = d.blockchain.CurrentBlock().NumberU64()
252
	case FastSync:
253
		current = d.blockchain.CurrentFastBlock().NumberU64()
254
	case LightSync:
255
		current = d.lightchain.CurrentHeader().Number.Uint64()
256
	}
257 258 259 260
	return ethereum.SyncProgress{
		StartingBlock: d.syncStatsChainOrigin,
		CurrentBlock:  current,
		HighestBlock:  d.syncStatsChainHeight,
261 262
		PulledStates:  d.syncStatsState.processed,
		KnownStates:   d.syncStatsState.processed + d.syncStatsState.pending,
263
	}
O
obscuren 已提交
264 265
}

266
// Synchronising returns whether the downloader is currently retrieving blocks.
267
func (d *Downloader) Synchronising() bool {
268
	return atomic.LoadInt32(&d.synchronising) > 0
269 270
}

271 272
// RegisterPeer injects a new download peer into the set of block source to be
// used for fetching hashes and blocks from.
273
func (d *Downloader) RegisterPeer(id string, version int, peer Peer) error {
274 275
	logger := log.New("peer", id)
	logger.Trace("Registering sync peer")
276
	if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil {
277
		logger.Error("Failed to register sync peer", "err", err)
278 279
		return err
	}
280 281
	d.qosReduceConfidence()

282 283 284
	return nil
}

N
Nick Johnson 已提交
285
// RegisterLightPeer injects a light client peer, wrapping it so it appears as a regular peer.
286 287 288 289
func (d *Downloader) RegisterLightPeer(id string, version int, peer LightPeer) error {
	return d.RegisterPeer(id, version, &lightPeerWrapper{peer})
}

290
// UnregisterPeer remove a peer from the known list, preventing any action from
291 292
// the specified peer. An effort is also made to return any pending fetches into
// the queue.
293
func (d *Downloader) UnregisterPeer(id string) error {
294
	// Unregister the peer from the active peer set and revoke any fetch tasks
295 296
	logger := log.New("peer", id)
	logger.Trace("Unregistering sync peer")
297
	if err := d.peers.Unregister(id); err != nil {
298
		logger.Error("Failed to unregister sync peer", "err", err)
299 300
		return err
	}
301
	d.queue.Revoke(id)
302 303 304 305 306 307 308

	// If this peer was the master peer, abort sync immediately
	d.cancelLock.RLock()
	master := id == d.cancelPeer
	d.cancelLock.RUnlock()

	if master {
309
		d.cancel()
310
	}
311
	return nil
312 313
}

314 315
// Synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
316 317 318
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
	err := d.synchronise(id, head, td, mode)
	switch err {
319 320 321
	case nil:
	case errBusy:

322 323 324
	case errTimeout, errBadPeer, errStallingPeer,
		errEmptyHeaderSet, errPeersUnavailable, errTooOld,
		errInvalidAncestor, errInvalidChain:
325
		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
326 327 328 329 330 331 332
		if d.dropPeer == nil {
			// The dropPeer method is nil when `--copydb` is used for a local copy.
			// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
			log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
		} else {
			d.dropPeer(id)
		}
333
	default:
334
		log.Warn("Synchronisation failed, retrying", "err", err)
335
	}
336
	return err
337 338 339
}

// synchronise will select the peer and use it for synchronising. If an empty string is given
340
// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
341
// checks fail an error will be returned. This method is synchronous
342
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
L
Leif Jurvetson 已提交
343
	// Mock out the synchronisation if testing
344 345 346
	if d.synchroniseMock != nil {
		return d.synchroniseMock(id, hash)
	}
347
	// Make sure only one goroutine is ever allowed past this point at once
348
	if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
349
		return errBusy
350
	}
351
	defer atomic.StoreInt32(&d.synchronising, 0)
352

353 354
	// Post a user notification of the sync (only once per session)
	if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
355
		log.Info("Block synchronisation started")
356
	}
357
	// Reset the queue, peer set and wake channels to clean any internal leftover state
358
	d.queue.Reset()
359
	d.peers.Reset()
360

361
	for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
362 363 364 365
		select {
		case <-ch:
		default:
		}
366
	}
367
	for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} {
368 369 370 371 372 373 374 375
		for empty := false; !empty; {
			select {
			case <-ch:
			default:
				empty = true
			}
		}
	}
376 377 378 379 380 381 382
	for empty := false; !empty; {
		select {
		case <-d.headerProcCh:
		default:
			empty = true
		}
	}
383
	// Create cancel channel for aborting mid-flight and mark the master peer
384 385
	d.cancelLock.Lock()
	d.cancelCh = make(chan struct{})
386
	d.cancelPeer = id
387 388
	d.cancelLock.Unlock()

389
	defer d.Cancel() // No matter what, we can't leave the cancel channel open
390

391 392
	// Set the requested sync mode, unless it's forbidden
	d.mode = mode
393

394
	// Retrieve the origin peer and initiate the downloading process
395
	p := d.peers.Peer(id)
396
	if p == nil {
397
		return errUnknownPeer
398
	}
399
	return d.syncWithPeer(p, hash, td)
400 401
}

402 403
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
404
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
O
obscuren 已提交
405
	d.mux.Post(StartEvent{})
406 407 408
	defer func() {
		// reset on error
		if err != nil {
409 410 411
			d.mux.Post(FailedEvent{err})
		} else {
			d.mux.Post(DoneEvent{})
412 413
		}
	}()
414 415 416
	if p.version < 62 {
		return errTooOld
	}
417

418
	log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode)
419
	defer func(start time.Time) {
420
		log.Debug("Synchronisation terminated", "elapsed", time.Since(start))
421
	}(time.Now())
422

423 424 425 426 427 428
	// Look up the sync boundaries: the common ancestor and the target block
	latest, err := d.fetchHeight(p)
	if err != nil {
		return err
	}
	height := latest.Number.Uint64()
429

430 431 432 433 434 435 436 437 438 439
	origin, err := d.findAncestor(p, height)
	if err != nil {
		return err
	}
	d.syncStatsLock.Lock()
	if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
		d.syncStatsChainOrigin = origin
	}
	d.syncStatsChainHeight = height
	d.syncStatsLock.Unlock()
440

441
	// Ensure our origin point is below any fast sync pivot point
442
	pivot := uint64(0)
443 444 445
	if d.mode == FastSync {
		if height <= uint64(fsMinFullBlocks) {
			origin = 0
446
		} else {
447 448
			pivot = height - uint64(fsMinFullBlocks)
			if pivot <= origin {
449 450
				origin = pivot - 1
			}
451
		}
452
	}
453 454 455 456 457 458
	d.committed = 1
	if d.mode == FastSync && pivot != 0 {
		d.committed = 0
	}
	// Initiate the sync using a concurrent header and content retrieval algorithm
	d.queue.Prepare(origin+1, d.mode)
459 460
	if d.syncInitHook != nil {
		d.syncInitHook(origin, height)
461
	}
462 463

	fetchers := []func() error{
464 465 466 467
		func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved
		func() error { return d.fetchBodies(origin + 1) },          // Bodies are retrieved during normal and fast sync
		func() error { return d.fetchReceipts(origin + 1) },        // Receipts are retrieved during fast sync
		func() error { return d.processHeaders(origin+1, pivot, td) },
468 469 470 471 472 473
	}
	if d.mode == FastSync {
		fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
	} else if d.mode == FullSync {
		fetchers = append(fetchers, d.processFullSyncContent)
	}
474
	return d.spawnSync(fetchers)
475 476 477 478
}

// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
479 480
func (d *Downloader) spawnSync(fetchers []func() error) error {
	errc := make(chan error, len(fetchers))
481
	d.cancelWg.Add(len(fetchers))
482 483
	for _, fn := range fetchers {
		fn := fn
484
		go func() { defer d.cancelWg.Done(); errc <- fn() }()
485 486 487
	}
	// Wait for the first error, then terminate the others.
	var err error
488 489
	for i := 0; i < len(fetchers); i++ {
		if i == len(fetchers)-1 {
490 491 492 493 494 495 496 497 498 499
			// Close the queue when all fetchers have exited.
			// This will cause the block processor to end when
			// it has processed the queue.
			d.queue.Close()
		}
		if err = <-errc; err != nil {
			break
		}
	}
	d.queue.Close()
500
	d.Cancel()
501
	return err
502 503
}

504 505 506 507
// cancel aborts all of the operations and resets the queue. However, cancel does
// not wait for the running download goroutines to finish. This method should be
// used when cancelling the downloads from inside the downloader.
func (d *Downloader) cancel() {
508
	// Close the current cancel channel
509
	d.cancelLock.Lock()
510 511 512 513 514 515 516
	if d.cancelCh != nil {
		select {
		case <-d.cancelCh:
			// Channel was already closed
		default:
			close(d.cancelCh)
		}
517 518
	}
	d.cancelLock.Unlock()
519 520 521 522 523 524
}

// Cancel aborts all of the operations and waits for all download goroutines to
// finish before returning.
func (d *Downloader) Cancel() {
	d.cancel()
525
	d.cancelWg.Wait()
526 527
}

528
// Terminate interrupts the downloader, canceling all pending operations.
529
// The downloader cannot be reused after calling Terminate.
530
func (d *Downloader) Terminate() {
531 532 533 534 535 536 537 538 539 540
	// Close the termination channel (make sure double close is allowed)
	d.quitLock.Lock()
	select {
	case <-d.quitCh:
	default:
		close(d.quitCh)
	}
	d.quitLock.Unlock()

	// Cancel any pending download requests
541
	d.Cancel()
542 543
}

544 545
// fetchHeight retrieves the head header of the remote peer to aid in estimating
// the total time a pending synchronisation would take.
546
func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
P
Péter Szilágyi 已提交
547
	p.log.Debug("Retrieving remote chain height")
548 549

	// Request the advertised remote head block and wait for the response
550 551
	head, _ := p.peer.Head()
	go p.peer.RequestHeadersByHash(head, 1, 0, false)
552

553 554
	ttl := d.requestTTL()
	timeout := time.After(ttl)
555 556 557
	for {
		select {
		case <-d.cancelCh:
558
			return nil, errCancelBlockFetch
559

560
		case packet := <-d.headerCh:
561
			// Discard anything not from the origin peer
562
			if packet.PeerId() != p.id {
563
				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
564 565 566
				break
			}
			// Make sure the peer actually gave something valid
567
			headers := packet.(*headerPack).headers
568
			if len(headers) != 1 {
P
Péter Szilágyi 已提交
569
				p.log.Debug("Multiple headers for single request", "headers", len(headers))
570
				return nil, errBadPeer
571
			}
572
			head := headers[0]
P
Péter Szilágyi 已提交
573
			p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
574
			return head, nil
575

576
		case <-timeout:
P
Péter Szilágyi 已提交
577
			p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
578
			return nil, errTimeout
579

580
		case <-d.bodyCh:
581 582
		case <-d.receiptCh:
			// Out of bounds delivery, ignore
583 584 585 586
		}
	}
}

587
// findAncestor tries to locate the common ancestor link of the local chain and
588
// a remote peers blockchain. In the general case when our node was in sync and
589
// on the correct chain, checking the top N links should already get us a match.
590
// In the rare scenario when we ended up on a long reorganisation (i.e. none of
591
// the head links match), we do a binary search to find the common ancestor.
592
func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, error) {
593
	// Figure out the valid ancestor range to prevent rewrite attacks
594
	floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64()
595

596
	if d.mode == FullSync {
597
		ceil = d.blockchain.CurrentBlock().NumberU64()
598
	} else if d.mode == FastSync {
599
		ceil = d.blockchain.CurrentFastBlock().NumberU64()
600 601 602 603
	}
	if ceil >= MaxForkAncestry {
		floor = int64(ceil - MaxForkAncestry)
	}
604 605
	p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height)

606 607 608 609
	// Request the topmost blocks to short circuit binary ancestor lookup
	head := ceil
	if head > height {
		head = height
610
	}
611
	from := int64(head) - int64(MaxHeaderFetch)
612 613 614
	if from < 0 {
		from = 0
	}
615 616 617 618 619 620
	// Span out with 15 block gaps into the future to catch bad head reports
	limit := 2 * MaxHeaderFetch / 16
	count := 1 + int((int64(ceil)-from)/16)
	if count > limit {
		count = limit
	}
621
	go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false)
622 623 624

	// Wait for the remote response to the head fetch
	number, hash := uint64(0), common.Hash{}
625 626 627

	ttl := d.requestTTL()
	timeout := time.After(ttl)
628 629 630 631

	for finished := false; !finished; {
		select {
		case <-d.cancelCh:
632
			return 0, errCancelHeaderFetch
633

634
		case packet := <-d.headerCh:
635
			// Discard anything not from the origin peer
636
			if packet.PeerId() != p.id {
637
				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
638 639 640
				break
			}
			// Make sure the peer actually gave something valid
641
			headers := packet.(*headerPack).headers
642
			if len(headers) == 0 {
P
Péter Szilágyi 已提交
643
				p.log.Warn("Empty head header set")
644 645
				return 0, errEmptyHeaderSet
			}
646 647
			// Make sure the peer's reply conforms to the request
			for i := 0; i < len(headers); i++ {
648
				if number := headers[i].Number.Int64(); number != from+int64(i)*16 {
P
Péter Szilágyi 已提交
649
					p.log.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number)
650 651 652
					return 0, errInvalidChain
				}
			}
653 654 655
			// Check if a common ancestor was found
			finished = true
			for i := len(headers) - 1; i >= 0; i-- {
656
				// Skip any headers that underflow/overflow our requested set
657
				if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > ceil {
658 659 660
					continue
				}
				// Otherwise check if we already know the header or not
661 662 663 664
				h := headers[i].Hash()
				n := headers[i].Number.Uint64()
				if (d.mode == FullSync && d.blockchain.HasBlock(h, n)) || (d.mode != FullSync && d.lightchain.HasHeader(h, n)) {
					number, hash = n, h
665 666 667

					// If every header is known, even future ones, the peer straight out lied about its head
					if number > height && i == limit-1 {
P
Péter Szilágyi 已提交
668
						p.log.Warn("Lied about chain head", "reported", height, "found", number)
669 670
						return 0, errStallingPeer
					}
671 672 673 674
					break
				}
			}

675
		case <-timeout:
P
Péter Szilágyi 已提交
676
			p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
677 678
			return 0, errTimeout

679
		case <-d.bodyCh:
680 681
		case <-d.receiptCh:
			// Out of bounds delivery, ignore
682 683 684
		}
	}
	// If the head fetch already found an ancestor, return
685
	if hash != (common.Hash{}) {
686
		if int64(number) <= floor {
P
Péter Szilágyi 已提交
687
			p.log.Warn("Ancestor below allowance", "number", number, "hash", hash, "allowance", floor)
688 689
			return 0, errInvalidAncestor
		}
P
Péter Szilágyi 已提交
690
		p.log.Debug("Found common ancestor", "number", number, "hash", hash)
691 692 693 694
		return number, nil
	}
	// Ancestor not found, we need to binary search over our chain
	start, end := uint64(0), head
695 696 697
	if floor > 0 {
		start = uint64(floor)
	}
698 699 700 701
	for start+1 < end {
		// Split our chain interval in two, and request the hash to cross check
		check := (start + end) / 2

702 703 704
		ttl := d.requestTTL()
		timeout := time.After(ttl)

705
		go p.peer.RequestHeadersByNumber(check, 1, 0, false)
706 707 708 709 710

		// Wait until a reply arrives to this request
		for arrived := false; !arrived; {
			select {
			case <-d.cancelCh:
711
				return 0, errCancelHeaderFetch
712

713
			case packer := <-d.headerCh:
714
				// Discard anything not from the origin peer
715
				if packer.PeerId() != p.id {
716
					log.Debug("Received headers from incorrect peer", "peer", packer.PeerId())
717 718 719
					break
				}
				// Make sure the peer actually gave something valid
720
				headers := packer.(*headerPack).headers
721
				if len(headers) != 1 {
P
Péter Szilágyi 已提交
722
					p.log.Debug("Multiple headers for single request", "headers", len(headers))
723 724 725 726 727
					return 0, errBadPeer
				}
				arrived = true

				// Modify the search interval based on the response
728 729 730
				h := headers[0].Hash()
				n := headers[0].Number.Uint64()
				if (d.mode == FullSync && !d.blockchain.HasBlock(h, n)) || (d.mode != FullSync && !d.lightchain.HasHeader(h, n)) {
731 732 733
					end = check
					break
				}
734
				header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
735
				if header.Number.Uint64() != check {
P
Péter Szilágyi 已提交
736
					p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
737 738 739 740
					return 0, errBadPeer
				}
				start = check

741
			case <-timeout:
P
Péter Szilágyi 已提交
742
				p.log.Debug("Waiting for search header timed out", "elapsed", ttl)
743 744
				return 0, errTimeout

745
			case <-d.bodyCh:
746 747
			case <-d.receiptCh:
				// Out of bounds delivery, ignore
748 749 750
			}
		}
	}
751 752
	// Ensure valid ancestry and return
	if int64(start) <= floor {
P
Péter Szilágyi 已提交
753
		p.log.Warn("Ancestor below allowance", "number", start, "hash", hash, "allowance", floor)
754 755
		return 0, errInvalidAncestor
	}
P
Péter Szilágyi 已提交
756
	p.log.Debug("Found common ancestor", "number", start, "hash", hash)
757 758 759
	return start, nil
}

760 761 762 763 764
// fetchHeaders keeps retrieving headers concurrently from the number
// requested, until no more are returned, potentially throttling on the way. To
// facilitate concurrency but still protect against malicious nodes sending bad
// headers, we construct a header chain skeleton using the "origin" peer we are
// syncing with, and fill in the missing headers using anyone else. Headers from
765
// other peers are only accepted if they map cleanly to the skeleton. If no one
766 767
// can fill in the skeleton - not even the origin peer - it's assumed invalid and
// the origin is dropped.
768
func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) error {
P
Péter Szilágyi 已提交
769 770
	p.log.Debug("Directing header downloads", "origin", from)
	defer p.log.Debug("Header download terminated")
771

772 773 774
	// Create a timeout timer, and the associated header fetcher
	skeleton := true            // Skeleton assembly phase or finishing up
	request := time.Now()       // time of the last skeleton fetch request
775 776 777 778
	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
	<-timeout.C                 // timeout channel should be initially empty
	defer timeout.Stop()

779
	var ttl time.Duration
780
	getHeaders := func(from uint64) {
781
		request = time.Now()
782 783 784

		ttl = d.requestTTL()
		timeout.Reset(ttl)
785

786
		if skeleton {
P
Péter Szilágyi 已提交
787
			p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
788
			go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
789
		} else {
P
Péter Szilágyi 已提交
790
			p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
791
			go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
792
		}
793
	}
794
	// Start pulling the header chain skeleton until all is done
795 796 797 798 799 800 801
	getHeaders(from)

	for {
		select {
		case <-d.cancelCh:
			return errCancelHeaderFetch

802
		case packet := <-d.headerCh:
803
			// Make sure the active peer is giving us the skeleton headers
804
			if packet.PeerId() != p.id {
805
				log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId())
806 807
				break
			}
808
			headerReqTimer.UpdateSince(request)
809 810
			timeout.Stop()

811 812 813 814 815 816
			// If the skeleton's finished, pull any remaining head headers directly from the origin
			if packet.Items() == 0 && skeleton {
				skeleton = false
				getHeaders(from)
				continue
			}
817
			// If no more headers are inbound, notify the content fetchers and return
818
			if packet.Items() == 0 {
819 820 821 822 823 824 825 826 827 828 829 830
				// Don't abort header fetches while the pivot is downloading
				if atomic.LoadInt32(&d.committed) == 0 && pivot <= from {
					p.log.Debug("No headers, waiting for pivot commit")
					select {
					case <-time.After(fsHeaderContCheck):
						getHeaders(from)
						continue
					case <-d.cancelCh:
						return errCancelHeaderFetch
					}
				}
				// Pivot done (or not in fast sync) and no more headers, terminate the process
P
Péter Szilágyi 已提交
831
				p.log.Debug("No more headers available")
832 833 834 835 836 837
				select {
				case d.headerProcCh <- nil:
					return nil
				case <-d.cancelCh:
					return errCancelHeaderFetch
				}
838
			}
839
			headers := packet.(*headerPack).headers
840

841 842
			// If we received a skeleton batch, resolve internals concurrently
			if skeleton {
843
				filled, proced, err := d.fillHeaderSkeleton(from, headers)
844
				if err != nil {
P
Péter Szilágyi 已提交
845
					p.log.Debug("Skeleton chain invalid", "err", err)
846 847
					return errInvalidChain
				}
848 849
				headers = filled[proced:]
				from += uint64(proced)
850
			}
851
			// Insert all the new headers and fetch the next batch
852
			if len(headers) > 0 {
P
Péter Szilágyi 已提交
853
				p.log.Trace("Scheduling new headers", "count", len(headers), "from", from)
854 855 856 857 858 859
				select {
				case d.headerProcCh <- headers:
				case <-d.cancelCh:
					return errCancelHeaderFetch
				}
				from += uint64(len(headers))
860
			}
861 862 863
			getHeaders(from)

		case <-timeout.C:
864 865 866 867 868 869
			if d.dropPeer == nil {
				// The dropPeer method is nil when `--copydb` is used for a local copy.
				// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
				p.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", p.id)
				break
			}
870
			// Header retrieval timed out, consider the peer bad and drop
P
Péter Szilágyi 已提交
871
			p.log.Debug("Header request timed out", "elapsed", ttl)
872
			headerTimeoutMeter.Mark(1)
873 874 875
			d.dropPeer(p.id)

			// Finish the sync gracefully instead of dumping the gathered data though
876
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
877 878 879 880
				select {
				case ch <- false:
				case <-d.cancelCh:
				}
881
			}
882 883 884 885 886
			select {
			case d.headerProcCh <- nil:
			case <-d.cancelCh:
			}
			return errBadPeer
887 888 889 890
		}
	}
}

891 892
// fillHeaderSkeleton concurrently retrieves headers from all our available peers
// and maps them to the provided skeleton header chain.
893 894 895 896 897
//
// Any partial results from the beginning of the skeleton is (if possible) forwarded
// immediately to the header processor to keep the rest of the pipeline full even
// in the case of header stalls.
//
Y
Yusup 已提交
898
// The method returns the entire filled skeleton and also the number of headers
899 900
// already forwarded for processing.
func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
901
	log.Debug("Filling up skeleton", "from", from)
902 903 904 905 906
	d.queue.ScheduleSkeleton(from, skeleton)

	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*headerPack)
907
			return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh)
908
		}
909
		expire   = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
910
		throttle = func() bool { return false }
911
		reserve  = func(p *peerConnection, count int) (*fetchRequest, bool, error) {
912 913
			return d.queue.ReserveHeaders(p, count), false, nil
		}
914 915 916
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
		capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
		setIdle  = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) }
917 918 919
	)
	err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
		d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
920
		nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")
921

922
	log.Debug("Skeleton fill terminated", "err", err)
923 924 925

	filled, proced := d.queue.RetrieveHeaders()
	return filled, proced, err
926 927
}

928 929 930 931
// fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchBodies(from uint64) error {
932
	log.Debug("Downloading block bodies", "origin", from)
933

934
	var (
935
		deliver = func(packet dataPack) (int, error) {
936
			pack := packet.(*bodyPack)
937
			return d.queue.DeliverBodies(pack.peerID, pack.transactions, pack.uncles)
938
		}
939
		expire   = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
940 941 942
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
		capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
		setIdle  = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
943
	)
944
	err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
945
		d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
946
		d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")
947

948
	log.Debug("Block body download terminated", "err", err)
949 950 951 952 953 954 955
	return err
}

// fetchReceipts iteratively downloads the scheduled block receipts, taking any
// available peers, reserving a chunk of receipts for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchReceipts(from uint64) error {
956
	log.Debug("Downloading transaction receipts", "origin", from)
957 958

	var (
959
		deliver = func(packet dataPack) (int, error) {
960
			pack := packet.(*receiptPack)
961
			return d.queue.DeliverReceipts(pack.peerID, pack.receipts)
962
		}
963
		expire   = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
964 965 966
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
		capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
		setIdle  = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
967
	)
968
	err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
969
		d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
970
		d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")
971

972
	log.Debug("Transaction receipt download terminated", "err", err)
973 974 975 976 977 978
	return err
}

// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996
//
// As the scheduling/timeout logic mostly is the same for all downloaded data
// types, this method is used by each for data gathering and is instrumented with
// various callbacks to handle the slight differences between processing them.
//
// The instrumentation parameters:
//  - errCancel:   error type to return if the fetch operation is cancelled (mostly makes logging nicer)
//  - deliveryCh:  channel from which to retrieve downloaded data packets (merged from all concurrent peers)
//  - deliver:     processing callback to deliver data packets into type specific download queues (usually within `queue`)
//  - wakeCh:      notification channel for waking the fetcher when new tasks are available (or sync completed)
//  - expire:      task callback method to abort requests that took too long and return the faulty peers (traffic shaping)
//  - pending:     task callback for the number of requests still needing download (detect completion/non-completability)
//  - inFlight:    task callback for the number of in-progress requests (wait for all active downloads to finish)
//  - throttle:    task callback to check if the processing queue is full and activate throttling (bound memory use)
//  - reserve:     task callback to reserve new download tasks to a particular peer (also signals partial completions)
//  - fetchHook:   tester callback to notify of new tasks being initiated (allows testing the scheduling logic)
//  - fetch:       network callback to actually send a particular download request to a physical remote peer
//  - cancel:      task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer)
P
Péter Szilágyi 已提交
997
//  - capacity:    network callback to retrieve the estimated type-specific bandwidth capacity of a peer (traffic shaping)
998 999 1000
//  - idle:        network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
//  - setIdle:     network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
//  - kind:        textual label of the type being downloaded to display in log mesages
1001
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
1002 1003 1004
	expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
	fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
	idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {
1005

1006
	// Create a ticker to detect expired retrieval tasks
1007 1008 1009 1010 1011
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	update := make(chan struct{}, 1)

1012
	// Prepare the queue and fetch block parts until the block header fetcher's done
1013 1014 1015 1016
	finished := false
	for {
		select {
		case <-d.cancelCh:
1017
			return errCancel
1018

1019
		case packet := <-deliveryCh:
1020 1021
			// If the peer was previously banned and failed to deliver its pack
			// in a reasonable time frame, ignore its message.
1022
			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
1023 1024 1025
				// Deliver the received chunk of data and check chain validity
				accepted, err := deliver(packet)
				if err == errInvalidChain {
1026
					return err
1027 1028 1029 1030 1031 1032 1033 1034 1035 1036
				}
				// Unless a peer delivered something completely else than requested (usually
				// caused by a timed out request which came through in the end), set it to
				// idle. If the delivery's stale, the peer should have already been idled.
				if err != errStaleDelivery {
					setIdle(peer, accepted)
				}
				// Issue a log to the user to see what's going on
				switch {
				case err == nil && packet.Items() == 0:
P
Péter Szilágyi 已提交
1037
					peer.log.Trace("Requested data not delivered", "type", kind)
1038
				case err == nil:
P
Péter Szilágyi 已提交
1039
					peer.log.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats())
1040
				default:
P
Péter Szilágyi 已提交
1041
					peer.log.Trace("Failed to deliver retrieved data", "type", kind, "err", err)
1042 1043 1044 1045 1046 1047 1048 1049
				}
			}
			// Blocks assembled, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

1050
		case cont := <-wakeCh:
1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072
			// The header fetcher sent a continuation flag, check if it's done
			if !cont {
				finished = true
			}
			// Headers arrive, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-ticker.C:
			// Sanity check update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-update:
			// Short circuit if we lost all our peers
			if d.peers.Len() == 0 {
				return errNoPeers
			}
1073
			// Check for fetch request timeouts and demote the responsible peers
1074
			for pid, fails := range expire() {
1075
				if peer := d.peers.Peer(pid); peer != nil {
1076 1077 1078 1079 1080 1081 1082 1083
					// If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps
					// ourselves. Only reset to minimal throughput but don't drop just yet. If even the minimal times
					// out that sync wise we need to get rid of the peer.
					//
					// The reason the minimum threshold is 2 is because the downloader tries to estimate the bandwidth
					// and latency of a peer separately, which requires pushing the measures capacity a bit and seeing
					// how response times reacts, to it always requests one more than the minimum (i.e. min 2).
					if fails > 2 {
P
Péter Szilágyi 已提交
1084
						peer.log.Trace("Data delivery timed out", "type", kind)
1085 1086
						setIdle(peer, 0)
					} else {
P
Péter Szilágyi 已提交
1087
						peer.log.Debug("Stalling delivery, dropping", "type", kind)
1088 1089 1090 1091 1092 1093 1094
						if d.dropPeer == nil {
							// The dropPeer method is nil when `--copydb` is used for a local copy.
							// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
							peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid)
						} else {
							d.dropPeer(pid)
						}
1095
					}
1096 1097
				}
			}
1098 1099
			// If there's nothing more to fetch, wait or terminate
			if pending() == 0 {
1100
				if !inFlight() && finished {
1101
					log.Debug("Data fetching completed", "type", kind)
1102 1103 1104 1105 1106
					return nil
				}
				break
			}
			// Send a download request to all idle peers, until throttled
1107
			progressed, throttled, running := false, false, inFlight()
1108 1109 1110
			idles, total := idle()

			for _, peer := range idles {
1111
				// Short circuit if throttling activated
1112
				if throttle() {
1113 1114 1115
					throttled = true
					break
				}
1116 1117 1118 1119
				// Short circuit if there is no more available task.
				if pending() == 0 {
					break
				}
1120 1121
				// Reserve a chunk of fetches for a peer. A nil can mean either that
				// no more headers are available, or that the peer is known not to
1122
				// have them.
1123
				request, progress, err := reserve(peer, capacity(peer))
1124 1125 1126
				if err != nil {
					return err
				}
1127 1128
				if progress {
					progressed = true
1129 1130 1131 1132
				}
				if request == nil {
					continue
				}
1133
				if request.From > 0 {
P
Péter Szilágyi 已提交
1134
					peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From)
1135
				} else {
1136
					peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
1137
				}
1138
				// Fetch the chunk and make sure any errors return the hashes to the queue
1139 1140
				if fetchHook != nil {
					fetchHook(request.Headers)
1141
				}
1142
				if err := fetch(peer, request); err != nil {
1143 1144 1145 1146 1147
					// Although we could try and make an attempt to fix this, this error really
					// means that we've double allocated a fetch task to a peer. If that is the
					// case, the internal state of the downloader and the queue is very wrong so
					// better hard crash and note the error instead of silently accumulating into
					// a much bigger issue.
1148
					panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, kind))
1149
				}
1150
				running = true
1151 1152 1153
			}
			// Make sure that we have peers available for fetching. If all peers have been tried
			// and all failed throw an error
1154
			if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
1155 1156 1157 1158 1159 1160
				return errPeersUnavailable
			}
		}
	}
}

1161 1162 1163
// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
1164
func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error {
1165 1166 1167 1168 1169 1170 1171 1172 1173
	// Keep a count of uncertain headers to roll back
	rollback := []*types.Header{}
	defer func() {
		if len(rollback) > 0 {
			// Flatten the headers and roll them back
			hashes := make([]common.Hash, len(rollback))
			for i, header := range rollback {
				hashes[i] = header.Hash()
			}
1174 1175
			lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
			if d.mode != LightSync {
1176 1177
				lastFastBlock = d.blockchain.CurrentFastBlock().Number()
				lastBlock = d.blockchain.CurrentBlock().Number()
1178
			}
1179
			d.lightchain.Rollback(hashes)
1180
			curFastBlock, curBlock := common.Big0, common.Big0
1181
			if d.mode != LightSync {
1182 1183
				curFastBlock = d.blockchain.CurrentFastBlock().Number()
				curBlock = d.blockchain.CurrentBlock().Number()
1184
			}
1185
			log.Warn("Rolled back headers", "count", len(hashes),
1186
				"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
1187 1188
				"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
				"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))
1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203
		}
	}()

	// Wait for batches of headers to process
	gotHeaders := false

	for {
		select {
		case <-d.cancelCh:
			return errCancelHeaderProcessing

		case headers := <-d.headerProcCh:
			// Terminate header processing if we synced up
			if len(headers) == 0 {
				// Notify everyone that headers are fully processed
1204
				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
1205 1206 1207 1208 1209
					select {
					case ch <- false:
					case <-d.cancelCh:
					}
				}
1210 1211
				// If no headers were retrieved at all, the peer violated its TD promise that it had a
				// better chain compared to ours. The only exception is if its promised blocks were
1212 1213 1214 1215 1216 1217 1218 1219 1220 1221
				// already imported by other means (e.g. fecher):
				//
				// R <remote peer>, L <local node>: Both at block 10
				// R: Mine block 11, and propagate it to L
				// L: Queue block 11 for import
				// L: Notice that R's head and TD increased compared to ours, start sync
				// L: Import of block 11 finishes
				// L: Sync begins, and finds common ancestor at 11
				// L: Request new headers up from 11 (R's TD was higher, it must have something)
				// R: Nothing to give
1222
				if d.mode != LightSync {
1223 1224
					head := d.blockchain.CurrentBlock()
					if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {
1225 1226
						return errStallingPeer
					}
1227 1228 1229 1230 1231 1232 1233 1234 1235
				}
				// If fast or light syncing, ensure promised headers are indeed delivered. This is
				// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
				// of delivering the post-pivot blocks that would flag the invalid content.
				//
				// This check cannot be executed "as is" for full imports, since blocks may still be
				// queued for processing when the header download completes. However, as long as the
				// peer gave us something useful, we're already happy/progressed (above check).
				if d.mode == FastSync || d.mode == LightSync {
1236 1237
					head := d.lightchain.CurrentHeader()
					if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266
						return errStallingPeer
					}
				}
				// Disable any rollback and return
				rollback = nil
				return nil
			}
			// Otherwise split the chunk of headers into batches and process them
			gotHeaders = true

			for len(headers) > 0 {
				// Terminate if something failed in between processing chunks
				select {
				case <-d.cancelCh:
					return errCancelHeaderProcessing
				default:
				}
				// Select the next chunk of headers to import
				limit := maxHeadersProcess
				if limit > len(headers) {
					limit = len(headers)
				}
				chunk := headers[:limit]

				// In case of header only syncing, validate the chunk immediately
				if d.mode == FastSync || d.mode == LightSync {
					// Collect the yet unknown headers to mark them as uncertain
					unknown := make([]*types.Header, 0, len(headers))
					for _, header := range chunk {
1267
						if !d.lightchain.HasHeader(header.Hash(), header.Number.Uint64()) {
1268 1269 1270 1271 1272 1273 1274 1275
							unknown = append(unknown, header)
						}
					}
					// If we're importing pure headers, verify based on their recentness
					frequency := fsHeaderCheckFrequency
					if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
						frequency = 1
					}
1276
					if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
1277 1278 1279 1280
						// If some headers were inserted, add them too to the rollback list
						if n > 0 {
							rollback = append(rollback, chunk[:n]...)
						}
1281
						log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302
						return errInvalidChain
					}
					// All verifications passed, store newly found uncertain headers
					rollback = append(rollback, unknown...)
					if len(rollback) > fsHeaderSafetyNet {
						rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
					}
				}
				// Unless we're doing light chains, schedule the headers for associated content retrieval
				if d.mode == FullSync || d.mode == FastSync {
					// If we've reached the allowed number of pending headers, stall a bit
					for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
						select {
						case <-d.cancelCh:
							return errCancelHeaderProcessing
						case <-time.After(time.Second):
						}
					}
					// Otherwise insert the headers for content retrieval
					inserts := d.queue.Schedule(chunk, origin)
					if len(inserts) != len(chunk) {
1303
						log.Debug("Stale headers")
1304 1305 1306 1307 1308 1309
						return errBadPeer
					}
				}
				headers = headers[limit:]
				origin += uint64(limit)
			}
1310 1311 1312 1313 1314 1315 1316 1317

			// Update the highest block number we know if a higher one is found.
			d.syncStatsLock.Lock()
			if d.syncStatsChainHeight < origin {
				d.syncStatsChainHeight = origin - 1
			}
			d.syncStatsLock.Unlock()

1318
			// Signal the content downloaders of the availablility of new tasks
1319
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
1320 1321 1322 1323 1324 1325 1326 1327 1328
				select {
				case ch <- true:
				default:
				}
			}
		}
	}
}

1329 1330
// processFullSyncContent takes fetch results from the queue and imports them into the chain.
func (d *Downloader) processFullSyncContent() error {
1331
	for {
1332
		results := d.queue.Results(true)
1333
		if len(results) == 0 {
1334
			return nil
1335
		}
1336
		if d.chainInsertHook != nil {
1337
			d.chainInsertHook(results)
1338
		}
1339 1340 1341 1342 1343 1344 1345
		if err := d.importBlockResults(results); err != nil {
			return err
		}
	}
}

func (d *Downloader) importBlockResults(results []*fetchResult) error {
1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367
	// Check for any early termination requests
	if len(results) == 0 {
		return nil
	}
	select {
	case <-d.quitCh:
		return errCancelContentProcessing
	default:
	}
	// Retrieve the a batch of results to import
	first, last := results[0].Header, results[len(results)-1].Header
	log.Debug("Inserting downloaded chain", "items", len(results),
		"firstnum", first.Number, "firsthash", first.Hash(),
		"lastnum", last.Number, "lasthash", last.Hash(),
	)
	blocks := make([]*types.Block, len(results))
	for i, result := range results {
		blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
	}
	if index, err := d.blockchain.InsertChain(blocks); err != nil {
		log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
		return errInvalidChain
1368 1369 1370 1371 1372 1373 1374
	}
	return nil
}

// processFastSyncContent takes fetch results from the queue and writes them to the
// database. It also controls the synchronisation of state nodes of the pivot block.
func (d *Downloader) processFastSyncContent(latest *types.Header) error {
1375 1376
	// Start syncing state of the reported head block. This should get us most of
	// the state of the pivot block.
1377 1378 1379
	stateSync := d.syncState(latest.Root)
	defer stateSync.Cancel()
	go func() {
1380
		if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
1381 1382 1383
			d.queue.Close() // wake up WaitResults
		}
	}()
1384 1385 1386 1387 1388 1389 1390
	// Figure out the ideal pivot block. Note, that this goalpost may move if the
	// sync takes long enough for the chain head to move significantly.
	pivot := uint64(0)
	if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) {
		pivot = height - uint64(fsMinFullBlocks)
	}
	// To cater for moving pivot points, track the pivot block and subsequently
Y
Yusup 已提交
1391
	// accumulated download results separately.
1392 1393 1394 1395
	var (
		oldPivot *fetchResult   // Locked in pivot block, might change eventually
		oldTail  []*fetchResult // Downloaded content after the pivot
	)
1396
	for {
1397 1398 1399
		// Wait for the next batch of downloaded data to be available, and if the pivot
		// block became stale, move the goalpost
		results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness
1400
		if len(results) == 0 {
1401 1402 1403 1404 1405 1406 1407 1408 1409 1410
			// If pivot sync is done, stop
			if oldPivot == nil {
				return stateSync.Cancel()
			}
			// If sync failed, stop
			select {
			case <-d.cancelCh:
				return stateSync.Cancel()
			default:
			}
1411 1412 1413 1414
		}
		if d.chainInsertHook != nil {
			d.chainInsertHook(results)
		}
1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425
		if oldPivot != nil {
			results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
		}
		// Split around the pivot block and process the two sides via fast/full sync
		if atomic.LoadInt32(&d.committed) == 0 {
			latest = results[len(results)-1].Header
			if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) {
				log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks))
				pivot = height - uint64(fsMinFullBlocks)
			}
		}
1426 1427 1428 1429 1430
		P, beforeP, afterP := splitAroundPivot(pivot, results)
		if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
			return err
		}
		if P != nil {
1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457
			// If new pivot block found, cancel old state retrieval and restart
			if oldPivot != P {
				stateSync.Cancel()

				stateSync = d.syncState(P.Header.Root)
				defer stateSync.Cancel()
				go func() {
					if err := stateSync.Wait(); err != nil && err != errCancelStateFetch {
						d.queue.Close() // wake up WaitResults
					}
				}()
				oldPivot = P
			}
			// Wait for completion, occasionally checking for pivot staleness
			select {
			case <-stateSync.done:
				if stateSync.err != nil {
					return stateSync.err
				}
				if err := d.commitPivotBlock(P); err != nil {
					return err
				}
				oldPivot = nil

			case <-time.After(time.Second):
				oldTail = afterP
				continue
1458
			}
1459
		}
1460
		// Fast sync done, pivot commit done, full import
1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482
		if err := d.importBlockResults(afterP); err != nil {
			return err
		}
	}
}

// splitAroundPivot partitions results by header number relative to pivot: the
// result whose number equals pivot is returned as p (nil if none is present),
// those with a lower number are collected into before and those with a higher
// number into after, each preserving the input order.
func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
	for i := range results {
		res := results[i]

		if num := res.Header.Number.Uint64(); num < pivot {
			before = append(before, res)
		} else if num == pivot {
			p = res
		} else {
			after = append(after, res)
		}
	}
	return p, before, after
}

func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
1483 1484 1485 1486 1487 1488 1489 1490 1491 1492
	// Check for any early termination requests
	if len(results) == 0 {
		return nil
	}
	select {
	case <-d.quitCh:
		return errCancelContentProcessing
	case <-stateSync.done:
		if err := stateSync.Wait(); err != nil {
			return err
1493
		}
1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510
	default:
	}
	// Retrieve the a batch of results to import
	first, last := results[0].Header, results[len(results)-1].Header
	log.Debug("Inserting fast-sync blocks", "items", len(results),
		"firstnum", first.Number, "firsthash", first.Hash(),
		"lastnumn", last.Number, "lasthash", last.Hash(),
	)
	blocks := make([]*types.Block, len(results))
	receipts := make([]types.Receipts, len(results))
	for i, result := range results {
		blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
		receipts[i] = result.Receipts
	}
	if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil {
		log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
		return errInvalidChain
1511 1512 1513 1514 1515
	}
	return nil
}

func (d *Downloader) commitPivotBlock(result *fetchResult) error {
1516 1517 1518
	block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
	log.Debug("Committing fast sync pivot as new head", "number", block.Number(), "hash", block.Hash())
	if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}); err != nil {
1519 1520
		return err
	}
1521
	if err := d.blockchain.FastSyncCommitHead(block.Hash()); err != nil {
1522
		return err
1523
	}
1524 1525
	atomic.StoreInt32(&d.committed, 1)
	return nil
1526 1527
}

L
Leif Jurvetson 已提交
1528
// DeliverHeaders injects a new batch of block headers received from a remote
1529
// node into the download schedule.
1530
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
1531
	return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
1532 1533 1534
}

// DeliverBodies injects a new batch of block bodies received from a remote node.
1535
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
1536
	return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
1537 1538 1539 1540
}

// DeliverReceipts injects a new batch of receipts received from a remote node.
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) {
1541 1542 1543 1544 1545 1546 1547 1548 1549 1550
	return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
}

// DeliverNodeData injects a new batch of node state data received from a remote node.
// The raw data blobs are wrapped in a statePack tagged with the peer id and
// handed to deliver on the state channel, with the state in/drop meters used
// for accounting. The error from deliver is returned unchanged.
func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) {
	return d.deliver(id, d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter)
}

// deliver injects a new batch of data received from a remote node.
func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
1551
	// Update the delivery metrics for both good and failed deliveries
1552
	inMeter.Mark(int64(packet.Items()))
1553 1554
	defer func() {
		if err != nil {
1555
			dropMeter.Mark(int64(packet.Items()))
1556 1557 1558 1559 1560 1561
		}
	}()
	// Deliver or abort if the sync is canceled while queuing
	d.cancelLock.RLock()
	cancel := d.cancelCh
	d.cancelLock.RUnlock()
1562 1563 1564
	if cancel == nil {
		return errNoSyncActive
	}
1565
	select {
1566
	case destCh <- packet:
1567 1568 1569 1570
		return nil
	case <-cancel:
		return errNoSyncActive
	}
1571
}
1572 1573 1574 1575 1576 1577

// qosTuner is the quality of service tuning loop that occasionally gathers the
// peer latency statistics and updates the estimated request round trip time.
func (d *Downloader) qosTuner() {
	for {
		// Retrieve the current median RTT and integrate into the previoust target RTT
1578
		rtt := time.Duration((1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
1579 1580 1581 1582 1583 1584 1585 1586
		atomic.StoreUint64(&d.rttEstimate, uint64(rtt))

		// A new RTT cycle passed, increase our confidence in the estimated RTT
		conf := atomic.LoadUint64(&d.rttConfidence)
		conf = conf + (1000000-conf)/2
		atomic.StoreUint64(&d.rttConfidence, conf)

		// Log the new QoS values and sleep until the next RTT
1587
		log.Debug("Recalculated downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600
		select {
		case <-d.quitCh:
			return
		case <-time.After(rtt):
		}
	}
}

// qosReduceConfidence is meant to be called when a new peer joins the downloader's
// peer set, needing to reduce the confidence we have in out QoS estimates.
func (d *Downloader) qosReduceConfidence() {
	// If we have a single peer, confidence is always 1
	peers := uint64(d.peers.Len())
1601 1602 1603 1604
	if peers == 0 {
		// Ensure peer connectivity races don't catch us off guard
		return
	}
1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620
	if peers == 1 {
		atomic.StoreUint64(&d.rttConfidence, 1000000)
		return
	}
	// If we have a ton of peers, don't drop confidence)
	if peers >= uint64(qosConfidenceCap) {
		return
	}
	// Otherwise drop the confidence factor
	conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
	if float64(conf)/1000000 < rttMinConfidence {
		conf = uint64(rttMinConfidence * 1000000)
	}
	atomic.StoreUint64(&d.rttConfidence, conf)

	rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
1621
	log.Debug("Relaxed downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
1622 1623 1624 1625 1626 1627 1628
}

// requestRTT returns the current target round trip time for a download request
// to complete in.
//
// Note, the returned RTT is .9 of the actually estimated RTT. The reason is that
// the downloader tries to adapt queries to the RTT, so multiple RTT values can
Y
Yusup 已提交
1629
// be adapted to, but smaller ones are preferred (stabler download stream).
1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646
func (d *Downloader) requestRTT() time.Duration {
	return time.Duration(atomic.LoadUint64(&d.rttEstimate)) * 9 / 10
}

// requestTTL returns the current timeout allowance for a single download request
// to finish under.
//
// The allowance is the estimated RTT divided by the current confidence (stored
// in millionths, hence the division by 1000000.0), multiplied by ttlScaling
// and capped at ttlLimit. Lower confidence therefore yields a longer timeout.
func (d *Downloader) requestTTL() time.Duration {
	var (
		rtt  = time.Duration(atomic.LoadUint64(&d.rttEstimate))
		conf = float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0
	)
	// NOTE(review): assumes rttConfidence is never zero here (other code in
	// this file clamps it at rttMinConfidence) — confirm for the initial value.
	ttl := time.Duration(ttlScaling) * time.Duration(float64(rtt)/conf)
	if ttl > ttlLimit {
		ttl = ttlLimit
	}
	return ttl
}