peer.go 11.9 KB
Newer Older
O
obscuren 已提交
1 2 3
package eth

import (
O
obscuren 已提交
4
	"github.com/ethereum/ethchain-go"
O
obscuren 已提交
5 6 7 8 9
	"github.com/ethereum/ethutil-go"
	"github.com/ethereum/ethwire-go"
	"log"
	"net"
	"strconv"
O
obscuren 已提交
10
	"strings"
O
obscuren 已提交
11 12 13 14 15 16 17 18 19
	"sync/atomic"
	"time"
)

const (
	// The size of the output buffer for writing messages
	outputBufferSize = 50
)

O
obscuren 已提交
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
type DiscReason byte

const (
	DiscReRequested  = 0x00
	DiscReTcpSysErr  = 0x01
	DiscBadProto     = 0x02
	DiscBadPeer      = 0x03
	DiscTooManyPeers = 0x04
)

var discReasonToString = []string{
	"Disconnect requested",
	"Disconnect TCP sys error",
	"Disconnect Bad protocol",
	"Disconnect Useless peer",
	"Disconnect Too many peers",
}

func (d DiscReason) String() string {
	if len(discReasonToString) > int(d) {
		return "Unknown"
	}

	return discReasonToString[d]
}

// Peer capabilities
O
obscuren 已提交
47 48 49
type Caps byte

const (
O
obscuren 已提交
50 51 52
	CapPeerDiscTy = 0x01
	CapTxTy       = 0x02
	CapChainTy    = 0x04
O
obscuren 已提交
53

O
obscuren 已提交
54
	CapDefault = CapChainTy | CapTxTy | CapPeerDiscTy
O
obscuren 已提交
55 56 57
)

var capsToString = map[Caps]string{
O
obscuren 已提交
58 59 60
	CapPeerDiscTy: "Peer discovery",
	CapTxTy:       "Transaction relaying",
	CapChainTy:    "Block chain relaying",
O
obscuren 已提交
61 62 63 64
}

func (c Caps) String() string {
	var caps []string
O
obscuren 已提交
65 66
	if c&CapPeerDiscTy > 0 {
		caps = append(caps, capsToString[CapPeerDiscTy])
O
obscuren 已提交
67 68 69 70 71 72 73 74 75 76 77
	}
	if c&CapChainTy > 0 {
		caps = append(caps, capsToString[CapChainTy])
	}
	if c&CapTxTy > 0 {
		caps = append(caps, capsToString[CapTxTy])
	}

	return strings.Join(caps, " | ")
}

O
obscuren 已提交
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
type Peer struct {
	// Ethereum interface
	ethereum *Ethereum
	// Net connection
	conn net.Conn
	// Output queue which is used to communicate and handle messages
	outputQueue chan *ethwire.Msg
	// Quit channel
	quit chan bool
	// Determines whether it's an inbound or outbound peer
	inbound bool
	// Flag for checking the peer's connectivity state
	connected  int32
	disconnect int32
	// Last known message send
	lastSend time.Time
	// Indicated whether a verack has been send or not
	// This flag is used by writeMessage to check if messages are allowed
	// to be send or not. If no version is known all messages are ignored.
	versionKnown bool

	// Last received pong message
	lastPong int64
	// Indicates whether a MsgGetPeersTy was requested of the peer
	// this to prevent receiving false peers.
	requestedPeerList bool
O
obscuren 已提交
104

O
obscuren 已提交
105 106 107
	host []byte
	port uint16
	caps Caps
O
obscuren 已提交
108 109 110 111 112 113 114 115 116 117 118
}

func NewPeer(conn net.Conn, ethereum *Ethereum, inbound bool) *Peer {
	return &Peer{
		outputQueue: make(chan *ethwire.Msg, outputBufferSize),
		quit:        make(chan bool),
		ethereum:    ethereum,
		conn:        conn,
		inbound:     inbound,
		disconnect:  0,
		connected:   1,
O
obscuren 已提交
119
		port:        30303,
O
obscuren 已提交
120 121 122
	}
}

