Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
whqwjb
go-ethereum
提交
5caff3bc
G
go-ethereum
项目概览
whqwjb
/
go-ethereum
与 Fork 源项目一致
从无法访问的项目Fork
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
G
go-ethereum
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
5caff3bc
编写于
7月 01, 2015
作者:
J
Jeffrey Wilcke
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1351 from karalabe/eth61
Implement eth/61
上级
507869bf
d6f2c0a7
变更
13
展开全部
显示空白变更内容
内联
并排
Showing
13 changed file
with
981 addition
and
306 deletion
+981
-306
cmd/geth/main.go
cmd/geth/main.go
+1
-2
cmd/utils/flags.go
cmd/utils/flags.go
+0
-6
eth/backend.go
eth/backend.go
+35
-27
eth/downloader/downloader.go
eth/downloader/downloader.go
+391
-26
eth/downloader/downloader_test.go
eth/downloader/downloader_test.go
+231
-62
eth/downloader/peer.go
eth/downloader/peer.go
+16
-10
eth/downloader/queue.go
eth/downloader/queue.go
+16
-12
eth/handler.go
eth/handler.go
+120
-68
eth/metrics.go
eth/metrics.go
+28
-0
eth/peer.go
eth/peer.go
+104
-74
eth/protocol.go
eth/protocol.go
+33
-5
eth/protocol_test.go
eth/protocol_test.go
+5
-5
eth/sync.go
eth/sync.go
+1
-9
未找到文件。
cmd/geth/main.go
浏览文件 @
5caff3bc
...
...
@@ -277,7 +277,6 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso
utils
.
ExecFlag
,
utils
.
WhisperEnabledFlag
,
utils
.
VMDebugFlag
,
utils
.
ProtocolVersionFlag
,
utils
.
NetworkIdFlag
,
utils
.
RPCCORSDomainFlag
,
utils
.
VerbosityFlag
,
...
...
@@ -644,7 +643,7 @@ func version(c *cli.Context) {
if
gitCommit
!=
""
{
fmt
.
Println
(
"Git Commit:"
,
gitCommit
)
}
fmt
.
Println
(
"Protocol Version
:"
,
c
.
GlobalInt
(
utils
.
ProtocolVersionFlag
.
Name
)
)
fmt
.
Println
(
"Protocol Version
s:"
,
eth
.
ProtocolVersions
)
fmt
.
Println
(
"Network Id:"
,
c
.
GlobalInt
(
utils
.
NetworkIdFlag
.
Name
))
fmt
.
Println
(
"Go Version:"
,
runtime
.
Version
())
fmt
.
Println
(
"OS:"
,
runtime
.
GOOS
)
...
...
cmd/utils/flags.go
浏览文件 @
5caff3bc
...
...
@@ -82,11 +82,6 @@ var (
Usage
:
"Data directory to be used"
,
Value
:
DirectoryString
{
common
.
DefaultDataDir
()},
}
ProtocolVersionFlag
=
cli
.
IntFlag
{
Name
:
"protocolversion"
,
Usage
:
"ETH protocol version (integer)"
,
Value
:
eth
.
ProtocolVersion
,
}
NetworkIdFlag
=
cli
.
IntFlag
{
Name
:
"networkid"
,
Usage
:
"Network Id (integer)"
,
...
...
@@ -359,7 +354,6 @@ func MakeEthConfig(clientID, version string, ctx *cli.Context) *eth.Config {
return
&
eth
.
Config
{
Name
:
common
.
MakeName
(
clientID
,
version
),
DataDir
:
ctx
.
GlobalString
(
DataDirFlag
.
Name
),
ProtocolVersion
:
ctx
.
GlobalInt
(
ProtocolVersionFlag
.
Name
),
GenesisNonce
:
ctx
.
GlobalInt
(
GenesisNonceFlag
.
Name
),
BlockChainVersion
:
ctx
.
GlobalInt
(
BlockchainVersionFlag
.
Name
),
SkipBcVersionCheck
:
false
,
...
...
eth/backend.go
浏览文件 @
5caff3bc
...
...
@@ -11,8 +11,6 @@ import (
"strings"
"time"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/ethash"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
...
...
@@ -26,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
...
...
@@ -58,7 +57,6 @@ var (
type
Config
struct
{
Name
string
ProtocolVersion
int
NetworkId
int
GenesisNonce
int
...
...
@@ -226,7 +224,6 @@ type Ethereum struct {
autodagquit
chan
bool
etherbase
common
.
Address
clientVersion
string
ethVersionId
int
netVersionId
int
shhVersionId
int
}
...
...
@@ -291,6 +288,11 @@ func New(config *Config) (*Ethereum, error) {
nodeDb
:=
filepath
.
Join
(
config
.
DataDir
,
"nodes"
)
// Perform database sanity checks
/*
// The databases were previously tied to protocol versions. Currently we
// are moving away from this decision as approaching Frontier. The below
// check was left in for now but should eventually be just dropped.
d, _ := blockDb.Get([]byte("ProtocolVersion"))
protov := int(common.NewValue(d).Uint())
if protov != config.ProtocolVersion && protov != 0 {
...
...
@@ -298,7 +300,8 @@ func New(config *Config) (*Ethereum, error) {
return nil, fmt.Errorf("Database version mismatch. Protocol(%d / %d). `rm -rf %s`", protov, config.ProtocolVersion, path)
}
saveProtocolVersion(blockDb, config.ProtocolVersion)
glog
.
V
(
logger
.
Info
)
.
Infof
(
"Protocol Version: %v, Network Id: %v"
,
config
.
ProtocolVersion
,
config
.
NetworkId
)
*/
glog
.
V
(
logger
.
Info
)
.
Infof
(
"Protocol Versions: %v, Network Id: %v"
,
ProtocolVersions
,
config
.
NetworkId
)
if
!
config
.
SkipBcVersionCheck
{
b
,
_
:=
blockDb
.
Get
([]
byte
(
"BlockchainVersion"
))
...
...
@@ -321,7 +324,6 @@ func New(config *Config) (*Ethereum, error) {
DataDir
:
config
.
DataDir
,
etherbase
:
common
.
HexToAddress
(
config
.
Etherbase
),
clientVersion
:
config
.
Name
,
// TODO should separate from Name
ethVersionId
:
config
.
ProtocolVersion
,
netVersionId
:
config
.
NetworkId
,
NatSpec
:
config
.
NatSpec
,
MinerThreads
:
config
.
MinerThreads
,
...
...
@@ -345,7 +347,7 @@ func New(config *Config) (*Ethereum, error) {
eth
.
blockProcessor
=
core
.
NewBlockProcessor
(
stateDb
,
extraDb
,
eth
.
pow
,
eth
.
chainManager
,
eth
.
EventMux
())
eth
.
chainManager
.
SetProcessor
(
eth
.
blockProcessor
)
eth
.
protocolManager
=
NewProtocolManager
(
config
.
ProtocolVersion
,
config
.
NetworkId
,
eth
.
eventMux
,
eth
.
txPool
,
eth
.
pow
,
eth
.
chainManager
)
eth
.
protocolManager
=
NewProtocolManager
(
config
.
NetworkId
,
eth
.
eventMux
,
eth
.
txPool
,
eth
.
pow
,
eth
.
chainManager
)
eth
.
miner
=
miner
.
New
(
eth
,
eth
.
EventMux
(),
eth
.
pow
)
eth
.
miner
.
SetGasPrice
(
config
.
GasPrice
)
...
...
@@ -358,7 +360,7 @@ func New(config *Config) (*Ethereum, error) {
if
err
!=
nil
{
return
nil
,
err
}
protocols
:=
[]
p2p
.
Protocol
{
eth
.
protocolManager
.
SubProtocol
}
protocols
:=
append
([]
p2p
.
Protocol
{},
eth
.
protocolManager
.
SubProtocols
...
)
if
config
.
Shh
{
protocols
=
append
(
protocols
,
eth
.
whisper
.
Protocol
())
}
...
...
@@ -495,7 +497,7 @@ func (s *Ethereum) PeerCount() int { return s.net.PeerCoun
func
(
s
*
Ethereum
)
Peers
()
[]
*
p2p
.
Peer
{
return
s
.
net
.
Peers
()
}
func
(
s
*
Ethereum
)
MaxPeers
()
int
{
return
s
.
net
.
MaxPeers
}
func
(
s
*
Ethereum
)
ClientVersion
()
string
{
return
s
.
clientVersion
}
func
(
s
*
Ethereum
)
EthVersion
()
int
{
return
s
.
ethVersionId
}
func
(
s
*
Ethereum
)
EthVersion
()
int
{
return
int
(
s
.
protocolManager
.
SubProtocols
[
0
]
.
Version
)
}
func
(
s
*
Ethereum
)
NetVersion
()
int
{
return
s
.
netVersionId
}
func
(
s
*
Ethereum
)
ShhVersion
()
int
{
return
s
.
shhVersionId
}
func
(
s
*
Ethereum
)
Downloader
()
*
downloader
.
Downloader
{
return
s
.
protocolManager
.
downloader
}
...
...
@@ -504,7 +506,7 @@ func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolMana
func
(
s
*
Ethereum
)
Start
()
error
{
jsonlogger
.
LogJson
(
&
logger
.
LogStarting
{
ClientString
:
s
.
net
.
Name
,
ProtocolVersion
:
ProtocolVersion
,
ProtocolVersion
:
s
.
EthVersion
()
,
})
err
:=
s
.
net
.
Start
()
if
err
!=
nil
{
...
...
@@ -560,7 +562,7 @@ done:
func
(
s
*
Ethereum
)
StartForTest
()
{
jsonlogger
.
LogJson
(
&
logger
.
LogStarting
{
ClientString
:
s
.
net
.
Name
,
ProtocolVersion
:
ProtocolVersion
,
ProtocolVersion
:
s
.
EthVersion
()
,
})
}
...
...
@@ -667,14 +669,20 @@ func (self *Ethereum) StopAutoDAG() {
glog
.
V
(
logger
.
Info
)
.
Infof
(
"Automatic pregeneration of ethash DAG OFF (ethash dir: %s)"
,
ethash
.
DefaultDir
)
}
func
saveProtocolVersion
(
db
common
.
Database
,
protov
int
)
{
/*
// The databases were previously tied to protocol versions. Currently we
// are moving away from this decision as approaching Frontier. The below
// code was left in for now but should eventually be just dropped.
func saveProtocolVersion(db common.Database, protov int) {
d, _ := db.Get([]byte("ProtocolVersion"))
protocolVersion := common.NewValue(d).Uint()
if protocolVersion == 0 {
db.Put([]byte("ProtocolVersion"), common.NewValue(protov).Bytes())
}
}
}
*/
func
saveBlockchainVersion
(
db
common
.
Database
,
bcVersion
int
)
{
d
,
_
:=
db
.
Get
([]
byte
(
"BlockchainVersion"
))
...
...
eth/downloader/downloader.go
浏览文件 @
5caff3bc
...
...
@@ -19,9 +19,14 @@ import (
"gopkg.in/fatih/set.v0"
)
const
(
eth60
=
60
// Constant to check for old protocol support
eth61
=
61
// Constant to check for new protocol support
)
var
(
MinHashFetch
=
512
// Minimum amount of hashes to not consider a peer stalling
MaxHashFetch
=
2048
// Amount of hashes to be fetched per retrieval request
MaxHashFetch
=
512
// Amount of hashes to be fetched per retrieval request
MaxBlockFetch
=
128
// Amount of blocks to be fetched per retrieval request
hashTTL
=
5
*
time
.
Second
// Time it takes for a hash request to time out
...
...
@@ -29,6 +34,7 @@ var (
blockHardTTL
=
3
*
blockSoftTTL
// Maximum time allowance before a block request is considered expired
crossCheckCycle
=
time
.
Second
// Period after which to check for expired cross checks
maxQueuedHashes
=
256
*
1024
// Maximum number of hashes to queue for import (DOS protection)
maxBannedHashes
=
4096
// Number of bannable hashes before phasing old ones out
maxBlockProcess
=
256
// Number of blocks to import at once into the chain
)
...
...
@@ -58,6 +64,9 @@ type hashCheckFn func(common.Hash) bool
// blockRetrievalFn is a callback type for retrieving a block from the local chain.
type
blockRetrievalFn
func
(
common
.
Hash
)
*
types
.
Block
// headRetrievalFn is a callback type for retrieving the head block from the local chain.
type
headRetrievalFn
func
()
*
types
.
Block
// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
type
chainInsertFn
func
(
types
.
Blocks
)
(
int
,
error
)
...
...
@@ -98,6 +107,7 @@ type Downloader struct {
// Callbacks
hasBlock
hashCheckFn
// Checks if a block is present in the chain
getBlock
blockRetrievalFn
// Retrieves a block from the chain
headBlock
headRetrievalFn
// Retrieves the head block from the chain
insertChain
chainInsertFn
// Injects a batch of blocks into the chain
dropPeer
peerDropFn
// Drops a peer for misbehaving
...
...
@@ -109,8 +119,9 @@ type Downloader struct {
// Channels
newPeerCh
chan
*
peer
hashCh
chan
hashPack
blockCh
chan
blockPack
hashCh
chan
hashPack
// Channel receiving inbound hashes
blockCh
chan
blockPack
// Channel receiving inbound blocks
processCh
chan
bool
// Channel to signal the block fetcher of new or finished work
cancelCh
chan
struct
{}
// Channel to cancel mid-flight syncs
cancelLock
sync
.
RWMutex
// Lock to protect the cancel channel in delivers
...
...
@@ -123,7 +134,7 @@ type Block struct {
}
// New creates a new downloader to fetch hashes and blocks from remote peers.
func
New
(
mux
*
event
.
TypeMux
,
hasBlock
hashCheckFn
,
getBlock
blockRetrievalFn
,
insertChain
chainInsertFn
,
dropPeer
peerDropFn
)
*
Downloader
{
func
New
(
mux
*
event
.
TypeMux
,
hasBlock
hashCheckFn
,
getBlock
blockRetrievalFn
,
headBlock
headRetrievalFn
,
insertChain
chainInsertFn
,
dropPeer
peerDropFn
)
*
Downloader
{
// Create the base downloader
downloader
:=
&
Downloader
{
mux
:
mux
,
...
...
@@ -131,11 +142,13 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, in
peers
:
newPeerSet
(),
hasBlock
:
hasBlock
,
getBlock
:
getBlock
,
headBlock
:
headBlock
,
insertChain
:
insertChain
,
dropPeer
:
dropPeer
,
newPeerCh
:
make
(
chan
*
peer
,
1
),
hashCh
:
make
(
chan
hashPack
,
1
),
blockCh
:
make
(
chan
blockPack
,
1
),
processCh
:
make
(
chan
bool
,
1
),
}
// Inject all the known bad hashes
downloader
.
banned
=
set
.
New
()
...
...
@@ -175,7 +188,7 @@ func (d *Downloader) Synchronising() bool {
// RegisterPeer injects a new download peer into the set of block source to be
// used for fetching hashes and blocks from.
func
(
d
*
Downloader
)
RegisterPeer
(
id
string
,
head
common
.
Hash
,
getHashes
h
ashFetcherFn
,
getBlocks
blockFetcherFn
)
error
{
func
(
d
*
Downloader
)
RegisterPeer
(
id
string
,
version
int
,
head
common
.
Hash
,
getRelHashes
relativeHashFetcherFn
,
getAbsHashes
absoluteH
ashFetcherFn
,
getBlocks
blockFetcherFn
)
error
{
// If the peer wants to send a banned hash, reject
if
d
.
banned
.
Has
(
head
)
{
glog
.
V
(
logger
.
Debug
)
.
Infoln
(
"Register rejected, head hash banned:"
,
id
)
...
...
@@ -183,7 +196,7 @@ func (d *Downloader) RegisterPeer(id string, head common.Hash, getHashes hashFet
}
// Otherwise try to construct and register the peer
glog
.
V
(
logger
.
Detail
)
.
Infoln
(
"Registering peer"
,
id
)
if
err
:=
d
.
peers
.
Register
(
newPeer
(
id
,
head
,
get
Hashes
,
getBlocks
));
err
!=
nil
{
if
err
:=
d
.
peers
.
Register
(
newPeer
(
id
,
version
,
head
,
getRelHashes
,
getAbs
Hashes
,
getBlocks
));
err
!=
nil
{
glog
.
V
(
logger
.
Error
)
.
Infoln
(
"Register failed:"
,
err
)
return
err
}
...
...
@@ -289,13 +302,39 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
}
}()
glog
.
V
(
logger
.
Debug
)
.
Infoln
(
"Synchronizing with the network using:"
,
p
.
id
)
if
err
=
d
.
fetchHashes
(
p
,
hash
);
err
!=
nil
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Synchronizing with the network using: %s, eth/%d"
,
p
.
id
,
p
.
version
)
switch
p
.
version
{
case
eth60
:
// Old eth/60 version, use reverse hash retrieval algorithm
if
err
=
d
.
fetchHashes60
(
p
,
hash
);
err
!=
nil
{
return
err
}
if
err
=
d
.
fetchBlocks60
();
err
!=
nil
{
return
err
}
case
eth61
:
// New eth/61, use forward, concurrent hash and block retrieval algorithm
number
,
err
:=
d
.
findAncestor
(
p
)
if
err
!=
nil
{
return
err
}
if
err
=
d
.
fetchBlocks
();
err
!=
nil
{
errc
:=
make
(
chan
error
,
2
)
go
func
()
{
errc
<-
d
.
fetchHashes
(
p
,
number
+
1
)
}()
go
func
()
{
errc
<-
d
.
fetchBlocks
(
number
+
1
)
}()
// If any fetcher fails, cancel the other
if
err
:=
<-
errc
;
err
!=
nil
{
d
.
cancel
()
<-
errc
return
err
}
return
<-
errc
default
:
// Something very wrong, stop right here
glog
.
V
(
logger
.
Error
)
.
Infof
(
"Unsupported eth protocol: %d"
,
p
.
version
)
return
errBadPeer
}
glog
.
V
(
logger
.
Debug
)
.
Infoln
(
"Synchronization completed"
)
return
nil
...
...
@@ -326,10 +365,10 @@ func (d *Downloader) Terminate() {
d
.
cancel
()
}
// fetchHa
hes
starts retrieving hashes backwards from a specific peer and hash,
// fetchHa
shes60
starts retrieving hashes backwards from a specific peer and hash,
// up until it finds a common ancestor. If the source peer times out, alternative
// ones are tried for continuation.
func
(
d
*
Downloader
)
fetchHashes
(
p
*
peer
,
h
common
.
Hash
)
error
{
func
(
d
*
Downloader
)
fetchHashes
60
(
p
*
peer
,
h
common
.
Hash
)
error
{
var
(
start
=
time
.
Now
()
active
=
p
// active peer will help determine the current active peer
...
...
@@ -346,12 +385,12 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
<-
timeout
.
C
// timeout channel should be initially empty.
getHashes
:=
func
(
from
common
.
Hash
)
{
go
active
.
getHashes
(
from
)
go
active
.
get
Rel
Hashes
(
from
)
timeout
.
Reset
(
hashTTL
)
}
// Add the hash to the queue, and start hash retrieval.
d
.
queue
.
Insert
([]
common
.
Hash
{
h
})
d
.
queue
.
Insert
([]
common
.
Hash
{
h
}
,
false
)
getHashes
(
h
)
attempted
[
p
.
id
]
=
true
...
...
@@ -377,7 +416,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
if
d
.
banned
.
Has
(
hash
)
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Peer (%s) sent a known invalid chain"
,
active
.
id
)
d
.
queue
.
Insert
(
hashPack
.
hashes
[
:
index
+
1
])
d
.
queue
.
Insert
(
hashPack
.
hashes
[
:
index
+
1
]
,
false
)
if
err
:=
d
.
banBlocks
(
active
.
id
,
hash
);
err
!=
nil
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Failed to ban batch of blocks: %v"
,
err
)
}
...
...
@@ -395,7 +434,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
}
}
// Insert all the new hashes, but only continue if got something useful
inserts
:=
d
.
queue
.
Insert
(
hashPack
.
hashes
)
inserts
:=
d
.
queue
.
Insert
(
hashPack
.
hashes
,
false
)
if
len
(
inserts
)
==
0
&&
!
done
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Peer (%s) responded with stale hashes"
,
active
.
id
)
return
errBadPeer
...
...
@@ -422,9 +461,9 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
continue
}
// We're done, prepare the download cache and proceed pulling the blocks
offset
:=
0
offset
:=
uint64
(
0
)
if
block
:=
d
.
getBlock
(
head
);
block
!=
nil
{
offset
=
int
(
block
.
NumberU64
()
+
1
)
offset
=
block
.
NumberU64
()
+
1
}
d
.
queue
.
Prepare
(
offset
)
finished
=
true
...
...
@@ -481,10 +520,10 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
return
nil
}
// fetchBlocks iteratively downloads the entire schedules block-chain, taking
// fetchBlocks
60
iteratively downloads the entire schedules block-chain, taking
// any available peers, reserving a chunk of blocks for each, wait for delivery
// and periodically checking for timeouts.
func
(
d
*
Downloader
)
fetchBlocks
()
error
{
func
(
d
*
Downloader
)
fetchBlocks
60
()
error
{
glog
.
V
(
logger
.
Debug
)
.
Infoln
(
"Downloading"
,
d
.
queue
.
Pending
(),
"block(s)"
)
start
:=
time
.
Now
()
...
...
@@ -619,6 +658,332 @@ out:
return
nil
}
// findAncestor tries to locate the common ancestor block of the local chain and
// a remote peers blockchain. In the general case when our node was in sync and
// on the correct chain, checking the top N blocks should already get us a match.
// In the rare scenario when we ended up on a long soft fork (i.e. none of the
// head blocks match), we do a binary search to find the common ancestor.
func
(
d
*
Downloader
)
findAncestor
(
p
*
peer
)
(
uint64
,
error
)
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"%v: looking for common ancestor"
,
p
)
// Request out head blocks to short circuit ancestor location
head
:=
d
.
headBlock
()
.
NumberU64
()
from
:=
int64
(
head
)
-
int64
(
MaxHashFetch
)
if
from
<
0
{
from
=
0
}
go
p
.
getAbsHashes
(
uint64
(
from
),
MaxHashFetch
)
// Wait for the remote response to the head fetch
number
,
hash
:=
uint64
(
0
),
common
.
Hash
{}
timeout
:=
time
.
After
(
hashTTL
)
for
finished
:=
false
;
!
finished
;
{
select
{
case
<-
d
.
cancelCh
:
return
0
,
errCancelHashFetch
case
hashPack
:=
<-
d
.
hashCh
:
// Discard anything not from the origin peer
if
hashPack
.
peerId
!=
p
.
id
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Received hashes from incorrect peer(%s)"
,
hashPack
.
peerId
)
break
}
// Make sure the peer actually gave something valid
hashes
:=
hashPack
.
hashes
if
len
(
hashes
)
==
0
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"%v: empty head hash set"
,
p
)
return
0
,
errEmptyHashSet
}
// Check if a common ancestor was found
finished
=
true
for
i
:=
len
(
hashes
)
-
1
;
i
>=
0
;
i
--
{
if
d
.
hasBlock
(
hashes
[
i
])
{
number
,
hash
=
uint64
(
from
)
+
uint64
(
i
),
hashes
[
i
]
break
}
}
case
<-
d
.
blockCh
:
// Out of bounds blocks received, ignore them
case
<-
timeout
:
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"%v: head hash timeout"
,
p
)
return
0
,
errTimeout
}
}
// If the head fetch already found an ancestor, return
if
!
common
.
EmptyHash
(
hash
)
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"%v: common ancestor: #%d [%x]"
,
p
,
number
,
hash
[
:
4
])
return
number
,
nil
}
// Ancestor not found, we need to binary search over our chain
start
,
end
:=
uint64
(
0
),
head
for
start
+
1
<
end
{
// Split our chain interval in two, and request the hash to cross check
check
:=
(
start
+
end
)
/
2
timeout
:=
time
.
After
(
hashTTL
)
go
p
.
getAbsHashes
(
uint64
(
check
),
1
)
// Wait until a reply arrives to this request
for
arrived
:=
false
;
!
arrived
;
{
select
{
case
<-
d
.
cancelCh
:
return
0
,
errCancelHashFetch
case
hashPack
:=
<-
d
.
hashCh
:
// Discard anything not from the origin peer
if
hashPack
.
peerId
!=
p
.
id
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Received hashes from incorrect peer(%s)"
,
hashPack
.
peerId
)
break
}
// Make sure the peer actually gave something valid
hashes
:=
hashPack
.
hashes
if
len
(
hashes
)
!=
1
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"%v: invalid search hash set (%d)"
,
p
,
len
(
hashes
))
return
0
,
errBadPeer
}
arrived
=
true
// Modify the search interval based on the response
block
:=
d
.
getBlock
(
hashes
[
0
])
if
block
==
nil
{
end
=
check
break
}
if
block
.
NumberU64
()
!=
check
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"%v: non requested hash #%d [%x], instead of #%d"
,
p
,
block
.
NumberU64
(),
block
.
Hash
()
.
Bytes
()[
:
4
],
check
)
return
0
,
errBadPeer
}
start
=
check
case
<-
d
.
blockCh
:
// Out of bounds blocks received, ignore them
case
<-
timeout
:
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"%v: search hash timeout"
,
p
)
return
0
,
errTimeout
}
}
}
return
start
,
nil
}
// fetchHashes keeps retrieving hashes from the requested number, until no more
// are returned, potentially throttling on the way.
func
(
d
*
Downloader
)
fetchHashes
(
p
*
peer
,
from
uint64
)
error
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"%v: downloading hashes from #%d"
,
p
,
from
)
// Create a timeout timer, and the associated hash fetcher
timeout
:=
time
.
NewTimer
(
0
)
// timer to dump a non-responsive active peer
<-
timeout
.
C
// timeout channel should be initially empty
defer
timeout
.
Stop
()
getHashes
:=
func
(
from
uint64
)
{
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"%v: fetching %d hashes from #%d"
,
p
,
MaxHashFetch
,
from
)
go
p
.
getAbsHashes
(
from
,
MaxHashFetch
)
timeout
.
Reset
(
hashTTL
)
}
// Start pulling hashes, until all are exhausted
getHashes
(
from
)
for
{
select
{
case
<-
d
.
cancelCh
:
return
errCancelHashFetch
case
hashPack
:=
<-
d
.
hashCh
:
// Make sure the active peer is giving us the hashes
if
hashPack
.
peerId
!=
p
.
id
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Received hashes from incorrect peer(%s)"
,
hashPack
.
peerId
)
break
}
timeout
.
Stop
()
// If no more hashes are inbound, notify the block fetcher and return
if
len
(
hashPack
.
hashes
)
==
0
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"%v: no available hashes"
,
p
)
select
{
case
d
.
processCh
<-
false
:
case
<-
d
.
cancelCh
:
}
return
nil
}
// Otherwise insert all the new hashes, aborting in case of junk
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"%v: inserting %d hashes from #%d"
,
p
,
len
(
hashPack
.
hashes
),
from
)
inserts
:=
d
.
queue
.
Insert
(
hashPack
.
hashes
,
true
)
if
len
(
inserts
)
!=
len
(
hashPack
.
hashes
)
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"%v: stale hashes"
,
p
)
return
errBadPeer
}
// Notify the block fetcher of new hashes, but stop if queue is full
cont
:=
d
.
queue
.
Pending
()
<
maxQueuedHashes
select
{
case
d
.
processCh
<-
cont
:
default
:
}
if
!
cont
{
return
nil
}
// Queue not yet full, fetch the next batch
from
+=
uint64
(
len
(
hashPack
.
hashes
))
getHashes
(
from
)
case
<-
timeout
.
C
:
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"%v: hash request timed out"
,
p
)
return
errTimeout
}
}
}
// fetchBlocks iteratively downloads the scheduled hashes, taking any available
// peers, reserving a chunk of blocks for each, waiting for delivery and also
// periodically checking for timeouts.
func
(
d
*
Downloader
)
fetchBlocks
(
from
uint64
)
error
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Downloading blocks from #%d"
,
from
)
defer
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Block download terminated"
)
// Create a timeout timer for scheduling expiration tasks
ticker
:=
time
.
NewTicker
(
100
*
time
.
Millisecond
)
defer
ticker
.
Stop
()
update
:=
make
(
chan
struct
{},
1
)
// Prepare the queue and fetch blocks until the hash fetcher's done
d
.
queue
.
Prepare
(
from
)
finished
:=
false
for
{
select
{
case
<-
d
.
cancelCh
:
return
errCancelBlockFetch
case
blockPack
:=
<-
d
.
blockCh
:
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
if
peer
:=
d
.
peers
.
Peer
(
blockPack
.
peerId
);
peer
!=
nil
{
// Deliver the received chunk of blocks, and demote in case of errors
err
:=
d
.
queue
.
Deliver
(
blockPack
.
peerId
,
blockPack
.
blocks
)
switch
err
{
case
nil
:
// If no blocks were delivered, demote the peer (need the delivery above)
if
len
(
blockPack
.
blocks
)
==
0
{
peer
.
Demote
()
peer
.
SetIdle
()
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"%s: no blocks delivered"
,
peer
)
break
}
// All was successful, promote the peer and potentially start processing
peer
.
Promote
()
peer
.
SetIdle
()
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"%s: delivered %d blocks"
,
peer
,
len
(
blockPack
.
blocks
))
go
d
.
process
()
case
errInvalidChain
:
// The hash chain is invalid (blocks are not ordered properly), abort
return
err
case
errNoFetchesPending
:
// Peer probably timed out with its delivery but came through
// in the end, demote, but allow to to pull from this peer.
peer
.
Demote
()
peer
.
SetIdle
()
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"%s: out of bound delivery"
,
peer
)
case
errStaleDelivery
:
// Delivered something completely else than requested, usually
// caused by a timeout and delivery during a new sync cycle.
// Don't set it to idle as the original request should still be
// in flight.
peer
.
Demote
()
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"%s: stale delivery"
,
peer
)
default
:
// Peer did something semi-useful, demote but keep it around
peer
.
Demote
()
peer
.
SetIdle
()
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"%s: delivery partially failed: %v"
,
peer
,
err
)
go
d
.
process
()
}
}
// Blocks arrived, try to update the progress
select
{
case
update
<-
struct
{}{}
:
default
:
}
case
cont
:=
<-
d
.
processCh
:
// The hash fetcher sent a continuation flag, check if it's done
if
!
cont
{
finished
=
true
}
// Hashes arrive, try to update the progress
select
{
case
update
<-
struct
{}{}
:
default
:
}
case
<-
ticker
.
C
:
// Sanity check update the progress
select
{
case
update
<-
struct
{}{}
:
default
:
}
case
<-
update
:
// Short circuit if we lost all our peers
if
d
.
peers
.
Len
()
==
0
{
return
errNoPeers
}
// Check for block request timeouts and demote the responsible peers
for
_
,
pid
:=
range
d
.
queue
.
Expire
(
blockHardTTL
)
{
if
peer
:=
d
.
peers
.
Peer
(
pid
);
peer
!=
nil
{
peer
.
Demote
()
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"%s: block delivery timeout"
,
peer
)
}
}
// If there's noting more to fetch, wait or terminate
if
d
.
queue
.
Pending
()
==
0
{
if
d
.
queue
.
InFlight
()
==
0
&&
finished
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Block fetching completed"
)
return
nil
}
break
}
// Send a download request to all idle peers, until throttled
for
_
,
peer
:=
range
d
.
peers
.
IdlePeers
()
{
// Short circuit if throttling activated
if
d
.
queue
.
Throttle
()
{
break
}
// Reserve a chunk of hashes for a peer. A nil can mean either that
// no more hashes are available, or that the peer is known not to
// have them.
request
:=
d
.
queue
.
Reserve
(
peer
,
peer
.
Capacity
())
if
request
==
nil
{
continue
}
if
glog
.
V
(
logger
.
Detail
)
{
glog
.
Infof
(
"%s: requesting %d blocks"
,
peer
,
len
(
request
.
Hashes
))
}
// Fetch the chunk and make sure any errors return the hashes to the queue
if
err
:=
peer
.
Fetch
(
request
);
err
!=
nil
{
glog
.
V
(
logger
.
Error
)
.
Infof
(
"%v: fetch failed, rescheduling"
,
peer
)
d
.
queue
.
Cancel
(
request
)
}
}
// Make sure that we have peers available for fetching. If all peers have been tried
// and all failed throw an error
if
!
d
.
queue
.
Throttle
()
&&
d
.
queue
.
InFlight
()
==
0
{
return
errPeersUnavailable
}
}
}
}
// banBlocks retrieves a batch of blocks from a peer feeding us invalid hashes,
// and bans the head of the retrieved batch.
//
...
...
eth/downloader/downloader_test.go
浏览文件 @
5caff3bc
此差异已折叠。
点击以展开。
eth/downloader/peer.go
浏览文件 @
5caff3bc
...
...
@@ -15,7 +15,8 @@ import (
"gopkg.in/fatih/set.v0"
)
type
hashFetcherFn
func
(
common
.
Hash
)
error
type
relativeHashFetcherFn
func
(
common
.
Hash
)
error
type
absoluteHashFetcherFn
func
(
uint64
,
int
)
error
type
blockFetcherFn
func
([]
common
.
Hash
)
error
var
(
...
...
@@ -37,20 +38,25 @@ type peer struct {
ignored
*
set
.
Set
// Set of hashes not to request (didn't have previously)
getHashes
hashFetcherFn
// Method to retrieve a batch of hashes (mockable for testing)
getBlocks
blockFetcherFn
// Method to retrieve a batch of blocks (mockable for testing)
getRelHashes
relativeHashFetcherFn
// Method to retrieve a batch of hashes from an origin hash
getAbsHashes
absoluteHashFetcherFn
// Method to retrieve a batch of hashes from an absolute position
getBlocks
blockFetcherFn
// Method to retrieve a batch of blocks
version
int
// Eth protocol version number to switch strategies
}
// newPeer create a new downloader peer, with specific hash and block retrieval
// mechanisms.
func
newPeer
(
id
string
,
head
common
.
Hash
,
getHashes
h
ashFetcherFn
,
getBlocks
blockFetcherFn
)
*
peer
{
func
newPeer
(
id
string
,
version
int
,
head
common
.
Hash
,
getRelHashes
relativeHashFetcherFn
,
getAbsHashes
absoluteH
ashFetcherFn
,
getBlocks
blockFetcherFn
)
*
peer
{
return
&
peer
{
id
:
id
,
head
:
head
,
capacity
:
1
,
getHashes
:
getHashes
,
getRelHashes
:
getRelHashes
,
getAbsHashes
:
getAbsHashes
,
getBlocks
:
getBlocks
,
ignored
:
set
.
New
(),
version
:
version
,
}
}
...
...
eth/downloader/queue.go
浏览文件 @
5caff3bc
...
...
@@ -40,9 +40,9 @@ type queue struct {
pendPool
map
[
string
]
*
fetchRequest
// Currently pending block retrieval operations
blockPool
map
[
common
.
Hash
]
int
// Hash-set of the downloaded data blocks, mapping to cache indexes
blockPool
map
[
common
.
Hash
]
uint64
// Hash-set of the downloaded data blocks, mapping to cache indexes
blockCache
[]
*
Block
// Downloaded but not yet delivered blocks
blockOffset
int
// Offset of the first cached block in the block-chain
blockOffset
uint64
// Offset of the first cached block in the block-chain
lock
sync
.
RWMutex
}
...
...
@@ -53,7 +53,7 @@ func newQueue() *queue {
hashPool
:
make
(
map
[
common
.
Hash
]
int
),
hashQueue
:
prque
.
New
(),
pendPool
:
make
(
map
[
string
]
*
fetchRequest
),
blockPool
:
make
(
map
[
common
.
Hash
]
int
),
blockPool
:
make
(
map
[
common
.
Hash
]
uint64
),
blockCache
:
make
([]
*
Block
,
blockCacheLimit
),
}
}
...
...
@@ -69,7 +69,7 @@ func (q *queue) Reset() {
q
.
pendPool
=
make
(
map
[
string
]
*
fetchRequest
)
q
.
blockPool
=
make
(
map
[
common
.
Hash
]
int
)
q
.
blockPool
=
make
(
map
[
common
.
Hash
]
uint64
)
q
.
blockOffset
=
0
q
.
blockCache
=
make
([]
*
Block
,
blockCacheLimit
)
}
...
...
@@ -130,7 +130,7 @@ func (q *queue) Has(hash common.Hash) bool {
// Insert adds a set of hashes for the download queue for scheduling, returning
// the new hashes encountered.
func
(
q
*
queue
)
Insert
(
hashes
[]
common
.
Hash
)
[]
common
.
Hash
{
func
(
q
*
queue
)
Insert
(
hashes
[]
common
.
Hash
,
fifo
bool
)
[]
common
.
Hash
{
q
.
lock
.
Lock
()
defer
q
.
lock
.
Unlock
()
...
...
@@ -147,8 +147,12 @@ func (q *queue) Insert(hashes []common.Hash) []common.Hash {
inserts
=
append
(
inserts
,
hash
)
q
.
hashPool
[
hash
]
=
q
.
hashCounter
if
fifo
{
q
.
hashQueue
.
Push
(
hash
,
-
float32
(
q
.
hashCounter
))
// Lowest gets schedules first
}
else
{
q
.
hashQueue
.
Push
(
hash
,
float32
(
q
.
hashCounter
))
// Highest gets schedules first
}
}
return
inserts
}
...
...
@@ -175,7 +179,7 @@ func (q *queue) GetBlock(hash common.Hash) *Block {
return
nil
}
// Return the block if it's still available in the cache
if
q
.
blockOffset
<=
index
&&
index
<
q
.
blockOffset
+
len
(
q
.
blockCache
)
{
if
q
.
blockOffset
<=
index
&&
index
<
q
.
blockOffset
+
uint64
(
len
(
q
.
blockCache
)
)
{
return
q
.
blockCache
[
index
-
q
.
blockOffset
]
}
return
nil
...
...
@@ -202,7 +206,7 @@ func (q *queue) TakeBlocks() []*Block {
for
k
,
n
:=
len
(
q
.
blockCache
)
-
len
(
blocks
),
len
(
q
.
blockCache
);
k
<
n
;
k
++
{
q
.
blockCache
[
k
]
=
nil
}
q
.
blockOffset
+=
len
(
blocks
)
q
.
blockOffset
+=
uint64
(
len
(
blocks
)
)
return
blocks
}
...
...
@@ -318,7 +322,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
continue
}
// If a requested block falls out of the range, the hash chain is invalid
index
:=
int
(
block
.
NumberU64
())
-
q
.
blockOffset
index
:=
int
(
int64
(
block
.
NumberU64
())
-
int64
(
q
.
blockOffset
))
if
index
>=
len
(
q
.
blockCache
)
||
index
<
0
{
return
errInvalidChain
}
...
...
@@ -329,7 +333,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
}
delete
(
request
.
Hashes
,
hash
)
delete
(
q
.
hashPool
,
hash
)
q
.
blockPool
[
hash
]
=
int
(
block
.
NumberU64
()
)
q
.
blockPool
[
hash
]
=
block
.
NumberU64
(
)
}
// Return all failed or missing fetches to the queue
for
hash
,
index
:=
range
request
.
Hashes
{
...
...
@@ -346,7 +350,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
}
// Prepare configures the block cache offset to allow accepting inbound blocks.
func
(
q
*
queue
)
Prepare
(
offset
int
)
{
func
(
q
*
queue
)
Prepare
(
offset
uint64
)
{
q
.
lock
.
Lock
()
defer
q
.
lock
.
Unlock
()
...
...
eth/handler.go
浏览文件 @
5caff3bc
...
...
@@ -49,7 +49,7 @@ type ProtocolManager struct {
fetcher
*
fetcher
.
Fetcher
peers
*
peerSet
SubProtocol
p2p
.
Protocol
SubProtocol
s
[]
p2p
.
Protocol
eventMux
*
event
.
TypeMux
txSub
event
.
Subscription
...
...
@@ -68,8 +68,8 @@ type ProtocolManager struct {
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
func
NewProtocolManager
(
protocolVersion
,
networkId
int
,
mux
*
event
.
TypeMux
,
txpool
txPool
,
pow
pow
.
PoW
,
chainman
*
core
.
ChainManager
)
*
ProtocolManager
{
// Create the protocol manager
and initialize peer handler
s
func
NewProtocolManager
(
networkId
int
,
mux
*
event
.
TypeMux
,
txpool
txPool
,
pow
pow
.
PoW
,
chainman
*
core
.
ChainManager
)
*
ProtocolManager
{
// Create the protocol manager
with the base field
s
manager
:=
&
ProtocolManager
{
eventMux
:
mux
,
txpool
:
txpool
,
...
...
@@ -79,18 +79,24 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo
txsyncCh
:
make
(
chan
*
txsync
),
quitSync
:
make
(
chan
struct
{}),
}
manager
.
SubProtocol
=
p2p
.
Protocol
{
// Initiate a sub-protocol for every implemented version we can handle
manager
.
SubProtocols
=
make
([]
p2p
.
Protocol
,
len
(
ProtocolVersions
))
for
i
:=
0
;
i
<
len
(
manager
.
SubProtocols
);
i
++
{
version
:=
ProtocolVersions
[
i
]
manager
.
SubProtocols
[
i
]
=
p2p
.
Protocol
{
Name
:
"eth"
,
Version
:
uint
(
protocolVersion
)
,
Length
:
ProtocolLength
,
Version
:
version
,
Length
:
ProtocolLengths
[
i
]
,
Run
:
func
(
p
*
p2p
.
Peer
,
rw
p2p
.
MsgReadWriter
)
error
{
peer
:=
manager
.
newPeer
(
protocolVersion
,
networkId
,
p
,
rw
)
peer
:=
manager
.
newPeer
(
int
(
version
)
,
networkId
,
p
,
rw
)
manager
.
newPeerCh
<-
peer
return
manager
.
handle
(
peer
)
},
}
}
// Construct the different synchronisation mechanisms
manager
.
downloader
=
downloader
.
New
(
manager
.
eventMux
,
manager
.
chainman
.
HasBlock
,
manager
.
chainman
.
GetBlock
,
manager
.
chainman
.
InsertChain
,
manager
.
removePeer
)
manager
.
downloader
=
downloader
.
New
(
manager
.
eventMux
,
manager
.
chainman
.
HasBlock
,
manager
.
chainman
.
GetBlock
,
manager
.
chainman
.
CurrentBlock
,
manager
.
chainman
.
InsertChain
,
manager
.
removePeer
)
validator
:=
func
(
block
*
types
.
Block
,
parent
*
types
.
Block
)
error
{
return
core
.
ValidateHeader
(
pow
,
block
.
Header
(),
parent
,
true
)
...
...
@@ -152,31 +158,32 @@ func (pm *ProtocolManager) Stop() {
}
// newPeer wraps a raw devp2p peer and its message stream into an eth peer,
// tagged with the given protocol version (pv) and network id (nv).
func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
	return newPeer(pv, nv, p, rw)
}
// handle is the callback invoked to manage the life cycle of an eth peer. When
// this function terminates, the peer is disconnected.
func
(
pm
*
ProtocolManager
)
handle
(
p
*
peer
)
error
{
// Execute the Ethereum handshake.
if
err
:=
p
.
handleStatus
();
err
!=
nil
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"%v: peer connected [%s]"
,
p
,
p
.
Name
())
// Execute the Ethereum handshake
td
,
head
,
genesis
:=
pm
.
chainman
.
Status
()
if
err
:=
p
.
Handshake
(
td
,
head
,
genesis
);
err
!=
nil
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"%v: handshake failed: %v"
,
p
,
err
)
return
err
}
// Register the peer locally.
glog
.
V
(
logger
.
Detail
)
.
Infoln
(
"Adding peer"
,
p
.
id
)
// Register the peer locally
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"%v: adding peer"
,
p
)
if
err
:=
pm
.
peers
.
Register
(
p
);
err
!=
nil
{
glog
.
V
(
logger
.
Error
)
.
Info
ln
(
"Addition failed:"
,
err
)
glog
.
V
(
logger
.
Error
)
.
Info
f
(
"%v: addition failed: %v"
,
p
,
err
)
return
err
}
defer
pm
.
removePeer
(
p
.
id
)
// Register the peer in the downloader. If the downloader
// considers it banned, we disconnect.
if
err
:=
pm
.
downloader
.
RegisterPeer
(
p
.
id
,
p
.
Head
(),
p
.
requestHashes
,
p
.
requestBlocks
);
err
!=
nil
{
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if
err
:=
pm
.
downloader
.
RegisterPeer
(
p
.
id
,
p
.
version
,
p
.
Head
(),
p
.
RequestHashes
,
p
.
RequestHashesFromNumber
,
p
.
RequestBlocks
);
err
!=
nil
{
return
err
}
// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
pm
.
syncTransactions
(
p
)
...
...
@@ -184,13 +191,17 @@ func (pm *ProtocolManager) handle(p *peer) error {
// main loop. handle incoming messages.
for
{
if
err
:=
pm
.
handleMsg
(
p
);
err
!=
nil
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"%v: message handling failed: %v"
,
p
,
err
)
return
err
}
}
return
nil
}
// handleMsg is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error.
func
(
pm
*
ProtocolManager
)
handleMsg
(
p
*
peer
)
error
{
// Read the next message from the remote peer, and ensure it's fully consumed
msg
,
err
:=
p
.
rw
.
ReadMsg
()
if
err
!=
nil
{
return
err
...
...
@@ -198,58 +209,69 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if
msg
.
Size
>
ProtocolMaxMsgSize
{
return
errResp
(
ErrMsgTooLarge
,
"%v > %v"
,
msg
.
Size
,
ProtocolMaxMsgSize
)
}
// make sure that the payload has been fully consumed
defer
msg
.
Discard
()
// Handle the message depending on its contents
switch
msg
.
Code
{
case
StatusMsg
:
// Status messages should never arrive after the handshake
return
errResp
(
ErrExtraStatusMsg
,
"uncontrolled status message"
)
case
Tx
Msg
:
//
TODO: rework using lazy RLP stream
var
txs
[]
*
types
.
Transaction
if
err
:=
msg
.
Decode
(
&
txs
);
err
!=
nil
{
return
errResp
(
ErrDecode
,
"
msg
%v: %v"
,
msg
,
err
)
case
GetBlockHashes
Msg
:
//
Retrieve the number of hashes to return and from which origin hash
var
request
getBlockHashesData
if
err
:=
msg
.
Decode
(
&
request
);
err
!=
nil
{
return
errResp
(
ErrDecode
,
"%v: %v"
,
msg
,
err
)
}
for
i
,
tx
:=
range
txs
{
if
tx
==
nil
{
return
errResp
(
ErrDecode
,
"transaction %d is nil"
,
i
)
if
request
.
Amount
>
uint64
(
downloader
.
MaxHashFetch
)
{
request
.
Amount
=
uint64
(
downloader
.
MaxHashFetch
)
}
jsonlogger
.
LogJson
(
&
logger
.
EthTxReceived
{
TxHash
:
tx
.
Hash
()
.
Hex
(),
RemoteId
:
p
.
ID
()
.
String
(),
}
)
// Retrieve the hashes from the block chain and return them
hashes
:=
pm
.
chainman
.
GetBlockHashesFromHash
(
request
.
Hash
,
request
.
Amount
)
if
len
(
hashes
)
==
0
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"invalid block hash %x"
,
request
.
Hash
.
Bytes
()[
:
4
]
)
}
pm
.
txpool
.
AddTransactions
(
tx
s
)
return
p
.
SendBlockHashes
(
hashe
s
)
case
GetBlockHashesMsg
:
var
request
getBlockHashesMsgData
case
GetBlockHashesFromNumberMsg
:
// Retrieve and decode the number of hashes to return and from which origin number
var
request
getBlockHashesFromNumberData
if
err
:=
msg
.
Decode
(
&
request
);
err
!=
nil
{
return
errResp
(
ErrDecode
,
"
->msg
%v: %v"
,
msg
,
err
)
return
errResp
(
ErrDecode
,
"%v: %v"
,
msg
,
err
)
}
if
request
.
Amount
>
uint64
(
downloader
.
MaxHashFetch
)
{
request
.
Amount
=
uint64
(
downloader
.
MaxHashFetch
)
}
hashes
:=
pm
.
chainman
.
GetBlockHashesFromHash
(
request
.
Hash
,
request
.
Amount
)
if
glog
.
V
(
logger
.
Debug
)
{
if
len
(
hashes
)
==
0
{
glog
.
Infof
(
"invalid block hash %x"
,
request
.
Hash
.
Bytes
()[
:
4
])
// Calculate the last block that should be retrieved, and short circuit if unavailable
last
:=
pm
.
chainman
.
GetBlockByNumber
(
request
.
Number
+
request
.
Amount
-
1
)
if
last
==
nil
{
last
=
pm
.
chainman
.
CurrentBlock
()
request
.
Amount
=
last
.
NumberU64
()
-
request
.
Number
+
1
}
if
last
.
NumberU64
()
<
request
.
Number
{
return
p
.
SendBlockHashes
(
nil
)
}
// Retrieve the hashes from the last block backwards, reverse and return
hashes
:=
[]
common
.
Hash
{
last
.
Hash
()}
hashes
=
append
(
hashes
,
pm
.
chainman
.
GetBlockHashesFromHash
(
last
.
Hash
(),
request
.
Amount
-
1
)
...
)
// returns either requested hashes or nothing (i.e. not found)
return
p
.
sendBlockHashes
(
hashes
)
for
i
:=
0
;
i
<
len
(
hashes
)
/
2
;
i
++
{
hashes
[
i
],
hashes
[
len
(
hashes
)
-
1
-
i
]
=
hashes
[
len
(
hashes
)
-
1
-
i
],
hashes
[
i
]
}
return
p
.
SendBlockHashes
(
hashes
)
case
BlockHashesMsg
:
// A batch of hashes arrived to one of our previous requests
msgStream
:=
rlp
.
NewStream
(
msg
.
Payload
,
uint64
(
msg
.
Size
))
reqHashInPacketsMeter
.
Mark
(
1
)
var
hashes
[]
common
.
Hash
if
err
:=
msgStream
.
Decode
(
&
hashes
);
err
!=
nil
{
break
}
reqHashInTrafficMeter
.
Mark
(
int64
(
32
*
len
(
hashes
)))
// Deliver them all to the downloader for queuing
err
:=
pm
.
downloader
.
DeliverHashes
(
p
.
id
,
hashes
)
if
err
!=
nil
{
glog
.
V
(
logger
.
Debug
)
.
Infoln
(
err
)
...
...
@@ -293,13 +315,14 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
list
=
list
[
:
len
(
list
)
-
2
]
+
"]"
glog
.
Infof
(
"
Peer %s: no blocks found for requested hashes %s"
,
p
.
id
,
list
)
glog
.
Infof
(
"
%v: no blocks found for requested hashes %s"
,
p
,
list
)
}
return
p
.
s
endBlocks
(
blocks
)
return
p
.
S
endBlocks
(
blocks
)
case
BlocksMsg
:
// Decode the arrived block message
msgStream
:=
rlp
.
NewStream
(
msg
.
Payload
,
uint64
(
msg
.
Size
))
reqBlockInPacketsMeter
.
Mark
(
1
)
var
blocks
[]
*
types
.
Block
if
err
:=
msgStream
.
Decode
(
&
blocks
);
err
!=
nil
{
...
...
@@ -307,8 +330,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
blocks
=
nil
}
// Update the receive timestamp of each block
for
i
:=
0
;
i
<
len
(
blocks
);
i
++
{
blocks
[
i
]
.
ReceivedAt
=
msg
.
ReceivedAt
for
_
,
block
:=
range
blocks
{
reqBlockInTrafficMeter
.
Mark
(
block
.
Size
()
.
Int64
())
block
.
ReceivedAt
=
msg
.
ReceivedAt
}
// Filter out any explicitly requested blocks, deliver the rest to the downloader
if
blocks
:=
pm
.
fetcher
.
Filter
(
blocks
);
len
(
blocks
)
>
0
{
...
...
@@ -323,9 +347,12 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if
err
:=
msgStream
.
Decode
(
&
hashes
);
err
!=
nil
{
break
}
propHashInPacketsMeter
.
Mark
(
1
)
propHashInTrafficMeter
.
Mark
(
int64
(
32
*
len
(
hashes
)))
// Mark the hashes as present at the remote node
for
_
,
hash
:=
range
hashes
{
p
.
blockHashes
.
Add
(
hash
)
p
.
MarkBlock
(
hash
)
p
.
SetHead
(
hash
)
}
// Schedule all the unknown hashes for retrieval
...
...
@@ -336,15 +363,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
for
_
,
hash
:=
range
unknown
{
pm
.
fetcher
.
Notify
(
p
.
id
,
hash
,
time
.
Now
(),
p
.
r
equestBlocks
)
pm
.
fetcher
.
Notify
(
p
.
id
,
hash
,
time
.
Now
(),
p
.
R
equestBlocks
)
}
case
NewBlockMsg
:
// Retrieve and decode the propagated block
var
request
newBlock
Msg
Data
var
request
newBlockData
if
err
:=
msg
.
Decode
(
&
request
);
err
!=
nil
{
return
errResp
(
ErrDecode
,
"%v: %v"
,
msg
,
err
)
}
propBlockInPacketsMeter
.
Mark
(
1
)
propBlockInTrafficMeter
.
Mark
(
request
.
Block
.
Size
()
.
Int64
())
if
err
:=
request
.
Block
.
ValidateFields
();
err
!=
nil
{
return
errResp
(
ErrDecode
,
"block validation %v: %v"
,
msg
,
err
)
}
...
...
@@ -360,7 +390,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
RemoteId
:
p
.
ID
()
.
String
(),
})
// Mark the peer as owning the block and schedule it for import
p
.
blockHashes
.
Add
(
request
.
Block
.
Hash
())
p
.
MarkBlock
(
request
.
Block
.
Hash
())
p
.
SetHead
(
request
.
Block
.
Hash
())
pm
.
fetcher
.
Enqueue
(
p
.
id
,
request
.
Block
)
...
...
@@ -369,6 +399,29 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p
.
SetTd
(
request
.
TD
)
go
pm
.
synchronise
(
p
)
case
TxMsg
:
// Transactions arrived, parse all of them and deliver to the pool
var
txs
[]
*
types
.
Transaction
if
err
:=
msg
.
Decode
(
&
txs
);
err
!=
nil
{
return
errResp
(
ErrDecode
,
"msg %v: %v"
,
msg
,
err
)
}
propTxnInPacketsMeter
.
Mark
(
1
)
for
i
,
tx
:=
range
txs
{
// Validate and mark the remote transaction
if
tx
==
nil
{
return
errResp
(
ErrDecode
,
"transaction %d is nil"
,
i
)
}
p
.
MarkTransaction
(
tx
.
Hash
())
// Log it's arrival for later analysis
propTxnInTrafficMeter
.
Mark
(
tx
.
Size
()
.
Int64
())
jsonlogger
.
LogJson
(
&
logger
.
EthTxReceived
{
TxHash
:
tx
.
Hash
()
.
Hex
(),
RemoteId
:
p
.
ID
()
.
String
(),
})
}
pm
.
txpool
.
AddTransactions
(
txs
)
default
:
return
errResp
(
ErrInvalidMsgCode
,
"%v"
,
msg
.
Code
)
}
...
...
@@ -385,28 +438,27 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
if
propagate
{
transfer
:=
peers
[
:
int
(
math
.
Sqrt
(
float64
(
len
(
peers
))))]
for
_
,
peer
:=
range
transfer
{
peer
.
s
endNewBlock
(
block
)
peer
.
S
endNewBlock
(
block
)
}
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"propagated block %x to %d peers in %v"
,
hash
[
:
4
],
len
(
transfer
),
time
.
Since
(
block
.
ReceivedAt
))
}
// Otherwise if the block is indeed in out own chain, announce it
if
pm
.
chainman
.
HasBlock
(
hash
)
{
for
_
,
peer
:=
range
peers
{
peer
.
s
endNewBlockHashes
([]
common
.
Hash
{
hash
})
peer
.
S
endNewBlockHashes
([]
common
.
Hash
{
hash
})
}
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"announced block %x to %d peers in %v"
,
hash
[
:
4
],
len
(
peers
),
time
.
Since
(
block
.
ReceivedAt
))
}
}
// BroadcastTx will propagate the block to its connected peers. It will sort
// out which peers do not contain the block in their block set and will do a
// sqrt(peers) to determine the amount of peers we broadcast to.
// BroadcastTx will propagate a transaction to all peers which are not known to
// already have the given transaction.
func
(
pm
*
ProtocolManager
)
BroadcastTx
(
hash
common
.
Hash
,
tx
*
types
.
Transaction
)
{
// Broadcast transaction to a batch of peers not knowing about it
peers
:=
pm
.
peers
.
PeersWithoutTx
(
hash
)
//FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
for
_
,
peer
:=
range
peers
{
peer
.
sendTransaction
(
tx
)
peer
.
SendTransactions
(
types
.
Transactions
{
tx
}
)
}
glog
.
V
(
logger
.
Detail
)
.
Infoln
(
"broadcast tx to"
,
len
(
peers
),
"peers"
)
}
...
...
eth/metrics.go
0 → 100644
浏览文件 @
5caff3bc
package eth

import (
	"github.com/ethereum/go-ethereum/metrics"
)

// Meters tracking the packet counts and byte traffic of the eth protocol
// messages, split by direction (in/out) and by origin: "prop" covers
// unsolicited propagation (transactions, hash announces, block broadcasts),
// while "req" covers explicitly requested data (hash and block fetches).
var (
	propTxnInPacketsMeter    = metrics.NewMeter("eth/prop/txns/in/packets")
	propTxnInTrafficMeter    = metrics.NewMeter("eth/prop/txns/in/traffic")
	propTxnOutPacketsMeter   = metrics.NewMeter("eth/prop/txns/out/packets")
	propTxnOutTrafficMeter   = metrics.NewMeter("eth/prop/txns/out/traffic")
	propHashInPacketsMeter   = metrics.NewMeter("eth/prop/hashes/in/packets")
	propHashInTrafficMeter   = metrics.NewMeter("eth/prop/hashes/in/traffic")
	propHashOutPacketsMeter  = metrics.NewMeter("eth/prop/hashes/out/packets")
	propHashOutTrafficMeter  = metrics.NewMeter("eth/prop/hashes/out/traffic")
	propBlockInPacketsMeter  = metrics.NewMeter("eth/prop/blocks/in/packets")
	propBlockInTrafficMeter  = metrics.NewMeter("eth/prop/blocks/in/traffic")
	propBlockOutPacketsMeter = metrics.NewMeter("eth/prop/blocks/out/packets")
	propBlockOutTrafficMeter = metrics.NewMeter("eth/prop/blocks/out/traffic")
	reqHashInPacketsMeter    = metrics.NewMeter("eth/req/hashes/in/packets")
	reqHashInTrafficMeter    = metrics.NewMeter("eth/req/hashes/in/traffic")
	reqHashOutPacketsMeter   = metrics.NewMeter("eth/req/hashes/out/packets")
	reqHashOutTrafficMeter   = metrics.NewMeter("eth/req/hashes/out/traffic")
	reqBlockInPacketsMeter   = metrics.NewMeter("eth/req/blocks/in/packets")
	reqBlockInTrafficMeter   = metrics.NewMeter("eth/req/blocks/in/traffic")
	reqBlockOutPacketsMeter  = metrics.NewMeter("eth/req/blocks/out/packets")
	reqBlockOutTrafficMeter  = metrics.NewMeter("eth/req/blocks/out/traffic")
)
eth/peer.go
浏览文件 @
5caff3bc
...
...
@@ -20,25 +20,18 @@ var (
errNotRegistered
=
errors
.
New
(
"peer is not registered"
)
)
type
statusMsgData
struct
{
ProtocolVersion
uint32
NetworkId
uint32
TD
*
big
.
Int
CurrentBlock
common
.
Hash
GenesisBlock
common
.
Hash
}
type
getBlockHashesMsgData
struct
{
Hash
common
.
Hash
Amount
uint64
}
const
(
maxKnownTxs
=
32768
// Maximum transactions hashes to keep in the known list (prevent DOS)
maxKnownBlocks
=
1024
// Maximum block hashes to keep in the known list (prevent DOS)
)
type
peer
struct
{
*
p2p
.
Peer
rw
p2p
.
MsgReadWriter
protv
,
netid
int
version
int
// Protocol version negotiated
network
int
// Network ID being on
id
string
...
...
@@ -46,27 +39,21 @@ type peer struct {
td
*
big
.
Int
lock
sync
.
RWMutex
genesis
,
ourHash
common
.
Hash
ourTd
*
big
.
Int
txHashes
*
set
.
Set
blockHashes
*
set
.
Set
knownTxs
*
set
.
Set
// Set of transaction hashes known to be known by this peer
knownBlocks
*
set
.
Set
// Set of block hashes known to be known by this peer
}
// newPeer creates an eth protocol peer around a devp2p connection, using the
// negotiated protocol version and the local network id. The total difficulty
// and head hash are filled in later, during the eth handshake.
func newPeer(version, network int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
	id := p.ID()

	return &peer{
		Peer:    p,
		rw:      rw,
		version: version,
		network: network,
		// Short hex id used for log tagging (first 8 bytes of the node id)
		id:          fmt.Sprintf("%x", id[:8]),
		knownTxs:    set.New(),
		knownBlocks: set.New(),
	}
}
...
...
@@ -103,68 +90,110 @@ func (p *peer) SetTd(td *big.Int) {
p
.
td
.
Set
(
td
)
}
// sendTransactions sends transactions to the peer and includes the hashes
// in it's tx hash set for future reference. The tx hash will allow the
// manager to check whether the peer has already received this particular
// transaction
func
(
p
*
peer
)
sendTransactions
(
txs
types
.
Transactions
)
error
{
for
_
,
tx
:=
range
txs
{
p
.
txHashes
.
Add
(
tx
.
Hash
())
// MarkBlock marks a block as known for the peer, ensuring that the block will
// never be propagated to this particular peer.
func
(
p
*
peer
)
MarkBlock
(
hash
common
.
Hash
)
{
// If we reached the memory allowance, drop a previously known block hash
for
p
.
knownBlocks
.
Size
()
>=
maxKnownBlocks
{
p
.
knownBlocks
.
Pop
()
}
p
.
knownBlocks
.
Add
(
hash
)
}
// MarkTransaction marks a transaction as known for the peer, ensuring that it
// will never be propagated to this particular peer.
func
(
p
*
peer
)
MarkTransaction
(
hash
common
.
Hash
)
{
// If we reached the memory allowance, drop a previously known transaction hash
for
p
.
knownTxs
.
Size
()
>=
maxKnownTxs
{
p
.
knownTxs
.
Pop
()
}
p
.
knownTxs
.
Add
(
hash
)
}
// SendTransactions sends transactions to the peer and includes the hashes
// in its transaction hash set for future reference.
func
(
p
*
peer
)
SendTransactions
(
txs
types
.
Transactions
)
error
{
propTxnOutPacketsMeter
.
Mark
(
1
)
for
_
,
tx
:=
range
txs
{
propTxnOutTrafficMeter
.
Mark
(
tx
.
Size
()
.
Int64
())
p
.
knownTxs
.
Add
(
tx
.
Hash
())
}
return
p2p
.
Send
(
p
.
rw
,
TxMsg
,
txs
)
}
func
(
p
*
peer
)
sendBlockHashes
(
hashes
[]
common
.
Hash
)
error
{
// SendBlockHashes sends a batch of known hashes to the remote peer.
func
(
p
*
peer
)
SendBlockHashes
(
hashes
[]
common
.
Hash
)
error
{
reqHashOutPacketsMeter
.
Mark
(
1
)
reqHashOutTrafficMeter
.
Mark
(
int64
(
32
*
len
(
hashes
)))
return
p2p
.
Send
(
p
.
rw
,
BlockHashesMsg
,
hashes
)
}
func
(
p
*
peer
)
sendBlocks
(
blocks
[]
*
types
.
Block
)
error
{
// SendBlocks sends a batch of blocks to the remote peer.
func
(
p
*
peer
)
SendBlocks
(
blocks
[]
*
types
.
Block
)
error
{
reqBlockOutPacketsMeter
.
Mark
(
1
)
for
_
,
block
:=
range
blocks
{
reqBlockOutTrafficMeter
.
Mark
(
block
.
Size
()
.
Int64
())
}
return
p2p
.
Send
(
p
.
rw
,
BlocksMsg
,
blocks
)
}
func
(
p
*
peer
)
sendNewBlockHashes
(
hashes
[]
common
.
Hash
)
error
{
// SendNewBlockHashes announces the availability of a number of blocks through
// a hash notification.
func
(
p
*
peer
)
SendNewBlockHashes
(
hashes
[]
common
.
Hash
)
error
{
propHashOutPacketsMeter
.
Mark
(
1
)
propHashOutTrafficMeter
.
Mark
(
int64
(
32
*
len
(
hashes
)))
for
_
,
hash
:=
range
hashes
{
p
.
blockHashe
s
.
Add
(
hash
)
p
.
knownBlock
s
.
Add
(
hash
)
}
return
p2p
.
Send
(
p
.
rw
,
NewBlockHashesMsg
,
hashes
)
}
func
(
p
*
peer
)
sendNewBlock
(
block
*
types
.
Block
)
error
{
p
.
blockHashes
.
Add
(
block
.
Hash
())
// SendNewBlock propagates an entire block to a remote peer.
func
(
p
*
peer
)
SendNewBlock
(
block
*
types
.
Block
)
error
{
propBlockOutPacketsMeter
.
Mark
(
1
)
propBlockOutTrafficMeter
.
Mark
(
block
.
Size
()
.
Int64
())
p
.
knownBlocks
.
Add
(
block
.
Hash
())
return
p2p
.
Send
(
p
.
rw
,
NewBlockMsg
,
[]
interface
{}{
block
,
block
.
Td
})
}
func
(
p
*
peer
)
sendTransaction
(
tx
*
types
.
Transaction
)
error
{
p
.
txHashes
.
Add
(
tx
.
Hash
())
return
p2p
.
Send
(
p
.
rw
,
TxMsg
,
[]
*
types
.
Transaction
{
tx
})
// RequestHashes fetches a batch of hashes from a peer, starting at from, going
// towards the genesis block.
func
(
p
*
peer
)
RequestHashes
(
from
common
.
Hash
)
error
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Peer [%s] fetching hashes (%d) from %x...
\n
"
,
p
.
id
,
downloader
.
MaxHashFetch
,
from
[
:
4
])
return
p2p
.
Send
(
p
.
rw
,
GetBlockHashesMsg
,
getBlockHashesData
{
from
,
uint64
(
downloader
.
MaxHashFetch
)})
}
func
(
p
*
peer
)
requestHashes
(
from
common
.
Hash
)
error
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"[%s] fetching hashes (%d) %x...
\n
"
,
p
.
id
,
downloader
.
MaxHashFetch
,
from
[
:
4
])
return
p2p
.
Send
(
p
.
rw
,
GetBlockHashesMsg
,
getBlockHashesMsgData
{
from
,
uint64
(
downloader
.
MaxHashFetch
)})
// RequestHashesFromNumber fetches a batch of hashes from a peer, starting at the
// requested block number, going upwards towards the genesis block.
func
(
p
*
peer
)
RequestHashesFromNumber
(
from
uint64
,
count
int
)
error
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"Peer [%s] fetching hashes (%d) from #%d...
\n
"
,
p
.
id
,
count
,
from
)
return
p2p
.
Send
(
p
.
rw
,
GetBlockHashesFromNumberMsg
,
getBlockHashesFromNumberData
{
from
,
uint64
(
count
)})
}
func
(
p
*
peer
)
requestBlocks
(
hashes
[]
common
.
Hash
)
error
{
// RequestBlocks fetches a batch of blocks corresponding to the specified hashes.
func
(
p
*
peer
)
RequestBlocks
(
hashes
[]
common
.
Hash
)
error
{
glog
.
V
(
logger
.
Debug
)
.
Infof
(
"[%s] fetching %v blocks
\n
"
,
p
.
id
,
len
(
hashes
))
return
p2p
.
Send
(
p
.
rw
,
GetBlocksMsg
,
hashes
)
}
func
(
p
*
peer
)
handleStatus
()
error
{
// Handshake executes the eth protocol handshake, negotiating version number,
// network IDs, difficulties, head and genesis blocks.
func
(
p
*
peer
)
Handshake
(
td
*
big
.
Int
,
head
common
.
Hash
,
genesis
common
.
Hash
)
error
{
// Send out own handshake in a new thread
errc
:=
make
(
chan
error
,
1
)
go
func
()
{
errc
<-
p2p
.
Send
(
p
.
rw
,
StatusMsg
,
&
status
Msg
Data
{
ProtocolVersion
:
uint32
(
p
.
protv
),
NetworkId
:
uint32
(
p
.
net
id
),
TD
:
p
.
ourT
d
,
CurrentBlock
:
p
.
ourHash
,
GenesisBlock
:
p
.
genesis
,
errc
<-
p2p
.
Send
(
p
.
rw
,
StatusMsg
,
&
statusData
{
ProtocolVersion
:
uint32
(
p
.
version
),
NetworkId
:
uint32
(
p
.
net
work
),
TD
:
t
d
,
CurrentBlock
:
head
,
GenesisBlock
:
genesis
,
})
}()
// read and handle remote status
// In the mean time retrieve the remote status message
msg
,
err
:=
p
.
rw
.
ReadMsg
()
if
err
!=
nil
{
return
err
...
...
@@ -175,31 +204,32 @@ func (p *peer) handleStatus() error {
if
msg
.
Size
>
ProtocolMaxMsgSize
{
return
errResp
(
ErrMsgTooLarge
,
"%v > %v"
,
msg
.
Size
,
ProtocolMaxMsgSize
)
}
var
status
status
Msg
Data
// Decode the handshake and make sure everything matches
var
status
statusData
if
err
:=
msg
.
Decode
(
&
status
);
err
!=
nil
{
return
errResp
(
ErrDecode
,
"msg %v: %v"
,
msg
,
err
)
}
if
status
.
GenesisBlock
!=
p
.
genesis
{
return
errResp
(
ErrGenesisBlockMismatch
,
"%x (!= %x)"
,
status
.
GenesisBlock
,
p
.
genesis
)
if
status
.
GenesisBlock
!=
genesis
{
return
errResp
(
ErrGenesisBlockMismatch
,
"%x (!= %x)"
,
status
.
GenesisBlock
,
genesis
)
}
if
int
(
status
.
NetworkId
)
!=
p
.
netid
{
return
errResp
(
ErrNetworkIdMismatch
,
"%d (!= %d)"
,
status
.
NetworkId
,
p
.
netid
)
if
int
(
status
.
NetworkId
)
!=
p
.
network
{
return
errResp
(
ErrNetworkIdMismatch
,
"%d (!= %d)"
,
status
.
NetworkId
,
p
.
network
)
}
if
int
(
status
.
ProtocolVersion
)
!=
p
.
protv
{
return
errResp
(
ErrProtocolVersionMismatch
,
"%d (!= %d)"
,
status
.
ProtocolVersion
,
p
.
protv
)
if
int
(
status
.
ProtocolVersion
)
!=
p
.
version
{
return
errResp
(
ErrProtocolVersionMismatch
,
"%d (!= %d)"
,
status
.
ProtocolVersion
,
p
.
version
)
}
// Set the total difficulty of the peer
p
.
td
=
status
.
TD
// set the best hash of the peer
p
.
head
=
status
.
CurrentBlock
// Configure the remote peer, and sanity check out handshake too
p
.
td
,
p
.
head
=
status
.
TD
,
status
.
CurrentBlock
return
<-
errc
}
// String implements fmt.Stringer.
func
(
p
*
peer
)
String
()
string
{
return
fmt
.
Sprintf
(
"Peer %s [%s]"
,
p
.
id
,
fmt
.
Sprintf
(
"eth/%2d"
,
p
.
version
),
)
}
// peerSet represents the collection of active peers currently participating in
// the Ethereum sub-protocol.
type
peerSet
struct
{
...
...
@@ -264,7 +294,7 @@ func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer {
list
:=
make
([]
*
peer
,
0
,
len
(
ps
.
peers
))
for
_
,
p
:=
range
ps
.
peers
{
if
!
p
.
blockHashe
s
.
Has
(
hash
)
{
if
!
p
.
knownBlock
s
.
Has
(
hash
)
{
list
=
append
(
list
,
p
)
}
}
...
...
@@ -279,7 +309,7 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer {
list
:=
make
([]
*
peer
,
0
,
len
(
ps
.
peers
))
for
_
,
p
:=
range
ps
.
peers
{
if
!
p
.
txHashe
s
.
Has
(
hash
)
{
if
!
p
.
knownTx
s
.
Has
(
hash
)
{
list
=
append
(
list
,
p
)
}
}
...
...
eth/protocol.go
浏览文件 @
5caff3bc
...
...
@@ -7,11 +7,15 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)
// Supported versions of the eth protocol (first is primary).
var ProtocolVersions = []uint{61, 60}

// Number of implemented messages corresponding to the different protocol versions.
var ProtocolLengths = []uint64{9, 8}

const (
	NetworkId          = 0                // Default Ethereum network to operate on
	ProtocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message
)
// eth protocol message codes
...
...
@@ -24,6 +28,7 @@ const (
GetBlocksMsg
BlocksMsg
NewBlockMsg
GetBlockHashesFromNumberMsg
)
type
errCode
int
...
...
@@ -72,8 +77,31 @@ type chainManager interface {
Status
()
(
td
*
big
.
Int
,
currentBlock
common
.
Hash
,
genesisBlock
common
.
Hash
)
}
// message structs used for RLP serialization
type
newBlockMsgData
struct
{
// statusData is the network packet for the status message.
type
statusData
struct
{
ProtocolVersion
uint32
NetworkId
uint32
TD
*
big
.
Int
CurrentBlock
common
.
Hash
GenesisBlock
common
.
Hash
}
// getBlockHashesData is the network packet for the hash based block retrieval
// message.
type
getBlockHashesData
struct
{
Hash
common
.
Hash
Amount
uint64
}
// getBlockHashesFromNumberData is the network packet for the number based block
// retrieval message.
type
getBlockHashesFromNumberData
struct
{
Number
uint64
Amount
uint64
}
// newBlockData is the network packet for the block propagation message.
type
newBlockData
struct
{
Block
*
types
.
Block
TD
*
big
.
Int
}
eth/protocol_test.go
浏览文件 @
5caff3bc
...
...
@@ -39,15 +39,15 @@ func TestStatusMsgErrors(t *testing.T) {
wantError
:
errResp
(
ErrNoStatusMsg
,
"first msg has code 2 (!= 0)"
),
},
{
code
:
StatusMsg
,
data
:
status
Msg
Data
{
10
,
NetworkId
,
td
,
currentBlock
,
genesis
},
code
:
StatusMsg
,
data
:
statusData
{
10
,
NetworkId
,
td
,
currentBlock
,
genesis
},
wantError
:
errResp
(
ErrProtocolVersionMismatch
,
"10 (!= 0)"
),
},
{
code
:
StatusMsg
,
data
:
status
MsgData
{
ProtocolVersion
,
999
,
td
,
currentBlock
,
genesis
},
code
:
StatusMsg
,
data
:
status
Data
{
uint32
(
ProtocolVersions
[
0
])
,
999
,
td
,
currentBlock
,
genesis
},
wantError
:
errResp
(
ErrNetworkIdMismatch
,
"999 (!= 0)"
),
},
{
code
:
StatusMsg
,
data
:
status
MsgData
{
ProtocolVersion
,
NetworkId
,
td
,
currentBlock
,
common
.
Hash
{
3
}},
code
:
StatusMsg
,
data
:
status
Data
{
uint32
(
ProtocolVersions
[
0
])
,
NetworkId
,
td
,
currentBlock
,
common
.
Hash
{
3
}},
wantError
:
errResp
(
ErrGenesisBlockMismatch
,
"0300000000000000000000000000000000000000000000000000000000000000 (!= %x)"
,
genesis
),
},
}
...
...
@@ -167,7 +167,7 @@ func newProtocolManagerForTesting(txAdded chan<- []*types.Transaction) *Protocol
db
,
_
=
ethdb
.
NewMemDatabase
()
chain
,
_
=
core
.
NewChainManager
(
core
.
GenesisBlock
(
0
,
db
),
db
,
db
,
core
.
FakePow
{},
em
)
txpool
=
&
fakeTxPool
{
added
:
txAdded
}
pm
=
NewProtocolManager
(
ProtocolVersion
,
0
,
em
,
txpool
,
core
.
FakePow
{},
chain
)
pm
=
NewProtocolManager
(
0
,
em
,
txpool
,
core
.
FakePow
{},
chain
)
)
pm
.
Start
()
return
pm
...
...
@@ -188,7 +188,7 @@ func newTestPeer(pm *ProtocolManager) (*testPeer, <-chan error) {
func
(
p
*
testPeer
)
handshake
(
t
*
testing
.
T
)
{
td
,
currentBlock
,
genesis
:=
p
.
pm
.
chainman
.
Status
()
msg
:=
&
status
Msg
Data
{
msg
:=
&
statusData
{
ProtocolVersion
:
uint32
(
p
.
pm
.
protVer
),
NetworkId
:
uint32
(
p
.
pm
.
netId
),
TD
:
td
,
...
...
eth/sync.go
浏览文件 @
5caff3bc
...
...
@@ -20,14 +20,6 @@ const (
txsyncPackSize
=
100
*
1024
)
// blockAnnounce is the hash notification of the availability of a new block in
// the network.
type
blockAnnounce
struct
{
hash
common
.
Hash
peer
*
peer
time
time
.
Time
}
type
txsync
struct
{
p
*
peer
txs
[]
*
types
.
Transaction
...
...
@@ -75,7 +67,7 @@ func (pm *ProtocolManager) txsyncLoop() {
// Send the pack in the background.
glog
.
V
(
logger
.
Detail
)
.
Infof
(
"%v: sending %d transactions (%v)"
,
s
.
p
.
Peer
,
len
(
pack
.
txs
),
size
)
sending
=
true
go
func
()
{
done
<-
pack
.
p
.
s
endTransactions
(
pack
.
txs
)
}()
go
func
()
{
done
<-
pack
.
p
.
S
endTransactions
(
pack
.
txs
)
}()
}
// pick chooses the next pending sync.
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录