blockchain.go 43.9 KB
Newer Older
F
Felix Lange 已提交
1
// Copyright 2014 The go-ethereum Authors
2
// This file is part of the go-ethereum library.
F
Felix Lange 已提交
3
//
4
// The go-ethereum library is free software: you can redistribute it and/or modify
F
Felix Lange 已提交
5 6 7 8
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
9
// The go-ethereum library is distributed in the hope that it will be useful,
F
Felix Lange 已提交
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
F
Felix Lange 已提交
12 13 14
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
15
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
F
Felix Lange 已提交
16

17
// Package core implements the Ethereum consensus protocol.
O
obscuren 已提交
18
package core
O
obscuren 已提交
19 20

import (
21
	"errors"
O
obscuren 已提交
22
	"fmt"
23
	"io"
24
	"math/big"
25
	mrand "math/rand"
26
	"runtime"
O
obscuren 已提交
27
	"sync"
O
obscuren 已提交
28
	"sync/atomic"
29
	"time"
30

O
obscuren 已提交
31
	"github.com/ethereum/go-ethereum/common"
O
obscuren 已提交
32
	"github.com/ethereum/go-ethereum/core/state"
O
obscuren 已提交
33
	"github.com/ethereum/go-ethereum/core/types"
34
	"github.com/ethereum/go-ethereum/core/vm"
35
	"github.com/ethereum/go-ethereum/crypto"
36
	"github.com/ethereum/go-ethereum/ethdb"
O
obscuren 已提交
37
	"github.com/ethereum/go-ethereum/event"
O
obscuren 已提交
38
	"github.com/ethereum/go-ethereum/logger"
O
obscuren 已提交
39
	"github.com/ethereum/go-ethereum/logger/glog"
40
	"github.com/ethereum/go-ethereum/metrics"
41
	"github.com/ethereum/go-ethereum/params"
42
	"github.com/ethereum/go-ethereum/pow"
43
	"github.com/ethereum/go-ethereum/rlp"
44
	"github.com/ethereum/go-ethereum/trie"
O
obscuren 已提交
45
	"github.com/hashicorp/golang-lru"
O
obscuren 已提交
46 47
)

48 49 50
var (
	chainlogger = logger.NewLogger("CHAIN")
	jsonlogger  = logger.NewJsonLogger()
51

52
	blockInsertTimer = metrics.NewTimer("chain/inserts")
53 54

	ErrNoGenesis = errors.New("Genesis not found in chain")
55
)
Z
zelig 已提交
56

O
obscuren 已提交
57
const (
58
	bodyCacheLimit      = 256
O
obscuren 已提交
59
	blockCacheLimit     = 256
O
wip  
obscuren 已提交
60 61
	maxFutureBlocks     = 256
	maxTimeFutureBlocks = 30
62 63 64
	// must be bumped when consensus algorithm is changed, this forces the upgradedb
	// command to be run (forces the blocks to be imported again using the new algorithm)
	BlockChainVersion = 3
O
obscuren 已提交
65
)
66

67 68 69 70 71 72 73 74 75 76 77 78 79 80
// BlockChain represents the canonical chain given a database with a genesis
// block. The Blockchain manages chain imports, reverts, chain reorganisations.
//
// Importing blocks in to the block chain happens according to the set of rules
// defined by the two stage Validator. Processing of blocks is done using the
// Processor which processes the included transaction. The validation of the state
// is done in the second part of the Validator. Failing results in aborting of
// the import.
//
// The BlockChain also helps in returning blocks from **any** chain included
// in the database as well as blocks that represents the canonical chain. It's
// important to note that GetBlock can return any block and does not need to be
// included in the canonical one where as GetBlockByNumber always represents the
// canonical chain.
81
type BlockChain struct {
82
	config *params.ChainConfig // chain & network configuration
83

84
	hc           *HeaderChain
85
	chainDb      ethdb.Database
O
obscuren 已提交
86
	eventMux     *event.TypeMux
87
	genesisBlock *types.Block
88

89 90 91
	mu      sync.RWMutex // global mutex for locking chain operations
	chainmu sync.RWMutex // blockchain insertion lock
	procmu  sync.RWMutex // block processor lock
92

93 94 95
	checkpoint       int          // checkpoint counts towards the new checkpoint
	currentBlock     *types.Block // Current head of the block chain
	currentFastBlock *types.Block // Current head of the fast-sync chain (may be above the block chain!)
O
obscuren 已提交
96

97 98 99 100 101
	stateCache   *state.StateDB // State database to reuse between imports (contains state cache)
	bodyCache    *lru.Cache     // Cache for the most recent block bodies
	bodyRLPCache *lru.Cache     // Cache for the most recent block bodies in RLP encoded format
	blockCache   *lru.Cache     // Cache for the most recent entire blocks
	futureBlocks *lru.Cache     // future blocks are blocks added for later processing
102

103
	quit    chan struct{} // blockchain quit channel
L
Leif Jurvetson 已提交
104
	running int32         // running must be called atomically
O
obscuren 已提交
105
	// procInterrupt must be atomically called
106 107
	procInterrupt int32          // interrupt signaler for block processing
	wg            sync.WaitGroup // chain processing wait group for shutting down
108

109
	pow       pow.PoW
110 111
	processor Processor // block processor interface
	validator Validator // block and state validator interface
O
obscuren 已提交
112
}
O
obscuren 已提交
113

114 115 116
// NewBlockChain returns a fully initialised block chain using information
// available in the database. It initialiser the default Ethereum Validator and
// Processor.
117
func NewBlockChain(chainDb ethdb.Database, config *params.ChainConfig, pow pow.PoW, mux *event.TypeMux) (*BlockChain, error) {
118 119 120 121 122
	bodyCache, _ := lru.New(bodyCacheLimit)
	bodyRLPCache, _ := lru.New(bodyCacheLimit)
	blockCache, _ := lru.New(blockCacheLimit)
	futureBlocks, _ := lru.New(maxFutureBlocks)

123
	bc := &BlockChain{
124
		config:       config,
125 126 127 128 129 130 131 132
		chainDb:      chainDb,
		eventMux:     mux,
		quit:         make(chan struct{}),
		bodyCache:    bodyCache,
		bodyRLPCache: bodyRLPCache,
		blockCache:   blockCache,
		futureBlocks: futureBlocks,
		pow:          pow,
O
obscuren 已提交
133
	}
134 135
	bc.SetValidator(NewBlockValidator(config, bc, pow))
	bc.SetProcessor(NewStateProcessor(config, bc))
136 137 138

	gv := func() HeaderValidator { return bc.Validator() }
	var err error
139
	bc.hc, err = NewHeaderChain(chainDb, config, gv, bc.getProcInterrupt)
140 141 142
	if err != nil {
		return nil, err
	}
143 144 145

	bc.genesisBlock = bc.GetBlockByNumber(0)
	if bc.genesisBlock == nil {
146
		return nil, ErrNoGenesis
147
	}
148

149
	if err := bc.loadLastState(); err != nil {
150
		return nil, err
O
obscuren 已提交
151
	}
152
	// Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
153
	for hash, _ := range BadHashes {
154
		if header := bc.GetHeaderByHash(hash); header != nil {
155 156 157
			glog.V(logger.Error).Infof("Found bad hash, rewinding chain to block #%d [%x…]", header.Number, header.ParentHash[:4])
			bc.SetHead(header.Number.Uint64() - 1)
			glog.V(logger.Error).Infoln("Chain rewind was successful, resuming normal operation")
158 159
		}
	}
160
	// Take ownership of this particular state
161
	go bc.update()
O
obscuren 已提交
162
	return bc, nil
163 164
}

