tx_pool.go 23.5 KB
Newer Older
F
Felix Lange 已提交
1
// Copyright 2014 The go-ethereum Authors
2
// This file is part of the go-ethereum library.
F
Felix Lange 已提交
3
//
4
// The go-ethereum library is free software: you can redistribute it and/or modify
F
Felix Lange 已提交
5 6 7 8
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
9
// The go-ethereum library is distributed in the hope that it will be useful,
F
Felix Lange 已提交
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
F
Felix Lange 已提交
12 13 14
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
15
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
F
Felix Lange 已提交
16

O
obscuren 已提交
17
package core
O
obscuren 已提交
18 19

import (
20
	"errors"
O
obscuren 已提交
21
	"fmt"
O
obscuren 已提交
22
	"math/big"
23
	"sort"
24
	"sync"
25
	"time"
26

O
obscuren 已提交
27
	"github.com/ethereum/go-ethereum/common"
O
obscuren 已提交
28
	"github.com/ethereum/go-ethereum/core/state"
29
	"github.com/ethereum/go-ethereum/core/types"
O
obscuren 已提交
30
	"github.com/ethereum/go-ethereum/event"
O
obscuren 已提交
31
	"github.com/ethereum/go-ethereum/logger"
O
obscuren 已提交
32
	"github.com/ethereum/go-ethereum/logger/glog"
33
	"github.com/ethereum/go-ethereum/metrics"
34
	"github.com/ethereum/go-ethereum/params"
35
	"gopkg.in/karalabe/cookiejar.v2/collections/prque"
O
obscuren 已提交
36 37
)

38
var (
39
	// Transaction Pool Errors
40 41 42 43 44 45 46 47
	ErrInvalidSender     = errors.New("Invalid sender")
	ErrNonce             = errors.New("Nonce too low")
	ErrCheap             = errors.New("Gas price too low for acceptance")
	ErrBalance           = errors.New("Insufficient balance")
	ErrInsufficientFunds = errors.New("Insufficient funds for gas * price + value")
	ErrIntrinsicGas      = errors.New("Intrinsic gas too low")
	ErrGasLimit          = errors.New("Exceeds block gas limit")
	ErrNegativeValue     = errors.New("Negative value")
48
)
Z
zelig 已提交
49

50
var (
51 52 53 54 55 56
	minPendingPerAccount = uint64(16)    // Min number of guaranteed transaction slots per address
	maxPendingTotal      = uint64(4096)  // Max limit of pending transactions from all accounts (soft)
	maxQueuedPerAccount  = uint64(64)    // Max limit of queued transactions per address
	maxQueuedInTotal     = uint64(1024)  // Max limit of queued transactions from all accounts
	maxQueuedLifetime    = 3 * time.Hour // Max amount of time transactions from idle accounts are queued
	evictionInterval     = time.Minute   // Time interval to check for evictable transactions
57 58
)

59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
var (
	// Metrics for the pending pool
	pendingDiscardCounter = metrics.NewCounter("txpool/pending/discard")
	pendingReplaceCounter = metrics.NewCounter("txpool/pending/replace")
	pendingRLCounter      = metrics.NewCounter("txpool/pending/ratelimit") // Dropped due to rate limiting
	pendingNofundsCounter = metrics.NewCounter("txpool/pending/nofunds")   // Dropped due to out-of-funds

	// Metrics for the queued pool
	queuedDiscardCounter = metrics.NewCounter("txpool/queued/discard")
	queuedReplaceCounter = metrics.NewCounter("txpool/queued/replace")
	queuedRLCounter      = metrics.NewCounter("txpool/queued/ratelimit") // Dropped due to rate limiting
	queuedNofundsCounter = metrics.NewCounter("txpool/queued/nofunds")   // Dropped due to out-of-funds

	// General tx metrics
	invalidTxCounter = metrics.NewCounter("txpool/invalid")
)

76
type stateFn func() (*state.StateDB, error)
77

