tx_pool.go 23.7 KB
Newer Older
F
Felix Lange 已提交
1
// Copyright 2014 The go-ethereum Authors
2
// This file is part of the go-ethereum library.
F
Felix Lange 已提交
3
//
4
// The go-ethereum library is free software: you can redistribute it and/or modify
F
Felix Lange 已提交
5 6 7 8
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
9
// The go-ethereum library is distributed in the hope that it will be useful,
F
Felix Lange 已提交
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
F
Felix Lange 已提交
12 13 14
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
15
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
F
Felix Lange 已提交
16

O
obscuren 已提交
17
package core
O
obscuren 已提交
18 19

import (
20
	"errors"
O
obscuren 已提交
21
	"fmt"
O
obscuren 已提交
22
	"math/big"
23
	"sort"
24
	"sync"
25
	"time"
26

O
obscuren 已提交
27
	"github.com/ethereum/go-ethereum/common"
O
obscuren 已提交
28
	"github.com/ethereum/go-ethereum/core/state"
29
	"github.com/ethereum/go-ethereum/core/types"
O
obscuren 已提交
30
	"github.com/ethereum/go-ethereum/event"
31
	"github.com/ethereum/go-ethereum/log"
32
	"github.com/ethereum/go-ethereum/metrics"
33
	"github.com/ethereum/go-ethereum/params"
34
	"gopkg.in/karalabe/cookiejar.v2/collections/prque"
O
obscuren 已提交
35 36
)

37
var (
	// Transaction pool errors.

	// ErrInvalidSender is returned by validateTx when the sender cannot be
	// derived from the transaction with the pool's signer.
	ErrInvalidSender = errors.New("Invalid sender")
	// ErrNonce is returned when the account nonce in the current state is
	// already higher than the transaction nonce.
	ErrNonce = errors.New("Nonce too low")
	// ErrCheap is returned for non-local transactions priced below the
	// pool's minimum gas price.
	ErrCheap = errors.New("Gas price too low for acceptance")
	// ErrBalance signals an insufficient balance.
	ErrBalance = errors.New("Insufficient balance")
	// ErrInsufficientFunds is returned when the sender balance is lower than
	// the total transaction cost.
	ErrInsufficientFunds = errors.New("Insufficient funds for gas * price + value")
	// ErrIntrinsicGas is returned when the transaction gas is below the
	// computed intrinsic gas.
	ErrIntrinsicGas = errors.New("Intrinsic gas too low")
	// ErrGasLimit is returned when the transaction gas exceeds the value
	// reported by the pool's gas limit callback.
	ErrGasLimit = errors.New("Exceeds block gas limit")
	// ErrNegativeValue is returned when the transferred value is negative.
	ErrNegativeValue = errors.New("Negative value")
)
Z
zelig 已提交
48

49
// Capacity and lifetime limits applied by the pool when promoting queued
// transactions and when evicting idle accounts.
var (
	minPendingPerAccount = uint64(16)    // Min number of guaranteed transaction slots per address
	maxPendingTotal      = uint64(4096)  // Max limit of pending transactions from all accounts (soft)
	maxQueuedPerAccount  = uint64(64)    // Max limit of queued transactions per address
	maxQueuedInTotal     = uint64(1024)  // Max limit of queued transactions from all accounts
	maxQueuedLifetime    = 3 * time.Hour // Max amount of time transactions from idle accounts are queued
	evictionInterval     = time.Minute   // Time interval to check for evictable transactions
)

58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
// Counters tracking transactions leaving (or being replaced in) the pending set.
var (
	pendingDiscardCounter = metrics.NewCounter("txpool/pending/discard")
	pendingReplaceCounter = metrics.NewCounter("txpool/pending/replace")
	pendingRLCounter      = metrics.NewCounter("txpool/pending/ratelimit") // Dropped due to rate limiting
	pendingNofundsCounter = metrics.NewCounter("txpool/pending/nofunds")   // Dropped due to out-of-funds
)

// Counters tracking transactions leaving (or being replaced in) the future queue.
var (
	queuedDiscardCounter = metrics.NewCounter("txpool/queued/discard")
	queuedReplaceCounter = metrics.NewCounter("txpool/queued/replace")
	queuedRLCounter      = metrics.NewCounter("txpool/queued/ratelimit") // Dropped due to rate limiting
	queuedNofundsCounter = metrics.NewCounter("txpool/queued/nofunds")   // Dropped due to out-of-funds
)

