// Copyright 2014 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

O
obscuren 已提交
17
package core
O
obscuren 已提交
18 19

import (
20
	"errors"
O
obscuren 已提交
21
	"fmt"
O
obscuren 已提交
22
	"math/big"
23
	"sort"
24
	"sync"
25
	"time"
26

O
obscuren 已提交
27
	"github.com/ethereum/go-ethereum/common"
O
obscuren 已提交
28
	"github.com/ethereum/go-ethereum/core/state"
29
	"github.com/ethereum/go-ethereum/core/types"
O
obscuren 已提交
30
	"github.com/ethereum/go-ethereum/event"
O
obscuren 已提交
31
	"github.com/ethereum/go-ethereum/logger"
O
obscuren 已提交
32
	"github.com/ethereum/go-ethereum/logger/glog"
33
	"github.com/ethereum/go-ethereum/params"
O
obscuren 已提交
34 35
)

36
// Transaction pool errors.
var (
	// ErrInvalidSender is returned when the sender of a transaction cannot
	// be derived from it.
	ErrInvalidSender = errors.New("Invalid sender")

	// ErrNonce is returned when the transaction nonce is lower than the
	// sender's nonce in the current state.
	ErrNonce = errors.New("Nonce too low")

	// ErrCheap is returned when a non-local transaction offers a gas price
	// below the pool's minimum.
	ErrCheap = errors.New("Gas price too low for acceptance")

	// ErrBalance reports an insufficient balance.
	ErrBalance = errors.New("Insufficient balance")

	// ErrNonExistentAccount is returned when the sender account is not
	// present in the current state.
	ErrNonExistentAccount = errors.New("Account does not exist or account balance too low")

	// ErrInsufficientFunds is returned when the sender's balance does not
	// cover the total transaction cost.
	ErrInsufficientFunds = errors.New("Insufficient funds for gas * price + value")

	// ErrIntrinsicGas is returned when the transaction provides less gas
	// than its intrinsic gas requirement.
	ErrIntrinsicGas = errors.New("Intrinsic gas too low")

	// ErrGasLimit is returned when the transaction asks for more gas than
	// the current gas limit.
	ErrGasLimit = errors.New("Exceeds block gas limit")

	// ErrNegativeValue is returned when the transferred value is negative.
	ErrNegativeValue = errors.New("Negative value")
)
Z
zelig 已提交
48

49
const (
	// maxQueued is the maximum number of queued (not yet processable)
	// transactions kept per sender address.
	maxQueued = 64
)

53
// stateFn returns a state database, or an error when none can be provided.
// TxPool stores one as currentState and calls it whenever it needs account
// nonces and balances.
type stateFn func() (*state.StateDB, error)
54

55 56 57 58 59 60 61
// TxPool contains all currently known transactions. Transactions
// enter the pool when they are received from the network or submitted
// locally. They exit the pool when they are included in the blockchain.
//
// The pool separates processable transactions (which can be applied to the
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
O
obscuren 已提交
62
type TxPool struct {
63 64
	quit         chan bool // Quiting channel
	currentState stateFn   // The state function which will allow us to do some pre checkes
65
	pendingState *state.ManagedState
66
	gasLimit     func() *big.Int // The current gas limit function callback
67
	minGasPrice  *big.Int
68
	eventMux     *event.TypeMux
69
	events       event.Subscription
70 71 72 73
	localTx      *txSet
	mu           sync.RWMutex
	pending      map[common.Hash]*types.Transaction // processable transactions
	queue        map[common.Address]map[common.Hash]*types.Transaction
74 75

	homestead bool
O
obscuren 已提交
76 77
}

78
func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
79
	pool := &TxPool{
O
obscuren 已提交
80
		pending:      make(map[common.Hash]*types.Transaction),
81 82 83 84 85
		queue:        make(map[common.Address]map[common.Hash]*types.Transaction),
		quit:         make(chan bool),
		eventMux:     eventMux,
		currentState: currentStateFn,
		gasLimit:     gasLimitFn,
86
		minGasPrice:  new(big.Int),
87
		pendingState: nil,
88
		localTx:      newTxSet(),
89
		events:       eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}),
