// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package downloader contains the manual full chain synchronisation.
package downloader

import (
21
	"crypto/rand"
22
	"errors"
23
	"fmt"
24
	"math"
25
	"math/big"
26 27 28 29
	"sync"
	"sync/atomic"
	"time"

30
	ethereum "github.com/ethereum/go-ethereum"
31 32
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
33
	"github.com/ethereum/go-ethereum/ethdb"
34
	"github.com/ethereum/go-ethereum/event"
35
	"github.com/ethereum/go-ethereum/log"
36
	"github.com/ethereum/go-ethereum/params"
37
	"github.com/rcrowley/go-metrics"
38 39
)

40
var (
41 42 43
	MaxHashFetch    = 512 // Amount of hashes to be fetched per retrieval request
	MaxBlockFetch   = 128 // Amount of blocks to be fetched per retrieval request
	MaxHeaderFetch  = 192 // Amount of block headers to be fetched per retrieval request
44
	MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly
45 46
	MaxBodyFetch    = 128 // Amount of block bodies to be fetched per retrieval request
	MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
47
	MaxStateFetch   = 384 // Amount of node state values to allow fetching per request
48

49 50 51 52 53 54
	MaxForkAncestry  = 3 * params.EpochDuration // Maximum chain reorganisation
	rttMinEstimate   = 2 * time.Second          // Minimum round-trip time to target for download requests
	rttMaxEstimate   = 20 * time.Second         // Maximum rount-trip time to target for download requests
	rttMinConfidence = 0.1                      // Worse confidence factor in our estimated RTT value
	ttlScaling       = 3                        // Constant scaling factor for RTT -> TTL conversion
	ttlLimit         = time.Minute              // Maximum TTL allowance to prevent reaching crazy timeouts
55 56 57 58

	qosTuningPeers   = 5    // Number of peers to tune based on (best peers)
	qosConfidenceCap = 10   // Number of peers above which not to modify RTT confidence
	qosTuningImpact  = 0.25 // Impact that a new tuning target has on the previous value
59

60 61
	maxQueuedHeaders  = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
	maxHeadersProcess = 2048      // Number of header download results to import at once into the chain
62
	maxResultsProcess = 2048      // Number of content download results to import at once into the chain
63

64 65 66
	fsHeaderCheckFrequency = 100        // Verification frequency of the downloaded headers during fast sync
	fsHeaderSafetyNet      = 2048       // Number of headers to discard in case a chain violation is detected
	fsHeaderForceVerify    = 24         // Number of headers to verify before and after the pivot to accept it
67 68
	fsPivotInterval        = 256        // Number of headers out of which to randomize the pivot point
	fsMinFullBlocks        = 64         // Number of blocks to retrieve fully even in fast sync
69
	fsCriticalTrials       = uint32(32) // Number of times to retry in the cricical section before bailing
70
)
71

72
// Sentinel errors returned by the downloader. Synchronise compares against
// these values to decide whether a failing peer should be dropped.
var (
	errBusy                    = errors.New("busy")
	errUnknownPeer             = errors.New("peer is unknown or unhealthy")
	errBadPeer                 = errors.New("action from bad peer ignored")
	errStallingPeer            = errors.New("peer is stalling")
	errNoPeers                 = errors.New("no peers to keep download active")
	errTimeout                 = errors.New("timeout")
	errEmptyHeaderSet          = errors.New("empty header set by peer")
	errPeersUnavailable        = errors.New("no peers available or all tried for download")
	errInvalidAncestor         = errors.New("retrieved ancestor is invalid")
	errInvalidChain            = errors.New("retrieved hash chain is invalid")
	errInvalidBlock            = errors.New("retrieved block is invalid")
	errInvalidBody             = errors.New("retrieved block body is invalid")
	errInvalidReceipt          = errors.New("retrieved receipt is invalid")
	errCancelBlockFetch        = errors.New("block download canceled (requested)")
	errCancelHeaderFetch       = errors.New("block header download canceled (requested)")
	errCancelBodyFetch         = errors.New("block body download canceled (requested)")
	errCancelReceiptFetch      = errors.New("receipt download canceled (requested)")
	errCancelStateFetch        = errors.New("state data download canceled (requested)")
	errCancelHeaderProcessing  = errors.New("header processing canceled (requested)")
	errCancelContentProcessing = errors.New("content processing canceled (requested)")
	errNoSyncActive            = errors.New("no sync active")
	errTooOld                  = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)")
)

97
type Downloader struct {
98 99
	mode SyncMode       // Synchronisation mode defining the strategy used (per sync cycle)
	mux  *event.TypeMux // Event multiplexer to announce sync operation events
100

101 102 103
	queue   *queue   // Scheduler for selecting the hashes to download
	peers   *peerSet // Set of active peers from which download can proceed
	stateDB ethdb.Database
104

105
	fsPivotLock  *types.Header // Pivot header on critical section entry (cannot change between retries)
106
	fsPivotFails uint32        // Number of subsequent fast sync failures in the critical section
107

108 109
	rttEstimate   uint64 // Round trip time to target for download requests
	rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)
110

111
	// Statistics
112 113 114
	syncStatsChainOrigin uint64 // Origin block number where syncing started at
	syncStatsChainHeight uint64 // Highest block number known when syncing started
	syncStatsState       stateSyncStats
115
	syncStatsLock        sync.RWMutex // Lock protecting the sync stats fields
116

117
	lightchain LightChain
118
	blockchain BlockChain
119

120
	// Callbacks
121
	dropPeer peerDropFn // Drops a peer for misbehaving
122

123
	// Status
124 125 126
	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
	synchronising   int32
	notified        int32
127 128

	// Channels
129 130 131 132 133 134
	headerCh      chan dataPack        // [eth/62] Channel receiving inbound block headers
	bodyCh        chan dataPack        // [eth/62] Channel receiving inbound block bodies
	receiptCh     chan dataPack        // [eth/63] Channel receiving inbound receipts
	bodyWakeCh    chan bool            // [eth/62] Channel to signal the block body fetcher of new tasks
	receiptWakeCh chan bool            // [eth/63] Channel to signal the receipt fetcher of new tasks
	headerProcCh  chan []*types.Header // [eth/62] Channel to feed the header processor new tasks
135

136 137 138 139 140
	// for stateFetcher
	stateSyncStart chan *stateSync
	trackStateReq  chan *stateReq
	stateCh        chan dataPack // [eth/63] Channel receiving inbound node state data

141 142
	// Cancellation and termination
	cancelPeer string        // Identifier of the peer currently being used as the master (cancel on drop)
143
	cancelCh   chan struct{} // Channel to cancel mid-flight syncs
144
	cancelLock sync.RWMutex  // Lock to protect the cancel channel and peer in delivers
145

146 147 148
	quitCh   chan struct{} // Quit channel to signal termination
	quitLock sync.RWMutex  // Lock to prevent double closes

149
	// Testing hooks
150 151 152 153
	syncInitHook     func(uint64, uint64)  // Method to call upon initiating a new sync run
	bodyFetchHook    func([]*types.Header) // Method to call upon starting a block body fetch
	receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
	chainInsertHook  func([]*fetchResult)  // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
154 155
}