78 79 80 81 82 83 84
// TxPool contains all currently known transactions. Transactions
// enter the pool when they are received from the network or submitted
// locally. They exit the pool when they are included in the blockchain.
//
// The pool separates processable transactions (which can be applied to the
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
O
obscuren 已提交
85
type TxPool struct {
86
	config       *params.ChainConfig
87
	currentState stateFn // The state function which will allow us to do some pre checks
88
	pendingState *state.ManagedState
89
	gasLimit     func() *big.Int // The current gas limit function callback
90
	minGasPrice  *big.Int
91
	eventMux     *event.TypeMux
92
	events       *event.TypeMuxSubscription
93
	localTx      *txSet
J
Jeffrey Wilcke 已提交
94
	signer       types.Signer
95
	mu           sync.RWMutex
96

97 98
	pending map[common.Address]*txList         // All currently processable transactions
	queue   map[common.Address]*txList         // Queued but non-processable transactions
99
	all     map[common.Hash]*types.Transaction // All transactions to allow lookups
100
	beats   map[common.Address]time.Time       // Last heartbeat from each known account
101

102 103
	wg   sync.WaitGroup // for shutdown sync
	quit chan struct{}
104

105
	homestead bool
O
obscuren 已提交
106 107
}

108
func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
109
	pool := &TxPool{
110
		config:       config,
J
Jeffrey Wilcke 已提交
111
		signer:       types.NewEIP155Signer(config.ChainId),
112 113
		pending:      make(map[common.Address]*txList),
		queue:        make(map[common.Address]*txList),
114
		all:          make(map[common.Hash]*types.Transaction),
115
		beats:        make(map[common.Address]time.Time),
116 117 118
		eventMux:     eventMux,
		currentState: currentStateFn,
		gasLimit:     gasLimitFn,
119
		minGasPrice:  new(big.Int),
120
		pendingState: nil,
121
		localTx:      newTxSet(),
122
		events:       eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}),
123
		quit:         make(chan struct{}),
O
obscuren 已提交
124
	}
125

126 127
	pool.resetState()

128
	pool.wg.Add(2)
129
	go pool.eventLoop()
130
	go pool.expirationLoop()
131 132

	return pool
133 134
}

135
func (pool *TxPool) eventLoop() {
136 137
	defer pool.wg.Done()

O
obscuren 已提交
138 139 140
	// Track chain events. When a chain events occurs (new chain canon block)
	// we need to know the new state. The new state will help us determine
	// the nonces in the managed state
141
	for ev := range pool.events.Chan() {
142
		switch ev := ev.Data.(type) {
143
		case ChainHeadEvent:
144
			pool.mu.Lock()
J
Jeffrey Wilcke 已提交
145 146 147 148
			if ev.Block != nil {
				if pool.config.IsHomestead(ev.Block.Number()) {
					pool.homestead = true
				}
149 150
			}

151
			pool.resetState()
152
			pool.mu.Unlock()
153
		case GasPriceChanged:
154
			pool.mu.Lock()
155
			pool.minGasPrice = ev.Price
156 157
			pool.mu.Unlock()
		case RemovedTransactionEvent:
158
			pool.AddBatch(ev.Txs)
159
		}
160
	}
O
obscuren 已提交
161 162
}

163
func (pool *TxPool) resetState() {
164 165
	currentState, err := pool.currentState()
	if err != nil {
166
		glog.V(logger.Error).Infof("Failed to get current state: %v", err)
167 168 169 170
		return
	}
	managedState := state.ManageState(currentState)
	if err != nil {
171
		glog.V(logger.Error).Infof("Failed to get managed state: %v", err)
172 173 174
		return
	}
	pool.pendingState = managedState
175 176 177 178 179

	// validate the pool of pending transactions, this will remove
	// any transactions that have been included in the block or
	// have been invalidated because of another transaction (e.g.
	// higher gas price)
180
	pool.demoteUnexecutables(currentState)
181 182 183

	// Update all accounts to the latest known pending nonce
	for addr, list := range pool.pending {
184 185
		txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway
		pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1)
186 187 188
	}
	// Check the queue and move transactions over to the pending if possible
	// or remove those that have become invalid
189
	pool.promoteExecutables(currentState)
