未验证 提交 31de6cfb 编写于 作者: C cai.zhang 提交者: GitHub

Update component state to healthy after start (#22084)

Signed-off-by: Ncai.zhang <cai.zhang@zilliz.com>
上级 6313a455
......@@ -259,7 +259,6 @@ func (s *Server) initSession() error {
// Init change server state to Initializing
func (s *Server) Init() error {
var err error
s.stateCode.Store(commonpb.StateCode_Initializing)
s.factory.Init(&Params)
if err = s.initSession(); err != nil {
return err
......@@ -272,7 +271,6 @@ func (s *Server) Init() error {
return err
}
s.startDataCoord()
s.stateCode.Store(commonpb.StateCode_Healthy)
log.Info("DataCoord startup success")
return nil
}
......@@ -285,6 +283,7 @@ func (s *Server) Init() error {
}
func (s *Server) initDataCoord() error {
s.stateCode.Store(commonpb.StateCode_Initializing)
var err error
if err = s.initRootCoordClient(); err != nil {
return err
......@@ -334,7 +333,6 @@ func (s *Server) initDataCoord() error {
func (s *Server) Start() error {
if !s.enableActiveStandBy {
s.startDataCoord()
s.stateCode.Store(commonpb.StateCode_Healthy)
log.Info("DataCoord startup successfully")
}
......@@ -357,6 +355,7 @@ func (s *Server) startDataCoord() {
// data while offline.
log.Info("DataCoord (re)starts successfully and re-collecting segment stats from DataNodes")
s.reCollectSegmentStats(s.ctx)
s.stateCode.Store(commonpb.StateCode_Healthy)
}
func (s *Server) initCluster() error {
......
......@@ -3520,10 +3520,16 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server {
err = svr.Init()
assert.Nil(t, err)
err = svr.Start()
assert.Nil(t, err)
if Params.IndexCoordCfg.EnableActiveStandby {
assert.Equal(t, commonpb.StateCode_StandBy, svr.stateCode.Load().(commonpb.StateCode))
} else {
assert.Equal(t, commonpb.StateCode_Initializing, svr.stateCode.Load().(commonpb.StateCode))
}
err = svr.Register()
assert.Nil(t, err)
err = svr.Start()
assert.Nil(t, err)
assert.Equal(t, commonpb.StateCode_Healthy, svr.stateCode.Load().(commonpb.StateCode))
// Stop channal watch state watcher in tests
if svr.channelManager != nil && svr.channelManager.stopChecker != nil {
......
......@@ -230,9 +230,6 @@ func (s *Server) init() error {
if err := s.SetIndexCoord(s.indexCoord); err != nil {
panic(err)
}
s.queryCoord.UpdateStateCode(commonpb.StateCode_Initializing)
log.Info("QueryCoord", zap.Any("State", commonpb.StateCode_Initializing))
if err := s.queryCoord.Init(); err != nil {
return err
}
......
......@@ -184,9 +184,6 @@ func (s *Server) init() error {
}
log.Info("grpc init done ...")
s.rootCoord.UpdateStateCode(commonpb.StateCode_Initializing)
log.Info("RootCoord", zap.Any("State", commonpb.StateCode_Initializing))
if s.newDataCoordClient != nil {
log.Info("RootCoord start to create DataCoord client")
dataCoord := s.newDataCoordClient(rootcoord.Params.EtcdCfg.MetaRootPath, s.etcdCli)
......
......@@ -171,9 +171,6 @@ func (i *IndexCoord) initSession() error {
// Init initializes the IndexCoord component.
func (i *IndexCoord) Init() error {
i.UpdateStateCode(commonpb.StateCode_Initializing)
log.Info("IndexCoord init", zap.Any("stateCode", i.stateCode.Load().(commonpb.StateCode)))
var initErr error
Params.InitOnce()
i.factory.Init(&Params)
......@@ -191,7 +188,6 @@ func (i *IndexCoord) Init() error {
return err
}
i.startIndexCoord()
i.stateCode.Store(commonpb.StateCode_Healthy)
log.Info("IndexCoord startup success")
return nil
}
......@@ -207,6 +203,8 @@ func (i *IndexCoord) Init() error {
}
func (i *IndexCoord) initIndexCoord() error {
i.UpdateStateCode(commonpb.StateCode_Initializing)
log.Info("IndexCoord init", zap.Any("stateCode", i.stateCode.Load().(commonpb.StateCode)))
var err error
connectEtcdFn := func() error {
i.etcdKV = etcdkv.NewEtcdKV(i.etcdCli, Params.EtcdCfg.MetaRootPath)
......@@ -290,7 +288,6 @@ func (i *IndexCoord) initIndexCoord() error {
func (i *IndexCoord) Start() error {
if !i.enableActiveStandBy {
i.startIndexCoord()
i.UpdateStateCode(commonpb.StateCode_Healthy)
log.Info("IndexCoord start successfully", zap.Any("state", i.stateCode.Load()))
}
......@@ -314,12 +311,12 @@ func (i *IndexCoord) startIndexCoord() {
i.handoff.Start()
i.flushedSegmentWatcher.Start()
i.UpdateStateCode(commonpb.StateCode_Healthy)
})
// Start callbacks
for _, cb := range i.startCallbacks {
cb()
}
i.UpdateStateCode(commonpb.StateCode_Healthy)
}
// Stop stops the IndexCoord component.
......
......@@ -185,14 +185,18 @@ func testIndexCoord(t *testing.T) {
err = ic.Init()
assert.NoError(t, err)
if Params.IndexCoordCfg.EnableActiveStandby {
assert.Equal(t, commonpb.StateCode_StandBy, ic.stateCode.Load().(commonpb.StateCode))
} else {
assert.Equal(t, commonpb.StateCode_Initializing, ic.stateCode.Load().(commonpb.StateCode))
}
err = ic.Register()
assert.NoError(t, err)
err = ic.Start()
assert.NoError(t, err)
ic.UpdateStateCode(commonpb.StateCode_Healthy)
assert.Equal(t, commonpb.StateCode_Healthy, ic.stateCode.Load().(commonpb.StateCode))
mockKv := NewMockEtcdKVWithReal(ic.etcdKV)
ic.metaTable.catalog = &indexcoord.Catalog{
......
......@@ -186,7 +186,6 @@ func (s *Server) Init() error {
log.Error("QueryCoord init failed", zap.Error(err))
return err
}
s.UpdateStateCode(commonpb.StateCode_Healthy)
log.Info("QueryCoord startup success")
return nil
}
......@@ -199,6 +198,8 @@ func (s *Server) Init() error {
}
func (s *Server) initQueryCoord() error {
s.UpdateStateCode(commonpb.StateCode_Initializing)
log.Info("QueryCoord", zap.Any("State", commonpb.StateCode_Initializing))
// Init KV
etcdKV := etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath)
s.kv = etcdKV
......@@ -359,7 +360,6 @@ func (s *Server) Start() error {
if err := s.startQueryCoord(); err != nil {
return err
}
s.UpdateStateCode(commonpb.StateCode_Healthy)
log.Info("QueryCoord started")
}
return nil
......@@ -389,6 +389,7 @@ func (s *Server) startQueryCoord() error {
}
s.startServerLoop()
s.afterStart()
s.UpdateStateCode(commonpb.StateCode_Healthy)
return nil
}
......
......@@ -210,11 +210,13 @@ func (suite *ServerSuite) TestDisableActiveStandby() {
suite.server, err = newQueryCoord()
suite.NoError(err)
suite.Equal(commonpb.StateCode_Initializing, suite.server.status.Load().(commonpb.StateCode))
suite.hackServer()
err = suite.server.Start()
suite.NoError(err)
err = suite.server.Register()
suite.NoError(err)
suite.Equal(commonpb.StateCode_Healthy, suite.server.status.Load().(commonpb.StateCode))
states, err := suite.server.GetComponentStates(context.Background())
suite.NoError(err)
......
......@@ -318,7 +318,6 @@ func (c *Core) Register() error {
}
})
c.UpdateStateCode(commonpb.StateCode_Healthy)
return nil
}
......@@ -429,6 +428,7 @@ func (c *Core) initImportManager() error {
}
func (c *Core) initInternal() error {
c.UpdateStateCode(commonpb.StateCode_Initializing)
c.initKVCreator()
if err := c.initMetaTable(); err != nil {
......@@ -512,8 +512,6 @@ func (c *Core) Init() error {
if err != nil {
return err
}
c.UpdateStateCode(commonpb.StateCode_Healthy)
log.Info("RootCoord startup success")
return nil
}
......
......@@ -1336,7 +1336,7 @@ func TestCore_startTimeTickLoop(t *testing.T) {
// make sure the main functions work well when EnableActiveStandby=true
func TestRootcoord_EnableActiveStandby(t *testing.T) {
Params.Init()
Params.InitOnce()
Params.RootCoordCfg.EnableActiveStandby = true
randVal := rand.Int()
Params.CommonCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal)
......@@ -1363,12 +1363,65 @@ func TestRootcoord_EnableActiveStandby(t *testing.T) {
core.etcdCli = etcdCli
assert.NoError(t, err)
err = core.Init()
assert.Equal(t, commonpb.StateCode_StandBy, core.stateCode.Load().(commonpb.StateCode))
assert.NoError(t, err)
err = core.Start()
assert.NoError(t, err)
core.session.TriggerKill = false
err = core.Register()
assert.NoError(t, err)
assert.Equal(t, commonpb.StateCode_Healthy, core.stateCode.Load().(commonpb.StateCode))
resp, err := core.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyCfg.GetNodeID(),
},
CollectionName: "unexist"})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
err = core.Stop()
assert.NoError(t, err)
}
// make sure the main functions work well when EnableActiveStandby=false
func TestRootcoord_DisableActiveStandby(t *testing.T) {
Params.InitOnce()
Params.RootCoordCfg.EnableActiveStandby = false
randVal := rand.Int()
Params.CommonCfg.RootCoordTimeTick = fmt.Sprintf("rootcoord-time-tick-%d", randVal)
Params.CommonCfg.RootCoordStatistics = fmt.Sprintf("rootcoord-statistics-%d", randVal)
Params.EtcdCfg.MetaRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.MetaRootPath)
Params.EtcdCfg.KvRootPath = fmt.Sprintf("/%d/%s", randVal, Params.EtcdCfg.KvRootPath)
Params.CommonCfg.RootCoordSubName = fmt.Sprintf("subname-%d", randVal)
Params.CommonCfg.RootCoordDml = fmt.Sprintf("rootcoord-dml-test-%d", randVal)
Params.CommonCfg.RootCoordDelta = fmt.Sprintf("rootcoord-delta-test-%d", randVal)
ctx := context.Background()
coreFactory := dependency.NewDefaultFactory(true)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd,
Params.EtcdCfg.EtcdUseSSL,
Params.EtcdCfg.Endpoints,
Params.EtcdCfg.EtcdTLSCert,
Params.EtcdCfg.EtcdTLSKey,
Params.EtcdCfg.EtcdTLSCACert,
Params.EtcdCfg.EtcdTLSMinVersion)
assert.NoError(t, err)
defer etcdCli.Close()
core, err := NewCore(ctx, coreFactory)
core.etcdCli = etcdCli
assert.NoError(t, err)
err = core.Init()
assert.Equal(t, commonpb.StateCode_Initializing, core.stateCode.Load().(commonpb.StateCode))
assert.NoError(t, err)
err = core.Start()
assert.NoError(t, err)
core.session.TriggerKill = false
err = core.Register()
assert.NoError(t, err)
assert.Equal(t, commonpb.StateCode_Healthy, core.stateCode.Load().(commonpb.StateCode))
resp, err := core.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册