// invalidTxCounter counts transactions rejected by validation in add.
var invalidTxCounter = metrics.NewCounter("txpool/invalid")

75
// stateFn is a callback returning a state database (or an error). The pool
// stores one as currentState and calls it whenever it needs chain state to
// validate or reorganise its transactions.
type stateFn func() (*state.StateDB, error)
76

77 78 79 80 81 82 83
// TxPool contains all currently known transactions. Transactions
// enter the pool when they are received from the network or submitted
// locally. They exit the pool when they are included in the blockchain.
//
// The pool separates processable transactions (which can be applied to the
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
O
obscuren 已提交
84
type TxPool struct {
85
	config       *params.ChainConfig
86
	currentState stateFn // The state function which will allow us to do some pre checks
87
	pendingState *state.ManagedState
88
	gasLimit     func() *big.Int // The current gas limit function callback
89
	minGasPrice  *big.Int
90
	eventMux     *event.TypeMux
91
	events       *event.TypeMuxSubscription
92
	localTx      *txSet
J
Jeffrey Wilcke 已提交
93
	signer       types.Signer
94
	mu           sync.RWMutex
95

96 97
	pending map[common.Address]*txList         // All currently processable transactions
	queue   map[common.Address]*txList         // Queued but non-processable transactions
98
	all     map[common.Hash]*types.Transaction // All transactions to allow lookups
99
	beats   map[common.Address]time.Time       // Last heartbeat from each known account
100

101 102
	wg   sync.WaitGroup // for shutdown sync
	quit chan struct{}
103

104
	homestead bool
O
obscuren 已提交
105 106
}

107
func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
108
	pool := &TxPool{
109
		config:       config,
J
Jeffrey Wilcke 已提交
110
		signer:       types.NewEIP155Signer(config.ChainId),
111 112
		pending:      make(map[common.Address]*txList),
		queue:        make(map[common.Address]*txList),
113
		all:          make(map[common.Hash]*types.Transaction),
114
		beats:        make(map[common.Address]time.Time),
115 116 117
		eventMux:     eventMux,
		currentState: currentStateFn,
		gasLimit:     gasLimitFn,
118
		minGasPrice:  new(big.Int),
119
		pendingState: nil,
120
		localTx:      newTxSet(),
121
		events:       eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}),
122
		quit:         make(chan struct{}),
O
obscuren 已提交
123
	}
124

125 126
	pool.resetState()

127
	pool.wg.Add(2)
128
	go pool.eventLoop()
129
	go pool.expirationLoop()
130 131

	return pool
132 133
}

134
func (pool *TxPool) eventLoop() {
135 136
	defer pool.wg.Done()

O
obscuren 已提交
137 138 139
	// Track chain events. When a chain events occurs (new chain canon block)
	// we need to know the new state. The new state will help us determine
	// the nonces in the managed state
140
	for ev := range pool.events.Chan() {
141
		switch ev := ev.Data.(type) {
142
		case ChainHeadEvent:
143
			pool.mu.Lock()
J
Jeffrey Wilcke 已提交
144 145 146 147
			if ev.Block != nil {
				if pool.config.IsHomestead(ev.Block.Number()) {
					pool.homestead = true
				}
148 149
			}

150
			pool.resetState()
151
			pool.mu.Unlock()
152
		case GasPriceChanged:
153
			pool.mu.Lock()
154
			pool.minGasPrice = ev.Price
155 156
			pool.mu.Unlock()
		case RemovedTransactionEvent:
157
			pool.AddBatch(ev.Txs)
158
		}
159
	}
O
obscuren 已提交
160 161
}

162
func (pool *TxPool) resetState() {
163 164
	currentState, err := pool.currentState()
	if err != nil {
165
		log.Error(fmt.Sprintf("Failed to get current state: %v", err))
166 167 168 169
		return
	}
	managedState := state.ManageState(currentState)
	if err != nil {
170
		log.Error(fmt.Sprintf("Failed to get managed state: %v", err))
171 172 173
		return
	}
	pool.pendingState = managedState
174 175 176 177 178

	// validate the pool of pending transactions, this will remove
	// any transactions that have been included in the block or
	// have been invalidated because of another transaction (e.g.
	// higher gas price)
179
	pool.demoteUnexecutables(currentState)
180 181 182

	// Update all accounts to the latest known pending nonce
	for addr, list := range pool.pending {
183 184
		txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
		pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1)
185 186 187
	}
	// Check the queue and move transactions over to the pending if possible
	// or remove those that have become invalid
