downloader.go 43.3 KB
Newer Older
F
Felix Lange 已提交
1
// Copyright 2015 The go-ethereum Authors
2
// This file is part of the go-ethereum library.
F
Felix Lange 已提交
3
//
4
// The go-ethereum library is free software: you can redistribute it and/or modify
F
Felix Lange 已提交
5 6 7 8
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
9
// The go-ethereum library is distributed in the hope that it will be useful,
F
Felix Lange 已提交
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
F
Felix Lange 已提交
12 13 14
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
15
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
F
Felix Lange 已提交
16

17
// Package downloader contains the manual full chain synchronisation.
18 19 20
package downloader

import (
21
	"errors"
22
	"math"
23
	"math/big"
24 25 26 27 28 29
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
30
	"github.com/ethereum/go-ethereum/event"
31 32 33 34
	"github.com/ethereum/go-ethereum/logger"
	"github.com/ethereum/go-ethereum/logger/glog"
)

35
const (
36 37
	eth61 = 61 // Constant to check for old protocol support
	eth62 = 62 // Constant to check for new protocol support
38 39
)

40
var (
41 42
	MaxHashFetch     = 512 // Amount of hashes to be fetched per retrieval request
	MaxBlockFetch    = 128 // Amount of blocks to be fetched per retrieval request
43 44
	MaxHeaderFetch   = 192 // Amount of block headers to be fetched per retrieval request
	MaxBodyFetch     = 128 // Amount of block bodies to be fetched per retrieval request
45 46
	MaxStateFetch    = 384 // Amount of node state values to allow fetching per request
	MaxReceiptsFetch = 384 // Amount of transaction receipts to allow fetching per request
47

48 49 50 51 52 53
	hashTTL      = 5 * time.Second  // [eth/61] Time it takes for a hash request to time out
	blockSoftTTL = 3 * time.Second  // [eth/61] Request completion threshold for increasing or decreasing a peer's bandwidth
	blockHardTTL = 3 * blockSoftTTL // [eth/61] Maximum time allowance before a block request is considered expired
	headerTTL    = 5 * time.Second  // [eth/62] Time it takes for a header request to time out
	bodySoftTTL  = 3 * time.Second  // [eth/62] Request completion threshold for increasing or decreasing a peer's bandwidth
	bodyHardTTL  = 3 * bodySoftTTL  // [eth/62] Maximum time allowance before a block body request is considered expired
54

55 56 57
	maxQueuedHashes  = 256 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
	maxQueuedHeaders = 256 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
	maxBlockProcess  = 256        // Number of blocks to import at once into the chain
58
)
59

60
var (
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
	errBusy              = errors.New("busy")
	errUnknownPeer       = errors.New("peer is unknown or unhealthy")
	errBadPeer           = errors.New("action from bad peer ignored")
	errStallingPeer      = errors.New("peer is stalling")
	errNoPeers           = errors.New("no peers to keep download active")
	errPendingQueue      = errors.New("pending items in queue")
	errTimeout           = errors.New("timeout")
	errEmptyHashSet      = errors.New("empty hash set by peer")
	errEmptyHeaderSet    = errors.New("empty header set by peer")
	errPeersUnavailable  = errors.New("no peers available or all peers tried for block download process")
	errAlreadyInPool     = errors.New("hash already in pool")
	errInvalidChain      = errors.New("retrieved hash chain is invalid")
	errInvalidBody       = errors.New("retrieved block body is invalid")
	errCancelHashFetch   = errors.New("hash fetching canceled (requested)")
	errCancelBlockFetch  = errors.New("block downloading canceled (requested)")
	errCancelHeaderFetch = errors.New("block header fetching canceled (requested)")
	errCancelBodyFetch   = errors.New("block body downloading canceled (requested)")
	errNoSyncActive      = errors.New("no sync active")
79 80
)

81
// hashCheckFn is a callback type for verifying a hash's presence in the local chain.
82
type hashCheckFn func(common.Hash) bool
83 84 85 86

// blockRetrievalFn is a callback type for retrieving a block from the local chain.
type blockRetrievalFn func(common.Hash) *types.Block

87 88 89
// headRetrievalFn is a callback type for retrieving the head block from the local chain.
type headRetrievalFn func() *types.Block

90 91 92
// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
type chainInsertFn func(types.Blocks) (int, error)

93 94
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)
95

96 97 98 99 100 101 102
// hashPack is a batch of block hashes returned by a peer (eth/61).
type hashPack struct {
	peerId string
	hashes []common.Hash
}

// blockPack is a batch of blocks returned by a peer (eth/61).
O
obscuren 已提交
103 104 105 106 107
type blockPack struct {
	peerId string
	blocks []*types.Block
}

108 109 110 111
// headerPack is a batch of block headers returned by a peer.
type headerPack struct {
	peerId  string
	headers []*types.Header
O
obscuren 已提交
112 113
}

114 115 116 117 118
// bodyPack is a batch of block bodies returned by a peer.
type bodyPack struct {
	peerId       string
	transactions [][]*types.Transaction
	uncles       [][]*types.Header
119 120
}

