提交 1aafe86f 编写于 作者: X XuanYang-cn 提交者: yefu.chen

Change MsgPosition Logic

Signed-off-by: NXuanYang-cn <xuan.yang@zilliz.com>
上级 b92ff69c
......@@ -21,8 +21,7 @@ type collectionReplica interface {
hasCollection(collectionID UniqueID) bool
// segment
addSegment(segmentID UniqueID, collID UniqueID, partitionID UniqueID,
positions []*internalpb2.MsgPosition) error
addSegment(segmentID UniqueID, collID UniqueID, partitionID UniqueID, channelName string) error
removeSegment(segmentID UniqueID) error
hasSegment(segmentID UniqueID) bool
updateStatistics(segmentID UniqueID, numRows int64) error
......@@ -32,16 +31,16 @@ type collectionReplica interface {
type (
Segment struct {
segmentID UniqueID
collectionID UniqueID
partitionID UniqueID
numRows int64
memorySize int64
isNew bool
createTime Timestamp // not using
endTime Timestamp // not using
startPositions []*internalpb2.MsgPosition
endPositions []*internalpb2.MsgPosition // not using
segmentID UniqueID
collectionID UniqueID
partitionID UniqueID
numRows int64
memorySize int64
isNew bool
createTime Timestamp // not using
endTime Timestamp // not using
startPosition *internalpb2.MsgPosition
endPosition *internalpb2.MsgPosition // not using
}
collectionReplicaImpl struct {
......@@ -74,21 +73,28 @@ func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Se
return nil, errors.Errorf("Cannot find segment, id = %v", segmentID)
}
func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, collID UniqueID,
partitionID UniqueID, positions []*internalpb2.MsgPosition) error {
func (colReplica *collectionReplicaImpl) addSegment(
segmentID UniqueID,
collID UniqueID,
partitionID UniqueID,
channelName string) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
log.Println("Add Segment", segmentID)
position := &internalpb2.MsgPosition{
ChannelName: channelName,
}
seg := &Segment{
segmentID: segmentID,
collectionID: collID,
partitionID: partitionID,
isNew: true,
createTime: 0,
startPositions: positions,
endPositions: make([]*internalpb2.MsgPosition, 0),
segmentID: segmentID,
collectionID: collID,
partitionID: partitionID,
isNew: true,
createTime: 0,
startPosition: position,
endPosition: new(internalpb2.MsgPosition),
}
colReplica.segments = append(colReplica.segments, seg)
return nil
......@@ -151,7 +157,7 @@ func (colReplica *collectionReplicaImpl) getSegmentStatisticsUpdates(segmentID U
}
if ele.isNew {
updates.StartPositions = ele.startPositions
updates.StartPosition = ele.startPosition
ele.isNew = false
}
return updates, nil
......
......@@ -5,7 +5,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
func initTestReplicaMeta(t *testing.T, replica collectionReplica, collectionName string, collectionID UniqueID, segmentID UniqueID) {
......@@ -112,7 +111,7 @@ func TestReplica_Segment(t *testing.T) {
replica := newReplica()
assert.False(t, replica.hasSegment(0))
err := replica.addSegment(0, 1, 2, make([]*internalpb2.MsgPosition, 0))
err := replica.addSegment(0, 1, 2, "insert-01")
assert.NoError(t, err)
assert.True(t, replica.hasSegment(0))
......
......@@ -139,7 +139,7 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
DropPartitionRequest: dropPartitionReq,
}
replica.addSegment(1, collID, partitionID, make([]*internalpb2.MsgPosition, 0))
replica.addSegment(1, collID, partitionID, "insert-01")
inFlushCh <- &flushMsg{
msgID: 5,
timestamp: 5,
......
......@@ -112,7 +112,7 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
partitionID := msg.GetPartitionID()
if !ibNode.replica.hasSegment(currentSegID) {
err := ibNode.replica.addSegment(currentSegID, collID, partitionID, iMsg.startPositions)
err := ibNode.replica.addSegment(currentSegID, collID, partitionID, msg.GetChannelID())
if err != nil {
log.Println("Error: add segment error", err)
}
......@@ -134,13 +134,22 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
uniqueSeg[currentSegID] = true
}
}
segIDs := make([]UniqueID, 0, len(uniqueSeg))
for id := range uniqueSeg {
segIDs = append(segIDs, id)
}
err := ibNode.updateSegStatistics(segIDs)
if err != nil {
log.Println("Error: update segment statistics error, ", err)
if len(segIDs) > 0 {
switch {
case iMsg.startPositions == nil || len(iMsg.startPositions) <= 0:
log.Println("Warning: insert Msg StartPosition empty")
default:
err := ibNode.updateSegStatistics(segIDs, iMsg.startPositions[0])
if err != nil {
log.Println("Error: update segment statistics error, ", err)
}
}
}
// iMsg is insertMsg
......@@ -579,7 +588,7 @@ func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
return ibNode.timeTickStream.Produce(&msgPack)
}
func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID) error {
func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID, currentPosition *internalpb2.MsgPosition) error {
log.Println("Updating segments statistics...")
statsUpdates := make([]*internalpb2.SegmentStatisticsUpdates, 0, len(segIDs))
for _, segID := range segIDs {
......@@ -588,6 +597,8 @@ func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID) error {
log.Println("Error get segment", segID, "statistics updates", err)
continue
}
updates.StartPosition.Timestamp = currentPosition.GetTimestamp()
updates.StartPosition.MsgID = currentPosition.GetMsgID()
statsUpdates = append(statsUpdates, updates)
}
......@@ -603,7 +614,7 @@ func (ibNode *insertBufferNode) updateSegStatistics(segIDs []UniqueID) error {
var msg msgstream.TsMsg = &msgstream.SegmentStatisticsMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{0},
HashValues: []uint32{0}, // GOOSE TODO
},
SegmentStatistics: segStats,
}
......
......@@ -648,8 +648,8 @@ func (s *Server) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.Seg
state.CreateTime = segmentInfo.OpenTime
state.SealedTime = segmentInfo.SealedTime
state.FlushedTime = segmentInfo.FlushedTime
state.StartPositions = segmentInfo.StartPosition
state.EndPositions = segmentInfo.EndPosition
state.StartPosition = segmentInfo.StartPosition
state.EndPosition = segmentInfo.EndPosition
}
resp.States = append(resp.States, state)
}
......
......@@ -22,23 +22,9 @@ func (handler *statsHandler) HandleSegmentStat(segStats *internalpb2.SegmentStat
if segStats.IsNewSegment {
segMeta.OpenTime = segStats.CreateTime
segMeta.StartPosition = append(segMeta.StartPosition, segStats.StartPositions...)
segMeta.StartPosition = segStats.StartPosition
}
segMeta.SealedTime = segStats.EndTime
for _, pos := range segStats.EndPositions {
isNew := true
for _, epos := range segMeta.EndPosition {
if epos.ChannelName == pos.ChannelName {
epos.Timestamp = pos.Timestamp
epos.MsgID = pos.MsgID
isNew = false
break
}
}
if isNew {
segMeta.EndPosition = append(segMeta.EndPosition, pos)
}
}
segMeta.NumRows = segStats.NumRows
segMeta.MemSize = segStats.MemorySize
......
......@@ -82,8 +82,8 @@ message SegmentStateInfo {
uint64 create_time = 3;
uint64 sealed_time = 4;
uint64 flushed_time = 5;
repeated internal.MsgPosition start_positions = 6;
repeated internal.MsgPosition end_positions = 7;
internal.MsgPosition start_position = 6;
internal.MsgPosition end_position = 7;
common.Status status = 8;
}
......@@ -145,8 +145,8 @@ message SegmentInfo {
int64 num_rows = 8;
int64 mem_size = 9;
common.SegmentState state = 10;
repeated internal.MsgPosition start_position = 11;
repeated internal.MsgPosition end_position = 12;
internal.MsgPosition start_position = 11;
internal.MsgPosition end_position = 12;
}
message SegmentMsg{
......
......@@ -179,8 +179,8 @@ message SegmentStatisticsUpdates {
int64 NumRows = 3;
uint64 create_time = 4;
uint64 end_time = 5;
repeated internal.MsgPosition start_positions = 6;
repeated internal.MsgPosition end_positions = 7;
internal.MsgPosition start_position = 6;
internal.MsgPosition end_position = 7;
bool isNewSegment = 8;
}
......
......@@ -442,10 +442,8 @@ func (node *NodeImpl) GetIndexState(request *milvuspb.IndexStateRequest) (*milvu
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
dipt := &GetIndexStateTask{
Condition: NewTaskCondition(ctx),
IndexStateRequest: request,
indexServiceClient: node.indexServiceClient,
masterClientInterface: node.masterClient,
Condition: NewTaskCondition(ctx),
IndexStateRequest: request,
}
err := node.sched.DdQueue.Enqueue(dipt)
......
......@@ -23,7 +23,6 @@ type MasterClient interface {
CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error)
DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error)
ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error)
DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error)
}
type IndexServiceClient interface {
......
......@@ -6,8 +6,6 @@ import (
"math"
"strconv"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/golang/protobuf/proto"
......@@ -1287,9 +1285,8 @@ func (dit *DescribeIndexTask) PostExecute() error {
type GetIndexStateTask struct {
Condition
*milvuspb.IndexStateRequest
indexServiceClient IndexServiceClient
masterClientInterface MasterClient
result *milvuspb.IndexStateResponse
indexServiceClient IndexServiceClient
result *milvuspb.IndexStateResponse
}
func (dipt *GetIndexStateTask) OnEnqueue() error {
......@@ -1339,98 +1336,17 @@ func (dipt *GetIndexStateTask) PreExecute() error {
}
func (dipt *GetIndexStateTask) Execute() error {
collectionName := dipt.CollectionName
collectionID, err := globalMetaCache.GetCollectionID(collectionName)
if err != nil { // err is not nil if collection not exists
return err
}
showPartitionRequest := &milvuspb.ShowPartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowPartitions,
MsgID: dipt.Base.MsgID,
Timestamp: dipt.Base.Timestamp,
SourceID: Params.ProxyID,
},
DbName: dipt.DbName,
CollectionName: collectionName,
CollectionID: collectionID,
}
partitions, err := dipt.masterClientInterface.ShowPartitions(showPartitionRequest)
if err != nil {
return err
}
for _, partitionID := range partitions.PartitionIDs {
showSegmentsRequest := &milvuspb.ShowSegmentRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowSegment,
MsgID: dipt.Base.MsgID,
Timestamp: dipt.Base.Timestamp,
SourceID: Params.ProxyID,
},
CollectionID: collectionID,
PartitionID: partitionID,
}
segments, err := dipt.masterClientInterface.ShowSegments(showSegmentsRequest)
if err != nil {
return err
}
getIndexStatesRequest := &indexpb.IndexStatesRequest{
IndexBuildIDs: make([]UniqueID, 0),
}
for _, segmentID := range segments.SegmentIDs {
describeSegmentRequest := &milvuspb.DescribeSegmentRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kDescribeSegment,
MsgID: dipt.Base.MsgID,
Timestamp: dipt.Base.Timestamp,
SourceID: Params.ProxyID,
},
CollectionID: collectionID,
SegmentID: segmentID,
}
segmentDesc, err := dipt.masterClientInterface.DescribeSegment(describeSegmentRequest)
if err != nil {
return err
}
getIndexStatesRequest.IndexBuildIDs = append(getIndexStatesRequest.IndexBuildIDs, segmentDesc.BuildID)
}
states, err := dipt.indexServiceClient.GetIndexStates(getIndexStatesRequest)
if err != nil {
return err
}
if states.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
dipt.result = &milvuspb.IndexStateResponse{
Status: states.Status,
State: commonpb.IndexState_FAILED,
}
return nil
}
for _, state := range states.States {
if state.State != commonpb.IndexState_FINISHED {
dipt.result = &milvuspb.IndexStateResponse{
Status: states.Status,
State: state.State,
}
return nil
}
}
}
// TODO: use index service client
//var err error
//dipt.result, err = dipt.masterClient.GetIndexState(dipt.IndexStateRequest)
//return err
dipt.result = &milvuspb.IndexStateResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
ErrorCode: 0,
Reason: "",
},
State: commonpb.IndexState_FINISHED,
}
return nil
}
......
......@@ -383,7 +383,7 @@ func (s *loadService) releaseSegment(segmentID UniqueID) error {
return err
}
func (s *loadService) seekSegment(positions []*internalpb2.MsgPosition) error {
func (s *loadService) seekSegment(position *internalpb2.MsgPosition) error {
// TODO: open seek
//for _, position := range positions {
// err := s.dmStream.Seek(position)
......
......@@ -446,8 +446,8 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S
// segments are ordered before LoadSegments calling
if in.LastSegmentState.State == commonpb.SegmentState_SegmentGrowing {
segmentNum := len(segmentIDs)
positions := in.LastSegmentState.StartPositions
err = node.loadService.seekSegment(positions)
position := in.LastSegmentState.StartPosition
err = node.loadService.seekSegment(position)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
......
......@@ -318,14 +318,14 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
segmentID := state.SegmentID
segmentStates[segmentID] = state
var flatChannelName string
channelNames := make([]string, 0)
for i, str := range state.StartPositions {
flatChannelName += str.ChannelName
channelNames = append(channelNames, str.ChannelName)
if i+1 < len(state.StartPositions) {
flatChannelName += "/"
}
}
// channelNames := make([]string, 0)
// for i, str := range state.StartPositions {
// flatChannelName += str.ChannelName
// channelNames = append(channelNames, str.ChannelName)
// if i+1 < len(state.StartPositions) {
// flatChannelName += "/"
// }
// }
if flatChannelName == "" {
log.Fatal("segmentState's channel name is empty")
}
......@@ -365,8 +365,8 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
if channels == node.insertChannels {
statesID := id2segs[i][len(id2segs[i])-1]
//TODO :: should be start position
position := segmentStates[statesID-1].StartPositions
segmentStates[statesID].StartPositions = position
// position := segmentStates[statesID-1].StartPositions
// segmentStates[statesID].StartPositions = position
loadSegmentRequest := &querypb.LoadSegmentRequest{
CollectionID: collectionID,
PartitionID: partitionID,
......
......@@ -115,10 +115,10 @@ func newDataMock() *dataMock {
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
SegmentID: segmentID,
State: state,
CreateTime: time,
StartPositions: position,
SegmentID: segmentID,
State: state,
CreateTime: time,
// StartPositions: position,
}
}
segmentStates := make(map[UniqueID]*datapb.SegmentStateInfo)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册