165 166 167 168
func (self *BlockChain) getProcInterrupt() bool {
	return atomic.LoadInt32(&self.procInterrupt) == 1
}

169 170 171 172 173 174 175 176 177
// loadLastState loads the last known chain state from the database. This method
// assumes that the chain manager mutex is held.
func (self *BlockChain) loadLastState() error {
	// Restore the last known head block
	head := GetHeadBlockHash(self.chainDb)
	if head == (common.Hash{}) {
		// Corrupt or empty database, init from scratch
		self.Reset()
	} else {
178
		if block := self.GetBlockByHash(head); block != nil {
179 180 181 182 183 184 185 186
			// Block found, set as the current head
			self.currentBlock = block
		} else {
			// Corrupt or empty database, init from scratch
			self.Reset()
		}
	}
	// Restore the last known head header
187
	currentHeader := self.currentBlock.Header()
188
	if head := GetHeadHeaderHash(self.chainDb); head != (common.Hash{}) {
189
		if header := self.GetHeaderByHash(head); header != nil {
190
			currentHeader = header
191 192
		}
	}
193
	self.hc.SetCurrentHeader(currentHeader)
194 195 196
	// Restore the last known head fast block
	self.currentFastBlock = self.currentBlock
	if head := GetHeadFastBlockHash(self.chainDb); head != (common.Hash{}) {
197
		if block := self.GetBlockByHash(head); block != nil {
198 199 200
			self.currentFastBlock = block
		}
	}
201 202 203 204 205 206 207 208 209
	// Initialize a statedb cache to ensure singleton account bloom filter generation
	statedb, err := state.New(self.currentBlock.Root(), self.chainDb)
	if err != nil {
		return err
	}
	self.stateCache = statedb
	self.stateCache.GetAccount(common.Address{})

	// Issue a status log for the user
210 211 212
	headerTd := self.GetTd(currentHeader.Hash(), currentHeader.Number.Uint64())
	blockTd := self.GetTd(self.currentBlock.Hash(), self.currentBlock.NumberU64())
	fastTd := self.GetTd(self.currentFastBlock.Hash(), self.currentFastBlock.NumberU64())
213

214
	glog.V(logger.Info).Infof("Last header: #%d [%x…] TD=%v", currentHeader.Number, currentHeader.Hash().Bytes()[:4], headerTd)
215
	glog.V(logger.Info).Infof("Last block: #%d [%x…] TD=%v", self.currentBlock.Number(), self.currentBlock.Hash().Bytes()[:4], blockTd)
216
	glog.V(logger.Info).Infof("Fast block: #%d [%x…] TD=%v", self.currentFastBlock.Number(), self.currentFastBlock.Hash().Bytes()[:4], fastTd)
217 218 219 220

	return nil
}

221 222 223 224
// SetHead rewinds the local chain to a new head. In the case of headers, everything
// above the new head will be deleted and the new one set. In the case of blocks
// though, the head may be further rewound if block bodies are missing (non-archive
// nodes after a fast sync).
225
func (bc *BlockChain) SetHead(head uint64) {
226 227 228
	bc.mu.Lock()
	defer bc.mu.Unlock()

229 230
	delFn := func(hash common.Hash, num uint64) {
		DeleteBody(bc.chainDb, hash, num)
231
	}
232 233
	bc.hc.SetHead(head, delFn)

234
	// Clear out any stale content from the caches
235 236 237 238
	bc.bodyCache.Purge()
	bc.bodyRLPCache.Purge()
	bc.blockCache.Purge()
	bc.futureBlocks.Purge()
239

240
	// Update all computed fields to the new head
241 242 243
	currentHeader := bc.hc.CurrentHeader()
	if bc.currentBlock != nil && currentHeader.Number.Uint64() < bc.currentBlock.NumberU64() {
		bc.currentBlock = bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())
244
	}
245 246
	if bc.currentFastBlock != nil && currentHeader.Number.Uint64() < bc.currentFastBlock.NumberU64() {
		bc.currentFastBlock = bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())
247 248
	}

249 250 251
	if bc.currentBlock == nil {
		bc.currentBlock = bc.genesisBlock
	}
252 253 254
	if bc.currentFastBlock == nil {
		bc.currentFastBlock = bc.genesisBlock
	}
255

256 257 258 259 260 261
	if err := WriteHeadBlockHash(bc.chainDb, bc.currentBlock.Hash()); err != nil {
		glog.Fatalf("failed to reset head block hash: %v", err)
	}
	if err := WriteHeadFastBlockHash(bc.chainDb, bc.currentFastBlock.Hash()); err != nil {
		glog.Fatalf("failed to reset head fast block hash: %v", err)
	}
262
	bc.loadLastState()
263 264
}

265 266 267
// FastSyncCommitHead sets the current head block to the one defined by the hash
// irrelevant what the chain contents were prior.
func (self *BlockChain) FastSyncCommitHead(hash common.Hash) error {
268
	// Make sure that both the block as well at its state trie exists
269
	block := self.GetBlockByHash(hash)
270 271 272
	if block == nil {
		return fmt.Errorf("non existent block [%x…]", hash[:4])
	}
273
	if _, err := trie.NewSecure(block.Root(), self.chainDb, 0); err != nil {
274 275 276 277 278 279 280 281 282 283 284
		return err
	}
	// If all checks out, manually set the head block
	self.mu.Lock()
	self.currentBlock = block
	self.mu.Unlock()

	glog.V(logger.Info).Infof("committed block #%d [%x…] as new head", block.Number(), hash[:4])
	return nil
}

285
// GasLimit returns the gas limit of the current HEAD block.
286
func (self *BlockChain) GasLimit() *big.Int {
O
obscuren 已提交
287 288
	self.mu.RLock()
	defer self.mu.RUnlock()
O
obscuren 已提交
289

290
	return self.currentBlock.GasLimit()
O
obscuren 已提交
291 292
}

293
// LastBlockHash return the hash of the HEAD block.
294
func (self *BlockChain) LastBlockHash() common.Hash {
295 296 297
	self.mu.RLock()
	defer self.mu.RUnlock()

298
	return self.currentBlock.Hash()
299 300
}

