transaction_pool.go 4.2 KB
Newer Older
O
obscuren 已提交
1
package core
O
obscuren 已提交
2 3 4

import (
	"fmt"
5

O
obscuren 已提交
6
	"github.com/ethereum/go-ethereum/core/types"
7
	"github.com/ethereum/go-ethereum/ethutil"
O
obscuren 已提交
8
	"github.com/ethereum/go-ethereum/event"
O
obscuren 已提交
9
	"github.com/ethereum/go-ethereum/logger"
10
	"gopkg.in/fatih/set.v0"
O
obscuren 已提交
11 12
)

O
obscuren 已提交
13
// txplogger is the package-level logger used by the transaction pool; it is
// created with the tag "TXP".
var txplogger = logger.NewLogger("TXP")
Z
zelig 已提交
14

15
// txPoolQueueSize is the buffer capacity NewTxPool gives to TxPool.queueChan.
const txPoolQueueSize = 50
O
obscuren 已提交
16

17
// TxPoolHook is a channel carrying transaction pointers.
// NOTE(review): nothing in the visible part of this file uses this type —
// confirm it is still needed.
type TxPoolHook chan *types.Transaction
Z
zelig 已提交
18 19 20
// TxMsg wraps a single transaction. It is the element type of the channels
// stored in TxPool.subscribers.
type TxMsg struct {
	Tx *types.Transaction
}
O
obscuren 已提交
21 22

// minGasPrice is a gas price threshold constant.
// NOTE(review): it is not referenced anywhere in the visible part of this
// file; presumably it is the lowest gas price the pool should accept —
// confirm against the rest of the package.
const minGasPrice = 1000000

O
obscuren 已提交
26
type TxProcessor interface {
27
	ProcessTransaction(tx *types.Transaction)
O
obscuren 已提交
28 29
}

O
obscuren 已提交
30 31
// The tx pool a thread safe transaction pool handler. In order to
// guarantee a non blocking pool we use a queue channel which can be
32
// independently read without needing access to the actual pool.
O
obscuren 已提交
33 34 35
type TxPool struct {
	// Queueing channel for reading and writing incoming
	// transactions to
36
	queueChan chan *types.Transaction
O
obscuren 已提交
37 38 39
	// Quiting channel
	quit chan bool
	// The actual pool
40 41
	//pool *list.List
	pool *set.Set
O
obscuren 已提交
42

O
obscuren 已提交
43
	SecondaryProcessor TxProcessor
O
obscuren 已提交
44 45

	subscribers []chan TxMsg
O
obscuren 已提交
46

47
	eventMux *event.TypeMux
O
obscuren 已提交
48 49
}

50
func NewTxPool(eventMux *event.TypeMux) *TxPool {
O
obscuren 已提交
51
	return &TxPool{
52 53 54 55
		pool:      set.New(),
		queueChan: make(chan *types.Transaction, txPoolQueueSize),
		quit:      make(chan bool),
		eventMux:  eventMux,
O
obscuren 已提交
56 57 58
	}
}

59
func (pool *TxPool) addTransaction(tx *types.Transaction) {
60
	pool.pool.Add(tx)
O
obscuren 已提交
61 62

	// Broadcast the transaction to the rest of the peers
O
obscuren 已提交
63
	pool.eventMux.Post(TxPreEvent{tx})
O
obscuren 已提交
64 65
}

