downloader_test.go 18.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
package downloader

import (
	"encoding/binary"
	"math/big"
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
O
obscuren 已提交
11
	"github.com/ethereum/go-ethereum/event"
12 13
)

14 15 16
var (
	knownHash   = common.Hash{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
	unknownHash = common.Hash{9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9}
17
	bannedHash  = common.Hash{5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5}
18
)
19

20
func createHashes(start, amount int) (hashes []common.Hash) {
21 22 23 24
	hashes = make([]common.Hash, amount+1)
	hashes[len(hashes)-1] = knownHash

	for i := range hashes[:len(hashes)-1] {
25
		binary.BigEndian.PutUint64(hashes[i][:8], uint64(start+i+2))
26 27 28 29
	}
	return
}

30
func createBlock(i int, parent, hash common.Hash) *types.Block {
31 32 33
	header := &types.Header{Number: big.NewInt(int64(i))}
	block := types.NewBlockWithHeader(header)
	block.HeaderHash = hash
34
	block.ParentHeaderHash = parent
35 36 37
	return block
}

38 39
func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block {
	blocks := make(map[common.Hash]*types.Block)
40 41 42 43 44 45
	for i := 0; i < len(hashes); i++ {
		parent := knownHash
		if i < len(hashes)-1 {
			parent = hashes[i+1]
		}
		blocks[hashes[i]] = createBlock(len(hashes)-i, parent, hashes[i])
46 47 48 49 50
	}
	return blocks
}

type downloadTester struct {
51 52 53 54 55 56
	downloader *Downloader

	hashes []common.Hash                // Chain of hashes simulating
	blocks map[common.Hash]*types.Block // Blocks associated with the hashes
	chain  []common.Hash                // Block-chain being constructed

57 58
	maxHashFetch int // Overrides the maximum number of retrieved hashes

O
obscuren 已提交
59 60 61
	t            *testing.T
	done         chan bool
	activePeerId string
62 63 64
}

func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types.Block) *downloadTester {
65 66 67 68 69 70 71 72 73
	tester := &downloadTester{
		t: t,

		hashes: hashes,
		blocks: blocks,
		chain:  []common.Hash{knownHash},

		done: make(chan bool),
	}
O
obscuren 已提交
74 75
	var mux event.TypeMux
	downloader := New(&mux, tester.hasBlock, tester.getBlock)
76 77 78 79 80
	tester.downloader = downloader

	return tester
}

81 82 83
// sync is a simple wrapper around the downloader to start synchronisation and
// block until it returns
func (dl *downloadTester) sync(peerId string, head common.Hash) error {
O
obscuren 已提交
84
	dl.activePeerId = peerId
85 86 87 88 89 90
	return dl.downloader.Synchronise(peerId, head)
}

// syncTake is starts synchronising with a remote peer, but concurrently it also
// starts fetching blocks that the downloader retrieved. IT blocks until both go
// routines terminate.
91
func (dl *downloadTester) syncTake(peerId string, head common.Hash) ([]*Block, error) {
92 93
	// Start a block collector to take blocks as they become available
	done := make(chan struct{})
94
	took := []*Block{}
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
	go func() {
		for running := true; running; {
			select {
			case <-done:
				running = false
			default:
				time.Sleep(time.Millisecond)
			}
			// Take a batch of blocks and accumulate
			took = append(took, dl.downloader.TakeBlocks()...)
		}
		done <- struct{}{}
	}()
	// Start the downloading, sync the taker and return
	err := dl.sync(peerId, head)

	done <- struct{}{}
	<-done

	return took, err
O
obscuren 已提交
115 116
}

117
func (dl *downloadTester) hasBlock(hash common.Hash) bool {
118 119 120 121
	for _, h := range dl.chain {
		if h == hash {
			return true
		}
122 123 124 125
	}
	return false
}

126 127
func (dl *downloadTester) getBlock(hash common.Hash) *types.Block {
	return dl.blocks[knownHash]
128 129
}