190 191
}

192 193
func (pool *TxPool) Stop() {
	pool.events.Unsubscribe()
194
	close(pool.quit)
195
	pool.wg.Wait()
196
	glog.V(logger.Info).Infoln("Transaction pool stopped")
197 198 199
}

func (pool *TxPool) State() *state.ManagedState {
200 201
	pool.mu.RLock()
	defer pool.mu.RUnlock()
202

203
	return pool.pendingState
204 205
}

206 207
// Stats retrieves the current pool stats, namely the number of pending and the
// number of queued (non-executable) transactions.
208 209 210 211
func (pool *TxPool) Stats() (pending int, queued int) {
	pool.mu.RLock()
	defer pool.mu.RUnlock()

212 213
	for _, list := range pool.pending {
		pending += list.Len()
214
	}
215 216
	for _, list := range pool.queue {
		queued += list.Len()
217 218 219 220
	}
	return
}

221
// Content retrieves the data content of the transaction pool, returning all the
222 223
// pending as well as queued transactions, grouped by account and sorted by nonce.
func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
224 225 226
	pool.mu.RLock()
	defer pool.mu.RUnlock()

227 228 229 230 231 232 233
	pending := make(map[common.Address]types.Transactions)
	for addr, list := range pool.pending {
		pending[addr] = list.Flatten()
	}
	queued := make(map[common.Address]types.Transactions)
	for addr, list := range pool.queue {
		queued[addr] = list.Flatten()
234 235 236 237
	}
	return pending, queued
}

238 239 240
// Pending retrieves all currently processable transactions, groupped by origin
// account and sorted by nonce. The returned transaction set is a copy and can be
// freely modified by calling code.
241
func (pool *TxPool) Pending() (map[common.Address]types.Transactions, error) {
242 243 244
	pool.mu.Lock()
	defer pool.mu.Unlock()

245 246 247 248 249
	state, err := pool.currentState()
	if err != nil {
		return nil, err
	}

250
	// check queue first
251
	pool.promoteExecutables(state)
252 253

	// invalidate any txs
254
	pool.demoteUnexecutables(state)
255 256 257 258 259

	pending := make(map[common.Address]types.Transactions)
	for addr, list := range pool.pending {
		pending[addr] = list.Flatten()
	}
260
	return pending, nil
261 262
}

263 264 265 266 267 268 269 270
// SetLocal marks a transaction as local, skipping gas price
//  check against local miner minimum in the future
func (pool *TxPool) SetLocal(tx *types.Transaction) {
	pool.mu.Lock()
	defer pool.mu.Unlock()
	pool.localTx.add(tx.Hash())
}

271 272 273
// validateTx checks whether a transaction is valid according
// to the consensus rules.
func (pool *TxPool) validateTx(tx *types.Transaction) error {
274
	local := pool.localTx.contains(tx.Hash())
275
	// Drop transactions under our own minimal accepted gas price
276
	if !local && pool.minGasPrice.Cmp(tx.GasPrice()) > 0 {
277 278 279
		return ErrCheap
	}

280 281 282 283 284
	currentState, err := pool.currentState()
	if err != nil {
		return err
	}

J
Jeffrey Wilcke 已提交
285
	from, err := types.Sender(pool.signer, tx)
286
	if err != nil {
F
Felix Lange 已提交
287
		return ErrInvalidSender
O
obscuren 已提交
288
	}
289
	// Last but not least check for nonce errors
290
	if currentState.GetNonce(from) > tx.Nonce() {
291 292 293
		return ErrNonce
	}

O
obscuren 已提交
294 295
	// Check the transaction doesn't exceed the current
	// block limit gas.
296
	if pool.gasLimit().Cmp(tx.Gas()) < 0 {
297 298 299
		return ErrGasLimit
	}

O
obscuren 已提交
300 301 302
	// Transactions can't be negative. This may never happen
	// using RLP decoded transactions but may occur if you create
	// a transaction using the RPC for example.
303
	if tx.Value().Cmp(common.Big0) < 0 {
304 305 306
		return ErrNegativeValue
	}

O
obscuren 已提交
307 308
	// Transactor should have enough funds to cover the costs
	// cost == V + GP * GL
309
	if currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
O
obscuren 已提交
310 311 312
		return ErrInsufficientFunds
	}