121
type Downloader struct {
122 123
	mux *event.TypeMux

124 125
	queue *queue   // Scheduler for selecting the hashes to download
	peers *peerSet // Set of active peers from which download can proceed
126

127 128
	interrupt int32 // Atomic boolean to signal termination

129
	// Statistics
130 131 132
	importStart time.Time // Instance when the last blocks were taken from the cache
	importQueue []*Block  // Previously taken blocks to check import progress
	importDone  int       // Number of taken blocks already imported from the last batch
133 134
	importLock  sync.Mutex

135
	// Callbacks
136 137
	hasBlock    hashCheckFn      // Checks if a block is present in the chain
	getBlock    blockRetrievalFn // Retrieves a block from the chain
138
	headBlock   headRetrievalFn  // Retrieves the head block from the chain
139
	insertChain chainInsertFn    // Injects a batch of blocks into the chain
140
	dropPeer    peerDropFn       // Drops a peer for misbehaving
141

142
	// Status
143 144
	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
	synchronising   int32
145
	processing      int32
146
	notified        int32
147 148 149

	// Channels
	newPeerCh chan *peer
150 151 152 153 154
	hashCh    chan hashPack   // [eth/61] Channel receiving inbound hashes
	blockCh   chan blockPack  // [eth/61] Channel receiving inbound blocks
	headerCh  chan headerPack // [eth/62] Channel receiving inbound block headers
	bodyCh    chan bodyPack   // [eth/62] Channel receiving inbound block bodies
	processCh chan bool       // Channel to signal the block fetcher of new or finished work
155 156 157

	cancelCh   chan struct{} // Channel to cancel mid-flight syncs
	cancelLock sync.RWMutex  // Lock to protect the cancel channel in delivers
158 159 160 161

	// Testing hooks
	bodyFetchHook   func([]*types.Header) // Method to call upon starting a block body fetch
	chainInsertHook func([]*Block)        // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
162 163
}

164 165 166 167 168 169
// Block is an origin-tagged blockchain block.
type Block struct {
	RawBlock   *types.Block
	OriginPeer string
}

170
// New creates a new downloader to fetch hashes and blocks from remote peers.
171
func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, headBlock headRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader {
172
	return &Downloader{
173 174 175 176 177
		mux:         mux,
		queue:       newQueue(),
		peers:       newPeerSet(),
		hasBlock:    hasBlock,
		getBlock:    getBlock,
178
		headBlock:   headBlock,
179 180 181 182 183
		insertChain: insertChain,
		dropPeer:    dropPeer,
		newPeerCh:   make(chan *peer, 1),
		hashCh:      make(chan hashPack, 1),
		blockCh:     make(chan blockPack, 1),
184 185
		headerCh:    make(chan headerPack, 1),
		bodyCh:      make(chan bodyPack, 1),
186
		processCh:   make(chan bool, 1),
187 188 189
	}
}

190
// Stats retrieves the current status of the downloader.
191
func (d *Downloader) Stats() (pending int, cached int, importing int, estimate time.Duration) {
192 193 194
	// Fetch the download status
	pending, cached = d.queue.Size()

195
	// Figure out the import progress
196 197 198
	d.importLock.Lock()
	defer d.importLock.Unlock()

199
	for len(d.importQueue) > 0 && d.hasBlock(d.importQueue[0].RawBlock.Hash()) {
200
		d.importQueue = d.importQueue[1:]
201
		d.importDone++
202 203 204
	}
	importing = len(d.importQueue)

205 206 207 208 209
	// Make an estimate on the total sync
	estimate = 0
	if d.importDone > 0 {
		estimate = time.Since(d.importStart) / time.Duration(d.importDone) * time.Duration(pending+cached+importing)
	}
210
	return
O
obscuren 已提交
211 212
}

213
// Synchronising returns whether the downloader is currently retrieving blocks.
214 215 216 217
func (d *Downloader) Synchronising() bool {
	return atomic.LoadInt32(&d.synchronising) > 0
}

218 219
// RegisterPeer injects a new download peer into the set of block source to be
// used for fetching hashes and blocks from.
220 221 222 223
func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
	getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
	getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn) error {

224
	glog.V(logger.Detail).Infoln("Registering peer", id)
225
	if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies)); err != nil {
226 227 228
		glog.V(logger.Error).Infoln("Register failed:", err)
		return err
	}
229 230 231
	return nil
}

232 233 234 235 236 237 238 239 240
// UnregisterPeer remove a peer from the known list, preventing any action from
// the specified peer.
func (d *Downloader) UnregisterPeer(id string) error {
	glog.V(logger.Detail).Infoln("Unregistering peer", id)
	if err := d.peers.Unregister(id); err != nil {
		glog.V(logger.Error).Infoln("Unregister failed:", err)
		return err
	}
	return nil
241 242
}

243 244
// Synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
245
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int) {
246
	glog.V(logger.Detail).Infof("Attempting synchronisation: %v, head [%x…], TD %v", id, head[:4], td)
247

248
	switch err := d.synchronise(id, head, td); err {
249 250 251 252 253 254
	case nil:
		glog.V(logger.Detail).Infof("Synchronisation completed")

	case errBusy:
		glog.V(logger.Detail).Infof("Synchronisation already in progress")

255
	case errTimeout, errBadPeer, errStallingPeer, errEmptyHashSet, errEmptyHeaderSet, errPeersUnavailable, errInvalidChain:
256 257 258 259 260 261 262 263 264 265 266 267
		glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
		d.dropPeer(id)

	case errPendingQueue:
		glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)

	default:
		glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
	}
}

// synchronise will select the peer and use it for synchronising. If an empty string is given
268
// it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
269
// checks fail an error will be returned. This method is synchronous
270
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int) error {
271 272 273 274
	// Mock out the synchonisation if testing
	if d.synchroniseMock != nil {
		return d.synchroniseMock(id, hash)
	}
275
	// Make sure only one goroutine is ever allowed past this point at once
276
	if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
277
		return errBusy
278
	}
279
	defer atomic.StoreInt32(&d.synchronising, 0)
280

