// Copyright 2014 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

O
obscuren 已提交
17
package core
O
obscuren 已提交
18 19

import (
20
	"errors"
O
obscuren 已提交
21
	"fmt"
O
obscuren 已提交
22
	"math/big"
23
	"sort"
24
	"sync"
25
	"time"
26

O
obscuren 已提交
27
	"github.com/ethereum/go-ethereum/common"
O
obscuren 已提交
28
	"github.com/ethereum/go-ethereum/core/state"
29
	"github.com/ethereum/go-ethereum/core/types"
O
obscuren 已提交
30
	"github.com/ethereum/go-ethereum/event"
O
obscuren 已提交
31
	"github.com/ethereum/go-ethereum/logger"
O
obscuren 已提交
32
	"github.com/ethereum/go-ethereum/logger/glog"
O
obscuren 已提交
33 34
)

35
// Transaction pool errors.
var (
	// ErrInvalidSender is returned by validateTx when the sender cannot be
	// derived from the transaction.
	ErrInvalidSender = errors.New("Invalid sender")

	// ErrNonce is returned by validateTx when the transaction nonce is lower
	// than the sender's nonce in the current state.
	ErrNonce = errors.New("Nonce too low")

	// ErrCheap is returned by validateTx when a non-local transaction offers
	// a gas price below the pool's minimum.
	ErrCheap = errors.New("Gas price too low for acceptance")

	// ErrBalance is not returned by any code visible in this file.
	ErrBalance = errors.New("Insufficient balance")

	// ErrNonExistentAccount is returned by validateTx when the sender has no
	// account in the current state.
	ErrNonExistentAccount = errors.New("Account does not exist or account balance too low")

	// ErrInsufficientFunds is returned by validateTx when the sender's
	// balance is lower than the transaction cost.
	ErrInsufficientFunds = errors.New("Insufficient funds for gas * price + value")

	// ErrIntrinsicGas is returned by validateTx when the transaction gas is
	// lower than the intrinsic gas required for its data.
	ErrIntrinsicGas = errors.New("Intrinsic gas too low")

	// ErrGasLimit is returned by validateTx when the transaction gas exceeds
	// the current gas limit.
	ErrGasLimit = errors.New("Exceeds block gas limit")

	// ErrNegativeValue is returned by validateTx when the transaction value
	// is negative.
	ErrNegativeValue = errors.New("Negative value")
)
Z
zelig 已提交
47

48
// maxQueued is the maximum number of queued (not yet processable)
// transactions kept per sender address; checkQueue drops the excess.
const maxQueued = 64

52
// stateFn is a callback returning a state database, or an error when one
// cannot be obtained. The pool stores one as TxPool.currentState and calls it
// whenever it needs to validate transactions against the current state.
type stateFn func() (*state.StateDB, error)
53

54 55 56 57 58 59 60
// TxPool contains all currently known transactions. Transactions
// enter the pool when they are received from the network or submitted
// locally. They exit the pool when they are included in the blockchain.
//
// The pool separates processable transactions (which can be applied to the
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
O
obscuren 已提交
61
type TxPool struct {
62 63
	quit         chan bool // Quiting channel
	currentState stateFn   // The state function which will allow us to do some pre checkes
64
	pendingState *state.ManagedState
65
	gasLimit     func() *big.Int // The current gas limit function callback
66
	minGasPrice  *big.Int
67
	eventMux     *event.TypeMux
68
	events       event.Subscription
69 70 71 72
	localTx      *txSet
	mu           sync.RWMutex
	pending      map[common.Hash]*types.Transaction // processable transactions
	queue        map[common.Address]map[common.Hash]*types.Transaction
O
obscuren 已提交
73 74
}

75
func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
76
	pool := &TxPool{
O
obscuren 已提交
77
		pending:      make(map[common.Hash]*types.Transaction),
78 79 80 81 82
		queue:        make(map[common.Address]map[common.Hash]*types.Transaction),
		quit:         make(chan bool),
		eventMux:     eventMux,
		currentState: currentStateFn,
		gasLimit:     gasLimitFn,
83
		minGasPrice:  new(big.Int),
84
		pendingState: nil,
85
		localTx:      newTxSet(),
86
		events:       eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}),
O
obscuren 已提交
87
	}
88 89 90
	go pool.eventLoop()

	return pool
91 92
}