O
obscuren 已提交
123
func NewOutboundPeer(addr string, ethereum *Ethereum, caps Caps) *Peer {
O
obscuren 已提交
124 125 126 127 128 129 130
	p := &Peer{
		outputQueue: make(chan *ethwire.Msg, outputBufferSize),
		quit:        make(chan bool),
		ethereum:    ethereum,
		inbound:     false,
		connected:   0,
		disconnect:  0,
O
obscuren 已提交
131
		caps:        caps,
O
obscuren 已提交
132 133 134 135
	}

	// Set up the connection in another goroutine so we don't block the main thread
	go func() {
O
obscuren 已提交
136 137
		conn, err := net.DialTimeout("tcp", addr, 30*time.Second)

O
obscuren 已提交
138
		if err != nil {
O
obscuren 已提交
139
			log.Println("Connection to peer failed", err)
O
obscuren 已提交
140
			p.Stop()
O
obscuren 已提交
141
			return
O
obscuren 已提交
142 143 144 145 146 147 148
		}
		p.conn = conn

		// Atomically set the connection state
		atomic.StoreInt32(&p.connected, 1)
		atomic.StoreInt32(&p.disconnect, 0)

149
		p.Start()
O
obscuren 已提交
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
	}()

	return p
}

// Outputs any RLP encoded data to the peer
func (p *Peer) QueueMessage(msg *ethwire.Msg) {
	p.outputQueue <- msg
}

func (p *Peer) writeMessage(msg *ethwire.Msg) {
	// Ignore the write if we're not connected
	if atomic.LoadInt32(&p.connected) != 1 {
		return
	}

	if !p.versionKnown {
		switch msg.Type {
		case ethwire.MsgHandshakeTy: // Ok
		default: // Anything but ack is allowed
			return
		}
	}

	err := ethwire.WriteMessage(p.conn, msg)
	if err != nil {
		log.Println("Can't send message:", err)
		// Stop the client if there was an error writing to it
		p.Stop()
		return
	}
}

// Outbound message handler. Outbound messages are handled here
func (p *Peer) HandleOutbound() {
	// The ping timer. Makes sure that every 2 minutes a ping is send to the peer
O
obscuren 已提交
186
	pingTimer := time.NewTicker(2 * time.Minute)
O
obscuren 已提交
187
	serviceTimer := time.NewTicker(5 * time.Minute)
O
obscuren 已提交
188 189 190 191 192 193 194 195 196
out:
	for {
		select {
		// Main message queue. All outbound messages are processed through here
		case msg := <-p.outputQueue:
			p.writeMessage(msg)

			p.lastSend = time.Now()

O
obscuren 已提交
197 198
		// Ping timer sends a ping to the peer each 2 minutes
		case <-pingTimer.C:
O
obscuren 已提交
199
			p.writeMessage(ethwire.NewMessage(ethwire.MsgPingTy, ""))
O
obscuren 已提交
200

O
obscuren 已提交
201 202 203
		// Service timer takes care of peer broadcasting, transaction
		// posting or block posting
		case <-serviceTimer.C:
O
obscuren 已提交
204
			if p.caps&CapPeerDiscTy > 0 {
O
obscuren 已提交
205 206 207 208
				msg := p.peersMessage()
				p.ethereum.BroadcastMsg(msg)
			}

O
obscuren 已提交
209
		case <-p.quit:
O
obscuren 已提交
210
			// Break out of the for loop if a quit message is posted
O
obscuren 已提交
211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
			break out
		}
	}

clean:
	// This loop is for draining the output queue and anybody waiting for us
	for {
		select {
		case <-p.outputQueue:
			// TODO
		default:
			break clean
		}
	}
}

