transaction_pool.go 7.8 KB
Newer Older
O
obscuren 已提交
1
package core
O
obscuren 已提交
2 3

import (
4
	"errors"
O
obscuren 已提交
5
	"fmt"
O
obscuren 已提交
6
	"math/big"
7
	"sort"
8
	"sync"
9
	"time"
10

O
obscuren 已提交
11
	"github.com/ethereum/go-ethereum/common"
O
obscuren 已提交
12
	"github.com/ethereum/go-ethereum/core/state"
13
	"github.com/ethereum/go-ethereum/core/types"
O
obscuren 已提交
14
	"github.com/ethereum/go-ethereum/event"
O
obscuren 已提交
15
	"github.com/ethereum/go-ethereum/logger"
O
obscuren 已提交
16
	"github.com/ethereum/go-ethereum/logger/glog"
17
	"gopkg.in/fatih/set.v0"
O
obscuren 已提交
18 19
)

20
var (
O
obscuren 已提交
21
	ErrInvalidSender      = errors.New("Invalid sender")
22
	ErrNonce              = errors.New("Nonce too low")
23
	ErrBalance            = errors.New("Insufficient balance")
O
obscuren 已提交
24
	ErrNonExistentAccount = errors.New("Account does not exist")
25
	ErrInsufficientFunds  = errors.New("Insufficient funds for gas * price + value")
O
obscuren 已提交
26
	ErrIntrinsicGas       = errors.New("Intrinsic gas too low")
27
	ErrGasLimit           = errors.New("Exceeds block gas limit")
28
)
Z
zelig 已提交
29

30
const txPoolQueueSize = 50
O
obscuren 已提交
31

32
type TxPoolHook chan *types.Transaction
33
type TxMsg struct{ Tx *types.Transaction }
O
obscuren 已提交
34

35 36
type stateFn func() *state.StateDB

O
obscuren 已提交
37
const (
38
	minGasPrice = 1000000
O
obscuren 已提交
39 40
)

O
obscuren 已提交
41
type TxProcessor interface {
42
	ProcessTransaction(tx *types.Transaction)
O
obscuren 已提交
43 44
}

O
obscuren 已提交
45 46
// The tx pool a thread safe transaction pool handler. In order to
// guarantee a non blocking pool we use a queue channel which can be
47
// independently read without needing access to the actual pool.
O
obscuren 已提交
48
type TxPool struct {
49
	mu sync.RWMutex
O
obscuren 已提交
50 51
	// Queueing channel for reading and writing incoming
	// transactions to
52
	queueChan chan *types.Transaction
O
obscuren 已提交
53 54
	// Quiting channel
	quit chan bool
O
obscuren 已提交
55
	// The state function which will allow us to do some pre checkes
56
	currentState stateFn
57 58
	// The current gas limit function callback
	gasLimit func() *big.Int
O
obscuren 已提交
59
	// The actual pool
60 61
	txs           map[common.Hash]*types.Transaction
	invalidHashes *set.Set
O
obscuren 已提交
62

63 64
	queue map[common.Address]types.Transactions

O
obscuren 已提交
65
	subscribers []chan TxMsg
O
obscuren 已提交
66

67
	eventMux *event.TypeMux
O
obscuren 已提交
68 69
}

70
func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
71
	txPool := &TxPool{
72
		txs:           make(map[common.Hash]*types.Transaction),
73
		queue:         make(map[common.Address]types.Transactions),
74 75 76 77
		queueChan:     make(chan *types.Transaction, txPoolQueueSize),
		quit:          make(chan bool),
		eventMux:      eventMux,
		invalidHashes: set.New(),
O
obscuren 已提交
78
		currentState:  currentStateFn,
79
		gasLimit:      gasLimitFn,
O
obscuren 已提交
80
	}
81 82 83 84
	return txPool
}

func (pool *TxPool) Start() {
85 86 87 88 89
	// Queue timer will tick so we can attempt to move items from the queue to the
	// main transaction pool.
	queueTimer := time.NewTicker(300 * time.Millisecond)
	// Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce)
	removalTimer := time.NewTicker(1 * time.Second)
90 91 92
done:
	for {
		select {
93
		case <-queueTimer.C:
94
			pool.checkQueue()
95 96
		case <-removalTimer.C:
			pool.validatePool()
97 98 99 100
		case <-pool.quit:
			break done
		}
	}
O
obscuren 已提交
101 102
}