301
// CurrentBlock retrieves the current head block of the canonical chain. The
302
// block is retrieved from the blockchain's internal cache.
303
func (self *BlockChain) CurrentBlock() *types.Block {
O
obscuren 已提交
304 305 306 307
	self.mu.RLock()
	defer self.mu.RUnlock()

	return self.currentBlock
O
obscuren 已提交
308 309
}

310
// CurrentFastBlock retrieves the current fast-sync head block of the canonical
311
// chain. The block is retrieved from the blockchain's internal cache.
312 313 314 315 316 317 318
func (self *BlockChain) CurrentFastBlock() *types.Block {
	self.mu.RLock()
	defer self.mu.RUnlock()

	return self.currentFastBlock
}

319 320
// Status returns status information about the current chain such as the HEAD Td,
// the HEAD hash and the hash of the genesis block.
321
func (self *BlockChain) Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash) {
O
obscuren 已提交
322 323 324
	self.mu.RLock()
	defer self.mu.RUnlock()

325
	return self.GetTd(self.currentBlock.Hash(), self.currentBlock.NumberU64()), self.currentBlock.Hash(), self.genesisBlock.Hash()
326 327
}

328 329 330 331 332 333 334 335 336 337 338 339
// SetProcessor sets the processor required for making state modifications.
func (self *BlockChain) SetProcessor(processor Processor) {
	self.procmu.Lock()
	defer self.procmu.Unlock()
	self.processor = processor
}

// SetValidator sets the validator which is used to validate incoming blocks.
func (self *BlockChain) SetValidator(validator Validator) {
	self.procmu.Lock()
	defer self.procmu.Unlock()
	self.validator = validator
340 341
}

342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359
// Validator returns the current validator.
func (self *BlockChain) Validator() Validator {
	self.procmu.RLock()
	defer self.procmu.RUnlock()
	return self.validator
}

// Processor returns the current processor.
func (self *BlockChain) Processor() Processor {
	self.procmu.RLock()
	defer self.procmu.RUnlock()
	return self.processor
}

// AuxValidator returns the auxiliary validator (Proof of work atm)
func (self *BlockChain) AuxValidator() pow.PoW { return self.pow }

// State returns a new mutable state based on the current HEAD block.
360
func (self *BlockChain) State() (*state.StateDB, error) {
361 362 363 364 365 366
	return self.StateAt(self.CurrentBlock().Root())
}

// StateAt returns a new mutable state based on a particular point in time.
func (self *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) {
	return self.stateCache.New(root)
O
obscuren 已提交
367 368
}

369
// Reset purges the entire blockchain, restoring it to its genesis state.
370
func (bc *BlockChain) Reset() {
371
	bc.ResetWithGenesisBlock(bc.genesisBlock)
372 373
}

374 375
// ResetWithGenesisBlock purges the entire blockchain, restoring it to the
// specified genesis state.
376
func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) {
377 378 379
	// Dump the entire block chain and purge the caches
	bc.SetHead(0)

380 381 382
	bc.mu.Lock()
	defer bc.mu.Unlock()

383
	// Prepare the genesis block and reinitialise the chain
384
	if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil {
385 386 387
		glog.Fatalf("failed to write genesis block TD: %v", err)
	}
	if err := WriteBlock(bc.chainDb, genesis); err != nil {
388
		glog.Fatalf("failed to write genesis block: %v", err)
389
	}
390
	bc.genesisBlock = genesis
391 392
	bc.insert(bc.genesisBlock)
	bc.currentBlock = bc.genesisBlock
393 394
	bc.hc.SetGenesis(bc.genesisBlock.Header())
	bc.hc.SetCurrentHeader(bc.genesisBlock.Header())
395
	bc.currentFastBlock = bc.genesisBlock
396 397
}

398
// Export writes the active chain to the given writer.
399
func (self *BlockChain) Export(w io.Writer) error {
400
	if err := self.ExportN(w, uint64(0), self.currentBlock.NumberU64()); err != nil {
T
Taylor Gerring 已提交
401 402 403 404 405 406
		return err
	}
	return nil
}

// ExportN writes a subset of the active chain to the given writer.
407
func (self *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
O
obscuren 已提交
408 409
	self.mu.RLock()
	defer self.mu.RUnlock()
O
obscuren 已提交
410

T
Taylor Gerring 已提交
411 412 413 414
	if first > last {
		return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last)
	}

T
Cleanup  
Taylor Gerring 已提交
415
	glog.V(logger.Info).Infof("exporting %d blocks...\n", last-first+1)
416

T
Taylor Gerring 已提交
417
	for nr := first; nr <= last; nr++ {
418 419 420 421 422 423
		block := self.GetBlockByNumber(nr)
		if block == nil {
			return fmt.Errorf("export failed on #%d: not found", nr)
		}

		if err := block.EncodeRLP(w); err != nil {
424 425
			return err
		}
O
obscuren 已提交
426
	}
427

428
	return nil
O
obscuren 已提交
429 430
}

431 432
// insert injects a new head block into the current block chain. This method
// assumes that the block is indeed a true head. It will also reset the head
433 434
// header and the head fast sync block to this very same block if they are older
// or if they are on a different side chain.
435 436
//
// Note, this function assumes that the `mu` mutex is held!
437
func (bc *BlockChain) insert(block *types.Block) {
438 439 440
	// If the block is on a side chain or an unknown one, force other heads onto it too
	updateHeads := GetCanonicalHash(bc.chainDb, block.NumberU64()) != block.Hash()

441 442 443
	// Add the block to the canonical chain number scheme and mark as the head
	if err := WriteCanonicalHash(bc.chainDb, block.Hash(), block.NumberU64()); err != nil {
		glog.Fatalf("failed to insert block number: %v", err)
444
	}
445
	if err := WriteHeadBlockHash(bc.chainDb, block.Hash()); err != nil {
446
		glog.Fatalf("failed to insert head block hash: %v", err)
447
	}
448
	bc.currentBlock = block
449 450 451

	// If the block is better than out head or is on a different chain, force update heads
	if updateHeads {
452
		bc.hc.SetCurrentHeader(block.Header())
453 454 455 456 457 458

		if err := WriteHeadFastBlockHash(bc.chainDb, block.Hash()); err != nil {
			glog.Fatalf("failed to insert head fast block hash: %v", err)
		}
		bc.currentFastBlock = block
	}
459 460
}

O
obscuren 已提交
461
// Accessors
462
func (bc *BlockChain) Genesis() *types.Block {
O
obscuren 已提交
463 464 465
	return bc.genesisBlock
}

