blockchain.go 43.8 KB
Newer Older
F
Felix Lange 已提交
1
// Copyright 2014 The go-ethereum Authors
2
// This file is part of the go-ethereum library.
F
Felix Lange 已提交
3
//
4
// The go-ethereum library is free software: you can redistribute it and/or modify
F
Felix Lange 已提交
5 6 7 8
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
9
// The go-ethereum library is distributed in the hope that it will be useful,
F
Felix Lange 已提交
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
F
Felix Lange 已提交
12 13 14
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
15
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
F
Felix Lange 已提交
16

17
// Package core implements the Ethereum consensus protocol.
O
obscuren 已提交
18
package core
O
obscuren 已提交
19 20

import (
21
	"errors"
O
obscuren 已提交
22
	"fmt"
23
	"io"
24
	"math/big"
25
	mrand "math/rand"
26
	"runtime"
O
obscuren 已提交
27
	"sync"
O
obscuren 已提交
28
	"sync/atomic"
29
	"time"
30

O
obscuren 已提交
31
	"github.com/ethereum/go-ethereum/common"
O
obscuren 已提交
32
	"github.com/ethereum/go-ethereum/core/state"
O
obscuren 已提交
33
	"github.com/ethereum/go-ethereum/core/types"
34
	"github.com/ethereum/go-ethereum/core/vm"
35
	"github.com/ethereum/go-ethereum/crypto"
36
	"github.com/ethereum/go-ethereum/ethdb"
O
obscuren 已提交
37
	"github.com/ethereum/go-ethereum/event"
O
obscuren 已提交
38
	"github.com/ethereum/go-ethereum/logger"
O
obscuren 已提交
39
	"github.com/ethereum/go-ethereum/logger/glog"
40
	"github.com/ethereum/go-ethereum/metrics"
41
	"github.com/ethereum/go-ethereum/pow"
42
	"github.com/ethereum/go-ethereum/rlp"
43
	"github.com/ethereum/go-ethereum/trie"
O
obscuren 已提交
44
	"github.com/hashicorp/golang-lru"
O
obscuren 已提交
45 46
)

47 48 49
var (
	chainlogger = logger.NewLogger("CHAIN")
	jsonlogger  = logger.NewJsonLogger()
50

51
	blockInsertTimer = metrics.NewTimer("chain/inserts")
52 53

	ErrNoGenesis = errors.New("Genesis not found in chain")
54
)
Z
zelig 已提交
55

O
obscuren 已提交
56
// Sizing limits of the in-memory caches kept by the BlockChain.
const (
	// bodyCacheLimit is the capacity of the body cache and of the RLP body cache.
	bodyCacheLimit = 256
	// blockCacheLimit is the capacity of the full block cache.
	blockCacheLimit = 256
	// maxFutureBlocks is the capacity of the future block cache.
	maxFutureBlocks = 256
	// maxTimeFutureBlocks bounds how far ahead a future block may be
	// (NOTE(review): unit is presumably seconds - confirm at the use site).
	maxTimeFutureBlocks = 30
)

// BlockChainVersion must be bumped when the consensus algorithm is changed.
// This forces the upgradedb command to be run (forces the blocks to be
// imported again using the new algorithm).
const BlockChainVersion = 3
65

66 67 68 69 70 71 72 73 74 75 76 77 78 79
// BlockChain represents the canonical chain given a database with a genesis
// block. The Blockchain manages chain imports, reverts, chain reorganisations.
//
// Importing blocks in to the block chain happens according to the set of rules
// defined by the two stage Validator. Processing of blocks is done using the
// Processor which processes the included transaction. The validation of the state
// is done in the second part of the Validator. Failing results in aborting of
// the import.
//
// The BlockChain also helps in returning blocks from **any** chain included
// in the database as well as blocks that represents the canonical chain. It's
// important to note that GetBlock can return any block and does not need to be
// included in the canonical one where as GetBlockByNumber always represents the
// canonical chain.
80
type BlockChain struct {
81 82
	config *ChainConfig // chain & network configuration

83
	hc           *HeaderChain
84
	chainDb      ethdb.Database
O
obscuren 已提交
85
	eventMux     *event.TypeMux
86
	genesisBlock *types.Block
87

88 89 90
	mu      sync.RWMutex // global mutex for locking chain operations
	chainmu sync.RWMutex // blockchain insertion lock
	procmu  sync.RWMutex // block processor lock
91

92 93 94
	checkpoint       int          // checkpoint counts towards the new checkpoint
	currentBlock     *types.Block // Current head of the block chain
	currentFastBlock *types.Block // Current head of the fast-sync chain (may be above the block chain!)
O
obscuren 已提交
95

96 97 98 99 100
	stateCache   *state.StateDB // State database to reuse between imports (contains state cache)
	bodyCache    *lru.Cache     // Cache for the most recent block bodies
	bodyRLPCache *lru.Cache     // Cache for the most recent block bodies in RLP encoded format
	blockCache   *lru.Cache     // Cache for the most recent entire blocks
	futureBlocks *lru.Cache     // future blocks are blocks added for later processing
101

102
	quit    chan struct{} // blockchain quit channel
L
Leif Jurvetson 已提交
103
	running int32         // running must be called atomically
O
obscuren 已提交
104
	// procInterrupt must be atomically called
105 106
	procInterrupt int32          // interrupt signaler for block processing
	wg            sync.WaitGroup // chain processing wait group for shutting down
107

108
	pow       pow.PoW
109 110
	processor Processor // block processor interface
	validator Validator // block and state validator interface
O
obscuren 已提交
111
}
O
obscuren 已提交
112

113 114 115
// NewBlockChain returns a fully initialised block chain using information
// available in the database. It initialiser the default Ethereum Validator and
// Processor.
116
func NewBlockChain(chainDb ethdb.Database, config *ChainConfig, pow pow.PoW, mux *event.TypeMux) (*BlockChain, error) {
117 118 119 120 121
	bodyCache, _ := lru.New(bodyCacheLimit)
	bodyRLPCache, _ := lru.New(bodyCacheLimit)
	blockCache, _ := lru.New(blockCacheLimit)
	futureBlocks, _ := lru.New(maxFutureBlocks)

122
	bc := &BlockChain{
123
		config:       config,
124 125 126 127 128 129 130 131
		chainDb:      chainDb,
		eventMux:     mux,
		quit:         make(chan struct{}),
		bodyCache:    bodyCache,
		bodyRLPCache: bodyRLPCache,
		blockCache:   blockCache,
		futureBlocks: futureBlocks,
		pow:          pow,
O
obscuren 已提交
132
	}
133 134
	bc.SetValidator(NewBlockValidator(config, bc, pow))
	bc.SetProcessor(NewStateProcessor(config, bc))
135 136 137

	gv := func() HeaderValidator { return bc.Validator() }
	var err error
138
	bc.hc, err = NewHeaderChain(chainDb, config, gv, bc.getProcInterrupt)
139 140 141
	if err != nil {
		return nil, err
	}
142 143 144

	bc.genesisBlock = bc.GetBlockByNumber(0)
	if bc.genesisBlock == nil {
145
		return nil, ErrNoGenesis
146
	}
147

148
	if err := bc.loadLastState(); err != nil {
149
		return nil, err
O
obscuren 已提交
150
	}
151
	// Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
152
	for hash, _ := range BadHashes {
153
		if header := bc.GetHeaderByHash(hash); header != nil {
154 155 156
			glog.V(logger.Error).Infof("Found bad hash, rewinding chain to block #%d [%x…]", header.Number, header.ParentHash[:4])
			bc.SetHead(header.Number.Uint64() - 1)
			glog.V(logger.Error).Infoln("Chain rewind was successful, resuming normal operation")
157 158
		}
	}
159
	// Take ownership of this particular state
160
	go bc.update()
O
obscuren 已提交
161
	return bc, nil
162 163
}