281 282 283 284
	// Post a user notification of the sync (only once per session)
	if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
		glog.V(logger.Info).Infoln("Block synchronisation started")
	}
285
	// Abort if the queue still contains some leftover data
286
	if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
287
		return errPendingQueue
288
	}
289
	// Reset the queue and peer set to clean any internal leftover state
290
	d.queue.Reset()
291
	d.peers.Reset()
292

293 294 295 296 297
	// Create cancel channel for aborting mid-flight
	d.cancelLock.Lock()
	d.cancelCh = make(chan struct{})
	d.cancelLock.Unlock()

298
	// Retrieve the origin peer and initiate the downloading process
299
	p := d.peers.Peer(id)
300
	if p == nil {
301
		return errUnknownPeer
302
	}
303
	return d.syncWithPeer(p, hash, td)
304 305
}

306 307
// Has checks if the downloader knows about a particular hash, meaning that its
// either already downloaded of pending retrieval.
308
func (d *Downloader) Has(hash common.Hash) bool {
309
	return d.queue.Has(hash)
310 311
}

312 313
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
314
func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) {
O
obscuren 已提交
315
	d.mux.Post(StartEvent{})
316 317 318
	defer func() {
		// reset on error
		if err != nil {
319
			d.cancel()
320 321 322
			d.mux.Post(FailedEvent{err})
		} else {
			d.mux.Post(DoneEvent{})
323 324
		}
	}()
325

326 327 328 329 330
	glog.V(logger.Debug).Infof("Synchronising with the network using: %s [eth/%d]", p.id, p.version)
	defer glog.V(logger.Debug).Infof("Synchronisation terminated")

	switch {
	case p.version == eth61:
331
		// Old eth/61, use forward, concurrent hash and block retrieval algorithm
332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349
		number, err := d.findAncestor61(p)
		if err != nil {
			return err
		}
		errc := make(chan error, 2)
		go func() { errc <- d.fetchHashes61(p, td, number+1) }()
		go func() { errc <- d.fetchBlocks61(number + 1) }()

		// If any fetcher fails, cancel the other
		if err := <-errc; err != nil {
			d.cancel()
			<-errc
			return err
		}
		return <-errc

	case p.version >= eth62:
		// New eth/62, use forward, concurrent header and block body retrieval algorithm
350 351 352 353 354
		number, err := d.findAncestor(p)
		if err != nil {
			return err
		}
		errc := make(chan error, 2)
355 356
		go func() { errc <- d.fetchHeaders(p, td, number+1) }()
		go func() { errc <- d.fetchBodies(number + 1) }()
357 358 359 360 361 362 363 364 365 366 367 368 369

		// If any fetcher fails, cancel the other
		if err := <-errc; err != nil {
			d.cancel()
			<-errc
			return err
		}
		return <-errc

	default:
		// Something very wrong, stop right here
		glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version)
		return errBadPeer
370
	}
371
	return nil
372 373
}

374
// cancel cancels all of the operations and resets the queue. It returns true
375
// if the cancel operation was completed.
376
func (d *Downloader) cancel() {
377
	// Close the current cancel channel
378
	d.cancelLock.Lock()
379 380 381 382 383 384 385
	if d.cancelCh != nil {
		select {
		case <-d.cancelCh:
			// Channel was already closed
		default:
			close(d.cancelCh)
		}
386 387
	}
	d.cancelLock.Unlock()
388

389
	// Reset the queue
390 391 392
	d.queue.Reset()
}

393 394 395 396 397 398
// Terminate interrupts the downloader, canceling all pending operations.
func (d *Downloader) Terminate() {
	atomic.StoreInt32(&d.interrupt, 1)
	d.cancel()
}