466 467
// GetBody retrieves a block body (transactions and uncles) from the database by
// hash, caching it if found.
468
func (self *BlockChain) GetBody(hash common.Hash) *types.Body {
469 470
	// Short circuit if the body's already in the cache, retrieve otherwise
	if cached, ok := self.bodyCache.Get(hash); ok {
471 472
		body := cached.(*types.Body)
		return body
O
obscuren 已提交
473
	}
474
	body := GetBody(self.chainDb, hash, self.hc.GetBlockNumber(hash))
475 476
	if body == nil {
		return nil
477 478
	}
	// Cache the found body for next time and return
479 480
	self.bodyCache.Add(hash, body)
	return body
481
}
O
obscuren 已提交
482

483 484
// GetBodyRLP retrieves a block body in RLP encoding from the database by hash,
// caching it if found.
485
func (self *BlockChain) GetBodyRLP(hash common.Hash) rlp.RawValue {
486 487
	// Short circuit if the body's already in the cache, retrieve otherwise
	if cached, ok := self.bodyRLPCache.Get(hash); ok {
488
		return cached.(rlp.RawValue)
489
	}
490
	body := GetBodyRLP(self.chainDb, hash, self.hc.GetBlockNumber(hash))
491
	if len(body) == 0 {
492
		return nil
O
obscuren 已提交
493
	}
494 495 496 497
	// Cache the found body for next time and return
	self.bodyRLPCache.Add(hash, body)
	return body
}
O
obscuren 已提交
498

499 500
// HasBlock checks if a block is fully present in the database or not, caching
// it if present.
501
func (bc *BlockChain) HasBlock(hash common.Hash) bool {
502
	return bc.GetBlockByHash(hash) != nil
O
obscuren 已提交
503 504
}

505 506 507 508
// HasBlockAndState checks if a block and associated state trie is fully present
// in the database or not, caching it if present.
func (bc *BlockChain) HasBlockAndState(hash common.Hash) bool {
	// Check first that the block itself is known
509
	block := bc.GetBlockByHash(hash)
510 511 512 513 514 515 516 517
	if block == nil {
		return false
	}
	// Ensure the associated state is also present
	_, err := state.New(block.Root(), bc.chainDb)
	return err == nil
}

518 519 520
// GetBlock retrieves a block from the database by hash and number,
// caching it if found.
func (self *BlockChain) GetBlock(hash common.Hash, number uint64) *types.Block {
521 522
	// Short circuit if the block's already in the cache, retrieve otherwise
	if block, ok := self.blockCache.Get(hash); ok {
523 524
		return block.(*types.Block)
	}
525
	block := GetBlock(self.chainDb, hash, number)
526
	if block == nil {
O
obscuren 已提交
527 528
		return nil
	}
529 530 531
	// Cache the found block for next time and return
	self.blockCache.Add(block.Hash(), block)
	return block
O
obscuren 已提交
532 533
}

534 535 536 537 538
// GetBlockByHash retrieves a block from the database by hash, caching it if found.
func (self *BlockChain) GetBlockByHash(hash common.Hash) *types.Block {
	return self.GetBlock(hash, self.hc.GetBlockNumber(hash))
}

539 540
// GetBlockByNumber retrieves a block from the database by number, caching it
// (associated with its hash) if found.
541
func (self *BlockChain) GetBlockByNumber(number uint64) *types.Block {
542
	hash := GetCanonicalHash(self.chainDb, number)
543 544 545
	if hash == (common.Hash{}) {
		return nil
	}
546
	return self.GetBlock(hash, number)
547
}
548

549
// [deprecated by eth/62]
F
Felix Lange 已提交
550
// GetBlocksFromHash returns the block corresponding to hash and up to n-1 ancestors.
551
func (self *BlockChain) GetBlocksFromHash(hash common.Hash, n int) (blocks []*types.Block) {
552
	number := self.hc.GetBlockNumber(hash)
F
Felix Lange 已提交
553
	for i := 0; i < n; i++ {
554
		block := self.GetBlock(hash, number)
F
Felix Lange 已提交
555 556 557 558 559
		if block == nil {
			break
		}
		blocks = append(blocks, block)
		hash = block.ParentHash()
560
		number--
F
Felix Lange 已提交
561 562 563 564
	}
	return
}

565 566 567 568
// GetUnclesInChain retrieves all the uncles from a given block backwards until
// a specific distance is reached.
func (self *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.Header {
	uncles := []*types.Header{}
569 570
	for i := 0; block != nil && i < length; i++ {
		uncles = append(uncles, block.Uncles()...)
571
		block = self.GetBlock(block.ParentHash(), block.NumberU64()-1)
572
	}
573
	return uncles
O
obscuren 已提交
574
}
O
obscuren 已提交
575

576 577
// Stop stops the blockchain service. If any imports are currently in progress
// it will abort them using the procInterrupt.
578
func (bc *BlockChain) Stop() {
579 580 581
	if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
		return
	}
582
	close(bc.quit)
O
obscuren 已提交
583
	atomic.StoreInt32(&bc.procInterrupt, 1)
584 585 586 587

	bc.wg.Wait()

	glog.V(logger.Info).Infoln("Chain manager stopped")
588 589
}

590
func (self *BlockChain) procFutureBlocks() {
591 592 593 594 595
	blocks := make([]*types.Block, 0, self.futureBlocks.Len())
	for _, hash := range self.futureBlocks.Keys() {
		if block, exist := self.futureBlocks.Get(hash); exist {
			blocks = append(blocks, block.(*types.Block))
		}
596
	}
597 598 599 600
	if len(blocks) > 0 {
		types.BlockBy(types.Number).Sort(blocks)
		self.InsertChain(blocks)
	}
601 602
}

603
type WriteStatus byte
604 605

const (
606
	NonStatTy WriteStatus = iota
607 608 609
	CanonStatTy
	SplitStatTy
	SideStatTy
610 611
)

612 613 614
// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
func (self *BlockChain) Rollback(chain []common.Hash) {
615 616 617
	self.mu.Lock()
	defer self.mu.Unlock()

618 619 620
	for i := len(chain) - 1; i >= 0; i-- {
		hash := chain[i]

621 622 623
		currentHeader := self.hc.CurrentHeader()
		if currentHeader.Hash() == hash {
			self.hc.SetCurrentHeader(self.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1))
624 625
		}
		if self.currentFastBlock.Hash() == hash {
626
			self.currentFastBlock = self.GetBlock(self.currentFastBlock.ParentHash(), self.currentFastBlock.NumberU64()-1)
627 628 629
			WriteHeadFastBlockHash(self.chainDb, self.currentFastBlock.Hash())
		}
		if self.currentBlock.Hash() == hash {
630
			self.currentBlock = self.GetBlock(self.currentBlock.ParentHash(), self.currentBlock.NumberU64()-1)
631 632 633 634 635
			WriteHeadBlockHash(self.chainDb, self.currentBlock.Hash())
		}
	}
}

