未验证 提交 03ce5c26 编写于 作者: cai.zhang 提交者: GitHub

Register after start to prevent two coordinators from existing at the same time (#21641)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
上级 bd50e1e0
......@@ -299,5 +299,8 @@ mock-rootcoord:
mock-tnx-kv:
mockery --name=TxnKV --dir=$(PWD)/internal/kv --output=$(PWD)/internal/kv/mocks --filename=TxnKV.go --with-expecter
mock-datacoord:
mockery --name=DataCoord --dir=$(PWD)/internal/types --output=$(PWD)/internal/mocks --filename=mock_datacoord.go --with-expecter
ci-ut: build-cpp-with-coverage generated-proto-go-without-cpp codecov-cpp codecov-go
......@@ -254,7 +254,30 @@ func (s *Server) Init() error {
var err error
s.stateCode.Store(commonpb.StateCode_Initializing)
s.factory.Init(&Params)
if err = s.initSession(); err != nil {
return err
}
if s.enableActiveStandBy {
s.activateFunc = func() {
log.Info("DataCoord switch from standby to active, activating")
if err := s.initDataCoord(); err != nil {
log.Warn("DataCoord init failed", zap.Error(err))
// TODO: panic if error occurred?
}
s.startDataCoord()
s.stateCode.Store(commonpb.StateCode_Healthy)
log.Info("DataCoord startup success")
}
s.stateCode.Store(commonpb.StateCode_StandBy)
log.Info("DataCoord enter standby mode successfully")
return nil
}
return s.initDataCoord()
}
func (s *Server) initDataCoord() error {
var err error
if err = s.initRootCoordClient(); err != nil {
return err
}
......@@ -276,10 +299,6 @@ func (s *Server) Init() error {
s.allocator = newRootCoordAllocator(s.rootCoordClient)
if err = s.initSession(); err != nil {
return err
}
if err = s.initServiceDiscovery(); err != nil {
return err
}
......@@ -292,6 +311,8 @@ func (s *Server) Init() error {
s.initGarbageCollection(storageCli)
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
return nil
}
......@@ -303,38 +324,31 @@ func (s *Server) Init() error {
// datanodes etcd watch, etcd alive check and flush completed status check
// 4. set server state to Healthy
func (s *Server) Start() error {
if Params.DataCoordCfg.EnableCompaction {
s.compactionHandler.start()
s.compactionTrigger.start()
}
if s.enableActiveStandBy {
s.activateFunc = func() {
// todo complete the activateFunc
log.Info("datacoord switch from standby to active, activating")
s.startServerLoop()
s.stateCode.Store(commonpb.StateCode_Healthy)
logutil.Logger(s.ctx).Info("startup success")
}
s.stateCode.Store(commonpb.StateCode_StandBy)
logutil.Logger(s.ctx).Info("DataCoord enter standby mode successfully")
} else {
s.startServerLoop()
if !s.enableActiveStandBy {
s.startDataCoord()
s.stateCode.Store(commonpb.StateCode_Healthy)
logutil.Logger(s.ctx).Info("DataCoord startup successfully")
log.Info("DataCoord startup successfully")
}
Params.DataCoordCfg.CreatedTime = time.Now()
Params.DataCoordCfg.UpdatedTime = time.Now()
return nil
}
func (s *Server) startDataCoord() {
if Params.DataCoordCfg.EnableCompaction {
s.compactionHandler.start()
s.compactionTrigger.start()
}
s.startServerLoop()
// DataCoord (re)starts successfully and starts to collection segment stats
// data from all DataNode.
// This will prevent DataCoord from missing out any important segment stats
// data while offline.
log.Info("DataCoord (re)starts successfully and re-collecting segment stats from DataNodes")
s.reCollectSegmentStats(s.ctx)
return nil
}
func (s *Server) initCluster() error {
......@@ -469,7 +483,6 @@ func (s *Server) initMeta(chunkManager storage.ChunkManager) error {
}
func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
s.serverLoopWg.Add(3)
s.startDataNodeTtLoop(s.serverLoopCtx)
s.startWatchService(s.serverLoopCtx)
......
......@@ -192,14 +192,15 @@ func (s *Server) startGrpcLoop(grpcPort int) {
}
func (s *Server) start() error {
err := s.dataCoord.Start()
err := s.dataCoord.Register()
if err != nil {
log.Error("DataCoord start failed", zap.Error(err))
log.Warn("DataCoord register service failed", zap.Error(err))
return err
}
err = s.dataCoord.Register()
err = s.dataCoord.Start()
if err != nil {
log.Warn("DataCoord register service failed", zap.Error(err))
log.Error("DataCoord start failed", zap.Error(err))
return err
}
return nil
......
......@@ -188,15 +188,16 @@ func (s *Server) init() error {
// start starts IndexCoord's grpc service.
func (s *Server) start() error {
if err := s.indexcoord.Start(); err != nil {
return err
}
log.Info("indexCoord started")
if err := s.indexcoord.Register(); err != nil {
log.Error("IndexCoord", zap.Any("register session error", err))
return err
}
log.Info("IndexCoord registers service successfully")
if err := s.indexcoord.Start(); err != nil {
return err
}
log.Info("indexCoord started")
return nil
}
......
......@@ -284,11 +284,11 @@ func (s *Server) startGrpcLoop(grpcPort int) {
// start starts QueryCoord's grpc service.
func (s *Server) start() error {
err := s.queryCoord.Start()
err := s.queryCoord.Register()
if err != nil {
return err
}
return s.queryCoord.Register()
return s.queryCoord.Start()
}
// Stop stops QueryCoord's grpc service.
......
......@@ -267,15 +267,16 @@ func (s *Server) startGrpcLoop(port int) {
func (s *Server) start() error {
log.Info("RootCoord Core start ...")
if err := s.rootCoord.Start(); err != nil {
log.Error(err.Error())
return err
}
if err := s.rootCoord.Register(); err != nil {
log.Error("RootCoord registers service failed", zap.Error(err))
return err
}
if err := s.rootCoord.Start(); err != nil {
log.Error("RootCoord start service failed", zap.Error(err))
return err
}
return nil
}
......
......@@ -171,110 +171,134 @@ func (i *IndexCoord) initSession() error {
// Init initializes the IndexCoord component.
func (i *IndexCoord) Init() error {
i.UpdateStateCode(commonpb.StateCode_Initializing)
log.Info("IndexCoord init", zap.Any("stateCode", i.stateCode.Load().(commonpb.StateCode)))
var initErr error
Params.InitOnce()
i.initOnce.Do(func() {
i.UpdateStateCode(commonpb.StateCode_Initializing)
log.Info("IndexCoord init", zap.Any("stateCode", i.stateCode.Load().(commonpb.StateCode)))
i.factory.Init(&Params)
i.factory.Init(&Params)
initErr = i.initSession()
if initErr != nil {
log.Error(initErr.Error())
return initErr
}
err := i.initSession()
if err != nil {
log.Error(err.Error())
initErr = err
return
if i.enableActiveStandBy {
i.activateFunc = func() {
log.Info("IndexCoord switch from standby to active, activating")
if err := i.initIndexCoord(); err != nil {
log.Warn("IndexCoord init failed", zap.Error(err))
// TODO: panic if error occurred?
}
i.startIndexCoord()
i.stateCode.Store(commonpb.StateCode_Healthy)
log.Info("IndexCoord startup success")
}
i.stateCode.Store(commonpb.StateCode_StandBy)
log.Info("IndexCoord enter standby mode successfully")
} else {
i.initOnce.Do(func() {
initErr = i.initIndexCoord()
})
}
connectEtcdFn := func() error {
i.etcdKV = etcdkv.NewEtcdKV(i.etcdCli, Params.EtcdCfg.MetaRootPath)
i.metaTable, err = NewMetaTable(i.etcdKV)
return err
}
log.Info("IndexCoord try to connect etcd")
err = retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(100))
if err != nil {
log.Error("IndexCoord try to connect etcd failed", zap.Error(err))
initErr = err
return
}
return initErr
}
func (i *IndexCoord) initIndexCoord() error {
var err error
connectEtcdFn := func() error {
i.etcdKV = etcdkv.NewEtcdKV(i.etcdCli, Params.EtcdCfg.MetaRootPath)
i.metaTable, err = NewMetaTable(i.etcdKV)
return err
}
log.Info("IndexCoord try to connect etcd")
err = retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(100))
if err != nil {
log.Error("IndexCoord try to connect etcd failed", zap.Error(err))
return err
}
log.Info("IndexCoord try to connect etcd success")
i.nodeManager = NewNodeManager(i.loopCtx)
log.Info("IndexCoord try to connect etcd success")
i.nodeManager = NewNodeManager(i.loopCtx)
sessions, revision, err := i.session.GetSessions(typeutil.IndexNodeRole)
log.Info("IndexCoord", zap.Int("session number", len(sessions)), zap.Int64("revision", revision))
if err != nil {
log.Error("IndexCoord Get IndexNode Sessions error", zap.Error(err))
initErr = err
return
sessions, revision, err := i.session.GetSessions(typeutil.IndexNodeRole)
log.Info("IndexCoord", zap.Int("session number", len(sessions)), zap.Int64("revision", revision))
if err != nil {
log.Error("IndexCoord Get IndexNode Sessions error", zap.Error(err))
return err
}
log.Info("IndexCoord get node sessions from etcd", zap.Bool("bind mode", Params.IndexCoordCfg.BindIndexNodeMode),
zap.String("node address", Params.IndexCoordCfg.IndexNodeAddress))
aliveNodeID := make([]UniqueID, 0)
if Params.IndexCoordCfg.BindIndexNodeMode {
if err = i.nodeManager.AddNode(Params.IndexCoordCfg.IndexNodeID, Params.IndexCoordCfg.IndexNodeAddress); err != nil {
log.Error("IndexCoord add node fail", zap.Int64("ServerID", Params.IndexCoordCfg.IndexNodeID),
zap.String("address", Params.IndexCoordCfg.IndexNodeAddress), zap.Error(err))
return err
}
log.Info("IndexCoord get node sessions from etcd", zap.Bool("bind mode", Params.IndexCoordCfg.BindIndexNodeMode),
zap.String("node address", Params.IndexCoordCfg.IndexNodeAddress))
aliveNodeID := make([]UniqueID, 0)
if Params.IndexCoordCfg.BindIndexNodeMode {
if err = i.nodeManager.AddNode(Params.IndexCoordCfg.IndexNodeID, Params.IndexCoordCfg.IndexNodeAddress); err != nil {
log.Error("IndexCoord add node fail", zap.Int64("ServerID", Params.IndexCoordCfg.IndexNodeID),
zap.String("address", Params.IndexCoordCfg.IndexNodeAddress), zap.Error(err))
initErr = err
return
}
log.Info("IndexCoord add node success", zap.String("IndexNode address", Params.IndexCoordCfg.IndexNodeAddress),
zap.Int64("nodeID", Params.IndexCoordCfg.IndexNodeID))
aliveNodeID = append(aliveNodeID, Params.IndexCoordCfg.IndexNodeID)
} else {
for _, session := range sessions {
session := session
if err := i.nodeManager.AddNode(session.ServerID, session.Address); err != nil {
log.Error("IndexCoord", zap.Int64("ServerID", session.ServerID),
zap.Error(err))
continue
}
aliveNodeID = append(aliveNodeID, session.ServerID)
log.Info("IndexCoord add node success", zap.String("IndexNode address", Params.IndexCoordCfg.IndexNodeAddress),
zap.Int64("nodeID", Params.IndexCoordCfg.IndexNodeID))
aliveNodeID = append(aliveNodeID, Params.IndexCoordCfg.IndexNodeID)
} else {
for _, session := range sessions {
session := session
if err := i.nodeManager.AddNode(session.ServerID, session.Address); err != nil {
log.Error("IndexCoord", zap.Int64("ServerID", session.ServerID),
zap.Error(err))
continue
}
aliveNodeID = append(aliveNodeID, session.ServerID)
}
log.Info("IndexCoord", zap.Int("IndexNode number", len(i.nodeManager.GetAllClients())))
i.indexBuilder = newIndexBuilder(i.loopCtx, i, i.metaTable, aliveNodeID)
// TODO silverxia add Rewatch logic
i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1, nil)
}
log.Info("IndexCoord", zap.Int("IndexNode number", len(i.nodeManager.GetAllClients())))
i.indexBuilder = newIndexBuilder(i.loopCtx, i, i.metaTable, aliveNodeID)
chunkManager, err := i.factory.NewPersistentStorageChunkManager(i.loopCtx)
if err != nil {
log.Error("IndexCoord new minio chunkManager failed", zap.Error(err))
initErr = err
return
}
log.Info("IndexCoord new minio chunkManager success")
i.chunkManager = chunkManager
// TODO silverxia add Rewatch logic
i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, revision+1, nil)
i.garbageCollector = newGarbageCollector(i.loopCtx, i.metaTable, i.chunkManager, i)
i.handoff = newHandoff(i.loopCtx, i.metaTable, i.etcdKV, i)
i.flushedSegmentWatcher, err = newFlushSegmentWatcher(i.loopCtx, i.etcdKV, i.metaTable, i.indexBuilder, i.handoff, i)
if err != nil {
initErr = err
return
}
chunkManager, err := i.factory.NewPersistentStorageChunkManager(i.loopCtx)
if err != nil {
log.Error("IndexCoord new minio chunkManager failed", zap.Error(err))
return err
}
log.Info("IndexCoord new minio chunkManager success")
i.chunkManager = chunkManager
i.sched, err = NewTaskScheduler(i.loopCtx, i.rootCoordClient, i.chunkManager, i.metaTable)
if err != nil {
log.Error("IndexCoord new task scheduler failed", zap.Error(err))
initErr = err
return
}
log.Info("IndexCoord new task scheduler success")
i.garbageCollector = newGarbageCollector(i.loopCtx, i.metaTable, i.chunkManager, i)
i.handoff = newHandoff(i.loopCtx, i.metaTable, i.etcdKV, i)
i.flushedSegmentWatcher, err = newFlushSegmentWatcher(i.loopCtx, i.etcdKV, i.metaTable, i.indexBuilder, i.handoff, i)
if err != nil {
return err
}
i.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
})
i.sched, err = NewTaskScheduler(i.loopCtx, i.rootCoordClient, i.chunkManager, i.metaTable)
if err != nil {
log.Error("IndexCoord new task scheduler failed", zap.Error(err))
return err
}
log.Info("IndexCoord new task scheduler success")
log.Info("IndexCoord init finished", zap.Error(initErr))
i.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
log.Info("IndexCoord init finished")
return initErr
return nil
}
// Start starts the IndexCoord component.
func (i *IndexCoord) Start() error {
var startErr error
if !i.enableActiveStandBy {
i.startIndexCoord()
i.UpdateStateCode(commonpb.StateCode_Healthy)
log.Info("IndexCoord start successfully", zap.Any("state", i.stateCode.Load()))
}
Params.IndexCoordCfg.CreatedTime = time.Now()
Params.IndexCoordCfg.UpdatedTime = time.Now()
return nil
}
func (i *IndexCoord) startIndexCoord() {
i.startOnce.Do(func() {
i.loopWg.Add(1)
go i.watchNodeLoop()
......@@ -282,7 +306,7 @@ func (i *IndexCoord) Start() error {
i.loopWg.Add(1)
go i.watchFlushedSegmentLoop()
startErr = i.sched.Start()
i.sched.Start()
i.indexBuilder.Start()
i.garbageCollector.Start()
......@@ -295,24 +319,6 @@ func (i *IndexCoord) Start() error {
for _, cb := range i.startCallbacks {
cb()
}
Params.IndexCoordCfg.CreatedTime = time.Now()
Params.IndexCoordCfg.UpdatedTime = time.Now()
if i.enableActiveStandBy {
i.activateFunc = func() {
log.Info("IndexCoord switch from standby to active, reload the KV")
i.metaTable.reloadFromKV()
i.UpdateStateCode(commonpb.StateCode_Healthy)
}
i.UpdateStateCode(commonpb.StateCode_StandBy)
log.Info("IndexCoord start successfully", zap.Any("state", i.stateCode.Load()))
} else {
i.UpdateStateCode(commonpb.StateCode_Healthy)
log.Info("IndexCoord start successfully", zap.Any("state", i.stateCode.Load()))
}
return startErr
}
// Stop stops the IndexCoord component.
......
......@@ -186,12 +186,6 @@ func testIndexCoord(t *testing.T) {
err = ic.Init()
assert.NoError(t, err)
mockKv := NewMockEtcdKVWithReal(ic.etcdKV)
ic.metaTable.catalog = &indexcoord.Catalog{
Txn: mockKv,
}
assert.NoError(t, err)
err = ic.Register()
assert.NoError(t, err)
......@@ -200,6 +194,12 @@ func testIndexCoord(t *testing.T) {
ic.UpdateStateCode(commonpb.StateCode_Healthy)
mockKv := NewMockEtcdKVWithReal(ic.etcdKV)
ic.metaTable.catalog = &indexcoord.Catalog{
Txn: mockKv,
}
assert.NoError(t, err)
ic.nodeManager.setClient(1, inm0)
// Test IndexCoord function
......
......@@ -286,12 +286,9 @@ func (sched *TaskScheduler) indexAddLoop() {
}
// Start stats the task scheduler of indexing tasks.
func (sched *TaskScheduler) Start() error {
func (sched *TaskScheduler) Start() {
sched.wg.Add(1)
go sched.indexAddLoop()
return nil
}
// Close closes the task scheduler of indexing tasks.
......
// Code generated by mockery v2.14.0. DO NOT EDIT.
// Code generated by mockery v2.16.0. DO NOT EDIT.
package mocks
......@@ -124,11 +124,11 @@ func (_c *DataCoord_AssignSegmentID_Call) Return(_a0 *datapb.AssignSegmentIDResp
}
// BroadcastAlteredCollection provides a mock function with given fields: ctx, req
func (_m *DataCoord) BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
func (_m *DataCoord) BroadcastAlteredCollection(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error) {
ret := _m.Called(ctx, req)
var r0 *commonpb.Status
if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.AlterCollectionRequest) *commonpb.Status); ok {
if rf, ok := ret.Get(0).(func(context.Context, *datapb.AlterCollectionRequest) *commonpb.Status); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
......@@ -137,7 +137,7 @@ func (_m *DataCoord) BroadcastAlteredCollection(ctx context.Context, req *milvus
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.AlterCollectionRequest) error); ok {
if rf, ok := ret.Get(1).(func(context.Context, *datapb.AlterCollectionRequest) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
......@@ -153,14 +153,14 @@ type DataCoord_BroadcastAlteredCollection_Call struct {
// BroadcastAlteredCollection is a helper method to define mock.On call
// - ctx context.Context
// - req *milvuspb.AlterCollectionRequest
// - req *datapb.AlterCollectionRequest
func (_e *DataCoord_Expecter) BroadcastAlteredCollection(ctx interface{}, req interface{}) *DataCoord_BroadcastAlteredCollection_Call {
return &DataCoord_BroadcastAlteredCollection_Call{Call: _e.mock.On("BroadcastAlteredCollection", ctx, req)}
}
func (_c *DataCoord_BroadcastAlteredCollection_Call) Run(run func(ctx context.Context, req *milvuspb.AlterCollectionRequest)) *DataCoord_BroadcastAlteredCollection_Call {
func (_c *DataCoord_BroadcastAlteredCollection_Call) Run(run func(ctx context.Context, req *datapb.AlterCollectionRequest)) *DataCoord_BroadcastAlteredCollection_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*milvuspb.AlterCollectionRequest))
run(args[0].(context.Context), args[1].(*datapb.AlterCollectionRequest))
})
return _c
}
......@@ -1626,6 +1626,53 @@ func (_c *DataCoord_UnsetIsImportingState_Call) Return(_a0 *commonpb.Status, _a1
return _c
}
// UpdateChannelCheckpoint provides a mock function with given fields: ctx, req
func (_m *DataCoord) UpdateChannelCheckpoint(ctx context.Context, req *datapb.UpdateChannelCheckpointRequest) (*commonpb.Status, error) {
ret := _m.Called(ctx, req)
var r0 *commonpb.Status
if rf, ok := ret.Get(0).(func(context.Context, *datapb.UpdateChannelCheckpointRequest) *commonpb.Status); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*commonpb.Status)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *datapb.UpdateChannelCheckpointRequest) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// DataCoord_UpdateChannelCheckpoint_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateChannelCheckpoint'
type DataCoord_UpdateChannelCheckpoint_Call struct {
*mock.Call
}
// UpdateChannelCheckpoint is a helper method to define mock.On call
// - ctx context.Context
// - req *datapb.UpdateChannelCheckpointRequest
func (_e *DataCoord_Expecter) UpdateChannelCheckpoint(ctx interface{}, req interface{}) *DataCoord_UpdateChannelCheckpoint_Call {
return &DataCoord_UpdateChannelCheckpoint_Call{Call: _e.mock.On("UpdateChannelCheckpoint", ctx, req)}
}
func (_c *DataCoord_UpdateChannelCheckpoint_Call) Run(run func(ctx context.Context, req *datapb.UpdateChannelCheckpointRequest)) *DataCoord_UpdateChannelCheckpoint_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*datapb.UpdateChannelCheckpointRequest))
})
return _c
}
func (_c *DataCoord_UpdateChannelCheckpoint_Call) Return(_a0 *commonpb.Status, _a1 error) *DataCoord_UpdateChannelCheckpoint_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// UpdateSegmentStatistics provides a mock function with given fields: ctx, req
func (_m *DataCoord) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) (*commonpb.Status, error) {
ret := _m.Called(ctx, req)
......
......@@ -142,11 +142,7 @@ func (s *Server) Register() error {
return nil
}
func (s *Server) Init() error {
log.Info("QueryCoord start init",
zap.String("meta-root-path", Params.EtcdCfg.MetaRootPath),
zap.String("address", Params.QueryCoordCfg.Address))
func (s *Server) initSession() error {
// Init QueryCoord session
s.session = sessionutil.NewSession(s.ctx, Params.EtcdCfg.MetaRootPath, s.etcdCli)
if s.session == nil {
......@@ -157,8 +153,40 @@ func (s *Server) Init() error {
s.session.SetEnableActiveStandBy(s.enableActiveStandBy)
Params.QueryCoordCfg.SetNodeID(s.session.ServerID)
Params.SetLogger(s.session.ServerID)
return nil
}
func (s *Server) Init() error {
log.Info("QueryCoord start init",
zap.String("meta-root-path", Params.EtcdCfg.MetaRootPath),
zap.String("address", Params.QueryCoordCfg.Address))
s.factory.Init(Params)
if err := s.initSession(); err != nil {
return err
}
if s.enableActiveStandBy {
s.activateFunc = func() {
log.Info("QueryCoord switch from standby to active, activating")
if err := s.initQueryCoord(); err != nil {
log.Warn("QueryCoord init failed", zap.Error(err))
}
if err := s.startQueryCoord(); err != nil {
log.Warn("QueryCoord init failed", zap.Error(err))
}
s.UpdateStateCode(commonpb.StateCode_Healthy)
log.Info("QueryCoord startup success")
}
s.UpdateStateCode(commonpb.StateCode_StandBy)
log.Info("QueryCoord enter standby mode successfully")
return nil
}
return s.initQueryCoord()
}
func (s *Server) initQueryCoord() error {
// Init KV
etcdKV := etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath)
s.kv = etcdKV
......@@ -302,6 +330,17 @@ func (s *Server) afterStart() {
}
func (s *Server) Start() error {
if !s.enableActiveStandBy {
if err := s.startQueryCoord(); err != nil {
return err
}
s.UpdateStateCode(commonpb.StateCode_Healthy)
log.Info("QueryCoord started")
}
return nil
}
func (s *Server) startQueryCoord() error {
log.Info("start watcher...")
sessions, revision, err := s.session.GetSessions(typeutil.QueryNodeRole)
if err != nil {
......@@ -323,22 +362,8 @@ func (s *Server) Start() error {
if err != nil {
return err
}
if s.enableActiveStandBy {
s.activateFunc = func() {
log.Info("querycoord switch from standby to active, activating")
s.startServerLoop()
s.UpdateStateCode(commonpb.StateCode_Healthy)
}
s.UpdateStateCode(commonpb.StateCode_StandBy)
} else {
s.startServerLoop()
s.UpdateStateCode(commonpb.StateCode_Healthy)
}
log.Info("QueryCoord started")
s.startServerLoop()
s.afterStart()
return nil
}
......
......@@ -22,12 +22,17 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/log"
coordMocks "github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
......@@ -221,14 +226,41 @@ func (suite *ServerSuite) TestEnableActiveStandby() {
suite.server, err = newQueryCoord()
suite.NoError(err)
suite.hackServer()
err = suite.server.Start()
mockRootCoord := coordMocks.NewRootCoord(suite.T())
mockDataCoord := coordMocks.NewDataCoord(suite.T())
mockRootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{
Status: successStatus,
Schema: &schemapb.CollectionSchema{},
}, nil).Maybe()
for _, collection := range suite.collections {
if suite.loadTypes[collection] == querypb.LoadType_LoadCollection {
req := &milvuspb.ShowPartitionsRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions),
),
CollectionID: collection,
}
mockRootCoord.EXPECT().ShowPartitions(mock.Anything, req).Return(&milvuspb.ShowPartitionsResponse{
Status: successStatus,
PartitionIDs: suite.partitions[collection],
}, nil).Maybe()
}
suite.expectGetRecoverInfoByMockDataCoord(collection, mockDataCoord)
}
err = suite.server.SetRootCoord(mockRootCoord)
suite.NoError(err)
err = suite.server.SetDataCoord(mockDataCoord)
suite.NoError(err)
//suite.hackServer()
states1, err := suite.server.GetComponentStates(context.Background())
suite.NoError(err)
suite.Equal(commonpb.StateCode_StandBy, states1.GetState().GetStateCode())
err = suite.server.Register()
suite.NoError(err)
err = suite.server.Start()
suite.NoError(err)
states2, err := suite.server.GetComponentStates(context.Background())
suite.NoError(err)
......@@ -328,6 +360,43 @@ func (suite *ServerSuite) expectGetRecoverInfo(collection int64) {
}
}
func (suite *ServerSuite) expectGetRecoverInfoByMockDataCoord(collection int64, dataCoord *coordMocks.DataCoord) {
var (
vChannels []*datapb.VchannelInfo
segmentBinlogs []*datapb.SegmentBinlogs
)
for partition, segments := range suite.segments[collection] {
segments := segments
getRecoveryInfoRequest := &datapb.GetRecoveryInfoRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_GetRecoveryInfo),
),
CollectionID: collection,
PartitionID: partition,
}
vChannels = []*datapb.VchannelInfo{}
for _, channel := range suite.channels[collection] {
vChannels = append(vChannels, &datapb.VchannelInfo{
CollectionID: collection,
ChannelName: channel,
})
}
segmentBinlogs = []*datapb.SegmentBinlogs{}
for _, segment := range segments {
segmentBinlogs = append(segmentBinlogs, &datapb.SegmentBinlogs{
SegmentID: segment,
InsertChannel: suite.channels[collection][segment%2],
})
}
dataCoord.EXPECT().GetRecoveryInfo(mock.Anything, getRecoveryInfoRequest).Maybe().Return(&datapb.GetRecoveryInfoResponse{
Status: successStatus,
Channels: vChannels,
Binlogs: segmentBinlogs,
}, nil)
}
}
func (suite *ServerSuite) updateCollectionStatus(collectionID int64, status querypb.LoadStatus) {
collection := suite.server.meta.GetCollection(collectionID)
if collection != nil {
......
......@@ -425,10 +425,6 @@ func (c *Core) initImportManager() error {
}
func (c *Core) initInternal() error {
if err := c.initSession(); err != nil {
return err
}
c.initKVCreator()
if err := c.initMetaTable(); err != nil {
......@@ -445,8 +441,6 @@ func (c *Core) initInternal() error {
c.scheduler = newScheduler(c.ctx, c.idAllocator, c.tsoAllocator)
c.factory.Init(&Params)
chanMap := c.meta.ListCollectionPhysicalChannels()
c.chanTimeTick = newTimeTickSync(c.ctx, c.session.ServerID, c.factory, chanMap)
c.chanTimeTick.addSession(c.session)
......@@ -489,9 +483,30 @@ func (c *Core) initInternal() error {
// Init initialize routine
func (c *Core) Init() error {
var initError error
c.initOnce.Do(func() {
initError = c.initInternal()
})
c.factory.Init(&Params)
if err := c.initSession(); err != nil {
return err
}
if c.enableActiveStandBy {
c.activateFunc = func() {
log.Info("RootCoord switch from standby to active, activating")
if err := c.initInternal(); err != nil {
log.Warn("RootCoord init failed", zap.Error(err))
}
if err := c.startInternal(); err != nil {
log.Warn("RootCoord start failed", zap.Error(err))
}
c.UpdateStateCode(commonpb.StateCode_Healthy)
log.Info("RootCoord startup success")
}
c.UpdateStateCode(commonpb.StateCode_StandBy)
log.Info("RootCoord enter standby mode successfully")
} else {
c.initOnce.Do(func() {
initError = c.initInternal()
})
}
return initError
}
......@@ -657,9 +672,12 @@ func (c *Core) startServerLoop() {
// Start starts RootCoord.
func (c *Core) Start() error {
var err error
c.startOnce.Do(func() {
err = c.startInternal()
})
if !c.enableActiveStandBy {
c.startOnce.Do(func() {
err = c.startInternal()
})
}
return err
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册