156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200
// LightChain is the set of header-level operations the downloader needs from
// the local chain: header lookup, total difficulty lookup, header insertion
// and rollback.
type LightChain interface {
	// HasHeader verifies a header's presence in the local chain.
	HasHeader(common.Hash) bool

	// GetHeaderByHash retrieves a header from the local chain.
	GetHeaderByHash(common.Hash) *types.Header

	// CurrentHeader retrieves the head header from the local chain.
	CurrentHeader() *types.Header

	// GetTdByHash returns the total difficulty of a local block.
	GetTdByHash(common.Hash) *big.Int

	// InsertHeaderChain inserts a batch of headers into the local chain.
	InsertHeaderChain([]*types.Header, int) (int, error)

	// Rollback removes a few recently added elements from the local chain.
	Rollback([]common.Hash)
}

// BlockChain extends LightChain with the block-level operations the downloader
// needs from the local chain: block and state lookup, head retrieval, and
// block / receipt insertion.
type BlockChain interface {
	LightChain

	// HasBlockAndState verifies block and associated states' presence in the local chain.
	HasBlockAndState(common.Hash) bool

	// GetBlockByHash retrieves a block from the local chain.
	GetBlockByHash(common.Hash) *types.Block

	// CurrentBlock retrieves the head block from the local chain.
	CurrentBlock() *types.Block

	// CurrentFastBlock retrieves the head fast block from the local chain.
	CurrentFastBlock() *types.Block

	// FastSyncCommitHead directly commits the head block to a certain entity.
	FastSyncCommitHead(common.Hash) error

	// InsertChain inserts a batch of blocks into the local chain.
	InsertChain(types.Blocks) (int, error)

	// InsertReceiptChain inserts a batch of receipts into the local chain.
	InsertReceiptChain(types.Blocks, []types.Receipts) (int, error)
}

201
// New creates a new downloader to fetch hashes and blocks from remote peers.
202 203 204 205
func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
	if lightchain == nil {
		lightchain = chain
	}
206

207
	dl := &Downloader{
208 209 210 211 212 213 214
		mode:           mode,
		stateDB:        stateDb,
		mux:            mux,
		queue:          newQueue(),
		peers:          newPeerSet(),
		rttEstimate:    uint64(rttMaxEstimate),
		rttConfidence:  uint64(1000000),
215
		blockchain:     chain,
216 217 218 219 220 221 222 223 224 225
		lightchain:     lightchain,
		dropPeer:       dropPeer,
		headerCh:       make(chan dataPack, 1),
		bodyCh:         make(chan dataPack, 1),
		receiptCh:      make(chan dataPack, 1),
		bodyWakeCh:     make(chan bool, 1),
		receiptWakeCh:  make(chan bool, 1),
		headerProcCh:   make(chan []*types.Header, 1),
		quitCh:         make(chan struct{}),
		stateCh:        make(chan dataPack),
226 227
		stateSyncStart: make(chan *stateSync),
		trackStateReq:  make(chan *stateReq),
228
	}
229
	go dl.qosTuner()
230
	go dl.stateFetcher()
231
	return dl
232 233
}

234 235 236
// Progress retrieves the synchronisation boundaries, specifically the origin
// block where synchronisation started at (may have failed/suspended); the block
// or header sync is currently at; and the latest known block which the sync targets.
237
//
L
Leif Jurvetson 已提交
238
// In addition, during the state download phase of fast synchronisation the number
239 240
// of processed and the total number of known states are also returned. Otherwise
// these are zero.
241
func (d *Downloader) Progress() ethereum.SyncProgress {
242
	// Lock the current stats and return the progress
243 244
	d.syncStatsLock.RLock()
	defer d.syncStatsLock.RUnlock()
245

246 247 248
	current := uint64(0)
	switch d.mode {
	case FullSync:
249
		current = d.blockchain.CurrentBlock().NumberU64()
250
	case FastSync:
251
		current = d.blockchain.CurrentFastBlock().NumberU64()
252
	case LightSync:
253
		current = d.lightchain.CurrentHeader().Number.Uint64()
254
	}
255 256 257 258
	return ethereum.SyncProgress{
		StartingBlock: d.syncStatsChainOrigin,
		CurrentBlock:  current,
		HighestBlock:  d.syncStatsChainHeight,
259 260
		PulledStates:  d.syncStatsState.processed,
		KnownStates:   d.syncStatsState.processed + d.syncStatsState.pending,
261
	}
O
obscuren 已提交
262 263
}

264
// Synchronising returns whether the downloader is currently retrieving blocks.
265
func (d *Downloader) Synchronising() bool {
266
	return atomic.LoadInt32(&d.synchronising) > 0
267 268
}

269 270
// RegisterPeer injects a new download peer into the set of block source to be
// used for fetching hashes and blocks from.
271
func (d *Downloader) RegisterPeer(id string, version int, peer Peer) error {
272

273 274
	logger := log.New("peer", id)
	logger.Trace("Registering sync peer")
275
	if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil {
276
		logger.Error("Failed to register sync peer", "err", err)
277 278
		return err
	}
279 280
	d.qosReduceConfidence()

281 282 283
	return nil
}

284 285 286 287 288
// RegisterLightPeer registers a light client peer by wrapping it in a
// lightPeerWrapper and handing it to RegisterPeer.
func (d *Downloader) RegisterLightPeer(id string, version int, peer LightPeer) error {
	wrapped := &lightPeerWrapper{peer}
	return d.RegisterPeer(id, version, wrapped)
}

289
// UnregisterPeer remove a peer from the known list, preventing any action from
290 291
// the specified peer. An effort is also made to return any pending fetches into
// the queue.
292
func (d *Downloader) UnregisterPeer(id string) error {
293
	// Unregister the peer from the active peer set and revoke any fetch tasks
294 295
	logger := log.New("peer", id)
	logger.Trace("Unregistering sync peer")
296
	if err := d.peers.Unregister(id); err != nil {
297
		logger.Error("Failed to unregister sync peer", "err", err)
298 299
		return err
	}
300
	d.queue.Revoke(id)
301 302 303 304 305 306 307

	// If this peer was the master peer, abort sync immediately
	d.cancelLock.RLock()
	master := id == d.cancelPeer
	d.cancelLock.RUnlock()

	if master {
308
		d.Cancel()
309
	}
310
	return nil
311 312
}

313 314
// Synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
315 316 317
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
	err := d.synchronise(id, head, td, mode)
	switch err {
318 319 320
	case nil:
	case errBusy:

321 322 323
	case errTimeout, errBadPeer, errStallingPeer,
		errEmptyHeaderSet, errPeersUnavailable, errTooOld,
		errInvalidAncestor, errInvalidChain:
324
		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
325 326 327
		d.dropPeer(id)

	default:
328
		log.Warn("Synchronisation failed, retrying", "err", err)
329
	}
330
	return err
331 332 333
}

// synchronise will select the peer and use it for synchronising. If an empty string is given
334
// it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
335
// checks fail an error will be returned. This method is synchronous
336
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
L
Leif Jurvetson 已提交
337
	// Mock out the synchronisation if testing
338 339 340
	if d.synchroniseMock != nil {
		return d.synchroniseMock(id, hash)
	}
341
	// Make sure only one goroutine is ever allowed past this point at once
342
	if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
343
		return errBusy
344
	}
345
	defer atomic.StoreInt32(&d.synchronising, 0)
346

347 348
	// Post a user notification of the sync (only once per session)
	if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
349
		log.Info("Block synchronisation started")
350
	}
