blockchain.go 43.7 KB
Newer Older
F
Felix Lange 已提交
1
// Copyright 2014 The go-ethereum Authors
2
// This file is part of the go-ethereum library.
F
Felix Lange 已提交
3
//
4
// The go-ethereum library is free software: you can redistribute it and/or modify
F
Felix Lange 已提交
5 6 7 8
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
9
// The go-ethereum library is distributed in the hope that it will be useful,
F
Felix Lange 已提交
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
F
Felix Lange 已提交
12 13 14
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
15
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
F
Felix Lange 已提交
16

17
// Package core implements the Ethereum consensus protocol.
O
obscuren 已提交
18
package core
O
obscuren 已提交
19 20

import (
21
	"errors"
O
obscuren 已提交
22
	"fmt"
23
	"io"
24
	"math/big"
25
	mrand "math/rand"
26
	"runtime"
O
obscuren 已提交
27
	"sync"
O
obscuren 已提交
28
	"sync/atomic"
29
	"time"
30

O
obscuren 已提交
31
	"github.com/ethereum/go-ethereum/common"
O
obscuren 已提交
32
	"github.com/ethereum/go-ethereum/core/state"
O
obscuren 已提交
33
	"github.com/ethereum/go-ethereum/core/types"
34
	"github.com/ethereum/go-ethereum/core/vm"
35
	"github.com/ethereum/go-ethereum/crypto"
36
	"github.com/ethereum/go-ethereum/ethdb"
O
obscuren 已提交
37
	"github.com/ethereum/go-ethereum/event"
O
obscuren 已提交
38
	"github.com/ethereum/go-ethereum/logger"
O
obscuren 已提交
39
	"github.com/ethereum/go-ethereum/logger/glog"
40
	"github.com/ethereum/go-ethereum/metrics"
41
	"github.com/ethereum/go-ethereum/pow"
42
	"github.com/ethereum/go-ethereum/rlp"
43
	"github.com/ethereum/go-ethereum/trie"
O
obscuren 已提交
44
	"github.com/hashicorp/golang-lru"
O
obscuren 已提交
45 46
)

47 48 49
var (
	chainlogger = logger.NewLogger("CHAIN")
	jsonlogger  = logger.NewJsonLogger()
50

51
	blockInsertTimer = metrics.NewTimer("chain/inserts")
52 53

	ErrNoGenesis = errors.New("Genesis not found in chain")
54
)
Z
zelig 已提交
55

O
obscuren 已提交
56
const (
	// bodyCacheLimit sizes both the decoded and the RLP encoded body caches.
	bodyCacheLimit = 256
	// blockCacheLimit sizes the cache of entire blocks.
	blockCacheLimit = 256
	// maxFutureBlocks sizes the cache of blocks queued for later processing.
	maxFutureBlocks = 256
	// NOTE(review): presumably a time bound applied to future blocks; it is
	// not referenced in this part of the file — confirm unit and use.
	maxTimeFutureBlocks = 30

	// BlockChainVersion must be bumped when the consensus algorithm is changed.
	// This forces the upgradedb command to be run (forces the blocks to be
	// imported again using the new algorithm).
	BlockChainVersion = 3
)
65

66 67 68 69 70 71 72 73 74 75 76 77 78 79
// BlockChain represents the canonical chain given a database with a genesis
// block. The Blockchain manages chain imports, reverts, chain reorganisations.
//
// Importing blocks in to the block chain happens according to the set of rules
// defined by the two stage Validator. Processing of blocks is done using the
// Processor which processes the included transaction. The validation of the state
// is done in the second part of the Validator. Failing results in aborting of
// the import.
//
// The BlockChain also helps in returning blocks from **any** chain included
// in the database as well as blocks that represents the canonical chain. It's
// important to note that GetBlock can return any block and does not need to be
// included in the canonical one where as GetBlockByNumber always represents the
// canonical chain.
80
type BlockChain struct {
81 82
	config *ChainConfig // chain & network configuration

83
	hc           *HeaderChain
84
	chainDb      ethdb.Database
O
obscuren 已提交
85
	eventMux     *event.TypeMux
86
	genesisBlock *types.Block
87

88 89 90
	mu      sync.RWMutex // global mutex for locking chain operations
	chainmu sync.RWMutex // blockchain insertion lock
	procmu  sync.RWMutex // block processor lock
91

92 93 94
	checkpoint       int          // checkpoint counts towards the new checkpoint
	currentBlock     *types.Block // Current head of the block chain
	currentFastBlock *types.Block // Current head of the fast-sync chain (may be above the block chain!)
O
obscuren 已提交
95

96 97 98 99 100
	stateCache   *state.StateDB // State database to reuse between imports (contains state cache)
	bodyCache    *lru.Cache     // Cache for the most recent block bodies
	bodyRLPCache *lru.Cache     // Cache for the most recent block bodies in RLP encoded format
	blockCache   *lru.Cache     // Cache for the most recent entire blocks
	futureBlocks *lru.Cache     // future blocks are blocks added for later processing
101

102
	quit    chan struct{} // blockchain quit channel
L
Leif Jurvetson 已提交
103
	running int32         // running must be called atomically
O
obscuren 已提交
104
	// procInterrupt must be atomically called
105 106
	procInterrupt int32          // interrupt signaler for block processing
	wg            sync.WaitGroup // chain processing wait group for shutting down
107

108
	pow       pow.PoW
109 110
	processor Processor // block processor interface
	validator Validator // block and state validator interface
O
obscuren 已提交
111
}
O
obscuren 已提交
112

113 114 115
// NewBlockChain returns a fully initialised block chain using information
// available in the database. It initialiser the default Ethereum Validator and
// Processor.
116
func NewBlockChain(chainDb ethdb.Database, config *ChainConfig, pow pow.PoW, mux *event.TypeMux) (*BlockChain, error) {
117 118 119 120 121
	bodyCache, _ := lru.New(bodyCacheLimit)
	bodyRLPCache, _ := lru.New(bodyCacheLimit)
	blockCache, _ := lru.New(blockCacheLimit)
	futureBlocks, _ := lru.New(maxFutureBlocks)

122
	bc := &BlockChain{
123
		config:       config,
124 125 126 127 128 129 130 131
		chainDb:      chainDb,
		eventMux:     mux,
		quit:         make(chan struct{}),
		bodyCache:    bodyCache,
		bodyRLPCache: bodyRLPCache,
		blockCache:   blockCache,
		futureBlocks: futureBlocks,
		pow:          pow,
O
obscuren 已提交
132
	}
133 134
	bc.SetValidator(NewBlockValidator(config, bc, pow))
	bc.SetProcessor(NewStateProcessor(config, bc))
135 136 137

	gv := func() HeaderValidator { return bc.Validator() }
	var err error
138
	bc.hc, err = NewHeaderChain(chainDb, config, gv, bc.getProcInterrupt)
139 140 141
	if err != nil {
		return nil, err
	}
142 143 144

	bc.genesisBlock = bc.GetBlockByNumber(0)
	if bc.genesisBlock == nil {
145
		return nil, ErrNoGenesis
146
	}
147

148
	if err := bc.loadLastState(); err != nil {
149
		return nil, err
O
obscuren 已提交
150
	}
151
	// Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
152
	for hash, _ := range BadHashes {
153
		if header := bc.GetHeaderByHash(hash); header != nil {
154 155 156
			glog.V(logger.Error).Infof("Found bad hash, rewinding chain to block #%d [%x…]", header.Number, header.ParentHash[:4])
			bc.SetHead(header.Number.Uint64() - 1)
			glog.V(logger.Error).Infoln("Chain rewind was successful, resuming normal operation")
157 158
		}
	}
159
	// Take ownership of this particular state
160
	go bc.update()
O
obscuren 已提交
161
	return bc, nil
162 163
}

