downloader_test.go 12.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
package downloader

import (
	"encoding/binary"
	"math/big"
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
O
obscuren 已提交
11
	"github.com/ethereum/go-ethereum/event"
12 13
)

14 15 16 17
// Fixed hashes shared by the tests in this file.
//
// knownHash is the hash the tester starts out knowing: createHashes places it
// last in every generated chain and newTester seeds the local chain with it.
//
// unknownHash is used by the attack tests as a hash (or parent hash) that is
// never produced by createHashes.
var (
	knownHash   = common.Hash{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
	unknownHash = common.Hash{9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9}
)
18

19
func createHashes(start, amount int) (hashes []common.Hash) {
20 21 22 23 24 25 26 27 28 29
	hashes = make([]common.Hash, amount+1)
	hashes[len(hashes)-1] = knownHash

	for i := range hashes[:len(hashes)-1] {
		binary.BigEndian.PutUint64(hashes[i][:8], uint64(i+2))
	}

	return
}

30 31 32 33
func createBlock(i int, prevHash, hash common.Hash) *types.Block {
	header := &types.Header{Number: big.NewInt(int64(i))}
	block := types.NewBlockWithHeader(header)
	block.HeaderHash = hash
34
	block.ParentHeaderHash = prevHash
35 36 37
	return block
}

38 39
func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block {
	blocks := make(map[common.Hash]*types.Block)
40

41
	for i, hash := range hashes {
42
		blocks[hash] = createBlock(len(hashes)-i, knownHash, hash)
43 44 45 46 47 48
	}

	return blocks
}

type downloadTester struct {
49 50 51 52 53 54
	downloader *Downloader

	hashes []common.Hash                // Chain of hashes simulating
	blocks map[common.Hash]*types.Block // Blocks associated with the hashes
	chain  []common.Hash                // Block-chain being constructed

O
obscuren 已提交
55 56 57 58
	t            *testing.T
	pcount       int
	done         chan bool
	activePeerId string
59 60 61
}

func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types.Block) *downloadTester {
62 63 64 65 66 67 68 69 70
	tester := &downloadTester{
		t: t,

		hashes: hashes,
		blocks: blocks,
		chain:  []common.Hash{knownHash},

		done: make(chan bool),
	}
O
obscuren 已提交
71 72
	var mux event.TypeMux
	downloader := New(&mux, tester.hasBlock, tester.getBlock)
73 74 75 76 77
	tester.downloader = downloader

	return tester
}

78 79 80
// sync is a simple wrapper around the downloader to start synchronisation and
// block until it returns
func (dl *downloadTester) sync(peerId string, head common.Hash) error {
O
obscuren 已提交
81
	dl.activePeerId = peerId
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
	return dl.downloader.Synchronise(peerId, head)
}

// syncTake is starts synchronising with a remote peer, but concurrently it also
// starts fetching blocks that the downloader retrieved. IT blocks until both go
// routines terminate.
func (dl *downloadTester) syncTake(peerId string, head common.Hash) (types.Blocks, error) {
	// Start a block collector to take blocks as they become available
	done := make(chan struct{})
	took := []*types.Block{}
	go func() {
		for running := true; running; {
			select {
			case <-done:
				running = false
			default:
				time.Sleep(time.Millisecond)
			}
			// Take a batch of blocks and accumulate
			took = append(took, dl.downloader.TakeBlocks()...)
		}
		done <- struct{}{}
	}()
	// Start the downloading, sync the taker and return
	err := dl.sync(peerId, head)

	done <- struct{}{}
	<-done

	return took, err
O
obscuren 已提交
112 113
}

114 115 116 117 118 119
// insertBlocks appends the hash of every given block to the tester's local
// chain, in order.
func (dl *downloadTester) insertBlocks(blocks types.Blocks) {
	for idx := range blocks {
		dl.chain = append(dl.chain, blocks[idx].Hash())
	}
}

120
func (dl *downloadTester) hasBlock(hash common.Hash) bool {
121 122 123 124
	for _, h := range dl.chain {
		if h == hash {
			return true
		}
125 126 127 128
	}
	return false
}