636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666
// SetReceiptsData computes all the non-consensus fields of the receipts
func SetReceiptsData(block *types.Block, receipts types.Receipts) {
	transactions, logIndex := block.Transactions(), uint(0)

	for j := 0; j < len(receipts); j++ {
		// The transaction hash can be retrieved from the transaction itself
		receipts[j].TxHash = transactions[j].Hash()

		// The contract address can be derived from the transaction itself
		if MessageCreatesContract(transactions[j]) {
			from, _ := transactions[j].From()
			receipts[j].ContractAddress = crypto.CreateAddress(from, transactions[j].Nonce())
		}
		// The used gas can be calculated based on previous receipts
		if j == 0 {
			receipts[j].GasUsed = new(big.Int).Set(receipts[j].CumulativeGasUsed)
		} else {
			receipts[j].GasUsed = new(big.Int).Sub(receipts[j].CumulativeGasUsed, receipts[j-1].CumulativeGasUsed)
		}
		// The derived log fields can simply be set from the block and transaction
		for k := 0; k < len(receipts[j].Logs); k++ {
			receipts[j].Logs[k].BlockNumber = block.NumberU64()
			receipts[j].Logs[k].BlockHash = block.Hash()
			receipts[j].Logs[k].TxHash = receipts[j].TxHash
			receipts[j].Logs[k].TxIndex = uint(j)
			receipts[j].Logs[k].Index = logIndex
			logIndex++
		}
	}
}

667 668 669 670 671 672 673
// InsertReceiptChain attempts to complete an already existing header chain with
// transaction and receipt data.
func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
	self.wg.Add(1)
	defer self.wg.Done()

	// Collect some import statistics to report on
674
	stats := struct{ processed, ignored int32 }{}
675 676
	start := time.Now()

677 678
	// Create the block importing task queue and worker functions
	tasks := make(chan int, len(blockChain))
679
	for i := 0; i < len(blockChain) && i < len(receiptChain); i++ {
680 681 682
		tasks <- i
	}
	close(tasks)
683