164 165 166 167
// getProcInterrupt reports whether the processing interrupt flag has been
// raised, i.e. whether procInterrupt currently holds 1.
func (bc *BlockChain) getProcInterrupt() bool {
	flag := atomic.LoadInt32(&bc.procInterrupt)
	return flag == 1
}

168 169 170 171 172 173 174 175 176
// loadLastState loads the last known chain state from the database. This method
// assumes that the chain manager mutex is held.
func (self *BlockChain) loadLastState() error {
	// Restore the last known head block
	head := GetHeadBlockHash(self.chainDb)
	if head == (common.Hash{}) {
		// Corrupt or empty database, init from scratch
		self.Reset()
	} else {
177
		if block := self.GetBlockByHash(head); block != nil {
178 179 180 181 182 183 184 185
			// Block found, set as the current head
			self.currentBlock = block
		} else {
			// Corrupt or empty database, init from scratch
			self.Reset()
		}
	}
	// Restore the last known head header
186
	currentHeader := self.currentBlock.Header()
187
	if head := GetHeadHeaderHash(self.chainDb); head != (common.Hash{}) {
188
		if header := self.GetHeaderByHash(head); header != nil {
189
			currentHeader = header
190 191
		}
	}
192
	self.hc.SetCurrentHeader(currentHeader)
193 194 195
	// Restore the last known head fast block
	self.currentFastBlock = self.currentBlock
	if head := GetHeadFastBlockHash(self.chainDb); head != (common.Hash{}) {
196
		if block := self.GetBlockByHash(head); block != nil {
197 198 199
			self.currentFastBlock = block
		}
	}
200 201 202 203 204 205 206 207 208
	// Initialize a statedb cache to ensure singleton account bloom filter generation
	statedb, err := state.New(self.currentBlock.Root(), self.chainDb)
	if err != nil {
		return err
	}
	self.stateCache = statedb
	self.stateCache.GetAccount(common.Address{})

	// Issue a status log for the user
209 210 211
	headerTd := self.GetTd(currentHeader.Hash(), currentHeader.Number.Uint64())
	blockTd := self.GetTd(self.currentBlock.Hash(), self.currentBlock.NumberU64())
	fastTd := self.GetTd(self.currentFastBlock.Hash(), self.currentFastBlock.NumberU64())
212

213
	glog.V(logger.Info).Infof("Last header: #%d [%x…] TD=%v", currentHeader.Number, currentHeader.Hash().Bytes()[:4], headerTd)
214
	glog.V(logger.Info).Infof("Last block: #%d [%x…] TD=%v", self.currentBlock.Number(), self.currentBlock.Hash().Bytes()[:4], blockTd)
215
	glog.V(logger.Info).Infof("Fast block: #%d [%x…] TD=%v", self.currentFastBlock.Number(), self.currentFastBlock.Hash().Bytes()[:4], fastTd)
216 217 218 219

	return nil
}

220 221 222 223
// SetHead rewinds the local chain to a new head. In the case of headers, everything
// above the new head will be deleted and the new one set. In the case of blocks
// though, the head may be further rewound if block bodies are missing (non-archive
// nodes after a fast sync).
224
func (bc *BlockChain) SetHead(head uint64) {
225 226 227
	bc.mu.Lock()
	defer bc.mu.Unlock()

228 229
	delFn := func(hash common.Hash, num uint64) {
		DeleteBody(bc.chainDb, hash, num)
230
	}
231 232
	bc.hc.SetHead(head, delFn)

233
	// Clear out any stale content from the caches
234 235 236 237
	bc.bodyCache.Purge()
	bc.bodyRLPCache.Purge()
	bc.blockCache.Purge()
	bc.futureBlocks.Purge()
238

239
	// Update all computed fields to the new head
240 241 242
	currentHeader := bc.hc.CurrentHeader()
	if bc.currentBlock != nil && currentHeader.Number.Uint64() < bc.currentBlock.NumberU64() {
		bc.currentBlock = bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())
243
	}
244 245
	if bc.currentFastBlock != nil && currentHeader.Number.Uint64() < bc.currentFastBlock.NumberU64() {
		bc.currentFastBlock = bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())
246 247
	}

248 249 250
	if bc.currentBlock == nil {
		bc.currentBlock = bc.genesisBlock
	}
251 252 253
	if bc.currentFastBlock == nil {
		bc.currentFastBlock = bc.genesisBlock
	}
254

255 256 257 258 259 260
	if err := WriteHeadBlockHash(bc.chainDb, bc.currentBlock.Hash()); err != nil {
		glog.Fatalf("failed to reset head block hash: %v", err)
	}
	if err := WriteHeadFastBlockHash(bc.chainDb, bc.currentFastBlock.Hash()); err != nil {
		glog.Fatalf("failed to reset head fast block hash: %v", err)
	}
261
	bc.loadLastState()
262 263
}

264 265 266
// FastSyncCommitHead sets the current head block to the one defined by the hash
// irrelevant what the chain contents were prior.
func (self *BlockChain) FastSyncCommitHead(hash common.Hash) error {
267
	// Make sure that both the block as well at its state trie exists
268
	block := self.GetBlockByHash(hash)
269 270 271
	if block == nil {
		return fmt.Errorf("non existent block [%x…]", hash[:4])
	}
272
	if _, err := trie.NewSecure(block.Root(), self.chainDb, 0); err != nil {
273 274 275 276 277 278 279 280 281 282 283
		return err
	}
	// If all checks out, manually set the head block
	self.mu.Lock()
	self.currentBlock = block
	self.mu.Unlock()

	glog.V(logger.Info).Infof("committed block #%d [%x…] as new head", block.Number(), hash[:4])
	return nil
}

284
// GasLimit returns the gas limit of the current HEAD block.
285
func (self *BlockChain) GasLimit() *big.Int {
O
obscuren 已提交
286 287
	self.mu.RLock()
	defer self.mu.RUnlock()
O
obscuren 已提交
288

289
	return self.currentBlock.GasLimit()
O
obscuren 已提交
290 291
}

292
// LastBlockHash return the hash of the HEAD block.
293
func (self *BlockChain) LastBlockHash() common.Hash {
294 295 296
	self.mu.RLock()
	defer self.mu.RUnlock()

297
	return self.currentBlock.Hash()
298 299
}