O
obscuren 已提交
90
	}
91

92 93 94
	go pool.eventLoop()

	return pool
95 96
}

97
func (pool *TxPool) eventLoop() {
O
obscuren 已提交
98 99 100
	// Track chain events. When a chain events occurs (new chain canon block)
	// we need to know the new state. The new state will help us determine
	// the nonces in the managed state
101
	for ev := range pool.events.Chan() {
102
		switch ev := ev.Data.(type) {
103
		case ChainHeadEvent:
104
			pool.mu.Lock()
105 106 107 108
			if ev.Block != nil && params.IsHomestead(ev.Block.Number()) {
				pool.homestead = true
			}

109
			pool.resetState()
110
			pool.mu.Unlock()
111
		case GasPriceChanged:
112
			pool.mu.Lock()
113
			pool.minGasPrice = ev.Price
114 115 116
			pool.mu.Unlock()
		case RemovedTransactionEvent:
			pool.AddTransactions(ev.Txs)
117
		}
118
	}
O
obscuren 已提交
119 120
}

121
func (pool *TxPool) resetState() {
122 123 124 125 126 127 128 129 130 131 132
	currentState, err := pool.currentState()
	if err != nil {
		glog.V(logger.Info).Infoln("failed to get current state: %v", err)
		return
	}
	managedState := state.ManageState(currentState)
	if err != nil {
		glog.V(logger.Info).Infoln("failed to get managed state: %v", err)
		return
	}
	pool.pendingState = managedState
133 134 135 136 137 138 139 140 141 142 143 144 145

	// validate the pool of pending transactions, this will remove
	// any transactions that have been included in the block or
	// have been invalidated because of another transaction (e.g.
	// higher gas price)
	pool.validatePool()

	// Loop over the pending transactions and base the nonce of the new
	// pending transaction set.
	for _, tx := range pool.pending {
		if addr, err := tx.From(); err == nil {
			// Set the nonce. Transaction nonce can never be lower
			// than the state nonce; validatePool took care of that.
146 147
			if pool.pendingState.GetNonce(addr) <= tx.Nonce() {
				pool.pendingState.SetNonce(addr, tx.Nonce()+1)
148
			}
149 150 151 152 153 154 155
		}
	}
	// Check the queue and move transactions over to the pending if possible
	// or remove those that have become invalid
	pool.checkQueue()
}

156 157 158
func (pool *TxPool) Stop() {
	close(pool.quit)
	pool.events.Unsubscribe()
159
	glog.V(logger.Info).Infoln("Transaction pool stopped")
160 161 162 163 164 165
}

func (pool *TxPool) State() *state.ManagedState {
	pool.mu.RLock()
	defer pool.mu.RUnlock()

166
	return pool.pendingState
167 168
}

169 170 171 172 173 174 175 176 177 178 179
// Stats reports how many transactions are currently pending (processable)
// and how many are queued (not yet processable) across all accounts.
func (pool *TxPool) Stats() (pending int, queued int) {
	pool.mu.RLock()
	defer pool.mu.RUnlock()

	total := 0
	for _, perAccount := range pool.queue {
		total += len(perAccount)
	}
	return len(pool.pending), total
}

180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
// Content returns a snapshot of the pool: the first result holds the pending
// transactions, the second the queued ones. Both are grouped by sender
// account and then by nonce; several transactions may share one nonce.
func (pool *TxPool) Content() (map[common.Address]map[uint64][]*types.Transaction, map[common.Address]map[uint64][]*types.Transaction) {
	pool.mu.RLock()
	defer pool.mu.RUnlock()

	// Group the pending transactions by sender and nonce.
	pending := make(map[common.Address]map[uint64][]*types.Transaction)
	for _, tx := range pool.pending {
		sender, _ := tx.From()

		if _, seen := pending[sender]; !seen {
			pending[sender] = make(map[uint64][]*types.Transaction)
		}
		nonce := tx.Nonce()
		pending[sender][nonce] = append(pending[sender][nonce], tx)
	}
	// The queue is already keyed by account, only the nonce grouping is left.
	queued := make(map[common.Address]map[uint64][]*types.Transaction)
	for account, txs := range pool.queue {
		byNonce := make(map[uint64][]*types.Transaction)
		for _, tx := range txs {
			nonce := tx.Nonce()
			byNonce[nonce] = append(byNonce[nonce], tx)
		}
		queued[account] = byNonce
	}
	return pending, queued
}