399
// findAncestor61 tries to locate the common ancestor block of the local chain and
400 401
// a remote peers blockchain. In the general case when our node was in sync and
// on the correct chain, checking the top N blocks should already get us a match.
402 403 404
// In the rare scenario when we ended up on a long reorganization (i.e. none of
// the head blocks match), we do a binary search to find the common ancestor.
func (d *Downloader) findAncestor61(p *peer) (uint64, error) {
405 406 407 408
	glog.V(logger.Debug).Infof("%v: looking for common ancestor", p)

	// Request out head blocks to short circuit ancestor location
	head := d.headBlock().NumberU64()
409
	from := int64(head) - int64(MaxHashFetch) + 1
410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447
	if from < 0 {
		from = 0
	}
	go p.getAbsHashes(uint64(from), MaxHashFetch)

	// Wait for the remote response to the head fetch
	number, hash := uint64(0), common.Hash{}
	timeout := time.After(hashTTL)

	for finished := false; !finished; {
		select {
		case <-d.cancelCh:
			return 0, errCancelHashFetch

		case hashPack := <-d.hashCh:
			// Discard anything not from the origin peer
			if hashPack.peerId != p.id {
				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
				break
			}
			// Make sure the peer actually gave something valid
			hashes := hashPack.hashes
			if len(hashes) == 0 {
				glog.V(logger.Debug).Infof("%v: empty head hash set", p)
				return 0, errEmptyHashSet
			}
			// Check if a common ancestor was found
			finished = true
			for i := len(hashes) - 1; i >= 0; i-- {
				if d.hasBlock(hashes[i]) {
					number, hash = uint64(from)+uint64(i), hashes[i]
					break
				}
			}

		case <-d.blockCh:
			// Out of bounds blocks received, ignore them

448 449 450 451 452 453
		case <-d.headerCh:
			// Out of bounds eth/62 block headers received, ignore them

		case <-d.bodyCh:
			// Out of bounds eth/62 block bodies received, ignore them

454 455 456 457 458 459 460
		case <-timeout:
			glog.V(logger.Debug).Infof("%v: head hash timeout", p)
			return 0, errTimeout
		}
	}
	// If the head fetch already found an ancestor, return
	if !common.EmptyHash(hash) {
461
		glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, number, hash[:4])
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499
		return number, nil
	}
	// Ancestor not found, we need to binary search over our chain
	start, end := uint64(0), head
	for start+1 < end {
		// Split our chain interval in two, and request the hash to cross check
		check := (start + end) / 2

		timeout := time.After(hashTTL)
		go p.getAbsHashes(uint64(check), 1)

		// Wait until a reply arrives to this request
		for arrived := false; !arrived; {
			select {
			case <-d.cancelCh:
				return 0, errCancelHashFetch

			case hashPack := <-d.hashCh:
				// Discard anything not from the origin peer
				if hashPack.peerId != p.id {
					glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
					break
				}
				// Make sure the peer actually gave something valid
				hashes := hashPack.hashes
				if len(hashes) != 1 {
					glog.V(logger.Debug).Infof("%v: invalid search hash set (%d)", p, len(hashes))
					return 0, errBadPeer
				}
				arrived = true

				// Modify the search interval based on the response
				block := d.getBlock(hashes[0])
				if block == nil {
					end = check
					break
				}
				if block.NumberU64() != check {
500
					glog.V(logger.Debug).Infof("%v: non requested hash #%d [%x…], instead of #%d", p, block.NumberU64(), block.Hash().Bytes()[:4], check)
501 502 503 504 505 506 507
					return 0, errBadPeer
				}
				start = check

			case <-d.blockCh:
				// Out of bounds blocks received, ignore them

508 509 510 511 512 513
			case <-d.headerCh:
				// Out of bounds eth/62 block headers received, ignore them

			case <-d.bodyCh:
				// Out of bounds eth/62 block bodies received, ignore them

514 515 516 517 518 519 520 521 522
			case <-timeout:
				glog.V(logger.Debug).Infof("%v: search hash timeout", p)
				return 0, errTimeout
			}
		}
	}
	return start, nil
}

523
// fetchHashes61 keeps retrieving hashes from the requested number, until no more
524
// are returned, potentially throttling on the way.
525
func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
526 527 528 529 530 531 532 533
	glog.V(logger.Debug).Infof("%v: downloading hashes from #%d", p, from)

	// Create a timeout timer, and the associated hash fetcher
	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
	<-timeout.C                 // timeout channel should be initially empty
	defer timeout.Stop()

	getHashes := func(from uint64) {
534 535
		glog.V(logger.Detail).Infof("%v: fetching %d hashes from #%d", p, MaxHashFetch, from)

536 537 538 539 540
		go p.getAbsHashes(from, MaxHashFetch)
		timeout.Reset(hashTTL)
	}
	// Start pulling hashes, until all are exhausted
	getHashes(from)
541 542
	gotHashes := false

543 544 545 546 547
	for {
		select {
		case <-d.cancelCh:
			return errCancelHashFetch

548 549 550 551 552 553
		case <-d.headerCh:
			// Out of bounds eth/62 block headers received, ignore them

		case <-d.bodyCh:
			// Out of bounds eth/62 block bodies received, ignore them

554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569
		case hashPack := <-d.hashCh:
			// Make sure the active peer is giving us the hashes
			if hashPack.peerId != p.id {
				glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId)
				break
			}
			timeout.Stop()

			// If no more hashes are inbound, notify the block fetcher and return
			if len(hashPack.hashes) == 0 {
				glog.V(logger.Debug).Infof("%v: no available hashes", p)

				select {
				case d.processCh <- false:
				case <-d.cancelCh:
				}
570 571 572 573 574 575 576 577 578 579 580 581 582
				// If no hashes were retrieved at all, the peer violated it's TD promise that it had a
				// better chain compared to ours. The only exception is if it's promised blocks were
				// already imported by other means (e.g. fecher):
				//
				// R <remote peer>, L <local node>: Both at block 10
				// R: Mine block 11, and propagate it to L
				// L: Queue block 11 for import
				// L: Notice that R's head and TD increased compared to ours, start sync
				// L: Import of block 11 finishes
				// L: Sync begins, and finds common ancestor at 11
				// L: Request new hashes up from 11 (R's TD was higher, it must have something)
				// R: Nothing to give
				if !gotHashes && td.Cmp(d.headBlock().Td) > 0 {
583 584
					return errStallingPeer
				}
585 586
				return nil
			}
587 588
			gotHashes = true

589
			// Otherwise insert all the new hashes, aborting in case of junk
590 591
			glog.V(logger.Detail).Infof("%v: inserting %d hashes from #%d", p, len(hashPack.hashes), from)

592
			inserts := d.queue.Insert61(hashPack.hashes, true)
593 594 595 596
			if len(inserts) != len(hashPack.hashes) {
				glog.V(logger.Debug).Infof("%v: stale hashes", p)
				return errBadPeer
			}
597 598
			// Notify the block fetcher of new hashes, but stop if queue is full
			cont := d.queue.Pending() < maxQueuedHashes
599
			select {
600
			case d.processCh <- cont:
601 602
			default:
			}
603 604 605 606
			if !cont {
				return nil
			}
			// Queue not yet full, fetch the next batch
607 608 609 610 611 612 613 614 615 616
			from += uint64(len(hashPack.hashes))
			getHashes(from)

		case <-timeout.C:
			glog.V(logger.Debug).Infof("%v: hash request timed out", p)
			return errTimeout
		}
	}
}

