package downloader

import (
	"encoding/binary"
	"math/big"
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/event"
)

var (
	knownHash   = common.Hash{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
	unknownHash = common.Hash{9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9}
	bannedHash  = common.Hash{5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5}
)

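// createHashes generates a chain of hashes, terminated by the known hash, that
// the tester can use to simulate a block chain.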
func createHashes(start, amount int) (hashes []common.Hash) {
	hashes = make([]common.Hash, amount+1)
	hashes[len(hashes)-1] = knownHash

	for i := range hashes[:len(hashes)-1] {
		binary.BigEndian.PutUint64(hashes[i][:8], uint64(start+i+2))
	}
	return
}

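// createBlock assembles a test block with the given number, overriding its
// header and parent hashes with the supplied test values.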
func createBlock(i int, parent, hash common.Hash) *types.Block {
	header := &types.Header{Number: big.NewInt(int64(i))}
	block := types.NewBlockWithHeader(header)
	block.HeaderHash = hash
	block.ParentHeaderHash = parent
	return block
}

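// createBlocksFromHashes assembles a block for every hash in the chain, each
// one linking to the next hash in the slice as its parent (head first, genesis
// last).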
func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block {
	blocks := make(map[common.Hash]*types.Block)
	for i := 0; i < len(hashes); i++ {
		parent := knownHash
		if i < len(hashes)-1 {
			parent = hashes[i+1]
		}
		blocks[hashes[i]] = createBlock(len(hashes)-i, parent, hashes[i])
	}
	return blocks
}

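// downloadTester is a test simulator for mocking out the block chain that the
// downloader synchronises against.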
type downloadTester struct {
	downloader *Downloader

	hashes []common.Hash                // Chain of hashes simulating a block chain
	blocks map[common.Hash]*types.Block // Blocks associated with the hashes
	chain  []common.Hash                // Block-chain being constructed

	maxHashFetch int // Overrides the maximum number of retrieved hashes

	t            *testing.T
	done         chan bool
	activePeerId string
}

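// newTester creates a new instance of the download tester, wiring a fresh
// downloader up to the simulated chain of hashes and blocks.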
func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types.Block) *downloadTester {
	tester := &downloadTester{
		t: t,

		hashes: hashes,
		blocks: blocks,
		chain:  []common.Hash{knownHash},

		done: make(chan bool),
	}
	var mux event.TypeMux
	downloader := New(&mux, tester.hasBlock, tester.getBlock)
	tester.downloader = downloader

	return tester
}

// sync is a simple wrapper around the downloader to start synchronisation and
// block until it returns
func (dl *downloadTester) sync(peerId string, head common.Hash) error {
	dl.activePeerId = peerId
	return dl.downloader.Synchronise(peerId, head)
}

// syncTake starts synchronising with a remote peer, but concurrently it also
// starts fetching blocks that the downloader retrieves. It blocks until both go
// routines terminate.
func (dl *downloadTester) syncTake(peerId string, head common.Hash) ([]*Block, error) {
	// Start a block collector to take blocks as they become available
	done := make(chan struct{})
	took := []*Block{}
	go func() {
		for running := true; running; {
			select {
			case <-done:
				running = false
			default:
				time.Sleep(time.Millisecond)
			}
			// Take a batch of blocks and accumulate
			took = append(took, dl.downloader.TakeBlocks()...)
		}
		done <- struct{}{}
	}()
	// Start the downloading, sync the taker and return
	err := dl.sync(peerId, head)

	done <- struct{}{}
	<-done

	return took, err
}

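// hasBlock checks whether the given hash is already part of the locally
// constructed chain.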
func (dl *downloadTester) hasBlock(hash common.Hash) bool {
	for _, h := range dl.chain {
		if h == hash {
			return true
		}
	}
	return false
}

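// getBlock retrieves a block from the tester's simulated chain. Note that it
// always returns the block mapped to the known hash, whichever hash is asked
// for.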
func (dl *downloadTester) getBlock(hash common.Hash) *types.Block {
	return dl.blocks[knownHash]
}

// getHashes retrieves a batch of hashes for reconstructing the chain.
func (dl *downloadTester) getHashes(head common.Hash) error {
	limit := MaxHashFetch
	if dl.maxHashFetch > 0 {
		limit = dl.maxHashFetch
	}
	// Gather the next batch of hashes
	hashes := make([]common.Hash, 0, limit)
	for i, hash := range dl.hashes {
		if hash == head {
			i++
			for len(hashes) < cap(hashes) && i < len(dl.hashes) {
				hashes = append(hashes, dl.hashes[i])
				i++
			}
			break
		}
	}
	// Delay delivery a bit to allow attacks to unfold
	id := dl.activePeerId
	go func() {
		time.Sleep(time.Millisecond)
		dl.downloader.DeliverHashes(id, hashes)
	}()
	return nil
}

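// getBlocks returns a block fetcher bound to the given peer id, asynchronously
// delivering any requested blocks found in the tester's block map.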
func (dl *downloadTester) getBlocks(id string) func([]common.Hash) error {
	return func(hashes []common.Hash) error {
		blocks := make([]*types.Block, 0, len(hashes))
		for _, hash := range hashes {
			if block, ok := dl.blocks[hash]; ok {
				blocks = append(blocks, block)
			}
		}
		go dl.downloader.DeliverBlocks(id, blocks)

		return nil
	}
}

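// newPeer registers a new block download source with the downloader, backed by
// the tester's canned hash and block retrievers.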
func (dl *downloadTester) newPeer(id string, td *big.Int, hash common.Hash) {
	dl.downloader.RegisterPeer(id, hash, dl.getHashes, dl.getBlocks(id))
}

// Tests that simple synchronisation from a good peer works, without throttling.
func TestSynchronisation(t *testing.T) {
	// Create a small enough block chain to download and the tester
	targetBlocks := blockCacheLimit - 15
	hashes := createHashes(0, targetBlocks)
	blocks := createBlocksFromHashes(hashes)

	tester := newTester(t, hashes, blocks)
	tester.newPeer("peer", big.NewInt(10000), hashes[0])

	// Synchronise with the peer and make sure all blocks were retrieved
	if err := tester.sync("peer", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
	if queued := len(tester.downloader.queue.blockPool); queued != targetBlocks {
		t.Fatalf("synchronised block mismatch: have %v, want %v", queued, targetBlocks)
	}
}

// Tests that the synchronized blocks can be correctly retrieved.
func TestBlockTaking(t *testing.T) {
	// Create a small enough block chain to download and the tester
	targetBlocks := blockCacheLimit - 15
	hashes := createHashes(0, targetBlocks)
	blocks := createBlocksFromHashes(hashes)

	tester := newTester(t, hashes, blocks)
	tester.newPeer("peer", big.NewInt(10000), hashes[0])

	// Synchronise with the peer and test block retrieval
	if err := tester.sync("peer", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
	if took := tester.downloader.TakeBlocks(); len(took) != targetBlocks {
		t.Fatalf("took block mismatch: have %v, want %v", len(took), targetBlocks)
	}
}

// Tests that an inactive downloader will not accept incoming hashes and blocks.
func TestInactiveDownloader(t *testing.T) {
	// Create a small enough block chain to download and the tester
	targetBlocks := blockCacheLimit - 15
	hashes := createHashes(0, targetBlocks)
	blockMap := createBlocksFromHashes(hashes)
	blocks := make([]*types.Block, 0, len(blockMap))
	for _, block := range blockMap {
		blocks = append(blocks, block)
	}

	tester := newTester(t, nil, nil)

	// Check that neither hashes nor blocks are accepted
	if err := tester.downloader.DeliverHashes("bad peer", hashes); err != errNoSyncActive {
		t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
	}
	if err := tester.downloader.DeliverBlocks("bad peer", blocks); err != errNoSyncActive {
		t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
	}
}

// Tests that a canceled download wipes all previously accumulated state.
func TestCancel(t *testing.T) {
	// Create a small enough block chain to download and the tester
	targetBlocks := blockCacheLimit - 15
	hashes := createHashes(0, targetBlocks)
	blocks := createBlocksFromHashes(hashes)

	tester := newTester(t, hashes, blocks)
	tester.newPeer("peer", big.NewInt(10000), hashes[0])

	// Synchronise with the peer, but cancel afterwards
	if err := tester.sync("peer", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
	if !tester.downloader.Cancel() {
		t.Fatalf("cancel operation failed")
	}
	// Make sure the queue reports empty and no blocks can be taken
	hashCount, blockCount := tester.downloader.queue.Size()
	if hashCount > 0 || blockCount > 0 {
		t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount)
	}
	if took := tester.downloader.TakeBlocks(); len(took) != 0 {
		t.Errorf("taken blocks mismatch: have %d, want %d", len(took), 0)
	}
}

// Tests that if a large batch of blocks is being downloaded, it is throttled
// until the cached blocks are retrieved.
func TestThrottling(t *testing.T) {
	// Create a long block chain to download and the tester
	targetBlocks := 8 * blockCacheLimit
	hashes := createHashes(0, targetBlocks)
	blocks := createBlocksFromHashes(hashes)

	tester := newTester(t, hashes, blocks)
	tester.newPeer("peer", big.NewInt(10000), hashes[0])

	// Start a synchronisation concurrently
	errc := make(chan error)
	go func() {
		errc <- tester.sync("peer", hashes[0])
	}()
	// Iteratively take some blocks, always checking the retrieval count
	for total := 0; total < targetBlocks; {
		// Wait a bit for the sync to fill up the block cache
		for start := time.Now(); time.Since(start) < 3*time.Second; {
			time.Sleep(25 * time.Millisecond)
			if len(tester.downloader.queue.blockPool) == blockCacheLimit {
				break
			}
		}
		// Fetch the next batch of blocks
		took := tester.downloader.TakeBlocks()
		if len(took) != blockCacheLimit {
			t.Fatalf("block count mismatch: have %v, want %v", len(took), blockCacheLimit)
		}
		total += len(took)
		if total > targetBlocks {
			t.Fatalf("target block count mismatch: have %v, want %v", total, targetBlocks)
		}
	}
	if err := <-errc; err != nil {
		t.Fatalf("block synchronization failed: %v", err)
	}
}

// Tests that if a peer returns an invalid chain with a block pointing to a non-
// existing parent, it is correctly detected and handled.
func TestNonExistingParentAttack(t *testing.T) {
	// Forge a single-link chain with a forged header
	hashes := createHashes(0, 1)
	blocks := createBlocksFromHashes(hashes)

	forged := blocks[hashes[0]]
	forged.ParentHeaderHash = unknownHash

	// Try and sync with the malicious node and check that it fails
	tester := newTester(t, hashes, blocks)
	tester.newPeer("attack", big.NewInt(10000), hashes[0])
	if err := tester.sync("attack", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
	bs := tester.downloader.TakeBlocks()
	if len(bs) != 1 {
		t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
	}
	if tester.hasBlock(bs[0].RawBlock.ParentHash()) {
		t.Fatalf("tester knows about the unknown hash")
	}
	tester.downloader.Cancel()

	// Reconstruct a valid chain, and try to synchronize with it
	forged.ParentHeaderHash = knownHash
	tester.newPeer("valid", big.NewInt(20000), hashes[0])
	if err := tester.sync("valid", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
	bs = tester.downloader.TakeBlocks()
	if len(bs) != 1 {
		t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
	}
	if !tester.hasBlock(bs[0].RawBlock.ParentHash()) {
		t.Fatalf("tester doesn't know about the origin hash")
	}
}

// Tests that if a malicious peer keeps sending us repeating hashes, we don't
// loop indefinitely.
func TestRepeatingHashAttack(t *testing.T) {
	// Create a valid chain, but drop the last link
	hashes := createHashes(0, blockCacheLimit)
	blocks := createBlocksFromHashes(hashes)
	forged := hashes[:len(hashes)-1]

	// Try and sync with the malicious node
	tester := newTester(t, forged, blocks)
	tester.newPeer("attack", big.NewInt(10000), forged[0])

	errc := make(chan error)
	go func() {
		errc <- tester.sync("attack", hashes[0])
	}()

	// Make sure that syncing returns and does so with a failure
	select {
	case <-time.After(time.Second):
		t.Fatalf("synchronisation blocked")
	case err := <-errc:
		if err == nil {
			t.Fatalf("synchronisation succeeded")
		}
	}
	// Ensure that a valid chain can still pass sync
	tester.hashes = hashes
	tester.newPeer("valid", big.NewInt(20000), hashes[0])
	if err := tester.sync("valid", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
}

// Tests that if a malicious peer returns a non-existent block hash, it should
// eventually time out and the sync be reattempted.
func TestNonExistingBlockAttack(t *testing.T) {
	// Create a valid chain, but forge the last link
	hashes := createHashes(0, blockCacheLimit)
	blocks := createBlocksFromHashes(hashes)
	origin := hashes[len(hashes)/2]

	hashes[len(hashes)/2] = unknownHash

	// Try and sync with the malicious node and check that it fails
	tester := newTester(t, hashes, blocks)
	tester.newPeer("attack", big.NewInt(10000), hashes[0])
	if err := tester.sync("attack", hashes[0]); err != errPeersUnavailable {
		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errPeersUnavailable)
	}
	// Ensure that a valid chain can still pass sync
	hashes[len(hashes)/2] = origin
	tester.newPeer("valid", big.NewInt(20000), hashes[0])
	if err := tester.sync("valid", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
}

// Tests that if a malicious peer returns hashes in a weird order, the sync
// throttler doesn't choke on them while waiting for the valid blocks.
func TestInvalidHashOrderAttack(t *testing.T) {
	// Create a valid long chain, but reverse some hashes within
	hashes := createHashes(0, 4*blockCacheLimit)
	blocks := createBlocksFromHashes(hashes)

	chunk1 := make([]common.Hash, blockCacheLimit)
	chunk2 := make([]common.Hash, blockCacheLimit)
	copy(chunk1, hashes[blockCacheLimit:2*blockCacheLimit])
	copy(chunk2, hashes[2*blockCacheLimit:3*blockCacheLimit])

	reverse := make([]common.Hash, len(hashes))
	copy(reverse, hashes)
	copy(reverse[2*blockCacheLimit:], chunk1)
	copy(reverse[blockCacheLimit:], chunk2)

	// Try and sync with the malicious node and check that it fails
	tester := newTester(t, reverse, blocks)
	tester.newPeer("attack", big.NewInt(10000), reverse[0])
	if _, err := tester.syncTake("attack", reverse[0]); err != ErrInvalidChain {
		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain)
	}
	// Ensure that a valid chain can still pass sync
	tester.hashes = hashes
	tester.newPeer("valid", big.NewInt(20000), hashes[0])
	if _, err := tester.syncTake("valid", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
}

// Tests that if a malicious peer makes up a random hash chain and tries to push
// indefinitely, it actually gets caught with it.
func TestMadeupHashChainAttack(t *testing.T) {
	blockSoftTTL = 100 * time.Millisecond
	crossCheckCycle = 25 * time.Millisecond

	// Create a long chain of hashes without backing blocks
	hashes := createHashes(0, 1024*blockCacheLimit)

	// Try and sync with the malicious node and check that it fails
	tester := newTester(t, hashes, nil)
	tester.newPeer("attack", big.NewInt(10000), hashes[0])
	if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed {
		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed)
	}
}

// Tests that if a malicious peer makes up a random hash chain, and tries to push
// indefinitely, one hash at a time, it actually gets caught with it. The reason
// this is separate from the classical made up chain attack is that sending hashes
// one by one prevents reliable block/parent verification.
func TestMadeupHashChainDrippingAttack(t *testing.T) {
	// Create a random chain of hashes to drip
	hashes := createHashes(0, 16*blockCacheLimit)
	tester := newTester(t, hashes, nil)

	// Try and sync with the attacker, one hash at a time
	tester.maxHashFetch = 1
	tester.newPeer("attack", big.NewInt(10000), hashes[0])
	if _, err := tester.syncTake("attack", hashes[0]); err != ErrStallingPeer {
		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrStallingPeer)
	}
}

// Tests that if a malicious peer makes up a random block chain, and tries to
// push indefinitely, it actually gets caught with it.
func TestMadeupBlockChainAttack(t *testing.T) {
	defaultBlockTTL := blockSoftTTL
	defaultCrossCheckCycle := crossCheckCycle

	blockSoftTTL = 100 * time.Millisecond
	crossCheckCycle = 25 * time.Millisecond

	// Create a long chain of blocks and simulate an invalid chain by dropping every second one
	hashes := createHashes(0, 16*blockCacheLimit)
	blocks := createBlocksFromHashes(hashes)

	gapped := make([]common.Hash, len(hashes)/2)
	for i := 0; i < len(gapped); i++ {
		gapped[i] = hashes[2*i]
	}
	// Try and sync with the malicious node and check that it fails
	tester := newTester(t, gapped, blocks)
	tester.newPeer("attack", big.NewInt(10000), gapped[0])
	if _, err := tester.syncTake("attack", gapped[0]); err != ErrCrossCheckFailed {
		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed)
	}
	// Ensure that a valid chain can still pass sync
	blockSoftTTL = defaultBlockTTL
	crossCheckCycle = defaultCrossCheckCycle

	tester.hashes = hashes
	tester.newPeer("valid", big.NewInt(20000), hashes[0])
	if _, err := tester.syncTake("valid", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
}

// Advanced form of the above forged blockchain attack, where not only does the
// attacker make up valid hashes for random blocks, but also forges the block
// parents to point to existing hashes.
func TestMadeupParentBlockChainAttack(t *testing.T) {
	defaultBlockTTL := blockSoftTTL
	defaultCrossCheckCycle := crossCheckCycle

	blockSoftTTL = 100 * time.Millisecond
	crossCheckCycle = 25 * time.Millisecond

	// Create a long chain of blocks and forge them to point their parents to already known hashes
	hashes := createHashes(0, 16*blockCacheLimit)
	blocks := createBlocksFromHashes(hashes)
	forges := createBlocksFromHashes(hashes)
	for hash, block := range forges {
		block.ParentHeaderHash = hash // Simulate pointing to already known hash
	}
	// Try and sync with the malicious node and check that it fails
	tester := newTester(t, hashes, forges)
	tester.newPeer("attack", big.NewInt(10000), hashes[0])
	if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed {
		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed)
	}
	// Ensure that a valid chain can still pass sync
	blockSoftTTL = defaultBlockTTL
	crossCheckCycle = defaultCrossCheckCycle

	tester.blocks = blocks
	tester.newPeer("valid", big.NewInt(20000), hashes[0])
	if _, err := tester.syncTake("valid", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
}

// Tests that if one/multiple malicious peers try to feed a banned blockchain to
// the downloader, it will not keep refetching the same chain indefinitely, but
// gradually block pieces of it, until its head is also blocked.
func TestBannedChainStarvationAttack(t *testing.T) {
	// Construct a valid chain, but ban one of the hashes in it
	hashes := createHashes(0, 8*blockCacheLimit)
	hashes[len(hashes)/2+23] = bannedHash // weird index to have a non-multiple of the ban chunk size

	blocks := createBlocksFromHashes(hashes)

	// Create the tester and ban the selected hash
	tester := newTester(t, hashes, blocks)
	tester.downloader.banned.Add(bannedHash)

	// Iteratively try to sync, and verify that the banned hash list grows until
	// the head of the invalid chain is blocked too.
	tester.newPeer("attack", big.NewInt(10000), hashes[0])
	for banned := tester.downloader.banned.Size(); ; {
		// Try to sync with the attacker, check hash chain failure
		if _, err := tester.syncTake("attack", hashes[0]); err != ErrInvalidChain {
			t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain)
		}
		// Check that the ban list grew by at least 1 new item, or all got banned
		bans := tester.downloader.banned.Size()
		if bans < banned+1 {
			if tester.downloader.banned.Has(hashes[0]) {
				break
			}
			t.Fatalf("ban count mismatch: have %v, want %v+", bans, banned+1)
		}
		banned = bans
	}
}