93
func (pool *TxPool) eventLoop() {
O
obscuren 已提交
94 95 96
	// Track chain events. When a chain events occurs (new chain canon block)
	// we need to know the new state. The new state will help us determine
	// the nonces in the managed state
97
	for ev := range pool.events.Chan() {
98
		switch ev := ev.Data.(type) {
99
		case ChainHeadEvent:
100
			pool.mu.Lock()
101
			pool.resetState()
102
			pool.mu.Unlock()
103
		case GasPriceChanged:
104
			pool.mu.Lock()
105
			pool.minGasPrice = ev.Price
106 107 108
			pool.mu.Unlock()
		case RemovedTransactionEvent:
			pool.AddTransactions(ev.Txs)
109
		}
110
	}
O
obscuren 已提交
111 112
}

113
func (pool *TxPool) resetState() {
114 115 116 117 118 119 120 121 122 123 124
	currentState, err := pool.currentState()
	if err != nil {
		glog.V(logger.Info).Infoln("failed to get current state: %v", err)
		return
	}
	managedState := state.ManageState(currentState)
	if err != nil {
		glog.V(logger.Info).Infoln("failed to get managed state: %v", err)
		return
	}
	pool.pendingState = managedState
125 126 127 128 129 130 131 132 133 134 135 136 137

	// validate the pool of pending transactions, this will remove
	// any transactions that have been included in the block or
	// have been invalidated because of another transaction (e.g.
	// higher gas price)
	pool.validatePool()

	// Loop over the pending transactions and base the nonce of the new
	// pending transaction set.
	for _, tx := range pool.pending {
		if addr, err := tx.From(); err == nil {
			// Set the nonce. Transaction nonce can never be lower
			// than the state nonce; validatePool took care of that.
138 139
			if pool.pendingState.GetNonce(addr) <= tx.Nonce() {
				pool.pendingState.SetNonce(addr, tx.Nonce()+1)
140
			}
141 142 143 144 145 146 147
		}
	}
	// Check the queue and move transactions over to the pending if possible
	// or remove those that have become invalid
	pool.checkQueue()
}

148 149 150
func (pool *TxPool) Stop() {
	close(pool.quit)
	pool.events.Unsubscribe()
151
	glog.V(logger.Info).Infoln("Transaction pool stopped")
152 153 154 155 156 157
}

func (pool *TxPool) State() *state.ManagedState {
	pool.mu.RLock()
	defer pool.mu.RUnlock()

158
	return pool.pendingState
159 160
}

161 162 163 164 165 166 167 168 169 170 171
// Stats reports the number of pending (processable) transactions and the
// total number of queued transactions across all senders.
func (pool *TxPool) Stats() (pending int, queued int) {
	pool.mu.RLock()
	defer pool.mu.RUnlock()

	for _, bucket := range pool.queue {
		queued += len(bucket)
	}
	return len(pool.pending), queued
}

172 173 174 175 176 177 178 179
// SetLocal marks a transaction as local. Local transactions skip the check
// against the pool's minimum gas price in validateTx.
func (pool *TxPool) SetLocal(tx *types.Transaction) {
	pool.mu.Lock()
	defer pool.mu.Unlock()

	hash := tx.Hash()
	pool.localTx.add(hash)
}

180 181 182
// validateTx checks whether a transaction is valid according
// to the consensus rules.
func (pool *TxPool) validateTx(tx *types.Transaction) error {
183
	// Validate sender
O
obscuren 已提交
184 185 186 187 188
	var (
		from common.Address
		err  error
	)

189
	local := pool.localTx.contains(tx.Hash())
190
	// Drop transactions under our own minimal accepted gas price
191
	if !local && pool.minGasPrice.Cmp(tx.GasPrice()) > 0 {
192 193 194
		return ErrCheap
	}

O
obscuren 已提交
195 196
	// Validate the transaction sender and it's sig. Throw
	// if the from fields is invalid.
O
obscuren 已提交
197
	if from, err = tx.From(); err != nil {
F
Felix Lange 已提交
198
		return ErrInvalidSender
O
obscuren 已提交
199
	}
O
obscuren 已提交
200

201
	// Make sure the account exist. Non existent accounts
O
obscuren 已提交
202
	// haven't got funds and well therefor never pass.
203 204 205 206 207
	currentState, err := pool.currentState()
	if err != nil {
		return err
	}
	if !currentState.HasAccount(from) {
O
obscuren 已提交
208
		return ErrNonExistentAccount
209
	}
O
obscuren 已提交
210

211
	// Last but not least check for nonce errors
212
	if currentState.GetNonce(from) > tx.Nonce() {
213 214 215
		return ErrNonce
	}

O
obscuren 已提交
216 217
	// Check the transaction doesn't exceed the current
	// block limit gas.
218
	if pool.gasLimit().Cmp(tx.Gas()) < 0 {
219 220 221
		return ErrGasLimit
	}

O
obscuren 已提交
222 223 224
	// Transactions can't be negative. This may never happen
	// using RLP decoded transactions but may occur if you create
	// a transaction using the RPC for example.
225
	if tx.Value().Cmp(common.Big0) < 0 {
226 227 228
		return ErrNegativeValue
	}

O
obscuren 已提交
229 230
	// Transactor should have enough funds to cover the costs
	// cost == V + GP * GL
231
	if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
O
obscuren 已提交
232 233 234
		return ErrInsufficientFunds
	}

