blockchain.go 45.9 KB
Newer Older
F
Felix Lange 已提交
1
// Copyright 2014 The go-ethereum Authors
2
// This file is part of the go-ethereum library.
F
Felix Lange 已提交
3
//
4
// The go-ethereum library is free software: you can redistribute it and/or modify
F
Felix Lange 已提交
5 6 7 8
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
9
// The go-ethereum library is distributed in the hope that it will be useful,
F
Felix Lange 已提交
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
F
Felix Lange 已提交
12 13 14
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
15
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
F
Felix Lange 已提交
16

17
// Package core implements the Ethereum consensus protocol.
O
obscuren 已提交
18
package core
O
obscuren 已提交
19 20

import (
21
	"errors"
O
obscuren 已提交
22
	"fmt"
23
	"io"
24
	"math/big"
25
	mrand "math/rand"
26
	"runtime"
O
obscuren 已提交
27
	"sync"
O
obscuren 已提交
28
	"sync/atomic"
29
	"time"
30

O
obscuren 已提交
31
	"github.com/ethereum/go-ethereum/common"
O
obscuren 已提交
32
	"github.com/ethereum/go-ethereum/core/state"
O
obscuren 已提交
33
	"github.com/ethereum/go-ethereum/core/types"
34
	"github.com/ethereum/go-ethereum/core/vm"
35
	"github.com/ethereum/go-ethereum/crypto"
36
	"github.com/ethereum/go-ethereum/ethdb"
O
obscuren 已提交
37
	"github.com/ethereum/go-ethereum/event"
O
obscuren 已提交
38
	"github.com/ethereum/go-ethereum/logger"
O
obscuren 已提交
39
	"github.com/ethereum/go-ethereum/logger/glog"
40
	"github.com/ethereum/go-ethereum/metrics"
41
	"github.com/ethereum/go-ethereum/params"
42
	"github.com/ethereum/go-ethereum/pow"
43
	"github.com/ethereum/go-ethereum/rlp"
44
	"github.com/ethereum/go-ethereum/trie"
O
obscuren 已提交
45
	"github.com/hashicorp/golang-lru"
O
obscuren 已提交
46 47
)

48
var (
49
	blockInsertTimer = metrics.NewTimer("chain/inserts")
50 51

	ErrNoGenesis = errors.New("Genesis not found in chain")
52
)
Z
zelig 已提交
53

O
obscuren 已提交
54
const (
55
	bodyCacheLimit      = 256
O
obscuren 已提交
56
	blockCacheLimit     = 256
O
wip  
obscuren 已提交
57 58
	maxFutureBlocks     = 256
	maxTimeFutureBlocks = 30
59 60 61
	// must be bumped when consensus algorithm is changed, this forces the upgradedb
	// command to be run (forces the blocks to be imported again using the new algorithm)
	BlockChainVersion = 3
O
obscuren 已提交
62
)
63

64 65 66 67 68 69 70 71 72 73 74 75 76 77
// BlockChain represents the canonical chain given a database with a genesis
// block. The Blockchain manages chain imports, reverts, chain reorganisations.
//
// Importing blocks in to the block chain happens according to the set of rules
// defined by the two stage Validator. Processing of blocks is done using the
// Processor which processes the included transaction. The validation of the state
// is done in the second part of the Validator. Failing results in aborting of
// the import.
//
// The BlockChain also helps in returning blocks from **any** chain included
// in the database as well as blocks that represents the canonical chain. It's
// important to note that GetBlock can return any block and does not need to be
// included in the canonical one where as GetBlockByNumber always represents the
// canonical chain.
78
type BlockChain struct {
79
	config *params.ChainConfig // chain & network configuration
80

81
	hc           *HeaderChain
82
	chainDb      ethdb.Database
O
obscuren 已提交
83
	eventMux     *event.TypeMux
84
	genesisBlock *types.Block
85

86 87 88
	mu      sync.RWMutex // global mutex for locking chain operations
	chainmu sync.RWMutex // blockchain insertion lock
	procmu  sync.RWMutex // block processor lock
89

90 91 92
	checkpoint       int          // checkpoint counts towards the new checkpoint
	currentBlock     *types.Block // Current head of the block chain
	currentFastBlock *types.Block // Current head of the fast-sync chain (may be above the block chain!)
O
obscuren 已提交
93

94 95 96 97 98
	stateCache   *state.StateDB // State database to reuse between imports (contains state cache)
	bodyCache    *lru.Cache     // Cache for the most recent block bodies
	bodyRLPCache *lru.Cache     // Cache for the most recent block bodies in RLP encoded format
	blockCache   *lru.Cache     // Cache for the most recent entire blocks
	futureBlocks *lru.Cache     // future blocks are blocks added for later processing
99

100
	quit    chan struct{} // blockchain quit channel
L
Leif Jurvetson 已提交
101
	running int32         // running must be called atomically
O
obscuren 已提交
102
	// procInterrupt must be atomically called
103 104
	procInterrupt int32          // interrupt signaler for block processing
	wg            sync.WaitGroup // chain processing wait group for shutting down
105

106
	pow       pow.PoW
107 108
	processor Processor // block processor interface
	validator Validator // block and state validator interface
O
obscuren 已提交
109
}
O
obscuren 已提交
110

111 112 113
// NewBlockChain returns a fully initialised block chain using information
// available in the database. It initialiser the default Ethereum Validator and
// Processor.
114
func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, pow pow.PoW, mux *event.TypeMux) (*BlockChain, error) {
115 116 117 118 119
	bodyCache, _ := lru.New(bodyCacheLimit)
	bodyRLPCache, _ := lru.New(bodyCacheLimit)
	blockCache, _ := lru.New(blockCacheLimit)
	futureBlocks, _ := lru.New(maxFutureBlocks)

120
	bc := &BlockChain{
121
		config:       config,
122 123 124 125 126 127 128 129
		chainDb:      chainDb,
		eventMux:     mux,
		quit:         make(chan struct{}),
		bodyCache:    bodyCache,
		bodyRLPCache: bodyRLPCache,
		blockCache:   blockCache,
		futureBlocks: futureBlocks,
		pow:          pow,
O
obscuren 已提交
130
	}
131 132
	bc.SetValidator(NewBlockValidator(config, bc, pow))
	bc.SetProcessor(NewStateProcessor(config, bc))
133 134 135

	gv := func() HeaderValidator { return bc.Validator() }
	var err error
136
	bc.hc, err = NewHeaderChain(chainDb, config, gv, bc.getProcInterrupt)
137 138 139
	if err != nil {
		return nil, err
	}
140 141 142

	bc.genesisBlock = bc.GetBlockByNumber(0)
	if bc.genesisBlock == nil {
143
		return nil, ErrNoGenesis
144
	}
145

146
	if err := bc.loadLastState(); err != nil {
147
		return nil, err
O
obscuren 已提交
148
	}
149
	// Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
F
Felix Lange 已提交
150
	for hash := range BadHashes {
151
		if header := bc.GetHeaderByHash(hash); header != nil {
152 153 154 155 156 157 158 159
			// get the canonical block corresponding to the offending header's number
			headerByNumber := bc.GetHeaderByNumber(header.Number.Uint64())
			// make sure the headerByNumber (if present) is in our current canonical chain
			if headerByNumber != nil && headerByNumber.Hash() == header.Hash() {
				glog.V(logger.Error).Infof("Found bad hash, rewinding chain to block #%d [%x…]", header.Number, header.ParentHash[:4])
				bc.SetHead(header.Number.Uint64() - 1)
				glog.V(logger.Error).Infoln("Chain rewind was successful, resuming normal operation")
			}
160 161
		}
	}
162
	// Take ownership of this particular state
163
	go bc.update()
O
obscuren 已提交
164
	return bc, nil
165 166
}

