未验证 提交 3a4df03b 编写于 作者: B bigsheeper 提交者: GitHub

Add SealedSegmentsChangeInfo and changeInfoMsg (#9479)

Signed-off-by: Nbigsheeper <yihao.dai@zilliz.com>
上级 1ec5d849
......@@ -284,7 +284,7 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE(
"ssued\020\001\022\016\n\nInProgress\020\002\022\014\n\010Finished\020\003\022\n\n"
"\006Failed\020\004*f\n\014SegmentState\022\024\n\020SegmentStat"
"eNone\020\000\022\014\n\010NotExist\020\001\022\013\n\007Growing\020\002\022\n\n\006Se"
"aled\020\003\022\013\n\007Flushed\020\004\022\014\n\010Flushing\020\005*\264\010\n\007Ms"
"aled\020\003\022\013\n\007Flushed\020\004\022\014\n\010Flushing\020\005*\323\010\n\007Ms"
"gType\022\r\n\tUndefined\020\000\022\024\n\020CreateCollection"
"\020d\022\022\n\016DropCollection\020e\022\021\n\rHasCollection\020"
"f\022\026\n\022DescribeCollection\020g\022\023\n\017ShowCollect"
......@@ -307,14 +307,15 @@ const char descriptor_table_protodef_common_2eproto[] PROTOBUF_SECTION_VARIABLE(
"eve\020\372\003\022\023\n\016RetrieveResult\020\373\003\022\024\n\017WatchDmCh"
"annels\020\374\003\022\025\n\020RemoveDmChannels\020\375\003\022\027\n\022Watc"
"hQueryChannels\020\376\003\022\030\n\023RemoveQueryChannels"
"\020\377\003\022\020\n\013SegmentInfo\020\330\004\022\r\n\010TimeTick\020\260\t\022\023\n\016"
"QueryNodeStats\020\261\t\022\016\n\tLoadIndex\020\262\t\022\016\n\tReq"
"uestID\020\263\t\022\017\n\nRequestTSO\020\264\t\022\024\n\017AllocateSe"
"gment\020\265\t\022\026\n\021SegmentStatistics\020\266\t\022\025\n\020Segm"
"entFlushDone\020\267\t\022\017\n\nDataNodeTt\020\270\t*\"\n\007DslT"
"ype\022\007\n\003Dsl\020\000\022\016\n\nBoolExprV1\020\001B5Z3github.c"
"om/milvus-io/milvus/internal/proto/commo"
"npbb\006proto3"
"\020\377\003\022\035\n\030SealedSegmentsChangeInfo\020\200\004\022\020\n\013Se"
"gmentInfo\020\330\004\022\r\n\010TimeTick\020\260\t\022\023\n\016QueryNode"
"Stats\020\261\t\022\016\n\tLoadIndex\020\262\t\022\016\n\tRequestID\020\263\t"
"\022\017\n\nRequestTSO\020\264\t\022\024\n\017AllocateSegment\020\265\t\022"
"\026\n\021SegmentStatistics\020\266\t\022\025\n\020SegmentFlushD"
"one\020\267\t\022\017\n\nDataNodeTt\020\270\t*\"\n\007DslType\022\007\n\003Ds"
"l\020\000\022\016\n\nBoolExprV1\020\001B5Z3github.com/milvus"
"-io/milvus/internal/proto/commonpbb\006prot"
"o3"
;
static const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable*const descriptor_table_common_2eproto_deps[1] = {
};
......@@ -331,7 +332,7 @@ static ::PROTOBUF_NAMESPACE_ID::internal::SCCInfoBase*const descriptor_table_com
static ::PROTOBUF_NAMESPACE_ID::internal::once_flag descriptor_table_common_2eproto_once;
static bool descriptor_table_common_2eproto_initialized = false;
const ::PROTOBUF_NAMESPACE_ID::internal::DescriptorTable descriptor_table_common_2eproto = {
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 2451,
&descriptor_table_common_2eproto_initialized, descriptor_table_protodef_common_2eproto, "common.proto", 2482,
&descriptor_table_common_2eproto_once, descriptor_table_common_2eproto_sccs, descriptor_table_common_2eproto_deps, 8, 0,
schemas, file_default_instances, TableStruct_common_2eproto::offsets,
file_level_metadata_common_2eproto, 8, file_level_enum_descriptors_common_2eproto, file_level_service_descriptors_common_2eproto,
......@@ -465,6 +466,7 @@ bool MsgType_IsValid(int value) {
case 509:
case 510:
case 511:
case 512:
case 600:
case 1200:
case 1201:
......
......@@ -250,6 +250,7 @@ enum MsgType : int {
RemoveDmChannels = 509,
WatchQueryChannels = 510,
RemoveQueryChannels = 511,
SealedSegmentsChangeInfo = 512,
SegmentInfo = 600,
TimeTick = 1200,
QueryNodeStats = 1201,
......
......@@ -20,6 +20,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
)
// MsgType is an alias ofo commonpb.MsgType
......@@ -981,6 +982,63 @@ func (l *LoadBalanceSegmentsMsg) Unmarshal(input MarshalType) (TsMsg, error) {
return loadMsg, nil
}
/////////////////////////////////////////SealedSegmentsChangeInfoMsg//////////////////////////////////////////
// SealedSegmentsChangeInfoMsg is a message pack that contains sealed segments change info
type SealedSegmentsChangeInfoMsg struct {
BaseMsg
querypb.SealedSegmentsChangeInfo
}
// interface implementation validation
var _ TsMsg = &SealedSegmentsChangeInfoMsg{}
// ID returns the ID of this message pack
func (s *SealedSegmentsChangeInfoMsg) ID() UniqueID {
return s.Base.MsgID
}
// Type returns the type of this message pack
func (s *SealedSegmentsChangeInfoMsg) Type() MsgType {
return s.Base.MsgType
}
// SourceID indicated which component generated this message
func (s *SealedSegmentsChangeInfoMsg) SourceID() int64 {
return s.Base.SourceID
}
// Marshal is used to serializing a message pack to byte array
func (s *SealedSegmentsChangeInfoMsg) Marshal(input TsMsg) (MarshalType, error) {
changeInfoMsg := input.(*SealedSegmentsChangeInfoMsg)
changeInfo := &changeInfoMsg.SealedSegmentsChangeInfo
mb, err := proto.Marshal(changeInfo)
if err != nil {
return nil, err
}
return mb, nil
}
// Unmarshal is used to deserializing a message pack from byte array
func (s *SealedSegmentsChangeInfoMsg) Unmarshal(input MarshalType) (TsMsg, error) {
changeInfo := querypb.SealedSegmentsChangeInfo{}
in, err := convertToByteArray(input)
if err != nil {
return nil, err
}
err = proto.Unmarshal(in, &changeInfo)
if err != nil {
return nil, err
}
changeInfoMsg := &SealedSegmentsChangeInfoMsg{SealedSegmentsChangeInfo: changeInfo}
changeInfoMsg.BeginTimestamp = changeInfo.Base.Timestamp
changeInfoMsg.EndTimestamp = changeInfo.Base.Timestamp
return changeInfoMsg, nil
}
/////////////////////////////////////////DataNodeTtMsg//////////////////////////////////////////
// DataNodeTtMsg is a message pack that contains datanode time tick
type DataNodeTtMsg struct {
BaseMsg
......
......@@ -20,6 +20,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
......@@ -828,3 +829,50 @@ func TestDataNodeTtMsg_Unmarshal_IllegalParameter(t *testing.T) {
assert.NotNil(t, err)
assert.Nil(t, tsMsg)
}
func TestSealedSegmentsChangeInfoMsg(t *testing.T) {
changeInfoMsg := &SealedSegmentsChangeInfoMsg{
BaseMsg: generateBaseMsg(),
SealedSegmentsChangeInfo: querypb.SealedSegmentsChangeInfo{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SealedSegmentsChangeInfo,
MsgID: 1,
Timestamp: 2,
SourceID: 3,
},
OnlineNodeID: int64(1),
OnlineSegmentIDs: []int64{1, 2, 3},
OfflineNodeID: int64(2),
OfflineSegmentIDs: []int64{4, 5, 6},
},
}
assert.NotNil(t, changeInfoMsg.TraceCtx())
ctx := context.Background()
changeInfoMsg.SetTraceCtx(ctx)
assert.Equal(t, ctx, changeInfoMsg.TraceCtx())
assert.Equal(t, int64(1), changeInfoMsg.ID())
assert.Equal(t, commonpb.MsgType_SealedSegmentsChangeInfo, changeInfoMsg.Type())
assert.Equal(t, int64(3), changeInfoMsg.SourceID())
bytes, err := changeInfoMsg.Marshal(changeInfoMsg)
assert.Nil(t, err)
tsMsg, err := changeInfoMsg.Unmarshal(bytes)
assert.Nil(t, err)
changeInfoMsg2, ok := tsMsg.(*SealedSegmentsChangeInfoMsg)
assert.True(t, ok)
assert.Equal(t, int64(1), changeInfoMsg2.ID())
assert.Equal(t, commonpb.MsgType_SealedSegmentsChangeInfo, changeInfoMsg2.Type())
assert.Equal(t, int64(3), changeInfoMsg2.SourceID())
}
func TestSealedSegmentsChangeInfoMsg_Unmarshal_IllegalParameter(t *testing.T) {
changeInfoMsg := &SealedSegmentsChangeInfoMsg{}
tsMsg, err := changeInfoMsg.Unmarshal(10)
assert.NotNil(t, err)
assert.Nil(t, tsMsg)
}
......@@ -64,6 +64,7 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher {
segmentStatisticsMsg := SegmentStatisticsMsg{}
loadBalanceSegmentsMsg := LoadBalanceSegmentsMsg{}
dataNodeTtMsg := DataNodeTtMsg{}
sealedSegmentsChangeInfoMsg := SealedSegmentsChangeInfoMsg{}
p := &ProtoUnmarshalDispatcher{}
p.TempMap = make(map[commonpb.MsgType]UnmarshalFunc)
......@@ -82,6 +83,7 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher {
p.TempMap[commonpb.MsgType_SegmentStatistics] = segmentStatisticsMsg.Unmarshal
p.TempMap[commonpb.MsgType_LoadBalanceSegments] = loadBalanceSegmentsMsg.Unmarshal
p.TempMap[commonpb.MsgType_DataNodeTt] = dataNodeTtMsg.Unmarshal
p.TempMap[commonpb.MsgType_SealedSegmentsChangeInfo] = sealedSegmentsChangeInfoMsg.Unmarshal
return p
}
......
......@@ -133,6 +133,7 @@ enum MsgType {
RemoveDmChannels = 509;
WatchQueryChannels = 510;
RemoveQueryChannels = 511;
SealedSegmentsChangeInfo = 512;
/* DATA SERVICE */
SegmentInfo = 600;
......
......@@ -303,3 +303,12 @@ message LoadBalanceRequest {
repeated int64 source_nodeIDs = 2;
TriggerCondition balance_reason = 3;
}
//---------------- common query proto -----------------
message SealedSegmentsChangeInfo {
common.MsgBase base = 1;
int64 online_nodeID = 2;
repeated int64 online_segmentIDs = 3;
int64 offline_nodeID = 4;
repeated int64 offline_segmentIDs = 5;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册