300
// CurrentBlock retrieves the current head block of the canonical chain. The
301
// block is retrieved from the blockchain's internal cache.
302
func (self *BlockChain) CurrentBlock() *types.Block {
O
obscuren 已提交
303 304 305 306
	self.mu.RLock()
	defer self.mu.RUnlock()

	return self.currentBlock
O
obscuren 已提交
307 308
}

309
// CurrentFastBlock retrieves the current fast-sync head block of the canonical
310
// chain. The block is retrieved from the blockchain's internal cache.
311 312 313 314 315 316 317
func (self *BlockChain) CurrentFastBlock() *types.Block {
	self.mu.RLock()
	defer self.mu.RUnlock()

	return self.currentFastBlock
}

318 319
// Status returns status information about the current chain such as the HEAD Td,
// the HEAD hash and the hash of the genesis block.
320
func (self *BlockChain) Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash) {
O
obscuren 已提交
321 322 323
	self.mu.RLock()
	defer self.mu.RUnlock()

324
	return self.GetTd(self.currentBlock.Hash(), self.currentBlock.NumberU64()), self.currentBlock.Hash(), self.genesisBlock.Hash()
325 326
}

327 328 329 330 331 332 333 334 335 336 337 338
// SetProcessor installs the processor required for making state modifications.
// The swap happens under the processor write lock.
func (bc *BlockChain) SetProcessor(processor Processor) {
	bc.procmu.Lock()
	bc.processor = processor
	bc.procmu.Unlock()
}

// SetValidator sets the validator which is used to validate incoming blocks.
func (self *BlockChain) SetValidator(validator Validator) {
	self.procmu.Lock()
	defer self.procmu.Unlock()
	self.validator = validator
339 340
}

341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358
// Validator returns the validator currently installed, read under the
// processor read lock.
func (bc *BlockChain) Validator() Validator {
	bc.procmu.RLock()
	current := bc.validator
	bc.procmu.RUnlock()

	return current
}

// Processor returns the processor currently installed, read under the
// processor read lock.
func (bc *BlockChain) Processor() Processor {
	bc.procmu.RLock()
	current := bc.processor
	bc.procmu.RUnlock()

	return current
}

// AuxValidator returns the auxiliary validator (the proof-of-work engine the
// chain was constructed with).
func (bc *BlockChain) AuxValidator() pow.PoW {
	return bc.pow
}

// State returns a new mutable state based on the current HEAD block.
359
func (self *BlockChain) State() (*state.StateDB, error) {
360 361 362 363 364 365
	return self.StateAt(self.CurrentBlock().Root())
}

// StateAt returns a new mutable state based on a particular point in time.
func (self *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) {
	return self.stateCache.New(root)
O
obscuren 已提交
366 367
}

368
// Reset purges the entire blockchain, restoring it to its genesis state.
369
func (bc *BlockChain) Reset() {
370
	bc.ResetWithGenesisBlock(bc.genesisBlock)
371 372
}

373 374
// ResetWithGenesisBlock purges the entire blockchain, restoring it to the
// specified genesis state.
375
func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) {
376 377 378
	// Dump the entire block chain and purge the caches
	bc.SetHead(0)

379 380 381
	bc.mu.Lock()
	defer bc.mu.Unlock()

382
	// Prepare the genesis block and reinitialise the chain
383
	if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil {
384 385 386
		glog.Fatalf("failed to write genesis block TD: %v", err)
	}
	if err := WriteBlock(bc.chainDb, genesis); err != nil {
387
		glog.Fatalf("failed to write genesis block: %v", err)
388
	}
389
	bc.genesisBlock = genesis
390 391
	bc.insert(bc.genesisBlock)
	bc.currentBlock = bc.genesisBlock
392 393
	bc.hc.SetGenesis(bc.genesisBlock.Header())
	bc.hc.SetCurrentHeader(bc.genesisBlock.Header())
394
	bc.currentFastBlock = bc.genesisBlock
395 396
}

397
// Export writes the active chain to the given writer.
398
func (self *BlockChain) Export(w io.Writer) error {
399
	if err := self.ExportN(w, uint64(0), self.currentBlock.NumberU64()); err != nil {
T
Taylor Gerring 已提交
400 401 402 403 404 405
		return err
	}
	return nil
}

// ExportN writes a subset of the active chain to the given writer.
406
func (self *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
O
obscuren 已提交
407 408
	self.mu.RLock()
	defer self.mu.RUnlock()
O
obscuren 已提交
409

T
Taylor Gerring 已提交
410 411 412 413
	if first > last {
		return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last)
	}

T
Cleanup  
Taylor Gerring 已提交
414
	glog.V(logger.Info).Infof("exporting %d blocks...\n", last-first+1)
415

T
Taylor Gerring 已提交
416
	for nr := first; nr <= last; nr++ {
417 418 419 420 421 422
		block := self.GetBlockByNumber(nr)
		if block == nil {
			return fmt.Errorf("export failed on #%d: not found", nr)
		}

		if err := block.EncodeRLP(w); err != nil {
423 424
			return err
		}
O
obscuren 已提交
425
	}
426

427
	return nil
O
obscuren 已提交
428 429
}

430 431
// insert injects a new head block into the current block chain. This method
// assumes that the block is indeed a true head. It will also reset the head
432 433
// header and the head fast sync block to this very same block if they are older
// or if they are on a different side chain.
434 435
//
// Note, this function assumes that the `mu` mutex is held!
436
func (bc *BlockChain) insert(block *types.Block) {
437 438 439
	// If the block is on a side chain or an unknown one, force other heads onto it too
	updateHeads := GetCanonicalHash(bc.chainDb, block.NumberU64()) != block.Hash()

440 441 442
	// Add the block to the canonical chain number scheme and mark as the head
	if err := WriteCanonicalHash(bc.chainDb, block.Hash(), block.NumberU64()); err != nil {
		glog.Fatalf("failed to insert block number: %v", err)
443
	}
444
	if err := WriteHeadBlockHash(bc.chainDb, block.Hash()); err != nil {
445
		glog.Fatalf("failed to insert head block hash: %v", err)
446
	}
447
	bc.currentBlock = block
448 449 450

	// If the block is better than out head or is on a different chain, force update heads
	if updateHeads {
451
		bc.hc.SetCurrentHeader(block.Header())
452 453 454 455 456 457

		if err := WriteHeadFastBlockHash(bc.chainDb, block.Hash()); err != nil {
			glog.Fatalf("failed to insert head fast block hash: %v", err)
		}
		bc.currentFastBlock = block
	}
458 459
}

O
obscuren 已提交
460
// Accessors
461
func (bc *BlockChain) Genesis() *types.Block {
O
obscuren 已提交
462 463 464
	return bc.genesisBlock
}

