diff --git a/go/master/client.go b/go/master/client.go index de883bf4b9a3de8d6d6e35e8e808dcf7ba54cb46..90b99470978d21480eb2d8097e7dec217b9524eb 100644 --- a/go/master/client.go +++ b/go/master/client.go @@ -69,7 +69,10 @@ func (c *Client) getRecords() { // We treat a task as finished whenever the last data // instance of the task is read. This is not exactly // correct, but a reasonable approximation. - c.taskFinished(t.Meta.ID) + err = c.taskFinished(t.Meta.ID) + if err != nil { + log.Errorln(err) + } } } diff --git a/go/master/client_internal_test.go b/go/master/client_internal_test.go index 49263474c8fe2410ffb6db93a9647f5ab066b06b..70dc09bf9461142ff6498355a5858ba9a1320510 100644 --- a/go/master/client_internal_test.go +++ b/go/master/client_internal_test.go @@ -66,11 +66,21 @@ func TestGetFinishTask(t *testing.T) { for i := 0; i < totalTask*chunkPerTask; i++ { w := recordio.NewWriter(f, -1, -1) - w.Write(nil) + _, err = w.Write(nil) + if err != nil { + panic(err) + } + // call Close to force RecordIO writing a chunk. - w.Close() + err = w.Close() + if err != nil { + panic(err) + } + } + err = f.Close() + if err != nil { + panic(err) } - f.Close() // Manually intialize client to avoid calling c.getRecords() c := &Client{} @@ -79,7 +89,11 @@ func TestGetFinishTask(t *testing.T) { ch := make(chan string, 1) ch <- addr go c.monitorMaster(ch) - c.SetDataset([]string{path}) + err = c.SetDataset([]string{path}) + if err != nil { + panic(err) + } + checkOnePass := func(i int) { var tasks []Task for idx := 0; idx < totalTask; idx++ { diff --git a/go/master/client_test.go b/go/master/client_test.go index 6666d3860c412daa8711fbfa2d729a261b3fd887..bc92dc5ac973d62434b71e09705143ac8fbbd2fa 100644 --- a/go/master/client_test.go +++ b/go/master/client_test.go @@ -57,14 +57,30 @@ func TestNextRecord(t *testing.T) { w := recordio.NewWriter(f, -1, -1) for i := 0; i < total; i++ { - w.Write([]byte{byte(i)}) + _, err = w.Write([]byte{byte(i)}) + if err != nil { + panic(err) + } + } + + err = w.Close() + if err != nil { + panic(err) + } + + err = f.Close() + if err != nil { + panic(err) } - w.Close() - f.Close() + curAddr := make(chan string, 1) curAddr <- fmt.Sprintf(":%d", p) c := master.NewClient(curAddr, 10) - c.SetDataset([]string{path}) + err = c.SetDataset([]string{path}) + if err != nil { + panic(err) + } + for pass := 0; pass < 50; pass++ { received := make(map[byte]bool) for i := 0; i < total; i++ { diff --git a/go/pserver/client/client.go b/go/pserver/client/client.go index aa8bfe30c26fcc0875ad479ecd562700ccefa5a3..b4a45e1c21056550ef9264746bcf58a8abb369a1 100644 --- a/go/pserver/client/client.go +++ b/go/pserver/client/client.go @@ -233,7 +233,7 @@ func (c *Client) Save(path string) error { func strHash(s string) uint32 { h := fnv.New32a() - h.Write([]byte(s)) + _, _ = h.Write([]byte(s)) return h.Sum32() } diff --git a/go/pserver/client/client_test.go b/go/pserver/client/client_test.go index aab91556b4b91fab6de66322987eabe24f1b0f70..5c89882a297323034be2875a6d4cb71d715eb0c2 100644 --- a/go/pserver/client/client_test.go +++ b/go/pserver/client/client_test.go @@ -79,15 +79,33 @@ func initEtcdClient() { log.Errorf("err %v", err) } ctx, cancel := context.WithTimeout(context.Background(), timeout) - client.Delete(ctx, pserver.PsDesired) - client.Delete(ctx, pserver.PsPath) - client.Put(ctx, pserver.PsDesired, strconv.Itoa(numPserver)) + _, err = client.Delete(ctx, pserver.PsDesired) + if err != nil { + panic(err) + } + + _, err = client.Delete(ctx, pserver.PsPath) + if err != nil { + panic(err) + } + + _, err = client.Put(ctx, pserver.PsDesired, strconv.Itoa(numPserver)) + if err != nil { + panic(err) + } + ports := initClient() for i := 0; i < numPserver; i++ { - client.Put(ctx, pserver.PsPath+strconv.Itoa(i), ":"+strconv.Itoa(ports[i])) + _, err = client.Put(ctx, pserver.PsPath+strconv.Itoa(i), ":"+strconv.Itoa(ports[i])) + if err != nil { + panic(err) + } } cancel() - client.Close() + err = client.Close() + if err != nil { + panic(err) + } } type selector bool diff --git a/go/pserver/service.go b/go/pserver/service.go index fec2ec61dc67756439d9fa478788d1f155876538..5cb0293b9724c57607053bef312a90adaa21811b 100644 --- a/go/pserver/service.go +++ b/go/pserver/service.go @@ -219,7 +219,7 @@ func (s *Service) GetParam(name string, parameter *Parameter) error { } // pserver save checkpoint -func (s *Service) doCheckpoint() error { +func (s *Service) doCheckpoint() (err error) { <-s.initialized s.mu.Lock() defer s.mu.Unlock() @@ -237,9 +237,9 @@ func (s *Service) doCheckpoint() error { } var buf bytes.Buffer encoder := gob.NewEncoder(&buf) - err := encoder.Encode(cp) + err = encoder.Encode(cp) if err != nil { - return err + return } cpMeta := checkpointMeta{} @@ -248,10 +248,14 @@ func (s *Service) doCheckpoint() error { h := md5.New() cpMeta.MD5 = hex.EncodeToString(h.Sum(buf.Bytes())) - cpMetajson, _ := json.Marshal(cpMeta) + cpMetajson, err := json.Marshal(cpMeta) + if err != nil { + return + } + err = s.client.PutKey(filepath.Join(PsCheckpoint, strconv.Itoa(s.idx)), cpMetajson, 3*time.Second) if err != nil { - return err + return } if _, err = os.Stat(cpMeta.UUID); os.IsNotExist(err) { log.Info("checkpoint does not exists.") @@ -264,15 +268,32 @@ func (s *Service) doCheckpoint() error { } } f, err := os.Create(cpMeta.UUID) - defer f.Close() if err != nil { - return err + return } + + defer func() { + closeErr := f.Close() + if closeErr != nil { + if err != nil { + log.Errorln(closeErr) + } else { + // Set closeErr as return value. + err = closeErr + } + } + }() + writer := bufio.NewWriter(f) _, err = writer.Write(buf.Bytes()) - writer.Flush() if err != nil { - return err + return } - return nil + + err = writer.Flush() + if err != nil { + return + } + + return }