core: make txpool operate on immutable state

上级 e7408b55
......@@ -81,7 +81,6 @@ type BlockChain struct {
hc *HeaderChain
chainDb ethdb.Database
rmTxFeed event.Feed
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
......@@ -1194,15 +1193,9 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
for _, tx := range diff {
DeleteTxLookupEntry(bc.chainDb, tx.Hash())
}
// Must be posted in a goroutine because of the transaction pool trying
// to acquire the chain manager lock
if len(diff) > 0 {
go bc.rmTxFeed.Send(RemovedTransactionEvent{diff})
}
if len(deletedLogs) > 0 {
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}
if len(oldChain) > 0 {
go func() {
for _, block := range oldChain {
......@@ -1401,11 +1394,6 @@ func (bc *BlockChain) Config() *params.ChainConfig { return bc.config }
// Engine retrieves the blockchain's consensus engine.
func (bc *BlockChain) Engine() consensus.Engine { return bc.engine }
// SubscribeRemovedTxEvent registers a subscription of RemovedTransactionEvent.
func (bc *BlockChain) SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription {
return bc.scope.Track(bc.rmTxFeed.Subscribe(ch))
}
// SubscribeRemovedLogsEvent registers a subscription of RemovedLogsEvent.
func (bc *BlockChain) SubscribeRemovedLogsEvent(ch chan<- RemovedLogsEvent) event.Subscription {
return bc.scope.Track(bc.rmLogsFeed.Subscribe(ch))
......
......@@ -28,4 +28,8 @@ var (
// ErrBlacklistedHash is returned if a block to import is on the blacklist.
ErrBlacklistedHash = errors.New("blacklisted hash")
// ErrNonceTooHigh is returned if the nonce of a transaction is higher than the
// next one expected based on the local chain.
ErrNonceTooHigh = errors.New("nonce too high")
)
......@@ -18,7 +18,6 @@ package core
import (
"errors"
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/common"
......@@ -197,8 +196,11 @@ func (st *StateTransition) preCheck() error {
// Make sure this transaction's nonce is correct
if msg.CheckNonce() {
if n := st.state.GetNonce(sender.Address()); n != msg.Nonce() {
return fmt.Errorf("invalid nonce: have %d, expected %d", msg.Nonce(), n)
nonce := st.state.GetNonce(sender.Address())
if nonce < msg.Nonce() {
return ErrNonceTooHigh
} else if nonce > msg.Nonce() {
return ErrNonceTooLow
}
}
return st.buyGas()
......
......@@ -298,6 +298,7 @@ func (l *txList) Filter(costLimit, gasLimit *big.Int) (types.Transactions, types
// If the list was strict, filter anything above the lowest nonce
var invalids types.Transactions
if l.strict && len(removed) > 0 {
lowest := uint64(math.MaxUint64)
for _, tx := range removed {
......
......@@ -105,10 +105,11 @@ var (
// blockChain provides the state of blockchain and current gas limit to do
// some pre checks in tx pool and event subscribers.
type blockChain interface {
State() (*state.StateDB, error)
GasLimit() *big.Int
CurrentHeader() *types.Header
SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
SubscribeRemovedTxEvent(ch chan<- RemovedTransactionEvent) event.Subscription
GetBlock(hash common.Hash, number uint64) *types.Block
StateAt(root common.Hash) (*state.StateDB, error)
}
// TxPoolConfig are the configuration parameters of the transaction pool.
......@@ -174,18 +175,19 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
type TxPool struct {
config TxPoolConfig
chainconfig *params.ChainConfig
blockChain blockChain
pendingState *state.ManagedState
chain blockChain
gasPrice *big.Int
txFeed event.Feed
scope event.SubscriptionScope
chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
rmTxCh chan RemovedTransactionEvent
rmTxSub event.Subscription
signer types.Signer
mu sync.RWMutex
currentState *state.StateDB // Current state in the blockchain head
pendingState *state.ManagedState // Pending state tracking virtual nonces
currentMaxGas *big.Int // Current gas limit for transaction caps
locals *accountSet // Set of local transaction to exepmt from evicion rules
journal *txJournal // Journal of local transaction to back up to disk
......@@ -202,28 +204,26 @@ type TxPool struct {
// NewTxPool creates a new transaction pool to gather, sort and filter inbound
// trnsactions from the network.
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, blockChain blockChain) *TxPool {
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize()
// Create the transaction pool with its initial settings
pool := &TxPool{
config: config,
chainconfig: chainconfig,
blockChain: blockChain,
signer: types.NewEIP155Signer(chainconfig.ChainId),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
all: make(map[common.Hash]*types.Transaction),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
rmTxCh: make(chan RemovedTransactionEvent, rmTxChanSize),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
pendingState: nil,
config: config,
chainconfig: chainconfig,
chain: chain,
signer: types.NewEIP155Signer(chainconfig.ChainId),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
all: make(map[common.Hash]*types.Transaction),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
pool.locals = newAccountSet(pool.signer)
pool.priced = newTxPricedList(&pool.all)
pool.reset()
pool.reset(nil, chain.CurrentHeader())
// If local transactions and journaling is enabled, load from disk
if !config.NoLocals && config.Journal != "" {
......@@ -237,8 +237,8 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, blockChain
}
}
// Subscribe events from blockchain
pool.chainHeadSub = pool.blockChain.SubscribeChainHeadEvent(pool.chainHeadCh)
pool.rmTxSub = pool.blockChain.SubscribeRemovedTxEvent(pool.rmTxCh)
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
// Start the event loop and return
pool.wg.Add(1)
go pool.loop()
......@@ -264,31 +264,28 @@ func (pool *TxPool) loop() {
journal := time.NewTicker(pool.config.Rejournal)
defer journal.Stop()
// Track the previous head headers for transaction reorgs
head := pool.chain.CurrentHeader()
// Keep waiting for and reacting to the various events
for {
select {
// Handle ChainHeadEvent
case ev := <-pool.chainHeadCh:
pool.mu.Lock()
if ev.Block != nil {
pool.mu.Lock()
if pool.chainconfig.IsHomestead(ev.Block.Number()) {
pool.homestead = true
}
pool.reset(head, ev.Block.Header())
head = ev.Block.Header()
pool.mu.Unlock()
}
pool.reset()
pool.mu.Unlock()
// Be unsubscribed due to system stopped
case <-pool.chainHeadSub.Err():
return
// Handle RemovedTransactionEvent
case ev := <-pool.rmTxCh:
pool.addTxs(ev.Txs, false)
// Be unsubscribed due to system stopped
case <-pool.rmTxSub.Err():
return
// Handle stats reporting ticks
case <-report.C:
pool.mu.RLock()
......@@ -333,28 +330,76 @@ func (pool *TxPool) loop() {
// lockedReset is a wrapper around reset to allow calling it in a thread safe
// manner. This method is only ever used in the tester!
func (pool *TxPool) lockedReset() {
func (pool *TxPool) lockedReset(oldHead, newHead *types.Header) {
pool.mu.Lock()
defer pool.mu.Unlock()
pool.reset()
pool.reset(oldHead, newHead)
}
// reset retrieves the current state of the blockchain and ensures the content
// of the transaction pool is valid with regard to the chain state.
func (pool *TxPool) reset() {
currentState, err := pool.blockChain.State()
func (pool *TxPool) reset(oldHead, newHead *types.Header) {
// If we're reorging an old state, reinject all dropped transactions
var reinject types.Transactions
if oldHead != nil && oldHead.Hash() != newHead.ParentHash {
var discarded, included types.Transactions
var (
rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64())
add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64())
)
for rem.NumberU64() > add.NumberU64() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
return
}
}
for add.NumberU64() > rem.NumberU64() {
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
return
}
}
for rem.Hash() != add.Hash() {
discarded = append(discarded, rem.Transactions()...)
if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil {
log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash())
return
}
included = append(included, add.Transactions()...)
if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil {
log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash())
return
}
}
reinject = types.TxDifference(discarded, included)
}
// Initialize the internal state to the current head
if newHead == nil {
newHead = pool.chain.CurrentHeader() // Special case during testing
}
statedb, err := pool.chain.StateAt(newHead.Root)
if err != nil {
log.Error("Failed reset txpool state", "err", err)
log.Error("Failed to reset txpool state", "err", err)
return
}
pool.pendingState = state.ManageState(currentState)
pool.currentState = statedb
pool.pendingState = state.ManageState(statedb)
pool.currentMaxGas = newHead.GasLimit
// Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject))
pool.addTxsLocked(reinject, false)
// validate the pool of pending transactions, this will remove
// any transactions that have been included in the block or
// have been invalidated because of another transaction (e.g.
// higher gas price)
pool.demoteUnexecutables(currentState)
pool.demoteUnexecutables()
// Update all accounts to the latest known pending nonce
for addr, list := range pool.pending {
......@@ -363,16 +408,16 @@ func (pool *TxPool) reset() {
}
// Check the queue and move transactions over to the pending if possible
// or remove those that have become invalid
pool.promoteExecutables(currentState, nil)
pool.promoteExecutables(nil)
}
// Stop terminates the transaction pool.
func (pool *TxPool) Stop() {
// Unsubscribe all subscriptions registered from txpool
pool.scope.Close()
// Unsubscribe subscriptions registered from blockchain
pool.chainHeadSub.Unsubscribe()
pool.rmTxSub.Unsubscribe()
pool.wg.Wait()
if pool.journal != nil {
......@@ -442,8 +487,8 @@ func (pool *TxPool) stats() (int, int) {
// Content retrieves the data content of the transaction pool, returning all the
// pending as well as queued transactions, grouped by account and sorted by nonce.
func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
pool.mu.RLock()
defer pool.mu.RUnlock()
pool.mu.Lock()
defer pool.mu.Unlock()
pending := make(map[common.Address]types.Transactions)
for addr, list := range pool.pending {
......@@ -499,7 +544,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
return ErrNegativeValue
}
// Ensure the transaction doesn't exceed the current block limit gas.
if pool.blockChain.GasLimit().Cmp(tx.Gas()) < 0 {
if pool.currentMaxGas.Cmp(tx.Gas()) < 0 {
return ErrGasLimit
}
// Make sure the transaction is signed properly
......@@ -513,16 +558,12 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
return ErrUnderpriced
}
// Ensure the transaction adheres to nonce ordering
currentState, err := pool.blockChain.State()
if err != nil {
return err
}
if currentState.GetNonce(from) > tx.Nonce() {
if pool.currentState.GetNonce(from) > tx.Nonce() {
return ErrNonceTooLow
}
// Transactor should have enough funds to cover the costs
// cost == V + GP * GL
if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
return ErrInsufficientFunds
}
intrGas := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
......@@ -721,12 +762,8 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
}
// If we added a new transaction, run promotion checks and return
if !replace {
state, err := pool.blockChain.State()
if err != nil {
return err
}
from, _ := types.Sender(pool.signer, tx) // already validated
pool.promoteExecutables(state, []common.Address{from})
pool.promoteExecutables([]common.Address{from})
}
return nil
}
......@@ -736,6 +773,12 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error {
pool.mu.Lock()
defer pool.mu.Unlock()
return pool.addTxsLocked(txs, local)
}
// addTxsLocked attempts to queue a batch of transactions if they are valid,
// whilst assuming the transaction pool lock is already held.
func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) error {
// Add the batch of transaction, tracking the accepted ones
dirty := make(map[common.Address]struct{})
for _, tx := range txs {
......@@ -748,15 +791,11 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error {
}
// Only reprocess the internal state if something was actually added
if len(dirty) > 0 {
state, err := pool.blockChain.State()
if err != nil {
return err
}
addrs := make([]common.Address, 0, len(dirty))
for addr, _ := range dirty {
addrs = append(addrs, addr)
}
pool.promoteExecutables(state, addrs)
pool.promoteExecutables(addrs)
}
return nil
}
......@@ -770,24 +809,6 @@ func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
return pool.all[hash]
}
// Remove removes the transaction with the given hash from the pool.
func (pool *TxPool) Remove(hash common.Hash) {
pool.mu.Lock()
defer pool.mu.Unlock()
pool.removeTx(hash)
}
// RemoveBatch removes all given transactions from the pool.
func (pool *TxPool) RemoveBatch(txs types.Transactions) {
pool.mu.Lock()
defer pool.mu.Unlock()
for _, tx := range txs {
pool.removeTx(tx.Hash())
}
}
// removeTx removes a single transaction from the queue, moving all subsequent
// transactions back to the future queue.
func (pool *TxPool) removeTx(hash common.Hash) {
......@@ -834,9 +855,7 @@ func (pool *TxPool) removeTx(hash common.Hash) {
// promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.Address) {
gaslimit := pool.blockChain.GasLimit()
func (pool *TxPool) promoteExecutables(accounts []common.Address) {
// Gather all the accounts potentially needing updates
if accounts == nil {
accounts = make([]common.Address, 0, len(pool.queue))
......@@ -851,14 +870,14 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A
continue // Just in case someone calls with a non existing account
}
// Drop all transactions that are deemed too old (low nonce)
for _, tx := range list.Forward(state.GetNonce(addr)) {
for _, tx := range list.Forward(pool.currentState.GetNonce(addr)) {
hash := tx.Hash()
log.Trace("Removed old queued transaction", "hash", hash)
delete(pool.all, hash)
pool.priced.Removed()
}
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(state.GetBalance(addr), gaslimit)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable queued transaction", "hash", hash)
......@@ -1003,12 +1022,10 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB, accounts []common.A
// demoteUnexecutables removes invalid and processed transactions from the pools
// executable/pending queue and any subsequent transactions that become unexecutable
// are moved back into the future queue.
func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
gaslimit := pool.blockChain.GasLimit()
func (pool *TxPool) demoteUnexecutables() {
// Iterate over all accounts and demote any non-executable transactions
for addr, list := range pool.pending {
nonce := state.GetNonce(addr)
nonce := pool.currentState.GetNonce(addr)
// Drop all transactions that are deemed too old (low nonce)
for _, tx := range list.Forward(nonce) {
......@@ -1018,7 +1035,7 @@ func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
pool.priced.Removed()
}
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
drops, invalids := list.Filter(state.GetBalance(addr), gaslimit)
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable pending transaction", "hash", hash)
......@@ -1031,6 +1048,14 @@ func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
log.Trace("Demoting pending transaction", "hash", hash)
pool.enqueueTx(hash, tx)
}
// If there's a gap in front, warn (should never happen) and postpone all transactions
if list.Len() > 0 && list.txs.Get(nonce) == nil {
for _, tx := range list.Cap(0) {
hash := tx.Hash()
log.Error("Demoting invalidated transaction", "hash", hash)
pool.enqueueTx(hash, tx)
}
}
// Delete the entire queue entry if it became empty.
if list.Empty() {
delete(pool.pending, addr)
......
此差异已折叠。
......@@ -115,10 +115,6 @@ func (b *EthApiBackend) GetEVM(ctx context.Context, msg core.Message, state *sta
return vm.NewEVM(context, state, b.eth.chainConfig, vmCfg), vmError, nil
}
func (b *EthApiBackend) SubscribeRemovedTxEvent(ch chan<- core.RemovedTransactionEvent) event.Subscription {
return b.eth.BlockChain().SubscribeRemovedTxEvent(ch)
}
func (b *EthApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
return b.eth.BlockChain().SubscribeRemovedLogsEvent(ch)
}
......@@ -143,10 +139,6 @@ func (b *EthApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction)
return b.eth.txPool.AddLocal(signedTx)
}
func (b *EthApiBackend) RemoveTx(txHash common.Hash) {
b.eth.txPool.Remove(txHash)
}
func (b *EthApiBackend) GetPoolTransactions() (types.Transactions, error) {
pending, err := b.eth.txPool.Pending()
if err != nil {
......
......@@ -1265,7 +1265,6 @@ func (s *PublicTransactionPoolAPI) Resend(ctx context.Context, sendArgs SendTxAr
if err != nil {
return common.Hash{}, err
}
s.b.RemoveTx(p.Hash())
if err = s.b.SendTx(ctx, signedTx); err != nil {
return common.Hash{}, err
}
......
......@@ -59,7 +59,6 @@ type Backend interface {
// TxPool API
SendTx(ctx context.Context, signedTx *types.Transaction) error
RemoveTx(txHash common.Hash)
GetPoolTransactions() (types.Transactions, error)
GetPoolTransaction(txHash common.Hash) *types.Transaction
GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error)
......
......@@ -71,7 +71,6 @@ type Work struct {
family *set.Set // family set (used for checking uncle invalidity)
uncles *set.Set // uncle set
tcount int // tx count in cycle
failedTxs types.Transactions
Block *types.Block // the new block
......@@ -477,8 +476,6 @@ func (self *worker) commitNewWork() {
txs := types.NewTransactionsByPriceAndNonce(pending)
work.commitTransactions(self.mux, txs, self.chain, self.coinbase)
self.eth.TxPool().RemoveBatch(work.failedTxs)
// compute uncles for the new block.
var (
uncles []*types.Header
......@@ -563,6 +560,16 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB
log.Trace("Gas limit exceeded for current block", "sender", from)
txs.Pop()
case core.ErrNonceTooLow:
// New head notification data race between the transaction pool and miner, shift
log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
txs.Shift()
case core.ErrNonceTooHigh:
// Reorg notification data race between the transaction pool and miner, skip account =
log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
txs.Pop()
case nil:
// Everything ok, collect the logs and shift in the next transaction from the same account
coalescedLogs = append(coalescedLogs, logs...)
......@@ -570,10 +577,10 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB
txs.Shift()
default:
// Pop the current failed transaction without shifting in the next from the account
log.Trace("Transaction failed, will be removed", "hash", tx.Hash(), "err", err)
env.failedTxs = append(env.failedTxs, tx)
txs.Pop()
// Strange error, discard the transaction and get the next in line (note, the
// nonce-too-high clause will prevent us from executing in vain).
log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
txs.Shift()
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册