167 168 169 170
func (self *BlockChain) getProcInterrupt() bool {
	return atomic.LoadInt32(&self.procInterrupt) == 1
}

171 172 173 174 175 176 177 178 179
// loadLastState loads the last known chain state from the database. This method
// assumes that the chain manager mutex is held.
func (self *BlockChain) loadLastState() error {
	// Restore the last known head block
	head := GetHeadBlockHash(self.chainDb)
	if head == (common.Hash{}) {
		// Corrupt or empty database, init from scratch
		self.Reset()
	} else {
180
		if block := self.GetBlockByHash(head); block != nil {
181 182 183 184 185 186 187 188
			// Block found, set as the current head
			self.currentBlock = block
		} else {
			// Corrupt or empty database, init from scratch
			self.Reset()
		}
	}
	// Restore the last known head header
189
	currentHeader := self.currentBlock.Header()
190
	if head := GetHeadHeaderHash(self.chainDb); head != (common.Hash{}) {
191
		if header := self.GetHeaderByHash(head); header != nil {
192
			currentHeader = header
193 194
		}
	}
195
	self.hc.SetCurrentHeader(currentHeader)
196 197 198
	// Restore the last known head fast block
	self.currentFastBlock = self.currentBlock
	if head := GetHeadFastBlockHash(self.chainDb); head != (common.Hash{}) {
199
		if block := self.GetBlockByHash(head); block != nil {
200 201 202
			self.currentFastBlock = block
		}
	}
203 204 205 206 207 208 209 210 211
	// Initialize a statedb cache to ensure singleton account bloom filter generation
	statedb, err := state.New(self.currentBlock.Root(), self.chainDb)
	if err != nil {
		return err
	}
	self.stateCache = statedb
	self.stateCache.GetAccount(common.Address{})

	// Issue a status log for the user
212 213 214
	headerTd := self.GetTd(currentHeader.Hash(), currentHeader.Number.Uint64())
	blockTd := self.GetTd(self.currentBlock.Hash(), self.currentBlock.NumberU64())
	fastTd := self.GetTd(self.currentFastBlock.Hash(), self.currentFastBlock.NumberU64())
215

216
	glog.V(logger.Info).Infof("Last header: #%d [%x…] TD=%v", currentHeader.Number, currentHeader.Hash().Bytes()[:4], headerTd)
217
	glog.V(logger.Info).Infof("Last block: #%d [%x…] TD=%v", self.currentBlock.Number(), self.currentBlock.Hash().Bytes()[:4], blockTd)
218
	glog.V(logger.Info).Infof("Fast block: #%d [%x…] TD=%v", self.currentFastBlock.Number(), self.currentFastBlock.Hash().Bytes()[:4], fastTd)
219 220 221 222

	return nil
}

223 224 225 226
// SetHead rewinds the local chain to a new head. In the case of headers, everything
// above the new head will be deleted and the new one set. In the case of blocks
// though, the head may be further rewound if block bodies are missing (non-archive
// nodes after a fast sync).
227
func (bc *BlockChain) SetHead(head uint64) {
228 229 230
	bc.mu.Lock()
	defer bc.mu.Unlock()

231 232
	delFn := func(hash common.Hash, num uint64) {
		DeleteBody(bc.chainDb, hash, num)
233
	}
234 235
	bc.hc.SetHead(head, delFn)

236
	// Clear out any stale content from the caches
237 238 239 240
	bc.bodyCache.Purge()
	bc.bodyRLPCache.Purge()
	bc.blockCache.Purge()
	bc.futureBlocks.Purge()
241

242
	// Update all computed fields to the new head
243 244 245
	currentHeader := bc.hc.CurrentHeader()
	if bc.currentBlock != nil && currentHeader.Number.Uint64() < bc.currentBlock.NumberU64() {
		bc.currentBlock = bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())
246
	}
247 248
	if bc.currentFastBlock != nil && currentHeader.Number.Uint64() < bc.currentFastBlock.NumberU64() {
		bc.currentFastBlock = bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())
249 250
	}

251 252 253
	if bc.currentBlock == nil {
		bc.currentBlock = bc.genesisBlock
	}
254 255 256
	if bc.currentFastBlock == nil {
		bc.currentFastBlock = bc.genesisBlock
	}
257

258 259 260 261 262 263
	if err := WriteHeadBlockHash(bc.chainDb, bc.currentBlock.Hash()); err != nil {
		glog.Fatalf("failed to reset head block hash: %v", err)
	}
	if err := WriteHeadFastBlockHash(bc.chainDb, bc.currentFastBlock.Hash()); err != nil {
		glog.Fatalf("failed to reset head fast block hash: %v", err)
	}
264
	bc.loadLastState()
265 266
}

267 268 269
// FastSyncCommitHead sets the current head block to the one defined by the hash
// irrelevant what the chain contents were prior.
func (self *BlockChain) FastSyncCommitHead(hash common.Hash) error {
270
	// Make sure that both the block as well at its state trie exists
271
	block := self.GetBlockByHash(hash)
272 273 274
	if block == nil {
		return fmt.Errorf("non existent block [%x…]", hash[:4])
	}
275
	if _, err := trie.NewSecure(block.Root(), self.chainDb, 0); err != nil {
276 277 278 279 280 281 282 283 284 285 286
		return err
	}
	// If all checks out, manually set the head block
	self.mu.Lock()
	self.currentBlock = block
	self.mu.Unlock()

	glog.V(logger.Info).Infof("committed block #%d [%x…] as new head", block.Number(), hash[:4])
	return nil
}

287
// GasLimit returns the gas limit of the current HEAD block.
288
func (self *BlockChain) GasLimit() *big.Int {
O
obscuren 已提交
289 290
	self.mu.RLock()
	defer self.mu.RUnlock()
O
obscuren 已提交
291

292
	return self.currentBlock.GasLimit()
O
obscuren 已提交
293 294
}

295
// LastBlockHash return the hash of the HEAD block.
296
func (self *BlockChain) LastBlockHash() common.Hash {
297 298 299
	self.mu.RLock()
	defer self.mu.RUnlock()

300
	return self.currentBlock.Hash()
301 302
}

303
// CurrentBlock retrieves the current head block of the canonical chain. The
304
// block is retrieved from the blockchain's internal cache.
305
func (self *BlockChain) CurrentBlock() *types.Block {
O
obscuren 已提交
306 307 308 309
	self.mu.RLock()
	defer self.mu.RUnlock()

	return self.currentBlock
O
obscuren 已提交
310 311
}

