tx_pool.go 18.1 KB
Newer Older
F
Felix Lange 已提交
1
// Copyright 2014 The go-ethereum Authors
2
// This file is part of the go-ethereum library.
F
Felix Lange 已提交
3
//
4
// The go-ethereum library is free software: you can redistribute it and/or modify
F
Felix Lange 已提交
5 6 7 8
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
9
// The go-ethereum library is distributed in the hope that it will be useful,
F
Felix Lange 已提交
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
F
Felix Lange 已提交
12 13 14
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
15
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
F
Felix Lange 已提交
16

O
obscuren 已提交
17
package core
O
obscuren 已提交
18 19

import (
20
	"errors"
O
obscuren 已提交
21
	"fmt"
O
obscuren 已提交
22
	"math/big"
23
	"sort"
24
	"sync"
25
	"time"
26

O
obscuren 已提交
27
	"github.com/ethereum/go-ethereum/common"
O
obscuren 已提交
28
	"github.com/ethereum/go-ethereum/core/state"
29
	"github.com/ethereum/go-ethereum/core/types"
O
obscuren 已提交
30
	"github.com/ethereum/go-ethereum/event"
O
obscuren 已提交
31
	"github.com/ethereum/go-ethereum/logger"
O
obscuren 已提交
32
	"github.com/ethereum/go-ethereum/logger/glog"
O
obscuren 已提交
33 34
)

35
var (
36
	// Transaction Pool Errors
O
obscuren 已提交
37
	ErrInvalidSender      = errors.New("Invalid sender")
38
	ErrNonce              = errors.New("Nonce too low")
39
	ErrCheap              = errors.New("Gas price too low for acceptance")
40
	ErrBalance            = errors.New("Insufficient balance")
41
	ErrNonExistentAccount = errors.New("Account does not exist or account balance too low")
42
	ErrInsufficientFunds  = errors.New("Insufficient funds for gas * price + value")
O
obscuren 已提交
43
	ErrIntrinsicGas       = errors.New("Intrinsic gas too low")
44
	ErrGasLimit           = errors.New("Exceeds block gas limit")
45
	ErrNegativeValue      = errors.New("Negative value")
46
)
Z
zelig 已提交
47

48
const (
49
	maxQueued = 64 // max limit of queued txs per address
50 51
)

52
type stateFn func() (*state.StateDB, error)
53

54 55 56 57 58 59 60
// TxPool contains all currently known transactions. Transactions
// enter the pool when they are received from the network or submitted
// locally. They exit the pool when they are included in the blockchain.
//
// The pool separates processable transactions (which can be applied to the
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
O
obscuren 已提交
61
type TxPool struct {
62
	config       *ChainConfig
63
	currentState stateFn // The state function which will allow us to do some pre checks
64
	pendingState *state.ManagedState
65
	gasLimit     func() *big.Int // The current gas limit function callback
66
	minGasPrice  *big.Int
67
	eventMux     *event.TypeMux
68
	events       event.Subscription
69 70 71 72
	localTx      *txSet
	mu           sync.RWMutex
	pending      map[common.Hash]*types.Transaction // processable transactions
	queue        map[common.Address]map[common.Hash]*types.Transaction
73

74 75
	wg sync.WaitGroup // for shutdown sync

76
	homestead bool
O
obscuren 已提交
77 78
}

79
func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
80
	pool := &TxPool{
81
		config:       config,
O
obscuren 已提交
82
		pending:      make(map[common.Hash]*types.Transaction),
83 84 85 86
		queue:        make(map[common.Address]map[common.Hash]*types.Transaction),
		eventMux:     eventMux,
		currentState: currentStateFn,
		gasLimit:     gasLimitFn,
87
		minGasPrice:  new(big.Int),
88
		pendingState: nil,
89
		localTx:      newTxSet(),
90
		events:       eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}),
O
obscuren 已提交
91
	}
92