// Inbound handler. Inbound messages are received here and passed to the appropriate methods
func (p *Peer) HandleInbound() {

out:
	for atomic.LoadInt32(&p.disconnect) == 0 {
		// Wait for a message from the peer
O
obscuren 已提交
233 234 235 236
		msgs, err := ethwire.ReadMessages(p.conn)
		for _, msg := range msgs {
			if err != nil {
				log.Println(err)
O
obscuren 已提交
237

O
obscuren 已提交
238 239
				break out
			}
O
obscuren 已提交
240

O
obscuren 已提交
241 242 243 244
			switch msg.Type {
			case ethwire.MsgHandshakeTy:
				// Version message
				p.handleHandshake(msg)
O
obscuren 已提交
245 246

				p.QueueMessage(ethwire.NewMessage(ethwire.MsgGetPeersTy, ""))
O
obscuren 已提交
247 248
			case ethwire.MsgDiscTy:
				p.Stop()
O
obscuren 已提交
249
				log.Println("Disconnect peer:", DiscReason(msg.Data.Get(0).AsUint()))
O
obscuren 已提交
250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
			case ethwire.MsgPingTy:
				// Respond back with pong
				p.QueueMessage(ethwire.NewMessage(ethwire.MsgPongTy, ""))
			case ethwire.MsgPongTy:
				// If we received a pong back from a peer we set the
				// last pong so the peer handler knows this peer is still
				// active.
				p.lastPong = time.Now().Unix()
			case ethwire.MsgBlockTy:
				// Get all blocks and process them
				msg.Data = msg.Data
				for i := msg.Data.Length() - 1; i >= 0; i-- {
					block := ethchain.NewBlockFromRlpValue(msg.Data.Get(i))
					err := p.ethereum.BlockManager.ProcessBlock(block)

					if err != nil {
						log.Println(err)
					}
O
obscuren 已提交
268
				}
O
obscuren 已提交
269 270 271 272 273 274
			case ethwire.MsgTxTy:
				// If the message was a transaction queue the transaction
				// in the TxPool where it will undergo validation and
				// processing when a new block is found
				for i := 0; i < msg.Data.Length(); i++ {
					p.ethereum.TxPool.QueueTransaction(ethchain.NewTransactionFromRlpValue(msg.Data.Get(i)))
O
obscuren 已提交
275
				}
O
obscuren 已提交
276 277 278 279 280 281 282 283 284
			case ethwire.MsgGetPeersTy:
				// Flag this peer as a 'requested of new peers' this to
				// prevent malicious peers being forced.
				p.requestedPeerList = true
				// Peer asked for list of connected peers
				p.pushPeers()
			case ethwire.MsgPeersTy:
				// Received a list of peers (probably because MsgGetPeersTy was send)
				// Only act on message if we actually requested for a peers list
O
obscuren 已提交
285 286 287 288 289 290 291 292
				//if p.requestedPeerList {
				data := msg.Data
				// Create new list of possible peers for the ethereum to process
				peers := make([]string, data.Length())
				// Parse each possible peer
				for i := 0; i < data.Length(); i++ {
					peers[i] = unpackAddr(data.Get(i).Get(0), data.Get(i).Get(1).AsUint())
				}
O
obscuren 已提交
293

O
obscuren 已提交
294 295 296 297
				// Connect to the list of peers
				p.ethereum.ProcessPeerList(peers)
				// Mark unrequested again
				p.requestedPeerList = false
O
obscuren 已提交
298

O
obscuren 已提交
299
				//}
O
obscuren 已提交
300 301 302 303 304 305
			case ethwire.MsgGetChainTy:
				var parent *ethchain.Block
				// Length minus one since the very last element in the array is a count
				l := msg.Data.Length() - 1
				// Ignore empty get chains
				if l <= 1 {
O
obscuren 已提交
306
					break
O
obscuren 已提交
307 308
				}

O
obscuren 已提交
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333
				// Amount of parents in the canonical chain
				amountOfBlocks := msg.Data.Get(l).AsUint()
				// Check each SHA block hash from the message and determine whether
				// the SHA is in the database
				for i := 0; i < l; i++ {
					if data := msg.Data.Get(i).AsBytes(); p.ethereum.BlockManager.BlockChain().HasBlock(data) {
						parent = p.ethereum.BlockManager.BlockChain().GetBlock(data)
						break
					}
				}

				// If a parent is found send back a reply
				if parent != nil {
					chain := p.ethereum.BlockManager.BlockChain().GetChainFromHash(parent.Hash(), amountOfBlocks)
					p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, append(chain, amountOfBlocks)))
				} else {
					// If no blocks are found we send back a reply with msg not in chain
					// and the last hash from get chain
					lastHash := msg.Data.Get(l - 1)
					log.Printf("Sending not in chain with hash %x\n", lastHash.AsRaw())
					p.QueueMessage(ethwire.NewMessage(ethwire.MsgNotInChainTy, []interface{}{lastHash.AsRaw()}))
				}
			case ethwire.MsgNotInChainTy:
				log.Printf("Not in chain %x\n", msg.Data)
				// TODO
O
obscuren 已提交
334

O
obscuren 已提交
335 336 337 338
				// Unofficial but fun nonetheless
			case ethwire.MsgTalkTy:
				log.Printf("%v says: %s\n", p.conn.RemoteAddr(), msg.Data.AsString())
			}
O
obscuren 已提交
339 340 341 342 343 344
		}
	}

	p.Stop()
}

