tx_pool.go 18.0 KB
Newer Older
F
Felix Lange 已提交
1
// Copyright 2014 The go-ethereum Authors
2
// This file is part of the go-ethereum library.
F
Felix Lange 已提交
3
//
4
// The go-ethereum library is free software: you can redistribute it and/or modify
F
Felix Lange 已提交
5 6 7 8
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
9
// The go-ethereum library is distributed in the hope that it will be useful,
F
Felix Lange 已提交
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
F
Felix Lange 已提交
12 13 14
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
15
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
F
Felix Lange 已提交
16

O
obscuren 已提交
17
package core
O
obscuren 已提交
18 19

import (
20
	"errors"
O
obscuren 已提交
21
	"fmt"
O
obscuren 已提交
22
	"math/big"
23
	"sort"
24
	"sync"
25
	"time"
26

O
obscuren 已提交
27
	"github.com/ethereum/go-ethereum/common"
O
obscuren 已提交
28
	"github.com/ethereum/go-ethereum/core/state"
29
	"github.com/ethereum/go-ethereum/core/types"
O
obscuren 已提交
30
	"github.com/ethereum/go-ethereum/event"
O
obscuren 已提交
31
	"github.com/ethereum/go-ethereum/logger"
O
obscuren 已提交
32
	"github.com/ethereum/go-ethereum/logger/glog"
O
obscuren 已提交
33 34
)

35
var (
36
	// Transaction Pool Errors
O
obscuren 已提交
37
	ErrInvalidSender      = errors.New("Invalid sender")
38
	ErrNonce              = errors.New("Nonce too low")
39
	ErrCheap              = errors.New("Gas price too low for acceptance")
40
	ErrBalance            = errors.New("Insufficient balance")
41
	ErrNonExistentAccount = errors.New("Account does not exist or account balance too low")
42
	ErrInsufficientFunds  = errors.New("Insufficient funds for gas * price + value")
O
obscuren 已提交
43
	ErrIntrinsicGas       = errors.New("Intrinsic gas too low")
44
	ErrGasLimit           = errors.New("Exceeds block gas limit")
45
	ErrNegativeValue      = errors.New("Negative value")
46
)
Z
zelig 已提交
47

48
const (
49
	maxQueued = 64 // max limit of queued txs per address
50 51
)

52
type stateFn func() (*state.StateDB, error)
53

54 55 56 57 58 59 60
// TxPool contains all currently known transactions. Transactions
// enter the pool when they are received from the network or submitted
// locally. They exit the pool when they are included in the blockchain.
//
// The pool separates processable transactions (which can be applied to the
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
O
obscuren 已提交
61
type TxPool struct {
62
	config       *ChainConfig
L
Leif Jurvetson 已提交
63 64
	quit         chan bool // Quitting channel
	currentState stateFn   // The state function which will allow us to do some pre checks
65
	pendingState *state.ManagedState
66
	gasLimit     func() *big.Int // The current gas limit function callback
67
	minGasPrice  *big.Int
68
	eventMux     *event.TypeMux
69
	events       event.Subscription
70 71 72 73
	localTx      *txSet
	mu           sync.RWMutex
	pending      map[common.Hash]*types.Transaction // processable transactions
	queue        map[common.Address]map[common.Hash]*types.Transaction
74 75

	homestead bool
O
obscuren 已提交
76 77
}

78
func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
79
	pool := &TxPool{
80
		config:       config,
O
obscuren 已提交
81
		pending:      make(map[common.Hash]*types.Transaction),
82 83 84 85 86
		queue:        make(map[common.Address]map[common.Hash]*types.Transaction),
		quit:         make(chan bool),
		eventMux:     eventMux,
		currentState: currentStateFn,
		gasLimit:     gasLimitFn,
87
		minGasPrice:  new(big.Int),
88
		pendingState: nil,
89
		localTx:      newTxSet(),
90
		events:       eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}),
O
obscuren 已提交
91
	}
92

93 94 95
	go pool.eventLoop()

	return pool
96 97
}

98
func (pool *TxPool) eventLoop() {
O
obscuren 已提交
99 100 101
	// Track chain events. When a chain events occurs (new chain canon block)
	// we need to know the new state. The new state will help us determine
	// the nonces in the managed state
102
	for ev := range pool.events.Chan() {
103
		switch ev := ev.Data.(type) {
104
		case ChainHeadEvent:
105
			pool.mu.Lock()
106
			if ev.Block != nil && pool.config.IsHomestead(ev.Block.Number()) {
107 108 109
				pool.homestead = true
			}

110
			pool.resetState()
111
			pool.mu.Unlock()
112
		case GasPriceChanged:
113
			pool.mu.Lock()
114
			pool.minGasPrice = ev.Price
115 116 117
			pool.mu.Unlock()
		case RemovedTransactionEvent:
			pool.AddTransactions(ev.Txs)
118
		}
119
	}
O
obscuren 已提交
120 121
}

