提交 3afa7dd5 编写于 作者: Z zhenshan.cao 提交者: yefu.chen

Fix GetSegmentStates

Signed-off-by: Nzhenshan.cao <zhenshan.cao@zilliz.com>
上级 5de89422
...@@ -132,7 +132,7 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg { ...@@ -132,7 +132,7 @@ func (ddNode *ddNode) Operate(in []*Msg) []*Msg {
} }
default: default:
log.Println(". default: do nothing ...") //log.Println(". default: do nothing ...")
} }
// generate binlog // generate binlog
......
...@@ -623,18 +623,28 @@ func (s *Server) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.Seg ...@@ -623,18 +623,28 @@ func (s *Server) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.Seg
return resp, nil return resp, nil
} }
segmentInfo, err := s.meta.GetSegment(req.SegmentID) for _, segmentID := range req.SegmentIDs {
if err != nil { state := &datapb.SegmentStateInfo{
resp.Status.Reason = "get segment states error: " + err.Error() Status: &commonpb.Status{},
return resp, nil SegmentID: segmentID,
}
segmentInfo, err := s.meta.GetSegment(segmentID)
if err != nil {
state.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
state.Status.Reason = "get segment states error: " + err.Error()
} else {
state.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
state.State = segmentInfo.State
state.CreateTime = segmentInfo.OpenTime
state.SealedTime = segmentInfo.SealedTime
state.FlushedTime = segmentInfo.FlushedTime
state.StartPositions = segmentInfo.StartPosition
state.EndPositions = segmentInfo.EndPosition
}
resp.States = append(resp.States, state)
} }
resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
resp.State = segmentInfo.State
resp.CreateTime = segmentInfo.OpenTime
resp.SealedTime = segmentInfo.SealedTime
resp.FlushedTime = segmentInfo.FlushedTime
resp.StartPositions = segmentInfo.StartPosition
resp.EndPositions = segmentInfo.EndPosition
return resp, nil return resp, nil
} }
......
...@@ -1271,7 +1271,7 @@ func (c *Core) AllocTimestamp(in *masterpb.TsoRequest) (*masterpb.TsoResponse, e ...@@ -1271,7 +1271,7 @@ func (c *Core) AllocTimestamp(in *masterpb.TsoRequest) (*masterpb.TsoResponse, e
Count: 0, Count: 0,
}, nil }, nil
} }
log.Printf("AllocTimestamp : %d", ts) // log.Printf("AllocTimestamp : %d", ts)
return &masterpb.TsoResponse{ return &masterpb.TsoResponse{
Status: &commonpb.Status{ Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS, ErrorCode: commonpb.ErrorCode_SUCCESS,
......
...@@ -82,17 +82,23 @@ enum SegmentState { ...@@ -82,17 +82,23 @@ enum SegmentState {
message SegmentStatesRequest { message SegmentStatesRequest {
common.MsgBase base = 1; common.MsgBase base = 1;
int64 segmentID = 2; repeated int64 segmentIDs = 2;
}
message SegmentStateInfo {
int64 segmentID = 1;
SegmentState state = 2;
uint64 create_time = 3;
uint64 sealed_time = 4;
uint64 flushed_time = 5;
repeated internal.MsgPosition start_positions = 6;
repeated internal.MsgPosition end_positions = 7;
common.Status status = 8;
} }
message SegmentStatesResponse { message SegmentStatesResponse {
SegmentState state = 1; common.Status status = 1;
uint64 create_time = 2; repeated SegmentStateInfo states = 2;
uint64 sealed_time = 3;
uint64 flushed_time = 4;
repeated internal.MsgPosition start_positions = 5;
repeated internal.MsgPosition end_positions = 6;
common.Status status = 7;
} }
message InsertBinlogPathRequest { message InsertBinlogPathRequest {
...@@ -128,15 +134,15 @@ message FlushSegRequest { ...@@ -128,15 +134,15 @@ message FlushSegRequest {
} }
message SegmentInfo { message SegmentInfo {
int64 segmentID=1; int64 segmentID = 1;
int64 collectionID =2; int64 collectionID = 2;
int64 partitionID=3; int64 partitionID = 3;
repeated string insert_channels = 4; repeated string insert_channels = 4;
uint64 open_time=5; uint64 open_time = 5;
uint64 sealed_time = 6; uint64 sealed_time = 6;
uint64 flushed_time = 7; uint64 flushed_time = 7;
int64 num_rows=8; int64 num_rows = 8;
int64 mem_size=9; int64 mem_size = 9;
SegmentState state = 10; SegmentState state = 10;
repeated internal.MsgPosition start_position = 11; repeated internal.MsgPosition start_position = 11;
repeated internal.MsgPosition end_position = 12; repeated internal.MsgPosition end_position = 12;
...@@ -171,41 +177,41 @@ message PartitionStatsResponse { ...@@ -171,41 +177,41 @@ message PartitionStatsResponse {
} }
message FieldFlushMeta { message FieldFlushMeta {
int64 fieldID = 1; int64 fieldID = 1;
repeated string binlog_paths = 2; repeated string binlog_paths = 2;
} }
message SegmentFlushMeta{ message SegmentFlushMeta{
int64 segmentID = 1; int64 segmentID = 1;
bool is_flushed = 2; bool is_flushed = 2;
repeated FieldFlushMeta fields = 5; repeated FieldFlushMeta fields = 5;
} }
message DDLFlushMeta { message DDLFlushMeta {
int64 collectionID = 1; int64 collectionID = 1;
repeated string binlog_paths = 2; repeated string binlog_paths = 2;
} }
service DataService { service DataService {
rpc RegisterNode(RegisterNodeRequest) returns (RegisterNodeResponse) {} rpc RegisterNode(RegisterNodeRequest) returns (RegisterNodeResponse) {}
rpc Flush(FlushRequest) returns (common.Status) {} rpc Flush(FlushRequest) returns (common.Status) {}
rpc AssignSegmentID(AssignSegIDRequest) returns (AssignSegIDResponse) {} rpc AssignSegmentID(AssignSegIDRequest) returns (AssignSegIDResponse) {}
rpc ShowSegments(ShowSegmentRequest) returns (ShowSegmentResponse) {} rpc ShowSegments(ShowSegmentRequest) returns (ShowSegmentResponse) {}
rpc GetSegmentStates(SegmentStatesRequest) returns (SegmentStatesResponse) {} rpc GetSegmentStates(SegmentStatesRequest) returns (SegmentStatesResponse) {}
rpc GetInsertBinlogPaths(InsertBinlogPathRequest) returns (InsertBinlogPathsResponse) {} rpc GetInsertBinlogPaths(InsertBinlogPathRequest) returns (InsertBinlogPathsResponse) {}
rpc GetInsertChannels(InsertChannelRequest) returns (internal.StringList) {} rpc GetInsertChannels(InsertChannelRequest) returns (internal.StringList) {}
rpc GetCollectionStatistics(CollectionStatsRequest) returns (CollectionStatsResponse) {} rpc GetCollectionStatistics(CollectionStatsRequest) returns (CollectionStatsResponse) {}
rpc GetPartitionStatistics(PartitionStatsRequest) returns (PartitionStatsResponse) {} rpc GetPartitionStatistics(PartitionStatsRequest) returns (PartitionStatsResponse) {}
rpc GetComponentStates(common.Empty) returns (internal.ComponentStates) {} rpc GetComponentStates(common.Empty) returns (internal.ComponentStates) {}
rpc GetTimeTickChannel(common.Empty) returns(milvus.StringResponse) {} rpc GetTimeTickChannel(common.Empty) returns(milvus.StringResponse) {}
rpc GetStatisticsChannel(common.Empty) returns(milvus.StringResponse){} rpc GetStatisticsChannel(common.Empty) returns(milvus.StringResponse){}
rpc GetSegmentInfoChannel(common.Empty) returns (milvus.StringResponse){} rpc GetSegmentInfoChannel(common.Empty) returns (milvus.StringResponse){}
} }
service DataNode { service DataNode {
......
...@@ -124,7 +124,7 @@ message LoadSegmentRequest { ...@@ -124,7 +124,7 @@ message LoadSegmentRequest {
int64 partitionID = 4; int64 partitionID = 4;
repeated int64 segmentIDs = 5; repeated int64 segmentIDs = 5;
repeated int64 fieldIDs = 6; repeated int64 fieldIDs = 6;
data.SegmentStatesResponse last_segment_state = 7; data.SegmentStateInfo last_segment_state = 7;
} }
message ReleaseSegmentRequest { message ReleaseSegmentRequest {
...@@ -152,7 +152,7 @@ service QueryService { ...@@ -152,7 +152,7 @@ service QueryService {
rpc LoadCollection(LoadCollectionRequest) returns (common.Status) {} rpc LoadCollection(LoadCollectionRequest) returns (common.Status) {}
rpc ReleaseCollection(ReleaseCollectionRequest) returns (common.Status) {} rpc ReleaseCollection(ReleaseCollectionRequest) returns (common.Status) {}
rpc CreateQueryChannel(common.Empty ) returns (CreateQueryChannelResponse) {} rpc CreateQueryChannel(common.Empty) returns (CreateQueryChannelResponse) {}
rpc GetTimeTickChannel(common.Empty) returns (milvus.StringResponse) {} rpc GetTimeTickChannel(common.Empty) returns (milvus.StringResponse) {}
rpc GetStatisticsChannel(common.Empty) returns (milvus.StringResponse) {} rpc GetStatisticsChannel(common.Empty) returns (milvus.StringResponse) {}
rpc GetPartitionStates(PartitionStatesRequest) returns (PartitionStatesResponse) {} rpc GetPartitionStates(PartitionStatesRequest) returns (PartitionStatesResponse) {}
......
...@@ -972,16 +972,16 @@ func (m *WatchDmChannelsRequest) GetChannelIDs() []string { ...@@ -972,16 +972,16 @@ func (m *WatchDmChannelsRequest) GetChannelIDs() []string {
} }
type LoadSegmentRequest struct { type LoadSegmentRequest struct {
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
DbID int64 `protobuf:"varint,2,opt,name=dbID,proto3" json:"dbID,omitempty"` DbID int64 `protobuf:"varint,2,opt,name=dbID,proto3" json:"dbID,omitempty"`
CollectionID int64 `protobuf:"varint,3,opt,name=collectionID,proto3" json:"collectionID,omitempty"` CollectionID int64 `protobuf:"varint,3,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
PartitionID int64 `protobuf:"varint,4,opt,name=partitionID,proto3" json:"partitionID,omitempty"` PartitionID int64 `protobuf:"varint,4,opt,name=partitionID,proto3" json:"partitionID,omitempty"`
SegmentIDs []int64 `protobuf:"varint,5,rep,packed,name=segmentIDs,proto3" json:"segmentIDs,omitempty"` SegmentIDs []int64 `protobuf:"varint,5,rep,packed,name=segmentIDs,proto3" json:"segmentIDs,omitempty"`
FieldIDs []int64 `protobuf:"varint,6,rep,packed,name=fieldIDs,proto3" json:"fieldIDs,omitempty"` FieldIDs []int64 `protobuf:"varint,6,rep,packed,name=fieldIDs,proto3" json:"fieldIDs,omitempty"`
LastSegmentState *datapb.SegmentStatesResponse `protobuf:"bytes,7,opt,name=last_segment_state,json=lastSegmentState,proto3" json:"last_segment_state,omitempty"` LastSegmentState *datapb.SegmentStateInfo `protobuf:"bytes,7,opt,name=last_segment_state,json=lastSegmentState,proto3" json:"last_segment_state,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
} }
func (m *LoadSegmentRequest) Reset() { *m = LoadSegmentRequest{} } func (m *LoadSegmentRequest) Reset() { *m = LoadSegmentRequest{} }
...@@ -1051,7 +1051,7 @@ func (m *LoadSegmentRequest) GetFieldIDs() []int64 { ...@@ -1051,7 +1051,7 @@ func (m *LoadSegmentRequest) GetFieldIDs() []int64 {
return nil return nil
} }
func (m *LoadSegmentRequest) GetLastSegmentState() *datapb.SegmentStatesResponse { func (m *LoadSegmentRequest) GetLastSegmentState() *datapb.SegmentStateInfo {
if m != nil { if m != nil {
return m.LastSegmentState return m.LastSegmentState
} }
...@@ -1211,81 +1211,81 @@ func init() { ...@@ -1211,81 +1211,81 @@ func init() {
func init() { proto.RegisterFile("query_service.proto", fileDescriptor_5fcb6756dc1afb8d) } func init() { proto.RegisterFile("query_service.proto", fileDescriptor_5fcb6756dc1afb8d) }
var fileDescriptor_5fcb6756dc1afb8d = []byte{ var fileDescriptor_5fcb6756dc1afb8d = []byte{
// 1174 bytes of a gzipped FileDescriptorProto // 1177 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0xdd, 0x6e, 0x1b, 0x45, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0xdd, 0x6e, 0x1b, 0x45,
0x14, 0xf6, 0xc6, 0x4e, 0x5a, 0x9f, 0xb8, 0xb6, 0x3b, 0xf9, 0xd5, 0x82, 0xaa, 0x32, 0x40, 0x9b, 0x14, 0xf6, 0xc6, 0x3f, 0xad, 0x4f, 0x5c, 0xdb, 0x9d, 0xe6, 0x4f, 0x0b, 0xaa, 0xca, 0x00, 0x6d,
0x1f, 0x70, 0x50, 0x2a, 0x21, 0xae, 0x40, 0x49, 0x5c, 0x45, 0x96, 0x68, 0x08, 0x9b, 0x94, 0x8a, 0x7e, 0xc0, 0x46, 0xae, 0x84, 0xb8, 0x02, 0x25, 0x71, 0x15, 0x59, 0xa2, 0x21, 0x5d, 0xa7, 0xaa,
0x40, 0x65, 0xd6, 0xbb, 0x83, 0x3d, 0xed, 0xfe, 0xb8, 0x3b, 0xe3, 0x40, 0x72, 0x03, 0x48, 0x5c, 0x08, 0x54, 0x66, 0xbd, 0x3b, 0xb5, 0xa7, 0xdd, 0x1f, 0x77, 0x67, 0x1c, 0x48, 0x6e, 0x00, 0x89,
0xc3, 0x33, 0x20, 0x10, 0x95, 0xb8, 0xe6, 0x6d, 0xb8, 0xe2, 0x11, 0x78, 0x03, 0xb4, 0xb3, 0xeb, 0x6b, 0x78, 0x06, 0x04, 0xa2, 0x12, 0x37, 0x3c, 0x10, 0x57, 0x3c, 0x02, 0x6f, 0x80, 0x76, 0x76,
0xcd, 0xce, 0x7a, 0x1c, 0x3b, 0x71, 0xdb, 0xf4, 0x6e, 0x67, 0xf6, 0x9c, 0xf3, 0x7d, 0xe7, 0xcc, 0xbd, 0xd9, 0x5d, 0x8f, 0x63, 0xa7, 0x6e, 0x1b, 0xee, 0x76, 0x66, 0xcf, 0x39, 0xdf, 0x77, 0xce,
0xcc, 0x99, 0x6f, 0x60, 0xee, 0x59, 0x8f, 0x04, 0x27, 0x4d, 0x46, 0x82, 0x63, 0x6a, 0x91, 0x5a, 0xcc, 0x9c, 0xf9, 0x06, 0x6e, 0x3c, 0x1f, 0x12, 0xef, 0xa4, 0xc3, 0x88, 0x77, 0x4c, 0x0d, 0x52,
0x37, 0xf0, 0xb9, 0x8f, 0x90, 0x4b, 0x9d, 0xe3, 0x1e, 0x8b, 0x46, 0x35, 0x61, 0xa1, 0x97, 0x2c, 0x1b, 0x78, 0x2e, 0x77, 0x11, 0xb2, 0xa9, 0x75, 0x3c, 0x64, 0xc1, 0xa8, 0x26, 0x2c, 0xd4, 0x92,
0xdf, 0x75, 0x7d, 0x2f, 0x9a, 0xd3, 0x4b, 0x69, 0x0b, 0xbd, 0x4c, 0x3d, 0x4e, 0x02, 0xcf, 0x74, 0xe1, 0xda, 0xb6, 0xeb, 0x04, 0x73, 0x6a, 0x29, 0x6e, 0xa1, 0x96, 0xa9, 0xc3, 0x89, 0xe7, 0xe8,
0xe2, 0x31, 0xb2, 0x4d, 0x6e, 0xca, 0x31, 0xf1, 0x0f, 0x30, 0x67, 0x90, 0x36, 0x65, 0x9c, 0x04, 0x56, 0x38, 0x46, 0xa6, 0xce, 0xf5, 0x64, 0x4c, 0xfc, 0x3d, 0xdc, 0xd0, 0x48, 0x8f, 0x32, 0x4e,
0x7b, 0xbe, 0x4d, 0x0c, 0xf2, 0xac, 0x47, 0x18, 0x47, 0x1f, 0x40, 0xa1, 0x65, 0x32, 0xb2, 0xac, 0xbc, 0x7d, 0xd7, 0x24, 0x1a, 0x79, 0x3e, 0x24, 0x8c, 0xa3, 0x8f, 0x20, 0xd7, 0xd5, 0x19, 0x59,
0xdd, 0xd6, 0x56, 0x66, 0x37, 0xdf, 0xac, 0x49, 0xc8, 0x31, 0xe4, 0x03, 0xd6, 0xde, 0x36, 0x19, 0x53, 0x6e, 0x29, 0xeb, 0x8b, 0x8d, 0xb7, 0x6b, 0x09, 0xe4, 0x10, 0xf2, 0x3e, 0xeb, 0xed, 0xe8,
0x31, 0x84, 0x25, 0xfa, 0x10, 0xae, 0x99, 0xb6, 0x1d, 0x10, 0xc6, 0x96, 0xa7, 0xce, 0x71, 0xda, 0x8c, 0x68, 0xc2, 0x12, 0x7d, 0x0c, 0x57, 0x74, 0xd3, 0xf4, 0x08, 0x63, 0x6b, 0x0b, 0xe7, 0x38,
0x8a, 0x6c, 0x8c, 0xbe, 0x31, 0xfe, 0x55, 0x83, 0x79, 0x99, 0x01, 0xeb, 0xfa, 0x1e, 0x23, 0xe8, 0x6d, 0x07, 0x36, 0xda, 0xc8, 0x18, 0xff, 0xa2, 0xc0, 0x52, 0x92, 0x01, 0x1b, 0xb8, 0x0e, 0x23,
0x1e, 0xcc, 0x30, 0x6e, 0xf2, 0x1e, 0x8b, 0x49, 0xbc, 0xa1, 0x8c, 0x77, 0x20, 0x4c, 0x8c, 0xd8, 0xe8, 0x2e, 0x14, 0x18, 0xd7, 0xf9, 0x90, 0x85, 0x24, 0xde, 0x92, 0xc6, 0x6b, 0x0b, 0x13, 0x2d,
0x14, 0x6d, 0xc3, 0x2c, 0xf5, 0x28, 0x6f, 0x76, 0xcd, 0xc0, 0x74, 0xfb, 0x4c, 0xde, 0x92, 0x3d, 0x34, 0x45, 0x3b, 0xb0, 0x48, 0x1d, 0xca, 0x3b, 0x03, 0xdd, 0xd3, 0xed, 0x11, 0x93, 0x77, 0x92,
0x93, 0xaa, 0x34, 0x3c, 0xca, 0xf7, 0x85, 0xa1, 0x01, 0x34, 0xf9, 0xc6, 0x8f, 0x61, 0xe1, 0xa0, 0x9e, 0x51, 0x55, 0x5a, 0x0e, 0xe5, 0x07, 0xc2, 0x50, 0x03, 0x1a, 0x7d, 0xe3, 0xc7, 0xb0, 0xdc,
0xe3, 0x7f, 0xb7, 0xe3, 0x3b, 0x0e, 0xb1, 0x38, 0xf5, 0xbd, 0xcb, 0x17, 0x05, 0x41, 0xc1, 0x6e, 0xee, 0xbb, 0xdf, 0xee, 0xba, 0x96, 0x45, 0x0c, 0x4e, 0x5d, 0xe7, 0xe5, 0x8b, 0x82, 0x20, 0x67,
0x35, 0xea, 0x82, 0x47, 0xde, 0x10, 0xdf, 0x98, 0xc1, 0x62, 0x36, 0xfc, 0x24, 0x19, 0xbf, 0x03, 0x76, 0x5b, 0x4d, 0xc1, 0x23, 0xab, 0x89, 0x6f, 0xcc, 0x60, 0x25, 0x1d, 0x7e, 0x9e, 0x8c, 0xdf,
0x37, 0xac, 0x24, 0x54, 0xa3, 0x1e, 0xe6, 0x9c, 0x5f, 0xc9, 0x1b, 0xf2, 0x24, 0xfe, 0x49, 0x83, 0x83, 0x6b, 0x46, 0x14, 0xaa, 0xd5, 0xf4, 0x73, 0xce, 0xae, 0x67, 0xb5, 0xe4, 0x24, 0xfe, 0x51,
0x85, 0x4f, 0x7d, 0xd3, 0x7e, 0x49, 0x49, 0x21, 0x0c, 0xa5, 0x34, 0xe0, 0x72, 0x5e, 0xfc, 0x93, 0x81, 0xe5, 0xcf, 0x5d, 0xdd, 0x7c, 0x4d, 0x49, 0x21, 0x0c, 0xa5, 0x38, 0xe0, 0x5a, 0x56, 0xfc,
0xe6, 0xf0, 0xcf, 0x1a, 0x2c, 0x1b, 0xc4, 0x21, 0x26, 0x23, 0x57, 0x49, 0xe3, 0x47, 0x0d, 0xe6, 0x4b, 0xcc, 0xe1, 0x9f, 0x14, 0x58, 0xd3, 0x88, 0x45, 0x74, 0x46, 0x2e, 0x93, 0xc6, 0x0f, 0x0a,
0xc3, 0x05, 0xd8, 0x37, 0x03, 0x4e, 0xaf, 0x86, 0x42, 0x37, 0xda, 0x61, 0x29, 0x06, 0x93, 0xec, 0x2c, 0xf9, 0x0b, 0x70, 0xa0, 0x7b, 0x9c, 0x5e, 0x0e, 0x85, 0x41, 0xb0, 0xc3, 0x62, 0x0c, 0xe6,
0x00, 0x0c, 0xa5, 0x6e, 0x3f, 0xd2, 0xd9, 0x06, 0x90, 0xe6, 0xb0, 0x0b, 0x95, 0x04, 0x2d, 0x74, 0xd9, 0x01, 0x18, 0x4a, 0x83, 0x51, 0xa4, 0xb3, 0x0d, 0x90, 0x98, 0xc3, 0x36, 0x54, 0x22, 0x34,
0x27, 0x0c, 0xdd, 0x86, 0xd9, 0x94, 0x89, 0x00, 0xcc, 0x1b, 0xe9, 0x29, 0xf4, 0x11, 0x4c, 0x87, 0xdf, 0x9d, 0x30, 0x74, 0x0b, 0x16, 0x63, 0x26, 0x02, 0x30, 0xab, 0xc5, 0xa7, 0xd0, 0x27, 0x90,
0x10, 0x44, 0xe4, 0x57, 0xde, 0xc4, 0xb5, 0xc1, 0xfe, 0x53, 0x93, 0xa3, 0x1a, 0x91, 0x03, 0xfe, 0xf7, 0x21, 0x88, 0xc8, 0xaf, 0xdc, 0xc0, 0xb5, 0xf1, 0xfe, 0x53, 0x4b, 0x46, 0xd5, 0x02, 0x07,
0x53, 0x83, 0xc5, 0x0c, 0xde, 0x2b, 0xaf, 0xf2, 0x40, 0x5d, 0x0a, 0x8a, 0xba, 0xfc, 0xa5, 0xc1, 0xfc, 0x87, 0x02, 0x2b, 0x29, 0xbc, 0x37, 0x5e, 0xe5, 0xb1, 0xba, 0xe4, 0x24, 0x75, 0xf9, 0x53,
0xd2, 0x00, 0xd1, 0x49, 0x16, 0xe3, 0x08, 0x16, 0x13, 0x80, 0xa6, 0x4d, 0x98, 0x15, 0xd0, 0x6e, 0x81, 0xd5, 0x31, 0xa2, 0xf3, 0x2c, 0xc6, 0x11, 0xac, 0x44, 0x00, 0x1d, 0x93, 0x30, 0xc3, 0xa3,
0xf8, 0x1d, 0x2d, 0xcb, 0xec, 0xe6, 0xdb, 0xa3, 0x8b, 0xc8, 0x8c, 0x85, 0x24, 0x44, 0x3d, 0x15, 0x03, 0xff, 0x3b, 0x58, 0x96, 0xc5, 0xc6, 0xbb, 0xd3, 0x8b, 0xc8, 0xb4, 0xe5, 0x28, 0x44, 0x33,
0x01, 0xff, 0xa1, 0xc1, 0x7c, 0x78, 0x88, 0xaf, 0x6e, 0xe7, 0x8e, 0x55, 0xd3, 0xe7, 0x1a, 0x2c, 0x16, 0x01, 0xff, 0xae, 0xc0, 0x92, 0x7f, 0x88, 0x2f, 0x6f, 0xe7, 0xce, 0x54, 0xd3, 0x17, 0x0a,
0xc5, 0xe7, 0xfc, 0x35, 0x67, 0xfa, 0x9b, 0x06, 0xfa, 0x4e, 0x40, 0x4c, 0x4e, 0x3e, 0x0f, 0xd7, 0xac, 0x86, 0xe7, 0xfc, 0x7f, 0xce, 0xf4, 0x57, 0x05, 0xd4, 0x5d, 0x8f, 0xe8, 0x9c, 0x3c, 0xf0,
0x61, 0xa7, 0x63, 0x7a, 0x1e, 0x71, 0x26, 0xdb, 0x00, 0x77, 0xa1, 0x12, 0x44, 0xc9, 0x36, 0xad, 0xd7, 0x61, 0xb7, 0xaf, 0x3b, 0x0e, 0xb1, 0xe6, 0xdb, 0x00, 0x77, 0xa0, 0xe2, 0x05, 0xc9, 0x76,
0x28, 0x9e, 0xa0, 0x5e, 0x34, 0xca, 0xf1, 0x74, 0x8c, 0x82, 0xde, 0x85, 0x72, 0x40, 0x58, 0xcf, 0x8c, 0x20, 0x9e, 0xa0, 0x5e, 0xd4, 0xca, 0xe1, 0x74, 0x88, 0x82, 0xde, 0x87, 0xb2, 0x47, 0xd8,
0x39, 0xb3, 0xcb, 0x0b, 0xbb, 0x1b, 0xd1, 0x6c, 0x6c, 0x86, 0x7f, 0xd7, 0x60, 0x69, 0xcb, 0xb6, 0xd0, 0x3a, 0xb3, 0xcb, 0x0a, 0xbb, 0x6b, 0xc1, 0x6c, 0x68, 0x86, 0x7f, 0x53, 0x60, 0x75, 0xdb,
0xd3, 0x04, 0x27, 0x38, 0x4b, 0xeb, 0x70, 0x33, 0xc3, 0x2e, 0x2e, 0x6d, 0xd1, 0xa8, 0xca, 0xfc, 0x34, 0xe3, 0x04, 0xe7, 0x38, 0x4b, 0x5b, 0x70, 0x3d, 0xc5, 0x2e, 0x2c, 0x6d, 0x51, 0xab, 0x26,
0x1a, 0x75, 0xb4, 0x0a, 0x55, 0x99, 0x61, 0x5c, 0xea, 0xa2, 0x51, 0x91, 0x38, 0x36, 0xea, 0xf8, 0xf9, 0xb5, 0x9a, 0x68, 0x03, 0xaa, 0x49, 0x86, 0x61, 0xa9, 0x8b, 0x5a, 0x25, 0xc1, 0xb1, 0xd5,
0x1f, 0x0d, 0x74, 0x83, 0xb8, 0xfe, 0x31, 0x51, 0x12, 0xbd, 0x54, 0x25, 0xfb, 0xd9, 0x4d, 0x4d, 0xc4, 0x7f, 0x2b, 0xa0, 0x6a, 0xc4, 0x76, 0x8f, 0x89, 0x94, 0xe8, 0x4b, 0x55, 0x72, 0x94, 0xdd,
0x96, 0x5d, 0xfe, 0x02, 0xd9, 0x15, 0xd4, 0xd9, 0x3d, 0x81, 0xc5, 0x47, 0x26, 0xb7, 0x3a, 0x75, 0xc2, 0x7c, 0xd9, 0x65, 0x2f, 0x90, 0x5d, 0x4e, 0x9e, 0xdd, 0x53, 0x58, 0x79, 0xa4, 0x73, 0xa3,
0x77, 0xf2, 0x15, 0xb8, 0x05, 0x90, 0xe0, 0x45, 0x4d, 0xa1, 0x68, 0xa4, 0x66, 0xf0, 0xdf, 0x53, 0xdf, 0xb4, 0xe7, 0x5f, 0x81, 0x9b, 0x00, 0x11, 0x5e, 0xd0, 0x14, 0x8a, 0x5a, 0x6c, 0x06, 0xff,
0x80, 0xc2, 0x43, 0x7e, 0x40, 0xda, 0x2e, 0xf1, 0xf8, 0xab, 0x3f, 0x38, 0x99, 0x7b, 0xa1, 0x30, 0xb5, 0x00, 0xc8, 0x3f, 0xe4, 0x6d, 0xd2, 0xb3, 0x89, 0xc3, 0xdf, 0xfc, 0xc1, 0x49, 0xdd, 0x0b,
0x78, 0x2f, 0xdc, 0x02, 0x60, 0x11, 0xbb, 0x30, 0x85, 0x69, 0x71, 0xb0, 0x52, 0x33, 0x48, 0x87, 0xb9, 0xf1, 0x7b, 0xe1, 0x26, 0x00, 0x0b, 0xd8, 0xf9, 0x29, 0xe4, 0xc5, 0xc1, 0x8a, 0xcd, 0x20,
0xeb, 0xdf, 0x52, 0xe2, 0xd8, 0xe1, 0xdf, 0x19, 0xf1, 0x37, 0x19, 0xa3, 0x2f, 0x00, 0x39, 0x26, 0x15, 0xae, 0x3e, 0xa1, 0xc4, 0x32, 0xfd, 0xbf, 0x05, 0xf1, 0x37, 0x1a, 0xa3, 0x07, 0x80, 0x2c,
0xe3, 0xcd, 0xd8, 0xbc, 0x19, 0x5d, 0x30, 0xd7, 0x44, 0x56, 0x2b, 0x72, 0x56, 0xa1, 0x5a, 0xad, 0x9d, 0xf1, 0x4e, 0x68, 0xde, 0x09, 0x2e, 0x98, 0x2b, 0x22, 0xab, 0x54, 0x6f, 0xf4, 0xd5, 0x6a,
0xc5, 0x65, 0x90, 0x5b, 0xb3, 0x51, 0x0d, 0x63, 0xa4, 0x7f, 0xe1, 0x7f, 0x35, 0x58, 0x88, 0x9b, 0x2d, 0x2c, 0x83, 0x68, 0x8c, 0x2d, 0xe7, 0x89, 0xab, 0x55, 0x7d, 0xf7, 0xf8, 0x2c, 0xfe, 0x47,
0xce, 0x95, 0x55, 0x6e, 0x8c, 0x96, 0x33, 0x49, 0xed, 0xf0, 0x2f, 0x1a, 0x2c, 0xed, 0xf8, 0x6e, 0x81, 0xe5, 0xb0, 0xdf, 0x5c, 0x5a, 0xd1, 0x66, 0xe8, 0x36, 0xf3, 0x94, 0x0d, 0xff, 0xac, 0xc0,
0xd7, 0xf7, 0x06, 0x2a, 0x72, 0xb9, 0x13, 0xf6, 0x71, 0xe4, 0x44, 0xfa, 0x42, 0xf9, 0xce, 0x10, 0xea, 0xae, 0x6b, 0x0f, 0x5c, 0x67, 0x94, 0xf6, 0x9c, 0xf7, 0xd4, 0xa7, 0x81, 0x13, 0x19, 0x69,
0xa1, 0x9c, 0x05, 0x8d, 0xbd, 0xd6, 0x4e, 0xa1, 0x2c, 0x5f, 0x5d, 0xa8, 0x04, 0xd7, 0xf7, 0x7c, 0xe4, 0xdb, 0x13, 0x34, 0x72, 0x1a, 0x34, 0xf4, 0xda, 0x3c, 0x85, 0x72, 0xf2, 0xd6, 0x42, 0x25,
0x7e, 0xff, 0x7b, 0xca, 0x78, 0x35, 0x87, 0xca, 0x00, 0x7b, 0x3e, 0xdf, 0x0f, 0x08, 0x23, 0x1e, 0xb8, 0xba, 0xef, 0xf2, 0x7b, 0xdf, 0x51, 0xc6, 0xab, 0x19, 0x54, 0x06, 0xd8, 0x77, 0xf9, 0x81,
0xaf, 0x6a, 0x08, 0x60, 0xe6, 0x33, 0xaf, 0x4e, 0xd9, 0xd3, 0xea, 0x14, 0x9a, 0x8b, 0x15, 0x89, 0x47, 0x18, 0x71, 0x78, 0x55, 0x41, 0x00, 0x85, 0x2f, 0x9c, 0x26, 0x65, 0xcf, 0xaa, 0x0b, 0xe8,
0xe9, 0x34, 0xbc, 0x07, 0xc4, 0xf5, 0x83, 0x93, 0x6a, 0x3e, 0x74, 0x4f, 0x46, 0x05, 0x54, 0x85, 0x46, 0x28, 0x46, 0x74, 0xab, 0xe5, 0xdc, 0x27, 0xb6, 0xeb, 0x9d, 0x54, 0xb3, 0xbe, 0x7b, 0x34,
0x52, 0x62, 0xb2, 0xbb, 0xff, 0xb0, 0x3a, 0x8d, 0x8a, 0x30, 0x1d, 0x7d, 0xce, 0x6c, 0x3e, 0x2f, 0xca, 0xa1, 0x2a, 0x94, 0x22, 0x93, 0xbd, 0x83, 0x87, 0xd5, 0x3c, 0x2a, 0x42, 0x3e, 0xf8, 0x2c,
0x42, 0x49, 0xf4, 0x9a, 0x83, 0xe8, 0x3d, 0x83, 0x2c, 0x28, 0xa5, 0xdf, 0x11, 0xe8, 0xae, 0xea, 0x34, 0x5e, 0x14, 0xa1, 0x24, 0xda, 0x4c, 0x3b, 0x78, 0xca, 0x20, 0x03, 0x4a, 0xf1, 0x27, 0x04,
0xa6, 0x55, 0xbc, 0x75, 0xf4, 0x95, 0xd1, 0x86, 0x51, 0x91, 0x71, 0x0e, 0x3d, 0x81, 0x8a, 0x2c, 0xba, 0x23, 0xbb, 0x64, 0x25, 0xcf, 0x1c, 0x75, 0x7d, 0xba, 0x61, 0x50, 0x64, 0x9c, 0x41, 0x4f,
0xde, 0x19, 0x5a, 0x55, 0xb9, 0x2b, 0x1f, 0x10, 0xfa, 0xda, 0x38, 0xa6, 0x09, 0x56, 0x1b, 0xca, 0xa1, 0x92, 0xd4, 0xed, 0x0c, 0x6d, 0xc8, 0xdc, 0xa5, 0x6f, 0x07, 0x75, 0x73, 0x16, 0xd3, 0x08,
0x92, 0x4a, 0x64, 0x68, 0x65, 0x98, 0x7f, 0xf6, 0x9e, 0xd5, 0x57, 0xc7, 0xb0, 0x4c, 0x80, 0xbe, 0xab, 0x07, 0xe5, 0x84, 0x40, 0x64, 0x68, 0x7d, 0x92, 0x7f, 0xfa, 0x8a, 0x55, 0x37, 0x66, 0xb0,
0x84, 0xb2, 0x24, 0x2b, 0x86, 0x00, 0xa9, 0xa4, 0x87, 0x7e, 0xde, 0x3e, 0xc3, 0x39, 0xd4, 0x84, 0x8c, 0x80, 0xbe, 0x84, 0x72, 0x42, 0x51, 0x4c, 0x00, 0x92, 0xa9, 0x0e, 0xf5, 0xbc, 0x7d, 0x86,
0x9b, 0x59, 0x29, 0xc0, 0xd0, 0xba, 0xba, 0xe0, 0x4a, 0xc5, 0x30, 0x0a, 0xe0, 0x28, 0xe2, 0x7e, 0x33, 0xa8, 0x03, 0xd7, 0xd3, 0x2a, 0x80, 0xa1, 0x2d, 0x79, 0xc1, 0xa5, 0x62, 0x61, 0x1a, 0xc0,
0x56, 0x40, 0xf5, 0x7a, 0x28, 0xdf, 0x3e, 0xa3, 0x62, 0x7f, 0x93, 0x90, 0x4f, 0x85, 0x7f, 0xef, 0x51, 0xc0, 0xfd, 0xac, 0x80, 0xf2, 0xf5, 0x90, 0x3e, 0x7b, 0xa6, 0xc5, 0xfe, 0x26, 0x22, 0x1f,
0x1c, 0xf2, 0x17, 0x46, 0x68, 0x01, 0x1a, 0xd4, 0x1f, 0x48, 0x57, 0x3a, 0xdd, 0x77, 0xbb, 0xfc, 0x0b, 0xff, 0xc1, 0x39, 0xe4, 0x2f, 0x8c, 0xd0, 0x05, 0x34, 0x2e, 0x3d, 0x90, 0x2a, 0x75, 0xba,
0x44, 0xaf, 0xa9, 0xe0, 0x87, 0x6b, 0x18, 0x9c, 0x43, 0x8f, 0x00, 0xed, 0x12, 0x7e, 0x48, 0x5d, 0x67, 0x0f, 0xf8, 0x89, 0x5a, 0x93, 0xc1, 0x4f, 0x96, 0x2f, 0x38, 0x83, 0x1e, 0x01, 0xda, 0x23,
0x72, 0x48, 0xad, 0xa7, 0xe3, 0x60, 0x64, 0x34, 0x6a, 0x3c, 0x38, 0xe0, 0x01, 0xf5, 0xda, 0xd2, 0xfc, 0x90, 0xda, 0xe4, 0x90, 0x1a, 0xcf, 0x66, 0xc1, 0x48, 0xb5, 0xe0, 0x70, 0xd0, 0xe6, 0x1e,
0xb6, 0x99, 0xdf, 0x25, 0xa2, 0x25, 0x50, 0xc6, 0xa9, 0xc5, 0x5e, 0x60, 0x68, 0x5f, 0x70, 0xce, 0x75, 0x7a, 0x89, 0x6d, 0xb3, 0xb4, 0x47, 0x44, 0x4b, 0xa0, 0x8c, 0x53, 0x83, 0xbd, 0xc2, 0xd0,
0xbe, 0x58, 0xd6, 0xc6, 0xd1, 0xce, 0x71, 0xe1, 0xd7, 0xc7, 0xb2, 0x4d, 0x00, 0x8f, 0x04, 0x60, 0xae, 0xe0, 0x9c, 0x7e, 0xac, 0x6c, 0xce, 0x22, 0x9b, 0xc3, 0xc2, 0x6f, 0xcd, 0x64, 0x1b, 0x01,
0xa6, 0xcf, 0x9d, 0x9b, 0xc9, 0x98, 0xbd, 0x12, 0xe7, 0x36, 0xff, 0x9b, 0x86, 0xa2, 0x58, 0x1b, 0x1e, 0x09, 0xc0, 0x54, 0x9f, 0x3b, 0x37, 0x93, 0x19, 0x7b, 0x25, 0xce, 0x34, 0xfe, 0xcd, 0x43,
0xd1, 0x96, 0x5e, 0xda, 0x72, 0x1c, 0x42, 0x25, 0x5e, 0x8e, 0x17, 0xb9, 0x12, 0xcd, 0x0b, 0x17, 0x51, 0xac, 0x8d, 0x68, 0x4b, 0xaf, 0x6d, 0x39, 0x0e, 0xa1, 0x12, 0x2e, 0xc7, 0xab, 0x5c, 0x89,
0x46, 0x59, 0xf9, 0x21, 0xd7, 0x16, 0xce, 0xa1, 0xc7, 0x50, 0xc9, 0xc8, 0x5b, 0x75, 0x7f, 0x18, 0xce, 0x85, 0x0b, 0x23, 0xad, 0xfc, 0x84, 0x6b, 0x0b, 0x67, 0xd0, 0x63, 0xa8, 0xa4, 0x94, 0xad,
0xa2, 0x81, 0x47, 0x9d, 0x30, 0x0b, 0xd0, 0xa0, 0x2e, 0x45, 0x35, 0xf5, 0x21, 0x1e, 0xa6, 0x5f, 0xbc, 0x3f, 0x4c, 0x90, 0xbf, 0xd3, 0x4e, 0x98, 0x01, 0x68, 0x5c, 0x92, 0xa2, 0x9a, 0xfc, 0x10,
0x47, 0x81, 0x7c, 0x0d, 0x95, 0x8c, 0x3e, 0x54, 0xef, 0x55, 0xb5, 0x88, 0x1c, 0x15, 0xfd, 0x21, 0x4f, 0x92, 0xae, 0xd3, 0x40, 0xbe, 0x86, 0x4a, 0x4a, 0x1a, 0xca, 0xf7, 0xaa, 0x5c, 0x3f, 0x4e,
0x94, 0x52, 0x82, 0x90, 0xa1, 0x3b, 0xc3, 0x1a, 0x9c, 0x2c, 0x7c, 0x46, 0x85, 0xfd, 0x0a, 0x2a, 0x8b, 0xfe, 0x10, 0x4a, 0x31, 0x2d, 0xc8, 0xd0, 0xed, 0x49, 0x0d, 0x2e, 0x29, 0x7c, 0xa6, 0x85,
0xb2, 0x60, 0x1a, 0x72, 0x95, 0x29, 0x55, 0xd5, 0x88, 0xe0, 0xdb, 0x5b, 0x47, 0x9f, 0xb4, 0x29, 0xfd, 0x0a, 0x2a, 0x49, 0xc1, 0x34, 0xe1, 0x2a, 0x93, 0xaa, 0xaa, 0x29, 0xc1, 0x77, 0xb6, 0x8f,
0xef, 0xf4, 0x5a, 0xe1, 0x9f, 0x8d, 0x53, 0xea, 0x38, 0xf4, 0x94, 0x13, 0xab, 0xb3, 0x11, 0x79, 0x3e, 0xeb, 0x51, 0xde, 0x1f, 0x76, 0xfd, 0x3f, 0xf5, 0x53, 0x6a, 0x59, 0xf4, 0x94, 0x13, 0xa3,
0xbd, 0x6f, 0x53, 0xc6, 0x03, 0xda, 0xea, 0x71, 0x62, 0x6f, 0xf4, 0x8f, 0xce, 0x86, 0x08, 0xb5, 0x5f, 0x0f, 0xbc, 0x3e, 0x34, 0x29, 0xe3, 0x1e, 0xed, 0x0e, 0x39, 0x31, 0xeb, 0xa3, 0xa3, 0x53,
0x21, 0x50, 0xbb, 0xad, 0xd6, 0x8c, 0x18, 0xde, 0xfb, 0x3f, 0x00, 0x00, 0xff, 0xff, 0x3a, 0x16, 0x17, 0xa1, 0xea, 0x02, 0x75, 0xd0, 0xed, 0x16, 0xc4, 0xf0, 0xee, 0x7f, 0x01, 0x00, 0x00, 0xff,
0x3b, 0xb4, 0x0b, 0x15, 0x00, 0x00, 0xff, 0x69, 0xea, 0xe4, 0xad, 0x06, 0x15, 0x00, 0x00,
} }
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
......
...@@ -80,7 +80,6 @@ func (node *NodeImpl) DropCollection(request *milvuspb.DropCollectionRequest) (* ...@@ -80,7 +80,6 @@ func (node *NodeImpl) DropCollection(request *milvuspb.DropCollectionRequest) (*
Condition: NewTaskCondition(ctx), Condition: NewTaskCondition(ctx),
DropCollectionRequest: request, DropCollectionRequest: request,
masterClient: node.masterClient, masterClient: node.masterClient,
dataServiceClient: node.dataServiceClient,
} }
var cancel func() var cancel func()
dct.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval) dct.ctx, cancel = context.WithTimeout(ctx, reqTimeoutInterval)
......
...@@ -47,6 +47,7 @@ type QueryServiceClient interface { ...@@ -47,6 +47,7 @@ type QueryServiceClient interface {
type DataServiceClient interface { type DataServiceClient interface {
AssignSegmentID(req *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error) AssignSegmentID(req *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error)
GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error)
Flush(req *datapb.FlushRequest) (*commonpb.Status, error)
GetComponentStates() (*internalpb2.ComponentStates, error) GetComponentStates() (*internalpb2.ComponentStates, error)
} }
......
...@@ -364,10 +364,9 @@ func (cct *CreateCollectionTask) PostExecute() error { ...@@ -364,10 +364,9 @@ func (cct *CreateCollectionTask) PostExecute() error {
type DropCollectionTask struct { type DropCollectionTask struct {
Condition Condition
*milvuspb.DropCollectionRequest *milvuspb.DropCollectionRequest
masterClient MasterClient masterClient MasterClient
dataServiceClient DataServiceClient result *commonpb.Status
result *commonpb.Status ctx context.Context
ctx context.Context
} }
func (dct *DropCollectionTask) OnEnqueue() error { func (dct *DropCollectionTask) OnEnqueue() error {
...@@ -1376,3 +1375,70 @@ func (dipt *GetIndexStateTask) Execute() error { ...@@ -1376,3 +1375,70 @@ func (dipt *GetIndexStateTask) Execute() error {
func (dipt *GetIndexStateTask) PostExecute() error { func (dipt *GetIndexStateTask) PostExecute() error {
return nil return nil
} }
type FlushTask struct {
Condition
*milvuspb.FlushRequest
dataServiceClient DataServiceClient
result *commonpb.Status
ctx context.Context
}
func (ft *FlushTask) OnEnqueue() error {
ft.Base = &commonpb.MsgBase{}
return nil
}
func (ft *FlushTask) ID() UniqueID {
return ft.Base.MsgID
}
func (ft *FlushTask) SetID(uid UniqueID) {
ft.Base.MsgID = uid
}
func (ft *FlushTask) Type() commonpb.MsgType {
return ft.Base.MsgType
}
func (ft *FlushTask) BeginTs() Timestamp {
return ft.Base.Timestamp
}
func (ft *FlushTask) EndTs() Timestamp {
return ft.Base.Timestamp
}
func (ft *FlushTask) SetTs(ts Timestamp) {
ft.Base.Timestamp = ts
}
func (ft *FlushTask) PreExecute() error {
ft.Base.MsgType = commonpb.MsgType_kFlush
ft.Base.SourceID = Params.ProxyID
return nil
}
func (ft *FlushTask) Execute() error {
var err error
collID, err := globalMetaCache.GetCollectionID(ft.CollectionName)
if err != nil {
return err
}
flushReq := &datapb.FlushRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kFlush,
MsgID: ft.Base.MsgID,
Timestamp: ft.Base.Timestamp,
SourceID: ft.Base.SourceID,
},
DbID: 0,
CollectionID: collID,
}
ft.result, err = ft.dataServiceClient.Flush(flushReq)
return err
}
func (ft *FlushTask) PostExecute() error {
return nil
}
...@@ -67,7 +67,7 @@ func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp, error) { ...@@ -67,7 +67,7 @@ func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp, error) {
} }
} }
atomic.StoreInt64(&(ttBarrier.lastTt), int64(ts)) atomic.StoreInt64(&(ttBarrier.lastTt), int64(ts))
log.Println("current tick: ", ts) // log.Println("current tick: ", ts)
return ts, ttBarrier.ctx.Err() return ts, ttBarrier.ctx.Err()
} }
} }
...@@ -95,8 +95,8 @@ func (ttBarrier *softTimeTickBarrier) Start() error { ...@@ -95,8 +95,8 @@ func (ttBarrier *softTimeTickBarrier) Start() error {
log.Printf("[softTimeTickBarrier] Warning: peerID %d not exist\n", ttmsg.Base.SourceID) log.Printf("[softTimeTickBarrier] Warning: peerID %d not exist\n", ttmsg.Base.SourceID)
continue continue
} }
log.Println("ttmsg.Base.Timestamp: ", ttmsg.Base.Timestamp) // log.Println("ttmsg.Base.Timestamp: ", ttmsg.Base.Timestamp)
log.Println("oldT: ", oldT) // log.Println("oldT: ", oldT)
if ttmsg.Base.Timestamp > oldT { if ttmsg.Base.Timestamp > oldT {
ttBarrier.peer2LastTt[ttmsg.Base.SourceID] = ttmsg.Base.Timestamp ttBarrier.peer2LastTt[ttmsg.Base.SourceID] = ttmsg.Base.Timestamp
......
...@@ -269,19 +269,22 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm ...@@ -269,19 +269,22 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
log.Fatal("showSegment fail, v%", showSegmentResponse.Status.Reason) log.Fatal("showSegment fail, v%", showSegmentResponse.Status.Reason)
} }
segmentIDs := showSegmentResponse.SegmentIDs segmentIDs := showSegmentResponse.SegmentIDs
segmentStates := make(map[UniqueID]*datapb.SegmentStatesResponse) segmentStates := make(map[UniqueID]*datapb.SegmentStateInfo)
channel2id := make(map[string]int) channel2id := make(map[string]int)
//id2channels := make(map[int][]string) //id2channels := make(map[int][]string)
id2segs := make(map[int][]UniqueID) id2segs := make(map[int][]UniqueID)
offset := 0 offset := 0
for _, segmentID := range segmentIDs { resp, err := qs.dataServiceClient.GetSegmentStates(&datapb.SegmentStatesRequest{
state, err := qs.dataServiceClient.GetSegmentStates(&datapb.SegmentStatesRequest{ SegmentIDs: segmentIDs,
SegmentID: segmentID, })
})
if err != nil { if err != nil {
log.Fatal("get segment states fail") log.Fatal("get segment states fail")
} }
for _, state := range resp.States {
segmentID := state.SegmentID
segmentStates[segmentID] = state segmentStates[segmentID] = state
var flatChannelName string var flatChannelName string
channelNames := make([]string, 0) channelNames := make([]string, 0)
......
...@@ -91,7 +91,7 @@ func (master *masterMock) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvus ...@@ -91,7 +91,7 @@ func (master *masterMock) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvus
type dataMock struct { type dataMock struct {
segmentIDs []UniqueID segmentIDs []UniqueID
segmentStates map[UniqueID]*datapb.SegmentStatesResponse segmentStates map[UniqueID]*datapb.SegmentStateInfo
} }
func newDataMock() *dataMock { func newDataMock() *dataMock {
...@@ -110,23 +110,24 @@ func newDataMock() *dataMock { ...@@ -110,23 +110,24 @@ func newDataMock() *dataMock {
segmentIDs = append(segmentIDs, 5) segmentIDs = append(segmentIDs, 5)
segmentIDs = append(segmentIDs, 6) segmentIDs = append(segmentIDs, 6)
fillStates := func(time uint64, position []*internalpb2.MsgPosition, state datapb.SegmentState) *datapb.SegmentStatesResponse { fillStates := func(segmentID UniqueID, time uint64, position []*internalpb2.MsgPosition, state datapb.SegmentState) *datapb.SegmentStateInfo {
return &datapb.SegmentStatesResponse{ return &datapb.SegmentStateInfo{
Status: &commonpb.Status{ Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS, ErrorCode: commonpb.ErrorCode_SUCCESS,
}, },
SegmentID: segmentID,
State: state, State: state,
CreateTime: time, CreateTime: time,
StartPositions: position, StartPositions: position,
} }
} }
segmentStates := make(map[UniqueID]*datapb.SegmentStatesResponse) segmentStates := make(map[UniqueID]*datapb.SegmentStateInfo)
segmentStates[1] = fillStates(1, positions1, datapb.SegmentState_SegmentFlushed) segmentStates[1] = fillStates(1, 1, positions1, datapb.SegmentState_SegmentFlushed)
segmentStates[2] = fillStates(2, positions2, datapb.SegmentState_SegmentFlushed) segmentStates[2] = fillStates(2, 2, positions2, datapb.SegmentState_SegmentFlushed)
segmentStates[3] = fillStates(3, positions1, datapb.SegmentState_SegmentFlushed) segmentStates[3] = fillStates(3, 3, positions1, datapb.SegmentState_SegmentFlushed)
segmentStates[4] = fillStates(4, positions2, datapb.SegmentState_SegmentFlushed) segmentStates[4] = fillStates(4, 4, positions2, datapb.SegmentState_SegmentFlushed)
segmentStates[5] = fillStates(5, positions1, datapb.SegmentState_SegmentGrowing) segmentStates[5] = fillStates(5, 5, positions1, datapb.SegmentState_SegmentGrowing)
segmentStates[6] = fillStates(6, positions2, datapb.SegmentState_SegmentGrowing) segmentStates[6] = fillStates(6, 6, positions2, datapb.SegmentState_SegmentGrowing)
return &dataMock{ return &dataMock{
segmentIDs: segmentIDs, segmentIDs: segmentIDs,
...@@ -135,13 +136,24 @@ func newDataMock() *dataMock { ...@@ -135,13 +136,24 @@ func newDataMock() *dataMock {
} }
func (data *dataMock) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) { func (data *dataMock) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) {
segmentID := req.SegmentID ret := &datapb.SegmentStatesResponse{
for _, id := range data.segmentIDs { Status: &commonpb.Status{
if segmentID == id { ErrorCode: commonpb.ErrorCode_SUCCESS,
return data.segmentStates[id], nil },
}
for _, segID := range req.SegmentIDs {
for _, segmentID := range data.segmentIDs {
if segmentID == segID {
ret.States = append(ret.States, data.segmentStates[segmentID])
}
} }
} }
return nil, errors.New("segment id not found")
if ret.States == nil {
return nil, errors.New("segment id not found")
}
return ret, nil
} }
func TestQueryService_Init(t *testing.T) { func TestQueryService_Init(t *testing.T) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册