93
	pool.wg.Add(1)
94 95 96
	go pool.eventLoop()

	return pool
97 98
}

99
func (pool *TxPool) eventLoop() {
100 101
	defer pool.wg.Done()

O
obscuren 已提交
102 103 104
	// Track chain events. When a chain events occurs (new chain canon block)
	// we need to know the new state. The new state will help us determine
	// the nonces in the managed state
105
	for ev := range pool.events.Chan() {
106
		switch ev := ev.Data.(type) {
107
		case ChainHeadEvent:
108
			pool.mu.Lock()
109
			if ev.Block != nil && pool.config.IsHomestead(ev.Block.Number()) {
110 111 112
				pool.homestead = true
			}

113
			pool.resetState()
114
			pool.mu.Unlock()
115
		case GasPriceChanged:
116
			pool.mu.Lock()
117
			pool.minGasPrice = ev.Price
118 119 120
			pool.mu.Unlock()
		case RemovedTransactionEvent:
			pool.AddTransactions(ev.Txs)
121
		}
122
	}
O
obscuren 已提交
123 124
}

125
func (pool *TxPool) resetState() {
126 127 128 129 130 131 132 133 134 135 136
	currentState, err := pool.currentState()
	if err != nil {
		glog.V(logger.Info).Infoln("failed to get current state: %v", err)
		return
	}
	managedState := state.ManageState(currentState)
	if err != nil {
		glog.V(logger.Info).Infoln("failed to get managed state: %v", err)
		return
	}
	pool.pendingState = managedState
137 138 139 140 141 142 143 144 145 146 147 148 149

	// validate the pool of pending transactions, this will remove
	// any transactions that have been included in the block or
	// have been invalidated because of another transaction (e.g.
	// higher gas price)
	pool.validatePool()

	// Loop over the pending transactions and base the nonce of the new
	// pending transaction set.
	for _, tx := range pool.pending {
		if addr, err := tx.From(); err == nil {
			// Set the nonce. Transaction nonce can never be lower
			// than the state nonce; validatePool took care of that.
150 151
			if pool.pendingState.GetNonce(addr) <= tx.Nonce() {
				pool.pendingState.SetNonce(addr, tx.Nonce()+1)
152
			}
153 154 155 156 157 158 159
		}
	}
	// Check the queue and move transactions over to the pending if possible
	// or remove those that have become invalid
	pool.checkQueue()
}

160 161
func (pool *TxPool) Stop() {
	pool.events.Unsubscribe()
162
	pool.wg.Wait()
163
	glog.V(logger.Info).Infoln("Transaction pool stopped")
164 165 166 167 168 169
}

func (pool *TxPool) State() *state.ManagedState {
	pool.mu.RLock()
	defer pool.mu.RUnlock()

170
	return pool.pendingState
171 172
}

173 174 175 176 177 178 179 180 181 182 183
func (pool *TxPool) Stats() (pending int, queued int) {
	pool.mu.RLock()
	defer pool.mu.RUnlock()

	pending = len(pool.pending)
	for _, txs := range pool.queue {
		queued += len(txs)
	}
	return
}

184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213
// Content retrieves the data content of the transaction pool, returning all the
// pending as well as queued transactions, grouped by account and nonce.
func (pool *TxPool) Content() (map[common.Address]map[uint64][]*types.Transaction, map[common.Address]map[uint64][]*types.Transaction) {
	pool.mu.RLock()
	defer pool.mu.RUnlock()

	// Retrieve all the pending transactions and sort by account and by nonce
	pending := make(map[common.Address]map[uint64][]*types.Transaction)
	for _, tx := range pool.pending {
		account, _ := tx.From()

		owned, ok := pending[account]
		if !ok {
			owned = make(map[uint64][]*types.Transaction)
			pending[account] = owned
		}
		owned[tx.Nonce()] = append(owned[tx.Nonce()], tx)
	}
	// Retrieve all the queued transactions and sort by account and by nonce
	queued := make(map[common.Address]map[uint64][]*types.Transaction)
	for account, txs := range pool.queue {
		owned := make(map[uint64][]*types.Transaction)
		for _, tx := range txs {
			owned[tx.Nonce()] = append(owned[tx.Nonce()], tx)
		}
		queued[account] = owned
	}
	return pending, queued
}