684 685 686 687 688 689 690 691
	errs, failed := make([]error, len(tasks)), int32(0)
	process := func(worker int) {
		for index := range tasks {
			block, receipts := blockChain[index], receiptChain[index]

			// Short circuit insertion if shutting down or processing failed
			if atomic.LoadInt32(&self.procInterrupt) == 1 {
				return
692
			}
693 694
			if atomic.LoadInt32(&failed) > 0 {
				return
695
			}
696 697 698 699 700
			// Short circuit if the owner header is unknown
			if !self.HasHeader(block.Hash()) {
				errs[index] = fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
				atomic.AddInt32(&failed, 1)
				return
701
			}
702 703 704 705 706 707
			// Skip if the entire data is already known
			if self.HasBlock(block.Hash()) {
				atomic.AddInt32(&stats.ignored, 1)
				continue
			}
			// Compute all the non-consensus fields of the receipts
708
			SetReceiptsData(block, receipts)
709
			// Write all the data out into the database
710
			if err := WriteBody(self.chainDb, block.Hash(), block.NumberU64(), block.Body()); err != nil {
711 712 713 714 715
				errs[index] = fmt.Errorf("failed to write block body: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
716
			if err := WriteBlockReceipts(self.chainDb, block.Hash(), block.NumberU64(), receipts); err != nil {
717 718 719 720 721
				errs[index] = fmt.Errorf("failed to write block receipts: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
722 723 724 725 726 727
			if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
				errs[index] = fmt.Errorf("failed to write log blooms: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
728 729 730 731 732 733 734 735 736 737 738 739
			if err := WriteTransactions(self.chainDb, block); err != nil {
				errs[index] = fmt.Errorf("failed to write individual transactions: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
			if err := WriteReceipts(self.chainDb, receipts); err != nil {
				errs[index] = fmt.Errorf("failed to write individual receipts: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
740
			atomic.AddInt32(&stats.processed, 1)
741
		}
742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758
	}
	// Start as many worker threads as goroutines allowed
	pending := new(sync.WaitGroup)
	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
		pending.Add(1)
		go func(id int) {
			defer pending.Done()
			process(id)
		}(i)
	}
	pending.Wait()

	// If anything failed, report
	if failed > 0 {
		for i, err := range errs {
			if err != nil {
				return i, err
759 760 761
			}
		}
	}
762 763 764 765
	if atomic.LoadInt32(&self.procInterrupt) == 1 {
		glog.V(logger.Debug).Infoln("premature abort during receipt chain processing")
		return 0, nil
	}
766 767 768
	// Update the head fast sync block if better
	self.mu.Lock()
	head := blockChain[len(errs)-1]
769
	if self.GetTd(self.currentFastBlock.Hash(), self.currentFastBlock.NumberU64()).Cmp(self.GetTd(head.Hash(), head.NumberU64())) < 0 {
770 771 772 773 774 775 776
		if err := WriteHeadFastBlockHash(self.chainDb, head.Hash()); err != nil {
			glog.Fatalf("failed to update head fast block hash: %v", err)
		}
		self.currentFastBlock = head
	}
	self.mu.Unlock()

777 778
	// Report some public statistics so the user has a clue what's going on
	first, last := blockChain[0], blockChain[len(blockChain)-1]
779 780 781 782 783

	ignored := ""
	if stats.ignored > 0 {
		ignored = fmt.Sprintf(" (%d ignored)", stats.ignored)
	}
784
	glog.V(logger.Info).Infof("imported %d receipts in %9v. #%d [%x… / %x…]%s", stats.processed, common.PrettyDuration(time.Since(start)), last.Number(), first.Hash().Bytes()[:4], last.Hash().Bytes()[:4], ignored)
785 786 787 788

	return 0, nil
}

789
// WriteBlock writes the block to the chain.
790
func (self *BlockChain) WriteBlock(block *types.Block) (status WriteStatus, err error) {
791 792 793
	self.wg.Add(1)
	defer self.wg.Done()

794
	// Calculate the total difficulty of the block
795
	ptd := self.GetTd(block.ParentHash(), block.NumberU64()-1)
796 797 798
	if ptd == nil {
		return NonStatTy, ParentError(block.ParentHash())
	}
799 800 801
	// Make sure no inconsistent state is leaked during insertion
	self.mu.Lock()
	defer self.mu.Unlock()
802

803 804 805
	localTd := self.GetTd(self.currentBlock.Hash(), self.currentBlock.NumberU64())
	externTd := new(big.Int).Add(block.Difficulty(), ptd)

806 807 808 809 810 811 812 813
	// Irrelevant of the canonical status, write the block itself to the database
	if err := self.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil {
		glog.Fatalf("failed to write block total difficulty: %v", err)
	}
	if err := WriteBlock(self.chainDb, block); err != nil {
		glog.Fatalf("failed to write block contents: %v", err)
	}

814
	// If the total difficulty is higher than our known, add it to the canonical chain
815 816 817
	// Second clause in the if statement reduces the vulnerability to selfish mining.
	// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
	if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) {
818
		// Reorganise the chain if the parent is not the head block
819 820
		if block.ParentHash() != self.currentBlock.Hash() {
			if err := self.reorg(self.currentBlock, block); err != nil {
821
				return NonStatTy, err
822 823
			}
		}
824
		self.insert(block) // Insert the block as the new head of the chain
825
		status = CanonStatTy
826
	} else {
827
		status = SideStatTy
828
	}
829

830
	self.futureBlocks.Remove(block.Hash())
831 832 833 834

	return
}

835 836
// InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned
// it will return the index number of the failing block as well an error describing what went wrong (for possible errors see core/errors.go).
837
func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
838 839 840
	self.wg.Add(1)
	defer self.wg.Done()

O
obscuren 已提交
841 842 843
	self.chainmu.Lock()
	defer self.chainmu.Unlock()

F
Felix Lange 已提交
844 845 846
	// A queued approach to delivering events. This is generally
	// faster than direct delivery and requires much less mutex
	// acquiring.
O
obscuren 已提交
847
	var (
F
Felix Lange 已提交
848
		stats         = insertStats{startTime: time.Now()}
849 850
		events        = make([]interface{}, 0, len(chain))
		coalescedLogs vm.Logs
F
Felix Lange 已提交
851
		nonceChecked  = make([]bool, len(chain))
O
obscuren 已提交
852
	)
853

F
Felix Lange 已提交
854
	// Start the parallel nonce verifier.
855 856
	nonceAbort, nonceResults := verifyNoncesFromBlocks(self.pow, chain)
	defer close(nonceAbort)
857

858
	for i, block := range chain {
O
obscuren 已提交
859
		if atomic.LoadInt32(&self.procInterrupt) == 1 {
860
			glog.V(logger.Debug).Infoln("Premature abort during block chain processing")
O
obscuren 已提交
861 862
			break
		}
863

O
obscuren 已提交
864 865 866 867
		bstart := time.Now()
		// Wait for block i's nonce to be verified before processing
		// its state transition.
		for !nonceChecked[i] {
868 869
			r := <-nonceResults
			nonceChecked[r.index] = true
O
obscuren 已提交
870
			if !r.valid {
871 872
				block := chain[r.index]
				return r.index, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()}
O
obscuren 已提交
873
			}
O
obscuren 已提交
874
		}
O
obscuren 已提交
875

O
obscuren 已提交
876
		if BadHashes[block.Hash()] {
877
			err := BadHashError(block.Hash())
878
			reportBlock(block, err)
O
obscuren 已提交
879 880
			return i, err
		}
881 882 883
		// Stage 1 validation of the block using the chain's validator
		// interface.
		err := self.Validator().ValidateBlock(block)
O
obscuren 已提交
884 885 886 887 888
		if err != nil {
			if IsKnownBlockErr(err) {
				stats.ignored++
				continue
			}
889

O
obscuren 已提交
890 891 892 893
			if err == BlockFutureErr {
				// Allow up to MaxFuture second in the future blocks. If this limit
				// is exceeded the chain is discarded and processed at a later time
				// if given.
894 895
				max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
				if block.Time().Cmp(max) == 1 {
O
obscuren 已提交
896
					return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max)
897
				}
O
obscuren 已提交
898

899
				self.futureBlocks.Add(block.Hash(), block)
O
obscuren 已提交
900 901 902
				stats.queued++
				continue
			}
O
obscuren 已提交
903

904 905
			if IsParentErr(err) && self.futureBlocks.Contains(block.ParentHash()) {
				self.futureBlocks.Add(block.Hash(), block)
O
obscuren 已提交
906 907
				stats.queued++
				continue
908
			}
909

910
			reportBlock(block, err)
O
obscuren 已提交
911

912 913
			return i, err
		}
914

915 916
		// Create a new statedb using the parent block and report an
		// error if it fails.
917 918 919 920 921
		switch {
		case i == 0:
			err = self.stateCache.Reset(self.GetBlock(block.ParentHash(), block.NumberU64()-1).Root())
		default:
			err = self.stateCache.Reset(chain[i-1].Root())
922
		}
923 924
		if err != nil {
			reportBlock(block, err)
O
obscuren 已提交
925 926
			return i, err
		}
927
		// Process block using the parent state as reference point.
928
		receipts, logs, usedGas, err := self.processor.Process(block, self.stateCache, vm.Config{})
929 930 931 932 933
		if err != nil {
			reportBlock(block, err)
			return i, err
		}
		// Validate the state using the default validator
934
		err = self.Validator().ValidateState(block, self.GetBlock(block.ParentHash(), block.NumberU64()-1), self.stateCache, receipts, usedGas)
935 936 937 938 939
		if err != nil {
			reportBlock(block, err)
			return i, err
		}
		// Write state changes to database
940
		_, err = self.stateCache.Commit(self.config.IsEIP158(block.Number()))
941 942 943 944 945 946 947
		if err != nil {
			return i, err
		}

		// coalesce logs for later processing
		coalescedLogs = append(coalescedLogs, logs...)

948
		if err := WriteBlockReceipts(self.chainDb, block.Hash(), block.NumberU64(), receipts); err != nil {
949
			return i, err
950
		}
O
obscuren 已提交
951

952
		// write the block to the chain and get the status
953
		status, err := self.WriteBlock(block)
954 955 956
		if err != nil {
			return i, err
		}
957

958
		switch status {
959
		case CanonStatTy:
O
obscuren 已提交
960
			if glog.V(logger.Debug) {
961
				glog.Infof("inserted block #%d [%x…] in %9v: %3d txs %7v gas %d uncles.", block.Number(), block.Hash().Bytes()[0:4], common.PrettyDuration(time.Since(bstart)), len(block.Transactions()), block.GasUsed(), len(block.Uncles()))
O
obscuren 已提交
962
			}
963
			blockInsertTimer.UpdateSince(bstart)
964
			events = append(events, ChainEvent{block, block.Hash(), logs})
965 966

			// This puts transactions in a extra db for rpc
967
			if err := WriteTransactions(self.chainDb, block); err != nil {
968 969
				return i, err
			}
970
			// store the receipts
971
			if err := WriteReceipts(self.chainDb, receipts); err != nil {
972 973 974 975 976 977
				return i, err
			}
			// Write map map bloom filters
			if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
				return i, err
			}
978
		case SideStatTy:
O
obscuren 已提交
979
			if glog.V(logger.Detail) {
980
				glog.Infof("inserted forked block #%d [%x…] (TD=%v) in %9v: %3d txs %d uncles.", block.Number(), block.Hash().Bytes()[0:4], block.Difficulty(), common.PrettyDuration(time.Since(bstart)), len(block.Transactions()), len(block.Uncles()))
O
obscuren 已提交
981
			}
982
			blockInsertTimer.UpdateSince(bstart)
983 984
			events = append(events, ChainSideEvent{block, logs})

985
		case SplitStatTy:
986
			events = append(events, ChainSplitEvent{block, logs})
987
		}
F
Felix Lange 已提交
988

O
obscuren 已提交
989
		stats.processed++
F
Felix Lange 已提交
990
		if glog.V(logger.Info) {
991
			stats.usedGas += usedGas.Uint64()
F
Felix Lange 已提交
992 993
			stats.report(chain, i)
		}
O
obscuren 已提交
994 995
	}

996
	go self.postChainEvents(events, coalescedLogs)
997

998
	return 0, nil
O
obscuren 已提交
999
}
1000

F
Felix Lange 已提交
1001 1002 1003
// insertStats tracks and reports on block insertion.
type insertStats struct {
	queued, processed, ignored int
1004
	usedGas                    uint64
F
Felix Lange 已提交
1005 1006 1007 1008
	lastIndex                  int
	startTime                  time.Time
}

1009 1010 1011
// statsReportLimit is the time limit during import after which we always print
// out progress. This avoids the user wondering what's going on.
const statsReportLimit = 8 * time.Second
F
Felix Lange 已提交
1012 1013 1014 1015

// report prints statistics if some number of blocks have been processed
// or more than a few seconds have passed since the last message.
func (st *insertStats) report(chain []*types.Block, index int) {
1016
	// Fetch the timings for the batch
1017 1018 1019 1020
	var (
		now     = time.Now()
		elapsed = now.Sub(st.startTime)
	)
1021 1022 1023 1024
	if elapsed == 0 { // Yes Windows, I'm looking at you
		elapsed = 1
	}
	// If we're at the last block of the batch or report period reached, log
1025
	if index == len(chain)-1 || elapsed >= statsReportLimit {
F
Felix Lange 已提交
1026
		start, end := chain[st.lastIndex], chain[index]
J
Jeffrey Wilcke 已提交
1027
		txcount := countTransactions(chain[st.lastIndex : index+1])
1028 1029 1030 1031 1032

		extra := ""
		if st.queued > 0 || st.ignored > 0 {
			extra = fmt.Sprintf(" (%d queued %d ignored)", st.queued, st.ignored)
		}
1033 1034 1035 1036 1037 1038 1039
		hashes := ""
		if st.processed > 1 {
			hashes = fmt.Sprintf("%x… / %x…", start.Hash().Bytes()[:4], end.Hash().Bytes()[:4])
		} else {
			hashes = fmt.Sprintf("%x…", end.Hash().Bytes()[:4])
		}
		glog.Infof("imported %d blocks, %5d txs (%7.3f Mg) in %9v (%6.3f Mg/s). #%v [%s]%s", st.processed, txcount, float64(st.usedGas)/1000000, common.PrettyDuration(elapsed), float64(st.usedGas)*1000/float64(elapsed), end.Number(), hashes, extra)
1040

F
Felix Lange 已提交
1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051
		*st = insertStats{startTime: now, lastIndex: index}
	}
}

func countTransactions(chain []*types.Block) (c int) {
	for _, b := range chain {
		c += len(b.Transactions())
	}
	return c
}

1052 1053 1054
// reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them
// to be part of the new canonical chain and accumulates potential missing transactions and post an
// event about them
1055
func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
1056
	var (
1057 1058 1059 1060 1061 1062 1063 1064
		newChain          types.Blocks
		oldChain          types.Blocks
		commonBlock       *types.Block
		oldStart          = oldBlock
		newStart          = newBlock
		deletedTxs        types.Transactions
		deletedLogs       vm.Logs
		deletedLogsByHash = make(map[common.Hash]vm.Logs)
J
Jeffrey Wilcke 已提交
1065 1066 1067 1068 1069
		// collectLogs collects the logs that were generated during the
		// processing of the block that corresponds with the given hash.
		// These logs are later announced as deleted.
		collectLogs = func(h common.Hash) {
			// Coalesce logs
1070
			receipts := GetBlockReceipts(self.chainDb, h, self.hc.GetBlockNumber(h))
J
Jeffrey Wilcke 已提交
1071 1072
			for _, receipt := range receipts {
				deletedLogs = append(deletedLogs, receipt.Logs...)
1073 1074

				deletedLogsByHash[h] = receipt.Logs
J
Jeffrey Wilcke 已提交
1075 1076
			}
		}
1077
	)
1078 1079 1080 1081

	// first reduce whoever is higher bound
	if oldBlock.NumberU64() > newBlock.NumberU64() {
		// reduce old chain
1082
		for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = self.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
1083
			oldChain = append(oldChain, oldBlock)
1084
			deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
J
Jeffrey Wilcke 已提交
1085 1086

			collectLogs(oldBlock.Hash())
1087 1088 1089
		}
	} else {
		// reduce new chain and append new chain blocks for inserting later on
1090
		for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = self.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
1091 1092
			newChain = append(newChain, newBlock)
		}
O
obscuren 已提交
1093
	}