188
	pool.promoteExecutables(currentState)
189 190
}

191 192
func (pool *TxPool) Stop() {
	pool.events.Unsubscribe()
193
	close(pool.quit)
194
	pool.wg.Wait()
195
	log.Info(fmt.Sprint("Transaction pool stopped"))
196 197 198
}

func (pool *TxPool) State() *state.ManagedState {
199 200
	pool.mu.RLock()
	defer pool.mu.RUnlock()
201

202
	return pool.pendingState
203 204
}

205 206
// Stats retrieves the current pool stats, namely the number of pending and the
// number of queued (non-executable) transactions.
207 208 209 210
func (pool *TxPool) Stats() (pending int, queued int) {
	pool.mu.RLock()
	defer pool.mu.RUnlock()

211 212
	for _, list := range pool.pending {
		pending += list.Len()
213
	}
214 215
	for _, list := range pool.queue {
		queued += list.Len()
216 217 218 219
	}
	return
}

220
// Content retrieves the data content of the transaction pool, returning all the
221 222
// pending as well as queued transactions, grouped by account and sorted by nonce.
func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
223 224 225
	pool.mu.RLock()
	defer pool.mu.RUnlock()

226 227 228 229 230 231 232
	pending := make(map[common.Address]types.Transactions)
	for addr, list := range pool.pending {
		pending[addr] = list.Flatten()
	}
	queued := make(map[common.Address]types.Transactions)
	for addr, list := range pool.queue {
		queued[addr] = list.Flatten()
233 234 235 236
	}
	return pending, queued
}

237 238 239
// Pending retrieves all currently processable transactions, groupped by origin
// account and sorted by nonce. The returned transaction set is a copy and can be
// freely modified by calling code.
240
func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) {
241 242 243
	pool.mu.Lock()
	defer pool.mu.Unlock()

244 245 246 247 248
	state, err := pool.currentState()
	if err != nil {
		return nil, err
	}

249
	// check queue first
250
	pool.promoteExecutables(state)
251 252

	// invalidate any txs
253
	pool.demoteUnexecutables(state)
254 255 256 257 258

	pending := make(map[common.Address]types.Transactions)
	for addr, list := range pool.pending {
		pending[addr] = list.Flatten()
	}
259
	return pending, nil
260 261
}

262 263 264 265 266 267 268 269
// SetLocal records the hash of tx in the pool's local set. validateTx skips
// the minimum gas price check for transactions whose hash is in that set.
func (pool *TxPool) SetLocal(tx *types.Transaction) {
	pool.mu.Lock()
	defer pool.mu.Unlock()

	hash := tx.Hash()
	pool.localTx.add(hash)
}

270 271 272
// validateTx checks whether a transaction is valid according
// to the consensus rules.
func (pool *TxPool) validateTx(tx *types.Transaction) error {
273
	local := pool.localTx.contains(tx.Hash())
274
	// Drop transactions under our own minimal accepted gas price
275
	if !local && pool.minGasPrice.Cmp(tx.GasPrice()) > 0 {
276 277 278
		return ErrCheap
	}

279 280 281 282 283
	currentState, err := pool.currentState()
	if err != nil {
		return err
	}

J
Jeffrey Wilcke 已提交
284
	from, err := types.Sender(pool.signer, tx)
285
	if err != nil {
F
Felix Lange 已提交
286
		return ErrInvalidSender
O
obscuren 已提交
287
	}
288
	// Last but not least check for nonce errors
289
	if currentState.GetNonce(from) > tx.Nonce() {
290 291 292
		return ErrNonce
	}

O
obscuren 已提交
293 294
	// Check the transaction doesn't exceed the current
	// block limit gas.
295
	if pool.gasLimit().Cmp(tx.Gas()) < 0 {
296 297 298
		return ErrGasLimit
	}

O
obscuren 已提交
299 300 301
	// Transactions can't be negative. This may never happen
	// using RLP decoded transactions but may occur if you create
	// a transaction using the RPC for example.
302
	if tx.Value().Cmp(common.Big0) < 0 {
303 304 305
		return ErrNegativeValue
	}

O
obscuren 已提交
306 307
	// Transactor should have enough funds to cover the costs
	// cost == V + GP * GL
308
	if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
O
obscuren 已提交
309 310 311
		return ErrInsufficientFunds
	}

