Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
whqwjb
go-ethereum
提交
7038b573
G
go-ethereum
项目概览
whqwjb
/
go-ethereum
与 Fork 源项目一致
从无法访问的项目Fork
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
G
go-ethereum
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
7038b573
编写于
1月 30, 2019
作者:
E
Elad
提交者:
Rafael Matias
2月 19, 2019
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
cmd/swarm/swarm-smoke: sliding window test (#18967)
(cherry picked from commit
b91bf088
)
上级
1ecf2860
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
416 addition
and
298 deletion
+416
-298
cmd/swarm/swarm-smoke/feed_upload_and_sync.go
cmd/swarm/swarm-smoke/feed_upload_and_sync.go
+5
-93
cmd/swarm/swarm-smoke/main.go
cmd/swarm/swarm-smoke/main.go
+9
-3
cmd/swarm/swarm-smoke/sliding_window.go
cmd/swarm/swarm-smoke/sliding_window.go
+122
-0
cmd/swarm/swarm-smoke/upload_and_sync.go
cmd/swarm/swarm-smoke/upload_and_sync.go
+5
-155
cmd/swarm/swarm-smoke/upload_speed.go
cmd/swarm/swarm-smoke/upload_speed.go
+8
-47
cmd/swarm/swarm-smoke/util.go
cmd/swarm/swarm-smoke/util.go
+267
-0
未找到文件。
cmd/swarm/swarm-smoke/feed_upload_and_sync.go
浏览文件 @
7038b573
...
...
@@ -2,13 +2,11 @@ package main
import
(
"bytes"
"context"
"crypto/md5"
crand
"crypto/rand"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptrace"
"os"
"os/exec"
"strings"
...
...
@@ -18,13 +16,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/api/client"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/storage/feed"
"github.com/ethereum/go-ethereum/swarm/testutil"
colorable
"github.com/mattn/go-colorable"
opentracing
"github.com/opentracing/opentracing-go"
"github.com/pborman/uuid"
cli
"gopkg.in/urfave/cli.v1"
)
...
...
@@ -33,27 +25,6 @@ const (
feedRandomDataLength
=
8
)
func
cliFeedUploadAndSync
(
c
*
cli
.
Context
)
error
{
metrics
.
GetOrRegisterCounter
(
"feed-and-sync"
,
nil
)
.
Inc
(
1
)
log
.
Root
()
.
SetHandler
(
log
.
CallerFileHandler
(
log
.
LvlFilterHandler
(
log
.
Lvl
(
verbosity
),
log
.
StreamHandler
(
colorable
.
NewColorableStderr
(),
log
.
TerminalFormat
(
true
)))))
errc
:=
make
(
chan
error
)
go
func
()
{
errc
<-
feedUploadAndSync
(
c
)
}()
select
{
case
err
:=
<-
errc
:
if
err
!=
nil
{
metrics
.
GetOrRegisterCounter
(
"feed-and-sync.fail"
,
nil
)
.
Inc
(
1
)
}
return
err
case
<-
time
.
After
(
time
.
Duration
(
timeout
)
*
time
.
Second
)
:
metrics
.
GetOrRegisterCounter
(
"feed-and-sync.timeout"
,
nil
)
.
Inc
(
1
)
return
fmt
.
Errorf
(
"timeout after %v sec"
,
timeout
)
}
}
// TODO: retrieve with manifest + extract repeating code
func
feedUploadAndSync
(
c
*
cli
.
Context
)
error
{
defer
func
(
now
time
.
Time
)
{
log
.
Info
(
"total time"
,
"time"
,
time
.
Since
(
now
),
"size (kb)"
,
filesize
)
}(
time
.
Now
())
...
...
@@ -232,9 +203,10 @@ func feedUploadAndSync(c *cli.Context) error {
seed
:=
int
(
time
.
Now
()
.
UnixNano
()
/
1e6
)
log
.
Info
(
"feed uploading to "
+
endpoints
[
0
]
+
" and syncing"
,
"seed"
,
seed
)
randomBytes
:=
testutil
.
RandomBytes
(
seed
,
filesize
*
1000
)
h
=
md5
.
New
()
r
:=
io
.
TeeReader
(
io
.
LimitReader
(
crand
.
Reader
,
int64
(
filesize
*
1000
)),
h
)
hash
,
err
:=
upload
(
&
randomBytes
,
endpoints
[
0
])
hash
,
err
:=
upload
(
r
,
filesize
*
1000
,
endpoints
[
0
])
if
err
!=
nil
{
return
err
}
...
...
@@ -243,10 +215,7 @@ func feedUploadAndSync(c *cli.Context) error {
return
err
}
multihashHex
:=
hexutil
.
Encode
(
hashBytes
)
fileHash
,
err
:=
digest
(
bytes
.
NewReader
(
randomBytes
))
if
err
!=
nil
{
return
err
}
fileHash
:=
h
.
Sum
(
nil
)
log
.
Info
(
"uploaded successfully"
,
"hash"
,
hash
,
"digest"
,
fmt
.
Sprintf
(
"%x"
,
fileHash
))
...
...
@@ -307,60 +276,3 @@ func feedUploadAndSync(c *cli.Context) error {
return
nil
}
func
fetchFeed
(
topic
string
,
user
string
,
endpoint
string
,
original
[]
byte
,
ruid
string
)
error
{
ctx
,
sp
:=
spancontext
.
StartSpan
(
context
.
Background
(),
"feed-and-sync.fetch"
)
defer
sp
.
Finish
()
log
.
Trace
(
"sleeping"
,
"ruid"
,
ruid
)
time
.
Sleep
(
3
*
time
.
Second
)
log
.
Trace
(
"http get request (feed)"
,
"ruid"
,
ruid
,
"api"
,
endpoint
,
"topic"
,
topic
,
"user"
,
user
)
var
tn
time
.
Time
reqUri
:=
endpoint
+
"/bzz-feed:/?topic="
+
topic
+
"&user="
+
user
req
,
_
:=
http
.
NewRequest
(
"GET"
,
reqUri
,
nil
)
opentracing
.
GlobalTracer
()
.
Inject
(
sp
.
Context
(),
opentracing
.
HTTPHeaders
,
opentracing
.
HTTPHeadersCarrier
(
req
.
Header
))
trace
:=
client
.
GetClientTrace
(
"feed-and-sync - http get"
,
"feed-and-sync"
,
ruid
,
&
tn
)
req
=
req
.
WithContext
(
httptrace
.
WithClientTrace
(
ctx
,
trace
))
transport
:=
http
.
DefaultTransport
//transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
tn
=
time
.
Now
()
res
,
err
:=
transport
.
RoundTrip
(
req
)
if
err
!=
nil
{
log
.
Error
(
err
.
Error
(),
"ruid"
,
ruid
)
return
err
}
log
.
Trace
(
"http get response (feed)"
,
"ruid"
,
ruid
,
"api"
,
endpoint
,
"topic"
,
topic
,
"user"
,
user
,
"code"
,
res
.
StatusCode
,
"len"
,
res
.
ContentLength
)
if
res
.
StatusCode
!=
200
{
return
fmt
.
Errorf
(
"expected status code %d, got %v (ruid %v)"
,
200
,
res
.
StatusCode
,
ruid
)
}
defer
res
.
Body
.
Close
()
rdigest
,
err
:=
digest
(
res
.
Body
)
if
err
!=
nil
{
log
.
Warn
(
err
.
Error
(),
"ruid"
,
ruid
)
return
err
}
if
!
bytes
.
Equal
(
rdigest
,
original
)
{
err
:=
fmt
.
Errorf
(
"downloaded imported file md5=%x is not the same as the generated one=%x"
,
rdigest
,
original
)
log
.
Warn
(
err
.
Error
(),
"ruid"
,
ruid
)
return
err
}
log
.
Trace
(
"downloaded file matches random file"
,
"ruid"
,
ruid
,
"len"
,
res
.
ContentLength
)
return
nil
}
cmd/swarm/swarm-smoke/main.go
浏览文件 @
7038b573
...
...
@@ -140,19 +140,25 @@ func main() {
Name
:
"upload_and_sync"
,
Aliases
:
[]
string
{
"c"
},
Usage
:
"upload and sync"
,
Action
:
cliUploadAndSync
,
Action
:
wrapCliCommand
(
"upload-and-sync"
,
true
,
uploadAndSync
)
,
},
{
Name
:
"feed_sync"
,
Aliases
:
[]
string
{
"f"
},
Usage
:
"feed update generate, upload and sync"
,
Action
:
cliFeedUploadAndSync
,
Action
:
wrapCliCommand
(
"feed-and-sync"
,
true
,
feedUploadAndSync
)
,
},
{
Name
:
"upload_speed"
,
Aliases
:
[]
string
{
"u"
},
Usage
:
"measure upload speed"
,
Action
:
cliUploadSpeed
,
Action
:
wrapCliCommand
(
"upload-speed"
,
true
,
uploadSpeed
),
},
{
Name
:
"sliding_window"
,
Aliases
:
[]
string
{
"s"
},
Usage
:
"measure network aggregate capacity"
,
Action
:
wrapCliCommand
(
"sliding-window"
,
false
,
slidingWindow
),
},
}
...
...
cmd/swarm/swarm-smoke/sliding_window.go
0 → 100644
浏览文件 @
7038b573
// Copyright 2018 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
package
main
import
(
"crypto/md5"
crand
"crypto/rand"
"fmt"
"io"
"math/rand"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/pborman/uuid"
cli
"gopkg.in/urfave/cli.v1"
)
var
seed
=
time
.
Now
()
.
UTC
()
.
UnixNano
()
func
init
()
{
rand
.
Seed
(
seed
)
}
type
uploadResult
struct
{
hash
string
digest
[]
byte
}
func
slidingWindow
(
c
*
cli
.
Context
)
error
{
defer
func
(
now
time
.
Time
)
{
totalTime
:=
time
.
Since
(
now
)
log
.
Info
(
"total time"
,
"time"
,
totalTime
)
metrics
.
GetOrRegisterCounter
(
"sliding-window.total-time"
,
nil
)
.
Inc
(
int64
(
totalTime
))
}(
time
.
Now
())
generateEndpoints
(
scheme
,
cluster
,
appName
,
from
,
to
)
hashes
:=
[]
uploadResult
{}
//swarm hashes of the uploads
nodes
:=
to
-
from
const
iterationTimeout
=
30
*
time
.
Second
log
.
Info
(
"sliding window test started"
,
"nodes"
,
nodes
,
"filesize(kb)"
,
filesize
,
"timeout"
,
timeout
)
uploadedBytes
:=
0
networkDepth
:=
0
errored
:=
false
outer
:
for
{
log
.
Info
(
"uploading to "
+
endpoints
[
0
]
+
" and syncing"
,
"seed"
,
seed
)
h
:=
md5
.
New
()
r
:=
io
.
TeeReader
(
io
.
LimitReader
(
crand
.
Reader
,
int64
(
filesize
*
1000
)),
h
)
t1
:=
time
.
Now
()
hash
,
err
:=
upload
(
r
,
filesize
*
1000
,
endpoints
[
0
])
if
err
!=
nil
{
log
.
Error
(
err
.
Error
())
return
err
}
metrics
.
GetOrRegisterResettingTimer
(
"sliding-window.upload-time"
,
nil
)
.
UpdateSince
(
t1
)
fhash
:=
h
.
Sum
(
nil
)
log
.
Info
(
"uploaded successfully"
,
"hash"
,
hash
,
"digest"
,
fmt
.
Sprintf
(
"%x"
,
fhash
),
"sleeping"
,
syncDelay
)
hashes
=
append
(
hashes
,
uploadResult
{
hash
:
hash
,
digest
:
fhash
})
time
.
Sleep
(
time
.
Duration
(
syncDelay
)
*
time
.
Second
)
uploadedBytes
+=
filesize
*
1000
for
i
,
v
:=
range
hashes
{
timeout
:=
time
.
After
(
time
.
Duration
(
timeout
)
*
time
.
Second
)
errored
=
false
inner
:
for
{
select
{
case
<-
timeout
:
errored
=
true
log
.
Error
(
"error retrieving hash. timeout"
,
"hash idx"
,
i
,
"err"
,
err
)
metrics
.
GetOrRegisterCounter
(
"sliding-window.single.error"
,
nil
)
.
Inc
(
1
)
break
inner
default
:
randIndex
:=
1
+
rand
.
Intn
(
len
(
endpoints
)
-
1
)
ruid
:=
uuid
.
New
()[
:
8
]
start
:=
time
.
Now
()
err
:=
fetch
(
v
.
hash
,
endpoints
[
randIndex
],
v
.
digest
,
ruid
)
if
err
!=
nil
{
continue
inner
}
metrics
.
GetOrRegisterResettingTimer
(
"sliding-window.single.fetch-time"
,
nil
)
.
UpdateSince
(
start
)
break
inner
}
}
if
errored
{
break
outer
}
networkDepth
=
i
metrics
.
GetOrRegisterGauge
(
"sliding-window.network-depth"
,
nil
)
.
Update
(
int64
(
networkDepth
))
}
}
log
.
Info
(
"sliding window test finished"
,
"errored?"
,
errored
,
"networkDepth"
,
networkDepth
,
"networkDepth(kb)"
,
networkDepth
*
filesize
)
log
.
Info
(
"stats"
,
"uploadedFiles"
,
len
(
hashes
),
"uploadedKb"
,
uploadedBytes
/
1000
,
"filesizeKb"
,
filesize
)
return
nil
}
cmd/swarm/swarm-smoke/upload_and_sync.go
浏览文件 @
7038b573
...
...
@@ -17,76 +17,21 @@
package
main
import
(
"bytes"
"context"
"crypto/md5"
crand
"crypto/rand"
"errors"
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptrace"
"os"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/api"
"github.com/ethereum/go-ethereum/swarm/api/client"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/ethereum/go-ethereum/swarm/testutil"
opentracing
"github.com/opentracing/opentracing-go"
"github.com/pborman/uuid"
cli
"gopkg.in/urfave/cli.v1"
)
func
generateEndpoints
(
scheme
string
,
cluster
string
,
app
string
,
from
int
,
to
int
)
{
if
cluster
==
"prod"
{
for
port
:=
from
;
port
<
to
;
port
++
{
endpoints
=
append
(
endpoints
,
fmt
.
Sprintf
(
"%s://%v.swarm-gateways.net"
,
scheme
,
port
))
}
}
else
if
cluster
==
"private-internal"
{
for
port
:=
from
;
port
<
to
;
port
++
{
endpoints
=
append
(
endpoints
,
fmt
.
Sprintf
(
"%s://swarm-private-internal-%v:8500"
,
scheme
,
port
))
}
}
else
{
for
port
:=
from
;
port
<
to
;
port
++
{
endpoints
=
append
(
endpoints
,
fmt
.
Sprintf
(
"%s://%s-%v-%s.stg.swarm-gateways.net"
,
scheme
,
app
,
port
,
cluster
))
}
}
if
includeLocalhost
{
endpoints
=
append
(
endpoints
,
"http://localhost:8500"
)
}
}
func
cliUploadAndSync
(
c
*
cli
.
Context
)
error
{
log
.
PrintOrigins
(
true
)
log
.
Root
()
.
SetHandler
(
log
.
LvlFilterHandler
(
log
.
Lvl
(
verbosity
),
log
.
StreamHandler
(
os
.
Stdout
,
log
.
TerminalFormat
(
true
))))
metrics
.
GetOrRegisterCounter
(
"upload-and-sync"
,
nil
)
.
Inc
(
1
)
errc
:=
make
(
chan
error
)
go
func
()
{
errc
<-
uploadAndSync
(
c
)
}()
select
{
case
err
:=
<-
errc
:
if
err
!=
nil
{
metrics
.
GetOrRegisterCounter
(
"upload-and-sync.fail"
,
nil
)
.
Inc
(
1
)
}
return
err
case
<-
time
.
After
(
time
.
Duration
(
timeout
)
*
time
.
Second
)
:
metrics
.
GetOrRegisterCounter
(
"upload-and-sync.timeout"
,
nil
)
.
Inc
(
1
)
return
fmt
.
Errorf
(
"timeout after %v sec"
,
timeout
)
}
}
func
uploadAndSync
(
c
*
cli
.
Context
)
error
{
defer
func
(
now
time
.
Time
)
{
totalTime
:=
time
.
Since
(
now
)
...
...
@@ -96,23 +41,21 @@ func uploadAndSync(c *cli.Context) error {
generateEndpoints
(
scheme
,
cluster
,
appName
,
from
,
to
)
seed
:=
int
(
time
.
Now
()
.
UnixNano
()
/
1e6
)
log
.
Info
(
"uploading to "
+
endpoints
[
0
]
+
" and syncing"
,
"seed"
,
seed
)
randomBytes
:=
testutil
.
RandomBytes
(
seed
,
filesize
*
1000
)
h
:=
md5
.
New
()
r
:=
io
.
TeeReader
(
io
.
LimitReader
(
crand
.
Reader
,
int64
(
filesize
*
1000
)),
h
)
t1
:=
time
.
Now
()
hash
,
err
:=
upload
(
&
randomBytes
,
endpoints
[
0
])
hash
,
err
:=
upload
(
r
,
filesize
*
1000
,
endpoints
[
0
])
if
err
!=
nil
{
log
.
Error
(
err
.
Error
())
return
err
}
metrics
.
GetOrRegisterResettingTimer
(
"upload-and-sync.upload-time"
,
nil
)
.
UpdateSince
(
t1
)
fhash
,
err
:=
digest
(
bytes
.
NewReader
(
randomBytes
))
if
err
!=
nil
{
log
.
Error
(
err
.
Error
())
return
err
}
fhash
:=
h
.
Sum
(
nil
)
log
.
Info
(
"uploaded successfully"
,
"hash"
,
hash
,
"digest"
,
fmt
.
Sprintf
(
"%x"
,
fhash
))
...
...
@@ -161,96 +104,3 @@ func uploadAndSync(c *cli.Context) error {
return
nil
}
// fetch is getting the requested `hash` from the `endpoint` and compares it with the `original` file
func
fetch
(
hash
string
,
endpoint
string
,
original
[]
byte
,
ruid
string
)
error
{
ctx
,
sp
:=
spancontext
.
StartSpan
(
context
.
Background
(),
"upload-and-sync.fetch"
)
defer
sp
.
Finish
()
log
.
Trace
(
"http get request"
,
"ruid"
,
ruid
,
"api"
,
endpoint
,
"hash"
,
hash
)
var
tn
time
.
Time
reqUri
:=
endpoint
+
"/bzz:/"
+
hash
+
"/"
req
,
_
:=
http
.
NewRequest
(
"GET"
,
reqUri
,
nil
)
opentracing
.
GlobalTracer
()
.
Inject
(
sp
.
Context
(),
opentracing
.
HTTPHeaders
,
opentracing
.
HTTPHeadersCarrier
(
req
.
Header
))
trace
:=
client
.
GetClientTrace
(
"upload-and-sync - http get"
,
"upload-and-sync"
,
ruid
,
&
tn
)
req
=
req
.
WithContext
(
httptrace
.
WithClientTrace
(
ctx
,
trace
))
transport
:=
http
.
DefaultTransport
//transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
tn
=
time
.
Now
()
res
,
err
:=
transport
.
RoundTrip
(
req
)
if
err
!=
nil
{
log
.
Error
(
err
.
Error
(),
"ruid"
,
ruid
)
return
err
}
log
.
Trace
(
"http get response"
,
"ruid"
,
ruid
,
"api"
,
endpoint
,
"hash"
,
hash
,
"code"
,
res
.
StatusCode
,
"len"
,
res
.
ContentLength
)
if
res
.
StatusCode
!=
200
{
err
:=
fmt
.
Errorf
(
"expected status code %d, got %v"
,
200
,
res
.
StatusCode
)
log
.
Warn
(
err
.
Error
(),
"ruid"
,
ruid
)
return
err
}
defer
res
.
Body
.
Close
()
rdigest
,
err
:=
digest
(
res
.
Body
)
if
err
!=
nil
{
log
.
Warn
(
err
.
Error
(),
"ruid"
,
ruid
)
return
err
}
if
!
bytes
.
Equal
(
rdigest
,
original
)
{
err
:=
fmt
.
Errorf
(
"downloaded imported file md5=%x is not the same as the generated one=%x"
,
rdigest
,
original
)
log
.
Warn
(
err
.
Error
(),
"ruid"
,
ruid
)
return
err
}
log
.
Trace
(
"downloaded file matches random file"
,
"ruid"
,
ruid
,
"len"
,
res
.
ContentLength
)
return
nil
}
// upload is uploading a file `f` to `endpoint` via the `swarm up` cmd
func
upload
(
dataBytes
*
[]
byte
,
endpoint
string
)
(
string
,
error
)
{
swarm
:=
client
.
NewClient
(
endpoint
)
f
:=
&
client
.
File
{
ReadCloser
:
ioutil
.
NopCloser
(
bytes
.
NewReader
(
*
dataBytes
)),
ManifestEntry
:
api
.
ManifestEntry
{
ContentType
:
"text/plain"
,
Mode
:
0660
,
Size
:
int64
(
len
(
*
dataBytes
)),
},
}
// upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded.
return
swarm
.
Upload
(
f
,
""
,
false
)
}
func
digest
(
r
io
.
Reader
)
([]
byte
,
error
)
{
h
:=
md5
.
New
()
_
,
err
:=
io
.
Copy
(
h
,
r
)
if
err
!=
nil
{
return
nil
,
err
}
return
h
.
Sum
(
nil
),
nil
}
// generates random data in heap buffer
func
generateRandomData
(
datasize
int
)
([]
byte
,
error
)
{
b
:=
make
([]
byte
,
datasize
)
c
,
err
:=
crand
.
Read
(
b
)
if
err
!=
nil
{
return
nil
,
err
}
else
if
c
!=
datasize
{
return
nil
,
errors
.
New
(
"short read"
)
}
return
b
,
nil
}
cmd/swarm/swarm-smoke/upload_speed.go
浏览文件 @
7038b573
...
...
@@ -17,54 +17,18 @@
package
main
import
(
"bytes"
"crypto/md5"
crand
"crypto/rand"
"fmt"
"
os
"
"
io
"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/testutil"
cli
"gopkg.in/urfave/cli.v1"
)
var
endpoint
string
//just use the first endpoint
func
generateEndpoint
(
scheme
string
,
cluster
string
,
app
string
,
from
int
)
{
if
cluster
==
"prod"
{
endpoint
=
fmt
.
Sprintf
(
"%s://%v.swarm-gateways.net"
,
scheme
,
from
)
}
else
if
cluster
==
"private-internal"
{
endpoint
=
fmt
.
Sprintf
(
"%s://swarm-private-internal-%v:8500"
,
scheme
,
from
)
}
else
{
endpoint
=
fmt
.
Sprintf
(
"%s://%s-%v-%s.stg.swarm-gateways.net"
,
scheme
,
app
,
from
,
cluster
)
}
}
func
cliUploadSpeed
(
c
*
cli
.
Context
)
error
{
log
.
PrintOrigins
(
true
)
log
.
Root
()
.
SetHandler
(
log
.
LvlFilterHandler
(
log
.
Lvl
(
verbosity
),
log
.
StreamHandler
(
os
.
Stdout
,
log
.
TerminalFormat
(
true
))))
metrics
.
GetOrRegisterCounter
(
"upload-speed"
,
nil
)
.
Inc
(
1
)
errc
:=
make
(
chan
error
)
go
func
()
{
errc
<-
uploadSpeed
(
c
)
}()
select
{
case
err
:=
<-
errc
:
if
err
!=
nil
{
metrics
.
GetOrRegisterCounter
(
"upload-speed.fail"
,
nil
)
.
Inc
(
1
)
}
return
err
case
<-
time
.
After
(
time
.
Duration
(
timeout
)
*
time
.
Second
)
:
metrics
.
GetOrRegisterCounter
(
"upload-speed.timeout"
,
nil
)
.
Inc
(
1
)
return
fmt
.
Errorf
(
"timeout after %v sec"
,
timeout
)
}
}
func
uploadSpeed
(
c
*
cli
.
Context
)
error
{
defer
func
(
now
time
.
Time
)
{
totalTime
:=
time
.
Since
(
now
)
...
...
@@ -73,25 +37,22 @@ func uploadSpeed(c *cli.Context) error {
metrics
.
GetOrRegisterCounter
(
"upload-speed.total-time"
,
nil
)
.
Inc
(
int64
(
totalTime
))
}(
time
.
Now
())
generateEndpoint
(
scheme
,
cluster
,
appName
,
from
)
endpoint
:=
generateEndpoint
(
scheme
,
cluster
,
appName
,
from
)
seed
:=
int
(
time
.
Now
()
.
UnixNano
()
/
1e6
)
log
.
Info
(
"uploading to "
+
endpoint
,
"seed"
,
seed
)
randomBytes
:=
testutil
.
RandomBytes
(
seed
,
filesize
*
1000
)
h
:=
md5
.
New
()
r
:=
io
.
TeeReader
(
io
.
LimitReader
(
crand
.
Reader
,
int64
(
filesize
*
1000
)),
h
)
t1
:=
time
.
Now
()
hash
,
err
:=
upload
(
&
randomBytes
,
endpoint
)
hash
,
err
:=
upload
(
r
,
filesize
*
1000
,
endpoint
)
if
err
!=
nil
{
log
.
Error
(
err
.
Error
())
return
err
}
metrics
.
GetOrRegisterCounter
(
"upload-speed.upload-time"
,
nil
)
.
Inc
(
int64
(
time
.
Since
(
t1
)))
fhash
,
err
:=
digest
(
bytes
.
NewReader
(
randomBytes
))
if
err
!=
nil
{
log
.
Error
(
err
.
Error
())
return
err
}
fhash
:=
h
.
Sum
(
nil
)
log
.
Info
(
"uploaded successfully"
,
"hash"
,
hash
,
"digest"
,
fmt
.
Sprintf
(
"%x"
,
fhash
))
return
nil
...
...
cmd/swarm/swarm-smoke/util.go
0 → 100644
浏览文件 @
7038b573
// Copyright 2018 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
package
main
import
(
"bytes"
"context"
"crypto/md5"
crand
"crypto/rand"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptrace"
"os"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/api"
"github.com/ethereum/go-ethereum/swarm/api/client"
"github.com/ethereum/go-ethereum/swarm/spancontext"
opentracing
"github.com/opentracing/opentracing-go"
cli
"gopkg.in/urfave/cli.v1"
)
var
(
commandName
=
""
)
func
wrapCliCommand
(
name
string
,
killOnTimeout
bool
,
command
func
(
*
cli
.
Context
)
error
)
func
(
*
cli
.
Context
)
error
{
return
func
(
ctx
*
cli
.
Context
)
error
{
log
.
PrintOrigins
(
true
)
log
.
Root
()
.
SetHandler
(
log
.
LvlFilterHandler
(
log
.
Lvl
(
verbosity
),
log
.
StreamHandler
(
os
.
Stdout
,
log
.
TerminalFormat
(
true
))))
defer
func
(
now
time
.
Time
)
{
totalTime
:=
time
.
Since
(
now
)
log
.
Info
(
"total time"
,
"time"
,
totalTime
)
metrics
.
GetOrRegisterCounter
(
name
+
".total-time"
,
nil
)
.
Inc
(
int64
(
totalTime
))
}(
time
.
Now
())
log
.
Info
(
"smoke test starting"
,
"task"
,
name
,
"timeout"
,
timeout
)
commandName
=
name
metrics
.
GetOrRegisterCounter
(
name
,
nil
)
.
Inc
(
1
)
errc
:=
make
(
chan
error
)
done
:=
make
(
chan
struct
{})
if
killOnTimeout
{
go
func
()
{
<-
time
.
After
(
time
.
Duration
(
timeout
)
*
time
.
Second
)
close
(
done
)
}()
}
go
func
()
{
errc
<-
command
(
ctx
)
}()
select
{
case
err
:=
<-
errc
:
if
err
!=
nil
{
metrics
.
GetOrRegisterCounter
(
fmt
.
Sprintf
(
"%s.fail"
,
name
),
nil
)
.
Inc
(
1
)
}
return
err
case
<-
done
:
metrics
.
GetOrRegisterCounter
(
fmt
.
Sprintf
(
"%s.timeout"
,
name
),
nil
)
.
Inc
(
1
)
return
fmt
.
Errorf
(
"timeout after %v sec"
,
timeout
)
}
}
}
func
generateEndpoints
(
scheme
string
,
cluster
string
,
app
string
,
from
int
,
to
int
)
{
if
cluster
==
"prod"
{
for
port
:=
from
;
port
<
to
;
port
++
{
endpoints
=
append
(
endpoints
,
fmt
.
Sprintf
(
"%s://%v.swarm-gateways.net"
,
scheme
,
port
))
}
}
else
if
cluster
==
"private-internal"
{
for
port
:=
from
;
port
<
to
;
port
++
{
endpoints
=
append
(
endpoints
,
fmt
.
Sprintf
(
"%s://swarm-private-internal-%v:8500"
,
scheme
,
port
))
}
}
else
{
for
port
:=
from
;
port
<
to
;
port
++
{
endpoints
=
append
(
endpoints
,
fmt
.
Sprintf
(
"%s://%s-%v-%s.stg.swarm-gateways.net"
,
scheme
,
app
,
port
,
cluster
))
}
}
if
includeLocalhost
{
endpoints
=
append
(
endpoints
,
"http://localhost:8500"
)
}
}
//just use the first endpoint
func
generateEndpoint
(
scheme
string
,
cluster
string
,
app
string
,
from
int
)
string
{
if
cluster
==
"prod"
{
return
fmt
.
Sprintf
(
"%s://%v.swarm-gateways.net"
,
scheme
,
from
)
}
else
if
cluster
==
"private-internal"
{
return
fmt
.
Sprintf
(
"%s://swarm-private-internal-%v:8500"
,
scheme
,
from
)
}
else
{
return
fmt
.
Sprintf
(
"%s://%s-%v-%s.stg.swarm-gateways.net"
,
scheme
,
app
,
from
,
cluster
)
}
}
func
fetchFeed
(
topic
string
,
user
string
,
endpoint
string
,
original
[]
byte
,
ruid
string
)
error
{
ctx
,
sp
:=
spancontext
.
StartSpan
(
context
.
Background
(),
"feed-and-sync.fetch"
)
defer
sp
.
Finish
()
log
.
Trace
(
"sleeping"
,
"ruid"
,
ruid
)
time
.
Sleep
(
3
*
time
.
Second
)
log
.
Trace
(
"http get request (feed)"
,
"ruid"
,
ruid
,
"api"
,
endpoint
,
"topic"
,
topic
,
"user"
,
user
)
var
tn
time
.
Time
reqUri
:=
endpoint
+
"/bzz-feed:/?topic="
+
topic
+
"&user="
+
user
req
,
_
:=
http
.
NewRequest
(
"GET"
,
reqUri
,
nil
)
opentracing
.
GlobalTracer
()
.
Inject
(
sp
.
Context
(),
opentracing
.
HTTPHeaders
,
opentracing
.
HTTPHeadersCarrier
(
req
.
Header
))
trace
:=
client
.
GetClientTrace
(
"feed-and-sync - http get"
,
"feed-and-sync"
,
ruid
,
&
tn
)
req
=
req
.
WithContext
(
httptrace
.
WithClientTrace
(
ctx
,
trace
))
transport
:=
http
.
DefaultTransport
//transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
tn
=
time
.
Now
()
res
,
err
:=
transport
.
RoundTrip
(
req
)
if
err
!=
nil
{
log
.
Error
(
err
.
Error
(),
"ruid"
,
ruid
)
return
err
}
log
.
Trace
(
"http get response (feed)"
,
"ruid"
,
ruid
,
"api"
,
endpoint
,
"topic"
,
topic
,
"user"
,
user
,
"code"
,
res
.
StatusCode
,
"len"
,
res
.
ContentLength
)
if
res
.
StatusCode
!=
200
{
return
fmt
.
Errorf
(
"expected status code %d, got %v (ruid %v)"
,
200
,
res
.
StatusCode
,
ruid
)
}
defer
res
.
Body
.
Close
()
rdigest
,
err
:=
digest
(
res
.
Body
)
if
err
!=
nil
{
log
.
Warn
(
err
.
Error
(),
"ruid"
,
ruid
)
return
err
}
if
!
bytes
.
Equal
(
rdigest
,
original
)
{
err
:=
fmt
.
Errorf
(
"downloaded imported file md5=%x is not the same as the generated one=%x"
,
rdigest
,
original
)
log
.
Warn
(
err
.
Error
(),
"ruid"
,
ruid
)
return
err
}
log
.
Trace
(
"downloaded file matches random file"
,
"ruid"
,
ruid
,
"len"
,
res
.
ContentLength
)
return
nil
}
// fetch is getting the requested `hash` from the `endpoint` and compares it with the `original` file
func
fetch
(
hash
string
,
endpoint
string
,
original
[]
byte
,
ruid
string
)
error
{
ctx
,
sp
:=
spancontext
.
StartSpan
(
context
.
Background
(),
"upload-and-sync.fetch"
)
defer
sp
.
Finish
()
log
.
Trace
(
"http get request"
,
"ruid"
,
ruid
,
"api"
,
endpoint
,
"hash"
,
hash
)
var
tn
time
.
Time
reqUri
:=
endpoint
+
"/bzz:/"
+
hash
+
"/"
req
,
_
:=
http
.
NewRequest
(
"GET"
,
reqUri
,
nil
)
opentracing
.
GlobalTracer
()
.
Inject
(
sp
.
Context
(),
opentracing
.
HTTPHeaders
,
opentracing
.
HTTPHeadersCarrier
(
req
.
Header
))
trace
:=
client
.
GetClientTrace
(
commandName
+
" - http get"
,
commandName
,
ruid
,
&
tn
)
req
=
req
.
WithContext
(
httptrace
.
WithClientTrace
(
ctx
,
trace
))
transport
:=
http
.
DefaultTransport
//transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
tn
=
time
.
Now
()
res
,
err
:=
transport
.
RoundTrip
(
req
)
if
err
!=
nil
{
log
.
Error
(
err
.
Error
(),
"ruid"
,
ruid
)
return
err
}
log
.
Trace
(
"http get response"
,
"ruid"
,
ruid
,
"api"
,
endpoint
,
"hash"
,
hash
,
"code"
,
res
.
StatusCode
,
"len"
,
res
.
ContentLength
)
if
res
.
StatusCode
!=
200
{
err
:=
fmt
.
Errorf
(
"expected status code %d, got %v"
,
200
,
res
.
StatusCode
)
log
.
Warn
(
err
.
Error
(),
"ruid"
,
ruid
)
return
err
}
defer
res
.
Body
.
Close
()
rdigest
,
err
:=
digest
(
res
.
Body
)
if
err
!=
nil
{
log
.
Warn
(
err
.
Error
(),
"ruid"
,
ruid
)
return
err
}
if
!
bytes
.
Equal
(
rdigest
,
original
)
{
err
:=
fmt
.
Errorf
(
"downloaded imported file md5=%x is not the same as the generated one=%x"
,
rdigest
,
original
)
log
.
Warn
(
err
.
Error
(),
"ruid"
,
ruid
)
return
err
}
log
.
Trace
(
"downloaded file matches random file"
,
"ruid"
,
ruid
,
"len"
,
res
.
ContentLength
)
return
nil
}
// upload an arbitrary byte as a plaintext file to `endpoint` using the api client
func
upload
(
r
io
.
Reader
,
size
int
,
endpoint
string
)
(
string
,
error
)
{
swarm
:=
client
.
NewClient
(
endpoint
)
f
:=
&
client
.
File
{
ReadCloser
:
ioutil
.
NopCloser
(
r
),
ManifestEntry
:
api
.
ManifestEntry
{
ContentType
:
"text/plain"
,
Mode
:
0660
,
Size
:
int64
(
size
),
},
}
// upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded.
return
swarm
.
Upload
(
f
,
""
,
false
)
}
func
digest
(
r
io
.
Reader
)
([]
byte
,
error
)
{
h
:=
md5
.
New
()
_
,
err
:=
io
.
Copy
(
h
,
r
)
if
err
!=
nil
{
return
nil
,
err
}
return
h
.
Sum
(
nil
),
nil
}
// generates random data in heap buffer
func
generateRandomData
(
datasize
int
)
([]
byte
,
error
)
{
b
:=
make
([]
byte
,
datasize
)
c
,
err
:=
crand
.
Read
(
b
)
if
err
!=
nil
{
return
nil
,
err
}
else
if
c
!=
datasize
{
return
nil
,
errors
.
New
(
"short read"
)
}
return
b
,
nil
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录