617
// fetchBlocks61 iteratively downloads the scheduled hashes, taking any available
618 619
// peers, reserving a chunk of blocks for each, waiting for delivery and also
// periodically checking for timeouts.
620
func (d *Downloader) fetchBlocks61(from uint64) error {
621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638
	glog.V(logger.Debug).Infof("Downloading blocks from #%d", from)
	defer glog.V(logger.Debug).Infof("Block download terminated")

	// Create a timeout timer for scheduling expiration tasks
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	update := make(chan struct{}, 1)

	// Prepare the queue and fetch blocks until the hash fetcher's done
	d.queue.Prepare(from)
	finished := false

	for {
		select {
		case <-d.cancelCh:
			return errCancelBlockFetch

639 640 641 642 643 644
		case <-d.headerCh:
			// Out of bounds eth/62 block headers received, ignore them

		case <-d.bodyCh:
			// Out of bounds eth/62 block bodies received, ignore them

645 646 647 648 649
		case blockPack := <-d.blockCh:
			// If the peer was previously banned and failed to deliver it's pack
			// in a reasonable time frame, ignore it's message.
			if peer := d.peers.Peer(blockPack.peerId); peer != nil {
				// Deliver the received chunk of blocks, and demote in case of errors
650
				err := d.queue.Deliver61(blockPack.peerId, blockPack.blocks)
651 652 653 654 655
				switch err {
				case nil:
					// If no blocks were delivered, demote the peer (need the delivery above)
					if len(blockPack.blocks) == 0 {
						peer.Demote()
656
						peer.SetIdle61()
657 658 659 660 661
						glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
						break
					}
					// All was successful, promote the peer and potentially start processing
					peer.Promote()
662
					peer.SetIdle61()
663 664 665 666 667 668 669 670 671 672 673
					glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
					go d.process()

				case errInvalidChain:
					// The hash chain is invalid (blocks are not ordered properly), abort
					return err

				case errNoFetchesPending:
					// Peer probably timed out with its delivery but came through
					// in the end, demote, but allow to to pull from this peer.
					peer.Demote()
674
					peer.SetIdle61()
675 676 677 678 679 680 681 682 683 684 685 686 687
					glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)

				case errStaleDelivery:
					// Delivered something completely else than requested, usually
					// caused by a timeout and delivery during a new sync cycle.
					// Don't set it to idle as the original request should still be
					// in flight.
					peer.Demote()
					glog.V(logger.Detail).Infof("%s: stale delivery", peer)

				default:
					// Peer did something semi-useful, demote but keep it around
					peer.Demote()
688
					peer.SetIdle61()
689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745
					glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
					go d.process()
				}
			}
			// Blocks arrived, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case cont := <-d.processCh:
			// The hash fetcher sent a continuation flag, check if it's done
			if !cont {
				finished = true
			}
			// Hashes arrive, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-ticker.C:
			// Sanity check update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-update:
			// Short circuit if we lost all our peers
			if d.peers.Len() == 0 {
				return errNoPeers
			}
			// Check for block request timeouts and demote the responsible peers
			for _, pid := range d.queue.Expire(blockHardTTL) {
				if peer := d.peers.Peer(pid); peer != nil {
					peer.Demote()
					glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
				}
			}
			// If there's noting more to fetch, wait or terminate
			if d.queue.Pending() == 0 {
				if d.queue.InFlight() == 0 && finished {
					glog.V(logger.Debug).Infof("Block fetching completed")
					return nil
				}
				break
			}
			// Send a download request to all idle peers, until throttled
			for _, peer := range d.peers.IdlePeers() {
				// Short circuit if throttling activated
				if d.queue.Throttle() {
					break
				}
				// Reserve a chunk of hashes for a peer. A nil can mean either that
				// no more hashes are available, or that the peer is known not to
				// have them.
746
				request := d.queue.Reserve61(peer, peer.Capacity())
747 748 749 750 751 752 753
				if request == nil {
					continue
				}
				if glog.V(logger.Detail) {
					glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes))
				}
				// Fetch the chunk and make sure any errors return the hashes to the queue
754
				if err := peer.Fetch61(request); err != nil {
755 756 757 758 759 760 761 762 763 764 765 766 767
					glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer)
					d.queue.Cancel(request)
				}
			}
			// Make sure that we have peers available for fetching. If all peers have been tried
			// and all failed throw an error
			if !d.queue.Throttle() && d.queue.InFlight() == 0 {
				return errPeersUnavailable
			}
		}
	}
}

