提交 e7521afe 编写于 作者: S sunby 提交者: zhenshan.cao

Change SaveBinlogPath (#5576)

Signed-off-by: sunby <bingyi.sun@zilliz.com>
上级 d0d84580
......@@ -3,10 +3,10 @@ package dataservice
import (
"errors"
"path"
"sort"
"strconv"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
)
......@@ -174,42 +174,39 @@ func (s *Server) getDDLBinlogMeta(collID UniqueID) (metas []*datapb.DDLBinlogMet
}
// GetVChanPositions gets the latest vchannel positions for the provided dml channel names
func (s *Server) GetVChanPositions(vchans []vchannel) ([]*datapb.VchannelPair, error) {
func (s *Server) GetVChanPositions(vchans []vchannel) ([]*datapb.VchannelInfo, error) {
if s.kvClient == nil {
return nil, errNilKvClient
}
pairs := make([]*datapb.VchannelPair, 0, len(vchans))
pairs := make([]*datapb.VchannelInfo, 0, len(vchans))
for _, vchan := range vchans {
segments := s.meta.GetSegmentsByChannel(vchan.DmlChannel)
sort.Slice(segments, func(i, j int) bool {
return segments[i].ID < segments[j].ID
})
dmlPos := &datapb.PositionPair{}
ddlPos := &datapb.PositionPair{}
flushedSegmentIDs := make([]UniqueID, 0)
unflushedCheckpoints := make([]*datapb.CheckPoint, 0)
for _, s := range segments {
if s.State == commonpb.SegmentState_Flushing || s.State == commonpb.SegmentState_Flushed {
flushedSegmentIDs = append(flushedSegmentIDs, s.ID)
continue
}
zp := zeroPos(vchan.DmlChannel)
dmlPos.StartPosition = &zp
dmlPos.EndPosition = &zp
ddlPos.StartPosition = &zp
ddlPos.EndPosition = &zp
if s.DmlPosition == nil {
continue
}
// find the last segment with not-nil position
for i := 0; i < len(segments); i++ {
if segments[i].DmlPosition == nil {
break
cp := &datapb.CheckPoint{
SegmentID: s.ID,
Position: s.DmlPosition,
NumOfRows: s.NumOfRows,
}
dmlPos = segments[i].DmlPosition
ddlPos = segments[i].DdlPosition
unflushedCheckpoints = append(unflushedCheckpoints, cp)
}
pairs = append(pairs, &datapb.VchannelPair{
pairs = append(pairs, &datapb.VchannelInfo{
CollectionID: vchan.CollectionID,
DmlVchannelName: vchan.DmlChannel,
DdlVchannelName: vchan.DdlChannel,
DdlPosition: ddlPos,
DmlPosition: dmlPos,
ChannelName: vchan.DmlChannel,
FlushedSegments: flushedSegmentIDs,
CheckPoints: unflushedCheckpoints,
})
}
return pairs, nil
......
......@@ -23,24 +23,21 @@ type vchannel struct {
// positionProvider provides per-vchannel position info (flushed segments and checkpoints)
type positionProvider interface {
GetVChanPositions(vchans []vchannel) ([]*datapb.VchannelPair, error)
GetVChanPositions(vchans []vchannel) ([]*datapb.VchannelInfo, error)
GetDdlChannel() string
}
type dummyPosProvider struct{}
// GetVChanPositions implements positionProvider
func (dp dummyPosProvider) GetVChanPositions(vchans []vchannel) ([]*datapb.VchannelPair, error) {
pairs := make([]*datapb.VchannelPair, len(vchans))
func (dp dummyPosProvider) GetVChanPositions(vchans []vchannel) ([]*datapb.VchannelInfo, error) {
pairs := make([]*datapb.VchannelInfo, len(vchans))
for _, vchan := range vchans {
dmlPos := &datapb.PositionPair{}
ddlPos := &datapb.PositionPair{}
pairs = append(pairs, &datapb.VchannelPair{
pairs = append(pairs, &datapb.VchannelInfo{
CollectionID: vchan.CollectionID,
DmlVchannelName: vchan.DmlChannel,
DdlVchannelName: vchan.DmlChannel,
DdlPosition: ddlPos,
DmlPosition: dmlPos,
ChannelName: vchan.DmlChannel,
FlushedSegments: []int64{},
CheckPoints: []*datapb.CheckPoint{},
})
}
return pairs, nil
......
......@@ -165,8 +165,8 @@ func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentSta
state.Status.ErrorCode = commonpb.ErrorCode_Success
state.State = segmentInfo.State
if segmentInfo.DmlPosition != nil {
state.StartPosition = segmentInfo.DmlPosition.StartPosition
state.EndPosition = segmentInfo.DmlPosition.EndPosition
state.StartPosition = segmentInfo.DmlPosition
// FIXME: this RPC should no longer be needed
} else {
}
......@@ -295,10 +295,6 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
resp.Reason = "server is closed"
return resp, nil
}
if s.flushMsgStream == nil {
resp.Reason = "flush msg stream nil"
return resp, nil
}
// check segment id & collection id matched
_, err := s.meta.GetCollection(req.GetCollectionID())
......@@ -308,26 +304,26 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
return resp, nil
}
meta, err := s.prepareBinlog(req)
binlogs, err := s.prepareBinlog(req)
if err != nil {
log.Error("prepare binlog meta failed", zap.Error(err))
resp.Reason = err.Error()
return resp, nil
}
// set segment to SegmentState_Flushing
err = s.meta.FlushSegmentWithBinlogAndPos(req.SegmentID, nil,
nil, meta)
// save binlogs and checkpoints; the segment is set to SegmentState_Flushing only when req.Flushed is true
err = s.meta.SaveBinlogAndCheckPoints(req.SegmentID, req.Flushed, binlogs, req.CheckPoints)
if err != nil {
resp.Reason = err.Error()
return resp, nil
}
log.Debug("flush segment with meta", zap.Int64("id", req.SegmentID),
zap.Any("meta", meta))
zap.Any("meta", binlogs))
s.segmentManager.DropSegment(ctx, req.SegmentID)
s.flushCh <- req.SegmentID
if req.Flushed {
s.segmentManager.DropSegment(ctx, req.SegmentID)
s.flushCh <- req.SegmentID
}
resp.ErrorCode = commonpb.ErrorCode_Success
return resp, nil
}
......
......@@ -16,6 +16,8 @@ import (
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
......@@ -141,7 +143,7 @@ func (m *meta) GetNumRowsOfCollection(collectionID UniqueID) (int64, error) {
var ret int64 = 0
for _, info := range m.segments {
if info.CollectionID == collectionID {
ret += info.NumRows
ret += info.NumOfRows
}
}
return ret, nil
......@@ -167,7 +169,7 @@ func (m *meta) UpdateSegmentStatistic(stats *internalpb.SegmentStatisticsUpdates
if !ok {
return newErrSegmentNotFound(stats.SegmentID)
}
seg.NumRows = stats.NumRows
seg.NumOfRows = stats.NumRows
if err := m.saveSegmentInfo(seg); err != nil {
return err
}
......@@ -231,27 +233,34 @@ func (m *meta) SealSegment(segID UniqueID) error {
return nil
}
func (m *meta) FlushSegmentWithBinlogAndPos(segID UniqueID,
dmlPositionPair *datapb.PositionPair,
ddlPositionPair *datapb.PositionPair,
binlogMeta map[string]string) error {
func (m *meta) SaveBinlogAndCheckPoints(segID UniqueID, flushed bool,
binlogs map[string]string, checkpoints []*datapb.CheckPoint) error {
m.Lock()
defer m.Unlock()
segInfo, ok := m.segments[segID]
if !ok {
return newErrSegmentNotFound(segID)
}
kv := make(map[string]string)
for k, v := range binlogMeta {
for k, v := range binlogs {
kv[k] = v
}
segInfo.State = commonpb.SegmentState_Flushing
segInfo.DmlPosition = dmlPositionPair
segInfo.DdlPosition = ddlPositionPair
segBytes := proto.MarshalTextString(segInfo)
key := m.prepareSegmentPath(segInfo)
kv[key] = segBytes
if flushed {
segInfo.State = commonpb.SegmentState_Flushing
}
for _, cp := range checkpoints {
segment, ok := m.segments[cp.SegmentID]
if !ok {
log.Warn("Failed to find segment", zap.Int64("id", cp.SegmentID))
continue
}
segment.DmlPosition = cp.Position
segment.NumOfRows = cp.NumOfRows
segBytes := proto.MarshalTextString(segInfo)
key := m.prepareSegmentPath(segInfo)
kv[key] = segBytes
}
if err := m.saveKvTxn(kv); err != nil {
return err
......@@ -382,7 +391,7 @@ func (m *meta) GetNumRowsOfPartition(collectionID UniqueID, partitionID UniqueID
var ret int64 = 0
for _, info := range m.segments {
if info.CollectionID == collectionID && info.PartitionID == partitionID {
ret += info.NumRows
ret += info.NumOfRows
}
}
return ret, nil
......@@ -440,7 +449,7 @@ func BuildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueI
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channelName,
NumRows: 0,
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
}, nil
}
......@@ -204,7 +204,7 @@ func TestMeta_Basic(t *testing.T) {
assert.Nil(t, err)
segInfo0, err := BuildSegment(collID, partID0, segID0, channelName)
assert.Nil(t, err)
segInfo0.NumRows = rowCount0
segInfo0.NumOfRows = rowCount0
err = meta.AddSegment(segInfo0)
assert.Nil(t, err)
......@@ -213,7 +213,7 @@ func TestMeta_Basic(t *testing.T) {
assert.Nil(t, err)
segInfo1, err := BuildSegment(collID, partID0, segID1, channelName)
assert.Nil(t, err)
segInfo1.NumRows = rowCount1
segInfo1.NumOfRows = rowCount1
err = meta.AddSegment(segInfo1)
assert.Nil(t, err)
......
......@@ -286,19 +286,18 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
lastExpireTime: 0,
currentRows: 0,
}
s.stats[id] = segStatus
segmentInfo := &datapb.SegmentInfo{
ID: id,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: channelName,
NumRows: 0,
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
MaxRowNum: int64(totalRows),
LastExpireTime: 0,
}
if err := s.meta.AddSegment(segmentInfo); err != nil {
return nil, err
}
......@@ -386,7 +385,7 @@ func (s *SegmentManager) tryToSealSegment() error {
if segStatus.sealed {
continue
}
sealed, err := s.checkSegmentSealed(segStatus)
sealed, err := s.shouldSeal(segStatus)
if err != nil {
return err
}
......@@ -402,7 +401,7 @@ func (s *SegmentManager) tryToSealSegment() error {
return nil
}
func (s *SegmentManager) checkSegmentSealed(segStatus *segmentStatus) (bool, error) {
func (s *SegmentManager) shouldSeal(segStatus *segmentStatus) (bool, error) {
var allocSize int64
for _, allocation := range segStatus.allocations {
allocSize += allocation.rowNums
......
......@@ -241,7 +241,7 @@ func TestGetSegmentStates(t *testing.T) {
CollectionID: 100,
PartitionID: 0,
InsertChannel: "",
NumRows: 0,
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
})
assert.Nil(t, err)
......@@ -496,6 +496,19 @@ func TestSaveBinlogPaths(t *testing.T) {
TsBinlogPath: "/by-dev/test/0/ts/Allo8",
},
},
CheckPoints: []*datapb.CheckPoint{
{
SegmentID: 0,
Position: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 0,
},
NumOfRows: 10,
},
},
Flushed: false,
})
assert.Nil(t, err)
assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_Success)
......@@ -527,6 +540,11 @@ func TestSaveBinlogPaths(t *testing.T) {
assert.EqualValues(t, "/by-dev/test/0/ddl/Allo9", collMetas[1].DdlBinlogPath)
}
segmentInfo, err := svr.meta.GetSegment(0)
assert.Nil(t, err)
assert.EqualValues(t, segmentInfo.DmlPosition.ChannelName, "ch1")
assert.EqualValues(t, segmentInfo.DmlPosition.MsgID, []byte{1, 2, 3})
assert.EqualValues(t, segmentInfo.NumOfRows, 10)
})
t.Run("Abnormal SaveRequest", func(t *testing.T) {
ctx := context.Background()
......@@ -660,6 +678,7 @@ func TestGetVChannelPos(t *testing.T) {
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
})
assert.Nil(t, err)
err = svr.meta.AddSegment(&datapb.SegmentInfo{
......@@ -667,32 +686,35 @@ func TestGetVChannelPos(t *testing.T) {
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Growing,
DmlPosition: &internalpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 0,
},
})
assert.Nil(t, err)
req := &datapb.SaveBinlogPathsRequest{
SegmentID: 1,
CollectionID: 0,
Field2BinlogPaths: []*datapb.ID2PathList{},
DdlBinlogPaths: []*datapb.DDLBinlogMeta{},
}
status, err := svr.SaveBinlogPaths(context.TODO(), req)
err = svr.meta.AddSegment(&datapb.SegmentInfo{
ID: 3,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Growing,
})
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, status.ErrorCode)
t.Run("get unexisted channel", func(t *testing.T) {
pair, err := svr.GetVChanPositions([]vchannel{
{
CollectionID: 0,
DmlChannel: "chx1",
DdlChannel: "chx2",
},
})
assert.Nil(t, err)
assert.EqualValues(t, 1, len(pair))
assert.Nil(t, pair[0].DmlPosition.StartPosition.MsgID)
assert.Nil(t, pair[0].DmlPosition.EndPosition.MsgID)
assert.Nil(t, pair[0].DdlPosition.StartPosition.MsgID)
assert.Nil(t, pair[0].DdlPosition.EndPosition.MsgID)
assert.Empty(t, pair[0].CheckPoints)
assert.Empty(t, pair[0].FlushedSegments)
})
t.Run("get existed channel", func(t *testing.T) {
......@@ -706,10 +728,11 @@ func TestGetVChannelPos(t *testing.T) {
assert.Nil(t, err)
assert.EqualValues(t, 1, len(pair))
assert.EqualValues(t, 0, pair[0].CollectionID)
// assert.EqualValues(t, []byte{1, 2, 3}, pair[0].DmlPosition.StartPosition.MsgID)
// assert.EqualValues(t, []byte{3, 4, 5}, pair[0].DmlPosition.EndPosition.MsgID)
// assert.EqualValues(t, []byte{1, 2, 3}, pair[0].DdlPosition.StartPosition.MsgID)
// assert.EqualValues(t, []byte{3, 4, 5}, pair[0].DdlPosition.EndPosition.MsgID)
assert.EqualValues(t, 1, len(pair[0].FlushedSegments))
assert.EqualValues(t, 1, pair[0].FlushedSegments[0])
assert.EqualValues(t, 1, len(pair[0].CheckPoints))
assert.EqualValues(t, 2, pair[0].CheckPoints[0].SegmentID)
assert.EqualValues(t, []byte{1, 2, 3}, pair[0].CheckPoints[0].Position.MsgID)
})
}
......
......@@ -771,7 +771,7 @@ func (c *Core) SetDataService(ctx context.Context, s types.DataService) error {
retErr = nil
return
}
retRows = segInfo.Infos[0].NumRows
retRows = segInfo.Infos[0].NumOfRows
retErr = nil
return
}
......
......@@ -99,8 +99,8 @@ func (d *dataMock) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInf
},
Infos: []*datapb.SegmentInfo{
{
NumRows: Params.MinSegmentSizeToEnableIndex,
State: commonpb.SegmentState_Flushed,
NumOfRows: Params.MinSegmentSizeToEnableIndex,
State: commonpb.SegmentState_Flushed,
},
},
}, nil
......
......@@ -169,13 +169,6 @@ message GetPartitionStatisticsResponse {
message GetSegmentInfoChannelRequest {
}
message VchannelPair {
int64 collectionID = 1;
string dml_vchannel_name = 2;
string ddl_vchannel_name = 3;
PositionPair ddl_position = 4;
PositionPair dml_position = 5;
}
message VchannelInfo {
int64 collectionID = 1;
......@@ -239,12 +232,11 @@ message SegmentInfo {
int64 collectionID = 2;
int64 partitionID = 3;
string insert_channel = 4;
int64 num_rows = 5;
int64 num_of_rows = 5;
common.SegmentState state = 6;
PositionPair dmlPosition = 7;
PositionPair ddlPosition = 8;
int64 max_row_num = 9;
uint64 last_expire_time = 10;
internal.MsgPosition dml_position = 7;
int64 max_row_num = 8;
uint64 last_expire_time = 9;
}
message ID2PathList {
......@@ -269,9 +261,8 @@ message SaveBinlogPathsRequest {
message CheckPoint {
int64 segmentID = 1;
int64 collectionID = 2;
internal.MsgPosition position = 3;
int64 rows = 4;
internal.MsgPosition position = 2;
int64 num_of_rows = 3;
}
message DataNodeTtMsg {
......
......@@ -1445,7 +1445,7 @@ func (node *ProxyNode) GetPersistentSegmentInfo(ctx context.Context, req *milvus
SegmentID: info.ID,
CollectionID: info.CollectionID,
PartitionID: info.PartitionID,
NumRows: info.NumRows,
NumRows: info.NumOfRows,
State: info.State,
}
}
......
......@@ -2892,9 +2892,9 @@ func (gibpt *GetIndexBuildProgressTask) Execute(ctx context.Context) error {
indexed := int64(0)
for _, info := range infoResp.Infos {
total += info.NumRows
total += info.NumOfRows
if buildFinishMap[buildIndexMap[info.ID]] {
indexed += info.NumRows
indexed += info.NumOfRows
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册