提交 664da7c2 编写于 作者: D Davies Liu

add argument to limit bandwidth

上级 995edd31
......@@ -22,6 +22,7 @@ type Config struct {
Include []string
Manager string
Workers []string
BWLimit int
Verbose bool
Quiet bool
}
......@@ -43,6 +44,7 @@ func NewConfigFromCli(c *cli.Context) *Config {
Include: c.StringSlice("include"),
Workers: c.StringSlice("worker"),
Manager: c.String("manager"),
BWLimit: c.Int("bwlimit"),
Verbose: c.Bool("verbose"),
Quiet: c.Bool("quiet"),
}
......
......@@ -16,6 +16,7 @@ require (
github.com/googleapis/gax-go v2.0.0+incompatible // indirect
github.com/huaweicloud/huaweicloud-sdk-go-obs v0.0.0-20190127152727-3a9e1f8023d5
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 // indirect
github.com/juju/ratelimit v1.0.1
github.com/kr/fs v0.1.0 // indirect
github.com/ks3sdklib/aws-sdk-go v0.0.0-20180820074416-dafab05ad142
github.com/kurin/blazer v0.2.1
......
......@@ -213,6 +213,11 @@ func main() {
Name: "worker",
Usage: "hosts (seperated by comma) to launch worker",
},
&cli.IntFlag{
Name: "bwlimit",
Value: 0,
Usage: "limit bandwidth in Mbps (default: unlimited)",
},
&cli.BoolFlag{
Name: "verbose",
Aliases: []string{"v"},
......
......@@ -18,6 +18,7 @@ import (
"github.com/juicedata/juicesync/config"
"github.com/juicedata/juicesync/object"
"github.com/juicedata/juicesync/utils"
"github.com/juju/ratelimit"
"github.com/mattn/go-isatty"
)
......@@ -38,6 +39,7 @@ var (
failed int64
deleted int64
concurrent chan int
limiter *ratelimit.Bucket
)
var logger = utils.GetLogger("juicesync")
......@@ -137,6 +139,9 @@ func iterate(store object.ObjectStorage, start, end string) (<-chan *object.Obje
}
func copyObject(src, dst object.ObjectStorage, obj *object.Object) error {
if limiter != nil {
limiter.Wait(obj.Size)
}
concurrent <- 1
defer func() {
<-concurrent
......@@ -241,6 +246,9 @@ func copyInParallel(src, dst object.ObjectStorage, obj *object.Object) error {
sz = obj.Size - int64(num)*partSize
}
var err error
if limiter != nil {
limiter.Wait(sz)
}
concurrent <- 1
defer func() {
<-concurrent
......@@ -572,6 +580,10 @@ func Sync(src, dst object.ObjectStorage, config *config.Config) error {
todo := make(chan *object.Object, bufferSize)
wg := sync.WaitGroup{}
concurrent = make(chan int, config.Threads)
if config.BWLimit > 0 {
bps := float64(config.BWLimit*(1<<20)/8) * 0.85 // 15% overhead
limiter = ratelimit.NewBucketWithRate(bps, int64(bps)*3)
}
for i := 0; i < config.Threads; i++ {
wg.Add(1)
go func() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册