129 130
func (dl *downloadTester) getBlock(hash common.Hash) *types.Block {
	return dl.blocks[knownHash]
131 132 133
}

func (dl *downloadTester) getHashes(hash common.Hash) error {
134
	dl.downloader.DeliverHashes(dl.activePeerId, dl.hashes)
135 136 137 138 139
	return nil
}

func (dl *downloadTester) getBlocks(id string) func([]common.Hash) error {
	return func(hashes []common.Hash) error {
140 141 142 143 144
		blocks := make([]*types.Block, 0, len(hashes))
		for _, hash := range hashes {
			if block, ok := dl.blocks[hash]; ok {
				blocks = append(blocks, block)
			}
145
		}
146
		go dl.downloader.DeliverBlocks(id, blocks)
147 148 149 150 151 152 153 154

		return nil
	}
}

func (dl *downloadTester) newPeer(id string, td *big.Int, hash common.Hash) {
	dl.pcount++

O
obscuren 已提交
155
	dl.downloader.RegisterPeer(id, hash, dl.getHashes, dl.getBlocks(id))
156 157 158 159 160 161
}

func (dl *downloadTester) badBlocksPeer(id string, td *big.Int, hash common.Hash) {
	dl.pcount++

	// This bad peer never returns any blocks
O
obscuren 已提交
162
	dl.downloader.RegisterPeer(id, hash, dl.getHashes, func([]common.Hash) error {
163 164 165 166 167
		return nil
	})
}

func TestDownload(t *testing.T) {
168
	minDesiredPeerCount = 4
O
obscuren 已提交
169
	blockTtl = 1 * time.Second
170

171 172
	targetBlocks := 1000
	hashes := createHashes(0, targetBlocks)
173 174 175
	blocks := createBlocksFromHashes(hashes)
	tester := newTester(t, hashes, blocks)

176
	tester.newPeer("peer1", big.NewInt(10000), hashes[0])
177 178 179
	tester.newPeer("peer2", big.NewInt(0), common.Hash{})
	tester.badBlocksPeer("peer3", big.NewInt(0), common.Hash{})
	tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{})
O
obscuren 已提交
180
	tester.activePeerId = "peer1"
181

O
obscuren 已提交
182
	err := tester.sync("peer1", hashes[0])
183 184 185 186
	if err != nil {
		t.Error("download error", err)
	}

187
	inqueue := len(tester.downloader.queue.blockCache)
188 189
	if inqueue != targetBlocks {
		t.Error("expected", targetBlocks, "have", inqueue)
190 191
	}
}
192 193

func TestMissing(t *testing.T) {
194
	targetBlocks := 1000
195 196 197 198 199 200 201 202 203 204
	hashes := createHashes(0, 1000)
	extraHashes := createHashes(1001, 1003)
	blocks := createBlocksFromHashes(append(extraHashes, hashes...))
	tester := newTester(t, hashes, blocks)

	tester.newPeer("peer1", big.NewInt(10000), hashes[len(hashes)-1])

	hashes = append(extraHashes, hashes[:len(hashes)-1]...)
	tester.newPeer("peer2", big.NewInt(0), common.Hash{})

O
obscuren 已提交
205
	err := tester.sync("peer1", hashes[0])
206 207
	if err != nil {
		t.Error("download error", err)
208 209
	}

210
	inqueue := len(tester.downloader.queue.blockCache)
211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
	if inqueue != targetBlocks {
		t.Error("expected", targetBlocks, "have", inqueue)
	}
}

func TestTaking(t *testing.T) {
	minDesiredPeerCount = 4
	blockTtl = 1 * time.Second

	targetBlocks := 1000
	hashes := createHashes(0, targetBlocks)
	blocks := createBlocksFromHashes(hashes)
	tester := newTester(t, hashes, blocks)

	tester.newPeer("peer1", big.NewInt(10000), hashes[0])
	tester.newPeer("peer2", big.NewInt(0), common.Hash{})
	tester.badBlocksPeer("peer3", big.NewInt(0), common.Hash{})
	tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{})

