// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package downloader contains the manual full chain synchronisation.
package downloader

import (
	"errors"
	"math"
	"math/big"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/logger"
	"github.com/ethereum/go-ethereum/logger/glog"
	"github.com/rcrowley/go-metrics"
)

var (
	MaxHashFetch    = 512 // Amount of hashes to be fetched per retrieval request
	MaxBlockFetch   = 128 // Amount of blocks to be fetched per retrieval request
	MaxHeaderFetch  = 192 // Amount of block headers to be fetched per retrieval request
	MaxBodyFetch    = 128 // Amount of block bodies to be fetched per retrieval request
	MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
	MaxStateFetch   = 384 // Amount of node state values to allow fetching per request

	hashTTL        = 5 * time.Second    // [eth/61] Time it takes for a hash request to time out
	blockSoftTTL   = 3 * time.Second    // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth
	blockHardTTL   = 3 * blockSoftTTL   // [eth/61] Maximum time allowance before a block request is considered expired
	headerTTL      = 5 * time.Second    // [eth/62] Time it takes for a header request to time out
	bodySoftTTL    = 3 * time.Second    // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth
	bodyHardTTL    = 3 * bodySoftTTL    // [eth/62] Maximum time allowance before a block body request is considered expired
	receiptSoftTTL = 3 * time.Second    // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
	receiptHardTTL = 3 * receiptSoftTTL // [eth/63] Maximum time allowance before a receipt request is considered expired
	stateSoftTTL   = 2 * time.Second    // [eth/63] Request completion threshold for increasing or decreasing a peer's bandwidth
	stateHardTTL   = 3 * stateSoftTTL   // [eth/63] Maximum time allowance before a node data request is considered expired

	maxQueuedHashes   = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
	maxQueuedHeaders  = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
	maxQueuedStates   = 256 * 1024 // [eth/63] Maximum number of state requests to queue (DOS protection)
	maxResultsProcess = 256        // Number of download results to import at once into the chain

	headerCheckFrequency = 64   // Verification frequency of the downloaded headers during fast sync
	minCheckedHeaders    = 1024 // Number of headers to verify fully when approaching the chain head
	minFullBlocks        = 1024 // Number of blocks to retrieve fully even in fast sync
)

var (
	errBusy               = errors.New("busy")
	errUnknownPeer        = errors.New("peer is unknown or unhealthy")
	errBadPeer            = errors.New("action from bad peer ignored")
	errStallingPeer       = errors.New("peer is stalling")
	errNoPeers            = errors.New("no peers to keep download active")
	errPendingQueue       = errors.New("pending items in queue")
	errTimeout            = errors.New("timeout")
	errEmptyHashSet       = errors.New("empty hash set by peer")
	errEmptyHeaderSet     = errors.New("empty header set by peer")
	errPeersUnavailable   = errors.New("no peers available or all tried for download")
	errAlreadyInPool      = errors.New("hash already in pool")
	errInvalidChain       = errors.New("retrieved hash chain is invalid")
	errInvalidBlock       = errors.New("retrieved block is invalid")
	errInvalidBody        = errors.New("retrieved block body is invalid")
	errInvalidReceipt     = errors.New("retrieved receipt is invalid")
	errCancelHashFetch    = errors.New("hash download canceled (requested)")
	errCancelBlockFetch   = errors.New("block download canceled (requested)")
	errCancelHeaderFetch  = errors.New("block header download canceled (requested)")
	errCancelBodyFetch    = errors.New("block body download canceled (requested)")
	errCancelReceiptFetch = errors.New("receipt download canceled (requested)")
	errNoSyncActive       = errors.New("no sync active")
)

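// Downloader schedules hash, header, block, receipt and state retrievals from
// remote peers and imports the results into the local chain. Note that it keeps
// two parallel retrieval paths: the legacy hash/block based synchronisation used
// for eth/61 peers, and the header based (body/receipt/state) synchronisation
// used for eth/62 and eth/63 peers; syncWithPeer selects the path per peer version.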
type Downloader struct {
	mode SyncMode       // Synchronisation mode defining the strategies used
	mux  *event.TypeMux // Event multiplexer to announce sync operation events

	queue *queue   // Scheduler for selecting the hashes to download
	peers *peerSet // Set of active peers from which download can proceed

	interrupt int32 // Atomic boolean to signal termination

	// Statistics
	syncStatsChainOrigin uint64       // Origin block number where syncing started at
	syncStatsChainHeight uint64       // Highest block number known when syncing started
	syncStatsStateTotal  uint64       // Total number of node state entries known so far
	syncStatsStateDone   uint64       // Number of state trie entries already pulled
	syncStatsLock        sync.RWMutex // Lock protecting the sync stats fields

	// Callbacks
	hasHeader       headerCheckFn            // Checks if a header is present in the chain
	hasBlock        blockCheckFn             // Checks if a block is present in the chain
	getHeader       headerRetrievalFn        // Retrieves a header from the chain
	getBlock        blockRetrievalFn         // Retrieves a block from the chain
	headHeader      headHeaderRetrievalFn    // Retrieves the head header from the chain
	headBlock       headBlockRetrievalFn     // Retrieves the head block from the chain
	headFastBlock   headFastBlockRetrievalFn // Retrieves the head fast-sync block from the chain
	commitHeadBlock headBlockCommitterFn     // Commits a manually assembled block as the chain head
	getTd           tdRetrievalFn            // Retrieves the TD of a block from the chain
	insertHeaders   headerChainInsertFn      // Injects a batch of headers into the chain
	insertBlocks    blockChainInsertFn       // Injects a batch of blocks into the chain
	insertReceipts  receiptChainInsertFn     // Injects a batch of blocks and their receipts into the chain
	dropPeer        peerDropFn               // Drops a peer for misbehaving

	// Status
	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
	synchronising   int32
	processing      int32
	notified        int32

	// Channels
	newPeerCh     chan *peer
	hashCh        chan dataPack // [eth/61] Channel receiving inbound hashes
	blockCh       chan dataPack // [eth/61] Channel receiving inbound blocks
	headerCh      chan dataPack // [eth/62] Channel receiving inbound block headers
	bodyCh        chan dataPack // [eth/62] Channel receiving inbound block bodies
	receiptCh     chan dataPack // [eth/63] Channel receiving inbound receipts
	stateCh       chan dataPack // [eth/63] Channel receiving inbound node state data
	blockWakeCh   chan bool     // [eth/61] Channel to signal the block fetcher of new tasks
	bodyWakeCh    chan bool     // [eth/62] Channel to signal the block body fetcher of new tasks
	receiptWakeCh chan bool     // [eth/63] Channel to signal the receipt fetcher of new tasks
	stateWakeCh   chan bool     // [eth/63] Channel to signal the state fetcher of new tasks

	cancelCh   chan struct{} // Channel to cancel mid-flight syncs
	cancelLock sync.RWMutex  // Lock to protect the cancel channel in delivers

	// Testing hooks
	syncInitHook     func(uint64, uint64)  // Method to call upon initiating a new sync run
	bodyFetchHook    func([]*types.Header) // Method to call upon starting a block body fetch
	receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
	chainInsertHook  func([]*fetchResult)  // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
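//
// A rough usage sketch (illustrative only; the callback and fetcher arguments
// shown are hypothetical stand-ins for whatever chain and protocol backend
// wires the downloader up):
//
//	d := New(FullSync, stateDb, mux, hasHeader, hasBlock, getHeader, getBlock,
//		headHeader, headBlock, headFastBlock, commitHeadBlock, getTd,
//		insertHeaders, insertBlocks, insertReceipts, dropPeer)
//	d.RegisterPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks,
//		getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)
//	go d.Synchronise(id, head, td)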
func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlock blockCheckFn, getHeader headerRetrievalFn,
	getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, headFastBlock headFastBlockRetrievalFn,
	commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn,
	insertReceipts receiptChainInsertFn, dropPeer peerDropFn) *Downloader {

	return &Downloader{
		mode:            mode,
		mux:             mux,
		queue:           newQueue(stateDb),
		peers:           newPeerSet(),
		hasHeader:       hasHeader,
		hasBlock:        hasBlock,
		getHeader:       getHeader,
		getBlock:        getBlock,
		headHeader:      headHeader,
		headBlock:       headBlock,
		headFastBlock:   headFastBlock,
		commitHeadBlock: commitHeadBlock,
		getTd:           getTd,
		insertHeaders:   insertHeaders,
		insertBlocks:    insertBlocks,
		insertReceipts:  insertReceipts,
		dropPeer:        dropPeer,
		newPeerCh:       make(chan *peer, 1),
		hashCh:          make(chan dataPack, 1),
		blockCh:         make(chan dataPack, 1),
		headerCh:        make(chan dataPack, 1),
		bodyCh:          make(chan dataPack, 1),
		receiptCh:       make(chan dataPack, 1),
		stateCh:         make(chan dataPack, 1),
		blockWakeCh:     make(chan bool, 1),
		bodyWakeCh:      make(chan bool, 1),
		receiptWakeCh:   make(chan bool, 1),
		stateWakeCh:     make(chan bool, 1),
	}
}

// Boundaries retrieves the synchronisation boundaries, specifically the origin
// block where synchronisation started (it may have failed or been suspended) and
// the latest known block that the synchronisation targets.
func (d *Downloader) Boundaries() (uint64, uint64) {
	d.syncStatsLock.RLock()
	defer d.syncStatsLock.RUnlock()

	return d.syncStatsChainOrigin, d.syncStatsChainHeight
}

// Synchronising returns whether the downloader is currently retrieving blocks.
func (d *Downloader) Synchronising() bool {
	return atomic.LoadInt32(&d.synchronising) > 0
}

// RegisterPeer injects a new download peer into the set of block sources to be
// used for fetching hashes and blocks from.
func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
	getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
	getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
	getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {

	glog.V(logger.Detail).Infoln("Registering peer", id)
	if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
		glog.V(logger.Error).Infoln("Register failed:", err)
		return err
	}
	return nil
}

// UnregisterPeer removes a peer from the known list, preventing any action from
// the specified peer. An effort is also made to return any pending fetches into
// the queue.
func (d *Downloader) UnregisterPeer(id string) error {
	glog.V(logger.Detail).Infoln("Unregistering peer", id)
	if err := d.peers.Unregister(id); err != nil {
		glog.V(logger.Error).Infoln("Unregister failed:", err)
		return err
	}
	d.queue.Revoke(id)
	return nil
}

// Synchronise tries to sync up our local block chain with a remote peer, adding
// various sanity checks and wrapping it with various log entries.
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int) {
	glog.V(logger.Detail).Infof("Attempting synchronisation: %v, head [%x…], TD %v", id, head[:4], td)

	switch err := d.synchronise(id, head, td); err {
	case nil:
		glog.V(logger.Detail).Infof("Synchronisation completed")

	case errBusy:
		glog.V(logger.Detail).Infof("Synchronisation already in progress")

	case errTimeout, errBadPeer, errStallingPeer, errEmptyHashSet, errEmptyHeaderSet, errPeersUnavailable, errInvalidChain:
		glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
		d.dropPeer(id)

	case errPendingQueue:
		glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)

	default:
		glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
	}
}

// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronise if its TD is higher than our own. If any of the
// checks fail, an error will be returned. This method is synchronous.
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error {
	// Mock out the synchronisation if testing
	if d.synchroniseMock != nil {
		return d.synchroniseMock(id, hash)
	}
	// Make sure only one goroutine is ever allowed past this point at once
	if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
		return errBusy
	}
	defer atomic.StoreInt32(&d.synchronising, 0)

	// Post a user notification of the sync (only once per session)
	if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
		glog.V(logger.Info).Infoln("Block synchronisation started")
	}
	// Abort if the queue still contains some leftover data
	if d.queue.GetHeadResult() != nil {
		return errPendingQueue
	}
	// Reset the queue, peer set and wake channels to clean any internal leftover state
	d.queue.Reset()
	d.peers.Reset()

	for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
		select {
		case <-ch:
		default:
		}
	}
	// Reset any ephemeral sync statistics
	d.syncStatsLock.Lock()
	d.syncStatsStateTotal = 0
	d.syncStatsStateDone = 0
	d.syncStatsLock.Unlock()

	// Create cancel channel for aborting mid-flight
	d.cancelLock.Lock()
	d.cancelCh = make(chan struct{})
	d.cancelLock.Unlock()

	// Retrieve the origin peer and initiate the downloading process
	p := d.peers.Peer(id)
	if p == nil {
		return errUnknownPeer
	}
	return d.syncWithPeer(p, hash, td)
}

/*
// Has checks if the downloader knows about a particular hash, meaning that it is
// either already downloaded or pending retrieval.
func (d *Downloader) Has(hash common.Hash) bool {
	return d.queue.Has(hash)
}
*/
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) {
	d.mux.Post(StartEvent{})
	defer func() {
		// reset on error
		if err != nil {
			d.cancel()
			d.mux.Post(FailedEvent{err})
		} else {
			d.mux.Post(DoneEvent{})
		}
	}()

	glog.V(logger.Debug).Infof("Synchronising with the network using: %s [eth/%d]", p.id, p.version)
	defer func(start time.Time) {
		glog.V(logger.Debug).Infof("Synchronisation terminated after %v", time.Since(start))
	}(time.Now())

	switch {
	case p.version == 61:
		// Look up the sync boundaries: the common ancestor and the target block
		latest, err := d.fetchHeight61(p)
		if err != nil {
			return err
		}
		origin, err := d.findAncestor61(p)
		if err != nil {
			return err
		}
		d.syncStatsLock.Lock()
		if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
			d.syncStatsChainOrigin = origin
		}
		d.syncStatsChainHeight = latest
		d.syncStatsLock.Unlock()

		// Initiate the sync using a concurrent hash and block retrieval algorithm
		if d.syncInitHook != nil {
			d.syncInitHook(origin, latest)
		}
		d.queue.Prepare(origin+1, d.mode, 0)

		errc := make(chan error, 2)
		go func() { errc <- d.fetchHashes61(p, td, origin+1) }()
		go func() { errc <- d.fetchBlocks61(origin + 1) }()

		// If any fetcher fails, cancel the other
		if err := <-errc; err != nil {
			d.cancel()
			<-errc
			return err
		}
		return <-errc

	case p.version >= 62:
		// Look up the sync boundaries: the common ancestor and the target block
		latest, err := d.fetchHeight(p)
		if err != nil {
			return err
		}
		origin, err := d.findAncestor(p)
		if err != nil {
			return err
		}
		d.syncStatsLock.Lock()
		if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
			d.syncStatsChainOrigin = origin
		}
		d.syncStatsChainHeight = latest
		d.syncStatsLock.Unlock()

		// Initiate the sync using a concurrent header and content retrieval algorithm
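		// The pivot separates the fast-synced part of the chain from the final
		// minFullBlocks blocks, which are retrieved and processed fully even in
		// fast sync (see minFullBlocks above).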
		pivot := uint64(0)
		if latest > uint64(minFullBlocks) {
			pivot = latest - uint64(minFullBlocks)
		}
		d.queue.Prepare(origin+1, d.mode, pivot)

		if d.syncInitHook != nil {
			d.syncInitHook(origin, latest)
		}
		errc := make(chan error, 4)
		go func() { errc <- d.fetchHeaders(p, td, origin+1) }() // Headers are always retrieved
		go func() { errc <- d.fetchBodies(origin + 1) }()       // Bodies are retrieved during normal and fast sync
		go func() { errc <- d.fetchReceipts(origin + 1) }()     // Receipts are retrieved during fast sync
		go func() { errc <- d.fetchNodeData() }()               // Node state data is retrieved during fast sync

		// If any fetcher fails, cancel the others
		var fail error
		for i := 0; i < cap(errc); i++ {
			if err := <-errc; err != nil {
				if fail == nil {
					fail = err
					d.cancel()
				}
			}
		}
		return fail

	default:
		// Something very wrong, stop right here
		glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version)
		return errBadPeer
	}
	return nil
}

// cancel cancels all of the operations and resets the queue.
func (d *Downloader) cancel() {
	// Close the current cancel channel
	d.cancelLock.Lock()
	if d.cancelCh != nil {
		select {
		case <-d.cancelCh:
			// Channel was already closed
		default:
			close(d.cancelCh)
		}
	}
	d.cancelLock.Unlock()

	// Reset the queue
	d.queue.Reset()
}

// Terminate interrupts the downloader, canceling all pending operations.
func (d *Downloader) Terminate() {
	atomic.StoreInt32(&d.interrupt, 1)
	d.cancel()
}

// fetchHeight61 retrieves the head block of the remote peer to aid in estimating
// the total time a pending synchronisation would take.
func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
	glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)

	// Request the advertised remote head block and wait for the response
	go p.getBlocks([]common.Hash{p.head})

	timeout := time.After(blockSoftTTL)
	for {
		select {
		case <-d.cancelCh:
			return 0, errCancelBlockFetch

		case <-d.headerCh:
			// Out of bounds eth/62 block headers received, ignore them

		case <-d.bodyCh:
			// Out of bounds eth/62 block bodies received, ignore them

		case <-d.hashCh:
			// Out of bounds hashes received, ignore them

		case packet := <-d.blockCh:
			// Discard anything not from the origin peer
			if packet.PeerId() != p.id {
				glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", packet.PeerId())
				break
			}
			// Make sure the peer actually gave something valid
			blocks := packet.(*blockPack).blocks
			if len(blocks) != 1 {
				glog.V(logger.Debug).Infof("%v: invalid number of head blocks: %d != 1", p, len(blocks))
				return 0, errBadPeer
			}
			return blocks[0].NumberU64(), nil

		case <-timeout:
			glog.V(logger.Debug).Infof("%v: head block timeout", p)
			return 0, errTimeout
		}
	}
}

// findAncestor61 tries to locate the common ancestor block of the local chain and
// a remote peer's blockchain. In the general case when our node was in sync and
// on the correct chain, checking the top N blocks should already get us a match.
// In the rare scenario when we ended up on a long reorganization (i.e. none of
// the head blocks match), we do a binary search to find the common ancestor.
func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
	glog.V(logger.Debug).Infof("%v: looking for common ancestor", p)

	// Request our head blocks to short circuit ancestor location
	head := d.headBlock().NumberU64()
	from := int64(head) - int64(MaxHashFetch) + 1
	if from < 0 {
		from = 0
	}
	go p.getAbsHashes(uint64(from), MaxHashFetch)

	// Wait for the remote response to the head fetch
	number, hash := uint64(0), common.Hash{}
	timeout := time.After(hashTTL)

	for finished := false; !finished; {
		select {
		case <-d.cancelCh:
			return 0, errCancelHashFetch

		case packet := <-d.hashCh:
			// Discard anything not from the origin peer
			if packet.PeerId() != p.id {
				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
				break
			}
			// Make sure the peer actually gave something valid
			hashes := packet.(*hashPack).hashes
			if len(hashes) == 0 {
				glog.V(logger.Debug).Infof("%v: empty head hash set", p)
				return 0, errEmptyHashSet
			}
			// Check if a common ancestor was found
			finished = true
			for i := len(hashes) - 1; i >= 0; i-- {
				if d.hasBlock(hashes[i]) {
					number, hash = uint64(from)+uint64(i), hashes[i]
					break
				}
			}

		case <-d.blockCh:
			// Out of bounds blocks received, ignore them

		case <-d.headerCh:
			// Out of bounds eth/62 block headers received, ignore them

		case <-d.bodyCh:
			// Out of bounds eth/62 block bodies received, ignore them

		case <-timeout:
			glog.V(logger.Debug).Infof("%v: head hash timeout", p)
			return 0, errTimeout
		}
	}
	// If the head fetch already found an ancestor, return
	if !common.EmptyHash(hash) {
		glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, number, hash[:4])
		return number, nil
	}
	// Ancestor not found, we need to binary search over our chain
	start, end := uint64(0), head
	for start+1 < end {
		// Split our chain interval in two, and request the hash to cross check
		check := (start + end) / 2

		timeout := time.After(hashTTL)
		go p.getAbsHashes(uint64(check), 1)

		// Wait until a reply arrives to this request
		for arrived := false; !arrived; {
			select {
			case <-d.cancelCh:
				return 0, errCancelHashFetch

			case packet := <-d.hashCh:
				// Discard anything not from the origin peer
				if packet.PeerId() != p.id {
					glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
					break
				}
				// Make sure the peer actually gave something valid
				hashes := packet.(*hashPack).hashes
				if len(hashes) != 1 {
					glog.V(logger.Debug).Infof("%v: invalid search hash set (%d)", p, len(hashes))
					return 0, errBadPeer
				}
				arrived = true

				// Modify the search interval based on the response
				block := d.getBlock(hashes[0])
				if block == nil {
					end = check
					break
				}
				if block.NumberU64() != check {
					glog.V(logger.Debug).Infof("%v: non requested hash #%d [%x…], instead of #%d", p, block.NumberU64(), block.Hash().Bytes()[:4], check)
					return 0, errBadPeer
				}
				start = check

			case <-d.blockCh:
				// Out of bounds blocks received, ignore them

			case <-d.headerCh:
				// Out of bounds eth/62 block headers received, ignore them

			case <-d.bodyCh:
				// Out of bounds eth/62 block bodies received, ignore them

			case <-timeout:
				glog.V(logger.Debug).Infof("%v: search hash timeout", p)
				return 0, errTimeout
			}
		}
	}
	return start, nil
}

// fetchHashes61 keeps retrieving hashes from the requested number, until no more
// are returned, potentially throttling on the way.
func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
	glog.V(logger.Debug).Infof("%v: downloading hashes from #%d", p, from)

	// Create a timeout timer, and the associated hash fetcher
	request := time.Now()       // time of the last fetch request
	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
	<-timeout.C                 // timeout channel should be initially empty
	defer timeout.Stop()

	getHashes := func(from uint64) {
		glog.V(logger.Detail).Infof("%v: fetching %d hashes from #%d", p, MaxHashFetch, from)

		go p.getAbsHashes(from, MaxHashFetch)
		request = time.Now()
		timeout.Reset(hashTTL)
	}
	// Start pulling hashes, until all are exhausted
	getHashes(from)
	gotHashes := false

	for {
		select {
		case <-d.cancelCh:
			return errCancelHashFetch

		case <-d.headerCh:
			// Out of bounds eth/62 block headers received, ignore them

		case <-d.bodyCh:
			// Out of bounds eth/62 block bodies received, ignore them

		case packet := <-d.hashCh:
			// Make sure the active peer is giving us the hashes
			if packet.PeerId() != p.id {
				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
				break
			}
			hashReqTimer.UpdateSince(request)
			timeout.Stop()

			// If no more hashes are inbound, notify the block fetcher and return
			if packet.Items() == 0 {
				glog.V(logger.Debug).Infof("%v: no available hashes", p)

				select {
				case d.blockWakeCh <- false:
				case <-d.cancelCh:
				}
				// If no hashes were retrieved at all, the peer violated its TD promise that it had a
				// better chain compared to ours. The only exception is if its promised blocks were
				// already imported by other means (e.g. fetcher):
				//
				// R <remote peer>, L <local node>: Both at block 10
				// R: Mine block 11, and propagate it to L
				// L: Queue block 11 for import
				// L: Notice that R's head and TD increased compared to ours, start sync
				// L: Import of block 11 finishes
				// L: Sync begins, and finds common ancestor at 11
				// L: Request new hashes up from 11 (R's TD was higher, it must have something)
				// R: Nothing to give
				if !gotHashes && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
					return errStallingPeer
				}
				return nil
			}
			gotHashes = true
			hashes := packet.(*hashPack).hashes

			// Otherwise insert all the new hashes, aborting in case of junk
			glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashes), from)

			inserts := d.queue.Schedule61(hashes, true)
			if len(inserts) != len(hashes) {
				glog.V(logger.Debug).Infof("%v: stale hashes", p)
				return errBadPeer
			}
			// Notify the block fetcher of new hashes, but stop if queue is full
			if d.queue.PendingBlocks() < maxQueuedHashes {
				// We still have hashes to fetch, send continuation wake signal (potential)
				select {
				case d.blockWakeCh <- true:
				default:
				}
			} else {
				// Hash limit reached, send a termination wake signal (enforced)
				select {
				case d.blockWakeCh <- false:
				case <-d.cancelCh:
				}
				return nil
			}
			// Queue not yet full, fetch the next batch
			from += uint64(len(hashes))
			getHashes(from)

		case <-timeout.C:
			glog.V(logger.Debug).Infof("%v: hash request timed out", p)
			hashTimeoutMeter.Mark(1)
			return errTimeout
		}
	}
}

// fetchBlocks61 iteratively downloads the scheduled hashes, taking any available
// peers, reserving a chunk of blocks for each, waiting for delivery and also
// periodically checking for timeouts.
func (d *Downloader) fetchBlocks61(from uint64) error {
	glog.V(logger.Debug).Infof("Downloading blocks from #%d", from)
	defer glog.V(logger.Debug).Infof("Block download terminated")

	// Create a timeout timer for scheduling expiration tasks
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	update := make(chan struct{}, 1)

	// Fetch blocks until the hash fetcher's done
	finished := false
	for {
		select {
		case <-d.cancelCh:
			return errCancelBlockFetch

		case <-d.headerCh:
			// Out of bounds eth/62 block headers received, ignore them

		case <-d.bodyCh:
			// Out of bounds eth/62 block bodies received, ignore them

		case packet := <-d.blockCh:
			// If the peer was previously banned and failed to deliver its pack
			// in a reasonable time frame, ignore its message.
			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
				// Deliver the received chunk of blocks, and demote in case of errors
				blocks := packet.(*blockPack).blocks
				err := d.queue.DeliverBlocks(peer.id, blocks)
				switch err {
				case nil:
					// If no blocks were delivered, demote the peer (need the delivery above)
					if len(blocks) == 0 {
						peer.Demote()
						peer.SetBlocksIdle()
						glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
						break
					}
					// All was successful, promote the peer and potentially start processing
					peer.Promote()
					peer.SetBlocksIdle()
					glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
					go d.process()

				case errInvalidChain:
					// The hash chain is invalid (blocks are not ordered properly), abort
					return err

				case errNoFetchesPending:
					// Peer probably timed out with its delivery but came through
					// in the end, demote, but allow it to pull from this peer.
					peer.Demote()
					peer.SetBlocksIdle()
					glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)

				case errStaleDelivery:
					// Delivered something completely else than requested, usually
					// caused by a timeout and delivery during a new sync cycle.
					// Don't set it to idle as the original request should still be
					// in flight.
					peer.Demote()
					glog.V(logger.Detail).Infof("%s: stale delivery", peer)

				default:
					// Peer did something semi-useful, demote but keep it around
					peer.Demote()
					peer.SetBlocksIdle()
					glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
					go d.process()
				}
			}
			// Blocks arrived, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case cont := <-d.blockWakeCh:
			// The hash fetcher sent a continuation flag, check if it's done
			if !cont {
				finished = true
			}
			// Hashes arrive, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-ticker.C:
			// Sanity check update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-update:
			// Short circuit if we lost all our peers
			if d.peers.Len() == 0 {
				return errNoPeers
			}
			// Check for block request timeouts and demote the responsible peers
			for _, pid := range d.queue.ExpireBlocks(blockHardTTL) {
				if peer := d.peers.Peer(pid); peer != nil {
					peer.Demote()
					glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
				}
			}
			// If there's nothing more to fetch, wait or terminate
			if d.queue.PendingBlocks() == 0 {
				if d.queue.InFlight() == 0 && finished {
					glog.V(logger.Debug).Infof("Block fetching completed")
					return nil
				}
				break
			}
			// Send a download request to all idle peers, until throttled
			throttled := false
			idles, total := d.peers.BlockIdlePeers()

			for _, peer := range idles {
				// Short circuit if throttling activated
				if d.queue.ThrottleBlocks() {
					throttled = true
					break
				}
				// Reserve a chunk of hashes for a peer. A nil can mean either that
				// no more hashes are available, or that the peer is known not to
				// have them.
				request := d.queue.ReserveBlocks(peer, peer.BlockCapacity())
				if request == nil {
					continue
				}
				if glog.V(logger.Detail) {
					glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes))
				}
				// Fetch the chunk and make sure any errors return the hashes to the queue
				if err := peer.Fetch61(request); err != nil {
					glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer)
					d.queue.CancelBlocks(request)
				}
			}
			// Make sure that we have peers available for fetching. If all peers have been tried
			// and all failed, throw an error
			if !throttled && d.queue.InFlight() == 0 && len(idles) == total {
				return errPeersUnavailable
			}
		}
	}
}

// fetchHeight retrieves the head header of the remote peer to aid in estimating
// the total time a pending synchronisation would take.
func (d *Downloader) fetchHeight(p *peer) (uint64, error) {
	glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)

	// Request the advertised remote head block and wait for the response
	go p.getRelHeaders(p.head, 1, 0, false)

	timeout := time.After(headerTTL)
	for {
		select {
		case <-d.cancelCh:
			return 0, errCancelBlockFetch

		case packet := <-d.headerCh:
			// Discard anything not from the origin peer
			if packet.PeerId() != p.id {
				glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
				break
			}
			// Make sure the peer actually gave something valid
			headers := packet.(*headerPack).headers
			if len(headers) != 1 {
				glog.V(logger.Debug).Infof("%v: invalid number of head headers: %d != 1", p, len(headers))
				return 0, errBadPeer
			}
			return headers[0].Number.Uint64(), nil

		case <-d.bodyCh:
			// Out of bounds block bodies received, ignore them

		case <-d.hashCh:
			// Out of bounds eth/61 hashes received, ignore them

		case <-d.blockCh:
			// Out of bounds eth/61 blocks received, ignore them

		case <-timeout:
			glog.V(logger.Debug).Infof("%v: head header timeout", p)
			return 0, errTimeout
		}
	}
}

// findAncestor tries to locate the common ancestor link of the local chain and
// a remote peer's blockchain. In the general case when our node was in sync and
// on the correct chain, checking the top N links should already get us a match.
// In the rare scenario when we ended up on a long reorganization (i.e. none of
// the head links match), we do a binary search to find the common ancestor.
func (d *Downloader) findAncestor(p *peer) (uint64, error) {
	glog.V(logger.Debug).Infof("%v: looking for common ancestor", p)

	// Request our head headers to short circuit ancestor location
	head := d.headHeader().Number.Uint64()
	if d.mode == FullSync {
		head = d.headBlock().NumberU64()
	} else if d.mode == FastSync {
		head = d.headFastBlock().NumberU64()
	}
	from := int64(head) - int64(MaxHeaderFetch) + 1
	if from < 0 {
		from = 0
	}
	go p.getAbsHeaders(uint64(from), MaxHeaderFetch, 0, false)

	// Wait for the remote response to the head fetch
	number, hash := uint64(0), common.Hash{}
	timeout := time.After(hashTTL)

	for finished := false; !finished; {
		select {
		case <-d.cancelCh:
			return 0, errCancelHashFetch

		case packet := <-d.headerCh:
			// Discard anything not from the origin peer
			if packet.PeerId() != p.id {
				glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packet.PeerId())
				break
			}
			// Make sure the peer actually gave something valid
			headers := packet.(*headerPack).headers
			if len(headers) == 0 {
				glog.V(logger.Debug).Infof("%v: empty head header set", p)
				return 0, errEmptyHeaderSet
			}
			// Check if a common ancestor was found
			finished = true
			for i := len(headers) - 1; i >= 0; i-- {
				if (d.mode != LightSync && d.hasBlock(headers[i].Hash())) || (d.mode == LightSync && d.hasHeader(headers[i].Hash())) {
					number, hash = headers[i].Number.Uint64(), headers[i].Hash()
					break
				}
			}

		case <-d.bodyCh:
			// Out of bounds block bodies received, ignore them

		case <-d.hashCh:
			// Out of bounds eth/61 hashes received, ignore them

		case <-d.blockCh:
			// Out of bounds eth/61 blocks received, ignore them

		case <-timeout:
			glog.V(logger.Debug).Infof("%v: head header timeout", p)
			return 0, errTimeout
		}
	}
	// If the head fetch already found an ancestor, return
	if !common.EmptyHash(hash) {
		glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, number, hash[:4])
		return number, nil
	}
	// Ancestor not found, we need to binary search over our chain
	start, end := uint64(0), head
	for start+1 < end {
		// Split our chain interval in two, and request the hash to cross check
		check := (start + end) / 2

		timeout := time.After(hashTTL)
		go p.getAbsHeaders(uint64(check), 1, 0, false)

		// Wait until a reply arrives to this request
		for arrived := false; !arrived; {
			select {
			case <-d.cancelCh:
				return 0, errCancelHashFetch

			case packer := <-d.headerCh:
				// Discard anything not from the origin peer
				if packer.PeerId() != p.id {
					glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", packer.PeerId())
					break
				}
				// Make sure the peer actually gave something valid
				headers := packer.(*headerPack).headers
				if len(headers) != 1 {
					glog.V(logger.Debug).Infof("%v: invalid search header set (%d)", p, len(headers))
					return 0, errBadPeer
				}
				arrived = true

				// Modify the search interval based on the response
				if (d.mode == FullSync && !d.hasBlock(headers[0].Hash())) || (d.mode != FullSync && !d.hasHeader(headers[0].Hash())) {
					end = check
					break
				}
				header := d.getHeader(headers[0].Hash()) // Independent of sync mode, header surely exists
				if header.Number.Uint64() != check {
					glog.V(logger.Debug).Infof("%v: non requested header #%d [%x…], instead of #%d", p, header.Number, header.Hash().Bytes()[:4], check)
					return 0, errBadPeer
				}
				start = check

			case <-d.bodyCh:
				// Out of bounds block bodies received, ignore them

			case <-d.hashCh:
				// Out of bounds eth/61 hashes received, ignore them

			case <-d.blockCh:
				// Out of bounds eth/61 blocks received, ignore them

			case <-timeout:
				glog.V(logger.Debug).Infof("%v: search header timeout", p)
				return 0, errTimeout
			}
		}
	}
	return start, nil
}

// fetchHeaders keeps retrieving headers from the requested number, until no more
// are returned, potentially throttling on the way.
//
// Depending on the sync mode, the retrieved headers are either queued for block
// body and receipt retrieval, or directly imported as a pure header chain.
func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
	glog.V(logger.Debug).Infof("%v: downloading headers from #%d", p, from)
	defer glog.V(logger.Debug).Infof("%v: header download terminated", p)

	// Create a timeout timer, and the associated hash fetcher
	request := time.Now()       // time of the last fetch request
	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
	<-timeout.C                 // timeout channel should be initially empty
	defer timeout.Stop()

	getHeaders := func(from uint64) {
		glog.V(logger.Detail).Infof("%v: fetching %d headers from #%d", p, MaxHeaderFetch, from)

		go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
		request = time.Now()
		timeout.Reset(headerTTL)
	}
	// Start pulling headers, until all are exhausted
	getHeaders(from)
	gotHeaders := false

	for {
		select {
		case <-d.cancelCh:
			return errCancelHeaderFetch

		case <-d.hashCh:
			// Out of bounds eth/61 hashes received, ignore them

		case <-d.blockCh:
			// Out of bounds eth/61 blocks received, ignore them

		case packet := <-d.headerCh:
			// Make sure the active peer is giving us the headers
			if packet.PeerId() != p.id {
				glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", packet.PeerId())
				break
			}
			headerReqTimer.UpdateSince(request)
			timeout.Stop()

			// If no more headers are inbound, notify the content fetchers and return
			if packet.Items() == 0 {
				glog.V(logger.Debug).Infof("%v: no available headers", p)

				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
					select {
					case ch <- false:
					case <-d.cancelCh:
					}
				}
				// If no headers were retrieved at all, the peer violated its TD promise that it had a
				// better chain compared to ours. The only exception is if its promised blocks were
				// already imported by other means (e.g. fetcher):
				//
				// R <remote peer>, L <local node>: Both at block 10
				// R: Mine block 11, and propagate it to L
				// L: Queue block 11 for import
				// L: Notice that R's head and TD increased compared to ours, start sync
				// L: Import of block 11 finishes
				// L: Sync begins, and finds common ancestor at 11
				// L: Request new headers up from 11 (R's TD was higher, it must have something)
				// R: Nothing to give
				if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
					return errStallingPeer
				}
				return nil
			}
			gotHeaders = true
			headers := packet.(*headerPack).headers

			// Otherwise insert all the new headers, aborting in case of junk
			glog.V(logger.Detail).Infof("%v: schedule %d headers from #%d", p, len(headers), from)

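			// Fast and light sync insert the headers directly into the header chain,
			// whereas full and fast sync schedule them for block body (and receipt)
			// retrieval via the queue.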
			if d.mode == FastSync || d.mode == LightSync {
				if n, err := d.insertHeaders(headers, false); err != nil {
					glog.V(logger.Debug).Infof("%v: invalid header #%d [%x…]: %v", p, headers[n].Number, headers[n].Hash().Bytes()[:4], err)
					return errInvalidChain
				}
			}
			if d.mode == FullSync || d.mode == FastSync {
				inserts := d.queue.Schedule(headers, from)
				if len(inserts) != len(headers) {
					glog.V(logger.Debug).Infof("%v: stale headers", p)
					return errBadPeer
				}
			}
			// Notify the content fetchers of new headers, but stop if queue is full
			cont := d.queue.PendingBlocks() < maxQueuedHeaders || d.queue.PendingReceipts() < maxQueuedHeaders
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
				if cont {
					// We still have headers to fetch, send continuation wake signal (potential)
					select {
					case ch <- true:
					default:
					}
				} else {
					// Header limit reached, send a termination wake signal (enforced)
					select {
					case ch <- false:
					case <-d.cancelCh:
					}
					return nil
				}
			}
			// Queue not yet full, fetch the next batch
			from += uint64(len(headers))
			getHeaders(from)

		case <-timeout.C:
			// Header retrieval timed out, consider the peer bad and drop
			glog.V(logger.Debug).Infof("%v: header request timed out", p)
			headerTimeoutMeter.Mark(1)
			d.dropPeer(p.id)

			// Finish the sync gracefully instead of dumping the gathered data though
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
				select {
				case ch <- false:
				case <-d.cancelCh:
				}
			}
			return nil
		}
	}
}

// fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchBodies(from uint64) error {
	glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)

	var (
		deliver = func(packet dataPack) error {
			pack := packet.(*bodyPack)
			return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
		}
		expire   = func() []string { return d.queue.ExpireBodies(bodyHardTTL) }
		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
		capacity = func(p *peer) int { return p.BlockCapacity() }
		getIdles = func() ([]*peer, int) { return d.peers.BodyIdlePeers() }
		setIdle  = func(p *peer) { p.SetBlocksIdle() }
	)
	err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
		d.queue.PendingBlocks, d.queue.ThrottleBlocks, d.queue.ReserveBodies, d.bodyFetchHook,
		fetch, d.queue.CancelBodies, capacity, getIdles, setIdle, "Body")

	glog.V(logger.Debug).Infof("Block body download terminated: %v", err)
	return err
}

// fetchReceipts iteratively downloads the scheduled block receipts, taking any
// available peers, reserving a chunk of receipts for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchReceipts(from uint64) error {
	glog.V(logger.Debug).Infof("Downloading receipts from #%d", from)

	var (
		deliver = func(packet dataPack) error {
			pack := packet.(*receiptPack)
			return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
		}
		expire   = func() []string { return d.queue.ExpireReceipts(receiptHardTTL) }
		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) }
		capacity = func(p *peer) int { return p.ReceiptCapacity() }
		setIdle  = func(p *peer) { p.SetReceiptsIdle() }
	)
	err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
		d.queue.PendingReceipts, d.queue.ThrottleReceipts, d.queue.ReserveReceipts, d.receiptFetchHook,
		fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "Receipt")

	glog.V(logger.Debug).Infof("Receipt download terminated: %v", err)
	return err
}

// fetchNodeData iteratively downloads the scheduled state trie nodes, taking any
// available peers, reserving a chunk of nodes for each, waiting for delivery and
// also periodically checking for timeouts.
func (d *Downloader) fetchNodeData() error {
	glog.V(logger.Debug).Infof("Downloading node state data")

	var (
		deliver = func(packet dataPack) error {
			start := time.Now()
			done, found, err := d.queue.DeliverNodeData(packet.PeerId(), packet.(*statePack).states)

			d.syncStatsLock.Lock()
			totalDone, totalKnown := d.syncStatsStateDone+uint64(done), d.syncStatsStateTotal+uint64(found)
			d.syncStatsStateDone, d.syncStatsStateTotal = totalDone, totalKnown
			d.syncStatsLock.Unlock()

			glog.V(logger.Info).Infof("imported %d [%d / %d] state entries in %v.", done, totalDone, totalKnown, time.Since(start))
			return err
		}
		expire   = func() []string { return d.queue.ExpireNodeData(stateHardTTL) }
		throttle = func() bool { return false }
		reserve  = func(p *peer, count int) (*fetchRequest, bool, error) {
			return d.queue.ReserveNodeData(p, count), false, nil
		}
		fetch    = func(p *peer, req *fetchRequest) error { return p.FetchNodeData(req) }
		capacity = func(p *peer) int { return p.NodeDataCapacity() }
		setIdle  = func(p *peer) { p.SetNodeDataIdle() }
	)
	err := d.fetchParts(errCancelReceiptFetch, d.stateCh, deliver, d.stateWakeCh, expire,
		d.queue.PendingNodeData, throttle, reserve, nil, fetch, d.queue.CancelNodeData,
		capacity, d.peers.ReceiptIdlePeers, setIdle, "State")

	glog.V(logger.Debug).Infof("Node state data download terminated: %v", err)
	return err
}

// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
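//
// The data-kind specifics are supplied via the callback arguments: deliver hands
// a received packet to the queue, expire times out in-flight requests, pending,
// throttle, reserve and cancel manage the queue for the given kind of data,
// fetch issues the actual network request, capacity, idle and setIdle track
// per-peer load, and kind is only used for log messages.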
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(packet dataPack) error, wakeCh chan bool,
	expire func() []string, pending func() int, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), fetchHook func([]*types.Header),
	fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, idle func() ([]*peer, int), setIdle func(*peer), kind string) error {

	// Create a ticker to detect expired retrieval tasks
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	update := make(chan struct{}, 1)

	// Prepare the queue and fetch block parts until the block header fetcher's done
	finished := false
	for {
		select {
		case <-d.cancelCh:
			return errCancel

		case <-d.hashCh:
			// Out of bounds eth/61 hashes received, ignore them

		case <-d.blockCh:
			// Out of bounds eth/61 blocks received, ignore them

		case packet := <-deliveryCh:
			// If the peer was previously banned and failed to deliver its pack
			// in a reasonable time frame, ignore its message.
			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
				// Deliver the received chunk of data, and demote in case of errors
				switch err := deliver(packet); err {
				case nil:
					// If no blocks were delivered, demote the peer (need the delivery above to clean internal queue!)
					if packet.Items() == 0 {
						peer.Demote()
						setIdle(peer)
						glog.V(logger.Detail).Infof("%s: no %s delivered", peer, strings.ToLower(kind))
						break
					}
					// All was successful, promote the peer and potentially start processing
					peer.Promote()
					setIdle(peer)
					glog.V(logger.Detail).Infof("%s: delivered %s %s(s)", peer, packet.Stats(), strings.ToLower(kind))
					go d.process()

				case errInvalidChain:
					// The hash chain is invalid (blocks are not ordered properly), abort
					return err

				case errInvalidBody, errInvalidReceipt:
					// The peer delivered something very bad, drop immediately
					glog.V(logger.Error).Infof("%s: delivered invalid %s, dropping", peer, strings.ToLower(kind))
					d.dropPeer(peer.id)

				case errNoFetchesPending:
					// Peer probably timed out with its delivery but came through
					// in the end, demote, but allow it to pull from this peer.
					peer.Demote()
					setIdle(peer)
					glog.V(logger.Detail).Infof("%s: out of bound %s delivery", peer, strings.ToLower(kind))

				case errStaleDelivery:
					// Delivered something completely else than requested, usually
					// caused by a timeout and delivery during a new sync cycle.
					// Don't set it to idle as the original request should still be
					// in flight.
					peer.Demote()
					glog.V(logger.Detail).Infof("%s: %s stale delivery", peer, strings.ToLower(kind))

				default:
					// Peer did something semi-useful, demote but keep it around
					peer.Demote()
					setIdle(peer)
					glog.V(logger.Detail).Infof("%s: %s delivery partially failed: %v", peer, strings.ToLower(kind), err)
					go d.process()
				}
			}
			// Blocks assembled, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case cont := <-wakeCh:
			// The header fetcher sent a continuation flag, check if it's done
			if !cont {
				finished = true
			}
			// Headers arrive, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-ticker.C:
			// Sanity check update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-update:
			// Short circuit if we lost all our peers
			if d.peers.Len() == 0 {
				return errNoPeers
			}
			// Check for fetch request timeouts and demote the responsible peers
			for _, pid := range expire() {
				if peer := d.peers.Peer(pid); peer != nil {
					peer.Demote()
					setIdle(peer)
					glog.V(logger.Detail).Infof("%s: %s delivery timeout", peer, strings.ToLower(kind))
				}
			}
			// If there's nothing more to fetch, wait or terminate
			if pending() == 0 {
				if d.queue.InFlight() == 0 && finished {
					glog.V(logger.Debug).Infof("%s fetching completed", kind)
					return nil
				}
				break
			}
			// Send a download request to all idle peers, until throttled
			progressed, throttled := false, false
			idles, total := idle()

			for _, peer := range idles {
				// Short circuit if throttling activated
				if throttle() {
					throttled = true
					break
				}
				// Reserve a chunk of fetches for a peer. A nil can mean either that
				// no more headers are available, or that the peer is known not to
				// have them.
				request, progress, err := reserve(peer, capacity(peer))
				if err != nil {
					return err
				}
				if progress {
					progressed = true
					go d.process()
				}
				if request == nil {
					continue
				}
				if glog.V(logger.Detail) {
					if len(request.Headers) > 0 {
						glog.Infof("%s: requesting %d %s(s), first at #%d", peer, len(request.Headers), strings.ToLower(kind), request.Headers[0].Number)
					} else {
						glog.Infof("%s: requesting %d %s(s)", peer, len(request.Hashes), strings.ToLower(kind))
					}
				}
				// Fetch the chunk and make sure any errors return the hashes to the queue
				if fetchHook != nil {
					fetchHook(request.Headers)
				}
				if err := fetch(peer, request); err != nil {
					glog.V(logger.Error).Infof("%v: %s fetch failed, rescheduling", peer, strings.ToLower(kind))
					cancel(request)
				}
			}
			// Make sure that we have peers available for fetching. If all peers have been tried
			// and all failed, throw an error
			if !progressed && !throttled && d.queue.InFlight() == 0 && len(idles) == total {
				return errPeersUnavailable
			}
		}
	}
}
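
// fetchParts is deliberately generic: every concrete fetcher (block bodies,
// receipts, node state) parametrises it with closures over the download queue
// and the peer set. A rough sketch of such a wiring is shown below; it is
// illustrative only, and the helper names on d.queue and on the peer object
// (ExpireBodies, FetchBodies, SetBlocksIdle) are assumptions about the
// surrounding code rather than a verbatim call site.
//
//	expire  := func() []string { return d.queue.ExpireBodies(bodyHardTTL) }
//	fetch   := func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) }
//	setIdle := func(p *peer) { p.SetBlocksIdle() }
//	// ...the remaining hooks (deliver, pending, throttle, reserve, capacity,
//	// idle, cancel) are built the same way before calling d.fetchParts.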

// process takes fetch results from the queue and tries to import them into the
// chain. The type of import operation will depend on the result contents:
//  - headers only are imported during a light sync (d.insertHeaders)
//  - fully assembled blocks are imported during a full sync (d.insertBlocks)
//  - blocks and their receipts are imported up to the pivot point during a fast
//    sync (d.insertReceipts), and as full blocks beyond it
//
// The algorithmic flow is as follows:
//  - The `processing` flag is swapped to 1 to ensure singleton access
//  - The current `cancel` channel is retrieved to detect sync abortions
//  - Blocks are iteratively taken from the cache and inserted into the chain
//  - When the cache becomes empty, insertion stops
//  - The `processing` flag is swapped back to 0
//  - A post-exit check is made whether new blocks became available
//     - This step is important: it handles a potential race condition between
//       checking for no more work, and releasing the processing "mutex". In
//       between these state changes, a block may have arrived, but a processing
//       attempt denied, so we need to re-enter to ensure the block isn't left
//       to idle in the cache.
func (d *Downloader) process() {
	// Make sure only one goroutine is ever allowed to process blocks at once
	if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
		return
	}
	// If the processor just exited, but there are freshly pending items, try to
	// reenter. This is needed because the goroutine spun up for processing
	// the fresh results might have been rejected entry due to this present thread
	// not yet releasing the `processing` state.
	defer func() {
		if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadResult() != nil {
			d.process()
		}
	}()
	// Release the lock upon exit (note, this deliberately runs before the reentry
	// check above), so the next processing cycle can start from a clean slate.
	defer atomic.StoreInt32(&d.processing, 0)

	// Repeat the processing as long as there are results to process
	for {
		// Fetch the next batch of results
		results := d.queue.TakeResults()
		if len(results) == 0 {
			return
		}
		if d.chainInsertHook != nil {
			d.chainInsertHook(results)
		}
		// Actually import the blocks
		if glog.V(logger.Debug) {
			first, last := results[0].Header, results[len(results)-1].Header
			glog.Infof("Inserting chain with %d items (#%d [%x…] - #%d [%x…])", len(results), first.Number, first.Hash().Bytes()[:4], last.Number, last.Hash().Bytes()[:4])
		}
		for len(results) != 0 {
			// Check for any termination requests
			if atomic.LoadInt32(&d.interrupt) == 1 {
				return
			}
			// Retrieve a batch of results to import
			var (
				headers  = make([]*types.Header, 0, maxResultsProcess)
				blocks   = make([]*types.Block, 0, maxResultsProcess)
				receipts = make([]types.Receipts, 0, maxResultsProcess)
			)
			items := int(math.Min(float64(len(results)), float64(maxResultsProcess)))
			for _, result := range results[:items] {
				switch {
				case d.mode == FullSync:
					blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
				case d.mode == FastSync:
					blocks = append(blocks, types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles))
					if result.Header.Number.Uint64() <= d.queue.fastSyncPivot {
						receipts = append(receipts, result.Receipts)
					}
				case d.mode == LightSync:
					headers = append(headers, result.Header)
				}
			}
			// Try to process the results, aborting if there's an error
			var (
				err   error
				index int
			)
			switch {
			case len(headers) > 0:
				index, err = d.insertHeaders(headers, true)

			case len(receipts) > 0:
				index, err = d.insertReceipts(blocks, receipts)
				if err == nil && blocks[len(blocks)-1].NumberU64() == d.queue.fastSyncPivot {
					err = d.commitHeadBlock(blocks[len(blocks)-1].Hash())
				}
			default:
				index, err = d.insertBlocks(blocks)
			}
			if err != nil {
				glog.V(logger.Debug).Infof("Result #%d [%x…] processing failed: %v", results[index].Header.Number, results[index].Header.Hash().Bytes()[:4], err)
				d.cancel()
				return
			}
			// Shift the results to the next batch
			results = results[items:]
		}
	}
}
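
// The guard-and-reenter pattern used by process can be summarised in isolation
// as follows (an illustrative sketch with a stand-alone flag, not the
// Downloader's actual fields):
//
//	var processing int32
//
//	func run(moreWork func() bool, work func()) {
//		if !atomic.CompareAndSwapInt32(&processing, 0, 1) {
//			return // another goroutine already holds the processing slot
//		}
//		// Registered first, so it runs *after* the flag below is released,
//		// catching work that arrived between the last check and the release.
//		defer func() {
//			if moreWork() {
//				run(moreWork, work)
//			}
//		}()
//		defer atomic.StoreInt32(&processing, 0)
//		work()
//	}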

// DeliverHashes injects a new batch of hashes received from a remote node into
// the download schedule. This is usually invoked through the BlockHashesMsg by
// the protocol handler.
func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) (err error) {
	return d.deliver(id, d.hashCh, &hashPack{id, hashes}, hashInMeter, hashDropMeter)
}

// DeliverBlocks injects a new batch of blocks received from a remote node.
// This is usually invoked through the BlocksMsg by the protocol handler.
func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) (err error) {
	return d.deliver(id, d.blockCh, &blockPack{id, blocks}, blockInMeter, blockDropMeter)
}

// DeliverHeaders injects a new batch of block headers received from a remote
// node into the download schedule.
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {
	return d.deliver(id, d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
}
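
// A protocol handler typically decodes the wire message and hands the payload
// straight to the matching Deliver method, for example (illustrative only; the
// msg.Decode call and the p.id peer identifier are assumptions about the
// caller, not part of this package):
//
//	var headers []*types.Header
//	if err := msg.Decode(&headers); err != nil {
//		return err
//	}
//	return d.DeliverHeaders(p.id, headers)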

// DeliverBodies injects a new batch of block bodies received from a remote node.
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) (err error) {
	return d.deliver(id, d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
}

// DeliverReceipts injects a new batch of receipts received from a remote node.
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) (err error) {
	return d.deliver(id, d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
}

// DeliverNodeData injects a new batch of node state data received from a remote node.
func (d *Downloader) DeliverNodeData(id string, data [][]byte) (err error) {
	return d.deliver(id, d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter)
}

// deliver injects a new batch of data received from a remote node.
func (d *Downloader) deliver(id string, destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
	// Update the delivery metrics for both good and failed deliveries
	inMeter.Mark(int64(packet.Items()))
	defer func() {
		if err != nil {
			dropMeter.Mark(int64(packet.Items()))
		}
	}()
	// Make sure the downloader is active
	if atomic.LoadInt32(&d.synchronising) == 0 {
		return errNoSyncActive
	}
	// Deliver or abort if the sync is canceled while queuing
	d.cancelLock.RLock()
	cancel := d.cancelCh
	d.cancelLock.RUnlock()

	select {
	case destCh <- packet:
		return nil

	case <-cancel:
		return errNoSyncActive
	}
}
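
// Design note on deliver: the cancel channel is snapshotted under the read lock
// and then raced against destCh, so a packet that arrives after cancel() has
// fired is rejected with errNoSyncActive instead of blocking forever on a
// channel that no fetcher reads any more. Callers can usually treat
// errNoSyncActive as benign, since it only means the response outlived the sync
// cycle that requested it.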