J
Jeffrey Wilcke 已提交
312
	intrGas := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
313
	if tx.Gas().Cmp(intrGas) < 0 {
O
obscuren 已提交
314 315 316 317
		return ErrIntrinsicGas
	}

	return nil
O
obscuren 已提交
318 319
}

320 321 322
// add validates a transaction and inserts it into the non-executable queue for
// later pending promotion and execution.
func (pool *TxPool) add(tx *types.Transaction) error {
P
Péter Szilágyi 已提交
323
	// If the transaction is already known, discard it
324
	hash := tx.Hash()
325 326
	if pool.all[hash] != nil {
		return fmt.Errorf("Known transaction: %x", hash[:4])
327
	}
328
	// Otherwise ensure basic validation passes and queue it up
329
	if err := pool.validateTx(tx); err != nil {
330
		invalidTxCounter.Inc(1)
331 332
		return err
	}
333
	pool.enqueueTx(hash, tx)
O
obscuren 已提交
334

335
	// Print a log message if low enough level is set
336
	log.Debug("", "msg", log.Lazy{Fn: func() string {
337
		rcpt := "[NEW_CONTRACT]"
338
		if to := tx.To(); to != nil {
339
			rcpt = common.Bytes2Hex(to[:4])
340
		}
J
Jeffrey Wilcke 已提交
341
		from, _ := types.Sender(pool.signer, tx) // from already verified during tx validation
342 343
		return fmt.Sprintf("(t) 0x%x => %s (%v) %x\n", from[:4], rcpt, tx.Value(), hash)
	}})
344 345 346
	return nil
}

347
// enqueueTx inserts a new transaction into the non-executable transaction queue.
348 349 350 351
//
// Note, this method assumes the pool lock is held!
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) {
	// Try to insert the transaction into the future queue
J
Jeffrey Wilcke 已提交
352
	from, _ := types.Sender(pool.signer, tx) // already validated
353 354
	if pool.queue[from] == nil {
		pool.queue[from] = newTxList(false)
355
	}
356 357
	inserted, old := pool.queue[from].Add(tx)
	if !inserted {
358
		queuedDiscardCounter.Inc(1)
359 360 361 362 363
		return // An older transaction was better, discard this
	}
	// Discard any previous transaction and mark this
	if old != nil {
		delete(pool.all, old.Hash())
364
		queuedReplaceCounter.Inc(1)
365
	}
366
	pool.all[hash] = tx
367 368
}

369 370 371 372 373 374 375 376 377
// promoteTx adds a transaction to the pending (processable) list of transactions.
//
// Note, this method assumes the pool lock is held!
func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) {
	// Try to insert the transaction into the pending queue
	if pool.pending[addr] == nil {
		pool.pending[addr] = newTxList(true)
	}
	list := pool.pending[addr]
378

379 380 381 382
	inserted, old := list.Add(tx)
	if !inserted {
		// An older transaction was better, discard this
		delete(pool.all, hash)
383
		pendingDiscardCounter.Inc(1)
384
		return
385
	}
386 387 388
	// Otherwise discard any previous transaction and mark this
	if old != nil {
		delete(pool.all, old.Hash())
389
		pendingReplaceCounter.Inc(1)
390
	}
391
	pool.all[hash] = tx // Failsafe to work around direct pending inserts (tests)
392

393
	// Set the potentially new pending nonce and notify any subsystems of the new tx
394
	pool.beats[addr] = time.Now()
395
	pool.pendingState.SetNonce(addr, tx.Nonce()+1)
396
	go pool.eventMux.Post(TxPreEvent{tx})
397 398
}

399
// Add queues a single transaction in the pool if it is valid.
400 401 402
func (pool *TxPool) Add(tx *types.Transaction) error {
	pool.mu.Lock()
	defer pool.mu.Unlock()
403

404
	if err := pool.add(tx); err != nil {
405
		return err
406
	}
407 408 409 410 411 412 413

	state, err := pool.currentState()
	if err != nil {
		return err
	}

	pool.promoteExecutables(state)
414

415
	return nil
416
}
417