164 165 166 167
// getProcInterrupt reports whether the processing interrupt flag is set. The
// flag is read atomically.
func (bc *BlockChain) getProcInterrupt() bool {
	interrupted := atomic.LoadInt32(&bc.procInterrupt)
	return interrupted == 1
}

168 169 170 171 172 173 174 175 176
// loadLastState loads the last known chain state from the database. This method
// assumes that the chain manager mutex is held.
func (self *BlockChain) loadLastState() error {
	// Restore the last known head block
	head := GetHeadBlockHash(self.chainDb)
	if head == (common.Hash{}) {
		// Corrupt or empty database, init from scratch
		self.Reset()
	} else {
177
		if block := self.GetBlockByHash(head); block != nil {
178 179 180 181 182 183 184 185
			// Block found, set as the current head
			self.currentBlock = block
		} else {
			// Corrupt or empty database, init from scratch
			self.Reset()
		}
	}
	// Restore the last known head header
186
	currentHeader := self.currentBlock.Header()
187
	if head := GetHeadHeaderHash(self.chainDb); head != (common.Hash{}) {
188
		if header := self.GetHeaderByHash(head); header != nil {
189
			currentHeader = header
190 191
		}
	}
192
	self.hc.SetCurrentHeader(currentHeader)
193 194 195
	// Restore the last known head fast block
	self.currentFastBlock = self.currentBlock
	if head := GetHeadFastBlockHash(self.chainDb); head != (common.Hash{}) {
196
		if block := self.GetBlockByHash(head); block != nil {
197 198 199
			self.currentFastBlock = block
		}
	}
200 201 202 203 204 205 206 207 208
	// Initialize a statedb cache to ensure singleton account bloom filter generation
	statedb, err := state.New(self.currentBlock.Root(), self.chainDb)
	if err != nil {
		return err
	}
	self.stateCache = statedb
	self.stateCache.GetAccount(common.Address{})

	// Issue a status log for the user
209 210 211
	headerTd := self.GetTd(currentHeader.Hash(), currentHeader.Number.Uint64())
	blockTd := self.GetTd(self.currentBlock.Hash(), self.currentBlock.NumberU64())
	fastTd := self.GetTd(self.currentFastBlock.Hash(), self.currentFastBlock.NumberU64())
212

213
	glog.V(logger.Info).Infof("Last header: #%d [%x…] TD=%v", currentHeader.Number, currentHeader.Hash().Bytes()[:4], headerTd)
214
	glog.V(logger.Info).Infof("Last block: #%d [%x…] TD=%v", self.currentBlock.Number(), self.currentBlock.Hash().Bytes()[:4], blockTd)
215
	glog.V(logger.Info).Infof("Fast block: #%d [%x…] TD=%v", self.currentFastBlock.Number(), self.currentFastBlock.Hash().Bytes()[:4], fastTd)
216 217 218 219

	return nil
}

220 221 222 223
// SetHead rewinds the local chain to a new head. In the case of headers, everything
// above the new head will be deleted and the new one set. In the case of blocks
// though, the head may be further rewound if block bodies are missing (non-archive
// nodes after a fast sync).
224
func (bc *BlockChain) SetHead(head uint64) {
225 226 227
	bc.mu.Lock()
	defer bc.mu.Unlock()

228 229
	delFn := func(hash common.Hash, num uint64) {
		DeleteBody(bc.chainDb, hash, num)
230
	}
231 232
	bc.hc.SetHead(head, delFn)

233
	// Clear out any stale content from the caches
234 235 236 237
	bc.bodyCache.Purge()
	bc.bodyRLPCache.Purge()
	bc.blockCache.Purge()
	bc.futureBlocks.Purge()
238

239
	// Update all computed fields to the new head
240 241 242
	currentHeader := bc.hc.CurrentHeader()
	if bc.currentBlock != nil && currentHeader.Number.Uint64() < bc.currentBlock.NumberU64() {
		bc.currentBlock = bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())
243
	}
244 245
	if bc.currentFastBlock != nil && currentHeader.Number.Uint64() < bc.currentFastBlock.NumberU64() {
		bc.currentFastBlock = bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())
246 247
	}

248 249 250
	if bc.currentBlock == nil {
		bc.currentBlock = bc.genesisBlock
	}
251 252 253
	if bc.currentFastBlock == nil {
		bc.currentFastBlock = bc.genesisBlock
	}
254

255 256 257 258 259 260
	if err := WriteHeadBlockHash(bc.chainDb, bc.currentBlock.Hash()); err != nil {
		glog.Fatalf("failed to reset head block hash: %v", err)
	}
	if err := WriteHeadFastBlockHash(bc.chainDb, bc.currentFastBlock.Hash()); err != nil {
		glog.Fatalf("failed to reset head fast block hash: %v", err)
	}
261
	bc.loadLastState()
262 263
}

264 265 266
// FastSyncCommitHead sets the current head block to the one defined by the hash
// irrelevant what the chain contents were prior.
func (self *BlockChain) FastSyncCommitHead(hash common.Hash) error {
267
	// Make sure that both the block as well at its state trie exists
268
	block := self.GetBlockByHash(hash)
269 270 271
	if block == nil {
		return fmt.Errorf("non existent block [%x…]", hash[:4])
	}
272
	if _, err := trie.NewSecure(block.Root(), self.chainDb, 0); err != nil {
273 274 275 276 277 278 279 280 281 282 283
		return err
	}
	// If all checks out, manually set the head block
	self.mu.Lock()
	self.currentBlock = block
	self.mu.Unlock()

	glog.V(logger.Info).Infof("committed block #%d [%x…] as new head", block.Number(), hash[:4])
	return nil
}

284
// GasLimit returns the gas limit of the current HEAD block.
285
func (self *BlockChain) GasLimit() *big.Int {
O
obscuren 已提交
286 287
	self.mu.RLock()
	defer self.mu.RUnlock()
O
obscuren 已提交
288

289
	return self.currentBlock.GasLimit()
O
obscuren 已提交
290 291
}

292
// LastBlockHash return the hash of the HEAD block.
293
func (self *BlockChain) LastBlockHash() common.Hash {
294 295 296
	self.mu.RLock()
	defer self.mu.RUnlock()

297
	return self.currentBlock.Hash()
298 299
}

