transaction_pool.go 8.3 KB
Newer Older
O
obscuren 已提交
1
package core
O
obscuren 已提交
2 3

import (
4
	"errors"
O
obscuren 已提交
5
	"fmt"
O
obscuren 已提交
6
	"math/big"
7
	"sort"
8
	"sync"
9
	"time"
10

O
obscuren 已提交
11
	"github.com/ethereum/go-ethereum/common"
O
obscuren 已提交
12
	"github.com/ethereum/go-ethereum/core/state"
13
	"github.com/ethereum/go-ethereum/core/types"
O
obscuren 已提交
14
	"github.com/ethereum/go-ethereum/event"
O
obscuren 已提交
15
	"github.com/ethereum/go-ethereum/logger"
O
obscuren 已提交
16
	"github.com/ethereum/go-ethereum/logger/glog"
17
	"gopkg.in/fatih/set.v0"
O
obscuren 已提交
18 19
)

20
var (
O
obscuren 已提交
21
	ErrInvalidSender      = errors.New("Invalid sender")
22
	ErrNonce              = errors.New("Nonce too low")
23
	ErrBalance            = errors.New("Insufficient balance")
24
	ErrNonExistentAccount = errors.New("Account does not exist or account balance too low")
25
	ErrInsufficientFunds  = errors.New("Insufficient funds for gas * price + value")
O
obscuren 已提交
26
	ErrIntrinsicGas       = errors.New("Intrinsic gas too low")
27
	ErrGasLimit           = errors.New("Exceeds block gas limit")
28
	ErrNegativeValue      = errors.New("Negative value")
29
)
Z
zelig 已提交
30

31
const txPoolQueueSize = 50
O
obscuren 已提交
32

33
type TxPoolHook chan *types.Transaction
34
type TxMsg struct{ Tx *types.Transaction }
O
obscuren 已提交
35

36 37
type stateFn func() *state.StateDB

O
obscuren 已提交
38
const (
39
	minGasPrice = 1000000
O
obscuren 已提交
40 41
)

O
obscuren 已提交
42
type TxProcessor interface {
43
	ProcessTransaction(tx *types.Transaction)
O
obscuren 已提交
44 45
}

O
obscuren 已提交
46 47
// The tx pool a thread safe transaction pool handler. In order to
// guarantee a non blocking pool we use a queue channel which can be
48
// independently read without needing access to the actual pool.
O
obscuren 已提交
49
type TxPool struct {
50
	mu sync.RWMutex
O
obscuren 已提交
51 52
	// Queueing channel for reading and writing incoming
	// transactions to
53
	queueChan chan *types.Transaction
O
obscuren 已提交
54 55
	// Quiting channel
	quit chan bool
O
obscuren 已提交
56
	// The state function which will allow us to do some pre checkes
57
	currentState stateFn
58 59
	// The current gas limit function callback
	gasLimit func() *big.Int
O
obscuren 已提交
60
	// The actual pool
61 62
	txs           map[common.Hash]*types.Transaction
	invalidHashes *set.Set
O
obscuren 已提交
63

64 65
	queue map[common.Address]types.Transactions

O
obscuren 已提交
66
	subscribers []chan TxMsg
O
obscuren 已提交
67

68
	eventMux *event.TypeMux
O
obscuren 已提交
69 70
}

71
func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
72
	txPool := &TxPool{
73
		txs:           make(map[common.Hash]*types.Transaction),
74
		queue:         make(map[common.Address]types.Transactions),
75 76 77 78
		queueChan:     make(chan *types.Transaction, txPoolQueueSize),
		quit:          make(chan bool),
		eventMux:      eventMux,
		invalidHashes: set.New(),
O
obscuren 已提交
79
		currentState:  currentStateFn,
80
		gasLimit:      gasLimitFn,
O
obscuren 已提交
81
	}
82 83 84 85
	return txPool
}

func (pool *TxPool) Start() {
86 87 88 89 90
	// Queue timer will tick so we can attempt to move items from the queue to the
	// main transaction pool.
	queueTimer := time.NewTicker(300 * time.Millisecond)
	// Removal timer will tick and attempt to remove bad transactions (account.nonce>tx.nonce)
	removalTimer := time.NewTicker(1 * time.Second)
91 92 93
done:
	for {
		select {
94
		case <-queueTimer.C:
95
			pool.checkQueue()
96 97
		case <-removalTimer.C:
			pool.validatePool()
98 99 100 101
		case <-pool.quit:
			break done
		}
	}
O
obscuren 已提交
102 103
}

