whqwjb / go-ethereum · commit 710775f4
Unverified commit 710775f4
Authored on Jan 17, 2019 by Ferenc Szabo; committed on Feb 19, 2019 by Rafael Matias

swarm/network: fix data race in fetcher_test.go (#18469)

(cherry picked from commit 19bfcbf9)

Parent: 0fd01085
Showing 2 changed files with 25 additions and 25 deletions (+25, -25):

swarm/network/fetcher.go (+20, -14)
swarm/network/fetcher_test.go (+5, -11)
swarm/network/fetcher.go

@@ -26,20 +26,23 @@ import (
 	"github.com/ethereum/go-ethereum/swarm/storage"
 )
 
-var searchTimeout = 1 * time.Second
+const (
+	defaultSearchTimeout = 1 * time.Second
+
+	// maximum number of forwarded requests (hops), to make sure requests are not
+	// forwarded forever in peer loops
+	maxHopCount uint8 = 20
+)
 
 // Time to consider peer to be skipped.
 // Also used in stream delivery.
 var RequestTimeout = 10 * time.Second
 
-var maxHopCount uint8 = 20 // maximum number of forwarded requests (hops), to make sure requests are not forwarded forever in peer loops
-
 type RequestFunc func(context.Context, *Request) (*enode.ID, chan struct{}, error)
 
 // Fetcher is created when a chunk is not found locally. It starts a request handler loop once and
 // keeps it alive until all active requests are completed. This can happen:
 // 1. either because the chunk is delivered
-// 2. or becuse the requestor cancelled/timed out
+// 2. or because the requester cancelled/timed out
 // Fetcher self destroys itself after it is completed.
 // TODO: cancel all forward requests after termination
 type Fetcher struct {
@@ -47,6 +50,7 @@ type Fetcher struct {
 	addr     storage.Address // the address of the chunk to be fetched
 	offerC   chan *enode.ID  // channel of sources (peer node id strings)
 	requestC chan uint8      // channel for incoming requests (with the hopCount value in it)
+	searchTimeout time.Duration
 	skipCheck bool
 }
@@ -79,7 +83,7 @@ func (r *Request) SkipPeer(nodeID string) bool {
 	}
 	t, ok := val.(time.Time)
 	if ok && time.Now().After(t.Add(RequestTimeout)) {
-		// deadine expired
+		// deadline expired
 		r.peersToSkip.Delete(nodeID)
 		return false
 	}
@@ -100,9 +104,10 @@ func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory {
 	}
 }
 
-// New contructs a new Fetcher, for the given chunk. All peers in peersToSkip are not requested to
-// deliver the given chunk. peersToSkip should always contain the peers which are actively requesting
-// this chunk, to make sure we don't request back the chunks from them.
+// New constructs a new Fetcher, for the given chunk. All peers in peersToSkip
+// are not requested to deliver the given chunk. peersToSkip should always
+// contain the peers which are actively requesting this chunk, to make sure we
+// don't request back the chunks from them.
 // The created Fetcher is started and returned.
 func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peersToSkip *sync.Map) storage.NetFetcher {
 	fetcher := NewFetcher(source, f.request, f.skipCheck)
@@ -117,6 +122,7 @@ func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
 		protoRequestFunc: rf,
 		offerC:           make(chan *enode.ID),
 		requestC:         make(chan uint8),
+		searchTimeout:    defaultSearchTimeout,
 		skipCheck:        skipCheck,
 	}
 }
@@ -176,7 +182,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
 	// loop that keeps the fetching process alive
 	// after every request a timer is set. If this goes off we request again from another peer
 	// note that the previous request is still alive and has the chance to deliver, so
-	// rerequesting extends the search. ie.,
+	// requesting again extends the search. ie.,
 	// if a peer we requested from is gone we issue a new request, so the number of active
 	// requests never decreases
 	for {
@@ -209,13 +215,13 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
 		// search timeout: too much time passed since the last request,
 		// extend the search to a new peer if we can find one
 		case <-waitC:
-			log.Trace("search timed out: rerequesting", "request addr", f.addr)
+			log.Trace("search timed out: requesting", "request addr", f.addr)
 			doRequest = requested
 
 		// all Fetcher context closed, can quit
 		case <-ctx.Done():
 			log.Trace("terminate fetcher", "request addr", f.addr)
-			// TODO: send cancelations to all peers left over in peers map (i.e., those we requested from)
+			// TODO: send cancellations to all peers left over in peers map (i.e., those we requested from)
 			return
 		}
@@ -231,7 +237,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
 		// if wait channel is not set, set it to a timer
 		if requested {
 			if wait == nil {
-				wait = time.NewTimer(searchTimeout)
+				wait = time.NewTimer(f.searchTimeout)
 				defer wait.Stop()
 				waitC = wait.C
 			} else {
@@ -242,8 +248,8 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
 				default:
 				}
 			}
-			// reset the timer to go off after searchTimeout
-			wait.Reset(searchTimeout)
+			// reset the timer to go off after defaultSearchTimeout
+			wait.Reset(f.searchTimeout)
 		}
 	}
 
 	doRequest = false
swarm/network/fetcher_test.go

@@ -284,15 +284,11 @@ func TestFetcherRetryOnTimeout(t *testing.T) {
 	requester := newMockRequester()
 	addr := make([]byte, 32)
 	fetcher := NewFetcher(addr, requester.doRequest, true)
+	// set searchTimeOut to low value so the test is quicker
+	fetcher.searchTimeout = 250 * time.Millisecond
 
 	peersToSkip := &sync.Map{}
 
-	// set searchTimeOut to low value so the test is quicker
-	defer func(t time.Duration) {
-		searchTimeout = t
-	}(searchTimeout)
-	searchTimeout = 250 * time.Millisecond
-
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -359,11 +355,9 @@ func TestFetcherRequestQuitRetriesRequest(t *testing.T) {
 	addr := make([]byte, 32)
 	fetcher := NewFetcher(addr, requester.doRequest, true)
 
-	// make sure searchTimeout is long so it is sure the request is not retried because of timeout
-	defer func(t time.Duration) {
-		searchTimeout = t
-	}(searchTimeout)
-	searchTimeout = 10 * time.Second
+	// make sure the searchTimeout is long so it is sure the request is not
+	// retried because of timeout
+	fetcher.searchTimeout = 10 * time.Second
 
 	peersToSkip := &sync.Map{}
登录