122
func (pool *TxPool) resetState() {
123 124 125 126 127 128 129 130 131 132 133
	currentState, err := pool.currentState()
	if err != nil {
		glog.V(logger.Info).Infoln("failed to get current state: %v", err)
		return
	}
	managedState := state.ManageState(currentState)
	if err != nil {
		glog.V(logger.Info).Infoln("failed to get managed state: %v", err)
		return
	}
	pool.pendingState = managedState
134 135 136 137 138 139 140 141 142 143 144 145 146

	// validate the pool of pending transactions, this will remove
	// any transactions that have been included in the block or
	// have been invalidated because of another transaction (e.g.
	// higher gas price)
	pool.validatePool()

	// Loop over the pending transactions and base the nonce of the new
	// pending transaction set.
	for _, tx := range pool.pending {
		if addr, err := tx.From(); err == nil {
			// Set the nonce. Transaction nonce can never be lower
			// than the state nonce; validatePool took care of that.
147 148
			if pool.pendingState.GetNonce(addr) <= tx.Nonce() {
				pool.pendingState.SetNonce(addr, tx.Nonce()+1)
149
			}
150 151 152 153 154 155 156
		}
	}
	// Check the queue and move transactions over to the pending if possible
	// or remove those that have become invalid
	pool.checkQueue()
}

157 158 159
func (pool *TxPool) Stop() {
	close(pool.quit)
	pool.events.Unsubscribe()
160
	glog.V(logger.Info).Infoln("Transaction pool stopped")
161 162 163 164 165 166
}

func (pool *TxPool) State() *state.ManagedState {
	pool.mu.RLock()
	defer pool.mu.RUnlock()

167
	return pool.pendingState
168 169
}

170 171 172 173 174 175 176 177 178 179 180
func (pool *TxPool) Stats() (pending int, queued int) {
	pool.mu.RLock()
	defer pool.mu.RUnlock()

	pending = len(pool.pending)
	for _, txs := range pool.queue {
		queued += len(txs)
	}
	return
}

181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
// Content retrieves the data content of the transaction pool, returning all the
// pending as well as queued transactions, grouped by account and nonce.
func (pool *TxPool) Content() (map[common.Address]map[uint64][]*types.Transaction, map[common.Address]map[uint64][]*types.Transaction) {
	pool.mu.RLock()
	defer pool.mu.RUnlock()

	// Retrieve all the pending transactions and sort by account and by nonce
	pending := make(map[common.Address]map[uint64][]*types.Transaction)
	for _, tx := range pool.pending {
		account, _ := tx.From()

		owned, ok := pending[account]
		if !ok {
			owned = make(map[uint64][]*types.Transaction)
			pending[account] = owned
		}
		owned[tx.Nonce()] = append(owned[tx.Nonce()], tx)
	}
	// Retrieve all the queued transactions and sort by account and by nonce
	queued := make(map[common.Address]map[uint64][]*types.Transaction)
	for account, txs := range pool.queue {
		owned := make(map[uint64][]*types.Transaction)
		for _, tx := range txs {
			owned[tx.Nonce()] = append(owned[tx.Nonce()], tx)
		}
		queued[account] = owned
	}
	return pending, queued
}

211 212 213 214 215 216 217 218
// SetLocal marks a transaction as local, skipping gas price
//  check against local miner minimum in the future
func (pool *TxPool) SetLocal(tx *types.Transaction) {
	pool.mu.Lock()
	defer pool.mu.Unlock()
	pool.localTx.add(tx.Hash())
}

