// Package timesync merges the time ticks that a fixed set of peers publish on
// a message stream into a single barrier timestamp.
package timesync

import (
	"context"
	"math"
	"sync"
	"sync/atomic"

	"github.com/zilliztech/milvus-distributed/internal/logutil"

	"go.uber.org/zap"

	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"

	"github.com/zilliztech/milvus-distributed/internal/errors"
	"github.com/zilliztech/milvus-distributed/internal/log"
	ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
)

type (
	Timestamp = typeutil.Timestamp
	UniqueID  = typeutil.UniqueID

	TimeTickBarrier interface {
		GetTimeTick() (Timestamp, error)
S
sunby 已提交
26 27
		Start()
		Close()
S
sunby 已提交
28 29 30 31 32 33 34 35 36 37 38 39
	}

	softTimeTickBarrier struct {
		peer2LastTt   map[UniqueID]Timestamp
		minTtInterval Timestamp
		lastTt        int64
		outTt         chan Timestamp
		ttStream      ms.MsgStream
		ctx           context.Context
	}

	hardTimeTickBarrier struct {
S
sunby 已提交
40 41 42 43 44 45 46
		peer2Tt    map[UniqueID]Timestamp
		outTt      chan Timestamp
		ttStream   ms.MsgStream
		ctx        context.Context
		wg         sync.WaitGroup
		loopCtx    context.Context
		loopCancel context.CancelFunc
S
sunby 已提交
47 48 49
	}
)

func NewSoftTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier {
S
sunby 已提交
51
	if len(peerIds) <= 0 {
S
sunby 已提交
52
		log.Debug("[newSoftTimeTickBarrier] Error: peerIds is empty!")
S
sunby 已提交
53 54 55 56 57
		return nil
	}

	sttbarrier := softTimeTickBarrier{}
	sttbarrier.minTtInterval = minTtInterval
S
sunby 已提交
58
	sttbarrier.ttStream = ttStream
S
sunby 已提交
59 60
	sttbarrier.outTt = make(chan Timestamp, 1024)
	sttbarrier.peer2LastTt = make(map[UniqueID]Timestamp)
C
cai.zhang 已提交
61
	sttbarrier.ctx = ctx
S
sunby 已提交
62 63 64 65
	for _, id := range peerIds {
		sttbarrier.peer2LastTt[id] = Timestamp(0)
	}
	if len(peerIds) != len(sttbarrier.peer2LastTt) {
S
sunby 已提交
66
		log.Debug("[newSoftTimeTickBarrier] Warning: there are duplicate peerIds!")
S
sunby 已提交
67 68 69 70 71
	}

	return &sttbarrier
}

// GetTimeTick blocks until a barrier timestamp is available or the barrier's
// context is done. When several timestamps are already queued it discards all
// but the newest one, records that value in lastTt, and returns it together
// with the current context error (nil while the context is still live).
// A done context or a closed output channel yields (0, error).
func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp, error) {
	select {
	case <-ttBarrier.ctx.Done():
		return 0, errors.Errorf("[GetTimeTick] closed.")
	case latest, open := <-ttBarrier.outTt:
		if !open {
			return 0, errors.Errorf("[GetTimeTick] closed.")
		}
		// Skip over everything that was queued at this instant so the caller
		// only ever sees the most recent tick.
		for pending := len(ttBarrier.outTt); pending > 0; pending-- {
			if latest, open = <-ttBarrier.outTt; !open {
				return 0, errors.Errorf("[GetTimeTick] closed.")
			}
		}
		atomic.StoreInt64(&ttBarrier.lastTt, int64(latest))
		return latest, ttBarrier.ctx.Err()
	}
}

func (ttBarrier *softTimeTickBarrier) Start() {
S
sunby 已提交
93 94
	for {
		select {
C
cai.zhang 已提交
95
		case <-ttBarrier.ctx.Done():
S
sunby 已提交
96
			log.Debug("[TtBarrierStart] shut down", zap.Error(ttBarrier.ctx.Err()))
S
sunby 已提交
97
			return
S
sunby 已提交
98 99
		default:
		}
100
		ttmsgs, _ := ttBarrier.ttStream.Consume()
S
sunby 已提交
101 102 103 104 105 106 107
		if len(ttmsgs.Msgs) > 0 {
			for _, timetickmsg := range ttmsgs.Msgs {
				ttmsg := timetickmsg.(*ms.TimeTickMsg)
				oldT, ok := ttBarrier.peer2LastTt[ttmsg.Base.SourceID]
				// log.Printf("[softTimeTickBarrier] peer(%d)=%d\n", ttmsg.PeerID, ttmsg.Timestamp)

				if !ok {
S
sunby 已提交
108
					log.Warn("[softTimeTickBarrier] peerID not exist", zap.Int64("peerID", ttmsg.Base.SourceID))
S
sunby 已提交
109 110 111 112 113 114 115 116 117
					continue
				}
				if ttmsg.Base.Timestamp > oldT {
					ttBarrier.peer2LastTt[ttmsg.Base.SourceID] = ttmsg.Base.Timestamp

					// get a legal Timestamp
					ts := ttBarrier.minTimestamp()
					lastTt := atomic.LoadInt64(&(ttBarrier.lastTt))
					if lastTt != 0 && ttBarrier.minTtInterval > ts-Timestamp(lastTt) {
S
sunby 已提交
118 119
						continue
					}
S
sunby 已提交
120
					ttBarrier.outTt <- ts
S
sunby 已提交
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
				}
			}
		}
	}
}

