transaction_pool.go 5.9 KB
Newer Older
O
obscuren 已提交
1
package core
O
obscuren 已提交
2 3

import (
4
	"errors"
O
obscuren 已提交
5
	"fmt"
O
obscuren 已提交
6
	"math/big"
7
	"sort"
8
	"sync"
9
	"time"
10

O
obscuren 已提交
11
	"github.com/ethereum/go-ethereum/common"
O
obscuren 已提交
12
	"github.com/ethereum/go-ethereum/core/state"
13
	"github.com/ethereum/go-ethereum/core/types"
O
obscuren 已提交
14
	"github.com/ethereum/go-ethereum/event"
O
obscuren 已提交
15
	"github.com/ethereum/go-ethereum/logger"
O
obscuren 已提交
16
	"github.com/ethereum/go-ethereum/logger/glog"
17
	"gopkg.in/fatih/set.v0"
O
obscuren 已提交
18 19
)

20
// Errors returned by the transaction pool when a transaction is rejected.
var (
	// ErrInvalidSender is returned when the sender cannot be derived from
	// the transaction.
	ErrInvalidSender = errors.New("Invalid sender")

	// ErrNonce is returned when the sender's nonce in the current state is
	// already higher than the transaction's nonce.
	ErrNonce = errors.New("Nonce too low")

	// ErrNonExistentAccount is returned when the sender has no account in
	// the current state.
	ErrNonExistentAccount = errors.New("Account does not exist")

	// ErrInsufficientFunds is returned when the sender's balance is lower
	// than gas price times gas limit.
	ErrInsufficientFunds = errors.New("Insufficient funds")

	// ErrIntrinsicGas is returned when the gas limit is lower than the
	// intrinsic gas of the transaction.
	ErrIntrinsicGas = errors.New("Intrinsic gas too low")
)
Z
zelig 已提交
27

28
// txPoolQueueSize is the buffer capacity of the pool's incoming
// transaction channel (TxPool.queueChan).
const txPoolQueueSize = 50
O
obscuren 已提交
29

30
// TxPoolHook is a channel that carries transactions.
type TxPoolHook chan *types.Transaction
31
// TxMsg wraps a single transaction; it is the element type of the pool's
// subscriber channels.
type TxMsg struct{ Tx *types.Transaction }
O
obscuren 已提交
32

33 34
// stateFn returns a state database. The pool calls it to look up
// accounts, balances and nonces when checking transactions.
type stateFn func() *state.StateDB

O
obscuren 已提交
35
// minGasPrice is not referenced in this part of the file.
// NOTE(review): presumably the lowest gas price the pool is meant to
// accept — confirm against its users.
const minGasPrice = 1000000

O
obscuren 已提交
39
type TxProcessor interface {
40
	ProcessTransaction(tx *types.Transaction)
O
obscuren 已提交
41 42
}

O
obscuren 已提交
43 44
// The tx pool a thread safe transaction pool handler. In order to
// guarantee a non blocking pool we use a queue channel which can be
45
// independently read without needing access to the actual pool.
O
obscuren 已提交
46
type TxPool struct {
47
	mu sync.RWMutex
O
obscuren 已提交
48 49
	// Queueing channel for reading and writing incoming
	// transactions to
50
	queueChan chan *types.Transaction
O
obscuren 已提交
51 52
	// Quiting channel
	quit chan bool
O
obscuren 已提交
53
	// The state function which will allow us to do some pre checkes
54
	currentState stateFn
O
obscuren 已提交
55
	// The actual pool
56 57
	txs           map[common.Hash]*types.Transaction
	invalidHashes *set.Set
O
obscuren 已提交
58

59 60
	queue map[common.Address]types.Transactions

O
obscuren 已提交
61
	subscribers []chan TxMsg
O
obscuren 已提交
62

63
	eventMux *event.TypeMux
O
obscuren 已提交
64 65
}

66
func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn) *TxPool {
67
	txPool := &TxPool{
68
		txs:           make(map[common.Hash]*types.Transaction),
69
		queue:         make(map[common.Address]types.Transactions),
70 71 72 73
		queueChan:     make(chan *types.Transaction, txPoolQueueSize),
		quit:          make(chan bool),
		eventMux:      eventMux,
		invalidHashes: set.New(),
O
obscuren 已提交
74
		currentState:  currentStateFn,
O
obscuren 已提交
75
	}
76 77 78 79 80 81 82 83 84 85 86 87 88 89
	return txPool
}

func (pool *TxPool) Start() {
	ticker := time.NewTicker(300 * time.Millisecond)
done:
	for {
		select {
		case <-ticker.C:
			pool.checkQueue()
		case <-pool.quit:
			break done
		}
	}
O
obscuren 已提交
90 91
}

92
func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
93
	// Validate sender
O
obscuren 已提交
94 95 96 97 98 99
	var (
		from common.Address
		err  error
	)

	if from, err = tx.From(); err != nil {
F
Felix Lange 已提交
100
		return ErrInvalidSender
O
obscuren 已提交
101
	}
O
obscuren 已提交
102

103
	// Validate curve param
O
obscuren 已提交
104 105
	v, _, _ := tx.Curve()
	if v > 28 || v < 27 {
106
		return fmt.Errorf("tx.v != (28 || 27) => %v", v)
O
obscuren 已提交
107
	}
108

O
obscuren 已提交
109 110
	if !pool.currentState().HasAccount(from) {
		return ErrNonExistentAccount
111
	}
O
obscuren 已提交
112 113 114 115 116 117 118 119 120 121

	if pool.currentState().GetBalance(from).Cmp(new(big.Int).Mul(tx.Price, tx.GasLimit)) < 0 {
		return ErrInsufficientFunds
	}

	if tx.GasLimit.Cmp(IntrinsicGas(tx)) < 0 {
		return ErrIntrinsicGas
	}

	if pool.currentState().GetNonce(from) > tx.Nonce() {
122
		return ErrNonce
O
obscuren 已提交
123 124 125
	}

	return nil