130 131
// getHashes retrieves a batch of hashes for reconstructing the chain.
func (dl *downloadTester) getHashes(head common.Hash) error {
132
	limit := MaxHashFetch
133 134 135
	if dl.maxHashFetch > 0 {
		limit = dl.maxHashFetch
	}
136
	// Gather the next batch of hashes
137
	hashes := make([]common.Hash, 0, limit)
138 139
	for i, hash := range dl.hashes {
		if hash == head {
140
			i++
141 142 143 144 145 146 147 148
			for len(hashes) < cap(hashes) && i < len(dl.hashes) {
				hashes = append(hashes, dl.hashes[i])
				i++
			}
			break
		}
	}
	// Delay delivery a bit to allow attacks to unfold
149 150 151 152 153
	id := dl.activePeerId
	go func() {
		time.Sleep(time.Millisecond)
		dl.downloader.DeliverHashes(id, hashes)
	}()
154 155 156 157 158
	return nil
}

func (dl *downloadTester) getBlocks(id string) func([]common.Hash) error {
	return func(hashes []common.Hash) error {
159 160 161 162 163
		blocks := make([]*types.Block, 0, len(hashes))
		for _, hash := range hashes {
			if block, ok := dl.blocks[hash]; ok {
				blocks = append(blocks, block)
			}
164
		}
165
		go dl.downloader.DeliverBlocks(id, blocks)
166 167 168 169 170 171

		return nil
	}
}

func (dl *downloadTester) newPeer(id string, td *big.Int, hash common.Hash) {
O
obscuren 已提交
172
	dl.downloader.RegisterPeer(id, hash, dl.getHashes, dl.getBlocks(id))
173 174
}

175 176 177 178
// Tests that simple synchronization, without throttling from a good peer works.
func TestSynchronisation(t *testing.T) {
	// Create a small enough block chain to download and the tester
	targetBlocks := blockCacheLimit - 15
179
	hashes := createHashes(0, targetBlocks)
180 181
	blocks := createBlocksFromHashes(hashes)

182
	tester := newTester(t, hashes, blocks)
183
	tester.newPeer("peer", big.NewInt(10000), hashes[0])
184

185 186 187
	// Synchronise with the peer and make sure all blocks were retrieved
	if err := tester.sync("peer", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
188
	}
189 190
	if queued := len(tester.downloader.queue.blockCache); queued != targetBlocks {
		t.Fatalf("synchronised block mismatch: have %v, want %v", queued, targetBlocks)
191 192 193
	}
}

194 195 196 197
// Tests that the synchronized blocks can be correctly retrieved.
func TestBlockTaking(t *testing.T) {
	// Create a small enough block chain to download and the tester
	targetBlocks := blockCacheLimit - 15
198 199 200
	hashes := createHashes(0, targetBlocks)
	blocks := createBlocksFromHashes(hashes)

201 202
	tester := newTester(t, hashes, blocks)
	tester.newPeer("peer", big.NewInt(10000), hashes[0])
203

204 205 206
	// Synchronise with the peer and test block retrieval
	if err := tester.sync("peer", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
207
	}
208 209
	if took := tester.downloader.TakeBlocks(); len(took) != targetBlocks {
		t.Fatalf("took block mismatch: have %v, want %v", len(took), targetBlocks)
210
	}
211
}
212

213
// Tests that an inactive downloader will not accept incoming hashes and blocks.
214
func TestInactiveDownloader(t *testing.T) {
215 216
	// Create a small enough block chain to download and the tester
	targetBlocks := blockCacheLimit - 15
217 218 219
	hashes := createHashes(0, targetBlocks)
	blocks := createBlocksFromHashSet(createHashSet(hashes))

220
	tester := newTester(t, nil, nil)
221

222 223 224 225 226 227
	// Check that neither hashes nor blocks are accepted
	if err := tester.downloader.DeliverHashes("bad peer", hashes); err != errNoSyncActive {
		t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
	}
	if err := tester.downloader.DeliverBlocks("bad peer", blocks); err != errNoSyncActive {
		t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
228 229 230
	}
}

231
// Tests that a canceled download wipes all previously accumulated state.
232
func TestCancel(t *testing.T) {
233 234
	// Create a small enough block chain to download and the tester
	targetBlocks := blockCacheLimit - 15
235 236 237
	hashes := createHashes(0, targetBlocks)
	blocks := createBlocksFromHashes(hashes)

238 239
	tester := newTester(t, hashes, blocks)
	tester.newPeer("peer", big.NewInt(10000), hashes[0])
240

241 242 243
	// Synchronise with the peer, but cancel afterwards
	if err := tester.sync("peer", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
244 245
	}
	if !tester.downloader.Cancel() {
246
		t.Fatalf("cancel operation failed")
247
	}
248 249 250 251 252 253 254
	// Make sure the queue reports empty and no blocks can be taken
	hashCount, blockCount := tester.downloader.queue.Size()
	if hashCount > 0 || blockCount > 0 {
		t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount)
	}
	if took := tester.downloader.TakeBlocks(); len(took) != 0 {
		t.Errorf("taken blocks mismatch: have %d, want %d", len(took), 0)
255 256 257
	}
}