J
Jeffrey Wilcke 已提交
313
	intrGas := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
314
	if tx.Gas().Cmp(intrGas) < 0 {
O
obscuren 已提交
315 316 317 318
		return ErrIntrinsicGas
	}

	return nil
O
obscuren 已提交
319 320
}

321 322 323
// add validates a transaction and inserts it into the non-executable queue for
// later pending promotion and execution.
func (pool *TxPool) add(tx *types.Transaction) error {
P
Péter Szilágyi 已提交
324
	// If the transaction is already known, discard it
325
	hash := tx.Hash()
326 327
	if pool.all[hash] != nil {
		return fmt.Errorf("Known transaction: %x", hash[:4])
328
	}
329
	// Otherwise ensure basic validation passes and queue it up
330
	if err := pool.validateTx(tx); err != nil {
331
		invalidTxCounter.Inc(1)
332 333
		return err
	}
334
	pool.enqueueTx(hash, tx)
O
obscuren 已提交
335

336
	// Print a log message if low enough level is set
O
obscuren 已提交
337
	if glog.V(logger.Debug) {
338
		rcpt := "[NEW_CONTRACT]"
339
		if to := tx.To(); to != nil {
340
			rcpt = common.Bytes2Hex(to[:4])
341
		}
J
Jeffrey Wilcke 已提交
342
		from, _ := types.Sender(pool.signer, tx) // from already verified during tx validation
343
		glog.Infof("(t) 0x%x => %s (%v) %x\n", from[:4], rcpt, tx.Value, hash)
O
obscuren 已提交
344
	}
345 346 347
	return nil
}

348
// enqueueTx inserts a new transaction into the non-executable transaction queue.
349 350 351 352
//
// Note, this method assumes the pool lock is held!
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) {
	// Try to insert the transaction into the future queue
J
Jeffrey Wilcke 已提交
353
	from, _ := types.Sender(pool.signer, tx) // already validated
354 355
	if pool.queue[from] == nil {
		pool.queue[from] = newTxList(false)
356
	}
357 358
	inserted, old := pool.queue[from].Add(tx)
	if !inserted {
359
		queuedDiscardCounter.Inc(1)
360 361 362 363 364
		return // An older transaction was better, discard this
	}
	// Discard any previous transaction and mark this
	if old != nil {
		delete(pool.all, old.Hash())
365
		queuedReplaceCounter.Inc(1)
366
	}
367
	pool.all[hash] = tx
368 369
}

370 371 372 373 374 375 376 377 378
// promoteTx adds a transaction to the pending (processable) list of transactions.
//
// Note, this method assumes the pool lock is held!
func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) {
	// Try to insert the transaction into the pending queue
	if pool.pending[addr] == nil {
		pool.pending[addr] = newTxList(true)
	}
	list := pool.pending[addr]
379

380 381 382 383
	inserted, old := list.Add(tx)
	if !inserted {
		// An older transaction was better, discard this
		delete(pool.all, hash)
384
		pendingDiscardCounter.Inc(1)
385
		return
386
	}
387 388 389
	// Otherwise discard any previous transaction and mark this
	if old != nil {
		delete(pool.all, old.Hash())
390
		pendingReplaceCounter.Inc(1)
391
	}
392
	pool.all[hash] = tx // Failsafe to work around direct pending inserts (tests)
393

394
	// Set the potentially new pending nonce and notify any subsystems of the new tx
395
	pool.beats[addr] = time.Now()
396
	pool.pendingState.SetNonce(addr, tx.Nonce()+1)
397
	go pool.eventMux.Post(TxPreEvent{tx})
398 399
}

400
// Add queues a single transaction in the pool if it is valid.
401 402 403
func (pool *TxPool) Add(tx *types.Transaction) error {
	pool.mu.Lock()
	defer pool.mu.Unlock()
404

405
	if err := pool.add(tx); err != nil {
406
		return err
407
	}
408 409 410 411 412 413 414

	state, err := pool.currentState()
	if err != nil {
		return err
	}

	pool.promoteExecutables(state)
415

416
	return nil
417
}
418