312
// CurrentFastBlock retrieves the current fast-sync head block of the canonical
313
// chain. The block is retrieved from the blockchain's internal cache.
314 315 316 317 318 319 320
func (self *BlockChain) CurrentFastBlock() *types.Block {
	self.mu.RLock()
	defer self.mu.RUnlock()

	return self.currentFastBlock
}

321 322
// Status returns status information about the current chain such as the HEAD Td,
// the HEAD hash and the hash of the genesis block.
323
func (self *BlockChain) Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash) {
O
obscuren 已提交
324 325 326
	self.mu.RLock()
	defer self.mu.RUnlock()

327
	return self.GetTd(self.currentBlock.Hash(), self.currentBlock.NumberU64()), self.currentBlock.Hash(), self.genesisBlock.Hash()
328 329
}

330 331 332 333 334 335 336 337 338 339 340 341
// SetProcessor sets the processor required for making state modifications.
func (self *BlockChain) SetProcessor(processor Processor) {
	self.procmu.Lock()
	defer self.procmu.Unlock()
	self.processor = processor
}

// SetValidator sets the validator which is used to validate incoming blocks.
func (self *BlockChain) SetValidator(validator Validator) {
	self.procmu.Lock()
	defer self.procmu.Unlock()
	self.validator = validator
342 343
}

344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361
// Validator returns the current validator.
func (self *BlockChain) Validator() Validator {
	self.procmu.RLock()
	defer self.procmu.RUnlock()
	return self.validator
}

// Processor returns the current processor.
func (self *BlockChain) Processor() Processor {
	self.procmu.RLock()
	defer self.procmu.RUnlock()
	return self.processor
}

// AuxValidator returns the auxiliary validator (Proof of work atm)
func (self *BlockChain) AuxValidator() pow.PoW { return self.pow }

// State returns a new mutable state based on the current HEAD block.
362
func (self *BlockChain) State() (*state.StateDB, error) {
363 364 365 366 367 368
	return self.StateAt(self.CurrentBlock().Root())
}

// StateAt returns a new mutable state based on a particular point in time.
func (self *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) {
	return self.stateCache.New(root)
O
obscuren 已提交
369 370
}

371
// Reset purges the entire blockchain, restoring it to its genesis state.
372
func (bc *BlockChain) Reset() {
373
	bc.ResetWithGenesisBlock(bc.genesisBlock)
374 375
}

376 377
// ResetWithGenesisBlock purges the entire blockchain, restoring it to the
// specified genesis state.
378
func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) {
379 380 381
	// Dump the entire block chain and purge the caches
	bc.SetHead(0)

382 383 384
	bc.mu.Lock()
	defer bc.mu.Unlock()

385
	// Prepare the genesis block and reinitialise the chain
386
	if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil {
387 388 389
		glog.Fatalf("failed to write genesis block TD: %v", err)
	}
	if err := WriteBlock(bc.chainDb, genesis); err != nil {
390
		glog.Fatalf("failed to write genesis block: %v", err)
391
	}
392
	bc.genesisBlock = genesis
393 394
	bc.insert(bc.genesisBlock)
	bc.currentBlock = bc.genesisBlock
395 396
	bc.hc.SetGenesis(bc.genesisBlock.Header())
	bc.hc.SetCurrentHeader(bc.genesisBlock.Header())
397
	bc.currentFastBlock = bc.genesisBlock
398 399
}

400
// Export writes the active chain to the given writer.
401
func (self *BlockChain) Export(w io.Writer) error {
402
	return self.ExportN(w, uint64(0), self.currentBlock.NumberU64())
T
Taylor Gerring 已提交
403 404 405
}

// ExportN writes a subset of the active chain to the given writer.
406
func (self *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
O
obscuren 已提交
407 408
	self.mu.RLock()
	defer self.mu.RUnlock()
O
obscuren 已提交
409

T
Taylor Gerring 已提交
410 411 412 413
	if first > last {
		return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last)
	}

T
Cleanup  
Taylor Gerring 已提交
414
	glog.V(logger.Info).Infof("exporting %d blocks...\n", last-first+1)
415

T
Taylor Gerring 已提交
416
	for nr := first; nr <= last; nr++ {
417 418 419 420 421 422
		block := self.GetBlockByNumber(nr)
		if block == nil {
			return fmt.Errorf("export failed on #%d: not found", nr)
		}

		if err := block.EncodeRLP(w); err != nil {
423 424
			return err
		}
O
obscuren 已提交
425
	}
426

427
	return nil
O
obscuren 已提交
428 429
}

430 431
// insert injects a new head block into the current block chain. This method
// assumes that the block is indeed a true head. It will also reset the head
432 433
// header and the head fast sync block to this very same block if they are older
// or if they are on a different side chain.
434 435
//
// Note, this function assumes that the `mu` mutex is held!
436
func (bc *BlockChain) insert(block *types.Block) {
437 438 439
	// If the block is on a side chain or an unknown one, force other heads onto it too
	updateHeads := GetCanonicalHash(bc.chainDb, block.NumberU64()) != block.Hash()

440 441 442
	// Add the block to the canonical chain number scheme and mark as the head
	if err := WriteCanonicalHash(bc.chainDb, block.Hash(), block.NumberU64()); err != nil {
		glog.Fatalf("failed to insert block number: %v", err)
443
	}
444
	if err := WriteHeadBlockHash(bc.chainDb, block.Hash()); err != nil {
445
		glog.Fatalf("failed to insert head block hash: %v", err)
446
	}
447
	bc.currentBlock = block
448 449 450

	// If the block is better than out head or is on a different chain, force update heads
	if updateHeads {
451
		bc.hc.SetCurrentHeader(block.Header())
452 453 454 455 456 457

		if err := WriteHeadFastBlockHash(bc.chainDb, block.Hash()); err != nil {
			glog.Fatalf("failed to insert head fast block hash: %v", err)
		}
		bc.currentFastBlock = block
	}
458 459
}

O
obscuren 已提交
460
// Accessors
461
func (bc *BlockChain) Genesis() *types.Block {
O
obscuren 已提交
462 463 464
	return bc.genesisBlock
}

465 466
// GetBody retrieves a block body (transactions and uncles) from the database by
// hash, caching it if found.
467
func (self *BlockChain) GetBody(hash common.Hash) *types.Body {
468 469
	// Short circuit if the body's already in the cache, retrieve otherwise
	if cached, ok := self.bodyCache.Get(hash); ok {
470 471
		body := cached.(*types.Body)
		return body
O
obscuren 已提交
472
	}
473
	body := GetBody(self.chainDb, hash, self.hc.GetBlockNumber(hash))
474 475
	if body == nil {
		return nil
476 477
	}
	// Cache the found body for next time and return
478 479
	self.bodyCache.Add(hash, body)
	return body
480
}
O
obscuren 已提交
481

