tsafe.go 1.1 KB
Newer Older
1
package querynode
2 3 4 5 6 7 8 9 10 11 12

import (
	"sync"
)

type tSafeWatcher struct {
	notifyChan chan bool
}

func newTSafeWatcher() *tSafeWatcher {
	return &tSafeWatcher{
C
cai.zhang 已提交
13
		notifyChan: make(chan bool, 1),
14 15 16 17 18 19 20 21 22 23 24 25 26
	}
}

func (watcher *tSafeWatcher) notify() {
	if len(watcher.notifyChan) == 0 {
		watcher.notifyChan <- true
	}
}

func (watcher *tSafeWatcher) hasUpdate() {
	<-watcher.notifyChan
}

27
type tSafer interface {
28 29 30 31 32
	get() Timestamp
	set(t Timestamp)
	registerTSafeWatcher(t *tSafeWatcher)
}

33
type tSafe struct {
Q
quicksilver 已提交
34
	tSafeMu     sync.Mutex // guards all fields
35 36 37 38
	tSafe       Timestamp
	watcherList []*tSafeWatcher
}

39 40
func newTSafe() tSafer {
	var t tSafer = &tSafe{
41 42
		watcherList: make([]*tSafeWatcher, 0),
	}
X
XuanYang-cn 已提交
43
	return t
44 45
}

46
func (ts *tSafe) registerTSafeWatcher(t *tSafeWatcher) {
B
bigsheeper 已提交
47 48
	ts.tSafeMu.Lock()
	defer ts.tSafeMu.Unlock()
49 50 51
	ts.watcherList = append(ts.watcherList, t)
}

52
func (ts *tSafe) get() Timestamp {
53 54 55 56 57
	ts.tSafeMu.Lock()
	defer ts.tSafeMu.Unlock()
	return ts.tSafe
}

58
func (ts *tSafe) set(t Timestamp) {
59
	ts.tSafeMu.Lock()
B
bigsheeper 已提交
60 61
	defer ts.tSafeMu.Unlock()

62 63 64 65 66
	ts.tSafe = t
	for _, watcher := range ts.watcherList {
		watcher.notify()
	}
}