提交 0b936e93 编写于 作者: W wuyi05

update pserver etcd

上级 b7a52bd9
...@@ -18,7 +18,8 @@ func main() { ...@@ -18,7 +18,8 @@ func main() {
etcdEndpoint := flag.String("etcd-endpoint", "http://127.0.0.1:2379", etcdEndpoint := flag.String("etcd-endpoint", "http://127.0.0.1:2379",
"comma separated endpoint string for pserver to connect to etcd") "comma separated endpoint string for pserver to connect to etcd")
etcdTimeout := flag.Int("etcd-timeout", 5, "timeout for etcd calls") etcdTimeout := flag.Int("etcd-timeout", 5, "timeout for etcd calls")
logLevel := flag.String("log-level", "info", "log level, one of debug") logLevel := flag.String("log-level", "info",
"log level, possible values: debug, info, warning, error, fatal, panic")
flag.Parse() flag.Parse()
level, err := log.ParseLevel(*logLevel) level, err := log.ParseLevel(*logLevel)
......
...@@ -9,7 +9,7 @@ import ( ...@@ -9,7 +9,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/PaddlePaddle/Paddle/go/utils" "github.com/PaddlePaddle/Paddle/go/utils/networkhelper"
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency" "github.com/coreos/etcd/clientv3/concurrency"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
...@@ -33,6 +33,9 @@ const ( ...@@ -33,6 +33,9 @@ const (
Float64 Float64
) )
// PsDesired is the etcd path that stores the desired pserver count.
const PsDesired = "/ps_desired"
// Parameter is a piece of data to sync with the parameter server. // Parameter is a piece of data to sync with the parameter server.
type Parameter struct { type Parameter struct {
Name string Name string
...@@ -68,7 +71,8 @@ type Service struct { ...@@ -68,7 +71,8 @@ type Service struct {
externalIP string externalIP string
} }
// NewService creates a new service. // NewService creates a new service; it bypasses etcd registration if no
// endpoints are specified.
func NewService(endpoints string, timeout time.Duration) (*Service, error) { func NewService(endpoints string, timeout time.Duration) (*Service, error) {
s := &Service{opt: newOptimizer(sgd, 0.005)} s := &Service{opt: newOptimizer(sgd, 0.005)}
s.paramMap = make(map[string]Parameter) s.paramMap = make(map[string]Parameter)
...@@ -77,7 +81,7 @@ func NewService(endpoints string, timeout time.Duration) (*Service, error) { ...@@ -77,7 +81,7 @@ func NewService(endpoints string, timeout time.Duration) (*Service, error) {
s.etcdTimeout = timeout s.etcdTimeout = timeout
var err error var err error
s.externalIP, err = utils.GetExternalIP() s.externalIP, err = networkhelper.GetExternalIP()
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -102,67 +106,74 @@ func NewService(endpoints string, timeout time.Duration) (*Service, error) { ...@@ -102,67 +106,74 @@ func NewService(endpoints string, timeout time.Duration) (*Service, error) {
// wait and set s.desired init value // wait and set s.desired init value
for { for {
ctx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := s.etcdClient.Get(ctx, "/ps_desired") resp, err := s.etcdClient.Get(ctx, PsDesired)
cancel() cancel()
if err != nil { if err != nil {
log.Errorf("getting /ps_desired error: %v", err) log.Errorf("getting %s error: %v", PsDesired, err)
time.Sleep(s.etcdTimeout) time.Sleep(s.etcdTimeout)
continue continue
} }
for _, ev := range resp.Kvs { if len(resp.Kvs) != 0 {
log.Debugf("key: %s, value: %s", ev.Key, ev.Value) s.desired, err = strconv.Atoi(string(resp.Kvs[0].Value))
if string(ev.Key) == "/ps_desired" { if err != nil {
s.desired, err = strconv.Atoi(string(ev.Value)) log.Errorf("value of %s invalid %v\n", PsDesired, err)
if err != nil { time.Sleep(s.etcdTimeout)
log.Errorf("value of /ps_desired invalid %v\n", err) // NOTE: wait until ps_desired value changes
time.Sleep(s.etcdTimeout) continue
// NOTE: wait until ps_desired value changes
continue
}
} }
break
}
}
// try register pserver node on etcd
for {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err := s.registerPserverEtcd(ctx)
cancel()
if err != nil {
log.Warn(err)
time.Sleep(s.etcdTimeout)
continue
} }
break break
} }
s.registerPserverEtcd()
} // if endpoints != "" } // if endpoints != ""
// Bypass etcd registration if no endpoints specified // Bypass etcd registration if no endpoints specified
return s, nil return s, nil
} }
// registerPserverEtcd registers pserver node on etcd using transaction. // registerPserverEtcd registers pserver node on etcd using transaction.
func (s *Service) registerPserverEtcd() (*clientv3.TxnResponse, error) { func (s *Service) registerPserverEtcd(ctx context.Context) (*clientv3.TxnResponse, error) {
return concurrency.NewSTMRepeatable(context.TODO(), s.etcdClient, func(c concurrency.STM) error { return concurrency.NewSTM(s.etcdClient, func(c concurrency.STM) error {
registered := false
for i := 0; i < s.desired; i++ { for i := 0; i < s.desired; i++ {
psKey := "/ps/" + strconv.Itoa(i) psKey := "/ps/" + strconv.Itoa(i)
log.Debugf("checking %s", psKey) log.Debugf("checking %s", psKey)
ps := c.Get(psKey) ps := c.Get(psKey)
log.Debugf("got value (%s) for key: %s", ps, psKey) log.Debugf("got value (%s) for key: %s", ps, psKey)
resp, err := s.etcdClient.Grant(context.TODO(), 5)
if err != nil {
log.Fatal(err)
}
if ps == "" { if ps == "" {
resp, err := s.etcdClient.Grant(context.TODO(), 5)
if err != nil {
log.Fatal(err)
}
// find the first id and write info // find the first id and write info
c.Put(psKey, s.externalIP, clientv3.WithLease(resp.ID)) c.Put(psKey, s.externalIP, clientv3.WithLease(resp.ID))
log.Debugf("set pserver node %s with value %s", psKey, s.externalIP) log.Debugf("set pserver node %s with value %s", psKey, s.externalIP)
ch, kaerr := s.etcdClient.KeepAlive(context.TODO(), resp.ID) _, kaerr := s.etcdClient.KeepAlive(context.TODO(), resp.ID)
if kaerr != nil { if kaerr != nil {
log.Errorf("keepalive etcd node error: %v", kaerr) log.Errorf("keepalive etcd node error: %v", kaerr)
return kaerr return kaerr
} }
// FIXME: does this really needed? log.Debug("register finished")
go func(ch <-chan *clientv3.LeaseKeepAliveResponse) { registered = true
ka := <-ch
log.Debugf("keepalive: %d\n", ka.TTL)
}(ch)
break break
} }
} }
log.Debug("register finished") if registered == true {
return nil return nil
}) }
return errors.New("not registerd, may due to already have enough pservers")
}, concurrency.WithAbortContext(ctx), concurrency.WithIsolation(concurrency.RepeatableReads))
} }
// InitParam initializes a parameter. // InitParam initializes a parameter.
......
package utils package networkhelper
import ( import (
"errors" "errors"
......
package utils package networkhelper
import "testing" import "testing"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册