768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162
// findAncestor tries to locate the common ancestor block of the local chain and
// a remote peers blockchain. In the general case when our node was in sync and
// on the correct chain, checking the top N blocks should already get us a match.
// In the rare scenario when we ended up on a long reorganization (i.e. none of
// the head blocks match), we do a binary search to find the common ancestor.
func (d *Downloader) findAncestor(p *peer) (uint64, error) {
	glog.V(logger.Debug).Infof("%v: looking for common ancestor", p)

	// Request our head blocks to short circuit ancestor location
	head := d.headBlock().NumberU64()
	from := int64(head) - int64(MaxHeaderFetch) + 1
	if from < 0 {
		from = 0
	}
	go p.getAbsHeaders(uint64(from), MaxHeaderFetch, 0, false)

	// Wait for the remote response to the head fetch
	number, hash := uint64(0), common.Hash{}
	timeout := time.After(hashTTL)

	for finished := false; !finished; {
		select {
		case <-d.cancelCh:
			return 0, errCancelHashFetch

		case headerPack := <-d.headerCh:
			// Discard anything not from the origin peer
			if headerPack.peerId != p.id {
				glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId)
				break
			}
			// Make sure the peer actually gave something valid
			headers := headerPack.headers
			if len(headers) == 0 {
				glog.V(logger.Debug).Infof("%v: empty head header set", p)
				return 0, errEmptyHeaderSet
			}
			// Check if a common ancestor was found
			finished = true
			for i := len(headers) - 1; i >= 0; i-- {
				if d.hasBlock(headers[i].Hash()) {
					number, hash = headers[i].Number.Uint64(), headers[i].Hash()
					break
				}
			}

		case <-d.bodyCh:
			// Out of bounds block bodies received, ignore them

		case <-d.hashCh:
			// Out of bounds eth/61 hashes received, ignore them

		case <-d.blockCh:
			// Out of bounds eth/61 blocks received, ignore them

		case <-timeout:
			glog.V(logger.Debug).Infof("%v: head header timeout", p)
			return 0, errTimeout
		}
	}
	// If the head fetch already found an ancestor, return
	if !common.EmptyHash(hash) {
		glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, number, hash[:4])
		return number, nil
	}
	// Ancestor not found, we need to binary search over our chain
	start, end := uint64(0), head
	for start+1 < end {
		// Split our chain interval in two, and request the hash to cross check
		check := (start + end) / 2

		timeout := time.After(hashTTL)
		go p.getAbsHeaders(uint64(check), 1, 0, false)

		// Wait until a reply arrives to this request
		for arrived := false; !arrived; {
			select {
			case <-d.cancelCh:
				return 0, errCancelHashFetch

			case headerPack := <-d.headerCh:
				// Discard anything not from the origin peer
				if headerPack.peerId != p.id {
					glog.V(logger.Debug).Infof("Received headers from incorrect peer(%s)", headerPack.peerId)
					break
				}
				// Make sure the peer actually gave something valid
				headers := headerPack.headers
				if len(headers) != 1 {
					glog.V(logger.Debug).Infof("%v: invalid search header set (%d)", p, len(headers))
					return 0, errBadPeer
				}
				arrived = true

				// Modify the search interval based on the response
				block := d.getBlock(headers[0].Hash())
				if block == nil {
					end = check
					break
				}
				if block.NumberU64() != check {
					glog.V(logger.Debug).Infof("%v: non requested header #%d [%x…], instead of #%d", p, block.NumberU64(), block.Hash().Bytes()[:4], check)
					return 0, errBadPeer
				}
				start = check

			case <-d.bodyCh:
				// Out of bounds block bodies received, ignore them

			case <-d.hashCh:
				// Out of bounds eth/61 hashes received, ignore them

			case <-d.blockCh:
				// Out of bounds eth/61 blocks received, ignore them

			case <-timeout:
				glog.V(logger.Debug).Infof("%v: search header timeout", p)
				return 0, errTimeout
			}
		}
	}
	return start, nil
}

// fetchHeaders keeps retrieving headers from the requested number, until no more
// are returned, potentially throttling on the way.
func (d *Downloader) fetchHeaders(p *peer, td *big.Int, from uint64) error {
	glog.V(logger.Debug).Infof("%v: downloading headers from #%d", p, from)
	defer glog.V(logger.Debug).Infof("%v: header download terminated", p)

	// Create a timeout timer, and the associated hash fetcher
	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
	<-timeout.C                 // timeout channel should be initially empty
	defer timeout.Stop()

	getHeaders := func(from uint64) {
		glog.V(logger.Detail).Infof("%v: fetching %d headers from #%d", p, MaxHeaderFetch, from)

		go p.getAbsHeaders(from, MaxHeaderFetch, 0, false)
		timeout.Reset(headerTTL)
	}
	// Start pulling headers, until all are exhausted
	getHeaders(from)
	gotHeaders := false

	for {
		select {
		case <-d.cancelCh:
			return errCancelHeaderFetch

		case <-d.hashCh:
			// Out of bounds eth/61 hashes received, ignore them

		case <-d.blockCh:
			// Out of bounds eth/61 blocks received, ignore them

		case headerPack := <-d.headerCh:
			// Make sure the active peer is giving us the headers
			if headerPack.peerId != p.id {
				glog.V(logger.Debug).Infof("Received headers from incorrect peer (%s)", headerPack.peerId)
				break
			}
			timeout.Stop()

			// If no more headers are inbound, notify the body fetcher and return
			if len(headerPack.headers) == 0 {
				glog.V(logger.Debug).Infof("%v: no available headers", p)

				select {
				case d.processCh <- false:
				case <-d.cancelCh:
				}
				// If no headers were retrieved at all, the peer violated it's TD promise that it had a
				// better chain compared to ours. The only exception is if it's promised blocks were
				// already imported by other means (e.g. fecher):
				//
				// R <remote peer>, L <local node>: Both at block 10
				// R: Mine block 11, and propagate it to L
				// L: Queue block 11 for import
				// L: Notice that R's head and TD increased compared to ours, start sync
				// L: Import of block 11 finishes
				// L: Sync begins, and finds common ancestor at 11
				// L: Request new headers up from 11 (R's TD was higher, it must have something)
				// R: Nothing to give
				if !gotHeaders && td.Cmp(d.headBlock().Td) > 0 {
					return errStallingPeer
				}
				return nil
			}
			gotHeaders = true

			// Otherwise insert all the new headers, aborting in case of junk
			glog.V(logger.Detail).Infof("%v: inserting %d headers from #%d", p, len(headerPack.headers), from)

			inserts := d.queue.Insert(headerPack.headers)
			if len(inserts) != len(headerPack.headers) {
				glog.V(logger.Debug).Infof("%v: stale headers", p)
				return errBadPeer
			}
			// Notify the block fetcher of new headers, but stop if queue is full
			cont := d.queue.Pending() < maxQueuedHeaders
			select {
			case d.processCh <- cont:
			default:
			}
			if !cont {
				return nil
			}
			// Queue not yet full, fetch the next batch
			from += uint64(len(headerPack.headers))
			getHeaders(from)

		case <-timeout.C:
			// Header retrieval timed out, consider the peer bad and drop
			glog.V(logger.Debug).Infof("%v: header request timed out", p)
			d.dropPeer(p.id)

			// Finish the sync gracefully instead of dumping the gathered data though
			select {
			case d.processCh <- false:
			default:
			}
			return nil
		}
	}
}

// fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchBodies(from uint64) error {
	glog.V(logger.Debug).Infof("Downloading block bodies from #%d", from)
	defer glog.V(logger.Debug).Infof("Block body download terminated")

	// Create a timeout timer for scheduling expiration tasks
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	update := make(chan struct{}, 1)

	// Prepare the queue and fetch block bodies until the block header fetcher's done
	d.queue.Prepare(from)
	finished := false

	for {
		select {
		case <-d.cancelCh:
			return errCancelBlockFetch

		case <-d.hashCh:
			// Out of bounds eth/61 hashes received, ignore them

		case <-d.blockCh:
			// Out of bounds eth/61 blocks received, ignore them

		case bodyPack := <-d.bodyCh:
			// If the peer was previously banned and failed to deliver it's pack
			// in a reasonable time frame, ignore it's message.
			if peer := d.peers.Peer(bodyPack.peerId); peer != nil {
				// Deliver the received chunk of bodies, and demote in case of errors
				err := d.queue.Deliver(bodyPack.peerId, bodyPack.transactions, bodyPack.uncles)
				switch err {
				case nil:
					// If no blocks were delivered, demote the peer (need the delivery above)
					if len(bodyPack.transactions) == 0 || len(bodyPack.uncles) == 0 {
						peer.Demote()
						peer.SetIdle()
						glog.V(logger.Detail).Infof("%s: no block bodies delivered", peer)
						break
					}
					// All was successful, promote the peer and potentially start processing
					peer.Promote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: delivered %d:%d block bodies", peer, len(bodyPack.transactions), len(bodyPack.uncles))
					go d.process()

				case errInvalidChain:
					// The hash chain is invalid (blocks are not ordered properly), abort
					return err

				case errInvalidBody:
					// The peer delivered something very bad, drop immediately
					glog.V(logger.Error).Infof("%s: delivered invalid block, dropping", peer)
					d.dropPeer(peer.id)

				case errNoFetchesPending:
					// Peer probably timed out with its delivery but came through
					// in the end, demote, but allow to to pull from this peer.
					peer.Demote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: out of bound delivery", peer)

				case errStaleDelivery:
					// Delivered something completely else than requested, usually
					// caused by a timeout and delivery during a new sync cycle.
					// Don't set it to idle as the original request should still be
					// in flight.
					peer.Demote()
					glog.V(logger.Detail).Infof("%s: stale delivery", peer)

				default:
					// Peer did something semi-useful, demote but keep it around
					peer.Demote()
					peer.SetIdle()
					glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err)
					go d.process()
				}
			}
			// Blocks assembled, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case cont := <-d.processCh:
			// The header fetcher sent a continuation flag, check if it's done
			if !cont {
				finished = true
			}
			// Headers arrive, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-ticker.C:
			// Sanity check update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-update:
			// Short circuit if we lost all our peers
			if d.peers.Len() == 0 {
				return errNoPeers
			}
			// Check for block body request timeouts and demote the responsible peers
			for _, pid := range d.queue.Expire(bodyHardTTL) {
				if peer := d.peers.Peer(pid); peer != nil {
					peer.Demote()
					glog.V(logger.Detail).Infof("%s: block body delivery timeout", peer)
				}
			}
			// If there's noting more to fetch, wait or terminate
			if d.queue.Pending() == 0 {
				if d.queue.InFlight() == 0 && finished {
					glog.V(logger.Debug).Infof("Block body fetching completed")
					return nil
				}
				break
			}
			// Send a download request to all idle peers, until throttled
			queuedEmptyBlocks, throttled := false, false
			for _, peer := range d.peers.IdlePeers() {
				// Short circuit if throttling activated
				if d.queue.Throttle() {
					throttled = true
					break
				}
				// Reserve a chunk of hashes for a peer. A nil can mean either that
				// no more hashes are available, or that the peer is known not to
				// have them.
				request, process, err := d.queue.Reserve(peer, peer.Capacity())
				if err != nil {
					return err
				}
				if process {
					queuedEmptyBlocks = true
					go d.process()
				}
				if request == nil {
					continue
				}
				if glog.V(logger.Detail) {
					glog.Infof("%s: requesting %d block bodies", peer, len(request.Headers))
				}
				// Fetch the chunk and make sure any errors return the hashes to the queue
				if d.bodyFetchHook != nil {
					d.bodyFetchHook(request.Headers)
				}
				if err := peer.Fetch(request); err != nil {
					glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer)
					d.queue.Cancel(request)
				}
			}
			// Make sure that we have peers available for fetching. If all peers have been tried
			// and all failed throw an error
			if !queuedEmptyBlocks && !throttled && d.queue.InFlight() == 0 {
				return errPeersUnavailable
			}
		}
	}
}