418
// AddBatch attempts to queue a batch of transactions.
419
func (pool *TxPool) AddBatch(txs []*types.Transaction) error {
420 421
	pool.mu.Lock()
	defer pool.mu.Unlock()
422

Z
zelig 已提交
423
	for _, tx := range txs {
424
		if err := pool.add(tx); err != nil {
425
			log.Debug(fmt.Sprint("tx error:", err))
Z
zelig 已提交
426 427
		}
	}
428 429 430 431 432 433 434 435 436

	state, err := pool.currentState()
	if err != nil {
		return err
	}

	pool.promoteExecutables(state)

	return nil
Z
zelig 已提交
437 438
}

439
// Get returns a transaction if it is contained in the pool
440
// and nil otherwise.
441 442 443
func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
	pool.mu.RLock()
	defer pool.mu.RUnlock()
444

445
	return pool.all[hash]
446 447
}

448 449 450 451
// Remove removes the transaction with the given hash from the pool.
func (pool *TxPool) Remove(hash common.Hash) {
	pool.mu.Lock()
	defer pool.mu.Unlock()
452

453
	pool.removeTx(hash)
454 455
}

456 457
// RemoveBatch removes all given transactions from the pool.
func (pool *TxPool) RemoveBatch(txs types.Transactions) {
458 459
	pool.mu.Lock()
	defer pool.mu.Unlock()
460

461 462 463
	for _, tx := range txs {
		pool.removeTx(tx.Hash())
	}
464 465
}

466 467
// removeTx removes a single transaction from the queue, moving all subsequent
// transactions back to the future queue.
468
func (pool *TxPool) removeTx(hash common.Hash) {
469 470 471 472 473
	// Fetch the transaction we wish to delete
	tx, ok := pool.all[hash]
	if !ok {
		return
	}
J
Jeffrey Wilcke 已提交
474
	addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
475

476
	// Remove it from the list of known transactions
477 478
	delete(pool.all, hash)

479 480 481
	// Remove the transaction from the pending lists and reset the account nonce
	if pending := pool.pending[addr]; pending != nil {
		if removed, invalids := pending.Remove(tx); removed {
482
			// If no more transactions are left, remove the list
483 484
			if pending.Empty() {
				delete(pool.pending, addr)
485
				delete(pool.beats, addr)
486
			} else {
487
				// Otherwise postpone any invalidated transactions
488 489 490 491
				for _, tx := range invalids {
					pool.enqueueTx(tx.Hash(), tx)
				}
			}
492 493 494 495
			// Update the account nonce if needed
			if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
				pool.pendingState.SetNonce(addr, tx.Nonce())
			}
496
		}
497
	}
498 499 500 501 502 503
	// Transaction is in the future queue
	if future := pool.queue[addr]; future != nil {
		future.Remove(tx)
		if future.Empty() {
			delete(pool.queue, addr)
		}
504 505 506
	}
}

507 508 509
// promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
510
func (pool *TxPool) promoteExecutables(state *state.StateDB) {
511
	// Iterate over all accounts and promote any executable transactions
512
	queued := uint64(0)
513 514 515
	for addr, list := range pool.queue {
		// Drop all transactions that are deemed too old (low nonce)
		for _, tx := range list.Forward(state.GetNonce(addr)) {
516 517 518
			log.Debug("", "msg", log.Lazy{Fn: func() string {
				return fmt.Sprintf("Removed old queued transaction: %v", tx)
			}})
519
			delete(pool.all, tx.Hash())
520
		}
521 522 523
		// Drop all transactions that are too costly (low balance)
		drops, _ := list.Filter(state.GetBalance(addr))
		for _, tx := range drops {
524 525 526
			log.Debug("", "msg", log.Lazy{Fn: func() string {
				return fmt.Sprintf("Removed unpayable queued transaction: %v", tx)
			}})
527
			delete(pool.all, tx.Hash())
528
			queuedNofundsCounter.Inc(1)
529
		}
530 531
		// Gather all executable transactions and promote them
		for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
532 533 534
			log.Debug("", "msg", log.Lazy{Fn: func() string {
				return fmt.Sprintf("Promoting queued transaction: %v", tx)
			}})
535 536 537
			pool.promoteTx(addr, tx.Hash(), tx)
		}
		// Drop all transactions over the allowed limit
538
		for _, tx := range list.Cap(int(maxQueuedPerAccount)) {
539 540 541
			log.Debug("", "msg", log.Lazy{Fn: func() string {
				return fmt.Sprintf("Removed cap-exceeding queued transaction: %v", tx)
			}})
542
			delete(pool.all, tx.Hash())
543
			queuedRLCounter.Inc(1)
544
		}
545 546
		queued += uint64(list.Len())

547
		// Delete the entire queue entry if it became empty.
548 549
		if list.Empty() {
			delete(pool.queue, addr)
550 551
		}
	}
