未验证 提交 42aafdc4 编写于 作者: X Xiangyu Wang 提交者: GitHub

Remove unused msg from msg.go (#7612)

Signed-off-by: NXiangyu Wang <xiangyu.wang@zilliz.com>
上级 49b6c310
......@@ -679,38 +679,6 @@ func TestChannel(t *testing.T) {
err := statsStream.Produce(&msgPack)
assert.Nil(t, err)
})
t.Run("Test SegmentFlushChannel", func(t *testing.T) {
genMsg := func(msgType commonpb.MsgType, t Timestamp) *msgstream.FlushCompletedMsg {
return &msgstream.FlushCompletedMsg{
BaseMsg: msgstream.BaseMsg{
HashValues: []uint32{0},
},
SegmentFlushCompletedMsg: datapb.SegmentFlushCompletedMsg{
Base: &commonpb.MsgBase{
MsgType: msgType,
MsgID: 0,
Timestamp: t,
SourceID: 0,
},
Segment: &datapb.SegmentInfo{},
},
}
}
segInfoStream, _ := svr.msFactory.NewMsgStream(svr.ctx)
segInfoStream.AsProducer([]string{Params.SegmentInfoChannelName})
segInfoStream.Start()
defer segInfoStream.Close()
msgPack := msgstream.MsgPack{}
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentFlushDone, 123))
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentInfo, 234))
msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentFlushDone, 345))
err := segInfoStream.Produce(&msgPack)
assert.Nil(t, err)
time.Sleep(time.Second)
})
}
func TestSaveBinlogPaths(t *testing.T) {
......
......@@ -44,52 +44,6 @@ import (
"github.com/stretchr/testify/assert"
)
func GenSegInfoMsgPack(seg *datapb.SegmentInfo) *msgstream.MsgPack {
msgPack := msgstream.MsgPack{}
baseMsg := msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{0},
}
segMsg := &msgstream.SegmentInfoMsg{
BaseMsg: baseMsg,
SegmentMsg: datapb.SegmentMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentInfo,
MsgID: 0,
Timestamp: 0,
SourceID: 0,
},
Segment: seg,
},
}
msgPack.Msgs = append(msgPack.Msgs, segMsg)
return &msgPack
}
func GenFlushedSegMsgPack(segID typeutil.UniqueID) *msgstream.MsgPack {
msgPack := msgstream.MsgPack{}
baseMsg := msgstream.BaseMsg{
BeginTimestamp: 0,
EndTimestamp: 0,
HashValues: []uint32{0},
}
segMsg := &msgstream.FlushCompletedMsg{
BaseMsg: baseMsg,
SegmentFlushCompletedMsg: datapb.SegmentFlushCompletedMsg{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SegmentFlushDone,
MsgID: 0,
Timestamp: 0,
SourceID: 0,
},
Segment: &datapb.SegmentInfo{ID: segID},
},
}
msgPack.Msgs = append(msgPack.Msgs, segMsg)
return &msgPack
}
type proxyMock struct {
types.Proxy
invalidateCollectionMetaCache func(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error)
......
......@@ -140,60 +140,6 @@ func (it *InsertMsg) Unmarshal(input MarshalType) (TsMsg, error) {
return insertMsg, nil
}
/////////////////////////////////////////FlushCompletedMsg//////////////////////////////////////////
// TODO(wxyu): Not needed, to remove
type FlushCompletedMsg struct {
BaseMsg
datapb.SegmentFlushCompletedMsg
}
func (fl *FlushCompletedMsg) TraceCtx() context.Context {
return fl.BaseMsg.Ctx
}
func (fl *FlushCompletedMsg) SetTraceCtx(ctx context.Context) {
fl.BaseMsg.Ctx = ctx
}
func (fl *FlushCompletedMsg) ID() UniqueID {
return fl.Base.MsgID
}
func (fl *FlushCompletedMsg) Type() MsgType {
return fl.Base.MsgType
}
func (fl *FlushCompletedMsg) SourceID() int64 {
return fl.Base.SourceID
}
func (fl *FlushCompletedMsg) Marshal(input TsMsg) (MarshalType, error) {
flushCompletedMsgTask := input.(*FlushCompletedMsg)
flushCompletedMsg := &flushCompletedMsgTask.SegmentFlushCompletedMsg
mb, err := proto.Marshal(flushCompletedMsg)
if err != nil {
return nil, err
}
return mb, nil
}
func (fl *FlushCompletedMsg) Unmarshal(input MarshalType) (TsMsg, error) {
flushCompletedMsg := datapb.SegmentFlushCompletedMsg{}
in, err := ConvertToByteArray(input)
if err != nil {
return nil, err
}
err = proto.Unmarshal(in, &flushCompletedMsg)
if err != nil {
return nil, err
}
flushCompletedMsgTask := &FlushCompletedMsg{SegmentFlushCompletedMsg: flushCompletedMsg}
flushCompletedMsgTask.BeginTimestamp = flushCompletedMsgTask.Base.Timestamp
flushCompletedMsgTask.EndTimestamp = flushCompletedMsgTask.Base.Timestamp
return flushCompletedMsgTask, nil
}
/////////////////////////////////////////Delete//////////////////////////////////////////
// TODO(wxyu): comment it until really needed
type DeleteMsg struct {
......@@ -857,6 +803,7 @@ func (dp *DropPartitionMsg) Unmarshal(input MarshalType) (TsMsg, error) {
}
/////////////////////////////////////////LoadIndex//////////////////////////////////////////
// TODO(wxyu): comment it until really needed
type LoadIndexMsg struct {
BaseMsg
internalpb.LoadIndex
......@@ -907,56 +854,6 @@ func (lim *LoadIndexMsg) Unmarshal(input MarshalType) (TsMsg, error) {
return loadIndexMsg, nil
}
/////////////////////////////////////////SegmentInfoMsg//////////////////////////////////////////
type SegmentInfoMsg struct {
BaseMsg
datapb.SegmentMsg
}
func (sim *SegmentInfoMsg) TraceCtx() context.Context {
return sim.BaseMsg.Ctx
}
func (sim *SegmentInfoMsg) SetTraceCtx(ctx context.Context) {
sim.BaseMsg.Ctx = ctx
}
func (sim *SegmentInfoMsg) ID() UniqueID {
return sim.Base.MsgID
}
func (sim *SegmentInfoMsg) Type() MsgType {
return sim.Base.MsgType
}
func (sim *SegmentInfoMsg) SourceID() int64 {
return sim.Base.SourceID
}
func (sim *SegmentInfoMsg) Marshal(input TsMsg) (MarshalType, error) {
segInfoMsg := input.(*SegmentInfoMsg)
mb, err := proto.Marshal(&segInfoMsg.SegmentMsg)
if err != nil {
return nil, err
}
return mb, nil
}
func (sim *SegmentInfoMsg) Unmarshal(input MarshalType) (TsMsg, error) {
segMsg := datapb.SegmentMsg{}
in, err := ConvertToByteArray(input)
if err != nil {
return nil, err
}
err = proto.Unmarshal(in, &segMsg)
if err != nil {
return nil, err
}
return &SegmentInfoMsg{
SegmentMsg: segMsg,
}, nil
}
/////////////////////////////////////////LoadBalanceSegments//////////////////////////////////////////
type LoadBalanceSegmentsMsg struct {
BaseMsg
......
......@@ -61,8 +61,6 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher {
createPartitionMsg := CreatePartitionMsg{}
dropPartitionMsg := DropPartitionMsg{}
loadIndexMsg := LoadIndexMsg{}
segmentInfoMsg := SegmentInfoMsg{}
flushCompletedMsg := FlushCompletedMsg{}
queryNodeSegStatsMsg := QueryNodeStatsMsg{}
segmentStatisticsMsg := SegmentStatisticsMsg{}
loadBalanceSegmentsMsg := LoadBalanceSegmentsMsg{}
......@@ -83,8 +81,6 @@ func (pudf *ProtoUDFactory) NewUnmarshalDispatcher() *ProtoUnmarshalDispatcher {
p.TempMap[commonpb.MsgType_CreatePartition] = createPartitionMsg.Unmarshal
p.TempMap[commonpb.MsgType_DropPartition] = dropPartitionMsg.Unmarshal
p.TempMap[commonpb.MsgType_LoadIndex] = loadIndexMsg.Unmarshal
p.TempMap[commonpb.MsgType_SegmentInfo] = segmentInfoMsg.Unmarshal
p.TempMap[commonpb.MsgType_SegmentFlushDone] = flushCompletedMsg.Unmarshal
p.TempMap[commonpb.MsgType_SegmentStatistics] = segmentStatisticsMsg.Unmarshal
p.TempMap[commonpb.MsgType_LoadBalanceSegments] = loadBalanceSegmentsMsg.Unmarshal
p.TempMap[commonpb.MsgType_DataNodeTt] = dataNodeTtMsg.Unmarshal
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册