482 483
// GetBodyRLP retrieves a block body in RLP encoding from the database by hash,
// caching it if found.
484
func (self *BlockChain) GetBodyRLP(hash common.Hash) rlp.RawValue {
485 486
	// Short circuit if the body's already in the cache, retrieve otherwise
	if cached, ok := self.bodyRLPCache.Get(hash); ok {
487
		return cached.(rlp.RawValue)
488
	}
489
	body := GetBodyRLP(self.chainDb, hash, self.hc.GetBlockNumber(hash))
490
	if len(body) == 0 {
491
		return nil
O
obscuren 已提交
492
	}
493 494 495 496
	// Cache the found body for next time and return
	self.bodyRLPCache.Add(hash, body)
	return body
}
O
obscuren 已提交
497

498 499
// HasBlock checks if a block is fully present in the database or not, caching
// it if present.
500
func (bc *BlockChain) HasBlock(hash common.Hash) bool {
501
	return bc.GetBlockByHash(hash) != nil
O
obscuren 已提交
502 503
}

504 505 506 507
// HasBlockAndState checks if a block and associated state trie is fully present
// in the database or not, caching it if present.
func (bc *BlockChain) HasBlockAndState(hash common.Hash) bool {
	// Check first that the block itself is known
508
	block := bc.GetBlockByHash(hash)
509 510 511 512 513 514 515 516
	if block == nil {
		return false
	}
	// Ensure the associated state is also present
	_, err := state.New(block.Root(), bc.chainDb)
	return err == nil
}

517 518 519
// GetBlock retrieves a block from the database by hash and number,
// caching it if found.
func (self *BlockChain) GetBlock(hash common.Hash, number uint64) *types.Block {
520 521
	// Short circuit if the block's already in the cache, retrieve otherwise
	if block, ok := self.blockCache.Get(hash); ok {
522 523
		return block.(*types.Block)
	}
524
	block := GetBlock(self.chainDb, hash, number)
525
	if block == nil {
O
obscuren 已提交
526 527
		return nil
	}
528 529 530
	// Cache the found block for next time and return
	self.blockCache.Add(block.Hash(), block)
	return block
O
obscuren 已提交
531 532
}

533 534 535 536 537
// GetBlockByHash retrieves a block from the database by hash, caching it if found.
func (self *BlockChain) GetBlockByHash(hash common.Hash) *types.Block {
	return self.GetBlock(hash, self.hc.GetBlockNumber(hash))
}

538 539
// GetBlockByNumber retrieves a block from the database by number, caching it
// (associated with its hash) if found.
540
func (self *BlockChain) GetBlockByNumber(number uint64) *types.Block {
541
	hash := GetCanonicalHash(self.chainDb, number)
542 543 544
	if hash == (common.Hash{}) {
		return nil
	}
545
	return self.GetBlock(hash, number)
546
}
547

548
// [deprecated by eth/62]
F
Felix Lange 已提交
549
// GetBlocksFromHash returns the block corresponding to hash and up to n-1 ancestors.
550
func (self *BlockChain) GetBlocksFromHash(hash common.Hash, n int) (blocks []*types.Block) {
551
	number := self.hc.GetBlockNumber(hash)
F
Felix Lange 已提交
552
	for i := 0; i < n; i++ {
553
		block := self.GetBlock(hash, number)
F
Felix Lange 已提交
554 555 556 557 558
		if block == nil {
			break
		}
		blocks = append(blocks, block)
		hash = block.ParentHash()
559
		number--
F
Felix Lange 已提交
560 561 562 563
	}
	return
}

564 565 566 567
// GetUnclesInChain retrieves all the uncles from a given block backwards until
// a specific distance is reached.
func (self *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.Header {
	uncles := []*types.Header{}
568 569
	for i := 0; block != nil && i < length; i++ {
		uncles = append(uncles, block.Uncles()...)
570
		block = self.GetBlock(block.ParentHash(), block.NumberU64()-1)
571
	}
572
	return uncles
O
obscuren 已提交
573
}
O
obscuren 已提交
574

575 576
// Stop stops the blockchain service. If any imports are currently in progress
// it will abort them using the procInterrupt.
577
func (bc *BlockChain) Stop() {
578 579 580
	if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
		return
	}
581
	close(bc.quit)
O
obscuren 已提交
582
	atomic.StoreInt32(&bc.procInterrupt, 1)
583 584 585 586

	bc.wg.Wait()

	glog.V(logger.Info).Infoln("Chain manager stopped")
587 588
}

589
func (self *BlockChain) procFutureBlocks() {
590 591 592 593 594
	blocks := make([]*types.Block, 0, self.futureBlocks.Len())
	for _, hash := range self.futureBlocks.Keys() {
		if block, exist := self.futureBlocks.Get(hash); exist {
			blocks = append(blocks, block.(*types.Block))
		}
595
	}
596 597
	if len(blocks) > 0 {
		types.BlockBy(types.Number).Sort(blocks)
598 599 600 601 602

		// Insert one by one as chain insertion needs contiguous ancestry between blocks
		for i := range blocks {
			self.InsertChain(blocks[i : i+1])
		}
603
	}
604 605
}

606
type WriteStatus byte
607 608

const (
609
	NonStatTy WriteStatus = iota
610 611 612
	CanonStatTy
	SplitStatTy
	SideStatTy
613 614
)

615 616 617
// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
func (self *BlockChain) Rollback(chain []common.Hash) {
618 619 620
	self.mu.Lock()
	defer self.mu.Unlock()

621 622 623
	for i := len(chain) - 1; i >= 0; i-- {
		hash := chain[i]

624 625 626
		currentHeader := self.hc.CurrentHeader()
		if currentHeader.Hash() == hash {
			self.hc.SetCurrentHeader(self.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1))
627 628
		}
		if self.currentFastBlock.Hash() == hash {
629
			self.currentFastBlock = self.GetBlock(self.currentFastBlock.ParentHash(), self.currentFastBlock.NumberU64()-1)
630 631 632
			WriteHeadFastBlockHash(self.chainDb, self.currentFastBlock.Hash())
		}
		if self.currentBlock.Hash() == hash {
633
			self.currentBlock = self.GetBlock(self.currentBlock.ParentHash(), self.currentBlock.NumberU64()-1)
634 635 636 637 638
			WriteHeadBlockHash(self.chainDb, self.currentBlock.Hash())
		}
	}
}

639
// SetReceiptsData computes all the non-consensus fields of the receipts
J
Jeffrey Wilcke 已提交
640 641 642
func SetReceiptsData(config *params.ChainConfig, block *types.Block, receipts types.Receipts) {
	signer := types.MakeSigner(config, block.Number())

643 644 645 646 647 648
	transactions, logIndex := block.Transactions(), uint(0)

	for j := 0; j < len(receipts); j++ {
		// The transaction hash can be retrieved from the transaction itself
		receipts[j].TxHash = transactions[j].Hash()

J
Jeffrey Wilcke 已提交
649
		tx, _ := transactions[j].AsMessage(signer)
650
		// The contract address can be derived from the transaction itself
J
Jeffrey Wilcke 已提交
651 652
		if MessageCreatesContract(tx) {
			receipts[j].ContractAddress = crypto.CreateAddress(tx.From(), tx.Nonce())
653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671
		}
		// The used gas can be calculated based on previous receipts
		if j == 0 {
			receipts[j].GasUsed = new(big.Int).Set(receipts[j].CumulativeGasUsed)
		} else {
			receipts[j].GasUsed = new(big.Int).Sub(receipts[j].CumulativeGasUsed, receipts[j-1].CumulativeGasUsed)
		}
		// The derived log fields can simply be set from the block and transaction
		for k := 0; k < len(receipts[j].Logs); k++ {
			receipts[j].Logs[k].BlockNumber = block.NumberU64()
			receipts[j].Logs[k].BlockHash = block.Hash()
			receipts[j].Logs[k].TxHash = receipts[j].TxHash
			receipts[j].Logs[k].TxIndex = uint(j)
			receipts[j].Logs[k].Index = logIndex
			logIndex++
		}
	}
}