465 466
// GetBody retrieves a block body (transactions and uncles) from the database by
// hash, caching it if found.
467
func (self *BlockChain) GetBody(hash common.Hash) *types.Body {
468 469
	// Short circuit if the body's already in the cache, retrieve otherwise
	if cached, ok := self.bodyCache.Get(hash); ok {
470 471
		body := cached.(*types.Body)
		return body
O
obscuren 已提交
472
	}
473
	body := GetBody(self.chainDb, hash, self.hc.GetBlockNumber(hash))
474 475
	if body == nil {
		return nil
476 477
	}
	// Cache the found body for next time and return
478 479
	self.bodyCache.Add(hash, body)
	return body
480
}
O
obscuren 已提交
481

482 483
// GetBodyRLP retrieves a block body in RLP encoding from the database by hash,
// caching it if found.
484
func (self *BlockChain) GetBodyRLP(hash common.Hash) rlp.RawValue {
485 486
	// Short circuit if the body's already in the cache, retrieve otherwise
	if cached, ok := self.bodyRLPCache.Get(hash); ok {
487
		return cached.(rlp.RawValue)
488
	}
489
	body := GetBodyRLP(self.chainDb, hash, self.hc.GetBlockNumber(hash))
490
	if len(body) == 0 {
491
		return nil
O
obscuren 已提交
492
	}
493 494 495 496
	// Cache the found body for next time and return
	self.bodyRLPCache.Add(hash, body)
	return body
}
O
obscuren 已提交
497

498 499
// HasBlock checks if a block is fully present in the database or not, caching
// it if present.
500
func (bc *BlockChain) HasBlock(hash common.Hash) bool {
501
	return bc.GetBlockByHash(hash) != nil
O
obscuren 已提交
502 503
}

504 505 506 507
// HasBlockAndState checks if a block and associated state trie is fully present
// in the database or not, caching it if present.
func (bc *BlockChain) HasBlockAndState(hash common.Hash) bool {
	// Check first that the block itself is known
508
	block := bc.GetBlockByHash(hash)
509 510 511 512 513 514 515 516
	if block == nil {
		return false
	}
	// Ensure the associated state is also present
	_, err := state.New(block.Root(), bc.chainDb)
	return err == nil
}

517 518 519
// GetBlock retrieves a block from the database by hash and number,
// caching it if found.
func (self *BlockChain) GetBlock(hash common.Hash, number uint64) *types.Block {
520 521
	// Short circuit if the block's already in the cache, retrieve otherwise
	if block, ok := self.blockCache.Get(hash); ok {
522 523
		return block.(*types.Block)
	}
524
	block := GetBlock(self.chainDb, hash, number)
525
	if block == nil {
O
obscuren 已提交
526 527
		return nil
	}
528 529 530
	// Cache the found block for next time and return
	self.blockCache.Add(block.Hash(), block)
	return block
O
obscuren 已提交
531 532
}

533 534 535 536 537
// GetBlockByHash retrieves a block by hash, caching it if found. The block
// number needed for the lookup is resolved through the header chain.
func (bc *BlockChain) GetBlockByHash(hash common.Hash) *types.Block {
	number := bc.hc.GetBlockNumber(hash)
	return bc.GetBlock(hash, number)
}

538 539
// GetBlockByNumber retrieves a block from the database by number, caching it
// (associated with its hash) if found.
540
func (self *BlockChain) GetBlockByNumber(number uint64) *types.Block {
541
	hash := GetCanonicalHash(self.chainDb, number)
542 543 544
	if hash == (common.Hash{}) {
		return nil
	}
545
	return self.GetBlock(hash, number)
546
}
547

548
// [deprecated by eth/62]
F
Felix Lange 已提交
549
// GetBlocksFromHash returns the block corresponding to hash and up to n-1 ancestors.
550
func (self *BlockChain) GetBlocksFromHash(hash common.Hash, n int) (blocks []*types.Block) {
551
	number := self.hc.GetBlockNumber(hash)
F
Felix Lange 已提交
552
	for i := 0; i < n; i++ {
553
		block := self.GetBlock(hash, number)
F
Felix Lange 已提交
554 555 556 557 558
		if block == nil {
			break
		}
		blocks = append(blocks, block)
		hash = block.ParentHash()
559
		number--
F
Felix Lange 已提交
560 561 562 563
	}
	return
}

564 565 566 567
// GetUnclesInChain retrieves all the uncles from a given block backwards until
// a specific distance is reached.
func (self *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.Header {
	uncles := []*types.Header{}
568 569
	for i := 0; block != nil && i < length; i++ {
		uncles = append(uncles, block.Uncles()...)
570
		block = self.GetBlock(block.ParentHash(), block.NumberU64()-1)
571
	}
572
	return uncles
O
obscuren 已提交
573
}
O
obscuren 已提交
574

575 576
// Stop stops the blockchain service. If any imports are currently in progress
// it will abort them using the procInterrupt.
577
func (bc *BlockChain) Stop() {
578 579 580
	if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
		return
	}
581
	close(bc.quit)
O
obscuren 已提交
582
	atomic.StoreInt32(&bc.procInterrupt, 1)
583 584 585 586

	bc.wg.Wait()

	glog.V(logger.Info).Infoln("Chain manager stopped")
587 588
}

589
func (self *BlockChain) procFutureBlocks() {
590 591 592 593 594
	blocks := make([]*types.Block, 0, self.futureBlocks.Len())
	for _, hash := range self.futureBlocks.Keys() {
		if block, exist := self.futureBlocks.Get(hash); exist {
			blocks = append(blocks, block.(*types.Block))
		}
595
	}
596 597 598 599
	if len(blocks) > 0 {
		types.BlockBy(types.Number).Sort(blocks)
		self.InsertChain(blocks)
	}
600 601
}

602
// WriteStatus describes the outcome of writing a block with WriteBlock.
type WriteStatus byte

const (
	// NonStatTy is the zero status; WriteBlock returns it alongside an error.
	NonStatTy WriteStatus = iota
	// CanonStatTy is returned when the block was inserted as the new head of
	// the canonical chain.
	CanonStatTy
	// SplitStatTy is not produced by the code visible in this part of the
	// file. NOTE(review): confirm where it is used.
	SplitStatTy
	// SideStatTy is returned when the block was written without becoming the
	// new head.
	SideStatTy
)

611 612 613
// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
func (self *BlockChain) Rollback(chain []common.Hash) {
614 615 616
	self.mu.Lock()
	defer self.mu.Unlock()

617 618 619
	for i := len(chain) - 1; i >= 0; i-- {
		hash := chain[i]

620 621 622
		currentHeader := self.hc.CurrentHeader()
		if currentHeader.Hash() == hash {
			self.hc.SetCurrentHeader(self.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1))
623 624
		}
		if self.currentFastBlock.Hash() == hash {
625
			self.currentFastBlock = self.GetBlock(self.currentFastBlock.ParentHash(), self.currentFastBlock.NumberU64()-1)
626 627 628
			WriteHeadFastBlockHash(self.chainDb, self.currentFastBlock.Hash())
		}
		if self.currentBlock.Hash() == hash {
629
			self.currentBlock = self.GetBlock(self.currentBlock.ParentHash(), self.currentBlock.NumberU64()-1)
630 631 632 633 634
			WriteHeadBlockHash(self.chainDb, self.currentBlock.Hash())
		}
	}
}