104
func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
105
	// Validate sender
O
obscuren 已提交
106 107 108 109 110 111
	var (
		from common.Address
		err  error
	)

	if from, err = tx.From(); err != nil {
F
Felix Lange 已提交
112
		return ErrInvalidSender
O
obscuren 已提交
113
	}
O
obscuren 已提交
114

115
	// Validate curve param
O
obscuren 已提交
116 117
	v, _, _ := tx.Curve()
	if v > 28 || v < 27 {
118
		return fmt.Errorf("tx.v != (28 || 27) => %v", v)
O
obscuren 已提交
119
	}
120

O
obscuren 已提交
121 122
	if !pool.currentState().HasAccount(from) {
		return ErrNonExistentAccount
123
	}
O
obscuren 已提交
124

125 126 127 128
	if pool.gasLimit().Cmp(tx.GasLimit) < 0 {
		return ErrGasLimit
	}

129 130 131 132
	if tx.Amount.Cmp(common.Big0) < 0 {
		return ErrNegativeValue
	}

133 134 135
	total := new(big.Int).Mul(tx.Price, tx.GasLimit)
	total.Add(total, tx.Value())
	if pool.currentState().GetBalance(from).Cmp(total) < 0 {
O
obscuren 已提交
136 137 138 139 140 141 142 143
		return ErrInsufficientFunds
	}

	if tx.GasLimit.Cmp(IntrinsicGas(tx)) < 0 {
		return ErrIntrinsicGas
	}

	if pool.currentState().GetNonce(from) > tx.Nonce() {
144
		return ErrNonce
O
obscuren 已提交
145 146 147
	}

	return nil
O
obscuren 已提交
148 149
}

150
func (self *TxPool) add(tx *types.Transaction) error {
151
	hash := tx.Hash()
152

O
obscuren 已提交
153 154
	/* XXX I'm unsure about this. This is extremely dangerous and may result
	 in total black listing of certain transactions
155 156 157
	if self.invalidHashes.Has(hash) {
		return fmt.Errorf("Invalid transaction (%x)", hash[:4])
	}
O
obscuren 已提交
158
	*/
159
	if self.txs[hash] != nil {
160
		return fmt.Errorf("Known transaction (%x)", hash[:4])
161
	}
162 163 164 165 166
	err := self.ValidateTransaction(tx)
	if err != nil {
		return err
	}

167
	self.queueTx(tx)
168

169
	var toname string
170
	if to := tx.To(); to != nil {
171
		toname = common.Bytes2Hex(to[:4])
172
	} else {
173
		toname = "[NEW_CONTRACT]"
174
	}
175 176 177 178
	// we can ignore the error here because From is
	// verified in ValidateTransaction.
	f, _ := tx.From()
	from := common.Bytes2Hex(f[:4])
O
obscuren 已提交
179 180 181 182

	if glog.V(logger.Debug) {
		glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash())
	}
183 184 185 186

	return nil
}

187
func (self *TxPool) Size() int {
O
Merge  
obscuren 已提交
188
	return len(self.txs)
189 190
}

191 192 193
func (self *TxPool) Add(tx *types.Transaction) error {
	self.mu.Lock()
	defer self.mu.Unlock()
194

195 196
	return self.add(tx)
}
197

Z
zelig 已提交
198
func (self *TxPool) AddTransactions(txs []*types.Transaction) {
199 200 201
	self.mu.Lock()
	defer self.mu.Unlock()

Z
zelig 已提交
202
	for _, tx := range txs {
203
		if err := self.add(tx); err != nil {
204
			glog.V(logger.Debug).Infoln("tx error:", err)
Z
zelig 已提交
205
		} else {
206
			h := tx.Hash()
O
obscuren 已提交
207
			glog.V(logger.Debug).Infof("tx %x\n", h[:4])
Z
zelig 已提交
208 209 210 211
		}
	}
}

212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
// GetTransaction allows you to check the pending and queued transaction in the
// transaction pool.
// It has two stategies, first check the pool (map) then check the queue
func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
	// check the txs first
	if tx, ok := tp.txs[hash]; ok {
		return tx
	}

	// check queue
	for _, txs := range tp.queue {
		for _, tx := range txs {
			if tx.Hash() == hash {
				return tx
			}
		}
	}

	return nil
}