419
// AddBatch attempts to queue a batch of transactions.
420
func (pool *TxPool) AddBatch(txs []*types.Transaction) error {
421 422
	pool.mu.Lock()
	defer pool.mu.Unlock()
423

Z
zelig 已提交
424
	for _, tx := range txs {
425
		if err := pool.add(tx); err != nil {
426
			glog.V(logger.Debug).Infoln("tx error:", err)
Z
zelig 已提交
427 428
		}
	}
429 430 431 432 433 434 435 436 437

	state, err := pool.currentState()
	if err != nil {
		return err
	}

	pool.promoteExecutables(state)

	return nil
Z
zelig 已提交
438 439
}

440
// Get returns a transaction if it is contained in the pool
441
// and nil otherwise.
442 443 444
func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
	pool.mu.RLock()
	defer pool.mu.RUnlock()
445

446
	return pool.all[hash]
447 448
}

449 450 451 452
// Remove removes the transaction with the given hash from the pool.
func (pool *TxPool) Remove(hash common.Hash) {
	pool.mu.Lock()
	defer pool.mu.Unlock()
453

454
	pool.removeTx(hash)
455 456
}

457 458
// RemoveBatch removes all given transactions from the pool.
func (pool *TxPool) RemoveBatch(txs types.Transactions) {
459 460
	pool.mu.Lock()
	defer pool.mu.Unlock()
461

462 463 464
	for _, tx := range txs {
		pool.removeTx(tx.Hash())
	}
465 466
}

467 468
// removeTx removes a single transaction from the queue, moving all subsequent
// transactions back to the future queue.
469
func (pool *TxPool) removeTx(hash common.Hash) {
470 471 472 473 474
	// Fetch the transaction we wish to delete
	tx, ok := pool.all[hash]
	if !ok {
		return
	}
J
Jeffrey Wilcke 已提交
475
	addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
476

477
	// Remove it from the list of known transactions
478 479
	delete(pool.all, hash)

480 481 482
	// Remove the transaction from the pending lists and reset the account nonce
	if pending := pool.pending[addr]; pending != nil {
		if removed, invalids := pending.Remove(tx); removed {
483
			// If no more transactions are left, remove the list
484 485
			if pending.Empty() {
				delete(pool.pending, addr)
486
				delete(pool.beats, addr)
487
			} else {
488
				// Otherwise postpone any invalidated transactions
489 490 491 492
				for _, tx := range invalids {
					pool.enqueueTx(tx.Hash(), tx)
				}
			}
493 494 495 496
			// Update the account nonce if needed
			if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce {
				pool.pendingState.SetNonce(addr, tx.Nonce())
			}
497
		}
498
	}
499 500 501 502 503 504
	// Transaction is in the future queue
	if future := pool.queue[addr]; future != nil {
		future.Remove(tx)
		if future.Empty() {
			delete(pool.queue, addr)
		}
505 506 507
	}
}

508 509 510
// promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
511
func (pool *TxPool) promoteExecutables(state *state.StateDB) {
512
	// Iterate over all accounts and promote any executable transactions
513
	queued := uint64(0)
514 515 516
	for addr, list := range pool.queue {
		// Drop all transactions that are deemed too old (low nonce)
		for _, tx := range list.Forward(state.GetNonce(addr)) {
517
			if glog.V(logger.Debug) {
518 519 520
				glog.Infof("Removed old queued transaction: %v", tx)
			}
			delete(pool.all, tx.Hash())
521
		}
522 523 524
		// Drop all transactions that are too costly (low balance)
		drops, _ := list.Filter(state.GetBalance(addr))
		for _, tx := range drops {
525
			if glog.V(logger.Debug) {
526
				glog.Infof("Removed unpayable queued transaction: %v", tx)
527
			}
528
			delete(pool.all, tx.Hash())
529
			queuedNofundsCounter.Inc(1)
530
		}
531 532
		// Gather all executable transactions and promote them
		for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
533
			if glog.V(logger.Debug) {
534
				glog.Infof("Promoting queued transaction: %v", tx)
535
			}
536 537 538
			pool.promoteTx(addr, tx.Hash(), tx)
		}
		// Drop all transactions over the allowed limit
539
		for _, tx := range list.Cap(int(maxQueuedPerAccount)) {
540
			if glog.V(logger.Debug) {
541
				glog.Infof("Removed cap-exceeding queued transaction: %v", tx)
542
			}
543
			delete(pool.all, tx.Hash())
544
			queuedRLCounter.Inc(1)
545
		}
546 547
		queued += uint64(list.Len())

548
		// Delete the entire queue entry if it became empty.
549 550
		if list.Empty() {
			delete(pool.queue, addr)
551 552
		}
	}