210 211 212 213 214 215 216 217
// SetLocal records the hash of tx in the set of local transactions. Hashes
// in that set are exempt from the minimum gas price check in validateTx.
func (pool *TxPool) SetLocal(tx *types.Transaction) {
	pool.mu.Lock()
	defer pool.mu.Unlock()

	hash := tx.Hash()
	pool.localTx.add(hash)
}

218 219 220
// validateTx checks whether a transaction is valid according
// to the consensus rules.
func (pool *TxPool) validateTx(tx *types.Transaction) error {
221
	local := pool.localTx.contains(tx.Hash())
222
	// Drop transactions under our own minimal accepted gas price
223
	if !local && pool.minGasPrice.Cmp(tx.GasPrice()) > 0 {
224 225 226
		return ErrCheap
	}

227 228 229 230 231
	currentState, err := pool.currentState()
	if err != nil {
		return err
	}

232
	from, err := tx.From()
233
	if err != nil {
F
Felix Lange 已提交
234
		return ErrInvalidSender
O
obscuren 已提交
235
	}
O
obscuren 已提交
236

237
	// Make sure the account exist. Non existent accounts
O
obscuren 已提交
238
	// haven't got funds and well therefor never pass.
239
	if !currentState.HasAccount(from) {
O
obscuren 已提交
240
		return ErrNonExistentAccount
241
	}
O
obscuren 已提交
242

243
	// Last but not least check for nonce errors
244
	if currentState.GetNonce(from) > tx.Nonce() {
245 246 247
		return ErrNonce
	}

O
obscuren 已提交
248 249
	// Check the transaction doesn't exceed the current
	// block limit gas.
250
	if pool.gasLimit().Cmp(tx.Gas()) < 0 {
251 252 253
		return ErrGasLimit
	}

O
obscuren 已提交
254 255 256
	// Transactions can't be negative. This may never happen
	// using RLP decoded transactions but may occur if you create
	// a transaction using the RPC for example.
257
	if tx.Value().Cmp(common.Big0) < 0 {
258 259 260
		return ErrNegativeValue
	}

O
obscuren 已提交
261 262
	// Transactor should have enough funds to cover the costs
	// cost == V + GP * GL
263
	if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
O
obscuren 已提交
264 265 266
		return ErrInsufficientFunds
	}

267
	intrGas := IntrinsicGas(tx.Data(), MessageCreatesContract(tx), pool.homestead)
268
	if tx.Gas().Cmp(intrGas) < 0 {
O
obscuren 已提交
269 270 271 272
		return ErrIntrinsicGas
	}

	return nil
O
obscuren 已提交
273 274
}

275
// validate and queue transactions.
276
func (self *TxPool) add(tx *types.Transaction) error {
277
	hash := tx.Hash()
278

O
obscuren 已提交
279
	if self.pending[hash] != nil {
280
		return fmt.Errorf("Known transaction (%x)", hash[:4])
281
	}
282
	err := self.validateTx(tx)
283 284 285
	if err != nil {
		return err
	}
286
	self.queueTx(hash, tx)
O
obscuren 已提交
287 288

	if glog.V(logger.Debug) {
289 290 291 292 293 294 295 296 297 298 299
		var toname string
		if to := tx.To(); to != nil {
			toname = common.Bytes2Hex(to[:4])
		} else {
			toname = "[NEW_CONTRACT]"
		}
		// we can ignore the error here because From is
		// verified in ValidateTransaction.
		f, _ := tx.From()
		from := common.Bytes2Hex(f[:4])
		glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
O
obscuren 已提交
300
	}
301 302 303 304

	return nil
}