635 636 637 638 639 640 641
// InsertReceiptChain attempts to complete an already existing header chain with
// transaction and receipt data.
func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
	self.wg.Add(1)
	defer self.wg.Done()

	// Collect some import statistics to report on
642
	stats := struct{ processed, ignored int32 }{}
643 644
	start := time.Now()

645 646
	// Create the block importing task queue and worker functions
	tasks := make(chan int, len(blockChain))
647
	for i := 0; i < len(blockChain) && i < len(receiptChain); i++ {
648 649 650
		tasks <- i
	}
	close(tasks)
651

652 653 654 655 656 657 658 659
	errs, failed := make([]error, len(tasks)), int32(0)
	process := func(worker int) {
		for index := range tasks {
			block, receipts := blockChain[index], receiptChain[index]

			// Short circuit insertion if shutting down or processing failed
			if atomic.LoadInt32(&self.procInterrupt) == 1 {
				return
660
			}
661 662
			if atomic.LoadInt32(&failed) > 0 {
				return
663
			}
664 665 666 667 668
			// Short circuit if the owner header is unknown
			if !self.HasHeader(block.Hash()) {
				errs[index] = fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
				atomic.AddInt32(&failed, 1)
				return
669
			}
670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702
			// Skip if the entire data is already known
			if self.HasBlock(block.Hash()) {
				atomic.AddInt32(&stats.ignored, 1)
				continue
			}
			// Compute all the non-consensus fields of the receipts
			transactions, logIndex := block.Transactions(), uint(0)
			for j := 0; j < len(receipts); j++ {
				// The transaction hash can be retrieved from the transaction itself
				receipts[j].TxHash = transactions[j].Hash()

				// The contract address can be derived from the transaction itself
				if MessageCreatesContract(transactions[j]) {
					from, _ := transactions[j].From()
					receipts[j].ContractAddress = crypto.CreateAddress(from, transactions[j].Nonce())
				}
				// The used gas can be calculated based on previous receipts
				if j == 0 {
					receipts[j].GasUsed = new(big.Int).Set(receipts[j].CumulativeGasUsed)
				} else {
					receipts[j].GasUsed = new(big.Int).Sub(receipts[j].CumulativeGasUsed, receipts[j-1].CumulativeGasUsed)
				}
				// The derived log fields can simply be set from the block and transaction
				for k := 0; k < len(receipts[j].Logs); k++ {
					receipts[j].Logs[k].BlockNumber = block.NumberU64()
					receipts[j].Logs[k].BlockHash = block.Hash()
					receipts[j].Logs[k].TxHash = receipts[j].TxHash
					receipts[j].Logs[k].TxIndex = uint(j)
					receipts[j].Logs[k].Index = logIndex
					logIndex++
				}
			}
			// Write all the data out into the database
703
			if err := WriteBody(self.chainDb, block.Hash(), block.NumberU64(), block.Body()); err != nil {
704 705 706 707 708
				errs[index] = fmt.Errorf("failed to write block body: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
709
			if err := WriteBlockReceipts(self.chainDb, block.Hash(), block.NumberU64(), receipts); err != nil {
710 711 712 713 714
				errs[index] = fmt.Errorf("failed to write block receipts: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
715 716 717 718 719 720
			if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
				errs[index] = fmt.Errorf("failed to write log blooms: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
721 722 723 724 725 726 727 728 729 730 731 732
			if err := WriteTransactions(self.chainDb, block); err != nil {
				errs[index] = fmt.Errorf("failed to write individual transactions: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
			if err := WriteReceipts(self.chainDb, receipts); err != nil {
				errs[index] = fmt.Errorf("failed to write individual receipts: %v", err)
				atomic.AddInt32(&failed, 1)
				glog.Fatal(errs[index])
				return
			}
733
			atomic.AddInt32(&stats.processed, 1)
734
		}
735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751
	}
	// Start as many worker threads as goroutines allowed
	pending := new(sync.WaitGroup)
	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
		pending.Add(1)
		go func(id int) {
			defer pending.Done()
			process(id)
		}(i)
	}
	pending.Wait()

	// If anything failed, report
	if failed > 0 {
		for i, err := range errs {
			if err != nil {
				return i, err
752 753 754
			}
		}
	}
755 756 757 758
	if atomic.LoadInt32(&self.procInterrupt) == 1 {
		glog.V(logger.Debug).Infoln("premature abort during receipt chain processing")
		return 0, nil
	}
759 760 761
	// Update the head fast sync block if better
	self.mu.Lock()
	head := blockChain[len(errs)-1]
762
	if self.GetTd(self.currentFastBlock.Hash(), self.currentFastBlock.NumberU64()).Cmp(self.GetTd(head.Hash(), head.NumberU64())) < 0 {
763 764 765 766 767 768 769
		if err := WriteHeadFastBlockHash(self.chainDb, head.Hash()); err != nil {
			glog.Fatalf("failed to update head fast block hash: %v", err)
		}
		self.currentFastBlock = head
	}
	self.mu.Unlock()

770 771
	// Report some public statistics so the user has a clue what's going on
	first, last := blockChain[0], blockChain[len(blockChain)-1]
772 773 774 775 776

	ignored := ""
	if stats.ignored > 0 {
		ignored = fmt.Sprintf(" (%d ignored)", stats.ignored)
	}
777
	glog.V(logger.Info).Infof("imported %d receipts in %9v. #%d [%x… / %x…]%s", stats.processed, common.PrettyDuration(time.Since(start)), last.Number(), first.Hash().Bytes()[:4], last.Hash().Bytes()[:4], ignored)
778 779 780 781

	return 0, nil
}

782
// WriteBlock writes the block to the chain.
783
func (self *BlockChain) WriteBlock(block *types.Block) (status WriteStatus, err error) {
784 785 786
	self.wg.Add(1)
	defer self.wg.Done()

787
	// Calculate the total difficulty of the block
788
	ptd := self.GetTd(block.ParentHash(), block.NumberU64()-1)
789 790 791
	if ptd == nil {
		return NonStatTy, ParentError(block.ParentHash())
	}
792 793 794
	// Make sure no inconsistent state is leaked during insertion
	self.mu.Lock()
	defer self.mu.Unlock()
795

796 797 798
	localTd := self.GetTd(self.currentBlock.Hash(), self.currentBlock.NumberU64())
	externTd := new(big.Int).Add(block.Difficulty(), ptd)

799 800 801 802 803 804 805 806
	// Irrelevant of the canonical status, write the block itself to the database
	if err := self.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil {
		glog.Fatalf("failed to write block total difficulty: %v", err)
	}
	if err := WriteBlock(self.chainDb, block); err != nil {
		glog.Fatalf("failed to write block contents: %v", err)
	}

807
	// If the total difficulty is higher than our known, add it to the canonical chain
808 809 810
	// Second clause in the if statement reduces the vulnerability to selfish mining.
	// Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf
	if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) {
811
		// Reorganise the chain if the parent is not the head block
812 813
		if block.ParentHash() != self.currentBlock.Hash() {
			if err := self.reorg(self.currentBlock, block); err != nil {
814
				return NonStatTy, err
815 816
			}
		}
817
		self.insert(block) // Insert the block as the new head of the chain
818
		status = CanonStatTy
819
	} else {
820
		status = SideStatTy
821
	}
822

823
	self.futureBlocks.Remove(block.Hash())
824 825 826 827

	return
}

828 829
// InsertChain will attempt to insert the given chain in to the canonical chain or, otherwise, create a fork. It an error is returned
// it will return the index number of the failing block as well an error describing what went wrong (for possible errors see core/errors.go).
830
func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
831 832 833
	self.wg.Add(1)
	defer self.wg.Done()

O
obscuren 已提交
834 835 836
	self.chainmu.Lock()
	defer self.chainmu.Unlock()

F
Felix Lange 已提交
837 838 839
	// A queued approach to delivering events. This is generally
	// faster than direct delivery and requires much less mutex
	// acquiring.
O
obscuren 已提交
840
	var (
F
Felix Lange 已提交
841
		stats         = insertStats{startTime: time.Now()}
842 843
		events        = make([]interface{}, 0, len(chain))
		coalescedLogs vm.Logs
F
Felix Lange 已提交
844
		nonceChecked  = make([]bool, len(chain))
O
obscuren 已提交
845
	)
846

F
Felix Lange 已提交
847
	// Start the parallel nonce verifier.
848 849
	nonceAbort, nonceResults := verifyNoncesFromBlocks(self.pow, chain)
	defer close(nonceAbort)
850

851
	for i, block := range chain {
O
obscuren 已提交
852
		if atomic.LoadInt32(&self.procInterrupt) == 1 {
853
			glog.V(logger.Debug).Infoln("Premature abort during block chain processing")
O
obscuren 已提交
854 855
			break
		}
856

O
obscuren 已提交
857 858 859 860
		bstart := time.Now()
		// Wait for block i's nonce to be verified before processing
		// its state transition.
		for !nonceChecked[i] {
861 862
			r := <-nonceResults
			nonceChecked[r.index] = true
O
obscuren 已提交
863
			if !r.valid {
864 865
				block := chain[r.index]
				return r.index, &BlockNonceErr{Hash: block.Hash(), Number: block.Number(), Nonce: block.Nonce()}
O
obscuren 已提交
866
			}
O
obscuren 已提交
867
		}
O
obscuren 已提交
868

O
obscuren 已提交
869
		if BadHashes[block.Hash()] {
870
			err := BadHashError(block.Hash())
871
			reportBlock(block, err)
O
obscuren 已提交
872 873
			return i, err
		}
874 875 876
		// Stage 1 validation of the block using the chain's validator
		// interface.
		err := self.Validator().ValidateBlock(block)
O
obscuren 已提交
877 878 879 880 881
		if err != nil {
			if IsKnownBlockErr(err) {
				stats.ignored++
				continue
			}
882

O
obscuren 已提交
883 884 885 886
			if err == BlockFutureErr {
				// Allow up to MaxFuture second in the future blocks. If this limit
				// is exceeded the chain is discarded and processed at a later time
				// if given.
887 888
				max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
				if block.Time().Cmp(max) == 1 {
O
obscuren 已提交
889
					return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max)
890
				}
O
obscuren 已提交
891

892
				self.futureBlocks.Add(block.Hash(), block)
O
obscuren 已提交
893 894 895
				stats.queued++
				continue
			}
O
obscuren 已提交
896

897 898
			if IsParentErr(err) && self.futureBlocks.Contains(block.ParentHash()) {
				self.futureBlocks.Add(block.Hash(), block)
O
obscuren 已提交
899 900
				stats.queued++
				continue
901
			}
902

903
			reportBlock(block, err)
O
obscuren 已提交
904

905 906
			return i, err
		}
907

908 909
		// Create a new statedb using the parent block and report an
		// error if it fails.
910 911 912 913 914
		switch {
		case i == 0:
			err = self.stateCache.Reset(self.GetBlock(block.ParentHash(), block.NumberU64()-1).Root())
		default:
			err = self.stateCache.Reset(chain[i-1].Root())
915
		}
916 917
		if err != nil {
			reportBlock(block, err)
O
obscuren 已提交
918 919
			return i, err
		}
920
		// Process block using the parent state as reference point.
921
		receipts, logs, usedGas, err := self.processor.Process(block, self.stateCache, self.config.VmConfig)
922 923 924 925 926
		if err != nil {
			reportBlock(block, err)
			return i, err
		}
		// Validate the state using the default validator
927
		err = self.Validator().ValidateState(block, self.GetBlock(block.ParentHash(), block.NumberU64()-1), self.stateCache, receipts, usedGas)
928 929 930 931 932
		if err != nil {
			reportBlock(block, err)
			return i, err
		}
		// Write state changes to database
933
		_, err = self.stateCache.Commit()
934 935 936 937 938 939 940
		if err != nil {
			return i, err
		}

		// coalesce logs for later processing
		coalescedLogs = append(coalescedLogs, logs...)

941
		if err := WriteBlockReceipts(self.chainDb, block.Hash(), block.NumberU64(), receipts); err != nil {
942
			return i, err
943
		}
O
obscuren 已提交
944

945
		// write the block to the chain and get the status
946
		status, err := self.WriteBlock(block)
947 948 949
		if err != nil {
			return i, err
		}
950

951
		switch status {
952
		case CanonStatTy:
O
obscuren 已提交
953
			if glog.V(logger.Debug) {
954
				glog.Infof("inserted block #%d [%x…] in %9v: %3d txs %7v gas %d uncles.", block.Number(), block.Hash().Bytes()[0:4], common.PrettyDuration(time.Since(bstart)), len(block.Transactions()), block.GasUsed(), len(block.Uncles()))
O
obscuren 已提交
955
			}
956
			blockInsertTimer.UpdateSince(bstart)
957
			events = append(events, ChainEvent{block, block.Hash(), logs})
958 959

			// This puts transactions in a extra db for rpc
960
			if err := WriteTransactions(self.chainDb, block); err != nil {
961 962
				return i, err
			}
963
			// store the receipts
964
			if err := WriteReceipts(self.chainDb, receipts); err != nil {
965 966 967 968 969 970
				return i, err
			}
			// Write map map bloom filters
			if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
				return i, err
			}
971
		case SideStatTy:
O
obscuren 已提交
972
			if glog.V(logger.Detail) {
973
				glog.Infof("inserted forked block #%d [%x…] (TD=%v) in %9v: %3d txs %d uncles.", block.Number(), block.Hash().Bytes()[0:4], block.Difficulty(), common.PrettyDuration(time.Since(bstart)), len(block.Transactions()), len(block.Uncles()))
O
obscuren 已提交
974
			}
975
			blockInsertTimer.UpdateSince(bstart)
976 977
			events = append(events, ChainSideEvent{block, logs})

978
		case SplitStatTy:
979
			events = append(events, ChainSplitEvent{block, logs})
980
		}
F
Felix Lange 已提交
981

O
obscuren 已提交
982
		stats.processed++
F
Felix Lange 已提交
983
		if glog.V(logger.Info) {
984
			stats.usedGas += usedGas.Uint64()
F
Felix Lange 已提交
985 986
			stats.report(chain, i)
		}
O
obscuren 已提交
987 988
	}

989
	go self.postChainEvents(events, coalescedLogs)
990

991
	return 0, nil
O
obscuren 已提交
992
}
993

F
Felix Lange 已提交
994 995 996
// insertStats tracks and reports on block insertion.
type insertStats struct {
	// Number of blocks queued for later, fully processed, and skipped
	// since the last report.
	queued, processed, ignored int
	// Gas used by the processed blocks since the last report.
	usedGas uint64
	// Index into the chain being imported at which the current report
	// window starts.
	lastIndex int
	// Time at which the current report window started.
	startTime time.Time
}

1002 1003 1004
// statsReportLimit caps how long an import may run without feedback: once
// this much time has elapsed since the previous message, progress is printed
// regardless, so the user is not left wondering what's going on.
const statsReportLimit time.Duration = 8 * time.Second
F
Felix Lange 已提交
1005 1006 1007 1008

// report prints statistics if some number of blocks have been processed
// or more than a few seconds have passed since the last message.
func (st *insertStats) report(chain []*types.Block, index int) {
1009
	// Fetch the timings for the batch
1010 1011 1012 1013
	var (
		now     = time.Now()
		elapsed = now.Sub(st.startTime)
	)
1014 1015 1016 1017
	if elapsed == 0 { // Yes Windows, I'm looking at you
		elapsed = 1
	}
	// If we're at the last block of the batch or report period reached, log
1018
	if index == len(chain)-1 || elapsed >= statsReportLimit {
F
Felix Lange 已提交
1019
		start, end := chain[st.lastIndex], chain[index]
J
Jeffrey Wilcke 已提交
1020
		txcount := countTransactions(chain[st.lastIndex : index+1])
1021 1022 1023 1024 1025

		extra := ""
		if st.queued > 0 || st.ignored > 0 {
			extra = fmt.Sprintf(" (%d queued %d ignored)", st.queued, st.ignored)
		}
1026 1027 1028 1029 1030 1031 1032
		hashes := ""
		if st.processed > 1 {
			hashes = fmt.Sprintf("%x… / %x…", start.Hash().Bytes()[:4], end.Hash().Bytes()[:4])
		} else {
			hashes = fmt.Sprintf("%x…", end.Hash().Bytes()[:4])
		}
		glog.Infof("imported %d blocks, %5d txs (%7.3f Mg) in %9v (%6.3f Mg/s). #%v [%s]%s", st.processed, txcount, float64(st.usedGas)/1000000, common.PrettyDuration(elapsed), float64(st.usedGas)*1000/float64(elapsed), end.Number(), hashes, extra)
1033

F
Felix Lange 已提交
1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044
		*st = insertStats{startTime: now, lastIndex: index}
	}
}

// countTransactions returns the total number of transactions contained in
// the given blocks.
func countTransactions(chain []*types.Block) (c int) {
	for i := range chain {
		c = c + len(chain[i].Transactions())
	}
	return
}

1045 1046 1047
// reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them
// to be part of the new canonical chain and accumulates potential missing transactions and post an
// event about them
1048
func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
1049
	var (
1050 1051 1052 1053 1054 1055 1056 1057
		newChain          types.Blocks
		oldChain          types.Blocks
		commonBlock       *types.Block
		oldStart          = oldBlock
		newStart          = newBlock
		deletedTxs        types.Transactions
		deletedLogs       vm.Logs
		deletedLogsByHash = make(map[common.Hash]vm.Logs)
J
Jeffrey Wilcke 已提交
1058 1059 1060 1061 1062
		// collectLogs collects the logs that were generated during the
		// processing of the block that corresponds with the given hash.
		// These logs are later announced as deleted.
		collectLogs = func(h common.Hash) {
			// Coalesce logs
1063
			receipts := GetBlockReceipts(self.chainDb, h, self.hc.GetBlockNumber(h))
J
Jeffrey Wilcke 已提交
1064 1065
			for _, receipt := range receipts {
				deletedLogs = append(deletedLogs, receipt.Logs...)
1066 1067

				deletedLogsByHash[h] = receipt.Logs
J
Jeffrey Wilcke 已提交
1068 1069
			}
		}
1070
	)
1071 1072 1073 1074

	// first reduce whoever is higher bound
	if oldBlock.NumberU64() > newBlock.NumberU64() {
		// reduce old chain
1075
		for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = self.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
1076
			oldChain = append(oldChain, oldBlock)
1077
			deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
J
Jeffrey Wilcke 已提交
1078 1079

			collectLogs(oldBlock.Hash())
1080 1081 1082
		}
	} else {
		// reduce new chain and append new chain blocks for inserting later on
1083
		for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = self.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
1084 1085
			newChain = append(newChain, newBlock)
		}