553 554 555 556 557 558
	// If the pending limit is overflown, start equalizing allowances
	pending := uint64(0)
	for _, list := range pool.pending {
		pending += uint64(list.Len())
	}
	if pending > maxPendingTotal {
559
		pendingBeforeCap := pending
560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605
		// Assemble a spam order to penalize large transactors first
		spammers := prque.New()
		for addr, list := range pool.pending {
			// Only evict transactions from high rollers
			if uint64(list.Len()) > minPendingPerAccount {
				// Skip local accounts as pools should maintain backlogs for themselves
				for _, tx := range list.txs.items {
					if !pool.localTx.contains(tx.Hash()) {
						spammers.Push(addr, float32(list.Len()))
					}
					break // Checking on transaction for locality is enough
				}
			}
		}
		// Gradually drop transactions from offenders
		offenders := []common.Address{}
		for pending > maxPendingTotal && !spammers.Empty() {
			// Retrieve the next offender if not local address
			offender, _ := spammers.Pop()
			offenders = append(offenders, offender.(common.Address))

			// Equalize balances until all the same or below threshold
			if len(offenders) > 1 {
				// Calculate the equalization threshold for all current offenders
				threshold := pool.pending[offender.(common.Address)].Len()

				// Iteratively reduce all offenders until below limit or threshold reached
				for pending > maxPendingTotal && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
					for i := 0; i < len(offenders)-1; i++ {
						list := pool.pending[offenders[i]]
						list.Cap(list.Len() - 1)
						pending--
					}
				}
			}
		}
		// If still above threshold, reduce to limit or min allowance
		if pending > maxPendingTotal && len(offenders) > 0 {
			for pending > maxPendingTotal && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > minPendingPerAccount {
				for _, addr := range offenders {
					list := pool.pending[addr]
					list.Cap(list.Len() - 1)
					pending--
				}
			}
		}
606
		pendingRLCounter.Inc(int64(pendingBeforeCap - pending))
607
	}
608 609 610 611
	// If we've queued more transactions than the hard limit, drop oldest ones
	if queued > maxQueuedInTotal {
		// Sort all accounts with queued transactions by heartbeat
		addresses := make(addresssByHeartbeat, 0, len(pool.queue))
F
Felix Lange 已提交
612
		for addr := range pool.queue {
613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629
			addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
		}
		sort.Sort(addresses)

		// Drop transactions until the total is below the limit
		for drop := queued - maxQueuedInTotal; drop > 0; {
			addr := addresses[len(addresses)-1]
			list := pool.queue[addr.address]

			addresses = addresses[:len(addresses)-1]

			// Drop all transactions if they are less than the overflow
			if size := uint64(list.Len()); size <= drop {
				for _, tx := range list.Flatten() {
					pool.removeTx(tx.Hash())
				}
				drop -= size
630
				queuedRLCounter.Inc(int64(size))
631 632 633 634 635 636 637
				continue
			}
			// Otherwise drop only last few transactions
			txs := list.Flatten()
			for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
				pool.removeTx(txs[i].Hash())
				drop--
638
				queuedRLCounter.Inc(1)
639 640 641
			}
		}
	}
642
}
643