O
obscuren 已提交
235
	// Should supply enough intrinsic gas
F
Felix Lange 已提交
236
	if tx.Gas().Cmp(IntrinsicGas(tx.Data())) < 0 {
O
obscuren 已提交
237 238 239 240
		return ErrIntrinsicGas
	}

	return nil
O
obscuren 已提交
241 242
}

243
// validate and queue transactions.
244
func (self *TxPool) add(tx *types.Transaction) error {
245
	hash := tx.Hash()
246

O
obscuren 已提交
247
	if self.pending[hash] != nil {
248
		return fmt.Errorf("Known transaction (%x)", hash[:4])
249
	}
250
	err := self.validateTx(tx)
251 252 253
	if err != nil {
		return err
	}
254
	self.queueTx(hash, tx)
O
obscuren 已提交
255 256

	if glog.V(logger.Debug) {
257 258 259 260 261 262 263 264 265 266 267
		var toname string
		if to := tx.To(); to != nil {
			toname = common.Bytes2Hex(to[:4])
		} else {
			toname = "[NEW_CONTRACT]"
		}
		// we can ignore the error here because From is
		// verified in ValidateTransaction.
		f, _ := tx.From()
		from := common.Bytes2Hex(f[:4])
		glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
O
obscuren 已提交
268
	}
269 270 271 272

	return nil
}

273 274 275 276 277 278 279 280 281 282 283
// queueTx stores tx under its hash in the future queue of its sender,
// creating the sender's bucket on first use.
func (pool *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
	sender, _ := tx.From() // already validated

	bucket := pool.queue[sender]
	if bucket == nil {
		bucket = make(map[common.Hash]*types.Transaction)
		pool.queue[sender] = bucket
	}
	bucket[hash] = tx
}

// addTx will add a transaction to the pending (processable queue) list of transactions
func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Transaction) {
284 285 286 287 288
	// init delayed since tx pool could have been started before any state sync
	if pool.pendingState == nil {
		pool.resetState()
	}

289 290 291 292 293
	if _, ok := pool.pending[hash]; !ok {
		pool.pending[hash] = tx

		// Increment the nonce on the pending state. This can only happen if
		// the nonce is +1 to the previous one.
294
		pool.pendingState.SetNonce(addr, tx.Nonce()+1)
295 296 297 298 299 300 301
		// Notify the subscribers. This event is posted in a goroutine
		// because it's possible that somewhere during the post "Remove transaction"
		// gets called which will then wait for the global tx pool lock and deadlock.
		go pool.eventMux.Post(TxPreEvent{tx})
	}
}

302
// Add queues a single transaction in the pool if it is valid.
303
func (self *TxPool) Add(tx *types.Transaction) error {
304 305
	self.mu.Lock()
	defer self.mu.Unlock()
306

307 308
	if err := self.add(tx); err != nil {
		return err
309
	}
310 311
	self.checkQueue()
	return nil
312
}
313

314
// AddTransactions attempts to queue all valid transactions in txs.
Z
zelig 已提交
315
func (self *TxPool) AddTransactions(txs []*types.Transaction) {
316 317 318
	self.mu.Lock()
	defer self.mu.Unlock()

Z
zelig 已提交
319
	for _, tx := range txs {
320
		if err := self.add(tx); err != nil {
321
			glog.V(logger.Debug).Infoln("tx error:", err)
Z
zelig 已提交
322
		} else {
323
			h := tx.Hash()
O
obscuren 已提交
324
			glog.V(logger.Debug).Infof("tx %x\n", h[:4])
Z
zelig 已提交
325 326
		}
	}
327 328 329

	// check and validate the queueue
	self.checkQueue()
Z
zelig 已提交
330 331
}

332 333
// GetTransaction returns a transaction if it is contained in the pool
// and nil otherwise.
334 335
func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
	// check the txs first
O
obscuren 已提交
336
	if tx, ok := tp.pending[hash]; ok {
337 338 339 340
		return tx
	}
	// check queue
	for _, txs := range tp.queue {
341 342
		if tx, ok := txs[hash]; ok {
			return tx
343 344 345 346 347
		}
	}
	return nil
}