258 259
// Tests that if a large batch of blocks are being downloaded, it is throttled
// until the cached blocks are retrieved.
260
func TestThrottling(t *testing.T) {
261 262
	// Create a long block chain to download and the tester
	targetBlocks := 8 * blockCacheLimit
263 264 265
	hashes := createHashes(0, targetBlocks)
	blocks := createBlocksFromHashes(hashes)

266 267
	tester := newTester(t, hashes, blocks)
	tester.newPeer("peer", big.NewInt(10000), hashes[0])
268

269 270 271 272 273 274 275 276
	// Start a synchronisation concurrently
	errc := make(chan error)
	go func() {
		errc <- tester.sync("peer", hashes[0])
	}()
	// Iteratively take some blocks, always checking the retrieval count
	for total := 0; total < targetBlocks; {
		// Sleep a bit for sync to complete
277
		time.Sleep(500 * time.Millisecond)
278 279 280 281 282 283 284 285 286 287

		// Fetch the next batch of blocks
		took := tester.downloader.TakeBlocks()
		if len(took) != blockCacheLimit {
			t.Fatalf("block count mismatch: have %v, want %v", len(took), blockCacheLimit)
		}
		total += len(took)
		if total > targetBlocks {
			t.Fatalf("target block count mismatch: have %v, want %v", total, targetBlocks)
		}
288
	}
289 290
	if err := <-errc; err != nil {
		t.Fatalf("block synchronization failed: %v", err)
291 292
	}
}
293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309