644 645 646
// demoteUnexecutables removes invalid and processed transactions from the pools
// executable/pending queue and any subsequent transactions that become unexecutable
// are moved back into the future queue.
647
func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
648 649 650
	// Iterate over all accounts and demote any non-executable transactions
	for addr, list := range pool.pending {
		nonce := state.GetNonce(addr)
651

652 653
		// Drop all transactions that are deemed too old (low nonce)
		for _, tx := range list.Forward(nonce) {
654
			if glog.V(logger.Debug) {
655
				glog.Infof("Removed old pending transaction: %v", tx)
656
			}
657 658 659 660 661
			delete(pool.all, tx.Hash())
		}
		// Drop all transactions that are too costly (low balance), and queue any invalids back for later
		drops, invalids := list.Filter(state.GetBalance(addr))
		for _, tx := range drops {
662
			if glog.V(logger.Debug) {
663
				glog.Infof("Removed unpayable pending transaction: %v", tx)
664
			}
665
			delete(pool.all, tx.Hash())
666
			pendingNofundsCounter.Inc(1)
667
		}
668
		for _, tx := range invalids {
669
			if glog.V(logger.Debug) {
670
				glog.Infof("Demoting pending transaction: %v", tx)
671
			}
672 673 674 675 676
			pool.enqueueTx(tx.Hash(), tx)
		}
		// Delete the entire queue entry if it became empty.
		if list.Empty() {
			delete(pool.pending, addr)
677
			delete(pool.beats, addr)
678 679 680
		}
	}
}
681

682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721
// expirationLoop is a loop that periodically iterates over all accounts with
// queued transactions and drop all that have been inactive for a prolonged amount
// of time.
func (pool *TxPool) expirationLoop() {
	defer pool.wg.Done()

	evict := time.NewTicker(evictionInterval)
	defer evict.Stop()

	for {
		select {
		case <-evict.C:
			pool.mu.Lock()
			for addr := range pool.queue {
				if time.Since(pool.beats[addr]) > maxQueuedLifetime {
					for _, tx := range pool.queue[addr].Flatten() {
						pool.removeTx(tx.Hash())
					}
				}
			}
			pool.mu.Unlock()

		case <-pool.quit:
			return
		}
	}
}

// addressByHeartbeat is an account address tagged with its last activity timestamp.
type addressByHeartbeat struct {
	address   common.Address
	heartbeat time.Time
}

type addresssByHeartbeat []addressByHeartbeat

func (a addresssByHeartbeat) Len() int           { return len(a) }
func (a addresssByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) }
func (a addresssByHeartbeat) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }

722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766
// txSet represents a set of transaction hashes in which entries
//  are automatically dropped after txSetDuration time
type txSet struct {
	txMap          map[common.Hash]struct{}
	txOrd          map[uint64]txOrdType
	addPtr, delPtr uint64
}

const txSetDuration = time.Hour * 2

// txOrdType represents an entry in the time-ordered list of transaction hashes
type txOrdType struct {
	hash common.Hash
	time time.Time
}

// newTxSet creates a new transaction set
func newTxSet() *txSet {
	return &txSet{
		txMap: make(map[common.Hash]struct{}),
		txOrd: make(map[uint64]txOrdType),
	}
}

// contains returns true if the set contains the given transaction hash
// (not thread safe, should be called from a locked environment)
func (self *txSet) contains(hash common.Hash) bool {
	_, ok := self.txMap[hash]
	return ok
}

// add adds a transaction hash to the set, then removes entries older than txSetDuration
// (not thread safe, should be called from a locked environment)
func (self *txSet) add(hash common.Hash) {
	self.txMap[hash] = struct{}{}
	now := time.Now()
	self.txOrd[self.addPtr] = txOrdType{hash: hash, time: now}
	self.addPtr++
	delBefore := now.Add(-txSetDuration)
	for self.delPtr < self.addPtr && self.txOrd[self.delPtr].time.Before(delBefore) {
		delete(self.txMap, self.txOrd[self.delPtr].hash)
		delete(self.txOrd, self.delPtr)
		self.delPtr++
	}
}