O
obscuren 已提交
230
	err := tester.sync("peer1", hashes[0])
231 232 233
	if err != nil {
		t.Error("download error", err)
	}
234
	bs := tester.downloader.TakeBlocks()
235 236
	if len(bs) != targetBlocks {
		t.Error("retrieved block mismatch: have %v, want %v", len(bs), targetBlocks)
237
	}
238
}
239

240 241 242 243 244 245
func TestInactiveDownloader(t *testing.T) {
	targetBlocks := 1000
	hashes := createHashes(0, targetBlocks)
	blocks := createBlocksFromHashSet(createHashSet(hashes))
	tester := newTester(t, hashes, nil)

246
	err := tester.downloader.DeliverHashes("bad peer 001", hashes)
247 248 249 250
	if err != errNoSyncActive {
		t.Error("expected no sync error, got", err)
	}

251
	err = tester.downloader.DeliverBlocks("bad peer 001", blocks)
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
	if err != errNoSyncActive {
		t.Error("expected no sync error, got", err)
	}
}

// TestCancel synchronises a chain from a single peer, cancels the downloader
// and checks that the queue reports neither pending hashes nor pending blocks.
func TestCancel(t *testing.T) {
	minDesiredPeerCount = 4
	blockTtl = time.Second

	const count = 1000
	chain := createHashes(0, count)
	tester := newTester(t, chain, createBlocksFromHashes(chain))

	tester.newPeer("peer1", big.NewInt(10000), chain[0])

	if err := tester.sync("peer1", chain[0]); err != nil {
		t.Error("download error", err)
	}

	if cancelled := tester.downloader.Cancel(); !cancelled {
		t.Error("cancel operation unsuccessfull")
	}

	hashSize, blockSize := tester.downloader.queue.Size()
	if !(hashSize <= 0 && blockSize <= 0) {
		t.Error("block (", blockSize, ") or hash (", hashSize, ") not 0")
	}
}

283 284 285 286
func TestThrottling(t *testing.T) {
	minDesiredPeerCount = 4
	blockTtl = 1 * time.Second

287
	targetBlocks := 16 * blockCacheLimit
288 289 290 291 292 293 294 295 296 297
	hashes := createHashes(0, targetBlocks)
	blocks := createBlocksFromHashes(hashes)
	tester := newTester(t, hashes, blocks)

	tester.newPeer("peer1", big.NewInt(10000), hashes[0])
	tester.newPeer("peer2", big.NewInt(0), common.Hash{})
	tester.badBlocksPeer("peer3", big.NewInt(0), common.Hash{})
	tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{})

	// Concurrently download and take the blocks
298
	took, err := tester.syncTake("peer1", hashes[0])
299
	if err != nil {
300
		t.Fatalf("failed to synchronise blocks: %v", err)
301 302 303 304 305
	}
	if len(took) != targetBlocks {
		t.Fatalf("downloaded block mismatch: have %v, want %v", len(took), targetBlocks)
	}
}
306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322