103
func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
104
	// Validate sender
O
obscuren 已提交
105 106 107 108 109 110
	var (
		from common.Address
		err  error
	)

	if from, err = tx.From(); err != nil {
F
Felix Lange 已提交
111
		return ErrInvalidSender
O
obscuren 已提交
112
	}
O
obscuren 已提交
113

114
	// Validate curve param
O
obscuren 已提交
115 116
	v, _, _ := tx.Curve()
	if v > 28 || v < 27 {
117
		return fmt.Errorf("tx.v != (28 || 27) => %v", v)
O
obscuren 已提交
118
	}
119

O
obscuren 已提交
120 121
	if !pool.currentState().HasAccount(from) {
		return ErrNonExistentAccount
122
	}
O
obscuren 已提交
123

124 125 126 127
	if pool.gasLimit().Cmp(tx.GasLimit) < 0 {
		return ErrGasLimit
	}

128 129 130
	total := new(big.Int).Mul(tx.Price, tx.GasLimit)
	total.Add(total, tx.Value())
	if pool.currentState().GetBalance(from).Cmp(total) < 0 {
O
obscuren 已提交
131 132 133 134 135 136 137 138
		return ErrInsufficientFunds
	}

	if tx.GasLimit.Cmp(IntrinsicGas(tx)) < 0 {
		return ErrIntrinsicGas
	}

	if pool.currentState().GetNonce(from) > tx.Nonce() {
139
		return ErrNonce
O
obscuren 已提交
140 141 142
	}

	return nil
O
obscuren 已提交
143 144
}

145
func (self *TxPool) add(tx *types.Transaction) error {
146
	hash := tx.Hash()
147

O
obscuren 已提交
148 149
	/* XXX I'm unsure about this. This is extremely dangerous and may result
	 in total black listing of certain transactions
150 151 152
	if self.invalidHashes.Has(hash) {
		return fmt.Errorf("Invalid transaction (%x)", hash[:4])
	}
O
obscuren 已提交
153
	*/
154
	if self.txs[hash] != nil {
155
		return fmt.Errorf("Known transaction (%x)", hash[:4])
156
	}
157 158 159 160 161
	err := self.ValidateTransaction(tx)
	if err != nil {
		return err
	}

162
	self.queueTx(tx)
163

164
	var toname string
165
	if to := tx.To(); to != nil {
166
		toname = common.Bytes2Hex(to[:4])
167
	} else {
168
		toname = "[NEW_CONTRACT]"
169
	}
170 171 172 173
	// we can ignore the error here because From is
	// verified in ValidateTransaction.
	f, _ := tx.From()
	from := common.Bytes2Hex(f[:4])
O
obscuren 已提交
174 175 176 177

	if glog.V(logger.Debug) {
		glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash())
	}
178 179 180 181

	return nil
}

182
func (self *TxPool) Size() int {
O
Merge  
obscuren 已提交
183
	return len(self.txs)
184 185
}

186 187 188
func (self *TxPool) Add(tx *types.Transaction) error {
	self.mu.Lock()
	defer self.mu.Unlock()
189

190 191
	return self.add(tx)
}
192

Z
zelig 已提交
193
func (self *TxPool) AddTransactions(txs []*types.Transaction) {
194 195 196
	self.mu.Lock()
	defer self.mu.Unlock()

Z
zelig 已提交
197
	for _, tx := range txs {
198
		if err := self.add(tx); err != nil {
199
			glog.V(logger.Debug).Infoln("tx error:", err)
Z
zelig 已提交
200
		} else {
201
			h := tx.Hash()
O
obscuren 已提交
202
			glog.V(logger.Debug).Infof("tx %x\n", h[:4])
Z
zelig 已提交
203 204 205 206
		}
	}
}