300
// CurrentBlock retrieves the current head block of the canonical chain. The
301
// block is retrieved from the blockchain's internal cache.
302
func (self *BlockChain) CurrentBlock() *types.Block {
O
obscuren 已提交
303 304 305 306
	self.mu.RLock()
	defer self.mu.RUnlock()

	return self.currentBlock
O
obscuren 已提交
307 308
}

309
// CurrentFastBlock retrieves the current fast-sync head block of the canonical
310
// chain. The block is retrieved from the blockchain's internal cache.
311 312 313 314 315 316 317
func (self *BlockChain) CurrentFastBlock() *types.Block {
	self.mu.RLock()
	defer self.mu.RUnlock()

	return self.currentFastBlock
}

318 319
// Status returns status information about the current chain such as the HEAD Td,
// the HEAD hash and the hash of the genesis block.
320
func (self *BlockChain) Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash) {
O
obscuren 已提交
321 322 323
	self.mu.RLock()
	defer self.mu.RUnlock()

324
	return self.GetTd(self.currentBlock.Hash(), self.currentBlock.NumberU64()), self.currentBlock.Hash(), self.genesisBlock.Hash()
325 326
}

327 328 329 330 331 332 333 334 335 336 337 338
// SetProcessor installs the processor required for making state
// modifications. The swap is done under the processor write lock.
func (bc *BlockChain) SetProcessor(processor Processor) {
	bc.procmu.Lock()
	bc.processor = processor
	bc.procmu.Unlock()
}

// SetValidator sets the validator which is used to validate incoming blocks.
func (self *BlockChain) SetValidator(validator Validator) {
	self.procmu.Lock()
	defer self.procmu.Unlock()
	self.validator = validator
339 340
}

341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358
// Validator returns the current validator, read under the processor read lock.
func (bc *BlockChain) Validator() Validator {
	bc.procmu.RLock()
	current := bc.validator
	bc.procmu.RUnlock()

	return current
}

// Processor returns the current processor, read under the processor read lock.
func (bc *BlockChain) Processor() Processor {
	bc.procmu.RLock()
	current := bc.processor
	bc.procmu.RUnlock()

	return current
}

// AuxValidator returns the auxiliary validator (Proof of work atm), i.e. the
// PoW instance the chain was constructed with.
func (bc *BlockChain) AuxValidator() pow.PoW {
	return bc.pow
}

// State returns a new mutable state based on the current HEAD block.
359
func (self *BlockChain) State() (*state.StateDB, error) {
360 361 362 363 364 365
	return self.StateAt(self.CurrentBlock().Root())
}

// StateAt returns a new mutable state based on a particular point in time.
func (self *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) {
	return self.stateCache.New(root)
O
obscuren 已提交
366 367
}

368
// Reset purges the entire blockchain, restoring it to its genesis state.
369
func (bc *BlockChain) Reset() {
370
	bc.ResetWithGenesisBlock(bc.genesisBlock)
371 372
}

373 374
// ResetWithGenesisBlock purges the entire blockchain, restoring it to the
// specified genesis state.
375
func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) {
376 377 378
	// Dump the entire block chain and purge the caches
	bc.SetHead(0)

379 380 381
	bc.mu.Lock()
	defer bc.mu.Unlock()

382
	// Prepare the genesis block and reinitialise the chain
383
	if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil {
384 385 386
		glog.Fatalf("failed to write genesis block TD: %v", err)
	}
	if err := WriteBlock(bc.chainDb, genesis); err != nil {
387
		glog.Fatalf("failed to write genesis block: %v", err)
388
	}
389
	bc.genesisBlock = genesis
390 391
	bc.insert(bc.genesisBlock)
	bc.currentBlock = bc.genesisBlock
392 393
	bc.hc.SetGenesis(bc.genesisBlock.Header())
	bc.hc.SetCurrentHeader(bc.genesisBlock.Header())
394
	bc.currentFastBlock = bc.genesisBlock
395 396
}

397
// Export writes the active chain to the given writer.
398
func (self *BlockChain) Export(w io.Writer) error {
399
	if err := self.ExportN(w, uint64(0), self.currentBlock.NumberU64()); err != nil {
T
Taylor Gerring 已提交
400 401 402 403 404 405
		return err
	}
	return nil
}

// ExportN writes a subset of the active chain to the given writer.
406
func (self *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
O
obscuren 已提交
407 408
	self.mu.RLock()
	defer self.mu.RUnlock()
O
obscuren 已提交
409

T
Taylor Gerring 已提交
410 411 412 413
	if first > last {
		return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last)
	}

T
Cleanup  
Taylor Gerring 已提交
414
	glog.V(logger.Info).Infof("exporting %d blocks...\n", last-first+1)
415

T
Taylor Gerring 已提交
416
	for nr := first; nr <= last; nr++ {
417 418 419 420 421 422
		block := self.GetBlockByNumber(nr)
		if block == nil {
			return fmt.Errorf("export failed on #%d: not found", nr)
		}

		if err := block.EncodeRLP(w); err != nil {
423 424
			return err
		}
O
obscuren 已提交
425
	}
426

427
	return nil
O
obscuren 已提交
428 429
}

430 431
// insert injects a new head block into the current block chain. This method
// assumes that the block is indeed a true head. It will also reset the head
432 433
// header and the head fast sync block to this very same block if they are older
// or if they are on a different side chain.
434 435
//
// Note, this function assumes that the `mu` mutex is held!
436
func (bc *BlockChain) insert(block *types.Block) {
437 438 439
	// If the block is on a side chain or an unknown one, force other heads onto it too
	updateHeads := GetCanonicalHash(bc.chainDb, block.NumberU64()) != block.Hash()

440 441 442
	// Add the block to the canonical chain number scheme and mark as the head
	if err := WriteCanonicalHash(bc.chainDb, block.Hash(), block.NumberU64()); err != nil {
		glog.Fatalf("failed to insert block number: %v", err)
443
	}
444
	if err := WriteHeadBlockHash(bc.chainDb, block.Hash()); err != nil {
445
		glog.Fatalf("failed to insert head block hash: %v", err)
446
	}
447
	bc.currentBlock = block
448 449 450

	// If the block is better than out head or is on a different chain, force update heads
	if updateHeads {
451
		bc.hc.SetCurrentHeader(block.Header())
452 453 454 455 456 457

		if err := WriteHeadFastBlockHash(bc.chainDb, block.Hash()); err != nil {
			glog.Fatalf("failed to insert head fast block hash: %v", err)
		}
		bc.currentFastBlock = block
	}
458 459
}

O
obscuren 已提交
460
// Accessors
461
func (bc *BlockChain) Genesis() *types.Block {
O
obscuren 已提交
462 463 464
	return bc.genesisBlock
}

465 466
// GetBody retrieves a block body (transactions and uncles) from the database by
// hash, caching it if found.
467
func (self *BlockChain) GetBody(hash common.Hash) *types.Body {
468 469
	// Short circuit if the body's already in the cache, retrieve otherwise
	if cached, ok := self.bodyCache.Get(hash); ok {
470 471
		body := cached.(*types.Body)
		return body
O
obscuren 已提交
472
	}
473
	body := GetBody(self.chainDb, hash, self.hc.GetBlockNumber(hash))
474 475
	if body == nil {
		return nil
476 477
	}
	// Cache the found body for next time and return
478 479
	self.bodyCache.Add(hash, body)
	return body
480
}
O
obscuren 已提交
481