219 220 221
// validateTx checks whether a transaction is valid according
// to the consensus rules.
func (pool *TxPool) validateTx(tx *types.Transaction) error {
222
	local := pool.localTx.contains(tx.Hash())
223
	// Drop transactions under our own minimal accepted gas price
224
	if !local && pool.minGasPrice.Cmp(tx.GasPrice()) > 0 {
225 226 227
		return ErrCheap
	}

228 229 230 231 232
	currentState, err := pool.currentState()
	if err != nil {
		return err
	}

233
	from, err := tx.From()
234
	if err != nil {
F
Felix Lange 已提交
235
		return ErrInvalidSender
O
obscuren 已提交
236
	}
O
obscuren 已提交
237

238
	// Make sure the account exist. Non existent accounts
O
obscuren 已提交
239
	// haven't got funds and well therefor never pass.
240
	if !currentState.HasAccount(from) {
O
obscuren 已提交
241
		return ErrNonExistentAccount
242
	}
O
obscuren 已提交
243

244
	// Last but not least check for nonce errors
245
	if currentState.GetNonce(from) > tx.Nonce() {
246 247 248
		return ErrNonce
	}

O
obscuren 已提交
249 250
	// Check the transaction doesn't exceed the current
	// block limit gas.
251
	if pool.gasLimit().Cmp(tx.Gas()) < 0 {
252 253 254
		return ErrGasLimit
	}

O
obscuren 已提交
255 256 257
	// Transactions can't be negative. This may never happen
	// using RLP decoded transactions but may occur if you create
	// a transaction using the RPC for example.
258
	if tx.Value().Cmp(common.Big0) < 0 {
259 260 261
		return ErrNegativeValue
	}

O
obscuren 已提交
262 263
	// Transactor should have enough funds to cover the costs
	// cost == V + GP * GL
264
	if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
O
obscuren 已提交
265 266 267
		return ErrInsufficientFunds
	}

268
	intrGas := IntrinsicGas(tx.Data(), MessageCreatesContract(tx), pool.homestead)
269
	if tx.Gas().Cmp(intrGas) < 0 {
O
obscuren 已提交
270 271 272 273
		return ErrIntrinsicGas
	}

	return nil
O
obscuren 已提交
274 275
}

276
// validate and queue transactions.
277
func (self *TxPool) add(tx *types.Transaction) error {
278
	hash := tx.Hash()
279

O
obscuren 已提交
280
	if self.pending[hash] != nil {
281
		return fmt.Errorf("Known transaction (%x)", hash[:4])
282
	}
283
	err := self.validateTx(tx)
284 285 286
	if err != nil {
		return err
	}
287
	self.queueTx(hash, tx)
O
obscuren 已提交
288 289

	if glog.V(logger.Debug) {
290 291 292 293 294 295 296 297 298 299 300
		var toname string
		if to := tx.To(); to != nil {
			toname = common.Bytes2Hex(to[:4])
		} else {
			toname = "[NEW_CONTRACT]"
		}
		// we can ignore the error here because From is
		// verified in ValidateTransaction.
		f, _ := tx.From()
		from := common.Bytes2Hex(f[:4])
		glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
O
obscuren 已提交
301
	}
302 303 304 305

	return nil
}

306 307 308 309 310 311 312 313 314 315 316
// queueTx will queue an unknown transaction
func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
	from, _ := tx.From() // already validated
	if self.queue[from] == nil {
		self.queue[from] = make(map[common.Hash]*types.Transaction)
	}
	self.queue[from][hash] = tx
}

// addTx will add a transaction to the pending (processable queue) list of transactions
func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
317 318 319 320 321
	// init delayed since tx pool could have been started before any state sync
	if pool.pendingState == nil {
		pool.resetState()
	}

322 323 324 325 326
	if _, ok := pool.pending[hash]; !ok {
		pool.pending[hash] = tx

		// Increment the nonce on the pending state. This can only happen if
		// the nonce is +1 to the previous one.
327
		pool.pendingState.SetNonce(addr, tx.Nonce()+1)
328 329 330 331 332 333 334
		// Notify the subscribers. This event is posted in a goroutine
		// because it's possible that somewhere during the post "Remove transaction"
		// gets called which will then wait for the global tx pool lock and deadlock.
		go pool.eventMux.Post(TxPreEvent{tx})
	}
}

335
// Add queues a single transaction in the pool if it is valid.
336
func (self *TxPool) Add(tx *types.Transaction) error {
337 338
	self.mu.Lock()
	defer self.mu.Unlock()
339

340 341
	if err := self.add(tx); err != nil {
		return err
342
	}
343 344
	self.checkQueue()
	return nil
345
}
346

347
// AddTransactions attempts to queue all valid transactions in txs.
Z
zelig 已提交
348
func (self *TxPool) AddTransactions(txs []*types.Transaction) {
349 350 351
	self.mu.Lock()
	defer self.mu.Unlock()

Z
zelig 已提交
352
	for _, tx := range txs {
353
		if err := self.add(tx); err != nil {
354
			glog.V(logger.Debug).Infoln("tx error:", err)
Z
zelig 已提交
355
		} else {
356
			h := tx.Hash()
O
obscuren 已提交
357
			glog.V(logger.Debug).Infof("tx %x\n", h[:4])
Z
zelig 已提交
358 359
		}
	}
360

L
Leif Jurvetson 已提交
361
	// check and validate the queue
362
	self.checkQueue()
Z
zelig 已提交
363 364
}