// Tests that if a peer returns an invalid chain with a block pointing to a non-
// existing parent, it is correctly detected and handled.
func TestNonExistingParentAttack(t *testing.T) {
	// Forge a single-link chain with a forged header
	hashes := createHashes(0, 1)
	blocks := createBlocksFromHashes(hashes)

	forged := blocks[hashes[0]]
	forged.ParentHeaderHash = unknownHash

	// Try and sync with the malicious node and check that it fails
	tester := newTester(t, hashes, blocks)
	tester.newPeer("attack", big.NewInt(10000), hashes[0])
	if err := tester.sync("attack", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
310 311 312
	bs := tester.downloader.TakeBlocks()
	if len(bs) != 1 {
		t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
313
	}
314
	if tester.hasBlock(bs[0].RawBlock.ParentHash()) {
315
		t.Fatalf("tester knows about the unknown hash")
316 317 318 319 320 321 322 323 324
	}
	tester.downloader.Cancel()

	// Reconstruct a valid chain, and try to synchronize with it
	forged.ParentHeaderHash = knownHash
	tester.newPeer("valid", big.NewInt(20000), hashes[0])
	if err := tester.sync("valid", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
325
	bs = tester.downloader.TakeBlocks()
326
	if len(bs) != 1 {
327
		t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
328
	}
329
	if !tester.hasBlock(bs[0].RawBlock.ParentHash()) {
330 331
		t.Fatalf("tester doesn't know about the origin hash")
	}
332
}
333 334 335 336 337

// Tests that if a malicious peers keeps sending us repeating hashes, we don't
// loop indefinitely.
func TestRepeatingHashAttack(t *testing.T) {
	// Create a valid chain, but drop the last link
338
	hashes := createHashes(0, blockCacheLimit)
339
	blocks := createBlocksFromHashes(hashes)
340
	forged := hashes[:len(hashes)-1]
341 342

	// Try and sync with the malicious node
343 344
	tester := newTester(t, forged, blocks)
	tester.newPeer("attack", big.NewInt(10000), forged[0])
345 346 347 348 349 350 351 352

	errc := make(chan error)
	go func() {
		errc <- tester.sync("attack", hashes[0])
	}()

	// Make sure that syncing returns and does so with a failure
	select {
353
	case <-time.After(time.Second):
354 355 356 357 358 359
		t.Fatalf("synchronisation blocked")
	case err := <-errc:
		if err == nil {
			t.Fatalf("synchronisation succeeded")
		}
	}
360 361 362 363 364 365
	// Ensure that a valid chain can still pass sync
	tester.hashes = hashes
	tester.newPeer("valid", big.NewInt(20000), hashes[0])
	if err := tester.sync("valid", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
366
}
367 368 369 370 371

// Tests that if a malicious peers returns a non-existent block hash, it should
// eventually time out and the sync reattempted.
func TestNonExistingBlockAttack(t *testing.T) {
	// Create a valid chain, but forge the last link
372
	hashes := createHashes(0, blockCacheLimit)
373
	blocks := createBlocksFromHashes(hashes)
374
	origin := hashes[len(hashes)/2]
375 376 377 378 379 380 381 382 383

	hashes[len(hashes)/2] = unknownHash

	// Try and sync with the malicious node and check that it fails
	tester := newTester(t, hashes, blocks)
	tester.newPeer("attack", big.NewInt(10000), hashes[0])
	if err := tester.sync("attack", hashes[0]); err != errPeersUnavailable {
		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errPeersUnavailable)
	}
384 385 386 387 388 389
	// Ensure that a valid chain can still pass sync
	hashes[len(hashes)/2] = origin
	tester.newPeer("valid", big.NewInt(20000), hashes[0])
	if err := tester.sync("valid", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
390
}
391 392 393 394 395 396 397 398

// Tests that if a malicious peer is returning hashes in a weird order, that the
// sync throttler doesn't choke on them waiting for the valid blocks.
func TestInvalidHashOrderAttack(t *testing.T) {
	// Create a valid long chain, but reverse some hashes within
	hashes := createHashes(0, 4*blockCacheLimit)
	blocks := createBlocksFromHashes(hashes)

399 400 401 402 403
	chunk1 := make([]common.Hash, blockCacheLimit)
	chunk2 := make([]common.Hash, blockCacheLimit)
	copy(chunk1, hashes[blockCacheLimit:2*blockCacheLimit])
	copy(chunk2, hashes[2*blockCacheLimit:3*blockCacheLimit])

404 405
	reverse := make([]common.Hash, len(hashes))
	copy(reverse, hashes)
406 407
	copy(reverse[2*blockCacheLimit:], chunk1)
	copy(reverse[blockCacheLimit:], chunk2)
408 409 410 411 412 413 414 415 416 417 418 419 420 421

	// Try and sync with the malicious node and check that it fails
	tester := newTester(t, reverse, blocks)
	tester.newPeer("attack", big.NewInt(10000), reverse[0])
	if _, err := tester.syncTake("attack", reverse[0]); err != ErrInvalidChain {
		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain)
	}
	// Ensure that a valid chain can still pass sync
	tester.hashes = hashes
	tester.newPeer("valid", big.NewInt(20000), hashes[0])
	if _, err := tester.syncTake("valid", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
}
422 423 424 425

// Tests that if a malicious peer makes up a random hash chain and tries to push
// indefinitely, it actually gets caught with it.
func TestMadeupHashChainAttack(t *testing.T) {
426
	blockSoftTTL = 100 * time.Millisecond
427 428 429 430 431 432 433 434 435 436 437 438
	crossCheckCycle = 25 * time.Millisecond

	// Create a long chain of hashes without backing blocks
	hashes := createHashes(0, 1024*blockCacheLimit)

	// Try and sync with the malicious node and check that it fails
	tester := newTester(t, hashes, nil)
	tester.newPeer("attack", big.NewInt(10000), hashes[0])
	if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed {
		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed)
	}
}
439

440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456
// Tests that a peer feeding a made up hash chain one hash at a time is caught
// as a stalling peer. This is kept separate from the classical made up chain
// attack because hashes sent one by one prevent reliable block/parent
// verification.
func TestMadeupHashChainDrippingAttack(t *testing.T) {
	// Create a random chain of hashes to drip, with no blocks behind them
	chain := createHashes(0, 16*blockCacheLimit)
	head := chain[0]

	tester := newTester(t, chain, nil)
	tester.maxHashFetch = 1 // hand out a single hash per request

	// Sync with the attacker and check the failure reason
	tester.newPeer("attack", big.NewInt(10000), head)

	_, err := tester.syncTake("attack", head)
	if err != ErrStallingPeer {
		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrStallingPeer)
	}
}

457 458 459
// Tests that if a malicious peer makes up a random block chain, and tried to
// push indefinitely, it actually gets caught with it.
func TestMadeupBlockChainAttack(t *testing.T) {
460
	defaultBlockTTL := blockSoftTTL
461 462
	defaultCrossCheckCycle := crossCheckCycle

463
	blockSoftTTL = 100 * time.Millisecond
464 465 466
	crossCheckCycle = 25 * time.Millisecond

	// Create a long chain of blocks and simulate an invalid chain by dropping every second
467
	hashes := createHashes(0, 16*blockCacheLimit)
468 469 470 471 472 473 474 475 476 477 478 479 480
	blocks := createBlocksFromHashes(hashes)

	gapped := make([]common.Hash, len(hashes)/2)
	for i := 0; i < len(gapped); i++ {
		gapped[i] = hashes[2*i]
	}
	// Try and sync with the malicious node and check that it fails
	tester := newTester(t, gapped, blocks)
	tester.newPeer("attack", big.NewInt(10000), gapped[0])
	if _, err := tester.syncTake("attack", gapped[0]); err != ErrCrossCheckFailed {
		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed)
	}
	// Ensure that a valid chain can still pass sync
481
	blockSoftTTL = defaultBlockTTL
482 483
	crossCheckCycle = defaultCrossCheckCycle

484 485 486 487 488 489
	tester.hashes = hashes
	tester.newPeer("valid", big.NewInt(20000), hashes[0])
	if _, err := tester.syncTake("valid", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
}
490 491 492 493 494

// Advanced form of the above forged blockchain attack, where not only does the
// attacker make up a valid hashes for random blocks, but also forges the block
// parents to point to existing hashes.
func TestMadeupParentBlockChainAttack(t *testing.T) {
495
	defaultBlockTTL := blockSoftTTL
496 497
	defaultCrossCheckCycle := crossCheckCycle

498
	blockSoftTTL = 100 * time.Millisecond
499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514
	crossCheckCycle = 25 * time.Millisecond

	// Create a long chain of blocks and simulate an invalid chain by dropping every second
	hashes := createHashes(0, 16*blockCacheLimit)
	blocks := createBlocksFromHashes(hashes)
	forges := createBlocksFromHashes(hashes)
	for hash, block := range forges {
		block.ParentHeaderHash = hash // Simulate pointing to already known hash
	}
	// Try and sync with the malicious node and check that it fails
	tester := newTester(t, hashes, forges)
	tester.newPeer("attack", big.NewInt(10000), hashes[0])
	if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed {
		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed)
	}
	// Ensure that a valid chain can still pass sync
515
	blockSoftTTL = defaultBlockTTL
516 517 518 519 520 521 522 523
	crossCheckCycle = defaultCrossCheckCycle

	tester.blocks = blocks
	tester.newPeer("valid", big.NewInt(20000), hashes[0])
	if _, err := tester.syncTake("valid", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
}
524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557

// Tests that when one or more malicious peers keep feeding a banned chain to
// the downloader, it does not refetch the same chain forever: every attempt
// bans further pieces of it, until the chain's head is banned as well.
func TestBannedChainStarvationAttack(t *testing.T) {
	// Construct a valid chain, but replace one of its hashes with the banned
	// one (weird index to have non multiple of ban chunk size)
	hashes := createHashes(0, 8*blockCacheLimit)
	hashes[len(hashes)/2+23] = bannedHash
	head := hashes[0]

	blocks := createBlocksFromHashes(hashes)

	// Create the tester and ban the selected hash
	tester := newTester(t, hashes, blocks)
	tester.downloader.banned.Add(bannedHash)

	// Repeatedly sync with the attacker, verifying that the ban list keeps
	// growing until the head of the invalid chain is blocked too
	tester.newPeer("attack", big.NewInt(10000), head)

	banned := tester.downloader.banned.Size()
	for {
		// Every attempt must fail with a hash chain error
		_, err := tester.syncTake("attack", head)
		if err != ErrInvalidChain {
			t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain)
		}
		// The ban list must have grown, unless the head itself is now banned
		bans := tester.downloader.banned.Size()
		if bans <= banned {
			if tester.downloader.banned.Has(head) {
				return
			}
			t.Fatalf("ban count mismatch: have %v, want %v+", bans, banned+1)
		}
		banned = bans
	}
}