672 673
// InsertReceiptChain attempts to complete an already existing header chain with
// transaction and receipt data.
J
Jeffrey Wilcke 已提交
674
// XXX should this be moved to the test?
675
func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
676 677 678 679 680 681 682 683 684 685 686 687
	// Do a sanity check that the provided chain is actually ordered and linked
	for i := 1; i < len(blockChain); i++ {
		if blockChain[i].NumberU64() != blockChain[i-1].NumberU64()+1 || blockChain[i].ParentHash() != blockChain[i-1].Hash() {
			// Chain broke ancestry, log a messge (programming error) and skip insertion
			failure := fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, blockChain[i-1].NumberU64(),
				blockChain[i-1].Hash().Bytes()[:4], i, blockChain[i].NumberU64(), blockChain[i].Hash().Bytes()[:4], blockChain[i].ParentHash().Bytes()[:4])

			glog.V(logger.Error).Info(failure.Error())
			return 0, failure
		}
	}
	// Pre-checks passed, start the block body and receipt imports
688 689 690 691
	self.wg.Add(1)
	defer self.wg.Done()

	// Collect some import statistics to report on
692
	stats := struct{ processed, ignored int32 }{}
693 694
	start := time.Now()

695 696
	// Create the block importing task queue and worker functions
	tasks := make(chan int, len(blockChain))
697
	for i := 0; i < len(blockChain) && i < len(receiptChain); i++ {
698 699 700
		tasks <- i
	}
	close(tasks)
701

