diff --git a/go/cmd/master/master.go b/go/cmd/master/master.go index 48fe2e6f75a8aa044e17002194228f383ab025b7..a62bc4310e62e5a90787bf660982b8f52ae34ea6 100644 --- a/go/cmd/master/master.go +++ b/go/cmd/master/master.go @@ -32,7 +32,7 @@ func main() { if *endpoints != "" { eps := strings.Split(*endpoints, ",") var err error - store, err = master.NewEtcdStore(eps, master.DefaultLockPath, master.DefaultStatePath, *ttlSec) + store, err = master.NewEtcd(eps, master.DefaultLockPath, master.DefaultStatePath, *ttlSec) if err != nil { log.Fatal(err) } diff --git a/go/master/etcd_store.go b/go/master/etcd_store.go index d8e95056d5d5c7736ba100898797316c4f392293..21b3e2cb0f539c4d89266d273cb74c6f93026ddd 100644 --- a/go/master/etcd_store.go +++ b/go/master/etcd_store.go @@ -16,8 +16,9 @@ const ( DefaultStatePath = "/master/state" ) -// EtcdStore is the Store implementation backed by etcd. -type EtcdStore struct { +// Etcd is the etcd abstraction that master uses for fault tolerance +// and service registry. +type Etcd struct { lockPath string statePath string ttlSec int @@ -27,8 +28,8 @@ type EtcdStore struct { lock *concurrency.Mutex } -// NewEtcdStore creates a new EtcdStore. -func NewEtcdStore(endpoints []string, lockPath, statePath string, ttlSec int) (*EtcdStore, error) { +// NewEtcd creates a new Etcd. +func NewEtcd(endpoints []string, lockPath, statePath string, ttlSec int) (*Etcd, error) { // TODO(helin): gracefully shutdown etcd store. Becuase etcd // store holds a etcd lock, even though the lock will expire // when the lease timeout, we need to implement graceful @@ -59,7 +60,7 @@ func NewEtcdStore(endpoints []string, lockPath, statePath string, ttlSec int) (* } log.Infof("Successfully acquired lock at %s.", lockPath) - e := &EtcdStore{} + e := &Etcd{} e.client = cli e.lock = lock e.lockPath = lockPath @@ -69,7 +70,7 @@ func NewEtcdStore(endpoints []string, lockPath, statePath string, ttlSec int) (* } // Save saves the state into the etcd. -func (e *EtcdStore) Save(state []byte) error { +func (e *Etcd) Save(state []byte) error { e.mu.Lock() defer e.mu.Unlock() @@ -101,7 +102,7 @@ func (e *EtcdStore) Save(state []byte) error { } // Load loads the state from etcd. -func (e *EtcdStore) Load() ([]byte, error) { +func (e *Etcd) Load() ([]byte, error) { e.mu.Lock() ctx := context.TODO() get := clientv3.OpGet(e.statePath) @@ -119,8 +120,12 @@ func (e *EtcdStore) Load() ([]byte, error) { } e.lock = concurrency.NewMutex(sess, e.lockPath) - e.lock.Lock(context.TODO()) + err = e.lock.Lock(context.TODO()) e.mu.Unlock() + if err != nil { + return nil, err + } + return e.Load() }