O
obscuren 已提交
1094
	if oldBlock == nil {
1095
		return fmt.Errorf("Invalid old chain")
O
obscuren 已提交
1096 1097
	}
	if newBlock == nil {
1098
		return fmt.Errorf("Invalid new chain")
O
obscuren 已提交
1099
	}
O
obscuren 已提交
1100

1101
	numSplit := newBlock.Number()
1102 1103
	for {
		if oldBlock.Hash() == newBlock.Hash() {
1104
			commonBlock = oldBlock
1105 1106
			break
		}
1107 1108

		oldChain = append(oldChain, oldBlock)
1109
		newChain = append(newChain, newBlock)
1110
		deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
J
Jeffrey Wilcke 已提交
1111
		collectLogs(oldBlock.Hash())
O
obscuren 已提交
1112

1113
		oldBlock, newBlock = self.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), self.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
1114
		if oldBlock == nil {
1115
			return fmt.Errorf("Invalid old chain")
1116 1117
		}
		if newBlock == nil {
1118
			return fmt.Errorf("Invalid new chain")
1119
		}
1120 1121
	}

1122
	if glog.V(logger.Debug) {
1123
		commonHash := commonBlock.Hash()
1124
		glog.Infof("Chain split detected @ %x. Reorganising chain from #%v %x to %x", commonHash[:4], numSplit, oldStart.Hash().Bytes()[:4], newStart.Hash().Bytes()[:4])
1125 1126
	}

1127
	var addedTxs types.Transactions
1128
	// insert blocks. Order does not matter. Last block will be written in ImportChain itself which creates the new head properly
1129
	for _, block := range newChain {
1130
		// insert the block in the canonical way, re-writing history
1131
		self.insert(block)
1132
		// write canonical receipts and transactions
1133
		if err := WriteTransactions(self.chainDb, block); err != nil {
1134 1135
			return err
		}
1136
		receipts := GetBlockReceipts(self.chainDb, block.Hash(), block.NumberU64())
1137
		// write receipts
1138
		if err := WriteReceipts(self.chainDb, receipts); err != nil {
1139 1140 1141 1142 1143 1144
			return err
		}
		// Write map map bloom filters
		if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
			return err
		}
