etcd_store.go 3.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
package master

import (
	"context"
	"sync"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/clientv3/concurrency"
	log "github.com/sirupsen/logrus"
)

// Default etcd locations used by the master store.
const (
	// DefaultLockPath is the default etcd path of the master's lock.
	DefaultLockPath = "/master/lock"

	// DefaultStatePath is the default etcd key under which the master
	// state is stored.
	DefaultStatePath = "/master/state"
)

H
Helin Wang 已提交
19 20 21
// Etcd is the etcd abstraction that master uses for fault tolerance
// and service registry.
type Etcd struct {
22 23 24 25 26 27 28 29 30
	lockPath  string
	statePath string
	ttlSec    int
	client    *clientv3.Client

	mu   sync.Mutex
	lock *concurrency.Mutex
}

H
Helin Wang 已提交
31 32
// NewEtcd creates a new Etcd.
func NewEtcd(endpoints []string, lockPath, statePath string, ttlSec int) (*Etcd, error) {
33 34 35 36
	// TODO(helin): gracefully shutdown etcd store. Becuase etcd
	// store holds a etcd lock, even though the lock will expire
	// when the lease timeout, we need to implement graceful
	// shutdown to release the lock.
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		return nil, err
	}

	sess, err := concurrency.NewSession(cli, concurrency.WithTTL(ttlSec))
	if err != nil {
		return nil, err
	}

	lock := concurrency.NewMutex(sess, lockPath)
	// It's fine for the lock to get stuck, in this case we have
	// multiple master servers running (only configured to have
	// one master running, but split-brain problem may cuase
	// multiple master servers running), and the cluster management
	// software will kill one of them.
	log.Infof("Trying to acquire lock at %s.", lockPath)
	err = lock.Lock(context.TODO())
	if err != nil {
		return nil, err
	}
	log.Infof("Successfully acquired lock at %s.", lockPath)

H
Helin Wang 已提交
63
	e := &Etcd{}
64 65 66 67 68 69 70 71 72
	e.client = cli
	e.lock = lock
	e.lockPath = lockPath
	e.statePath = statePath
	e.ttlSec = ttlSec
	return e, nil
}

// Save saves the state into the etcd.
H
Helin Wang 已提交
73
func (e *Etcd) Save(state []byte) error {
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
	e.mu.Lock()
	defer e.mu.Unlock()

	ctx := context.TODO()
	put := clientv3.OpPut(e.statePath, string(state))
	resp, err := e.client.Txn(ctx).If(e.lock.IsOwner()).Then(put).Commit()
	if err != nil {
		return err
	}

	if !resp.Succeeded {
		log.Errorln("No longer owns the lock, trying to lock and save again.")
		sess, err := concurrency.NewSession(e.client, concurrency.WithTTL(e.ttlSec))
		if err != nil {
			return err
		}

		e.lock = concurrency.NewMutex(sess, e.lockPath)
		log.Infof("Try to acquire lock at %s.", e.lockPath)
		err = e.lock.Lock(context.TODO())
		if err != nil {
			return err
		}
		log.Infof("Successfully acquired lock at %s.", e.lockPath)
		return e.Save(state)
	}

	return nil
}

// Load loads the state from etcd.
H
Helin Wang 已提交
105
func (e *Etcd) Load() ([]byte, error) {
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
	e.mu.Lock()
	ctx := context.TODO()
	get := clientv3.OpGet(e.statePath)

	resp, err := e.client.Txn(ctx).If(e.lock.IsOwner()).Then(get).Commit()
	if err != nil {
		return nil, err
	}

	if !resp.Succeeded {
		log.Errorln("No longer owns the lock, trying to lock and load again.")
		sess, err := concurrency.NewSession(e.client)
		if err != nil {
			return nil, err
		}

		e.lock = concurrency.NewMutex(sess, e.lockPath)
H
Helin Wang 已提交
123
		err = e.lock.Lock(context.TODO())
124
		e.mu.Unlock()
H
Helin Wang 已提交
125 126 127 128
		if err != nil {
			return nil, err
		}

129 130 131 132 133 134 135 136 137 138 139 140 141 142
		return e.Load()
	}

	kvs := resp.Responses[0].GetResponseRange().Kvs
	if len(kvs) == 0 {
		// No state exists
		e.mu.Unlock()
		return nil, nil
	}

	state := kvs[0].Value
	e.mu.Unlock()
	return state, nil
}