package master

import (
	"context"
	"log"
	"math/rand"
	"net"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/zilliztech/milvus-distributed/internal/querynode/client"

	indexbuilderclient "github.com/zilliztech/milvus-distributed/internal/indexbuilder/client"

	writerclient "github.com/zilliztech/milvus-distributed/internal/writenode/client"

	etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
	ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
	"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
	"go.etcd.io/etcd/clientv3"
	"google.golang.org/grpc"
)

type (
	UniqueID  = typeutil.UniqueID
	Timestamp = typeutil.Timestamp
)

// Master is the master server of the Milvus cluster.
type Master struct {
	// Server state.
	isServing int64

	// Server start timestamp
	startTimestamp int64

	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel func()
	serverLoopWg     sync.WaitGroup

	//grpc server
	grpcServer *grpc.Server
	grpcErr    chan error

	kvBase               *etcdkv.EtcdKV
	scheduler            *ddRequestScheduler
	flushSch             *FlushScheduler
	indexBuildSch        *IndexBuildScheduler
	indexLoadSch         *IndexLoadScheduler
	metaTable            *metaTable
	timesSyncMsgProducer *timeSyncMsgProducer

	// tso ticker
	tsoTicker *time.Ticker

	// Add callback functions at different stages
	startCallbacks []func()
	closeCallbacks []func()

	segmentManager   *SegmentManager
	segmentAssigner  *SegmentAssigner
	statProcessor    *StatsProcessor
	segmentStatusMsg ms.MsgStream

	//id allocator
	idAllocator *GlobalIDAllocator
	//tso allocator
	tsoAllocator *GlobalTSOAllocator

	runtimeStats *RuntimeStats
}

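// newKVBase connects to etcd at etcdAddr and returns an EtcdKV rooted at kvRoot.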
func newKVBase(kvRoot string, etcdAddr []string) *etcdkv.EtcdKV {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   etcdAddr,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		// Surface the connection error instead of silently ignoring it.
		log.Printf("failed to connect to etcd %v: %v", etcdAddr, err)
		return nil
	}
	return etcdkv.NewEtcdKV(cli, kvRoot)
}

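// Init seeds the random source and loads the master configuration into the
// global Params. It should be called once before CreateServer.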
func Init() {
	rand.Seed(time.Now().UnixNano())
	Params.Init()
}

// CreateServer creates the UNINITIALIZED master server; its configuration is read from the global Params.
func CreateServer(ctx context.Context) (*Master, error) {
	//Init(etcdAddr, kvRootPath)
	etcdAddress := Params.EtcdAddress
	metaRootPath := Params.MetaRootPath
	kvRootPath := Params.KvRootPath
	pulsarAddr := Params.PulsarAddress

	etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{etcdAddress}})
	if err != nil {
		return nil, err
	}
	etcdKV := etcdkv.NewEtcdKV(etcdClient, metaRootPath)
	metakv, err := NewMetaTable(etcdKV)
	if err != nil {
		return nil, err
	}

	//timeSyncMsgProducer
	tsMsgProducer, err := NewTimeSyncMsgProducer(ctx)
	if err != nil {
		return nil, err
	}
	pulsarProxyStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream
	pulsarProxyStream.SetPulsarClient(pulsarAddr)
	pulsarProxyStream.CreatePulsarConsumers(Params.ProxyTimeTickChannelNames, Params.MsgChannelSubName, ms.NewUnmarshalDispatcher(), 1024)
	pulsarProxyStream.Start()
	var proxyStream ms.MsgStream = pulsarProxyStream
	proxyTimeTickBarrier := newSoftTimeTickBarrier(ctx, &proxyStream, Params.ProxyIDList, Params.SoftTimeTickBarrierInterval)
	tsMsgProducer.SetProxyTtBarrier(proxyTimeTickBarrier)

	pulsarWriteStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream
	pulsarWriteStream.SetPulsarClient(pulsarAddr)
	pulsarWriteStream.CreatePulsarConsumers(Params.WriteNodeTimeTickChannelNames, Params.MsgChannelSubName, ms.NewUnmarshalDispatcher(), 1024)
	pulsarWriteStream.Start()
	var writeStream ms.MsgStream = pulsarWriteStream
	writeTimeTickBarrier := newHardTimeTickBarrier(ctx, &writeStream, Params.WriteNodeIDList)
	tsMsgProducer.SetWriteNodeTtBarrier(writeTimeTickBarrier)

	pulsarDDStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
	pulsarDDStream.SetPulsarClient(pulsarAddr)
	pulsarDDStream.CreatePulsarProducers(Params.DDChannelNames)
	tsMsgProducer.SetDDSyncStream(pulsarDDStream)

	pulsarDMStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
	pulsarDMStream.SetPulsarClient(pulsarAddr)
	pulsarDMStream.CreatePulsarProducers(Params.InsertChannelNames)
	tsMsgProducer.SetDMSyncStream(pulsarDMStream)

	pulsarK2SStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
	pulsarK2SStream.SetPulsarClient(pulsarAddr)
	pulsarK2SStream.CreatePulsarProducers(Params.K2SChannelNames)
	tsMsgProducer.SetK2sSyncStream(pulsarK2SStream)

	proxyTtBarrierWatcher := make(chan *ms.TimeTickMsg, 1024)
	writeNodeTtBarrierWatcher := make(chan *ms.TimeTickMsg, 1024)
	tsMsgProducer.WatchProxyTtBarrier(proxyTtBarrierWatcher)
	tsMsgProducer.WatchWriteNodeTtBarrier(writeNodeTtBarrierWatcher)

	// stats msg stream
	statsMs := ms.NewPulsarMsgStream(ctx, 1024)
	statsMs.SetPulsarClient(pulsarAddr)
	statsMs.CreatePulsarConsumers([]string{Params.QueryNodeStatsChannelName}, Params.MsgChannelSubName, ms.NewUnmarshalDispatcher(), 1024)
	statsMs.Start()

	m := &Master{
		ctx:                  ctx,
		startTimestamp:       time.Now().Unix(),
		kvBase:               newKVBase(kvRootPath, []string{etcdAddress}),
		metaTable:            metakv,
		timesSyncMsgProducer: tsMsgProducer,
		grpcErr:              make(chan error),
		segmentStatusMsg:     statsMs,
	}

	//init idAllocator
	m.idAllocator = NewGlobalIDAllocator("idTimestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, kvRootPath, "gid"))
	if err := m.idAllocator.Initialize(); err != nil {
		return nil, err
	}

	//init tsoAllocator
	m.tsoAllocator = NewGlobalTSOAllocator("timestamp", tsoutil.NewTSOKVBase([]string{etcdAddress}, kvRootPath, "tso"))
	if err := m.tsoAllocator.Initialize(); err != nil {
		return nil, err
	}

	m.scheduler = NewDDRequestScheduler(ctx)
	m.scheduler.SetDDMsgStream(pulsarDDStream)
	m.scheduler.SetIDAllocator(func() (UniqueID, error) { return m.idAllocator.AllocOne() })

	flushClient, err := writerclient.NewWriterClient(Params.EtcdAddress, Params.MetaRootPath, Params.WriteNodeSegKvSubPath, pulsarDDStream)
	if err != nil {
		return nil, err
	}
	buildIndexClient, err := indexbuilderclient.NewBuildIndexClient(ctx, Params.IndexBuilderAddress)
	if err != nil {
		return nil, err
	}
	loadIndexClient := client.NewLoadIndexClient(ctx, Params.PulsarAddress, Params.LoadIndexChannelNames)

	m.indexLoadSch = NewIndexLoadScheduler(ctx, loadIndexClient, m.metaTable)
	m.indexBuildSch = NewIndexBuildScheduler(ctx, buildIndexClient, m.metaTable, m.indexLoadSch)
	m.flushSch = NewFlushScheduler(ctx, flushClient, m.metaTable, m.indexBuildSch, func() (Timestamp, error) { return m.tsoAllocator.AllocOne() })

	m.segmentAssigner = NewSegmentAssigner(ctx, metakv,
		func() (Timestamp, error) { return m.tsoAllocator.AllocOne() },
		proxyTtBarrierWatcher,
	)

	m.segmentManager, err = NewSegmentManager(ctx, metakv,
		func() (UniqueID, error) { return m.idAllocator.AllocOne() },
		func() (Timestamp, error) { return m.tsoAllocator.AllocOne() },
		writeNodeTtBarrierWatcher,
		m.flushSch,
		m.segmentAssigner)

	if err != nil {
		return nil, err
	}

	m.runtimeStats = NewRuntimeStats()
	m.statProcessor = NewStatsProcessor(metakv, m.runtimeStats,
		func() (Timestamp, error) { return m.tsoAllocator.AllocOne() },
	)

	m.grpcServer = grpc.NewServer()
	masterpb.RegisterMasterServer(m.grpcServer, m)
	return m, nil
}
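
// A minimal startup sketch (error handling abbreviated; the port value 53100
// below is illustrative only, not a constant defined in this package):
//
//	func runMaster(ctx context.Context) error {
//		Init()
//		m, err := CreateServer(ctx)
//		if err != nil {
//			return err
//		}
//		defer m.Close()
//		return m.Run(53100)
//	}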

// AddStartCallback adds a callback in the startServer phase.
func (s *Master) AddStartCallback(callbacks ...func()) {
	s.startCallbacks = append(s.startCallbacks, callbacks...)
}

// AddCloseCallback adds a callback in the Close phase.
func (s *Master) AddCloseCallback(callbacks ...func()) {
	s.closeCallbacks = append(s.closeCallbacks, callbacks...)
}
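
// For example, a caller can register lifecycle hooks before calling Run
// (a sketch; the log messages are illustrative only):
//
//	m.AddStartCallback(func() { log.Print("master started") })
//	m.AddCloseCallback(func() { log.Print("master closed") })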

// Close closes the server.
func (s *Master) Close() {
	if !atomic.CompareAndSwapInt64(&s.isServing, 1, 0) {
		// server is already closed
		return
	}

	log.Print("closing server")

	s.stopServerLoop()
	s.segmentAssigner.Close()
	s.segmentManager.Close()
	if s.kvBase != nil {
		s.kvBase.Close()
	}

	// Run callbacks
	for _, cb := range s.closeCallbacks {
		cb()
	}

	log.Print("close server")
}

// IsClosed checks whether server is closed or not.
func (s *Master) IsClosed() bool {
	return atomic.LoadInt64(&s.isServing) == 0
}

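// IsServing checks whether the server is serving.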
func (s *Master) IsServing() bool {
	return !s.IsClosed()
}

// Run starts the server loops, then marks the master as serving.
func (s *Master) Run(grpcPort int64) error {
	if err := s.startServerLoop(s.ctx, grpcPort); err != nil {
		return err
	}
	s.segmentAssigner.Start()
	s.segmentManager.Start()
	atomic.StoreInt64(&s.isServing, 1)

	// Run callbacks
	for _, cb := range s.startCallbacks {
		cb()
	}

	return nil
}

// Context returns the context of server.
func (s *Master) Context() context.Context {
	return s.ctx
}

// LoopContext returns the loop context of server.
func (s *Master) LoopContext() context.Context {
	return s.serverLoopCtx
}

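// startServerLoop starts the time-sync producer, the DD/index/flush schedulers,
// the gRPC server, and the statistics and TSO loops, registering each with the
// server WaitGroup.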
func (s *Master) startServerLoop(ctx context.Context, grpcPort int64) error {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(ctx)

	s.serverLoopWg.Add(1)
	if err := s.timesSyncMsgProducer.Start(); err != nil {
		return err
	}

	s.serverLoopWg.Add(1)
	if err := s.scheduler.Start(); err != nil {
		return err
	}
	s.serverLoopWg.Add(1)
	if err := s.indexLoadSch.Start(); err != nil {
		return err
	}
	s.serverLoopWg.Add(1)
	if err := s.indexBuildSch.Start(); err != nil {
		return err
	}
	s.serverLoopWg.Add(1)
	if err := s.flushSch.Start(); err != nil {
		return err
	}

	s.serverLoopWg.Add(1)
	go s.grpcLoop(grpcPort)

	if err := <-s.grpcErr; err != nil {
		return err
	}

	s.serverLoopWg.Add(1)
	go s.statisticsLoop()

	s.serverLoopWg.Add(1)
	go s.tsLoop()

	return nil
}

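// stopServerLoop closes the time-sync producer and the schedulers, releasing the
// WaitGroup slots that startServerLoop reserved for them, gracefully stops the
// gRPC server, then cancels the loop context and waits for the remaining loop
// goroutines to exit.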
func (s *Master) stopServerLoop() {
	s.timesSyncMsgProducer.Close()
	s.serverLoopWg.Done()
	s.scheduler.Close()
	s.serverLoopWg.Done()
	s.flushSch.Close()
	s.serverLoopWg.Done()
	s.indexBuildSch.Close()
	s.serverLoopWg.Done()
	s.indexLoadSch.Close()
	s.serverLoopWg.Done()

	if s.grpcServer != nil {
		s.grpcServer.GracefulStop()
		log.Printf("server is closed, exit grpc server")
	}
	s.serverLoopCancel()
	s.serverLoopWg.Wait()
}

// StartTimestamp returns the start timestamp of this server
func (s *Master) StartTimestamp() int64 {
	return s.startTimestamp
}

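// checkGrpcReady reports on targetCh once the gRPC server has had a short grace
// period (100ms) to begin serving, or returns if ctx is canceled first.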
func (s *Master) checkGrpcReady(ctx context.Context, targetCh chan error) {
	select {
	case <-time.After(100 * time.Millisecond):
		targetCh <- nil
	case <-ctx.Done():
		return
	}
}

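// grpcLoop listens on the given port and serves gRPC requests until the server
// is stopped; listen and serve errors are forwarded to s.grpcErr.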
func (s *Master) grpcLoop(grpcPort int64) {
	defer s.serverLoopWg.Done()

	defaultGRPCPort := ":"
	defaultGRPCPort += strconv.FormatInt(grpcPort, 10)
	lis, err := net.Listen("tcp", defaultGRPCPort)
	if err != nil {
		log.Printf("failed to listen: %v", err)
		s.grpcErr <- err
		return
	}
	ctx, cancel := context.WithCancel(s.serverLoopCtx)
	defer cancel()
	go s.checkGrpcReady(ctx, s.grpcErr)
	if err := s.grpcServer.Serve(lis); err != nil {
		s.grpcErr <- err
	}
}

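// tsLoop periodically updates the TSO and global ID allocators until the server
// loop context is canceled.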
func (s *Master) tsLoop() {
	defer s.serverLoopWg.Done()
	s.tsoTicker = time.NewTicker(UpdateTimestampStep)
	defer s.tsoTicker.Stop()
	ctx, cancel := context.WithCancel(s.serverLoopCtx)
	defer cancel()
	for {
		select {
		case <-s.tsoTicker.C:
			if err := s.tsoAllocator.UpdateTSO(); err != nil {
				log.Println("failed to update timestamp", err)
				return
			}
			if err := s.idAllocator.UpdateID(); err != nil {
				log.Println("failed to update id", err)
				return
			}
		case <-ctx.Done():
			// Server is closed; exit the loop.
			log.Println("tsLoop is closed")
			return
		}
	}
}

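// statisticsLoop consumes QueryNode statistics from segmentStatusMsg and applies
// them via the stats processor until the server loop context is canceled.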
func (s *Master) statisticsLoop() {
	defer s.serverLoopWg.Done()
	defer s.segmentStatusMsg.Close()
	ctx, cancel := context.WithCancel(s.serverLoopCtx)
	defer cancel()

	for {
		select {
		case msg := <-s.segmentStatusMsg.Chan():
			err := s.statProcessor.ProcessQueryNodeStats(msg)
			if err != nil {
				log.Println(err)
			}
		case <-ctx.Done():
			log.Print("server is closed, exit segment statistics loop")
			return
		}
	}
}