O
obscuren 已提交
126 127
}

128
func (self *TxPool) add(tx *types.Transaction) error {
129
	hash := tx.Hash()
130

O
obscuren 已提交
131 132
	/* XXX I'm unsure about this. This is extremely dangerous and may result
	 in total black listing of certain transactions
133 134 135
	if self.invalidHashes.Has(hash) {
		return fmt.Errorf("Invalid transaction (%x)", hash[:4])
	}
O
obscuren 已提交
136
	*/
137
	if self.txs[hash] != nil {
138
		return fmt.Errorf("Known transaction (%x)", hash[:4])
139
	}
140 141 142 143 144
	err := self.ValidateTransaction(tx)
	if err != nil {
		return err
	}

145
	self.queueTx(tx)
146

147
	var toname string
148
	if to := tx.To(); to != nil {
149
		toname = common.Bytes2Hex(to[:4])
150
	} else {
151
		toname = "[NEW_CONTRACT]"
152
	}
153 154 155 156
	// we can ignore the error here because From is
	// verified in ValidateTransaction.
	f, _ := tx.From()
	from := common.Bytes2Hex(f[:4])
O
obscuren 已提交
157 158 159 160

	if glog.V(logger.Debug) {
		glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash())
	}
161 162 163 164

	return nil
}

165
func (self *TxPool) Size() int {
O
Merge  
obscuren 已提交
166
	return len(self.txs)
167 168
}

169 170 171
func (self *TxPool) Add(tx *types.Transaction) error {
	self.mu.Lock()
	defer self.mu.Unlock()
172

173 174
	return self.add(tx)
}
175

Z
zelig 已提交
176
func (self *TxPool) AddTransactions(txs []*types.Transaction) {
177 178 179
	self.mu.Lock()
	defer self.mu.Unlock()

Z
zelig 已提交
180
	for _, tx := range txs {
181
		if err := self.add(tx); err != nil {
O
obscuren 已提交
182
			glog.V(logger.Debug).Infoln(err)
Z
zelig 已提交
183
		} else {
184
			h := tx.Hash()
O
obscuren 已提交
185
			glog.V(logger.Debug).Infof("tx %x\n", h[:4])
Z
zelig 已提交
186 187 188 189
		}
	}
}

O
Merge  
obscuren 已提交
190
func (self *TxPool) GetTransactions() (txs types.Transactions) {
191 192 193
	self.mu.RLock()
	defer self.mu.RUnlock()

O
Merge  
obscuren 已提交
194
	txs = make(types.Transactions, self.Size())
O
obscuren 已提交
195
	i := 0
O
Merge  
obscuren 已提交
196 197
	for _, tx := range self.txs {
		txs[i] = tx
O
obscuren 已提交
198
		i++
O
Merge  
obscuren 已提交
199
	}
200

O
Merge  
obscuren 已提交
201
	return
202 203
}

204
func (self *TxPool) RemoveTransactions(txs types.Transactions) {
205 206
	self.mu.Lock()
	defer self.mu.Unlock()
207

208
	for _, tx := range txs {
209
		delete(self.txs, tx.Hash())
210 211 212
	}
}

213
func (pool *TxPool) Flush() {
214
	pool.txs = make(map[common.Hash]*types.Transaction)
O
obscuren 已提交
215 216 217 218
}

func (pool *TxPool) Stop() {
	pool.Flush()
219
	close(pool.quit)
O
obscuren 已提交
220

O
obscuren 已提交
221
	glog.V(logger.Info).Infoln("TX Pool stopped")
O
obscuren 已提交
222
}
223

224 225 226 227 228 229 230 231 232 233 234 235 236
// queueTx appends tx to the queue of its sender. The error from tx.From is
// dropped: add, the caller in this file, runs ValidateTransaction (which
// checks From) before queueing.
func (pool *TxPool) queueTx(tx *types.Transaction) {
	sender, _ := tx.From()
	pending := pool.queue[sender]
	pool.queue[sender] = append(pending, tx)
}

// addTx inserts tx into txs and posts a TxPreEvent for it on the event
// mux. If a transaction with the same hash is already present, addTx does
// nothing and no event is posted.
func (pool *TxPool) addTx(tx *types.Transaction) {
	// Compute the hash once and reuse it for both the lookup and the insert.
	hash := tx.Hash()
	if _, known := pool.txs[hash]; known {
		return
	}

	pool.txs[hash] = tx
	// Notify the subscribers
	pool.eventMux.Post(TxPreEvent{tx})
}

237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
// check queue will attempt to insert
func (pool *TxPool) checkQueue() {
	pool.mu.Lock()
	defer pool.mu.Unlock()

	for address, txs := range pool.queue {
		sort.Sort(types.TxByNonce{txs})

		var (
			nonce = pool.currentState().GetNonce(address)
			start int
		)
		// Clean up the transactions first and determine the start of the nonces
		for _, tx := range txs {
			if tx.Nonce() >= nonce {
				break
			}
			start++
		}
		pool.queue[address] = txs[start:]

		// expected nonce
		enonce := nonce
		for _, tx := range pool.queue[address] {
			// If the expected nonce does not match up with the next one
			// (i.e. a nonce gap), we stop the loop
			if enonce != tx.Nonce() {
				break
			}
			enonce++

268
			pool.addTx(tx)
269 270 271 272 273 274 275 276
		}
		//pool.queue[address] = txs[i:]
		// delete the entire queue entry if it's empty. There's no need to keep it
		if len(pool.queue[address]) == 0 {
			delete(pool.queue, address)
		}
	}
}