From 32c92640f093e27eb40d1e67f74ab07f07754945 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Wed, 25 Oct 2017 16:10:43 -0700 Subject: [PATCH] Fix pserver checkpoint The pserver checkpoint before failed because the MD5 checksum is calculated incorrectly. Now changed to CRC32 checksum. --- go/cmd/pserver/pserver.go | 4 +- go/pserver/optimizer.go | 6 +- go/pserver/service.go | 58 ++++++++++--------- go/pserver/service_internal_test.go | 86 +++++++++++++++++++++++++++++ go/pserver/service_test.go | 4 -- 5 files changed, 124 insertions(+), 34 deletions(-) create mode 100644 go/pserver/service_internal_test.go diff --git a/go/cmd/pserver/pserver.go b/go/cmd/pserver/pserver.go index 90f9cf3fc..1358801c1 100644 --- a/go/cmd/pserver/pserver.go +++ b/go/cmd/pserver/pserver.go @@ -67,7 +67,7 @@ func main() { cp, err = pserver.LoadCheckpoint(e, idx) if err != nil { if err == pserver.ErrCheckpointNotFound { - log.Info("Could not find the pserver checkpoint.") + log.Info("load checkpoint error", "error", err) } else { panic(err) } @@ -99,7 +99,7 @@ func main() { candy.Must(err) go func() { - log.Info("starting pserver", log.Ctx{"port": *port}) + log.Info("serving pserver", log.Ctx{"port": *port}) err = http.Serve(l, nil) candy.Must(err) }() diff --git a/go/pserver/optimizer.go b/go/pserver/optimizer.go index e04c86de0..160385073 100644 --- a/go/pserver/optimizer.go +++ b/go/pserver/optimizer.go @@ -71,9 +71,13 @@ func newOptimizer(paramWithConfigs ParameterWithConfig, State []byte) *optimizer cstate = unsafe.Pointer(&s[0]) } + var cptr (*C.uchar) + if len(c) > 0 { + cptr = (*C.uchar)(&c[0]) + } o.config = c o.opt = C.paddle_create_optimizer( - (*C.uchar)(&c[0]), + cptr, C.int(len(c)), C.paddle_element_type(p.ElementType), cbuffer, diff --git a/go/pserver/service.go b/go/pserver/service.go index 6f66faaf2..f703d99a2 100644 --- a/go/pserver/service.go +++ b/go/pserver/service.go @@ -17,12 +17,11 @@ package pserver import ( "bufio" "bytes" - "crypto/md5" "encoding/gob" - "encoding/hex" "encoding/json" "errors" "fmt" + "hash/crc32" "io/ioutil" "os" "path" @@ -40,7 +39,7 @@ type ElementType int // ErrCheckpointNotFound indicates that the pserver checkpoint could // not be found. -var ErrCheckpointNotFound = errors.New("checkpoint not found") +var ErrCheckpointNotFound = errors.New("checkpoint not found in etcd") // RPC error message. const ( @@ -76,7 +75,7 @@ type ParameterWithConfig struct { type checkpointMeta struct { UUID string `json:"uuid"` Path string `json:"path"` - MD5 string `json:"md5"` + CRC32 uint32 `json:"crc32"` Timestamp int64 `json:"timestamp"` } @@ -92,7 +91,7 @@ type Service struct { idx int checkpointInterval time.Duration checkpointPath string - client *EtcdClient + client KVStore mu sync.Mutex optMap map[string]*optimizer @@ -104,7 +103,12 @@ type parameterCheckpoint struct { State []byte } -func loadMeta(e *EtcdClient, idx int) (meta checkpointMeta, err error) { +type KVStore interface { + GetKey(key string, timeout time.Duration) ([]byte, error) + PutKey(key string, value []byte, timeout time.Duration, withLease bool) error +} + +func loadMeta(e KVStore, idx int) (meta checkpointMeta, err error) { v, err := e.GetKey(PsCheckpoint+strconv.Itoa(idx), 3*time.Second) if err != nil { return @@ -123,7 +127,7 @@ func loadMeta(e *EtcdClient, idx int) (meta checkpointMeta, err error) { } // LoadCheckpoint loads checkpoint from file. -func LoadCheckpoint(e *EtcdClient, idx int) (Checkpoint, error) { +func LoadCheckpoint(e KVStore, idx int) (Checkpoint, error) { log.Info("Loading checkpoint", "pserver index", idx) defer traceTime(time.Now(), "load checkpoint") @@ -137,11 +141,8 @@ func LoadCheckpoint(e *EtcdClient, idx int) (Checkpoint, error) { return nil, err } - // TODO(helin): change MD5 to CRC since CRC is better for file - // checksum in our use case (emphasize speed over security). - h := md5.New() - md5 := hex.EncodeToString(h.Sum(content)) - if md5 != cpMeta.MD5 { + crc32 := crc32.ChecksumIEEE(content) + if crc32 != cpMeta.CRC32 { return nil, errors.New(WrongChecksum) } @@ -150,12 +151,13 @@ func LoadCheckpoint(e *EtcdClient, idx int) (Checkpoint, error) { if err = dec.Decode(&cp); err != nil { return nil, err } + return cp, nil } // NewService creates a new service, will bypass etcd registration if no // endpoints specified. It will recovery from checkpoint file if a exists a specified checkpoint. -func NewService(idx int, interval time.Duration, path string, client *EtcdClient, cp Checkpoint) (*Service, error) { +func NewService(idx int, interval time.Duration, path string, client KVStore, cp Checkpoint) (*Service, error) { s := &Service{ idx: idx, checkpointInterval: interval, @@ -173,6 +175,7 @@ func NewService(idx int, interval time.Duration, path string, client *EtcdClient } s.optMap[p.Param.Name] = newOptimizer(p, item.State) } + close(s.initialized) } return s, nil } @@ -221,7 +224,7 @@ func (s *Service) FinishInitParams(_ int, _ *int) error { for range t { err := s.checkpoint() if err != nil { - log.Error("finish init params error", log.Ctx{"error": err}) + log.Error("checkpoint error", log.Ctx{"error": err}) } } }() @@ -274,6 +277,7 @@ func (s *Service) GetParam(name string, parameter *Parameter) error { parameter.Name = name parameter.ElementType = opt.elementType parameter.Content = opt.GetWeights() + log.Info("sending parameter to the trainer", "name", parameter.Name, "size", len(parameter.Content), "type", parameter.ElementType) return nil } @@ -354,20 +358,29 @@ func (s *Service) checkpoint() (err error) { oldMeta, err := loadMeta(s.client, s.idx) if err == ErrCheckpointNotFound { - log.Info("Do not have existing checkpoint.") + log.Info("old meta not found, skip removing old meta") err = nil + } else if err == nil { + log.Info("removing old meta") + if oldMeta.Path != "" { + rmErr := os.Remove(oldMeta.Path) + if rmErr != nil { + // log error, but still treat checkpoint as + // successful. + log.Error("remove old meta file error", log.Ctx{"error": rmErr}) + } + } } if err != nil { return } - h := md5.New() - md5 := hex.EncodeToString(h.Sum(buf.Bytes())) + crc32 := crc32.ChecksumIEEE(buf.Bytes()) cpMeta := checkpointMeta{ UUID: id, Timestamp: time.Now().UnixNano(), - MD5: md5, + CRC32: crc32, Path: p, } @@ -381,14 +394,5 @@ func (s *Service) checkpoint() (err error) { return } - if oldMeta.Path != "" { - rmErr := os.Remove(oldMeta.Path) - if rmErr != nil { - // log error, but still treat checkpoint as - // successful. - log.Error("remove old meta file error", log.Ctx{"error": rmErr}) - } - } - return } diff --git a/go/pserver/service_internal_test.go b/go/pserver/service_internal_test.go new file mode 100644 index 000000000..36eca5112 --- /dev/null +++ b/go/pserver/service_internal_test.go @@ -0,0 +1,86 @@ +package pserver + +import ( + "bytes" + "encoding/binary" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +const testDir = "./test_data" + +type myKV struct { + m map[string][]byte +} + +func (m *myKV) GetKey(key string, timeout time.Duration) ([]byte, error) { + if m.m == nil { + m.m = make(map[string][]byte) + } + return m.m[key], nil +} + +func (m *myKV) PutKey(key string, value []byte, timeout time.Duration, withLease bool) error { + if m.m == nil { + m.m = make(map[string][]byte) + } + m.m[key] = value + return nil +} + +func TestCheckpoint(t *testing.T) { + kv := &myKV{} + s, err := NewService(0, time.Hour, testDir, kv, nil) + assert.Nil(t, err) + err = s.checkpoint() + assert.Nil(t, err) + _, err = LoadCheckpoint(kv, 0) + assert.Nil(t, err) +} + +func float32ToByte(f float32) []byte { + var buf bytes.Buffer + err := binary.Write(&buf, binary.LittleEndian, f) + if err != nil { + fmt.Println("binary.Write failed:", err) + } + return buf.Bytes() +} + +func TestCheckpointWithData(t *testing.T) { + kv := &myKV{} + s, err := NewService(0, time.Hour, testDir, kv, nil) + assert.Nil(t, err) + + var content []byte + for i := 0; i < 50000; i++ { + content = append(content, float32ToByte(float32(i))...) + } + + p1 := Parameter{Name: "p1", ElementType: 1, Content: content} + err = s.InitParam(ParameterWithConfig{Param: p1}, nil) + assert.Nil(t, err) + + err = s.FinishInitParams(0, nil) + assert.Nil(t, err) + + var p2 Parameter + err = s.GetParam(p1.Name, &p2) + assert.Nil(t, err) + assert.Equal(t, p1, p2) + + err = s.checkpoint() + assert.Nil(t, err) + cp, err := LoadCheckpoint(kv, 0) + assert.Nil(t, err) + s1, err := NewService(0, time.Hour, testDir, kv, cp) + assert.Nil(t, err) + + var p3 Parameter + err = s1.GetParam(p1.Name, &p3) + assert.Nil(t, err) + assert.Equal(t, p1, p3) +} diff --git a/go/pserver/service_test.go b/go/pserver/service_test.go index be648cd1e..b6f4566eb 100644 --- a/go/pserver/service_test.go +++ b/go/pserver/service_test.go @@ -178,7 +178,3 @@ func TestBlockUntilInitialized(t *testing.T) { wg.Wait() } - -func TestCheckpointSpeed(t *testing.T) { - //TODO(zhihong): test speed -} -- GitLab