348
// GetTransactions returns all currently processable transactions.
349
// The returned slice may be modified by the caller.
O
Merge  
obscuren 已提交
350
func (self *TxPool) GetTransactions() (txs types.Transactions) {
351 352 353 354 355 356 357
	self.mu.Lock()
	defer self.mu.Unlock()

	// check queue first
	self.checkQueue()
	// invalidate any txs
	self.validatePool()
358

O
obscuren 已提交
359
	txs = make(types.Transactions, len(self.pending))
O
obscuren 已提交
360
	i := 0
O
obscuren 已提交
361
	for _, tx := range self.pending {
O
Merge  
obscuren 已提交
362
		txs[i] = tx
O
obscuren 已提交
363
		i++
O
Merge  
obscuren 已提交
364
	}
365
	return txs
366 367
}

368
// GetQueuedTransactions returns all non-processable transactions.
369 370 371 372
func (self *TxPool) GetQueuedTransactions() types.Transactions {
	self.mu.RLock()
	defer self.mu.RUnlock()

373 374 375 376 377
	var ret types.Transactions
	for _, txs := range self.queue {
		for _, tx := range txs {
			ret = append(ret, tx)
		}
378
	}
379 380
	sort.Sort(types.TxByNonce{ret})
	return ret
381 382
}

383
// RemoveTransactions removes all given transactions from the pool.
384
func (self *TxPool) RemoveTransactions(txs types.Transactions) {
385 386
	self.mu.Lock()
	defer self.mu.Unlock()
387
	for _, tx := range txs {
388
		self.RemoveTx(tx.Hash())
389 390 391
	}
}

392 393
// RemoveTx removes the transaction with the given hash from the pool.
func (pool *TxPool) RemoveTx(hash common.Hash) {
394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
	// delete from pending pool
	delete(pool.pending, hash)
	// delete from queue
	for address, txs := range pool.queue {
		if _, ok := txs[hash]; ok {
			if len(txs) == 1 {
				// if only one tx, remove entire address entry.
				delete(pool.queue, address)
			} else {
				delete(txs, hash)
			}
			break
		}
	}
}

410
// checkQueue moves transactions that have become processable to main pool.
411
func (pool *TxPool) checkQueue() {
412 413 414 415
	// init delayed since tx pool could have been started before any state sync
	if pool.pendingState == nil {
		pool.resetState()
	}
416

417
	var promote txQueue
418
	for address, txs := range pool.queue {
419 420 421 422 423
		currentState, err := pool.currentState()
		if err != nil {
			glog.Errorf("could not get current state: %v", err)
			return
		}
424 425 426 427 428 429 430
		balance := currentState.GetBalance(address)

		var (
			guessedNonce = pool.pendingState.GetNonce(address) // nonce currently kept by the tx pool (pending state)
			trueNonce    = currentState.GetNonce(address)      // nonce known by the last state
		)
		promote = promote[:0]
431
		for hash, tx := range txs {
432 433 434 435 436
			// Drop processed or out of fund transactions
			if tx.Nonce() < trueNonce || balance.Cmp(tx.Cost()) < 0 {
				if glog.V(logger.Core) {
					glog.Infof("removed tx (%v) from pool queue: low tx nonce or out of funds\n", tx)
				}
437
				delete(txs, hash)
438 439
				continue
			}
440 441 442 443 444 445 446 447 448 449
			// Collect the remaining transactions for the next pass.
			promote = append(promote, txQueueEntry{hash, address, tx})
		}
		// Find the next consecutive nonce range starting at the current account nonce,
		// pushing the guessed nonce forward if we add consecutive transactions.
		sort.Sort(promote)
		for i, entry := range promote {
			// If we reached a gap in the nonces, enforce transaction limit and stop
			if entry.Nonce() > guessedNonce {
				if len(promote)-i > maxQueued {
450
					if glog.V(logger.Debug) {
451
						glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(entry.hash[:]))
452
					}
453 454
					for _, drop := range promote[i+maxQueued:] {
						delete(txs, drop.hash)
455 456
					}
				}
457 458
				break
			}
459 460 461 462 463 464 465
			// Otherwise promote the transaction and move the guess nonce if needed
			pool.addTx(entry.hash, address, entry.Transaction)
			delete(txs, entry.hash)

			if entry.Nonce() == guessedNonce {
				guessedNonce++
			}
466
		}
467 468
		// Delete the entire queue entry if it became empty.
		if len(txs) == 0 {
469 470 471 472
			delete(pool.queue, address)
		}
	}
}
473

