Commit 7dad0266 authored by Helin Wang

Master server registers itself to etcd.

Parent 42313a3c
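Note: this commit adds only the server side of the registration; how clients consume the registered key is not part of the diff. As a rough, hypothetical sketch (the endpoint and error handling are assumptions, not from this commit), a client such as a trainer could look up the master address written under master.DefaultAddrPath ("/master/addr") with the same etcd clientv3 package:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	// Connect to the same etcd cluster the master registered with
	// (the endpoint below is a placeholder).
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// Read the address the master wrote under "/master/addr".
	resp, err := cli.Get(context.Background(), "/master/addr")
	if err != nil {
		panic(err)
	}
	if len(resp.Kvs) == 0 {
		fmt.Println("no master registered yet")
		return
	}
	fmt.Printf("master is serving at %s\n", resp.Kvs[0].Value)
}
```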
 package main
 
 import (
+    "fmt"
     "net"
     "net/http"
     "net/rpc"
@@ -12,13 +13,13 @@ import (
     log "github.com/sirupsen/logrus"
 
     "github.com/PaddlePaddle/Paddle/go/master"
+    "github.com/PaddlePaddle/Paddle/go/utils/networkhelper"
 )
 
 func main() {
     port := flag.Int("port", 8080, "port of the master server.")
     ttlSec := flag.Int("ttl", 60, "etcd lease TTL in seconds.")
-    endpoints := flag.String("endpoints", "", "comma separated etcd endpoints. If empty, fault tolerance will not be enabled.")
+    endpoints := flag.String("endpoints", "http://127.0.0.1:2379", "comma separated etcd endpoints. If empty, fault tolerance will not be enabled.")
     taskTimeoutDur := flag.Duration("task_timout_dur", 20*time.Minute, "task timout duration.")
     taskTimeoutMax := flag.Int("task_timeout_max", 3, "max timtout count for each task before it being declared failed task.")
     chunkPerTask := flag.Int("chunk_per_task", 10, "chunk per task.")
@@ -31,8 +32,13 @@ func main() {
     var store master.Store
     if *endpoints != "" {
         eps := strings.Split(*endpoints, ",")
-        var err error
-        store, err = master.NewEtcd(eps, master.DefaultLockPath, master.DefaultStatePath, *ttlSec)
+        ip, err := networkhelper.GetExternalIP()
+        if err != nil {
+            log.Fatal(err)
+        }
+
+        addr := fmt.Sprintf("%s:%d", ip, *port)
+        store, err = master.NewEtcdClient(eps, addr, master.DefaultLockPath, master.DefaultAddrPath, master.DefaultStatePath, *ttlSec)
         if err != nil {
             log.Fatal(err)
         }
...
@@ -2,7 +2,7 @@ package master
 
 import (
     "context"
-    "sync"
+    "time"
 
     "github.com/coreos/etcd/clientv3"
     "github.com/coreos/etcd/clientv3/concurrency"
@@ -14,22 +14,22 @@ const (
     DefaultLockPath = "/master/lock"
     // DefaultStatePath is the default etcd key for master state.
     DefaultStatePath = "/master/state"
+    // DefaultAddrPath is the default etcd key for master address.
+    DefaultAddrPath = "/master/addr"
 )
 
-// Etcd is the etcd abstraction that master uses for fault tolerance
+// EtcdClient is the etcd client that master uses for fault tolerance
 // and service registry.
-type Etcd struct {
+type EtcdClient struct {
     lockPath  string
     statePath string
-    ttlSec    int
     client    *clientv3.Client
-
-    mu   sync.Mutex
-    lock *concurrency.Mutex
+    lock      *concurrency.Mutex
 }
 
-// NewEtcd creates a new Etcd.
-func NewEtcd(endpoints []string, lockPath, statePath string, ttlSec int) (*Etcd, error) {
+// NewEtcdClient creates a new EtcdClient.
+func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePath string, ttlSec int) (*EtcdClient, error) {
+    log.Debugf("Connecting to etcd at %v", endpoints)
     // TODO(helin): gracefully shutdown etcd store. Becuase etcd
     // store holds a etcd lock, even though the lock will expire
     // when the lease timeout, we need to implement graceful
@@ -53,27 +53,35 @@ func NewEtcd(endpoints []string, lockPath, statePath string, ttlSec int) (*Etcd,
     // one master running, but split-brain problem may cuase
     // multiple master servers running), and the cluster management
     // software will kill one of them.
-    log.Infof("Trying to acquire lock at %s.", lockPath)
+    log.Debugf("Trying to acquire lock at %s.", lockPath)
     err = lock.Lock(context.TODO())
     if err != nil {
         return nil, err
     }
-    log.Infof("Successfully acquired lock at %s.", lockPath)
+    log.Debugf("Successfully acquired lock at %s.", lockPath)
 
-    e := &Etcd{}
-    e.client = cli
-    e.lock = lock
-    e.lockPath = lockPath
-    e.statePath = statePath
-    e.ttlSec = ttlSec
+    put := clientv3.OpPut(addrPath, string(addr))
+    resp, err := cli.Txn(context.Background()).If(lock.IsOwner()).Then(put).Commit()
+    if err != nil {
+        return nil, err
+    }
+
+    if !resp.Succeeded {
+        log.Fatal("No longer owns the master lock. Exiting.")
+    }
+
+    e := &EtcdClient{
+        lockPath:  lockPath,
+        statePath: statePath,
+        client:    cli,
+        lock:      lock,
+    }
+
     return e, nil
 }
 
 // Save saves the state into the etcd.
-func (e *Etcd) Save(state []byte) error {
-    e.mu.Lock()
-    defer e.mu.Unlock()
+func (e *EtcdClient) Save(state []byte) error {
     ctx := context.TODO()
     put := clientv3.OpPut(e.statePath, string(state))
     resp, err := e.client.Txn(ctx).If(e.lock.IsOwner()).Then(put).Commit()
@@ -82,17 +90,21 @@ func (e *Etcd) Save(state []byte) error {
     }
 
     if !resp.Succeeded {
-        log.Errorln("No longer owns the lock, trying to lock and save again.")
-        sess, err := concurrency.NewSession(e.client, concurrency.WithTTL(e.ttlSec))
-        if err != nil {
-            return err
-        }
-
-        e.lock = concurrency.NewMutex(sess, e.lockPath)
-        log.Infof("Try to acquire lock at %s.", e.lockPath)
-        err = e.lock.Lock(context.TODO())
+        log.Errorln("No longer owns the lock, trying to lock again")
+        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+        err := e.lock.Lock(ctx)
+        cancel()
         if err != nil {
-            return err
+            // We lost the master lock and can not acquire
+            // it back, it means some other master is
+            // already started. We don't want cluster
+            // managment system to kill the master server
+            // who is holding the lock and running
+            // correctly. So the most feasible solution is
+            // to kill current master server. The current
+            // state is not saved, but the trainer's RPC
+            // call will fail, so the trainer will retry.
+            log.Fatalf("Could not acquire the lock at %s: %v. Exiting.", e.lockPath, err)
         }
         log.Infof("Successfully acquired lock at %s.", e.lockPath)
         return e.Save(state)
@@ -102,8 +114,7 @@ func (e *Etcd) Save(state []byte) error {
 }
 
 // Load loads the state from etcd.
-func (e *Etcd) Load() ([]byte, error) {
-    e.mu.Lock()
+func (e *EtcdClient) Load() ([]byte, error) {
     ctx := context.TODO()
     get := clientv3.OpGet(e.statePath)
@@ -114,14 +125,7 @@ func (e *Etcd) Load() ([]byte, error) {
     if !resp.Succeeded {
         log.Errorln("No longer owns the lock, trying to lock and load again.")
-        sess, err := concurrency.NewSession(e.client)
-        if err != nil {
-            return nil, err
-        }
-
-        e.lock = concurrency.NewMutex(sess, e.lockPath)
-        err = e.lock.Lock(context.TODO())
-        e.mu.Unlock()
+        err = e.lock.Lock(context.Background())
         if err != nil {
             return nil, err
         }
@@ -132,11 +136,9 @@ func (e *Etcd) Load() ([]byte, error) {
     kvs := resp.Responses[0].GetResponseRange().Kvs
     if len(kvs) == 0 {
         // No state exists
-        e.mu.Unlock()
         return nil, nil
     }
 
     state := kvs[0].Value
-    e.mu.Unlock()
     return state, nil
 }
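For reference, below is a minimal, hypothetical usage sketch of the renamed EtcdClient API as this diff declares it. The endpoint, advertised address, and TTL values are placeholders; such a snippet is not part of the commit.

```go
package main

import (
	log "github.com/sirupsen/logrus"

	"github.com/PaddlePaddle/Paddle/go/master"
)

func main() {
	// NewEtcdClient acquires the master lock and registers the given
	// address under addrPath before returning (placeholder values below).
	client, err := master.NewEtcdClient(
		[]string{"http://127.0.0.1:2379"}, // etcd endpoints
		"192.168.1.10:8080",               // address this master advertises
		master.DefaultLockPath,
		master.DefaultAddrPath,
		master.DefaultStatePath,
		60, // etcd lease TTL in seconds
	)
	if err != nil {
		log.Fatal(err)
	}

	// Save writes state under statePath only while the lock is owned;
	// on lock loss it re-acquires the lock and retries, or exits the process.
	if err := client.Save([]byte("serialized master state")); err != nil {
		log.Fatal(err)
	}

	// Load returns nil state (and nil error) when no state was saved yet.
	state, err := client.Load()
	if err != nil {
		log.Fatal(err)
	}
	log.Infof("loaded %d bytes of master state", len(state))
}
```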