From 03ce5c265648e7e8d4a9755fd752fefd94600908 Mon Sep 17 00:00:00 2001
From: "cai.zhang"
Date: Thu, 12 Jan 2023 12:01:39 +0800
Subject: [PATCH] Register after start to prevent two coordinators from
 running at the same time (#21641)

Signed-off-by: cai.zhang
---
 Makefile                                   |   3 +
 internal/datacoord/server.go               |  63 +++---
 internal/distributed/datacoord/service.go  |   9 +-
 internal/distributed/indexcoord/service.go |   9 +-
 internal/distributed/querycoord/service.go |   4 +-
 internal/distributed/rootcoord/service.go  |   9 +-
 internal/indexcoord/index_coord.go         | 212 +++++++++++----------
 internal/indexcoord/index_coord_test.go    |  12 +-
 internal/indexcoord/task_scheduler.go      |   5 +-
 internal/mocks/mock_datacoord.go           |  61 +++++-
 internal/querycoordv2/server.go            |  65 +++++--
 internal/querycoordv2/server_test.go       |  73 ++++++-
 internal/rootcoord/root_coord.go           |  42 ++--
 13 files changed, 374 insertions(+), 193 deletions(-)

diff --git a/Makefile b/Makefile
index e4d5c3fc7..f84670801 100644
--- a/Makefile
+++ b/Makefile
@@ -299,5 +299,8 @@ mock-rootcoord:
 mock-tnx-kv:
 	mockery --name=TxnKV --dir=$(PWD)/internal/kv --output=$(PWD)/internal/kv/mocks --filename=TxnKV.go --with-expecter
 
+mock-datacoord:
+	mockery --name=DataCoord --dir=$(PWD)/internal/types --output=$(PWD)/internal/mocks --filename=mock_datacoord.go --with-expecter
+
 ci-ut: build-cpp-with-coverage generated-proto-go-without-cpp codecov-cpp codecov-go
 
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index 33bcac808..223e8e24d 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -254,7 +254,30 @@ func (s *Server) Init() error {
 	var err error
 	s.stateCode.Store(commonpb.StateCode_Initializing)
 	s.factory.Init(&Params)
+	if err = s.initSession(); err != nil {
+		return err
+	}
+	if s.enableActiveStandBy {
+		s.activateFunc = func() {
+			log.Info("DataCoord switch from standby to active, activating")
+			if err := s.initDataCoord(); err != nil {
+				log.Warn("DataCoord init failed", zap.Error(err))
+				// TODO: panic if error occurred?
+			}
+			s.startDataCoord()
+			s.stateCode.Store(commonpb.StateCode_Healthy)
+			log.Info("DataCoord startup success")
+		}
+		s.stateCode.Store(commonpb.StateCode_StandBy)
+		log.Info("DataCoord enter standby mode successfully")
+		return nil
+	}
 
+	return s.initDataCoord()
+}
+
+func (s *Server) initDataCoord() error {
+	var err error
 	if err = s.initRootCoordClient(); err != nil {
 		return err
 	}
@@ -276,10 +299,6 @@ func (s *Server) Init() error {
 
 	s.allocator = newRootCoordAllocator(s.rootCoordClient)
 
-	if err = s.initSession(); err != nil {
-		return err
-	}
-
 	if err = s.initServiceDiscovery(); err != nil {
 		return err
 	}
@@ -292,6 +311,8 @@ func (s *Server) Init() error {
 
 	s.initGarbageCollection(storageCli)
 
+	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
+
 	return nil
 }
 
@@ -303,38 +324,31 @@ func (s *Server) Init() error {
 // datanodes etcd watch, etcd alive check and flush completed status check
 // 4. set server state to Healthy
 func (s *Server) Start() error {
-	if Params.DataCoordCfg.EnableCompaction {
-		s.compactionHandler.start()
-		s.compactionTrigger.start()
-	}
-
-	if s.enableActiveStandBy {
-		s.activateFunc = func() {
-			// todo complete the activateFunc
-			log.Info("datacoord switch from standby to active, activating")
-			s.startServerLoop()
-			s.stateCode.Store(commonpb.StateCode_Healthy)
-			logutil.Logger(s.ctx).Info("startup success")
-		}
-		s.stateCode.Store(commonpb.StateCode_StandBy)
-		logutil.Logger(s.ctx).Info("DataCoord enter standby mode successfully")
-	} else {
-		s.startServerLoop()
+	if !s.enableActiveStandBy {
+		s.startDataCoord()
 		s.stateCode.Store(commonpb.StateCode_Healthy)
-		logutil.Logger(s.ctx).Info("DataCoord startup successfully")
+		log.Info("DataCoord startup successfully")
 	}
 
 	Params.DataCoordCfg.CreatedTime = time.Now()
 	Params.DataCoordCfg.UpdatedTime = time.Now()
 
+	return nil
+}
+
+func (s *Server) startDataCoord() {
+	if Params.DataCoordCfg.EnableCompaction {
+		s.compactionHandler.start()
+		s.compactionTrigger.start()
+	}
+	s.startServerLoop()
+
 	// DataCoord (re)starts successfully and starts to collect segment stats
 	// data from all DataNodes.
 	// This will prevent DataCoord from missing out on any important segment stats
 	// data while offline.
 	log.Info("DataCoord (re)starts successfully and re-collecting segment stats from DataNodes")
 	s.reCollectSegmentStats(s.ctx)
-
-	return nil
 }
 
 func (s *Server) initCluster() error {
@@ -469,7 +483,6 @@ func (s *Server) initMeta(chunkManager storage.ChunkManager) error {
 }
 
 func (s *Server) startServerLoop() {
-	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
 	s.serverLoopWg.Add(3)
 	s.startDataNodeTtLoop(s.serverLoopCtx)
 	s.startWatchService(s.serverLoopCtx)
diff --git a/internal/distributed/datacoord/service.go b/internal/distributed/datacoord/service.go
index 88b184245..27f37d948 100644
--- a/internal/distributed/datacoord/service.go
+++ b/internal/distributed/datacoord/service.go
@@ -192,14 +192,15 @@ func (s *Server) startGrpcLoop(grpcPort int) {
 }
 
 func (s *Server) start() error {
-	err := s.dataCoord.Start()
+	err := s.dataCoord.Register()
 	if err != nil {
-		log.Error("DataCoord start failed", zap.Error(err))
+		log.Warn("DataCoord register service failed", zap.Error(err))
 		return err
 	}
-	err = s.dataCoord.Register()
+
+	err = s.dataCoord.Start()
 	if err != nil {
-		log.Warn("DataCoord register service failed", zap.Error(err))
+		log.Error("DataCoord start failed", zap.Error(err))
 		return err
 	}
 	return nil
diff --git a/internal/distributed/indexcoord/service.go b/internal/distributed/indexcoord/service.go
index 9f2402e2a..07f1e52f9 100644
--- a/internal/distributed/indexcoord/service.go
+++ b/internal/distributed/indexcoord/service.go
@@ -188,15 +188,16 @@ func (s *Server) init() error {
 
 // start starts IndexCoord's grpc service.
func (s *Server) start() error { - if err := s.indexcoord.Start(); err != nil { - return err - } - log.Info("indexCoord started") if err := s.indexcoord.Register(); err != nil { log.Error("IndexCoord", zap.Any("register session error", err)) return err } log.Info("IndexCoord registers service successfully") + + if err := s.indexcoord.Start(); err != nil { + return err + } + log.Info("indexCoord started") return nil } diff --git a/internal/distributed/querycoord/service.go b/internal/distributed/querycoord/service.go index 6d806c444..f930c08ea 100644 --- a/internal/distributed/querycoord/service.go +++ b/internal/distributed/querycoord/service.go @@ -284,11 +284,11 @@ func (s *Server) startGrpcLoop(grpcPort int) { // start starts QueryCoord's grpc service. func (s *Server) start() error { - err := s.queryCoord.Start() + err := s.queryCoord.Register() if err != nil { return err } - return s.queryCoord.Register() + return s.queryCoord.Start() } // Stop stops QueryCoord's grpc service. diff --git a/internal/distributed/rootcoord/service.go b/internal/distributed/rootcoord/service.go index cfde17a9f..170643e39 100644 --- a/internal/distributed/rootcoord/service.go +++ b/internal/distributed/rootcoord/service.go @@ -267,15 +267,16 @@ func (s *Server) startGrpcLoop(port int) { func (s *Server) start() error { log.Info("RootCoord Core start ...") - if err := s.rootCoord.Start(); err != nil { - log.Error(err.Error()) - return err - } if err := s.rootCoord.Register(); err != nil { log.Error("RootCoord registers service failed", zap.Error(err)) return err } + if err := s.rootCoord.Start(); err != nil { + log.Error("RootCoord start service failed", zap.Error(err)) + return err + } + return nil } diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index 2e87ddc23..e11a1efe6 100644 --- a/internal/indexcoord/index_coord.go +++ b/internal/indexcoord/index_coord.go @@ -171,110 +171,134 @@ func (i *IndexCoord) initSession() error { // Init initializes the IndexCoord component. func (i *IndexCoord) Init() error { + i.UpdateStateCode(commonpb.StateCode_Initializing) + log.Info("IndexCoord init", zap.Any("stateCode", i.stateCode.Load().(commonpb.StateCode))) + var initErr error Params.InitOnce() - i.initOnce.Do(func() { - i.UpdateStateCode(commonpb.StateCode_Initializing) - log.Info("IndexCoord init", zap.Any("stateCode", i.stateCode.Load().(commonpb.StateCode))) - - i.factory.Init(&Params) + i.factory.Init(&Params) + initErr = i.initSession() + if initErr != nil { + log.Error(initErr.Error()) + return initErr + } - err := i.initSession() - if err != nil { - log.Error(err.Error()) - initErr = err - return + if i.enableActiveStandBy { + i.activateFunc = func() { + log.Info("IndexCoord switch from standby to active, activating") + if err := i.initIndexCoord(); err != nil { + log.Warn("IndexCoord init failed", zap.Error(err)) + // TODO: panic if error occurred? 
+ } + i.startIndexCoord() + i.stateCode.Store(commonpb.StateCode_Healthy) + log.Info("IndexCoord startup success") } + i.stateCode.Store(commonpb.StateCode_StandBy) + log.Info("IndexCoord enter standby mode successfully") + } else { + i.initOnce.Do(func() { + initErr = i.initIndexCoord() + }) + } - connectEtcdFn := func() error { - i.etcdKV = etcdkv.NewEtcdKV(i.etcdCli, Params.EtcdCfg.MetaRootPath) - i.metaTable, err = NewMetaTable(i.etcdKV) - return err - } - log.Info("IndexCoord try to connect etcd") - err = retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(100)) - if err != nil { - log.Error("IndexCoord try to connect etcd failed", zap.Error(err)) - initErr = err - return - } + return initErr +} + +func (i *IndexCoord) initIndexCoord() error { + var err error + connectEtcdFn := func() error { + i.etcdKV = etcdkv.NewEtcdKV(i.etcdCli, Params.EtcdCfg.MetaRootPath) + i.metaTable, err = NewMetaTable(i.etcdKV) + return err + } + log.Info("IndexCoord try to connect etcd") + err = retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(100)) + if err != nil { + log.Error("IndexCoord try to connect etcd failed", zap.Error(err)) + return err + } - log.Info("IndexCoord try to connect etcd success") - i.nodeManager = NewNodeManager(i.loopCtx) + log.Info("IndexCoord try to connect etcd success") + i.nodeManager = NewNodeManager(i.loopCtx) - sessions, revision, err := i.session.GetSessions(typeutil.IndexNodeRole) - log.Info("IndexCoord", zap.Int("session number", len(sessions)), zap.Int64("revision", revision)) - if err != nil { - log.Error("IndexCoord Get IndexNode Sessions error", zap.Error(err)) - initErr = err - return + sessions, revision, err := i.session.GetSessions(typeutil.IndexNodeRole) + log.Info("IndexCoord", zap.Int("session number", len(sessions)), zap.Int64("revision", revision)) + if err != nil { + log.Error("IndexCoord Get IndexNode Sessions error", zap.Error(err)) + return err + } + log.Info("IndexCoord get node sessions from etcd", zap.Bool("bind mode", Params.IndexCoordCfg.BindIndexNodeMode), + zap.String("node address", Params.IndexCoordCfg.IndexNodeAddress)) + aliveNodeID := make([]UniqueID, 0) + if Params.IndexCoordCfg.BindIndexNodeMode { + if err = i.nodeManager.AddNode(Params.IndexCoordCfg.IndexNodeID, Params.IndexCoordCfg.IndexNodeAddress); err != nil { + log.Error("IndexCoord add node fail", zap.Int64("ServerID", Params.IndexCoordCfg.IndexNodeID), + zap.String("address", Params.IndexCoordCfg.IndexNodeAddress), zap.Error(err)) + return err } - log.Info("IndexCoord get node sessions from etcd", zap.Bool("bind mode", Params.IndexCoordCfg.BindIndexNodeMode), - zap.String("node address", Params.IndexCoordCfg.IndexNodeAddress)) - aliveNodeID := make([]UniqueID, 0) - if Params.IndexCoordCfg.BindIndexNodeMode { - if err = i.nodeManager.AddNode(Params.IndexCoordCfg.IndexNodeID, Params.IndexCoordCfg.IndexNodeAddress); err != nil { - log.Error("IndexCoord add node fail", zap.Int64("ServerID", Params.IndexCoordCfg.IndexNodeID), - zap.String("address", Params.IndexCoordCfg.IndexNodeAddress), zap.Error(err)) - initErr = err - return - } - log.Info("IndexCoord add node success", zap.String("IndexNode address", Params.IndexCoordCfg.IndexNodeAddress), - zap.Int64("nodeID", Params.IndexCoordCfg.IndexNodeID)) - aliveNodeID = append(aliveNodeID, Params.IndexCoordCfg.IndexNodeID) - } else { - for _, session := range sessions { - session := session - if err := i.nodeManager.AddNode(session.ServerID, session.Address); err != nil { - log.Error("IndexCoord", zap.Int64("ServerID", session.ServerID), - 
zap.Error(err)) - continue - } - aliveNodeID = append(aliveNodeID, session.ServerID) + log.Info("IndexCoord add node success", zap.String("IndexNode address", Params.IndexCoordCfg.IndexNodeAddress), + zap.Int64("nodeID", Params.IndexCoordCfg.IndexNodeID)) + aliveNodeID = append(aliveNodeID, Params.IndexCoordCfg.IndexNodeID) + } else { + for _, session := range sessions { + session := session + if err := i.nodeManager.AddNode(session.ServerID, session.Address); err != nil { + log.Error("IndexCoord", zap.Int64("ServerID", session.ServerID), + zap.Error(err)) + continue } + aliveNodeID = append(aliveNodeID, session.ServerID) } - log.Info("IndexCoord", zap.Int("IndexNode number", len(i.nodeManager.GetAllClients()))) - i.indexBuilder = newIndexBuilder(i.loopCtx, i, i.metaTable, aliveNodeID) - - // TODO silverxia add Rewatch logic - i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1, nil) + } + log.Info("IndexCoord", zap.Int("IndexNode number", len(i.nodeManager.GetAllClients()))) + i.indexBuilder = newIndexBuilder(i.loopCtx, i, i.metaTable, aliveNodeID) - chunkManager, err := i.factory.NewPersistentStorageChunkManager(i.loopCtx) - if err != nil { - log.Error("IndexCoord new minio chunkManager failed", zap.Error(err)) - initErr = err - return - } - log.Info("IndexCoord new minio chunkManager success") - i.chunkManager = chunkManager + // TODO silverxia add Rewatch logic + i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1, nil) - i.garbageCollector = newGarbageCollector(i.loopCtx, i.metaTable, i.chunkManager, i) - i.handoff = newHandoff(i.loopCtx, i.metaTable, i.etcdKV, i) - i.flushedSegmentWatcher, err = newFlushSegmentWatcher(i.loopCtx, i.etcdKV, i.metaTable, i.indexBuilder, i.handoff, i) - if err != nil { - initErr = err - return - } + chunkManager, err := i.factory.NewPersistentStorageChunkManager(i.loopCtx) + if err != nil { + log.Error("IndexCoord new minio chunkManager failed", zap.Error(err)) + return err + } + log.Info("IndexCoord new minio chunkManager success") + i.chunkManager = chunkManager - i.sched, err = NewTaskScheduler(i.loopCtx, i.rootCoordClient, i.chunkManager, i.metaTable) - if err != nil { - log.Error("IndexCoord new task scheduler failed", zap.Error(err)) - initErr = err - return - } - log.Info("IndexCoord new task scheduler success") + i.garbageCollector = newGarbageCollector(i.loopCtx, i.metaTable, i.chunkManager, i) + i.handoff = newHandoff(i.loopCtx, i.metaTable, i.etcdKV, i) + i.flushedSegmentWatcher, err = newFlushSegmentWatcher(i.loopCtx, i.etcdKV, i.metaTable, i.indexBuilder, i.handoff, i) + if err != nil { + return err + } - i.metricsCacheManager = metricsinfo.NewMetricsCacheManager() - }) + i.sched, err = NewTaskScheduler(i.loopCtx, i.rootCoordClient, i.chunkManager, i.metaTable) + if err != nil { + log.Error("IndexCoord new task scheduler failed", zap.Error(err)) + return err + } + log.Info("IndexCoord new task scheduler success") - log.Info("IndexCoord init finished", zap.Error(initErr)) + i.metricsCacheManager = metricsinfo.NewMetricsCacheManager() + log.Info("IndexCoord init finished") - return initErr + return nil } // Start starts the IndexCoord component. 
 func (i *IndexCoord) Start() error {
-	var startErr error
+	if !i.enableActiveStandBy {
+		i.startIndexCoord()
+		i.UpdateStateCode(commonpb.StateCode_Healthy)
+		log.Info("IndexCoord start successfully", zap.Any("state", i.stateCode.Load()))
+	}
+
+	Params.IndexCoordCfg.CreatedTime = time.Now()
+	Params.IndexCoordCfg.UpdatedTime = time.Now()
+	return nil
+}
+
+func (i *IndexCoord) startIndexCoord() {
 	i.startOnce.Do(func() {
 		i.loopWg.Add(1)
 		go i.watchNodeLoop()
@@ -282,7 +306,7 @@ func (i *IndexCoord) Start() error {
 		i.loopWg.Add(1)
 		go i.watchFlushedSegmentLoop()
 
-		startErr = i.sched.Start()
+		i.sched.Start()
 
 		i.indexBuilder.Start()
 		i.garbageCollector.Start()
@@ -295,24 +319,6 @@ func (i *IndexCoord) Start() error {
 	for _, cb := range i.startCallbacks {
 		cb()
 	}
-
-	Params.IndexCoordCfg.CreatedTime = time.Now()
-	Params.IndexCoordCfg.UpdatedTime = time.Now()
-
-	if i.enableActiveStandBy {
-		i.activateFunc = func() {
-			log.Info("IndexCoord switch from standby to active, reload the KV")
-			i.metaTable.reloadFromKV()
-			i.UpdateStateCode(commonpb.StateCode_Healthy)
-		}
-		i.UpdateStateCode(commonpb.StateCode_StandBy)
-		log.Info("IndexCoord start successfully", zap.Any("state", i.stateCode.Load()))
-	} else {
-		i.UpdateStateCode(commonpb.StateCode_Healthy)
-		log.Info("IndexCoord start successfully", zap.Any("state", i.stateCode.Load()))
-	}
-
-	return startErr
 }
 
 // Stop stops the IndexCoord component.
diff --git a/internal/indexcoord/index_coord_test.go b/internal/indexcoord/index_coord_test.go
index b00c9798a..b9b0a507b 100644
--- a/internal/indexcoord/index_coord_test.go
+++ b/internal/indexcoord/index_coord_test.go
@@ -186,12 +186,6 @@ func testIndexCoord(t *testing.T) {
 	err = ic.Init()
 	assert.NoError(t, err)
 
-	mockKv := NewMockEtcdKVWithReal(ic.etcdKV)
-	ic.metaTable.catalog = &indexcoord.Catalog{
-		Txn: mockKv,
-	}
-	assert.NoError(t, err)
-
 	err = ic.Register()
 	assert.NoError(t, err)
 
@@ -200,6 +194,12 @@ func testIndexCoord(t *testing.T) {
 
 	ic.UpdateStateCode(commonpb.StateCode_Healthy)
 
+	mockKv := NewMockEtcdKVWithReal(ic.etcdKV)
+	ic.metaTable.catalog = &indexcoord.Catalog{
+		Txn: mockKv,
+	}
+	assert.NoError(t, err)
+
 	ic.nodeManager.setClient(1, inm0)
 
 	// Test IndexCoord function
diff --git a/internal/indexcoord/task_scheduler.go b/internal/indexcoord/task_scheduler.go
index ed2fbafa6..d850ec169 100644
--- a/internal/indexcoord/task_scheduler.go
+++ b/internal/indexcoord/task_scheduler.go
@@ -286,12 +286,9 @@ func (sched *TaskScheduler) indexAddLoop() {
 }
 
 // Start starts the task scheduler of indexing tasks.
-func (sched *TaskScheduler) Start() error {
-
+func (sched *TaskScheduler) Start() {
 	sched.wg.Add(1)
 	go sched.indexAddLoop()
-
-	return nil
 }
 
 // Close closes the task scheduler of indexing tasks.
diff --git a/internal/mocks/mock_datacoord.go b/internal/mocks/mock_datacoord.go
index d8cebc458..01ea4e542 100644
--- a/internal/mocks/mock_datacoord.go
+++ b/internal/mocks/mock_datacoord.go
@@ -1,4 +1,4 @@
-// Code generated by mockery v2.14.0. DO NOT EDIT.
+// Code generated by mockery v2.16.0. DO NOT EDIT.
 
 package mocks
 
@@ -124,11 +124,11 @@ func (_c *DataCoord_AssignSegmentID_Call) Return(_a0 *datapb.AssignSegmentIDResp
 }
 
 // BroadcastAlteredCollection provides a mock function with given fields: ctx, req
-func (_m *DataCoord) BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
+func (_m *DataCoord) BroadcastAlteredCollection(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error) {
 	ret := _m.Called(ctx, req)
 
 	var r0 *commonpb.Status
-	if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionRequest) *commonpb.Status); ok {
+	if rf, ok := ret.Get(0).(func(context.Context, *datapb.AlterCollectionRequest) *commonpb.Status); ok {
 		r0 = rf(ctx, req)
 	} else {
 		if ret.Get(0) != nil {
@@ -137,7 +137,7 @@ func (_m *DataCoord) BroadcastAlteredCollection(ctx context.Context, req *milvus
 	}
 
 	var r1 error
-	if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.AlterCollectionRequest) error); ok {
+	if rf, ok := ret.Get(1).(func(context.Context, *datapb.AlterCollectionRequest) error); ok {
 		r1 = rf(ctx, req)
 	} else {
 		r1 = ret.Error(1)
@@ -153,14 +153,14 @@ type DataCoord_BroadcastAlteredCollection_Call struct {
 
 // BroadcastAlteredCollection is a helper method to define mock.On call
 // - ctx context.Context
-// - req *milvuspb.AlterCollectionRequest
+// - req *datapb.AlterCollectionRequest
 func (_e *DataCoord_Expecter) BroadcastAlteredCollection(ctx interface{}, req interface{}) *DataCoord_BroadcastAlteredCollection_Call {
 	return &DataCoord_BroadcastAlteredCollection_Call{Call: _e.mock.On("BroadcastAlteredCollection", ctx, req)}
 }
 
-func (_c *DataCoord_BroadcastAlteredCollection_Call) Run(run func(ctx context.Context, req *milvuspb.AlterCollectionRequest)) *DataCoord_BroadcastAlteredCollection_Call {
+func (_c *DataCoord_BroadcastAlteredCollection_Call) Run(run func(ctx context.Context, req *datapb.AlterCollectionRequest)) *DataCoord_BroadcastAlteredCollection_Call {
 	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(context.Context), args[1].(*milvuspb.AlterCollectionRequest))
+		run(args[0].(context.Context), args[1].(*datapb.AlterCollectionRequest))
 	})
 	return _c
 }
@@ -1626,6 +1626,53 @@ func (_c *DataCoord_UnsetIsImportingState_Call) Return(_a0 *commonpb.Status, _a1
 	return _c
 }
 
+// UpdateChannelCheckpoint provides a mock function with given fields: ctx, req
+func (_m *DataCoord) UpdateChannelCheckpoint(ctx context.Context, req *datapb.UpdateChannelCheckpointRequest) (*commonpb.Status, error) {
+	ret := _m.Called(ctx, req)
+
+	var r0 *commonpb.Status
+	if rf, ok := ret.Get(0).(func(context.Context, *datapb.UpdateChannelCheckpointRequest) *commonpb.Status); ok {
+		r0 = rf(ctx, req)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(*commonpb.Status)
+		}
+	}
+
+	var r1 error
+	if rf, ok := ret.Get(1).(func(context.Context, *datapb.UpdateChannelCheckpointRequest) error); ok {
+		r1 = rf(ctx, req)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// DataCoord_UpdateChannelCheckpoint_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateChannelCheckpoint'
+type DataCoord_UpdateChannelCheckpoint_Call struct {
+	*mock.Call
+}
+
+// UpdateChannelCheckpoint is a helper method to define mock.On call
+// - ctx context.Context
+// - req *datapb.UpdateChannelCheckpointRequest
+func (_e *DataCoord_Expecter) UpdateChannelCheckpoint(ctx interface{}, req interface{}) *DataCoord_UpdateChannelCheckpoint_Call {
+	return &DataCoord_UpdateChannelCheckpoint_Call{Call: _e.mock.On("UpdateChannelCheckpoint", ctx, req)}
+}
+
+func (_c *DataCoord_UpdateChannelCheckpoint_Call) Run(run func(ctx context.Context, req *datapb.UpdateChannelCheckpointRequest)) *DataCoord_UpdateChannelCheckpoint_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context), args[1].(*datapb.UpdateChannelCheckpointRequest))
+	})
+	return _c
+}
+
+func (_c *DataCoord_UpdateChannelCheckpoint_Call) Return(_a0 *commonpb.Status, _a1 error) *DataCoord_UpdateChannelCheckpoint_Call {
+	_c.Call.Return(_a0, _a1)
+	return _c
+}
+
 // UpdateSegmentStatistics provides a mock function with given fields: ctx, req
 func (_m *DataCoord) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) (*commonpb.Status, error) {
 	ret := _m.Called(ctx, req)
diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go
index 19b0bfc21..f70a59c39 100644
--- a/internal/querycoordv2/server.go
+++ b/internal/querycoordv2/server.go
@@ -142,11 +142,7 @@ func (s *Server) Register() error {
 	return nil
 }
 
-func (s *Server) Init() error {
-	log.Info("QueryCoord start init",
-		zap.String("meta-root-path", Params.EtcdCfg.MetaRootPath),
-		zap.String("address", Params.QueryCoordCfg.Address))
-
+func (s *Server) initSession() error {
 	// Init QueryCoord session
 	s.session = sessionutil.NewSession(s.ctx, Params.EtcdCfg.MetaRootPath, s.etcdCli)
 	if s.session == nil {
@@ -157,8 +153,40 @@ func (s *Server) Init() error {
 	s.session.SetEnableActiveStandBy(s.enableActiveStandBy)
 	Params.QueryCoordCfg.SetNodeID(s.session.ServerID)
 	Params.SetLogger(s.session.ServerID)
+	return nil
+}
+
+func (s *Server) Init() error {
+	log.Info("QueryCoord start init",
+		zap.String("meta-root-path", Params.EtcdCfg.MetaRootPath),
+		zap.String("address", Params.QueryCoordCfg.Address))
+
 	s.factory.Init(Params)
 
+	if err := s.initSession(); err != nil {
+		return err
+	}
+
+	if s.enableActiveStandBy {
+		s.activateFunc = func() {
+			log.Info("QueryCoord switch from standby to active, activating")
+			if err := s.initQueryCoord(); err != nil {
+				log.Warn("QueryCoord init failed", zap.Error(err))
+			}
+			if err := s.startQueryCoord(); err != nil {
+				log.Warn("QueryCoord start failed", zap.Error(err))
+			}
+			s.UpdateStateCode(commonpb.StateCode_Healthy)
+			log.Info("QueryCoord startup success")
+		}
+		s.UpdateStateCode(commonpb.StateCode_StandBy)
+		log.Info("QueryCoord enter standby mode successfully")
+		return nil
+	}
+
+	return s.initQueryCoord()
+}
+
+func (s *Server) initQueryCoord() error {
 	// Init KV
 	etcdKV := etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath)
 	s.kv = etcdKV
@@ -302,6 +330,17 @@ func (s *Server) afterStart() {
 }
 
 func (s *Server) Start() error {
+	if !s.enableActiveStandBy {
+		if err := s.startQueryCoord(); err != nil {
+			return err
+		}
+		s.UpdateStateCode(commonpb.StateCode_Healthy)
+		log.Info("QueryCoord started")
+	}
+	return nil
+}
+
+func (s *Server) startQueryCoord() error {
 	log.Info("start watcher...")
 	sessions, revision, err := s.session.GetSessions(typeutil.QueryNodeRole)
 	if err != nil {
@@ -323,22 +362,8 @@ func (s *Server) Start() error {
 	if err != nil {
 		return err
 	}
-
-	if s.enableActiveStandBy {
-		s.activateFunc = func() {
-			log.Info("querycoord switch from standby to active, activating")
-			s.startServerLoop()
-			s.UpdateStateCode(commonpb.StateCode_Healthy)
-		}
-		s.UpdateStateCode(commonpb.StateCode_StandBy)
-	} else {
-		s.startServerLoop()
-		s.UpdateStateCode(commonpb.StateCode_Healthy)
-	}
-	log.Info("QueryCoord started")
-
+	s.startServerLoop()
 	s.afterStart()
-
 	return nil
 }
 
diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go
index 12dbcc33c..1c96f1282 100644
--- a/internal/querycoordv2/server_test.go
+++ b/internal/querycoordv2/server_test.go
@@ -22,12 +22,17 @@ import (
 	"testing"
 	"time"
 
+	"github.com/milvus-io/milvus/internal/util/commonpbutil"
+
+	"github.com/milvus-io/milvus-proto/go-api/milvuspb"
+
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
 
 	"github.com/milvus-io/milvus-proto/go-api/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/schemapb"
 	"github.com/milvus-io/milvus/internal/log"
+	coordMocks "github.com/milvus-io/milvus/internal/mocks"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
@@ -221,14 +226,41 @@ func (suite *ServerSuite) TestEnableActiveStandby() {
 	suite.server, err = newQueryCoord()
 	suite.NoError(err)
 
-	suite.hackServer()
-	err = suite.server.Start()
+	mockRootCoord := coordMocks.NewRootCoord(suite.T())
+	mockDataCoord := coordMocks.NewDataCoord(suite.T())
+
+	mockRootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{
+		Status: successStatus,
+		Schema: &schemapb.CollectionSchema{},
+	}, nil).Maybe()
+	for _, collection := range suite.collections {
+		if suite.loadTypes[collection] == querypb.LoadType_LoadCollection {
+			req := &milvuspb.ShowPartitionsRequest{
+				Base: commonpbutil.NewMsgBase(
+					commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions),
+				),
+				CollectionID: collection,
+			}
+			mockRootCoord.EXPECT().ShowPartitions(mock.Anything, req).Return(&milvuspb.ShowPartitionsResponse{
+				Status:       successStatus,
+				PartitionIDs: suite.partitions[collection],
+			}, nil).Maybe()
+		}
+		suite.expectGetRecoverInfoByMockDataCoord(collection, mockDataCoord)
+
+	}
+	err = suite.server.SetRootCoord(mockRootCoord)
 	suite.NoError(err)
+	err = suite.server.SetDataCoord(mockDataCoord)
+	suite.NoError(err)
+	//suite.hackServer()
 	states1, err := suite.server.GetComponentStates(context.Background())
 	suite.NoError(err)
 	suite.Equal(commonpb.StateCode_StandBy, states1.GetState().GetStateCode())
 	err = suite.server.Register()
 	suite.NoError(err)
+	err = suite.server.Start()
+	suite.NoError(err)
 
 	states2, err := suite.server.GetComponentStates(context.Background())
 	suite.NoError(err)
@@ -328,6 +360,43 @@ func (suite *ServerSuite) expectGetRecoverInfo(collection int64) {
 	}
 }
 
+func (suite *ServerSuite) expectGetRecoverInfoByMockDataCoord(collection int64, dataCoord *coordMocks.DataCoord) {
+	var (
+		vChannels      []*datapb.VchannelInfo
+		segmentBinlogs []*datapb.SegmentBinlogs
+	)
+
+	for partition, segments := range suite.segments[collection] {
+		segments := segments
+		getRecoveryInfoRequest := &datapb.GetRecoveryInfoRequest{
+			Base: commonpbutil.NewMsgBase(
+				commonpbutil.WithMsgType(commonpb.MsgType_GetRecoveryInfo),
+			),
+			CollectionID: collection,
+			PartitionID:  partition,
+		}
+		vChannels = []*datapb.VchannelInfo{}
+		for _, channel := range suite.channels[collection] {
+			vChannels = append(vChannels, &datapb.VchannelInfo{
+				CollectionID: collection,
+				ChannelName:  channel,
+			})
+		}
+		segmentBinlogs = []*datapb.SegmentBinlogs{}
+		for _, segment := range segments {
+			segmentBinlogs = append(segmentBinlogs, &datapb.SegmentBinlogs{
+				SegmentID:     segment,
+				InsertChannel: suite.channels[collection][segment%2],
+			})
+		}
+		dataCoord.EXPECT().GetRecoveryInfo(mock.Anything, getRecoveryInfoRequest).Maybe().Return(&datapb.GetRecoveryInfoResponse{
+			Status:   successStatus,
+			Channels: vChannels,
+			Binlogs:  segmentBinlogs,
+		}, nil)
+	}
+}
+
 func (suite *ServerSuite) updateCollectionStatus(collectionID int64, status querypb.LoadStatus) {
 	collection := suite.server.meta.GetCollection(collectionID)
 	if collection != nil {
diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go
index 4eea57009..86a3300c9 100644
--- a/internal/rootcoord/root_coord.go
+++ b/internal/rootcoord/root_coord.go
@@ -425,10 +425,6 @@ func (c *Core) initImportManager() error {
 }
 
 func (c *Core) initInternal() error {
-	if err := c.initSession(); err != nil {
-		return err
-	}
-
 	c.initKVCreator()
 
 	if err := c.initMetaTable(); err != nil {
@@ -445,8 +441,6 @@ func (c *Core) initInternal() error {
 
 	c.scheduler = newScheduler(c.ctx, c.idAllocator, c.tsoAllocator)
 
-	c.factory.Init(&Params)
-
 	chanMap := c.meta.ListCollectionPhysicalChannels()
 	c.chanTimeTick = newTimeTickSync(c.ctx, c.session.ServerID, c.factory, chanMap)
 	c.chanTimeTick.addSession(c.session)
@@ -489,9 +483,30 @@ func (c *Core) initInternal() error {
 // Init initialize routine
 func (c *Core) Init() error {
 	var initError error
-	c.initOnce.Do(func() {
-		initError = c.initInternal()
-	})
+	c.factory.Init(&Params)
+	if err := c.initSession(); err != nil {
+		return err
+	}
+	if c.enableActiveStandBy {
+		c.activateFunc = func() {
+			log.Info("RootCoord switch from standby to active, activating")
+			if err := c.initInternal(); err != nil {
+				log.Warn("RootCoord init failed", zap.Error(err))
+			}
+			if err := c.startInternal(); err != nil {
+				log.Warn("RootCoord start failed", zap.Error(err))
+			}
+			c.UpdateStateCode(commonpb.StateCode_Healthy)
+			log.Info("RootCoord startup success")
+		}
+		c.UpdateStateCode(commonpb.StateCode_StandBy)
+		log.Info("RootCoord enter standby mode successfully")
+	} else {
+		c.initOnce.Do(func() {
+			initError = c.initInternal()
+		})
+	}
+
 	return initError
 }
 
@@ -657,9 +672,12 @@ func (c *Core) startServerLoop() {
 // Start starts RootCoord.
 func (c *Core) Start() error {
 	var err error
-	c.startOnce.Do(func() {
-		err = c.startInternal()
-	})
+	if !c.enableActiveStandBy {
+		c.startOnce.Do(func() {
+			err = c.startInternal()
+		})
+	}
+
 	return err
 }
--
GitLab
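
Editor's note (illustrative sketches follow; they are not part of the patch). The common thread in the four internal/distributed/*/service.go hunks above is call order: every coordinator server now invokes Register() before Start(), so a second instance parks itself in standby during registration and never brings up server loops while another coordinator is active. A minimal, self-contained Go sketch of that ordering; Coordinator, runServer, and fakeCoord are hypothetical names, not Milvus APIs:

package main

import "log"

// Coordinator captures the two lifecycle calls whose order this patch changes.
type Coordinator interface {
	Register() error // join the etcd session; in active-standby mode, campaign for leadership
	Start() error    // bring up server loops; after this patch, a no-op while in standby
}

// runServer mirrors the new start() bodies: register first, so a second
// instance is parked in standby before any server loop can run, then start.
func runServer(c Coordinator) error {
	if err := c.Register(); err != nil {
		log.Printf("register service failed: %v", err)
		return err
	}
	if err := c.Start(); err != nil {
		log.Printf("start failed: %v", err)
		return err
	}
	return nil
}

type fakeCoord struct{}

func (fakeCoord) Register() error { return nil }
func (fakeCoord) Start() error    { return nil }

func main() {
	_ = runServer(fakeCoord{})
}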
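
The Init/Start restructuring repeated across datacoord, indexcoord, querycoordv2, and rootcoord follows one shape: Init records an activateFunc and parks the node in StandBy when active-standby is enabled, Start only activates non-standby nodes, and the session layer invokes activateFunc when the node wins the election. A compressed, runnable sketch of that state machine, with hypothetical names (coord, initHeavy, startLoops) standing in for the per-coordinator fields and methods:

package main

import (
	"fmt"
	"log"
	"sync/atomic"
)

type stateCode int32

const (
	initializing stateCode = iota
	standBy
	healthy
)

// coord is a toy stand-in for DataCoord/IndexCoord/QueryCoord/RootCoord.
type coord struct {
	state               atomic.Int32
	enableActiveStandBy bool
	activateFunc        func() // invoked by the session layer on promotion
}

func (c *coord) initHeavy() error { return nil } // etcd KV, meta table, schedulers...
func (c *coord) startLoops()      {}             // server loops, GC, watchers...

// Init mirrors the restructured Init methods: a standby node only records
// how to activate and parks in StandBy; an active node initializes now.
func (c *coord) Init() error {
	c.state.Store(int32(initializing))
	if c.enableActiveStandBy {
		c.activateFunc = func() {
			log.Println("switch from standby to active, activating")
			if err := c.initHeavy(); err != nil {
				log.Println("init failed:", err) // the patch leaves "TODO: panic?" here
			}
			c.startLoops()
			c.state.Store(int32(healthy))
		}
		c.state.Store(int32(standBy))
		return nil
	}
	return c.initHeavy()
}

// Start only activates nodes that are not in standby; a standby node stays
// parked until the session layer calls activateFunc.
func (c *coord) Start() error {
	if !c.enableActiveStandBy {
		c.startLoops()
		c.state.Store(int32(healthy))
	}
	return nil
}

func main() {
	c := &coord{enableActiveStandBy: true}
	_ = c.Init()
	_ = c.Start()
	c.activateFunc() // what the session layer does when this node wins the election
	fmt.Println("state:", c.state.Load())
}

The property this buys is that a standby node performs no heavy initialization until promotion, so two fully initialized coordinators can never coexist.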
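
Finally, the new mock-datacoord Makefile target regenerates internal/mocks/mock_datacoord.go with mockery's --with-expecter mode, which is what lets the tests above stub coordinator RPCs declaratively. A small usage sketch, under the assumption that it lives beside the generated mock inside the milvus module (the imports are repo-internal, and the test name is hypothetical):

package mocks_test

import (
	"context"
	"testing"

	"github.com/stretchr/testify/mock"

	"github.com/milvus-io/milvus-proto/go-api/commonpb"
	coordMocks "github.com/milvus-io/milvus/internal/mocks"
	"github.com/milvus-io/milvus/internal/proto/datapb"
)

// TestUpdateChannelCheckpointMock exercises the expecter API generated by
// `make mock-datacoord`; the expectation is optional (Maybe), mirroring the
// style used in TestEnableActiveStandby above.
func TestUpdateChannelCheckpointMock(t *testing.T) {
	dc := coordMocks.NewDataCoord(t)
	dc.EXPECT().
		UpdateChannelCheckpoint(mock.Anything, mock.Anything).
		Return(&commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil).
		Maybe()

	status, err := dc.UpdateChannelCheckpoint(context.Background(), &datapb.UpdateChannelCheckpointRequest{})
	if err != nil || status.GetErrorCode() != commonpb.ErrorCode_Success {
		t.Fatalf("unexpected result: %v, %v", status, err)
	}
}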