474
// validatePool removes invalid and processed transactions from the main pool.
475 476 477 478
// If a transaction is removed for being invalid (e.g. out of funds), all sub-
// sequent (Still valid) transactions are moved back into the future queue. This
// is important to prevent a drained account from DOSing the network with non
// executable transactions.
479
func (pool *TxPool) validatePool() {
480 481 482 483 484
	state, err := pool.currentState()
	if err != nil {
		glog.V(logger.Info).Infoln("failed to get current state: %v", err)
		return
	}
485 486 487 488 489
	balanceCache := make(map[common.Address]*big.Int)

	// Clean up the pending pool, accumulating invalid nonces
	gaps := make(map[common.Address]uint64)

O
obscuren 已提交
490
	for hash, tx := range pool.pending {
491 492 493 494 495 496 497 498 499 500
		sender, _ := tx.From() // err already checked

		// Perform light nonce and balance validation
		balance := balanceCache[sender]
		if balance == nil {
			balance = state.GetBalance(sender)
			balanceCache[sender] = balance
		}
		if past := state.GetNonce(sender) > tx.Nonce(); past || balance.Cmp(tx.Cost()) < 0 {
			// Remove an already past it invalidated transaction
O
obscuren 已提交
501
			if glog.V(logger.Core) {
502
				glog.Infof("removed tx (%v) from pool: low tx nonce or out of funds\n", tx)
503
			}
O
obscuren 已提交
504
			delete(pool.pending, hash)
505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524

			// Track the smallest invalid nonce to postpone subsequent transactions
			if !past {
				if prev, ok := gaps[sender]; !ok || tx.Nonce() < prev {
					gaps[sender] = tx.Nonce()
				}
			}
		}
	}
	// Move all transactions after a gap back to the future queue
	if len(gaps) > 0 {
		for hash, tx := range pool.pending {
			sender, _ := tx.From()
			if gap, ok := gaps[sender]; ok && tx.Nonce() >= gap {
				if glog.V(logger.Core) {
					glog.Infof("postponed tx (%v) due to introduced gap\n", tx)
				}
				pool.queueTx(hash, tx)
				delete(pool.pending, hash)
			}
525 526 527
		}
	}
}
528 529 530 531 532

type txQueue []txQueueEntry

type txQueueEntry struct {
	hash common.Hash
533
	addr common.Address
534 535 536 537 538
	*types.Transaction
}

func (q txQueue) Len() int           { return len(q) }
func (q txQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
539
func (q txQueue) Less(i, j int) bool { return q[i].Nonce() < q[j].Nonce() }
540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585

// txSet represents a set of transaction hashes in which entries are
// automatically dropped once they are older than txSetDuration. Expiry only
// happens as a side effect of add.
type txSet struct {
	// txMap holds the hashes currently in the set.
	txMap map[common.Hash]struct{}

	// txOrd records the insertions in order, keyed by a running counter.
	txOrd map[uint64]txOrdType

	// addPtr is the txOrd key the next insertion will use.
	addPtr uint64

	// delPtr is the txOrd key of the oldest insertion not yet expired.
	delPtr uint64
}

// txSetDuration is how long a hash stays in a txSet before a later add is
// allowed to expire it.
const txSetDuration = 2 * time.Hour

// txOrdType represents an entry in the time-ordered list of transaction
// hashes kept by txSet.
type txOrdType struct {
	// hash is the transaction hash that was inserted.
	hash common.Hash

	// time is when the hash was inserted.
	time time.Time
}

// newTxSet creates a new, empty transaction set.
func newTxSet() *txSet {
	set := new(txSet)
	set.txMap = make(map[common.Hash]struct{})
	set.txOrd = make(map[uint64]txOrdType)
	return set
}

// contains returns true if the set contains the given transaction hash
// (not thread safe, should be called from a locked environment)
func (s *txSet) contains(hash common.Hash) bool {
	_, present := s.txMap[hash]
	return present
}

// add adds a transaction hash to the set, then removes entries older than txSetDuration
// (not thread safe, should be called from a locked environment)
func (s *txSet) add(hash common.Hash) {
	now := time.Now()

	// Record the hash and its insertion time under the next counter value.
	s.txMap[hash] = struct{}{}
	s.txOrd[s.addPtr] = txOrdType{hash: hash, time: now}
	s.addPtr++

	// Expire from the oldest insertion onwards, stopping at the first entry
	// that is still within the retention window.
	cutoff := now.Add(-txSetDuration)
	for ; s.delPtr < s.addPtr; s.delPtr++ {
		oldest := s.txOrd[s.delPtr]
		if !oldest.time.Before(cutoff) {
			break
		}
		delete(s.txMap, oldest.hash)
		delete(s.txOrd, s.delPtr)
	}
}