305 306 307 308 309 310 311 312 313 314 315
// queueTx stores tx under its sender address and hash in the queue of not
// yet processable transactions, creating the per-sender map on first use.
func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
	sender, _ := tx.From() // already validated

	bucket := self.queue[sender]
	if bucket == nil {
		bucket = make(map[common.Hash]*types.Transaction)
		self.queue[sender] = bucket
	}
	bucket[hash] = tx
}

// addTx will add a transaction to the pending (processable queue) list of transactions
func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
316 317 318 319 320
	// init delayed since tx pool could have been started before any state sync
	if pool.pendingState == nil {
		pool.resetState()
	}

321 322 323 324 325
	if _, ok := pool.pending[hash]; !ok {
		pool.pending[hash] = tx

		// Increment the nonce on the pending state. This can only happen if
		// the nonce is +1 to the previous one.
326
		pool.pendingState.SetNonce(addr, tx.Nonce()+1)
327 328 329 330 331 332 333
		// Notify the subscribers. This event is posted in a goroutine
		// because it's possible that somewhere during the post "Remove transaction"
		// gets called which will then wait for the global tx pool lock and deadlock.
		go pool.eventMux.Post(TxPreEvent{tx})
	}
}

334
// Add queues a single transaction in the pool if it is valid.
335
func (self *TxPool) Add(tx *types.Transaction) error {
336 337
	self.mu.Lock()
	defer self.mu.Unlock()
338

339 340
	if err := self.add(tx); err != nil {
		return err
341
	}
342 343
	self.checkQueue()
	return nil
344
}
345

346
// AddTransactions attempts to queue all valid transactions in txs.
Z
zelig 已提交
347
func (self *TxPool) AddTransactions(txs []*types.Transaction) {
348 349 350
	self.mu.Lock()
	defer self.mu.Unlock()

Z
zelig 已提交
351
	for _, tx := range txs {
352
		if err := self.add(tx); err != nil {
353
			glog.V(logger.Debug).Infoln("tx error:", err)
Z
zelig 已提交
354
		} else {
355
			h := tx.Hash()
O
obscuren 已提交
356
			glog.V(logger.Debug).Infof("tx %x\n", h[:4])
Z
zelig 已提交
357 358
		}
	}
359 360 361

	// check and validate the queueue
	self.checkQueue()
Z
zelig 已提交
362 363
}

364 365
// GetTransaction returns a transaction if it is contained in the pool
// and nil otherwise.
366 367
func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
	// check the txs first
O
obscuren 已提交
368
	if tx, ok := tp.pending[hash]; ok {
369 370 371 372
		return tx
	}
	// check queue
	for _, txs := range tp.queue {
373 374
		if tx, ok := txs[hash]; ok {
			return tx
375 376 377 378 379
		}
	}
	return nil
}

380
// GetTransactions returns all currently processable transactions.
381
// The returned slice may be modified by the caller.
O
Merge  
obscuren 已提交
382
func (self *TxPool) GetTransactions() (txs types.Transactions) {
383 384 385 386 387 388 389
	self.mu.Lock()
	defer self.mu.Unlock()

	// check queue first
	self.checkQueue()
	// invalidate any txs
	self.validatePool()
390

O
obscuren 已提交
391
	txs = make(types.Transactions, len(self.pending))
O
obscuren 已提交
392
	i := 0
O
obscuren 已提交
393
	for _, tx := range self.pending {
O
Merge  
obscuren 已提交
394
		txs[i] = tx
O
obscuren 已提交
395
		i++
O
Merge  
obscuren 已提交
396
	}
397
	return txs
398 399
}

400
// GetQueuedTransactions returns all non-processable transactions.
401 402 403 404
func (self *TxPool) GetQueuedTransactions() types.Transactions {
	self.mu.RLock()
	defer self.mu.RUnlock()

405 406 407 408 409
	var ret types.Transactions
	for _, txs := range self.queue {
		for _, tx := range txs {
			ret = append(ret, tx)
		}
410
	}
411
	sort.Sort(types.TxByNonce(ret))
412
	return ret
413 414
}