O
obscuren 已提交
1086
	}
O
obscuren 已提交
1087
	if oldBlock == nil {
1088
		return fmt.Errorf("Invalid old chain")
O
obscuren 已提交
1089 1090
	}
	if newBlock == nil {
1091
		return fmt.Errorf("Invalid new chain")
O
obscuren 已提交
1092
	}
O
obscuren 已提交
1093

1094
	numSplit := newBlock.Number()
1095 1096
	for {
		if oldBlock.Hash() == newBlock.Hash() {
1097
			commonBlock = oldBlock
1098 1099
			break
		}
1100 1101

		oldChain = append(oldChain, oldBlock)
1102
		newChain = append(newChain, newBlock)
1103
		deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
J
Jeffrey Wilcke 已提交
1104
		collectLogs(oldBlock.Hash())
O
obscuren 已提交
1105

1106
		oldBlock, newBlock = self.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), self.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
1107
		if oldBlock == nil {
1108
			return fmt.Errorf("Invalid old chain")
1109 1110
		}
		if newBlock == nil {
1111
			return fmt.Errorf("Invalid new chain")
1112
		}
1113 1114
	}

1115
	if glog.V(logger.Debug) {
1116
		commonHash := commonBlock.Hash()
1117
		glog.Infof("Chain split detected @ %x. Reorganising chain from #%v %x to %x", commonHash[:4], numSplit, oldStart.Hash().Bytes()[:4], newStart.Hash().Bytes()[:4])
1118 1119
	}