214 215 216 217 218 219 220 221
// SetLocal marks a transaction as local, skipping gas price
//  check against local miner minimum in the future
func (pool *TxPool) SetLocal(tx *types.Transaction) {
	pool.mu.Lock()
	defer pool.mu.Unlock()
	pool.localTx.add(tx.Hash())
}

222 223 224
// validateTx checks whether a transaction is valid according
// to the consensus rules.
func (pool *TxPool) validateTx(tx *types.Transaction) error {
225
	local := pool.localTx.contains(tx.Hash())
226
	// Drop transactions under our own minimal accepted gas price
227
	if !local && pool.minGasPrice.Cmp(tx.GasPrice()) > 0 {
228 229 230
		return ErrCheap
	}

231 232 233 234 235
	currentState, err := pool.currentState()
	if err != nil {
		return err
	}

236
	from, err := tx.From()
237
	if err != nil {
F
Felix Lange 已提交
238
		return ErrInvalidSender
O
obscuren 已提交
239
	}
O
obscuren 已提交
240

241
	// Make sure the account exist. Non existent accounts
O
obscuren 已提交
242
	// haven't got funds and well therefor never pass.
243
	if !currentState.HasAccount(from) {
O
obscuren 已提交
244
		return ErrNonExistentAccount
245
	}
O
obscuren 已提交
246

247
	// Last but not least check for nonce errors
248
	if currentState.GetNonce(from) > tx.Nonce() {
249 250 251
		return ErrNonce
	}

O
obscuren 已提交
252 253
	// Check the transaction doesn't exceed the current
	// block limit gas.
254
	if pool.gasLimit().Cmp(tx.Gas()) < 0 {
255 256 257
		return ErrGasLimit
	}

O
obscuren 已提交
258 259 260
	// Transactions can't be negative. This may never happen
	// using RLP decoded transactions but may occur if you create
	// a transaction using the RPC for example.
261
	if tx.Value().Cmp(common.Big0) < 0 {
262 263 264
		return ErrNegativeValue
	}

O
obscuren 已提交
265 266
	// Transactor should have enough funds to cover the costs
	// cost == V + GP * GL
267
	if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
O
obscuren 已提交
268 269 270
		return ErrInsufficientFunds
	}

271
	intrGas := IntrinsicGas(tx.Data(), MessageCreatesContract(tx), pool.homestead)
272
	if tx.Gas().Cmp(intrGas) < 0 {
O
obscuren 已提交
273 274 275 276
		return ErrIntrinsicGas
	}

	return nil
O
obscuren 已提交
277 278
}

279
// validate and queue transactions.
280
func (self *TxPool) add(tx *types.Transaction) error {
281
	hash := tx.Hash()
282

O
obscuren 已提交
283
	if self.pending[hash] != nil {
284
		return fmt.Errorf("Known transaction (%x)", hash[:4])
285
	}
286
	err := self.validateTx(tx)
287 288 289
	if err != nil {
		return err
	}
290
	self.queueTx(hash, tx)
O
obscuren 已提交
291 292

	if glog.V(logger.Debug) {
293 294 295 296 297 298 299 300 301 302 303
		var toname string
		if to := tx.To(); to != nil {
			toname = common.Bytes2Hex(to[:4])
		} else {
			toname = "[NEW_CONTRACT]"
		}
		// we can ignore the error here because From is
		// verified in ValidateTransaction.
		f, _ := tx.From()
		from := common.Bytes2Hex(f[:4])
		glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
O
obscuren 已提交
304
	}
305 306 307 308

	return nil
}