// Tests that if a peer returns an invalid chain with a block pointing to a non-
// existing parent, it is correctly detected and handled.
func TestNonExistingParentAttack(t *testing.T) {
	// Forge a single-link chain with a forged header
	hashes := createHashes(0, 1)
	blocks := createBlocksFromHashes(hashes)

	forged := blocks[hashes[0]]
	forged.ParentHeaderHash = unknownHash

	// Try and sync with the malicious node and check that it fails
	tester := newTester(t, hashes, blocks)
	tester.newPeer("attack", big.NewInt(10000), hashes[0])
	if err := tester.sync("attack", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
323 324 325
	bs := tester.downloader.TakeBlocks()
	if len(bs) != 1 {
		t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
326
	}
327 328
	if tester.hasBlock(bs[0].ParentHash()) {
		t.Fatalf("tester knows about the unknown hash")
329 330 331 332 333 334 335 336 337
	}
	tester.downloader.Cancel()

	// Reconstruct a valid chain, and try to synchronize with it
	forged.ParentHeaderHash = knownHash
	tester.newPeer("valid", big.NewInt(20000), hashes[0])
	if err := tester.sync("valid", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
338
	bs = tester.downloader.TakeBlocks()
339
	if len(bs) != 1 {
340
		t.Fatalf("retrieved block mismatch: have %v, want %v", len(bs), 1)
341
	}
342 343 344
	if !tester.hasBlock(bs[0].ParentHash()) {
		t.Fatalf("tester doesn't know about the origin hash")
	}
345
}
346 347 348 349 350

// Tests that if a malicious peers keeps sending us repeating hashes, we don't
// loop indefinitely.
func TestRepeatingHashAttack(t *testing.T) {
	// Create a valid chain, but drop the last link
351
	hashes := createHashes(0, blockCacheLimit)
352
	blocks := createBlocksFromHashes(hashes)
353
	forged := hashes[:len(hashes)-1]
354 355

	// Try and sync with the malicious node
356 357
	tester := newTester(t, forged, blocks)
	tester.newPeer("attack", big.NewInt(10000), forged[0])
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372

	errc := make(chan error)
	go func() {
		errc <- tester.sync("attack", hashes[0])
	}()

	// Make sure that syncing returns and does so with a failure
	select {
	case <-time.After(100 * time.Millisecond):
		t.Fatalf("synchronisation blocked")
	case err := <-errc:
		if err == nil {
			t.Fatalf("synchronisation succeeded")
		}
	}
373 374 375 376 377 378
	// Ensure that a valid chain can still pass sync
	tester.hashes = hashes
	tester.newPeer("valid", big.NewInt(20000), hashes[0])
	if err := tester.sync("valid", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
379
}
380 381 382 383 384

// Tests that if a malicious peers returns a non-existent block hash, it should
// eventually time out and the sync reattempted.
func TestNonExistingBlockAttack(t *testing.T) {
	// Create a valid chain, but forge the last link
385
	hashes := createHashes(0, blockCacheLimit)
386
	blocks := createBlocksFromHashes(hashes)
387
	origin := hashes[len(hashes)/2]
388 389 390 391 392 393 394 395 396

	hashes[len(hashes)/2] = unknownHash

	// Try and sync with the malicious node and check that it fails
	tester := newTester(t, hashes, blocks)
	tester.newPeer("attack", big.NewInt(10000), hashes[0])
	if err := tester.sync("attack", hashes[0]); err != errPeersUnavailable {
		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errPeersUnavailable)
	}
397 398 399 400 401 402
	// Ensure that a valid chain can still pass sync
	hashes[len(hashes)/2] = origin
	tester.newPeer("valid", big.NewInt(20000), hashes[0])
	if err := tester.sync("valid", hashes[0]); err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
403
}
404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431

// Tests that a malicious peer returning hashes in a scrambled order makes the
// sync fail with ErrInvalidChain instead of stalling, and that the properly
// ordered chain can still be synced afterwards.
func TestInvalidHashOrderAttack(t *testing.T) {
	// Create a valid long chain, plus a copy with the second quarter swapped
	// against its mirror positions
	hashes := createHashes(0, 4*blockCacheLimit)
	blocks := createBlocksFromHashes(hashes)

	n := len(hashes)
	reverse := make([]common.Hash, n)
	copy(reverse, hashes)

	for i := n / 4; i < 2*n/4; i++ {
		j := n - i - 1
		reverse[i], reverse[j] = reverse[j], reverse[i]
	}

	// Try and sync with the malicious node and check that it fails
	tester := newTester(t, reverse, blocks)
	tester.newPeer("attack", big.NewInt(10000), reverse[0])
	_, err := tester.syncTake("attack", reverse[0])
	if err != ErrInvalidChain {
		t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain)
	}
	// Ensure that a valid chain can still pass sync
	tester.hashes = hashes
	tester.newPeer("valid", big.NewInt(20000), hashes[0])
	_, err = tester.syncTake("valid", hashes[0])
	if err != nil {
		t.Fatalf("failed to synchronise blocks: %v", err)
	}
}