351
	// Reset the queue, peer set and wake channels to clean any internal leftover state
352
	d.queue.Reset()
353
	d.peers.Reset()
354

355
	for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
356 357 358 359
		select {
		case <-ch:
		default:
		}
360
	}
361
	for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} {
362 363 364 365 366 367 368 369
		for empty := false; !empty; {
			select {
			case <-ch:
			default:
				empty = true
			}
		}
	}
370 371 372 373 374 375 376
	for empty := false; !empty; {
		select {
		case <-d.headerProcCh:
		default:
			empty = true
		}
	}
377
	// Create cancel channel for aborting mid-flight and mark the master peer
378 379
	d.cancelLock.Lock()
	d.cancelCh = make(chan struct{})
380
	d.cancelPeer = id
381 382
	d.cancelLock.Unlock()

383
	defer d.Cancel() // No matter what, we can't leave the cancel channel open
384

385 386
	// Set the requested sync mode, unless it's forbidden
	d.mode = mode
387
	if d.mode == FastSync && atomic.LoadUint32(&d.fsPivotFails) >= fsCriticalTrials {
388 389
		d.mode = FullSync
	}
390
	// Retrieve the origin peer and initiate the downloading process
391
	p := d.peers.Peer(id)
392
	if p == nil {
393
		return errUnknownPeer
394
	}
395
	return d.syncWithPeer(p, hash, td)
396 397
}

398 399
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
400
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
O
obscuren 已提交
401
	d.mux.Post(StartEvent{})
402 403 404
	defer func() {
		// reset on error
		if err != nil {
405 406 407
			d.mux.Post(FailedEvent{err})
		} else {
			d.mux.Post(DoneEvent{})
408 409
		}
	}()
410 411 412
	if p.version < 62 {
		return errTooOld
	}
413

414
	log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", d.mode)
415
	defer func(start time.Time) {
416
		log.Debug("Synchronisation terminated", "elapsed", time.Since(start))
417
	}(time.Now())
418

419 420 421 422 423 424
	// Look up the sync boundaries: the common ancestor and the target block
	latest, err := d.fetchHeight(p)
	if err != nil {
		return err
	}
	height := latest.Number.Uint64()
425

426 427 428 429 430 431 432 433 434 435
	origin, err := d.findAncestor(p, height)
	if err != nil {
		return err
	}
	d.syncStatsLock.Lock()
	if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
		d.syncStatsChainOrigin = origin
	}
	d.syncStatsChainHeight = height
	d.syncStatsLock.Unlock()
436

437 438 439 440 441 442 443 444 445 446 447
	// Initiate the sync using a concurrent header and content retrieval algorithm
	pivot := uint64(0)
	switch d.mode {
	case LightSync:
		pivot = height
	case FastSync:
		// Calculate the new fast/slow sync pivot point
		if d.fsPivotLock == nil {
			pivotOffset, err := rand.Int(rand.Reader, big.NewInt(int64(fsPivotInterval)))
			if err != nil {
				panic(fmt.Sprintf("Failed to access crypto random source: %v", err))
448
			}
449 450
			if height > uint64(fsMinFullBlocks)+pivotOffset.Uint64() {
				pivot = height - uint64(fsMinFullBlocks) - pivotOffset.Uint64()
451
			}
452 453 454
		} else {
			// Pivot point locked in, use this and do not pick a new one!
			pivot = d.fsPivotLock.Number.Uint64()
455
		}
456 457 458 459 460 461 462
		// If the point is below the origin, move origin back to ensure state download
		if pivot < origin {
			if pivot > 0 {
				origin = pivot - 1
			} else {
				origin = 0
			}
463
		}
464
		log.Debug("Fast syncing until pivot block", "pivot", pivot)
465 466 467 468
	}
	d.queue.Prepare(origin+1, d.mode, pivot, latest)
	if d.syncInitHook != nil {
		d.syncInitHook(origin, height)
469
	}
470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487

	fetchers := []func() error{
		func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
		func() error { return d.fetchBodies(origin + 1) },   // Bodies are retrieved during normal and fast sync
		func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
		func() error { return d.processHeaders(origin+1, td) },
	}
	if d.mode == FastSync {
		fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) })
	} else if d.mode == FullSync {
		fetchers = append(fetchers, d.processFullSyncContent)
	}
	err = d.spawnSync(fetchers)
	if err != nil && d.mode == FastSync && d.fsPivotLock != nil {
		// If sync failed in the critical section, bump the fail counter.
		atomic.AddUint32(&d.fsPivotFails, 1)
	}
	return err
488 489 490 491
}

// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
492
func (d *Downloader) spawnSync(fetchers []func() error) error {
493
	var wg sync.WaitGroup
494 495
	errc := make(chan error, len(fetchers))
	wg.Add(len(fetchers))
496 497 498 499 500 501
	for _, fn := range fetchers {
		fn := fn
		go func() { defer wg.Done(); errc <- fn() }()
	}
	// Wait for the first error, then terminate the others.
	var err error
502 503
	for i := 0; i < len(fetchers); i++ {
		if i == len(fetchers)-1 {
504 505 506 507 508 509 510 511 512 513
			// Close the queue when all fetchers have exited.
			// This will cause the block processor to end when
			// it has processed the queue.
			d.queue.Close()
		}
		if err = <-errc; err != nil {
			break
		}
	}
	d.queue.Close()
514
	d.Cancel()
515 516
	wg.Wait()
	return err
517 518
}

519
// Cancel cancels all of the operations and resets the queue. It returns true
520
// if the cancel operation was completed.
521
func (d *Downloader) Cancel() {
522
	// Close the current cancel channel
523
	d.cancelLock.Lock()
524 525 526 527 528 529 530
	if d.cancelCh != nil {
		select {
		case <-d.cancelCh:
			// Channel was already closed
		default:
			close(d.cancelCh)
		}
531 532
	}
	d.cancelLock.Unlock()
533 534
}

535
// Terminate interrupts the downloader, canceling all pending operations.
536
// The downloader cannot be reused after calling Terminate.
537
func (d *Downloader) Terminate() {
538 539 540 541 542 543 544 545 546 547
	// Close the termination channel (make sure double close is allowed)
	d.quitLock.Lock()
	select {
	case <-d.quitCh:
	default:
		close(d.quitCh)
	}
	d.quitLock.Unlock()

	// Cancel any pending download requests
548
	d.Cancel()
549 550
}

551 552
// fetchHeight retrieves the head header of the remote peer to aid in estimating
// the total time a pending synchronisation would take.
553
func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
P
Péter Szilágyi 已提交
554
	p.log.Debug("Retrieving remote chain height")
555 556

	// Request the advertised remote head block and wait for the response
557 558
	head, _ := p.peer.Head()
	go p.peer.RequestHeadersByHash(head, 1, 0, false)
559

560 561
	ttl := d.requestTTL()
	timeout := time.After(ttl)
562 563 564
	for {
		select {
		case <-d.cancelCh:
565
			return nil, errCancelBlockFetch
566

567
		case packet := <-d.headerCh:
568
			// Discard anything not from the origin peer
569
			if packet.PeerId() != p.id {
570
				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
571 572 573
				break
			}
			// Make sure the peer actually gave something valid
574
			headers := packet.(*headerPack).headers
575
			if len(headers) != 1 {
P
Péter Szilágyi 已提交
576
				p.log.Debug("Multiple headers for single request", "headers", len(headers))
577
				return nil, errBadPeer
578
			}
579
			head := headers[0]
P
Péter Szilágyi 已提交
580
			p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash())
581
			return head, nil
582

583
		case <-timeout:
P
Péter Szilágyi 已提交
584
			p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
585
			return nil, errTimeout
586

587
		case <-d.bodyCh:
588 589
		case <-d.receiptCh:
			// Out of bounds delivery, ignore
590 591 592 593
		}
	}
}

