Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
YottaChain
YTBP
提交
4a881463
Y
YTBP
项目概览
YottaChain
/
YTBP
通知
0
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
Y
YTBP
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
4a881463
编写于
9月 08, 2017
作者:
B
Bart Wyatt
浏览文件
操作
浏览文件
下载
差异文件
merging changes to master @
90862d5d
上级
738e29a5
90862d5d
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
234 addition
and
105 deletion
+234
-105
plugins/net_plugin/net_plugin.cpp
plugins/net_plugin/net_plugin.cpp
+234
-105
未找到文件。
plugins/net_plugin/net_plugin.cpp
浏览文件 @
4a881463
...
...
@@ -80,21 +80,17 @@ namespace eos {
* Index by start_block
*/
struct
sync_state
{
uint32_t
start_block
=
0
;
uint32_t
end_block
=
0
;
uint32_t
last
=
0
;
///< last sent or received
sync_state
(
uint32_t
start
=
0
,
uint32_t
end
=
0
,
uint32_t
last_acted
=
0
)
:
start_block
(
start
),
end_block
(
end
),
last
(
last_acted
),
start_time
(
time_point
::
now
()),
block_cache
()
{}
uint32_t
start_block
;
uint32_t
end_block
;
uint32_t
last
;
///< last sent or received
time_point
start_time
;
///< time request made or received
vector
<
vector
<
char
>
>
block_cache
;
};
struct
by_start_block
;
typedef
multi_index_container
<
sync_state
,
indexed_by
<
ordered_unique
<
tag
<
by_start_block
>
,
member
<
sync_state
,
uint32_t
,
&
sync_state
::
start_block
>
>
>
>
sync_request_index
;
struct
handshake_initializer
{
static
void
populate
(
handshake_message
&
hello
);
};
...
...
@@ -104,8 +100,8 @@ namespace eos {
connection
(
string
endpoint
)
:
block_state
(),
trx_state
(),
in_sync_state
(),
out_sync_state
(),
sync_received
(),
sync_requested
(),
socket
(
std
::
make_shared
<
tcp
::
socket
>
(
std
::
ref
(
app
().
get_io_service
()
))),
pending_message_size
(),
pending_message_buffer
(),
...
...
@@ -113,19 +109,18 @@ namespace eos {
last_handshake
(),
out_queue
(),
connecting
(
false
),
syncing
(
false
),
peer_addr
(
endpoint
)
{
wlog
(
"created connection to ${n}"
,
(
"n"
,
endpoint
)
);
pending_message_buffer
.
resize
(
1024
*
1024
*
4
);
auto
*
rnd
=
remote_node_id
.
data
();
rnd
[
0
]
=
0
;
initialize
();
}
connection
(
socket_ptr
s
)
:
block_state
(),
trx_state
(),
in_sync_state
(),
out_sync_state
(),
sync_received
(),
sync_requested
(),
socket
(
s
),
pending_message_size
(),
pending_message_buffer
(),
...
...
@@ -133,12 +128,13 @@ namespace eos {
last_handshake
(),
out_queue
(),
connecting
(
false
),
syncing
(
false
),
peer_addr
()
{
wlog
(
"created connection from client"
);
pending_message_buffer
.
resize
(
1024
*
1024
*
4
);
auto
*
rnd
=
remote_node_id
.
data
(
);
rnd
[
0
]
=
0
;
boost
::
asio
::
ip
::
tcp
::
no_delay
option
(
true
);
socket
->
set_option
(
option
);
initialize
()
;
}
~
connection
()
{
...
...
@@ -148,31 +144,49 @@ namespace eos {
wlog
(
"released connection to server at ${addr}"
,
(
"addr"
,
peer_addr
)
);
}
void
initialize
()
{
pending_message_buffer
.
resize
(
1024
*
1024
*
4
);
auto
*
rnd
=
remote_node_id
.
data
();
rnd
[
0
]
=
0
;
}
block_state_index
block_state
;
transaction_state_index
trx_state
;
sync_request_index
in_sync_state
;
// we are requesting info from this peer
sync_request_index
out_sync_state
;
// this peer is requesting info from us
vector
<
sync_state
>
sync_received
;
// we are requesting info from this peer
vector
<
sync_state
>
sync_requested
;
// this peer is requesting info from us
socket_ptr
socket
;
uint32_t
pending_message_size
;
vector
<
char
>
pending_message_buffer
;
vector
<
char
>
blk_buffer
;
size_t
message_size
;
fc
::
sha256
remote_node_id
;
handshake_message
last_handshake
;
std
::
deque
<
net_message
>
out_queue
;
uint32_t
mtu
;
bool
connecting
;
bool
syncing
;
string
peer_addr
;
bool
ready
()
{
return
(
socket
->
is_open
()
&&
!
connecting
);
}
bool
ready_and_willing
()
{
return
(
ready
()
&&
!
syncing
);
}
void
reset
()
{
in_sync_state
.
clear
();
out_sync_state
.
clear
();
sync_received
.
clear
();
sync_requested
.
clear
();
block_state
.
clear
();
trx_state
.
clear
();
}
void
close
()
{
connecting
=
false
;
syncing
=
false
;
out_queue
.
clear
();
if
(
socket
)
{
socket
->
close
();
...
...
@@ -194,7 +208,7 @@ namespace eos {
void
send_next_message
()
{
if
(
!
out_queue
.
size
()
)
{
if
(
out_sync_state
.
size
()
>
0
)
{
if
(
sync_requested
.
size
()
>
0
)
{
write_block_backlog
();
}
return
;
...
...
@@ -207,7 +221,6 @@ namespace eos {
fc
::
datastream
<
char
*>
ds
(
buffer
.
data
(),
buffer
.
size
()
);
ds
.
write
(
(
char
*
)
&
size
,
sizeof
(
size
)
);
fc
::
raw
::
pack
(
ds
,
m
);
boost
::
asio
::
async_write
(
*
socket
,
boost
::
asio
::
buffer
(
buffer
.
data
(),
buffer
.
size
()
),
[
this
,
buf
=
std
::
move
(
buffer
)](
boost
::
system
::
error_code
ec
,
std
::
size_t
bytes_transferred
)
{
if
(
ec
)
{
...
...
@@ -225,25 +238,21 @@ namespace eos {
void
write_block_backlog
(
)
{
chain_controller
&
cc
=
app
().
find_plugin
<
chain_plugin
>
()
->
chain
();
auto
ss
=
out_sync_state
.
begin
();
uint32_t
num
=
++
ss
.
get_node
()
->
value
().
last
;
auto
ss
=
sync_requested
.
begin
();
uint32_t
num
=
++
ss
->
last
;
//
get_node()->value().last;
ilog
(
"num = ${num} end = ${end}"
,(
"num"
,
num
)(
"end"
,
ss
->
end_block
));
if
(
num
>=
ss
->
end_block
)
{
out_sync_state
.
erase
(
ss
);
ilog
(
"out sync size = ${s}"
,(
"s"
,
out_sync_state
.
size
()));
sync_requested
.
erase
(
ss
);
ilog
(
"out sync size = ${s}"
,(
"s"
,
sync_requested
.
size
()));
}
try
{
fc
::
optional
<
signed_block
>
sb
=
cc
.
fetch_block_by_number
(
num
);
if
(
sb
)
{
// dlog("write backlog, block #${num}",("num",num));
send
(
*
sb
);
}
}
catch
(
...
)
{
wlog
(
"write loop exception"
);
}
if
(
out_sync_state
.
size
()
==
0
)
{
send_handshake
(
);
}
}
...
...
@@ -304,16 +313,17 @@ namespace eos {
string
p2p_address
;
vector
<
string
>
supplied_peers
;
std
::
set
<
fc
::
sha256
>
resolved_nodes
;
std
::
set
<
fc
::
sha256
>
learned_nodes
;
std
::
set
<
connection_ptr
>
connections
;
bool
done
=
false
;
uint32_t
sync_head
;
uint32_t
sync_req_head
;
uint32_t
sync_req_span
;
unique_ptr
<
boost
::
asio
::
steady_timer
>
connector_check
;
unique_ptr
<
boost
::
asio
::
steady_timer
>
transaction_check
;
boost
::
asio
::
steady_timer
::
duration
connector_period
;
boost
::
asio
::
steady_timer
::
duration
txn_exp_period
;
unique_ptr
<
boost
::
asio
::
steady_timer
>
connector_check
;
unique_ptr
<
boost
::
asio
::
steady_timer
>
transaction_check
;
boost
::
asio
::
steady_timer
::
duration
connector_period
;
boost
::
asio
::
steady_timer
::
duration
txn_exp_period
;
int16_t
network_version
;
chain_id_type
chain_id
;
...
...
@@ -331,7 +341,6 @@ namespace eos {
void
connect
(
connection_ptr
c
)
{
c
->
connecting
=
true
;
auto
host
=
c
->
peer_addr
.
substr
(
0
,
c
->
peer_addr
.
find
(
':'
)
);
auto
port
=
c
->
peer_addr
.
substr
(
host
.
size
()
+
1
,
host
.
size
()
);
idump
((
host
)(
port
));
...
...
@@ -351,6 +360,7 @@ namespace eos {
void
connect
(
connection_ptr
c
,
tcp
::
resolver
::
iterator
endpoint_itr
)
{
auto
current_endpoint
=
*
endpoint_itr
;
++
endpoint_itr
;
c
->
connecting
=
true
;
c
->
socket
->
async_connect
(
current_endpoint
,
[
c
,
endpoint_itr
,
this
]
(
const
boost
::
system
::
error_code
&
err
)
{
...
...
@@ -358,10 +368,13 @@ namespace eos {
start_session
(
c
);
}
else
{
if
(
endpoint_itr
!=
tcp
::
resolver
::
iterator
()
)
{
c
->
close
();
connect
(
c
,
endpoint_itr
);
}
else
{
elog
(
"connection failed to ${peer}: ${error}"
,
(
"peer"
,
c
->
peer_addr
)(
"error"
,
err
.
message
()));
c
->
connecting
=
false
;
c
->
close
();
}
}
}
);
...
...
@@ -374,7 +387,6 @@ namespace eos {
just_send_it_max
=
mtu
;
}
start_read_message
(
con
);
con
->
send_handshake
(
);
// for now, we can just use the application main loop.
...
...
@@ -436,29 +448,108 @@ namespace eos {
template
<
typename
VerifierFunc
>
void
send_all
(
const
net_message
&
msg
,
VerifierFunc
verify
)
{
for
(
auto
&
c
:
connections
)
{
if
(
c
->
out_sync_state
.
size
()
==
0
&&
verify
(
c
))
{
if
(
c
->
ready_and_willing
()
&&
verify
(
c
))
{
c
->
send
(
msg
);
}
}
}
void
shared_fetch
(
uint32_t
low
,
uint32_t
high
)
{
bool
get_sync_req
(
connection_ptr
c
)
{
if
(
sync_req_head
==
sync_head
)
{
return
true
;
}
uint32_t
first
=
sync_req_head
+
1
;
uint32_t
last
=
(
first
+
sync_req_span
-
1
);
if
(
last
<
sync_head
)
last
=
sync_head
;
c
->
sync_received
.
emplace_back
(
first
,
last
,
sync_req_head
);
sync_request_message
srm
=
{
first
,
last
};
c
->
send
(
srm
);
sync_req_head
=
last
;
return
(
sync_req_head
==
sync_head
);
}
void
set_sync_head
(
connection_ptr
c
,
uint32_t
target
)
{
bool
stable
=
(
sync_head
==
sync_req_head
);
if
(
stable
)
{
sync_req_head
=
chain_plug
->
chain
().
head_block_num
();
}
ilog
(
"Catching up with chain, our head is ${cc}, theirs is ${t}"
,
(
"cc"
,
sync_req_head
)(
"t"
,
target
));
if
(
target
>
sync_head
)
{
sync_head
=
target
;
if
(
stable
)
{
for
(
auto
&
ci
:
connections
)
{
if
(
ci
->
ready
())
{
if
(
get_sync_req
(
ci
))
{
break
;
}
}
}
}
else
{
get_sync_req
(
c
);
}
}
}
struct
postcache
:
public
fc
::
visitor
<
void
>
{
connection_ptr
c
;
chain_plugin
*
chain_plug
;
postcache
(
connection_ptr
conn
,
chain_plugin
*
cp
)
:
c
(
conn
),
chain_plug
(
cp
)
{}
void
operator
()(
const
signed_block
&
block
)
const
{
chain_plug
->
accept_block
(
block
,
true
);
}
template
<
typename
T
>
void
operator
()(
const
T
&
msg
)
const
{
//no-op
}
};
void
apply_cached_blocks
(
connection_ptr
conn
)
{
bool
keep_going
=
true
;
while
(
keep_going
)
{
keep_going
=
false
;
for
(
auto
&
c
:
connections
)
{
try
{
auto
ss
=
c
->
sync_received
.
begin
();
if
(
ss
==
c
->
sync_received
.
end
())
{
get_sync_req
(
c
);
continue
;
}
uint32_t
start
=
1
+
chain_plug
->
chain
().
head_block_num
();
if
(
start
==
ss
->
start_block
)
{
if
(
ss
->
last
<
start
||
ss
->
block_cache
.
empty
())
{
return
;
}
uint32_t
delta
=
high
-
low
;
uint32_t
count
=
connections
.
size
();
FC_ASSERT
(
count
>
0
);
uint32_t
span
=
delta
/
count
;
uint32_t
lastSpan
=
delta
-
(
span
*
(
count
-
1
));
for
(
auto
&
cx
:
connections
)
{
if
(
--
count
==
0
)
{
span
=
lastSpan
;
for
(
auto
&
blk
:
ss
->
block_cache
)
{
auto
block
=
fc
::
raw
::
unpack
<
net_message
>
(
blk
);
postcache
pc
(
c
,
chain_plug
);
block
.
visit
(
pc
);
++
start
;
ss
->
start_block
++
;
}
ss
->
block_cache
.
clear
();
if
(
start
>
ss
->
end_block
)
{
c
->
sync_received
.
erase
(
ss
);
if
(
c
->
sync_received
.
empty
())
{
get_sync_req
(
c
);
}
keep_going
=
true
;
}
}
}
catch
(...)
{
elog
(
"caught something trying to accept blocks"
);
// not a problem. We found the list but no blocks were cached
return
;
}
}
sync_state
req
=
{
low
+
1
,
low
+
span
,
low
,
time_point
::
now
()
};
cx
->
in_sync_state
.
insert
(
req
);
sync_request_message
srm
=
{
req
.
start_block
,
req
.
end_block
};
cx
->
send
(
srm
);
low
+=
span
;
}
}
...
...
@@ -474,7 +565,8 @@ namespace eos {
return
;
}
if
(
msg
.
network_version
!=
network_version
)
{
elog
(
"Peer network version does not match "
);
elog
(
"Peer network version does not match expected ${nv} but got ${mnv}"
,
(
"nv"
,
network_version
)(
"mnv"
,
msg
.
network_version
));
close
(
c
);
return
;
}
...
...
@@ -499,29 +591,17 @@ namespace eos {
}
}
if
(
c
->
remote_node_id
!=
msg
.
node_id
)
{
// c->reset();
c
->
remote_node_id
=
msg
.
node_id
;
}
uint32_t
head
=
cc
.
head_block_num
();
if
(
msg
.
head_num
>
head
)
{
s
hared_fetch
(
head
,
msg
.
head_num
);
s
et_sync_head
(
c
,
msg
.
head_num
);
}
if
(
c
->
remote_node_id
!=
msg
.
node_id
)
{
c
->
reset
();
if
(
c
->
peer_addr
.
length
()
>
0
)
{
auto
old_id
=
resolved_nodes
.
find
(
c
->
remote_node_id
);
if
(
old_id
!=
resolved_nodes
.
end
())
{
resolved_nodes
.
erase
(
old_id
);
}
resolved_nodes
.
insert
(
msg
.
node_id
);
}
else
{
auto
old_id
=
learned_nodes
.
find
(
c
->
remote_node_id
);
if
(
old_id
!=
learned_nodes
.
end
())
{
learned_nodes
.
erase
(
old_id
);
}
learned_nodes
.
insert
(
msg
.
node_id
);
}
c
->
remote_node_id
=
msg
.
node_id
;
else
{
c
->
syncing
=
head
!=
msg
.
head_num
;
}
c
->
last_handshake
=
msg
;
}
...
...
@@ -553,6 +633,7 @@ namespace eos {
// collect a list of transactions that were found.
// collect a second list of transaction ids that were not found but are otherwise known by some peers
// finally, what remains are future(?) transactions
vector
<
SignedTransaction
>
send_now
;
map
<
connection_ptr
,
vector
<
transaction_id_type
>
>
forward_to
;
auto
conn_ndx
=
connections
.
begin
();
...
...
@@ -601,8 +682,7 @@ namespace eos {
}
void
handle_message
(
connection_ptr
c
,
const
sync_request_message
&
msg
)
{
sync_state
req
=
{
msg
.
start_block
,
msg
.
end_block
,
msg
.
start_block
-
1
,
time_point
::
now
()};
c
->
out_sync_state
.
insert
(
req
);
c
->
sync_requested
.
emplace_back
(
msg
.
start_block
,
msg
.
end_block
,
msg
.
start_block
-
1
);
c
->
write_block_backlog
();
}
...
...
@@ -660,28 +740,59 @@ namespace eos {
void
handle_message
(
connection_ptr
c
,
const
signed_block
&
msg
)
{
chain_controller
&
cc
=
chain_plug
->
chain
();
if
(
cc
.
is_known_block
(
msg
.
id
()))
{
return
;
try
{
if
(
cc
.
is_known_block
(
msg
.
id
()))
{
return
;
}
}
catch
(...)
{
}
uint32_t
num
=
0
;
bool
syncing
=
c
->
in_sync_state
.
size
()
>
0
;
uint32_t
num
=
msg
.
block_num
();
bool
syncing
=
sync_head
>
cc
.
head_block_num
();
bool
get_more
=
false
;
if
(
syncing
)
{
for
(
auto
ss
=
c
->
in_sync_state
.
begin
();
ss
!=
c
->
in_sync_state
.
end
();
ss
++
)
{
if
(
msg
.
block_num
()
==
ss
->
last
+
1
&&
msg
.
block_num
()
<=
ss
->
end_block
)
{
num
=
msg
.
block_num
();
ss
.
get_node
()
->
value
().
last
=
num
;
break
;
for
(
auto
&
ss
:
c
->
sync_received
)
{
if
(
num
>
ss
.
end_block
)
{
continue
;
}
ss
.
last
=
num
;
get_more
=
num
==
ss
.
end_block
;
if
(
num
==
cc
.
head_block_num
()
+
1
)
{
try
{
chain_plug
->
accept_block
(
msg
,
true
);
auto
s0
=
c
->
sync_received
.
begin
();
if
(
s0
->
start_block
==
s0
->
end_block
)
{
c
->
sync_received
.
erase
(
s0
);
apply_cached_blocks
(
c
);
}
else
{
s0
->
start_block
++
;
}
}
catch
(
const
unlinkable_block_exception
&
ex
)
{
elog
(
"unable to accept block #${n} syncing"
,(
"n"
,
num
));
//close (c);
}
catch
(
const
assert_exception
&
ex
)
{
elog
(
"unable to accept block on assert exception #${n}"
,(
"n"
,
num
));
//close (c);
}
}
else
{
ss
.
block_cache
.
emplace_back
(
std
::
move
(
c
->
blk_buffer
));
}
break
;
}
if
(
num
==
0
)
{
elog
(
"syncing, got out-of-order block ${n}"
,(
"n"
,
msg
.
block_num
()));
//close (c);
return
;
if
(
chain_plug
->
chain
().
head_block_num
()
==
sync_head
)
{
handshake_message
hello
;
handshake_initializer
::
populate
(
hello
);
send_all
(
hello
,
[
c
](
connection_ptr
conn
)
->
bool
{
return
true
;
});
}
else
if
(
get_more
)
{
get_sync_req
(
c
);
}
return
;
}
else
{
send_all
(
msg
,
[
c
](
connection_ptr
conn
)
->
bool
{
send_all
(
msg
,
[
c
](
connection_ptr
conn
)
->
bool
{
return
(
c
!=
conn
);
});
...
...
@@ -697,6 +808,25 @@ namespace eos {
}
}
struct
precache
:
public
fc
::
visitor
<
void
>
{
connection_ptr
c
;
precache
(
connection_ptr
conn
)
:
c
(
conn
)
{}
void
operator
()(
const
signed_block
&
msg
)
const
{
c
->
blk_buffer
.
resize
(
c
->
message_size
);
memcpy
(
c
->
blk_buffer
.
data
(),
c
->
pending_message_buffer
.
data
(),
c
->
message_size
);
}
template
<
typename
T
>
void
operator
()(
const
T
&
msg
)
const
{
//no-op
}
};
struct
msgHandler
:
public
fc
::
visitor
<
void
>
{
net_plugin_impl
&
impl
;
...
...
@@ -710,14 +840,16 @@ namespace eos {
}
};
void
start_reading_pending_buffer
(
connection_ptr
c
)
{
boost
::
asio
::
async_read
(
*
c
->
socket
,
boost
::
asio
::
buffer
(
c
->
pending_message_buffer
.
data
(),
c
->
pending_message_size
),
[
this
,
c
](
boost
::
system
::
error_code
ec
,
std
::
size_t
bytes_transferred
)
{
if
(
!
ec
)
{
try
{
c
->
message_size
=
bytes_transferred
;
auto
msg
=
fc
::
raw
::
unpack
<
net_message
>
(
c
->
pending_message_buffer
);
precache
pc
(
c
);
msg
.
visit
(
pc
);
start_read_message
(
c
);
msgHandler
m
(
*
this
,
c
);
...
...
@@ -899,7 +1031,6 @@ namespace eos {
}
}
};
// class net_plugin_impl
void
...
...
@@ -993,7 +1124,9 @@ namespace eos {
}
my
->
send_whole_blocks
=
true
;
my
->
sync_req_span
=
10
;
my
->
sync_head
=
0
;
my
->
sync_req_head
=
0
;
if
(
options
.
count
(
"remote-endpoint"
)
)
{
my
->
supplied_peers
=
options
.
at
(
"remote-endpoint"
).
as
<
vector
<
string
>
>
();
}
...
...
@@ -1027,10 +1160,6 @@ namespace eos {
my
->
connections
.
insert
(
c
);
my
->
connect
(
c
);
}
boost
::
asio
::
signal_set
signals
(
app
().
get_io_service
(),
SIGINT
,
SIGTERM
);
signals
.
async_wait
([
this
](
const
boost
::
system
::
error_code
&
ec
,
int
signum
)
{
dlog
(
"caught signal ${sn}"
,
(
"sn"
,
signum
)
)
;
});
}
void
net_plugin
::
plugin_shutdown
()
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录