415
// RemoveTransactions removes all given transactions from the pool.
416
func (self *TxPool) RemoveTransactions(txs types.Transactions) {
417 418
	self.mu.Lock()
	defer self.mu.Unlock()
419
	for _, tx := range txs {
420
		self.RemoveTx(tx.Hash())
421 422 423
	}
}

424 425
// RemoveTx removes the transaction with the given hash from the pool.
func (pool *TxPool) RemoveTx(hash common.Hash) {
426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441
	// delete from pending pool
	delete(pool.pending, hash)
	// delete from queue
	for address, txs := range pool.queue {
		if _, ok := txs[hash]; ok {
			if len(txs) == 1 {
				// if only one tx, remove entire address entry.
				delete(pool.queue, address)
			} else {
				delete(txs, hash)
			}
			break
		}
	}
}

442
// checkQueue moves transactions that have become processable to main pool.
443
func (pool *TxPool) checkQueue() {
444 445 446 447
	// init delayed since tx pool could have been started before any state sync
	if pool.pendingState == nil {
		pool.resetState()
	}
448

449
	var promote txQueue
450
	for address, txs := range pool.queue {
451 452 453 454 455
		currentState, err := pool.currentState()
		if err != nil {
			glog.Errorf("could not get current state: %v", err)
			return
		}
456 457 458 459 460 461 462
		balance := currentState.GetBalance(address)

		var (
			guessedNonce = pool.pendingState.GetNonce(address) // nonce currently kept by the tx pool (pending state)
			trueNonce    = currentState.GetNonce(address)      // nonce known by the last state
		)
		promote = promote[:0]
463
		for hash, tx := range txs {
464 465 466 467 468
			// Drop processed or out of fund transactions
			if tx.Nonce() < trueNonce || balance.Cmp(tx.Cost()) < 0 {
				if glog.V(logger.Core) {
					glog.Infof("removed tx (%v) from pool queue: low tx nonce or out of funds\n", tx)
				}
469
				delete(txs, hash)
470 471
				continue
			}
472 473 474 475 476 477 478 479 480 481
			// Collect the remaining transactions for the next pass.
			promote = append(promote, txQueueEntry{hash, address, tx})
		}
		// Find the next consecutive nonce range starting at the current account nonce,
		// pushing the guessed nonce forward if we add consecutive transactions.
		sort.Sort(promote)
		for i, entry := range promote {
			// If we reached a gap in the nonces, enforce transaction limit and stop
			if entry.Nonce() > guessedNonce {
				if len(promote)-i > maxQueued {
482
					if glog.V(logger.Debug) {
483
						glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(entry.hash[:]))
484
					}
485 486
					for _, drop := range promote[i+maxQueued:] {
						delete(txs, drop.hash)
487 488
					}
				}
489 490
				break
			}
491 492 493 494 495 496 497
			// Otherwise promote the transaction and move the guess nonce if needed
			pool.addTx(entry.hash, address, entry.Transaction)
			delete(txs, entry.hash)

			if entry.Nonce() == guessedNonce {
				guessedNonce++
			}
498
		}
499 500
		// Delete the entire queue entry if it became empty.
		if len(txs) == 0 {
501 502 503 504
			delete(pool.queue, address)
		}
	}
}
505