482 483
// GetBodyRLP retrieves a block body in RLP encoding from the database by hash,
// caching it if found.
484
func (self *BlockChain) GetBodyRLP(hash common.Hash) rlp.RawValue {
485 486
	// Short circuit if the body's already in the cache, retrieve otherwise
	if cached, ok := self.bodyRLPCache.Get(hash); ok {
487
		return cached.(rlp.RawValue)
488
	}
489
	body := GetBodyRLP(self.chainDb, hash, self.hc.GetBlockNumber(hash))
490
	if len(body) == 0 {
491
		return nil
O
obscuren 已提交
492
	}
493 494 495 496
	// Cache the found body for next time and return
	self.bodyRLPCache.Add(hash, body)
	return body
}
O
obscuren 已提交
497

498 499
// HasBlock checks if a block is fully present in the database or not, caching
// it if present.
500
func (bc *BlockChain) HasBlock(hash common.Hash) bool {
501
	return bc.GetBlockByHash(hash) != nil
O
obscuren 已提交
502 503
}

504 505 506 507
// HasBlockAndState checks if a block and associated state trie is fully present
// in the database or not, caching it if present.
func (bc *BlockChain) HasBlockAndState(hash common.Hash) bool {
	// Check first that the block itself is known
508
	block := bc.GetBlockByHash(hash)
509 510 511 512 513 514 515 516
	if block == nil {
		return false
	}
	// Ensure the associated state is also present
	_, err := state.New(block.Root(), bc.chainDb)
	return err == nil
}

517 518 519
// GetBlock retrieves a block from the database by hash and number,
// caching it if found.
func (self *BlockChain) GetBlock(hash common.Hash, number uint64) *types.Block {
520 521
	// Short circuit if the block's already in the cache, retrieve otherwise
	if block, ok := self.blockCache.Get(hash); ok {
522 523
		return block.(*types.Block)
	}
524
	block := GetBlock(self.chainDb, hash, number)
525
	if block == nil {
O
obscuren 已提交
526 527
		return nil
	}
528 529 530
	// Cache the found block for next time and return
	self.blockCache.Add(block.Hash(), block)
	return block
O
obscuren 已提交
531 532
}

533 534 535 536 537
// GetBlockByHash retrieves a block from the database by hash, caching it if
// found. The block number is resolved through the header chain.
func (bc *BlockChain) GetBlockByHash(hash common.Hash) *types.Block {
	number := bc.hc.GetBlockNumber(hash)
	return bc.GetBlock(hash, number)
}

538 539
// GetBlockByNumber retrieves a block from the database by number, caching it
// (associated with its hash) if found.
540
func (self *BlockChain) GetBlockByNumber(number uint64) *types.Block {
541
	hash := GetCanonicalHash(self.chainDb, number)
542 543 544
	if hash == (common.Hash{}) {
		return nil
	}
545
	return self.GetBlock(hash, number)
546
}
547

548
// [deprecated by eth/62]
F
Felix Lange 已提交
549
// GetBlocksFromHash returns the block corresponding to hash and up to n-1 ancestors.
550
func (self *BlockChain) GetBlocksFromHash(hash common.Hash, n int) (blocks []*types.Block) {
551
	number := self.hc.GetBlockNumber(hash)
F
Felix Lange 已提交
552
	for i := 0; i < n; i++ {
553
		block := self.GetBlock(hash, number)
F
Felix Lange 已提交
554 555 556 557 558
		if block == nil {
			break
		}
		blocks = append(blocks, block)
		hash = block.ParentHash()
559
		number--
F
Felix Lange 已提交
560 561 562 563
	}
	return
}

564 565 566 567
// GetUnclesInChain retrieves all the uncles from a given block backwards until
// a specific distance is reached.
func (self *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.Header {
	uncles := []*types.Header{}
568 569
	for i := 0; block != nil && i < length; i++ {
		uncles = append(uncles, block.Uncles()...)
570
		block = self.GetBlock(block.ParentHash(), block.NumberU64()-1)
571
	}
572
	return uncles
O
obscuren 已提交
573
}
O
obscuren 已提交
574

575 576
// Stop stops the blockchain service. If any imports are currently in progress
// it will abort them using the procInterrupt.
577
func (bc *BlockChain) Stop() {
578 579 580
	if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
		return
	}
581
	close(bc.quit)
O
obscuren 已提交
582
	atomic.StoreInt32(&bc.procInterrupt, 1)
583 584 585 586

	bc.wg.Wait()

	glog.V(logger.Info).Infoln("Chain manager stopped")
587 588
}

589
func (self *BlockChain) procFutureBlocks() {
590 591 592 593 594
	blocks := make([]*types.Block, 0, self.futureBlocks.Len())
	for _, hash := range self.futureBlocks.Keys() {
		if block, exist := self.futureBlocks.Get(hash); exist {
			blocks = append(blocks, block.(*types.Block))
		}
595
	}
596 597 598 599
	if len(blocks) > 0 {
		types.BlockBy(types.Number).Sort(blocks)
		self.InsertChain(blocks)
	}
600 601
}

602
// WriteStatus describes the outcome of writing a block to the chain.
type WriteStatus byte

// The possible WriteStatus values. Their numeric order is significant because
// they are assigned through iota.
const (
	// NonStatTy is the zero value; WriteBlock returns it alongside an error.
	NonStatTy WriteStatus = iota
	// CanonStatTy is reported by WriteBlock when the block became the new
	// head of the canonical chain.
	CanonStatTy
	// SplitStatTy is declared for completeness.
	// NOTE(review): no use is visible in this part of the file - confirm.
	SplitStatTy
	// SideStatTy is reported by WriteBlock when the block was stored
	// without becoming the new head.
	SideStatTy
)

611 612 613
// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
func (self *BlockChain) Rollback(chain []common.Hash) {
614 615 616
	self.mu.Lock()
	defer self.mu.Unlock()

617 618 619
	for i := len(chain) - 1; i >= 0; i-- {
		hash := chain[i]

620 621 622
		currentHeader := self.hc.CurrentHeader()
		if currentHeader.Hash() == hash {
			self.hc.SetCurrentHeader(self.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1))
623 624
		}
		if self.currentFastBlock.Hash() == hash {
625
			self.currentFastBlock = self.GetBlock(self.currentFastBlock.ParentHash(), self.currentFastBlock.NumberU64()-1)
626 627 628
			WriteHeadFastBlockHash(self.chainDb, self.currentFastBlock.Hash())
		}
		if self.currentBlock.Hash() == hash {
629
			self.currentBlock = self.GetBlock(self.currentBlock.ParentHash(), self.currentBlock.NumberU64()-1)
630 631 632 633 634
			WriteHeadBlockHash(self.chainDb, self.currentBlock.Hash())
		}
	}
}