594
// findAncestor tries to locate the common ancestor link of the local chain and
595
// a remote peers blockchain. In the general case when our node was in sync and
596
// on the correct chain, checking the top N links should already get us a match.
597
// In the rare scenario when we ended up on a long reorganisation (i.e. none of
598
// the head links match), we do a binary search to find the common ancestor.
599
func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, error) {
600
	// Figure out the valid ancestor range to prevent rewrite attacks
601
	floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64()
602

P
Péter Szilágyi 已提交
603
	p.log.Debug("Looking for common ancestor", "local", ceil, "remote", height)
604
	if d.mode == FullSync {
605
		ceil = d.blockchain.CurrentBlock().NumberU64()
606
	} else if d.mode == FastSync {
607
		ceil = d.blockchain.CurrentFastBlock().NumberU64()
608 609 610 611 612 613 614 615
	}
	if ceil >= MaxForkAncestry {
		floor = int64(ceil - MaxForkAncestry)
	}
	// Request the topmost blocks to short circuit binary ancestor lookup
	head := ceil
	if head > height {
		head = height
616
	}
617
	from := int64(head) - int64(MaxHeaderFetch)
618 619 620
	if from < 0 {
		from = 0
	}
621 622 623 624 625 626
	// Span out with 15 block gaps into the future to catch bad head reports
	limit := 2 * MaxHeaderFetch / 16
	count := 1 + int((int64(ceil)-from)/16)
	if count > limit {
		count = limit
	}
627
	go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false)
628 629 630

	// Wait for the remote response to the head fetch
	number, hash := uint64(0), common.Hash{}
631 632 633

	ttl := d.requestTTL()
	timeout := time.After(ttl)
634 635 636 637

	for finished := false; !finished; {
		select {
		case <-d.cancelCh:
638
			return 0, errCancelHeaderFetch
639

640
		case packet := <-d.headerCh:
641
			// Discard anything not from the origin peer
642
			if packet.PeerId() != p.id {
643
				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
644 645 646
				break
			}
			// Make sure the peer actually gave something valid
647
			headers := packet.(*headerPack).headers
648
			if len(headers) == 0 {
P
Péter Szilágyi 已提交
649
				p.log.Warn("Empty head header set")
650 651
				return 0, errEmptyHeaderSet
			}
652 653
			// Make sure the peer's reply conforms to the request
			for i := 0; i < len(headers); i++ {
654
				if number := headers[i].Number.Int64(); number != from+int64(i)*16 {
P
Péter Szilágyi 已提交
655
					p.log.Warn("Head headers broke chain ordering", "index", i, "requested", from+int64(i)*16, "received", number)
656 657 658
					return 0, errInvalidChain
				}
			}
659 660 661
			// Check if a common ancestor was found
			finished = true
			for i := len(headers) - 1; i >= 0; i-- {
662
				// Skip any headers that underflow/overflow our requested set
663
				if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > ceil {
664 665 666
					continue
				}
				// Otherwise check if we already know the header or not
667
				if (d.mode == FullSync && d.blockchain.HasBlockAndState(headers[i].Hash())) || (d.mode != FullSync && d.lightchain.HasHeader(headers[i].Hash())) {
668
					number, hash = headers[i].Number.Uint64(), headers[i].Hash()
669 670 671

					// If every header is known, even future ones, the peer straight out lied about its head
					if number > height && i == limit-1 {
P
Péter Szilágyi 已提交
672
						p.log.Warn("Lied about chain head", "reported", height, "found", number)
673 674
						return 0, errStallingPeer
					}
675 676 677 678
					break
				}
			}

679
		case <-timeout:
P
Péter Szilágyi 已提交
680
			p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
681 682
			return 0, errTimeout

683
		case <-d.bodyCh:
684 685
		case <-d.receiptCh:
			// Out of bounds delivery, ignore
686 687 688 689
		}
	}
	// If the head fetch already found an ancestor, return
	if !common.EmptyHash(hash) {
690
		if int64(number) <= floor {
P
Péter Szilágyi 已提交
691
			p.log.Warn("Ancestor below allowance", "number", number, "hash", hash, "allowance", floor)
692 693
			return 0, errInvalidAncestor
		}
P
Péter Szilágyi 已提交
694
		p.log.Debug("Found common ancestor", "number", number, "hash", hash)
695 696 697 698
		return number, nil
	}
	// Ancestor not found, we need to binary search over our chain
	start, end := uint64(0), head
699 700 701
	if floor > 0 {
		start = uint64(floor)
	}
702 703 704 705
	for start+1 < end {
		// Split our chain interval in two, and request the hash to cross check
		check := (start + end) / 2

706 707 708
		ttl := d.requestTTL()
		timeout := time.After(ttl)

709
		go p.peer.RequestHeadersByNumber(uint64(check), 1, 0, false)
710 711 712 713 714

		// Wait until a reply arrives to this request
		for arrived := false; !arrived; {
			select {
			case <-d.cancelCh:
715
				return 0, errCancelHeaderFetch
716

717
			case packer := <-d.headerCh:
718
				// Discard anything not from the origin peer
719
				if packer.PeerId() != p.id {
720
					log.Debug("Received headers from incorrect peer", "peer", packer.PeerId())
721 722 723
					break
				}
				// Make sure the peer actually gave something valid
724
				headers := packer.(*headerPack).headers
725
				if len(headers) != 1 {
P
Péter Szilágyi 已提交
726
					p.log.Debug("Multiple headers for single request", "headers", len(headers))
727 728 729 730 731
					return 0, errBadPeer
				}
				arrived = true

				// Modify the search interval based on the response
732
				if (d.mode == FullSync && !d.blockchain.HasBlockAndState(headers[0].Hash())) || (d.mode != FullSync && !d.lightchain.HasHeader(headers[0].Hash())) {
733 734 735
					end = check
					break
				}
736
				header := d.lightchain.GetHeaderByHash(headers[0].Hash()) // Independent of sync mode, header surely exists
737
				if header.Number.Uint64() != check {
P
Péter Szilágyi 已提交
738
					p.log.Debug("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
739 740 741 742
					return 0, errBadPeer
				}
				start = check

743
			case <-timeout:
P
Péter Szilágyi 已提交
744
				p.log.Debug("Waiting for search header timed out", "elapsed", ttl)
745 746
				return 0, errTimeout

747
			case <-d.bodyCh:
748 749
			case <-d.receiptCh:
				// Out of bounds delivery, ignore
750 751 752
			}
		}
	}
753 754
	// Ensure valid ancestry and return
	if int64(start) <= floor {
P
Péter Szilágyi 已提交
755
		p.log.Warn("Ancestor below allowance", "number", start, "hash", hash, "allowance", floor)
756 757
		return 0, errInvalidAncestor
	}
P
Péter Szilágyi 已提交
758
	p.log.Debug("Found common ancestor", "number", start, "hash", hash)
759 760 761
	return start, nil
}

