transaction_pool.go 5.7 KB
Newer Older
O
obscuren 已提交
1
package core
O
obscuren 已提交
2 3 4 5 6

import (
	"bytes"
	"container/list"
	"fmt"
7 8 9
	"math/big"
	"sync"

O
obscuren 已提交
10
	"github.com/ethereum/go-ethereum/core/types"
O
obscuren 已提交
11
	"github.com/ethereum/go-ethereum/event"
O
obscuren 已提交
12
	"github.com/ethereum/go-ethereum/logger"
O
obscuren 已提交
13
	"github.com/ethereum/go-ethereum/state"
O
obscuren 已提交
14 15
)

O
obscuren 已提交
16
// txplogger is the package-level logger used by the transaction pool; it is
// created with the "TXP" tag.
var txplogger = logger.NewLogger("TXP")
Z
zelig 已提交
17

18
// txPoolQueueSize is the buffer capacity given to TxPool.queueChan when the
// pool is built by NewTxPool.
const txPoolQueueSize = 50
O
obscuren 已提交
19

20
// TxPoolHook is a channel of transaction pointers.
// NOTE(review): no use of this type is visible in this file; confirm whether
// it is still needed.
type TxPoolHook chan *types.Transaction
Z
zelig 已提交
21 22 23
// TxMsg wraps a single transaction. It is the element type of the channels
// stored in TxPool.subscribers.
type TxMsg struct {
	// Tx is the wrapped transaction.
	Tx *types.Transaction
}
O
obscuren 已提交
24 25

const (
	// minGasPrice is the lowest gas price ValidateTransaction accepts for a
	// contract transaction.
	minGasPrice = 1000000
)

29 30
// MinGasPrice is an exported big.Int holding 10^13.
// NOTE(review): ValidateTransaction checks against the unexported
// minGasPrice constant (10^6), not this variable; confirm which value is the
// intended minimum.
var MinGasPrice = big.NewInt(10000000000000)

31
func EachTx(pool *list.List, it func(*types.Transaction, *list.Element) bool) {
32
	for e := pool.Front(); e != nil; e = e.Next() {
33
		if it(e.Value.(*types.Transaction), e) {
34 35 36 37 38
			break
		}
	}
}

39
func FindTx(pool *list.List, finder func(*types.Transaction, *list.Element) bool) *types.Transaction {
O
obscuren 已提交
40
	for e := pool.Front(); e != nil; e = e.Next() {
41
		if tx, ok := e.Value.(*types.Transaction); ok {
O
obscuren 已提交
42 43 44 45 46 47 48 49 50
			if finder(tx, e) {
				return tx
			}
		}
	}

	return nil
}

O
obscuren 已提交
51
type TxProcessor interface {
52
	ProcessTransaction(tx *types.Transaction)
O
obscuren 已提交
53 54
}

O
obscuren 已提交
55 56 57 58 59 60 61 62 63 64
// The tx pool a thread safe transaction pool handler. In order to
// guarantee a non blocking pool we use a queue channel which can be
// independently read without needing access to the actual pool. If the
// pool is being drained or synced for whatever reason the transactions
// will simple queue up and handled when the mutex is freed.
type TxPool struct {
	// The mutex for accessing the Tx pool.
	mutex sync.Mutex
	// Queueing channel for reading and writing incoming
	// transactions to
65
	queueChan chan *types.Transaction
O
obscuren 已提交
66 67 68 69 70
	// Quiting channel
	quit chan bool
	// The actual pool
	pool *list.List

O
obscuren 已提交
71
	SecondaryProcessor TxProcessor
O
obscuren 已提交
72 73

	subscribers []chan TxMsg
O
obscuren 已提交
74 75 76 77

	broadcaster  types.Broadcaster
	chainManager *ChainManager
	eventMux     *event.TypeMux
O
obscuren 已提交
78 79
}

O
obscuren 已提交
80
func NewTxPool(chainManager *ChainManager, broadcaster types.Broadcaster, eventMux *event.TypeMux) *TxPool {
O
obscuren 已提交
81
	return &TxPool{
O
obscuren 已提交
82 83 84 85 86 87
		pool:         list.New(),
		queueChan:    make(chan *types.Transaction, txPoolQueueSize),
		quit:         make(chan bool),
		chainManager: chainManager,
		eventMux:     eventMux,
		broadcaster:  broadcaster,
O
obscuren 已提交
88 89 90 91
	}
}

// Blocking function. Don't use directly. Use QueueTransaction instead
92
func (pool *TxPool) addTransaction(tx *types.Transaction) {
O
obscuren 已提交
93
	pool.mutex.Lock()
O
obscuren 已提交
94 95
	defer pool.mutex.Unlock()

O
obscuren 已提交
96 97 98
	pool.pool.PushBack(tx)

	// Broadcast the transaction to the rest of the peers
Z
zelig 已提交
99
	pool.Ethereum.EventMux().Post(TxPreEvent{tx})
O
obscuren 已提交
100 101
}

102
func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
O
obscuren 已提交
103 104
	// Get the last block so we can retrieve the sender and receiver from
	// the merkle trie
O
obscuren 已提交
105
	block := pool.chainManager.CurrentBlock
O
obscuren 已提交
106 107
	// Something has gone horribly wrong if this happens
	if block == nil {
O
obscuren 已提交
108
		return fmt.Errorf("No last block on the block chain")
O
obscuren 已提交
109 110
	}

111
	if len(tx.Recipient) != 0 && len(tx.Recipient) != 20 {
O
obscuren 已提交
112 113 114
		return fmt.Errorf("Invalid recipient. len = %d", len(tx.Recipient))
	}