1163
// process takes blocks from the queue and tries to import them into the chain.
1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176
//
// The algorithmic flow is as follows:
//  - The `processing` flag is swapped to 1 to ensure singleton access
//  - The current `cancel` channel is retrieved to detect sync abortions
//  - Blocks are iteratively taken from the cache and inserted into the chain
//  - When the cache becomes empty, insertion stops
//  - The `processing` flag is swapped back to 0
//  - A post-exit check is made whether new blocks became available
//     - This step is important: it handles a potential race condition between
//       checking for no more work, and releasing the processing "mutex". In
//       between these state changes, a block may have arrived, but a processing
//       attempt denied, so we need to re-enter to ensure the block isn't left
//       to idle in the cache.
1177
func (d *Downloader) process() {
1178 1179 1180 1181 1182 1183 1184 1185 1186
	// Make sure only one goroutine is ever allowed to process blocks at once
	if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
		return
	}
	// If the processor just exited, but there are freshly pending items, try to
	// reenter. This is needed because the goroutine spinned up for processing
	// the fresh blocks might have been rejected entry to to this present thread
	// not yet releasing the `processing` state.
	defer func() {
1187 1188
		if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadBlock() != nil {
			d.process()
1189 1190
		}
	}()
1191 1192 1193 1194 1195 1196 1197 1198 1199 1200
	// Release the lock upon exit (note, before checking for reentry!), and set
	// the import statistics to zero.
	defer func() {
		d.importLock.Lock()
		d.importQueue = nil
		d.importDone = 0
		d.importLock.Unlock()

		atomic.StoreInt32(&d.processing, 0)
	}()
1201 1202 1203 1204 1205
	// Repeat the processing as long as there are blocks to import
	for {
		// Fetch the next batch of blocks
		blocks := d.queue.TakeBlocks()
		if len(blocks) == 0 {
1206
			return
1207
		}
1208 1209 1210
		if d.chainInsertHook != nil {
			d.chainInsertHook(blocks)
		}
1211 1212 1213 1214 1215 1216 1217 1218 1219
		// Reset the import statistics
		d.importLock.Lock()
		d.importStart = time.Now()
		d.importQueue = blocks
		d.importDone = 0
		d.importLock.Unlock()

		// Actually import the blocks
		glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
1220
		for len(blocks) != 0 {
1221
			// Check for any termination requests
1222
			if atomic.LoadInt32(&d.interrupt) == 1 {
1223
				return
1224 1225 1226 1227 1228 1229 1230 1231 1232 1233
			}
			// Retrieve the first batch of blocks to insert
			max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess)))
			raw := make(types.Blocks, 0, max)
			for _, block := range blocks[:max] {
				raw = append(raw, block.RawBlock)
			}
			// Try to inset the blocks, drop the originating peer if there's an error
			index, err := d.insertChain(raw)
			if err != nil {
1234
				glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err)
1235
				d.dropPeer(blocks[index].OriginPeer)
1236
				d.cancel()
1237
				return
1238 1239 1240 1241 1242 1243
			}
			blocks = blocks[max:]
		}
	}
}

1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266
// DeliverHashes61 injects a new batch of hashes received from a remote node into
// the download schedule. This is usually invoked through the BlockHashesMsg by
// the protocol handler.
func (d *Downloader) DeliverHashes61(id string, hashes []common.Hash) error {
	// Make sure the downloader is active
	if atomic.LoadInt32(&d.synchronising) == 0 {
		return errNoSyncActive
	}
	// Deliver or abort if the sync is canceled while queuing
	d.cancelLock.RLock()
	cancel := d.cancelCh
	d.cancelLock.RUnlock()

	select {
	case d.hashCh <- hashPack{id, hashes}:
		return nil

	case <-cancel:
		return errNoSyncActive
	}
}

// DeliverBlocks61 injects a new batch of blocks received from a remote node.
1267
// This is usually invoked through the BlocksMsg by the protocol handler.
1268
func (d *Downloader) DeliverBlocks61(id string, blocks []*types.Block) error {
1269 1270 1271 1272
	// Make sure the downloader is active
	if atomic.LoadInt32(&d.synchronising) == 0 {
		return errNoSyncActive
	}
1273 1274 1275 1276
	// Deliver or abort if the sync is canceled while queuing
	d.cancelLock.RLock()
	cancel := d.cancelCh
	d.cancelLock.RUnlock()
1277

1278 1279 1280 1281 1282 1283 1284
	select {
	case d.blockCh <- blockPack{id, blocks}:
		return nil

	case <-cancel:
		return errNoSyncActive
	}
O
moved  
obscuren 已提交
1285 1286
}

1287 1288 1289
// DeliverHeaders injects a new batch of blck headers received from a remote
// node into the download schedule.
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) error {
1290 1291 1292 1293
	// Make sure the downloader is active
	if atomic.LoadInt32(&d.synchronising) == 0 {
		return errNoSyncActive
	}
1294 1295 1296 1297
	// Deliver or abort if the sync is canceled while queuing
	d.cancelLock.RLock()
	cancel := d.cancelCh
	d.cancelLock.RUnlock()
1298

1299
	select {
1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320
	case d.headerCh <- headerPack{id, headers}:
		return nil

	case <-cancel:
		return errNoSyncActive
	}
}

// DeliverBodies injects a new batch of block bodies received from a remote node.
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) error {
	// Make sure the downloader is active
	if atomic.LoadInt32(&d.synchronising) == 0 {
		return errNoSyncActive
	}
	// Deliver or abort if the sync is canceled while queuing
	d.cancelLock.RLock()
	cancel := d.cancelCh
	d.cancelLock.RUnlock()

	select {
	case d.bodyCh <- bodyPack{id, transactions, uncles}:
1321 1322 1323 1324 1325
		return nil

	case <-cancel:
		return errNoSyncActive
	}
1326
}