762 763 764 765 766
// fetchHeaders keeps retrieving headers concurrently from the number
// requested, until no more are returned, potentially throttling on the way. To
// facilitate concurrency but still protect against malicious nodes sending bad
// headers, we construct a header chain skeleton using the "origin" peer we are
// syncing with, and fill in the missing headers using anyone else. Headers from
767
// other peers are only accepted if they map cleanly to the skeleton. If no one
768 769
// can fill in the skeleton - not even the origin peer - it's assumed invalid and
// the origin is dropped.
770
func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
P
Péter Szilágyi 已提交
771 772
	p.log.Debug("Directing header downloads", "origin", from)
	defer p.log.Debug("Header download terminated")
773

774 775 776
	// Create a timeout timer, and the associated header fetcher
	skeleton := true            // Skeleton assembly phase or finishing up
	request := time.Now()       // time of the last skeleton fetch request
777 778 779 780
	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
	<-timeout.C                 // timeout channel should be initially empty
	defer timeout.Stop()

781
	var ttl time.Duration
782
	getHeaders := func(from uint64) {
783
		request = time.Now()
784 785 786

		ttl = d.requestTTL()
		timeout.Reset(ttl)
787

788
		if skeleton {
P
Péter Szilágyi 已提交
789
			p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
790
			go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
791
		} else {
P
Péter Szilágyi 已提交
792
			p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
793
			go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
794
		}
795
	}
796
	// Start pulling the header chain skeleton until all is done
797 798 799 800 801 802 803
	getHeaders(from)

	for {
		select {
		case <-d.cancelCh:
			return errCancelHeaderFetch

804
		case packet := <-d.headerCh:
805
			// Make sure the active peer is giving us the skeleton headers
806
			if packet.PeerId() != p.id {
807
				log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId())
808 809
				break
			}
810
			headerReqTimer.UpdateSince(request)
811 812
			timeout.Stop()

813 814 815 816 817 818
			// If the skeleton's finished, pull any remaining head headers directly from the origin
			if packet.Items() == 0 && skeleton {
				skeleton = false
				getHeaders(from)
				continue
			}
819
			// If no more headers are inbound, notify the content fetchers and return
820
			if packet.Items() == 0 {
P
Péter Szilágyi 已提交
821
				p.log.Debug("No more headers available")
822 823 824 825 826 827
				select {
				case d.headerProcCh <- nil:
					return nil
				case <-d.cancelCh:
					return errCancelHeaderFetch
				}
828
			}
829
			headers := packet.(*headerPack).headers
830

831 832
			// If we received a skeleton batch, resolve internals concurrently
			if skeleton {
833
				filled, proced, err := d.fillHeaderSkeleton(from, headers)
834
				if err != nil {
P
Péter Szilágyi 已提交
835
					p.log.Debug("Skeleton chain invalid", "err", err)
836 837
					return errInvalidChain
				}
838 839
				headers = filled[proced:]
				from += uint64(proced)
840
			}
841
			// Insert all the new headers and fetch the next batch
842
			if len(headers) > 0 {
P
Péter Szilágyi 已提交
843
				p.log.Trace("Scheduling new headers", "count", len(headers), "from", from)
844 845 846 847 848 849
				select {
				case d.headerProcCh <- headers:
				case <-d.cancelCh:
					return errCancelHeaderFetch
				}
				from += uint64(len(headers))
850
			}
851 852 853 854
			getHeaders(from)

		case <-timeout.C:
			// Header retrieval timed out, consider the peer bad and drop
P
Péter Szilágyi 已提交
855
			p.log.Debug("Header request timed out", "elapsed", ttl)
856
			headerTimeoutMeter.Mark(1)
857 858 859
			d.dropPeer(p.id)

			// Finish the sync gracefully instead of dumping the gathered data though
860
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
861 862 863 864
				select {
				case ch <- false:
				case <-d.cancelCh:
				}
865
			}
866 867 868 869 870
			select {
			case d.headerProcCh <- nil:
			case <-d.cancelCh:
			}
			return errBadPeer
871 872 873 874
		}
	}
}

875 876
// fillHeaderSkeleton concurrently retrieves headers from all our available peers
// and maps them to the provided skeleton header chain.
877 878 879 880 881 882 883 884
//
// Any partial results from the beginning of the skeleton is (if possible) forwarded
// immediately to the header processor to keep the rest of the pipeline full even
// in the case of header stalls.
//
// The method returs the entire filled skeleton and also the number of headers
// already forwarded for processing.
func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
885
	log.Debug("Filling up skeleton", "from", from)
886 887 888 889 890
	d.queue.ScheduleSkeleton(from, skeleton)

	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*headerPack)
891
			return d.queue.DeliverHeaders(pack.peerId, pack.headers, d.headerProcCh)
892
		}
893
		expire   = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
894
		throttle = func() bool { return false }
895
		reserve  = func(p *peerConnection, count int) (*fetchRequest, bool, error) {
896 897
			return d.queue.ReserveHeaders(p, count), false, nil
		}
898 899 900
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
		capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
		setIdle  = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) }
901 902 903
	)
	err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
		d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
904
		nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")
905

906
	log.Debug("Skeleton fill terminated", "err", err)
907 908 909

	filled, proced := d.queue.RetrieveHeaders()
	return filled, proced, err
910 911
}

912 913 914 915
// fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchBodies(from uint64) error {
916
	log.Debug("Downloading block bodies", "origin", from)
917

918
	var (
919
		deliver = func(packet dataPack) (int, error) {
920
			pack := packet.(*bodyPack)
921
			return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
922
		}
923
		expire   = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
924 925 926
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
		capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
		setIdle  = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
927
	)
928
	err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
929
		d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
930
		d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")
931

932
	log.Debug("Block body download terminated", "err", err)
933 934 935 936 937 938 939
	return err
}

// fetchReceipts iteratively downloads the scheduled block receipts, taking any
// available peers, reserving a chunk of receipts for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchReceipts(from uint64) error {
940
	log.Debug("Downloading transaction receipts", "origin", from)
941 942

	var (
943
		deliver = func(packet dataPack) (int, error) {
944 945 946
			pack := packet.(*receiptPack)
			return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
		}
947
		expire   = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
948 949 950
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
		capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
		setIdle  = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
951
	)
952
	err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
953
		d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
954
		d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")
955

956
	log.Debug("Transaction receipt download terminated", "err", err)
957 958 959 960 961 962
	return err
}

// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980
//
// As the scheduling/timeout logic mostly is the same for all downloaded data
// types, this method is used by each for data gathering and is instrumented with
// various callbacks to handle the slight differences between processing them.
//
// The instrumentation parameters:
//  - errCancel:   error type to return if the fetch operation is cancelled (mostly makes logging nicer)
//  - deliveryCh:  channel from which to retrieve downloaded data packets (merged from all concurrent peers)
//  - deliver:     processing callback to deliver data packets into type specific download queues (usually within `queue`)
//  - wakeCh:      notification channel for waking the fetcher when new tasks are available (or sync completed)
//  - expire:      task callback method to abort requests that took too long and return the faulty peers (traffic shaping)
//  - pending:     task callback for the number of requests still needing download (detect completion/non-completability)
//  - inFlight:    task callback for the number of in-progress requests (wait for all active downloads to finish)
//  - throttle:    task callback to check if the processing queue is full and activate throttling (bound memory use)
//  - reserve:     task callback to reserve new download tasks to a particular peer (also signals partial completions)
//  - fetchHook:   tester callback to notify of new tasks being initiated (allows testing the scheduling logic)
//  - fetch:       network callback to actually send a particular download request to a physical remote peer
//  - cancel:      task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer)
P
Péter Szilágyi 已提交
981
//  - capacity:    network callback to retrieve the estimated type-specific bandwidth capacity of a peer (traffic shaping)
982 983 984
//  - idle:        network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
//  - setIdle:     network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
//  - kind:        textual label of the type being downloaded to display in log mesages
985
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
986 987 988
	expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
	fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
	idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {
989

990
	// Create a ticker to detect expired retrieval tasks
991 992 993 994 995
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	update := make(chan struct{}, 1)

996
	// Prepare the queue and fetch block parts until the block header fetcher's done
997 998 999 1000
	finished := false
	for {
		select {
		case <-d.cancelCh:
1001
			return errCancel
1002

1003
		case packet := <-deliveryCh:
1004 1005
			// If the peer was previously banned and failed to deliver it's pack
			// in a reasonable time frame, ignore it's message.
1006
			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
1007 1008 1009
				// Deliver the received chunk of data and check chain validity
				accepted, err := deliver(packet)
				if err == errInvalidChain {
1010
					return err
1011 1012 1013 1014 1015 1016 1017 1018 1019 1020
				}
				// Unless a peer delivered something completely else than requested (usually
				// caused by a timed out request which came through in the end), set it to
				// idle. If the delivery's stale, the peer should have already been idled.
				if err != errStaleDelivery {
					setIdle(peer, accepted)
				}
				// Issue a log to the user to see what's going on
				switch {
				case err == nil && packet.Items() == 0:
P
Péter Szilágyi 已提交
1021
					peer.log.Trace("Requested data not delivered", "type", kind)
1022
				case err == nil:
P
Péter Szilágyi 已提交
1023
					peer.log.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats())
1024
				default:
P
Péter Szilágyi 已提交
1025
					peer.log.Trace("Failed to deliver retrieved data", "type", kind, "err", err)
1026 1027 1028 1029 1030 1031 1032 1033
				}
			}
			// Blocks assembled, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

1034
		case cont := <-wakeCh:
1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056
			// The header fetcher sent a continuation flag, check if it's done
			if !cont {
				finished = true
			}
			// Headers arrive, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-ticker.C:
			// Sanity check update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-update:
			// Short circuit if we lost all our peers
			if d.peers.Len() == 0 {
				return errNoPeers
			}
1057
			// Check for fetch request timeouts and demote the responsible peers
1058
			for pid, fails := range expire() {
1059
				if peer := d.peers.Peer(pid); peer != nil {
1060 1061 1062 1063 1064 1065 1066 1067
					// If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps
					// ourselves. Only reset to minimal throughput but don't drop just yet. If even the minimal times
					// out that sync wise we need to get rid of the peer.
					//
					// The reason the minimum threshold is 2 is because the downloader tries to estimate the bandwidth
					// and latency of a peer separately, which requires pushing the measures capacity a bit and seeing
					// how response times reacts, to it always requests one more than the minimum (i.e. min 2).
					if fails > 2 {
P
Péter Szilágyi 已提交
1068
						peer.log.Trace("Data delivery timed out", "type", kind)
1069 1070
						setIdle(peer, 0)
					} else {
P
Péter Szilágyi 已提交
1071
						peer.log.Debug("Stalling delivery, dropping", "type", kind)
1072 1073
						d.dropPeer(pid)
					}
1074 1075
				}
			}
1076 1077
			// If there's nothing more to fetch, wait or terminate
			if pending() == 0 {
1078
				if !inFlight() && finished {
1079
					log.Debug("Data fetching completed", "type", kind)
1080 1081 1082 1083 1084
					return nil
				}
				break
			}
			// Send a download request to all idle peers, until throttled
1085
			progressed, throttled, running := false, false, inFlight()
1086 1087 1088
			idles, total := idle()

			for _, peer := range idles {
1089
				// Short circuit if throttling activated
1090
				if throttle() {
1091 1092 1093
					throttled = true
					break
				}
1094 1095
				// Reserve a chunk of fetches for a peer. A nil can mean either that
				// no more headers are available, or that the peer is known not to
1096
				// have them.
1097
				request, progress, err := reserve(peer, capacity(peer))
1098 1099 1100
				if err != nil {
					return err
				}
1101 1102
				if progress {
					progressed = true
1103 1104 1105 1106
				}
				if request == nil {
					continue
				}
1107
				if request.From > 0 {
P
Péter Szilágyi 已提交
1108
					peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From)
1109
				} else if len(request.Headers) > 0 {
P
Péter Szilágyi 已提交
1110
					peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
1111
				} else {
P
Péter Szilágyi 已提交
1112
					peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Hashes))
1113
				}
1114
				// Fetch the chunk and make sure any errors return the hashes to the queue
1115 1116
				if fetchHook != nil {
					fetchHook(request.Headers)
1117
				}
1118
				if err := fetch(peer, request); err != nil {
1119 1120 1121 1122 1123
					// Although we could try and make an attempt to fix this, this error really
					// means that we've double allocated a fetch task to a peer. If that is the
					// case, the internal state of the downloader and the queue is very wrong so
					// better hard crash and note the error instead of silently accumulating into
					// a much bigger issue.
1124
					panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, kind))
1125
				}
1126
				running = true
1127 1128 1129
			}
			// Make sure that we have peers available for fetching. If all peers have been tried
			// and all failed throw an error
1130
			if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
1131 1132 1133 1134 1135 1136
				return errPeersUnavailable
			}
		}
	}
}