552 553 554 555 556 557
	// If the pending limit is overflown, start equalizing allowances
	pending := uint64(0)
	for _, list := range pool.pending {
		pending += uint64(list.Len())
	}
	if pending > maxPendingTotal {
558
		pendingBeforeCap := pending
559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604
		// Assemble a spam order to penalize large transactors first
		spammers := prque.New()
		for addr, list := range pool.pending {
			// Only evict transactions from high rollers
			if uint64(list.Len()) > minPendingPerAccount {
				// Skip local accounts as pools should maintain backlogs for themselves
				for _, tx := range list.txs.items {
					if !pool.localTx.contains(tx.Hash()) {
						spammers.Push(addr, float32(list.Len()))
					}
					break // Checking on transaction for locality is enough
				}
			}
		}
		// Gradually drop transactions from offenders
		offenders := []common.Address{}
		for pending > maxPendingTotal && !spammers.Empty() {
			// Retrieve the next offender if not local address
			offender, _ := spammers.Pop()
			offenders = append(offenders, offender.(common.Address))

			// Equalize balances until all the same or below threshold
			if len(offenders) > 1 {
				// Calculate the equalization threshold for all current offenders
				threshold := pool.pending[offender.(common.Address)].Len()

				// Iteratively reduce all offenders until below limit or threshold reached
				for pending > maxPendingTotal && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
					for i := 0; i < len(offenders)-1; i++ {
						list := pool.pending[offenders[i]]
						list.Cap(list.Len() - 1)
						pending--
					}
				}
			}
		}
		// If still above threshold, reduce to limit or min allowance
		if pending > maxPendingTotal && len(offenders) > 0 {
			for pending > maxPendingTotal && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > minPendingPerAccount {
				for _, addr := range offenders {
					list := pool.pending[addr]
					list.Cap(list.Len() - 1)
					pending--
				}
			}
		}
605
		pendingRLCounter.Inc(int64(pendingBeforeCap - pending))
606
	}
607 608 609 610
	// If we've queued more transactions than the hard limit, drop oldest ones
	if queued > maxQueuedInTotal {
		// Sort all accounts with queued transactions by heartbeat
		addresses := make(addresssByHeartbeat, 0, len(pool.queue))
F
Felix Lange 已提交
611
		for addr := range pool.queue {
612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628
			addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
		}
		sort.Sort(addresses)

		// Drop transactions until the total is below the limit
		for drop := queued - maxQueuedInTotal; drop > 0; {
			addr := addresses[len(addresses)-1]
			list := pool.queue[addr.address]

			addresses = addresses[:len(addresses)-1]

			// Drop all transactions if they are less than the overflow
			if size := uint64(list.Len()); size <= drop {
				for _, tx := range list.Flatten() {
					pool.removeTx(tx.Hash())
				}
				drop -= size
629
				queuedRLCounter.Inc(int64(size))
630 631 632 633 634 635 636
				continue
			}
			// Otherwise drop only last few transactions
			txs := list.Flatten()
			for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
				pool.removeTx(txs[i].Hash())
				drop--
637
				queuedRLCounter.Inc(1)
638 639 640
			}
		}
	}
641
}
642

643 644 645
// demoteUnexecutables removes invalid and processed transactions from the pools
// executable/pending queue and any subsequent transactions that become unexecutable
// are moved back into the future queue.
646
func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
647 648 649
	// Iterate over all accounts and demote any non-executable transactions
	for addr, list := range pool.pending {
		nonce := state.GetNonce(addr)
650

651 652
		// Drop all transactions that are deemed too old (low nonce)
		for _, tx := range list.Forward(nonce) {
653 654 655
			log.Debug("", "msg", log.Lazy{Fn: func() string {
				return fmt.Sprintf("Removed old pending transaction: %v", tx)
			}})
656 657 658 659 660
			delete(pool.all, tx.Hash())
		}
		// Drop all transactions that are too costly (low balance), and queue any invalids back for later
		drops, invalids := list.Filter(state.GetBalance(addr))
		for _, tx := range drops {
661 662 663
			log.Debug("", "msg", log.Lazy{Fn: func() string {
				return fmt.Sprintf("Removed unpayable pending transaction: %v", tx)
			}})
664
			delete(pool.all, tx.Hash())
665
			pendingNofundsCounter.Inc(1)
666
		}
667
		for _, tx := range invalids {
668 669 670
			log.Debug("", "msg", log.Lazy{Fn: func() string {
				return fmt.Sprintf("Demoting pending transaction: %v", tx)
			}})
671 672 673 674 675
			pool.enqueueTx(tx.Hash(), tx)
		}
		// Delete the entire queue entry if it became empty.
		if list.Empty() {
			delete(pool.pending, addr)
676
			delete(pool.beats, addr)
677 678 679
		}
	}
}
680