309 310 311 312 313 314 315 316 317 318 319
// queueTx will queue an unknown transaction
func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
	from, _ := tx.From() // already validated
	if self.queue[from] == nil {
		self.queue[from] = make(map[common.Hash]*types.Transaction)
	}
	self.queue[from][hash] = tx
}

// addTx will add a transaction to the pending (processable queue) list of transactions
func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
320 321 322 323 324
	// init delayed since tx pool could have been started before any state sync
	if pool.pendingState == nil {
		pool.resetState()
	}

325 326 327 328 329
	if _, ok := pool.pending[hash]; !ok {
		pool.pending[hash] = tx

		// Increment the nonce on the pending state. This can only happen if
		// the nonce is +1 to the previous one.
330
		pool.pendingState.SetNonce(addr, tx.Nonce()+1)
331 332 333 334 335 336 337
		// Notify the subscribers. This event is posted in a goroutine
		// because it's possible that somewhere during the post "Remove transaction"
		// gets called which will then wait for the global tx pool lock and deadlock.
		go pool.eventMux.Post(TxPreEvent{tx})
	}
}

338
// Add queues a single transaction in the pool if it is valid.
339
func (self *TxPool) Add(tx *types.Transaction) error {
340 341
	self.mu.Lock()
	defer self.mu.Unlock()
342

343 344
	if err := self.add(tx); err != nil {
		return err
345
	}
346 347
	self.checkQueue()
	return nil
348
}
349

350
// AddTransactions attempts to queue all valid transactions in txs.
Z
zelig 已提交
351
func (self *TxPool) AddTransactions(txs []*types.Transaction) {
352 353 354
	self.mu.Lock()
	defer self.mu.Unlock()

Z
zelig 已提交
355
	for _, tx := range txs {
356
		if err := self.add(tx); err != nil {
357
			glog.V(logger.Debug).Infoln("tx error:", err)
Z
zelig 已提交
358
		} else {
359
			h := tx.Hash()
O
obscuren 已提交
360
			glog.V(logger.Debug).Infof("tx %x\n", h[:4])
Z
zelig 已提交
361 362
		}
	}
363

L
Leif Jurvetson 已提交
364
	// check and validate the queue
365
	self.checkQueue()
Z
zelig 已提交
366 367
}

368 369
// GetTransaction returns a transaction if it is contained in the pool
// and nil otherwise.
370
func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
371 372 373
	tp.mu.RLock()
	defer tp.mu.RUnlock()

374
	// check the txs first
O
obscuren 已提交
375
	if tx, ok := tp.pending[hash]; ok {
376 377 378 379
		return tx
	}
	// check queue
	for _, txs := range tp.queue {
380 381
		if tx, ok := txs[hash]; ok {
			return tx
382 383 384 385 386
		}
	}
	return nil
}

387
// GetTransactions returns all currently processable transactions.
388
// The returned slice may be modified by the caller.
O
Merge  
obscuren 已提交
389
func (self *TxPool) GetTransactions() (txs types.Transactions) {
390 391 392 393 394 395 396
	self.mu.Lock()
	defer self.mu.Unlock()

	// check queue first
	self.checkQueue()
	// invalidate any txs
	self.validatePool()
397

O
obscuren 已提交
398
	txs = make(types.Transactions, len(self.pending))
O
obscuren 已提交
399
	i := 0
O
obscuren 已提交
400
	for _, tx := range self.pending {
O
Merge  
obscuren 已提交
401
		txs[i] = tx
O
obscuren 已提交
402
		i++
O
Merge  
obscuren 已提交
403
	}
404
	return txs
405 406
}