1145 1146 1147 1148 1149 1150 1151 1152 1153 1154
		addedTxs = append(addedTxs, block.Transactions()...)
	}

	// calculate the difference between deleted and added transactions
	diff := types.TxDifference(deletedTxs, addedTxs)
	// When transactions get deleted from the database that means the
	// receipts that were created in the fork must also be deleted
	for _, tx := range diff {
		DeleteReceipt(self.chainDb, tx.Hash())
		DeleteTransaction(self.chainDb, tx.Hash())
1155
	}
1156 1157
	// Must be posted in a goroutine because of the transaction pool trying
	// to acquire the chain manager lock
J
Jeffrey Wilcke 已提交
1158 1159 1160 1161
	if len(diff) > 0 {
		go self.eventMux.Post(RemovedTransactionEvent{diff})
	}
	if len(deletedLogs) > 0 {
1162
		go self.eventMux.Post(RemovedLogsEvent{deletedLogs})
J
Jeffrey Wilcke 已提交
1163
	}
1164

1165 1166 1167 1168 1169 1170 1171 1172
	if len(oldChain) > 0 {
		go func() {
			for _, block := range oldChain {
				self.eventMux.Post(ChainSideEvent{Block: block, Logs: deletedLogsByHash[block.Hash()]})
			}
		}()
	}

1173
	return nil
1174 1175
}

1176 1177
// postChainEvents iterates over the events generated by a chain insertion and
// posts them into the event mux.
1178 1179 1180
func (self *BlockChain) postChainEvents(events []interface{}, logs vm.Logs) {
	// post event logs for further processing
	self.eventMux.Post(logs)
1181 1182 1183 1184
	for _, event := range events {
		if event, ok := event.(ChainEvent); ok {
			// We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long
			// and in most cases isn't even necessary.
1185
			if self.LastBlockHash() == event.Hash {
1186 1187 1188 1189 1190 1191 1192 1193
				self.eventMux.Post(ChainHeadEvent{event.Block})
			}
		}
		// Fire the insertion events individually too
		self.eventMux.Post(event)
	}
}

1194
func (self *BlockChain) update() {
O
obscuren 已提交
1195
	futureTimer := time.Tick(5 * time.Second)
1196 1197
	for {
		select {
O
obscuren 已提交
1198
		case <-futureTimer:
1199
			self.procFutureBlocks()
1200
		case <-self.quit:
1201
			return
1202 1203 1204
		}
	}
}
1205

F
Felix Lange 已提交
1206
// reportBlock logs a bad block error.
1207
func reportBlock(block *types.Block, err error) {
1208 1209 1210 1211
	if glog.V(logger.Error) {
		glog.Errorf("Bad block #%v (%s)\n", block.Number(), block.Hash().Hex())
		glog.Errorf("    %v", err)
	}
F
Felix Lange 已提交
1212
}
1213 1214 1215 1216 1217 1218 1219

// InsertHeaderChain attempts to insert the given header chain in to the local
// chain, possibly creating a reorg. If an error is returned, it will return the
// index number of the failing header as well an error describing what went wrong.
//
// The verify parameter can be used to fine tune whether nonce verification
// should be done or not. The reason behind the optional check is because some
L
Leif Jurvetson 已提交
1220
// of the header retrieval mechanisms already need to verify nonces, as well as
1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270
// because nonces can be verified sparsely, not needing to check each.
func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
	// Make sure only one thread manipulates the chain at once
	self.chainmu.Lock()
	defer self.chainmu.Unlock()

	self.wg.Add(1)
	defer self.wg.Done()

	whFunc := func(header *types.Header) error {
		self.mu.Lock()
		defer self.mu.Unlock()

		_, err := self.hc.WriteHeader(header)
		return err
	}

	return self.hc.InsertHeaderChain(chain, checkFreq, whFunc)
}

// writeHeader writes a header into the local chain, given that its parent is
// already known. If the total difficulty of the newly inserted header becomes
// greater than the current known TD, the canonical chain is re-routed.
//
// Note: This method is not concurrent-safe with inserting blocks simultaneously
// into the chain, as side effects caused by reorganisations cannot be emulated
// without the real blocks. Hence, writing headers directly should only be done
// in two scenarios: pure-header mode of operation (light clients), or properly
// separated header/block phases (non-archive clients).
func (self *BlockChain) writeHeader(header *types.Header) error {
	self.wg.Add(1)
	defer self.wg.Done()

	self.mu.Lock()
	defer self.mu.Unlock()

	_, err := self.hc.WriteHeader(header)
	return err
}

// CurrentHeader retrieves the current head header of the canonical chain. The
// header is retrieved from the HeaderChain's internal cache.
func (self *BlockChain) CurrentHeader() *types.Header {
	self.mu.RLock()
	defer self.mu.RUnlock()

	return self.hc.CurrentHeader()
}

// GetTd retrieves a block's total difficulty in the canonical chain from the
1271 1272 1273 1274 1275 1276
// database by hash and number, caching it if found.
func (self *BlockChain) GetTd(hash common.Hash, number uint64) *big.Int {
	return self.hc.GetTd(hash, number)
}

// GetTdByHash retrieves a block's total difficulty in the canonical chain from the
1277
// database by hash, caching it if found.
1278 1279 1280 1281 1282 1283 1284 1285
func (self *BlockChain) GetTdByHash(hash common.Hash) *big.Int {
	return self.hc.GetTdByHash(hash)
}

// GetHeader retrieves a block header from the database by hash and number,
// caching it if found.
func (self *BlockChain) GetHeader(hash common.Hash, number uint64) *types.Header {
	return self.hc.GetHeader(hash, number)
1286 1287
}

1288
// GetHeaderByHash retrieves a block header from the database by hash, caching it if
1289
// found.
1290 1291
func (self *BlockChain) GetHeaderByHash(hash common.Hash) *types.Header {
	return self.hc.GetHeaderByHash(hash)
1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310
}

// HasHeader checks if a block header is present in the database or not, caching
// it if present.
func (bc *BlockChain) HasHeader(hash common.Hash) bool {
	return bc.hc.HasHeader(hash)
}

// GetBlockHashesFromHash retrieves a number of block hashes starting at a given
// hash, fetching towards the genesis block.
func (self *BlockChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash {
	return self.hc.GetBlockHashesFromHash(hash, max)
}

// GetHeaderByNumber retrieves a block header from the database by number,
// caching it (associated with its hash) if found.
func (self *BlockChain) GetHeaderByNumber(number uint64) *types.Header {
	return self.hc.GetHeaderByNumber(number)
}
1311 1312

// Config retrieves the blockchain's chain configuration.
1313
func (self *BlockChain) Config() *params.ChainConfig { return self.config }