transaction_pool.go 4.4 KB
Newer Older
O
obscuren 已提交
1
package core
O
obscuren 已提交
2 3

import (
4
	"errors"
O
obscuren 已提交
5
	"fmt"
6
	"sync"
7

O
obscuren 已提交
8
	"github.com/ethereum/go-ethereum/common"
9
	"github.com/ethereum/go-ethereum/core/types"
O
obscuren 已提交
10
	"github.com/ethereum/go-ethereum/event"
O
obscuren 已提交
11
	"github.com/ethereum/go-ethereum/logger"
O
obscuren 已提交
12 13
)

14 15 16 17 18
// txplogger is the logger used by the transaction pool; it is created with
// the "TXP" tag.
var txplogger = logger.NewLogger("TXP")

// ErrInvalidSender is returned by ValidateTransaction when tx.From reports
// an error for the transaction being checked.
var ErrInvalidSender = errors.New("Invalid sender")
Z
zelig 已提交
19

20
// txPoolQueueSize is the buffer capacity of the queue channel that NewTxPool
// allocates for a TxPool.
const txPoolQueueSize = 50
O
obscuren 已提交
21

22
// TxPoolHook is a channel that carries transactions.
// NOTE(review): nothing in the visible code references this type — confirm
// it is still needed.
type TxPoolHook chan *types.Transaction
23
// TxMsg wraps a single transaction. It is the element type of the channels
// held in TxPool.subscribers.
type TxMsg struct{ Tx *types.Transaction }
O
obscuren 已提交
24 25

// minGasPrice is a lower bound on the gas price.
// NOTE(review): the unit is presumably wei and the constant is not referenced
// in the visible code — confirm against its users elsewhere in the package.
const minGasPrice = 1000000

O
obscuren 已提交
29
type TxProcessor interface {
30
	ProcessTransaction(tx *types.Transaction)
O
obscuren 已提交
31 32
}

O
obscuren 已提交
33 34
// The tx pool a thread safe transaction pool handler. In order to
// guarantee a non blocking pool we use a queue channel which can be
35
// independently read without needing access to the actual pool.
O
obscuren 已提交
36
type TxPool struct {
37
	mu sync.RWMutex
O
obscuren 已提交
38 39
	// Queueing channel for reading and writing incoming
	// transactions to
40
	queueChan chan *types.Transaction
O
obscuren 已提交
41 42 43
	// Quiting channel
	quit chan bool
	// The actual pool
44
	//pool *list.List
45
	txs map[common.Hash]*types.Transaction
O
obscuren 已提交
46

O
obscuren 已提交
47
	SecondaryProcessor TxProcessor
O
obscuren 已提交
48 49

	subscribers []chan TxMsg
O
obscuren 已提交
50

51
	eventMux *event.TypeMux
O
obscuren 已提交
52 53
}

54
func NewTxPool(eventMux *event.TypeMux) *TxPool {
O
obscuren 已提交
55
	return &TxPool{
56
		txs:       make(map[common.Hash]*types.Transaction),
57 58 59
		queueChan: make(chan *types.Transaction, txPoolQueueSize),
		quit:      make(chan bool),
		eventMux:  eventMux,
O
obscuren 已提交
60 61 62
	}
}

63
func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
64 65
	// Validate sender
	if _, err := tx.From(); err != nil {
F
Felix Lange 已提交
66
		return ErrInvalidSender
O
obscuren 已提交
67
	}
68
	// Validate curve param
O
obscuren 已提交
69 70
	v, _, _ := tx.Curve()
	if v > 28 || v < 27 {
71
		return fmt.Errorf("tx.v != (28 || 27) => %v", v)
O
obscuren 已提交
72
	}
73
	return nil
74

75 76 77 78
	/* XXX this kind of validation needs to happen elsewhere in the gui when sending txs.
	   Other clients should do their own validation. Value transfer could throw error
	   but doesn't necessarily invalidate the tx. Gas can still be payed for and miner
	   can still be rewarded for their inclusion and processing.
79
	sender := pool.stateQuery.GetAccount(senderAddr)
80
	totAmount := new(big.Int).Set(tx.Value())
O
obscuren 已提交
81 82
	// Make sure there's enough in the sender's account. Having insufficient
	// funds won't invalidate this transaction but simple ignores it.
O
obscuren 已提交
83
	if sender.Balance().Cmp(totAmount) < 0 {
84
		return fmt.Errorf("Insufficient amount in sender's (%x) account", tx.From())
85
	}
86
	*/