O
obscuren 已提交
345 346 347 348 349 350 351 352 353 354 355 356
func packAddr(address, port string) ([]byte, uint16) {
	addr := strings.Split(address, ".")
	a, _ := strconv.Atoi(addr[0])
	b, _ := strconv.Atoi(addr[1])
	c, _ := strconv.Atoi(addr[2])
	d, _ := strconv.Atoi(addr[3])
	host := []byte{byte(a), byte(b), byte(c), byte(d)}
	prt, _ := strconv.Atoi(port)

	return host, uint16(prt)
}

O
obscuren 已提交
357 358 359 360 361
func unpackAddr(value *ethutil.RlpValue, p uint64) string {
	a := strconv.Itoa(int(value.Get(0).AsUint()))
	b := strconv.Itoa(int(value.Get(1).AsUint()))
	c := strconv.Itoa(int(value.Get(2).AsUint()))
	d := strconv.Itoa(int(value.Get(3).AsUint()))
O
obscuren 已提交
362 363 364 365 366 367
	host := strings.Join([]string{a, b, c, d}, ".")
	port := strconv.Itoa(int(p))

	return net.JoinHostPort(host, port)
}

368
func (p *Peer) Start() {
O
obscuren 已提交
369 370
	peerHost, peerPort, _ := net.SplitHostPort(p.conn.LocalAddr().String())
	servHost, servPort, _ := net.SplitHostPort(p.conn.RemoteAddr().String())
O
obscuren 已提交
371
	if peerHost == servHost {
O
obscuren 已提交
372
		p.Stop()
O
obscuren 已提交
373

O
obscuren 已提交
374
		return
O
obscuren 已提交
375 376 377 378 379 380
	}

	if p.inbound {
		p.host, p.port = packAddr(peerHost, peerPort)
	} else {
		p.host, p.port = packAddr(servHost, servPort)
O
obscuren 已提交
381 382 383 384 385 386 387 388 389
	}

	err := p.pushHandshake()
	if err != nil {
		log.Printf("Peer can't send outbound version ack", err)

		p.Stop()

		return
O
obscuren 已提交
390 391 392 393 394 395
	}

	// Run the outbound handler in a new goroutine
	go p.HandleOutbound()
	// Run the inbound handler in a new goroutine
	go p.HandleInbound()
O
obscuren 已提交
396

O
obscuren 已提交
397 398 399 400 401 402 403 404 405
}