635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665
// SetReceiptsData computes all the non-consensus fields of the receipts:
// the transaction hash, the created contract address (for contract creating
// transactions), the gas used by the individual transaction and the
// positional fields of every log.
//
// receipts[j] is paired with the j-th transaction of block, so there must be
// at least as many transactions as receipts.
func SetReceiptsData(block *types.Block, receipts types.Receipts) {
	var (
		txs     = block.Transactions()
		nextLog = uint(0) // log index, running across the whole block
	)
	for j := range receipts {
		tx := txs[j]

		// The transaction hash can be retrieved from the transaction itself
		receipts[j].TxHash = tx.Hash()

		// The contract address can be derived from the transaction itself
		if MessageCreatesContract(tx) {
			from, _ := tx.From()
			receipts[j].ContractAddress = crypto.CreateAddress(from, tx.Nonce())
		}
		// The used gas is the difference between consecutive cumulative values
		if j > 0 {
			receipts[j].GasUsed = new(big.Int).Sub(receipts[j].CumulativeGasUsed, receipts[j-1].CumulativeGasUsed)
		} else {
			receipts[j].GasUsed = new(big.Int).Set(receipts[j].CumulativeGasUsed)
		}
		// The derived log fields can simply be set from the block and transaction
		for k := range receipts[j].Logs {
			receipts[j].Logs[k].BlockNumber = block.NumberU64()
			receipts[j].Logs[k].BlockHash = block.Hash()
			receipts[j].Logs[k].TxHash = receipts[j].TxHash
			receipts[j].Logs[k].TxIndex = uint(j)
			receipts[j].Logs[k].Index = nextLog
			nextLog++
		}
	}
}

666 667 668 669 670 671 672
// InsertReceiptChain attempts to complete an already existing header chain with
// transaction and receipt data.
func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
	self.wg.Add(1)
	defer self.wg.Done()

	// Collect some import statistics to report on
673
	stats := struct{ processed, ignored int32 }{}
674 675
	start := time.Now()

676 677
	// Create the block importing task queue and worker functions
	tasks := make(chan int, len(blockChain))
678
	for i := 0; i < len(blockChain) && i < len(receiptChain); i++ {
679 680 681
		tasks <- i
	}
	close(tasks)
682