O
Merge  
obscuren 已提交
233
func (self *TxPool) GetTransactions() (txs types.Transactions) {
234 235 236
	self.mu.RLock()
	defer self.mu.RUnlock()

O
Merge  
obscuren 已提交
237
	txs = make(types.Transactions, self.Size())
O
obscuren 已提交
238
	i := 0
O
Merge  
obscuren 已提交
239 240
	for _, tx := range self.txs {
		txs[i] = tx
O
obscuren 已提交
241
		i++
O
Merge  
obscuren 已提交
242
	}
243

O
Merge  
obscuren 已提交
244
	return
245 246
}

247 248 249 250 251 252 253 254 255 256 257 258
func (self *TxPool) GetQueuedTransactions() types.Transactions {
	self.mu.RLock()
	defer self.mu.RUnlock()

	var txs types.Transactions
	for _, ts := range self.queue {
		txs = append(txs, ts...)
	}

	return txs
}

259
func (self *TxPool) RemoveTransactions(txs types.Transactions) {
260 261
	self.mu.Lock()
	defer self.mu.Unlock()
262

263
	for _, tx := range txs {
O
obscuren 已提交
264
		self.removeTx(tx.Hash())
265 266 267
	}
}

268
func (pool *TxPool) Flush() {
269
	pool.txs = make(map[common.Hash]*types.Transaction)
O
obscuren 已提交
270 271 272 273
}

func (pool *TxPool) Stop() {
	pool.Flush()
274
	close(pool.quit)
O
obscuren 已提交
275

O
obscuren 已提交
276
	glog.V(logger.Info).Infoln("TX Pool stopped")
O
obscuren 已提交
277
}
278

279 280 281 282 283 284 285 286
func (self *TxPool) queueTx(tx *types.Transaction) {
	from, _ := tx.From()
	self.queue[from] = append(self.queue[from], tx)
}

func (pool *TxPool) addTx(tx *types.Transaction) {
	if _, ok := pool.txs[tx.Hash()]; !ok {
		pool.txs[tx.Hash()] = tx
287 288 289 290
		// Notify the subscribers. This event is posted in a goroutine
		// because it's possible that somewhere during the post "Remove transaction"
		// gets called which will then wait for the global tx pool lock and deadlock.
		go pool.eventMux.Post(TxPreEvent{tx})
291 292 293
	}
}

294 295 296 297 298
// check queue will attempt to insert
func (pool *TxPool) checkQueue() {
	pool.mu.Lock()
	defer pool.mu.Unlock()

299
	statedb := pool.currentState()
300 301 302 303
	for address, txs := range pool.queue {
		sort.Sort(types.TxByNonce{txs})

		var (
304
			nonce = statedb.GetNonce(address)
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325
			start int
		)
		// Clean up the transactions first and determine the start of the nonces
		for _, tx := range txs {
			if tx.Nonce() >= nonce {
				break
			}
			start++
		}
		pool.queue[address] = txs[start:]

		// expected nonce
		enonce := nonce
		for _, tx := range pool.queue[address] {
			// If the expected nonce does not match up with the next one
			// (i.e. a nonce gap), we stop the loop
			if enonce != tx.Nonce() {
				break
			}
			enonce++

326
			pool.addTx(tx)
327 328 329 330 331 332 333
		}
		// delete the entire queue entry if it's empty. There's no need to keep it
		if len(pool.queue[address]) == 0 {
			delete(pool.queue, address)
		}
	}
}
334

335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355
func (pool *TxPool) removeTx(hash common.Hash) {
	// delete from pending pool
	delete(pool.txs, hash)

	// delete from queue
out:
	for address, txs := range pool.queue {
		for i, tx := range txs {
			if tx.Hash() == hash {
				if len(txs) == 1 {
					// if only one tx, remove entire address entry
					delete(pool.queue, address)
				} else {
					pool.queue[address][len(txs)-1], pool.queue[address] = nil, append(txs[:i], txs[i+1:]...)
				}
				break out
			}
		}
	}
}

356 357 358 359 360
func (pool *TxPool) validatePool() {
	pool.mu.Lock()
	defer pool.mu.Unlock()

	for hash, tx := range pool.txs {
361 362 363
		if err := pool.ValidateTransaction(tx); err != nil {
			if glog.V(logger.Info) {
				glog.Infof("removed tx (%x) from pool: %v\n", hash[:4], err)
364 365
			}

366
			pool.removeTx(hash)
367 368 369
		}
	}
}