diff --git a/go/connection/conn.go b/go/connection/conn.go index ea6bf972f641be9b58c4e8a8260d6c23de8d1163..bc9b5f0617e35f049c3e14f0b441aca2033f9645 100644 --- a/go/connection/conn.go +++ b/go/connection/conn.go @@ -2,6 +2,7 @@ package connection import ( "errors" + "log" "net/rpc" "sync" ) @@ -62,7 +63,11 @@ func (c *Conn) Connect(addr string) error { c.waitConn = nil } } else { - client.Close() + err := client.Close() + if err != nil { + log.Println(err) + } + return errors.New("client already set from a concurrent goroutine") } diff --git a/go/master/service.go b/go/master/service.go index 30859d92963f9b2bb67cea971c771c826c27c3fd..3edbb7e9c0c20602ca7d38855357fe9901d5770f 100644 --- a/go/master/service.go +++ b/go/master/service.go @@ -50,7 +50,6 @@ func partition(chunks []Chunk, chunksPerTask int) []taskEntry { if len(cur.Task.Chunks) > 0 { cur.Task.ID = id - id++ result = append(result, cur) } diff --git a/go/pserver/client.go b/go/pserver/client.go index bbe93cbb6b3f958d1a4a68706fd08639ec89a0a8..d8c65b2e137aac25aa42951b096501505dd19d3f 100644 --- a/go/pserver/client.go +++ b/go/pserver/client.go @@ -83,7 +83,7 @@ func (c *Client) monitorPservers(l Lister, pserverNum int) { } monitor() - for _ = range ticker.C { + for range ticker.C { monitor() } } diff --git a/go/pserver/service_test.go b/go/pserver/service_test.go index c40cecd0b6b1ad1fdcea19be1d761c4584011643..175c3c3ad876e12d42fa6554e0542ac28e5e35af 100644 --- a/go/pserver/service_test.go +++ b/go/pserver/service_test.go @@ -15,7 +15,7 @@ func TestFull(t *testing.T) { p.Name = "param_a" p.Content = []byte{1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0} p.ElementType = pserver.Int32 - err := s.InitParam(pserver.ParameterWithConfig{p, nil}, nil) + err := s.InitParam(pserver.ParameterWithConfig{Param: p, Config: nil}, nil) if err != nil { t.FailNow() } @@ -24,7 +24,7 @@ func TestFull(t *testing.T) { p1.Name = "param_b" p1.Content = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} p1.ElementType = pserver.Float32 - err = s.InitParam(pserver.ParameterWithConfig{p1, nil}, nil) + err = s.InitParam(pserver.ParameterWithConfig{Param: p1, Config: nil}, nil) if err != nil { t.FailNow() } @@ -95,13 +95,14 @@ func TestUninitialized(t *testing.T) { func TestBlockUntilInitialized(t *testing.T) { s := pserver.NewService() ch := make(chan struct{}, 2) + errCh := make(chan error, 2) var wg sync.WaitGroup wg.Add(1) go func() { var param pserver.Parameter err := s.GetParam("param_a", ¶m) if err != nil { - t.FailNow() + errCh <- err } wg.Done() ch <- struct{}{} @@ -111,7 +112,7 @@ func TestBlockUntilInitialized(t *testing.T) { go func() { err := s.Save("", nil) if err != nil { - t.FailNow() + errCh <- err } wg.Done() ch <- struct{}{} @@ -123,6 +124,8 @@ func TestBlockUntilInitialized(t *testing.T) { case <-ch: // some function returned before initialization is completed. t.FailNow() + case <-errCh: + t.FailNow() default: } @@ -130,7 +133,7 @@ func TestBlockUntilInitialized(t *testing.T) { p.Name = "param_a" p.Content = []byte{1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0} p.ElementType = pserver.Int32 - err := s.InitParam(pserver.ParameterWithConfig{p, nil}, nil) + err := s.InitParam(pserver.ParameterWithConfig{Param: p, Config: nil}, nil) if err != nil { t.FailNow() }