提交 19bfb8a1 编写于 作者: Y Yancey 提交者: GitHub

PServer recovery from checkpoint (#2741)

* Server recovery from checkpoint
上级 f5f7d6bd
...@@ -22,3 +22,6 @@ cmake-build-* ...@@ -22,3 +22,6 @@ cmake-build-*
# generated while compiling # generated while compiling
python/paddle/v2/framework/core.so python/paddle/v2/framework/core.so
CMakeFiles
cmake_install.cmake
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"time" "time"
"github.com/namsral/flag" "github.com/namsral/flag"
"github.com/topicai/candy"
"github.com/PaddlePaddle/Paddle/go/pserver" "github.com/PaddlePaddle/Paddle/go/pserver"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
...@@ -18,53 +19,47 @@ func main() { ...@@ -18,53 +19,47 @@ func main() {
index := flag.Int("index", -1, "index of this pserver, should be larger or equal than 0") index := flag.Int("index", -1, "index of this pserver, should be larger or equal than 0")
etcdEndpoint := flag.String("etcd-endpoint", "http://127.0.0.1:2379", etcdEndpoint := flag.String("etcd-endpoint", "http://127.0.0.1:2379",
"comma separated endpoint string for pserver to connect to etcd") "comma separated endpoint string for pserver to connect to etcd")
etcdTimeout := flag.Int("etcd-timeout", 5, "timeout for etcd calls") etcdTimeout := flag.Duration("etcd-timeout", 5*time.Second, "timeout for etcd calls")
numPservers := flag.Int("num-pservers", 1, "total pserver count in a training job") numPservers := flag.Int("num-pservers", 1, "total pserver count in a training job")
checkpointPath := flag.String("checkpoint-path", "/checkpoints/", "save checkpoint path") checkpointPath := flag.String("checkpoint-path", "/checkpoints/", "save checkpoint path")
checkpointInterval := flag.Int("checkpoint-interval", 600, "save checkpoint per interval seconds") checkpointInterval := flag.Duration("checkpoint-interval", 600*time.Second, "save checkpoint per interval seconds")
logLevel := flag.String("log-level", "info", logLevel := flag.String("log-level", "info",
"log level, possible values: debug, info, warning, error, fatal, panic") "log level, possible values: debug, info, warning, error, fatal, panic")
flag.Parse() flag.Parse()
level, err := log.ParseLevel(*logLevel) level, err := log.ParseLevel(*logLevel)
if err != nil { candy.Must(err)
panic(err)
}
log.SetLevel(level) log.SetLevel(level)
var idx int var idx int
var cp pserver.Checkpoint
var cp *pserver.Checkpoint
var e *pserver.EtcdClient var e *pserver.EtcdClient
if *index >= 0 { if *index >= 0 {
idx = *index idx = *index
} else { } else {
timeout := time.Second * time.Duration((*etcdTimeout)) e = pserver.NewEtcdClient(*etcdEndpoint, *numPservers, *etcdTimeout)
e = pserver.NewEtcdClient(*etcdEndpoint, *numPservers, timeout)
idx, err = e.Register() idx, err = e.Register()
candy.Must(err)
cp, err = pserver.NewCheckpointFromFile(*checkpointPath, idx, e)
if err != nil { if err != nil {
panic(err) log.Errorf("Fetch checkpoint failed, %s", err)
} }
} }
s, err := pserver.NewService(idx, *checkpointInterval, *checkpointPath, e, cp) s, err := pserver.NewService(idx, *checkpointInterval, *checkpointPath, e, cp)
if err != nil { candy.Must(err)
panic(err)
}
err = rpc.Register(s) err = rpc.Register(s)
if err != nil { candy.Must(err)
panic(err)
}
rpc.HandleHTTP() rpc.HandleHTTP()
l, err := net.Listen("tcp", ":"+strconv.Itoa(*port)) l, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
if err != nil { candy.Must(err)
panic(err)
}
log.Infof("start pserver at port %d", *port) log.Infof("start pserver at port %d", *port)
err = http.Serve(l, nil) err = http.Serve(l, nil)
candy.Must(err)
if err != nil {
panic(err)
}
} }
hash: b8f18ce6784bd3fadd9fed0b8443e7b658234ea785ae1f220723ae2c1f652aa7 hash: a8faea3a363468a88917ddeb3b1c9ea36886fb2c622acbad42604fa9cb4d3855
updated: 2017-06-27T14:05:48.925262819+08:00 updated: 2017-07-11T10:04:40.786745417+08:00
imports: imports:
- name: github.com/coreos/etcd - name: github.com/coreos/etcd
version: 61fc123e7a8b14a0a258aa3f5c4159861b1ec2e7 version: cb2a496c4ddd1c87a9f280e116649b599999ec79
subpackages: subpackages:
- auth/authpb - auth/authpb
- clientv3 - clientv3
...@@ -22,7 +22,9 @@ imports: ...@@ -22,7 +22,9 @@ imports:
- name: github.com/PaddlePaddle/recordio - name: github.com/PaddlePaddle/recordio
version: edfb82af0739c84f241c87390ec5649c7b28c129 version: edfb82af0739c84f241c87390ec5649c7b28c129
- name: github.com/sirupsen/logrus - name: github.com/sirupsen/logrus
version: 202f25545ea4cf9b191ff7f846df5d87c9382c2b version: 7f976d3a76720c4c27af2ba716b85d2e0a7e38b1
- name: github.com/topicai/candy
version: 1b9030d056fa9f8c4b1f9c91b52fe4b8ab4cd8cc
- name: golang.org/x/net - name: golang.org/x/net
version: c8c74377599bd978aee1cf3b9b63a8634051cec2 version: c8c74377599bd978aee1cf3b9b63a8634051cec2
subpackages: subpackages:
...@@ -34,11 +36,11 @@ imports: ...@@ -34,11 +36,11 @@ imports:
- lex/httplex - lex/httplex
- trace - trace
- name: golang.org/x/sys - name: golang.org/x/sys
version: f7928cfef4d09d1b080aa2b6fd3ca9ba1567c733 version: abf9c25f54453410d0c6668e519582a9e1115027
subpackages: subpackages:
- unix - unix
- name: golang.org/x/text - name: golang.org/x/text
version: 4e9ab9ee170f2a39bd66c92b3e0a47ff47a4bc77 version: cfdf022e86b4ecfb646e1efbd7db175dd623a8fa
subpackages: subpackages:
- secure/bidirule - secure/bidirule
- transform - transform
......
...@@ -10,3 +10,4 @@ import: ...@@ -10,3 +10,4 @@ import:
version: ^1.7.4-pre version: ^1.7.4-pre
- package: github.com/sirupsen/logrus - package: github.com/sirupsen/logrus
version: ^1.0.0 version: ^1.0.0
- package: github.com/topicai/candy
...@@ -16,7 +16,7 @@ import ( ...@@ -16,7 +16,7 @@ import (
const ( const (
// PsDesired is etcd path for store desired pserver count // PsDesired is etcd path for store desired pserver count
PsDesired = "/ps_desired" PsDesired = "/ps_desired"
// PsAddr is the base dir for pserver to store their addr // PsPath is the base dir for pserver to store their addr
PsPath = "/ps/" PsPath = "/ps/"
// PsCheckpoint is the etcd path for store checkpoints information // PsCheckpoint is the etcd path for store checkpoints information
PsCheckpoint = "/checkpoints/" PsCheckpoint = "/checkpoints/"
...@@ -189,9 +189,25 @@ func (e *EtcdClient) registerPserverEtcd(ctx context.Context) (int, error) { ...@@ -189,9 +189,25 @@ func (e *EtcdClient) registerPserverEtcd(ctx context.Context) (int, error) {
return idx, nil return idx, nil
} }
// GetKey gets the value by the specified key
func (e *EtcdClient) GetKey(key string, timeout time.Duration) ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
resp, err := e.etcdClient.Get(ctx, key)
cancel()
if err != nil {
return []byte{}, err
}
kvs := resp.Kvs
if len(kvs) == 0 {
return []byte{}, nil
}
v := kvs[0].Value
return v, nil
}
// PutKey put into etcd with value by key specified // PutKey put into etcd with value by key specified
func (e *EtcdClient) PutKey(key string, value []byte, timeout int) error { func (e *EtcdClient) PutKey(key string, value []byte, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout)) ctx, cancel := context.WithTimeout(context.Background(), timeout)
_, err := e.etcdClient.Put(ctx, key, string(value)) _, err := e.etcdClient.Put(ctx, key, string(value))
cancel() cancel()
if err != nil { if err != nil {
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
...@@ -21,14 +22,14 @@ import ( ...@@ -21,14 +22,14 @@ import (
// ElementType is the type of elements of a Parameter. // ElementType is the type of elements of a Parameter.
type ElementType int type ElementType int
// RPC error message.
const ( const (
// AlreadyInitialized is true if pserver is initialized
AlreadyInitialized = "pserver already initialized" AlreadyInitialized = "pserver already initialized"
// Uninitialized is true if pserver not fully initialized
Uninitialized = "pserver not fully initialized" Uninitialized = "pserver not fully initialized"
CheckpointMD5Failed = "checkpoint file MD5 validation failed"
) )
// Supported element types // Supported element types.
const ( const (
Int32 ElementType = iota Int32 ElementType = iota
UInt32 UInt32
...@@ -51,21 +52,15 @@ type ParameterWithConfig struct { ...@@ -51,21 +52,15 @@ type ParameterWithConfig struct {
Config []byte // parameter configuration in Proto Buffer format Config []byte // parameter configuration in Proto Buffer format
} }
// ParameterCheckpoint is Parameter and State checkpoint // checkpointMeta saves checkpoint metadata
type ParameterCheckpoint struct {
ParamConfig ParameterWithConfig
State []byte
}
// checkpoint signature
type checkpointMeta struct { type checkpointMeta struct {
UUID string `json:"uuid"` UUID string `json:"uuid"`
Md5sum string `json:"md5sum"` MD5 string `json:"md5"`
Timestamp string `json:"timestamp"` Timestamp int64 `json:"timestamp"`
} }
// Checkpoint is the pserver shard persist in file // Checkpoint is the pserver shard persist in file
type Checkpoint []ParameterCheckpoint type Checkpoint []parameterCheckpoint
// Gradient is the gradient of the parameter. // Gradient is the gradient of the parameter.
type Gradient Parameter type Gradient Parameter
...@@ -81,12 +76,53 @@ type Service struct { ...@@ -81,12 +76,53 @@ type Service struct {
optMap map[string]*optimizer optMap map[string]*optimizer
} }
// parameterCheckpoint saves parameter checkpoint
type parameterCheckpoint struct {
ParameterWithConfig
State []byte
}
// NewCheckpointFromFile loads parameters and state from checkpoint file
func NewCheckpointFromFile(cpPath string, idx int, e *EtcdClient) (*Checkpoint, error) {
v, err := e.GetKey(PsPath+string(idx), 3*time.Second)
if err != nil {
return nil, err
}
var cpMeta checkpointMeta
if err = json.Unmarshal(v, &cpMeta); err != nil {
return nil, err
}
fn := filepath.Join(cpPath, cpMeta.UUID)
if _, err = os.Stat(fn); os.IsNotExist(err) {
return nil, err
}
content, err := ioutil.ReadFile(fn)
if err != nil {
return nil, err
}
h := md5.New()
md5 := hex.EncodeToString(h.Sum(content))
if md5 != cpMeta.MD5 {
return nil, errors.New(CheckpointMD5Failed)
}
dec := gob.NewDecoder(bytes.NewReader(content))
cp := &Checkpoint{}
if err = dec.Decode(cp); err != nil {
return nil, err
}
return cp, nil
}
// NewService creates a new service, will bypass etcd registration if no // NewService creates a new service, will bypass etcd registration if no
// endpoints specified. // endpoints specified. It will recovery from checkpoint file if a exists a specified checkpoint.
func NewService(idx int, seconds int, path string, client *EtcdClient, cp Checkpoint) (*Service, error) { func NewService(idx int, interval time.Duration, path string, client *EtcdClient, cp *Checkpoint) (*Service, error) {
s := &Service{ s := &Service{
idx: idx, idx: idx,
checkpointInterval: time.Second * time.Duration(seconds), checkpointInterval: interval,
checkpointPath: path, checkpointPath: path,
client: client, client: client,
} }
...@@ -94,10 +130,12 @@ func NewService(idx int, seconds int, path string, client *EtcdClient, cp Checkp ...@@ -94,10 +130,12 @@ func NewService(idx int, seconds int, path string, client *EtcdClient, cp Checkp
s.initialized = make(chan struct{}) s.initialized = make(chan struct{})
if cp != nil { if cp != nil {
for _, item := range cp { for _, item := range *cp {
p := item.ParamConfig p := ParameterWithConfig{
st := item.State Param: item.Param,
s.optMap[p.Param.Name] = newOptimizer(p, st) Config: item.Config,
}
s.optMap[p.Param.Name] = newOptimizer(p, item.State)
} }
} }
return s, nil return s, nil
...@@ -186,13 +224,13 @@ func (s *Service) doCheckpoint() error { ...@@ -186,13 +224,13 @@ func (s *Service) doCheckpoint() error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
cp := make([]ParameterCheckpoint, 0, len(s.optMap)) cp := make([]parameterCheckpoint, len(s.optMap))
index := 0 index := 0
for name, opt := range s.optMap { for name, opt := range s.optMap {
var pc ParameterCheckpoint var pc parameterCheckpoint
pc.ParamConfig.Param.Name = name pc.Param.Name = name
pc.ParamConfig.Param.ElementType = opt.elementType pc.Param.ElementType = opt.elementType
pc.ParamConfig.Param.Content = opt.GetWeights() pc.Param.Content = opt.GetWeights()
pc.State = opt.GetStates() pc.State = opt.GetStates()
cp[index] = pc cp[index] = pc
index++ index++
...@@ -206,12 +244,12 @@ func (s *Service) doCheckpoint() error { ...@@ -206,12 +244,12 @@ func (s *Service) doCheckpoint() error {
cpMeta := checkpointMeta{} cpMeta := checkpointMeta{}
cpMeta.UUID = s.checkpointPath + strconv.Itoa(s.idx) cpMeta.UUID = s.checkpointPath + strconv.Itoa(s.idx)
cpMeta.Timestamp = time.Now().String() cpMeta.Timestamp = time.Now().UnixNano()
h := md5.New() h := md5.New()
cpMeta.Md5sum = hex.EncodeToString(h.Sum(buf.Bytes())) cpMeta.MD5 = hex.EncodeToString(h.Sum(buf.Bytes()))
cpMetajson, _ := json.Marshal(cpMeta) cpMetajson, _ := json.Marshal(cpMeta)
err = s.client.PutKey(filepath.Join(PsCheckpoint, strconv.Itoa(s.idx)), cpMetajson, 3) err = s.client.PutKey(filepath.Join(PsCheckpoint, strconv.Itoa(s.idx)), cpMetajson, 3*time.Second)
if err != nil { if err != nil {
return err return err
} }
...@@ -219,8 +257,12 @@ func (s *Service) doCheckpoint() error { ...@@ -219,8 +257,12 @@ func (s *Service) doCheckpoint() error {
log.Info("checkpoint does not exists.") log.Info("checkpoint does not exists.")
} else { } else {
err = os.Remove(cpMeta.UUID) err = os.Remove(cpMeta.UUID)
if err != nil {
log.Infof("Removing checkpoint %s failed", cpMeta.UUID)
} else {
log.Infof("checkpoint %s already exsits, removing ", cpMeta.UUID) log.Infof("checkpoint %s already exsits, removing ", cpMeta.UUID)
} }
}
f, err := os.Create(cpMeta.UUID) f, err := os.Create(cpMeta.UUID)
defer f.Close() defer f.Close()
if err != nil { if err != nil {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册