365 366
// GetTransaction returns a transaction if it is contained in the pool
// and nil otherwise.
367 368
func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
	// check the txs first
O
obscuren 已提交
369
	if tx, ok := tp.pending[hash]; ok {
370 371 372 373
		return tx
	}
	// check queue
	for _, txs := range tp.queue {
374 375
		if tx, ok := txs[hash]; ok {
			return tx
376 377 378 379 380
		}
	}
	return nil
}

381
// GetTransactions returns all currently processable transactions.
382
// The returned slice may be modified by the caller.
O
Merge  
obscuren 已提交
383
func (self *TxPool) GetTransactions() (txs types.Transactions) {
384 385 386 387 388 389 390
	self.mu.Lock()
	defer self.mu.Unlock()

	// check queue first
	self.checkQueue()
	// invalidate any txs
	self.validatePool()
391

O
obscuren 已提交
392
	txs = make(types.Transactions, len(self.pending))
O
obscuren 已提交
393
	i := 0
O
obscuren 已提交
394
	for _, tx := range self.pending {
O
Merge  
obscuren 已提交
395
		txs[i] = tx
O
obscuren 已提交
396
		i++
O
Merge  
obscuren 已提交
397
	}
398
	return txs
399 400
}

401
// GetQueuedTransactions returns all non-processable transactions.
402 403 404 405
func (self *TxPool) GetQueuedTransactions() types.Transactions {
	self.mu.RLock()
	defer self.mu.RUnlock()

406 407 408 409 410
	var ret types.Transactions
	for _, txs := range self.queue {
		for _, tx := range txs {
			ret = append(ret, tx)
		}
411
	}
412
	sort.Sort(types.TxByNonce(ret))
413
	return ret
414 415
}

416
// RemoveTransactions removes all given transactions from the pool.
417
func (self *TxPool) RemoveTransactions(txs types.Transactions) {
418 419
	self.mu.Lock()
	defer self.mu.Unlock()
420
	for _, tx := range txs {
421
		self.RemoveTx(tx.Hash())
422 423 424
	}
}

425 426
// RemoveTx removes the transaction with the given hash from the pool.
func (pool *TxPool) RemoveTx(hash common.Hash) {
427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442
	// delete from pending pool
	delete(pool.pending, hash)
	// delete from queue
	for address, txs := range pool.queue {
		if _, ok := txs[hash]; ok {
			if len(txs) == 1 {
				// if only one tx, remove entire address entry.
				delete(pool.queue, address)
			} else {
				delete(txs, hash)
			}
			break
		}
	}
}

443
// checkQueue moves transactions that have become processable to main pool.
444
func (pool *TxPool) checkQueue() {
445 446 447 448
	// init delayed since tx pool could have been started before any state sync
	if pool.pendingState == nil {
		pool.resetState()
	}
449

450
	var promote txQueue
451
	for address, txs := range pool.queue {
452 453 454 455 456
		currentState, err := pool.currentState()
		if err != nil {
			glog.Errorf("could not get current state: %v", err)
			return
		}
457 458 459 460 461 462 463
		balance := currentState.GetBalance(address)

		var (
			guessedNonce = pool.pendingState.GetNonce(address) // nonce currently kept by the tx pool (pending state)
			trueNonce    = currentState.GetNonce(address)      // nonce known by the last state
		)
		promote = promote[:0]
464
		for hash, tx := range txs {
465 466 467 468 469
			// Drop processed or out of fund transactions
			if tx.Nonce() < trueNonce || balance.Cmp(tx.Cost()) < 0 {
				if glog.V(logger.Core) {
					glog.Infof("removed tx (%v) from pool queue: low tx nonce or out of funds\n", tx)
				}
470
				delete(txs, hash)
471 472
				continue
			}
473 474 475 476 477 478 479 480 481 482
			// Collect the remaining transactions for the next pass.
			promote = append(promote, txQueueEntry{hash, address, tx})
		}
		// Find the next consecutive nonce range starting at the current account nonce,
		// pushing the guessed nonce forward if we add consecutive transactions.
		sort.Sort(promote)
		for i, entry := range promote {
			// If we reached a gap in the nonces, enforce transaction limit and stop
			if entry.Nonce() > guessedNonce {
				if len(promote)-i > maxQueued {
483
					if glog.V(logger.Debug) {
484
						glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(entry.hash[:]))
485
					}
486 487
					for _, drop := range promote[i+maxQueued:] {
						delete(txs, drop.hash)
488 489
					}
				}
490 491
				break
			}
492 493 494 495 496 497 498
			// Otherwise promote the transaction and move the guess nonce if needed
			pool.addTx(entry.hash, address, entry.Transaction)
			delete(txs, entry.hash)

			if entry.Nonce() == guessedNonce {
				guessedNonce++
			}
499
		}
500 501
		// Delete the entire queue entry if it became empty.
		if len(txs) == 0 {
502 503 504 505
			delete(pool.queue, address)
		}
	}
}
506