func (p *Peer) Stop() {
	if atomic.AddInt32(&p.disconnect, 1) != 1 {
		return
	}

	close(p.quit)
	if atomic.LoadInt32(&p.connected) != 0 {
O
obscuren 已提交
406
		p.writeMessage(ethwire.NewMessage(ethwire.MsgDiscTy, ""))
O
obscuren 已提交
407 408 409 410 411
		p.conn.Close()
	}
}

func (p *Peer) pushHandshake() error {
O
obscuren 已提交
412
	msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{
O
obscuren 已提交
413
		uint32(0), uint32(0), "/Ethereum(G) v0.0.1/", byte(p.caps), p.port,
O
obscuren 已提交
414
	})
O
obscuren 已提交
415 416 417 418 419 420

	p.QueueMessage(msg)

	return nil
}

O
obscuren 已提交
421
func (p *Peer) peersMessage() *ethwire.Msg {
422
	outPeers := make([]interface{}, len(p.ethereum.InOutPeers()))
O
obscuren 已提交
423
	// Serialise each peer
424
	for i, peer := range p.ethereum.InOutPeers() {
O
obscuren 已提交
425
		outPeers[i] = peer.RlpData()
O
obscuren 已提交
426 427
	}

O
obscuren 已提交
428 429 430
	// Return the message to the peer with the known list of connected clients
	return ethwire.NewMessage(ethwire.MsgPeersTy, outPeers)
}
O
obscuren 已提交
431

O
obscuren 已提交
432 433 434
// Pushes the list of outbound peers to the client when requested
func (p *Peer) pushPeers() {
	p.QueueMessage(p.peersMessage())
O
obscuren 已提交
435 436 437
}

func (p *Peer) handleHandshake(msg *ethwire.Msg) {
438
	c := msg.Data
O
obscuren 已提交
439 440 441
	// [PROTOCOL_VERSION, NETWORK_ID, CLIENT_ID]
	p.versionKnown = true

O
obscuren 已提交
442
	var istr string
O
obscuren 已提交
443 444
	// If this is an inbound connection send an ack back
	if p.inbound {
O
obscuren 已提交
445 446 447
		if port := c.Get(4).AsUint(); port != 0 {
			p.port = uint16(port)
		}
O
obscuren 已提交
448

O
obscuren 已提交
449
		istr = "inbound"
O
obscuren 已提交
450
	} else {
O
obscuren 已提交
451 452
		msg := ethwire.NewMessage(ethwire.MsgGetChainTy, []interface{}{p.ethereum.BlockManager.BlockChain().CurrentBlock.Hash(), uint64(100)})
		p.QueueMessage(msg)
O
obscuren 已提交
453 454

		istr = "outbound"
O
obscuren 已提交
455
	}
O
obscuren 已提交
456

O
obscuren 已提交
457 458
	if caps := Caps(c.Get(3).AsByte()); caps != 0 {
		p.caps = caps
O
obscuren 已提交
459 460
	}

O
obscuren 已提交
461 462
	log.Printf("peer connect (%s) %v %s [%s]\n", istr, p.conn.RemoteAddr(), c.Get(2).AsString(), p.caps)
}
O
obscuren 已提交
463

O
obscuren 已提交
464 465
func (p *Peer) RlpData() []interface{} {
	return []interface{}{p.host, p.port /*port*/}
O
obscuren 已提交
466 467
}

O
obscuren 已提交
468 469 470 471 472 473 474 475 476 477 478 479 480 481 482
func (p *Peer) RlpEncode() []byte {
	host, prt, err := net.SplitHostPort(p.conn.RemoteAddr().String())
	if err != nil {
		return nil
	}

	i, err := strconv.Atoi(prt)
	if err != nil {
		return nil
	}

	port := ethutil.NumberToBytes(uint16(i), 16)

	return ethutil.Encode([]interface{}{host, port})
}