O
obscuren 已提交
115 116
	v, _, _ := tx.Curve()
	if v > 28 || v < 27 {
O
obscuren 已提交
117
		return fmt.Errorf("tx.v != (28 || 27)")
O
obscuren 已提交
118 119
	}

O
obscuren 已提交
120
	// Get the sender
O
obscuren 已提交
121
	sender := pool.chainManager.State().GetAccount(tx.Sender())
O
obscuren 已提交
122

O
obscuren 已提交
123
	totAmount := new(big.Int).Set(tx.Value)
O
obscuren 已提交
124 125
	// Make sure there's enough in the sender's account. Having insufficient
	// funds won't invalidate this transaction but simple ignores it.
O
obscuren 已提交
126
	if sender.Balance().Cmp(totAmount) < 0 {
O
obscuren 已提交
127
		return fmt.Errorf("Insufficient amount in sender's (%x) account", tx.Sender())
O
obscuren 已提交
128 129
	}

130 131
	if tx.IsContract() {
		if tx.GasPrice.Cmp(big.NewInt(minGasPrice)) < 0 {
O
obscuren 已提交
132
			return fmt.Errorf("Gasprice too low, %s given should be at least %d.", tx.GasPrice, minGasPrice)
133 134 135
		}
	}

O
obscuren 已提交
136 137 138 139 140 141
	// Increment the nonce making each tx valid only once to prevent replay
	// attacks

	return nil
}

O
obscuren 已提交
142
func (self *TxPool) Add(tx *types.Transaction) error {
143
	hash := tx.Hash()
O
obscuren 已提交
144
	foundTx := FindTx(self.pool, func(tx *types.Transaction, e *list.Element) bool {
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
		return bytes.Compare(tx.Hash(), hash) == 0
	})

	if foundTx != nil {
		return fmt.Errorf("Known transaction (%x)", hash[0:4])
	}

	err := self.ValidateTransaction(tx)
	if err != nil {
		return err
	}

	self.addTransaction(tx)

	tmp := make([]byte, 4)
	copy(tmp, tx.Recipient)

	txplogger.Debugf("(t) %x => %x (%v) %x\n", tx.Sender()[:4], tmp, tx.Value, tx.Hash())

	// Notify the subscribers
O
obscuren 已提交
165
	go self.eventMux.Post(TxPreEvent{tx})
166 167 168 169

	return nil
}

170 171 172 173
// Size reports how many transactions the pool currently holds. The pool mutex
// is not taken for this read.
func (p *TxPool) Size() int {
	count := p.pool.Len()
	return count
}

Z
zelig 已提交
174 175 176 177 178 179 180 181 182 183 184
// AddTransactions feeds every transaction in txs to Add, in order. A rejected
// transaction has its error logged at info level; an accepted one has the
// first four bytes of its hash logged. Rejections do not stop the loop.
func (p *TxPool) AddTransactions(txs []*types.Transaction) {
	for i := range txs {
		candidate := txs[i]

		addErr := p.Add(candidate)
		if addErr == nil {
			txplogger.Infof("tx %x\n", candidate.Hash()[0:4])
			continue
		}

		txplogger.Infoln(addErr)
	}
}

func (pool *TxPool) GetTransactions() []*types.Transaction {
O
obscuren 已提交
185 186 187
	pool.mutex.Lock()
	defer pool.mutex.Unlock()

188
	txList := make([]*types.Transaction, pool.pool.Len())
O
obscuren 已提交
189 190
	i := 0
	for e := pool.pool.Front(); e != nil; e = e.Next() {
191
		tx := e.Value.(*types.Transaction)
192 193

		txList[i] = tx
O
obscuren 已提交
194 195 196 197

		i++
	}

198 199 200
	return txList
}

O
obscuren 已提交
201
func (pool *TxPool) RemoveInvalid(state *state.StateDB) {
202 203 204
	pool.mutex.Lock()
	defer pool.mutex.Unlock()

205
	for e := pool.pool.Front(); e != nil; e = e.Next() {
206
		tx := e.Value.(*types.Transaction)
207 208
		sender := state.GetAccount(tx.Sender())
		err := pool.ValidateTransaction(tx)
O
obscuren 已提交
209
		if err != nil || sender.Nonce >= tx.Nonce {
210 211 212 213 214
			pool.pool.Remove(e)
		}
	}
}

215
func (self *TxPool) RemoveSet(txs types.Transactions) {
216 217 218 219
	self.mutex.Lock()
	defer self.mutex.Unlock()

	for _, tx := range txs {
220
		EachTx(self.pool, func(t *types.Transaction, element *list.Element) bool {
221 222 223 224 225 226 227 228 229
			if t == tx {
				self.pool.Remove(element)
				return true // To stop the loop
			}
			return false
		})
	}
}

230
func (pool *TxPool) Flush() []*types.Transaction {
Z
zelig 已提交
231
	txList := pool.GetTransactions()
232

O
obscuren 已提交
233 234 235 236 237 238 239 240
	// Recreate a new list all together
	// XXX Is this the fastest way?
	pool.pool = list.New()

	return txList
}

func (pool *TxPool) Start() {
O
obscuren 已提交
241
	//go pool.queueHandler()
O
obscuren 已提交
242 243 244 245
}

func (pool *TxPool) Stop() {
	pool.Flush()
O
obscuren 已提交
246

Z
zelig 已提交
247
	txplogger.Infoln("Stopped")
O
obscuren 已提交
248
}