407
// GetQueuedTransactions returns all non-processable transactions.
408 409 410 411
func (self *TxPool) GetQueuedTransactions() types.Transactions {
	self.mu.RLock()
	defer self.mu.RUnlock()

412 413 414 415 416
	var ret types.Transactions
	for _, txs := range self.queue {
		for _, tx := range txs {
			ret = append(ret, tx)
		}
417
	}
418
	sort.Sort(types.TxByNonce(ret))
419
	return ret
420 421
}

422
// RemoveTransactions removes all given transactions from the pool.
423
func (self *TxPool) RemoveTransactions(txs types.Transactions) {
424 425
	self.mu.Lock()
	defer self.mu.Unlock()
426
	for _, tx := range txs {
427
		self.removeTx(tx.Hash())
428 429 430
	}
}

431 432
// RemoveTx removes the transaction with the given hash from the pool.
func (pool *TxPool) RemoveTx(hash common.Hash) {
433 434 435 436 437 438
	pool.mu.Lock()
	defer pool.mu.Unlock()
	pool.removeTx(hash)
}

func (pool *TxPool) removeTx(hash common.Hash) {
439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454
	// delete from pending pool
	delete(pool.pending, hash)
	// delete from queue
	for address, txs := range pool.queue {
		if _, ok := txs[hash]; ok {
			if len(txs) == 1 {
				// if only one tx, remove entire address entry.
				delete(pool.queue, address)
			} else {
				delete(txs, hash)
			}
			break
		}
	}
}

455
// checkQueue moves transactions that have become processable to main pool.
456
func (pool *TxPool) checkQueue() {
457 458 459 460
	// init delayed since tx pool could have been started before any state sync
	if pool.pendingState == nil {
		pool.resetState()
	}
461

462
	var promote txQueue
463
	for address, txs := range pool.queue {
464 465 466 467 468
		currentState, err := pool.currentState()
		if err != nil {
			glog.Errorf("could not get current state: %v", err)
			return
		}
469 470 471 472 473 474 475
		balance := currentState.GetBalance(address)

		var (
			guessedNonce = pool.pendingState.GetNonce(address) // nonce currently kept by the tx pool (pending state)
			trueNonce    = currentState.GetNonce(address)      // nonce known by the last state
		)
		promote = promote[:0]
476
		for hash, tx := range txs {
477 478 479 480 481
			// Drop processed or out of fund transactions
			if tx.Nonce() < trueNonce || balance.Cmp(tx.Cost()) < 0 {
				if glog.V(logger.Core) {
					glog.Infof("removed tx (%v) from pool queue: low tx nonce or out of funds\n", tx)
				}
482
				delete(txs, hash)
483 484
				continue
			}
485 486 487 488 489 490 491 492 493 494
			// Collect the remaining transactions for the next pass.
			promote = append(promote, txQueueEntry{hash, address, tx})
		}
		// Find the next consecutive nonce range starting at the current account nonce,
		// pushing the guessed nonce forward if we add consecutive transactions.
		sort.Sort(promote)
		for i, entry := range promote {
			// If we reached a gap in the nonces, enforce transaction limit and stop
			if entry.Nonce() > guessedNonce {
				if len(promote)-i > maxQueued {
495
					if glog.V(logger.Debug) {
496
						glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(entry.hash[:]))
497
					}
498 499
					for _, drop := range promote[i+maxQueued:] {
						delete(txs, drop.hash)
500 501
					}
				}
502 503
				break
			}
504 505 506 507 508 509 510
			// Otherwise promote the transaction and move the guess nonce if needed
			pool.addTx(entry.hash, address, entry.Transaction)
			delete(txs, entry.hash)

			if entry.Nonce() == guessedNonce {
				guessedNonce++
			}
511
		}
512 513
		// Delete the entire queue entry if it became empty.
		if len(txs) == 0 {
514 515 516 517
			delete(pool.queue, address)
		}
	}
}
518