1120
	var addedTxs types.Transactions
1121
	// insert blocks. Order does not matter. Last block will be written in ImportChain itself which creates the new head properly
1122
	for _, block := range newChain {
1123
		// insert the block in the canonical way, re-writing history
1124
		self.insert(block)
1125
		// write canonical receipts and transactions
1126
		if err := WriteTransactions(self.chainDb, block); err != nil {
1127 1128
			return err
		}
1129
		receipts := GetBlockReceipts(self.chainDb, block.Hash(), block.NumberU64())
1130
		// write receipts
1131
		if err := WriteReceipts(self.chainDb, receipts); err != nil {
1132 1133 1134 1135 1136 1137
			return err
		}
		// Write map map bloom filters
		if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
			return err
		}
1138 1139 1140 1141 1142 1143 1144 1145 1146 1147
		addedTxs = append(addedTxs, block.Transactions()...)
	}

	// calculate the difference between deleted and added transactions
	diff := types.TxDifference(deletedTxs, addedTxs)
	// When transactions get deleted from the database that means the
	// receipts that were created in the fork must also be deleted
	for _, tx := range diff {
		DeleteReceipt(self.chainDb, tx.Hash())
		DeleteTransaction(self.chainDb, tx.Hash())
1148
	}
1149 1150
	// Must be posted in a goroutine because of the transaction pool trying
	// to acquire the chain manager lock