683 684 685 686 687 688 689 690
	errs, failed := make([]error, len(tasks)), int32(0)
	process := func(worker int) {
		for index := range tasks {
			block, receipts := blockChain[index], receiptChain[index]

			// Short circuit insertion if shutting down or processing failed
			if atomic.LoadInt32(&self.procInterrupt) == 1 {
				return
691
			}
692 693
			if atomic.LoadInt32(&failed) > 0 {
				return
694
			}
695 696 697 698 699
			// Short circuit if the owner header is unknown
			if !self.HasHeader(block.Hash()) {
				errs[index] = fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
				atomic.AddInt32(&failed, 1)
				return
700
			}
701 702 703 704 705 706
			// Skip if the entire data is already known
			if self.HasBlock(block.Hash()) {
				atomic.AddInt32(&stats.ignored, 1)
				continue
			}
			// Compute all the non-consensus fields of the receipts
707
			SetReceiptsData(block, receipts)
708
			// Write all the data out into the database
709
			if err := WriteBody(self.chainDb, block.Hash(), block.NumberU64(), block.Body()); err != nil {
710 711 712 713 714
				errs[index] = fmt.Errorf("failed to write block body: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
715
			if err := WriteBlockReceipts(self.chainDb, block.Hash(), block.NumberU64(), receipts); err != nil {
716 717 718 719 720
				errs[index] = fmt.Errorf("failed to write block receipts: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
721 722 723 724 725 726
			if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
				errs[index] = fmt.Errorf("failed to write log blooms: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
727 728 729 730 731 732 733 734 735 736 737 738
			if err := WriteTransactions(self.chainDb, block); err != nil {
				errs[index] = fmt.Errorf("failed to write individual transactions: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
			if err := WriteReceipts(self.chainDb, receipts); err != nil {
				errs[index] = fmt.Errorf("failed to write individual receipts: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
739
			atomic.AddInt32(&stats.processed, 1)
740
		}
741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757
	}
	// Start as many worker threads as goroutines allowed
	pending := new(sync.WaitGroup)
	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
		pending.Add(1)
		go func(id int) {
			defer pending.Done()
			process(id)
		}(i)
	}
	pending.Wait()

	// If anything failed, report
	if failed > 0 {
		for i, err := range errs {
			if err != nil {
				return i, err
758 759 760
			}
		}
	}
761 762 763 764
	if atomic.LoadInt32(&self.procInterrupt) == 1 {
		glog.V(logger.Debug).Infoln("premature abort during receipt chain processing")
		return 0, nil
	}
765 766 767
	// Update the head fast sync block if better
	self.mu.Lock()
	head := blockChain[len(errs)-1]
768
	if self.GetTd(self.currentFastBlock.Hash(), self.currentFastBlock.NumberU64()).Cmp(self.GetTd(head.Hash(), head.NumberU64())) < 0 {
769 770 771 772 773 774 775
		if err := WriteHeadFastBlockHash(self.chainDb, head.Hash()); err != nil {
			glog.Fatalf("failed to update head fast block hash: %v", err)
		}
		self.currentFastBlock = head
	}
	self.mu.Unlock()

776 777
	// Report some public statistics so the user has a clue what's going on
	first, last := blockChain[0], blockChain[len(blockChain)-1]
778 779 780 781 782

	ignored := ""
	if stats.ignored > 0 {
		ignored = fmt.Sprintf(" (%d ignored)", stats.ignored)
	}
783
	glog.V(logger.Info).Infof("imported %d receipts in %9v. #%d [%x… / %x…]%s", stats.processed, common.PrettyDuration(time.Since(start)), last.Number(), first.Hash().Bytes()[:4], last.Hash().Bytes()[:4], ignored)
784 785 786 787

	return 0, nil
}

788
// WriteBlock writes the block to the chain.
789
func (self *BlockChain) WriteBlock(block *types.Block) (status WriteStatus, err error) {
790 791 792
	self.wg.Add(1)
	defer self.wg.Done()

793
	// Calculate the total difficulty of the block
794
	ptd := self.GetTd(block.ParentHash(), block.NumberU64()-1)
795 796 797
	if ptd == nil {
		return NonStatTy, ParentError(block.ParentHash())
	}
798 799 800
	// Make sure no inconsistent state is leaked during insertion
	self.mu.Lock()
	defer self.mu.Unlock()
801

802 803 804
	localTd := self.GetTd(self.currentBlock.Hash(), self.currentBlock.NumberU64())
	externTd := new(big.Int).Add(block.Difficulty(), ptd)

805 806 807 808 809 810 811 812
	// Irrelevant of the canonical status, write the block itself to the database
	if err := self.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil {
		glog.Fatalf("failed to write block total difficulty: %v", err)
	}
	if err := WriteBlock(self.chainDb, block); err != nil {
		glog.Fatalf("failed to write block contents: %v", err)
	}

813
	// If the total difficulty is higher than our known, add it to the canonical chain
814 815 816
	// Second clause in the if statement reduces the vulnerability to selfish mining.
	// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
	if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) {
817
		// Reorganise the chain if the parent is not the head block
818 819
		if block.ParentHash() != self.currentBlock.Hash() {
			if err := self.reorg(self.currentBlock, block); err != nil {
820
				return NonStatTy, err
821 822
			}
		}
823
		self.insert(block) // Insert the block as the new head of the chain
824
		status = CanonStatTy
825
	} else {
826
		status = SideStatTy
827
	}
828

829
	self.futureBlocks.Remove(block.Hash())
830 831 832 833

	return
}

834 835
// InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned
// it will return the index number of the failing block as well an error describing what went wrong (for possible errors see core/errors.go).
836
func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
837 838 839
	self.wg.Add(1)
	defer self.wg.Done()

O
obscuren 已提交
840 841 842
	self.chainmu.Lock()
	defer self.chainmu.Unlock()

F
Felix Lange 已提交
843 844 845
	// A queued approach to delivering events. This is generally
	// faster than direct delivery and requires much less mutex
	// acquiring.
O
obscuren 已提交
846
	var (
F
Felix Lange 已提交
847
		stats         = insertStats{startTime: time.Now()}
848 849
		events        = make([]interface{}, 0, len(chain))
		coalescedLogs vm.Logs
F
Felix Lange 已提交
850
		nonceChecked  = make([]bool, len(chain))
O
obscuren 已提交
851
	)
852

F
Felix Lange 已提交
853
	// Start the parallel nonce verifier.
854 855
	nonceAbort, nonceResults := verifyNoncesFromBlocks(self.pow, chain)
	defer close(nonceAbort)
856

857
	for i, block := range chain {
O
obscuren 已提交
858
		if atomic.LoadInt32(&self.procInterrupt) == 1 {
859
			glog.V(logger.Debug).Infoln("Premature abort during block chain processing")
O
obscuren 已提交
860 861
			break
		}
862

O
obscuren 已提交
863 864 865 866
		bstart := time.Now()
		// Wait for block i's nonce to be verified before processing
		// its state transition.
		for !nonceChecked[i] {
867 868
			r := <-nonceResults
			nonceChecked[r.index] = true
O
obscuren 已提交
869
			if !r.valid {
870 871
				block := chain[r.index]
				return r.index, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()}
O
obscuren 已提交
872
			}
O
obscuren 已提交
873
		}
O
obscuren 已提交
874

O
obscuren 已提交
875
		if BadHashes[block.Hash()] {
876
			err := BadHashError(block.Hash())
877
			reportBlock(block, err)
O
obscuren 已提交
878 879
			return i, err
		}
880 881 882
		// Stage 1 validation of the block using the chain's validator
		// interface.
		err := self.Validator().ValidateBlock(block)
O
obscuren 已提交
883 884 885 886 887
		if err != nil {
			if IsKnownBlockErr(err) {
				stats.ignored++
				continue
			}
888

O
obscuren 已提交
889 890 891 892
			if err == BlockFutureErr {
				// Allow up to MaxFuture second in the future blocks. If this limit
				// is exceeded the chain is discarded and processed at a later time
				// if given.
893 894
				max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
				if block.Time().Cmp(max) == 1 {
O
obscuren 已提交
895
					return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max)
896
				}
O
obscuren 已提交
897

898
				self.futureBlocks.Add(block.Hash(), block)
O
obscuren 已提交
899 900 901
				stats.queued++
				continue
			}
O
obscuren 已提交
902

903 904
			if IsParentErr(err) && self.futureBlocks.Contains(block.ParentHash()) {
				self.futureBlocks.Add(block.Hash(), block)
O
obscuren 已提交
905 906
				stats.queued++
				continue
907
			}
908

909
			reportBlock(block, err)
O
obscuren 已提交
910

911 912
			return i, err
		}
913

914 915
		// Create a new statedb using the parent block and report an
		// error if it fails.
916 917 918 919 920
		switch {
		case i == 0:
			err = self.stateCache.Reset(self.GetBlock(block.ParentHash(), block.NumberU64()-1).Root())
		default:
			err = self.stateCache.Reset(chain[i-1].Root())
921
		}
922 923
		if err != nil {
			reportBlock(block, err)
O
obscuren 已提交
924 925
			return i, err
		}
926
		// Process block using the parent state as reference point.
927
		receipts, logs, usedGas, err := self.processor.Process(block, self.stateCache, self.config.VmConfig)
928 929 930 931 932
		if err != nil {
			reportBlock(block, err)
			return i, err
		}
		// Validate the state using the default validator
933
		err = self.Validator().ValidateState(block, self.GetBlock(block.ParentHash(), block.NumberU64()-1), self.stateCache, receipts, usedGas)
934 935 936 937 938
		if err != nil {
			reportBlock(block, err)
			return i, err
		}
		// Write state changes to database
939
		_, err = self.stateCache.Commit()
940 941 942 943 944 945 946
		if err != nil {
			return i, err
		}

		// coalesce logs for later processing
		coalescedLogs = append(coalescedLogs, logs...)

947
		if err := WriteBlockReceipts(self.chainDb, block.Hash(), block.NumberU64(), receipts); err != nil {
948
			return i, err
949
		}
O
obscuren 已提交
950

951
		// write the block to the chain and get the status
952
		status, err := self.WriteBlock(block)
953 954 955
		if err != nil {
			return i, err
		}
956

957
		switch status {
958
		case CanonStatTy:
O
obscuren 已提交
959
			if glog.V(logger.Debug) {
960
				glog.Infof("inserted block #%d [%x…] in %9v: %3d txs %7v gas %d uncles.", block.Number(), block.Hash().Bytes()[0:4], common.PrettyDuration(time.Since(bstart)), len(block.Transactions()), block.GasUsed(), len(block.Uncles()))
O
obscuren 已提交
961
			}
962
			blockInsertTimer.UpdateSince(bstart)
963
			events = append(events, ChainEvent{block, block.Hash(), logs})
964 965

			// This puts transactions in a extra db for rpc
966
			if err := WriteTransactions(self.chainDb, block); err != nil {
967 968
				return i, err
			}
969
			// store the receipts
970
			if err := WriteReceipts(self.chainDb, receipts); err != nil {
971 972 973 974 975 976
				return i, err
			}
			// Write map map bloom filters
			if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
				return i, err
			}
977
		case SideStatTy:
O
obscuren 已提交
978
			if glog.V(logger.Detail) {
979
				glog.Infof("inserted forked block #%d [%x…] (TD=%v) in %9v: %3d txs %d uncles.", block.Number(), block.Hash().Bytes()[0:4], block.Difficulty(), common.PrettyDuration(time.Since(bstart)), len(block.Transactions()), len(block.Uncles()))
O
obscuren 已提交
980
			}
981
			blockInsertTimer.UpdateSince(bstart)
982 983
			events = append(events, ChainSideEvent{block, logs})

984
		case SplitStatTy:
985
			events = append(events, ChainSplitEvent{block, logs})
986
		}
F
Felix Lange 已提交
987

O
obscuren 已提交
988
		stats.processed++
F
Felix Lange 已提交
989
		if glog.V(logger.Info) {
990
			stats.usedGas += usedGas.Uint64()
F
Felix Lange 已提交
991 992
			stats.report(chain, i)
		}
O
obscuren 已提交
993 994
	}

995
	go self.postChainEvents(events, coalescedLogs)
996

997
	return 0, nil
O
obscuren 已提交
998
}
999

F
Felix Lange 已提交
1000 1001 1002
// insertStats accumulates counters for one reporting period of a block
// import and is reset by report after each progress message.
type insertStats struct {
	queued    int       // blocks put aside as future blocks
	processed int       // blocks fully processed and written
	ignored   int       // blocks skipped because they were already known
	usedGas   uint64    // gas used by the processed blocks
	lastIndex int       // index in the batch where the current period starts
	startTime time.Time // time at which the current period started
}

1008 1009 1010
// statsReportLimit is the time limit during import after which we always print
// out progress. This avoids the user wondering what's going on. The report
// method compares the time elapsed since the previous message against it.
const statsReportLimit = 8 * time.Second
F
Felix Lange 已提交
1011 1012 1013 1014

// report prints statistics if some number of blocks have been processed
// or more than a few seconds have passed since the last message.
func (st *insertStats) report(chain []*types.Block, index int) {
1015
	// Fetch the timings for the batch
1016 1017 1018 1019
	var (
		now     = time.Now()
		elapsed = now.Sub(st.startTime)
	)
1020 1021 1022 1023
	if elapsed == 0 { // Yes Windows, I'm looking at you
		elapsed = 1
	}
	// If we're at the last block of the batch or report period reached, log
1024
	if index == len(chain)-1 || elapsed >= statsReportLimit {
F
Felix Lange 已提交
1025
		start, end := chain[st.lastIndex], chain[index]
J
Jeffrey Wilcke 已提交
1026
		txcount := countTransactions(chain[st.lastIndex : index+1])
1027 1028 1029 1030 1031

		extra := ""
		if st.queued > 0 || st.ignored > 0 {
			extra = fmt.Sprintf(" (%d queued %d ignored)", st.queued, st.ignored)
		}
1032 1033 1034 1035 1036 1037 1038
		hashes := ""
		if st.processed > 1 {
			hashes = fmt.Sprintf("%x… / %x…", start.Hash().Bytes()[:4], end.Hash().Bytes()[:4])
		} else {
			hashes = fmt.Sprintf("%x…", end.Hash().Bytes()[:4])
		}
		glog.Infof("imported %d blocks, %5d txs (%7.3f Mg) in %9v (%6.3f Mg/s). #%v [%s]%s", st.processed, txcount, float64(st.usedGas)/1000000, common.PrettyDuration(elapsed), float64(st.usedGas)*1000/float64(elapsed), end.Number(), hashes, extra)
1039

F
Felix Lange 已提交
1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050
		*st = insertStats{startTime: now, lastIndex: index}
	}
}

// countTransactions returns the total number of transactions contained in
// the given blocks.
func countTransactions(chain []*types.Block) (c int) {
	for i := range chain {
		c += len(chain[i].Transactions())
	}
	return c
}

1051 1052 1053
// reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them
// to be part of the new canonical chain and accumulates potential missing transactions and post an
// event about them
1054
func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
1055
	var (
1056 1057 1058 1059 1060 1061 1062 1063
		newChain          types.Blocks
		oldChain          types.Blocks
		commonBlock       *types.Block
		oldStart          = oldBlock
		newStart          = newBlock
		deletedTxs        types.Transactions
		deletedLogs       vm.Logs
		deletedLogsByHash = make(map[common.Hash]vm.Logs)
J
Jeffrey Wilcke 已提交
1064 1065 1066 1067 1068
		// collectLogs collects the logs that were generated during the
		// processing of the block that corresponds with the given hash.
		// These logs are later announced as deleted.
		collectLogs = func(h common.Hash) {
			// Coalesce logs
1069
			receipts := GetBlockReceipts(self.chainDb, h, self.hc.GetBlockNumber(h))
J
Jeffrey Wilcke 已提交
1070 1071
			for _, receipt := range receipts {
				deletedLogs = append(deletedLogs, receipt.Logs...)
1072 1073

				deletedLogsByHash[h] = receipt.Logs
J
Jeffrey Wilcke 已提交
1074 1075
			}
		}
1076
	)
1077 1078 1079 1080

	// first reduce whoever is higher bound
	if oldBlock.NumberU64() > newBlock.NumberU64() {
		// reduce old chain
1081
		for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = self.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
1082
			oldChain = append(oldChain, oldBlock)
1083
			deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
J
Jeffrey Wilcke 已提交
1084 1085

			collectLogs(oldBlock.Hash())
1086 1087 1088
		}
	} else {
		// reduce new chain and append new chain blocks for inserting later on
1089
		for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = self.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
1090 1091
			newChain = append(newChain, newBlock)
		}
O
obscuren 已提交
1092
	}