702 703 704 705 706 707 708 709
	errs, failed := make([]error, len(tasks)), int32(0)
	process := func(worker int) {
		for index := range tasks {
			block, receipts := blockChain[index], receiptChain[index]

			// Short circuit insertion if shutting down or processing failed
			if atomic.LoadInt32(&self.procInterrupt) == 1 {
				return
710
			}
711 712
			if atomic.LoadInt32(&failed) > 0 {
				return
713
			}
714 715 716 717 718
			// Short circuit if the owner header is unknown
			if !self.HasHeader(block.Hash()) {
				errs[index] = fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
				atomic.AddInt32(&failed, 1)
				return
719
			}
720 721 722 723 724 725
			// Skip if the entire data is already known
			if self.HasBlock(block.Hash()) {
				atomic.AddInt32(&stats.ignored, 1)
				continue
			}
			// Compute all the non-consensus fields of the receipts
J
Jeffrey Wilcke 已提交
726
			SetReceiptsData(self.config, block, receipts)
727
			// Write all the data out into the database
728
			if err := WriteBody(self.chainDb, block.Hash(), block.NumberU64(), block.Body()); err != nil {
729 730 731 732 733
				errs[index] = fmt.Errorf("failed to write block body: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
734
			if err := WriteBlockReceipts(self.chainDb, block.Hash(), block.NumberU64(), receipts); err != nil {
735 736 737 738 739
				errs[index] = fmt.Errorf("failed to write block receipts: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
740 741 742 743 744 745
			if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
				errs[index] = fmt.Errorf("failed to write log blooms: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
746 747 748 749 750 751 752 753 754 755 756 757
			if err := WriteTransactions(self.chainDb, block); err != nil {
				errs[index] = fmt.Errorf("failed to write individual transactions: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
			if err := WriteReceipts(self.chainDb, receipts); err != nil {
				errs[index] = fmt.Errorf("failed to write individual receipts: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
758
			atomic.AddInt32(&stats.processed, 1)
759
		}
760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776
	}
	// Start as many worker threads as goroutines allowed
	pending := new(sync.WaitGroup)
	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
		pending.Add(1)
		go func(id int) {
			defer pending.Done()
			process(id)
		}(i)
	}
	pending.Wait()

	// If anything failed, report
	if failed > 0 {
		for i, err := range errs {
			if err != nil {
				return i, err
777 778 779
			}
		}
	}
780 781 782 783
	if atomic.LoadInt32(&self.procInterrupt) == 1 {
		glog.V(logger.Debug).Infoln("premature abort during receipt chain processing")
		return 0, nil
	}
784 785 786
	// Update the head fast sync block if better
	self.mu.Lock()
	head := blockChain[len(errs)-1]
787
	if self.GetTd(self.currentFastBlock.Hash(), self.currentFastBlock.NumberU64()).Cmp(self.GetTd(head.Hash(), head.NumberU64())) < 0 {
788 789 790 791 792 793 794
		if err := WriteHeadFastBlockHash(self.chainDb, head.Hash()); err != nil {
			glog.Fatalf("failed to update head fast block hash: %v", err)
		}
		self.currentFastBlock = head
	}
	self.mu.Unlock()

795 796
	// Report some public statistics so the user has a clue what's going on
	first, last := blockChain[0], blockChain[len(blockChain)-1]
797 798 799 800 801

	ignored := ""
	if stats.ignored > 0 {
		ignored = fmt.Sprintf(" (%d ignored)", stats.ignored)
	}
802
	glog.V(logger.Info).Infof("imported %d receipts in %9v. #%d [%x… / %x…]%s", stats.processed, common.PrettyDuration(time.Since(start)), last.Number(), first.Hash().Bytes()[:4], last.Hash().Bytes()[:4], ignored)
803 804 805 806

	return 0, nil
}

807
// WriteBlock writes the block to the chain.
808
func (self *BlockChain) WriteBlock(block *types.Block) (status WriteStatus, err error) {
809 810 811
	self.wg.Add(1)
	defer self.wg.Done()

812
	// Calculate the total difficulty of the block
813
	ptd := self.GetTd(block.ParentHash(), block.NumberU64()-1)
814 815 816
	if ptd == nil {
		return NonStatTy, ParentError(block.ParentHash())
	}
817 818 819
	// Make sure no inconsistent state is leaked during insertion
	self.mu.Lock()
	defer self.mu.Unlock()
820

821 822 823
	localTd := self.GetTd(self.currentBlock.Hash(), self.currentBlock.NumberU64())
	externTd := new(big.Int).Add(block.Difficulty(), ptd)

824 825 826 827 828 829 830 831
	// Irrelevant of the canonical status, write the block itself to the database
	if err := self.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil {
		glog.Fatalf("failed to write block total difficulty: %v", err)
	}
	if err := WriteBlock(self.chainDb, block); err != nil {
		glog.Fatalf("failed to write block contents: %v", err)
	}

832
	// If the total difficulty is higher than our known, add it to the canonical chain
833 834 835
	// Second clause in the if statement reduces the vulnerability to selfish mining.
	// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
	if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) {
836
		// Reorganise the chain if the parent is not the head block
837 838
		if block.ParentHash() != self.currentBlock.Hash() {
			if err := self.reorg(self.currentBlock, block); err != nil {
839
				return NonStatTy, err
840 841
			}
		}
842
		self.insert(block) // Insert the block as the new head of the chain
843
		status = CanonStatTy
844
	} else {
845
		status = SideStatTy
846
	}
847

848
	self.futureBlocks.Remove(block.Hash())
849 850 851 852

	return
}

853 854
// InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned
// it will return the index number of the failing block as well an error describing what went wrong (for possible errors see core/errors.go).
855
func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
856 857 858 859 860 861 862 863 864 865 866 867
	// Do a sanity check that the provided chain is actually ordered and linked
	for i := 1; i < len(chain); i++ {
		if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() {
			// Chain broke ancestry, log a messge (programming error) and skip insertion
			failure := fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])",
				i-1, chain[i-1].NumberU64(), chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4])

			glog.V(logger.Error).Info(failure.Error())
			return 0, failure
		}
	}
	// Pre-checks passed, start the full block imports
868 869 870
	self.wg.Add(1)
	defer self.wg.Done()

O
obscuren 已提交
871 872 873
	self.chainmu.Lock()
	defer self.chainmu.Unlock()

F
Felix Lange 已提交
874 875 876
	// A queued approach to delivering events. This is generally
	// faster than direct delivery and requires much less mutex
	// acquiring.
O
obscuren 已提交
877
	var (
F
Felix Lange 已提交
878
		stats         = insertStats{startTime: time.Now()}
879
		events        = make([]interface{}, 0, len(chain))
F
Felix Lange 已提交
880
		coalescedLogs []*types.Log
F
Felix Lange 已提交
881
		nonceChecked  = make([]bool, len(chain))
O
obscuren 已提交
882
	)
883

F
Felix Lange 已提交
884
	// Start the parallel nonce verifier.
885 886
	nonceAbort, nonceResults := verifyNoncesFromBlocks(self.pow, chain)
	defer close(nonceAbort)
887

888
	for i, block := range chain {
O
obscuren 已提交
889
		if atomic.LoadInt32(&self.procInterrupt) == 1 {
890
			glog.V(logger.Debug).Infoln("Premature abort during block chain processing")
O
obscuren 已提交
891 892
			break
		}
893

O
obscuren 已提交
894 895 896 897
		bstart := time.Now()
		// Wait for block i's nonce to be verified before processing
		// its state transition.
		for !nonceChecked[i] {
898 899
			r := <-nonceResults
			nonceChecked[r.index] = true
O
obscuren 已提交
900
			if !r.valid {
901 902
				block := chain[r.index]
				return r.index, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()}
O
obscuren 已提交
903
			}
O
obscuren 已提交
904
		}
O
obscuren 已提交
905

O
obscuren 已提交
906
		if BadHashes[block.Hash()] {
907
			err := BadHashError(block.Hash())
908
			self.reportBlock(block, nil, err)
O
obscuren 已提交
909 910
			return i, err
		}
911 912 913
		// Stage 1 validation of the block using the chain's validator
		// interface.
		err := self.Validator().ValidateBlock(block)
O
obscuren 已提交
914 915 916 917 918
		if err != nil {
			if IsKnownBlockErr(err) {
				stats.ignored++
				continue
			}
919

O
obscuren 已提交
920 921 922 923
			if err == BlockFutureErr {
				// Allow up to MaxFuture second in the future blocks. If this limit
				// is exceeded the chain is discarded and processed at a later time
				// if given.
924 925
				max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
				if block.Time().Cmp(max) == 1 {
O
obscuren 已提交
926
					return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max)
927
				}
O
obscuren 已提交
928

929
				self.futureBlocks.Add(block.Hash(), block)
O
obscuren 已提交
930 931 932
				stats.queued++
				continue
			}
O
obscuren 已提交
933

934 935
			if IsParentErr(err) && self.futureBlocks.Contains(block.ParentHash()) {
				self.futureBlocks.Add(block.Hash(), block)
O
obscuren 已提交
936 937
				stats.queued++
				continue
938
			}
939

940
			self.reportBlock(block, nil, err)
941 942 943 944
			return i, err
		}
		// Create a new statedb using the parent block and report an
		// error if it fails.
945 946 947 948 949
		switch {
		case i == 0:
			err = self.stateCache.Reset(self.GetBlock(block.ParentHash(), block.NumberU64()-1).Root())
		default:
			err = self.stateCache.Reset(chain[i-1].Root())
950
		}
951
		if err != nil {
952
			self.reportBlock(block, nil, err)
O
obscuren 已提交
953 954
			return i, err
		}
955
		// Process block using the parent state as reference point.
956
		receipts, logs, usedGas, err := self.processor.Process(block, self.stateCache, vm.Config{})
957
		if err != nil {
958
			self.reportBlock(block, receipts, err)
959 960 961
			return i, err
		}
		// Validate the state using the default validator
962
		err = self.Validator().ValidateState(block, self.GetBlock(block.ParentHash(), block.NumberU64()-1), self.stateCache, receipts, usedGas)
963
		if err != nil {
964
			self.reportBlock(block, receipts, err)
965 966 967
			return i, err
		}
		// Write state changes to database
968
		_, err = self.stateCache.Commit(self.config.IsEIP158(block.Number()))
969 970 971 972 973 974 975
		if err != nil {
			return i, err
		}

		// coalesce logs for later processing
		coalescedLogs = append(coalescedLogs, logs...)

976
		if err := WriteBlockReceipts(self.chainDb, block.Hash(), block.NumberU64(), receipts); err != nil {
977
			return i, err
978
		}
O
obscuren 已提交
979

980
		// write the block to the chain and get the status
981
		status, err := self.WriteBlock(block)
982 983 984
		if err != nil {
			return i, err
		}
985

986
		switch status {
987
		case CanonStatTy:
O
obscuren 已提交
988
			if glog.V(logger.Debug) {
989
				glog.Infof("inserted block #%d [%x…] in %9v: %3d txs %7v gas %d uncles.", block.Number(), block.Hash().Bytes()[0:4], common.PrettyDuration(time.Since(bstart)), len(block.Transactions()), block.GasUsed(), len(block.Uncles()))
O
obscuren 已提交
990
			}
991
			blockInsertTimer.UpdateSince(bstart)
992
			events = append(events, ChainEvent{block, block.Hash(), logs})
993 994

			// This puts transactions in a extra db for rpc
995
			if err := WriteTransactions(self.chainDb, block); err != nil {
996 997
				return i, err
			}
998
			// store the receipts
999
			if err := WriteReceipts(self.chainDb, receipts); err != nil {
1000 1001 1002 1003 1004 1005
				return i, err
			}
			// Write map map bloom filters
			if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
				return i, err
			}
1006
		case SideStatTy:
O
obscuren 已提交
1007
			if glog.V(logger.Detail) {
1008
				glog.Infof("inserted forked block #%d [%x…] (TD=%v) in %9v: %3d txs %d uncles.", block.Number(), block.Hash().Bytes()[0:4], block.Difficulty(), common.PrettyDuration(time.Since(bstart)), len(block.Transactions()), len(block.Uncles()))
O
obscuren 已提交
1009
			}
1010
			blockInsertTimer.UpdateSince(bstart)
1011
			events = append(events, ChainSideEvent{block})
1012

1013
		case SplitStatTy:
1014
			events = append(events, ChainSplitEvent{block, logs})
1015
		}