66
func (pool *TxPool) ValidateTransaction(tx *types.Transaction) error {
67 68
	if len(tx.To()) != 0 && len(tx.To()) != 20 {
		return fmt.Errorf("Invalid recipient. len = %d", len(tx.To()))
O
obscuren 已提交
69 70
	}

O
obscuren 已提交
71 72
	v, _, _ := tx.Curve()
	if v > 28 || v < 27 {
73
		return fmt.Errorf("tx.v != (28 || 27) => %v", v)
O
obscuren 已提交
74 75
	}

76 77 78 79
	/* XXX this kind of validation needs to happen elsewhere in the gui when sending txs.
	   Other clients should do their own validation. Value transfer could throw error
	   but doesn't necessarily invalidate the tx. Gas can still be payed for and miner
	   can still be rewarded for their inclusion and processing.
O
obscuren 已提交
80
	// Get the sender
81
	senderAddr := tx.From()
E
Ethan Buchman 已提交
82
	if senderAddr == nil {
83
		return fmt.Errorf("invalid sender")
E
Ethan Buchman 已提交
84
	}
85
	sender := pool.stateQuery.GetAccount(senderAddr)
O
obscuren 已提交
86

87
	totAmount := new(big.Int).Set(tx.Value())
O
obscuren 已提交
88 89
	// Make sure there's enough in the sender's account. Having insufficient
	// funds won't invalidate this transaction but simple ignores it.
O
obscuren 已提交
90
	if sender.Balance().Cmp(totAmount) < 0 {
91
		return fmt.Errorf("Insufficient amount in sender's (%x) account", tx.From())
92
	}
93
	*/
94

O
obscuren 已提交
95 96 97
	return nil
}

O
obscuren 已提交
98
func (self *TxPool) Add(tx *types.Transaction) error {
99
	hash := tx.Hash()
100
	if self.pool.Has(tx) {
101 102 103 104 105 106 107 108 109 110
		return fmt.Errorf("Known transaction (%x)", hash[0:4])
	}

	err := self.ValidateTransaction(tx)
	if err != nil {
		return err
	}

	self.addTransaction(tx)

111 112 113 114 115 116 117 118
	var to string
	if len(tx.To()) > 0 {
		to = ethutil.Bytes2Hex(tx.To()[:4])
	} else {
		to = "[NEW_CONTRACT]"
	}

	txplogger.Debugf("(t) %x => %s (%v) %x\n", tx.From()[:4], to, tx.Value, tx.Hash())
119 120

	// Notify the subscribers
O
obscuren 已提交
121
	go self.eventMux.Post(TxPreEvent{tx})
122 123 124 125

	return nil
}

126
func (self *TxPool) Size() int {
127
	return self.pool.Size()
128 129
}

Z
zelig 已提交
130 131 132 133 134 135 136 137 138 139 140
// AddTransactions feeds every transaction in txs to Add, in order. Failures
// do not stop the loop: a rejected transaction has its error logged at info
// level, an accepted one has the first four bytes of its hash logged.
func (pool *TxPool) AddTransactions(txs []*types.Transaction) {
	for i := range txs {
		tx := txs[i]

		err := pool.Add(tx)
		if err == nil {
			txplogger.Infof("tx %x\n", tx.Hash()[0:4])
			continue
		}

		txplogger.Infoln(err)
	}
}

func (pool *TxPool) GetTransactions() []*types.Transaction {
141
	txList := make([]*types.Transaction, pool.Size())
O
obscuren 已提交
142
	i := 0
143 144
	pool.pool.Each(func(v interface{}) bool {
		txList[i] = v.(*types.Transaction)
O
obscuren 已提交
145
		i++
146 147 148

		return true
	})
O
obscuren 已提交
149

150 151 152
	return txList
}

153 154 155 156 157
func (pool *TxPool) RemoveInvalid(query StateQuery) {
	var removedTxs types.Transactions
	pool.pool.Each(func(v interface{}) bool {
		tx := v.(*types.Transaction)
		sender := query.GetAccount(tx.From())
158
		err := pool.ValidateTransaction(tx)
159
		if err != nil || sender.Nonce >= tx.Nonce() {
160
			removedTxs = append(removedTxs, tx)
161
		}
162 163 164 165

		return true
	})
	pool.RemoveSet(removedTxs)
166 167
}

168
func (self *TxPool) RemoveSet(txs types.Transactions) {
169
	for _, tx := range txs {
170
		self.pool.Remove(tx)
171 172 173
	}
}

174
func (pool *TxPool) Flush() []*types.Transaction {
Z
zelig 已提交
175
	txList := pool.GetTransactions()
176
	pool.pool.Clear()
O
obscuren 已提交
177 178 179 180 181 182 183 184 185

	return txList
}

// Start currently does nothing; its body is empty.
func (pool *TxPool) Start() {
}

func (pool *TxPool) Stop() {
	pool.Flush()
O
obscuren 已提交
186

Z
zelig 已提交
187
	txplogger.Infoln("Stopped")
O
obscuren 已提交
188
}