O
obscuren 已提交
1093
	if oldBlock == nil {
1094
		return fmt.Errorf("Invalid old chain")
O
obscuren 已提交
1095 1096
	}
	if newBlock == nil {
1097
		return fmt.Errorf("Invalid new chain")
O
obscuren 已提交
1098
	}
O
obscuren 已提交
1099

1100
	numSplit := newBlock.Number()
1101 1102
	for {
		if oldBlock.Hash() == newBlock.Hash() {
1103
			commonBlock = oldBlock
1104 1105
			break
		}
1106 1107

		oldChain = append(oldChain, oldBlock)
1108
		newChain = append(newChain, newBlock)
1109
		deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
J
Jeffrey Wilcke 已提交
1110
		collectLogs(oldBlock.Hash())
O
obscuren 已提交
1111

1112
		oldBlock, newBlock = self.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), self.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
1113
		if oldBlock == nil {
1114
			return fmt.Errorf("Invalid old chain")
1115 1116
		}
		if newBlock == nil {
1117
			return fmt.Errorf("Invalid new chain")
1118
		}
1119 1120
	}

1121
	if glog.V(logger.Debug) {
1122
		commonHash := commonBlock.Hash()
1123
		glog.Infof("Chain split detected @ %x. Reorganising chain from #%v %x to %x", commonHash[:4], numSplit, oldStart.Hash().Bytes()[:4], newStart.Hash().Bytes()[:4])
1124 1125
	}

1126
	var addedTxs types.Transactions
1127
	// insert blocks. Order does not matter. Last block will be written in ImportChain itself which creates the new head properly
1128
	for _, block := range newChain {
1129
		// insert the block in the canonical way, re-writing history
1130
		self.insert(block)
1131
		// write canonical receipts and transactions
1132
		if err := WriteTransactions(self.chainDb, block); err != nil {
1133 1134
			return err
		}
1135
		receipts := GetBlockReceipts(self.chainDb, block.Hash(), block.NumberU64())
1136
		// write receipts
1137
		if err := WriteReceipts(self.chainDb, receipts); err != nil {
1138 1139 1140 1141 1142 1143
			return err
		}
		// Write map map bloom filters
		if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
			return err
		}