507
// validatePool removes invalid and processed transactions from the main pool.
508 509 510 511
// If a transaction is removed for being invalid (e.g. out of funds), all sub-
// sequent (Still valid) transactions are moved back into the future queue. This
// is important to prevent a drained account from DOSing the network with non
// executable transactions.
512
func (pool *TxPool) validatePool() {
513 514 515 516 517
	state, err := pool.currentState()
	if err != nil {
		glog.V(logger.Info).Infoln("failed to get current state: %v", err)
		return
	}
518 519 520 521 522
	balanceCache := make(map[common.Address]*big.Int)

	// Clean up the pending pool, accumulating invalid nonces
	gaps := make(map[common.Address]uint64)

O
obscuren 已提交
523
	for hash, tx := range pool.pending {
524 525 526 527 528 529 530 531 532 533
		sender, _ := tx.From() // err already checked

		// Perform light nonce and balance validation
		balance := balanceCache[sender]
		if balance == nil {
			balance = state.GetBalance(sender)
			balanceCache[sender] = balance
		}
		if past := state.GetNonce(sender) > tx.Nonce(); past || balance.Cmp(tx.Cost()) < 0 {
			// Remove an already past it invalidated transaction
O
obscuren 已提交
534
			if glog.V(logger.Core) {
535
				glog.Infof("removed tx (%v) from pool: low tx nonce or out of funds\n", tx)
536
			}
O
obscuren 已提交
537
			delete(pool.pending, hash)
538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557

			// Track the smallest invalid nonce to postpone subsequent transactions
			if !past {
				if prev, ok := gaps[sender]; !ok || tx.Nonce() < prev {
					gaps[sender] = tx.Nonce()
				}
			}
		}
	}
	// Move all transactions after a gap back to the future queue
	if len(gaps) > 0 {
		for hash, tx := range pool.pending {
			sender, _ := tx.From()
			if gap, ok := gaps[sender]; ok && tx.Nonce() >= gap {
				if glog.V(logger.Core) {
					glog.Infof("postponed tx (%v) due to introduced gap\n", tx)
				}
				pool.queueTx(hash, tx)
				delete(pool.pending, hash)
			}
558 559 560
		}
	}
}
561 562 563 564 565

type txQueue []txQueueEntry

type txQueueEntry struct {
	hash common.Hash
566
	addr common.Address
567 568 569 570 571
	*types.Transaction
}

func (q txQueue) Len() int           { return len(q) }
func (q txQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
572
func (q txQueue) Less(i, j int) bool { return q[i].Nonce() < q[j].Nonce() }
573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618

// txSet represents a set of transaction hashes in which entries
//  are automatically dropped after txSetDuration time
type txSet struct {
	txMap          map[common.Hash]struct{}
	txOrd          map[uint64]txOrdType
	addPtr, delPtr uint64
}

const txSetDuration = time.Hour * 2

// txOrdType represents an entry in the time-ordered list of transaction hashes
type txOrdType struct {
	hash common.Hash
	time time.Time
}

// newTxSet creates a new transaction set
func newTxSet() *txSet {
	return &txSet{
		txMap: make(map[common.Hash]struct{}),
		txOrd: make(map[uint64]txOrdType),
	}
}

// contains returns true if the set contains the given transaction hash
// (not thread safe, should be called from a locked environment)
func (self *txSet) contains(hash common.Hash) bool {
	_, ok := self.txMap[hash]
	return ok
}

// add adds a transaction hash to the set, then removes entries older than txSetDuration
// (not thread safe, should be called from a locked environment)
func (self *txSet) add(hash common.Hash) {
	self.txMap[hash] = struct{}{}
	now := time.Now()
	self.txOrd[self.addPtr] = txOrdType{hash: hash, time: now}
	self.addPtr++
	delBefore := now.Add(-txSetDuration)
	for self.delPtr < self.addPtr && self.txOrd[self.delPtr].time.Before(delBefore) {
		delete(self.txMap, self.txOrd[self.delPtr].hash)
		delete(self.txOrd, self.delPtr)
		self.delPtr++
	}
}