1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152
// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
	// Calculate the pivoting point for switching from fast to slow sync
	pivot := d.queue.FastSyncPivot()

	// Keep a count of uncertain headers to roll back
	rollback := []*types.Header{}
	defer func() {
		if len(rollback) > 0 {
			// Flatten the headers and roll them back
			hashes := make([]common.Hash, len(rollback))
			for i, header := range rollback {
				hashes[i] = header.Hash()
			}
1153 1154
			lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
			if d.mode != LightSync {
1155 1156
				lastFastBlock = d.blockchain.CurrentFastBlock().Number()
				lastBlock = d.blockchain.CurrentBlock().Number()
1157
			}
1158
			d.lightchain.Rollback(hashes)
1159
			curFastBlock, curBlock := common.Big0, common.Big0
1160
			if d.mode != LightSync {
1161 1162
				curFastBlock = d.blockchain.CurrentFastBlock().Number()
				curBlock = d.blockchain.CurrentBlock().Number()
1163
			}
1164
			log.Warn("Rolled back headers", "count", len(hashes),
1165
				"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
1166 1167
				"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
				"block", fmt.Sprintf("%d->%d", lastBlock, curBlock))
1168

1169
			// If we're already past the pivot point, this could be an attack, thread carefully
1170
			if rollback[len(rollback)-1].Number.Uint64() > pivot {
1171
				// If we didn't ever fail, lock in te pivot header (must! not! change!)
1172
				if atomic.LoadUint32(&d.fsPivotFails) == 0 {
1173 1174
					for _, header := range rollback {
						if header.Number.Uint64() == pivot {
1175
							log.Warn("Fast-sync pivot locked in", "number", pivot, "hash", header.Hash())
1176 1177 1178 1179
							d.fsPivotLock = header
						}
					}
				}
1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195
			}
		}
	}()

	// Wait for batches of headers to process
	gotHeaders := false

	for {
		select {
		case <-d.cancelCh:
			return errCancelHeaderProcessing

		case headers := <-d.headerProcCh:
			// Terminate header processing if we synced up
			if len(headers) == 0 {
				// Notify everyone that headers are fully processed
1196
				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213
					select {
					case ch <- false:
					case <-d.cancelCh:
					}
				}
				// If no headers were retrieved at all, the peer violated it's TD promise that it had a
				// better chain compared to ours. The only exception is if it's promised blocks were
				// already imported by other means (e.g. fecher):
				//
				// R <remote peer>, L <local node>: Both at block 10
				// R: Mine block 11, and propagate it to L
				// L: Queue block 11 for import
				// L: Notice that R's head and TD increased compared to ours, start sync
				// L: Import of block 11 finishes
				// L: Sync begins, and finds common ancestor at 11
				// L: Request new headers up from 11 (R's TD was higher, it must have something)
				// R: Nothing to give
1214
				if d.mode != LightSync {
1215
					if !gotHeaders && td.Cmp(d.blockchain.GetTdByHash(d.blockchain.CurrentBlock().Hash())) > 0 {
1216 1217
						return errStallingPeer
					}
1218 1219 1220 1221 1222 1223 1224 1225 1226
				}
				// If fast or light syncing, ensure promised headers are indeed delivered. This is
				// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
				// of delivering the post-pivot blocks that would flag the invalid content.
				//
				// This check cannot be executed "as is" for full imports, since blocks may still be
				// queued for processing when the header download completes. However, as long as the
				// peer gave us something useful, we're already happy/progressed (above check).
				if d.mode == FastSync || d.mode == LightSync {
1227
					if td.Cmp(d.lightchain.GetTdByHash(d.lightchain.CurrentHeader().Hash())) > 0 {
1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256
						return errStallingPeer
					}
				}
				// Disable any rollback and return
				rollback = nil
				return nil
			}
			// Otherwise split the chunk of headers into batches and process them
			gotHeaders = true

			for len(headers) > 0 {
				// Terminate if something failed in between processing chunks
				select {
				case <-d.cancelCh:
					return errCancelHeaderProcessing
				default:
				}
				// Select the next chunk of headers to import
				limit := maxHeadersProcess
				if limit > len(headers) {
					limit = len(headers)
				}
				chunk := headers[:limit]

				// In case of header only syncing, validate the chunk immediately
				if d.mode == FastSync || d.mode == LightSync {
					// Collect the yet unknown headers to mark them as uncertain
					unknown := make([]*types.Header, 0, len(headers))
					for _, header := range chunk {
1257
						if !d.lightchain.HasHeader(header.Hash()) {
1258 1259 1260 1261 1262 1263 1264 1265
							unknown = append(unknown, header)
						}
					}
					// If we're importing pure headers, verify based on their recentness
					frequency := fsHeaderCheckFrequency
					if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
						frequency = 1
					}
1266
					if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
1267 1268 1269 1270
						// If some headers were inserted, add them too to the rollback list
						if n > 0 {
							rollback = append(rollback, chunk[:n]...)
						}
1271
						log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err)
1272 1273 1274 1275 1276 1277 1278 1279
						return errInvalidChain
					}
					// All verifications passed, store newly found uncertain headers
					rollback = append(rollback, unknown...)
					if len(rollback) > fsHeaderSafetyNet {
						rollback = append(rollback[:0], rollback[len(rollback)-fsHeaderSafetyNet:]...)
					}
				}
1280 1281 1282
				// If we're fast syncing and just pulled in the pivot, make sure it's the one locked in
				if d.mode == FastSync && d.fsPivotLock != nil && chunk[0].Number.Uint64() <= pivot && chunk[len(chunk)-1].Number.Uint64() >= pivot {
					if pivot := chunk[int(pivot-chunk[0].Number.Uint64())]; pivot.Hash() != d.fsPivotLock.Hash() {
1283
						log.Warn("Pivot doesn't match locked in one", "remoteNumber", pivot.Number, "remoteHash", pivot.Hash(), "localNumber", d.fsPivotLock.Number, "localHash", d.fsPivotLock.Hash())
1284 1285 1286
						return errInvalidChain
					}
				}
1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299
				// Unless we're doing light chains, schedule the headers for associated content retrieval
				if d.mode == FullSync || d.mode == FastSync {
					// If we've reached the allowed number of pending headers, stall a bit
					for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
						select {
						case <-d.cancelCh:
							return errCancelHeaderProcessing
						case <-time.After(time.Second):
						}
					}
					// Otherwise insert the headers for content retrieval
					inserts := d.queue.Schedule(chunk, origin)
					if len(inserts) != len(chunk) {
1300
						log.Debug("Stale headers")
1301 1302 1303 1304 1305 1306 1307
						return errBadPeer
					}
				}
				headers = headers[limit:]
				origin += uint64(limit)
			}
			// Signal the content downloaders of the availablility of new tasks
1308
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
1309 1310 1311 1312 1313 1314 1315 1316 1317
				select {
				case ch <- true:
				default:
				}
			}
		}
	}
}

1318 1319
// processFullSyncContent takes fetch results from the queue and imports them into the chain.
func (d *Downloader) processFullSyncContent() error {
1320
	for {
1321
		results := d.queue.WaitResults()
1322
		if len(results) == 0 {
1323
			return nil
1324
		}
1325
		if d.chainInsertHook != nil {
1326
			d.chainInsertHook(results)
1327
		}
1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344
		if err := d.importBlockResults(results); err != nil {
			return err
		}
	}
}

func (d *Downloader) importBlockResults(results []*fetchResult) error {
	for len(results) != 0 {
		// Check for any termination requests. This makes clean shutdown faster.
		select {
		case <-d.quitCh:
			return errCancelContentProcessing
		default:
		}
		// Retrieve the a batch of results to import
		items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
		first, last := results[0].Header, results[items-1].Header
1345
		log.Debug("Inserting downloaded chain", "items", len(results),
1346 1347 1348
			"firstnum", first.Number, "firsthash", first.Hash(),
			"lastnum", last.Number, "lasthash", last.Hash(),
		)
1349 1350 1351 1352
		blocks := make([]*types.Block, items)
		for i, result := range results[:items] {
			blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
		}
1353
		if index, err := d.blockchain.InsertChain(blocks); err != nil {
1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392
			log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
			return errInvalidChain
		}
		// Shift the results to the next batch
		results = results[items:]
	}
	return nil
}

// processFastSyncContent takes fetch results from the queue and writes them to the
// database. It also controls the synchronisation of state nodes of the pivot block.
func (d *Downloader) processFastSyncContent(latest *types.Header) error {
	// Start syncing state of the reported head block.
	// This should get us most of the state of the pivot block.
	stateSync := d.syncState(latest.Root)
	defer stateSync.Cancel()
	go func() {
		if err := stateSync.Wait(); err != nil {
			d.queue.Close() // wake up WaitResults
		}
	}()

	pivot := d.queue.FastSyncPivot()
	for {
		results := d.queue.WaitResults()
		if len(results) == 0 {
			return stateSync.Cancel()
		}
		if d.chainInsertHook != nil {
			d.chainInsertHook(results)
		}
		P, beforeP, afterP := splitAroundPivot(pivot, results)
		if err := d.commitFastSyncData(beforeP, stateSync); err != nil {
			return err
		}
		if P != nil {
			stateSync.Cancel()
			if err := d.commitPivotBlock(P); err != nil {
				return err
1393

1394
			}
1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425
		}
		if err := d.importBlockResults(afterP); err != nil {
			return err
		}
	}
}