F
Felix Lange 已提交
1016

O
obscuren 已提交
1017
		stats.processed++
F
Felix Lange 已提交
1018
		if glog.V(logger.Info) {
1019
			stats.usedGas += usedGas.Uint64()
F
Felix Lange 已提交
1020 1021
			stats.report(chain, i)
		}
O
obscuren 已提交
1022 1023
	}

1024
	go self.postChainEvents(events, coalescedLogs)
1025

1026
	return 0, nil
O
obscuren 已提交
1027
}
1028

F
Felix Lange 已提交
1029 1030 1031
// insertStats tracks and reports on block insertion.
type insertStats struct {
	queued, processed, ignored int
1032
	usedGas                    uint64
F
Felix Lange 已提交
1033 1034 1035 1036
	lastIndex                  int
	startTime                  time.Time
}

1037 1038 1039
// statsReportLimit is the time limit during import after which we always print
// out progress. This avoids the user wondering what's going on.
const statsReportLimit = 8 * time.Second
F
Felix Lange 已提交
1040 1041 1042 1043

// report prints statistics if some number of blocks have been processed
// or more than a few seconds have passed since the last message.
func (st *insertStats) report(chain []*types.Block, index int) {
1044
	// Fetch the timings for the batch
1045 1046 1047 1048
	var (
		now     = time.Now()
		elapsed = now.Sub(st.startTime)
	)
1049 1050 1051 1052
	if elapsed == 0 { // Yes Windows, I'm looking at you
		elapsed = 1
	}
	// If we're at the last block of the batch or report period reached, log
1053
	if index == len(chain)-1 || elapsed >= statsReportLimit {
F
Felix Lange 已提交
1054
		start, end := chain[st.lastIndex], chain[index]
J
Jeffrey Wilcke 已提交
1055
		txcount := countTransactions(chain[st.lastIndex : index+1])
1056 1057 1058 1059 1060

		extra := ""
		if st.queued > 0 || st.ignored > 0 {
			extra = fmt.Sprintf(" (%d queued %d ignored)", st.queued, st.ignored)
		}
1061 1062 1063 1064 1065 1066 1067
		hashes := ""
		if st.processed > 1 {
			hashes = fmt.Sprintf("%x… / %x…", start.Hash().Bytes()[:4], end.Hash().Bytes()[:4])
		} else {
			hashes = fmt.Sprintf("%x…", end.Hash().Bytes()[:4])
		}
		glog.Infof("imported %d blocks, %5d txs (%7.3f Mg) in %9v (%6.3f Mg/s). #%v [%s]%s", st.processed, txcount, float64(st.usedGas)/1000000, common.PrettyDuration(elapsed), float64(st.usedGas)*1000/float64(elapsed), end.Number(), hashes, extra)
1068

F
Felix Lange 已提交
1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079
		*st = insertStats{startTime: now, lastIndex: index}
	}
}

func countTransactions(chain []*types.Block) (c int) {
	for _, b := range chain {
		c += len(b.Transactions())
	}
	return c
}

1080 1081 1082
// reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them
// to be part of the new canonical chain and accumulates potential missing transactions and post an
// event about them
1083
func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
1084
	var (
1085 1086 1087 1088 1089 1090
		newChain    types.Blocks
		oldChain    types.Blocks
		commonBlock *types.Block
		oldStart    = oldBlock
		newStart    = newBlock
		deletedTxs  types.Transactions
F
Felix Lange 已提交
1091
		deletedLogs []*types.Log
J
Jeffrey Wilcke 已提交
1092 1093 1094 1095
		// collectLogs collects the logs that were generated during the
		// processing of the block that corresponds with the given hash.
		// These logs are later announced as deleted.
		collectLogs = func(h common.Hash) {
1096
			// Coalesce logs and set 'Removed'.
1097
			receipts := GetBlockReceipts(self.chainDb, h, self.hc.GetBlockNumber(h))
J
Jeffrey Wilcke 已提交
1098
			for _, receipt := range receipts {
1099 1100 1101 1102 1103
				for _, log := range receipt.Logs {
					del := *log
					del.Removed = true
					deletedLogs = append(deletedLogs, &del)
				}
J
Jeffrey Wilcke 已提交
1104 1105
			}
		}
1106
	)
1107 1108 1109 1110

	// first reduce whoever is higher bound
	if oldBlock.NumberU64() > newBlock.NumberU64() {
		// reduce old chain
1111
		for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = self.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
1112
			oldChain = append(oldChain, oldBlock)
1113
			deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
J
Jeffrey Wilcke 已提交
1114 1115

			collectLogs(oldBlock.Hash())
1116 1117 1118
		}
	} else {
		// reduce new chain and append new chain blocks for inserting later on
1119
		for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = self.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
1120 1121
			newChain = append(newChain, newBlock)
		}
O
obscuren 已提交
1122
	}
O
obscuren 已提交
1123
	if oldBlock == nil {
1124
		return fmt.Errorf("Invalid old chain")
O
obscuren 已提交
1125 1126
	}
	if newBlock == nil {
1127
		return fmt.Errorf("Invalid new chain")
O
obscuren 已提交
1128
	}
O
obscuren 已提交
1129

1130
	numSplit := newBlock.Number()
1131 1132
	for {
		if oldBlock.Hash() == newBlock.Hash() {
1133
			commonBlock = oldBlock
1134 1135
			break
		}
1136 1137

		oldChain = append(oldChain, oldBlock)
1138
		newChain = append(newChain, newBlock)
1139
		deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
J
Jeffrey Wilcke 已提交
1140
		collectLogs(oldBlock.Hash())
O
obscuren 已提交
1141

1142
		oldBlock, newBlock = self.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), self.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
1143
		if oldBlock == nil {
1144
			return fmt.Errorf("Invalid old chain")
1145 1146
		}
		if newBlock == nil {
1147
			return fmt.Errorf("Invalid new chain")
1148
		}
1149 1150
	}

1151
	if glog.V(logger.Debug) {
1152
		commonHash := commonBlock.Hash()
1153
		glog.Infof("Chain split detected @ %x. Reorganising chain from #%v %x to %x", commonHash[:4], numSplit, oldStart.Hash().Bytes()[:4], newStart.Hash().Bytes()[:4])
1154 1155
	}

1156
	var addedTxs types.Transactions
1157
	// insert blocks. Order does not matter. Last block will be written in ImportChain itself which creates the new head properly
1158
	for _, block := range newChain {
1159
		// insert the block in the canonical way, re-writing history
1160
		self.insert(block)
1161
		// write canonical receipts and transactions
1162
		if err := WriteTransactions(self.chainDb, block); err != nil {
1163 1164
			return err
		}
1165
		receipts := GetBlockReceipts(self.chainDb, block.Hash(), block.NumberU64())
1166
		// write receipts
1167
		if err := WriteReceipts(self.chainDb, receipts); err != nil {
1168 1169 1170 1171 1172 1173
			return err
		}
		// Write map map bloom filters
		if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
			return err
		}