O
Merge  
obscuren 已提交
207
func (self *TxPool) GetTransactions() (txs types.Transactions) {
208 209 210
	self.mu.RLock()
	defer self.mu.RUnlock()

O
Merge  
obscuren 已提交
211
	txs = make(types.Transactions, self.Size())
O
obscuren 已提交
212
	i := 0
O
Merge  
obscuren 已提交
213 214
	for _, tx := range self.txs {
		txs[i] = tx
O
obscuren 已提交
215
		i++
O
Merge  
obscuren 已提交
216
	}
217

O
Merge  
obscuren 已提交
218
	return
219 220
}

221 222 223 224 225 226 227 228 229 230 231 232
func (self *TxPool) GetQueuedTransactions() types.Transactions {
	self.mu.RLock()
	defer self.mu.RUnlock()

	var txs types.Transactions
	for _, ts := range self.queue {
		txs = append(txs, ts...)
	}

	return txs
}

233
func (self *TxPool) RemoveTransactions(txs types.Transactions) {
234 235
	self.mu.Lock()
	defer self.mu.Unlock()
236

237
	for _, tx := range txs {
238
		delete(self.txs, tx.Hash())
239 240 241
	}
}

242
func (pool *TxPool) Flush() {
243
	pool.txs = make(map[common.Hash]*types.Transaction)
O
obscuren 已提交
244 245 246 247
}

func (pool *TxPool) Stop() {
	pool.Flush()
248
	close(pool.quit)
O
obscuren 已提交
249

O
obscuren 已提交
250
	glog.V(logger.Info).Infoln("TX Pool stopped")
O
obscuren 已提交
251
}
252

253 254 255 256 257 258 259 260
func (self *TxPool) queueTx(tx *types.Transaction) {
	from, _ := tx.From()
	self.queue[from] = append(self.queue[from], tx)
}

func (pool *TxPool) addTx(tx *types.Transaction) {
	if _, ok := pool.txs[tx.Hash()]; !ok {
		pool.txs[tx.Hash()] = tx
261 262 263 264
		// Notify the subscribers. This event is posted in a goroutine
		// because it's possible that somewhere during the post "Remove transaction"
		// gets called which will then wait for the global tx pool lock and deadlock.
		go pool.eventMux.Post(TxPreEvent{tx})
265 266 267
	}
}

268 269 270 271 272
// check queue will attempt to insert
func (pool *TxPool) checkQueue() {
	pool.mu.Lock()
	defer pool.mu.Unlock()

273
	statedb := pool.currentState()
274 275 276 277
	for address, txs := range pool.queue {
		sort.Sort(types.TxByNonce{txs})

		var (
278
			nonce = statedb.GetNonce(address)
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
			start int
		)
		// Clean up the transactions first and determine the start of the nonces
		for _, tx := range txs {
			if tx.Nonce() >= nonce {
				break
			}
			start++
		}
		pool.queue[address] = txs[start:]

		// expected nonce
		enonce := nonce
		for _, tx := range pool.queue[address] {
			// If the expected nonce does not match up with the next one
			// (i.e. a nonce gap), we stop the loop
			if enonce != tx.Nonce() {
				break
			}
			enonce++

300
			pool.addTx(tx)
301 302 303 304 305 306 307
		}
		// delete the entire queue entry if it's empty. There's no need to keep it
		if len(pool.queue[address]) == 0 {
			delete(pool.queue, address)
		}
	}
}
308

309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
func (pool *TxPool) removeTx(hash common.Hash) {
	// delete from pending pool
	delete(pool.txs, hash)

	// delete from queue
out:
	for address, txs := range pool.queue {
		for i, tx := range txs {
			if tx.Hash() == hash {
				if len(txs) == 1 {
					// if only one tx, remove entire address entry
					delete(pool.queue, address)
				} else {
					pool.queue[address][len(txs)-1], pool.queue[address] = nil, append(txs[:i], txs[i+1:]...)
				}
				break out
			}
		}
	}
}

330 331 332 333 334
func (pool *TxPool) validatePool() {
	pool.mu.Lock()
	defer pool.mu.Unlock()

	for hash, tx := range pool.txs {
335 336 337
		if err := pool.ValidateTransaction(tx); err != nil {
			if glog.V(logger.Info) {
				glog.Infof("removed tx (%x) from pool: %v\n", hash[:4], err)
338 339
			}

340
			pool.removeTx(hash)
341 342 343
		}
	}
}