// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package downloader

import (
	"errors"
	"fmt"
	"math/big"
	"sync/atomic"
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/params"
)

var (
	testdb, _   = ethdb.NewMemDatabase()
	testKey, _  = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
	testAddress = crypto.PubkeyToAddress(testKey.PublicKey)
	genesis     = core.GenesisBlockForTesting(testdb, testAddress, big.NewInt(1000000000))
)

// makeChain creates a chain of n blocks starting at and including parent.
// The returned hash chain is ordered head->parent. In addition, every 3rd block
// contains a transaction and every 5th an uncle to allow testing correct block
// reassembly.
func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Block) {
	blocks := core.GenerateChain(parent, testdb, n, func(i int, block *core.BlockGen) {
		block.SetCoinbase(common.Address{seed})

		// If the block number is a multiple of 3, send a bonus transaction to the miner
		if parent == genesis && i%3 == 0 {
			tx, err := types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(testKey)
			if err != nil {
				panic(err)
			}
			block.AddTx(tx)
		}
		// If the block number is a multiple of 5, add a bonus uncle to the block
		if i%5 == 0 {
			block.AddUncle(&types.Header{ParentHash: block.PrevBlock(i - 1).Hash(), Number: big.NewInt(int64(i - 1))})
		}
	})
	hashes := make([]common.Hash, n+1)
	hashes[len(hashes)-1] = parent.Hash()
	blockm := make(map[common.Hash]*types.Block, n+1)
	blockm[parent.Hash()] = parent
	for i, b := range blocks {
		hashes[len(hashes)-i-2] = b.Hash()
		blockm[b.Hash()] = b
	}
	return hashes, blockm
}
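
// Illustrative usage sketch (mirroring the tests below, not additional test code):
// building three blocks on top of the genesis block yields four hashes ordered
// head->genesis, with every block reachable through the returned map.
//
//	hashes, blocks := makeChain(3, 0, genesis)
//	// hashes[0] is the new head, hashes[3] == genesis.Hash()
//	// blocks[hashes[i]] resolves each hash, including the genesis block itself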

// makeChainFork creates two chains of length n, such that h1[:f] and
// h2[:f] are different but have a common suffix of length n-f.
func makeChainFork(n, f int, parent *types.Block) (h1, h2 []common.Hash, b1, b2 map[common.Hash]*types.Block) {
	// Create the common suffix.
	h, b := makeChain(n-f, 0, parent)
	// Create the forks.
	h1, b1 = makeChain(f, 1, b[h[0]])
	h1 = append(h1, h[1:]...)
	h2, b2 = makeChain(f, 2, b[h[0]])
	h2 = append(h2, h[1:]...)
	for hash, block := range b {
		b1[hash] = block
		b2[hash] = block
	}
	return h1, h2, b1, b2
}
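
// Illustrative usage sketch (not additional test code): forking 2 blocks off an
// 8 block chain yields two 9-hash chains that differ in their first 2 hashes and
// share the remaining ones (the common suffix built on the parent).
//
//	h1, h2, b1, b2 := makeChainFork(8, 2, genesis)
//	// h1[:2] and h2[:2] diverge, while h1[2:] and h2[2:] are identical
//	// b1 and b2 each contain their own fork blocks plus the shared suffix blocks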

// downloadTester is a test simulator for mocking out the local block chain.
type downloadTester struct {
	downloader *Downloader

	ownHashes    []common.Hash                           // Hash chain belonging to the tester
	ownBlocks    map[common.Hash]*types.Block            // Blocks belonging to the tester
	ownChainTd   map[common.Hash]*big.Int                // Total difficulties of the blocks in the local chain
	peerHashes   map[string][]common.Hash                // Hash chain belonging to different test peers
	peerBlocks   map[string]map[common.Hash]*types.Block // Blocks belonging to different test peers
	peerChainTds map[string]map[common.Hash]*big.Int     // Total difficulties of the blocks in the peer chains
}

// newTester creates a new downloader test mocker.
func newTester() *downloadTester {
	tester := &downloadTester{
		ownHashes:    []common.Hash{genesis.Hash()},
		ownBlocks:    map[common.Hash]*types.Block{genesis.Hash(): genesis},
		ownChainTd:   map[common.Hash]*big.Int{genesis.Hash(): genesis.Difficulty()},
		peerHashes:   make(map[string][]common.Hash),
		peerBlocks:   make(map[string]map[common.Hash]*types.Block),
		peerChainTds: make(map[string]map[common.Hash]*big.Int),
	}
	tester.downloader = New(new(event.TypeMux), tester.hasBlock, tester.getBlock, tester.headBlock, tester.getTd, tester.insertChain, tester.dropPeer)

	return tester
}
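
// Illustrative sketch of how the tests below drive the tester (see
// testCanonicalSynchronisation for the real thing):
//
//	tester := newTester()
//	tester.newPeer("peer", 62, hashes, blocks) // serve a chain built with makeChain
//	err := tester.sync("peer", nil)            // block until the download settles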

// sync starts synchronizing with a remote peer, blocking until it completes.
func (dl *downloadTester) sync(id string, td *big.Int) error {
	hash := dl.peerHashes[id][0]

	// If no particular TD was requested, load from the peer's blockchain
	if td == nil {
		td = big.NewInt(1)
		if diff, ok := dl.peerChainTds[id][hash]; ok {
			td = diff
		}
	}
	err := dl.downloader.synchronise(id, hash, td)

	for {
		// If the queue is empty and processing stopped, break
		hashes, blocks := dl.downloader.queue.Size()
		if hashes+blocks == 0 && atomic.LoadInt32(&dl.downloader.processing) == 0 {
			break
		}
		// Otherwise sleep a bit and retry
		time.Sleep(time.Millisecond)
	}
	return err
}

// hasBlock checks if a block is present in the tester's canonical chain.
func (dl *downloadTester) hasBlock(hash common.Hash) bool {
	return dl.getBlock(hash) != nil
}

// getBlock retrieves a block from the tester's canonical chain.
func (dl *downloadTester) getBlock(hash common.Hash) *types.Block {
	return dl.ownBlocks[hash]
}

// headBlock retrieves the current head block from the canonical chain.
func (dl *downloadTester) headBlock() *types.Block {
	return dl.getBlock(dl.ownHashes[len(dl.ownHashes)-1])
}

// getTd retrieves the block's total difficulty from the canonical chain.
func (dl *downloadTester) getTd(hash common.Hash) *big.Int {
	return dl.ownChainTd[hash]
}

// insertChain injects a new batch of blocks into the simulated chain.
func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) {
	for i, block := range blocks {
		if _, ok := dl.ownBlocks[block.ParentHash()]; !ok {
			return i, errors.New("unknown parent")
		}
		dl.ownHashes = append(dl.ownHashes, block.Hash())
		dl.ownBlocks[block.Hash()] = block
		dl.ownChainTd[block.Hash()] = dl.ownChainTd[block.ParentHash()]
	}
	return len(blocks), nil
}

// newPeer registers a new block download source into the downloader.
func (dl *downloadTester) newPeer(id string, version int, hashes []common.Hash, blocks map[common.Hash]*types.Block) error {
	return dl.newSlowPeer(id, version, hashes, blocks, 0)
}

// newSlowPeer registers a new block download source into the downloader, with a
// specific delay time on processing the network packets sent to it, simulating
// potentially slow network IO.
func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error {
	err := dl.downloader.RegisterPeer(id, version, hashes[0],
		dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay),
		nil, dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay))
	if err == nil {
		// Assign the owned hashes and blocks to the peer (deep copy)
		dl.peerHashes[id] = make([]common.Hash, len(hashes))
		copy(dl.peerHashes[id], hashes)

		dl.peerBlocks[id] = make(map[common.Hash]*types.Block)
		dl.peerChainTds[id] = make(map[common.Hash]*big.Int)
		for _, hash := range hashes {
			if block, ok := blocks[hash]; ok {
				dl.peerBlocks[id][hash] = block
				if parent, ok := dl.peerBlocks[id][block.ParentHash()]; ok {
					dl.peerChainTds[id][hash] = new(big.Int).Add(block.Difficulty(), dl.peerChainTds[id][parent.Hash()])
				}
			}
		}
	}
	return err
}

// dropPeer simulates a hard peer removal from the connection pool.
func (dl *downloadTester) dropPeer(id string) {
	delete(dl.peerHashes, id)
	delete(dl.peerBlocks, id)
	delete(dl.peerChainTds, id)

	dl.downloader.UnregisterPeer(id)
}

// peerGetRelHashesFn constructs a GetHashes function associated with a specific
// peer in the download tester. The returned function can be used to retrieve
// batches of hashes from that particular peer.
func (dl *downloadTester) peerGetRelHashesFn(id string, delay time.Duration) func(head common.Hash) error {
	return func(head common.Hash) error {
		time.Sleep(delay)

		// Gather the next batch of hashes
		hashes := dl.peerHashes[id]
		result := make([]common.Hash, 0, MaxHashFetch)
		for i, hash := range hashes {
			if hash == head {
				i++
				for len(result) < cap(result) && i < len(hashes) {
					result = append(result, hashes[i])
					i++
				}
				break
			}
		}
		// Delay delivery a bit to allow attacks to unfold
		go func() {
			time.Sleep(time.Millisecond)
			dl.downloader.DeliverHashes61(id, result)
		}()
		return nil
	}
}

// peerGetAbsHashesFn constructs a GetHashesFromNumber function associated with
// a particular peer in the download tester. The returned function can be used to
// retrieve batches of hashes from that particular peer.
func (dl *downloadTester) peerGetAbsHashesFn(id string, delay time.Duration) func(uint64, int) error {
	return func(head uint64, count int) error {
		time.Sleep(delay)

		// Gather the next batch of hashes
		hashes := dl.peerHashes[id]
		result := make([]common.Hash, 0, count)
		for i := 0; i < count && len(hashes)-int(head)-1-i >= 0; i++ {
			result = append(result, hashes[len(hashes)-int(head)-1-i])
		}
		// Delay delivery a bit to allow attacks to unfold
		go func() {
			time.Sleep(time.Millisecond)
			dl.downloader.DeliverHashes61(id, result)
		}()
		return nil
	}
}

// peerGetBlocksFn constructs a getBlocks function associated with a particular
// peer in the download tester. The returned function can be used to retrieve
// batches of blocks from that particular peer.
func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([]common.Hash) error {
	return func(hashes []common.Hash) error {
		time.Sleep(delay)
		blocks := dl.peerBlocks[id]
		result := make([]*types.Block, 0, len(hashes))
		for _, hash := range hashes {
			if block, ok := blocks[hash]; ok {
				result = append(result, block)
			}
		}
		go dl.downloader.DeliverBlocks61(id, result)

		return nil
	}
}

// peerGetAbsHeadersFn constructs a GetBlockHeaders function based on a numbered
// origin, associated with a particular peer in the download tester. The returned
// function can be used to retrieve batches of headers from that particular peer.
func (dl *downloadTester) peerGetAbsHeadersFn(id string, delay time.Duration) func(uint64, int, int, bool) error {
	return func(origin uint64, amount int, skip int, reverse bool) error {
		time.Sleep(delay)

		// Gather the next batch of hashes
		hashes := dl.peerHashes[id]
		blocks := dl.peerBlocks[id]
		result := make([]*types.Header, 0, amount)
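		// The peer's hash chain is ordered head->genesis, so the block numbered
		// origin+i lives at hashes[len(hashes)-int(origin)-1-i].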
		for i := 0; i < amount && len(hashes)-int(origin)-1-i >= 0; i++ {
			if block, ok := blocks[hashes[len(hashes)-int(origin)-1-i]]; ok {
				result = append(result, block.Header())
			}
		}
		// Delay delivery a bit to allow attacks to unfold
		go func() {
			time.Sleep(time.Millisecond)
			dl.downloader.DeliverHeaders(id, result)
		}()
		return nil
	}
}

// peerGetBodiesFn constructs a getBlockBodies method associated with a particular
// peer in the download tester. The returned function can be used to retrieve
// batches of block bodies from that particular peer.
func (dl *downloadTester) peerGetBodiesFn(id string, delay time.Duration) func([]common.Hash) error {
	return func(hashes []common.Hash) error {
		time.Sleep(delay)
		blocks := dl.peerBlocks[id]

		transactions := make([][]*types.Transaction, 0, len(hashes))
		uncles := make([][]*types.Header, 0, len(hashes))

		for _, hash := range hashes {
			if block, ok := blocks[hash]; ok {
				transactions = append(transactions, block.Transactions())
				uncles = append(uncles, block.Uncles())
			}
		}
		go dl.downloader.DeliverBodies(id, transactions, uncles)

		return nil
	}
}

// Tests that simple synchronization against a canonical chain works correctly.
// In this test common ancestor lookup should be short circuited and not require
// binary searching.
func TestCanonicalSynchronisation61(t *testing.T) { testCanonicalSynchronisation(t, 61) }
func TestCanonicalSynchronisation62(t *testing.T) { testCanonicalSynchronisation(t, 62) }
func TestCanonicalSynchronisation63(t *testing.T) { testCanonicalSynchronisation(t, 63) }
func TestCanonicalSynchronisation64(t *testing.T) { testCanonicalSynchronisation(t, 64) }

func testCanonicalSynchronisation(t *testing.T, protocol int) {
	// Create a small enough block chain to download
	targetBlocks := blockCacheLimit - 15
	hashes, blocks := makeChain(targetBlocks, 0, genesis)

	tester := newTester()
	tester.newPeer("peer", protocol, hashes, blocks)

	// Synchronise with the peer and make sure all blocks were retrieved
	if err := tester.sync("peer", nil); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
	if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
		t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
	}
}

// Tests that if a large batch of blocks is being downloaded, it is throttled
// until the cached blocks are retrieved.
func TestThrottling61(t *testing.T) { testThrottling(t, 61) }
func TestThrottling62(t *testing.T) { testThrottling(t, 62) }
func TestThrottling63(t *testing.T) { testThrottling(t, 63) }
func TestThrottling64(t *testing.T) { testThrottling(t, 64) }

func testThrottling(t *testing.T, protocol int) {
	// Create a long block chain to download and the tester
	targetBlocks := 8 * blockCacheLimit
	hashes, blocks := makeChain(targetBlocks, 0, genesis)

	tester := newTester()
	tester.newPeer("peer", protocol, hashes, blocks)

	// Wrap the importer to allow stepping
	blocked, proceed := uint32(0), make(chan struct{})
	tester.downloader.chainInsertHook = func(blocks []*Block) {
		atomic.StoreUint32(&blocked, uint32(len(blocks)))
		<-proceed
	}
	// Start a synchronisation concurrently
	errc := make(chan error)
	go func() {
		errc <- tester.sync("peer", nil)
	}()
	// Iteratively take some blocks, always checking the retrieval count
	for len(tester.ownBlocks) < targetBlocks+1 {
		// Wait a bit for sync to throttle itself
		var cached int
		for start := time.Now(); time.Since(start) < time.Second; {
			time.Sleep(25 * time.Millisecond)

			cached = len(tester.downloader.queue.blockPool)
			if cached == blockCacheLimit || len(tester.ownBlocks)+cached+int(atomic.LoadUint32(&blocked)) == targetBlocks+1 {
				break
			}
		}
		// Make sure we filled up the cache, then exhaust it
		time.Sleep(25 * time.Millisecond) // give it a chance to screw up
		if cached != blockCacheLimit && len(tester.ownBlocks)+cached+int(atomic.LoadUint32(&blocked)) != targetBlocks+1 {
			t.Fatalf("block count mismatch: have %v, want %v (owned %v, target %v)", cached, blockCacheLimit, len(tester.ownBlocks), targetBlocks+1)
		}
		// Permit the blocked blocks to import
		if atomic.LoadUint32(&blocked) > 0 {
			atomic.StoreUint32(&blocked, uint32(0))
			proceed <- struct{}{}
		}
	}
	// Check that we haven't pulled more blocks than available
	if len(tester.ownBlocks) > targetBlocks+1 {
		t.Fatalf("target block count mismatch: have %v, want %v", len(tester.ownBlocks), targetBlocks+1)
	}
	if err := <-errc; err != nil {
		t.Fatalf("block synchronization failed: %v", err)
	}
}

// Tests that simple synchronization against a forked chain works correctly. In
// this test common ancestor lookup should *not* be short circuited, and a full
// binary search should be executed.
func TestForkedSynchronisation61(t *testing.T) { testForkedSynchronisation(t, 61) }
func TestForkedSynchronisation62(t *testing.T) { testForkedSynchronisation(t, 62) }
func TestForkedSynchronisation63(t *testing.T) { testForkedSynchronisation(t, 63) }
func TestForkedSynchronisation64(t *testing.T) { testForkedSynchronisation(t, 64) }

func testForkedSynchronisation(t *testing.T, protocol int) {
	// Create a long enough forked chain
	common, fork := MaxHashFetch, 2*MaxHashFetch
	hashesA, hashesB, blocksA, blocksB := makeChainFork(common+fork, fork, genesis)

	tester := newTester()
	tester.newPeer("fork A", protocol, hashesA, blocksA)
	tester.newPeer("fork B", protocol, hashesB, blocksB)

	// Synchronise with the peer and make sure all blocks were retrieved
	if err := tester.sync("fork A", nil); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
	if imported := len(tester.ownBlocks); imported != common+fork+1 {
		t.Fatalf("synchronised block mismatch: have %v, want %v", imported, common+fork+1)
	}
	// Synchronise with the second peer and make sure that fork is pulled too
	if err := tester.sync("fork B", nil); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
	if imported := len(tester.ownBlocks); imported != common+2*fork+1 {
		t.Fatalf("synchronised block mismatch: have %v, want %v", imported, common+2*fork+1)
	}
}

// Tests that an inactive downloader will not accept incoming hashes and blocks.
func TestInactiveDownloader61(t *testing.T) {
	tester := newTester()

	// Check that neither hashes nor blocks are accepted
	if err := tester.downloader.DeliverHashes61("bad peer", []common.Hash{}); err != errNoSyncActive {
		t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
	}
	if err := tester.downloader.DeliverBlocks61("bad peer", []*types.Block{}); err != errNoSyncActive {
		t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
	}
}

// Tests that an inactive downloader will not accept incoming block headers and bodies.
func TestInactiveDownloader62(t *testing.T) {
	tester := newTester()

	// Check that neither block headers nor bodies are accepted
	if err := tester.downloader.DeliverHeaders("bad peer", []*types.Header{}); err != errNoSyncActive {
		t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
	}
	if err := tester.downloader.DeliverBodies("bad peer", [][]*types.Transaction{}, [][]*types.Header{}); err != errNoSyncActive {
		t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
	}
}

// Tests that a canceled download wipes all previously accumulated state.
func TestCancel61(t *testing.T) { testCancel(t, 61) }
func TestCancel62(t *testing.T) { testCancel(t, 62) }
func TestCancel63(t *testing.T) { testCancel(t, 63) }
func TestCancel64(t *testing.T) { testCancel(t, 64) }

func testCancel(t *testing.T, protocol int) {
	// Create a small enough block chain to download and the tester
	targetBlocks := blockCacheLimit - 15
	if targetBlocks >= MaxHashFetch {
		targetBlocks = MaxHashFetch - 15
	}
	if targetBlocks >= MaxHeaderFetch {
		targetBlocks = MaxHeaderFetch - 15
	}
	hashes, blocks := makeChain(targetBlocks, 0, genesis)

	tester := newTester()
	tester.newPeer("peer", protocol, hashes, blocks)

	// Make sure canceling works with a pristine downloader
	tester.downloader.cancel()
	downloading, importing := tester.downloader.queue.Size()
	if downloading > 0 || importing > 0 {
		t.Errorf("download or import count mismatch: %d downloading, %d importing, want 0", downloading, importing)
	}
	// Synchronise with the peer, but cancel afterwards
	if err := tester.sync("peer", nil); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
	tester.downloader.cancel()
	downloading, importing = tester.downloader.queue.Size()
	if downloading > 0 || importing > 0 {
		t.Errorf("download or import count mismatch: %d downloading, %d importing, want 0", downloading, importing)
	}
}

// Tests that synchronisation from multiple peers works as intended (multi thread sanity test).
func TestMultiSynchronisation61(t *testing.T) { testMultiSynchronisation(t, 61) }
func TestMultiSynchronisation62(t *testing.T) { testMultiSynchronisation(t, 62) }
func TestMultiSynchronisation63(t *testing.T) { testMultiSynchronisation(t, 63) }
func TestMultiSynchronisation64(t *testing.T) { testMultiSynchronisation(t, 64) }

func testMultiSynchronisation(t *testing.T, protocol int) {
	// Create various peers with various parts of the chain
	targetPeers := 8
	targetBlocks := targetPeers*blockCacheLimit - 15
	hashes, blocks := makeChain(targetBlocks, 0, genesis)

	tester := newTester()
	for i := 0; i < targetPeers; i++ {
		id := fmt.Sprintf("peer #%d", i)
		tester.newPeer(id, protocol, hashes[i*blockCacheLimit:], blocks)
	}
	// Synchronise with the middle peer and make sure half of the blocks were retrieved
	id := fmt.Sprintf("peer #%d", targetPeers/2)
	if err := tester.sync(id, nil); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
	if imported := len(tester.ownBlocks); imported != len(tester.peerHashes[id]) {
		t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(tester.peerHashes[id]))
	}
	// Synchronise with the best peer and make sure everything is retrieved
	if err := tester.sync("peer #0", nil); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
	if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
		t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
	}
}

// Tests that if a block is empty (i.e. header only), no body request should be
// made, and instead the header should be assembled into a whole block in itself.
func TestEmptyBlockShortCircuit62(t *testing.T) { testEmptyBlockShortCircuit(t, 62) }
func TestEmptyBlockShortCircuit63(t *testing.T) { testEmptyBlockShortCircuit(t, 63) }
func TestEmptyBlockShortCircuit64(t *testing.T) { testEmptyBlockShortCircuit(t, 64) }

func testEmptyBlockShortCircuit(t *testing.T, protocol int) {
	// Create a small enough block chain to download
	targetBlocks := blockCacheLimit - 15
	hashes, blocks := makeChain(targetBlocks, 0, genesis)

	tester := newTester()
	tester.newPeer("peer", protocol, hashes, blocks)

	// Instrument the downloader to signal body requests
	requested := int32(0)
	tester.downloader.bodyFetchHook = func(headers []*types.Header) {
		atomic.AddInt32(&requested, int32(len(headers)))
	}
	// Synchronise with the peer and make sure all blocks were retrieved
	if err := tester.sync("peer", nil); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
	if imported := len(tester.ownBlocks); imported != targetBlocks+1 {
		t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1)
	}
	// Validate the number of block bodies that should have been requested
	needed := 0
	for _, block := range blocks {
		if block != genesis && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) {
			needed++
		}
	}
	if int(requested) != needed {
		t.Fatalf("block body retrieval count mismatch: have %v, want %v", requested, needed)
	}
}

// Tests that if a peer sends an invalid body for a requested block, it gets
// dropped immediately by the downloader.
func TestInvalidBlockBodyAttack62(t *testing.T) { testInvalidBlockBodyAttack(t, 62) }
func TestInvalidBlockBodyAttack63(t *testing.T) { testInvalidBlockBodyAttack(t, 63) }
func TestInvalidBlockBodyAttack64(t *testing.T) { testInvalidBlockBodyAttack(t, 64) }

func testInvalidBlockBodyAttack(t *testing.T, protocol int) {
	// Create two peers, one feeding invalid block bodies
	targetBlocks := 4*blockCacheLimit - 15
	hashes, validBlocks := makeChain(targetBlocks, 0, genesis)

	invalidBlocks := make(map[common.Hash]*types.Block)
	for hash, block := range validBlocks {
		invalidBlocks[hash] = types.NewBlockWithHeader(block.Header())
	}

	tester := newTester()
	tester.newPeer("valid", protocol, hashes, validBlocks)
	tester.newPeer("attack", protocol, hashes, invalidBlocks)

	// Synchronise with the valid peer (will pull contents from the attacker too)
	if err := tester.sync("valid", nil); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
	if imported := len(tester.ownBlocks); imported != len(hashes) {
		t.Fatalf("synchronised block mismatch: have %v, want %v", imported, len(hashes))
	}
	// Make sure the attacker was detected and dropped in the meantime
	if _, ok := tester.peerHashes["attack"]; ok {
		t.Fatalf("block body attacker not detected/dropped")
	}
}

// Tests that a peer advertising a high TD doesn't get to stall the downloader
// afterwards by not sending any useful hashes.
func TestHighTDStarvationAttack61(t *testing.T) { testHighTDStarvationAttack(t, 61) }
func TestHighTDStarvationAttack62(t *testing.T) { testHighTDStarvationAttack(t, 62) }
func TestHighTDStarvationAttack63(t *testing.T) { testHighTDStarvationAttack(t, 63) }
func TestHighTDStarvationAttack64(t *testing.T) { testHighTDStarvationAttack(t, 64) }

func testHighTDStarvationAttack(t *testing.T, protocol int) {
	tester := newTester()
	hashes, blocks := makeChain(0, 0, genesis)

	tester.newPeer("attack", protocol, []common.Hash{hashes[0]}, blocks)
	if err := tester.sync("attack", big.NewInt(1000000)); err != errStallingPeer {
		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer)
	}
}

// Tests that misbehaving peers are disconnected, whilst behaving ones are not.
func TestBlockHeaderAttackerDropping61(t *testing.T) { testBlockHeaderAttackerDropping(t, 61) }
func TestBlockHeaderAttackerDropping62(t *testing.T) { testBlockHeaderAttackerDropping(t, 62) }
func TestBlockHeaderAttackerDropping63(t *testing.T) { testBlockHeaderAttackerDropping(t, 63) }
func TestBlockHeaderAttackerDropping64(t *testing.T) { testBlockHeaderAttackerDropping(t, 64) }

func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
	// Define the disconnection requirement for individual hash fetch errors
	tests := []struct {
		result error
		drop   bool
	}{
		{nil, false},                  // Sync succeeded, all is well
		{errBusy, false},              // Sync is already in progress, no problem
		{errUnknownPeer, false},       // Peer is unknown, was already dropped, don't double drop
		{errBadPeer, true},            // Peer was deemed bad for some reason, drop it
		{errStallingPeer, true},       // Peer was detected to be stalling, drop it
		{errNoPeers, false},           // No peers to download from, soft race, no issue
		{errPendingQueue, false},      // There are blocks still cached, wait to exhaust, no issue
		{errTimeout, true},            // No hashes received in due time, drop the peer
		{errEmptyHashSet, true},       // No hashes were returned as a response, drop as it's a dead end
		{errEmptyHeaderSet, true},     // No headers were returned as a response, drop as it's a dead end
		{errPeersUnavailable, true},   // Nobody had the advertised blocks, drop the advertiser
		{errInvalidChain, true},       // Hash chain was detected as invalid, definitely drop
		{errInvalidBody, false},       // A bad peer was detected, but not the sync origin
		{errCancelHashFetch, false},   // Synchronisation was canceled, origin may be innocent, don't drop
		{errCancelBlockFetch, false},  // Synchronisation was canceled, origin may be innocent, don't drop
		{errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
		{errCancelBodyFetch, false},   // Synchronisation was canceled, origin may be innocent, don't drop
	}
	// Run the tests and check disconnection status
	tester := newTester()
	for i, tt := range tests {
		// Register a new peer and ensure its presence
		id := fmt.Sprintf("test %d", i)
		if err := tester.newPeer(id, protocol, []common.Hash{genesis.Hash()}, nil); err != nil {
			t.Fatalf("test %d: failed to register new peer: %v", i, err)
		}
		if _, ok := tester.peerHashes[id]; !ok {
			t.Fatalf("test %d: registered peer not found", i)
		}
		// Simulate a synchronisation and check the required result
		tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result }

		tester.downloader.Synchronise(id, genesis.Hash(), big.NewInt(1000))
		if _, ok := tester.peerHashes[id]; !ok != tt.drop {
			t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop)
		}
	}
}

// Tests that feeding bad blocks will result in a peer drop.
func TestBlockBodyAttackerDropping61(t *testing.T) { testBlockBodyAttackerDropping(t, 61) }
func TestBlockBodyAttackerDropping62(t *testing.T) { testBlockBodyAttackerDropping(t, 62) }
func TestBlockBodyAttackerDropping63(t *testing.T) { testBlockBodyAttackerDropping(t, 63) }
func TestBlockBodyAttackerDropping64(t *testing.T) { testBlockBodyAttackerDropping(t, 64) }

func testBlockBodyAttackerDropping(t *testing.T, protocol int) {
	// Define the disconnection requirement for individual block import errors
	tests := []struct {
		failure bool
		drop    bool
	}{
		{true, true},   // Failed block import should drop the responsible peer
		{false, false}, // Successful block import should keep the peer registered
	}

	// Run the tests and check disconnection status
	tester := newTester()
	for i, tt := range tests {
		// Register a new peer and ensure its presence
		id := fmt.Sprintf("test %d", i)
		if err := tester.newPeer(id, protocol, []common.Hash{common.Hash{}}, nil); err != nil {
			t.Fatalf("test %d: failed to register new peer: %v", i, err)
		}
		if _, ok := tester.peerHashes[id]; !ok {
			t.Fatalf("test %d: registered peer not found", i)
		}
		// Assemble a good or bad block, depending on the test
		raw := core.GenerateChain(genesis, testdb, 1, nil)[0]
		if tt.failure {
			parent := types.NewBlock(&types.Header{}, nil, nil, nil)
			raw = core.GenerateChain(parent, testdb, 1, nil)[0]
		}
		block := &Block{OriginPeer: id, RawBlock: raw}

		// Simulate block processing and check the result
		tester.downloader.queue.blockCache[0] = block
		tester.downloader.process()
		if _, ok := tester.peerHashes[id]; !ok != tt.drop {
			t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.failure, !ok, tt.drop)
		}
	}
}