1174 1175 1176 1177 1178 1179 1180 1181 1182 1183
		addedTxs = append(addedTxs, block.Transactions()...)
	}

	// calculate the difference between deleted and added transactions
	diff := types.TxDifference(deletedTxs, addedTxs)
	// When transactions get deleted from the database that means the
	// receipts that were created in the fork must also be deleted
	for _, tx := range diff {
		DeleteReceipt(self.chainDb, tx.Hash())
		DeleteTransaction(self.chainDb, tx.Hash())
1184
	}
1185 1186
	// Must be posted in a goroutine because of the transaction pool trying
	// to acquire the chain manager lock
J
Jeffrey Wilcke 已提交
1187 1188 1189 1190
	if len(diff) > 0 {
		go self.eventMux.Post(RemovedTransactionEvent{diff})
	}
	if len(deletedLogs) > 0 {
1191
		go self.eventMux.Post(RemovedLogsEvent{deletedLogs})
J
Jeffrey Wilcke 已提交
1192
	}
1193

1194 1195 1196
	if len(oldChain) > 0 {
		go func() {
			for _, block := range oldChain {
1197
				self.eventMux.Post(ChainSideEvent{Block: block})
1198 1199 1200 1201
			}
		}()
	}

1202
	return nil
1203 1204
}

1205 1206
// postChainEvents iterates over the events generated by a chain insertion and
// posts them into the event mux.
F
Felix Lange 已提交
1207
func (self *BlockChain) postChainEvents(events []interface{}, logs []*types.Log) {
1208 1209
	// post event logs for further processing
	self.eventMux.Post(logs)
1210 1211 1212 1213
	for _, event := range events {
		if event, ok := event.(ChainEvent); ok {
			// We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long
			// and in most cases isn't even necessary.
1214
			if self.LastBlockHash() == event.Hash {
1215 1216 1217 1218 1219 1220 1221 1222
				self.eventMux.Post(ChainHeadEvent{event.Block})
			}
		}
		// Fire the insertion events individually too
		self.eventMux.Post(event)
	}
}

1223
func (self *BlockChain) update() {
O
obscuren 已提交
1224
	futureTimer := time.Tick(5 * time.Second)
1225 1226
	for {
		select {
O
obscuren 已提交
1227
		case <-futureTimer:
1228
			self.procFutureBlocks()
1229
		case <-self.quit:
1230
			return
1231 1232 1233
		}
	}
}
1234

F
Felix Lange 已提交
1235
// reportBlock logs a bad block error.
1236
func (bc *BlockChain) reportBlock(block *types.Block, receipts types.Receipts, err error) {
1237
	if glog.V(logger.Error) {
1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252
		var receiptString string
		for _, receipt := range receipts {
			receiptString += fmt.Sprintf("\t%v\n", receipt)
		}
		glog.Errorf(`
########## BAD BLOCK #########
Chain config: %v

Number: %v
Hash: 0x%x
%v

Error: %v
##############################
`, bc.config, block.Number(), block.Hash(), receiptString, err)
1253
	}
F
Felix Lange 已提交
1254
}
1255 1256 1257 1258 1259 1260 1261

// InsertHeaderChain attempts to insert the given header chain in to the local
// chain, possibly creating a reorg. If an error is returned, it will return the
// index number of the failing header as well an error describing what went wrong.
//
// The verify parameter can be used to fine tune whether nonce verification
// should be done or not. The reason behind the optional check is because some
L
Leif Jurvetson 已提交
1262
// of the header retrieval mechanisms already need to verify nonces, as well as
1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312
// because nonces can be verified sparsely, not needing to check each.
func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
	// Make sure only one thread manipulates the chain at once
	self.chainmu.Lock()
	defer self.chainmu.Unlock()

	self.wg.Add(1)
	defer self.wg.Done()

	whFunc := func(header *types.Header) error {
		self.mu.Lock()
		defer self.mu.Unlock()

		_, err := self.hc.WriteHeader(header)
		return err
	}

	return self.hc.InsertHeaderChain(chain, checkFreq, whFunc)
}

// writeHeader writes a header into the local chain, given that its parent is
// already known. If the total difficulty of the newly inserted header becomes
// greater than the current known TD, the canonical chain is re-routed.
//
// Note: This method is not concurrent-safe with inserting blocks simultaneously
// into the chain, as side effects caused by reorganisations cannot be emulated
// without the real blocks. Hence, writing headers directly should only be done
// in two scenarios: pure-header mode of operation (light clients), or properly
// separated header/block phases (non-archive clients).
func (self *BlockChain) writeHeader(header *types.Header) error {
	self.wg.Add(1)
	defer self.wg.Done()

	self.mu.Lock()
	defer self.mu.Unlock()

	_, err := self.hc.WriteHeader(header)
	return err
}

// CurrentHeader retrieves the current head header of the canonical chain. The
// header is retrieved from the HeaderChain's internal cache.
func (self *BlockChain) CurrentHeader() *types.Header {
	self.mu.RLock()
	defer self.mu.RUnlock()

	return self.hc.CurrentHeader()
}

// GetTd retrieves a block's total difficulty in the canonical chain from the
1313 1314 1315 1316 1317 1318
// database by hash and number, caching it if found.
func (self *BlockChain) GetTd(hash common.Hash, number uint64) *big.Int {
	return self.hc.GetTd(hash, number)
}

// GetTdByHash retrieves a block's total difficulty in the canonical chain from the
1319
// database by hash, caching it if found.
1320 1321 1322 1323 1324 1325 1326 1327
func (self *BlockChain) GetTdByHash(hash common.Hash) *big.Int {
	return self.hc.GetTdByHash(hash)
}

// GetHeader retrieves a block header from the database by hash and number,
// caching it if found.
func (self *BlockChain) GetHeader(hash common.Hash, number uint64) *types.Header {
	return self.hc.GetHeader(hash, number)
1328 1329
}

1330
// GetHeaderByHash retrieves a block header from the database by hash, caching it if
1331
// found.
1332 1333
func (self *BlockChain) GetHeaderByHash(hash common.Hash) *types.Header {
	return self.hc.GetHeaderByHash(hash)
1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352
}

// HasHeader checks if a block header is present in the database or not, caching
// it if present.
func (bc *BlockChain) HasHeader(hash common.Hash) bool {
	return bc.hc.HasHeader(hash)
}

// GetBlockHashesFromHash retrieves a number of block hashes starting at a given
// hash, fetching towards the genesis block.
func (self *BlockChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash {
	return self.hc.GetBlockHashesFromHash(hash, max)
}

// GetHeaderByNumber retrieves a block header from the database by number,
// caching it (associated with its hash) if found.
func (self *BlockChain) GetHeaderByNumber(number uint64) *types.Header {
	return self.hc.GetHeaderByNumber(number)
}
1353 1354

// Config retrieves the blockchain's chain configuration.
1355
func (self *BlockChain) Config() *params.ChainConfig { return self.config }