// splitAroundPivot partitions results by block number relative to pivot. It
// returns the result whose number equals pivot (nil if none is present), the
// results numbered below the pivot and the results numbered above it. The
// relative order of the input is preserved within each returned slice.
func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
	for i := range results {
		res := results[i]
		number := res.Header.Number.Uint64()

		if number == pivot {
			p = res
			continue
		}
		if number < pivot {
			before = append(before, res)
		} else {
			after = append(after, res)
		}
	}
	return p, before, after
}

func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
	for len(results) != 0 {
		// Check for any termination requests.
		select {
		case <-d.quitCh:
			return errCancelContentProcessing
		case <-stateSync.done:
			if err := stateSync.Wait(); err != nil {
				return err
1426
			}
1427
		default:
1428
		}
1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441
		// Retrieve the a batch of results to import
		items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
		first, last := results[0].Header, results[items-1].Header
		log.Debug("Inserting fast-sync blocks", "items", len(results),
			"firstnum", first.Number, "firsthash", first.Hash(),
			"lastnumn", last.Number, "lasthash", last.Hash(),
		)
		blocks := make([]*types.Block, items)
		receipts := make([]types.Receipts, items)
		for i, result := range results[:items] {
			blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
			receipts[i] = result.Receipts
		}
1442
		if index, err := d.blockchain.InsertReceiptChain(blocks, receipts); err != nil {
1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459
			log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
			return errInvalidChain
		}
		// Shift the results to the next batch
		results = results[items:]
	}
	return nil
}

func (d *Downloader) commitPivotBlock(result *fetchResult) error {
	b := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
	// Sync the pivot block state. This should complete reasonably quickly because
	// we've already synced up to the reported head block state earlier.
	if err := d.syncState(b.Root()).Wait(); err != nil {
		return err
	}
	log.Debug("Committing fast sync pivot as new head", "number", b.Number(), "hash", b.Hash())
1460
	if _, err := d.blockchain.InsertReceiptChain([]*types.Block{b}, []types.Receipts{result.Receipts}); err != nil {
1461
		return err
1462
	}
1463
	return d.blockchain.FastSyncCommitHead(b.Hash())
1464 1465
}

L
Leif Jurvetson 已提交
1466
// DeliverHeaders injects a new batch of block headers received from a remote
1467
// node into the download schedule.
1468
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
1469
	return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
1470 1471 1472
}

// DeliverBodies injects a new batch of block bodies received from a remote node.
1473
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
1474
	return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
1475 1476 1477 1478
}

// DeliverReceipts injects a new batch of receipts received from a remote node.
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) {
1479 1480 1481 1482 1483 1484 1485 1486 1487 1488
	return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
}

// DeliverNodeData wraps the node state data received from the remote peer id
// into a state pack and forwards it to the state delivery channel.
func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) {
	pack := &statePack{id, data}
	err = d.deliver(id, d.stateCh, pack, stateInMeter, stateDropMeter)
	return err
}

// deliver injects a new batch of data received from a remote node.
func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
1489
	// Update the delivery metrics for both good and failed deliveries
1490
	inMeter.Mark(int64(packet.Items()))
1491 1492
	defer func() {
		if err != nil {
1493
			dropMeter.Mark(int64(packet.Items()))
1494 1495 1496 1497 1498 1499
		}
	}()
	// Deliver or abort if the sync is canceled while queuing
	d.cancelLock.RLock()
	cancel := d.cancelCh
	d.cancelLock.RUnlock()
1500 1501 1502
	if cancel == nil {
		return errNoSyncActive
	}
1503
	select {
1504
	case destCh <- packet:
1505 1506 1507 1508
		return nil
	case <-cancel:
		return errNoSyncActive
	}
1509
}
1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524

// qosTuner is the quality of service tuning loop that occasionally gathers the
// peer latency statistics and updates the estimated request round trip time.
func (d *Downloader) qosTuner() {
	for {
		// Retrieve the current median RTT and integrate into the previoust target RTT
		rtt := time.Duration(float64(1-qosTuningImpact)*float64(atomic.LoadUint64(&d.rttEstimate)) + qosTuningImpact*float64(d.peers.medianRTT()))
		atomic.StoreUint64(&d.rttEstimate, uint64(rtt))

		// A new RTT cycle passed, increase our confidence in the estimated RTT
		conf := atomic.LoadUint64(&d.rttConfidence)
		conf = conf + (1000000-conf)/2
		atomic.StoreUint64(&d.rttConfidence, conf)

		// Log the new QoS values and sleep until the next RTT
1525
		log.Debug("Recalculated downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538
		select {
		case <-d.quitCh:
			return
		case <-time.After(rtt):
		}
	}
}

// qosReduceConfidence is meant to be called when a new peer joins the downloader's
// peer set, needing to reduce the confidence we have in out QoS estimates.
func (d *Downloader) qosReduceConfidence() {
	// If we have a single peer, confidence is always 1
	peers := uint64(d.peers.Len())
1539 1540 1541 1542
	if peers == 0 {
		// Ensure peer connectivity races don't catch us off guard
		return
	}
1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558
	if peers == 1 {
		atomic.StoreUint64(&d.rttConfidence, 1000000)
		return
	}
	// If we have a ton of peers, don't drop confidence)
	if peers >= uint64(qosConfidenceCap) {
		return
	}
	// Otherwise drop the confidence factor
	conf := atomic.LoadUint64(&d.rttConfidence) * (peers - 1) / peers
	if float64(conf)/1000000 < rttMinConfidence {
		conf = uint64(rttMinConfidence * 1000000)
	}
	atomic.StoreUint64(&d.rttConfidence, conf)

	rtt := time.Duration(atomic.LoadUint64(&d.rttEstimate))
1559
	log.Debug("Relaxed downloader QoS values", "rtt", rtt, "confidence", float64(conf)/1000000.0, "ttl", d.requestTTL())
1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584
}

// requestRTT reports the round trip time a download request should aim to
// complete within.
//
// Note, the value handed back is nine tenths of the stored RTT estimate. The
// downloader adapts its queries to the RTT, so several RTT values could be
// targeted, but smaller ones are preferred (stabler download stream).
func (d *Downloader) requestRTT() time.Duration {
	estimate := time.Duration(atomic.LoadUint64(&d.rttEstimate))
	return estimate * 9 / 10
}

// requestTTL reports how long a single download request is allowed to take
// before timing out. The allowance is the RTT estimate divided by the current
// confidence (stored in millionths), multiplied by ttlScaling and capped at
// ttlLimit.
func (d *Downloader) requestTTL() time.Duration {
	estimate := time.Duration(atomic.LoadUint64(&d.rttEstimate))
	confidence := float64(atomic.LoadUint64(&d.rttConfidence)) / 1000000.0

	timeout := time.Duration(ttlScaling) * time.Duration(float64(estimate)/confidence)
	if timeout <= ttlLimit {
		return timeout
	}
	return ttlLimit
}