// minTimestamp returns the smallest timestamp currently recorded for any
// peer, or math.MaxUint64 when no peers are registered.
func (ttBarrier *softTimeTickBarrier) minTimestamp() Timestamp {
	lowest := Timestamp(math.MaxUint64)
	for peer := range ttBarrier.peer2LastTt {
		if current := ttBarrier.peer2LastTt[peer]; current < lowest {
			lowest = current
		}
	}
	return lowest
}

// GetTimeTick blocks until the next barrier timestamp is available or the
// barrier's context is done. On success it returns the timestamp together
// with the current context error (nil while the context is still live); a
// done context or a closed output channel yields (0, error).
func (ttBarrier *hardTimeTickBarrier) GetTimeTick() (Timestamp, error) {
	select {
	case <-ttBarrier.ctx.Done():
		return 0, errors.Errorf("[GetTimeTick] closed.")
	case tick, open := <-ttBarrier.outTt:
		if open {
			return tick, ttBarrier.ctx.Err()
		}
		return 0, errors.Errorf("[GetTimeTick] closed.")
	}
}

func (ttBarrier *hardTimeTickBarrier) Start() {
S
sunby 已提交
150
	// Last timestamp synchronized
S
sunby 已提交
151 152
	ttBarrier.wg.Add(1)
	ttBarrier.loopCtx, ttBarrier.loopCancel = context.WithCancel(ttBarrier.ctx)
S
sunby 已提交
153
	state := Timestamp(0)
S
sunby 已提交
154
	go func(ctx context.Context) {
S
sunby 已提交
155
		defer logutil.LogPanic()
S
sunby 已提交
156 157 158 159
		defer ttBarrier.wg.Done()
		for {
			select {
			case <-ctx.Done():
S
sunby 已提交
160
				log.Debug("[TtBarrierStart] shut down", zap.Error(ttBarrier.ctx.Err()))
S
sunby 已提交
161 162 163
				return
			default:
			}
164
			ttmsgs, _ := ttBarrier.ttStream.Consume()
S
sunby 已提交
165
			if len(ttmsgs.Msgs) > 0 {
S
sunby 已提交
166
				log.Debug("receive tt msg")
S
sunby 已提交
167 168 169 170 171 172 173
				for _, timetickmsg := range ttmsgs.Msgs {
					// Suppose ttmsg.Timestamp from stream is always larger than the previous one,
					// that `ttmsg.Timestamp > oldT`
					ttmsg := timetickmsg.(*ms.TimeTickMsg)

					oldT, ok := ttBarrier.peer2Tt[ttmsg.Base.SourceID]
					if !ok {
S
sunby 已提交
174
						log.Warn("[hardTimeTickBarrier] peerID not exist", zap.Int64("peerID", ttmsg.Base.SourceID))
S
sunby 已提交
175 176
						continue
					}
S
sunby 已提交
177

S
sunby 已提交
178
					if oldT > state {
S
sunby 已提交
179 180
						log.Warn("[hardTimeTickBarrier] peer's timestamp ahead",
							zap.Int64("peerID", ttmsg.Base.SourceID), zap.Uint64("timestamp", ttmsg.Base.Timestamp))
S
sunby 已提交
181
					}
S
sunby 已提交
182

S
sunby 已提交
183
					ttBarrier.peer2Tt[ttmsg.Base.SourceID] = ttmsg.Base.Timestamp
S
sunby 已提交
184

S
sunby 已提交
185 186 187 188 189
					newState := ttBarrier.minTimestamp()
					if newState > state {
						ttBarrier.outTt <- newState
						state = newState
					}
S
sunby 已提交
190 191 192
				}
			}
		}
S
sunby 已提交
193 194 195 196 197 198
	}(ttBarrier.loopCtx)
}

func (ttBarrier *hardTimeTickBarrier) Close() {
	ttBarrier.loopCancel()
	ttBarrier.wg.Wait()
S
sunby 已提交
199 200 201 202 203 204 205 206 207 208 209 210
}

// minTimestamp returns the smallest timestamp currently recorded for any
// peer, or math.MaxUint64 when no peers are registered.
func (ttBarrier *hardTimeTickBarrier) minTimestamp() Timestamp {
	lowest := Timestamp(math.MaxUint64)
	for peer := range ttBarrier.peer2Tt {
		if current := ttBarrier.peer2Tt[peer]; current < lowest {
			lowest = current
		}
	}
	return lowest
}

func NewHardTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID) *hardTimeTickBarrier {
S
sunby 已提交
212
	if len(peerIds) <= 0 {
S
sunby 已提交
213
		log.Error("[newSoftTimeTickBarrier] peerIds is empty!")
S
sunby 已提交
214 215 216 217
		return nil
	}

	sttbarrier := hardTimeTickBarrier{}
S
sunby 已提交
218
	sttbarrier.ttStream = ttStream
S
sunby 已提交
219 220 221
	sttbarrier.outTt = make(chan Timestamp, 1024)

	sttbarrier.peer2Tt = make(map[UniqueID]Timestamp)
C
cai.zhang 已提交
222
	sttbarrier.ctx = ctx
S
sunby 已提交
223 224 225 226
	for _, id := range peerIds {
		sttbarrier.peer2Tt[id] = Timestamp(0)
	}
	if len(peerIds) != len(sttbarrier.peer2Tt) {
S
sunby 已提交
227
		log.Warn("[newSoftTimeTickBarrier] there are duplicate peerIds!", zap.Int64s("peerIDs", peerIds))
S
sunby 已提交
228 229 230 231
	}

	return &sttbarrier
}