519
// validatePool removes invalid and processed transactions from the main pool.
520 521 522 523
// If a transaction is removed for being invalid (e.g. out of funds), all sub-
// sequent (Still valid) transactions are moved back into the future queue. This
// is important to prevent a drained account from DOSing the network with non
// executable transactions.
524
func (pool *TxPool) validatePool() {
525 526 527 528 529
	state, err := pool.currentState()
	if err != nil {
		glog.V(logger.Info).Infoln("failed to get current state: %v", err)
		return
	}
530 531 532 533 534
	balanceCache := make(map[common.Address]*big.Int)

	// Clean up the pending pool, accumulating invalid nonces
	gaps := make(map[common.Address]uint64)

O
obscuren 已提交
535
	for hash, tx := range pool.pending {
536 537 538 539 540 541 542 543 544 545
		sender, _ := tx.From() // err already checked

		// Perform light nonce and balance validation
		balance := balanceCache[sender]
		if balance == nil {
			balance = state.GetBalance(sender)
			balanceCache[sender] = balance
		}
		if past := state.GetNonce(sender) > tx.Nonce(); past || balance.Cmp(tx.Cost()) < 0 {
			// Remove an already past it invalidated transaction
O
obscuren 已提交
546
			if glog.V(logger.Core) {
547
				glog.Infof("removed tx (%v) from pool: low tx nonce or out of funds\n", tx)
548
			}
O
obscuren 已提交
549
			delete(pool.pending, hash)
550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569

			// Track the smallest invalid nonce to postpone subsequent transactions
			if !past {
				if prev, ok := gaps[sender]; !ok || tx.Nonce() < prev {
					gaps[sender] = tx.Nonce()
				}
			}
		}
	}
	// Move all transactions after a gap back to the future queue
	if len(gaps) > 0 {
		for hash, tx := range pool.pending {
			sender, _ := tx.From()
			if gap, ok := gaps[sender]; ok && tx.Nonce() >= gap {
				if glog.V(logger.Core) {
					glog.Infof("postponed tx (%v) due to introduced gap\n", tx)
				}
				pool.queueTx(hash, tx)
				delete(pool.pending, hash)
			}
570 571 572
		}
	}
}
573 574 575 576 577

type txQueue []txQueueEntry

type txQueueEntry struct {
	hash common.Hash
578
	addr common.Address
579 580 581 582 583
	*types.Transaction
}

func (q txQueue) Len() int           { return len(q) }
func (q txQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
584
func (q txQueue) Less(i, j int) bool { return q[i].Nonce() < q[j].Nonce() }
585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630

// txSet represents a set of transaction hashes in which entries
//  are automatically dropped after txSetDuration time
type txSet struct {
	txMap          map[common.Hash]struct{}
	txOrd          map[uint64]txOrdType
	addPtr, delPtr uint64
}

const txSetDuration = time.Hour * 2

// txOrdType represents an entry in the time-ordered list of transaction hashes
type txOrdType struct {
	hash common.Hash
	time time.Time
}

// newTxSet creates a new transaction set
func newTxSet() *txSet {
	return &txSet{
		txMap: make(map[common.Hash]struct{}),
		txOrd: make(map[uint64]txOrdType),
	}
}

// contains returns true if the set contains the given transaction hash
// (not thread safe, should be called from a locked environment)
func (self *txSet) contains(hash common.Hash) bool {
	_, ok := self.txMap[hash]
	return ok
}

// add adds a transaction hash to the set, then removes entries older than txSetDuration
// (not thread safe, should be called from a locked environment)
func (self *txSet) add(hash common.Hash) {
	self.txMap[hash] = struct{}{}
	now := time.Now()
	self.txOrd[self.addPtr] = txOrdType{hash: hash, time: now}
	self.addPtr++
	delBefore := now.Add(-txSetDuration)
	for self.delPtr < self.addPtr && self.txOrd[self.delPtr].time.Before(delBefore) {
		delete(self.txMap, self.txOrd[self.delPtr].hash)
		delete(self.txOrd, self.delPtr)
		self.delPtr++
	}
}