J
Jeffrey Wilcke 已提交
1151 1152 1153 1154
	if len(diff) > 0 {
		go self.eventMux.Post(RemovedTransactionEvent{diff})
	}
	if len(deletedLogs) > 0 {
1155
		go self.eventMux.Post(RemovedLogsEvent{deletedLogs})
J
Jeffrey Wilcke 已提交
1156
	}
1157

1158 1159 1160 1161 1162 1163 1164 1165
	if len(oldChain) > 0 {
		go func() {
			for _, block := range oldChain {
				self.eventMux.Post(ChainSideEvent{Block: block, Logs: deletedLogsByHash[block.Hash()]})
			}
		}()
	}

1166
	return nil
1167 1168
}

1169 1170
// postChainEvents iterates over the events generated by a chain insertion and
// posts them into the event mux.
1171 1172 1173
func (self *BlockChain) postChainEvents(events []interface{}, logs vm.Logs) {
	// post event logs for further processing
	self.eventMux.Post(logs)
1174 1175 1176 1177
	for _, event := range events {
		if event, ok := event.(ChainEvent); ok {
			// We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long
			// and in most cases isn't even necessary.
1178
			if self.LastBlockHash() == event.Hash {
1179 1180 1181 1182 1183 1184 1185 1186
				self.eventMux.Post(ChainHeadEvent{event.Block})
			}
		}
		// Fire the insertion events individually too
		self.eventMux.Post(event)
	}
}

1187
func (self *BlockChain) update() {
O
obscuren 已提交
1188
	futureTimer := time.Tick(5 * time.Second)
1189 1190
	for {
		select {
O
obscuren 已提交
1191
		case <-futureTimer:
1192
			self.procFutureBlocks()
1193
		case <-self.quit:
1194
			return
1195 1196 1197
		}
	}
}
1198

F
Felix Lange 已提交
1199
// reportBlock logs a bad block error.
1200
func reportBlock(block *types.Block, err error) {
1201 1202 1203 1204
	if glog.V(logger.Error) {
		glog.Errorf("Bad block #%v (%s)\n", block.Number(), block.Hash().Hex())
		glog.Errorf("    %v", err)
	}
F
Felix Lange 已提交
1205
}
1206 1207 1208 1209 1210 1211 1212

// InsertHeaderChain attempts to insert the given header chain in to the local
// chain, possibly creating a reorg. If an error is returned, it will return the
// index number of the failing header as well an error describing what went wrong.
//
// The verify parameter can be used to fine tune whether nonce verification
// should be done or not. The reason behind the optional check is because some
L
Leif Jurvetson 已提交
1213
// of the header retrieval mechanisms already need to verify nonces, as well as
1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263
// because nonces can be verified sparsely, not needing to check each.
func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
	// Make sure only one thread manipulates the chain at once
	self.chainmu.Lock()
	defer self.chainmu.Unlock()

	self.wg.Add(1)
	defer self.wg.Done()

	whFunc := func(header *types.Header) error {
		self.mu.Lock()
		defer self.mu.Unlock()

		_, err := self.hc.WriteHeader(header)
		return err
	}

	return self.hc.InsertHeaderChain(chain, checkFreq, whFunc)
}

// writeHeader stores a header in the local chain, assuming its parent is
// already known. Should the total difficulty of the new header exceed the
// currently known TD, the canonical chain is re-routed.
//
// Note: This method is not safe to run concurrently with block insertion,
// since the side effects of reorganisations cannot be emulated without the
// real blocks. Headers should therefore only be written directly in two
// scenarios: pure-header mode of operation (light clients), or properly
// separated header/block phases (non-archive clients).
func (bc *BlockChain) writeHeader(header *types.Header) error {
	bc.wg.Add(1)
	defer bc.wg.Done()

	bc.mu.Lock()
	defer bc.mu.Unlock()

	_, writeErr := bc.hc.WriteHeader(header)
	return writeErr
}

// CurrentHeader returns the current head header of the canonical chain as
// reported by the header chain. The chain mutex is read-locked for the
// duration of the lookup.
func (bc *BlockChain) CurrentHeader() *types.Header {
	bc.mu.RLock()
	defer bc.mu.RUnlock()

	head := bc.hc.CurrentHeader()
	return head
}

// GetTd retrieves a block's total difficulty in the canonical chain from the
1264 1265 1266 1267 1268 1269
// database by hash and number, caching it if found.
func (self *BlockChain) GetTd(hash common.Hash, number uint64) *big.Int {
	return self.hc.GetTd(hash, number)
}

// GetTdByHash retrieves a block's total difficulty in the canonical chain from the
1270
// database by hash, caching it if found.
1271 1272 1273 1274 1275 1276 1277 1278
func (self *BlockChain) GetTdByHash(hash common.Hash) *big.Int {
	return self.hc.GetTdByHash(hash)
}

// GetHeader retrieves a block header from the database by hash and number,
// caching it if found.
func (self *BlockChain) GetHeader(hash common.Hash, number uint64) *types.Header {
	return self.hc.GetHeader(hash, number)
1279 1280
}

1281
// GetHeaderByHash retrieves a block header from the database by hash, caching it if
1282
// found.
1283 1284
func (self *BlockChain) GetHeaderByHash(hash common.Hash) *types.Header {
	return self.hc.GetHeaderByHash(hash)
1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303
}

// HasHeader reports whether a block header with the given hash is present in
// the database, caching it if present. The check is forwarded to the header
// chain.
func (bc *BlockChain) HasHeader(hash common.Hash) bool {
	present := bc.hc.HasHeader(hash)
	return present
}

// GetBlockHashesFromHash returns up to max block hashes beginning at the
// given hash and walking towards the genesis block. The lookup is forwarded
// to the header chain.
func (bc *BlockChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []common.Hash {
	hashes := bc.hc.GetBlockHashesFromHash(hash, max)
	return hashes
}

// GetHeaderByNumber looks up a block header in the database by its number,
// caching it (associated with its hash) if found. The lookup is forwarded to
// the header chain.
func (bc *BlockChain) GetHeaderByNumber(number uint64) *types.Header {
	header := bc.hc.GetHeaderByNumber(number)
	return header
}
1304 1305 1306

// Config returns the chain configuration this blockchain was set up with.
func (bc *BlockChain) Config() *ChainConfig {
	return bc.config
}