O
obscuren 已提交
87 88
}

O
Merge  
obscuren 已提交
89
func (self *TxPool) addTx(tx *types.Transaction) {
90
	self.txs[tx.Hash()] = tx
O
Merge  
obscuren 已提交
91
}
92

93
func (self *TxPool) add(tx *types.Transaction) error {
94 95 96
	hash := tx.Hash()
	if self.txs[hash] != nil {
		return fmt.Errorf("Known transaction (%x)", hash[0:4])
97
	}
98 99 100 101 102
	err := self.ValidateTransaction(tx)
	if err != nil {
		return err
	}

O
Merge  
obscuren 已提交
103
	self.addTx(tx)
104

105
	var toname string
106
	if to := tx.To(); to != nil {
107
		toname = common.Bytes2Hex(to[:4])
108
	} else {
109
		toname = "[NEW_CONTRACT]"
110
	}
111 112 113 114
	// we can ignore the error here because From is
	// verified in ValidateTransaction.
	f, _ := tx.From()
	from := common.Bytes2Hex(f[:4])
115
	txplogger.Debugf("(t) %x => %s (%v) %x\n", from, toname, tx.Value, tx.Hash())
116 117

	// Notify the subscribers
O
obscuren 已提交
118
	go self.eventMux.Post(TxPreEvent{tx})
119 120 121 122

	return nil
}

123
func (self *TxPool) Size() int {
O
Merge  
obscuren 已提交
124
	return len(self.txs)
125 126
}

127 128 129 130 131
// Add inserts a single transaction into the pool while holding the write
// lock. It returns whatever error add reports: an already-known transaction
// or a failed validation.
func (pool *TxPool) Add(tx *types.Transaction) error {
	pool.mu.Lock()
	defer pool.mu.Unlock()

	return pool.add(tx)
}
132

Z
zelig 已提交
133
func (self *TxPool) AddTransactions(txs []*types.Transaction) {
134 135 136
	self.mu.Lock()
	defer self.mu.Unlock()

Z
zelig 已提交
137
	for _, tx := range txs {
138 139
		if err := self.add(tx); err != nil {
			txplogger.Debugln(err)
Z
zelig 已提交
140
		} else {
141 142
			h := tx.Hash()
			txplogger.Debugf("tx %x\n", h[:4])
Z
zelig 已提交
143 144 145 146
		}
	}
}

O
Merge  
obscuren 已提交
147
func (self *TxPool) GetTransactions() (txs types.Transactions) {
148 149 150
	self.mu.RLock()
	defer self.mu.RUnlock()

O
Merge  
obscuren 已提交
151
	txs = make(types.Transactions, self.Size())
O
obscuren 已提交
152
	i := 0
O
Merge  
obscuren 已提交
153 154
	for _, tx := range self.txs {
		txs[i] = tx
O
obscuren 已提交
155
		i++
O
Merge  
obscuren 已提交
156
	}
157

O
Merge  
obscuren 已提交
158
	return
159 160
}

161
func (pool *TxPool) RemoveInvalid(query StateQuery) {
162 163
	pool.mu.Lock()

164
	var removedTxs types.Transactions
O
Merge  
obscuren 已提交
165
	for _, tx := range pool.txs {
166 167
		from, _ := tx.From()
		sender := query.GetAccount(from[:])
168
		err := pool.ValidateTransaction(tx)
169
		if err != nil || sender.Nonce() >= tx.Nonce() {
170
			removedTxs = append(removedTxs, tx)
171
		}
O
Merge  
obscuren 已提交
172
	}
173
	pool.mu.Unlock()
174 175

	pool.RemoveSet(removedTxs)
176 177
}

178
func (self *TxPool) RemoveSet(txs types.Transactions) {
179 180
	self.mu.Lock()
	defer self.mu.Unlock()
181
	for _, tx := range txs {
182
		delete(self.txs, tx.Hash())
183 184 185
	}
}

186
func (pool *TxPool) Flush() {
187
	pool.txs = make(map[common.Hash]*types.Transaction)
O
obscuren 已提交
188 189 190 191 192 193 194
}

// Start is a no-op: its body is empty.
func (pool *TxPool) Start() {
}

func (pool *TxPool) Stop() {
	pool.Flush()
O
obscuren 已提交
195

Z
zelig 已提交
196
	txplogger.Infoln("Stopped")
O
obscuren 已提交
197
}