681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720
// expirationLoop wakes up every evictionInterval and, under the pool lock,
// removes every queued transaction of accounts whose last heartbeat is older
// than maxQueuedLifetime. It returns once the quit channel is closed.
func (pool *TxPool) expirationLoop() {
	defer pool.wg.Done()

	ticker := time.NewTicker(evictionInterval)
	defer ticker.Stop()

	for {
		select {
		case <-pool.quit:
			return

		case <-ticker.C:
			pool.mu.Lock()
			for addr, list := range pool.queue {
				if time.Since(pool.beats[addr]) <= maxQueuedLifetime {
					continue // Account was active recently enough
				}
				for _, stale := range list.Flatten() {
					pool.removeTx(stale.Hash())
				}
			}
			pool.mu.Unlock()
		}
	}
}

// addressByHeartbeat pairs an account address with the time of its last
// recorded activity.
type addressByHeartbeat struct {
	address   common.Address
	heartbeat time.Time
}

// addresssByHeartbeat is a slice of addressByHeartbeat implementing
// sort.Interface, ordering entries from earliest to latest heartbeat.
type addresssByHeartbeat []addressByHeartbeat

// Len reports the number of entries.
func (list addresssByHeartbeat) Len() int {
	return len(list)
}

// Less reports whether entry i has an earlier heartbeat than entry j.
func (list addresssByHeartbeat) Less(i, j int) bool {
	return list[j].heartbeat.After(list[i].heartbeat)
}

// Swap exchanges entries i and j.
func (list addresssByHeartbeat) Swap(i, j int) {
	tmp := list[i]
	list[i] = list[j]
	list[j] = tmp
}

721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765
// txSet is a set of transaction hashes with time-based expiry: every add
// also purges the entries that are older than txSetDuration.
type txSet struct {
	txMap  map[common.Hash]struct{} // membership lookup
	txOrd  map[uint64]txOrdType     // insertion-ordered entries, keyed by sequence number
	addPtr uint64                   // sequence number the next added entry receives
	delPtr uint64                   // sequence number of the oldest entry still tracked
}

// txSetDuration is the age after which an entry is purged by add.
const txSetDuration = time.Hour * 2

// txOrdType is one element of the insertion-ordered list kept by txSet: a
// transaction hash together with the time it was added.
type txOrdType struct {
	hash common.Hash
	time time.Time
}

// newTxSet returns an empty transaction set with both internal maps allocated.
func newTxSet() *txSet {
	set := new(txSet)
	set.txMap = make(map[common.Hash]struct{})
	set.txOrd = make(map[uint64]txOrdType)
	return set
}

// contains reports whether hash is currently a member of the set.
// It is not thread safe and must be called with external locking in place.
func (s *txSet) contains(hash common.Hash) bool {
	if _, found := s.txMap[hash]; found {
		return true
	}
	return false
}

// add inserts hash into the set and then walks the insertion-ordered entries
// from the oldest one, evicting each that was added more than txSetDuration
// ago and stopping at the first that is still fresh.
// It is not thread safe and must be called with external locking in place.
func (s *txSet) add(hash common.Hash) {
	now := time.Now()

	s.txMap[hash] = struct{}{}
	s.txOrd[s.addPtr] = txOrdType{hash: hash, time: now}
	s.addPtr++

	cutoff := now.Add(-txSetDuration)
	for s.delPtr < s.addPtr {
		oldest := s.txOrd[s.delPtr]
		if !oldest.time.Before(cutoff) {
			break
		}
		delete(s.txMap, oldest.hash)
		delete(s.txOrd, s.delPtr)
		s.delPtr++
	}
}