506
// validatePool removes invalid and processed transactions from the main pool.
507 508 509 510
// If a transaction is removed for being invalid (e.g. out of funds), all sub-
// sequent (Still valid) transactions are moved back into the future queue. This
// is important to prevent a drained account from DOSing the network with non
// executable transactions.
511
func (pool *TxPool) validatePool() {
512 513 514 515 516
	state, err := pool.currentState()
	if err != nil {
		glog.V(logger.Info).Infoln("failed to get current state: %v", err)
		return
	}
517 518 519 520 521
	balanceCache := make(map[common.Address]*big.Int)

	// Clean up the pending pool, accumulating invalid nonces
	gaps := make(map[common.Address]uint64)

O
obscuren 已提交
522
	for hash, tx := range pool.pending {
523 524 525 526 527 528 529 530 531 532
		sender, _ := tx.From() // err already checked

		// Perform light nonce and balance validation
		balance := balanceCache[sender]
		if balance == nil {
			balance = state.GetBalance(sender)
			balanceCache[sender] = balance
		}
		if past := state.GetNonce(sender) > tx.Nonce(); past || balance.Cmp(tx.Cost()) < 0 {
			// Remove an already past it invalidated transaction
O
obscuren 已提交
533
			if glog.V(logger.Core) {
534
				glog.Infof("removed tx (%v) from pool: low tx nonce or out of funds\n", tx)
535
			}
O
obscuren 已提交
536
			delete(pool.pending, hash)
537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556

			// Track the smallest invalid nonce to postpone subsequent transactions
			if !past {
				if prev, ok := gaps[sender]; !ok || tx.Nonce() < prev {
					gaps[sender] = tx.Nonce()
				}
			}
		}
	}
	// Move all transactions after a gap back to the future queue
	if len(gaps) > 0 {
		for hash, tx := range pool.pending {
			sender, _ := tx.From()
			if gap, ok := gaps[sender]; ok && tx.Nonce() >= gap {
				if glog.V(logger.Core) {
					glog.Infof("postponed tx (%v) due to introduced gap\n", tx)
				}
				pool.queueTx(hash, tx)
				delete(pool.pending, hash)
			}
557 558 559
		}
	}
}
560 561 562 563 564

type txQueue []txQueueEntry

type txQueueEntry struct {
	hash common.Hash
565
	addr common.Address
566 567 568 569 570
	*types.Transaction
}

func (q txQueue) Len() int           { return len(q) }
func (q txQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
571
func (q txQueue) Less(i, j int) bool { return q[i].Nonce() < q[j].Nonce() }
572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617

// txSet represents a set of transaction hashes in which entries
// are automatically dropped after txSetDuration time. Expired entries are
// only removed when add is called.
type txSet struct {
	// txMap is the membership set queried by contains.
	txMap          map[common.Hash]struct{}
	// txOrd records the insertions in order, keyed by a running counter.
	txOrd          map[uint64]txOrdType
	// addPtr is the key the next insertion gets in txOrd; delPtr is the key
	// of the oldest insertion that has not been expired yet.
	addPtr, delPtr uint64
}

// txSetDuration is the age after which txSet.add drops an entry.
const txSetDuration = time.Hour * 2

// txOrdType represents an entry in the time-ordered list of transaction hashes
type txOrdType struct {
	hash common.Hash // hash that was added
	time time.Time   // moment the hash was added
}

// newTxSet returns an empty transaction set whose maps are ready for use.
func newTxSet() *txSet {
	set := new(txSet)
	set.txMap = make(map[common.Hash]struct{})
	set.txOrd = make(map[uint64]txOrdType)
	return set
}

// contains reports whether the given transaction hash is in the set.
// It is not thread safe and should be called from a locked environment.
func (set *txSet) contains(hash common.Hash) bool {
	if _, present := set.txMap[hash]; present {
		return true
	}
	return false
}

// add inserts a transaction hash into the set and afterwards expires, oldest
// first, every recorded insertion that is older than txSetDuration.
// It is not thread safe and should be called from a locked environment.
func (set *txSet) add(hash common.Hash) {
	set.txMap[hash] = struct{}{}

	// Remember when the hash was inserted, under the next free counter.
	now := time.Now()
	set.txOrd[set.addPtr] = txOrdType{hash: hash, time: now}
	set.addPtr++

	// Walk the insertions from the oldest one and stop at the first that
	// is still young enough.
	cutoff := now.Add(-txSetDuration)
	for set.delPtr < set.addPtr {
		oldest := set.txOrd[set.delPtr]
		if !oldest.time.Before(cutoff) {
			break
		}
		delete(set.txMap, oldest.hash)
		delete(set.txOrd, set.delPtr)
		set.delPtr++
	}
}