1144 1145 1146 1147 1148 1149 1150 1151 1152 1153
		addedTxs = append(addedTxs, block.Transactions()...)
	}

	// calculate the difference between deleted and added transactions
	diff := types.TxDifference(deletedTxs, addedTxs)
	// When transactions get deleted from the database that means the
	// receipts that were created in the fork must also be deleted
	for _, tx := range diff {
		DeleteReceipt(self.chainDb, tx.Hash())
		DeleteTransaction(self.chainDb, tx.Hash())
1154
	}
1155 1156
	// Must be posted in a goroutine because of the transaction pool trying
	// to acquire the chain manager lock
J
Jeffrey Wilcke 已提交
1157 1158 1159 1160
	if len(diff) > 0 {
		go self.eventMux.Post(RemovedTransactionEvent{diff})
	}
	if len(deletedLogs) > 0 {
1161
		go self.eventMux.Post(RemovedLogsEvent{deletedLogs})
J
Jeffrey Wilcke 已提交
1162
	}
1163

1164 1165 1166 1167 1168 1169 1170 1171
	if len(oldChain) > 0 {
		go func() {
			for _, block := range oldChain {
				self.eventMux.Post(ChainSideEvent{Block: block, Logs: deletedLogsByHash[block.Hash()]})
			}
		}()
	}

1172
	return nil
1173 1174
}

1175 1176
// postChainEvents iterates over the events generated by a chain insertion and
// posts them into the event mux.
1177 1178 1179
func (self *BlockChain) postChainEvents(events []interface{}, logs vm.Logs) {
	// post event logs for further processing
	self.eventMux.Post(logs)
1180 1181 1182 1183
	for _, event := range events {
		if event, ok := event.(ChainEvent); ok {
			// We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long
			// and in most cases isn't even necessary.
1184
			if self.LastBlockHash() == event.Hash {
1185 1186 1187 1188 1189 1190 1191 1192
				self.eventMux.Post(ChainHeadEvent{event.Block})
			}
		}
		// Fire the insertion events individually too
		self.eventMux.Post(event)
	}
}

1193
func (self *BlockChain) update() {
O
obscuren 已提交
1194
	futureTimer := time.Tick(5 * time.Second)
1195 1196
	for {
		select {
O
obscuren 已提交
1197
		case <-futureTimer:
1198
			self.procFutureBlocks()
1199
		case <-self.quit:
1200
			return
1201 1202 1203
		}
	}
}
1204

F
Felix Lange 已提交
1205
// reportBlock logs a bad block error.
1206
func reportBlock(block *types.Block, err error) {
1207 1208 1209 1210
	if glog.V(logger.Error) {
		glog.Errorf("Bad block #%v (%s)\n", block.Number(), block.Hash().Hex())
		glog.Errorf("    %v", err)
	}
F
Felix Lange 已提交
1211
}
1212 1213 1214 1215 1216 1217 1218

// InsertHeaderChain attempts to insert the given header chain in to the local
// chain, possibly creating a reorg. If an error is returned, it will return the
// index number of the failing header as well an error describing what went wrong.
//
// The verify parameter can be used to fine tune whether nonce verification
// should be done or not. The reason behind the optional check is because some
L
Leif Jurvetson 已提交
1219
// of the header retrieval mechanisms already need to verify nonces, as well as
1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269
// because nonces can be verified sparsely, not needing to check each.
func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
	// Make sure only one thread manipulates the chain at once
	self.chainmu.Lock()
	defer self.chainmu.Unlock()

	self.wg.Add(1)
	defer self.wg.Done()

	whFunc := func(header *types.Header) error {
		self.mu.Lock()
		defer self.mu.Unlock()

		_, err := self.hc.WriteHeader(header)
		return err
	}

	return self.hc.InsertHeaderChain(chain, checkFreq, whFunc)
}

// writeHeader writes a header into the local chain, given that its parent is
// already known. If the total difficulty of the newly inserted header becomes
// greater than the current known TD, the canonical chain is re-routed.
//
// Note: This method is not concurrent-safe with inserting blocks simultaneously
// into the chain, as side effects caused by reorganisations cannot be emulated
// without the real blocks. Hence, writing headers directly should only be done
// in two scenarios: pure-header mode of operation (light clients), or properly
// separated header/block phases (non-archive clients).
func (self *BlockChain) writeHeader(header *types.Header) error {
	self.wg.Add(1)
	defer self.wg.Done()

	self.mu.Lock()
	defer self.mu.Unlock()

	// Only the error is of interest; the write status is discarded.
	if _, err := self.hc.WriteHeader(header); err != nil {
		return err
	}
	return nil
}

// CurrentHeader returns the current head header of the canonical chain as
// reported by the header chain, read under the chain's read lock.
func (self *BlockChain) CurrentHeader() *types.Header {
	self.mu.RLock()
	defer self.mu.RUnlock()

	head := self.hc.CurrentHeader()
	return head
}

// GetTd retrieves a block's total difficulty in the canonical chain from the
1270 1271 1272 1273 1274 1275
// database by hash and number, caching it if found.
func (self *BlockChain) GetTd(hash common.Hash, number uint64) *big.Int {
	return self.hc.GetTd(hash, number)
}

// GetTdByHash retrieves a block's total difficulty in the canonical chain from the
1276
// database by hash, caching it if found.
1277 1278 1279 1280 1281 1282 1283 1284
func (self *BlockChain) GetTdByHash(hash common.Hash) *big.Int {
	return self.hc.GetTdByHash(hash)
}

// GetHeader retrieves a block header from the database by hash and number,
// caching it if found.
func (self *BlockChain) GetHeader(hash common.Hash, number uint64) *types.Header {
	return self.hc.GetHeader(hash, number)
1285 1286
}

1287
// GetHeaderByHash retrieves a block header from the database by hash, caching it if
1288
// found.
1289 1290
func (self *BlockChain) GetHeaderByHash(hash common.Hash) *types.Header {
	return self.hc.GetHeaderByHash(hash)
1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309
}

// HasHeader reports whether the header chain knows a block header with the
// given hash.
func (bc *BlockChain) HasHeader(hash common.Hash) bool {
	known := bc.hc.HasHeader(hash)
	return known
}

// GetBlockHashesFromHash retrieves up to max block hashes starting at the
// given hash and walking towards the genesis block, by delegating to the
// header chain.
func (self *BlockChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash {
	hashes := self.hc.GetBlockHashesFromHash(hash, max)
	return hashes
}

// GetHeaderByNumber retrieves the block header with the given number by
// delegating to the header chain.
func (self *BlockChain) GetHeaderByNumber(number uint64) *types.Header {
	header := self.hc.GetHeaderByNumber(number)
	return header
}
1310 1311 1312

// Config returns the chain configuration this blockchain holds.
func (self *BlockChain) Config() *ChainConfig {
	return self.config
}