From d5ab8ac3459b8d2927fea06b133c8f91ec118ba3 Mon Sep 17 00:00:00 2001
From: congqixia <84113973+congqixia@users.noreply.github.com>
Date: Wed, 19 May 2021 14:13:53 +0800
Subject: [PATCH] Restore flush and stats stream pos (#5284)

Restore the segment flush stream and the statistics stream to the last
successfully processed position.

Signed-off-by: Congqi Xia
---
 configs/milvus.yaml                 |   2 +
 internal/dataservice/param.go       |  22 ++++
 internal/dataservice/server.go      |  45 ++++++++
 internal/dataservice/server_test.go | 172 ++++++++++++++++++++++++++++
 internal/dataservice/stream_pos.go  |  73 ++++++++++++
 5 files changed, 314 insertions(+)
 create mode 100644 internal/dataservice/stream_pos.go

diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 27faaabf2..215189765 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -21,6 +21,8 @@ etcd:
   kvSubPath: kv # kvRootPath = rootPath + '/' + kvSubPath
   segFlushMetaSubPath: datanode/segment # Full Path = rootPath/metaSubPath/segFlushMetaSubPath
   ddlFlushMetaSubPath: datanode/ddl # Full Path = rootPath/metaSubPath/ddlFlushMetaSubPath
+  flushStreamPosSubPath: dataservice/flushstream # Full path = rootPath/metaSubPath/flushStreamPosSubPath
+  statsStreamPosSubPath: dataservice/statsstream # Full path = rootPath/metaSubPath/statsStreamPosSubPath
 
 minio:
   address: localhost
diff --git a/internal/dataservice/param.go b/internal/dataservice/param.go
index edac3eb59..398ca1124 100644
--- a/internal/dataservice/param.go
+++ b/internal/dataservice/param.go
@@ -30,6 +30,9 @@ type ParamTable struct {
 	KvRootPath    string
 	PulsarAddress string
 
+	FlushStreamPosSubPath string
+	StatsStreamPosSubPath string
+
 	// segment
 	SegmentSize       float64
 	SegmentSizeFactor float64
@@ -83,6 +86,9 @@ func (p *ParamTable) Init() {
 		p.initSegmentFlushMetaPath()
 		p.initLogCfg()
 		p.initProxyServiceTimeTickChannelName()
+
+		p.initFlushStreamPosSubPath()
+		p.initStatsStreamPosSubPath()
 	})
 }
 
@@ -257,3 +263,19 @@ func (p *ParamTable) initProxyServiceTimeTickChannelName() {
 	}
 	p.ProxyTimeTickChannelName = ch
 }
+
+func (p *ParamTable) initFlushStreamPosSubPath() {
+	subPath, err := p.Load("etcd.flushStreamPosSubPath")
+	if err != nil {
+		panic(err)
+	}
+	p.FlushStreamPosSubPath = subPath
+}
+
+func (p *ParamTable) initStatsStreamPosSubPath() {
+	subPath, err := p.Load("etcd.statsStreamPosSubPath")
+	if err != nil {
+		panic(err)
+	}
+	p.StatsStreamPosSubPath = subPath
+}
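
Note on the two new settings: they are only key suffixes. As the comments in milvus.yaml spell out, the full etcd key is rootPath/metaSubPath/<subPath>. A minimal sketch of that composition follows; the rootPath and metaSubPath values are illustrative defaults and are not part of this patch:

package main

import (
	"fmt"
	"path"
)

func main() {
	// Illustrative values only; the real rootPath/metaSubPath come from the
	// etcd section of milvus.yaml and are not touched by this patch.
	rootPath := "by-dev"
	metaSubPath := "meta"
	flushStreamPosSubPath := "dataservice/flushstream" // default added above

	// Full path = rootPath/metaSubPath/flushStreamPosSubPath
	fmt.Println(path.Join(rootPath, metaSubPath, flushStreamPosSubPath))
	// by-dev/meta/dataservice/flushstream
}
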
diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go
index 98db7f5d3..50c70c06f 100644
--- a/internal/dataservice/server.go
+++ b/internal/dataservice/server.go
@@ -332,6 +332,17 @@ func (s *Server) startStatsChannel(ctx context.Context) {
 	statsStream, _ := s.msFactory.NewMsgStream(ctx)
 	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName)
 	log.Debug("dataservice AsConsumer: " + Params.StatisticsChannelName + " : " + Params.DataServiceSubscriptionName)
+	// try to restore last processed pos
+	pos, err := s.loadStreamLastPos(streamTypeStats)
+	if err == nil {
+		err = statsStream.Seek(pos)
+		if err != nil {
+			log.Error("Failed to seek to last pos for statsStream",
+				zap.String("StatisChanName", Params.StatisticsChannelName),
+				zap.String("DataServiceSubscriptionName", Params.DataServiceSubscriptionName),
+				zap.Error(err))
+		}
+	}
 	statsStream.Start()
 	defer statsStream.Close()
 	for {
@@ -356,6 +367,16 @@ func (s *Server) startStatsChannel(ctx context.Context) {
 					continue
 				}
 			}
+			if ssMsg.MsgPosition != nil {
+				err := s.storeStreamPos(streamTypeStats, ssMsg.MsgPosition)
+				if err != nil {
+					log.Error("Fail to store current success pos for Stats stream",
+						zap.Stringer("pos", ssMsg.MsgPosition),
+						zap.Error(err))
+				}
+			} else {
+				log.Warn("Empty Msg Pos found ", zap.Int64("msgid", msg.ID()))
+			}
 		}
 	}
 }
@@ -366,6 +387,19 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) {
 	flushStream, _ := s.msFactory.NewMsgStream(ctx)
 	flushStream.AsConsumer([]string{Params.SegmentInfoChannelName}, Params.DataServiceSubscriptionName)
 	log.Debug("dataservice AsConsumer: " + Params.SegmentInfoChannelName + " : " + Params.DataServiceSubscriptionName)
+
+	// try to restore last processed pos
+	pos, err := s.loadStreamLastPos(streamTypeFlush)
+	if err == nil {
+		err = flushStream.Seek(pos)
+		if err != nil {
+			log.Error("Failed to seek to last pos for segment flush Stream",
+				zap.String("SegInfoChannelName", Params.SegmentInfoChannelName),
+				zap.String("DataServiceSubscriptionName", Params.DataServiceSubscriptionName),
+				zap.Error(err))
+		}
+	}
+
 	flushStream.Start()
 	defer flushStream.Close()
 	for {
@@ -391,6 +425,17 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) {
 				log.Error("get segment from meta error", zap.Int64("segmentID", fcMsg.SegmentID), zap.Error(err))
 				continue
 			}
+
+			if fcMsg.MsgPosition != nil {
+				err = s.storeStreamPos(streamTypeFlush, fcMsg.MsgPosition)
+				if err != nil {
+					log.Error("Fail to store current success pos for segment flush stream",
+						zap.Stringer("pos", fcMsg.MsgPosition),
+						zap.Error(err))
+				}
+			} else {
+				log.Warn("Empty Msg Pos found ", zap.Int64("msgid", msg.ID()))
+			}
 		}
 	}
 }
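
Both consumers above follow the same restore/checkpoint pattern: load the last persisted position and Seek before Start, then persist the position of every message that was handled successfully. The self-contained sketch below illustrates only that control flow; position, checkpoint and consume are simplified stand-ins, not the msgstream API or the dataservice code:

package main

import "fmt"

// position stands in for msgstream.MsgPosition.
type position struct{ offset int }

// checkpoint stands in for the etcd-backed storeStreamPos/loadStreamLastPos
// pair added in stream_pos.go.
type checkpoint struct{ last *position }

func (c *checkpoint) store(p position) { c.last = &p }

func (c *checkpoint) load() (position, bool) {
	if c.last == nil {
		return position{}, false
	}
	return *c.last, true
}

// consume mirrors startStatsChannel/startSegmentFlushChannel: seek to the last
// stored position before consuming, then record progress after each message.
func consume(msgs []position, cp *checkpoint, handle func(position)) {
	start := 0
	if pos, ok := cp.load(); ok {
		// Whether the checkpointed message itself is re-delivered depends on the
		// stream's Seek semantics; this sketch simply resumes after it.
		start = pos.offset + 1
	}
	for _, m := range msgs[start:] {
		handle(m)   // process first ...
		cp.store(m) // ... then persist the position as the new checkpoint
	}
}

func main() {
	cp := &checkpoint{}
	msgs := []position{{0}, {1}, {2}, {3}}

	// First run handles only the first two messages, then "restarts".
	consume(msgs[:2], cp, func(p position) { fmt.Println("run 1 handled", p.offset) })
	// The second run resumes after the stored position instead of from offset 0.
	consume(msgs, cp, func(p position) { fmt.Println("run 2 handled", p.offset) })
}

Because the position is persisted only after a message has been handled, a crash between handling and persisting re-delivers that message on the next start. That is acceptable here: re-applying a segment row count or re-marking a segment as flushed is idempotent.
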
diff --git a/internal/dataservice/server_test.go b/internal/dataservice/server_test.go
index 702ce3838..480b5ac9a 100644
--- a/internal/dataservice/server_test.go
+++ b/internal/dataservice/server_test.go
@@ -13,6 +13,7 @@ package dataservice
 import (
 	"context"
 	"math"
+	"math/rand"
 	"testing"
 	"time"
 
@@ -486,6 +487,177 @@ func TestChannel(t *testing.T) {
 	})
 }
 
+func TestResumeChannel(t *testing.T) {
+	Params.Init()
+
+	segmentIDs := make([]int64, 0, 1000)
+
+	t.Run("Prepare Resume test set", func(t *testing.T) {
+		svr := newTestServer(t)
+		defer svr.Stop()
+
+		i := int64(-1)
+		cnt := 0
+		for ; cnt < 1000; i-- {
+			svr.meta.RLock()
+			_, has := svr.meta.segments[i]
+			svr.meta.RUnlock()
+			if has {
+				continue
+			}
+			err := svr.meta.AddSegment(&datapb.SegmentInfo{
+				ID:           i,
+				CollectionID: -1,
+			})
+			assert.Nil(t, err)
+			segmentIDs = append(segmentIDs, i)
+			cnt++
+		}
+	})
+
+	t.Run("Test ResumeSegmentStatsChannel", func(t *testing.T) {
+		svr := newTestServer(t)
+
+		segRows := rand.Int63n(1000)
+
+		statsStream, _ := svr.msFactory.NewMsgStream(svr.ctx)
+		statsStream.AsProducer([]string{Params.StatisticsChannelName})
+		statsStream.Start()
+		defer statsStream.Close()
+
+		genMsg := func(msgType commonpb.MsgType, t Timestamp, stats *internalpb.SegmentStatisticsUpdates) *msgstream.SegmentStatisticsMsg {
+			return &msgstream.SegmentStatisticsMsg{
+				BaseMsg: msgstream.BaseMsg{
+					HashValues: []uint32{0},
+				},
+				SegmentStatistics: internalpb.SegmentStatistics{
+					Base: &commonpb.MsgBase{
+						MsgType:   msgType,
+						MsgID:     0,
+						Timestamp: t,
+						SourceID:  0,
+					},
+					SegStats: []*internalpb.SegmentStatisticsUpdates{stats},
+				},
+			}
+		}
+		ch := make(chan struct{})
+
+		go func() {
+			for _, segID := range segmentIDs {
+				stats := &internalpb.SegmentStatisticsUpdates{
+					SegmentID: segID,
+					NumRows:   segRows,
+				}
+
+				msgPack := msgstream.MsgPack{}
+				msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentStatistics, uint64(time.Now().Unix()), stats))
+
+				err := statsStream.Produce(&msgPack)
+				assert.Nil(t, err)
+				time.Sleep(time.Millisecond * 5)
+			}
+			ch <- struct{}{}
+		}()
+
+		time.Sleep(time.Second)
+
+		svr.Stop()
+		time.Sleep(time.Millisecond * 50)
+
+		svr = newTestServer(t)
+		defer svr.Stop()
+		<-ch
+
+		//wait for Server processing last messages
+		time.Sleep(time.Second)
+
+		svr.meta.RLock()
+		defer svr.meta.RUnlock()
+		for _, segID := range segmentIDs {
+			seg, has := svr.meta.segments[segID]
+			assert.True(t, has)
+			if has {
+				assert.Equal(t, segRows, seg.NumRows)
+			}
+		}
+	})
+
+	t.Run("Test ResumeSegmentFlushChannel", func(t *testing.T) {
+		genMsg := func(msgType commonpb.MsgType, t Timestamp, segID int64) *msgstream.FlushCompletedMsg {
+			return &msgstream.FlushCompletedMsg{
+				BaseMsg: msgstream.BaseMsg{
+					HashValues: []uint32{0},
+				},
+				SegmentFlushCompletedMsg: internalpb.SegmentFlushCompletedMsg{
+					Base: &commonpb.MsgBase{
+						MsgType:   msgType,
+						MsgID:     0,
+						Timestamp: t,
+						SourceID:  0,
+					},
+					SegmentID: segID,
+				},
+			}
+		}
+		svr := newTestServer(t)
+
+		ch := make(chan struct{})
+
+		segInfoStream, _ := svr.msFactory.NewMsgStream(svr.ctx)
+		segInfoStream.AsProducer([]string{Params.SegmentInfoChannelName})
+		segInfoStream.Start()
+		defer segInfoStream.Close()
+		go func() {
+			for _, segID := range segmentIDs {
+
+				msgPack := msgstream.MsgPack{}
+				msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentFlushDone, uint64(time.Now().Unix()), segID))
+
+				err := segInfoStream.Produce(&msgPack)
+				assert.Nil(t, err)
+				time.Sleep(time.Millisecond * 5)
+			}
+			ch <- struct{}{}
+		}()
+
+		time.Sleep(time.Millisecond * 50)
+		//stop current server, simulating server quit
+		svr.Stop()
+
+		time.Sleep(time.Second)
+		// start new test server as restarting
+		svr = newTestServer(t)
+		defer svr.Stop()
+		<-ch
+
+		//wait for Server processing last messages
+		time.Sleep(time.Second)
+
+		//ASSERT PART
+		svr.meta.RLock()
+		defer svr.meta.RUnlock()
+		for _, segID := range segmentIDs {
+			seg, has := svr.meta.segments[segID]
+			assert.True(t, has)
+			if has {
+				assert.Equal(t, seg.State, commonpb.SegmentState_Flushed)
+			}
+		}
+	})
+
+	t.Run("Clean up test segments", func(t *testing.T) {
+		svr := newTestServer(t)
+		defer closeTestServer(t, svr)
+		var err error
+		for _, segID := range segmentIDs {
+			err = svr.meta.DropSegment(segID)
+			assert.Nil(t, err)
+		}
+	})
+
+}
+
 func newTestServer(t *testing.T) *Server {
 	Params.Init()
 	var err error
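
The fixed time.Sleep calls above give the restarted server time to drain the channel before the assertions run. If the vendored testify version provides assert.Eventually, the final check could poll instead of sleeping, which makes the test less timing-sensitive. A sketch reusing the identifiers from the stats subtest above (svr, segmentIDs, segRows); this is a suggestion, not part of the patch:

// Sketch only: poll for the expected row counts instead of sleeping a fixed second.
assert.Eventually(t, func() bool {
	svr.meta.RLock()
	defer svr.meta.RUnlock()
	for _, segID := range segmentIDs {
		seg, has := svr.meta.segments[segID]
		if !has || seg.NumRows != segRows {
			return false
		}
	}
	return true
}, 5*time.Second, 50*time.Millisecond, "restored stats consumer did not catch up")
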
diff --git a/internal/dataservice/stream_pos.go b/internal/dataservice/stream_pos.go
new file mode 100644
index 000000000..49ccf0ab9
--- /dev/null
+++ b/internal/dataservice/stream_pos.go
@@ -0,0 +1,73 @@
+// Copyright (C) 2019-2020 Zilliz. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software distributed under the License
+// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+// or implied. See the License for the specific language governing permissions and limitations under the License.
+
+package dataservice
+
+import (
+	"errors"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/milvus-io/milvus/internal/msgstream"
+)
+
+type streamType int
+
+const (
+	_ streamType = iota
+	streamTypeFlush
+	streamTypeStats
+)
+
+var (
+	errInvalidStreamType = errors.New("invalid stream type")
+)
+
+// storeStreamPos store current processed stream pos
+func (s *Server) storeStreamPos(st streamType, pos *msgstream.MsgPosition) error {
+	key := s.streamTypeSubKey(st)
+	if key == "" {
+		return errInvalidStreamType
+	}
+	val := proto.MarshalTextString(pos)
+	err := s.kvClient.Save(key, val)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// loadStreamLastPos load last successful pos with specified stream type
+func (s *Server) loadStreamLastPos(st streamType) (pos *msgstream.MsgPosition, err error) {
+	key := s.streamTypeSubKey(st)
+	if key == "" {
+		return nil, errInvalidStreamType
+	}
+	var val string
+	pos = &msgstream.MsgPosition{}
+	val, err = s.kvClient.Load(key)
+	if err != nil {
+		return pos, err
+	}
+	err = proto.UnmarshalText(val, pos)
+	return pos, err
+}
+
+// streamTypeSubKey converts stream type to corresponding k-v store key
+func (s *Server) streamTypeSubKey(st streamType) string {
+	switch st {
+	case streamTypeFlush:
+		return Params.FlushStreamPosSubPath
+	case streamTypeStats:
+		return Params.StatsStreamPosSubPath
+	default:
+		return ""
+	}
+}
-- 
GitLab
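
For reference, stream_pos.go persists the position as a text-format protobuf string via proto.MarshalTextString and reads it back with proto.UnmarshalText. The round trip below is a sketch outside the Server type: the in-memory map stands in for the etcd-backed kvClient, and the MsgPosition field values are illustrative assumptions, not taken from this patch:

package main

import (
	"fmt"

	"github.com/golang/protobuf/proto"
	"github.com/milvus-io/milvus/internal/msgstream"
)

func main() {
	// In-memory stand-in for the etcd-backed kvClient; the real key is the
	// FlushStreamPosSubPath / StatsStreamPosSubPath value configured above.
	kv := map[string]string{}

	pos := &msgstream.MsgPosition{
		ChannelName: "segment-info-channel-example", // illustrative name only
		MsgID:       []byte{0, 0, 0, 1},
	}

	// storeStreamPos: serialize to the protobuf text format and save it.
	kv["dataservice/flushstream"] = proto.MarshalTextString(pos)

	// loadStreamLastPos: load the stored value and unmarshal it back.
	restored := &msgstream.MsgPosition{}
	if err := proto.UnmarshalText(kv["dataservice/flushstream"], restored); err != nil {
		panic(err)
	}
	fmt.Println(restored.ChannelName, restored.MsgID)
}

The text format keeps the stored value human-readable when inspecting etcd directly, at the cost of a slightly larger value than the binary protobuf encoding.
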