Unverified commit d5ab8ac3, authored by congqixia, committed by GitHub

Restore flush and stats stream pos (#5284)

Restore the segment flush stream and statistics stream to the last successfully processed position
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
Parent 4b712284
@@ -21,6 +21,8 @@ etcd:
kvSubPath: kv # kvRootPath = rootPath + '/' + kvSubPath
segFlushMetaSubPath: datanode/segment # Full Path = rootPath/metaSubPath/segFlushMetaSubPath
ddlFlushMetaSubPath: datanode/ddl # Full Path = rootPath/metaSubPath/ddlFlushMetaSubPath
flushStreamPosSubPath: dataservice/flushstream # Full Path = rootPath/metaSubPath/flushStreamPosSubPath
statsStreamPosSubPath: dataservice/statsstream # Full Path = rootPath/metaSubPath/statsStreamPosSubPath
minio:
address: localhost
......
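Per the comments above, each new sub-path is appended to rootPath and metaSubPath to form the full etcd key. A minimal sketch of that composition, using assumed example values for rootPath and metaSubPath (they are not part of this diff):

package main

import (
	"fmt"
	"path"
)

func main() {
	rootPath := "by-dev"  // assumed example value
	metaSubPath := "meta" // assumed example value

	// Full Path = rootPath/metaSubPath/<subPath>, per the config comments
	flushKey := path.Join(rootPath, metaSubPath, "dataservice/flushstream")
	statsKey := path.Join(rootPath, metaSubPath, "dataservice/statsstream")
	fmt.Println(flushKey) // by-dev/meta/dataservice/flushstream
	fmt.Println(statsKey) // by-dev/meta/dataservice/statsstream
}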
@@ -30,6 +30,9 @@ type ParamTable struct {
KvRootPath string
PulsarAddress string
FlushStreamPosSubPath string
StatsStreamPosSubPath string
// segment
SegmentSize float64
SegmentSizeFactor float64
@@ -83,6 +86,9 @@ func (p *ParamTable) Init() {
p.initSegmentFlushMetaPath()
p.initLogCfg()
p.initProxyServiceTimeTickChannelName()
p.initFlushStreamPosSubPath()
p.initStatsStreamPosSubPath()
})
}
@@ -257,3 +263,19 @@ func (p *ParamTable) initProxyServiceTimeTickChannelName() {
}
p.ProxyTimeTickChannelName = ch
}
func (p *ParamTable) initFlushStreamPosSubPath() {
subPath, err := p.Load("etcd.flushStreamPosSubPath")
if err != nil {
panic(err)
}
p.FlushStreamPosSubPath = subPath
}
func (p *ParamTable) initStatsStreamPosSubPath() {
subPath, err := p.Load("etcd.statsStreamPosSubPath")
if err != nil {
panic(err)
}
p.StatsStreamPosSubPath = subPath
}
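A hedged sketch of a unit check for the two new params; it assumes the default milvus.yaml values shown above are on the load path and the package's usual testing/assert imports, and the test name itself is hypothetical:

func TestStreamPosSubPaths(t *testing.T) {
	Params.Init()
	// defaults taken from the config snippet at the top of this diff
	assert.Equal(t, "dataservice/flushstream", Params.FlushStreamPosSubPath)
	assert.Equal(t, "dataservice/statsstream", Params.StatsStreamPosSubPath)
}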
@@ -332,6 +332,17 @@ func (s *Server) startStatsChannel(ctx context.Context) {
statsStream, _ := s.msFactory.NewMsgStream(ctx)
statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName)
log.Debug("dataservice AsConsumer: " + Params.StatisticsChannelName + " : " + Params.DataServiceSubscriptionName)
// try to restore last processed pos
pos, err := s.loadStreamLastPos(streamTypeStats)
if err == nil {
err = statsStream.Seek(pos)
if err != nil {
log.Error("Failed to seek to last pos for statsStream",
zap.String("StatisChanName", Params.StatisticsChannelName),
zap.String("DataServiceSubscriptionName", Params.DataServiceSubscriptionName),
zap.Error(err))
}
}
statsStream.Start()
defer statsStream.Close()
for {
@@ -356,6 +367,16 @@ func (s *Server) startStatsChannel(ctx context.Context) {
continue
}
}
if ssMsg.MsgPosition != nil {
err := s.storeStreamPos(streamTypeStats, ssMsg.MsgPosition)
if err != nil {
log.Error("Fail to store current success pos for Stats stream",
zap.Stringer("pos", ssMsg.MsgPosition),
zap.Error(err))
}
} else {
log.Warn("Empty Msg Pos found ", zap.Int64("msgid", msg.ID()))
}
}
}
}
@@ -366,6 +387,19 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) {
flushStream, _ := s.msFactory.NewMsgStream(ctx)
flushStream.AsConsumer([]string{Params.SegmentInfoChannelName}, Params.DataServiceSubscriptionName)
log.Debug("dataservice AsConsumer: " + Params.SegmentInfoChannelName + " : " + Params.DataServiceSubscriptionName)
// try to restore last processed pos
pos, err := s.loadStreamLastPos(streamTypeFlush)
if err == nil {
err = flushStream.Seek(pos)
if err != nil {
log.Error("Failed to seek to last pos for segment flush Stream",
zap.String("SegInfoChannelName", Params.SegmentInfoChannelName),
zap.String("DataServiceSubscriptionName", Params.DataServiceSubscriptionName),
zap.Error(err))
}
}
flushStream.Start()
defer flushStream.Close()
for {
@@ -391,6 +425,17 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) {
log.Error("get segment from meta error", zap.Int64("segmentID", fcMsg.SegmentID), zap.Error(err))
continue
}
if fcMsg.MsgPosition != nil {
err = s.storeStreamPos(streamTypeFlush, fcMsg.MsgPosition)
if err != nil {
log.Error("Fail to store current success pos for segment flush stream",
zap.Stringer("pos", fcMsg.MsgPosition),
zap.Error(err))
}
} else {
log.Warn("Empty Msg Pos found ", zap.Int64("msgid", msg.ID()))
}
}
}
}
......
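Both consumers above duplicate the same restore logic: load the stored position and seek only when the load succeeds; any load error (including a missing key) silently falls back to the subscription's default position, and a failed seek is logged but not fatal. A hedged refactoring sketch of that shared pattern, using only identifiers visible in this diff; the helper name is hypothetical and not part of this commit:

// trySeekToLastPos is a hypothetical helper, not part of this commit.
func (s *Server) trySeekToLastPos(stream msgstream.MsgStream, st streamType, chanName string) {
	pos, err := s.loadStreamLastPos(st)
	if err != nil {
		// no stored position (or load failed): consume from the subscription default
		return
	}
	if err := stream.Seek(pos); err != nil {
		// non-fatal, matching the behavior of both call sites above
		log.Error("Failed to seek to last pos",
			zap.String("channel", chanName),
			zap.Error(err))
	}
}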
@@ -13,6 +13,7 @@ package dataservice
import (
"context"
"math"
"math/rand"
"testing"
"time"
@@ -486,6 +487,177 @@ func TestChannel(t *testing.T) {
})
}
func TestResumeChannel(t *testing.T) {
Params.Init()
segmentIDs := make([]int64, 0, 1000)
t.Run("Prepare Resume test set", func(t *testing.T) {
svr := newTestServer(t)
defer svr.Stop()
i := int64(-1)
cnt := 0
for ; cnt < 1000; i-- {
svr.meta.RLock()
_, has := svr.meta.segments[i]
svr.meta.RUnlock()
if has {
continue
}
err := svr.meta.AddSegment(&datapb.SegmentInfo{
ID: i,
CollectionID: -1,
})
assert.Nil(t, err)
segmentIDs = append(segmentIDs, i)
cnt++
}
})
t.Run("Test ResumeSegmentStatsChannel", func(t *testing.T) {
svr := newTestServer(t)
segRows := rand.Int63n(1000)
statsStream, _ := svr.msFactory.NewMsgStream(svr.ctx)
statsStream.AsProducer([]string{Params.StatisticsChannelName})
statsStream.Start()
defer statsStream.Close()
genMsg := func(msgType commonpb.MsgType, t Timestamp, stats *internalpb.SegmentStatisticsUpdates) *msgstream.SegmentStatisticsMsg {
return &msgstream.SegmentStatisticsMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{0},
},
SegmentStatistics: internalpb.SegmentStatistics{
Base: &commonpb.MsgBase{
MsgType: msgType,
MsgID: 0,
Timestamp: t,
SourceID: 0,
},
SegStats: []*internalpb.SegmentStatisticsUpdates{stats},
},
}
}
ch := make(chan struct{})
go func() {
for _, segID := range segmentIDs {
stats := &internalpb.SegmentStatisticsUpdates{
SegmentID: segID,
NumRows: segRows,
}
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentStatistics, uint64(time.Now().Unix()), stats))
err := statsStream.Produce(&msgPack)
assert.Nil(t, err)
time.Sleep(time.Millisecond * 5)
}
ch <- struct{}{}
}()
time.Sleep(time.Second)
svr.Stop()
time.Sleep(time.Millisecond * 50)
svr = newTestServer(t)
defer svr.Stop()
<-ch
// wait for the server to process the last messages
time.Sleep(time.Second)
svr.meta.RLock()
defer svr.meta.RUnlock()
for _, segID := range segmentIDs {
seg, has := svr.meta.segments[segID]
assert.True(t, has)
if has {
assert.Equal(t, segRows, seg.NumRows)
}
}
})
t.Run("Test ResumeSegmentFlushChannel", func(t *testing.T) {
genMsg := func(msgType commonpb.MsgType, t Timestamp, segID int64) *msgstream.FlushCompletedMsg {
return &msgstream.FlushCompletedMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{0},
},
SegmentFlushCompletedMsg: internalpb.SegmentFlushCompletedMsg{
Base: &commonpb.MsgBase{
MsgType: msgType,
MsgID: 0,
Timestamp: t,
SourceID: 0,
},
SegmentID: segID,
},
}
}
svr := newTestServer(t)
ch := make(chan struct{})
segInfoStream, _ := svr.msFactory.NewMsgStream(svr.ctx)
segInfoStream.AsProducer([]string{Params.SegmentInfoChannelName})
segInfoStream.Start()
defer segInfoStream.Close()
go func() {
for _, segID := range segmentIDs {
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentFlushDone, uint64(time.Now().Unix()), segID))
err := segInfoStream.Produce(&msgPack)
assert.Nil(t, err)
time.Sleep(time.Millisecond * 5)
}
ch <- struct{}{}
}()
time.Sleep(time.Millisecond * 50)
// stop the current server, simulating a server quit
svr.Stop()
time.Sleep(time.Second)
// start a new test server, simulating a restart
svr = newTestServer(t)
defer svr.Stop()
<-ch
// wait for the server to process the last messages
time.Sleep(time.Second)
// assertion part
svr.meta.RLock()
defer svr.meta.RUnlock()
for _, segID := range segmentIDs {
seg, has := svr.meta.segments[segID]
assert.True(t, has)
if has {
assert.Equal(t, seg.State, commonpb.SegmentState_Flushed)
}
}
})
t.Run("Clean up test segments", func(t *testing.T) {
svr := newTestServer(t)
defer closeTestServer(t, svr)
var err error
for _, segID := range segmentIDs {
err = svr.meta.DropSegment(segID)
assert.Nil(t, err)
}
})
}
func newTestServer(t *testing.T) *Server {
Params.Init()
var err error
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package dataservice
import (
"errors"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/msgstream"
)
type streamType int
const (
_ streamType = iota
streamTypeFlush
streamTypeStats
)
var (
errInvalidStreamType = errors.New("invalid stream type")
)
// storeStreamPos stores the current processed stream pos
func (s *Server) storeStreamPos(st streamType, pos *msgstream.MsgPosition) error {
key := s.streamTypeSubKey(st)
if key == "" {
return errInvalidStreamType
}
val := proto.MarshalTextString(pos)
return s.kvClient.Save(key, val)
}
// loadStreamLastPos loads the last successful pos for the specified stream type
func (s *Server) loadStreamLastPos(st streamType) (pos *msgstream.MsgPosition, err error) {
key := s.streamTypeSubKey(st)
if key == "" {
return nil, errInvalidStreamType
}
var val string
pos = &msgstream.MsgPosition{}
val, err = s.kvClient.Load(key)
if err != nil {
return pos, err
}
err = proto.UnmarshalText(val, pos)
return pos, err
}
// streamTypeSubKey converts a stream type to the corresponding k-v store key
func (s *Server) streamTypeSubKey(st streamType) string {
switch st {
case streamTypeFlush:
return Params.FlushStreamPosSubPath
case streamTypeStats:
return Params.StatsStreamPosSubPath
default:
return ""
}
}
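A hedged usage sketch of the two helpers above: persist the position of a processed message, then load it back and seek after a restart. Only ChannelName is set here and its value is made up; statsStream stands for the consumer stream created in startStatsChannel, and error handling is elided:

// store the last processed position
pos := &msgstream.MsgPosition{
	ChannelName: "stats-channel-example", // hypothetical value; other fields omitted
}
if err := s.storeStreamPos(streamTypeStats, pos); err != nil {
	// Save failed; a later restart falls back to an older stored position
}

// on restart, restore and seek before starting the stream
if restored, err := s.loadStreamLastPos(streamTypeStats); err == nil {
	_ = statsStream.Seek(restored)
}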